LevelDB源码解析(15) 写操作之MakeRoomForWrite
背景
LevelDB每次写入key-value都是写入到内存中的Memtable中的,但是Memtable的空间不是无限的,Memtable写满后,就需要调用MakeRoomForWrite把Memtable转存为Immutable Memtable,并创建新的Memtable来存储写入数据。必要时还会调度后台线程把Immutable Memtable落盘,以及合并SST文件。
MakeRoomForWrite
Status DBImpl::MakeRoomForWrite(bool force)
{
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while ( true )
{
//Step0
if ( !bg_error_.ok())
{
//Step1
s = bg_error_;
break;
} else if ( allow_delay && versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger )
{
//Step2
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if ( !force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size))
{
//Step3
// There is room in current memtable
break;
} else if ( imm_ != nullptr )
{
//Step4
Log(options_.info_log, "Current memtable full; waiting...\n");
background_work_finished_signal_.Wait();
} else if ( versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger )
{
//Step5
Log(options_.info_log, "Too many L0 files; waiting...\n");
background_work_finished_signal_.Wait();
} else
{
//Step6
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile *lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if ( !s.ok())
{
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_; //log::Writer* log_;
delete logfile_; //WritableFile
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}
force表示是否强制把Memtable转存为Immutable Memtable。allow_delay的初始值和force相反,表示是否允许delay。如果force为true,allow_delay就是false。在Writer主流程中调用MakeRoomForWrite时,force一定为false。MakeRoomForWrite的主要逻辑都在while循环中,为了方便表述,我们把循环内部的起始位置定为Step0。
- Step1:检查后台线程是否发生错误,如果发生错误,直接返回错误Status。
- Step2:如果允许delay,且L0层文件数量超过慢写阈值(默认为8),就等待1ms,然后把allow_delay置为false,回到Step0。allow_delay在下次执行时就是false了,所以慢写最多执行一次,避免Write主流程被阻塞太久。
- Step3:如果Memtable未写满(空间占用没有超过阈值),说明当前Memtable是有空间写入的。如果force为false,即不强制转存Memtable,那么结束循环,继续使用当前的Memtable。
- Step4:能走到这一步,要么force为true,要么当前Memtable已经写满了。都需要把Memtable转为Immutable Memtable。所以检查imm_是否为null,不为null的话,就要等待后台程序先把当前Immutable Memtable落盘(落盘后imm_会被置为null)。所以要在条件变量background_work_finished_signal_上Wait。被条件变量唤醒后将回到Step0重新执行。重新执行后会再来到Step4,如果imm_为空了,就进入Step5进行判断,如果imm_不为空,那么继续Wait。
- Step5:能走到这里说明需要转存Memtable,并且Immutable Memtable已经落盘了(imm_为null)。判断L0层文件数量是否超过停写阈值(默认为12),如果超过了,就停止写入,并在background_work_finished_signal_上Wait,被唤醒后回到Step0。因为读操作需要检查L0层的所有文件,所以L0层文件数量过多会降低读的速度。Step2和Step5的目的都是为了避免L0层的文件数量过多,所以要减慢或者停止写入,给后台程序足够的时间去完成文件合并,不然L0层文件数量就会无限膨胀。
- Step6:走到这一步,说明需要转存Memtable,且imm_为null。那么就把Memtable转存为Immutable Memtable,然后创建一个新的Memtable。每个Memtable都有一个WAL日志文件,所以会为这个新的Memtable创建新的WAL日志文件。MaybeScheduleCompaction尝试调度后台线程。imm_落盘和SST文件合并都是由后台线程完成的。
MaybeScheduleCompaction
void DBImpl::MaybeScheduleCompaction()
{
mutex_.AssertHeld();
if( background_compaction_scheduled_ )
{
// Already scheduled
}else if( shutting_down_.load(std::memory_order_acquire))
{
// DB is being deleted; no more background compactions
}else if( !bg_error_.ok())
{
// Already got an error; no more changes
}else if( imm_ == nullptr && manual_compaction_ == nullptr && !versions_->NeedsCompaction())
{
// No work to be done
}else
{
background_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
}
先检查线程是否已经被调度了,如果已经被调度了,就直接退出。如果DB已经被关闭,那么就不调度了。如果后台线程出错,也不调度。
前面几个都是常规检查,第三个判断是关键逻辑。只有三个子条件都会true,才会直接返回,只要任意一个子条件为false,就会走到else语句调度后台线程。即:
- 如果imm_不为空,调度
- 如果设置了手动合并,调度
- 如果版本系统认为需要合并,调度。
void DBImpl::BGWork(void* db)
{
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}
任务队列池存储的是BGWork函数指针,参数是DBImpl对象。执行任务也就是执行BGWork,而BGWork实际执行的是BackgroundCall。
BackgroundCall
void DBImpl::BackgroundCall()
{
MutexLock l(&mutex_);
assert(background_compaction_scheduled_);
if( shutting_down_.load(std::memory_order_acquire))
{
// No more background work when shutting down.
}else if( !bg_error_.ok())
{
// No more background work after a background error.
}else
{
BackgroundCompaction();
}
background_compaction_scheduled_ = false;
// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
MaybeScheduleCompaction();
background_work_finished_signal_.SignalAll();
}
BackgroundCall先判断DB是否被关闭,后台线程有无错误,如果没有问题,就调用BackgroundCompaction完成后台合并(详见LevelDB源码解析(13) BackgroundCompaction SST文件合并),完成BackgroundCompaction后把background_compaction_scheduled_置为false,允许其他线程发起新的调度。然后再调用MaybeScheduleCompaction检查是否需要再次调度。最后唤醒等待条件变量的线程。比如在MakeRoomForWrite中Wait的线程。
后台线程是不存在并发的,同一时刻只会有一个后台线程在执行。后台线程和Write线程存在并发竞争,所以在关键区域要使用成员变量mutex_加锁。
- 原文作者:胡刘郏
- 原文链接:https://www.huliujia.com/blog/056e5fe63f/