Skip to content

Commit

Permalink
[CCR] Auto follow Coordinator fetch cluster state in system context (#…
Browse files Browse the repository at this point in the history
…35120)

Auto follow Coordinator should fetch the leader cluster state using system context.
  • Loading branch information
martijnvg committed Nov 8, 2018
1 parent c3478a5 commit 1e1fd96
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,8 @@ public boolean isCcrAllowed() {
* @param leaderIndex the name of the leader index
* @param onFailure the failure consumer
* @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards
* @param <T> the type of response the listener is waiting for
*/
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
final Client client,
final String clusterAlias,
final String leaderIndex,
Expand All @@ -119,8 +118,8 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
request.indices(leaderIndex);
checkRemoteClusterLicenseAndFetchClusterState(
client,
Collections.emptyMap(),
clusterAlias,
client.getRemoteClusterClient(clusterAlias),
request,
onFailure,
leaderClusterState -> {
Expand Down Expand Up @@ -152,22 +151,20 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param headers the headers to use for leader client
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
*/
public void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final Map<String, String> headers,
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) {
checkRemoteClusterLicenseAndFetchClusterState(
client,
headers,
clusterAlias,
systemClient(client.getRemoteClusterClient(clusterAlias)),
request,
onFailure,
leaderClusterStateConsumer,
Expand All @@ -183,18 +180,17 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param headers the headers to use for leader client
* @param leaderClient the leader client to use to execute cluster state API
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
* @param nonCompliantLicense the supplier for when the license state of the remote cluster is non-compliant
* @param unknownLicense the supplier for when the license state of the remote cluster is unknown due to failure
* @param <T> the type of response the listener is waiting for
*/
private <T> void checkRemoteClusterLicenseAndFetchClusterState(
private void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final Map<String, String> headers,
final String clusterAlias,
final Client leaderClient,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer,
Expand All @@ -208,7 +204,6 @@ private <T> void checkRemoteClusterLicenseAndFetchClusterState(
@Override
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final Client leaderClient = wrapClient(client.getRemoteClusterClient(clusterAlias), headers);
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
Expand Down Expand Up @@ -363,6 +358,22 @@ void doExecute(Action<Request, Response, RequestBuilder> action, Request request
}
}

private static Client systemClient(Client client) {
final ThreadContext threadContext = client.threadPool().getThreadContext();
return new FilterClient(client) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
};
}

private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
threadContext.copyHeaders(headers.entrySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,14 @@ private void doAutoFollow() {
AutoFollower operation = new AutoFollower(handler, followerClusterState) {

@Override
void getLeaderClusterState(final Map<String, String> headers,
final String remoteCluster,
void getLeaderClusterState(final String remoteCluster,
final BiConsumer<ClusterState, Exception> handler) {
final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.metaData(true);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
client,
headers,
remoteCluster,
request,
e -> handler.accept(null, e),
Expand Down Expand Up @@ -249,7 +247,7 @@ void autoFollowIndices() {
final String remoteCluster = autoFollowPattern.getRemoteCluster();

Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName);
getLeaderClusterState(headers, remoteCluster, (leaderClusterState, e) -> {
getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
assert e == null;
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName);
Expand Down Expand Up @@ -413,13 +411,10 @@ static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(St

/**
* Fetch the cluster state from the leader with the specified cluster alias
*
* @param headers the client headers
* @param remoteCluster the name of the leader cluster
* @param handler the callback to invoke
*/
abstract void getLeaderClusterState(
Map<String, String> headers,
String remoteCluster,
BiConsumer<ClusterState, Exception> handler
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,12 @@ protected void masterOperation(
client.getRemoteClusterClient(remoteCluster);

String leaderIndex = request.getLeaderIndex();
createFollowerIndexAndFollowRemoteIndex(request, remoteCluster, leaderIndex, listener);
}

private void createFollowerIndexAndFollowRemoteIndex(
final PutFollowAction.Request request,
final String remoteCluster,
final String leaderIndex,
final ActionListener<PutFollowAction.Response> listener) {
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client,
remoteCluster,
leaderIndex,
listener::onFailure,
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
client,
remoteCluster,
leaderIndex,
listener::onFailure,
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
}

private void createFollowerIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,9 @@ public void testAutoFollower() {
};
AutoFollower autoFollower = new AutoFollower(handler, currentState) {
@Override
void getLeaderClusterState(Map<String, String> headers,
String remoteCluster,
void getLeaderClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) {
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
assertThat(remoteCluster, equalTo("remote"));
handler.accept(leaderState, null);
}

Expand Down Expand Up @@ -143,8 +142,7 @@ public void testAutoFollowerClusterStateApiFailure() {
};
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
@Override
void getLeaderClusterState(Map<String, String> headers,
String remoteCluster,
void getLeaderClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) {
handler.accept(null, failure);
}
Expand Down Expand Up @@ -204,8 +202,7 @@ public void testAutoFollowerUpdateClusterStateFailure() {
};
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
@Override
void getLeaderClusterState(Map<String, String> headers,
String remoteCluster,
void getLeaderClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) {
handler.accept(leaderState, null);
}
Expand Down Expand Up @@ -267,8 +264,7 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() {
};
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
@Override
void getLeaderClusterState(Map<String, String> headers,
String remoteCluster,
void getLeaderClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) {
handler.accept(leaderState, null);
}
Expand Down

0 comments on commit 1e1fd96

Please sign in to comment.