Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] implement node.transform to control where to run a transform #52712

Merged
merged 16 commits into from
Mar 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

[role="xpack"]
[[data-frames-settings]]
[[transform-settings]]
=== {transforms-cap} settings in Elasticsearch
[subs="attributes"]
++++
Expand All @@ -9,17 +9,30 @@

You do not need to configure any settings to use {transforms}. It is enabled by default.

All of these settings can be added to the `elasticsearch.yml` configuration file.
The dynamic settings can also be updated across a cluster with the
All of these settings can be added to the `elasticsearch.yml` configuration file.
The dynamic settings can also be updated across a cluster with the
<<cluster-update-settings,cluster update settings API>>.

TIP: Dynamic settings take precedence over settings in the `elasticsearch.yml`
TIP: Dynamic settings take precedence over settings in the `elasticsearch.yml`
file.

[float]
[[general-data-frames-settings]]
[[general-transform-settings]]
==== General {transforms} settings

`node.transform`::
Set to `true` to identify the node as a _transform node_. The default is `false` if
either `node.data` or `xpack.transform.enabled` is `false` for the node, and `true` otherwise. +
+
If set to `false` in `elasticsearch.yml`, the node cannot run transforms. If set to
`true` but `xpack.transform.enabled` is set to `false`, the `node.transform` setting is
ignored and the node cannot run transforms. If you want to run transforms, there must be at
least one transform node in your cluster. +
+
IMPORTANT: It is advised to use the `node.transform` setting to constrain the execution
of transforms to certain nodes instead of using `xpack.transform.enabled`. On dedicated
coordinating nodes or dedicated master nodes, disable the node.transform role.

`xpack.transform.enabled`::
hendrikmuhs marked this conversation as resolved.
Show resolved Hide resolved
Set to `true` (default) to enable {transforms} on the node. +
+
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/setup.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ include::settings/audit-settings.asciidoc[]

include::settings/ccr-settings.asciidoc[]

include::settings/data-frames-settings.asciidoc[]
include::settings/transform-settings.asciidoc[]

include::settings/ilm-settings.asciidoc[]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;

import java.io.IOException;
Expand Down Expand Up @@ -98,6 +99,10 @@ public boolean isValid() {
return queryConfig.isValid();
}

public boolean requiresRemoteCluster() {
return Arrays.stream(index).anyMatch(RemoteClusterLicenseChecker::isRemoteIndex);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,35 @@ public class TransformTaskParams extends AbstractDiffable<TransformTaskParams> i

public static final String NAME = TransformField.TASK_NAME;
public static final ParseField FREQUENCY = TransformField.FREQUENCY;
public static final ParseField REQUIRES_REMOTE = new ParseField("requires_remote");

private final String transformId;
private final Version version;
private final TimeValue frequency;
private final Boolean requiresRemote;

public static final ConstructingObjectParser<TransformTaskParams, Void> PARSER = new ConstructingObjectParser<>(NAME, true,
a -> new TransformTaskParams((String) a[0], (String) a[1], (String) a[2]));
a -> new TransformTaskParams((String) a[0], (String) a[1], (String) a[2], (Boolean) a[3]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), TransformField.ID);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), TransformField.VERSION);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REQUIRES_REMOTE);
}

private TransformTaskParams(String transformId, String version, String frequency) {
private TransformTaskParams(String transformId, String version, String frequency, Boolean remote) {
this(transformId, version == null ? null : Version.fromString(version),
frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()));
frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()),
remote == null ? false : remote.booleanValue()
);
}

public TransformTaskParams(String transformId, Version version, TimeValue frequency) {
public TransformTaskParams(String transformId, Version version, TimeValue frequency, boolean remote) {
this.transformId = transformId;
this.version = version == null ? Version.V_7_2_0 : version;
this.frequency = frequency;
this.requiresRemote = remote;
}

public TransformTaskParams(StreamInput in) throws IOException {
Expand All @@ -62,6 +68,11 @@ public TransformTaskParams(StreamInput in) throws IOException {
} else {
this.frequency = null;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_7_0
this.requiresRemote = in.readBoolean();
} else {
this.requiresRemote = false;
}
}

@Override
Expand All @@ -83,6 +94,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
out.writeOptionalTimeValue(frequency);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_7_0
out.writeBoolean(requiresRemote);
}
}

@Override
Expand All @@ -93,6 +107,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
}
builder.field(REQUIRES_REMOTE.getPreferredName(), requiresRemote);
builder.endObject();
return builder;
}
Expand All @@ -109,6 +124,10 @@ public TimeValue getFrequency() {
return frequency;
}

public boolean requiresRemote() {
return requiresRemote;
}

