LevelDB源码粗略分析

其它DB
浏览数 - 474发布于 - 2025-05-02 - 12:03

图一乐呵 并不严谨

[!IMPORTANT]

基础知识:

[TOC]

核心接口分析

db_impl.cc

函数 作用 备注
Status DBImpl::Write(const WriteOptions& options, WriteBatch updates)* 将需要存储的kv对写入 初始化一个writer,持有mutex,为writer设置需要写入的KV对以及任务的状态信息,此后上锁,将自身加入writers队列,这是等待写入的任务队列,writer若是非完成状态且不是队列的最前端那么将进入等待,至此这个写任务正式开始处理,通过MakeRoomForWrite函数检查memTable的状态,处理时尝试将当前写任务和正在等待的写任务合并处理,处理完这步就释放锁,把数据写入Log,随后写入memTable,此刻其他写任务也能正常进入任务队列,随后又上锁,开始处理错误以及其他所需的元信息,此时数据已经写入memTable,只需要把刚才批量处理掉的任务以及自身移出任务队列即可,此时若队列非空,那就唤醒头部的任务(writer)
**Status DBImpl::MakeRoomForWrite(bool force) ** 确保memTable能正常使用 开始├─ 1. 是否有后台错误? → 返回错误│├─ 2. L0 文件数是否触发了写入减速? → 延迟 1ms│├─ 3. 当前内存表是否有空间? → 允许写入│├─ 4. 是否有未完成的不可变内存表(imm_)压缩? → 等待压缩完成│├─ 5. L0 文件数是否触发了停止写入? → 等待压缩完成│└─ 6. 其他情况 → 切换内存表 + 触发压缩
*Status DBImpl::Get(const ReadOptions& options, const Slice& key,std::string value) ** 读取KV 尝试在memTable,immuMemTable,SSTable读,若在SSTable读需要更新统计信息,同时读取的统计信息可能会触发压缩条件,所以需要尝试压缩

MemTable 实现分析

MemTable的实现依赖于SkipList,这是一种媲美红黑树的数据结构

示例(ai生成)

class Skiplist {
public:
    struct Node {
        int value;
        std::vector<Node *> forward;    
    	Node(int value, int level) : value(value), forward(level, nullptr) {}
};

Skiplist() {
    srand(time(0));
    max_level = 4;
    header = new Node(-1, max_level);
    level = 0;
}

bool search(int target) {
    Node *current = header;
    for (int i = level; i >= 0; --i) {
        while (current->forward[i] != nullptr &&
               current->forward[i]->value < target) {
            current = current->forward[i];
        }
        if (current->forward[i] != nullptr &&
            current->forward[i]->value == target) {
            return true;
        }
    }
    return false;
}

void add(int num) {
    Node *current = header;
    std::vector<Node *> list(max_level, nullptr);
    for (int i = level; i >= 0; --i) {
        while (current->forward[i] != nullptr &&
               current->forward[i]->value < num) {
            current = current->forward[i];
        }
        list[i] = current->forward[i];
    }
    int node_lv = randomLevel();
    if (node_lv > level) {
        for (int i = level + 1; i <= node_lv; i++) {
            list[i] = header->forward[i];
        }
        level = node_lv;
    }
    Node *new_node = new Node(num, node_lv + 1);
    for (int i = 0; i <= node_lv; i++) {
        new_node->forward[i] = list[i];
        list[i] = new_node;
    }
}

bool erase(int num) {
    std::vector<Node *> update(max_level, nullptr);
    Node *current = header;
    for (int i = level; i >= 0; --i) {
        while (current->forward[i] != nullptr &&
               current->forward[i]->value < num) {
            current = current->forward[i];
        }
        update[i] = current;
    }

    current = current->forward[0];
    if (current != nullptr && current->value == num) {
        for (int i = 0; i <= level; ++i) {
            if (update[i]->forward[i] != current) {
                break;
            }
            update[i]->forward[i] = current->forward[i];
        }

        delete current; 
        while (level > 0 && header->forward[level] == nullptr) {
            --level;
        }

        return true; 
    }
    return false;  
}

int randomLevel() {
    int lv = 0;
    while (rand() % 2 && lv < max_level - 1) {
        lv++;
    }
    return lv;
}
private:
    int level;
    int max_level;
    Node *header;
};    

编码格式

