需求

接口

首先需求场景主要有这几种(简化):

  1. 在 n 秒以后执行一个任务 X
  2. 每隔 n 秒执行一次任务 X
  3. 取消一个已经添加的定时器

根据上面的简化需求,得到需要的主要接口:

  1. 添加一个定时器
  2. 定时器过期执行(可能需要重复执行)
  3. 取消一个定时器

数据结构

最后,就是考虑用来存放定时器的数据结构(也是定时器设计的核心)
上面的接口可以简单的看成这几个操作:

  1. 添加
  2. 删除
  3. 查询(获取最近需要执行的一个)

对于这几个操作,常用的复杂度比较均衡的数据结构:

  1. 红黑树
  2. 优先队列(最小堆)

另外跳表(本质类似红黑树)也是可用的;还有一种比较巧妙的hash结构时间轮,这是一个类似钟表指针的结构,将需要管理的定时器根据时间hash到数组中,以提高查询和添加的效率

这几种实现,现在流行的开源框架都各有采用,比如 Linux 内核用的就是时间轮的实现

实现

下面简单实现一下采用最小堆和时间轮作为数据结构的定时器,方便起见,编码演示使用python

时间堆

所谓的“时间堆”并不是什么稀奇东西,只是用小堆来管理以时间作为关键字的定时器集合
实现思路:

  1. 使用一个结构来保存定时器对象的数据,包括过期时间、回调函数等
  2. 所有的定时器对象保存在一个优先队列中(也就是最小堆),键值为时间
  3. 添加定时器即为向队列中插入新项
  4. 删除定时器可以使用惰性删除,首先给对应的定时器对象置位表示其已经取消,当达到一定条件时(比如取消的定时器数量达到总数量的1/2),进行一个清理操作
  5. 按照固定的时间间隔来tick,每次tick都需要看看有没有需要执行的定时器对象
  6. 因为对象由优先队列管理,因此,当队首元素不需要执行时,后面的元素则都不需要执行
  7. 一个对象需要执行时,可能需要将其删除;也可能是需要重复执行的,则需要设定好时间后再次加入

实现代码如下(仅为演示):

class Timer():
    def __init__(self, owner, timestamp, callback, callargs, loop):
        self.owner: Timers = owner
        self.timestamp = timestamp
        self.callback = callback
        self.callargs = callargs
        self.cancelled = False
        self.loop = loop

    def __lt__(self, other):
        return self.timestamp < other.timestamp

    @property
    def Cancelled(self): return self.cancelled
    @property
    def Time(self): return self.timestamp

    def cancel(self):
        self.cancelled = True
        self.owner.onCancel()

    def trigger(self, now):
        if not self.cancelled:
            self.callback(now, *self.callargs)
            if self.loop > 0:
                self.timestamp += self.loop
            else:
                self.cancel()

class Timers():
    def __init__(self):
        self.timerQue = []
        self.cancelCount = 0

    def tick(self):
        now = int(time.time())
        self.process(now)

    def process(self, now):
        while len(self.timerQue) > 0:
            timer: Timer = self.timerQue[0]

            # 到期或取消的定时器需要处理
            if timer.Time <= now or timer.Cancelled:

                if not timer.Cancelled:
                    timer.trigger(now)

                if not timer.Cancelled:
                    heapq.heapreplace(self.timerQue, timer)
                else:
                    heapq.heappop(self.timerQue)
                    self.cancelCount -= 1
            else:
                break

    def onCancel(self):
        self.cancelCount += 1
        if self.cancelCount > len(self.timerQue)/2:
            self.purge()

    # 这里处理的时候参考c++ std partition 操作,先将没用的都放到数组最后
    # 然后,将前面有用的重新进行一次建队操作
    def purge(self):
        # patition
        l = 0
        r = len(self.timerQue) - 1
        while l < r:
            while l < r and not self.timerQue[l].Cancelled: l += 1
            while l < r and self.timerQue[r].Cancelled: r -= 1

            self.timerQue[l], self.timerQue[r] = self.timerQue[r], self.timerQue[l]
            l += 1
            r -= 1

        # remove
        mid = math.ceil((l + r) / 2)
        if not self.timerQue[mid].Cancelled:
            mid += 1
        del self.timerQue[mid:]

        # heapmake
        heapq.heapify(self.timerQue)

    def add(self, duration, callback, callargs, looptime):
        now = int(time.time())
        timer: Timer = Timer(self, now + duration, callback, callargs, looptime)
        heapq.heappush(self.timerQue, timer)
        return timer

# test
def func(callback):
    waitEvent = threading.Event()
    while not waitEvent.wait(1):
        callback()
def func_a(now):
    print('in func a', now)
def func_b(now):
    print('in func b', now)
def func_c(a, b):
    print('in func c: before sleep')
    time.sleep(30)
    print('in func c: after sleep')
    a.cancel()
    b.cancel()

timers = Timers()
threading.Thread(target=func, args=(lambda:timers.tick(), )).start()
timer_a = timers.add(0, func_a, (), 10)
timer_b = timers.add(5, func_b, (), 10)
threading.Thread(target=func_c, args=(timer_a, timer_b)).start()

时间轮

时间轮是一个比较巧妙地hash结构,性质有点类似钟表指针
首先考虑一个简单的大小为60数组t,每一位表示对应时间的集合:第0秒需要处理的事务都在t[0],第5秒需要处理的事务都在t[5],依次类推
这样一来,当处在 x 秒时只需要去对应的数组元素即可;但是同样也带来了一个问题:最多只能处理一分钟内的事务(数组大小只有60)

