Redis C redis/redis

Redis Replication: How Primaries and Replicas Stay in Sync

How a replica connects, bootstraps from an RDB snapshot, and stays current through command propagation

7 stops ~25 min Verified 2026-04-30
What you will learn
  • How the `repl_state` enum drives every handshake decision a replica makes on the way to CONNECTED
  • What triggers a full resync versus a partial resync, and the BGSAVE child fork that delivers the initial dataset
  • How the replication backlog ring buffer makes reconnection cheap: a replica that was briefly offline does not re-download the world
  • How every write command is fanned out to connected replicas through `replicationFeedSlaves()` after the AOF append
  • What `replicationCacheMaster()` saves when a TCP link drops, and how `replicationResurrectCachedMaster()` uses it on reconnect
  • Where and how a write to a read-only replica is rejected before it reaches command execution
  • How diskless replication eliminates the intermediate RDB file by streaming directly from a forked child to replica sockets
Prerequisites
  • Comfortable reading C (you don't need to write it, just follow the logic)
  • Basic understanding of Redis as a primary/replica system: what REPLICAOF does
  • Familiarity with the concept of fork() and copy-on-write memory is helpful but not required
1 / 7

The Replication State Machine

src/server.h:511

The states a replica connection moves through, from no active replication to fully connected

Every handshake decision a replica makes is encoded in a single field: server.repl_state. Reading this enum top to bottom is reading the protocol spec. REPLICAOF pushes the state to REPL_STATE_CONNECT, connectWithMaster() dials the non-blocking TCP socket and advances to REPL_STATE_CONNECTING, and from there syncWithMaster() walks the remaining states one network round-trip at a time.

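The handshake covers the following steps, in order. PING gets its own round-trip; per the enum below, the AUTH and REPLCONF messages are then sent together in REPL_STATE_SEND_HANDSHAKE, and a separate RECEIVE_* state waits for each reply: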

  1. Send PING, wait for PONG
  2. Send AUTH if required
  3. Advertise listening port and IP via REPLCONF
  4. Negotiate capabilities
  5. Send PSYNC

REPL_STATE_TRANSFER holds while the RDB is being received. REPL_STATE_CONNECTED is the steady state where the replica consumes a live stream of write commands. The "must be ordered" comment on the handshake block is load-bearing: code that asks "currently in handshake?" tests whether server.repl_state lies between REPL_STATE_RECEIVE_PING_REPLY and REPL_STATE_RECEIVE_PSYNC_REPLY, relying on numeric ordering rather than enumerating each state. A lower bound alone would not be enough, because REPL_STATE_TRANSFER and REPL_STATE_CONNECTED also compare greater.
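
The ordering requirement can be illustrated with a minimal sketch. The enum constants are copied from the listing in this stop; the type name demo_repl_state and the helper demo_in_handshake() are hypothetical names used only for illustration, not functions taken from the Redis source.

```c
#include <stdbool.h>

/* Copy of the enum quoted in this stop, so the sketch compiles on its own. */
typedef enum {
    REPL_STATE_NONE = 0,
    REPL_STATE_CONNECT,
    REPL_STATE_CONNECTING,
    /* --- Handshake states, must be ordered --- */
    REPL_STATE_RECEIVE_PING_REPLY,
    REPL_STATE_SEND_HANDSHAKE,
    REPL_STATE_RECEIVE_AUTH_REPLY,
    REPL_STATE_RECEIVE_PORT_REPLY,
    REPL_STATE_RECEIVE_IP_REPLY,
    REPL_STATE_RECEIVE_REQ_REPLY,
    REPL_STATE_RECEIVE_CAPA_REPLY,
    REPL_STATE_SEND_PSYNC,
    REPL_STATE_RECEIVE_PSYNC_REPLY,
    /* --- End of handshake states --- */
    REPL_STATE_TRANSFER,
    REPL_STATE_CONNECTED,
} demo_repl_state;

/* Hypothetical helper: a two-sided range check only works because the
 * handshake states are declared contiguously and in order. */
static bool demo_in_handshake(demo_repl_state s) {
    return s >= REPL_STATE_RECEIVE_PING_REPLY &&
           s <= REPL_STATE_RECEIVE_PSYNC_REPLY;
}
```

Inserting a new handshake state anywhere between the two marker comments keeps a check like this correct; inserting it elsewhere would silently break it.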

Key takeaway

The entire replica handshake lifecycle fits in one C enum: fourteen named states from NONE to CONNECTED, most of them marking a discrete point where the replica is waiting on a specific network response.


/* Slave replication state. Used in server.repl_state for slaves to remember
 * what to do next. */
typedef enum {
    REPL_STATE_NONE = 0,            /* No active replication */
    REPL_STATE_CONNECT,             /* Must connect to master */
    REPL_STATE_CONNECTING,          /* Connecting to master */
    /* --- Handshake states, must be ordered --- */
    REPL_STATE_RECEIVE_PING_REPLY,  /* Wait for PING reply */
    REPL_STATE_SEND_HANDSHAKE,      /* Send handshake sequence to master */
    REPL_STATE_RECEIVE_AUTH_REPLY,  /* Wait for AUTH reply */
    REPL_STATE_RECEIVE_PORT_REPLY,  /* Wait for REPLCONF reply */
    REPL_STATE_RECEIVE_IP_REPLY,    /* Wait for REPLCONF reply */
    REPL_STATE_RECEIVE_REQ_REPLY,   /* Wait for REPLCONF reply */
    REPL_STATE_RECEIVE_CAPA_REPLY,  /* Wait for REPLCONF reply */
    REPL_STATE_SEND_PSYNC,          /* Send PSYNC */
    REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
    /* --- End of handshake states --- */
    REPL_STATE_TRANSFER,        /* Receiving .rdb from master */
    REPL_STATE_CONNECTED,       /* Connected to master */
} repl_state;
2 / 7

Full Sync -- RDB Transfer Over the Wire

src/replication.c:1188

How a primary triggers a BGSAVE and delivers the full dataset to a new replica

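syncCommand() runs on the primary when a replica sends SYNC (legacy) or PSYNC <replid> <offset> (modern). After a series of guard checks (a failover in progress, a broken link to its own master, pending output for the client), a PSYNC request is passed to masterTryPartialResynchronization(). If that returns C_OK, no RDB is needed and the function exits. When partial sync is impossible, the function falls through to a full resync: the replica's client struct is tagged CLIENT_SLAVE, added to server.slaves, and its replstate is set to SLAVE_STATE_WAIT_BGSAVE_START. A replica that advertises SLAVE_CAPA_RDB_CHANNEL_REPL takes a different exit: it is sent +RDBCHANNELSYNC and parked in SLAVE_STATE_WAIT_RDB_CHANNEL until it opens a dedicated RDB connection.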

From there, startBgsaveForReplication() either attaches the new replica to an in-progress BGSAVE or starts a fresh one. The fork happens in rdbSaveBackground(): the child serializes the entire keyspace to disk while the parent returns immediately to serve clients. When the child exits, updateSlavesWaitingBgsave() delivers the completed RDB to every replica that arrived during the save window.
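
The "attach to an in-progress BGSAVE" decision comes down to a bitmask comparison in the listing below. Here is a minimal sketch of that test in isolation. The DEMO_CAPA_* bit values and the function name demo_can_attach() are assumptions for illustration; only the comparison itself mirrors the quoted source.

```c
#include <stdbool.h>

/* Illustrative capability bits; the values are assumptions for this sketch. */
#define DEMO_CAPA_EOF    (1 << 0)
#define DEMO_CAPA_PSYNC2 (1 << 1)

/* A newly arrived replica may share the running BGSAVE only if it supports
 * every capability of the replica that triggered the save, and asks for
 * exactly the same RDB requirements (for example the same filters). */
static bool demo_can_attach(int new_capa, int new_req,
                            int owner_capa, int owner_req) {
    return (new_capa & owner_capa) == owner_capa && new_req == owner_req;
}
```

A replica with extra capabilities can still attach, because the mask only requires a superset; a replica missing one of the owner's capabilities has to wait for the next BGSAVE.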

Key takeaway

A full sync splits into two concurrent streams: a forked child writes the dataset snapshot, and the parent buffers new write commands. Both reach the replica in order, leaving no gap in the replicated sequence.

void syncCommand(client *c) {
    /* ignore SYNC if already slave or in monitor mode */
    if (c->flags & CLIENT_SLAVE) return;

    /* Check if this is a failover request to a replica with the same replid and
     * become a master if so. */
    if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") && 
        !strcasecmp(c->argv[3]->ptr,"failover"))
    {
        serverLog(LL_NOTICE, "Failover request received for replid %s.",
            (unsigned char *)c->argv[1]->ptr);
        if (!server.masterhost) {
            addReplyError(c, "PSYNC FAILOVER can't be sent to a master.");
            return;
        }

        if (!strcasecmp(c->argv[1]->ptr,server.replid)) {
            if (server.cluster_enabled) {
                clusterPromoteSelfToMaster();
            } else {
                replicationUnsetMaster();
            }
            sds client = catClientInfoString(sdsempty(),c);
            serverLog(LL_NOTICE,
                "MASTER MODE enabled (failover request from '%s')",client);
            sdsfree(client);
        } else {
            addReplyError(c, "PSYNC FAILOVER replid must match my replid.");
            return;            
        }
    }

    /* Don't let replicas sync with us while we're failing over */
    if (server.failover_state != NO_FAILOVER) {
        addReplyError(c,"-NOMASTERLINK Can't SYNC while failing over");
        return;
    }

    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok... */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
        addReplyError(c,"-NOMASTERLINK Can't SYNC while not connected with my master");
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (clientHasPendingReplies(c)) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    /* Fail sync if slave doesn't support EOF capability but wants a filtered RDB. This is because we force filtered
     * RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing
     * use of a socket is handled, if needed, in `startBgsaveForReplication`. */
    if (c->slave_req & SLAVE_REQ_RDB_MASK && !(c->slave_capa & SLAVE_CAPA_EOF)) {
        addReplyError(c,"Filtered replica requires EOF capability");
        return;
    }

    serverLog(LL_NOTICE,"Replica %s asks for synchronization",
        replicationGetSlaveName(c));

    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens replicationSetupSlaveForFullResync will replied
     * with:
     *
     * +FULLRESYNC <replid> <offset>
     *
     * So the slave knows the new replid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        long long psync_offset;
        if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK) {
            serverLog(LL_WARNING, "Replica %s asks for synchronization but with a wrong offset",
                      replicationGetSlaveName(c));
            return;
        }

        if (masterTryPartialResynchronization(c, psync_offset) == C_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            char *master_replid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * replid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not able to partially
             * resync. */
            if (master_replid[0] != '?') server.stat_sync_partial_err++;
            if (c->slave_capa & SLAVE_CAPA_RDB_CHANNEL_REPL) {
                int len;
                char buf[128];
                /* Replica is capable of rdbchannel replication. This is
                 * replica's main channel. Let replica know full sync is needed.
                 * Replica will open another connection (rdbchannel). Once rdb
                 * delivery starts, we'll stream repl data to the main channel.*/
                c->flags |= CLIENT_SLAVE;
                c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL;
                c->repl_ack_time = server.unixtime;
                listAddNodeTail(server.slaves, c);
                createReplicationBacklogIfNeeded();

                serverLog(LL_NOTICE,
                          "Replica %s is capable of rdb channel synchronization, and partial sync isn't possible. "
                          "Full sync will continue with dedicated rdb channel.",
                          replicationGetSlaveName(c));

                /* Send +RDBCHANNELSYNC with client id so we can associate replica connections on master.*/
                len = snprintf(buf, sizeof(buf), "+RDBCHANNELSYNC %llu\r\n",
                               (unsigned long long) c->id);
                if (connWrite(c->conn, buf, strlen(buf)) != len)
                    freeClientAsync(c);

                return;
            }
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= CLIENT_PRE_PSYNC;
    }

    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Setup the slave as one waiting for BGSAVE to start. The following code
     * paths will change the state if we handle the slave differently. */
    c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
    if (server.repl_disable_tcp_nodelay)
        connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= CLIENT_SLAVE;
    listAddNodeTail(server.slaves,c);

    /* Create the replication backlog if needed. */
    createReplicationBacklogIfNeeded();

    /* Keep the client in the main thread to avoid data races between the
     * connWrite call in startBgsaveForReplication and the client's event
     * handler in IO threads. */
    if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c);

    /* CASE 1: BGSAVE is in progress, with disk target. */
    if (server.child_type == CHILD_TYPE_RDB &&
        server.rdb_child_type == RDB_CHILD_TYPE_DISK)
    {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save. */
        client *slave;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            /* If the client needs a buffer of commands, we can't use
             * a replica without replication buffer. */
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
                (!(slave->flags & CLIENT_REPL_RDBONLY) ||
                 (c->flags & CLIENT_REPL_RDBONLY)))
                break;
        }
        /* To attach this slave, we check that it has at least all the
         * capabilities of the slave that triggered the current BGSAVE
         * and its exact requirements. */
        if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa) &&
            c->slave_req == slave->slave_req) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer.
             * We don't copy buffer if clients don't want. */
            if (!(c->flags & CLIENT_REPL_RDBONLY))
                copyReplicaOutputBuffer(c,slave);
            replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
            serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences. */
            serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
        }

    /* CASE 2: BGSAVE is in progress, with socket target. */
    } else if (server.child_type == CHILD_TYPE_RDB &&
               server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
    {
        /* There is an RDB child process but it is writing directly to
         * children sockets. We need to wait for the next BGSAVE
         * in order to synchronize. */
        serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");

    /* CASE 3: There is no BGSAVE is in progress. */
    } else {
        if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) &&
            server.repl_diskless_sync_delay)
        {
            /* Diskless replication RDB child is created inside
             * replicationCron() since we want to delay its start a
             * few seconds to wait for more slaves to arrive. */
            serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
        } else {
            /* We don't have a BGSAVE in progress, let's start one. Diskless
             * or disk-based mode is determined by replica's capacity. */
            if (!hasActiveChildProcess()) {
                startBgsaveForReplication(c->slave_capa, c->slave_req);
            } else {
                serverLog(LL_NOTICE,
                    "No BGSAVE in progress, but another BG operation is active. "
                    "BGSAVE for replication delayed");
3 / 7

Partial Sync -- PSYNC and the Replication Backlog

src/replication.c:986

How a reconnecting replica avoids re-downloading the full dataset

When a replica reconnects it sends PSYNC <replid> <offset>. masterTryPartialResynchronization() answers two questions in sequence. Both must pass for partial sync to succeed:

  • Replication ID valid? A mismatch means the primary's history diverged (failover, restart without persistence) and catch-up is impossible without a full resync. To handle the failover handoff case, Redis also accepts a secondary replid (server.replid2), but only for offsets up to server.second_replid_offset.
  • Offset still in backlog? server.repl_backlog is a ring buffer tracking the retained byte range. If psync_offset falls within [backlog->offset, backlog->offset + backlog->histlen], the data is still there.
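
The two checks above can be condensed into one pure function. This is a sketch under stated assumptions: the struct demo_primary_view and the function demo_can_partial_resync() are hypothetical, and the sketch compares IDs with strcmp() where the quoted source uses strcasecmp(). The conditions themselves follow the listing in this stop.

```c
#include <stdbool.h>
#include <string.h>

/* Hypothetical snapshot of the primary-side fields the checks consult. */
typedef struct {
    const char *replid;             /* current replication ID */
    const char *replid2;            /* previous ID, kept after a failover */
    long long second_replid_offset; /* replid2 is valid up to this offset */
    bool has_backlog;               /* false when no backlog was created */
    long long backlog_offset;       /* offset of the oldest retained byte */
    long long backlog_histlen;      /* number of bytes actually retained */
} demo_primary_view;

static bool demo_can_partial_resync(const demo_primary_view *p,
                                    const char *req_replid,
                                    long long req_offset) {
    /* Check 1: the ID must match replid, or replid2 within its valid range. */
    bool id1 = strcmp(req_replid, p->replid) == 0;
    bool id2 = strcmp(req_replid, p->replid2) == 0 &&
               req_offset <= p->second_replid_offset;
    if (!id1 && !id2) return false;

    /* Check 2: the requested offset must still be inside the backlog. */
    if (!p->has_backlog) return false;
    if (req_offset < p->backlog_offset) return false;
    if (req_offset > p->backlog_offset + p->backlog_histlen) return false;
    return true;
}
```

A replica that sends "?" as its replid fails the first check by construction, which is how it forces a full resync on purpose.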

When both checks pass, the primary sends +CONTINUE and addReplyReplicationBacklog() streams every buffered command from the requested offset forward. If either check fails, the function returns C_ERR and syncCommand() falls through to a full RDB transfer. repl-backlog-size is the operational knob here: too small and a lagging replica falls off the edge, forcing an expensive full resync. histlen tracks actual data present, not buffer capacity, so a partially filled backlog still works correctly.
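
To see why histlen and capacity are different things, here is a deliberately simplified fixed-size ring buffer. It is a model for illustration only: the names demo_backlog and demo_backlog_feed() are hypothetical, the capacity is tiny so that wraparound is visible, and the real backlog's storage layout may differ from this sketch.

```c
#include <string.h>

#define DEMO_BACKLOG_SIZE 8  /* tiny capacity so wraparound is easy to see */

typedef struct {
    char buf[DEMO_BACKLOG_SIZE];
    long long idx;            /* next write position inside buf */
    long long histlen;        /* bytes of valid history currently held */
    long long offset;         /* replication offset of the oldest held byte */
    long long master_offset;  /* offset of the newest byte written so far */
} demo_backlog;

/* Append len bytes, overwriting the oldest data once the buffer is full. */
static void demo_backlog_feed(demo_backlog *b, const char *p, long long len) {
    b->master_offset += len;
    while (len > 0) {
        long long chunk = DEMO_BACKLOG_SIZE - b->idx;
        if (chunk > len) chunk = len;
        memcpy(b->buf + b->idx, p, (size_t)chunk);
        b->idx += chunk;
        if (b->idx == DEMO_BACKLOG_SIZE) b->idx = 0; /* wrap around */
        p += chunk;
        len -= chunk;
        b->histlen += chunk;
    }
    /* histlen can never exceed capacity; older bytes were overwritten. */
    if (b->histlen > DEMO_BACKLOG_SIZE) b->histlen = DEMO_BACKLOG_SIZE;
    b->offset = b->master_offset - b->histlen + 1;
}
```

Starting from a zeroed struct, feeding 5 bytes leaves histlen at 5 with the oldest byte at offset 1. Feeding 6 more pushes the stream to offset 11 but caps histlen at 8, so the oldest retained byte moves to offset 4 and a PSYNC for offset 3 would no longer be servable.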

Key takeaway

Partial sync succeeds when two conditions hold: the replica's remembered replication ID still matches the primary's history, and the replica's last-seen offset is still within the backlog window. Miss either, and a full RDB transfer is required.

int masterTryPartialResynchronization(client *c, long long psync_offset) {
    long long psync_len;
    char *master_replid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    /* Is the replication ID of this master the same advertised by the wannabe
     * slave via PSYNC? If the replication ID changed this master has a
     * different replication history, and there is no way to continue.
     *
     * Note that there are two potentially valid replication IDs: the ID1
     * and the ID2. The ID2 however is only valid up to a specific offset. */
    if (strcasecmp(master_replid, server.replid) &&
        (strcasecmp(master_replid, server.replid2) ||
         psync_offset > server.second_replid_offset))
    {
        /* Replid "?" is used by slaves that want to force a full resync. */
        if (master_replid[0] != '?') {
            if (strcasecmp(master_replid, server.replid) &&
                strcasecmp(master_replid, server.replid2))
            {
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Replication ID mismatch (Replica asked for '%s', my "
                    "replication IDs are '%s' and '%s')",
                    master_replid, server.replid, server.replid2);
            } else {
                serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
                    "Requested offset for second ID was %lld, but I can reply "
                    "up to %lld", psync_offset, server.second_replid_offset);
            }
        } else {
            serverLog(LL_NOTICE,"Full resync requested by replica %s %s",
                replicationGetSlaveName(c),
                c->flags & CLIENT_REPL_RDB_CHANNEL ? "(rdb-channel)" : "");
        }
        goto need_full_resync;
    }

    /* We still have the data our slave is asking for? */
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog->offset ||
        psync_offset > (server.repl_backlog->offset + server.repl_backlog->histlen))
    {
        serverLog(LL_NOTICE,
            "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
        if (psync_offset > server.master_repl_offset) {
            serverLog(LL_WARNING,
                "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
        }
        goto need_full_resync;
    }

    /* If we reached this point, we are able to perform a partial resync:
     * 1) Set client state to make it a slave.
     * 2) Inform the client we can continue with +CONTINUE
     * 3) Send the backlog data (from the offset to the end) to the slave. */
    c->flags |= CLIENT_SLAVE;
    c->replstate = SLAVE_STATE_ONLINE;
    c->repl_ack_time = server.unixtime;
    c->repl_start_cmd_stream_on_ack = 0;
    listAddNodeTail(server.slaves,c);
    /* We can't use the connection buffers since they are used to accumulate
     * new commands at this stage. But we are sure the socket send buffer is
     * empty so this write will never fail actually. */
    if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
    } else {
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    }
    if (connWrite(c->conn,buf,buflen) != buflen) {
        freeClientAsync(c);
        return C_OK;
    }
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    serverLog(LL_NOTICE,
        "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
            replicationGetSlaveName(c),
            psync_len, psync_offset);
    /* Note that we don't need to set the selected DB at server.slaveseldb
     * to -1 to force the master to emit SELECT, since the slave already
     * has this state from the previous connection with the master. */

    refreshGoodSlavesCount();

    /* Fire the replica change modules event. */
    moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
                          REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
                          NULL);

    return C_OK; /* The caller can return, no full resync needed. */

need_full_resync:
    /* We need a full resync for some reason... Note that we can't
     * reply to PSYNC right now if a full SYNC is needed. The reply
     * must include the master offset at the time the RDB file we transfer
     * is generated, so we need to delay the reply to that moment. */
    return C_ERR;
}

/* Start a BGSAVE for replication goals, which is, selecting the disk or
 * socket target depending on the configuration, and making sure that
 * the script cache is flushed before to start.
 *
 * The mincapa argument is the bitwise AND among all the slaves capabilities
 * of the slaves waiting for this BGSAVE, so represents the slave capabilities
 * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
 *
 * Side effects, other than starting a BGSAVE:
 *
 * 1) Handle the slaves in WAIT_START state, by preparing them for a full
 *    sync if the BGSAVE was successfully started, or sending them an error
 *    and dropping them from the list of slaves.
 *
 * 2) Flush the Lua scripting script cache if the BGSAVE was actually
 *    started.
4 / 7

Command Propagation

src/replication.c:631

How every write command is fanned out to connected replicas after execution

replicationFeedSlaves() is called from propagate() in server.c after a write command executes. The call chain is: call() detects a dirty count increase, propagate() fires, then feedAppendOnlyFile() handles AOF while replicationFeedSlaves() handles replicas. The function encodes the command in RESP (the same wire protocol the client used) and appends it to the global replication buffer through the replBufWriter helpers, from replBufWriterBegin() to replBufWriterEnd().
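
As a rough illustration of the encoding step, the following sketch serializes a command into RESP as an array of bulk strings. The function demo_resp_encode() is hypothetical and, unlike the real code path, works on NUL-terminated C strings, so it is not binary-safe.

```c
#include <stdio.h>
#include <string.h>

/* Encode argv[0..argc-1] as a RESP array of bulk strings into out.
 * Returns the number of bytes written (excluding the trailing NUL),
 * or -1 if out is too small. */
static int demo_resp_encode(char *out, size_t cap,
                            const char **argv, int argc) {
    size_t pos;
    /* Multi bulk count: "*<argc>\r\n" */
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    pos = (size_t)n;

    for (int j = 0; j < argc; j++) {
        /* Bulk length, then the payload: "$<len>\r\n<data>\r\n" */
        size_t len = strlen(argv[j]);
        n = snprintf(out + pos, cap - pos, "$%zu\r\n%s\r\n", len, argv[j]);
        if (n < 0 || (size_t)n >= cap - pos) return -1;
        pos += (size_t)n;
    }
    return (int)pos;
}
```

For SET k v this yields one array header followed by three length-prefixed arguments, the same shape the loop at the end of replicationFeedSlaves() writes through replBufWriterAppendBulkLen() and replBufWriterAppend().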

All connected replicas share a view into that buffer. They are not sent separate copies, so memory overhead scales with replica lag, not replica count. The SELECT injection logic keeps replicas correct: replicas maintain their own notion of the active database, so a SELECT must be prepended whenever dictid changes between commands. One early-exit branch is worth noting: when server.masterhost != NULL, this instance is itself a replica and returns immediately. Mid-chain replicas receive the stream from their own primary rather than re-generating it.
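
The SELECT injection rule can be reduced to a small piece of state. In this sketch, demo_stream and demo_need_select() are hypothetical names; the slaveseldb field mirrors server.slaveseldb from the listing, and -1 plays the same "no database" role it does there.

```c
/* Tracks which database the replication stream last selected. */
typedef struct {
    int slaveseldb; /* -1 means no SELECT has been emitted yet */
} demo_stream;

/* Returns 1 when a SELECT must be written before the next command,
 * and records the new database; returns 0 otherwise. A dictid of -1
 * marks commands that touch no keyspace (PING, REPLCONF). */
static int demo_need_select(demo_stream *s, int dictid) {
    if (dictid == -1 || s->slaveseldb == dictid) return 0;
    s->slaveseldb = dictid;
    return 1;
}
```

Consecutive writes to the same database therefore cost one SELECT in total, not one per command.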

Key takeaway

Write propagation is a single encoding pass into a shared replication buffer: all replicas read from the same buffer rather than receiving individually copied command streams, which keeps memory overhead proportional to lag rather than replica count.

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    int j, len;
    char llstr[LONG_STR_SIZE];

    /* In case we propagate a command that doesn't touch keys (PING, REPLCONF) we
     * pass dbid=-1 that indicate there is no need to replicate `select` command. */
    serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum));

    /* If the instance is not a top level master, return ASAP: we'll just proxy
     * the stream of data we receive from our master instead, in order to
     * propagate *identical* replication stream. In this way this slave can
     * advertise the same replication ID as the master (since it shares the
     * master replication history and has the same backlog and offsets). */
    if (server.masterhost != NULL) return;

    /* If current client is marked as master, we will proxy the command stream
     * to our slaves instead of replicating them, that also happens when being
     * in atomic slot migration. */
    if (server.current_client && server.current_client->flags & CLIENT_MASTER) return;

    /* If there aren't slaves, and there is no backlog buffer to populate,
     * we can return ASAP. */
    if (server.repl_backlog == NULL && listLength(slaves) == 0) {
        /* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs
         * even when there's no replication active. This code will not be reached if AOF
         * is also disabled. */
        server.master_repl_offset += 1;
        return;
    }

    /* We can't have slaves attached and no backlog. */
    serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

    /* Update the time of sending replication stream to replicas. */
    server.repl_stream_lastio = server.unixtime;

    /* Must install write handler for all replicas first before feeding
     * replication stream. */
    prepareReplicasToWrite();

    /* Send SELECT command to every slave if needed. */
    if (dictid != -1 && server.slaveseldb != dictid) {
        robj *selectcmd;

        /* For a few DBs we have pre-computed SELECT command. */
        if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
            selectcmd = shared.select[dictid];
        } else {
            int dictid_len;

            dictid_len = ll2string(llstr,sizeof(llstr),dictid);
            selectcmd = createObject(OBJ_STRING,
                sdscatprintf(sdsempty(),
                "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
                dictid_len, llstr));
        }

        feedReplicationBuffer(selectcmd->ptr, sdslen(selectcmd->ptr));

        /* Although the SELECT command is not associated with any slot,
         * its per-slot network-bytes-out accumulation is made by the above function call.
         * To cancel-out this accumulation, below adjustment is made. */
        clusterSlotStatsDecrNetworkBytesOutForReplication(sdslen(selectcmd->ptr));

        if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
            decrRefCount(selectcmd);

        server.slaveseldb = dictid;
    }

    /* Write the command to the replication buffer if any. */
    char aux[LONG_STR_SIZE+3];
    replBufWriter wr;
    replBufWriterBegin(&wr);

    /* Write the multi bulk count */
    replBufWriterAppendBulkLen(&wr, '*', argc);

    for (j = 0; j < argc; j++) {
        /* Write the bulk count */
        long objlen = stringObjectLen(argv[j]);
        replBufWriterAppendBulkLen(&wr, '$', objlen);

        /* Write the bulk data */
        if (argv[j]->encoding == OBJ_ENCODING_INT) {
            len = ll2string(aux, sizeof(aux), (long)argv[j]->ptr);
            replBufWriterAppend(&wr, aux, len);
        } else {
            replBufWriterAppend(&wr, argv[j]->ptr, objlen);
        }
        replBufWriterAppend(&wr, "\r\n", 2);
    }

    replBufWriterEnd(&wr);
}

/* This is a debugging function that gets called when we detect something
 * wrong with the replication protocol: the goal is to peek into the
 * replication backlog and show a few final bytes to make simpler to
 * guess what kind of bug it could be. */
void showLatestBacklog(void) {
    if (server.repl_backlog == NULL) return;
    if (listLength(server.repl_buffer_blocks) == 0) return;
    if (server.hide_user_data_from_log) {
        serverLog(LL_NOTICE,"hide-user-data-from-log is on, skip logging backlog content to avoid spilling PII.");
        return;
    }

    size_t dumplen = 256;
    if (server.repl_backlog->histlen < (long long)dumplen)
        dumplen = server.repl_backlog->histlen;

    sds dump = sdsempty();
    listNode *node = listLast(server.repl_buffer_blocks);
    while(dumplen) {
        if (node == NULL) break;
        replBufBlock *o = listNodeValue(node);
        size_t thislen = o->used >= dumplen ? dumplen : o->used;
        sds head = sdscatrepr(sdsempty(), o->buf+o->used-thislen, thislen);
        sds tmp = sdscatsds(head, dump);
        sdsfree(dump);
        dump = tmp;
        dumplen -= thislen;
        node = listPrevNode(node);
    }

    /* Finally log such bytes: this is vital debugging info to
     * understand what happened. */
    serverLog(LL_NOTICE,"Latest backlog is: '%s'", dump);
    sdsfree(dump);
}

/* This function is used in order to proxy what we receive from our master
 * to our sub-slaves. Besides, we also proxy the replication stream from
 * the source node when being in atomic slot migration. */
5 / 7

Replica Reconnection

src/replication.c:4511

How the replica preserves master state across a TCP disconnect to enable partial resync on reconnect

void replicationDiscardCachedMaster(void) {
    if (server.cached_master == NULL) return;

    serverLog(LL_NOTICE,"Discarding previously cached master state.");
    server.cached_master->flags &= ~CLIENT_MASTER;
    freeClient(server.cached_master);
    server.cached_master = NULL;
}

/* Turn the cached master into the current master, using the file descriptor
 * passed as argument as the socket for the new master.
 *
 * This function is called when successfully setup a partial resynchronization
 * so the stream of data that we'll receive will start from where this
 * master left. */
void replicationResurrectCachedMaster(connection *conn) {
    serverAssert(server.cached_master->tid == IOTHREAD_MAIN_THREAD_ID);

    server.master = server.cached_master;
    server.cached_master = NULL;
    server.master->conn = conn;
    connSetPrivateData(server.master->conn, server.master);
    server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
    server.master->authenticated = 1;
    server.master->lastinteraction = server.unixtime;
    server.repl_state = REPL_STATE_CONNECTED;
    server.repl_down_since = 0;
    server.repl_up_since = server.unixtime;

When the TCP link to the primary breaks, replicationCacheMaster() preserves the disconnected master's client struct rather than freeing it. The critical field saved is reploff: the byte offset marking exactly how far the replica has consumed the primary's stream. The struct is unlinked from the active client list (so it receives no further I/O events) and stored in server.cached_master.

On reconnect, the replica takes server.cached_master->reploff and asks for the next byte, sending reploff + 1 as the <offset> in its PSYNC. If the primary's backlog still covers that offset, masterTryPartialResynchronization() returns C_OK on the primary, the replica receives +CONTINUE, and replicationResurrectCachedMaster() swaps the cached struct back onto the live connection. Replication resumes exactly where it left off, no RDB transfer required. replicationDiscardCachedMaster() runs when the cached state can no longer be used, most notably once a full resync is confirmed unavoidable, at which point the offset is stale and the struct is freed.
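
The replica-side choice of PSYNC arguments can be sketched as follows. The function demo_build_psync() and its inline text output are hypothetical; the sketch assumes that a replica with cached state asks for the byte after the last one it processed (reploff + 1), and that a replica without cached state sends "?" and -1 to request a full resync.

```c
#include <stdio.h>

/* Build the PSYNC request line a replica would send.
 * cached_replid is NULL when there is no cached master to resume from.
 * Returns the number of characters written, as snprintf() does. */
static int demo_build_psync(char *out, size_t cap,
                            const char *cached_replid,
                            long long cached_reploff) {
    if (cached_replid != NULL) {
        /* Resume attempt: ask for the first byte not yet processed. */
        return snprintf(out, cap, "PSYNC %s %lld",
                        cached_replid, cached_reploff + 1);
    }
    /* Nothing to resume from: force a full resync. */
    return snprintf(out, cap, "PSYNC ? -1");
}
```

With a cached replid of "abc" and a reploff of 41, the request asks for offset 42, which the primary then tests against its backlog window.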

Key takeaway

The cached master struct is a bet: "the primary's backlog still covers my offset." Redis places that bet on every disconnect by preserving reploff, and redeems it if the primary confirms the offset is still within range on reconnect.

void replicationCacheMaster(client *c) {
    serverAssert(server.master != NULL && server.cached_master == NULL);
    serverAssert(server.master->tid == IOTHREAD_MAIN_THREAD_ID);
    serverLog(LL_NOTICE,"Caching the disconnected master state.");

    /* Unlink the client from the server structures. */
    unlinkClient(c);

    /* Reset the master client so that's ready to accept new commands:
     * we want to discard the non processed query buffers and non processed
     * offsets, including pending transactions, already populated arguments,
     * pending outputs to the master. */
    sdsclear(server.master->querybuf);
    server.master->qb_pos = 0;
    server.master->repl_applied = 0;
    server.master->read_reploff = server.master->reploff;
    server.master->reploff_next = 0;
    if (c->flags & CLIENT_MULTI) discardTransaction(c);
    listEmpty(c->reply);
    c->sentlen = 0;
    c->reply_bytes = 0;
    c->bufpos = 0;
    resetClient(c, -1);
    resetClientQbufState(c);

    /* Save the master. Server.master will be set to null later by
     * replicationHandleMasterDisconnection(). */
    server.cached_master = server.master;

    /* Invalidate the Peer ID cache. */
    if (c->peerid) {
        sdsfree(c->peerid);
        c->peerid = NULL;
    }
    /* Invalidate the Sock Name cache. */
    if (c->sockname) {
        sdsfree(c->sockname);
        c->sockname = NULL;
    }

    /* Caching the master happens instead of the actual freeClient() call,
     * so make sure to adjust the replication state. This function will
     * also set server.master to NULL. */
    replicationHandleMasterDisconnection();
}

/* This function is called when a master is turned into a slave, in order to
 * create from scratch a cached master for the new client, that will allow
 * to PSYNC with the slave that was promoted as the new master after a
 * failover.
 *
 * Assuming this instance was previously the master instance of the new master,
 * the new master will accept its replication ID, and potential also the
 * current offset if no data was lost during the failover. So we use our
 * current replication ID and offset in order to synthesize a cached master. */
void replicationCacheMasterUsingMyself(void) {
    serverLog(LL_NOTICE,
        "Before turning into a replica, using my own master parameters "
        "to synthesize a cached master: I may be able to synchronize with "
        "the new master with just a partial transfer.");

    /* This will be used to populate the field server.master->reploff
     * by replicationCreateMasterClient(). We'll later set the created
     * master as server.cached_master, so the replica will use such
     * offset for PSYNC. */
    server.master_initial_offset = server.master_repl_offset;

    /* The master client we create can be set to any DBID, because
     * the new master will start its replication stream with SELECT. */
    replicationCreateMasterClient(NULL,-1);

    /* Use our own ID / offset. */
    memcpy(server.master->replid, server.replid, sizeof(server.replid));

    /* Set as cached master. */
    unlinkClient(server.master);
    server.cached_master = server.master;
    server.master = NULL;
}

/* Free a cached master, called when there are no longer the conditions for
 * a partial resync on reconnection. */
void replicationDiscardCachedMaster(void) {
    if (server.cached_master == NULL) return;

    serverLog(LL_NOTICE,"Discarding previously cached master state.");
    server.cached_master->flags &= ~CLIENT_MASTER;
    freeClient(server.cached_master);
    server.cached_master = NULL;
}

/* Turn the cached master into the current master, using the file descriptor
 * passed as argument as the socket for the new master.
 *
 * This function is called when successfully setup a partial resynchronization
 * so the stream of data that we'll receive will start from where this
 * master left. */
void replicationResurrectCachedMaster(connection *conn) {
    serverAssert(server.cached_master->tid == IOTHREAD_MAIN_THREAD_ID);

    server.master = server.cached_master;
    server.cached_master = NULL;
    server.master->conn = conn;
    connSetPrivateData(server.master->conn, server.master);
    server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
    server.master->authenticated = 1;
    server.master->lastinteraction = server.unixtime;
    server.repl_state = REPL_STATE_CONNECTED;
    server.repl_down_since = 0;
    server.repl_up_since = server.unixtime;
    if (server.repl_disconnect_start_time != 0) {
        server.repl_total_disconnect_time += server.unixtime - server.repl_disconnect_start_time;
        server.repl_disconnect_start_time = 0;
    }
    /* Fire the master link modules event. */
    moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
                          REDISMODULE_SUBEVENT_MASTER_LINK_UP,
                          NULL);

    /* Re-add to the list of clients. */
6 / 7

Read-Only Enforcement

src/server.c:4609

Where a write command sent to a read-only replica is rejected, before any command handler runs

Read-only enforcement is a single if block inside processCommand(), the central dispatch function that handles every client command. Three conditions must all be true to trigger rejection:

  • The instance is a replica (server.masterhost is set)
  • The replica-read-only config is enabled (server.repl_slave_ro)
  • The command carries the write flag (is_write_command, derived from getCommandFlags())

The !obey_client guard exempts the primary itself: commands arriving over the replication link from the primary set obey_client, so the check is bypassed and propagated writes are applied. The rejection sends shared.roslaveerr, a shared reply object created once at startup: "-READONLY You can't write against a read only replica.\r\n". Returning C_OK here signals that processCommand() considers the command handled; no further execution occurs. Setting replica-read-only no disables the check entirely, so the replica accepts local writes. Those writes are not propagated back to the primary and are lost on the next full resync, which makes the setting suitable only for ephemeral, replica-local data.

Key takeaway

Read-only enforcement is a guard in processCommand(), not in the command implementations themselves: a single flag check and a pre-formatted error reply short-circuit execution before any command handler is reached.

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    if (server.masterhost && server.repl_slave_ro &&
        !obey_client &&
        is_write_command)
    {
        rejectCommand(c, shared.roslaveerr);
        return C_OK;
    }
7 / 7

Diskless Replication

src/rdb.c:4395

How the primary streams RDB data directly to replica sockets without writing to disk

Disk-based full sync writes an RDB to disk and then reads it back to send: two sequential I/O operations competing for the same bandwidth. Diskless replication, enabled by repl-diskless-sync yes, skips the intermediate file. rdbSaveToSlavesSockets() is selected by startBgsaveForReplication() when the replica has advertised SLAVE_CAPA_EOF and server.repl_diskless_sync is set.
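The choice between a disk target and a socket target can be sketched as below. This is a minimal model under stated assumptions: the names `min_capabilities()` and `choose_target()` and the `CAPA_EOF` bit value are illustrative, and the sketch assumes the capability check applies to the bits common to all replicas waiting for the same snapshot.

```c
#include <assert.h>
#include <stddef.h>

#define CAPA_EOF (1u << 0) /* illustrative bit standing in for SLAVE_CAPA_EOF */

typedef enum { TARGET_DISK, TARGET_SOCKET } bgsave_target_t;

/* AND together the capability bits of every waiting replica, so a
 * capability counts only if all of them advertised it. */
unsigned min_capabilities(const unsigned *capas, size_t n) {
    if (n == 0) return 0;
    unsigned mincapa = ~0u;
    for (size_t i = 0; i < n; i++) mincapa &= capas[i];
    return mincapa;
}

/* Socket streaming needs both the config switch and EOF-marker support. */
bgsave_target_t choose_target(int diskless_sync_enabled, unsigned mincapa) {
    assert(diskless_sync_enabled == 0 || diskless_sync_enabled == 1);
    if (diskless_sync_enabled && (mincapa & CAPA_EOF)) return TARGET_SOCKET;
    return TARGET_DISK;
}
```

Under this model a single waiting replica without EOF support forces the disk path for the whole batch, even with repl-diskless-sync enabled.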

Before forking, two Unix pipes are created (unless the dedicated RDB channel is in use, in which case the child writes to the replica connections directly): rdb_pipe carries the RDB byte stream from child to parent, and safe_to_exit_pipe lets the parent signal when the child may safely terminate. The child calls rdbSaveRioWithEOFMark(), writing to the pipe's write end via rioInitWithFd(). The parent registers rdbPipeReadHandler() on the read end as an event-loop file event. As data arrives, the handler buffers a chunk in server.rdb_pipe_buff and fans it out to all waiting replica sockets in a single pass. The pipe indirection exists because of TLS: session state is owned by the parent process, so the child cannot safely write TLS records directly to the sockets. The EOF marker at the end of the RDB lets replicas detect completion without knowing the file size in advance.
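The pipe-and-fork pattern described above can be reduced to a small POSIX sketch. It is not the Redis code: `fan_out_from_child()` is a hypothetical helper, the parent uses blocking reads instead of an event-loop handler, the sinks are plain file descriptors rather than replica connections, and the safe-to-exit pipe is omitted.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Fork a child that writes `payload` into a pipe. The parent reads the
 * pipe in small chunks and copies each chunk to every descriptor in
 * `sinks`. Returns the number of bytes relayed to each sink, or -1. */
ssize_t fan_out_from_child(const char *payload, size_t len,
                           const int *sinks, int nsinks) {
    assert(payload != NULL && sinks != NULL && nsinks > 0);
    int data_pipe[2];
    if (pipe(data_pipe) == -1) return -1;

    pid_t pid = fork();
    if (pid == -1) {
        close(data_pipe[0]);
        close(data_pipe[1]);
        return -1;
    }

    if (pid == 0) {
        /* Child: producer. Close the read end so that a dead parent
         * turns the next write into an error instead of a hang. */
        close(data_pipe[0]);
        size_t off = 0;
        while (off < len) {
            ssize_t n = write(data_pipe[1], payload + off, len - off);
            if (n <= 0) _exit(1);
            off += (size_t)n;
        }
        close(data_pipe[1]);
        _exit(0);
    }

    /* Parent: close the write end so read() returns 0 once the child
     * has finished and closed its copy. */
    close(data_pipe[1]);
    char buf[16]; /* deliberately small to force several chunks */
    ssize_t total = 0, n;
    while ((n = read(data_pipe[0], buf, sizeof buf)) > 0) {
        for (int i = 0; i < nsinks; i++) {
            ssize_t off = 0;
            while (off < n) {
                ssize_t w = write(sinks[i], buf + off, (size_t)(n - off));
                if (w <= 0) { total = -1; goto done; }
                off += w;
            }
        }
        total += n;
    }
    if (n < 0) total = -1;
done:
    close(data_pipe[0]);
    waitpid(pid, NULL, 0); /* reap the child */
    return total;
}
```

Passing the write ends of two ordinary pipes as sinks is enough to observe that both receive an identical copy of the payload, chunk by chunk, while only the parent ever touches the sink descriptors.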

Key takeaway

Diskless replication is a pipe-and-fork pattern: the child writes the RDB stream into a pipe and the parent fans each chunk out to every waiting replica socket, eliminating both the disk write and the send-after-write delay.

int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
    listNode *ln;
    listIter li;
    pid_t childpid;
    int pipefds[2], rdb_pipe_write = 0, safe_to_exit_pipe = 0;
    int rdb_channel = server.repl_rdb_channel && (req & SLAVE_REQ_RDB_CHANNEL);
    int slots_req = req & SLAVE_REQ_SLOTS_SNAPSHOT;

    if (hasActiveChildProcess()) return C_ERR;

    /* Even if the previous fork child exited, don't start a new one until we
     * drained the pipe. */
    if (server.rdb_pipe_conns) return C_ERR;

    if (!rdb_channel) {
        /* Before to fork, create a pipe that is used to transfer the rdb bytes to
         * the parent, we can't let it write directly to the sockets, since in case
         * of TLS we must let the parent handle a continuous TLS state when the
         * child terminates and parent takes over. */
        if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
        server.rdb_pipe_read = pipefds[0]; /* read end */
        rdb_pipe_write = pipefds[1]; /* write end */

        /* create another pipe that is used by the parent to signal to the child
         * that it can exit. */
        if (anetPipe(pipefds, 0, 0) == -1) {
            close(rdb_pipe_write);
            close(server.rdb_pipe_read);
            return C_ERR;
        }
        safe_to_exit_pipe = pipefds[0]; /* read end */
        server.rdb_child_exit_pipe = pipefds[1]; /* write end */
    }

    /* Collect the connections of the replicas we want to transfer
     * the RDB to, which are in WAIT_BGSAVE_START state. */
    int numconns = 0;
    connection **conns = zmalloc(sizeof(*conns) * listLength(server.slaves));
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
            /* Check slave has the exact requirements */
            if (slave->slave_req != req)
                continue;
            replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
            conns[numconns++] = slave->conn;
            if (rdb_channel) {
                /* Put the socket in blocking mode to simplify RDB transfer. */
                connSendTimeout(slave->conn, server.repl_timeout * 1000);
                connBlock(slave->conn);
            }
        }
    }

    if (!rdb_channel) {
        server.rdb_pipe_conns = conns;
        server.rdb_pipe_numconns = numconns;
        server.rdb_pipe_numconns_writing = 0;
    }

    /* Create the child process. */
    if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
        /* Child */
        int retval, dummy;
        rio rdb;

        if (rdb_channel) {
            rioInitWithConnset(&rdb, conns, numconns);
        } else {
            rioInitWithFd(&rdb,rdb_pipe_write);
            /* Close the reading part, so that if the parent crashes, the child
             * will get a write error and exit. */
            close(server.rdb_pipe_read);
        }

        redisSetProcTitle("redis-rdb-to-slaves");
        redisSetCpuAffinity(server.bgsave_cpulist);

        /* Disable RDB compression and checksum in the fork child if requested.
         * The parent's configuration is not affected. */
        if (req & SLAVE_REQ_RDB_NO_COMPRESS)
            server.rdb_compression = 0;
        if (req & SLAVE_REQ_RDB_NO_CHECKSUM)
            server.rdb_checksum = 0;

        if (req & SLAVE_REQ_SLOTS_SNAPSHOT) {
            /* Slots snapshot is required */
            retval = slotSnapshotSaveRio(req, &rdb, NULL);
        } else {
            retval = rdbSaveRioWithEOFMark(req,&rdb,NULL,rsi);
        }

        if (retval == C_OK && rioFlush(&rdb) == 0)
            retval = C_ERR;

        if (retval == C_OK) {
            sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
        }

        if (rdb_channel) {
            rioFreeConnset(&rdb);
        } else {
            rioFreeFd(&rdb);
            /* wake up the reader, tell it we're done. */
            close(rdb_pipe_write);
            close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
            /* hold exit until the parent tells us it's safe. we're not expecting
             * to read anything, just get the error when the pipe is closed. */
            dummy = read(safe_to_exit_pipe, pipefds, 1);
            UNUSED(dummy);
        }
        zfree(conns);
        exitFromChild((retval == C_OK) ? 0 : 1, 0);
    } else {
        /* Parent */