From 4814da71591cf65adbfaabc76f0ce067025f5f68 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Thu, 10 Mar 2022 14:00:35 +0000
Subject: [PATCH] Simplify computation of whether a GCP sync is needed (#84865)

We do a bunch of allocation simply to compute whether any replica's
global checkpoint lags behind the primary. With this commit we skip all
that jazz and just use a loop.
---
 .../index/seqno/ReplicationTracker.java       | 12 ++++
 .../elasticsearch/index/shard/IndexShard.java |  7 +--
 .../index/seqno/ReplicationTrackerTests.java  | 62 +++++++++++++++++++
 3 files changed, 76 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
index 7245359cf5e8a..622ce97be9cc0 100644
--- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
@@ -741,6 +741,18 @@ public synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
         return globalCheckpoints;
     }
 
+    /**
+     * @return true iff any tracked global checkpoint for an in-sync copy lags behind our global checkpoint
+     */
+    public synchronized boolean trackedGlobalCheckpointsNeedSync() {
+        for (final var checkpointState : checkpoints.values()) {
+            if (checkpointState.inSync && checkpointState.globalCheckpoint < globalCheckpoint) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /**
      * Returns whether the replication tracker is in primary mode, i.e., whether the current shard is acting as primary from the point of
      * view of replication.
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 4913a6ba32965..663d433d923d3 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -185,7 +185,6 @@
 import java.util.function.LongUnaryOperator;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import static org.elasticsearch.cluster.metadata.DataStream.TIMESERIES_LEAF_READERS_SORTER;
 import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
@@ -2690,16 +2689,14 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
         final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
         final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC;
         if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) {
-            final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
-            final long globalCheckpoint = replicationTracker.getGlobalCheckpoint();
+            final var trackedGlobalCheckpointsNeedSync = replicationTracker.trackedGlobalCheckpointsNeedSync();
             // async durability means that the local checkpoint might lag (as it is only advanced on fsync)
             // periodically ask for the newest local checkpoint by syncing the global checkpoint, so that ultimately the global
             // checkpoint can be synced. Also take into account that a shard might be pending sync, which means that it isn't
             // up-to-date and that a background sync might be needed. otherwise, don't bother syncing
             // the global checkpoint.
             final boolean syncNeeded = (asyncDurability && (stats.getGlobalCheckpoint() < stats.getMaxSeqNo() || replicationTracker.pendingInSync()))
-                // check if the persisted global checkpoint
-                || StreamSupport.stream(globalCheckpoints.values().spliterator(), false).anyMatch(v -> v.value < globalCheckpoint);
+                || trackedGlobalCheckpointsNeedSync;
             // only sync if index is not closed and there is a shard lagging the primary
             if (syncNeeded && indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN) {
                 logger.trace("syncing global checkpoint for [{}]", reason);
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java
index f29eeb6183244..94bbc09aa8b3d 100644
--- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java
+++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java
@@ -1232,4 +1232,66 @@ public void testPeerRecoveryRetentionLeaseCreationAndRenewal() {
         );
     }
 
+    public void testTrackedGlobalCheckpointsNeedSync() {
+        final int numberOfActiveAllocationsIds = randomIntBetween(2, 8);
+        final int numberOfInitializingIds = randomIntBetween(0, 8);
+        final var activeAndInitializingAllocationIds = randomActiveAndInitializingAllocationIds(
+            numberOfActiveAllocationsIds,
+            numberOfInitializingIds
+        );
+        final var activeAllocationIds = activeAndInitializingAllocationIds.v1();
+        final var initializingAllocationIds = activeAndInitializingAllocationIds.v2();
+
+        final var primaryId = activeAllocationIds.iterator().next();
+
+        final var initialClusterStateVersion = randomNonNegativeLong();
+
+        final var currentTimeMillis = new AtomicLong(0L);
+        final var tracker = newTracker(primaryId, updatedGlobalCheckpoint::set, currentTimeMillis::get);
+
+        final var routingTable = routingTable(initializingAllocationIds, primaryId);
+        tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
+        final var localCheckpoint = randomLongBetween(0L, 1000L);
+        tracker.activatePrimaryMode(localCheckpoint);
+
+        // the global checkpoint hasn't moved yet so no sync is needed
+        assertEquals(UNASSIGNED_SEQ_NO, tracker.globalCheckpoint);
+        assertFalse(tracker.trackedGlobalCheckpointsNeedSync());
+
+        // advance the local checkpoint on all in-sync copies to move the global checkpoint on the primary
+        for (final var activeAllocationId : activeAllocationIds) {
+            assertEquals(UNASSIGNED_SEQ_NO, tracker.globalCheckpoint);
+            assertFalse(tracker.trackedGlobalCheckpointsNeedSync());
+            tracker.updateLocalCheckpoint(activeAllocationId.getId(), localCheckpoint);
+
+            // there may also be some activity on initializing shards but this is irrelevant
+            if (0 < numberOfInitializingIds && randomBoolean()) {
+                tracker.updateLocalCheckpoint(
+                    randomFrom(initializingAllocationIds).getId(),
+                    usually() ? localCheckpoint : randomLongBetween(0L, localCheckpoint)
+                );
+            }
+        }
+        // the global checkpoint advanced on the primary, but not on any other copy, so a sync is needed
+        assertEquals(localCheckpoint, tracker.globalCheckpoint);
+        assertTrue(tracker.trackedGlobalCheckpointsNeedSync());
+
+        // sync the global checkpoint on every active copy
+        for (final var activeAllocationId : activeAllocationIds) {
+            assertTrue(tracker.trackedGlobalCheckpointsNeedSync());
+            tracker.updateGlobalCheckpointForShard(activeAllocationId.getId(), localCheckpoint);
+
+            // there may also be some activity on initializing shards but this is irrelevant
+            if (0 < numberOfInitializingIds && randomBoolean()) {
+                tracker.updateGlobalCheckpointForShard(
+                    randomFrom(initializingAllocationIds).getId(),
+                    usually() ? localCheckpoint : randomLongBetween(0L, localCheckpoint)
+                );
+            }
+        }
+
+        // the global checkpoint is up to date on every in-sync copy so no further sync is needed
+        assertFalse(tracker.trackedGlobalCheckpointsNeedSync());
+    }
+
 }
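
For anyone who wants to poke at the two shapes of the check outside the
codebase, here is a minimal, self-contained sketch. CheckpointState below is a
hypothetical, simplified stand-in for ReplicationTracker.CheckpointState, and
needsSyncViaStream only approximates the old code (which additionally copied
the in-sync checkpoints into a fresh ObjectLongMap via
getInSyncGlobalCheckpoints() before streaming over it); assume Java 16+ for
records.

    import java.util.Map;

    // Hypothetical, simplified stand-in for ReplicationTracker.CheckpointState.
    record CheckpointState(boolean inSync, long globalCheckpoint) {}

    public class GcpSyncCheckSketch {

        // Before (roughly): stream-based check; the real code also built a fresh
        // map of in-sync checkpoints first, so every call allocated.
        static boolean needsSyncViaStream(Map<String, CheckpointState> checkpoints, long globalCheckpoint) {
            return checkpoints.values().stream()
                .filter(CheckpointState::inSync)
                .anyMatch(cps -> cps.globalCheckpoint() < globalCheckpoint);
        }

        // After: a plain loop over the existing map, allocating nothing.
        static boolean needsSyncViaLoop(Map<String, CheckpointState> checkpoints, long globalCheckpoint) {
            for (final var checkpointState : checkpoints.values()) {
                if (checkpointState.inSync() && checkpointState.globalCheckpoint() < globalCheckpoint) {
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args) {
            final var checkpoints = Map.of(
                "primary", new CheckpointState(true, 42L),
                "replica-in-sync", new CheckpointState(true, 40L), // lags the primary: sync needed
                "replica-init", new CheckpointState(false, 10L) // not in-sync: ignored
            );
            System.out.println(needsSyncViaStream(checkpoints, 42L)); // true
            System.out.println(needsSyncViaLoop(checkpoints, 42L)); // true
        }
    }

Both variants compute the same answer; the loop just avoids the per-call map
copy and the stream plumbing on the maybeSyncGlobalCheckpoint path, which is
all this patch is about.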