diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 9db28b12443d0..354f48e973a4c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -9,9 +9,6 @@ package org.opensearch.indices.replication; import com.carrotsearch.randomizedtesting.RandomizedTest; -import org.opensearch.OpenSearchCorruptionException; -import org.opensearch.action.ActionFuture; -import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; @@ -24,7 +21,6 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.rest.RestStatus; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -35,7 +31,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import static java.util.Arrays.asList; import static org.opensearch.index.query.QueryBuilders.matchQuery; @@ -605,99 +600,4 @@ public void testDropPrimaryDuringReplication() throws Exception { verifyStoreContent(); } } - - public void testWaitUntilRefresh() throws Exception { - final String primaryNode = internalCluster().startNode(); - createIndex(INDEX_NAME); - ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replicaNode = internalCluster().startNode(); - ensureGreen(INDEX_NAME); - final int initialDocCount = scaledRandomIntBetween(6000, 7000); - final List> pendingIndexResponses = new ArrayList<>(); - IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME); - IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME); - - for (int i = 0; i < initialDocCount; i++) { - pendingIndexResponses.add( - client().prepareIndex(INDEX_NAME) - .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setSource("field", "value" + i) - .execute() - ); - } - assertBusy(() -> { - assertTrue(pendingIndexResponses.stream().allMatch(response -> response.actionGet().status().equals(RestStatus.CREATED))); - assertEquals(primaryShard.getProcessedLocalCheckpoint(), replicaShard.getProcessedLocalCheckpoint()); - }, 1, TimeUnit.MINUTES); - assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount); - assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(), initialDocCount); - } - - public void testWaitUntilWhenReplicaPromoted() throws Exception { - final String primaryNode = internalCluster().startNode(); - createIndex(INDEX_NAME); - ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replicaNode = internalCluster().startNode(); - final CountDownLatch waitForReplication = new CountDownLatch(1); - // Mock transport service to add behaviour of throwing corruption exception during segment replication process. - MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( - TransportService.class, - primaryNode - )); - mockTransportService.addSendBehavior( - internalCluster().getInstance(TransportService.class, replicaNode), - (connection, requestId, action, request, options) -> { - if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) { - try { - waitForReplication.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - throw new OpenSearchCorruptionException("expected"); - } - connection.sendRequest(requestId, action, request, options); - } - ); - ensureGreen(INDEX_NAME); - final int initialDocCount = scaledRandomIntBetween(700, 5000); - final List> pendingIndexResponses = new ArrayList<>(); - for (int i = 0; i < initialDocCount; i++) { - pendingIndexResponses.add( - client().prepareIndex(INDEX_NAME) - .setId(Integer.toString(i)) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setSource("field", "value" + i) - .execute() - ); - } - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode)); - final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replicaNode); - assertNotNull(replicaShardRouting); - waitForReplication.countDown(); - assertBusy(() -> { - assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary()); - client().admin().indices().prepareRefresh().execute().actionGet(); - assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); - }, 1, TimeUnit.MINUTES); - int successfulDocCount = 0; - for (ActionFuture response : pendingIndexResponses) { - try { - IndexResponse indexResponse = response.actionGet(); - successfulDocCount++; - } catch (Exception e) { - logger.trace("Failed to index Doc", e); - } - } - assertTrue( - client(replicaNode).prepareSearch(INDEX_NAME) - .setPreference("_only_local") - .setSize(0) - .get() - .getHits() - .getTotalHits().value >= successfulDocCount - ); - - } - } diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 942fa783ade97..4fdf69f3bd7cd 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -780,8 +780,8 @@ static BulkItemResponse processUpdateResponse( @Override protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener listener) { ActionListener.completeWith(listener, () -> { - final Tuple tuple = performOnReplica(request, replica); - return new WriteReplicaResult<>(request, tuple.v1(), tuple.v2(), null, replica, logger); + final Translog.Location location = performOnReplica(request, replica); + return new WriteReplicaResult<>(request, location, null, replica, logger); }); } @@ -790,10 +790,8 @@ protected long replicaOperationSize(BulkShardRequest request) { return request.ramBytesUsed(); } - public static Tuple performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { Translog.Location location = null; - long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; - final boolean isSegRepEnabled = replica.indexSettings().isSegRepEnabled(); for (int i = 0; i < request.items().length; i++) { final BulkItemRequest item = request.items()[i]; final BulkItemResponse response = item.getPrimaryResponse(); @@ -815,23 +813,17 @@ public static Tuple performOnReplica(BulkShardRequest r primaryTerm, response.getFailure().getMessage() ); - if (isSegRepEnabled) { - maxSeqNo = Math.max(response.getFailure().getSeqNo(), maxSeqNo); - } } else { if (response.getResponse().getResult() == DocWriteResponse.Result.NOOP) { continue; // ignore replication as it's a noop } assert response.getResponse().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO; operationResult = performOpOnReplica(response.getResponse(), item.request(), replica); - if (isSegRepEnabled) { - maxSeqNo = Math.max(response.getResponse().getSeqNo(), maxSeqNo); - } } assert operationResult != null : "operation result must never be null when primary response has no failure"; location = syncOperationResultOrThrow(operationResult, location); } - return new Tuple<>(location, maxSeqNo); + return location; } private static Engine.Result performOpOnReplica( diff --git a/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java index 55cab0761b3f1..2249c7cace943 100644 --- a/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/resync/TransportResyncReplicationAction.java @@ -169,7 +169,7 @@ protected void dispatchedShardOperationOnReplica( ) { ActionListener.completeWith(listener, () -> { Translog.Location location = performOnReplica(request, replica); - return new WriteReplicaResult<>(request, location, null, null, replica, logger); + return new WriteReplicaResult<>(request, location, null, replica, logger); }); } diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java index 80dfb6d0c9cf9..26b15195cd8fc 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportWriteAction.java @@ -307,7 +307,7 @@ public void runPostReplicationActions(ActionListener listener) { * We call this after replication because this might wait for a refresh and that can take a while. * This way we wait for the refresh in parallel on the primary and on the replica. */ - new AsyncAfterWriteAction(primary, replicaRequest, location, null, new RespondingWriteResult() { + new AsyncAfterWriteAction(primary, replicaRequest, location, new RespondingWriteResult() { @Override public void onSuccess(boolean forcedRefresh) { finalResponseIfSuccessful.setForcedRefresh(forcedRefresh); @@ -329,23 +329,20 @@ public void onFailure(Exception ex) { * @opensearch.internal */ public static class WriteReplicaResult> extends ReplicaResult { - private final Location location; + public final Location location; private final ReplicaRequest request; private final IndexShard replica; private final Logger logger; - private final Long maxSeqNo; public WriteReplicaResult( ReplicaRequest request, - @Nullable final Translog.Location location, - @Nullable final Long maxSeqNo, + @Nullable Location location, @Nullable Exception operationFailure, IndexShard replica, Logger logger ) { super(operationFailure); this.location = location; - this.maxSeqNo = maxSeqNo; this.request = request; this.replica = replica; this.logger = logger; @@ -356,7 +353,7 @@ public void runPostReplicaActions(ActionListener listener) { if (finalFailure != null) { listener.onFailure(finalFailure); } else { - new AsyncAfterWriteAction(replica, request, location, maxSeqNo, new RespondingWriteResult() { + new AsyncAfterWriteAction(replica, request, location, new RespondingWriteResult() { @Override public void onSuccess(boolean forcedRefresh) { listener.onResponse(null); @@ -406,6 +403,7 @@ interface RespondingWriteResult { * @opensearch.internal */ static final class AsyncAfterWriteAction { + private final Location location; private final boolean waitUntilRefresh; private final boolean sync; private final AtomicInteger pendingOps = new AtomicInteger(1); @@ -416,15 +414,10 @@ static final class AsyncAfterWriteAction { private final WriteRequest request; private final Logger logger; - private final Location location; - - private final Long maxSeqNo; - AsyncAfterWriteAction( final IndexShard indexShard, final WriteRequest request, @Nullable final Translog.Location location, - @Nullable final Long maxSeqNo, final RespondingWriteResult respond, final Logger logger ) { @@ -450,7 +443,6 @@ static final class AsyncAfterWriteAction { this.waitUntilRefresh = waitUntilRefresh; this.respond = respond; this.location = location; - this.maxSeqNo = maxSeqNo; if ((sync = indexShard.getTranslogDurability() == Translog.Durability.REQUEST && location != null)) { pendingOps.incrementAndGet(); } @@ -482,7 +474,7 @@ void run() { maybeFinish(); if (waitUntilRefresh) { assert pendingOps.get() > 0; - indexShard.addRefreshListener(location, maxSeqNo, forcedRefresh -> { + indexShard.addRefreshListener(location, forcedRefresh -> { if (forcedRefresh) { logger.warn("block until refresh ran out of slots and forced a refresh: [{}]", request); } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 0ee58ab3cae28..5fd49813bb849 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -309,22 +309,11 @@ public List segments(boolean verbose) { } @Override - public void refresh(String source) throws EngineException { - maybeRefresh(source); - } + public void refresh(String source) throws EngineException {} @Override public boolean maybeRefresh(String source) throws EngineException { - try { - return readerManager.maybeRefresh(); - } catch (IOException e) { - try { - failEngine("refresh failed source[" + source + "]", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw new RefreshFailedEngineException(shardId, e); - } + return false; } @Override diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 00748acb1d76d..8fbb24720aedc 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -51,10 +51,6 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager { @Override protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException { Objects.requireNonNull(referenceToRefresh); - // checks if an actual refresh (change in segments) happened - if (unwrapStandardReader(referenceToRefresh).getSegmentInfos().version == currentInfos.version) { - return null; - } final List subs = new ArrayList<>(); final StandardDirectoryReader standardDirectoryReader = unwrapStandardReader(referenceToRefresh); for (LeafReaderContext ctx : standardDirectoryReader.leaves()) { diff --git a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java index c5beb22c6caa6..afcf5c6766194 100644 --- a/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/opensearch/index/seqno/RetentionLeaseSyncAction.java @@ -202,7 +202,7 @@ protected void dispatchedShardOperationOnReplica(Request request, IndexShard rep Objects.requireNonNull(replica); replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); replica.persistRetentionLeases(); - return new WriteReplicaResult<>(request, null, null, null, replica, getLogger()); + return new WriteReplicaResult<>(request, null, null, replica, getLogger()); }); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5f0a59f995feb..d42adc035e212 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3994,8 +3994,7 @@ private RefreshListeners buildRefreshListeners() { () -> refresh("too_many_listeners"), logger, threadPool.getThreadContext(), - externalRefreshMetric, - this::getProcessedLocalCheckpoint + externalRefreshMetric ); } @@ -4138,7 +4137,7 @@ public final void awaitShardSearchActive(Consumer listener) { markSearcherAccessed(); // move the shard into non-search idle final Translog.Location location = pendingRefreshLocation.get(); if (location != null) { - addRefreshListener(location, null, (b) -> { + addRefreshListener(location, (b) -> { pendingRefreshLocation.compareAndSet(location, null); listener.accept(true); }); @@ -4148,14 +4147,13 @@ public final void awaitShardSearchActive(Consumer listener) { } /** - * Add a listener for refreshes. Only on Segment replication enabled replica shards we listen for maxSeqNo. In all other cases we listen for translog location + * Add a listener for refreshes. * - * @param location the translog location to listen for on a refresh - * @param maxSeqNo the Sequence Number to listen for on a refresh + * @param location the location to listen for * @param listener for the refresh. Called with true if registering the listener ran it out of slots and forced a refresh. Called with * false otherwise. */ - public void addRefreshListener(Translog.Location location, Long maxSeqNo, Consumer listener) { + public void addRefreshListener(Translog.Location location, Consumer listener) { final boolean readAllowed; if (isReadAllowed()) { readAllowed = true; @@ -4168,11 +4166,7 @@ public void addRefreshListener(Translog.Location location, Long maxSeqNo, Consum } } if (readAllowed) { - if (indexSettings.isSegRepEnabled() && shardRouting.primary() == false) { - refreshListeners.addOrNotify(maxSeqNo, listener); - } else { - refreshListeners.addOrNotify(location, listener); - } + refreshListeners.addOrNotify(location, listener); } else { // we're not yet ready fo ready for reads, just ignore refresh cycles listener.accept(false); diff --git a/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java b/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java index b47f08f7b10d0..7dbbcbb2d7d20 100644 --- a/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java +++ b/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java @@ -39,7 +39,6 @@ import org.opensearch.common.metrics.MeanMetric; import org.opensearch.common.util.concurrent.RunOnce; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.Translog; import java.io.Closeable; @@ -48,7 +47,6 @@ import java.util.List; import java.util.function.Consumer; import java.util.function.IntSupplier; -import java.util.function.LongSupplier; import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -68,8 +66,6 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener, private final ThreadContext threadContext; private final MeanMetric refreshMetric; - private final LongSupplier getProcessedCheckpoint; - /** * Time in nanosecond when beforeRefresh() is called. Used for calculating refresh metrics. */ @@ -94,38 +90,29 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener, * We never set this to non-null while closed it {@code true}. */ private volatile List>> refreshListeners = null; - - private volatile List>> seqNoRefreshListeners = null; /** * The translog location that was last made visible by a refresh. */ private volatile Translog.Location lastRefreshedLocation; - private volatile Long lastMaxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; - public RefreshListeners( final IntSupplier getMaxRefreshListeners, final Runnable forceRefresh, final Logger logger, final ThreadContext threadContext, - final MeanMetric refreshMetric, - final LongSupplier getProcessedLocalCheckpoint + final MeanMetric refreshMetric ) { this.getMaxRefreshListeners = getMaxRefreshListeners; this.forceRefresh = forceRefresh; this.logger = logger; this.threadContext = threadContext; this.refreshMetric = refreshMetric; - this.getProcessedCheckpoint = getProcessedLocalCheckpoint; } /** * Force-refreshes newly added listeners and forces a refresh if there are currently listeners registered. See {@link #refreshForcers}. */ public Releasable forceRefreshes() { - if (seqNoRefreshListeners != null) { - releaseSeqNoRefreshListeners(); - } synchronized (this) { assert refreshForcers >= 0; refreshForcers += 1; @@ -148,17 +135,6 @@ public Releasable forceRefreshes() { return () -> runOnce.run(); } - // Release refresh listeners on replica shard with segment replication enabled - private void releaseSeqNoRefreshListeners() { - List>> oldSeqNoListeners; - synchronized (this) { - oldSeqNoListeners = seqNoRefreshListeners; - seqNoRefreshListeners = null; - } - // Fire any listeners we might have had - fireSeqNoListeners(oldSeqNoListeners); - } - /** * Add a listener for refreshes, calling it immediately if the location is already visible. If this runs out of listener slots then it * forces a refresh and calls the listener immediately as well. @@ -184,7 +160,13 @@ public boolean addOrNotify(Translog.Location location, Consumer listene List>> listeners = refreshListeners; final int maxRefreshes = getMaxRefreshListeners.getAsInt(); if (refreshForcers == 0 && maxRefreshes > 0 && (listeners == null || listeners.size() < maxRefreshes)) { - Consumer contextPreservingListener = getContextListener(listener); + ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true); + Consumer contextPreservingListener = forced -> { + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + storedContext.restore(); + listener.accept(forced); + } + }; if (listeners == null) { listeners = new ArrayList<>(); } @@ -200,69 +182,16 @@ public boolean addOrNotify(Translog.Location location, Consumer listene return true; } - /** - * Add a listener for refreshes, calling it immediately if the max sequence number is already visible. If this runs out of listener slots then it - * forces a refresh and calls the listener immediately as well. - * - * @param maxSeqNo the Sequence number to listen on segment replication enabled replica shards - * @param listener for the refresh.It waits until max sequence number is visible. Called with true if registering the listener ran it out of slots and forced a refresh. Called with - * false otherwise. - * @return did we call the listener (true) or register the listener to call later (false)? - */ - public boolean addOrNotify(Long maxSeqNo, Consumer listener) { - requireNonNull(listener, "listener cannot be null"); - requireNonNull(maxSeqNo, "maxSeqNo cannot be null"); - - if (lastMaxSeqNo != SequenceNumbers.NO_OPS_PERFORMED && lastMaxSeqNo >= maxSeqNo) { - listener.accept(false); - return true; - } - synchronized (this) { - if (closed) { - throw new IllegalStateException("can't wait for refresh on a closed index"); - } - List>> listeners = seqNoRefreshListeners; - final int maxRefreshes = getMaxRefreshListeners.getAsInt(); - if (refreshForcers == 0 && maxRefreshes > 0 && (listeners == null || listeners.size() < maxRefreshes)) { - Consumer contextPreservingListener = getContextListener(listener); - if (listeners == null) { - listeners = new ArrayList<>(); - } - listeners.add(new Tuple<>(maxSeqNo, contextPreservingListener)); - seqNoRefreshListeners = listeners; - return false; - } - } - // No free slot so release existing refresh listeners and call the listener in this thread - releaseSeqNoRefreshListeners(); - listener.accept(true); - return true; - } - - private Consumer getContextListener(Consumer listener) { - ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true); - return forced -> { - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - storedContext.restore(); - listener.accept(forced); - } - }; - } - @Override public void close() throws IOException { List>> oldListeners; - List>> oldSeqNoListeners; synchronized (this) { oldListeners = refreshListeners; - oldSeqNoListeners = seqNoRefreshListeners; refreshListeners = null; - seqNoRefreshListeners = null; closed = true; } // Fire any listeners we might have had fireListeners(oldListeners); - fireSeqNoListeners(oldSeqNoListeners); } /** @@ -270,9 +199,6 @@ public void close() throws IOException { */ public boolean refreshNeeded() { // A null list doesn't need a refresh. If we're closed we don't need a refresh either. - if (seqNoRefreshListeners != null) { - return false == closed; - } return refreshListeners != null && false == closed; } @@ -282,12 +208,8 @@ public boolean refreshNeeded() { public int pendingCount() { // No need to synchronize here because we're doing a single volatile read List>> listeners = refreshListeners; - List>> seqNoListeners = seqNoRefreshListeners; - int count = 0; // A null list means we haven't accumulated any listeners. Otherwise we need the size. - count += listeners == null ? 0 : listeners.size(); - count += seqNoListeners == null ? 0 : seqNoListeners.size(); - return count; + return listeners == null ? 0 : listeners.size(); } /** @@ -331,73 +253,9 @@ public void afterRefresh(boolean didRefresh) throws IOException { * assignment into the synchronized block below and double checking lastRefreshedLocation in addOrNotify's synchronized block but * that doesn't seem worth it given that we already skip this process early if there aren't any listeners to iterate. */ lastRefreshedLocation = currentRefreshLocation; - - // Checks if there are any sequence number based refresh listeners that can be fired or preserved. - checkSeqNoRefreshListeners(); - - // Checks if there are any translog location based refresh listeners that can be fired or preserved. - checkRefreshListeners(); - } - - private void checkSeqNoRefreshListeners() { - /* Grab the current Sequence Number refresh listeners and replace them with null while synchronized. Any listeners that come in after this won't be - * in the list we iterate over and very likely won't be candidates for refresh anyway because we've already moved the - * lastRefreshedLocation. */ - - List>> candidates; - synchronized (this) { - candidates = seqNoRefreshListeners; - // No listeners to check so just bail early - if (candidates == null) { - return; - } - seqNoRefreshListeners = null; - } - // Iterate the list of listeners, copying the listeners to fire to one list and those to preserve to another list. - List>> listenersToFire = null; - List>> preservedListeners = null; - - for (Tuple> tuple : candidates) { - Long seqNo = tuple.v1(); - if (seqNo <= getProcessedCheckpoint.getAsLong()) { - if (listenersToFire == null) { - listenersToFire = new ArrayList<>(); - } - listenersToFire.add(tuple); - } else { - if (preservedListeners == null) { - preservedListeners = new ArrayList<>(); - } - preservedListeners.add(tuple); - } - } - /* Now deal with the listeners that it isn't time yet to fire. We need to do this under lock so we don't miss a concurrent close or - * newly registered listener. If we're not closed we just add the listeners to the list of listeners we check next time. If we are - * closed we fire the listeners even though it isn't time for them. */ - if (preservedListeners != null) { - synchronized (this) { - if (seqNoRefreshListeners == null) { - if (closed) { - listenersToFire.addAll(preservedListeners); - } else { - seqNoRefreshListeners = preservedListeners; - } - } else { - assert closed == false : "Can't be closed and have non-null refreshListeners"; - seqNoRefreshListeners.addAll(preservedListeners); - } - } - } - // Lastly, fire the listeners that are ready - fireSeqNoListeners(listenersToFire); - } - - private void checkRefreshListeners() { - /* Grab the current refresh listeners and replace them with null while synchronized. Any listeners that come in after this won't be * in the list we iterate over and very likely won't be candidates for refresh anyway because we've already moved the * lastRefreshedLocation. */ - List>> candidates; synchronized (this) { candidates = refreshListeners; @@ -410,7 +268,6 @@ private void checkRefreshListeners() { // Iterate the list of listeners, copying the listeners to fire to one list and those to preserve to another list. List>> listenersToFire = null; List>> preservedListeners = null; - for (Tuple> tuple : candidates) { Translog.Location location = tuple.v1(); if (location.compareTo(currentRefreshLocation) <= 0) { @@ -460,19 +317,4 @@ private void fireListeners(final List } } } - - /** - * Fire Sequence number based replica shard listeners. Does nothing if the list of listeners is null. - */ - private void fireSeqNoListeners(final List>> listenersToFire) { - if (listenersToFire != null) { - for (final Tuple> listener : listenersToFire) { - try { - listener.v2().accept(false); - } catch (final Exception e) { - logger.warn("error firing refresh listener", e); - } - } - } - } } diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java index 5255cb72253b0..3aae1ee40818d 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionForIndexingPressureTests.java @@ -423,7 +423,7 @@ protected void dispatchedShardOperationOnPrimary( @Override protected void dispatchedShardOperationOnReplica(TestRequest request, IndexShard replica, ActionListener listener) { - ActionListener.completeWith(listener, () -> new WriteReplicaResult<>(request, location, null, null, replica, logger)); + ActionListener.completeWith(listener, () -> new WriteReplicaResult<>(request, location, null, replica, logger)); } } diff --git a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java index 2e27616399d02..137aca4966936 100644 --- a/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/replication/TransportWriteActionTests.java @@ -161,7 +161,7 @@ public void testPrimaryNoRefreshCall() throws Exception { assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard, never()).refresh(any()); - verify(indexShard, never()).addRefreshListener(any(), any(), any()); + verify(indexShard, never()).addRefreshListener(any(), any()); })); } @@ -177,7 +177,7 @@ public void testReplicaNoRefreshCall() throws Exception { assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard, never()).refresh(any()); - verify(indexShard, never()).addRefreshListener(any(), any(), any()); + verify(indexShard, never()).addRefreshListener(any(), any()); } public void testPrimaryImmediateRefresh() throws Exception { @@ -191,7 +191,7 @@ public void testPrimaryImmediateRefresh() throws Exception { assertNull(listener.failure); assertTrue(listener.response.forcedRefresh); verify(indexShard).refresh("refresh_flag_index"); - verify(indexShard, never()).addRefreshListener(any(), any(), any()); + verify(indexShard, never()).addRefreshListener(any(), any()); })); } @@ -207,7 +207,7 @@ public void testReplicaImmediateRefresh() throws Exception { assertNotNull(listener.response); assertNull(listener.failure); verify(indexShard).refresh("refresh_flag_index"); - verify(indexShard, never()).addRefreshListener(any(), any(), any()); + verify(indexShard, never()).addRefreshListener(any(), any()); } public void testPrimaryWaitForRefresh() throws Exception { @@ -223,7 +223,7 @@ public void testPrimaryWaitForRefresh() throws Exception { @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); verify(indexShard, never()).refresh(any()); - verify(indexShard).addRefreshListener(any(), any(), refreshListener.capture()); + verify(indexShard).addRefreshListener(any(), refreshListener.capture()); // Now we can fire the listener manually and we'll get a response boolean forcedRefresh = randomBoolean(); @@ -247,7 +247,7 @@ public void testReplicaWaitForRefresh() throws Exception { @SuppressWarnings({ "unchecked", "rawtypes" }) ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); verify(indexShard, never()).refresh(any()); - verify(indexShard).addRefreshListener(any(), any(), refreshListener.capture()); + verify(indexShard).addRefreshListener(any(), refreshListener.capture()); // Now we can fire the listener manually and we'll get a response boolean forcedRefresh = randomBoolean(); @@ -531,9 +531,9 @@ protected void dispatchedShardOperationOnReplica(TestRequest request, IndexShard ActionListener.completeWith(listener, () -> { final WriteReplicaResult replicaResult; if (withDocumentFailureOnReplica) { - replicaResult = new WriteReplicaResult<>(request, null, null, new RuntimeException("simulated"), replica, logger); + replicaResult = new WriteReplicaResult<>(request, null, new RuntimeException("simulated"), replica, logger); } else { - replicaResult = new WriteReplicaResult<>(request, location, null, null, replica, logger); + replicaResult = new WriteReplicaResult<>(request, location, null, replica, logger); } return replicaResult; }); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 2d116c02d8c50..c84d03a8454d9 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -3266,7 +3266,7 @@ public void testRefreshListenersDuringPeerRecovery() throws IOException { indexDoc(primary, "_doc", "0", "{\"foo\" : \"bar\"}"); Consumer assertListenerCalled = shard -> { AtomicBoolean called = new AtomicBoolean(); - shard.addRefreshListener(null, null, b -> { + shard.addRefreshListener(null, b -> { assertFalse(b); called.set(true); }); @@ -4179,7 +4179,7 @@ public void testRefreshIsNeededWithRefreshListeners() throws IOException, Interr assertTrue(primary.scheduledRefresh()); Engine.IndexResult doc = indexDoc(primary, "_doc", "1", "{\"foo\" : \"bar\"}"); CountDownLatch latch = new CountDownLatch(1); - primary.addRefreshListener(doc.getTranslogLocation(), null, r -> latch.countDown()); + primary.addRefreshListener(doc.getTranslogLocation(), r -> latch.countDown()); assertEquals(1, latch.getCount()); assertTrue(primary.getEngine().refreshNeeded()); assertTrue(primary.scheduledRefresh()); @@ -4191,7 +4191,7 @@ public void testRefreshIsNeededWithRefreshListeners() throws IOException, Interr doc = indexDoc(primary, "_doc", "2", "{\"foo\" : \"bar\"}"); CountDownLatch latch1 = new CountDownLatch(1); - primary.addRefreshListener(doc.getTranslogLocation(), null, r -> latch1.countDown()); + primary.addRefreshListener(doc.getTranslogLocation(), r -> latch1.countDown()); assertEquals(1, latch1.getCount()); assertTrue(primary.getEngine().refreshNeeded()); assertTrue(primary.scheduledRefresh()); diff --git a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java index 34bcfd44faf07..dbb0207b4f1b7 100644 --- a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java @@ -118,8 +118,7 @@ public void setupListeners() throws Exception { () -> engine.refresh("too-many-listeners"), logger, threadPool.getThreadContext(), - refreshMetric, - () -> 10L + refreshMetric ); IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); @@ -211,46 +210,6 @@ public void testAfterRefresh() throws Exception { assertEquals(0, listeners.pendingCount()); } - public void testSeqNoRefreshListenersReleasedOnForceRefresh() throws IOException { - assertEquals(0, listeners.pendingCount()); - Engine.IndexResult indexOne = index("1"); - Engine.IndexResult indexTwo = index("2"); - Engine.IndexResult indexThree = index("3"); - DummyRefreshListener listenerOne = new DummyRefreshListener(); - DummyRefreshListener listenerTwo = new DummyRefreshListener(); - DummyRefreshListener listenerThree = new DummyRefreshListener(); - - // Add two listeners - listeners.addOrNotify(indexOne.getSeqNo(), listenerOne); - listeners.addOrNotify(indexTwo.getSeqNo(), listenerTwo); - - // verify that two listeners are added successfully - assertEquals(2, listeners.pendingCount()); - - // now Force refresh which would fire all seqNO listeners and block addition of any new seqNo Listeners. - listeners.forceRefreshes(); - - // Add new listener and verify this listener is not added. - listeners.addOrNotify(indexThree.getSeqNo(), listenerThree); - assertEquals(0, listeners.pendingCount()); - - } - - public void testSeqNoRefreshListener() throws IOException { - assertEquals(0, listeners.pendingCount()); - Engine.IndexResult index = index("1"); - DummyRefreshListener listener = new DummyRefreshListener(); - - // Add new listener - listeners.addOrNotify(index.getSeqNo(), listener); - assertEquals(1, listeners.pendingCount()); - - // Refresh and verify that listener is fired. - engine.refresh("test"); - assertEquals(0, listeners.pendingCount()); - - } - public void testContextIsPreserved() throws IOException, InterruptedException { assertEquals(0, listeners.pendingCount()); Engine.IndexResult index = index("1"); diff --git a/test/framework/src/main/java/org/opensearch/action/support/replication/TransportWriteActionTestHelper.java b/test/framework/src/main/java/org/opensearch/action/support/replication/TransportWriteActionTestHelper.java index c01460982a3b1..8fb95726215f3 100644 --- a/test/framework/src/main/java/org/opensearch/action/support/replication/TransportWriteActionTestHelper.java +++ b/test/framework/src/main/java/org/opensearch/action/support/replication/TransportWriteActionTestHelper.java @@ -59,7 +59,7 @@ public void onFailure(Exception ex) { throw new AssertionError(ex); } }; - new TransportWriteAction.AsyncAfterWriteAction(indexShard, request, location, null, writerResult, logger).run(); + new TransportWriteAction.AsyncAfterWriteAction(indexShard, request, location, writerResult, logger).run(); try { latch.await(); } catch (InterruptedException e) { diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index 9e0d28ca4b819..8df57ccad85cc 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -73,7 +73,6 @@ import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.common.collect.Iterators; -import org.opensearch.common.collect.Tuple; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.lease.Releasable; @@ -1022,11 +1021,10 @@ private void executeShardBulkOnReplica( request ); final Translog.Location location; - final Tuple tuple; try (Releasable ignored = permitAcquiredFuture.actionGet()) { - tuple = TransportShardBulkAction.performOnReplica(request, replica); + location = TransportShardBulkAction.performOnReplica(request, replica); } - TransportWriteActionTestHelper.performPostWriteActions(replica, request, tuple.v1(), logger); + TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger); } /**