Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement CCR licensing #33002

Merged
merged 4 commits into from
Aug 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion x-pack/plugin/ccr/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ task internalClusterTest(type: RandomizedTestingTask,
}

internalClusterTest.mustRunAfter test
check.dependsOn(internalClusterTest, 'qa:multi-cluster:followClusterTest', 'qa:multi-cluster-with-security:followClusterTest')
check.dependsOn(
internalClusterTest,
'qa:multi-cluster:followClusterTest',
'qa:multi-cluster-with-incompatible-license:followClusterTest',
'qa:multi-cluster-with-security:followClusterTest')

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${version}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import org.elasticsearch.gradle.test.RestIntegTestTask

apply plugin: 'elasticsearch.standalone-test'

dependencies {
testCompile project(path: xpackModule('core'), configuration: 'shadow')
testCompile project(path: xpackModule('ccr'), configuration: 'runtime')
}

task leaderClusterTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
}

leaderClusterTestCluster {
numNodes = 1
clusterName = 'leader-cluster'
}

leaderClusterTestRunner {
systemProperty 'tests.is_leader_cluster', 'true'
}

task followClusterTest(type: RestIntegTestTask) {}

followClusterTestCluster {
dependsOn leaderClusterTestRunner
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
}

followClusterTestRunner {
systemProperty 'tests.is_leader_cluster', 'false'
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
finalizedBy 'leaderClusterTestCluster#stop'
}

test.enabled = false
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.util.Locale;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasToString;

public class CcrMultiClusterLicenseIT extends ESRestTestCase {

private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster"));

@Override
protected boolean preserveClusterUponCompletion() {
return true;
}

public void testFollowIndex() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/follow");
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
assertLicenseIncompatible(request);
}
}

public void testCreateAndFollowIndex() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/create_and_follow");
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
assertLicenseIncompatible(request);
}
}

private static void assertLicenseIncompatible(final Request request) {
final ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request));
final String expected = String.format(
Locale.ROOT,
"can not fetch remote index [%s] metadata as the remote cluster [%s] is not licensed for [ccr]; " +
"the license mode [BASIC] on cluster [%s] does not enable [ccr]",
"leader_cluster:leader",
"leader_cluster",
"leader_cluster");
assertThat(e, hasToString(containsString(expected)));
}

}
2 changes: 2 additions & 0 deletions x-pack/plugin/ccr/qa/multi-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ task leaderClusterTest(type: RestIntegTestTask) {
leaderClusterTestCluster {
numNodes = 1
clusterName = 'leader-cluster'
setting 'xpack.license.self_generated.type', 'trial'
}

leaderClusterTestRunner {
Expand All @@ -26,6 +27,7 @@ followClusterTestCluster {
dependsOn leaderClusterTestRunner
numNodes = 1
clusterName = 'follow-cluster'
setting 'xpack.license.self_generated.type', 'trial'
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.license.XPackLicenseState;
Expand All @@ -31,10 +33,12 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
Expand All @@ -54,8 +58,10 @@
import org.elasticsearch.xpack.core.XPackPlugin;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

Expand All @@ -72,15 +78,42 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E

private final boolean enabled;
private final Settings settings;
private final CcrLicenseChecker ccrLicenseChecker;

/**
* Construct an instance of the CCR container with the specified settings.
*
* @param settings the settings
*/
@SuppressWarnings("unused") // constructed reflectively by the plugin infrastructure
public Ccr(final Settings settings) {
this(settings, new CcrLicenseChecker());
}

/**
* Construct an instance of the CCR container with the specified settings and license checker.
*
* @param settings the settings
* @param ccrLicenseChecker the CCR license checker
*/
Ccr(final Settings settings, final CcrLicenseChecker ccrLicenseChecker) {
this.settings = settings;
this.enabled = CCR_ENABLED_SETTING.get(settings);
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
}

@Override
public Collection<Object> createComponents(
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry) {
return Collections.singleton(ccrLicenseChecker);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.XPackPlugin;

import java.util.Collections;
import java.util.Locale;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/**
* Encapsulates licensing checking for CCR.
*/
public final class CcrLicenseChecker {

private final BooleanSupplier isCcrAllowed;

/**
* Constructs a CCR license checker with the default rule based on the license state for checking if CCR is allowed.
*/
CcrLicenseChecker() {
this(XPackPlugin.getSharedLicenseState()::isCcrAllowed);
}

/**
* Constructs a CCR license checker with the specified boolean supplier.
*
* @param isCcrAllowed a boolean supplier that should return true if CCR is allowed and false otherwise
*/
CcrLicenseChecker(final BooleanSupplier isCcrAllowed) {
this.isCcrAllowed = Objects.requireNonNull(isCcrAllowed);
}

/**
* Returns whether or not CCR is allowed.
*
* @return true if CCR is allowed, otherwise false
*/
public boolean isCcrAllowed() {
return isCcrAllowed.getAsBoolean();
}

/**
* Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for
* license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@link ActionListener#onFailure(Exception)} method
* of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the
* remote cluster.
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param leaderIndex the name of the leader index
* @param listener the listener
* @param leaderIndexMetadataConsumer the leader index metadata consumer
* @param <T> the type of response the listener is waiting for
*/
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
final Client client,
final String clusterAlias,
final String leaderIndex,
final ActionListener<T> listener,
final Consumer<IndexMetaData> leaderIndexMetadataConsumer) {
// we have to check the license on the remote cluster
new RemoteClusterLicenseChecker(client, XPackLicenseState::isCcrAllowedForOperationMode).checkRemoteClusterLicenses(
Collections.singletonList(clusterAlias),
new ActionListener<RemoteClusterLicenseChecker.LicenseCheck>() {

@Override
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final Client remoteClient = client.getRemoteClusterClient(clusterAlias);
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex);
final ActionListener<ClusterStateResponse> clusterStateListener = ActionListener.wrap(
r -> {
final ClusterState remoteClusterState = r.getState();
final IndexMetaData leaderIndexMetadata =
remoteClusterState.getMetaData().index(leaderIndex);
leaderIndexMetadataConsumer.accept(leaderIndexMetadata);
},
listener::onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
remoteClient.admin().cluster().state(clusterStateRequest, clusterStateListener);
} else {
listener.onFailure(incompatibleRemoteLicense(leaderIndex, licenseCheck));
}
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(unknownRemoteLicense(leaderIndex, clusterAlias, e));
}

});
}

private static ElasticsearchStatusException incompatibleRemoteLicense(
final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();
final String message = String.format(
Locale.ROOT,
"can not fetch remote index [%s:%s] metadata as the remote cluster [%s] is not licensed for [ccr]; %s",
clusterAlias,
leaderIndex,
clusterAlias,
RemoteClusterLicenseChecker.buildErrorMessage(
"ccr",
licenseCheck.remoteClusterLicenseInfo(),
RemoteClusterLicenseChecker::isLicensePlatinumOrTrial));
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
}

private static ElasticsearchStatusException unknownRemoteLicense(
final String leaderIndex, final String clusterAlias, final Exception cause) {
final String message = String.format(
Locale.ROOT,
"can not fetch remote index [%s:%s] metadata as the license state of the remote cluster [%s] could not be determined",
clusterAlias,
leaderIndex,
clusterAlias);
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause);
}

}
Loading