key_len key tag value_len value

tag = sequence(7B) + ValueType(1B)

都使用varint编码

函数 作用 具体描述
**void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, const Slice& value) ** 写入数据到memTable 编码并插入跳表
bool MemTable::Get(const LookupKey& key, std::string value, Status s)** 查询 通过内部跳表初始化一个迭代器,在跳表中查找,找到后再根据写入的格式进行解码,最后根据数据的tag判断,如果数据是删除状态的就返回不存在

WAL日志分析

日志文件数据按32KB固定大小的Block写入,写入的数据称为Record

编码格式

Header Raw
checksum 4B | length 2B type 1B | data length B

当遇到的内容非常大的数据,或者当前的Block剩余空间不足以存储当前要存储的数据时,会先将该数据进行分段,然后组织成多条Record存入多个Block中,为了标识Record的类型引入了type,他是一个枚举类型

Name value note
kFullType 1 表示该Record完整存储在当前block中
kFirstType 2 表示为第一个分段
kMiddleType 3 表示为中间分段
kLastType 4 表示为最后一个分段

Writer用来追加写入数据,Reader在启动后调用,通过从WAL日志中读取数据来尝试恢复之前内存中可能丢失的数据

Writer的实现

WAL日志通过这个结构对外提供AddRecord(data)的接口,负责编码和写入文件

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    //如果当前段的长度和待写入数据的剩余长度相等,说明是最后一个record
    const bool end = (left == fragment_length);
      //如果begin和end都为true,说明一次就能写完
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
        //当前是第一段
      type = kFirstType;
    } else if (end) {
        //当前是最后段
      type = kLastType;
    } else {
        //是中间段
      type = kMiddleType;
    }
      //将ptr~ptr+fragment_length写入log
    s = EmitPhysicalRecord(type, ptr, fragment_length);
      //更新指针
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}

没啥好介绍的,下面是写入逻辑,通过dest_的append和flush完成

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
                                  size_t length) {
  assert(length <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + length <= kBlockSize);

  // Format the header
  char buf[kHeaderSize];
  buf[4] = static_cast<char>(length & 0xff);
  buf[5] = static_cast<char>(length >> 8);
  buf[6] = static_cast<char>(t);

  // Compute the crc of the record type and the payload.
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
  crc = crc32c::Mask(crc);  // Adjust for storage
  EncodeFixed32(buf, crc);

  // Write the header and the payload
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, length));
    if (s.ok()) {
      s = dest_->Flush();
    }
  }
  block_offset_ += kHeaderSize + length;
  return s;
}

WritableFile实现

是顺序写文件的抽象结构

注意当调用Append(data)追加完数据以后,上层应用程序需要手动调用Flush()方法来触发缓冲区中的数据写入磁盘。同时LevelDB提供了处理写操作的WriteOptions参数,可选地设置参数sync的值。如果sync设置为true,那么写入WAL日志后还会同步调用WritableFile结构的Sync()方法确保数据一定写到磁盘上。因为系统默认数据会先写入操作系统的缓冲区,然后在将来的某个时刻,操作系统会将缓冲区中的数据刷到磁盘上。如果数据未刷到磁盘之前操作系统宕机了,那么数据仍然有丢失的风险。如果只是进程崩溃了,操作系统正常运行,则不会有风险。为了确保数据一定能写入磁盘,可以在调用LevelDB写入数据时将WriteOptions参数中的sync设置为true。

Reader实现

主要是ReadRecord(record)方法,分为二步,从文件以block为单位加载到buffer,解码数据,重建写入前的数据,如果是分段的那就合并

SSTable实现

SSTable一般由Minor压缩和Major压缩生成,不管怎么样他都是只读文件,生成后不会改变,且他的数据是有序的,查询的时候需要使用这个性质

SSTable 结构

Data Block1 数据 存储KV数据
Data Block2 数据 存储kv数据
...... 数据 存储kv数据
DataBlock n 数据 存储kv数据
Filter Block 数据 布隆过滤器
Meta Index Block 索引 -> Filter Block 存储Filter Block的索引信息
Index Block 索引 -> Data Block n 存储 Data Block的索引信息
Footer 索引 -> Index Block | Meta Index Block 存储 Meta Index Block 和 Index Block的索引信息

Block结构

