文章目录
  1. Hbase源码08_MemStore刷新到HFile
    1. 1. PUT 最后的代码
    2. 2. requestFlush
    3. 3. FlushHandler#run()
    4. 4. flushRegion 方法
    5. 5. HRegion#flush 方法
      1. 5.1 internalPrepareFlushCache 阶段
      2. 5.2 internalFlushCacheAndCommit 阶段

[TOC]

Hbase源码08_MemStore刷新到HFile

1. PUT 最后的代码

在PUT代码解析的最后,当MemStore写入到一定的阀值,就会将数据flushing到HFile中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
  ......
......
} finally {
closeRegionOperation();
// 判断是否达到 flush size
if (!mutations.isEmpty() &&
isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
// 请求 flush
requestFlush();
}
}
  • this.addAndGetGlobalMemstoreSize 增加当前 HRegionMemstoreSize + addedSize
  • isFlushSize 就是判断阀值,可以通过建表的时候配置 MEMSTORE_FLUSHSIZE。也可以通过全局配置 hbase.hregion.memstore.flush.size 进行配置,默认 128M
  • 然后执行 requestFlush()

2. requestFlush

该方法最后执行的代码如下,创建一个FlushRegionEntry,然后提交到flush队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
// MemStoreFlusher 类
@Override
public void requestFlush(Region r, boolean forceFlushAllStores) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
}
}
}

MemStoreFlusher类是在 HRegionServer 启动的时候初始化的类,在初始化过程中,会初始化一个 FlushHandler 类的数组,这个类就是专门用来处理flush操作的:

1
2
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
this.flushHandlers = new FlushHandler[handlerCount];

FlushHandler 是实现了Runnable接口,即后面执行它的run方法

3. FlushHandler#run()

  • flushQueue 中取出一个 FlushQueueEntry,如果是FlushRegionEntry实例,进行flush操作:
1
2
3
4
5
// 如果是 FlushRegionEntry, 进行 flush 操作
FlushRegionEntry fre = (FlushRegionEntry) fqe;
if (!flushRegion(fre)) {
break;
}
  • flushRegion 代码如下,split&compact操作 暂时跳过

4. flushRegion 方法

  • 从队列中移除该 flush entry
  • 获取当前MemStore的读写锁
  • flush操作
1
FlushResult flushResult = region.flush(forceFlushAllStores);
  • 判断是否任然需要 split&compact操作
  • 释放锁,唤醒获取flush entry的线程

5. HRegion#flush 方法

实际执行 flushcache 方法:

  • 判断是否可以执行flush操作,不可以flush直接返回 CANNOT_FLUSH的FlushResult
  • 获取当前HRegion的读写锁
  • ========== 执行 preFlush 协处理器
  • 执行 FlushResult fs = internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker);
  • ========== 执行 postFlush 协处理器
  • 释放锁,返回成功的FlushResult结果

internalFlushcache 主要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
throws IOException {
// 准备阶段
PrepareFlushResult result = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
if (result.result == null) {
// 提交阶段
return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
} else {
return result.result; // early exit due to failure from prepare stage
}
}

5.1 internalPrepareFlushCache 阶段

  • 准备写快照的日志WAL和mvcc多版本控制。
  • 准备 MemStore的快照DefaultMemStore 内部会将原空间(cellSet) 赋值给 snapshot对象,然后为cellSet重新新建一块空间。
1
2
3
4
// Prepare flush (take a snapshot)
for (StoreFlushContext flush : storeFlushCtxs.values()) {
flush.prepare();
}
  • 提交日志。

5.2 internalFlushCacheAndCommit 阶段

  • 执行flushCache操作,将file写入临时目录,返回路径
1
2
3
4
5
6
7
8
// A.  Flush memstore to all the HStores.
// Keep running vector of all store files that includes both old and the
// just-made new flush store file. The new flushed file is still in the
// tmp directory.

for (StoreFlushContext flush : storeFlushCtxs.values()) {
flush.flushCache(status);
}
  • 执行commit操作,将file写到实际的文件目录中。
1
2
3
4
5
6
7
8
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).

// stores.values() and storeFlushCtxs have same order
for (StoreFlushContext flush : storeFlushCtxs.values()) {
boolean needsCompaction = flush.commit(status);
......
}
  • memstoreSize里面减去flush大小的值
  • 记录WAL日志,返回flush结果

至此,flush操作流程基本完毕了。。。至于 MemStore -> HFile 数据是怎样形成的,这个后面再窥探、、😅

文章目录
  1. Hbase源码08_MemStore刷新到HFile
    1. 1. PUT 最后的代码
    2. 2. requestFlush
    3. 3. FlushHandler#run()
    4. 4. flushRegion 方法
    5. 5. HRegion#flush 方法
      1. 5.1 internalPrepareFlushCache 阶段
      2. 5.2 internalFlushCacheAndCommit 阶段