澳门金沙vip 1

详解Redis用链表实现消息队列

前言

原创作品,转载请标明:

列表类型简介

列表(list)用于存储多个有序的字符串。列表是一种比较灵活的数据结构,可以充当栈和队列的角色,在实际开发上有很多应用场景

列表的特点:

  1. 列表中的元素是有序的,可以通过索引下标来获取某个元素或者某个范围内的元素列表
  2. 列表中的元素是可以重复的

Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换。个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样。

今天为大家带来Redis五大数据类型之一 – List的源码分析。

命令

链表实现消息队列


添加操作

rpush key value [value …] 从右边插入元素

127.0.0.1:6379> rpush testlist a b c
(integer) 3
127.0.0.1:6379> lrange testlist 0 -1
1) "a"
2) "b"
3) "c"

lpush key value [value …] 从左边插入元素

linsert key before|after pivot value 向某个元素前或后插入元素

从列表中找到等于pivot的元素,在其前|后插入value

127.0.0.1:6379> linsert testlist before b python
(integer) 4
127.0.0.1:6379> lrange testlist 0 -1
1) "a"
2) "python"
3) "b"
4) "c"

Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型。可以利用lpush和rpop来实现。但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源,好在Redis提供一种阻塞版pop命令brpop或者blpop,用法为brpop/blpop list timeout,
当链表为空的时候,brpop/blpop将阻塞,直到设置超时时间到或者list插入一个元素。

Redis中的List类型是一种双向链表结构,主要支持以下几种命令:

查找

lrange key start end 获取指定范围内的元素列表
lrange会获取列表指定索引范围所有的元素
索引下标有两个特点:

  1. 索引下标从左到右分别是0到N-1,从右到左分别是-1到-N
  2. lrange中的end选项包含了自身

获取列表第2到第4个元素:

127.0.0.1:6379> lrange testlist 1 3
1) "python"
2) "b"
3) "c"

lindex key index 获取列表指定索引下标的元素

127.0.0.1:6379> lindex testlist 1
"python"

llen key 获取列表长度
(integer) 4

用法如下:

  1. lpush、rpush、lpushx、rpushx
  2. lpop、rpop、lrange、ltrim、lrem、rpoplpush
  3. linsert、llen、lindex、lset
  4. 澳门金沙vip,blpop、brpop、brpoplpush

删除

lpop key 从列表左侧弹出元素

127.0.0.1:6379> lpop testlist
"a"

rpop key 从列表右侧弹出元素

lrem key count value 删除指定元素
lrem从从列表中找到等于value的元素进行删除,根据count的不同分为三种情况:

  1. count>0 从左到右,删除最多count个元素
  2. count<0 从右到左,删除最多count绝对值个元素
  3. count=0 删除所有

127.0.0.1:6379> lpush testlist a a a a
(integer) 7
127.0.0.1:6379> lrange testlist 0 -1
1) "a"
2) "a"
3) "a"
4) "a"
5) "python"
6) "b"
7) "c"
127.0.0.1:6379> lrem testlist 4 a
(integer) 4
127.0.0.1:6379> lrange testlist 0 -1
1) "python"
2) "b"
3) "c"

ltrim key start end 按照索引范围修剪列表

127.0.0.1:6379> del testlist
(integer) 1
127.0.0.1:6379> lpush testlist a b c d e f g
(integer) 7
127.0.0.1:6379> ltrim testlist 1 3
OK
127.0.0.1:6379> lrange testlist 0 -1
1) "f"
2) "e"
3) "d"
charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli127.0.0.1:6379 lpush list hello(integer) 1127.0.0.1:6379 brpop list 01) "list"2) "hello"127.0.0.1:6379 brpop list 0//阻塞在这里/* ---------------------------------------------------- *///当我在另一个客户端lpush一个元素之后,客户端输出为127.0.0.1:6379 brpop list 01) "list"2) "world"(50.60s)//阻塞的时间

List的相关操作主要定义在t_list.c和redis.h文件中。归纳起来,主要有以下几个要点:

修改

lset key index newValue 修改指定索引下标的元素

127.0.0.1:6379> lindex testlist 2
"d"
127.0.0.1:6379> lset testlist 2 python
OK
127.0.0.1:6379> lindex testlist 2
"python"

