Skip to content

Commit

Permalink
[CCR] Auto follow pattern APIs adjustments (#34518)
Browse files Browse the repository at this point in the history
* Changed the resource id of auto follow patterns to be a user defined name
instead of being the leader cluster alias name.
* Fail when an unfollowed leader index matches with two or more auto follow patterns.
  • Loading branch information
martijnvg authored and kcm committed Oct 30, 2018
1 parent 01da252 commit 6c65667
Show file tree
Hide file tree
Showing 30 changed files with 324 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public void testFollow() {
public void testAutoFollow() throws Exception {
assumeFalse("windows is the worst", Constants.WINDOWS);
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("PUT", "/_ccr/auto_follow/leader_cluster");
request.setJsonEntity("{\"leader_index_patterns\":[\"*\"]}");
final Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
request.setJsonEntity("{\"leader_index_patterns\":[\"*\"], \"leader_cluster\": \"leader_cluster\"}");
client().performRequest(request);

// parse the logs and ensure that the auto-coordinator skipped coordination on the leader cluster
Expand All @@ -64,7 +64,7 @@ public void testAutoFollow() throws Exception {
while (it.hasNext()) {
final String line = it.next();
if (line.matches(".*\\[WARN\\s*\\]\\[o\\.e\\.x\\.c\\.a\\.AutoFollowCoordinator\\s*\\] \\[node-0\\] " +
"failure occurred while fetching cluster state in leader cluster \\[leader_cluster\\]")) {
"failure occurred while fetching cluster state for auto follow pattern \\[test_pattern\\]")) {
warn = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ public void testAutoFollowPatterns() throws Exception {
String disallowedIndex = "logs-us-20190101";

{
Request request = new Request("PUT", "/_ccr/auto_follow/leader_cluster");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}");
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"leader_cluster\": \"leader_cluster\"}");
Exception e = expectThrows(ResponseException.class, () -> assertOK(client().performRequest(request)));
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [logs-*]"));
}

Request request = new Request("PUT", "/_ccr/auto_follow/leader_cluster");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-eu-*\"]}");
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-eu-*\"], \"leader_cluster\": \"leader_cluster\"}");
assertOK(client().performRequest(request));

try (RestClient leaderClient = buildLeaderClient()) {
Expand Down Expand Up @@ -185,7 +185,7 @@ public void testAutoFollowPatterns() throws Exception {
});

// Cleanup by deleting auto follow pattern and pause following:
request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster");
request = new Request("DELETE", "/_ccr/auto_follow/test_pattern");
assertOK(client().performRequest(request));
pauseFollow(allowedIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ public void testFollowNonExistingLeaderIndex() throws Exception {
public void testAutoFollowPatterns() throws Exception {
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);

Request request = new Request("PUT", "/_ccr/auto_follow/leader_cluster");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}");
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"leader_cluster\": \"leader_cluster\"}");
assertOK(client().performRequest(request));

try (RestClient leaderClient = buildLeaderClient()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,32 @@

- do:
ccr.put_auto_follow_pattern:
leader_cluster: local
name: my_pattern
body:
leader_cluster: local
leader_index_patterns: ['logs-*']
max_concurrent_read_batches: 2
- is_true: acknowledged

- do:
ccr.get_auto_follow_pattern:
leader_cluster: local
- match: { local.leader_index_patterns: ['logs-*'] }
- match: { local.max_concurrent_read_batches: 2 }
name: my_pattern
- match: { my_pattern.leader_cluster: 'local' }
- match: { my_pattern.leader_index_patterns: ['logs-*'] }
- match: { my_pattern.max_concurrent_read_batches: 2 }

- do:
ccr.get_auto_follow_pattern: {}
- match: { local.leader_index_patterns: ['logs-*'] }
- match: { local.max_concurrent_read_batches: 2 }
- match: { my_pattern.leader_cluster: 'local' }
- match: { my_pattern.leader_index_patterns: ['logs-*'] }
- match: { my_pattern.max_concurrent_read_batches: 2 }

- do:
ccr.delete_auto_follow_pattern:
leader_cluster: local
name: my_pattern
- is_true: acknowledged

- do:
catch: missing
ccr.get_auto_follow_pattern:
leader_cluster: local
name: my_pattern
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* A component that runs only on the elected master node and follows leader indices automatically
Expand Down Expand Up @@ -105,19 +106,19 @@ public synchronized AutoFollowStats getStats() {
synchronized void updateStats(List<AutoFollowResult> results) {
for (AutoFollowResult result : results) {
if (result.clusterStateFetchException != null) {
recentAutoFollowErrors.put(result.clusterAlias,
recentAutoFollowErrors.put(result.autoFollowPatternName,
new ElasticsearchException(result.clusterStateFetchException));
numberOfFailedRemoteClusterStateRequests++;
LOGGER.warn(new ParameterizedMessage("failure occurred while fetching cluster state in leader cluster [{}]",
result.clusterAlias), result.clusterStateFetchException);
LOGGER.warn(new ParameterizedMessage("failure occurred while fetching cluster state for auto follow pattern [{}]",
result.autoFollowPatternName), result.clusterStateFetchException);
} else {
for (Map.Entry<Index, Exception> entry : result.autoFollowExecutionResults.entrySet()) {
if (entry.getValue() != null) {
numberOfFailedIndicesAutoFollowed++;
recentAutoFollowErrors.put(result.clusterAlias + ":" + entry.getKey().getName(),
recentAutoFollowErrors.put(result.autoFollowPatternName + ":" + entry.getKey().getName(),
ExceptionsHelper.convertToElastic(entry.getValue()));
LOGGER.warn(new ParameterizedMessage("failure occurred while auto following index [{}] in leader cluster [{}]",
entry.getKey(), result.clusterAlias), entry.getValue());
LOGGER.warn(new ParameterizedMessage("failure occurred while auto following index [{}] for auto follow " +
"pattern [{}]", entry.getKey(), result.autoFollowPatternName), entry.getValue());
} else {
numberOfSuccessfulIndicesAutoFollowed++;
}
Expand Down Expand Up @@ -243,51 +244,77 @@ void autoFollowIndices() {
int i = 0;
for (Map.Entry<String, AutoFollowPattern> entry : autoFollowMetadata.getPatterns().entrySet()) {
final int slot = i;
final String clusterAlias = entry.getKey();
final String autoFollowPattenName = entry.getKey();
final AutoFollowPattern autoFollowPattern = entry.getValue();
final String leaderCluster = autoFollowPattern.getLeaderCluster();

Map<String, String> headers = autoFollowMetadata.getHeaders().get(clusterAlias);
getLeaderClusterState(headers, clusterAlias, (leaderClusterState, e) -> {
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName);
getLeaderClusterState(headers, leaderCluster, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
assert e == null;
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(clusterAlias, autoFollowPattern,
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName);
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(leaderCluster, autoFollowPattern,
leaderClusterState, followerClusterState, followedIndices);
if (leaderIndicesToFollow.isEmpty()) {
finalise(slot, new AutoFollowResult(clusterAlias));
finalise(slot, new AutoFollowResult(autoFollowPattenName));
} else {
List<Tuple<String, AutoFollowPattern>> patternsForTheSameLeaderCluster = autoFollowMetadata.getPatterns()
.entrySet().stream()
.filter(item -> autoFollowPattenName.equals(item.getKey()) == false)
.filter(item -> leaderCluster.equals(item.getValue().getLeaderCluster()))
.map(item -> new Tuple<>(item.getKey(), item.getValue()))
.collect(Collectors.toList());

Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, headers, resultHandler);
checkAutoFollowPattern(autoFollowPattenName, leaderCluster, autoFollowPattern, leaderIndicesToFollow, headers,
patternsForTheSameLeaderCluster, resultHandler);
}
} else {
finalise(slot, new AutoFollowResult(clusterAlias, e));
finalise(slot, new AutoFollowResult(autoFollowPattenName, e));
}
});
i++;
}
}

private void checkAutoFollowPattern(String clusterAlias,
private void checkAutoFollowPattern(String autoFollowPattenName,
String clusterAlias,
AutoFollowPattern autoFollowPattern,
List<Index> leaderIndicesToFollow,
Map<String, String> headers,
List<Tuple<String, AutoFollowPattern>> patternsForTheSameLeaderCluster,
Consumer<AutoFollowResult> resultHandler) {

final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
final AtomicArray<Tuple<Index, Exception>> results = new AtomicArray<>(leaderIndicesToFollow.size());
for (int i = 0; i < leaderIndicesToFollow.size(); i++) {
final Index indexToFollow = leaderIndicesToFollow.get(i);
final int slot = i;
followLeaderIndex(clusterAlias, indexToFollow, autoFollowPattern, headers, error -> {
results.set(slot, new Tuple<>(indexToFollow, error));

List<String> otherMatchingPatterns = patternsForTheSameLeaderCluster.stream()
.filter(otherPattern -> otherPattern.v2().match(indexToFollow.getName()))
.map(Tuple::v1)
.collect(Collectors.toList());
if (otherMatchingPatterns.size() != 0) {
results.set(slot, new Tuple<>(indexToFollow, new ElasticsearchException("index to follow [" + indexToFollow.getName() +
"] for pattern [" + autoFollowPattenName + "] matches with other patterns " + otherMatchingPatterns + "")));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList()));
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
}
});
} else {
followLeaderIndex(autoFollowPattenName, clusterAlias, indexToFollow, autoFollowPattern, headers, error -> {
results.set(slot, new Tuple<>(indexToFollow, error));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
}
});
}

}
}

private void followLeaderIndex(String clusterAlias,
private void followLeaderIndex(String autoFollowPattenName,
String clusterAlias,
Index indexToFollow,
AutoFollowPattern pattern,
Map<String,String> headers,
Expand All @@ -313,7 +340,7 @@ private void followLeaderIndex(String clusterAlias,

// This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
// (so that we do not try to follow it in subsequent auto follow runs)
Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow);
Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction(autoFollowPattenName, indexToFollow);
// The coordinator always runs on the elected master node, so we can update cluster state here:
updateAutoFollowMetadata(function, onResult);
};
Expand Down Expand Up @@ -356,12 +383,12 @@ static String getFollowerIndexName(AutoFollowPattern autoFollowPattern, String l
}
}

static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(String clusterAlias,
static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(String name,
Index indexToFollow) {
return currentState -> {
AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
Map<String, List<String>> newFollowedIndexUUIDS = new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
newFollowedIndexUUIDS.compute(clusterAlias, (key, existingUUIDs) -> {
newFollowedIndexUUIDS.compute(name, (key, existingUUIDs) -> {
assert existingUUIDs != null;
List<String> newUUIDs = new ArrayList<>(existingUUIDs);
newUUIDs.add(indexToFollow.getUUID());
Expand Down Expand Up @@ -405,12 +432,12 @@ abstract void updateAutoFollowMetadata(

static class AutoFollowResult {

final String clusterAlias;
final String autoFollowPatternName;
final Exception clusterStateFetchException;
final Map<Index, Exception> autoFollowExecutionResults;

AutoFollowResult(String clusterAlias, List<Tuple<Index, Exception>> results) {
this.clusterAlias = clusterAlias;
AutoFollowResult(String autoFollowPatternName, List<Tuple<Index, Exception>> results) {
this.autoFollowPatternName = autoFollowPatternName;

Map<Index, Exception> autoFollowExecutionResults = new HashMap<>();
for (Tuple<Index, Exception> result : results) {
Expand All @@ -421,14 +448,14 @@ static class AutoFollowResult {
this.autoFollowExecutionResults = Collections.unmodifiableMap(autoFollowExecutionResults);
}

AutoFollowResult(String clusterAlias, Exception e) {
this.clusterAlias = clusterAlias;
AutoFollowResult(String autoFollowPatternName, Exception e) {
this.autoFollowPatternName = autoFollowPatternName;
this.clusterStateFetchException = e;
this.autoFollowExecutionResults = Collections.emptyMap();
}

AutoFollowResult(String clusterAlias) {
this(clusterAlias, (Exception) null);
AutoFollowResult(String autoFollowPatternName) {
this(autoFollowPatternName, (Exception) null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected AcknowledgedResponse newResponse() {
protected void masterOperation(DeleteAutoFollowPatternAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getLeaderCluster(),
clusterService.submitStateUpdateTask("put-auto-follow-pattern-" + request.getName(),
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {

@Override
Expand All @@ -72,23 +72,23 @@ public ClusterState execute(ClusterState currentState) throws Exception {
static ClusterState innerDelete(DeleteAutoFollowPatternAction.Request request, ClusterState currentState) {
AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
if (currentAutoFollowMetadata == null) {
throw new ResourceNotFoundException("no auto-follow patterns for cluster alias [{}] found",
request.getLeaderCluster());
throw new ResourceNotFoundException("auto-follow pattern [{}] is missing",
request.getName());
}
Map<String, AutoFollowPattern> patterns = currentAutoFollowMetadata.getPatterns();
AutoFollowPattern autoFollowPatternToRemove = patterns.get(request.getLeaderCluster());
AutoFollowPattern autoFollowPatternToRemove = patterns.get(request.getName());
if (autoFollowPatternToRemove == null) {
throw new ResourceNotFoundException("no auto-follow patterns for cluster alias [{}] found",
request.getLeaderCluster());
throw new ResourceNotFoundException("auto-follow pattern [{}] is missing",
request.getName());
}

final Map<String, AutoFollowPattern> patternsCopy = new HashMap<>(patterns);
final Map<String, List<String>> followedLeaderIndexUUIDSCopy =
new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
final Map<String, Map<String, String>> headers = new HashMap<>(currentAutoFollowMetadata.getHeaders());
patternsCopy.remove(request.getLeaderCluster());
followedLeaderIndexUUIDSCopy.remove(request.getLeaderCluster());
headers.remove(request.getLeaderCluster());
patternsCopy.remove(request.getName());
followedLeaderIndexUUIDSCopy.remove(request.getName());
headers.remove(request.getName());

AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(patternsCopy, followedLeaderIndexUUIDSCopy, headers);
ClusterState.Builder newState = ClusterState.builder(currentState);
Expand Down
Loading

0 comments on commit 6c65667

Please sign in to comment.