Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inline TransportReplAct#createReplicatedOperation #41197

Conversation

DaveCTurner
Copy link
Contributor

TransportReplicationAction.AsyncPrimaryAction#createReplicatedOperation
exists so it can be overridden in tests. This commit re-works these tests to
use a real ReplicationOperation and inlines the now-unnecessary method.

Relates #40706.

`TransportReplicationAction.AsyncPrimaryAction#createReplicatedOperation`
exists so it can be overridden in tests. This commit re-works these tests to
use a real `ReplicationOperation` and inlines the now-unnecessary method.

Relates elastic#40706.
@DaveCTurner DaveCTurner added >non-issue :Distributed Indexing/CRUD A catch all label for issues around indexing, updating and getting a doc by id. Not search. v8.0.0 v7.2.0 labels Apr 15, 2019
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-distributed

@@ -821,57 +809,50 @@ public void testCounterOnPrimary() throws Exception {
Request request = new Request(shardId);
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
ReplicationTask task = maybeTask();
int i = randomInt(3);
final boolean throwExceptionOnCreation = i == 1;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case doesn't seem to be possible in production, so I removed it.

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Thanks @DaveCTurner, I left 3 comments to consider.

@Override
public void onFailure(Exception e) {
handleException(primaryShardReference, e);
final ActionListener<Response> referenceClosingListener = ActionListener.wrap(response -> {
Copy link
Contributor

@henningandersen henningandersen Apr 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the separation into two listeners artificial and a bit confusing. I suggest something like following instead:

                    final ActionListener<Response> globalCheckpointSyncingListener = ActionListener.wrap(response -> {
                        if (syncGlobalCheckpointAfterOperation) {
                            final IndexShard shard = primaryShardReference.indexShard;
                            try {
                                shard.maybeSyncGlobalCheckpoint("post-operation");
                            } catch (final Exception e) {
                                // only log non-closed exceptions
                                if (ExceptionsHelper.unwrap(
                                    e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
                                    // intentionally swallow, a missed global checkpoint sync should not fail this operation
                                    logger.info(
                                        new ParameterizedMessage(
                                            "{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
                                }
                            }
                        }
                        primaryShardReference.close(); // release shard operation lock before responding to caller
                        setPhase(replicationTask, "finished");
                        onCompletionListener.onResponse(response);
                    }, e -> handleException(primaryShardReference, e));

                    new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
                        ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener),
                            globalCheckpointSyncingListener::onFailure),
                        newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In isolation I agree, but this separation will be important in a followup so I hope it's ok to leave it like it is. The global checkpoint syncing is the responsibility of the primary, whereas the cleanup of the replication task and the primaryShardReference is the responsibility of the reroute/delegation phase.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
}.run();
}.new AsyncPrimaryAction(primaryRequest, ActionListener.wrap(listener::onResponse, throwable -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we instead of using ActionListener.wrap just assert that listener.isDone() and do listener.get() like in the test above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it seems we can. I pushed 1350c0f.

} else {
throw e;
}
}

if (throwExceptionOnRun || respondWithError) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it is more logical to put this inside the try-catch (after listener.get()) and remove the return above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I pushed fb1f7eb.

@@ -376,7 +377,8 @@ public void handleException(TransportException exp) {
// intentionally swallow, a missed global checkpoint sync should not fail this operation
logger.info(
new ParameterizedMessage(
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
"{} failed to execute post-operation global checkpoint sync",
primaryShardReference.routingEntry().shardId()), e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow this change, I cannot figure out how this makes a difference. I think using just shard.shardId() is simpler unless there is a reason for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More foreshadowing of changes to come, but I can defer this until later.

@DaveCTurner DaveCTurner merged commit 5708796 into elastic:master Apr 16, 2019
@DaveCTurner DaveCTurner deleted the 2019-04-15-inline-createReplicatedOperation branch April 16, 2019 12:03
DaveCTurner added a commit that referenced this pull request Apr 16, 2019
`TransportReplicationAction.AsyncPrimaryAction#createReplicatedOperation`
exists so it can be overridden in tests. This commit re-works these tests to
use a real `ReplicationOperation` and inlines the now-unnecessary method.

Relates #40706.
gurkankaymak pushed a commit to gurkankaymak/elasticsearch that referenced this pull request May 27, 2019
`TransportReplicationAction.AsyncPrimaryAction#createReplicatedOperation`
exists so it can be overridden in tests. This commit re-works these tests to
use a real `ReplicationOperation` and inlines the now-unnecessary method.

Relates elastic#40706.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed Indexing/CRUD A catch all label for issues around indexing, updating and getting a doc by id. Not search. >non-issue v7.2.0 v8.0.0-alpha1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants