postReconnection
,
* this method is intended to be called only by a client connector:
* <code>RMIConnector</code> and <code>ClientIntermediary</code>.
* Calling this method will set the flag beingReconnected to <code>true</code>,
* and the thread used to fetch notifications will be stopped; a new thread can be
* created only after the method <code>postReconnection</code> is called.
*
* It is the caller's responsibility not to call this method again before calling
* <code>postReconnection</code>.
*/
public synchronized ClientListenerInfo[] preReconnection() throws IOException {
    // Starts a reconnection: takes a snapshot of the registered listeners,
    // clears the listener table, raises the beingReconnected flag and, if the
    // forwarder is STARTED, moves it to STOPPING.
    //
    // Returns the listeners that were registered when this method was called.
    // Throws IOException if this object is TERMINATED, if a reconnection is
    // already in progress, or if the calling thread is interrupted while
    // waiting for the STARTING state to end.
    if (state == TERMINATED || beingReconnected) { // should never
        throw new IOException("Illegal state.");
    }

    // Snapshot first: the table is emptied right below.
    final ClientListenerInfo[] tmp = (ClientListenerInfo[])
        infoList.values().toArray(new ClientListenerInfo[0]);

    beingReconnected = true;
    infoList.clear();

    if (currentFetchThread == Thread.currentThread()) {
        /* we do not need to stop the fetching thread, because this thread is
           used to do restarting and it will not be used to do fetching during
           the re-registering the listeners.*/
        return tmp;
    }

    // Wait until the forwarder has left the STARTING state.
    while (state == STARTING) {
        try {
            wait();
        } catch (InterruptedException ire) {
            // wait() clears the interrupt status when it throws; re-assert it
            // so that code further up the stack can still observe that this
            // thread was asked to stop.
            Thread.currentThread().interrupt();

            // NOTE(review): beingReconnected is already true and infoList is
            // already cleared at this point; neither is rolled back here.
            IOException ioe = new IOException(ire.toString());
            EnvHelp.initCause(ioe, ire);
            throw ioe;
        }
    }

    if (state == STARTED) {
        setState(STOPPING);
    }

    return tmp;
}
/**
* Called after reconnection is finished.
* This method is intended to be called only by a client connector:
* <code>RMIConnector</code> and <code>ClientIntermediary</code>.
*/
public synchronized void postReconnection(ClientListenerInfo[] listenerInfos)
throws IOException {
if (state == TERMINATED) {
return;
}
while (state == STOPPING) {
try {
wait();
} catch (InterruptedException ire) {
IOException ioe = new IOException(ire.toString());
EnvHelp.initCause(ioe, ire);
throw ioe;
}
}
final boolean trace = logger.traceOn();
final int len = listenerInfos.length;
for (int i=0; iThe parameter reconnected will decide whether to initilize the clientSequenceNumber,
* initializing the clientSequenceNumber means ignoring all notifications that arrived before.
* If it is reconnected, we will not initialize it, in order to get all notifications that arrived
* during the reconnection. This may cause newly registered listeners to receive some
* notifications that arrived before their registration.
*/
private synchronized void init(boolean reconnected) throws IOException {
    // Makes sure a fetching task is running, starting one when the forwarder
    // is STOPPED and no reconnection is in progress.
    //
    // reconnected: when false, clientSequenceNumber is (re)initialized from the
    //              server before fetching starts; when true it is left as is.
    // Throws IOException if this object is TERMINATED, if the state is not one
    // of the known constants, if fetchNotifs fails, or if the calling thread is
    // interrupted while waiting for a STOPPING forwarder to finish stopping.
    switch (state) {
    case STARTED:
    case STARTING:
        // A fetching task already exists or is being created: nothing to do.
        return;
    case TERMINATED:
        throw new IOException("The ClientNotifForwarder has been terminated.");
    case STOPPING:
        if (beingReconnected) {
            // wait for another thread to do, which is doing reconnection
            return;
        }

        while (state == STOPPING) { // make sure only one fetching thread.
            try {
                wait();
            } catch (InterruptedException ire) {
                // wait() clears the interrupt status when it throws; re-assert
                // it so callers can still tell that this thread was interrupted.
                Thread.currentThread().interrupt();

                IOException ioe = new IOException(ire.toString());
                EnvHelp.initCause(ioe, ire);
                throw ioe;
            }
        }

        // re-call this method to check the state again,
        // the state can be other value like TERMINATED.
        init(reconnected);
        return;
    case STOPPED:
        if (beingReconnected) {
            // wait for another thread to do, which is doing reconnection
            return;
        }

        if (logger.traceOn()) {
            logger.trace("init", "Initializing...");
        }

        // init the clientSequenceNumber if not reconnected.
        if (!reconnected) {
            try {
                NotificationResult nr = fetchNotifs(-1, 0, 0);
                clientSequenceNumber = nr.getNextSequenceNumber();
            } catch (ClassNotFoundException e) {
                // can't happen
                logger.warning("init", "Impossible exception: "+ e);
                logger.debug("init",e);
            }
        }

        // for cleaning
        // Best effort: a failure here is only traced and does not prevent
        // the fetching task from being started.
        try {
            mbeanRemovedNotifID = addListenerForMBeanRemovedNotif();
        } catch (Exception e) {
            final String msg =
                "Failed to register a listener to the mbean " +
                "server: the client will not do clean when an MBean " +
                "is unregistered";
            if (logger.traceOn()) {
                logger.trace("init", msg, e);
            }
        }

        setState(STARTING);

        // start fetching
        executor.execute(new NotifFetcher());
        return;
    default:
        // should not
        throw new IOException("Unknown state.");
    }
}
/**
 * Important: a listener should not be removed during reconnection; the reconnection
 * needs to change the listener list and that will possibly make the removal fail.
 * This method therefore blocks while a reconnection is in progress.
 *
 * @throws IOException if this object is terminated, or if the calling thread is
 *         interrupted while waiting (its interrupt status is restored before the
 *         exception is thrown).
 */
private synchronized void beforeRemove() throws IOException {
    while (beingReconnected) {
        // Checked on every pass: termination may happen while we are waiting.
        if (state == TERMINATED) {
            throw new IOException("Terminated.");
        }

        try {
            wait();
        } catch (InterruptedException ire) {
            // wait() clears the interrupt status when it throws; re-assert it
            // so callers can still tell that this thread was interrupted.
            Thread.currentThread().interrupt();

            IOException ioe = new IOException(ire.toString());
            EnvHelp.initCause(ioe, ire);
            throw ioe;
        }
    }

    // Also refuse when no reconnection was in progress but the object is terminated.
    if (state == TERMINATED) {
        throw new IOException("Terminated.");
    }
}
// -------------------------------------------------
// private variables
// -------------------------------------------------
// Class loader held by this forwarder; it is not used in the visible part of the file.
private final ClassLoader defaultClassLoader;
// Executes the NotifFetcher task submitted by init().
private final Executor executor;
// Table of registered listeners; snapshotted and cleared by preReconnection().
private final HashMap infoList = new HashMap();
// Integer -> ClientListenerInfo
// notif stuff
// Set by init() from the next sequence number reported by fetchNotifs(); -1 until then.
private long clientSequenceNumber = -1;
// NOTE(review): presumably the upper bound on notifications per fetch - confirm against the fetching code.
private final int maxNotifications;
// NOTE(review): presumably the fetch timeout - unit and use are not visible here, confirm.
private final long timeout;
// Value returned by addListenerForMBeanRemovedNotif() in init(); null until it has been assigned.
private Integer mbeanRemovedNotifID = null;
// Compared with the calling thread in preReconnection() to detect a call made by the fetch thread itself.
private Thread currentFetchThread;
// admin stuff
// NOTE(review): not referenced in the visible part of the file - confirm it is still needed.
private boolean inited = false;
// state
/**
* This state means that a thread is being created for fetching and forwarding notifications.
*/
private static final int STARTING = 0;
/**
* This state tells that a thread has been started for fetching and forwarding notifications.
*/
private static final int STARTED = 1;
/**
* This state means that the fetching thread is informed to stop.
*/
private static final int STOPPING = 2;
/**
* This state means that the fetching thread is already stopped.
*/
private static final int STOPPED = 3;
/**
* This state means that this object is terminated and no more thread will be created
* for fetching notifications.
*/
private static final int TERMINATED = 4;
// Current state, one of the constants above; a new object starts as STOPPED.
private int state = STOPPED;
/**
* This variable is used to tell whether a connector (RMIConnector or ClientIntermediary)
* is doing reconnection.
* This variable will be set to true by the method <code>preReconnection</code>, and set
* to false by <code>postReconnection</code>.
* When beingReconnected == true, no thread will be created for fetching notifications.
*/
private boolean beingReconnected = false;
// Logger shared by all instances of this class.
private static final ClassLogger logger =
new ClassLogger("javax.management.remote.misc",
"ClientNotifForwarder");
}