Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] update results persisting service respect node shutdown #66128

Conversation

benwtrent
Copy link
Member

When a node is shutting down, any scheduled retry tasks should be cancelled.

Also, any currently finishing but failed tasks should not be retried.

@elasticmachine
Copy link
Collaborator

Pinging @elastic/ml-core (:ml)

@benwtrent
Copy link
Member Author

I ran this on both master and 7.x branch.

Below are some logs from a particular 7.x run.

Here it can be seen that even though persisting data count stats is currently being retried, it is cancelled and is NOT retried allowing node_t3 to exit.

 1> [2020-12-09T10:21:05,393][INFO ][o.e.t.InternalTestCluster] [testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown] Closing node [node_t3]
  1> [2020-12-09T10:21:05,393][INFO ][o.e.n.Node               ] [testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown] stopping ...
  1> [2020-12-09T10:21:05,394][INFO ][o.e.x.m.j.p.DataCountsReporter] [node_t3] [test-node-goes-down-while-running-job] 30000 records written to autodetect; missingFieldCount=0, invalidDateCount=0, outOfOrderCount=0
  1> [2020-12-09T10:21:05,394][INFO ][o.e.c.c.Coordinator      ] [node_t3] master node [{node_t1}{IQTW3u0aRdWgrEkziAQjQw}{-1xcJXdvTFW16w4QtnGcyg}{127.0.0.1}{127.0.0.1:56734}{m}{xpack.installed=true}] failed, restarting discovery
  1> org.elasticsearch.transport.NodeDisconnectedException: [node_t1][127.0.0.1:56734][disconnected] disconnected
  1> [2020-12-09T10:21:05,395][DEBUG][o.e.x.m.u.p.ResultsPersisterService] [node_t3] [test-node-goes-down-while-running-job] retrying cancelled for action [index]
  1> org.elasticsearch.common.util.CancellableThreads$ExecutionCancelledException: Node is shutting down
  1>    at org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService.bulkIndexWithRetry(ResultsPersisterService.java:192) [main/:?]
  1>    at org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService.bulkIndexWithRetry(ResultsPersisterService.java:146) [main/:?]
  1>    at org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService.indexWithRetry(ResultsPersisterService.java:139) [main/:?]
  1>    at org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister.persistDataCounts(JobDataCountsPersister.java:62) [main/:?]
  1>    at org.elasticsearch.xpack.ml.job.process.DataCountsReporter.finishReporting(DataCountsReporter.java:237) [main/:?]
  1>    at org.elasticsearch.xpack.ml.job.process.autodetect.writer.JsonDataToProcessWriter.write(JsonDataToProcessWriter.java:72) [main/:?]
  1>    at org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectCommunicator.lambda$writeToJob$1(AutodetectCommunicator.java:127) [main/:?]
  1>    at org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectCommunicator$1.doRun(AutodetectCommunicator.java:382) [main/:?]
  1>    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:737) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectWorkerExecutorService.start(AutodetectWorkerExecutorService.java:94) [main/:?]
  1>    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
  1>    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
  1>    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:678) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
  1>    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
  1>    at java.lang.Thread.run(Thread.java:834) [?:?]
  1> [2020-12-09T10:21:05,396][ERROR][o.e.x.m.j.p.JobDataCountsPersister] [node_t3] [test-node-goes-down-while-running-job] Failed persisting data_counts stats
  1> org.elasticsearch.common.util.CancellableThreads$ExecutionCancelledException: Node is shutting down
  1>    at org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService.bulkIndexWithRetry(ResultsPersisterService.java:192) ~[main/:?]
  1>    at org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService.bulkIndexWithRetry(ResultsPersisterService.java:146) ~[main/:?]
  1>    at org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService.indexWithRetry(ResultsPersisterService.java:139) ~[main/:?]
  1>    at org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister.persistDataCounts(JobDataCountsPersister.java:62) [main/:?]
  1>    at org.elasticsearch.xpack.ml.job.process.DataCountsReporter.finishReporting(DataCountsReporter.java:237) [main/:?]
  1>    at org.elasticsearch.xpack.ml.job.process.autodetect.writer.JsonDataToProcessWriter.write(JsonDataToProcessWriter.java:72) [main/:?]
  1>    at org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectCommunicator.lambda$writeToJob$1(AutodetectCommunicator.java:127) [main/:?]
  1>    at org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectCommunicator$1.doRun(AutodetectCommunicator.java:382) [main/:?]
  1>    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:737) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectWorkerExecutorService.start(AutodetectWorkerExecutorService.java:94) [main/:?]
  1>    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
  1>    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
  1>    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:678) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
  1>    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
  1>    at java.lang.Thread.run(Thread.java:834) [?:?]
  1> [2020-12-09T10:21:05,398][INFO ][o.e.c.r.a.AllocationService] [node_t1] updating number_of_replicas to [0] for indices [.ml-annotations-6, .ml-anomalies-shared, .ml-notifications-000001, .ml-config, .ml-state-000001]
  1> [2020-12-09T10:21:05,398][WARN ][o.e.t.TransportService   ] [node_t3] Transport response handler not found of id [295]
  1> [2020-12-09T10:21:05,399][INFO ][o.e.c.s.MasterService    ] [node_t1] node-left[{node_t3}{ifo3MGt5SgC_veAGI__1uw}{ny4CJTPCQoWzbtEe4fNGsw}{127.0.0.1}{127.0.0.1:56745}{dl}{ml.machine_memory=68719476736, ml.max_open_jobs=20, xpack.installed=true, ml.max_jvm_size=536870912} reason: disconnected], term: 1, version: 52, delta: removed {{node_t3}{ifo3MGt5SgC_veAGI__1uw}{ny4CJTPCQoWzbtEe4fNGsw}{127.0.0.1}{127.0.0.1:56745}{dl}{ml.machine_memory=68719476736, ml.max_open_jobs=20, xpack.installed=true, ml.max_jvm_size=536870912}}
  1> [2020-12-09T10:21:05,396][INFO ][o.e.x.m.u.p.ResultsPersisterService] [node_t3] [test-node-goes-down-while-running-job] should not retry index after [1] attempts
  1> org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService$RecoverableException: null
  1>    at org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService$BulkRetryableAction.lambda$new$0(ResultsPersisterService.java:293) [main/:?]
  1>    at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:128) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:89) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.TransportAction$1.onResponse(TransportAction.java:83) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.ActionListener$5.onResponse(ActionListener.java:297) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation$1.finishHim(TransportBulkAction.java:557) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation$1.onFailure(TransportBulkAction.java:552) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.TransportAction$1.onFailure(TransportAction.java:98) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.finishAsFailed(TransportReplicationAction.java:864) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase$2.onClusterServiceClose(TransportReplicationAction.java:849) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.cluster.ClusterStateObserver$ContextPreservingListener.onClusterServiceClose(ClusterStateObserver.java:328) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.cluster.ClusterStateObserver$ObserverClusterStateListener.onClose(ClusterStateObserver.java:237) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.cluster.service.ClusterApplierService.addTimeoutListener(ClusterApplierService.java:263) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.cluster.ClusterStateObserver.waitForNextChange(ClusterStateObserver.java:176) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.cluster.ClusterStateObserver.waitForNextChange(ClusterStateObserver.java:120) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.cluster.ClusterStateObserver.waitForNextChange(ClusterStateObserver.java:112) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.retry(TransportReplicationAction.java:841) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase$1.handleException(TransportReplicationAction.java:820) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:694) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.performAction(TransportReplicationAction.java:798) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.performRemoteAction(TransportReplicationAction.java:793) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.replication.TransportReplicationAction$ReroutePhase.doRun(TransportReplicationAction.java:758) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.replication.TransportReplicationAction.runReroutePhase(TransportReplicationAction.java:190) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.replication.TransportReplicationAction.doExecute(TransportReplicationAction.java:185) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.replication.TransportReplicationAction.doExecute(TransportReplicationAction.java:95) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:179) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.ActionFilter$Simple.apply(ActionFilter.java:53) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:177) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:155) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:83) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.bulk.TransportBulkAction$BulkOperation.doRun(TransportBulkAction.java:527) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.bulk.TransportBulkAction.executeBulk(TransportBulkAction.java:655) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.bulk.TransportBulkAction.doInternalExecute(TransportBulkAction.java:252) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:173) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.bulk.TransportBulkAction.doExecute(TransportBulkAction.java:100) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:179) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.ActionFilter$Simple.apply(ActionFilter.java:53) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:177) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:155) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.action.support.TransportAction.execute(TransportAction.java:83) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.client.node.NodeClient.executeLocally(NodeClient.java:86) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.client.node.NodeClient.doExecute(NodeClient.java:75) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:412) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.client.FilterClient.doExecute(FilterClient.java:65) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.client.OriginSettingClient.doExecute(OriginSettingClient.java:51) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:412) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:490) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService$BulkRetryableAction.lambda$new$1(ResultsPersisterService.java:269) [main/:?]
  1>    at org.elasticsear  1> ch.xpack.ml.utils.persistence.ResultsPersisterService$MlRetryableAction.tryAction(ResultsPersisterService.java:392) [main/:?]
  1>    at org.elasticsearch.action.support.RetryableAction$1.doRun(RetryableAction.java:99) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:737) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-7.11.0-SNAPSHOT.jar:7.11.0-SNAPSHOT]
  1>    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
  1>    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
  1>    at java.lang.Thread.run(Thread.java:834) [?:?]
  1> [2020-12-09T10:21:05,436][INFO ][o.e.c.s.ClusterApplierService] [node_t2] removed {{node_t3}{ifo3MGt5SgC_veAGI__1uw}{ny4CJTPCQoWzbtEe4fNGsw}{127.0.0.1}{127.0.0.1:56745}{dl}{ml.machine_memory=68719476736, ml.max_open_jobs=20, xpack.installed=true, ml.max_jvm_size=536

@benwtrent benwtrent force-pushed the feature/ml-results-persister-handles-node-shutdown branch from 0f2597d to fc0dd94 Compare December 9, 2020 18:52
Copy link
Contributor

@dimitris-athanasiou dimitris-athanasiou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@benwtrent
Copy link
Member Author

@elasticmachine update branch

@benwtrent benwtrent merged commit 9c99b5b into elastic:master Dec 10, 2020
@benwtrent benwtrent deleted the feature/ml-results-persister-handles-node-shutdown branch December 10, 2020 19:07
benwtrent added a commit to benwtrent/elasticsearch that referenced this pull request Dec 10, 2020
…#66128)

When a node is shutting down, any scheduled retry tasks should be cancelled.

Also, any currently finishing but failed tasks should not be retried.
benwtrent added a commit that referenced this pull request Dec 10, 2020
…66128) (#66187)

* [ML] results persisting service should respect node shutdown (#66128)

When a node is shutting down, any scheduled retry tasks should be cancelled.

Also, any currently finishing but failed tasks should not be retried.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants