/* * Copyright 2005 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package com.sun.corba.se.impl.orbutil.threadpool; import java.security.AccessController; import java.security.PrivilegedAction; import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool; import com.sun.corba.se.spi.orbutil.threadpool.Work; import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue; import com.sun.corba.se.impl.orbutil.ORBConstants; import com.sun.corba.se.impl.orbutil.threadpool.WorkQueueImpl; import com.sun.corba.se.spi.monitoring.MonitoringConstants; import com.sun.corba.se.spi.monitoring.MonitoredObject; import com.sun.corba.se.spi.monitoring.MonitoringFactories; import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase; public class ThreadPoolImpl implements ThreadPool { private static int threadCounter = 0; // serial counter useful for debugging private WorkQueue workQueue; // Stores the number of available worker threads private int availableWorkerThreads = 0; // Stores the number of threads in the threadpool currently private int currentThreadCount = 0; // Minimum number of worker threads created at instantiation of the threadpool private int minWorkerThreads = 0; // Maximum number of worker threads in the threadpool private int maxWorkerThreads = 0; // Inactivity timeout value for worker threads to exit and stop running private long inactivityTimeout = ORBConstants.DEFAULT_INACTIVITY_TIMEOUT ; // Indicates if the threadpool is bounded or unbounded private boolean boundedThreadPool = false; // Running count of the work items processed // Set the value to 1 so that divide by zero is avoided in // averageWorkCompletionTime() private long processedCount = 1; // Running aggregate of the time taken in millis to execute work items // processed by the threads in the threadpool private long totalTimeTaken = 0; // Lock for protecting state when required private Object lock = new Object(); // Name of the ThreadPool private String name; // MonitoredObject for ThreadPool private MonitoredObject threadpoolMonitoredObject; // ThreadGroup in which threads should be created private final ThreadGroup threadGroup ; /** * This constructor is used to create an unbounded threadpool */ public ThreadPoolImpl(ThreadGroup tg, String threadpoolName) { maxWorkerThreads = Integer.MAX_VALUE; workQueue = new WorkQueueImpl(this); threadGroup = tg ; name = threadpoolName; initializeMonitoring(); } /** * This constructor is used to create an unbounded threadpool * in the ThreadGroup of the current thread */ public ThreadPoolImpl(String threadpoolName) { this( Thread.currentThread().getThreadGroup(), threadpoolName ) ; } /** * This constructor is used to create bounded threadpool */ public ThreadPoolImpl(int minSize, int maxSize, long timeout, String threadpoolName) { inactivityTimeout = timeout; minWorkerThreads = minSize; maxWorkerThreads = maxSize; boundedThreadPool = true; workQueue = new WorkQueueImpl(this); threadGroup = Thread.currentThread().getThreadGroup() ; name = threadpoolName; for (int i = 0; i < minWorkerThreads; i++) { createWorkerThread(); } initializeMonitoring(); } // Setup monitoring for this threadpool private void initializeMonitoring() { // Get root monitored object MonitoredObject root = MonitoringFactories.getMonitoringManagerFactory(). createMonitoringManager(MonitoringConstants.DEFAULT_MONITORING_ROOT, null). getRootMonitoredObject(); // Create the threadpool monitoring root MonitoredObject threadPoolMonitoringObjectRoot = root.getChild( MonitoringConstants.THREADPOOL_MONITORING_ROOT); if (threadPoolMonitoringObjectRoot == null) { threadPoolMonitoringObjectRoot = MonitoringFactories. getMonitoredObjectFactory().createMonitoredObject( MonitoringConstants.THREADPOOL_MONITORING_ROOT, MonitoringConstants.THREADPOOL_MONITORING_ROOT_DESCRIPTION); root.addChild(threadPoolMonitoringObjectRoot); } threadpoolMonitoredObject = MonitoringFactories. getMonitoredObjectFactory(). createMonitoredObject(name, MonitoringConstants.THREADPOOL_MONITORING_DESCRIPTION); threadPoolMonitoringObjectRoot.addChild(threadpoolMonitoredObject); LongMonitoredAttributeBase b1 = new LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS, MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) { public Object getValue() { return new Long(ThreadPoolImpl.this.currentNumberOfThreads()); } }; threadpoolMonitoredObject.addAttribute(b1); LongMonitoredAttributeBase b2 = new LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_AVAILABLE_THREADS, MonitoringConstants.THREADPOOL_CURRENT_NUMBER_OF_THREADS_DESCRIPTION) { public Object getValue() { return new Long(ThreadPoolImpl.this.numberOfAvailableThreads()); } }; threadpoolMonitoredObject.addAttribute(b2); LongMonitoredAttributeBase b3 = new LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS, MonitoringConstants.THREADPOOL_NUMBER_OF_BUSY_THREADS_DESCRIPTION) { public Object getValue() { return new Long(ThreadPoolImpl.this.numberOfBusyThreads()); } }; threadpoolMonitoredObject.addAttribute(b3); LongMonitoredAttributeBase b4 = new LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME, MonitoringConstants.THREADPOOL_AVERAGE_WORK_COMPLETION_TIME_DESCRIPTION) { public Object getValue() { return new Long(ThreadPoolImpl.this.averageWorkCompletionTime()); } }; threadpoolMonitoredObject.addAttribute(b4); LongMonitoredAttributeBase b5 = new LongMonitoredAttributeBase(MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT, MonitoringConstants.THREADPOOL_CURRENT_PROCESSED_COUNT_DESCRIPTION) { public Object getValue() { return new Long(ThreadPoolImpl.this.currentProcessedCount()); } }; threadpoolMonitoredObject.addAttribute(b5); // Add the monitored object for the WorkQueue threadpoolMonitoredObject.addChild( ((WorkQueueImpl)workQueue).getMonitoredObject()); } // Package private method to get the monitored object for this // class MonitoredObject getMonitoredObject() { return threadpoolMonitoredObject; } public WorkQueue getAnyWorkQueue() { return workQueue; } public WorkQueue getWorkQueue(int queueId) throws NoSuchWorkQueueException { if (queueId != 0) throw new NoSuchWorkQueueException(); return workQueue; } /** * To be called from the workqueue when work is added to the * workQueue. This method would create new threads if required * or notify waiting threads on the queue for available work */ void notifyForAvailableWork(WorkQueue aWorkQueue) { synchronized (lock) { if (availableWorkerThreads == 0) { createWorkerThread(); } else { aWorkQueue.notify(); } } } /** * To be called from the workqueue to create worker threads when none * available. */ void createWorkerThread() { synchronized (lock) { final String name = getName() ; if (boundedThreadPool) { if (currentThreadCount < maxWorkerThreads) { currentThreadCount++; } else { // REVIST - Need to create a thread to monitor the // the state for deadlock i.e. all threads waiting for // something which can be got from the item in the // workqueue, but there is no thread available to // process that work item - DEADLOCK !! return; } } else { currentThreadCount++; } // If we get here, we need to create a thread. AccessController.doPrivileged( new PrivilegedAction() { public Object run() { // Thread creation needs to be in a doPrivileged block // for two reasons: // 1. The creation of a thread in a specific ThreadGroup // is a privileged operation. Lack of a doPrivileged // block here causes an AccessControlException // (see bug 6268145). // 2. We want to make sure that the permissions associated // with this thread do NOT include the permissions of // the current thread that is calling this method. // This leads to problems in the app server where // some threads in the ThreadPool randomly get // bad permissions, leading to unpredictable // permission errors. WorkerThread thread = new WorkerThread(threadGroup, name); // The thread must be set to a daemon thread so the // VM can exit if the only threads left are PooledThreads // or other daemons. We don't want to rely on the // calling thread always being a daemon. // Note that no exception is possible here since we // are inside the doPrivileged block. thread.setDaemon(true); thread.start(); return null ; } } ) ; } } /** * This method will return the minimum number of threads maintained * by the threadpool. */ public int minimumNumberOfThreads() { return minWorkerThreads; } /** * This method will return the maximum number of threads in the * threadpool at any point in time, for the life of the threadpool */ public int maximumNumberOfThreads() { return maxWorkerThreads; } /** * This method will return the time in milliseconds when idle * threads in the threadpool are removed. */ public long idleTimeoutForThreads() { return inactivityTimeout; } /** * This method will return the total number of threads currently in the * threadpool. This method returns a value which is not synchronized. */ public int currentNumberOfThreads() { synchronized (lock) { return currentThreadCount; } } /** * This method will return the number of available threads in the * threadpool which are waiting for work. This method returns a * value which is not synchronized. */ public int numberOfAvailableThreads() { synchronized (lock) { return availableWorkerThreads; } } /** * This method will return the number of busy threads in the threadpool * This method returns a value which is not synchronized. */ public int numberOfBusyThreads() { synchronized (lock) { return (currentThreadCount - availableWorkerThreads); } } /** * This method returns the average elapsed time taken to complete a Work * item in milliseconds. */ public long averageWorkCompletionTime() { synchronized (lock) { return (totalTimeTaken / processedCount); } } /** * This method returns the number of Work items processed by the threadpool */ public long currentProcessedCount() { synchronized (lock) { return processedCount; } } public String getName() { return name; } /** * This method will return the number of WorkQueues serviced by the threadpool. */ public int numberOfWorkQueues() { return 1; } private static synchronized int getUniqueThreadId() { return ThreadPoolImpl.threadCounter++; } private class WorkerThread extends Thread { private Work currentWork; private int threadId = 0; // unique id for the thread // thread pool this WorkerThread belongs too private String threadPoolName; // name seen by Thread.getName() private StringBuffer workerThreadName = new StringBuffer(); WorkerThread(ThreadGroup tg, String threadPoolName) { super(tg, "Idle"); this.threadId = ThreadPoolImpl.getUniqueThreadId(); this.threadPoolName = threadPoolName; setName(composeWorkerThreadName(threadPoolName, "Idle")); } public void run() { while (true) { try { synchronized (lock) { availableWorkerThreads++; } // Get some work to do currentWork = ((WorkQueueImpl)workQueue).requestWork(inactivityTimeout); synchronized (lock) { availableWorkerThreads--; // It is possible in notifyForAvailableWork that the // check for availableWorkerThreads = 0 may return // false, because the availableWorkerThreads has not been // decremented to zero before the producer thread added // work to the queue. This may create a deadlock, if the // executing thread needs information which is in the work // item queued in the workqueue, but has no thread to work // on it since none was created because availableWorkerThreads = 0 // returned false. // The following code will ensure that a thread is always available // in those situations if ((availableWorkerThreads == 0) && (workQueue.workItemsInQueue() > 0)) { createWorkerThread(); } } // Set the thread name for debugging. setName(composeWorkerThreadName(threadPoolName, Integer.toString(this.threadId))); long start = System.currentTimeMillis(); try { // Do the work currentWork.doWork(); } catch (Throwable t) { // Ignore all errors. ; } long end = System.currentTimeMillis(); synchronized (lock) { totalTimeTaken += (end - start); processedCount++; } // set currentWork to null so that the work item can be // garbage collected currentWork = null; setName(composeWorkerThreadName(threadPoolName, "Idle")); } catch (TimeoutException e) { // This thread timed out waiting for something to do. synchronized (lock) { availableWorkerThreads--; // This should for both bounded and unbounded case if (currentThreadCount > minWorkerThreads) { currentThreadCount--; // This thread can exit. return; } else { // Go back to waiting on workQueue continue; } } } catch (InterruptedException ie) { // InterruptedExceptions are // caught here. Thus, threads can be forced out of // requestWork and so they have to reacquire the lock. // Other options include ignoring or // letting this thread die. // Ignoring for now. REVISIT synchronized (lock) { availableWorkerThreads--; } } catch (Throwable e) { // Ignore any exceptions that currentWork.process // accidently lets through, but let Errors pass. // Add debugging output? REVISIT synchronized (lock) { availableWorkerThreads--; } } } } private String composeWorkerThreadName(String poolName, String workerName) { workerThreadName.setLength(0); workerThreadName.append("p: ").append(poolName); workerThreadName.append("; w: ").append(workerName); return workerThreadName.toString(); } } // End of WorkerThread class } // End of file.