[Segment Replication][BUG] Global checkpoint out of sync with local checkpoint on replica #2873

Closed
Tracked by #3969
Poojita-Raj opened this issue Apr 12, 2022 · 7 comments
Labels: bug (Something isn't working), distributed framework

@Poojita-Raj (Contributor)

Describe the bug
On replicas using segment replication, the global and local checkpoints can get out of sync in two situations: (1) during recovery and (2) while the replica is waiting for indexed segment files to arrive from the primary. We want to avoid the failure in the latter scenario.
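
For context, a minimal sketch of the invariant behind the failure below, with a simplified signature (this is not the actual IndexShard implementation): an in-sync replica should never be handed a global checkpoint that is ahead of its own local checkpoint.

// Simplified sketch, not the real OpenSearch code: the replica rejects a global
// checkpoint update that is ahead of what it has locally processed.
void updateGlobalCheckpointOnReplica(long newGlobalCheckpoint, long localCheckpoint) {
    assert newGlobalCheckpoint <= localCheckpoint
        : "supposedly in-sync shard copy received a global checkpoint ["
            + newGlobalCheckpoint
            + "] that is higher than its local checkpoint ["
            + localCheckpoint
            + "]";
    // ... otherwise accept the new global checkpoint ...
}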

To Reproduce
Steps to reproduce the behavior:

  1. Start a cluster with ./gradlew run with numberOfNodes=2.
  2. Create an index with segrep enabled.
  3. Index a very large document (~10 MB); a request-level sketch of steps 2 and 3 follows the error output below.
  4. You should see the below error:
=== Standard error of node `node{::runTask-1}` ===
»   ↓ last 40 non error or warning messages from /Users/poojiraj/logging/OpenSearch/build/testclusters/runTask-1/logs/opensearch.stderr.log ↓
» fatal error in thread [opensearch[runTask-1][refresh][T#3]], exiting
»  java.lang.AssertionError: supposedly in-sync shard copy received a global checkpoint [96215] that is higher than its local checkpoint [-1]
»       at org.opensearch.index.shard.IndexShard.updateGlobalCheckpointOnReplica(IndexShard.java:2833)
»       at org.opensearch.index.shard.IndexShard.lambda$innerAcquireReplicaOperationPermit$30(IndexShard.java:3525)
»       at org.opensearch.action.ActionListener$3.onResponse(ActionListener.java:128)
»       at org.opensearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:307)
»       at org.opensearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:252)
»       at org.opensearch.index.shard.IndexShard.lambda$acquireReplicaOperationPermit$28(IndexShard.java:3460)
»       at org.opensearch.index.shard.IndexShard.innerAcquireReplicaOperationPermit(IndexShard.java:3578)
»       at org.opensearch.index.shard.IndexShard.acquireReplicaOperationPermit(IndexShard.java:3454)
»       at org.opensearch.action.support.replication.TransportReplicationAction.acquireReplicaOperationPermit(TransportReplicationAction.java:1131)
»       at org.opensearch.action.support.replication.TransportReplicationAction$AsyncReplicaAction.doRun(TransportReplicationAction.java:778)
»       at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:50)
»       at org.opensearch.action.support.replication.TransportReplicationAction.handleReplicaRequest(TransportReplicationAction.java:636)
»       at org.opensearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:98)
»       at org.opensearch.transport.InboundHandler$RequestHandler.doRun(InboundHandler.java:443)
»       at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:792)
»       at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:50)
»       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
»       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
»       at java.base/java.lang.Thread.run(Thread.java:832)
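
For convenience, a minimal sketch of steps 2 and 3 against a locally running cluster, using only the JDK HTTP client. The endpoint, document shape, and payload construction are illustrative assumptions; the index settings match the segrep settings shown later in this thread.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SegRepRepro {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Step 2: create an index with segment replication enabled (1 primary, 1 replica).
        String createIndex = "{ \"settings\": { \"index\": {"
            + " \"number_of_shards\": 1,"
            + " \"number_of_replicas\": 1,"
            + " \"replication.type\": \"SEGMENT\" } } }";
        client.send(
            HttpRequest.newBuilder(URI.create("http://localhost:9200/test-index"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(createIndex))
                .build(),
            HttpResponse.BodyHandlers.ofString());

        // Step 3: index a single very large (~10 MB) document and refresh.
        String largeValue = "x".repeat(10 * 1024 * 1024);
        String doc = "{ \"data\": \"" + largeValue + "\" }";
        HttpResponse<String> response = client.send(
            HttpRequest.newBuilder(URI.create("http://localhost:9200/test-index/_doc?refresh=true"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(doc))
                .build(),
            HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}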


Expected behavior
The above failure, caused by the global and local checkpoints being out of sync on the replica during segment replication, should not occur.

@dreamer-89 (Member)

Looking into it.

dreamer-89 self-assigned this on Aug 11, 2022
@dreamer-89 (Member) commented Aug 11, 2022

I followed the repro steps but was not able to replicate this issue. I tried multiple doc ingestions with a ~10 MB payload but didn't see this error. Increasing the payload to 20 MB in a single document didn't help either. I did, however, identify a different issue, tracked in #4195.

Steps followed

  1. Run ./gradlew run with the below run.gradle config for testClusters, to start 2 nodes and enable the feature flag:
testClusters {
  runTask {
    testDistribution = 'archive'
    numberOfNodes = 2
    systemProperty 'opensearch.experimental.feature.replication_type.enabled', 'true'
    if (numZones > 1) numberOfZones = numZones
    if (numNodes > 1) numberOfNodes = numNodes
  }
}
  2. Create an index with segrep enabled:
PUT http://localhost:9200/test-index
{
  "settings": {
    "index": {
      "number_of_shards": 1,  
      "number_of_replicas": 1,
      "replication.type": "SEGMENT"
    }
  }
}
  3. Ingest data:
POST localhost:9200/test-index/_doc
{ 
    "name": "Jane Holland",
    "data": {10MB large string}
}
  4. Cluster stats
    a. _cat/shards
index      shard prirep state   docs store ip        node
test-index 0     p      STARTED    4  77mb 127.0.0.1 runTask-0
test-index 0     r      STARTED    4  77mb 127.0.0.1 runTask-1

    b. Index settings

localhost:9200/test-index/_settings?pretty
{
    "test-index": {
        "settings": {
            "index": {
                "replication": {
                    "type": "SEGMENT"
                },
                "number_of_shards": "1",
                "provided_name": "test-index",
                "creation_date": "1660251815920",
                "number_of_replicas": "1",
                "uuid": "FuJNEJ_ZTbyFSsWwDxsFiQ",
                "version": {
                    "created": "137217827"
                }
            }
        }
    }
}

This issue was reported a while (4 months) ago, and multiple changes have landed since then, so it might already have been fixed.

@Poojita-Raj: I will try to repro this a few more times with variations such as more replicas, a bigger payload size, etc. If I am still not able to repro, I will close this issue for now.

@dreamer-89 (Member)

Closing it for now as non-reproducible.

@dreamer-89 (Member)

Re-opening this: I hit the issue again while running the below test in continuation.

    private void indexDocs(int docCount) throws Exception {
        try (
            BackgroundIndexer indexer = new BackgroundIndexer(
                INDEX_NAME,
                "_doc",
                client(),
                -1,
                RandomizedTest.scaledRandomIntBetween(2, 5),
                false,
                random()
            )
        ) {
            indexer.start(docCount);
            waitForDocs(docCount, indexer);
            refresh(INDEX_NAME);
        }
    }

    private void indexDocsAndRefresh(int docCount) {
        Runnable asyncIndexer = () -> {
            try {
                indexDocs(docCount);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        asyncIndexer.run();
    }

    private void restartNode(String nodeName) {
        Runnable t1 = () -> {
            try {
                internalCluster().restartNode(nodeName);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        t1.run();
    }

    /**
     * Tests whether primary shard allocation via PrimaryShardAllocation chooses the replica with further ahead
     * ReplicationCheckpoint
     */
    public void testPrimaryShardAllocatorUsesFurthestAheadReplica() throws Exception {
        final Settings settings = Settings.builder()
            .put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .build();
        final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(Settings.EMPTY);
        final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
        createIndex(INDEX_NAME, settings);
        final String firstReplica = internalCluster().startDataOnlyNode(Settings.EMPTY);
        final String secondReplica = internalCluster().startDataOnlyNode(Settings.EMPTY);

        // Index docs & refresh to bring all replicas to initial checkpoint
        indexDocs(scaledRandomIntBetween(20, 200));
        flushAndRefresh(INDEX_NAME);


        final String thirdReplica = internalCluster().startDataOnlyNode(Settings.EMPTY);
        final String fourthReplica = internalCluster().startDataOnlyNode(Settings.EMPTY);
        final String fifthReplica = internalCluster().startDataOnlyNode(Settings.EMPTY);
        final String sixthReplica = internalCluster().startDataOnlyNode(Settings.EMPTY);

        for (int i = 0; i < 10; i++) {
            logger.info("Iteration {} --> ", i);
            indexDocsAndRefresh(scaledRandomIntBetween(10, 100));
        }

        final Index index = resolveIndex(INDEX_NAME);
        logger.info("--> primaryShard RC {}", getIndexShard(primaryNode).getLatestReplicationCheckpoint());
        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));

        for (int i = 0; i < 5; i++) {
            logger.info("Iteration {} --> ", i);
            indexDocsAndRefresh(scaledRandomIntBetween(10, 100));
        }

        ensureYellow(INDEX_NAME);
//        logger.info("--> cluster state {}", client().admin().cluster().prepareState().get().getState());
        logger.info("--> firstReplica RC {}", getIndexShard(firstReplica).getLatestReplicationCheckpoint());
        logger.info("--> secondReplica RC {}", getIndexShard(secondReplica).getLatestReplicationCheckpoint());
        logger.info("--> thirdReplica RC {}", getIndexShard(thirdReplica).getLatestReplicationCheckpoint());
        logger.info("--> fourthReplica RC {}", getIndexShard(fourthReplica).getLatestReplicationCheckpoint());

    }
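
A side note on the helpers above (an observation, not part of the reported bug): indexDocsAndRefresh and restartNode wrap their work in a Runnable but invoke run() directly, which executes on the calling thread, so they are effectively synchronous. A hypothetical variation, if concurrent indexing while the test keeps running is actually wanted:

    // Hypothetical helper, not from the issue: hand the indexing work to its own
    // thread; the caller can join() the returned Thread to wait for completion.
    private Thread indexDocsInBackground(int docCount) {
        Thread indexerThread = new Thread(() -> {
            try {
                indexDocs(docCount);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        indexerThread.start();
        return indexerThread;
    }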

dreamer-89 reopened this on Aug 12, 2022
@dreamer-89 (Member)

The sample failure trace below shows that the replica's local checkpoint (now > 0 rather than -1) is falling behind the global checkpoint:

[2022-08-13T00:06:29,341][INFO ][o.o.c.c.Coordinator      ] [node_t5] cluster-manager node [{node_t0}{3FtvvJBFSKC5J09DLK7YmA}{Adv4KIhGSlmgAFLX499mOw}{127.0.0.1}{127.0.0.1:61707}{m}{shard_indexing_pressure_enabled=true}] failed, restarting discovery
org.opensearch.transport.NodeDisconnectedException: [node_t0][127.0.0.1:61707][disconnected] disconnected
[2022-08-13T00:06:29,342][INFO ][o.o.c.s.ClusterApplierService] [node_t5] cluster-manager node changed {previous [{node_t0}{3FtvvJBFSKC5J09DLK7YmA}{Adv4KIhGSlmgAFLX499mOw}{127.0.0.1}{127.0.0.1:61707}{m}{shard_indexing_pressure_enabled=true}], current []}, term: 1, version: 22, reason: becoming candidate: onLeaderFailure
[2022-08-13T00:06:29,342][INFO ][o.o.c.c.FollowersChecker ] [node_t0] FollowerChecker{discoveryNode={node_t5}{LxUFVb99R0qzdUoCPJPWMw}{aNnFtOSfSTCeoPFJYDzDCw}{127.0.0.1}{127.0.0.1:61785}{d}{shard_indexing_pressure_enabled=true}, failureCountSinceLastSuccess=0, [cluster.fault_detection.follower_check.retry_count]=3} disconnected
[2022-08-13T00:06:29,343][INFO ][o.o.c.c.FollowersChecker ] [node_t0] FollowerChecker{discoveryNode={node_t5}{LxUFVb99R0qzdUoCPJPWMw}{aNnFtOSfSTCeoPFJYDzDCw}{127.0.0.1}{127.0.0.1:61785}{d}{shard_indexing_pressure_enabled=true}, failureCountSinceLastSuccess=0, [cluster.fault_detection.follower_check.retry_count]=3} marking node as faulty
[2022-08-13T00:06:29,344][INFO ][o.o.c.c.Coordinator      ] [node_t3] cluster-manager node [{node_t0}{3FtvvJBFSKC5J09DLK7YmA}{Adv4KIhGSlmgAFLX499mOw}{127.0.0.1}{127.0.0.1:61707}{m}{shard_indexing_pressure_enabled=true}] failed, restarting discovery
org.opensearch.transport.NodeDisconnectedException: [node_t0][127.0.0.1:61707][disconnected] disconnected
Aug 13, 2022 12:06:29 AM com.carrotsearch.randomizedtesting.RandomizedRunner$QueueUncaughtExceptionsHandler uncaughtException
WARNING: Uncaught exception in thread: Thread[opensearch[node_t5][transport_worker][T#1],5,TGRP-SegmentReplicationIT]
java.lang.AssertionError: supposedly in-sync shard copy received a global checkpoint [74] that is higher than its local checkpoint [73]
	at __randomizedtesting.SeedInfo.seed([9EFE1B4F25B3092E]:0)
	at org.opensearch.index.shard.IndexShard.updateGlobalCheckpointOnReplica(IndexShard.java:2830)
	at org.opensearch.index.shard.IndexShard.lambda$innerAcquireReplicaOperationPermit$34(IndexShard.java:3519)
	at org.opensearch.action.ActionListener$3.onResponse(ActionListener.java:130)
	at org.opensearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:309)
	at org.opensearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:254)
	at org.opensearch.index.shard.IndexShard.lambda$acquireReplicaOperationPermit$32(IndexShard.java:3454)
	at org.opensearch.index.shard.IndexShard.innerAcquireReplicaOperationPermit(IndexShard.java:3572)
	at org.opensearch.index.shard.IndexShard.acquireReplicaOperationPermit(IndexShard.java:3448)
	at org.opensearch.action.support.replication.TransportReplicationAction.acquireReplicaOperationPermit(TransportReplicationAction.java:1160)
	at org.opensearch.action.support.replication.TransportReplicationAction$AsyncReplicaAction.doRun(TransportReplicationAction.java:805)
	at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
	at org.opensearch.action.support.replication.TransportReplicationAction.handleReplicaRequest(TransportReplicationAction.java:653)
	at org.opensearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:106)
	at org.opensearch.transport.InboundHandler.handleRequest(InboundHandler.java:249)
	at org.opensearch.transport.InboundHandler.messageReceived(InboundHandler.java:132)
	at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:114)
	at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:769)
	at org.opensearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:175)
	at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:150)
	at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:115)
	at org.opensearch.transport.nio.MockNioTransport$MockTcpReadWriteHandler.consumeReads(MockNioTransport.java:341)
	at org.opensearch.nio.SocketChannelContext.handleReadBytes(SocketChannelContext.java:246)
	at org.opensearch.nio.BytesChannelContext.read(BytesChannelContext.java:59)
	at org.opensearch.nio.EventHandler.handleRead(EventHandler.java:152)
	at org.opensearch.transport.nio.TestEventHandler.handleRead(TestEventHandler.java:167)
	at org.opensearch.nio.NioSelector.handleRead(NioSelector.java:438)
	at org.opensearch.nio.NioSelector.processKey(NioSelector.java:264)
	at org.opensearch.nio.NioSelector.singleLoop(NioSelector.java:191)
	at org.opensearch.nio.NioSelector.runLoop(NioSelector.java:148)
	at java.base/java.lang.Thread.run(Thread.java:833)

@dreamer-89 (Member)

I tried to reproduce this using the below gradle command, but it only pops up rarely. With #4224, I am hoping to see this go away completely (thanks @mch2 for the fix).

./gradlew ':server:internalClusterTest' --tests "org.opensearch.indices.replication.SegmentReplicationIT.testPrimaryShardAllocatorUsesFurthestAheadReplica" -Dtests.seed=9EFE1B4F25B3092E -Dtests.iters=50 -Dtests.opensearch.logger.level=INFO --rerun-tasks

@dreamer-89 (Member)

Closing this, as the probable fix #4224 has been merged.
