Sunday, March 23, 2008

lab 84 - gridfs pattern (mapreduce)



I've mentioned mapreduce in previous posts. It makes a good example application for thinking about grid computing. This lab is also about mapreduce, although the point here is to illustrate an Inferno pattern for grid computing, which I'll call the gridfs pattern.

Say you have a grid of compute nodes and you want to distribute and coordinate work among them. For the gridfs pattern you construct a synthetic file system that will get exported to all the nodes. The file system is the master process and all clients to the file system are workers.

Both cpu(1) and rcmd(1) use the rstyxd(8) protocol that exports the local namespace when running remote jobs. To implement the gridfs pattern we bind our master fs into our local namespace so it gets exported when we run multiple workers across our compute grid.

A very simple example of this pattern is explained in the Simple Grid Tutorial Part 2. I export a named pipe with a provider process writing one line at a time to the pipe; then multiple worker processes running across the grid consume lines from the pipe as fast as they can do the work.
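The named-pipe version of the pattern can be modeled in a few lines. This is a rough Python sketch, not the Inferno code from the tutorial: a bounded queue stands in for the exported pipe, threads stand in for workers on remote nodes, and the names provider, worker, and run_grid, along with the end-of-input sentinel, are assumptions made for illustration.

```python
import queue
import threading

def provider(lines, work, n_workers):
    """Write one line at a time, then one sentinel per worker to mark end of input."""
    for line in lines:
        work.put(line)
    for _ in range(n_workers):
        work.put(None)

def worker(name, work, done, lock):
    """Consume lines as fast as this worker can process them."""
    while True:
        line = work.get()
        if line is None:
            break
        result = line.upper()  # stand-in for the real work
        with lock:
            done.append((name, result))

def run_grid(lines, n_workers=3):
    work = queue.Queue(maxsize=1)  # small buffer, loosely like a pipe
    done = []
    lock = threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(f"w{i}", work, done, lock))
        for i in range(n_workers)
    ]
    for t in threads:
        t.start()
    provider(lines, work, n_workers)
    for t in threads:
        t.join()
    return done
```

Each line is handed to exactly one worker, and which worker gets it depends only on who asks first, so faster workers naturally take more of the load.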

The mapreduce source files I've included in this lab are a concrete (rough and experimental) example of this pattern taken to the next level. The namespace it exports consists of a clone file plus one numbered directory per connection, each containing a ctl file and a status file.


Each worker opens the clone file and gets a unique connection to the master process, represented by a numbered directory. The open clone file becomes an open file descriptor to the ctl file of the new connection. The worker reads messages from ctl describing new work, and it can write back messages about work completed. The master process will keep the status file up to date with the progress of the worker, analogous to prog(3).

An advantage of this approach over the simpler named pipe is that the master process knows exactly when a worker has closed its connection and knows how much work it has completed, based on the messages written to the ctl file. It also provides a better interface to the user: the ps(1) command can easily be adapted to read the status files from the mapreduce namespace.
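To make the bookkeeping concrete, here is a toy in-memory Python model of the master's side of this protocol. It is only a sketch under stated assumptions: the class and method names are hypothetical, the "done" message format and the status line layout are invented for illustration, and the real master is a Styx file server rather than a Python object.

```python
class Master:
    """Toy model of the master: opening clone allocates a numbered connection."""

    def __init__(self, work):
        self.pending = list(work)  # work items not yet handed out
        self.conns = {}            # connection number -> bookkeeping
        self.next_id = 0

    def open_clone(self):
        """Model an open of clone: allocate a connection and return its number."""
        n = self.next_id
        self.next_id += 1
        self.conns[n] = {"assigned": 0, "completed": 0, "open": True}
        return n

    def read_ctl(self, n):
        """Model a read of n/ctl: the next work item, or None when none remain."""
        if not self.pending:
            return None
        self.conns[n]["assigned"] += 1
        return self.pending.pop(0)

    def write_ctl(self, n, msg):
        """Model a write to n/ctl: the worker reports a finished item.

        The 'done ...' message format is an assumption, not the lab's format.
        """
        if msg.startswith("done "):
            self.conns[n]["completed"] += 1

    def close(self, n):
        """Model the worker closing its connection; the master sees this at once."""
        self.conns[n]["open"] = False

    def status(self, n):
        """Model a read of n/status (hypothetical layout)."""
        c = self.conns[n]
        state = "open" if c["open"] else "closed"
        return f"{n} {state} assigned={c['assigned']} completed={c['completed']}"
```

A worker that closes after taking an item but before reporting it shows up as assigned=1 completed=0, which is exactly the information the plain named pipe cannot give the master.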

To try out some examples using mapreduce I need to provide a mapper and reducer function. I wrote a module interface for a mapper,

Mapper : module {
    map: fn(key, value: string, emit: chan of (string, string));
};

This takes a key and value and maps it to an intermediate key and value, which it emits on a channel; it may emit many intermediate key-value pairs for a single input key-value pair. Here's an implementation of a mapper that takes a string input, tokenizes it, and emits each token with the value '1'; the reducer adds these up later to produce the wordcount.

# the map function may not get the whole file in one go. maybe
# just a segment, or a line.
map(nil, value: string, emit: chan of (string, string))
{
 if(sys == nil)
  sys = load Sys Sys->PATH;
 if(str == nil)
  str = load String String->PATH;
 (nil, f) := sys->tokenize(value,
               "[]{}()!@#$%^&*?><\":;.,|\\-_~`'+=/ \t\n\r");
 for ( ; f != nil; f = tl f) {
  ss := str->tolower(hd f);
  emit <-= (ss, "1");
 }
}

There is also an interface for a reducer,

Reducer : module {
    reduce: fn(key: string, input: chan of string, emit: chan of string);
};

This takes all the intermediate values for a key and emits a value. Here's the adder, used by the wordcount.

reduce(nil: string, v: chan of string, emit: chan of string)
{
 value := 0;
 while((s :=<- v) != nil)
  value += int s;
 emit <-= string value;
}
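For readers who don't have Inferno at hand, the same wordcount can be traced in plain Python. This is a single-process sketch, not the lab's worker: generators and lists replace the Limbo channels, the function names are hypothetical, and the grouping of values by key between the map and reduce steps is done with a dictionary. The delimiter set is copied from the mapper above.

```python
from collections import defaultdict

# Same delimiter characters as the Limbo mapper.
DELIMS = "[]{}()!@#$%^&*?><\":;.,|\\-_~`'+=/ \t\n\r"

def tokenize(text, delims=DELIMS):
    """Split text on any delimiter character, dropping empty fields."""
    tokens, current = [], []
    for ch in text:
        if ch in delims:
            if current:
                tokens.append("".join(current))
                current = []
        else:
            current.append(ch)
    if current:
        tokens.append("".join(current))
    return tokens

def map_words(key, value):
    """Mapper: yield (lowercased token, "1") for every token; the key is ignored."""
    for token in tokenize(value):
        yield (token.lower(), "1")

def reduce_add(key, values):
    """Reducer: add up the string-encoded counts for one key."""
    return str(sum(int(v) for v in values))

def wordcount(chunks):
    """Map every chunk, group the values by key, then reduce each key in sorted order."""
    groups = defaultdict(list)
    for chunk in chunks:
        for k, v in map_words(None, chunk):
            groups[k].append(v)
    return {k: reduce_add(k, groups[k]) for k in sorted(groups)}
```

For example, wordcount(["The cat, the hat.", "A cat!"]) returns {"a": "1", "cat": "2", "hat": "1", "the": "2"}. Values stay as strings throughout, mirroring the string-typed channels in the module interfaces.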

The worker process knows the mapper and reducer interfaces and loads the modules on demand. The worker also implements the intermediate step that combines values with the same key and sorts them (see the Google MapReduce paper for a good explanation). This implementation of mapreduce knows only how to walk directory hierarchies and print the file names to the worker processes. Here's an example of a mapreduce command line that counts words in all files below /lib/legal.

  % mkdir /mnt/mapreduce
  % mapreduce -M4 -R3 wordcount adder /lib/legal
  % ls /mnt/mapreduce

Mapreduce should launch and manage all its own processes. However, for the code checked into this lab, to illustrate what is going on, I have it launching nothing. It just mounts the file system on /mnt/mapreduce. The arguments '-M4 -R3' say to expect 4 mapper processes and 3 reducer processes. As each worker connects, the master configures it as a mapper or a reducer depending on what work remains. Therefore, after running the above command and doing a cat(1) on /mnt/mapreduce/clone, we should see the config line and then the pathnames for the first worker.

  % cat /mnt/mapreduce/clone
  worker -m -R 3 -d wordcount -i 1
  /lib/legal/GPL 0 17982

The pathnames are divided up among the workers as fast as they can process them, so in this implementation mapreduce functions almost the same as the named pipe in the simple grid tutorial. The cat of the first clone file will return all the pathnames!

Mapreduce, however, is still expecting more workers. Cat the clone file three more times to see the input to the next 3 workers. On the next cat after that, you should see the config and input for the reducer. For example, from a remote node,

  % rcmd ${nextcpu} cat /n/client/mnt/mapreduce/clone

Doing a listing on the /mnt/mapreduce path should show you the current workers connected (if any). After all reducers have disconnected, the mapreduce filesystem will report it's done and exit.

Let's run it again for real using the mapreduce worker processes.

  % mapreduce -m /mnt/mapreduce -M4 -R3 wordcount adder /lib
  % for i in 1 2 3 4 {mapreduce/worker /mnt/mapreduce/clone&}
  % for i in 1 2 3 {mapreduce/worker /mnt/mapreduce/clone&}

You should see the result files in /tmp/out.*

For GSoC 2008 I suggested a project in which the student would implement a splitjoin file system: a coarse-grained splitjoin service as loosely defined here (PDF) (see slide 18 onward for fine-grained task parallelism). This suggested implementation is really another concrete example of the gridfs pattern. It would allow control over how messages are passed round robin to all the workers. It would permit different configurations of how many records to push to each node, how many to join from each node, and how many copies of the command to run. E.g.,

filter | splitjoin -d10 -m5 -n3 {cmd} | filter 

creates 10 duplicates of cmd, takes input from the pipeline, distributes m=5 records at a time round robin to each node, and joins the output n=3 records at a time from each task back out to the pipeline.
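The round-robin split and join can be sketched in Python to show how the three numbers interact. Splitjoin does not exist yet, so everything here is an assumption: the function names are hypothetical, d, m, and n mirror the -d, -m, and -n flags above, and the duplicate tasks run one after another in a single process rather than on separate nodes.

```python
def split_round_robin(records, d, m):
    """Deal records to d tasks, m records at a time, in round-robin order."""
    inputs = [[] for _ in range(d)]
    for start in range(0, len(records), m):
        task = (start // m) % d
        inputs[task].extend(records[start:start + m])
    return inputs

def join_round_robin(outputs, n):
    """Take n records at a time from each task in turn until all are drained."""
    positions = [0] * len(outputs)
    joined = []
    while any(pos < len(out) for pos, out in zip(positions, outputs)):
        for i, out in enumerate(outputs):
            chunk = out[positions[i]:positions[i] + n]
            joined.extend(chunk)
            positions[i] += len(chunk)
    return joined

def splitjoin(records, cmd, d, m, n):
    """Split the input, apply cmd to each record in every task, join the outputs."""
    inputs = split_round_robin(records, d, m)
    outputs = [[cmd(r) for r in task_input] for task_input in inputs]
    return join_round_robin(outputs, n)
```

In this model, when cmd produces one output record per input record and n equals m, the joined stream comes out in the original input order; other values of n interleave the tasks' outputs differently.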

Splitjoin would take care of launching the task, and monitoring the task for completion. (Ideally, it would interact with the registry to decide where to launch services.)

Because Plan 9/Inferno is not participating in GSoC this year, I will probably have a crack at this myself.


lab 84 code