MapReduce Combiner Example (HBase)
The combiner is a reduce-style function that runs on the map side, after the map function but before the shuffle. It performs local aggregation while the map output is still in memory, which cuts down the amount of data written to disk and sent across the network.
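To make the idea concrete before diving into the HBase example, here is a minimal sketch of the classic word count job with a combiner. None of the names below belong to this article's query; they just show that a combiner is an ordinary Reducer registered with job.setCombinerClass(), so each map task pre-sums its own (word, 1) pairs locally and only the partial sums get shuffled.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountSketch {

    public static class TokenMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit (word, 1) for every occurrence -- this is the bulky intermediate output
            for (String word : value.toString().split("\\s+"))
                context.write(new Text(word), new LongWritable(1));
        }
    }

    // Summing is associative, so the same class works as both combiner and reducer.
    // Note its input and output types match the mapper's output types exactly.
    public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable v : values)
                sum += v.get();
            context.write(key, new LongWritable(sum));
        }
    }

    // In the driver:
    // job.setMapperClass(TokenMapper.class);
    // job.setCombinerClass(SumReducer.class); // each map task pre-sums its own output
    // job.setReducerClass(SumReducer.class);
}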
The following is an example where we used a combiner to help us find the top K articles with the most revisions in a given time range. We didn't want to run two MapReduce jobs for the query, so the combiner helped us achieve our goal in a single job.
This is the schema we used for our HBase table:
Key : Timestamp
Values (Composite) : article_id + revision_id [One per column for any given timestamp]
We are running Hadoop 2.0.0
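To make the composite value concrete: each row is keyed by the timestamp (stored as a long), and each column holds 16 bytes, the article_id followed by the revision_id, which is what the mapper later unpacks with Bytes.toLong. Here is a minimal sketch of how a row could be written under this schema; the putRevision helper and the placeholder names are just illustrative, not part of the original project.

// Sketch only: writes one (article_id, revision_id) pair under a timestamp row key.
public static void putRevision(HTable table, long timestamp, long articleId, long revisionId) throws IOException {
    Put put = new Put(Bytes.toBytes(timestamp)); // row key = timestamp
    // Composite value: 8 bytes of article_id followed by 8 bytes of revision_id
    byte[] composite = Bytes.add(Bytes.toBytes(articleId), Bytes.toBytes(revisionId));
    put.add(Bytes.toBytes("[COLUMN FAMILY]"), Bytes.toBytes("Value"), composite);
    table.put(put);
}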
Imported Classes
To keep the code readable, rather than showing the imports for each class, here is a list of everything we used across the whole project. If you are using Eclipse, hit Ctrl + Shift + O and it will organise your imports automatically for you anyway. I may be missing a few imports, but you get the gist.
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
Setup Class
This is where the Mapper, Combiner and Reducer classes are assigned to the job, the HBase Scan is configured, and the program is run.
public class IWikiQuery4 extends Configured implements Tool {
    private static Configuration conf;

    public int run(String[] args) throws Exception {
        if (args.length < 3)
            return 1;
        conf = HBaseConfiguration.create(getConf());
        conf.addResource("all-client-conf.xml");
        conf.set("mapred.jar", "file:///[LOCATION OF JAR FILE]");
        conf.set("startTime", args[0]); // User input
        conf.set("endTime", args[1]);
        conf.set("k", args[2]);
        // Row keys are timestamps stored as longs, so parse the user input accordingly
        long start = Long.parseLong(args[0]);
        long end = Long.parseLong(args[1]);

        Job job = new Job(conf);
        job.setJarByClass(IWikiQuery4.class);

        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("[COLUMN FAMILY DATA BEING READ FROM]")); // Column family to read from
        scan.setStartRow(Bytes.toBytes(start)); // Filter by date here
        scan.setStopRow(Bytes.toBytes(end + 1)); // Stop row is exclusive, so add 1 to include the end time
        scan.setCaching(500); // Should always be set high for MR jobs
        scan.setCacheBlocks(false); // Always set this to false for MR jobs!

        TableMapReduceUtil.initTableMapperJob("[INPUT TABLE NAME]", scan, IQueryMapper4.class, LongWritable.class, Text.class, job);
        job.setCombinerClass(IQueryCombiner4.class);
        TableMapReduceUtil.initTableReducerJob("[OUTPUT TABLE NAME]", IQueryReducer4.class, job);
        job.setNumReduceTasks(10);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new IWikiQuery4(), args));
    }
}
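With the jar built, the job is launched like any other Tool-based job, e.g. something along the lines of hadoop jar wikiquery.jar IWikiQuery4 <startTime> <endTime> <k> (the jar name here is just illustrative). The three arguments set the scan's time range and the k property that the reducer later reads back out of the configuration.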
Mapper Class
public class IQueryMapper4 extends TableMapper<LongWritable, Text> {
    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Get the composite value from the column: 8 bytes of article_id followed by 8 bytes of revision_id
        byte[] colValue = value.getValue(Bytes.toBytes("[INPUT COLUMN FAMILY]"), Bytes.toBytes("Value"));
        // Output Key: article_id, Value: revision_id
        context.write(new LongWritable(Bytes.toLong(colValue, 0, 8)), new Text(Bytes.toLong(colValue, 8, 8) + ""));
    }
}
Combiner Class
You must remember that whatever the output types of the Mapper are, the Combiner must output the same types. If the Mapper's output key is Text, the Combiner cannot output a LongWritable key.

public class IQueryCombiner4 extends TableReducer<LongWritable, Text, LongWritable> {
    public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int i = 0;
        // Count the revision_ids seen for this article_id
        for (Iterator<Text> it = values.iterator(); it.hasNext(); it.next())
            i++;
        // Send everything to the same reducer. Key: 1, Value: article_id + revCount
        context.write(new LongWritable(1), new Text(key + "\t" + i));
    }
}
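The reason everything is emitted under the constant key 1 is so that a single reducer receives every (article_id, revCount) pair and can build the global top-K list in one place. Because the combiner has already collapsed each article's revision_ids into a count on the map side, the amount of data funnelled to that one reducer stays small.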
Reducer Class
public class IQueryReducer4 extends TableReducer<LongWritable, Text, ImmutableBytesWritable> {
    public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        int kValue = Integer.parseInt(conf.get("k").trim());
        ArrayList<ArrayList<Long>> list = new ArrayList<ArrayList<Long>>();
        String tmp = "";
        int i;
        boolean found = false;

        // Build the top-k list: insert each (article_id, revCount) pair in descending revCount order
        for (Iterator<Text> it = values.iterator(); it.hasNext();) {
            tmp = it.next().toString().trim();
            long articleId = Long.parseLong(tmp.split("\t")[0].trim());
            long revCount = Long.parseLong(tmp.split("\t")[1].trim());

            // Find the position where this pair belongs
            for (i = 0; i < list.size(); i++) {
                if (revCount >= list.get(i).get(1)) {
                    found = true;
                    break;
                }
            }

            // Add the pair if it beats an existing entry, or if the top-k list is not yet full
            if (found || list.size() < kValue) {
                ArrayList<Long> pair = new ArrayList<Long>();
                pair.add(0, articleId);
                pair.add(1, revCount);
                list.add(i, pair);
            }

            // Make sure the top-k list stays of size k
            if (list.size() > kValue)
                list.remove(list.size() - 1);
            found = false;
        }

        // Output Key: rank_number, Value: article_id + revCount
        for (int x = 0; x < list.size(); x++) {
            Put put = new Put(Bytes.toBytes(x + 1));
            put.add(Bytes.toBytes("[OUTPUT COLUMN FAMILY]"), Bytes.toBytes("article"), Bytes.toBytes(list.get(x).get(0)));
            put.add(Bytes.toBytes("[OUTPUT COLUMN FAMILY]"), Bytes.toBytes("revCount"), Bytes.toBytes(list.get(x).get(1)));
            context.write(new ImmutableBytesWritable(Bytes.toBytes(x + 1)), put);
        }
    }
}
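To sanity-check the result, something along these lines could be used to read the ranked rows back out of the output table. The readTopK helper is hypothetical; only the int row key and the article/revCount qualifiers come from the reducer above, and it additionally needs org.apache.hadoop.hbase.client.Get on the import list.

// Hypothetical helper for reading back the top-k results written by the reducer.
public static void readTopK(Configuration conf, int k) throws IOException {
    HTable table = new HTable(conf, "[OUTPUT TABLE NAME]");
    try {
        for (int rank = 1; rank <= k; rank++) {
            Get get = new Get(Bytes.toBytes(rank)); // row key = rank, stored as an int
            Result result = table.get(get);
            if (result.isEmpty())
                break; // fewer than k articles in the time range
            long articleId = Bytes.toLong(result.getValue(Bytes.toBytes("[OUTPUT COLUMN FAMILY]"), Bytes.toBytes("article")));
            long revCount = Bytes.toLong(result.getValue(Bytes.toBytes("[OUTPUT COLUMN FAMILY]"), Bytes.toBytes("revCount")));
            System.out.println(rank + ". article " + articleId + " with " + revCount + " revisions");
        }
    } finally {
        table.close();
    }
}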
There we have it. I am no expert at MapReduce or HBase, but since the documentation isn't the most helpful, I thought this example might be of use to somebody.