2017年6月11日星期日

Redis 源码学习之事件驱动


Linuxeden 开源社区 --

Redis 基于多路复用技术实现了一套简单的事件驱动库,代码在 ae.hae.c 以及 ae_epoll.cae_evport.cae_kqueue.cae_select.c 这几个文件中。其中 ae 表示的是 antirez eventloop 的意思。

Redis 里面包含两种事件类型:FileEventTimeEvent

Redis 采用 IO 多路复用技术,所有的事件都是在一个线程中进行处理。Redis 的事件驱动模型可以以以下为代码进行表示:

在一个死循环中等待事件的到来,然后对事件进行处理,以此往复。这就是一个最经典的网络编程模型。

1. 基本数据结构

aeEventLoop

aeEventLoopRedis 中事件驱动模型的核心,封装了整个事件循环,其中每个字段解释如下:

  • maxfd: 已经接受的最大的文件描述符。
  • setsize: 当前循环中所能容纳的文件描述符的数量。
  • timeEventNextId: 下一个时间事件的 ID.
  • lastTime: 上一次被访问的时间,用来检测系统时钟是否被修改。
  • events: 指针,指向保存所有注册的事件的数组首地址。
  • fired: 指针,保存所有已经买被触发的事件的数组首地址。
  • timeEventHead:Redis 用一个链表来存储所有的时间事件,timeEventHead 是指向这个链表的首节点指针。
  • stop: 停止整个事件循环。
  • apiData: 指针,指向 epoll 结构。
  • beforeSleep: 函数指针。每次实现循环的时候,在阻塞直到时间到来之前,会先调用这个函数。

aeFileEvent 和 aeTimeEvent

这两个结构分别表示文件事件和时间事件,定义如下

其中 mask 表示文件事件类型掩码,可以是 AE_READABLE 表示是可读事件,AE_WRITABLE 为可写事件。aeFileProc 是函数指针。

aeFiredEvent

aeFiredEvent 结构表示一个已经被触发的事件,结果如下:

fd 表示事件发生在哪个文件描述符上面,mask 用来表示具体事件的类型。

aeApiState

Redis 底层采用 IO 多路复用技术实现高并发,具体实现可以采用 kqueueselectepoll 等技术。对于 Linux 来说,epoll 的性能要优于 select,所以以 epoll 为例来进行分析。

aeApiState 封装了跟 epoll 相关的数据,epfd 保存 epoll_create() 返回的文件描述符。

具体实现细节

事件循环启动:aeMain()

事件驱动的启动代码位于 ae.caeMain() 函数中,代码如下:

aeMain() 方法中可以看到,整个事件驱动是在一个 while() 循环中不停地执行 aeProcessEvents() 方法,在这个方法中执行从客户端发送过来的请求。

初始化:aeCreateEventLoop()

aeEventLoop 的初始化是在 aeCreateEventLoop() 方法中进行的,这个方法是在 server.c 中的 initServer() 中调用的。实现如下:

在这个方法中主要就是给 aeEventLoop 对象分配内存然后并进行初始化。其中关键的地方有:

1、调用 aeApiCreate() 初始化 epoll 相关的数据。aeApiCreate() 实现如下:

aeApiCreate() 方法中主要完成以下三件事:
1. 分配 aeApiState 结构需要的内存。
2. 调用 epoll_create() 方法生成 epoll 的文件描述符,并保存在 aeApiState.epfd 字段中。
3. 把第一步分配的 aeApiState 的内存地址保存在 EventLoop->apidata 字段中。

2、初始化 events 中的 mask 字段为为 AE_NONE

生成 fileEvent:aeCreateFileEvent()

Redis 使用 aeCreateFileEvent() 来生成 fileEvent,代码如下:

aeCreateFileEvent() 方法主要做了一下三件事:

  1. 检查新增的 fd 是否超过所能容纳最大值。
  2. 调用 aeApiAddEvent() 方法把对应的 fd 以 mask 模式添加到 epoll 监听器中。

设置相应的字段值。其中最关键的步骤是第二步,aeApiAddEvent() 方法如下:

生成 timeEvent:aeCreateTimeEvent()

