Monday, July 18, 2005

lab 37 - Geryon's mapreduce



I have a registry of kfs disks and cpus in my cluster, and the registry is mounted on every node. Upon this infrastructure I've written some shell scripts to simulate a MapReduce function.

I want to quantize and bound the data and the computation. To that end, a process works on only one 64MB disk for input, and may write to another disk for output. Splitting the data this way is also crucial to the parallelization: each input disk can be handled concurrently on any cpu node in the cluster. The namespace for a process is built by finding resources in the registry.

Because several cpus are available to run a command block, I choose the cpu in round robin. A shell function populates a list of the available cpu nodes and takes the head of the list on each subsequent call; once the list is empty it is populated again from the registry. [1]

subfn nextcpu {
  if {~ $#cpulist 0} {
    cpulist=`{ndb/regquery -n svc rstyx}
  }
  result = ${hd $cpulist}
  cpulist = ${tl $cpulist}
}
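
The footnote wishes for a closure here, so as a rough illustration, this is the same round-robin refill logic written as a Python closure. It is a sketch only: make_nextcpu, the query callback, and the addresses are hypothetical stand-ins for the registry query, not part of Geryon.

```python
from typing import Callable, List


def make_nextcpu(query: Callable[[], List[str]]) -> Callable[[], str]:
    """Return a nextcpu() that hands out addresses in round-robin order."""
    cpulist: List[str] = []

    def nextcpu() -> str:
        nonlocal cpulist
        if not cpulist:                 # list exhausted: ask the registry again
            cpulist = list(query())
        if not cpulist:
            raise RuntimeError("no cpu services registered")
        head, cpulist = cpulist[0], cpulist[1:]
        return head

    return nextcpu


# Hypothetical addresses standing in for the output of the registry query.
nextcpu = make_nextcpu(lambda: ["tcp!host1!6001", "tcp!host2!6002"])
picked = [nextcpu() for _ in range(5)]
```

The list is private to the closure, so nothing else can disturb the rotation, which is the property the global $cpulist in the shell version cannot give.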

I do a similar thing for finding a disk local to the cpu for output files. Finddisk takes the host name and returns the head of the list of available disks on that host.

Together these two functions are pretty much all the intelligence in selecting nodes for work and allocating disks. Obviously, more could be done.

Now for the definition of rcpu. Using the above functions, it picks the next cpu and output disk. Then it constructs a command block to send to that node. The command block does some initialization, such as mounting the disks and running a gridrc file from the user's lib directory. As part of the rstyx protocol the client device exports its namespace, which is mounted at /n/client; this provides an easy way of distributing shell and dis code to every node. The gridrc file includes the shell function definition for rmnt. [lab 36] The constructed block contains the command block passed as argument to the function. It runs in the background, and rcpu returns the name of the cpu disk it allocated for output.

fn rcpu {
 cmd := ${hd $*}     # assumed: the command block is the first argument
 disk:=${tl $*}
 cpu := ${nextcpu}   # assumed: the next cpu in round robin
 (net host port) := ${split '!' $cpu}
 disk = ${finddisk $host} $disk
 s=${parse '{cpudisk=/n/' ^ ${hd $disk} ^'
 run /n/client/usr/caerwyn/lib/gridrc
 rmnt ' ^ $"disk ^ ' 
 ' ^ $cmd ^ '}'}
 cpu $cpu sh -c $s &
 rpid=$apid $rpid
 echo ${hd $disk}
}

From this building block I construct the mapreduce (see the file gridlib for its definition). Just to remind you of how it is invoked,

% (mapreduce {idx /n/d?.mbox/*/*/*} {agg} 
   d0.mbox d1.mbox d2.mbox d3.mbox)

This example command counts the word frequency of all files on all mbox disks. An mbox disk is a kfs filesystem containing one text-only mail message per file. The example map command block uses shell pattern matching to traverse the disk and read every file. It writes to standard output a key-value pair, which in this case is a word and the value "1".

The reduce block is a filter that reads key-value lines from stdin and writes key-values to stdout. In this case, agg sums the value field for each distinct key then writes the word and the total word count.
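
To make the reduce step concrete, here is a minimal Python sketch of a filter that behaves like the description of agg. It is not the real agg; it assumes each line is a key and an integer value separated by whitespace, and that the input is already sorted by key.

```python
from itertools import groupby
from typing import Iterable, Iterator


def agg(lines: Iterable[str]) -> Iterator[str]:
    """Sum the value field for each run of identical keys."""
    pairs = (line.split() for line in lines if line.strip())
    # groupby only merges adjacent equal keys, so the input must be sorted.
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f"{key} {sum(int(value) for _, value in group)}"


sorted_input = ["grid 1", "grid 1", "inferno 1", "styx 1", "styx 1", "styx 1"]
totals = list(agg(sorted_input))
```

Because it only ever looks at one run of equal keys at a time, a filter like this needs constant memory, which is why the sort before the reducer matters.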

Mapreduce interposes between the map and reduce an intermediate command that hashes the keys into a number of partitions, and writes the key-value to a partition file on the local cpu disk. Mapreduce then needs to concatenate all the intermediate partition files for one partition into a single sorted partition for input into the reduce block. The reduce command block then has all the values for a key as a contiguous stream of lines.
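
The partition and merge steps can be sketched in Python as follows. This is an illustration under assumptions, not the intermediate command itself: the hash function (crc32), the function names, and the in-memory lists standing in for partition files are all my own choices.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List


def partition_of(key: str, nparts: int) -> int:
    """Map a key to a partition number; crc32 gives the same answer on every node."""
    return zlib.crc32(key.encode("utf-8")) % nparts


def intermediate(lines: Iterable[str], nparts: int) -> Dict[int, List[str]]:
    """Split key-value lines into per-partition lists (stand-ins for files)."""
    parts: Dict[int, List[str]] = defaultdict(list)
    for line in lines:
        if not line.strip():
            continue
        key = line.split(None, 1)[0]
        parts[partition_of(key, nparts)].append(line)
    return parts


def merge_partition(pieces: Iterable[List[str]]) -> List[str]:
    """Concatenate one partition's pieces from every mapper and sort them."""
    return sorted(line for piece in pieces for line in piece)


mapper_a = intermediate(["styx 1", "grid 1", "styx 1"], 5)
mapper_b = intermediate(["grid 1", "styx 1"], 5)
p = partition_of("styx", 5)
merged = merge_partition([mapper_a[p], mapper_b[p]])
```

The important property is that every mapper sends a given key to the same partition number, so after the merge and sort all values for that key sit next to each other in one file.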

Mapreduce runs the map command block concurrently, once for each disk passed as an argument. It waits for the map workers to finish, then runs the reduce block concurrently, once for each partition. The number of partitions is hardcoded at the moment, but should be configurable. The result is a list of disks on which the result partition files are stored.
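
The two-phase control flow can be sketched with a thread pool in Python. This shows only the "run all maps, wait, run all reduces" shape; run_phase, the fake workers, and the disk names returned are hypothetical, and the real mapreduce is the shell function in gridlib.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, Tuple


def run_phase(work: Callable[[str], str], items: Sequence[str]) -> List[str]:
    """Run work(item) for every item concurrently and wait for all of them."""
    with ThreadPoolExecutor(max_workers=max(1, len(items))) as pool:
        return list(pool.map(work, items))  # list() blocks until every task is done


def mapreduce(map_one: Callable[[str], str], reduce_one: Callable[[str], str],
              disks: Sequence[str], nparts: int) -> List[str]:
    run_phase(map_one, disks)                      # one map worker per input disk
    partitions = [str(i) for i in range(nparts)]
    return run_phase(reduce_one, partitions)       # one reduce worker per partition


log: List[Tuple[str, str]] = []


def fake_map(disk: str) -> str:
    log.append(("map", disk))
    return "d0.cpu"


def fake_reduce(part: str) -> str:
    log.append(("reduce", part))
    return f"d{part}.cpu"


result = mapreduce(fake_map, fake_reduce, ["d0.mbox", "d1.mbox"], 3)
```

The log shows every map entry before any reduce entry, which is the barrier between the two phases.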

Well, that's it, almost all of it implemented in Inferno shell code. Many things, such as fault tolerance, are not handled. But this is a starting point for a working grid function. The next step, for me, is to collect a large sample data set to experiment with.



[1] A true closure with lexical binding would be ideal here. I really need to finish the symbolic shell language I started. Sending a closure with its environment to a remote host is an appealing idea I need to explore further.

Wednesday, July 13, 2005

lab 36 - Geryon's registry



A critical piece of Geryon is the registry of disks and cpus. One of the immediate problems to deal with when setting up a grid cluster is the fact that the nodes come and go, including the one holding the registry.

To deal with one aspect of this, the registry restarting, I modified the grid/register and grid/reglisten commands from Inferno to monitor the availability of the registry, remount it if necessary, and re-announce the service.

I use grid/register for the disk services. Here is an example of how I create a new chunk and register a new kfs disk.

% zeros 1024 65536 > /chunks/chunk0
% (grid/register -a id chunk0 
{disk/kfs -rPW /chunks/chunk0})

I do this a number of times on each node that has spare disk capacity. A simple script registers all disks when I restart a node. All these disks will appear as services in the registry identified as tcp!hostname!port with some attributes including the chunk identifier, which should be unique for the host.

The next step is to name each disk. For this I use ndb. I add a file, /lib/ndb/cluster, to the ndb database with entries of the form

name=d0.mbox kfs 

name=d1.mbox kfs 

name=d0.replica kfs 
  master=host1!chunk0 replica

name=d1.replica kfs 
  master=host1!chunk1 replica

The first field is the disk name, which is unique for the cluster. The master is the chunk running on the host that serves kfs for this disk. The replica field identifies a backup disk. I hope in the future to make it possible to dynamically switch between master and replicas and to use replicas during computation. But I'll skip it for now. I replicate disks by using Inferno's applylog and updatelog tools.

Once this is all in ndb I can run a script that will update the registry with the disk names.

fn refresh {
 names=`{ndb/query -a  kfs '' name}

 for (i in $names) {
  (host chunk) = ${split '!' `{ndb/query name $i master}}
  addr = `{ndb/regquery -n name $host id $chunk}
  if {ftest -e /mnt/registry/ ^$i} {
   # entry already exists: rewrite its attributes
   (echo host $host automount 1 persist 1 
     addr $addr replica 
     `{ndb/query name $i replica}> /mnt/registry/^$i)
  } {
   # no entry yet: create one by writing to new
   (echo $i host $host automount 1 persist 1
     addr $addr replica
     `{ndb/query name $i replica}> /mnt/registry/new)
  }
 }
}

This needs to be run whenever the ndb file or the list of registered services changes, so ideally it should be automatic. I can quite easily see that happening, either by building an ndb file inside the registry and having it respond to changes, or by implementing an events file in the registry and attaching a process to that. This is a problem to work on later.

Once registered, these disks can be used from any node within the cluster. For example, I use a shell function to take a disk name and mount it as /n/diskname,

% fn rmnt {
 for (file in $*) {
  (disk rest ) := `{cat /mnt/registry/$file}
  while {! ~ $#rest 0} {
   (name val tail) := $rest
   if { ~ $name 'addr'} {mount -c $val /n/ ^ $disk}
   rest = $tail
  }
 }
}
% rmnt d0.mbox d1.mbox
% ls /n/d?.mbox
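
The parsing that rmnt does on a registry entry can be sketched in Python. This assumes, as the shell code does, that an entry reads back as the disk name followed by attribute name and value pairs; the sample entry, its address, and the function names are hypothetical.

```python
from typing import Dict, List, Tuple


def parse_entry(text: str) -> Tuple[str, Dict[str, str]]:
    """Split 'disk name1 val1 name2 val2 ...' into the disk name and its attributes."""
    disk, *rest = text.split()
    attrs: Dict[str, str] = {}
    # Walk the tokens two at a time; a trailing name with no value is ignored.
    for i in range(0, len(rest) - 1, 2):
        attrs[rest[i]] = rest[i + 1]
    return disk, attrs


def mount_command(text: str) -> List[str]:
    """Build the argument list for the mount that rmnt would run."""
    disk, attrs = parse_entry(text)
    return ["mount", "-c", attrs["addr"], "/n/" + disk]


# Hypothetical entry; the replica attribute has no value here.
entry = "d0.mbox host host1 automount 1 persist 1 addr tcp!host1!1234 replica"
disk, attrs = parse_entry(entry)
argv = mount_command(entry)
```

Only the addr attribute matters for mounting; the rest are carried along for other tools.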

To register a cpu service,

% grid/reglisten -r svc rstyx 'tcp!*!0' {runas $user auxi/rstyxd&}

This will announce on a new address and use that address as the service name in the registry. We can then get a list of all addresses of the cpu service with

% ndb/regquery -n svc rstyx

For both grid/register and grid/reglisten, the service names are automatically removed from the registry once the process exits. All connections are authenticated. For the kfs disks, they should all use the same /adm/users file, something that should be copied onto the disk when it is initialized, so that permissions are enforced consistently across the cluster.

So far we have all the services we need announced dynamically. We have a naming scheme and the infrastructure for running code anywhere in the cluster. What remains is the shell code to tie it together to build a simple mapreduce.

I'll put instructions for setting up Geryon on my inferno wiki.


Sunday, July 10, 2005

lab 35 - Geryon, another Inferno grid



Grid computing is everywhere it seems, and one of the obvious applications of Inferno is grid computing. As usual, I know little about it aside from reading a few papers to get me enthused enough to make some effort in that area. If I read all the current research I'd have little time to do the programming. So instead I'll jump in and see if I can swim. I'm trying to set up a simple grid, with tools for MapReduce functionality. I'm calling this system Geryon (because things need names).

This first stab is to see what can be done with the pieces already available in the system, and from this working prototype to find out what pieces need to be written to fill in the gaps or improve it in any way.

Geryon is too large to cover in one blog entry so I'll be posting it piecemeal. I'll set up a wiki page to cover the whole setup and running of Geryon as I get further along.


I'll start with an overview of the system. I have a small number of machines running a mixed set of operating systems: Plan 9, Linux, and Windows. Each machine runs one or more emu instances. One instance runs the registry, and all emu instances must use that registry service, which is key to the operation of Geryon. Every emu instance must run one rstyxd service for the cpu command. Every service gets registered. All computation is then distributed by using the cpu command to send command blocks.

For storing data, I use a large number of chunks, which are files of equal size (64MB), and each file is used as the storage for kfs, which is registered as a styx service.

Only one emu instance, the master, holds an ndb file that maps registered chunk services to "disk" names. This is edited manually. A disk name is of the form dn.app, where n is the number of the disk and app is an application name, e.g. d0.mbox. An application's data is assumed to be quite regular in its layout on the disk and to span many disks.

Running a modified cpu command takes the form

% rcpu {ls /n/d0.mbox} d0.mbox

where rcpu runs cpu on a node of its choosing, mounts the disk, and runs the given command block. The disk can be mounted from any node in the cluster by looking up its address in the registry. rcpu also uses the registry to find the list of available cpu nodes.

As well as there being nodes storing application data, each cpu has a set of registered disks for storing the output from commands running locally, e.g., d0.cpu ... dn.cpu. rcpu automatically mounts a disk local to the cpu node running the command block.

On top of this shell function I built a mapreduce shell function of the form,

% mapreduce {idx /n/d?.mbox/*/*/*}
        {agg} d0.mbox d1.mbox d2.mbox d3.mbox

The mapreduce command takes a map and a reduce command block and a list of disks as input. Mapreduce uses rcpu to run the map command block on each disk on a different cpu (selected in round robin fashion). The mapper navigates the files on its own and generates a key-value pair for all the data stored in them. The reducer command block takes as input the key-value pairs, which are already sorted, and performs a reduce function, writing another key-value to stdout (see the MapReduce paper for a clearer explanation). The return value from mapreduce is the list of disks where all the output files are stored. Similar to the description in the MapReduce paper, an intermediate hash function is used to partition the data output from the mapper. The data is then collected and sorted for each partition before being sent to the reducer. The command lines for these are nothing more than, e.g.,

% {mapper} |intermediate 5 /n/d0.cpu/$job.inter.

where intermediate runs a hash function on input and writes to output files with a prefix given in the second argument, and

% cat /n/d?.cpu/$job.inter.0 | sort | 
     {reducer} > /n/d1.cpu/$job.partion.0

Notice that for the reducer all the cpu disks containing intermediate files are mounted. So a cat across a number of disks is performed as

% rcpu {cat /n/*/filename} d0.cpu d1.cpu d2.cpu d3.cpu 

I'll describe the shell code in more detail in the next post, and link to a web page describing the whole setup.

Saturday, July 09, 2005

usual disclaimer

I do not know what I am doing. The point about writing the software is to learn about a problem. This means the software is not likely to be of practical use, though it may be, I don't know.

Why don't I go and read the research papers on the subject? Reading one or two is fine, and I do, and they help. But reading too much means all I'm doing is reading and not coding. Hands-on experience is learning, maybe at a slower rate than reading, but at a much deeper level. The fun is in the coding. Coding generates ideas. After coding, reading the research papers becomes much more valuable, because I now have real experience to compare against.

I'm doing it for fun. This should be called a fun lab or something. It is a lab for doing experiments and writing about them, or in the words of Homer Simpson, "It's just a bunch of stuff that happens."

Friday, July 01, 2005

lab 34 - lexis: semantic binary model implementation



The code linked to below is a database implementation based on the paper, A File Structure for Semantic Databases by N. Rishe. The paper is brief and required reading for the rest of this lab.

The application is in three modules: the Btree, the SBM abstraction called lexis, and the query module.

The Btree implementation stores key-value pairs. This Btree is a large improvement over my previous attempt, tickfs, in that a prefix length is included for each entry, which improves storage efficiency. Binary search is used when searching within a block, making it faster, especially considering the increase in the number of entries per block. The caching and locking mechanisms have also changed, and are described in earlier labs.
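
To show why a per-entry prefix length saves space, here is a small Python sketch of prefix compression over sorted keys. It illustrates the general technique only; the entry layout and the sample keys are assumptions, not the on-disk format used by this Btree.

```python
from typing import List, Tuple


def common_prefix_len(a: bytes, b: bytes) -> int:
    """Number of leading bytes that a and b share."""
    n = 0
    for x, y in zip(a, b):
        if x != y:
            break
        n += 1
    return n


def compress(keys: List[bytes]) -> List[Tuple[int, bytes]]:
    """Store each key as (bytes shared with the previous key, remaining suffix)."""
    out: List[Tuple[int, bytes]] = []
    prev = b""
    for key in keys:
        n = common_prefix_len(prev, key)
        out.append((n, key[n:]))
        prev = key
    return out


def decompress(entries: List[Tuple[int, bytes]]) -> List[bytes]:
    """Rebuild the full keys by reusing the prefix of the previous key."""
    keys: List[bytes] = []
    prev = b""
    for n, suffix in entries:
        prev = prev[:n] + suffix
        keys.append(prev)
    return keys


# Hypothetical keys; sorted order is what makes neighbours share long prefixes.
keys = [b"Hit/schema/1", b"Hit/schema/2", b"Hit/table/1"]
packed = compress(keys)
```

Since neighbouring keys in a sorted block tend to share long prefixes, most entries shrink to a small integer plus a short suffix, which is how more entries fit in each block.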

Lexis is the SBM abstraction layer that sits on top of the Btree. It does not use the value field of the Btree as all information is stored in the key. The abstractions exported are Facts, which include Categories, Relations, and Attributes. The last is a small difference to the Rishe model. Attributes are relations of abstract objects to concrete objects (array of byte, which is uninterpreted by lexis). Relations are between abstract objects only. Each fact is stored in normal and inverted form as described in the paper.

Finally, there is the query module. All the query abstractions from the paper are implemented. However, the approach used for combining operators to build a complex query expression is based on data streams and communicating processes. The idea is described in M. D. McIlroy's Squinting at Power Series. See also lab 31. Here's an example of inserting data into lexis and querying it.

% cd lab/34
% mkdir /dis/folkfs
% mk install
% mkdir /lib/lexis
% > /lib/lexis/       # the lockfile
% >
% folkfs/tok -i *.[bm]      # index all the source files

Now query for the symbol schema that is on the right side of the relation Hit,

% folkfs/get -q 'Hit schema ?Ra'

The query string is tokenized. Each token is either an operator (a?, aRy, aR?, ?Ra, a??, aC), an abstract object, or a concrete object. Each operator has the function prototype:

op(U: chan of list of ref Dat): chan of list of ref Dat;

The operator will create a new channel and spawn another process that receives from U and sends a result down the created channel. So in functional form it looks like:

?Ra(push('fact', push('Hit', startchan)))

In effect, for each token we get a spawned process that moves in lock step with the other processes, receiving from an input channel and sending to an output channel.

An operator such as ?Ra will pop the Relation and abstract object symbols from the stack, search the Btree for all matching Facts and push the abstract or concrete object back onto the stack. This happens for each input from the channel so the channel is a stream of stacks.
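
The stream-of-stacks idea can be approximated in Python with generators standing in for Limbo channels and spawned processes. This is a loose sketch: the in-memory set of facts replaces the Btree search, the sample facts are invented, and pushing all matches onto a single output stack per input stack is my assumption about how results are returned.

```python
from typing import Iterable, Iterator, List, Set, Tuple

Stack = List[str]
Fact = Tuple[str, str, str]  # (a, R, y): object a is related by R to object y


def start() -> Iterator[Stack]:
    """The initial stream: a single empty stack."""
    yield []


def push(sym: str, upstream: Iterable[Stack]) -> Iterator[Stack]:
    """Push one symbol onto every stack flowing through."""
    for stack in upstream:
        yield stack + [sym]


def q_Ra(facts: Set[Fact], upstream: Iterable[Stack]) -> Iterator[Stack]:
    """?Ra: pop the object a and relation R, push every x with fact (x, R, a)."""
    for stack in upstream:
        *rest, rel, obj = stack
        matches = sorted(x for (x, r, y) in facts if r == rel and y == obj)
        yield rest + matches


# Hypothetical facts: which source file "hits" which symbol.
facts = {
    ("lexis.b", "Hit", "schema"),
    ("query.b", "Hit", "schema"),
    ("btree.b", "Hit", "block"),
}
result = list(q_Ra(facts, push("schema", push("Hit", start()))))
```

Each stage consumes one stack and produces one stack, so the stages advance together in the same lock-step fashion as the channel version, only pulled lazily rather than run as separate processes.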

This is a starting point for further experimentation with the data streams style of programming. Also, lexis will now serve as a database backend for some future file systems for web services.