深入浅出带你走进 RocksDB

原创
09/28 10:54
阅读数 1.9K

RocksDB 是基于 Google LevelDB 研发的高性能 Key-Value 持久化存储引擎,以库组件形式嵌入程序中,为大规模分布式应用在 SSD 上运行提供优化。RocksDB 重点是提供工具支持,具体实现将交给上层应用。

正是这种高度可定制化能力,使得 RocksDB 可涵盖包括工作负载等众多场景的应用,今天我们将逐一为大家介绍:

"为什么需要内存管理器? 为什么不使用现有的内存管理器? RocksDB 究竟是如何实现的?"

01 为什么需要内存管理器?

RocksDB 有很多核心场景需要分配内存的,包括但不限于 Memtable、 Cache、Iterator 等,高效优质的内存分配器就显得尤为重要。一个优秀的通用内存分配器需要具备以下特性:

  • 尽量避免内存碎片;并发分配性能好;
  • 额外的空间损耗尽量少;
  • 兼具通用性、兼容性、可移植性且易调试。

内存管理可以分为 3 个层次,自下而上分别是:

  • 操作系统内核的内存管理;
  • glibc 层使用系统调用维护的内存管理;
  • 应用程序从 glibc 层动态分配内存后,根据应用程序本身的程序特性进行优化, 比如使用引用计数、std::shared_ptr、apache 等内存池方式的内存管理。

目前大部分服务端程序使用 glibc 提供的 malloc/free 系列函数,而 glibc 使用的 ptmalloc2 在性能上远远落后于 Google 的 Tcmalloc 和 Facebook 的 Jemalloc 。 后两者只需使用 LD_PRELOAD 环境变量启动程序即可,甚至并不需要重新编译。

02 为什么不使用现有内存管理器?

RocksDB 使用内存的场景集中在 Memtable、Cache、Iterator 等核心链路上,在保证高性能的同时,还需要实现对内存分配器内部控制(包括监控等)。

当前现有的内存分配器不具有主动上报内存监控细节,所以从长远看,依旧需要自己实现 RocksDB 专有的内存管理器。内存管理器设计思路如下:

  • 小内存的申请通过 Thread-cache 或者 Per-cpu cache,保证了 CPU 不会频繁的 cache-miss;
  • 若支持 MAP_HUGETLB,则会直接 mmap;
  • 若大页内存则直接从底层的分配器去申请。

03 RocksDB 是如何实现的?

如图所示,RocksDB 的内存管理器是支持并发的,接下来让我们一起从源码入手,看看具体如何实现的。

Allocator


// Abstract interface for allocating memory in blocks. This memory is freed
// When the allocator object is destroyed. See the Arena class for more info.
class Allocator {
 public:
  virtual ~Allocator() {}
  // 分配内存
  virtual char* Allocate(size_t bytes) = 0;                     
  // 对齐分配内存
  virtual char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
                                Logger* logger = nullptr) = 0;  
  // 块大小
  virtual size_t BlockSize() const = 0;                         
};

Arena

Arena 负责实现 RocksDB 的内存分配,我们从中可以看到其针对不同大小的内存分配请求,采取不同的分配策略,从而减少内存碎片。‍

class Arena : public Allocator {
    static const size_t kInlineSize = 2048;
    static const size_t kMinBlockSize;  // 4K
    static const size_t kMaxBlockSize;  // 2G
    
private:
    char inline_block_[kInlineSize] __attribute__((__aligned__(alignof(max_align_t))));
    const size_t kBlockSize;       // 每个 Block 的大小
    using Blocks = std::vector<char*>;
    Blocks blocks_;                // 分配的新 Block 地址(不够了就从新分配)
    
    struct MmapInfo {
        void* addr_;
        size_t length_;
        MmapInfo(void* addr, size_t length) : addr_(addr), length_(length) {}
    };
    std::vector<MmapInfo> huge_blocks_;     // 大块使用 mmap
    size_t irregular_block_num = 0;         // 不整齐的块分配次数(块大于 kBlockSize/4)
    
    char* unaligned_alloc_ptr_ = nullptr;   // 未对齐的一端指针(高地址开始)
    char* aligned_alloc_ptr_ = nullptr;     // 对齐一端指针(低地址开始)
    size_t alloc_bytes_remaining_ = 0;      // 内存剩余
    
    size_t blocks_memory_ = 0;              // 已经分配的内存大小
    
    #ifdef MAP_HUGETLB
    size_t hugetlb_size_ = 0;
    #endif  // MAP_HUGETLB
    char* AllocateFromHugePage(size_t bytes);
    char* AllocateFallback(size_t bytes, bool aligned);
    char* AllocateNewBlock(size_t block_bytes);
    
