首页 > 代码库 > TextFile SequencFile性能对比

TextFile SequencFile性能对比

   首先所有的输入格式都继承FileInputFormat,对于TextFile和SequenceFile有对应的TextInputFormat和SequenceFileInputFormat。

   我们先来看一下TextInputFormat的实现:

public class TextInputFormat extends FileInputFormat<LongWritable, Text>
  implements JobConfigurable {

  private CompressionCodecFactory compressionCodecs = null;
  
  public void configure(JobConf conf) {
    compressionCodecs = new CompressionCodecFactory(conf);
  }
  
  //是否可以被切分
  //1、没压缩 return true
  //2、压缩类是SplittableCompressionCodec的实例 return false 否则return false
  protected boolean isSplitable(FileSystem fs, Path file) {
    final CompressionCodec codec = compressionCodecs.getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

 //创建出LineRecordReader,其中会读取配置中的textinputformat.record.delimiter
 //已确定每行的分隔符
  public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit, JobConf job,
                                          Reporter reporter)
    throws IOException {
    
    reporter.setStatus(genericSplit.toString());
    String delimiter = job.get("textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter) {
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    }
    return new LineRecordReader(job, (FileSplit) genericSplit,
        recordDelimiterBytes);
  }
}

接下去,看一下LineRecordReader是如何读取记录的,以下是它的构造方法:

 
 private SplitLineReader in; //读取一行记录的实现类
 
 public LineRecordReader(Configuration job, FileSplit split,
      byte[] recordDelimiter) throws IOException {
    this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
      LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
    start = split.getStart();//获取分片文件的启始位置
    end = start + split.getLength();//获取分片文件的结束位置
    final Path file = split.getPath();//获取分片文件对应的完整文件
    compressionCodecs = new CompressionCodecFactory(job);
    codec = compressionCodecs.getCodec(file);//获取压缩类

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    if (isCompressedInput()) {//是否是压缩过的流
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {//是否是可切分的压缩
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new CompressedSplitLineReader(cIn, job, recordDelimiter);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn; // take pos from compressed stream
      } else {
        in = new SplitLineReader(codec.createInputStream(fileIn,
            decompressor), job, recordDelimiter);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      in = new SplitLineReader(fileIn, job, recordDelimiter);
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    //如果不是第一个分片,放弃读取第一行数据,因为每个分片(除了最后一个分片文件)会多读一行。至于为啥要这么做后面会解释。
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }
 
 
 接下去客户端就可以调用来读取一行:

 /** Read a line. */
  public synchronized boolean next(LongWritable key, Text value)
    throws IOException {//同步方法,因为pos是全局可变量

    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    //总是多读一行,getFilePosition() <= end ,可见当等于end还会执行一次in.readLine
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      key.set(pos);//当前位置作为key

      int newSize = in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        return false;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        return true;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
    }

    return false;
  }

接下去,我们具体看下SplitLineReader里readLine的实现

 public int readLine(Text str, int maxLineLength,
                      int maxBytesToConsume) throws IOException {
    if (this.recordDelimiterBytes != null) {
    //根据用户自定义的行分隔符读记录
      return readCustomLine(str, maxLineLength, maxBytesToConsume);
    } else {
    //根据默认行分隔符读记录
      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
    }
  }

我们先来看看readCustomLine(str, maxLineLength, maxBytesToConsume)的实现:

/**
   * Read a line terminated by a custom delimiter.
   */
  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
   /* We‘re reading data from inputStream, but the head of the stream may be
    *  already captured in the previous buffer, so we have several cases:
    * 
    * 1. 缓冲区的尾部不包含行分隔符的任何字符。我们计ambiguousByteCount=0
    *    
    *    
    * 2. 缓冲区的尾部包含X个字符序列,是X个字符是行分隔符的头部,我们计ambiguousByteCount=X
    *    
    *    // ***  例子:输入片段
    *    
    *    " record 1792: I found this bug very interesting and
    *     I have completely read about it. record 1793: This bug
    *     can be solved easily record 1794: This ." 
    *    
    *    delimiter = "record";
    *        
    *    supposing:- String at the end of buffer =
    *    "I found this bug very interesting and I have completely re"
    *    There for next buffer = "ad about it. record 179       ...."           
    *     
    *     The matching characters in the input
    *     buffer tail and delimiter head = "re" 
    *     Therefore, ambiguous byte count = 2 ****   //
    *     
    *     2.1 If the following bytes are the remaining characters of
    *         the delimiter, then we have to capture only up to the starting 
    *         position of delimiter. That means, we need not include the 
    *         ambiguous characters in str.
    *     
    *     2.2 If the following bytes are not the remaining characters of
    *         the delimiter ( as mentioned in the example ), 
    *         then we have to include the ambiguous characters in str. 
    */
    str.clear();
    int txtLength = 0; // tracks str.getLength(), as an optimization
    long bytesConsumed = 0;
    int delPosn = 0;
    int ambiguousByteCount=0; // To capture the ambiguous characters count
    do {
      int startPosn = bufferPosn; // Start from previous end position
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
        if (bufferLength <= 0) {
          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) {
        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
          delPosn++;
          if (delPosn >= recordDelimiterBytes.length) {
            bufferPosn++;
            break;
          }
        } else if (delPosn != 0) {
          bufferPosn--;
          delPosn = 0;
        }
      }
      int readLength = bufferPosn - startPosn;
      bytesConsumed += readLength;
      int appendLength = readLength - delPosn;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        if (ambiguousByteCount > 0) {
          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
          //appending the ambiguous characters (refer case 2.2)
          bytesConsumed += ambiguousByteCount;
          ambiguousByteCount=0;
        }
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
      if (bufferPosn >= bufferLength) {
        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
          ambiguousByteCount = delPosn;
          bytesConsumed -= ambiguousByteCount; //to be consumed in next
        }
      }
    } while (delPosn < recordDelimiterBytes.length 
        && bytesConsumed < maxBytesToConsume);
    if (bytesConsumed > (long) Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
    }
    return (int) bytesConsumed; 
  }

