Hadoop Partitioner java example

posted on Nov 20th, 2016

Apache Hadoop

Hadoop is an Apache open-source framework, written in Java, that allows distributed processing of large datasets across clusters of computers using simple programming models.

A Hadoop application runs in an environment that provides distributed storage and computation across clusters of computers. Hadoop is designed to scale up from a single server to thousands of machines, each offering local computation and storage.

Prerequisites

1) A machine with the Ubuntu 14.04 LTS operating system installed.

2) Apache Hadoop 2.6.4 pre-installed (How to install Hadoop on Ubuntu 14.04)

Hadoop Partitioner Example

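A partitioner divides the intermediate key-value pairs produced by the Map phase into partitions. It assigns each pair to a partition using a condition that works like a hash function: by default Hadoop uses HashPartitioner, which hashes the key, while a custom partitioner such as the one in this example applies a user-defined rule. The total number of partitions is the same as the number of Reducer tasks for the job.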

The partition phase takes place after the Map phase and before the Reduce phase. For every record the mapper emits, the partitioner returns a partition number between 0 and (number of reducers - 1). Because there is exactly one partition per reducer, all records in a given partition are processed by a single Reducer, and each reducer writes its own output file (part-r-00000, part-r-00001, and so on).
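For comparison with the custom partitioner used later, the default hash-based behaviour can be sketched in plain Java. The snippet below reproduces the arithmetic of Hadoop's HashPartitioner, (hashCode & Integer.MAX_VALUE) % numReduceTasks, without depending on Hadoop. The class name HashPartitionSketch is hypothetical, and because it hashes plain Strings rather than Text keys, the partition numbers it prints are illustrative only and may differ from a real job.

```java
import java.util.Arrays;
import java.util.List;

public class HashPartitionSketch {
    // Same arithmetic as Hadoop's default HashPartitioner: clear the sign bit so a
    // negative hashCode() still gives a non-negative result, then take the remainder.
    static int hashPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 2;
        List<String> keys = Arrays.asList("aman", "stark", "bond");
        for (String key : keys) {
            // String.hashCode() differs from Text.hashCode(), so these numbers are
            // illustrative only and may not match a real job's partitions.
            System.out.println(key + " -> partition " + hashPartition(key, numReduceTasks));
        }
    }
}
```

Every key always lands in the range 0 to numReduceTasks - 1, which is the contract any partitioner, including a custom one, must honour.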

Step 1 - Add the Hadoop jar files to your Java project's build path. Add the following jars:

/usr/local/hadoop/share/hadoop/common/*
/usr/local/hadoop/share/hadoop/common/lib/*
/usr/local/hadoop/share/hadoop/mapreduce/*
/usr/local/hadoop/share/hadoop/mapreduce/lib/*
/usr/local/hadoop/share/hadoop/yarn/*
/usr/local/hadoop/share/hadoop/yarn/lib/*
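If you compile outside an IDE, a small helper can expand these directories into a classpath string and warn about any directory that is missing or mistyped. This is a hypothetical helper (HadoopClasspath is not part of Hadoop) and it assumes Hadoop is installed under /usr/local/hadoop, as in the list above.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HadoopClasspath {
    // The directories listed in Step 1 (assumes Hadoop lives under /usr/local/hadoop).
    static final String[] JAR_DIRS = {
        "/usr/local/hadoop/share/hadoop/common",
        "/usr/local/hadoop/share/hadoop/common/lib",
        "/usr/local/hadoop/share/hadoop/mapreduce",
        "/usr/local/hadoop/share/hadoop/mapreduce/lib",
        "/usr/local/hadoop/share/hadoop/yarn",
        "/usr/local/hadoop/share/hadoop/yarn/lib"
    };

    // Returns the .jar files directly inside dir (sorted), or an empty list if dir is missing.
    static List<File> jarsIn(File dir) {
        File[] files = dir.listFiles((d, name) -> name.endsWith(".jar"));
        List<File> jars = new ArrayList<>();
        if (files != null) {
            Arrays.sort(files);
            jars.addAll(Arrays.asList(files));
        }
        return jars;
    }

    public static void main(String[] args) {
        List<String> entries = new ArrayList<>();
        for (String dir : JAR_DIRS) {
            List<File> jars = jarsIn(new File(dir));
            if (jars.isEmpty()) {
                System.err.println("warning: no jars found in " + dir);
            }
            for (File jar : jars) {
                entries.add(jar.getPath());
            }
        }
        // Print a classpath string, e.g. for: javac -cp "<output>" PartitionerExample.java
        System.out.println(String.join(File.pathSeparator, entries));
    }
}
```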

PartitionerExample.java

import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool {

	// Map class: turns each "name number" line into the pair (name, number).
	public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Local variable instead of a shared static field, so the
			// partitioner never depends on state left behind by the mapper.
			String[] line = value.toString().trim().split("\\s+");
			if (line.length < 2) {
				return; // skip blank or malformed lines
			}
			context.write(new Text(line[0]), new Text(line[1]));
		}
	}

	// Reducer class: emits the largest number seen for each name.
	public static class ReduceClass extends Reducer<Text, Text, Text, IntWritable> {
		@Override
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			int max = Integer.MIN_VALUE;
			for (Text val : values) {
				// Each value is the number written by the mapper, e.g. "7".
				max = Math.max(max, Integer.parseInt(val.toString().trim()));
			}
			context.write(key, new IntWritable(max));
		}
	}

	// Partitioner class: numbers 1-5 go to reducer 0, all other numbers to reducer 1.
	public static class CaderPartitioner extends Partitioner<Text, Text> {
		@Override
		public int getPartition(Text key, Text value, int numReduceTasks) {
			if (numReduceTasks < 2) {
				return 0; // with a single reducer, partition 1 would not exist
			}
			// Decide from the map output value passed in by the framework.
			int seed = Integer.parseInt(value.toString().trim());
			if (seed >= 1 && seed <= 5)
				return 0;
			else
				return 1;
		}
	}

	@Override
	public int run(String[] arg) throws Exception {
		Configuration conf = getConf();
		Job job = Job.getInstance(conf, "topsal");
		job.setJarByClass(PartitionerExample.class);
		FileInputFormat.setInputPaths(job, new Path(
				"/home/hduser/Desktop/input.txt"));
		FileOutputFormat.setOutputPath(job,
				new Path("/home/hduser/Desktop/out"));
		job.setMapperClass(MapClass.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		// Use the custom partitioner, with one reduce task per partition
		job.setPartitionerClass(CaderPartitioner.class);
		job.setNumReduceTasks(2);
		job.setReducerClass(ReduceClass.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new PartitionerExample(), args);
		System.exit(res);
	}
}
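To predict what each output file should contain before running the job, the map, partition and sort steps can be simulated locally. This is a sketch under assumptions: PartitionSimulation is a hypothetical class that copies the CaderPartitioner rule (1 to 5 goes to partition 0, everything else to partition 1), uses a TreeMap per partition to mimic the key sort each reducer sees, and keeps the largest number per name, as a max-style reducer would. It needs no Hadoop jars.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

public class PartitionSimulation {
    // Same rule as CaderPartitioner: values 1..5 go to partition 0, everything else to 1.
    static int partitionFor(int value, int numReduceTasks) {
        if (numReduceTasks < 2) return 0;
        return (value >= 1 && value <= 5) ? 0 : 1;
    }

    // Groups "name number" lines by partition; each TreeMap keeps names sorted,
    // and merge(..., Integer::max) keeps the largest number per name.
    static List<TreeMap<String, Integer>> simulate(List<String> lines, int numReduceTasks) {
        List<TreeMap<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) partitions.add(new TreeMap<>());
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length < 2) continue; // skip blank or malformed lines
            int value = Integer.parseInt(fields[1]);
            partitions.get(partitionFor(value, numReduceTasks)).merge(fields[0], value, Integer::max);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("aman 1", "ashima 2", "kaushik 3", "sood 4", "tony 5",
                "stark 6", "bruce 7", "wayne 8", "james 9", "bond 10", "mark 11",
                "zuckerberg 12", "saun 13", "parker 14");
        List<TreeMap<String, Integer>> parts = simulate(input, 2);
        for (int i = 0; i < parts.size(); i++) {
            System.out.printf("part-r-%05d%n", i);
            parts.get(i).forEach((k, v) -> System.out.println(k + "\t" + v));
        }
    }
}
```

Note that this rule partitions on the value rather than the key. That is fine here because every name appears once, but in general a partitioner should send all records with the same key to the same reducer.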

Step 2 - Change the directory to /usr/local/hadoop/sbin

$ cd /usr/local/hadoop/sbin

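Step 3 - Start all Hadoop daemons. In Hadoop 2.x, start-all.sh is deprecated and simply runs start-dfs.sh and start-yarn.sh, so you can also run those two scripts directly.

$ ./start-all.sh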

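Step 4 - Create the input.txt file. In my case, I have stored input.txt in the /home/hduser/Desktop/ directory. The program passes this path to Hadoop without a file-system scheme, so it is resolved against the default file system: in standalone mode that is the local disk, but in pseudo-distributed or fully distributed mode it is HDFS, and the file must be copied into HDFS at the same path (for example with hdfs dfs -put).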

input.txt

Step 5 - Add the following lines to input.txt, with each name and number separated by a space.

aman 1
ashima 2
kaushik 3
sood 4
tony 5
stark 6
bruce 7
wayne 8
james 9
bond 10
mark 11
zuckerberg 12
saun 13
parker 14

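Instead of typing the lines by hand, the file can also be generated. WriteSampleInput below is a hypothetical helper; it writes the same fourteen "name number" lines, separated by a single space, and assumes the target path is either given as the first argument or is the /home/hduser/Desktop/input.txt path used in this post.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class WriteSampleInput {
    // Names from Step 5, in order; the number on each line is its position (1-based).
    static final String[] NAMES = {"aman", "ashima", "kaushik", "sood", "tony", "stark", "bruce",
            "wayne", "james", "bond", "mark", "zuckerberg", "saun", "parker"};

    // Builds the lines "name number", e.g. "aman 1".
    static List<String> sampleLines() {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < NAMES.length; i++) {
            lines.add(NAMES[i] + " " + (i + 1));
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        Path target = Paths.get(args.length > 0 ? args[0] : "/home/hduser/Desktop/input.txt");
        Files.write(target, sampleLines(), StandardCharsets.UTF_8);
        System.out.println("wrote " + target);
    }
}
```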
Step 6 - Run the PartitionerExample program by submitting your Java project's jar file to Hadoop. Creating the jar file is left to you.

$ hadoop jar /path/partitioner.jar PartitionerExample
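One way to create the jar without an IDE is to package the compiled classes with java.util.jar. This is a sketch, not the author's method: BuildJar is hypothetical, and it assumes the compiled classes sit in a directory such as bin. It packs every .class file, which matters because the static nested classes compile to separate files such as PartitionerExample$MapClass.class. No Main-Class attribute is set, because the command above passes the class name explicitly.

```java
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BuildJar {
    // Packs every .class file under classesDir into jarFile, keeping package paths.
    static void buildJar(Path classesDir, Path jarFile) throws IOException {
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        List<Path> classFiles;
        try (Stream<Path> walk = Files.walk(classesDir)) {
            classFiles = walk.filter(p -> p.toString().endsWith(".class"))
                             .sorted()
                             .collect(Collectors.toList());
        }
        try (OutputStream out = Files.newOutputStream(jarFile);
             JarOutputStream jar = new JarOutputStream(out, manifest)) {
            for (Path file : classFiles) {
                // Jar entry names always use '/', whatever the OS separator is.
                String name = classesDir.relativize(file).toString().replace(File.separatorChar, '/');
                jar.putNextEntry(new JarEntry(name));
                Files.copy(file, jar);
                jar.closeEntry();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical defaults: compiled classes in "bin", output "partitioner.jar".
        Path classes = Paths.get(args.length > 0 ? args[0] : "bin");
        Path jar = Paths.get(args.length > 1 ? args[1] : "partitioner.jar");
        buildJar(classes, jar);
        System.out.println("wrote " + jar);
    }
}
```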

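Step 7 - Now you can see the output files, one per reducer. In my case, the output path is /home/hduser/Desktop/out, as set in the run() method of the program. The output directory must not exist before the job runs, so delete it before running the job again. Names with the numbers 1 to 5 end up in part-r-00000 and all the others in part-r-00001, sorted by name within each file.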

part-r-00000

aman	1
ashima	2
kaushik	3
sood	4
tony	5

part-r-00001

bond	10
bruce	7
james	9
mark	11
parker	14
saun	13
stark	6
wayne	8
zuckerberg	12
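A quick way to check the result is to read each part file and confirm that every record sits in the partition the rule predicts. CheckPartitions is a hypothetical helper that assumes the out directory is on the local disk (standalone mode) or has been copied there from HDFS, for example with hdfs dfs -get. It relies on TextOutputFormat's default tab between key and value.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class CheckPartitions {
    // Expected partition under the rule used by CaderPartitioner.
    static int expectedPartition(int value) {
        return (value >= 1 && value <= 5) ? 0 : 1;
    }

    // Returns every line (prefixed with its file name) that is in the wrong part file.
    static List<String> misplaced(Path outDir, int numReduceTasks) throws IOException {
        List<String> problems = new ArrayList<>();
        for (int p = 0; p < numReduceTasks; p++) {
            Path part = outDir.resolve(String.format("part-r-%05d", p));
            for (String line : Files.readAllLines(part, StandardCharsets.UTF_8)) {
                if (line.isEmpty()) continue;
                String[] kv = line.split("\t"); // key and value are tab-separated
                if (kv.length < 2) {
                    problems.add(part.getFileName() + ": malformed " + line);
                    continue;
                }
                int value = Integer.parseInt(kv[1].trim());
                if (expectedPartition(value) != p) {
                    problems.add(part.getFileName() + ": " + line);
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        Path out = Paths.get(args.length > 0 ? args[0] : "/home/hduser/Desktop/out");
        List<String> problems = misplaced(out, 2);
        System.out.println(problems.isEmpty()
                ? "all records are in the expected partition"
                : "misplaced: " + problems);
    }
}
```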

Step 8 - Don't forget to stop the Hadoop daemons.

$ stop-all.sh
