From f9b6694c0eb4a4a7be326ba143edc41a6eba2f0c Mon Sep 17 00:00:00 2001
From: Marc Handalian
Date: Fri, 1 Sep 2023 11:31:06 -0700
Subject: [PATCH] Fix Segment Replication stats bytes behind metric (#9686)

* Fix Segment Replication stats bytes behind metric.

This metric currently gives an estimate of the bytes behind based on the
difference in size of the segments referenced by the active readers between
shards. This does not give a good indication of the number of bytes that need
to be fetched and is inaccurate after deletes and merges. Fixed by sending
file metadata with each checkpoint and computing a diff between checkpoints
when SegmentReplicationShardStats is built.

Signed-off-by: Marc Handalian

* Skip SegRep bwc test until this is backported to 2.x.

Signed-off-by: Marc Handalian

* Add changelog entry.

Signed-off-by: Marc Handalian

---------

Signed-off-by: Marc Handalian
---
 CHANGELOG.md                                  |  1 +
 .../org/opensearch/backwards/IndexingIT.java  |  1 +
 .../index/seqno/ReplicationTracker.java       | 28 ++++-----
 .../opensearch/index/shard/IndexShard.java    |  6 +-
 .../metadata/RemoteSegmentMetadata.java       | 29 +++++++--
 .../checkpoint/ReplicationCheckpoint.java     | 34 ++++++++--
 .../index/seqno/ReplicationTrackerTests.java  | 25 +++++---
 .../SegmentReplicationIndexShardTests.java    | 62 +++++++++++++++++++
 8 files changed, 154 insertions(+), 32 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d8d68c51ceed5..63e4486012a67 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -188,6 +188,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Fix condition to remove index create block ([#9437](https://github.com/opensearch-project/OpenSearch/pull/9437))
 - Add support to clear archived index setting ([#9019](https://github.com/opensearch-project/OpenSearch/pull/9019))
 - [Segment Replication] Fixed bug where replica shard temporarily serves stale data during an engine reset ([#9495](https://github.com/opensearch-project/OpenSearch/pull/9495))
+- [Segment Replication] Fixed bug where bytes behind metric is not accurate ([#9686](https://github.com/opensearch-project/OpenSearch/pull/9686))
 
 ### Security
 
diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java
index 686fc78dcec8a..75083a929b491 100644
--- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java
+++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java
@@ -114,6 +114,7 @@ private void printClusterRouting() throws IOException, ParseException {
      * This test verifies that segment replication does not break when primary shards are on a lower OS version. It does this
      * by verifying that replica shards contain the same number of documents as the primary.
      */
+    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9685")
     public void testIndexingWithPrimaryOnBwcNodes() throws Exception {
         if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
             logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
index 48bfce1013f17..4b6d72b86ff62 100644
--- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -57,6 +57,8 @@ import org.opensearch.index.shard.AbstractIndexShardComponent;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.ReplicationGroup;
+import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;
@@ -1290,27 +1292,25 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
                     && entry.getValue().inSync
                     && replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
                 )
-                .map(entry -> buildShardStats(latestReplicationCheckpoint.getLength(), entry.getKey(), entry.getValue()))
+                .map(entry -> buildShardStats(entry.getKey(), entry.getValue()))
                 .collect(Collectors.toUnmodifiableSet());
         }
         return Collections.emptySet();
     }
 
-    private SegmentReplicationShardStats buildShardStats(
-        final long latestCheckpointLength,
-        final String allocationId,
-        final CheckpointState checkpointState
-    ) {
-        final Map<String, SegmentReplicationLagTimer> checkpointTimers = checkpointState.checkpointTimers;
+    private SegmentReplicationShardStats buildShardStats(final String allocationId, final CheckpointState cps) {
+        final Store.RecoveryDiff diff = Store.segmentReplicationDiff(
+            latestReplicationCheckpoint.getMetadataMap(),
+            cps.visibleReplicationCheckpoint != null ? cps.visibleReplicationCheckpoint.getMetadataMap() : Collections.emptyMap()
+        );
+        final long bytesBehind = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
         return new SegmentReplicationShardStats(
             allocationId,
-            checkpointTimers.size(),
-            checkpointState.visibleReplicationCheckpoint == null
-                ? latestCheckpointLength
-                : Math.max(latestCheckpointLength - checkpointState.visibleReplicationCheckpoint.getLength(), 0),
-            checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
-            checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
-            checkpointState.lastCompletedReplicationLag
+            cps.checkpointTimers.size(),
+            bytesBehind,
+            cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
+            cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
+            cps.lastCompletedReplicationLag
         );
     }
 
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index e398cab23a085..352876e54547e 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1610,6 +1610,7 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
             snapshot = getSegmentInfosSnapshot();
             if (snapshot.get() != null) {
                 SegmentInfos segmentInfos = snapshot.get();
+                final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
                 return new Tuple<>(
                     snapshot,
                     new ReplicationCheckpoint(
@@ -1617,8 +1618,9 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
                         getOperationPrimaryTerm(),
                         segmentInfos.getGeneration(),
                         segmentInfos.getVersion(),
-                        store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum(),
-                        getEngine().config().getCodec().getName()
+                        metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
+                        getEngine().config().getCodec().getName(),
+                        metadataMap
                     )
                 );
             }
diff --git a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java
index 207620573886d..1cec20ec3f6cc 100644
--- a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java
+++ b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java
@@ -10,13 +10,16 @@
 
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.Version;
 import org.opensearch.core.index.Index;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -110,11 +113,13 @@ public void write(IndexOutput out) throws IOException {
 
     public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException {
         Map<String, String> metadata = indexInput.readMapOfStrings();
-        ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput);
+        final Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap = RemoteSegmentMetadata
+            .fromMapOfStrings(metadata);
+        ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap);
         int byteArraySize = (int) indexInput.readLong();
         byte[] segmentInfosBytes = new byte[byteArraySize];
         indexInput.readBytes(segmentInfosBytes, 0, byteArraySize);
-        return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, replicationCheckpoint);
+        return new RemoteSegmentMetadata(uploadedSegmentMetadataMap, segmentInfosBytes, replicationCheckpoint);
     }
 
     public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicationCheckpoint, IndexOutput out) throws IOException {
@@ -131,14 +136,30 @@ public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicatio
         out.writeString(replicationCheckpoint.getCodec());
     }
 
-    private static ReplicationCheckpoint readCheckpointFromIndexInput(IndexInput in) throws IOException {
+    private static ReplicationCheckpoint readCheckpointFromIndexInput(
+        IndexInput in,
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap
+    ) throws IOException {
         return new ReplicationCheckpoint(
             new ShardId(new Index(in.readString(), in.readString()), in.readVInt()),
             in.readLong(),
             in.readLong(),
             in.readLong(),
             in.readLong(),
-            in.readString()
+            in.readString(),
+            toStoreFileMetadata(uploadedSegmentMetadataMap)
         );
     }
+
+    private static Map<String, StoreFileMetadata> toStoreFileMetadata(
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> metadata
+    ) {
+        return metadata.entrySet()
+            .stream()
+            // TODO: Version here should be read from UploadedSegmentMetadata.
+            .map(
+                entry -> new StoreFileMetadata(entry.getKey(), entry.getValue().getLength(), entry.getValue().getChecksum(), Version.LATEST)
+            )
+            .collect(Collectors.toMap(StoreFileMetadata::name, Function.identity()));
+    }
 }
diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
index 70c3e71ba18b9..521522803c726 100644
--- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
+++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
@@ -15,8 +15,11 @@ import org.opensearch.core.common.io.stream.Writeable;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.seqno.SequenceNumbers;
+import org.opensearch.index.store.StoreFileMetadata;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -32,6 +35,7 @@ public class ReplicationCheckpoint implements Writeable, Comparable<Replication
+    private final Map<String, StoreFileMetadata> metadataMap;
 
     public static ReplicationCheckpoint empty(ShardId shardId) {
         return empty(shardId, "");
@@ -48,19 +52,29 @@ private ReplicationCheckpoint(ShardId shardId, String codec) {
         segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED;
         length = 0L;
         this.codec = codec;
+        this.metadataMap = Collections.emptyMap();
     }
 
     public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) {
-        this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec);
-    }
-
-    public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, long length, String codec) {
+        this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap());
+    }
+
+    public ReplicationCheckpoint(
+        ShardId shardId,
+        long primaryTerm,
+        long segmentsGen,
+        long segmentInfosVersion,
+        long length,
+        String codec,
+        Map<String, StoreFileMetadata> metadataMap
+    ) {
         this.shardId = shardId;
         this.primaryTerm = primaryTerm;
         this.segmentsGen = segmentsGen;
         this.segmentInfosVersion = segmentInfosVersion;
         this.length = length;
         this.codec = codec;
+        this.metadataMap = metadataMap;
     }
 
     public ReplicationCheckpoint(StreamInput in) throws IOException {
@@ -75,6 +89,11 @@ public ReplicationCheckpoint(StreamInput in) throws IOException {
             length = 0L;
             codec = null;
         }
+        if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
+            this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new);
+        } else {
+            this.metadataMap = Collections.emptyMap();
+        }
     }
 
     /**
@@ -135,6 +154,9 @@ public void writeTo(StreamOutput out) throws IOException {
             out.writeLong(length);
             out.writeString(codec);
         }
+        if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
+            out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
+        }
     }
 
     @Override
@@ -169,6 +191,10 @@ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) {
             || (primaryTerm == other.getPrimaryTerm() && segmentInfosVersion > other.getSegmentInfosVersion());
     }
 
+    public Map<String, StoreFileMetadata> getMetadataMap() {
+        return metadataMap;
+    }
+
     @Override
     public String toString() {
         return "ReplicationCheckpoint{"
diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
index ab87d31d15e2f..28c95ddf13fc4 100644
--- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
+++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
@@ -33,6 +33,7 @@
 package org.opensearch.index.seqno;
 
 import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.util.Version;
 import org.opensearch.action.support.replication.ReplicationResponse;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.routing.AllocationId;
@@ -50,6 +51,7 @@
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.IndexSettings;
 import org.opensearch.index.SegmentReplicationShardStats;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;
@@ -1826,29 +1828,35 @@ public void testSegmentReplicationCheckpointTracking() {
 
         initializingIds.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
 
+        final StoreFileMetadata segment_1 = new StoreFileMetadata("segment_1", 1L, "abcd", Version.LATEST);
+        final StoreFileMetadata segment_2 = new StoreFileMetadata("segment_2", 50L, "abcd", Version.LATEST);
+        final StoreFileMetadata segment_3 = new StoreFileMetadata("segment_3", 100L, "abcd", Version.LATEST);
         final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(
             tracker.shardId(),
             0L,
             1,
             1,
             1L,
-            Codec.getDefault().getName()
+            Codec.getDefault().getName(),
+            Map.of("segment_1", segment_1)
         );
         final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(
             tracker.shardId(),
             0L,
             2,
             2,
-            50L,
-            Codec.getDefault().getName()
+            51L,
+            Codec.getDefault().getName(),
+            Map.of("segment_1", segment_1, "segment_2", segment_2)
        );
         final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(
             tracker.shardId(),
             0L,
             2,
             3,
-            100L,
-            Codec.getDefault().getName()
+            151L,
+            Codec.getDefault().getName(),
+            Map.of("segment_1", segment_1, "segment_2", segment_2, "segment_3", segment_3)
         );
 
         tracker.setLatestReplicationCheckpoint(initialCheckpoint);
@@ -1864,7 +1872,7 @@ public void testSegmentReplicationCheckpointTracking() {
         assertEquals(expectedIds.size(), groupStats.size());
         for (SegmentReplicationShardStats shardStat : groupStats) {
             assertEquals(3, shardStat.getCheckpointsBehindCount());
-            assertEquals(100L, shardStat.getBytesBehindCount());
+            assertEquals(151L, shardStat.getBytesBehindCount());
             assertTrue(shardStat.getCurrentReplicationLagMillis() >= shardStat.getCurrentReplicationTimeMillis());
         }
 
@@ -1881,7 +1889,7 @@ public void testSegmentReplicationCheckpointTracking() {
         assertEquals(expectedIds.size(), groupStats.size());
         for (SegmentReplicationShardStats shardStat : groupStats) {
             assertEquals(2, shardStat.getCheckpointsBehindCount());
-            assertEquals(99L, shardStat.getBytesBehindCount());
+            assertEquals(150L, shardStat.getBytesBehindCount());
         }
 
         for (String id : expectedIds) {
@@ -1938,7 +1946,8 @@ public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() {
             1,
             1,
             1L,
-            Codec.getDefault().getName()
+            Codec.getDefault().getName(),
+            Collections.emptyMap()
         );
         tracker.setLatestReplicationCheckpoint(initialCheckpoint);
         tracker.startReplicationLagTimers(initialCheckpoint);
diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
index 29daa3936e8bb..b7972810dddb9 100644
--- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
@@ -30,6 +30,8 @@
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.xcontent.MediaTypeRegistry;
 import org.opensearch.index.IndexSettings;
+import org.opensearch.index.SegmentReplicationShardStats;
+import org.opensearch.index.engine.DocIdSeqNoAndSource;
 import org.opensearch.index.engine.Engine;
 import org.opensearch.index.engine.InternalEngineFactory;
 import org.opensearch.index.engine.NRTReplicationEngine;
@@ -69,6 +71,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -805,6 +808,65 @@ public void testQueryDuringEngineResetShowsDocs() throws Exception {
         }
     }
 
+    public void testSegmentReplicationStats() throws Exception {
+        final NRTReplicationEngineFactory engineFactory = new NRTReplicationEngineFactory();
+        final NRTReplicationEngineFactory spy = spy(engineFactory);
+        try (ReplicationGroup shards = createGroup(1, settings, indexMapping, spy, createTempDir())) {
+            final IndexShard primaryShard = shards.getPrimary();
+            final IndexShard replicaShard = shards.getReplicas().get(0);
+            shards.startAll();
+
+            assertReplicaCaughtUp(primaryShard);
+
+            shards.indexDocs(10);
+            shards.refresh("test");
+
+            final ReplicationCheckpoint primaryCheckpoint = primaryShard.getLatestReplicationCheckpoint();
+            final long initialCheckpointSize = primaryCheckpoint.getMetadataMap()
+                .values()
+                .stream()
+                .mapToLong(StoreFileMetadata::length)
+                .sum();
+
+            Set<SegmentReplicationShardStats> postRefreshStats = primaryShard.getReplicationStats();
+            SegmentReplicationShardStats shardStats = postRefreshStats.stream().findFirst().get();
+            assertEquals(1, shardStats.getCheckpointsBehindCount());
+            assertEquals(initialCheckpointSize, shardStats.getBytesBehindCount());
+            replicateSegments(primaryShard, shards.getReplicas());
+            assertReplicaCaughtUp(primaryShard);
+            shards.assertAllEqual(10);
+
+            final List<DocIdSeqNoAndSource> docIdAndSeqNos = getDocIdAndSeqNos(primaryShard);
+            for (DocIdSeqNoAndSource docIdAndSeqNo : docIdAndSeqNos.subList(0, 5)) {
+                deleteDoc(primaryShard, docIdAndSeqNo.getId());
+                // also delete on the replica so its translog stays in sync
+                deleteDoc(replicaShard, docIdAndSeqNo.getId());
+            }
+            primaryShard.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(true));
+
+            final Map<String, StoreFileMetadata> segmentMetadataMap = primaryShard.getSegmentMetadataMap();
+            final Store.RecoveryDiff diff = Store.segmentReplicationDiff(segmentMetadataMap, replicaShard.getSegmentMetadataMap());
+            final long sizeAfterDeleteAndCommit = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
+
+            final Set<SegmentReplicationShardStats> statsAfterFlush = primaryShard.getReplicationStats();
+            shardStats = statsAfterFlush.stream().findFirst().get();
+            assertEquals(sizeAfterDeleteAndCommit, shardStats.getBytesBehindCount());
+            assertEquals(1, shardStats.getCheckpointsBehindCount());
+
+            replicateSegments(primaryShard, shards.getReplicas());
+            assertReplicaCaughtUp(primaryShard);
+            shards.assertAllEqual(5);
+        }
+    }
+
+    private void assertReplicaCaughtUp(IndexShard primaryShard) {
+        Set<SegmentReplicationShardStats> initialStats = primaryShard.getReplicationStats();
+        assertEquals(1, initialStats.size());
+        SegmentReplicationShardStats shardStats = initialStats.stream().findFirst().get();
+        assertEquals(0, shardStats.getCheckpointsBehindCount());
+        assertEquals(0, shardStats.getBytesBehindCount());
+    }
+
     /**
      * Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because
      * it asserts point in time seqNos are relative to the doc counts.
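
Two short sketches follow to make the mechanics of the change above concrete. They are illustrative stand-ins written for this note, not code from the patch.

First, the bytes-behind computation. The fix replaces a length subtraction with a file-level diff: bytes behind is the total size of files the primary's checkpoint references that the replica's visible checkpoint does not yet have. The self-contained sketch below uses a hypothetical FileMeta record in place of StoreFileMetadata and a plain filter in place of Store.segmentReplicationDiff:

    import java.util.Map;

    public class BytesBehindSketch {

        /** Hypothetical stand-in for StoreFileMetadata. */
        record FileMeta(String name, long length, String checksum) {}

        // Sum the lengths of primary files the replica is missing (or holds with a
        // different checksum) -- an approximation of the diff.missing set in the patch.
        static long bytesBehind(Map<String, FileMeta> primary, Map<String, FileMeta> replica) {
            return primary.values()
                .stream()
                .filter(file -> {
                    FileMeta local = replica.get(file.name());
                    return local == null || local.checksum().equals(file.checksum()) == false;
                })
                .mapToLong(FileMeta::length)
                .sum();
        }

        public static void main(String[] args) {
            // The primary force-merged _0 (100 bytes) and _1 (200 bytes) into _2 (120 bytes).
            Map<String, FileMeta> primary = Map.of("_2.cfs", new FileMeta("_2.cfs", 120, "c3"));
            Map<String, FileMeta> replica = Map.of(
                "_0.cfs", new FileMeta("_0.cfs", 100, "c1"),
                "_1.cfs", new FileMeta("_1.cfs", 200, "c2")
            );

            long oldMetric = Math.max(120 - 300, 0);        // 0 -- the bug: the replica looks caught up
            long newMetric = bytesBehind(primary, replica); // 120 -- the replica must still fetch all of _2

            System.out.println("old=" + oldMetric + " new=" + newMetric);
        }
    }

The same arithmetic explains the updated test expectations in ReplicationTrackerTests: with checkpoints carrying segments of 1, 50, and 100 bytes, a replica with no visible checkpoint is 151 bytes behind (1 + 50 + 100), and once segment_1 becomes visible the diff drops to 150.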
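Second, the wire-format change. ReplicationCheckpoint only writes and reads the new metadata map when the negotiated stream version is at least 2.10, so mixed-version clusters keep working and older checkpoints fall back to an empty map. The following is a rough illustration of that version-gating pattern only, with plain java.io streams standing in for OpenSearch's StreamInput/StreamOutput and an arbitrary int standing in for Version.V_2_10_0:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class VersionGatedMapSketch {

        static final int V_2_10_0 = 21000; // hypothetical numeric version id

        // Writers on >= 2.10 append the map; older readers never see those bytes.
        static void writeMetadataMap(DataOutputStream out, int streamVersion, Map<String, Long> metadataMap)
            throws IOException {
            if (streamVersion >= V_2_10_0) {
                out.writeInt(metadataMap.size());
                for (Map.Entry<String, Long> entry : metadataMap.entrySet()) {
                    out.writeUTF(entry.getKey());
                    out.writeLong(entry.getValue());
                }
            }
        }

        // Readers below 2.10 fall back to an empty map, as the patch does.
        static Map<String, Long> readMetadataMap(DataInputStream in, int streamVersion) throws IOException {
            if (streamVersion < V_2_10_0) {
                return Collections.emptyMap();
            }
            int size = in.readInt();
            Map<String, Long> map = new HashMap<>();
            for (int i = 0; i < size; i++) {
                map.put(in.readUTF(), in.readLong());
            }
            return map;
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeMetadataMap(new DataOutputStream(bytes), V_2_10_0, Map.of("_0.cfs", 100L));
            Map<String, Long> roundTripped = readMetadataMap(
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())),
                V_2_10_0
            );
            System.out.println(roundTripped); // {_0.cfs=100}
        }
    }

Because both sides gate on the same version check, a 2.10 writer talking to a pre-2.10 reader omits the map entirely rather than corrupting the stream, which is why the real change also had to be backported to 2.x before the bwc test could be re-enabled.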