Skip to content

Commit

Permalink
Implement CCR licensing (#33002)
Browse files Browse the repository at this point in the history
This commit implements licensing for CCR. CCR will require a platinum
license, and administrative endpoints will be disabled when a license is
non-compliant.
  • Loading branch information
jasontedor authored Aug 21, 2018
1 parent 77d7547 commit b08d02e
Show file tree
Hide file tree
Showing 17 changed files with 603 additions and 88 deletions.
6 changes: 5 additions & 1 deletion x-pack/plugin/ccr/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ task internalClusterTest(type: RandomizedTestingTask,
}

internalClusterTest.mustRunAfter test
check.dependsOn(internalClusterTest, 'qa:multi-cluster:followClusterTest', 'qa:multi-cluster-with-security:followClusterTest')
check.dependsOn(
internalClusterTest,
'qa:multi-cluster:followClusterTest',
'qa:multi-cluster-with-incompatible-license:followClusterTest',
'qa:multi-cluster-with-security:followClusterTest')

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${version}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import org.elasticsearch.gradle.test.RestIntegTestTask

apply plugin: 'elasticsearch.standalone-test'

dependencies {
testCompile project(path: xpackModule('core'), configuration: 'shadow')
testCompile project(path: xpackModule('ccr'), configuration: 'runtime')
}

task leaderClusterTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
}

leaderClusterTestCluster {
numNodes = 1
clusterName = 'leader-cluster'
}

leaderClusterTestRunner {
systemProperty 'tests.is_leader_cluster', 'true'
}

task followClusterTest(type: RestIntegTestTask) {}

followClusterTestCluster {
dependsOn leaderClusterTestRunner
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
}

followClusterTestRunner {
systemProperty 'tests.is_leader_cluster', 'false'
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
finalizedBy 'leaderClusterTestCluster#stop'
}

test.enabled = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.util.Locale;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasToString;

public class CcrMultiClusterLicenseIT extends ESRestTestCase {

private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster"));

@Override
protected boolean preserveClusterUponCompletion() {
return true;
}

public void testFollowIndex() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
assertLicenseIncompatible(request);
}
}

public void testCreateAndFollowIndex() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/create_and_follow");
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
assertLicenseIncompatible(request);
}
}

private static void assertLicenseIncompatible(final Request request) {
final ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request));
final String expected = String.format(
Locale.ROOT,
"can not fetch remote index [%s] metadata as the remote cluster [%s] is not licensed for [ccr]; " +
"the license mode [BASIC] on cluster [%s] does not enable [ccr]",
"leader_cluster:leader",
"leader_cluster",
"leader_cluster");
assertThat(e, hasToString(containsString(expected)));
}

}
2 changes: 2 additions & 0 deletions x-pack/plugin/ccr/qa/multi-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ task leaderClusterTest(type: RestIntegTestTask) {
leaderClusterTestCluster {
numNodes = 1
clusterName = 'leader-cluster'
setting 'xpack.license.self_generated.type', 'trial'
}

leaderClusterTestRunner {
Expand All @@ -26,6 +27,7 @@ followClusterTestCluster {
dependsOn leaderClusterTestRunner
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.license.XPackLicenseState;
Expand All @@ -31,10 +33,12 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
Expand All @@ -54,8 +58,10 @@
import org.elasticsearch.xpack.core.XPackPlugin;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

Expand All @@ -72,15 +78,42 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E

private final boolean enabled;
private final Settings settings;
private final CcrLicenseChecker ccrLicenseChecker;

/**
* Construct an instance of the CCR container with the specified settings.
*
* @param settings the settings
*/
@SuppressWarnings("unused") // constructed reflectively by the plugin infrastructure
public Ccr(final Settings settings) {
this(settings, new CcrLicenseChecker());
}

/**
* Construct an instance of the CCR container with the specified settings and license checker.
*
* @param settings the settings
* @param ccrLicenseChecker the CCR license checker
*/
Ccr(final Settings settings, final CcrLicenseChecker ccrLicenseChecker) {
this.settings = settings;
this.enabled = CCR_ENABLED_SETTING.get(settings);
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
}

@Override
public Collection<Object> createComponents(
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry) {
return Collections.singleton(ccrLicenseChecker);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.XPackPlugin;

import java.util.Collections;
import java.util.Locale;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/**
* Encapsulates licensing checking for CCR.
*/
public final class CcrLicenseChecker {

private final BooleanSupplier isCcrAllowed;

/**
* Constructs a CCR license checker with the default rule based on the license state for checking if CCR is allowed.
*/
CcrLicenseChecker() {
this(XPackPlugin.getSharedLicenseState()::isCcrAllowed);
}

/**
* Constructs a CCR license checker with the specified boolean supplier.
*
* @param isCcrAllowed a boolean supplier that should return true if CCR is allowed and false otherwise
*/
CcrLicenseChecker(final BooleanSupplier isCcrAllowed) {
this.isCcrAllowed = Objects.requireNonNull(isCcrAllowed);
}

/**
* Returns whether or not CCR is allowed.
*
* @return true if CCR is allowed, otherwise false
*/
public boolean isCcrAllowed() {
return isCcrAllowed.getAsBoolean();
}

/**
* Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for
* license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@link ActionListener#onFailure(Exception)} method
* of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the
* remote cluster.
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param leaderIndex the name of the leader index
* @param listener the listener
* @param leaderIndexMetadataConsumer the leader index metadata consumer
* @param <T> the type of response the listener is waiting for
*/
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
final Client client,
final String clusterAlias,
final String leaderIndex,
final ActionListener<T> listener,
final Consumer<IndexMetaData> leaderIndexMetadataConsumer) {
// we have to check the license on the remote cluster
new RemoteClusterLicenseChecker(client, XPackLicenseState::isCcrAllowedForOperationMode).checkRemoteClusterLicenses(
Collections.singletonList(clusterAlias),
new ActionListener<RemoteClusterLicenseChecker.LicenseCheck>() {

@Override
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final Client remoteClient = client.getRemoteClusterClient(clusterAlias);
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex);
final ActionListener<ClusterStateResponse> clusterStateListener = ActionListener.wrap(
r -> {
final ClusterState remoteClusterState = r.getState();
final IndexMetaData leaderIndexMetadata =
remoteClusterState.getMetaData().index(leaderIndex);
leaderIndexMetadataConsumer.accept(leaderIndexMetadata);
},
listener::onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
remoteClient.admin().cluster().state(clusterStateRequest, clusterStateListener);
} else {
listener.onFailure(incompatibleRemoteLicense(leaderIndex, licenseCheck));
}
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(unknownRemoteLicense(leaderIndex, clusterAlias, e));
}

});
}

private static ElasticsearchStatusException incompatibleRemoteLicense(
final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();
final String message = String.format(
Locale.ROOT,
"can not fetch remote index [%s:%s] metadata as the remote cluster [%s] is not licensed for [ccr]; %s",
clusterAlias,
leaderIndex,
clusterAlias,
RemoteClusterLicenseChecker.buildErrorMessage(
"ccr",
licenseCheck.remoteClusterLicenseInfo(),
RemoteClusterLicenseChecker::isLicensePlatinumOrTrial));
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
}

private static ElasticsearchStatusException unknownRemoteLicense(
final String leaderIndex, final String clusterAlias, final Exception cause) {
final String message = String.format(
Locale.ROOT,
"can not fetch remote index [%s:%s] metadata as the license state of the remote cluster [%s] could not be determined",
clusterAlias,
leaderIndex,
clusterAlias);
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause);
}

}
Loading

0 comments on commit b08d02e

Please sign in to comment.