每日算法-时间轮

-- Pageviews

timewheel

最近做了一个订单15分钟过期变更状态机的功能,中间想过以定时任务来实现,考虑到定时任务存在一定的时间误差,于是在系统中引入了时间轮算法。

时间轮是一种环形的数据结构,类似于时钟,秒针、分针、时针分别为一层,每层分成多个格子,每个格子中存放任务集合,一个单独的线程推进时间一格一格的移动,并执行格子中的任务。它常用于延时任务,在Netty、akka、Quartz、Zookeeper等高性能组件中都存在时间轮定时器的踪影。

点击查看java版时间轮实现

时间轮相关名词解释

  • 时间格:环形结构中用于存放延迟任务的区块
  • 指针(currentTime):指向当前操作的时间格,代表当前时间
  • 格数(ticksPerWheel):时间轮中时间格的个数
  • 间隔(tickDuration):每个时间格之间的间隔
  • 总间隔(interval):当前时间轮总间隔$ ticksPerWheel \times tickDuration $

kafka时间轮

在Kafka中应用了大量的延迟操作,但它并没用使用JDK自带的Timer或是DelayQueue用于延迟操作,而是使用自己开发的DelayedOperationPurgatory组件用于管理延迟操作,
Kafka这类分布式框架有大量延迟操作并且对性能要求及高,而java.util.Timer与java.util.concurrent.DelayQueue的插入和删除复杂度都为对数阶O(log n)并不能满足Kafka性能要求,
所以Kafka实现了基于时间轮的定时任务组件,该时间轮定时任务实现的插入与删除(开始定时器与暂停定时器)的时间复杂度都为常数阶O(1)。

Timer

Timer是kafka中的定时器类,定义了共客户端调用的方法。SystemTimer是对Timer的具体实现。

  • taskExecutor:过期任务执行线程,为了不影响性能,过期任务单独开辟线程执行;
  • delayQueue:一个Timer只有一个delayQueue,Timer中所有timingWheel共用,用于获取过期任务;
  • timingWheel:最底层时间轮tickMs(间隔)为1ms,wheelSize(格数)为20;

kafka_timer

关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//添加任务
def add(timerTask: TimerTask): Unit = {
readLock.lock()
try {
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs))
} finally {
readLock.unlock()
}
}
//添加任务,失败直接执行
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
if (!timingWheel.add(timerTaskEntry)) {
if (!timerTaskEntry.cancelled)
taskExecutor.submit(timerTaskEntry.timerTask)
}
}
//获取过期任务,推进时间,任务执行或降轮重入
def advanceClock(timeoutMs: Long): Boolean = {
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
writeLock.lock()
try {
while (bucket != null) {
timingWheel.advanceClock(bucket.getExpiration())
bucket.flush(reinsert)
bucket = delayQueue.poll()
}
} finally {
writeLock.unlock()
}
true
} else {
false
}
}

TimingWheel

  • interval:时间轮时间范围$ tickMs \times wheelSize $
  • buckets:TimerTaskList数组,长度对应wheelSize
  • currentTime:当前时间$ startMs-(startMs \% tickMs) $,取整为tickMs的倍数
  • overflowWheel:上级时间轮

kafka_timingwheel

关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//添加或获取上级时间轮
private[this] def addOverflowWheel(): Unit = {
synchronized {
if (overflowWheel == null) {
overflowWheel = new TimingWheel(
tickMs = interval,
wheelSize = wheelSize,
startMs = currentTime,
taskCounter = taskCounter,
queue
)
}
}
}
//添加任务 失败返回false直接执行
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
//取消
false
} else if (expiration < currentTime + tickMs) {
//已过期
false
} else if (expiration < currentTime + interval) {
//当前时间轮可以容纳该任务
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
if (bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket)
}
true
} else {
//加入上级时间轮
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
//推进时间
def advanceClock(timeMs: Long): Unit = {
if (timeMs >= currentTime + tickMs) {
currentTime = timeMs - (timeMs % tickMs)
//推进上级时间轮时间
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}

TimerTaskList

  • TimerTaskEntry:用于封装TimerTask

kafka_timertasklist

TimerTask

继承java Runnable接口

kafka_timertask