[hadoop] Writing Hadoop MapReduce programs in pure Python

What a mouthful of a title :). Hadoop is written in Java, but Java is not the only language that can run programs on Hadoop. The first concept to grasp, though, is that the execution environment is the Java VM. So the Python programs described in the Hadoop documentation are translated into a Java jar with Jython before the Hadoop framework runs them. By extension, any language that can be turned into a Java jar should be able to produce programs for the Hadoop framework to execute.

Back to Python. Anyone used to CPython is probably not comfortable with Jython (I certainly am not). The two really do differ in ways I never managed to adapt to (IronPython likewise). That is why Mr. Noll wrote up how to run pure Python on the Hadoop framework.

What he relies on is Hadoop Streaming, an API that lets Hadoop send and receive stream data through stdin and stdout of any executable. Those two streams become the channel that joins the Hadoop framework to the Python runtime. From here on, keep one thing straight: mapper.py and reducer.py live on the local filesystem, while the data lives inside the Hadoop environment (HDFS). Put either in the wrong place and the job will not run.
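
To make the Streaming contract concrete, below is a rough local stand-in for what the framework does with the two scripts in this article: input lines go to the mapper on stdin, the mapper's output is sorted by key (Hadoop does this between the map and reduce phases), and the sorted lines go to the reducer on stdin. The simulate helper and the sample path are mine, for illustration only; the real job is launched with the hadoop-streaming jar later in this article.

#!/usr/bin/env python
# simulate_streaming.py -- a local simulation of the Streaming data
# flow: file -> mapper -> sort by key -> reducer, all over stdin/stdout.

import subprocess

def simulate(input_path,
             mapper='/home/hduser/mapper.py',
             reducer='/home/hduser/reducer.py'):
    with open(input_path) as infile:
        mapped = subprocess.Popen([mapper], stdin=infile,
                                  stdout=subprocess.PIPE)
        shuffled = subprocess.Popen(['sort', '-k1,1'], stdin=mapped.stdout,
                                    stdout=subprocess.PIPE)
        reduced = subprocess.Popen([reducer], stdin=shuffled.stdout,
                                   stdout=subprocess.PIPE)
        mapped.stdout.close()    # let SIGPIPE reach the mapper if sort dies
        shuffled.stdout.close()
        output, _ = reduced.communicate()
    return output

if __name__ == '__main__':
    # hypothetical sample input; any text file works
    print(simulate('/tmp/gutenberg/pg20417.txt'))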

Prepare the environment:

  • A Hadoop environment; follow the earlier article and set up a single-node installation.
  • The two programs below:
    • mapper.py, saved as /home/hduser/mapper.py (remember to chmod +x)

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

    • reducer.py, saved as /home/hduser/reducer.py (remember to chmod +x)

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
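
Incidentally, reducer.py imports itemgetter but never uses it. The import hints at a tidier variant: the same reducer written with itertools.groupby, which leans on exactly the same sort guarantee as the IF-switch above. A sketch of my own, not part of the job below:

#!/usr/bin/env python
# reducer_groupby.py -- an equivalent reducer built on itertools.groupby;
# like the version above, it assumes the input is already sorted by key.

import sys
from itertools import groupby
from operator import itemgetter

def read_mapper_output(stream, separator='\t'):
    for line in stream:
        # each record is "word<TAB>count"
        yield line.rstrip('\n').split(separator, 1)

def main():
    data = read_mapper_output(sys.stdin)
    for word, group in groupby(data, itemgetter(0)):
        try:
            total = sum(int(count) for _, count in group)
        except ValueError:
            continue  # a count was not a number; skip this word
        print('%s\t%d' % (word, total))

if __name__ == '__main__':
    main()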

Mr. Noll also kindly suggests testing mapper.py and reducer.py with a little shell plumbing first, so that if a job finishes with unexpected results you are not left blaming everything else.

# very basic test
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
foo     1
foo     1
quux    1
labs    1
foo     1
bar     1
quux    1

hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
bar     1
foo     3
labs    1
quux    2

# using one of the ebooks as example input
# (see below on where to get the ebooks)
hduser@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py
The     1
Project 1
Gutenberg       1
EBook   1
of      1
[...]
(you get the idea)

That's it; the program really is that simple. Next, let's make it run, which requires input data. As in the earlier test, download three books from Project Gutenberg, choosing the plain-text, UTF-8 encoded files (Plain Text UTF-8):

  • The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson
  • The Notebooks of Leonardo Da Vinci
  • Ulysses by James Joyce

Put the text files into the /tmp/gutenberg directory.
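
If you would rather script the download, here is a sketch. The URL pattern is an assumption about Project Gutenberg's current layout (the books' IDs are 20417, 4300 and 5000); verify it, or fetch the files by hand, if the requests fail.

#!/usr/bin/env python
# fetch_gutenberg.py -- download the three ebooks into /tmp/gutenberg.
# The URL pattern below is an assumption; check it before relying on it.

import os
try:
    from urllib.request import urlretrieve   # Python 3
except ImportError:
    from urllib import urlretrieve            # Python 2

BOOK_IDS = [20417, 4300, 5000]   # the three titles listed above
DEST = '/tmp/gutenberg'

if not os.path.isdir(DEST):
    os.makedirs(DEST)

for book_id in BOOK_IDS:
    url = 'http://www.gutenberg.org/cache/epub/%d/pg%d.txt' % (book_id, book_id)
    path = os.path.join(DEST, 'pg%d.txt' % book_id)
    print('fetching %s -> %s' % (url, path))
    urlretrieve(url, path)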

Then copy the three files into HDFS (that is, the Hadoop environment's file store):

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$

Then it is time to run the MapReduce job. It is a single command; the arguments are many, but do not insert a line break!

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -file /home/hduser/mapper.py -mapper /home/hduser/mapper.py -file /home/hduser/reducer.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
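
If you end up launching the job repeatedly, the same command is easy to wrap in a small Python script. This is only a convenience sketch, functionally identical to the one-liner above; run it from /usr/local/hadoop so the relative paths resolve.

#!/usr/bin/env python
# run_job.py -- launch the streaming job from Python; a sketch that
# mirrors the one-line command above, locating the jar with a glob.

import glob
import subprocess

streaming_jar = glob.glob('contrib/streaming/hadoop-*streaming*.jar')[0]

cmd = ['bin/hadoop', 'jar', streaming_jar,
       '-file', '/home/hduser/mapper.py', '-mapper', '/home/hduser/mapper.py',
       '-file', '/home/hduser/reducer.py', '-reducer', '/home/hduser/reducer.py',
       '-input', '/user/hduser/gutenberg/*',
       '-output', '/user/hduser/gutenberg-output']

subprocess.check_call(cmd)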

When it finishes, the results land in the directory given by -output. A sample run looks like this:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/app/hadoop/tmp/hadoop-unjar54543/]
[] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob:  map 0%  reduce 0%
[...] INFO streaming.StreamJob:  map 43%  reduce 0%
[...] INFO streaming.StreamJob:  map 86%  reduce 0%
[...] INFO streaming.StreamJob:  map 100%  reduce 0%
[...] INFO streaming.StreamJob:  map 100%  reduce 33%
[...] INFO streaming.StreamJob:  map 100%  reduce 70%
[...] INFO streaming.StreamJob:  map 100%  reduce 77%
[...] INFO streaming.StreamJob:  map 100%  reduce 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
[...] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$

The finished results are in /user/hduser/gutenberg-output on HDFS. Take a look at them with these commands:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
Found 1 items
/user/hduser/gutenberg-output/part-00000     <r 1>   903193  2007-09-21 13:00
hduser@ubuntu:/usr/local/hadoop$
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000
"(Lo)cra"       1
"1490   1
"1498," 1
"35"    1
"40,"   1
"A      2
"AS-IS".        2
"A_     1
"Absoluti       1
[...]
hduser@ubuntu:/usr/local/hadoop$
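
Note that the output is sorted by word, not by count. To find the most frequent words, copy the result out of HDFS first (for example with bin/hadoop dfs -get /user/hduser/gutenberg-output/part-00000 /tmp/part-00000) and post-process it locally; a small sketch:

#!/usr/bin/env python
# top_words.py -- print the 20 most frequent words from the job output,
# assuming part-00000 was copied to /tmp/part-00000 as described above.

counts = []
with open('/tmp/part-00000') as f:
    for line in f:
        word, _, count = line.rstrip('\n').rpartition('\t')
        try:
            counts.append((int(count), word))
        except ValueError:
            continue  # malformed line; skip it

for count, word in sorted(counts, reverse=True)[:20]:
    print('%7d  %s' % (count, word))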
