Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move rollup_index param out of RollupActionConfig #66139

Merged
merged 6 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions docs/reference/rollup/apis/rollup-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ For example, you can roll up hourly data into daily or weekly summaries.

[source,console]
----
POST /my-index-000001/_rollup
POST /my-index-000001/_rollup/my-rollup-index
{
"rollup_index": "my-rollup-index",
"groups": {
"date_histogram": {
"field": "@timestamp",
Expand Down Expand Up @@ -44,7 +43,7 @@ POST /my-index-000001/_rollup
[[rollup-api-request]]
==== {api-request-title}

`PUT /<index>/_rollup`
`PUT /<index>/_rollup/<rollup-index>`

[[rollup-api-prereqs]]
==== {api-prereq-title}
Expand All @@ -61,11 +60,7 @@ Index to roll up. Cannot be a <<data-streams,data stream>> or
<<indices-aliases,index alias>>. Does not support <<multi-index,multi-target
syntax>> or wildcards (`*`).

[role="child_attributes"]
[[rollup-api-request-body]]
==== {api-request-body-title}

`rollup_index`::
`<rollup-index>`::
(Required, string)
New index that stores the rollup results. Cannot be an existing index,
a <<data-streams,data stream>>, or an <<indices-aliases,index alias>>.
Expand All @@ -75,6 +70,10 @@ The request creates this index with
`<index>` is a backing index for a data stream, this index is a backing index
for the same stream.

[role="child_attributes"]
[[rollup-api-request-body]]
==== {api-request-body-title}

`groups`::
(Required, object)
Aggregates and stores fields in the rollup.
Expand Down Expand Up @@ -105,7 +104,7 @@ Time interval used to group documents. For differences between
TIP: Choose this value carefully. You won't be able to use a smaller interval
later. For example, you can't aggregate daily rollups into hourly
summaries. However, smaller time intervals can greatly increase the size of your
`rollup_index`.
`<rollup-index>`.

`time_zone`::
(Optional, string)
Expand Down Expand Up @@ -147,7 +146,7 @@ Array of <<keyword,keyword family>> and <<number,numeric>> fields to store. If
you specify a `terms` object, this property is required.
+
TIP: Avoid storing high-cardinality fields. High-cardinality fields can greatly
increase the size of your `rollup_index`.
increase the size of your `<rollup-index>`.
=====
====

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ private RollupAction() {

public static class Request extends ActionRequest implements ToXContentObject {
private String sourceIndex;
private String rollupIndex;
private RollupActionConfig rollupConfig;

public Request(String sourceIndex, RollupActionConfig rollupConfig) {
public Request(String sourceIndex, String rollupIndex, RollupActionConfig rollupConfig) {
this.sourceIndex = sourceIndex;
this.rollupIndex = rollupIndex;
this.rollupConfig = rollupConfig;
}

Expand All @@ -47,25 +49,31 @@ public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
sourceIndex = in.readString();
rollupIndex = in.readString();
rollupConfig = new RollupActionConfig(in);
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new RollupTask(id, type, action, parentTaskId, rollupConfig, headers);
return new RollupTask(id, type, action, parentTaskId, rollupIndex, rollupConfig, headers);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sourceIndex);
out.writeString(rollupIndex);
rollupConfig.writeTo(out);
}

public String getSourceIndex() {
return sourceIndex;
}

public String getRollupIndex() {
return rollupIndex;
}

public RollupActionConfig getRollupConfig() {
return rollupConfig;
}
Expand All @@ -79,14 +87,15 @@ public ActionRequestValidationException validate() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("source_index", sourceIndex);
builder.field("rollup_index", rollupIndex);
rollupConfig.toXContent(builder, params);
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(sourceIndex, rollupConfig);
return Objects.hash(sourceIndex, rollupIndex, rollupConfig);
}

@Override
Expand All @@ -99,6 +108,7 @@ public boolean equals(Object obj) {
}
Request other = (Request) obj;
return Objects.equals(sourceIndex, other.sourceIndex)
&& Objects.equals(rollupIndex, other.rollupIndex)
&& Objects.equals(rollupConfig, other.rollupConfig);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
import org.elasticsearch.xpack.core.rollup.job.MetricConfig;

Expand All @@ -31,7 +30,6 @@
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
Expand All @@ -43,59 +41,41 @@ public class RollupActionConfig implements NamedWriteable, ToXContentObject {
private static final String NAME = "xpack/rollup/action/config";
private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(20);
private static final String TIMEOUT = "timeout";
private static final String ROLLUP_INDEX = "rollup_index";

private final GroupConfig groupConfig;
private final List<MetricConfig> metricsConfig;
private final TimeValue timeout;
private String rollupIndex;

private static final ConstructingObjectParser<RollupActionConfig, Void> PARSER;
static {
PARSER = new ConstructingObjectParser<>(NAME, false, (args) -> {
String rollupIndex = (String) args[0];
GroupConfig groupConfig = (GroupConfig) args[1];
GroupConfig groupConfig = (GroupConfig) args[0];
@SuppressWarnings("unchecked")
List<MetricConfig> metricsConfig = (List<MetricConfig>) args[2];
TimeValue timeout = (TimeValue) args[3];
return new RollupActionConfig(groupConfig, metricsConfig, timeout, rollupIndex);
List<MetricConfig> metricsConfig = (List<MetricConfig>) args[1];
TimeValue timeout = (TimeValue) args[2];
return new RollupActionConfig(groupConfig, metricsConfig, timeout);
});
PARSER.declareString(constructorArg(), new ParseField(ROLLUP_INDEX));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> GroupConfig.fromXContent(p), new ParseField(GroupConfig.NAME));
PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> MetricConfig.fromXContent(p), new ParseField(MetricConfig.NAME));
PARSER.declareField(optionalConstructorArg(), (p, c) -> TimeValue.parseTimeValue(p.textOrNull(), TIMEOUT),
new ParseField(TIMEOUT), ObjectParser.ValueType.STRING_OR_NULL);
}

public RollupActionConfig(final GroupConfig groupConfig, final List<MetricConfig> metricsConfig,
final @Nullable TimeValue timeout, final String rollupIndex) {
if (rollupIndex == null || rollupIndex.isEmpty()) {
throw new IllegalArgumentException("Rollup index must be a non-null, non-empty string");
}
public RollupActionConfig(final GroupConfig groupConfig, final List<MetricConfig> metricsConfig, final @Nullable TimeValue timeout) {
if (groupConfig == null && (metricsConfig == null || metricsConfig.isEmpty())) {
throw new IllegalArgumentException("At least one grouping or metric must be configured");
}
this.rollupIndex = rollupIndex;
this.groupConfig = groupConfig;
this.metricsConfig = metricsConfig != null ? metricsConfig : Collections.emptyList();
this.timeout = timeout != null ? timeout : DEFAULT_TIMEOUT;
}

public RollupActionConfig(final StreamInput in) throws IOException {
rollupIndex = in.readString();
groupConfig = in.readOptionalWriteable(GroupConfig::new);
metricsConfig = in.readList(MetricConfig::new);
timeout = in.readTimeValue();
}

public String getId() {
return RollupField.NAME + "_" + rollupIndex;
}

public void setRollupIndex(String rollupIndex) {
this.rollupIndex = rollupIndex;
}

public GroupConfig getGroupConfig() {
return groupConfig;
}
Expand All @@ -108,10 +88,6 @@ public TimeValue getTimeout() {
return timeout;
}

public String getRollupIndex() {
return rollupIndex;
}

@Override
public String getWriteableName() {
return NAME;
Expand Down Expand Up @@ -142,7 +118,6 @@ public void validateMappings(final Map<String, Map<String, FieldCapabilities>> f
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
{
builder.field(ROLLUP_INDEX, rollupIndex);
if (groupConfig != null) {
builder.field(GroupConfig.NAME, groupConfig);
}
Expand All @@ -163,7 +138,6 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeString(rollupIndex);
out.writeOptionalWriteable(groupConfig);
out.writeList(metricsConfig);
out.writeTimeValue(timeout);
Expand All @@ -179,29 +153,21 @@ public boolean equals(Object other) {
}

final RollupActionConfig that = (RollupActionConfig) other;
return Objects.equals(this.rollupIndex, that.rollupIndex)
&& Objects.equals(this.groupConfig, that.groupConfig)
return Objects.equals(this.groupConfig, that.groupConfig)
&& Objects.equals(this.metricsConfig, that.metricsConfig)
&& Objects.equals(this.timeout, that.timeout);
}

@Override
public int hashCode() {
return Objects.hash(rollupIndex, groupConfig, metricsConfig, timeout);
return Objects.hash(groupConfig, metricsConfig, timeout);
}

@Override
public String toString() {
return Strings.toString(this, true, true);
}

/**
* Same as toString() but more explicitly named so the caller knows this is turned into JSON
*/
public String toJSONString() {
return toString();
}

public static RollupActionConfig fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.core.rollup.v2;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.rollup.RollupField;
Expand All @@ -19,16 +17,21 @@
* which drives the indexing, and periodically updates it's parent PersistentTask with the indexing's current position.
*/
public class RollupTask extends CancellableTask {
private static final Logger logger = LogManager.getLogger(RollupTask.class.getName());

private String rollupIndex;
private RollupActionConfig config;
private RollupJobStatus status;

RollupTask(long id, String type, String action, TaskId parentTask, RollupActionConfig config, Map<String, String> headers) {
super(id, type, action, RollupField.NAME + "_" + config.getRollupIndex(), parentTask, headers);
RollupTask(long id, String type, String action, TaskId parentTask, String rollupIndex, RollupActionConfig config,
Map<String, String> headers) {
super(id, type, action, RollupField.NAME + "_" + rollupIndex, parentTask, headers);
this.rollupIndex = rollupIndex;
this.config = config;
}

public String getRollupIndex() {
return rollupIndex;
}

public RollupActionConfig config() {
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ protected RollupActionConfig createTestInstance() {
}

public static RollupActionConfig randomConfig(Random random) {
final String rollupIndex = randomAlphaOfLength(5);
final TimeValue timeout = random.nextBoolean() ? null : ConfigTestHelpers.randomTimeout(random);
final GroupConfig groupConfig = ConfigTestHelpers.randomGroupConfig(random);
final List<MetricConfig> metricConfigs = ConfigTestHelpers.randomMetricsConfigs(random);
return new RollupActionConfig(groupConfig, metricConfigs, timeout, rollupIndex);
return new RollupActionConfig(groupConfig, metricConfigs, timeout);
}

@Override
Expand All @@ -46,19 +45,10 @@ protected RollupActionConfig doParseInstance(final XContentParser parser) throws
return RollupActionConfig.fromXContent(parser);
}

public void testEmptyRollupIndex() {
final RollupActionConfig sample = createTestInstance();
Exception e = expectThrows(IllegalArgumentException.class, () ->
new RollupActionConfig(sample.getGroupConfig(), sample.getMetricsConfig(), sample.getTimeout(),
randomBoolean() ? null : ""));
assertThat(e.getMessage(), equalTo("Rollup index must be a non-null, non-empty string"));
}

public void testEmptyGroupAndMetrics() {
final RollupActionConfig sample = createTestInstance();
Exception e = expectThrows(IllegalArgumentException.class, () ->
new RollupActionConfig(null, randomBoolean() ? null : emptyList(), sample.getTimeout(),
sample.getRollupIndex()));
new RollupActionConfig(null, randomBoolean() ? null : emptyList(), sample.getTimeout()));
assertThat(e.getMessage(), equalTo("At least one grouping or metric must be configured"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ setup:
- do:
rollup.rollup:
index: docs
rollup_index: rollup_docs
body: >
{
"rollup_index": "rollup_docs",
"groups" : {
"date_histogram": {
"field": "timestamp",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ public class RestRollupAction extends BaseRestHandler {

@Override
public List<Route> routes() {
return List.of(new Route(POST, "/{index}/_rollup"));
return List.of(new Route(POST, "/{index}/_rollup/{rollup_index}"));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String index = restRequest.param("index");
String rollupIndex = restRequest.param("rollup_index");
RollupActionConfig config = RollupActionConfig.fromXContent(restRequest.contentParser());
RollupAction.Request request = new RollupAction.Request(index, config);
RollupAction.Request request = new RollupAction.Request(index, rollupIndex, config);
return channel -> client.execute(RollupAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ public class RollupV2Indexer extends AsyncTwoPhaseIndexer<Map<String, Object>, R
this.request = request;
this.headers = headers;
this.compositeBuilder = createCompositeBuilder(this.request.getRollupConfig());
this.tmpIndex = ".rolluptmp-" + this.request.getRollupConfig().getRollupIndex();
this.tmpIndex = ".rolluptmp-" + this.request.getRollupIndex();
this.completionListener = completionListener;
}

@Override
protected String getJobId() {
return request.getRollupConfig().getId();
return "rollup_" + request.getRollupIndex();
}

@Override
Expand Down Expand Up @@ -196,7 +196,7 @@ protected void onFailure(Exception exc) {
@Override
protected void onFinish(ActionListener<Void> listener) {
// "shrink index"
ResizeRequest resizeRequest = new ResizeRequest(request.getRollupConfig().getRollupIndex(), tmpIndex);
ResizeRequest resizeRequest = new ResizeRequest(request.getRollupIndex(), tmpIndex);
resizeRequest.setResizeType(ResizeType.CLONE);
resizeRequest.getTargetIndexRequest()
.settings(Settings.builder().put(IndexMetadata.SETTING_INDEX_HIDDEN, false).build());
Expand Down
Loading