每个Block默认大小为4KB,由数据,压缩类型,校验码组成,默认采用Snappy压缩算法

Data CompressionType CRC

DataBlock

kv 1 kv数据
kv 2 kv数据
...... kv数据
kv n kv数据
restart offset 1(4) 重启点数据
restart offset 2(4) 重启点数据
...... 重启点数据
restart offset m(4) 重启点数据
restart count(4) 重启点数据
Type(1) 压缩类型
CRC(4) 校验码

在DataBlock中的KV数据如下

K的共享长度 K的未共享长度 V的长度 K的未共享内容 V的内容
shared_key_len unshared_key_len value_len unshared key value

LevelDB中多条KV数据之间是按照K的顺序有序存储的,这意味着相邻的多条数据之间key的内容可能会存在相同部分,因此leveldb在存储每条kv数据时对k部分进行压缩,将key分为两部分:第一部分是当前的k和前一条kv的k重复的部分,第二部分是不重复部分。

这样操作,空间得到了优化,问题是获取一条完整的kv数据需要对k进行拼接,如果全部的kv数据都按照这个方式存储那么恢复k的过程很长,所以引入重启点这个概念。通过设定一个数据间隔,每隔几条数据就记录一个完整的k,然后再按照上述方式进行压缩,当达到间隔后再记录一次完整的k,不断重复

重启点数据

重启点数据由多个重启项和重启点个数两个部分组成,大小均为4B。每个重启项记录的是该条重启点的KV数据写入DataBlock中的位置。通过该重启点就可以直接读取该条KV数据的完整数据。位于两个重启点之间的数据在恢复的时候需要顺序遍历逐个恢复。重启点个数记录的就是当前DataBlock总共存储的重启点的个数。重启点间隔通过参数来配置,默认16,16条数据就要保存一个重启点,这是一个超参数

Index Block

一个SSTable中有多个Data Block存储kv数据,满了之后就需要打开另一个文件继续写,采用Index Block来完成对已满的Data Block进行索引,他和**Data Block的结构完全一样**,不难设想对Data Block索引信息为key--offset--length,key表示当前DataBlock保存的所有KV数据中K的最大值,offset和length表示该DataBlock写入SSTable的位置以及长度。实际上LevelDB索引信息的K是当前DataBlock的K的最大值(最后一个)和下一个DataBlock的k的最小值(第一个)的最短分隔符,这样的设计可以减少占用空间。V是经过BlockHandle结构编码后的内容,就是封装了前面的offset和length

Filter Block

就是布隆过滤器,存在误判,发生误判概率为 $$ p = [1-(1-\frac{1}{m})^{kn}]^k \approx (1-e^{\frac{kn}{m}})^k $$ m为位数组大小,n为元素数量,k为哈希函数个数,最优k如下 $$ 当k=0.7\frac{m}{n}时误判率最低,p=f(k)=(1-e^{\frac{kn}{m}})^k=2^{-ln2\frac{m}{n}} \approx (0.6158)^{\frac{m}{n}} $$ 可以得到 $$ m=-\frac{nlnp}{{(ln2})^2} $$ 结构如下

filter 1 过滤器内容
filter 2 过滤器内容
...... 过滤器内容
filter n 过滤器内容
filter offset 1(4) 过滤器偏移量
filter offset 2(4) 过滤器偏移量
...... 过滤器偏移量
filter offset n(4) 过滤器偏移量
filter data size 过滤器元信息
filter base(1) 过滤器元信息
Type(1) 压缩类型
CRC(4) 检验码

对于SSTable而言,每2kb的kv数据就会生成一个布隆过滤器,布隆过滤器的相关数据存入filter。过滤器偏移量记录每个过滤器内容写入的位置,根据前后两个过滤器的偏移量就可以获取对应的过滤器的内容,每个偏移量使用4B存储。过滤器元数据主要包含过滤器内容大小和过滤器基数,过滤器内容大小用4B存储,主要记录过滤器内容所占大小,过滤器基数在LevelDB中是一个常数11,代表2的11次方,即每2KB分配一个布隆过滤器

Meta Index Block

当Filter Block写完后也需要记录其索引信息,该索引信息在SSTable中是采用Meta Index Block,以KV数据存储的,其**和Data Block结构一致**,Filter Block的索引信息中K为 “filter."加上布隆过滤器的名字。V也是一个Block Handle结构,存储Filter Block在SSTable中写入的位置和长度。通过这个可以得到Filter Block的完整内容

