Redis C redis/redis

RESP Protocol Deep Dive: How Redis Talks Over the Wire

Eight stops through the parsing, reply, and negotiation code that drives every Redis client connection

8 stops ~30 min Verified 2026-04-30
What you will learn
  • How RESP2's five prefix types (`+`, `-`, `:`, `$`, `*`) frame values on the wire, and how `processInlineBuffer` and `processMultibulkBuffer` turn raw request bytes into command arguments
  • What RESP3 adds on top of RESP2: the eight new type prefixes and how `addReplyDouble`, `addReplyBigNum`, `addReplyVerbatim`, and `addReplyMapLen` emit them
  • Where `processInputBuffer` decides which parsing path to take and how the lookahead loop enables pipelining
  • How `helloCommand` lets a client negotiate its RESP version, authenticate, and name itself in a single round trip
  • How RESP3 push messages (`>`) are deferred to `server.pending_push_messages` so they never collide with an in-flight command reply
  • How the output buffer works: `addReply`, `addReplyBulk`, and `addReplyArrayLen` all funnel through a two-stage static-buffer-then-list structure
  • Why pipelining is free in Redis: `processInputBuffer` keeps looping while the query buffer holds complete commands, parsing up to `lookahead` of them before executing the batch
  • How Redis categorises error replies: a single `addReplyErrorLength` function auto-prefixes `-ERR`, while callers that provide their own prefix (`-WRONGTYPE`, `-MOVED`, `-ASK`, `-NOAUTH`) bypass it
Prerequisites
  • Comfortable reading C (you do not need to write it, just trace the logic)
  • Basic familiarity with TCP and the idea of a client sending text commands to a server
  • Having used a Redis client (e.g. `redis-cli`) at least once is helpful
1 / 8

RESP2 Wire Format

src/networking.c:2953

How processInlineBuffer and processMultibulkBuffer parse the five RESP2 prefix bytes from raw socket data into command arguments

RESP2 defines five prefix bytes that appear at the start of every framed value on the wire:

  • + introduces a simple string (+OK\r\n)
  • - introduces an error (-ERR unknown command\r\n)
  • : introduces an integer (:1\r\n)
  • $ introduces a bulk string: prefix byte, byte count, \r\n, payload, then another \r\n ($3\r\nfoo\r\n)
  • * introduces an array (multibulk): element count, then each element encoded with its own prefix type

All five types terminate frames with \r\n.
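
To make the framing rules concrete, here is a minimal sketch of how a client could encode a command as a RESP2 array of bulk strings. The helper name `resp2_encode_command` is hypothetical and not part of Redis, and it uses `strlen`, so unlike the real protocol it is not binary-safe.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper, not part of Redis: encode argv as a RESP2 array of
 * bulk strings. Returns bytes written (excluding the trailing NUL), or -1
 * if `cap` is too small. */
static int resp2_encode_command(char *out, size_t cap, int argc,
                                const char **argv) {
    assert(out != NULL && argc >= 0);
    size_t pos = 0;
    int n = snprintf(out, cap, "*%d\r\n", argc);        /* array header */
    if (n < 0 || (size_t)n >= cap) return -1;
    pos += (size_t)n;
    for (int i = 0; i < argc; i++) {
        size_t len = strlen(argv[i]);
        n = snprintf(out + pos, cap - pos, "$%zu\r\n", len); /* bulk header */
        if (n < 0 || (size_t)n >= cap - pos) return -1;
        pos += (size_t)n;
        if (len + 2 >= cap - pos) return -1;   /* payload + CRLF + NUL */
        memcpy(out + pos, argv[i], len);       /* payload */
        pos += len;
        memcpy(out + pos, "\r\n", 2);          /* frame terminator */
        pos += 2;
        out[pos] = '\0';
    }
    return (int)pos;
}
```

Encoding `SET a 1` this way yields `*3\r\n$3\r\nSET\r\n$1\r\na\r\n$1\r\n1\r\n`, which is the shape the multibulk parser expects.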

processInlineBuffer handles the simpler path: plain text typed into a telnet or netcat session (redis-cli itself sends multibulk frames). It scans forward in c->querybuf from c->qb_pos looking for \n, slices out the line, calls sdssplitargs to split it into arguments (honouring quotes), and populates the pending command structure. The linefeed_chars variable records whether the terminator was a bare \n (1 byte) or the canonical \r\n (2 bytes), so c->qb_pos advances by the right amount.
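
The newline handling can be isolated into a few lines. The sketch below is a hypothetical stand-alone helper (`inline_line_length` is not a Redis function) that mirrors the logic; it swaps `strchr` for `memchr` so it works on a buffer with an explicit length.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper mirroring the inline terminator handling.
 * Scans `len` bytes of `buf` for '\n'. On success returns the line length
 * without its terminator and stores the terminator width (1 for "\n",
 * 2 for "\r\n") in *linefeed_chars. Returns -1 if no full line is buffered. */
static long inline_line_length(const char *buf, size_t len,
                               int *linefeed_chars) {
    assert(buf != NULL && linefeed_chars != NULL);
    const char *newline = memchr(buf, '\n', len);
    if (newline == NULL) return -1;          /* wait for more bytes */
    *linefeed_chars = 1;
    if (newline != buf && newline[-1] == '\r') {
        newline--;                           /* exclude the '\r' from the line */
        (*linefeed_chars)++;
    }
    return (long)(newline - buf);
}
```

The caller would advance its read offset by the returned length plus `*linefeed_chars`, which is what `c->qb_pos += querylen+linefeed_chars` does in the excerpt.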

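processMultibulkBuffer handles the *-prefixed binary-safe path; Stop 3 shows how processInputBuffer chooses between the two parsers based on the first byte the client writes. Requests only ever use * and $ (an array of bulk strings); the +, -, and : prefixes appear in replies. RESP2 is self-delimiting, so neither parser needs out-of-band length information.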
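
As a rough illustration of what "self-delimiting" buys the parser, here is a simplified multibulk parser. It is a sketch under stated assumptions, not the Redis implementation: the names `parsed_cmd`, `parse_header`, and `parse_multibulk` are hypothetical, the argument cap is arbitrary, and it restarts from the beginning on every call instead of persisting partial state between reads.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_ARGS 16   /* arbitrary limit for this sketch */

typedef struct {
    int argc;
    const char *argv[MAX_ARGS];   /* pointers into the input buffer */
    size_t arglen[MAX_ARGS];
} parsed_cmd;

/* Parse "<prefix><decimal>\r\n" at buf[*pos]. Returns 1 on success,
 * 0 if the header is not fully buffered yet, -1 if it is malformed. */
static int parse_header(const char *buf, size_t len, size_t *pos,
                        char prefix, long *value) {
    if (*pos >= len) return 0;
    if (buf[*pos] != prefix) return -1;
    const char *cr = memchr(buf + *pos, '\r', len - *pos);
    if (cr == NULL || (size_t)(cr - buf) + 1 >= len) return 0;
    if (cr[1] != '\n') return -1;
    const char *p = buf + *pos + 1;
    if (p == cr) return -1;                 /* no digits at all */
    long v = 0;
    for (; p < cr; p++) {
        if (*p < '0' || *p > '9') return -1;
        v = v * 10 + (*p - '0');
        if (v > 1000000) return -1;         /* arbitrary sanity cap */
    }
    *value = v;
    *pos = (size_t)(cr - buf) + 2;
    return 1;
}

/* Returns 1 for a complete command, 0 for "need more bytes",
 * -1 for a protocol error. */
static int parse_multibulk(const char *buf, size_t len, parsed_cmd *out,
                           size_t *consumed) {
    assert(buf != NULL && out != NULL && consumed != NULL);
    size_t pos = 0;
    long count = 0, blen = 0;
    int rc = parse_header(buf, len, &pos, '*', &count);
    if (rc != 1) return rc;
    if (count > MAX_ARGS) return -1;
    out->argc = 0;
    for (long i = 0; i < count; i++) {
        rc = parse_header(buf, len, &pos, '$', &blen);
        if (rc != 1) return rc;
        if (len - pos < (size_t)blen + 2) return 0;   /* payload incomplete */
        if (buf[pos + blen] != '\r' || buf[pos + blen + 1] != '\n') return -1;
        out->argv[out->argc] = buf + pos;
        out->arglen[out->argc] = (size_t)blen;
        out->argc++;
        pos += (size_t)blen + 2;
    }
    *consumed = pos;
    return 1;
}
```

The three return values correspond to the outcomes the tour describes later: a complete command, an incomplete frame that needs more bytes, and a protocol violation.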

Key takeaway

RESP2 is a five-character alphabet: +, -, :, $, *. Every message a client sends or receives is built from these five framing rules, and the entire parsing logic lives in two functions totalling under 200 lines of C.

int processInlineBuffer(client *c, pendingCommand *pcmd) {
    char *newline;
    int argc, j, linefeed_chars = 1;
    sds *argv, aux;
    size_t querylen;

    /* Search for end of line */
    newline = strchr(c->querybuf+c->qb_pos,'\n');

    /* Nothing to do without a \r\n */
    if (newline == NULL) {
        if (sdslen(c->querybuf)-c->qb_pos > PROTO_INLINE_MAX_SIZE) {
            pcmd->read_error = CLIENT_READ_TOO_BIG_INLINE_REQUEST;
        }
        return C_ERR;
    }

    /* Handle the \r\n case. */
    if (newline != c->querybuf+c->qb_pos && *(newline-1) == '\r')
        newline--, linefeed_chars++;

    /* Split the input buffer up to the \r\n */
    querylen = newline-(c->querybuf+c->qb_pos);
    aux = sdsnewlen(c->querybuf+c->qb_pos,querylen);
    argv = sdssplitargs(aux,&argc);
    sdsfree(aux);
    if (argv == NULL) {
        pcmd->read_error = CLIENT_READ_UNBALANCED_QUOTES;
        return C_ERR;
    }

    /* Newline from slaves can be used to refresh the last ACK time.
     * This is useful for a slave to ping back while loading a big
     * RDB file. */
    if (querylen == 0 && clientTypeIsSlave(c)) {
        if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
            c->repl_ack_time = server.unixtime;
        else
            /* If this is a replica client running in an IO thread we cache the
             * last ack time in a different member variable in order to avoid
             * contention with main thread. f.e see refreshGoodSlavesCount()
             * Note c->repl_ack_time will still be updated in
             * updateClientDataFromIOThread with the value of c->io_repl_ack_time
             * when the client moves from IO to main thread. */
            c->io_repl_ack_time = server.unixtime;
    }

    /* Masters should never send us inline protocol to run actual
     * commands. If this happens, it is likely due to a bug in Redis where
     * we got some desynchronization in the protocol, for example
     * because of a PSYNC gone bad.
     *
     * However there is an exception: masters may send us just a newline
     * to keep the connection active. */
    if (querylen != 0 && c->flags & CLIENT_MASTER) {
        sdsfreesplitres(argv,argc);
        pcmd->read_error = CLIENT_READ_MASTER_USING_INLINE_PROTOCAL;
        return C_ERR;
    }

    /* Move querybuffer position to the next query in the buffer. */
    c->qb_pos += querylen+linefeed_chars;

    /* Setup argv array on client structure */
    if (argc) {
        /* Create new argv if space is insufficient. */
        if (argc > pcmd->argv_len) {
            zfree(pcmd->argv);
2 / 8

RESP3 Additions

src/networking.c:1026

Eight new type prefixes in RESP3 and how addReplyDouble, addReplyBigNum, addReplyMapLen, and addReplyVerbatim emit them

RESP3, introduced in Redis 6, adds new type prefixes on top of RESP2. This stop covers eight of them (the code below also shows |, the attribute prefix):

  • % (Map) — key-value pairs with a leading count of pairs; addReplyMapLen doubles the count for RESP2 clients, turning the map into a plain array
  • ~ (Set) — like an array but semantically unordered
  • > (Push) — out-of-band server pushes (covered in Stop 5)
  • ( (Big number) — arbitrarily large integers without floating-point loss
  • # (Boolean) — literal #t\r\n or #f\r\n instead of the RESP2 integer 0/1
  • , (Double) — native float64 wire type; addReplyDouble prefixes the decimal string with , when c->resp == 3, falling back to a $-prefixed bulk string for RESP2
  • _ (Null) — replaces RESP2's $-1\r\n and *-1\r\n sentinels with a unified null type
  • = (Verbatim string) — string with a three-character media type hint embedded in the prefix (=<length>\r\n<ext>:<content>\r\n), as shown in addReplyVerbatim
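
The verbatim string layout is the least obvious of these, so here is a minimal sketch of an encoder for it. `encode_verbatim` is a hypothetical stand-alone function, not the Redis helper; it writes into a caller-supplied buffer instead of a client output buffer.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical encoder: writes "=<len>\r\n<ext>:<content>\r\n" into out.
 * `ext` is padded with spaces or truncated to exactly three characters.
 * Returns bytes written (excluding the trailing NUL), or -1 if `cap` is
 * too small. */
static int encode_verbatim(char *out, size_t cap, const char *s, size_t len,
                           const char *ext) {
    assert(out != NULL && s != NULL && ext != NULL);
    char tag[4] = "   ";                       /* three spaces + NUL */
    for (int i = 0; i < 3 && ext[i] != '\0'; i++) tag[i] = ext[i];
    /* The announced length covers the 3-byte tag, the ':' and the content. */
    int n = snprintf(out, cap, "=%zu\r\n%s:", len + 4, tag);
    if (n < 0 || (size_t)n >= cap) return -1;
    if (len + 2 >= cap - (size_t)n) return -1; /* content + CRLF + NUL */
    memcpy(out + n, s, len);
    memcpy(out + n + len, "\r\n", 2);
    out[n + len + 2] = '\0';
    return n + (int)len + 2;
}
```

For example, the content `hello` with the hint `txt` becomes `=9\r\ntxt:hello\r\n`: the length 9 is the five content bytes plus the four bytes of `txt:`.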

Each of these reply constructors checks c->resp and emits the appropriate encoding; the RESP3-only helpers (addReplyAttributeLen, addReplyPushLen) assert c->resp >= 3 instead. Because the version check lives inside the reply helpers, command code that uses them serves both RESP2 and RESP3 clients unchanged.
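
The branch-on-version pattern can be shown in isolation. The helpers below are hypothetical miniatures (`encode_map_header`, `encode_null`, and `encode_bool` are not Redis functions) that take the protocol version as a plain `int` where Redis reads `c->resp`.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical: map header for `pairs` key-value pairs.
 * Returns the snprintf result (bytes that were or would be written). */
static int encode_map_header(char *out, size_t cap, int resp, long pairs) {
    assert(out != NULL && (resp == 2 || resp == 3) && pairs >= 0);
    /* RESP2 has no map type: flatten to an array of 2*pairs elements. */
    if (resp == 2) return snprintf(out, cap, "*%ld\r\n", pairs * 2);
    return snprintf(out, cap, "%%%ld\r\n", pairs);
}

/* Hypothetical: null is a bulk-string sentinel in RESP2, a real type in RESP3. */
static const char *encode_null(int resp) {
    return resp == 2 ? "$-1\r\n" : "_\r\n";
}

/* Hypothetical: booleans degrade to the integers 1/0 in RESP2. */
static const char *encode_bool(int resp, int b) {
    if (resp == 2) return b ? ":1\r\n" : ":0\r\n";
    return b ? "#t\r\n" : "#f\r\n";
}
```

A caller never branches on the version itself: asking for a 7-pair map produces `%7\r\n` for a RESP3 client and `*14\r\n` for a RESP2 client.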

Key takeaway

RESP3 is backwards-compatible because each addReply* helper contains the version branch. A RESP3 client gets richer native types; a RESP2 client gets bulk strings or arrays carrying the same data. Command handlers that stick to these helpers need no changes.

void addReplyDouble(client *c, double d) {
    if (c->resp == 3) {
        char dbuf[MAX_D2STRING_CHARS+3];
        dbuf[0] = ',';
        const int dlen = d2string(dbuf+1,sizeof(dbuf)-1,d);
        dbuf[dlen+1] = '\r';
        dbuf[dlen+2] = '\n';
        dbuf[dlen+3] = '\0';
        addReplyProto(c,dbuf,dlen+3);
    } else {
        char dbuf[MAX_LONG_DOUBLE_CHARS+32];
        /* In order to prepend the string length before the formatted number,
         * but still avoid an extra memcpy of the whole number, we reserve space
         * for maximum header `$0000\r\n`, print double, add the resp header in
         * front of it, and then send the buffer with the right `start` offset. */
        const int dlen = d2string(dbuf+7,sizeof(dbuf)-7,d);
        int digits = digits10(dlen);
        int start = 4 - digits;
        serverAssert(start >= 0);
        dbuf[start] = '$';

        /* Convert `dlen` to string, putting it's digits after '$' and before the
            * formatted double string. */
        for(int i = digits, val = dlen; val && i > 0 ; --i, val /= 10) {
            dbuf[start + i] = "0123456789"[val % 10];
        }
        dbuf[5] = '\r';
        dbuf[6] = '\n';
        dbuf[dlen+7] = '\r';
        dbuf[dlen+8] = '\n';
        dbuf[dlen+9] = '\0';
        addReplyProto(c,dbuf+start,dlen+9-start);
    }
}

void addReplyBigNum(client *c, const char* num, size_t len) {
    if (c->resp == 2) {
        addReplyBulkCBuffer(c, num, len);
    } else {
        addReplyProto(c,"(",1);
        addReplyProto(c,num,len);
        addReplyProto(c,"\r\n",2);
    }
}


void addReplyMapLen(client *c, long length) {
    int prefix = c->resp == 2 ? '*' : '%';
    if (c->resp == 2) length *= 2;
    addReplyAggregateLen(c,length,prefix);
}

void addReplySetLen(client *c, long length) {
    int prefix = c->resp == 2 ? '*' : '~';
    addReplyAggregateLen(c,length,prefix);
}

void addReplyAttributeLen(client *c, long length) {
    serverAssert(c->resp >= 3);
    addReplyAggregateLen(c,length,'|');
}

void addReplyPushLen(client *c, long length) {
    serverAssert(c->resp >= 3);
    serverAssertWithInfo(c, NULL, c->flags & CLIENT_PUSHING);
    addReplyAggregateLen(c,length,'>');
}

void addReplyNull(client *c) {
    if (c->resp == 2) {
        addReplyProto(c,"$-1\r\n",5);
    } else {
        addReplyProto(c,"_\r\n",3);
    }
}

void addReplyBool(client *c, int b) {
    if (c->resp == 2) {
        addReply(c, b ? shared.cone : shared.czero);
    } else {
        addReplyProto(c, b ? "#t\r\n" : "#f\r\n",4);
    }
}

void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
    if (c->resp == 2) {
        addReplyBulkCBuffer(c,s,len);
    } else {
        char buf[32];
        size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4);
        char *p = buf+preflen-4;
        for (int i = 0; i < 3; i++) {
            if (*ext == '\0') {
                p[i] = ' ';
            } else {
                p[i] = *ext++;
            }
        }
        addReplyProto(c,buf,preflen);
        addReplyProto(c,s,len);
        addReplyProto(c,"\r\n",2);
    }
}

3 / 8

Inline vs Multibulk Dispatch

src/networking.c:3514

How processInputBuffer inspects the first byte to choose between inline and multibulk parsing paths

processInputBuffer is the parsing dispatcher. After guard conditions for blocked clients and in-progress commands, protocol detection is a single byte comparison: if c->querybuf[c->qb_pos] == '*', this is a multibulk command using the binary-safe protocol; anything else goes through the inline path.
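
The detection step is small enough to restate as a stand-alone function. This is a sketch with hypothetical names (`req_type`, `detect_req_type`); the real code stores the result in c->reqtype using the PROTO_REQ_* constants.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the request type constants. */
enum req_type { REQ_UNKNOWN = 0, REQ_INLINE = 1, REQ_MULTIBULK = 2 };

/* Decide the parser from the first unread byte of the query buffer. */
static enum req_type detect_req_type(const char *querybuf, size_t qb_pos,
                                     size_t len) {
    assert(querybuf != NULL);
    if (qb_pos >= len) return REQ_UNKNOWN;   /* nothing buffered yet */
    return querybuf[qb_pos] == '*' ? REQ_MULTIBULK : REQ_INLINE;
}
```

Anything that does not start with `*`, including a stray HTTP request line, lands on the inline path.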

The inline path acquires a fresh pendingCommand and calls processInlineBuffer. The multibulk path adds one condition: when the local incomplete variable is true (c->pending_cmds.len differs from c->pending_cmds.ready_len), a previous call left a partially parsed command at the tail of the queue, so the function resumes that pendingCommand instead of allocating a new one.

In both cases, a C_ERR return without read_error means the buffer contains an incomplete frame (wait for more bytes). A C_ERR with read_error set means a protocol violation, which triggers disconnection. The lookahead variable controls how many commands can be parsed ahead before execution begins, making this loop the foundation of pipelining (Stop 7).

Key takeaway

Protocol detection at the entry of processInputBuffer is a single byte comparison: * means binary-safe multibulk; anything else means inline text. The dispatcher only decides whether to resume an in-progress pendingCommand; the partial-frame parsing itself lives in the respective parser.

int processInputBuffer(client *c) {
    atomicIncr(server.stat_total_client_process_input_buff_events, 1);

    /* Keep active-client window updates on main-thread paths only (here and
     * in IO-thread handoff processing) to avoid races with serverCron()
     * maintenance of the circular slots. */
    if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
        statsUpdateActiveClients(c);

    /* We limit the lookahead for unauthenticated connections to 1.
     * This is both to reduce memory overhead, and to prevent errors: AUTH can
     * affect the handling of succeeding commands. Parsing of "large"
     * unauthenticated multibulk commands is rejected, which would cause those
     * commands to incorrectly return an error to the client. */
    const int lookahead = authRequired(c) ? 1 : server.lookahead;

    /* Keep processing while there is something in the input buffer */
    while ((c->querybuf && c->qb_pos < sdslen(c->querybuf)) ||
           c->pending_cmds.ready_len > 0)
    {
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED || c->flags & CLIENT_UNBLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (c->flags & CLIENT_MASTER && isInsideYieldingLongCommand()) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine if we need to parse more commands from the query buffer.
         * Only parse when there are no ready commands waiting to be processed. */
        const int parse_more = !c->pending_cmds.ready_len;
        int pending_cmd_before_reading = c->pending_cmds.ready_len;

        /* Parse up to lookahead commands only if we don't have enough ready commands */
        while (parse_more && c->pending_cmds.ready_len < lookahead &&
               c->querybuf && c->qb_pos < sdslen(c->querybuf))
        {
            /* Determine request type when unknown. */
            if (!c->reqtype) {
                if (c->querybuf[c->qb_pos] == '*') {
                    c->reqtype = PROTO_REQ_MULTIBULK;
                } else {
                    c->reqtype = PROTO_REQ_INLINE;
                }
            }

            pendingCommand *pcmd = NULL;
            if (c->reqtype == PROTO_REQ_INLINE) {
                pcmd = acquirePendingCommand();
                if (processInlineBuffer(c, pcmd) == C_ERR && !pcmd->read_error) {
                    /* If it fails but there are no errors, it means that it might just be
                     * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */
                    freePendingCommand(c, pcmd);
                    break;
                }
            } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
                int incomplete = (c->pending_cmds.len != c->pending_cmds.ready_len);
                if (unlikely(incomplete)) {
                    pcmd = popPendingCommandFromTail(&c->pending_cmds);
                } else {
                    pcmd = acquirePendingCommand();
                }

                if (processMultibulkBuffer(c, pcmd) == C_ERR && !pcmd->read_error) {
                    /* If it fails but there are no errors, it means that it might just be
                     * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */
                    freePendingCommand(c, pcmd);
                    break;
                }
            } else {
                serverPanic("Unknown request type");
            }

            addPendingCommand(&c->pending_cmds, pcmd);
            if (unlikely(pcmd->read_error || (pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE)))
4 / 8

HELLO Negotiation

src/networking.c:4750

How helloCommand lets a client select RESP version, authenticate, and set a client name in one command

HELLO accepts an optional protocol version (2 or 3), an optional AUTH <user> <password> block, and an optional SETNAME <name>. Any version outside [2, 3] is rejected immediately with -NOPROTO.
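
The option grammar can be sketched without any Redis types. The code below is a hypothetical re-implementation over plain C strings (`hello_opts` and `parse_hello` are illustrative names); it uses POSIX `strcasecmp` and replaces the reply calls with return codes.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>   /* strcasecmp (POSIX) */

typedef struct {
    long ver;                        /* 0 means "keep the current protocol" */
    const char *user, *pass, *name;  /* NULL when the option is absent */
} hello_opts;

/* Returns 0 on success, -1 for an unsupported version (NOPROTO),
 * -2 for a syntax error. argv[0] is the command name itself. */
static int parse_hello(int argc, const char **argv, hello_opts *o) {
    assert(argc >= 1 && argv != NULL && o != NULL);
    o->ver = 0;
    o->user = o->pass = o->name = NULL;
    int j = 1;
    if (argc >= 2) {
        char *end = NULL;
        o->ver = strtol(argv[j++], &end, 10);
        if (end == argv[1] || *end != '\0') return -2;  /* not an integer */
        if (o->ver < 2 || o->ver > 3) return -1;
    }
    for (; j < argc; j++) {
        int moreargs = (argc - 1) - j;
        if (!strcasecmp(argv[j], "AUTH") && moreargs >= 2) {
            o->user = argv[j + 1];
            o->pass = argv[j + 2];
            j += 2;
        } else if (!strcasecmp(argv[j], "SETNAME") && moreargs) {
            o->name = argv[j + 1];
            j++;
        } else {
            return -2;
        }
    }
    return 0;
}
```

Note that options are only examined after the version argument, so `HELLO AUTH user pass` without a version fails at the integer check.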

Authentication and version switch happen in the same command execution. HELLO 3 AUTH default password runs ACLAuthenticateUser before c->authenticated is checked, so there is no window where the connection is unauthenticated but the version is already upgraded.

The version switch is one assignment: if (ver) c->resp = ver. All subsequent addReply* calls on this client branch on the new value of c->resp, so RESP3 prefix types take effect immediately. Because c->resp is updated before addReplyMapLen is called, the HELLO response itself is already in the negotiated format: a % map for RESP3 clients, a flat array for RESP2. It carries seven key-value pairs (server, version, proto, id, mode, role, modules), or six in Sentinel mode, where role is omitted.

Key takeaway

HELLO switches the client's protocol version with one assignment before building the response map, so the handshake reply itself is already encoded in the newly negotiated format.

/* HELLO [<protocol-version> [AUTH <user> <password>] [SETNAME <name>] ] */
void helloCommand(client *c) {
    long long ver = 0;
    int next_arg = 1;

    if (c->argc >= 2) {
        if (getLongLongFromObjectOrReply(c, c->argv[next_arg++], &ver,
            "Protocol version is not an integer or out of range") != C_OK) {
            return;
        }

        if (ver < 2 || ver > 3) {
            addReplyError(c,"-NOPROTO unsupported protocol version");
            return;
        }
    }

    robj *username = NULL;
    robj *password = NULL;
    robj *clientname = NULL;
    for (int j = next_arg; j < c->argc; j++) {
        int moreargs = (c->argc-1) - j;
        const char *opt = c->argv[j]->ptr;
        if (!strcasecmp(opt,"AUTH") && moreargs >= 2) {
            redactClientCommandArgument(c, j+1);
            redactClientCommandArgument(c, j+2);
            username = c->argv[j+1];
            password = c->argv[j+2];
            j += 2;
        } else if (!strcasecmp(opt,"SETNAME") && moreargs) {
            clientname = c->argv[j+1];
            const char *err = NULL;
            if (validateClientName(clientname, &err) == C_ERR) {
                addReplyError(c, err);
                return;
            }
            j++;
        } else {
            addReplyErrorFormat(c,"Syntax error in HELLO option '%s'",opt);
            return;
        }
    }

    if (username && password) {
        robj *err = NULL;
        int auth_result = ACLAuthenticateUser(c, username, password, &err);
        if (auth_result == AUTH_ERR) {
            addAuthErrReply(c, err);
        }
        if (err) decrRefCount(err);
        /* In case of auth errors, return early since we already replied with an ERR.
         * In case of blocking module auth, we reply to the client/setname later upon unblocking. */
        if (auth_result == AUTH_ERR || auth_result == AUTH_BLOCKED) {
            return;
        }
    }

    /* At this point we need to be authenticated to continue. */
    if (!c->authenticated) {
        addReplyError(c,"-NOAUTH HELLO must be called with the client already "
                        "authenticated, otherwise the HELLO <proto> AUTH <user> <pass> "
                        "option can be used to authenticate the client and "
                        "select the RESP protocol version at the same time");
        return;
    }

    /* Now that we're authenticated, set the client name. */
    if (clientname) clientSetName(c, clientname, NULL);

    /* Let's switch to the specified RESP mode. */
    if (ver) c->resp = ver;
    addReplyMapLen(c,6 + !server.sentinel_mode);

    ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"server");
    ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"redis");

    ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"version");
    ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,REDIS_VERSION);

    ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"proto");

    addReplyLongLong(c,c->resp);

    ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"id");
    addReplyLongLong(c,c->id);

    ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"mode");
    if (server.sentinel_mode) addReplyBulkCString(c,"sentinel");
    else if (server.cluster_enabled) addReplyBulkCString(c,"cluster");
    else addReplyBulkCString(c,"standalone");

    if (!server.sentinel_mode) {
        ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"role");
        addReplyBulkCString(c,server.masterhost ? "replica" : "master");
    }

    ADD_REPLY_BULK_CBUFFER_STRING_CONSTANT(c,"modules");
    addReplyLoadedModules(c);
}

5 / 8

Push Messages and Client Tracking

src/networking.c:505

How CLIENT_PUSHING and addReplyPushLen emit RESP3 push frames without colliding with command replies

RESP3 push frames let Redis send unsolicited data (key invalidation notifications from CLIENT TRACKING, pub/sub messages, and similar server-initiated events) without the client polling. On the wire, a push frame opens with > followed by the element count, then each element encoded normally. addReplyPushLen emits that > header; its two serverAssert guards enforce that push messages are RESP3-only (c->resp >= 3) and that CLIENT_PUSHING is set before the function is called.

CLIENT_PUSHING does two things. First, it bypasses the normal reply-suppression path (CLIENT_REPLY_OFF, CLIENT_REPLY_SKIP) so push payloads can always reach the output buffer. Second, it triggers a deferred-write path: if a subscribed client executes a command like PUBLISH to a channel it is itself subscribed to, Redis would otherwise interleave a push message in the middle of the PUBLISH reply.

The code in _addReplyToBufferOrList detects this collision. When the client has CLIENT_PUSHING set, is server.current_client, a command is executing (server.executing_client is set), and that command does not itself use a push as its reply (the SUBSCRIBE family does), the payload is redirected to server.pending_push_messages. Those messages flush after the command reply completes, preserving protocol ordering.
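
The ordering guarantee is easier to see in a toy model. The sketch below is not Redis code: `toy_client` and its functions are hypothetical, and it collapses the flag checks into a single `executing` field and uses fixed-size string buffers.

```c
#include <assert.h>
#include <string.h>

#define OUT_CAP 256

typedef struct {
    char out[OUT_CAP];       /* bytes that will reach the socket, in order */
    char pending[OUT_CAP];   /* push frames held back during a command */
    int executing;           /* nonzero while a command reply is being built */
} toy_client;

static void append(char *dst, const char *s) {
    assert(strlen(dst) + strlen(s) < OUT_CAP);
    strcat(dst, s);
}

/* Regular reply bytes always go straight to the output. */
static void toy_add_reply(toy_client *c, const char *s) {
    append(c->out, s);
}

/* Push frames are deferred while a command is executing on this client. */
static void toy_add_push(toy_client *c, const char *s) {
    append(c->executing ? c->pending : c->out, s);
}

/* Called once the command's reply is complete: flush deferred pushes. */
static void toy_after_command(toy_client *c) {
    c->executing = 0;
    append(c->out, c->pending);
    c->pending[0] = '\0';
}
```

Even if the push is generated before the command's own reply is written, the integer reply reaches the output first and the `>` frame follows it.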

Key takeaway

CLIENT_PUSHING is not just a permission flag: it also triggers the deferred-write path that routes push payloads to server.pending_push_messages when they would otherwise interleave with a command reply.

     * buffer offset (see function comment) */
    reqresSaveClientReplyOffset(c);

    /* If we're processing a push message into the current client (i.e. executing PUBLISH
     * to a channel which we are subscribed to, then we wanna postpone that message to be added
     * after the command's reply (specifically important during multi-exec). the exception is
     * the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply.
     * The check for executing_client also avoids affecting push messages that are part of eviction.
     * Check CLIENT_PUSHING first to avoid race conditions, as it's absent in module's fake client. */
    if ((c->flags & CLIENT_PUSHING) && c == server.current_client &&
        server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd))
    {
        _addReplyPayloadToList(c,server.pending_push_messages,s,len,PLAIN_REPLY);
        return;
    }


void addReplyPushLen(client *c, long length) {
    serverAssert(c->resp >= 3);
    serverAssertWithInfo(c, NULL, c->flags & CLIENT_PUSHING);
    addReplyAggregateLen(c,length,'>');
}

void addReplyNull(client *c) {
    if (c->resp == 2) {
        addReplyProto(c,"$-1\r\n",5);
6 / 8

Reply Types and Output Buffer Structure

src/networking.c:576

How addReply, addReplyBulk, and addReplyArrayLen write serialised RESP bytes into the client output buffer

Every reply function eventually calls _addReplyToBufferOrList, which writes bytes into one of two places:

  1. c->buf: a fixed-size per-client static buffer (16KB by default)
  2. c->reply: a linked list of heap-allocated reply blocks that receives overflow when c->buf is full

The event loop drains both into the socket via the write handler. Small replies never allocate heap memory beyond the pre-allocated c->buf, which is the common case.
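
A scaled-down model of the two-stage structure might look like the following. It is a sketch under assumptions: `toy_out`, `toy_write`, and `toy_free` are hypothetical names, the static capacity is 16 bytes so the overflow is easy to trigger, and it assumes the static buffer is only used while the overflow list is empty so that byte order is preserved.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define STATIC_CAP 16   /* tiny on purpose; the text cites 16KB for Redis */

typedef struct node {
    struct node *next;
    size_t len;
    char data[];         /* C99 flexible array member */
} node;

typedef struct {
    char buf[STATIC_CAP];
    size_t bufpos;
    node *head, *tail;   /* overflow list */
} toy_out;

/* Returns 0 on success, -1 on allocation failure. */
static int toy_write(toy_out *o, const char *s, size_t len) {
    assert(o != NULL && s != NULL);
    /* Stage 1: fill the static buffer, but only while the list is empty. */
    if (o->head == NULL) {
        size_t room = STATIC_CAP - o->bufpos;
        size_t n = len < room ? len : room;
        memcpy(o->buf + o->bufpos, s, n);
        o->bufpos += n;
        s += n;
        len -= n;
    }
    if (len == 0) return 0;
    /* Stage 2: whatever did not fit goes to a heap-allocated list node. */
    node *nd = malloc(sizeof(*nd) + len);
    if (nd == NULL) return -1;
    nd->next = NULL;
    nd->len = len;
    memcpy(nd->data, s, len);
    if (o->tail) o->tail->next = nd; else o->head = nd;
    o->tail = nd;
    return 0;
}

static void toy_free(toy_out *o) {
    for (node *n = o->head; n != NULL; ) {
        node *next = n->next;
        free(n);
        n = next;
    }
    o->head = o->tail = NULL;
    o->bufpos = 0;
}
```

A short reply such as `+OK\r\n` stays entirely in `buf` and never touches `malloc`, which is the common case the paragraph above describes.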

addReply is the lowest-level call above the raw _addReplyToBufferOrList. It accepts a Redis object, handles SDS and integer-as-pointer encodings, and writes already-serialised bytes. This makes addReply correct only for objects whose wire encoding is already complete (pre-formatted shared objects like shared.ok, shared.czero, and shared.wrongtypeerr).

addReplyBulkWithFlag constructs the $<len>\r\n<payload>\r\n envelope, with an optional copy-avoidance path (tryAvoidBulkStrCopyToReply) that references the object directly to save a memcpy for large strings. addReplyArrayLen emits only the *<count>\r\n header; callers are responsible for emitting exactly count elements after it. addReplyMapLen does the same for map headers, inserting the RESP2/3 prefix branch at the aggregate level.
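
The two envelopes can be sketched as pure functions over a caller-supplied buffer. `encode_array_header` and `encode_bulk` are hypothetical names for illustration; the Redis helpers write into the client output buffer instead.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: "*<count>\r\n". The caller must follow it with
 * exactly `count` encoded elements. Returns bytes written or -1. */
static int encode_array_header(char *out, size_t cap, long count) {
    assert(out != NULL && count >= 0);
    int n = snprintf(out, cap, "*%ld\r\n", count);
    return (n < 0 || (size_t)n >= cap) ? -1 : n;
}

/* Hypothetical helper: "$<len>\r\n<payload>\r\n". The length prefix makes
 * the payload binary-safe, so it may contain "\r\n" or NUL bytes.
 * The output is not NUL-terminated. Returns bytes written or -1. */
static int encode_bulk(char *out, size_t cap, const void *p, size_t len) {
    assert(out != NULL && (p != NULL || len == 0));
    int n = snprintf(out, cap, "$%zu\r\n", len);
    if (n < 0 || (size_t)n >= cap) return -1;
    if (len + 2 > cap - (size_t)n) return -1;   /* payload + CRLF */
    if (len > 0) memcpy(out + n, p, len);
    memcpy(out + n + len, "\r\n", 2);
    return n + (int)len + 2;
}
```

Because the receiver reads exactly `len` payload bytes, a value like `a\r\nb` is framed as `$4\r\na\r\nb\r\n` without any escaping.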

Key takeaway

The output buffer is a static array followed by a linked-list overflow. addReply writes pre-serialised bytes; addReplyBulk and addReplyArrayLen construct RESP envelopes on the fly. Most other reply helpers are thin wrappers over these primitives.

void addReply(client *c, robj *obj) {
    if (_prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        _addReplyToBufferOrList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        _addReplyToBufferOrList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

/* Add the SDS 's' string to the client output buffer, as a side effect
 * the SDS string is freed. */
void addReplySds(client *c, sds s) {

void addReplyArrayLen(client *c, long length) {
    serverAssert(length >= 0);
    if (_prepareClientToWrite(c) != C_OK) return;
    _addReplyLongLongMBulk(c, length);
}

void addReplyMapLen(client *c, long length) {
    int prefix = c->resp == 2 ? '*' : '%';
    if (c->resp == 2) length *= 2;
    addReplyAggregateLen(c,length,prefix);
}

void addReplySetLen(client *c, long length) {
    int prefix = c->resp == 2 ? '*' : '~';
    addReplyAggregateLen(c,length,prefix);
}

void addReplyAttributeLen(client *c, long length) {
    serverAssert(c->resp >= 3);
    addReplyAggregateLen(c,length,'|');
}

void addReplyPushLen(client *c, long length) {
    serverAssert(c->resp >= 3);
    serverAssertWithInfo(c, NULL, c->flags & CLIENT_PUSHING);
    addReplyAggregateLen(c,length,'>');
}

void addReplyNull(client *c) {
    if (c->resp == 2) {
        addReplyProto(c,"$-1\r\n",5);

void addReplyBulkWithFlag(client *c, robj *obj, int avoid_copy) {
    if (_prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        const size_t len = sdslen(obj->ptr);
        if (avoid_copy && tryAvoidBulkStrCopyToReply(c, obj, len) == C_OK)
            return;
        _addReplyLongLongBulk(c, len);
        _addReplyToBufferOrList(c,obj->ptr,len);
        _addReplyToBufferOrList(c,"\r\n",2);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[34];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        buf[len] = '\r';
        buf[len+1] = '\n';
        _addReplyLongLongBulk(c, len);
        _addReplyToBufferOrList(c,buf,len+2);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

/* Add a Redis Object as a bulk reply */
void addReplyBulk(client *c, robj *obj) {
    addReplyBulkWithFlag(c, obj, 1);
}

/* Add a C buffer as bulk reply */
void addReplyBulkCBuffer(client *c, const void *p, size_t len) {
    if (_prepareClientToWrite(c) != C_OK) return;
    _addReplyLongLongBulk(c, len);
    _addReplyToBufferOrList(c, p, len);
    _addReplyToBufferOrList(c, "\r\n", 2);
}

/* Add sds to reply (takes ownership of sds and frees it) */
void addReplyBulkSds(client *c, sds s) {
    if (_prepareClientToWrite(c) != C_OK) {
        sdsfree(s);
        return;
    }
    _addReplyLongLongWithPrefix(c, sdslen(s), '$');
    _addReplyToBufferOrList(c, s, sdslen(s));
    sdsfree(s);
    _addReplyToBufferOrList(c, "\r\n", 2);
}
7 / 8

Pipelining Mechanics

src/networking.c:3514

How processInputBuffer loops over multiple commands queued in the same read before executing any of them

Pipelining in Redis is not a special mode. It is a direct consequence of how processInputBuffer is structured. The outer while loop runs as long as there are bytes in c->querybuf or ready commands in c->pending_cmds. The inner parsing loop fills c->pending_cmds up to lookahead entries before execution begins.
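
The two nested loops can be modelled in a few lines. This is a toy sketch, not the Redis code: `toy_conn`, `parse_one`, and `process_input` are hypothetical, commands are just newline-terminated lines, and "executing" a command only increments a counter.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef struct {
    const char *buf;     /* query buffer */
    size_t len, pos;     /* total bytes and read offset */
    int ready;           /* parsed, not yet executed */
    int executed;        /* total commands run */
    int batches;         /* number of parse-ahead rounds */
} toy_conn;

/* Parse one newline-terminated command; returns 0 if it is incomplete. */
static int parse_one(toy_conn *c) {
    const char *nl = memchr(c->buf + c->pos, '\n', c->len - c->pos);
    if (nl == NULL) return 0;
    c->pos = (size_t)(nl - c->buf) + 1;
    c->ready++;
    return 1;
}

static void process_input(toy_conn *c, int lookahead) {
    assert(c != NULL && lookahead >= 1);
    /* Outer loop: run while there are unread bytes or ready commands. */
    while (c->pos < c->len || c->ready > 0) {
        /* Inner loop: parse ahead, but only when nothing is waiting. */
        if (c->ready == 0) {
            while (c->ready < lookahead && c->pos < c->len) {
                if (!parse_one(c)) break;
            }
            if (c->ready == 0) break;   /* only a partial command is left */
            c->batches++;
        }
        /* Execute one ready command per outer iteration. */
        c->ready--;
        c->executed++;
    }
}
```

With a lookahead of 2 and three complete commands followed by a partial one in the buffer, the sketch parses two, runs them, parses the third, runs it, and then stops with the partial bytes left unread for the next socket read.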

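lookahead defaults to server.lookahead (configurable) but is capped at 1 for connections that still require authentication. Per the source comment, the cap both reduces memory overhead and prevents errors: AUTH can change how the commands after it are handled, and the parser rejects large multibulk commands from unauthenticated clients, so parsing ahead of AUTH could return an error for a command that would have succeeded once the client was authenticated.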

After the inner loop exits, the outer loop pops c->pending_cmds.head, populates the legacy c->argc/c->argv fields for backward compatibility with command handlers, and calls processCommandAndResetClient. Replies accumulate in the output buffer across all commands; the socket write does not happen until the event loop returns to the beforeSleep hook. A client that sends SET a 1\r\nGET a\r\n in a single TCP write will have both commands parsed and executed in one pass, with both replies batched into c->buf before any bytes leave the server. The stat_avg_pipeline_length_sum and stat_avg_pipeline_length_cnt counters track average pipeline depth, exposed in INFO stats.

Key takeaway

Pipelining is implicit: processInputBuffer keeps parsing and queuing complete commands for as long as c->querybuf holds them, stopping early only when a guard condition (a blocked client, a pending close) breaks the loop. No special client or server configuration is required.

int processInputBuffer(client *c) {
    atomicIncr(server.stat_total_client_process_input_buff_events, 1);

    /* Keep active-client window updates on main-thread paths only (here and
     * in IO-thread handoff processing) to avoid races with serverCron()
     * maintenance of the circular slots. */
    if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
        statsUpdateActiveClients(c);

    /* We limit the lookahead for unauthenticated connections to 1.
     * This is both to reduce memory overhead, and to prevent errors: AUTH can
     * affect the handling of succeeding commands. Parsing of "large"
     * unauthenticated multibulk commands is rejected, which would cause those
     * commands to incorrectly return an error to the client. */
    const int lookahead = authRequired(c) ? 1 : server.lookahead;

    /* Keep processing while there is something in the input buffer */

        while (parse_more && c->pending_cmds.ready_len < lookahead &&
               c->querybuf && c->qb_pos < sdslen(c->querybuf))
        {
            /* Determine request type when unknown. */
            if (!c->reqtype) {
                if (c->querybuf[c->qb_pos] == '*') {
                    c->reqtype = PROTO_REQ_MULTIBULK;
                } else {
                    c->reqtype = PROTO_REQ_INLINE;
                }
            }

            pendingCommand *pcmd = NULL;
            if (c->reqtype == PROTO_REQ_INLINE) {
                pcmd = acquirePendingCommand();
                if (processInlineBuffer(c, pcmd) == C_ERR && !pcmd->read_error) {
                    /* If it fails but there are no errors, it means that it might just be
                     * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */
                    freePendingCommand(c, pcmd);
                    break;
                }
            } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
                int incomplete = (c->pending_cmds.len != c->pending_cmds.ready_len);
                if (unlikely(incomplete)) {
                    pcmd = popPendingCommandFromTail(&c->pending_cmds);
                } else {
                    pcmd = acquirePendingCommand();
                }

                if (processMultibulkBuffer(c, pcmd) == C_ERR && !pcmd->read_error) {
                    /* If it fails but there are no errors, it means that it might just be
                     * that the desired content cannot be parsed. At this point, we exit and wait for the next time. */
                    freePendingCommand(c, pcmd);
                    break;
                }
            } else {
                serverPanic("Unknown request type");
            }

            addPendingCommand(&c->pending_cmds, pcmd);
            if (unlikely(pcmd->read_error || (pcmd->flags & PENDING_CMD_FLAG_INCOMPLETE)))
                break;

            if (c->running_tid == IOTHREAD_MAIN_THREAD_ID)
                pcmd->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
            else
                pcmd->reploff = c->io_read_reploff - sdslen(c->querybuf) + c->qb_pos;

            preprocessCommand(c, pcmd);
            pcmd->flags |= PENDING_CMD_FLAG_PREPROCESSED;
            resetClientQbufState(c);
        }

        if (c->pending_cmds.ready_len != pending_cmd_before_reading) {
            int newly_parsed_cmds = c->pending_cmds.ready_len - pending_cmd_before_reading;
            atomicIncr(server.stat_avg_pipeline_length_sum, newly_parsed_cmds);
            atomicIncr(server.stat_avg_pipeline_length_cnt, 1);

            c->stat_avg_pipeline_length_sum += newly_parsed_cmds;
            c->stat_avg_pipeline_length_cnt++;
        }
8 / 8

Error Categories

src/networking.c:627

How addReplyErrorLength auto-prefixes -ERR and how callers supply their own category prefix for WRONGTYPE, MOVED, ASK, NOAUTH, and others

/* server.c: shared error objects built at startup */
shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
    "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
shared.noautherr = createObject(OBJ_STRING,sdsnew(
    "-NOAUTH Authentication required.\r\n"));
shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
    "-READONLY You can't write against a read only replica.\r\n"));
shared.oomerr = createObject(OBJ_STRING,sdsnew(
    "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
    "-BUSYKEY Target key name already exists.\r\n"));
/* cluster.c: MOVED and ASK redirection errors */
addReplyErrorSds(c,sdscatprintf(sdsempty(),
                                "-%s %d %s:%d",
                                (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
                                hashslot, clusterNodePreferredEndpoint(n), port));

All Redis error replies are strings that start with - and end with \r\n. The category word immediately after the - is what clients parse to distinguish error types without string-matching the human-readable message.

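**addReplyErrorLength is the function that the string-based addReplyError* variants funnel into.** Its logic is a two-line heuristic: if the string already starts with -, the caller has supplied the category prefix and addReplyErrorLength writes it as-is; if it does not start with -, -ERR is prepended automatically. In both cases the function appends the trailing \r\n itself. The shared error objects shown above are different: they already contain both the prefix and the \r\n terminator, so they are complete replies as built.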
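The prefix heuristic can be exercised outside the server with a small stand-in. This is a sketch rather than the Redis code path: `toy_format_error` is a hypothetical helper that writes into a caller-supplied buffer instead of a client output buffer, and it takes a NUL-terminated string instead of a pointer and length.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Write the wire form of an error into `out`. A string that already starts
 * with '-' keeps its own category; anything else gets "-ERR " prepended.
 * Returns the number of bytes written (excluding the NUL terminator), or
 * -1 if `out` is too small. */
int toy_format_error(char *out, size_t cap, const char *s) {
    assert(out != NULL && s != NULL);
    const char *prefix = (s[0] == '-') ? "" : "-ERR ";
    int n = snprintf(out, cap, "%s%s\r\n", prefix, s);
    if (n < 0 || (size_t)n >= cap) return -1;
    return n;
}
```

Passing "syntax error" yields a generic -ERR reply, while passing "-WRONGTYPE ..." leaves the caller's category untouched. An empty string also receives the -ERR prefix, matching the `!len` check in the original.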

Structured error prefixes give client code something machine-readable to act on:

  • Client libraries use them for typed exceptions: -WRONGTYPE triggers a type-mismatch exception, prompting developers to fix their data model rather than retry the command.
  • -MOVED <slot> <host>:<port> and -ASK <slot> <host>:<port> carry machine-readable routing information so clients can transparently redirect requests to the correct cluster shard.
  • -NOAUTH surfaces as an authentication exception in most client libraries.
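On the client side, the routing information in a redirection error can be pulled apart with a few standard library calls. The sketch below is illustrative only: `toy_redirect` and `toy_parse_redirect` are hypothetical names, the input is assumed to be one error line with the trailing \r\n already stripped, and the slot range check assumes the 16384 hash slots of Redis Cluster.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
    int is_ask;     /* 1 for ASK, 0 for MOVED */
    int slot;       /* hash slot named in the reply */
    char host[256]; /* endpoint host, NUL-terminated */
    int port;
} toy_redirect;

/* Parse "-MOVED <slot> <host>:<port>" or "-ASK <slot> <host>:<port>".
 * Returns 1 on success and fills `out`; returns 0 for any other reply. */
int toy_parse_redirect(const char *line, toy_redirect *out) {
    assert(line != NULL && out != NULL);
    const char *rest;
    int is_ask;
    if (strncmp(line, "-MOVED ", 7) == 0) { is_ask = 0; rest = line + 7; }
    else if (strncmp(line, "-ASK ", 5) == 0) { is_ask = 1; rest = line + 5; }
    else return 0; /* some other category: not a redirection */

    char *end;
    long slot = strtol(rest, &end, 10);
    if (end == rest || *end != ' ' || slot < 0 || slot > 16383) return 0;

    const char *endpoint = end + 1;
    const char *colon = strrchr(endpoint, ':'); /* last colon, so hosts containing ':' survive */
    if (colon == NULL) return 0;
    size_t hostlen = (size_t)(colon - endpoint);
    if (hostlen >= sizeof out->host) return 0;

    long port = strtol(colon + 1, &end, 10);
    if (end == colon + 1 || *end != '\0' || port <= 0 || port > 65535) return 0;

    memcpy(out->host, endpoint, hostlen);
    out->host[hostlen] = '\0';
    out->is_ask = is_ask;
    out->slot = (int)slot;
    out->port = (int)port;
    return 1;
}
```

A client would typically branch on the return value first: a 0 means the reply belongs to some other category, such as WRONGTYPE or NOAUTH, and should be surfaced as an ordinary error.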

The afterErrorReply function, which the addReplyError wrappers call once the reply has been written, parses the prefix and calls incrementErrorCount to maintain per-category error counts, which appear in INFO errorstats.
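The idea of counting errors by category can be illustrated with a fixed-size table. This is a toy model, not the structure Redis uses: `toy_error_stats` and `toy_count_error` are hypothetical names, the table size is arbitrary, and treating an unprefixed message as ERR is an assumption that mirrors the auto-prefix rule described above.

```c
#include <assert.h>
#include <string.h>

#define TOY_MAX_CATEGORIES 16
#define TOY_MAX_NAME 32

typedef struct {
    char name[TOY_MAX_NAME]; /* category word, e.g. "WRONGTYPE" */
    long long count;
} toy_error_stat;

typedef struct {
    toy_error_stat entries[TOY_MAX_CATEGORIES];
    int used;
} toy_error_stats;

/* Count one error reply under its category. Returns the updated count,
 * or -1 if the table is full or the category name does not fit. */
long long toy_count_error(toy_error_stats *st, const char *reply) {
    assert(st != NULL && reply != NULL);
    const char *name = "ERR"; /* default category for unprefixed messages */
    size_t len = 3;
    if (reply[0] == '-') {
        name = reply + 1;
        len = strcspn(name, " \r\n"); /* category ends at the first space or line end */
    }
    if (len == 0 || len >= TOY_MAX_NAME) return -1;

    for (int i = 0; i < st->used; i++) {
        if (strlen(st->entries[i].name) == len &&
            memcmp(st->entries[i].name, name, len) == 0)
            return ++st->entries[i].count;
    }
    if (st->used == TOY_MAX_CATEGORIES) return -1;
    toy_error_stat *e = &st->entries[st->used++];
    memcpy(e->name, name, len);
    e->name[len] = '\0';
    e->count = 1;
    return 1;
}
```

Because only the category word is used as the key, two WRONGTYPE errors with different human-readable messages land in the same bucket.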

Key takeaway

addReplyErrorLength auto-inserts -ERR only when the caller omits a category prefix. Callers that need a typed error (WRONGTYPE, MOVED, ASK, NOAUTH) pass a string starting with - and own their prefix entirely, which also drives per-category error statistics in INFO errorstats.

void addReplyErrorLength(client *c, const char *s, size_t len) {
    /* If the string already starts with "-..." then the error code
     * is provided by the caller. Otherwise we use "-ERR". */
    if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5);
    addReplyProto(c,s,len);
    addReplyProto(c,"\r\n",2);
}

/* Do some actions after an error reply was sent (Log if needed, updates stats, etc.)
 * Possible flags:
 * * ERR_REPLY_FLAG_NO_STATS_UPDATE - indicate not to update any error stats. */
void afterErrorReply(client *c, const char *s, size_t len, int flags) {

    shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
        "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
    shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
    shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
        "-ERR no such key\r\n"));
    shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
        "-ERR syntax error\r\n"));
    shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
        "-ERR source and destination objects are the same\r\n"));
    shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
        "-ERR index out of range\r\n"));
    shared.noscripterr = createObject(OBJ_STRING,sdsnew(
        "-NOSCRIPT No matching script. Please use EVAL.\r\n"));
    shared.loadingerr = createObject(OBJ_STRING,sdsnew(
        "-LOADING Redis is loading the dataset in memory\r\n"));
    shared.slowevalerr = createObject(OBJ_STRING,sdsnew(
        "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
    shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
        "-BUSY Redis is busy running a script. You can only call FUNCTION KILL or SHUTDOWN NOSAVE.\r\n"));
    shared.slowmoduleerr = createObject(OBJ_STRING,sdsnew(
        "-BUSY Redis is busy running a module command.\r\n"));
    shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
        "-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n"));
    shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
        "-MISCONF Redis is configured to save RDB snapshots, but it's currently unable to persist to disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.\r\n"));
    shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
        "-READONLY You can't write against a read only replica.\r\n"));
    shared.noautherr = createObject(OBJ_STRING,sdsnew(
        "-NOAUTH Authentication required.\r\n"));
    shared.oomerr = createObject(OBJ_STRING,sdsnew(
        "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
    shared.execaborterr = createObject(OBJ_STRING,sdsnew(
        "-EXECABORT Transaction discarded because of previous errors.\r\n"));
    shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
        "-NOREPLICAS Not enough good replicas to write.\r\n"));
    shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
        "-BUSYKEY Target key name already exists.\r\n"));

    /* The shared NULL depends on the protocol version. */
    shared.null[0] = NULL;
    shared.null[1] = NULL;
    shared.null[2] = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
    shared.null[3] = createObject(OBJ_STRING,sdsnew("_\r\n"));

    shared.nullarray[0] = NULL;
    shared.nullarray[1] = NULL;
    shared.nullarray[2] = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
    shared.nullarray[3] = createObject(OBJ_STRING,sdsnew("_\r\n"));

                                        (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
                                        hashslot, clusterNodePreferredEndpoint(n), port));
    } else if (error_code == CLUSTER_REDIR_TRIMMING) {
        addReplyError(c,"-TRYAGAIN Slot is being trimmed");
    } else {
        serverPanic("getNodeByQuery() unknown error.");
    }
}

/* This function is called by the function processing clients incrementally