HeLei Blog

不要因为走得太远,就忘记为什么而出发


  • 首页

  • 分类

  • 归档

  • 标签

  • 搜索
close
HeLei Blog

Redis源码剖析和注释(二十二)--- Redis 复制(replicate)源码详细解析

发表于 2018-02-23 | 分类于 Redis

1. 复制的介绍

Redis为了解决单点数据库问题,会把数据复制多个副本部署到其他节点上,通过复制,实现Redis的高可用性,实现对数据的冗余备份,保证数据和服务的高度可靠性。

2. 复制的实现

2.1 主从关系的建立

复制的建立方法有三种。

  1. 在redis.conf文件中配置slaveof 选项,然后指定该配置文件启动Redis生效。
  2. 在redis-server启动命令后加上–slaveof 启动生效。
  3. 直接使用 slaveof 命令在从节点执行生效。
    无论是通过哪一种方式来建立主从复制,都是从节点来执行slaveof命令,那么从节点执行了这个命令到底做了什么,我们上源码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    // SLAVEOF host port命令实现
    void slaveofCommand(client *c) {
    // 如果当前处于集群模式,不能进行复制操作
    if (server.cluster_enabled) {
    addReplyError(c,"SLAVEOF not allowed in cluster mode.");
    return;
    }
    // SLAVEOF NO ONE命令使得这个从节点关闭复制功能,并从从节点转变回主节点,原来同步所得的数据集不会被丢弃。
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
    !strcasecmp(c->argv[2]->ptr,"one")) {
    // 如果保存了主节点IP
    if (server.masterhost) {
    // 取消复制操作,设置服务器为主服务器
    replicationUnsetMaster();
    // 获取client的每种信息,并以sds形式返回,并打印到日志中
    sds client = catClientInfoString(sdsempty(),c);
    serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
    client);
    sdsfree(client);
    }
    // SLAVEOF host port
    } else {
    long port;
    // 获取端口号
    if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
    return;
    // 如果已存在从属于masterhost主节点且命令参数指定的主节点和masterhost相等,端口也相等,直接返回
    if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
    && server.masterport == port) {
    serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
    addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
    return;
    }
    // 第一次执行设置端口和ip,或者是重新设置端口和IP
    // 设置服务器复制操作的主节点IP和端口
    replicationSetMaster(c->argv[1]->ptr, port);
    // 获取client的每种信息,并以sds形式返回,并打印到日志中
    sds client = catClientInfoString(sdsempty(),c);
    serverLog(LL_NOTICE,"SLAVE OF %s:%d enabled (user request from '%s')",
    server.masterhost, server.masterport, client);
    sdsfree(client);
    }
    // 回复ok
    addReply(c,shared.ok);
    }

当从节点的client执行SLAVEOF命令后,该命令会被构建成Redis协议格式,发送给从节点服务器,然后节点服务器会调用slaveofCommand()函数执行该命令。

而SLAVEOF命令做的操作并不多,主要以下三步:

  • 判断当前环境是否在集群模式下,因为集群模式下不行执行该命令。
  • 是否执行的是SLAVEOF NO ONE命令,该命令会断开主从的关系,设置当前节点为主节点服务器。
  • 设置从节点所属主节点的IP和port。调用了replicationSetMaster()函数。

SLAVEOF命令能做的只有这么多,我们来具体看下replicationSetMaster()函数的代码,看看它做了哪些与复制相关的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 设置复制操作的主节点IP和端口
void replicationSetMaster(char *ip, int port) {
// 按需清除原来的主节点信息
sdsfree(server.masterhost);
// 设置ip和端口
server.masterhost = sdsnew(ip);
server.masterport = port;
// 如果有其他的主节点,在释放
// 例如服务器1是服务器2的主节点,现在服务器2要同步服务器3,服务器3要成为服务器2的主节点,因此要释放服务器1
if (server.master) freeClient(server.master);
// 解除所有客户端的阻塞状态
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
// 关闭所有从节点服务器的连接,强制从节点服务器进行重新同步操作
disconnectSlaves(); /* Force our slaves to resync with us as well. */
// 释放主节点结构的缓存,不会执行部分重同步PSYNC
replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
// 释放复制积压缓冲区
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
// 取消执行复制操作
cancelReplicationHandshake();
// 设置复制必须重新连接主节点的状态
server.repl_state = REPL_STATE_CONNECT;
// 初始化复制的偏移量
server.master_repl_offset = 0;
// 清零连接断开的时长
server.repl_down_since = 0;
}

由代码知,replicationSetMaster()函数执行操作的也很简单,总结为两步:

  • 清理之前所属的主节点的信息。
  • 设置新的主节点IP和port等。
    因为,当前从节点有可能之前从属于另外的一个主节点服务器,因此要清理所有关于之前主节点的缓存、关闭旧的连接等等。然后设置该从节点的新主节点,设置了IP和port,还设置了以下状态:
    1
    2
    3
    4
    // 设置复制必须重新连接主节点的状态
    server.repl_state = REPL_STATE_CONNECT;
    // 初始化全局复制的偏移量
    server.master_repl_offset = 0;

然后,就没有然后了,然后就会执行复制操作吗?这也没有什么关于复制的操作执行了,那么复制操作是怎么开始的呢?

2.2 主从网络连接建立

slaveof命令是一个异步命令,执行命令时,从节点保存主节点的信息,确立主从关系后就会立即返回,后续的复制流程在节点内部异步执行。那么,如何触发复制的执行呢?

周期性执行的函数:replicationCron()函数,该函数被服务器的时间事件的回调函数serverCron()所调用,而serverCron()函数在Redis服务器初始化时,被设置为时间事件的处理函数。

1
2
// void initServer(void) Redis服务器初始化
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)

replicationCron()函数执行频率为1秒一次:

1
2
3
// 节选自serverCron函数
// 周期性执行复制的任务
run_with_period(1000) replicationCron();

主从关系建立后,从节点服务器的server.repl_state被设置为REPL_STATE_CONNECT,而replicationCron()函数会被每秒执行一次,该函数会发现我(从节点)现在有主节点了,而且我要的状态是要连接主节点(REPL_STATE_CONNECT)。

replicationCron()函数处理这以情况的代码如下:

1
2
3
4
5
6
7
8
9
10
/* Check if we should connect to a MASTER */
// 如果处于要必须连接主节点的状态,尝试连接
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
// 以非阻塞的方式连接主节点
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
}
}

replicationCron()函数根据从节点的状态,调用connectWithMaster()非阻塞连接主节点。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 以非阻塞的方式连接主节点
int connectWithMaster(void) {
int fd;
// 连接主节点
fd = anetTcpNonBlockBestEffortBindConnect(NULL,
server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
if (fd == -1) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return C_ERR;
}
// 监听主节点fd的可读和可写事件的发生,并设置其处理程序为syncWithMaster
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR)
{
close(fd);
serverLog(LL_WARNING,"Can't create readable event for SYNC");
return C_ERR;
}
// 最近一次读到RDB文件内容的时间
server.repl_transfer_lastio = server.unixtime;
// 从节点和主节点的同步套接字
server.repl_transfer_s = fd;
// 处于和主节点正在连接的状态
server.repl_state = REPL_STATE_CONNECTING;
return C_OK;
}

connectWithMaster()函数执行的操作可以总结为:

  • 根据IP和port非阻塞的方式连接主节点,得到主从节点进行通信的文件描述符fd,并保存到从节点服务器server.repl_transfer_s中,并且将刚才的REPL_STATE_CONNECT状态设置为REPL_STATE_CONNECTING。
  • 监听fd的可读和可写事件,并且设置事件发生的处理程序syncWithMaster()函数。
    至此,主从网络建立就完成了。

2.3 发送PING命令

主从建立网络时,同时注册fd的AE_READABLE|AE_WRITABLE事件,因此会触发一个AE_WRITABLE事件,调用syncWithMaster()函数,处理写事件。

根据当前的REPL_STATE_CONNECTING状态,从节点向主节点发送PING命令,PING命令的目的有:

  • 检测主从节点之间的网络是否可用。
  • 检查主从节点当前是否接受处理命令。
    syncWithMaster()函数中相关操作的代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /* Send a PING to check the master is able to reply without errors. */
    // 如果复制的状态为REPL_STATE_CONNECTING,发送一个PING去检查主节点是否能正确回复一个PONG
    if (server.repl_state == REPL_STATE_CONNECTING) {
    serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
    // 暂时取消接听fd的写事件,以便等待PONG回复时,注册可读事件
    aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
    // 设置复制状态为等待PONG回复
    server.repl_state = REPL_STATE_RECEIVE_PONG;
    // 发送一个PING命令
    err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
    if (err) goto write_error;
    return;
    }

发送PING命令主要的操作是:

  • 先取消监听fd的写事件,因为接下来要读主节点服务器发送过来的PONG回复,因此只监听可读事件的发生。
  • 设置从节点的复制状态为REPL_STATE_RECEIVE_PONG。等待一个主节点回复一个PONG命令。
  • 以写的方式调用sendSynchronousCommand()函数发送一个PING命令给主节点。
    主节点服务器从fd会读到一个PING命令,然后会回复一个PONG命令到fd中,执行的命令就是addReply(c,shared.pong);。

此时,会触发fd的可读事件,调用syncWithMaster()函数来处理,此时从节点的复制状态为REPL_STATE_RECEIVE_PONG,等待主节点回复PONG。syncWithMaster()函数中处理这一状态的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* Receive the PONG command. */
// 如果复制的状态为REPL_STATE_RECEIVE_PONG,等待接受PONG命令
if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
// 从主节点读一个PONG命令sendSynchronousCommand
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
// 只接受两种有效的回复。一种是 "+PONG",一种是认证错误"-NOAUTH"。
// 旧版本的返回有"-ERR operation not permitted"
if (err[0] != '+' &&
strncmp(err,"-NOAUTH",7) != 0 &&
strncmp(err,"-ERR operation not permitted",28) != 0)
{ // 没有收到正确的PING命令的回复
serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
sdsfree(err);
goto error;
} else {
serverLog(LL_NOTICE,"Master replied to PING, replication can continue...");
}
sdsfree(err);
// 已经收到PONG,更改状态设置为发送认证命令AUTH给主节点
server.repl_state = REPL_STATE_SEND_AUTH;
}

在这里,以读的方式调用sendSynchronousCommand(),并将读到的”+PONG\r\n”返回到err中,如果从节点正确接收到主节点发送的PONG命令,会将从节点的复制状态设置为server.repl_state = REPL_STATE_SEND_AUTH。等待进行权限的认证。

2.4 认证权限

权限认证会在syncWithMaster()函数继续执行,紧接着刚才的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* AUTH with the master if required. */
// 如果需要,发送AUTH认证给主节点
if (server.repl_state == REPL_STATE_SEND_AUTH) {
// 如果服务器设置了认证密码
if (server.masterauth) {
// 写AUTH给主节点
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
if (err) goto write_error;
// 设置状态为等待接受认证回复
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
// 如果没有设置认证密码,直接设置复制状态为发送端口号给主节点
} else {
server.repl_state = REPL_STATE_SEND_PORT;
}
}

如果从节点的服务器设置了认证密码,则会以写方式调用sendSynchronousCommand()函数,将AUTH命令和密码写到fd中,并且将从节点的复制状态设置为server.repl_state = REPL_STATE_RECEIVE_AUTH,接受AUTH的验证。

如果从节点服务器没有设置认证密码,就直接将从节点的复制状态设置为server.repl_state = REPL_STATE_SEND_PORT,准发发送一个端口号。

主节点会读取到AUTH命令,调用authCommand()函数来处理,主节点服务器会比较从节点发送过来的server.masterauth和主节点服务器保存的server.requirepass是否一致,如果一致,会回复一个”+OK\r\n”。

当主节点将回复写到fd时,又会触发从节点的可读事件,紧接着调用syncWithMaster()函数来处理接收AUTH认证结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Receive AUTH reply. */
// 接受AUTH认证的回复
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
// 从主节点读回复
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
// 回复错误,认证失败
if (err[0] == '-') {
serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
// 设置复制状态为发送端口号给主节点
server.repl_state = REPL_STATE_SEND_PORT;
}

以读方式从fd中读取一个回复,判断认证是否成功,认证成功,则会将从节点的复制状态设置为server.repl_state = REPL_STATE_SEND_PORT表示要发送一个端口号给主节点。这和没有设置认证的情况结果相同。

2.5 发送端口号

从节点在认证完权限后,会继续在syncWithMaster()函数执行,处理发送端口号的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
// 如果复制状态是,发送从节点端口号给主节点,主节点的INFO命令就能够列出从节点正在监听的端口号
if (server.repl_state == REPL_STATE_SEND_PORT) {
// 获取端口号
sds port = sdsfromlonglong(server.slave_announce_port ?
server.slave_announce_port : server.port);
// 将端口号写给主节点
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF","listening-port",port, NULL);
sdsfree(port);
if (err) goto write_error;
sdsfree(err);
// 设置复制状态为接受端口号
server.repl_state = REPL_STATE_RECEIVE_PORT;
return;
}

发送端口号,以REPLCONF listening-port命令的方式,写到fd中。然后将复制状态设置为server.repl_state = REPL_STATE_RECEIVE_PORT,等待接受主节点的回复。

主节点从fd中读到REPLCONF listening-port 命令,调用replconfCommand()命令来处理,而replconfCommand()函数的定义就在replication.c文件中,REPLCONF命令可以设置多种不同的选项,解析到端口号后,将端口号保存从节点对应client状态的c->slave_listening_port = port中。最终回复一个”+OK\r\n”状态的回复,写在fd中。

当主节点将回复写到fd时,又会触发从节点的可读事件,紧接着调用syncWithMaster()函数来处理接受端口号,验证主节点是否正确的接收到从节点的端口号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Receive REPLCONF listening-port reply. */
// 复制状态为接受端口号
if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
// 从主节点读取端口号
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
// 忽略所有的错误,因为不是所有的Redis版本都支持REPLCONF listening-port命令
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
}
sdsfree(err);
// 设置复制状态为发送IP
server.repl_state = REPL_STATE_SEND_IP;
}

如果主节点正确的接收到从节点的端口号,会将从节点的复制状态设置为server.repl_state = REPL_STATE_SEND_IP表示要送一个IP给主节点。

2.6 发送 IP 地址

从节点发送完端口号并且正确收到主节点的回复后,紧接着syncWithMaster()函数执行发送IP的代码。发送IP和发送端口号过程几乎一致。

1
2
3
4
5
6
7
8
9
10
// 复制状态为发送IP
if (server.repl_state == REPL_STATE_SEND_IP) {
// 将IP写给主节点
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF","ip-address",server.slave_announce_ip, NULL);
if (err) goto write_error;
sdsfree(err);
// 设置复制状态为接受IP
server.repl_state = REPL_STATE_RECEIVE_IP;
return;
}

同样是以REPLCONF ip-address命令的方式,将从节点的IP写到fd中。并且设置从节点的复制状态为server.repl_state = REPL_STATE_RECEIVE_IP,等待接受主节点的回复。然后就直接返回,等待fd可读发生。

主节点仍然会调用replication.c文件中实现的replconfCommand()函数来处理REPLCONF命令,解析出REPLCONF ip-address ip命令,保存从节点的ip到主节点的对应从节点的client的c->slave_ip中。然后回复”+OK\r\n”状态,写到fd中。

此时,从节点监听到fd触发了可读事件,会调用syncWithMaster()函数来处理,验证主节点是否正确接收到从节点的IP。

1
2
3
4
5
6
7
8
9
10
11
12
13
/* Receive REPLCONF ip-address reply. */
// 复制状态为接受IP回复
if (server.repl_state == REPL_STATE_RECEIVE_IP) {
// 从主节点读一个IP回复
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
// 错误回复
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand REPLCONF ip-address: %s", err);
}
sdsfree(err);
// 设置复制状态为发送一个capa(能力?能否解析出RDB文件的EOF流格式)
server.repl_state = REPL_STATE_SEND_CAPA;
}

如果主节点正确接收了从节点IP,就会设置从节点的复制状态server.repl_state = REPL_STATE_SEND_CAPA表示发送从节点的能力(capability)。

2.7 发送能力(capability)

发送能力和发送端口和IP也是如出一辙,紧接着syncWithMaster()函数执行发送capa的代码。

1
2
3
4
5
6
7
8
9
10
// 复制状态为发送capa,通知主节点从节点的能力
if (server.repl_state == REPL_STATE_SEND_CAPA) {
// 将从节点的capa写给主节点
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF","capa","eof",NULL);
if (err) goto write_error;
sdsfree(err);
// 设置复制状态为接受从节点的capa
server.repl_state = REPL_STATE_RECEIVE_CAPA;
return;
}

从节点将REPLCONF capa eof命令发送给主节点,写到fd中。

1
目前只支持一种能力,就是能够解析出RDB文件的EOF流格式。用于无盘复制的方式中。

主节点仍然会调用replication.c文件中实现的replconfCommand()函数来处理REPLCONF命令,解析出REPLCONF capa eof命令,将eof对应的标识,按位与到主节点的对应从节点的client的c->slave_capa中。然后回复”+OK\r\n”状态,写到fd中。

此时,从节点监听到fd触发了可读事件,会调用syncWithMaster()函数来处理,验证主节点是否正确接收到从节点的capa。

1
2
3
4
5
6
7
8
9
10
11
12
13
/* Receive CAPA reply. */
// 复制状态为接受从节点的capa回复
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
// 从主节点读取capa回复
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
// 错误回复
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand REPLCONF capa: %s", err);
}
sdsfree(err);
// 设置复制状态为发送PSYNC命令
server.repl_state = REPL_STATE_SEND_PSYNC;
}

如果主节点正确接收了从节点capa,就会设置从节点的复制状态server.repl_state = REPL_STATE_SEND_PSYNC表示发送一个PSYNC命令

2.8 发送PSYNC命令

从节点发送PSYNC命令给主节点,尝试进行同步主节点的数据集。同步分为两种:

  • 全量同步:第一次执行复制的场景。
  • 部分同步:用于主从复制因为网络中断等原因造成数据丢失的场景。
    因为这是第一次执行同步,因此会进行全量同步。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 复制状态为发送PSYNC命令。尝试进行部分重同步。
    // 如果没有缓冲主节点的结构,slaveTryPartialResynchronization()函数将会至少尝试使用PSYNC去进行一个全同步,这样就能得到主节点的运行runid和全局复制偏移量。并且在下次重连接时可以尝试进行部分重同步。
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
    // 向主节点发送一个部分重同步命令PSYNC,参数0表示不读主节点的回复,只获取主节点的运行runid和全局复制偏移量
    if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
    // 发送PSYNC出错
    err = sdsnew("Write error sending the PSYNC command.");
    goto write_error;
    }
    // 设置复制状态为等待接受一个PSYNC回复
    server.repl_state = REPL_STATE_RECEIVE_PSYNC;
    return;
    }

从节点调用slaveTryPartialResynchronization()函数尝试进行重同步,注意第二个参数是0。因为slaveTryPartialResynchronization()分成两部分,一部分是写,一部分是读,因为第二个参数是0,因此执行写的一部分,发送一个PSYNC命令给主节点。只列举出写的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* Writing half */
// 如果read_reply为0,则该函数往socket上会写入一个PSYNC命令
if (!read_reply) {
// 将repl_master_initial_offset设置为-1表示主节点的run_id和全局复制偏移量是无效的。
// 如果能使用PSYNC命令执行一个全量同步,会正确设置全复制偏移量,以便这个信息被正确传播主节点的所有从节点中
server.repl_master_initial_offset = -1;
// 主节点的缓存不为空,可以尝试进行部分重同步。PSYNC <master_run_id> <repl_offset>
if (server.cached_master) {
// 保存缓存runid
psync_runid = server.cached_master->replrunid;
// 获取已经复制的偏移量
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
// 主节点的缓存为空,发送PSYNC ? -1。请求全量同步
} else {
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_runid = "?";
memcpy(psync_offset,"-1",3);
}
/* Issue the PSYNC command */
// 发送一个PSYNC命令给主节点
reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL);
// 写成功失败会返回一个"-"开头的字符串
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
// 删除文件的可读事件,返回写错误PSYNC_WRITE_ERROR
aeDeleteFileEvent(server.el,fd,AE_READABLE);
return PSYNC_WRITE_ERROR;
}
// 返回等待回复的标识PSYNC_WAIT_REPLY,调用者会将read_reply设置为1,然后再次调用该函数,执行下面的读部分。
return PSYNC_WAIT_REPLY;
}

由于从节点是第一次和主节点进行同步操作,因此从节点缓存的主节点client状态erver.cached_master为空,所以就会发送一个PSYNC ? -1命令给主节点,表示进行一次全量同步。

主节点会接收到PSYNC ? -1命令,然后调用replication.c文件中实现的syncCommand()函数处理PSYNC命令。

syncCommand()函数先会判断执行的是PSYNC还是SYNC命令,如果是PSYNC命令会调用masterTryPartialResynchronization()命令执行部分同步,但是由于这是第一次执行复制操作,所以会执行失败。进而执行全量同步。

