Pages

Sunday, 22 December 2013

Adding/Delete Column Family (HBase)

Adding/Delete a Column Family (HBase)

private static Configuration conf;
//Choose conf settings

// Snippet: drops a column family from a table and then adds it back.
// Replace "[TABLE]" and "[COLUMN FAMILY]" with your own names; the same
// table name must be passed to every admin call below.
HBaseAdmin hAdmin = new HBaseAdmin(conf);
try {
    hAdmin.disableTable("[TABLE]"); //MUST DISABLE FIRST
    try {
        //Delete column family
        hAdmin.deleteColumn("[TABLE]", "[COLUMN FAMILY]");

        //Add it back (addColumn takes the TABLE name first, then the family descriptor)
        HColumnDescriptor cfA = new HColumnDescriptor(Bytes.toBytes("[COLUMN FAMILY]"));
        hAdmin.addColumn("[TABLE]", cfA);
    } finally {
        // Runs even when the delete/add fails, so the table is not left disabled.
        hAdmin.enableTable("[TABLE]");  //ALWAYS RE-ENABLE
    }
} finally {
    hAdmin.close();
}

MapReduce Combiner Example (HBase)

MapReduce Combiner Example (HBase)

The combiner is a reduce type function that takes place during the map operation. This allows for local data reduction while the map output is still in memory and therefore reduces the amount of disk I/O.  

The following is an example where we used a combiner to help us find the top K articles with the most revisions made for any given timerange.  We didn't want to have two MapReduce jobs for the query so the Combiner helped us achieve our goal.

This is the schema we used for our HBase table:

Key : Timestamp 
Values (Composite) : article_id + revision_id [One per column for any given timestamp]

We are running Hadoop 2.0.0

Imported Classes

To make the code more readable, rather than repeating the imports for each class, here is a list of what we used across the whole project.  If you are using Eclipse, hit Ctrl + Shift + O and it will organise your imports automatically for you anyway.  I may be missing a few imports, but you get the gist.
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

Setup Class

This is where all the Map & Reduce classes are assigned and HBase objects are created and where the program is run.
/**
 * Driver for the top-K query: wires the scan, mapper, combiner and reducer
 * together and runs the job.
 *
 * Arguments: startTime, endTime, k.
 */
public class IWikiQuery4 extends Configured implements Tool {
 // NOTE(review): the accepted argument format is an assumption - it mirrors the
 // ISO 8601 timestamps of the Wikipedia dataset (e.g. 2013-09-30T13:45:00Z).
 // Confirm it matches how the table's row keys were written.
 private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

 private static Configuration conf;

 /**
  * Configures and runs the job.
  *
  * @param args startTime, endTime (both in TIMESTAMP_FORMAT) and k
  * @return 0 when the job succeeds, 1 on bad arguments or job failure
  */
 public int run(String[] args) throws Exception {
  if (args.length < 3) {
   System.err.println("Usage: IWikiQuery4 <startTime> <endTime> <k>");
   return 1;
  }

  // The scan below needs numeric row-key bounds, so the two time arguments
  // are converted to epoch milliseconds up front.
  long start;
  long end;
  try {
   start = parseTimestamp(args[0]);
   end = parseTimestamp(args[1]);
  } catch (ParseException pe) {
   System.err.println("Timestamps must look like 2013-09-30T13:45:00Z: " + pe.getMessage());
   return 1;
  }

  // Fail here rather than inside the reducer, which parses "k" from the configuration.
  try {
   Integer.parseInt(args[2].trim());
  } catch (NumberFormatException nfe) {
   System.err.println("k must be an integer but was: " + args[2]);
   return 1;
  }

  conf = HBaseConfiguration.create(getConf());
  conf.addResource("all-client-conf.xml");

  conf.set("mapred.jar", "file:///[LOCATION OF JAR FILE]");

  conf.set("startTime", args[0]);  //User input
  conf.set("endTime", args[1]);
  conf.set("k", args[2]);

  Job job = new Job(conf);
  job.setJarByClass(IWikiQuery4.class);

  Scan scan = new Scan();
  scan.addFamily(Bytes.toBytes("[COLUMN FAMILY DATA BEING READ FROM]"));  //Column family to read from
  scan.setStartRow(Bytes.toBytes(start));  //Filter by date here
  scan.setStopRow(Bytes.toBytes(end + 1)); // +1 so that rows keyed exactly "end" are still scanned
  scan.setCaching(500);   //Should always set high
  scan.setCacheBlocks(false); // Always set this to false for MR jobs!

  // The first argument of both init calls is a TABLE name, not a column family.
  TableMapReduceUtil.initTableMapperJob("[INPUT TABLE NAME]", scan, IQueryMapper4.class, LongWritable.class, Text.class, job);
  job.setCombinerClass(IQueryCombiner4.class);
  TableMapReduceUtil.initTableReducerJob("[OUTPUT TABLE NAME]", IQueryReducer4.class, job);
  // The combiner writes every record under the single key 1, so only one of
  // these reduce tasks will actually receive data.
  job.setNumReduceTasks(10);

  return job.waitForCompletion(true) ? 0 : 1;
 }

