最近做了一个订单15分钟过期变更状态机的功能,中间想过以定时任务来实现,考虑到定时任务存在一定的时间误差,于是在系统中引入了时间轮算法。
时间轮是一种环形的数据结构,类似于时钟,秒针、分针、时针分别为一层,每层分成多个格子,每个格子中存放任务集合,一个单独的线程推进时间一格一格的移动,并执行格子中的任务。它常用于延时任务,在Netty、akka、Quartz、Zookeeper等高性能组件中都存在时间轮定时器的踪影。
点击查看java版时间轮实现
时间轮相关名词解释
- 时间格:环形结构中用于存放延迟任务的区块
- 指针(currentTime):指向当前操作的时间格,代表当前时间
- 格数(ticksPerWheel):时间轮中时间格的个数
- 间隔(tickDuration):每个时间格之间的间隔
- 总间隔(interval):当前时间轮总间隔$ ticksPerWheel \times tickDuration $
kafka时间轮
在Kafka中应用了大量的延迟操作,但它并没用使用JDK自带的Timer或是DelayQueue用于延迟操作,而是使用自己开发的DelayedOperationPurgatory组件用于管理延迟操作,
Kafka这类分布式框架有大量延迟操作并且对性能要求及高,而java.util.Timer与java.util.concurrent.DelayQueue的插入和删除复杂度都为对数阶O(log n)并不能满足Kafka性能要求,
所以Kafka实现了基于时间轮的定时任务组件,该时间轮定时任务实现的插入与删除(开始定时器与暂停定时器)的时间复杂度都为常数阶O(1)。
Timer
Timer是kafka中的定时器类,定义了共客户端调用的方法。SystemTimer是对Timer的具体实现。
- taskExecutor:过期任务执行线程,为了不影响性能,过期任务单独开辟线程执行;
- delayQueue:一个Timer只有一个delayQueue,Timer中所有timingWheel共用,用于获取过期任务;
- timingWheel:最底层时间轮tickMs(间隔)为1ms,wheelSize(格数)为20;
关键代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| def add(timerTask: TimerTask): Unit = { readLock.lock() try { addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs)) } finally { readLock.unlock() } } private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { if (!timingWheel.add(timerTaskEntry)) { if (!timerTaskEntry.cancelled) taskExecutor.submit(timerTaskEntry.timerTask) } } def advanceClock(timeoutMs: Long): Boolean = { var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) if (bucket != null) { writeLock.lock() try { while (bucket != null) { timingWheel.advanceClock(bucket.getExpiration()) bucket.flush(reinsert) bucket = delayQueue.poll() } } finally { writeLock.unlock() } true } else { false } }
|
TimingWheel
- interval:时间轮时间范围$ tickMs \times wheelSize $
- buckets:TimerTaskList数组,长度对应wheelSize
- currentTime:当前时间$ startMs-(startMs \% tickMs) $,取整为tickMs的倍数
- overflowWheel:上级时间轮
关键代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| private[this] def addOverflowWheel(): Unit = { synchronized { if (overflowWheel == null) { overflowWheel = new TimingWheel( tickMs = interval, wheelSize = wheelSize, startMs = currentTime, taskCounter = taskCounter, queue ) } } }
def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { false } else if (expiration < currentTime + tickMs) { false } else if (expiration < currentTime + interval) { val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry) if (bucket.setExpiration(virtualId * tickMs)) { queue.offer(bucket) } true } else { if (overflowWheel == null) addOverflowWheel() overflowWheel.add(timerTaskEntry) } }
def advanceClock(timeMs: Long): Unit = { if (timeMs >= currentTime + tickMs) { currentTime = timeMs - (timeMs % tickMs) if (overflowWheel != null) overflowWheel.advanceClock(currentTime) } }
|
TimerTaskList
- TimerTaskEntry:用于封装TimerTask
TimerTask
继承java Runnable接口