Hadoop Streaming 实践以及Debug

Hadoop Streaming是一个便于变成Map Reduce进程的工具包,这个工具包可以支持各种可执行/脚本语言来创建Mapper和Reducer,利用Hadoop的优势进行大数据的处理,这些语言仅仅只需要支持*unix的表示输出输入即可(python,c,c++,perl,akw etc.)

Streaming实践

先直接来看一个由python写的Streaming进程,还有那个经典的word count,我们的数据集是一篇英语作文,
看来看他的mapper文档

1
2
3
4
5
6
7
8
9
10
11



import sys,re

re_english = re.compile(u'[^a-zA-Z0-9-]+')

for line in sys.stdin: #这里你可以看做是map类中的line输入
words = re_english.sub(' ',line.strip()) #这里只提取英文数字
for word in words.split():
print '%st%s' % (word, 1) #这儿就是标准的输出,用tab隔开 默认第一个值为key

其实看上面的mapper文档还是挺带感的,和标准的mapper类很类似,这里就不解释了,相信用java写过标准Map-Reduce都应该很熟悉

现在再来看reducer文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
!/usr/bin/env python
#-*- coding=utf8 -*-

import sys

lastk = None #这里标志最后一个k 用于控制同一个key 到一个组中
count = 0

for line in sys.stdin:
w,c = line.split('t')
c = int(c) #不转成int会比较麻烦 这是是计数
if lastk == None: #这里是判断是否过来的是第一个key
lastk=w
count += c
elif lastk == w:
count += c
else:
print "%st%s"%(lastk,count)
lastk=w
count = c #这里重置计数

if lastk is not None:
print "%st%s"%(lastk,count)

陌生感来了把~其实这里是这样的:

都说了是Streaming,他其实是流式进来的,在进来之前还是和标准的mr一样按key进行partation划分到各个桶中,然后每个桶会有若干个key,这里按key分组一次会将记录一条一条的使用*unix的标准输入 读入道sys.stdin中,那么问题了来了,原来mr中的 迭代器的值如何构造?这里主要使用lastk来的变量,每次当输入的key与lastk相等的时候,将当前的值加入到字典或者数组中(因为这个demo是wordcount,所以用累加计数来代替了,第16行),直到key与lastk不等时(第18行),此时的数组或者字典就是原来 值的迭代器里面的东西,和正常的mr一样操作,该输出的输出,完了之后同时得更新lastk以迎接下一组key的到来,同时清空数组或者字典,周而复始,直至全部输入之后,判断我的lastk是否存在值,有的话这个lastk作为最后一组key进行输出(第23行),这样的方式就可以构造出原来的(key,iter[value])模式了

上述的构造看上去些代码可能更加麻烦一点,但是其实这样的方式是应该灵活了

现在mapperreducer两个文档写完了,该如何执行呢?

1
2
3
4
5
6
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar 
-input /yyl/data/words.txt
-output /yyl/test/ouput/streaming
-mapper $HADOOP_HOME/runjar/pyscript/word_count_mapper.py
-reducer $HADOOP_HOME/runjar/pyscript/word_count_reducer.py
-file $HADOOP_HOME/runjar/pyscript/*.py

其实提交的方式很类似原生的jar包提交,只是这里的jar是使用了Hadoop自带的streaming包,敲火车键进行执行

15/11/07 20:46:47 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
15/11/07 20:46:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [/root/program/hadoop-2.7.0/runjar/pyscript/word_count_mapper.py, /root/program/hadoop-2.7.0/runjar/pyscript/word_count_reducer.py, /tmp/hadoop-unjar1825196483906999229/] [] /tmp/streamjob6480432411236657839.jar tmpDir=null
15/11/07 20:46:51 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.56.2:8032
15/11/07 20:46:51 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.56.2:8032
15/11/07 20:46:53 INFO mapred.FileInputFormat: Total input paths to process : 1
15/11/07 20:46:54 INFO mapreduce.JobSubmitter: number of splits:2
15/11/07 20:46:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1446946409340_0001
15/11/07 20:46:55 INFO impl.YarnClientImpl: Submitted application application_1446946409340_0001
15/11/07 20:46:55 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1446946409340_0001/
15/11/07 20:46:55 INFO mapreduce.Job: Running job: job_1446946409340_0001
15/11/07 20:47:12 INFO mapreduce.Job: Job job_1446946409340_0001 running in uber mode : false
15/11/07 20:47:12 INFO mapreduce.Job:  map 0% reduce 0%
15/11/07 20:47:33 INFO mapreduce.Job:  map 100% reduce 0%
15/11/07 20:47:46 INFO mapreduce.Job:  map 100% reduce 100%
15/11/07 20:47:47 INFO mapreduce.Job: Job job_1446946409340_0001 completed successfully
15/11/07 20:47:47 INFO mapreduce.Job: Counters: 49
    File System Counters
----此处和mr一样  省略1w字
    File Input Format Counters
        Bytes Read=2223
    File Output Format Counters
        Bytes Written=1208
15/11/07 20:47:47 INFO streaming.StreamJob: Output directory: /yyl/test/ouput/streaming

然后来查看熟悉的word count结果

[root@master pyscript]# hadoop fs -get /yyl/test/ouput/streaming
15/11/07 20:48:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/07 20:48:19 WARN hdfs.DFSClient: DFSInputStream has been closed already
15/11/07 20:48:19 WARN hdfs.DFSClient: DFSInputStream has been closed already
[root@master pyscript]# cd streaming/
[root@master streaming]# sort -nr -k 2 part-00000 | head -n 10
and    11
to    10
people    10
Micro-blog    8
their    6
a    5
other    4
of    4
more    4
has    4

取计数最高的10个,看到了熟悉的字样,好了,这样一次streaming写执行完了,是不是甚是方便

这里有一个提示,可以再通hadoop版本hadoop-streaming-*.jar的位置不一样,你可以使用find命令进行查找具体的位置

到了这里已经可以基本执行streaming进程了,但是从上面的跑的命令里可以看到有好多配置的样子,还有另外可以发现,在写的mapperreducer中只写了数据的处理逻辑,其他的一些配置参数根本无法写入,那么这些东西都是得在执行的命令里面进行配置的,他可以有的配置参数大致有如下几个

参数名称 可选/必选 描述
-input 必选 输入文档/目录的位置
-output 必选 输出目录
-mapper 必选 mapper的执行文档或者JavaClassName
-reducer 必选 reducer的执行或者JavaClassName
-file 必选 执行的mapper或者reducer文档以及其依赖文档,一定要写,多个可以写多行,他会共享到各个节点上,也可以是jar包
-inputformat 可选 JavaClassName,为自定义的输入格式,默认是TextInputFormat
-outputformat 可选 JavaClassName,为自定义的输出格式,默认是TextOutputformat
-partitioner 可选 JavaClassName,为自定义的分区函数
-combiner 可选 mapper输出之后的合并类,是JavaClassName
-cmdenv 可选 name=value为输入到流命令里面的环境变量
-inputreader 可选 貌似可以代替-inputformat这个东西
-verbose 可选 启用javaverbose输出
-lazyOutput 可选 当输出格式为FileOutputFormat时,可以配置为懒输出-_-
-numReduceTasks 可选 指定的reducer的数目
-mapdebug 可选 指定一个脚本当mapper失败的时候进行调用
-reducedebug 可选 指定一个脚本当reducer失败的时候进行调用
-conf 可选 指定配置文档
-D 可选 property=value 可以配置Hadoop原生的配置项 实用^_^

这么看来,streaming还是很强大以及很灵活的

Streaming调试

从上述的配置中可以看到 可以配置mapdebugreducedebug来追踪streaming中的错误信息来进行调试,除了这种方式,streaming调试还有一种更加方便的方式,
先来看streaming的执行过程mapper->shuffle->reducer,数据以流的方式进行传递的,在Linux中可以配合自带的命令以及官道来完成这一过程,现在可以看模拟刚刚的demo执行

1
[[email protected] pyscript]# cat ~/data/words.txt | python word_count_mapper.py | sort | python word_count_reducer.py | sort -nr -k 2 |head -n 10

可以看到其输出

and    11
to    10
people    10
Micro-blog    8
their    6
a    5
other    4
of    4
more    4
has    4

与上述demo中的结果一模一样,现在大致来分解一下上述命令

  1. cat ~/data/words.txt :输出文档内容
  2. | python word_count_mapper.py:管道命令 将上一步输出的内容输出到要执行的mapper
  3. | sort :管道命令 直接将mapper中输出的内容按第一列进行排序,排序完了之后其实就是达到了分组的效应
  4. | python word_count_reducer.py:管道 将排序后的值一次输入到reducer中进行执行,其实到了这一步已经完成了streaming的模拟
  5. | sort -nr -k 2 |head -n 10 将最后的结果排个序,再取top

使用上述方式来进行调试我感觉有两大优势

  1. 快,不需要提交到服务器上 慢悠悠的取执行
  2. 准,可以直接看到python抛出来的错误

Streaming常见错误

  • Caused by: java.io.IOException: error=2, No such file or directory
    这个往往是由于没有指定mapper,redue=cer-file引起的,也可以使用通配符*
  • 另一未知的错误 估计就是写的脚本执行出了问题,使用上述方式先在本地调试完了再跑

总结

Hadoop streaming写起来很灵活,并且由于跨语言,迁移起来很很快,熟悉不同语言的开发人员也非常容易合作,如果Hadoop streaming进程由多个Map-Reduce构成,那么用Shell来组织整个进程也就会非常的方便快捷

参考