首页 > 代码库 > leveldb源码分析--SSTable之Compaction

leveldb源码分析--SSTable之Compaction

对于compaction是leveldb中体量最大的一部分,也应该是最为复杂的部分,为了便于理解我们首先从一些基本的概念开始。下面是一些从doc/impl.html中翻译和整理的内容:

Level 0

当日志文件超过一定大小的阈值是 (默认为 1MB):

  • 建立一个新的memtable和日志文件,以后的操作都是用新的memtable和日志文件
  • 后台进行如下操作:
    • 将旧的 memtable写到SSTable中(过程为先转为immtable_table,然后遍历写入)
    • 废弃旧的 memtable
    • 删除旧的 memtable和日志文件
    • 将新的SSTable加到level 0中.

这是doc/impl.html中的说明,但是在源代码中我们可以看到在MakeRoomForWrite函数中有逻辑,当满足一些其他条件之后(这里的其他条件不涉及到这个阈值大小)(mem_->ApproximateMemoryUsage() > options_.write_buffer_size) 就会有

log_ = new log::Writer(lfile);    imm_ = mem_;    has_imm_.Release_Store(imm_);    mem_ = new MemTable(internal_comparator_);

  这些操作,及上面描述的操作。而再查找write_buffer_size在Options::Options()  中进行了如下初始化write_buffer_size(4<<20),所以这里不知道是不是文档过久未更新的原因,所以从代码来看应该是阈值达到4MB时。而且这里的计算也不是已日志文件为依据的,而是以memtable的内存使用量为依据,当然这里两个数据应该是相差不大的,只是直观上来说应该是memtable的内存使用量。

我们再来看看compaction涉及到的一些因素:

过程

当level L中的文件的大小超过阈值时,我们在后台对其进行compact。compaction过程是先在level L+1中查找和该文件存在key 范围有重叠(overlap)的文件,如果存在重叠的文件就将其作为compaction的输入文件,在合并完成以后将其删除。这里需要注意的是level-0比较特殊,因为level-0的文件本身就有可能相互重叠,所以level-0进行compaction时我们同样选择level-0中相互重叠的文件。

compaction就是讲选择的文件进行合并输出为level L+1的SSTable。当文件大小超过2MB的时候我们新生成一个文件;或者当当前文件可以和level L+2中的10个文件都有重叠时,这个条件是为了保证下次compaction level L+1的时候不会选择太多的 level L+2中的文件。

这个过程中会删除(逻辑上)旧文件,然后将新的文件加到工作状态(即加入到version set中)。

compaction每个level的时候我们都是以循环(以key为基准)的方式进行的,即每次compact之后我们记住compact到的key,下一次我们查找包含这个key之后的下一个key的文件,然后进行compact。

compaction会丢弃呗覆盖的value,丢弃无用的删除,这里的无用是指在这个key都不在更高所有的level的key range中。

Timing

Level 0的compaction最多从level 0读取4个1MB(4个4MB?)的文件,以及所有的level 1文件(10MB),也就是我们将读取14MB,并写入14BM。
Level > 0的compaction,从level L选择一个2MB的文件,最坏情况下,将会和levelL+1的12个文件有重合(10:level L+1的总文件大小是level L的10倍;边界的2:level L的文件范围通常不会和level L+1的文件对齐)。因此Compaction将会读26MB,写26MB。对于100MB/s的磁盘IO来讲,compaction将最坏需要0.5秒。
如果磁盘IO更低,比如10MB/s,那么compaction就需要更长的时间5秒。如果user以10MB/s的速度写入,我们可能生成很多level 0文件(50个来装载5*10MB的数据)。这将会严重影响读取效率,因为需要merge更多的文件。
解决方法1:为了降低该问题,我们可能想增加log切换的阈值,缺点就是,log文件越大,对应的memtable文件就越大,这需要更多的内存。
解决方法2:当level 0文件太多时,人工降低写入速度。
解决方法3:降低merge的开销,如把level 0文件都无压缩的存放在cache中。

Number of files

