Parallel map: more JRuby concurrency mischief

Like my last post, this is more for my future benefit, but if anybody else finds it useful then that’s cool too. Unlike the last one, the fruits of my tinkering yielded a nice linear speedup.

Ok, let’s parallelize Array#map. We’ll break down the task as follows:

  1. Split the array into chunks
  2. Execute the chunks asynchronously, in parallel, waiting for them all to complete
  3. Merge the chunks into a new array and return it

How many chunks is optimal? There’s no definitive answer; in the past I’ve opted for a very large number of small sub-arrays, e.g. for concurrent divide & conquer reductions where the minimal array length was some low power of the number of processors (I’ve played with associative reduction algorithms before). For our #pmap method I’m just going to split the original array into as many chunks as there are logical cores on my machine. How do you find that out in JRuby? Java to the rescue again:

require 'java'

$cores = java.lang.Runtime.getRuntime.availableProcessors
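
As an aside, Enumerable#each_slice is what will do the actual chunking further down. On a hypothetical 4-core machine, integer division means a 10-element array actually yields five slices, one more than there are cores:

(1..10).to_a.each_slice(10 / 4).to_a
# => [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]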

Now we need a pool of workers to assign tasks to. As parallel mapping is strictly CPU-bound, a thread pool with a fixed thread count but an unbounded work queue is probably most appropriate:

java_import java.util.concurrent.Executors

executor = Executors.newFixedThreadPool($cores)

That’s our ExecutorService up & running; we just need to do a bit of housekeeping before we can write our Array#pmap method. This is where Java’s baroque boilerplate rears its ugly head (wouldn’t it be nice if Executors could take lambdas as arguments for mass invocation?). Basically we implement the Callable interface; I instantiate my Task implementation with a block, which the executor calls when the task runs:

java_import java.util.concurrent.Callable

class Task
  include Callable

  # Wrap a Ruby block so the executor can run it as a Callable
  def initialize(&block)
    @work = block
  end

  def call
    @work.call
  end

end

So now we’re good to go. Array#pmap here takes an executor as its first argument because I didn’t want the class to be responsible for starting / shutting down the work queue, but that’s just an implementation decision.

Once the original array is mapped to a set of Task instances, they can be handed to the executor. I call ExecutorService#invokeAll because it blocks until all the submitted work is done. It returns a list of Futures which can be dereferenced immediately (we want the method to return something!):

class Array

  def pmap(executor, &block)
    # Parcel out the work into chunks to be executed sequentially;
    # never slice by zero when the array is smaller than $cores
    chunk_size = [size / $cores, 1].max
    tasks = each_slice(chunk_size).map do |slice|
      Task.new { slice.map(&block) }
    end

    # Execute them all, blocking until they're done
    futures = executor.invokeAll(tasks)

    # Dereference and merge all the Futures
    futures.reduce([]) { |memo, future| memo + future.get }
  end

end

So does all that tomfoolery actually buy you any more performance? Time for a highly unscientific benchmark, incrementing an array of the first million Fixnums:
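
For reference, the driver script looked roughly like this (a hedged reconstruction from the output below; the exact map block is my assumption, and the full script is linked at the end):

arr = (1..1_000_000).to_a

p 'Splendid new Array#pmap'
start = Time.now
arr.pmap(executor) { |n| n + 1 }
p "That took #{Time.now - start}s"

p 'Plain old sequential Array#map'
start = Time.now
arr.map { |n| n + 1 }
p "That took #{Time.now - start}s"

executor.shutdown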

chris@think-chris:~/Documents/Experimentation$ jruby pmap.rb 
"Splendid new Array#pmap"
"That took 0.287166921s"
"Plain old sequential Array#map"
"That took 0.537166921s"

Not bad: that’s roughly a 1.9x speedup on a Sandy Bridge i5 with 2 cores and 4 CPU threads, which is close to linear for the 2 physical cores.

More important than a cheesy parallel mapping implementation, I’ve learned (or is that re-learned?) two axioms about playing with concurrency in JVM-hosted languages:

  1. As in the previous post, executors expect some kind of anonymous inner class as an argument in the absence of closures. You need to be aware of the cost of converting a Ruby closure to a Java Runnable, Callable, Future etc.; think of I/O-bound problems where allocating extra objects for each request/event is almost certainly not a good thing. You’ll definitely save time writing in straight Java, as you won’t have to worry about whether the closure you just used is going to throw all your performance gains out the window.
  2. Don’t discount the JVM’s ability to make a fool of your benchmarks. The one above isn’t worth the screen real estate it occupies, given its simplicity and its duration relative to the startup / shutdown time of the VM. Both JIT and server modes take time to hit a ‘quiescent state’ where performance stabilises. Don’t just look at wall-time; profile your code properly (see the warm-up sketch after this list). Lots has been said about this elsewhere so I won’t rehash what others have articulated better, but be aware that HotSpot has its own performance quirks.
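
By warm-up I mean something like this (a hedged sketch reusing the arr and executor from the benchmark above; the iteration count is arbitrary):

# Give HotSpot a chance to JIT-compile the hot paths before measuring
10.times { arr.pmap(executor) { |n| n + 1 } }

start = Time.now
arr.pmap(executor) { |n| n + 1 }
p "Steady-state run took #{Time.now - start}s"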

The example in its entirety is here.

Easy thread safety with JRuby

EDIT: Beauty, as they say, is pain. In exchange for liberating your code from locks, piling the work onto a queue is roughly 10x slower than a traditional lock-based approach. A quick profile suggests that the expense of block creation is non-trivial. It’s still nicer to look at though, right?

More for my benefit, but it’s handy that JRuby lets you use asynchronous Java queues. Want to make something like this thread-safe?

class Something

  def initialize(value)
    @value = value
  end

  # Not thread-safe: += is a non-atomic read-modify-write
  def inc
    @value += 1
  end

  def dec
    @value -= 1
  end

end

Just wrap the assignment in a Runnable (JRuby coerces blocks/Procs/lambdas into Runnables for you) and submit it to a single-threaded Executor. All calls to inc and dec then execute one at a time, in order:

require 'java'

java_import java.util.concurrent.Executors

class Something

  def initialize(value)
    @queue = Executors.newSingleThreadExecutor
    @value = value
  end

  # Each mutation is queued as a Runnable; the single worker thread
  # applies them one at a time, in submission order
  def inc
    @queue.execute { @value += 1 }
  end

  def dec
    @queue.execute { @value -= 1 }
  end

  # NB: call this explicitly when you're done; Ruby won't invoke it for you
  def finalize
    @queue.shutdown
  end
end

Tasks are applied FIFO and @value stays consistent without locks. Sound familiar?
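
A quick usage sketch to close (hedged: the thread and iteration counts are my own, and peeking at @value via instance_variable_get is only reasonable because the queue is drained first):

java_import java.util.concurrent.TimeUnit

s = Something.new(0)

# Hammer the object from several Ruby threads at once
threads = 4.times.map do
  Thread.new { 10_000.times { s.inc } }
end
threads.each(&:join)

# Everything has been submitted; drain the queue before reading the result
q = s.instance_variable_get(:@queue)
q.shutdown
q.awaitTermination(10, TimeUnit::SECONDS)

p s.instance_variable_get(:@value) # => 40000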