[ML] Throw an error when a datafeed needs CCS but it is not enabled for the node (#46044) (#46186)

Though we allow CCS within datafeeds, a node may be configured so that it cannot connect to remote clusters. This leads to mysterious errors that are difficult to troubleshoot.

This commit adds a check to verify that `cluster.remote.connect` is enabled on the current node when a datafeed is configured with a remote index pattern.
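
Condensed from the diffs below, the guard amounts to the following sketch (wiring simplified; all names are taken from the actual changes):

    // Sketch only — see TransportStartDatafeedAction and DatafeedJobBuilder below for the real call sites.
    boolean remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
    List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfig.getIndices());
    if (remoteClusterSearchSupported == false && remoteIndices.isEmpty() == false) {
        listener.onFailure(ExceptionsHelper.badRequestException(Messages.getMessage(
            Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
            datafeedConfig.getId(), remoteIndices, nodeName)));
    }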
benwtrent authored Sep 3, 2019
1 parent 6ad0e5c commit 4fad6e3
Showing 6 changed files with 86 additions and 5 deletions.
5 changes: 5 additions & 0 deletions docs/reference/ml/apis/put-datafeed.asciidoc
@@ -59,6 +59,11 @@ IMPORTANT: You must use {kib} or this API to create a {dfeed}. Do not put a {df
`indices` (required)::
(array) An array of index names. Wildcards are supported. For example:
`["it_ops_metrics", "server*"]`.
+
--
NOTE: If any indices are in remote clusters then `cluster.remote.connect` must
not be set to `false` on any ML node.
--

`job_id` (required)::
(string) A numerical character string that uniquely identifies the job.
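
For reference, `cluster.remote.connect` is the settings key behind `RemoteClusterService.ENABLE_REMOTE_CLUSTERS`, the constant the new check reads. A minimal sketch of how a node resolves it, mirroring the test setup at the end of this commit:

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.transport.RemoteClusterService;

    Settings nodeSettings = Settings.builder()
        .put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false) // i.e. cluster.remote.connect: false
        .build();
    // false here means datafeeds with remote index patterns are now rejected on this node
    boolean remoteEnabled = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(nodeSettings);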
Messages.java (org.elasticsearch.xpack.core.ml.job.messages)
@@ -49,6 +49,9 @@ public final class Messages {
public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL =
"Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
public static final String DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH = "Datafeed [{0}] is configured with a remote index pattern(s) {1}" +
" but the current node [{2}] is not allowed to connect to remote clusters." +
" Please enable cluster.remote.connect for all machine learning nodes.";

public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";
public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";
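
The `{n}` placeholders follow the `java.text.MessageFormat` style used by the other messages in this class, so — assuming `Messages.getMessage` formats the template with the given arguments, as the test at the end of this commit asserts — a hypothetical expansion looks like:

    import java.util.Arrays;

    String msg = Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
        "datafeed1", Arrays.asList("remotecluster:index-*"), "test_node");
    // -> "Datafeed [datafeed1] is configured with a remote index pattern(s) [remotecluster:index-*]
    //     but the current node [test_node] is not allowed to connect to remote clusters.
    //     Please enable cluster.remote.connect for all machine learning nodes."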
MachineLearning.java (org.elasticsearch.xpack.ml)
@@ -452,7 +452,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
normalizerFactory, xContentRegistry, auditor, clusterService);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
auditor, System::currentTimeMillis);
auditor, System::currentTimeMillis, clusterService.getNodeName());
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor, autodetectProcessManager);
this.datafeedManager.set(datafeedManager);
TransportStartDatafeedAction.java (org.elasticsearch.xpack.ml.action)
@@ -35,6 +35,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -44,6 +45,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
@@ -81,6 +83,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private final Auditor auditor;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
private final NamedXContentRegistry xContentRegistry;
private final boolean remoteClusterSearchSupported;

@Inject
public TransportStartDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool,
@@ -99,6 +102,7 @@ public TransportStartDatafeedAction(Settings settings, TransportService transpor
this.auditor = auditor;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.xContentRegistry = xContentRegistry;
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
}

static void validate(Job job,
@@ -176,7 +180,7 @@ public void onFailure(Exception e) {
};

// Verify data extractor factory can be created, then start persistent task
Consumer<Job> createDataExtrator = job -> {
Consumer<Job> createDataExtractor = job -> {
if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedIndices())) {
final RemoteClusterLicenseChecker remoteClusterLicenseChecker =
new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode);
@@ -188,6 +192,13 @@ public void onFailure(Exception e) {
response -> {
if (response.isSuccess() == false) {
listener.onFailure(createUnlicensedError(params.getDatafeedId(), response));
} else if (remoteClusterSearchSupported == false) {
listener.onFailure(
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
datafeedConfigHolder.get().getId(),
RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()),
clusterService.getNodeName())));
} else {
createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener);
}
@@ -208,7 +219,7 @@ public void onFailure(Exception e) {
try {
validate(job, datafeedConfigHolder.get(), tasks, xContentRegistry);
auditDeprecations(datafeedConfigHolder.get(), job, auditor, xContentRegistry);
createDataExtrator.accept(job);
createDataExtractor.accept(job);
} catch (Exception e) {
listener.onFailure(e);
}
DatafeedJobBuilder.java (org.elasticsearch.xpack.ml.datafeed)
@@ -12,15 +12,19 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@@ -30,6 +34,7 @@
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -42,14 +47,18 @@ public class DatafeedJobBuilder {
private final NamedXContentRegistry xContentRegistry;
private final Auditor auditor;
private final Supplier<Long> currentTimeSupplier;
private final boolean remoteClusterSearchSupported;
private final String nodeName;

public DatafeedJobBuilder(Client client, Settings settings, NamedXContentRegistry xContentRegistry,
Auditor auditor, Supplier<Long> currentTimeSupplier) {
Auditor auditor, Supplier<Long> currentTimeSupplier, String nodeName) {
this.client = client;
this.settings = Objects.requireNonNull(settings);
this.xContentRegistry = Objects.requireNonNull(xContentRegistry);
this.auditor = Objects.requireNonNull(auditor);
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
this.nodeName = nodeName;
}

void build(String datafeedId, ClusterState state, ActionListener<DatafeedJob> listener) {
@@ -152,6 +161,18 @@ void build(String datafeedId, JobResultsProvider jobResultsProvider, JobConfigPr
datafeedConfig -> {
try {
datafeedConfigHolder.set(datafeedConfig);
if (remoteClusterSearchSupported == false) {
List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfig.getIndices());
if (remoteIndices.isEmpty() == false) {
listener.onFailure(
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
datafeedConfig.getId(),
remoteIndices,
nodeName)));
return;
}
}
// Is the job in the cluster state?
Job job = MlMetadata.getMlMetadata(state).getJobs().get(datafeedConfig.getJobId());
if (job != null) {
DatafeedJobBuilderTests.java (org.elasticsearch.xpack.ml.datafeed)
@@ -15,10 +15,12 @@
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
@@ -61,7 +63,8 @@ public void init() {
when(client.settings()).thenReturn(Settings.EMPTY);
auditor = mock(Auditor.class);
taskHandler = mock(Consumer.class);
datafeedJobBuilder = new DatafeedJobBuilder(client, Settings.EMPTY, xContentRegistry(), auditor, System::currentTimeMillis);
datafeedJobBuilder =
new DatafeedJobBuilder(client, Settings.EMPTY, xContentRegistry(), auditor, System::currentTimeMillis, "test_node");

jobResultsProvider = mock(JobResultsProvider.class);
Mockito.doAnswer(invocationOnMock -> {
@@ -202,6 +205,44 @@ public void testBuild_GivenBucketsRequestFails() {
verify(taskHandler).accept(error);
}

public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception {
Settings settings = Settings.builder().put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false).build();
datafeedJobBuilder =
new DatafeedJobBuilder(
client,
settings,
xContentRegistry(),
auditor,
System::currentTimeMillis,
"test_node");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
jobBuilder.setCreateTime(new Date());
DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId());
datafeed.setIndices(Collections.singletonList("remotecluster:index-*"));

AtomicBoolean wasHandlerCalled = new AtomicBoolean(false);
ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> fail("datafeed builder did not fail when remote index was given and remote clusters were not enabled"),
e -> {
assertThat(e.getMessage(), equalTo(Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
"datafeed1",
"[remotecluster:index-*]",
"test_node")));
wasHandlerCalled.compareAndSet(false, true);
}
);

givenJob(jobBuilder);
givenDatafeed(datafeed);
ClusterState clusterState = ClusterState.builder(new ClusterName("datafeedjobbuildertest-cluster")).build();
datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigReader,
clusterState, datafeedJobHandler);
assertBusy(() -> wasHandlerCalled.get());
}

private void givenJob(Job.Builder job) {
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
