Sunday, 22 December 2013

MapReduce Combiner Example (HBase)

MapReduce Combiner Example (HBase)

The combiner is a reduce type function that takes place during the map operation. This allows for local data reduction while the map output is still in memory and therefore reduces the amount of disk I/O.  

The following is an example where we used a combiner to help us find the top K articles with the most revisions made for any given timerange.  We didn't want to have two MapReduce jobs for the query so the Combiner helped us achieve our goal.

This is the schema we used for our HBase table:

Key : Timestamp 
Values (Composite) : article_id + revision_id [One per column for any given timestamp]

We are running Hadoop 2.0.0

Imported Classes

To make what we have done more readable, rather than displaying what we imported for each class, here is a list of what we used across the whole project.  If you are using Eclipse then hit Ctrl + Shift + O and it will organise your imports automatically for you anyway.  I may be missing a few imports, but you get the jist.
import java.text.ParseException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

Setup Class

This is where all the Map & Reduce classes are assigned and HBase objects are created and where the program is run.
public class IWikiQuery4 extends Configured implements Tool {
 private static Configuration conf;
 public int run(String[] args) throws Exception {
  if (args.length < 3)
   return 1;
  conf = HBaseConfiguration.create(getConf());

  conf.set("mapred.jar", "file:///[LOCATION OF JAR FILE]");

  conf.set("startTime", args[0]);  //User input
  conf.set("endTime", args[1]);
  conf.set("k", args[2]);

  Job job = new Job(conf);
  Scan scan = new Scan();
  scan.addFamily(Bytes.toBytes("[COLUMN FAMILY DATA BEING READ FROM]"));  //Column family to read from
  scan.setStartRow(Bytes.toBytes(start));  //Filter by date here
  scan.setStopRow(Bytes.toBytes(end + 1));
  scan.setCaching(500);   //Should always set high
  scan.setCacheBlocks(false); // Always set this to false for MR jobs!
  TableMapReduceUtil.initTableMapperJob("[OUTPUT COLUMN FAMILY NAME]", scan, IQueryMapper4.class, LongWritable.class, Text.class, job);
  TableMapReduceUtil.initTableReducerJob("[OUTPUT COLUMN FAMILY NAME]", IQueryReducer4.class, job);

  return job.waitForCompletion(true) ? 0 : 1;;

 public static void main(String[] args) throws Exception {
  System.exit( IWikiQuery4(), args));

Mapper Class

public class IQueryMapper4 extends TableMapper<LongWritable, Text> {

 public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
                        //Get value from column
                        byte[] colValue = value.getValue(Bytes.toBytes("[INPUT COLUMN FAMILY]"), Bytes.toBytes("Value");

   //Out Key: a_id,  Value: rev_id
   context.write(new LongWritable(Bytes.toLong(colValue, 0, 8)), new Text(Bytes.toLong(colValue, 8, 8) + ""));

Combiner Class

You must remember that whatever type the output of the Mapper is, The combiner must also output the same.  If the Output Key of the Mapper is Text then the Combiner cannot output the Key to be  LongWritable.
public class IQueryCombiner4 extends TableReducer<LongWritable, Text, LongWritable> {

 public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  int i = 0;
  //Count rev_id's
  for (Iterator<Text> it = values.iterator(); it.hasNext();
  //Send all to same reducer. Key: 1, Value: a_id + revCount
  context.write(new LongWritable(1), new Text(key + "\t" + i));

Reducer Class

public class IQueryReducer4 extends TableReducer<LongWritable, Text, ImmutableBytesWritable> {

 public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  int kValue = Integer.parseInt(conf.get("k").trim());

  ArrayList<ArrayList<Long>> list = new ArrayList<ArrayList<Long>>();
  String tmp = "";
  int i;
  boolean found = false;

  //Create top k list, if item is large enough add it in correct place and remove the last item
  for (Iterator<Text> it = values.iterator(); it.hasNext();) {
   tmp =;

   for (i = 0; (list.isEmpty()) || (i < list.size()); i++) {
    if ((list.isEmpty()) || (Integer.parseInt(tmp.split("\t")[1].trim()) >= list.get(i).get(1))) {
     // if top k-List is empty or new record is in top k-List, break
     found = true;

   // add pair to top k-List
   if (found) {
    ArrayList<Long> map = new ArrayList<Long>();
    map.add(0, Long.parseLong(tmp.split("\t")[0].trim()));
    map.add(1, Long.parseLong(tmp.split("\t")[1].trim()));
    list.add(i, map);
   // make sure top k-List stays of size k
   if (list.size() > kValue) list.remove(list.size()-1);

   found = false;

  for (int x = 0; x < list.size(); x++) {
   Put put = new Put(Bytes.toBytes(x + 1));
   put.add(Bytes.toBytes("[OUTPUT COLUMN FAMILY]"), Bytes.toBytes("article"), Bytes.toBytes(list.get(x).get(0)));
   put.add(Bytes.toBytes("[OUTPUT COLUMN FAMILY]"), Bytes.toBytes("revCount"), Bytes.toBytes(list.get(x).get(1)));
   //Output Key: Rank_number, Value: article_id + revCount
   context.write(new ImmutableBytesWritable(Bytes.toBytes(x + 1)), put);

There we have it.  I am no expert at MapReduce or HBase but thought since the documentation isn't the most helpful that this example might be of use to somebody.

No comments:

Post a Comment