Hadoop key points for beginners
Key Concepts
- Every mapper communicates with all reducers (potentially sending data to all of them).
Shuffle
- The communication from mappers to reducers.

Block Size
- Files are split into blocks; you can use any file size, it will simply get split into blocks (64MB / 128MB ...).

Partitioner
- Splits the map output among the reducers by hashing the key, so the same key always reaches the same reducer (a minimal sketch of such a partitioner follows below).
- If you want your result to reside in a single output file (not multiple part files) you need a single reducer.
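For illustration, here is a minimal sketch of a partitioner that does what the default hash partitioning already does (the class name is made up; Hadoop's own HashPartitioner behaves like this, so you only write your own when you need different routing):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: route each key by its hash so the same key
// always lands on the same reducer.
public class KeyHashPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Mask the sign bit so the partition index is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

In the driver you would register it with job.setPartitionerClass(KeyHashPartitioner.class).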
Hadoop default configuration
/etc/hadoop/conf
Jobs management
- The master sends the actual jar to be executed to the data nodes.
- Hadoop starts sending data from mappers to reducers even before the mappers have finished, so that the reducers can already start working.
- hadoop jar is the command that runs your jar (your input data should already have been uploaded to the Hadoop cluster with -put).
- Show list of running jobs
mapred job -list
Hadoop Map Reduce
- FileInputFormat takes a file and, if it has multiple blocks, splits it across multiple mappers.
- You should have a main class (which is neither your mapper nor your reducer): it receives the command line parameters and uses FileInputFormat to split the file (which usually has multiple blocks) across multiple mappers. FileInputFormat uses the input path; similarly we have FileOutputFormat.
- When referring to strings, for example in the output, you refer to them as Text, e.g. job.setOutputKeyClass(Text.class).
- You can give the input path multiple * wildcards to match all files in a directory or all directories, or just use the Job API to add multiple paths (see the sketch after this list).
- You have access to the main Job object, which drives the job from your main.
- The key object handed to the mapper is the byte offset into the file (the value holds the actual line).
- In the mapper you use context.write in order to write the result (context is a parameter to the mapper).
- If you write a string as output from the mapper you write it with new Text(somestr).
- A mapper is a pure function: it gets an input key, value and emits a key, value (or multiple keys and values). Since it should stay as pure as possible it is not intended to keep state, meaning it will not combine mapping results internally (see combiners if you need that).
- Reducers receive a key -> values mapping; the reducer gets called multiple times, with sorted keys.
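As a small sketch of the multiple-paths point above (the directory names are hypothetical), you can mix globs and repeated calls:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Hypothetical helper: wire several inputs into an existing Job.
public class InputPathsExample {
    static void addInputs(Job job) throws IOException {
        // A glob that matches every file under a directory.
        FileInputFormat.addInputPath(job, new Path("/data/logs/2015/*"));
        // An additional explicit directory; addInputPath can be called repeatedly.
        FileInputFormat.addInputPath(job, new Path("/data/extra"));
    }
}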
Example Hadoop Map Reduce
You need to have 3 files.
- Job manager.
- Mapper.
- Reducer.
Let's see each of them in an example.
1. JobManager
/**
* Setup the job.
*/
public static void main(String[] args) throws Exception {
// inputs
if (args.length != 2) {
System.out.printf("Usage: YourJob <input dir> <output dir>\n");
System.exit(-1);
}
// Set job inputs outputs.
Job job = new Job();
job.setJarByClass(YourJob.class);
job.setJobName("Your Job Length");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(YourMapper.class);
job.setReducerClass(YourReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// exit..
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
2. YourMapper
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class YourMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        final String line = value.toString();
        for (String word : line.split("\\W+")) {
            context.write(new Text(word), new IntWritable(1)); // write mapper output: (word, 1)
        }
    }
}
3. YourReducer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class YourReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Note you get a key and then a list of all the values which were emitted for this key.
     * The keys handed to reducers are sorted.
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count++;
        }
        context.write(key, new IntWritable(count)); // we use context to write the output to the file
    }
}
Tool Runner
Formalizes the command line arguments of Hadoop jobs so you don't need to parse them yourself; they all follow the same pattern.
hadoop jar somejar.jar YourMain -D mapred.reduce.tasks=2 someinputdir someoutputdir
State VS Stateless
Map and reduce are inherently stateless (pure functions), which is very nice, and you should stick to that as much as possible. However, if you do need state (but really, try to find other ways before resorting to state) you can use the public void setup(Context context) method, which is a standard setup hook. Likewise you have a cleanup method which is called after map/reduce finishes. It's quite common that in cleanup, if you used some state, you write that state out to disk. A small sketch of setup/cleanup follows.
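As a sketch (this class is made up for illustration, not part of the word count example), a mapper that keeps a little state and flushes it in cleanup might look like this:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical example: count how many lines this mapper saw and emit the
// total once, from cleanup(). Prefer staying stateless when you can.
public class LineCountingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private int linesSeen;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        // Runs once before the first map() call; read configuration / side data here.
        linesSeen = 0;
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        linesSeen++; // the state that makes this mapper non-pure
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        // Runs once after the last map() call; write the accumulated state out.
        context.write(new Text("total-lines"), new IntWritable(linesSeen));
    }
}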
Note: in order to use ToolRunner your job's main class should extends Configured implements Tool. Then, instead of putting everything in main, you override the run method defined by the Tool interface.
public class YourJob extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // Set up and run the job here.
        return 0;
    }
    .
    .
}
You still need the main method, because you need to explicitly invoke the run method:
public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new YourJob(), args);
    System.exit(exitCode);
}
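Putting the pieces together, a complete ToolRunner-style driver might look like the sketch below (it reuses YourMapper and YourReducer from the example above; the usage string and job name are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class YourJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // ToolRunner has already consumed -D / -files options into getConf(),
        // so args holds only what is left: the input and output dirs.
        if (args.length != 2) {
            System.err.println("Usage: YourJob <input dir> <output dir>");
            return -1;
        }
        Job job = Job.getInstance(getConf(), "Your Job");
        job.setJarByClass(YourJob.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(YourMapper.class);
        job.setReducerClass(YourReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new YourJob(), args));
    }
}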
Combiner, the local mapper mini-reducer
In wordcount the mapper emitted a 1 for every word, which is kind of silly. In order to reduce the noise and communication between mappers and reducers you can use a combiner to combine the mapper results: job.setCombinerClass(MiniReducer.class) (it can actually be your standard reducer).
mapper:   map(pointer,house) --> map(house,1)
mapper:   map(pointer,house) --> map(house,1)
combiner: map(house,1), map(house,1) --> map(house,2)
reducer:  map(house,2), map(house,3), ... --> map(house,5)
Note: the combiner runs locally on the mapper side; the reducer then sums the already-combined counts from all mappers.
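In the word count example this is a single extra driver line; reusing the reducer as the combiner is safe here only because summing counts is commutative and associative (a sketch, not from the original driver):

// In the driver, after setReducerClass: reuse the word-count reducer as a combiner.
job.setCombinerClass(YourReducer.class);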
No map reduce, simple HDFS access
You can also access HDFS programmatically without any relation to map/reduce:
FileSystem fs = FileSystem.get(conf);
Path p = new Path("/somepath/file");
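For example, a minimal sketch of reading a text file straight from HDFS (the path is hypothetical):

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsRead {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up the cluster config, e.g. /etc/hadoop/conf
        FileSystem fs = FileSystem.get(conf);       // the filesystem the config points at (HDFS)
        Path p = new Path("/somepath/file");        // hypothetical file
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(p)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}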
Distributed Cache
A way to distribute files to all mappers so that when they start up they already have the data locally (usually configuration or other side data):
DistributedCache.addCacheFile..
and more commands, see its API for more info. You can skip using this API and just pass -files conf1.conf conf2.conf ... to ToolRunner and it will distribute the files as part of the hadoop jar task. Note that when the mapper loads such a file it does not need any directory prefix; it simply loads the file by name (the mapper will have it in its current working directory).
Counters
Mappers and reducers can read and write counters of the form (group, name) --> int-value. They should be used only for management, not business logic (each task can read counters written by any other, and they are not atomic).
context.getCounter("counter-group","counter-name").increment(1);
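After the job completes, the driver can read a counter back, roughly like this (a sketch: job is assumed to be the driver's Job object after waitForCompletion, and the group/name match the increment call above):

// Driver side, after job.waitForCompletion(true):
long value = job.getCounters()
                .findCounter("counter-group", "counter-name")
                .getValue();
System.out.println("counter-name = " + value);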
Useful hadoop commands
hadoop fs -rm -r myoutputdir
hadoop fs -cat myoutputdir/part-r-00000 | head
hadoop jar myjob.jar YourJobMain -D someparam=true inputdir outputdir
hadoop jar myjob.jar MyJobMain -fs=file:/// -jt=local inputdir outputdir
(run local job)
Partitioners
TotalOrderPartitioner lets you partition the data and still preserve a global order across the different output files (like a-h in file 1, h-t in partition 2), while keeping the files at a similar size rather than ending up with most of the data in one file (for example if you have many words starting with m and only one starting with b).
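A rough sketch of wiring it up (assumptions: the job's input keys are Text, and the partition file location and sampler numbers are arbitrary placeholders):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

// Hypothetical helper: sample the input keys and hand TotalOrderPartitioner a
// partition file so each reducer gets a key range of roughly equal size.
public class TotalOrderSetup {
    static void configure(Job job) throws Exception {
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                new Path("/tmp/partitions.lst"));                         // placeholder path
        InputSampler.Sampler<Text, NullWritable> sampler =
                new InputSampler.RandomSampler<Text, NullWritable>(0.1, 10000, 10); // sample ~10% of keys
        InputSampler.writePartitionFile(job, sampler);
    }
}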
CAUTION
- When your reduce (or map) method receives its Text value parameter many times (as it obviously does), it may be the same object reference each time while the value inside changes: Hadoop reuses the parameter object and just overwrites its contents. So do not store the value object itself, it may mean something different later on (see the sketch below).
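A sketch of the pitfall inside a reducer (the helper class is made up for illustration):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;

// Hadoop reuses the same writable object for every element of the values
// iterable, so keep copies of the contents, never the reference itself.
public class CopyValuesExample {
    static void collect(Iterable<IntWritable> values) {
        List<IntWritable> wrong = new ArrayList<>();
        List<Integer> right = new ArrayList<>();
        for (IntWritable value : values) {
            wrong.add(value);       // BUG: every entry ends up pointing at the same reused object
            right.add(value.get()); // OK: copy the primitive out (or use new IntWritable(value.get()))
        }
    }
}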
Scripting languages
Hive
- Kind of an SQL for map reduce jobs.
- Can add user defined functions.
Pig
- Kind of a scripting language for map reduce jobs.
Other Components
- Data transfers - Flume, Sqoop
- Workflow management - Oozie, YARN
Impala
- Instead of the start job / read data / write data cycle, these are long-living servers on the Hadoop cluster, so it's much faster (not standard MapReduce, in fact not MapReduce at all).
- Query language similar to HiveQL.