Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Throw an error when a datafeed needs CCS but it is not enabled for the node #46044

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ those same roles.
`indices`::
(Required, array) An array of index names. Wildcards are supported. For
example: `["it_ops_metrics", "server*"]`.
+
--
NOTE: If any indices are in remote clusters then `cluster.remote.connect` must
not be set to `false` on any ML node.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be tied to the previous item I think this needs to be:

+
--
NOTE: If any indices are in remote clusters then `cluster.remote.connect` must
not be set to `false` on any ML node.
--

I.e. + and -- before and -- after.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you often need a blank line after the admonition block (i.e. after the "ML node." and before the final "--"), otherwise it doesn't format properly. If you need any help with the formatting, just let me know.

--

`job_id`::
(Required, string) A numerical character string that uniquely identifies the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public final class Messages {
public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL =
"Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
public static final String DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH = "Datafeed [{0}] is configured with a remote index pattern(s) {1}" +
" but the current node [{2}] is not allowed to connect to remote clusters." +
" Please enable cluster.remote.connect for all machine learning nodes.";

public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable";
public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,9 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
jobConfigProvider,
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister);
jobResultsPersister,
settings,
clusterService.getNodeName());
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor, autodetectProcessManager);
this.datafeedManager.set(datafeedManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MlTasks;
Expand All @@ -47,6 +48,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
Expand Down Expand Up @@ -85,6 +87,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private final AnomalyDetectionAuditor auditor;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
private final NamedXContentRegistry xContentRegistry;
private final boolean remoteClusterSearchSupported;

@Inject
public TransportStartDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool,
Expand All @@ -103,6 +106,7 @@ public TransportStartDatafeedAction(Settings settings, TransportService transpor
this.auditor = auditor;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.xContentRegistry = xContentRegistry;
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
}

static void validate(Job job,
Expand Down Expand Up @@ -181,7 +185,7 @@ public void onFailure(Exception e) {
};

// Verify data extractor factory can be created, then start persistent task
Consumer<Job> createDataExtrator = job -> {
Consumer<Job> createDataExtractor = job -> {
if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedIndices())) {
final RemoteClusterLicenseChecker remoteClusterLicenseChecker =
new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode);
Expand All @@ -193,6 +197,13 @@ public void onFailure(Exception e) {
response -> {
if (response.isSuccess() == false) {
listener.onFailure(createUnlicensedError(params.getDatafeedId(), response));
} else if (remoteClusterSearchSupported == false) {
listener.onFailure(
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
datafeedConfigHolder.get().getId(),
RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()),
clusterService.getNodeName())));
} else {
createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener);
}
Expand All @@ -214,7 +225,7 @@ public void onFailure(Exception e) {
Job job = jobBuilder.build();
validate(job, datafeedConfigHolder.get(), tasks, xContentRegistry);
auditDeprecations(datafeedConfigHolder.get(), job, auditor, xContentRegistry);
createDataExtrator.accept(job);
createDataExtractor.accept(job);
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
Expand All @@ -30,6 +35,7 @@
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand All @@ -45,11 +51,13 @@ public class DatafeedJobBuilder {
private final JobResultsProvider jobResultsProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsPersister jobResultsPersister;
private final boolean remoteClusterSearchSupported;
private final String nodeName;

public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionAuditor auditor,
Supplier<Long> currentTimeSupplier, JobConfigProvider jobConfigProvider,
JobResultsProvider jobResultsProvider, DatafeedConfigProvider datafeedConfigProvider,
JobResultsPersister jobResultsPersister) {
JobResultsPersister jobResultsPersister, Settings settings, String nodeName) {
this.client = client;
this.xContentRegistry = Objects.requireNonNull(xContentRegistry);
this.auditor = Objects.requireNonNull(auditor);
Expand All @@ -58,6 +66,8 @@ public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry,
this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
this.datafeedConfigProvider = Objects.requireNonNull(datafeedConfigProvider);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
this.nodeName = nodeName;
}

void build(String datafeedId, ActionListener<DatafeedJob> listener) {
Expand Down Expand Up @@ -168,6 +178,18 @@ void build(String datafeedId, ActionListener<DatafeedJob> listener) {
configBuilder -> {
try {
datafeedConfigHolder.set(configBuilder.build());
if (remoteClusterSearchSupported == false) {
List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices());
if (remoteIndices.isEmpty() == false) {
listener.onFailure(
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
configBuilder.getId(),
remoteIndices,
nodeName)));
return;
}
}
jobConfigProvider.getJob(datafeedConfigHolder.get().getJobId(), jobConfigListener);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
Expand Down Expand Up @@ -92,7 +94,9 @@ public void init() {
jobConfigProvider,
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister);
jobResultsPersister,
Settings.EMPTY,
"test_node");
}

public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception {
Expand Down Expand Up @@ -202,6 +206,46 @@ public void testBuild_GivenBucketsRequestFails() {
verify(taskHandler).accept(error);
}

public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception {
Settings settings = Settings.builder().put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false).build();
datafeedJobBuilder =
new DatafeedJobBuilder(
client,
xContentRegistry(),
auditor,
System::currentTimeMillis,
jobConfigProvider,
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister,
settings,
"test_node");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
jobBuilder.setCreateTime(new Date());
DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId());
datafeed.setIndices(Collections.singletonList("remotecluster:index-*"));

AtomicBoolean wasHandlerCalled = new AtomicBoolean(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the handler run in a different thread than the main test thread?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it runs in the same thread but I think because the boolean is updated in a lambda it must be effectively final and you can't do that with a simple boolean it needs to be in some sort of holder.

ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> fail("datafeed builder did not fail when remote index was given and remote clusters were not enabled"),
e -> {
assertThat(e.getMessage(), equalTo(Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
"datafeed1",
"[remotecluster:index-*]",
"test_node")));
wasHandlerCalled.compareAndSet(false, true);
}
);

givenJob(jobBuilder);
givenDatafeed(datafeed);
datafeedJobBuilder.build("datafeed1", datafeedJobHandler);
assertBusy(() -> wasHandlerCalled.get());
}

private void givenJob(Job.Builder job) {
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Expand Down