Skip to content

Commit

Permalink
Simplify computation of whether a GCP sync is needed (#84865)
Browse files Browse the repository at this point in the history
We do a bunch of allocation simply to compute whether any replica's
global checkpoint lags behind the primary. With this commit we skip all
that jazz and just use a loop.
  • Loading branch information
DaveCTurner authored Mar 10, 2022
1 parent bd343f6 commit 4814da7
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,18 @@ public synchronized ObjectLongMap<String> getInSyncGlobalCheckpoints() {
return globalCheckpoints;
}

/**
* @return true iff any tracked global checkpoint for an in-sync copy lags behind our global checkpoint
*/
public synchronized boolean trackedGlobalCheckpointsNeedSync() {
for (final var checkpointState : checkpoints.values()) {
if (checkpointState.inSync && checkpointState.globalCheckpoint < globalCheckpoint) {
return true;
}
}
return false;
}

/**
* Returns whether the replication tracker is in primary mode, i.e., whether the current shard is acting as primary from the point of
* view of replication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.metadata.DataStream.TIMESERIES_LEAF_READERS_SORTER;
import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
Expand Down Expand Up @@ -2690,17 +2689,15 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
final boolean asyncDurability = indexSettings().getTranslogDurability() == Translog.Durability.ASYNC;
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint() || asyncDurability) {
final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
final long globalCheckpoint = replicationTracker.getGlobalCheckpoint();
final var trackedGlobalCheckpointsNeedSync = replicationTracker.trackedGlobalCheckpointsNeedSync();
// async durability means that the local checkpoint might lag (as it is only advanced on fsync)
// periodically ask for the newest local checkpoint by syncing the global checkpoint, so that ultimately the global
// checkpoint can be synced. Also take into account that a shard might be pending sync, which means that it isn't
// in the in-sync set just yet but might be blocked on waiting for its persisted local checkpoint to catch up to
// the global checkpoint.
final boolean syncNeeded = (asyncDurability
&& (stats.getGlobalCheckpoint() < stats.getMaxSeqNo() || replicationTracker.pendingInSync()))
// check if the persisted global checkpoint
|| StreamSupport.stream(globalCheckpoints.values().spliterator(), false).anyMatch(v -> v.value < globalCheckpoint);
|| trackedGlobalCheckpointsNeedSync;
// only sync if index is not closed and there is a shard lagging the primary
if (syncNeeded && indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN) {
logger.trace("syncing global checkpoint for [{}]", reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1232,4 +1232,66 @@ public void testPeerRecoveryRetentionLeaseCreationAndRenewal() {
);
}

public void testTrackedGlobalCheckpointsNeedSync() {
final int numberOfActiveAllocationsIds = randomIntBetween(2, 8);
final int numberOfInitializingIds = randomIntBetween(0, 8);
final var activeAndInitializingAllocationIds = randomActiveAndInitializingAllocationIds(
numberOfActiveAllocationsIds,
numberOfInitializingIds
);
final var activeAllocationIds = activeAndInitializingAllocationIds.v1();
final var initializingAllocationIds = activeAndInitializingAllocationIds.v2();

final var primaryId = activeAllocationIds.iterator().next();

final var initialClusterStateVersion = randomNonNegativeLong();

final var currentTimeMillis = new AtomicLong(0L);
final var tracker = newTracker(primaryId, updatedGlobalCheckpoint::set, currentTimeMillis::get);

final var routingTable = routingTable(initializingAllocationIds, primaryId);
tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
final var localCheckpoint = randomLongBetween(0L, 1000L);
tracker.activatePrimaryMode(localCheckpoint);

// the global checkpoint hasn't moved yet so no sync is needed
assertEquals(UNASSIGNED_SEQ_NO, tracker.globalCheckpoint);
assertFalse(tracker.trackedGlobalCheckpointsNeedSync());

// advance the local checkpoint on all in-sync copies to move the global checkpoint on the primary
for (final var activeAllocationId : activeAllocationIds) {
assertEquals(UNASSIGNED_SEQ_NO, tracker.globalCheckpoint);
assertFalse(tracker.trackedGlobalCheckpointsNeedSync());
tracker.updateLocalCheckpoint(activeAllocationId.getId(), localCheckpoint);

// there may also be some activity on initializing shards but this is irrelevant
if (0 < numberOfInitializingIds && randomBoolean()) {
tracker.updateLocalCheckpoint(
randomFrom(initializingAllocationIds).getId(),
usually() ? localCheckpoint : randomLongBetween(0L, localCheckpoint)
);
}
}
// the global checkpoint advanced on the primary, but not on any other copy, so a sync is needed
assertEquals(localCheckpoint, tracker.globalCheckpoint);
assertTrue(tracker.trackedGlobalCheckpointsNeedSync());

// sync the global checkpoint on every active copy
for (final var activeAllocationId : activeAllocationIds) {
assertTrue(tracker.trackedGlobalCheckpointsNeedSync());
tracker.updateGlobalCheckpointForShard(activeAllocationId.getId(), localCheckpoint);

// there may also be some activity on initializing shards but this is irrelevant
if (0 < numberOfInitializingIds && randomBoolean()) {
tracker.updateGlobalCheckpointForShard(
randomFrom(initializingAllocationIds).getId(),
usually() ? localCheckpoint : randomLongBetween(0L, localCheckpoint)
);
}
}

// the global checkpoint is up to date on every in-sync copy so no further sync is needed
assertFalse(tracker.trackedGlobalCheckpointsNeedSync());
}

}

0 comments on commit 4814da7

Please sign in to comment.