Hadoop Distributed Cache example

posted on Nov 20th, 2016

Apache Hadoop

Hadoop is an Apache open-source framework, written in Java, that allows distributed processing of large datasets across clusters of computers using simple programming models.

The Hadoop framework works in an environment that provides distributed storage and computation across clusters of computers. Hadoop is designed to scale up from a single server to thousands of machines, each offering local computation and storage.

Prerequisites

1) A machine with the Ubuntu 14.04 LTS operating system installed.

2) Apache Hadoop 2.6.4 pre-installed (How to install Hadoop on Ubuntu 14.04)

Hadoop Distributed Cache Example

DistributedCache is a facility provided by the MapReduce framework to distribute application-specific, large, read-only files (text, archives, jars, etc.) efficiently to every task that needs them. Applications specify the files to be cached via URLs (hdfs:// or http://) through the job configuration.

The DistributedCache assumes that the files specified via URLs are already present on the FileSystem at the path given by the URL and are accessible by every machine in the cluster. The framework copies the necessary files onto a slave node before any tasks for the job are executed on that node. Its efficiency stems from the fact that the files are copied only once per job, and from the ability to cache archives, which are un-archived on the slaves.
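
At its core the mechanism is one call on the driver side and one lookup on the task side. The fragment below is a minimal sketch, not part of the full example further down; the class name DistributedCacheSketch is purely illustrative, and the HDFS path is the one used later in this post.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DistributedCacheSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "cache sketch");
    // Driver side: register a read-only HDFS file with the job. The framework
    // copies it to each node once and symlinks it into every task's working
    // directory under its base name (patterns.txt here).
    job.addCacheFile(new URI("hdfs://localhost:9000/user/hduser/pat/patterns.txt"));
    // Task side: inside Mapper.setup() or Reducer.setup(), the registered files
    // can be listed with context.getCacheFiles() and opened by base name,
    // e.g. new FileReader("patterns.txt"), as the full example below does.
  }
}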

Step 1 - Add all Hadoop jar files to your Java project. Add the following jars:

/usr/local/hadoop/share/hadoop/common/*
/usr/local/hadoop/share/hadoop/common/lib/*
/usr/local/hadoop/share/hadoop/mapreduce/*
/usr/local/hadoop/share/hadoop/mapreduce/lib/*
/usr/local/hadoop/share/hadoop/yarn/*
/usr/local/hadoop/share/hadoop/yarn/lib/*

DistributedCacheExample.java

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class DistributedCacheExample {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    // setup() runs once per task before any calls to map(): it reads the two
    // wordcount.* configuration flags and, if skip patterns are enabled,
    // parses every file that was registered in the distributed cache.
    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    // The framework symlinks each cached file into the task's working
    // directory under its base name, so the patterns file can be opened
    // with a plain FileReader on the local filesystem.
    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // GenericOptionsParser consumes generic Hadoop options such as
    // -Dwordcount.case.sensitive=true; what remains should be <in> <out>,
    // optionally followed by -skip <skipPatternFile>.
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if (remainingArgs.length != 2 && remainingArgs.length != 4) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(DistributedCacheExample.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Step 2 - Change the directory to /usr/local/hadoop/sbin

$ cd /usr/local/hadoop/sbin

Step 3 - Start all Hadoop daemons.

$ start-all.sh
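
You can verify that the daemons are up with the JDK's jps tool. The exact list depends on your installation; in a pseudo-distributed setup you would typically expect NameNode, DataNode, SecondaryNameNode, ResourceManager and NodeManager.

$ jps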

Step 4 - Create the input files file01.txt and file02.txt. In my case, I have stored these files in the /user/hduser/dcin/ folder in HDFS (the upload commands are shown after the file contents below).

file01.txt

Hello World, Bye World!

file02.txt

Hello Hadoop, Goodbye to hadoop.
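
Assuming the two files were created in the current local directory, they can be copied into HDFS roughly like this (the paths match the ones used in this post):

$ hdfs dfs -mkdir -p /user/hduser/dcin
$ hdfs dfs -put file01.txt file02.txt /user/hduser/dcin/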

Step 5 - Create a patterns.txt file. In my case, I have stored this file in the /user/hduser/pat/ folder in HDFS (the upload commands are shown after the file contents below).

patterns.txt

\.
\,
\!
to
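
This file can be uploaded to HDFS in the same way:

$ hdfs dfs -mkdir -p /user/hduser/pat
$ hdfs dfs -put patterns.txt /user/hduser/pat/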

Step 6 - Run your DistributedCacheExample program by submitting the project jar file to Hadoop. Creating the jar file is left to you; one possible approach is sketched below.
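
For example, assuming DistributedCacheExample.java is in the current directory, you could compile against the Hadoop classpath and package the classes into dc.jar (the jar name used in the command below):

$ mkdir -p classes
$ javac -classpath "$(hadoop classpath)" -d classes DistributedCacheExample.java
$ jar cf dc.jar -C classes .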

$ hadoop jar /path/dc.jar DistributedCacheExample -Dwordcount.case.sensitive=true hdfs://localhost:9000/user/hduser/dcin hdfs://localhost:9000/user/hduser/dcout -skip hdfs://localhost:9000/user/hduser/pat/patterns.txt

Step 7 - Output

[Screenshot: Hadoop Distributed Cache Java Example output]
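
The result can be read back from HDFS with:

$ hdfs dfs -cat /user/hduser/dcout/part-r-00000

Given the two sample input files, the skip patterns above and case sensitivity enabled, the counts should come out roughly as follows (derived by hand from the sample data, not captured from an actual run):

Bye	1
Goodbye	1
Hadoop	1
Hello	2
World	2
hadoop	1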

Step 8 - Don't forget to stop the Hadoop daemons.

$ stop-all.sh

