
Concurrent file chunk fetching for CCR restore #38495

Merged
12 commits merged into elastic:master on Feb 8, 2019

Conversation

ywelsch
Contributor

@ywelsch ywelsch commented Feb 6, 2019

Adds the ability to fetch chunks from different files in parallel, configurable using the new `ccr.indices.recovery.max_concurrent_file_chunks` setting, which defaults to 5.

The implementation uses the parallel file writer functionality that is also used by peer recoveries.
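
As a rough sketch of the bounded-concurrency idea (hypothetical class and method names, not the actual Elasticsearch code), a semaphore sized by the setting caps how many chunk requests are in flight at once:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: run chunk requests with at most maxConcurrentFileChunks in flight.
class ChunkFetcher {
    private final Semaphore permits;
    private final ExecutorService executor = Executors.newCachedThreadPool();

    ChunkFetcher(int maxConcurrentFileChunks) {
        this.permits = new Semaphore(maxConcurrentFileChunks); // the setting defaults to 5
    }

    // Runs all chunk requests, blocking whenever the permit budget is exhausted;
    // returns the number of requests that completed.
    int fetchAll(List<Runnable> chunkRequests) {
        AtomicInteger completed = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Runnable request : chunkRequests) {
            permits.acquireUninterruptibly(); // caps the number of in-flight requests
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    request.run();            // real code would send a file-chunk request here
                    completed.incrementAndGet();
                } finally {
                    permits.release();        // free the slot for the next chunk
                }
            }, executor));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        executor.shutdown();
        return completed.get();
    }
}
```

In this sketch the caller blocks when all permits are taken; the actual implementation is callback-driven, which is what the review discussion below is about.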

@ywelsch ywelsch added >enhancement WIP v7.0.0 :Distributed Indexing/CCR Issues around the Cross Cluster State Replication features v6.7.0 labels Feb 6, 2019
@elasticmachine
Collaborator

Pinging @elastic/es-distributed

@ywelsch
Contributor Author

ywelsch commented Feb 6, 2019

@elasticmachine run elasticsearch-ci/2

@jasontedor jasontedor added v8.0.0 and removed v7.0.0 labels Feb 6, 2019
@ywelsch ywelsch added v7.0.0 and removed v8.0.0 labels Feb 6, 2019
@ywelsch ywelsch requested a review from Tim-Brooks February 6, 2019 18:01
@ywelsch ywelsch removed the WIP label Feb 6, 2019
Contributor

@Tim-Brooks Tim-Brooks left a comment


It seems like you have good evidence that this works, based on the benchmark usage. I don't see anything wrong with it, but it comes across as pretty complicated. I don't know if this is a first step towards making the whole process asynchronous, but in a world where there is essentially a dedicated generic thread waiting on everything, I think the synchronization and all the thread interactions are difficult to reason about.

In a world with a dedicated generic thread, I would do something like the following: the get-file-chunk callbacks would just enqueue the results of the requests in a blocking queue, and then all the operations (writing locally, rate limiting, error handling, etc.) would be performed by the generic thread.

            class Result {
                private final Exception exception;
                private final FileInfo fileInfo;
                private final BytesReference content;

                Result(FileInfo fileInfo, Exception exception, BytesReference content) {
                    this.exception = exception;
                    this.fileInfo = fileInfo;
                    this.content = content;
                }

                private boolean isError() {
                    return exception != null;
                }
            }

            int inflightRequests = 0;
            ArrayDeque<FileInfo> remainingFiles = new ArrayDeque<>(filesToRecover);
            ArrayBlockingQueue<Result> queue = new ArrayBlockingQueue<>(ccrSettings.getMaxConcurrentFileChunks());

But maybe there is a reason why you took the approach of dispatching to generic threads from the callbacks. So pursue whatever makes sense to you.
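
A minimal, runnable sketch of the single-consumer pattern described above, using plain `String`/`byte[]` stand-ins for `FileInfo` and `BytesReference` (all names beyond those in the comment are hypothetical):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the suggested design: fetch callbacks only enqueue results, and a single
// generic thread performs all the writing, rate limiting, and error handling.
class SingleConsumerRestore {
    // Simplified stand-in for the Result class above.
    static final class Result {
        final String fileName;   // stand-in for FileInfo
        final Exception exception;
        final byte[] content;    // stand-in for BytesReference

        Result(String fileName, Exception exception, byte[] content) {
            this.fileName = fileName;
            this.exception = exception;
            this.content = content;
        }

        boolean isError() {
            return exception != null;
        }
    }

    // Drains expectedChunks results on the calling (generic) thread, returning the
    // total bytes handled; rethrows the first error it dequeues.
    static long drain(BlockingQueue<Result> queue, int expectedChunks) throws Exception {
        long written = 0;
        for (int i = 0; i < expectedChunks; i++) {
            Result r = queue.take();     // blocks until a callback enqueues a result
            if (r.isError()) {
                throw r.exception;       // error handling stays on this one thread
            }
            written += r.content.length; // real code would write the chunk to disk here
        }
        return written;
    }

    static long demo() {
        try {
            // Queue bounded by the max-concurrent-file-chunks budget, as suggested above.
            BlockingQueue<Result> queue = new ArrayBlockingQueue<>(5);
            for (int i = 0; i < 3; i++) {
                queue.put(new Result("file-" + i, null, new byte[1024]));
            }
            return drain(queue, 3);
        } catch (Exception e) {
            return -1;
        }
    }
}
```

The appeal of this shape is that all mutable restore state is touched by exactly one thread; the trade-off is that the generic thread stays blocked for the whole restore.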

final int actualChunkSize = r.getChunk().length();
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
final boolean lastChunk = r.getOffset() + actualChunkSize >= fileToRecover.length();
Contributor


I think we should assert that the file length is not greater? I think we should continue to expect that the follower has a correct understanding of the length of the files it is fetching.

Contributor Author


see a000339

@@ -232,6 +224,11 @@ private long readFileBytes(String fileName, BytesReference reference) throws IOE
}
});

long remainingSize = indexInput.length() - indexInput.getFilePointer();
if (remainingSize < reference.length()) {
reference = reference.slice(0, Math.toIntExact(remainingSize));
Contributor


I'm not wild about the read method slicing internally and returning a different bytes reference. I also don't really understand why it is necessary, as the follower should never request more data than remains in a file (you continue to only request the remaining bytes in this PR).

Contributor Author


The current API for getting a file is more iterator-oriented (i.e. read next, read next, read next), which is why I find it weird that the target has to track where it is in that file and make sure to request the right size. I have reverted this for now.
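
For illustration, the slicing behavior under discussion, sketched with a plain byte array instead of `BytesReference` (a hypothetical helper, not the actual code): it shrinks the requested view to the bytes remaining in the file.

```java
import java.util.Arrays;

// Hypothetical helper mirroring the reverted reference.slice(0, remainingSize) behavior:
// if fewer bytes remain in the file than the buffer's length, return a view of only
// the remaining bytes; otherwise return the buffer unchanged.
class ChunkSlice {
    static byte[] sliceToRemaining(byte[] buffer, long fileLength, long filePointer) {
        long remaining = fileLength - filePointer;
        if (remaining < buffer.length) {
            return Arrays.copyOfRange(buffer, 0, Math.toIntExact(remaining));
        }
        return buffer;
    }
}
```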


final long requestSeqId = requestSeqIdTracker.generateSeqNo();
try {
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request,
Contributor


We have lost the timeout. You can configure request options by overriding a method on the action.

Unfortunately it is not apparent to me if there is a way to get them back while using callbacks. We could always move the restore process into a local transport action and use the guice injections to get the RemoteClusterService and TransportService. But that is probably beyond the scope of this PR.

Contributor Author


yeah, I would like us to investigate this in a follow-up.

@Override
protected void doRun() throws Exception {
final int actualChunkSize = r.getChunk().length();
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
Contributor


It feels like we should throttle after writing the file chunk. Since the rate limiting is more about network activity, we might as well spend some time writing to disk before determining whether we want to pause.

Contributor Author


I've chosen to do it the same way we do it for peer recovery, to keep things consistent. I'm fine with following up on this with a change to both.
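
For illustration, a simplified rate limiter in the spirit of `maybePause` (a hypothetical sketch, not the actual Lucene or Elasticsearch implementation): it computes how long a chunk of the given size should take at the target rate and sleeps for any surplus.

```java
// Hypothetical sketch of chunk-level rate limiting: compute how long a chunk of the
// given size should take at the target rate, and pause for any surplus time.
class SimpleRateLimiter {
    private final double bytesPerSecond;
    private long lastNanos = System.nanoTime();

    SimpleRateLimiter(double bytesPerSecond) {
        this.bytesPerSecond = bytesPerSecond;
    }

    // Pure helper: nanoseconds still owed for a chunk, given how long it actually took.
    static long pauseNanosFor(int chunkSizeInBytes, double bytesPerSecond, long elapsedNanos) {
        long targetNanos = (long) (chunkSizeInBytes / bytesPerSecond * 1_000_000_000L);
        return Math.max(0L, targetNanos - elapsedNanos);
    }

    // Returns the nanoseconds paused (0 if the chunk already took long enough).
    long maybePause(int chunkSizeInBytes) throws InterruptedException {
        long now = System.nanoTime();
        long pause = pauseNanosFor(chunkSizeInBytes, bytesPerSecond, now - lastNanos);
        if (pause > 0) {
            Thread.sleep(pause / 1_000_000L, (int) (pause % 1_000_000L));
        }
        lastNanos = System.nanoTime();
        return pause;
    }
}
```

Whether such a pause happens before or after the disk write is exactly the trade-off raised above; moving the call changes where the surplus time is spent, not the overall rate.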

@ywelsch
Contributor Author

ywelsch commented Feb 7, 2019

But maybe there is a reason why you took dispatch to generic threads on the callback approach

I want to make the restore process fully async, and the current approach here gets us closer to that. I've refactored some of the code to make it hopefully clearer what's going on.

@ywelsch ywelsch requested a review from Tim-Brooks February 8, 2019 09:11
Contributor

@Tim-Brooks Tim-Brooks left a comment


LGTM

@Tim-Brooks Tim-Brooks merged commit 9b75a70 into elastic:master Feb 8, 2019
Tim-Brooks pushed a commit to Tim-Brooks/elasticsearch that referenced this pull request Feb 9, 2019
Adds the ability to fetch chunks from different files in parallel, configurable using the new `ccr.indices.recovery.max_concurrent_file_chunks` setting, which defaults to 5 in this PR.

The implementation uses the parallel file writer functionality that is also used by peer recoveries.
Tim-Brooks pushed a commit to Tim-Brooks/elasticsearch that referenced this pull request Feb 9, 2019
Tim-Brooks pushed a commit to Tim-Brooks/elasticsearch that referenced this pull request Feb 9, 2019
jasontedor added a commit to liketic/elasticsearch that referenced this pull request Feb 10, 2019
* master: (1159 commits)
  Fix timezone fallback in ingest processor (elastic#38407)
  Avoid polluting download stats on builds (elastic#38660)
  SQL: Prevent grouping over grouping functions (elastic#38649)
  SQL: Relax StackOverflow circuit breaker for constants (elastic#38572)
  [DOCS] Fixes broken migration links (elastic#38655)
  Drop support for the low-level REST client on JDK 7 (elastic#38540)
  [DOCS] Adds placeholders for v8 highlights, breaking changes, release notes (elastic#38641)
  fix dissect doc "ip" --> "clientip" (elastic#38545)
  Concurrent file chunk fetching for CCR restore (elastic#38495)
  make DateMathIndexExpressionsIntegrationIT more resilient (elastic#38473)
  SQL: Replace joda with java time (elastic#38437)
  Add fuzziness example (elastic#37194) (elastic#38648)
  Mute AnalysisModuleTests#testStandardFilterBWC (elastic#38636)
  add geotile_grid ref to asciidoc (elastic#38632)
  Enable Dockerfile from artifacts.elastic.co (elastic#38552)
  Mute FollowerFailOverIT testFailOverOnFollower (elastic#38634)
  Account for a possible rolled over file while reading the audit log file (elastic#34909)
  Mute failure in InternalEngineTests (elastic#38622)
  Fix Issue with Concurrent Snapshot Init + Delete (elastic#38518)
  Refactor ZonedDateTime.now in millis resolution (elastic#38577)
  ...
jasontedor added a commit to jasontedor/elasticsearch that referenced this pull request Feb 11, 2019
* master:
  Fix timezone fallback in ingest processor (elastic#38407)
  Avoid polluting download stats on builds (elastic#38660)
  SQL: Prevent grouping over grouping functions (elastic#38649)
  SQL: Relax StackOverflow circuit breaker for constants (elastic#38572)
  [DOCS] Fixes broken migration links (elastic#38655)
  Drop support for the low-level REST client on JDK 7 (elastic#38540)
  [DOCS] Adds placeholders for v8 highlights, breaking changes, release notes (elastic#38641)
  fix dissect doc "ip" --> "clientip" (elastic#38545)
  Concurrent file chunk fetching for CCR restore (elastic#38495)
  make DateMathIndexExpressionsIntegrationIT more resilient (elastic#38473)
  SQL: Replace joda with java time (elastic#38437)
  Add fuzziness example (elastic#37194) (elastic#38648)
7 participants