From 09d61c7bf2d815451ad60086b6943a07f1aa2a00 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 10 Aug 2023 12:19:05 -0700 Subject: [PATCH] Self review Signed-off-by: Suraj Singh --- .../index/shard/RemoteIndexShardTests.java | 37 +++---- .../SegmentReplicationIndexShardTests.java | 2 - ...licationWithNodeToNodeIndexShardTests.java | 104 +++++++++--------- .../SegmentReplicationTargetServiceTests.java | 12 +- .../TestRemoteStoreReplicationSource.java | 30 +---- 5 files changed, 75 insertions(+), 110 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index 3340f72dee5c2..c874a34d5c237 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -1,20 +1,19 @@ /* -* SPDX-License-Identifier: Apache-2.0 -* -* The OpenSearch Contributors require contributions made to -* this file be licensed under the Apache-2.0 license or a -* compatible open source license. -*/ + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ package org.opensearch.index.shard; import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Version; import org.junit.Assert; import org.junit.Before; -import org.opensearch.action.ActionListener; +import org.opensearch.core.action.ActionListener; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; @@ -24,7 +23,6 @@ import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.replication.TestRemoteStoreReplicationSource; -import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; @@ -37,15 +35,12 @@ import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber; public class RemoteIndexShardTests extends SegmentReplicationIndexShardTests { @@ -98,9 +93,7 @@ public void testCloseShardWhileGettingCheckpoint() throws Exception { shards.startAll(); IndexShard primary = shards.getPrimary(); final IndexShard replica = shards.getReplicas().get(0); - primary.refresh("Test"); - final SegmentReplicationTargetService targetService = newTargetService(); final CancellableThreads cancellableThreads = new CancellableThreads(); @@ -157,10 +150,8 @@ public void getCheckpointMetadata( ReplicationCheckpoint checkpoint, ActionListener listener ) { - logger.info("--> getCheckpointMetadata"); try { - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = this.getRemoteDirectory(); - RemoteSegmentMetadata mdFile = remoteSegmentStoreDirectory.init(); + RemoteSegmentMetadata mdFile = this.getRemoteDirectory().init(); final Version version = replica.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); Map metadataMap = mdFile.getMetadata() .entrySet() @@ -177,7 +168,9 @@ public void getCheckpointMetadata( ) ) ); - listener.onResponse(new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes())); + listener.onResponse( + new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes()) + ); } catch (IOException e) { throw new RuntimeException(e); } @@ -192,15 +185,10 @@ public void getSegmentFiles( ActionListener listener ) { try { - - logger.info("--> getSegmentFiles {}", filesToFetch); RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = this.getRemoteDirectory(); - RemoteSegmentMetadata mdFile = remoteSegmentStoreDirectory.init(); - Collection directoryFiles = List.of(indexShard.store().directory().listAll()); final Directory storeDirectory = indexShard.store().directory(); for (StoreFileMetadata fileMetadata : filesToFetch) { String file = fileMetadata.name(); - assert directoryFiles.contains(file) == false : "Local store already contains the file " + file; storeDirectory.copyFrom(remoteSegmentStoreDirectory, file, file, IOContext.DEFAULT); break; // download single file } @@ -210,7 +198,8 @@ public void getSegmentFiles( cancellableThreads.checkForCancel(); } catch (IOException e) { throw new RuntimeException(e); - } } + } + } }; startReplicationAndAssertCancellation(replica, primary, targetService, source, cancellableThreads); shards.removeReplica(replica); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 2cced4257f13a..7aa2c15e1168e 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -810,9 +810,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile }, cancellableThreads ); - targetService.startReplication(target); - latch.await(5, TimeUnit.SECONDS); assertEquals("Should have resolved listener with failure", 0, latch.getCount()); assertNull(targetService.get(target.getId())); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java index f8e215eb88edf..6526b51437586 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java @@ -144,6 +144,58 @@ public void getSegmentFiles( } } + public void testCloseShardWhileGettingCheckpoint() throws Exception { + try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + primary.refresh("Test"); + + final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); + final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); + SegmentReplicationSource source = new TestReplicationSource() { + + ActionListener listener; + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + // set the listener, we will only fail it once we cancel the source. + this.listener = listener; + // shard is closing while we are copying files. + targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + Assert.fail("Unreachable"); + } + + @Override + public void cancel() { + // simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed . + final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"); + listener.onFailure(exception); + } + }; + when(sourceFactory.get(any(), any())).thenReturn(source); + startReplicationAndAssertCancellation(replica, primary, targetService); + + shards.removeReplica(replica); + closeShards(replica); + } + } + public void testPrimaryCancelsExecution() throws Exception { try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { shards.startAll(); @@ -645,58 +697,6 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception { } } - public void testCloseShardWhileGettingCheckpoint() throws Exception { - try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { - shards.startAll(); - IndexShard primary = shards.getPrimary(); - final IndexShard replica = shards.getReplicas().get(0); - - primary.refresh("Test"); - - final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class); - final SegmentReplicationTargetService targetService = newTargetService(sourceFactory); - SegmentReplicationSource source = new TestReplicationSource() { - - ActionListener listener; - - @Override - public void getCheckpointMetadata( - long replicationId, - ReplicationCheckpoint checkpoint, - ActionListener listener - ) { - // set the listener, we will only fail it once we cancel the source. - this.listener = listener; - // shard is closing while we are copying files. - targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY); - } - - @Override - public void getSegmentFiles( - long replicationId, - ReplicationCheckpoint checkpoint, - List filesToFetch, - IndexShard indexShard, - ActionListener listener - ) { - Assert.fail("Unreachable"); - } - - @Override - public void cancel() { - // simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed . - final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"); - listener.onFailure(exception); - } - }; - when(sourceFactory.get(any(), any())).thenReturn(source); - startReplicationAndAssertCancellation(replica, primary, targetService); - - shards.removeReplica(replica); - closeShards(replica); - } - } - public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception { try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { shards.startAll(); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 1383e79399512..e7e10470931dd 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -86,8 +86,6 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { private SegmentReplicationState state; private ReplicationCheckpoint initialCheckpoint; - private CancellableThreads cancellableThreads; - private static final long TRANSPORT_TIMEOUT = 30000;// 30sec @Override @@ -156,8 +154,6 @@ public void setUp() throws Exception { "", new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT) ); - - cancellableThreads = new CancellableThreads(); } @Override @@ -234,7 +230,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile latch.countDown(); } }, - cancellableThreads + new CancellableThreads() ); sut.startReplication(target); latch.await(2, TimeUnit.SECONDS); @@ -287,7 +283,7 @@ public void getSegmentFiles( primaryShard.getLatestReplicationCheckpoint(), source, mock(SegmentReplicationTargetService.SegmentReplicationListener.class), - cancellableThreads + new CancellableThreads() ) ); @@ -361,7 +357,7 @@ public void cancel() { updatedCheckpoint, source, mock(SegmentReplicationTargetService.SegmentReplicationListener.class), - cancellableThreads + new CancellableThreads() ) ); @@ -609,7 +605,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile assertTrue(e.getCause() instanceof CancellableThreads.ExecutionCancelledException); } }, - cancellableThreads + new CancellableThreads() ); target.cancel("test"); sut.startReplication(target); diff --git a/test/framework/src/main/java/org/opensearch/index/replication/TestRemoteStoreReplicationSource.java b/test/framework/src/main/java/org/opensearch/index/replication/TestRemoteStoreReplicationSource.java index 31fe3a83d4d11..e30f986ce6896 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/TestRemoteStoreReplicationSource.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/TestRemoteStoreReplicationSource.java @@ -8,48 +8,34 @@ package org.opensearch.index.replication; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.store.FilterDirectory; -import org.opensearch.action.ActionListener; import org.opensearch.common.util.CancellableThreads; +import org.opensearch.core.action.ActionListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.CheckpointInfoResponse; import org.opensearch.indices.replication.GetSegmentFilesResponse; -import org.opensearch.indices.replication.RemoteStoreReplicationSource; import org.opensearch.indices.replication.SegmentReplicationSource; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import java.util.List; /** - * Defines test SegmentReplicationSource for remote store to be used in unit tests + * Defines SegmentReplicationSource for remote store to be used in unit tests */ public abstract class TestRemoteStoreReplicationSource implements SegmentReplicationSource { - private static final Logger logger = LogManager.getLogger(TestRemoteStoreReplicationSource.class); + private final CancellableThreads cancellableThreads; + private final IndexShard targetShard; - private CancellableThreads cancellableThreads; - private IndexShard targetShard; - - public CancellableThreads getCancellableThreads() { - return cancellableThreads; - } - - public IndexShard getTargetShard() { - return targetShard; - } + private final RemoteSegmentStoreDirectory remoteDirectory; public RemoteSegmentStoreDirectory getRemoteDirectory() { return remoteDirectory; } - private RemoteSegmentStoreDirectory remoteDirectory; - public TestRemoteStoreReplicationSource(CancellableThreads cancellableThreads, IndexShard targetShard) { - logger.info("--> TestReplicationSource {}", cancellableThreads); this.targetShard = targetShard; FilterDirectory remoteStoreDirectory = (FilterDirectory) targetShard.remoteStore().directory(); FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); @@ -57,10 +43,6 @@ public TestRemoteStoreReplicationSource(CancellableThreads cancellableThreads, I this.cancellableThreads = cancellableThreads; } - public TestRemoteStoreReplicationSource() { - logger.info("--> TestReplicationSource default"); - } - @Override public abstract void getCheckpointMetadata( long replicationId, @@ -79,6 +61,6 @@ public abstract void getSegmentFiles( @Override public String getDescription() { - return "TestReplicationSource"; + return "TestRemoteStoreReplicationSource"; } }