Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush instead of synced-flush inactive shards #49126

Merged
merged 7 commits into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,6 @@ public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSha
}
}

@Override
public void onShardInactive(IndexShard indexShard) {
for (IndexEventListener listener : listeners) {
try {
listener.onShardInactive(indexShard);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to invoke on shard inactive callback",
indexShard.shardId().getId()), e);
throw e;
}
}
}

@Override
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,6 @@ default void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh
default void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState,
IndexShardState currentState, @Nullable String reason) {}

/**
* Called when a shard is marked as inactive
*
* @param indexShard The shard that was marked inactive
*/
default void onShardInactive(IndexShard indexShard) {}

/**
* Called before the index gets created. Note that this is also called
* when the index is created on data nodes
Expand Down
25 changes: 17 additions & 8 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1578,7 +1578,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
// we can flush if we go idle after some time and become inactive.
active.set(true);
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
Expand Down Expand Up @@ -1758,19 +1758,28 @@ public void addShardFailureCallback(Consumer<ShardFailure> onShardFailure) {

/**
* Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
* indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen.
* indexing operation, so we can flush the index.
*/
public void checkIdle(long inactiveTimeNS) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("shard is now inactive");
try {
indexEventListener.onShardInactive(this);
} catch (Exception e) {
logger.warn("failed to notify index event listener", e);
}
logger.debug("flushing shard on inactive");
threadPool.executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush shard on inactive", e);
}
}