public static TransformTaskParams fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
Expand All @@ -127,11 +146,12 @@ public boolean equals(Object other) {

return Objects.equals(this.transformId, that.transformId)
&& Objects.equals(this.version, that.version)
&& Objects.equals(this.frequency, that.frequency);
&& Objects.equals(this.frequency, that.frequency)
&& this.requiresRemote == that.requiresRemote;
}

@Override
public int hashCode() {
return Objects.hash(transformId, version, frequency);
return Objects.hash(transformId, version, frequency, requiresRemote);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,21 @@ protected Reader<SourceConfig> instanceReader() {
return SourceConfig::new;
}

public void testRequiresRemoteCluster() {
assertFalse(new SourceConfig(new String [] {"index1", "index2", "index3"},
QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());

assertTrue(new SourceConfig(new String [] {"index1", "remote2:index2", "index3"},
QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());

assertTrue(new SourceConfig(new String [] {"index1", "index2", "remote3:index3"},
QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());

assertTrue(new SourceConfig(new String [] {"index1", "remote2:index2", "remote3:index3"},
QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());

assertTrue(new SourceConfig(new String [] {"remote1:index1"},
QueryConfigTests.randomQueryConfig()).requiresRemoteCluster());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ protected TransformTaskParams doParseInstance(XContentParser parser) throws IOEx
@Override
protected TransformTaskParams createTestInstance() {
return new TransformTaskParams(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT,
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)));
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), randomBoolean());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
Expand All @@ -39,6 +41,7 @@
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
Expand Down Expand Up @@ -137,6 +140,23 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
Setting.Property.Dynamic
);

/**
* Node attributes for transform, automatically created and retrievable via cluster state.
* These attributes should never be set directly, use the node setting counter parts instead.
*/
public static final String TRANSFORM_ENABLED_NODE_ATTR = "transform.node";
public static final String TRANSFORM_REMOTE_ENABLED_NODE_ATTR = "transform.remote_connect";

/**
* Setting whether transform (the coordinator task) can run on this node and REST API's are available,
* respects xpack.transform.enabled (for the whole plugin) as fallback
*/
public static final Setting<Boolean> TRANSFORM_ENABLED_NODE = Setting.boolSetting(
"node.transform",
settings -> Boolean.toString(XPackSettings.TRANSFORM_ENABLED.get(settings) && DiscoveryNode.isDataNode(settings)),
Property.NodeScope
);

public Transform(Settings settings) {
this.settings = settings;
this.enabled = XPackSettings.TRANSFORM_ENABLED.get(settings);
Expand Down Expand Up @@ -222,8 +242,14 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return emptyList();
}

FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, TASK_THREAD_POOL_NAME, 4, 4, "transform.task_thread_pool",
false);
FixedExecutorBuilder indexing = new FixedExecutorBuilder(
settings,
TASK_THREAD_POOL_NAME,
4,
4,
"transform.task_thread_pool",
false
);

return Collections.singletonList(indexing);
}
Expand Down Expand Up @@ -296,13 +322,44 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
// the transform services should have been created
assert transformServices.get() != null;

return Collections.singletonList(new TransformPersistentTasksExecutor(client, transformServices.get(), threadPool, clusterService,
settingsModule.getSettings(), expressionResolver));
return Collections.singletonList(
new TransformPersistentTasksExecutor(
client,
transformServices.get(),
threadPool,
clusterService,
settingsModule.getSettings(),
expressionResolver
)
);
}

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(NUM_FAILURE_RETRIES_SETTING);
return Collections.unmodifiableList(Arrays.asList(TRANSFORM_ENABLED_NODE, NUM_FAILURE_RETRIES_SETTING));
}

@Override
public Settings additionalSettings() {
String transformEnabledNodeAttribute = "node.attr." + TRANSFORM_ENABLED_NODE_ATTR;
String transformRemoteEnabledNodeAttribute = "node.attr." + TRANSFORM_REMOTE_ENABLED_NODE_ATTR;

if (settings.get(transformEnabledNodeAttribute) != null || settings.get(transformRemoteEnabledNodeAttribute) != null) {
throw new IllegalArgumentException(
"Directly setting transform node attributes is not permitted, please use the documented node settings instead"
hendrikmuhs marked this conversation as resolved.
Show resolved Hide resolved
);
}

hendrikmuhs marked this conversation as resolved.
Show resolved Hide resolved
if (enabled == false) {
return Settings.EMPTY;
}

Settings.Builder additionalSettings = Settings.builder();

additionalSettings.put(transformEnabledNodeAttribute, TRANSFORM_ENABLED_NODE.get(settings));
additionalSettings.put(transformRemoteEnabledNodeAttribute, RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings));

return additionalSettings.build();
}

@Override
Expand Down
Loading