
Hadoop ch02 MapReduce notes

MapReduce

We start with a weather dataset. Every record looks like this (shown here with one field per line, and the interesting fields annotated):

0057
332130 # USAF weather station identifier
99999 # WBAN weather station identifier
19500101 # observation date
0300 # observation time
4
+51317 # latitude (degrees x 1000)
+028783 # longitude (degrees x 1000)
FM-12
+0171 # elevation (meters)
99999
V020
320 # wind direction (degrees)
1 # quality code
N
0072
1
00450 # sky ceiling height (meters)
1 # quality code
C
N
010000 # visibility distance (meters)
1 # quality code
N
9
-0128 # air temperature (degrees Celsius x 10)
1 # quality code
-0139 # dew point temperature (degrees Celsius x 10)
1 # quality code
10268 # atmospheric pressure (hectopascals x 10)
1 # quality code

Of course, the listing above has already been expanded for readability; in the raw data each record is a single fixed-width line like these:

0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...

Hmmm….
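Since the records are fixed-width, the interesting columns can be eyeballed straight from the shell. This is just a sanity check of my own (it assumes full-width records such as those in the author's sample file, input/ncdc/sample.txt, which is used later in this note; the elided lines above won't line up): characters 16-19 hold the year, 88-92 the signed temperature, and 93 the quality code.

% cut -c16-19,88-93 input/ncdc/sample.txt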

The raw dataset is organized by weather station and year: one gzipped file per station per year, named with the station identifiers and the year:

010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz

That layout is awkward to work with directly, so the data is consolidated by year, using the following Hadoop Streaming job:

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-D mapred.reduce.tasks=0 \
-D mapred.map.tasks.speculative.execution=false \
-D mapred.task.timeout=12000000 \
-input ncdc_files.txt \
-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
-output output \
-mapper load_ncdc_map.sh \
-file load_ncdc_map.sh

The two files used above are shown below. First ncdc_files.txt, the input list with one archive per year:

s3n://hadoopbook/ncdc/raw/isd-1901.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1902.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1903.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1904.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1905.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1906.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1907.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1908.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1909.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1910.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1911.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1912.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1913.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1914.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1915.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1916.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1917.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1918.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1919.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1920.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1921.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1922.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1923.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1924.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1925.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1926.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1927.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1928.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1929.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1930.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1931.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1932.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1933.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1934.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1935.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1936.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1937.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1938.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1939.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1940.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1941.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1942.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1943.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1944.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1945.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1946.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1947.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1948.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1949.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1950.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1951.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1952.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1953.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1954.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1955.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1956.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1957.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1958.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1959.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1960.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1961.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1962.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1963.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1964.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1965.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1966.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1967.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1968.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1969.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1970.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1971.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1972.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1973.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1974.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1975.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1976.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1977.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1978.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1979.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1980.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1981.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1982.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1983.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1984.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1985.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1986.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1987.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1988.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1989.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1990.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1991.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1992.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1993.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1994.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1995.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1996.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1997.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1998.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1999.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-2000.tar.bz2
And then load_ncdc_map.sh, the mapper script each map task runs to fetch one year's archive, unpack it, and write a consolidated gzipped file back to HDFS:
#!/usr/bin/env bash
# NLineInputFormat gives a single line: key is offset, value is S3 URI
read offset s3file
# Retrieve file from S3 to local disk
echo "reporter:status:Retrieving $s3file" >&2
$HADOOP_INSTALL/bin/hadoop fs -get $s3file .
# Un-bzip and un-tar the local file
target=`basename $s3file .tar.bz2`
mkdir -p $target
echo "reporter:status:Un-tarring $s3file to $target" >&2
tar jxf `basename $s3file` -C $target
# Un-gzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in $target/*/*
do
  gunzip -c $file >> $target.all
  echo "reporter:status:Processed $file" >&2
done
# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz

By the way, these raw files are stored on AWS S3, so to rerun this step you need an AWS account, and to open one of those you first need a credit card that can be billed in US dollars.

Hmmmmm…

That said, the sample data the author provides is perfectly good for our purposes; it is available here.

So the problem is: find the highest temperature recorded in each year. Let's first look at a solution without Hadoop; turns out my shell scripting is not rusty yet:

#!/usr/bin/env bash
for year in all/*
do
  echo -ne `basename $year .gz`"\t"
  gunzip -c $year | \
    awk '{ temp = substr($0, 88, 5) + 0;
           q = substr($0, 93, 1);
           if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
         END { print max }'
done

The result:

% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...

Hey, not bad. But for really big data this serial approach is too slow, so let's go straight to Hadoop.

For the same problem, MapReduce works like this:

(Figure: mapred_pipeline, the MapReduce data flow, with the Hadoop stages on the top row and the equivalent Unix pipeline on the bottom row.)

Note that the top row of the figure is Hadoop terminology, while the bottom row is really just a Unix pipe. That equivalence is what makes it possible to implement the job without Java, as the Hadoop Streaming section below will show.

Alright, let's get coding.

To implement the task we need three Java classes: a mapper and a reducer, which do the heavy lifting, plus a driver that sets up the job and supervises the two of them.

Mapper

// cc MaxTemperatureMapper Mapper for maximum temperature example
// vv MaxTemperatureMapper
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}
// ^^ MaxTemperatureMapper

Reducer

// cc MaxTemperatureReducer Reducer for maximum temperature example
// vv MaxTemperatureReducer
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
// ^^ MaxTemperatureReducer

Job

// cc MaxTemperature Application to find the maximum temperature in the weather dataset
// vv MaxTemperature
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
// ^^ MaxTemperature
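Before running, the three classes have to be compiled against the Hadoop libraries and packaged into a jar. The book ships a prebuilt hadoop-examples.jar, but if you build by hand, a minimal sketch (assuming the three .java files sit in the current directory and the hadoop command is on the PATH) would be:

% javac -classpath $(hadoop classpath) MaxTemperature*.java
% jar cf hadoop-examples.jar MaxTemperature*.class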

Then run it like this:

% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop MaxTemperature input/ncdc/sample.txt output
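With the default standalone (local) configuration the job runs in a single JVM and writes its result to the local output directory; assuming a single reducer, the file to inspect is part-r-00000:

% cat output/part-r-00000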

When the data volume is really large, a lot of intermediate data has to be shipped from the mappers to the reducer, and that is where a combiner helps. The mechanism is as follows. Suppose we have two mappers; one produces this output:

(1950, 0)
(1950, 20)
(1950, 10)

and the other produces this:

(1950, 25)
(1950, 15)

Without a combiner, these values would first be grouped into

(1950, [0, 20, 10, 25, 15])

and handed to the reducer as its input. With a combiner that also applies max to each mapper's output, the computation effectively becomes

max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25

which moves far less data to the reducer. Be careful, though: not every problem decomposes like this. The mean is the classic counterexample:

mean(0, 20, 10, 25, 15) = 14

mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15

So it depends on the operation; you cannot blindly reuse the reduce function as the combiner. It has to be commutative and associative, as max is. (For a mean, the usual workaround is to have the combiner emit partial sums and counts and let the reducer do the final division.)

Back to the combiner: how do we plug it into the Hadoop job? It is just a one-line change in the driver:

// cc MaxTemperatureWithCombiner Application to find the maximum temperature, using a combiner function for efficiency
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// vv MaxTemperatureWithCombiner
public class MaxTemperatureWithCombiner {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperatureWithCombiner <input path> " +
          "<output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperatureWithCombiner.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    /*[*/job.setCombinerClass(MaxTemperatureReducer.class)/*]*/;
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
// ^^ MaxTemperatureWithCombiner

Right, the combiner is the same class as the reducer. Thinking about it, that is natural, because for this problem they implement exactly the same function.

Hadoop Streaming

Coming from machine learning, I sometimes find Java uncomfortable to write; scripting languages like Python or Ruby are just more convenient. Hadoop kindly provides a solution, the technique in this section's title: Hadoop Streaming, which lets any program that reads standard input and writes standard output act as the mapper or reducer. Let's look directly at how to use it.

Ruby

Map

#!/usr/bin/env ruby

STDIN.each_line do |line|
  val = line
  year, temp, q = val[15,4], val[87,5], val[92,1]
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

Reduce

#!/usr/bin/env ruby

last_key, max_val = nil, -1000000
STDIN.each_line do |line|
  key, val = line.split("\t")
  if last_key && last_key != key
    puts "#{last_key}\t#{max_val}"
    last_key, max_val = key, val.to_i
  else
    last_key, max_val = key, [max_val, val.to_i].max
  end
end
puts "#{last_key}\t#{max_val}" if last_key

To run it on Hadoop, invoke Streaming like this:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
-reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

Convenient, isn't it? Adding a combiner is even more convenient; no extra file is needed, because the reduce script can be reused:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files ch02-mr-intro/src/main/ruby/max_temperature_map.rb,\
ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
-input input/ncdc/all \
-output output \
-mapper ch02-mr-intro/src/main/ruby/max_temperature_map.rb \
-combiner ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb \
-reducer ch02-mr-intro/src/main/ruby/max_temperature_reduce.rb

Note that the -files option above is there so that, when the job runs on a cluster, the scripts get copied out to the worker nodes.

Python

Ah, now the mighty Python makes its entrance; it is really no different from the Ruby version. (This is the book's Python 2 code: note the print statement and sys.maxint.)

Map

#!/usr/bin/env python

import re
import sys

for line in sys.stdin:
  val = line.strip()
  (year, temp, q) = (val[15:19], val[87:92], val[92:93])
  if (temp != "+9999" and re.match("[01459]", q)):
    print "%s\t%s" % (year, temp)

Reduce

#!/usr/bin/env python

import sys

(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
  (key, val) = line.strip().split("\t")
  if last_key and last_key != key:
    print "%s\t%s" % (last_key, max_val)
    (last_key, max_val) = (key, int(val))
  else:
    (last_key, max_val) = (key, max(max_val, int(val)))

if last_key:
  print "%s\t%s" % (last_key, max_val)

Running it works exactly the same way as with Ruby, just with the Python scripts swapped in, so there is not much more to say.
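For completeness, a sketch of the invocation; the ch02-mr-intro/src/main/python/ paths are my assumption (mirroring the Ruby layout), so adjust them to wherever the scripts actually live:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -input input/ncdc/sample.txt \
  -output output \
  -mapper ch02-mr-intro/src/main/python/max_temperature_map.py \
  -reducer ch02-mr-intro/src/main/python/max_temperature_reduce.py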