Thursday, June 5, 2008

Multicore Counter

Like sorting, thread safe tallying is pretty straightforward. Or at least it should be. If your coding in any JVM based language, it should be virtually free. The java.util.concurrent package has been part of the JDK for a few years now and provides very easy to use primitives that are almost perfect for tallying. Almost.

There are two parts to this puzzle. The first part is, you need a thread safe map (from strings to accumulators), and the second part is, you need an accumulator with atomic increment/add functions so that simultaneous updates to an accumulator will not collide. These two parts exist in the form of java.util.concurrent.ConcurrentHashMap and java.util.concurrent.atomic.AtomicInteger. Putting it all together with Scala's MapWrapper for Java maps, you get this simple solution:

class Counter extends MapWrapper[String,AtomicInteger] {
   override val underlying = new ConcurrentHashMap[String,AtomicInteger]

The problem is, that doesn't work as expected. When the Java concurrent classes are taken together with a teaspoon of Scala's syntactic sugar for maps, you can get yourself into some unexpected, unsynchronized, hot water. Normally, Scala offers a couple of sweet shortcuts for dealing with maps (or any other objects that adopt the same protocol), as this code demonstrates:

object Test extends Application {
    val m = new Counter              // create an empty counter
    m("a") = new AtomicInteger(4)    //"a" -> 4
    m("a") += 3                      //"a" -> 7

What's really happening under the covers is something like this:

object Test extends Application {
   val m = Map[String,Int]()
   m.update("a", new AtomicInteger(4))
   m.update("a", m.apply("a") + 3)

which brings us straight to our first problem: you can't use arithmetic operators with AtomicIntegers. You could extend AtomicInteger with a Scala class that defines +, but an even more incidious problem lurks just beyond that: you can't use Scala's compound assignment operator (<op>=) syntactic sugar and expect atomicity. At first it seemed like you could override whatever method Scala's compiler substitutes for += on a MapWrapper, and have it call the atomic ConcurrentMap.replace method, instead of Map.put, but as you can see, there is no single method. The compiler injects two method calls in place of +=. The update and apply are performed seperately without synchronization. So while it would be nice to use this += syntactic sugar, it's out. Instead we're going to need do define a new method that initializes or increments an accumulator with guaranteed atomicity, something like:

def inc(key:String, amount:Int) = {
   var oldCount = underlying.putIfAbsent(key, new AtomicInteger(amount))
   if(oldCount == null) amount
   else oldCount.addAndGet(amount)

def inc(key:String):Int = inc(key,1)

So that works fine for counters that have Integer range accumulators, how about when you need the range of a Long? Easy. Generics to the rescue. Right? No. No? Crap. In Java there is no way to parameterize your type over the set of {AtomicInteger, AtomicLong} so that your generic type can use the addAndGet method, for two reasons.

  1. they each sepearately define an addAndGet method, not inherit its declaration from a common ancestor, and
  2. they each take an argument of a different fixed type

In Scala you can get around these problems using either Structural Types or Implicit Type Parameters (a.k.a. the Poor Man's Type Classes). Both of these solutions work to some limited extent for this problem, but both also came with too much conceptual and computational overhead for my needs. Structural types, I was told, are implemented using Java reflection, which is a huge performance drain, and implicit type parameters require implicit objects to be visible through your code, and extra implicit argument lists on many function definitions. Way too much work for too little gain.

So instead of parameterizing over the accumulator type, I decided to just stick with Long accumulators for all cases. Even using AtomicLongs proved more trouble that it was worth, causing unneeded complexity either in the class definition of Counter, or externally, everywhere the accumulators were used in the code. So ultimately, I bit the bullet and just implemented an atomic increment method on Longs. Here's the finished product:


import scala.collection.jcl.MapWrapper
import java.util.concurrent.ConcurrentHashMap

import Config.{counters,segments}

/** a Map backed by a java.util.concurrent.ConcurrentHashMap with
 *  a simple atomic increment method 
class Counter(override val underlying: ConcurrentHashMap[String,Long])
        extends MapWrapper[String,Long] {
    def this() = this(new ConcurrentHashMap[String,Long](counters,0.75f,segments))
    def this(initSize: Int, segments: Int) = 
            this(new ConcurrentHashMap[String,Long](initSize,0.75f,segments))

    /** atomically initializes or increments the counter at key by one
     *  @return the new count */
    def inc(key:String):Long = inc(key,1)

    /** atomically initializes or increments the counter at key by the specified amount
     *  @return the new count */
    def inc(key:String, amount:Int) = {
        var oldCount = 0L;
        if(underlying.containsKey(key) || 
                (underlying.putIfAbsent(new String(key), amount) != 0)) {
            do {
                oldCount = underlying.get(key)
            } while(!underlying.replace(key, oldCount, oldCount + amount))
        oldCount + amount

Seems like an awful long journey for so little code, but it was worth it. Writing this little block of code gave me a much better understanding of strengths and weaknesses of Scala.

1 comment:

  1. This comment has been removed by a blog administrator.