结构如下

Meta Index Block 的索引
Index Block 的索引
Padding 填充
Magic 魔数

Block的写入

从上面可以得知,只有Filter Block的结构和其他Block不同,所以对于Filter Block他的读写需要单独的Builder,这里分为FilterBlockBuilder和BlockBuilder,读取则是FilterBlockReader和Block::Iter

BlockBuilder结构

table/block_builder.cc

核心函数

void BlockBuilder::Add(const Slice& key, const Slice& value) 添加KV数据
Slice BlockBuilder::Finish() 返回完整Block
void BlockBuilder::Add(const Slice& key, const Slice& value) {
  Slice last_key_piece(last_key_);
  assert(!finished_);// 确保未调用 Finish()
  assert(counter_ <= options_->block_restart_interval);
  assert(buffer_.empty()  // No values yet?
         || options_->comparator->Compare(key, last_key_piece) > 0);// 保证有序
  size_t shared = 0;
  if (counter_ < options_->block_restart_interval) {
    // See how much sharing to do with previous string
    // 计算当前 key 与前一个 key 的共享前缀长度
    const size_t min_length = std::min(last_key_piece.size(), key.size());
    while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
      shared++;
    }
  } else {
    // 达到重启间隔,记录重启点并重置计数器
    // Restart compression
    restarts_.push_back(buffer_.size());
    counter_ = 0;
  }
  const size_t non_shared = key.size() - shared;

  // Add "<shared><non_shared><value_size>" to buffer_
  PutVarint32(&buffer_, shared);
  PutVarint32(&buffer_, non_shared);
  PutVarint32(&buffer_, value.size());

  // Add string delta to buffer_ followed by value
  buffer_.append(key.data() + shared, non_shared);
  buffer_.append(value.data(), value.size());

  // Update state
  last_key_.resize(shared);
  last_key_.append(key.data() + shared, non_shared);
  // 更新 last_key_ 为当前 key
  assert(Slice(last_key_) == key);
  counter_++;
}
//添加重启点数据进Block
Slice BlockBuilder::Finish() {
  // Append restart array
  for (size_t i = 0; i < restarts_.size(); i++) {
    PutFixed32(&buffer_, restarts_[i]);
  }
  PutFixed32(&buffer_, restarts_.size());
  finished_ = true;
  return Slice(buffer_);
}

FilterBlockBuilder结构

table/filter_block.cc

成员变量

变量名 类型 描述
policy_ const FilterPolicy* 指向过滤器策略的指针(如 Bloom Filter 的具体实现),负责创建和检查过滤器。
keys_ std::string 扁平化存储所有键的字符串,例如将多个键连续存储为 "key1key2key3..."
start_ std::vector<size_t> 记录每个键在 keys_ 中的起始位置索引,用于快速定位键内容(例如 start_ = [0, 4, 8] 表示三个键分别从 0、4、8 字节开始)。
result_ std::string 最终生成的 Filter Block 数据,包含所有过滤器二进制内容及元数据。
tmp_keys_ std::vector 临时存储当前数据块的键集合,用于调用 policy_->CreateFilter() 生成单个过滤器。
filter_offsets_ std::vector<uint32_t> 记录每个过滤器在 result_ 中的偏移量(起始位置),用于快速定位特定数据块的过滤器。

核心函数

void StartBlock(uint64_t block_offset) 初始化
void AddKey(const Slice& key) 添加Key
Slice Finish(); 完成
void GenerateFilter() 生成过滤器
void FilterBlockBuilder::StartBlock(uint64_t block_offset) {
  uint64_t filter_index = (block_offset / kFilterBase);
  assert(filter_index >= filter_offsets_.size());
  while (filter_index > filter_offsets_.size()) {
    GenerateFilter();
  }
}

void FilterBlockBuilder::AddKey(const Slice& key) {
  Slice k = key;
  start_.push_back(keys_.size());
  keys_.append(k.data(), k.size());
}

Slice FilterBlockBuilder::Finish() {
  if (!start_.empty()) {
    GenerateFilter();
  }

  // Append array of per-filter offsets
  const uint32_t array_offset = result_.size();
  for (size_t i = 0; i < filter_offsets_.size(); i++) {
    PutFixed32(&result_, filter_offsets_[i]);
  }

  PutFixed32(&result_, array_offset);
  result_.push_back(kFilterBaseLg);  // Save encoding parameter in result
  return Slice(result_);
}

