Monday, June 5, 2017

Chaining Jobs in Hadoop MapReduce

Chaining Jobs in Hadoop MapReduce



There are cases where we need to write more than one MapReduce Job.
Map1--Reduce1--Map2--Reduce2
How do you manage the jobs so that they are executed in order? There are several approaches. Here is a simple one: configure the jobs one after another inside a single driver method, and wait for each job to finish before submitting the next:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver that chains two MapReduce jobs: the output directory of job 1 is
 * used as the input directory of job 2.
 *
 * <p>Usage: {@code ChainJobs [generic options] <Inputdirectory> <Outputlocation>}
 *
 * @author Unmesha SreeVeni U.B
 */
public class ChainJobs extends Configured implements Tool {

    /** Directory that job 1 writes to and job 2 reads from. */
    private static final String OUTPUT_PATH = "intermediate_output";

    /**
     * Runs job 1 and, only if it succeeds, job 2.
     *
     * @param args {@code args[0]} is the input directory of job 1 and
     *             {@code args[1]} is the output directory of job 2
     * @return 0 if both jobs succeed, 1 if either job fails, 2 if the
     *         number of arguments is wrong
     * @throws Exception if configuring or submitting a job fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Validate here rather than in main(): by the time run() is called,
        // ToolRunner has already removed generic options (-D, -conf, ...)
        // from args, so only the two positional arguments remain.
        if (args.length != 2) {
            System.err.println("Enter valid number of arguments <Inputdirectory> <Outputlocation>");
            return 2;
        }

        Configuration conf = getConf();
        // NOTE: Hadoop's file output formats normally refuse to write into an
        // existing directory, so OUTPUT_PATH left over from a previous run
        // has to be removed before re-running.
        Path intermediate = new Path(OUTPUT_PATH);

        /*
         * Job 1: args[0] -> intermediate directory
         */
        Job job = Job.getInstance(conf, "Job1");
        job.setJarByClass(ChainJobs.class);

        job.setMapperClass(MyMapper1.class);
        job.setReducerClass(MyReducer1.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, intermediate);

        // Do not start job 2 on missing or partial input if job 1 failed.
        if (!job.waitForCompletion(true)) {
            return 1;
        }

        /*
         * Job 2: intermediate directory -> args[1]
         */
        Job job2 = Job.getInstance(conf, "Job 2");
        job2.setJarByClass(ChainJobs.class);

        job2.setMapperClass(MyMapper2.class);
        job2.setReducerClass(MyReducer2.class);

        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);

        TextInputFormat.addInputPath(job2, intermediate);
        TextOutputFormat.setOutputPath(job2, new Path(args[1]));

        return job2.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Reads the arguments from the command line, runs both jobs to
     * completion and exits with the status returned by {@link #run}.
     *
     * @param args command-line arguments: optional generic Hadoop options
     *             followed by the input directory and the output location
     * @throws Exception if the tool fails with an exception
     */
    public static void main(String[] args) throws Exception {
        // Propagate the tool's status so that shells and schedulers can
        // detect a failed run.
        System.exit(ToolRunner.run(new Configuration(), new ChainJobs(), args));
    }
}

The above code defines two jobs: job (named "Job1") and job2 (named "Job 2").
private static final String OUTPUT_PATH = "intermediate_output";
The constant OUTPUT_PATH names the directory to which the first job writes its output.
TextInputFormat.addInputPath(job, new Path(args[0]));
TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
So, for the first job, the input is args[0] and the output is new Path(OUTPUT_PATH).

First Job Configuration


 /*
* Job 1: reads its input from args[0] and writes its output to OUTPUT_PATH
*/
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
Job job = new Job(conf, "Job1");
// The driver class is ChainJobs, as in the full listing
job.setJarByClass(ChainJobs.class);

job.setMapperClass(MyMapper1.class);
job.setReducerClass(MyReducer1.class);

// Key/value types of the job's output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextInputFormat.addInputPath(job, new Path(args[0]));
TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

// Blocks until job 1 has finished; true prints progress to the console
job.waitForCompletion(true);


Once the first job has completed, OUTPUT_PATH serves as the input to the second job, and the output of job2 is written to args[1].
TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
TextOutputFormat.setOutputPath(job2, new Path(args[1]));

Second Job Configuration

 /*
* Job 2: reads its input from OUTPUT_PATH and writes its output to args[1]
*/

Job job2 = new Job(
conf, "Job 2");
// The driver class is ChainJobs, as in the full listing
job2.setJarByClass(ChainJobs.class);

job2.setMapperClass(MyMapper2.class);
job2.setReducerClass(MyReducer2.class);

// Key/value types of the job's output
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);

job2.setInputFormatClass(TextInputFormat.class);
job2.setOutputFormatClass(TextOutputFormat.class);

TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH));
TextOutputFormat.setOutputPath(job2, new Path(args[1]));

// The exit status of run() reflects whether job 2 succeeded
return job2.waitForCompletion(true) ? 0 : 1;

Happy Hadooping . . .