Skip to content

Commit

Permalink
Fix use after free replication data block
Browse files Browse the repository at this point in the history
During replStreamProgressCallback replica may recive replicaof command
and hance free replica side replicaiton buffer. If replication buffer is
freed we should abort the streaming

Signed-off-by: naglera <[email protected]>
  • Loading branch information
naglera committed Jun 10, 2024
1 parent 0fa3260 commit a331234
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2928,24 +2928,32 @@ void bufferReplData(connection *conn) {

/* Replication: Replica side.
* Streams accumulated replication data into the database while freeing read nodes */
void streamReplDataBufToDb(client *c) {
int streamReplDataBufToDb(client *c) {
serverAssert(c->flags & CLIENT_MASTER);
blockingOperationStarts();
size_t offset = 0;
size_t used, offset = 0;
listNode *cur = NULL;
time_t last_progress_callback = mstime();
while ((cur = listFirst(server.pending_repl_data.blocks))) {
while (server.pending_repl_data.blocks && (cur = listFirst(server.pending_repl_data.blocks))) {
/* Read and process repl data block */
replDataBufBlock *o = listNodeValue(cur);
c->querybuf = sdscatlen(c->querybuf, o->buf, o->used);
c->read_reploff += o->used;
used = o->used;
c->querybuf = sdscatlen(c->querybuf, o->buf, used);
c->read_reploff += used;
processInputBuffer(c);
server.pending_repl_data.len -= o->used;
replStreamProgressCallback(offset, o->used, &last_progress_callback);
offset += o->used;
server.pending_repl_data.len -= used;
offset += used;
listDelNode(server.pending_repl_data.blocks, cur);
replStreamProgressCallback(offset, used, &last_progress_callback);
}
blockingOperationEnds();
if (!server.pending_repl_data.blocks) {
/* If we encounter a `replicaof` command during the replStreamProgressCallback,
* pending_repl_data.blocks will be NULL, and we should return an error and
* abort the current sync session. */
return C_ERR;
}
return C_OK;
}

/* Replication: Replica side.
Expand All @@ -2955,13 +2963,17 @@ void rdbChannelSyncSuccess(void) {
server.master_initial_offset = server.repl_provisional_master.reploff;
replicationResurrectProvisionalMaster();
/* Wait for the accumulated buffer to be processed before reading any more replication updates */
if (server.pending_repl_data.blocks) streamReplDataBufToDb(server.master);
if (streamReplDataBufToDb(server.master)) {
/* Sync session aborted during repl data streaming. */
return;
}
freePendingReplDataBuf();
serverLog(LL_NOTICE, "Successfully streamed replication data into memory");
/* We can resume reading from the master connection once the local replication buffer has been loaded. */
replicationSteadyStateInit();
replicationSendAck(); /* Send ACK to notify primary that replica is synced */
server.rdb_client_id = -1;
server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE;
}

/* Replication: Replica side.
Expand Down Expand Up @@ -3002,7 +3014,6 @@ void completeTaskRDBChannelSyncRdbConn(connection *conn) {
if (server.repl_state == REPL_STATE_TRANSFER) {
connSetReadHandler(server.repl_transfer_s, NULL);
rdbChannelSyncSuccess();
server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE;
return;
}
serverPanic("Unrecognized replication state %d using rdb connection", server.repl_state);
Expand Down

0 comments on commit a331234

Please sign in to comment.