diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index 8b83bf2fb85e3..a065be7eae0e1 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.client; import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.get.GetRequest; @@ -220,7 +219,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/63542") public class MachineLearningIT extends ESRestHighLevelClientTestCase { @After diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index b09ba882780ee..b9add106b82d7 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.client.documentation; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.bulk.BulkRequest; @@ -245,7 +244,6 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.core.Is.is; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/63542") public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { @After diff --git a/x-pack/plugin/ml/qa/basic-multi-node/build.gradle b/x-pack/plugin/ml/qa/basic-multi-node/build.gradle index d0ea6f86fd7c0..7e30c27d721fe 100644 --- a/x-pack/plugin/ml/qa/basic-multi-node/build.gradle +++ b/x-pack/plugin/ml/qa/basic-multi-node/build.gradle @@ -11,8 +11,3 @@ testClusters.all { setting 'indices.lifecycle.history_index_enabled', 'false' setting 'slm.history_index_enabled', 'false' } - -// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542 -// and https://github.com/elastic/ml-cpp/pull/1520 are merged -javaRestTest.enabled = false - diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle index e4ac4c9097f02..09148f6da5459 100644 --- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle +++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle @@ -214,9 +214,6 @@ yamlRestTest { ].join(',') } -// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542 -// and https://github.com/elastic/ml-cpp/pull/1520 are merged -yamlRestTest.enabled = false testClusters.all { testDistribution = 'DEFAULT' diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle index e48078314f179..cedfbf0ff8fe8 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/build.gradle @@ -33,10 +33,6 @@ javaRestTest { systemProperty 'es.set.netty.runtime.available.processors', 'false' } -// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542 -// and https://github.com/elastic/ml-cpp/pull/1520 are merged -javaRestTest.enabled = false - testClusters.all { numberOfNodes = 3 testDistribution = 'DEFAULT' diff --git a/x-pack/plugin/ml/qa/single-node-tests/build.gradle b/x-pack/plugin/ml/qa/single-node-tests/build.gradle index 8324a6e0f5f22..f3610bfa98672 100644 --- a/x-pack/plugin/ml/qa/single-node-tests/build.gradle +++ b/x-pack/plugin/ml/qa/single-node-tests/build.gradle @@ -5,8 +5,3 @@ testClusters.all { setting 'xpack.security.enabled', 'false' setting 'xpack.license.self_generated.type', 'trial' } - -// Entire suite muted while https://github.com/elastic/elasticsearch/pull/63542 -// and https://github.com/elastic/ml-cpp/pull/1520 are merged -javaRestTest.enabled = false - diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 3206e8eea9f09..61d914ad40c9a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -625,7 +625,8 @@ public Collection createComponents(Client client, ClusterService cluster AnalyticsProcessFactory memoryEstimationProcessFactory; if (MachineLearningField.AUTODETECT_PROCESS.get(settings)) { try { - NativeController nativeController = NativeController.makeNativeController(clusterService.getNodeName(), environment); + NativeController nativeController = + NativeController.makeNativeController(clusterService.getNodeName(), environment, xContentRegistry); autodetectProcessFactory = new NativeAutodetectProcessFactory( environment, settings, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java index 065767ba4e736..7a23428d4cc95 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsBuilder.java @@ -52,7 +52,7 @@ public AnalyticsBuilder performMemoryUsageEstimationOnly() { return this; } - public void build() throws IOException { + public void build() throws IOException, InterruptedException { List command = buildAnalyticsCommand(); processPipes.addArgs(command); nativeController.startProcess(command); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index 6b395c32d433d..8be7bb0efd823 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -83,7 +83,7 @@ public AnalyticsProcessManager(Settings settings, this( settings, client, - threadPool.generic(), + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME), threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME), analyticsProcessFactory, auditor, @@ -451,7 +451,7 @@ synchronized void stop() { } if (process.get() != null) { try { - process.get().kill(); + process.get().kill(true); } catch (IOException e) { LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", config.getId()), e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java index b298d459544d2..1ffbe72e33bee 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeAnalyticsProcessFactory.java @@ -116,8 +116,11 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete); try { analyticsBuilder.build(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("[{}] Interrupted while launching data frame analytics process", jobId); } catch (IOException e) { - String msg = "Failed to launch data frame analytics process for job " + jobId; + String msg = "[" + jobId + "] Failed to launch data frame analytics process"; LOGGER.error(msg); throw ExceptionsHelper.serverError(msg, e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java index 61e2f9f5d7954..4abce691d41f1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/NativeMemoryUsageEstimationProcessFactory.java @@ -102,8 +102,11 @@ private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsP .performMemoryUsageEstimationOnly(); try { analyticsBuilder.build(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("[{}] Interrupted while launching data frame analytics memory usage estimation process", jobId); } catch (IOException e) { - String msg = "Failed to launch data frame analytics memory usage estimation process for job " + jobId; + String msg = "[" + jobId + "] Failed to launch data frame analytics memory usage estimation process"; LOGGER.error(msg); throw ExceptionsHelper.serverError(msg, e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 3a3222d3c0206..144347c40a5b9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -169,7 +169,7 @@ public AutodetectBuilder scheduledEvents(List scheduledEvents) { /** * Requests that the controller daemon start an autodetect process. */ - public void build() throws IOException { + public void build() throws IOException, InterruptedException { List command = buildAutodetectCommand(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 8851a2a965f44..910951374931f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -196,7 +196,7 @@ public void killProcess(boolean awaitCompletion, boolean finish, boolean finaliz processKilled = true; autodetectResultProcessor.setProcessKilled(); autodetectWorkerExecutor.shutdown(); - autodetectProcess.kill(); + autodetectProcess.kill(awaitCompletion); if (awaitCompletion) { try { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index 246280d8c149a..9fe795686e0f6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -134,7 +134,7 @@ public void close() { } @Override - public void kill() { + public void kill(boolean awaitCompletion) { open = false; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index bdd1aaa30fae3..62858d798da56 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -127,8 +127,11 @@ void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipe autodetectBuilder.quantiles(autodetectParams.quantiles()); } autodetectBuilder.build(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("[{}] Interrupted while launching autodetect", job.getId()); } catch (IOException e) { - String msg = "Failed to launch autodetect for job " + job.getId(); + String msg = "[" + job.getId() + "] Failed to launch autodetect"; LOGGER.error(msg); throw ExceptionsHelper.serverError(msg, e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java index 652d575b07ab0..849e911a30e47 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java @@ -90,7 +90,7 @@ public void flushStream() { } @Override - public void kill() { + public void kill(boolean awaitCompletion) { // Nothing to do } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index de1ea69382905..4e3c1b688ed9a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -81,8 +81,11 @@ private void createNativeProcess(String jobId, String quantilesState, ProcessPip List command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build(); processPipes.addArgs(command); nativeController.startProcess(command); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("[{}] Interrupted while launching normalizer", jobId); } catch (IOException e) { - String msg = "Failed to launch normalizer for job " + jobId; + String msg = "[" + jobId + "] Failed to launch normalizer"; LOGGER.error(msg); throw ExceptionsHelper.serverError(msg, e); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java index c9ba271da0241..f041d9aebb046 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -206,19 +206,22 @@ public void close() throws IOException { } @Override - public void kill() throws IOException { + public void kill(boolean awaitCompletion) throws IOException { LOGGER.debug("[{}] Killing {} process", jobId, getName()); processKilled = true; try { // The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID. // Without the PID we cannot kill the process. - nativeController.killProcess(cppLogHandler().getPid(processPipes.getTimeout())); + nativeController.killProcess(cppLogHandler().getPid(processPipes.getTimeout()), awaitCompletion); // Wait for the process to die before closing processInStream as if the process // is still alive when processInStream is closed it may start persisting state cppLogHandler().waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT); } catch (TimeoutException e) { LOGGER.warn("[{}] Failed to get PID of {} process to kill", jobId, getName()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("[{}] Interrupted while killing {} process", jobId, getName()); } finally { try { if (processInStream() != null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ControllerResponse.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ControllerResponse.java new file mode 100644 index 0000000000000..ce91e8ad27a49 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ControllerResponse.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.process; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public class ControllerResponse implements ToXContentObject { + + public static final ParseField TYPE = new ParseField("controller_response"); + + public static final ParseField COMMAND_ID = new ParseField("id"); + public static final ParseField SUCCESS = new ParseField("success"); + public static final ParseField REASON = new ParseField("reason"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + TYPE.getPreferredName(), a -> new ControllerResponse((int) a[0], (boolean) a[1], (String) a[2])); + + static { + PARSER.declareInt(ConstructingObjectParser.constructorArg(), COMMAND_ID); + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), SUCCESS); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), REASON); + } + + private final int commandId; + private final boolean success; + private final String reason; + + ControllerResponse(int commandId, boolean success, String reason) { + this.commandId = commandId; + this.success = success; + this.reason = reason; + } + + public int getCommandId() { + return commandId; + } + + public boolean isSuccess() { + return success; + } + + public String getReason() { + return reason; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(COMMAND_ID.getPreferredName(), commandId); + builder.field(SUCCESS.getPreferredName(), success); + if (reason != null) { + builder.field(REASON.getPreferredName(), reason); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ControllerResponse that = (ControllerResponse) o; + return this.commandId == that.commandId && + this.success == that.success && + Objects.equals(this.reason, that.reason); + } + + @Override + public int hashCode() { + return Objects.hash(commandId, success, reason); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java index 9d9fbc4f79d06..2af4d86998568 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java @@ -7,18 +7,24 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import java.io.BufferedOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; @@ -44,20 +50,34 @@ public class NativeController implements MlController { private final String localNodeName; private final CppLogMessageHandler cppLogHandler; private final OutputStream commandStream; + private final InputStream responseStream; + private final NamedXContentRegistry xContentRegistry; + private final Map responseTrackers = new ConcurrentHashMap<>(); + // The response iterator cannot be constructed until something is expected to be in the stream it's reading from, + // otherwise it will block while it tries to read a few bytes to determine the character set. It could be created + // immediately in a dedicated thread, but that's wasteful as we can reuse the threads that are sending the commands + // to the controller to read the responses. So we create it in the first thread that wants to know the response to + // a command. + private final SetOnce> responseIteratorHolder = new SetOnce<>(); + private int nextCommandId = 1; // synchronized on commandStream so doesn't need to be volatile - public static NativeController makeNativeController(String localNodeName, Environment env) throws IOException { - return new NativeController(localNodeName, env, new NamedPipeHelper()); + public static NativeController makeNativeController(String localNodeName, Environment env, NamedXContentRegistry xContentRegistry) + throws IOException { + return new NativeController(localNodeName, env, new NamedPipeHelper(), xContentRegistry); } - NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper) throws IOException { + NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper, NamedXContentRegistry xContentRegistry) + throws IOException { + this.localNodeName = localNodeName; ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null, - true, false, false, false, false); + true, false, true, false, false); processPipes.connectLogStream(); - tailLogsInThread(processPipes.getLogStreamHandler()); - processPipes.connectOtherStreams(); - this.localNodeName = localNodeName; this.cppLogHandler = processPipes.getLogStreamHandler(); + tailLogsInThread(cppLogHandler); + processPipes.connectOtherStreams(); this.commandStream = new BufferedOutputStream(processPipes.getCommandStream().get()); + this.responseStream = processPipes.getProcessOutStream().get(); + this.xContentRegistry = xContentRegistry; } static void tailLogsInThread(CppLogMessageHandler cppLogHandler) { @@ -88,7 +108,7 @@ public Map getNativeCodeInfo() throws TimeoutException { return cppLogHandler.getNativeCodeInfo(CONTROLLER_CONNECT_TIMEOUT); } - public void startProcess(List command) throws IOException { + public void startProcess(List command) throws IOException, InterruptedException { if (command.isEmpty()) { throw new IllegalArgumentException("Cannot start process: no command supplied"); } @@ -110,19 +130,29 @@ public void startProcess(List command) throws IOException { throw new ElasticsearchException(msg); } - synchronized (commandStream) { - LOGGER.debug("Starting process with command: " + command); - commandStream.write(START_COMMAND.getBytes(StandardCharsets.UTF_8)); - for (String arg : command) { + int commandId = -1; + try { + synchronized (commandStream) { + commandId = nextCommandId++; + setupResponseTracker(commandId); + LOGGER.debug("Command [{}]: starting process with command {}", commandId, command); + commandStream.write(Integer.toString(commandId).getBytes(StandardCharsets.UTF_8)); commandStream.write('\t'); - commandStream.write(arg.getBytes(StandardCharsets.UTF_8)); + commandStream.write(START_COMMAND.getBytes(StandardCharsets.UTF_8)); + for (String arg : command) { + commandStream.write('\t'); + commandStream.write(arg.getBytes(StandardCharsets.UTF_8)); + } + commandStream.write('\n'); + commandStream.flush(); } - commandStream.write('\n'); - commandStream.flush(); + awaitCompletion(commandId); + } finally { + removeResponseTracker(commandId); } } - public void killProcess(long pid) throws TimeoutException, IOException { + public void killProcess(long pid, boolean awaitCompletion) throws TimeoutException, IOException, InterruptedException { if (pid <= 0) { throw new IllegalArgumentException("invalid PID to kill: " + pid); } @@ -137,13 +167,29 @@ public void killProcess(long pid) throws TimeoutException, IOException { throw new ElasticsearchException(msg); } - synchronized (commandStream) { - LOGGER.debug("Killing process with PID: " + pid); - commandStream.write(KILL_COMMAND.getBytes(StandardCharsets.UTF_8)); - commandStream.write('\t'); - commandStream.write(Long.toString(pid).getBytes(StandardCharsets.UTF_8)); - commandStream.write('\n'); - commandStream.flush(); + int commandId = -1; + try { + synchronized (commandStream) { + commandId = nextCommandId++; + if (awaitCompletion) { + setupResponseTracker(commandId); + } + LOGGER.debug("Command [{}]: killing process with PID [{}]", commandId, pid); + commandStream.write(Integer.toString(commandId).getBytes(StandardCharsets.UTF_8)); + commandStream.write('\t'); + commandStream.write(KILL_COMMAND.getBytes(StandardCharsets.UTF_8)); + commandStream.write('\t'); + commandStream.write(Long.toString(pid).getBytes(StandardCharsets.UTF_8)); + commandStream.write('\n'); + commandStream.flush(); + } + if (awaitCompletion) { + awaitCompletion(commandId); + } + } finally { + if (awaitCompletion) { + removeResponseTracker(commandId); + } } } @@ -152,4 +198,75 @@ public void stop() throws IOException { // The C++ process will exit when it gets EOF on the command stream commandStream.close(); } + + private void setupResponseTracker(int commandId) { + ResponseTracker tracker = new ResponseTracker(); + ResponseTracker previous = responseTrackers.put(commandId, tracker); + assert previous == null; + } + + private void removeResponseTracker(int commandId) { + responseTrackers.remove(commandId); + } + + private void awaitCompletion(int commandId) throws IOException, InterruptedException { + + ResponseTracker ourResponseTracker = responseTrackers.get(commandId); + assert ourResponseTracker != null; + + // If our response has not been seen already (by another thread), parse messages under lock until it is seen. + // This approach means that of all the threads waiting for controller responses, one is parsing the messages + // on behalf of all of them, and the others are blocked. When the thread that is parsing gets the response + // it needs another thread will pick up the parsing. + if (ourResponseTracker.hasResponded() == false) { + synchronized (responseIteratorHolder) { + Iterator responseIterator = responseIteratorHolder.get(); + if (responseIterator == null) { + responseIterator = new ProcessResultsParser(ControllerResponse.PARSER, xContentRegistry) + .parseResults(this.responseStream); + responseIteratorHolder.set(responseIterator); + } + while (ourResponseTracker.hasResponded() == false) { + if (responseIterator.hasNext() == false) { + throw new IOException("ML controller response stream ended while awaiting response for command [" + + commandId + "]"); + } + ControllerResponse response = responseIterator.next(); + ResponseTracker respondedTracker = responseTrackers.get(response.getCommandId()); + // It is not compulsory to track all responses, hence legitimate not to find every ID in the map + if (respondedTracker != null) { + respondedTracker.setResponse(response); + } + } + } + } + + ControllerResponse ourResponse = ourResponseTracker.getResponse(); + assert ourResponse.getCommandId() == commandId; + if (ourResponse.isSuccess()) { + LOGGER.debug("ML controller successfully executed command [" + commandId + "]: [" + ourResponse.getReason() + "]"); + } else { + throw new IOException("ML controller failed to execute command [" + commandId + "]: [" + ourResponse.getReason() + "]"); + } + } + + private static class ResponseTracker { + + private final CountDownLatch latch = new CountDownLatch(1); + private final SetOnce responseHolder = new SetOnce<>(); + + boolean hasResponded() { + return latch.getCount() < 1; + } + + void setResponse(ControllerResponse response) { + responseHolder.set(response); + latch.countDown(); + } + + ControllerResponse getResponse() throws InterruptedException { + latch.await(); + return responseHolder.get(); + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java index c4f2b4a463185..57715f465fe04 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java @@ -43,8 +43,10 @@ public interface NativeProcess extends Closeable { /** * Kill the process. Do not wait for it to stop gracefully. + * @param awaitCompletion Indicates whether to wait for the process to die. Even if this + * is set to true the process will not complete gracefully. */ - void kill() throws IOException; + void kill(boolean awaitCompletion) throws IOException; /** * The time the process was started diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java index 223a085455116..204407334361c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java @@ -262,7 +262,7 @@ public void testProcessContext_StartAndStop() throws Exception { inOrder.verify(dataExtractor).getExtractedFields(); // stop inOrder.verify(dataExtractor).cancel(); - inOrder.verify(process).kill(); + inOrder.verify(process).kill(true); verifyNoMoreInteractions(dataExtractor, process, task); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java index 4b128d8fb700d..51ff6c8162974 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicatorTests.java @@ -31,7 +31,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.junit.Before; -import org.mockito.Mockito; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -52,6 +51,7 @@ import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doNothing; @@ -117,7 +117,7 @@ public void testFlushJob() throws Exception { AtomicReference flushAcknowledgementHolder = new AtomicReference<>(); communicator.flushJob(params, (f, e) -> flushAcknowledgementHolder.set(f)); assertThat(flushAcknowledgementHolder.get(), equalTo(flushAcknowledgement)); - Mockito.verify(process).flushJob(params); + verify(process).flushJob(params); } } @@ -145,7 +145,7 @@ public void testFlushJob_throwsIfProcessIsDead() throws IOException { public void testFlushJob_givenFlushWaitReturnsTrueOnSecondCall() throws Exception { AutodetectProcess process = mockAutodetectProcessWithOutputStream(); when(process.isProcessAlive()).thenReturn(true); - AutodetectResultProcessor autodetectResultProcessor = Mockito.mock(AutodetectResultProcessor.class); + AutodetectResultProcessor autodetectResultProcessor = mock(AutodetectResultProcessor.class); FlushAcknowledgement flushAcknowledgement = mock(FlushAcknowledgement.class); when(autodetectResultProcessor.waitForFlushAcknowledgement(anyString(), eq(Duration.ofSeconds(1)))) .thenReturn(null).thenReturn(flushAcknowledgement); @@ -168,8 +168,8 @@ public void testCloseGivenProcessIsReady() throws IOException { communicator.close(); verify(process).close(); - verify(process, never()).kill(); - Mockito.verifyNoMoreInteractions(stateStreamer); + verify(process, never()).kill(anyBoolean()); + verifyNoMoreInteractions(stateStreamer); } public void testCloseGivenProcessIsNotReady() throws IOException { @@ -179,7 +179,7 @@ public void testCloseGivenProcessIsNotReady() throws IOException { communicator.close(); - verify(process).kill(); + verify(process).kill(false); verify(process, never()).close(); verify(stateStreamer).cancel(); } @@ -195,13 +195,13 @@ public void testKill() throws IOException, TimeoutException { boolean awaitCompletion = randomBoolean(); boolean finish = randomBoolean(); communicator.killProcess(awaitCompletion, finish); - Mockito.verify(resultProcessor).setProcessKilled(); - Mockito.verify(process).kill(); - Mockito.verify(executorService).shutdown(); + verify(resultProcessor).setProcessKilled(); + verify(process).kill(awaitCompletion); + verify(executorService).shutdown(); if (awaitCompletion) { - Mockito.verify(resultProcessor).awaitCompletion(); + verify(resultProcessor).awaitCompletion(); } else { - Mockito.verify(resultProcessor, never()).awaitCompletion(); + verify(resultProcessor, never()).awaitCompletion(); } assertEquals(finish, finishCalled.get()); } @@ -223,7 +223,7 @@ private Job createJobDetails() { } private AutodetectProcess mockAutodetectProcessWithOutputStream() throws IOException { - AutodetectProcess process = Mockito.mock(AutodetectProcess.class); + AutodetectProcess process = mock(AutodetectProcess.class); when(process.isProcessAlive()).thenReturn(true); return process; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java index 4dd24fb9537bb..64fda4c467cf5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcessTests.java @@ -47,13 +47,14 @@ public class AbstractNativeProcessTests extends ESTestCase { // 1) After close() for jobs that stop gracefully // 2) After kill() for jobs that are forcefully terminated // 3) After a simulated crash when we test simulated crash - private CountDownLatch mockNativeProcessLoggingStreamEnds = new CountDownLatch(1); + private CountDownLatch mockNativeProcessLoggingStreamEnds; @Before @SuppressWarnings("unchecked") public void initialize() throws IOException { nativeController = mock(NativeController.class); cppLogHandler = mock(CppLogMessageHandler.class); + mockNativeProcessLoggingStreamEnds = new CountDownLatch(1); // This answer blocks the thread on the executor service. // In order to unblock it, the test needs to call mockNativeProcessLoggingStreamEnds.countDown(). doAnswer( @@ -105,7 +106,7 @@ public void testStart_DoNotDetectCrashWhenProcessIsBeingKilled() throws Exceptio AbstractNativeProcess process = new TestNativeProcess(); try { process.start(executorService); - process.kill(); + process.kill(randomBoolean()); } finally { // It is critical that this comes after kill() but before close(), otherwise we // would not be accurately simulating a kill(). This is why try-with-resources diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ControllerResponseTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ControllerResponseTests.java new file mode 100644 index 0000000000000..c4cd300d61ac3 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ControllerResponseTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.process; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class ControllerResponseTests extends AbstractXContentTestCase { + + @Override + protected ControllerResponse createTestInstance() { + return new ControllerResponse(randomIntBetween(1, 1000000), randomBoolean(), randomBoolean() ? null : randomAlphaOfLength(100)); + } + + @Override + protected ControllerResponse doParseInstance(XContentParser parser) throws IOException { + return ControllerResponse.PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java index 7fbbbf7ec81f7..c9478ffbffb73 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java @@ -7,6 +7,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.test.ESTestCase; @@ -39,9 +40,9 @@ public class NativeControllerTests extends ESTestCase { + "\"thread\":\"0x7fff7d2a8000\",\"message\":\"controller (64 bit): Version 6.0.0-alpha1-SNAPSHOT (Build a0d6ef8819418c) " + "Copyright (c) 2017 Elasticsearch BV\",\"method\":\"main\",\"file\":\"Main.cc\",\"line\":123}\n"; - private Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); + private final Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); - public void testStartProcessCommand() throws IOException { + public void testStartProcessCommandSucceeds() throws Exception { final NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class); final InputStream logStream = mock(InputStream.class); @@ -54,6 +55,9 @@ public void testStartProcessCommand() throws IOException { when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); + ByteArrayInputStream outputStream = + new ByteArrayInputStream("[{\"id\":1,\"success\":true,\"reason\":\"ok\"}]".getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("output"), any(Duration.class))).thenReturn(outputStream); List command = new ArrayList<>(); command.add("my_process"); @@ -61,15 +65,50 @@ public void testStartProcessCommand() throws IOException { command.add("--arg2=42"); command.add("--arg3=something with spaces"); - NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper); + NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper, + mock(NamedXContentRegistry.class)); nativeController.startProcess(command); - assertEquals("start\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n", + assertEquals("1\tstart\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n", commandStream.toString(StandardCharsets.UTF_8.name())); mockNativeProcessLoggingStreamEnds.countDown(); } + public void testStartProcessCommandFails() throws Exception { + + final NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class); + final InputStream logStream = mock(InputStream.class); + final CountDownLatch mockNativeProcessLoggingStreamEnds = new CountDownLatch(1); + doAnswer( + invocationOnMock -> { + mockNativeProcessLoggingStreamEnds.await(); + return -1; + }).when(logStream).read(any()); + when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); + ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); + when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); + ByteArrayInputStream outputStream = + new ByteArrayInputStream("[{\"id\":1,\"success\":false,\"reason\":\"some problem\"}]".getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("output"), any(Duration.class))).thenReturn(outputStream); + + List command = new ArrayList<>(); + command.add("my_process"); + command.add("--arg1"); + command.add("--arg2=666"); + command.add("--arg3=something different with spaces"); + + NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper, + mock(NamedXContentRegistry.class)); + IOException e = expectThrows(IOException.class, () -> nativeController.startProcess(command)); + + assertEquals("1\tstart\tmy_process\t--arg1\t--arg2=666\t--arg3=something different with spaces\n", + commandStream.toString(StandardCharsets.UTF_8.name())); + assertEquals("ML controller failed to execute command [1]: [some problem]", e.getMessage()); + + mockNativeProcessLoggingStreamEnds.countDown(); + } + public void testGetNativeCodeInfo() throws IOException, TimeoutException { NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class); @@ -77,8 +116,11 @@ public void testGetNativeCodeInfo() throws IOException, TimeoutException { when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); + ByteArrayInputStream outputStream = new ByteArrayInputStream("[]".getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("output"), any(Duration.class))).thenReturn(outputStream); - NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper); + NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper, + mock(NamedXContentRegistry.class)); Map nativeCodeInfo = nativeController.getNativeCodeInfo(); assertNotNull(nativeCodeInfo); @@ -94,8 +136,11 @@ public void testControllerDeath() throws Exception { when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream); ByteArrayOutputStream commandStream = new ByteArrayOutputStream(); when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream); + ByteArrayInputStream outputStream = new ByteArrayInputStream("[".getBytes(StandardCharsets.UTF_8)); + when(namedPipeHelper.openNamedPipeInputStream(contains("output"), any(Duration.class))).thenReturn(outputStream); - NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper); + NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper, + mock(NamedXContentRegistry.class)); // As soon as the log stream ends startProcess should think the native controller has died assertBusy(() -> { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml index 9adb61f5d38bc..cfe0ead9e5040 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml @@ -1,9 +1,5 @@ --- "Test calendar CRUD": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.get_calendars: calendar_id: _all @@ -96,10 +92,6 @@ --- "Test get calendar given missing": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /No calendar with id \[unknown\]/ ml.get_calendars: @@ -107,10 +99,6 @@ --- "Test put calendar given id contains invalid chars": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: bad_request ml.put_calendar: @@ -118,10 +106,6 @@ --- "Test PageParams": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_calendar: calendar_id: "calendar1" @@ -163,10 +147,6 @@ --- "Test PageParams with ID is invalid": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: bad_request ml.get_calendars: @@ -175,10 +155,6 @@ --- "Test cannot overwrite an exisiting calendar": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_calendar: @@ -191,10 +167,6 @@ --- "Test cannot create calendar with name _all": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: bad_request ml.put_calendar: @@ -202,10 +174,6 @@ --- "Test deleted job is removed from calendar": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_job: @@ -241,10 +209,6 @@ --- "Test update calendar job ids": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_calendar: @@ -318,10 +282,6 @@ job_id: "missing_job" --- "Test calendar get events": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_calendar: @@ -430,8 +390,6 @@ --- "Test delete calendar deletes events": - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all features: warnings - do: @@ -498,10 +456,6 @@ --- "Test get all calendar events": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_calendar: @@ -538,10 +492,6 @@ --- "Test get calendar events for job": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_job: @@ -631,10 +581,6 @@ --- "Test get calendar events with job groups": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - # Test job group - do: ml.put_job: @@ -687,10 +633,6 @@ --- "Test post calendar events given empty events": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /At least 1 event is required/ @@ -703,10 +645,6 @@ --- "Test delete event from non existing calendar": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /No calendar with id \[unknown\]/ @@ -716,10 +654,6 @@ --- "Test delete job from non existing calendar": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /No calendar with id \[unknown\]/ @@ -729,10 +663,6 @@ --- "Test list of job Ids": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_job: job_id: foo-a @@ -794,10 +724,6 @@ --- "Test calendar actions with new job group": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_job: job_id: calendar-job diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/custom_all_field.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/custom_all_field.yml index d6524a9769a8d..eefd9b937cbec 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/custom_all_field.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/custom_all_field.yml @@ -1,7 +1,5 @@ setup: - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all features: headers - do: headers: @@ -81,10 +79,6 @@ setup: --- "Test querying custom all field": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: search: @@ -151,10 +145,6 @@ setup: --- "Test wildcard job id": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: search: diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_cat_apis.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_cat_apis.yml index 78001960a64b3..83d8d92b7641c 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_cat_apis.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_cat_apis.yml @@ -1,7 +1,5 @@ setup: - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all features: headers - do: indices.create: @@ -42,10 +40,6 @@ setup: --- "Test cat data frame analytics single job": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: cat.ml_data_frame_analytics: @@ -57,10 +51,6 @@ setup: --- "Test cat data frame analytics single job with header": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: cat.ml_data_frame_analytics: id: dfa-outlier-detection-job @@ -72,10 +62,6 @@ setup: --- "Test cat data frame analytics all jobs with header": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: cat.ml_data_frame_analytics: v: true @@ -88,10 +74,6 @@ setup: --- "Test cat data frame analytics all jobs with header and column selection": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: cat.ml_data_frame_analytics: v: true diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml index 52942ce7e4305..c7439e4774088 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/data_frame_analytics_crud.yml @@ -1,19 +1,11 @@ --- setup: - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: indices.create: index: index-source --- "Test get-all and get-all-stats given no analytics exist": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.get_data_frame_analytics: @@ -41,10 +33,6 @@ setup: --- "Test put valid config with default outlier detection, query, and filter": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_data_frame_analytics: @@ -98,10 +86,6 @@ setup: --- "Test put config with security headers in the body": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /unknown field \[headers\]/ ml.put_data_frame_analytics: @@ -121,10 +105,6 @@ setup: --- "Test put config with create_time in the body": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /unknown field \[create_time\]/ @@ -144,10 +124,6 @@ setup: --- "Test put config with version in the body": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /unknown field \[version\]/ @@ -167,10 +143,6 @@ setup: --- "Test put valid config with default outlier detection": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_data_frame_analytics: @@ -201,10 +173,6 @@ setup: --- "Test put valid config with custom outlier detection": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: ml.put_data_frame_analytics: @@ -247,10 +215,6 @@ setup: --- "Test put config with inconsistent body/param ids": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /Inconsistent id; 'body_id' specified in the body differs from 'url_id' specified as a URL argument/ @@ -270,10 +234,6 @@ setup: --- "Test put config with invalid id": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /Invalid id*/ @@ -292,10 +252,6 @@ setup: --- "Test put config with invalid dest index name": - - skip: - reason: "https://github.com/elastic/elasticsearch/pull/63542" - version: all - - do: catch: /Invalid index name \[