怎么创网站/如何做到精准客户推广
文章目录
- 前言
- 必要数据结构
- 封装 epoll 的必要成员
- 文件事件
- 时间事件结构
- 已就绪事件
- 事件处理器的状态 (就是最主要的 aeEventLoop
- 初始化事件处理器状态
- 创建 listenfd 并加入 epoll
- 执行主循环
- 处理事件 aeProcessEvents
- 数据读写
- 处理非活动连接
前言
因为 Redis 的网络模块是一个采用 epoll 的但线程模型, 阅读起来相对更加简单, 就先从这一部分入手
文章主要包括 :
- TCP socket accept 建立连接的过程
- Redis 的 epoll 模型
- 处理时间事件和文件事件的流程
- TCP 数据的读写
- 处理非活动连接
必要数据结构
封装 epoll 的必要成员
typedef struct aeApiState
{// epoll_event 实例描述符int epfd;// 事件槽struct epoll_event *events;} aeApiState;
文件事件
typedef struct aeFileEvent
{// 监听事件类型掩码,// 值可以是 AE_READABLE 或 AE_WRITABLE ,// 或者 AE_READABLE | AE_WRITABLEint mask; /* one of AE_(READABLE|WRITABLE) */// 读事件处理器aeFileProc *rfileProc;// 写事件处理器aeFileProc *wfileProc;// 多路复用库的私有数据void *clientData;
} aeFileEvent;
时间事件结构
typedef struct aeTimeEvent
{// 时间事件的唯一标识符long long id; /* time event identifier. */// 事件的到达时间long when_sec; /* seconds */long when_ms; /* milliseconds */// 事件处理函数aeTimeProc *timeProc;// 事件释放函数aeEventFinalizerProc *finalizerProc;// 多路复用库的私有数据void *clientData;// 指向下个时间事件结构,形成链表struct aeTimeEvent *next;
} aeTimeEvent;
已就绪事件
typedef struct aeFiredEvent{// 已就绪文件描述符int fd;// 事件类型掩码,// 值可以是 AE_READABLE 或 AE_WRITABLE// 或者是两者的或int mask;
} aeFiredEvent;
事件处理器的状态 (就是最主要的 aeEventLoop
typedef struct aeEventLoop
{// 目前已注册的最大描述符int maxfd; /* highest file descriptor currently registered */// 目前已追踪的最大描述符int setsize; /* max number of file descriptors tracked */// 用于生成时间事件 idlong long timeEventNextId;// 最后一次执行时间事件的时间time_t lastTime; /* Used to detect system clock skew */// 已注册的文件事件aeFileEvent *events; /* Registered events */// 已就绪的文件事件aeFiredEvent *fired; /* Fired events */// 时间事件aeTimeEvent *timeEventHead;// 事件处理器的开关int stop;// 多路复用库的私有数据void *apidata; //我们只分析指向 epoll// 在处理事件前要执行的函数aeBeforeSleepProc *beforesleep;
} aeEventLoop;
初始化事件处理器状态
ae.c 中
/** 初始化事件处理器状态*/
aeEventLoop *aeCreateEventLoop(int setsize)
{aeEventLoop *eventLoop;int i;// 创建事件状态结构if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL)goto err;// 初始化文件事件结构和已就绪文件事件结构数组eventLoop->events = zmalloc(sizeof(aeFileEvent) * setsize);eventLoop->fired = zmalloc(sizeof(aeFiredEvent) * setsize);if (eventLoop->events == NULL || eventLoop->fired == NULL)goto err;// 设置数组大小eventLoop->setsize = setsize;// 初始化执行最近一次执行时间eventLoop->lastTime = time(NULL);// 初始化时间事件结构eventLoop->timeEventHead = NULL; //定时事件链表置空eventLoop->timeEventNextId = 0; //定时事件id为0eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;//给EPOLL申请空间if (aeApiCreate(eventLoop) == -1)goto err;/* Events with mask == AE_NONE are not set. So let's initialize the* vector with it. */// 初始化监听事件for (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;// 返回事件循环return eventLoop;err:if (eventLoop){zfree(eventLoop->events);zfree(eventLoop->fired);zfree(eventLoop);}return NULL;
}
先调用 aeCreateEventLoop 函数, 动态分配内存, 进行初始化
初始化事件处理器其中包含有:
- 文件事件数组 (内部是一个指针, 初始化时动态分配固定内存大小, 形如数组)
- 已就绪文件事件数组 (同上)
- 时间事件链表 (指针置空) Redis 的时间事件是一个单链表, 且无序, 所以查找复杂度为O(n), 每次新的时间事件放入表头
Redis 只使用一个 serverCron 一个时间事件, 在其中执行所有的周期性任务, 所以 redis 几乎将这个无序链表当成指针在用, 没有造成性能上的影响
- void* 指针指向封装的 epoll 结构体, 其中有 epfd 和 epoll_event 数组(也是一个 epoll_event 指针)
返回 eventloop 指针
创建 listenfd 并加入 epoll
初始化完后, 需要调用anetTcpServer
函数(anet.c 中)
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
}
这个函数中调用_anetTcpServer
去真正调用::socket
函数创建一个listenfd
, 经过 bind listen 后, 调用aeCreateFileEvent
(ae.c) 将其添加进 epoll 的监听事件合集
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{if (fd >= eventLoop->setsize){errno = ERANGE;return AE_ERR;}if (fd >= eventLoop->setsize)return AE_ERR;// 取出文件事件结构aeFileEvent *fe = &eventLoop->events[fd];// 监听指定 fd 的指定事件, 本质上调用 epoll_ctlif (aeApiAddEvent(eventLoop, fd, mask) == -1)return AE_ERR;// 设置文件事件类型,以及事件的处理器fe->mask |= mask;if (mask & AE_READABLE)fe->rfileProc = proc;if (mask & AE_WRITABLE)fe->wfileProc = proc;// 私有数据fe->clientData = clientData;// 如果有需要,更新事件处理器的最大 fdif (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK;
}
执行主循环
void aeMain(aeEventLoop *eventLoop)
{eventLoop->stop = 0;while (!eventLoop->stop){// 如果有需要在事件处理前执行的函数,那么运行它if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);// 开始处理事件aeProcessEvents(eventLoop, AE_ALL_EVENTS);}
}
处理事件 aeProcessEvents
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed = 0, numevents;/* flag 没有指定监听事件*/if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS))return 0;/*这里会调用epoll阻塞直到时间事件到期*/if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))){int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;// 获取最近的时间事件if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);if (shortest){// 如果时间事件存在的话// 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间long now_sec, now_ms;// 计算距今最近的时间事件还要多久才能达到// 并将该时间距保存在 tv 结构中aeGetTime(&now_sec, &now_ms); //获取当前时间tvp = &tv;tvp->tv_sec = shortest->when_sec - now_sec;if (shortest->when_ms < now_ms) //最近的时间事件还没有到期{tvp->tv_usec = ((shortest->when_ms + 1000) - now_ms) * 1000;tvp->tv_sec--;}else{tvp->tv_usec = (shortest->when_ms - now_ms) * 1000;}// 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)if (tvp->tv_sec < 0)tvp->tv_sec = 0;if (tvp->tv_usec < 0)tvp->tv_usec = 0;}else{// 执行到这一步,说明没有时间事件// 那么根据 AE_DONT_WAIT 是否设置来决定是否阻塞,以及阻塞的时间长度//如果flag设置的这个,需要尽快返回,阻塞时间为0if (flags & AE_DONT_WAIT){// 设置文件事件不阻塞tv.tv_sec = tv.tv_usec = 0;tvp = &tv;}else{//不是的话我们可以阻塞// 文件事件可以阻塞直到有事件到达为止tvp = NULL; //为NULL时,epoll_wait的timeout设置为-1,一直等待}}// 处理文件事件,阻塞时间由 tvp 决定numevents = aeApiPoll(eventLoop, tvp);//遍历就绪事件数组for (j = 0; j < numevents; j++){// 从已就绪数组中获取事件aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;/* note the fe->mask & mask & ... code: maybe an already processed* event removed an element that fired and we still didn't* processed, so we check if the event is still valid. */// 读事件if (fe->mask & mask & AE_READABLE){// rfired 确保读/写事件只能执行其中一个rfired = 1;//执行读事件回调fe->rfileProc(eventLoop, fd, fe->clientData, mask);}// 写事件if (fe->mask & mask & AE_WRITABLE){if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop, fd, fe->clientData, mask);}processed++;}}/* Check time events */// 执行时间事件if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop); //处理所有已到达时间事件(遍历时间链表)return processed; /* return the number of processed file/time events */
}
该函数flag
默认设置为AE_ALL_EVENTS
, 即处理所有类型事件
如果没有设置不能阻塞立刻返回的AE_DONT_WAIT
标志且没有一个最近要到期的时间事件, epoll
本来会一直阻塞下去,
但是 Redis 默认是有一个周期性检查自身资源和状态的时间事件(上面也提到了, 即serverCron
), 所以会设置epoll
的超时时间为时间事件的到期时间间隔, 到期后执行时间事件
当然, 根据代码能看出来, 如果等待期间有文件事件发生, 会先解决文件事件, 然后再处理时间事件, 所以时间事件的实际执行时间总是稍晚一点
数据读写
Redis 的客户端与服务器交互数据时, 都按照 Redis 定义的协议对格式进行编码, 这样就使消息之间有了 “边界”, 来应对 TCP 协议的流特性
比如说 : 客户端发送SET msg “helloworld”
那么客户端实际发送的数据是:
*3\r\n$3\r\nSET\r\n$3\r\nmsg\r\n$11\r\nhelloworld\r\n
*3
即该命令有3个参数, $3
第一个参数长度为3, 值为SET
, 也就是要执行的命令, 同上第二个参数长度为3, 值为msg
, 第三个参数长度为11, 值为hello world
处理非活动连接
// 检查客户端是否已经超时,如果超时就关闭客户端,并返回 1 ;
// 否则返回 0 。
int clientsCronHandleTimeout(redisClient *c) {// 获取当前时间time_t now = server.unixtime;// 服务器设置了 maxidletime 时间if (server.maxidletime &&// 不检查作为从服务器的客户端!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */// 不检查作为主服务器的客户端!(c->flags & REDIS_MASTER) && /* no timeout for masters */// 不检查被阻塞的客户端!(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */// 不检查订阅了频道的客户端dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */// 不检查订阅了模式的客户端listLength(c->pubsub_patterns) == 0 &&// 客户端最后一次与服务器通讯的时间已经超过了 maxidletime 时间(now - c->lastinteraction > server.maxidletime)){redisLog(REDIS_VERBOSE,"Closing idle client");// 关闭超时客户端freeClient(c);return 1;} else if (c->flags & REDIS_BLOCKED) {/* Blocked OPS timeout is handled with milliseconds resolution.* However note that the actual resolution is limited by* server.hz. */// 获取最新的系统时间mstime_t now_ms = mstime();// 检查被 BLPOP 等命令阻塞的客户端的阻塞时间是否已经到达// 如果是的话,取消客户端的阻塞if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {// 向客户端返回空回复replyToBlockedClientTimedOut(c);// 取消客户端的阻塞状态unblockClient(c);}}// 客户度没有被关闭return 0;
}
我们发现 Redis 简单粗暴的比较客户端上一次的访问时间, 如果唱过阈值, 就直接断开连接