当链表为空的时候,brpop是阻塞的,等待超时时间到或者另一个客户端lpush一个元素。接下来,看下源码是如何实现阻塞brpop命令的。要实现客户端阻塞,只需要服务器不给客户端发送消息,那么客户端就会阻塞在read调用中,等待消息到达。这是很好实现的,关键是如何判断这个客户端阻塞的链表有数据到达以及通知客户端解除阻塞?Redis的做法是,将阻塞的键以及阻塞在这个键上的客户端链表存储在一个字典中,然后每当向数据库插入一个链表时,就判断这个新插入的链表是否有客户端阻塞,有的话,就解除这个阻塞的客户端,并且发送刚插入链表元素给客户端,客户端就这样解除阻塞。

1、编码方式

在前面一篇文章中我们介绍过List类型主要有两种编码方式:REDIS_ENCODING_ZIPLIST和REDIS_ENCODING_LINKEDLIST。其中REDIS_ENCODING_ZIPLIST编码使用的是压缩列表ziplist,REDIS_ENCODING_LINKEDLIST编码使用的是双向链表list(为了便于区分,我们把它称之为linked
list
)。默认情况下List使用REDIS_ENCODING_ZIPLIST编码,当满足下面两个条件之一时会转变为REDIS_ENCODING_LINKEDLIST编码:

  1. 当待添加的新字符串长度超过server.list_max_ziplist_value
    (默认值为64)时。
  2. ziplist中保存的节点数量超过server.list_max_ziplist_entries(默认值为512)时。

既然List类型有两种底层结构,那么显然t_list.c的主要功能之一就是要在ziplist和linked
list这两种结构上维护一份统一的List操作接口,以屏蔽底层的差异。

例如我们来看一下listTypePush的源码:

void listTypePush(robj *subject, robj *value, int where) {
    /* Check if we need to convert the ziplist */
    // 检查是否需要转换编码(REDIS_ENCODING_ZIPLIST => REDIS_ENCODING_LINKEDLIST)
    listTypeTryConversion(subject,value);
    if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
        // list_max_ziplist_entries的默认值为512,如果ziplist中存放的节点数超过该值也需要转换编码
        ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
            listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);

    // 分别处理以ziplist和linked list编码的情况
    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
        /* 处理底层结构为ziplist的情况 */

        // 确定新元素是插入到头部还是尾部
        int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
        value = getDecodedObject(value);
        // 直接调用ziplist的内部函数实现插入操作
        subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
        decrRefCount(value);
    } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
        /* 下面处理底层结构为linked list的情况 */
        if (where == REDIS_HEAD) {
            listAddNodeHead(subject->ptr,value);
        } else {
            listAddNodeTail(subject->ptr,value);
        }
        incrRefCount(value);
    } else {
        redisPanic("Unknown list encoding");
    }
}

listTypePush函数的作用是往List类型对象中添加一个元素,其中参数where用于指定添加到表头还是表尾。listTypePush的执行流程如下:

  1. 判断新添加的值的长度是否超过server.list_max_ziplist_value,如果超过则需要转换编码方式。
  2. 如果List的当前编码为REDIS_ENCODING_ZIPLIST方式,判断其保存的节点数量是否超过server.list_max_ziplist_entries,如果超过则需要编码转换。
  3. 判断List的当前编码,如果是REDIS_ENCODING_ZIPLIST,则调用ziplist的内部函数来实现添加操作。如果是REDIS_ENCODING_LINKEDLIST则调用linked
    list的内部函数实现添加操作。

我们可以看到List的操作基本上就是通过当前使用的底层数据结构来完成的,这些数据结构的基本操作我们以前就分析过,这里就不一一赘述了。

除了上面介绍的listTypePush操作,List还有listTypePop、listTypeLength、listTypeInsert、listTypeEqual、listTypeDelete、listTypeConvert等操作,这些操作的实现和listTypePush类似都是通过底层数据结构来实现,代码简单、直观,大家可以类比学习。

阻塞操作

blpop key [key …] timeout
brpop key [key …] timeout

timeout参数:阻塞时间(秒)

blpop 和 brpop 是 lpop 和 rpop 的阻塞版本

  1. 列表为空,如果timeout=3,那么客户端要等到3秒后返回,如果timeout=0,
    那么客户端一直阻塞等下去:

127.0.0.1:6379> brpop list:test 3
(nil)
(3.07s)
127.0.0.1:6379> brpop list:test 0
... 

