Saturday, 14 December 2013

Creating an Index Structure for MapReduce over HDFS (Hadoop 2.0.0)

Some tips and advice from our experience of designing an index structure for MapReduce jobs running over HDFS.

Cluster

The MapReduce jobs will run on a Hadoop cluster with the following Hardware Specification:
  • Intel I5 CPU (quad core, 3.4GHz, 6MB cache)
  • 16 GB RAM
  • 500GB 7200 RPM Hard Disk 
  • Intel(R) 82579LM Gigabit Ethernet NIC

Index Structure

The index structure we designed was tailored to our dataset and to the queries that were going to be run on it.  Better index structures could certainly be designed, but this one was simple and effective for what was required.
For this task, we wanted a structure that would hopefully be small enough to fit in memory, so that the cost of disk access would be largely avoided.

The dataset we ran MapReduce jobs over was a plain text file of Wikipedia edits, in the following format:

Each revision history record consists of 14 lines, each starting with a tag and containing a space-delimited series of entries. More specifically, each record contains the following data/tags, one tag per line:
  • REVISION: revision metadata, consisting of:
    • article_id: a large integer, uniquely identifying each page.
    • rev_id: a large number uniquely identifying each revision.
    • article_title: a string denoting the page’s title (and the last part of the URL of the page).
    • timestamp: the exact date and time of the revision, in ISO 8601 format; e.g., 13:45:00 UTC 30 September 2013 becomes 2013-09-30T13:45:00Z, where T separates the date from the time part and Z denotes the time is in UTC.
    • [ip:]username: the name of the user who performed the revision, or her DNS-resolved IP address (e.g., ip:office.dcs.gla.ac.uk) if anonymous.
    • user_id: a large number uniquely identifying the user who performed the revision, or her IP address as above if anonymous.
  • CATEGORY: list of categories this page is assigned to.
  • IMAGE: list of images in the page, each listed as many times as it occurs.
  • MAIN, TALK, USER, USER_TALK, OTHER: cross-references to pages in other namespaces.
  • EXTERNAL: list of hyperlinks to pages outside Wikipedia.
  • TEMPLATE: list of all templates used by the page, each listed as many times as it occurs.
  • COMMENT: revision comments as entered by the revision author.
  • MINOR: a Boolean flag (0|1) denoting whether the edit was marked as minor by the author.
  • TEXTDATA: word count of revision's plain text.
  • An empty line, denoting the end of the current record.
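
To make the record layout concrete, here is a minimal sketch (not part of the original job) that splits one record into its tags and their space-delimited entries. The class name RevisionRecordSketch and the sample record are made up for illustration, and the sample is abbreviated to a few of the tags.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; it is not used by the indexing job.
final class RevisionRecordSketch {

    // Maps each tag (e.g. "REVISION") to the space-delimited entries on its line.
    static Map<String, List<String>> parse(String record) {
        Map<String, List<String>> fields = new LinkedHashMap<String, List<String>>();
        for (String line : record.split("\n")) {
            if (line.trim().isEmpty()) {
                continue; // the empty line only marks the end of the record
            }
            String[] tokens = line.trim().split(" ");
            // Everything after the tag belongs to that tag.
            List<String> entries = new ArrayList<String>(
                    Arrays.asList(tokens).subList(1, tokens.length));
            fields.put(tokens[0], entries);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Made-up, abbreviated record: a real record carries all of the tags listed above.
        String record =
                "REVISION 12 345 Example_Page 2013-09-30T13:45:00Z alice 7\n"
              + "CATEGORY Examples\n"
              + "MINOR 0\n"
              + "TEXTDATA 42\n"
              + "\n";
        List<String> revision = parse(record).get("REVISION");
        System.out.println("article_id = " + revision.get(0));
        System.out.println("rev_id     = " + revision.get(1));
        System.out.println("timestamp  = " + revision.get(3));
    }
}
```

Only the REVISION line is needed for the index described in this post, which is why the other tags can be discarded.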

Identifying what your queries actually need is very important here.  The queries we were asked to perform only used the article_id, the revision_id and the timestamp, so most of the data in each record went unused.  For our situation we chose an index structure that holds the timestamp, article_id and revision_id of one revision on each line.  This reduced the data size to about 1% of the original.  It also meant the index could fit in memory, and we therefore saw a large saving in the time taken to complete a job.  The code to create the index follows.

Implementation

Note: Java 6 (JRE) was used here instead of the then-standard Java 7.

This is the class that runs the job:

IndexingOne.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* 
 * Edited By: Pascal Lueders & Allan MacMillan
 * Date: 4th November 2013
 */
public class IndexingOne extends Configured implements Tool {

 @Override
 public int run(String[] args) throws Exception {
  String inputFile = "/wiki-sample.txt";
  String outputFolder = "/user/Index/output_folder";

  // Reuse the configuration handed over by ToolRunner so that generic options (e.g. -D) are honoured
  Configuration conf = getConf();

  // Cluster-specific settings: NameNode, JobTracker and the jar holding the job classes
  conf.set("fs.defaultFS", "hdfs://cluster:8020");
  conf.set("mapred.job.tracker", "cluster:8021");

  conf.set("mapred.jar", "file:///users/Index/I1.jar");

  // Delete any previous output; the job fails if the output folder already exists
  FileSystem fs = FileSystem.get(conf);
  fs.delete(new Path(outputFolder), true);

  Job job = new Job(conf);
  job.setJobName("IndexingOne");
  job.setJarByClass(IndexingOne.class);

  job.setMapperClass(IndexOneMapper.class);
  job.setReducerClass(IndexOneReducer.class);

  job.setInputFormatClass(MyInputFormat.class);
  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(Text.class);

  FileInputFormat.setInputPaths(job, new Path(inputFile));
  FileOutputFormat.setOutputPath(job, new Path(outputFolder));

  // waitForCompletion submits the job itself, so no separate submit() call is needed
  return job.waitForCompletion(true) ? 0 : 1;
 }

 public static void main(String[] args) throws Exception {
  int a = ToolRunner.run(new IndexingOne(), args);
  System.exit(a);
 }
}
This is the input format class, which hands Hadoop the record reader we designed to split the input.

MyInputFormat.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/*
 * Authors: Pascal Lueders & Allan MacMillan
 * Date: 4th November 2013
 */
public class MyInputFormat extends FileInputFormat<LongWritable, Text> {
 
 @Override
 public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
  return new MyRecordReader();
 }
}
This is the class that dictates how the input should be split into records.

MyRecordReader.java

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/*
 * Edited By: Pascal Lueders & Allan MacMillan
 * Date: 4th November 2013
 */
public class MyRecordReader extends RecordReader<LongWritable, Text> {
 // Each record is separated by a triple newLine character
 private static final byte[] recordSeparator = "\n\n\n".getBytes();
 private FSDataInputStream fsin;
 private long start, end;
 private boolean stillInChunk = true;
 private DataOutputBuffer buffer = new DataOutputBuffer();
 
 private LongWritable key = new LongWritable();
 private Text value = new Text();

 public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) inputSplit;
  Configuration conf = context.getConfiguration();
  Path path = split.getPath();
  FileSystem fs = path.getFileSystem(conf);

  fsin = fs.open(path);
  start = split.getStart();
  end = start + split.getLength();
  fsin.seek(start);

  if (start != 0)
   readRecord(false);
 }

 private boolean readRecord(boolean withinBlock) throws IOException {
  int i = 0, b;
  while (true) {
   if ((b = fsin.read()) == -1)
    return false;
   if (withinBlock)
    buffer.write(b);
   if (b == recordSeparator[i]) {
    if (++i == recordSeparator.length)
     return fsin.getPos() < end;
   } else
    i = 0;
  }
 }

 public boolean nextKeyValue() throws IOException {
  if (!stillInChunk)
   return false;
  boolean status = readRecord(true);
  value = new Text();
  value.set(buffer.getData(), 0, buffer.getLength());
  key.set(fsin.getPos());
  buffer.reset();
  if (!status)
   stillInChunk = false;
  return true;
 }

 public LongWritable getCurrentKey() { return key; }

 public Text getCurrentValue() { return value; }

 public float getProgress() throws IOException {
  return (float) (fsin.getPos() - start) / (end - start);
 }

 public void close() throws IOException { fsin.close(); }
}
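
The separator scan at the heart of readRecord can be tried without a cluster. The following is a simplified, standalone sketch of the same idea working on an in-memory byte array; the class name SeparatorScanSketch and the sample input are made up for illustration, and it leaves out the split-boundary handling (start, end, skipping the first partial record) that the real reader needs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone illustration of the separator scan; not the Hadoop RecordReader itself.
final class SeparatorScanSketch {

    private static final Charset UTF8 = Charset.forName("UTF-8");

    // Cuts the data into records, each ending with (and including) the separator.
    static List<String> splitRecords(byte[] data, byte[] separator) {
        List<String> records = new ArrayList<String>();
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int matched = 0; // number of separator bytes matched so far
        int b;
        while ((b = in.read()) != -1) {
            buffer.write(b);
            if ((byte) b == separator[matched]) {
                if (++matched == separator.length) {
                    records.add(new String(buffer.toByteArray(), UTF8));
                    buffer.reset();
                    matched = 0;
                }
            } else {
                // Simple reset, as in the reader above. This is adequate for a
                // separator made of one repeated byte, such as "\n\n\n".
                matched = 0;
            }
        }
        if (buffer.size() > 0) {
            records.add(new String(buffer.toByteArray(), UTF8)); // trailing data without a separator
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "A\nB\n\n\nC\nD\n\n\n".getBytes(UTF8);
        List<String> records = splitRecords(data, "\n\n\n".getBytes(UTF8));
        System.out.println(records.size() + " records"); // prints "2 records"
    }
}
```

As in the reader above, each returned record still contains its trailing separator; the mapper copes with that because it tokenises on newlines and only looks for the REVISION line.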

This is the Mapper.

IndexOneMapper.java

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * Authors: Pascal Lueders & Allan MacMillan
 * Date: 4th November 2013
 */
public class IndexOneMapper extends Mapper<LongWritable, Text, LongWritable, Text>{

 private LongWritable _key = new LongWritable();
 private Text _value = new Text();

 @Override
 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  long unixtime = 0L;
  final String FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

  // read one line at a time
  StringTokenizer tokenizer = new StringTokenizer(value.toString(), "\n");
  while (tokenizer.hasMoreTokens()) {
   String line = tokenizer.nextToken();
   // fields: REVISION article_id rev_id article_title timestamp [ip:]username user_id
   String[] fields = line.split(" ");
   if (fields[0].equals("REVISION")) {
    try {
     // Converts the given String time-stamp (String format given above) into a long.
     // The 'Z' in FORMAT is a quoted literal, so UTC has to be set explicitly;
     // otherwise the time-stamp is read in the JVM's default time zone.
     SimpleDateFormat sf = new SimpleDateFormat(FORMAT);
     sf.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
     unixtime = sf.parse(fields[4].trim()).getTime();
    } catch (ParseException pe) {
     // Unparseable time-stamp: the record is still indexed, under key 0
    }

    // _value = "<article_id>\t<revision_id>"
    // Once the data is added, break out of the loop to decrease processing time
    _key.set(unixtime);
    _value.set(fields[1] + "\t" + fields[2]);
    context.write(_key, _value);
    break;
   }
  }
 }
}
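
The timestamp conversion is worth checking in isolation, because the 'Z' in the pattern is a quoted literal: SimpleDateFormat does not treat it as a time zone and uses the JVM's default zone unless one is set. Below is a minimal standalone sketch that pins the zone to UTC; the class name TimestampSketch and the method toEpochMillis are made up for illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Hypothetical helper for illustration; it mirrors the conversion done in the mapper.
final class TimestampSketch {

    // Converts e.g. "2013-09-30T13:45:00Z" into milliseconds since the Unix epoch.
    static long toEpochMillis(String isoTimestamp) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
        // Without this call the string would be interpreted in the JVM's default time zone.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        format.setLenient(false); // reject out-of-range values such as month 13
        try {
            return format.parse(isoTimestamp).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Not a valid timestamp: " + isoTimestamp, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2013-09-30T13:45:00Z")); // prints 1380548700000
    }
}
```

SimpleDateFormat is not thread-safe, so creating one instance per call (or per mapper instance) is the safe choice; sharing a static instance between threads is not.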

This is the Reducer.

IndexOneReducer.java

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * Authors: Pascal Lueders & Allan MacMillan
 * Date: 4th November 2013
 */
public class IndexOneReducer extends Reducer<LongWritable, Text, LongWritable, Text>
{
 @Override
 /*
  * Reducer iterates through the given list of values and writes the key and values to the context.
  */
 protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  
  for (Iterator<Text> it = values.iterator(); it.hasNext();)
   context.write(key, it.next());
 }
} 
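
Since the job uses the default text output, each line of the resulting index has the form <unixtime>\t<article_id>\t<revision_id>. As a rough sketch of how such an index could be held in memory and queried, the class below loads lines into a sorted map and answers a time-range lookup. The class name InMemoryIndexSketch, the sample lines and the choice of a range query are assumptions for illustration; the queries we actually ran are not shown in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical consumer of the index file; not part of the indexing job above.
final class InMemoryIndexSketch {

    // unixtime (ms) -> all "<article_id>\t<revision_id>" entries with that timestamp
    private final NavigableMap<Long, List<String>> byTime = new TreeMap<Long, List<String>>();

    // Adds one index line of the form "<unixtime>\t<article_id>\t<revision_id>".
    void addLine(String line) {
        String[] parts = line.split("\t", 2);
        Long timestamp = Long.valueOf(parts[0]);
        List<String> entries = byTime.get(timestamp);
        if (entries == null) {
            entries = new ArrayList<String>();
            byTime.put(timestamp, entries);
        }
        entries.add(parts[1]);
    }

    // Returns every entry whose timestamp lies in [from, to], in time order.
    List<String> between(long from, long to) {
        List<String> result = new ArrayList<String>();
        for (List<String> entries : byTime.subMap(from, true, to, true).values()) {
            result.addAll(entries);
        }
        return result;
    }

    public static void main(String[] args) {
        InMemoryIndexSketch index = new InMemoryIndexSketch();
        // Made-up sample lines in the index format.
        index.addLine("1000\t12\t345");
        index.addLine("2000\t12\t346");
        index.addLine("3000\t99\t400");
        System.out.println(index.between(1500L, 3000L).size()); // prints 2
    }
}
```

A sorted map keeps lookups by time cheap once the index is loaded, which is the point of shrinking the data until it fits in memory.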

Next Steps

http://amac4.blogspot.co.uk/2013/07/setting-up-tika-extracting-request.html