syncCommand()函数的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/* SYNC and PSYNC command implemenation. */
// SYNC and PSYNC 命令实现
void syncCommand(client *c) {
..........//为了简洁,删除一些判断条件的代码
// 尝试执行一个部分同步PSYNC的命令,则masterTryPartialResynchronization()会回复一个 "+FULLRESYNC <runid> <offset>",如果失败则执行全量同步
// 所以,从节点会如果和主节点连接断开,从节点会知道runid和offset,随后会尝试执行PSYNC
// 如果是执行PSYNC命令
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
// 主节点尝试执行部分重同步,执行成功返回C_OK
if (masterTryPartialResynchronization(c) == C_OK) {
// 可以执行PSYNC命令,则将接受PSYNC命令的个数加1
server.stat_sync_partial_ok++;
// 不需要执行后面的全量同步,直接返回
return; /* No full resync needed, return. */
// 不能执行PSYNC部分重同步,需要进行全量同步
} else {
char *master_runid = c->argv[1]->ptr;
// 从节点以强制全量同步为目的,所以不能执行部分重同步,因此增加PSYNC命令失败的次数
if (master_runid[0] != '?') server.stat_sync_partial_err++;
}
// 执行SYNC命令
} else {
// 设置标识,执行SYNC命令,不接受REPLCONF ACK
c->flags |= CLIENT_PRE_PSYNC;
}
// 全量重同步次数加1
server.stat_sync_full++;
// 设置client状态为:从服务器节点等待BGSAVE节点的开始
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
// 执行SYNC命令后是否关闭TCP_NODELAY
if (server.repl_disable_tcp_nodelay)
// 是的话,则启用nagle算法
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
// 保存主服务器传来的RDB文件的fd,设置为-1
c->repldbfd = -1;
// 设置client状态为从节点,标识client是一个从服务器
c->flags |= CLIENT_SLAVE;
// 添加到服务器从节点链表中
listAddNodeTail(server.slaves,c);
/* CASE 1: BGSAVE is in progress, with disk target. */
// 情况1. 正在执行 BGSAVE ,且是同步到磁盘上
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
client *slave;
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
// 遍历从节点链表
while((ln = listNext(&li))) {
slave = ln->value;
// 如果有从节点已经创建子进程执行写RDB操作,等待完成,那么退出循环
// 从节点的状态为 SLAVE_STATE_WAIT_BGSAVE_END 在情况三中被设置
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
}
// 对于这个从节点,我们检查它是否具有触发当前BGSAVE操作的能力
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
// 将slave的输出缓冲区所有内容拷贝给c的所有输出缓冲区中
copyClientOutputBuffer(c,slave);
// 设置全量重同步从节点的状态,设置部分重同步的偏移量
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
serverLog(LL_NOTICE,"Can't attach the slave to the current BGSAVE. Waiting for next BGSAVE for SYNC");
}
/* CASE 2: BGSAVE is in progress, with socket target. */
// 情况2. 正在执行BGSAVE,且是无盘同步,直接写到socket中
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
// 虽然有子进程在执行写RDB,但是它直接写到socket中,所以等待下次执行BGSAVE
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
/* CASE 3: There is no BGSAVE is progress. */
// 情况3:没有执行BGSAVE的进程
} else {
// 服务器支持无盘同步
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
// 无盘同步复制的子进程被创建在replicationCron()中,因为想等待更多的从节点可以到来而延迟
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
// 服务器不支持无盘复制
} else {
// 如果没有正在执行BGSAVE,且没有进行写AOF文件,则开始为复制执行BGSAVE,并且是将RDB文件写到磁盘上
if (server.aof_child_pid == -1) {
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but an AOF rewrite is active. BGSAVE for replication delayed");
}
}
}
// 只有一个从节点,且backlog为空,则创建一个新的backlog
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
createReplicationBacklog();
return;
}

首先先明确,主节点执行处理从节点发来PSYNC命令的操作。所以主节点会将从节点视为自己的从节点客户端来操作。会将从节点的复制设置为SLAVE_STATE_WAIT_BGSAVE_START状态表示

主节点执行全量同步的情况有三种:

  • 主节点服务器正在执行BGSAVE命令,且将RDB文件写到磁盘上。
    这种情况,如果有已经设置过全局重同步偏移量的从节点,可以共用输出缓冲区的数据。
  • 主节点服务器正在执行BGSAVE命令,且将RDB文件写到网络socket上,无盘同步。
    由于本次BGSAVE命令直接将RDB写到socket中,因此只能等待下一BGSAVE。
  • 主节点服务器没有正在执行BGSAVE。
    如果也没有进行AOF持久化的操作,那么开始为复制操作执行BGSAVE,生成一个写到磁盘上的RDB文件。
    我们针对第三种情况来分析。调用了startBgsaveForReplication()来开始执行BGSAVE命令。我们贴出主要的代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // 开始为复制执行BGSAVE,根据配置选择磁盘或套接字作为RDB发送的目标,在开始之前确保冲洗脚本缓存
    // mincapa参数是SLAVE_CAPA_*按位与的结果
    int startBgsaveForReplication(int mincapa) {
    int retval;
    // 是否直接写到socket
    int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
    listIter li;
    listNode *ln;
    if (socket_target)
    // 直接写到socket中
    // fork一个子进程将rdb写到 状态为等待BGSAVE开始 的从节点的socket中
    retval = rdbSaveToSlavesSockets();
    else
    // 否则后台进行RDB持久化BGSAVE操作,保存到磁盘上
    retval = rdbSaveBackground(server.rdb_filename);
    ......
    // 如果是直接写到socket中,rdbSaveToSlavesSockets()已经会设置从节点为全量复制
    // 否则直接写到磁盘上,执行以下代码
    if (!socket_target) {
    listRewind(server.slaves,&li);
    // 遍历从节点链表
    while((ln = listNext(&li))) {
    client *slave = ln->value;
    // 设置等待全量同步的从节点的状态
    if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
    // 设置要执行全量重同步从节点的状态
    replicationSetupSlaveForFullResync(slave,
    getPsyncInitialOffset());
    }
    }
    }
    }

replicationSetupSlaveForFullResync()函数源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
char buf[128];
int buflen;
// 设置全量重同步的偏移量
slave->psync_initial_offset = offset;
// 设置从节点复制状态,开始累计差异数据
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
// 将slaveseldb设置为-1,是为了强制发送一个select命令在复制流中
server.slaveseldb = -1;
// 如果从节点的状态是CLIENT_PRE_PSYNC,则表示是Redis是2.8之前的版本,则不将这些信息发送给从节点。
// 因为在2.8之前只支持SYNC的全量复制同步,而在之后的版本提供了部分的重同步
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.runid,offset);
// 否则会将全量复制的信息写给从节点
if (write(slave->fd,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
}
}
return C_OK;
}

哇,主节点终于回复从节点的PSYNC命令了,回复了一个+FULLRESYNC,写到主从同步的fd。表示要进行全量同步啊!!!

此时,从节点的复制状态一定为REPL_STATE_RECEIVE_PSYNC,fd的读事件发生,调用syncWithMaster()函数进行处理。

处理这种情况的代码如下:

1
2
// 那么尝试进行第二次部分重同步,从主节点读取指令来决定执行部分重同步还是全量同步
psync_result = slaveTryPartialResynchronization(fd,1);

这次的第二个参数是1,因此会执行该函数的读部分。(因为这个函数有两个部分,上一次执行了写部分,因为第二个参数是0)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/* Reading half */
// 从主节点读一个命令保存在reply中
reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (sdslen(reply) == 0) {
// 主节点为了保持连接的状态,可能会在接收到PSYNC命令后发送一个空行
sdsfree(reply);
// 所以就返回PSYNC_WAIT_REPLY,调用者会将read_reply设置为1,然后再次调用该函数。
return PSYNC_WAIT_REPLY;
}
// 如果读到了一个命令,删除fd的可读事件
aeDeleteFileEvent(server.el,fd,AE_READABLE);
// 接受到的是"+FULLRESYNC",表示进行一次全量同步
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *runid = NULL, *offset = NULL;
// 解析回复中的内容,将runid和复制偏移量提取出来
runid = strchr(reply,' ');
if (runid) {
runid++; //定位到runid的地址
offset = strchr(runid,' ');
if (offset) offset++; //定位offset
}
// 如果runid和offset任意为空,那么发生不期望错误
if (!runid || !offset || (offset-runid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,"Master replied with wrong +FULLRESYNC syntax.");
// 将主节点的运行ID重置为0
memset(server.repl_master_runid,0,CONFIG_RUN_ID_SIZE+1);
// runid和offset获取成功
} else {
// 设置服务器保存的主节点的运行ID
memcpy(server.repl_master_runid, runid, offset-runid-1);
server.repl_master_runid[CONFIG_RUN_ID_SIZE] = '\0';
// 主节点的偏移量
server.repl_master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",server.repl_master_runid, server.repl_master_initial_offset);
}
// 执行全量同步,所以缓存的主节点结构没用了,将其清空
replicationDiscardCachedMaster();
sdsfree(reply);
// 返回执行的状态
return PSYNC_FULLRESYNC;
}
// 接受到的是"+CONTINUE",表示进行一次部分重同步
if (!strncmp(reply,"+CONTINUE",9)) {
serverLog(LL_NOTICE,"Successful partial resynchronization with master.");
sdsfree(reply);
// 因为执行部分重同步,因此要使用缓存的主节点结构,所以将其设置为当前的主节点,被同步的主节点
replicationResurrectCachedMaster(fd);
// 返回执行的状态
return PSYNC_CONTINUE;
}
// 接收到了错误,两种情况。
// 1. 主节点不支持PSYNC命令,Redis版本低于2.8
// 2. 从主节点读取了一个不期望的回复
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
serverLog(LL_WARNING,"Unexpected reply to PSYNC from master: %s", reply);
} else {
serverLog(LL_NOTICE,"Master does not support PSYNC or is in error state (reply: %s)", reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
// 发送不支持PSYNC命令的状态
return PSYNC_NOT_SUPPORTED;

至此,从节点监听主节点的读命令事件已经完成,所以取消监听了读事件。等到主节点开始传送数据给从节点时,从节点会新创建读事件。

该函数可以解析出主节点发过来的命令是哪一个,一共有三种:

  • “+FULLRESYNC”:代表要进行一次全量复制。
  • “+CONTINUE”:代表要进行一次部分重同步。
  • “-ERR”:发生了错误。有两种可能:Redis版本过低不支持PSYNC命令和从节点读到一个错误回复。
    我们关注第一个全量同步的操作。如果读到了主节点发来的”+FULLRESYNC”,那么会将同时发来的主节点运行ID和全局的复制偏移量保存到从节点的服务器属性中server.repl_master_runid和server.repl_master_initial_offset。然后返回PSYNC_FULLRESYNC。

回到syncWithMaster函数,继续处理全量同步。由于要进行全量同步,如果当前从节点还作为其他节点的主节点,因此要断开所有从节点的连接,让他们也重新同步当前节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 执行到这里,psync_result == PSYNC_FULLRESYNC或PSYNC_NOT_SUPPORTED
// 准备一个合适临时文件用来写入和保存主节点传来的RDB文件数据
while(maxtries--) {
// 设置文件的名字
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
// 以读写,可执行权限打开临时文件
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
// 打开成功,跳出循环
if (dfd != -1) break;
sleep(1);
}
/* Setup the non blocking download of the bulk file. */
// 监听一个fd的读事件,并设置该事件的处理程序为readSyncBulkPayload
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}
// 复制状态为正从主节点接受RDB文件
server.repl_state = REPL_STATE_TRANSFER;
// 初始化RDB文件的大小
server.repl_transfer_size = -1;
// 已读的大小
server.repl_transfer_read = 0;
// 最近一个执行fsync的偏移量为0
server.repl_transfer_last_fsync_off = 0;
// 传输RDB文件的临时fd
server.repl_transfer_fd = dfd;
// 最近一次读到RDB文件内容的时间
server.repl_transfer_lastio = server.unixtime;
// 保存RDB文件的临时文件名
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;

准备好了所有,接下来就要等待主节点来发送RDB文件了。因此上面做了这三件事:

  • 打开一个临时文件,用来保存主节点发来的RDB文件数据的。
  • 监听fd的读事件,等待主节点发送RDB文件数据,触发可读事件执行readSyncBulkPayload()函数,该函数就会把主节点发来的数据读到一个缓冲区中,然后将缓冲区的数据写到刚才打开的临时文件中,接着要载入到从节点的数据库中,最后同步到磁盘中。
  • 设置复制操作的状态为server.repl_state = REPL_STATE_TRANSFER。并且初始化复制的信息,例如:RDB文件的大小,偏移量,等等。(具体看上面的代码)
    主节点要发送RDB文件,但是回复完”+FULLRESYNC”就再也没有操作了。而子节点创建了监听主节点写RDB文件的事件,等待主节点来写,才调用readSyncBulkPayload()函数来处理。这又有问题了,到底主节点什么时候发送RDB文件呢?如果不是主动执行,那么一定就在周期性函数内被执行。

它的调用关系如下:

serverCron()->backgroundSaveDoneHandler()->backgroundSaveDoneHandlerDisk()->updateSlavesWaitingBgsave()

updateSlavesWaitingBgsave()函数定义在replication.c中,主要操作有两步,我们简单介绍:

  • 只读打开主节点的临时RDB文件,然后设置从节点client复制状态为SLAVE_STATE_SEND_BULK。
  • 立刻创建监听可写的事件,并设置sendBulkToSlave()函数为可写事件的处理程序。
    当主节点执行周期性函数时,主节点会先清除之前监听的可写事件,然后立即监听新的可写事件,这样就会触发可写的事件,调用sendBulkToSlave()函数将RDB文件写入到fd中,触发从节点的读事件,从节点调用readSyncBulkPayload()函数,来将RDB文件的数据载入数据库中,至此,就保证了主从同步了。

我们来简单介绍sendBulkToSlave()函数在写RDB文件时做了什么:

  • 将RDB文件的大小写给从节点,以协议格式的字符串表示的大小。
  • 从RDB文件的repldbfd中读出RDB文件数据,然后写到主从同步的fd中。
  • 写入完成后,又一次取消监听文件可写事件,等待下一次发送缓冲区数据时在监听触发,并且调用putSlaveOnline()函数将从节点client的复制状态设置为SLAVE_STATE_ONLINE。表示已经发送RDB文件完毕,发送缓存更新。

2.9 发送输出缓冲区数据

主节点发送完RDB文件后,调用putSlaveOnline()函数将从节点client的复制状态设置为SLAVE_STATE_ONLINE,表示已经发送RDB文件完毕,要发送缓存更新了。于是会新创建一个事件,监听写事件的发生,设置sendReplyToClient为可写的处理程序,而且会将从节点client当做私有数据闯入sendReplyToClient()当做发送缓冲区的对象。

1
aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,sendReplyToClient, slave)

创建可写事件的时候,就会触发第一次可写,执行sendReplyToClient(),该函数还直接调用了riteToClient(fd,privdata,1)函数,于是将从节点client输出缓冲区的数据发送给了从节点服务器。

riteToClient()函数数据Redis网络连接库的函数,定义在network.c中,具体分析请看:Redis 网络连接库源码分析

这样就保证主从服务器的数据库状态一致了。

2.10 命令传播

主从节点在第一次全量同步之后就达到了一致,但是之后主节点如果执行了写命令,主节点的数据库状态就又可能发生变化,导致主从再次不一致。为了让主从节点回到一致状态,主机的执行命令后都需要将命令传播到从节点。

传播时会调用server.c中的propagate()函数,如果传播到从节点会调用replicationFeedSlaves(server.slaves,dbid,argv,argc)函数,该函数则会将执行的命令以协议的传输格式写到从节点client的输出缓冲区中,这就是为什么主节点会将从节点client的输出缓冲区发送到从节点(具体见标题2.9),也会添加到server.repl_backlog中。

我们来看看replicationFeedSlaves()函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// 将参数列表中的参数发送给从服务器
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];
// 如果没有backlog且没有从节点服务器,直接返回
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
/* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
// 如果当前从节点使用的数据库不是目标的数据库,则要生成一个select命令
if (server.slaveseldb != dictid) {
robj *selectcmd;
// 0 <= id < 10 ,可以使用共享的select命令对象
if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
// 否则自行按照协议格式构建select命令对象
} else {
int dictid_len;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(OBJ_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
// 将select 命令添加到backlog中
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
// 发送给从服务器
listRewind(slaves,&li);
// 遍历所有的从服务器节点
while((ln = listNext(&li))) {
client *slave = ln->value;
// 从节点服务器状态为等待BGSAVE的开始,因此跳过回复,遍历下一个节点
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
// 添加select命令到当前从节点的回复中
addReply(slave,selectcmd);
}
// 释放临时对象
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
// 设置当前从节点使用的数据库ID
server.slaveseldb = dictid;
// 将命令写到backlog中
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3];
// 将参数个数构建成协议标准的字符串
// *<argc>\r\n
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
// 添加到backlog中
feedReplicationBacklog(aux,len+3);
// 遍历所有的参数
for (j = 0; j < argc; j++) {
// 返回参数对象的长度
long objlen = stringObjectLen(argv[j]);
// 构建成协议标准的字符串,并添加到backlog中
// $<len>\r\n<argv>\r\n
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
// 添加$<len>\r\n
feedReplicationBacklog(aux,len+3);
// 添加参数对象<argv>
feedReplicationBacklogWithObject(argv[j]);
// 添加\r\n
feedReplicationBacklog(aux+len+1,2);
}
}
// 将命令写到每一个从节点中
listRewind(server.slaves,&li);
// 遍历从节点链表
while((ln = listNext(&li))) {
client *slave = ln->value;
// 从节点服务器状态为等待BGSAVE的开始,因此跳过回复,遍历下一个节点
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
// 将命令写给正在等待初次SYNC的从节点(所以这些命令在输出缓冲区中排队,直到初始SYNC完成),或已经与主节点同步
/* Add the multi bulk length. */
// 添加回复的长度
addReplyMultiBulkLen(slave,argc);
// 将所有的参数列表添加到从节点的输出缓冲区
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}

和AOF持久化一样,再给从节点client写命令时,会将SELECT命令强制写入,以保证命令正确读到数据库中。

不仅写入了从节点client的输出缓冲区,而且还会将命令记录到主节点服务器的复制积压缓冲区server.repl_backlog中,这是为了网络闪断后进行部分重同步。

3. 部分重同步实现

刚才剖析完全量同步,但是没有考虑特殊的情况。如果在传输RDB文件的过程中,网络发生故障,主节点和从节点的连接中断,Redis会咋么做呢?

Redis 2.8 版本之前会在进行一次连接然后进行全量复制,但是这样效率非常地下,之后的版本都提供了部分重同步的实现。那么我们就分析一下部分重同步的实现过程。

部分重同步在复制的过程中,相当于标题2.8的发送PSYNC命令的部分,其他所有的部分都要进行,他只是主节点回复从节点的命令不同,回复+CONTINUE则执行部分重同步,回复+FULLRESYNC则执行全量同步。

3.1 心跳机制

主节点是如何发现和从节点连接中断?在主从节点建立连接后,他们之间都维护者长连接并彼此发送心跳命令。主从节点彼此都有心跳机制,各自模拟成对方的客户端进行通信。

  • 主节点默认每隔10秒发送PING命令,判断从节点的连接状态。
  • 文件配置项:repl-ping-salve-period,默认是10

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 首先,根据当前节点发送PING命令给从节点的频率发送PING命令
    // 如果当前节点是某以节点的 主节点 ,那么发送PING给从节点
    if ((replication_cron_loops % server.repl_ping_slave_period) == 0) {
    // 创建PING命令对象
    ping_argv[0] = createStringObject("PING",4);
    // 将PING发送给从服务器
    replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
    decrRefCount(ping_argv[0]);
    }
  • 从节点在主线程中每隔1秒发送REPLCONF ACK 命令,给主节点报告自己当前复制偏移量。

    1
    2
    3
    4
    // 定期发送ack给主节点,旧版本的Redis除外
    if (server.masterhost && server.master && !(server.master->flags & CLIENT_PRE_PSYNC))
    // 发送一个REPLCONF ACK命令给主节点去报告关于当前处理的offset。
    replicationSendAck();

在周期性函数replicationCron(),每次都要检查和主节点处于连接状态的从节点和主节点的交互时间是否超时,如果超时则会调用cancelReplicationHandshake()函数,取消和主节点的连接。等到下一个周期在和主节点重新建立连接,进行复制。

3.2 复制积压缓冲区(backlog)

复制积压缓冲区是一个大小为1M的循环队列。主节点在命令传播时,不仅会将命令发送给所有的从节点,还会将命令写入复制积压缓冲区中(具体请看标题2.10)。

也就是说,复制积压缓冲区最多可以备份1M大小的数据,如果主从节点断线时间过长,复制积压缓冲区的数据会被新数据覆盖,那么当从主从中断连接起,主节点接收到的数据超过1M大小,那么从节点就无法进行部分重同步,只能进行全量复制。

在标题2.8,介绍的syncCommand()命令中,调用masterTryPartialResynchronization()函数会进行尝试部分重同步,在我们之前分析的第一次全量同步时,该函数会执行失败,然后返回syncCommand()函数执行全量同步,而在进行恢复主从连接后,则会进行部分重同步,masterTryPartialResynchronization()函数代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// 该函数从主节点接收到部分重新同步请求的角度处理PSYNC命令
// 成功返回C_OK,否则返回C_ERR
int masterTryPartialResynchronization(client *c) {
long long psync_offset, psync_len;
char *master_runid = c->argv[1]->ptr; //主节点的运行ID
char buf[128];
int buflen;
// 主节点的运行ID是否和从节点执行PSYNC的参数提供的运行ID相同。
// 如果运行ID发生了改变,则主节点是一个不同的实例,那么就不能进行继续执行原有的复制进程
if (strcasecmp(master_runid, server.runid)) {
/* Run id "?" is used by slaves that want to force a full resync. */
// 如果从节点的运行ID是"?",表示想要强制进行一个全量同步
if (master_runid[0] != '?') {
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Runid mismatch (Client asked for runid '%s', my runid is '%s')",
master_runid, server.runid);
} else {
serverLog(LL_NOTICE,"Full resync requested by slave %s",
replicationGetSlaveName(c));
}
goto need_full_resync;
}
// 从参数对象中获取psync_offset
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
C_OK) goto need_full_resync;
// 如果psync_offset小于repl_backlog_off,说明backlog所备份的数据的已经太新了,有一些数据被覆盖,则需要进行全量复制
// 如果psync_offset大于(server.repl_backlog_off + server.repl_backlog_histlen),表示当前backlog的数据不够全,则需要进行全量复制
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with slave %s for lack of backlog (Slave request was: %lld).", replicationGetSlaveName(c), psync_offset);
if (psync_offset > server.master_repl_offset) {
serverLog(LL_WARNING,
"Warning: slave %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
}
goto need_full_resync;
}
// 执行到这里,则可以进行部分重同步
// 1. 设置client状态为从节点
// 2. 向从节点发送 +CONTINUE 表示接受 partial resync 被接受
// 3. 发送backlog的数据给从节点
// 设置client状态为从节点
c->flags |= CLIENT_SLAVE;
// 设置复制状态为在线,此时RDB文件传输完成,发送差异数据
c->replstate = SLAVE_STATE_ONLINE;
// 设置从节点收到ack的时间
c->repl_ack_time = server.unixtime;
// slave向master发送ack标志设置为0
c->repl_put_online_on_ack = 0;
// 将当前client加入到从节点链表中
listAddNodeTail(server.slaves,c);
// 向从节点发送 +CONTINUE
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return C_OK;
}
// 将backlog的数据发送从节点
psync_len = addReplyReplicationBacklog(c,psync_offset);
serverLog(LL_NOTICE,
"Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", replicationGetSlaveName(c), psync_len, psync_offset);
// 计算延迟值小于min-slaves-max-lag的从节点的个数
refreshGoodSlavesCount();
return C_OK; /* The caller can return, no full resync needed. */
need_full_resync:
return C_ERR;
}