因为列表为空,brpop会一直阻塞着等待元素的进入

此时新开一个redis-cli,新增元素入列表

127.0.0.1:6379> lpush list:test 1
(integer) 1

brpop立即返回元素:

127.0.0.1:6379> brpop list:test 0
1) "list:test"
2) "1"
(59.07s)
  1. 列表不为空,客户端立即返回结果

127.0.0.1:6379> lpush list:test 2
(integer) 1
127.0.0.1:6379> brpop list:test 1
1) "list:test"
2) "2"

注意:

  1. 如果有多个键,那么brpop会从左到右遍历键,一旦有一个键能弹出元素,客户端会立即返回
  2. 如果多个客户端对同一个键执行brpop,那么最先执行brpop命令的客户端可以获取到弹出的值

先看下有关数据结构,以及server和client有关属性

2、迭代器实现

Redis为List类型封装了一个简单的迭代器结构体,定义在redis.h文件中:

/* List类型迭代器结构体 */
typedef struct {
    // 原listType对象
    robj *subject;
    // 编码方式
    unsigned char encoding;
    // 迭代方向
    unsigned char direction; /* Iteration direction */
    // ziplist迭代器
    unsigned char *zi;
    // linked list迭代器
    listNode *ln;
} listTypeIterator;

同时还定义了迭代器节点:

/* List类型节点定义 */
typedef struct {
    listTypeIterator *li;
    unsigned char *zi;  /* Entry in ziplist */
    listNode *ln;       /* Entry in linked list */
} listTypeEntry;

实际上,这listTypeIterator就是将ziplist和linke
list的迭代器包装在一起来进一步屏蔽着两种编码方式的区别。与迭代器相关的操作主要有以下几个:

// 创建并返回一个列表迭代器
listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction);
// 释放listType的迭代器
void listTypeReleaseIterator(listTypeIterator *li);
// 迭代到下一个节点
int listTypeNext(listTypeIterator *li, listTypeEntry *entry);
// 返回当前listTypeEntry结构所保存的节点
robj *listTypeGet(listTypeEntry *entry);

时间复杂度

列表的时间复杂度

//阻塞状态typedef struct blockingState { /* Generic fields. */ mstime_t timeout; /* 超时时间 */ /* REDIS_BLOCK_LIST */ dict *keys; /* The keys we are waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ robj *target; /* The key that should receive the element, * for BRPOPLPUSH. */ /* REDIS_BLOCK_WAIT */ int numreplicas; /* Number of replicas we are waiting for ACK. */ long long reploffset; /* Replication offset to reach. */} blockingState;//继续列表typedef struct readyList { redisDb *db;//就绪键所在的数据库 robj *key;//就绪键} readyList;//客户端有关属性typedef struct redisClient { int btype; /* Type of blocking op if REDIS_BLOCKED. */ blockingState bpop; /* blocking state */}//服务器有关属性struct redisServer { /* Blocked clients */ unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */ list *unblocked_clients; /* list of clients to unblock before next loop */ list *ready_keys; /* List of readyList structures for BLPOP & co */}//数据库有关属性typedef struct redisDb { //keys-redisCLient映射 dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ dict *ready_keys; /* Blocked keys that received a PUSH */}redisDB

3、阻塞操作

这是需要重点理解的地方!

Redis中有三个阻塞命令blpop、brpop和brpoplpush,这些命令可能会造成客户端被阻塞。接下来我们以blpop命令为例子讲解一下阻塞版的lpop命令是如何运行的。

(1)、如果用户执行BLPOP命令,且指定List不为空,那么程序就直接调用非阻塞的LPOP命令(所以blpop、brpop和brpoplpush只是有可能造成客户端阻塞)。
(2)、如果用户执行BLPOP命令,且指定List为空,这时需要阻塞操作。Redis将相应客户端的状态设置为“阻塞”状态,同时将该客户端添加到db->blocking_keys中。db->blocking_keys是一个字典结构,它的key为被阻塞的键,它的value是一个保存被阻塞客户端的列表。我们暂且把该过程称之为“阻塞过程”。
(3)、随后如果有PUSH命令往被阻塞的键中添加元素时,Redis将这个键标识为ready状态。当这个命令执行完毕后,Redis会按照先阻塞先服务的顺序将列表的元素返回给被阻塞的客户端,并且解除阻塞状态的客户端数量取决于PUSH命令添加的元素个数。我们暂且把该过程称作为“解除阻塞过程”。

