Skip to content

Commit

Permalink
Incorporate PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Nov 16, 2022
1 parent 7842788 commit 3e5e2ee
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
import java.util.function.Consumer;

/**
* This handler is used for the peer recovery when there is no remote store available for segments/translogs. TODO -
* Add more details as this is refactored further.
* This handler is used for node-to-node peer recovery when the recovery target is a replica/ or a relocating primary
* shard with translog backed by local store.
*
* @opensearch.internal
*/
public class DefaultRecoverySourceHandler extends RecoverySourceHandler {
public class LocalStorePeerRecoverySourceHandler extends RecoverySourceHandler {

public DefaultRecoverySourceHandler(
public LocalStorePeerRecoverySourceHandler(
IndexShard shard,
RecoveryTargetHandler recoveryTarget,
ThreadPool threadPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ void awaitEmpty() {

private final class ShardRecoveryContext {
final Map<RecoverySourceHandler, RemoteRecoveryTargetHandler> recoveryHandlers = new HashMap<>();
private final RecoverySourceHandlerFactory recoverySourceHandlerFactory = new RecoverySourceHandlerFactory();

/**
* Adds recovery source handler.
Expand Down Expand Up @@ -379,7 +378,7 @@ private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecovery
recoverySettings,
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)
);
handler = recoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings);
handler = RecoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings);
return Tuple.tuple(handler, recoveryTarget);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public abstract class RecoverySourceHandler {
public static final String PEER_RECOVERY_NAME = "peer-recovery";
private final SegmentFileTransferHandler transferHandler;

public RecoverySourceHandler(
RecoverySourceHandler(
IndexShard shard,
RecoveryTargetHandler recoveryTarget,
ThreadPool threadPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@

/**
* Factory that supplies {@link RecoverySourceHandler}.
*
* @opensearch.internal
*/
public class RecoverySourceHandlerFactory {

public RecoverySourceHandler create(
public static RecoverySourceHandler create(
IndexShard shard,
RecoveryTargetHandler recoveryTarget,
StartRecoveryRequest request,
RecoverySettings recoverySettings
) {
boolean isReplicaRecoveryWithRemoteTranslog = request.isPrimaryRelocation() == false && shard.isRemoteTranslogEnabled();
if (isReplicaRecoveryWithRemoteTranslog) {
return new RemoteStoreReplicaRecoverySourceHandler(
return new RemoteStorePeerRecoverySourceHandler(
shard,
recoveryTarget,
shard.getThreadPool(),
Expand All @@ -33,7 +35,7 @@ public RecoverySourceHandler create(
recoverySettings.getMaxConcurrentOperations()
);
} else {
return new DefaultRecoverySourceHandler(
return new LocalStorePeerRecoverySourceHandler(
shard,
recoveryTarget,
shard.getThreadPool(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@

/**
* This handler is used when peer recovery target is a remote store enabled replica.
*
* @opensearch.internal
*/
public class RemoteStoreReplicaRecoverySourceHandler extends RecoverySourceHandler {
public class RemoteStorePeerRecoverySourceHandler extends RecoverySourceHandler {

public RemoteStoreReplicaRecoverySourceHandler(
public RemoteStorePeerRecoverySourceHandler(
IndexShard shard,
RecoveryTargetHandler recoveryTarget,
ThreadPool threadPool,
Expand All @@ -44,11 +46,8 @@ public RemoteStoreReplicaRecoverySourceHandler(
@Override
protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener, Consumer<Exception> onFailure) throws IOException {
// A replica of an index with remote translog does not require the translogs locally and keeps receiving the
// updated segments file on refresh, flushes, and merges. We plan to make the existing replication call to
// just no-op for primary term validation. Hence, there is essentially no writing to the indexWriter as well
// as the translogs locally. In recovery, we will resort to file-based recovery and leave the buffered writes
// to lucene (which are yet to be flushed as segments). Subsequent segment replication will take care of syncing
// the refreshed segment files to the replica.
// updated segments file on refresh, flushes, and merges. In recovery, here, only file-based recovery is performed
// and there is no translog replay done.

final StepListener<SendFileResult> sendFileStep = new StepListener<>();
final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@
import static org.mockito.Mockito.when;

/**
* This covers test cases for {@link RecoverySourceHandler} and {@link DefaultRecoverySourceHandler}.
* This covers test cases for {@link RecoverySourceHandler} and {@link LocalStorePeerRecoverySourceHandler}.
*/
public class DefaultRecoverySourceHandlerTests extends OpenSearchTestCase {
public class LocalStorePeerRecoverySourceHandlerTests extends OpenSearchTestCase {
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings(
"index",
Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT).build()
Expand Down Expand Up @@ -218,7 +218,7 @@ public void writeFileChunk(
});
}
};
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
null,
new AsyncRecoveryTarget(target, recoveryExecutor),
threadPool,
Expand Down Expand Up @@ -299,7 +299,7 @@ public void indexTranslogOperations(
listener.onResponse(checkpointOnTarget.get());
}
};
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
shard,
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
threadPool,
Expand Down Expand Up @@ -362,7 +362,7 @@ public void indexTranslogOperations(
}
}
};
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
shard,
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
threadPool,
Expand Down Expand Up @@ -436,7 +436,7 @@ public void indexTranslogOperations(
Randomness.shuffle(operations);
List<Translog.Operation> skipOperations = randomSubsetOf(operations);
Translog.Snapshot snapshot = newTranslogSnapshot(operations, skipOperations);
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
shard,
new AsyncRecoveryTarget(target, recoveryExecutor),
threadPool,
Expand Down Expand Up @@ -555,7 +555,7 @@ public void writeFileChunk(
failedEngine.set(true);
return null;
}).when(mockShard).failShard(any(), any());
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
mockShard,
new AsyncRecoveryTarget(target, recoveryExecutor),
threadPool,
Expand Down Expand Up @@ -630,7 +630,7 @@ public void writeFileChunk(
failedEngine.set(true);
return null;
}).when(mockShard).failShard(any(), any());
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
mockShard,
new AsyncRecoveryTarget(target, recoveryExecutor),
threadPool,
Expand Down Expand Up @@ -683,7 +683,7 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
final AtomicBoolean phase1Called = new AtomicBoolean();
final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
final AtomicBoolean phase2Called = new AtomicBoolean();
final RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
shard,
mock(RecoveryTargetHandler.class),
threadPool,
Expand Down Expand Up @@ -795,7 +795,7 @@ public void writeFileChunk(
};
final int maxConcurrentChunks = between(1, 8);
final int chunkSize = between(1, 32);
final RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
shard,
recoveryTarget,
threadPool,
Expand Down Expand Up @@ -868,7 +868,7 @@ public void writeFileChunk(
};
final int maxConcurrentChunks = between(1, 4);
final int chunkSize = between(1, 16);
final RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
null,
new AsyncRecoveryTarget(recoveryTarget, recoveryExecutor),
threadPool,
Expand Down Expand Up @@ -976,7 +976,7 @@ public void cleanFiles(
}
};
final StartRecoveryRequest startRecoveryRequest = getStartRecoveryRequest();
final RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
final RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
shard,
recoveryTarget,
threadPool,
Expand Down Expand Up @@ -1015,7 +1015,7 @@ void createRetentionLease(long startingSeqNo, ActionListener<RetentionLease> lis
public void testVerifySeqNoStatsWhenRecoverWithSyncId() throws Exception {
IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
RecoverySourceHandler handler = new DefaultRecoverySourceHandler(
RecoverySourceHandler handler = new LocalStorePeerRecoverySourceHandler(
shard,
new TestRecoveryTargetHandler(),
threadPool,
Expand Down Expand Up @@ -1070,7 +1070,7 @@ private Store newStore(Path path) throws IOException {
}

private Store newStore(Path path, boolean checkIndex) throws IOException {
BaseDirectoryWrapper baseDirectoryWrapper = DefaultRecoverySourceHandlerTests.newFSDirectory(path);
BaseDirectoryWrapper baseDirectoryWrapper = LocalStorePeerRecoverySourceHandlerTests.newFSDirectory(path);
if (checkIndex == false) {
baseDirectoryWrapper.setCheckIndexOnClose(false); // don't run checkindex we might corrupt the index in these tests
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.replication.common.ReplicationType;

public class RemoteStoreReplicaRecoverySourceHandlerTests extends OpenSearchIndexLevelReplicationTestCase {
public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLevelReplicationTestCase {

private static final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ protected final void recoverUnstartedReplica(
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
recoverySettings.setChunkSize(new ByteSizeValue(fileChunkSizeInBytes));
final RecoverySourceHandler recovery = new RecoverySourceHandlerFactory().create(
final RecoverySourceHandler recovery = RecoverySourceHandlerFactory.create(
primary,
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
request,
Expand Down

0 comments on commit 3e5e2ee

Please sign in to comment.