[BUG] Parallel segment replication rounds for same index on replica #5706

Closed
dreamer-89 opened this issue Jan 4, 2023 · 3 comments · Fixed by #5831
Labels: bug (Something isn't working)

Comments

@dreamer-89 (Member)

Describe the bug
Coming from #5344 (comment): with a primary allocation that does not bump the primary term (e.g. primary relocation), it is possible for a replica to be performing segment replication with both the older and the new primary.

To Reproduce
The integration test below was used to simulate the issue:

    /**
     * This test tries to mimic a state where segment replication from the older primary (after the primary has been
     * relocated) is still happening on the target/replica node and is not caught by the existing guards
     * (state/index/shard listeners). The test simulates this by blocking segment replication from the older primary
     * to the replica node and then triggering a primary relocation to the target node. After the primary changes,
     * the older primary is still performing segrep with the replica node.
     */
    public void testPrimaryRelocationWithDup() throws Exception {
        final String old_primary = internalCluster().startNode();
        createIndex(INDEX_NAME);
        final String replica = internalCluster().startNode();
        ensureGreen(INDEX_NAME);

        CountDownLatch latch = new CountDownLatch(1);
        // Mock the transport service on the old primary to block segment replication (file chunk sends)
        // to the replica until the latch is released.
        MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
            TransportService.class,
            old_primary
        ));
        mockTransportService.addSendBehavior(
            internalCluster().getInstance(TransportService.class, replica),
            (connection, requestId, action, request, options) -> {
                if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
                    try {
                        logger.info("--> blocking old primary");
                        latch.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                connection.sendRequest(requestId, action, request, options);
            }
        );

        final int initialDocCount = scaledRandomIntBetween(0, 200);
        for (int i = 0; i < initialDocCount; i++) {
            client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
        }
        refresh(INDEX_NAME); // this blocks the segrep on old primary -> replica

        logger.info("--> start target node");
        final String new_primary = internalCluster().startNode();
        ClusterHealthResponse clusterHealthResponse = client().admin()
            .cluster()
            .prepareHealth()
            .setWaitForEvents(Priority.LANGUID)
            .setWaitForNodes("3")
            .execute()
            .actionGet();
        assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

        logger.info("--> relocate the shard");
        client().admin()
            .cluster()
            .prepareReroute()
            .add(new MoveAllocationCommand(INDEX_NAME, 0, old_primary, new_primary))
            .execute()
            .actionGet();
        clusterHealthResponse = client().admin()
            .cluster()
            .prepareHealth()
            .setWaitForEvents(Priority.LANGUID)
            .setWaitForNoRelocatingShards(true)
            .setTimeout(ACCEPTABLE_RELOCATION_TIME)
            .execute()
            .actionGet();
        assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

        logger.info("--> get the state, verify shard 1 primary moved from node1 to node2");
        ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();

        logger.info("--> state {}", state);

        assertThat(
            state.getRoutingNodes().node(state.nodes().resolveNode(new_primary).getId()).iterator().next().state(),
            equalTo(ShardRoutingState.STARTED)
        );

        final int finalDocCount = initialDocCount;
        for (int i = initialDocCount; i < 2 * initialDocCount; i++) {
            client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
        }
        refresh(INDEX_NAME);

        final IndexShard indexShard = getIndexShard(new_primary);

        ReplicationCollection<SegmentReplicationTarget> replications = internalCluster().getInstance(
            SegmentReplicationTargetService.class,
            replica
        ).getOnGoingReplications();
        PrimaryShardReplicationSource source = (PrimaryShardReplicationSource) replications
            .getOngoingReplicationTarget(indexShard.shardId())
            .getSource();

        assertNotEquals(source.getSourceNode().getName(), old_primary);
        logger.info("Source node {} {}", source.getSourceNode().getName(), old_primary);

        logger.info("--> verifying count again {}", initialDocCount + finalDocCount);
        client().admin().indices().prepareRefresh().execute().actionGet();
        assertHitCount(
            client(new_primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
            initialDocCount + finalDocCount
        );
        assertHitCount(
            client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(),
            initialDocCount + finalDocCount
        );
        latch.countDown();
    }

Expected behavior
There should not be two parallel rounds of segment replication for the same shard. This can have unintended consequences on the replica state.

@Poojita-Raj (Contributor)

Looking into it.

@mch2 (Member) commented Jan 9, 2023

I think the assertion being made in this test is invalid. We don't care if a replica is syncing from the old primary; once the old primary shuts down, the replica will start syncing from the new primary.

I think we do actually have a race condition where a replica could receive a checkpoint when it hits POST_RECOVERY and processes it. It will then force a round of segrep which does not block for any ongoing replications. The validation that prevents duplicate rounds of replication needs to move outside of onNewCheckpoint and apply whenever segrep is started.
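
For illustration, a rough sketch of what such a shared guard could look like. The method shape, its signature, and the onGoingReplications field are assumptions (modeled on the ReplicationCollection used in the test above), not the actual SegmentReplicationTargetService code:

    // Hypothetical single entry point on the replica that starts a round of segrep. Because the
    // duplicate-round check lives here rather than only in onNewCheckpoint, it applies to both
    // checkpoint-triggered and forced rounds.
    public void startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) {
        // onGoingReplications mirrors the ReplicationCollection<SegmentReplicationTarget> used in the test above.
        if (onGoingReplications.getOngoingReplicationTarget(indexShard.shardId()) != null) {
            // A round is already in flight for this shard; skip instead of starting a parallel one.
            logger.trace("Ignoring replication request for {}, a round is already in progress", indexShard.shardId());
            return;
        }
        // ... build the SegmentReplicationTarget and start the round as before ...
    }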

@Poojita-Raj (Contributor)

I think we do actually have a race condition where a replica could receive a checkpoint when it hits POST_RECOVERY and processes it.

This race condition could result in parallel replication events, which we want to avoid. The most straightforward way to prevent it is to skip any replication event triggered by a newly received checkpoint when the index shard state is not STARTED.
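
A minimal sketch of that check, assuming the checkpoint handler has access to the local replica shard (parameter names are illustrative, not the exact onNewCheckpoint signature):

    // Hypothetical guard at the top of the checkpoint handler: a shard that is still in POST_RECOVERY
    // (or any other non-STARTED state) ignores incoming checkpoints, so it cannot kick off a
    // replication round in parallel with one started during recovery.
    public void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) {
        if (replicaShard.state() != IndexShardState.STARTED) {
            logger.trace("Ignoring checkpoint {}, shard state is {}", receivedCheckpoint, replicaShard.state());
            return;
        }
        // ... existing checkpoint processing / startReplication logic ...
    }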