下面我们详细讲解一下“阻塞过程”和“解除阻塞过程”的运行过程。

应用场景

必须对上述的数据结构足够了解,否则很难看懂下面的代码,因为这些代码需要操作上述的数据结构。先从brpop命令执行函数开始分析,brpop命令执行函数为

3.1、阻塞过程

阻塞操作是由blockForKeys函数完成的,函数原型如下:

void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target)

blockForKeys函数用于设置客户端对指定键的阻塞状态。参数keys可以指定任意数量的键,timeout指定超时时间,参数target即目标List对象,主要用于brpoplpush命令,用户存放从源列表中pop出来的值。
该函数完成了以下步骤:

(1)、设置阻塞超时时间timeout和目标选项target。
(2)、将客户端信息记录在在c->db->blocking_keys结构中。前面我们说过b->blocking_keys是一个字典结构,它的key为被阻塞的键,它的value是一个保存被阻塞客户端的列表。我们看到blocking_keys定义在redisClient->redisDb结构中,为了方便观察,我省略了其它无关代码:

typedef struct redisDb {
    ...
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
    ...
} redisDb;

所以整个结构是这样的:

澳门金沙vip 1

(3)、将客户端设置为“阻塞”状态。

blockForKeys的源码如下:

/* Set a client in blocking mode for the specified key, with the specified
 * timeout */
/* 设置客户端对指定键的阻塞状态。参数keys可以指定任意数量的键,timeout指定超时时间,参数target即目标listType对象,
    主要用于brpoplpush命令,用户存放从源列表中pop出来的值。 */
void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
    dictEntry *de;
    list *l;
    int j;

    // 设置阻塞超时时间 
    c->bpop.timeout = timeout;
    // 设置目标选项,主要用于brpoplpush命令
    c->bpop.target = target;

    // target之拥入rpoplpush命令
    if (target != NULL) incrRefCount(target);

    // 在c->db->blocking_keys添加阻塞客户端和键的映射关系
    for (j = 0; j < numkeys; j++) {
        /* If the key already exists in the dict ignore it. */
        // bpop.keys记录所有阻塞的键
        if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
        incrRefCount(keys[j]);

        /* And in the other "side", to map keys -> clients */
        // 维护阻塞键和被阻塞客户端的映射关系
        de = dictFind(c->db->blocking_keys,keys[j]);
        if (de == NULL) {
            int retval;

            /* For every key we take a list of clients blocked for it */
            // 如果该键对应的被阻塞客户端列表不存在,则创建一个
            l = listCreate();
            retval = dictAdd(c->db->blocking_keys,keys[j],l);
            incrRefCount(keys[j]);
            redisAssertWithInfo(c,keys[j],retval == DICT_OK);
        } else {
            l = dictGetVal(de);
        }
        // 并把当前被阻塞客户端阻塞列表中
        listAddNodeTail(l,c);
    }

    /* Mark the client as a blocked client */
    // 将客户端设置为“阻塞”状态
    c->flags |= REDIS_BLOCKED;
    server.bpop_blocked_clients++;
}

消息队列

redis的lpush+brpop命令组合即可实现阻塞队列,生产者客户端使用lrpush从列表左侧插入元素,多个消费者客户端使用brpop命令阻塞式的抢列表尾部的元素,多个客户端保证了消费的负载均衡和高可用性

列表的使用场景很多,以下是命令组合口诀:
lpush + lpop = stack(栈)
lpush + rpop = queue(队列)
lpush + ltrim = capped collection(有限集合)
lpush + brpop = message queue(消息队列)

void brpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_TAIL);}//++++++++++++++++++++++++++++++++++++++++++++++++++void blockingPopGenericCommand(redisClient *c, int where) { robj *o; mstime_t timeout; int j; if (getTimeoutFromObjectOrReply(c,c-argv[c-argc-1],&timeout,UNIT_SECONDS) != REDIS_OK) return;//将超时时间保存在timeout中 for (j = 1; j  c-argc-1; j++) { o = lookupKeyWrite(c-db,c-argv[j]);//在数据库中查找操作的链表 if (o != NULL) {//如果不为空 if (o-type != REDIS_LIST) {//不是链表类型 addReply(c,shared.wrongtypeerr);//报错 return; } else { if (listTypeLength(o) != 0) {//链表不为空 /* Non empty list, this is like a non normal [LR]POP. */ char *event = (where == REDIS_HEAD)  "lpop" : "rpop"; robj *value = listTypePop(o,where);//从链表中pop出一个元素 redisAssert(value != NULL); //给客户端发送pop出来的元素信息 addReplyMultiBulkLen(c,2); addReplyBulk(c,c-argv[j]); addReplyBulk(c,value); decrRefCount(value); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event, c-argv[j],c-db-id); if (listTypeLength(o) == 0) {//如果链表为空,从数据库删除链表 dbDelete(c-db,c-argv[j]); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del", c-argv[j],c-db-id); } /* 省略一部分 */ } } } } /* 如果链表为空,则阻塞客户端 */ blockForKeys(c, c-argv + 1, c-argc - 2, timeout, NULL);}

3.2、解除阻塞过程

List的阻塞解除过程如下:

(1)、
如果有其它客户端执行命令往该key(即List)添加新值,先在blocking_keys中检查是否有客户端因该key而被阻塞,如果有则调用signalListAsReady为该key创建一个readyList结构并放入server.ready_keys链表中,同时也将该key添加到db->ready_keys中。db->ready_keys是一个哈希表,它的value为NULL。这个server.ready_keys列表最后会handleClientsBlockedOnLists函数处理。

这里有一个注意点:为什么要用一个链表和一个哈希表来存储同一个key?如果往一个key中添加了多个新值,Redis只需要往server.ready_keys为该key保存一个相关的readyList节点即可,这样可以避免在一个事务或脚本中将同一个key一次又一次地添加到server.ready_keys列表中。为了不重复添加,每次执行添加查找前需要进行一次“查重”操作,但是server.ready_keys是一个链表,在其中进行查找操作时间复杂度为O(n),效率比较差。为解决这个问题Redis引入了db->ready_keys哈希表结构来保存同一个key,哈希表的查找查找效率高,所以每次往server.ready_keys添加节点时候只要在db->ready_keys检查一下就知道server.ready_keys有没有相同的节点了。

下面我们来看看signalListAsReady函数涉及到的结构体:

readyList定义在redis.h文件中:

typedef struct readyList {
    // key所在的数据库
    redisDb *db;
    // 造成阻塞的键
    robj *key;
} readyList;

readyList结构表示server.ready_keys链表中的一个节点,其中key字段表示阻塞的key,db指向该键所在的数据库。

db->ready_keys定义在redisDb结构体中,用于存放已经准备好数据的阻塞状态的key:

typedef struct redisDb {
    ...
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    ...
} redisDb;

signalListAsReady函数的源码如下:

void signalListAsReady(redisDb *db, robj *key) {
    // readyList定义在redis.h中,表示server.ready_keys的一个节点
    readyList *rl;

    /* No clients blocking for this key? No need to queue it. */
    // 如果没有客户端因这个key而被阻塞,则直接返回
    if (dictFind(db->blocking_keys,key) == NULL) return;

    /* Key was already signaled? No need to queue it again. */
    // 如果这个key已经添加到ready_keys,为避免重复添加直接返回
    if (dictFind(db->ready_keys,key) != NULL) return;

    /* Ok, we need to queue this key into server.ready_keys. */
    // 创建一个readyList结构,然后添加到server.ready_keys尾部
    rl = zmalloc(sizeof(*rl));
    rl->key = key;
    rl->db = db;
    incrRefCount(key);
    listAddNodeTail(server.ready_keys,rl);

    /* We also add the key in the db->ready_keys dictionary in order
     * to avoid adding it multiple times into a list with a simple O(1)
     * check. */
    incrRefCount(key);
    // 将key添加到db->ready_keys中,避免重复添加
    redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}

到目前为止,Redis只是收集好了已经准备好数据的处于阻塞状态的key信息,接下来才是真正解除客户端阻塞状态的操作。

(2)、调用handleClientsBlockedOnLists函数,该函数将遍历server.ready_keys中已经准备好数据的key,同时遍历阻塞在该key上的所有客户端(直接从c->db->blocking_keys地点中获取客户端列表)。如果key不为空则从key中弹出一个元素返回给客户端并解除客户端的阻塞状态直到该key为空或没有客户端因为该key而阻塞为止。

