文章目录
  1. Hbase源码05_Put
  2. 1. 代码示例 (kotlin)
  3. 2. 解析
    1. 2.1 客户端
    2. 2.2 服务端

[TOC]

Hbase源码05_Put

1. 代码示例 (kotlin)

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
fun testPut() {
// 创建连接
val connection = ConnectionFactory.createConnection()
// 获取表
val table = connection.getTable(TableName.valueOf("test"))
// put
val put = Put(Bytes.toBytes("row10"))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes("testPut"))
table.put(put)

connection.close()
}

connection.getTable 返回 HTable 实例,执行 HTable#put(...) 方法

1
2
3
4
5
6
7
8
9
@Override
public void put(final Put put) throws IOException {
getBufferedMutator().mutate(put);
// 默认自动刷新
// 实际调用 getBufferedMutator().flush();
if (autoFlush) {
flushCommits();
}
}

2. 解析

2.1 客户端

  • getBufferedMutator() 返回 BufferedMutatorImpl 实例,这个类相当于一个 puts的buffer,当达到指定的阀值,就会 flush 执行这些puts。mutate(...)的过程就是将 puts 加入 Buffer 的过程。
  • 当Buffer满了之后,执行方法 backgroundFlushCommits(false); 提交puts。
  • 忽略其他细节(分是否同步提交),将所有的Rows提交给类AsyncProcess#submit执行。submit流程如下:
    • 1、构建 Map<ServerName, MultiAction<Row>>,相当于分组,依次处理每个put,根据RowKey获取它对应的RegionServer以及所在的Region名称。这里有点类似于Get操作,获取row对应的机器,然后在进行RPC调用。
    • 2、将请求封装在 SingleServerRequestRunnable 中,然后提交给线程池执行。
    • 3、执行 SingleServerRequestRunnable#run 方法,创建Callable,然丢给caller执行。
    • 4、进行RPC调用,执行 RSRpcServices#multi 方法。

2.2 服务端

执行 multi 方法:

  • 迭代所有的RegionActions,获取每个action对应的HRegion
  • Region 处理所有的 actions,执行 mutateRows 方法。然后区分是PUT还是DELETE,然后执行代码 region.mutateRow(rm);
  • 内部执行 processRowsWithLocks(...) 方法,后续代码就比较多了,带步骤的注释也比较详细。大致步骤如下:
    • 1、执行 Pre 协处理器。
    • 2、获取行锁,当前region获取读锁。
    • 3、生成要操作的mutations(actions),记录waledits(写前日志)。
    • 4、回滚准备(获取日志的entry和写入的SequenceId)。
    • 5、获取action中的所有cell,然后根据cell的列族找到对应的memStore,然后将cell放入store中。
    • 6、释放region读锁,和行锁。
    • 7、同步写前日志的事务号(txid)。
    • =====如果有异常,释放锁,删除store的cell,以及回滚写前日志。
    • 8、执行 post 协处理器.
    • 最后判断memStore有没有达到flushSize,如果达到了就需要执行Flushing到HFile操作,这个后续在分析。

到这基本的PUT操作就结束了。。。

文章目录
  1. Hbase源码05_Put
  2. 1. 代码示例 (kotlin)
  3. 2. 解析
    1. 2.1 客户端
    2. 2.2 服务端