
[ML] Functionality to wait for in-flight audit indexing to complete #70008

Closed
droberts195 opened this issue Mar 5, 2021 · 14 comments
Assignees
Labels
>enhancement :ml/Transform Transform :ml Machine learning Team:ML Meta label for the ML team

Comments

@droberts195
Contributor

Both ML jobs and transforms can write audit notifications when they are stopped/closed/deleted. These are written asynchronously, so can be indexed after the stop/close/delete API call returns.

In our integration test cleanup we use "wait for pending tasks" functionality to make sure these in-flight audit notifications have been indexed before all indices are deleted. If we didn't do this then notifications indices could be created after the test cleanup, causing the next test to fail due to an unexpected index existing.
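
For illustration, a minimal sketch of that kind of cleanup wait, assuming access to an internal Client and the test framework's assertBusy; the class and method names here are made up and this is not the actual test-framework helper:

import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.client.Client;

import static org.elasticsearch.test.ESTestCase.assertBusy;

// Hypothetical sketch: poll the tasks API until nothing is running apart from
// the list-tasks call we issue ourselves.
public class PendingTasksCleanup {

    static void waitForPendingTasks(Client client) throws Exception {
        assertBusy(() -> {
            ListTasksResponse tasks = client.admin().cluster().prepareListTasks().get();
            long pending = tasks.getTasks().stream()
                .filter(task -> task.getAction().startsWith("cluster:monitor/tasks/lists") == false)
                .count();
            if (pending > 0) {
                // assertBusy retries on AssertionError until its timeout expires
                throw new AssertionError("still waiting for " + pending + " in-flight tasks");
            }
        });
    }
}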

As a result of #69469 we will have a similar requirement in our production code. When the reset API is called for ML or transforms, deleting the jobs may trigger notifications, and these must either never be indexed or have finished indexing before the reset API deletes indices.

Ideally the functionality should be implemented in such a way that the code can be shared between transforms and ML, like the auditor itself is.

The functionality needs to be able to wait for the auditors on every node in the cluster to abort or complete their work, not just the current node.

A more specialised version of the "wait for pending tasks" used in our test cleanup is likely to be one part of the solution, but it needs to wait for a very focused subset of tasks to complete, not all tasks, because the cluster may be busy while ML or transforms is being reset. So a way will need to be found to clearly identify activity of the ML/transform auditor in tasks API output.

Another piece of the solution may be a way to broadcast an instruction to the auditors on every node telling them to ignore subsequent requests to index notifications. Then there would need to be a way to broadcast another instruction telling them to accept audit messages again, which would be sent at the very end of the reset.
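
As a very rough sketch of the node-local half of that idea (the broadcast action itself is not shown and everything here is hypothetical, just to illustrate a cluster-wide mute/unmute toggle):

import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical per-node switch that a broadcast instruction would flip on every node.
public class AuditorMuteSwitch {

    private final AtomicBoolean muted = new AtomicBoolean(false);

    // Flipped on each node when the "ignore notifications" instruction arrives at reset start.
    public void mute() {
        muted.set(true);
    }

    // Flipped back on each node at the very end of the reset so audit messages are accepted again.
    public void unmute() {
        muted.set(false);
    }

    // The auditor would consult this before kicking off indexing of a notification.
    public boolean shouldIndexNotifications() {
        return muted.get() == false;
    }
}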

There may be other considerations that I am missing.

@droberts195 droberts195 added >enhancement :ml Machine learning :ml/Transform Transform labels Mar 5, 2021
@elasticmachine elasticmachine added the Team:ML Meta label for the ML team label Mar 5, 2021
@elasticmachine
Collaborator

Pinging @elastic/ml-core (Team:ML)

@benwtrent
Member

@droberts195 should this API support specific job/transform ids? It seems to me that it will:

  • Somehow check to see if any jobs are running (possibly via some parameter from the transform/ML caller)
  • Blow up if there are running jobs, as they all need to be stopped first
  • Wait for the appropriate tasks to finish (if all the ML/transform jobs are stopped)

I am thinking that this will be an internal transport class that both ML and transform know about. The individual REST APIs will exist within each plugin and the plugin will provide the required info to the internal transport class (or possibly an abstract class).

I think this will end up being a tad tricky as audit tasks are just indexing tasks... I am not sure there is a way to separate them out of the fray. Will investigate.

@benwtrent benwtrent self-assigned this Mar 5, 2021
@benwtrent
Member

What a thing of beauty:

"<REDACTED>" : {
          "node" : "<REDACTED>",
          "id" : 194,
          "type" : "transport",
          "action" : "indices:data/write/bulk",
          "description" : "requests[356], indices[.ml-anomalies-.write-fooo]",
          "start_time_in_millis" : 1616086432935,
          "running_time_in_nanos" : 4157092,
          "cancellable" : false,
          "headers" : { }
        },

So it does look like we can grab the "action" : "indices:data/write/bulk" tasks with detailed=true and then interrogate the description for the indices.

The only problem is that we don't really know if the async audit messages have actually started writing the bulk request or if the parent process just kicked them off.

@benwtrent
Member

The only problem is that we don't really know if the async audit messages have actually started writing the bulk request or if the parent process just kicked them off.

OK, this is incorrect. The auditor methods info|warn|error are all synchronous methods that kick off an asynchronous call for indexing.

Waiting for indices:data/write/bulk actions with .ml* indices may be enough.
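
A rough sketch of what that could look like with the existing tasks API, assuming an internal Client; the wildcard pattern and class name are illustrative:

import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.tasks.TaskInfo;

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: list detailed bulk-write tasks and keep only those whose
// description mentions an ML index.
public class InFlightMlBulkTasks {

    static List<TaskInfo> find(Client client) {
        ListTasksResponse tasks = client.admin()
            .cluster()
            .prepareListTasks()
            .setActions("indices:data/write/bulk")
            .setDetailed(true) // descriptions are only populated when detailed=true
            .get();
        return tasks.getTasks().stream()
            .filter(task -> task.getDescription() != null && Regex.simpleMatch("*.ml-*", task.getDescription()))
            .collect(Collectors.toList());
    }
}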

@benwtrent
Member

benwtrent commented Mar 18, 2021

@elastic/es-core-features

The issue: if we stop all the jobs, an asynchronous bulk action might already have been kicked off that will recreate the system index after it has been deleted.

I think we can prevent this by not only waiting for ML related tasks to finish, but also waiting for indices:data/write/bulk tasks whose description contains *.ml-*.

This would require updating ListTasksRequest or BaseTasksRequest to allow matching on description. Or a completely new action that is effectively ListTasksRequest but with this added capability.

What do you think?

A similar solution would be implemented for transforms (but matching on *.transform* descriptions for bulk writes)

@benwtrent
Member

@droberts195

I am not sure a completely new action is necessary. If we can extend ListTasksRequest a bit, we could do something like:

client.admin()
    .cluster()
    .prepareListTasks()
    .setActions("xpack/ml/*", "indices:data/write/bulk")
    .setDescriptionMatch(MapBuilder.<String, String>newMapBuilder().put("indices:data/write/bulk", "*.ml-*"))
    .setWaitForCompletion(true)
    .execute(waitForPendingTasksListener);

Once all the tasks are stopped.

@droberts195
Contributor Author

The only problem is that we don't really know if the async audit messages have actually started writing the bulk request or if the parent process just kicked them off.

Yes, I am thinking we might need an extra flag in the ML custom cluster state, to indicate "reset in progress". Or possibly we could reuse the "upgrade mode" flag.

The auditor then shouldn't commence writing an audit message if this flag is set.

We should also make sure that every action that triggers an audit message triggers it before returning. This means that although an audit message may be indexed after, say, close job has returned, the caller at least knows the audit message had been requested before close job returned. This means that a client that calls close job immediately followed by reset ML won't suffer from a leaked audit message (once the other fixes for this issue are complete).

At the very very end of the ML reset process, after waiting for in-flight ML indexing requests to complete, we should then reset the ML custom cluster state to empty. (This will also make resetting ML unset ML upgrade mode, which it doesn't at the moment but should do.)
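
To make the flag idea concrete, here is a sketch of the check the auditor (and the daily maintenance service) could make, assuming a new boolean is added to the existing ML custom metadata; the isResetMode() accessor name is an assumption, mirroring the existing isUpgradeMode():

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.xpack.core.ml.MlMetadata;

public class ResetInProgressCheck {

    // MlMetadata.getMlMetadata() and isUpgradeMode() exist today; isResetMode() is the
    // proposed new flag, so its name here is assumed.
    static boolean auditingBlocked(ClusterState state) {
        return MlMetadata.getMlMetadata(state).isResetMode();
    }
}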

If we can extend ListTasksRequest a bit

Yes, I agree, that would be a nice enhancement that would make waiting for bulk ML indexing to complete much easier.

@benwtrent
Member

benwtrent commented Mar 22, 2021

@droberts195

The auditor then shouldn't commence writing an audit message if this flag is set.

I don't think this will work. The audit message method itself is synchronous. Threads call it and then continue executing. The asynchronous portion is the bulk indexing action itself. Unless we update bulk indexing to look for a cluster state flag (which is a bad idea), I don't think this will work.

The only option is to wait for bulk indexing requests for ML indices to finish.

That is, unless I am missing somewhere that the actual Auditor#error|warn|info methods are called asynchronously on shutdown once the tasks go away.

If we wait for all the related ML tasks to finish, and then also wait for bulk indexing requests to finish, I am not sure how much more we can ensure the auditor has finished working. The only other thing would be to have some sort of latch on all the auditor objects that gets triggered when the bulk indexing requests finish (the onResults|onFailure handlers, which only write a logging message for now).

But this would definitely be non-trivial as, for some code organizational reasons, the auditor object is not always from the same singleton.

@droberts195
Contributor Author

To clarify, I definitely think the main part of the work will be extending ListTasksRequest and then using that.

The audit message method itself is synchronous. Threads call it and then continue executing. The asynchronous portion is the bulk indexing action itself. Unless we update bulk indexing to look for a cluster state flag (which is a bad idea), I don't think this will work.

True, writeDoc() calls bulk index synchronously. However, writeBacklog() is called from an asynchronous listener. That is one bit that needs protecting. But it wouldn't hurt to check in writeDoc() too - it would make the situation where an audit message is requested after a reset has already started more efficient.

I definitely agree that bulk indexing shouldn't check any ML cluster state flag.
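
A sketch of what that guard might look like inside the auditor; writeDoc()/writeBacklog() are the method names discussed above, but the resetInProgress supplier and how it gets wired in are assumptions:

import java.util.function.BooleanSupplier;

// Hypothetical guard: skip kicking off the asynchronous bulk request when a reset is in progress.
public class GuardedAuditorSketch {

    private final BooleanSupplier resetInProgress; // e.g. backed by the cluster state flag above

    public GuardedAuditorSketch(BooleanSupplier resetInProgress) {
        this.resetInProgress = resetInProgress;
    }

    void writeDoc(Object notification) {
        if (resetInProgress.getAsBoolean()) {
            return; // drop the notification rather than recreate a deleted notifications index
        }
        // ... existing behaviour: kick off the asynchronous bulk index request ...
    }

    void writeBacklog() {
        if (resetInProgress.getAsBoolean()) {
            return; // the backlog is written from an async listener, so this is the check that matters most
        }
        // ... existing behaviour: index the backlog of audit messages ...
    }
}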

Of course, unless I am missing that the actual Auditor#error|warn|info methods are called asynchronously on shutdown somewhere once the tasks go away.

This is what I was getting at with:

We should also make sure that every action that triggers an audit message triggers it before returning.

In other words the bulk index operation is in-flight at the time the ML action that triggered it returns.

I did just think of one place where we call Auditor#error|warn|info completely independently of an action, and that's in the ML daily maintenance service. That already checks for upgrade mode when it starts. It could easily check for a "reset in progress" flag at the same time. Then if daily maintenance was in progress before a reset started, the reset would probably make it fail or else complete very quickly, but we'd want to stop it writing audit messages about this, so that's a place where having the auditor ignore audit requests during a reset would be helpful.

I am coming to the conclusion that reusing the upgrade mode flag would be impossible, and we will need a separate "reset in progress" flag, because I think upgrade mode blocks most of our APIs that we might want to use as part of the reset process, like close job and delete job.

The only other thing would be to have some sort of latch on all the auditor objects that gets trigger when the bulk indexing requests finish

I agree we shouldn't do this. We need to be synchronizing cluster-wide, not just on one node, so latches that only affect one JVM will cause more problems than they solve.

@benwtrent
Member

completely independently of an action, and that's in the ML daily maintenance service.

Ah, I see what you mean. For sure. Telling that service not to write would be wise.

We would need better integration with the reset API to achieve this, as right now we only get signaled at the start of the reset action, not when it completes.

@benwtrent
Member

@elastic/es-core-infra what do y'all think about extending ListTasksRequest to allow for filtering tasks based on task action pattern + description pattern? It could just be an internal thing for now.
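
Roughly, the shape of the change could be a descriptions parameter on the request that is applied during task matching alongside the existing actions filter; the field and method names below are a guess at the shape, not the final API:

import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.tasks.Task;

// Sketch of the extra matching a descriptions parameter could add on top of the
// existing action matching in ListTasksRequest; names are illustrative.
public class DescriptionMatchSketch {

    private String[] descriptions = new String[0];

    public void setDescriptions(String... descriptions) {
        this.descriptions = descriptions;
    }

    // Would be combined with the existing match(Task) logic (actions, parent task, etc.).
    boolean matchesDescription(Task task) {
        if (descriptions.length == 0) {
            return true; // no description filter supplied
        }
        return task.getDescription() != null && Regex.simpleMatch(descriptions, task.getDescription());
    }
}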

@gwbrown
Contributor

gwbrown commented Mar 22, 2021

Tagging in @jaymode and @williamrandolph as this relates to system indices (in particular, allowing ML to properly clean up its indices), specifically for this question:

what do y'all think about extending ListTasksRequest to allow for filtering tasks based on task action pattern + description pattern?

I'm not familiar enough with this API to answer immediately, and my priority is node shutdown at the moment, but I want to make sure relevant folks see it.

@benwtrent
Member

@jaymode @williamrandolph the change was not too difficult to add: #70759

Please let me know what y'all think.

benwtrent added a commit that referenced this issue Mar 29, 2021
This commit adds a new `descriptions` parameter for listing tasks in the transport layer. 

This allows tasks to be filtered by `actions` and `descriptions` so that matching specific tasks
within a particular action is possible. 

related to: #70008
benwtrent added a commit to benwtrent/elasticsearch that referenced this issue Mar 29, 2021
This commit adds a new `descriptions` parameter for listing tasks in the transport layer. 

This allows tasks to be filtered by `actions` and `descriptions` so that matching specific tasks
within a particular action is possible. 

related to: elastic#70008
benwtrent added a commit that referenced this issue Mar 29, 2021
* Add descriptions parameter for listing tasks (#70759)

This commit adds a new `descriptions` parameter for listing tasks in the transport layer. 

This allows tasks to be filtered by `actions` and `descriptions` so that matching specific tasks
within a particular action is possible. 

related to: #70008
benwtrent added a commit that referenced this issue Mar 30, 2021
…ion (#71011)

This completes the machine learning feature state cleanup integration.

This commit handles waiting for machine learning tasks to complete and adds a new
field to the ML Metadata cluster state to indicate when a reset is in progress for machine 
learning.

relates: #70008
benwtrent added a commit to benwtrent/elasticsearch that referenced this issue Mar 30, 2021
…ion (elastic#71011)

This completes the machine learning feature state cleanup integration.

This commit handles waiting for machine learning tasks to complete and adds a new
field to the ML Metadata cluster state to indicate when a reset is in progress for machine
learning.

relates: elastic#70008
benwtrent added a commit that referenced this issue Mar 31, 2021
…tegration (#71011) (#71071)

* [ML] complete machine learning plugin feature state clean up integration (#71011)

This completes the machine learning feature state cleanup integration.

This commit handles waiting for machine learning tasks to complete and adds a new
field to the ML Metadata cluster state to indicate when a reset is in progress for machine
learning.

relates: #70008

* [ML] fixing feature reset integration tests (#71081)

previously created pipelines referencing ML models were not being appropriately deleted in upstream tests.

This commit ensures that machine learning removes relevant pipelines from cluster state after tests complete

closes #71072
benwtrent added a commit that referenced this issue Apr 7, 2021
This commit updates transform feature reset to:

- wait for transform tasks to complete
- wait for all indexing actions to transform indices to complete
- and prevents transform audit messages from being written while the reset is being processed

related to #70008 & #69581
benwtrent added a commit to benwtrent/elasticsearch that referenced this issue Apr 7, 2021
This commit updates transform feature reset to:

- wait for transform tasks to complete
- wait for all indexing actions to transform indices to complete
- and prevents transform audit messages from being written while the reset is being processed

related to elastic#70008 & elastic#69581
benwtrent added a commit that referenced this issue Apr 7, 2021
* [Transform] finalize feature reset integration (#71133)

This commit updates transform feature reset to:

- wait for transform tasks to complete
- wait for all indexing actions to transform indices to complete
- and prevents transform audit messages from being written while the reset is being processed

related to #70008 & #69581