aeCreateTimeEvent() 方法主要是用来生成 timeEvent 节点,其实现比较简单,代码如下所示:

处理 timeEevnt:processTimeEvents()

RedisprocessTimeEvents() 方法中来处理所有的 timeEvent,实现如下:

在这个方法中会

  1. 判断系统时间有没有调整过,如果调整过,则会把 timeEvent 链表中的所有的 timeEvent 的触发时间设置为 0,表示立即执行。
  2. timeEvent 链表进行遍历,对于每个 timeEvent 节点,如果有:
    1. 如果已经被标记为删除 (AE_DELETED_EVENT_ID),则立即释放对应节点内存,遍历下个节点。
    2. 如果 id 大于 maxId, 则表示当前节点为本次循环中新增节点,咋本次循环中不错处理,继续下个节点。
    3. 如果当前节点的触发时间大于当前时间,则调用对应节点的 timeProc() 方法执行任务。根据 timeProc() 方法的返回,又分为两种情况:
      1. 返回为 AE_NOMORE,表示当前 timeEvent 节点属于一次性事件,标记该节点 IDAE_DELETED_EVENT_ID, 表示删除节点,该节点将会在下一轮的循环中被删除。
      2. 返回不是 AE_NOMORE,表示当前 timeEvent 节点属于周期性事件,需要多次执行,调用 aeAddMillisecondsToNow() 方法设置下次被执行时间。

处理所有事件:aeProcessEvents()

Redis 中所有的事件,包括 timeEventfileEvent 都是在 aeProcessEvents() 方法中进行处理的,刚方法实现如下:

该方法的入参 flag 表示要处理哪些事件,可以取以下几个值 :

  • AE_ALL_EVENTS:timeEventfileEvent 都会处理。
  • AE_FILE_EVENTS: 只处理 fileEvent
  • AE_TIME_EVENTS: 只处理 timeEvent
  • AE_DONT_WAIT: 要么立马返回,要么处理完那些不需要等待的事件之后再立马返回。

aeProcessEvents() 方法会做下面几件事:

  1. 判断传入的 flag 的值,如果既不包含 AE_TIME_EVENTS 也不包含 AE_FILE_EVENTS 则直接返回。
  2. 计算如果有 aeFileEvent 事件需要进行处理,则先计算 epoll_wait() 方法需要阻塞等待的时间,计算方式如下:
    1. 先从 aeTimeEvent 事件链表中找到最近的需要被触发的 aeTimeEvent 节点并计算需要被触发的时间,该被触发时间则为 epoll_wait() 需要等待的时间。
    2. 如果没有找到最近的 aeTimeEvent 节点,表示没有 aeTimeEvent 节点被加入链表,则判断传入的 flags 是否包含 AE_DONT_WAIT 选项,则设置 epoll_wait() 需要等待时间为 0,表示立即返回。
    3. 如果没有设置 AE_DONT_WAIT, 则设置需要等待时间为 NULL, 表示 epoll_wait() 一直阻塞等待知道有 fileEvent 事件到来。
  3. 调用 aeApiPoll() 方法阻塞等待事件的到来,阻塞时间为第二步中计算的时间。aeApiPoll() 实现如下:

    aeApiPoll() 会做下面几件事:

    1. 根据传入的 tvp 计算需要阻塞的时间,然后调用 epoll_wait() 进行阻塞等待。
    2. 有事件到来之后先计算对应事件的类型。
    3. 把事件发生的 fd 以及对应的类型 mask 拷贝到 fired 数组中。
  4. aeApiPoll() 方法返回之后,所有事件已经就绪了的 fd 以及对应事件的类型 mask 已经保存在 eventLoop->fired[] 数组中。依次遍历 fired 数组,根据 mask 类型,执行对应的 frileProc() 或者 wfileProce() 方法。
  5. 如果传入的 flags 中有 AE_TIME_EVENTS, 则调用 processTimeEvents() 执行所有已经到时间了的 timeEvent
  6. 转自 http://ift.tt/2reWWGP

The post Redis 源码学习之事件驱动 appeared first on Linuxeden开源社区.

http://ift.tt/2r7UVYR

没有评论:

发表评论