Skip to content

Commit

Permalink
[ML] Throw an error when a datafeed needs CCS but it is not enabled f…
Browse files Browse the repository at this point in the history
…or the node (elastic#46044)

Though we allow CCS within datafeeds, users could prevent nodes from accessing remote clusters. This can cause mysterious errors and difficult to troubleshoot.

This commit adds a check to verify that `cluster.remote.connect` is enabled on the current node when a datafeed is configured with a remote index pattern.
  • Loading branch information
benwtrent committed Aug 30, 2019
1 parent 06ea4dd commit 5d551ba
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ those same roles.
`indices`::
(Required, array) An array of index names. Wildcards are supported. For
example: `["it_ops_metrics", "server*"]`.
+
--
NOTE: If any indices are in remote clusters then `cluster.remote.connect` must
not be set to `false` on any ML node.
--

`job_id`::
(Required, string) A numerical character string that uniquely identifies the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public final class Messages {
public static final String DATAFEED_FREQUENCY_MUST_BE_MULTIPLE_OF_AGGREGATIONS_INTERVAL =
"Datafeed frequency [{0}] must be a multiple of the aggregation interval [{1}]";
public static final String DATAFEED_ID_ALREADY_TAKEN = "A datafeed with id [{0}] already exists";
public static final String DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH = "Datafeed [{0}] is configured with a remote index pattern(s) {1}" +
" but the current node [{2}] is not allowed to connect to remote clusters." +
" Please enable cluster.remote.connect for all machine learning nodes.";

public static final String DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT = "Data Frame Analytics config query is not parsable";
public static final String DATA_FRAME_ANALYTICS_BAD_FIELD_FILTER = "No field [{0}] could be detected";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
autodetectProcessFactory, normalizerFactory, nativeStorageProvider);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
auditor, System::currentTimeMillis);
auditor, System::currentTimeMillis, clusterService.getNodeName());
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor, autodetectProcessManager);
this.datafeedManager.set(datafeedManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.ml.MlTasks;
Expand All @@ -45,6 +46,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
Expand Down Expand Up @@ -82,6 +84,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private final Auditor auditor;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
private final NamedXContentRegistry xContentRegistry;
private final boolean remoteClusterSearchSupported;

@Inject
public TransportStartDatafeedAction(Settings settings, TransportService transportService, ThreadPool threadPool,
Expand All @@ -100,6 +103,7 @@ public TransportStartDatafeedAction(Settings settings, TransportService transpor
this.auditor = auditor;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.xContentRegistry = xContentRegistry;
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
}

static void validate(Job job,
Expand Down Expand Up @@ -182,7 +186,7 @@ public void onFailure(Exception e) {
};

// Verify data extractor factory can be created, then start persistent task
Consumer<Job> createDataExtrator = job -> {
Consumer<Job> createDataExtractor = job -> {
if (RemoteClusterLicenseChecker.containsRemoteIndex(params.getDatafeedIndices())) {
final RemoteClusterLicenseChecker remoteClusterLicenseChecker =
new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode);
Expand All @@ -194,6 +198,13 @@ public void onFailure(Exception e) {
response -> {
if (response.isSuccess() == false) {
listener.onFailure(createUnlicensedError(params.getDatafeedId(), response));
} else if (remoteClusterSearchSupported == false) {
listener.onFailure(
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
datafeedConfigHolder.get().getId(),
RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices()),
clusterService.getNodeName())));
} else {
createDataExtractor(job, datafeedConfigHolder.get(), params, waitForTaskListener);
}
Expand All @@ -215,7 +226,7 @@ public void onFailure(Exception e) {
Job job = jobBuilder.build();
validate(job, datafeedConfigHolder.get(), tasks, xContentRegistry);
auditDeprecations(datafeedConfigHolder.get(), job, auditor, xContentRegistry);
createDataExtrator.accept(job);
createDataExtractor.accept(job);
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
Expand All @@ -29,6 +33,7 @@
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand All @@ -41,14 +46,18 @@ public class DatafeedJobBuilder {
private final NamedXContentRegistry xContentRegistry;
private final Auditor auditor;
private final Supplier<Long> currentTimeSupplier;

private final boolean remoteClusterSearchSupported;
private final String nodeName;

public DatafeedJobBuilder(Client client, Settings settings, NamedXContentRegistry xContentRegistry,
Auditor auditor, Supplier<Long> currentTimeSupplier) {
Auditor auditor, Supplier<Long> currentTimeSupplier, String nodeName) {
this.client = client;
this.settings = Objects.requireNonNull(settings);
this.xContentRegistry = Objects.requireNonNull(xContentRegistry);
this.auditor = Objects.requireNonNull(auditor);
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
this.remoteClusterSearchSupported = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
this.nodeName = nodeName;
}

void build(String datafeedId, ActionListener<DatafeedJob> listener) {
Expand Down Expand Up @@ -151,6 +160,18 @@ void build(String datafeedId, JobResultsProvider jobResultsProvider, JobConfigPr
configBuilder -> {
try {
datafeedConfigHolder.set(configBuilder.build());
if (remoteClusterSearchSupported == false) {
List<String> remoteIndices = RemoteClusterLicenseChecker.remoteIndices(datafeedConfigHolder.get().getIndices());
if (remoteIndices.isEmpty() == false) {
listener.onFailure(
ExceptionsHelper.badRequestException(Messages.getMessage(
Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
configBuilder.getId(),
remoteIndices,
nodeName)));
return;
}
}
jobConfigProvider.getJob(datafeedConfigHolder.get().getJobId(), jobConfigListener);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
Expand Down Expand Up @@ -60,7 +62,7 @@ public void init() {
when(client.settings()).thenReturn(Settings.EMPTY);
auditor = mock(Auditor.class);
taskHandler = mock(Consumer.class);
datafeedJobBuilder = new DatafeedJobBuilder(client, Settings.EMPTY, xContentRegistry(), auditor, System::currentTimeMillis);
datafeedJobBuilder = new DatafeedJobBuilder(client, Settings.EMPTY, xContentRegistry(), auditor, System::currentTimeMillis, "test_node");

jobResultsProvider = mock(JobResultsProvider.class);
Mockito.doAnswer(invocationOnMock -> {
Expand Down Expand Up @@ -190,6 +192,42 @@ public void testBuild_GivenBucketsRequestFails() {
verify(taskHandler).accept(error);
}

public void testBuildGivenRemoteIndicesButNoRemoteSearching() throws Exception {
Settings settings = Settings.builder().put(RemoteClusterService.ENABLE_REMOTE_CLUSTERS.getKey(), false).build();
datafeedJobBuilder =
new DatafeedJobBuilder(
client,
settings,
xContentRegistry(),
auditor,
System::currentTimeMillis,
"test_node");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
jobBuilder.setCreateTime(new Date());
DatafeedConfig.Builder datafeed = DatafeedManagerTests.createDatafeedConfig("datafeed1", jobBuilder.getId());
datafeed.setIndices(Collections.singletonList("remotecluster:index-*"));

AtomicBoolean wasHandlerCalled = new AtomicBoolean(false);
ActionListener<DatafeedJob> datafeedJobHandler = ActionListener.wrap(
datafeedJob -> fail("datafeed builder did not fail when remote index was given and remote clusters were not enabled"),
e -> {
assertThat(e.getMessage(), equalTo(Messages.getMessage(Messages.DATAFEED_NEEDS_REMOTE_CLUSTER_SEARCH,
"datafeed1",
"[remotecluster:index-*]",
"test_node")));
wasHandlerCalled.compareAndSet(false, true);
}
);

givenJob(jobBuilder);
givenDatafeed(datafeed);
datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler);
assertBusy(() -> wasHandlerCalled.get());
}

private void givenJob(Job.Builder job) {
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 5d551ba

Please sign in to comment.