Monday, July 18, 2005

lab 37 - Geryon's mapreduce

NAME

lab 37 - Geryon's mapreduce

NOTES

I have a registry of kfs disks and cpus in my cluster, and the registry is mounted on every node. Upon this infrastructure I've written some shell scripts to simulate a MapReduce function.

I want to quantize and bound the data and the computation. To that end, a process works on only one 64MB disk for input, and may write to another disk for output. The splitting of the data is also crucial to the parallelization. Each input disk can be handled concurrently on any cpu node in the cluster. The namespace for a process is built by finding resources in the registry.

Because there are several cpus available to run a command block, I choose the cpu round robin. A shell function populates a list of the available cpu nodes, and each subsequent call takes the head of the list until the list is empty, at which point it is repopulated from the registry. [1]

cpulist=()
subfn nextcpu {
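  # refill the list from the registry when it is empty
  if {~ $#cpulist 0} {
    cpulist=`{ndb/regquery -n svc rstyx}
  }
  # return the head and keep the tail for the next call
  result = ${hd $cpulist}
  cpulist = ${tl $cpulist}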
}
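For readers who do not use the Inferno shell, the same round-robin idea can be sketched in Python. This is an illustration only, not the lab's code: the `RoundRobin` class, the `query` callable standing in for `ndb/regquery -n svc rstyx`, and the node addresses are all made up for the example. Unlike the shell function, this sketch raises an error when the registry returns nothing.

```python
class RoundRobin:
    """Hand out nodes one at a time, refilling the list when it runs out."""

    def __init__(self, query):
        # `query` is a hypothetical stand-in for the registry lookup:
        # a callable returning the currently registered cpu addresses.
        self.query = query
        self.pending = []

    def next(self):
        if not self.pending:
            # First call, or list exhausted: ask the registry again.
            self.pending = list(self.query())
        if not self.pending:
            raise RuntimeError("no cpu nodes registered")
        # Take the head and keep the tail, like ${hd} and ${tl} in the shell.
        return self.pending.pop(0)


# Made-up addresses in the net!host!port form used by the registry.
picker = RoundRobin(lambda: ["tcp!node1!9999", "tcp!node2!9999"])
for _ in range(3):
    print(picker.next())
```

Because the list is re-read only when it runs dry, a node that registers mid-cycle is picked up on the next refill rather than immediately.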

I do a similar thing for finding a disk local to the cpu for output files. Finddisk takes the host name and returns the head of the list of available disks on that host.

Together these two functions are pretty much all the intelligence in selecting nodes for work and allocating disks. Obviously, more could be done.

Now for the definition of rcpu. Using the above functions, it picks the next cpu and output disk. Then it constructs a command block to send to that node. The command block does some initialization, such as mounting the disks and running a gridrc file from the user's lib directory. As part of the rstyx protocol the client device exports its namespace, which is mounted at /n/client; this provides an easy way of distributing shell and dis code to every node. The gridrc file includes the shell function definition for rmnt. [lab 36] The constructed block contains the command block passed as argument to the function. It runs in the background, and rcpu prints the name of the cpu disk it allocated for output.

rpid=()
fn rcpu {
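 # first argument is the command block, the rest are the input disks
 cmd:=$1
 disk:=${tl $*}
 cpu:=${nextcpu}
 (net host port) := ${split '!' $cpu}
 # put the output disk found on that host at the head of the disk list
 disk = ${finddisk $host} $disk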
 s=${parse '{cpudisk=/n/' ^ ${hd $disk} ^'
 run /n/client/usr/caerwyn/lib/gridrc
 rmnt ' ^ $"disk ^ ' 
 ' ^ $cmd ^ '}'}
 cpu $cpu sh -c $s &
 rpid=$apid $rpid
 echo ${hd $disk}
}
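The string assembly inside rcpu is easy to misread because of the shell quoting, so here is a Python sketch of just that step. It is a rough analogue under assumptions: `build_block` is a hypothetical name, the disk names in the usage lines are invented, and nothing here talks to a real node.

```python
GRIDRC = "/n/client/usr/caerwyn/lib/gridrc"  # path taken from the rcpu listing


def build_block(cmd, out_disk, in_disks, gridrc=GRIDRC):
    """Return the text of the command block that would be sent to the node."""
    # The output disk goes first, followed by the input disks.
    disks = [out_disk] + list(in_disks)
    lines = [
        "cpudisk=/n/" + out_disk,   # where the block writes its output
        "run " + gridrc,            # per-user setup; defines rmnt
        "rmnt " + " ".join(disks),  # mount every disk the block needs
        cmd,                        # the caller's command block
    ]
    return "{" + "\n".join(lines) + "}"


# A registry address splits into three fields on '!'.
net, host, port = "tcp!node1!9999".split("!")  # made-up address
print(build_block("{echo hello}", "d9.out", ["d0.mbox"]))
```

Keeping the output disk at the head of the list means the caller can report it with a single head operation, which is what the final echo in rcpu does.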

From this building block I construct the mapreduce (see the file gridlib for its definition). Just to remind you of how it is invoked,

% (mapreduce {idx /n/d?.mbox/*/*/*} {agg} 
   d0.mbox d1.mbox d2.mbox d3.mbox)

This example command counts the word frequency of all files on all mbox disks. An mbox disk is a kfs filesystem containing one text-only mail message per file. The example map command block uses shell pattern matching to traverse the disk and read every file. For each word it writes a key-value pair to standard output: the word and the value "1".
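To make the map output concrete, here is a small Python sketch of a word-emitting map step. It is not the idx command itself: the function name `map_words`, the tokenizing pattern, and the lowercasing are my assumptions for illustration.

```python
import re

# Assumed tokenizer: runs of letters, digits, and apostrophes count as words.
WORD = re.compile(r"[A-Za-z0-9']+")


def map_words(text):
    """Yield one 'word 1' line for every word occurrence in the text."""
    for word in WORD.findall(text.lower()):
        yield word + " 1"


for line in map_words("The cat, the hat"):
    print(line)
```

The map step does no counting of its own; repeated words simply produce repeated lines, and the totals are left to the reduce step.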

The reduce block is a filter that reads key-value lines from stdin and writes key-values to stdout. In this case, agg sums the value field for each distinct key then writes the word and the total word count.
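A reduce filter of this kind can be sketched in a few lines of Python. This is an illustrative stand-in, not the source of agg: it assumes each line is a key and an integer value separated by whitespace, and that lines sharing a key arrive next to each other.

```python
from itertools import groupby


def agg(lines):
    """Sum the value field over each run of lines that share a key."""
    pairs = (line.split() for line in lines if line.strip())
    # groupby only merges adjacent items, so the input must be grouped by key.
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        total = sum(int(value) for _, value in group)
        yield key + " " + str(total)


for line in agg(["cat 1", "hat 1", "the 1", "the 1"]):
    print(line)
```

Because it works on runs of adjacent keys, the filter needs only constant memory per key, at the cost of depending on the ordering of its input.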

Mapreduce interposes between the map and reduce an intermediate command that hashes the keys into a number of partitions, and writes each key-value pair to a partition file on the local cpu disk. Mapreduce then needs to concatenate all the intermediate partition files for one partition into a single sorted partition for input into the reduce block. The reduce command block then has all the values for a key as a contiguous stream of lines.
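The partition-and-merge step can be sketched in Python as follows. The names `partition_of`, `split_into_partitions`, and `merge_partition` are hypothetical, in-memory lists stand in for the partition files, and crc32 is my choice of hash, not necessarily what the lab uses.

```python
import zlib


def partition_of(key, nparts):
    # crc32 gives the same answer on every worker, so a key always lands
    # in the same partition no matter which node mapped it.
    return zlib.crc32(key.encode("utf-8")) % nparts


def split_into_partitions(lines, nparts):
    """Distribute one worker's key-value lines into nparts buckets."""
    parts = [[] for _ in range(nparts)]
    for line in lines:
        key = line.split(None, 1)[0]
        parts[partition_of(key, nparts)].append(line)
    return parts


def merge_partition(pieces):
    """Concatenate every worker's piece of one partition and sort it."""
    return sorted(line for piece in pieces for line in piece)


worker_a = split_into_partitions(["the 1", "cat 1", "the 1"], 3)
worker_b = split_into_partitions(["the 1", "hat 1"], 3)
p = partition_of("the", 3)
print(merge_partition([worker_a[p], worker_b[p]]))
```

Sorting after concatenation is what makes equal keys adjacent, so the reduce step never has to hold more than one key's values at a time.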

Mapreduce runs the map command block concurrently, once for each disk passed as an argument. It waits for the map workers to finish, then runs the reduce block concurrently, once for each partition. The number of partitions is hardcoded at the moment, but should be configurable. The result is a list of disks on which the result partition files are stored.
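The two-phase control flow, with a barrier between map and reduce, can be sketched in Python using a thread pool in place of cpu nodes. Everything here is an assumption made for illustration: the `mapreduce` signature, the `nparts` parameter (the lab hardcodes the count), and the in-memory lists that replace partition files on disk.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import groupby
import zlib


def mapreduce(map_fn, reduce_fn, inputs, nparts=4):
    """Map every input concurrently, then reduce every partition concurrently."""

    def run_map(item):
        # Each map worker splits its own output into nparts buckets.
        parts = [[] for _ in range(nparts)]
        for key, value in map_fn(item):
            parts[zlib.crc32(key.encode("utf-8")) % nparts].append((key, value))
        return parts

    with ThreadPoolExecutor() as pool:
        # list() blocks until every map worker has finished: the barrier.
        mapped = list(pool.map(run_map, inputs))

        def run_reduce(p):
            # Gather partition p from every worker and sort it by key.
            merged = sorted(kv for parts in mapped for kv in parts[p])
            return [reduce_fn(key, [v for _, v in group])
                    for key, group in groupby(merged, key=lambda kv: kv[0])]

        return list(pool.map(run_reduce, range(nparts)))


def count_words(text):
    return [(word, 1) for word in text.split()]


def total(key, values):
    return (key, sum(values))


result = mapreduce(count_words, total, ["the cat", "the hat"])
print(dict(kv for part in result for kv in part))
```

The return value is one list per partition, which mirrors how the shell version returns one result disk per partition rather than a single merged answer.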

Well, that's it. It is almost all implemented in Inferno shell code. Many things, such as fault tolerance, are not handled. But this is a starting point for a working grid function. The next step, for me, is to collect a large sample data set to experiment with.

FILES

caerwyn.com/lab/37

Footnotes

[1] A true closure with lexical binding would be ideal here. I really need to finish the symbolic shell language I started. Sending a closure with its environment to a remote host is an appealing idea I need to explore further.

1 comment:

demerzal said...

I was wondering if your site was more of an individual endeavor or if you were willing to accept submissions? I would be interested in submitting/posting some of the "labs" I will be working on.

Like yourself I have been doing some recent exploratory programming with Inferno to gain a better understanding of the system as a whole, and to also further my knowledge in certain CS areas.

I primarily focus on the graphics side of Inferno, with a focus on experimenting with different UI ideas I have. I must say that when I first grokked Acme I was blown away by its utility.