diff --git a/src/replication.c b/src/replication.c index 7c402d668b..5bc95f4400 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2928,24 +2928,32 @@ void bufferReplData(connection *conn) { /* Replication: Replica side. * Streams accumulated replication data into the database while freeing read nodes */ -void streamReplDataBufToDb(client *c) { +int streamReplDataBufToDb(client *c) { serverAssert(c->flags & CLIENT_MASTER); blockingOperationStarts(); - size_t offset = 0; + size_t used, offset = 0; listNode *cur = NULL; time_t last_progress_callback = mstime(); - while ((cur = listFirst(server.pending_repl_data.blocks))) { + while (server.pending_repl_data.blocks && (cur = listFirst(server.pending_repl_data.blocks))) { /* Read and process repl data block */ replDataBufBlock *o = listNodeValue(cur); - c->querybuf = sdscatlen(c->querybuf, o->buf, o->used); - c->read_reploff += o->used; + used = o->used; + c->querybuf = sdscatlen(c->querybuf, o->buf, used); + c->read_reploff += used; processInputBuffer(c); - server.pending_repl_data.len -= o->used; - replStreamProgressCallback(offset, o->used, &last_progress_callback); - offset += o->used; + server.pending_repl_data.len -= used; + offset += used; listDelNode(server.pending_repl_data.blocks, cur); + replStreamProgressCallback(offset, used, &last_progress_callback); } blockingOperationEnds(); + if (!server.pending_repl_data.blocks) { + /* If we encounter a `replicaof` command during the replStreamProgressCallback, + * pending_repl_data.blocks will be NULL, and we should return an error and + * abort the current sync session. */ + return C_ERR; + } + return C_OK; } /* Replication: Replica side. @@ -2955,13 +2963,17 @@ void rdbChannelSyncSuccess(void) { server.master_initial_offset = server.repl_provisional_master.reploff; replicationResurrectProvisionalMaster(); /* Wait for the accumulated buffer to be processed before reading any more replication updates */ - if (server.pending_repl_data.blocks) streamReplDataBufToDb(server.master); + if (streamReplDataBufToDb(server.master)) { + /* Sync session aborted during repl data streaming. */ + return; + } freePendingReplDataBuf(); serverLog(LL_NOTICE, "Successfully streamed replication data into memory"); /* We can resume reading from the master connection once the local replication buffer has been loaded. */ replicationSteadyStateInit(); replicationSendAck(); /* Send ACK to notify primary that replica is synced */ server.rdb_client_id = -1; + server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE; } /* Replication: Replica side. @@ -3002,7 +3014,6 @@ void completeTaskRDBChannelSyncRdbConn(connection *conn) { if (server.repl_state == REPL_STATE_TRANSFER) { connSetReadHandler(server.repl_transfer_s, NULL); rdbChannelSyncSuccess(); - server.repl_rdb_conn_state = REPL_RDB_CONN_STATE_NONE; return; } serverPanic("Unrecognized replication state %d using rdb connection", server.repl_state);