再来看看readDefaultLine

 /**
   * Read a line terminated by one of CR, LF, or CRLF.
   */
  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
  throws IOException {
    /* We‘re reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     * 1. No newline characters are in the buffer, so we need to copy
     *    everything and read another buffer from the stream.
     * 2. An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR.  In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it‘s LF, then we
     *    need consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */
     //以上注释说明了如何处理
     //UNIX: ‘\n‘  (LF)  
     //Mac:  ‘\r‘  (CR)  
     //Windows: ‘\r\n‘  (CR)(LF)  
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true of prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
    
    if (bytesConsumed > (long)Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }
    return (int)bytesConsumed;
  }

    其中

while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

   

    这个newlineLength == 0,说明如果没有读到行尾,所以会继续读取,所以不存在读取一个断行,即行的两部分分别存在于两个分片中。根据这个行为如何避免漏读和重复读分片文件的开头一行呢?答案是读取的时候第一行不读,读到分片结尾的时候多读一行就可以避免这个问题。具体实现在前面的源代码里已经展现了。可见虽然文件被spilt一个个分片,但是读取逻辑并不完全遵循这个分片。会根据   实际情况调整分片。

 以上便是TextFile文件的读取核心代码,接下去我们看一下SequenceFile的行为:  SequenceFileInputFormat源码如下:

  
  public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {

  public SequenceFileInputFormat() {
    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
  }
  
  @Override
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    FileStatus[] files = super.listStatus(job);
    for (int i = 0; i < files.length; i++) {
      FileStatus file = files[i];
      if (file.isDirectory()) {     // it‘s a MapFile
        Path dataFile = new Path(file.getPath(), MapFile.DATA_FILE_NAME);
        FileSystem fs = file.getPath().getFileSystem(job);
        // use the data file
        files[i] = fs.getFileStatus(dataFile);
      }
    }
    return files;
  }
  //创建出SequenceFileRecordReader
  public RecordReader<K, V> getRecordReader(InputSplit split,
                                      JobConf job, Reporter reporter)
    throws IOException {

    reporter.setStatus(split.toString());

    return new SequenceFileRecordReader<K, V>(job, (FileSplit) split);
  }

}

 其中 protected boolean isSplitable(FileSystem fs, Path file) 直接继承父类的实现,return true 说明SequeceFile总是可以被切分,其上可见用来读取SequeceFile的内容,让我们看看SequenceFileRecordReader里的借个主要方法的源码实现:

 
    private SequenceFile.Reader in;
    
     public SequenceFileRecordReader(Configuration conf, FileSplit split)
    throws IOException {
    Path path = split.getPath();
    FileSystem fs = path.getFileSystem(conf);
    this.in = new SequenceFile.Reader(fs, path, conf);
    this.end = split.getStart() + split.getLength();
    this.conf = conf;

    if (split.getStart() > in.getPosition())
      in.sync(split.getStart());
     // start位置定位到同步点,sync的说明见之前的Sequence File的介绍博文

    this.start = in.getPosition();
    more = start < end;
    }
    
    public synchronized boolean next(K key, V value) throws IOException {
    if (!more) return false;
    long pos = in.getPosition();
    boolean remaining = (in.next(key) != null);
    if (remaining) {
      getCurrentValue(value);
    }
    if (pos >= end && in.syncSeen()) {
      more = false;
    } else {
      more = remaining;
    }
    return more;
  }
  
  protected synchronized boolean next(K key)
    throws IOException {
    if (!more) return false;
    long pos = in.getPosition();
    boolean remaining = (in.next(key) != null);
    if (pos >= end && in.syncSeen()) {
      more = false;
    } else {
      more = remaining;
    }
    return more;
  }
  
   protected synchronized void getCurrentValue(V value)
    throws IOException {
    in.getCurrentValue(value);
  }

   主要调用了SequenceFile.Reader的next方法和getCurrentValue方法,我们来具体看看SequenceFile.Reader中这些方法的实现:

