More Efficient Ordering of Shard Upload Execution #42791

Merged
Changes from all commits (40 commits):
5036455 startt (original-brownbear, Jun 2, 2019)
019682d simpler (original-brownbear, Jun 2, 2019)
3529f36 start (original-brownbear, Jun 2, 2019)
d285c3e Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Jun 3, 2019)
cdd132e fixed tests (original-brownbear, Jun 3, 2019)
2cbefd6 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Jun 3, 2019)
7b5c325 snapshot pool for setting up new shards is fine after all (original-brownbear, Jun 3, 2019)
f25c588 drier (original-brownbear, Jun 3, 2019)
d222726 drier (original-brownbear, Jun 3, 2019)
01ce0ac drier (original-brownbear, Jun 3, 2019)
a43d2dc drier (original-brownbear, Jun 3, 2019)
7fc5460 simpler (original-brownbear, Jun 3, 2019)
8c64b96 safer looking (original-brownbear, Jun 3, 2019)
84586f1 some docs (original-brownbear, Jun 3, 2019)
c89c851 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Jun 6, 2019)
7620af5 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Jun 22, 2019)
e357b1d Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Jun 24, 2019)
396b3ae finished merge (original-brownbear, Jun 24, 2019)
2099b11 bcl (original-brownbear, Jun 24, 2019)
dcf374f simplify (original-brownbear, Jun 24, 2019)
5b480c8 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Jun 24, 2019)
fb0b81f simpler (original-brownbear, Jun 24, 2019)
a852af2 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Jul 5, 2019)
96efa64 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Jul 5, 2019)
f31b503 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Jul 5, 2019)
c2b6467 shorter diff (original-brownbear, Jul 5, 2019)
402bb50 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Aug 8, 2019)
5136603 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Aug 21, 2019)
269e542 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Aug 22, 2019)
b6dbf49 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Aug 26, 2019)
27a0729 CR: move try-catch into snapshot method (original-brownbear, Aug 26, 2019)
07389e7 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Aug 26, 2019)
61a4fa2 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Aug 27, 2019)
7a7864e simplify signature (original-brownbear, Aug 27, 2019)
7a3e83e nicer looking (original-brownbear, Aug 28, 2019)
3604ef5 Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Aug 28, 2019)
e10d8c0 fix noisy empty line (original-brownbear, Aug 28, 2019)
8c53eb0 stop being clever aobut group action listener (original-brownbear, Aug 28, 2019)
d22a98b Merge remote-tracking branch 'elastic/master' into smarter-ordering-s… (original-brownbear, Aug 28, 2019)
a328127 CR: step listeners, test, comment,typo (original-brownbear, Aug 28, 2019)
32 changes: 32 additions & 0 deletions server/src/main/java/org/elasticsearch/action/ActionListener.java
@@ -22,6 +22,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.CheckedSupplier;

import java.util.ArrayList;
@@ -226,6 +227,37 @@ public void onFailure(Exception e) {
};
}

/**
* Wraps a given listener and returns a new listener which executes the provided {@code runBefore}
* callback before the wrapped listener is notified via either {@code #onResponse} or {@code #onFailure}.
* If the callback throws an exception on the response path, that exception is passed to the wrapped
* listener's {@code #onFailure} and its {@code #onResponse} is not executed; if it throws on the
* failure path, the exception is added as suppressed to the original failure.
*/
static <Response> ActionListener<Response> runBefore(ActionListener<Response> delegate, CheckedRunnable<?> runBefore) {
Review comment (Member): Can you add a test for this?

Reply (Member, Author): Sure, I just added the same one we have for the after hook here as well :)
return new ActionListener<>() {
@Override
public void onResponse(Response response) {
try {
runBefore.run();
} catch (Exception ex) {
delegate.onFailure(ex);
return;
}
delegate.onResponse(response);
}

@Override
public void onFailure(Exception e) {
try {
runBefore.run();
} catch (Exception ex) {
e.addSuppressed(ex);
}
delegate.onFailure(e);
}
};
}
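The before-hook semantics above can be exercised outside Elasticsearch. The following is a minimal, self-contained sketch using a stripped-down stand-in for the real `ActionListener` interface (the `Listener` and `CheckedRunnable` types here are simplifications, not the actual Elasticsearch classes):

```java
import java.util.ArrayList;
import java.util.List;

public class RunBeforeSketch {
    // Minimal stand-in for org.elasticsearch.action.ActionListener.
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    interface CheckedRunnable {
        void run() throws Exception;
    }

    // Mirrors the runBefore logic: the hook runs first on both paths; a hook
    // failure on the response path is converted into an onFailure call, while
    // a hook failure on the failure path is attached as a suppressed exception.
    static <T> Listener<T> runBefore(Listener<T> delegate, CheckedRunnable runBefore) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                try {
                    runBefore.run();
                } catch (Exception ex) {
                    delegate.onFailure(ex);
                    return;
                }
                delegate.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    runBefore.run();
                } catch (Exception ex) {
                    e.addSuppressed(ex);
                }
                delegate.onFailure(e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Listener<String> inner = new Listener<String>() {
            @Override public void onResponse(String r) { events.add("response:" + r); }
            @Override public void onFailure(Exception e) { events.add("failure:" + e.getMessage()); }
        };
        // The hook runs before the delegate is notified.
        runBefore(inner, () -> events.add("before")).onResponse("ok");
        // A throwing hook redirects the response path to onFailure.
        runBefore(inner, () -> { throw new Exception("boom"); }).onResponse("ignored");
        System.out.println(events); // prints [before, response:ok, failure:boom]
    }
}
```

This is exactly the property the PR relies on for `snapshotRef::close`: the hook is guaranteed to run before the outcome is delivered, on either path.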

/**
* Wraps a given listener and returns a new listener which makes sure {@link #onResponse(Object)}
* and {@link #onFailure(Exception)} of the provided listener will be called at most once.
@@ -121,13 +121,11 @@ public boolean isReadOnly() {
return in.isReadOnly();
}


@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener);
}

@Override
public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
@@ -50,7 +50,7 @@
* <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)}
* <li>Data nodes call {@link Repository#snapshotShard}
* for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
* </ul>
@@ -204,9 +204,10 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
* @param listener listener invoked on completion
*/
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus);
IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener);

/**
* Restores snapshot of the shard.
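With `snapshotShard` now taking a listener instead of returning synchronously, callers that need the old blocking behaviour must bridge the listener to a future and wait on it, as the PR's tests do with `PlainActionFuture`. A self-contained sketch of that adaptation (simplified listener type and a hypothetical `snapshotShard`; the bridge uses `java.util.concurrent.CompletableFuture` rather than Elasticsearch's `PlainActionFuture`):

```java
import java.util.concurrent.CompletableFuture;

public class AsyncSnapshotSketch {
    // Simplified stand-in for ActionListener<Void>.
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Hypothetical async operation following the new snapshotShard contract:
    // it notifies the listener instead of returning normally or throwing.
    static void snapshotShard(String shardId, Listener<Void> listener) {
        new Thread(() -> {
            try {
                // ... upload shard files here ...
                listener.onResponse(null);
            } catch (Exception e) {
                listener.onFailure(e);
            }
        }).start();
    }

    public static void main(String[] args) {
        // Bridge in the spirit of PlainActionFuture: listener outcome -> future.
        CompletableFuture<Void> future = new CompletableFuture<>();
        snapshotShard("[index][0]", new Listener<Void>() {
            @Override public void onResponse(Void v) { future.complete(v); }
            @Override public void onFailure(Exception e) { future.completeExceptionally(e); }
        });
        future.join(); // block for completion, as the tests do with future.actionGet()
        System.out.println("snapshot completed"); // prints snapshot completed
    }
}
```

The production code path never blocks like this; only tests and other call sites that still need a synchronous result use the future bridge.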

Large diffs are not rendered by default.

@@ -23,7 +23,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
@@ -53,9 +52,8 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.SnapshotFailedEngineException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
@@ -80,7 +78,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -298,46 +295,33 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
}

private void startNewShards(SnapshotsInProgress.Entry entry, Map<ShardId, IndexShardSnapshotStatus> startedShards) {
final Snapshot snapshot = entry.snapshot();
final Map<String, IndexId> indicesMap = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : startedShards.entrySet()) {
final ShardId shardId = shardEntry.getKey();
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
executor.execute(new AbstractRunnable() {

private final SetOnce<Exception> failure = new SetOnce<>();

@Override
public void doRun() {
final IndexShard indexShard =
indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
snapshot(indexShard, snapshot, indexId, shardEntry.getValue());
}

@Override
public void onFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
failure.set(e);
}

@Override
public void onRejection(Exception e) {
failure.set(e);
}

@Override
public void onAfter() {
final Exception exception = failure.get();
if (exception != null) {
notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.detailedMessage(exception));
} else {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
final Snapshot snapshot = entry.snapshot();
final Map<String, IndexId> indicesMap =
entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : startedShards.entrySet()) {
final ShardId shardId = shardEntry.getKey();
final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
snapshot(shardId, snapshot, indexId, snapshotStatus, new ActionListener<>() {
@Override
public void onResponse(final Void aVoid) {
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug("snapshot ({}) completed to {} with {}", snapshot, snapshot.getRepository(), lastSnapshotStatus);
}
notifySuccessfulSnapshotShard(snapshot, shardId);
}
}
});
}

@Override
public void onFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.detailedMessage(e));
}
});
}
});
}

/**
@@ -346,38 +330,37 @@ public void onAfter() {
* @param snapshot snapshot
* @param snapshotStatus snapshot status
*/
private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId,
final IndexShardSnapshotStatus snapshotStatus) {
final ShardId shardId = indexShard.shardId();
if (indexShard.routingEntry().primary() == false) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
}
if (indexShard.routingEntry().relocating()) {
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating");
}
private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId,
final IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
if (indexShard.routingEntry().primary() == false) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
}
if (indexShard.routingEntry().relocating()) {
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating");
}

final IndexShardState indexShardState = indexShard.state();
if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) {
// shard has just been created, or still recovering
throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
}
final IndexShardState indexShardState = indexShard.state();
if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) {
// shard has just been created, or still recovering
throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
}

final Repository repository = repositoriesService.repository(snapshot.getRepository());
try {
// we flush first to make sure we get the latest writes snapshotted
try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) {
final Repository repository = repositoriesService.repository(snapshot.getRepository());
Engine.IndexCommitRef snapshotRef = null;
try {
// we flush first to make sure we get the latest writes snapshotted
snapshotRef = indexShard.acquireLastIndexCommit(true);
repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId,
snapshotRef.getIndexCommit(), snapshotStatus);
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus);
}
snapshotRef.getIndexCommit(), snapshotStatus, ActionListener.runBefore(listener, snapshotRef::close));
} catch (Exception e) {
IOUtils.close(snapshotRef);
throw e;
}
} catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) {
throw e;
} catch (Exception e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to snapshot", e);
listener.onFailure(e);
}
}
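The try/catch shape in the rewritten `snapshot(...)` method exists to guarantee the commit reference is released exactly once: `ActionListener.runBefore(listener, snapshotRef::close)` covers both async outcomes, and the surrounding catch covers failures before the listener ever takes ownership. A self-contained sketch of this pattern (simplified listener, and `CommitRef`/`closingBefore` are illustrative stand-ins, not the real Elasticsearch classes):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ReleaseOnCompletionSketch {
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Stand-in for Engine.IndexCommitRef: a resource that must be closed once.
    static class CommitRef implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean();
        @Override public void close() { closed.set(true); }
    }

    // Same idea as ActionListener.runBefore(listener, snapshotRef::close):
    // release the resource before the wrapped listener hears the outcome.
    static <T> Listener<T> closingBefore(Listener<T> delegate, AutoCloseable resource) {
        return new Listener<T>() {
            @Override public void onResponse(T r) {
                try { resource.close(); } catch (Exception ex) { delegate.onFailure(ex); return; }
                delegate.onResponse(r);
            }
            @Override public void onFailure(Exception e) {
                try { resource.close(); } catch (Exception ex) { e.addSuppressed(ex); }
                delegate.onFailure(e);
            }
        };
    }

    // Mirrors the snapshot(...) body: if wiring up the async work throws before
    // the wrapping listener takes ownership, close the reference on the spot.
    static void snapshotShard(CommitRef ref, Listener<Void> listener, boolean failEarly) {
        try {
            if (failEarly) {
                throw new IllegalStateException("could not start upload");
            }
            // Hypothetical async upload; completed inline for the sketch.
            closingBefore(listener, ref).onResponse(null);
        } catch (Exception e) {
            ref.close();
            listener.onFailure(e);
        }
    }

    public static void main(String[] args) {
        Listener<Void> noop = new Listener<Void>() {
            @Override public void onResponse(Void v) { }
            @Override public void onFailure(Exception e) { }
        };
        CommitRef ok = new CommitRef();
        snapshotShard(ok, noop, false);
        CommitRef failed = new CommitRef();
        snapshotShard(failed, noop, true);
        // The reference is released on both paths.
        System.out.println(ok.closed.get() && failed.closed.get()); // prints true
    }
}
```

This is why the PR can drop the old `try (Engine.IndexCommitRef snapshotRef = ...)` block: with an async repository call, a try-with-resources would close the commit while the upload is still running, so ownership of the close moves into the listener chain instead.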

@@ -171,6 +171,23 @@ public void testRunAfter() {
}
}

public void testRunBefore() {
{
AtomicBoolean afterSuccess = new AtomicBoolean();
ActionListener<Object> listener =
ActionListener.runBefore(ActionListener.wrap(r -> {}, e -> {}), () -> afterSuccess.set(true));
listener.onResponse(null);
assertThat(afterSuccess.get(), equalTo(true));
}
{
AtomicBoolean afterFailure = new AtomicBoolean();
ActionListener<Object> listener =
ActionListener.runBefore(ActionListener.wrap(r -> {}, e -> {}), () -> afterFailure.set(true));
listener.onFailure(null);
assertThat(afterFailure.get(), equalTo(true));
}
}

public void testNotifyOnce() {
AtomicInteger onResponseTimes = new AtomicInteger();
AtomicInteger onFailureTimes = new AtomicInteger();
@@ -202,7 +202,7 @@ public boolean isReadOnly() {

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit
snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {

}

@@ -35,6 +35,7 @@
import org.apache.lucene.util.IOSupplier;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -99,10 +100,12 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException {
IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID());

IndexCommit indexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory());
final PlainActionFuture<Void> future1 = PlainActionFuture.newFuture();
runGeneric(threadPool, () -> {
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
repository.snapshotShard(store, null, snapshotId, indexId, indexCommit,
snapshotStatus);
snapshotStatus, future1);
future1.actionGet();
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
});
@@ -124,9 +127,11 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException {
SnapshotId incSnapshotId = new SnapshotId("test1", "test1");
IndexCommit incIndexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory());
Collection<String> commitFileNames = incIndexCommit.getFileNames();
final PlainActionFuture<Void> future2 = PlainActionFuture.newFuture();
runGeneric(threadPool, () -> {
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus);
repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, future2);
future2.actionGet();
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
assertEquals(2, copy.getIncrementalFileCount());
assertEquals(commitFileNames.size(), copy.getTotalFileCount());
@@ -198,4 +203,5 @@ private int indexDocs(Directory directory) throws IOException {
return docs;
}
}

}
@@ -832,12 +832,14 @@ protected void snapshotShard(final IndexShard shard,
final Snapshot snapshot,
final Repository repository) throws IOException {
final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing();
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
Index index = shard.shardId().getIndex();
IndexId indexId = new IndexId(index.getName(), index.getUUID());

repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId,
indexCommitRef.getIndexCommit(), snapshotStatus);
indexCommitRef.getIndexCommit(), snapshotStatus, future);
future.actionGet();
}

final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
@@ -135,7 +135,7 @@ public boolean isReadOnly() {

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
}

@Override
@@ -296,7 +296,7 @@ public boolean isReadOnly() {

@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}
