Sunday, July 10, 2005

lab 35 - Geryon, another Inferno grid


lab 35 - Geryon, another Inferno grid


Grid computing is everywhere it seems, and one of the obvious applications of Inferno is grid computing. As usual, I know little about it aside from reading a few papers to get me enthused enough to make some effort in that area. If I read all the current research I'd have little time to do the programming. So instead I'll jump in and see if I can swim. I'm trying to setup a simple grid, with tools for MapReduce functionality. I'm calling this system Geryon (because things need names).

This first stab is to see what can be done with the pieces already available in the system. And from this working prototype find out what pieces need to be written to fill in the gaps or improve it in any way.

Geryon is too large to cover in one blog entry so I'll be posting it piecemeal. I'll setup a wiki page to cover the whole setup and running of Geryon as I get further along.


I'll start with an overview of the system. I have a small number of machines running a mixed set of operating systems: Plan9, Linux, and Windows. On each machine is running one or more emu instances. One instance is running the registry. All emu instances must use that registry service, which is key to the operation of Geryon. Every emu instance must run one rstyxd service for the cpu command. Every service gets registered. So all computation is distributed by using the cpu command to send command blocks.

For storing data, I use a large number of chunks, which are files of equal size (64MB), and each file is used as the storage for kfs, which is registered as a styx service.

On only one emu instance, the master, is a ndb file that maps registered chunk services to "disk" names. This is edited manually. A disk name is of the form, where n is the number of the disk and app is an application name, e.g. d0.mbox. An application's data is assumed to be quite regular in it's layout on the disk and to span many disks.

Running a modified cpu command takes the form

% rcpu {ls /n/d0.mbox} d0.mbox

where rcpu runs cpu on a node of it's choosing, will mount the disk and run the given command block. The disk can be mounted from any node in the cluster by looking up its address in the registry. rcpu also uses the registry to find the available list of cpu nodes.

As well as there being nodes storing application data, each cpu has a set of registered disks for storing the output from commands running locally, e.g., d0.cpu ... dn.cpu. rcpu automatically mounts a disk local to the cpu node running the command block.

On top of this shell function I built a mapreduce shell function of the form,

% mapreduce {idx /n/d?.mbox/*/*/*}
        {agg} d0.mbox d1.mbox d2.mbox d3.mbox

The mapreduce command takes a map and reduce command block and a list of disks as input. Mapreduce uses rcpu to run the map command block on each disk on a different cpu (selected in round robin fashion). The mapper generates a key-value pair for all data stored in the files which it navigates on its own. The reducer command block takes as input the key-value pairs which are already sorted and performs a reduce function writing to stdout another key-value (see the MapReduce paper for a clearer explanation). The return value from mapreduce is the list of disks where all the output files are stored. Similar to the description in the MapReduce paper, an intermediate hash function is used to partition the data output from the mapper. And the data is collected and sorted for each partition before being sent to the reducer. The command lines for these are nothing more than, e.g.,

% {mapper} |intermediate 5 /n/d0.cpu/$job.inter.

where intermediate runs a hash function on input and writes to output files with a prefix given in the second argument, and

% cat /n/d?.cpu/$job.inter.0 | sort | 
     {reducer} > /n/d1.cpu/$job.partion.0

Notice that for the reducer all the cpu disks containing intermediate files are mounted. So a cat across a number of disks is performed as

% rcpu {cat /n/*/filename} d0.cpu d1.cpu d2.cpu d3.cpu 

I'll describe the shell code in more detail next post, and a link to a web page describing the whole setup.

No comments: