Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Auto follow Coordinator fetch cluster state in system context #35120

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ public boolean isCcrAllowed() {
* @param leaderIndex the name of the leader index
* @param onFailure the failure consumer
* @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards
* @param <T> the type of response the listener is waiting for
*/
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
final Client client,
final String clusterAlias,
final String leaderIndex,
Expand All @@ -118,8 +117,8 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
request.indices(leaderIndex);
checkRemoteClusterLicenseAndFetchClusterState(
client,
Collections.emptyMap(),
clusterAlias,
client.getRemoteClusterClient(clusterAlias),
request,
onFailure,
leaderClusterState -> {
Expand Down Expand Up @@ -151,22 +150,20 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param headers the headers to use for leader client
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
*/
public void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final Map<String, String> headers,
final String clusterAlias,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer) {
checkRemoteClusterLicenseAndFetchClusterState(
client,
headers,
clusterAlias,
systemClient(client.getRemoteClusterClient(clusterAlias)),
request,
onFailure,
leaderClusterStateConsumer,
Expand All @@ -182,18 +179,17 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param headers the headers to use for leader client
* @param leaderClient the leader client to use to execute cluster state API
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
* @param nonCompliantLicense the supplier for when the license state of the remote cluster is non-compliant
* @param unknownLicense the supplier for when the license state of the remote cluster is unknown due to failure
* @param <T> the type of response the listener is waiting for
*/
private <T> void checkRemoteClusterLicenseAndFetchClusterState(
private void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final Map<String, String> headers,
final String clusterAlias,
final Client leaderClient,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer,
Expand All @@ -207,7 +203,6 @@ private <T> void checkRemoteClusterLicenseAndFetchClusterState(
@Override
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
if (licenseCheck.isSuccess()) {
final Client leaderClient = wrapClient(client.getRemoteClusterClient(clusterAlias), headers);
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
Expand Down Expand Up @@ -356,6 +351,21 @@ void doExecute(Action<Response> action, Request request, ActionListener<Response
}
}

private static Client systemClient(Client client) {
final ThreadContext threadContext = client.threadPool().getThreadContext();
return new FilterClient(client) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}
};
}

private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
threadContext.copyHeaders(headers.entrySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,14 @@ private void doAutoFollow() {
AutoFollower operation = new AutoFollower(handler, followerClusterState) {

@Override
void getLeaderClusterState(final Map<String, String> headers,
final String remoteCluster,
void getLeaderClusterState(final String remoteCluster,
final BiConsumer<ClusterState, Exception> handler) {
final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
request.metaData(true);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
client,
headers,
remoteCluster,
request,
e -> handler.accept(null, e),
Expand Down Expand Up @@ -249,7 +247,7 @@ void autoFollowIndices() {
final String remoteCluster = autoFollowPattern.getRemoteCluster();

Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName);
getLeaderClusterState(headers, remoteCluster, (leaderClusterState, e) -> {
getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
assert e == null;
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName);
Expand Down Expand Up @@ -413,13 +411,10 @@ static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(St

/**
* Fetch the cluster state from the leader with the specified cluster alias
*
* @param headers the client headers
* @param remoteCluster the name of the leader cluster
* @param handler the callback to invoke
*/
abstract void getLeaderClusterState(
Map<String, String> headers,
String remoteCluster,
BiConsumer<ClusterState, Exception> handler
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,12 @@ protected void masterOperation(
client.getRemoteClusterClient(remoteCluster);

String leaderIndex = request.getLeaderIndex();
createFollowerIndexAndFollowRemoteIndex(request, remoteCluster, leaderIndex, listener);
}

private void createFollowerIndexAndFollowRemoteIndex(
final PutFollowAction.Request request,
final String remoteCluster,
final String leaderIndex,
final ActionListener<PutFollowAction.Response> listener) {
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client,
remoteCluster,
leaderIndex,
listener::onFailure,
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
client,
remoteCluster,
leaderIndex,
listener::onFailure,
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
}

private void createFollowerIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,9 @@ public void testAutoFollower() {
};
AutoFollower autoFollower = new AutoFollower(handler, currentState) {
@Override
void getLeaderClusterState(Map<String, String> headers,
String remoteCluster,
void getLeaderClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) {
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
assertThat(remoteCluster, equalTo("remote"));
handler.accept(leaderState, null);
}

Expand Down Expand Up @@ -143,8 +142,7 @@ public void testAutoFollowerClusterStateApiFailure() {
};
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
@Override
void getLeaderClusterState(Map<String, String> headers,
String remoteCluster,
void getLeaderClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) {
handler.accept(null, failure);
}
Expand Down Expand Up @@ -204,8 +202,7 @@ public void testAutoFollowerUpdateClusterStateFailure() {
};
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
@Override
void getLeaderClusterState(Map<String, String> headers,
String remoteCluster,
void getLeaderClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) {
handler.accept(leaderState, null);
}
Expand Down Expand Up @@ -267,8 +264,7 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() {
};
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
@Override
void getLeaderClusterState(Map<String, String> headers,
String remoteCluster,
void getLeaderClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) {
handler.accept(leaderState, null);
}
Expand Down