void FilterBlockBuilder::GenerateFilter() {
  const size_t num_keys = start_.size();
  if (num_keys == 0) {
    // Fast path if there are no keys for this filter
    filter_offsets_.push_back(result_.size());
    return;
  }

  // Make list of keys from flattened key structure
  start_.push_back(keys_.size());  // Simplify length computation
  tmp_keys_.resize(num_keys);
  for (size_t i = 0; i < num_keys; i++) {
    const char* base = keys_.data() + start_[i];
    size_t length = start_[i + 1] - start_[i];
    tmp_keys_[i] = Slice(base, length);
  }

  // Generate filter for current set of keys and append to result_.
  filter_offsets_.push_back(result_.size());
  policy_->CreateFilter(&tmp_keys_[0], static_cast<int>(num_keys), &result_);

  tmp_keys_.clear();
  keys_.clear();
  start_.clear();
}

前面提到每2k需要一个Filter,这反应在 uint64_t filter_index = (block_offset / kFilterBase)其中kFilterBase就是2048,计算得到当前数据的FilterIndex大于filter数量时那就创建新的Filter,在向SSTable中添加数据时会同步调用FilterBlockBuilder的Add方法来设置Filter,添加的Key会扁平化存储在keys中,同时在starts中存储索引信息。当一个SSTable写满后会调用Finish生成一个DataBlock,同时也会同步调用Filter的Finsh将布隆过滤器的数据写入FilterBlock。过滤器的创建在bloom.cc文件中,跳过不讲了。

Block结构和FilterBlockReade结构

Block结构

SSTable中每个Block读取出来后通过Block结构来存储,而读取是通过Block::Iter迭代器实现的。

成员变量

变量名 类型 描述
data_ const char* 指向块数据的指针,存储实际内容(不可修改)
size_ size_t 块数据的实际大小(字节)
restart_offset_ uint32_t 块内重启点(Restart Point)数组在 data_中的偏移量,用于快速定位
owned_ bool 标记 Block 是否拥有 data_ 的内存所有权,控制析构时是否释放资源
Block::Block(const BlockContents& contents)
    : data_(contents.data.data()),
      size_(contents.data.size()),
      owned_(contents.heap_allocated) {
  if (size_ < sizeof(uint32_t)) {
    size_ = 0;  // Error marker
  } else {
    size_t max_restarts_allowed = (size_ - sizeof(uint32_t)) / sizeof(uint32_t);
    if (NumRestarts() > max_restarts_allowed) {
      // The size is too small for NumRestarts()
      size_ = 0;
    } else {
      restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t);
    }
  }
}

先进行基本校验,DataBlock末尾必须存储一个重启点数据大小(uint32_t),再验证重启点数量合法性和计算重启点数组偏移量。

