Thursday, December 25, 2008

Using Java concurrency utilities

The inspiration for this post comes from Jacob Hookom's blog, and I can only second the recommendations he gives. As always, though, I would caution you to test any such implementation properly, to confirm that it works well and actually provides a benefit. There are lots of pitfalls, and concurrency is tricky even with the excellent utilities provided in Java.

To summarize the interesting problem: parallelize the execution of lengthy tasks in a web request without creating many threads for each request, while also ensuring that the thread pool is not starved by one request. The idea is to have a reasonably sized thread pool and to limit the number of tasks each request executes in parallel to a number small enough to allow the expected number of concurrent requests to share the pooled threads.
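
As a rough illustration of the sizing arithmetic, here is a minimal sketch. The class name PoolSizing and both numbers are assumptions made up for the example; sensible values depend on the application and should be measured.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolSizing {
    // Hypothetical figures, chosen only to make the arithmetic concrete.
    static final int EXPECTED_CONCURRENT_REQUESTS = 4;
    static final int WORKERS_PER_REQUEST = 5;

    // Every expected request can get its full share of workers at the same time.
    static int poolSize() {
        return EXPECTED_CONCURRENT_REQUESTS * WORKERS_PER_REQUEST;
    }

    // One pool shared by all requests, instead of new threads per request.
    static ExecutorService newSharedPool() {
        return Executors.newFixedThreadPool(poolSize());
    }
}
```

With these numbers the pool holds 20 threads. A fifth simultaneous request would have to wait for workers to free up.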

Essentially, limiting the number of tasks executing in parallel can be done in two ways: limit the number of tasks submitted at one time, or limit the number of workers that execute a set of tasks. Jacob takes the first approach; I will take the second, which seems to make it simpler to manage time-out issues.
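
For comparison, the first approach could look roughly like the sketch below. This is not Jacob's code, just one way to cap the number of in-flight tasks, here with a Semaphore. The names BoundedSubmitter, submitBounded and maxInFlight are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

class BoundedSubmitter {
    // Submits all tasks, but never lets more than maxInFlight of them be
    // queued or running at once. The calling thread blocks while the limit is reached.
    static <V> List<Future<V>> submitBounded(ExecutorService executor, int maxInFlight,
                                             List<Callable<V>> tasks)
            throws InterruptedException {
        final Semaphore permits = new Semaphore(maxInFlight);
        List<Future<V>> futures = new ArrayList<>(tasks.size());
        for (final Callable<V> task : tasks) {
            permits.acquire(); // Wait for a free slot.
            Callable<V> guarded = () -> {
                try {
                    return task.call();
                } finally {
                    permits.release(); // Free the slot, even if the task failed.
                }
            };
            try {
                futures.add(executor.submit(guarded));
            } catch (RejectedExecutionException e) {
                permits.release(); // The task never started, so give the slot back.
                throw e;
            }
        }
        return futures;
    }
}
```

Note that the sketch has no time-out handling at all: the caller would have to track a deadline across every acquire() and every Future.get(), which is part of why the worker approach looks simpler to me.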

Here's some code:

<V> Queue<Future<V>> submit(int numberOfWorkers, Queue<Callable<V>> tasks,
        long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
    Queue<Future<V>> result = new ConcurrentLinkedQueue<Future<V>>();
    List<WorkerTask<V>> workers = new ArrayList<WorkerTask<V>>(numberOfWorkers);
    for (int i = 0; i < numberOfWorkers; i++) {
        workers.add(new WorkerTask<V>(result, tasks));
    }
    // executor: the shared thread pool, an ExecutorService declared elsewhere.
    // invokeAll cancels every worker that has not finished when the time-out expires.
    List<Future<Object>> deadWorkers
            = executor.invokeAll(workers, timeout, unit);
    for (Future<Object> obituary : deadWorkers) {
        if (obituary.isCancelled()) {
            throw new TimeoutException();
        }
    }
    return result;
}


And the code for a WorkerTask:

private static class WorkerTask<V> implements Callable<Object> {

    private final Queue<Callable<V>> tasks;
    private final Queue<Future<V>> result;

    public WorkerTask(Queue<Future<V>> result, Queue<Callable<V>> tasks) {
        this.result = result;
        this.tasks = tasks;
    }

    public Object call() {
        // Keep taking tasks until the shared queue is empty.
        for (Callable<V> task = tasks.poll(); task != null; task = tasks.poll()) {
            // Run the task on this worker's thread; the future captures its result or exception.
            FutureTask<V> future = new FutureTask<V>(task);
            future.run();
            if (Thread.interrupted()) {
                Thread.currentThread().interrupt(); // Restore interrupt.
                break;
            }
            result.add(future);
        }
        return null;
    }
}
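
The time-out handling in submit relies on documented behaviour of the timed invokeAll: when it returns, any task that has not completed has been cancelled, so isCancelled() on its future returns true. The following minimal sketch shows just that mechanism in isolation. The class name, the two sample jobs and the sleep duration are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllTimeoutDemo {
    // Runs one fast and one slow job with the given time-out and reports,
    // in submission order, whether each job ended up cancelled.
    static List<Boolean> cancelledFlags(long timeoutMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> jobs = new ArrayList<>();
            jobs.add(() -> "fast");
            jobs.add(() -> {
                Thread.sleep(10_000); // Much longer than any sensible time-out.
                return "slow";
            });
            List<Future<String>> futures =
                    pool.invokeAll(jobs, timeoutMillis, TimeUnit.MILLISECONDS);
            List<Boolean> flags = new ArrayList<>();
            for (Future<String> future : futures) {
                flags.add(future.isCancelled());
            }
            return flags;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With a time-out of half a second, the fast job should finish normally and the slow one should be reported as cancelled. Cancellation is delivered as an interrupt, which is why the worker has to check for it.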

Note that it is important to have thread-safe collections for tasks and result, because several workers poll and add concurrently. Strictly speaking, submit should make sure that tasks is a thread-safe collection rather than trust the caller, but I'll ignore that for now. Note also the check in the call() method of WorkerTask for whether the thread has been interrupted. That is vital for being able to cancel the worker when you don't want to wait for it any longer (i.e. on time-out). If possible, the submitted tasks should also handle interrupts. Finally, note the careful restoration of the interrupt status, so that the caller of the method may also be notified.
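
To make the last two points concrete, here is a minimal sketch of a defensive copy into a thread-safe queue and of a task that handles interrupts. The class InterruptAwareTasks, both method names and the summing task are made up for the example. Real tasks would check the interrupt status at whatever points suit their work.

```java
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;

class InterruptAwareTasks {
    // Copies the caller's tasks into a queue that several workers can poll() safely.
    static <V> Queue<Callable<V>> threadSafeCopy(Collection<? extends Callable<V>> tasks) {
        return new ConcurrentLinkedQueue<Callable<V>>(tasks);
    }

    // A stand-in for a lengthy task: sums 0..n-1 and gives up early when interrupted.
    static Callable<Long> interruptibleSum(final long n) {
        return () -> {
            long sum = 0;
            for (long i = 0; i < n; i++) {
                // isInterrupted() does not clear the flag, so a worker checking
                // Thread.interrupted() afterwards still sees the interrupt.
                if (i % 1024 == 0 && Thread.currentThread().isInterrupted()) {
                    throw new InterruptedException("interrupted at i=" + i);
                }
                sum += i;
            }
            return sum;
        };
    }
}
```

Tasks that block in library calls such as Thread.sleep or BlockingQueue.take get this for free, since those calls throw InterruptedException themselves.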

Comments:

Jacob Hookom said...

Nice post-- I wonder with your implementation if it wouldn't afford for concurrent requests to share/race for processor time? Meaning, a large request could starve out 5 threads until the request was finished, not allowing another request to come in and schedule use of the processing thread?

tobe said...

Fair point, Jacob, it would use 5 threads until the request was finished or timed out, while your best approaches carefully used at most 5 threads.