    AllocTracker* tracker_;
}

针对 Allocate 和 AllocateAligned,我们采用对同一块 Block 的两端进行分配。AllocateAligned 从内存块的低地址开始分配,Allocate 从高地址开始分配。分配流程图如下:

ConcurrentArena

内存分配器不仅需要减少内存碎片,同样需要保证并发分配的性能,那么 RocksDB 是如何实现 ConcurrentArena 的呢?

从内存管理架构图可以看出,RocksDB 维护了 CoreLocal 内存数组,每个线程从所在 CPU 对应的本地 Shard 上分配内存,若不足再去主内存 Arena 进行分配。我们从几个核心类开始逐一介绍:

1、ConcurrentArena


class ConcurrentArena : public Allocator {
 public:
  // block_size and huge_page_size are the same as for Arena (and are
  // in fact just passed to the constructor of arena_.  The core-local
  // shards compute their shard_block_size as a fraction of block_size
  // that varies according to the hardware concurrency level.
  explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
                           AllocTracker* tracker = nullptr,
                           size_t huge_page_size = 0);
                           
  char* Allocate(size_t bytes) override {
    return AllocateImpl(bytes, false /*force_arena*/,
                        [this, bytes]() { return arena_.Allocate(bytes); });
  }

 private:
  ...
  CoreLocalArray<Shard> shards_;    // 维护了一个 CoreLocal 内存数组
  Arena arena_;                     // 主内存
  ...
};

2、CoreLocalArray


// An array of core-local values. Ideally the value type, T, is cache aligned to
// prevent false sharing.
template <typename T>
class CoreLocalArray {
 public:
  ...
  // returns pointer to element for the specified core index. This can be used,
  // e.g., for aggregation, or if the client caches core index.
  T* AccessAtCore(size_t core_idx) const;

 private:
  std::unique_ptr<T[]> data_;
  int size_shift_;
};

3、并发分配流程


template <typename Func>
  char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
    size_t cpu;

    // 1:大块则直接从 Arena 上分配,需要加锁
    std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
    if (bytes > shard_block_size_ / 4 || force_arena ||
        ((cpu = tls_cpuid) == 0 &&
         !shards_.AccessAtCore(0)->allocated_and_unused_.load(
             std::memory_order_relaxed) &&
         arena_lock.try_lock())) {
      if (!arena_lock.owns_lock()) {
        arena_lock.lock();
      }
      auto rv = func();
      Fixup();
      return rv;
    }

    // 2:挑选 CPU 对应的 Shard
    Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
    if (!s->mutex.try_lock()) {
      s = Repick();
      s->mutex.lock();
    }
    std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);

    size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
    
    // 2.1:若当前 Shard 可用内存不够时,去 Arena 分配存入 Shard
    if (avail < bytes) {
      // reload
      std::lock_guard<SpinMutex> reload_lock(arena_mutex_);

      // If the arena's current block is within a factor of 2 of the right
      // size, we adjust our request to avoid arena waste.
      auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
      assert(exact == arena_.AllocatedAndUnused());

      if (exact >= bytes && arena_.IsInInlineBlock()) {
        // If we haven't exhausted arena's inline block yet, allocate from arena
        // directly. This ensures that we'll do the first few small allocations
        // without allocating any blocks.
        // In particular this prevents empty memtables from using
        // disproportionately large amount of memory: a memtable allocates on
        // the order of 1 KB of memory when created; we wouldn't want to
        // allocate a full arena block (typically a few megabytes) for that,
        // especially if there are thousands of empty memtables.
        auto rv = func();
        Fixup();
        return rv;
      }

      avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
                  ? exact
                  : shard_block_size_;
      s->free_begin_ = arena_.AllocateAligned(avail);
      Fixup();
    }
    s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);

    // 3:根据是否对齐,判断是从高地址/低地址分配
    char* rv;
    if ((bytes % sizeof(void*)) == 0) {
      // aligned allocation from the beginning
      rv = s->free_begin_;
      s->free_begin_ += bytes;
    } else {
      // unaligned from the end
      rv = s->free_begin_ + avail - bytes;
    }
    return rv;
  }

总结

  1. 入参 Func 就是上面传入的 Lambda 表达式:this, bytes { returnarena_.Allocate(bytes) ;
  2. 当请求内存块较大时,直接从 Arena 分配且需要加锁;否则直接从当前 CPU 对应的 Shard 分配;
  3. 若当前 Shard 可用内存不够,需要从 Arena 再次请求;4、根据是否对齐,判断从高/低地址分配。
展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部