Class ConcurrentQ<I,R>
java.lang.Object
    generic.concurrent.ConcurrentQ<I,R>

Type Parameters:
I - The type of the items to be processed.
R - The type of objects resulting from processing an item. If you don't care about the return value, make this type whatever you want, such as Object or the same type as I, and return null from QCallback.process(Object, TaskMonitor).

public class ConcurrentQ<I,R> extends java.lang.Object
A queue for easily scheduling tasks to be run in parallel (or sequentially) via a thread pool. This class provides a clean separation of items that need to be processed from the algorithm that does the processing, making it easy to parallelize the processing of multiple items. Further, you can control the maximum number of items that can be processed concurrently. This is useful to throttle operations that may starve the other threads in the system. You may also control how many items get placed into the queue at one time, blocking if some threshold is exceeded.

Examples:
Put and Forget:
    QCallback<ITEM, RESULT> callback = new AbstractQCallback<ITEM, RESULT>() {
        public RESULT process(ITEM item, TaskMonitor monitor) {
            // do work here...
        }
    };

    ConcurrentQBuilder<ITEM, RESULT> builder = new ConcurrentQBuilder<ITEM, RESULT>();
    builder.setThreadPoolName("Thread Pool Name");
    concurrentQ = builder.build(callback);
    ...
    ...
    concurrentQ.add(item); // where item is one of the instances of ITEM
Put Items and Handle Results in Any Order as They Become Available:
    QCallback<ITEM, RESULT> callback = new AbstractQCallback<ITEM, RESULT>() {
        public RESULT process(ITEM item, TaskMonitor monitor) {
            // do work here...
        }
    };

    QItemListener<ITEM, RESULT> itemListener = new QItemListener<ITEM, RESULT>() {
        public void itemProcessed(QResult<ITEM, RESULT> result) {
            // named 'value' so it does not clash with the 'result' parameter
            RESULT value = result.getResult();
            // work on my result...
        }
    };

    ConcurrentQBuilder<ITEM, RESULT> builder = new ConcurrentQBuilder<ITEM, RESULT>();
    builder.setThreadPoolName("Thread Pool Name");
    builder.setListener(itemListener);
    concurrentQ = builder.build(callback);
    ...
    ...
    concurrentQ.add(item); // where item is one of the instances of ITEM
    concurrentQ.add(item);
    concurrentQ.add(item);
Put Items and Handle Results When All Items Have Been Processed:
    QCallback<ITEM, RESULT> callback = new AbstractQCallback<ITEM, RESULT>() {
        public RESULT process(ITEM item, TaskMonitor monitor) {
            // do work here...
        }
    };

    ConcurrentQBuilder<ITEM, RESULT> builder = new ConcurrentQBuilder<ITEM, RESULT>();
    builder.setThreadPoolName("Thread Pool Name");
    builder.setCollectResults(true);
    concurrentQ = builder.build(callback);
    ...
    ...
    concurrentQ.add(item); // where item is one of the instances of ITEM
    concurrentQ.add(item);
    concurrentQ.add(item);
    ...
    // waitForResults() is declared to return a Collection, not a List
    Collection<QResult<ITEM, RESULT>> results = concurrentQ.waitForResults();
    // process the results...
Put Items, Blocking While Full, and Handle Results in Any Order as They Become Available:
    QCallback<ITEM, RESULT> callback = new AbstractQCallback<ITEM, RESULT>() {
        public RESULT process(ITEM item, TaskMonitor monitor) {
            // do work here...
        }
    };

    QItemListener<ITEM, RESULT> itemListener = new QItemListener<ITEM, RESULT>() {
        public void itemProcessed(QResult<ITEM, RESULT> result) {
            // named 'value' so it does not clash with the 'result' parameter
            RESULT value = result.getResult();
            // work on my result...
        }
    };

    ConcurrentQBuilder<ITEM, RESULT> builder = new ConcurrentQBuilder<ITEM, RESULT>();
    builder.setThreadPoolName("Thread Pool Name");
    builder.setListener(itemListener);
    builder.setQueue(new LinkedBlockingQueue<ITEM>(100));
    concurrentQ = builder.build(callback);
    ...
    ...
    Iterator<ITEM> iterator = <get an iterator for 1000s of items somewhere>
    concurrentQ.offer(iterator); // this call will block while the queue is full (100 items)

Constructor Summary
ConcurrentQ(QCallback<I,R> callback, java.util.Queue<I> queue, GThreadPool threadPool, QItemListener<I,R> listener, boolean collectResults, int maxInProgress, boolean jobsReportProgress)
    Creates a ConcurrentQ that will process at most maxInProgress items at a time, regardless of how many threads are available in the GThreadPool.
ConcurrentQ(java.lang.String name, QCallback<I,R> callback)
    Creates a ConcurrentQ that will process as many items at one time as the thread pool created by this constructor can handle.

Method Summary
void add(I item)
    Adds the item to this queue for concurrent processing.
void add(java.util.Collection<I> items)
    Adds the list of items to this queue for concurrent processing.
void add(java.util.Iterator<I> iterator)
    Adds the items of the given iterator to this queue for concurrent processing.
void addProgressListener(QProgressListener<I> listener)
    Adds a progress listener for this queue.
java.util.List<I> cancelAllTasks(boolean interruptRunningTasks)
    Cancels the processing of currently scheduled items in this queue.
void cancelScheduledJobs()
void dispose()
    Cancels all running tasks and disposes of the internal thread pool if it is a private pool.
boolean isEmpty()
    Returns true if this queue has no items waiting to be processed or currently being processed.
void offer(java.util.Iterator<I> iterator)
    Allows clients to use a bounded queue (such as a LinkedBlockingQueue) to control how many items get placed into this queue at one time.
void removeProgressListener(QProgressListener<I> listener)
    Removes a progress listener from this queue.
java.util.List<I> removeUnscheduledJobs()
void setMonitor(TaskMonitor monitor, boolean cancelClearsAllItems)
    Sets the monitor to use with this queue.
QResult<I,R> waitForNextResult()
    Wait until at least one result is available and then return the first result.
java.util.Collection<QResult<I,R>> waitForResults()
    Waits until all scheduled items have been completed or cancelled and returns a list of QResults if this queue has been told to collect results.
java.util.Collection<QResult<I,R>> waitForResults(long timeout, java.util.concurrent.TimeUnit unit)
    Waits up to the specified time for scheduled jobs to complete.
void waitUntilDone()
    Waits until all items have been processed OR an Exception happens during the processing of ANY item.
boolean waitUntilDone(long timeout, java.util.concurrent.TimeUnit unit)

Constructor Detail

ConcurrentQ
public ConcurrentQ(java.lang.String name, QCallback<I,R> callback)
Creates a ConcurrentQ that will process as many items at one time as the thread pool created by this constructor can handle.
Parameters:
name - The name of the thread pool that will be created by this constructor.
callback - the QCallback object that will be used to process items concurrently.

ConcurrentQ
public ConcurrentQ(QCallback<I,R> callback, java.util.Queue<I> queue, GThreadPool threadPool, QItemListener<I,R> listener, boolean collectResults, int maxInProgress, boolean jobsReportProgress)
Creates a ConcurrentQ that will process at most maxInProgress items at a time, regardless of how many threads are available in the GThreadPool.
Parameters:
callback - the QCallback object that will be used to process items concurrently.
queue - the internal storage queue to use in this concurrent queue.
threadPool - the GThreadPool to use for providing the threads for concurrent processing.
listener - An optional QItemListener that will be called back with results when the item has been processed.
collectResults - specifies if this queue should collect the results as items are processed so they can be returned in a waitForResults() call.
maxInProgress - specifies the maximum number of items that can be processed at a time. If this is set to 0, then this queue will attempt to execute as many items at a time as there are threads in the given threadPool. Setting this parameter to 1 will have the effect of guaranteeing that all items are processed one at a time in the order they were submitted. Any other positive value will run that many items concurrently, up to the number of available threads.
jobsReportProgress - true signals that jobs wish to report progress via their task monitor. The default is false, which triggers this queue to report an overall progress for each job that is processed. False is a good default for clients that have a finite number of jobs to be done.
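
The effect of maxInProgress can be illustrated without any Ghidra classes. The sketch below is not ConcurrentQ's implementation; it assumes a plain java.util.concurrent thread pool and uses a Semaphore to cap how many tasks run at once, regardless of pool size. The names MaxInProgressSketch and peakConcurrency are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, not part of the ConcurrentQ API.
class MaxInProgressSketch {

    // Runs itemCount tasks on a pool of poolSize threads while allowing at most
    // maxInProgress of them to execute at once. Returns the peak number of tasks
    // that were observed running at the same time.
    static int peakConcurrency(int itemCount, int poolSize, int maxInProgress) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Semaphore permits = new Semaphore(maxInProgress);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < itemCount; i++) {
                permits.acquire(); // the submitter waits until a slot is free
                pool.execute(() -> {
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        permits.release(); // free the slot only after the task is done
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return peak.get();
    }
}
```

With a limit of 1 the tasks run strictly one at a time even though the pool has several threads, which mirrors the sequential behavior described for maxInProgress = 1.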

Method Detail

addProgressListener
public void addProgressListener(QProgressListener<I> listener)
Adds a progress listener for this queue. All the progress and messages reported by a QCallback will be routed to this listener.
Parameters:
listener - the listener for receiving progress and message notifications.

removeProgressListener
public void removeProgressListener(QProgressListener<I> listener)
Removes a progress listener from this queue. Progress and messages reported by a QCallback will no longer be routed to this listener.
Parameters:
listener - the listener that should stop receiving progress and message notifications.

setMonitor
public void setMonitor(TaskMonitor monitor, boolean cancelClearsAllItems)
Sets the monitor to use with this queue.
Parameters:
monitor - the monitor to attach to this queue
cancelClearsAllItems - if true, cancelling the monitor will cancel all items currently being processed by a thread and clear the scheduled items that haven't yet run. If false, only the items currently being processed will be cancelled.

add
public void add(java.util.Collection<I> items)
Adds the list of items to this queue for concurrent processing.
Parameters:
items - the items to be scheduled for concurrent processing

add
public void add(java.util.Iterator<I> iterator)
Adds the items of the given iterator to this queue for concurrent processing.
Parameters:
iterator - an iterator from which the items to be scheduled for concurrent processing will be taken.

offer
public void offer(java.util.Iterator<I> iterator) throws java.lang.InterruptedException
Allows clients to use a bounded queue (such as a LinkedBlockingQueue) to control how many items get placed into this queue at one time. Calling the add methods will place all items into the queue, which, for a large number of items, can consume a large amount of memory. This method will block once the queue is at maximum capacity, continuing to add new items as existing items on the queue are processed.
To enable blocking on the queue when it is full, construct this ConcurrentQ with an instance of BlockingQueue.
Parameters:
iterator - An iterator from which items will be taken.
Throws:
java.lang.InterruptedException - if this queue is interrupted while waiting to add more items
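
The blocking behavior described for offer can be seen with a bare BlockingQueue. The following is a minimal sketch under stated assumptions, not ConcurrentQ's code: a producer feeds an iterator into a bounded LinkedBlockingQueue with put(), which blocks while the queue is full, and a consumer thread drains it. BoundedOfferSketch, offerAll and run are hypothetical names.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical stand-in, not part of the ConcurrentQ API.
class BoundedOfferSketch {

    // Moves every item from the iterator into the queue. put() blocks while the
    // queue is at capacity, so at most 'capacity' items are held in memory.
    static void offerAll(Iterator<Integer> items, BlockingQueue<Integer> queue)
            throws InterruptedException {
        while (items.hasNext()) {
            queue.put(items.next());
        }
    }

    // Feeds itemCount items through a queue bounded at 'capacity' while a consumer
    // thread takes them off. Returns the number of items the consumer processed.
    static int run(int itemCount, int capacity) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        AtomicInteger processed = new AtomicInteger();
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.take(); // frees a slot, letting a blocked put() continue
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            offerAll(IntStream.range(0, itemCount).boxed().iterator(), queue);
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while feeding the queue", e);
        }
        return processed.get();
    }
}
```

An unbounded queue would accept all items immediately, which is the memory cost the description above attributes to the add methods.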

add
public void add(I item)
Adds the item to this queue for concurrent processing.
Parameters:
item - the item to be scheduled for concurrent processing.

isEmpty
public boolean isEmpty()
Returns true if this queue has no items waiting to be processed or currently being processed.
Returns:
true if this queue has no items waiting to be processed or currently being processed.

waitForResults
public java.util.Collection<QResult<I,R>> waitForResults() throws java.lang.InterruptedException
Waits until all scheduled items have been completed or cancelled and returns a list of QResults if this queue has been told to collect results.
You can still call this method to wait for items to be processed, even if you did not specify to collect results. In that case, the list returned will be empty.
Returns:
the list of QResult objects that have all the results of the completed jobs.
Throws:
java.lang.InterruptedException - if this call was interrupted. Note: this interruption only happens if the calling thread cannot acquire the lock. If the thread is interrupted while waiting for results, then it will try again.

waitForNextResult
public QResult<I,R> waitForNextResult() throws java.lang.InterruptedException
Wait until at least one result is available and then return the first result.
Returns:
the first available result
Throws:
java.lang.InterruptedException - if interrupted while waiting for a result
java.lang.IllegalStateException - if this queue has been set to not collect results (see the constructor).

waitUntilDone
public void waitUntilDone() throws java.lang.InterruptedException, java.lang.Exception
Waits until all items have been processed OR an Exception happens during the processing of ANY item.
Note: If an exception does occur then the remaining items in the queue will be cleared and all current items will be cancelled.
If you wish for processing to continue for remaining items when any item encounters an exception, then you should instead use waitForResults(). That method will return all results, both with and without exceptions, which you can then process, including checking for exceptions. Note that to use waitForResults() to examine exceptions, you must have created this queue with collectResults as true.
Throws:
java.lang.InterruptedException - if interrupted while waiting for a result
java.lang.Exception - any exception encountered while processing an item (this will cancel all items in the queue).
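
The "collect everything, then inspect failures" pattern that the text recommends over waitUntilDone() can be sketched with standard library types. This is an analogy only and makes no claim about how QResult stores exceptions: it assumes java.util.concurrent's invokeAll, where each Future carries either a value or the exception thrown by its task. CollectResultsSketch and countOutcomes are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in, not part of the ConcurrentQ API.
class CollectResultsSketch {

    // Processes every item even if some fail, then inspects each outcome.
    // Returns {number of successes, number of failures}.
    static int[] countOutcomes(List<Integer> items) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer item : items) {
                tasks.add(() -> {
                    if (item < 0) {
                        throw new IllegalArgumentException("negative item: " + item);
                    }
                    return item * 2;
                });
            }
            int succeeded = 0;
            int failed = 0;
            // invokeAll waits for all tasks; a failing task does not cancel the others
            for (Future<Integer> outcome : pool.invokeAll(tasks)) {
                try {
                    outcome.get();
                    succeeded++;
                } catch (ExecutionException e) {
                    failed++; // e.getCause() is the exception thrown by the task
                }
            }
            return new int[] { succeeded, failed };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

A fail-fast wait, by contrast, would stop at the first failure and discard the remaining work, which is the trade-off described for waitUntilDone().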

waitForResults
public java.util.Collection<QResult<I,R>> waitForResults(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
Waits up to the specified time for scheduled jobs to complete. The results of all completed jobs will be returned if this queue has been told to collect results. At the time that this returns, there may still be work to process. The returned list will contain as much work as has been processed when the wait has finished. Repeated calls to this method will not return results from previous waits.
You can still call this method to wait for items to be processed, even if you did not specify to collect results. In that case, the list returned will be empty.
Parameters:
timeout - the timeout
unit - the timeout unit
Returns:
the list of QResult objects that have all the results of the completed jobs.
Throws:
java.lang.InterruptedException - if this call was interrupted.
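
The statement that repeated calls do not return results from previous waits describes a draining collector. The sketch below shows one way such semantics could be realized; it is an assumption for illustration, not ConcurrentQ's actual code. TimedDrainSketch and its methods are hypothetical, and unlike the real method this version swallows the interrupt and restores the flag to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in, not part of the ConcurrentQ API.
class TimedDrainSketch {
    private final List<String> completed = new ArrayList<>();
    private int pending;

    // Records that one more job has been scheduled.
    synchronized void submit() {
        pending++;
    }

    // Records a finished job and wakes up any waiting caller.
    synchronized void complete(String result) {
        completed.add(result);
        pending--;
        notifyAll();
    }

    // Waits until no jobs are pending or the timeout elapses, then hands back
    // the results gathered so far and forgets them.
    synchronized List<String> waitForResults(long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            while (pending > 0) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // timed out; work may still be pending
                }
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> drained = new ArrayList<>(completed);
        completed.clear(); // later calls only see newer results
        return drained;
    }
}
```

Because each call clears what it returns, a caller that polls in a loop must accumulate the partial lists itself if it needs the full set.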

cancelAllTasks
public java.util.List<I> cancelAllTasks(boolean interruptRunningTasks)
Cancels the processing of currently scheduled items in this queue. Any items that haven't yet been scheduled on the threadPool are returned immediately from this call. Items that are currently being processed will be cancelled; their results will be available on the next waitForResults() call, and if there is a QItemListener, it will be called with the QResult.
There is no guarantee that scheduled tasks will terminate any time soon. If they check the isCancelled() state of their TaskMonitor, it will be true. Setting interruptRunningTasks to true will send a thread interrupt to any currently running task, which may be useful if the task performs waiting operations such as I/O.
Parameters:
interruptRunningTasks - if true, an attempt will be made to interrupt any currently processing thread.
Returns:
a list of all items that have not yet been queued to the threadPool.
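
The two halves of this contract (unstarted work is handed back, running work is interrupted) have a close analogue in the standard library. The sketch below uses ExecutorService.shutdownNow() purely as an illustration; it is not how ConcurrentQ is implemented, and CancelSketch and cancelAll are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in, not part of the ConcurrentQ API.
class CancelSketch {

    // Starts one long-running task, queues three more behind it, then cancels.
    // Returns {number of tasks that never started, 1 if the running task was interrupted else 0}.
    static int[] cancelAll() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();

        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stands in for a blocking operation such as I/O
            } catch (InterruptedException e) {
                interrupted.set(true); // the task notices the cancel request
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { }); // waits in the queue behind the running task
        }

        try {
            started.await();
            // Interrupts the running task and returns the tasks that never began
            List<Runnable> neverStarted = pool.shutdownNow();
            pool.awaitTermination(10, TimeUnit.SECONDS);
            return new int[] { neverStarted.size(), interrupted.get() ? 1 : 0 };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while cancelling", e);
        }
    }
}
```

A task that never blocks and never checks its cancelled state would keep running after the interrupt, which is why the description above gives no guarantee about when cancelled tasks terminate.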

removeUnscheduledJobs
public java.util.List<I> removeUnscheduledJobs()

cancelScheduledJobs
public void cancelScheduledJobs()

dispose
public void dispose()
Cancels all running tasks and disposes of the internal thread pool if it is a private pool.

waitUntilDone
public boolean waitUntilDone(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
Throws:
java.lang.InterruptedException