From e8a4210b28ba7bf3c18b202b1958fd2100a8b330 Mon Sep 17 00:00:00 2001
From: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com>
Date: Thu, 9 Mar 2023 19:29:51 -0800
Subject: [PATCH] [Segment Replication] Update RefreshPolicy.WAIT_UNTIL for
 replica shards with segment replication enabled to wait for replica refresh
 (#6464)

* Initial draft PR for wait_until with segrep

Signed-off-by: Rishikesh1159

* Refactor code and fix test failures.

Signed-off-by: Rishikesh1159

* add comments and fix tests.

Signed-off-by: Rishikesh1159

* Refactor code, address comments and fix test failures.

Signed-off-by: Rishikesh1159

* Apply spotless check

Signed-off-by: Rishikesh1159

* Address comments and add integ test.

Signed-off-by: Rishikesh1159

* Address comments and fix failing tests.

Signed-off-by: Rishikesh1159

* Fixing failing test.

Signed-off-by: Rishikesh1159

* Remove unused code.

Signed-off-by: Rishikesh1159

* Addressing comments and refactoring

Signed-off-by: Rishikesh1159

* Adding max refresh listeners limit that a replica shard can hold and force refresh.

Signed-off-by: Rishikesh1159

* Changing assert message

Signed-off-by: Rishikesh1159

* Fix call to release refresh listeners on replica shards.

Signed-off-by: Rishikesh1159

* Fix call to release refresh listeners on replica shards.

Signed-off-by: Rishikesh1159

* Address comments.

Signed-off-by: Rishikesh1159

* Fixing compile errors.

Signed-off-by: Rishikesh1159

* Spotless apply

Signed-off-by: Rishikesh1159

---------

Signed-off-by: Rishikesh1159
---
 .../replication/SegmentReplicationIT.java     | 101 ++++++++++
 .../action/bulk/TransportShardBulkAction.java |  16 +-
 .../TransportResyncReplicationAction.java     |   2 +-
 .../replication/TransportWriteAction.java     |  20 +-
 .../index/engine/NRTReplicationEngine.java    |  15 +-
 .../engine/NRTReplicationReaderManager.java   |   4 +
 .../index/seqno/RetentionLeaseSyncAction.java |   2 +-
 .../opensearch/index/shard/IndexShard.java    |  18 +-
 .../index/shard/RefreshListeners.java         | 176 +++++++++++++++++-
 ...rtWriteActionForIndexingPressureTests.java |   2 +-
 .../TransportWriteActionTests.java            |  16 +-
 .../index/shard/IndexShardTests.java          |   6 +-
 .../index/shard/RefreshListenersTests.java    |  43 ++++-
 .../TransportWriteActionTestHelper.java       |   2 +-
 ...enSearchIndexLevelReplicationTestCase.java |   6 +-
 15 files changed, 384 insertions(+), 45 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index 820fa17b77c48..c3a3e8c42c47f 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -9,6 +9,9 @@ package org.opensearch.indices.replication;
 import com.carrotsearch.randomizedtesting.RandomizedTest;
+import org.opensearch.OpenSearchCorruptionException;
+import org.opensearch.action.ActionFuture;
+import org.opensearch.action.index.IndexResponse;
 import org.opensearch.action.support.WriteRequest;
 import org.opensearch.action.update.UpdateResponse;
 import org.opensearch.client.Requests;
@@ -20,15 +23,18 @@ import org.opensearch.index.shard.IndexShard;
 import org.opensearch.indices.recovery.FileChunkRequest;
 import org.opensearch.indices.replication.common.ReplicationType;
+import org.opensearch.rest.RestStatus;
 import org.opensearch.test.BackgroundIndexer;
 import org.opensearch.test.InternalTestCluster;
 import
org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static java.util.Arrays.asList; import static org.opensearch.index.query.QueryBuilders.matchQuery; @@ -577,4 +583,99 @@ public void testDropPrimaryDuringReplication() throws Exception { verifyStoreContent(); } } + + public void testWaitUntilRefresh() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode(); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(6000, 7000); + final List> pendingIndexResponses = new ArrayList<>(); + IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); + IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME); + + for (int i = 0; i < initialDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + assertBusy(() -> { + assertTrue(pendingIndexResponses.stream().allMatch(response -> response.actionGet().status().equals(RestStatus.CREATED))); + assertEquals(primaryShard.getProcessedLocalCheckpoint(), replicaShard.getProcessedLocalCheckpoint()); + }, 1, TimeUnit.MINUTES); + assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount); + assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount); + } + + public void testWaitUntilWhenReplicaPromoted() throws Exception { + final String primaryNode = internalCluster().startNode(); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + final String replicaNode = internalCluster().startNode(); + final CountDownLatch waitForReplication = new CountDownLatch(1); + // Mock transport service to add behaviour of throwing corruption exception during segment replication process. 
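+        // Failing the FILE_CHUNK action below keeps segment replication from completing, so the pending
+        // WAIT_UNTIL requests can only be satisfied once the replica has been promoted and refreshed.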
+ MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNode + )); + mockTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, replicaNode), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { + try { + waitForReplication.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new OpenSearchCorruptionException("expected"); + } + connection.sendRequest(requestId, action, request, options); + } + ); + ensureGreen(INDEX_NAME); + final int initialDocCount = scaledRandomIntBetween(700, 5000); + final List> pendingIndexResponses = new ArrayList<>(); + for (int i = 0; i < initialDocCount; i++) { + pendingIndexResponses.add( + client().prepareIndex(INDEX_NAME) + .setId(Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i) + .execute() + ); + } + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); + final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replicaNode); + assertNotNull(replicaShardRouting); + waitForReplication.countDown(); + assertBusy(() -> { + assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + int successfulDocCount = 0; + for (ActionFuture response : pendingIndexResponses) { + try { + IndexResponse indexResponse = response.actionGet(); + successfulDocCount++; + } catch (Exception e) { + logger.trace("Failed to index Doc", e); + } + } + assertTrue( + client(replicaNode).prepareSearch(INDEX_NAME) + .setPreference("_only_local") + .setSize(0) + .get() + .getHits() + .getTotalHits().value >= successfulDocCount + ); + + } + } diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 4fdf69f3bd7cd..942fa783ade97 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -780,8 +780,8 @@ static BulkItemResponse processUpdateResponse( @Override protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener listener) { ActionListener.completeWith(listener, () -> { - final Translog.Location location = performOnReplica(request, replica); - return new WriteReplicaResult<>(request, location, null, replica, logger); + final Tuple tuple = performOnReplica(request, replica); + return new WriteReplicaResult<>(request, tuple.v1(), tuple.v2(), null, replica, logger); }); } @@ -790,8 +790,10 @@ protected long replicaOperationSize(BulkShardRequest request) { return request.ramBytesUsed(); } - public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + public static Tuple performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { Translog.Location location = null; + long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + final boolean isSegRepEnabled = replica.indexSettings().isSegRepEnabled(); for (int i = 0; i < request.items().length; i++) { final BulkItemRequest item = request.items()[i]; final 
BulkItemResponse response = item.getPrimaryResponse(); @@ -813,17 +815,23 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index primaryTerm, response.getFailure().getMessage() ); + if (isSegRepEnabled) { + maxSeqNo = Math.max(response.getFailure().getSeqNo(), maxSeqNo); + } } else { if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) { continue; // ignore replication as it's a noop } assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO; operationResult = performOpOnReplica(response.getResponse(), item.request(), replica); + if (isSegRepEnabled) { + maxSeqNo = Math.max(response.getResponse().getSeqNo(), maxSeqNo); + } } assert operationResult != null : "operation result must never be null when primary response has no failure"; location = syncOperationResultOrThrow(operationResult, location); } - return location; + return new Tuple<>(location, maxSeqNo); } private static Engine.Result performOpOnReplica( diff --git a/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java index 2249c7cace943..55cab0761b3f1 100644 --- a/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java @@ -169,7 +169,7 @@ protected void dispatchedShardOperationOnReplica( ) { ActionListener.completeWith(listener, () -> { Translog.Location location = performOnReplica(request, replica); - return new WriteReplicaResult<>(request, location, null, replica, logger); + return new WriteReplicaResult<>(request, location, null, null, replica, logger); }); } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java index 26b15195cd8fc..80dfb6d0c9cf9 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java @@ -307,7 +307,7 @@ public void runPostReplicationActions(ActionListener listener) { * We call this after replication because this might wait for a refresh and that can take a while. * This way we wait for the refresh in parallel on the primary and on the replica. 
*/ - new AsyncAfterWriteAction(primary, replicaRequest, location, new RespondingWriteResult() { + new AsyncAfterWriteAction(primary, replicaRequest, location, null, new RespondingWriteResult() { @Override public void onSuccess(boolean forcedRefresh) { finalResponseIfSuccessful.setForcedRefresh(forcedRefresh); @@ -329,20 +329,23 @@ public void onFailure(Exception ex) { * @opensearch.internal */ public static class WriteReplicaResult> extends ReplicaResult { - public final Location location; + private final Location location; private final ReplicaRequest request; private final IndexShard replica; private final Logger logger; + private final Long maxSeqNo; public WriteReplicaResult( ReplicaRequest request, - @Nullable Location location, + @Nullable final Translog.Location location, + @Nullable final Long maxSeqNo, @Nullable Exception operationFailure, IndexShard replica, Logger logger ) { super(operationFailure); this.location = location; + this.maxSeqNo = maxSeqNo; this.request = request; this.replica = replica; this.logger = logger; @@ -353,7 +356,7 @@ public void runPostReplicaActions(ActionListener listener) { if (finalFailure != null) { listener.onFailure(finalFailure); } else { - new AsyncAfterWriteAction(replica, request, location, new RespondingWriteResult() { + new AsyncAfterWriteAction(replica, request, location, maxSeqNo, new RespondingWriteResult() { @Override public void onSuccess(boolean forcedRefresh) { listener.onResponse(null); @@ -403,7 +406,6 @@ interface RespondingWriteResult { * @opensearch.internal */ static final class AsyncAfterWriteAction { - private final Location location; private final boolean waitUntilRefresh; private final boolean sync; private final AtomicInteger pendingOps = new AtomicInteger(1); @@ -414,10 +416,15 @@ static final class AsyncAfterWriteAction { private final WriteRequest request; private final Logger logger; + private final Location location; + + private final Long maxSeqNo; + AsyncAfterWriteAction( final IndexShard indexShard, final WriteRequest request, @Nullable final Translog.Location location, + @Nullable final Long maxSeqNo, final RespondingWriteResult respond, final Logger logger ) { @@ -443,6 +450,7 @@ static final class AsyncAfterWriteAction { this.waitUntilRefresh = waitUntilRefresh; this.respond = respond; this.location = location; + this.maxSeqNo = maxSeqNo; if ((sync = indexShard.getTranslogDurability() == Translog.Durability.REQUEST && location != null)) { pendingOps.incrementAndGet(); } @@ -474,7 +482,7 @@ void run() { maybeFinish(); if (waitUntilRefresh) { assert pendingOps.get() > 0; - indexShard.addRefreshListener(location, forcedRefresh -> { + indexShard.addRefreshListener(location, maxSeqNo, forcedRefresh -> { if (forcedRefresh) { logger.warn("block until refresh ran out of slots and forced a refresh: [{}]", request); } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 5fd49813bb849..0ee58ab3cae28 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -309,11 +309,22 @@ public List segments(boolean verbose) { } @Override - public void refresh(String source) throws EngineException {} + public void refresh(String source) throws EngineException { + maybeRefresh(source); + } @Override public boolean maybeRefresh(String source) throws EngineException { - return false; + try { + return 
readerManager.maybeRefresh(); + } catch (IOException e) { + try { + failEngine("refresh failed source[" + source + "]", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new RefreshFailedEngineException(shardId, e); + } } @Override diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 8fbb24720aedc..00748acb1d76d 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -51,6 +51,10 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager { @Override protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException { Objects.requireNonNull(referenceToRefresh); + // checks if an actual refresh (change in segments) happened + if (unwrapStandardReader(referenceToRefresh).getSegmentInfos().version == currentInfos.version) { + return null; + } final List subs = new ArrayList<>(); final StandardDirectoryReader standardDirectoryReader = unwrapStandardReader(referenceToRefresh); for (LeafReaderContext ctx : standardDirectoryReader.leaves()) { diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java index afcf5c6766194..c5beb22c6caa6 100644 --- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java @@ -202,7 +202,7 @@ protected void dispatchedShardOperationOnReplica(Request request, IndexShard rep Objects.requireNonNull(replica); replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); replica.persistRetentionLeases(); - return new WriteReplicaResult<>(request, null, null, replica, getLogger()); + return new WriteReplicaResult<>(request, null, null, null, replica, getLogger()); }); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 546affcf48bd2..2d4d97f191502 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3993,7 +3993,8 @@ private RefreshListeners buildRefreshListeners() { () -> refresh("too_many_listeners"), logger, threadPool.getThreadContext(), - externalRefreshMetric + externalRefreshMetric, + this::getProcessedLocalCheckpoint ); } @@ -4136,7 +4137,7 @@ public final void awaitShardSearchActive(Consumer listener) { markSearcherAccessed(); // move the shard into non-search idle final Translog.Location location = pendingRefreshLocation.get(); if (location != null) { - addRefreshListener(location, (b) -> { + addRefreshListener(location, null, (b) -> { pendingRefreshLocation.compareAndSet(location, null); listener.accept(true); }); @@ -4146,13 +4147,14 @@ public final void awaitShardSearchActive(Consumer listener) { } /** - * Add a listener for refreshes. + * Add a listener for refreshes. Only on Segment replication enabled replica shards we listen for maxSeqNo. 
In all other cases we listen for translog location * - * @param location the location to listen for + * @param location the translog location to listen for on a refresh + * @param maxSeqNo the Sequence Number to listen for on a refresh * @param listener for the refresh. Called with true if registering the listener ran it out of slots and forced a refresh. Called with * false otherwise. */ - public void addRefreshListener(Translog.Location location, Consumer listener) { + public void addRefreshListener(Translog.Location location, Long maxSeqNo, Consumer listener) { final boolean readAllowed; if (isReadAllowed()) { readAllowed = true; @@ -4165,7 +4167,11 @@ public void addRefreshListener(Translog.Location location, Consumer lis } } if (readAllowed) { - refreshListeners.addOrNotify(location, listener); + if (indexSettings.isSegRepEnabled() && shardRouting.primary() == false) { + refreshListeners.addOrNotify(maxSeqNo, listener); + } else { + refreshListeners.addOrNotify(location, listener); + } } else { // we're not yet ready fo ready for reads, just ignore refresh cycles listener.accept(false); diff --git a/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java b/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java index 7dbbcbb2d7d20..b47f08f7b10d0 100644 --- a/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java +++ b/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java @@ -39,6 +39,7 @@ import org.opensearch.common.metrics.MeanMetric; import org.opensearch.common.util.concurrent.RunOnce; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.Translog; import java.io.Closeable; @@ -47,6 +48,7 @@ import java.util.List; import java.util.function.Consumer; import java.util.function.IntSupplier; +import java.util.function.LongSupplier; import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -66,6 +68,8 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener, private final ThreadContext threadContext; private final MeanMetric refreshMetric; + private final LongSupplier getProcessedCheckpoint; + /** * Time in nanosecond when beforeRefresh() is called. Used for calculating refresh metrics. */ @@ -90,29 +94,38 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener, * We never set this to non-null while closed it {@code true}. */ private volatile List>> refreshListeners = null; + + private volatile List>> seqNoRefreshListeners = null; /** * The translog location that was last made visible by a refresh. */ private volatile Translog.Location lastRefreshedLocation; + private volatile Long lastMaxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + public RefreshListeners( final IntSupplier getMaxRefreshListeners, final Runnable forceRefresh, final Logger logger, final ThreadContext threadContext, - final MeanMetric refreshMetric + final MeanMetric refreshMetric, + final LongSupplier getProcessedLocalCheckpoint ) { this.getMaxRefreshListeners = getMaxRefreshListeners; this.forceRefresh = forceRefresh; this.logger = logger; this.threadContext = threadContext; this.refreshMetric = refreshMetric; + this.getProcessedCheckpoint = getProcessedLocalCheckpoint; } /** * Force-refreshes newly added listeners and forces a refresh if there are currently listeners registered. See {@link #refreshForcers}. 
*/ public Releasable forceRefreshes() { + if (seqNoRefreshListeners != null) { + releaseSeqNoRefreshListeners(); + } synchronized (this) { assert refreshForcers >= 0; refreshForcers += 1; @@ -135,6 +148,17 @@ public Releasable forceRefreshes() { return () -> runOnce.run(); } + // Release refresh listeners on replica shard with segment replication enabled + private void releaseSeqNoRefreshListeners() { + List>> oldSeqNoListeners; + synchronized (this) { + oldSeqNoListeners = seqNoRefreshListeners; + seqNoRefreshListeners = null; + } + // Fire any listeners we might have had + fireSeqNoListeners(oldSeqNoListeners); + } + /** * Add a listener for refreshes, calling it immediately if the location is already visible. If this runs out of listener slots then it * forces a refresh and calls the listener immediately as well. @@ -160,13 +184,7 @@ public boolean addOrNotify(Translog.Location location, Consumer listene List>> listeners = refreshListeners; final int maxRefreshes = getMaxRefreshListeners.getAsInt(); if (refreshForcers == 0 && maxRefreshes > 0 && (listeners == null || listeners.size() < maxRefreshes)) { - ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true); - Consumer contextPreservingListener = forced -> { - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - storedContext.restore(); - listener.accept(forced); - } - }; + Consumer contextPreservingListener = getContextListener(listener); if (listeners == null) { listeners = new ArrayList<>(); } @@ -182,16 +200,69 @@ public boolean addOrNotify(Translog.Location location, Consumer listene return true; } + /** + * Add a listener for refreshes, calling it immediately if the max sequence number is already visible. If this runs out of listener slots then it + * forces a refresh and calls the listener immediately as well. + * + * @param maxSeqNo the Sequence number to listen on segment replication enabled replica shards + * @param listener for the refresh.It waits until max sequence number is visible. Called with true if registering the listener ran it out of slots and forced a refresh. Called with + * false otherwise. + * @return did we call the listener (true) or register the listener to call later (false)? 
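+     *         Listeners registered here are fired by a later refresh, once the replica's processed local checkpoint reaches {@code maxSeqNo}.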
+ */ + public boolean addOrNotify(Long maxSeqNo, Consumer listener) { + requireNonNull(listener, "listener cannot be null"); + requireNonNull(maxSeqNo, "maxSeqNo cannot be null"); + + if (lastMaxSeqNo != SequenceNumbers.NO_OPS_PERFORMED && lastMaxSeqNo >= maxSeqNo) { + listener.accept(false); + return true; + } + synchronized (this) { + if (closed) { + throw new IllegalStateException("can't wait for refresh on a closed index"); + } + List>> listeners = seqNoRefreshListeners; + final int maxRefreshes = getMaxRefreshListeners.getAsInt(); + if (refreshForcers == 0 && maxRefreshes > 0 && (listeners == null || listeners.size() < maxRefreshes)) { + Consumer contextPreservingListener = getContextListener(listener); + if (listeners == null) { + listeners = new ArrayList<>(); + } + listeners.add(new Tuple<>(maxSeqNo, contextPreservingListener)); + seqNoRefreshListeners = listeners; + return false; + } + } + // No free slot so release existing refresh listeners and call the listener in this thread + releaseSeqNoRefreshListeners(); + listener.accept(true); + return true; + } + + private Consumer getContextListener(Consumer listener) { + ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true); + return forced -> { + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + storedContext.restore(); + listener.accept(forced); + } + }; + } + @Override public void close() throws IOException { List>> oldListeners; + List>> oldSeqNoListeners; synchronized (this) { oldListeners = refreshListeners; + oldSeqNoListeners = seqNoRefreshListeners; refreshListeners = null; + seqNoRefreshListeners = null; closed = true; } // Fire any listeners we might have had fireListeners(oldListeners); + fireSeqNoListeners(oldSeqNoListeners); } /** @@ -199,6 +270,9 @@ public void close() throws IOException { */ public boolean refreshNeeded() { // A null list doesn't need a refresh. If we're closed we don't need a refresh either. + if (seqNoRefreshListeners != null) { + return false == closed; + } return refreshListeners != null && false == closed; } @@ -208,8 +282,12 @@ public boolean refreshNeeded() { public int pendingCount() { // No need to synchronize here because we're doing a single volatile read List>> listeners = refreshListeners; + List>> seqNoListeners = seqNoRefreshListeners; + int count = 0; // A null list means we haven't accumulated any listeners. Otherwise we need the size. - return listeners == null ? 0 : listeners.size(); + count += listeners == null ? 0 : listeners.size(); + count += seqNoListeners == null ? 0 : seqNoListeners.size(); + return count; } /** @@ -253,9 +331,73 @@ public void afterRefresh(boolean didRefresh) throws IOException { * assignment into the synchronized block below and double checking lastRefreshedLocation in addOrNotify's synchronized block but * that doesn't seem worth it given that we already skip this process early if there aren't any listeners to iterate. */ lastRefreshedLocation = currentRefreshLocation; + + // Checks if there are any sequence number based refresh listeners that can be fired or preserved. + checkSeqNoRefreshListeners(); + + // Checks if there are any translog location based refresh listeners that can be fired or preserved. + checkRefreshListeners(); + } + + private void checkSeqNoRefreshListeners() { + /* Grab the current Sequence Number refresh listeners and replace them with null while synchronized. 
Any listeners that come in after this won't be + * in the list we iterate over and very likely won't be candidates for refresh anyway because we've already moved the + * lastRefreshedLocation. */ + + List>> candidates; + synchronized (this) { + candidates = seqNoRefreshListeners; + // No listeners to check so just bail early + if (candidates == null) { + return; + } + seqNoRefreshListeners = null; + } + // Iterate the list of listeners, copying the listeners to fire to one list and those to preserve to another list. + List>> listenersToFire = null; + List>> preservedListeners = null; + + for (Tuple> tuple : candidates) { + Long seqNo = tuple.v1(); + if (seqNo <= getProcessedCheckpoint.getAsLong()) { + if (listenersToFire == null) { + listenersToFire = new ArrayList<>(); + } + listenersToFire.add(tuple); + } else { + if (preservedListeners == null) { + preservedListeners = new ArrayList<>(); + } + preservedListeners.add(tuple); + } + } + /* Now deal with the listeners that it isn't time yet to fire. We need to do this under lock so we don't miss a concurrent close or + * newly registered listener. If we're not closed we just add the listeners to the list of listeners we check next time. If we are + * closed we fire the listeners even though it isn't time for them. */ + if (preservedListeners != null) { + synchronized (this) { + if (seqNoRefreshListeners == null) { + if (closed) { + listenersToFire.addAll(preservedListeners); + } else { + seqNoRefreshListeners = preservedListeners; + } + } else { + assert closed == false : "Can't be closed and have non-null refreshListeners"; + seqNoRefreshListeners.addAll(preservedListeners); + } + } + } + // Lastly, fire the listeners that are ready + fireSeqNoListeners(listenersToFire); + } + + private void checkRefreshListeners() { + /* Grab the current refresh listeners and replace them with null while synchronized. Any listeners that come in after this won't be * in the list we iterate over and very likely won't be candidates for refresh anyway because we've already moved the * lastRefreshedLocation. */ + List>> candidates; synchronized (this) { candidates = refreshListeners; @@ -268,6 +410,7 @@ public void afterRefresh(boolean didRefresh) throws IOException { // Iterate the list of listeners, copying the listeners to fire to one list and those to preserve to another list. List>> listenersToFire = null; List>> preservedListeners = null; + for (Tuple> tuple : candidates) { Translog.Location location = tuple.v1(); if (location.compareTo(currentRefreshLocation) <= 0) { @@ -317,4 +460,19 @@ private void fireListeners(final List } } } + + /** + * Fire Sequence number based replica shard listeners. Does nothing if the list of listeners is null. 
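+     * Each listener is invoked with {@code false}, since firing from here never forces a refresh.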
+ */ + private void fireSeqNoListeners(final List>> listenersToFire) { + if (listenersToFire != null) { + for (final Tuple> listener : listenersToFire) { + try { + listener.v2().accept(false); + } catch (final Exception e) { + logger.warn("error firing refresh listener", e); + } + } + } + } } diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java index 3aae1ee40818d..5255cb72253b0 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java @@ -423,7 +423,7 @@ protected void dispatchedShardOperationOnPrimary( @Override protected void dispatchedShardOperationOnReplica(TestRequest request, IndexShard replica, ActionListener listener) { - ActionListener.completeWith(listener, () -> new WriteReplicaResult<>(request, location, null, replica, logger)); + ActionListener.completeWith(listener, () -> new WriteReplicaResult<>(request, location, null, null, replica, logger)); } } diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java index 137aca4966936..2e27616399d02 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java @@ -161,7 +161,7 @@ public void testPrimaryNoRefreshCall() throws Exception { assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard, never()).refresh(any()); - verify(indexShard, never()).addRefreshListener(any(), any()); + verify(indexShard, never()).addRefreshListener(any(), any(), any()); })); } @@ -177,7 +177,7 @@ public void testReplicaNoRefreshCall() throws Exception { assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard, never()).refresh(any()); - verify(indexShard, never()).addRefreshListener(any(), any()); + verify(indexShard, never()).addRefreshListener(any(), any(), any()); } public void testPrimaryImmediateRefresh() throws Exception { @@ -191,7 +191,7 @@ public void testPrimaryImmediateRefresh() throws Exception { assertNull(listener.failure); assertTrue(listener.response.forcedRefresh); verify(indexShard).refresh("refresh_flag_index"); - verify(indexShard, never()).addRefreshListener(any(), any()); + verify(indexShard, never()).addRefreshListener(any(), any(), any()); })); } @@ -207,7 +207,7 @@ public void testReplicaImmediateRefresh() throws Exception { assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard).refresh("refresh_flag_index"); - verify(indexShard, never()).addRefreshListener(any(), any()); + verify(indexShard, never()).addRefreshListener(any(), any(), any()); } public void testPrimaryWaitForRefresh() throws Exception { @@ -223,7 +223,7 @@ public void testPrimaryWaitForRefresh() throws Exception { @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); verify(indexShard, never()).refresh(any()); - verify(indexShard).addRefreshListener(any(), refreshListener.capture()); + verify(indexShard).addRefreshListener(any(), any(), 
refreshListener.capture()); // Now we can fire the listener manually and we'll get a response boolean forcedRefresh = randomBoolean(); @@ -247,7 +247,7 @@ public void testReplicaWaitForRefresh() throws Exception { @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); verify(indexShard, never()).refresh(any()); - verify(indexShard).addRefreshListener(any(), refreshListener.capture()); + verify(indexShard).addRefreshListener(any(), any(), refreshListener.capture()); // Now we can fire the listener manually and we'll get a response boolean forcedRefresh = randomBoolean(); @@ -531,9 +531,9 @@ protected void dispatchedShardOperationOnReplica(TestRequest request, IndexShard ActionListener.completeWith(listener, () -> { final WriteReplicaResult replicaResult; if (withDocumentFailureOnReplica) { - replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger); + replicaResult = new WriteReplicaResult<>(request, null, null, new RuntimeException("simulated"), replica, logger); } else { - replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger); + replicaResult = new WriteReplicaResult<>(request, location, null, null, replica, logger); } return replicaResult; }); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index c84d03a8454d9..2d116c02d8c50 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -3266,7 +3266,7 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException { indexDoc(primary, "_doc", "0", "{\"foo\" : \"bar\"}"); Consumer assertListenerCalled = shard -> { AtomicBoolean called = new AtomicBoolean(); - shard.addRefreshListener(null, b -> { + shard.addRefreshListener(null, null, b -> { assertFalse(b); called.set(true); }); @@ -4179,7 +4179,7 @@ public void testRefreshIsNeededWithRefreshListeners() throws IOException, Interr assertTrue(primary.scheduledRefresh()); Engine.IndexResult doc = indexDoc(primary, "_doc", "1", "{\"foo\" : \"bar\"}"); CountDownLatch latch = new CountDownLatch(1); - primary.addRefreshListener(doc.getTranslogLocation(), r -> latch.countDown()); + primary.addRefreshListener(doc.getTranslogLocation(), null, r -> latch.countDown()); assertEquals(1, latch.getCount()); assertTrue(primary.getEngine().refreshNeeded()); assertTrue(primary.scheduledRefresh()); @@ -4191,7 +4191,7 @@ public void testRefreshIsNeededWithRefreshListeners() throws IOException, Interr doc = indexDoc(primary, "_doc", "2", "{\"foo\" : \"bar\"}"); CountDownLatch latch1 = new CountDownLatch(1); - primary.addRefreshListener(doc.getTranslogLocation(), r -> latch1.countDown()); + primary.addRefreshListener(doc.getTranslogLocation(), null, r -> latch1.countDown()); assertEquals(1, latch1.getCount()); assertTrue(primary.getEngine().refreshNeeded()); assertTrue(primary.scheduledRefresh()); diff --git a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java index dbb0207b4f1b7..34bcfd44faf07 100644 --- a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java @@ -118,7 +118,8 @@ public void setupListeners() throws Exception { () -> 
engine.refresh("too-many-listeners"), logger, threadPool.getThreadContext(), - refreshMetric + refreshMetric, + () -> 10L ); IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); @@ -210,6 +211,46 @@ public void testAfterRefresh() throws Exception { assertEquals(0, listeners.pendingCount()); } + public void testSeqNoRefreshListenersReleasedOnForceRefresh() throws IOException { + assertEquals(0, listeners.pendingCount()); + Engine.IndexResult indexOne = index("1"); + Engine.IndexResult indexTwo = index("2"); + Engine.IndexResult indexThree = index("3"); + DummyRefreshListener listenerOne = new DummyRefreshListener(); + DummyRefreshListener listenerTwo = new DummyRefreshListener(); + DummyRefreshListener listenerThree = new DummyRefreshListener(); + + // Add two listeners + listeners.addOrNotify(indexOne.getSeqNo(), listenerOne); + listeners.addOrNotify(indexTwo.getSeqNo(), listenerTwo); + + // verify that two listeners are added successfully + assertEquals(2, listeners.pendingCount()); + + // now Force refresh which would fire all seqNO listeners and block addition of any new seqNo Listeners. + listeners.forceRefreshes(); + + // Add new listener and verify this listener is not added. + listeners.addOrNotify(indexThree.getSeqNo(), listenerThree); + assertEquals(0, listeners.pendingCount()); + + } + + public void testSeqNoRefreshListener() throws IOException { + assertEquals(0, listeners.pendingCount()); + Engine.IndexResult index = index("1"); + DummyRefreshListener listener = new DummyRefreshListener(); + + // Add new listener + listeners.addOrNotify(index.getSeqNo(), listener); + assertEquals(1, listeners.pendingCount()); + + // Refresh and verify that listener is fired. + engine.refresh("test"); + assertEquals(0, listeners.pendingCount()); + + } + public void testContextIsPreserved() throws IOException, InterruptedException { assertEquals(0, listeners.pendingCount()); Engine.IndexResult index = index("1"); diff --git a/test/framework/src/main/java/org/opensearch/action/support/replication/TransportWriteActionTestHelper.java b/test/framework/src/main/java/org/opensearch/action/support/replication/TransportWriteActionTestHelper.java index 8fb95726215f3..c01460982a3b1 100644 --- a/test/framework/src/main/java/org/opensearch/action/support/replication/TransportWriteActionTestHelper.java +++ b/test/framework/src/main/java/org/opensearch/action/support/replication/TransportWriteActionTestHelper.java @@ -59,7 +59,7 @@ public void onFailure(Exception ex) { throw new AssertionError(ex); } }; - new TransportWriteAction.AsyncAfterWriteAction(indexShard, request, location, writerResult, logger).run(); + new TransportWriteAction.AsyncAfterWriteAction(indexShard, request, location, null, writerResult, logger).run(); try { latch.await(); } catch (InterruptedException e) { diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 8df57ccad85cc..9e0d28ca4b819 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -73,6 +73,7 @@ import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.common.collect.Iterators; +import 
org.opensearch.common.collect.Tuple; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.lease.Releasable; @@ -1021,10 +1022,11 @@ private void executeShardBulkOnReplica( request ); final Translog.Location location; + final Tuple tuple; try (Releasable ignored = permitAcquiredFuture.actionGet()) { - location = TransportShardBulkAction.performOnReplica(request, replica); + tuple = TransportShardBulkAction.performOnReplica(request, replica); } - TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger); + TransportWriteActionTestHelper.performPostWriteActions(replica, request, tuple.v1(), logger); } /**
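A minimal usage sketch, written in the style of the new SegmentReplicationIT tests and assuming that test harness (INDEX_NAME, client(), createIndex(), ensureGreen(), assertHitCount()) plus an index created with segment replication enabled: with this change, a request sent with RefreshPolicy.WAIT_UNTIL completes only once the document is searchable, and on a replica that now means a refresh has advanced the processed local checkpoint to at least the request's sequence number.

    // Illustrative sketch only; assumes the SegmentReplicationIT harness and a segment-replication-enabled index.
    public void sketchWaitUntilVisibility() throws Exception {
        createIndex(INDEX_NAME);
        ensureGreen(INDEX_NAME);

        // The future completes only after the WAIT_UNTIL refresh listeners on primary and replica have fired.
        ActionFuture<IndexResponse> pending = client().prepareIndex(INDEX_NAME)
            .setId("1")
            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
            .setSource("field", "value")
            .execute();

        IndexResponse response = pending.actionGet();
        assertEquals(RestStatus.CREATED, response.status());

        // No explicit refresh is needed before searching; WAIT_UNTIL already guaranteed visibility on the replica.
        assertHitCount(client().prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), 1);
    }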