对于更高的level我们可以创建更大的文件,而不是2MB,代价就是更多突发性的compaction。或者,我们可以考虑分区,把文件放存放多目录中来降低这个的开销。
然而在2011年2月4号,作者做了一个实验,在ext3文件系统中当当前文件夹含有不同的文件数量时进行100K次打开文件,结果表明现在的文件系统其实可以不需要分区。

 

Files in directoryMicroseconds to open a file
10009
1000010
10000016

 

了解了compaction的一些原理和机制以后我们该回到代码来看看具体的代码流程是怎么样的,首先回到DBimpl中的MakeRoomForWrite

Status DBImpl::MakeRoomForWrite(bool force) {  bool allow_delay = !force;  Status s;  while (true) {    if (!bg_error_.ok()) {      // Yield previous error      s = bg_error_;      break;    } else if (        allow_delay &&        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {          // 当L0的文件数量要达到阈值的时候,我们每次写入都延迟1ms,           // 这样可以为后台的compaction腾出一定的cpu(当后台compaction         //和当前线程是使用的一个内核的时候)这样可以降低写入延迟的方差          //因为延迟被分摊到多个写上面,而不是在几个甚至一个写的时候      env_->SleepForMicroseconds(1000);      allow_delay = false; // 每次写只允许延迟一次    } else if (!force &&  //当前mmetable的占用量未达到阈值               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {      break;    } else if (imm_ != NULL) {      // 上一次memtable的compaction尚未结束,等待后台compaction完成       // 因为compaction的过程为 mem ->imm 完成后删除imm      bg_cv_.Wait();    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {      // level 0的文件数量超过阈值,等待后台compaction完成      bg_cv_.Wait();    } else {      // memtable达到阈值,新生成日志和memtable,并将原先的mem转化为imm给后台compact      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);          delete log_;      delete logfile_;      logfile_ = lfile;      logfile_number_ = new_log_number;      log_ = new log::Writer(lfile);      imm_ = mem_;      has_imm_.Release_Store(imm_);      mem_ = new MemTable(internal_comparator_);      mem_->Ref();      force = false;             // Do not force another compaction if have room      MaybeScheduleCompaction(); //触发后台compaction    }  }  return s;}

MaybeScheduleCompaction函数只是简单判断后台线程是否已经启动和一些其他的错误判断,如果未启动则启动后台compaction线程。这个compaction线程的实现在DBImpl::BackgroundCall,这个函数也只是简单的调用实现了compaction实际逻辑的函数BackgroundCompaction,我们这里就来仔细分析一下这个函数

void DBImpl::BackgroundCompaction() {  if (imm_ != NULL) { //有转化的memtable,直接将MemTable写入SSTable即返回    CompactMemTable();    return;  }  if (is_manual) { //用户主动(手动)触发的compaction    ManualCompaction* m = manual_compaction_;   //取得进项compact的输入文件生成compaction类    c = versions_->CompactRange(m->level, m->begin, m->end);    m->done = (c == NULL);    if (c != NULL) {     //取得level中最大的一个key      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;    }  } else {    c = versions_->PickCompaction();  }  if (c == NULL) {  } else if (!is_manual && c->IsTrivialMove()) {  //如果不是主动触发的,并且level中的输入文件与level+1中无重叠,且与level + 2中重叠不大于  //kMaxGrandParentOverlapBytes = 10 * kTargetFileSize,直接将文件移到level+1中    c->edit()->DeleteFile(c->level(), f->number);    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,                       f->smallest, f->largest);    status = versions_->LogAndApply(c->edit(), &mutex_);    //写入version中,稍后分析  } else {//否则调用DoCompactionWork进行Compact输入文件    CompactionState* compact = new CompactionState(c);    status = DoCompactionWork(compact);    CleanupCompaction(compact);      //清理compact过程中的临时变量    c->ReleaseInputs();               //清除输入文件描述符    DeleteObsoleteFiles();            //删除无引用的文件  }  delete c;  if (is_manual) {    ManualCompaction* m = manual_compaction_;    if (!status.ok()) {//如果compaction出错,也将手动的compaction标记为done      m->done = true;    }    if (!m->done) {//如果没有完成也仅仅记录基本状态,感觉manual的形式未实现完整逻辑      m->tmp_storage = manual_end;      m->begin = &m->tmp_storage;    }    manual_compaction_ = NULL;  }}

当手动触发compaction时,具体compaction哪些文件是由 versions_->CompactRange 根据,level, begin, end来计算的,下面我们来看看这个函数的实现,看看是如何取得输入文件的

Compaction* VersionSet::CompactRange(    int level,    const InternalKey* begin,    const InternalKey* end) { //将Level-level中的range与begin,end有重叠的SSTable描述符放入inputs中  current_->GetOverlappingInputs(level, begin, end, &inputs);  if (inputs.empty()) {    return NULL;  }  // 一次不能compact过大的量,将前N个已经大于的保存下来,后面的文件描述符从inputs中移除.  const uint64_t limit = MaxFileSizeForLevel(level);   //kTargetFileSize = 2 * 1048576;  uint64_t total = 0;  for (int i = 0; i < inputs.size(); i++) {    uint64_t s = inputs[i]->file_size;    total += s;    if (total >= limit) {      inputs.resize(i + 1);      break;    }  }//new一个Compaction类  Compaction* c = new Compaction(level);  c->input_version_ = current_;  c->input_version_->Ref();  c->inputs_[0] = inputs;  SetupOtherInputs(c); //尝试加入level中新的文件,条件为不再与level+1中新的文件重叠  return c;}

可以看到在初步得到了应该compaction的文件和范围以后,代码还调用了SetupOtherInputs这个函数,他的作用是为了在不影响性能的情况下尽可能多的compaction当前level的文件

void VersionSet::SetupOtherInputs(Compaction* c) {  GetRange(c->inputs_[0], &smallest, &largest);//上一层中oeverlap的加入inputs_[1]  current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);  // 所有inputs的开始结束范围  GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);  // 看能否将level中与取出的level+1中的range重叠的也加到inputs中,  // 而新加的文件的range都在已经加入的level+1的文件的范围中  if (!c->inputs_[1].empty()) {   //取得和level+1的inputs重叠的level中的文件    current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);    const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);    const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);    const int64_t expanded0_size = TotalFileSize(expanded0);    if (expanded0.size() > c->inputs_[0].size() &&  //level中有新文件加入,所有的大小不大于阈值        inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {        //kExpandedCompactionByteSizeLimit = 25 * kTargetFileSize;      GetRange(expanded0, &new_start, &new_limit);     // 取得level+1中与新的level中的输入文件overlap的文件      current_->GetOverlappingInputs(level+1, &new_start, &new_limit,                                     &expanded1);      if (expanded1.size() == c->inputs_[1].size()) {      //如果level+1中无新的文件加入,设置为新的inputs和范围        smallest = new_start;        largest = new_limit;        c->inputs_[0] = expanded0;        c->inputs_[1] = expanded1; //这里应该是相等的,此句可以省略        GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);      }    }  }  // 取得level+2 中重叠的文件放入grandparents_  if (level + 2 < config::kNumLevels) {    current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,                                   &c->grandparents_);  }    //记录本次compact到的key,下次从这个key继续往后compact  compact_pointer_[level] = largest.Encode().ToString();  c->edit_.SetCompactPointer(level, largest);}

手动compaction时如何获取选择输入文件的逻辑就分析完了,那么leveldb满足其内部一些阈值条件后触发的compaction是如何选择输入文件的呢?这个逻辑在中,下面我们来仔细的分析一下

Compaction* VersionSet::PickCompaction() { //每次compact完成在VersionSet::Finalize中计算每个level中TotalFileSize / MaxBytesForLevel // 的值,并且将最大的值最为compaction_score_ ,和compaction_level_  const bool size_compaction = (current_->compaction_score_ >= 1);    //对于每个SSTable会有一个 允许seek的次数 (f->file_size / 16384)超过这么多次会将其设置为  const bool seek_compaction = (current_->file_to_compact_ != NULL);  // 这两种可能导致的compaction中,我们优先compact第一种情况的  if (size_compaction) {    level = current_->compaction_level_;    c = new Compaction(level);    // 查找第一个包含比上次已经compact的最大key大的key的文件    for (size_t i = 0; i < current_->files_[level].size(); i++) {      if (compact_pointer_[level].empty() ||          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {        c->inputs_[0].push_back(f);        break;      }    }    if (c->inputs_[0].empty()) {      // 如果上次已经是最大的key,那么回到第一个文件开始compact      c->inputs_[0].push_back(current_->files_[level][0]);    }  } else if (seek_compaction) {//如果是查找导致的,直接将导致compact的文件加入inputs_[0]    level = current_->file_to_compact_level_;    c = new Compaction(level);    c->inputs_[0].push_back(current_->file_to_compact_);  } else {    return NULL;  }  c->input_version_ = current_;  c->input_version_->Ref();  // 如果是level 0 则还需查找level 0中其他和输入文件重叠的文件  if (level == 0) {    GetRange(c->inputs_[0], &smallest, &largest);    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);  }  SetupOtherInputs(c); //尝试加入level中新的文件,条件为不再与level+1中新的文件重叠,这个函数已经分析  return c;}

选择好了需要进行Compaction的的文件以后,就该调用实际的Compaction过程了,我们来分析其逻辑,过程比较长但是只要仔细细心的阅读,其处理的逻辑并不复杂,主要是遍历所有输入文件,然后将相同的可以进行合并,以及删除一些无用的delete操作等。

Status DBImpl::DoCompactionWork(CompactionState* compact) {    //将snapshot相关的内容记录到compact信息中  if (snapshots_.empty()) {    compact->smallest_snapshot = versions_->LastSequence();  } else {    compact->smallest_snapshot = snapshots_.oldest()->number_;  }  //遍历所有inputs文件  Iterator* input = versions_->MakeInputIterator(compact->compaction);  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {    // 每次都判断如果有memtable 需要compact,先compact memtable    if (has_imm_.NoBarrier_Load() != NULL) {      if (imm_ != NULL) {         CompactMemTable();        bg_cv_.SignalAll(); // Wakeup 等待空间的线程      }    }    Slice key = input->key();    if (compact->compaction->ShouldStopBefore(key) &&        compact->builder != NULL) { //当前(level +1)生成的文件和level + 2中有过多的重叠      status = FinishCompactionOutputFile(compact, input); //写当前文件到磁盘      if (!status.ok()) {        break;      }    }    // Handle key/value, add to state, etc.    bool drop = false;    if (!ParseInternalKey(key, &ikey)) {      // 解码错误,清除之前的状态      current_user_key.clear();      has_current_user_key = false;      last_sequence_for_key = kMaxSequenceNumber;    } else {      if (!has_current_user_key ||          user_comparator()->Compare(ikey.user_key,                                     Slice(current_user_key)) != 0) {        // 第一次出现的key,将seq设置为最大标记新key开始        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());        has_current_user_key = true;        last_sequence_for_key = kMaxSequenceNumber;      }        //因为第一次出现会将last seq设置为最大,表示上一个key的关于seq的比较结束      if (last_sequence_for_key <= compact->smallest_snapshot) {        // Hidden by an newer entry for same user key        drop = true; // (A)      } else if (ikey.type == kTypeDeletion &&                   ikey.sequence <= compact->smallest_snapshot &&               //无snapshot引用                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) { //(1)        // For this user key:        // (1) there is no data in higher levels        // 而我们知道在底层的文件中seq会更大,正在被compact的相同的key会稍后标记这个为删除(ruleA)        drop = true;      }      last_sequence_for_key = ikey.sequence;     }    if (!drop) {      // 第一次进入compact或者上次文件刚刚写到磁盘,新建一个文件和table_builder      if (compact->builder == NULL) {        status = OpenCompactionOutputFile(compact);        if (!status.ok()) {          break;        }      }    //新文件,记录当前key 为 整个文件的smallest      if (compact->builder->NumEntries() == 0) {        compact->current_output()->smallest.DecodeFrom(key);      }      //每遍历到一个就将其记录为largest      compact->current_output()->largest.DecodeFrom(key);      compact->builder->Add(key, input->value());      // 超过level的阈值大小,将文件写到磁盘      if (compact->builder->FileSize() >= compact->compaction->MaxOutputFileSize()) {        status = FinishCompactionOutputFile(compact, input);        if (!status.ok()) {          break;        }      }    }    input->Next();  }//判断状态和将未写到磁盘的数据写入磁盘  if (status.ok() && shutting_down_.Acquire_Load()) {    status = Status::IOError("Deleting DB during compaction");  }  if (status.ok() && compact->builder != NULL) {    status = FinishCompactionOutputFile(compact, input);  }  if (status.ok()) {    status = input->status();  }  delete input;  input = NULL;  CompactionStats stats;  stats.micros = env_->NowMicros() - start_micros - imm_micros;  for (int which = 0; which < 2; which++) {//计算本次Compaction读入文件的总大小    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {      stats.bytes_read += compact->compaction->input(which, i)->file_size;    }  }  for (size_t i = 0; i < compact->outputs.size(); i++) {    stats.bytes_written += compact->outputs[i].file_size;  }//本次Compaction写出文件的总大小  mutex_.Lock();  stats_[compact->compaction->level() + 1].Add(stats);  if (status.ok()) {//记录统计信息以及将Compaction导致的文件变动记录到versionedit中    status = InstallCompactionResults(compact);  }  return status;}

SSTable的Compaction就分析完了,关于Compaction还剩下MemTable的Compaction,或者也可以将其说明为Memtable的dump为SSTable。再分析完上面的SSTable Compaction后你就发现MemTable的Compaction是如此之简单了,我们简单罗列一下

void DBImpl::CompactMemTable() {  Status s = WriteLevel0Table(imm_, &edit, base);  // Replace immutable memtable with the generated Table  if (s.ok()) {    edit.SetPrevLogNumber(0);    edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed    s = versions_->LogAndApply(&edit, &mutex_);  }  if (s.ok()) {    // Commit to the new state    imm_->Unref();    imm_ = NULL;    has_imm_.Release_Store(NULL);    DeleteObsoleteFiles();  } else {    RecordBackgroundError(s);  }}

这个逻辑中就一个主要的函数WriteLevel0Table,其流程如下:

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) {   meta.number = versions_->NewFileNumber();  pending_outputs_.insert(meta.number);  Iterator* iter = mem->NewIterator();  //新生成一个Table_builder负责写文件    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);  // Note that if file_size is zero, the file has been deleted and   // should not be added to the manifest.  int level = 0;  if (s.ok() && meta.file_size > 0) {    const Slice min_user_key = meta.smallest.user_key();    const Slice max_user_key = meta.largest.user_key();    if (base != NULL) {      /* 找到一个当层未overlap 且上册overlap 不会过多(kMaxGrandParentOverlapBytes)的层返回*/       level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);    }    //将文件加到versionedit中    edit->AddFile(level, meta.number, meta.file_size,                  meta.smallest, meta.largest);  }  CompactionStats stats;  stats.micros = env_->NowMicros() - start_micros;  stats.bytes_written = meta.file_size;  stats_[level].Add(stats);  return s;}

这里有一个唯一需要注意的是——将Memtable dump到磁盘以后并不是如文档描述的“将新的SSTable加到level 0中.”,而是会用一个函数PickLevelForMemTableOutput选择一个最高的可以将这个SSTable放入的level中。一般来说会是level 0,但是还是存在一些特殊情况可以将其放到更高的level中,这样可以降低Compaction的频率。PickLevelForMemTableOutput的逻辑简单,请读者自行阅读。

至此comaction流程相关的函数就分析完了,本节内容比较多,但是只要静下心来慢慢品读理解还是不难的。至此leveldb中剩下的还有recover,new (新建一个数据库)、snapshot、get相关的代码没有分析了。我们在compaction的分析过程中涉及到了很多有关version的类、方法、结构,leveldb的vesion是整个系统极其重要的一环,而且recovery,snapshot,get在一定程度上都会依赖于version的实现,所以接下来的文章准备对version相关的内容进行介绍。敬请期待……