diff --git a/src/replication.c b/src/replication.c index 176b9c2a7e..8660af244e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3178,45 +3178,41 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) { return PSYNC_NOT_SUPPORTED; } -/* Replication: Replica side. - * This connection handler fires after rdb-connection was initialized. We use it - * to adjust the replica main for loading incremental changes into the local buffer. */ -void setupMainConnForPsync(connection *conn) { - int psync_result = -1; +int dualChannelReplicationMainConnSendHandshake(connection *conn, char **err) { char llstr[LONG_STR_SIZE]; - char *err = NULL; - if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { - /* We already have an initialized connection at primary side, we only need to associate it with RDB connection */ - ull2string(llstr, sizeof(llstr), server.rdb_client_id); - err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL); - if (err) goto error; - server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; - goto cleanup; - } + ull2string(llstr, sizeof(llstr), server.rdb_client_id); + *err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL); + if (*err) return C_ERR; + server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + return C_OK; +} - if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) { - err = receiveSynchronousResponse(conn); - if (err == NULL) goto error; - if (err[0] == '-') { - serverLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", err); - goto error; - } - sdsfree(err); - err = NULL; - server.repl_state = REPL_STATE_SEND_PSYNC; +int dualChannelReplicationMainConnReceiveCapaReply(connection *conn, char **err) { + *err = receiveSynchronousResponse(conn); + if (*err == NULL) return C_ERR; + if ((*err)[0] == '-') { + serverLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", *err); + return C_ERR; } + server.repl_state = REPL_STATE_SEND_PSYNC; + return C_OK; +} - if (server.repl_state == REPL_STATE_SEND_PSYNC) { - if (server.debug_pause_after_fork) debugPauseProcess(); - if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { - serverLog(LL_WARNING, "Aborting dual channel sync. Write error."); - cancelReplicationHandshake(1); - } - server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; - goto cleanup; +int dualChannelReplicationMainConnSendPsync(connection *conn, char **err) { + UNUSED(err); + if (server.debug_pause_after_fork) debugPauseProcess(); + if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) { + serverLog(LL_WARNING, "Aborting dual channel sync. Write error."); + return C_ERR; } - psync_result = replicaTryPartialResynchronization(conn, 1); - if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ + server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; + return C_OK; +} + +int dualChannelReplicationMainConnReceivePsyncReply(connection *conn, char **err) { + UNUSED(err); + int psync_result = replicaTryPartialResynchronization(conn, 1); + if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */ if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "Primary <-> REPLICA sync: Primary accepted a Partial Resynchronization%s", @@ -3226,16 +3222,45 @@ void setupMainConnForPsync(connection *conn) { "accept connections in read-write mode.\n"); } dualChannelSyncHandlePsync(); - goto cleanup; + return C_OK; } + return psync_result; +} + +/* Replication: Replica side. + * This connection handler fires after rdb-connection was initialized. We use it + * to adjust the replica main for loading incremental changes into the local buffer. */ +void setupMainConnForPsync(connection *conn) { + char *err = NULL; + int ret; + + switch (server.repl_state) { + case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplicationMainConnSendHandshake(conn, &err); break; + case REPL_STATE_RECEIVE_CAPA_REPLY: + ret = dualChannelReplicationMainConnReceiveCapaReply(conn, &err); + if (ret == C_ERR) { + break; + } + sdsfree(err); + err = NULL; + /* fall through */ + case REPL_STATE_SEND_PSYNC: ret = dualChannelReplicationMainConnSendPsync(conn, &err); break; + case REPL_STATE_RECEIVE_PSYNC_REPLY: ret = dualChannelReplicationMainConnReceivePsyncReply(conn, &err); break; + default: + serverLog(LL_WARNING, "Unexpected replication state: %d", server.repl_state); + ret = C_ERR; + break; + } + sdsfree(err); + if (ret == C_ERR) goto error; + return; + error: /* The dual-channel sync session must be aborted for any psync_result other than PSYNC_CONTINUE or PSYNC_WAIT_REPLY. */ - serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d", psync_result); + serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d", ret); cancelReplicationHandshake(1); -cleanup: - sdsfree(err); } /*