 /**
  * Converts a timestamp argument into epoch milliseconds, interpreting it as UTC.
  *
  * @throws ParseException when the text does not match TIMESTAMP_FORMAT
  */
 private static long parseTimestamp(String text) throws ParseException {
  // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
  SimpleDateFormat format = new SimpleDateFormat(TIMESTAMP_FORMAT);
  format.setLenient(false);
  format.setTimeZone(TimeZone.getTimeZone("UTC"));
  return format.parse(text.trim()).getTime();
 }

 public static void main(String[] args) throws Exception {
  System.exit(ToolRunner.run(new IWikiQuery4(), args));
 }
}


Mapper Class

public class IQueryMapper4 extends TableMapper<LongWritable, Text> {

 public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
   
                        //Get value from column
                        byte[] colValue = value.getValue(Bytes.toBytes("[INPUT COLUMN FAMILY]"), Bytes.toBytes("Value");

   //Out Key: a_id,  Value: rev_id
   context.write(new LongWritable(Bytes.toLong(colValue, 0, 8)), new Text(Bytes.toLong(colValue, 8, 8) + ""));
   
  }
 }
}

Combiner Class

You must remember that whatever types the Mapper outputs, the Combiner must output the same types.  If the output key of the Mapper is Text, then the Combiner cannot output a LongWritable key.
/**
 * Map-side pre-aggregation: turns the revision ids grouped under one article
 * id into a single "article_id TAB count" record.
 */
public class IQueryCombiner4 extends TableReducer<LongWritable, Text, LongWritable> {

 /**
  * @param key     article id
  * @param values  revision ids seen for that article in this combine pass
  * @param context receives Key: 1, Value: a_id + revCount
  */
 public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  // Only the number of rev_id's matters, not their content.
  int revisionCount = 0;
  for (Text ignored : values) {
   revisionCount++;
  }

  // NOTE(review): this combiner changes the meaning of key and value, so the
  // reducer only works if the combiner has run on every record - confirm that
  // the framework guarantees this for your Hadoop version.
  LongWritable sharedKey = new LongWritable(1);  // constant key => all output meets in one reducer
  Text articleAndCount = new Text(key + "\t" + revisionCount);
  context.write(sharedKey, articleAndCount);
 }
}

Reducer Class

/**
 * Builds the final top-k list of (article_id, revCount) pairs and writes one
 * row per rank to the output table.
 */
public class IQueryReducer4 extends TableReducer<LongWritable, Text, ImmutableBytesWritable> {

 /**
  * @param key     grouping key of the incoming records (not used for ranking)
  * @param values  records of the form "article_id TAB revCount"
  * @param context supplies "k" from the job configuration and receives the Puts
  * @throws IOException when a value is not in the expected two-field form
  */
 public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  Configuration conf = context.getConfiguration();
  int kValue = Integer.parseInt(conf.get("k").trim());

  // The same article can arrive several times with partial counts, so the
  // counts are summed per article before anything is ranked.
  Map<Long, Long> totals = new HashMap<Long, Long>();
  for (Text value : values) {
   String[] fields = value.toString().trim().split("\t");
   if (fields.length < 2) {
    throw new IOException("Expected \"<article_id>\\t<revCount>\" but got: " + value);
   }
   Long articleId = Long.valueOf(fields[0].trim());
   long partialCount = Long.parseLong(fields[1].trim());
   Long soFar = totals.get(articleId);
   totals.put(articleId, soFar == null ? partialCount : soFar.longValue() + partialCount);
  }

  // Each entry is {article_id, revCount}, kept sorted best-first and capped at k.
  List<long[]> topK = new ArrayList<long[]>();
  for (Map.Entry<Long, Long> total : totals.entrySet()) {
   insertIntoTopK(topK, total.getKey().longValue(), total.getValue().longValue(), kValue);
  }

  byte[] family = Bytes.toBytes("[OUTPUT COLUMN FAMILY]");
  for (int x = 0; x < topK.size(); x++) {
   long[] entry = topK.get(x);
   Put put = new Put(Bytes.toBytes(x + 1));
   put.add(family, Bytes.toBytes("article"), Bytes.toBytes(entry[0]));
   put.add(family, Bytes.toBytes("revCount"), Bytes.toBytes(entry[1]));

   //Output Key: Rank_number, Value: article_id + revCount
   context.write(new ImmutableBytesWritable(Bytes.toBytes(x + 1)), put);
  }
 }

 /**
  * Inserts one (article, count) pair into the sorted top-k list if it ranks
  * within the first k positions. Higher counts rank first; equal counts are
  * ordered by ascending article id so the result is deterministic.
  */
 private static void insertIntoTopK(List<long[]> topK, long articleId, long revCount, int k) {
  int position = 0;
  while (position < topK.size() && ranksAhead(topK.get(position), articleId, revCount)) {
   position++;
  }
  // Covers both "list already full of better entries" and k <= 0. An entry
  // smaller than everything present is still appended while there is room.
  if (position >= k) {
   return;
  }
  topK.add(position, new long[] { articleId, revCount });
  // make sure top k-List stays of size k
  if (topK.size() > k) {
   topK.remove(topK.size() - 1);
  }
 }

 /** True when the existing entry should stay ahead of the candidate pair. */
 private static boolean ranksAhead(long[] existing, long articleId, long revCount) {
  if (existing[1] != revCount) {
   return existing[1] > revCount;
  }
  return existing[0] < articleId;
 }
}

There we have it.  I am no expert at MapReduce or HBase but thought since the documentation isn't the most helpful that this example might be of use to somebody.




Sunday, 15 December 2013

Retrieving Composite Key (HBase)

Retrieving Composite Key (HBase)

It's common to want the key of your HBase table to be made up of more than one component, especially to ensure that your key stays unique.  There is a lack of documentation on how this can be done, but it is pretty easy. HBase orders its keys and allows you to filter by the key prior to loading the data in from the region servers, and therefore a composite key is a powerful tool.

Creating the Key

Creating your composite key is easy.  You can use longs, strings, integers, whatever as long as Apache supports it so check the supported methods. 
long1 = 11111111;
long2 = 22222222;
Bytes.add(Bytes.toLong(long1), Bytes.toLong(long2));

Retrieving the Key

You will need to understand how many bytes long your data is.  A long is 8 bytes in size so the key can be retrieved by first retrieving the first lot of 8 bytes, then retrieving the second lot of 8 bytes.
//res is a Result object
byte[] rowkey = res.getRow();
// Both components are longs of 8 bytes each, stored back to back.
final int longWidth = 8;
long key1 = Bytes.toLong(rowkey, 0, longWidth);
long key2 = Bytes.toLong(rowkey, longWidth, longWidth);

Saturday, 14 December 2013

Creating an Index Structure for MapReduce over HDFS (Hadoop 2.0.0)

Creating an Index Structure for MapReduce over HDFS (Hadoop 2.0.0)

Some tips and advice from experience designing an Index structure for MapReduce running over HDFS.

Cluster

The MapReduce jobs will run on a Hadoop cluster with the following Hardware Specification:
  • Intel I5 CPU (quad core, 3.4GHz, 6MB cache)
  • 16 GB RAM
  • 500GB 7200 RPM Hard Disk 
  • Intel(R) 82579LM Gigabit Ethernet NIC

Index Structure

The index structure that was designed was suited to our dataset and for the queries that were going to be run on the dataset.  There were much better index structures that could be designed but it was simple and effective for what was required.
For this task, we wanted a structure that would hopefully be small enough to fit in memory and therefore the costs to disk would be nullified.  

The dataset we were attempting to run MapReduce jobs over was a plain text file of Wikipedia edits and this is the format:

