首页 > 代码库 > 去除Hadoop-Streaming行末多余的TAB

去除Hadoop-Streaming行末多余的TAB

    单位有一组业务一直都是使用Streaming压缩文本日志,大体上就是设置作业输出为BZ2格式,怎么输入就怎么输出,没有任何处理功能在里面。但是每行结尾都多出来一个TAB。终于,有一个业务需要使用TAB前的最后一个字段,不去掉不行了。

    虽然是个小问题,但是网上搜了一圈,也没有很好的解决。很多人都遇到了,但是单位的业务比较特殊,只有map没有reduce。http://stackoverflow.com/questions/20137618/hadoop-streaming-api-how-to-remove-unwanted-delimiters这个上面直接说“As I discussed with friends, there‘s no easy way to achieve the goal,...”。

    Streaming有个特点,默认是按照TAB去区分Key和Value。如果没有设置Key字段的数目,默认一行里面第一个TAB之前的做Key,后面的是Value。如果没有找到Tab,就全都是Key字段,Value是空。之所以后面会多出个Tab,正是Key和Value之间的那个Tab。

    首先是考察Streaming的Map,在PipeMapper.java。InputWriter处理输出,所以尝试实现自定义输出。在MapReduce作业配置里面,stream.map.input.writer.class负责指定InputWriter是哪一个,默认是TextInputWriter。Streaming在这里比较坑,增加-Dstream.map.input.writer.class=XXX的选项并不能令Streaming使用自定义的实现类,必须实现自己的IdentifierResolver,然后在其中对不同类型的输入设定不同类型的InputWriter,而其中的输入类型,必须由stream.map.input选项传入。是否设置成功以作业运行时候JobTracker的配置参数表为准。

    不巧的是,使用自定义的InputWriter代替TextInputWriter,行尾的Tab是没了,行首又多了个数字。估计是Hadoop给Mapper传入的Key被打印出来了。oooorz....不能瞎猜了,还是看看代码吧。

    好在代码蛮短的还是。

    Streaming会把本身、以及用户-file -cacheFile -cacheArchive 等选项指定的文件,打成一个Jar包提交到集群进行MR作业。把集群的输出,作为用户实现Mapper的输入;读取用户实现Mapper的输出,作为整个Map作业的输出。Input/Output相对于用户自定义作业,Writer/Reader体现为Streaming的行为,因此是InputWriter和OutputReader。简单来讲,

Hadoop给出的(K,V)---streaming---> 用户自定义Mapper ---streaming--->Hadoop的Mapper输出

   Streaming由PipeMapRunner启动作业,异步收集用户作业输出,进而向Hadoop汇报作业进度。整个作业的基础设置、作业提交都是由StreamJob类完成。

    作业的执行是PipeMapRed/PipeMapper/PipReducer/PipCombiner这几个类。解决方案也就在这里。在MROutputThread的run方法里面,outCollector.collect(key, value);这句之前,加上下面的代码片段即可。

          if (value instanceof Text) {
            if (value.toString().isEmpty())
              value = NullWritable.get();
          }

    是不是很简单。


    为什么这样做是可行的?还是源于org.apache.hadoop.mapreduce.lib.output.TextOutputFormat。直接上代码。

package org.apache.hadoop.mapreduce.lib.output;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.*;

/** An {@link OutputFormat} that writes plain text files. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
  protected static class LineRecordWriter<K, V>
    extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can‘t find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can‘t find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling Text as a special
     * case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(utf8));
      }
    }

    public synchronized void write(K key, V value)
      throws IOException {

      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);
    }

    public synchronized 
    void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  }

  public RecordWriter<K, V> 
         getRecordWriter(TaskAttemptContext job
                         ) throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator= conf.get(SEPERATOR, "\t");
    CompressionCodec codec = null;
    String extension = "";
    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass = 
        getOutputCompressorClass(job, GzipCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);
    if (!isCompressed) {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(new DataOutputStream
                                        (codec.createOutputStream(fileOut)),
                                        keyValueSeparator);
    }
  }
}

    注意到LineRecordWriter.write了么?


后记:

    A. 网上很多是想办法修改分隔符,把TAB换成空字符。这是一个非常粗暴的做法,基本上就是埋坑!为什么呢?

    日志文本内容可以是很丰富的,这次出问题是因为每行没有TAB。如果换做含有TAB的文本,把分隔符变为空串,就把日志中原有的TAB去掉了。

    B. 之所以这么搞,也是受到了stackoverflow的这个Q&A的启发。http://stackoverflow.com/questions/18133290/hadoop-streaming-remove-trailing-tab-from-reducer-output。类似的,Q&A也是采用修改分隔符的办法,是不可取的。但是仔细发现,是可以在自己重写的TextOutputFormat<K,V>里,修改LineRecordWriter.write方法的。

    重写TextOutputFormat是十分优雅的解决,看似修改了Hadoop本身的东西,但是在Streaming最新版没有加入这个fix之前,防止对每个版本的Streaming都要变更、重新编译打包。另外,Streaming不是独立的项目,编译它需要同时编译Hadoop!

    用vim写Java打包确实略蛋疼,周一上班试试这个更加优雅的办法。

    C. 虽然是修改了Streaming代码,但是不需要考虑会影响同一机器所有用户的问题,也不用修改$HADOOP_HOME下的Streaming包。streaming提供了这个参数stream.shipped.hadoopstreaming。

    D. 有些设置似乎是指对Reducer生效,对于这种只有Mapper的作业不起作用。比如

mapred.textoutputformat.ignoreseparator
mapred.textoutputformat.separator

    设置了,没看到什么效果。

    再有就是,命令行选项里面如果写-DXXX= \ 这样的语句,似乎也没有把这个参数设置为空串的效果,写-DXXX= ""也是一样。

本文出自 “新青年” 博客,请务必保留此出处http://luckybins.blog.51cto.com/786164/1601722

去除Hadoop-Streaming行末多余的TAB