首先是读取文件的一写SEQ特有的信息,解析SEQ头部信息,获取压缩类,key,value的类等等,进行初始化

private void init(boolean tempReader) throws IOException {
      byte[] versionBlock = new byte[VERSION.length];
      in.readFully(versionBlock);

      if ((versionBlock[0] != VERSION[0]) ||
          (versionBlock[1] != VERSION[1]) ||
          (versionBlock[2] != VERSION[2]))
        throw new IOException(this + " not a SequenceFile");//判断是否是SEQ文件

      // Set ‘version‘
      version = versionBlock[3];
      if (version > VERSION[3])//版本是否正确
        throw new VersionMismatchException(VERSION[3], version);

      if (version < BLOCK_COMPRESS_VERSION) {
        UTF8 className = new UTF8();

        className.readFields(in);
        keyClassName = className.toStringChecked(); // key class name

        className.readFields(in);
        valClassName = className.toStringChecked(); // val class name
      } else {
        keyClassName = Text.readString(in);
        valClassName = Text.readString(in);
      }

      if (version > 2) {                          // if version > 2
        this.decompress = in.readBoolean();       // is compressed?
      } else {
        decompress = false;
      }

      if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
        this.blockCompressed = in.readBoolean();  // is block-compressed?
      } else {
        blockCompressed = false;
      }
      
      // if version >= 5
      // setup the compression codec
      if (decompress) {
        if (version >= CUSTOM_COMPRESS_VERSION) {
          String codecClassname = Text.readString(in);
          try {
            Class<? extends CompressionCodec> codecClass
              = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
            this.codec = ReflectionUtils.newInstance(codecClass, conf);
          } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("Unknown codec: " + 
                                               codecClassname, cnfe);
          }
        } else {
          codec = new DefaultCodec();
          ((Configurable)codec).setConf(conf);
        }
      }
      
      this.metadata = new Metadata();
      if (version >= VERSION_WITH_METADATA) {    // if version >= 6
        this.metadata.readFields(in);
      }
      
      if (version > 1) {                          // if version > 1
        in.readFully(sync);                       // read sync bytes
        headerEnd = in.getPos();                  // record end of header
      }
      
      // Initialize... *not* if this we are constructing a temporary Reader
      if (!tempReader) {
        valBuffer = new DataInputBuffer();
        if (decompress) {
          valDecompressor = CodecPool.getDecompressor(codec);
          valInFilter = codec.createInputStream(valBuffer, valDecompressor);
          valIn = new DataInputStream(valInFilter);
        } else {
          valIn = valBuffer;
        }

        if (blockCompressed) {
          keyLenBuffer = new DataInputBuffer();
          keyBuffer = new DataInputBuffer();
          valLenBuffer = new DataInputBuffer();

          keyLenDecompressor = CodecPool.getDecompressor(codec);
          keyLenInFilter = codec.createInputStream(keyLenBuffer, 
                                                   keyLenDecompressor);
          keyLenIn = new DataInputStream(keyLenInFilter);

          keyDecompressor = CodecPool.getDecompressor(codec);
          keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
          keyIn = new DataInputStream(keyInFilter);

          valLenDecompressor = CodecPool.getDecompressor(codec);
          valLenInFilter = codec.createInputStream(valLenBuffer, 
                                                   valLenDecompressor);
          valLenIn = new DataInputStream(valLenInFilter);
        }
        
        SerializationFactory serializationFactory =
          new SerializationFactory(conf);
        this.keyDeserializer =
          getDeserializer(serializationFactory, getKeyClass());
        if (this.keyDeserializer == null) {
          throw new IOException(
              "Could not find a deserializer for the Key class: ‘"
                  + getKeyClass().getCanonicalName() + "‘. "
                  + "Please ensure that the configuration ‘" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "‘ is "
                  + "properly configured, if you‘re using "
                  + "custom serialization.");
        }
        if (!blockCompressed) {
          this.keyDeserializer.open(valBuffer);
        } else {
          this.keyDeserializer.open(keyIn);
        }
        this.valDeserializer =
          getDeserializer(serializationFactory, getValueClass());
        if (this.valDeserializer == null) {
          throw new IOException(
              "Could not find a deserializer for the Value class: ‘"
                  + getValueClass().getCanonicalName() + "‘. "
                  + "Please ensure that the configuration ‘" +
                  CommonConfigurationKeys.IO_SERIALIZATIONS_KEY + "‘ is "
                  + "properly configured, if you‘re using "
                  + "custom serialization.");
        }
        this.valDeserializer.open(valIn);
      }
    }
 /** Read the next key/value pair in the file into <code>key</code> and
     * <code>val</code>.  Returns true if such a pair exists and false when at
     * end of file */
    public synchronized boolean next(Writable key, Writable val)
      throws IOException {
      if (val.getClass() != getValueClass())
        throw new IOException("wrong value class: "+val+" is not "+valClass);

      boolean more = next(key);//读取下一个key
      
      if (more) {
        getCurrentValue(val);//获取可以对应的val
      }

      return more;
    }
 /** Read the next key in the file into <code>key</code>, skipping its
     * value.  True if another entry exists, and false at end of file. */
    public synchronized boolean next(Writable key) throws IOException {
      if (key.getClass() != getKeyClass())
        throw new IOException("wrong key class: "+key.getClass().getName()
                              +" is not "+keyClass);

      if (!blockCompressed) {//不是块压缩
        outBuf.reset();
        
        keyLength = next(outBuf);//获取key的长度
        if (keyLength < 0)
          return false;
        
        valBuffer.reset(outBuf.getData(), outBuf.getLength());
        
        key.readFields(valBuffer);//读取key的内容
        valBuffer.mark(0);
        if (valBuffer.getPosition() != keyLength)
          throw new IOException(key + " read " + valBuffer.getPosition()
                                + " bytes, should read " + keyLength);
      } else {
        //Reset syncSeen
        syncSeen = false;
        
        if (noBufferedKeys == 0) {
          try {
            readBlock();
          } catch (EOFException eof) {
            return false;
          }
        }
        
        int keyLength = WritableUtils.readVInt(keyLenIn);
        
        // Sanity check
        if (keyLength < 0) {
          return false;
        }
        
        //Read another compressed ‘key‘
        key.readFields(keyIn);
        --noBufferedKeys;
      }

      return true;
    }
