Thursday, June 5, 2008

Multicore Matcher

The third and final piece of my Scala Wide Finder 2 submission is the multicore Matcher. This component takes an InputStream, a regular expression, and a closure, and, using as many CPU cores as you can spare (All your cores are belong to us!), it applies your closure once for each match in the stream, in no particular order.

Externally, it looks pretty simple, like a normal loop construct that calls a closure. This example prints every line in the specified file:


object Test extends Application {
    eachMatch(new FileInputStream("O.100k"), "[^\n]+\n".r) { m =>

Looks pretty straightforward from the outside, but there's a lot going on behind the scenes, and unfortunately, this abstraction has a few big leaks that the caller needs to be aware of to prevent sinking themselves. I'll point them out after I describe the internal of the Matcher.

The Matcher is designed as a set of actors which communicate by passing messages to each other asynchronously, and is implemented using the Scala Actors library. There are three types of Actors used in the Matcher:

there is one reader responsible for reading bytes into Buffers from the input stream, and passing these Buffer messages to Matchers. When the input stream has been exhausted, the reader sends every matcher a Done message, and then exits.
there are many matchers, usually one for each CPU core. Matchers receive Buffers, which they search using the supplied regular expression, and for each match they find, they call the supplied closure. They also generate Endpieces, one for every Buffer they receive, containing some part of the head and tail of that Buffer, and send this as a message to the endJoiner. Additionally, when a matcher receives a Done message, and has finished processing all Buffers in it's input queue, it sends the endJoiner a Done message, and exits.
there is one endJoiner whose primary function is to receive Endpieces, and using the Endpieces from two consecutive Buffers, it joins the tail of the first buffer to the head of the next, and then searches this buffer using the same regular expression used by the matcher. If it finds a match, it calls the same closure. The secondary function of the endJoiner is signal completion of the matching run, which it does after receiving one Done message per matcher, and processing all remaining Endpieces.

The messages types are defined as:

class Buffer (val matcher: Int, val data: Array[Byte]) {
    var chunk, size = 0

case class Endpieces (val chunkNum: Int, val head: String, val tail: String)

case object Done

Here's a UML 2 Sequence diagram that puts it all together graphically, with the actors that send and receive messages (and the closure) along the top, and the parallel processing of each constituent shown in vertically separated frame fragments.

There are a few parts to this design that may not be apparent at first. Buffers deliver Bytes from the reader to each matcher. That means the job of character decoding is left to the matcher. Also, there's no newline detection built into the matcher at all. That's all done by the supplied regular expression (or within the closure, or never at all, depending on the needs of the caller). The abstraction leaks are all related to this need to maximize throughput in the reader by doing as little as possible, and then expecting subsequent actors to make sense of pieces they've been given.

As far as I can tell, here are all the gory internals you need to consider when using this Matcher:

ASCII only
this technique can easily be made to work with any fixed width character encodings, but for now it's limited to ASCII. Variable byte encodings will be a challenge to support without sequentially decoding the entire output stream. Too bad character streams don't have "keyframes" where a known good state can be guaranteed intermittently midstream. [Update: Meanwhile, over on the Wide Finder mailing list, James Aylett has just pointed out that UTF-8 is resynchronizable after all, so this scheme could easily be made to work for UTF-8 encoded data]
No overlapping matches
the matcher does not support matches that overlap each other. As it is right now, some overlapping matches will be detected, but those happening at buffer boundaries may not. Partially overlapping matches can be supported by slightly altering the construction of endpieces, but as it stands right now, even partially overlapping matches are not not guaranteed to be found at buffer boundaries. Partially overlapping means match A starts before match B starts and ends before match B ends. Fully overlapping means match A starts on or before the start of match B and ends on or after the end of match B. Partial overlaps are supported.
Maximum match size
to speed up cases where there are sparse matches resulting in huge endpieces, a maximum match size is imposed that restricts the size of endpieces (4096 bytes by default).
Regex boundary matchers lose their meaning
Line boundary matchers (^ and &), word boundary matchers (\b and \B), previous match end (\G) and input boundary matchers (\A, \Z, and \z) do not reliably detect their respective boundary types. The problem is they treat the edges of the input as a boundary, but what they sometimes detect as the edge on input is just a buffer boundary. So boundary markers should be used very cautiously, if at all. In order to help callers detect line boundaries reliably, the Matcher inserts \n at the beginning and end of the input stream. Using \n in your regular expression to detect line boundaries is guaranteed to work, and there is no chance a you'll accidentally mistake a buffer boundary for the beginning or end of the input stream.

Here's the complete listing of the Matcher component:



import scala.actors._
import scala.actors.Actor._
import scala.compat.Platform.currentTime
import scala.Console.{withOut,err}
import scala.util.matching.Regex
import scala.util.matching.Regex.{MatchData => Match}

import Config._

object Matcher {
    class Buffer (val matcher: Int, val data: Array[Byte]) {
        var chunk, size = 0

    case class Endpieces (val chunkNum: Int, val head: String, val tail: String)

    case object Done

    def debugLog(format: String, args: Any*) = if(debug) withOut(err) {printf(format, args:_*)}

    def mb(bytes: Long) = bytes/(1024.0*1024.0)

    def eachMatch(in: InputStream, regex: Regex)(closure: Match => Unit):Unit = 
            (eachMatchAsync(in, regex)(closure))()
    /** @return a future representing completion of this matcher */
    def eachMatchAsync(in: InputStream, regex: Regex)(closure: Match => Unit):Future[Any] = {
        val buffers = for(_ <- (0 until buffersPerMatcher).toList; 
                            i <- (0 until numMatchers).toList) yield {
                        new Buffer(i, new Array[Byte](bufferSize))

        lazy val reader: Actor = actor {
            var chunkNum = 0
            var bytesTotal = 0L
            val startTime = currentTime
            def elapsedSeconds = (currentTime - startTime)/1000.0
            def progressLog(bytes: Long) =
                    debugLog("\r%,3.1f MB (%,4.2f MBps)", {mb(bytes)}, {mb(bytes)/elapsedSeconds})
            loop {
                receive { 
                    case buffer: Buffer =>
                        val bytesRead =, 0, bufferSize)
                        if(bytesRead > -1) {
                            buffer.chunk = chunkNum
                            buffer.size = bytesRead
                            matchers(buffer.matcher) ! buffer
                            bytesTotal += bytesRead
                            chunkNum += 1
                        } else {
                            endJoiner ! new Endpieces(chunkNum+1, inputSuffix, "")
                            for(matcher <- matchers) matcher ! Done
        lazy val matchers = for(n <- (0 until numMatchers).toList) yield actor {
            loop { 
                receive {
                    case buffer: Buffer => 
                        var firstIndex = -1
                        var lastIndex = 0
                        var segment = 0
                        val str = new String(, 0, buffer.size, "ASCII")
                        // avoid using MatchIterator.matchData() because it forces, adding lots
                        // of overhead.  Instead use MatchIterator itself which is an overhead 
                        // free implementation of MatchData
                        val matchIter = regex.findAllIn(str)
                        for(_ <- matchIter) {
                            if(firstIndex == -1) {
                                firstIndex = matchIter.start min ((maxMatchBytes+1)/2)
                            if(lastIndex < matchIter.end) {
                                lastIndex = matchIter.end max (str.length - (maxMatchBytes/2))
                            segment += 1
                        val head = if(firstIndex < 1) "" else 
                                        new String(str.substring(0, firstIndex))
                        val tail = if(lastIndex == str.length) "" 
                                        else new String(str.substring(lastIndex, str.length))
                        endJoiner ! new Endpieces(buffer.chunk, head, tail)
                        reader ! buffer
                    case Done => endJoiner ! Done
        lazy val endJoiner = actor {
            var nextChunk = 0
            var lastTail = inputPrefix
            var doneCount = 0
            var completion: OutputChannel[Any] = null
            loop { 
                react {
                    // Endpieces might not arrive in order so filter for the next Endpart
                    case Endpieces(chunkNum, head, tail) if (chunkNum == nextChunk) => 
                        regex.findFirstMatchIn(lastTail + head) match {
                            case Some(m) => closure(m)
                            case _ =>
                        lastTail = tail
                        nextChunk += 1
                    case None => completion = sender

                    case Done if(doneCount == 1) => 
                        completion ! Done
                    case Done => doneCount += 1
        debugLog("cores:%d, matchers:%d, buffers/matcher:%d, buffersize:%dK\n", 
                numCores, numMatchers, buffersPerMatcher, bufferSize/1024)
        for(buffer <- buffers) reader ! buffer
        endJoiner !! None

1 comment: