首页 > 代码库 > MapReduce 编程 系列十二 用Hadoop Streaming技术集成newLISP脚本

MapReduce 编程 系列十二 用Hadoop Streaming技术集成newLISP脚本

本文环境和之前的Hadoop 1.x不同,是在Hadoop 2.x环境下测试。功能和前面的日志处理程序一样。

第一个newLISP脚本,起到mapper的作用,在stdin中读取文本数据,将did作为key, value为1,然后将结果输出到stdout

第二个newLISP脚本,起到reducer的作用,在stdin中读取<key, values>, key是dic, values是所有的value,简单对value求和后,写到stdout中

最后应该可以在HDFS下看到结果。


用脚本编程的好处是方便测试,现在先开发newLISP脚本读入文件,并仿照map逻辑处理,然后交给后续的newLISP脚本仿照reduce处理。

下面是map.lsp代码:

#!/usr/bin/newlisp

(while (read-line)
  (set ‘value (parse (current-line) ","))
  (println (string (value 2) "\t1"))
)

(exit)

测试一下:

cat logs/sign_2014-05-10.0.csv | ./map.lsp

结果还不错:

537025b84700aab27472b87f        1
537023124700aab27472b82a        1
537031a24700aab27472b982        1
537023c84700aab27472b841        1
537014e74700aab27472b48b        1
53702cac4700aab27472b928        1
537049cd4700aab27472ba91        1
5370dd0b4700aab27472bde4        1

将一行记录按照,拆开,放在一个list中,然后取第三个元素,也就是设备ID,之后添加\t为列分隔符号,然后再添加1.

这样就转成了did\t1\n的形式的<key,value>给reduce。注意newLISP的代码println函数会自动在字符串后面添加\n.


下面来实现reduce.lsp代码:

(new Tree ‘my-table)

(while (read-line)
  (set ‘line-value (parse (current-line) "\t"))
  (set ‘key (line-value 0))
  (set ‘value (int (line-value 1)))
  (set ‘v (my-table key))
  (if v
      (my-table key (+ v value))
    (my-table key value)
      )
)

(dolist (item (my-table)) (println (item 0) "\t" (item 1)))

(exit)

首先创建了一个my-table,用来保存<key,value>

然后将map.lsp输出的数据每行按照\t拆分,获取key和value

存入my-table中,用key查询,有则value加1,无key则添加进去。

最后遍历整个my-table,输出did\tsum\n这样的数据。


下面的命令可以将map和reduce脚本连起来测试:

cat logs/sign_2014-05-10.0.csv | ./map.lsp | sort | ./reduce.lsp


在hadoop集群部署的时候首先要确保newlisp二进制程序都部署在所有节点的/usr/bin/目录下,并且有执行权限。由于newlisp程序本身非常小,所以部署及其轻松,直接scp即可。

然后执行hadoop命令:

hadoop jar hadoop-streaming-1.0.0.jar -files map.lsp reduce.lsp -input /user/chenshu/share/logs -output /user/chenshu/share/output/lisp -mapper map.lsp -reducer reduce.lsp 




MapReduce 编程 系列十二 用Hadoop Streaming技术集成newLISP脚本