Fix Segment Replication stats bytes behind metric.
This metric currently estimates the bytes behind as the difference in total size of the segments
referenced by the active readers on the primary and replica shards. That difference is a poor indicator
of how many bytes actually need to be fetched and becomes inaccurate after deletes and merges. Fixed by sending file metadata with each checkpoint
and computing a diff between checkpoints when SegmentReplicationShardStats is built.
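The idea in miniature: instead of subtracting total checkpoint sizes, sum the lengths of the files the replica has not yet received. The sketch below is illustrative only: plain maps stand in for the checkpoint's StoreFileMetadata map, and the name-only comparison is a simplification of Store.segmentReplicationDiff, which also compares checksums.

import java.util.Map;

final class BytesBehindSketch {
    // Maps segment file name -> length in bytes.
    static long bytesBehind(Map<String, Long> primaryFiles, Map<String, Long> replicaFiles) {
        // Only files the replica is missing contribute to the lag; files the
        // primary dropped via deletes/merges no longer distort the metric.
        return primaryFiles.entrySet()
            .stream()
            .filter(e -> replicaFiles.containsKey(e.getKey()) == false)
            .mapToLong(Map.Entry::getValue)
            .sum();
    }

    public static void main(String[] args) {
        // Primary force-merged into _2.cfs; the replica still has _0 and _1.
        Map<String, Long> primary = Map.of("_2.cfs", 120L);
        Map<String, Long> replica = Map.of("_0.cfs", 100L, "_1.cfs", 40L);
        // Old metric: max(120 - 140, 0) = 0, i.e. "caught up" despite a pending fetch.
        // New metric: the replica is missing _2.cfs, so it is 120 bytes behind.
        System.out.println(bytesBehind(primary, replica)); // prints 120
    }
}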

Signed-off-by: Marc Handalian <[email protected]>
mch2 committed Sep 1, 2023
1 parent 04c90c7 commit b3e779a
Showing 6 changed files with 152 additions and 32 deletions.
server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -57,6 +57,8 @@
 import org.opensearch.index.shard.AbstractIndexShardComponent;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.ReplicationGroup;
+import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;
@@ -1290,27 +1292,25 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats()
                     && entry.getValue().inSync
                     && replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
             )
-            .map(entry -> buildShardStats(latestReplicationCheckpoint.getLength(), entry.getKey(), entry.getValue()))
+            .map(entry -> buildShardStats(entry.getKey(), entry.getValue()))
             .collect(Collectors.toUnmodifiableSet());
         }
         return Collections.emptySet();
     }

-    private SegmentReplicationShardStats buildShardStats(
-        final long latestCheckpointLength,
-        final String allocationId,
-        final CheckpointState checkpointState
-    ) {
-        final Map<ReplicationCheckpoint, SegmentReplicationLagTimer> checkpointTimers = checkpointState.checkpointTimers;
+    private SegmentReplicationShardStats buildShardStats(final String allocationId, final CheckpointState cps) {
+        final Store.RecoveryDiff diff = Store.segmentReplicationDiff(
+            latestReplicationCheckpoint.getMetadataMap(),
+            cps.visibleReplicationCheckpoint != null ? cps.visibleReplicationCheckpoint.getMetadataMap() : Collections.emptyMap()
+        );
+        final long bytesBehind = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
         return new SegmentReplicationShardStats(
             allocationId,
-            checkpointTimers.size(),
-            checkpointState.visibleReplicationCheckpoint == null
-                ? latestCheckpointLength
-                : Math.max(latestCheckpointLength - checkpointState.visibleReplicationCheckpoint.getLength(), 0),
-            checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
-            checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
-            checkpointState.lastCompletedReplicationLag
+            cps.checkpointTimers.size(),
+            bytesBehind,
+            cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
+            cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
+            cps.lastCompletedReplicationLag
         );
     }
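With the file metadata travelling on every checkpoint, the primary can compute this diff locally for each tracked replica (see the sketch after the commit message above): a replica whose visible checkpoint matches the latest one produces an empty diff and reports zero bytes behind, while a replica with no visible checkpoint yet is charged the full size of the latest checkpoint's files.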
server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1609,15 +1609,17 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
             snapshot = getSegmentInfosSnapshot();
             if (snapshot.get() != null) {
                 SegmentInfos segmentInfos = snapshot.get();
+                final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
                 return new Tuple<>(
                     snapshot,
                     new ReplicationCheckpoint(
                         this.shardId,
                         getOperationPrimaryTerm(),
                         segmentInfos.getGeneration(),
                         segmentInfos.getVersion(),
-                        store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum(),
-                        getEngine().config().getCodec().getName()
+                        metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
+                        getEngine().config().getCodec().getName(),
+                        metadataMap
                     )
                 );
             }
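Note that the checkpoint's length is now computed from the same metadataMap that is attached to the checkpoint, so the advertised total size and the per-file metadata are derived from one snapshot of the store and cannot drift apart.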
server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java
@@ -10,13 +10,16 @@

 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.Version;
 import org.opensearch.core.index.Index;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

 import java.io.IOException;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;

 /**
@@ -110,11 +113,13 @@ public void write(IndexOutput out) throws IOException {

     public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException {
         Map<String, String> metadata = indexInput.readMapOfStrings();
-        ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput);
+        final Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap = RemoteSegmentMetadata
+            .fromMapOfStrings(metadata);
+        ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap);
         int byteArraySize = (int) indexInput.readLong();
         byte[] segmentInfosBytes = new byte[byteArraySize];
         indexInput.readBytes(segmentInfosBytes, 0, byteArraySize);
-        return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, replicationCheckpoint);
+        return new RemoteSegmentMetadata(uploadedSegmentMetadataMap, segmentInfosBytes, replicationCheckpoint);
     }

     public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicationCheckpoint, IndexOutput out) throws IOException {
@@ -131,14 +136,30 @@ public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicationCheckpoint, IndexOutput out) throws IOException {
         out.writeString(replicationCheckpoint.getCodec());
     }

-    private static ReplicationCheckpoint readCheckpointFromIndexInput(IndexInput in) throws IOException {
+    private static ReplicationCheckpoint readCheckpointFromIndexInput(
+        IndexInput in,
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap
+    ) throws IOException {
         return new ReplicationCheckpoint(
             new ShardId(new Index(in.readString(), in.readString()), in.readVInt()),
             in.readLong(),
             in.readLong(),
             in.readLong(),
             in.readLong(),
-            in.readString()
+            in.readString(),
+            toStoreFileMetadata(uploadedSegmentMetadataMap)
         );
     }
+
+    private static Map<String, StoreFileMetadata> toStoreFileMetadata(
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> metadata
+    ) {
+        return metadata.entrySet()
+            .stream()
+            // TODO: Version here should be read from UploadedSegmentMetadata.
+            .map(
+                entry -> new StoreFileMetadata(entry.getKey(), entry.getValue().getLength(), entry.getValue().getChecksum(), Version.LATEST)
+            )
+            .collect(Collectors.toMap(StoreFileMetadata::name, Function.identity()));
+    }
 }
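The toStoreFileMetadata conversion lets checkpoints rehydrated from the remote store carry the same per-file metadata as locally built ones: names, lengths, and checksums come from the uploaded segment metadata, while the Lucene version is, per the inline TODO, a Version.LATEST placeholder for now.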
server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
@@ -15,8 +15,11 @@
 import org.opensearch.core.common.io.stream.Writeable;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.seqno.SequenceNumbers;
+import org.opensearch.index.store.StoreFileMetadata;

 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;

 /**
@@ -32,6 +35,7 @@ public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationCheckpoint> {
     private final long segmentInfosVersion;
     private final long length;
     private final String codec;
+    private final Map<String, StoreFileMetadata> metadataMap;

     public static ReplicationCheckpoint empty(ShardId shardId) {
         return empty(shardId, "");
@@ -48,19 +52,29 @@ private ReplicationCheckpoint(ShardId shardId, String codec) {
         segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED;
         length = 0L;
         this.codec = codec;
+        this.metadataMap = Collections.emptyMap();
     }

     public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) {
-        this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec);
-    }
-
-    public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, long length, String codec) {
+        this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap());
+    }
+
+    public ReplicationCheckpoint(
+        ShardId shardId,
+        long primaryTerm,
+        long segmentsGen,
+        long segmentInfosVersion,
+        long length,
+        String codec,
+        Map<String, StoreFileMetadata> metadataMap
+    ) {
         this.shardId = shardId;
         this.primaryTerm = primaryTerm;
         this.segmentsGen = segmentsGen;
         this.segmentInfosVersion = segmentInfosVersion;
         this.length = length;
         this.codec = codec;
+        this.metadataMap = metadataMap;
     }

     public ReplicationCheckpoint(StreamInput in) throws IOException {
@@ -75,6 +89,11 @@ public ReplicationCheckpoint(StreamInput in) throws IOException {
             length = 0L;
             codec = null;
         }
+        if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
+            this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new);
+        } else {
+            this.metadataMap = Collections.emptyMap();
+        }
     }

     /**
@@ -135,6 +154,9 @@ public void writeTo(StreamOutput out) throws IOException {
             out.writeLong(length);
             out.writeString(codec);
         }
+        if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
+            out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
+        }
     }

     @Override
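Both the read and write paths gate the new field on Version.V_2_10_0, a standard wire-compatibility pattern: an older peer neither sends nor expects the map, and the reader falls back to an empty map. Below is a self-contained sketch of the same pattern; the class name, the numeric version constant, and the plain java.io streams are hypothetical stand-ins for the real StreamInput/StreamOutput API.

import java.io.*;
import java.util.LinkedHashMap;
import java.util.Map;

final class VersionGatedWireDemo {
    static final int V_2_10_0 = 21000; // hypothetical numeric wire version

    static void writeMetadata(DataOutputStream out, int peerVersion, Map<String, Long> metadata) throws IOException {
        if (peerVersion >= V_2_10_0) { // older peers do not expect the field
            out.writeInt(metadata.size());
            for (Map.Entry<String, Long> e : metadata.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        }
    }

    static Map<String, Long> readMetadata(DataInputStream in, int peerVersion) throws IOException {
        Map<String, Long> metadata = new LinkedHashMap<>();
        if (peerVersion >= V_2_10_0) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                metadata.put(in.readUTF(), in.readLong());
            }
        }
        // A checkpoint from a pre-2.10 node decodes to an empty metadata map.
        return metadata;
    }
}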
@@ -169,6 +191,10 @@ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) {
             || (primaryTerm == other.getPrimaryTerm() && segmentInfosVersion > other.getSegmentInfosVersion());
     }

+    public Map<String, StoreFileMetadata> getMetadataMap() {
+        return metadataMap;
+    }
+
     @Override
     public String toString() {
         return "ReplicationCheckpoint{"
server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
@@ -33,6 +33,7 @@
 package org.opensearch.index.seqno;

 import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.util.Version;
 import org.opensearch.action.support.replication.ReplicationResponse;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.routing.AllocationId;
@@ -50,6 +51,7 @@
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.IndexSettings;
 import org.opensearch.index.SegmentReplicationShardStats;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;
@@ -1826,29 +1828,35 @@ public void testSegmentReplicationCheckpointTracking() {

         initializingIds.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));

+        final StoreFileMetadata segment_1 = new StoreFileMetadata("segment_1", 1L, "abcd", Version.LATEST);
+        final StoreFileMetadata segment_2 = new StoreFileMetadata("segment_2", 50L, "abcd", Version.LATEST);
+        final StoreFileMetadata segment_3 = new StoreFileMetadata("segment_3", 100L, "abcd", Version.LATEST);
         final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(
             tracker.shardId(),
             0L,
             1,
             1,
             1L,
-            Codec.getDefault().getName()
+            Codec.getDefault().getName(),
+            Map.of("segment_1", segment_1)
         );
         final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(
             tracker.shardId(),
             0L,
             2,
             2,
-            50L,
-            Codec.getDefault().getName()
+            51L,
+            Codec.getDefault().getName(),
+            Map.of("segment_1", segment_1, "segment_2", segment_2)
         );
         final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(
             tracker.shardId(),
             0L,
             2,
             3,
-            100L,
-            Codec.getDefault().getName()
+            151L,
+            Codec.getDefault().getName(),
+            Map.of("segment_1", segment_1, "segment_2", segment_2, "segment_3", segment_3)
         );

         tracker.setLatestReplicationCheckpoint(initialCheckpoint);
@@ -1864,7 +1872,7 @@
         assertEquals(expectedIds.size(), groupStats.size());
         for (SegmentReplicationShardStats shardStat : groupStats) {
             assertEquals(3, shardStat.getCheckpointsBehindCount());
-            assertEquals(100L, shardStat.getBytesBehindCount());
+            assertEquals(151L, shardStat.getBytesBehindCount());
             assertTrue(shardStat.getCurrentReplicationLagMillis() >= shardStat.getCurrentReplicationTimeMillis());
         }
@@ -1881,7 +1889,7 @@
         assertEquals(expectedIds.size(), groupStats.size());
         for (SegmentReplicationShardStats shardStat : groupStats) {
             assertEquals(2, shardStat.getCheckpointsBehindCount());
-            assertEquals(99L, shardStat.getBytesBehindCount());
+            assertEquals(150L, shardStat.getBytesBehindCount());
         }

         for (String id : expectedIds) {
@@ -1938,7 +1946,8 @@ public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() {
             1,
             1,
             1L,
-            Codec.getDefault().getName()
+            Codec.getDefault().getName(),
+            Collections.emptyMap()
         );
         tracker.setLatestReplicationCheckpoint(initialCheckpoint);
         tracker.startReplicationLagTimers(initialCheckpoint);
server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
@@ -30,6 +30,8 @@
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.xcontent.MediaTypeRegistry;
 import org.opensearch.index.IndexSettings;
+import org.opensearch.index.SegmentReplicationShardStats;
+import org.opensearch.index.engine.DocIdSeqNoAndSource;
 import org.opensearch.index.engine.Engine;
 import org.opensearch.index.engine.InternalEngineFactory;
 import org.opensearch.index.engine.NRTReplicationEngine;
Expand Down Expand Up @@ -69,6 +71,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -805,6 +808,65 @@ public void testQueryDuringEngineResetShowsDocs() throws Exception {
         }
     }

+    public void testSegmentReplicationStats() throws Exception {
+        final NRTReplicationEngineFactory engineFactory = new NRTReplicationEngineFactory();
+        final NRTReplicationEngineFactory spy = spy(engineFactory);
+        try (ReplicationGroup shards = createGroup(1, settings, indexMapping, spy, createTempDir())) {
+            final IndexShard primaryShard = shards.getPrimary();
+            final IndexShard replicaShard = shards.getReplicas().get(0);
+            shards.startAll();
+
+            assertReplicaCaughtUp(primaryShard);
+
+            shards.indexDocs(10);
+            shards.refresh("test");
+
+            final ReplicationCheckpoint primaryCheckpoint = primaryShard.getLatestReplicationCheckpoint();
+            final long initialCheckpointSize = primaryCheckpoint.getMetadataMap()
+                .values()
+                .stream()
+                .mapToLong(StoreFileMetadata::length)
+                .sum();
+
+            Set<SegmentReplicationShardStats> postRefreshStats = primaryShard.getReplicationStats();
+            SegmentReplicationShardStats shardStats = postRefreshStats.stream().findFirst().get();
+            assertEquals(1, shardStats.getCheckpointsBehindCount());
+            assertEquals(initialCheckpointSize, shardStats.getBytesBehindCount());
+            replicateSegments(primaryShard, shards.getReplicas());
+            assertReplicaCaughtUp(primaryShard);
+            shards.assertAllEqual(10);
+
+            final List<DocIdSeqNoAndSource> docIdAndSeqNos = getDocIdAndSeqNos(primaryShard);
+            for (DocIdSeqNoAndSource docIdAndSeqNo : docIdAndSeqNos.subList(0, 5)) {
+                deleteDoc(primaryShard, docIdAndSeqNo.getId());
+                // delete on replica for xlog.
+                deleteDoc(replicaShard, docIdAndSeqNo.getId());
+            }
+            primaryShard.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(true));
+
+            final Map<String, StoreFileMetadata> segmentMetadataMap = primaryShard.getSegmentMetadataMap();
+            final Store.RecoveryDiff diff = Store.segmentReplicationDiff(segmentMetadataMap, replicaShard.getSegmentMetadataMap());
+            final long sizeAfterDeleteAndCommit = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
+
+            final Set<SegmentReplicationShardStats> statsAfterFlush = primaryShard.getReplicationStats();
+            shardStats = statsAfterFlush.stream().findFirst().get();
+            assertEquals(sizeAfterDeleteAndCommit, shardStats.getBytesBehindCount());
+            assertEquals(1, shardStats.getCheckpointsBehindCount());
+
+            replicateSegments(primaryShard, shards.getReplicas());
+            assertReplicaCaughtUp(primaryShard);
+            shards.assertAllEqual(5);
+        }
+    }
+
+    private void assertReplicaCaughtUp(IndexShard primaryShard) {
+        Set<SegmentReplicationShardStats> initialStats = primaryShard.getReplicationStats();
+        assertEquals(initialStats.size(), 1);
+        SegmentReplicationShardStats shardStats = initialStats.stream().findFirst().get();
+        assertEquals(0, shardStats.getCheckpointsBehindCount());
+        assertEquals(0, shardStats.getBytesBehindCount());
+    }
+
     /**
      * Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because
      * it asserts point in time seqNos are relative to the doc counts.
