背景

LevelDB的名字中的Level表示文件是分层存储的,默认最多7层,每层有0到N个文件。除了L0层,每层内部的文件之间都是有序的(所谓Sorted String Table),互斥的(相互之间的key范围没有重叠)。而L0层的文件都是从Memtable直接落盘的,所以层内的文件之间是无序的,不互斥的。不同层之间也是没有互斥关系的,查找时如果在L(x)层没有找到,就会进入L(x+1)层查找。

前面的文章中说过一个新的SST文件可能放到L0、L1或者L2。随着落盘的Memtable数量越来越多,L0~L2的文件数量也越来越多,所以需要一个机制把低层的文件合并到高层去,这就是我们今天说的SST文件合并了。

LevelDB分层

LevelDB每层的单个文件最大size都是一样的,通过限制不同层的文件总size来控制每层的文件数量,L1层的最大总size为1M,其他层都是低一层的10倍,默认的分层size如下表:

L1 L2 L3 L4 L5 L6
1M 10M 100M 1000M 10000M 100000M

L0层是通过文件数量来限制的,涉及到几个不同的限制程度,达到的限制程度越高,对Memtable落盘影响越大,默认达到4个就进行合并,达到12个,就暂停Memtable落盘。L6层是最高层,L6层的size其实是没有限制的,因为就算超了,也不可能继续往上合并了。

合并一定是发生在相邻两层之间的,假设是level层和level+1层,合并是先在level层选择要合并的文件,然后去level+1层查找和选中的文件有key范围重叠的文件,最后把level层和level+1层选中的文件合并到一起,合并一般会产生多个有序的、互斥的文件,这些文件会被放到level+1层,被合并的文件会被删除。

合并是由DBImpl::BackgroundCompaction来完成的,我们从这里切入来分析合并过程。

DBImpl::BackgroundCompaction 合并入口

代码如下,删除了和手动合并相关的代码, 手动合并只有使用工具调用时才有可能发生,levelDB正常运行时不会出现手动合并,所以这里忽略了。

