Using Java concurrency utilities
The inspiration for this post comes from Jacob Hookom's blog and I can only second the recommendations he gives. Although, as always, I would caution to test any such implementation properly, that it works well and actually provides a benefit. There are lots of pitfalls and concurrency is tricky even with the excellent utilities provided in Java.
To summarize the interesting problem: parallelize the execution of lengthy tasks in a web request, without creating many threads for each request, but also ensuring that the thread pool is not starved by one request. The idea is to have a reasonably sized thread pool and to limit the number of tasks executing in parallel to a number small enough to allow the expected amount of concurrent requests to share the pooled threads.
Essentially, limiting the number of tasks executing in parallel can be done in two ways: limit the number of tasks submitted at one time or limit the number of workers that execute a set of tasks. Jacob takes the first approach, I will take the second approach, which seems to make it simpler to manage time-out issues.
Here's some code:
<V> Queue<Future><V>> submit(int numberOfWorkers, Queue<Callable><V>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
Queue<Future><V>> result = new ConcurrentLinkedQueue<Future><V>>();
List<WorkerTask><V>> workers = new ArrayList<WorkerTask><V>>(numberOfWorkers);
for (int i = 0; i < numberOfWorkers; i++) {
workers.add(new WorkerTask<V>(result, tasks));
}
List<Future><Object>> deadWorkers
= executor.invokeAll(workers, timeout, unit);
for (Future<Object> obituary : deadWorkers) {
if (obituary.isCancelled()) {
throw new TimeoutException();
}
}
return result;
}
And the code for a WorkerTask:
private static class WorkerTask<V> implements Callable<Object> {
private Queue<Callable><V>> tasks;
private Queue<Future><V>> result;
public WorkerTask(Queue<Future><V>> result, Queue<Callable><V>> tasks) {
this.result = result;
this.tasks = tasks;
}
public Object call() {
for (Callable<V> task = tasks.poll(); task != null; task = tasks.poll()) {
FutureTask<V> future = new FutureTask<V>(task);
future.run();
if (Thread.interrupted()) {
Thread.currentThread().interrupt(); // Restore interrupt.
break;
}
result.add(future);
}
return null;
}
}
Note that it is important to have thread-safe collections for tasks and result, we should actually make sure that the tasks are in a thread-safe collection, but I'll ignore that for now. Note also the check if the thread has been interrupted in the call() method of WorkerTask. That is vital to be able to cancel the task when you don't want to wait for it any longer (i.e. on time-out). If possible, the submitted tasks should also handle interrupts. Note the careful restoration of the interrupt status so that the caller of the method may also be notified.
Comments