Skip to content

Commit

Permalink
Add getHistoryOperationsFromTranslog method to fetch the history snap…
Browse files Browse the repository at this point in the history
…shot from translogs (opensearch-project#3948)

* Add getHistoryOperationsFromTranslog method to fetch the hostory snapshot from translogs

Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala authored Sep 13, 2022
1 parent 54364a5 commit 5dc3d2e
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156))
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))

### Deprecated

Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2357,6 +2357,17 @@ public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo,
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true, accurateCount);
}

/**
* Creates a new history snapshot from the translog instead of the lucene index. Required for cross cluster replication.
* Use the recommended {@link #getHistoryOperations(String, long, long, boolean)} method for other cases.
* This method should only be invoked if Segment Replication or Remote Store is not enabled.
*/
public Translog.Snapshot getHistoryOperationsFromTranslog(long startingSeqNo, long endSeqNo) throws IOException {
assert (indexSettings.isSegRepEnabled() || indexSettings.isRemoteStoreEnabled()) == false
: "unsupported operation for segment replication enabled indices or remote store backed indices";
return getEngine().translogManager().newChangesSnapshot(startingSeqNo, endSeqNo, true);
}

/**
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public void rollTranslogGeneration() throws TranslogException {
}
}

@Override
public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
return translog.newSnapshot(fromSeqNo, toSeqNo, requiredFullRange);
}

/**
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
* This operation will close the engine if the recovery fails.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,9 @@ public Translog.Operation readOperation(Translog.Location location) throws IOExc
public Translog.Location add(Translog.Operation operation) throws IOException {
return new Translog.Location(0, 0, 0);
}

@Override
public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public interface TranslogManager {
*/
int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) throws IOException;

/**
* Creates a new history snapshot from the translog file instead of the lucene index.
*/
Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException;

/**
* Checks if the underlying storage sync is required.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, lo
public void skipTranslogRecovery() {
// Do nothing.
}

@Override
public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,66 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception
closeShards(replica);
}

public void testGetChangesSnapshotThrowsAssertForSegRep() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(8),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT.toString())
.build();
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
final AtomicBoolean synced = new AtomicBoolean();
final IndexShard primaryShard = newShard(
shardRouting,
indexMetadata.build(),
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
);
expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1));
closeShard(primaryShard, false);
}

public void testGetChangesSnapshotThrowsAssertForRemoteStore() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(8),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.build();
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1);
final AtomicBoolean synced = new AtomicBoolean();
final IndexShard primaryShard = newShard(
shardRouting,
indexMetadata.build(),
null,
new InternalEngineFactory(),
() -> synced.set(true),
RetentionLeaseSyncer.EMPTY,
null
);
expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1));
closeShard(primaryShard, false);
}

public void testGlobalCheckpointSync() throws IOException {
// create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked
final ShardId shardId = new ShardId("index", "_na_", 0);
Expand Down

0 comments on commit 5dc3d2e

Please sign in to comment.