Skip to content

Commit

Permalink
Fix comments
Browse files Browse the repository at this point in the history
renaming and refactor

Signed-off-by: naglera <[email protected]>
  • Loading branch information
naglera committed Sep 4, 2024
1 parent 6ad9291 commit bf1fab1
Showing 1 changed file with 29 additions and 32 deletions.
61 changes: 29 additions & 32 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ int replicaPutOnline(client *replica);
void replicaStartCommandStream(client *replica);
int cancelReplicationHandshake(int reconnect);
void replicationSteadyStateInit(void);
void setupMainConnForPsync(connection *conn);
void dualChannelSetupMainConnForPsync(connection *conn);
void dualChannelSyncHandleRdbLoadCompletion(void);
static void fullSyncWithPrimary(connection *conn);
static void dualChannelFullSyncWithPrimary(connection *conn);

/* We take a global flag to remember if this instance generated an RDB
* because of replication, so that we can remove the RDB file in case
Expand Down Expand Up @@ -2564,7 +2564,7 @@ int sendCurrentOffsetToReplica(client *replica) {
return C_OK;
}

static int dualChannelRepliacationHandleHandshake(connection *conn, char **err) {
static int dualChannelReplHandleHandshake(connection *conn, char **err) {
serverLog(LL_DEBUG, "Received first reply from primary using rdb connection.");
/* AUTH with the primary if required. */
if (server.primary_auth) {
Expand Down Expand Up @@ -2596,7 +2596,7 @@ static int dualChannelRepliacationHandleHandshake(connection *conn, char **err)
}
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY;

if (connSetReadHandler(conn, fullSyncWithPrimary) == C_ERR) {
if (connSetReadHandler(conn, dualChannelFullSyncWithPrimary) == C_ERR) {
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", *err,
connGetInfo(conn, conninfo, sizeof(conninfo)));
Expand All @@ -2605,7 +2605,7 @@ static int dualChannelRepliacationHandleHandshake(connection *conn, char **err)
return C_OK;
}