/**
     * Get the ‘value‘ corresponding to the last read ‘key‘.
     * @param val : The ‘value‘ to be read.
     * @throws IOException
     */
    public synchronized void getCurrentValue(Writable val) 
      throws IOException {
      if (val instanceof Configurable) {
        ((Configurable) val).setConf(this.conf);
      }

      // Position stream to ‘current‘ value
      seekToCurrentValue();

      if (!blockCompressed) {
        val.readFields(valIn);
        
        if (valIn.read() > 0) {
          LOG.info("available bytes: " + valIn.available());
          throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
                                + " bytes, should read " +
                                (valBuffer.getLength()-keyLength));
        }
      } else {
        // Get the value
        int valLength = WritableUtils.readVInt(valLenIn);//获取val的长度
        val.readFields(valIn);//读取val值
        
        // Read another compressed ‘value‘
        --noBufferedValues;
        
        // Sanity check
        if ((valLength < 0) && LOG.isDebugEnabled()) {
          LOG.debug(val + " is a zero-length value");
        }
      }

    }

以上便是SeqFile文件的读取核心代码。我们从源代码角度预测下两者的区别:


存储大小查询效率
SeqFile大(要存key长度,value长度,sync等信息)好(应为有key长度,value长度等信息所以可以快速定位key或者value并读取)
TextFile差(需要反复读直到遇到分隔符)

