Monday, June 2, 2008

Multicore Sorter

Continuing on from my last post, I'll start with a review of the simplest of the three multicore abstractions: the Sorter. Sorting is a great candidate for parallel processing because many sorting algorithms already divide and conquer their work. My approach was to borrow from two of these algorithms, QuickSort and MergeSort, to do just enough sorting just in time. Here's how:

  1. partition the data to be sorted into evenly sized shards, one for each CPU core
  2. launch a worker thread to sort each shard
  3. when all the workers are complete, merge the sorted shards back together
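
Steps 1 and 3 can be tried out on their own, leaving the parallel sort of step 2 aside. The sketch below uses hypothetical helper names (`Shards.bounds`, `Shards.mergeSmallest`) that are not part of the original code. Step 1 mirrors the Sorter's ceil(size / cores) shard size and clamps offsets so that trailing shards may be empty. For step 3 it swaps in a heap-based k-way merge, a common alternative to scanning the head of every shard on each step:

```scala
import scala.collection.mutable

// Hypothetical helpers illustrating steps 1 and 3 (not from the original code).
object Shards {
  // Step 1: [start, end) bounds for numShards shards of `size` elements. The
  // shard size is the ceiling of size / numShards, and offsets are clamped to
  // size, so trailing shards may be empty when the division is uneven.
  def bounds(size: Int, numShards: Int): List[(Int, Int)] = {
    require(numShards > 0, "need at least one shard")
    val shardSize = if (size == 0) 0 else (size - 1) / numShards + 1
    List.tabulate(numShards) { shard =>
      val start = size min (shard * shardSize)
      (start, size min (start + shardSize))
    }
  }

  // Step 3: the k smallest elements (per ord) of shards that are each already
  // sorted by ord, using a heap-based k-way merge. The priority queue holds one
  // (shard, position) cursor per non-empty shard, keyed by the element there.
  def mergeSmallest[T](shards: Seq[IndexedSeq[T]], k: Int)(implicit ord: Ordering[T]): List[T] = {
    // mutable.PriorityQueue dequeues its largest element first, so the cursor
    // ordering is reversed to turn it into a min-heap.
    val byHead: Ordering[(Int, Int)] =
      Ordering.by[(Int, Int), T] { case (s, i) => shards(s)(i) }.reverse
    val heap = mutable.PriorityQueue.empty[(Int, Int)](byHead)
    for (s <- shards.indices if shards(s).nonEmpty) heap.enqueue((s, 0))

    val result = List.newBuilder[T]
    var taken = 0
    while (taken < k && heap.nonEmpty) {
      val (s, i) = heap.dequeue()
      result += shards(s)(i)
      taken += 1
      // Advance this shard's cursor and put it back if the shard has more.
      if (i + 1 < shards(s).length) heap.enqueue((s, i + 1))
    }
    result.result()
  }
}
```

`Shards.bounds(9, 4)` yields `List((0,3), (3,6), (6,9), (9,9))`: nine items over four cores leave the last shard empty. `Shards.mergeSmallest(Seq(Vector(1, 4, 9), Vector(2, 3, 10), Vector(5)), 4)` returns `List(1, 2, 3, 4)`; for a top ten, sort the shards in descending order and pass the matching reversed `Ordering`. With only one shard per core, the heap saves just a few comparisons per element, so a linear scan of the shard heads is a reasonable choice too.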

This still leaves a large merge in a single thread as the last step. Luckily, the WideFinder 2 Benchmark doesn't require us to sort all the data completely; we just need the top ten items in each category. So given a set of sorted shards (say that fast a few times!), it's easy to iterate over them to produce a list of the overall top ten items. The implementation below sorts the shards in parallel and then, acting as an Iterator over the entire sorted collection, amortizes the work of merging the shards over sequential access: each call to next() compares the current head of every shard and returns the smallest, at a cost of at most numSorters - 1 comparisons per element. Since we only need ten items, most of the merge work is never done in our use of Sorter. Here's the code:

package net.waldin.mc

import scala.actors.Actor._

import Config.numSorters

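// Iterates over the elements of iter in the order defined by comp. On first
// access the data is split into numSorters shards that are sorted in parallel;
// the sorted shards are then merged lazily, one element per call to next().
class Sorter[T](iter: Iterator[T], size: Int, comp: (T,T)=>Int) extends Iterator[T] {
    val shardSize = Math.ceil(size.toDouble / numSorters).toInt
    // shardIdx(s) is the index of the next unconsumed element of shard s. Keep
    // one entry per sorter, clamped to size: when size is not an exact multiple
    // of shardSize, trailing shards can be empty (e.g. size 9 over 4 sorters).
    val shardIdx = List.range(0, numSorters).map(shard => size min shardOffset(shard)).toArray
    val array = new Array[T](size)
    var sorted = false
    final def shardOffset(shard:Int) = shard * shardSize
        
    // adapts comp to the java.util.Comparator interface
    object Comparator extends java.util.Comparator[T] {
        override def compare(a: T, b: T) = comp(a,b)
    }
    
    def sort {
        // copy the input into the backing array
        iter.copyToArray(array,0)
    
        // launch one future per shard to sort its slice of the array in place;
        // start is clamped to size so that empty trailing shards are valid ranges
        val sorters = for(shard <- (0 until numSorters).toList) yield {
            scala.actors.Futures.future {
                val start = size min shardOffset(shard)
                val end = size min (start + shardSize)
                java.util.Arrays.sort(array, start, end, Comparator)
            }
        }

        // await completion of futures
        for(s <- sorters) s() 
        sorted = true
    }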
    
    override def hasNext: Boolean = {
        if(!sorted) sort
        firstShardWithData.isDefined
    }
    
    // index of the first shard that still has unconsumed elements, if any
    private def firstShardWithData = {
        var shard = 0
        while(shard < numSorters && 
                shardIdx(shard) == (size min shardOffset(shard + 1))) {
            shard += 1
        }
        if(shard == numSorters) None else Some(shard)
    }
    
    // merge step: return the smallest current head among the non-empty shards
    override def next(): T = {
        if(!sorted) sort
        firstShardWithData match {
            case None => throw new NoSuchElementException
            case Some(n) =>
                var shard = n
                var tryShard = shard + 1
                while(tryShard < numSorters) {
                    if(shardIdx(tryShard) < (size min shardOffset(tryShard + 1)) && 
                            comp(array(shardIdx(tryShard)), array(shardIdx(shard))) < 0){
                        shard = tryShard
                    }
                    tryShard += 1
                }
                shardIdx(shard) += 1
                array(shardIdx(shard) - 1)
        }
    }
}
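
The code above targets the scala.actors library of 2008, which no longer ships with current Scala releases. For readers on a current Scala, here is a minimal sketch of the same shard-then-lazily-merge idea using `scala.concurrent.Future` in place of actor futures and an `Ordering` in place of the comparison function. `LazyShardSorter` is a hypothetical name, `numShards` is passed in rather than read from `Config`, and each shard is copied into its own sorted `Vector` instead of being sorted in place in one shared array:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical modern variant of Sorter: shards are sorted in parallel with
// scala.concurrent futures on first access, then merged lazily, one element
// per call to next(). numShards stands in for Config.numSorters.
class LazyShardSorter[T](elems: Seq[T], numShards: Int)(implicit ord: Ordering[T])
    extends Iterator[T] {
  require(numShards > 0, "need at least one shard")

  private val data = elems.toVector
  private val total = data.length
  // Ceiling of total / numShards without floating point.
  private val shardSize = if (total == 0) 0 else (total - 1) / numShards + 1
  // Offsets are clamped to total, so trailing shards may be empty.
  private def shardStart(shard: Int) = total min (shard * shardSize)
  private def shardEnd(shard: Int) = total min (shardStart(shard) + shardSize)

  // Forced on the first hasNext/next: one Future per shard, then wait for all.
  private lazy val shards: IndexedSeq[Vector[T]] = {
    val futures = (0 until numShards).map { shard =>
      Future(data.slice(shardStart(shard), shardEnd(shard)).sorted)
    }
    Await.result(Future.sequence(futures), Duration.Inf)
  }

  // cursor(s) is the index of the next unconsumed element of shard s.
  private val cursor = Array.fill(numShards)(0)

  override def hasNext: Boolean =
    shards.indices.exists(s => cursor(s) < shards(s).length)

  // Merge step: scan every non-empty shard's head and take the smallest.
  override def next(): T = {
    var best = -1
    for (s <- shards.indices if cursor(s) < shards(s).length) {
      if (best < 0 || ord.lt(shards(s)(cursor(s)), shards(best)(cursor(best))))
        best = s
    }
    if (best < 0) throw new NoSuchElementException("next on exhausted LazyShardSorter")
    val elem = shards(best)(cursor(best))
    cursor(best) += 1
    elem
  }
}
```

For example, `new LazyShardSorter(Seq(5, 3, 9, 1, 7, 2, 8), 4)(Ordering[Int].reverse).take(3).toList` gives `List(9, 8, 7)`: only three merge steps run, and the rest of the merge is never performed.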
