文章目录
  1. HBase源码01_创建连接
  2. 1. 代码示例 (kotlin)
  3. 2. 解析
    1. 2.1 创建 conf
    2. 2.2 user
    3. 2.3 pool
    4. 2.4 建立连接

[TOC]

HBase源码01_创建连接

👉 Github源码地址

源码基于分支 branch-1.2(v1.2.7),HBase的源码希望能坚持搞下去吧 😂。该系列参考学习的流程HBase源代码分析,感谢。

1. 代码示例 (kotlin)

代码从 client 端连接开始。

1
2
3
4
5
6
7
8
// 查看 hbase 表是否存在
@Test
fun testConnection() {
val connection = ConnectionFactory.createConnection()
println(connection.admin.tableExists(TableName.valueOf("test")))

connection.close()
}

2. 解析

创建连接的方法最多可以接收的参数如下:

1
public static Connection createConnection(Configuration conf, ExecutorService pool, User user)

2.1 创建 conf

conf 默认为 HBaseConfiguration.create()HBaseConfiguration 本身继承 Hadoop 的 Configuration,所以本身不是特别复杂。主要步骤如下:

  • 加载 hbase-default.xmlhbase-site.xml
  • 校验版本号,检查不通过会抛异常,涉及到的配置如下:

    1
    2
    3
    4
    // 是否忽略检查, 默认false
    hbase.defaults.for.version.skip
    // 版本号, 默认为 hbase-default.xml 中的配置
    hbase.defaults.for.version
  • 校验 (1 - 配置的[memstore/BlockCache]内存百分比) 是否大于 20%

核心代码如下:

1
2
3
4
5
6
conf.addResource("hbase-default.xml");
conf.addResource("hbase-site.xml");

checkDefaultsVersion(conf);
HeapMemorySizeUtil.checkForClusterFreeMemoryLimit(conf);
return conf;

2.2 user

可忽略, 返回 Hadoop 的 User, 用于做权限控制

核心代码如下:

1
2
3
4
if (user == null) {
UserProvider provider = UserProvider.instantiate(conf);
user = provider.getCurrent();
}

2.3 pool

可忽略, 做 批处理 的线程池

2.4 建立连接

  • 1、 获取连接实现类,通过反射初始化实现

  • 2、下面主要分析 ConnectionManager.HConnectionImplementation

  • 2.1、初始化一堆 hbase.client.* 相关的配置

  • 2.2、初始化 zookeeper 连接

1
2
3
1. RegistryFactory.getRegistry 获取 ZooKeeperRegistry 实例
2. 执行 retrieveClusterId() 建立 zookeeper 连接(代码比较深)
3. 返回 clusterId
  • 2.3 创建 RpcClient,主要负责 RPC调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 入口代码
this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);

// 最终执行的代码
public static RpcClient createClient(Configuration conf, String clusterId,
SocketAddress localAddr, MetricsConnection metrics) {
// 常量: hbase.rpc.client.impl
String rpcClientClass = conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class.getName());
return ReflectionUtils.instantiateWithCustomCtor(
rpcClientClass,
new Class[] { Configuration.class, String.class, SocketAddress.class,
MetricsConnection.class },
new Object[] { conf, clusterId, localAddr, metrics }
);
}

// !!! 可以看出 HBase 特别喜欢将重要的实现类提取出配置, 然后通过反射的方式去实例化
  • 2.4、其他一些初始化
1
2
3
4
5
6
7
8
9
10
// 创建改connect所相关region的监控信息, 由参数 hbase.client.backpressure.enabled 决定, 默认false
stats = ServerStatisticTracker.create(conf);
// client id 随机生成器
ConnectionManager.nonceGenerator = new PerClientRandomNonceGenerator();
// 异步的处理类, 带有线程池, 负责连续的请求, 比如用于批处理
this.asyncProcess = createAsyncProcess(this.conf);
// 远程调用的时候, 出现异常的处理机制
this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
// RpcRetryingCaller 的创建工厂, 配合 RetryingCallable 使用
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);

到这基本结束了, 一个connection主要生成 rpcClientrpcCallerFactoryrpcClient 会结合 RpcRetryingCaller 进行 RPC 调用

文章目录
  1. HBase源码01_创建连接
  2. 1. 代码示例 (kotlin)
  3. 2. 解析
    1. 2.1 创建 conf
    2. 2.2 user
    3. 2.3 pool
    4. 2.4 建立连接