Python使用Hadoop集群

$MapReduce$ 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 $Map$ 函数将数据处理成 $key/value$ 格式的数据集合;然后再创建一个 $Reduce$ 函数用来合并所有的具有相同 $key$ 值的 $value$ 值。

编写MapReduce函数

以下的实例是 $Google$ 在 $MapReduce$ 的论文中提到的一个计算单词个数的例子。使用 $Python$ 编写 $MapReduce$ 函数与通常编写程序的区别是我们需要利用 $Hadoop$ 流的 $API$,通过标准输入($sys.stdin$)、标准输出($sys.stdout$)在 $Map$ 函数和 $Reduce$ 函数之间传递数据,其余的事情 $Hadoop$ 流将会帮助我们完成。

Mapper部分

1
2
3
4
5
6
7
#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip()
words = line.split()
for word in words:
print "%s\t%s" % (word, 1)

文件从标准输入 sys.stdin 读取文件后把单词切开,并把单词和词频输出标准输出。$mapper$ 不计算单词的总数,而是输出 (word, 1),方便让随后的 $reducer$ 做统计工作。

对文件赋权以使其可以运行

1
chmod +x mapper.py

Reducer部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
line = line.strip()
word, count = line.split('\t', 1)
try:
count = int(count)
except ValueError:
continue
if current_word == word:
current_count += count
else:
if current_word:
print "%s\t%s" % (current_word, current_count)
current_count = count
current_word = word

if word == current_word:
print "%s\t%s" % (current_word, current_count)

按照 $Hadoop$ 的设计理念,$mapper$ 输出的相同的 $key/value$ 数据会存储在同一个 $part$ 上,并且 $mapper$ 的输出会自动作为 $reducer$ 的输入。在这个需求中,$reducer$ 要做的事情只是依次接受集群的标准输入,并统计连续单词的个数,因为在这个例子中,$key$ 代表的就是单词。

同样对文件赋权以使其可以运行

1
chmod +x reducer.py

本地测试

通常在把任务提交给 $Hadoop$ 集群执行的之前,都进行一次本地测试。本地与 $Hadoop$ 环境不同之处在于我们需要手动对 $mapper$ 的输出按照 $key$ 来进行排序,模拟 $Hadoop$ 集群场景。

1
cat data | mapper | sort | reducer > output

实例:

1
2
cat news.merge.0.json | python mapper.py | sort -k1,1 | python reducer.py > ret
cat file_input | python mapper.py > ret

第一行是对当前统计单词个数的测试命令,可以拓展到含有 $mapper$ 和 $reducer$ 的任务中,第二行是对 $mapper$ 的单独测试,同样也可是应用到只有 $mapper$ 没有 $reducer$ 的任务中。

使用 Hadoop 集群执行任务

集群运行

在把任务提交给集群执行时,我们常常将诸多命令合并到一个脚本文件中,一次性执行所有内容

1
nohup sh -x hadoop.sh &

$hadoop.sh$ 的书写方法如下
对于含有 $mapper$ 和 $reducer$ 的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/bin/bash
HADOOP='/home/work/infra/infra-client/bin/hadoop'

${HADOOP} fs -rmr /user/.../output

${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
-D mapreduce.job.queuename=root.production.cloud_group.qabot \
-input /user/..../input-data \
-output /user/.../output/\
-mapper 'python mapper.py' \
-reducer 'python reducer.py' \
-file './mapper.py' \
-file './reducer.py'

只有 $mapper$ 没有 $reducer$ 的任务

1
2
3
4
5
6
7
8
9
10
11
12
#!/bin/bash
HADOOP=/home/work/infra/infra-client/bin/hadoop

${HADOOP} fs -rmr /user/.../output

${HADOOP} jar /home/work/infra/infra-client/bin/current/common-infra_client-pack/bin/current/c3prc-hadoop-hadoop-pack/share/hadoop/tools/lib/hadoop-streaming-2.6.0-mdh3.11-jre8-SNAPSHOT.jar \
-D mapreduce.job.queuename=root.production.cloud_group.qabot \
-input /user/..../input-data \
-output /user/.../output/\
-mapper 'python mapper.py' \
-reducer 'cat' \
-file './mapper.py' \

$nohup$ 中会实时更新集群对任务的处理进度,任务执行结束后会在指定的 $output$ 目录下输出一个 $part-00000$ 文件。

其中,-file命令用于上传文件到 $Hadoop$ 集群,若 $mapper$ 或 $reducer$ 中读入了额外的文件,同样需要将该程序上传到 $Hadoop$ 集群上。

常用的 Hadoop Shell 命令

$Hadoop$ 所有的对文件的操作都需要调用文件系统,所以其命令的格式为

1
hadoop fs -shell

详细的命令可以查看这里:Hadoop Shell

get

1
hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>

复制文件到本地文件系统。可用 -ignorecrc 选项复制 CRC 校验失败的文件。使用 -crc 选项复制文件以及 CRC 信息。

1
2
hadoop fs -get /user/hadoop/file localfile
hadoop fs -get hdfs://host:port/user/hadoop/file localfile

put

1
hadoop fs -put <localsrc> ... <dst>

从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入中读取输入写入目标文件系统。

1
2
3
4
hadoop fs -put localfile /user/hadoop/hadoopfile
hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
hadoop fs -put localfile hdfs://host:port/hadoop/hadoopfile
hadoop fs -put - hdfs://host:port/hadoop/hadoopfile

参考文献:
[1] MapReduce: Simplified Data Processing on Large Clusters-Google
[2] 用python写MapReduce函数 以WordCount为例-CNBolgs
[3] Hadoop Shell命令-Apache Hadoop