Pseudo Parallel Streams in JDK 1.7

Recently I was working on some Java 1.7 code, trying to improve its performance a bit. One part of the code traversed a collection of thousands of objects and enhanced them with database lookups. Each database lookup could return about 1,000 detail records at a time. The logic was to pull a batch of detail records, enhance the corresponding objects, and then move on to the next batch in sequence. What I wanted was to process the batches in parallel as much as possible. The following solution was low impact and produced a significant performance improvement.

The Approach

Thinking a bit more functionally and drawing from JDK 8 streams, I took the following approach. I decided to break the list into chunks, throw them all into a thread pool with a function to handle them, and wait for them all to complete. I wasn’t supposed to put much time or effort into this refactor, so it all had to be dead simple.

Phase I: Support Code

First I needed to segment the collection into smaller, thread-safe parts. I didn’t have an actual Collection, but I did have an Iterator available, so I decided to segment the Iterator into sub-Lists that I could safely use in parallel.

First, let’s write a function to get the next N elements from an Iterator:


public static <T> Iterator<T> next(final Iterator<? extends T> iterator,
                                   final int count) {
  return new Iterator<T>() {
    private int position = 0;

    @Override
    public boolean hasNext() {
      return iterator.hasNext() && position < count;
    }

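    @Override
    public T next() {
      // Honor the Iterator contract: never read past the batch limit
      // (uses java.util.NoSuchElementException).
      if (!hasNext()) {
        throw new NoSuchElementException("batch exhausted");
      }
      T next = iterator.next();
      position++;
      return next;
    }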

    @Override
    public void remove() {
      throw new UnsupportedOperationException("no removal");
    }
  };
}

Now let’s collect those elements into a list:


public static <T> List<T> collect(Iterator<T> iterator) {
  List<T> list = new ArrayList<T>();
  while (iterator.hasNext()) {
    list.add(iterator.next());
  }
  return list;
}
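As a rough sketch of how these two helpers combine, the following standalone example slices an Iterator into fixed-size lists. The class name ChunkDemo and the chunk helper are hypothetical and exist only for illustration; next and collect are repeated so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ChunkDemo {

  // Same idea as next() above: a view over at most `count` elements of the source.
  static <T> Iterator<T> next(final Iterator<? extends T> iterator, final int count) {
    return new Iterator<T>() {
      private int position = 0;

      @Override
      public boolean hasNext() {
        return position < count && iterator.hasNext();
      }

      @Override
      public T next() {
        T next = iterator.next();
        position++;
        return next;
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException("no removal");
      }
    };
  }

  // Same as collect() above: drains an Iterator into a new list.
  static <T> List<T> collect(Iterator<T> iterator) {
    List<T> list = new ArrayList<T>();
    while (iterator.hasNext()) {
      list.add(iterator.next());
    }
    return list;
  }

  // Hypothetical helper (not from the post): slices the source into lists
  // of at most `batchSize` elements, in source order.
  static <T> List<List<T>> chunk(Iterator<? extends T> source, int batchSize) {
    List<List<T>> chunks = new ArrayList<List<T>>();
    while (source.hasNext()) {
      // Explicit type argument keeps the call independent of target typing.
      Iterator<T> view = ChunkDemo.<T>next(source, batchSize);
      chunks.add(collect(view));
    }
    return chunks;
  }

  public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
    System.out.println(chunk(numbers.iterator(), 3)); // prints [[1, 2, 3], [4, 5, 6], [7]]
  }
}
```

Because every chunk is copied into its own list on the calling thread, the worker threads never touch the shared source Iterator.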

Creating a Pool of Threads

In 1.7 the ExecutorService is already available, and I could ask the runtime for the processor count to size the pool:


private static final ExecutorService EXECUTOR_SERVICE =
    Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
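One thing to keep in mind with a static pool like this: the default thread factory creates non-daemon threads, so the pool keeps the JVM alive until shutdown() is called. Below is a minimal sketch of one way around that, assuming it is acceptable for workers to be abandoned at JVM exit. The names DaemonPool, newDaemonPool and workerIsDaemon are hypothetical and not part of the original code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class DaemonPool {

  // Hypothetical factory method: a fixed pool whose workers are daemon
  // threads, so an idle pool does not block JVM exit.
  static ExecutorService newDaemonPool(int threads) {
    return Executors.newFixedThreadPool(threads, new ThreadFactory() {
      @Override
      public Thread newThread(Runnable task) {
        Thread thread = new Thread(task);
        thread.setDaemon(true);
        return thread;
      }
    });
  }

  // Runs one task in the pool and reports whether its worker is a daemon thread.
  static boolean workerIsDaemon(ExecutorService pool) {
    try {
      return pool.submit(new Callable<Boolean>() {
        @Override
        public Boolean call() {
          return Thread.currentThread().isDaemon();
        }
      }).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = newDaemonPool(Runtime.getRuntime().availableProcessors());
    System.out.println(workerIsDaemon(pool)); // prints true
    pool.shutdown();
  }
}
```

Daemon workers can be cut off mid-task when the JVM exits, so for work that must finish, calling shutdown() explicitly is probably the safer choice.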

A Consumer

Drawing from Java 8, I also added a Consumer interface:


public interface Consumer<T> {
  void accept(T consumable);
}

 

Phase II: Using the Chunks

With the support code in place, it was simple to finish this:


public static <T> void parallelBatch(Iterator<? extends T> iterator,
                    final Consumer<Iterator<? extends T>> consumer,
                    int batchSize)
                    throws InterruptedException, ExecutionException {
  List<Callable<Boolean>> callables = new ArrayList<Callable<Boolean>>();
  // Slice the source on the calling thread so workers never share the Iterator.
  while (iterator.hasNext()) {
    Iterator<T> i = next(iterator, batchSize);
    final List<T> list = collect(i);
    callables.add(new Callable<Boolean>() {
      @Override
      public Boolean call() throws Exception {
        consumer.accept(list.iterator());
        return true;
      }
    });
  }
  // invokeAll blocks until every batch has completed or failed.
  List<Future<Boolean>> futures = EXECUTOR_SERVICE.invokeAll(callables);
  for (Future<Boolean> future : futures) {
    // get() rethrows a batch failure wrapped in an ExecutionException.
    future.get();
  }
}

Conclusion

With the above code in place, I could easily switch over the enrichment logic. I moved the operations into the Consumer and then called parallelBatch on the iterator, passing in the Consumer. Sure enough, I saw a nice speedup.
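To make that last step concrete, here is a condensed, self-contained sketch of what such a call might look like. It is not the original enrichment code: the Record class, the enrichAll and nextBatch helpers, and the "detail-" placeholder standing in for the database lookup are all hypothetical. The pool is also passed in as a parameter and shut down afterwards, instead of living in a static field.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class ParallelBatchDemo {

  interface Consumer<T> {
    void accept(T consumable);
  }

  // Hypothetical stand-in for the objects being enriched.
  static class Record {
    final int id;
    String detail; // written by a worker; visible to the caller after Future.get()
    Record(int id) { this.id = id; }
  }

  // Condensed variant of next() + collect(): drains up to `count` elements.
  static <T> List<T> nextBatch(Iterator<? extends T> iterator, int count) {
    List<T> batch = new ArrayList<T>();
    while (batch.size() < count && iterator.hasNext()) {
      batch.add(iterator.next());
    }
    return batch;
  }

  static <T> void parallelBatch(Iterator<? extends T> iterator,
                                final Consumer<Iterator<? extends T>> consumer,
                                int batchSize, ExecutorService pool)
      throws InterruptedException, ExecutionException {
    List<Callable<Boolean>> callables = new ArrayList<Callable<Boolean>>();
    while (iterator.hasNext()) {
      final List<T> list = ParallelBatchDemo.<T>nextBatch(iterator, batchSize);
      callables.add(new Callable<Boolean>() {
        @Override
        public Boolean call() {
          consumer.accept(list.iterator());
          return true;
        }
      });
    }
    for (Future<Boolean> future : pool.invokeAll(callables)) {
      future.get(); // rethrows any failure from a batch
    }
  }

  // Enriches every record and returns how many batches reached the consumer.
  static int enrichAll(List<Record> records, int batchSize) {
    final AtomicInteger batches = new AtomicInteger();
    ExecutorService pool =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    try {
      parallelBatch(records.iterator(), new Consumer<Iterator<? extends Record>>() {
        @Override
        public void accept(Iterator<? extends Record> batch) {
          batches.incrementAndGet();
          while (batch.hasNext()) {
            Record record = batch.next();
            record.detail = "detail-" + record.id; // placeholder for the database lookup
          }
        }
      }, batchSize, pool);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
    return batches.get();
  }

  public static void main(String[] args) {
    List<Record> records = new ArrayList<Record>();
    for (int id = 1; id <= 10; id++) {
      records.add(new Record(id));
    }
    int batches = enrichAll(records, 4);
    // prints "3 batches, last detail = detail-10"
    System.out.println(batches + " batches, last detail = " + records.get(9).detail);
  }
}
```

Each Record belongs to exactly one batch, so the workers never write to the same object, which is what keeps the Consumer free of locking.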

If you want to see the code all coming together, you can take a look at my almost-functional GitHub project.
