Redis Pub/Sub: Channels, Patterns, and Keyspace Notifications
How Redis routes messages from publishers to subscribers using channel dicts, glob patterns, and automatic keyspace events
What you will learn
- How Redis represents active subscriptions as two parallel data structures: a per-channel dict of subscribers and a plain dict of pattern entries
- How `pubsubSubscribeChannel()` updates both the client-side and server-side indexes in a single call, and why both must stay in sync
- How `pubsubPublishMessageInternal()` iterates the subscriber dict and the pattern dict to deliver a message to every matching client in one pass
- How `PSUBSCRIBE` stores a glob pattern and why the publish path must call `stringmatchlen()` against every registered pattern on each publish
- How Redis 7 sharded Pub/Sub (`SSUBSCRIBE`/`SPUBLISH`) routes messages to per-slot subscriber tables instead of the global channel dict
- How `notifyKeyspaceEvent()` in `notify.c` synthesizes channel names on the fly and feeds them back into `pubsubPublishMessage()` to turn any keyspace mutation into a subscribable event
Prerequisites
- Familiarity with the Redis `SUBSCRIBE`/`PUBLISH` commands from a client perspective
- Comfortable reading C structs and pointer indirection
- Understanding of hash tables (dict) and why lookup is O(1)
Subscription Model
src/pubsub.c:21
The two data structures that represent every active subscription
Redis keeps two parallel indexes per active subscription. `server.pubsub_channels` is a kvstore (sharded dict wrapper) keyed by channel name; its values are dicts of subscribed clients, which is the publisher's lookup table. `c->pubsub_channels` on the client struct mirrors it so that `UNSUBSCRIBE` with no arguments can drain all of a client's subscriptions without scanning the server-side table.
Pattern subscriptions follow the same mirrored design. `server.pubsub_patterns` is a plain dict (no slot sharding, since patterns have no consistent hash slot), and each client carries `c->pubsub_patterns` for the same client-side enumeration reason.
`pubsubtype` is a vtable that lets `pubsubSubscribeChannel()` serve both `SUBSCRIBE` and `SSUBSCRIBE` without duplication. `pubSubType` points at `server.pubsub_channels`; `pubSubShardType` points at `server.pubsubshard_channels`. Every subscribe, unsubscribe, and publish call receives a `pubsubtype` and routes through its function pointers.
The channel-keyed index drives publish dispatch; the client-keyed index drives bulk unsubscribe. Keeping both in sync means each operation is O(1) or O(subscribers) with no cross-table scan.
---
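As a rough illustration of the mirrored indexes described above, the following toy model keeps a channel -> clients table on the server and a client -> channels list on each client, and updates both on every subscribe and unsubscribe. All `toy_*` names and the fixed-size arrays are hypothetical simplifications; Redis itself uses `dict` and `kvstore`, and channel names here are assumed to be string literals that outlive the subscriptions.

```c
#include <assert.h>
#include <string.h>

#define MAX_CHANNELS 8
#define MAX_CLIENTS  8

/* Hypothetical toy types; Redis uses dict/kvstore, not fixed arrays. */
typedef struct toy_client {
    const char *channels[MAX_CHANNELS]; /* client -> channels index */
    int nchannels;
} toy_client;

typedef struct toy_channel {
    const char *name;
    toy_client *subs[MAX_CLIENTS];      /* channel -> clients index */
    int nsubs;
} toy_channel;

typedef struct toy_server {
    toy_channel channels[MAX_CHANNELS];
    int nchannels;
} toy_server;

static toy_channel *find_channel(toy_server *s, const char *name) {
    for (int i = 0; i < s->nchannels; i++)
        if (strcmp(s->channels[i].name, name) == 0) return &s->channels[i];
    return NULL;
}

/* Returns 1 if newly subscribed, 0 if the client was already subscribed. */
int toy_subscribe(toy_server *s, toy_client *c, const char *name) {
    for (int i = 0; i < c->nchannels; i++)
        if (strcmp(c->channels[i], name) == 0) return 0;
    toy_channel *ch = find_channel(s, name);
    if (ch == NULL) {                   /* first subscriber creates the entry */
        assert(s->nchannels < MAX_CHANNELS);
        ch = &s->channels[s->nchannels++];
        ch->name = name;
        ch->nsubs = 0;
    }
    assert(ch->nsubs < MAX_CLIENTS && c->nchannels < MAX_CHANNELS);
    ch->subs[ch->nsubs++] = c;          /* server-side index */
    c->channels[c->nchannels++] = ch->name; /* client-side index */
    return 1;
}

/* Returns 1 if unsubscribed, 0 if the client was not subscribed. */
int toy_unsubscribe(toy_server *s, toy_client *c, const char *name) {
    int i;
    for (i = 0; i < c->nchannels; i++)
        if (strcmp(c->channels[i], name) == 0) break;
    if (i == c->nchannels) return 0;
    c->channels[i] = c->channels[--c->nchannels];
    toy_channel *ch = find_channel(s, name);
    assert(ch != NULL);                 /* the two indexes must agree */
    for (i = 0; i < ch->nsubs; i++)
        if (ch->subs[i] == c) break;
    assert(i < ch->nsubs);
    ch->subs[i] = ch->subs[--ch->nsubs];
    if (ch->nsubs == 0)                 /* last subscriber: drop the entry */
        *ch = s->channels[--s->nchannels];
    return 1;
}

/* UNSUBSCRIBE with no arguments: walk only the client's own list. */
int toy_unsubscribe_all(toy_server *s, toy_client *c) {
    int n = 0;
    while (c->nchannels > 0) n += toy_unsubscribe(s, c, c->channels[0]);
    return n;
}
```

The point of the sketch is the last function: draining a client never scans the server table for channels the client might be in, because the client-side list already names them.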
typedef struct pubsubtype {
    int shard;
    dict *(*clientPubSubChannels)(client*);
    int (*subscriptionCount)(client*);
    kvstore **serverPubSubChannels;
    robj **subscribeMsg;
    robj **unsubscribeMsg;
    robj **messageBulk;
}pubsubtype;
/*
* Get client's global Pub/Sub channels subscription count.
*/
int clientSubscriptionsCount(client *c);
/*
* Get client's shard level Pub/Sub channels subscription count.
*/
int clientShardSubscriptionsCount(client *c);
/*
* Get client's global Pub/Sub channels dict.
*/
dict* getClientPubSubChannels(client *c);
/*
* Get client's shard level Pub/Sub channels dict.
*/
dict* getClientPubSubShardChannels(client *c);
/*
* Get list of channels client is subscribed to.
* If a pattern is provided, the subset of channels is returned
* matching the pattern.
*/
void channelList(client *c, sds pat, kvstore *pubsub_channels);
/*
* Pub/Sub type for global channels.
*/
pubsubtype pubSubType = {
    .shard = 0,
    .clientPubSubChannels = getClientPubSubChannels,
    .subscriptionCount = clientSubscriptionsCount,
    .serverPubSubChannels = &server.pubsub_channels,
    .subscribeMsg = &shared.subscribebulk,
    .unsubscribeMsg = &shared.unsubscribebulk,
    .messageBulk = &shared.messagebulk,
};
/*
* Pub/Sub type for shard level channels bounded to a slot.
*/
pubsubtype pubSubShardType = {
    .shard = 1,
    .clientPubSubChannels = getClientPubSubShardChannels,
    .subscriptionCount = clientShardSubscriptionsCount,
    .serverPubSubChannels = &server.pubsubshard_channels,
    .subscribeMsg = &shared.ssubscribebulk,
    .unsubscribeMsg = &shared.sunsubscribebulk,
    .messageBulk = &shared.smessagebulk,
};
SUBSCRIBE / UNSUBSCRIBE
src/pubsub.c:245
Two-dict write on subscribe; eager reclaim on unsubscribe
`pubsubSubscribeChannel()` opens with `dictFindLink()` on the client dict. That call returns a bucket pointer, not just a boolean, so the later `dictSetKeyAtLink()` inserts without recomputing the hash. The common subscribe path touches the hash function once per channel.
If the channel is new to the server-side kvstore, `kvstoreDictAddRaw()` adds an entry and the function attaches a fresh inner dict created with `dictCreate()`. If the channel already exists, `existing` is non-null and the function borrows the canonical channel robj stored as the key rather than duplicating it. Each index that stores the robj takes its own reference with `incrRefCount()`, which prevents the robj from being freed while either index holds it.
`pubsubUnsubscribeChannel()` reverses each step: remove the channel from the client dict, locate the subscriber dict in the server-side kvstore, remove the client from it, and delete the inner dict entirely if it empties. Freeing on last unsubscribe caps memory regardless of how many short-lived channels clients create.
Subscribe writes two dicts; unsubscribe frees the inner subscriber dict as soon as it empties. Channels that see brief bursts of activity do not accumulate permanent allocations.
---
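The reference-count guard around unsubscribe can be sketched in isolation. The example below is a minimal model, not Redis code: `toy_obj`, `toy_table`, and the `toy_freed` counter are hypothetical stand-ins for `robj`, a dict that owns a reference to its key, and an observable "was it freed" signal. It shows why taking a temporary reference first makes it safe to pass the table's own key object as the argument.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical refcounted string, loosely modelled on robj. */
typedef struct toy_obj { int refcount; char *data; } toy_obj;
static int toy_freed = 0;   /* counts objects released, for observation */

toy_obj *toy_obj_new(const char *s) {
    toy_obj *o = malloc(sizeof(*o));
    assert(o != NULL);
    o->refcount = 1;
    o->data = malloc(strlen(s) + 1);
    assert(o->data != NULL);
    strcpy(o->data, s);
    return o;
}

void toy_incr(toy_obj *o) { o->refcount++; }

void toy_decr(toy_obj *o) {
    if (--o->refcount == 0) { free(o->data); free(o); toy_freed++; }
}

/* A one-slot "table" that owns one reference to its key. */
typedef struct toy_table { toy_obj *key; } toy_table;

void toy_table_set(toy_table *t, toy_obj *key) { toy_incr(key); t->key = key; }

/* Deleting drops the table's reference, which may free the key. */
int toy_table_delete(toy_table *t, toy_obj *key) {
    if (t->key == NULL || strcmp(t->key->data, key->data) != 0) return 0;
    toy_obj *owned = t->key;
    t->key = NULL;
    toy_decr(owned);
    return 1;
}

/* Take a temporary reference, delete, keep using the channel, then release.
 * Returns the channel name length read after the delete. */
size_t toy_guarded_unsubscribe(toy_table *t, toy_obj *channel) {
    size_t len = 0;
    toy_incr(channel);                   /* protect: may be the table's key */
    if (toy_table_delete(t, channel))
        len = strlen(channel->data);     /* still valid thanks to the guard */
    toy_decr(channel);                   /* now it is safe to release */
    return len;
}
```

Without the `toy_incr()` at the top, the `strlen()` call would read freed memory whenever the caller passes the table's own key, which is the situation the comment in `pubsubUnsubscribeChannel()` warns about.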
int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
    dictEntry *de, *existing;
    dict *clients = NULL;
    int retval = 0;
    unsigned int slot = 0;
    /* Add the channel to the client -> channels hash table */
    dictEntryLink bucket;
    dictEntryLink link = dictFindLink(type.clientPubSubChannels(c),channel,&bucket);
    if (link == NULL) { /* Not yet subscribed to this channel */
        retval = 1;
        /* Add the client to the channel -> list of clients hash table */
        if (server.cluster_enabled && type.shard) {
            slot = getKeySlot(channel->ptr);
        }
        de = kvstoreDictAddRaw(*type.serverPubSubChannels, slot, channel, &existing);
        if (existing) {
            clients = dictGetVal(existing);
            channel = dictGetKey(existing);
        } else {
            clients = dictCreate(&clientDictType);
            kvstoreDictSetVal(*type.serverPubSubChannels, slot, de, clients);
            incrRefCount(channel);
        }
        serverAssert(dictAdd(clients, c, NULL) != DICT_ERR);
        dictSetKeyAtLink(type.clientPubSubChannels(c), channel, &bucket, 1);
        incrRefCount(channel);
    }
    /* Notify the client */
    addReplyPubsubSubscribed(c,channel,type);
    return retval;
}

/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) {
    dictEntry *de;
    dict *clients;
    int retval = 0;
    int slot = 0;
    /* Remove the channel from the client -> channels hash table */
    incrRefCount(channel); /* channel may be just a pointer to the same object
                              we have in the hash tables. Protect it... */
    if (dictDelete(type.clientPubSubChannels(c),channel) == DICT_OK) {
        retval = 1;
        /* Remove the client from the channel -> clients list hash table */
        if (server.cluster_enabled && type.shard) {
            /* Compute the slot from the channel directly instead of using getKeySlot(),
             * because the unsubscribe may be triggered by a different client, and
             * getKeySlot() would return the cached slot of that client. */
            slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
        }
        de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
        serverAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        serverAssertWithInfo(c, NULL, dictDelete(clients, c) == DICT_OK);
        if (dictSize(clients) == 0) {
            /* Free the dict and associated hash entry at all if this was
             * the latest client, so that it will be possible to abuse
             * Redis PUBSUB creating millions of channels. */
            kvstoreDictDelete(*type.serverPubSubChannels, slot, channel);
        }
    }
    /* Notify the client */
    if (notify) {
        addReplyPubsubUnsubscribed(c,channel,type);
    }
    decrRefCount(channel); /* it is finally safe to release it */
    return retval;
}
PUBLISH Dispatch
src/pubsub.c:469
Exact-match delivery via dict lookup, then O(P) pattern scan
Exact-match delivery is a single `kvstoreDictFind()` followed by a dict iterator that calls `addReplyPubsubMessage()` per client. That function writes a three-element reply frame (`[message_type_bulk, channel, message]`; a push frame under RESP3) into the client's output buffer; no `write()` syscall happens here. The event loop drains the buffer later.
For global (non-sharded) publishes, the function then iterates all of `server.pubsub_patterns`, O(P) where P is the number of registered patterns. `stringmatchlen()` tests each glob against the channel name. On a match, `addReplyPubsubPatMessage()` writes a four-element frame (`pmessage`, pattern, channel, message) instead of three. The leading `pmessage` type tells client libraries the delivery came from a pattern, and the extra pattern field says which one, which matters when a client holds both an exact subscription and a matching pattern subscription.
The return value is the total receiver count. Zero means the publish succeeded but nobody was listening.
Every `PUBLISH` call on a server with active pattern subscribers pays O(P) regardless of whether any pattern matches. That is the direct reason Redis documentation warns against large numbers of active `PSUBSCRIBE` clients.
---
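The pattern half of the dispatch can be approximated with a short sketch. It assumes a deliberately simplified matcher, `toy_glob()`, that handles only `*` and `?`; the real `stringmatchlen()` is length-based and supports more syntax. `toy_pattern` and `toy_publish_to_patterns()` are hypothetical names, and each pattern carries a subscriber count instead of a dict of clients. The `match_calls` out-parameter exists only to make the O(P) cost visible.

```c
#include <stddef.h>

/* Simplified glob: '*' matches any run of characters, '?' matches one.
 * An illustrative stand-in, not the stringmatchlen() implementation. */
static int toy_glob(const char *p, const char *s) {
    while (*p) {
        if (*p == '*') {
            while (*p == '*') p++;          /* collapse consecutive stars */
            if (*p == '\0') return 1;       /* trailing star matches the rest */
            for (; *s; s++)
                if (toy_glob(p, s)) return 1;
            return 0;
        }
        if (*s == '\0') return 0;
        if (*p != '?' && *p != *s) return 0;
        p++;
        s++;
    }
    return *s == '\0';
}

/* Hypothetical pattern entry: the glob text plus a subscriber count. */
typedef struct toy_pattern {
    const char *glob;
    int nclients;
} toy_pattern;

/* Scans every registered pattern once per publish and returns the number
 * of receivers. *match_calls reports how many glob tests were performed. */
int toy_publish_to_patterns(const toy_pattern *pats, int npats,
                            const char *channel, int *match_calls) {
    int receivers = 0;
    *match_calls = 0;
    for (int i = 0; i < npats; i++) {
        (*match_calls)++;                   /* paid whether or not it matches */
        if (!toy_glob(pats[i].glob, channel)) continue;
        receivers += pats[i].nclients;
    }
    return receivers;
}
```

With three registered patterns, every publish performs three glob tests, including a publish to a channel that only one of them matches.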
int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
    int receivers = 0;
    dictEntry *de;
    dictIterator di;
    unsigned int slot = 0;
    /* Send to clients listening for that channel */
    if (server.cluster_enabled && type.shard) {
        slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
    }
    de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
    if (de) {
        dict *clients = dictGetVal(de);
        dictEntry *entry;
        dictIterator iter;
        dictInitIterator(&iter, clients);
        while ((entry = dictNext(&iter)) != NULL) {
            client *c = dictGetKey(entry);
            addReplyPubsubMessage(c,channel,message,*type.messageBulk);
            if (clusterSlotStatsEnabled(CLUSTER_SLOT_STATS_NET))
                clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(c, slot);
            updateClientMemUsageAndBucket(c);
            receivers++;
        }
        dictResetIterator(&iter);
    }
    if (type.shard) {
        /* Shard pubsub ignores patterns. */
        return receivers;
    }
    /* Send to clients listening to matching channels */
    if (dictSize(server.pubsub_patterns) > 0) {
        channel = getDecodedObject(channel);
        dictInitIterator(&di, server.pubsub_patterns);
        while((de = dictNext(&di)) != NULL) {
            robj *pattern = dictGetKey(de);
            dict *clients = dictGetVal(de);
            if (!stringmatchlen((char*)pattern->ptr,
                                sdslen(pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) continue;
            dictEntry *entry;
            dictIterator iter;
            dictInitIterator(&iter, clients);
            while ((entry = dictNext(&iter)) != NULL) {
                client *c = dictGetKey(entry);
                addReplyPubsubPatMessage(c,pattern,channel,message);
                updateClientMemUsageAndBucket(c);
                receivers++;
            }
            dictResetIterator(&iter);
        }
        decrRefCount(channel);
        dictResetIterator(&di);
    }
    return receivers;
}
Pattern Subscription
src/pubsub.c:354
How glob patterns are stored and why their cost lands at publish time
Code (reply format)
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
    uint64_t old_flags = c->flags;
    c->flags |= CLIENT_PUSHING;
    if (c->resp == 2)
        addReply(c,shared.mbulkhdr[4]);
    else
        addReplyPushLen(c,4);
    addReply(c,shared.pmessagebulk);
    addReplyBulk(c,pat);
    addReplyBulk(c,channel);
    addReplyBulk(c,msg);
    if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
`PSUBSCRIBE news.*` stores the literal string "news.*" verbatim in both `c->pubsub_patterns` and `server.pubsub_patterns`. Redis never compiles the glob; `stringmatchlen()` evaluates it against the channel name on each `PUBLISH`. Registration is O(1) and the matching cost is deferred entirely to publish time.
The subscribe path mirrors channel subscribe almost exactly, except `server.pubsub_patterns` is a plain dict rather than a kvstore, since patterns have no consistent hash slot and per-slot sharding is impossible. The same two-dict invariant holds: `c->pubsub_patterns` lets `PUNSUBSCRIBE` (no args) enumerate all of a client's patterns without touching the server dict.
Receivers get a `pmessage` frame with four bulk strings (`pmessage`, pattern, channel, message) instead of the three in a `message` frame. A client subscribed to both "news.sports" and "news.*" receives two separate frames for a single publish, and the frame type is how its client library tells them apart.
Keeping active pattern subscriptions small is not a style preference. Each registered pattern adds one `stringmatchlen()` call to every `PUBLISH`, regardless of whether it matches.
---
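To make the three-element and four-element frames concrete, here is a small encoder sketch. `toy_encode_frame()` is a hypothetical helper, not a Redis function; it assumes NUL-terminated strings (Redis bulk strings are binary-safe) and follows the same split as `addReplyPubsubPatMessage()`: an array header (`*`) for RESP2 clients and a push header (`>`) otherwise.

```c
#include <stdio.h>
#include <string.h>

/* Encodes n bulk strings into out. resp == 2 emits an array header ('*'),
 * any other value emits a RESP3 push header ('>'). Returns the number of
 * bytes written, or -1 if the buffer is too small. */
int toy_encode_frame(char *out, size_t cap, int resp,
                     const char **parts, int n) {
    size_t used;
    int w = snprintf(out, cap, "%c%d\r\n", resp == 2 ? '*' : '>', n);
    if (w < 0 || (size_t)w >= cap) return -1;
    used = (size_t)w;
    for (int i = 0; i < n; i++) {
        /* Each element is a bulk string: $<len>\r\n<bytes>\r\n */
        w = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(parts[i]), parts[i]);
        if (w < 0 || (size_t)w >= cap - used) return -1;
        used += (size_t)w;
    }
    return (int)used;
}
```

Passing `{"message", "news", "hi"}` with `n = 3` produces the exact-match frame, while `{"pmessage", "news.*", "news.sports", "goal"}` with `n = 4` produces the pattern frame; only the first element and the element count differ in shape.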
int pubsubSubscribePattern(client *c, robj *pattern) {
    dictEntry *de;
    dict *clients;
    int retval = 0;
    if (dictAdd(c->pubsub_patterns, pattern, NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(pattern);
        /* Add the client to the pattern -> list of clients hash table */
        de = dictFind(server.pubsub_patterns,pattern);
        if (de == NULL) {
            clients = dictCreate(&clientDictType);
            dictAdd(server.pubsub_patterns,pattern,clients);
            incrRefCount(pattern);
        } else {
            clients = dictGetVal(de);
        }
        serverAssert(dictAdd(clients, c, NULL) != DICT_ERR);
    }
    /* Notify the client */
    addReplyPubsubPatSubscribed(c,pattern);
    return retval;
}

/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
    dictEntry *de;
    dict *clients;
    int retval = 0;
    incrRefCount(pattern); /* Protect the object. May be the same we remove */
    if (dictDelete(c->pubsub_patterns, pattern) == DICT_OK) {
        retval = 1;
        /* Remove the client from the pattern -> clients list hash table */
        de = dictFind(server.pubsub_patterns,pattern);
        serverAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        serverAssertWithInfo(c, NULL, dictDelete(clients, c) == DICT_OK);
        if (dictSize(clients) == 0) {
            /* Free the dict and associated hash entry at all if this was
             * the latest client. */
            dictDelete(server.pubsub_patterns,pattern);
        }
    }
    /* Notify the client */
    if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
    decrRefCount(pattern);
    return retval;
}
Sharded Pub/Sub (Redis 7)
src/pubsub.c:74
SSUBSCRIBE/SPUBLISH and per-slot subscriber tables
Global `PUBLISH` in Redis Cluster broadcasts to every node so each node can deliver to its local subscribers. At high publish rates that creates O(cluster-size) fan-out traffic. Sharded Pub/Sub (Redis 7) fixes this by scoping a channel to its hash slot: `SPUBLISH` delivers only within the shard that owns that slot.
The implementation reuses almost all global Pub/Sub code through the `pubsubtype` vtable. `pubSubShardType` swaps `serverPubSubChannels` to `server.pubsubshard_channels` and sets `.shard = 1`. When cluster mode is enabled, that flag causes `pubsubSubscribeChannel()` to call `getKeySlot()` and route the subscription into the correct per-slot dict. On publish, `pubsubPublishMessageInternal()` checks `type.shard`, computes the slot via `keyHashSlot()`, and returns early, skipping `server.pubsub_patterns` entirely. Sharded Pub/Sub does not support glob patterns.
`pubsubShardUnsubscribeAllChannelsInSlot()` is the cluster-migration hook. When a slot moves to another node, subscriptions scoped to that slot are invalid; the function tears them all down by iterating the slot's inner dict, removing each client from its client-side dict, and sending the unsubscribe reply before deleting the slot entry.
Sharded Pub/Sub is global Pub/Sub with a slot dimension added. The vtable handles the code sharing; `.shard = 1` gates the slot-aware paths and disables pattern matching, which is what makes per-slot routing deterministic.
---
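The slot a shard channel lands in follows the Redis Cluster key hashing rule: CRC16 of the name modulo 16384, restricted to the text between the first `{` and the next `}` when that text is non-empty. The sketch below reimplements that rule for illustration; `toy_crc16()` and `toy_key_hash_slot()` are hypothetical names, and the CRC is a plain bitwise version rather than the table-driven one a server would likely use.

```c
#include <stddef.h>
#include <stdint.h>

/* Bitwise CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0. */
static uint16_t toy_crc16(const char *buf, size_t len) {
    uint16_t crc = 0;
    for (size_t i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i] << 8);
        for (int bit = 0; bit < 8; bit++)
            crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021)
                                 : (uint16_t)(crc << 1);
    }
    return crc;
}

/* Maps a channel (or key) name to one of 16384 slots. If the name contains
 * a non-empty {hashtag}, only the tag content is hashed. */
unsigned int toy_key_hash_slot(const char *key, size_t keylen) {
    size_t s, e;
    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    if (s == keylen)                     /* no '{': hash the whole name */
        return toy_crc16(key, keylen) & 0x3FFF;
    for (e = s + 1; e < keylen; e++)
        if (key[e] == '}') break;
    if (e == keylen || e == s + 1)       /* no '}' or empty tag: whole name */
        return toy_crc16(key, keylen) & 0x3FFF;
    return toy_crc16(key + s + 1, e - s - 1) & 0x3FFF;
}
```

Under this rule, shard channels such as `{chat}.room1` and `{chat}.room2` share a slot, so a deployment can use a hashtag to keep related channels on the same shard.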
pubsubtype pubSubShardType = {
    .shard = 1,
    .clientPubSubChannels = getClientPubSubShardChannels,
    .subscriptionCount = clientShardSubscriptionsCount,
    .serverPubSubChannels = &server.pubsubshard_channels,
    .subscribeMsg = &shared.ssubscribebulk,
    .unsubscribeMsg = &shared.sunsubscribebulk,
    .messageBulk = &shared.smessagebulk,
};

void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) {
    if (!kvstoreDictSize(server.pubsubshard_channels, slot))
        return;
    dictEntry *de;
    kvstoreDictIterator kvs_di;
    kvstoreInitDictSafeIterator(&kvs_di, server.pubsubshard_channels, slot);
    while ((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
        robj *channel = dictGetKey(de);
        dict *clients = dictGetVal(de);
        /* For each client subscribed to the channel, unsubscribe it. */
        dictIterator iter;
        dictEntry *entry;
        dictInitIterator(&iter, clients);
        while ((entry = dictNext(&iter)) != NULL) {
            client *c = dictGetKey(entry);
            int retval = dictDelete(c->pubsubshard_channels, channel);
            serverAssertWithInfo(c,channel,retval == DICT_OK);
            addReplyPubsubUnsubscribed(c, channel, pubSubShardType);
            /* If the client has no other pubsub subscription,
             * move out of pubsub mode. */
            if (clientTotalPubSubSubscriptionCount(c) == 0) {
                unmarkClientAsPubSub(c);
            }
        }
        dictResetIterator(&iter);
        kvstoreDictDelete(server.pubsubshard_channels, slot, channel);
    }
    kvstoreResetDictIterator(&kvs_di);
}

/* SPUBLISH <shardchannel> <message> */
void spublishCommand(client *c) {
    int receivers = pubsubPublishMessageAndPropagateToCluster(c->argv[1],c->argv[2],1);
    if (!server.cluster_enabled)
        forceCommandPropagation(c,PROPAGATE_REPL);
    addReplyLongLong(c,receivers);
}

/* SSUBSCRIBE shardchannel [shardchannel ...] */
void ssubscribeCommand(client *c) {
    if (c->flags & CLIENT_DENY_BLOCKING) {
        /* A client that has CLIENT_DENY_BLOCKING flag on
         * expect a reply per command and so can not execute subscribe. */
        addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
        return;
    }
    for (int j = 1; j < c->argc; j++) {
        pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
    }
    markClientAsPubSub(c);
}

/* SUNSUBSCRIBE [shardchannel [shardchannel ...]] */
void sunsubscribeCommand(client *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeShardAllChannels(c, 1);
    } else {
        for (int j = 1; j < c->argc; j++) {
            pubsubUnsubscribeChannel(c, c->argv[j], 1, pubSubShardType);
        }
    }
    if (clientTotalPubSubSubscriptionCount(c) == 0) {
        unmarkClientAsPubSub(c);
    }
}
Keyspace Notifications
src/notify.c:136
How notifyKeyspaceEvent() synthesizes channel names and feeds them into the standard publish path
Keyspace notifications are ordinary `PUBLISH` calls. `notify.c` generates them in response to keyspace mutations, but the subscriber layer never knows that. Every Redis command that modifies a key calls `notifyKeyspaceEvent()` with a bitmask type and a human-readable event name like "set" or "expired". The public function is a one-liner wrapper around `notifyKeyspaceEventImpl()`.
Two early-exit checks gate all Pub/Sub work inside `notifyKeyspaceEventImpl()`; the module hook `moduleNotifyKeyspaceEvent()` runs before them regardless of the notification configuration:
- If the `notify_keyspace_events` bitmask does not include the requested type, return immediately.
- If both `server.pubsub_patterns` and `server.pubsub_channels` are empty, skip channel name construction entirely.
Deployments that never use keyspace notifications pay, after the module hook, a single bitmask test per write command, with no string allocation and no publish.
When enabled, the function builds channel names with SDS string operations. `NOTIFY_KEYSPACE` produces `__keyspace@<dbid>__:<key>` with the event name as the message; `NOTIFY_KEYEVENT` produces `__keyevent@<dbid>__:<event>` with the key name as the message. Both synthesize a channel robj, call `pubsubPublishMessage()`, then immediately release it with `decrRefCount()`. At this SHA, four subkey variants (`NOTIFY_SUBKEYSPACE`, `NOTIFY_SUBKEYEVENT`, `NOTIFY_SUBKEYSPACEITEM`, `NOTIFY_SUBKEYSPACEEVENT`) extend the format to hash-field granularity using `catSubkeysPayload()`.
When disabled, keyspace notifications add one bitmask test per write. When enabled, the cost is SDS channel name construction plus standard Pub/Sub delivery, with no special-casing in the subscriber layer.
---
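The two basic channel name formats can be reproduced with a few lines of string formatting. This sketch uses `snprintf()` on plain C strings, so unlike the SDS-based code it is not binary-safe; `toy_keyspace_channel()` and `toy_keyevent_channel()` are hypothetical helpers written only to show the naming scheme.

```c
#include <stdio.h>
#include <string.h>

/* Builds "__keyspace@<dbid>__:<key>". The message published on this
 * channel would be the event name. Returns the length written, or -1 if
 * the buffer is too small. */
int toy_keyspace_channel(char *out, size_t cap, int dbid, const char *key) {
    int n = snprintf(out, cap, "__keyspace@%d__:%s", dbid, key);
    return (n < 0 || (size_t)n >= cap) ? -1 : n;
}

/* Builds "__keyevent@<dbid>__:<event>". The message published on this
 * channel would be the key name. Returns the length written, or -1 if
 * the buffer is too small. */
int toy_keyevent_channel(char *out, size_t cap, int dbid, const char *event) {
    int n = snprintf(out, cap, "__keyevent@%d__:%s", dbid, event);
    return (n < 0 || (size_t)n >= cap) ? -1 : n;
}
```

For a `set` on key `mykey` in database 0, the first helper yields `__keyspace@0__:mykey` and the second yields `__keyevent@0__:set`. A subscriber interested in every expiry in database 0 would therefore subscribe to the exact channel `__keyevent@0__:expired`, with no pattern needed.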
static void notifyKeyspaceEventImpl(int type, const char *event, robj *key, int dbid,
                                    robj **subkeys, int count)
{
    sds chan;
    robj *chanobj, *eventobj;
    char buf[24];
    serverAssert(sdsEncodedObject(key));
    /* If any modules are interested in events, notify the module system now.
     * This bypasses the notifications configuration, but the module engine
     * will only call event subscribers if the event type matches the types
     * they are interested in. Subkeys are passed through so that subscribers
     * with a subkey callback receive them. */
    moduleNotifyKeyspaceEvent(type, event, key, dbid, subkeys, count);
    /* If notifications for this class of events are off, return ASAP. */
    if (!(server.notify_keyspace_events & type)) return;
    /* If there are no Pub/Sub subscribers (neither pattern nor channel),
     * skip the remaining notification work since nobody would receive it. */
    if (dictSize(server.pubsub_patterns) == 0 && kvstoreSize(server.pubsub_channels) == 0)
        return;
    eventobj = createStringObject(event,strlen(event));
    int len = ll2string(buf,sizeof(buf),dbid);
    /* __keyspace@<db>__:<key> <event> notifications. */
    if (server.notify_keyspace_events & NOTIFY_KEYSPACE) {
        chan = sdsnewlen("__keyspace@",11);
        chan = sdscatlen(chan, buf, len);
        chan = sdscatlen(chan, "__:", 3);
        chan = sdscatsds(chan, key->ptr);
        chanobj = createObject(OBJ_STRING, chan);
        pubsubPublishMessage(chanobj, eventobj, 0);
        decrRefCount(chanobj);
    }
    /* __keyevent@<db>__:<event> <key> notifications. */
    if (server.notify_keyspace_events & NOTIFY_KEYEVENT) {
        chan = sdsnewlen("__keyevent@",11);
        chan = sdscatlen(chan, buf, len);
        chan = sdscatlen(chan, "__:", 3);
        chan = sdscatsds(chan, eventobj->ptr);
        chanobj = createObject(OBJ_STRING, chan);
        pubsubPublishMessage(chanobj, key, 0);
        decrRefCount(chanobj);
    }
    /* Subkey-level notifications (only when subkeys are provided). */
    if (subkeys != NULL && count > 0) {
        /* __subkeyspace@<db>__:<key> <event>|<len>:<subkey>[,...] notifications.
         * Skip if the event contains '|' to avoid parsing ambiguity since '|'
         * is used as a separator between event and subkeys in the payload. */
        if (server.notify_keyspace_events & NOTIFY_SUBKEYSPACE && !strchr(event, '|')) {
            chan = sdsnewlen("__subkeyspace@", 14);
            chan = sdscatlen(chan, buf, len);
            chan = sdscatlen(chan, "__:", 3);
            chan = sdscatsds(chan, key->ptr);
            chanobj = createObject(OBJ_STRING, chan);
            /* Build payload: <event>|<subkeys_payload> */
            sds payload = sdsdup(eventobj->ptr);
            payload = sdscatlen(payload, "|", 1);
            payload = catSubkeysPayload(payload, subkeys, count);
            robj *payloadobj = createObject(OBJ_STRING, payload);
            pubsubPublishMessage(chanobj, payloadobj, 0);
            decrRefCount(chanobj);
            decrRefCount(payloadobj);
        }
        /* __subkeyevent@<db>__:<event> <key_len>:<key>|<len>:<subkey>[,...] notifications. */
        if (server.notify_keyspace_events & NOTIFY_SUBKEYEVENT) {
            chan = sdsnewlen("__subkeyevent@", 14);
            chan = sdscatlen(chan, buf, len);
            chan = sdscatlen(chan, "__:", 3);
            chan = sdscatsds(chan, eventobj->ptr);
            chanobj = createObject(OBJ_STRING, chan);
            /* Build payload: <key_len>:<key>|<subkeys_payload> */
            size_t keylen = sdslen(key->ptr);
            char keylenbuf[32];
            int keylenlen = ll2string(keylenbuf, sizeof(keylenbuf), keylen);
            sds payload = sdsnewlen(keylenbuf, keylenlen);
            payload = sdscatlen(payload, ":", 1);
            payload = sdscatsds(payload, key->ptr);
            payload = sdscatlen(payload, "|", 1);
            payload = catSubkeysPayload(payload, subkeys, count);
            robj *payloadobj = createObject(OBJ_STRING, payload);
            pubsubPublishMessage(chanobj, payloadobj, 0);
            decrRefCount(chanobj);
            decrRefCount(payloadobj);
        }
        /* __subkeyspaceitem@<db>__:<key>\n<subkey> <event> notifications (per subkey).
         * Skip if the key contains '\n' to avoid parsing ambiguity in the channel name. */
        if (server.notify_keyspace_events & NOTIFY_SUBKEYSPACEITEM &&
            memchr(key->ptr, '\n', sdslen(key->ptr)) == NULL)
        {
            for (int i = 0; i < count; i++) {
                serverAssert(sdsEncodedObject(subkeys[i]));
                chan = sdsnewlen("__subkeyspaceitem@", 18);
                chan = sdscatlen(chan, buf, len);
                chan = sdscatlen(chan, "__:", 3);
                chan = sdscatsds(chan, key->ptr);
                chan = sdscatlen(chan, "\n", 1);
                chan = sdscatsds(chan, subkeys[i]->ptr);
                chanobj = createObject(OBJ_STRING, chan);
                pubsubPublishMessage(chanobj, eventobj, 0);
                decrRefCount(chanobj);
            }
        }
        /* __subkeyspaceevent@<db>__:<event>|<key> <subkeys> notifications.
         * Skip if the event contains '|' to avoid parsing ambiguity since '|'
         * is used as a separator between event and key in the channel name. */
        if (server.notify_keyspace_events & NOTIFY_SUBKEYSPACEEVENT && !strchr(event, '|')) {
            chan = sdsnewlen("__subkeyspaceevent@", 19);
            chan = sdscatlen(chan, buf, len);
            chan = sdscatlen(chan, "__:", 3);
            chan = sdscatsds(chan, eventobj->ptr);
            chan = sdscatlen(chan, "|", 1);
            chan = sdscatsds(chan, key->ptr);
            chanobj = createObject(OBJ_STRING, chan);
            robj *payloadobj = createObject(OBJ_STRING, catSubkeysPayload(NULL, subkeys, count));
            pubsubPublishMessage(chanobj, payloadobj, 0);
            decrRefCount(chanobj);
            decrRefCount(payloadobj);
        }
    }
    decrRefCount(eventobj);
}

/* Public API for key-level notifications (backward compatible). */
void notifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {
    notifyKeyspaceEventImpl(type, event, key, dbid, NULL, 0);
}