/* * @(#)BufferManagerReadStream.java 1.13 03/12/19 * * Copyright 2004 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package com.sun.corba.se.impl.encoding; import java.nio.ByteBuffer; import com.sun.corba.se.pept.transport.ByteBufferPool; import com.sun.corba.se.spi.logging.CORBALogDomains; import com.sun.corba.se.spi.orb.ORB; import com.sun.corba.se.impl.logging.ORBUtilSystemException; import com.sun.corba.se.impl.orbutil.ORBUtility; import com.sun.corba.se.impl.protocol.RequestCanceledException; import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage; import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; import java.util.*; public class BufferManagerReadStream implements BufferManagerRead, MarkAndResetHandler { private boolean receivedCancel = false; private int cancelReqId = 0; // We should convert endOfStream to a final static dummy end node private boolean endOfStream = true; private BufferQueue fragmentQueue = new BufferQueue(); // REVISIT - This should go in BufferManagerRead. But, since // BufferManagerRead is an interface. BufferManagerRead // might ought to be an abstract class instead of an // interface. private ORB orb ; private ORBUtilSystemException wrapper ; private boolean debug = false; BufferManagerReadStream( ORB orb ) { this.orb = orb ; this.wrapper = ORBUtilSystemException.get( orb, CORBALogDomains.RPC_ENCODING ) ; debug = orb.transportDebugFlag; } public void cancelProcessing(int requestId) { synchronized(fragmentQueue) { receivedCancel = true; cancelReqId = requestId; fragmentQueue.notify(); } } public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg) { ByteBufferWithInfo bbwi = new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength()); synchronized (fragmentQueue) { if (debug) { // print address of ByteBuffer being queued int bbAddress = System.identityHashCode(byteBuffer); StringBuffer sb = new StringBuffer(80); sb.append("processFragment() - queueing ByteBuffer id ("); sb.append(bbAddress).append(") to fragment queue."); String strMsg = sb.toString(); dprint(strMsg); } fragmentQueue.enqueue(bbwi); endOfStream = !msg.moreFragmentsToFollow(); fragmentQueue.notify(); } } public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi) { ByteBufferWithInfo result = null; try { //System.out.println("ENTER underflow"); synchronized (fragmentQueue) { if (receivedCancel) { throw new RequestCanceledException(cancelReqId); } while (fragmentQueue.size() == 0) { if (endOfStream) { throw wrapper.endOfStream() ; } try { fragmentQueue.wait(); } catch (InterruptedException e) {} if (receivedCancel) { throw new RequestCanceledException(cancelReqId); } } result = fragmentQueue.dequeue(); result.fragmented = true; if (debug) { // print address of ByteBuffer being dequeued int bbAddr = System.identityHashCode(result.byteBuffer); StringBuffer sb1 = new StringBuffer(80); sb1.append("underflow() - dequeued ByteBuffer id ("); sb1.append(bbAddr).append(") from fragment queue."); String msg1 = sb1.toString(); dprint(msg1); } // VERY IMPORTANT // Release bbwi.byteBuffer to the ByteBufferPool only if // this BufferManagerStream is not marked for potential restore. if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null) { ByteBufferPool byteBufferPool = getByteBufferPool(); if (debug) { // print address of ByteBuffer being released int bbAddress = System.identityHashCode(bbwi.byteBuffer); StringBuffer sb = new StringBuffer(80); sb.append("underflow() - releasing ByteBuffer id ("); sb.append(bbAddress).append(") to ByteBufferPool."); String msg = sb.toString(); dprint(msg); } byteBufferPool.releaseByteBuffer(bbwi.byteBuffer); bbwi.byteBuffer = null; bbwi = null; } } return result; } finally { //System.out.println("EXIT underflow"); } } public void init(Message msg) { if (msg != null) endOfStream = !msg.moreFragmentsToFollow(); } // Release any queued ByteBufferWithInfo's byteBuffers to the // ByteBufferPoool public void close(ByteBufferWithInfo bbwi) { int inputBbAddress = 0; // release ByteBuffers on fragmentQueue if (fragmentQueue != null) { synchronized (fragmentQueue) { // IMPORTANT: The fragment queue may have one ByteBuffer // on it that's also on the CDRInputStream if // this method is called when the stream is 'marked'. // Thus, we'll compare the ByteBuffer passed // in (from a CDRInputStream) with all ByteBuffers // on the stack. If one is found to equal, it will // not be released to the ByteBufferPool. if (bbwi != null) { inputBbAddress = System.identityHashCode(bbwi.byteBuffer); } ByteBufferWithInfo abbwi = null; ByteBufferPool byteBufferPool = getByteBufferPool(); while (fragmentQueue.size() != 0) { abbwi = fragmentQueue.dequeue(); if (abbwi != null && abbwi.byteBuffer != null) { int bbAddress = System.identityHashCode(abbwi.byteBuffer); if (inputBbAddress != bbAddress) { if (debug) { // print address of ByteBuffer released StringBuffer sb = new StringBuffer(80); sb.append("close() - fragmentQueue is ") .append("releasing ByteBuffer id (") .append(bbAddress).append(") to ") .append("ByteBufferPool."); String msg = sb.toString(); dprint(msg); } } byteBufferPool.releaseByteBuffer(abbwi.byteBuffer); } } } fragmentQueue = null; } // release ByteBuffers on fragmentStack if (fragmentStack != null && fragmentStack.size() != 0) { // IMPORTANT: The fragment stack may have one ByteBuffer // on it that's also on the CDRInputStream if // this method is called when the stream is 'marked'. // Thus, we'll compare the ByteBuffer passed // in (from a CDRInputStream) with all ByteBuffers // on the stack. If one is found to equal, it will // not be released to the ByteBufferPool. if (bbwi != null) { inputBbAddress = System.identityHashCode(bbwi.byteBuffer); } ByteBufferWithInfo abbwi = null; ByteBufferPool byteBufferPool = getByteBufferPool(); ListIterator itr = fragmentStack.listIterator(); while (itr.hasNext()) { abbwi = (ByteBufferWithInfo)itr.next(); if (abbwi != null && abbwi.byteBuffer != null) { int bbAddress = System.identityHashCode(abbwi.byteBuffer); if (inputBbAddress != bbAddress) { if (debug) { // print address of ByteBuffer being released StringBuffer sb = new StringBuffer(80); sb.append("close() - fragmentStack - releasing ") .append("ByteBuffer id (" + bbAddress + ") to ") .append("ByteBufferPool."); String msg = sb.toString(); dprint(msg); } byteBufferPool.releaseByteBuffer(abbwi.byteBuffer); } } } fragmentStack = null; } } protected ByteBufferPool getByteBufferPool() { return orb.getByteBufferPool(); } private void dprint(String msg) { ORBUtility.dprint("BufferManagerReadStream", msg); } // Mark and reset handler ---------------------------------------- private boolean markEngaged = false; // List of fragment ByteBufferWithInfos received since // the mark was engaged. private LinkedList fragmentStack = null; private RestorableInputStream inputStream = null; // Original state of the stream private Object streamMemento = null; public void mark(RestorableInputStream inputStream) { this.inputStream = inputStream; markEngaged = true; // Get the magic Object that the stream will use to // reconstruct it's state when reset is called streamMemento = inputStream.createStreamMemento(); if (fragmentStack != null) { fragmentStack.clear(); } } // Collects fragments received since the mark was engaged. public void fragmentationOccured(ByteBufferWithInfo newFragment) { if (!markEngaged) return; if (fragmentStack == null) fragmentStack = new LinkedList(); fragmentStack.addFirst(new ByteBufferWithInfo(newFragment)); } public void reset() { if (!markEngaged) { // REVISIT - call to reset without call to mark return; } markEngaged = false; // If we actually did peek across fragments, we need // to push those fragments onto the front of the // buffer queue. if (fragmentStack != null && fragmentStack.size() != 0) { ListIterator iter = fragmentStack.listIterator(); synchronized(fragmentQueue) { while (iter.hasNext()) { fragmentQueue.push((ByteBufferWithInfo)iter.next()); } } fragmentStack.clear(); } // Give the stream the magic Object to restore // it's state. inputStream.restoreInternalState(streamMemento); } public MarkAndResetHandler getMarkAndResetHandler() { return this; } }