如果可以进行部分重同步,主节点则会发送”+CONTINUE\r\n”作为从节点发送PSYNC回复(看标题2.8)。然后调用addReplyReplicationBacklog()函数,将backlog中的数据发送给从节点。于是就完成了部分重同步。

addReplyReplicationBacklog()函数所做的就是将backlog写到从节点的client的输出缓冲区中。

HeLei Blog

Redis源码剖析和注释(十二)--- Redis 复制(replicate)源码详细解析

发表于 2018-02-23 | 分类于 Redis

1. 复制介绍

分布式数据库为了获取更大的存储容量和更高的并发访问量,会将原来集中式数据库中的数据分散存储到多个通过网络连接的数据存储节点上。Redis为了解决单点数据库问题,会把数据复制多个副本部署到其他节点上,通过复制,实现Redis的高可用性,实现对数据的冗余备份,保证数据和服务的高度可靠性。

2. 复制的建立

建立复制的配置方式有三种。

  • 在redis.conf文件中配置slaveof 选项,然后指定该配置文件启动Redis生效。
  • 在redis-server启动命令后加上–slaveof 启动生效。
  • 直接使用 slaveof 命令在从节点执行生效。
    我们以最简单的一主一从模型,使用第2种方式建立复制。

  • 首先先开启主节点master实例,端口8888

    1
    redis-server --port 8888
  • 接着开启从节点slave实例,端口9999,并指定指定主节点。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    redis-server --port 9999 --slaveof 127.0.0.1 8888
    //命令行开启Redis服务器后,会打印如下日志信息,已经开启复制了
    Connecting to MASTER 127.0.0.1:8888
    MASTER <-> SLAVE sync started
    Non blocking connect for SYNC fired the event.
    Master replied to PING, replication can continue...
    Partial resynchronization not possible (no cached master)
    Full resync from master: 1aff09ecd70ca640e33083f8422018b29883b9d1:1
    MASTER <-> SLAVE sync: receiving 76 bytes from master
    MASTER <-> SLAVE sync: Flushing old data
    MASTER <-> SLAVE sync: Loading DB in memory
    MASTER <-> SLAVE sync: Finished with success
  • 开启一个client,连接上从节点服务器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    ➜ ~ redis-cli -p 9999
    127.0.0.1:9999> INFO replication
    # Replication
    role:slave //节点角色
    master_host:127.0.0.1 //主节点的IP
    master_port:8888 //主节点的端口
    master_link_status:up //与主节点的连接状态
    master_last_io_seconds_ago:0 //主节点最后与从节点的通信时间间隔,单位秒
    master_sync_in_progress:0 //从节点是否正在全量同步主节点的RDB文件
    slave_repl_offset:407 //复制偏移量
    slave_priority:100 //从节点的优先级
    slave_read_only:1 //从节点是否只读
    connected_slaves:0 //连接从节点的个数
    master_repl_offset:0 //当前从节点作为其他从节点的主节点时的复制偏移量
    //以下四种信息为通用的配置
    repl_backlog_active:0 //复制缓冲区的状态
    repl_backlog_size:1048576 //复制缓冲区的大小
    repl_backlog_first_byte_offset:0//复制缓冲区起始偏移量,标识当前缓冲区可用的范围
    repl_backlog_histlen:0 //标识复制缓冲区已存在的有效数据长度
    127.0.0.1:9999> KEYS * //由于主节点的键空间为空,所以从节点的键空间也为空。
    (empty list or set)

此时,我们查看主节点的INFO replication 信息

1
2
3
4
5
6
7
8
9
10
11
12
//通过INFO replication命令可以查看当前的复制信息
127.0.0.1:6380> INFO replication
# Replication
role:master //节点角色
connected_slaves:1 //连接从节点的个数
slave0:ip=127.0.0.1,port=9999,state=online,offset=631,lag=0 //连接从节点的信息
master_repl_offset:631 //主节点的偏移量
//以下四种信息为通用的配置
repl_backlog_active:1 //复制缓冲区的状态
repl_backlog_size:1048576 //复制缓冲区的大小
repl_backlog_first_byte_offset:2 //复制缓冲区起始偏移量,标识当前缓冲区可用的范围
repl_backlog_histlen:630 //由于主节点的键空间为空,所以从节点的键空间也为空。

到此,一主一从模型的复制就建立成功了。我们可以在主节点建立一些新的键,然后查看从节点的键空间的变化

1
2
3
4
5
//根据端口区别主从节点
127.0.0.1:8888> HSET hash_key hello world
(integer) 1
127.0.0.1:9999> KEYS * //从节点的键空间已经更新
1) "hash_key"

我们在查看从节点的INFO replication信息时,可以知道slave_read_only:1,从节点默认只能读不能写,因此执行写命令会得到如下回复:

1
2
127.0.0.1:9999> SET key value
(error) READONLY You can't write against a read only slave.

只读模式由 redis.conf 文件中的 slave-read-only 选项控制,也可以通过 CONFIG SET命令来开启或关闭这个模式。

我们在查看从节点的INFO replication信息时,还发现有connected_slaves:0选项 ,说明Redis的复制拓扑结构支持单层或多层复制关系,从节点还可以作为其他从节点的主节点进行复制。

根据拓扑关系可以分为三种:

  • 一主一从
    replicate01

  • 一主多从
    replicate01

  • 树型主从结构
    replicate01

3. 复制的断开

复制断开也是在从节点执行命令slaveof no one来断开于主节点的复制关系。例如,将刚才端口为9999的从节点断开复制:

1
2
3
4
5
6
7
8
9
127.0.0.1:9999> SLAVEOF no one
OK
//从节点服务器会打印如下日志
Connection with master lost.
Caching the disconnected master state.
Discarding previously cached master state.
MASTER MODE enabled (user request from 'id=3 addr=127.0.0.1:40218 fd=7 name= age=2218 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=slaveof')
//主节点服务器也会打印断开的信息
Connection with slave 127.0.0.1:9999 lost.

从节点服务器断开后,从节点会晋升为主节点。从日志中可以看到MASTER MODE enabled,也可以从INFO命令查看到

1
2
3
4
127.0.0.1:9999> INFO replication
# Replication
role:master //角色发生变化
......

4. min-slaves配置选项

Redis的min-slaves-to-write和min-slaves-max-lag两个选项可以防止主节点在不安全的情况下执行写命令。

1
2
min-slaves-to-write 3 //从节点数量少于3个,主节点拒绝执行写命令
min-slaves-max-lag 10 //3个从节点的延迟(lag)值,大于或等于10,主节点拒绝执行写命令

HeLei Blog

Redis源码剖析和注释(九)--- 字符串命令的实现(t_string)

发表于 2018-02-10 | 分类于 Redis

1. 字符串命令介绍

序号 命令及描述
1 SET key value:设置指定 key 的值
2 GET key: 获取指定 key 的值。
3 GETRANGE key start end: 返回 key 中字符串值的子字符
4 GETSET key value:将给定 key 的值设为 value ,并返回 key 的旧值(old value)。
5 GETBIT key offset:对 key 所储存的字符串值,获取指定偏移量上的位(bit)。
6 MGET key1 [key2..]:获取所有(一个或多个)给定 key 的值。
7 SETBIT key offset value:对 key 所储存的字符串值,设置或清除指定偏移量上的位(bit)。
8 SETEX key seconds value:将值 value 关联到 key ,并将 key 的过期时间设为 seconds (以秒为单位)。
9 SETNX key value:只有在 key 不存在时设置 key 的值。
10 SETRANGE key offset value:用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始。
11 STRLEN key:返回 key 所储存的字符串值的长度。
12 MSET key value [key value …]:同时设置一个或多个 key-value 对。
13 MSETNX key value [key value …]:同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在。
14 PSETEX key milliseconds value:这个命令和 SETEX 命令相似,但它以毫秒为单位设置 key 的生存时间,而不是像 SETEX 命令那样,以秒为单位。
15 INCR key:将 key 中储存的数字值增一。
16 INCRBY key increment将 key: 所储存的值加上给定的增量值(increment) 。
17 INCRBYFLOAT key increment:将 key 所储存的值加上给定的浮点增量值(increment) 。
18 DECR key:将 key 中储存的数字值减一。
19 DECRBY key decrementkey: 所储存的值减去给定的减量值(decrement) 。
20 APPEND key value:如果 key 已经存在并且是一个字符串, APPEND 命令将 value 追加到 key 原来的值的末尾。

2. 字符串命令的实现

字符串命令底层数据结构为 简单动态字符串SDS 。对于字符串命令,无论是命令本身还是参数,都是作为成一个对象对待的。关于redis的对象系统,请参考文章:redis对象系统源码剖析和注释。

在redis的对象系统中,字符串对象的底层实现类型有如下三种:

编码—encoding 对象—ptr
OBJ_ENCODING_RAW 简单动态字符串实现的字符串对象
OBJ_ENCODING_INT 整数值实现的字符串对象
OBJ_ENCODING_EMBSTR embstr编码的简单动态字符串实现的字符串对象

因此,一个字符串对象的结构定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
typedef struct redisObject {
//对象的数据类型,字符串对象应该为 OBJ_STRING
unsigned type:4;
//对象的编码类型,分别为OBJ_STRING、OBJ_ENCODING_INT或OBJ_ENCODING_EMBSTR
unsigned encoding:4;
//暂且不关心该成员
unsigned lru:LRU_BITS; /* lru time (relative to server.lruclock) */
//引用计数
int refcount;
//指向底层数据实现的指针
void *ptr;
} robj;

我们假设一个key的值为”Hello World” ,因此它的空间结构如图所示:
[url01]

3. 字符串命令源码注释

这里列出几个重要的命令。

3.1 SET 一类命令的最底层实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#define OBJ_SET_NO_FLAGS 0
#define OBJ_SET_NX (1<<0) /* Set if key not exists. */ //在key不存在的情况下才会设置
#define OBJ_SET_XX (1<<1) /* Set if key exists. */ //在key存在的情况下才会设置
#define OBJ_SET_EX (1<<2) /* Set if time in seconds is given */ //以秒(s)为单位设置键的key过期时间
#define OBJ_SET_PX (1<<3) /* Set if time in ms in given */ //以毫秒(ms)为单位设置键的key过期时间
//setGenericCommand()函数是以下命令: SET, SETEX, PSETEX, SETNX.的最底层实现
//flags 可以是NX或XX,由上面的宏提供
//expire 定义key的过期时间,格式由unit指定
//ok_reply和abort_reply保存着回复client的内容,NX和XX也会改变回复
//如果ok_reply为空,则使用 "+OK"
//如果abort_reply为空,则使用 "$-1"
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0; /* initialized to avoid any harmness warning */ //初始化,避免错误
//如果定义了key的过期时间
if (expire) {
//从expire对象中取出值,保存在milliseconds中,如果出错发送默认的信息给client
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
return;
// 如果过期时间小于等于0,则发送错误信息给client
if (milliseconds <= 0) {
addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
return;
}
//如果unit的单位是秒,则需要转换为毫秒保存
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
//lookupKeyWrite函数是为执行写操作而取出key的值对象
//如果设置了NX(不存在),并且在数据库中 找到 该key,或者
//设置了XX(存在),并且在数据库中 没有找到 该key
//回复abort_reply给client
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
//在当前db设置键为key的值为val
setKey(c->db,key,val);
//设置数据库为脏(dirty),服务器每次修改一个key后,都会对脏键(dirty)增1
server.dirty++;
//设置key的过期时间
//mstime()返回毫秒为单位的格林威治时间
if (expire) setExpire(c->db,key,mstime()+milliseconds);
//发送"set"事件的通知,用于发布订阅模式,通知客户端接受发生的事件
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
//发送"expire"事件通知
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
//设置成功,则向客户端发送ok_reply
addReply(c, ok_reply ? ok_reply : shared.ok);
}

3.2 GET 一类命令的最底层实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//GET 命令的底层实现
int getGenericCommand(client *c) {
robj *o;
//lookupKeyReadOrReply函数是为执行读操作而返回key的值对象,找到返回该对象,找不到会发送信息给client
//如果key不存在直接,返回0表示GET命令执行成功
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
return C_OK;
//如果key的值的编码类型不是字符串对象
if (o->type != OBJ_STRING) {
addReply(c,shared.wrongtypeerr); //返回类型错误的信息给client,返回-1表示GET命令执行失败
return C_ERR;
} else {
addReplyBulk(c,o); //返回之前找到的对象作为回复给client,返回0表示GET命令执行成功
return C_OK;
}
}

3.3 DECR 和 INCR 底层实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// DECR key
// INCR key
//INCR和DECR命令的底层实现
void incrDecrCommand(client *c, long long incr) {
long long value, oldvalue;
robj *o, *new;
o = lookupKeyWrite(c->db,c->argv[1]); //以写操作获取key的value对象
//找到了value对象但是value对象不是字符串类型,直接返回
if (o != NULL && checkType(c,o,OBJ_STRING)) return;
//将字符串类型的value转换为longlong类型保存在value中
if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return;
oldvalue = value; //备份旧的value
//如果incr超出longlong类型所能表示的范围,发送错误信息
if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) ||
(incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {
addReplyError(c,"increment or decrement would overflow");
return;
}
value += incr; //计算新的value值
//value对象目前非共享,编码为整型类型,且新value值不在共享范围,且value处于long类型所表示的范围内
if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
(value < 0 || value >= OBJ_SHARED_INTEGERS) &&
value >= LONG_MIN && value <= LONG_MAX)
{
new = o;
o->ptr = (void*)((long)value); //设置vlaue对象的值
} else {
//当不满足以上任意条件,则新创建一个字符串对象
new = createStringObjectFromLongLong(value);
//如果之前的value对象存在
if (o) {
dbOverwrite(c->db,c->argv[1],new); //用new对象去重写key的值
} else {
dbAdd(c->db,c->argv[1],new); //如果之前的value不存在,将key和new组成新的key-value对
}
}
signalModifiedKey(c->db,c->argv[1]); //当数据库的键被改动,则会调用该函数发送信号
//发送"incrby"事件通知
notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
//设置脏键
server.dirty++;
//回复信息给client
addReply(c,shared.colon);
addReply(c,new);
addReply(c,shared.crlf);
}

.4 APPEND 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// APPEND key value
// APPEND命令的实现
void appendCommand(client *c) {
size_t totlen;
robj *o, *append;
o = lookupKeyWrite(c->db,c->argv[1]); //以写操作获取key的value对象
//如果没有获取到vlaue,则要创建一个
if (o == NULL) {
/* Create the key */
c->argv[2] = tryObjectEncoding(c->argv[2]); //对参数value进行优化编码
dbAdd(c->db,c->argv[1],c->argv[2]); //将key和value组成新的key-value对
incrRefCount(c->argv[2]); //增加value的引用计数
totlen = stringObjectLen(c->argv[2]); //返回vlaue的长度
} else { //获取到value
/* Key exists, check type */
if (checkType(c,o,OBJ_STRING)) //如果value不是字符串类型的对象直接返回
return;
/* "append" is an argument, so always an sds */
//获得追加的值对象
append = c->argv[2];
//计算追加后的长度
totlen = stringObjectLen(o)+sdslen(append->ptr);
//如果追加后的长度超出范围,则返回
if (checkStringLength(c,totlen) != C_OK)
return;
/* Append the value */
//因为要根据value修改key的值,因此如果key原来的值是共享的,需要解除共享,新创建一个值对象与key组对
o = dbUnshareStringValue(c->db,c->argv[1],o);
//将vlaue对象的值后面追加上append的值
o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr));
//计算出追加后值的长度
totlen = sdslen(o->ptr);
}
signalModifiedKey(c->db,c->argv[1]);//当数据库的键被改动,则会调用该函数发送信号
//发送"append"事件通知
notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id);
//设置脏键
server.dirty++;
//发送追加后value的长度给client
addReplyLongLong(c,totlen);
}
HeLei Blog

Redis源码剖析和注释(二十一)--- 单机服务器实现

发表于 2018-02-10 | 分类于 Redis

1. Redis 服务器

Redis服务器负责与客户端建立网络连接,处理发送的命令请求,在数据库中保存客户端执行命令所产生的数据,并且通过一系列资源管理措施来维持服务器自身的正常运转。本次主要剖析server.c文件,本文主要介绍Redis服务器的一下几个实现:

  • 命令的执行过程
  • Redis服务器的周期性任务
  • maxmemory的策略
  • Redis服务器的main函数

2. 命令的执行过程

Redis一个命令的完整执行过程如下:

  • 客户端发送命令请求
  • 服务器接收命令请求
  • 服务器执行命令请求
  • 将回复发送给客户端
    关于命令接收与命令回复,在Redis 网络连接库剖析一文已经详细剖析过,本篇主要针对第三步,也就是服务器执行命令的过程进行剖析。

服务器在接收到命令后,会将命令以对象的形式保存在服务器client的参数列表robj **argv中,因此服务器执行命令请求时,服务器已经读入了一套命令参数保存在参数列表中。执行命令的过程对应的函数是processCommand(),源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
// 如果client没有被关闭则返回C_OK,调用者可以继续执行其他的操作,否则返回C_ERR,表示client被销毁
int processCommand(client *c) {
// 如果是 quit 命令,则单独处理
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY; //设置client的状态为回复后立即关闭,返回C_ERR
return C_ERR;
}
// 从数据库的字典中查找该命令
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
// 不存在的命令
if (!c->cmd) {
flagTransaction(c); //如果是事务状态的命令,则设置事务为失败
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return C_OK;
// 参数数量不匹配
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
flagTransaction(c); //如果是事务状态的命令,则设置事务为失败
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
/* Check if the user is authenticated */
// 如果服务器设置了密码,但是没有认证成功
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c); //如果是事务状态的命令,则设置事务为失败
addReply(c,shared.noautherr);
return C_OK;
}
// 如果开启了集群模式,则执行集群的重定向操作,下面的两种情况例外:
/*
1. 命令的发送是主节点服务器
2. 命令没有key
*/
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
// 从集群中返回一个能够执行命令的节点
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
// 返回的节点不合格
if (n == NULL || n != server.cluster->myself) {
// 如果是执行事务的命令,则取消事务
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
// 将事务状态设置为失败
flagTransaction(c);
}
// 执行client的重定向操作
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
// 如果服务器有最大内存的限制
if (server.maxmemory) {
// 按需释放一部分内存
int retval = freeMemoryIfNeeded();
// freeMemoryIfNeeded()函数之后需要冲洗从节点的输出缓冲区,这可能导致被释放的从节点是一个活跃的client
// 如果当前的client被释放,返回C_ERR
if (server.current_client == NULL) return C_ERR;
// 如果命令会耗费大量的内存但是释放内存失败
if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
// 将事务状态设置为失败
flagTransaction(c);
addReply(c, shared.oomerr);
return C_OK;
}
}
// 如果 BGSAVE 命令执行错误而且服务器是一个主节点,那么不接受写命令
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == C_ERR) ||
server.aof_last_write_status == C_ERR) &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand))
{
// 将事务状态设置为失败
flagTransaction(c);
// 如果上一次执行AOF成功回复BGSAVE错误回复
if (server.aof_last_write_status == C_OK)
addReply(c, shared.bgsaveerr);
else
addReplySds(c,
sdscatprintf(sdsempty(),
"-MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno)));
return C_OK;
}
// 如果没有足够的良好的从节点而且用户配置了 min-slaves-to-write,那么不接受写命令
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
// 将事务状态设置为失败
flagTransaction(c);
addReply(c, shared.noreplicaserr);
return C_OK;
}
// 如果这是一个只读的从节点服务器,则不接受写命令
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE)
{
addReply(c, shared.roslaveerr);
return C_OK;
}
// 如果处于发布订阅模式,但是执行的不是发布订阅命令,返回
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}
// 如果是从节点且和主节点断开了连接,不允许从服务器带有过期数据,返回
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE))
{
flagTransaction(c);
addReply(c, shared.masterdownerr);
return C_OK;
}
// 如果服务器处于载入状态,如果命令不是CMD_LOADING标识,则不执行,返回
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
addReply(c, shared.loadingerr);
return C_OK;
}
// 如果lua脚本超时,限制执行一部分命令,如shutdown、scriptCommand
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
return C_OK;
}
// 执行命令
// client处于事务环境中,但是执行命令不是exec、discard、multi和watch
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 除了上述的四个命令,其他的命令添加到事务队列中
queueMultiCommand(c);
addReply(c,shared.queued);
// 执行普通的命令
} else {
call(c,CMD_CALL_FULL);
// 保存写全局的复制偏移量
c->woff = server.master_repl_offset;
// 如果因为BLPOP而阻塞的命令已经准备好,则处理client的阻塞状态
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return C_OK;
}

我们总结出执行命令的大致过程:

  • 查找命令。对应的代码是:c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
  • 执行命令前的准备。对应这些判断语句。
  • 执行命令。对应代码是:call(c,CMD_CALL_FULL);

我们就大致就这三个过程详细解释。

2.1 查找命令

lookupCommand()函数是对dictFetchValue(server.commands, name);的封装。而这个函数的意思是:从server.commands字典中查找name命令。这个保存命令表的字典,键是命令的名称,值是命令表的地址。因此我们介绍服务器初始化时的一个操作,就是创建一张命令表。命令表代码简化表示如下:

1
2
3
4
5
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
......
};

我们只展示了命令表的两条,可以通过COMMAND COUNT命令查看命令的个数。虽然只有两条,但是可以说明问题。