handleClientsBlockedOnLists函数的源码如下,代码也很简单。

void handleClientsBlockedOnLists(void) {
    // 遍历server.ready_keys列表
    while(listLength(server.ready_keys) != 0) {
        list *l;

        /* Point server.ready_keys to a fresh list and save the current one
         * locally. This way as we run the old list we are free to call
         * signalListAsReady() that may push new elements in server.ready_keys
         * when handling clients blocked into BRPOPLPUSH. */
        // 备份server.ready_keys,然后再给服务器创建一个新列表。接下来的操作都在备份server.ready_keys上进行
        l = server.ready_keys;
        server.ready_keys = listCreate();

        while(listLength(l) != 0) {
            // 取出server.ready_keys的第一个节点
            listNode *ln = listFirst(l);
            readyList *rl = ln->value;

            /* First of all remove this key from db->ready_keys so that
             * we can safely call signalListAsReady() against this key. */
            // 从db->ready_keys删除就绪的key
            dictDelete(rl->db->ready_keys,rl->key);

            /* If the key exists and it's a list, serve blocked clients
             * with data. */
            // 获取listType对象
            robj *o = lookupKeyWrite(rl->db,rl->key);
            if (o != NULL && o->type == REDIS_LIST) {
                dictEntry *de;

                /* We serve clients in the same order they blocked for
                 * this key, from the first blocked to the last. */
                // 取出所有被这个key阻塞的客户端列表
                de = dictFind(rl->db->blocking_keys,rl->key);
                if (de) {
                    list *clients = dictGetVal(de);
                    int numclients = listLength(clients);

                    while(numclients--) {
                        // 取出一个客户端
                        listNode *clientnode = listFirst(clients);
                        redisClient *receiver = clientnode->value;
                        // 设置pop出的目标对象
                        robj *dstkey = receiver->bpop.target;
                        // 从列表中弹出对象
                        int where = (receiver->lastcmd &&
                                     receiver->lastcmd->proc == blpopCommand) ?
                                    REDIS_HEAD : REDIS_TAIL;
                        robj *value = listTypePop(o,where);

                        // 如果listType还有元素,返回给相应客户端
                        if (value) {
                            /* Protect receiver->bpop.target, that will be
                             * freed by the next unblockClientWaitingData()
                             * call. */
                            if (dstkey) incrRefCount(dstkey);
                            // 解除相应客户端的阻塞状态
                            unblockClientWaitingData(receiver);

                            // 将pop出来的值返回给相应的客户端receiver
                            if (serveClientBlockedOnList(receiver,
                                rl->key,dstkey,rl->db,value,
                                where) == REDIS_ERR)
                            {
                                /* If we failed serving the client we need
                                 * to also undo the POP operation. */
                                // 如果操作失败,则回滚(插入原listType对象)
                                    listTypePush(o,value,where);
                            }

                            if (dstkey) decrRefCount(dstkey);
                            decrRefCount(value);
                        } else {
                            // 如果listType中没有元素了,没有元素可以返回剩余被阻塞客户端,只能等待以后的push操作
                            break;
                        }
                    }
                }

                // 如果列表元素已经为空,则删除之
                if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
                /* We don't call signalModifiedKey() as it was already called
                 * when an element was pushed on the list. */
            }

            /* Free this item. */
            // 资源释放
            decrRefCount(rl->key);
            zfree(rl);
            listDelNode(l,ln);
        }
        listRelease(l); /* We have the new list on place at this point. */
    }
}

从上面的分析中我们可以看出List是按照“先阻塞先服务”的策略来处理阻塞解除的。

另外,客户端阻塞状态的解除还可能是由阻塞超时引起的。这个过程很简单,只要遍历一遍处于阻塞状态的客户端,对超时的客户端撤销其阻塞状态并返回一个空回复即可。


List的源码就分析到这里。按照惯例,最后提供一份注释的代码供大家参考:传送门。

从源码可以看出,brpop可以操作多个链表变量,例如brpop list1 list2 0,但是只能输出第一个有元素的链表。如果list1没有元素,而list2有元素,则输出list2的元素;如果两个都有元素,则输出list1的元素;如果都没有元素,则等待其中某个链表插入一个元素,之后在2返回。最后调用blockForyKeys阻塞