Java closures and threads

I have always thought that the biggest weakness of the proposed closures for Java is threading. Strangely enough, the impression one gets from other articles is that closures and threading is a good match and that closures will somehow make threading easier. To analyze this issue I'll start from Neal Gafter's blog post about concurrent loops and closures.

Neal proposes the following code using closures:

public Collection getAttendees() {
List result = new ArrayList();
for eachConcurrently(EventResponse r : getResponses(), threadPool) {
if (r.mayAttend()) {
Principal attendee = r.getAttendee();
synchronized (result) {
result.add(attendee);
}
}
}
return result;
}

Imagining myself coming across this code in some code mass I was analyzing, I would think it looked very neat. Then the synchronized keyword would pop out at me and rouse my curiosity, I would look closer and see for eachConcurrently which explains it nicely and I would move on, no problem.

But what if the synchronized block was missing? How easily would I catch the error?

Therein lies the weakness, there is no clear signal that there may be other issues to consider, in this case access to shared resources. You may sometimes even be submitting your closure to multi-threaded execution without knowing it. A variation of the same flaw were mechanisms like DCOM that allowed transparent use of remote components.

At least you get the idea that something out of the ordinary is happening in the horribly clumsy code presented by Neal as being the current state of affairs without closures, no doubt to emphasize his point and inspire fear in the hearts of the doubters.

But the java.util.concurrent package actually leads you down the right path if you care to follow. A Callable should normally return a result and a Future returns a "canned" execution result from the Callable, or the exception thrown packaged in an ExecutionException, once the processing is done and you are ready to deal with it. Which might lead to the following code instead:

public Collection getAttendees() {
CompletionService ecs =
new ExecutorCompletionService(threadPool);
int numberOfTasks = 0;
for (final EventResponse r : getResponses()) {
if (r.mayAttend()) {
++numberOfTasks;
submitGetAttendeeTask(r, ecs);
}
}
return collectGetAttendeeTaskResults(numberOfTasks, ecs);
}

Does that look so formidable? Fair enough, it needs some more code to be complete, but nothing scary there either:

private void submitGetAttendeeTask(final EventResponse r,
CompletionService ecs) {
ecs.submit(new Callable() {
public Principal call() {
return r.getAttendee();
}
});
}

private Collection collectGetAttendeeTaskResults(int numberOfTasks,
CompletionService ecs) {
final List result = new ArrayList();
for (int i = 0; i < numberOfTasks; ++i) {
try {
result.add(ecs.take().get());
} catch (InterruptedException ex) {
throw new AssertionError(ex); // shouldn't happen
} catch (ExecutionException ex) {
throw new AssertionError(ex); // shouldn't happen
}
}
return result;
}

Having come this far, it would certainly be nice to lose some of the boilerplate in creating the Callable and the possible unwrapping of the ExecutionException, so there are certainly some good motivations behind closures. Can this be solved by other means? Certainly, CICE would help with the first, and for the second there could be devised a cute way of specifying and parametrizing sets of exceptions rather than having Callable throws Exception and Future.get() throws ExecutionException. One thing you still would have to deal with is the InterruptedException that is thrown if your main thread is interrupted while waiting for the results. But that is something you maybe should think about or at least might want to think about or use in the future, it is one of those threading issues after all.

Comments

hlovatt said…
Very good post, I agree with your sentiments. In my proposal, C3S:

http://www.artima.com/weblogs/viewpost.jsp?thread=182412

You could write:

private void submitGetAttendeeTask(final EventResponse r,
CompletionService ecs) {
ecs.submit( method { r.getAttendee } );
}

Popular Posts