/* * Copyright 2004 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package com.sun.corba.se.impl.orbutil.threadpool; import java.util.LinkedList; import com.sun.corba.se.spi.orbutil.threadpool.ThreadPool; import com.sun.corba.se.spi.orbutil.threadpool.Work; import com.sun.corba.se.spi.orbutil.threadpool.WorkQueue; import com.sun.corba.se.impl.orbutil.ORBConstants; import com.sun.corba.se.impl.orbutil.threadpool.ThreadPoolImpl; import com.sun.corba.se.spi.monitoring.MonitoringConstants; import com.sun.corba.se.spi.monitoring.MonitoringFactories; import com.sun.corba.se.spi.monitoring.MonitoredObject; import com.sun.corba.se.spi.monitoring.LongMonitoredAttributeBase; public class WorkQueueImpl implements WorkQueue { private ThreadPool workerThreadPool; private LinkedList theWorkQueue = new LinkedList(); private long workItemsAdded = 0; // Initialized to 1 to avoid divide by zero in averageTimeInQueue() private long workItemsDequeued = 1; private long totalTimeInQueue = 0; // Name of the work queue private String name; // MonitoredObject for work queue private MonitoredObject workqueueMonitoredObject; public WorkQueueImpl() { name=ORBConstants.WORKQUEUE_DEFAULT_NAME; initializeMonitoring(); } public WorkQueueImpl(ThreadPool workerThreadPool) { this(workerThreadPool, ORBConstants.WORKQUEUE_DEFAULT_NAME); } public WorkQueueImpl(ThreadPool workerThreadPool, String name) { this.workerThreadPool = workerThreadPool; this.name = name; initializeMonitoring(); } // Setup monitoring for this workqueue private void initializeMonitoring() { workqueueMonitoredObject = MonitoringFactories. getMonitoredObjectFactory(). createMonitoredObject(name, MonitoringConstants.WORKQUEUE_MONITORING_DESCRIPTION); LongMonitoredAttributeBase b1 = new LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED, MonitoringConstants.WORKQUEUE_TOTAL_WORK_ITEMS_ADDED_DESCRIPTION) { public Object getValue() { return new Long(WorkQueueImpl.this.totalWorkItemsAdded()); } }; workqueueMonitoredObject.addAttribute(b1); LongMonitoredAttributeBase b2 = new LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE, MonitoringConstants.WORKQUEUE_WORK_ITEMS_IN_QUEUE_DESCRIPTION) { public Object getValue() { return new Long(WorkQueueImpl.this.workItemsInQueue()); } }; workqueueMonitoredObject.addAttribute(b2); LongMonitoredAttributeBase b3 = new LongMonitoredAttributeBase(MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE, MonitoringConstants.WORKQUEUE_AVERAGE_TIME_IN_QUEUE_DESCRIPTION) { public Object getValue() { return new Long(WorkQueueImpl.this.averageTimeInQueue()); } }; workqueueMonitoredObject.addAttribute(b3); } // Package private method to get the monitored object for this // class MonitoredObject getMonitoredObject() { return workqueueMonitoredObject; } public void addWork(Work work) { synchronized (this) { workItemsAdded++; work.setEnqueueTime(System.currentTimeMillis()); theWorkQueue.addLast(work); ((ThreadPoolImpl)workerThreadPool).notifyForAvailableWork(this); } } Work requestWork(long waitTime) throws TimeoutException, InterruptedException { Work workItem; synchronized (this) { if (theWorkQueue.size() != 0) { workItem = (Work)theWorkQueue.removeFirst(); totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime(); workItemsDequeued++; return workItem; } try { long remainingWaitTime = waitTime; long finishTime = System.currentTimeMillis() + waitTime; do { this.wait(remainingWaitTime); if (theWorkQueue.size() != 0) { workItem = (Work)theWorkQueue.removeFirst(); totalTimeInQueue += System.currentTimeMillis() - workItem.getEnqueueTime(); workItemsDequeued++; return workItem; } remainingWaitTime = finishTime - System.currentTimeMillis(); } while (remainingWaitTime > 0); throw new TimeoutException(); } catch (InterruptedException ie) { throw ie; } } } public void setThreadPool(ThreadPool workerThreadPool) { this.workerThreadPool = workerThreadPool; } public ThreadPool getThreadPool() { return workerThreadPool; } /** * Returns the total number of Work items added to the Queue. * This method is unsynchronized and only gives a snapshot of the * state when it is called */ public long totalWorkItemsAdded() { return workItemsAdded; } /** * Returns the total number of Work items in the Queue to be processed * This method is unsynchronized and only gives a snapshot of the * state when it is called */ public int workItemsInQueue() { return theWorkQueue.size(); } public synchronized long averageTimeInQueue() { return (totalTimeInQueue/workItemsDequeued); } public String getName() { return name; } } // End of file.