// Helper routine: decode the next block entry starting at "p",
// storing the number of shared key bytes, non_shared key bytes,
// and the length of the value in "*shared", "*non_shared", and
// "*value_length", respectively.  Will not dereference past "limit".
//
// If any errors are detected, returns nullptr.  Otherwise, returns a
// pointer to the key delta (just past the three decoded values).
static inline const char* DecodeEntry(const char* p, const char* limit,
                                      uint32_t* shared, uint32_t* non_shared,
                                      uint32_t* value_length) {
  if (limit - p < 3) return nullptr;
  *shared = reinterpret_cast<const uint8_t*>(p)[0];
  *non_shared = reinterpret_cast<const uint8_t*>(p)[1];
  *value_length = reinterpret_cast<const uint8_t*>(p)[2];
  if ((*shared | *non_shared | *value_length) < 128) {
    // Fast path: all three values are encoded in one byte each
    p += 3;
  } else {
    if ((p = GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr;
    if ((p = GetVarint32Ptr(p, limit, non_shared)) == nullptr) return nullptr;
    if ((p = GetVarint32Ptr(p, limit, value_length)) == nullptr) return nullptr;
  }

  if (static_cast<uint32_t>(limit - p) < (*non_shared + *value_length)) {
    return nullptr;
  }
  return p;
}

从p位置开始解析entry的shared,non_shared,value_length等信息

Block通过迭代器来读取,在Block中进行查找时,主要通过Block::Iter的Seek方法完成

  void Seek(const Slice& target) override {
    // Binary search in restart array to find the last restart point
    // with a key < target
    uint32_t left = 0;
    uint32_t right = num_restarts_ - 1;
    int current_key_compare = 0;

    if (Valid()) {
      // If we're already scanning, use the current position as a starting
      // point. This is beneficial if the key we're seeking to is ahead of the
      // current position.
      current_key_compare = Compare(key_, target);
      if (current_key_compare < 0) {
        // key_ is smaller than target
        left = restart_index_;
      } else if (current_key_compare > 0) {
        right = restart_index_;
      } else {
        // We're seeking to the key we're already at.
        return;
      }
    }

    while (left < right) {
      uint32_t mid = (left + right + 1) / 2;
      uint32_t region_offset = GetRestartPoint(mid);
      uint32_t shared, non_shared, value_length;
      const char* key_ptr =
          DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
                      &non_shared, &value_length);
      if (key_ptr == nullptr || (shared != 0)) {
        CorruptionError();
        return;
      }
      Slice mid_key(key_ptr, non_shared);
      if (Compare(mid_key, target) < 0) {
        // Key at "mid" is smaller than "target".  Therefore all
        // blocks before "mid" are uninteresting.
        left = mid;
      } else {
        // Key at "mid" is >= "target".  Therefore all blocks at or
        // after "mid" are uninteresting.
        right = mid - 1;
      }
    }

    // We might be able to use our current position within the restart block.
    // This is true if we determined the key we desire is in the current block
    // and is after than the current key.
    assert(current_key_compare == 0 || Valid());
    bool skip_seek = left == restart_index_ && current_key_compare < 0;
    if (!skip_seek) {
      SeekToRestartPoint(left);
    }
    // Linear search (within restart block) for first key >= target
    while (true) {
      if (!ParseNextKey()) {
        return;
      }
      if (Compare(key_, target) >= 0) {
        return;
      }
    }
  }
  bool ParseNextKey() {
    current_ = NextEntryOffset();
    const char* p = data_ + current_;
    const char* limit = data_ + restarts_;  // Restarts come right after data
    if (p >= limit) {
      // No more entries to return.  Mark as invalid.
      current_ = restarts_;
      restart_index_ = num_restarts_;
      return false;
    }

    // Decode next entry
    uint32_t shared, non_shared, value_length;
    p = DecodeEntry(p, limit, &shared, &non_shared, &value_length);
    if (p == nullptr || key_.size() < shared) {
      CorruptionError();
      return false;
    } else {
      key_.resize(shared);
      key_.append(p, non_shared);
      value_ = Slice(p + non_shared, value_length);
      while (restart_index_ + 1 < num_restarts_ &&
             GetRestartPoint(restart_index_ + 1) < current_) {
        ++restart_index_;
      }
      return true;
    }
  }

首先在重启点列表中进行二分查找,定位到比target小的最近的一个重启点,然后在该重启点开始顺序解析Entry(KV数据)进行查找比较,直到找到为止

FilterBlockReader结构

很简单,没什么好写的

SSTable的写入和读取

TableBuilder

void TableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->num_entries > 0) {
    assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  }

  if (r->pending_index_entry) {
    assert(r->data_block.empty());
    r->options.comparator->FindShortestSeparator(&r->last_key, key);
    std::string handle_encoding;
    r->pending_handle.EncodeTo(&handle_encoding);
    r->index_block.Add(r->last_key, Slice(handle_encoding));
    r->pending_index_entry = false;
  }

  if (r->filter_block != nullptr) {
    r->filter_block->AddKey(key);
  }

  r->last_key.assign(key.data(), key.size());
  r->num_entries++;
  r->data_block.Add(key, value);

  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  if (estimated_block_size >= r->options.block_size) {
    Flush();
  }
}

当调用SSTable的Add(k,v)方法添加一条KV数据时,首先会将该数据依次加入Data Block和Filter Block中,添加完成后再判定当前的Data Block大小是否已经大于设定的阈值了。如果大于阈值则会调用Flush()方法将当前的Data Block写入SSTable文件并清空(block->reset())Data Block,同时将pending_index_entry的值设为true。当下一条KV数据再进来时会命中该值为true的逻辑,然后往Index Block中追加一条索引信息,追加完成后再将其重新置回false