static int dualChannelReplicationHandleAuthReply(connection *conn, char **err) {
static int dualChannelReplHandleAuthReply(connection *conn, char **err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) {
serverLog(LL_WARNING, "Primary did not respond to auth command during SYNC handshake");
Expand All @@ -2619,7 +2619,7 @@ static int dualChannelReplicationHandleAuthReply(connection *conn, char **err) {
return C_OK;
}

static int dualChannelReplicationHandleReplconfReply(connection *conn, char **err) {
static int dualChannelReplHandleReplconfReply(connection *conn, char **err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) {
serverLog(LL_WARNING, "Primary did not respond to replconf command during SYNC handshake");
Expand All @@ -2639,7 +2639,7 @@ static int dualChannelReplicationHandleReplconfReply(connection *conn, char **er
return C_OK;
}

static int dualChannelReplicationHandleEndOffsetResponse(connection *conn, char **err) {
static int dualChannelReplHandleEndOffsetResponse(connection *conn, char **err) {
uint64_t rdb_client_id;
*err = receiveSynchronousResponse(conn);
if (*err == NULL) {
Expand Down Expand Up @@ -2673,8 +2673,8 @@ static int dualChannelReplicationHandleEndOffsetResponse(connection *conn, char
* main connection accordingly.*/
server.repl_transfer_s->state = CONN_STATE_CONNECTED;
server.repl_state = REPL_STATE_SEND_HANDSHAKE;
serverAssert(connSetReadHandler(server.repl_transfer_s, setupMainConnForPsync) != C_ERR);
setupMainConnForPsync(server.repl_transfer_s);
serverAssert(connSetReadHandler(server.repl_transfer_s, dualChannelSetupMainConnForPsync) != C_ERR);
dualChannelSetupMainConnForPsync(server.repl_transfer_s);

/* As the next block we will receive using this connection is the rdb, we need to prepare
* the connection accordingly */
Expand All @@ -2691,9 +2691,9 @@ static int dualChannelReplicationHandleEndOffsetResponse(connection *conn, char
/* Replication: Replica side.
* This connection handler is used to initialize the RDB connection (dual-channel-replication).
* Once a replica with dual-channel-replication enabled, denied from PSYNC with its primary,
* fullSyncWithPrimary begins its role. The connection handler prepares server.repl_rdb_transfer_s
* dualChannelFullSyncWithPrimary begins its role. The connection handler prepares server.repl_rdb_transfer_s
* for a rdb stream, and server.repl_transfer_s for increamental replication data stream. */
static void fullSyncWithPrimary(connection *conn) {
static void dualChannelFullSyncWithPrimary(connection *conn) {
char *err = NULL;
int ret = 0;
serverAssert(conn == server.repl_rdb_transfer_s);
Expand All @@ -2709,16 +2709,16 @@ static void fullSyncWithPrimary(connection *conn) {
goto error;
}
switch (server.repl_rdb_channel_state) {
case REPL_DUAL_CHANNEL_SEND_HANDSHAKE: ret = dualChannelRepliacationHandleHandshake(conn, &err); break;
case REPL_DUAL_CHANNEL_SEND_HANDSHAKE: ret = dualChannelReplHandleHandshake(conn, &err); break;
case REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY:
if (server.primary_auth) {
ret = dualChannelReplicationHandleAuthReply(conn, &err);
ret = dualChannelReplHandleAuthReply(conn, &err);
break;
}
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY;
/* fall through */
case REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY: ret = dualChannelReplicationHandleReplconfReply(conn, &err); break;
case REPL_DUAL_CHANNEL_RECEIVE_ENDOFF: ret = dualChannelReplicationHandleEndOffsetResponse(conn, &err); break;
case REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY: ret = dualChannelReplHandleReplconfReply(conn, &err); break;
case REPL_DUAL_CHANNEL_RECEIVE_ENDOFF: ret = dualChannelReplHandleEndOffsetResponse(conn, &err); break;
default: break;
}
sdsfree(err);
Expand Down Expand Up @@ -3178,7 +3178,7 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) {
return PSYNC_NOT_SUPPORTED;
}

int dualChannelReplicationMainConnSendHandshake(connection *conn, char **err) {
int dualChannelReplMainConnSendHandshake(connection *conn, char **err) {
char llstr[LONG_STR_SIZE];
ull2string(llstr, sizeof(llstr), server.rdb_client_id);
*err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL);
Expand All @@ -3187,7 +3187,7 @@ int dualChannelReplicationMainConnSendHandshake(connection *conn, char **err) {
return C_OK;
}

int dualChannelReplicationMainConnReceiveCapaReply(connection *conn, char **err) {
int dualChannelReplMainConnRecvCapaReply(connection *conn, char **err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
if ((*err)[0] == '-') {
Expand All @@ -3198,7 +3198,7 @@ int dualChannelReplicationMainConnReceiveCapaReply(connection *conn, char **err)
return C_OK;
}

int dualChannelReplicationMainConnSendPsync(connection *conn, char **err) {
int dualChannelReplMainConnSendPsync(connection *conn, char **err) {
UNUSED(err);
if (server.debug_pause_after_fork) debugPauseProcess();
if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
Expand All @@ -3209,7 +3209,7 @@ int dualChannelReplicationMainConnSendPsync(connection *conn, char **err) {
return C_OK;
}

int dualChannelReplicationMainConnReceivePsyncReply(connection *conn, char **err) {
int dualChannelReplMainConnRecvPsyncReply(connection *conn, char **err) {
UNUSED(err);
int psync_result = replicaTryPartialResynchronization(conn, 1);
if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */
Expand All @@ -3231,36 +3231,33 @@ int dualChannelReplicationMainConnReceivePsyncReply(connection *conn, char **err
/* Replication: Replica side.
* This connection handler fires after rdb-connection was initialized. We use it
* to adjust the replica main for loading incremental changes into the local buffer. */
void setupMainConnForPsync(connection *conn) {
void dualChannelSetupMainConnForPsync(connection *conn) {
char *err = NULL;
int ret;

switch (server.repl_state) {
case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplicationMainConnSendHandshake(conn, &err); break;
case REPL_STATE_SEND_HANDSHAKE: ret = dualChannelReplMainConnSendHandshake(conn, &err); break;
case REPL_STATE_RECEIVE_CAPA_REPLY:
ret = dualChannelReplicationMainConnReceiveCapaReply(conn, &err);
ret = dualChannelReplMainConnRecvCapaReply(conn, &err);
if (ret == C_ERR) {
break;
}
sdsfree(err);
err = NULL;
/* fall through */
case REPL_STATE_SEND_PSYNC: ret = dualChannelReplicationMainConnSendPsync(conn, &err); break;
case REPL_STATE_RECEIVE_PSYNC_REPLY: ret = dualChannelReplicationMainConnReceivePsyncReply(conn, &err); break;
case REPL_STATE_SEND_PSYNC: ret = dualChannelReplMainConnSendPsync(conn, &err); break;
case REPL_STATE_RECEIVE_PSYNC_REPLY: ret = dualChannelReplMainConnRecvPsyncReply(conn, &err); break;
default:
serverLog(LL_WARNING, "Unexpected replication state: %d", server.repl_state);
ret = C_ERR;
break;
}
sdsfree(err);
if (ret == C_ERR) goto error;
return;

error:
/* The dual-channel sync session must be aborted for any psync_result other than PSYNC_CONTINUE or PSYNC_WAIT_REPLY.
*/
serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d", ret);
cancelReplicationHandshake(1);
if (ret != C_OK) {
serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d", ret);
cancelReplicationHandshake(1);
}
}

/*
Expand Down Expand Up @@ -3627,7 +3624,7 @@ void syncWithPrimary(connection *conn) {
/* Create RDB connection */
server.repl_rdb_transfer_s = connCreate(connTypeOfReplication());
if (connConnect(server.repl_rdb_transfer_s, server.primary_host, server.primary_port, server.bind_source_addr,
fullSyncWithPrimary) == C_ERR) {
dualChannelFullSyncWithPrimary) == C_ERR) {
serverLog(LL_WARNING, "Unable to connect to Primary: %s", connGetLastError(server.repl_transfer_s));
connClose(server.repl_rdb_transfer_s);
server.repl_rdb_transfer_s = NULL;
Expand Down

0 comments on commit bf1fab1

Please sign in to comment.