Thursday, June 5, 2008

Multicore Counter

Like sorting, thread-safe tallying is pretty straightforward. Or at least it should be. If you're coding in any JVM-based language, it should be virtually free. The java.util.concurrent package has been part of the JDK for a few years now and provides easy-to-use primitives that are almost perfect for tallying. Almost.

There are two parts to this puzzle. First, you need a thread-safe map (from strings to accumulators); second, you need an accumulator with atomic increment/add operations, so that simultaneous updates to an accumulator don't collide. These two parts exist in the form of java.util.concurrent.ConcurrentHashMap and java.util.concurrent.atomic.AtomicInteger. Putting it all together with Scala's MapWrapper for Java maps, you get this simple solution:

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.jcl.MapWrapper

class Counter extends MapWrapper[String,AtomicInteger] {
   override val underlying = new ConcurrentHashMap[String,AtomicInteger]
}

The problem is, that doesn't work as expected. When the Java concurrent classes are taken together with a teaspoon of Scala's syntactic sugar for maps, you can get yourself into some unexpected, unsynchronized hot water. Normally, Scala offers a couple of sweet shortcuts for dealing with maps (or any other objects that adopt the same protocol), as this code demonstrates:

object Test extends Application {
    val m = new Counter              // create an empty counter
    m("a") = new AtomicInteger(4)    //"a" -> 4
    m("a") += 3                      //"a" -> 7
}

What's really happening under the covers is something like this:

object Test extends Application {
   val m = new Counter              // same counter as before
   m.update("a", new AtomicInteger(4))
   m.update("a", m.apply("a") + 3)
}

which brings us straight to our first problem: you can't use arithmetic operators with AtomicIntegers. You could extend AtomicInteger with a Scala class that defines +, but an even more insidious problem lurks just beyond that: you can't use Scala's compound assignment operator (<op>=) syntactic sugar and expect atomicity. At first it seemed like you could override whatever method Scala's compiler substitutes for += on a MapWrapper, and have it call the atomic ConcurrentMap.replace method instead of Map.put, but as you can see, there is no single method. The compiler injects two method calls in place of +=. The update and apply are performed separately, without synchronization. So while it would be nice to use the += syntactic sugar, it's out. Instead we're going to need to define a new method that initializes or increments an accumulator with guaranteed atomicity, something like:

def inc(key:String, amount:Int): Int = {
   // putIfAbsent returns null if the key was absent and our new accumulator was stored
   val oldCount = underlying.putIfAbsent(key, new AtomicInteger(amount))
   if(oldCount == null) amount
   else oldCount.addAndGet(amount)
}

def inc(key:String): Int = inc(key,1)

So that works fine for counters with Integer-range accumulators, but what about when you need the range of a Long? Easy. Generics to the rescue. Right? No. No? Crap. In Java there is no way to parameterize your type over the set of {AtomicInteger, AtomicLong} so that your generic type can use the addAndGet method, for two reasons:

  1. they each define an addAndGet method separately, rather than inheriting its declaration from a common ancestor, and
  2. each version takes an argument of a different fixed type (int vs. long)

In Scala you can get around these problems using either Structural Types or Implicit Type Parameters (a.k.a. the Poor Man's Type Classes). Both of these solutions work to some limited extent for this problem, but both also come with too much conceptual and computational overhead for my needs. Structural types, I was told, are implemented using Java reflection, which is a huge performance drain, and implicit type parameters require implicit objects to be visible throughout your code, plus extra implicit argument lists on many function definitions. Way too much work for too little gain.
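
To make that concrete, here's a rough, untested sketch of the structural-type route (the object and method names are made up, not code from the project). Each call through the structural type is dispatched reflectively, which is the performance drain mentioned above, and the differing addAndGet signatures keep one structural type from covering both atomic classes:

import java.util.concurrent.atomic.AtomicInteger

object StructuralSketch extends Application {
    // anything with an Int-based addAndGet conforms; each call goes through reflection
    type IntAccumulator = { def addAndGet(delta: Int): Int }

    def bump(acc: IntAccumulator, amount: Int) = acc.addAndGet(amount)

    println(bump(new AtomicInteger(4), 3))   // 7; an AtomicLong would not conform (Long signature)
}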

So instead of parameterizing over the accumulator type, I decided to just stick with Long accumulators for all cases. Even using AtomicLongs proved more trouble than it was worth, causing unneeded complexity either in the class definition of Counter, or externally, everywhere the accumulators were used in the code. So ultimately, I bit the bullet and just implemented an atomic increment method on Longs. Here's the finished product:

package net.waldin.mc

import scala.collection.jcl.MapWrapper
import java.util.concurrent.ConcurrentHashMap

import Config.{counters,segments}

/** a Map backed by a java.util.concurrent.ConcurrentHashMap with
 *  a simple atomic increment method 
 */
class Counter(override val underlying: ConcurrentHashMap[String,Long])
        extends MapWrapper[String,Long] {
    def this() = this(new ConcurrentHashMap[String,Long](counters,0.75f,segments))
    def this(initSize: Int, segments: Int) = 
            this(new ConcurrentHashMap[String,Long](initSize,0.75f,segments))

    /** atomically initializes or increments the counter at key by one
     *  @return the new count */
    def inc(key:String):Long = inc(key,1)

    /** atomically initializes or increments the counter at key by the specified amount
     *  @return the new count */
    def inc(key:String, amount:Int) = {
        var oldCount = 0L
        // putIfAbsent returns the previous count, or null (which Scala unboxes to 0) if the
        // key was absent and our initial value was stored; new String(key) is a defensive
        // copy so the map key doesn't pin a possibly huge underlying input buffer
        if(underlying.containsKey(key) || 
                (underlying.putIfAbsent(new String(key), amount) != 0)) {
            // retry loop: replace() only succeeds if no other thread changed the count in between
            do {
                oldCount = underlying.get(key)
            } while(!underlying.replace(key, oldCount, oldCount + amount))
        }
        oldCount + amount
    }
}
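
A quick usage sketch (the key and sizes here are made up):

val counts = new Counter(1024, 16)   // initial capacity and concurrency level
counts.inc("/ongoing/")              // returns 1
counts.inc("/ongoing/", 2)           // returns 3
println(counts("/ongoing/"))         // prints 3, via the inherited Map apply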

Seems like an awfully long journey for so little code, but it was worth it. Writing this little block of code gave me a much better understanding of the strengths and weaknesses of Scala.

Monday, June 2, 2008

Multicore Sorter

Continuing on from my last post, I'll start with a review of the simplest of the three multicore abstractions: the Sorter. Sorting is a great candidate for parallel processing, as many sorting algorithms already divide and conquer their work. My approach was to borrow from two of these algorithms, QuickSort and MergeSort, to do just enough sorting, just in time. Here's how:

  1. partition the data to be sorted into evenly sized shards, one for each CPU core
  2. launch a worker thread to quicksort each shard
  3. when all the workers are complete, merge the sorted shards back together

This still leaves a large merge in a single thread as the last step. Luckily, the WideFinder 2 Benchmark doesn't require us to completely sort all the data. We just need the top ten items in each category. So given a set of sorted shards (say that fast a few times!), it's easy to iterate over them to produce a list of the overall top ten items. The implementation below does the parallel quicksort, and then, acting as an Iterator over the entire sorted collection, amortizes the work of merging the shards back together over the sequential access of the entire collection. Given that we only need 10 items, most of this work goes undone in our use of Sorter. Here's the code:

package net.waldin.mc

import scala.actors.Actor._

import Config.numSorters

class Sorter[T](iter: Iterator[T], size: Int, comp: (T,T)=>Int) extends Iterator[T] {
    val shardSize = Math.ceil(size.toDouble / numSorters).toInt
    val shardIdx = List.range(0, size, shardSize).toArray
    val array = new Array[T](size)
    var sorted = false
    final def shardOffset(shard:Int) = shard * shardSize
        
    object Comparator extends java.util.Comparator[T] {
        override def compare(a: T, b: T) = comp(a,b)
    }
    
    def sort {
        // copy the input iterator into the backing array
        iter.copyToArray(array,0)
    
        // create actors with array shards for quicksort
        val sorters = for(shard <- (0 until numSorters).toList) yield {
            scala.actors.Futures.future {
                val start = shardOffset(shard)
                val end = size min (start + shardSize)
                java.util.Arrays.sort(array, start, end, Comparator)
            }
        }

        // await completion of futures
        for(s <- sorters) s() 
        sorted = true
    }
    
    override def hasNext: Boolean = {
        if(!sorted) sort
        firstShardWithData.isDefined
    }
    
    // returns the index of the first shard that still has unconsumed elements, if any
    private def firstShardWithData = {
        var shard = 0
        while(shard < numSorters && 
                shardIdx(shard) == (size min shardOffset(shard + 1))) {
            shard += 1
        }
        if(shard == numSorters) None else Some(shard)
    }
    
    override def next(): T = {
        if(!sorted) sort
        firstShardWithData match {
            case None => throw new NoSuchElementException
            case Some(n) =>
                // merge step: scan the remaining shards for the smallest unconsumed element
                var shard = n
                var tryShard = shard + 1
                while(tryShard < numSorters) {
                    if(shardIdx(tryShard) < (size min shardOffset(tryShard + 1)) && 
                            comp(array(shardIdx(tryShard)), array(shardIdx(shard))) < 0){
                        shard = tryShard
                    }
                    tryShard += 1
                }
                // consume and return the head of the winning shard
                shardIdx(shard) += 1
                array(shardIdx(shard) - 1)
        }
    }
}
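
A quick usage sketch (made-up data; Config.numSorters, which Sorter reads, is assumed to be no larger than the input size):

import net.waldin.mc.Sorter

// sort 256 made-up keys in reverse lexical order and take the first ten
val keys = for(i <- List.range(0, 256)) yield "item" + i
val sorter = new Sorter(keys.elements, keys.length, (a: String, b: String) => b.compareTo(a))
println(sorter.take(10).toList)   // the ten lexically largest keys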

Sunday, June 1, 2008

Multicore WideFinding

Ok, so here is some actual code to speed up the widefinding across multiple processor cores. I've tried to abstract out the new stuff in such a way as to leave the main driver application looking somewhat like the original strawman impl in Scala, which in turn was made to resemble the original WideFinder 2 strawman impl in Ruby. It turns out the new stuff maps nicely onto the three problems mentioned earlier: matching, counting, and sorting, so sprinkled throughout the code below you'll see references to Matcher, Counter, and Sorter (classes from the net.waldin.mc package) where previously you saw actual regex matching, counting, and sorting code.

Another major difference between this version and the last is that it uses a regular expression to extract the log entries from the input, instead of doing this:

  • find line boundaries
  • split lines of text into whitespace-separated fields
  • remove some extraneous quotes and brackets
  • concatenate back together fields that contain spaces (like time and useragent)

A single regular expression can accomplish the same thing faster, all with one declarative statement. The regular expression is a doozy, but since Scala uses the Java regular expression engine, which is known to be full-featured and performant, we may as well take advantage of it. After all, this is exactly what regular expressions were meant to deal with, as long as the expression itself is optimized for speed, with minimal backtracking, and correctly handles all reasonable input. I just wish the expression itself were a little more readable by us non-machines. Hmm, I wonder if ... maybe later...

So here's the main driver with references to the abstracted out multicore functionality, and the gnarly regular expression:

package net.waldin.wf

import java.io.{FileInputStream,BufferedInputStream}

import scala.compat.Platform.currentTime
import scala.util.matching.Regex._

import net.waldin.mc.{Counter,Matcher,Sorter}

/** A normal HTML report: Top 10 popular URLs by hits and bytes, top 10 404s, *
 *  top 10 client IPs by hits, and the top 10 referrers.  Only, skip the HTML */
object WideFinder2c extends Application {
    val hits, bytes, s404s, refs = new Counter()
    val clients = new Counter(12*1024*1024,96)
    val logEntryRegex = 
            ("""(?m)\n(\S+) - \S+ \[[^\]]+\] "([^\\"]+(?:\\.[^\\"]*)*)" (\d+) """+
            """(-|\d+) "([^\\"]+(?:\\.[^\\"]*)*)" "([^\\"]+(?:\\.[^\\"]*)*)"$""").r
    val requestRegex = """GET ((/ongoing/When/\d+x/\d+/\d+/\d+/[^ .]+)|[^ ]+) """.r

    def record(host: String, article: String, ref: String) {
        if(article != null) {
            hits.inc(new String(article))
            clients.inc(host)
            if(ref.length() > 1 && !ref.startsWith("http://www.tbray.org/ongoing/")) 
                refs.inc(ref)
        }
    }

    def report(label: String, map: Counter, megabytes: Boolean) {
        printf("Top %s:\n", label)
        val keysByCount = new Sorter(map.keys, map.size, (a:String,b:String) => 
            (map(b) - map(a)) match {
                case 0 => a.compareTo(b)
                case n => if(n > 0) 1 else -1   // avoid Int overflow when counts differ by more than 2^31
            }
        )
        for(key <- keysByCount.take(10)) {
            val format = if(megabytes) "%10.1fM: %s\n" else "%11.0f: %s\n"
            val value: Double = if(megabytes) map(key)/(1024.0d*1024.0d) else map(key)
            printf(format, value, if(key.length>60) key.substring(0,60) + "..." else key)
        }   
        println
    }

    override def main(args: Array[String]) {
        new Matcher(new FileInputStream(args(0)), logEntryRegex, "\n", "", _ match {
            case Some(m) if(m.group(2).startsWith("GET")) => {
                def f(i:Int) = new String(m.group(i))
                val (host, request, status, size, ref) = 
                        (f(1), f(2), f(3).toInt, if(f(4)=="-") 0 else f(4).toInt, f(5))
                // avoid MatchIterator.matchData because it forces each match, adding lots of overhead
                val reqMatch = requestRegex.findAllIn(request)
                if(reqMatch.hasNext) {
                    val uri = new String(reqMatch.group(1))
                    // workaround for http://lampsvn.epfl.ch/trac/scala/ticket/983
                    val article = if(reqMatch.start(2) == -1) null else reqMatch.group(2)
                    status match {
                        case 200 => 
                            bytes.inc(uri, size)
                            record(host, article, ref)
                        case 304 => record(host, article, ref)
                        case 404 => s404s.inc(uri)
                        case _ =>
                    }
                }
            } 
            case Some(_) => // not GET, ignore
            case None =>    // matcher is done
                report("URIs by hit", hits, false)
                report("URIs by bytes", bytes, true)
                report("404s", s404s, false)
                report("client addresses", clients, false)
                report("refs", refs, false)
                printf("[total %,dms]\n", currentTime - executionStart)             
                exit
        }).start
    }
}
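
About that wish for readability: purely as an untested sketch, the same log-entry pattern could be spelled out in the Java regex engine's free-spacing mode ((?x)), where whitespace in the pattern is ignored, # starts a comment, and literal spaces are written as \x20. It should behave the same as logEntryRegex above, though I haven't verified or timed it:

    val readableLogEntryRegex = ("""(?mx)
        \n (\S+) \x20 - \x20 \S+ \x20          # host, identd ("-"), and authuser
        \[ [^\]]+ \] \x20                      # [date and time]
        " ([^\\"]+(?:\\.[^\\"]*)*) " \x20      # "request line", allowing escaped quotes
        (\d+) \x20                             # status code
        (-|\d+) \x20                           # bytes sent, or "-"
        " ([^\\"]+(?:\\.[^\\"]*)*) " \x20      # "referrer"
        " ([^\\"]+(?:\\.[^\\"]*)*) " $         # "user agent"
        """).r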

As it stands now, this code takes approximately 20 minutes to get through the entire 42GB dataset. Here are some times from a recent run:

        real    20:53.8
        user  8:19:50.2
        sys     28:13.9

That comes out to about 2500% CPU utilization, so hopefully there's still some room for improvement. I'll review Sorter, Counter, and Matcher in some future posts.

Tuesday, May 27, 2008

WideFinder 2 Strawman implemented in Scala

Tim Bray has posted a strawman of the WideFinder 2 Benchmark and also posted an implementation in Ruby. It's similar in spirit to the original WideFinder, with the following differences:

  1. much more logic: more pattern matching, counting, and conditional logic per record, which is designed to increase the CPU load
  2. much more data: over 40GB in fact.
  3. much more power: Sun Fire T2000 UltraSparcT1, 32 cores

Once again, recreating this basic implementation in Scala will help later to factor out any differences in performance based on language/runtime alone. Once pure language/runtime differences are understood, it will be easier to discern differences between various algorithmic approaches to concurrency. So without further ado, here is the basic, single-threaded, Scala version of the benchmark:

import scala.collection.mutable._

/** A normal HTML report: Top 10 popular URLs by hits and bytes, top 10 404s, *
 *  top 10 client IPs by hits, and the top 10 referrers.  Only, skip the HTML */
object WideFinder2 extends Application {
    class Counter extends HashMap[String,Long] {
        override def default(key:String) = 0L
        // override update to workaround http://lampsvn.epfl.ch/trac/scala/ticket/904
        override def update(k:String,v:Long) = super.update(k,v) 
    }
    
    val hits, bytes, s404s, clients, refs = new Counter
    val uriRegex = """/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$""".r
    val refExclRegex = "-|http://www\\.tbray\\.org/ongoing/".r

    def record(host: String, uri: String, ref: String) {
        if(uriRegex.findPrefixOf(uri).isDefined) {
            hits(uri) += 1
            clients(host) += 1
            if(refExclRegex.findPrefixOf(ref).isEmpty) 
                refs(ref) += 1
        }
    }

    def report(label: String, map: Map[String,Long], megabytes: Boolean) {
        printf("Top %s:\n", label)
        val keysByCount = map.keySet.toList.sort { (a,b) =>
            (map(a) - map(b)) match {
                case 0 => a < b
                case n => n > 0
            }
        }
        for(key <- keysByCount.take(10)) {
            val format = if(megabytes) "%10.1fM: %s\n" else "%11.0f: %s\n"
            val value: Double = if(megabytes) map(key)/(1024.0d*1024.0d) else map(key)
            printf(format, value, if(key.length > 60) key.take(60) + "..." else key)
        }   
        println
    }

    override def main(args: Array[String]) {
        def toInt(a:String) = try {a.toInt} catch {case _=>0}
        for(line <- io.Source.fromFile(args(0)).getLines) {
            val f = line.split("\\s+")
            if(f(5) == "\"GET") {
                val (host, uri, status, size, ref) = 
                    (f(0), f(6), toInt(f(8)), toInt(f(9)), f(10).slice(1,f(10).length-1))

                status match {
                    case 200 => 
                        bytes(uri) += size
                        record(host, uri, ref)
                    case 304 => record(host, uri, ref)
                    case 404 => s404s(uri) += 1
                    case _ =>
                }
            }
        } 
        report("URIs by hit", hits, false)
        report("URIs by bytes", bytes, true)
        report("404s", s404s, false)
        report("client addresses", clients, false)
        report("refs", refs, false)
    }
}

The program can be run using this command:

java -cp .:scala-library.jar WideFinder2 O.100k

Interestingly, it has similar performance to the Ruby version, taking 23 seconds to process 100k lines on the test system, producing results identical to these. Besides input performance, which is necessarily single-threaded and can only be improved incrementally, there are three areas to focus on distributing:

  1. matching
  2. counting
  3. sorting

However, the incremental improvement to input, which I hinted at in my last post, changes the nature of the problem quite a bit, so before getting into these three areas, maybe I should tackle that first. The basic idea is, instead of tying up the single input thread with extra work (namely character decoding and newline detection), just read large, arbitrarily aligned blocks of bytes, and let worker threads deal with them in parallel. This causes two new problems:

  1. multibyte and variable byte character encodings are more difficult to handle as a single character may span across multiple blocks
  2. the content you're looking for (let's call this the target data) may span across multiple blocks

Luckily the first one is easy to deal with: only support ASCII for the time being. Any fixed-width character encoding can be handled pretty trivially by buffering and appending/truncating each input block to make it an even multiple of N. Variable-width encodings, like UTF-8, would be much harder to handle. This might be a good subject for a future project.
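
To make that concrete, here's a tiny, hypothetical sketch of the realignment for an N-byte fixed-width encoding (the name realign is made up):

// carry over any bytes that don't form a whole N-byte character into the next block;
// returns (the realigned bytes to hand to a worker, the leftover bytes to carry forward)
def realign(carry: Array[Byte], block: Array[Byte], n: Int): (Array[Byte], Array[Byte]) = {
    val joined = new Array[Byte](carry.length + block.length)
    System.arraycopy(carry, 0, joined, 0, carry.length)
    System.arraycopy(block, 0, joined, carry.length, block.length)
    val keep = (joined.length / n) * n
    val aligned = new Array[Byte](keep)
    val leftover = new Array[Byte](joined.length - keep)
    System.arraycopy(joined, 0, aligned, 0, keep)
    System.arraycopy(joined, keep, leftover, 0, leftover.length)
    (aligned, leftover)
}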

Likewise, the second problem has a straightforward solution for the WideFinder data, and more complex solutions for the general case. The WideFinder data is line-based text, and the WideFinder benchmark deals with finding patterns on single lines of text. So we can save the head and tail of each line that spans multiple blocks and re-join them for processing (i.e. join the tail of block 1 with the head of block 2 to get the line that spans blocks 1 and 2); a small sketch of this appears at the end of this post. For the general case, there are a few other strategies to collect and join endpieces, each with a requirement on the nature of the target data:

  1. if the target data is guaranteed not to overlap, then the head and tail to be re-joined are any data between the end of the last match in a block and the beginning of the first match in the next block. If the targets are too few and far between, and the size of the re-joined endpieces is frequently comparable to the input blocks themselves, this strategy results in too much repeated matching to be useful.
  2. if the target data is guaranteed to only overlap partially, the above strategy changes to taking all data between just after the start of the last match in a block and just before the end of the first match in the next block. This strategy has the same problem for sparse matches.
  3. if the target is guaranteed to be no bigger than some reasonably small (relative to the input block) size, then either of the two overlap strategies can be augmented with a maximum size, which should reduce the redundant matching in huge endpieces.

For now, there is no need for any of these complex strategies to deal with re-joining endpieces, and we only have to worry about ASCII encoding. Maybe this can all be tackled in some future WideFinder 3.
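
To make the simple line-based strategy concrete, here's a tiny, hypothetical sketch (not the actual Matcher class) of splitting raw blocks and re-joining the line that spans a block boundary:

object BlockJoinSketch extends Application {
    /** split a raw block into (partial first line, complete middle lines, partial last line) */
    def split(block: String): (String, String, String) = {
        val first = block.indexOf('\n')
        if(first == -1) (block, "", "") else {
            val last = block.lastIndexOf('\n')
            val middle = if(last > first) block.substring(first + 1, last) else ""
            (block.substring(0, first), middle, block.substring(last + 1))
        }
    }

    // two made-up blocks whose boundary falls in the middle of a line
    val block1 = "GET /a HTTP/1.1\nGET /b HTTP/1.1\nGET /c HT"
    val block2 = "TP/1.1\nGET /d HTTP/1.1\n"
    val (head1, mid1, tail1) = split(block1)
    val (head2, mid2, tail2) = split(block2)
    println(mid1)           // complete line wholly inside block 1
    println(tail1 + head2)  // the line that spans the boundary: GET /c HTTP/1.1
    println(mid2)           // complete line wholly inside block 2
}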

Friday, May 16, 2008

WiderFinder

Continuing on the theme of Scala WideFinding, I made a couple of attempts to scale the performance across multiple cores without much luck. These mostly involved using Actors in various ways to distribute load. For example, I tried distributing the regex matching, the counter lookup and incrementing, as well as the result sorting. None of that yielded much in the way of improved performance.

From what I can tell, there is a common bottleneck in all these cases that keeps performance from improving, and it seems to be the use of Source.getLines in the main thread. Here's how it looked in the original version:

for (input <- fromFile(args(0),"ASCII").getLines if matcher.reset(input).find) {
    counts(matcher.group(1)) += 1
}

That didn't change too much across the experiments so far, with the one major exception of distributing the regex matching. If you haven't seen how Actors are used in Scala, it might seem a little foreign, so let me briefly digress and explain how they work at a very high level so you'll understand what follows. The basic idea is that each Actor is an object that may own a thread or may participate in a thread pool. You send message objects to Actors, and they decide when and how to handle each message in their queue. The syntax is dead simple:

actor ! message

where actor is an implementation of Actor, and message is basically any old object. Using Actors to distribute the regular expression matching made the above for expression look something like this:

var n = 0
for (input <- fromFile(args(0),"ASCII").getLines) {
    regexMatchers(n % numMatchers) ! Some(input)
    n += 1
}
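
For reference, here's a rough, hypothetical sketch of what one of those regexMatcher actors could look like (makeRegexMatcher and the downstream counter actor are made-up names, not the exact code from these experiments):

// each matcher actor owns its own (non-thread-safe) java.util.regex.Matcher and
// forwards any captured group to a single counting actor; None signals end of input
import java.util.regex.Pattern
import scala.actors.Actor
import scala.actors.Actor._

def makeRegexMatcher(pattern: Pattern, counter: Actor): Actor = actor {
    val matcher = pattern.matcher("")
    def handle() {
        react {
            case Some(line: String) =>
                if (matcher.reset(line).find) counter ! matcher.group(1)
                handle()
            case None => counter ! None   // done: let this actor terminate
        }
    }
    handle()
}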

The for expression above sends input lines to a number of different regexMatcher Actors in a round-robin fashion. Regex matching is quite CPU-intensive and each line is independent of the next, so this seems like a worthy candidate for parallelization. Unfortunately, the benefit is never realized, since it sits behind what I suspect is a slow input source. Two things that stand out are:

  1. Source.fromFile() returns a BufferedSource which is an Iterator[Char], and getLines() returns an Iterator[String] which itself iterates over the BufferedSource character by character, looking for a newline, building up a string as it goes. Seems rather tedious for the main input thread to be doing this while multiple regex matching worker threads wait for input.
  2. By default, BufferedSource uses a 2048 character buffer. That's smaller than a single cluster on most filesystems, and is likely another source of slowness.

Other areas to investigate are:

  1. NIO offers scattering reads that can read into a number of buffers at once
  2. java.util.concurrent has some non-blocking and limited-blocking data structures that may help speed things up
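
As a rough, untested sketch of the first item: a single scattering read fills several buffers in one call, and each full buffer could then be handed to a worker. The block sizes are arbitrary, and O.100k is just the sample file mentioned below:

import java.io.FileInputStream
import java.nio.ByteBuffer

object ScatterReadSketch extends Application {
    val channel = new FileInputStream("O.100k").getChannel
    val buffers = new Array[ByteBuffer](4)
    for(i <- 0 until buffers.length) buffers(i) = ByteBuffer.allocate(4 * 1024 * 1024)
    val bytesRead = channel.read(buffers)   // scattering read: fills buffers(0), then buffers(1), ...
    println(bytesRead + " bytes read in one call")
    channel.close()
}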

Thursday, May 15, 2008

WideFinder in Scala

Tim Bray has put forth another challenge in Wide Finder 2. The purpose of this round is to try and take advantage of concurrent programming techniques on a single machine to better utilize the multiple cores commonly found in modern CPUs.

I didn't take part in the first Wide Finder project, so I feel I have a little catching up to do before tackling the current version. Since I'm also taking every opportunity I can to learn more Scala, I decided it would be best to just recreate the original version in Scala, preserving as much as possible of the elegance and simplicity found in Tim's original Beautiful Code. That, plus reproducing his results with a minimal version happens to be a requirement for participating... Here's what I came up with:

import java.util.regex.Pattern
import scala.collection.mutable.{Map,HashMap}
import scala.io.Source.fromFile

object WideFinder extends Application {
    override def main(args: Array[String]) = {
        val counts: Map[String, Int] = new HashMap[String, Int] {
            override def default(key:String) = 0
        }
        
        val pattern = "GET /ongoing/When/\\d{3}x/(\\d{4}(/\\d{2}){2}[^ .]+) "
        val matcher = Pattern.compile(pattern).matcher("")
        
        for (input <- fromFile(args(0),"ASCII").getLines if matcher.reset(input).find) {
            counts(matcher.group(1)) += 1
        }
        
        val keysByCount = counts.keySet.toList.sort {(a,b) => counts(a) > counts(b)}
        for (key <- keysByCount.take(10)) {
            println(counts(key) + ": " + key)
        }
    }
}

It ended up pretty close to the original Ruby code. Performance-wise, it seems reasonable when tested against the 100K sample files on my 2.33 GHz MacBook Pro, taking about a second to complete. I'm chomping at the bit to do a concurrent version to handle the 45GB of log data on a 32-core server.