背景

写任务合并是由函数BuildBatchGroup来完成的,BuildBatchGroup会把任务队列writers_中若干个Writer的数据合并到一起,从而减少磁盘写次数,提高写性能。虽然每次写入的时候是往Memtable里写,但是为了在故障发生时保证数据完整性,每次写入都需要先预写日志(WAL)并落盘,所以每次写入都要写一次磁盘。

如果调用方是串行写入key-value的,那么不会有合并操作,只有当调用方是并发写入时,才会出现任务合并(只有这种情况下,任务队列才可能同时存在多个任务)。每次执行BuildBatchGroup时,都会尝试把队列中还没有完成的任务进行合并。

合并涉及到WriteBatch的一些接口,这在之前的文章WriteBatch中介绍过,本文涉及到WriteBatch的三个接口。其中ByteSize、Count分别返回WriteBatch使用的字节数和存储的key-value个数。Append把第二个WriteBatch合并到第一个WriteBatch中。

写任务合并

任务队列存储在writers_中

std::deque<Writer*> writers_;

writers_不能并发访问,使用mutex来保护,所以调用BuildBatchGroup时,要保证已经持有mutex了。

BuildBatchGroup代码如下:

// 源码文件:db/db_impl.cc
WriteBatch *DBImpl::BuildBatchGroup(Writer **last_writer)
{
  //part1 begin
  mutex_.AssertHeld();
  assert(!writers_.empty());

  // first分别指向第一个Writer和第一个Writer的WriteBatch
  Writer *first = writers_.front();
  WriteBatch *result = first->batch;
  assert(result != nullptr);

  size_t size = WriteBatchInternal::ByteSize(first->batch);
  size_t max_size = 1 << 20;
  // 如果first batch的size小于等于2^17,就调低max_size
  if ( size <= (128 << 10))
  {
    max_size = size + (128 << 10);
  }
  //part1 end
  
  //part2 begin
  *last_writer = first;
  std::deque<Writer *>::iterator iter = writers_.begin();
  ++iter;
  for ( ; iter != writers_.end(); ++iter )
  {
    Writer *w = *iter;
    if ( w->sync && !first->sync )
    {
      break;
    }
    if ( w->batch != nullptr )
    {
      size += WriteBatchInternal::ByteSize(w->batch);
      if ( size > max_size )
      {
        break;
      }
      //part3 begin
      if ( result == first->batch )
      {
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      //part3 end
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
  //part2 end
}

我们分为三个Part来阐述。

Part1

AssertHeld是leveldb基于编译器的线程安全注解(thread safety annotations)实现的一个断言函数,这个是给编译器看的,如果编译器发现执行这里之前没有加锁,就会报错。

max_size会根据第一个Writer的size来调整,如果size比较小,就把max_size调低为size + (128<<10),作者解释这里是为了耗时考虑,避免小size的写操作被延迟太久。猜测作者这样考虑的原因是通常一个数据库的key-value的size都是比较类似的,如果第一个batch的size就比较小,那么后续的batch大概率也很小。如果还是按照原来的max_size来实现,可能一次合并大量的写操作。虽然这样吞吐量上去了,但是写操作的延时就上升了。这里是在吞吐量和延时上做的一个平衡。

Part2

从前往后遍历任务队列writers_,把Writer的WriteBatch合并到result中。以下三种情况会提前终止合并:

  1. 当前的Writer要求Sync,而第一个Writer不要求Sync,两个的磁盘写入策略不一致。
  2. result的size已经超过max_size。
  3. writers_已经遍历完成。

Part3

part3属于part2,如果result == first->batch,就执行下面的逻辑:

  result = tmp_batch_;
  assert(WriteBatchInternal::Count(result) == 0);
  WriteBatchInternal::Append(result, first->batch);

让reuslt指向tmp_batch_,并把first->batch合并到result中,这段逻辑只要是writers_中有任意一个(不包含第一个)可以合并的Writer就会执行且只执行一次,反之不会执行。写简单一点的话,这部分逻辑可以放到循环体的外面,直接把第一个Writer合并到result中。为什么没有这样做呢?因为可能出现没有Writer可以合并的情况,即最终结果只包含第一个Writer。这种情况下作者会直接返回first ->batch,一次合并操作都不需要执行,性能高一点。

tmp_batch_是合并后的batch的缓存区,外部完成写入和落盘后要把tmp_batch_清空,以便下次再用。

总结

BuildBatchGroup把任务队列writers_中的Writer合并,并返回合并后的WriteBatch。如果没有进行合并,会返回第一个Writer的batch,如果有合并就返回tmp_batch_,合并结果放在tmp_batch_里面。

源码版本

https://github.com/google/leveldb/releases/tag/1.22