Skip to content

Commit

Permalink
Self review
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Aug 10, 2023
1 parent 39624e0 commit 09d61c7
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 110 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Version;
import org.junit.Assert;
import org.junit.Before;
import org.opensearch.action.ActionListener;
import org.opensearch.core.action.ActionListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
Expand All @@ -24,7 +23,6 @@
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.replication.TestRemoteStoreReplicationSource;
import org.opensearch.index.replication.TestReplicationSource;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand All @@ -37,15 +35,12 @@
import org.opensearch.indices.replication.common.ReplicationType;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.opensearch.index.engine.EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber;

public class RemoteIndexShardTests extends SegmentReplicationIndexShardTests {
Expand Down Expand Up @@ -98,9 +93,7 @@ public void testCloseShardWhileGettingCheckpoint() throws Exception {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

primary.refresh("Test");

final SegmentReplicationTargetService targetService = newTargetService();
final CancellableThreads cancellableThreads = new CancellableThreads();

Expand Down Expand Up @@ -157,10 +150,8 @@ public void getCheckpointMetadata(
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
logger.info("--> getCheckpointMetadata");
try {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = this.getRemoteDirectory();
RemoteSegmentMetadata mdFile = remoteSegmentStoreDirectory.init();
RemoteSegmentMetadata mdFile = this.getRemoteDirectory().init();
final Version version = replica.getSegmentInfosSnapshot().get().getCommitLuceneVersion();
Map<String, StoreFileMetadata> metadataMap = mdFile.getMetadata()
.entrySet()
Expand All @@ -177,7 +168,9 @@ public void getCheckpointMetadata(
)
)
);
listener.onResponse(new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes()));
listener.onResponse(
new CheckpointInfoResponse(mdFile.getReplicationCheckpoint(), metadataMap, mdFile.getSegmentInfosBytes())
);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -192,15 +185,10 @@ public void getSegmentFiles(
ActionListener<GetSegmentFilesResponse> listener
) {
try {

logger.info("--> getSegmentFiles {}", filesToFetch);
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = this.getRemoteDirectory();
RemoteSegmentMetadata mdFile = remoteSegmentStoreDirectory.init();
Collection<String> directoryFiles = List.of(indexShard.store().directory().listAll());
final Directory storeDirectory = indexShard.store().directory();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
storeDirectory.copyFrom(remoteSegmentStoreDirectory, file, file, IOContext.DEFAULT);
break; // download single file
}
Expand All @@ -210,7 +198,8 @@ public void getSegmentFiles(
cancellableThreads.checkForCancel();
} catch (IOException e) {
throw new RuntimeException(e);
} }
}
}
};
startReplicationAndAssertCancellation(replica, primary, targetService, source, cancellableThreads);
shards.removeReplica(replica);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,9 +810,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
},
cancellableThreads
);

targetService.startReplication(target);

latch.await(5, TimeUnit.SECONDS);
assertEquals("Should have resolved listener with failure", 0, latch.getCount());
assertNull(targetService.get(target.getId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,58 @@ public void getSegmentFiles(
}
}

public void testCloseShardWhileGettingCheckpoint() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

primary.refresh("Test");

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
SegmentReplicationSource source = new TestReplicationSource() {

ActionListener<CheckpointInfoResponse> listener;

@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
// set the listener, we will only fail it once we cancel the source.
this.listener = listener;
// shard is closing while we are copying files.
targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
Assert.fail("Unreachable");
}

@Override
public void cancel() {
// simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed .
final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled");
listener.onFailure(exception);
}
};
when(sourceFactory.get(any(), any())).thenReturn(source);
startReplicationAndAssertCancellation(replica, primary, targetService);

shards.removeReplica(replica);
closeShards(replica);
}
}

public void testPrimaryCancelsExecution() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
Expand Down Expand Up @@ -645,58 +697,6 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
}
}

public void testCloseShardWhileGettingCheckpoint() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

primary.refresh("Test");

