From 65ceee8bf80fe8c900e43bd057e29fe9d08f7f88 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Sat, 17 Oct 2020 10:28:25 +0100 Subject: [PATCH] [ML] Wait for controller to respond to commands (#63542) This change makes threads that send a command to the ML controller process wait for it to respond to the command. Previously such threads would block until the command was sent, but not until it was actioned. This was on the assumption that the sort of commands being sent would be actioned almost instantaneously, but that assumption has been shown to be false when anti-malware software is running. Relates elastic/ml-cpp#1520 Fixes #62823 --- .../client/MachineLearningIT.java | 2 - .../MlClientDocumentationIT.java | 2 - .../ml/qa/basic-multi-node/build.gradle | 5 - .../ml/qa/ml-with-security/build.gradle | 3 - .../qa/native-multi-node-tests/build.gradle | 4 - .../ml/qa/single-node-tests/build.gradle | 5 - .../xpack/ml/MachineLearning.java | 3 +- .../dataframe/process/AnalyticsBuilder.java | 2 +- .../process/AnalyticsProcessManager.java | 4 +- .../NativeAnalyticsProcessFactory.java | 5 +- ...veMemoryUsageEstimationProcessFactory.java | 5 +- .../process/autodetect/AutodetectBuilder.java | 2 +- .../autodetect/AutodetectCommunicator.java | 2 +- .../BlackHoleAutodetectProcess.java | 2 +- .../NativeAutodetectProcessFactory.java | 5 +- .../MultiplyingNormalizerProcess.java | 2 +- .../NativeNormalizerProcessFactory.java | 5 +- .../ml/process/AbstractNativeProcess.java | 7 +- .../xpack/ml/process/ControllerResponse.java | 86 +++++ .../xpack/ml/process/NativeController.java | 163 +++++++-- .../xpack/ml/process/NativeProcess.java | 4 +- .../process/AnalyticsProcessManagerTests.java | 2 +- .../AutodetectCommunicatorTests.java | 24 +- .../process/AbstractNativeProcessTests.java | 5 +- .../ml/process/ControllerResponseTests.java | 30 ++ .../ml/process/NativeControllerTests.java | 57 ++- .../rest-api-spec/test/ml/calendar_crud.yml | 74 ---- .../test/ml/custom_all_field.yml | 10 - .../test/ml/data_frame_analytics_cat_apis.yml | 18 - .../test/ml/data_frame_analytics_crud.yml | 334 +----------------- .../test/ml/datafeed_cat_apis.yml | 6 - .../rest-api-spec/test/ml/datafeeds_crud.yml | 86 ----- .../test/ml/delete_expired_data.yml | 20 -- .../rest-api-spec/test/ml/delete_forecast.yml | 22 -- .../test/ml/delete_job_force.yml | 18 - .../test/ml/delete_model_snapshot.yml | 22 -- .../test/ml/estimate_model_memory.yml | 68 ---- .../test/ml/evaluate_data_frame.yml | 187 ---------- .../test/ml/explain_data_frame_analytics.yml | 32 -- .../rest-api-spec/test/ml/filter_crud.yml | 58 --- .../test/ml/find_file_structure.yml | 10 - .../rest-api-spec/test/ml/forecast.yml | 28 -- .../test/ml/get_datafeed_stats.yml | 37 +- .../rest-api-spec/test/ml/get_datafeeds.yml | 18 - .../test/ml/get_model_snapshots.yml | 34 -- .../rest-api-spec/test/ml/index_layout.yml | 22 -- .../rest-api-spec/test/ml/inference_crud.yml | 72 ---- .../test/ml/inference_processor.yml | 10 - .../test/ml/inference_stats_crud.yml | 18 - .../rest-api-spec/test/ml/job_cat_apis.yml | 6 - .../rest-api-spec/test/ml/job_groups.yml | 34 -- .../rest-api-spec/test/ml/jobs_crud.yml | 139 +------- .../rest-api-spec/test/ml/jobs_get.yml | 22 -- .../test/ml/jobs_get_result_buckets.yml | 50 --- .../test/ml/jobs_get_result_categories.yml | 34 -- .../test/ml/jobs_get_result_influencers.yml | 38 -- .../ml/jobs_get_result_overall_buckets.yml | 76 ---- .../test/ml/jobs_get_result_records.yml | 30 -- .../rest-api-spec/test/ml/jobs_get_stats.yml | 42 --- .../test/ml/ml_anomalies_default_mappings.yml | 10 - .../test/ml/ml_classic_analyze.yml | 4 - .../rest-api-spec/test/ml/ml_info.yml | 7 - .../test/ml/pipeline_inference.yml | 18 - .../rest-api-spec/test/ml/post_data.yml | 22 -- .../test/ml/preview_datafeed.yml | 32 -- .../test/ml/revert_model_snapshot.yml | 18 - .../test/ml/set_upgrade_mode.yml | 20 -- .../test/ml/start_data_frame_analytics.yml | 36 -- .../test/ml/start_stop_datafeed.yml | 72 +--- .../test/ml/stop_data_frame_analytics.yml | 28 -- .../test/ml/trained_model_cat_apis.yml | 6 - .../test/ml/update_model_snapshot.yml | 26 -- .../rest-api-spec/test/ml/validate.yml | 24 -- .../test/ml/validate_detector.yml | 8 - ...nfigIndexMappingsFullClusterRestartIT.java | 1 - .../upgrades/MlMappingsUpgradeIT.java | 1 - .../test/mixed_cluster/30_ml_jobs_crud.yml | 20 -- .../mixed_cluster/40_ml_datafeed_crud.yml | 12 - .../90_ml_data_frame_analytics_crud.yml | 40 --- .../test/old_cluster/30_ml_jobs_crud.yml | 32 -- .../test/old_cluster/40_ml_datafeed_crud.yml | 11 +- .../90_ml_data_frame_analytics_crud.yml | 16 - .../test/upgraded_cluster/30_ml_jobs_crud.yml | 26 +- .../upgraded_cluster/40_ml_datafeed_crud.yml | 12 - .../90_ml_data_frame_analytics_crud.yml | 32 -- 85 files changed, 369 insertions(+), 2276 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ControllerResponse.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ControllerResponseTests.java diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index 8b83bf2fb85e3..a065be7eae0e1 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.client; import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.get.GetRequest; @@ -220,7 +219,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/63542") public class MachineLearningIT extends ESRestHighLevelClientTestCase { @After diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index b09ba882780ee..b9add106b82d7 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.client.documentation; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.bulk.BulkRequest; @@ -245,7 +244,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.core.Is.is; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/63542") public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { @After diff --git a/x-pack/plugin/ml/qa/basic-multi-node/build.gradle b/x-pack/plugin/ml/qa/basic-multi-node/build.gradle index d0ea6f86fd7c0..7e30c27d721fe 100644 --- a/x-pack/plugin/ml/qa/basic-multi-node/build.gradle +++ b/x-pack/plugin/ml/qa/basic-multi-node/build.gradle @@ -11,8 +11,3 @@ testClusters.all { setting 'indices.lifecycle.history_index_enabled', 'false' setting 'slm.history_index_enabled', 'false' } - -// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542 -// and https://github.com/elastic/ml-cpp/pull/1520 are merged -javaRestTest.enabled = false - diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle index e4ac4c9097f02..09148f6da5459 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle +++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle @@ -214,9 +214,6 @@ yamlRestTest { ].join(',') } -// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542 -// and https://github.com/elastic/ml-cpp/pull/1520 are merged -yamlRestTest.enabled = false testClusters.all { testDistribution = 'DEFAULT' diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle index e48078314f179..cedfbf0ff8fe8 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle @@ -33,10 +33,6 @@ javaRestTest { systemProperty 'es.set.netty.runtime.available.processors', 'false' } -// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542 -// and https://github.com/elastic/ml-cpp/pull/1520 are merged -javaRestTest.enabled = false - testClusters.all { numberOfNodes = 3 testDistribution = 'DEFAULT' diff --git a/x-pack/plugin/ml/qa/single-node-tests/build.gradle b/x-pack/plugin/ml/qa/single-node-tests/build.gradle index 8324a6e0f5f22..f3610bfa98672 100644 --- a/x-pack/plugin/ml/qa/single-node-tests/build.gradle +++ b/x-pack/plugin/ml/qa/single-node-tests/build.gradle @@ -5,8 +5,3 @@ testClusters.all { setting 'xpack.security.enabled', 'false' setting 'xpack.license.self_generated.type', 'trial' } - -// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542 -// and https://github.com/elastic/ml-cpp/pull/1520 are merged -javaRestTest.enabled = false - diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 3206e8eea9f09..61d914ad40c9a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -625,7 +625,8 @@ public Collection createComponents(Client client, ClusterService cluster AnalyticsProcessFactory memoryEstimationProcessFactory; if (MachineLearningField.AUTODETECT_PROCESS.get(settings)) { try { - NativeController nativeController = NativeController.makeNativeController(clusterService.getNodeName(), environment); + NativeController nativeController = + NativeController.makeNativeController(clusterService.getNodeName(), environment, xContentRegistry); autodetectProcessFactory = new NativeAutodetectProcessFactory( environment, settings, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java index 065767ba4e736..7a23428d4cc95 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java @@ -52,7 +52,7 @@ public AnalyticsBuilder performMemoryUsageEstimationOnly() { return this; } - public void build() throws IOException { + public void build() throws IOException, InterruptedException { List command = buildAnalyticsCommand(); processPipes.addArgs(command); nativeController.startProcess(command); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 6b395c32d433d..8be7bb0efd823 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -83,7 +83,7 @@ public AnalyticsProcessManager(Settings settings, this( settings, client, - threadPool.generic(), + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), analyticsProcessFactory, auditor, @@ -451,7 +451,7 @@ synchronized void stop() { } if (process.get() != null) { try { - process.get().kill(); + process.get().kill(true); } catch (IOException e) { LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", config.getId()), e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index b298d459544d2..1ffbe72e33bee 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -116,8 +116,11 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete); try { analyticsBuilder.build(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("[{}] Interrupted while launching data frame analytics process", jobId); } catch (IOException e) { - String msg = "Failed to launch data frame analytics process for job " + jobId; + String msg = "[" + jobId + "] Failed to launch data frame analytics process"; LOGGER.error(msg); throw ExceptionsHelper.serverError(msg, e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java index 61e2f9f5d7954..4abce691d41f1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java @@ -102,8 +102,11 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP .performMemoryUsageEstimationOnly(); try { analyticsBuilder.build(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("[{}] Interrupted while launching data frame analytics memory usage estimation process", jobId); } catch (IOException e) { - String msg = "Failed to launch data frame analytics memory usage estimation process for job " + jobId; + String msg = "[" + jobId + "] Failed to launch data frame analytics memory usage estimation process"; LOGGER.error(msg); throw ExceptionsHelper.serverError(msg, e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 3a3222d3c0206..144347c40a5b9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -169,7 +169,7 @@ public AutodetectBuilder scheduledEvents(List scheduledEvents) { /** * Requests that the controller daemon start an autodetect process. */ - public void build() throws IOException { + public void build() throws IOException, InterruptedException { List command = buildAutodetectCommand(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 8851a2a965f44..910951374931f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -196,7 +196,7 @@ public void killProcess(boolean awaitCompletion, boolean finish, boolean finaliz processKilled = true; autodetectResultProcessor.setProcessKilled(); autodetectWorkerExecutor.shutdown(); - autodetectProcess.kill(); + autodetectProcess.kill(awaitCompletion); if (awaitCompletion) { try { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index 246280d8c149a..9fe795686e0f6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -134,7 +134,7 @@ public void close() { } @Override - public void kill() { + public void kill(boolean awaitCompletion) { open = false; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index bdd1aaa30fae3..62858d798da56 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -127,8 +127,11 @@ void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipe autodetectBuilder.quantiles(autodetectParams.quantiles()); } autodetectBuilder.build(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("[{}] Interrupted while launching autodetect", job.getId()); } catch (IOException e) { - String msg = "Failed to launch autodetect for job " + job.getId(); + String msg = "[" + job.getId() + "] Failed to launch autodetect"; LOGGER.error(msg); throw ExceptionsHelper.serverError(msg, e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java index 652d575b07ab0..849e911a30e47 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java @@ -90,7 +90,7 @@ public void flushStream() { } @Override - public void kill() { + public void kill(boolean awaitCompletion) { // Nothing to do } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index de1ea69382905..4e3c1b688ed9a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -81,8 +81,11 @@ private void createNativeProcess(String jobId, String quantilesState, ProcessPip List command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build(); processPipes.addArgs(command); nativeController.startProcess(command); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("[{}] Interrupted while launching normalizer", jobId); } catch (IOException e) { - String msg = "Failed to launch normalizer for job " + jobId; + String msg = "[" + jobId + "] Failed to launch normalizer"; LOGGER.error(msg); throw ExceptionsHelper.serverError(msg, e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index c9ba271da0241..f041d9aebb046 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -206,19 +206,22 @@ public void close() throws IOException { } @Override - public void kill() throws IOException { + public void kill(boolean awaitCompletion) throws IOException { LOGGER.debug("[{}] Killing {} process", jobId, getName()); processKilled = true; try { // The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID. // Without the PID we cannot kill the process. - nativeController.killProcess(cppLogHandler().getPid(processPipes.getTimeout())); + nativeController.killProcess(cppLogHandler().getPid(processPipes.getTimeout()), awaitCompletion); // Wait for the process to die before closing processInStream as if the process // is still alive when processInStream is closed it may start persisting state cppLogHandler().waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT); } catch (TimeoutException e) { LOGGER.warn("[{}] Failed to get PID of {} process to kill", jobId, getName()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("[{}] Interrupted while killing {} process", jobId, getName()); } finally { try { if (processInStream() != null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ControllerResponse.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ControllerResponse.java new file mode 100644 index 0000000000000..ce91e8ad27a49 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ControllerResponse.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.process; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public class ControllerResponse implements ToXContentObject { + + public static final ParseField TYPE = new ParseField("controller_response"); + + public static final ParseField COMMAND_ID = new ParseField("id"); + public static final ParseField SUCCESS = new ParseField("success"); + public static final ParseField REASON = new ParseField("reason"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + TYPE.getPreferredName(), a -> new ControllerResponse((int) a[0], (boolean) a[1], (String) a[2])); + + static { + PARSER.declareInt(ConstructingObjectParser.constructorArg(), COMMAND_ID); + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), SUCCESS); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON); + } + + private final int commandId; + private final boolean success; + private final String reason; + + ControllerResponse(int commandId, boolean success, String reason) { + this.commandId = commandId; + this.success = success; + this.reason = reason; + } + + public int getCommandId() { + return commandId; + } + + public boolean isSuccess() { + return success; + } + + public String getReason() { + return reason; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(COMMAND_ID.getPreferredName(), commandId); + builder.field(SUCCESS.getPreferredName(), success); + if (reason != null) { + builder.field(REASON.getPreferredName(), reason); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ControllerResponse that = (ControllerResponse) o; + return this.commandId == that.commandId && + this.success == that.success && + Objects.equals(this.reason, that.reason); + } + + @Override + public int hashCode() { + return Objects.hash(commandId, success, reason); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java index 9d9fbc4f79d06..2af4d86998568 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java @@ -7,18 +7,24 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import java.io.BufferedOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; @@ -44,20 +50,34 @@ public class NativeController implements MlController { private final String localNodeName; private final CppLogMessageHandler cppLogHandler; private final OutputStream commandStream; + private final InputStream responseStream; + private final NamedXContentRegistry xContentRegistry; + private final Map responseTrackers = new ConcurrentHashMap<>(); + // The response iterator cannot be constructed until something is expected to be in the stream it's reading from, + // otherwise it will block while it tries to read a few bytes to determine the character set. It could be created + // immediately in a dedicated thread, but that's wasteful as we can reuse the threads that are sending the commands + // to the controller to read the responses. So we create it in the first thread that wants to know the response to + // a command. + private final SetOnce> responseIteratorHolder = new SetOnce<>(); + private int nextCommandId = 1; // synchronized on commandStream so doesn't need to be volatile - public static NativeController makeNativeController(String localNodeName, Environment env) throws IOException { - return new NativeController(localNodeName, env, new NamedPipeHelper()); + public static NativeController makeNativeController(String localNodeName, Environment env, NamedXContentRegistry xContentRegistry) + throws IOException { + return new NativeController(localNodeName, env, new NamedPipeHelper(), xContentRegistry); } - NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper) throws IOException { + NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper, NamedXContentRegistry xContentRegistry) + throws IOException { + this.localNodeName = localNodeName; ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null, - true, false, false, false, false); + true, false, true, false, false); processPipes.connectLogStream(); - tailLogsInThread(processPipes.getLogStreamHandler()); - processPipes.connectOtherStreams(); - this.localNodeName = localNodeName; this.cppLogHandler = processPipes.getLogStreamHandler(); + tailLogsInThread(cppLogHandler); + processPipes.connectOtherStreams(); this.commandStream = new BufferedOutputStream(processPipes.getCommandStream().get()); + this.responseStream = processPipes.getProcessOutStream().get(); + this.xContentRegistry = xContentRegistry; } static void tailLogsInThread(CppLogMessageHandler cppLogHandler) { @@ -88,7 +108,7 @@ public Map getNativeCodeInfo() throws TimeoutException { return cppLogHandler.getNativeCodeInfo(CONTROLLER_CONNECT_TIMEOUT); } - public void startProcess(List command) throws IOException { + public void startProcess(List command) throws IOException, InterruptedException { if (command.isEmpty()) { throw new IllegalArgumentException("Cannot start process: no command supplied"); } @@ -110,19 +130,29 @@ public void startProcess(List command) throws IOException { throw new ElasticsearchException(msg); } - synchronized (commandStream) { - LOGGER.debug("Starting process with command: " + command); - commandStream.write(START_COMMAND.getBytes(StandardCharsets.UTF_8)); - for (String arg : command) { + int commandId = -1; + try { + synchronized (commandStream) { + commandId = nextCommandId++; + setupResponseTracker(commandId); + LOGGER.debug("Command [{}]: starting process with command {}", commandId, command); + commandStream.write(Integer.toString(commandId).getBytes(StandardCharsets.UTF_8)); commandStream.write('\t'); - commandStream.write(arg.getBytes(StandardCharsets.UTF_8)); + commandStream.write(START_COMMAND.getBytes(StandardCharsets.UTF_8)); + for (String arg : command) { + commandStream.write('\t'); + commandStream.write(arg.getBytes(StandardCharsets.UTF_8)); + } + commandStream.write('\n'); + commandStream.flush(); } - commandStream.write('\n'); - commandStream.flush(); + awaitCompletion(commandId); + } finally { + removeResponseTracker(commandId); } } - public void killProcess(long pid) throws TimeoutException, IOException { + public void killProcess(long pid, boolean awaitCompletion) throws TimeoutException, IOException, InterruptedException { if (pid <= 0) { throw new IllegalArgumentException("invalid PID to kill: " + pid); } @@ -137,13 +167,29 @@ public void killProcess(long pid) throws TimeoutException, IOException { throw new ElasticsearchException(msg); } - synchronized (commandStream) { - LOGGER.debug("Killing process with PID: " + pid); - commandStream.write(KILL_COMMAND.getBytes(StandardCharsets.UTF_8)); - commandStream.write('\t'); - commandStream.write(Long.toString(pid).getBytes(StandardCharsets.UTF_8)); - commandStream.write('\n'); - commandStream.flush(); + int commandId = -1; + try { + synchronized (commandStream) { + commandId = nextCommandId++; + if (awaitCompletion) { + setupResponseTracker(commandId); + } + LOGGER.debug("Command [{}]: killing process with PID [{}]", commandId, pid); + commandStream.write(Integer.toString(commandId).getBytes(StandardCharsets.UTF_8)); + commandStream.write('\t'); + commandStream.write(KILL_COMMAND.getBytes(StandardCharsets.UTF_8)); + commandStream.write('\t'); + commandStream.write(Long.toString(pid).getBytes(StandardCharsets.UTF_8)); + commandStream.write('\n'); + commandStream.flush(); + } + if (awaitCompletion) { + awaitCompletion(commandId); + } + } finally { + if (awaitCompletion) { + removeResponseTracker(commandId); + } } } @@ -152,4 +198,75 @@ public void stop() throws IOException { // The C++ process will exit when it gets EOF on the command stream commandStream.close(); } + + private void setupResponseTracker(int commandId) { + ResponseTracker tracker = new ResponseTracker(); + ResponseTracker previous = responseTrackers.put(commandId, tracker); + assert previous == null; + } + + private void removeResponseTracker(int commandId) { + responseTrackers.remove(commandId); + } + + private void awaitCompletion(int commandId) throws IOException, InterruptedException { + + ResponseTracker ourResponseTracker = responseTrackers.get(commandId); + assert ourResponseTracker != null; + + // If our response has not been seen already (by another thread), parse messages under lock until it is seen. + // This approach means that of all the threads waiting for controller responses, one is parsing the messages + // on behalf of all of them, and the others are blocked. When the thread that is parsing gets the response + // it needs another thread will pick up the parsing. + if (ourResponseTracker.hasResponded() == false) { + synchronized (responseIteratorHolder) { + Iterator responseIterator = responseIteratorHolder.get(); + if (responseIterator == null) { + responseIterator = new ProcessResultsParser(ControllerResponse.PARSER, xContentRegistry) + .parseResults(this.responseStream); + responseIteratorHolder.set(responseIterator); + } + while (ourResponseTracker.hasResponded() == false) { + if (responseIterator.hasNext() == false) { + throw new IOException("ML controller response stream ended while awaiting response for command [" + + commandId + "]"); + } + ControllerResponse response = responseIterator.next(); + ResponseTracker respondedTracker = responseTrackers.get(response.getCommandId()); + // It is not compulsory to track all responses, hence legitimate not to find every ID in the map + if (respondedTracker != null) { + respondedTracker.setResponse(response); + } + } + } + } + + ControllerResponse ourResponse = ourResponseTracker.getResponse(); + assert ourResponse.getCommandId() == commandId; + if (ourResponse.isSuccess()) { + LOGGER.debug("ML controller successfully executed command [" + commandId + "]: [" + ourResponse.getReason() + "]"); + } else { + throw new IOException("ML controller failed to execute command [" + commandId + "]: [" + ourResponse.getReason() + "]"); + } + } + + private static class ResponseTracker { + + private final CountDownLatch latch = new CountDownLatch(1); + private final SetOnce responseHolder = new SetOnce<>(); + + boolean hasResponded() { + return latch.getCount() < 1; + } + + void setResponse(ControllerResponse response) { + responseHolder.set(response); + latch.countDown(); + } + + ControllerResponse getResponse() throws InterruptedException { + latch.await(); + return responseHolder.get(); + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java index c4f2b4a463185..57715f465fe04 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java @@ -43,8 +43,10 @@ public interface NativeProcess extends Closeable { /** * Kill the process. Do not wait for it to stop gracefully. + * @param awaitCompletion Indicates whether to wait for the process to die. Even if this + * is set to true the process will not complete gracefully. */ - void kill() throws IOException; + void kill(boolean awaitCompletion) throws IOException; /** * The time the process was started diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java index 223a085455116..204407334361c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java @@ -262,7 +262,7 @@ public void testProcessContext_StartAndStop() throws Exception { inOrder.verify(dataExtractor).getExtractedFields(); // stop inOrder.verify(dataExtractor).cancel(); - inOrder.verify(process).kill(); + inOrder.verify(process).kill(true); verifyNoMoreInteractions(dataExtractor, process, task); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 4b128d8fb700d..51ff6c8162974 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -31,7 +31,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.junit.Before; -import org.mockito.Mockito; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -52,6 +51,7 @@ import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doNothing; @@ -117,7 +117,7 @@ public void testFlushJob() throws Exception { AtomicReference flushAcknowledgementHolder = new AtomicReference<>(); communicator.flushJob(params, (f, e) -> flushAcknowledgementHolder.set(f)); assertThat(flushAcknowledgementHolder.get(), equalTo(flushAcknowledgement)); - Mockito.verify(process).flushJob(params); + verify(process).flushJob(params); } } @@ -145,7 +145,7 @@ public void testFlushJob_throwsIfProcessIsDead() throws IOException { public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws Exception { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); - AutodetectResultProcessor autodetectResultProcessor = Mockito.mock(AutodetectResultProcessor.class); + AutodetectResultProcessor autodetectResultProcessor = mock(AutodetectResultProcessor.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(autodetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)))) .thenReturn(null).thenReturn(flushAcknowledgement); @@ -168,8 +168,8 @@ public void testCloseGivenProcessIsReady() throws IOException { communicator.close(); verify(process).close(); - verify(process, never()).kill(); - Mockito.verifyNoMoreInteractions(stateStreamer); + verify(process, never()).kill(anyBoolean()); + verifyNoMoreInteractions(stateStreamer); } public void testCloseGivenProcessIsNotReady() throws IOException { @@ -179,7 +179,7 @@ public void testCloseGivenProcessIsNotReady() throws IOException { communicator.close(); - verify(process).kill(); + verify(process).kill(false); verify(process, never()).close(); verify(stateStreamer).cancel(); } @@ -195,13 +195,13 @@ public void testKill() throws IOException, TimeoutException { boolean awaitCompletion = randomBoolean(); boolean finish = randomBoolean(); communicator.killProcess(awaitCompletion, finish); - Mockito.verify(resultProcessor).setProcessKilled(); - Mockito.verify(process).kill(); - Mockito.verify(executorService).shutdown(); + verify(resultProcessor).setProcessKilled(); + verify(process).kill(awaitCompletion); + verify(executorService).shutdown(); if (awaitCompletion) { - Mockito.verify(resultProcessor).awaitCompletion(); + verify(resultProcessor).awaitCompletion(); } else { - Mockito.verify(resultProcessor, never()).awaitCompletion(); + verify(resultProcessor, never()).awaitCompletion(); } assertEquals(finish, finishCalled.get()); } @@ -223,7 +223,7 @@ private Job createJobDetails() { } private AutodetectProcess mockAutodetectProcessWithOutputStream() throws IOException { - AutodetectProcess process = Mockito.mock(AutodetectProcess.class); + AutodetectProcess process = mock(AutodetectProcess.class); when(process.isProcessAlive()).thenReturn(true); return process; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java index 4dd24fb9537bb..64fda4c467cf5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java @@ -47,13 +47,14 @@ public class AbstractNativeProcessTests extends ESTestCase { // 1) After close() for jobs that stop gracefully // 2) After kill() for jobs that are forcefully terminated // 3) After a simulated crash when we test simulated crash - private CountDownLatch mockNativeProcessLoggingStreamEnds = new CountDownLatch(1); + private CountDownLatch mockNativeProcessLoggingStreamEnds; @Before @SuppressWarnings("unchecked") public void initialize() throws IOException { nativeController = mock(NativeController.class); cppLogHandler = mock(CppLogMessageHandler.class); + mockNativeProcessLoggingStreamEnds = new CountDownLatch(1); // This answer blocks the thread on the executor service. // In order to unblock it, the test needs to call mockNativeProcessLoggingStreamEnds.countDown(). doAnswer( @@ -105,7 +106,7 @@ public void testStart_DoNotDetectCrashWhenProcessIsBeingKilled() throws Exceptio AbstractNativeProcess process = new TestNativeProcess(); try { process.start(executorService); - process.kill(); + process.kill(randomBoolean()); } finally { // It is critical that this comes after kill() but before close(), otherwise we // would not be accurately simulating a kill(). This is why try-with-resources diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ControllerResponseTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ControllerResponseTests.java new file mode 100644 index 0000000000000..c4cd300d61ac3 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ControllerResponseTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.process; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class ControllerResponseTests extends AbstractXContentTestCase { + + @Override + protected ControllerResponse createTestInstance() { + return new ControllerResponse(randomIntBetween(1, 1000000), randomBoolean(), randomBoolean() ? null : randomAlphaOfLength(100)); + } + + @Override + protected ControllerResponse doParseInstance(XContentParser parser) throws IOException { + return ControllerResponse.PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java index 7fbbbf7ec81f7..c9478ffbffb73 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java @@ -7,6 +7,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.test.ESTestCase; @@ -39,9 +40,9 @@ public class NativeControllerTests extends ESTestCase { + "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) " + "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n"; - private Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); + private final Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); - public void testStartProcessCommand() throws IOException { + public void testStartProcessCommandSucceeds() throws Exception { final NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class); final InputStream logStream = mock(InputStream.class); @@ -54,6 +55,9 @@ public void testStartProcessCommand() throws IOException { when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); + ByteArrayInputStream outputStream = + new ByteArrayInputStream("[{\"id\":1,\"success\":true,\"reason\":\"ok\"}]".getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("output"), any(Duration.class))).thenReturn(outputStream); List command = new ArrayList<>(); command.add("my_process"); @@ -61,15 +65,50 @@ public void testStartProcessCommand() throws IOException { command.add("--arg2=42"); command.add("--arg3=something with spaces"); - NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper); + NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper, + mock(NamedXContentRegistry.class)); nativeController.startProcess(command); - assertEquals("start\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n", + assertEquals("1\tstart\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n", commandStream.toString(StandardCharsets.UTF_8.name())); mockNativeProcessLoggingStreamEnds.countDown(); } + public void testStartProcessCommandFails() throws Exception { + + final NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class); + final InputStream logStream = mock(InputStream.class); + final CountDownLatch mockNativeProcessLoggingStreamEnds = new CountDownLatch(1); + doAnswer( + invocationOnMock -> { + mockNativeProcessLoggingStreamEnds.await(); + return -1; + }).when(logStream).read(any()); + when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); + ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); + when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); + ByteArrayInputStream outputStream = + new ByteArrayInputStream("[{\"id\":1,\"success\":false,\"reason\":\"some problem\"}]".getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("output"), any(Duration.class))).thenReturn(outputStream); + + List command = new ArrayList<>(); + command.add("my_process"); + command.add("--arg1"); + command.add("--arg2=666"); + command.add("--arg3=something different with spaces"); + + NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper, + mock(NamedXContentRegistry.class)); + IOException e = expectThrows(IOException.class, () -> nativeController.startProcess(command)); + + assertEquals("1\tstart\tmy_process\t--arg1\t--arg2=666\t--arg3=something different with spaces\n", + commandStream.toString(StandardCharsets.UTF_8.name())); + assertEquals("ML controller failed to execute command [1]: [some problem]", e.getMessage()); + + mockNativeProcessLoggingStreamEnds.countDown(); + } + public void testGetNativeCodeInfo() throws IOException, TimeoutException { NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class); @@ -77,8 +116,11 @@ public void testGetNativeCodeInfo() throws IOException, TimeoutException { when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); + ByteArrayInputStream outputStream = new ByteArrayInputStream("[]".getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("output"), any(Duration.class))).thenReturn(outputStream); - NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper); + NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper, + mock(NamedXContentRegistry.class)); Map nativeCodeInfo = nativeController.getNativeCodeInfo(); assertNotNull(nativeCodeInfo); @@ -94,8 +136,11 @@ public void testControllerDeath() throws Exception { when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); + ByteArrayInputStream outputStream = new ByteArrayInputStream("[".getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("output"), any(Duration.class))).thenReturn(outputStream); - NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper); + NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper, + mock(NamedXContentRegistry.class)); // As soon as the log stream ends startProcess should think the native controller has died assertBusy(() -> { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml index 9adb61f5d38bc..cfe0ead9e5040 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml @@ -1,9 +1,5 @@ --- "Test calendar CRUD": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.get_calendars: calendar_id: _all @@ -96,10 +92,6 @@ --- "Test get calendar given missing": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /No calendar with id \[unknown\]/ ml.get_calendars: @@ -107,10 +99,6 @@ --- "Test put calendar given id contains invalid chars": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: bad_request ml.put_calendar: @@ -118,10 +106,6 @@ --- "Test PageParams": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_calendar: calendar_id: "calendar1" @@ -163,10 +147,6 @@ --- "Test PageParams with ID is invalid": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: bad_request ml.get_calendars: @@ -175,10 +155,6 @@ --- "Test cannot overwrite an exisiting calendar": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_calendar: @@ -191,10 +167,6 @@ --- "Test cannot create calendar with name _all": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: bad_request ml.put_calendar: @@ -202,10 +174,6 @@ --- "Test deleted job is removed from calendar": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_job: @@ -241,10 +209,6 @@ --- "Test update calendar job ids": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_calendar: @@ -318,10 +282,6 @@ job_id: "missing_job" --- "Test calendar get events": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_calendar: @@ -430,8 +390,6 @@ --- "Test delete calendar deletes events": - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all features: warnings - do: @@ -498,10 +456,6 @@ --- "Test get all calendar events": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_calendar: @@ -538,10 +492,6 @@ --- "Test get calendar events for job": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_job: @@ -631,10 +581,6 @@ --- "Test get calendar events with job groups": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - # Test job group - do: ml.put_job: @@ -687,10 +633,6 @@ --- "Test post calendar events given empty events": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /At least 1 event is required/ @@ -703,10 +645,6 @@ --- "Test delete event from non existing calendar": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /No calendar with id \[unknown\]/ @@ -716,10 +654,6 @@ --- "Test delete job from non existing calendar": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /No calendar with id \[unknown\]/ @@ -729,10 +663,6 @@ --- "Test list of job Ids": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_job: job_id: foo-a @@ -794,10 +724,6 @@ --- "Test calendar actions with new job group": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_job: job_id: calendar-job diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/custom_all_field.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/custom_all_field.yml index d6524a9769a8d..eefd9b937cbec 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/custom_all_field.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/custom_all_field.yml @@ -1,7 +1,5 @@ setup: - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all features: headers - do: headers: @@ -81,10 +79,6 @@ setup: --- "Test querying custom all field": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: search: @@ -151,10 +145,6 @@ setup: --- "Test wildcard job id": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: search: diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_cat_apis.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_cat_apis.yml index 78001960a64b3..83d8d92b7641c 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_cat_apis.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_cat_apis.yml @@ -1,7 +1,5 @@ setup: - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all features: headers - do: indices.create: @@ -42,10 +40,6 @@ setup: --- "Test cat data frame analytics single job": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: cat.ml_data_frame_analytics: @@ -57,10 +51,6 @@ setup: --- "Test cat data frame analytics single job with header": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: cat.ml_data_frame_analytics: id: dfa-outlier-detection-job @@ -72,10 +62,6 @@ setup: --- "Test cat data frame analytics all jobs with header": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: cat.ml_data_frame_analytics: v: true @@ -88,10 +74,6 @@ setup: --- "Test cat data frame analytics all jobs with header and column selection": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: cat.ml_data_frame_analytics: v: true diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml index 52942ce7e4305..c7439e4774088 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml @@ -1,19 +1,11 @@ --- setup: - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: indices.create: index: index-source --- "Test get-all and get-all-stats given no analytics exist": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.get_data_frame_analytics: @@ -41,10 +33,6 @@ setup: --- "Test put valid config with default outlier detection, query, and filter": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_data_frame_analytics: @@ -98,10 +86,6 @@ setup: --- "Test put config with security headers in the body": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /unknown field \[headers\]/ ml.put_data_frame_analytics: @@ -121,10 +105,6 @@ setup: --- "Test put config with create_time in the body": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /unknown field \[create_time\]/ @@ -144,10 +124,6 @@ setup: --- "Test put config with version in the body": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /unknown field \[version\]/ @@ -167,10 +143,6 @@ setup: --- "Test put valid config with default outlier detection": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_data_frame_analytics: @@ -201,10 +173,6 @@ setup: --- "Test put valid config with custom outlier detection": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_data_frame_analytics: @@ -247,10 +215,6 @@ setup: --- "Test put config with inconsistent body/param ids": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /Inconsistent id; 'body_id' specified in the body differs from 'url_id' specified as a URL argument/ @@ -270,10 +234,6 @@ setup: --- "Test put config with invalid id": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /Invalid id*/ @@ -292,10 +252,6 @@ setup: --- "Test put config with invalid dest index name": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /Invalid index name \[