首先命令表是就是一个数组,数组的每个成员都是一个struct redisCommand结构体,对每个数组成员都进行了初始化。我们一次对每个值进行分析:以GET命令为例子。

  • char *name:命令的名字。对应 “get”。
  • redisCommandProc *proc:命令实现的函数。对应 getCommand。
  • int arity:参数个数,-N表示大于等于N。对应2。
  • char *sflags:命令的属性,用以下字符作为标识。对应”rF”。
    • w:写入命令,会修改数据库。
    • r:读取命令,不会修改数据库。
    • m:一旦执行会增加内存使用,如果内存短缺则不被允许执行。
    • a:管理员命令,例如:SAVE or SHUTDOWN。
    • p:发布订阅有关的命令。
    • f:强制进行复制的命令,无视服务器的脏键。
    • s:不能在脚本中执行的命令。
    • R:随机命令。相同的键有相同的参数,在相同的数据库中,可能会有不同的结果。
    • S:如果在脚本中调用,那么会对这个命令的输出进行一次排序。
    • l:当载入数据库时,允许执行该命令。
    • t:从节点服务器持有过期数据时,允许执行的命令。
    • M:不能在 MONITOR 下自动传播的命令。
    • k:为该命令执行一个隐式的ASKING,所以在集群模式下,如果槽被标记为’importing’,那这个命令会被接收。
      *F:快速执行的命令。时间复杂度为O(1) or O(log(N))的命令只要内核调度为Redis分配时间片,那么就不应该在执行时被延迟。
  • int flags:sflags的二进制标识形式,可以通过位运算进行组合。对应0。
  • redisGetKeysProc *getkeys_proc:从命令中获取键的参数,是一个可选的功能,一般用于三个字段不够执行键的参数的情况。对应NULL。
  • int firstkey:第一个参数是 key。对应1。
  • int lastkey:最后一个参数是 key。对应1。
  • int keystep:从第一个 key 到最后一个 key 的步长。MSET 的步长是 2 因为:key,val,key,val,…。对应1。
  • long long microseconds:记录执行命令的耗费总时长。对应0。
  • long long calls:记录命令被执行的总次数。对应0。
    当从命令表中找到命令后,会将找到的命令的地址,返回给struct redisCommand cmd, lastcmd;这两个指针保存起来。到此查找命令的操作就完成。

2.2 执行命令前的准备

此时,命令已经在命令表中查找到,并且保存在了对应的指针中。但是真正执行前,还进行了许多的情况的判断。我们简单列举几种。

  • 首先就是判断命令的参数是否匹配。
  • 检查服务器的认证是否通过。
  • 集群模式下的判断。
  • 服务器最大内存限制是否通过。
  • 某些情况下,不接受写命令。
  • 发布订阅模式。
  • 是否是lua脚本中的命令。
    等等……
    所以,命令执行的过程还是很复杂的,简单总结一句:命令不易,何况人生。

2.3 执行命令
执行命令调用了call(c,CMD_CALL_FULL)函数,该函数是执行命令的核心。但是不用想,这个函数一定是对回调函数c->cmd->proc(c)的封装,因为proc指向命令的实现函数。我们贴出该函数的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
void call(client *c, int flags) {
long long dirty, start, duration;
int client_old_flags = c->flags; //备份client的flags
// 将命令发送给 MONITOR
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
// 清除一些需要按照命令需求设置的标志,以防干扰
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// 初始化Redis操作数组,用来追加命令的传播
redisOpArrayInit(&server.also_propagate);
/* Call the command. */
// 备份脏键数
dirty = server.dirty;
// 获取执行命令的开始时间
start = ustime();
// 执行命令
c->cmd->proc(c);
// 命令的执行时间
duration = ustime()-start;
// 命令修改的键的个数
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
// 当执行 EVAL 命令时正在加载AOF,而且不希望Lua调用的命令进入slowlog或填充统计信息
if (server.loading && c->flags & CLIENT_LUA)
flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS); //取消慢查询和记录统计信息的标志
// 如果函数调用者是Lua脚本,且命令的flags或客户端的flags指定了强制传播,我们要强制EVAL调用者传播脚本
if (c->flags & CLIENT_LUA && server.lua_caller) {
// 如果指定了强制将命令传播到从节点
if (c->flags & CLIENT_FORCE_REPL)
server.lua_caller->flags |= CLIENT_FORCE_REPL; //强制执行lua脚本的client要传播命令到从节点
// 如果指定了强制将节点传播到AOF中
if (c->flags & CLIENT_FORCE_AOF)
server.lua_caller->flags |= CLIENT_FORCE_AOF; //强制执行lua脚本的client要传播命令到AOF文件
}
// 命令的flags指定了慢查询标志,要将总的统计信息推入慢查询日志中
if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
// 记录将延迟事件和延迟时间关联到延迟诊断的字典中
latencyAddSampleIfNeeded(latency_event,duration/1000);
// 将总的统计信息推入慢查询日志中
slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
}
// 命令的flags指定了CMD_CALL_STATS,更新命令的统计信息
if (flags & CMD_CALL_STATS) {
c->lastcmd->microseconds += duration;
c->lastcmd->calls++;
}
// 如果client设置了强制传播的标志或修改了数据集,则将命令发送给从节点服务器或追加到AOF中
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
// 保存传播的标志,初始化为空
int propagate_flags = PROPAGATE_NONE;
// 如果命令修改了数据库中的键,则要传播到AOF和从节点中
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
// 如果client设置了强制AOF和复制的标志,则设置传播的标志
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
// 如果client的flags设置了CLIENT_PREVENT_REPL/AOF_PROP,表示阻止命令的传播到从节点或AOF,则取消传播对应标志
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
// 如果至少设置了一种传播,则执行相应传播命令操作
if (propagate_flags != PROPAGATE_NONE)
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
// 清除一些需要按照命令需求设置的标志,以防干扰
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// 恢复client原始的flags
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// 传播追加在Redis操作数组中的命令
if (server.also_propagate.numops) {
int j;
redisOp *rop;
// 如果命令的flags设置传播的标志
if (flags & CMD_CALL_PROPAGATE) {
// 遍历所有的命令
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
int target = rop->target;
/* Whatever the command wish is, we honor the call() flags. */
// 执行相应传播命令操作
if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
}
}
// 释放Redis操作数组
redisOpArrayFree(&server.also_propagate);
}
// 命令执行的次数加1
server.stat_numcommands++;
}

执行命令时,可以指定一个flags。这个flags是用于执行完命令之后的一些后续工作。我们说明这些flags的含义:

1
2
3
4
5
6
7
CMD_CALL_NONE:没有指定flags
CMD_CALL_SLOWLOG:检查命令的执行速度,如果需要记录在慢查询日志中
CMD_CALL_STATS:记录命令的统计信息
CMD_CALL_PROPAGATE_AOF:如果client设置了强制传播的标志或修改了数据集,则将命令追加到AOF文件中
CMD_CALL_PROPAGATE_REPL:如果client设置了强制传播的标志或修改了数据集,则将命令发送给从节点服务器中
CMD_CALL_PROPAGATE:如果client设置了强制传播的标志或修改了数据集,则将命令发送给从节点服务器或追加到AOF中
CMD_CALL_FULL:包含以上所有的含义

执行命令c->cmd->proc(c)就相当于执行了命令实现的函数,然后会在执行完成后,由这些函数产生相应的命令回复,根据回复的大小,会将回复保存在输出缓冲区buf或回复链表repl中。然后服务器会调用writeToClient()函数来将回复写到fd中。详细请看:Redis 网络连接库剖析。

至此,一条命令的执行过程就很清楚明了了。

3. Redis服务器的周期性任务

我们曾经在Redis 事件处理实现一文中说到,Redis的事件分为文件事件(file event)和时间事件(time event)。时间事件虽然是晚于文件事件执行,但是会每隔100ms都会执行一次。话不多说直接上代码:Redis 单机服务器实现源码注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// 使用一个宏定义:run_with_period(milliseconds) { .... },实现一部分代码有次数限制的被执行
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);
// 如果设置了看门狗,则在过期时间内,递达一个 SIGALRM 信号
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
// 设置服务器的时间缓存
updateCachedTime();
// 更新服务器的一些统计值
run_with_period(100) {
// 命令执行的次数
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
// 从网络读到的字节数
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
server.stat_net_input_bytes);
// 已经写到网络的字节数
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
server.stat_net_output_bytes);
}
// 服务器的LRU时间表示位数为24位,因此最长表示2^24秒,大约1.5年,只要在1.5年内,该对象被访问,那么就不会出现对象的LRU时间比服务器的时钟还要年轻的现象
// LRU_CLOCK_RESOLUTION 可以改变LRU时间的精度
// 获取服务器的LRU时钟
server.lruclock = getLRUClock();
// 更新服务器的最大内存使用量峰值
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();
// 更新常驻内存的大小
server.resident_set_size = zmalloc_get_rss();
// 安全的关闭服务器
if (server.shutdown_asap) {
// 关闭服务器前的准备动作,成功则关闭服务器
if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
// 失败则打印日志
serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
// 撤销关闭服务器标志
server.shutdown_asap = 0;
}
// 打印数据库的信息到日志中
run_with_period(5000) {
// 遍历数据库
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
// 获取当前数据库的键值对字典的槽位数,键值对字典已使用的数量,过期键字典已使用的数量
size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
// 打印到日志中
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(server.dict); */
}
}
}
// 如果服务器不在哨兵模式下,那么周期性打印一些连接client的信息到日志中
if (!server.sentinel_mode) {
run_with_period(5000) {
serverLog(LL_VERBOSE,
"%lu clients connected (%lu slaves), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
}
}
// 执行client的周期性任务
clientsCron();
// 执行数据库的周期性任务
databasesCron();
// 如果当前没有正在进行RDB和AOF持久化操作,且AOF重写操作被提上了日程,那么在后台执行AOF的重写操作
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
// 如果正在进行RDB或AOF重写等操作,那么等待接收子进程发来的信息
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
int statloc;
pid_t pid;
// 接收所有子进程发送的信号,非阻塞
if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
// 获取退出码
int exitcode = WEXITSTATUS(statloc);
int bysignal = 0;
// 判断子进程是否因为信号而终止,是的话,取得子进程因信号而中止的信号码
if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
// 子进程没有退出,还在进行RDB或AOF重写等操作
if (pid == -1) {
// 打印日志
serverLog(LL_WARNING,"wait3() returned an error: %s. "
"rdb_child_pid = %d, aof_child_pid = %d",
strerror(errno),
(int) server.rdb_child_pid,
(int) server.aof_child_pid);
// RDB持久化完成
} else if (pid == server.rdb_child_pid) {
// 将RDB文件写入磁盘或网络中
backgroundSaveDoneHandler(exitcode,bysignal);
// AOF持久化完成
} else if (pid == server.aof_child_pid) {
// 将重写缓冲区的命令追加AOF文件中,且进行同步操作
backgroundRewriteDoneHandler(exitcode,bysignal);
// 其他子进程,打印日志
} else {
if (!ldbRemoveChild(pid)) {
serverLog(LL_WARNING,
"Warning, detected child with unmatched pid: %ld",
(long)pid);
}
}
// 更新能否resize哈希的策略
updateDictResizePolicy();
}
// 没有正在进行RDB或AOF重写等操作,那么检查是否需要执行
} else {
// 遍历save命令的参数数组
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
// 数据库的键被修改的次数大于SAVE命令参数指定的修改次数,且已经过了SAVE命令参数指定的秒数
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
// 进行 BGSAVE 操作
rdbSaveBackground(server.rdb_filename);
break;
}
}
// 是否触发AOF重写操作
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
// 上一次重写后的大小
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
// AOF文件增长的百分比
long long growth = (server.aof_current_size*100/base) - 100;
// 大于设置的百分比100则进行AOF后台重写
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
// 将AOF缓存冲洗到磁盘中
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
// 当AOF重写操作,同样将重写缓冲区的数据刷新到AOF文件中
run_with_period(1000) {
if (server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
}
// 释放被设置为异步释放的client
freeClientsInAsyncFreeQueue();
// 解除client的暂停状态
clientsArePaused(); /* Don't check return value, just use the side effect. */
// 周期性执行复制的任务
run_with_period(1000) replicationCron();
/* Run the Redis Cluster cron. */
// 周期性执行集群任务
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
//周期性执行哨兵任务
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
// 清理过期的被缓存的sockets连接
run_with_period(1000) {
migrateCloseTimedoutSockets();
}
// 如果 BGSAVE 被提上过日程,那么进行BGSAVE操作,因为AOF重写操作在更新
// 注意:此代码必须在上面的replicationCron()调用之后,确保在重构此文件以保持此顺序时。 这是有用的,因为我们希望优先考虑RDB节省的复制
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.rdb_bgsave_scheduled &&
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
// 更新执行BGSAVE,成功则清除rdb_bgsave_scheduled标志
if (rdbSaveBackground(server.rdb_filename) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
// 周期loop计数器加1
server.cronloops++;
// 返回周期,默认为100ms
return 1000/server.hz;
}

我们也是大致总结列出部分:

  • 主动删除过期的键(也可以在读数据库时被动删除)
  • 喂看门狗 watchdog
  • 更新一些统计值
  • 渐进式rehash
  • 触发 BGSAVE / AOF 的重写操作,并处理子进程的中断
  • 不同状态的client的超时
  • 复制重连
    等……
    我们重点看两个函数,一个是关于客户端资源管理的clientsCron(),一个是关于数据库资源管理的databasesCron()。

3.1客户端资源管理

服务器要定时检查client是否与服务器有交互,如果超过了设置的限制时间,则要释放client所占用的资源。具体的函数是clientsCronHandleTimeout(),它被clientsCron()函数所调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 检查超时,如果client中断超时返回非零值,函数获取当前时间作为参数因为他被一个循环中调用多次。所以调用gettimeofday()为每一次迭代都是昂贵的,而没有任何实际的效益
// client被关闭则返回1,没有关闭返回0
int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
// 当前时间,单位秒
time_t now = now_ms/1000;
// 当前时间 - client上一次和服务器交互的时间 如果大于 服务器中设置client超过的最大时间
// 不检查这四类client的超时时间:slaves从节点服务器、masters主节点服务器、BLPOP被阻塞的client、订阅状态的client
if (server.maxidletime &&
!(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */
!(c->flags & CLIENT_MASTER) && /* no timeout for masters */
!(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
!(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */
(now - c->lastinteraction > server.maxidletime))
{
serverLog(LL_VERBOSE,"Closing idle client");
freeClient(c);
return 1;
// 如果client处于BLPOP被阻塞
} else if (c->flags & CLIENT_BLOCKED) {
// 如果阻塞的client的超时时间已经到达
if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
// 回复client一个空回复
replyToBlockedClientTimedOut(c);
// 接触client的阻塞状态
unblockClient(c);
// 如果服务器处于集群模式
} else if (server.cluster_enabled) {
// 重定向client的阻塞到其他的服务器
if (clusterRedirectBlockedClientIfNeeded(c))
// 解除阻塞
unblockClient(c);
}
}
return 0;
}

3.2 数据库资源管理

服务器要定时检查数据库的输入缓冲区是否可以resize,以节省内存资源。而resize输入缓冲区的两个条件:

  • 输入缓冲区的大小大于32K以及超过缓冲区的峰值的2倍。
  • client超过时间大于2秒,且输入缓冲区的大小超过1k
    实现的函数是clientsCronResizeQueryBuffer(),被databasesCron()函数所调用。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // resize客户端的输入缓冲区
    int clientsCronResizeQueryBuffer(client *c) {
    // 获取输入缓冲区的大小
    size_t querybuf_size = sdsAllocSize(c->querybuf);
    // 计算服务器对于client的空转时间,也就是client的超时时间
    time_t idletime = server.unixtime - c->lastinteraction;
    // resize输入缓冲区的两个条件:
    // 1. 输入缓冲区的大小大于32K以及超过缓冲区的峰值的2倍
    // 2. client超过时间大于2秒,且输入缓冲区的大小超过1k
    if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
    (querybuf_size/(c->querybuf_peak+1)) > 2) ||
    (querybuf_size > 1024 && idletime > 2))
    {
    // 只有输入缓冲区的未使用大小超过1k,则会释放未使用的空间
    if (sdsavail(c->querybuf) > 1024) {
    c->querybuf = sdsRemoveFreeSpace(c->querybuf);
    }
    }
    // 清空输入缓冲区的峰值
    c->querybuf_peak = 0;
    return 0;
    }

4. maxmemory的策略

Redis 服务器对内存使用会有一个server.maxmemory的限制,如果超过这个限制,就要通过删除一些键空间来释放一些内存,具体函数对应freeMemoryIfNeeded()。

释放内存时,可以指定不同的策略。策略保存在maxmemory_policy中,他可以指定以下的几个值:

1
2
3
4
5
6
#define MAXMEMORY_VOLATILE_LRU 0
#define MAXMEMORY_VOLATILE_TTL 1
#define MAXMEMORY_VOLATILE_RANDOM 2
#define MAXMEMORY_ALLKEYS_LRU 3
#define MAXMEMORY_ALLKEYS_RANDOM 4
#define MAXMEMORY_NO_EVICTION 5

可以看出主要分为三种,

  • LRU:优先删除最近最少使用的键。
  • TTL:优先删除生存时间最短的键。
  • RANDOM:随机删除。
    而ALLKEYS和VOLATILE的不同之处就是要确定是从数据库的键值对字典还是过期键字典中删除。

了解了以上这些,我们贴出代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// 按需释放内存空间
int freeMemoryIfNeeded(void) {
size_t mem_used, mem_tofree, mem_freed;
int slaves = listLength(server.slaves);
mstime_t latency, eviction_latency;
// 计算出服务器总的内存使用量,但是有两部分要减去
/*
1、从节点的输出缓冲区
2、AOF缓冲区
*/
mem_used = zmalloc_used_memory();
// 存在从节点
if (slaves) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
// 遍历从节点链表
while((ln = listNext(&li))) {
client *slave = listNodeValue(ln);
// 获取当前从节点的输出缓冲区的大小,不包含静态的固定回复缓冲区,因为他总被分配
unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
// 减去当前从节点的输出缓冲区的大小
if (obuf_bytes > mem_used)
mem_used = 0;
else
mem_used -= obuf_bytes;
}
}
// 如果开启了AOF操作
if (server.aof_state != AOF_OFF) {
// 减去AOF缓冲区的大小
mem_used -= sdslen(server.aof_buf);
// 减去AOF重写缓冲区的大小
mem_used -= aofRewriteBufferSize();
}
// 如果没有超过服务器设置的最大内存限制,则返回C_OK
if (mem_used <= server.maxmemory) return C_OK;
// 如果内存回收策略为不回收,则返回C_ERR
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
return C_ERR; /* We need to free memory, but policy forbids. */
// 计算需要回收的大小
mem_tofree = mem_used - server.maxmemory;
// 已回收的大小
mem_freed = 0;
// 设置回收延迟检测开始的时间
latencyStartMonitor(latency);
// 循环回收,直到到达需要回收大小
while (mem_freed < mem_tofree) {
int j, k, keys_freed = 0;
// 遍历所有的数据库
for (j = 0; j < server.dbnum; j++) {
long bestval = 0; /* just to prevent warning */
sds bestkey = NULL;
dictEntry *de;
redisDb *db = server.db+j;
dict *dict;
// 如果回收策略有ALLKEYS_LRU或RANDOM,从键值对字典中选择回收
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
{
// 则从键值对字典中选择回收的键。选择样品字典
dict = server.db[j].dict;
} else {
// 否则从过期键字典中选择回收的键。选择样品字典
dict = server.db[j].expires;
}
if (dictSize(dict) == 0) continue; //跳过空字典
/* volatile-random and allkeys-random policy */
// 如果回收策略有 ALLKEYS_RANDOM 或 VOLATILE_RANDOM,则是随机挑选
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
{
// 随机返回一个key
de = dictGetRandomKey(dict);
bestkey = dictGetKey(de);
}
/* volatile-lru and allkeys-lru policy */
// 如果回收策略有 ALLKEYS_LRU 或 VOLATILE_LRU,则使用LRU策略
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_LRU ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_LRU)
{
// 回收池
struct evictionPoolEntry *pool = db->eviction_pool;
while(bestkey == NULL) {
// evictionPoolPopulate()用于在每次我们想要过期一个键的时候,用几个节点填充evictionPool。 空闲时间小于当前key的之一的key被添加。 如果有free的节点,则始终添加key。 我们按升序插入key,所以空闲时间越短的键在左边,右边的空闲时间越长。
// 从样品字典dict中随机选择样品
evictionPoolPopulate(dict, db->dict, db->eviction_pool);
// 从空转时间最长的开始遍历
for (k = MAXMEMORY_EVICTION_POOL_SIZE-1; k >= 0; k--) {
// 跳过空位置
if (pool[k].key == NULL) continue;
// 从样品字典dict中查找当前key
de = dictFind(dict,pool[k].key);
// 从收回池中删除
sdsfree(pool[k].key);
// 释放位置
memmove(pool+k,pool+k+1,
sizeof(pool[0])*(MAXMEMORY_EVICTION_POOL_SIZE-k-1));
// 重置key和空转时间
pool[MAXMEMORY_EVICTION_POOL_SIZE-1].key = NULL;
pool[MAXMEMORY_EVICTION_POOL_SIZE-1].idle = 0;
// 如果从样品字典中可以找到,则保存键
if (de) {
bestkey = dictGetKey(de);
break;
// 没找到,则继续找下一个样品空间所保存的键
} else {
/* Ghost... */
continue;
}
}
// 如果当前选出的所有的样品都没找到,则重新选择一批样品,知道找到一个可以释放的键
}
}
/* volatile-ttl */
// 如果回收策略有 VOLATILE_TTL,则选择生存时间最短的键
else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
// 抽样个数为maxmemory_samples个
for (k = 0; k < server.maxmemory_samples; k++) {
sds thiskey;
long thisval;
// 返回一个键,获取他的生存时间
de = dictGetRandomKey(dict);
thiskey = dictGetKey(de);
thisval = (long) dictGetVal(de);
// 如果当前键的生存时间更短,则保存
if (bestkey == NULL || thisval < bestval) {
bestkey = thiskey;
bestval = thisval;
}
}
}
/* Finally remove the selected key. */
// 删除所有被选择的键
if (bestkey) {
long long delta;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
// 当一个键在主节点中过期时,主节点会发送del命令给从节点和AOF文件
propagateExpire(db,keyobj);
// 单独计算dbDelete()所释放的空间大小, 在AOF和复制链接中传播DEL的内存实际上大于我们释放的key的内存
// 但是无法解释,窦泽不会退出循环
// AOF和输出缓冲区的内存最终被释放,所以我们只关心键空间使用的内存
delta = (long long) zmalloc_used_memory();
// 设置删除key对象的开始时间
latencyStartMonitor(eviction_latency);
dbDelete(db,keyobj);
// 保存删除key对象时间
latencyEndMonitor(eviction_latency);
// 添加到延迟诊断字典中
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
// 删除嵌套的延迟事件
latencyRemoveNestedEvent(latency,eviction_latency);
// 计算删除这个键的大小
delta -= (long long) zmalloc_used_memory();
// 更新内存释放量
mem_freed += delta;
// 服务器总的回收键的个数计数器加1
server.stat_evictedkeys++;
// 事件通知
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
// 释放键对象
decrRefCount(keyobj);
// 释放键的个数加1
keys_freed++;
// 如果有从节点,则刷新所有的输出缓冲区数据
if (slaves) flushSlavesOutputBuffers();
}
}
// 如果所有数据库都没有释放键,返回C_ERR
if (!keys_freed) {
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
return C_ERR; /* nothing to free... */
}
}
// 计算回收延迟的时间
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
return C_OK;
}

