Wrap your SQL head around Riak's Map-Reduce

by Sean Cribbs

UPDATE 2010-03-05: The original version of this post had some major conceptual errors in its code and design, chiefly that it overlooked a rule: a map phase must return bucket/key pairs if it is followed by another map phase. This has been fixed.

One of the conceptual issues you’ll face when switching your persistence layer from SQL over to Riak is grokking map-reduce, so let’s take a pretty predictable query from SQL and convert it to Riak’s Javascript map-reduce.

SELECT addresses.state, COUNT(*)
  FROM people INNER JOIN addresses ON people.id = addresses.person_id
  WHERE people.age < 18
  GROUP BY addresses.state

This query finds how many minors are in each state, using a join and a grouping. Now, assuming you stored this data as JSON in Riak, you’d probably denormalize that belongs_to :person relationship by storing the address(es) in the person record, effectively removing the join. Let’s build a plan for this query:

  1. Query over all the objects in the ‘people’ bucket (input)
  2. Filter out the people 18 years or older (map)
  3. Add a counter for the person’s state (map)
  4. Sum those counts for each state (reduce)
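
Before turning this plan into a Riak job, it may help to see steps 2 through 4 as ordinary Javascript over an in-memory array. This is only a sketch: the `people` array and its records are made-up sample data, not anything Riak provides.

```javascript
// Hypothetical sample data standing in for the 'people' bucket.
const people = [
  { name: "Ann",  age: 12, address: { state: "AL" } },
  { name: "Bob",  age: 40, address: { state: "AL" } },
  { name: "Cara", age: 17, address: { state: "NY" } },
  { name: "Dev",  age: 9,  address: { state: "AL" } },
];

const minorsByState = people
  // Step 2: keep only the minors.
  .filter((person) => person.age < 18)
  // Step 3: one counter object per person, keyed by state.
  .map((person) => ({ [person.address.state]: 1 }))
  // Step 4: merge the counters, summing per state.
  .reduce((totals, counter) => {
    for (const state in counter) {
      totals[state] = (totals[state] || 0) + counter[state];
    }
    return totals;
  }, {});

console.log(minorsByState); // { AL: 2, NY: 1 }
```

The Riak version below has the same shape, except that each step becomes a phase described in JSON.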

Now, you could easily collapse those map phases, but for clarity I’m going to keep them separate. Since each map-reduce job is submitted as a JSON object, I’ll build that object in a couple of snippets which we’ll join together at the end. First the input, which should either be a list of bucket/key pairs or in our case, the name of a bucket:

{"inputs":"people",
 "query":[

Riak lets you specify each map or reduce phase as a Javascript function literal, the name of a built-in, a bucket-key pair where the function is stored, or the module-function pair of an Erlang function. For our first phase, we’ll return only the objects for minors (line-breaks are added for clarity — you’d need to escape them for it to be valid JSON).

{"map":{
   "language":"javascript",
   "source":"function(value, keyData, arg){
     var data = Riak.mapValuesJson(value)[0]; /* parse the stored object as JSON */
     /* check the type so that infants (age 0) are not skipped as falsy */
     if(typeof data.age === 'number' && data.age < 18)
       return [[value.bucket, value.key]];
     else
       return [];
    }"
  }
},

This code might seem familiar if you’ve done CouchDB views, except that you don’t call emit() with key/value data, you just return a list of the values you want. Since we have another map phase following this one, we need to return bucket/key pairs. Now let’s extract the state name and count:

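{"map":{
   "language":"javascript",
   "source":"function(value, keyData, arg){
     var data = Riak.mapValuesJson(value)[0];
     if(data.address && data.address.state){
       /* a computed key has to be assigned; it cannot appear in the object literal */
       var counter = {};
       counter[data.address.state] = 1;
       return [counter];
     } else
       return [];
    }"
  }
},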
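
If you want to poke at map functions like these without a Riak node, one option is to run them locally against fake objects. The sketch below rests on two assumptions of mine: that the object handed to a map function looks like `{bucket, key, values: [{data: "..."}]}`, and that `Riak.mapValuesJson` parses each value's data as JSON. `fakeObject`, `minorsOnly`, `stateCounter` and the sample records are hypothetical names and data for illustration.

```javascript
// Local stand-in for Riak's helper. ASSUMPTION: it parses the data of
// each stored value as JSON and returns the results as an array.
const Riak = {
  mapValuesJson: (value) => value.values.map((v) => JSON.parse(v.data)),
};

// Hypothetical helper: wrap a record in the ASSUMED shape of a Riak object.
function fakeObject(key, record) {
  return { bucket: "people", key: key, values: [{ data: JSON.stringify(record) }] };
}

// Phase 1: emit a bucket/key pair for each minor.
function minorsOnly(value, keyData, arg) {
  var data = Riak.mapValuesJson(value)[0];
  if (typeof data.age === "number" && data.age < 18) {
    return [[value.bucket, value.key]];
  }
  return [];
}

// Phase 2: emit a {state: 1} counter for each object that has a state.
function stateCounter(value, keyData, arg) {
  var data = Riak.mapValuesJson(value)[0];
  if (data.address && data.address.state) {
    var counter = {};
    counter[data.address.state] = 1;
    return [counter];
  }
  return [];
}

// Made-up bucket contents.
const bucket = {
  ann: fakeObject("ann", { age: 12, address: { state: "AL" } }),
  bob: fakeObject("bob", { age: 40, address: { state: "CO" } }),
  cara: fakeObject("cara", { age: 17, address: { state: "NY" } }),
};

// Emulate the hand-off: phase 1 returns bucket/key pairs, and each pair is
// looked up again before phase 2 runs on the corresponding object.
const pairs = Object.values(bucket).flatMap((obj) => minorsOnly(obj));
const counters = pairs.flatMap(([, key]) => stateCounter(bucket[key]));

console.log(JSON.stringify(pairs));    // [["people","ann"],["people","cara"]]
console.log(JSON.stringify(counters)); // [{"AL":1},{"NY":1}]
```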

Map functions only work on one item at a time, so we only add one to the count. Now, the output from this phase will look something like: [{"AL":1},{"CO":1},{"NY":1}...]. Unlike our previous phase, since the results of this go directly to a reduce phase, we can return anything. Our final reduce phase will merge those JSON objects into a single one, summing the counts.

{"reduce":{
   "language":"javascript",
   "source":"function(values, arg){
     /* a reduce phase returns a list, so wrap the merged object in one */
     return [values.reduce(function(acc, item){
       for(var state in item){
         if(acc[state])
          acc[state] += item[state];
         else
          acc[state] = item[state];
       }
       return acc;
     }, {})];
    }"
  }
}]}
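
A reduce function like this is easy to try out on its own. The sketch below uses a variant that starts from an empty accumulator and returns a one-element list, on my understanding that Riak expects reduce phases to return lists and may feed a reduce function its own earlier output. `sumByState` and the batches are hypothetical names and data.

```javascript
// Merge {state: count} objects into one, returned inside a list.
function sumByState(values, arg) {
  return [values.reduce(function (acc, item) {
    for (var state in item) {
      acc[state] = (acc[state] || 0) + item[state];
    }
    return acc;
  }, {})];
}

// Reduce two batches separately, then reduce the partial results together.
const batch1 = sumByState([{ AL: 1 }, { CO: 1 }, { AL: 1 }]);
const batch2 = sumByState([{ NY: 1 }, { AL: 1 }]);
const total = sumByState(batch1.concat(batch2));

// Reducing everything in one pass should give the same answer.
const direct = sumByState([{ AL: 1 }, { CO: 1 }, { AL: 1 }, { NY: 1 }, { AL: 1 }]);

console.log(JSON.stringify(total));  // [{"AL":3,"CO":1,"NY":1}]
console.log(JSON.stringify(direct)); // [{"AL":3,"CO":1,"NY":1}]
```

Because partial results have the same shape as the map output, it does not matter how the inputs are split into batches.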

Now if we submit a POST request to the map-reduce resource (/mapred by default), we can get the results of this query. Here’s the complete POST request body (with line-breaks maintained for clarity):

{ "inputs":"people",
  "query":[
    {"map":{
       "language":"javascript",
       "source":"function(value, keyData, arg){
         var data = Riak.mapValuesJson(value)[0];
         if(typeof data.age === 'number' && data.age < 18)
           return [[value.bucket, value.key]];
         else
           return [];
        }"
      }
    },
    {"map":{
       "language":"javascript",
       "source":"function(value, keyData, arg){
         var data = Riak.mapValuesJson(value)[0];
         if(data.address && data.address.state){
           var counter = {};
           counter[data.address.state] = 1;
           return [counter];
         } else
           return [];
        }"
      }
    },
    {"reduce":{
       "language":"javascript",
       "source":"function(values, arg){
         return [values.reduce(function(acc, item){
           for(var state in item){
             if(acc[state])
              acc[state] += item[state];
             else
              acc[state] = item[state];
           }
           return acc;
         }, {})];
        }"
      }
    }
  ]
}
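
Escaping those line breaks by hand gets old quickly. Here is a sketch of one way around it: write the phases as real Javascript functions and let `JSON.stringify` do the escaping. It also collapses the two map phases into one, as mentioned earlier. The names `phase`, `minorStateCounter`, `sumByState` and `submit` are mine, and the `inputs` and `query` key names reflect my reading of Riak's HTTP interface.

```javascript
// Single map phase: filter minors and emit a {state: 1} counter in one step.
const minorStateCounter = function (value, keyData, arg) {
  var data = Riak.mapValuesJson(value)[0];
  var isMinor = typeof data.age === "number" && data.age < 18;
  if (isMinor && data.address && data.address.state) {
    var counter = {};
    counter[data.address.state] = 1;
    return [counter];
  }
  return [];
};

// Reduce phase: merge the counters, returning a one-element list.
const sumByState = function (values, arg) {
  return [values.reduce(function (acc, item) {
    for (var state in item) {
      acc[state] = (acc[state] || 0) + item[state];
    }
    return acc;
  }, {})];
};

// Hypothetical helper: describe a phase using the function's own source text.
function phase(kind, fn) {
  return { [kind]: { language: "javascript", source: fn.toString() } };
}

const job = {
  inputs: "people",
  query: [phase("map", minorStateCounter), phase("reduce", sumByState)],
};

// JSON.stringify escapes the newlines and quotes inside each source string.
const body = JSON.stringify(job);

// Not called here because it needs a running Riak node. The caller supplies
// the URL; fetch is built into recent Node versions and browsers.
async function submit(url) {
  const response = await fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: body,
  });
  return response.json();
}
```

Calling `submit` with the address of your node's map-reduce resource (something like `http://localhost:8098/mapred` on a default local install, which is an assumption about your setup) should return the per-state counts.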

This is just a taste of the kinds of queries and data workflows you can create inside Riak. I hope it’s obvious that it will be easy to generalize the Javascript map and reduce functions we created above, and to build up a library of them that can tackle many problems in a composable, modular way.
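
As a rough illustration of that kind of generalization, here is a sketch of a map function that counts by whatever field it is told to, using the `arg` parameter that every phase function receives. I am assuming that a static `arg` can be set in the phase description, and that the object passed to a map function has the shape faked below. `countByField` and the sample record are hypothetical.

```javascript
// Local stand-in for Riak's helper. ASSUMPTION: it parses the data of
// each stored value as JSON and returns the results as an array.
const Riak = {
  mapValuesJson: (value) => value.values.map((v) => JSON.parse(v.data)),
};

// Generic map function: emit a {fieldValue: 1} counter, where `arg` is a
// dotted path such as "address.state" naming the field to count by.
const countByField = function (value, keyData, arg) {
  var data = Riak.mapValuesJson(value)[0];
  var field = arg.split(".").reduce(function (node, part) {
    return node == null ? undefined : node[part];
  }, data);
  if (field === undefined || field === null) return [];
  var counter = {};
  counter[field] = 1;
  return [counter];
};

// The same function serves different queries by changing only `arg`.
const byState = {
  map: { language: "javascript", source: countByField.toString(), arg: "address.state" },
};

// Local check against a fake object in the ASSUMED shape.
const sample = {
  bucket: "people",
  key: "cara",
  values: [{ data: JSON.stringify({ age: 17, address: { state: "NY" } }) }],
};
console.log(JSON.stringify(countByField(sample, null, byState.map.arg))); // [{"NY":1}]
console.log(JSON.stringify(countByField(sample, null, "employer.state"))); // []
```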

© 2006-present Sean Cribbs