final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
SegmentReplicationSource source = new TestReplicationSource() {

ActionListener<CheckpointInfoResponse> listener;

@Override
public void getCheckpointMetadata(
long replicationId,
ReplicationCheckpoint checkpoint,
ActionListener<CheckpointInfoResponse> listener
) {
// set the listener, we will only fail it once we cancel the source.
this.listener = listener;
// shard is closing while we are copying files.
targetService.beforeIndexShardClosed(replica.shardId, replica, Settings.EMPTY);
}

@Override
public void getSegmentFiles(
long replicationId,
ReplicationCheckpoint checkpoint,
List<StoreFileMetadata> filesToFetch,
IndexShard indexShard,
ActionListener<GetSegmentFilesResponse> listener
) {
Assert.fail("Unreachable");
}

@Override
public void cancel() {
// simulate listener resolving, but only after we have issued a cancel from beforeIndexShardClosed .
final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled");
listener.onFailure(exception);
}
};
when(sourceFactory.get(any(), any())).thenReturn(source);
startReplicationAndAssertCancellation(replica, primary, targetService);

shards.removeReplica(replica);
closeShards(replica);
}
}

public void testBeforeIndexShardClosedWhileCopyingFiles() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
private SegmentReplicationState state;
private ReplicationCheckpoint initialCheckpoint;

private CancellableThreads cancellableThreads;

private static final long TRANSPORT_TIMEOUT = 30000;// 30sec

@Override
Expand Down Expand Up @@ -156,8 +154,6 @@ public void setUp() throws Exception {
"",
new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT)
);

cancellableThreads = new CancellableThreads();
}

@Override
Expand Down Expand Up @@ -234,7 +230,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
latch.countDown();
}
},
cancellableThreads
new CancellableThreads()
);
sut.startReplication(target);
latch.await(2, TimeUnit.SECONDS);
Expand Down Expand Up @@ -287,7 +283,7 @@ public void getSegmentFiles(
primaryShard.getLatestReplicationCheckpoint(),
source,
mock(SegmentReplicationTargetService.SegmentReplicationListener.class),
cancellableThreads
new CancellableThreads()
)
);

Expand Down Expand Up @@ -361,7 +357,7 @@ public void cancel() {
updatedCheckpoint,
source,
mock(SegmentReplicationTargetService.SegmentReplicationListener.class),
cancellableThreads
new CancellableThreads()
)
);

Expand Down Expand Up @@ -609,7 +605,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
assertTrue(e.getCause() instanceof CancellableThreads.ExecutionCancelledException);
}
},
cancellableThreads
new CancellableThreads()
);
target.cancel("test");
sut.startReplication(target);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,59 +8,41 @@

package org.opensearch.index.replication;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.action.ActionListener;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.CheckpointInfoResponse;
import org.opensearch.indices.replication.GetSegmentFilesResponse;
import org.opensearch.indices.replication.RemoteStoreReplicationSource;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.util.List;

/**
* Defines test SegmentReplicationSource for remote store to be used in unit tests
* Defines SegmentReplicationSource for remote store to be used in unit tests
*/
public abstract class TestRemoteStoreReplicationSource implements SegmentReplicationSource {

private static final Logger logger = LogManager.getLogger(TestRemoteStoreReplicationSource.class);
private final CancellableThreads cancellableThreads;
private final IndexShard targetShard;

private CancellableThreads cancellableThreads;
private IndexShard targetShard;

public CancellableThreads getCancellableThreads() {
return cancellableThreads;
}

public IndexShard getTargetShard() {
return targetShard;
}
private final RemoteSegmentStoreDirectory remoteDirectory;

public RemoteSegmentStoreDirectory getRemoteDirectory() {
return remoteDirectory;
}

private RemoteSegmentStoreDirectory remoteDirectory;

public TestRemoteStoreReplicationSource(CancellableThreads cancellableThreads, IndexShard targetShard) {
logger.info("--> TestReplicationSource {}", cancellableThreads);
this.targetShard = targetShard;
FilterDirectory remoteStoreDirectory = (FilterDirectory) targetShard.remoteStore().directory();
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
this.remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate();
this.cancellableThreads = cancellableThreads;
}

public TestRemoteStoreReplicationSource() {
logger.info("--> TestReplicationSource default");
}

@Override
public abstract void getCheckpointMetadata(
long replicationId,
Expand All @@ -79,6 +61,6 @@ public abstract void getSegmentFiles(

@Override
public String getDescription() {
return "TestReplicationSource";
return "TestRemoteStoreReplicationSource";
}
}

0 comments on commit 09d61c7

Please sign in to comment.