5. Redis服务器的main函数

Redis 服务器的main()主要执行了一下操作:

  • 初始化服务器状态
  • 载入服务器的配置
  • 初始化服务器数据结构
  • 载入持久化文件还原数据库状态
  • 执行事件循环
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    int main(int argc, char **argv) {
    struct timeval tv;
    int j;
    #ifdef INIT_SETPROCTITLE_REPLACEMENT
    spt_init(argc, argv);
    #endif
    // 本函数用来配置地域的信息,设置当前程序使用的本地化信息,LC_COLLATE 配置字符串比较
    setlocale(LC_COLLATE,"");
    // 设置线程安全
    zmalloc_enable_thread_safeness();
    // 设置内存溢出的处理函数
    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
    // 初始化随机数发生器
    srand(time(NULL)^getpid());
    // 保存当前信息
    gettimeofday(&tv,NULL);
    // 设置哈希函数的种子
    dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
    // 检查开启哨兵模式的两种方式
    server.sentinel_mode = checkForSentinelMode(argc,argv);
    // 初始化服务器配置
    initServerConfig();
    // 设置可执行文件的绝对路径
    server.executable = getAbsolutePath(argv[0]);
    // 分配执行executable文件的参数列表的空间
    server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
    server.exec_argv[argc] = NULL;
    // 保存当前参数
    for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
    // 如果已开启哨兵模式
    if (server.sentinel_mode) {
    // 初始化哨兵的配置
    initSentinelConfig();
    initSentinel();
    }
    // 检查是否执行"redis-check-rdb"检查程序
    if (strstr(argv[0],"redis-check-rdb") != NULL)
    redis_check_rdb_main(argc,argv); //该函数不会返回
    // 解析参数
    if (argc >= 2) {
    j = 1; /* First option to parse in argv[] */
    sds options = sdsempty();
    char *configfile = NULL;
    /* Handle special options --help and --version */
    // 指定了打印版本信息,然后退出
    if (strcmp(argv[1], "-v") == 0 ||
    strcmp(argv[1], "--version") == 0) version();
    // 执行帮助信息,然后退出
    if (strcmp(argv[1], "--help") == 0 ||
    strcmp(argv[1], "-h") == 0) usage();
    // 执行内存测试程序
    if (strcmp(argv[1], "--test-memory") == 0) {
    if (argc == 3) {
    memtest(atoi(argv[2]),50);
    exit(0);
    } else {
    fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
    fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
    exit(1);
    }
    }
    /* First argument is the config file name? */
    // 如果第1个参数不是'-',那么是配置文件
    if (argv[j][0] != '-' || argv[j][1] != '-') {
    configfile = argv[j];
    // 设置配置文件的绝对路径
    server.configfile = getAbsolutePath(configfile);
    /* Replace the config file in server.exec_argv with
    * its absoulte path. */
    zfree(server.exec_argv[j]);
    // 设置可执行的参数列表
    server.exec_argv[j] = zstrdup(server.configfile);
    j++;
    }
    // 解析指定的对象
    while(j != argc) {
    // 如果是以'-'开头
    if (argv[j][0] == '-' && argv[j][1] == '-') {
    /* Option name */
    // 跳过"--check-rdb"
    if (!strcmp(argv[j], "--check-rdb")) {
    /* Argument has no options, need to skip for parsing. */
    j++;
    continue;
    }
    // 每个选项之间用'\n'隔开
    if (sdslen(options)) options = sdscat(options,"\n");
    // 将选项追加在sds中
    options = sdscat(options,argv[j]+2);
    // 选项和参数用 " "隔开
    options = sdscat(options," ");
    } else {
    /* Option argument */
    // 追加选项参数
    options = sdscatrepr(options,argv[j],strlen(argv[j]));
    options = sdscat(options," ");
    }
    j++;
    }
    // 如果开启哨兵模式,哨兵模式配置文件不正确
    if (server.sentinel_mode && configfile && *configfile == '-') {
    serverLog(LL_WARNING,
    "Sentinel config from STDIN not allowed.");
    serverLog(LL_WARNING,
    "Sentinel needs config file on disk to save state. Exiting...");
    exit(1);
    }
    // 重置save命令的参数
    resetServerSaveParams();
    // 载入配置文件
    loadServerConfig(configfile,options);
    sdsfree(options);
    } else {
    serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
    }
    // 是否被监视
    server.supervised = redisIsSupervised(server.supervised_mode);
    // 是否以守护进程的方式运行
    int background = server.daemonize && !server.supervised;
    if (background) daemonize();
    // 初始化服务器
    initServer();
    // 创建保存pid的文件
    if (background || server.pidfile) createPidFile();
    // 为服务器进程设置标题
    redisSetProcTitle(argv[0]);
    // 打印Redis的logo
    redisAsciiArt();
    // 检查backlog队列
    checkTcpBacklogSettings();
    // 如果不是哨兵模式
    if (!server.sentinel_mode) {
    /* Things not needed when running in Sentinel mode. */
    serverLog(LL_WARNING,"Server started, Redis version " REDIS_VERSION);
    #ifdef __linux__
    // 打印内存警告
    linuxMemoryWarnings();
    #endif
    // 从AOF文件或RDB文件载入数据
    loadDataFromDisk();
    // 如果开启了集群模式
    if (server.cluster_enabled) {
    // 集群模式下验证载入的数据
    if (verifyClusterConfigWithData() == C_ERR) {
    serverLog(LL_WARNING,
    "You can't have keys in a DB different than DB 0 when in "
    "Cluster mode. Exiting.");
    exit(1);
    }
    }
    // 打印端口号
    if (server.ipfd_count > 0)
    serverLog(LL_NOTICE,"The server is now ready to accept connections on port %d", server.port);
    // 打印本地套接字fd
    if (server.sofd > 0)
    serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
    } else {
    // 开启哨兵模式,哨兵模式和集群模式只能开启一种
    sentinelIsRunning();
    }
    /* Warning the user about suspicious maxmemory setting. */
    // 最大内存限制是否配置正确
    if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
    serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
    }
    // 进入事件循环之前执行beforeSleep()函数
    aeSetBeforeSleepProc(server.el,beforeSleep);
    // 运行事件循环,一直到服务器关闭
    aeMain(server.el);
    // 服务器关闭,删除事件循环
    aeDeleteEventLoop(server.el);
    return 0;
    }
HeLei Blog

Redis源码剖析和注释(八)--- redis对象(redisObject)

发表于 2018-02-10 | 分类于 Redis

1. 介绍

redis中基于双端链表、简单动态字符串(sds)、字典、跳跃表、整数集合、压缩列表、快速列表等等数据结构实现了一个对象系统,并且实现了5种不同的对象,每种对象都使用了至少一种前面的数据结构,优化对象在不同场合下的使用效率。

2. 对象的系统的实现

2.1 对象的结构

对象结构robj功能:

  • 为5种不同的对象类型提供同一的表示形式。
  • 为不同的对象适用于不同的场景,支持同一种对象类型采用多种的数据结构方式。
  • 支持引用计数,实现对象共享机制。
  • 记录对象的访问时间,便于删除对象。

对象结构定义在redis 3.2版本的server.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
typedef struct redisObject {
//对象的数据类型,占4bits,共5种类型
unsigned type:4;
//对象的编码类型,占4bits,共10种类型
unsigned encoding:4;
//least recently used
//实用LRU算法计算相对server.lruclock的LRU时间
unsigned lru:LRU_BITS; /* lru time (relative to server.lruclock) */
//引用计数
int refcount;
//指向底层数据实现的指针
void *ptr;
} robj;
//type的占5种类型:
/* Object types */
#define OBJ_STRING 0 //字符串对象
#define OBJ_LIST 1 //列表对象
#define OBJ_SET 2 //集合对象
#define OBJ_ZSET 3 //有序集合对象
#define OBJ_HASH 4 //哈希对象
/* Objects encoding. Some kind of objects like Strings and Hashes can be
* internally represented in multiple ways. The 'encoding' field of the object
* is set to one of this fields for this object. */
// encoding 的10种类型
#define OBJ_ENCODING_RAW 0 /* Raw representation */ //原始表示方式,字符串对象是简单动态字符串
#define OBJ_ENCODING_INT 1 /* Encoded as integer */ //long类型的整数
#define OBJ_ENCODING_HT 2 /* Encoded as hash table */ //字典
#define OBJ_ENCODING_ZIPMAP 3 /* Encoded as zipmap */ //不在使用
#define OBJ_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */ //双端链表,不在使用
#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ //压缩列表
#define OBJ_ENCODING_INTSET 6 /* Encoded as intset */ //整数集合
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */ //跳跃表和字典
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */ //embstr编码的简单动态字符串
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */ //由压缩列表组成

2.2 字符串对象的底层实现类型

编码—encoding 对象—ptr
OBJ_ENCODING_RAW 简单动态字符串实现的字符串对象
OBJ_ENCODING_INT 整数值实现的字符串对象
OBJ_ENCODING_EMBSTR embstr编码的简单动态字符串实现的字符串对象

2.3 列表对象的底层实现类型

编码—encoding 对象—ptr
OBJ_ENCODING_QUICKLIST 快速列表实现的列表对象
OBJ_ENCODING_ZIPLIST 压缩列表实现的列表对象

2.4 集合对象的底层实现类型

编码—encoding 对象—ptr
OBJ_ENCODING_HT 字典实现的集合对象
OBJ_ENCODING_INTSET 整数集合实现的集合对象

2.5 哈希对象的底层实现类型

编码—encoding 对象—ptr
OBJ_ENCODING_ZIPLIST 压缩列表实现的哈希对象
OBJ_ENCODING_HT 字典实现的哈希对象

2.6 有序集合对象的底层实现类型

编码—encoding 对象—ptr
OBJ_ENCODING_SKIPLIST 跳跃表和字典实现的有序集合对象
OBJ_ENCODING_ZIPLIST 压缩列表实现的有序集合对象

3. 对象系统的重要操作

3.1创建一个字符串对象

  • 编码为OBJ_ENCODING_RAW

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    robj *createObject(int type, void *ptr) { //创建一个对象
    robj *o = zmalloc(sizeof(*o)); //分配空间
    o->type = type; //设置对象类型
    o->encoding = OBJ_ENCODING_RAW; //设置编码方式为OBJ_ENCODING_RAW
    o->ptr = ptr; //设置
    o->refcount = 1; //引用计数为1
    /* Set the LRU to the current lruclock (minutes resolution). */
    o->lru = LRU_CLOCK(); //计算设置当前LRU时间
    return o;
    }
  • 编码为OBJ_ENCODING_EMBSTR

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    /* Create a string object with encoding OBJ_ENCODING_EMBSTR, that is
    * an object where the sds string is actually an unmodifiable string
    * allocated in the same chunk as the object itself. */
    //创建一个embstr编码的字符串对象
    robj *createEmbeddedStringObject(const char *ptr, size_t len) {
    robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr8)+len+1); //分配空间
    struct sdshdr8 *sh = (void*)(o+1); //o+1刚好就是struct sdshdr8的地址
    o->type = OBJ_STRING; //类型为字符串对象
    o->encoding = OBJ_ENCODING_EMBSTR; //设置编码类型OBJ_ENCODING_EMBSTR
    o->ptr = sh+1; //指向分配的sds对象,分配的len+1的空间首地址
    o->refcount = 1; //设置引用计数
    o->lru = LRU_CLOCK(); //计算设置当前LRU时间
    sh->len = len; //设置字符串长度
    sh->alloc = len; //设置最大容量
    sh->flags = SDS_TYPE_8; //设置sds的类型
    if (ptr) { //如果传了字符串参数
    memcpy(sh->buf,ptr,len); //将传进来的ptr保存到对象中
    sh->buf[len] = '\0'; //结束符标志
    } else {
    memset(sh->buf,0,len+1); //否则将对象的空间初始化为0
    }
    return o;
    }
  • 两种字符串对象编码方式的区别

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /* Create a string object with EMBSTR encoding if it is smaller than
    * REIDS_ENCODING_EMBSTR_SIZE_LIMIT, otherwise the RAW encoding is
    * used.
    *
    * The current limit of 39 is chosen so that the biggest string object
    * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */
    //sdshdr8的大小为3个字节,加上1个结束符共4个字节
    //redisObject的大小为16个字节
    //redis使用jemalloc内存分配器,且jemalloc会分配8,16,32,64等字节的内存
    //一个embstr固定的大小为16+3+1 = 20个字节,因此一个最大的embstr字符串为64-20 = 44字节
    #define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 44
    // 创建字符串对象,根据长度使用不同的编码类型
    // createRawStringObject和createEmbeddedStringObject的区别是:
    // createRawStringObject是当字符串长度大于44字节时,robj结构和sdshdr结构在内存上是分开的
    // createEmbeddedStringObject是当字符串长度小于等于44字节时,robj结构和sdshdr结构在内存上是连续的
    robj *createStringObject(const char *ptr, size_t len) {
    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)
    return createEmbeddedStringObject(ptr,len);
    else
    return createRawStringObject(ptr,len);
    }

3.2 字符串对象编码的优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/* Try to encode a string object in order to save space */
//尝试优化字符串对象的编码方式以节约空间
robj *tryObjectEncoding(robj *o) {
long value;
sds s = o->ptr;
size_t len;
/* Make sure this is a string object, the only type we encode
* in this function. Other types use encoded memory efficient
* representations but are handled by the commands implementing
* the type. */
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
/* We try some specialized encoding only for objects that are
* RAW or EMBSTR encoded, in other words objects that are still
* in represented by an actually array of chars. */
//如果字符串对象的编码类型为RAW或EMBSTR时,才对其重新编码
if (!sdsEncodedObject(o)) return o;
/* It's not safe to encode shared objects: shared objects can be shared
* everywhere in the "object space" of Redis and may end in places where
* they are not handled. We handle them only as values in the keyspace. */
//如果refcount大于1,则说明对象的ptr指向的值是共享的,不对共享对象进行编码
if (o->refcount > 1) return o;
/* Check if we can represent this string as a long integer.
* Note that we are sure that a string larger than 20 chars is not
* representable as a 32 nor 64 bit integer. */
len = sdslen(s); //获得字符串s的长度
//如果len小于等于20,表示符合long long可以表示的范围,且可以转换为long类型的字符串进行编码
if (len <= 20 && string2l(s,len,&value)) {
/* This object is encodable as a long. Try to use a shared object.
* Note that we avoid using shared integers when maxmemory is used
* because every object needs to have a private LRU field for the LRU
* algorithm to work well. */
if ((server.maxmemory == 0 ||
(server.maxmemory_policy != MAXMEMORY_VOLATILE_LRU &&
server.maxmemory_policy != MAXMEMORY_ALLKEYS_LRU)) &&
value >= 0 &&
value < OBJ_SHARED_INTEGERS) //如果value处于共享整数的范围内
{
decrRefCount(o); //原对象的引用计数减1,释放对象
incrRefCount(shared.integers[value]); //增加共享对象的引用计数
return shared.integers[value]; //返回一个编码为整数的字符串对象
} else { //如果不处于共享整数的范围
if (o->encoding == OBJ_ENCODING_RAW) sdsfree(o->ptr); //释放编码为OBJ_ENCODING_RAW的对象
o->encoding = OBJ_ENCODING_INT; //转换为OBJ_ENCODING_INT编码
o->ptr = (void*) value; //指针ptr指向value对象
return o;
}
}
/* If the string is small and is still RAW encoded,
* try the EMBSTR encoding which is more efficient.
* In this representation the object and the SDS string are allocated
* in the same chunk of memory to save space and cache misses. */
//如果len小于44,44是最大的编码为EMBSTR类型的字符串对象长度
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
robj *emb;
if (o->encoding == OBJ_ENCODING_EMBSTR) return o; //将RAW对象转换为OBJ_ENCODING_EMBSTR编码类型
emb = createEmbeddedStringObject(s,sdslen(s)); //创建一个编码类型为OBJ_ENCODING_EMBSTR的字符串对象
decrRefCount(o); //释放之前的对象
return emb;
}
/* We can't encode the object...
*
* Do the last try, and at least optimize the SDS string inside
* the string object to require little space, in case there
* is more than 10% of free space at the end of the SDS string.
*
* We do that only for relatively large strings as this branch
* is only entered if the length of the string is greater than
* OBJ_ENCODING_EMBSTR_SIZE_LIMIT. */
//无法进行编码,但是如果s的未使用的空间大于使用空间的10分之1
if (o->encoding == OBJ_ENCODING_RAW &&
sdsavail(s) > len/10)
{
o->ptr = sdsRemoveFreeSpace(o->ptr); //释放所有的未使用空间
}
/* Return the original object. */
return o;
}

3.3 引用计数管理对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//引用计数加1
void incrRefCount(robj *o) {
o->refcount++;
}
//引用计数减1
void decrRefCount(robj *o) {
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
//当引用对象等于1时,在操作引用计数减1,直接释放对象的ptr和对象空间
if (o->refcount == 1) {
switch(o->type) {
case OBJ_STRING: freeStringObject(o); break;
case OBJ_LIST: freeListObject(o); break;
case OBJ_SET: freeSetObject(o); break;
case OBJ_ZSET: freeZsetObject(o); break;
case OBJ_HASH: freeHashObject(o); break;
default: serverPanic("Unknown object type"); break;
}
zfree(o);
} else {
o->refcount--; //否则减1
}
}

3.4 对象的复制,创建的对象非共享

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//返回 复制的o对象的副本的地址,且创建的对象非共享
robj *dupStringObject(robj *o) {
robj *d;
serverAssert(o->type == OBJ_STRING); //一定是OBJ_STRING类型
switch(o->encoding) { //根据不同的编码类型
case OBJ_ENCODING_RAW:
return createRawStringObject(o->ptr,sdslen(o->ptr)); //创建的对象非共享
case OBJ_ENCODING_EMBSTR:
return createEmbeddedStringObject(o->ptr,sdslen(o->ptr)); //创建的对象非共享
case OBJ_ENCODING_INT: //整数编码类型
d = createObject(OBJ_STRING, NULL); //即使是共享整数范围内的整数,创建的对象也是非共享的
d->encoding = OBJ_ENCODING_INT;
d->ptr = o->ptr;
return d;
default:
serverPanic("Wrong encoding.");
break;
}
}

3.5 对象的解码操作

将保存的整数值解码成字符串对象返回回来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/* Get a decoded version of an encoded object (returned as a new object).
* If the object is already raw-encoded just increment the ref count. */
//将对象是整型的解码为字符串并返回,如果是字符串编码则直接返回输入对象,只需增加引用计数
robj *getDecodedObject(robj *o) {
robj *dec;
if (sdsEncodedObject(o)) { //如果是OBJ_ENCODING_RAW或OBJ_ENCODING_EMBSTR类型的对象
incrRefCount(o); //增加引用计数,返回一个共享的对象
return o;
}
if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_INT) { //如果是整数对象
char buf[32];
ll2string(buf,32,(long)o->ptr); //将整数转换为字符串
dec = createStringObject(buf,strlen(buf)); //创建一个字符串对象
return dec;
} else {
serverPanic("Unknown encoding type");
}
}

HeLei Blog

Redis源码剖析和注释(十九)--- Redis 事件处理实现

发表于 2018-02-10 | 分类于 Redis

1. Redis事件介绍

Redis服务器是一个事件驱动程序。下面先来简单介绍什么是事件驱动。

所谓事件驱动,就是当你输入一条命令并且按下回车,然后消息被组装成Redis协议的格式发送给Redis服务器,这就会产生一个事件,Redis服务器会接收该命令,处理该命令和发送回复,而当你没有与服务器进行交互时,那么服务器就会处于阻塞等待状态,会让出CPU从而进入睡眠状态,当事件触发时,就会被操作系统唤醒。事件驱动使CPU更高效的利用。

事件驱动是一种概括和抽象,也可以称为I/O多路复用(I/O multiplexing),它的实现方式各个系统都不同,一会会说到Redis的方式。

在redis服务器中,处理了两类事件:

  • 文件事件(file event):Redis服务器通过套接字于客户端(或其他Redis服务器)进行连接,而文件事件就是服务器对套接字操作的抽象。
  • 时间事件(time event):Redis服务器的一些操作需要在给定的事件点执行,而时间事件就是服务器对这类定时操作的抽象。

2. 事件的抽象

Redis将这两个事件分别抽象成一个数据结构来管理。

2.1 文件事件结构

1
2
3
4
5
6
7
8
9
10
11
/* File event structure */
typedef struct aeFileEvent {
// 文件时间类型:AE_NONE,AE_READABLE,AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */
// 可读处理函数
aeFileProc *rfileProc;
// 可写处理函数
aeFileProc *wfileProc;
// 客户端传入的数据
void *clientData;
} aeFileEvent; //文件事件

其中rfileProc和wfileProc成员分别为两个函数指针,他们的原型为

1
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask)

这个函数是回调函数,如果当前文件事件所指定的事件类型发生时,则会调用对应的回调函数处理该事件。函数指针与回调函数详解

当事件就绪时,我们需要知道文件事件的文件描述符还有事件类型才能锁定该事件,因此定义了aeFiredEvent结构统一管理:

1
2
3
4
5
6
7
/* A fired event */
typedef struct aeFiredEvent {
// 就绪事件的文件描述符
int fd;
// 就绪事件类型:AE_NONE,AE_READABLE,AE_WRITABLE
int mask;
} aeFiredEvent; //就绪事件