Each revision history record consists of 14 lines, each starting with a tag and containing a space-delimited series of entries. More specifically, each record contains the following data/tags, one tag per line:
  • REVISION: revision metadata, consisting of:
    • o article_id: a large integer, uniquely identifying each page.
    • o rev_id: a large number uniquely identifying each revision.
    • o article_title: a string denoting the page’s title (and the last part of the URL of the page).
    • o timestamp: the exact date and time of the revision, in ISO 8601 format; e.g., 13:45:00 UTC 30 September 2013 becomes 2013-09-30T13:45:00Z, where T separates the date from the time part and Z denotes the time is in UTC.
    • o [ip:]username: the name of the user who performed the revision, or her DNS-resolved IP address (e.g., ip:office.dcs.gla.ac.uk) if anonymous.
    • o user_id: a large number uniquely identifying the user who performed the revision, or her IP address as above if anonymous.
  • CATEGORY: list of categories this page is assigned to.
  • IMAGE: list of images in the page, each listed as many times as it occurs.
  • MAIN, TALK, USER, USER_TALK, OTHER: cross-references to pages in other namespaces.
  • EXTERNAL: list of hyperlinks to pages outside Wikipedia.
  • TEMPLATE: list of all templates used by the page, each listed as many times as it occurs.
  • COMMENT: revision comments as entered by the revision author.
  • MINOR: a Boolean flag (0|1) denoting whether the edit was marked as minor by the author.
  • TEXTDATA: word count of revision's plain text.
  • An empty line, denoting the end of the current record.

Identifying what is needed in your queries is very important here.  The queries we were being asked to perform only used the article_id, revision_id and timestamp, so there is a lot of unused data here.  For our situation we chose to go with an index structure that contained the timestamp, article_id and revision_id on one line each.  This meant we managed to decrease the data size to about 1% of the original.  It also meant it could fit in memory and therefore we saw a large saving in the time taken to complete the job.  This is the code to create the index.

Implementation

Note, Java 6 RE was used here instead of the standard Java 7.

This is the class that runs the job:

IndexingOne.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* 
 * Edited By: Pascal Lueders & Allan MacMillan
 * Date: 4th November 2013
 */
/**
 * Driver for the indexing job: reads the Wikipedia edit dump through
 * MyInputFormat and writes the index produced by IndexOneMapper and
 * IndexOneReducer to the output folder.
 */
public class IndexingOne extends Configured implements Tool {

 /**
  * Configures and runs the indexing job.
  *
  * @param args command-line arguments (not used by this job)
  * @return 0 when the job succeeds, 1 otherwise
  */
 @Override
 public int run(String[] args) throws Exception {
  String inputFile = "/wiki-sample.txt";
  String outputFolder = "/user/Index/output_folder";

  // Start from the configuration ToolRunner injected (it carries any generic
  // options given on the command line) instead of discarding it.
  Configuration injected = getConf();
  Configuration conf = (injected == null) ? new Configuration() : new Configuration(injected);

  conf.set("fs.defaultFS", "hdfs://cluster:8020");
  conf.set("mapred.job.tracker", "cluster:8021");

  conf.set("mapred.jar", "file:///users/Index/I1.jar");

  // Remove the output of any earlier run first (recursive delete).
  FileSystem fs = FileSystem.get(conf);
  fs.delete(new Path(outputFolder), true);

  Job job = new Job(conf);
  job.setJobName("IndexingOne");
  job.setJarByClass(IndexingOne.class);

  job.setMapperClass(IndexOneMapper.class);
  job.setReducerClass(IndexOneReducer.class);

  job.setInputFormatClass(MyInputFormat.class);
  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, new Path(inputFile));
  FileOutputFormat.setOutputPath(job, new Path(outputFolder));

  // No separate job.submit() call is made before waiting for completion.
  return job.waitForCompletion(true) ? 0 : 1;
 }

 public static void main(String[] args) throws Exception {
  int a = ToolRunner.run(new IndexingOne(), args);
  System.exit(a);
 }
}
This is the java class that points to the class we designed to split the input.  

MyInputFormat.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/*
 * Authors: Pascal Lueders & Allan MacMillan
 * Date: 4th November 2013
 */
/**
 * Input format whose only job is to hand each split to a MyRecordReader.
 */
public class MyInputFormat extends FileInputFormat<LongWritable, Text> {

 /**
  * Creates the reader for one split.
  *
  * @param split   the split to be read (not inspected here)
  * @param context the task context (not inspected here)
  * @return a new, not yet initialised MyRecordReader
  */
 @Override
 public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
  final RecordReader<LongWritable, Text> reader = new MyRecordReader();
  return reader;
 }
}
This is the class that dictates how the input should be split

MyRecordReader.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/*
 * Edited By: Pascal Lueders & Allan MacMillan
 * Date: 4th November 2013
 */
