MapReduce and Hadoop

A short introduction to the basic ideas.

1 Cloud Computing

1.1 Everyone Agrees

1.2 Driving Factors

1.3 Death of Moore's Law

Figure: CPU clock frequency over time.

1.4 Google, Goose Creek, SC

Google Data Center in Goose Creek, SC. [5]

1.5 Governor Sanford

Photo op. [6]

1.6 MapReduce
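
MapReduce is Google's programming model for processing very large data sets on clusters of commodity machines; Hadoop [8] is its open-source implementation. The model takes its two core operations, map and reduce, from functional programming.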

2 Functional Programming

2.1 Functions

d = function (x) {return 2*x;};

t = function (x) {return 3*x;};

f = function (a,b) {
    return d(a) + t(b);
};
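
For example, f combines the two functions above (a minimal check; the value follows directly from the definitions):

f(2, 3);  // d(2) + t(3) = 4 + 9 = 13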


2.2 By Value Only

a = "hello";

reverse = function(x) {
  var result = "";
  for (var i = 0; i < x.length; i++) {
    result = x[i] + result;
  }
  return result;
};

reverse(a);
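
The call returns a new string while the argument is untouched, since the function only works on a copy of the value:

reverse(a);  // "olleh"
a;           // still "hello"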

2.3 Map

t = function(x) {return 3*x;};

[1,2,3,4,5].map(t);
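
map applies t to each element and collects the results in a new array: [3, 6, 9, 12, 15].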

2.4 Reduce

add = function(a, b){ return a + b; };

[0, 1, 2, 3].reduce(add);
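
reduce folds the array from the left, so this computes ((0 + 1) + 2) + 3 = 6.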

2.5 Map then Reduce

count = function(x) {return x.length;};

add = function(x,y) {return x + y;};

l = ["The", "status", "is", "not", "quo"];

l.map(count).reduce(add);
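
map turns the words into their lengths, [3, 6, 2, 3, 3], and reduce then adds them, giving 17: the total number of characters.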

2.6 On a Large Scale
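
Because map and reduce take pure functions and fix the data-flow pattern, a runtime is free to partition the data and run those functions on many machines at once. That observation is the heart of MapReduce.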

3 Hadoop

3.1 Map Function
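
In the MapReduce model, map takes an input key-value pair and emits a list of intermediate key-value pairs:

map (k1, v1) -> list(k2, v2)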

3.2 Reduce Function
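
reduce receives an intermediate key along with all the values emitted for that key and merges them into a (typically smaller) list of values:

reduce (k2, list(v2)) -> list(v2)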

3.3 Process

  1. The programmer defines the map and reduce functions.
  2. The map input is a very large file; Hadoop splits it and distributes the pieces among machines.
  3. Each machine runs the map function on its own section of the input.
  4. The output keys and their values are accumulated and sorted.
  5. Reduce is applied to all the values of each output key.
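
Sections 3.5 and 3.6 below show this process for a concrete job: counting word occurrences.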

3.4 Parallelism
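
Map invocations are independent of one another, so they can run in parallel across the cluster; reduce invocations for different output keys can likewise run in parallel once the map output for their keys is available.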

3.5 Example: Count Word Occurrences

// input_key: document name
// input_value: document contents
map(String input_key, String input_value):
  for each word w in input_value:
    EmitIntermediate(w, "1");

// output_key: a word
// output_values: a list of counts
reduce(String output_key, Iterator intermediate_values):
  int result = 0;
  for each v in intermediate_values:
    result += ParseInt(v);
  Emit(AsString(result));

3.6 WordCount in Java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. */
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * This is an example Hadoop Map/Reduce application.
 * It reads the text input files, breaks each line into words
 * and counts them. The output is a locally sorted list of words and the 
 * count of how often they occurred.
 *
 * To run: bin/hadoop jar build/hadoop-examples.jar wordcount
 *            [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i> 
 */
public class WordCount extends Configured implements Tool {
  
  /**
   * Counts the words in each line.
   * For each line of input, break the line into words and emit them as
   * (<b>word</b>, <b>1</b>).
   */
  public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    public void map(LongWritable key, Text value, 
                    OutputCollector<Text, IntWritable> output, 
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }
  
  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
    
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, 
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }
  
  static int printUsage() {
    System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }
  
  /**
   * The main driver for word count map/reduce program.
   * Invoke this method to submit the map/reduce job.
   * @throws IOException When there are communication problems with the
   *                     job tracker.
   */
  public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.setJobName("wordcount");
 
    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);
    
    conf.setMapperClass(MapClass.class);        
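    // Reuse the reducer as a combiner: each map's output is partially
    // summed on the local machine before it is sent across the network.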
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    
    List<String> other_args = new ArrayList<String>();
    for(int i=0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from " +
                           args[i-1]);
        return printUsage();
      }
    }
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
                         other_args.size() + " instead of 2.");
      return printUsage();
    }
    conf.setInputPath(new Path(other_args.get(0)));
    conf.setOutputPath(new Path(other_args.get(1)));
        
    JobClient.runJob(conf);
    return 0;
  }
  
  
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new WordCount(), args);
    System.exit(res);
  }

}

3.7 Language Support
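
The native API is Java, as above, but Hadoop Streaming lets any executable that reads standard input and writes standard output act as the mapper or reducer, and Hadoop Pipes provides a C++ interface.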

3.8 Hadoop Behind the Scenes

  1. Hadoop tries to schedule each map task on the machine that already holds its section of the data.
  2. If a machine dies, its tasks are automatically re-run on other machines.
  3. If a particular key-value pair repeatedly causes a crash, it is skipped.
  4. If a map task is slow, several speculative copies are started; the first to finish wins.
  5. A combiner (mini-reduce) can run on the same machine as a map.

3.9 Hadoop Conclusions

URLs

  1. Craig Mundie, http://www.technologyreview.com/computing/21422/?a=f
  2. RoundTable, http://www.technologyreview.com/computing/21422/?a=f
  3. Bezos, http://cnettv.cnet.com/2001-1_53-24640.html
  4. Blue Cloud, http://www.youtube.com/watch?v=zfLVvk7CjY4
  5. http://www.flickr.com/photos/29456235@N04/sets/72157607883586121/
  6. http://thedigitel.com/news/searching-times-1783-1008
  7. *, http://www.cs.stanford.edu/people/ang/papers/nips06-mapreducemulticore.pdf
  8. Hadoop, http://hadoop.apache.org

This talk available at http://jmvidal.cse.sc.edu/talks/mapreduce/
Copyright © 2009 José M. Vidal . All rights reserved.

27 October 2008, 03:03PM