Friday, June 20, 2008

KBps / LoC

Here's an interesting metric that sheds some more light on the Wide Finder 2 results. I took the overall throughput of each entry in kilobytes per second, and divided it by the total Lines of Code reported by each submitter. The intent was to capture the efficiency of the underlying language and algorithmic style, sort of a bang-for-buck comparison, and I think it's a pretty effective scalar metric to that effect.
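
In code, the metric is nothing more than a division (the numbers below are made up for illustration, not taken from the actual results):

// hypothetical values for a single entry
val throughputKBps = 50000.0    // overall throughput reported for the entry
val linesOfCode    = 600        // total LoC reported by the submitter
val bangForBuck    = throughputKBps / linesOfCode    // roughly 83 KBps per line of code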

It's interesting to see how high Brian Frank's Fan entry and Alex Morega's Python entry rank, and how much lower some of the other entries ended up. Since so many of the entries in the results table did not report a simple line count, the LoC listed here may not be what the submitter intended, but it was the best I could do with the data. Let me know if there's something you'd like me to change.

Less code

One dimension that I hadn't explored fully in my attempts at Wide Finding was that of program size. My primary concern so far has been algorithmic design and performance. Starting with around 340 lines of code in 5 separate source files, I removed all trace of reusability, and some of the configurability, and what resulted was one file (WideFinder2d.scala) containing just 196 lines including whitespace, comments, etc.

It's not as reusable. What was formerly a general purpose Sorter is now just an embedded sort function, and what was formerly a Matcher is now just an embedded eachMatch function. The configurability went mostly unused, so I just yanked that. In terms of performance, this version did pretty well:

real    14:57.6
user  7:18:11.7
sys     10:01.1

The difference in performance is most likely due to rearranging code to avoid duplication, and to rehashing, since this version no longer preallocates entries in the referrer and clients tables. The resulting code is here.
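
For reference, here's roughly what that preallocation looked like in the WideFinder2c driver from an earlier post (the two arguments are the initial capacity and concurrency level handed to the underlying ConcurrentHashMap); the consolidated version just lets the tables grow and rehash as needed:

val hits, bytes, s404s, refs = new Counter()    // default sizes from Config
val clients = new Counter(12*1024*1024, 96)     // preallocated for millions of distinct clients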

Saturday, June 14, 2008

Diminishing returns

With some minor changes to my Wide Finder 2 submission, I've been able to eke out an almost 10% improvement in performance. The first change was pretty straightforward, and is a lesson in trusting blind premature optimizations (or optimisations, as the case may be). The scalac compiler has a switch called -optimise, described in the help text as "Generates faster bytecode by applying optimisations to the program." I have no idea what this option actually does, and a quick Google search revealed no helpful information. I had used it blindly, expecting that it would at least do no harm. Well, I can say for sure that it didn't help. My fastest times yet were achieved without using this switch. Whether it actually reduced performance is debatable, but I don't plan to use it again until I learn more about what it does.

The next change was to move the hit detection out of my main regular expression and into a spot that reduced the number of times it was applied (only for 200 and 304 status requests), and then to reduce its effect further by conditionally applying it only when the requested URL satisfies two requirements that are very efficient to check:

  1. The URL is at least 30 characters long, and
  2. the 28th character is a digit

By applying these checks first, many of the non-matching URLs can be quickly disqualified without the need to do any regular expression matching.
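
Here's a minimal sketch of the idea (not the submission's actual code, and the pattern is just the article-matching fragment of my log-entry regex):

val articleRegex = """/ongoing/When/\d+x/\d+/\d+/\d+/[^ .]+""".r

// cheap disqualification first: an article URL like "/ongoing/When/200x/2003/09/18/NXML"
// is at least 30 characters long and has a digit (the day) at index 27
def isHit(uri: String) =
    uri.length >= 30 && Character.isDigit(uri.charAt(27)) && articleRegex.findFirstIn(uri).isDefined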

The final change was to revisit the decision to not use AtomicLongs in the Counter. Originally the difference in performance was negligible and the increase in complexity substantial. For example, in the comparator used for sorting map keys, I would have to change from:

map(b) - map(a)

to either this:

map(b).subtract(map(a)).longValue

or this:

map(b).longValue - map(a).longValue

This wasn't the only place in the code where such changes would be required. I really didn't like having so many of the implementation details of Counter leak out into the rest of the code. I even considered various ways to wrap a Map[String,AtomicLong] in a MapWrapper[String,Long], but the incompatible type parameters complicated things. Given that the performance difference was negligible at the time, and other parts of the code required more attention, I just left the counter using Long values.

I was reminded of all of this when Erik Engbrecht left me a comment asking why I hadn't used AtomicLongs. So I went back and took another look, and sure enough, there is a simple way to use AtomicLongs in the Counter without leaking them all over the rest of the code, and by now performance was tight enough that the occasional lock contention incurred in ConcurrentHashMap.replace() was becoming noticeable. Here's how I did it:

package net.waldin.mc

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import Config.{counters,segments}

class Counter(initSize: Int, segs: Int) {
    def this() = this(counters,segments)
    val h = new ConcurrentHashMap[String,AtomicLong](initSize,0.75f,segs)
    def size = h.size

    // putIfAbsent atomically installs the initial count; if the key was already present
    // (or another thread won the race to install it), add to the existing AtomicLong
    def inc(k:String, i:Int) =
        if(h.containsKey(k) || h.putIfAbsent(new String(k), new AtomicLong(i)) != null)
            h.get(k).addAndGet(i)
    
    def apply(key:String) = {
        val ret = h.get(key)
        if(ret == null) 0L else ret.longValue
    }
    
    def keys = new Iterator[String] {
        val i = h.keySet.iterator
        def next = i.next
        def hasNext = i.hasNext
    }
}

Basically, I realized that Counter doesn't need to be a Map, so it doesn't need to implement MapWrapper. It just needs a few Map-like methods, namely size, apply, and keys. The apply method is then free to translate the underlying AtomicLong to a Long and the caller is none the wiser. Thanks Erik!
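
For illustration, here's the kind of usage the rest of the code now sees (a hypothetical snippet, not from the submission); callers increment and read plain Longs and never see an AtomicLong:

val refs = new Counter()
refs.inc("http://slashdot.org/", 1)
refs.inc("http://slashdot.org/", 1)
println(refs("http://slashdot.org/"))       // 2, as a plain Long
println(refs("http://never-seen.example"))  // 0, no null check needed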

The best performance to date with this new code so far is:

real    14:04.6
user  6:58:06.7
sys      9:35.3

Given the diminishing returns I'm getting from small tweaks here and there, I've been looking at changing something more fundamental, and hope to post about that effort soon.

Wednesday, June 11, 2008

Regular Expression vs. Space Delimiting

There's been a little debate brewing over at the Wide Finder mailing list regarding the use of space delimiters to parse log entries, as opposed to a full-blown regular expression. I struggled with this decision myself before finally choosing to use a regular expression. Having done so, I found my results differed from those of the reference implementation, as I described here. I never expected that the reference would have to change or be re-run, but I did want to understand why my results differed from it. In fact, this entire exercise is really a way for me to become a more effective Scala programmer and a more effective concurrent programmer. How my results compare to others', either in performance or accuracy, is only relevant insofar as I can learn something from their successes and missteps. That being said, here's the regex I used in my most recent submission, followed by some thoughts about it:

val logEntryRegex = 
    ("""\n(\S+) - \S+ \S+ \S+ "GET ((/ongoing/When/\d+x/\d+/\d+/\d+/[^" .]+"""+
    """(?=[ "]))|[^" ]+)[^"]*" (\d+) (\S+) "([^\\"]*(?:\\.[^\\"]*)*)" [^\n]*""").r

The idea behind using such a beastly regular expression was four-fold:

  1. Optimize for common cases while accepting as many edge cases as practical
  2. Do the line splitting during the same pass
  3. Do the GET request filtering during the same pass
  4. Do the hit detection during the same pass

The leading newline is used instead of the ^ boundary marker because I'm splitting buffers arbitrarily relative to line boundaries, and don't want false positives on buffer boundaries. However, you do need to prepend the input stream with a newline to match the first line.
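
A tiny illustration of why the prepended newline matters, using a toy pattern rather than the full log-entry expression:

val entry = """\n(\S+) """.r
val chunk = "host1 - first entry\nhost2 - second entry"

// without the prefix, the first line has no leading newline for the pattern to consume
entry.findAllIn(chunk).matchData.map(_.group(1)).toList          // List(host2)
entry.findAllIn("\n" + chunk).matchData.map(_.group(1)).toList   // List(host1, host2)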

While there are many regular expressions that are equivalent to the above expression in what they match, some have exponentially worse performance. I would argue that while using a regular expression can make your wide finder more precise, it can also make it much slower, given the reality of combinatorial explosions that plague complex, poorly written expressions. If you can, profile your expression using a profiler like RegexBuddy. This profiler does a pretty good job of uncovering backtracking overload.
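
Here's a concrete illustration of the kind of combinatorial explosion I mean (not anything from my submission): both patterns below accept exactly the same strings, but the first backtracks exponentially on a near-miss input while the second fails in linear time.

val pathological = """(a+)+b""".r   // nested quantifiers
val linear       = """a+b""".r      // same language, no ambiguity

val nearMiss = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!"   // thirty a's and no b
linear.findFirstIn(nearMiss)          // None, immediately
// pathological.findFirstIn(nearMiss) // also None, but only after on the order of 2^30 backtracking steps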

Also, I know of at least two log entries which my regular expression matches but does not extract the "correct" URL from. One was reported by me, and the other by Erik Engbrecht in the post that started the mailing list thread.

ded947.cotswold-internet.net - - [09/Jan/2008:05:23:29 -0800] "GET /ongoing/ When/200x/2005/03/08/BloggingIsGood HTTP/1.0" 200 15326 "-" "-"
customer-reverse-entry.208.96.54.93 - - [12/Oct/2007:18:41:47 -0700] "GET /ongoing/What/The World/Products HTTP/1.0" 404 371 "-" "disco/Nutch-1.0-dev (experimental crawler; www.discoveryengine.com; disco-crawl@discoveryengine.com)"

In the first case, my Wide Finder recorded "/ongoing/" as the URL requested; I'm not sure what was actually fetched. In the second case, my Wide Finder made the same assumption about what was requested as the reference implementation did and, apparently, as the web server did. I've collected some other unusual log entries found while debugging for use while testing, my favorites being:

r61h80.res.gatech.edu - - [18/Mar/2003:05:19:16 -0800] "GET /ongoing/rug70.jpg HTTP/1.1" 200 2163 "http://www.tbray.org/ongoing/rug70.jpg" "Lynx Opera Mozilla MSIE Telnet PCap 4.0 5.0 6.0 <a href=\"http://members.fortunecity.com/randomn\"> http://members.fortunecity.com/randomn</a> <h1><b>DEATH TO SPAM</b></h1>"
corsair.digi.com.br - - [13/Nov/2003:11:44:21 -0800] "GET /ongoing/scripts/..%c1%1c../winnt/system32/cmd.exe?/c+dir+c:\\ HTTP/1.0" 404 239 "-" "Mozilla/4.0 (compatible; MSIE 6.0; Windows 98)"

Maybe this is a good chance to reflect on Postel's Law?

Tuesday, June 10, 2008

java.util.regex leaks

There's a resource leak waiting to bite you if you use java.util.regex.* much. The problem is related to a known behavior of String.substring() where the returned substring is backed by the original string. To be more precise, the substring is backed by the same char[] that backs the original string, which is a great optimization if:

  • both strings are of relatively equivalent sizes, or
  • you're planning on keeping the larger of the two strings around long term anyway.

After all, why copy characters out to another array when there's a perfectly good one to reuse? Well, if either of these assumptions is untrue, you'll end up using more memory than you realize, and it gets worse the longer your code runs, or the bigger the difference in size between the two strings.

It bit me again recently while working on my Wide Finder 2 submission, where the Matcher component would repeatedly read 3 megabytes from disk, decode it into a string backed by a 6 megabyte character array, and then perform regular expression matches on that string. Whenever it found a previously unseen value, it would store it as the key in some hash map. The values were obtained from calls to java.util.regex.Matcher.group() or .group(n), and it turns out these Matcher methods use substring() on the string that was searched. So each HashMap key (just a few tens of bytes long, for sure) was backed by a 6 megabyte character array. The result was that most of the 6 megabyte character arrays were retained in memory for the duration of the application, even though only a few substrings from each string were required.

The solution I chose was to clone the string using the String copy constructor, which copies just the needed portion of the character array in the process. Applying the fix as late as possible ensures that the intended optimization remains beneficial whenever it can, but doesn't get in the way when it's inappropriate. Here's an example, in Java:

import java.util.*;
import java.util.regex.*;

public class Test {
    public static void main(String[] args) {
        String string = "really, really, really, really, really, really long string";
        Matcher matcher = Pattern.compile("(\\w+),").matcher(string);
        Map<String,List<Integer>> map = new HashMap<String,List<Integer>>();

        while(matcher.find()) {
            String found = matcher.group(1);
            List<Integer> positions = map.get(found);
            if(positions == null) {
                positions = new ArrayList<Integer>();
                // copy the key so it isn't pinned to the big searched string's char[]
                map.put(new String(found), positions);
            }
            positions.add(matcher.start(1));
        }
    }
}

I thought it would be best to demonstrate this in Java since that would benefit the widest potential audience, but after being so fully immersed in Groovy and Scala lately, writing even this much Java was quite tedious. There really is a lot of Syntactic Noise that I will be happy to forget.

Friday, June 6, 2008

Results so far

Here's an update on my Wide Finder 2 submission, including program output, some performance data, and some thoughts about what's shaping the current performance. I've been making improvements to the code while writing these posts over the past couple of weeks, so the picture you get when reading them all at once may seem somewhat smeary and inconsistent. If you want a point-in-time snapshot to see how it all fits together, you can download this executable jar + source code. The jar requires scala-library.jar from Scala 2.7.1-final to be in the same directory when executed, and requires a JVM version 1.5 or higher. Here's the command line used to invoke the application as well as the output and timings:

% java -server -Xss1m -Xms3g -Xmx3g -jar WideFinder2-Scala-RayWaldin.jar /wf1/data/logs/O.all

43,178.0 MB (48.94 MBps)

Top URIs by hit:
   218540: /ongoing/When/200x/2003/09/18/NXML
   148630: /ongoing/When/200x/2003/03/16/XML-Prog
   129746: /ongoing/When/200x/2003/10/16/Debbie
   110427: /ongoing/When/200x/2003/07/17/BrowserDream
   102513: /ongoing/When/200x/2003/07/30/OnSearchTOC
   100934: /ongoing/When/200x/2003/04/26/UTF
    94408: /ongoing/When/200x/2003/07/25/NotGaming
    93060: /ongoing/When/200x/2003/04/06/Unicode
    92006: /ongoing/When/200x/2006/01/31/Data-Protection
    89795: /ongoing/When/200x/2004/12/12/BMS

Top URIs by bytes:
870656.0M: /ongoing/ongoing.atom
373564.5M: /ongoing/potd.png
278894.3M: /ongoing/ongoing.rss
 91443.9M: /ongoing/rsslogo.jpg
 63061.5M: /ongoing/When/200x/2004/08/30/-big/IMGP0851.jpg
 39645.0M: /ongoing/When/200x/2006/05/16/J1d0.mov
 36557.5M: /ongoing/When/200x/2007/12/14/Shonen-Knife.mov
 36257.1M: /ongoing/
 33337.3M: /ongoing/moss60.jpg
 31966.7M: /ongoing/When/200x/2004/02/18/Bump.png

Top 404s:
    54271: /ongoing/ongoing.atom.xml
    28030: /ongoing/ongoing.pie
    27364: /ongoing/favicon.ico
    24631: /ongoing/When/200x/2004/04/27/-//W3C//DTD%20XHTML%201.1//EN
    24005: /ongoing/Browser-Market-Share.png
    23924: /ongoing/Browsers-via-search.png
    23850: /ongoing/Search-Engines.png
    22637: /ongoing/ongoing.atom'
    22619: //ongoing/ongoing.atom'
    20578: /ongoing/Feeds.png

Top client addresses:
   215496: msnbot.msn.com
   110909: crawl-66-249-72-173.googlebot.com
   106361: crawler14.googlebot.com
    96366: crawl-66-249-72-172.googlebot.com
    74220: crawl-66-249-72-72.googlebot.com
    67821: sv-crawlfw3.looksmart.com
    66361: h161.c221.tor.velocet.net
    65828: 125.19.50.169
    63676: crawl-66-249-72-165.googlebot.com
    56092: crawl-66-249-72-38.googlebot.com

Top referrers:
   182996: http://tbray.org/ongoing/
   158562: http://slashdot.org/
    62218: http://www.google.com/reader/view/
    40243: http://daringfireball.net/
    30197: http://programming.reddit.com/
    20463: http://reddit.com/
    15182: http://www.scripting.com/
    13864: http://www.tbray.org/
    13618: http://www.ask.com/web?q=page=23
     9917: http://www.macsurfer.com/

[total 912,280ms]
hits: 4680, bytes: 45692, 404s: 128489, refs: 1902292, clients: 3573466

real    15:17.0
user  7:35:07.7
sys      9:13.0

Two minor differences remain between my results and those of the reference implementation, shown below. They are due to my choice not to assume fields are space delimited, as many of the other entries do, including the reference implementation.

--- coolstack.log   2008-06-06 22:45:33.000000000 -0700
+++ myresults.txt   2008-06-08 14:02:40.000000000 -0700
@@ -5,5 +5,5 @@
      110427: /ongoing/When/200x/2003/07/17/BrowserDream
      102513: /ongoing/When/200x/2003/07/30/OnSearchTOC
-     100932: /ongoing/When/200x/2003/04/26/UTF
+     100934: /ongoing/When/200x/2003/04/26/UTF
       94408: /ongoing/When/200x/2003/07/25/NotGaming
       93060: /ongoing/When/200x/2003/04/06/Unicode
@@ -19,5 +19,5 @@
    39645.0M: /ongoing/When/200x/2006/05/16/J1d0.mov
    36557.5M: /ongoing/When/200x/2007/12/14/Shonen-Knife.mov
-   36257.0M: /ongoing/
+   36257.1M: /ongoing/
    33337.3M: /ongoing/moss60.jpg
    31966.7M: /ongoing/When/200x/2004/02/18/Bump.png

Profiling has shown that this process is CPU bound. It's been noted by several people that even a simple, single threaded, non-NIO Java application can traverse the entire 42GB file in about 5 minutes on the T2000 server, yet the best performance to date from any JVM based submission is still under 50MBps.  It's becoming pretty clear that there's a major obstacle to any further performance improvements: the Java Wide Character Tax.

There are two forms of overhead caused by Java's support for wide characters. The most obvious, but least taxing form is character decoding. Since we're dealing with ASCII data, the overhead of decoding it is pretty light. It's definitely best to not do this as bytes are being read in a single blocking thread, but rather to let the worker threads each do character decoding for their own chunks of data. But it's still a pretty trivial process. My profiling shows that character decoding was active about 2% of the time while sampling CPU performance.

The more insidious form of the Wide Character Tax comes from the fact that, while Java characters are two bytes wide, what they actually hold are variable-width UTF-16 representations of characters, which are either two bytes or four bytes, depending on the character. I call this the Double Wide Character Tax, even though its effects are much more than twice that of the decoding tax. From the Javadocs for java.lang.Character:

The char data type (and therefore the value that a Character object encapsulates) are based on the original Unicode specification, which defined characters as fixed-width 16-bit entities. The Unicode standard has since been changed to allow for characters whose representation requires more than 16 bits. ... The Java 2 platform uses the UTF-16 representation in char arrays and in the String and StringBuffer classes. In this representation, supplementary characters are represented as a pair of char values, the first from the high-surrogates range, (\uD800-\uDBFF), the second from the low-surrogates range (\uDC00-\uDFFF).

Each and every character must be checked for the possibility that it's a supplementary character, in order to know where character boundaries are and to correctly compute character counts. The numbers below come from a 22 second CPU sampling of a single thread, as captured by Shark:


Self      Total     Library              Symbol                          
10.2 s    10.2 s    java.lang            Character::codePointAt
4.0 s     4.0 s     java.util.regex      Pattern$Curly::match0
2.1 s     2.1 s     java.util.regex      Pattern$Sub::match
1.6 s     1.6 s     java.lang            String::equals
702.6 ms  702.6 ms  scala.util.matching  Regex$MatchData$class::group
437.5 ms  437.5 ms  java.lang            StringCoding$CharsetSD::decode

As you can see, in this worker thread, almost 50% of the time is spent computing code points. Like ASCII character decoding, this should be a trivial operation as there are no ASCII characters that translate into supplementary characters. Unlike character decoding, however, this has to be done repeatedly, every time a character position is traversed. Regular expression application is particularly prone to this type of penalty because it frequently revisits character positions, and the regular expression evaluator built into Java does nothing to memoize or otherwise optimize away the repeated checks for supplementary characters. From the above numbers, it seems that a significant portion of the time in each worker thread is spent paying this Double Wide Character Tax by the narrowest of characters.
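
To make that overhead concrete, here's roughly the shape of the check that has to happen at every character position (a simplified sketch of the idea, not the JDK's actual source):

def codePointAt(seq: CharSequence, index: Int): Int = {
    val c1 = seq.charAt(index)
    if(Character.isHighSurrogate(c1) && index + 1 < seq.length) {
        val c2 = seq.charAt(index + 1)
        if(Character.isLowSurrogate(c2))
            return Character.toCodePoint(c1, c2)    // supplementary character: a surrogate pair
    }
    c1    // the only outcome for ASCII input, but the surrogate test is still paid every time
}

For pure ASCII input the isHighSurrogate test never succeeds, yet it still has to be asked at every position the regular expression engine visits, and revisits.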

So what next? I'm not sure if there's a way around this particular obstacle short of creating a byte-oriented regular expression engine on the JVM, far more trouble than it's worth for this challenge. If there's an easier solution out there, I'm sure someone will find it and we'll know what it is soon.

Thursday, June 5, 2008

Multicore Matcher

The third and final piece of my Scala Wide Finder 2 submission is the multicore Matcher. This component takes an InputStream, a regular expression, and a closure, and, using as many CPU cores as you can spare (All your cores are belong to us!), it applies your closure once for each match in the stream, in no particular order.

Externally, it looks pretty simple, like a normal loop construct that calls a closure. This example prints every line in the specified file:

import java.io.FileInputStream
import net.waldin.mc.Matcher._

object Test extends Application {
    eachMatch(new FileInputStream("O.100k"), "[^\n]+\n".r) { m =>
        print(m.matched)
    }
}

Looks pretty straightforward from the outside, but there's a lot going on behind the scenes, and unfortunately, this abstraction has a few big leaks that the caller needs to be aware of to avoid sinking themselves. I'll point them out after I describe the internals of the Matcher.

The Matcher is designed as a set of actors which communicate by passing messages to each other asynchronously, and is implemented using the Scala Actors library. There are three types of Actors used in the Matcher:

reader
there is one reader responsible for reading bytes into Buffers from the input stream, and passing these Buffer messages to Matchers. When the input stream has been exhausted, the reader sends every matcher a Done message, and then exits.
matcher
there are many matchers, usually one for each CPU core. Matchers receive Buffers, which they search using the supplied regular expression, and for each match they find, they call the supplied closure. They also generate Endpieces, one for every Buffer they receive, containing some part of the head and tail of that Buffer, and send each one as a message to the endJoiner. Additionally, when a matcher receives a Done message and has finished processing all Buffers in its input queue, it sends the endJoiner a Done message and exits.
endJoiner
there is one endJoiner whose primary function is to receive Endpieces and, using the Endpieces from two consecutive Buffers, join the tail of the first buffer to the head of the next, then search the joined text using the same regular expression used by the matchers. If it finds a match, it calls the same closure. The secondary function of the endJoiner is to signal completion of the matching run, which it does after receiving one Done message per matcher and processing all remaining Endpieces.

The message types are defined as:

class Buffer (val matcher: Int, val data: Array[Byte]) {
    var chunk, size = 0
}

case class Endpieces (val chunkNum: Int, val head: String, val tail: String)

case object Done

Here's a UML 2 Sequence diagram that puts it all together graphically, with the actors that send and receive messages (and the closure) along the top, and the parallel processing of each constituent shown in vertically separated frame fragments.

There are a few parts to this design that may not be apparent at first. Buffers deliver bytes from the reader to each matcher. That means the job of character decoding is left to the matcher. Also, there's no newline detection built into the matcher at all. That's all done by the supplied regular expression (or within the closure, or never at all, depending on the needs of the caller). The abstraction leaks are all related to this need to maximize throughput in the reader by doing as little as possible there, and then expecting subsequent actors to make sense of the pieces they've been given.

As far as I can tell, here are all the gory internals you need to consider when using this Matcher:

ASCII only
this technique can easily be made to work with any fixed-width character encoding, but for now it's limited to ASCII. Variable-width encodings will be a challenge to support without sequentially decoding the entire input stream. Too bad character streams don't have "keyframes" where a known good state can be guaranteed intermittently midstream. [Update: Meanwhile, over on the Wide Finder mailing list, James Aylett has just pointed out that UTF-8 is resynchronizable after all, so this scheme could easily be made to work for UTF-8 encoded data; see the sketch after this list]
No overlapping matches
the matcher does not fully support matches that overlap each other. As it is right now, some overlapping matches will be detected, but those happening at buffer boundaries may not be. Partially overlapping means match A starts before match B starts and ends before match B ends; fully overlapping means match A starts on or before the start of match B and ends on or after the end of match B. Partial overlaps away from buffer boundaries are supported, and could be supported at buffer boundaries by slightly altering the construction of endpieces, but as it stands right now even partially overlapping matches are not guaranteed to be found there.
Maximum match size
to speed up cases where there are sparse matches resulting in huge endpieces, a maximum match size is imposed that restricts the size of endpieces (4096 bytes by default).
Regex boundary matchers lose their meaning
Line boundary matchers (^ and $), word boundary matchers (\b and \B), previous match end (\G), and input boundary matchers (\A, \Z, and \z) do not reliably detect their respective boundary types. The problem is they treat the edges of the input as a boundary, but what they sometimes detect as the edge of input is just a buffer boundary. So boundary markers should be used very cautiously, if at all. In order to help callers detect line boundaries reliably, the Matcher inserts \n at the beginning and end of the input stream. Using \n in your regular expression to detect line boundaries is guaranteed to work, and there is no chance you'll accidentally mistake a buffer boundary for the beginning or end of the input stream.
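
Regarding the UTF-8 update above: resynchronization works because UTF-8 continuation bytes are self-identifying (they all have the bit pattern 10xxxxxx), so a matcher handed an arbitrary buffer boundary can skip forward to the next character start. A rough sketch of the idea:

// a byte is a UTF-8 continuation byte iff its top two bits are 10
def isContinuation(b: Byte) = (b & 0xC0) == 0x80

// index of the first character boundary at or after `from` in a raw buffer
def resync(buf: Array[Byte], from: Int): Int = {
    var i = from
    while(i < buf.length && isContinuation(buf(i))) i += 1
    i
}

At most three bytes ever need to be skipped, so each matcher could realign itself cheaply before decoding its chunk.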

Here's the complete listing of the Matcher component:

package net.waldin.mc

import java.io.InputStream

import scala.actors._
import scala.actors.Actor._
import scala.compat.Platform.currentTime
import scala.Console.{withOut,err}
import scala.util.matching.Regex
import scala.util.matching.Regex.{MatchData => Match}

import Config._

object Matcher {
    class Buffer (val matcher: Int, val data: Array[Byte]) {
        var chunk, size = 0
    }

    case class Endpieces (val chunkNum: Int, val head: String, val tail: String)

    case object Done

    def debugLog(format: String, args: Any*) = if(debug) withOut(err) {printf(format, args:_*)}

    def mb(bytes: Long) = bytes/(1024.0*1024.0)

    def eachMatch(in: InputStream, regex: Regex)(closure: Match => Unit):Unit = 
            (eachMatchAsync(in, regex)(closure))()
    
    /** @return a future representing completion of this matcher */
    def eachMatchAsync(in: InputStream, regex: Regex)(closure: Match => Unit):Future[Any] = {
        val buffers = for(_ <- (0 until buffersPerMatcher).toList; 
                            i <- (0 until numMatchers).toList) yield {
                        new Buffer(i, new Array[Byte](bufferSize))
        }

        lazy val reader: Actor = actor {
            var chunkNum = 0
            var bytesTotal = 0L
            val startTime = currentTime
            def elapsedSeconds = (currentTime - startTime)/1000.0
            def progressLog(bytes: Long) =
                    debugLog("\r%,3.1f MB (%,4.2f MBps)", {mb(bytes)}, {mb(bytes)/elapsedSeconds})
        
            loop {
                receive { 
                    case buffer: Buffer =>
                        val bytesRead = in.read(buffer.data, 0, bufferSize)
                        if(bytesRead > -1) {
                            buffer.chunk = chunkNum
                            buffer.size = bytesRead
                            matchers(buffer.matcher) ! buffer
                            bytesTotal += bytesRead
                            progressLog(bytesTotal)
                            chunkNum += 1
                        } else {
                            progressLog(bytesTotal)
                            println
                            endJoiner ! new Endpieces(chunkNum+1, inputSuffix, "")
                            for(matcher <- matchers) matcher ! Done
                            exit
                        }
                }
            }
        }
    
        lazy val matchers = for(n <- (0 until numMatchers).toList) yield actor {
            loop { 
                receive {
                    case buffer: Buffer => 
                        var firstIndex = -1
                        var lastIndex = 0
                        var segment = 0
                        val str = new String(buffer.data, 0, buffer.size, "ASCII")
                    
                        // avoid using MatchIterator.matchData() because it forces, adding lots
                        // of overhead.  Instead use MatchIterator itself which is an overhead 
                        // free implementation of MatchData
                        val matchIter = regex.findAllIn(str)
                        for(_ <- matchIter) {
                            if(firstIndex == -1) {
                                firstIndex = matchIter.start min ((maxMatchBytes+1)/2)
                            }
                            if(lastIndex < matchIter.end) {
                                lastIndex = matchIter.end max (str.length - (maxMatchBytes/2))
                            }
                            closure(matchIter)
                            segment += 1
                        }
                        val head = if(firstIndex < 1) "" else 
                                        new String(str.substring(0, firstIndex))
                                        
                        val tail = if(lastIndex == str.length) "" 
                                        else new String(str.substring(lastIndex, str.length))
                                        
                        endJoiner ! new Endpieces(buffer.chunk, head, tail)
                        reader ! buffer
                    
                    case Done => endJoiner ! Done
                                 exit
                }
            }
        }
    
        lazy val endJoiner = actor {
            var nextChunk = 0
            var lastTail = inputPrefix
            var doneCount = 0
            var completion: OutputChannel[Any] = null
            
            loop { 
                react {
                    // Endpieces might not arrive in order, so only accept the next expected chunk
                    case Endpieces(chunkNum, head, tail) if (chunkNum == nextChunk) => 
                        regex.findFirstMatchIn(lastTail + head) match {
                            case Some(m) => closure(m)
                            case _ =>
                        }
                        lastTail = tail
                        nextChunk += 1
                
                    case None => completion = sender

                    case Done if(doneCount == 1) => 
                        completion ! Done
                        exit
                    
                    case Done => doneCount += 1
                }
            }
        } 
    
        debugLog("cores:%d, matchers:%d, buffers/matcher:%d, buffersize:%dK\n", 
                numCores, numMatchers, buffersPerMatcher, bufferSize/1024)
            
        for(buffer <- buffers) reader ! buffer
        
        endJoiner !! None
    }
}

Multicore Counter

Like sorting, thread-safe tallying is pretty straightforward. Or at least it should be. If you're coding in any JVM-based language, it should be virtually free. The java.util.concurrent package has been part of the JDK for a few years now and provides very easy-to-use primitives that are almost perfect for tallying. Almost.

There are two parts to this puzzle. The first part is, you need a thread safe map (from strings to accumulators), and the second part is, you need an accumulator with atomic increment/add functions so that simultaneous updates to an accumulator will not collide. These two parts exist in the form of java.util.concurrent.ConcurrentHashMap and java.util.concurrent.atomic.AtomicInteger. Putting it all together with Scala's MapWrapper for Java maps, you get this simple solution:

class Counter extends MapWrapper[String,AtomicInteger] {
   override val underlying = new ConcurrentHashMap[String,AtomicInteger]
}

The problem is, that doesn't work as expected. When the Java concurrent classes are taken together with a teaspoon of Scala's syntactic sugar for maps, you can get yourself into some unexpected, unsynchronized, hot water. Normally, Scala offers a couple of sweet shortcuts for dealing with maps (or any other objects that adopt the same protocol), as this code demonstrates:

object Test extends Application {
    val m = new Counter              // create an empty counter
    m("a") = new AtomicInteger(4)    //"a" -> 4
    m("a") += 3                      //"a" -> 7
}

What's really happening under the covers is something like this:

object Test extends Application {
    val m = new Counter                    // the MapWrapper-based Counter from above
    m.update("a", new AtomicInteger(4))
    m.update("a", m.apply("a") + 3)        // two separate calls: read, then write back
}

which brings us straight to our first problem: you can't use arithmetic operators with AtomicIntegers. You could extend AtomicInteger with a Scala class that defines +, but an even more insidious problem lurks just beyond that: you can't use Scala's compound assignment operator (<op>=) syntactic sugar and expect atomicity. At first it seemed like you could override whatever method Scala's compiler substitutes for += on a MapWrapper, and have it call the atomic ConcurrentMap.replace method instead of Map.put, but as you can see, there is no single method. The compiler injects two method calls in place of +=. The update and apply are performed separately, without synchronization. So while it would be nice to use this += syntactic sugar, it's out. Instead we're going to need to define a new method that initializes or increments an accumulator with guaranteed atomicity, something like:

def inc(key:String, amount:Int) = {
   var oldCount = underlying.putIfAbsent(key, new AtomicInteger(amount))
   if(oldCount == null) amount
   else oldCount.addAndGet(amount)
}

def inc(key:String):Int = inc(key,1)

So that works fine for counters that have Integer-range accumulators, but what about when you need the range of a Long? Easy. Generics to the rescue. Right? No. No? Crap. In Java there is no way to parameterize your type over the set of {AtomicInteger, AtomicLong} so that your generic type can use the addAndGet method, for two reasons:

  1. they each separately define an addAndGet method rather than inheriting its declaration from a common ancestor, and
  2. they each take an argument of a different fixed type

In Scala you can get around these problems using either Structural Types or Implicit Type Parameters (a.k.a. the Poor Man's Type Classes). Both of these solutions work to some limited extent for this problem, but both also come with too much conceptual and computational overhead for my needs. Structural types, I was told, are implemented using Java reflection, which is a huge performance drain, and implicit type parameters require implicit objects to be visible throughout your code, and extra implicit argument lists on many function definitions. Way too much work for too little gain.
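
For illustration, here's roughly what the structural type approach looks like (a sketch, not the code I ended up with). Note that it only helps with the first problem: AtomicLong satisfies the type below, but AtomicInteger still doesn't, because its addAndGet takes an Int:

import java.util.concurrent.atomic.AtomicLong

// anything with a matching addAndGet method qualifies; calls are dispatched via reflection
type LongAccumulator = { def addAndGet(delta: Long): Long }

def bump(acc: LongAccumulator, amount: Long) = acc.addAndGet(amount)

bump(new AtomicLong(0), 5)    // 5

Every call to bump above goes through reflection, which is exactly the performance drain mentioned above.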

So instead of parameterizing over the accumulator type, I decided to just stick with Long accumulators for all cases. Even using AtomicLongs proved more trouble than it was worth, causing unneeded complexity either in the class definition of Counter, or externally, everywhere the accumulators were used in the code. So ultimately, I bit the bullet and just implemented an atomic increment method on Longs. Here's the finished product:

package net.waldin.mc

import scala.collection.jcl.MapWrapper
import java.util.concurrent.ConcurrentHashMap

import Config.{counters,segments}

/** a Map backed by a java.util.concurrent.ConcurrentHashMap with
 *  a simple atomic increment method 
 */
class Counter(override val underlying: ConcurrentHashMap[String,Long])
        extends MapWrapper[String,Long] {
    def this() = this(new ConcurrentHashMap[String,Long](counters,0.75f,segments))
    def this(initSize: Int, segments: Int) = 
            this(new ConcurrentHashMap[String,Long](initSize,0.75f,segments))

    /** atomically initializes or increments the counter at key by one
     *  @return the new count */
    def inc(key:String):Long = inc(key,1)

    /** atomically initializes or increments the counter at key by the specified amount
     *  @return the new count */
    def inc(key:String, amount:Int) = {
        var oldCount = 0L;
        if(underlying.containsKey(key) || 
                (underlying.putIfAbsent(new String(key), amount) != 0)) {
            do {
                oldCount = underlying.get(key)
            } while(!underlying.replace(key, oldCount, oldCount + amount))
        }
        oldCount + amount
    }
}

Seems like an awfully long journey for so little code, but it was worth it. Writing this little block of code gave me a much better understanding of the strengths and weaknesses of Scala.

Monday, June 2, 2008

Multicore Sorter

Continuing on from my last post, I'll start with a review of the simplest of the three multicore abstractions: the Sorter. Sorting is a great candidate for parallel processing, as many sorting algorithms already divide and conquer their work. My approach was to borrow from two of these algorithms, QuickSort and MergeSort, to do just enough sorting just in time. Here's how:

  1. partition the data to be sorted into evenly sized shards, one for each cpu core
  2. launch a worker thread to quicksort each shard
  3. when all the workers are complete, merge the sorted shards back together

This still leaves a large merge in a single thread as the last step. Luckily, the WideFinder 2 Benchmark doesn't require us to completely sort all the data. We just need the top ten items in each category. So given a set of sorted shards (say that fast a few times!), it's easy to iterate over them to produce a list of the overall top ten items. The implementation below does the parallel quicksort, and then, acting as an Iterator over the entire sorted collection, amortizes the work of merging the shards back together over the sequential access of the entire collection. Given that we only need 10 items, most of this work goes undone in our use of Sorter. Here's the code:

package net.waldin.mc

import scala.actors.Actor._

import Config.numSorters

class Sorter[T](iter: Iterator[T], size: Int, comp: (T,T)=>Int) extends Iterator[T] {
    val shardSize = Math.ceil(size.toDouble / numSorters).toInt
    val shardIdx = List.range(0, size, shardSize).toArray
    val array = new Array[T](size)
    var sorted = false
    final def shardOffset(shard:Int) = shard * shardSize
        
    object Comparator extends java.util.Comparator[T] {
        override def compare(a: T, b: T) = comp(a,b)
    }
    
    // copy the source iterator into the backing array, then quicksort each shard in a parallel future
    def sort {
        //extract array
        iter.copyToArray(array,0)
    
        // create actors with array shards for quicksort
        val sorters = for(shard <- (0 until numSorters).toList) yield {
            scala.actors.Futures.future {
                val start = shardOffset(shard)
                val end = size min (start + shardSize)
                java.util.Arrays.sort(array, start, end, Comparator)
            }
        }

        // await completion of futures
        for(s <- sorters) s() 
        sorted = true
    }
    
    override def hasNext: Boolean = {
        if(!sorted) sort
        firstShardWithData.isDefined
    }
    
    private def firstShardWithData = {
        var shard = 0
        while(shard < numSorters && 
                shardIdx(shard) == (size min shardOffset(shard + 1))) {
            shard += 1
        }
        if(shard == numSorters) None else Some(shard)
    }
    
    // lazy merge step: among the shards that still have data, consume and return the head that sorts first
    override def next(): T = {
        if(!sorted) sort
        firstShardWithData match {
            case None => throw new NoSuchElementException
            case Some(n) =>
                var shard = n
                var tryShard = shard + 1
                while(tryShard < numSorters) {
                    if(shardIdx(tryShard) < (size min shardOffset(tryShard + 1)) && 
                            comp(array(shardIdx(tryShard)), array(shardIdx(shard))) < 0){
                        shard = tryShard
                    }
                    tryShard += 1
                }
                shardIdx(shard) += 1
                array(shardIdx(shard) - 1)
        }
    }
}

Sunday, June 1, 2008

Multicore WideFinding

Ok, so here is some actual code to speed up the widefinding across multiple processor cores. I've tried to abstract out the new stuff in such a way as to leave the main driver application looking somewhat like the original strawman impl in Scala, which in turn was made to resemble the original WideFinder 2 strawman impl in Ruby. Turns out the new stuff maps nicely to the three problems mentioned earlier: matching, counting, and sorting, and so you'll see sprinkled throughout the code below references to Matcher, Counter, and Sorter (classes from the net.waldin.mc package) where previously you saw actual regex matching, counting, and sorting code.

Another major difference between this version and the last is that it uses a regular expression to extract the log entries from the input, instead of doing this:

  • find line boundaries
  • split lines of text into whitespace-separated fields
  • remove some extraneous quotes and brackets
  • concatenate back together fields that contain spaces (like time and useragent)

A single regular expression can accomplish the same thing faster, all with one declarative statement. The regular expression is a doozy, but since Scala uses the Java regular expression engine, which is known to be full-featured and performant, we may as well take advantage of it. After all, this is exactly what regular expressions were meant to deal with, as long as the expression itself is optimized for speed, with minimal backtracking, and correctly handles all reasonable input. I just wish the expression itself was a little more readable by us non-machines. Hmm, I wonder if ... maybe later...

So here's the main driver with references to the abstracted out multicore functionality, and the gnarly regular expression:

package net.waldin.wf

import java.io.{FileInputStream,BufferedInputStream}

import scala.compat.Platform.currentTime
import scala.util.matching.Regex._

import net.waldin.mc.{Counter,Matcher,Sorter}

/** A normal HTML report: Top 10 popular URLs by hits and bytes, top 10 404s, *
 *  top 10 client IPs by hits, and the top 10 referrers.  Only, skip the HTML */
object WideFinder2c extends Application {
    val hits, bytes, s404s, refs = new Counter()
    val clients = new Counter(12*1024*1024,96)
    val logEntryRegex = 
            ("""(?m)\n(\S+) - \S+ \[[^\]]+\] "([^\\"]+(?:\\.[^\\"]*)*)" (\d+) """+
            """(-|\d+) "([^\\"]+(?:\\.[^\\"]*)*)" "([^\\"]+(?:\\.[^\\"]*)*)"$""").r
    val requestRegex = """GET ((/ongoing/When/\d+x/\d+/\d+/\d+/[^ .]+)|[^ ]+) """.r

    def record(host: String, article: String, ref: String) {
        if(article != null) {
            hits.inc(new String(article))
            clients.inc(host)
            if(ref.length() > 1 && !ref.startsWith("http://www.tbray.org/ongoing/")) 
                refs.inc(ref)
        }
    }

    def report(label: String, map: Counter, megabytes: Boolean) {
        printf("Top %s:\n", label)
        val keysByCount = new Sorter(map.keys, map.size, (a:String,b:String) => 
            (map(b) - map(a)) match {
                case 0 => a.compareTo(b)
                case n => n.toInt
            }
        )
        for(key <- keysByCount.take(10)) {
            val format = if(megabytes) "%10.1fM: %s\n" else "%11.0f: %s\n"
            val value: Double = if(megabytes) map(key)/(1024.0d*1024.0d) else map(key)
            printf(format, value, if(key.length>60) key.substring(0,60) + "..." else key)
        }   
        println
    }

    override def main(args: Array[String]) {
        new Matcher(new FileInputStream(args(0)), logEntryRegex, "\n", "", _ match {
            case Some(m) if(m.group(2).startsWith("GET")) => {
                def f(i:Int) = new String(m.group(i))
                val (host, request, status, size, ref) = 
                        (f(1), f(2), f(3).toInt, if(f(4)=="-") 0 else f(4).toInt, f(5))
                // avoid MatchIterator.matchData because it forces, lots of overhead
                val reqMatch = requestRegex.findAllIn(request)
                if(reqMatch.hasNext) {
                    val uri = new String(reqMatch.group(1))
                    // workaround for http://lampsvn.epfl.ch/trac/scala/ticket/983
                    val article = if(reqMatch.start(2) == -1) null else reqMatch.group(2)
                    status match {
                        case 200 => 
                            bytes.inc(uri, size)
                            record(host, article, ref)
                        case 304 => record(host, article, ref)
                        case 404 => s404s.inc(uri)
                        case _ =>
                    }
                }
            } 
            case Some(_) => // not GET, ignore
            case None =>    // matcher is done
                report("URIs by hit", hits, false)
                report("URIs by bytes", bytes, true)
                report("404s", s404s, false)
                report("client addresses", clients, false)
                report("refs", refs, false)
                printf("[total %,dms]\n", currentTime - executionStart)             
                exit
        }).start
    }
}

As it stands now, this code takes approximately 20 minutes to get through the entire 42GB dataset. Here are some times from a recent run:

        real    20:53.8
        user  8:19:50.2
        sys     28:13.9

That comes out to about 2500% CPU utilization (roughly 8¾ hours of user+sys time packed into just under 21 minutes of wall-clock time), so hopefully there's still some room for improvement. I'll review Sorter, Counter, and Matcher in some future posts.