Redis (20): the gossip protocol
The gossip protocol is a peer-to-peer communication protocol: nodes keep exchanging information with each other, and after a while every node knows the complete state of the whole cluster.
The gossip algorithm is named after office gossip: once a single person starts a rumor, everyone in the office knows it within a bounded amount of time.
Algorithm outline: a node broadcasts its own information; the nodes that receive it keep propagating it through the cluster, and after a while the whole cluster holds that node's information. In practice most nodes run this exchange continuously (as the simulation below illustrates), so the cluster's view converges after a bounded delay.
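To make the convergence claim concrete, here is a small stand-alone simulation (not Redis code; the cluster size N and the seed are arbitrary): each round, every node that already knows a piece of information forwards it to one random peer, and the whole "cluster" learns it within a handful of rounds.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define N 100  /* hypothetical cluster size */

int main(void) {
    int informed[N];
    memset(informed, 0, sizeof(informed));
    informed[0] = 1;                 /* one node starts the "rumor" */
    int known = 1, rounds = 0;
    srand(42);
    while (known < N) {
        int next[N];
        memcpy(next, informed, sizeof(informed));
        for (int i = 0; i < N; i++) {
            if (!informed[i]) continue;
            int peer = rand() % N;   /* gossip to one random peer */
            if (!next[peer]) { next[peer] = 1; known++; }
        }
        memcpy(informed, next, sizeof(next));
        rounds++;
    }
    printf("all %d nodes informed after %d rounds\n", N, rounds);
    return 0;
}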
Communication process
- Every node has two TCP ports: one that clients connect to, and one for inter-node communication; the bus port equals the client port plus 10000 (sketched below).
- At a fixed interval, every node picks a few nodes according to specific rules and sends them ping messages.
- A node that receives a ping replies with a pong message.
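The fixed offset between the two ports can be written down directly; in the Redis source the constant is CLUSTER_PORT_INCR (10000):

#define CLUSTER_PORT_INCR 10000  /* bus port offset used by Redis Cluster */

/* e.g. client port 6379 -> cluster bus port 16379 */
int cluster_bus_port(int client_port) {
    return client_port + CLUSTER_PORT_INCR;
}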
Protocol
Message types: ping, pong, meet and fail (the clusterMsgData union below also defines publish and update bodies).
A gossip protocol message consists of a message header plus a message body.
Message header:
typedef struct {
    char sig[4];            /* Signature "RCmb" (Redis Cluster message bus) */
    uint32_t totlen;        /* Total length of this message */
    uint16_t ver;           /* Protocol version, currently 1 */
    uint16_t port;          /* Base port (client <-> server) */
    uint16_t type;          /* Message type */
    uint16_t count;         /* For ping/pong: number of gossip entries in the body */
    uint64_t currentEpoch;  /* Current epoch of the sending node */
    uint64_t configEpoch;   /* Config epoch of the master (or of the slave's master) */
    uint64_t offset;        /* Replication offset */
    char sender[CLUSTER_NAMELEN];            /* nodeId of the sending node */
    unsigned char myslots[CLUSTER_SLOTS/8];  /* Slots served by the sending node */
    char slaveof[CLUSTER_NAMELEN];           /* If the sender is a slave, its master's nodeId */
    char myip[NET_IP_STR_LEN];               /* IP of the sending node */
    char notused1[34];      /* Reserved */
    uint16_t cport;         /* Cluster bus (inter-node) port */
    uint16_t flags;         /* Sender flags: master/slave, failing or not */
    unsigned char state;    /* Cluster state from the sender's point of view */
    unsigned char mflags[3];/* Message flags */
    union clusterMsgData data; /* Message body */
} clusterMsg;
Message type constants
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
Message body:
union clusterMsgData {
    /* PING, MEET and PONG */
    struct {
        /* An array carrying info about multiple nodes */
        clusterMsgDataGossip gossip[1];
    } ping;
    /* FAIL: info about the failing node */
    struct {
        clusterMsgDataFail about;
    } fail;
    /* PUBLISH */
    struct {
        clusterMsgDataPublish msg;
    } publish;
    /* UPDATE */
    struct {
        clusterMsgDataUpdate nodecfg;
    } update;
};
Ping/pong message body structure (one entry per node)
typedef struct {
    char nodename[CLUSTER_NAMELEN]; /* nodeId */
    uint32_t ping_sent;             /* Time the last ping was sent to this node */
    uint32_t pong_received;         /* Time the last pong was received from it */
    char ip[NET_IP_STR_LEN];        /* Node IP */
    uint16_t port;                  /* Node base port */
    uint16_t cport;                 /* Node cluster bus port */
    uint16_t flags;                 /* Node flags */
    uint32_t notused1;
} clusterMsgDataGossip;
Fail message body structure
typedef struct {
    char nodename[CLUSTER_NAMELEN]; /* nodeId of the failing node */
} clusterMsgDataFail;
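Putting the layouts together, the wire size of a ping/pong is the fixed header (clusterMsg minus the body union) plus one clusterMsgDataGossip per carried node — the same arithmetic clusterSendPing() uses below. A minimal sketch, assuming the struct definitions above are in scope:

#include <stddef.h>

/* Sketch: wire size of a PING/PONG carrying `count` gossip entries,
 * mirroring the totlen computation in clusterSendPing(). */
size_t ping_msg_len(int count) {
    size_t header = sizeof(clusterMsg) - sizeof(union clusterMsgData);
    return header + (size_t)count * sizeof(clusterMsgDataGossip);
}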
ping —> pong: inter-node communication
A ping message packs the sender's own state plus the state of a subset of other nodes.
// Call chain: clusterSendPing() --> clusterSetGossipEntry() --> clusterSendMessage()
clusterSendPing
void clusterSendPing(clusterLink *link, int type) {
    unsigned char *buf;     /* Message to send */
    clusterMsg *hdr;
    int gossipcount = 0;    /* Number of gossip entries added to the body */
    int wanted;             /* Number of gossip entries we want to attach */
    int totlen;             /* Total message length */
    int freshnodes = dictSize(server.cluster->nodes)-2;

    /* Take 1/10 of the cluster's nodes */
    wanted = floor(dictSize(server.cluster->nodes)/10);
    if (wanted < 3) wanted = 3;
    if (wanted > freshnodes) wanted = freshnodes;

    /* Include all the nodes in PFAIL state, so that failure reports are
     * faster to propagate to go from PFAIL to FAIL state. */
    int pfail_wanted = server.cluster->stats_pfail_nodes;

    /* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
     * later according to the number of gossip sections we really were able
     * to put inside the packet. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
    /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
     * sizeof(clusterMsg) or more. */
    if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
    buf = zcalloc(totlen);
    hdr = (clusterMsg*) buf;

    /* Populate the header. */
    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();
    /* Initialize the message header */
    clusterBuildMessageHdr(hdr,type);

    /* Populate the gossip fields */
    int maxiterations = wanted*3;
    while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
        /* Pick a random node */
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);

        /* Don't include this node: the whole packet header is about us
         * already, so we just gossip about other nodes. */
        if (this == myself) continue;

        /* Skip PFAIL nodes here; they are appended at the end. */
        if (this->flags & CLUSTER_NODE_PFAIL) continue;

        /* In the gossip section don't include:
         * 1) Nodes in HANDSHAKE state.
         * 2) Nodes with the NOADDR flag set.
         * 3) Disconnected nodes if they don't have configured slots. */
        if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
            (this->link == NULL && this->numslots == 0))
        {
            freshnodes--; /* Technically not correct, but saves CPU. */
            continue;
        }

        /* Do not add a node we already have. */
        if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;

        /* Add the node to the message body */
        clusterSetGossipEntry(hdr,gossipcount,this);
        freshnodes--;
        gossipcount++;
    }

    /* If there are PFAIL nodes, add them at the end. */
    if (pfail_wanted) {
        dictIterator *di;
        dictEntry *de;

        di = dictGetSafeIterator(server.cluster->nodes);
        while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
            clusterNode *node = dictGetVal(de);
            if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
            if (node->flags & CLUSTER_NODE_NOADDR) continue;
            if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
            clusterSetGossipEntry(hdr,gossipcount,node);
            freshnodes--;
            gossipcount++;
            /* We take the count of the slots we allocated, since the
             * PFAIL stats may not match perfectly with the current number
             * of PFAIL nodes. */
            pfail_wanted--;
        }
        dictReleaseIterator(di);
    }

    /* Ready to send... fix the totlen field and queue the message in the
     * output buffer. */
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
    hdr->count = htons(gossipcount);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(link,buf,totlen);
    zfree(buf);
}

#define CLUSTER_BROADCAST_ALL 0
#define CLUSTER_BROADCAST_LOCAL_SLAVES 1
void clusterBroadcastPong(int target) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        if (!node->link) continue;
        if (node == myself || nodeInHandshake(node)) continue;
        if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
            int local_slave =
                nodeIsSlave(node) && node->slaveof &&
                (node->slaveof == myself || node->slaveof == myself->slaveof);
            if (!local_slave) continue;
        }
        clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
    }
    dictReleaseIterator(di);
}
clusterSetGossipEntry
// Build one node's entry in the message body
void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
    clusterMsgDataGossip *gossip;

    /* Put the node into the i-th gossip slot of the body */
    gossip = &(hdr->data.ping.gossip[i]);
    memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
    gossip->ping_sent = htonl(n->ping_sent/1000);
    gossip->pong_received = htonl(n->pong_received/1000);
    memcpy(gossip->ip,n->ip,sizeof(n->ip));
    gossip->port = htons(n->port);
    gossip->cport = htons(n->cport);
    gossip->flags = htons(n->flags);
    gossip->notused1 = 0;
}
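On the receiving side the multi-byte fields have to be converted back from network byte order; in Redis this happens in clusterProcessGossipSection(). A minimal print-helper sketch (not the actual receive path), assuming the structs above are in scope:

#include <arpa/inet.h>
#include <stdio.h>

/* Sketch: dump the i-th gossip entry of a received ping/pong,
 * undoing the htons()/htonl() conversions done by the sender. */
void printGossipEntry(clusterMsg *hdr, int i) {
    clusterMsgDataGossip *g = &(hdr->data.ping.gossip[i]);
    printf("node %.40s %s:%u flags=%u last pong=%u\n",
           g->nodename, g->ip,
           (unsigned)ntohs(g->port),
           (unsigned)ntohs(g->flags),
           (unsigned)ntohl(g->pong_received));
}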
meet —> pong: a new node joins
A newly added node sends a meet message to any node in the cluster (call it node B), announcing that a new node has joined. Node B adds the new node to its own node table, then spreads the new node's information through its regular ping/pong exchanges with the rest of the cluster; eventually every node in the cluster holds the new node's information.
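In practice the handshake is started from the command interface: the CLUSTER MEET command tells a running node to begin the meet handshake with the given address. A typical invocation (addresses are illustrative):

redis-cli -p 7000 CLUSTER MEET 127.0.0.1 7001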
pong —> other nodes: broadcasting a node's own state
A pong message packs the node's own state. It is sent as the reply to meet and ping messages, confirming to the sender that communication is working. A node can also broadcast pong messages to the whole cluster to make the other nodes refresh their view of it.
fail —> other nodes: broadcasting a node failure
When a node decides that another node in the cluster is down, it broadcasts a fail message to the cluster; the nodes that receive it mark the corresponding node as offline.
// Call chains:
// markNodeAsFailingIfNeeded() --> clusterSendFail() --> clusterBuildMessageHdr() --> clusterBroadcastMessage() --> clusterSendMessage()
// markNodeAsFailingIfNeeded() --> clusterNodeFailureReportsCount() --> clusterNodeCleanupFailureReports()
markNodeAsFailingIfNeeded
void markNodeAsFailingIfNeeded(clusterNode *node) {
    /* Number of failure reports for this node across the cluster */
    int failures;
    /* Votes needed to mark the node as failed (a majority of masters) */
    int needed_quorum = (server.cluster->size / 2) + 1;

    /* Do nothing if the node has not timed out yet */
    if (!nodeTimedOut(node)) return;
    /* Do nothing if the node is already marked as failed */
    /* #define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL) */
    if (nodeFailed(node)) return;

    /* Count the failure reports for this node */
    failures = clusterNodeFailureReportsCount(node);
    /* If we are a master, our own PFAIL observation counts as a vote too */
    if (nodeIsMaster(myself)) failures++;
    /* Only proceed (and broadcast FAIL below) once the quorum is reached */
    if (failures < needed_quorum) return;

    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failed */
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    /* If we are a master, broadcast the node's failure to the cluster */
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
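As a concrete reading of the quorum check above: with server.cluster->size = 3 masters, needed_quorum = 3/2 + 1 = 2, so a master that has itself flagged a node as PFAIL needs at least one failure report from another master before it flips the flag to FAIL and broadcasts it.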
clusterNodeFailureReportsCount
int clusterNodeFailureReportsCount(clusterNode *node) {
    /* Drop stale failure reports first */
    clusterNodeCleanupFailureReports(node);
    /* Number of nodes that currently consider this node as failing */
    return listLength(node->fail_reports);
}

/* A failure report older than maxtime is considered stale and discarded. */
void clusterNodeCleanupFailureReports(clusterNode *node) {
    /* List of nodes that reported this node as failing */
    list *l = node->fail_reports;
    listNode *ln;
    listIter li;
    clusterNodeFailReport *fr;
    /* Validity window = node timeout * 2 */
    mstime_t maxtime = server.cluster_node_timeout *
                       CLUSTER_FAIL_REPORT_VALIDITY_MULT;
    mstime_t now = mstime();

    listRewind(l,&li);
    while ((ln = listNext(&li)) != NULL) {
        fr = ln->value;
        /* Remove the stale report, decreasing the vote count by one */
        if (now - fr->time > maxtime) listDelNode(l,ln);
    }
}
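A minimal stand-alone sketch of the same bookkeeping (a hand-rolled singly linked list instead of Redis's adlist, with hypothetical names): reports older than the validity window are dropped before what remains is counted.

#include <stdlib.h>

typedef struct FailReport {
    long long time;              /* when the report was received (ms) */
    struct FailReport *next;
} FailReport;

/* Drop reports older than `maxtime`, then count what is left. */
int countValidReports(FailReport **head, long long now, long long maxtime) {
    int n = 0;
    FailReport **p = head;
    while (*p) {
        if (now - (*p)->time > maxtime) {   /* expired: unlink and free */
            FailReport *dead = *p;
            *p = dead->next;
            free(dead);
        } else {
            n++;
            p = &(*p)->next;
        }
    }
    return n;
}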
clusterSendFail
// Node `nodename` failed: notify the other nodes in the cluster
void clusterSendFail(char *nodename) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg*) buf;

    /* Build the message header */
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
    /* Put the failing node's id into the message body */
    memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
    /* Broadcast to every reachable node in the cluster */
    clusterBroadcastMessage(buf,ntohl(hdr->totlen));
}
clusterBuildMessageHdr
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    int totlen = 0;
    uint64_t offset;
    clusterNode *master;

    /* If we are a slave, `master` is our master; otherwise it is ourselves */
    master = (nodeIsSlave(myself) && myself->slaveof) ?
              myself->slaveof : myself;

    memset(hdr,0,sizeof(*hdr));
    /* Initialize protocol version, signature and type */
    hdr->ver = htons(CLUSTER_PROTO_VER);
    hdr->sig[0] = 'R';
    hdr->sig[1] = 'C';
    hdr->sig[2] = 'm';
    hdr->sig[3] = 'b';
    hdr->type = htons(type);
    /* Set our nodeId in the header */
    memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);

    /* Set our IP in the header */
    memset(hdr->myip,0,NET_IP_STR_LEN);
    if (server.cluster_announce_ip) {
        strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
        hdr->myip[NET_IP_STR_LEN-1] = '\0';
    }

    /* Base port and cluster bus port */
    int announced_port = server.cluster_announce_port ?
                         server.cluster_announce_port : server.port;
    int announced_cport = server.cluster_announce_bus_port ?
                          server.cluster_announce_bus_port :
                          (server.port + CLUSTER_PORT_INCR);

    /* Slots served by this node */
    memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
    memset(hdr->slaveof,0,CLUSTER_NAMELEN);
    if (myself->slaveof != NULL)
        memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
    hdr->port = htons(announced_port);
    hdr->cport = htons(announced_cport);
    hdr->flags = htons(myself->flags);
    hdr->state = server.cluster->state;

    /* Set the currentEpoch and configEpochs. */
    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
    hdr->configEpoch = htonu64(master->configEpoch);

    /* Set the replication offset */
    if (nodeIsSlave(myself))
        offset = replicationGetSlaveOffset();
    else
        offset = server.master_repl_offset;
    hdr->offset = htonu64(offset);

    /* Set the message flags. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;

    /* Compute and set the total message length */
    if (type == CLUSTERMSG_TYPE_FAIL) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataFail);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        totlen += sizeof(clusterMsgDataUpdate);
    }
    hdr->totlen = htonl(totlen);
}
clusterBroadcastMessage
void clusterBroadcastMessage(void *buf, size_t len) {
    dictIterator *di;
    dictEntry *de;

    /* Iterate over the cluster's node table */
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        /* Skip nodes we cannot send to */
        if (!node->link) continue;
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
            continue;
        /* Send the message */
        clusterSendMessage(node->link,buf,len);
    }
    /* Release the iterator */
    dictReleaseIterator(di);
}
clusterSendMessage
// Send a message. link: connection to the receiving node; msg: message; msglen: message length
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
    if (sdslen(link->sndbuf) == 0 && msglen != 0)
        aeCreateFileEvent(server.el,link->fd,AE_WRITABLE,
                          clusterWriteHandler,link);

    link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);

    /* Populate sent messages stats. */
    clusterMsg *hdr = (clusterMsg*) msg;
    uint16_t type = ntohs(hdr->type);
    if (type < CLUSTERMSG_TYPE_COUNT)
        server.cluster->stats_bus_messages_sent[type]++;
}
How the nodes to ping are selected
// Call chain:
serverCron() --> clusterCron() --> clusterSendPing()
Redis runs a cron task 10 times per second (once every 100 ms). Inside clusterCron() it:
- Once per second, samples 5 random nodes and pings the one that has gone longest without communicating.
- Additionally pings any node whose last contact is older than node_timeout/2.
/* Runs every 100 ms, i.e. 10 times per second */
run_with_period(100) {
    if (server.cluster_enabled) clusterCron();
}
/* Once per second: sample 5 random nodes and ping the one that has
 * gone longest without communicating */
if (!(iteration % 10)) {
    int j;

    /* Check a few random nodes and ping the one with the oldest
     * pong_received time. */
    for (j = 0; j < 5; j++) {
        de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);

        /* Don't ping nodes disconnected or with a ping currently active. */
        if (this->link == NULL || this->ping_sent != 0) continue;
        if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
            continue;
        if (min_pong_node == NULL || min_pong > this->pong_received) {
            min_pong_node = this;
            min_pong = this->pong_received;
        }
    }
    if (min_pong_node) {
        serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
        clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
    }
}
/* Ping any node whose last contact is older than node_timeout/2 */
if (node->link &&
    node->ping_sent == 0 &&
    (now - node->pong_received) > server.cluster_node_timeout/2)
{
    clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
    continue;
}
Each ping carries gossip entries for at most 1/10 of the cluster's nodes, but at least 3 entries (example: server.cluster->nodes = 6, three masters and three slaves):
// Excerpt from clusterSendPing()
int freshnodes = dictSize(server.cluster->nodes)-2;
wanted = floor(dictSize(server.cluster->nodes)/10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;
// `wanted` is the number of gossip entries the message will carry
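Plugging the six-node example in: freshnodes = 6 - 2 = 4 and floor(6/10) = 0, which is raised to the minimum of 3; since 3 does not exceed freshnodes, each ping gossips about 3 other nodes.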
Why carry 1/10 of the node information?
How many gossip sections we want to add? 1/10 of the number of nodes
and anyway at least 3. Why 1/10?
If we have N masters, with N/10 entries, and we consider that in
node_timeout we exchange with each other node at least 4 packets
(we ping in the worst case in node_timeout/2 time, and we also
receive two pings from the host), we have a total of 8 packets
in the node_timeout*2 failure reports validity time. So we have
that, for a single PFAIL node, we can expect to receive the following
number of failure reports (in the specified window of time):
PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
PROB = probability of being featured in a single gossip entry,
which is 1 / NUM_OF_NODES.
ENTRIES = 10.
TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
If we assume we have just masters (so num of nodes and num of masters
is the same), with 1/10 we always get over the majority, and specifically
80% of the number of nodes, to account for many masters failing at the
same time.
Since we have non-voting slaves that lower the probability of an entry
to feature our node, we set the number of entries per packet as
10% of the total nodes we have.
How many gossip entries should be attached? The gossip section should cover 1/10 of the total node count, but never fewer than 3 entries.
The point of carrying 1/10 of the nodes is that, within the failure-detection window of 2 * node_timeout, the down state of a failing node can be reported to the current node by the majority of the cluster. The 1/10 figure is derived as follows:
With N cluster nodes, within node_timeout the current node exchanges at least 4 heartbeat packets with any other node: a node waits at most node_timeout/2 before pinging a peer again, and a node that receives a PING replies with a PONG. So within node_timeout, the current node receives two PINGs from a node A, plus two PONGs answering the PINGs it sent to A.
Within the detection window of node_timeout * 2 it therefore receives 8 heartbeats from each other cluster node, i.e. 8N heartbeats in total. Each heartbeat features the failing node with probability PROB * ENTRIES = (1/N) * (N/10) = 1/10.
The expected number of failure reports is therefore 8N * (1/10) = 0.8N, i.e. 80% of the node count — which means reports arrive from the majority of the cluster.
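As a worked example with the numbers from the source comment: for N = 100 master-only nodes, a single packet features a given node with probability (1/100) * 10 = 1/10; across TOTAL_PACKETS = 2 * 4 * 100 = 800 heartbeats received in the 2 * node_timeout window, the expected number of failure reports for one PFAIL node is 800 * (1/10) = 80, comfortably above the majority quorum of 51.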
References
The source code above is from redis-4.0.6.
Redis Development and Operations (《Redis开发与运维》)
https://blog.csdn.net/Jin_Kwok/article/details/90111631
http://c.biancheng.net/view/375.html
https://www.cnblogs.com/merlindu/p/6417957.html
https://www.jianshu.com/p/652b45591bbf