文章目录
  1. Hbase源码10_插曲StealJobQueue
    1. 1. 简介
    2. 源码

[TOC]

Hbase源码10_插曲StealJobQueue

1. 简介

在HBase做Compact过程, 会分两个线程池来处理大Compact小Compact, 用的就是StealJobQueue队列, 设计的挺巧, 可以看下。

大致思路如下:

  • StealJobQueue本身是个队列, 类里面又定义了一个队列stealFromQueue
  • StealJobQueue的获取是先从本身的获取, 获取不到从stealFromQueue偷取。stealFromQueue就是一个正常的队列。
  • 然后大Compact使用StealJobQueue, 小Compact 使用stealFromQueue
  • 这样就能优化大Compact的线程池, 当大Compact的线程池空闲的时候, 就可以从小Compact中偷取小任务执行。

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package org.apache.hadoop.hbase.util;

import org.apache.hadoop.hbase.classification.InterfaceAudience;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* This queue allows a ThreadPoolExecutor to steal jobs from another ThreadPoolExecutor.
* This queue also acts as the factory for creating the PriorityBlockingQueue to be used in the
* steal-from ThreadPoolExecutor. The behavior of this queue is the same as a normal
* PriorityBlockingQueue except the take/poll(long,TimeUnit) methods would also check whether there
* are jobs in the steal-from queue if this q ueue is empty.
*
* Note the workers in ThreadPoolExecutor must be pre-started so that they can steal job from the
* other queue, otherwise the worker will only be started after there are jobs submitted to main
* queue.
*/
@InterfaceAudience.Private
public class StealJobQueue<T> extends PriorityBlockingQueue<T> {

private BlockingQueue<T> stealFromQueue;

private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();

public StealJobQueue() {
this.stealFromQueue = new PriorityBlockingQueue<T>() {
@Override
public boolean offer(T t) {
lock.lock();
try {
notEmpty.signal();
return super.offer(t);
} finally {
lock.unlock();
}
}
};
}

public BlockingQueue<T> getStealFromQueue() {
return stealFromQueue;
}

@Override
public boolean offer(T t) {
lock.lock();
try {
notEmpty.signal();
return super.offer(t);
} finally {
lock.unlock();
}
}

@Override
public T take() throws InterruptedException {
lock.lockInterruptibly();
try {
while (true) {
T retVal = this.poll();
if (retVal == null) {
retVal = stealFromQueue.poll();
}
if (retVal == null) {
notEmpty.await();
} else {
return retVal;
}
}
} finally {
lock.unlock();
}
}

@Override
public T poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
lock.lockInterruptibly();
try {
while (true) {
T retVal = this.poll();
if (retVal == null) {
retVal = stealFromQueue.poll();
}
if (retVal == null) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
} else {
return retVal;
}
}
} finally {
lock.unlock();
}
}
}
文章目录
  1. Hbase源码10_插曲StealJobQueue
    1. 1. 简介
    2. 源码