下面我们实践一下看看是不是这样的:

    首先通过wget http://files.grouplens.org/datasets/movielens/ml-100k.zip

    然后使用hive

CREATE TABLE u_data 
(  userid INT, 
 movieid INT,  
 rating INT,  
 unixtime STRING) 
 ROW FORMAT DELIMITED 
 FIELDS TERMINATED BY ‘\t‘ STORED AS TEXTFILE;
  LOAD DATA LOCAL INPATH ‘/usr/test/hivesourcedata/ml-100k/u.data‘ OVERWRITE INTO TABLE u_data;
 CREATE TABLE u_data_seq 
 ( userid INT, 
  movieid INT, 
   rating INT, 
    unixtime STRING) 
    ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY ‘\t‘ STORED AS SEQUENCEFILE; 
    INSERT OVERWIRTE TABLE u_data_seq SELECT * FROM u.data;

首先比较下大小:

 3.1M    ./u_data_seq 

 1.9M    ./u_data

果然u_data_seq 比u_data大

我们看看u_data_seq的存储内容:

wKioL1PfR5mg_6hOAAPd77J72rE685.jpg

很标准的SEQ文件,但是大家有没有发现没有key,因为hiveSeqFile是空key,见HiveSequenceFileOutputFormat的源码

public class HiveSequenceFileOutputFormat<K,V> extends SequenceFileOutputFormat<K,V>
    implements HiveOutputFormat<K, V> {

  BytesWritable EMPTY_KEY = new BytesWritable();

  /**
   * create the final out file, and output an empty key as the key.
   *
   * @param jc
   *          the job configuration file
   * @param finalOutPath
   *          the final output file to be created
   * @param valueClass
   *          the value class used for create
   * @param isCompressed
   *          whether the content is compressed or not
   * @param tableProperties
   *          the tableInfo of this file‘s corresponding table
   * @param progress
   *          progress used for status report
   * @return the RecordWriter for the output file
   */
  @Override
  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
      Class<? extends Writable> valueClass, boolean isCompressed,
      Properties tableProperties, Progressable progress) throws IOException {

    FileSystem fs = finalOutPath.getFileSystem(jc);
    final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
        fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);

    return new RecordWriter() {
      @Override
      public void write(Writable r) throws IOException {
        outStream.append(EMPTY_KEY, r);//key是EMPTY_KEY
      }

      @Override
      public void close(boolean abort) throws IOException {
        outStream.close();
      }
    };
  }

}

接下去看看u_data的存储内容:

wKiom1PfSD7hcGKpAASjora9vck329.jpg

可见也是没key,hive对于TextFile的实现是org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

接下去我们执行个简单查询,看看谁的查询速度快

hive> select * from u_data_seq where  userid >1000; 

Time taken: 15.379 seconds

hive> select * from u_data where  userid>1000;

Time taken: 17.683 seconds 

第一个SQL块了2秒多。


本文出自 “软件开发” 博客,请务必保留此出处http://tangjj.blog.51cto.com/1848040/1535555