Custom partitioner in mapreduce – using new hadoop api 2

By | October 25, 2014
This is an example of a custom partitioner for the classic wordcount program.

Driver Class:
We partition keys by their first letter: one partition for each of the 26 letters, plus (optionally) a 27th partition for words that start with any other character. The Driver class needs the following additional settings.

  • job.setNumReduceTasks(26);
  • job.setPartitionerClass(WordcountPartitioner.class);
package org.puneetha.customPartitioner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordcountDriver extends Configured implements Tool {
	public int run(String[] args) throws Exception {
		if (args.length != 2) {
			System.out.println("Usage: [input] [output]");
			System.exit(-1);
		}

		Job job = Job.getInstance(getConf());
		job.setJobName("wordcount");
		job.setJarByClass(WordcountDriver.class);

		job.setNumReduceTasks(26);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(WordcountMapper.class);
		job.setPartitionerClass(WordcountPartitioner.class);
		job.setCombinerClass(WordcountReducer.class);
		job.setReducerClass(WordcountReducer.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		Path inputFilePath = new Path(args[0]);
		Path outputFilePath = new Path(args[1]);

		/* This line is to accept input recursively */
		FileInputFormat.setInputDirRecursive(job, true);

		FileInputFormat.addInputPath(job, inputFilePath);
		FileOutputFormat.setOutputPath(job, outputFilePath);

		/*Delete output filepath if already exists*/
		FileSystem fs = FileSystem.newInstance(getConf());

		if (fs.exists(outputFilePath)) {
			fs.delete(outputFilePath, true);
		}

		return job.waitForCompletion(true) ? 0: 1;
	}

	public static void main(String[] args) throws Exception {
		WordcountDriver wordcountDriver = new WordcountDriver();
		int res = ToolRunner.run(wordcountDriver, args);
		System.exit(res);
	}
}

Mapper Class:

package org.puneetha.customPartitioner;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Wordcount mapper: splits every input line on whitespace and emits
 * {@code (token, 1)} for each token.
 */
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	/** Constant count emitted for every token. */
	private static final IntWritable ONE = new IntWritable(1);

	/** Reused output key, to avoid allocating a new Text per token. */
	private final Text word = new Text();

	/**
	 * Emits one {@code (token, 1)} pair per whitespace-separated token.
	 *
	 * @param key     input key supplied by the input format (unused)
	 * @param value   one line of input text
	 * @param context sink for the emitted pairs
	 */
	@Override
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		StringTokenizer tokenizer = new StringTokenizer(value.toString());
		while (tokenizer.hasMoreTokens()) {
			word.set(tokenizer.nextToken());
			context.write(word, ONE);
		}
	}

	/**
	 * Drives the mapper over all records of the split. cleanup() runs in a
	 * finally block so it is still invoked when map() throws.
	 */
	@Override
	public void run(Context context) throws IOException, InterruptedException {
		setup(context);
		try {
			while (context.nextKeyValue()) {
				map(context.getCurrentKey(), context.getCurrentValue(), context);
			}
		} finally {
			cleanup(context);
		}
	}
}

Partitioner Class:

package org.puneetha.customPartitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom partitioner that routes each word to a reducer based on its first
 * letter.
 *
 * <p>Words starting with an ASCII letter (case-insensitive) get bucket 0 for
 * 'a' through 25 for 'z'. Empty words and words starting with any other
 * character get bucket 26. The bucket is then folded into the available
 * reducers with a modulo, so the result is always a legal partition:
 *
 * <ul>
 * <li>{@code job.setNumReduceTasks(27)}: one reducer per letter plus a
 * dedicated reducer for everything else;</li>
 * <li>{@code job.setNumReduceTasks(26)}: one reducer per letter; other words
 * share reducer 0 with 'a';</li>
 * <li>fewer reducers: letters wrap around.</li>
 * </ul>
 */
public class WordcountPartitioner extends Partitioner<Text, IntWritable> {

	/** Number of letters in the ASCII alphabet, i.e. buckets 0..25. */
	private static final int LETTER_COUNT = 26;

	/** Bucket for empty keys and keys not starting with an ASCII letter. */
	private static final int OTHER_BUCKET = LETTER_COUNT;

	/**
	 * Picks the partition for a word.
	 *
	 * @param key           the word
	 * @param value         its count (unused)
	 * @param numPartitions number of reducers configured for the job
	 * @return a partition in {@code [0, numPartitions)}
	 */
	@Override
	public int getPartition(Text key, IntWritable value, int numPartitions) {
		if (numPartitions <= 1) {
			return 0;
		}
		// Fold into the configured reducers so the result is never out of range.
		return bucketOf(key.toString()) % numPartitions;
	}

	/** Maps a word to its bucket: 0..25 for a..z, {@link #OTHER_BUCKET} otherwise. */
	private static int bucketOf(String word) {
		if (word.isEmpty()) {
			return OTHER_BUCKET;
		}
		// Plain range checks instead of toLowerCase(): locale-independent and
		// keeps non-ASCII letters out of the a-z buckets.
		char first = word.charAt(0);
		if (first >= 'a' && first <= 'z') {
			return first - 'a';
		}
		if (first >= 'A' && first <= 'Z') {
			return first - 'A';
		}
		return OTHER_BUCKET;
	}
}

Reducer Class:

package org.puneetha.customPartitioner;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Wordcount reducer (also used as the combiner): adds up all counts received
 * for a word and emits {@code (word, total)}.
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	/** Reused output value holding the total for the current key. */
	private final IntWritable result = new IntWritable();

	/**
	 * Sums the counts of one word and writes the total.
	 *
	 * @param key     the word
	 * @param values  the counts collected for the word
	 * @param context sink for the emitted pair
	 */
	@Override
	public void reduce(final Text key, final Iterable<IntWritable> values,
			final Context context) throws IOException, InterruptedException {
		int total = 0;
		for (IntWritable count : values) {
			total += count.get();
		}

		result.set(total);
		context.write(key, result);
	}
}

Run as below:

$ hadoop jar org.puneetha-0.0.1-SNAPSHOT.jar org.puneetha.customPartitioner.WordcountDriver /user/dummyuser/wordcount/input /user/dummyuser/wordcount/output

One thought on “Custom partitioner in mapreduce – using new hadoop api 2”

  1. Sampath

    How will you reduce the partitions(should not create part file itself) if a record not starts with a particular letter ?

    Reply

Leave a Reply

Your email address will not be published. Required fields are marked *