From b08d02e3b73851b8d4d8446b13c521da6e06b4f0 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 20 Aug 2018 23:33:18 -0400 Subject: [PATCH] Implement CCR licensing (#33002) This commit implements licensing for CCR. CCR will require a platinum license, and administrative endpoints will be disabled when a license is non-compliant. --- x-pack/plugin/ccr/build.gradle | 6 +- .../build.gradle | 39 +++++ .../xpack/ccr/CcrMultiClusterLicenseIT.java | 56 +++++++ .../plugin/ccr/qa/multi-cluster/build.gradle | 2 + .../java/org/elasticsearch/xpack/ccr/Ccr.java | 33 ++++ .../xpack/ccr/CcrLicenseChecker.java | 141 ++++++++++++++++++ .../action/CreateAndFollowIndexAction.java | 82 ++++++---- .../xpack/ccr/action/FollowIndexAction.java | 96 +++++++----- .../ccr/action/TransportCcrStatsAction.java | 22 ++- .../elasticsearch/xpack/ccr/CcrLicenseIT.java | 112 ++++++++++++++ .../org/elasticsearch/xpack/ccr/CcrTests.java | 6 +- .../ccr/IncompatibleLicenseLocalStateCcr.java | 30 ++++ .../xpack/ccr/LocalStateCcr.java | 9 +- .../license/RemoteClusterLicenseChecker.java | 7 +- .../license/XPackLicenseState.java | 38 ++++- .../RemoteClusterLicenseCheckerTests.java | 10 +- .../action/TransportStartDatafeedAction.java | 2 +- 17 files changed, 603 insertions(+), 88 deletions(-) create mode 100644 x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/build.gradle create mode 100644 x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IncompatibleLicenseLocalStateCcr.java diff --git a/x-pack/plugin/ccr/build.gradle b/x-pack/plugin/ccr/build.gradle index a430745db06f5..8be8a4ba4b93d 100644 --- a/x-pack/plugin/ccr/build.gradle +++ b/x-pack/plugin/ccr/build.gradle @@ -33,7 +33,11 @@ task internalClusterTest(type: RandomizedTestingTask, } internalClusterTest.mustRunAfter test -check.dependsOn(internalClusterTest, 'qa:multi-cluster:followClusterTest', 'qa:multi-cluster-with-security:followClusterTest') +check.dependsOn( + internalClusterTest, + 'qa:multi-cluster:followClusterTest', + 'qa:multi-cluster-with-incompatible-license:followClusterTest', + 'qa:multi-cluster-with-security:followClusterTest') dependencies { compileOnly "org.elasticsearch:elasticsearch:${version}" diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/build.gradle new file mode 100644 index 0000000000000..e9e57762c37e2 --- /dev/null +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/build.gradle @@ -0,0 +1,39 @@ +import org.elasticsearch.gradle.test.RestIntegTestTask + +apply plugin: 'elasticsearch.standalone-test' + +dependencies { + testCompile project(path: xpackModule('core'), configuration: 'shadow') + testCompile project(path: xpackModule('ccr'), configuration: 'runtime') +} + +task leaderClusterTest(type: RestIntegTestTask) { + mustRunAfter(precommit) +} + +leaderClusterTestCluster { + numNodes = 1 + clusterName = 'leader-cluster' +} + +leaderClusterTestRunner { + systemProperty 'tests.is_leader_cluster', 'true' +} + +task followClusterTest(type: RestIntegTestTask) {} + +followClusterTestCluster { + dependsOn leaderClusterTestRunner + numNodes = 1 + clusterName = 'follow-cluster' + setting 'xpack.license.self_generated.type', 'trial' + setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" +} + +followClusterTestRunner { + systemProperty 'tests.is_leader_cluster', 'false' + systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}" + finalizedBy 'leaderClusterTestCluster#stop' +} + +test.enabled = false diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java new file mode 100644 index 0000000000000..06d9f91c7abb7 --- /dev/null +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-incompatible-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Booleans; +import org.elasticsearch.test.rest.ESRestTestCase; + +import java.util.Locale; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasToString; + +public class CcrMultiClusterLicenseIT extends ESRestTestCase { + + private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster")); + + @Override + protected boolean preserveClusterUponCompletion() { + return true; + } + + public void testFollowIndex() { + if (runningAgainstLeaderCluster == false) { + final Request request = new Request("POST", "/follower/_ccr/follow"); + request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}"); + assertLicenseIncompatible(request); + } + } + + public void testCreateAndFollowIndex() { + if (runningAgainstLeaderCluster == false) { + final Request request = new Request("POST", "/follower/_ccr/create_and_follow"); + request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}"); + assertLicenseIncompatible(request); + } + } + + private static void assertLicenseIncompatible(final Request request) { + final ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request)); + final String expected = String.format( + Locale.ROOT, + "can not fetch remote index [%s] metadata as the remote cluster [%s] is not licensed for [ccr]; " + + "the license mode [BASIC] on cluster [%s] does not enable [ccr]", + "leader_cluster:leader", + "leader_cluster", + "leader_cluster"); + assertThat(e, hasToString(containsString(expected))); + } + +} diff --git a/x-pack/plugin/ccr/qa/multi-cluster/build.gradle b/x-pack/plugin/ccr/qa/multi-cluster/build.gradle index 2ee0f9c1b45c8..537584f7b59f1 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/build.gradle +++ b/x-pack/plugin/ccr/qa/multi-cluster/build.gradle @@ -14,6 +14,7 @@ task leaderClusterTest(type: RestIntegTestTask) { leaderClusterTestCluster { numNodes = 1 clusterName = 'leader-cluster' + setting 'xpack.license.self_generated.type', 'trial' } leaderClusterTestRunner { @@ -26,6 +27,7 @@ followClusterTestCluster { dependsOn leaderClusterTestRunner numNodes = 1 clusterName = 'follow-cluster' + setting 'xpack.license.self_generated.type', 'trial' setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\"" } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index dedf3ea0f7555..d76af9f3c5352 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -20,6 +20,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.license.XPackLicenseState; @@ -31,10 +33,12 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction; import org.elasticsearch.xpack.ccr.action.FollowIndexAction; @@ -54,8 +58,10 @@ import org.elasticsearch.xpack.core.XPackPlugin; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.function.Supplier; @@ -72,15 +78,42 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final boolean enabled; private final Settings settings; + private final CcrLicenseChecker ccrLicenseChecker; /** * Construct an instance of the CCR container with the specified settings. * * @param settings the settings */ + @SuppressWarnings("unused") // constructed reflectively by the plugin infrastructure public Ccr(final Settings settings) { + this(settings, new CcrLicenseChecker()); + } + + /** + * Construct an instance of the CCR container with the specified settings and license checker. + * + * @param settings the settings + * @param ccrLicenseChecker the CCR license checker + */ + Ccr(final Settings settings, final CcrLicenseChecker ccrLicenseChecker) { this.settings = settings; this.enabled = CCR_ENABLED_SETTING.get(settings); + this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker); + } + + @Override + public Collection createComponents( + final Client client, + final ClusterService clusterService, + final ThreadPool threadPool, + final ResourceWatcherService resourceWatcherService, + final ScriptService scriptService, + final NamedXContentRegistry xContentRegistry, + final Environment environment, + final NodeEnvironment nodeEnvironment, + final NamedWriteableRegistry namedWriteableRegistry) { + return Collections.singleton(ccrLicenseChecker); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java new file mode 100644 index 0000000000000..cefa490f4f7e2 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.license.RemoteClusterLicenseChecker; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.core.XPackPlugin; + +import java.util.Collections; +import java.util.Locale; +import java.util.Objects; +import java.util.function.BooleanSupplier; +import java.util.function.Consumer; + +/** + * Encapsulates licensing checking for CCR. + */ +public final class CcrLicenseChecker { + + private final BooleanSupplier isCcrAllowed; + + /** + * Constructs a CCR license checker with the default rule based on the license state for checking if CCR is allowed. + */ + CcrLicenseChecker() { + this(XPackPlugin.getSharedLicenseState()::isCcrAllowed); + } + + /** + * Constructs a CCR license checker with the specified boolean supplier. + * + * @param isCcrAllowed a boolean supplier that should return true if CCR is allowed and false otherwise + */ + CcrLicenseChecker(final BooleanSupplier isCcrAllowed) { + this.isCcrAllowed = Objects.requireNonNull(isCcrAllowed); + } + + /** + * Returns whether or not CCR is allowed. + * + * @return true if CCR is allowed, otherwise false + */ + public boolean isCcrAllowed() { + return isCcrAllowed.getAsBoolean(); + } + + /** + * Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for + * license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@link ActionListener#onFailure(Exception)} method + * of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the + * remote cluster. + * + * @param client the client + * @param clusterAlias the remote cluster alias + * @param leaderIndex the name of the leader index + * @param listener the listener + * @param leaderIndexMetadataConsumer the leader index metadata consumer + * @param the type of response the listener is waiting for + */ + public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( + final Client client, + final String clusterAlias, + final String leaderIndex, + final ActionListener listener, + final Consumer leaderIndexMetadataConsumer) { + // we have to check the license on the remote cluster + new RemoteClusterLicenseChecker(client, XPackLicenseState::isCcrAllowedForOperationMode).checkRemoteClusterLicenses( + Collections.singletonList(clusterAlias), + new ActionListener() { + + @Override + public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { + if (licenseCheck.isSuccess()) { + final Client remoteClient = client.getRemoteClusterClient(clusterAlias); + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex); + final ActionListener clusterStateListener = ActionListener.wrap( + r -> { + final ClusterState remoteClusterState = r.getState(); + final IndexMetaData leaderIndexMetadata = + remoteClusterState.getMetaData().index(leaderIndex); + leaderIndexMetadataConsumer.accept(leaderIndexMetadata); + }, + listener::onFailure); + // following an index in remote cluster, so use remote client to fetch leader index metadata + remoteClient.admin().cluster().state(clusterStateRequest, clusterStateListener); + } else { + listener.onFailure(incompatibleRemoteLicense(leaderIndex, licenseCheck)); + } + } + + @Override + public void onFailure(final Exception e) { + listener.onFailure(unknownRemoteLicense(leaderIndex, clusterAlias, e)); + } + + }); + } + + private static ElasticsearchStatusException incompatibleRemoteLicense( + final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { + final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias(); + final String message = String.format( + Locale.ROOT, + "can not fetch remote index [%s:%s] metadata as the remote cluster [%s] is not licensed for [ccr]; %s", + clusterAlias, + leaderIndex, + clusterAlias, + RemoteClusterLicenseChecker.buildErrorMessage( + "ccr", + licenseCheck.remoteClusterLicenseInfo(), + RemoteClusterLicenseChecker::isLicensePlatinumOrTrial)); + return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST); + } + + private static ElasticsearchStatusException unknownRemoteLicense( + final String leaderIndex, final String clusterAlias, final Exception cause) { + final String message = String.format( + Locale.ROOT, + "can not fetch remote index [%s:%s] metadata as the license state of the remote cluster [%s] could not be determined", + clusterAlias, + leaderIndex, + clusterAlias); + return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java index 1ce915fb19269..2e36bca293225 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CreateAndFollowIndexAction.java @@ -3,6 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ + package org.elasticsearch.xpack.ccr.action; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; @@ -11,7 +12,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; @@ -36,10 +36,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; @@ -185,16 +187,25 @@ public static class TransportAction extends TransportMasterNodeAction listener) throws Exception { - String[] indices = new String[]{request.getFollowRequest().getLeaderIndex()}; - Map> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false); - if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { - // Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData: - IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getFollowRequest().getLeaderIndex()); - createFollowIndex(leaderIndexMetadata, request, listener); + protected void masterOperation( + final Request request, final ClusterState state, final ActionListener listener) throws Exception { + if (ccrLicenseChecker.isCcrAllowed()) { + final String[] indices = new String[]{request.getFollowRequest().getLeaderIndex()}; + final Map> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false); + if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + createFollowerIndexAndFollowLocalIndex(request, state, listener); + } else { + assert remoteClusterIndices.size() == 1; + final Map.Entry> entry = remoteClusterIndices.entrySet().iterator().next(); + assert entry.getValue().size() == 1; + final String clusterAlias = entry.getKey(); + final String leaderIndex = entry.getValue().get(0); + createFollowerIndexAndFollowRemoteIndex(request, clusterAlias, leaderIndex, listener); + } } else { - // Following an index in remote cluster, so use remote client to fetch leader IndexMetaData: - assert remoteClusterIndices.size() == 1; - Map.Entry> entry = remoteClusterIndices.entrySet().iterator().next(); - assert entry.getValue().size() == 1; - String clusterNameAlias = entry.getKey(); - String leaderIndex = entry.getValue().get(0); - - Client remoteClient = client.getRemoteClusterClient(clusterNameAlias); - ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex); - remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(r -> { - ClusterState remoteClusterState = r.getState(); - IndexMetaData leaderIndexMetadata = remoteClusterState.getMetaData().index(leaderIndex); - createFollowIndex(leaderIndexMetadata, request, listener); - }, listener::onFailure)); + listener.onFailure(LicenseUtils.newComplianceException("ccr")); } } - private void createFollowIndex(IndexMetaData leaderIndexMetaData, Request request, ActionListener listener) { + private void createFollowerIndexAndFollowLocalIndex( + final Request request, final ClusterState state, final ActionListener listener) { + // following an index in local cluster, so use local cluster state to fetch leader index metadata + final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getFollowRequest().getLeaderIndex()); + createFollowerIndex(leaderIndexMetadata, request, listener); + } + + private void createFollowerIndexAndFollowRemoteIndex( + final Request request, + final String clusterAlias, + final String leaderIndex, + final ActionListener listener) { + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( + client, + clusterAlias, + leaderIndex, + listener, + leaderIndexMetaData -> createFollowerIndex(leaderIndexMetaData, request, listener)); + } + + private void createFollowerIndex( + final IndexMetaData leaderIndexMetaData, final Request request, final ActionListener listener) { if (leaderIndexMetaData == null) { listener.onFailure(new IllegalArgumentException("leader index [" + request.getFollowRequest().getLeaderIndex() + "] does not exist")); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java index 5f33bcd6fcd03..179c4f1c43896 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/FollowIndexAction.java @@ -10,7 +10,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -40,6 +39,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesRequestCache; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.tasks.Task; @@ -47,6 +47,7 @@ import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; @@ -293,11 +294,19 @@ public static class TransportAction extends HandledTransportAction listener) { - ClusterState localClusterState = clusterService.state(); - IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followerIndex); - - String[] indices = new String[]{request.leaderIndex}; - Map> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false); - if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { - // Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData: - IndexMetaData leaderIndexMetadata = localClusterState.getMetaData().index(request.leaderIndex); - try { - start(request, null, leaderIndexMetadata, followIndexMetadata, listener); - } catch (IOException e) { - listener.onFailure(e); - return; + protected void doExecute(final Task task, final Request request, final ActionListener listener) { + if (ccrLicenseChecker.isCcrAllowed()) { + final String[] indices = new String[]{request.leaderIndex}; + final Map> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false); + if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + followLocalIndex(request, listener); + } else { + assert remoteClusterIndices.size() == 1; + final Map.Entry> entry = remoteClusterIndices.entrySet().iterator().next(); + assert entry.getValue().size() == 1; + final String clusterAlias = entry.getKey(); + final String leaderIndex = entry.getValue().get(0); + followRemoteIndex(request, clusterAlias, leaderIndex, listener); } } else { - // Following an index in remote cluster, so use remote client to fetch leader IndexMetaData: - assert remoteClusterIndices.size() == 1; - Map.Entry> entry = remoteClusterIndices.entrySet().iterator().next(); - assert entry.getValue().size() == 1; - String clusterNameAlias = entry.getKey(); - String leaderIndex = entry.getValue().get(0); - - Client remoteClient = client.getRemoteClusterClient(clusterNameAlias); - ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex); - remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(r -> { - ClusterState remoteClusterState = r.getState(); - IndexMetaData leaderIndexMetadata = remoteClusterState.getMetaData().index(leaderIndex); - start(request, clusterNameAlias, leaderIndexMetadata, followIndexMetadata, listener); - }, listener::onFailure)); + listener.onFailure(LicenseUtils.newComplianceException("ccr")); + } + } + + private void followLocalIndex(final Request request, final ActionListener listener) { + final ClusterState state = clusterService.state(); + final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); + // following an index in local cluster, so use local cluster state to fetch leader index metadata + final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getLeaderIndex()); + try { + start(request, null, leaderIndexMetadata, followerIndexMetadata, listener); + } catch (final IOException e) { + listener.onFailure(e); } } + private void followRemoteIndex( + final Request request, + final String clusterAlias, + final String leaderIndex, + final ActionListener listener) { + final ClusterState state = clusterService.state(); + final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( + client, + clusterAlias, + leaderIndex, + listener, + leaderIndexMetadata -> { + try { + start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, listener); + } catch (final IOException e) { + listener.onFailure(e); + } + }); + } + /** * Performs validation on the provided leader and follow {@link IndexMetaData} instances and then * creates a persistent task for each leader primary shard. This persistent tasks track changes in the leader diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java index 8949739e76d93..33873201f5fb3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java @@ -17,14 +17,17 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import java.io.IOException; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.function.Consumer; @@ -34,6 +37,7 @@ public class TransportCcrStatsAction extends TransportTasksAction< CcrStatsAction.TasksResponse, CcrStatsAction.TaskResponse> { private final IndexNameExpressionResolver resolver; + private final CcrLicenseChecker ccrLicenseChecker; @Inject public TransportCcrStatsAction( @@ -41,7 +45,8 @@ public TransportCcrStatsAction( final ClusterService clusterService, final TransportService transportService, final ActionFilters actionFilters, - final IndexNameExpressionResolver resolver) { + final IndexNameExpressionResolver resolver, + final CcrLicenseChecker ccrLicenseChecker) { super( settings, CcrStatsAction.NAME, @@ -51,7 +56,20 @@ public TransportCcrStatsAction( CcrStatsAction.TasksRequest::new, CcrStatsAction.TasksResponse::new, Ccr.CCR_THREAD_POOL_NAME); - this.resolver = resolver; + this.resolver = Objects.requireNonNull(resolver); + this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker); + } + + @Override + protected void doExecute( + final Task task, + final CcrStatsAction.TasksRequest request, + final ActionListener listener) { + if (ccrLicenseChecker.isCcrAllowed()) { + super.doExecute(task, request, listener); + } else { + listener.onFailure(LicenseUtils.newComplianceException("ccr")); + } } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java new file mode 100644 index 0000000000000..87772d0c15074 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.ccr.action.CcrStatsAction; +import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction; +import org.elasticsearch.xpack.ccr.action.FollowIndexAction; +import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class CcrLicenseIT extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return Collections.singletonList(IncompatibleLicenseLocalStateCcr.class); + } + + public void testThatFollowingIndexIsUnavailableWithIncompatibleLicense() throws InterruptedException { + final FollowIndexAction.Request followRequest = getFollowRequest(); + final CountDownLatch latch = new CountDownLatch(1); + client().execute( + FollowIndexAction.INSTANCE, + followRequest, + new ActionListener() { + @Override + public void onResponse(final FollowIndexAction.Response response) { + fail(); + } + + @Override + public void onFailure(final Exception e) { + assertIncompatibleLicense(e); + latch.countDown(); + } + }); + latch.await(); + } + + public void testThatCreateAndFollowingIndexIsUnavailableWithIncompatibleLicense() throws InterruptedException { + final FollowIndexAction.Request followRequest = getFollowRequest(); + final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); + final CountDownLatch latch = new CountDownLatch(1); + client().execute( + CreateAndFollowIndexAction.INSTANCE, + createAndFollowRequest, + new ActionListener() { + @Override + public void onResponse(final CreateAndFollowIndexAction.Response response) { + fail(); + } + + @Override + public void onFailure(final Exception e) { + assertIncompatibleLicense(e); + latch.countDown(); + } + }); + latch.await(); + } + + public void testThatCcrStatsAreUnavailableWithIncompatibleLicense() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.TasksRequest(), new ActionListener() { + @Override + public void onResponse(final CcrStatsAction.TasksResponse tasksResponse) { + fail(); + } + + @Override + public void onFailure(final Exception e) { + assertIncompatibleLicense(e); + latch.countDown(); + } + }); + + latch.await(); + } + + private void assertIncompatibleLicense(final Exception e) { + assertThat(e, instanceOf(ElasticsearchSecurityException.class)); + assertThat(e.getMessage(), equalTo("current license is non-compliant for [ccr]")); + } + + private FollowIndexAction.Request getFollowRequest() { + return new FollowIndexAction.Request( + "leader", + "follower", + ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT, + ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READ_BATCHES, + ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, + ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, + ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE, + TimeValue.timeValueMillis(10), + TimeValue.timeValueMillis(10)); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrTests.java index c9a862954566a..0a9ca00590b3d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrTests.java @@ -3,6 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ + package org.elasticsearch.xpack.ccr; import org.elasticsearch.Version; @@ -40,9 +41,8 @@ public void testGetEngineFactory() throws IOException { .numberOfShards(1) .numberOfReplicas(0) .build(); - final Ccr ccr = new Ccr(Settings.EMPTY); - final Optional engineFactory = - ccr.getEngineFactory(new IndexSettings(indexMetaData, Settings.EMPTY)); + final Ccr ccr = new Ccr(Settings.EMPTY, new CcrLicenseChecker(() -> true)); + final Optional engineFactory = ccr.getEngineFactory(new IndexSettings(indexMetaData, Settings.EMPTY)); if (value != null && value) { assertTrue(engineFactory.isPresent()); assertThat(engineFactory.get(), instanceOf(FollowingEngineFactory.class)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IncompatibleLicenseLocalStateCcr.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IncompatibleLicenseLocalStateCcr.java new file mode 100644 index 0000000000000..c4b765d3c65ea --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IncompatibleLicenseLocalStateCcr.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; + +import java.nio.file.Path; + +public class IncompatibleLicenseLocalStateCcr extends LocalStateCompositeXPackPlugin { + + public IncompatibleLicenseLocalStateCcr(final Settings settings, final Path configPath) throws Exception { + super(settings, configPath); + + plugins.add(new Ccr(settings, new CcrLicenseChecker(() -> false)) { + + @Override + protected XPackLicenseState getLicenseState() { + return IncompatibleLicenseLocalStateCcr.this.getLicenseState(); + } + + }); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalStateCcr.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalStateCcr.java index b705ce53e7e1a..cfc30b8dfac47 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalStateCcr.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalStateCcr.java @@ -3,6 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ + package org.elasticsearch.xpack.ccr; import org.elasticsearch.common.settings.Settings; @@ -15,14 +16,16 @@ public class LocalStateCcr extends LocalStateCompositeXPackPlugin { public LocalStateCcr(final Settings settings, final Path configPath) throws Exception { super(settings, configPath); - LocalStateCcr thisVar = this; - plugins.add(new Ccr(settings){ + plugins.add(new Ccr(settings, new CcrLicenseChecker(() -> true)) { + @Override protected XPackLicenseState getLicenseState() { - return thisVar.getLicenseState(); + return LocalStateCcr.this.getLicenseState(); } + }); } + } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java index 043224e357b91..e7460d5a2eb38 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java @@ -119,7 +119,7 @@ private LicenseCheck(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) { } private final Client client; - private final Predicate predicate; + private final Predicate predicate; /** * Constructs a remote cluster license checker with the specified license predicate for checking license compatibility. The predicate @@ -128,7 +128,7 @@ private LicenseCheck(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) { * @param client the client * @param predicate the license predicate */ - public RemoteClusterLicenseChecker(final Client client, final Predicate predicate) { + public RemoteClusterLicenseChecker(final Client client, final Predicate predicate) { this.client = client; this.predicate = predicate; } @@ -159,7 +159,8 @@ public void checkRemoteClusterLicenses(final List clusterAliases, final @Override public void onResponse(final XPackInfoResponse xPackInfoResponse) { final XPackInfoResponse.LicenseInfo licenseInfo = xPackInfoResponse.getLicenseInfo(); - if ((licenseInfo.getStatus() == LicenseStatus.ACTIVE) == false || predicate.test(licenseInfo) == false) { + if ((licenseInfo.getStatus() == LicenseStatus.ACTIVE) == false + || predicate.test(License.OperationMode.resolve(licenseInfo.getMode())) == false) { listener.onResponse(LicenseCheck.failure(new RemoteClusterLicenseInfo(clusterAlias.get(), licenseInfo))); return; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java index 722c9d0e711aa..39ab0b29834a2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java @@ -514,13 +514,13 @@ public boolean isGraphAllowed() { * {@code false}. */ public boolean isMachineLearningAllowed() { - // status is volatile - Status localStatus = status; - OperationMode operationMode = localStatus.mode; - - boolean licensed = operationMode == OperationMode.TRIAL || operationMode == OperationMode.PLATINUM; + // one-time volatile read as status could be updated on us while performing this check + final Status currentStatus = status; + return currentStatus.active && isMachineLearningAllowedForOperationMode(currentStatus.mode); + } - return licensed && localStatus.active; + public static boolean isMachineLearningAllowedForOperationMode(final OperationMode operationMode) { + return isPlatinumOrTrialOperationMode(operationMode); } /** @@ -612,4 +612,30 @@ public boolean isSecurityEnabled() { final OperationMode mode = status.mode; return mode == OperationMode.TRIAL ? (isSecurityExplicitlyEnabled || isSecurityEnabledByTrialVersion) : isSecurityEnabled; } + + /** + * Determine if cross-cluster replication should be enabled. + *

+ * Cross-cluster replication is only disabled when the license has expired or if the mode is not: + *

    + *
  • {@link OperationMode#PLATINUM}
  • + *
  • {@link OperationMode#TRIAL}
  • + *
+ * + * @return true is the license is compatible, otherwise false + */ + public boolean isCcrAllowed() { + // one-time volatile read as status could be updated on us while performing this check + final Status currentStatus = status; + return currentStatus.active && isCcrAllowedForOperationMode(currentStatus.mode); + } + + public static boolean isCcrAllowedForOperationMode(final OperationMode operationMode) { + return isPlatinumOrTrialOperationMode(operationMode); + } + + public static boolean isPlatinumOrTrialOperationMode(final OperationMode operationMode) { + return operationMode == OperationMode.PLATINUM || operationMode == OperationMode.TRIAL; + } + } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index a8627d2154209..58ca42c7f681e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -118,7 +118,7 @@ public void testCheckRemoteClusterLicensesGivenCompatibleLicenses() { responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); final RemoteClusterLicenseChecker licenseChecker = - new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + new RemoteClusterLicenseChecker(client, XPackLicenseState::isPlatinumOrTrialOperationMode); final AtomicReference licenseCheck = new AtomicReference<>(); licenseChecker.checkRemoteClusterLicenses( @@ -160,7 +160,7 @@ public void testCheckRemoteClusterLicensesGivenIncompatibleLicense() { }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); final RemoteClusterLicenseChecker licenseChecker = - new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + new RemoteClusterLicenseChecker(client, XPackLicenseState::isPlatinumOrTrialOperationMode); final AtomicReference licenseCheck = new AtomicReference<>(); licenseChecker.checkRemoteClusterLicenses( @@ -206,7 +206,7 @@ public void testCheckRemoteClusterLicencesGivenNonExistentCluster() { responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); final RemoteClusterLicenseChecker licenseChecker = - new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + new RemoteClusterLicenseChecker(client, XPackLicenseState::isPlatinumOrTrialOperationMode); final AtomicReference exception = new AtomicReference<>(); licenseChecker.checkRemoteClusterLicenses( @@ -246,7 +246,7 @@ public void testRemoteClusterLicenseCallUsesSystemContext() throws InterruptedEx }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); final RemoteClusterLicenseChecker licenseChecker = - new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + new RemoteClusterLicenseChecker(client, XPackLicenseState::isPlatinumOrTrialOperationMode); final List remoteClusterAliases = Collections.singletonList("valid"); licenseChecker.checkRemoteClusterLicenses( @@ -285,7 +285,7 @@ public void testListenerIsExecutedWithCallingContext() throws InterruptedExcepti responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); final RemoteClusterLicenseChecker licenseChecker = - new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + new RemoteClusterLicenseChecker(client, XPackLicenseState::isPlatinumOrTrialOperationMode); final AtomicBoolean listenerInvoked = new AtomicBoolean(); threadPool.getThreadContext().putHeader("key", "value"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index d6ebdd0449e98..f3f4a771443ef 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -144,7 +144,7 @@ public void onFailure(Exception e) { if (RemoteClusterLicenseChecker.containsRemoteIndex(datafeed.getIndices())) { final RemoteClusterLicenseChecker remoteClusterLicenseChecker = - new RemoteClusterLicenseChecker(client, RemoteClusterLicenseChecker::isLicensePlatinumOrTrial); + new RemoteClusterLicenseChecker(client, XPackLicenseState::isMachineLearningAllowedForOperationMode); remoteClusterLicenseChecker.checkRemoteClusterLicenses( RemoteClusterLicenseChecker.remoteClusterAliases(datafeed.getIndices()), ActionListener.wrap(