2.2 时间事件结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* Time event structure */
typedef struct aeTimeEvent {
// 时间事件的id
long long id; /* time event identifier. */
// 时间事件到达的时间的秒数
long when_sec; /* seconds */
// 时间事件到达的时间的毫秒数
long when_ms; /* milliseconds */
// 时间事件处理函数
aeTimeProc *timeProc;
// 时间事件终结函数
aeEventFinalizerProc *finalizerProc;
// 客户端传入的数据
void *clientData;
// 指向下一个时间事件
struct aeTimeEvent *next;
} aeTimeEvent; //时间事件

从这个结构中可以看出,时间事件表是一个链表,因为它有一个next指针域,指向下一个时间事件。

和文件事件一样,当时间事件所指定的事件发生时,也会调用对应的回调函数,结构成员timeProc和finalizerProc都是回调函数,函数原型如下:

1
2
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

虽然对文件事件和时间事件都做了抽象,Redis仍然需要对事件做整体抽象,于是定义了aeEventLoop结构。

2.3 事件状态结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* State of an event based program */
typedef struct aeEventLoop {
// 当前已注册的最大的文件描述符
int maxfd; /* highest file descriptor currently registered */
// 文件描述符监听集合的大小
int setsize; /* max number of file descriptors tracked */
// 下一个时间事件的ID
long long timeEventNextId;
// 最后一次执行事件的时间
time_t lastTime; /* Used to detect system clock skew */
// 注册的文件事件表
aeFileEvent *events; /* Registered events */
// 已就绪的文件事件表
aeFiredEvent *fired; /* Fired events */
// 时间事件的头节点指针
aeTimeEvent *timeEventHead;
// 事件处理开关
int stop;
// 多路复用库的事件状态数据
void *apidata; /* This is used for polling API specific data */
// 执行处理事件之前的函数
aeBeforeSleepProc *beforesleep;
} aeEventLoop; //事件轮询的状态结构

aeEventLoop结构保存了一个void *类型的万能指针apidata,是用来保存轮询事件的状态的,也就是保存底层调用的多路复用库的事件状态,关于Redis的多路复用库的选择,Redis包装了常见的select epoll evport kqueue,他们在编译阶段,根据不同的系统选择性能最高的一个多路复用库作为Redis的多路复用程序的实现,而且所有库实现的接口名称都是相同的,因此Redis多路复用程序底层实现是可以互换的。具体选择库的源码为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// IO复用的选择,性能依次下降,Linux支持 "ae_epoll.c" 和 "ae_select.c"
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif

也可以通过Redis客户端的命令来查看当前选择的多路复用库,INFO server

1
2
3
4
5
127.0.0.1:6379> INFO server
# Server
……
multiplexing_api:epoll
……

那么,既然知道了多路复用库的选择,那么我们来查看一下apidata保存的epoll模型的事件状态结构:ae_epoll.c文件中

1
2
3
4
5
6
typedef struct aeApiState {
// epoll事件的文件描述符
int epfd;
// 事件表
struct epoll_event *events;
} aeApiState; //事件的状态

epoll模型的struct epoll_event的结构中定义这自己的事件类型,例如EPOLLIN POLLOUT等等,但是Redis的文件事件结构aeFileEvent中也在mask中定义了自己的事件类型,例如:AE_READABLE AE_WRITABLE等,于是,就需要实现一个中间层将两者的事件类型相联系起来,这也就是之前提到的ae_epoll.c文件中实现的相同的API,我们列出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建一个epoll实例,保存到eventLoop中
static int aeApiCreate(aeEventLoop *eventLoop)
// 调整事件表的大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
// 释放epoll实例和事件表空间
static void aeApiFree(aeEventLoop *eventLoop)
// 在epfd标识的事件表上注册fd的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
// 在epfd标识的事件表上注删除fd的事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
// 等待所监听文件描述符上有事件发生
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// 返回正在使用的IO多路复用库的名字
static char *aeApiName(void)

这些API都是调用相应的底层多路复用库来将Redis事件状态结构aeEventLoop所关联,就是将epoll的底层函数封装起来,Redis实现事件时,只需调用这些接口即可。我们查看两个重要的函数的源码,看看是如何实现的

  • 向Redis事件状态结构aeEventLoop的事件表event注册一个事件,对应的是epoll_ctl函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // 在epfd标识的事件表上注册fd的事件
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
    * operation. Otherwise we need an ADD operation. */
    // EPOLL_CTL_ADD,向epfd注册fd的上的event
    // EPOLL_CTL_MOD,修改fd已注册的event
    // #define AE_NONE 0 //未设置
    // #define AE_READABLE 1 //事件可读
    // #define AE_WRITABLE 2 //事件可写
    // 判断fd事件的操作,如果没有设置事件,则进行关联mask类型事件,否则进行修改
    int op = eventLoop->events[fd].mask == AE_NONE ?
    EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    // struct epoll_event {
    // uint32_t events; /* Epoll events */
    // epoll_data_t data; /* User data variable */
    // };
    ee.events = 0;
    // 如果是修改事件,合并之前的事件类型
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    // 根据mask映射epoll的事件类型
    if (mask & AE_READABLE) ee.events |= EPOLLIN; //读事件
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; //写事件
    ee.data.fd = fd; //设置事件所从属的目标文件描述符
    // 将ee事件注册到epoll中
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
    }
  • 等待所监听文件描述符上有事件发生,对应着底层epoll_wait函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    // 等待所监听文件描述符上有事件发生
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // 监听事件表上是否有事件发生
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
    tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // 至少有一个就绪的事件
    if (retval > 0) {
    int j;
    numevents = retval;
    // 遍历就绪的事件表,将其加入到eventLoop的就绪事件表中
    for (j = 0; j < numevents; j++) {
    int mask = 0;
    struct epoll_event *e = state->events+j;
    // 根据就绪的事件类型,设置mask
    if (e->events & EPOLLIN) mask |= AE_READABLE;
    if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
    if (e->events & EPOLLERR) mask |= AE_WRITABLE;
    if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
    // 添加到就绪事件表中
    eventLoop->fired[j].fd = e->data.fd;
    eventLoop->fired[j].mask = mask;
    }
    }
    // 返回就绪的事件个数
    return numevents;
    }

3. 事件的源码实现

Redis事件的源码全部定义在ae.c文件中,我们从事件的主函数aeMain说起,一步一步深入剖析。

1
2
3
4
5
6
7
8
9
10
11
12
// 事件轮询的主函数
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
// 一直处理事件
while (!eventLoop->stop) {
// 执行处理事件之前的函数
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
//处理到时的时间事件和就绪的文件事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

这个事件的主函数aeMain很清楚的可以看到,如果服务器一直处理事件,那么就是一个死循环,而一个最典型的事件驱动,就是一个死循环。调用处理事件的函数aeProcessEvents,他们参数是一个事件状态结构aeEventLoop和AE_ALL_EVENTS,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// 处理到时的时间事件和就绪的文件事件
// 函数返回执行的事件个数
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
// 如果什么事件都没有设置则直接返回
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
// 请注意,既然我们要处理时间事件,即使没有要处理的文件事件,我们仍要调用select(),以便在下一次事件准备启动之前进行休眠
// 当前还没有要处理的文件事件,或者设置了时间时间但是没有设置不阻塞标识
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 如果设置了时间事件而没有设置不阻塞标识
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
// 获取最近到时的时间事件
shortest = aeSearchNearestTimer(eventLoop);
// 获取到了最早到时的时间事件
if (shortest) {
long now_sec, now_ms;
// 获取当前时间
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
// 等待该时间事件到时所需要的时长
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
// 如果没到时
if (ms > 0) {
// 保存时长到tvp中
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
// 如果已经到时,则将tvp的时间设置为0
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
// 没有获取到了最早到时的时间事件,时间事件链表为空
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
// 如果设置了不阻塞标识
if (flags & AE_DONT_WAIT) {
// 将tvp的时间设置为0,就不会阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
// 阻塞到第一个时间事件的到来
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
// 等待所监听文件描述符上有事件发生
// 如果tvp为NULL,则阻塞在此,否则等待tvp设置阻塞的时间,就会有时间事件到时
// 返回了就绪文件事件的个数
numevents = aeApiPoll(eventLoop, tvp);
// 遍历就绪文件事件表
for (j = 0; j < numevents; j++) {
// 获取就绪文件事件的地址
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
// 获取就绪文件事件的类型,文件描述符
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
// 如果是文件可读事件发生
if (fe->mask & mask & AE_READABLE) {
// 设置读事件标识 且 调用读事件方法处理读事件
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 如果是文件可写事件发生
if (fe->mask & mask & AE_WRITABLE) {
// 读写事件的执行发法不同,则执行写事件,避免重复执行相同的方法
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++; //执行的事件次数加1
}
}
/* Check time events */
// 执行时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}

刚才提到该函数的一个参数是AE_ALL_EVENTS,他的定义在ae.h中,定义如下:

1
2
3
4
#define AE_FILE_EVENTS 1 //文件事件
#define AE_TIME_EVENTS 2 //时间事件
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) //文件和时间事件
#define AE_DONT_WAIT 4 //不阻塞等待标识

很明显,flags是AE_FILE_EVENTS和AE_TIME_EVENTS或的结果,他们的含义如下:

  • 如果flags = 0,函数什么都不做,直接返回
  • 如果flags设置了 AE_ALL_EVENTS ,则执行所有类型的事件
  • 如果flags设置了 AE_FILE_EVENTS ,则执行文件事件
  • 如果flags设置了 AE_TIME_EVENTS ,则执行时间事件
  • 如果flags设置了 AE_DONT_WAIT ,那么函数处理完事件后直接返回,不阻塞等待
    Redis服务器在没有被事件触发时,就会阻塞等待,因为没有设置AE_DONT_WAIT标识。但是他不会一直的死等待,等待文件事件的到来,因为他还要处理时间时间,因此,在调用aeApiPoll进行监听之前,先从时间事件表中获取一个最近到达的时间时间,根据要等待的时间构建一个struct timeval tv, *tvp结构的变量,这个变量保存着服务器阻塞等待文件事件的最长时间,一旦时间到达而没有触发文件事件,aeApiPoll函数就会停止阻塞,进而调用processTimeEvents处理时间事件,因为Redis服务器设定一个对自身资源和状态进行检查的周期性检查的时间事件,而该函数就是timeProc所指向的回调函数
    1
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData)

如果在阻塞等待的最长时间之间,触发了文件事件,就会先执行文件事件,后执行时间事件,因此处理时间事件通常比预设的会晚一点。

而执行文件事件rfileProc和wfileProc也是调用了回调函数,Redis将文件事件的处理分为了好几种,用于处理不同的网络通信需求,下面列出回调函数的原型:

1
2
3
4
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask)
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)

  • acceptTcpHandler:用于accept client的connect。
  • acceptUnixHandler:用于acceptclient的本地connect。
  • sendReplyToClient:用于向client发送命令回复。
  • readQueryFromClient:用于读入client发送的请求。
    接下来,我们查看获取最快达到的时间事件的函数aeSearchNearestTimer实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 寻找第一个快到时的时间事件
    // 这个操作是有用的知道有多少时间可以选择该事件设置为不用推迟任何事件的睡眠中。
    // 如果事件链表没有时间将返回NULL。
    static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
    {
    // 时间事件头节点地址
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;
    // 遍历所有的时间事件
    while(te) {
    // 寻找第一个快到时的时间事件,保存到nearest中
    if (!nearest || te->when_sec < nearest->when_sec ||
    (te->when_sec == nearest->when_sec &&
    te->when_ms < nearest->when_ms))
    nearest = te;
    te = te->next;
    }
    return nearest;
    }

这个函数没什么,就是遍历链表,找到最小值。我们重点看执行时间事件的函数processTimeEvents实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/* Process time events */
// 执行时间事件
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te, *prev;
long long maxId;
time_t now = time(NULL);
// 这里尝试发现时间混乱的情况,上一次处理事件的时间比当前时间还要大
// 重置最近一次处理事件的时间
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
// 设置上一次时间事件处理的时间为当前时间
eventLoop->lastTime = now;
prev = NULL;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1; //当前时间事件表中的最大ID
// 遍历时间事件链表
while(te) {
long now_sec, now_ms;
long long id;
/* Remove events scheduled for deletion. */
// 如果时间事件已被删除了
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
// 从事件链表中删除事件的节点
if (prev == NULL)
eventLoop->timeEventHead = te->next;
else
prev->next = te->next;
// 调用时间事件终结方法清楚该事件
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
// 确保我们不处理在此迭代中由时间事件创建的时间事件。 请注意,此检查目前无效:我们总是在头节点添加新的计时器,但是如果我们更改实施细节,则该检查可能会再次有用:我们将其保留在未来的防御
if (te->id > maxId) {
te = te->next;
continue;
}
// 获取当前时间
aeGetTime(&now_sec, &now_ms);
// 找到已经到时的时间事件
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
// 调用时间事件处理方法
retval = te->timeProc(eventLoop, id, te->clientData);
// 时间事件次数加1
processed++;
// 如果不是定时事件,则继续设置它的到时时间
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
// 如果是定时时间,则retval为-1,则将其时间事件删除,惰性删除
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
// 更新前驱节点指针和后继节点指针
prev = te;
te = te->next;
}
return processed; //返回执行事件的次数
}

如果时间事件不存在,则就调用finalizerProc指向的回调函数,删除当前的时间事件。如果存在,就调用timeProc指向的回调函数处理时间事件。Redis的时间事件分为两类

  • 定时事件:让一段程序在指定的时间后执行一次。
  • 周期性事件:让一段程序每隔指定的时间后执行一次。
    如果当前的时间事件是周期性,那么就会在将时间周期添加到周期事件的到时时间中。如果是定时事件,则将该时间事件删除。

至此,Redis事件的实现就剖析完毕,但是事件的其他API,例如:创建事件,删除事件,调整事件表的大小等等都没有列出。

HeLei Blog

Redis源码剖析和注释(六)--- 压缩列表(ziplist)

发表于 2018-02-08 | 分类于 Redis

1. 介绍

压缩列表(ziplist)是哈希键的底层实现之一。它是经过特殊编码的双向链表,和整数集合(intset)一样,是为了提高内存的存储效率而设计的。当保存的对象是小整数值,或者是长度较短的字符串,那么redis就会使用压缩列表来作为哈希键的实现。

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> HMSET hash name mike age 28 sex male
OK
127.0.0.1:6379> HGETALL hash
1) "name"
2) "mike"
3) "age"
4) "28"
5) "sex"
6) "male"
127.0.0.1:6379> OBJECT ENCODING hash //编码格式为ziplist
"ziplist"

注:redis 3.2以后,quicklist作为列表键的实现底层实现之一,代替了压缩列表。

通过命令来查看一下:

1
2
3
4
5
6
7
127.0.0.1:6379> RPUSH list 1 2
(integer) 2
127.0.0.1:6379> LRANGE list 0 -1
1) "1"
2) "2"
127.0.0.1:6379> OBJECT ENCODING list //是quicklist而非ziplist
"quicklist"

2. 压缩列表的结构

压缩列表是一系列特殊编码的连续内存块组成的顺序序列数据结构,可以包含任意多个节点(entry),每一个节点可以保存一个字节数组或者一个整数值。

空间中的结构组成如下图所示:
[url01]

这里写图片描述

  • zlbytes:占4个字节,记录整个压缩列表占用的内存字节数。
  • zltail_offset:占4个字节,记录压缩列表尾节点entryN距离压缩列表的起始地址的字节数。
  • zllength:占2个字节,记录了压缩列表的节点数量。
  • entry[1-N]:长度不定,保存数据。
  • zlend:占1个字节,保存一个常数255(0xFF),标记压缩列表的末端。
    redis没有提供一个结构体来保存压缩列表的信息,而是提供了一组宏来定位每个成员的地址,定义在ziplist.c文件中:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    由于压缩列表对数据的信息访问都是以字节为单位的,所以参数zl的类型是char *类型的,因此对zl指针进行一系列的强制类型转换,以便对不用长度成员的访问。
    /* Utility macros */
    // ziplist的成员宏定义
    // (*((uint32_t*)(zl))) 先对char *类型的zl进行强制类型转换成uint32_t *类型,
    // 然后在用*运算符进行取内容运算,此时zl能访问的内存大小为4个字节。
    #define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl)))
    //将zl定位到前4个字节的bytes成员,记录这整个压缩列表的内存字节数
    #define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
    //将zl定位到4字节到8字节的tail_offset成员,记录着压缩列表尾节点距离列表的起始地址的偏移字节量
    #define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
    //将zl定位到8字节到10字节的length成员,记录着压缩列表的节点数量
    #define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))
    //压缩列表表头(以上三个属性)的大小10个字节
    #define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE)
    //返回压缩列表首节点的地址
    #define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))
    //返回压缩列表尾节点的地址
    #define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
    //返回end成员的地址,一个字节。
  • intrev32ifbe()是封装的宏,用来根据主机的字节序按需要进行字节大小端的转换。

3. 创建一个空的压缩列表

空的压缩列表就是没有节点的列表。

1
2
3
4
5
6
7
8
9
10
11
12
/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) { //创建并返回一个新的压缩列表
//ZIPLIST_HEADER_SIZE是压缩列表的表头大小,1字节是末端的end大小
unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
unsigned char *zl = zmalloc(bytes); //为表头和表尾end成员分配空间
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes); //将bytes成员初始化为bytes=11
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE); //空列表的tail_offset成员为表头大小为10
ZIPLIST_LENGTH(zl) = 0; //节点数量为0
zl[bytes-1] = ZIP_END; //将表尾end成员设置成默认的255
return zl;
}

如下图所示:
[url02]

4. 压缩列表节点结构

redis对于压缩列表节点定义了一个zlentry的结构,用来管理节点的所有信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
typedef struct zlentry {
//prevrawlen 前驱节点的长度
//prevrawlensize 编码前驱节点的长度prevrawlen所需要的字节大小
unsigned int prevrawlensize, prevrawlen;
//len 当前节点值长度
//lensize 编码当前节点长度len所需的字节数
unsigned int lensize, len;
//当前节点header的大小 = lensize + prevrawlensize
unsigned int headersize;
//当前节点的编码格式
unsigned char encoding;
//指向当前节点的指针,以char *类型保存
unsigned char *p;
} zlentry; //压缩列表节点信息的结构

虽然定义了这个结构体,但是根本就没有使用zlentry结构来作为压缩列表中用来存储数据节点中的结构,但是。因为,这个结构存小整数或短字符串实在是太浪费空间了。这个结构总共在32位机占用了28个字节(32位机),在64位机占用了32个字节。这不符合压缩列表的设计目的:提高内存的利用率。因此,在redis中,并没有定义结构体来进行操作,也是定义了一些宏,压缩列表的节点真正的结构如下图所示:
[url03]

  • prev_entry_len:记录前驱节点的长度。
  • encoding:记录当前节点的value成员的数据类型以及长度。
  • value:根据encoding来保存字节数组或整数。
    接下来就分别讨论这三个成员:

接下来就分别讨论这三个成员:

4.1 prev_entry_len成员

prev_entry_len成员实际上就是zlentry结构中prevrawlensize,和prevrawlen这两个成员的压缩版。

prevrawlen:记录着上一个节点的长度。
prevrawlensize:记录编码prevrawlen值的所需的字节个数。
而这两个成员都是int类型,因此将两者压缩为一个成员prev_entry_len,而且分别对不同长度的前驱节点使用不同的字节数来表示。

当前驱节点的长度小于254字节,那么prev_entry_len使用1字节表示。
当前驱节点的长度大于等于255字节,那么prev_entry_len使用5个字节表示。并且用5个字节中的最高8位(最高1个字节)用 0xFE 标示prev_entry_len占用了5个字节,后四个字节才是真正保存前驱节点的长度值。
因为,对于访问的指针都是char 类型,它能访问的范围1个字节,如果这个字节的大小等于0xFE,那么就会继续访问四个字节来获取前驱节点的长度,如果该字节的大小小于0xFE,那么该字节就是要获取的前驱节点的长度。因此这样就使prev_entry_len同时具有了prevrawlen和prevrawlensize的功能,而且更加节约内存。*

redis中的代码这样描述,定义在ziplist.c中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#define ZIP_BIGLEN 254
//对前驱节点的长度len进行编码,并写入p中,如果p为空,则仅仅返回编码len所需要的字节数
static unsigned int zipPrevEncodeLength(unsigned char *p, unsigned int len) {
if (p == NULL) {
return (len < ZIP_BIGLEN) ? 1 : sizeof(len)+1; //如果前驱节点的长度len字节小于254则返回1个字节,否则返回5个
} else {
if (len < ZIP_BIGLEN) { //如果前驱节点的长度len字节小于254
p[0] = len; //将len保存在p[0]中
return 1; //返回所需的编码数1字节
} else { //前驱节点的长度len大于等于255字节
p[0] = ZIP_BIGLEN; //添加5字节的标示,0xFE
memcpy(p+1,&len,sizeof(len)); //从p+1的起始地址开始拷贝len,拷贝四个字节
memrev32ifbe(p+1);
return 1+sizeof(len); //返回所需的编码数5字节
}
}
}

4.2 encoding成员

和prev_entry_len一样,encoding成员同样可以看做成zlentry结构中lensize和len的压缩版。

len:当前节点值长度。
lensize: 编码当前节点长度len所需的字节数。
同样的lensize和len都是占4个字节的,因此将两者压缩为一个成员encoding,只要encoding能够同时具有lensize和len成员的功能,而且对当前节点保存的是字节数组还是整数分别编码。

redis对字节数组和整数编码提供了一组宏定义,定义在ziplist.c中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Different encoding/length possibilities */
#define ZIP_STR_MASK 0xc0 //1100 0000 字节数组的掩码
#define ZIP_STR_06B (0 << 6) //0000 0000
#define ZIP_STR_14B (1 << 6) //0100 0000
#define ZIP_STR_32B (2 << 6) //1000 0000
#define ZIP_INT_MASK 0x30 //0011 0000 整数的掩码
#define ZIP_INT_16B (0xc0 | 0<<4) //1100 0000
#define ZIP_INT_32B (0xc0 | 1<<4) //1101 0000
#define ZIP_INT_64B (0xc0 | 2<<4) //1110 0000
#define ZIP_INT_24B (0xc0 | 3<<4) //1111 0000
#define ZIP_INT_8B 0xfe //1111 1110
//掩码个功能就是区分一个encoding是字节数组编码还是整数编码
//如果这个宏返回 1 就代表该enc是字节数组,如果是 0 就代表是整数的编码
#define ZIP_IS_STR(enc) (((enc) & ZIP_STR_MASK) < ZIP_STR_MASK)

