1. 复制的介绍
Redis为了解决单点数据库问题,会把数据复制多个副本部署到其他节点上,通过复制,实现Redis的高可用性,实现对数据的冗余备份,保证数据和服务的高度可靠性。
2. 复制的实现
2.1 主从关系的建立
复制的建立方法有三种。
- 在redis.conf文件中配置slaveof
选项,然后指定该配置文件启动Redis生效。 - 在redis-server启动命令后加上–slaveof
启动生效。 - 直接使用 slaveof
命令在从节点执行生效。
无论是通过哪一种方式来建立主从复制,都是从节点来执行slaveof命令,那么从节点执行了这个命令到底做了什么,我们上源码:123456789101112131415161718192021222324252627282930313233343536373839404142434445464748// SLAVEOF host port命令实现void slaveofCommand(client *c) {// 如果当前处于集群模式,不能进行复制操作if (server.cluster_enabled) {addReplyError(c,"SLAVEOF not allowed in cluster mode.");return;}// SLAVEOF NO ONE命令使得这个从节点关闭复制功能,并从从节点转变回主节点,原来同步所得的数据集不会被丢弃。if (!strcasecmp(c->argv[1]->ptr,"no") &&!strcasecmp(c->argv[2]->ptr,"one")) {// 如果保存了主节点IPif (server.masterhost) {// 取消复制操作,设置服务器为主服务器replicationUnsetMaster();// 获取client的每种信息,并以sds形式返回,并打印到日志中sds client = catClientInfoString(sdsempty(),c);serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",client);sdsfree(client);}// SLAVEOF host port} else {long port;// 获取端口号if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))return;// 如果已存在从属于masterhost主节点且命令参数指定的主节点和masterhost相等,端口也相等,直接返回if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)&& server.masterport == port) {serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));return;}// 第一次执行设置端口和ip,或者是重新设置端口和IP// 设置服务器复制操作的主节点IP和端口replicationSetMaster(c->argv[1]->ptr, port);// 获取client的每种信息,并以sds形式返回,并打印到日志中sds client = catClientInfoString(sdsempty(),c);serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')",server.masterhost, server.masterport, client);sdsfree(client);}// 回复okaddReply(c,shared.ok);}
当从节点的client执行SLAVEOF命令后,该命令会被构建成Redis协议格式,发送给从节点服务器,然后节点服务器会调用slaveofCommand()函数执行该命令。
而SLAVEOF命令做的操作并不多,主要以下三步:
- 判断当前环境是否在集群模式下,因为集群模式下不行执行该命令。
- 是否执行的是SLAVEOF NO ONE命令,该命令会断开主从的关系,设置当前节点为主节点服务器。
- 设置从节点所属主节点的IP和port。调用了replicationSetMaster()函数。
SLAVEOF命令能做的只有这么多,我们来具体看下replicationSetMaster()函数的代码,看看它做了哪些与复制相关的操作。
由代码知,replicationSetMaster()函数执行操作的也很简单,总结为两步:
- 清理之前所属的主节点的信息。
- 设置新的主节点IP和port等。
因为,当前从节点有可能之前从属于另外的一个主节点服务器,因此要清理所有关于之前主节点的缓存、关闭旧的连接等等。然后设置该从节点的新主节点,设置了IP和port,还设置了以下状态:1234// 设置复制必须重新连接主节点的状态server.repl_state = REPL_STATE_CONNECT;// 初始化全局复制的偏移量server.master_repl_offset = 0;
然后,就没有然后了,然后就会执行复制操作吗?这也没有什么关于复制的操作执行了,那么复制操作是怎么开始的呢?
2.2 主从网络连接建立
slaveof命令是一个异步命令,执行命令时,从节点保存主节点的信息,确立主从关系后就会立即返回,后续的复制流程在节点内部异步执行。那么,如何触发复制的执行呢?
周期性执行的函数:replicationCron()函数,该函数被服务器的时间事件的回调函数serverCron()所调用,而serverCron()函数在Redis服务器初始化时,被设置为时间事件的处理函数。
replicationCron()函数执行频率为1秒一次:
主从关系建立后,从节点服务器的server.repl_state被设置为REPL_STATE_CONNECT,而replicationCron()函数会被每秒执行一次,该函数会发现我(从节点)现在有主节点了,而且我要的状态是要连接主节点(REPL_STATE_CONNECT)。
replicationCron()函数处理这以情况的代码如下:
replicationCron()函数根据从节点的状态,调用connectWithMaster()非阻塞连接主节点。代码如下:
connectWithMaster()函数执行的操作可以总结为:
- 根据IP和port非阻塞的方式连接主节点,得到主从节点进行通信的文件描述符fd,并保存到从节点服务器server.repl_transfer_s中,并且将刚才的REPL_STATE_CONNECT状态设置为REPL_STATE_CONNECTING。
- 监听fd的可读和可写事件,并且设置事件发生的处理程序syncWithMaster()函数。
至此,主从网络建立就完成了。
2.3 发送PING命令
主从建立网络时,同时注册fd的AE_READABLE|AE_WRITABLE事件,因此会触发一个AE_WRITABLE事件,调用syncWithMaster()函数,处理写事件。
根据当前的REPL_STATE_CONNECTING状态,从节点向主节点发送PING命令,PING命令的目的有:
- 检测主从节点之间的网络是否可用。
- 检查主从节点当前是否接受处理命令。
syncWithMaster()函数中相关操作的代码如下:12345678910111213/* Send a PING to check the master is able to reply without errors. */// 如果复制的状态为REPL_STATE_CONNECTING,发送一个PING去检查主节点是否能正确回复一个PONGif (server.repl_state == REPL_STATE_CONNECTING) {serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");// 暂时取消接听fd的写事件,以便等待PONG回复时,注册可读事件aeDeleteFileEvent(server.el,fd,AE_WRITABLE);// 设置复制状态为等待PONG回复server.repl_state = REPL_STATE_RECEIVE_PONG;// 发送一个PING命令err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);if (err) goto write_error;return;}
发送PING命令主要的操作是:
- 先取消监听fd的写事件,因为接下来要读主节点服务器发送过来的PONG回复,因此只监听可读事件的发生。
- 设置从节点的复制状态为REPL_STATE_RECEIVE_PONG。等待一个主节点回复一个PONG命令。
- 以写的方式调用sendSynchronousCommand()函数发送一个PING命令给主节点。
主节点服务器从fd会读到一个PING命令,然后会回复一个PONG命令到fd中,执行的命令就是addReply(c,shared.pong);。
此时,会触发fd的可读事件,调用syncWithMaster()函数来处理,此时从节点的复制状态为REPL_STATE_RECEIVE_PONG,等待主节点回复PONG。syncWithMaster()函数中处理这一状态的代码如下:
在这里,以读的方式调用sendSynchronousCommand(),并将读到的”+PONG\r\n”返回到err中,如果从节点正确接收到主节点发送的PONG命令,会将从节点的复制状态设置为server.repl_state = REPL_STATE_SEND_AUTH。等待进行权限的认证。
2.4 认证权限
权限认证会在syncWithMaster()函数继续执行,紧接着刚才的代码:
如果从节点的服务器设置了认证密码,则会以写方式调用sendSynchronousCommand()函数,将AUTH命令和密码写到fd中,并且将从节点的复制状态设置为server.repl_state = REPL_STATE_RECEIVE_AUTH,接受AUTH的验证。
如果从节点服务器没有设置认证密码,就直接将从节点的复制状态设置为server.repl_state = REPL_STATE_SEND_PORT,准发发送一个端口号。
主节点会读取到AUTH命令,调用authCommand()函数来处理,主节点服务器会比较从节点发送过来的server.masterauth和主节点服务器保存的server.requirepass是否一致,如果一致,会回复一个”+OK\r\n”。
当主节点将回复写到fd时,又会触发从节点的可读事件,紧接着调用syncWithMaster()函数来处理接收AUTH认证结果:
以读方式从fd中读取一个回复,判断认证是否成功,认证成功,则会将从节点的复制状态设置为server.repl_state = REPL_STATE_SEND_PORT表示要发送一个端口号给主节点。这和没有设置认证的情况结果相同。
2.5 发送端口号
从节点在认证完权限后,会继续在syncWithMaster()函数执行,处理发送端口号的状态。
发送端口号,以REPLCONF listening-port命令的方式,写到fd中。然后将复制状态设置为server.repl_state = REPL_STATE_RECEIVE_PORT,等待接受主节点的回复。
主节点从fd中读到REPLCONF listening-port
当主节点将回复写到fd时,又会触发从节点的可读事件,紧接着调用syncWithMaster()函数来处理接受端口号,验证主节点是否正确的接收到从节点的端口号。
如果主节点正确的接收到从节点的端口号,会将从节点的复制状态设置为server.repl_state = REPL_STATE_SEND_IP表示要送一个IP给主节点。
2.6 发送 IP 地址
从节点发送完端口号并且正确收到主节点的回复后,紧接着syncWithMaster()函数执行发送IP的代码。发送IP和发送端口号过程几乎一致。
同样是以REPLCONF ip-address命令的方式,将从节点的IP写到fd中。并且设置从节点的复制状态为server.repl_state = REPL_STATE_RECEIVE_IP,等待接受主节点的回复。然后就直接返回,等待fd可读发生。
主节点仍然会调用replication.c文件中实现的replconfCommand()函数来处理REPLCONF命令,解析出REPLCONF ip-address ip命令,保存从节点的ip到主节点的对应从节点的client的c->slave_ip中。然后回复”+OK\r\n”状态,写到fd中。
此时,从节点监听到fd触发了可读事件,会调用syncWithMaster()函数来处理,验证主节点是否正确接收到从节点的IP。
如果主节点正确接收了从节点IP,就会设置从节点的复制状态server.repl_state = REPL_STATE_SEND_CAPA表示发送从节点的能力(capability)。
2.7 发送能力(capability)
发送能力和发送端口和IP也是如出一辙,紧接着syncWithMaster()函数执行发送capa的代码。
从节点将REPLCONF capa eof命令发送给主节点,写到fd中。
主节点仍然会调用replication.c文件中实现的replconfCommand()函数来处理REPLCONF命令,解析出REPLCONF capa eof命令,将eof对应的标识,按位与到主节点的对应从节点的client的c->slave_capa中。然后回复”+OK\r\n”状态,写到fd中。
此时,从节点监听到fd触发了可读事件,会调用syncWithMaster()函数来处理,验证主节点是否正确接收到从节点的capa。
如果主节点正确接收了从节点capa,就会设置从节点的复制状态server.repl_state = REPL_STATE_SEND_PSYNC表示发送一个PSYNC命令
2.8 发送PSYNC命令
从节点发送PSYNC命令给主节点,尝试进行同步主节点的数据集。同步分为两种:
- 全量同步:第一次执行复制的场景。
- 部分同步:用于主从复制因为网络中断等原因造成数据丢失的场景。
因为这是第一次执行同步,因此会进行全量同步。12345678910111213// 复制状态为发送PSYNC命令。尝试进行部分重同步。// 如果没有缓冲主节点的结构,slaveTryPartialResynchronization()函数将会至少尝试使用PSYNC去进行一个全同步,这样就能得到主节点的运行runid和全局复制偏移量。并且在下次重连接时可以尝试进行部分重同步。if (server.repl_state == REPL_STATE_SEND_PSYNC) {// 向主节点发送一个部分重同步命令PSYNC,参数0表示不读主节点的回复,只获取主节点的运行runid和全局复制偏移量if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {// 发送PSYNC出错err = sdsnew("Write error sending the PSYNC command.");goto write_error;}// 设置复制状态为等待接受一个PSYNC回复server.repl_state = REPL_STATE_RECEIVE_PSYNC;return;}
从节点调用slaveTryPartialResynchronization()函数尝试进行重同步,注意第二个参数是0。因为slaveTryPartialResynchronization()分成两部分,一部分是写,一部分是读,因为第二个参数是0,因此执行写的一部分,发送一个PSYNC命令给主节点。只列举出写的部分
由于从节点是第一次和主节点进行同步操作,因此从节点缓存的主节点client状态erver.cached_master为空,所以就会发送一个PSYNC ? -1命令给主节点,表示进行一次全量同步。
主节点会接收到PSYNC ? -1命令,然后调用replication.c文件中实现的syncCommand()函数处理PSYNC命令。
syncCommand()函数先会判断执行的是PSYNC还是SYNC命令,如果是PSYNC命令会调用masterTryPartialResynchronization()命令执行部分同步,但是由于这是第一次执行复制操作,所以会执行失败。进而执行全量同步。
syncCommand()函数的代码如下:
首先先明确,主节点执行处理从节点发来PSYNC命令的操作。所以主节点会将从节点视为自己的从节点客户端来操作。会将从节点的复制设置为SLAVE_STATE_WAIT_BGSAVE_START状态表示
主节点执行全量同步的情况有三种:
- 主节点服务器正在执行BGSAVE命令,且将RDB文件写到磁盘上。
这种情况,如果有已经设置过全局重同步偏移量的从节点,可以共用输出缓冲区的数据。 - 主节点服务器正在执行BGSAVE命令,且将RDB文件写到网络socket上,无盘同步。
由于本次BGSAVE命令直接将RDB写到socket中,因此只能等待下一BGSAVE。 - 主节点服务器没有正在执行BGSAVE。
如果也没有进行AOF持久化的操作,那么开始为复制操作执行BGSAVE,生成一个写到磁盘上的RDB文件。
我们针对第三种情况来分析。调用了startBgsaveForReplication()来开始执行BGSAVE命令。我们贴出主要的代码:1234567891011121314151617181920212223242526272829303132333435// 开始为复制执行BGSAVE,根据配置选择磁盘或套接字作为RDB发送的目标,在开始之前确保冲洗脚本缓存// mincapa参数是SLAVE_CAPA_*按位与的结果int startBgsaveForReplication(int mincapa) {int retval;// 是否直接写到socketint socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);listIter li;listNode *ln;if (socket_target)// 直接写到socket中// fork一个子进程将rdb写到 状态为等待BGSAVE开始 的从节点的socket中retval = rdbSaveToSlavesSockets();else// 否则后台进行RDB持久化BGSAVE操作,保存到磁盘上retval = rdbSaveBackground(server.rdb_filename);......// 如果是直接写到socket中,rdbSaveToSlavesSockets()已经会设置从节点为全量复制// 否则直接写到磁盘上,执行以下代码if (!socket_target) {listRewind(server.slaves,&li);// 遍历从节点链表while((ln = listNext(&li))) {client *slave = ln->value;// 设置等待全量同步的从节点的状态if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {// 设置要执行全量重同步从节点的状态replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());}}}}
replicationSetupSlaveForFullResync()函数源码如下:
哇,主节点终于回复从节点的PSYNC命令了,回复了一个+FULLRESYNC,写到主从同步的fd。表示要进行全量同步啊!!!
此时,从节点的复制状态一定为REPL_STATE_RECEIVE_PSYNC,fd的读事件发生,调用syncWithMaster()函数进行处理。
处理这种情况的代码如下:
这次的第二个参数是1,因此会执行该函数的读部分。(因为这个函数有两个部分,上一次执行了写部分,因为第二个参数是0)
至此,从节点监听主节点的读命令事件已经完成,所以取消监听了读事件。等到主节点开始传送数据给从节点时,从节点会新创建读事件。
该函数可以解析出主节点发过来的命令是哪一个,一共有三种:
- “+FULLRESYNC”:代表要进行一次全量复制。
- “+CONTINUE”:代表要进行一次部分重同步。
- “-ERR”:发生了错误。有两种可能:Redis版本过低不支持PSYNC命令和从节点读到一个错误回复。
我们关注第一个全量同步的操作。如果读到了主节点发来的”+FULLRESYNC”,那么会将同时发来的主节点运行ID和全局的复制偏移量保存到从节点的服务器属性中server.repl_master_runid和server.repl_master_initial_offset。然后返回PSYNC_FULLRESYNC。
回到syncWithMaster函数,继续处理全量同步。由于要进行全量同步,如果当前从节点还作为其他节点的主节点,因此要断开所有从节点的连接,让他们也重新同步当前节点。
准备好了所有,接下来就要等待主节点来发送RDB文件了。因此上面做了这三件事:
- 打开一个临时文件,用来保存主节点发来的RDB文件数据的。
- 监听fd的读事件,等待主节点发送RDB文件数据,触发可读事件执行readSyncBulkPayload()函数,该函数就会把主节点发来的数据读到一个缓冲区中,然后将缓冲区的数据写到刚才打开的临时文件中,接着要载入到从节点的数据库中,最后同步到磁盘中。
- 设置复制操作的状态为server.repl_state = REPL_STATE_TRANSFER。并且初始化复制的信息,例如:RDB文件的大小,偏移量,等等。(具体看上面的代码)
主节点要发送RDB文件,但是回复完”+FULLRESYNC”就再也没有操作了。而子节点创建了监听主节点写RDB文件的事件,等待主节点来写,才调用readSyncBulkPayload()函数来处理。这又有问题了,到底主节点什么时候发送RDB文件呢?如果不是主动执行,那么一定就在周期性函数内被执行。
它的调用关系如下:
serverCron()->backgroundSaveDoneHandler()->backgroundSaveDoneHandlerDisk()->updateSlavesWaitingBgsave()
updateSlavesWaitingBgsave()函数定义在replication.c中,主要操作有两步,我们简单介绍:
- 只读打开主节点的临时RDB文件,然后设置从节点client复制状态为SLAVE_STATE_SEND_BULK。
- 立刻创建监听可写的事件,并设置sendBulkToSlave()函数为可写事件的处理程序。
当主节点执行周期性函数时,主节点会先清除之前监听的可写事件,然后立即监听新的可写事件,这样就会触发可写的事件,调用sendBulkToSlave()函数将RDB文件写入到fd中,触发从节点的读事件,从节点调用readSyncBulkPayload()函数,来将RDB文件的数据载入数据库中,至此,就保证了主从同步了。
我们来简单介绍sendBulkToSlave()函数在写RDB文件时做了什么:
- 将RDB文件的大小写给从节点,以协议格式的字符串表示的大小。
- 从RDB文件的repldbfd中读出RDB文件数据,然后写到主从同步的fd中。
- 写入完成后,又一次取消监听文件可写事件,等待下一次发送缓冲区数据时在监听触发,并且调用putSlaveOnline()函数将从节点client的复制状态设置为SLAVE_STATE_ONLINE。表示已经发送RDB文件完毕,发送缓存更新。
2.9 发送输出缓冲区数据
主节点发送完RDB文件后,调用putSlaveOnline()函数将从节点client的复制状态设置为SLAVE_STATE_ONLINE,表示已经发送RDB文件完毕,要发送缓存更新了。于是会新创建一个事件,监听写事件的发生,设置sendReplyToClient为可写的处理程序,而且会将从节点client当做私有数据闯入sendReplyToClient()当做发送缓冲区的对象。
创建可写事件的时候,就会触发第一次可写,执行sendReplyToClient(),该函数还直接调用了riteToClient(fd,privdata,1)函数,于是将从节点client输出缓冲区的数据发送给了从节点服务器。
riteToClient()函数数据Redis网络连接库的函数,定义在network.c中,具体分析请看:Redis 网络连接库源码分析
这样就保证主从服务器的数据库状态一致了。
2.10 命令传播
主从节点在第一次全量同步之后就达到了一致,但是之后主节点如果执行了写命令,主节点的数据库状态就又可能发生变化,导致主从再次不一致。为了让主从节点回到一致状态,主机的执行命令后都需要将命令传播到从节点。
传播时会调用server.c中的propagate()函数,如果传播到从节点会调用replicationFeedSlaves(server.slaves,dbid,argv,argc)函数,该函数则会将执行的命令以协议的传输格式写到从节点client的输出缓冲区中,这就是为什么主节点会将从节点client的输出缓冲区发送到从节点(具体见标题2.9),也会添加到server.repl_backlog中。
我们来看看replicationFeedSlaves()函数的实现:
和AOF持久化一样,再给从节点client写命令时,会将SELECT命令强制写入,以保证命令正确读到数据库中。
不仅写入了从节点client的输出缓冲区,而且还会将命令记录到主节点服务器的复制积压缓冲区server.repl_backlog中,这是为了网络闪断后进行部分重同步。
3. 部分重同步实现
刚才剖析完全量同步,但是没有考虑特殊的情况。如果在传输RDB文件的过程中,网络发生故障,主节点和从节点的连接中断,Redis会咋么做呢?
Redis 2.8 版本之前会在进行一次连接然后进行全量复制,但是这样效率非常地下,之后的版本都提供了部分重同步的实现。那么我们就分析一下部分重同步的实现过程。
部分重同步在复制的过程中,相当于标题2.8的发送PSYNC命令的部分,其他所有的部分都要进行,他只是主节点回复从节点的命令不同,回复+CONTINUE则执行部分重同步,回复+FULLRESYNC则执行全量同步。
3.1 心跳机制
主节点是如何发现和从节点连接中断?在主从节点建立连接后,他们之间都维护者长连接并彼此发送心跳命令。主从节点彼此都有心跳机制,各自模拟成对方的客户端进行通信。
- 主节点默认每隔10秒发送PING命令,判断从节点的连接状态。
文件配置项:repl-ping-salve-period,默认是10
123456789// 首先,根据当前节点发送PING命令给从节点的频率发送PING命令// 如果当前节点是某以节点的 主节点 ,那么发送PING给从节点if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {// 创建PING命令对象ping_argv[0] = createStringObject("PING",4);// 将PING发送给从服务器replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);decrRefCount(ping_argv[0]);}从节点在主线程中每隔1秒发送REPLCONF ACK
命令,给主节点报告自己当前复制偏移量。 1234// 定期发送ack给主节点,旧版本的Redis除外if (server.masterhost && server.master && !(server.master->flags & CLIENT_PRE_PSYNC))// 发送一个REPLCONF ACK命令给主节点去报告关于当前处理的offset。replicationSendAck();
在周期性函数replicationCron(),每次都要检查和主节点处于连接状态的从节点和主节点的交互时间是否超时,如果超时则会调用cancelReplicationHandshake()函数,取消和主节点的连接。等到下一个周期在和主节点重新建立连接,进行复制。
3.2 复制积压缓冲区(backlog)
复制积压缓冲区是一个大小为1M的循环队列。主节点在命令传播时,不仅会将命令发送给所有的从节点,还会将命令写入复制积压缓冲区中(具体请看标题2.10)。
也就是说,复制积压缓冲区最多可以备份1M大小的数据,如果主从节点断线时间过长,复制积压缓冲区的数据会被新数据覆盖,那么当从主从中断连接起,主节点接收到的数据超过1M大小,那么从节点就无法进行部分重同步,只能进行全量复制。
在标题2.8,介绍的syncCommand()命令中,调用masterTryPartialResynchronization()函数会进行尝试部分重同步,在我们之前分析的第一次全量同步时,该函数会执行失败,然后返回syncCommand()函数执行全量同步,而在进行恢复主从连接后,则会进行部分重同步,masterTryPartialResynchronization()函数代码如下:
如果可以进行部分重同步,主节点则会发送”+CONTINUE\r\n”作为从节点发送PSYNC回复(看标题2.8)。然后调用addReplyReplicationBacklog()函数,将backlog中的数据发送给从节点。于是就完成了部分重同步。
addReplyReplicationBacklog()函数所做的就是将backlog写到从节点的client的输出缓冲区中。