Sunday, June 1, 2008

Multicore WideFinding

Ok, so here is some actual code to speed up the widefinding across multiple processor cores. I've tried to abstract out the new stuff in such a way as to leave the main driver application looking somewhat like the original strawman impl in Scala, which in turn was made to resemble the original WideFinder 2 strawman impl in Ruby. It turns out the new stuff maps nicely onto the three problems mentioned earlier: matching, counting, and sorting. So sprinkled throughout the code below you'll see references to Matcher, Counter, and Sorter (classes from the net.waldin.mc package) where previously there was actual regex matching, counting, and sorting code.

Another major difference between this version and the last is that it uses a regular expression to extract the log entries from the input, instead of doing this:

  • find line boundaries
  • split lines of text into whitespace-separated fields
  • remove some extraneous quotes and brackets
  • concatenate back together fields that contain spaces (like time and useragent)

A single regular expression can accomplish the same thing faster, all in one declarative statement. The regular expression is a doozy, but since Scala uses the Java regular expression engine, which is known to be full-featured and performant, I may as well take advantage of it. After all, this is exactly what regular expressions were meant to deal with, as long as the expression itself is optimized for speed, backtracks minimally, and correctly handles all reasonable input. I just wish the expression itself were a little more readable by us non-machines. Hmm, I wonder if ... maybe later...

So here's the main driver with references to the abstracted out multicore functionality, and the gnarly regular expression:

package net.waldin.wf

import java.io.{FileInputStream,BufferedInputStream}

import scala.compat.Platform.currentTime
import scala.util.matching.Regex._

import net.waldin.mc.{Counter,Matcher,Sorter}

/** A normal HTML report: Top 10 popular URLs by hits and bytes, top 10 404s, *
 *  top 10 client IPs by hits, and the top 10 referrers.  Only, skip the HTML */
object WideFinder2c extends Application {
    val hits, bytes, s404s, refs = new Counter()
    val clients = new Counter(12*1024*1024,96)
    val logEntryRegex = 
            ("""(?m)\n(\S+) - \S+ \[[^\]]+\] "([^\\"]+(?:\\.[^\\"]*)*)" (\d+) """+
            """(-|\d+) "([^\\"]+(?:\\.[^\\"]*)*)" "([^\\"]+(?:\\.[^\\"]*)*)"$""").r
    val requestRegex = """GET ((/ongoing/When/\d+x/\d+/\d+/\d+/[^ .]+)|[^ ]+) """.r

    def record(host: String, article: String, ref: String) {
        if(article != null) {
            hits.inc(new String(article))
            clients.inc(host)
            if(ref.length() > 1 && !ref.startsWith("http://www.tbray.org/ongoing/")) 
                refs.inc(ref)
        }
    }

    def report(label: String, map: Counter, megabytes: Boolean) {
        printf("Top %s:\n", label)
        val keysByCount = new Sorter(map.keys, map.size, (a:String,b:String) => 
            (map(b) - map(a)) match {
                case 0 => a.compareTo(b)
                case n => n.toInt
            }
        )
        for(key <- keysByCount.take(10)) {
            val format = if(megabytes) "%10.1fM: %s\n" else "%11.0f: %s\n"
            val value: Double = if(megabytes) map(key)/(1024.0d*1024.0d) else map(key)
            printf(format, value, if(key.length>60) key.substring(0,60) + "..." else key)
        }   
        println
    }

    override def main(args: Array[String]) {
        new Matcher(new FileInputStream(args(0)), logEntryRegex, "\n", "", _ match {
            case Some(m) if(m.group(2).startsWith("GET")) => {
                def f(i:Int) = new String(m.group(i))
                val (host, request, status, size, ref) = 
                        (f(1), f(2), f(3).toInt, if(f(4)=="-") 0 else f(4).toInt, f(5))
                // avoid MatchIterator.matchData: it forces each Match, which adds lots of overhead
                val reqMatch = requestRegex.findAllIn(request)
                if(reqMatch.hasNext) {
                    val uri = new String(reqMatch.group(1))
                    // workaround for http://lampsvn.epfl.ch/trac/scala/ticket/983
                    val article = if(reqMatch.start(2) == -1) null else reqMatch.group(2)
                    status match {
                        case 200 => 
                            bytes.inc(uri, size)
                            record(host, article, ref)
                        case 304 => record(host, article, ref)
                        case 404 => s404s.inc(uri)
                        case _ =>
                    }
                }
            } 
            case Some(_) => // not GET, ignore
            case None =>    // matcher is done
                report("URIs by hit", hits, false)
                report("URIs by bytes", bytes, true)
                report("404s", s404s, false)
                report("client addresses", clients, false)
                report("refs", refs, false)
                printf("[total %,dms]\n", currentTime - executionStart)             
                exit
        }).start
    }
}
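
To make the grouping in that log-entry regex a little more concrete, here's a quick standalone sketch that runs it against a single made-up log line (invented for illustration, not pulled from the real dataset). The group numbers line up with what main expects:

object LogEntryRegexDemo extends Application {
    // same expression as in WideFinder2c above
    val logEntryRegex = 
            ("""(?m)\n(\S+) - \S+ \[[^\]]+\] "([^\\"]+(?:\\.[^\\"]*)*)" (\d+) """+
            """(-|\d+) "([^\\"]+(?:\\.[^\\"]*)*)" "([^\\"]+(?:\\.[^\\"]*)*)"$""").r

    // the leading newline matters: the regex anchors each entry on it
    val sample = "\n192.0.2.1 - - [01/Jun/2008:00:00:00 -0700] " +
            "\"GET /ongoing/When/200x/2008/06/01/Example HTTP/1.1\" 200 1234 " +
            "\"http://example.com/\" \"Mozilla/5.0\"\n"

    for(m <- logEntryRegex.findFirstMatchIn(sample)) {
        println("host:      " + m.group(1))  // 192.0.2.1
        println("request:   " + m.group(2))  // GET /ongoing/... HTTP/1.1
        println("status:    " + m.group(3))  // 200
        println("bytes:     " + m.group(4))  // 1234
        println("referrer:  " + m.group(5))  // http://example.com/
        println("useragent: " + m.group(6))  // Mozilla/5.0
    }
}

From there, requestRegex picks the URI out of the request group, and captures the article path separately when the request is for an /ongoing/When/... page.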

As it stands now, this code takes approximately 20 minutes to get through the entire 42GB dataset. Here are some times from a recent run:

        real    20:53.8
        user  8:19:50.2
        sys     28:13.9

That's roughly 8 hours and 48 minutes of combined user and system CPU time packed into just under 21 minutes of wall clock, or about 2500% cpu utilization, so hopefully there's still some room for improvement. I'll review Sorter, Counter, and Matcher in some future posts.
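
In the meantime, here's roughly the shape of those three classes as this driver uses them. Consider it a sketch inferred from the call sites above (the parameter names, defaults, and comments are guesses), not the real implementations:

package net.waldin.mc

import java.io.InputStream
import scala.util.matching.Regex
import scala.util.matching.Regex.Match

/** concurrent hit counter; the two-argument constructor presumably takes *
 *  sizing/concurrency hints (what 12*1024*1024 and 96 really mean is a   *
 *  topic for the Counter post)                                           */
class Counter(sizeHint: Int, concurrencyHint: Int) {
    def this() = this(1024, 16)              // defaults invented here
    def inc(key: String) { inc(key, 1) }
    def inc(key: String, delta: Long) { /* accumulate delta for key */ }
    def apply(key: String): Long = 0L        // current count for key
    def keys: Iterator[String] = Iterator.empty
    def size: Int = 0
}

/** sorts keys with the supplied comparison function */
class Sorter(keys: Iterator[String], size: Int, cmp: (String, String) => Int) {
    def take(n: Int): List[String] = Nil     // the top n keys per cmp
}

/** reads the stream, splits it into chunks on the separator, runs the   *
 *  regex over the chunks on multiple threads, and hands each Match (and *
 *  finally a None) to the callback; the fourth argument's role is also  *
 *  a guess                                                              */
class Matcher(in: InputStream, regex: Regex, separator: String, pad: String,
              callback: Option[Match] => Unit) {
    def start() { /* spin up the reader and matcher threads */ }
}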