HeLei Blog

Redis源码剖析和注释(五)--- 整数集合(intset)

发表于 2018-02-08 | 分类于 Redis

1. 介绍

整数集合(intset)是集合键底层实现之一。集合键另一实现是值为空的散列表(hash table),虽然使用散列表对集合的加入删除元素,判断元素是否存在等等操作时间复杂度为O(1),但是当存储的元素是整型且元素数目较少时,如果使用散列表存储,就会比较浪费内存,因此整数集合(intset)类型因为节约内存就存在。

散列表的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
127.0.0.1:6379> SADD set1 1 2 3
(integer) 3
127.0.0.1:6379> SADD set1 1
(integer) 0
127.0.0.1:6379> SMEMBERS set1
1) "1"
2) "2"
3) "3"
127.0.0.1:6379> SISMEMBER set1 2
(integer) 1
127.0.0.1:6379> SREM set1 1
(integer) 1
127.0.0.1:6379> SMEMBERS set1
1) "2"
2) "3"

2. 整数集合结构的实现

1
2
3
4
5
6
7
8
9
10
11
redis根目录下的intset.h文件
typedef struct intset {
uint32_t encoding; //编码格式,有如下三种格式,初始值默认为INTSET_ENC_INT16
uint32_t length; //集合元素数量
int8_t contents[]; //保存元素的数组,元素类型并不一定是ini8_t类型,柔性数组不占intset结构体大小,并且数组中的元素从小到大排列。
} intset; //整数集合结构
#define INTSET_ENC_INT16 (sizeof(int16_t)) //16位,2个字节,表示范围-32,768~32,767
#define INTSET_ENC_INT32 (sizeof(int32_t)) //32位,4个字节,表示范围-2,147,483,648~2,147,483,647
#define INTSET_ENC_INT64 (sizeof(int64_t)) //64位,8个字节,表示范围-9,223,372,036,854,775

3. 升级

intset整数集合之所以有三种表示编码格式的宏定义,是因为根据存储的元素数值大小,能够选取一个最”合适”的类型存储,”合适”可以理解为:既能够表示元素的大小,又可以节省空间。

因此,当新添加的元素,例如:65535,超过当前集合编码格式所能表示的范围,就要进行升级操作。

我们使用刚才命令中的集合,它在结构如下图:
[url01]

3.1获得新元素的编码格式

当前新元素要插入到集合中时,首先就要判获得新元素的编码格式,所以调用_intsetValueEncoding()来返回一个”适合”该元素的编码格式。65535的最”适合”的编码格式是INTSET_ENC_INT32。

1
2
3
4
5
6
7
8
9
/* Return the required encoding for the provided value. */
static uint8_t _intsetValueEncoding(int64_t v) { //返回合适v的编码方式
if (v < INT32_MIN || v > INT32_MAX) //如果超出32位所能表示数值的范围则返回INTSET_ENC_INT64
return INTSET_ENC_INT64;
else if (v < INT16_MIN || v > INT16_MAX) //如果超出16位所能表示数值的范围则返回INTSET_ENC_INT32
return INTSET_ENC_INT32;
else
return INTSET_ENC_INT16; //否则返回用16位表示的INTSET_ENC_INT16
}

3.2 调整内存空间

当得到新元素的编码格式后,就要将集合中所有元素的编码格式都要变成升级后的编码格式,因此,需要调整集合数组contents的内存空间大小,调用intsetResize()函数。

1
2
3
4
5
6
7
/* Resize the intset */
static intset *intsetResize(intset *is, uint32_t len) { //调整集合的内存空间大小
uint32_t size = len*intrev32ifbe(is->encoding); //计算数组的大小
is = zrealloc(is,sizeof(intset)+size);
//分配空间,如果新空间的大小比原来的空间大,那么数组的元素会被保留
return is;
}

  • intrev32ifbe()是一个宏定义,定义和实现在redis根目录下的endianconv.h和endianconv.c中根据主机字节序用来做整数大小端的转换。
    已经获知65535的编码格式,因此调整内存空间的大小等于编码格式的大小乘以集合元素的个数。如果图:
    [url02]
    注意:encoding成员已经发生变化,但是length并没有更新。

3.3 根据编码格式设置对应的值

调整好内存空间后就根据编码格式来设置集合元素的值和最后将新元素添加到集合中,都调用_intsetSet()函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* Set the value at pos, using the configured encoding. */
//根据集合is设置的编码方式,设置下标为pos的值为value
static void _intsetSet(intset *is, int pos, int64_t value) {
uint32_t encoding = intrev32ifbe(is->encoding); //获取集合设置的编码方式
if (encoding == INTSET_ENC_INT64) { //如果是64位
((int64_t*)is->contents)[pos] = value; //设置下标pos的值为value
memrev64ifbe(((int64_t*)is->contents)+pos); //如果需要转换大小端
} else if (encoding == INTSET_ENC_INT32) { //如果是32位
((int32_t*)is->contents)[pos] = value; //设置下标pos的值为value
memrev32ifbe(((int32_t*)is->contents)+pos); //如果需要转换大小端
} else {
((int16_t*)is->contents)[pos] = value; //设置下标pos的值为value
memrev16ifbe(((int16_t*)is->contents)+pos); //如果需要转换大小端
}
}

  • memrev16ifbe()是一个宏定义,定义和实现在redis根目录下的endianconv.h和endianconv.c中根据主机字节序用来做内存大小端的转换。

将集合中原来的元素和新插入的元素以”合适”的编码格式INTSET_ENC_INT32写到数组中,顺序过程如下图:
[url03]

最后要更新length。

3.4 升级实现源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/* Upgrades the intset to a larger encoding and inserts the given integer. */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) { //根据value的编码方式,对整数集合is的编码格式升级
uint8_t curenc = intrev32ifbe(is->encoding); //当前集合的编码方式
uint8_t newenc = _intsetValueEncoding(value); //得到value合适的编码方式
int length = intrev32ifbe(is->length); //集合元素数量
int prepend = value < 0 ? 1 : 0; //如果value小于0,则要将value添加到数组最前端,因此为移动1个编码长度
//集合的编码格式要升级,也就是内存增大
//因为 value 的编码比集合原有的其他元素的编码都要大,所以value如果是负数,就是最小值,如果是正数则是最大值
//索引value要么放在数组集合的最前端,要么最后端,根据prepend判断
/* First set new encoding and resize */
is->encoding = intrev32ifbe(newenc); //更新集合is的编码方式
is = intsetResize(is,intrev32ifbe(is->length)+1); //根据新的编码方式重新设置内存空间大小
/* Upgrade back-to-front so we don't overwrite values.
* Note that the "prepend" variable is used to make sure we have an empty
* space at either the beginning or the end of the intset. */
//_intsetGetEncoded()得到下标为length的值
//_intsetSet设置下标为prepend+length的值为_intsetGetEncoded返回的值
//但是,编码格式已经发生改变,数组元素没变但是内存大小改变
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
/* Set the value at the beginning or the end. */
if (prepend) //value是负数,要放在最前端
_intsetSet(is,0,value); //设置下标为0的值为value
else
_intsetSet(is,intrev32ifbe(is->length),value); //value为正数,设置最末尾+1的值为value
is->length = intrev32ifbe(intrev32ifbe(is->length)+1); //数组元素加1
return is;
}

3.5 升级的特点

  • 提升灵活性:因为C语言是静态类型的语言,通常在在数组中只是用一种类型保存数据,例如,要么只用int16_t类型,要么只用int32_t类型。通过自动升级底层数组来适应不同类型的新元素,不必担心类型的错误。
  • 节约内存:整数集合既可以让集合保存三种不同类型的值,又可以确保升级操作只在有需要的时候进行,这样就节省了内存。
  • 不支持降级:一旦对数组进行升级,编码就会一直保存升级后的状态。

4.整数集合的其他操作

源代码注释下载:redis源码注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
intset *intsetNew(void); //创建一个空集合
intset *intsetAdd(intset *is, int64_t value, uint8_t *success);//将value添加到is集合中,如果成功success被设置为1否则为0
intset *intsetRemove(intset *is, int64_t value, int *success);//从集合中删除value,删除成功success设置为1,失败为0
uint8_t intsetFind(intset *is, int64_t value);//返回1表示value是集合中的元素,否则返回0
int64_t intsetRandom(intset *is);//随机返回一个元素
uint8_t intsetGet(intset *is, uint32_t pos, int64_t *value);//获得下标为pos的值并保存在value中
uint32_t intsetLen(intset *is);//返回集合的元素个数
size_t intsetBlobLen(intset *is);//返回集合所占用的字节总量
/* Return the required encoding for the provided value. */
static uint8_t _intsetValueEncoding(int64_t v) { //返回合适v的编码方式
if (v < INT32_MIN || v > INT32_MAX) //如果超出32位所能表示数值的范围则返回INTSET_ENC_INT64
return INTSET_ENC_INT64;
else if (v < INT16_MIN || v > INT16_MAX) //如果超出16位所能表示数值的范围则返回INTSET_ENC_INT32
return INTSET_ENC_INT32;
else
return INTSET_ENC_INT16; //返回用16位表示的INTSET_ENC_INT16
}
/* Return the value at pos, given an encoding. */
static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) { //根据编码方式enc,返回在集合is中下标为pos的元素
int64_t v64;
int32_t v32;
int16_t v16;
if (enc == INTSET_ENC_INT64) { //64位编码
memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64)); //从下标pos开始的内存空间拷贝64bit的数据到v64
memrev64ifbe(&v64); //如果是大端序,就会转换成小端序
return v64;
} else if (enc == INTSET_ENC_INT32) {//32位编码
memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));//从下标pos开始的内存空间拷贝32bit的数据到v32
memrev32ifbe(&v32); //32位大小端转换
return v32;
} else {//16位编码
memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));//从下标pos开始的内存空间拷贝16bit的数据到v16
memrev16ifbe(&v16); //16位大小端转换
return v16;
}
}
/* Return the value at pos, using the configured encoding. */
static int64_t _intsetGet(intset *is, int pos) { //根据集合is设置的编码方式,返回下标为pos的值
return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding));
//intrev32ifbe()函数返回参数的编码格式并且根据需求转换大小端
}
/* Set the value at pos, using the configured encoding. */
static void _intsetSet(intset *is, int pos, int64_t value) { //根据集合is设置的编码方式,设置下标为pos的值为value
uint32_t encoding = intrev32ifbe(is->encoding); //获取集合设置的编码方式
if (encoding == INTSET_ENC_INT64) { //如果是64位
((int64_t*)is->contents)[pos] = value; //设置下标pos的值为value
memrev64ifbe(((int64_t*)is->contents)+pos); //如果需要转换大小端
} else if (encoding == INTSET_ENC_INT32) { //如果是32位
((int32_t*)is->contents)[pos] = value; //设置下标pos的值为value
memrev32ifbe(((int32_t*)is->contents)+pos); //如果需要转换大小端
} else {
((int16_t*)is->contents)[pos] = value; //设置下标pos的值为value
memrev16ifbe(((int16_t*)is->contents)+pos); //如果需要转换大小端
}
}
/* Create an empty intset. */
intset *intsetNew(void) { //创建一个空集合
intset *is = zmalloc(sizeof(intset)); //分配空间
is->encoding = intrev32ifbe(INTSET_ENC_INT16); //设置编码方式
is->length = 0; //集合为空
return is;
}
/* Resize the intset */
static intset *intsetResize(intset *is, uint32_t len) { //调整集合的内存空间大小
uint32_t size = len*intrev32ifbe(is->encoding); //计算数组的大小
is = zrealloc(is,sizeof(intset)+size); //分配空间,如果新空间的大小比原来的空间大,那么数组的元素会被保留
return is;
}
/* Search for the position of "value". Return 1 when the value was found and
* sets "pos" to the position of the value within the intset. Return 0 when
* the value is not present in the intset and sets "pos" to the position
* where "value" can be inserted. */
//找到is集合中值为value的下标,返回1,并保存在pos中,没有找到返回0,并将pos设置为value可以插入到数组的位置
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;
/* The value can never be found when the set is empty */
if (intrev32ifbe(is->length) == 0) { //如果为空集合
if (pos) *pos = 0; //pos设置为0
return 0;
} else {
/* Check for the case where we know we cannot find the value,
* but do know the insert position. */
if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) { //因为数组是有序的,如果value大于数组最大值
if (pos) *pos = intrev32ifbe(is->length); //可以将pos设置为数组末尾
return 0;
} else if (value < _intsetGet(is,0)) { //如果小于数组的最小值
if (pos) *pos = 0; //pos可以是下标为0的位置
return 0;
}
}
while(max >= min) { //有序集合中进行二分查找
mid = ((unsigned int)min + (unsigned int)max) >> 1; //(min+max)/2,找到中间数的下标
cur = _intsetGet(is,mid); //等到下标为mid的值cur
if (value > cur) { //value大于当前值cur
min = mid+1; //后一半找
} else if (value < cur) { //value小于当前值cur
max = mid-1; //前一半找
} else {
break; //找到退出循环
}
}
if (value == cur) { //确认找到
if (pos) *pos = mid; //设置pos为找到的位置,返回1
return 1;
} else {
if (pos) *pos = min; //此时min和max相等,所以pos可以设置为min或max,返回0
return 0;
}
}
/* Upgrades the intset to a larger encoding and inserts the given integer. */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) { //根据value的编码方式,对整数集合is的编码格式升级
uint8_t curenc = intrev32ifbe(is->encoding); //当前集合的编码方式
uint8_t newenc = _intsetValueEncoding(value); //得到value合适的编码方式
int length = intrev32ifbe(is->length); //集合元素数量
int prepend = value < 0 ? 1 : 0; //如果value小于0,则要将value添加到数组最前端,因此为移动1个编码长度
//集合的编码格式要升级,也就是内存增大
//因为 value 的编码比集合原有的其他元素的编码都要大,所以value如果是负数,就是最小值,如果是正数则是最大值
//索引value要么放在数组集合的最前端,要么最后端,根据prepend判断
/* First set new encoding and resize */
is->encoding = intrev32ifbe(newenc); //更新集合is的编码方式
is = intsetResize(is,intrev32ifbe(is->length)+1); //根据新的编码方式重新设置内存空间大小
/* Upgrade back-to-front so we don't overwrite values.
* Note that the "prepend" variable is used to make sure we have an empty
* space at either the beginning or the end of the intset. */
//_intsetGetEncoded()得到下标为length的值
//_intsetSet设置下标为prepend+length的值为_intsetGetEncoded返回的值
//但是,编码格式已经发生改变,数组元素没变但是内存大小改变
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
/* Set the value at the beginning or the end. */
if (prepend) //value是负数,要放在最前端
_intsetSet(is,0,value); //设置下标为0的值为value
else
_intsetSet(is,intrev32ifbe(is->length),value); //value为正数,设置最末尾+1的值为value
is->length = intrev32ifbe(intrev32ifbe(is->length)+1); //数组元素加1
return is;
}
static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) { //向前或向后移动指定下标范围内的数组元素
void *src, *dst;
uint32_t bytes = intrev32ifbe(is->length)-from; //获得要移动的元素的个数
uint32_t encoding = intrev32ifbe(is->encoding); //获得集合is的默认编码方式
if (encoding == INTSET_ENC_INT64) { //判断不同的编码格式
src = (int64_t*)is->contents+from; //获得要被移动范围的起始地址
dst = (int64_t*)is->contents+to; //获得要被移动到的目的地址
bytes *= sizeof(int64_t); //计算要移动多少个字节
} else if (encoding == INTSET_ENC_INT32) {
src = (int32_t*)is->contents+from;
dst = (int32_t*)is->contents+to;
bytes *= sizeof(int32_t);
} else {
src = (int16_t*)is->contents+from;
dst = (int16_t*)is->contents+to;
bytes *= sizeof(int16_t);
}
memmove(dst,src,bytes); //从src开始移动bytes个字节到dst
}
/* Insert an integer in the intset */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {//将value添加到is集合中,如果成功success被设置为1否则为0
uint8_t valenc = _intsetValueEncoding(value); //获得value适合的编码类型
uint32_t pos;
if (success) *success = 1; //设置success默认为1
/* Upgrade encoding if necessary. If we need to upgrade, we know that
* this value should be either appended (if > 0) or prepended (if < 0),
* because it lies outside the range of existing values. */
if (valenc > intrev32ifbe(is->encoding)) { //如果value的编码类型大于集合的编码类型
/* This always succeeds, so we don't need to curry *success. */
return intsetUpgradeAndAdd(is,value); //升级集合,并且将value加入集合,一定成功
} else {
/* Abort if the value is already present in the set.
* This call will populate "pos" with the right position to insert
* the value when it cannot be found. */
if (intsetSearch(is,value,&pos)) { //查找value,若果value已经存在,intsetSearch返回1,如果不存在,pos保存value可以插入的位置
if (success) *success = 0; //value存在,success设置为0
return is;
}
//value在集合中不存在,且pos保存可以插入的位置
is = intsetResize(is,intrev32ifbe(is->length)+1); //调整集合大小
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1); //如果pos不是在数组末尾则要移动调整集合
}
_intsetSet(is,pos,value); //设置pos下标的值为value
is->length = intrev32ifbe(intrev32ifbe(is->length)+1); //集合节点数量加1
return is;
}
/* Delete integer from intset */
intset *intsetRemove(intset *is, int64_t value, int *success) { //从集合中删除value,删除成功success设置为1,失败为0
uint8_t valenc = _intsetValueEncoding(value); //获得value适合的编码类型
uint32_t pos;
if (success) *success = 0; //设置success默认为0
//如果value的编码格式小于集合的编码格式且value在集合中已存在,pos保存着下标
if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
uint32_t len = intrev32ifbe(is->length); //备份当前集合元素数量
/* We know we can delete */
if (success) *success = 1; //删除成功,设置success为1
/* Overwrite value with tail and update length */
if (pos < (len-1)) intsetMoveTail(is,pos+1,pos); //如果不是最后一个元素,则移动元素覆盖掉被删除的元素
is = intsetResize(is,len-1); //缩小大小
is->length = intrev32ifbe(len-1); //更新集合元素个数
}
return is;
}
/* Determine whether a value belongs to this set */
uint8_t intsetFind(intset *is, int64_t value) { //返回1表示value是集合中的元素,否则返回0
uint8_t valenc = _intsetValueEncoding(value); //获得value适合的编码类型
return valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,NULL);
//如果value的编码格式小于集合的编码格式且value在集合中已存在,返回1,其中任何一个不成立返回0
}
/* Return random member */
int64_t intsetRandom(intset *is) { //随机返回一个元素
return _intsetGet(is,rand()%intrev32ifbe(is->length)); //随机生成一个下标,返回该下标的值
}
/* Sets the value to the value at the given position. When this position is
* out of range the function returns 0, when in range it returns 1. */
uint8_t intsetGet(intset *is, uint32_t pos, int64_t *value) { //获得下标为pos的值并保存在value中
if (pos < intrev32ifbe(is->length)) { //如果pos小于数组长度
*value = _intsetGet(is,pos); //返回pos下标的值,保存在value中
return 1;
}
return 0;
}
/* Return intset length */
uint32_t intsetLen(intset *is) { //返回集合的元素个数
return intrev32ifbe(is->length); //返回length成员
}
/* Return intset blob size in bytes. */
size_t intsetBlobLen(intset *is) { //返回集合所占用的字节总量
return sizeof(intset)+intrev32ifbe(is->length)*intrev32ifbe(is->encoding); //编码格式×元素个数+集合大小
}

HeLei Blog

Redis源码剖析和注释(六)--- 压缩列表(ziplist)

发表于 2018-02-05 | 分类于 Redis

1. 跳跃表(skiplist)介绍

定义:跳跃表是一个有序链表,其中每个节点包含不定数量的链接,节点中的第i个链接构成的单向链表跳过含有少于i个链接的节点。

  • 跳跃表支持平均O(logN),最坏O(N)
  • 复杂度的节点查找,大部分情况下,跳跃表的效率可以和平衡树相媲美。
  • 跳跃表在redis中当数据较多时作为有序集合键的实现方式之一。

接下来,还是举个有序集合键的例子:

1
2
3
4
5
6
7
8
9
10
11
127.0.0.1:6379> ZADD score 95.5 Mike 98 Li 96 Wang //socre是一个有序集合键
(integer) 3
127.0.0.1:6379> ZRANGE score 0 -1 WITHSCORES//所有分数按从小到大排列,每一个成员都保存了一个分数
1) "Mike"
2) "95.5"
3) "Wang"
4) "96"
5) "Li"
6) "98"
127.0.0.1:6379> ZSCORE score Mike //查询Mike的分值
"95.5"

2. 跳跃表的实现

redis 3.0版本将跳跃表定义在redis.h文件中,而3.2版本定义在server.h文件中

1
2
3
4
5
6
7
8
9
10
跳跃表节点 zskiplistNode
typedef struct zskiplistNode {
robj *obj; //保存成员对象的地址
double score; //分值
struct zskiplistNode *backward; //后退指针
struct zskiplistLevel {
struct zskiplistNode *forward; //前进指针
unsigned int span; //跨度
} level[]; //层级,柔型数组
} zskiplistNode;

1
2
3
4
5
6
跳跃表表头 zskiplist(记录跳跃表信息)
typedef struct zskiplist {
struct zskiplistNode *header, *tail;//header指向跳跃表的表头节点,tail指向跳跃表的表尾节点
unsigned long length; //跳跃表的长度或跳跃表节点数量计数器,除去第一个节点
int level; //跳跃表中节点的最大层数,除了第一个节点
} zskiplist;

3. 幂次定律

在redis中,返回一个随机层数值,随机算法所使用的幂次定律。

  • 含义是:如果某件事的发生频率和它的某个属性成幂关系,那么这个频率就可以称之为符合幂次定律。
  • 表现是:少数几个事件的发生频率占了整个发生频率的大部分, 而其余的大多数事件只占整个发生频率的一个小部分。

在文件t_set.c中,zslRandomLevel函数的定义为:

