Skip to content

Commit

Permalink
[Backport 2.x] [Segment Replication] Add a new Engine implementation …
Browse files Browse the repository at this point in the history
…for replicas with segment replication enabled. (#4003)

* Backporting to 2.x by resolving merge conflicts and adding implementation for newChangesSnapshotFromTranslogFile() in NRTReplicationEngine

Signed-off-by: Rishikesh1159 <[email protected]>

Co-authored-by: Marc Handalian <[email protected]>
  • Loading branch information
Rishikesh1159 and mch2 authored Jul 28, 2022
1 parent 1d68b52 commit 053cf55
Show file tree
Hide file tree
Showing 16 changed files with 1,050 additions and 28 deletions.
20 changes: 20 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,33 @@ public final EngineConfig config() {

protected abstract SegmentInfos getLastCommittedSegmentInfos();

/**
* Return the latest active SegmentInfos from the engine.
* @return {@link SegmentInfos}
*/
protected SegmentInfos getLatestSegmentInfos() {
// Default Implementation.
return null;
};

public MergeStats getMergeStats() {
return new MergeStats();
}

/** returns the history uuid for the engine */
public abstract String getHistoryUUID();

/**
* Reads the current stored history ID from commit data.
*/
String loadHistoryUUID(Map<String, String> commitData) {
final String uuid = commitData.get(HISTORY_UUID_KEY);
if (uuid == null) {
throw new IllegalStateException("commit doesn't contain history uuid");
}
return uuid;
}

/** Returns how many bytes we are currently moving from heap to disk */
public abstract long getWritingBytes();

Expand Down
72 changes: 72 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final boolean isReadOnlyReplica;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -228,6 +229,66 @@ public EngineConfig(
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
this(
shardId,
threadPool,
indexSettings,
warmer,
store,
mergePolicy,
analyzer,
similarity,
codecService,
eventListener,
queryCache,
queryCachingPolicy,
translogConfig,
translogDeletionPolicyFactory,
flushMergesAfter,
externalRefreshListener,
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
false
);
}

/**
* Creates a new {@link org.opensearch.index.engine.EngineConfig}
*/
EngineConfig(
ShardId shardId,
ThreadPool threadPool,
IndexSettings indexSettings,
Engine.Warmer warmer,
Store store,
MergePolicy mergePolicy,
Analyzer analyzer,
Similarity similarity,
CodecService codecService,
Engine.EventListener eventListener,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig,
TranslogDeletionPolicyFactory translogDeletionPolicyFactory,
TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener,
Sort indexSort,
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
) {
if (isReadOnlyReplica && indexSettings.isSegRepEnabled() == false) {
throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled");
}
this.shardId = shardId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -266,6 +327,7 @@ public EngineConfig(
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
this.isReadOnlyReplica = isReadOnlyReplica;
}

/**
Expand Down Expand Up @@ -460,6 +522,16 @@ public LongSupplier getPrimaryTermSupplier() {
return primaryTermSupplier;
}

/**
* Returns if this replica should be wired as a read only.
* This is used for Segment Replication where the engine implementation used is dependent on
* if the shard is a primary/replica.
* @return true if this engine should be wired as read only.
*/
public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
}

/**
* A supplier supplies tombstone documents which will be used in soft-update methods.
* The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ public EngineConfig newEngineConfig(
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
Expand Down Expand Up @@ -176,7 +177,8 @@ public EngineConfig newEngineConfig(
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier
tombstoneDocSupplier,
isReadOnlyReplica
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
Expand Down Expand Up @@ -648,17 +649,6 @@ public long getWritingBytes() {
return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
}

/**
* Reads the current stored history ID from the IW commit data.
*/
private String loadHistoryUUID(Map<String, String> commitData) {
final String uuid = commitData.get(HISTORY_UUID_KEY);
if (uuid == null) {
throw new IllegalStateException("commit doesn't contain history uuid");
}
return uuid;
}

private ExternalReaderManager createReaderManager(RefreshWarmerListener externalRefreshListener) throws EngineException {
boolean success = false;
OpenSearchReaderManager internalReaderManager = null;
Expand Down Expand Up @@ -2298,6 +2288,23 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
}

@Override
public SegmentInfos getLatestSegmentInfos() {
OpenSearchDirectoryReader reader = null;
try {
reader = internalReaderManager.acquire();
return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos();
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
} finally {
try {
internalReaderManager.release(reader);
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
}
}

@Override
protected final void writerSegmentStats(SegmentsStats stats) {
stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
Expand Down
Loading

0 comments on commit 053cf55

Please sign in to comment.