lab 40 - distributing data

NAME

lab 40 - distributing data

NOTES

The first problem I had to deal with after extracting the data from the Reality Mining data set was how to distribute it across all the disks. This lab describes how I did it.

The Geryon grid I have at home is a simulation grid. I'm just pretending that I have 16 real disks, when I actually only have two. [1] The sizes of things are also scaled down, so each disk is only 64MB instead of a more typical 40GB.

I divided each disk into 1MB chunks for this simulation. On each disk I created 60 files numbered 0.ckfree ... 59.ckfree. Each empty file is a record that an available chunk is on this disk. I use 60 rather than 64 to leave room for slop, because each chunk is going to be slightly over 1MB. I do all this easily enough using my friend, the registry:

% for (i in `{ndb/regquery -n resource kfs}) {
    mount $i /n/remote
    for (j in `{seq 0 59}) {
        > /n/remote/$j.ckfree
    }
  }

The idea behind this, as suggested in the Google File System paper, is that each disk is the master record of the used and free chunks that it contains. I scan all disks to update a cache for fast querying. (Seq in the code above is a sequence generator as in Plan 9, and is included in the files for this lab.)

I created a file2chan server, chunkqueue, that can allocate a free chunk from all the available chunks across all disks. Writes to the file add free chunks to memory. Reads from the file return the next chunk. The chunk is chosen by randomly selecting a disk, then taking the head of a chunk list. The aim is to uniformly distribute the repository across all disks. But this will only work on average. With the few disks I have I'll still get many chunks on the same disk.
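The selection policy described above can be modelled in a few lines. This is only a sketch in Python, not the file2chan server from the lab: the class name ChunkQueue, its methods, and the seed parameter are all hypothetical, and the real chunkqueue may differ in details such as how it treats a disk with no free chunks left.

```python
import random


class ChunkQueue:
    """In-memory model of the chunkqueue idea: one free list per disk."""

    def __init__(self, seed=None):
        self.free = {}                  # disk address -> list of free chunk names
        self.rng = random.Random(seed)  # seedable so runs can be repeated

    def add(self, chunk, disk):
        """Model of a write to the queue file: record one free chunk."""
        self.free.setdefault(disk, []).append(chunk)

    def next(self):
        """Model of a read: pick a disk at random, take the head of its list."""
        if not self.free:
            raise LookupError("no free chunks on any disk")
        disk = self.rng.choice(sorted(self.free))
        chunk = self.free[disk].pop(0)
        if not self.free[disk]:
            del self.free[disk]         # assumption: exhausted disks leave the draw
        return chunk, disk


if __name__ == "__main__":
    q = ChunkQueue()
    for disk in ("tcp!oak!1850", "tcp!fir!10181"):
        for j in range(60):
            q.add(f"{j}.ckfree", disk)
    print(q.next())                     # one (free chunk, disk) pair
```

Because each draw is independent, two consecutive allocations land on the same disk fairly often when there are only a few disks, which is the effect noted above.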

An important goal is to get very high parallelism in my distributed file processing. Two truly concurrent processes are ones that are running on independent CPUs and reading from separate disks with no communication between the two. If I have two disks I want to split the file between the disks and process both chunks at the same time on different CPUs. Any time I get several chunks for the same repository on the same disk, and there is no other replica of those chunks on other disks, I am losing some amount of parallelism.

I announce chunkqueue to the registry. I can then scan all disks, just like I did above, and write all the free chunks to the chunkqueue.

% chunkqueue /tmp/chunkq/cq
% grid/reglisten -r svc chunkq 'tcp!*!0' {export /tmp/chunkq&}
% freechunkscan

Freechunkscan is in the new gridlib file.

A chunk handle is a cluster-unique filename for the chunk. It is just a sequence number with .ck appended. When a free chunk is allocated, the .ckfree file is renamed to the nnn.ck file.

subfn allochunk {
 chunkid := `{next /n/master/next} ^ .ck
 (free disk) := `{read < /n/chunkq/cq}
 mount -c $disk /n/remote
 mv /n/remote/^$free /n/remote/^$chunkid
 echo $chunkid $disk >> /n/master/locator
 result=$chunkid
}

I assign one disk as the master file system that will keep a permanent record of chunks used by a repository file. The master is assigned in /lib/ndb/cluster and after a refresh shows up in the registry. A file on the master represents one repository file (BigFile) and contains a list of chunk handles. Also stored on the master is the mapping of chunk handles to disks, but this can be regenerated by periodically scanning all the disks. The mapping of chunks to disks is kept distinct from the repository-to-chunks mapping to allow for the possibility of chunks stored on multiple disks and chunk movement among disks. To locate a chunk I just grep the /n/master/locator file (see locatechunk in gridlib).
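To illustrate why the chunk-to-disk mapping can be regenerated, here is a small Python model of a scan. It is a sketch under assumptions, not the gridlib code: build_locator and locate are hypothetical names, and the directory listings are passed in as plain data rather than read from mounted disks.

```python
def build_locator(listings):
    """Rebuild the chunk -> disks mapping from per-disk directory listings.

    listings maps a disk address to the file names found on that disk.
    Only allocated chunks (names ending in .ck) are recorded; the
    .ckfree markers are skipped.
    """
    locator = {}
    for disk, names in listings.items():
        for name in names:
            if name.endswith(".ck"):
                locator.setdefault(name, []).append(disk)
    for disks in locator.values():
        disks.sort()                    # stable order for replicas
    return locator


def locate(locator, chunk):
    """Return every disk holding the chunk, or an empty list if unknown."""
    return locator.get(chunk, [])


if __name__ == "__main__":
    listings = {
        "tcp!oak!1850": ["0001.ck", "3.ckfree"],
        "tcp!fir!10181": ["0002.ck", "7.ckfree"],
    }
    loc = build_locator(listings)
    print(locate(loc, "0002.ck"))
```

Keeping a list of disks per handle, rather than a single disk, is what leaves room for replicas and for moving chunks between disks without touching the repository file.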

Now to tie it together with a command that writes files to a repository file.

CHUNKSIZE=1048576
fn putfiles {
 load expr
 pctl forkns
 rmnt master
 mount `{ndb/regquery -n svc chunkq} /n/chunkq
 bigfile:=$1
 files:=${tl $*}

 chunk := `{tail -1 $bigfile}
 if {~ $#chunk 0} {
  chunk = ${allochunk}
  echo $chunk >> $bigfile
 }
 mount -c ${locatechunk $chunk} /n/remote
 for(i in $files){
  size=0
  (perm device  inst owner group size rest) := `{ls -l /n/remote/$chunk}
  if { ntest ${expr $size $CHUNKSIZE gt} } {
   chunk = ${allochunk}
   echo $chunk >> $bigfile
   mount -c ${locatechunk $chunk} /n/remote
  }
  putwar $i |gzip >> /n/remote/$chunk
 }
}

I append files until I go over the CHUNKSIZE threshold, so a chunk can end up slightly larger than CHUNKSIZE. There are no files that span chunks. Each chunk is self-contained, which I hope will make applying mapreduce easier.
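The packing rule in putfiles can be modelled on its own. The Python sketch below is an illustration only: pack is a hypothetical function, and it takes the stored (compressed) size of each file as input instead of reading the chunk size back with ls as the shell code does.

```python
CHUNKSIZE = 1048576


def pack(sizes, chunksize=CHUNKSIZE):
    """Group items into chunks using the putfiles rule.

    sizes holds the stored size in bytes of each item, in order.
    A new chunk is started only when the current chunk is already
    over the threshold, so items are never split across chunks.
    Returns a list of chunks, each a list of item indexes.
    """
    chunks = [[]]
    used = 0                            # bytes already in the current chunk
    for i, size in enumerate(sizes):
        if used > chunksize:            # same test as `expr $size $CHUNKSIZE gt`
            chunks.append([])
            used = 0
        chunks[-1].append(i)
        used += size
    return chunks


if __name__ == "__main__":
    print(pack([600000, 600000, 600000]))
```

With three items of 600000 bytes each, the first two share a chunk (which grows past the threshold) and the third starts a new one. The overshoot is bounded by the size of one stored item, which is why 60 chunks are allotted per disk rather than 64.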

When I extracted the data from the Reality Mining MySQL dump I got over 200 1MB files. A database table, like callspan, would be broken up into about 20 files. To distribute the callspan table as one repository file across Geryon I ran the following:

% putfiles /n/master/callspan callspan*
% cat /n/master/callspan
0001.ck
0002.ck
0003.ck
...
% cat /n/master/locator
0001.ck tcp!oak!1850
0002.ck tcp!fir!10181
...

It should be possible to see how a rudimentary Inferno cluster file system could be built using similar techniques, hiding much of the above functionality behind a clean file system interface. I'll likely try this at some point. The problem that always gets me is that once I allow files to span chunks, how do I cleanly handle breaking data up for mapreduce?

I haven't addressed fault tolerance and replication of chunks yet. If anyone cares to have a stab at it ...

FILES

caerwyn.com/lab/40

FOOTNOTES

[1] I hope to turn Geryon from a simulation into a modest working grid. And of course, I'll describe the progress of setting up Geryon here.

Comments

Unknown said…
Nice!

Tying this together with your following lab, could you cheat and maybe simplify by doling out chunks based on their hash, with each disk handling a separate section of the hash namespace? You should get a pretty even statistical distribution across disks, though it would complicate relocating chunks or adding additional disks.

It could make it easier to implement redundancy, though, and I can envision using creative namespace binding to retrieve chunks during disk failures.