Status TableBuilder::Finish() {
  Rep* r = rep_;
  Flush();
  assert(!r->closed);
  r->closed = true;

  BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;

  // Write filter block
  if (ok() && r->filter_block != nullptr) {
    WriteRawBlock(r->filter_block->Finish(), kNoCompression,
                  &filter_block_handle);
  }

  // Write metaindex block
  if (ok()) {
    BlockBuilder meta_index_block(&r->options);
    if (r->filter_block != nullptr) {
      // Add mapping from "filter.Name" to location of filter data
      std::string key = "filter.";
      key.append(r->options.filter_policy->Name());
      std::string handle_encoding;
      filter_block_handle.EncodeTo(&handle_encoding);
      meta_index_block.Add(key, handle_encoding);
    }

    // TODO(postrelease): Add stats and other meta blocks
    WriteBlock(&meta_index_block, &metaindex_block_handle);
  }

  // Write index block
  if (ok()) {
    if (r->pending_index_entry) {
      r->options.comparator->FindShortSuccessor(&r->last_key);
      std::string handle_encoding;
      r->pending_handle.EncodeTo(&handle_encoding);
      r->index_block.Add(r->last_key, Slice(handle_encoding));
      r->pending_index_entry = false;
    }
    WriteBlock(&r->index_block, &index_block_handle);
  }

  // Write footer
  if (ok()) {
    Footer footer;
    footer.set_metaindex_handle(metaindex_block_handle);
    footer.set_index_handle(index_block_handle);
    std::string footer_encoding;
    footer.EncodeTo(&footer_encoding);
    r->status = r->file->Append(footer_encoding);
    if (r->status.ok()) {
      r->offset += footer_encoding.size();
    }
  }
  return r->status;
}

就是按SSTable的结构将每部分Block追加写入文件而已

Table

读取靠Table的Iter完成

Status Table::Open(const Options& options, RandomAccessFile* file,
                   uint64_t size, Table** table) {
  *table = nullptr;
  if (size < Footer::kEncodedLength) {
    return Status::Corruption("file is too short to be an sstable");
  }

  char footer_space[Footer::kEncodedLength];
  Slice footer_input;
  Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
                        &footer_input, footer_space);
  if (!s.ok()) return s;

  Footer footer;
  s = footer.DecodeFrom(&footer_input);
  if (!s.ok()) return s;

  // Read the index block
  BlockContents index_block_contents;
  ReadOptions opt;
  if (options.paranoid_checks) {
    opt.verify_checksums = true;
  }
  s = ReadBlock(file, opt, footer.index_handle(), &index_block_contents);

  if (s.ok()) {
    // We've successfully read the footer and the index block: we're
    // ready to serve requests.
    Block* index_block = new Block(index_block_contents);
    Rep* rep = new Table::Rep;
    rep->options = options;
    rep->file = file;
    rep->metaindex_handle = footer.metaindex_handle();
    rep->index_block = index_block;
    rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
    rep->filter_data = nullptr;
    rep->filter = nullptr;
    *table = new Table(rep);
    (*table)->ReadMeta(footer);
  }

  return s;
}

首先读取Footer,根据Footer存储的Meta Index Block和Index Block的索引信息,依次调用ReadBlock读取数据,读取索引信息后进一步调用ReadFilter读取FilterBlock中布隆过滤器的数据,然后就可以处理查询请求了。在查询时SSTable对外通过TwoLevelIterator迭代器来查找。该迭代器创建时需要传递两个迭代器:一个是Index Block的迭代器,另一个是Data Block的迭代器。这也是TwoLevelIterator名称的由来。

SSTable读取全过程

入口是Version::Get()

