LevelDB源码解析(5) WriteBatch
简介
LevelDB的官方注释是这么介绍WriteBatch的:
WriteBatch holds a collection of updates to apply atomically to a DB
如何保证原子性可能需要看完对WriteBatch的使用才能理清楚,这里只能确定一个WriteBatch对象可以包含多条更新记录(插入/删除),支持批量写入。
WriteBatch的很多操作是通过辅助类来实现的,辅助类会直接操作WriteBatch的成员变量,本文会先介绍WriteBatch的成员变量和这些辅助类,最后介绍WrteBatch的成员函数
WriteBatch的成员变量
WriteBatch只有一个成员,是个字符串类型。所有更新记录都会编码后写入到这个 rep_ 中。
std::string rep_;
rep_ 的编码格式如下:
字节数 | 8字节 | 4字节 | 变长 | 变长 | 变长 | 变长 |
---|---|---|---|---|---|---|
内容 | sequence | count | record 1 | reocrd 2 | record 3 | record … |
sequence是一个64bit的序列号,每个WriteBatch都有一个唯一序列号。
count为rep_包含的record(更新记录)数量。
count后面是record列表,每个record的编码格式相同,格式如下:
字节数 | 1字节 | 变长 | key size | 变长 | value size |
---|---|---|---|---|---|
内容 | ValueType | key size | key | value size | value |
key size和value size使用的是LevelDB的变长编码格式
WriteBatchInternal 辅助类
WriteBatchInternal没有成员变量,只有成员函数。
SetCount与Count
void WriteBatchInternal::SetCount(WriteBatch* b, int n)
{
EncodeFixed32(&b->rep_[8], n);
}
int WriteBatchInternal::Count(const WriteBatch* b)
{
return DecodeFixed32(b->rep_.data() + 8);
}
SetCount调用EncodeFixed32把输入的参数n以小端编码方式写入WriteBatch的rep_的count域。
Count调用DecodeFixed32把rep_的count域解码后返回。
SetSequence与Sequence
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq)
{
EncodeFixed64(&b->rep_[0], seq);
}
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b)
{
return SequenceNumber(DecodeFixed64(b->rep_.data()));
}
SetSequence调用EncodeFixed64把序列号seq以小端编码写入到rep_的sequence域。这里的SequenceNumber其实就是std::uint64_t类型。
Sequence函数调用DecodeFixed64把rep_的sequence域解码后返回。
SetContents与Contents、ByteSize
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents)
{
assert(contents.size() >= kHeader);
b->rep_.assign(contents.data(), contents.size());
}
static Slice WriteBatchInternal::Contents(const WriteBatch* batch){ return Slice(batch->rep_); }
static size_t WriteBatchInternal::ByteSize(const WriteBatch* batch){ return batch->rep_.size(); }
SetContent用conents覆盖了rep_。kHeader的值为12,就是sequence域和count域的总长度,这两个域是必须有的。
Contents把rep_包装成Slice对象返回。Slice在前面的文章提过,当成字符串看待就可以了。
ByteSize返回rep_的size,即rep_的字节数。
Append
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src)
{
SetCount(dst, Count(dst) + Count(src));
assert(src->rep_.size() >= kHeader);
dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}
Append是比较重要的一个函数,有两个参数,dst和src。Append把src的数据附加合并(append)到dst中。
合并后,dst的sequence域不变,dst的count域值等于dst和src的count之和。src的record列表附加合并到了dst的reocrd列表后面。
InsertInto
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable)
{
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter);
}
构造MemTableInserter对象,填充sequence和Memtable对象指针。然后调用WriteBatch的Iterate把当前batch的更新记录插入到memtable中。
MemTableInserter辅助类
MemTableInserter辅助类用于把记录插入MemTable。MemTableInserter继承了Handler,Handler是一个抽象类,只有Put和Delete两个纯虚函数。
Handler使用了模板方法设计模式,这样可以很方便地搞出不同版本的Handler,比如MemTableInserterPro等。
namespace
{
class MemTableInserter : public WriteBatch::Handler
{
public:
SequenceNumber sequence_;
MemTable* mem_;
void Put(const Slice& key, const Slice& value) override
{
mem_->Add(sequence_, kTypeValue, key, value);
sequence_++;
}
void Delete(const Slice& key) override
{
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}
};
}
笔者看到代码的时候非常困惑,为什么要写一个没有名字的namespace呢?特意去查了一下,这种叫做匿名namespace,可以让namespace内部的函数、类、对象等只能在当前文件里被访问,和C语言的static作用域类似。
成员变量
MemTableInserter包含两个成员变量,sequence_和mem_。sequence_是插入Memtable时要填写的序列号。mem_是Memtable对象指针。
Put
调用Memtable的Add函数,添加一条记录,记录类型为kTypeValue,表示这是一个插入记录。
Delete
调用Memtable的Add函数,添加一条记录,记录类型为kTypeDeletion,表示这是一个删除记录。
WriteBatch 成员函数
Put
void WriteBatch::Put(const Slice& key, const Slice& value)
{
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}
首先是把rep_的count域加1。然后把ValueType、key和value编码成一个record,写到rep_的后面。
Delete
void WriteBatch::Delete(const Slice& key)
{
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeDeletion));
PutLengthPrefixedSlice(&rep_, key);
}
LevelDB中删除元素是通过添加一条删除记录来实现的,所以删除还是添加一条记录,和Put的区别有两个,一个是record的ValueType是kTypeDeletetion,另一个是表示删除的record只有key,没有value。
Append
void WriteBatch::Append(const WriteBatch& source)
{
WriteBatchInternal::Append(this, &source);
}
调用WriteBatchInternal的Append,把source合并到当前的WriteBatch对象。
Iterate
Status WriteBatch::Iterate(Handler* handler) const
{
Slice input(rep_);
if( input.size() < kHeader )
{
return Status::Corruption("malformed WriteBatch (too small)");
}
input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while( !input.empty())
{
found++;
char tag = input[0];
input.remove_prefix(1);
switch( tag )
{
case kTypeValue:
if( GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value))
{
handler->Put(key, value);
}else
{
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion:
if( GetLengthPrefixedSlice(&input, &key))
{
handler->Delete(key);
}else
{
return Status::Corruption("bad WriteBatch Delete");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if( found != WriteBatchInternal::Count(this))
{
return Status::Corruption("WriteBatch has wrong count");
}else
{
return Status::OK();
}
}
上面的代码挺多,但是逻辑很简单,就是遍历。把WriteBatch的rep_解码,对每个record,根据ValueType域,调用handler的Put添加插入记录或者调用Delete添加删除记录。
遍历过程中和遍历完成后都会进行一些校验,如果发现数据有问题,就返回一个”Corruption”状态。如果数据ok,返回一个”OK”状态。
ApproximateSize 与 Clear
ApproximateSize返回rep_的size,Clear清除rep_的record列表,并把sequence域和count域置0。
为什么要独立出来一个WriteBatchInternal
从代码功能来看,作者把对WriteBatch的rep_的解码和编码工作都收纳到了WriteBatchInternal里,而WriteBatch只有供外部使用的封装接口。使让代码结构更加清晰。
但是有一个例外,WriteBatch的Iterate成员函数是有对rep_进行解码的。按照上面的想法,Iterate应当由WriteBatchInternal来实现,WriteBatch再调用才对。可能是笔者对这里的理解还有些偏差,欢迎在评论区发表想法。
源码版本
- 原文作者:胡刘郏
- 原文链接:https://www.huliujia.com/blog/3c240f2a7b/