@Override
protected void doRun() {
periodicFlushMetric.inc();
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
flush(new FlushRequest().waitIfOngoing(false).force(false));
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ private void runUnlocked() {
long totalBytesWriting = 0;
for (IndexShard shard : availableShards()) {

// Give shard a chance to transition to inactive so sync'd flush can happen:
// Give shard a chance to transition to inactive so we can flush
checkIdle(shard, inactiveTime.nanos());

// How many bytes this shard is currently (async'd) moving from heap to disk:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,7 @@ public IndicesClusterStateService(
final PrimaryReplicaSyncer primaryReplicaSyncer,
final NodeClient client) {
this.settings = settings;
this.buildInIndexListener =
Arrays.asList(
peerRecoverySourceService,
recoveryTargetService,
searchService,
syncedFlushService,
snapshotShardsService);
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, snapshotShardsService);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
Expand All @@ -71,7 +70,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

public class SyncedFlushService implements IndexEventListener {
public class SyncedFlushService {

private static final Logger logger = LogManager.getLogger(SyncedFlushService.class);

Expand Down Expand Up @@ -101,25 +100,6 @@ public SyncedFlushService(IndicesService indicesService,
new InFlightOpCountTransportHandler());
}

@Override
public void onShardInactive(final IndexShard indexShard) {
// we only want to call sync flush once, so only trigger it when we are on a primary
if (indexShard.routingEntry().primary()) {
attemptSyncedFlush(indexShard.shardId(), new ActionListener<ShardsSyncedFlushResult>() {
@Override
public void onResponse(ShardsSyncedFlushResult syncedFlushResult) {
logger.trace("{} sync flush on inactive shard returned successfully for sync_id: {}",
syncedFlushResult.getShardId(), syncedFlushResult.syncId());
}

@Override
public void onFailure(Exception e) {
logger.debug(() -> new ParameterizedMessage("{} sync flush on inactive shard failed", indexShard.shardId()), e);
}
});
}
}

/**
* a utility method to perform a synced flush for all shards of multiple indices.
* see {@link #attemptSyncedFlush(ShardId, ActionListener)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
Expand Down Expand Up @@ -167,23 +166,6 @@ public void testLockTryingToDelete() throws Exception {
}
}

public void testMarkAsInactiveTriggersSyncedFlush() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)));
client().prepareIndex("test").setSource("{}", XContentType.JSON).get();
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
assertBusy(() -> {
IndexStats indexStats = client().admin().indices().prepareStats("test").clear().get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
}
);
IndexStats indexStats = client().admin().indices().prepareStats("test").get().getIndex("test");
assertNotNull(indexStats.getShards()[0].getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
}

public void testDurableFlagHasEffect() throws Exception {
createIndex("test");
ensureGreen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3630,48 +3630,23 @@ public void testSegmentMemoryTrackedWithRandomSearchers() throws Exception {
}

public void testFlushOnInactive() throws Exception {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
.settings(settings)
.primaryTerm(0, 1).build();
ShardRouting shardRouting =
TestShardRouting.newShardRouting(new ShardId(metaData.getIndex(), 0), "n1", true,
ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
final ShardId shardId = shardRouting.shardId();
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
AtomicBoolean markedInactive = new AtomicBoolean();
AtomicReference<IndexShard> primaryRef = new AtomicReference<>();
IndexShard primary = newShard(shardRouting, shardPath, metaData, null, null, new InternalEngineFactory(), () -> { },
RetentionLeaseSyncer.EMPTY, new IndexEventListener() {
@Override
public void onShardInactive(IndexShard indexShard) {
markedInactive.set(true);
primaryRef.get().flush(new FlushRequest());
}
});
primaryRef.set(primary);
recoverShardFromStore(primary);
IndexShard shard = newStartedShard();
for (int i = 0; i < 3; i++) {
indexDoc(primary, "_doc", "" + i, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}");
primary.refresh("test"); // produce segments
indexDoc(shard, "_doc", Integer.toString(i));
shard.refresh("test"); // produce segments
}
List<Segment> segments = primary.segments(false);
List<Segment> segments = shard.segments(false);
Set<String> names = new HashSet<>();
for (Segment segment : segments) {
assertFalse(segment.committed);
assertTrue(segment.search);
names.add(segment.getName());
}
assertEquals(3, segments.size());
primary.flush(new FlushRequest());
primary.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(false));
primary.refresh("test");
segments = primary.segments(false);
shard.flush(new FlushRequest());
shard.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(false));
shard.refresh("test");
segments = shard.segments(false);
for (Segment segment : segments) {
if (names.contains(segment.getName())) {
assertTrue(segment.committed);
Expand All @@ -3683,20 +3658,18 @@ public void onShardInactive(IndexShard indexShard) {
}
assertEquals(4, segments.size());

assertFalse(markedInactive.get());
assertBusy(() -> {
primary.checkIdle(0);
assertFalse(primary.isActive());
});
shard.checkIdle(0);
assertFalse(shard.isActive());

assertTrue(markedInactive.get());
segments = primary.segments(false);
assertEquals(1, segments.size());
for (Segment segment : segments) {
assertTrue(segment.committed);
assertTrue(segment.search);
}
closeShards(primary);
assertBusy(() -> { // flush happens in the background using the flush threadpool
List<Segment> segmentsAfterFlush = shard.segments(false);
assertEquals(1, segmentsAfterFlush.size());
for (Segment segment : segmentsAfterFlush) {
assertTrue(segment.committed);
assertTrue(segment.search);
}
});
closeShards(shard);
}

public void testOnCloseStats() throws IOException {
Expand Down
38 changes: 38 additions & 0 deletions server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
Expand All @@ -47,15 +48,22 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand All @@ -71,6 +79,11 @@
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class FlushIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(InternalSettingsPlugin.class);
}

public void testWaitIfOngoing() throws InterruptedException {
createIndex("test");
ensureGreen("test");
Expand Down Expand Up @@ -369,4 +382,29 @@ public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
}

public void testFlushOnInactive() throws Exception {
final String indexName = "flush_on_inactive";
List<String> dataNodes = internalCluster().startDataOnlyNodes(2, Settings.builder()
.put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), randomTimeValue(2, 5, "s")).build());
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), randomTimeValue(200, 500, "ms"))
.put("index.routing.allocation.include._name", String.join(",", dataNodes))
.build()));
ensureGreen(indexName);
int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex(indexName).setSource("f", "v").get();
}
if (randomBoolean()) {
internalCluster().restartNode(randomFrom(dataNodes), new InternalTestCluster.RestartCallback());
ensureGreen(indexName);
}
assertBusy(() -> {
for (ShardStats shardStats : client().admin().indices().prepareStats(indexName).get().getShards()) {
assertThat(shardStats.getStats().getTranslog().getUncommittedOperations(), equalTo(0));
}
}, 30, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1978,9 +1978,13 @@ public List<String> startMasterOnlyNodes(int numNodes, Settings settings) {
}

public List<String> startDataOnlyNodes(int numNodes) {
return startDataOnlyNodes(numNodes, Settings.EMPTY);
}

public List<String> startDataOnlyNodes(int numNodes, Settings settings) {
return startNodes(
numNodes,
Settings.builder().put(Settings.EMPTY).put(Node.NODE_MASTER_SETTING.getKey(), false)
Settings.builder().put(settings).put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(Node.NODE_DATA_SETTING.getKey(), true).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,6 @@ public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardSt
delegate.indexShardStateChanged(indexShard, previousState, currentState, reason);
}

@Override
public void onShardInactive(IndexShard indexShard) {
delegate.onShardInactive(indexShard);
}

@Override
public void beforeIndexCreated(Index index, Settings indexSettings) {
delegate.beforeIndexCreated(index, indexSettings);
Expand Down