title: 聊聊Linux定时器
tags: longzy:2018-11-17
在上一篇文章<如何选择TCP长连接与短连接>中,我介绍了 如何根据业务场景来选择TCP长连接还是短连接,其中在选择的长连接中,我提到了在长连接中用什么方法来检测长连接的存在与否?
为什么要去检测连接呢?连接建立以后不就可以收发数据了吗?no no no!其实TCP连接建立以后,并不是一直保持的,操作系统在实现TCP协议的时候做了一个限制,就是keepAlive。
所以我们需要一种方法来检测连接,那使用什么方法呢?这里我介绍两种方法,也是最常用的。
- 操作系统的KeepAlive机制
- 应用层实现心跳检测
KeepAlive机制
keepalive机制可以参考以下配置
cat /proc/sys/net/ipv4/tcp_keepalive_time
cat /proc/sys/net/ipv4/tcp_keepalive_intvl
cat /proc/sys/net/ipv4/tcp_keepalive_probes
大概含义是:
- tcp_keepalive_time: KeepAlive的空闲时长,或者说每次正常发送心跳的周期,默认值为7200s(2小时)
- tcp_keepalive_intvl: KeepAlive探测包的发送间隔,默认值为75s
- tcp_keepalive_probes:在tcp_keepalive_time之后,没有接收到对方确认,继续发送?;钐讲獍问现滴?(次)
keepalive默认不是开启的,如果想使用,则先修改配置文件 /etc/sysctl.conf:
net.ipv4.tcp_keepalive_time=7200
net.ipv4.tcp_keepalive_intvl=75
net.ipv4.tcp_keepalive_probes=9
然后通过linux底层函数 setsockopt ,原型为:
#include <sys/socket.h>
int setsockopt(int socket, int level, int option_name,const void *option_value, socklen_t option_len);
具体的使用这里就不介绍了,google有很多文章介绍的。
应用层心跳检测
应用层的心跳检测就是在server和client的接口协议中,设计一种用于心跳检测的协议格式,然后一般由客户端定时发送给服务端,因为这个连接一般是由服务端来管理的。
这里提到了定时,终于说到我这篇文章的重点了,对,接下来,该讨论重点:定时器!
定时器
在这里,我从三个方面来说定时器
- linux提供的定时器
- 高性能定时器
- 工业级定时器
linux提供的定时器
linux提供了三种定时器
- socket选项 SO_RCVTIMEO和SO_SNDTIMEO
- SIGALRM信号
- I/O复用系统调用的超时函数。
socket选项 SO_RCVTIMEO和SO_SNDTIMEO
这个选项分别是用来设置socket接受和发送数据的超时时间,因此这两个选项仅对与数据接受和发送相关的socket专用系统调用,这些调用包括send、sendmsg、recv、recvmsg、accept、和connect。我们将这两个选项对这些系统调用的影响总结为下表
系统调用 | 有效选项 | 系统调用超时后的行为 |
---|---|---|
send | SO_SNDTIMEO | return -1,errno is EAGAIN or EWOULDBLOCK |
sendmsg | SO_SNDTIMEO | return -1,errno is EAGAIN or EWOULDBLOCK |
recv | SO_RCVTIMEO | return -1,errno is EAGAIN or EWOULDBLOCK |
recvmsg | SO_RCVTIMEO | return -1,errno is EAGAIN or EWOULDBLOCK |
accept | SO_RCVTIMEO | return -1,errno is EAGAIN or EWOULDBLOCK |
connect | SO_SNDTIMEO | return -1,errno is EINPROGRESS |
这些选项在l实际inux服务器开发中很少用,因为如果设置这些选项后,意味着可能就选择了阻塞,然后在如今服务器开发过程中,基本不会采用阻塞。所以这里具体的用法我就不用举例了,大家去google,毕竟这篇文章的重点打算说的是工业级定时器
SIGALRM信号
由alarm和setitimer、sleep等函数设置的时间闹钟一旦超时,将会触发SIGALRM信号,因此可以利用该信号的信号处理函数来处理定时任务。这里不作过多的描述,如果想进一步了解的话,推荐大家去读<Linux高性能服务器编程>一书。
I/O复用系统调用的超时函数
Linux下的3组I/O复用(select、poll、epoll)系统调用都带有超时参数,他们不仅能处理信号事件和I/O事件,同样也能处理定时事件。由于I/O复用系统调用可能在超时时间到期之前就有返回(信号或I/O事件到来),所有如果利用I/O复用来做定时,每一次返回之后要重新更新时间,以保证处理最小定时器。
利用epoll设置定时的简单example
int timeout = 2000;
time_t start = time(NULL);
time_t end = time(NULL);
while(1)
{
start = time(NULL);
int num = epoll_wait(efd,events,events_size,timeout);
if (num < 0 && errno != EINTR) {
printf("epoll_wait failed!\n");
break;
}
if (0 == timeout)
{
printf("timeout is arrived\n");
continue;
}
end = time(NULL);
//重新计算下一次的超时时间
timeout -= (end - start)*1000;
//timeout有可能为0,这时设置成默认的超时时间好了
if (timeout <=0 ) {
timeout = 2000;
}
//处理到来的定时事件
}
这里只是简单提一下,后面在工业级定时器中会分析redis的定时器设置,以及后面再专门讲一下epoll。
高性能定时器
这里有几个理论知识要讲,我怕自己描述不清,给大家造成知识上的误解,所以在这里我推荐大家去读<Linux高性能服务器编程>一书,里面讲得特别清楚,如果大家需要电子版,可以给我留言或加我微信(个人资料中有微信号)。
工业级定时器
这里主要分析两个比较流行的开源代码,看看人家是怎么设计定时器的!
- redis
- muduo
redis
我们来看看redis的定时器是如何做的
1,创建一个定时器,调用如下
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
我们来看看实现:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
/*这里解释下每个参数
eventLoop: 暂时不用管
milliseconds: 定时器的到期时间,是相对时间
proc: 定时器回调函数
clientData: 用户数据
finalizerProc: 最后处理函数
*/
long long id = eventLoop->timeEventNextId++;//定时器编号
aeTimeEvent *te;//具体结构不用管,表示一个具体定时器事件
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
//aeAddMillisecondsToNow这个函数是把相对时间转为绝对时间
te->timeProc = proc;//定时器处理函数
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;//放入到eventLoop维护的定时器列表中
return id;
}
在主处理事件的过程中
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
/*如果是定时器事件,则去查找aeSearchNearestTimer最小定时器,
也就是即将到来的定时器
*/
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// 如果时间事件存在的话
// 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
long now_sec, now_ms;
// 计算距今最近的时间事件还要多久才能达到
// 并将该时间距保存在 tv 结构中
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
// 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
// 执行到这一步,说明没有时间事件
// 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度
if (flags & AE_DONT_WAIT) {
// 设置文件事件不阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
//设置阻塞
tvp = NULL; /* wait forever */
}
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
//调用epoll_wait
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
* event laster. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* Check time events */
//这里才是关键,通过epoll_wait调用阻塞一段时间后,然后处理到期的定时器
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
这里有两个关键的函数processTimeEvents,aeApiPoll。
processTimeEvents:轮询eventLoop维护的定时器列表,依次处理到期的定时器,然后调用定时器处理函数,但这里有一个很关键点,如果有人调整过系统时间,然后又调回来(之前调向前,现在调回来)。怎么办?redis是这样处理的
//由于每一次定时器到来后,都会更新这个lastTime,如果现在的时间now小于最后一次到来时间,就说明系统时间肯定被调整过了,然后重置定时器的运行时间
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
// 更新最后一次处理时间事件的时间
eventLoop->lastTime = now;
aeApiPoll:其实就是调用 epoll_wait
muduo
muduo在定时器的设计上面首先选用了计时函数:gettimeofday,而没有用clock_gettime。
其实这两个函数各有优缺点,如下:
函数 | 优点 | 缺点 |
---|---|---|
gettiemofday | 不是系统调用,用户态实现的,没有上下文切换 | 精确到只到微妙(不过足以满足) |
clock_gettime | 精确到纳秒 | 系统调用,有上下文切换 |
定时函数选择timefd*系列
- timerfd_create
- timerfd_settime
- timerfd_gettime
用这一系列函数是由原因的,把时间转换成了一个文件描述符,该文件描述符在定时器超时的时刻变成可读,这样就很方便融入到I/O复用框架中,用统一的方式来处理I/O事件和定时器事件。这就是Reactor模式的特点,而且比调用I/O复用的超时精度还要低。
只要转换成了文件描述符,那处理起来就相对简单了,大概流程如下
- 通过timerfd_create 创建一个定时器的文件描述符
- 加入到定时器管理队列中
- 添加事件,调用epoll_ctl
- 调用epoll_wait等待事件的到来
- 当事件变为可读时,读取内容
- 去定时器管理队列中查找,执行对应的定时器处理函数
- 然后根据最初设置的定时器类型是否需要更新下一个超时时间
总结
定时器在linux服务器开发中,是必备的。
redis的设计相当巧妙,调用了epoll_wait来做超时,但有一点不好的地方,就是时间精度,epoll_wait超时时间是毫秒级。而muduo则把时间转为了文件描述符,利用Reactor的特点,对统一事件进行处理。