Redis Streams: A Log With Consumer Groups
How Redis stores an append-only log in a radix tree, delivers messages to consumer groups, and tracks unacknowledged entries in the pending entries list
What you will learn
- How the `stream` struct uses a radix tree (rax) keyed by 128-bit IDs to store entries in compact listpack nodes
- How `streamAppendItem` packs each XADD entry using delta-encoded IDs and a SAMEFIELD optimization that omits repeated field names
- What a consumer group (`streamCG`) is structurally: `last_id`, a group-level PEL, a consumers directory whose entries each hold a consumer-level PEL, and a time-ordered doubly-linked list for finding idle entries
- How delivery creates a `streamNACK` entry in two rax trees simultaneously (group PEL and consumer PEL) as the unit of at-least-once delivery tracking
- How XACK removes a NACK from both PELs atomically, and what happens to the delivery counter when XCLAIM or XAUTOCLAIM reassigns a stale entry
- How `streamTrim` walks the radix tree node by node, dropping whole listpack nodes when possible and breaking early under approximate trim semantics
Prerequisites
- Familiarity with Redis commands: XADD, XREAD, XREADGROUP, XACK, XCLAIM, XTRIM
- Comfortable reading C structs and pointer manipulation
- Basic understanding of hash tables or trees as index structures
The Radix Tree Backing a Stream
src/stream.h:37
How stream entries are stored: rax keyed by 128-bit <ms>-<seq> IDs, with entries packed into listpack nodes
The stream struct is the root object behind every Redis stream key. Its central field, rax, is a radix tree where each key is a big-endian 128-bit streamID (ms + seq) and each value is a pointer to a listpack node holding one or more consecutive entries. Because the keys are big-endian, byte-wise key order matches numeric ID order, so a range seek by ID is a tree descent bounded by the 16-byte key length, and Redis never allocates one heap object per entry. Redis groups consecutive entries into a single listpack macro node, so the rax stays small: thousands of entries may live in dozens of nodes.
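To see why the big-endian encoding matters, here is a minimal sketch of turning an ID into a 16-byte key. The names `demo_stream_id`, `demo_encode_id`, and `demo_compare_keys` are hypothetical stand-ins, not the Redis functions; the only point is that a plain `memcmp` on the encoded keys agrees with numeric (ms, seq) ordering.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for a stream ID: milliseconds plus sequence. */
typedef struct demo_stream_id {
    uint64_t ms;
    uint64_t seq;
} demo_stream_id;

/* Write ms then seq, most significant byte first, into a 16-byte key. */
static void demo_encode_id(unsigned char buf[16], const demo_stream_id *id) {
    assert(buf != NULL && id != NULL);
    for (int i = 0; i < 8; i++) {
        buf[i]     = (unsigned char)(id->ms  >> (56 - 8 * i));
        buf[8 + i] = (unsigned char)(id->seq >> (56 - 8 * i));
    }
}

/* Compare two IDs by comparing their encoded keys byte by byte. */
static int demo_compare_keys(const demo_stream_id *a, const demo_stream_id *b) {
    unsigned char ka[16], kb[16];
    demo_encode_id(ka, a);
    demo_encode_id(kb, b);
    return memcmp(ka, kb, sizeof(ka));
}
```

With a little-endian layout, sequence 256 (bytes `00 01 ...`) would sort before sequence 255 (bytes `ff 00 ...`); the big-endian layout avoids that, which is what lets an ordered tree serve range queries directly.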
The streamIterator just below the stream definition shows what traversal requires: a raxIterator moves between listpack nodes while lp_ele advances within each one. The cgroups field is a second rax, keyed by consumer group name, lazily allocated only on the first XGROUP CREATE. Streams read with plain XREAD carry zero overhead for group tracking.
A Redis stream is a radix tree of listpack nodes. The rax key is the entry ID; each listpack holds field-value pairs for one or more consecutive entries. Range queries are tree seeks, not linear scans.
---
typedef struct stream {
    rax *rax; /* The radix tree holding the stream. */
    uint64_t length; /* Current number of elements inside this stream. */
    streamID last_id; /* Zero if there are yet no items. */
    streamID first_id; /* The first non-tombstone entry, zero if empty. */
    streamID max_deleted_entry_id; /* The maximal ID that was deleted. */
    uint64_t entries_added; /* All time count of elements added. */
    size_t alloc_size; /* Total allocated memory (in bytes) by this stream. */
    rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
    rax *cgroups_ref; /* Index mapping message IDs to their consumer groups. */
    streamID min_cgroup_last_id; /* The minimum ID of consume group. */
    unsigned int min_cgroup_last_id_valid: 1;
    uint64_t idmp_duration; /* IDMP duration in seconds. */
    uint64_t idmp_max_entries; /* Max number of IID for tracking. */
    rax *idmp_producers; /* IDMP producers radix tree: pid -> idmpProducer */
    uint64_t iids_added; /* All time count of entries with IID added. */
    uint64_t iids_duplicates; /* All time count of duplicate IIDs detected. */
} stream;
/* We define an iterator to iterate stream items in an abstract way, without
 * caring about the radix tree + listpack representation. Technically speaking
 * the iterator is only used inside streamReplyWithRange(), so could just
 * be implemented inside the function, but practically there is the AOF
 * rewriting code that also needs to iterate the stream to emit the XADD
 * commands. */
typedef struct streamIterator {
    stream *stream; /* The stream we are iterating. */
    streamID master_id; /* ID of the master entry at listpack head. */
    uint64_t master_fields_count; /* Master entries # of fields. */
    unsigned char *master_fields_start; /* Master entries start in listpack. */
    unsigned char *master_fields_ptr; /* Master field to emit next. */
    int entry_flags; /* Flags of entry we are emitting. */
    int rev; /* True if iterating end to start (reverse). */
    int skip_tombstones; /* True if not emitting tombstone entries. */
    uint64_t start_key[2]; /* Start key as 128 bit big endian. */
    uint64_t end_key[2]; /* End key as 128 bit big endian. */
    /* Decoded native-endian fields for fast numeric comparison */
    uint64_t start_ms;
    uint64_t start_seq;
    uint64_t end_ms;
    uint64_t end_seq;
    raxIterator ri; /* Rax iterator. */
    unsigned char *lp; /* Current listpack. */
    unsigned char *lp_last_ele; /* Previous listpack element position for corruption detection. */
    unsigned char *lp_ele; /* Current listpack cursor. */
    unsigned char *lp_flags; /* Current entry flags pointer. */
    /* Buffers used to hold the string of lpGet() when the element is
     * integer encoded, so that there is no string representation of the
     * element inside the listpack itself. */
    unsigned char field_buf[LP_INTBUF_SIZE];
    unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
/* Forward declarations */
typedef struct streamNACK streamNACK;
/* Consumer group. */
XADD: Entry Layout
src/t_stream.c:702
Delta-encoded IDs and SAMEFIELD compression inside listpack nodes
Every streamAppendItem call (the function behind XADD) appends to the tail listpack node of the rax, starting a new node when the tail is full. Each entry's byte layout follows the ASCII diagram in the source: flags, then a delta-encoded entry-id (milliseconds and sequence relative to the listpack's master entry, not the absolute ID), then num-fields and the field-value pairs, then lp-count, an element count used for backward traversal.
The space optimization is STREAM_ITEM_FLAG_SAMEFIELDS: when the incoming entry has the same field names as the master entry at the listpack head, streamAppendItem omits the field names and the num-fields element and stores only values. For a telemetry stream with uniform fields like sensor_id, temperature, and voltage, that shrinks an entry from 11 listpack elements to 7 (2N+5 versus N+4 for N fields, counting the trailing lp-count); the byte saving depends on how long the field names are relative to the values. The lp-count field at each entry's tail exists purely to support reverse iteration: Redis reads the count and jumps back that many listpack elements to reach the entry's own flags field.
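The layout and the backward jump can be tried out on a toy model. The sketch below is not the Redis listpack: it stands in a flat `int64_t` array for the listpack, reduces field names and values to integers, and uses hypothetical names (`demo_append_entry`, `demo_seek_flags`, `DEMO_FLAG_SAMEFIELDS`). Only the element arithmetic mirrors the excerpt.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_FLAG_SAMEFIELDS (1 << 1) /* illustrative flag bit */

/* Number of elements that precede lp-count inside one entry. */
static int64_t demo_lp_count(int64_t numfields, int samefields) {
    int64_t n = numfields + 3;            /* values + flags + ms-diff + seq-diff */
    if (!samefields) n += numfields + 1;  /* field names + num-fields */
    return n;
}

/* Append one entry to the array and return the new length.
 * The caller must provide enough room in lp. */
static size_t demo_append_entry(int64_t *lp, size_t len, int64_t flags,
                                int64_t ms_diff, int64_t seq_diff,
                                const int64_t *fields, const int64_t *values,
                                int64_t numfields) {
    int same = (flags & DEMO_FLAG_SAMEFIELDS) != 0;
    assert(numfields >= 0);
    lp[len++] = flags;
    lp[len++] = ms_diff;   /* delta against the master entry, not absolute */
    lp[len++] = seq_diff;
    if (!same) lp[len++] = numfields;
    for (int64_t i = 0; i < numfields; i++) {
        if (!same) lp[len++] = fields[i];
        lp[len++] = values[i];
    }
    lp[len++] = demo_lp_count(numfields, same);
    return len;
}

/* From the index of an entry's lp-count element, find its flags element. */
static size_t demo_seek_flags(const int64_t *lp, size_t lp_count_idx) {
    return lp_count_idx - (size_t)lp[lp_count_idx];
}
```

Reverse iteration then amounts to starting at the last element of the array, calling `demo_seek_flags`, reading the entry, and repeating from the element just before the flags.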
Stream entries inside a listpack are delta-encoded (IDs relative to the master entry) and optionally field-name-compressed. SAMEFIELD is the space saving for uniform-schema streams. lp-count is the backward-seek bookmark that makes XREVRANGE possible without a separate index.
---
    /* Populate the listpack with the new entry. We use the following
     * encoding:
     *
     * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
     * |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
     * +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
     *
     * However if the SAMEFIELD flag is set, we have just to populate
     * the entry with the values, so it becomes:
     *
     * +-----+--------+-------+-/-+-------+--------+
     * |flags|entry-id|value-1|...|value-N|lp-count|
     * +-----+--------+-------+-/-+-------+--------+
     *
     * The entry-id field is actually two separated fields: the ms
     * and seq difference compared to the master entry.
     *
     * The lp-count field is a number that states the number of listpack pieces
     * that compose the entry, so that it's possible to travel the entry
     * in reverse order: we can just start from the end of the listpack, read
     * the entry, and jump back N times to seek the "flags" field to read
     * the stream full entry. */
    size_t oldsize = lpBytes(lp);
    lp = lpAppendInteger(lp,flags);
    lp = lpAppendInteger(lp,id.ms - master_id.ms);
    lp = lpAppendInteger(lp,id.seq - master_id.seq);
    if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
        lp = lpAppendInteger(lp,numfields);
    for (int64_t i = 0; i < numfields; i++) {
        sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
        if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
            lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
        lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
    }
    /* Compute and store the lp-count field. */
    int64_t lp_count = numfields;
    lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
    if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
        /* If the item is not compressed, it also has the fields other than
         * the values, and an additional num-fields field. */
        lp_count += numfields+1;
    }
    lp = lpAppendInteger(lp,lp_count);
    s->alloc_size -= oldsize;
    s->alloc_size += lpBytes(lp);
    /* Insert back into the tree in order to update the listpack pointer. */
    if (ri.data != lp)
        raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
    s->length++;
    s->entries_added++;
    s->last_id = id;
    if (s->length == 1) s->first_id = id;
    if (added_id) *added_id = id;
    return C_OK;
}
Consumer Groups
src/t_stream.c:3303
streamCG allocation, last_id cursor semantics, and teardown invariants
streamCreateCG shows the full structure of a consumer group in one allocation sequence. Each group gets three index structures:
- `pel`: group-level pending entries list, a rax keyed by encoded stream ID
- `consumers`: a rax keyed by consumer name
- `pel_time_head` / `pel_time_tail`: a time-ordered doubly-linked list
The linked list is what makes recovering stale entries cheap: instead of scanning the entire PEL for idle entries, Redis can walk from pel_time_head and stop at the first entry that has not been idle long enough. The last_id field is the group's cursor into the stream: messages from XREADGROUP > always have IDs strictly greater than last_id, and the cursor advances as messages are handed out.
The streamDestroyCG path shows the teardown contract: every NACK must be unlinked from cgroups_ref before the group is freed, and the cached min_cgroup_last_id on the parent stream is invalidated when the destroyed group's last_id equals that cached minimum, so trim logic can recalculate the safe deletion boundary.
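As a rough illustration of the time-ordered list, the sketch below keeps nodes sorted by delivery time, oldest at the head. The names (`demo_nack`, `demo_group`, `demo_insert`, `demo_unlink`, `demo_update`) are hypothetical and the real `pelListInsertAtTail`, `pelListUnlink`, and `pelListUpdate` may differ; in particular, inserting by walking back from the tail is an assumption made here so that an arbitrary delivery time still lands in order.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct demo_nack {
    int64_t delivery_time;
    struct demo_nack *prev, *next;
} demo_nack;

typedef struct demo_group {
    demo_nack *head, *tail; /* head = oldest delivery, tail = newest */
} demo_group;

/* Detach a node, fixing up the head and tail pointers as needed. */
static void demo_unlink(demo_group *g, demo_nack *n) {
    if (n->prev) n->prev->next = n->next; else g->head = n->next;
    if (n->next) n->next->prev = n->prev; else g->tail = n->prev;
    n->prev = n->next = NULL;
}

/* Insert keeping ascending delivery_time. New deliveries carry the
 * current time, so the backward walk from the tail usually stops at once. */
static void demo_insert(demo_group *g, demo_nack *n) {
    assert(n->prev == NULL && n->next == NULL);
    demo_nack *after = g->tail;
    while (after && after->delivery_time > n->delivery_time)
        after = after->prev;
    n->prev = after;
    n->next = after ? after->next : g->head;
    if (n->next) n->next->prev = n; else g->tail = n;
    if (after) after->next = n; else g->head = n;
}

/* Redelivery: stamp a new time and move the node to its new position. */
static void demo_update(demo_group *g, demo_nack *n, int64_t now) {
    demo_unlink(g, n);
    n->delivery_time = now;
    demo_insert(g, n);
}
```

With this shape the oldest pending entry is always `g->head`, and a scan for idle entries can stop at the first node that is too recent.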
A consumer group is three index structures sharing one last_id cursor: a rax PEL for O(log N) NACK lookup by ID, a rax consumers directory, and a time-ordered linked list for O(1) access to the oldest pending entry.
---
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) {
    if (s->cgroups == NULL)
        s->cgroups = raxNewWithMetadata(0, &s->alloc_size);
    if (raxFind(s->cgroups,(unsigned char*)name,namelen,NULL))
        return NULL;
    size_t usable;
    streamCG *cg = zmalloc_usable(sizeof(*cg), &usable);
    s->alloc_size += usable;
    cg->pel = raxNewWithMetadata(0, &s->alloc_size);
    cg->pel_time_head = NULL;
    cg->pel_time_tail = NULL;
    cg->pel_nack_tail = NULL;
    cg->consumers = raxNewWithMetadata(0, &s->alloc_size);
    cg->last_id.ms = 0;
    cg->last_id.seq = 0;
    streamUpdateCGroupLastId(s, cg, id);
    cg->entries_read = entries_read;
    raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
    return cg;
}
/* Free a consumer group and all its associated data. */
static void streamFreeCG(stream *s, streamCG *cg) {
    /* Free the pel, unlinking each NACK from the time list in the callback */
    streamFreeNACKCtx ctx = {s, cg};
    raxFreeWithCbAndContext(cg->pel, streamFreeNACKGeneric, &ctx);
    /* pel_time_head/tail/pel_nack_tail should now be NULL after unlinking all NACKs */
    serverAssert(cg->pel_time_head == NULL && cg->pel_time_tail == NULL && cg->pel_nack_tail == NULL);
    raxFreeWithCbAndContext(cg->consumers, streamFreeConsumerGeneric, s);
    size_t usable;
    zfree_usable(cg, &usable);
    s->alloc_size -= usable;
}
/* Destroy a consumer group and clean up all associated references. */
void streamDestroyCG(stream *s, streamCG *cg) {
    /* Remove all references from the cgroups_ref. */
    raxIterator it;
    raxStart(&it, cg->pel);
    raxSeek(&it, "^", NULL, 0);
    while (raxNext(&it)) {
        streamNACK *nack = it.data;
        streamUnlinkEntryFromCGroupRef(s, nack, it.key);
    }
    raxStop(&it);
    /* If we're destroying the group with the minimum last_id, the cached
     * minimum is no longer valid and needs to be recalculated from the
     * remaining groups. */
    if (s->min_cgroup_last_id_valid && streamCompareID(&s->min_cgroup_last_id, &cg->last_id) == 0)
        s->min_cgroup_last_id_valid = 0;
    streamFreeCG(s, cg);
}
XREADGROUP and the Pending Entries List
src/t_stream.c:2223
Dual-PEL registration at delivery time and the re-ownership path on cursor rewind
This block inside streamReplyWithRange is where at-least-once delivery is implemented. When a consumer reads a new message (not a history re-read, and not with NOACK), Redis allocates a streamNACK struct and attempts raxTryInsert into the group's PEL. The optimistic insert handles the common case — new message, no prior owner — in a single call.
If the insert fails because the ID already has a NACK (for example, after XGROUP SETID rewound the cursor), Redis transfers ownership: it removes the NACK from the old consumer's personal PEL, points nack->consumer at the new consumer, and resets delivery_count to 1. In both paths, the NACK ends up in two rax trees simultaneously — group->pel (the authoritative index) and consumer->pel (the per-consumer view) — as pointers to the same object. That shared pointer is what lets XACK remove from both trees without a join step.
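The two paths can be modelled with a toy index in place of the rax. Everything in this sketch is hypothetical (`demo_index`, `demo_deliver`, `demo_ack`, a fixed-capacity array keyed by a single integer ID); it only aims to show one NACK object being reachable from both the group index and the owning consumer's index, and how a conflicting insert turns into an ownership transfer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_CAP 16

typedef struct demo_nack demo_nack;

/* Tiny unordered map standing in for a rax: id -> NACK pointer. */
typedef struct demo_index {
    uint64_t ids[DEMO_CAP];
    demo_nack *vals[DEMO_CAP];
    int len;
} demo_index;

typedef struct demo_consumer { demo_index pel; } demo_consumer;
typedef struct demo_group { demo_index pel; } demo_group;
struct demo_nack { demo_consumer *consumer; uint64_t delivery_count; };

static demo_nack *index_find(const demo_index *ix, uint64_t id) {
    for (int i = 0; i < ix->len; i++)
        if (ix->ids[i] == id) return ix->vals[i];
    return NULL;
}

/* Returns 1 if inserted, 0 if the id was already present. */
static int index_try_insert(demo_index *ix, uint64_t id, demo_nack *n) {
    if (index_find(ix, id)) return 0;
    assert(ix->len < DEMO_CAP);
    ix->ids[ix->len] = id;
    ix->vals[ix->len] = n;
    ix->len++;
    return 1;
}

static void index_remove(demo_index *ix, uint64_t id) {
    for (int i = 0; i < ix->len; i++) {
        if (ix->ids[i] != id) continue;
        ix->ids[i] = ix->ids[ix->len - 1];
        ix->vals[i] = ix->vals[ix->len - 1];
        ix->len--;
        return;
    }
}

/* Register a delivery. 'fresh' is caller-provided storage that is used
 * only when the id has no NACK yet. Returns the NACK now tracking the id. */
static demo_nack *demo_deliver(demo_group *g, demo_consumer *c,
                               uint64_t id, demo_nack *fresh) {
    fresh->consumer = c;
    fresh->delivery_count = 1;
    if (index_try_insert(&g->pel, id, fresh)) {
        index_try_insert(&c->pel, id, fresh); /* same pointer, second index */
        return fresh;
    }
    demo_nack *n = index_find(&g->pel, id);   /* already pending: reuse it */
    if (n->consumer != c) {
        if (n->consumer) index_remove(&n->consumer->pel, id);
        n->consumer = c;
        index_try_insert(&c->pel, id, n);
    }
    n->delivery_count = 1;
    return n;
}

/* Acknowledge: one lookup in the group index reaches both indexes. */
static int demo_ack(demo_group *g, uint64_t id) {
    demo_nack *n = index_find(&g->pel, id);
    if (!n) return 0;
    index_remove(&g->pel, id);
    if (n->consumer) index_remove(&n->consumer->pel, id);
    return 1;
}
```

Acknowledging an ID twice returns 0 the second time, which matches the idempotent behavior the tour describes for XACK.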
On every XREADGROUP delivery, one streamNACK object is inserted into two rax trees as the same pointer. The message is not considered processed until both registrations are removed by XACK.
---
        if (group && !noack) {
            unsigned char buf[sizeof(streamID)];
            streamEncodeID(buf,&id);
            /* Try to add a new NACK. Most of the time this will work and
             * will not require extra lookups. We'll fix the problem later
             * if we find that there is already an entry for this ID. */
            streamNACK *nack = streamCreateNACK(s, consumer, &id);
            int group_inserted =
                raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
            /* Now we can check if the entry was already busy, and
             * in that case reassign the entry to the new consumer,
             * or update it if the consumer is the same as before. */
            if (group_inserted == 0) {
                streamFreeNACK(s,nack);
                void *result;
                int found = raxFind(group->pel,buf,sizeof(buf),&result);
                serverAssert(found);
                nack = result;
                /* Only transfer between consumers if they're different */
                if (nack->consumer != consumer) {
                    if (nack->consumer)
                        raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
                    nack->consumer = consumer;
                    raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
                }
                nack->delivery_count = 1;
                /* Update delivery time and reposition in time list */
                pelListUpdate(group, nack, cmd_time_snapshot);
            } else {
                /* New NACK - insert into consumer's PEL and time list */
                raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
                nack->cgroup_ref_node = streamLinkCGroupToEntry(s, group, buf);
                pelListInsertAtTail(group, nack);
            }
            consumer->active_time = cmd_time_snapshot;
            /* Propagate as XCLAIM. */
            if (spi) {
                robj *delivery_count = createStringObjectFromLongLong(nack->delivery_count);
                streamPropagateXCLAIMCopyFree(db_id,spi->keyname,group_last_id,spi->groupname,idarg,consumername,delivery_time,delivery_count);
                decrRefCount(delivery_count);
                if (propCount) (*propCount)++;
            }
        }
        decrRefCount(idarg);
        arraylen++;
        if (count && count == arraylen) break;
    }
    if (spi && consumer) {
XACK and Acknowledgment
src/t_stream.c:3782
Single-lookup dual-PEL removal and idempotent ack semantics
xackCommand is deliberately simple because the dual-PEL invariant established at delivery time does the heavy lifting. All IDs are parsed before the ack loop starts, so a malformed ID anywhere in the argument list aborts the command before any entry is acknowledged. Because parsing and acknowledging are separate passes, a parse error can never leave the batch half-applied.
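The parse-first pattern is easy to reproduce in isolation. The sketch below uses hypothetical helpers (`demo_parse_id`, `demo_parse_all`) and a deliberately simplified grammar that accepts only the explicit `<ms>-<seq>` form; it is not a reimplementation of `streamParseStrictIDOrReply`.

```c
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct demo_id { uint64_t ms, seq; } demo_id;

/* Parse an unsigned decimal that must be followed by 'stop'.
 * Rejects signs, leading whitespace, empty input, and overflow. */
static int demo_parse_u64(const char *s, char stop, uint64_t *out,
                          const char **end) {
    if (!isdigit((unsigned char)*s)) return 0;
    errno = 0;
    char *e;
    unsigned long long v = strtoull(s, &e, 10);
    if (errno == ERANGE || *e != stop) return 0;
    *out = (uint64_t)v;
    *end = e;
    return 1;
}

/* Accepts only "<ms>-<seq>" in this simplified sketch. */
static int demo_parse_id(const char *s, demo_id *id) {
    const char *p;
    assert(s != NULL && id != NULL);
    if (!demo_parse_u64(s, '-', &id->ms, &p)) return 0;
    return demo_parse_u64(p + 1, '\0', &id->seq, &p);
}

/* Pass 1: parse every argument into a scratch array. Returns 0 on the
 * first malformed ID so the caller can bail out before mutating anything. */
static int demo_parse_all(const char **args, int n, demo_id *out) {
    for (int i = 0; i < n; i++)
        if (!demo_parse_id(args[i], &out[i])) return 0;
    return 1;
}
```

A caller would run `demo_parse_all` first and enter its acknowledgment loop only when it returns 1, so the reply is either an error or a count, never a mix of both.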
For each valid ID, the lookup goes through group->pel, not the consumer's PEL. The group PEL is the authoritative index: the NACK pointer retrieved there carries nack->consumer, which gives Redis the consumer's personal PEL for the second removal. The removal sequence is:
- `pelListUnlink`: removes the NACK from the time-ordered linked list
- Two `raxRemove` calls: prune both rax trees
- `streamDestroyNACK`: unlinks from `cgroups_ref` and frees the allocation
Acking an ID not in the PEL is silently ignored and does not increment the return value. XACK is idempotent.
XACK reaches both PELs through one pointer: look up group->pel by ID to get the NACK, read nack->consumer for the consumer's PEL, remove from both. Three rax operations and the entry is cleared from all indexes.
---
void xackCommand(client *c) {
    streamCG *group = NULL;
    kvobj *kv = lookupKeyRead(c->db, c->argv[1]);
    if (kv) {
        if (checkType(c, kv, OBJ_STREAM)) return; /* Type error. */
        group = streamLookupCG(kv->ptr, c->argv[2]->ptr);
    }
    /* No key or group? Nothing to ack. */
    if (kv == NULL || group == NULL) {
        addReply(c,shared.czero);
        return;
    }
    /* Start parsing the IDs, so that we abort ASAP if there is a syntax
     * error: the return value of this command cannot be an error in case
     * the client successfully acknowledged some messages, so it should be
     * executed in a "all or nothing" fashion. */
    streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
    streamID *ids = static_ids;
    int id_count = c->argc-3;
    if (id_count > STREAMID_STATIC_VECTOR_LEN)
        ids = zmalloc(sizeof(streamID)*id_count);
    for (int j = 3; j < c->argc; j++) {
        if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0,NULL) != C_OK) goto cleanup;
    }
    int acknowledged = 0;
    size_t old_alloc = server.memory_tracking_enabled ? kvobjAllocSize(kv) : 0;
    for (int j = 3; j < c->argc; j++) {
        unsigned char buf[sizeof(streamID)];
        streamEncodeID(buf,&ids[j-3]);
        /* Lookup the ID in the group PEL: it will have a reference to the
         * NACK structure that will have a reference to the consumer, so that
         * we are able to remove the entry from both PELs. */
        void *result;
        if (raxFind(group->pel,buf,sizeof(buf),&result)) {
            streamNACK *nack = result;
            pelListUnlink(group, nack);
            raxRemove(group->pel,buf,sizeof(buf),NULL);
            if (nack->consumer)
                raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
            streamDestroyNACK(kv->ptr, nack, buf);
            acknowledged++;
            server.dirty++;
            keyModified(c,c->db,c->argv[1],kv,0);
        }
    }
    if (server.memory_tracking_enabled)
        updateSlotAllocSize(c->db,getKeySlot(c->argv[1]->ptr),kv,old_alloc,kvobjAllocSize(kv));
    addReplyLongLong(c,acknowledged);
cleanup:
    if (ids != static_ids) zfree(ids);
}
/* XNACK key group <SILENT|FAIL|FATAL> IDS numids id [id ...]
 * [RETRYCOUNT count] [FORCE]
 *
 * Release pending messages back to the group's PEL without acknowledging them.
XCLAIM and XAUTOCLAIM
src/t_stream.c:4372
Pointer surgery for ownership transfer and time-list scanning for XAUTOCLAIM
XCLAIM is how a consumer group recovers from a dead consumer. When consumer A crashes mid-processing, its PEL entries sit unacknowledged indefinitely. Another consumer calls XCLAIM with a min-idle-time threshold, and Redis transfers ownership for each claimed entry:
- Remove the NACK from the original consumer's PEL
- Update `nack->consumer` to the new consumer
- Reinsert into the new consumer's PEL, increment `delivery_count`, update `delivery_time`
XAUTOCLAIM removes the manual discovery step. Instead of requiring the caller to know which IDs are stale, it takes a start-id and a min-idle-time, scans the group's time-ordered linked list (pel_time_head) forward, and returns up to COUNT entries idle long enough. Because the list is sorted oldest-first, XAUTOCLAIM stops at the first entry below the threshold, making the scan O(k) where k is entries returned, not total PEL size. The JUSTID option for both commands lets a consumer claim ownership without receiving the message body — useful for resetting the idle timer without re-queuing work.
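The claim rule and the early-stopping scan described above can be sketched on a simplified model. The types and functions here (`demo_nack`, `demo_claim`, `demo_autoclaim`) are hypothetical, consumers are reduced to integer IDs, and an array sorted oldest-first stands in for the time-ordered list. The sketch assumes, as the XCLAIM documentation states, that JUSTID leaves the delivery counter untouched.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct demo_nack {
    int consumer;            /* owner, reduced to an integer id */
    int64_t delivery_time;   /* ms timestamp of the last delivery */
    uint64_t delivery_count;
} demo_nack;

/* Transfer ownership if the entry has been idle for at least minidle ms.
 * Returns 1 when claimed, 0 when the entry is still too fresh. */
static int demo_claim(demo_nack *n, int new_consumer, int64_t now,
                      int64_t minidle, int justid) {
    assert(minidle >= 0);
    if (now - n->delivery_time < minidle) return 0;
    n->consumer = new_consumer;
    n->delivery_time = now;           /* resets the idle timer */
    if (!justid) n->delivery_count++; /* JUSTID skips the increment */
    return 1;
}

/* Scan entries ordered oldest delivery first and stop at the first one
 * that is not idle enough: everything after it is newer still. A linked
 * list would also move each claimed node to the tail; the array is left
 * in place here for brevity. */
static size_t demo_autoclaim(demo_nack *oldest_first, size_t n,
                             int new_consumer, int64_t now,
                             int64_t minidle, size_t count) {
    size_t claimed = 0;
    for (size_t i = 0; i < n && claimed < count; i++) {
        if (!demo_claim(&oldest_first[i], new_consumer, now, minidle, 0))
            break;
        claimed++;
    }
    return claimed;
}
```

The loop touches only the entries it claims plus at most one more, which is where the O(k) bound in the text comes from.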
XCLAIM re-ownership is three-step pointer surgery: remove from old consumer's PEL, update nack->consumer, insert into new consumer's PEL. XAUTOCLAIM uses the time-ordered linked list to find stale entries in O(k), not O(total PEL size).
---
void xclaimCommand(client *c) {
    streamCG *group = NULL;
    kvobj *o = lookupKeyRead(c->db,c->argv[1]);
    long long minidle; /* Minimum idle time argument. */
    long long retrycount = -1; /* -1 means RETRYCOUNT option not given. */
    mstime_t deliverytime = -1; /* -1 means IDLE/TIME options not given. */
    int force = 0;
    int justid = 0;
    if (o) {
        if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
        group = streamLookupCG(o->ptr,c->argv[2]->ptr);
    }
    /* No key or group? Send an error given that the group creation
     * is mandatory. */
    if (o == NULL || group == NULL) {
        addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
                              "consumer group '%s'", (char*)c->argv[1]->ptr,
                              (char*)c->argv[2]->ptr);
        return;
    }
    if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
        "Invalid min-idle-time argument for XCLAIM")
        != C_OK) return;
    if (minidle < 0) minidle = 0;
    /* Start parsing the IDs, so that we abort ASAP if there is a syntax
     * error: the return value of this command cannot be an error in case
     * the client successfully claimed some message, so it should be
     * executed in a "all or nothing" fashion. */
    int j;
    streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
    streamID *ids = static_ids;
    int id_count = c->argc-5;
    if (id_count > STREAMID_STATIC_VECTOR_LEN)
        ids = zmalloc(sizeof(streamID)*id_count);
    for (j = 5; j < c->argc; j++) {
        if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0,NULL) != C_OK) break;
    }
    int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
    /* If we stopped because some IDs cannot be parsed, perhaps they
     * are trailing options. */
    mstime_t now = commandTimeSnapshot();
    streamID last_id = {0,0};
    int propagate_last_id = 0;
    for (; j < c->argc; j++) {
        int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
        char *opt = c->argv[j]->ptr;
        if (!strcasecmp(opt,"FORCE")) {
            force = 1;
        } else if (!strcasecmp(opt,"JUSTID")) {
            justid = 1;
        } else if (!strcasecmp(opt,"IDLE") && moreargs) {
            j++;
            if (getLongLongFromObjectOrReply(c,c->argv[j],&deliverytime,
                "Invalid IDLE option argument for XCLAIM")
                != C_OK) goto cleanup;
Trimming: XTRIM and MAXLEN
src/t_stream.c:850
Node-level deletion, approximate trim early exit, and the limit work cap
streamTrim is the shared implementation behind both XTRIM and the inline MAXLEN/MINID option of XADD. It walks the rax from the beginning ("^" seek) in chronological order. Redis operates at the listpack node level first: if every entry inside a node falls below the trim threshold, the entire node is freed with one lpFree and removed with one raxRemove — far cheaper than entry-by-entry deletion.
The approx flag, set when you write MAXLEN ~ 1000, enables early exit: once the loop reaches a node that cannot be removed whole, approximate trim breaks out rather than descending into partial node trimming. The stream may overshoot by up to one node's worth of entries, but no partial listpack parsing is required. The limit parameter adds a hard cap on total work per call, which matters most for streams with consumer group references, where any delete strategy other than DELETE_STRATEGY_KEEPREF must check or clean up those references before a node can be dropped.
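The interaction between whole-node removal, the approximate flag, and the work limit can be followed on a simplified model. In this sketch the stream is just an array of per-node entry counts, oldest node first, and `demo_trim_maxlen` is a hypothetical function covering only the MAXLEN strategy. Shrinking a node's count is a stand-in for the real partial trim, which edits entries inside the listpack.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Trim toward maxlen entries. Removed nodes are zeroed in place.
 * limit == 0 means no work cap. Returns the number of entries deleted. */
static int64_t demo_trim_maxlen(int64_t *nodes, size_t n, int64_t maxlen,
                                int approx, int64_t limit) {
    assert(maxlen >= 0);
    int64_t length = 0, deleted = 0;
    for (size_t i = 0; i < n; i++) length += nodes[i];

    for (size_t i = 0; i < n; i++) {
        if (length <= maxlen) break;          /* already short enough */
        int64_t entries = nodes[i];
        if (limit && deleted + entries > limit)
            break;                            /* work cap reached */
        if (length - entries >= maxlen) {     /* whole node can go */
            nodes[i] = 0;
            length -= entries;
            deleted += entries;
            continue;
        }
        if (approx) break;                    /* stop at the node boundary */
        int64_t excess = length - maxlen;     /* exact trim: partial node */
        nodes[i] -= excess;
        length -= excess;
        deleted += excess;
        break;
    }
    return deleted;
}
```

For four nodes holding 100, 100, 100, and 50 entries and a target of 120, the approximate variant removes two whole nodes and leaves 150 entries, while the exact variant also trims 30 entries out of the third node.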
XTRIM works node-by-node through the rax. A node fully below the threshold is freed in O(1). Approximate trimming (~) stops at the node boundary, trading a small length overshoot for a significant reduction in CPU work per XADD.
---
int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
    size_t maxlen = args->maxlen;
    streamID *id = &args->minid;
    int approx = args->approx_trim;
    int64_t limit = args->limit;
    int trim_strategy = args->trim_strategy;
    int delete_strategy = args->delete_strategy;
    if (trim_strategy == TRIM_STRATEGY_NONE)
        return 0;
    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"^",NULL,0);
    int64_t deleted = 0;
    while (raxNext(&ri)) {
        if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen)
            break;
        unsigned char *lp = ri.data, *p = lpFirst(lp);
        int64_t entries = lpGetInteger(p);
        /* Check if we exceeded the amount of work we could do */
        if (limit && (deleted + entries) > limit)
            break;
        /* Check if we can remove the whole node */
        int remove_node = 0; /* Final decision flag for node removal */
        int node_eligible_for_remove = 0; /* Whether node meets the basic criteria for removal */
        streamID master_id = {0};
        /* Read the master ID from the radix tree key. */
        streamDecodeID(ri.key, &master_id);
        if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
            node_eligible_for_remove = s->length - entries >= maxlen;
        } else {
            /* Read last ID. */
            streamID last_id = {0,0};
            lpGetEdgeStreamID(lp, 0, &master_id, &last_id);
            /* We can remove the entire node id its last ID < 'id' */
            node_eligible_for_remove = streamCompareID(&last_id, id) < 0;
        }
        if (node_eligible_for_remove && delete_strategy == DELETE_STRATEGY_KEEPREF) {
            /* With KEEPREF strategy, we can remove the whole node directly since we don't need
             * to check or clean up consumer group references. */
            remove_node = 1;
        }
        if (remove_node) {
            s->alloc_size -= lpBytes(lp);
            lpFree(lp);
            raxRemove(s->rax,ri.key,ri.key_len,NULL);
            raxSeek(&ri,">=",ri.key,ri.key_len);
            s->length -= entries;
            deleted += entries;
            continue;
        }