Wednesday, November 22, 2006

Linear Time Sort Puzzler

Interviewing for an engineering position at Google can be grueling and humbling. You get less than an hour with each interviewer, and may get one technical problem after another to solve. If you have performance anxiety you may have a hard time focusing on the problem. The best approach, if you can do it, is to think of the interview as a technically challenging but friendly brainstorming session. If you get hired, that's what your life will be like.

My friend, Jim Davis, told me about someone he interviewed earlier this week. The candidate thought the interview wasn't going well and hoped to impress Jim by sharing an idea he had: a new algorithm for sorting in linear time. It isn't a sort based exclusively on comparisons; it can be proven that any such sort must perform Ω(n log n) comparisons in the worst case. It isn't a bucket sort, which is a standard algorithm for this problem (assuming certain characteristics of the input set). But the candidate's algorithm helped Jim understand the candidate's capabilities.

The algorithm works like this: you start by making a linear scan through the n numbers to be sorted and note the largest and smallest of the elements. Using that, you compute a linear function f(x) that will map the smallest input element to zero and the largest to n. Then, for each input element xi, you fork a task that waits f(xi) milliseconds and then outputs xi. After n milliseconds the output contains the input elements in sorted order.
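Here is a literal-minded sketch of the candidate's idea in Java (my own rendering, not the candidate's code; it assumes an idealized system in which sleeps are exact and output is atomic):

class SleepSort {
    // Sketch of the idea: map each element to a delay via the linear
    // function f, then let the scheduler emit elements in delay order.
    static void sleepSort(int[] a) throws InterruptedException {
        int lo = a[0], hi = a[0];
        for (int x : a) { lo = Math.min(lo, x); hi = Math.max(hi, x); }
        // f(x) maps the smallest element to 0 ms and the largest to n ms
        final double scale = (hi == lo) ? 0 : (double) a.length / (hi - lo);
        final int base = lo;
        Thread[] tasks = new Thread[a.length];
        for (int i = 0; i < a.length; i++) {
            final int x = a[i];
            tasks[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(Math.round((x - base) * scale));
                        System.out.println(x);  // "output" the element
                    } catch (InterruptedException ignored) {}
                }
            });
            tasks[i].start();  // fork one task per input element
        }
        for (Thread t : tasks) t.join();
    }
}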

There may be problems when two tasks attempt to output their data at nearly the same time, but let's ignore that for a moment and assume an "ideal" system. If you have a truly concurrent computer, this may run in O(n) time using O(n) processors, thereby consuming a total of O(n²) processing time. There are far better concurrent sorting algorithms around. Much more interesting is the idea of running this algorithm on a sequential computer with a non-preemptive scheduler. The non-preemptive scheduler avoids the problem of two tasks producing data at the same time. The scheduler can also ensure that the tasks run in their proper order, even if the running time of some tasks causes others to start slightly "too late".

Jim explained the problem with the algorithm, concluding "so you've reduced the problem of sorting in linear time to a different linear time sorting algorithm." After Jim explained this remark, the candidate replied "well, the idea isn't really very well thought out." Here is the puzzle: what was Jim talking about?

Monday, November 20, 2006

A Thread Pool Puzzler

I participated in the design and development of a couple of concurrency libraries for shared-memory multiprocessors long before such machines were popular. So when I started using java.util.concurrent I was already somewhat comfortable with the concepts. But when I used it more intensely for production work in the Google Calendar server, I ran into a couple of "gotcha" situations. I'd like to tell you about one in particular, in part because it might help you avoid the problem yourself, and in part because I believe this issue exposes some missing functionality in the concurrency framework.

Many parallel programming problems can be expressed using fork-join parallelism, in which tasks spawn, or "fork", a number of subtasks that can be executed in parallel. The caller then waits for these subtasks to complete by "join"ing with them. Consider the following sequential program. It is an abstract model of some larger program that has three logical layers.

class Program {
    static final int N = 3;
    public static void main(String[] args) {
        doSomeWork();
        loopNtimes(N, new Runnable() {
                public void run() { doLayerOne(); }
            });
        System.out.println();
    }

    static void doLayerOne() {
        doSomeWork();
        loopNtimes(N, new Runnable() {
                public void run() { doLayerTwo(); }
            });
    }

    static void doLayerTwo() {
        doSomeWork();
        loopNtimes(N, new Runnable() {
                public void run() { doLayerThree(); }
            });
    }

    static void doLayerThree() {
        doSomeWork();
    }

    static void loopNtimes(int n, Runnable runnable) {
        for (int i=0; i<n; i++) runnable.run();
    }

    static void doSomeWork() {
        System.out.print(".");
        try { Thread.sleep(500L); } catch (InterruptedException ignored) {}
    }
}

This program prints 40 dots (1 + 3 + 9 + 27, one per call to doSomeWork), taking a half second for each one. It runs to completion in about 20 seconds. Let's rewrite the loops as concurrent (instead of sequential) loops, using an idiom recommended by Martin Buchholz. To do that we replace the method loopNtimes with the following:

    static ExecutorService threadPool = Executors.newCachedThreadPool();

    static void loopNtimes(int n, Runnable runnable) {
        Collection<Callable<Object>> c = new ArrayList<Callable<Object>>();
        for (int i=0; i<n; i++) c.add(Executors.callable(runnable));
        Collection<Future<Object>> futures = null;
        try { futures = threadPool.invokeAll(c); } catch (InterruptedException ignored) {}
        if (futures != null) for (Future<Object> f : futures) {
            try { f.get(); }
            catch (InterruptedException ex) {}
            catch (ExecutionException ex) {
                ex.printStackTrace();
                System.exit(1);
            }
        }
    }

This requires a couple of other minor changes to the program (two import statements and System.exit(0) at the end of main), but the program now runs in two seconds instead of twenty. So far so good, but if N is larger, say a hundred, this program fails: it throws OutOfMemoryError because it tries to allocate too many threads. My first attempt to fix this replaced the thread pool with one containing a fixed number of threads:

    static ExecutorService threadPool = Executors.newFixedThreadPool(100);

This version of the program works and runs in 2 seconds. But why should we use 100 threads? If we imagine that the Thread.sleep statements represent computationally intensive parts of the program, it makes more sense to have roughly as many threads as physical processors. I'm running this on a machine with an Intel Centrino Duo processor, which acts roughly like 2 processors. Let's be generous, however, and use ten threads. So we modify this version of the program by changing 100 to 10. That won't be as fast as the version with 100 threads, but just how fast will it be?

If you haven't guessed the punch line by now, I'll tell you: with ten threads in the pool the program prints 11 periods and then deadlocks! If you use a debugger to examine the state of the program to figure out what's going on, you'll find the main thread waiting for invokeAll, three threads in doLayerOne waiting for invokeAll, seven threads in doLayerTwo waiting for invokeAll, and there are no threads left to do any of the work of calling doLayerThree. This is a classic thread starvation deadlock.

If you're just trying out this program to see what happens, you might be slightly annoyed and finally give up and hit control-C to quit the Java program, but when our program (Google Calendar) encounters this kind of problem our customers get annoyed, give up, and sign up for a competitor like Yahoo Calendar or 30Boxes. Hey, don't click those links; trust me, you really want Google Calendar. My point is that we can't leave this to chance.

What can or should we do about this problem? The first idea is to change the 10 back into 100, but those numbers are pulled out of thin air. Without analyzing the behavior and interaction of all the places where the thread pool is used, understanding the dynamic performance of the application under real loads, and placing bounds on the number of tasks that will be used at each level in the program's hierarchy, it is difficult or impossible to pick a number that will always avoid this kind of deadlock. Another idea is to use unbounded thread pools, but as we've seen, under high-load situations those can cause an explosion in the number of threads, resulting in the program failing by running out of memory.

What we did to address this issue is avoid the single monolithic thread pool altogether. Instead, we use a separate thread pool at every level in the hierarchy. In terms of this example, we would have a thread pool for use in main, one for use in doLayerOne, and one for use in doLayerTwo (see the sketch below). Every subsystem that requires concurrency gets its own personal thread pool. That way every layer that uses concurrency is guaranteed to make progress when it has work to do, so this kind of deadlock cannot occur. But there is a cost to this as well: balancing the sizes of these thread pools is a black art. During operation we have hundreds of threads, most of which sit around doing nothing. Besides being a waste of resources, the generous surplus of "extra" threads makes debugging more difficult than it should be. And if the system doesn't decompose so neatly into layers (perhaps because there are recursive loops in the call graph of the subsystems), even this approach can fail and result in thread starvation.
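For illustration, the per-layer arrangement might look like this in the example program (a minimal sketch; the pool sizes are illustrative, and loopNtimes now takes the pool of the layer being forked as a parameter):

import java.util.*;
import java.util.concurrent.*;

class LayeredPools {
    // One pool per layer: a thread blocked in invokeAll at one layer
    // can never starve the layer below it, because each layer has its
    // own dedicated threads.
    static ExecutorService mainPool     = Executors.newFixedThreadPool(3);
    static ExecutorService layerOnePool = Executors.newFixedThreadPool(9);
    static ExecutorService layerTwoPool = Executors.newFixedThreadPool(27);

    // main passes mainPool, doLayerOne passes layerOnePool, and
    // doLayerTwo passes layerTwoPool.
    static void loopNtimes(ExecutorService pool, int n, Runnable runnable)
            throws InterruptedException {
        Collection<Callable<Object>> c = new ArrayList<Callable<Object>>();
        for (int i = 0; i < n; i++) c.add(Executors.callable(runnable));
        for (Future<Object> f : pool.invokeAll(c)) {
            try { f.get(); }
            catch (ExecutionException ex) { ex.printStackTrace(); }
        }
    }
}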

The situation is not entirely hopeless. In my opinion, this kind of thread starvation should never occur, because there is always one thread that can contribute processing power toward executing the subtasks: the thread that is waiting for the subtasks to complete. Here's the implementation of invokeAll as it appears in the JDK:

    public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                FutureTask<T> f = new FutureTask<T>(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try { 
                        f.get(); 
                    } catch(CancellationException ignore) {
                    } catch(ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures) 
                    f.cancel(true);
        }
    }

This code does not use the current thread to do any of the work of invoking the callables. Below is a slightly modified version (I've added a line to the original and refactored it into a static method that we can put in the program) that uses the current thread to do any work that another thread hasn't already started. The newly added line is marked with a comment:

    public static <T> List<Future<T>> invokeAll(
            ExecutorService threadPool, Collection<Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                FutureTask<T> f = new FutureTask<T>(t);
                futures.add(f);
                threadPool.execute(f);
            }
            // force unstarted futures to execute using the current thread
            for (Future<T> f : futures) ((FutureTask<T>) f).run();
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try { 
                        f.get(); 
                    } catch(CancellationException ignore) {
                    } catch(ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures) 
                    f.cancel(true);
        }
    }

Using this version of invokeAll, the program does not experience thread starvation. If the thread pool is reduced in size to just one thread, the program runs to completion in about 11 seconds, because two threads are contributing to the work (the main thread and the thread from the pool), splitting the twenty seconds of sequential work between them.

I discussed this issue with Doug Lea, and he warned me that selecting tasks for efficient scheduling in a fork-join concurrency framework is not trivial; the standard solution is to have a double-ended queue for each worker thread, onto which it enqueues its subtasks. The worker removes the most recently generated task from this queue for itself to process, thereby simulating a depth-first execution strategy in the single-thread case. When a worker finds itself without any work to do, it steals work from the other end of another worker's queue. That is, it steals one of the least-recently created (coarse-grained) subtasks. In addition, it is beneficial to have a mechanism to avoid the queue altogether at the bottom of the call chain. Doug told me that this strategy was pioneered by the Cilk work, but I first learned about it ten years earlier reading WorkCrews: An Abstraction for Controlling Parallelism by Mark T. Vandervoorde and Eric S. Roberts. My version provides exactly this behavior, but with a much simpler implementation: the invocation of run executes one of the tasks most recently generated by the current thread, and when a thread has no more work to do, it removes work from the queue of the underlying ExecutorService, which is a FIFO queue, so it takes the least-recently generated task of all workers. On the other hand, because this implementation shares a single queue among all worker threads, there may be additional synchronization overhead compared to the WorkCrews/Cilk solution.
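To make the deque discipline concrete, here is a deliberately simplified sketch of the data structure Doug describes (coarse-grained locking for clarity; the real WorkCrews/Cilk implementations use far cleverer, mostly lock-free deques):

import java.util.ArrayDeque;
import java.util.Deque;

// Simplified per-worker deque: the owner pushes and pops at the head
// (LIFO, so it processes its most recently forked, fine-grained tasks
// first, simulating depth-first execution), while idle workers steal
// from the tail (FIFO, taking the oldest, coarsest-grained task).
class WorkStealingDeque<T> {
    private final Deque<T> tasks = new ArrayDeque<T>();

    synchronized void pushLocal(T task) { tasks.addFirst(task); } // owner forks a subtask
    synchronized T popLocal()  { return tasks.pollFirst(); }      // owner takes the newest
    synchronized T steal()     { return tasks.pollLast(); }       // thief takes the oldest
}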

It is possible to use the existing concurrency utilities to work around the problem, if you don't mind the task scheduling being far from optimal. You can do that by combining a SynchronousQueue with the CallerRunsPolicy on a ThreadPoolExecutor: a SynchronousQueue accepts a task only when a worker thread is immediately available to take it, and when none is, the rejection handler makes the submitting thread run the task itself, so the program can always make progress:

static ThreadPoolExecutor threadPool =
  new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS,
                         new SynchronousQueue<Runnable>());
static {
    threadPool.setRejectedExecutionHandler(
        new ThreadPoolExecutor.CallerRunsPolicy());
}

Doug explained to me that the earlier public-domain version of the concurrency utilities had a full implementation of a framework for fork-join parallelism, but it was not included in JDK5:

"... The vast majority of such usages are nicest to support as "loop parallelism" utilities. And it is not so much that utilities based on FJ tasks are inconvenient to use that has kept them out, but instead uncertainty about basic APIs until closures are settled. All of the methods for aggregate operations on collections and arrays (applyToAll, map, reduce, mapReduce, findAny, findAll, etc) require function-type arguments (perhaps along with some sort of purity annotation as might be introduced via JSR305) that, depending on how things work out otherwise, would need to be introduced more generally."

Did you think I would get through an entire blog post without mentioning Closures?

Monday, November 13, 2006

Closures Esoterica: completion transparency

The Closures for Java draft specification (currently at v0.3) is the product of a lot of work. Everything in the spec has been discussed and scrutinized before being placed into the specification, and is there for a reason. From your point of view, you have seen snapshots of our specification, with very little explanation of why things are the way they are. Changes from one revision to another may seem arbitrary. I am not writing detailed notes on the progress and evolution of the specification and its prototype, because there is too much to explain and it would leave too little time for me to work on the specification and prototype. I am, after all, working on this on my own time. I suspect few people would care for that level of detail anyway. Much of the work on the specification takes place during face-to-face meetings, after which I update the specification to reflect our decisions. Some issues get resolved through email discussions. We've been keeping an eye on the comments on this blog, various message boards, mailing lists, and elsewhere for input, and meeting with interested folks, and that has been a very useful part of the process. Thank you for your help!

I'm preparing the next revision of the specification, and we just resolved an issue by email. Unless you know what the issue is, the change in the upcoming version of the specification will appear totally arbitrary. Because the issue was resolved by email, I have a nice record of the problem and its resolution. Here is an edited excerpt of the start of our discussion:

We'd like programmers to be able to define their own control constructs. One thing that would make this easier would be if programmer-defined control constructs act like built-in ones for the purposes of handling reachability of statements (and "can complete normally"). That's why we added the bottom type java.lang.Unreachable to the specification. But just having that isn't enough. Watch as I try to define a withLock method that is completion transparent.

First, ignoring reachability, the library writer might define

<throws E> void withLock(Lock lock, {=>void throws E} block) throws E { ... }

this achieves exception transparency, but not completion transparency. If the block returns a value, this would work:

<T, throws E> T withLock(Lock lock, {=>T throws E} block) throws E { ... }

this works because a closure that can't complete normally can be converted, via the closure conversion, to {=>Unreachable}. In practice, any specific invocation of withLock will either have a block that doesn't return anything (i.e. =>void) or one that can't complete normally (i.e. =>Unreachable). You might think, therefore, that the library writer need only write these two overloaded methods to achieve completion transparency, and let overload resolution take care of the rest.

Unfortunately it isn't that simple. With these two methods, an invocation of withLock using a closure that can't complete normally can be an invocation of either of these methods. That's because the closure conversion can convert a closure that results in Unreachable to an interface whose method returns void. Since both are applicable, the compiler must select the most specific. Neither of these methods is more specific than the other (the closures are unrelated, given our scheme of mapping function types to interfaces), so the invocation is ambiguous.

I don't propose that we mess with the specification for "most specific". I'm afraid that would be a disaster, though maybe you can reassure me that it isn't. Instead, I propose that the closure conversion be allowed to convert a closure that results in void to an interface whose method's return type is java.lang.Void. The generated code would naturally return a null value. Then the library writer would write only the second version, above, and it would work both for the void-returning case and the Unreachable-returning case. I think being able to write control abstractions as a single method (instead of two overloadings) is a significant advantage. Additionally, this API is more flexible because it can be used by programmers to pass a value back through from the closure to the caller.

We discussed this issue and found the proposed resolution our best option. The next version of the proposal will include in the closure conversion a provision allowing conversion of a closure that returns void to an interface type whose method returns java.lang.Void. You can see hints in this email thread that the syntax of a function type has changed slightly (the throws clause has moved inside the curly braces), and that a function type is now specified to be an interface type, rather than having its own type rules. The specification is getting simpler, which is definitely a move in the right direction!
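The flavor of the resolution can be seen by analogy with today's APIs, which already use java.lang.Void this way: a Callable<Void> simply returns null, which is just what the generated code for a converted void-returning closure would do. Here is a rough analogue in current Java (withLock below is an ordinary method, not the proposed closure syntax, and Callable's blanket throws Exception loses the exception transparency that the proposal's throws E provides):

import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class WithLockDemo {
    // One generic method serves both value-returning and "void" blocks:
    // Callable<Void> plays the role of a void closure converted to an
    // interface whose method returns java.lang.Void.
    static <T> T withLock(Lock lock, Callable<T> block) throws Exception {
        lock.lock();
        try {
            return block.call();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        Lock lock = new ReentrantLock();
        // Value-returning use:
        final int n = withLock(lock, new Callable<Integer>() {
            public Integer call() { return 42; }
        });
        // "void" use: the block returns null, as generated code would.
        withLock(lock, new Callable<Void>() {
            public Void call() { System.out.println(n); return null; }
        });
    }
}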

Sunday, November 05, 2006

Reified Generics for Java

Many people are unsatisfied with the restrictions caused by the way generics are implemented in Java. Specifically, they are unhappy that generic type parameters are not reified: they are not available at runtime. Generics are implemented using erasure, in which generic type parameters are simply removed at runtime. That doesn't render generics useless, because you get typechecking at compile-time based on the generic type parameters, and also because the compiler inserts casts in the code (so that you don't have to) based on the type parameters.

Generics are implemented using erasure as a response to the design requirement that they support migration compatibility: it should be possible to add generic type parameters to existing classes without breaking source or binary compatibility with existing clients. I wrote about this two years ago. Migration compatibility is exploited widely in JDK5; all of the collection classes and interfaces use generics, yet existing code using collections continues to work. Without migration compatibility, the collection APIs could not have been retrofitted to use generics; we would probably have added a separate, new set of collection APIs that use generics. That was the approach taken by C# when generics were introduced, but Java did not take it because of the huge amount of pre-existing Java code using collections.

While solving one set of problems, erasure adds a set of its own problems. For a type parameter T, you can't write a class literal T.class. You can't use instanceof to test if an object is of type T. And you can't create an array of T. But it gets worse. You also can't write class literals for generic types like List<String>.class, or test if an object is an instanceof List<String>, or create an array of List<String>. The implementation of generics using erasure also causes Java to have unchecked operations, which are operations that would normally check something at runtime but can't do so because not enough information is available. For example, a cast to the type List<String> is an unchecked cast, because the generated code checks that the object is a List but doesn't check whether it is the right kind of list.
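A few lines make these restrictions concrete (a sketch; each commented-out line is rejected by javac under erasure, and the final cast compiles only with an unchecked warning):

import java.util.List;

class Erasure<T> {
    void restrictions(Object o) {
        // Class<T> c = T.class;                   // error: no class literal for a type variable
        // boolean b1 = o instanceof T;            // error: T is not reifiable
        // T[] a = new T[10];                      // error: generic array creation
        // boolean b2 = o instanceof List<String>; // error: List<String> is not reifiable
        List<String> l = (List<String>) o;  // unchecked cast: only "is it a List?"
                                            // can be checked at runtime
    }
}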

It isn't too late to add reified generics to Java. There are two basic approaches. The first uses the language as it exists today but adds the generic type information at runtime. In the ideal world, we wait until every bit of Java code in the world has been modified to use generics safely, and then go through a transition in which we start recording type parameters at runtime by using a new javac and VM. There are two main difficulties with this approach. First, it is not compatible. It is not source compatible because you would no longer be allowed to use raw types (i.e., it is impractical to wait until all code has been generified); it is not binary compatible because new clients would invoke new kinds of constructors for generic classes that also record the type parameters, but existing classes do not have those constructors. (A hybrid approach is being investigated in which the system records type parameters only when they are available, but I haven't yet seen a workable proposal along these lines.) The second problem is more insidious: lots of existing code uses generics but uses them in an unsafe way. For example, I have seen code that creates an ArrayList<Object>, but later casts it to a List<String> where the author knows that it only contains objects of type String. I would not be surprised to find such code in the JDK. Such code must fail at runtime when generics are reified, so some existing (but working) code would be broken.
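The kind of code I mean looks something like this (a made-up but representative example; it compiles with only an unchecked warning and works under erasure, but the cast would fail at runtime if type parameters were reified):

import java.util.ArrayList;
import java.util.List;

class UnsafeButWorking {
    public static void main(String[] args) {
        List<Object> list = new ArrayList<Object>();
        list.add("hello");                 // the author "knows" only Strings go in
        Object o = list;
        @SuppressWarnings("unchecked")
        List<String> strings = (List<String>) o; // unchecked: no runtime check under
                                                 // erasure, so this works today, but a
                                                 // reified runtime would reject the cast
        System.out.println(strings.get(0).length());
    }
}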

The other approach modifies the language so that the declaration of a generic parameter distinguishes reified type parameters from erased ones. It is a pure extension of the language. The existing syntax for generics would continue to designate generics by type erasure, but a newly introduced syntax would be used to declare reified type parameters. Perhaps something like this:

interface NewCollection<class E> extends Collection<E> { ... }
interface NewList<class E> extends NewCollection<E>, List<E> { ... }

The rules for these reified type parameters are straightforward. When using a reified generic class, the type argument has to be a reified type. You would not be allowed to create a NewCollection in its raw form. Code that uses only reified types would never get an unchecked warning from the compiler, because the compiler can always generate the correct checking code. If you have a reference of type Collection that refers to a NewCollection<String>, you would get a runtime error if you attempt to put anything other than a String into it. In this sense reified collections are like java.util.Collections.checkedCollection, only better because the element type can itself be generic and the guarantees are stronger. You can even create an array of E inside an implementation of this type. But there is a disadvantage: the strategy requires a new set of reified Collections APIs parallel to the existing erasure-based ones. This is why C# introduced new collections APIs when they introduced generics.
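For comparison, here is how the checked-collection wrappers behave today (a sketch using Collections.checkedList):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class CheckedDemo {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> strings =
            Collections.checkedList(new ArrayList<String>(), String.class);
        List raw = strings;           // launder through the raw type
        raw.add(Integer.valueOf(42)); // ClassCastException at insertion time, just as
                                      // a reified NewCollection<String> would reject
                                      // a non-String element
    }
}

Note that checkedList can check only the erased element class (String.class here); it cannot enforce an element type like List<String>, which is exactly the weakness that reified type parameters would remove.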

As the declarations of NewCollection and NewList above illustrate, the new reified collection interfaces could be extensions of the existing collection interfaces. Consequently, existing APIs that receive old collections can be passed new collections. For example, you could pass a NewCollection<String> to an API that is declared to receive a Collection<String>. In addition, the new reified collection classes (that is, the implementations) could be extensions of the existing collection classes. That allows the implementation of the old and new collection classes to be shared, and provides a nice migration path. Existing APIs that return old collections can be modified to return new collections. Over time, most libraries can be migrated to use the new APIs without breaking backward compatibility (though it would break unsafe clients). In this way we can have a more gradual migration than would be possible with the first approach.

I don't mean to trivialize the work involved: this requires a significant revision of the VM and language specifications and a significant effort from the teams who implement VMs, Java compilers (i.e., javac and the HotSpot JIT), and the core JDK libraries. From the programmer's point of view, however, the language change is small and mostly involves removing restrictions.

Approaching the problem this way has a secondary benefit. Because the new reified collection interfaces don't exist yet, it is possible for methods to be added to them when they are introduced. Perhaps sorting methods belong in NewList? If reified generics are added at the same time as (or after) closures, a number of useful methods could be added to take advantage of closures. Filters, mappers, reducers, and visitors are just some of the ideas.