TEST: Retry synced-flush if ongoing ops on primary (#30978)
When the last indexing operation is completed, we will fire a global
checkpoint sync. Since a global checkpoint sync request is a replication
request, it will acquire an index shard permit on the primary when
executing. If this happens at the same time while we are issuing the
synced-flush, the synced-flush request will fail as it thinks there are
in-flight operations. We can avoid such a situation by retrying the
synced-flush if the current request fails due to ongoing operations on
the primary.

Closes #29392
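
The fix wraps the synced-flush attempt in ESTestCase.assertBusy so that a flush rejected because of in-flight operations is simply tried again. Reduced to a minimal, self-contained sketch of that retry-on-busy pattern (FlushResult and flushWithRetry are hypothetical stand-ins for ShardsSyncedFlushResult and the new SyncedFlushUtil helper shown in the diff below):

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class RetryOnBusySketch {

    /** Hypothetical stand-in for ShardsSyncedFlushResult. */
    static final class FlushResult {
        final String failureReason; // null when the seal succeeded
        FlushResult(String failureReason) {
            this.failureReason = failureReason;
        }
    }

    /**
     * Re-issue the flush while it fails because of in-flight operations on the
     * primary, mirroring the assertBusy loop added to SyncedFlushUtil below.
     */
    static FlushResult flushWithRetry(Supplier<FlushResult> attemptFlush, long timeoutMillis)
            throws InterruptedException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            FlushResult result = attemptFlush.get();
            boolean busy = result.failureReason != null
                && result.failureReason.contains("ongoing operations on primary");
            if (busy == false || System.nanoTime() >= deadline) {
                return result; // success, a different failure, or out of time
            }
            Thread.sleep(50); // brief pause before the next attempt
        }
    }
}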
dnhatn committed Jun 10, 2018
1 parent c52cf4b commit 5e3ae47
Showing 2 changed files with 26 additions and 28 deletions.
FlushIT.java

@@ -46,15 +46,13 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -102,7 +100,7 @@ public void onFailure(Exception e) {
         }
     }
 
-    public void testSyncedFlush() throws ExecutionException, InterruptedException, IOException {
+    public void testSyncedFlush() throws Exception {
         internalCluster().ensureAtLeastNumDataNodes(2);
         prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)).get();
         ensureGreen();
@@ -245,13 +243,6 @@ private void indexDoc(Engine engine, String id) throws IOException {
         assertThat(indexResult.getFailure(), nullValue());
     }
 
-    private String syncedFlushDescription(ShardsSyncedFlushResult result) {
-        return result.shardResponses().entrySet().stream()
-            .map(e -> "Shard [" + e.getKey() + "], result [" + e.getValue() + "]")
-            .collect(Collectors.joining(","));
-    }
-
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
     public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
         internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
         final int numberOfReplicas = internalCluster().numDataNodes() - 1;
@@ -277,7 +268,6 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
             indexDoc(IndexShardTestCase.getEngine(outOfSyncReplica), "extra_" + i);
         }
         final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
-        logger.info("Partial seal: {}", syncedFlushDescription(partialResult));
         assertThat(partialResult.totalShards(), equalTo(numberOfReplicas + 1));
         assertThat(partialResult.successfulShards(), equalTo(numberOfReplicas));
         assertThat(partialResult.shardResponses().get(outOfSyncReplica.routingEntry()).failureReason, equalTo(
@@ -293,8 +283,6 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
         assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
     }
 
-    @TestLogging("_root:DEBUG")
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
     public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
         internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
         final int numberOfReplicas = internalCluster().numDataNodes() - 1;
@@ -311,11 +299,9 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
logger.info("First seal: {}", syncedFlushDescription(firstSeal));
assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
// Do not renew synced-flush
final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
logger.info("Second seal: {}", syncedFlushDescription(secondSeal));
assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
// Shards were updated, renew synced flush.
@@ -324,7 +310,6 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
logger.info("Third seal: {}", syncedFlushDescription(thirdSeal));
assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
// Manually remove or change sync-id, renew synced flush.
@@ -340,7 +325,6 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
             assertThat(shard.commitStats().syncId(), nullValue());
         }
         final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(internalCluster(), shardId);
-        logger.info("Forth seal: {}", syncedFlushDescription(forthSeal));
         assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
         assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
     }
SyncedFlushUtil.java

@@ -22,13 +22,15 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.InternalTestCluster;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static org.elasticsearch.test.ESTestCase.assertBusy;
 
 /** Utils for SyncedFlush */
 public class SyncedFlushUtil {
@@ -40,19 +42,31 @@ private SyncedFlushUtil() {
     /**
      * Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)}
      */
-    public static ShardsSyncedFlushResult attemptSyncedFlush(InternalTestCluster cluster, ShardId shardId) {
+    public static ShardsSyncedFlushResult attemptSyncedFlush(InternalTestCluster cluster, ShardId shardId) throws Exception {
+        /*
+         * When the last indexing operation is completed, we will fire a global checkpoint sync.
+         * Since a global checkpoint sync request is a replication request, it will acquire an index
+         * shard permit on the primary when executing. If this happens at the same time while we are
+         * issuing the synced-flush, the synced-flush request will fail as it thinks there are
+         * in-flight operations. We can avoid such a situation by continuing to issue another
+         * synced-flush if the synced-flush failed due to ongoing operations on the primary.
+         */
         SyncedFlushService service = cluster.getInstance(SyncedFlushService.class);
-        LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener();
-        service.attemptSyncedFlush(shardId, listener);
-        try {
+        AtomicReference<LatchedListener<ShardsSyncedFlushResult>> listenerHolder = new AtomicReference<>();
+        assertBusy(() -> {
+            LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener<>();
+            listenerHolder.set(listener);
+            service.attemptSyncedFlush(shardId, listener);
             listener.latch.await();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        if (listener.error != null) {
-            throw ExceptionsHelper.convertToElastic(listener.error);
-        }
-        return listener.result;
+            if (listener.result != null && listener.result.failureReason() != null
+                && listener.result.failureReason().contains("ongoing operations on primary")) {
+                throw new AssertionError(listener.result.failureReason()); // cause the assertBusy to retry
+            }
+        });
+        if (listenerHolder.get().error != null) {
+            throw ExceptionsHelper.convertToElastic(listenerHolder.get().error);
+        }
+        return listenerHolder.get().result;
     }

     public static final class LatchedListener<T> implements ActionListener<T> {
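
Why the retry throws AssertionError: ESTestCase.assertBusy retries its block only when the block fails with an AssertionError, sleeping with increasing backoff until a timeout (10 seconds by default) elapses, after which the last failure is rethrown; other exceptions propagate immediately. A simplified sketch of that contract as assumed above (the real implementation lives in org.elasticsearch.test.ESTestCase and takes a CheckedRunnable):

final class AssertBusySketch {
    static void assertBusy(Runnable codeBlock, long timeoutMillis) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        long sleep = 1;
        while (true) {
            try {
                codeBlock.run();
                return; // the block passed; stop retrying
            } catch (AssertionError e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e; // out of time: surface the last failure
                }
                Thread.sleep(sleep);
                sleep = Math.min(sleep * 2, 500); // back off between attempts
            }
        }
    }
}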
