From 8fedee6fae2cd915b24772868445d52b28a821c3 Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Thu, 8 Sep 2022 12:54:11 +0530 Subject: [PATCH] Addressing PR comments Signed-off-by: Ankit Kala --- CHANGELOG.md | 3 +- .../opensearch/index/shard/IndexShard.java | 6 +- .../index/shard/IndexShardTests.java | 60 +++++++++++++++++++ 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec8441831448f..9d0381fdfb27f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Plugin ZIP publication groupId value is configurable ([#4156](https://github.com/opensearch-project/OpenSearch/pull/4156)) - Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253)) - [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402)) +- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) ### Deprecated @@ -78,4 +79,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) [Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...HEAD -[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x \ No newline at end of file +[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 2277930042a1c..089afa00902a6 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2360,10 +2360,12 @@ public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, /** * Creates a new history snapshot from the translog instead of the lucene index. Required for cross cluster replication. * Use the recommended {@link #getHistoryOperations(String, long, long, boolean)} method for other cases. - * Depending on how translog durability is configured, this method might/might not return the snapshot. For e.g, - * If the translog has been configured with no-durability, it'll return UnsupportedOperationException. + * This method should only be invoked if Segment Replication or Remote Store is not enabled. */ public Translog.Snapshot getHistoryOperationsFromTranslog(long startingSeqNo, long endSeqNo) throws IOException { + if (indexSettings.isSegRepEnabled() || indexSettings.isRemoteStoreEnabled()) { + throw new AssertionError("unsupported operation for segment replication enabled indices or remote store backed indices"); + } return getEngine().translogManager().newChangesSnapshot(startingSeqNo, endSeqNo, true); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 662afa80f65fc..27c0437236f63 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -1196,6 +1196,66 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception closeShards(replica); } + public void testGetChangesSnapshotThrowsAssertForSegRep() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(8), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT.toString()) + .build(); + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); + final AtomicBoolean synced = new AtomicBoolean(); + final IndexShard primaryShard = newShard( + shardRouting, + indexMetadata.build(), + null, + new InternalEngineFactory(), + () -> synced.set(true), + RetentionLeaseSyncer.EMPTY, + null + ); + expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1)); + closeShard(primaryShard, false); + } + + public void testGetChangesSnapshotThrowsAssertForRemoteStore() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(8), + true, + ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .build(); + final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(shardRouting.getIndexName()).settings(settings).primaryTerm(0, 1); + final AtomicBoolean synced = new AtomicBoolean(); + final IndexShard primaryShard = newShard( + shardRouting, + indexMetadata.build(), + null, + new InternalEngineFactory(), + () -> synced.set(true), + RetentionLeaseSyncer.EMPTY, + null + ); + expectThrows(AssertionError.class, () -> primaryShard.getHistoryOperationsFromTranslog(0, 1)); + closeShard(primaryShard, false); + } + public void testGlobalCheckpointSync() throws IOException { // create the primary shard with a callback that sets a boolean when the global checkpoint sync is invoked final ShardId shardId = new ShardId("index", "_na_", 0);