Status Version::Get(const ReadOptions& options, const LookupKey& k,
                    std::string* value, GetStats* stats) {
  stats->seek_file = nullptr;
  stats->seek_file_level = -1;

  struct State {
    Saver saver;
    GetStats* stats;
    const ReadOptions* options;
    Slice ikey;
    FileMetaData* last_file_read;
    int last_file_read_level;

    VersionSet* vset;
    Status s;
    bool found;
    //从第level层的第f个文件开始判断是否匹配  
    static bool Match(void* arg, int level, FileMetaData* f) {
      State* state = reinterpret_cast<State*>(arg);

      if (state->stats->seek_file == nullptr &&
          state->last_file_read != nullptr) {
        // We have had more than one seek for this read.  Charge the 1st file.
        state->stats->seek_file = state->last_file_read;
        state->stats->seek_file_level = state->last_file_read_level;
      }

      state->last_file_read = f;
      state->last_file_read_level = level;
      //从SSTable的缓存中查找,其中Savlue是个函数  
      state->s = state->vset->table_cache_->Get(*state->options, f->number,
                                                f->file_size, state->ikey,
                                                &state->saver, SaveValue);
      if (!state->s.ok()) {
        state->found = true;
        return false;
      }
      switch (state->saver.state) {
        case kNotFound:
          return true;  // Keep searching in other files
        case kFound:
          state->found = true;
          return false;
        case kDeleted:
          return false;
        case kCorrupt:
          state->s =
              Status::Corruption("corrupted key for ", state->saver.user_key);
          state->found = true;
          return false;
      }

      // Not reached. Added to avoid false compilation warnings of
      // "control reaches end of non-void function".
      return false;
    }
  };

  State state;
  state.found = false;
  state.stats = stats;
  state.last_file_read = nullptr;
  state.last_file_read_level = -1;

  state.options = &options;
  state.ikey = k.internal_key();
  state.vset = vset_;

  state.saver.state = kNotFound;
  state.saver.ucmp = vset_->icmp_.user_comparator();
  state.saver.user_key = k.user_key();
  state.saver.value = value;
  //遍历所有层的SSTable
  ForEachOverlapping(state.saver.user_key, state.ikey, &state, &State::Match);

  return state.found ? state.s : Status::NotFound(Slice());
}
void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
                                 bool (*func)(void*, int, FileMetaData*)) {
  const Comparator* ucmp = vset_->icmp_.user_comparator();

  // Search level-0 in order from newest to oldest.
  std::vector<FileMetaData*> tmp;
  tmp.reserve(files_[0].size());
  for (uint32_t i = 0; i < files_[0].size(); i++) {
    FileMetaData* f = files_[0][i];
    if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
        ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
      tmp.push_back(f);
    }
  }
  if (!tmp.empty()) {
    //从新到旧排序
    std::sort(tmp.begin(), tmp.end(), NewestFirst);
    for (uint32_t i = 0; i < tmp.size(); i++) {
      if (!(*func)(arg, 0, tmp[i])) {
        return;
      }
    }
  }

  // Search other levels.
  for (int level = 1; level < config::kNumLevels; level++) {
    size_t num_files = files_[level].size();
    if (num_files == 0) continue;

    // Binary search to find earliest index whose largest key >= internal_key.
    uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
    if (index < num_files) {
      FileMetaData* f = files_[level][index];
      if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
        // All of "f" is past any data for user_key
      } else {
        if (!(*func)(arg, level, f)) {
          return;
        }
      }
    }
  }
}
//在一个层的files中通过二分查找定位到某个SSTable文件
int FindFile(const InternalKeyComparator& icmp,
             const std::vector<FileMetaData*>& files, const Slice& key) {
  uint32_t left = 0;
  uint32_t right = files.size();
  while (left < right) {
    uint32_t mid = (left + right) / 2;
    const FileMetaData* f = files[mid];
    if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {
      // Key at "mid.largest" is < "target".  Therefore all
      // files at or before "mid" are uninteresting.
      left = mid + 1;
    } else {
      // Key at "mid.largest" is >= "target".  Therefore all files
      // after "mid" are uninteresting.
      right = mid;
    }
  }
  return right;
}

在上述查找过程中,首先调用ForEachOverlapping在所有层开始查找。具体过程是,首先在level 0层按照文件新旧的顺序逐个查找(因为level0层的SSTable之间的数据有可能相互重叠),只要找到就结束查找。当level 0层没有找到时,在剩下的层开始逐层查找。level 0层之外的其他层的多个SSTable中的数据是不重叠的,因此待查找的key只会命中其中一个SSTable文件。这也是FindFile中通过二分查找、利用每个SSTable文件保存的最大值来定位SSTable文件的逻辑。当找到该文件后,再在该文件中查找。单个SSTable的具体查找过程实际上是在Match方法中完成的

kohaku