Skip to content

Commit

Permalink
move rollup_index param out of RollupActionConfig (#66139)
Browse files Browse the repository at this point in the history
This commit moves the ownership of tracking the rollup_index from
the RollupActionConfig to the RollupAction.Request.

This is cleaner since the config should not be concerned with the
source and rollup indices.

relates #42720.

Co-authored-by: James Rodewig <[email protected]>
  • Loading branch information
talevy and jrodewig authored Dec 14, 2020
1 parent 852f6a4 commit ea2145a
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 80 deletions.
19 changes: 9 additions & 10 deletions docs/reference/rollup/apis/rollup-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ For example, you can roll up hourly data into daily or weekly summaries.

[source,console]
----
POST /my-index-000001/_rollup
POST /my-index-000001/_rollup/my-rollup-index
{
"rollup_index": "my-rollup-index",
"groups": {
"date_histogram": {
"field": "@timestamp",
Expand Down Expand Up @@ -44,7 +43,7 @@ POST /my-index-000001/_rollup
[[rollup-api-request]]
==== {api-request-title}

`PUT /<index>/_rollup`
`PUT /<index>/_rollup/<rollup-index>`

[[rollup-api-prereqs]]
==== {api-prereq-title}
Expand All @@ -61,11 +60,7 @@ Index to roll up. Cannot be a <<data-streams,data stream>> or
<<indices-aliases,index alias>>. Does not support <<multi-index,multi-target
syntax>> or wildcards (`*`).

[role="child_attributes"]
[[rollup-api-request-body]]
==== {api-request-body-title}

`rollup_index`::
`<rollup-index>`::
(Required, string)
New index that stores the rollup results. Cannot be an existing index,
a <<data-streams,data stream>>, or an <<indices-aliases,index alias>>.
Expand All @@ -75,6 +70,10 @@ The request creates this index with
`<index>` is a backing index for a data stream, this index is a backing index
for the same stream.

[role="child_attributes"]
[[rollup-api-request-body]]
==== {api-request-body-title}

`groups`::
(Required, object)
Aggregates and stores fields in the rollup.
Expand Down Expand Up @@ -105,7 +104,7 @@ Time interval used to group documents. For differences between
TIP: Choose this value carefully. You won't be able to use a smaller interval
later. For example, you can't aggregate daily rollups into hourly
summaries. However, smaller time intervals can greatly increase the size of your
`rollup_index`.
`<rollup-index>`.

`time_zone`::
(Optional, string)
Expand Down Expand Up @@ -147,7 +146,7 @@ Array of <<keyword,keyword family>> and <<number,numeric>> fields to store. If
you specify a `terms` object, this property is required.
+
TIP: Avoid storing high-cardinality fields. High-cardinality fields can greatly
increase the size of your `rollup_index`.
increase the size of your `<rollup-index>`.
=====
====

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ private RollupAction() {

public static class Request extends ActionRequest implements ToXContentObject {
private String sourceIndex;
private String rollupIndex;
private RollupActionConfig rollupConfig;

public Request(String sourceIndex, RollupActionConfig rollupConfig) {
public Request(String sourceIndex, String rollupIndex, RollupActionConfig rollupConfig) {
this.sourceIndex = sourceIndex;
this.rollupIndex = rollupIndex;
this.rollupConfig = rollupConfig;
}

Expand All @@ -47,25 +49,31 @@ public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
sourceIndex = in.readString();
rollupIndex = in.readString();
rollupConfig = new RollupActionConfig(in);
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new RollupTask(id, type, action, parentTaskId, rollupConfig, headers);
return new RollupTask(id, type, action, parentTaskId, rollupIndex, rollupConfig, headers);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sourceIndex);
out.writeString(rollupIndex);
rollupConfig.writeTo(out);
}

public String getSourceIndex() {
return sourceIndex;
}

public String getRollupIndex() {
return rollupIndex;
}

public RollupActionConfig getRollupConfig() {
return rollupConfig;
}
Expand All @@ -79,14 +87,15 @@ public ActionRequestValidationException validate() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("source_index", sourceIndex);
builder.field("rollup_index", rollupIndex);
rollupConfig.toXContent(builder, params);
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(sourceIndex, rollupConfig);
return Objects.hash(sourceIndex, rollupIndex, rollupConfig);
}

@Override
Expand All @@ -99,6 +108,7 @@ public boolean equals(Object obj) {
}
Request other = (Request) obj;
return Objects.equals(sourceIndex, other.sourceIndex)
&& Objects.equals(rollupIndex, other.rollupIndex)
&& Objects.equals(rollupConfig, other.rollupConfig);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
import org.elasticsearch.xpack.core.rollup.job.MetricConfig;

Expand All @@ -31,7 +30,6 @@
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
Expand All @@ -43,59 +41,41 @@ public class RollupActionConfig implements NamedWriteable, ToXContentObject {
private static final String NAME = "xpack/rollup/action/config";
private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(20);
private static final String TIMEOUT = "timeout";
private static final String ROLLUP_INDEX = "rollup_index";

private final GroupConfig groupConfig;
private final List<MetricConfig> metricsConfig;
private final TimeValue timeout;
private String rollupIndex;

private static final ConstructingObjectParser<RollupActionConfig, Void> PARSER;
static {
PARSER = new ConstructingObjectParser<>(NAME, false, (args) -> {
String rollupIndex = (String) args[0];
GroupConfig groupConfig = (GroupConfig) args[1];
GroupConfig groupConfig = (GroupConfig) args[0];
@SuppressWarnings("unchecked")
List<MetricConfig> metricsConfig = (List<MetricConfig>) args[2];
TimeValue timeout = (TimeValue) args[3];
return new RollupActionConfig(groupConfig, metricsConfig, timeout, rollupIndex);
List<MetricConfig> metricsConfig = (List<MetricConfig>) args[1];
TimeValue timeout = (TimeValue) args[2];
return new RollupActionConfig(groupConfig, metricsConfig, timeout);
});
PARSER.declareString(constructorArg(), new ParseField(ROLLUP_INDEX));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> GroupConfig.fromXContent(p), new ParseField(GroupConfig.NAME));
PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> MetricConfig.fromXContent(p), new ParseField(MetricConfig.NAME));
PARSER.declareField(optionalConstructorArg(), (p, c) -> TimeValue.parseTimeValue(p.textOrNull(), TIMEOUT),
new ParseField(TIMEOUT), ObjectParser.ValueType.STRING_OR_NULL);
}

public RollupActionConfig(final GroupConfig groupConfig, final List<MetricConfig> metricsConfig,
final @Nullable TimeValue timeout, final String rollupIndex) {
if (rollupIndex == null || rollupIndex.isEmpty()) {
throw new IllegalArgumentException("Rollup index must be a non-null, non-empty string");
}
public RollupActionConfig(final GroupConfig groupConfig, final List<MetricConfig> metricsConfig, final @Nullable TimeValue timeout) {
if (groupConfig == null && (metricsConfig == null || metricsConfig.isEmpty())) {
throw new IllegalArgumentException("At least one grouping or metric must be configured");
}
this.rollupIndex = rollupIndex;
this.groupConfig = groupConfig;
this.metricsConfig = metricsConfig != null ? metricsConfig : Collections.emptyList();
this.timeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
}

public RollupActionConfig(final StreamInput in) throws IOException {
rollupIndex = in.readString();
groupConfig = in.readOptionalWriteable(GroupConfig::new);
metricsConfig = in.readList(MetricConfig::new);
timeout = in.readTimeValue();
}

public String getId() {
return RollupField.NAME + "_" + rollupIndex;
}

public void setRollupIndex(String rollupIndex) {
this.rollupIndex = rollupIndex;
}

public GroupConfig getGroupConfig() {
return groupConfig;
}
Expand All @@ -108,10 +88,6 @@ public TimeValue getTimeout() {
return timeout;
}

public String getRollupIndex() {
return rollupIndex;
}

@Override
public String getWriteableName() {
return NAME;
Expand Down Expand Up @@ -142,7 +118,6 @@ public void validateMappings(final Map<String, Map<String, FieldCapabilities>> f
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
{
builder.field(ROLLUP_INDEX, rollupIndex);
if (groupConfig != null) {
builder.field(GroupConfig.NAME, groupConfig);
}
Expand All @@ -163,7 +138,6 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeString(rollupIndex);
out.writeOptionalWriteable(groupConfig);
out.writeList(metricsConfig);
out.writeTimeValue(timeout);
Expand All @@ -179,29 +153,21 @@ public boolean equals(Object other) {
}

final RollupActionConfig that = (RollupActionConfig) other;
return Objects.equals(this.rollupIndex, that.rollupIndex)
&& Objects.equals(this.groupConfig, that.groupConfig)
return Objects.equals(this.groupConfig, that.groupConfig)
&& Objects.equals(this.metricsConfig, that.metricsConfig)
&& Objects.equals(this.timeout, that.timeout);
}

@Override
public int hashCode() {
return Objects.hash(rollupIndex, groupConfig, metricsConfig, timeout);
return Objects.hash(groupConfig, metricsConfig, timeout);
}

@Override
public String toString() {
return Strings.toString(this, true, true);
}

/**
* Same as toString() but more explicitly named so the caller knows this is turned into JSON
*/
public String toJSONString() {
return toString();
}

public static RollupActionConfig fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.core.rollup.v2;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.rollup.RollupField;
Expand All @@ -19,16 +17,21 @@
* which drives the indexing, and periodically updates it's parent PersistentTask with the indexing's current position.
*/
public class RollupTask extends CancellableTask {
private static final Logger logger = LogManager.getLogger(RollupTask.class.getName());

private String rollupIndex;
private RollupActionConfig config;
private RollupJobStatus status;

RollupTask(long id, String type, String action, TaskId parentTask, RollupActionConfig config, Map<String, String> headers) {
super(id, type, action, RollupField.NAME + "_" + config.getRollupIndex(), parentTask, headers);
RollupTask(long id, String type, String action, TaskId parentTask, String rollupIndex, RollupActionConfig config,
Map<String, String> headers) {
super(id, type, action, RollupField.NAME + "_" + rollupIndex, parentTask, headers);
this.rollupIndex = rollupIndex;
this.config = config;
}

public String getRollupIndex() {
return rollupIndex;
}

public RollupActionConfig config() {
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ protected RollupActionConfig createTestInstance() {
}

public static RollupActionConfig randomConfig(Random random) {
final String rollupIndex = randomAlphaOfLength(5);
final TimeValue timeout = random.nextBoolean() ? null : ConfigTestHelpers.randomTimeout(random);
final GroupConfig groupConfig = ConfigTestHelpers.randomGroupConfig(random);
final List<MetricConfig> metricConfigs = ConfigTestHelpers.randomMetricsConfigs(random);
return new RollupActionConfig(groupConfig, metricConfigs, timeout, rollupIndex);
return new RollupActionConfig(groupConfig, metricConfigs, timeout);
}

@Override
Expand All @@ -46,19 +45,10 @@ protected RollupActionConfig doParseInstance(final XContentParser parser) throws
return RollupActionConfig.fromXContent(parser);
}

public void testEmptyRollupIndex() {
final RollupActionConfig sample = createTestInstance();
Exception e = expectThrows(IllegalArgumentException.class, () ->
new RollupActionConfig(sample.getGroupConfig(), sample.getMetricsConfig(), sample.getTimeout(),
randomBoolean() ? null : ""));
assertThat(e.getMessage(), equalTo("Rollup index must be a non-null, non-empty string"));
}

public void testEmptyGroupAndMetrics() {
final RollupActionConfig sample = createTestInstance();
Exception e = expectThrows(IllegalArgumentException.class, () ->
new RollupActionConfig(null, randomBoolean() ? null : emptyList(), sample.getTimeout(),
sample.getRollupIndex()));
new RollupActionConfig(null, randomBoolean() ? null : emptyList(), sample.getTimeout()));
assertThat(e.getMessage(), equalTo("At least one grouping or metric must be configured"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ setup:
- do:
rollup.rollup:
index: docs
rollup_index: rollup_docs
body: >
{
"rollup_index": "rollup_docs",
"groups" : {
"date_histogram": {
"field": "timestamp",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ public class RestRollupAction extends BaseRestHandler {

@Override
public List<Route> routes() {
return List.of(new Route(POST, "/{index}/_rollup"));
return List.of(new Route(POST, "/{index}/_rollup/{rollup_index}"));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String index = restRequest.param("index");
String rollupIndex = restRequest.param("rollup_index");
RollupActionConfig config = RollupActionConfig.fromXContent(restRequest.contentParser());
RollupAction.Request request = new RollupAction.Request(index, config);
RollupAction.Request request = new RollupAction.Request(index, rollupIndex, config);
return channel -> client.execute(RollupAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ public class RollupV2Indexer extends AsyncTwoPhaseIndexer<Map<String, Object>, R
this.request = request;
this.headers = headers;
this.compositeBuilder = createCompositeBuilder(this.request.getRollupConfig());
this.tmpIndex = ".rolluptmp-" + this.request.getRollupConfig().getRollupIndex();
this.tmpIndex = ".rolluptmp-" + this.request.getRollupIndex();
this.completionListener = completionListener;
}

@Override
protected String getJobId() {
return request.getRollupConfig().getId();
return "rollup_" + request.getRollupIndex();
}

@Override
Expand Down Expand Up @@ -196,7 +196,7 @@ protected void onFailure(Exception exc) {
@Override
protected void onFinish(ActionListener<Void> listener) {
// "shrink index"
ResizeRequest resizeRequest = new ResizeRequest(request.getRollupConfig().getRollupIndex(), tmpIndex);
ResizeRequest resizeRequest = new ResizeRequest(request.getRollupIndex(), tmpIndex);
resizeRequest.setResizeType(ResizeType.CLONE);
resizeRequest.getTargetIndexRequest()
.settings(Settings.builder().put(IndexMetadata.SETTING_INDEX_HIDDEN, false).build());
Expand Down
Loading

0 comments on commit ea2145a

Please sign in to comment.