/* * @(#)SocketOrChannelConnectionImpl.java 1.91 04/06/21 * * Copyright 2004 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package com.sun.corba.se.impl.transport; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Collections; import java.util.Hashtable; import java.util.HashMap; import java.util.Map; import org.omg.CORBA.COMM_FAILURE; import org.omg.CORBA.CompletionStatus; import org.omg.CORBA.DATA_CONVERSION; import org.omg.CORBA.INTERNAL; import org.omg.CORBA.MARSHAL; import org.omg.CORBA.OBJECT_NOT_EXIST; import org.omg.CORBA.SystemException; import com.sun.org.omg.SendingContext.CodeBase; import com.sun.corba.se.pept.broker.Broker; import com.sun.corba.se.pept.encoding.InputObject; import com.sun.corba.se.pept.encoding.OutputObject; import com.sun.corba.se.pept.protocol.MessageMediator; import com.sun.corba.se.pept.transport.Acceptor; import com.sun.corba.se.pept.transport.Connection; import com.sun.corba.se.pept.transport.ConnectionCache; import com.sun.corba.se.pept.transport.ContactInfo; import com.sun.corba.se.pept.transport.EventHandler; import com.sun.corba.se.pept.transport.InboundConnectionCache; import com.sun.corba.se.pept.transport.OutboundConnectionCache; import com.sun.corba.se.pept.transport.ResponseWaitingRoom; import com.sun.corba.se.pept.transport.Selector; import com.sun.corba.se.spi.ior.IOR; import com.sun.corba.se.spi.ior.iiop.GIOPVersion; import com.sun.corba.se.spi.logging.CORBALogDomains; import com.sun.corba.se.spi.orb.ORB ; import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException; import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; import com.sun.corba.se.spi.orbutil.threadpool.Work; import com.sun.corba.se.spi.protocol.CorbaMessageMediator; import com.sun.corba.se.spi.transport.CorbaContactInfo; import com.sun.corba.se.spi.transport.CorbaConnection; import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom; import com.sun.corba.se.spi.transport.ReadTimeouts; import com.sun.corba.se.impl.encoding.CachedCodeBase; import com.sun.corba.se.impl.encoding.CDRInputStream_1_0; import com.sun.corba.se.impl.encoding.CDROutputObject; import com.sun.corba.se.impl.encoding.CDROutputStream_1_0; import com.sun.corba.se.impl.encoding.CodeSetComponentInfo; import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry; import com.sun.corba.se.impl.logging.ORBUtilSystemException; import com.sun.corba.se.impl.orbutil.ORBConstants; import com.sun.corba.se.impl.orbutil.ORBUtility; import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase; import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl; /** * @author Harold Carr */ public class SocketOrChannelConnectionImpl extends EventHandlerBase implements CorbaConnection, Work { public static boolean dprintWriteLocks = false; // // New transport. // protected long enqueueTime; protected SocketChannel socketChannel; public SocketChannel getSocketChannel() { return socketChannel; } // REVISIT: // protected for test: genericRPCMSGFramework.IIOPConnection constructor. protected CorbaContactInfo contactInfo; protected Acceptor acceptor; protected ConnectionCache connectionCache; // // From iiop.Connection.java // protected Socket socket; // The socket used for this connection. protected long timeStamp = 0; protected boolean isServer = false; // Start at some value other than zero since this is a magic // value in some protocols. protected int requestId = 5; protected CorbaResponseWaitingRoom responseWaitingRoom; protected int state; protected java.lang.Object stateEvent = new java.lang.Object(); protected java.lang.Object writeEvent = new java.lang.Object(); protected boolean writeLocked; protected int serverRequestCount = 0; // Server request map: used on the server side of Connection // Maps request ID to IIOPInputStream. Map serverRequestMap = null; // This is a flag associated per connection telling us if the // initial set of sending contexts were sent to the receiver // already... protected boolean postInitialContexts = false; // Remote reference to CodeBase server (supplies // FullValueDescription, among other things) protected IOR codeBaseServerIOR; // CodeBase cache for this connection. This will cache remote operations, // handle connecting, and ensure we don't do any remote operations until // necessary. protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this); protected ORBUtilSystemException wrapper ; // transport read timeout values protected ReadTimeouts readTimeouts; protected boolean shouldReadGiopHeaderOnly; // A message mediator used when shouldReadGiopHeaderOnly is // true to maintain request message state across execution in a // SelectorThread and WorkerThread. protected CorbaMessageMediator partialMessageMediator = null; // Used in genericRPCMSGFramework test. protected SocketOrChannelConnectionImpl(ORB orb) { this.orb = orb; wrapper = ORBUtilSystemException.get( orb, CORBALogDomains.RPC_TRANSPORT ) ; setWork(this); responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this); setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts()); } // Both client and servers. protected SocketOrChannelConnectionImpl(ORB orb, boolean useSelectThreadToWait, boolean useWorkerThread) { this(orb) ; setUseSelectThreadToWait(useSelectThreadToWait); setUseWorkerThreadForEvent(useWorkerThread); } // Client constructor. public SocketOrChannelConnectionImpl(ORB orb, CorbaContactInfo contactInfo, boolean useSelectThreadToWait, boolean useWorkerThread, String socketType, String hostname, int port) { this(orb, useSelectThreadToWait, useWorkerThread); this.contactInfo = contactInfo; try { socket = orb.getORBData().getSocketFactory() .createSocket(socketType, new InetSocketAddress(hostname, port)); socketChannel = socket.getChannel(); if (socketChannel != null) { boolean isBlocking = !useSelectThreadToWait; socketChannel.configureBlocking(isBlocking); } else { // IMPORTANT: non-channel-backed sockets must use // dedicated reader threads. setUseSelectThreadToWait(false); } if (orb.transportDebugFlag) { dprint(".initialize: connection created: " + socket); } } catch (Throwable t) { throw wrapper.connectFailure(t, socketType, hostname, Integer.toString(port)); } state = OPENING; } // Client-side convenience. public SocketOrChannelConnectionImpl(ORB orb, CorbaContactInfo contactInfo, String socketType, String hostname, int port) { this(orb, contactInfo, orb.getORBData().connectionSocketUseSelectThreadToWait(), orb.getORBData().connectionSocketUseWorkerThreadForEvent(), socketType, hostname, port); } // Server-side constructor. public SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor, Socket socket, boolean useSelectThreadToWait, boolean useWorkerThread) { this(orb, useSelectThreadToWait, useWorkerThread); this.socket = socket; socketChannel = socket.getChannel(); if (socketChannel != null) { // REVISIT try { boolean isBlocking = !useSelectThreadToWait; socketChannel.configureBlocking(isBlocking); } catch (IOException e) { RuntimeException rte = new RuntimeException(); rte.initCause(e); throw rte; } } this.acceptor = acceptor; serverRequestMap = Collections.synchronizedMap(new HashMap()); isServer = true; state = ESTABLISHED; } // Server-side convenience public SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor, Socket socket) { this(orb, acceptor, socket, (socket.getChannel() == null ? false : orb.getORBData().connectionSocketUseSelectThreadToWait()), (socket.getChannel() == null ? false : orb.getORBData().connectionSocketUseWorkerThreadForEvent())); } //////////////////////////////////////////////////// // // framework.transport.Connection // public boolean shouldRegisterReadEvent() { return true; } public boolean shouldRegisterServerReadEvent() { return true; } public boolean read() { try { if (orb.transportDebugFlag) { dprint(".read->: " + this); } CorbaMessageMediator messageMediator = readBits(); if (messageMediator != null) { // Null can happen when client closes stream // causing purgecalls. return dispatch(messageMediator); } return true; } finally { if (orb.transportDebugFlag) { dprint(".read<-: " + this); } } } protected CorbaMessageMediator readBits() { try { if (orb.transportDebugFlag) { dprint(".readBits->: " + this); } MessageMediator messageMediator; // REVISIT - use common factory base class. if (contactInfo != null) { messageMediator = contactInfo.createMessageMediator(orb, this); } else if (acceptor != null) { messageMediator = acceptor.createMessageMediator(orb, this); } else { throw new RuntimeException("SocketOrChannelConnectionImpl.readBits"); } return (CorbaMessageMediator) messageMediator; } catch (ThreadDeath td) { if (orb.transportDebugFlag) { dprint(".readBits: " + this + ": ThreadDeath: " + td, td); } try { purgeCalls(wrapper.connectionAbort(td), false, false); } catch (Throwable t) { if (orb.transportDebugFlag) { dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t); } } throw td; } catch (Throwable ex) { if (orb.transportDebugFlag) { dprint(".readBits: " + this + ": Throwable: " + ex, ex); } try { if (ex instanceof INTERNAL) { sendMessageError(GIOPVersion.DEFAULT_VERSION); } } catch (IOException e) { if (orb.transportDebugFlag) { dprint(".readBits: " + this + ": sendMessageError: IOException: " + e, e); } } // REVISIT - make sure reader thread is killed. orb.getTransportManager().getSelector(0).unregisterForEvent(this); // Notify anyone waiting. purgeCalls(wrapper.connectionAbort(ex), true, false); // REVISIT //keepRunning = false; // REVISIT - if this is called after purgeCalls then // the state of the socket is ABORT so the writeLock // in close throws an exception. It is ignored but // causes IBM (screen scraping) tests to fail. //close(); } finally { if (orb.transportDebugFlag) { dprint(".readBits<-: " + this); } } return null; } protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator) { try { if (orb.transportDebugFlag) { dprint(".finishReadingBits->: " + this); } // REVISIT - use common factory base class. if (contactInfo != null) { messageMediator = contactInfo.finishCreatingMessageMediator(orb, this, messageMediator); } else if (acceptor != null) { messageMediator = acceptor.finishCreatingMessageMediator(orb, this, messageMediator); } else { throw new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits"); } return (CorbaMessageMediator) messageMediator; } catch (ThreadDeath td) { if (orb.transportDebugFlag) { dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td); } try { purgeCalls(wrapper.connectionAbort(td), false, false); } catch (Throwable t) { if (orb.transportDebugFlag) { dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t); } } throw td; } catch (Throwable ex) { if (orb.transportDebugFlag) { dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex); } try { if (ex instanceof INTERNAL) { sendMessageError(GIOPVersion.DEFAULT_VERSION); } } catch (IOException e) { if (orb.transportDebugFlag) { dprint(".finishReadingBits: " + this + ": sendMessageError: IOException: " + e, e); } } // REVISIT - make sure reader thread is killed. orb.getTransportManager().getSelector(0).unregisterForEvent(this); // Notify anyone waiting. purgeCalls(wrapper.connectionAbort(ex), true, false); // REVISIT //keepRunning = false; // REVISIT - if this is called after purgeCalls then // the state of the socket is ABORT so the writeLock // in close throws an exception. It is ignored but // causes IBM (screen scraping) tests to fail. //close(); } finally { if (orb.transportDebugFlag) { dprint(".finishReadingBits<-: " + this); } } return null; } protected boolean dispatch(CorbaMessageMediator messageMediator) { try { if (orb.transportDebugFlag) { dprint(".dispatch->: " + this); } // // NOTE: // // This call is the transition from the tranport block // to the protocol block. // boolean result = messageMediator.getProtocolHandler() .handleRequest(messageMediator); return result; } catch (ThreadDeath td) { if (orb.transportDebugFlag) { dprint(".dispatch: ThreadDeath", td ); } try { purgeCalls(wrapper.connectionAbort(td), false, false); } catch (Throwable t) { if (orb.transportDebugFlag) { dprint(".dispatch: purgeCalls: Throwable", t); } } throw td; } catch (Throwable ex) { if (orb.transportDebugFlag) { dprint(".dispatch: Throwable", ex ) ; } try { if (ex instanceof INTERNAL) { sendMessageError(GIOPVersion.DEFAULT_VERSION); } } catch (IOException e) { if (orb.transportDebugFlag) { dprint(".dispatch: sendMessageError: IOException", e); } } purgeCalls(wrapper.connectionAbort(ex), false, false); // REVISIT //keepRunning = false; } finally { if (orb.transportDebugFlag) { dprint(".dispatch<-: " + this); } } return true; } public boolean shouldUseDirectByteBuffers() { return getSocketChannel() != null; } public ByteBuffer read(int size, int offset, int length, long max_wait_time) throws IOException { if (shouldUseDirectByteBuffers()) { ByteBuffer byteBuffer = orb.getByteBufferPool().getByteBuffer(size); if (orb.transportDebugFlag) { // print address of ByteBuffer gotten from pool int bbAddress = System.identityHashCode(byteBuffer); StringBuffer sb = new StringBuffer(80); sb.append(".read: got ByteBuffer id ("); sb.append(bbAddress).append(") from ByteBufferPool."); String msgStr = sb.toString(); dprint(msgStr); } byteBuffer.position(offset); byteBuffer.limit(size); readFully(byteBuffer, length, max_wait_time); return byteBuffer; } byte[] buf = new byte[size]; readFully(getSocket().getInputStream(), buf, offset, length, max_wait_time); ByteBuffer byteBuffer = ByteBuffer.wrap(buf); byteBuffer.limit(size); return byteBuffer; } public ByteBuffer read(ByteBuffer byteBuffer, int offset, int length, long max_wait_time) throws IOException { int size = offset + length; if (shouldUseDirectByteBuffers()) { if (! byteBuffer.isDirect()) { throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket(); } if (size > byteBuffer.capacity()) { if (orb.transportDebugFlag) { // print address of ByteBuffer being released int bbAddress = System.identityHashCode(byteBuffer); StringBuffer bbsb = new StringBuffer(80); bbsb.append(".read: releasing ByteBuffer id (") .append(bbAddress).append(") to ByteBufferPool."); String bbmsg = bbsb.toString(); dprint(bbmsg); } orb.getByteBufferPool().releaseByteBuffer(byteBuffer); byteBuffer = orb.getByteBufferPool().getByteBuffer(size); } byteBuffer.position(offset); byteBuffer.limit(size); readFully(byteBuffer, length, max_wait_time); byteBuffer.position(0); byteBuffer.limit(size); return byteBuffer; } if (byteBuffer.isDirect()) { throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); } byte[] buf = new byte[size]; readFully(getSocket().getInputStream(), buf, offset, length, max_wait_time); return ByteBuffer.wrap(buf); } public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time) throws IOException { int n = 0; int bytecount = 0; long time_to_wait = readTimeouts.get_initial_time_to_wait(); long total_time_in_wait = 0; // The reading of data incorporates a strategy to detect a // rogue client. The strategy is implemented as follows. As // long as data is being read, at least 1 byte or more, we // assume we have a well behaved client. If no data is read, // then we sleep for a time to wait, re-calculate a new time to // wait which is lengthier than the previous time spent waiting. // Then, if the total time spent waiting does not exceed a // maximum time we are willing to wait, we attempt another // read. If the maximum amount of time we are willing to // spend waiting for more data is exceeded, we throw an // IOException. // NOTE: Reading of GIOP headers are treated with a smaller // maximum time to wait threshold. Based on extensive // performance testing, all GIOP headers are being // read in 1 read access. do { bytecount = getSocketChannel().read(byteBuffer); if (bytecount < 0) { throw new IOException("End-of-stream"); } else if (bytecount == 0) { try { Thread.sleep(time_to_wait); total_time_in_wait += time_to_wait; time_to_wait = (long)(time_to_wait*readTimeouts.get_backoff_factor()); } catch (InterruptedException ie) { // ignore exception if (orb.transportDebugFlag) { dprint("readFully(): unexpected exception " + ie.toString()); } } } else { n += bytecount; } } while (n < size && total_time_in_wait < max_wait_time); if (n < size && total_time_in_wait >= max_wait_time) { // failed to read entire message throw wrapper.transportReadTimeoutExceeded(new Integer(size), new Integer(n), new Long(max_wait_time), new Long(total_time_in_wait)); } getConnectionCache().stampTime(this); } // To support non-channel connections. public void readFully(java.io.InputStream is, byte[] buf, int offset, int size, long max_wait_time) throws IOException { int n = 0; int bytecount = 0; long time_to_wait = readTimeouts.get_initial_time_to_wait(); long total_time_in_wait = 0; // The reading of data incorporates a strategy to detect a // rogue client. The strategy is implemented as follows. As // long as data is being read, at least 1 byte or more, we // assume we have a well behaved client. If no data is read, // then we sleep for a time to wait, re-calculate a new time to // wait which is lengthier than the previous time spent waiting. // Then, if the total time spent waiting does not exceed a // maximum time we are willing to wait, we attempt another // read. If the maximum amount of time we are willing to // spend waiting for more data is exceeded, we throw an // IOException. // NOTE: Reading of GIOP headers are treated with a smaller // maximum time to wait threshold. Based on extensive // performance testing, all GIOP headers are being // read in 1 read access. do { bytecount = is.read(buf, offset + n, size - n); if (bytecount < 0) { throw new IOException("End-of-stream"); } else if (bytecount == 0) { try { Thread.sleep(time_to_wait); total_time_in_wait += time_to_wait; time_to_wait = (long)(time_to_wait*readTimeouts.get_backoff_factor()); } catch (InterruptedException ie) { // ignore exception if (orb.transportDebugFlag) { dprint("readFully(): unexpected exception " + ie.toString()); } } } else { n += bytecount; } } while (n < size && total_time_in_wait < max_wait_time); if (n < size && total_time_in_wait >= max_wait_time) { // failed to read entire message throw wrapper.transportReadTimeoutExceeded(new Integer(size), new Integer(n), new Long(max_wait_time), new Long(total_time_in_wait)); } getConnectionCache().stampTime(this); } public void write(ByteBuffer byteBuffer) throws IOException { if (shouldUseDirectByteBuffers()) { /* NOTE: cannot perform this test. If one ask for a ByteBuffer from the pool which is bigger than the size of ByteBuffers managed by the pool, then the pool will return a HeapByteBuffer. if (byteBuffer.hasArray()) { throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket(); } */ // IMPORTANT: For non-blocking SocketChannels, there's no guarantee // all bytes are written on first write attempt. do { getSocketChannel().write(byteBuffer); } while (byteBuffer.hasRemaining()); } else { if (! byteBuffer.hasArray()) { throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); } byte[] tmpBuf = byteBuffer.array(); getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit()); getSocket().getOutputStream().flush(); } // TimeStamp connection to indicate it has been used // Note granularity of connection usage is assumed for // now to be that of a IIOP packet. getConnectionCache().stampTime(this); } /** * Note:it is possible for this to be called more than once */ public synchronized void close() { try { if (orb.transportDebugFlag) { dprint(".close->: " + this); } writeLock(); // REVISIT It will be good to have a read lock on the reader thread // before we proceed further, to avoid the reader thread (server side) // from processing requests. This avoids the risk that a new request // will be accepted by ReaderThread while the ListenerThread is // attempting to close this connection. if (isBusy()) { // we are busy! writeUnlock(); if (orb.transportDebugFlag) { dprint(".close: isBusy so no close: " + this); } return; } try { try { sendCloseConnection(GIOPVersion.V1_0); } catch (Throwable t) { wrapper.exceptionWhenSendingCloseConnection(t); } synchronized ( stateEvent ){ state = CLOSE_SENT; stateEvent.notifyAll(); } // stop the reader without causing it to do purgeCalls //Exception ex = new Exception(); //reader.stop(ex); // REVISIT // NOTE: !!!!!! // This does writeUnlock(). purgeCalls(wrapper.connectionRebind(), false, true); } catch (Exception ex) { if (orb.transportDebugFlag) { dprint(".close: exception: " + this, ex); } } try { Selector selector = orb.getTransportManager().getSelector(0); selector.unregisterForEvent(this); if (socketChannel != null) { socketChannel.close(); } socket.close(); } catch (IOException e) { if (orb.transportDebugFlag) { dprint(".close: " + this, e); } } } finally { if (orb.transportDebugFlag) { dprint(".close<-: " + this); } } } public Acceptor getAcceptor() { return acceptor; } public ContactInfo getContactInfo() { return contactInfo; } public EventHandler getEventHandler() { return this; } public OutputObject createOutputObject(MessageMediator messageMediator) { // REVISIT - remove this method from Connection and all it subclasses. throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called."); } // This is used by the GIOPOutputObject in order to // throw the correct error when handling code sets. // Can we determine if we are on the server side by // other means? XREVISIT public boolean isServer() { return isServer; } public boolean isBusy() { if (serverRequestCount > 0 || getResponseWaitingRoom().numberRegistered() > 0) { return true; } else { return false; } } public long getTimeStamp() { return timeStamp; } public void setTimeStamp(long time) { timeStamp = time; } public void setState(String stateString) { synchronized (stateEvent) { if (stateString.equals("ESTABLISHED")) { state = ESTABLISHED; stateEvent.notifyAll(); } else { // REVISIT: ASSERT } } } /** * Sets the writeLock for this connection. * If the writeLock is already set by someone else, block till the * writeLock is released and can set by us. * IMPORTANT: this connection's lock must be acquired before * setting the writeLock and must be unlocked after setting the writeLock. */ public void writeLock() { try { if (dprintWriteLocks && orb.transportDebugFlag) { dprint(".writeLock->: " + this); } // Keep looping till we can set the writeLock. while ( true ) { int localState = state; switch ( localState ) { case OPENING: synchronized (stateEvent) { if (state != OPENING) { // somebody has changed 'state' so be careful break; } try { stateEvent.wait(); } catch (InterruptedException ie) { if (orb.transportDebugFlag) { dprint(".writeLock: OPENING InterruptedException: " + this); } } } // Loop back break; case ESTABLISHED: synchronized (writeEvent) { if (!writeLocked) { writeLocked = true; return; } try { // do not stay here too long if state != ESTABLISHED // Bug 4752117 while (state == ESTABLISHED && writeLocked) { writeEvent.wait(100); } } catch (InterruptedException ie) { if (orb.transportDebugFlag) { dprint(".writeLock: ESTABLISHED InterruptedException: " + this); } } } // Loop back break; // // XXX // Need to distinguish between client and server roles // here probably. // case ABORT: synchronized ( stateEvent ){ if (state != ABORT) { break; } throw wrapper.writeErrorSend() ; } case CLOSE_RECVD: // the connection has been closed or closing // ==> throw rebind exception synchronized ( stateEvent ){ if (state != CLOSE_RECVD) { break; } throw wrapper.connectionCloseRebind() ; } default: if (orb.transportDebugFlag) { dprint(".writeLock: default: " + this); } // REVISIT throw new RuntimeException(".writeLock: bad state"); } } } finally { if (dprintWriteLocks && orb.transportDebugFlag) { dprint(".writeLock<-: " + this); } } } public void writeUnlock() { try { if (dprintWriteLocks && orb.transportDebugFlag) { dprint(".writeUnlock->: " + this); } synchronized (writeEvent) { writeLocked = false; writeEvent.notify(); // wake up one guy waiting to write } } finally { if (dprintWriteLocks && orb.transportDebugFlag) { dprint(".writeUnlock<-: " + this); } } } // Assumes the caller handles writeLock and writeUnlock public void sendWithoutLock(OutputObject outputObject) { // Don't we need to check for CloseConnection // here? REVISIT // XREVISIT - Shouldn't the MessageMediator // be the one to handle writing the data here? try { // Write the fragment/message CDROutputObject cdrOutputObject = (CDROutputObject) outputObject; cdrOutputObject.writeTo(this); // REVISIT - no flush? //socket.getOutputStream().flush(); } catch (IOException e1) { /* * ADDED(Ram J) 10/13/2000 In the event of an IOException, try * sending a CancelRequest for regular requests / locate requests */ // Since IIOPOutputStream's msgheader is set only once, and not // altered during sending multiple fragments, the original // msgheader will always have the requestId. // REVISIT This could be optimized to send a CancelRequest only // if any fragments had been sent already. /* REVISIT: MOVE TO SUBCONTRACT Message msg = os.getMessage(); if (msg.getType() == Message.GIOPRequest || msg.getType() == Message.GIOPLocateRequest) { GIOPVersion requestVersion = msg.getGIOPVersion(); int requestId = MessageBase.getRequestId(msg); try { sendCancelRequest(requestVersion, requestId); } catch (IOException e2) { // most likely an abortive connection closure. // ignore, since nothing more can be done. if (orb.transportDebugFlag) { } } */ // REVISIT When a send failure happens, purgeCalls() need to be // called to ensure that the connection is properly removed from // further usage (ie., cancelling pending requests with COMM_FAILURE // with an appropriate minor_code CompletionStatus.MAY_BE). // Relying on the IIOPOutputStream (as noted below) is not // sufficient as it handles COMM_FAILURE only for the final // fragment (during invoke processing). Note that COMM_FAILURE could // happen while sending the initial fragments. // Also the IIOPOutputStream does not properly close the connection. // It simply removes the connection from the table. An orderly // closure is needed (ie., cancel pending requests on the connection // COMM_FAILURE as well. // IIOPOutputStream will cleanup the connection info when it // sees this exception. throw wrapper.writeErrorSend(e1) ; } } public void registerWaiter(MessageMediator messageMediator) { responseWaitingRoom.registerWaiter(messageMediator); } public void unregisterWaiter(MessageMediator messageMediator) { responseWaitingRoom.unregisterWaiter(messageMediator); } public InputObject waitForResponse(MessageMediator messageMediator) { return responseWaitingRoom.waitForResponse(messageMediator); } public void setConnectionCache(ConnectionCache connectionCache) { this.connectionCache = connectionCache; } public ConnectionCache getConnectionCache() { return connectionCache; } //////////////////////////////////////////////////// // // EventHandler methods // public void setUseSelectThreadToWait(boolean x) { useSelectThreadToWait = x; // REVISIT - Reading of a GIOP header only is information // that should be passed into the constructor // from the SocketOrChannelConnection factory. setReadGiopHeaderOnly(shouldUseSelectThreadToWait()); } public void handleEvent() { if (orb.transportDebugFlag) { dprint(".handleEvent->: " + this); } getSelectionKey().interestOps(getSelectionKey().interestOps() & (~ getInterestOps())); if (shouldUseWorkerThreadForEvent()) { Throwable throwable = null; try { int poolToUse = 0; if (shouldReadGiopHeaderOnly()) { partialMessageMediator = readBits(); poolToUse = partialMessageMediator.getThreadPoolToUse(); } if (orb.transportDebugFlag) { dprint(".handleEvent: addWork to pool: " + poolToUse); } orb.getThreadPoolManager().getThreadPool(poolToUse) .getWorkQueue(0).addWork(getWork()); } catch (NoSuchThreadPoolException e) { throwable = e; } catch (NoSuchWorkQueueException e) { throwable = e; } // REVISIT: need to close connection. if (throwable != null) { if (orb.transportDebugFlag) { dprint(".handleEvent: " + throwable); } INTERNAL i = new INTERNAL("NoSuchThreadPoolException"); i.initCause(throwable); throw i; } } else { if (orb.transportDebugFlag) { dprint(".handleEvent: doWork"); } getWork().doWork(); } if (orb.transportDebugFlag) { dprint(".handleEvent<-: " + this); } } public SelectableChannel getChannel() { return socketChannel; } public int getInterestOps() { return SelectionKey.OP_READ; } // public Acceptor getAcceptor() - already defined above. public Connection getConnection() { return this; } //////////////////////////////////////////////////// // // Work methods. // public String getName() { return this.toString(); } public void doWork() { try { if (orb.transportDebugFlag) { dprint(".doWork->: " + this); } // IMPORTANT: Sanity checks on SelectionKeys such as // SelectorKey.isValid() should not be done // here. // if (!shouldReadGiopHeaderOnly()) { read(); } else { // get the partialMessageMediator // created by SelectorThread CorbaMessageMediator messageMediator = this.getPartialMessageMediator(); // read remaining info needed in a MessageMediator messageMediator = finishReadingBits(messageMediator); if (messageMediator != null) { // Null can happen when client closes stream // causing purgecalls. dispatch(messageMediator); } } } catch (Throwable t) { if (orb.transportDebugFlag) { dprint(".doWork: ignoring Throwable: " + t + " " + this); } } finally { if (orb.transportDebugFlag) { dprint(".doWork<-: " + this); } } } public void setEnqueueTime(long timeInMillis) { enqueueTime = timeInMillis; } public long getEnqueueTime() { return enqueueTime; } //////////////////////////////////////////////////// // // spi.transport.CorbaConnection. // // IMPORTANT: Reader Threads must NOT read Giop header only. public boolean shouldReadGiopHeaderOnly() { return shouldReadGiopHeaderOnly; } protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) { shouldReadGiopHeaderOnly = shouldReadHeaderOnly; } public ResponseWaitingRoom getResponseWaitingRoom() { return responseWaitingRoom; } // REVISIT - inteface defines isServer but already defined in // higher interface. public void serverRequestMapPut(int requestId, CorbaMessageMediator messageMediator) { serverRequestMap.put(new Integer(requestId), messageMediator); } public CorbaMessageMediator serverRequestMapGet(int requestId) { return (CorbaMessageMediator) serverRequestMap.get(new Integer(requestId)); } public void serverRequestMapRemove(int requestId) { serverRequestMap.remove(new Integer(requestId)); } // REVISIT: this is also defined in: // com.sun.corba.se.spi.legacy.connection.Connection public java.net.Socket getSocket() { return socket; } /** It is possible for a Close Connection to have been ** sent here, but we will not check for this. A "lazy" ** Exception will be thrown in the Worker thread after the ** incoming request has been processed even though the connection ** is closed before the request is processed. This is o.k because ** it is a boundary condition. To prevent it we would have to add ** more locks which would reduce performance in the normal case. **/ public synchronized void serverRequestProcessingBegins() { serverRequestCount++; } public synchronized void serverRequestProcessingEnds() { serverRequestCount--; } // // // public synchronized int getNextRequestId() { return requestId++; } // Negotiated code sets for char and wchar data protected CodeSetComponentInfo.CodeSetContext codeSetContext = null; public ORB getBroker() { return orb; } public CodeSetComponentInfo.CodeSetContext getCodeSetContext() { // Needs to be synchronized for the following case when the client // doesn't send the code set context twice, and we have two threads // in ServerRequestDispatcher processCodeSetContext. // // Thread A checks to see if there is a context, there is none, so // it calls setCodeSetContext, getting the synch lock. // Thread B checks to see if there is a context. If we didn't synch, // it might decide to outlaw wchar/wstring. if (codeSetContext == null) { synchronized(this) { return codeSetContext; } } return codeSetContext; } public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) { // Double check whether or not we need to do this if (codeSetContext == null) { if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null || OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) { // If the client says it's negotiated a code set that // isn't a fallback and we never said we support, then // it has a bug. throw wrapper.badCodesetsFromClient() ; } codeSetContext = csc; } } // // from iiop.IIOPConnection.java // // Map request ID to an InputObject. // This is so the client thread can start unmarshaling // the reply and remove it from the out_calls map while the // ReaderThread can still obtain the input stream to give // new fragments. Only the ReaderThread touches the clientReplyMap, // so it doesn't incur synchronization overhead. public MessageMediator clientRequestMapGet(int requestId) { return responseWaitingRoom.getMessageMediator(requestId); } protected MessageMediator clientReply_1_1; public void clientReply_1_1_Put(MessageMediator x) { clientReply_1_1 = x; } public MessageMediator clientReply_1_1_Get() { return clientReply_1_1; } public void clientReply_1_1_Remove() { clientReply_1_1 = null; } protected MessageMediator serverRequest_1_1; public void serverRequest_1_1_Put(MessageMediator x) { serverRequest_1_1 = x; } public MessageMediator serverRequest_1_1_Get() { return serverRequest_1_1; } public void serverRequest_1_1_Remove() { serverRequest_1_1 = null; } protected String getStateString( int state ) { synchronized ( stateEvent ){ switch (state) { case OPENING : return "OPENING" ; case ESTABLISHED : return "ESTABLISHED" ; case CLOSE_SENT : return "CLOSE_SENT" ; case CLOSE_RECVD : return "CLOSE_RECVD" ; case ABORT : return "ABORT" ; default : return "???" ; } } } public synchronized boolean isPostInitialContexts() { return postInitialContexts; } // Can never be unset... public synchronized void setPostInitialContexts(){ postInitialContexts = true; } /** * Wake up the outstanding requests on the connection, and hand them * COMM_FAILURE exception with a given minor code. * * Also, delete connection from connection table and * stop the reader thread. * Note that this should only ever be called by the Reader thread for * this connection. * * @param minor_code The minor code for the COMM_FAILURE major code. * @param die Kill the reader thread (this thread) before exiting. */ public void purgeCalls(SystemException systemException, boolean die, boolean lockHeld) { int minor_code = systemException.minor; try{ if (orb.transportDebugFlag) { dprint(".purgeCalls->: " + minor_code + "/" + die + "/" + lockHeld + " " + this); } // If this invocation is a result of ThreadDeath caused // by a previous execution of this routine, just exit. synchronized ( stateEvent ){ if ((state == ABORT) || (state == CLOSE_RECVD)) { if (orb.transportDebugFlag) { dprint(".purgeCalls: exiting since state is: " + getStateString(state) + " " + this); } return; } } // Grab the writeLock (freeze the calls) try { if (!lockHeld) { writeLock(); } } catch (SystemException ex) { if (orb.transportDebugFlag) dprint(".purgeCalls: SystemException" + ex + "; continuing " + this); } // Mark the state of the connection // and determine the request status org.omg.CORBA.CompletionStatus completion_status; synchronized ( stateEvent ){ if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) { state = CLOSE_RECVD; systemException.completed = CompletionStatus.COMPLETED_NO; } else { state = ABORT; systemException.completed = CompletionStatus.COMPLETED_MAYBE; } stateEvent.notifyAll(); } try { socket.getInputStream().close(); socket.getOutputStream().close(); socket.close(); } catch (Exception ex) { if (orb.transportDebugFlag) { dprint(".purgeCalls: Exception closing socket: " + ex + " " + this); } } // Signal all threads with outstanding requests on this // connection and give them the SystemException; responseWaitingRoom.signalExceptionToAllWaiters(systemException); if (contactInfo != null) { ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo); } else if (acceptor != null) { ((InboundConnectionCache)getConnectionCache()).remove(this); } // // REVISIT: Stop the reader thread // // Signal all the waiters of the writeLock. // There are 4 types of writeLock waiters: // 1. Send waiters: // 2. SendReply waiters: // 3. cleanUp waiters: // 4. purge_call waiters: // writeUnlock(); } finally { if (orb.transportDebugFlag) { dprint(".purgeCalls<-: " + minor_code + "/" + die + "/" + lockHeld + " " + this); } } } /************************************************************************* * The following methods are for dealing with Connection cleaning for * better scalability of servers in high network load conditions. **************************************************************************/ public void sendCloseConnection(GIOPVersion giopVersion) throws IOException { Message msg = MessageBase.createCloseConnection(giopVersion); sendHelper(giopVersion, msg); } public void sendMessageError(GIOPVersion giopVersion) throws IOException { Message msg = MessageBase.createMessageError(giopVersion); sendHelper(giopVersion, msg); } /** * Send a CancelRequest message. This does not lock the connection, so the * caller needs to ensure this method is called appropriately. * @exception IOException - could be due to abortive connection closure. */ public void sendCancelRequest(GIOPVersion giopVersion, int requestId) throws IOException { Message msg = MessageBase.createCancelRequest(giopVersion, requestId); sendHelper(giopVersion, msg); } protected void sendHelper(GIOPVersion giopVersion, Message msg) throws IOException { // REVISIT: See comments in CDROutputObject constructor. CDROutputObject outputObject = new CDROutputObject((ORB)orb, null, giopVersion, this, msg, ORBConstants.STREAM_FORMAT_VERSION_1); msg.write(outputObject); outputObject.writeTo(this); } public void sendCancelRequestWithLock(GIOPVersion giopVersion, int requestId) throws IOException { writeLock(); try { sendCancelRequest(giopVersion, requestId); } finally { writeUnlock(); } } // Begin Code Base methods --------------------------------------- // // Set this connection's code base IOR. The IOR comes from the // SendingContext. This is an optional service context, but all // JavaSoft ORBs send it. // // The set and get methods don't need to be synchronized since the // first possible get would occur during reading a valuetype, and // that would be after the set. // Sets this connection's code base IOR. This is done after // getting the IOR out of the SendingContext service context. // Our ORBs always send this, but it's optional in CORBA. public final void setCodeBaseIOR(IOR ior) { codeBaseServerIOR = ior; } public final IOR getCodeBaseIOR() { return codeBaseServerIOR; } // Get a CodeBase stub to use in unmarshaling. The CachedCodeBase // won't connect to the remote codebase unless it's necessary. public final CodeBase getCodeBase() { return cachedCodeBase; } // End Code Base methods ----------------------------------------- // set transport read thresholds protected void setReadTimeouts(ReadTimeouts readTimeouts) { this.readTimeouts = readTimeouts; } protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) { partialMessageMediator = messageMediator; } protected CorbaMessageMediator getPartialMessageMediator() { return partialMessageMediator; } public String toString() { synchronized ( stateEvent ){ return "SocketOrChannelConnectionImpl[" + " " + (socketChannel == null ? socket.toString() : socketChannel.toString()) + " " + getStateString( state ) + " " + shouldUseSelectThreadToWait() + " " + shouldUseWorkerThreadForEvent() + " " + shouldReadGiopHeaderOnly() + "]" ; } } // Must be public - used in encoding. public void dprint(String msg) { ORBUtility.dprint("SocketOrChannelConnectionImpl", msg); } protected void dprint(String msg, Throwable t) { dprint(msg); t.printStackTrace(System.out); } } // End of file.