-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Generalize remote license checker #32971
Merged
jasontedor
merged 24 commits into
elastic:master
from
jasontedor:generalize-remote-license-check
Aug 20, 2018
Merged
Changes from all commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
a9c5d43
Generalize remote license checker
jasontedor 36f3c6a
Remove unnecessary test
jasontedor 71e1c3e
Fix imports
jasontedor a0ba2f3
Make class final
jasontedor fcbf9b2
Make class final
jasontedor 9eff09b
A little more cleanup
jasontedor 0c2a6e8
One more comment
jasontedor ae84438
Move comment
jasontedor c28c950
Fix forbidden API violations
jasontedor 87ad218
Make class final
jasontedor 3744702
Add missing newline
jasontedor 844830d
Merge branch 'master' into generalize-remote-license-check
jasontedor 2f8d5ad
Simplify error message
jasontedor 0ed0b7e
Remove import
jasontedor e16617a
Fix line length violation
jasontedor 952d501
Fix terminology
jasontedor 8f25930
Fix bug in getting remote client!
jasontedor 901acdc
Fix ML error handling
jasontedor 222dea9
Preserve context after remote license check
jasontedor 23381ae
Clarify with ()
jasontedor 24fb7de
Convert to assertion
jasontedor 2816672
More test coverage
jasontedor 73fe46c
Add test that we run under system context
jasontedor 5c097f3
Fix error message
jasontedor File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
281 changes: 281 additions & 0 deletions
281
x-pack/plugin/core/src/main/java/org/elasticsearch/license/RemoteClusterLicenseChecker.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,281 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.license; | ||
|
||
import org.elasticsearch.ElasticsearchException; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.support.ContextPreservingActionListener; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.common.util.concurrent.ThreadContext; | ||
import org.elasticsearch.protocol.xpack.XPackInfoRequest; | ||
import org.elasticsearch.protocol.xpack.XPackInfoResponse; | ||
import org.elasticsearch.protocol.xpack.license.LicenseStatus; | ||
import org.elasticsearch.transport.RemoteClusterAware; | ||
import org.elasticsearch.xpack.core.action.XPackInfoAction; | ||
|
||
import java.util.EnumSet; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Checks remote clusters for license compatibility with a specified license predicate. | ||
*/ | ||
public final class RemoteClusterLicenseChecker { | ||
|
||
/** | ||
* Encapsulates the license info of a remote cluster. | ||
*/ | ||
public static final class RemoteClusterLicenseInfo { | ||
|
||
private final String clusterAlias; | ||
|
||
/** | ||
* The alias of the remote cluster. | ||
* | ||
* @return the cluster alias | ||
*/ | ||
public String clusterAlias() { | ||
return clusterAlias; | ||
} | ||
|
||
private final XPackInfoResponse.LicenseInfo licenseInfo; | ||
|
||
/** | ||
* The license info of the remote cluster. | ||
* | ||
* @return the license info | ||
*/ | ||
public XPackInfoResponse.LicenseInfo licenseInfo() { | ||
return licenseInfo; | ||
} | ||
|
||
RemoteClusterLicenseInfo(final String clusterAlias, final XPackInfoResponse.LicenseInfo licenseInfo) { | ||
this.clusterAlias = clusterAlias; | ||
this.licenseInfo = licenseInfo; | ||
} | ||
|
||
} | ||
|
||
/** | ||
* Encapsulates a remote cluster license check. The check is either successful if the license of the remote cluster is compatible with | ||
* the predicate used to check license compatibility, or the check is a failure. | ||
*/ | ||
public static final class LicenseCheck { | ||
|
||
private final RemoteClusterLicenseInfo remoteClusterLicenseInfo; | ||
|
||
/** | ||
* The remote cluster license info. This method should only be invoked if this instance represents a failing license check. | ||
* | ||
* @return the remote cluster license info | ||
*/ | ||
public RemoteClusterLicenseInfo remoteClusterLicenseInfo() { | ||
assert isSuccess() == false; | ||
return remoteClusterLicenseInfo; | ||
} | ||
|
||
private static final LicenseCheck SUCCESS = new LicenseCheck(null); | ||
|
||
/** | ||
* A successful license check. | ||
* | ||
* @return a successful license check instance | ||
*/ | ||
public static LicenseCheck success() { | ||
return SUCCESS; | ||
} | ||
|
||
/** | ||
* Test if this instance represents a successful license check. | ||
* | ||
* @return true if this instance represents a successful license check, otherwise false | ||
*/ | ||
public boolean isSuccess() { | ||
return this == SUCCESS; | ||
} | ||
|
||
/** | ||
* Creates a failing license check encapsulating the specified remote cluster license info. | ||
* | ||
* @param remoteClusterLicenseInfo the remote cluster license info | ||
* @return a failing license check | ||
*/ | ||
public static LicenseCheck failure(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) { | ||
return new LicenseCheck(remoteClusterLicenseInfo); | ||
} | ||
|
||
private LicenseCheck(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) { | ||
this.remoteClusterLicenseInfo = remoteClusterLicenseInfo; | ||
} | ||
|
||
} | ||
|
||
private final Client client; | ||
private final Predicate<XPackInfoResponse.LicenseInfo> predicate; | ||
|
||
/** | ||
* Constructs a remote cluster license checker with the specified license predicate for checking license compatibility. The predicate | ||
* does not need to check for the active license state as this is handled by the remote cluster license checker. | ||
* | ||
* @param client the client | ||
* @param predicate the license predicate | ||
*/ | ||
public RemoteClusterLicenseChecker(final Client client, final Predicate<XPackInfoResponse.LicenseInfo> predicate) { | ||
this.client = client; | ||
this.predicate = predicate; | ||
} | ||
|
||
public static boolean isLicensePlatinumOrTrial(final XPackInfoResponse.LicenseInfo licenseInfo) { | ||
final License.OperationMode mode = License.OperationMode.resolve(licenseInfo.getMode()); | ||
return mode == License.OperationMode.PLATINUM || mode == License.OperationMode.TRIAL; | ||
} | ||
|
||
/** | ||
* Checks the specified clusters for license compatibility. The specified callback will be invoked once if all clusters are | ||
* license-compatible, otherwise the specified callback will be invoked once on the first cluster that is not license-compatible. | ||
* | ||
* @param clusterAliases the cluster aliases to check | ||
* @param listener a callback | ||
*/ | ||
public void checkRemoteClusterLicenses(final List<String> clusterAliases, final ActionListener<LicenseCheck> listener) { | ||
final Iterator<String> clusterAliasesIterator = clusterAliases.iterator(); | ||
if (clusterAliasesIterator.hasNext() == false) { | ||
listener.onResponse(LicenseCheck.success()); | ||
return; | ||
} | ||
|
||
final AtomicReference<String> clusterAlias = new AtomicReference<>(); | ||
|
||
final ActionListener<XPackInfoResponse> infoListener = new ActionListener<XPackInfoResponse>() { | ||
|
||
@Override | ||
public void onResponse(final XPackInfoResponse xPackInfoResponse) { | ||
final XPackInfoResponse.LicenseInfo licenseInfo = xPackInfoResponse.getLicenseInfo(); | ||
if ((licenseInfo.getStatus() == LicenseStatus.ACTIVE) == false || predicate.test(licenseInfo) == false) { | ||
listener.onResponse(LicenseCheck.failure(new RemoteClusterLicenseInfo(clusterAlias.get(), licenseInfo))); | ||
return; | ||
} | ||
|
||
if (clusterAliasesIterator.hasNext()) { | ||
clusterAlias.set(clusterAliasesIterator.next()); | ||
// recurse to the next cluster | ||
remoteClusterLicense(clusterAlias.get(), this); | ||
} else { | ||
listener.onResponse(LicenseCheck.success()); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(final Exception e) { | ||
final String message = "could not determine the license type for cluster [" + clusterAlias.get() + "]"; | ||
listener.onFailure(new ElasticsearchException(message, e)); | ||
} | ||
|
||
}; | ||
|
||
// check the license on the first cluster, and then we recursively check licenses on the remaining clusters | ||
clusterAlias.set(clusterAliasesIterator.next()); | ||
remoteClusterLicense(clusterAlias.get(), infoListener); | ||
} | ||
|
||
private void remoteClusterLicense(final String clusterAlias, final ActionListener<XPackInfoResponse> listener) { | ||
final ThreadContext threadContext = client.threadPool().getThreadContext(); | ||
final ContextPreservingActionListener<XPackInfoResponse> contextPreservingActionListener = | ||
new ContextPreservingActionListener<>(threadContext.newRestorableContext(false), listener); | ||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { | ||
// we stash any context here since this is an internal execution and should not leak any existing context information | ||
threadContext.markAsSystemContext(); | ||
|
||
final XPackInfoRequest request = new XPackInfoRequest(); | ||
request.setCategories(EnumSet.of(XPackInfoRequest.Category.LICENSE)); | ||
try { | ||
client.getRemoteClusterClient(clusterAlias).execute(XPackInfoAction.INSTANCE, request, contextPreservingActionListener); | ||
} catch (final Exception e) { | ||
contextPreservingActionListener.onFailure(e); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Predicate to test if the index name represents the name of a remote index. | ||
* | ||
* @param index the index name | ||
* @return true if the collection of indices contains a remote index, otherwise false | ||
*/ | ||
public static boolean isRemoteIndex(final String index) { | ||
return index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) != -1; | ||
} | ||
|
||
/** | ||
* Predicate to test if the collection of index names contains any that represent the name of a remote index. | ||
* | ||
* @param indices the collection of index names | ||
* @return true if the collection of index names contains a name that represents a remote index, otherwise false | ||
*/ | ||
public static boolean containsRemoteIndex(final List<String> indices) { | ||
return indices.stream().anyMatch(RemoteClusterLicenseChecker::isRemoteIndex); | ||
} | ||
|
||
/** | ||
* Filters the collection of index names for names that represent a remote index. Remote index names are of the form | ||
* {@code cluster_name:index_name}. | ||
* | ||
* @param indices the collection of index names | ||
* @return list of index names that represent remote index names | ||
*/ | ||
public static List<String> remoteIndices(final List<String> indices) { | ||
return indices.stream().filter(RemoteClusterLicenseChecker::isRemoteIndex).collect(Collectors.toList()); | ||
} | ||
|
||
/** | ||
* Extract the list of remote cluster aliases from the list of index names. Remote index names are of the form | ||
* {@code cluster_alias:index_name} and the cluster_alias is extracted for each index name that represents a remote index. | ||
* | ||
* @param indices the collection of index names | ||
* @return the remote cluster names | ||
*/ | ||
public static List<String> remoteClusterAliases(final List<String> indices) { | ||
return indices.stream() | ||
.filter(RemoteClusterLicenseChecker::isRemoteIndex) | ||
.map(index -> index.substring(0, index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR))) | ||
.distinct() | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
/** | ||
* Constructs an error message for license incompatibility. | ||
* | ||
* @param feature the name of the feature that initiated the remote cluster license check. | ||
* @param remoteClusterLicenseInfo the remote cluster license info of the cluster that failed the license check | ||
* @return an error message representing license incompatibility | ||
*/ | ||
public static String buildErrorMessage( | ||
final String feature, | ||
final RemoteClusterLicenseInfo remoteClusterLicenseInfo, | ||
final Predicate<XPackInfoResponse.LicenseInfo> predicate) { | ||
final StringBuilder error = new StringBuilder(); | ||
if (remoteClusterLicenseInfo.licenseInfo().getStatus() != LicenseStatus.ACTIVE) { | ||
error.append(String.format(Locale.ROOT, "the license on cluster [%s] is not active", remoteClusterLicenseInfo.clusterAlias())); | ||
} else { | ||
assert predicate.test(remoteClusterLicenseInfo.licenseInfo()) == false : "license must be incompatible to build error message"; | ||
final String message = String.format( | ||
Locale.ROOT, | ||
"the license mode [%s] on cluster [%s] does not enable [%s]", | ||
License.OperationMode.resolve(remoteClusterLicenseInfo.licenseInfo().getMode()), | ||
remoteClusterLicenseInfo.clusterAlias(), | ||
feature); | ||
error.append(message); | ||
} | ||
|
||
return error.toString(); | ||
} | ||
|
||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixes a bug in the existing implementation. This bug does not impact the existing usage of checking the license of a remote cluster. This is because the existing context is not needed when starting a data feed in ML. However, in CCR after we check the remote license we will execute another action (the follow index action). Prior to this fix, that action would execute under the system context. The system context runs with limited privileges. This means that invoking that additional action would not have privileges to execute. Instead, this action should be executed with the original context to preserve the user that initiated the action. This is why we must preserve the context here. There is a test for this behavior in the unit tests for this class.