Skip to content

Commit

Permalink
refactor serupMainConn using switch
Browse files Browse the repository at this point in the history
Signed-off-by: naglera <[email protected]>
  • Loading branch information
naglera committed Sep 1, 2024
1 parent 33f94aa commit 6ad9291
Showing 1 changed file with 63 additions and 38 deletions.
101 changes: 63 additions & 38 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3178,45 +3178,41 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) {
return PSYNC_NOT_SUPPORTED;
}

/* Replication: Replica side.
* This connection handler fires after rdb-connection was initialized. We use it
* to adjust the replica main for loading incremental changes into the local buffer. */
void setupMainConnForPsync(connection *conn) {
int psync_result = -1;
int dualChannelReplicationMainConnSendHandshake(connection *conn, char **err) {
char llstr[LONG_STR_SIZE];
char *err = NULL;
if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) {
/* We already have an initialized connection at primary side, we only need to associate it with RDB connection */
ull2string(llstr, sizeof(llstr), server.rdb_client_id);
err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL);
if (err) goto error;
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
goto cleanup;
}
ull2string(llstr, sizeof(llstr), server.rdb_client_id);
*err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL);
if (*err) return C_ERR;
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
return C_OK;
}

if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto error;
if (err[0] == '-') {
serverLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", err);
goto error;
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_PSYNC;
int dualChannelReplicationMainConnReceiveCapaReply(connection *conn, char **err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
if ((*err)[0] == '-') {
serverLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", *err);
return C_ERR;
}
server.repl_state = REPL_STATE_SEND_PSYNC;
return C_OK;
}

if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (server.debug_pause_after_fork) debugPauseProcess();
if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
serverLog(LL_WARNING, "Aborting dual channel sync. Write error.");
cancelReplicationHandshake(1);
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
goto cleanup;
int dualChannelReplicationMainConnSendPsync(connection *conn, char **err) {
UNUSED(err);
if (server.debug_pause_after_fork) debugPauseProcess();
if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
serverLog(LL_WARNING, "Aborting dual channel sync. Write error.");
return C_ERR;
}
psync_result = replicaTryPartialResynchronization(conn, 1);
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
return C_OK;
}

int dualChannelReplicationMainConnReceivePsyncReply(connection *conn, char **err) {
UNUSED(err);
int psync_result = replicaTryPartialResynchronization(conn, 1);
if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */

if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "Primary <-> REPLICA sync: Primary accepted a Partial Resynchronization%s",
Expand All @@ -3226,16 +3222,45 @@ void setupMainConnForPsync(connection *conn) {
"accept connections in read-write mode.\n");
}
dualChannelSyncHandlePsync();
goto cleanup;
return C_OK;
}

return psync_result;
}

/* Replication: Replica side.
* This connection handler fires after rdb-connection was initialized. We use it
* to adjust the replica main for loading incremental changes into the local buffer. */
void setupMainConnForPsync(connection *conn) {
char *err = NULL;
int ret;

switch (server.repl_state) {
case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplicationMainConnSendHandshake(conn, &err); break;
case REPL_STATE_RECEIVE_CAPA_REPLY:
ret = dualChannelReplicationMainConnReceiveCapaReply(conn, &err);
if (ret == C_ERR) {
break;
}
sdsfree(err);
err = NULL;
/* fall through */
case REPL_STATE_SEND_PSYNC: ret = dualChannelReplicationMainConnSendPsync(conn, &err); break;
case REPL_STATE_RECEIVE_PSYNC_REPLY: ret = dualChannelReplicationMainConnReceivePsyncReply(conn, &err); break;
default:
serverLog(LL_WARNING, "Unexpected replication state: %d", server.repl_state);
ret = C_ERR;
break;
}
sdsfree(err);
if (ret == C_ERR) goto error;
return;

error:
/* The dual-channel sync session must be aborted for any psync_result other than PSYNC_CONTINUE or PSYNC_WAIT_REPLY.
*/
serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d", psync_result);
serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d", ret);
cancelReplicationHandshake(1);
cleanup:
sdsfree(err);
}

/*
Expand Down

0 comments on commit 6ad9291

Please sign in to comment.