Hadoop key points for beginners
Key Concepts
- Every mapper communicates with all reducers (potentially sending data to all of them).
- Shuffle - the communication from mappers to reducers.
- Block Size - files are split into blocks (64MB / 128MB ...); any file size works, it simply gets split into blocks.
- Partitioner - splits the map output among reducers by hash, so the same key always reaches the same reducer (see the sketch below).
- If you want your result to reside in a single file you need a single reducer.
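For intuition, here is a minimal sketch of what hash partitioning looks like. The class name is illustrative, and this only mirrors the behaviour of Hadoop's default HashPartitioner, it is not required for a normal job:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative partitioner: same key -> same hash -> same reducer.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Mask the sign bit so the modulo result is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

You would only register something like this with job.setPartitionerClass(WordPartitioner.class) if you need custom routing; otherwise the default hash partitioning already gives you "same key, same reducer".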
Hadoop default configuration
/etc/hadoop/conf
Job management
- The master sends the actual jar to be executed to the data nodes.
- Hadoop sends data from mappers to reducers even before the mappers have finished, so the reducers can already start working.
- hadoop jar is the command that runs your jar (you should have already uploaded your input to the Hadoop cluster with -put).
- Show the list of running jobs:
mapred job -list
Hadoop Map Reduce
- FileInputFormat takes a file and, if it consists of multiple blocks, splits it to multiple mappers.
- You should have a main class (not the mapper and not the reducer): it receives the command line parameters and uses the FileInputFormat to split the file (which usually has multiple blocks) to multiple mappers. The FileInputFormat uses the input path; similarly there is a FileOutputFormat for the output path.
- When referring to Strings, for example in the output, you refer to the Hadoop wrapper type: job.setOutputKeyClass(Text.class).
- You can give the input path multiple * wildcards for all files in a directory (or all directories), or just use the Job API to add multiple paths.
- You have access to the main Job object, which handles the job, from your main.
- The key object brought to the mapper is the offset in the file; the value holds the actual content.
- In the mapper you use context.write in order to write the result (context is a parameter to the mapper).
- If you write a string as mapper output you write it with new Text(somestr).
- A mapper is a pure function: it gets an input key, value and emits a key, value (or multiple keys and values). Since it should stay as pure as possible it is not intended to hold state, meaning it will not combine results internally (see combiners if you need that).
- Reducers receive a key -> values; reduce gets called multiple times, with sorted keys.
Example Hadoop Map Reduce
You need to have 3 files.
- Job manager.
- Mapper.
- Reducer.
Let's see each of them in an example.
1. JobManager
/**
 * Set up the job.
 */
public static void main(String[] args) throws Exception {
    // inputs
    if (args.length != 2) {
        System.out.printf("Usage: YourJob <input dir> <output dir>\n");
        System.exit(-1);
    }
    // Set job inputs and outputs.
    Job job = new Job();
    job.setJarByClass(YourJob.class);
    job.setJobName("Your Job Length");
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(YourMapper.class);
    job.setReducerClass(YourReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class); // must match the reducer's output value type
    // Run the job and exit with its status.
    boolean success = job.waitForCompletion(true);
    System.exit(success ? 0 : 1);
}
2. YourMapper
public class YourMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final String line = value.toString();
        for (String word : line.split("\\W+")) {
            context.write(new Text(word), new IntWritable(1)); // write mapper output: (word, 1)
        }
    }
}
3. YourReducer
public class YourReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Note you get a key and then a list of all values which were emitted for this key.
     * The keys which are handed to reducers are sorted.
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get(); // sum the partial counts (works with or without a combiner)
        }
        context.write(key, new IntWritable(count)); // we use context to write output to file.
    }
}
Tool Runner
ToolRunner formalizes the command line arguments passed to Hadoop jobs so you don't need to parse them yourself; they all follow the same pattern.
hadoop jar somejar.jar YourMain -D mapred.reduce.tasks=2 someinputdir someoutputdir
State VS Stateless
Map and reduce are inherently stateless (pure functions), which is very nice, and you should stick to that as much as possible. However, if you need state persisted (but really, try to find other ways before reverting to state) you will use the public void setup(Context context) method; it is a standard setup method. Likewise you have a cleanup method which is called after map/reduce finishes. It's quite common, if you used some state, to write that state out to disk in cleanup.
Note: in order to use ToolRunner your job main class should extends Configured implements Tool. Then, instead of doing the work in the main method, you override the run method defined by the Tool interface.
public class YourJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // Build and submit the Job here, as in the plain main method above.
        return 0;
    }
    // ...
}
You still need the main method, as you need to explicitly invoke the run method:
public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new YourJob(), args);
    System.exit(exitCode);
}
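Putting the pieces together, a minimal sketch of a complete Tool-based driver might look like the following. It assumes the YourMapper / YourReducer classes from the example above, the usual org.apache.hadoop.* imports, and the newer Job.getInstance factory:

public class YourJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() already contains anything passed with -D, -files, -fs or -jt.
        Job job = Job.getInstance(getConf(), "Your Job");
        job.setJarByClass(YourJob.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(YourMapper.class);
        job.setReducerClass(YourReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new YourJob(), args));
    }
}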
Combiner - the local mapper mini-reducer
In word count the mapper emits a 1 for every word, which is kind of silly. In order to reduce the noise and the communication between mappers and reducers you can use a combiner to combine the mapper results: job.setCombinerClass(MiniReducer.class). It can actually be your standard reducer.
mapper: (offset, house) --> (house, 1)
mapper: (offset, house) --> (house, 1)
combiner: (house, 1), (house, 1) --> (house, 2)
reducer: (house, 2), ... --> (house, 5)
Note: the combiner runs locally in the mapper.
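Since word counting is just addition (associative and commutative), the summing reducer from the example above is safe to reuse as the combiner. A sketch of the extra driver line, assuming that job setup:

// In the driver, next to setMapperClass / setReducerClass:
job.setCombinerClass(YourReducer.class); // pre-aggregate (word, 1) pairs on the mapper side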
No map reduce, simple HDFS access
You can also access HDFS programmatically, without any relation to map/reduce:
FileSystem fs = FileSystem.get(conf);
Path p = new Path("/somepath/file");
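A minimal, self-contained sketch of reading a file straight from HDFS; the path and class name are illustrative, and the Configuration picks up your cluster settings if /etc/hadoop/conf is on the classpath:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCat {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);      // handle to the default (HDFS) filesystem
        Path p = new Path("/somepath/file");       // illustrative path
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(p)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);          // print each line of the HDFS file
            }
        }
    }
}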
Distributed Cache
A mechanism to distribute files to all mappers so that when they start up they already have the data locally (usually configuration or other side data).
DistributedCache.addCacheFile and friends; see its API for more info. You can also skip this API and just pass -files conf1.conf,conf2.conf,... to ToolRunner, and it will distribute them as part of the hadoop jar task. Note that when the mapper loads such a file it does not need any directory prefix; it simply opens the file by name (the mapper will have it in its current working directory).
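For example, a mapper might read such a side file in its setup method. A sketch, assuming conf1.conf was shipped with -files (file name illustrative, imports java.io.BufferedReader / java.io.FileReader):

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    // A file distributed with -files shows up in the task's working directory.
    try (BufferedReader reader = new BufferedReader(new FileReader("conf1.conf"))) {
        String line;
        while ((line = reader.readLine()) != null) {
            // parse your side data / configuration here
        }
    }
}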
Counters
Mappers and reducers can read and write counters keyed by (group, name) --> numeric value. They should be used for management only, not business logic: any task can read any other task's counters, and updates are not atomic.
context.getCounter("counter-group", "counter-name").increment(1);
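A sketch of the two sides of a counter: a task incrementing it, and the driver reading the total after the job finishes (the group and counter names are illustrative):

// Inside map() or reduce():
context.getCounter("quality", "malformed-records").increment(1);

// In the driver, after job.waitForCompletion(true):
long malformed = job.getCounters()
                    .findCounter("quality", "malformed-records")
                    .getValue();
System.out.println("Malformed records: " + malformed);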
Useful hadoop commands
hadoop fs -rm -r myoutputdir
hadoop fs -cat myoutputdir/part-r-00000 | head
hadoop jar myjob.jar YourJobMain -D someparam=true inputdir outputdir
hadoop jar myjob.jar MyJobMain -fs file:/// -jt local inputdir outputdir (run a local job)
Partitioners
TotalOrderPartitioner lets you partition the data so that a global order is preserved across the different output files (e.g. a-h in file 1, h-t in file 2) while keeping the files of similar size, rather than most of the data ending up in one file (for example if you have many words starting with m and only one starting with b). See the sketch below.
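A hedged sketch of wiring it up in the driver, using org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner and InputSampler. The partition file path and sampling numbers are illustrative, and it assumes the job's input format and key types are already set so the sampler can read real keys:

job.setNumReduceTasks(4);
job.setPartitionerClass(TotalOrderPartitioner.class);
// The partition file tells each reducer which key range it owns.
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path("/tmp/partitions.lst"));
// Sample the input (10% of records, at most 10000 samples from 10 splits) to pick balanced split points.
InputSampler.writePartitionFile(job, new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10));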
CAUTION
- When your map/reduce method receives its Text value parameter call after call (as it obviously does), it may be the same object reference each time with different content inside: Hadoop reuses the parameter object and just rewrites its value. So do not store the value object itself; copy it (value.toString() or new Text(value)) if you need to keep it, as shown in the sketch below.
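A short sketch of the pitfall inside a mapper (field names are illustrative, usual java.util imports assumed):

// DON'T: every element ends up pointing at the same reused Text instance.
private final List<Text> broken = new ArrayList<>();

// DO: copy the content before storing it.
private final List<String> ok = new ArrayList<>();

@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    broken.add(value);            // same object; its content changes on the next call
    ok.add(value.toString());     // safe copy (or: new Text(value))
}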
Scripting languages
Hive
- Kind of an SQL for map reduce jobs.
- Can add user defined functions.
Pig
- Kind of a scripting language for map reduce jobs.
Other Components
- Data transfers - Flume, Sqoop
- Workflow management - Oozie, YARN
Impala
- Instead of starting a job, reading data and writing data each time, these are long-living servers on the Hadoop cluster, so it is much faster (not standard map reduce; not even map reduce at all).
- Query language similar to HiveQL.