void DBImpl::BackgroundCompaction()
{
  mutex_.AssertHeld();
  //Step1
  if ( imm_ != nullptr )
  {
    CompactMemTable();
    return;
  }

  //Step2
  Compaction *c;
  c = versions_->PickCompaction();
  
  
  //Step3
  Status status;
  if ( c == nullptr )
  {
    // Nothing to do
  } else if ( c->IsTrivialMove())
  {
    //Step3.1
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData *f = c->input(0, 0);
    c->edit()->RemoveFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if ( !status.ok())
    {
      RecordBackgroundError(status);
    }
  } else
  {
    //Step3.2
    CompactionState *compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    if ( !status.ok())
    {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    RemoveObsoleteFiles();
  }
  delete c;
  
  if ( status.ok())
  {
    // Done
  } else if ( shutting_down_.load(std::memory_order_acquire))
  {
    // Ignore compaction errors found during shutting down
  } else
  {
    Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
  }
}

我们分为三个步骤来讲述

Step1:如果imm_不为null,即有Immutable Memtable,那么先完成imm_的落盘,CompactMemTable会把imm_转为一个SST文件,放到L0~L2中的一层,详细逻辑请见LevelDB源码解析(12) Memtable落盘

Step2:选择要合并的level,以及level层和level+1层要合并的文件,结果放到Compaction中。

Step3:如果c为null,说明不需要合并。反之调用IsTrivialMove(后面有介绍)判断是否可以直接把level层的文件移动到level+1层,如果返回True,进入Step3.1,如果返回False,进入Step3.2。

Step3.1:只有当Step2中选择的level层文件只有一个,选择的level+1层文件为0个,且唯一的文件和level+2层有key范围重叠的文件总size小于阈值,IsTrivialMove才会返回True。此时可以直接把level层的文件移动到level+1层。level+1层的文件为0,说明选中的level层文件和level+1层的文件都没有重叠,所以提升后,level+1层的文件之间仍然是有序且互斥的。

Step3.2:走正常的合并流程,调用DoCompactionWork完成文件合并,这个函数一会儿详细说。

删除旧文件、增加新文件和移动文件到上一层等操作,都会改变levelDB的状态,而MANIFEST文件保存了LevelDB当前的状态,包括有哪些文件、每层的合并状态等,所以无论是自动合并还是手动合并都需要更新MANIFEST中保存的版本信息,同样也要更新内存中的VersionSet。有空的话会详细讲一下LevelDB的版本控制,这里可以先理解为合并后会让LevelDB产生一个新的Version。

VersionSet::PickCompaction

PickCompaction负责挑选要执行合并的level,并在level层和level+1层挑选要合并的文件。

Compaction* VersionSet::PickCompaction()
{
  Compaction* c;
  int level;

  const bool size_compaction = (current_->compaction_score_ >= 1);
  const bool seek_compaction = (current_->file_to_compact_ != nullptr);
  if( size_compaction )
  {
    level = current_->compaction_level_;
    assert(level >= 0);
    assert(level + 1 < config::kNumLevels);
    c = new Compaction(options_, level);

    for( size_t i = 0; i < current_->files_[level].size(); i++ )
    {
      FileMetaData* f = current_->files_[level][i];
      if( compact_pointer_[level].empty() ||
          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0 )
      {
        c->inputs_[0].push_back(f);
        break;
      }
    }
    if( c->inputs_[0].empty())
    {
      c->inputs_[0].push_back(current_->files_[level][0]);
    }
  }else if( seek_compaction )
  {
    level = current_->file_to_compact_level_;
    c = new Compaction(options_, level);
    c->inputs_[0].push_back(current_->file_to_compact_);
  }else
  {
    return nullptr;
  }

  c->input_version_ = current_;
  c->input_version_->Ref();

  if( level == 0 )
  {
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

  SetupOtherInputs(c);
  return c;
}

选择level和level层的文件

第一步是确定level,即要在哪一层执行合并,以及level层的文件。第二步是选择level+1层的文件。第一步有size_compaction和seek_compaction两种挑选机制,优先size_compaction,size_compaction未触发才会检查seek_compaction。

size_compaction

compact_pointer_[level]保存了level层上次被合并的文件的largest key,从level层文件中选出largest_key最小的文件,使其大于compact_pointer[level],如果compact_pointer_[level]为空,或者level层没有文件的larget_key比compact_pointe_[level]大,就选择第一个文件,从头开始。这个机制保证了level层所有文件都有均等的机会被合并,避免了一直合并头部的文件,导致后面的文件没有机会被合并。

seek_compaction

level = current_->file_to_compact_level_;
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->file_to_compact_);

逻辑简单直接,要合并的level是file_to_compact_level_,要合并的level层文件是file_to_compact_。这两个值是读操作的时候赋值的,当一个文件seek miss的次数超过阈值,就会把这个文件和所在层设置为需要合并。这里推测作者的想法是查询的key在这个文件范围内,但是却不在这个文件里,每次到这里都需要多读一次磁盘,那么把这个文件往高一层合并,可以避免在当前层无效地读文件。

L0层特殊处理

因为L0层的文件之间是有重叠的,所以会递归的把所有和选中文件有重叠的文件也加入到被合并的文件列表中。所谓的递归,是指如果有新文件加入到列表中,对新文件,也要递归的去查找和新文件有重叠的文件。

选择level+1层文件

上面选择level层要合并的文件,还需要找到和选中的level层文件有重叠的level+1层文件。

level+1层的选择逻辑在SetupOtherInputs中,前面选择的level层文件保存在inputs_[0]中,level+1层文件将保存到inputs_[1]中,下面我们分成3个步骤来说。

void VersionSet::SetupOtherInputs(Compaction* c)
{
  //Step1
  const int level = c->level();
  InternalKey smallest, largest;
  AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
  GetRange(c->inputs_[0], &smallest, &largest);
  current_->GetOverlappingInputs(level + 1, &smallest, &largest, &c->inputs_[1]);
  InternalKey all_start, all_limit;
  GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

  //Step2
  if( !c->inputs_[1].empty())
  {
    std::vector<FileMetaData*> expanded0;
    current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
    AddBoundaryInputs(icmp_, current_->files_[level], &expanded0);

    const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
    const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
    const int64_t expanded0_size = TotalFileSize(expanded0);
    if( expanded0.size() > c->inputs_[0].size() &&
        inputs1_size + expanded0_size < ExpandedCompactionByteSizeLimit(options_))
    {
      InternalKey new_start, new_limit;
      GetRange(expanded0, &new_start, &new_limit);
      std::vector<FileMetaData*> expanded1;
      current_->GetOverlappingInputs(level + 1, &new_start, &new_limit, &expanded1);
      if( expanded1.size() == c->inputs_[1].size())
      {
        Log(options_->info_log, "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
            level, int(c->inputs_[0].size()), int(c->inputs_[1].size()), long(inputs0_size),
            long(inputs1_size), int(expanded0.size()), int(expanded1.size()),
            long(expanded0_size), long(inputs1_size));
        smallest = new_start;
        largest = new_limit;
        c->inputs_[0] = expanded0;
        c->inputs_[1] = expanded1;
        GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
      }
    }
  }

  //Step3
  if( level + 2 < config::kNumLevels )
  {
    current_->GetOverlappingInputs(level + 2, &all_start, &all_limit, &c->grandparents_);
  }
  compact_pointer_[level] = largest.Encode().ToString();
  c->edit_.SetCompactPointer(level, largest);
}

Step1:调用AddBoundaryInputs检查inputs_[0]中的文件input_file和level层的文件level_file,找到所有满足以下两个条件的input_file和level_file:

  • input_file.largest_key < level_file.smallest_key
  • input_file.largest_key.user_key == level_file.smallest_key.user_key

SST文件中存储的key是internal key,由user key + seq + type 组成。user key相同时,seq越大,则internal key越小。两个user key相同的internal key,seq大的key有效性高于seq小的key,所以此时input_file的largest_key的有效性高于level_file的smallest_key。如果把input_file合并到了level+1层,而把level_file留在了level层。那么下次查找这个user key就会在level层找到level_file中已经无效地user key。所以要把level_file也合并到level+1层中。新加入的文件同样可能存在上述情况,所以要递归地去添加level层文件。

GetRange返回一组文件中的最小key(smallest)和最大key(largest),调用GetOverlappingInputs检查level+1层中和[smallest,largest]范围重叠的文件,放到inputs_[1]中。这些文件就是和要合并的level层文件有重叠的文件了,也就是本次合并涉及到的level+1层文件。GetRange2返回两组文件共同的最小key(all_start)和最大key(all_limit)。

Step2:其实Step1的数据就已经够合并了,但是作者 want more [狗头.jpg],所以这里会检查是否能在不增加level+1层要合并的文件数量的同时,增加level层选中的文件数量。

  • Step2.1:先查找level层和[all_start,all_limit]有重叠的文件集合,显然这个集合一定会是inputs_[0]的超集。
  • Step2.2:和Step1类似,调用AddBoundaryInputs检查level层是否有更多的文件需要被加入。
  • Step2.3:计算inputs0_size、inputs1_size和expanded0_size。分别表示原level层选中文件总size,level+1层选中文件总size,level层扩展后的文件总size。
  • Step2.4:如果level层扩展后选中的文件数量超过原level层选中的文件数量,即有增加文件。且inputs1_size和expanded0_size之和没有超过扩展size阈值,那么就进入Step2.5,反之结束扩展尝试。
  • Step2.5:检查扩展后,level+1层是否需要新增文件,如果不需要,那么level层使用扩展文件集合,并更新all_start和all_limit。如果需要新增文件,就不适用扩展集合。

Step3

  • 统计level+2层中和[all_start,all_limit]重合的文件,放到Compaction的grandparents_中,这个在IsTrivialMove中会被用于判断是否可以直接把level层文件升到level+1层。
  • 更新compact_pointer_[level]的值为本次选中的level层文件的最大key(largest),并把更新信息写到VersionEdit中。

合并 DoCompactionWork

PickCompaction负责挑选要合并的SST文件,而合并操作由DoCompactionWork来完成。

Status DBImpl::DoCompactionWork(CompactionState *compact)
{
  //Step1
  if ( snapshots_.empty())
  {
    compact->smallest_snapshot = versions_->LastSequence();
  } else
  {
    compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
  }

  //Step2
  Iterator *input = versions_->MakeInputIterator(compact->compaction);
  mutex_.Unlock();

  //Step3
  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  while ( input->Valid() && !shutting_down_.load(std::memory_order_acquire))
  {
    //Step3.1
    if ( has_imm_.load(std::memory_order_relaxed))
    {
      const uint64_t imm_start = env_->NowMicros();
      mutex_.Lock();
      if ( imm_ != nullptr )
      {
        CompactMemTable();
        background_work_finished_signal_.SignalAll();
      }
      mutex_.Unlock();
      imm_micros += (env_->NowMicros() - imm_start);
    }

    //Step3.2
    Slice key = input->key();
    if ( compact->compaction->ShouldStopBefore(key) && compact->builder != nullptr )
    {
      status = FinishCompactionOutputFile(compact, input);
      if ( !status.ok())
      {
        break;
      }
    }

    //Step3.3
    bool drop = false;
    if ( !ParseInternalKey(key, &ikey))
    {
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else
    {
      if ( !has_current_user_key ||
           user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != 0 )
      {
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
      }

      if ( last_sequence_for_key <= compact->smallest_snapshot )
      {
        drop = true;
      } else if ( ikey.type == kTypeDeletion && ikey.sequence <= compact->smallest_snapshot &&
                  compact->compaction->IsBaseLevelForKey(ikey.user_key))
      {
        drop = true;
      }

      last_sequence_for_key = ikey.sequence;
    }

    //Step 3.4
    if ( !drop )
    {
      // Open output file if necessary
      if ( compact->builder == nullptr )
      {
        status = OpenCompactionOutputFile(compact);
        if ( !status.ok())
        {
          break;
        }
      }
      if ( compact->builder->NumEntries() == 0 )
      {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      compact->current_output()->largest.DecodeFrom(key);
      compact->builder->Add(key, input->value());

      if ( compact->builder->FileSize() >= compact->compaction->MaxOutputFileSize())
      {
        status = FinishCompactionOutputFile(compact, input);
        if ( !status.ok())
        {
          break;
        }
      }
    }

    input->Next();
  }

  //Step4
  if ( status.ok() && shutting_down_.load(std::memory_order_acquire))
  {
    status = Status::IOError("Deleting DB during compaction");
  }
  if ( status.ok() && compact->builder != nullptr )
  {
    status = FinishCompactionOutputFile(compact, input);
  }
  if ( status.ok())
  {
    status = input->status();
  }
  delete input;
  input = nullptr;

  //Step5
  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  for ( int which = 0; which < 2; which++ )
  {
    for ( int i = 0; i < compact->compaction->num_input_files(which); i++ )
    {
      stats.bytes_read += compact->compaction->input(which, i)->file_size;
    }
  }
  for ( size_t i = 0; i < compact->outputs.size(); i++ )
  {
    stats.bytes_written += compact->outputs[i].file_size;
  }
  mutex_.Lock();
  stats_[compact->compaction->level() + 1].Add(stats);

  if ( status.ok())
  {
    status = InstallCompactionResults(compact);
  }
  if ( !status.ok())
  {
    RecordBackgroundError(status);
  }
  VersionSet::LevelSummaryStorage tmp;
  return status;
}

我们分成5个步骤来分析。

Step1:获取最旧的仍然在被使用的快照(snapshot)。所谓快照,指的是版本号,即某个sequence。一个快照表示插入该sequence的key后的瞬时状态,不受后续写入key的影响。用户读取时,如果指定快照,那么对用户来说,整个数据库始终处于该快照的状态,后续写入key或者删除key都不会影响用户读到的结果。

这里检查snapshots_列表,如果为空,就把smallest_snapshot置为最新的sequence,如果snapshots_不为空,就把smallest_snapshot置为最老的snapshot。smallest_snapshot在后面将被用于判断重复的user key是否可以删除。

Step2:这里只有两行,但是逻辑一点都不少。前面PickCompaction把要合并的level层和level+1层文件保存在了compact->compaction中,每个SST文件内部都是有序的,现在合并这些SST文件其实就是合并多个有序列表,那么每次都要挑选最小的key加入。MakeInputIterator是对这个逻辑封装,返回的Iterator,每次调用Next返回下一个最小的key。对后续的逻辑来说就是用这个Iterator按顺序取key,生成新的SST文件。MakeInputIterator其实已经是读逻辑的范畴了,这个等以后讲到读逻辑的时候再说吧,这里知道返回的Iterator的作用就行了。

Step3:把Iterator指向第一个key,然后初始化一些字段。current_user_key记录上一个key,初始为空。has_current_user_key表示是否有上一个key,初始为false。last_sequence_for_key记录上一个key的sequence,初始为kMaxSequenceNumber,此时表示没有上一个key。

Step3.1:如果imm_(即Immutable Memtable)不为空,就先把imm_落盘,这里和合并逻辑没有关系,因为落盘imm_和合并SST都是要加锁的,两者不能同时进行,所以优先落盘imm_,以免后台合并阻塞了imm_落盘,进而影响Memtable的写入。详细逻辑请见LevelDB源码解析(12) Memtable落盘

Step3.2:多个level层和level+1层SST文件的合并会产生多个level+1层SST文件,ShouldStopBefore用于判断是否结束当前输出文件的,ShouldStopBefor比较当前文件已经插入的key和level+2层重叠的文件的总size是否超过阈值,如果超过,就结束文件。避免这个新的level+1层文件和level+2层重叠的文件数太多,导致以后该level+1层文件合并到level+2层时牵扯到太多level+2层文件。如果ShouldStopBefore判断应当结束文件,就会调用FinishCompactionOutputFile填充文件尾部的数据,并结束文件,一个新的SST文件就诞生了。

Step3.3:前面说过internal key由user key + seq + type 组成,一个user key可能有多个版本的internal key,旧版本的inernal key可能是不需要的,这一步就是要找出不需要的旧版本internal key。首先解析产生user key,如果解析失败,那么不会对这个key进行处理,一会儿会被直接插入到新SST文件中。

如果解析成功,判断是否已经有current_user_key了,如果没有current_user_key,那么这个internal key就是当前user key的第一个internal key,因为internal key越新排序越靠前,所以第一个internal key也是最新的。如果有current_user_key,就进行比较,如果相等,说明当前inernal key不是最新的了。那么等待它的可能就是法律的制裁了(大雾)。如果不相等。那么当前internal key就是当前user key第一个也是最新的internal key了。修改current_user_key等变量,把当前user key作为最新的current_user_key。

接着判断last_sequence_for_key,可以发现,前面如果判断当前的internal key是当前user key的第一个,那么last_sequence_for_key会被置为kMaxSequenceNumber,那这里的判断一定为false,即一定不会drop,并且last_sequence_for_key会被置为当前internal key的sequence。如果当前的internal key不是当前user key的第一个,即不是最新的user key版本,那么如果last_sequence_for_key(即上一个相同user key的internal key的sequence)小于smallest_snapshot,由于intern key的排序策略,当前的internal key的sequence一定也小于smallest_snapshot,两个internal key的sequence都小于smallest_snapshot,当前的ingernal key的value肯定不会再被使用了,就可以删除了,所以把drop置为true。

如果当前internal key的类型是kTypeDeletion,即删除。那么如果当前interanl key的sequence小于smallest_snapshot。就调用IsBaseLevelForKey,这个函数会判断level+2及以上的层是否有这个user key,如果没有的话,就可以直接删除当前internal key了,这个很好理解,既然当前user key在更高层没有key,那么删除当前internal key的效果就等于用户读不到这个key了,和把key标记为删除的效果是一样的。如果level+1及以上层存在这个user key,那这个internal key是不能删除的,因为如果删除了,查找时就会查找到上层的user key,返回一个错误的结果。因为查找时是从低层开始逐步往上查找的,所以找到这个标记为删除的key,就会直接返回没有这个Key的结果,而不是往上继续查找。

Step3.4:如果drop为true,那么直接跳过本步骤,这个key就被丢弃了。如果drop为false,那么执行本步骤。如果builder为空,新建一个SST文件用于输出,比如前面FinishCompactionOutputFile结束一个文件后,builder就会被置为空,这里就需要新建一个文件了。然后调用Add把当前internal key加入到新文件中。然后判断新文件的size是否超过了阈值,如果超过了就调用FinishCompactionOutputFile结束当前文件。构建文件复用了Memtable落盘使用的TableBuilder,详细逻辑请见LevelDB源码解析(10) TableBuilder(Memtable序列化)

Step4:这里主要是结束当前文件,到这里,所有输出文件都已经完成了。

Step5:更新统计信息和版本信息,这个和合并的核心逻辑关系不大,限于篇幅,就不介绍了。

总结

本文篇幅有点长,主要介绍了如何挑选要合并的层、SST文件,以及合并时的详细逻辑,对于版本控制点到为止,先鸽了,有机会再介绍吧。

源码版本

https://github.com/google/leveldb/releases/tag/1.22