首页 > 代码库 > leveldb源码分析—Recover和Repair

leveldb源码分析—Recover和Repair

 

leveldb作为一个KV存储引擎将数据持久化到磁盘,而对于一个存储引擎来说在存储过程中因为一些其他原因导致程序down掉甚至数据文件被破坏等都会导致程序不能按正常流程再次启动。那么遇到这些状况以后如何使程序最大程度的恢复数据就是非常重要的一项工作,leveldb也提供了这方面的工作。

 

首先来看recover,这是每一次启动数据库的时候都会呗调用到的流程。其功能是恢复数据库在运行中突然因为某些原因down掉而这个时候leveldb中的丢失的当前状态,以及memtable甚至immtable中还未持久化到SSTable中的数据。我们知道leveldb采用的是WAL的方式进行的,那么对于leveldb中的当前状态存储在manifest文件中,通过读取已经持久化的状态,然后在结合WAL总的信息就可以恢复到最新状态;而memtable和immtable中的数据恢复的主要就是将未持久化到SSTable的数据从Log中读取出来重做到memtable或者SSTable即可。整个恢复的流程如下:

Status DBImpl::Recover(VersionEdit* edit) {    env_->CreateDir(dbname_);    Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);    if (!env_->FileExists(CurrentFileName(dbname_))) {        if (options_.create_if_missing) {            s = NewDB(); //生成全新的manifest和current文件            return s;        }     }    s = versions_->Recover();                   // 恢复当前version信息    s = env_->GetChildren(dbname_, &filenames); // 获取文件列表    versions_->AddLiveFiles(&expected);    for (size_t i = 0; i < filenames.size(); i++) {        if (ParseFileName(filenames[i], &number, &type)) {            expected.erase(number); //删除存在的文件            if (type == kLogFile && ((number >= min_log) || (number == prev_log)))                logs.push_back(number); //存储当前已有的日志文件        }    }    if (!expected.empty()) { //如果文件缺失        return Status::Corruption(buf);    }    std::sort(logs.begin(), logs.end()); //排序日志文件    for (size_t i = 0; i < logs.size(); i++) {        s = RecoverLogFile(logs[i], edit, &max_sequence); //重做日志操作        versions_->MarkFileNumberUsed(logs[i]);     }    if (s.ok()) {        if (versions_->LastSequence() < max_sequence) {            versions_->SetLastSequence(max_sequence);        }    }}

 

下面我们首先来详细看看从manifest文件中恢复version信息的流程

Status VersionSet::Recover() {    // Read "CURRENT" file, which contains a pointer to the current manifest file    Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);    // 生成当前manifest的文件名    std::string dscname = dbname_ + "/" + current;    SequentialFile* file;    s = env_->NewSequentialFile(dscname, &file);    if (!s.ok()) {        return s;    }    // 各种初始化    {        // 逐个读取每个versionedit,并重建、记录响应的log_num等        while (reader.ReadRecord(&record, &scratch) && s.ok()) {            s = edit.DecodeFrom(record);            if (s.ok()) {                if (edit.has_comparator_ &&                        edit.comparator_ != icmp_.user_comparator()->Name()) {                }            }            if (s.ok()) {                builder.Apply(&edit);            }            if (edit.has_log_number_) {                log_number = edit.log_number_;                have_log_number = true;            }            if (edit.has_prev_log_number_) {                prev_log_number = edit.prev_log_number_;                have_prev_log_number = true;            }            if (edit.has_next_file_number_) {                next_file = edit.next_file_number_;                have_next_file = true;            }            if (edit.has_last_sequence_) {                last_sequence = edit.last_sequence_;                have_last_sequence = true;            }        }    }    delete file;    file = NULL;    if (s.ok()) {        //恢复时获取到的num,seq等的判断和处理        MarkFileNumberUsed(prev_log_number);        MarkFileNumberUsed(log_number);    }    if (s.ok()) {        Version* v = new Version(this);        builder.SaveTo(v); // 存储一个全量的version        // 计算Compaction相关的数据        Finalize(v);        AppendVersion(v); // 将version加入version链表中,然后根据恢复得到的信息维护当前version的信息        manifest_file_number_ = next_file;        next_file_number_ = next_file + 1;        last_sequence_ = last_sequence;        log_number_ = log_number;        prev_log_number_ = prev_log_number;    }    return s;}

 

 

恢复了当前version的基本信息以后进行日志操作就可以恢复内存中的数据了,重做日志操作即是将日志中记录的操作读取出来,然后再将读取到的操作重新写入到leveldb中

Status DBImpl::RecoverLogFile(uint64_t log_number,        VersionEdit* edit,        SequenceNumber* max_sequence) {    while (reader.ReadRecord(&record, &scratch) &&            status.ok()) {        if (record.size() < 12) ; //size不对        WriteBatchInternal::SetContents(&batch, record);        status = WriteBatchInternal::InsertInto(&batch, mem);//插入        if (last_seq > *max_sequence) {            *max_sequence = last_seq;        }        if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {            status = WriteLevel0Table(mem, edit, NULL); //写成SSTable        }    }    if (status.ok() && mem != NULL) {        status = WriteLevel0Table(mem, edit, NULL);        //写成SSTable    }    return status;}

 

recover是leveldb中较为轻量级的恢复,主要是恢复leveldb未持久化到磁盘的数据和状态,每次启动时都会被调用到。那么如果我们的数据库受到了更大程度的伤害呢?比如SSTable文件损坏甚至丢失,manifest文件丢失。那么这个时候就必须使用下面的repair了,这个是当leveldb不能正常启动的时候需要手动进行的。

repair主要包含了如下几个流程来完成数据的修复:

FindFiles();               // 遍历数据目录下的文件解析文件名至manifest,日志,SSTable   ConvertLogFilesToTables(); // 解析日志文件生成为SSTble,主要用到了ConvertLogToTable,    ExtractMetaData();        /* 逐个遍历解析扫描到的SSTable并重新manifest的filemeta信息以供后面使用,
                            如果解析过程中有不能解析的数据,丢弃不能解析的数据并生成新的SSTable     */    WriteDescriptor();       // 重置manifest文件号为1并生成最新的manifest,将其记录到current文件

 

ConvertLogFilesToTables主要用了一个和RecoverLogFile函数类似的ConvertLogToTable函数,他们的主要区别在RecoverLogFile在转化日志操作的时候使用的是leveldb的全局环境memtable和immtable,所以recover的数据有可能会持久化为SSTable,而有一部分则会留在内存中的memtable中;而ConvertLogToTable则是自己新建了一个局部的memtable来进行操作,然后将数据持久化为SSTable,这个过程从log恢复的数据一定会持久化为SSTable。

然后我们着重看下ExtractMetaData函数对扫描到的SSTable文件的逐个进行ScanTable的流程:

void ScanTable(uint64_t number) {    std::string fname = TableFileName(dbname_, number); //生成ldb文件名    Status status = env_->GetFileSize(fname, &t.meta.file_size); //获取文件大小,否则尝试sst文件后缀名    if (!status.ok()) {        // 略,和上面类似    }    if (!status.ok()) { // 如果都不能获取到文件大小,这个文件恢复失败,将其放入lost下        ArchiveFile(TableFileName(dbname_, number));        ArchiveFile(SSTTableFileName(dbname_, number));        return;    }    // 遍历文件获得metadata信息.    Iterator* iter = NewTableIterator(t.meta);    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {        Slice key = iter->key();        if (!ParseInternalKey(key, &parsed)) {            continue;        }        counter++;        if (empty) { // 第一个key记录问文件最小key            empty = false;            t.meta.smallest.DecodeFrom(key);        } //否则替换上一次设置的最大key,这样遍历到最后一个时就保留的是最后的一个        t.meta.largest.DecodeFrom(key);        if (parsed.sequence > t.max_sequence) {            t.max_sequence = parsed.sequence;        }    }    // 如果遍历过程中解析key无失败,直接存储metadata,否则遍历SSTable生成新的SSTable,并删除旧的    if (status.ok()) {        tables_.push_back(t);    } else {        RepairTable(fname, t); // RepairTable archives input file.    }}

 

如果数据有损坏,那么就必须重建SSTable,其过程如下:

 

void RepairTable(const std::string& src, TableInfo t) {    // 遍历SSTable生成新的SSTable,并删除旧的    // 生成新文件    std::string copy = TableFileName(dbname_, next_file_number_++);    WritableFile* file;    Status s = env_->NewWritableFile(copy, &file);    if (!s.ok()) {        return;    }    TableBuilder* builder = new TableBuilder(options_, file);    // Copy data.    Iterator* iter = NewTableIterator(t.meta);    int counter = 0;    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {        builder->Add(iter->key(), iter->value());        counter++;    }    delete iter;    ArchiveFile(src); //将旧文件移到lost下    if (counter == 0) {        builder->Abandon(); // Nothing to save    } else {        s = builder->Finish();        if (s.ok()) {            t.meta.file_size = builder->FileSize();        }    }    delete builder;    builder = NULL;    if (s.ok()) {        s = file->Close();    }    delete file;    file = NULL;    if (counter > 0 && s.ok()) {        std::string orig = TableFileName(dbname_, t.meta.number);        s = env_->RenameFile(copy, orig);        if (s.ok()) {            tables_.push_back(t);        }    }    if (!s.ok()) {        env_->DeleteFile(copy);    }}

 

 

在recover和repair的过程中我们发现了version的维护和管理十分重要,原本不打算再对这部分进行分析,但是现在看来对其进行一下细致的梳理还是很有必要的,预计下一篇专门介绍这部分内容。