
[Rollup] Job deletion should be invoked on the allocated task #34574

Merged

Conversation

polyfractal
Contributor

We should delete a job by talking directly to the allocated task and telling it to shut down. Today we shut down a job via the persistent task framework. This is not ideal because, while the job has been removed from the persistent task cluster state (CS), the allocated task continues to live until it receives the shutdown message.

This means a user can delete a job, immediately delete the rollup index, and then see new documents appear in the just-deleted index. This happens because the indexer in the allocated task is still running and indexes a few more documents before receiving the shutdown command.

In this PR, the transport action is changed to a TransportTasksAction, and we invoke onCancelled() directly on the matching job. The race condition still exists after this PR (albeit with a smaller window), but this was a precursor to fixing the issue and a self-contained chunk of code. I'll follow up with a second PR to fix the race itself.

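A minimal sketch of the direct-cancellation idea described above (class, enum, and method names here are illustrative stand-ins, not the actual Rollup code): invoking onCancelled() on the allocated task moves it to an aborting state, and a repeated cancellation is harmless.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of cancelling the allocated task directly instead of waiting for
// the persistent-task framework to deliver a shutdown message.
// All names are hypothetical, not the actual Rollup classes.
class AllocatedRollupTask {
    enum State { STARTED, INDEXING, ABORTING, STOPPED }

    private final AtomicReference<State> state = new AtomicReference<>(State.STARTED);

    // Invoked directly by the delete transport action on the matching task.
    void onCancelled() {
        if (state.compareAndSet(State.STARTED, State.ABORTING)) {
            shutdown(); // indexer was idle: shut down immediately
        } else {
            // INDEXING -> ABORTING: the indexer calls shutdown() once its
            // in-flight bulk finishes. Already ABORTING or STOPPED: a
            // second delete request is a no-op.
            state.compareAndSet(State.INDEXING, State.ABORTING);
        }
    }

    void shutdown() { state.set(State.STOPPED); }

    State state() { return state.get(); }
}
```

Because the only transitions into ABORTING go through compareAndSet, a second delete request cannot trigger shutdown() a second time.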
@polyfractal polyfractal added the v7.0.0, >refactoring, :StorageEngine/Rollup, and v6.5.0 labels Oct 17, 2018
@elasticmachine
Collaborator

Pinging @elastic/es-search-aggs

Contributor

@colings86 colings86 left a comment

@polyfractal I left a comment, but I also wonder whether it's worth asking someone more familiar with the persistent tasks framework to look at this as well?

// If there are more, in production it should be ok as long as they acknowledge shutting down.
// But in testing we'd like to know if there was more than one, hence the assert
assert tasks.size() == 1;
boolean cancelled = tasks.stream().anyMatch(DeleteRollupJobAction.Response::isDeleted);
Contributor

Given the comment above should this not look for all tasks in the stream being deleted rather than just one?
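The reviewer's suggestion could be sketched as follows (Response here is a stand-in for DeleteRollupJobAction.Response, not the real class): require every task in the response to report deletion with allMatch, rather than any single one with anyMatch.

```java
import java.util.List;

// Sketch of the suggested check: all tasks must report deletion.
// Response is a hypothetical stand-in for DeleteRollupJobAction.Response.
class DeleteCheck {
    record Response(boolean isDeleted) {}

    static boolean allCancelled(List<Response> tasks) {
        // allMatch is vacuously true on an empty stream, so guard against it.
        return tasks.isEmpty() == false && tasks.stream().allMatch(Response::isDeleted);
    }
}
```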

@polyfractal
Contributor Author

Oops, yes, I meant to ping @martijnvg as well but forgot. Mvg would you mind taking a look at the persistent-task changes here too?

To simplify the synchronization, we decided that only a STOPPED
job can be deleted.  This commit enforces that check at the task level.

The rest of the changes are just to facilitate percolating this
information back to the user:

- Common XContent output for failed task/node is moved out of
  ListTasksResponse and into the superclass for general use
- The Rest layer is adjusted to throw a 5xx error if there are failures
- Tweak to the acknowledged logic to account for task failures

Also updated docs and tests
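The interplay between acknowledgement and failures described in the commit message might look roughly like this (an illustrative sketch only; the class and method names are hypothetical, not the actual Rest handler):

```java
// Sketch of the described behaviour: if the tasks response carries any
// node- or task-level failures, the REST layer surfaces a 5xx instead of
// reporting a clean acknowledgement. Names are hypothetical.
class DeleteJobRestStatus {
    static int httpStatus(boolean acknowledged, int taskFailures, int nodeFailures) {
        if (taskFailures > 0 || nodeFailures > 0) {
            return 500; // failures percolate back to the user as a server error
        }
        return acknowledged ? 200 : 500;
    }
}
```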
@polyfractal
Contributor Author

Chatted with @jimczi this morning, and we came up with a slightly different plan to help keep the synchronization simple:

  1. DeleteJob talks to the allocated task directly, but only allows deletion of STOPPED jobs
  2. StopJob is modified to include a wait_for_stopped flag which blocks the call until the indexer actually stops, instead of setting the flag and returning
  3. @jimczi is working on a state refactor which should simplify a lot of the synchronization complexity we have right now. This can proceed at its leisure because 1) and 2) fix the immediate issue.

I just pushed a commit to accomplish 1), and 2) will be done in a followup PR.

The workflow is a little more irritating now (a user must stop a job and then delete it). But it felt more natural for a "stop" operation to block than a "delete" operation, and it keeps life simple internally.
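Step 2) above, the blocking stop, could be sketched like this (a simplified, hypothetical class; wait_for_stopped is the planned flag, and the real implementation lives in the allocated task, not a standalone object):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of the planned wait_for_stopped semantics: StopJob sets the
// stopping flag, then optionally blocks until the indexer has actually
// stopped, so a following DeleteJob cannot race with in-flight bulks.
class StoppableIndexer {
    private volatile boolean stopping = false;
    private final CountDownLatch stopped = new CountDownLatch(1);

    // Called by the stop action.
    boolean stop(boolean waitForStopped, long timeoutMillis) {
        stopping = true;
        if (waitForStopped == false) {
            return true; // previous behaviour: set the flag and return immediately
        }
        try {
            return stopped.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Called by the indexer thread when it finishes its current bulk.
    void onIndexerFinished() {
        if (stopping) {
            stopped.countDown();
        }
    }
}
```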

final DiscoveryNodes nodes = state.nodes();

if (nodes.isLocalNodeElectedMaster()) {
    PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Member

This is better than checking the cluster state on a non-elected master node, but it is still possible that this cluster state is stale if, for example, multiple delete job requests for the same job id are executed concurrently. I think reading the cluster state from the cluster state update thread would fix this properly (ClusterService#submitStateUpdateTask(...) without updating the cluster state). Maybe this approach is not worth it, considering that invoking onCancelled() twice has no negative impact.

Contributor Author

Ah, that's good to know for future reference.

I think we're ok here because, as you said, cancelling twice is fine. onCancelled() will set the state to ABORTING... if the indexer was idle during the first request, shutdown() will be called directly. If not, it will defer until the indexer finishes.

And in both cases, the second request will leave the state at ABORTING and won't call shutdown().

I think we'd still have stale CS issues anyhow, since we shut the task down first and then alert the master. The task could shut down and send the markAsCompleted() request, and a second delete could arrive at the master, check the state, and see the task as still alive before the markAsCompleted() shows up.

Since the rollup cleanup code was moved to ESRestTestCase,
this code is redundant (and throws errors since it is out of date)
@polyfractal
Contributor Author

@jimczi @colings86 would you mind taking another look with the current changes?

I'll kick off a CI run once GitHub/CI come back to life.

Contributor

@jimczi jimczi left a comment

I left some minor comments, LGTM otherwise

if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeleteRollupJobAction.Response response = (DeleteRollupJobAction.Response) o;
return acknowledged == response.acknowledged;
Contributor

nit: shouldn't we do super.equals first (and have an equals impl in BaseTasksResponse)?

Contributor Author

Good catch! Thanks


@Override
public int hashCode() {
    return Objects.hash(acknowledged);
Contributor

Same here?

@polyfractal polyfractal merged commit 4dbf498 into elastic:master Oct 23, 2018
polyfractal added a commit that referenced this pull request Oct 23, 2018
jasontedor added a commit to jasontedor/elasticsearch that referenced this pull request Oct 23, 2018
* master: (24 commits)
  ingest: better support for conditionals with simulate?verbose (elastic#34155)
  [Rollup] Job deletion should be invoked on the allocated task (elastic#34574)
  [DOCS] .Security index is never auto created (elastic#34589)
  CCR: Requires soft-deletes on the follower (elastic#34725)
  re-enable bwc tests (elastic#34743)
  Empty GetAliases authorization fix (elastic#34444)
  INGEST: Document Processor Conditional (elastic#33388)
  [CCR] Add total fetch time leader stat (elastic#34577)
  SQL: Support pattern against compatible indices (elastic#34718)
  [CCR] Auto follow pattern APIs adjustments (elastic#34518)
  [Test] Remove dead code from ExceptionSerializationTests (elastic#34713)
  A small typo in migration-assistance doc (elastic#34704)
  ingest: processor stats (elastic#34724)
  SQL: Implement IN(value1, value2, ...) expression. (elastic#34581)
  Tests: Add checks to GeoDistanceQueryBuilderTests (elastic#34273)
  INGEST: Rename Pipeline Processor Param. (elastic#34733)
  Core: Move IndexNameExpressionResolver to java time (elastic#34507)
  [DOCS] Force Merge: clarify execution and storage requirements (elastic#33882)
  TESTING.asciidoc fix examples using forbidden annotation (elastic#34515)
  SQL: Implement `CONVERT`, an alternative to `CAST` (elastic#34660)
  ...
kcm pushed a commit that referenced this pull request Oct 30, 2018
@colings86 colings86 removed the v7.0.0 label Feb 7, 2019