From a9c5d4319c837ec685f012bc4a04cc185cdd99f4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Aug 2018 23:10:51 -0400 Subject: [PATCH 01/23] Generalize remote license checker Machine learning has baked a remote license checker for use in checking license compatibility of a remote license. This remote license checker has general usage for any feature that relies on a remote cluster. For example, cross-cluster replication will pull changes from a remote cluster and require that the local and remote clusters have platinum licenses. This commit generalizes the remote cluster license check for use in cross-cluster replication. --- .../license/RemoteClusterLicenseChecker.java | 278 ++++++++++++++++++ .../RemoteClusterLicenseCheckerTests.java | 225 ++++++++++++++ .../action/TransportStartDatafeedAction.java | 27 +- .../ml/datafeed/DatafeedNodeSelector.java | 3 +- .../ml/datafeed/MlRemoteLicenseChecker.java | 193 ------------ .../TransportStartDatafeedActionTests.java | 21 +- .../datafeed/MlRemoteLicenseCheckerTests.java | 199 ------------- 7 files changed, 540 insertions(+), 406 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java delete mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java delete mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java new file mode 100644 index 0000000000000..d1685d1a0e4b2 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -0,0 +1,278 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.license; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.protocol.xpack.XPackInfoRequest; +import org.elasticsearch.protocol.xpack.XPackInfoResponse; +import org.elasticsearch.protocol.xpack.license.LicenseStatus; +import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.xpack.core.action.XPackInfoAction; + +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * Checks remote clusters for license compatibility with a specified license predicate. + */ +public class RemoteClusterLicenseChecker { + + /** + * Encapsulates the license info of a remote cluster. + */ + public static class RemoteClusterLicenseInfo { + + private final String clusterName; + + /** + * The name of the remote cluster. + * + * @return the cluster name + */ + public String clusterName() { + return clusterName; + } + + private final XPackInfoResponse.LicenseInfo licenseInfo; + + /** + * The license info of the remote cluster. + * + * @return the license info + */ + public XPackInfoResponse.LicenseInfo licenseInfo() { + return licenseInfo; + } + + RemoteClusterLicenseInfo(final String clusterName, final XPackInfoResponse.LicenseInfo licenseInfo) { + this.clusterName = clusterName; + this.licenseInfo = licenseInfo; + } + + } + + /** + * Encapsulates a remote cluster license check. The check is either successful if the license of the remote cluster is compatible with + * the predicate used to check license compatibility, or the check is a failure. + */ + public static final class LicenseCheck { + + private final RemoteClusterLicenseInfo remoteClusterLicenseInfo; + + /** + * The remote cluster license info. This method should only be invoked if this instance represents a failing license check. + * + * @return the remote cluster license info + */ + public RemoteClusterLicenseInfo remoteClusterLicenseInfo() { + assert isSuccess() == false; + return remoteClusterLicenseInfo; + } + + private static final LicenseCheck SUCCESS = new LicenseCheck(null); + + /** + * A successful license check. + * + * @return a successful license check instance + */ + public static LicenseCheck success() { + return SUCCESS; + } + + /** + * Test if this instance represents a successful license check. + * + * @return true if this instance represents a successful license check, otherwise false + */ + public boolean isSuccess() { + return this == SUCCESS; + } + + /** + * Creates a failing license check encapsulating the specified remote cluster license info. + * + * @param remoteClusterLicenseInfo the remote cluster license info + * @return a failing license check + */ + public static LicenseCheck failure(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) { + return new LicenseCheck(remoteClusterLicenseInfo); + } + + private LicenseCheck(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) { + this.remoteClusterLicenseInfo = remoteClusterLicenseInfo; + } + + } + + private final Client client; + private final Predicate licensePredicate; + + /** + * Constructs a remote license checker with the specified license predicate for checking license compatibility. The predicate does not + * need to check for the active license state as this is handled by the remote cluster license checker. + * + * @param client the client + * @param licensePredicate the license predicate + */ + public RemoteClusterLicenseChecker(final Client client, final Predicate licensePredicate) { + this.client = client; + this.licensePredicate = licensePredicate; + } + + public static boolean isLicensePlatinumOrTrial(final XPackInfoResponse.LicenseInfo licenseInfo) { + final License.OperationMode mode = License.OperationMode.resolve(licenseInfo.getMode()); + return mode == License.OperationMode.PLATINUM || mode == License.OperationMode.TRIAL; + } + + /** + * Checks the specified clusters for license compatibility. The specified callback will be invoked once if all clusters are + * license-compatible, otherwise the specified callback will be invoked once on the first cluster that is not license-compatible. + * + * @param clusterNames the cluster names to check + * @param listener a callback + */ + public void checkRemoteClusterLicenses(final List clusterNames, final ActionListener listener) { + final Iterator clusterNamesIterator = clusterNames.iterator(); + if (clusterNamesIterator.hasNext() == false) { + listener.onResponse(LicenseCheck.success()); + return; + } + + final AtomicReference clusterName = new AtomicReference<>(); + + final ActionListener infoListener = new ActionListener() { + + @Override + public void onResponse(final XPackInfoResponse xPackInfoResponse) { + final XPackInfoResponse.LicenseInfo licenseInfo = xPackInfoResponse.getLicenseInfo(); + if (licenseInfo.getStatus() == LicenseStatus.ACTIVE == false || licensePredicate.test(licenseInfo) == false) { + listener.onResponse(LicenseCheck.failure(new RemoteClusterLicenseInfo(clusterName.get(), licenseInfo))); + return; + } + + if (clusterNamesIterator.hasNext()) { + clusterName.set(clusterNamesIterator.next()); + remoteClusterLicense(clusterName.get(), this); + } else { + listener.onResponse(LicenseCheck.success()); + } + } + + @Override + public void onFailure(final Exception e) { + final String message = "could not determine the licence type for cluster [" + clusterName.get() + "]"; + listener.onFailure(new ElasticsearchException(message, e)); + } + + }; + + // check the license on the first cluster, and then we recursively check licenses on the remaining clusters + clusterName.set(clusterNamesIterator.next()); + remoteClusterLicense(clusterName.get(), infoListener); + } + + private void remoteClusterLicense(final String clusterName, final ActionListener listener) { + final Client remoteClusterClient = client.getRemoteClusterClient(clusterName); + final ThreadContext threadContext = remoteClusterClient.threadPool().getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we stash any context here since this is an internal execution and should not leak any existing context information + threadContext.markAsSystemContext(); + + final XPackInfoRequest request = new XPackInfoRequest(); + request.setCategories(EnumSet.of(XPackInfoRequest.Category.LICENSE)); + remoteClusterClient.execute(XPackInfoAction.INSTANCE, request, listener); + } + } + + /** + * Predicate to test if the index name represents the name of a remote index. + * + * @param index the index name + * @return true if the collection of indices contains a remote index, otherwise false + */ + public static boolean isRemoteIndex(final String index) { + return index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) != -1; + } + + /** + * Predicate to test if the collection of index names contains any that represent the name of a remote index. + * + * @param indices the collection of index names + * @return true if the collection of index names contains a name that represents a remote index, otherwise false + */ + public static boolean containsRemoteIndex(final List indices) { + return indices.stream().anyMatch(RemoteClusterLicenseChecker::isRemoteIndex); + } + + /** + * Filters the collection of index names for names that represent a remote index. Remote index names are of the form + * {@code cluster_name:index_name}. + * + * @param indices the collection of index names + * @return list of index names that represent remote index names + */ + public static List remoteIndices(final List indices) { + return indices.stream().filter(RemoteClusterLicenseChecker::isRemoteIndex).collect(Collectors.toList()); + } + + /** + * Extract the list of remote cluster names from the list of index names. Remote index names are of the form + * {@code cluster_name:index_name} and the cluster_name is extracted for each index name that represents a remote index. + * + * @param indices the collection of index names + * @return the remote cluster names + */ + public static List remoteClusterNames(final List indices) { + return indices.stream() + .filter(RemoteClusterLicenseChecker::isRemoteIndex) + .map(index -> index.substring(0, index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR))) + .distinct() + .collect(Collectors.toList()); + } + + /** + * Constructs an error message for license incompatibility. + * + * @param feature the name of the feature that initiated the remote cluster license check. + * @param remoteClusterLicenseInfo the remote cluster license info of the cluster that failed the license check + * @return an error message representing license incompatibility + */ + public static String buildErrorMessage( + final String feature, + final RemoteClusterLicenseInfo remoteClusterLicenseInfo, + final Predicate predicate) { + final StringBuilder error = new StringBuilder(); + if (remoteClusterLicenseInfo.licenseInfo().getStatus() != LicenseStatus.ACTIVE) { + error.append(String.format("The license on cluster [%s] is not active. ", remoteClusterLicenseInfo.clusterName())); + } else { + final License.OperationMode mode = License.OperationMode.resolve(remoteClusterLicenseInfo.licenseInfo().getMode()); + if (predicate.test(remoteClusterLicenseInfo.licenseInfo())) { + throw new IllegalStateException("license must be incompatible to build error message"); + } else { + final String message = String.format( + "The license mode [%s] on cluster [%s] does not enable [%s]. ", + mode, + remoteClusterLicenseInfo.clusterName(), + feature); + error.append(message); + } + } + + error.append(Strings.toString(remoteClusterLicenseInfo.licenseInfo())); + return error.toString(); + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java new file mode 100644 index 0000000000000..eec1e5da1dead --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -0,0 +1,225 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.license; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.protocol.xpack.XPackInfoResponse; +import org.elasticsearch.protocol.xpack.license.LicenseStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.action.XPackInfoAction; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.is; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RemoteClusterLicenseCheckerTests extends ESTestCase { + + public void testNoRemoteIndex() { + final List indices = Arrays.asList("local-index1", "local-index2"); + assertFalse(RemoteClusterLicenseChecker.containsRemoteIndex(indices)); + } + + public void testRemoteIndex() { + final List indices = Arrays.asList("local-index1", "remote-cluster:remote-index2"); + assertTrue(RemoteClusterLicenseChecker.containsRemoteIndex(indices)); + } + + public void testNoRemoteIndices() { + final List indices = Collections.singletonList("local-index"); + assertThat(RemoteClusterLicenseChecker.remoteIndices(indices), is(empty())); + } + + public void testRemoteIndices() { + final List indices = Arrays.asList("local-index", "remote-cluster:index1", "local-index2", "remote-cluster2:index1"); + assertThat( + RemoteClusterLicenseChecker.remoteIndices(indices), + containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1")); + } + + public void testNoRemoteClusterNames() { + final List indices = Arrays.asList("local-index1", "local-index2"); + assertThat(RemoteClusterLicenseChecker.remoteClusterNames(indices), empty()); + } + + public void testOneRemoteClusterNames() { + final List indices = Arrays.asList("local-index1", "remote-cluster1:remote-index2"); + assertThat(RemoteClusterLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1")); + } + + public void testMoreThanOneRemoteClusterName() { + final List indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1"); + assertThat(RemoteClusterLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + } + + public void testDuplicateRemoteClusterNames() { + final List indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1", "remote-cluster2:index2"); + assertThat(RemoteClusterLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + } + + public void testCheckRemoteClusterLicensesGivenCompatibleLicenses() { + final AtomicInteger index = new AtomicInteger(0); + final List responses = new ArrayList<>(); + + final Client client = createMockClient(); + doAnswer(invocationMock -> { + @SuppressWarnings("unchecked") ActionListener listener = + (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(responses.get(index.getAndIncrement())); + return null; + }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); + + + final List remoteClusterNames = Arrays.asList("valid1", "valid2", "valid3"); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + + final RemoteClusterLicenseChecker licenseChecker = + new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + final AtomicReference licenseCheck = new AtomicReference<>(); + + licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, + new ActionListener() { + + @Override + public void onResponse(RemoteClusterLicenseChecker.LicenseCheck response) { + licenseCheck.set(response); + } + + @Override + public void onFailure(Exception e) { + fail(e.getMessage()); + } + + }); + + verify(client, times(3)).execute(same(XPackInfoAction.INSTANCE), any(), any()); + assertNotNull(licenseCheck.get()); + assertTrue(licenseCheck.get().isSuccess()); + } + + public void testCheckRemoteClusterLicensesGivenIncompatibleLicense() { + final AtomicInteger index = new AtomicInteger(0); + final List remoteClusterNames = Arrays.asList("good", "cluster-with-basic-license", "good2"); + final List responses = new ArrayList<>(); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createBasicLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + + final Client client = createMockClient(); + doAnswer(invocationMock -> { + @SuppressWarnings("unchecked") ActionListener listener = + (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(responses.get(index.getAndIncrement())); + return null; + }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); + + final RemoteClusterLicenseChecker licenseChecker = + new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + final AtomicReference licenseCheck = new AtomicReference<>(); + + licenseChecker.checkRemoteClusterLicenses( + remoteClusterNames, + new ActionListener() { + + @Override + public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck response) { + licenseCheck.set(response); + } + + @Override + public void onFailure(final Exception e) { + fail(e.getMessage()); + } + + }); + + verify(client, times(2)).execute(same(XPackInfoAction.INSTANCE), any(), any()); + assertNotNull(licenseCheck.get()); + assertFalse(licenseCheck.get().isSuccess()); + assertThat(licenseCheck.get().remoteClusterLicenseInfo().clusterName(), equalTo("cluster-with-basic-license")); + assertThat(licenseCheck.get().remoteClusterLicenseInfo().licenseInfo().getType(), equalTo("BASIC")); + } + + + public void testBuildErrorMessageForActiveCompatibleLicense() { + final XPackInfoResponse.LicenseInfo platinumLicence = createPlatinumLicenseResponse(); + final RemoteClusterLicenseChecker.RemoteClusterLicenseInfo info = + new RemoteClusterLicenseChecker.RemoteClusterLicenseInfo("platinum-cluster", platinumLicence); + final IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> RemoteClusterLicenseChecker.buildErrorMessage("", info, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial)); + assertThat(e, hasToString(containsString("license must be incompatible to build error message"))); + } + + public void testBuildErrorMessageForIncompatibleLicense() { + final XPackInfoResponse.LicenseInfo basicLicense = createBasicLicenseResponse(); + final RemoteClusterLicenseChecker.RemoteClusterLicenseInfo info = + new RemoteClusterLicenseChecker.RemoteClusterLicenseInfo("basic-cluster", basicLicense); + final String expected = "The license mode [BASIC] on cluster [basic-cluster] does not enable [Feature]. " + + Strings.toString(basicLicense); + assertThat( + RemoteClusterLicenseChecker.buildErrorMessage("Feature", info, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial), + equalTo(expected)); + } + + public void testBuildErrorMessageForInactiveLicense() { + final XPackInfoResponse.LicenseInfo expiredLicense = createExpiredLicenseResponse(); + final RemoteClusterLicenseChecker.RemoteClusterLicenseInfo info = + new RemoteClusterLicenseChecker.RemoteClusterLicenseInfo("expired-cluster", expiredLicense); + final String expected = "The license on cluster [expired-cluster] is not active. " + Strings.toString(expiredLicense); + assertThat( + RemoteClusterLicenseChecker.buildErrorMessage("Feature", info, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial), + equalTo(expected)); + } + + private Client createMockClient() { + final Client client = mock(Client.class); + final ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + return client; + } + + private XPackInfoResponse.LicenseInfo createPlatinumLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", LicenseStatus.ACTIVE, randomNonNegativeLong()); + } + + private XPackInfoResponse.LicenseInfo createBasicLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "BASIC", "BASIC", LicenseStatus.ACTIVE, randomNonNegativeLong()); + } + + private XPackInfoResponse.LicenseInfo createExpiredLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", LicenseStatus.EXPIRED, randomNonNegativeLong()); + } + +} \ No newline at end of file diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 0ea9eb7764803..cf932f4ca54ca 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTaskState; @@ -46,7 +47,6 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; -import org.elasticsearch.xpack.ml.datafeed.MlRemoteLicenseChecker; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import java.util.List; @@ -141,19 +141,22 @@ public void onFailure(Exception e) { DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId()); Job job = mlMetadata.getJobs().get(datafeed.getJobId()); - if (MlRemoteLicenseChecker.containsRemoteIndex(datafeed.getIndices())) { - MlRemoteLicenseChecker remoteLicenseChecker = new MlRemoteLicenseChecker(client); - remoteLicenseChecker.checkRemoteClusterLicenses(MlRemoteLicenseChecker.remoteClusterNames(datafeed.getIndices()), + if (RemoteClusterLicenseChecker.containsRemoteIndex(datafeed.getIndices())) { + final RemoteClusterLicenseChecker remoteClusterLicenseChecker = + new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + remoteClusterLicenseChecker.checkRemoteClusterLicenses( + RemoteClusterLicenseChecker.remoteClusterNames(datafeed.getIndices()), ActionListener.wrap( response -> { - if (response.isViolated()) { + if (response.isSuccess() == false) { listener.onFailure(createUnlicensedError(datafeed.getId(), response)); } else { createDataExtractor(job, datafeed, params, waitForTaskListener); } }, - e -> listener.onFailure(createUnknownLicenseError(datafeed.getId(), - MlRemoteLicenseChecker.remoteIndices(datafeed.getIndices()), e)) + e -> listener.onFailure( + createUnknownLicenseError( + datafeed.getId(), RemoteClusterLicenseChecker.remoteIndices(datafeed.getIndices()), e)) )); } else { createDataExtractor(job, datafeed, params, waitForTaskListener); @@ -232,12 +235,12 @@ public void onFailure(Exception e) { ); } - private ElasticsearchStatusException createUnlicensedError(String datafeedId, - MlRemoteLicenseChecker.LicenseViolation licenseViolation) { - String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use " - + "indices on a remote cluster [" + licenseViolation.get().getClusterName() + private ElasticsearchStatusException createUnlicensedError( + final String datafeedId, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { + final String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use " + + "indices on a remote cluster [" + licenseCheck.remoteClusterLicenseInfo().clusterName() + "] that is not licensed for Machine Learning. " - + MlRemoteLicenseChecker.buildErrorMessage(licenseViolation.get()); + + RemoteClusterLicenseChecker.buildErrorMessage("Machine Learning", licenseCheck.remoteClusterLicenseInfo(), RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index a6be047648623..ce3f611b2227a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; @@ -92,7 +93,7 @@ private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) { List indices = datafeed.getIndices(); for (String index : indices) { - if (MlRemoteLicenseChecker.isRemoteIndex(index)) { + if (RemoteClusterLicenseChecker.isRemoteIndex(index)) { // We cannot verify remote indices continue; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java deleted file mode 100644 index b0eeed2c800ec..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.ml.datafeed; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.license.License; -import org.elasticsearch.protocol.xpack.XPackInfoRequest; -import org.elasticsearch.protocol.xpack.XPackInfoResponse; -import org.elasticsearch.protocol.xpack.license.LicenseStatus; -import org.elasticsearch.transport.ActionNotFoundTransportException; -import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.xpack.core.action.XPackInfoAction; - -import java.util.EnumSet; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -/** - * ML datafeeds can use cross cluster search to access data in a remote cluster. - * The remote cluster should be licenced for ML this class performs that check - * using the _xpack (info) endpoint. - */ -public class MlRemoteLicenseChecker { - - private final Client client; - - public static class RemoteClusterLicenseInfo { - private final String clusterName; - private final XPackInfoResponse.LicenseInfo licenseInfo; - - RemoteClusterLicenseInfo(String clusterName, XPackInfoResponse.LicenseInfo licenseInfo) { - this.clusterName = clusterName; - this.licenseInfo = licenseInfo; - } - - public String getClusterName() { - return clusterName; - } - - public XPackInfoResponse.LicenseInfo getLicenseInfo() { - return licenseInfo; - } - } - - public class LicenseViolation { - private final RemoteClusterLicenseInfo licenseInfo; - - private LicenseViolation(@Nullable RemoteClusterLicenseInfo licenseInfo) { - this.licenseInfo = licenseInfo; - } - - public boolean isViolated() { - return licenseInfo != null; - } - - public RemoteClusterLicenseInfo get() { - return licenseInfo; - } - } - - public MlRemoteLicenseChecker(Client client) { - this.client = client; - } - - /** - * Check each cluster is licensed for ML. - * This function evaluates lazily and will terminate when the first cluster - * that is not licensed is found or an error occurs. - * - * @param clusterNames List of remote cluster names - * @param listener Response listener - */ - public void checkRemoteClusterLicenses(List clusterNames, ActionListener listener) { - final Iterator itr = clusterNames.iterator(); - if (itr.hasNext() == false) { - listener.onResponse(new LicenseViolation(null)); - return; - } - - final AtomicReference clusterName = new AtomicReference<>(itr.next()); - - ActionListener infoListener = new ActionListener() { - @Override - public void onResponse(XPackInfoResponse xPackInfoResponse) { - if (licenseSupportsML(xPackInfoResponse.getLicenseInfo()) == false) { - listener.onResponse(new LicenseViolation( - new RemoteClusterLicenseInfo(clusterName.get(), xPackInfoResponse.getLicenseInfo()))); - return; - } - - if (itr.hasNext()) { - clusterName.set(itr.next()); - remoteClusterLicense(clusterName.get(), this); - } else { - listener.onResponse(new LicenseViolation(null)); - } - } - - @Override - public void onFailure(Exception e) { - String message = "Could not determine the X-Pack licence type for cluster [" + clusterName.get() + "]"; - if (e instanceof ActionNotFoundTransportException) { - // This is likely to be because x-pack is not installed in the target cluster - message += ". Is X-Pack installed on the target cluster?"; - } - listener.onFailure(new ElasticsearchException(message, e)); - } - }; - - remoteClusterLicense(clusterName.get(), infoListener); - } - - private void remoteClusterLicense(String clusterName, ActionListener listener) { - Client remoteClusterClient = client.getRemoteClusterClient(clusterName); - ThreadContext threadContext = remoteClusterClient.threadPool().getThreadContext(); - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - // we stash any context here since this is an internal execution and should not leak any - // existing context information. - threadContext.markAsSystemContext(); - - XPackInfoRequest request = new XPackInfoRequest(); - request.setCategories(EnumSet.of(XPackInfoRequest.Category.LICENSE)); - remoteClusterClient.execute(XPackInfoAction.INSTANCE, request, listener); - } - } - - static boolean licenseSupportsML(XPackInfoResponse.LicenseInfo licenseInfo) { - License.OperationMode mode = License.OperationMode.resolve(licenseInfo.getMode()); - return licenseInfo.getStatus() == LicenseStatus.ACTIVE && - (mode == License.OperationMode.PLATINUM || mode == License.OperationMode.TRIAL); - } - - public static boolean isRemoteIndex(String index) { - return index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) != -1; - } - - public static boolean containsRemoteIndex(List indices) { - return indices.stream().anyMatch(MlRemoteLicenseChecker::isRemoteIndex); - } - - /** - * Get any remote indices used in cross cluster search. - * Remote indices are of the form {@code cluster_name:index_name} - * @return List of remote cluster indices - */ - public static List remoteIndices(List indices) { - return indices.stream().filter(MlRemoteLicenseChecker::isRemoteIndex).collect(Collectors.toList()); - } - - /** - * Extract the list of remote cluster names from the list of indices. - * @param indices List of indices. Remote cluster indices are prefixed - * with {@code cluster-name:} - * @return Every cluster name found in {@code indices} - */ - public static List remoteClusterNames(List indices) { - return indices.stream() - .filter(MlRemoteLicenseChecker::isRemoteIndex) - .map(index -> index.substring(0, index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR))) - .distinct() - .collect(Collectors.toList()); - } - - public static String buildErrorMessage(RemoteClusterLicenseInfo clusterLicenseInfo) { - StringBuilder error = new StringBuilder(); - if (clusterLicenseInfo.licenseInfo.getStatus() != LicenseStatus.ACTIVE) { - error.append("The license on cluster [").append(clusterLicenseInfo.clusterName) - .append("] is not active. "); - } else { - License.OperationMode mode = License.OperationMode.resolve(clusterLicenseInfo.licenseInfo.getMode()); - if (mode != License.OperationMode.PLATINUM && mode != License.OperationMode.TRIAL) { - error.append("The license mode [").append(mode) - .append("] on cluster [") - .append(clusterLicenseInfo.clusterName) - .append("] does not enable Machine Learning. "); - } - } - - error.append(Strings.toString(clusterLicenseInfo.licenseInfo)); - return error.toString(); - } -} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java index 72c8d361dd882..19c3eaae1bdaa 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java @@ -3,10 +3,15 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ + package org.elasticsearch.xpack.ml.action; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.license.RemoteClusterLicenseChecker; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.protocol.xpack.XPackInfoResponse; +import org.elasticsearch.protocol.xpack.license.LicenseStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -14,7 +19,6 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests; @@ -84,6 +88,21 @@ public void testValidate_jobOpened() { TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); } + public void testLicenseSupportsML() { + XPackInfoResponse.LicenseInfo licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", + LicenseStatus.ACTIVE, randomNonNegativeLong()); + assertTrue(RemoteClusterLicenseChecker.isLicensePlatinumOrTrial(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", LicenseStatus.EXPIRED, randomNonNegativeLong()); + assertFalse(RemoteClusterLicenseChecker.isLicensePlatinumOrTrial(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "GOLD", "GOLD", LicenseStatus.ACTIVE, randomNonNegativeLong()); + assertFalse(RemoteClusterLicenseChecker.isLicensePlatinumOrTrial(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", LicenseStatus.ACTIVE, randomNonNegativeLong()); + assertTrue(RemoteClusterLicenseChecker.isLicensePlatinumOrTrial(licenseInfo)); + } + public static TransportStartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action, TaskId parentTaskId, StartDatafeedAction.DatafeedParams params, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java deleted file mode 100644 index 81e4c75cfad7c..0000000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.ml.datafeed; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.protocol.xpack.XPackInfoResponse; -import org.elasticsearch.protocol.xpack.license.LicenseStatus; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.action.XPackInfoAction; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.is; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class MlRemoteLicenseCheckerTests extends ESTestCase { - - public void testIsRemoteIndex() { - List indices = Arrays.asList("local-index1", "local-index2"); - assertFalse(MlRemoteLicenseChecker.containsRemoteIndex(indices)); - indices = Arrays.asList("local-index1", "remote-cluster:remote-index2"); - assertTrue(MlRemoteLicenseChecker.containsRemoteIndex(indices)); - } - - public void testRemoteIndices() { - List indices = Collections.singletonList("local-index"); - assertThat(MlRemoteLicenseChecker.remoteIndices(indices), is(empty())); - indices = Arrays.asList("local-index", "remote-cluster:index1", "local-index2", "remote-cluster2:index1"); - assertThat(MlRemoteLicenseChecker.remoteIndices(indices), containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1")); - } - - public void testRemoteClusterNames() { - List indices = Arrays.asList("local-index1", "local-index2"); - assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), empty()); - indices = Arrays.asList("local-index1", "remote-cluster1:remote-index2"); - assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1")); - indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1"); - assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); - indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1", "remote-cluster2:index2"); - assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); - } - - public void testLicenseSupportsML() { - XPackInfoResponse.LicenseInfo licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", - LicenseStatus.ACTIVE, randomNonNegativeLong()); - assertTrue(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); - - licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", LicenseStatus.EXPIRED, randomNonNegativeLong()); - assertFalse(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); - - licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "GOLD", "GOLD", LicenseStatus.ACTIVE, randomNonNegativeLong()); - assertFalse(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); - - licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", LicenseStatus.ACTIVE, randomNonNegativeLong()); - assertTrue(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); - } - - public void testCheckRemoteClusterLicenses_givenValidLicenses() { - final AtomicInteger index = new AtomicInteger(0); - final List responses = new ArrayList<>(); - - Client client = createMockClient(); - doAnswer(invocationMock -> { - @SuppressWarnings("raw_types") - ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; - listener.onResponse(responses.get(index.getAndIncrement())); - return null; - }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); - - - List remoteClusterNames = Arrays.asList("valid1", "valid2", "valid3"); - responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); - responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); - responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); - - MlRemoteLicenseChecker licenseChecker = new MlRemoteLicenseChecker(client); - AtomicReference licCheckResponse = new AtomicReference<>(); - - licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, - new ActionListener() { - @Override - public void onResponse(MlRemoteLicenseChecker.LicenseViolation response) { - licCheckResponse.set(response); - } - - @Override - public void onFailure(Exception e) { - fail(e.getMessage()); - } - }); - - verify(client, times(3)).execute(same(XPackInfoAction.INSTANCE), any(), any()); - assertNotNull(licCheckResponse.get()); - assertFalse(licCheckResponse.get().isViolated()); - assertNull(licCheckResponse.get().get()); - } - - public void testCheckRemoteClusterLicenses_givenInvalidLicense() { - final AtomicInteger index = new AtomicInteger(0); - List remoteClusterNames = Arrays.asList("good", "cluster-with-basic-license", "good2"); - final List responses = new ArrayList<>(); - responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); - responses.add(new XPackInfoResponse(null, createBasicLicenseResponse(), null)); - responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); - - Client client = createMockClient(); - doAnswer(invocationMock -> { - @SuppressWarnings("raw_types") - ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; - listener.onResponse(responses.get(index.getAndIncrement())); - return null; - }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); - - MlRemoteLicenseChecker licenseChecker = new MlRemoteLicenseChecker(client); - AtomicReference licCheckResponse = new AtomicReference<>(); - - licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, - new ActionListener() { - @Override - public void onResponse(MlRemoteLicenseChecker.LicenseViolation response) { - licCheckResponse.set(response); - } - - @Override - public void onFailure(Exception e) { - fail(e.getMessage()); - } - }); - - verify(client, times(2)).execute(same(XPackInfoAction.INSTANCE), any(), any()); - assertNotNull(licCheckResponse.get()); - assertTrue(licCheckResponse.get().isViolated()); - assertEquals("cluster-with-basic-license", licCheckResponse.get().get().getClusterName()); - assertEquals("BASIC", licCheckResponse.get().get().getLicenseInfo().getType()); - } - - public void testBuildErrorMessage() { - XPackInfoResponse.LicenseInfo platinumLicence = createPlatinumLicenseResponse(); - MlRemoteLicenseChecker.RemoteClusterLicenseInfo info = - new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("platinum-cluster", platinumLicence); - assertEquals(Strings.toString(platinumLicence), MlRemoteLicenseChecker.buildErrorMessage(info)); - - XPackInfoResponse.LicenseInfo basicLicense = createBasicLicenseResponse(); - info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("basic-cluster", basicLicense); - String expected = "The license mode [BASIC] on cluster [basic-cluster] does not enable Machine Learning. " - + Strings.toString(basicLicense); - assertEquals(expected, MlRemoteLicenseChecker.buildErrorMessage(info)); - - XPackInfoResponse.LicenseInfo expiredLicense = createExpiredLicenseResponse(); - info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("expired-cluster", expiredLicense); - expected = "The license on cluster [expired-cluster] is not active. " + Strings.toString(expiredLicense); - assertEquals(expected, MlRemoteLicenseChecker.buildErrorMessage(info)); - } - - private Client createMockClient() { - Client client = mock(Client.class); - ThreadPool threadPool = mock(ThreadPool.class); - when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); - when(client.getRemoteClusterClient(anyString())).thenReturn(client); - return client; - } - - private XPackInfoResponse.LicenseInfo createPlatinumLicenseResponse() { - return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", LicenseStatus.ACTIVE, randomNonNegativeLong()); - } - - private XPackInfoResponse.LicenseInfo createBasicLicenseResponse() { - return new XPackInfoResponse.LicenseInfo("uid", "BASIC", "BASIC", LicenseStatus.ACTIVE, randomNonNegativeLong()); - } - - private XPackInfoResponse.LicenseInfo createExpiredLicenseResponse() { - return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", LicenseStatus.EXPIRED, randomNonNegativeLong()); - } -} From 36f3c6acf96b793fe5749be225bbada02514bafb Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Aug 2018 23:23:03 -0400 Subject: [PATCH 02/23] Remove unnecessary test --- .../action/TransportStartDatafeedActionTests.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java index 19c3eaae1bdaa..c4cdc52eab514 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java @@ -88,21 +88,6 @@ public void testValidate_jobOpened() { TransportStartDatafeedAction.validate("foo-datafeed", mlMetadata2, tasks); } - public void testLicenseSupportsML() { - XPackInfoResponse.LicenseInfo licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", - LicenseStatus.ACTIVE, randomNonNegativeLong()); - assertTrue(RemoteClusterLicenseChecker.isLicensePlatinumOrTrial(licenseInfo)); - - licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", LicenseStatus.EXPIRED, randomNonNegativeLong()); - assertFalse(RemoteClusterLicenseChecker.isLicensePlatinumOrTrial(licenseInfo)); - - licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "GOLD", "GOLD", LicenseStatus.ACTIVE, randomNonNegativeLong()); - assertFalse(RemoteClusterLicenseChecker.isLicensePlatinumOrTrial(licenseInfo)); - - licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", LicenseStatus.ACTIVE, randomNonNegativeLong()); - assertTrue(RemoteClusterLicenseChecker.isLicensePlatinumOrTrial(licenseInfo)); - } - public static TransportStartDatafeedAction.DatafeedTask createDatafeedTask(long id, String type, String action, TaskId parentTaskId, StartDatafeedAction.DatafeedParams params, From 71e1c3e8477f1a7eafa035748a08dd79f2f74153 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Aug 2018 23:23:35 -0400 Subject: [PATCH 03/23] Fix imports --- .../xpack/ml/action/TransportStartDatafeedActionTests.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java index c4cdc52eab514..610a5c1b92fb6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedActionTests.java @@ -8,10 +8,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.protocol.xpack.XPackInfoResponse; -import org.elasticsearch.protocol.xpack.license.LicenseStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.MlMetadata; From a0ba2f31b30bbd0fbafe8d2274439a7b6832c499 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Aug 2018 23:31:37 -0400 Subject: [PATCH 04/23] Make class final --- .../org/elasticsearch/license/RemoteClusterLicenseChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index d1685d1a0e4b2..4f3e8d8c19863 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -27,7 +27,7 @@ /** * Checks remote clusters for license compatibility with a specified license predicate. */ -public class RemoteClusterLicenseChecker { +public final class RemoteClusterLicenseChecker { /** * Encapsulates the license info of a remote cluster. From fcbf9b25e07c381d96442d3e9bcce0161c89ea0b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Aug 2018 23:32:00 -0400 Subject: [PATCH 05/23] Make class final --- .../elasticsearch/license/RemoteClusterLicenseCheckerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index eec1e5da1dead..08e410efb53e4 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -40,7 +40,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class RemoteClusterLicenseCheckerTests extends ESTestCase { +public final class RemoteClusterLicenseCheckerTests extends ESTestCase { public void testNoRemoteIndex() { final List indices = Arrays.asList("local-index1", "local-index2"); From 9eff09beef3fab68c4b8aa2c4db2bd218e7aa102 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Aug 2018 23:38:27 -0400 Subject: [PATCH 06/23] A little more cleanup --- .../license/RemoteClusterLicenseChecker.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index 4f3e8d8c19863..7501bd0ffb41d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -118,18 +118,18 @@ private LicenseCheck(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) { } private final Client client; - private final Predicate licensePredicate; + private final Predicate predicate; /** - * Constructs a remote license checker with the specified license predicate for checking license compatibility. The predicate does not - * need to check for the active license state as this is handled by the remote cluster license checker. + * Constructs a remote cluster license checker with the specified license predicate for checking license compatibility. The predicate + * does not need to check for the active license state as this is handled by the remote cluster license checker. * - * @param client the client - * @param licensePredicate the license predicate + * @param client the client + * @param predicate the license predicate */ - public RemoteClusterLicenseChecker(final Client client, final Predicate licensePredicate) { + public RemoteClusterLicenseChecker(final Client client, final Predicate predicate) { this.client = client; - this.licensePredicate = licensePredicate; + this.predicate = predicate; } public static boolean isLicensePlatinumOrTrial(final XPackInfoResponse.LicenseInfo licenseInfo) { @@ -158,7 +158,7 @@ public void checkRemoteClusterLicenses(final List clusterNames, final Ac @Override public void onResponse(final XPackInfoResponse xPackInfoResponse) { final XPackInfoResponse.LicenseInfo licenseInfo = xPackInfoResponse.getLicenseInfo(); - if (licenseInfo.getStatus() == LicenseStatus.ACTIVE == false || licensePredicate.test(licenseInfo) == false) { + if (licenseInfo.getStatus() == LicenseStatus.ACTIVE == false || predicate.test(licenseInfo) == false) { listener.onResponse(LicenseCheck.failure(new RemoteClusterLicenseInfo(clusterName.get(), licenseInfo))); return; } From 0c2a6e81687c53d44824a36530636155f0c002dd Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Aug 2018 23:39:39 -0400 Subject: [PATCH 07/23] One more comment --- .../org/elasticsearch/license/RemoteClusterLicenseChecker.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index 7501bd0ffb41d..7f5dfbe5ca890 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -163,6 +163,7 @@ public void onResponse(final XPackInfoResponse xPackInfoResponse) { return; } + // recurse to the next cluster if (clusterNamesIterator.hasNext()) { clusterName.set(clusterNamesIterator.next()); remoteClusterLicense(clusterName.get(), this); From ae84438468fc38d318ab4a289d8660aa2be58a98 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 17 Aug 2018 23:40:03 -0400 Subject: [PATCH 08/23] Move comment --- .../org/elasticsearch/license/RemoteClusterLicenseChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index 7f5dfbe5ca890..8d11967951717 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -163,9 +163,9 @@ public void onResponse(final XPackInfoResponse xPackInfoResponse) { return; } - // recurse to the next cluster if (clusterNamesIterator.hasNext()) { clusterName.set(clusterNamesIterator.next()); + // recurse to the next cluster remoteClusterLicense(clusterName.get(), this); } else { listener.onResponse(LicenseCheck.success()); From c28c95075f6b9423fa67db56213c1c9b9c9b5ebe Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 09:32:11 -0400 Subject: [PATCH 09/23] Fix forbidden API violations --- .../elasticsearch/license/RemoteClusterLicenseChecker.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index 8d11967951717..a953f3d73f7b2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -20,6 +20,7 @@ import java.util.EnumSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -257,13 +258,14 @@ public static String buildErrorMessage( final Predicate predicate) { final StringBuilder error = new StringBuilder(); if (remoteClusterLicenseInfo.licenseInfo().getStatus() != LicenseStatus.ACTIVE) { - error.append(String.format("The license on cluster [%s] is not active. ", remoteClusterLicenseInfo.clusterName())); + error.append(String.format(Locale.ROOT, "The license on cluster [%s] is not active. ", remoteClusterLicenseInfo.clusterName())); } else { final License.OperationMode mode = License.OperationMode.resolve(remoteClusterLicenseInfo.licenseInfo().getMode()); if (predicate.test(remoteClusterLicenseInfo.licenseInfo())) { throw new IllegalStateException("license must be incompatible to build error message"); } else { final String message = String.format( + Locale.ROOT, "The license mode [%s] on cluster [%s] does not enable [%s]. ", mode, remoteClusterLicenseInfo.clusterName(), From 87ad2182a7056ea85c35b4a8a135364605b6e39a Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 09:32:25 -0400 Subject: [PATCH 10/23] Make class final --- .../org/elasticsearch/license/RemoteClusterLicenseChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index a953f3d73f7b2..3a70532ed53a5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -33,7 +33,7 @@ public final class RemoteClusterLicenseChecker { /** * Encapsulates the license info of a remote cluster. */ - public static class RemoteClusterLicenseInfo { + public static final class RemoteClusterLicenseInfo { private final String clusterName; From 374470232ca037e2853a5522ee6c36d1e8ed78db Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 09:32:39 -0400 Subject: [PATCH 11/23] Add missing newline --- .../elasticsearch/license/RemoteClusterLicenseCheckerTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index 08e410efb53e4..ad1f21dd69b58 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -222,4 +222,4 @@ private XPackInfoResponse.LicenseInfo createExpiredLicenseResponse() { return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", LicenseStatus.EXPIRED, randomNonNegativeLong()); } -} \ No newline at end of file +} From 2f8d5ad8f0407479be4e2323309c0fcd37ddbbda Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 12:43:36 -0400 Subject: [PATCH 12/23] Simplify error message --- .../license/RemoteClusterLicenseChecker.java | 8 +++----- .../license/RemoteClusterLicenseCheckerTests.java | 7 ++----- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index 3a70532ed53a5..194bf76ede19a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -258,23 +258,21 @@ public static String buildErrorMessage( final Predicate predicate) { final StringBuilder error = new StringBuilder(); if (remoteClusterLicenseInfo.licenseInfo().getStatus() != LicenseStatus.ACTIVE) { - error.append(String.format(Locale.ROOT, "The license on cluster [%s] is not active. ", remoteClusterLicenseInfo.clusterName())); + error.append(String.format(Locale.ROOT, "the license on cluster [%s] is not active", remoteClusterLicenseInfo.clusterName())); } else { - final License.OperationMode mode = License.OperationMode.resolve(remoteClusterLicenseInfo.licenseInfo().getMode()); if (predicate.test(remoteClusterLicenseInfo.licenseInfo())) { throw new IllegalStateException("license must be incompatible to build error message"); } else { final String message = String.format( Locale.ROOT, - "The license mode [%s] on cluster [%s] does not enable [%s]. ", - mode, + "the license mode [%s] on cluster [%s] does not enable [%s]", + License.OperationMode.resolve(remoteClusterLicenseInfo.licenseInfo().getMode()), remoteClusterLicenseInfo.clusterName(), feature); error.append(message); } } - error.append(Strings.toString(remoteClusterLicenseInfo.licenseInfo())); return error.toString(); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index ad1f21dd69b58..c01576b95adf3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -184,21 +184,18 @@ public void testBuildErrorMessageForIncompatibleLicense() { final XPackInfoResponse.LicenseInfo basicLicense = createBasicLicenseResponse(); final RemoteClusterLicenseChecker.RemoteClusterLicenseInfo info = new RemoteClusterLicenseChecker.RemoteClusterLicenseInfo("basic-cluster", basicLicense); - final String expected = "The license mode [BASIC] on cluster [basic-cluster] does not enable [Feature]. " - + Strings.toString(basicLicense); assertThat( RemoteClusterLicenseChecker.buildErrorMessage("Feature", info, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial), - equalTo(expected)); + equalTo("the license mode [BASIC] on cluster [basic-cluster] does not enable [Feature]")); } public void testBuildErrorMessageForInactiveLicense() { final XPackInfoResponse.LicenseInfo expiredLicense = createExpiredLicenseResponse(); final RemoteClusterLicenseChecker.RemoteClusterLicenseInfo info = new RemoteClusterLicenseChecker.RemoteClusterLicenseInfo("expired-cluster", expiredLicense); - final String expected = "The license on cluster [expired-cluster] is not active. " + Strings.toString(expiredLicense); assertThat( RemoteClusterLicenseChecker.buildErrorMessage("Feature", info, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial), - equalTo(expected)); + equalTo("the license on cluster [expired-cluster] is not active")); } private Client createMockClient() { From 0ed0b7eaa6a36bb5557f93c07e918d4e5c21b846 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 12:44:08 -0400 Subject: [PATCH 13/23] Remove import --- .../org/elasticsearch/license/RemoteClusterLicenseChecker.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index 194bf76ede19a..a92162e368267 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.protocol.xpack.XPackInfoRequest; import org.elasticsearch.protocol.xpack.XPackInfoResponse; From e16617a41a40fe9199a166b075eb6e3fc4a263e6 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 12:49:26 -0400 Subject: [PATCH 14/23] Fix line length violation --- .../action/TransportStartDatafeedAction.java | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index cf932f4ca54ca..7b649ffce9478 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -50,6 +50,7 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.function.Predicate; @@ -237,19 +238,25 @@ public void onFailure(Exception e) { private ElasticsearchStatusException createUnlicensedError( final String datafeedId, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { - final String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use " - + "indices on a remote cluster [" + licenseCheck.remoteClusterLicenseInfo().clusterName() - + "] that is not licensed for Machine Learning. " - + RemoteClusterLicenseChecker.buildErrorMessage("Machine Learning", licenseCheck.remoteClusterLicenseInfo(), RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); - + final String message = String.format( + Locale.ROOT, + "cannot start datafeed [%s] as it is configured to use indices on remote cluster [%s] that is not licensed for ml; %s", + datafeedId, + licenseCheck.remoteClusterLicenseInfo().clusterName(), + RemoteClusterLicenseChecker.buildErrorMessage( + "ml", + licenseCheck.remoteClusterLicenseInfo(), + RemoteClusterLicenseChecker::isLicensePlatinumOrTrial)); return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST); } - private ElasticsearchStatusException createUnknownLicenseError(String datafeedId, List remoteIndices, - Exception cause) { - String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use" - + " indices on a remote cluster " + remoteIndices - + " but the license type could not be verified"; + private ElasticsearchStatusException createUnknownLicenseError( + final String datafeedId, final List remoteIndices, final Exception cause) { + final String message = String.format( + Locale.ROOT, + "cannot start datafeed [%s] as it uses indices on remote cluster [%s] but the license type could not be verified", + datafeedId, + remoteIndices); return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, new Exception(cause.getMessage())); } From 952d501f7cbcfa80c8ad9b388ae924cc548a545b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 12:56:08 -0400 Subject: [PATCH 15/23] Fix terminology --- .../license/RemoteClusterLicenseChecker.java | 54 +++++++++---------- .../RemoteClusterLicenseCheckerTests.java | 27 +++++----- .../action/TransportStartDatafeedAction.java | 4 +- 3 files changed, 42 insertions(+), 43 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index a92162e368267..7f24b69179707 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -34,15 +34,15 @@ public final class RemoteClusterLicenseChecker { */ public static final class RemoteClusterLicenseInfo { - private final String clusterName; + private final String clusterAlias; /** - * The name of the remote cluster. + * The alias of the remote cluster. * - * @return the cluster name + * @return the cluster alias */ - public String clusterName() { - return clusterName; + public String clusterAlias() { + return clusterAlias; } private final XPackInfoResponse.LicenseInfo licenseInfo; @@ -56,8 +56,8 @@ public XPackInfoResponse.LicenseInfo licenseInfo() { return licenseInfo; } - RemoteClusterLicenseInfo(final String clusterName, final XPackInfoResponse.LicenseInfo licenseInfo) { - this.clusterName = clusterName; + RemoteClusterLicenseInfo(final String clusterAlias, final XPackInfoResponse.LicenseInfo licenseInfo) { + this.clusterAlias = clusterAlias; this.licenseInfo = licenseInfo; } @@ -141,17 +141,17 @@ public static boolean isLicensePlatinumOrTrial(final XPackInfoResponse.LicenseIn * Checks the specified clusters for license compatibility. The specified callback will be invoked once if all clusters are * license-compatible, otherwise the specified callback will be invoked once on the first cluster that is not license-compatible. * - * @param clusterNames the cluster names to check - * @param listener a callback + * @param clusterAliases the cluster aliases to check + * @param listener a callback */ - public void checkRemoteClusterLicenses(final List clusterNames, final ActionListener listener) { - final Iterator clusterNamesIterator = clusterNames.iterator(); - if (clusterNamesIterator.hasNext() == false) { + public void checkRemoteClusterLicenses(final List clusterAliases, final ActionListener listener) { + final Iterator clusterAliasesIterator = clusterAliases.iterator(); + if (clusterAliasesIterator.hasNext() == false) { listener.onResponse(LicenseCheck.success()); return; } - final AtomicReference clusterName = new AtomicReference<>(); + final AtomicReference clusterAlias = new AtomicReference<>(); final ActionListener infoListener = new ActionListener() { @@ -159,14 +159,14 @@ public void checkRemoteClusterLicenses(final List clusterNames, final Ac public void onResponse(final XPackInfoResponse xPackInfoResponse) { final XPackInfoResponse.LicenseInfo licenseInfo = xPackInfoResponse.getLicenseInfo(); if (licenseInfo.getStatus() == LicenseStatus.ACTIVE == false || predicate.test(licenseInfo) == false) { - listener.onResponse(LicenseCheck.failure(new RemoteClusterLicenseInfo(clusterName.get(), licenseInfo))); + listener.onResponse(LicenseCheck.failure(new RemoteClusterLicenseInfo(clusterAlias.get(), licenseInfo))); return; } - if (clusterNamesIterator.hasNext()) { - clusterName.set(clusterNamesIterator.next()); + if (clusterAliasesIterator.hasNext()) { + clusterAlias.set(clusterAliasesIterator.next()); // recurse to the next cluster - remoteClusterLicense(clusterName.get(), this); + remoteClusterLicense(clusterAlias.get(), this); } else { listener.onResponse(LicenseCheck.success()); } @@ -174,19 +174,19 @@ public void onResponse(final XPackInfoResponse xPackInfoResponse) { @Override public void onFailure(final Exception e) { - final String message = "could not determine the licence type for cluster [" + clusterName.get() + "]"; + final String message = "could not determine the licence type for cluster [" + clusterAlias.get() + "]"; listener.onFailure(new ElasticsearchException(message, e)); } }; // check the license on the first cluster, and then we recursively check licenses on the remaining clusters - clusterName.set(clusterNamesIterator.next()); - remoteClusterLicense(clusterName.get(), infoListener); + clusterAlias.set(clusterAliasesIterator.next()); + remoteClusterLicense(clusterAlias.get(), infoListener); } - private void remoteClusterLicense(final String clusterName, final ActionListener listener) { - final Client remoteClusterClient = client.getRemoteClusterClient(clusterName); + private void remoteClusterLicense(final String clusterAlias, final ActionListener listener) { + final Client remoteClusterClient = client.getRemoteClusterClient(clusterAlias); final ThreadContext threadContext = remoteClusterClient.threadPool().getThreadContext(); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { // we stash any context here since this is an internal execution and should not leak any existing context information @@ -230,13 +230,13 @@ public static List remoteIndices(final List indices) { } /** - * Extract the list of remote cluster names from the list of index names. Remote index names are of the form - * {@code cluster_name:index_name} and the cluster_name is extracted for each index name that represents a remote index. + * Extract the list of remote cluster aliases from the list of index names. Remote index names are of the form + * {@code cluster_alias:index_name} and the cluster_alias is extracted for each index name that represents a remote index. * * @param indices the collection of index names * @return the remote cluster names */ - public static List remoteClusterNames(final List indices) { + public static List remoteClusterAliases(final List indices) { return indices.stream() .filter(RemoteClusterLicenseChecker::isRemoteIndex) .map(index -> index.substring(0, index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR))) @@ -257,7 +257,7 @@ public static String buildErrorMessage( final Predicate predicate) { final StringBuilder error = new StringBuilder(); if (remoteClusterLicenseInfo.licenseInfo().getStatus() != LicenseStatus.ACTIVE) { - error.append(String.format(Locale.ROOT, "the license on cluster [%s] is not active", remoteClusterLicenseInfo.clusterName())); + error.append(String.format(Locale.ROOT, "the license on cluster [%s] is not active", remoteClusterLicenseInfo.clusterAlias())); } else { if (predicate.test(remoteClusterLicenseInfo.licenseInfo())) { throw new IllegalStateException("license must be incompatible to build error message"); @@ -266,7 +266,7 @@ public static String buildErrorMessage( Locale.ROOT, "the license mode [%s] on cluster [%s] does not enable [%s]", License.OperationMode.resolve(remoteClusterLicenseInfo.licenseInfo().getMode()), - remoteClusterLicenseInfo.clusterName(), + remoteClusterLicenseInfo.clusterAlias(), feature); error.append(message); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index c01576b95adf3..0a45ce3c30c0a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -8,7 +8,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.protocol.xpack.XPackInfoResponse; @@ -64,24 +63,24 @@ public void testRemoteIndices() { containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1")); } - public void testNoRemoteClusterNames() { + public void testNoRemoteClusterAliases() { final List indices = Arrays.asList("local-index1", "local-index2"); - assertThat(RemoteClusterLicenseChecker.remoteClusterNames(indices), empty()); + assertThat(RemoteClusterLicenseChecker.remoteClusterAliases(indices), empty()); } - public void testOneRemoteClusterNames() { + public void testOneRemoteClusterAlias() { final List indices = Arrays.asList("local-index1", "remote-cluster1:remote-index2"); - assertThat(RemoteClusterLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1")); + assertThat(RemoteClusterLicenseChecker.remoteClusterAliases(indices), contains("remote-cluster1")); } - public void testMoreThanOneRemoteClusterName() { + public void testMoreThanOneRemoteClusterAlias() { final List indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1"); - assertThat(RemoteClusterLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + assertThat(RemoteClusterLicenseChecker.remoteClusterAliases(indices), contains("remote-cluster1", "remote-cluster2")); } - public void testDuplicateRemoteClusterNames() { + public void testDuplicateRemoteClusterAlias() { final List indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1", "remote-cluster2:index2"); - assertThat(RemoteClusterLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + assertThat(RemoteClusterLicenseChecker.remoteClusterAliases(indices), contains("remote-cluster1", "remote-cluster2")); } public void testCheckRemoteClusterLicensesGivenCompatibleLicenses() { @@ -97,7 +96,7 @@ public void testCheckRemoteClusterLicensesGivenCompatibleLicenses() { }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); - final List remoteClusterNames = Arrays.asList("valid1", "valid2", "valid3"); + final List remoteClusterAliases = Arrays.asList("valid1", "valid2", "valid3"); responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); @@ -106,7 +105,7 @@ public void testCheckRemoteClusterLicensesGivenCompatibleLicenses() { new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); final AtomicReference licenseCheck = new AtomicReference<>(); - licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, + licenseChecker.checkRemoteClusterLicenses(remoteClusterAliases, new ActionListener() { @Override @@ -128,7 +127,7 @@ public void onFailure(Exception e) { public void testCheckRemoteClusterLicensesGivenIncompatibleLicense() { final AtomicInteger index = new AtomicInteger(0); - final List remoteClusterNames = Arrays.asList("good", "cluster-with-basic-license", "good2"); + final List remoteClusterAliases = Arrays.asList("good", "cluster-with-basic-license", "good2"); final List responses = new ArrayList<>(); responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); responses.add(new XPackInfoResponse(null, createBasicLicenseResponse(), null)); @@ -147,7 +146,7 @@ public void testCheckRemoteClusterLicensesGivenIncompatibleLicense() { final AtomicReference licenseCheck = new AtomicReference<>(); licenseChecker.checkRemoteClusterLicenses( - remoteClusterNames, + remoteClusterAliases, new ActionListener() { @Override @@ -165,7 +164,7 @@ public void onFailure(final Exception e) { verify(client, times(2)).execute(same(XPackInfoAction.INSTANCE), any(), any()); assertNotNull(licenseCheck.get()); assertFalse(licenseCheck.get().isSuccess()); - assertThat(licenseCheck.get().remoteClusterLicenseInfo().clusterName(), equalTo("cluster-with-basic-license")); + assertThat(licenseCheck.get().remoteClusterLicenseInfo().clusterAlias(), equalTo("cluster-with-basic-license")); assertThat(licenseCheck.get().remoteClusterLicenseInfo().licenseInfo().getType(), equalTo("BASIC")); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 7b649ffce9478..3dee34401bae9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -146,7 +146,7 @@ public void onFailure(Exception e) { final RemoteClusterLicenseChecker remoteClusterLicenseChecker = new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); remoteClusterLicenseChecker.checkRemoteClusterLicenses( - RemoteClusterLicenseChecker.remoteClusterNames(datafeed.getIndices()), + RemoteClusterLicenseChecker.remoteClusterAliases(datafeed.getIndices()), ActionListener.wrap( response -> { if (response.isSuccess() == false) { @@ -242,7 +242,7 @@ private ElasticsearchStatusException createUnlicensedError( Locale.ROOT, "cannot start datafeed [%s] as it is configured to use indices on remote cluster [%s] that is not licensed for ml; %s", datafeedId, - licenseCheck.remoteClusterLicenseInfo().clusterName(), + licenseCheck.remoteClusterLicenseInfo().clusterAlias(), RemoteClusterLicenseChecker.buildErrorMessage( "ml", licenseCheck.remoteClusterLicenseInfo(), From 8f25930bd7681b05ccada87e6786c2b9c10ce7c6 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 19:30:54 -0400 Subject: [PATCH 16/23] Fix bug in getting remote client! --- .../license/RemoteClusterLicenseChecker.java | 11 +-- .../RemoteClusterLicenseCheckerTests.java | 70 +++++++++++++++++-- 2 files changed, 72 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index 7f24b69179707..eaef459814170 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -174,7 +174,7 @@ public void onResponse(final XPackInfoResponse xPackInfoResponse) { @Override public void onFailure(final Exception e) { - final String message = "could not determine the licence type for cluster [" + clusterAlias.get() + "]"; + final String message = "could not determine the license type for cluster [" + clusterAlias.get() + "]"; listener.onFailure(new ElasticsearchException(message, e)); } @@ -186,15 +186,18 @@ public void onFailure(final Exception e) { } private void remoteClusterLicense(final String clusterAlias, final ActionListener listener) { - final Client remoteClusterClient = client.getRemoteClusterClient(clusterAlias); - final ThreadContext threadContext = remoteClusterClient.threadPool().getThreadContext(); + final ThreadContext threadContext = client.threadPool().getThreadContext(); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { // we stash any context here since this is an internal execution and should not leak any existing context information threadContext.markAsSystemContext(); final XPackInfoRequest request = new XPackInfoRequest(); request.setCategories(EnumSet.of(XPackInfoRequest.Category.LICENSE)); - remoteClusterClient.execute(XPackInfoAction.INSTANCE, request, listener); + try { + client.getRemoteClusterClient(clusterAlias).execute(XPackInfoAction.INSTANCE, request, listener); + } catch (final Exception e) { + listener.onFailure(e); + } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index 0a45ce3c30c0a..8abfeac149289 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.license; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; @@ -22,6 +23,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -29,9 +31,12 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -84,7 +89,7 @@ public void testDuplicateRemoteClusterAlias() { } public void testCheckRemoteClusterLicensesGivenCompatibleLicenses() { - final AtomicInteger index = new AtomicInteger(0); + final AtomicInteger index = new AtomicInteger(); final List responses = new ArrayList<>(); final Client client = createMockClient(); @@ -109,12 +114,12 @@ public void testCheckRemoteClusterLicensesGivenCompatibleLicenses() { new ActionListener() { @Override - public void onResponse(RemoteClusterLicenseChecker.LicenseCheck response) { + public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck response) { licenseCheck.set(response); } @Override - public void onFailure(Exception e) { + public void onFailure(final Exception e) { fail(e.getMessage()); } @@ -126,7 +131,7 @@ public void onFailure(Exception e) { } public void testCheckRemoteClusterLicensesGivenIncompatibleLicense() { - final AtomicInteger index = new AtomicInteger(0); + final AtomicInteger index = new AtomicInteger(); final List remoteClusterAliases = Arrays.asList("good", "cluster-with-basic-license", "good2"); final List responses = new ArrayList<>(); responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); @@ -168,6 +173,50 @@ public void onFailure(final Exception e) { assertThat(licenseCheck.get().remoteClusterLicenseInfo().licenseInfo().getType(), equalTo("BASIC")); } + public void testCheckRemoteClusterLicencesGivenNonExistentCluster() { + final AtomicInteger index = new AtomicInteger(); + final List responses = new ArrayList<>(); + + final List remoteClusterAliases = Arrays.asList("valid1", "valid2", "valid3"); + final String failingClusterAlias = randomFrom(remoteClusterAliases); + final Client client = createMockClientThatThrowsOnGetRemoteClusterClient(failingClusterAlias); + doAnswer(invocationMock -> { + @SuppressWarnings("unchecked") ActionListener listener = + (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(responses.get(index.getAndIncrement())); + return null; + }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); + + + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + + final RemoteClusterLicenseChecker licenseChecker = + new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + final AtomicReference exception = new AtomicReference<>(); + + licenseChecker.checkRemoteClusterLicenses(remoteClusterAliases, + new ActionListener() { + + @Override + public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck response) { + fail(); + } + + @Override + public void onFailure(final Exception e) { + exception.set(e); + } + + }); + + assertNotNull(exception.get()); + assertThat(exception.get(), instanceOf(ElasticsearchException.class)); + assertThat(exception.get().getMessage(), equalTo("could not determine the license type for cluster [" + failingClusterAlias + "]")); + assertNotNull(exception.get().getCause()); + assertThat(exception.get().getCause(), instanceOf(IllegalArgumentException.class)); + } public void testBuildErrorMessageForActiveCompatibleLicense() { final XPackInfoResponse.LicenseInfo platinumLicence = createPlatinumLicenseResponse(); @@ -198,11 +247,22 @@ public void testBuildErrorMessageForInactiveLicense() { } private Client createMockClient() { + return createMockClient(client -> when(client.getRemoteClusterClient(anyString())).thenReturn(client)); + } + + private Client createMockClientThatThrowsOnGetRemoteClusterClient(final String clusterAlias) { + return createMockClient(client -> { + when(client.getRemoteClusterClient(clusterAlias)).thenThrow(new IllegalArgumentException()); + when(client.getRemoteClusterClient(argThat(not(clusterAlias)))).thenReturn(client); + }); + } + + private Client createMockClient(final Consumer finish) { final Client client = mock(Client.class); final ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); - when(client.getRemoteClusterClient(anyString())).thenReturn(client); + finish.accept(client); return client; } From 901acdccd22ba7c2b0ed4d0589613808130f4466 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 20:00:45 -0400 Subject: [PATCH 17/23] Fix ML error handling --- .../xpack/ml/action/TransportStartDatafeedAction.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 3dee34401bae9..35a4ac497293d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -252,13 +252,17 @@ private ElasticsearchStatusException createUnlicensedError( private ElasticsearchStatusException createUnknownLicenseError( final String datafeedId, final List remoteIndices, final Exception cause) { + final String remoteClusterQualifier = remoteIndices.size() == 1 ? "a remote cluster" : "remote clusters"; + final String licenseTypeQualifier = remoteIndices.size() == 1 ? "" : "s"; final String message = String.format( Locale.ROOT, - "cannot start datafeed [%s] as it uses indices on remote cluster [%s] but the license type could not be verified", + "cannot start datafeed [%s] as it uses indices on %s %s but the license type%s could not be verified", datafeedId, - remoteIndices); + remoteClusterQualifier, + remoteIndices, + licenseTypeQualifier); - return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, new Exception(cause.getMessage())); + return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause); } public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor { From 222dea9e0d198d05f8ceca27467b41bc32962ef1 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 20:48:58 -0400 Subject: [PATCH 18/23] Preserve context after remote license check --- .../license/RemoteClusterLicenseChecker.java | 7 +- .../RemoteClusterLicenseCheckerTests.java | 144 +++++++++++++++--- 2 files changed, 126 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index eaef459814170..d2c076fbec6cf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -8,6 +8,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.protocol.xpack.XPackInfoRequest; @@ -187,6 +188,8 @@ public void onFailure(final Exception e) { private void remoteClusterLicense(final String clusterAlias, final ActionListener listener) { final ThreadContext threadContext = client.threadPool().getThreadContext(); + final ContextPreservingActionListener contextPreservingActionListener = + new ContextPreservingActionListener<>(threadContext.newRestorableContext(false), listener); try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { // we stash any context here since this is an internal execution and should not leak any existing context information threadContext.markAsSystemContext(); @@ -194,9 +197,9 @@ private void remoteClusterLicense(final String clusterAlias, final ActionListene final XPackInfoRequest request = new XPackInfoRequest(); request.setCategories(EnumSet.of(XPackInfoRequest.Category.LICENSE)); try { - client.getRemoteClusterClient(clusterAlias).execute(XPackInfoAction.INSTANCE, request, listener); + client.getRemoteClusterClient(clusterAlias).execute(XPackInfoAction.INSTANCE, request, contextPreservingActionListener); } catch (final Exception e) { - listener.onFailure(e); + contextPreservingActionListener.onFailure(e); } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index 8abfeac149289..5677fb51ee907 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.protocol.xpack.XPackInfoResponse; import org.elasticsearch.protocol.xpack.license.LicenseStatus; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.action.XPackInfoAction; @@ -21,6 +22,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -92,7 +94,8 @@ public void testCheckRemoteClusterLicensesGivenCompatibleLicenses() { final AtomicInteger index = new AtomicInteger(); final List responses = new ArrayList<>(); - final Client client = createMockClient(); + final ThreadPool threadPool = createMockThreadPool(); + final Client client = createMockClient(threadPool); doAnswer(invocationMock -> { @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; @@ -100,7 +103,6 @@ public void testCheckRemoteClusterLicensesGivenCompatibleLicenses() { return null; }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); - final List remoteClusterAliases = Arrays.asList("valid1", "valid2", "valid3"); responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); @@ -110,8 +112,9 @@ public void testCheckRemoteClusterLicensesGivenCompatibleLicenses() { new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); final AtomicReference licenseCheck = new AtomicReference<>(); - licenseChecker.checkRemoteClusterLicenses(remoteClusterAliases, - new ActionListener() { + licenseChecker.checkRemoteClusterLicenses( + remoteClusterAliases, + doubleInvocationProtectingListener(new ActionListener() { @Override public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck response) { @@ -123,7 +126,7 @@ public void onFailure(final Exception e) { fail(e.getMessage()); } - }); + })); verify(client, times(3)).execute(same(XPackInfoAction.INSTANCE), any(), any()); assertNotNull(licenseCheck.get()); @@ -138,7 +141,8 @@ public void testCheckRemoteClusterLicensesGivenIncompatibleLicense() { responses.add(new XPackInfoResponse(null, createBasicLicenseResponse(), null)); responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); - final Client client = createMockClient(); + final ThreadPool threadPool = createMockThreadPool(); + final Client client = createMockClient(threadPool); doAnswer(invocationMock -> { @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; @@ -152,7 +156,7 @@ public void testCheckRemoteClusterLicensesGivenIncompatibleLicense() { licenseChecker.checkRemoteClusterLicenses( remoteClusterAliases, - new ActionListener() { + doubleInvocationProtectingListener(new ActionListener() { @Override public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck response) { @@ -164,7 +168,7 @@ public void onFailure(final Exception e) { fail(e.getMessage()); } - }); + })); verify(client, times(2)).execute(same(XPackInfoAction.INSTANCE), any(), any()); assertNotNull(licenseCheck.get()); @@ -179,7 +183,8 @@ public void testCheckRemoteClusterLicencesGivenNonExistentCluster() { final List remoteClusterAliases = Arrays.asList("valid1", "valid2", "valid3"); final String failingClusterAlias = randomFrom(remoteClusterAliases); - final Client client = createMockClientThatThrowsOnGetRemoteClusterClient(failingClusterAlias); + final ThreadPool threadPool = createMockThreadPool(); + final Client client = createMockClientThatThrowsOnGetRemoteClusterClient(threadPool, failingClusterAlias); doAnswer(invocationMock -> { @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; @@ -187,7 +192,6 @@ public void testCheckRemoteClusterLicencesGivenNonExistentCluster() { return null; }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); - responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); @@ -196,8 +200,9 @@ public void testCheckRemoteClusterLicencesGivenNonExistentCluster() { new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); final AtomicReference exception = new AtomicReference<>(); - licenseChecker.checkRemoteClusterLicenses(remoteClusterAliases, - new ActionListener() { + licenseChecker.checkRemoteClusterLicenses( + remoteClusterAliases, + doubleInvocationProtectingListener(new ActionListener() { @Override public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck response) { @@ -209,7 +214,7 @@ public void onFailure(final Exception e) { exception.set(e); } - }); + })); assertNotNull(exception.get()); assertThat(exception.get(), instanceOf(ElasticsearchException.class)); @@ -218,6 +223,69 @@ public void onFailure(final Exception e) { assertThat(exception.get().getCause(), instanceOf(IllegalArgumentException.class)); } + public void testListenerIsExecutedWithCallingContext() throws InterruptedException { + final AtomicInteger index = new AtomicInteger(); + final List responses = new ArrayList<>(); + + final ThreadPool threadPool = new TestThreadPool(getTestName()); + + try { + final List remoteClusterAliases = Arrays.asList("valid1", "valid2", "valid3"); + final Client client; + final boolean failure = randomBoolean(); + if (failure) { + client = createMockClientThatThrowsOnGetRemoteClusterClient(threadPool, randomFrom(remoteClusterAliases)); + } else { + client = createMockClient(threadPool); + } + doAnswer(invocationMock -> { + @SuppressWarnings("unchecked") ActionListener listener = + (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(responses.get(index.getAndIncrement())); + return null; + }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); + + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + + final RemoteClusterLicenseChecker licenseChecker = + new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + + final AtomicBoolean listenerInvoked = new AtomicBoolean(); + threadPool.getThreadContext().putHeader("key", "value"); + licenseChecker.checkRemoteClusterLicenses( + remoteClusterAliases, + doubleInvocationProtectingListener(new ActionListener() { + + @Override + public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck response) { + if (failure) { + fail(); + } + assertThat(threadPool.getThreadContext().getHeader("key"), equalTo("value")); + assertFalse(threadPool.getThreadContext().isSystemContext()); + listenerInvoked.set(true); + } + + @Override + public void onFailure(final Exception e) { + if (failure == false) { + fail(); + } + assertThat(threadPool.getThreadContext().getHeader("key"), equalTo("value")); + assertFalse(threadPool.getThreadContext().isSystemContext()); + listenerInvoked.set(true); + } + + })); + + assertTrue(listenerInvoked.get()); + } finally { + terminate(threadPool); + } + } + public void testBuildErrorMessageForActiveCompatibleLicense() { final XPackInfoResponse.LicenseInfo platinumLicence = createPlatinumLicenseResponse(); final RemoteClusterLicenseChecker.RemoteClusterLicenseInfo info = @@ -246,22 +314,52 @@ public void testBuildErrorMessageForInactiveLicense() { equalTo("the license on cluster [expired-cluster] is not active")); } - private Client createMockClient() { - return createMockClient(client -> when(client.getRemoteClusterClient(anyString())).thenReturn(client)); + private ActionListener doubleInvocationProtectingListener( + final ActionListener listener) { + final AtomicBoolean listenerInvoked = new AtomicBoolean(); + return new ActionListener() { + + @Override + public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck response) { + if (listenerInvoked.compareAndSet(false, true) == false) { + fail("listener invoked twice"); + } + listener.onResponse(response); + } + + @Override + public void onFailure(final Exception e) { + if (listenerInvoked.compareAndSet(false, true) == false) { + fail("listener invoked twice"); + } + listener.onFailure(e); + } + + }; + } + + private ThreadPool createMockThreadPool() { + final ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + return threadPool; } - private Client createMockClientThatThrowsOnGetRemoteClusterClient(final String clusterAlias) { - return createMockClient(client -> { - when(client.getRemoteClusterClient(clusterAlias)).thenThrow(new IllegalArgumentException()); - when(client.getRemoteClusterClient(argThat(not(clusterAlias)))).thenReturn(client); - }); + private Client createMockClient(final ThreadPool threadPool) { + return createMockClient(threadPool, client -> when(client.getRemoteClusterClient(anyString())).thenReturn(client)); } - private Client createMockClient(final Consumer finish) { + private Client createMockClientThatThrowsOnGetRemoteClusterClient(final ThreadPool threadPool, final String clusterAlias) { + return createMockClient( + threadPool, + client -> { + when(client.getRemoteClusterClient(clusterAlias)).thenThrow(new IllegalArgumentException()); + when(client.getRemoteClusterClient(argThat(not(clusterAlias)))).thenReturn(client); + }); + } + + private Client createMockClient(final ThreadPool threadPool, final Consumer finish) { final Client client = mock(Client.class); - final ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); - when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); finish.accept(client); return client; } From 23381ae4db4d28607ced169358ca99c313ae626d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 21:00:18 -0400 Subject: [PATCH 19/23] Clarify with () --- .../org/elasticsearch/license/RemoteClusterLicenseChecker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index d2c076fbec6cf..77d4a367c5a4e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -159,7 +159,7 @@ public void checkRemoteClusterLicenses(final List clusterAliases, final @Override public void onResponse(final XPackInfoResponse xPackInfoResponse) { final XPackInfoResponse.LicenseInfo licenseInfo = xPackInfoResponse.getLicenseInfo(); - if (licenseInfo.getStatus() == LicenseStatus.ACTIVE == false || predicate.test(licenseInfo) == false) { + if ((licenseInfo.getStatus() == LicenseStatus.ACTIVE) == false || predicate.test(licenseInfo) == false) { listener.onResponse(LicenseCheck.failure(new RemoteClusterLicenseInfo(clusterAlias.get(), licenseInfo))); return; } From 24fb7defd561d8ec59920c3ab803ec5cb027aa58 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 22:47:56 -0400 Subject: [PATCH 20/23] Convert to assertion --- .../license/RemoteClusterLicenseChecker.java | 19 ++++++++----------- .../RemoteClusterLicenseCheckerTests.java | 4 ++-- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index 77d4a367c5a4e..043224e357b91 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -265,17 +265,14 @@ public static String buildErrorMessage( if (remoteClusterLicenseInfo.licenseInfo().getStatus() != LicenseStatus.ACTIVE) { error.append(String.format(Locale.ROOT, "the license on cluster [%s] is not active", remoteClusterLicenseInfo.clusterAlias())); } else { - if (predicate.test(remoteClusterLicenseInfo.licenseInfo())) { - throw new IllegalStateException("license must be incompatible to build error message"); - } else { - final String message = String.format( - Locale.ROOT, - "the license mode [%s] on cluster [%s] does not enable [%s]", - License.OperationMode.resolve(remoteClusterLicenseInfo.licenseInfo().getMode()), - remoteClusterLicenseInfo.clusterAlias(), - feature); - error.append(message); - } + assert predicate.test(remoteClusterLicenseInfo.licenseInfo()) == false : "license must be incompatible to build error message"; + final String message = String.format( + Locale.ROOT, + "the license mode [%s] on cluster [%s] does not enable [%s]", + License.OperationMode.resolve(remoteClusterLicenseInfo.licenseInfo().getMode()), + remoteClusterLicenseInfo.clusterAlias(), + feature); + error.append(message); } return error.toString(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index 5677fb51ee907..4f8027f1a3cb3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -290,8 +290,8 @@ public void testBuildErrorMessageForActiveCompatibleLicense() { final XPackInfoResponse.LicenseInfo platinumLicence = createPlatinumLicenseResponse(); final RemoteClusterLicenseChecker.RemoteClusterLicenseInfo info = new RemoteClusterLicenseChecker.RemoteClusterLicenseInfo("platinum-cluster", platinumLicence); - final IllegalStateException e = expectThrows( - IllegalStateException.class, + final AssertionError e = expectThrows( + AssertionError.class, () -> RemoteClusterLicenseChecker.buildErrorMessage("", info, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial)); assertThat(e, hasToString(containsString("license must be incompatible to build error message"))); } From 28166728d5cf5c24fb80d5def6c6c1594717fd98 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 22:50:54 -0400 Subject: [PATCH 21/23] More test coverage --- .../RemoteClusterLicenseCheckerTests.java | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index 4f8027f1a3cb3..e9101c337db01 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -48,13 +48,21 @@ public final class RemoteClusterLicenseCheckerTests extends ESTestCase { + public void testIsNotRemoteIndex() { + assertFalse(RemoteClusterLicenseChecker.isRemoteIndex("local-index")); + } + + public void testIsRemoteIndex() { + assertTrue(RemoteClusterLicenseChecker.isRemoteIndex("remote-cluster:remote-index")); + } + public void testNoRemoteIndex() { final List indices = Arrays.asList("local-index1", "local-index2"); assertFalse(RemoteClusterLicenseChecker.containsRemoteIndex(indices)); } public void testRemoteIndex() { - final List indices = Arrays.asList("local-index1", "remote-cluster:remote-index2"); + final List indices = Arrays.asList("local-index", "remote-cluster:remote-index"); assertTrue(RemoteClusterLicenseChecker.containsRemoteIndex(indices)); } @@ -64,10 +72,10 @@ public void testNoRemoteIndices() { } public void testRemoteIndices() { - final List indices = Arrays.asList("local-index", "remote-cluster:index1", "local-index2", "remote-cluster2:index1"); + final List indices = Arrays.asList("local-index1", "remote-cluster1:index1", "local-index2", "remote-cluster2:index1"); assertThat( RemoteClusterLicenseChecker.remoteIndices(indices), - containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1")); + containsInAnyOrder("remote-cluster1:index1", "remote-cluster2:index1")); } public void testNoRemoteClusterAliases() { @@ -76,17 +84,18 @@ public void testNoRemoteClusterAliases() { } public void testOneRemoteClusterAlias() { - final List indices = Arrays.asList("local-index1", "remote-cluster1:remote-index2"); + final List indices = Arrays.asList("local-index1", "remote-cluster1:remote-index1"); assertThat(RemoteClusterLicenseChecker.remoteClusterAliases(indices), contains("remote-cluster1")); } public void testMoreThanOneRemoteClusterAlias() { - final List indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1"); + final List indices = Arrays.asList("remote-cluster1:remote-index1", "local-index1", "remote-cluster2:remote-index1"); assertThat(RemoteClusterLicenseChecker.remoteClusterAliases(indices), contains("remote-cluster1", "remote-cluster2")); } public void testDuplicateRemoteClusterAlias() { - final List indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1", "remote-cluster2:index2"); + final List indices = Arrays.asList( + "remote-cluster1:remote-index1", "local-index1", "remote-cluster2:index1", "remote-cluster2:remote-index2"); assertThat(RemoteClusterLicenseChecker.remoteClusterAliases(indices), contains("remote-cluster1", "remote-cluster2")); } From 73fe46caae792ca1367b9a479ecab568d090d2e3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 18 Aug 2018 22:56:26 -0400 Subject: [PATCH 22/23] Add test that we run under system context --- .../RemoteClusterLicenseCheckerTests.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index e9101c337db01..a8627d2154209 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -232,6 +232,32 @@ public void onFailure(final Exception e) { assertThat(exception.get().getCause(), instanceOf(IllegalArgumentException.class)); } + public void testRemoteClusterLicenseCallUsesSystemContext() throws InterruptedException { + final ThreadPool threadPool = new TestThreadPool(getTestName()); + + try { + final Client client = createMockClient(threadPool); + doAnswer(invocationMock -> { + assertTrue(threadPool.getThreadContext().isSystemContext()); + @SuppressWarnings("unchecked") ActionListener listener = + (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + return null; + }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); + + final RemoteClusterLicenseChecker licenseChecker = + new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + + final List remoteClusterAliases = Collections.singletonList("valid"); + licenseChecker.checkRemoteClusterLicenses( + remoteClusterAliases, doubleInvocationProtectingListener(ActionListener.wrap(() -> {}))); + + verify(client, times(1)).execute(same(XPackInfoAction.INSTANCE), any(), any()); + } finally { + terminate(threadPool); + } + } + public void testListenerIsExecutedWithCallingContext() throws InterruptedException { final AtomicInteger index = new AtomicInteger(); final List responses = new ArrayList<>(); From 5c097f318b3025521d297abbd7e2f57fe60cf132 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Aug 2018 10:05:34 -0400 Subject: [PATCH 23/23] Fix error message --- .../xpack/ml/action/TransportStartDatafeedAction.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 35a4ac497293d..d6ebdd0449e98 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -252,8 +252,10 @@ private ElasticsearchStatusException createUnlicensedError( private ElasticsearchStatusException createUnknownLicenseError( final String datafeedId, final List remoteIndices, final Exception cause) { - final String remoteClusterQualifier = remoteIndices.size() == 1 ? "a remote cluster" : "remote clusters"; - final String licenseTypeQualifier = remoteIndices.size() == 1 ? "" : "s"; + final int numberOfRemoteClusters = RemoteClusterLicenseChecker.remoteClusterAliases(remoteIndices).size(); + assert numberOfRemoteClusters > 0; + final String remoteClusterQualifier = numberOfRemoteClusters == 1 ? "a remote cluster" : "remote clusters"; + final String licenseTypeQualifier = numberOfRemoteClusters == 1 ? "" : "s"; final String message = String.format( Locale.ROOT, "cannot start datafeed [%s] as it uses indices on %s %s but the license type%s could not be verified",