首页 > 代码库 > leveldb源码分析--WriteBatch
leveldb源码分析--WriteBatch
从【leveldb源码分析--插入删除流程】和WriteBatch其名我们就很轻易的知道,这个是leveldb内部的一个批量写的结构,在leveldb为了提高插入和删除的效率,在其插入过程中都采用了批量集合相邻的多个具有相同同步设置的写请求以批量的方式进行写入。
其成员变量仅包含了一个 std::string 类型的 rep_变量,其Put和Delete(其实也是插入删除操作,而非删除Put进去的数据,或者你可以将其理解为Put Delete operation的过度简写)都将相应的操作Encode后存入其中。我们来看看其encode的格式
WriteBatch::rep_ := [ sequence: fixed64 | count: fixed32 | data: record[count ] record := [ kTypeValue | key(varint32 | data) | value(varint32 | data) ]
我们首先看一下WriteBatch内部相关的一些结构和成员
class Handler { public: virtual ~Handler(); virtual void Put(const Slice& key, const Slice& value) = 0; virtual void Delete(const Slice& key) = 0; }; Status Iterate(Handler* handler) const; friend class WriteBatchInternal;
WriteBatchInternal主要是对WriteBatch的内部解码编码的封装,简化WriteBatch的结构。而Handler接口我们可以看到MemTableInserter成为了其一个实现,这个handler是在WriteBatch的内部解码遍历过程中逐个调用Handler对应的Put、Delete到MemTable中。所以再回头分析DBImpl::BuildBatchGroup方法。
// REQUIRES: First writer must have a non-NULL batchWriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { Writer* first = writers_.front(); WriteBatch* result = first->batch;
size_t size = WriteBatchInternal::ByteSize(first->batch); /* 这里主要是设置每次批量写入的最大的数据量,防止一次插入过多数据导致等待写完成的时间过长 因为从Write的逻辑分析中我们知道只有位于队列首的写线程会去批量组装然后执行真正的插入,
其他的线程都是在等待这个批量写的完成。 */
size_t max_size = 1 << 20; if (size <= (128<<10)) { max_size = size + (128<<10); } *last_writer = first; std::deque<Writer*>::iterator iter = writers_.begin(); ++iter; // 跳过 "first" for (; iter != writers_.end(); ++iter) { Writer* w = *iter; if (w->sync && !first->sync) { // 只组合相邻的sync设置相同的操作到一批进行处理. break; } if (w->batch != NULL) { size += WriteBatchInternal::ByteSize(w->batch); if (size > max_size) { // 如果总大小超过设置的现在大小,不再继续组装过程,跳出循环执行已组装的请求 break; } // 第一次进入循环,把tmp_batch_赋给result并把第一个放入其中,后继的都是append到tmp_batch_中
最后return出去进行操作的也是这个tmp_batch_ if (result == first->batch) { result = tmp_batch_; assert(WriteBatchInternal::Count(result) == 0); WriteBatchInternal::Append(result, first->batch); } WriteBatchInternal::Append(result, w->batch); } *last_writer = w; } return result;}
接下来对该batch进行操作的是WriteBatchInternal::InsertInto(updates, mem_)这个调用,这就是前面提到的采用了一个Iterator的方式解码遍历操作batch中的数据
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { MemTableInserter inserter; inserter.sequence_ = WriteBatchInternal::Sequence(b); inserter.mem_ = memtable; return b->Iterate(&inserter);}
Status WriteBatch::Iterate(Handler* handler) const {
while (!input.empty()) { switch (tag) { case kTypeValue: handler->Put(key, value); break; case kTypeDeletion: handler->Delete(key);
break; } } return Status::OK();}
这里我们再回顾一下Write有
while (!w.done && &w != writers_.front()) {
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。