Skip to content

Commit

Permalink
[CCR] Added auto follow patterns feature (#33118)
Browse files Browse the repository at this point in the history
Auto Following Patterns is a cross cluster replication feature that
keeps track whether in the leader cluster indices are being created with
names that match with a specific pattern and if so automatically let
the follower cluster follow these newly created indices.

This change adds an `AutoFollowCoordinator` component that is only active
on the elected master node. Periodically this component checks the
 the cluster state of remote clusters if there new leader indices that
match with configured auto follow patterns that have been defined in
`AutoFollowMetadata` custom metadata.

This change also adds two new APIs to manage auto follow patterns. A put
auto follow pattern api:

```
PUT /_ccr/_autofollow/{{remote_cluster}}
{
   "leader_index_pattern": ["logs-*", ...],
   "follow_index_pattern": "{{leader_index}}-copy",
   "max_concurrent_read_batches": 2
   ... // other optional parameters
}
```

and delete auto follow pattern api:

```
DELETE /_ccr/_autofollow/{{remote_cluster_alias}}
```

The auto follow patterns are directly tied to the remote cluster aliases
configured in the follow cluster.

Relates to #33007

Co-authored-by: Jason Tedor <[email protected]>
  • Loading branch information
martijnvg and jasontedor committed Sep 6, 2018
1 parent 09714a8 commit c6f3ada
Show file tree
Hide file tree
Showing 23 changed files with 2,410 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ public static boolean simpleMatch(String[] patterns, String str) {
return false;
}

/**
* Similar to {@link #simpleMatch(String[], String)}, but accepts a list of strings instead of an array of strings for the patterns to
* match.
*/
public static boolean simpleMatch(final List<String> patterns, final String str) {
// #simpleMatch(String[], String) is likely to be inlined into this method
return patterns != null && simpleMatch(patterns.toArray(Strings.EMPTY_ARRAY), str);
}

public static boolean simpleMatch(String[] patterns, String[] types) {
if (patterns != null && types != null) {
for (String type : types) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,34 @@ public void testFollowIndex() throws Exception {
}
}

public void testAutoFollowPatterns() throws Exception {
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);

Request request = new Request("PUT", "/_ccr/_auto_follow/leader_cluster");
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}");
assertOK(client().performRequest(request));

try (RestClient leaderClient = buildLeaderClient()) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
request = new Request("PUT", "/logs-20190101");
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
assertOK(leaderClient.performRequest(request));

for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true");
}
}

assertBusy(() -> {
ensureYellow("logs-20190101");
verifyDocuments("logs-20190101", 5);
});
}

private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
XContentBuilder document = jsonBuilder().startObject();
for (int i = 0; i < fields.length; i += 2) {
Expand Down Expand Up @@ -135,6 +163,15 @@ private static Map<String, Object> toMap(String response) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
}

private static void ensureYellow(String index) throws IOException {
Request request = new Request("GET", "/_cluster/health/" + index);
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("timeout", "70s");
request.addParameter("level", "shards");
client().performRequest(request);
}

private RestClient buildLeaderClient() throws IOException {
assert runningAgainstLeaderCluster == false;
String leaderUrl = System.getProperty("tests.leader_host");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,28 @@
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
import org.elasticsearch.xpack.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction;
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
import org.elasticsearch.xpack.core.XPackClientActionPlugin;
import org.elasticsearch.xpack.core.XPackPlugin;
Expand Down Expand Up @@ -119,7 +126,14 @@ public Collection<Object> createComponents(
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry) {
return Collections.singleton(ccrLicenseChecker);
if (enabled == false) {
return emptyList();
}

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(settings, client, threadPool, clusterService)
);
}

@Override
Expand All @@ -134,23 +148,34 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
}

return Arrays.asList(
// internal actions
new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class),
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
// stats action
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
// follow actions
new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, CreateAndFollowIndexAction.TransportAction.class),
new ActionHandler<>(FollowIndexAction.INSTANCE, FollowIndexAction.TransportAction.class),
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class));
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class),
// auto-follow actions
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class));
}

public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
return Arrays.asList(
// stats API
new RestCcrStatsAction(settings, restController),
// follow APIs
new RestCreateAndFollowIndexAction(settings, restController),
new RestFollowIndexAction(settings, restController),
new RestUnfollowIndexAction(settings, restController));
new RestUnfollowIndexAction(settings, restController),
// auto-follow APIs
new RestDeleteAutoFollowPatternAction(settings, restController),
new RestPutAutoFollowPatternAction(settings, restController));
}

public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Arrays;
import java.util.List;
Expand All @@ -32,6 +33,12 @@ private CcrSettings() {
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
Setting.boolSetting("index.xpack.ccr.following_index", false, Setting.Property.IndexScope);

/**
* Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow
*/
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_POLL_INTERVAL =
Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope);

/**
* The settings defined by CCR.
*
Expand All @@ -40,7 +47,8 @@ private CcrSettings() {
static List<Setting<?>> getSettings() {
return Arrays.asList(
CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING);
CCR_FOLLOWING_INDEX_SETTING,
CCR_AUTO_FOLLOW_POLL_INTERVAL);
}

}
Loading

0 comments on commit c6f3ada

Please sign in to comment.