1
2
3
4
5
6
7
8
9
10
int zslRandomLevel(void) { //返回一个随机层数值
int level = 1;
//(random()&0xFFFF)只保留低两个字节的位值,其他高位全部清零,所以该值范围为0到0xFFFF
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) //ZSKIPLIST_P(0.25)所以level+1的概率为0.25
level += 1; //返回一个1到ZSKIPLIST_MAXLEVEL(32)之间的值
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

算法性能分析:

层数至少为1,所以层数恰好等于1(不执行while循环体)的概率为 1−p.

  • 层数恰好等于2的概率为 p(1−p)(执行1次while循环体)。
  • 层数恰好等于3的概率为 p^2(1−p)(执行2次while循环体)。
  • 层数恰好等于4的概率为 p^3(1−p)(执行3次while循环体)。
  • 层数恰好等于k的概率为 p^k−1(1−p)(执行k-1次while循环体)。(k <= ZSKIPLIST_MAXLEVEL)
    因此,一个节点的平均层数,或平均指针数为:

​ 1×(1−p)+2p(1−p)+3p^2(1−p)+…+kp^(k−1)(1−p)
​ =1/(1−p)

因此,
当 p = 1/2 时,每个节点的平均指针为2;
当 p = 1/4 时,每个节点的平均指针为1.33;
而redis的概率 ZSKIPLIST_P 取值就为0.25,所以跳跃表的指针开销为1.33

4. 跳跃表与哈希表和平衡树的比较

跳跃表和平衡树的元素都是有序排列,而哈希表不是有序的。因此在哈希表上的查找只能是单个key的查找,不适合做范围查找。

  • 跳跃表和平衡树做范围查找时,跳跃表算法简单,实现方便,而平衡树逻辑复杂。
  • 查找单个key,跳跃表和平衡树的平均时间复杂度都为O(logN),而哈希表的时间复杂度为O(1)。
  • 跳跃表平均每个节点包含1.33个指针,而平衡树每个节点包含2个指针,更加节约内存。

因此,在redis中实现有序集合的办法是:跳跃表+哈希表

  • 跳跃表元素有序,而且可以范围查找,且比平衡树简单。
  • 哈希表查找单个key时间复杂度性能高。

5 跳跃表基本操作

redis关于跳跃表的API都定义在t_zset.c文件中。

5.1 创建跳跃表 zslCreate()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
zskiplist *zslCreate(void) { //创建返回一个跳跃表 表头zskiplist
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl)); //分配空间
zsl->level = 1; //设置默认层数
zsl->length = 0; //设置跳跃表长度
//创建一个层数为32,分数为0,没有obj的跳跃表头节点
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
//跳跃表头节点初始化
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL; //将跳跃表头节点的所有前进指针forward设置为NULL
zsl->header->level[j].span = 0; //将跳跃表头节点的所有跨度span设置为0
}
zsl->header->backward = NULL; //跳跃表头节点的后退指针backward置为NULL
zsl->tail = NULL; //表头指向跳跃表尾节点的指针置为NULL
return zsl;
}

5.2 插入节点 zslInsert()

//创建一个节点,分数为score,对象为obj,插入到zsl表头管理的跳跃表中,并返回新节点的地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
redisAssert(!isnan(score));
x = zsl->header; //获取跳跃表头结点地址,从头节点开始一层一层遍历
for (i = zsl->level-1; i >= 0; i--) { //遍历头节点的每个level,从下标最大层减1到0
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; //更新rank[i]为i+1所跨越的节点数,但是最外一层为0
//这个while循环是查找的过程,沿着x指针遍历跳跃表,满足以下条件则要继续在当层往前走
while (x->level[i].forward && //当前层的前进指针不为空且
(x->level[i].forward->score < score || //当前的要插入的score大于当前层的score或
(x->level[i].forward->score == score && //当前score等于要插入的score且
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {//当前层的对象与要插入的obj不等
rank[i] += x->level[i].span; //记录该层一共跨越了多少节点 加上 上一层遍历所跨越的节点数
x = x->level[i].forward; //指向下一个节点
}
//while循环跳出时,用update[i]记录第i层所遍历到的最后一个节点,遍历到i=0时,就要在该节点后要插入节点
update[i] = x;
}
/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not.
* zslInsert() 的调用者会确保同分值且同成员的元素不会出现,
* 所以这里不需要进一步进行检查,可以直接创建新元素。
*/
level = zslRandomLevel(); //获得一个随机的层数
if (level > zsl->level) { //如果大于当前所有节点最大的层数时
for (i = zsl->level; i < level; i++) {
rank[i] = 0; //将大于等于原来zsl->level层以上的rank[]设置为0
update[i] = zsl->header; //将大于等于原来zsl->level层以上update[i]指向头结点
update[i]->level[i].span = zsl->length; //update[i]已经指向头结点,将第i层的跨度设置为length
//length代表跳跃表的节点数量
}
zsl->level = level; //更新表中的最大成数值
}
x = zslCreateNode(level,score,obj); //创建一个节点
for (i = 0; i < level; i++) { //遍历每一层
x->level[i].forward = update[i]->level[i].forward; //设置新节点的前进指针为查找时(while循环)每一层最后一个节点的的前进指针
update[i]->level[i].forward = x;//再把查找时每层的最后一个节点的前进指针设置为新创建的节点地址
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); //更新插入节点的跨度值
update[i]->level[i].span = (rank[0] - rank[i]) + 1; //更新插入节点前一个节点的跨度值
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) { //如果插入节点的level小于原来的zsl->level才会执行
update[i]->level[i].span++; //因为高度没有达到这些层,所以只需将查找时每层最后一个节点的值的跨度加1
}
//设置插入节点的后退指针,就是查找时最下层的最后一个节点,该节点的地址记录在update[0]中
//如果插入在第二个节点,也就是头结点后的位置就将后退指针设置为NULL
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward) //如果x节点不是最尾部的节点
x->level[0].forward->backward = x; //就将x节点后面的节点的后退节点设置成为x地址
else
zsl->tail = x; //否则更新表头的tail指针,指向最尾部的节点x
zsl->length++; //跳跃表节点计数器加1
return x; //返回x地址
}

4.3 删除节点

//被zslDelete, zslDeleteByScore and zslDeleteByRank使用的内部函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { //删除节点
int i;
//设置前进指针和跨度
for (i = 0; i < zsl->level; i++) { //遍历下标为0到跳跃表最大层数-1的层
if (update[i]->level[i].forward == x) { //如果找到该节点
update[i]->level[i].span += x->level[i].span - 1; //将前一个节点的跨度减1
update[i]->level[i].forward = x->level[i].forward;
//前一个节点的前进指针指向被删除的节点的后一个节点,跳过该节点
} else {
update[i]->level[i].span -= 1; //在第i层没找到,只将该层的最后一个节点的跨度减1
}
}
//设置后退指针
if (x->level[0].forward) { //如果被删除的前进节点不为空,后面还有节点
x->level[0].forward->backward = x->backward; //就将后面节点的后退指针指向被删除节点x的回退指针
} else {
zsl->tail = x->backward; //否则直接将被删除的x节点的后退节点设置为表头的tail指针
}
//更新跳跃表最大层数
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--; //节点计数器减1
}

4.4 获取节点排名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { //查找score和o对象在跳跃表中的排位
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header; //遍历头结点的每一层
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score || //只要分值还小于给定的score或者
(x->level[i].forward->score == score && //分值相等但是对象小于给定对象o
compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span; //更新排位值
x = x->level[i].forward; //指向下一个节点
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
//确保在第i层找到分值相同,且对象相同时才会返回排位值
if (x->obj && equalStringObjects(x->obj,o)) {
return rank;
}
}
return 0; //没找到
}

4.5 区间操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) { //返回第一个分数在range范围内的节点
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,range)) return NULL; //如果不在范围内,则返回NULL,确保至少有一个节点符号range
//判断下限
x = zsl->header;//遍历跳跃表
for (i = zsl->level-1; i >= 0; i--) {//遍历每一层
/* Go forward while *OUT* of range. */
while (x->level[i].forward && //如果该层有下一个节点且
!zslValueGteMin(x->level[i].forward->score,range))//当前节点的score还小于(小于等于)range的min
x = x->level[i].forward; //继续指向下一个节点
}
/* This is an inner range, so the next node cannot be NULL. */
x = x->level[0].forward; //找到目标节点
redisAssert(x != NULL); //保证能找到
/* Check if score <= max. */
//判断上限
if (!zslValueLteMax(x->score,range)) return NULL; //该节点的分值如果比max还要大,就返回NULL
return x;
}
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {//返回最后一个分数在range范围内的节点
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,range)) return NULL; //如果不在范围内,则返回NULL,确保至少有一个节点符号range
//判断上限
x = zsl->header;//遍历跳跃表
for (i = zsl->level-1; i >= 0; i--) { //遍历每一层
/* Go forward while *IN* range. */
while (x->level[i].forward && //如果该层有下一个节点且
zslValueLteMax(x->level[i].forward->score,range))//当前节点的score小于(小于等于)max
x = x->level[i].forward; //继续指向下一个节点
}
/* This is an inner range, so this node cannot be NULL. */
redisAssert(x != NULL);//保证能找到
/* Check if score >= min. */
//判断下限
if (!zslValueGteMin(x->score,range)) return NULL; //如果找到的节点的分值比range的min还要小
return x;
}
HeLei Blog

Redis源码剖析和注释(三)---字典结构

发表于 2018-02-04 | 分类于 Redis

1. 介绍

字典又称为符号表(symbol table)、关联数组(associative array)或映射(map),是一种用于保存键值对(key-value pair)的抽象数据结构。例如:redis中的所有key到value的映射,就是通过字典结构维护,还有hash类型的键值。

通过redis中的命令感受一下哈希键。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
127.0.0.1:6379> HSET user name Mike
(integer) 1
127.0.0.1:6379> HSET user passwd 123456
(integer) 1
127.0.0.1:6379> HSET user sex male
(integer) 1
127.0.0.1:6379> HLEN user //user就是一个包含3个键值对的哈希键
(integer) 3
127.0.0.1:6379> HGETALL user
1) "name"
2) "Mike"
3) "passwd"
4) "123456"
5) "sex"
6) "male"

user键在底层实现就是一个字典,字典包含3个键值对。

2. 字典的实现

redis的字典是由哈希表实现的,一个哈希表有多个节点,每个节点保存一个键值对。

2.1 哈希表

redis中哈希表定义在dict.h/dictht中。

1
2
3
4
5
6
typedef struct dictht { //哈希表
dictEntry **table; //存放一个数组的地址,数组存放着哈希表节点dictEntry的地址。
unsigned long size; //哈希表table的大小,初始化大小为4
unsigned long sizemask; //用于将哈希值映射到table的位置索引。它的值总是等于(size-1)。
unsigned long used; //记录哈希表已有的节点(键值对)数量。
} dictht;

2.2 哈希表节点

哈希表的table指向的数组存放这dictEntry类型的地址。也定义在dict.h/dictEntryt中。

1
2
3
4
5
6
7
8
9
10
ypedef struct dictEntry {
void *key; //key
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v; //value
struct dictEntry *next; //指向下一个hash节点,用来解决hash键冲突(collision)
} dictEntry;

下图展现的就是通过链接法(chaining)来解决冲突的方法。
[url01]

2.3 字典

字典结构定义在dict.h/dict中。

1
2
3
4
5
6
7
typedef struct dict {
dictType *type; //指向dictType结构,dictType结构中包含自定义的函数,这些函数使得key和value能够存储任何类型的数据。
void *privdata; //私有数据,保存着dictType结构中函数的参数。
dictht ht[2]; //两张哈希表。
long rehashidx; //rehash的标记,rehashidx==-1,表示没在进行rehash
int iterators; //正在迭代的迭代器数量
} dict;

dictType类型保存着 操作字典不同类型key和value的方法 的指针。

1
2
3
4
5
6
7
8
typedef struct dictType {
unsigned int (*hashFunction)(const void *key); //计算hash值的函数
void *(*keyDup)(void *privdata, const void *key); //复制key的函数
void *(*valDup)(void *privdata, const void *obj); //复制value的函数
int (*keyCompare)(void *privdata, const void *key1, const void *key2); //比较key的函数
void (*keyDestructor)(void *privdata, void *key); //销毁key的析构函数
void (*valDestructor)(void *privdata, void *obj); //销毁val的析构函数
} dictType;

下图展现的就是刚才命令user哈希键所展现的内部结构:
[url02]

3. 哈希算法

Thomas Wang认为好的hash函数具有两个好的特点:

  • hash函数是可逆的。
  • 具有雪崩效应,意思是,输入值1bit位的变化会造成输出值1/2的bit位发生变化

3.1 计算int整型哈希值的哈希函数

1
2
3
4
5
6
7
8
9
10
unsigned int dictIntHashFunction(unsigned int key) //用于计算int整型哈希值的哈希函数
{
key += ~(key << 15);
key ^= (key >> 10);
key += (key << 3);
key ^= (key >> 6);
key += ~(key << 11);
key ^= (key >> 16);
return key;
}

3.2 MurmurHash2哈希算法

当字典被用作数据库的底层实现,或者哈希键的底层实现时,redis用MurmurHash2算法来计算哈希值,能产生32-bit或64-bit哈希值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
unsigned int dictGenHashFunction(const void *key, int len) { //用于计算字符串的哈希值的哈希函数
/* 'm' and 'r' are mixing constants generated offline.
They're not really 'magic', they just happen to work well. */
//m和r这两个值用于计算哈希值,只是因为效果好。
uint32_t seed = dict_hash_function_seed;
const uint32_t m = 0x5bd1e995;
const int r = 24;
/* Initialize the hash to a 'random' value */
uint32_t h = seed ^ len; //初始化
/* Mix 4 bytes at a time into the hash */
const unsigned char *data = (const unsigned char *)key;
//将字符串key每四个一组看成uint32_t类型,进行运算的到h
while(len >= 4) {
uint32_t k = *(uint32_t*)data;
k *= m;
k ^= k >> r;
k *= m;
h *= m;
h ^= k;
data += 4;
len -= 4;
}
/* Handle the last few bytes of the input array */
switch(len) {
case 3: h ^= data[2] << 16;
case 2: h ^= data[1] << 8;
case 1: h ^= data[0]; h *= m;
};
/* Do a few final mixes of the hash to ensure the last few
* bytes are well-incorporated. */
h ^= h >> 13;
h *= m;
h ^= h >> 15;
return (unsigned int)h;
}

3.3 djb哈希算法

djb哈希算法,算法的思想是利用字符串中的ascii码值与一个随机seed,通过len次变换,得到最后的hash值。

1
2
3
4
5
6
7
unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len) { //用于计算字符串的哈希值的哈希函数
unsigned int hash = (unsigned int)dict_hash_function_seed;
while (len--)
hash = ((hash << 5) + hash) + (tolower(*buf++)); /* hash * 33 + c */
return hash;
}

4. rehash

当哈希表的大小不能满足需求,就可能会有两个或者以上数量的键被分配到了哈希表数组上的同一个索引上,于是就发生冲突(collision),在Redis中解决冲突的办法是链接法(separate chaining)。但是需要尽可能避免冲突,希望哈希表的负载因子(load factor),维持在一个合理的范围之内,就需要对哈希表进行扩展或收缩。

Redis对哈希表的rehash操作步骤如下:

  • 扩展或收缩
    • 扩展:ht[1]的大小为第一个大于等于ht[0].used * 2的 2n 。
    • 收缩:ht[1]的大小为第一个大于等于ht[0].used的 2n 。
  • 将所有的ht[0]上的节点rehash到ht[1]上。
  • 释放ht[0],将ht[1]设置为第0号表,并创建新的ht[1]。

源码:

  • 扩展操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    static int _dictExpandIfNeeded(dict *d) //扩展d字典,并初始化
    {
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK; //正在进行rehash,直接返回
    /* If the hash table is empty expand it to the initial size. */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE); //如果字典(的 0 号哈希表)为空,那么创建并返回初始化大小的 0 号哈希表
    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
    * table (global setting) or we should avoid it but the ratio between
    * elements/buckets is over the "safe" threshold, we resize doubling
    * the number of buckets. */
    //1. 字典已使用节点数和字典大小之间的比率接近 1:1
    //2. 能够扩展的标志为真
    //3. 已使用节点数和字典大小之间的比率超过 dict_force_resize_ratio
    if (d->ht[0].used >= d->ht[0].size && (dict_can_resize ||
    d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
    return dictExpand(d, d->ht[0].used*2); //扩展为节点个数的2倍
    }
    return DICT_OK;
    }
  • 收缩操作:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    int dictResize(dict *d) //缩小字典d
    {
    int minimal;
    //如果dict_can_resize被设置成0,表示不能进行rehash,或正在进行rehash,返回出错标志DICT_ERR
    if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
    minimal = d->ht[0].used; //获得已经有的节点数量作为最小限度minimal
    if (minimal < DICT_HT_INITIAL_SIZE)//但是minimal不能小于最低值DICT_HT_INITIAL_SIZE(4)
    minimal = DICT_HT_INITIAL_SIZE;
    return dictExpand(d, minimal); //用minimal调整字典d的大小
    }

扩展和收缩操作都调用了dictExpand()函数,该函数通过计算传入的第二个大小参数进行计算,算出一个最接近2n的realsize,然后进行扩展或收缩,dictExpand()函数源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int dictExpand(dict *d, unsigned long size) //根据size调整或创建字典d的哈希表
{
dictht n; /* the new hash table */
unsigned long realsize = _dictNextPower(size); //获得一个最接近2^n的realsize
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht[0].used > size) //正在rehash或size不够大返回出错标志
return DICT_ERR;
/* Rehashing to the same table size is not useful. */
if (realsize == d->ht[0].size) return DICT_ERR; //如果新的realsize和原本的size一样则返回出错标志
/* Allocate the new hash table and initialize all pointers to NULL */
//初始化新的哈希表的成员
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht[0].table == NULL) { //如果ht[0]哈希表为空,则将新的哈希表n设置为ht[0]
d->ht[0] = n;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
d->ht[1] = n; //如果ht[0]非空,则需要rehash
d->rehashidx = 0; //设置rehash标志位为0,开始渐进式rehash(incremental rehashing)
return DICT_OK;
}

收缩或者扩展哈希表需要将ht[0]表中的所有键全部rehash到ht[1]中,但是rehash操作不是一次性、集中式完成的,而是分多次,渐进式,断续进行的,这样才不会对服务器性能造成影响。因此下面介绍渐进式rehash。

5. 渐进式rehash(incremental rehashing)

渐进式rehash的关键:

  1. 字典结构dict中的一个成员rehashidx,当rehashidx为-1时表示不进行rehash,当rehashidx值为0时,表示开始进行rehash。
  2. 在rehash期间,每次对字典的添加、删除、查找、或更新操作时,都会判断是否正在进行rehash操作,如果是,则顺带进行单步rehash,并将rehashidx+1。
  3. 当rehash时进行完成时,将rehashidx置为-1,表示完成rehash。

源码在此:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static void _dictRehashStep(dict *d) { //单步rehash
if (d->iterators == 0) dictRehash(d,1); //当迭代器数量不为0,才能进行1步rehash
}
int dictRehash(dict *d, int n) { //n步进行rehash
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0; //只有rehashidx不等于-1时,才表示正在进行rehash,否则返回0
while(n-- && d->ht[0].used != 0) { //分n步,而且ht[0]上还有没有移动的节点
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
//确保rehashidx没有越界,因为rehashidx是从-1开始,0表示已经移动1个节点,它总是小于hash表的size的
assert(d->ht[0].size > (unsigned long)d->rehashidx);
//第一个循环用来更新 rehashidx 的值,因为有些桶为空,所以 rehashidx并非每次都比原来前进一个位置,而是有可能前进几个位置,但最多不超过 10。
//将rehashidx移动到ht[0]有节点的下标,也就是table[d->rehashidx]非空
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx]; //ht[0]下标为rehashidx有节点,得到该节点的地址
/* Move all the keys in this bucket from the old to the new hash HT */
//第二个循环用来将ht[0]表中每次找到的非空桶中的链表(或者就是单个节点)拷贝到ht[1]中
while(de) {
unsigned int h;
nextde = de->next; //备份下一个节点的地址
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask; //获得计算哈希值并得到哈希表中的下标h
//将该节点插入到下标为h的位置
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
//更新两个表节点数目计数器
d->ht[0].used--;
d->ht[1].used++;
//将de指向以一个处理的节点
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL; //迁移过后将该下标的指针置为空
d->rehashidx++; //更新rehashidx
}
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) { //ht[0]上已经没有节点了,说明已经迁移完成
zfree(d->ht[0].table); //释放hash表内存
d->ht[0] = d->ht[1]; //将迁移过的1号哈希表设置为0号哈希表
_dictReset(&d->ht[1]); //重置ht[1]哈希表
d->rehashidx = -1; //rehash标志关闭
return 0; //表示前已完成
}
/* More to rehash... */
return 1; //表示还有节点等待迁移
}

6. 迭代器

redis在字典结构也定义了迭代器

1
2
3
4
5
6
7
8
typedef struct dictIterator {
dict *d; //被迭代的字典
long index; //迭代器当前所指向的哈希表索引位置
int table, safe; //table表示正迭代的哈希表号码,ht[0]或ht[1]。safe表示这个迭代器是否安全。
dictEntry *entry, *nextEntry; //entry指向当前迭代的哈希表节点,nextEntry则指向当前节点的下一个节点。
/* unsafe iterator fingerprint for misuse detection. */
long long fingerprint; //避免不安全迭代器的指纹标记
} dictIterator;

迭代器分为安全迭代器和不安全迭代器:

非安全迭代器只能进行Get等读的操作, 而安全迭代器则可以进行iterator支持的任何操作。
由于dict结构中保存了safe iterators的数量,如果数量不为0, 是不能进行下一步的rehash的; 因此安全迭代器的存在保证了遍历数据的准确性。
在非安全迭代器的迭代过程中, 会通过fingerprint方法来校验iterator在初始化与释放时字典的hash值是否一致; 如果不一致说明迭代过程中发生了非法操作.

1…345…8
He Lei

He Lei

c/c++/python | redis | recommend algorithm | search engine

75 日志
16 分类
3 标签
GitHub Weibo
© 2018 He Lei
由 Hexo 强力驱动
主题 - NexT.Pisces