为了解决这个问题,有一个简单的方案:在事务对象加一个字段 nRound 用来表示循环使用这个数组,当循环次数为 nRound 时才执行本事务
这样一来就解决了数组容量问题,但是随之而来的就是效率变得低下了;本来使用hash的目的就是为了避免不必要的遍历,可以直截了当地获取当前需要处理地任务,而现在又要遍历 t[n] 来判断 nRound 是否为当前轮次了

基于此,就有了这种更优美地解决方案时间轮:再加一个大小为60数组d,每一位还是表示对应时间地集合:第0分钟要处理地事务都在d[0]中,第5分钟需要处理地事务都在d[5]中,依次类推
当然,因为0-59秒的事务都放在了数组t里了,所以d[0]为空即可;当时间来到第1分钟时,再将d[1]中的事务放置到t中对应位置即可
这样一来,就已经可以处理1个小时内的任务了,可想而知,再加上更多的数组就可以处理更长的时间跨度了
(很明显,这里的数组大小和数组数量只是一个进制关系而已)

工作原理如下:

  1. 一级轮(就是一个数组)的每个格对应一个时间间隔
  2. 一级轮指针每次tick加1;当指针指向一级轮的某一格时,即表示这一格里的定时器都到期了
  3. 二级轮(包括更多级同理)的每个格对应一级轮的一圈
  4. 二级轮指针每当一级轮指针转一圈加1;当指针指向某一个时,即表示接下来需要处理这一格的定时器了(分散到一级轮里)

后面给出了一个简单的实现,简单起见就没有再处理重复执行的情况
这里再谈谈一些可以优化的地方:

  1. 指针和数组大小:在Linux内核定时器的实现里,采用了一个很巧妙的设计,一共5个数组,大小分别为 64(a) 64(b) 64(c) 64(d) 255(e) (64是2的6次方,255是2的8次方,也就总共占了32位);这样一来,只需利用整数的进位就可以自然的处理数组之间的进制关系了,这体现在,只需要一个32bit的指针和对应的位操作即可表示5个数组中的情况了,不再需要每个数组分配一个指针(例如末8位表示数组e的指针,之前的6位表示数组d的指针)
  2. 数组元素:更高级的数组对应的时间刻度就越长,就看可能有更多甚至非常大量的事务挤在一个格里,这时候特殊需求的操作(比如查询),可能就不能很好地支持,因此可以采用合适的数据结构来管理每个格里的事务;当然通常情况是不需要的,因为一般操作只是要把当前格里的事务hash到下一级的数组里
  3. 取消定时器:还是跟上面时间轮的实现一样,可以考虑采用惰性删除的策略
class Timer():
    def __init__(self, delay, callback, callargs):
        self.delay = delay
        self.callback = callback
        self.callargs = callargs
        self.cancelled = False

    @property
    def Cancelled(self): return self.cancelled

    def cancel(self):
        self.cancelled = True

    def trigger(self):
        if not self.cancelled:
            self.callback(*self.callargs)
            self.cancelled = True

class Wheels():
    WHEEL_NUM = 3
    SOLT_NUM = 8
    MAX_NUM = int(math.pow(SOLT_NUM, WHEEL_NUM))
    def __init__(self):
        self.pointer = []
        self.wheels = []
        for i in range(self.WHEEL_NUM):
            self.pointer.append(0)
            self.wheels.append([])
            for j in range(self.SOLT_NUM):
                self.wheels[i].append([])
    
    def add(self, delay, func, args):
        if delay >= self.MAX_NUM: return

        past = delay
        wheel = self.WHEEL_NUM - 1
        while past >= self.SOLT_NUM:
            past = past // self.SOLT_NUM
            wheel -= 1
        solt = (self.pointer[wheel] + past) % self.SOLT_NUM
        delay = delay % int(math.pow(self.SOLT_NUM, self.WHEEL_NUM - wheel - 1))

        self.wheels[wheel][solt].append(Timer(delay, func, args))

    def tick(self):
        print('tick')
        for i in range(self.WHEEL_NUM - 1):
            while self.wheels[i][self.pointer[i]]:
                timer = self.wheels[i][self.pointer[i]].pop()
                self.add(timer.delay, timer.callback, timer.callargs)

        idx = self.WHEEL_NUM - 1
        while self.wheels[idx][self.pointer[idx]]:
            timer = self.wheels[idx][self.pointer[idx]].pop()
            timer.trigger()

        for i in range(self.WHEEL_NUM - 1, -1, -1):
            self.pointer[i] = (self.pointer[i] + 1) % self.SOLT_NUM
            if self.pointer[i] > 0:
                break

def func(callback):
    waitEvent = threading.Event()
    while not waitEvent.wait(1):
        callback()
wheels = Wheels()
threading.Thread(target=func, args=(lambda:wheels.tick(), )).start()

# test
def func_a():
    print('in func a')
def func_b():
    print('in func b')

timer_a = wheels.add(5, func_a, ())
timer_b = wheels.add(10, func_b, ())

总结

到这里,关于定时器的实现就讲完了;实现本身可能并不精彩,秒的是一些细节处的设计思想,最后再来回顾一下:

  1. 惰性删除和清理操作:参考了开源引擎kbengine中定时器的实现;惰性删除可以节省高频操作的开销,还可以减少频繁的内存操作;在清理操作的细节中,先将有用和无用的定时器分离(分别放置到数组前后),可以有效提高内存操作的效率
  2. 时间轮结构的设计:时间轮结构的设计非常巧妙,源于hash也兼顾了空间,并且将高频操作(最近时间段)和低频操作(较远时间段)分离了开来
  3. 时间轮指针设计:利用整数进位,简化时间轮指针
内容来源于网络如有侵权请私信删除

文章来源: 博客园

原文链接: https://www.cnblogs.com/yingjie-zh/p/15350134.html

你还没有登录,请先登录注册
  • 还没有人评论,欢迎说说您的想法!