Showing posts with label widefinder.

Monday, June 2, 2008

Multicore Sorter

Continuing on from my last post, I'll start with a review of the simpler of the three multicore abstractions: the Sorter. Sorting is a great candidate for parallel processing as many sorting algorithms already divide and conquer their work. My approach was to borrow from two of these algorithms, QuickSort and MergeSort, to do just enough sorting just in time. Here's how:

  1. partition the data to be sorted into evenly sized shards, one for each CPU core
  2. launch a worker thread to quicksort each shard
  3. when all the workers are complete, merge the sorted shards back together

This still leaves a large merge in a single thread as the last step. Luckily, the WideFinder 2 Benchmark doesn't require us to completely sort all the data. We just need the top ten items in each category. So given a set of sorted shards (say that fast a few times!), it's easy to iterate over them to produce a list of the overall top ten items. The implementation below does the parallel quicksort, and then, acting as an Iterator over the entire sorted collection, amortizes the work of merging the shards back together over the sequential access of the entire collection. Given that we only need 10 items, most of this work goes undone in our use of Sorter. Here's the code:

package net.waldin.mc

import scala.actors.Actor._

import Config.numSorters

class Sorter[T](iter: Iterator[T], size: Int, comp: (T,T)=>Int) extends Iterator[T] {
    val shardSize = Math.ceil(size.toDouble / numSorters).toInt
    val shardIdx = List.range(0, size, shardSize).toArray
    val array = new Array[T](size)
    var sorted = false
    final def shardOffset(shard:Int) = shard * shardSize
        
    object Comparator extends java.util.Comparator[T] {
        override def compare(a: T, b: T) = comp(a,b)
    }
    
    def sort {
        //extract array
        iter.copyToArray(array,0)
    
        // create actors with array shards for quicksort
        val sorters = for(shard <- (0 until numSorters).toList) yield {
            scala.actors.Futures.future {
                val start = shardOffset(shard)
                val end = size min (start + shardSize)
                java.util.Arrays.sort(array, start, end, Comparator)
            }
        }

        // await completion of futures
        for(s <- sorters) s() 
        sorted = true
    }
    
    override def hasNext: Boolean = {
        if(!sorted) sort
        firstShardWithData.isDefined
    }
    
    private def firstShardWithData = {
        var shard = 0
        while(shard < numSorters && 
                shardIdx(shard) == (size min shardOffset(shard + 1))) {
            shard += 1
        }
        if(shard == numSorters) None else Some(shard)
    }
    
    override def next(): T = {
        if(!sorted) sort
        firstShardWithData match {
            case None => throw new NoSuchElementException
            case Some(n) =>
                // one k-way merge step: find the shard with the smallest
                // unconsumed head element, then consume and return it
                var shard = n
                var tryShard = shard + 1
                while(tryShard < numSorters) {
                    if(shardIdx(tryShard) < (size min shardOffset(tryShard + 1)) && 
                            comp(array(shardIdx(tryShard)), array(shardIdx(shard))) < 0){
                        shard = tryShard
                    }
                    tryShard += 1
                }
                shardIdx(shard) += 1
                array(shardIdx(shard) - 1)
        }
    }
}
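For a bird's-eye view of the same technique, here's a compact sketch of shard-sort-then-merge using plain Java threads in place of actor futures (the object and method names are mine, for illustration only):

```scala
import java.util.Arrays

// Illustrative sketch (not the post's API): quicksort each shard on its own
// thread, then do a k-way merge of the sorted shards.
object ShardSort {
  def sortMerge(data: Array[Int], numShards: Int): Array[Int] = {
    val shardSize = math.ceil(data.length.toDouble / numShards).toInt
    def end(s: Int) = math.min(data.length, (s + 1) * shardSize)
    // sort each shard in parallel
    val threads = for (s <- 0 until numShards) yield
      new Thread(() => Arrays.sort(data, math.min(data.length, s * shardSize), end(s)))
    threads.foreach(_.start())
    threads.foreach(_.join())
    // merge: repeatedly take the smallest head element among the live shards
    val idx = Array.tabulate(numShards)(s => s * shardSize)
    Array.fill(data.length) {
      val live = (0 until numShards).filter(s => idx(s) < end(s))
      val best = live.minBy(s => data(idx(s)))
      idx(best) += 1
      data(idx(best) - 1)
    }
  }
}
```

In the Sorter above the merge half of this is deferred: each call to next() performs exactly one merge step, so asking for only the top ten items leaves nearly all of the merge work undone.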

Friday, May 16, 2008

WiderFinder

Continuing on the theme of Scala WideFinding, I made a couple of attempts to scale the performance across multiple cores without much luck. These mostly involved using Actors in various ways to distribute load. For example, I tried distributing the regex matching, the counter lookup and incrementing, as well as the result sorting. None of that yielded much in the way of improved performance.

From what I can tell, there is a common bottleneck in all of these cases that keeps performance from improving, and it seems to be the use of Source.getLines in the main thread. Here's how it looked in the original version:

for (input <- fromFile(args(0),"ASCII").getLines if matcher.reset(input).find) {
    counts(matcher.group(1)) += 1
}

That didn't change much across the experiments so far, with the one major exception of distributing the regex matching. If you haven't seen how Actors are used in Scala, it might seem a little foreign, so let me briefly digress and explain at a very high level how they work so you'll understand what follows. The basic idea is that each Actor is an object that may own a thread or may participate in a thread pool. You send message objects to an Actor, and it decides when and how to handle each message in its queue. The syntax is dead simple:

actor ! message

where actor is an implementation of Actor, and message is basically any old object. Using Actors to distribute the regular expression matching made the above for expression look something like this:

var n = 0
for (input <- fromFile(args(0),"ASCII").getLines) {
    regexMatchers(n % numMatchers) ! Some(input)
    n += 1
}
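The matcher actors themselves aren't shown here, so below is a rough stand-in for what one regexMatcher does, using a BlockingQueue as the mailbox and a plain thread instead of scala.actors (all names are illustrative):

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.regex.Pattern

// Stand-in for a regex-matching actor: a worker thread drains a mailbox of
// Option[String] messages, counting matches; None signals shutdown.
class RegexMatcher(pattern: String) {
  private val mailbox = new LinkedBlockingQueue[Option[String]]()
  private val matcher = Pattern.compile(pattern).matcher("")
  @volatile var hits = 0
  private val worker = new Thread(() => {
    var done = false
    while (!done) mailbox.take() match {
      case Some(line) => if (matcher.reset(line).find) hits += 1
      case None       => done = true
    }
  })
  worker.start()
  def !(msg: Option[String]): Unit = mailbox.put(msg) // mimic actor send
  def await(): Unit = worker.join()
}
```

Sending None plays the role of a poison-pill shutdown message, which is why the dispatch loop above wraps each line in Some.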

This expression sends input lines to a number of different regexMatcher Actors in round-robin fashion. Regex matching is quite CPU intensive and each line is independent of the next, so this seems like a worthy candidate for parallelization. Unfortunately, the benefit is never realized since it sits behind what I suspect is a slow input source. Two things that stand out are:

  1. Source.fromFile() returns a BufferedSource which is an Iterator[Char], and getLines() returns an Iterator[String] which itself iterates over the BufferedSource character by character, looking for a newline, building up a string as it goes. Seems rather tedious for the main input thread to be doing this while multiple regex matching worker threads wait for input.
  2. By default, BufferedSource uses a 2048-character buffer. That's smaller than a single cluster on most filesystems, and is likely another source of slowness.
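Both points suggest letting java.io do the line splitting with a much bigger buffer; a quick sketch (the 1 MB buffer size is an arbitrary choice of mine):

```scala
import java.io.{BufferedReader, FileReader}

// Count lines using a BufferedReader with a 1 MB buffer instead of
// BufferedSource's 2048-char default; readLine does the newline scanning.
object FastLines {
  def countLines(path: String): Int = {
    val in = new BufferedReader(new FileReader(path), 1 << 20)
    try Iterator.continually(in.readLine()).takeWhile(_ != null).length
    finally in.close()
  }
}
```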

Other areas to investigate are:

  1. NIO offers scattering reads that can read into a number of buffers at once
  2. java.util.concurrent has some non-blocking and limited-blocking data structures that may help speed things up
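For the first of those, a scattering read fills several buffers in one call; a rough sketch (the buffer count and size parameters are my own illustration):

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}

object ScatterRead {
  // One scattering read: the channel fills bufs(0), then bufs(1), and so on;
  // each filled buffer could then be handed to a separate worker thread.
  def read(path: Path, numBufs: Int, bufSize: Int): (Long, Array[ByteBuffer]) = {
    val ch = FileChannel.open(path, StandardOpenOption.READ)
    try {
      val bufs = Array.fill(numBufs)(ByteBuffer.allocate(bufSize))
      val bytesRead = ch.read(bufs) // scattering read
      (bytesRead, bufs)
    } finally ch.close()
  }
}
```

Note that this only parallelizes the hand-off, not the read itself, and the workers would still need to deal with lines that straddle buffer boundaries.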

Thursday, May 15, 2008

WideFinder in Scala

Tim Bray has put forth another challenge in Wide Finder 2. The purpose of this round is to try to take advantage of concurrent programming techniques on a single machine to better utilize the multiple cores commonly found in modern CPUs.

I didn't take part in the first Wide Finder project, so I feel I have a little catching up to do before tackling the current version. Since I'm also taking every opportunity I can to learn more Scala, I decided it would be best to just recreate the original version in Scala, preserving as much as possible of the elegance and simplicity found in Tim's original Beautiful Code. Besides, reproducing his results with a minimal version happens to be a requirement for participating... Here's what I came up with:

import java.util.regex.Pattern
import scala.collection.mutable.{Map,HashMap}
import scala.io.Source.fromFile

object WideFinder extends Application {
    override def main(args: Array[String]) = {
        val counts: Map[String, Int] = new HashMap[String, Int] {
            override def default(key:String) = 0
        }
        
        val pattern = "GET /ongoing/When/\\d{3}x/(\\d{4}(/\\d{2}){2}[^ .]+) "
        val matcher = Pattern.compile(pattern).matcher("")
        
        for (input <- fromFile(args(0),"ASCII").getLines if matcher.reset(input).find) {
            counts(matcher.group(1)) += 1
        }
        
        val keysByCount = counts.keySet.toList.sort {(a,b) => counts(a) > counts(b)}
        for (key <- keysByCount.take(10)) {
            println(counts(key) + ": " + key)
        }
    }
}

It ended up pretty close to the original Ruby code. Performance-wise, it seems reasonable when tested against the 100K sample files on my 2.33 GHz MacBook Pro, taking about a second to complete. I'm champing at the bit to do a concurrent version to handle the 45GB of log data on a 32-core server.