public class MyRecordReader extends RecordReader<LongWritable, Text> {
 // Each record is separated by a triple newLine character
 private static final byte[] recordSeparator = "\n\n\n".getBytes();
 private FSDataInputStream fsin;
 private long start, end;
 private boolean stillInChunk = true;
 private DataOutputBuffer buffer = new DataOutputBuffer();
 
 private LongWritable key = new LongWritable();
 private Text value = new Text();

 public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) inputSplit;
  Configuration conf = context.getConfiguration();
  Path path = split.getPath();
  FileSystem fs = path.getFileSystem(conf);

  fsin = fs.open(path);
  start = split.getStart();
  end = start + split.getLength();
  fsin.seek(start);

  if (start != 0)
   readRecord(false);
 }

 private boolean readRecord(boolean withinBlock) throws IOException {
  int i = 0, b;
  while (true) {
   if ((b = fsin.read()) == -1)
    return false;
   if (withinBlock)
    buffer.write(b);
   if (b == recordSeparator[i]) {
    if (++i == recordSeparator.length)
     return fsin.getPos() < end;
   } else
    i = 0;
  }
 }

 public boolean nextKeyValue() throws IOException {
  if (!stillInChunk)
   return false;
  boolean status = readRecord(true);
  value = new Text();
  value.set(buffer.getData(), 0, buffer.getLength());
  key.set(fsin.getPos());
  buffer.reset();
  if (!status)
   stillInChunk = false;
  return true;
 }

 public LongWritable getCurrentKey() { return key; }

 public Text getCurrentValue() { return value; }

 public float getProgress() throws IOException {
  return (float) (fsin.getPos() - start) / (end - start);
 }

 public void close() throws IOException { fsin.close(); 

This is the Mapper.

IndexOneMapper.java

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * Authors: Pascal Lueders & Allan MacMillan
 * Date: 4th November 2013
 */
/**
 * Emits one index entry per input record:
 * key = revision timestamp in epoch milliseconds, value = "article_id TAB revision_id".
 */
public class IndexOneMapper extends Mapper<LongWritable, Text, LongWritable, Text>{

 /** Layout of the timestamp field of a REVISION line. */
 private static final String FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
 /** A REVISION line must have at least: tag, article_id, rev_id, title, timestamp. */
 private static final int MIN_REVISION_FIELDS = 5;

 private final LongWritable _key = new LongWritable();
 private final Text _value = new Text();

 /**
  * @param key     key supplied by the record reader (unused)
  * @param value   the text of one record, one tag per line
  * @param context receives (timestamp, "article_id TAB revision_id")
  */
 @Override
 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  // read one line at a time
  StringTokenizer tokenizer = new StringTokenizer(value.toString(), "\n");
  while (tokenizer.hasMoreTokens()) {
   // Split once and reuse the fields instead of re-splitting for every access.
   String[] fields = tokenizer.nextToken().split(" ");
   if (!fields[0].equals("REVISION")) {
    continue;
   }

   if (fields.length < MIN_REVISION_FIELDS) {
    // Count and skip the record instead of failing the task.
    context.getCounter("IndexOneMapper", "MALFORMED_REVISION_LINE").increment(1);
    return;
   }

   long unixtime;
   try {
    // Converts the given String time-stamp (String format given above) into a long.
    // The trailing Z is matched as a literal, so the zone is set explicitly
    // rather than left to the JVM default.
    SimpleDateFormat sf = new SimpleDateFormat(FORMAT);
    sf.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
    unixtime = sf.parse(fields[4].trim()).getTime();
   } catch (ParseException pe) {
    // An unparseable timestamp would otherwise be indexed under time 0.
    context.getCounter("IndexOneMapper", "UNPARSEABLE_TIMESTAMP").increment(1);
    return;
   }

   // _value = "<article_id>\t<revision_id>"
   _key.set(unixtime);
   _value.set(fields[1] + "\t" + fields[2]);
   context.write(_key, _value);
   // Only the REVISION line is needed, so stop scanning the rest of the record.
   return;
  }
 }
}

This is the Reducer.

IndexOneReducer.java

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * Authors: Pascal Lueders & Allan MacMillan
 * Date: 4th November 2013
 */
public class IndexOneReducer extends Reducer<LongWritable, Text, LongWritable, Text>
{
 @Override
 /*
  * Reducer iterates through the given list of values and writes the key and values to the context.
  */
 protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  
  for (Iterator<Text> it = values.iterator(); it.hasNext();)
   context.write(key, it.next());
 }
} 

Next Steps

http://amac4.blogspot.co.uk/2013/07/setting-up-tika-extracting-request.html