From 776718e2c40fd2d2df8758ea22975085b4435202 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 15 Aug 2023 13:48:59 +0700 Subject: [PATCH] Rename many rollup usages in downsample codebase. --- .../xpack/core/XPackClientPlugin.java | 4 +- .../DownsampleAfterBulkInfo.java} | 12 +- .../DownsampleBeforeBulkInfo.java} | 12 +- .../DownsampleBulkInfo.java} | 12 +- .../DownsampleBulkStats.java} | 10 +- .../downsample/DownsampleIndexerAction.java | 5 +- .../DownsampleShardIndexerStatus.java} | 8 +- .../DownsampleShardPersistentTaskState.java | 126 +++++ .../DownsampleShardStatus.java} | 114 ++--- .../DownsampleShardTask.java} | 64 +-- .../RollupShardPersistentTaskState.java | 126 ----- .../DownsampleActionConfigTests.java | 3 +- ...nsampleShardPersistentTaskStateTests.java} | 28 +- ...ownsampleShardStatusSerializingTests.java} | 34 +- .../downsample/ILMDownsampleDisruptionIT.java | 10 +- .../AggregateMetricFieldSerializer.java | 10 +- .../AggregateMetricFieldValueFetcher.java | 12 +- .../xpack/downsample/Downsample.java | 19 +- ...dexer.java => DownsampleShardIndexer.java} | 182 ++++---- ...a => DownsampleShardIndexerException.java} | 6 +- ...ownsampleShardPersistentTaskExecutor.java} | 50 +- .../downsample/DownsampleShardTaskParams.java | 26 +- .../xpack/downsample/FieldValueFetcher.java | 12 +- .../downsample/TransportDownsampleAction.java | 22 +- .../TransportDownsampleIndexerAction.java | 16 +- .../DownsampleActionSingleNodeTests.java | 434 +++++++++--------- 26 files changed, 693 insertions(+), 664 deletions(-) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/{rollup/action/RollupAfterBulkInfo.java => downsample/DownsampleAfterBulkInfo.java} (90%) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/{rollup/action/RollupBeforeBulkInfo.java => downsample/DownsampleBeforeBulkInfo.java} (83%) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/{rollup/action/RollupBulkInfo.java => 
downsample/DownsampleBulkInfo.java} (91%) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/{rollup/action/RollupBulkStats.java => downsample/DownsampleBulkStats.java} (88%) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/{rollup/action/RollupShardIndexerStatus.java => downsample/DownsampleShardIndexerStatus.java} (71%) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardPersistentTaskState.java rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/{rollup/action/RollupShardStatus.java => downsample/DownsampleShardStatus.java} (77%) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/{rollup/action/RollupShardTask.java => downsample/DownsampleShardTask.java} (73%) delete mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardPersistentTaskState.java rename x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/{rollup => downsample}/DownsampleActionConfigTests.java (94%) rename x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/{rollup/RollupShardPersistentTaskStateTests.java => downsample/DownsampleShardPersistentTaskStateTests.java} (53%) rename x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/{rollup/action/RollupShardStatusSerializingTests.java => downsample/DownsampleShardStatusSerializingTests.java} (73%) rename x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/{RollupShardIndexer.java => DownsampleShardIndexer.java} (75%) rename x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/{RollupShardIndexerException.java => DownsampleShardIndexerException.java} (68%) rename x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/{RollupShardPersistentTaskExecutor.java => DownsampleShardPersistentTaskExecutor.java} (79%) diff --git 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 129d2abc76d06..bfa9b42211092 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.datastreams.DataStreamFeatureSetUsage; import org.elasticsearch.xpack.core.datastreams.DataStreamLifecycleFeatureSetUsage; import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction; +import org.elasticsearch.xpack.core.downsample.DownsampleShardStatus; import org.elasticsearch.xpack.core.enrich.EnrichFeatureSetUsage; import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus; import org.elasticsearch.xpack.core.eql.EqlFeatureSetUsage; @@ -153,7 +154,6 @@ import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction; import org.elasticsearch.xpack.core.rollup.action.PutRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction; -import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus; import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; import org.elasticsearch.xpack.core.rollup.job.RollupJob; @@ -462,7 +462,7 @@ public List getNamedWriteables() { new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new), new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new), new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new), - new NamedWriteableRegistry.Entry(Task.Status.class, RollupShardStatus.NAME, RollupShardStatus::new), + new NamedWriteableRegistry.Entry(Task.Status.class, DownsampleShardStatus.NAME, DownsampleShardStatus::new), // ccr new 
NamedWriteableRegistry.Entry(AutoFollowMetadata.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new), new NamedWriteableRegistry.Entry(Metadata.Custom.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupAfterBulkInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleAfterBulkInfo.java similarity index 90% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupAfterBulkInfo.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleAfterBulkInfo.java index fbb1c510f63b8..4b18f3a3fee6f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupAfterBulkInfo.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleAfterBulkInfo.java @@ -5,7 +5,7 @@ * 2.0. */ -package org.elasticsearch.xpack.core.rollup.action; +package org.elasticsearch.xpack.core.downsample; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; @@ -22,7 +22,7 @@ * This class includes statistics collected by the downsampling task after * a bulk indexing operation ends. 
*/ -public record RollupAfterBulkInfo( +public record DownsampleAfterBulkInfo( long currentTimeMillis, long executionId, long lastIngestTookInMillis, @@ -40,11 +40,11 @@ public record RollupAfterBulkInfo( private static final ParseField HAS_FAILURES = new ParseField("has_failures"); private static final ParseField REST_STATUS_CODE = new ParseField("rest_status_code"); - private static final ConstructingObjectParser PARSER; + private static final ConstructingObjectParser PARSER; static { PARSER = new ConstructingObjectParser<>( NAME, - args -> new RollupAfterBulkInfo( + args -> new DownsampleAfterBulkInfo( (Long) args[0], (Long) args[1], (Long) args[2], @@ -62,7 +62,7 @@ public record RollupAfterBulkInfo( PARSER.declareInt(ConstructingObjectParser.constructorArg(), REST_STATUS_CODE); } - public RollupAfterBulkInfo(final StreamInput in) throws IOException { + public DownsampleAfterBulkInfo(final StreamInput in) throws IOException { this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readBoolean(), in.readVInt()); } @@ -93,7 +93,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder.endObject(); } - public static RollupAfterBulkInfo fromXContent(XContentParser parser) throws IOException { + public static DownsampleAfterBulkInfo fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupBeforeBulkInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleBeforeBulkInfo.java similarity index 83% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupBeforeBulkInfo.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleBeforeBulkInfo.java index 4a5955319a8ad..efe8b79510daa 100644 --- 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupBeforeBulkInfo.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleBeforeBulkInfo.java @@ -5,7 +5,7 @@ * 2.0. */ -package org.elasticsearch.xpack.core.rollup.action; +package org.elasticsearch.xpack.core.downsample; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; @@ -22,7 +22,7 @@ * This class includes statistics collected by the downsampling task before * a bulk indexing operation starts. */ -public record RollupBeforeBulkInfo(long currentTimeMillis, long executionId, long estimatedSizeInBytes, int numberOfActions) +public record DownsampleBeforeBulkInfo(long currentTimeMillis, long executionId, long estimatedSizeInBytes, int numberOfActions) implements NamedWriteable, ToXContentObject { @@ -34,11 +34,11 @@ public record RollupBeforeBulkInfo(long currentTimeMillis, long executionId, lon private static final ParseField ESTIMATED_SIZE_IN_BYTES = new ParseField("estimated_size_in_bytes"); private static final ParseField NUMBER_OF_ACTIONS = new ParseField("number_of_actions"); - private static final ConstructingObjectParser PARSER; + private static final ConstructingObjectParser PARSER; static { PARSER = new ConstructingObjectParser<>( NAME, - args -> new RollupBeforeBulkInfo((Long) args[0], (Long) args[1], (Long) args[2], (Integer) args[3]) + args -> new DownsampleBeforeBulkInfo((Long) args[0], (Long) args[1], (Long) args[2], (Integer) args[3]) ); PARSER.declareLong(ConstructingObjectParser.constructorArg(), CURRENT_TIME_IN_MILLIS); @@ -47,7 +47,7 @@ public record RollupBeforeBulkInfo(long currentTimeMillis, long executionId, lon PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_ACTIONS); } - public RollupBeforeBulkInfo(final StreamInput in) throws IOException { + public DownsampleBeforeBulkInfo(final StreamInput in) throws IOException { this(in.readVLong(), 
in.readVLong(), in.readVLong(), in.readVInt()); } @@ -74,7 +74,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder.endObject(); } - public static RollupBeforeBulkInfo fromXContent(XContentParser parser) throws IOException { + public static DownsampleBeforeBulkInfo fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupBulkInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleBulkInfo.java similarity index 91% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupBulkInfo.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleBulkInfo.java index 105001d59f4c7..570121bafa7c2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupBulkInfo.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleBulkInfo.java @@ -5,7 +5,7 @@ * 2.0. */ -package org.elasticsearch.xpack.core.rollup.action; +package org.elasticsearch.xpack.core.downsample; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.StreamInput; @@ -22,7 +22,7 @@ * This class includes statistics collected by the downsampling task * for bulk indexing operations. 
*/ -public record RollupBulkInfo( +public record DownsampleBulkInfo( long totalBulkCount, long bulkIngestSumMillis, long maxBulkIngestMillis, @@ -42,11 +42,11 @@ public record RollupBulkInfo( private static final ParseField MAX_BULK_TOOK_MILLIS = new ParseField("max_bulk_took_millis"); private static final ParseField MIN_BULK_TOOK_MILLIS = new ParseField("min_bulk_took_millis"); - private static final ConstructingObjectParser PARSER; + private static final ConstructingObjectParser PARSER; static { PARSER = new ConstructingObjectParser<>( NAME, - args -> new RollupBulkInfo( + args -> new DownsampleBulkInfo( (Long) args[0], (Long) args[1], (Long) args[2], @@ -66,11 +66,11 @@ public record RollupBulkInfo( PARSER.declareLong(ConstructingObjectParser.constructorArg(), MIN_BULK_TOOK_MILLIS); } - public RollupBulkInfo(final StreamInput in) throws IOException { + public DownsampleBulkInfo(final StreamInput in) throws IOException { this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); } - public static RollupBulkInfo fromXContext(XContentParser parser) throws IOException { + public static DownsampleBulkInfo fromXContext(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupBulkStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleBulkStats.java similarity index 88% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupBulkStats.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleBulkStats.java index fea1a88fe793f..9ab96bad32db7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupBulkStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleBulkStats.java @@ -5,11 +5,11 @@ * 2.0. 
*/ -package org.elasticsearch.xpack.core.rollup.action; +package org.elasticsearch.xpack.core.downsample; import java.util.concurrent.atomic.AtomicLong; -public class RollupBulkStats { +public class DownsampleBulkStats { private final AtomicLong totalBulkCount = new AtomicLong(0); private final AtomicLong bulkIngestSumMillis = new AtomicLong(0); private final AtomicLong maxBulkIngestMillis = new AtomicLong(-1); @@ -39,10 +39,10 @@ private static long max(long newValue, long existingValue) { } /** - * @return An instance of {@link RollupBulkInfo} including rollup bulk indexing statistics. + * @return An instance of {@link DownsampleBulkInfo} including rollup bulk indexing statistics. */ - public RollupBulkInfo getRollupBulkInfo() { - return new RollupBulkInfo( + public DownsampleBulkInfo getRollupBulkInfo() { + return new DownsampleBulkInfo( this.totalBulkCount.get(), this.bulkIngestSumMillis.get(), Math.max(0, this.maxBulkIngestMillis.get()), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleIndexerAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleIndexerAction.java index 770b0f8ceff2d..cd4a3b430c26f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleIndexerAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleIndexerAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; import java.io.IOException; import java.util.Arrays; @@ -249,7 +248,7 @@ public ShardDownsampleRequest(final ShardId shardId, final Request request) { this.request = request; } - public String getRollupIndex() { + public String getDownsampleIndex() { return request.getDownsampleRequest().getTargetIndex(); } @@ -277,7 +276,7 @@ public 
void writeTo(StreamOutput out) throws IOException { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new RollupShardTask( + return new DownsampleShardTask( id, type, action, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardIndexerStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardIndexerStatus.java similarity index 71% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardIndexerStatus.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardIndexerStatus.java index 25c7efc657fc1..adfb695e39212 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardIndexerStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardIndexerStatus.java @@ -5,7 +5,7 @@ * 2.0. 
*/ -package org.elasticsearch.xpack.core.rollup.action; +package org.elasticsearch.xpack.core.downsample; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -16,15 +16,15 @@ /** * Status of the rollup indexer task */ -public enum RollupShardIndexerStatus implements Writeable { +public enum DownsampleShardIndexerStatus implements Writeable { INITIALIZED, STARTED, FAILED, COMPLETED, CANCELLED; - public static RollupShardIndexerStatus readFromStream(final StreamInput in) throws IOException { - return in.readEnum(RollupShardIndexerStatus.class); + public static DownsampleShardIndexerStatus readFromStream(final StreamInput in) throws IOException { + return in.readEnum(DownsampleShardIndexerStatus.class); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardPersistentTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardPersistentTaskState.java new file mode 100644 index 0000000000000..6c7cf4be5a114 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardPersistentTaskState.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.core.downsample; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +/** + * @param downsampleShardIndexerStatus An instance of {@link DownsampleShardIndexerStatus} with the downsampleShardIndexerStatus of + * the downsample task + * @param tsid The latest successfully processed tsid component of a tuple (tsid, timestamp) + */ +public record DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus downsampleShardIndexerStatus, BytesRef tsid) + implements + PersistentTaskState { + + public static final String NAME = DownsampleShardTask.TASK_NAME; + private static final ParseField ROLLUP_SHARD_INDEXER_STATUS = new ParseField("status"); + private static final ParseField TSID = new ParseField("tsid"); + + public static final ObjectParser PARSER = new ObjectParser<>(NAME); + + static { + PARSER.declareField( + DownsampleShardPersistentTaskState.Builder::status, + (p, c) -> DownsampleShardIndexerStatus.valueOf(p.textOrNull()), + ROLLUP_SHARD_INDEXER_STATUS, + ObjectParser.ValueType.STRING + ); + PARSER.declareField( + DownsampleShardPersistentTaskState.Builder::tsid, + (p, c) -> new BytesRef(p.textOrNull()), + TSID, + ObjectParser.ValueType.STRING + ); + } + + public DownsampleShardPersistentTaskState(final StreamInput in) throws IOException { + this(DownsampleShardIndexerStatus.readFromStream(in), in.readBytesRef()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ROLLUP_SHARD_INDEXER_STATUS.getPreferredName(), 
downsampleShardIndexerStatus); + if (tsid != null) { + builder.field(TSID.getPreferredName(), tsid.utf8ToString()); + } + return builder.endObject(); + } + + @Override + public String getWriteableName() { + return DownsampleShardTask.TASK_NAME; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + downsampleShardIndexerStatus.writeTo(out); + out.writeBytesRef(tsid); + } + + public DownsampleShardIndexerStatus downsampleShardIndexerStatus() { + return downsampleShardIndexerStatus; + } + + public boolean done() { + return DownsampleShardIndexerStatus.COMPLETED.equals(downsampleShardIndexerStatus) + || DownsampleShardIndexerStatus.CANCELLED.equals(downsampleShardIndexerStatus) + || DownsampleShardIndexerStatus.FAILED.equals(downsampleShardIndexerStatus); + } + + public boolean started() { + return DownsampleShardIndexerStatus.STARTED.equals(downsampleShardIndexerStatus); + } + + public boolean cancelled() { + return DownsampleShardIndexerStatus.CANCELLED.equals(downsampleShardIndexerStatus); + } + + public boolean failed() { + return DownsampleShardIndexerStatus.FAILED.equals(downsampleShardIndexerStatus); + } + + public static DownsampleShardPersistentTaskState readFromStream(final StreamInput in) throws IOException { + return new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.readFromStream(in), in.readBytesRef()); + } + + public static DownsampleShardPersistentTaskState fromXContent(final XContentParser parser) throws IOException { + final DownsampleShardPersistentTaskState.Builder builder = new DownsampleShardPersistentTaskState.Builder(); + PARSER.parse(parser, builder, null); + return builder.build(); + } + + public static class Builder { + private DownsampleShardIndexerStatus status; + private BytesRef tsid; + + public Builder status(final DownsampleShardIndexerStatus status) { + this.status = status; + return this; + } + + public Builder tsid(final BytesRef tsid) { + this.tsid = tsid; + return this; + } + + public 
DownsampleShardPersistentTaskState build() { + return new DownsampleShardPersistentTaskState(status, tsid); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardStatus.java similarity index 77% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardStatus.java index c4d227f057842..b19afa5ee8c74 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardStatus.java @@ -5,7 +5,7 @@ * 2.0. */ -package org.elasticsearch.xpack.core.rollup.action; +package org.elasticsearch.xpack.core.downsample; import org.elasticsearch.TransportVersion; import org.elasticsearch.common.Strings; @@ -22,7 +22,7 @@ import java.time.Instant; import java.util.Objects; -public class RollupShardStatus implements Task.Status { +public class DownsampleShardStatus implements Task.Status { public static final String NAME = "rollup-index-shard"; private static final ParseField SHARD_FIELD = new ParseField("shard"); private static final ParseField START_TIME_FIELD = new ParseField("start_time"); @@ -57,12 +57,12 @@ public class RollupShardStatus implements Task.Status { private final long indexEndTimeMillis; private final long docsProcessed; private final float docsProcessedPercentage; - private final RollupBulkInfo rollupBulkInfo; - private final RollupBeforeBulkInfo rollupBeforeBulkInfo; - private final RollupAfterBulkInfo rollupAfterBulkInfo; - private final RollupShardIndexerStatus rollupShardIndexerStatus; + private final DownsampleBulkInfo downsampleBulkInfo; + private final DownsampleBeforeBulkInfo downsampleBeforeBulkInfo; + private final 
DownsampleAfterBulkInfo downsampleAfterBulkInfo; + private final DownsampleShardIndexerStatus downsampleShardIndexerStatus; - private static final ConstructingObjectParser PARSER; + private static final ConstructingObjectParser PARSER; static { PARSER = new ConstructingObjectParser<>(NAME, args -> { final ShardId _shardId = ShardId.fromString((String) args[0]); @@ -79,11 +79,11 @@ public class RollupShardStatus implements Task.Status { final Long _indexEndTimeMillis = (Long) args[11]; final Long _docsProcessed = (Long) args[12]; final Float _docsProcessedPercentage = (Float) args[13]; - final RollupBulkInfo _rollupBulkInfo = (RollupBulkInfo) args[14]; - final RollupBeforeBulkInfo _rollupBeforeBulkInfo = (RollupBeforeBulkInfo) args[15]; - final RollupAfterBulkInfo _rollupAfterBulkInfo = (RollupAfterBulkInfo) args[16]; - final RollupShardIndexerStatus _rollupShardIndexerStatus = RollupShardIndexerStatus.valueOf((String) args[17]); - return new RollupShardStatus( + final DownsampleBulkInfo _downsampleBulkInfo = (DownsampleBulkInfo) args[14]; + final DownsampleBeforeBulkInfo _downsampleBeforeBulkInfo = (DownsampleBeforeBulkInfo) args[15]; + final DownsampleAfterBulkInfo _downsampleAfterBulkInfo = (DownsampleAfterBulkInfo) args[16]; + final DownsampleShardIndexerStatus _downsampleShardIndexerStatus = DownsampleShardIndexerStatus.valueOf((String) args[17]); + return new DownsampleShardStatus( _shardId, _rollupStart, _numReceived, @@ -98,10 +98,10 @@ public class RollupShardStatus implements Task.Status { _indexEndTimeMillis, _docsProcessed, _docsProcessedPercentage, - _rollupBulkInfo, - _rollupBeforeBulkInfo, - _rollupAfterBulkInfo, - _rollupShardIndexerStatus + _downsampleBulkInfo, + _downsampleBeforeBulkInfo, + _downsampleAfterBulkInfo, + _downsampleShardIndexerStatus ); }); @@ -119,21 +119,25 @@ public class RollupShardStatus implements Task.Status { PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_END_TIME_MILLIS); 
PARSER.declareLong(ConstructingObjectParser.constructorArg(), DOCS_PROCESSED); PARSER.declareFloat(ConstructingObjectParser.constructorArg(), DOCS_PROCESSED_PERCENTAGE); - PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> RollupBulkInfo.fromXContext(p), ROLLUP_BULK_INFO); PARSER.declareObject( ConstructingObjectParser.optionalConstructorArg(), - (p, c) -> RollupBeforeBulkInfo.fromXContent(p), + (p, c) -> DownsampleBulkInfo.fromXContext(p), + ROLLUP_BULK_INFO + ); + PARSER.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> DownsampleBeforeBulkInfo.fromXContent(p), ROLLUP_BEFORE_BULK_INFO ); PARSER.declareObject( ConstructingObjectParser.optionalConstructorArg(), - (p, c) -> RollupAfterBulkInfo.fromXContent(p), + (p, c) -> DownsampleAfterBulkInfo.fromXContent(p), ROLLUP_AFTER_BULK_INFO ); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ROLLUP_SHARD_INDEXER_STATUS); } - public RollupShardStatus(StreamInput in) throws IOException { + public DownsampleShardStatus(StreamInput in) throws IOException { shardId = new ShardId(in); rollupStart = in.readLong(); numReceived = in.readLong(); @@ -149,10 +153,10 @@ public RollupShardStatus(StreamInput in) throws IOException { indexEndTimeMillis = in.readVLong(); docsProcessed = in.readVLong(); docsProcessedPercentage = in.readFloat(); - rollupBulkInfo = new RollupBulkInfo(in); - rollupBeforeBulkInfo = new RollupBeforeBulkInfo(in); - rollupAfterBulkInfo = new RollupAfterBulkInfo(in); - rollupShardIndexerStatus = in.readEnum(RollupShardIndexerStatus.class); + downsampleBulkInfo = new DownsampleBulkInfo(in); + downsampleBeforeBulkInfo = new DownsampleBeforeBulkInfo(in); + downsampleAfterBulkInfo = new DownsampleAfterBulkInfo(in); + downsampleShardIndexerStatus = in.readEnum(DownsampleShardIndexerStatus.class); } else { totalShardDocCount = -1; lastSourceTimestamp = -1; @@ -162,14 +166,14 @@ public RollupShardStatus(StreamInput in) throws IOException 
{ indexEndTimeMillis = -1; docsProcessed = 0; docsProcessedPercentage = 0; - rollupBulkInfo = null; - rollupBeforeBulkInfo = null; - rollupAfterBulkInfo = null; - rollupShardIndexerStatus = null; + downsampleBulkInfo = null; + downsampleBeforeBulkInfo = null; + downsampleAfterBulkInfo = null; + downsampleShardIndexerStatus = null; } } - public RollupShardStatus( + public DownsampleShardStatus( ShardId shardId, long rollupStart, long numReceived, @@ -184,10 +188,10 @@ public RollupShardStatus( long indexEndTimeMillis, long docsProcessed, float docsProcessedPercentage, - final RollupBulkInfo rollupBulkInfo, - final RollupBeforeBulkInfo rollupBeforeBulkInfo, - final RollupAfterBulkInfo rollupAfterBulkInfo, - final RollupShardIndexerStatus rollupShardIndexerStatus + final DownsampleBulkInfo downsampleBulkInfo, + final DownsampleBeforeBulkInfo downsampleBeforeBulkInfo, + final DownsampleAfterBulkInfo downsampleAfterBulkInfo, + final DownsampleShardIndexerStatus downsampleShardIndexerStatus ) { this.shardId = shardId; this.rollupStart = rollupStart; @@ -203,13 +207,13 @@ public RollupShardStatus( this.indexEndTimeMillis = indexEndTimeMillis; this.docsProcessed = docsProcessed; this.docsProcessedPercentage = docsProcessedPercentage; - this.rollupBulkInfo = rollupBulkInfo; - this.rollupBeforeBulkInfo = rollupBeforeBulkInfo; - this.rollupAfterBulkInfo = rollupAfterBulkInfo; - this.rollupShardIndexerStatus = rollupShardIndexerStatus; + this.downsampleBulkInfo = downsampleBulkInfo; + this.downsampleBeforeBulkInfo = downsampleBeforeBulkInfo; + this.downsampleAfterBulkInfo = downsampleAfterBulkInfo; + this.downsampleShardIndexerStatus = downsampleShardIndexerStatus; } - public static RollupShardStatus fromXContent(XContentParser parser) throws IOException { + public static DownsampleShardStatus fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } @@ -230,10 +234,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params 
params) throws builder.field(INDEX_END_TIME_MILLIS.getPreferredName(), indexEndTimeMillis); builder.field(DOCS_PROCESSED.getPreferredName(), docsProcessed); builder.field(DOCS_PROCESSED_PERCENTAGE.getPreferredName(), docsProcessedPercentage); - rollupBulkInfo.toXContent(builder, params); - rollupBeforeBulkInfo.toXContent(builder, params); - rollupAfterBulkInfo.toXContent(builder, params); - builder.field(ROLLUP_SHARD_INDEXER_STATUS.getPreferredName(), rollupShardIndexerStatus); + downsampleBulkInfo.toXContent(builder, params); + downsampleBeforeBulkInfo.toXContent(builder, params); + downsampleAfterBulkInfo.toXContent(builder, params); + builder.field(ROLLUP_SHARD_INDEXER_STATUS.getPreferredName(), downsampleShardIndexerStatus); return builder.endObject(); } @@ -260,10 +264,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(indexEndTimeMillis); out.writeVLong(docsProcessed); out.writeFloat(docsProcessedPercentage); - rollupBulkInfo.writeTo(out); - rollupBeforeBulkInfo.writeTo(out); - rollupAfterBulkInfo.writeTo(out); - out.writeEnum(rollupShardIndexerStatus); + downsampleBulkInfo.writeTo(out); + downsampleBeforeBulkInfo.writeTo(out); + downsampleAfterBulkInfo.writeTo(out); + out.writeEnum(downsampleShardIndexerStatus); } else { out.writeBoolean(false); } @@ -273,7 +277,7 @@ public void writeTo(StreamOutput out) throws IOException { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - RollupShardStatus that = (RollupShardStatus) o; + DownsampleShardStatus that = (DownsampleShardStatus) o; return rollupStart == that.rollupStart && numReceived == that.numReceived && numSent == that.numSent @@ -289,10 +293,10 @@ public boolean equals(Object o) { && docsProcessedPercentage == that.docsProcessedPercentage && Objects.equals(shardId.getIndexName(), that.shardId.getIndexName()) && Objects.equals(shardId.id(), that.shardId.id()) - && Objects.equals(rollupBulkInfo, 
that.rollupBulkInfo) - && Objects.equals(rollupBeforeBulkInfo, that.rollupBeforeBulkInfo) - && Objects.equals(rollupAfterBulkInfo, that.rollupAfterBulkInfo) - && Objects.equals(rollupShardIndexerStatus, that.rollupShardIndexerStatus); + && Objects.equals(downsampleBulkInfo, that.downsampleBulkInfo) + && Objects.equals(downsampleBeforeBulkInfo, that.downsampleBeforeBulkInfo) + && Objects.equals(downsampleAfterBulkInfo, that.downsampleAfterBulkInfo) + && Objects.equals(downsampleShardIndexerStatus, that.downsampleShardIndexerStatus); } @Override @@ -313,10 +317,10 @@ public int hashCode() { indexEndTimeMillis, docsProcessed, docsProcessedPercentage, - rollupBulkInfo, - rollupBeforeBulkInfo, - rollupAfterBulkInfo, - rollupShardIndexerStatus + downsampleBulkInfo, + downsampleBeforeBulkInfo, + downsampleAfterBulkInfo, + downsampleShardIndexerStatus ); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardTask.java similarity index 73% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardTask.java index 8a7abf6101e13..2cb0da5d315a5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleShardTask.java @@ -5,7 +5,7 @@ * 2.0. 
*/ -package org.elasticsearch.xpack.core.rollup.action; +package org.elasticsearch.xpack.core.downsample; import org.elasticsearch.action.downsample.DownsampleConfig; import org.elasticsearch.index.shard.ShardId; @@ -19,9 +19,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -public class RollupShardTask extends AllocatedPersistentTask { +public class DownsampleShardTask extends AllocatedPersistentTask { public static final String TASK_NAME = "rollup-shard"; - private final String rollupIndex; + private final String downsampleIndex; private volatile long totalShardDocCount; private volatile long docsProcessed; private final long indexStartTimeMillis; @@ -36,36 +36,38 @@ public class RollupShardTask extends AllocatedPersistentTask { private final AtomicLong lastSourceTimestamp = new AtomicLong(0); private final AtomicLong lastTargetTimestamp = new AtomicLong(0); private final AtomicLong lastIndexingTimestamp = new AtomicLong(0); - private final AtomicReference rollupShardIndexerStatus = new AtomicReference<>( - RollupShardIndexerStatus.INITIALIZED + private final AtomicReference downsampleShardIndexerStatus = new AtomicReference<>( + DownsampleShardIndexerStatus.INITIALIZED ); - private final RollupBulkStats rollupBulkStats; + private final DownsampleBulkStats downsampleBulkStats; // Need to set initial values, because these atomic references can be read before bulk indexing started or when downsampling empty index - private final AtomicReference lastBeforeBulkInfo = new AtomicReference<>(new RollupBeforeBulkInfo(0, 0, 0, 0)); - private final AtomicReference lastAfterBulkInfo = new AtomicReference<>( - new RollupAfterBulkInfo(0, 0, 0, 0, false, 0) + private final AtomicReference lastBeforeBulkInfo = new AtomicReference<>( + new DownsampleBeforeBulkInfo(0, 0, 0, 0) + ); + private final AtomicReference lastAfterBulkInfo = new AtomicReference<>( + new DownsampleAfterBulkInfo(0, 0, 0, 0, false, 0) ); - public 
RollupShardTask( + public DownsampleShardTask( long id, final String type, final String action, final TaskId parentTask, - final String rollupIndex, + final String downsampleIndex, long indexStartTimeMillis, long indexEndTimeMillis, final DownsampleConfig config, final Map headers, final ShardId shardId ) { - super(id, type, action, RollupField.NAME + "_" + rollupIndex + "[" + shardId.id() + "]", parentTask, headers); - this.rollupIndex = rollupIndex; + super(id, type, action, RollupField.NAME + "_" + downsampleIndex + "[" + shardId.id() + "]", parentTask, headers); + this.downsampleIndex = downsampleIndex; this.indexStartTimeMillis = indexStartTimeMillis; this.indexEndTimeMillis = indexEndTimeMillis; this.config = config; this.shardId = shardId; this.rollupStartTime = System.currentTimeMillis(); - this.rollupBulkStats = new RollupBulkStats(); + this.downsampleBulkStats = new DownsampleBulkStats(); } @Override @@ -88,8 +90,8 @@ public void testInit( init(persistentTasksService, taskManager, persistentTaskId, allocationId); } - public String getRollupIndex() { - return rollupIndex; + public String getDownsampleIndex() { + return downsampleIndex; } public DownsampleConfig config() { @@ -102,7 +104,7 @@ public long getTotalShardDocCount() { @Override public Status getStatus() { - return new RollupShardStatus( + return new DownsampleShardStatus( shardId, rollupStartTime, numReceived.get(), @@ -117,10 +119,10 @@ public Status getStatus() { indexEndTimeMillis, docsProcessed, 100.0F * docsProcessed / totalShardDocCount, - rollupBulkStats.getRollupBulkInfo(), + downsampleBulkStats.getRollupBulkInfo(), lastBeforeBulkInfo.get(), lastAfterBulkInfo.get(), - rollupShardIndexerStatus.get() + downsampleShardIndexerStatus.get() ); } @@ -148,11 +150,11 @@ public long getLastTargetTimestamp() { return lastTargetTimestamp.get(); } - public RollupBeforeBulkInfo getLastBeforeBulkInfo() { + public DownsampleBeforeBulkInfo getLastBeforeBulkInfo() { return lastBeforeBulkInfo.get(); } - 
public RollupAfterBulkInfo getLastAfterBulkInfo() { + public DownsampleAfterBulkInfo getLastAfterBulkInfo() { return lastAfterBulkInfo.get(); } @@ -164,8 +166,8 @@ public long getRollupStartTime() { return rollupStartTime; } - public RollupShardIndexerStatus getRollupShardIndexerStatus() { - return rollupShardIndexerStatus.get(); + public DownsampleShardIndexerStatus getDownsampleShardIndexerStatus() { + return downsampleShardIndexerStatus.get(); } public long getLastIndexingTimestamp() { @@ -212,16 +214,16 @@ public void setLastIndexingTimestamp(long timestamp) { lastIndexingTimestamp.set(timestamp); } - public void setBeforeBulkInfo(final RollupBeforeBulkInfo beforeBulkInfo) { + public void setBeforeBulkInfo(final DownsampleBeforeBulkInfo beforeBulkInfo) { lastBeforeBulkInfo.set(beforeBulkInfo); } - public void setAfterBulkInfo(final RollupAfterBulkInfo afterBulkInfo) { + public void setAfterBulkInfo(final DownsampleAfterBulkInfo afterBulkInfo) { lastAfterBulkInfo.set(afterBulkInfo); } - public void setRollupShardIndexerStatus(final RollupShardIndexerStatus status) { - this.rollupShardIndexerStatus.set(status); + public void setDownsampleShardIndexerStatus(final DownsampleShardIndexerStatus status) { + this.downsampleShardIndexerStatus.set(status); } public void setTotalShardDocCount(int totalShardDocCount) { @@ -232,11 +234,11 @@ public void setDocsProcessed(long docsProcessed) { this.docsProcessed = docsProcessed; } - public void updateRollupBulkInfo(long bulkIngestTookMillis, long bulkTookMillis) { - this.rollupBulkStats.update(bulkIngestTookMillis, bulkTookMillis); + public void updateBulkInfo(long bulkIngestTookMillis, long bulkTookMillis) { + this.downsampleBulkStats.update(bulkIngestTookMillis, bulkTookMillis); } - public RollupBulkInfo getRollupBulkInfo() { - return this.rollupBulkStats.getRollupBulkInfo(); + public DownsampleBulkInfo getDownsampleBulkInfo() { + return this.downsampleBulkStats.getRollupBulkInfo(); } } diff --git 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardPersistentTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardPersistentTaskState.java deleted file mode 100644 index 5ffdb6b2be7ad..0000000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardPersistentTaskState.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.core.rollup.action; - -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.persistent.PersistentTaskState; -import org.elasticsearch.xcontent.ObjectParser; -import org.elasticsearch.xcontent.ParseField; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentParser; - -import java.io.IOException; - -/** - * @param rollupShardIndexerStatus An instance of {@link RollupShardIndexerStatus} with the rollupShardIndexerStatus of the rollup task - * @param tsid The latest successfully processed tsid component of a tuple (tsid, timestamp) - */ -public record RollupShardPersistentTaskState(RollupShardIndexerStatus rollupShardIndexerStatus, BytesRef tsid) - implements - PersistentTaskState { - - public static final String NAME = RollupShardTask.TASK_NAME; - private static final ParseField ROLLUP_SHARD_INDEXER_STATUS = new ParseField("status"); - private static final ParseField TSID = new ParseField("tsid"); - - public static final ObjectParser PARSER = new ObjectParser<>(NAME); - - static { - PARSER.declareField( - RollupShardPersistentTaskState.Builder::status, - (p, c) -> 
RollupShardIndexerStatus.valueOf(p.textOrNull()), - ROLLUP_SHARD_INDEXER_STATUS, - ObjectParser.ValueType.STRING - ); - PARSER.declareField( - RollupShardPersistentTaskState.Builder::tsid, - (p, c) -> new BytesRef(p.textOrNull()), - TSID, - ObjectParser.ValueType.STRING - ); - } - - public RollupShardPersistentTaskState(final StreamInput in) throws IOException { - this(RollupShardIndexerStatus.readFromStream(in), in.readBytesRef()); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(ROLLUP_SHARD_INDEXER_STATUS.getPreferredName(), rollupShardIndexerStatus); - if (tsid != null) { - builder.field(TSID.getPreferredName(), tsid.utf8ToString()); - } - return builder.endObject(); - } - - @Override - public String getWriteableName() { - return RollupShardTask.TASK_NAME; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - rollupShardIndexerStatus.writeTo(out); - out.writeBytesRef(tsid); - } - - @Override - public RollupShardIndexerStatus rollupShardIndexerStatus() { - return rollupShardIndexerStatus; - } - - public boolean done() { - return RollupShardIndexerStatus.COMPLETED.equals(rollupShardIndexerStatus) - || RollupShardIndexerStatus.CANCELLED.equals(rollupShardIndexerStatus) - || RollupShardIndexerStatus.FAILED.equals(rollupShardIndexerStatus); - } - - public boolean started() { - return RollupShardIndexerStatus.STARTED.equals(rollupShardIndexerStatus); - } - - public boolean cancelled() { - return RollupShardIndexerStatus.CANCELLED.equals(rollupShardIndexerStatus); - } - - public boolean failed() { - return RollupShardIndexerStatus.FAILED.equals(rollupShardIndexerStatus); - } - - public static RollupShardPersistentTaskState readFromStream(final StreamInput in) throws IOException { - return new RollupShardPersistentTaskState(RollupShardIndexerStatus.readFromStream(in), in.readBytesRef()); - } - - public static 
RollupShardPersistentTaskState fromXContent(final XContentParser parser) throws IOException { - final RollupShardPersistentTaskState.Builder builder = new RollupShardPersistentTaskState.Builder(); - PARSER.parse(parser, builder, null); - return builder.build(); - } - - public static class Builder { - private RollupShardIndexerStatus status; - private BytesRef tsid; - - public Builder status(final RollupShardIndexerStatus status) { - this.status = status; - return this; - } - - public Builder tsid(final BytesRef tsid) { - this.tsid = tsid; - return this; - } - - public RollupShardPersistentTaskState build() { - return new RollupShardPersistentTaskState(status, tsid); - } - } -} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/DownsampleActionConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/downsample/DownsampleActionConfigTests.java similarity index 94% rename from x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/DownsampleActionConfigTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/downsample/DownsampleActionConfigTests.java index 0f0ed022b3ca3..a32b43214e34f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/DownsampleActionConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/downsample/DownsampleActionConfigTests.java @@ -4,13 +4,14 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. 
*/ -package org.elasticsearch.xpack.core.rollup; +package org.elasticsearch.xpack.core.downsample; import org.elasticsearch.action.downsample.DownsampleConfig; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.AbstractXContentSerializingTestCase; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import java.io.IOException; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupShardPersistentTaskStateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/downsample/DownsampleShardPersistentTaskStateTests.java similarity index 53% rename from x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupShardPersistentTaskStateTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/downsample/DownsampleShardPersistentTaskStateTests.java index a3bcb427621b1..af30647bd6139 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupShardPersistentTaskStateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/downsample/DownsampleShardPersistentTaskStateTests.java @@ -5,7 +5,7 @@ * 2.0. 
*/ -package org.elasticsearch.xpack.core.rollup; +package org.elasticsearch.xpack.core.downsample; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -13,13 +13,11 @@ import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.test.AbstractXContentSerializingTestCase; import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xpack.core.rollup.action.RollupShardIndexerStatus; -import org.elasticsearch.xpack.core.rollup.action.RollupShardPersistentTaskState; import java.io.IOException; import java.util.List; -public class RollupShardPersistentTaskStateTests extends AbstractXContentSerializingTestCase { +public class DownsampleShardPersistentTaskStateTests extends AbstractXContentSerializingTestCase { @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { @@ -27,20 +25,20 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { List.of( new NamedWriteableRegistry.Entry( PersistentTaskState.class, - RollupShardPersistentTaskState.NAME, - RollupShardPersistentTaskState::readFromStream + DownsampleShardPersistentTaskState.NAME, + DownsampleShardPersistentTaskState::readFromStream ) ) ); } @Override - protected Writeable.Reader instanceReader() { - return RollupShardPersistentTaskState::new; + protected Writeable.Reader instanceReader() { + return DownsampleShardPersistentTaskState::new; } @Override - protected RollupShardPersistentTaskState createTestInstance() { + protected DownsampleShardPersistentTaskState createTestInstance() { try { return createRollupShardPersistentTaskState(); } catch (IOException e) { @@ -48,20 +46,20 @@ protected RollupShardPersistentTaskState createTestInstance() { } } - private static RollupShardPersistentTaskState createRollupShardPersistentTaskState() throws IOException { - return new RollupShardPersistentTaskState( - randomFrom(RollupShardIndexerStatus.values()), + private static DownsampleShardPersistentTaskState 
createRollupShardPersistentTaskState() throws IOException { + return new DownsampleShardPersistentTaskState( + randomFrom(DownsampleShardIndexerStatus.values()), new BytesRef(randomAlphaOfLengthBetween(10, 100)) ); } @Override - protected RollupShardPersistentTaskState mutateInstance(RollupShardPersistentTaskState instance) throws IOException { + protected DownsampleShardPersistentTaskState mutateInstance(DownsampleShardPersistentTaskState instance) throws IOException { return null; // TODO implement https://github.com/elastic/elasticsearch/issues/25929 } @Override - protected RollupShardPersistentTaskState doParseInstance(XContentParser parser) throws IOException { - return RollupShardPersistentTaskState.fromXContent(parser); + protected DownsampleShardPersistentTaskState doParseInstance(XContentParser parser) throws IOException { + return DownsampleShardPersistentTaskState.fromXContent(parser); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/downsample/DownsampleShardStatusSerializingTests.java similarity index 73% rename from x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java rename to x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/downsample/DownsampleShardStatusSerializingTests.java index 5a6537ac5b56f..b695137b8a4a0 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/action/RollupShardStatusSerializingTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/downsample/DownsampleShardStatusSerializingTests.java @@ -5,7 +5,7 @@ * 2.0. 
*/ -package org.elasticsearch.xpack.core.rollup.action; +package org.elasticsearch.xpack.core.downsample; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.index.shard.ShardId; @@ -15,20 +15,20 @@ import java.io.IOException; -public class RollupShardStatusSerializingTests extends AbstractXContentSerializingTestCase { +public class DownsampleShardStatusSerializingTests extends AbstractXContentSerializingTestCase { @Override - protected RollupShardStatus doParseInstance(XContentParser parser) throws IOException { - return RollupShardStatus.fromXContent(parser); + protected DownsampleShardStatus doParseInstance(XContentParser parser) throws IOException { + return DownsampleShardStatus.fromXContent(parser); } @Override - protected Reader instanceReader() { - return RollupShardStatus::new; + protected Reader instanceReader() { + return DownsampleShardStatus::new; } @Override - protected RollupShardStatus createTestInstance() { + protected DownsampleShardStatus createTestInstance() { long docsProcessed = randomLongBetween(500_000, 800_000); long indexEndTimeMillis = System.currentTimeMillis() + randomLongBetween(400_000, 500_000); long indexStartTimeMillis = System.currentTimeMillis() - randomLongBetween(400_000, 500_000); @@ -42,8 +42,8 @@ protected RollupShardStatus createTestInstance() { long numReceived = randomNonNegativeLong(); long rollupStart = randomMillisUpToYear9999(); final ShardId shardId = new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(5)); - final RollupShardIndexerStatus rollupShardIndexerStatus = randomFrom(RollupShardIndexerStatus.values()); - return new RollupShardStatus( + final DownsampleShardIndexerStatus downsampleShardIndexerStatus = randomFrom(DownsampleShardIndexerStatus.values()); + return new DownsampleShardStatus( shardId, rollupStart, numReceived, @@ -61,12 +61,12 @@ protected RollupShardStatus createTestInstance() { createTestRollupBulkInfo(), createTestBeforeBulkInfoInstance(), 
createTestAfterBulkInfoInstance(), - rollupShardIndexerStatus + downsampleShardIndexerStatus ); } - private RollupBulkInfo createTestRollupBulkInfo() { - return new RollupBulkInfo( + private DownsampleBulkInfo createTestRollupBulkInfo() { + return new DownsampleBulkInfo( randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), @@ -77,8 +77,8 @@ private RollupBulkInfo createTestRollupBulkInfo() { ); } - private RollupBeforeBulkInfo createTestBeforeBulkInfoInstance() { - return new RollupBeforeBulkInfo( + private DownsampleBeforeBulkInfo createTestBeforeBulkInfoInstance() { + return new DownsampleBeforeBulkInfo( System.currentTimeMillis(), randomNonNegativeLong(), randomNonNegativeLong(), @@ -86,11 +86,11 @@ private RollupBeforeBulkInfo createTestBeforeBulkInfoInstance() { ); } - private RollupAfterBulkInfo createTestAfterBulkInfoInstance() { + private DownsampleAfterBulkInfo createTestAfterBulkInfoInstance() { int randomRestStatusCode = randomBoolean() ? RestStatus.OK.getStatus() : randomBoolean() ? 
RestStatus.INTERNAL_SERVER_ERROR.getStatus() : RestStatus.BAD_REQUEST.getStatus(); - return new RollupAfterBulkInfo( + return new DownsampleAfterBulkInfo( System.currentTimeMillis(), randomLongBetween(1_000, 5_000), randomLongBetween(1_000, 5_000), @@ -101,7 +101,7 @@ private RollupAfterBulkInfo createTestAfterBulkInfoInstance() { } @Override - protected RollupShardStatus mutateInstance(RollupShardStatus instance) { + protected DownsampleShardStatus mutateInstance(DownsampleShardStatus instance) { return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 } } diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java index 9b1827e47375d..12a97521d2866 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java @@ -192,15 +192,19 @@ public boolean validateClusterForming() { })).start(); final String targetIndex = "downsample-" + sourceIndex + "-1h"; - startRollupTaskViaIlm(sourceIndex, targetIndex, disruptionStart, disruptionEnd); + startDownsampleTaskViaIlm(sourceIndex, targetIndex, disruptionStart, disruptionEnd); waitUntil(() -> cluster.client().admin().cluster().preparePendingClusterTasks().get().pendingTasks().isEmpty()); ensureStableCluster(cluster.numDataAndMasterNodes()); assertTargetIndex(cluster, targetIndex, indexedDocs); } } - private void startRollupTaskViaIlm(String sourceIndex, String targetIndex, CountDownLatch disruptionStart, CountDownLatch disruptionEnd) - throws Exception { + private void startDownsampleTaskViaIlm( + String sourceIndex, + String targetIndex, + CountDownLatch disruptionStart, + CountDownLatch disruptionEnd + ) throws 
Exception { disruptionStart.await(); var request = new UpdateSettingsRequest(sourceIndex).settings( Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, POLICY_NAME) diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java index 8133852762252..57137ec429978 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldSerializer.java @@ -34,16 +34,16 @@ public void write(XContentBuilder builder) throws IOException { } builder.startObject(name); - for (AbstractDownsampleFieldProducer rollupFieldProducer : producers) { - assert name.equals(rollupFieldProducer.name()) : "producer has a different name"; - if (rollupFieldProducer.isEmpty() == false) { - if (rollupFieldProducer instanceof MetricFieldProducer metricFieldProducer) { + for (AbstractDownsampleFieldProducer fieldProducer : producers) { + assert name.equals(fieldProducer.name()) : "producer has a different name"; + if (fieldProducer.isEmpty() == false) { + if (fieldProducer instanceof MetricFieldProducer metricFieldProducer) { for (MetricFieldProducer.Metric metric : metricFieldProducer.metrics()) { if (metric.get() != null) { builder.field(metric.name(), metric.get()); } } - } else if (rollupFieldProducer instanceof LabelFieldProducer labelFieldProducer) { + } else if (fieldProducer instanceof LabelFieldProducer labelFieldProducer) { LabelFieldProducer.Label label = labelFieldProducer.label(); if (label.get() != null) { builder.field(label.name(), label.get()); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldValueFetcher.java 
b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldValueFetcher.java index fdfc4fa991e9b..ec98cbeb152f4 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldValueFetcher.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AggregateMetricFieldValueFetcher.java @@ -17,7 +17,7 @@ public class AggregateMetricFieldValueFetcher extends FieldValueFetcher { private final AggregateDoubleMetricFieldType aggMetricFieldType; - private final AbstractDownsampleFieldProducer rollupFieldProducer; + private final AbstractDownsampleFieldProducer fieldProducer; protected AggregateMetricFieldValueFetcher( MappedFieldType fieldType, @@ -26,14 +26,14 @@ protected AggregateMetricFieldValueFetcher( ) { super(fieldType.name(), fieldType, fieldData); this.aggMetricFieldType = aggMetricFieldType; - this.rollupFieldProducer = createRollupFieldProducer(); + this.fieldProducer = createFieldProducer(); } - public AbstractDownsampleFieldProducer rollupFieldProducer() { - return rollupFieldProducer; + public AbstractDownsampleFieldProducer fieldProducer() { + return fieldProducer; } - private AbstractDownsampleFieldProducer createRollupFieldProducer() { + private AbstractDownsampleFieldProducer createFieldProducer() { AggregateDoubleMetricFieldMapper.Metric metric = null; for (var e : aggMetricFieldType.getMetricFields().entrySet()) { NumberFieldMapper.NumberFieldType metricSubField = e.getValue(); @@ -46,7 +46,7 @@ private AbstractDownsampleFieldProducer createRollupFieldProducer() { if (aggMetricFieldType.getMetricType() != null) { // If the field is an aggregate_metric_double field, we should use the correct subfields - // for each aggregation. This is a rollup-of-rollup case + // for each aggregation. 
This is a downsample-of-downsample case MetricFieldProducer.Metric metricOperation = switch (metric) { case max -> new MetricFieldProducer.Max(); case min -> new MetricFieldProducer.Min(); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java index 61b944e865d4c..634d579840aaf 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java @@ -43,8 +43,8 @@ import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xpack.core.downsample.DownsampleAction; import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction; -import org.elasticsearch.xpack.core.rollup.action.RollupShardPersistentTaskState; -import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; +import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; +import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; import java.util.Collection; import java.util.List; @@ -100,7 +100,12 @@ public List> getPersistentTasksExecutor( IndexNameExpressionResolver expressionResolver ) { return List.of( - new RollupShardPersistentTaskExecutor(client, this.indicesService, RollupShardTask.TASK_NAME, DOWSAMPLE_TASK_THREAD_POOL_NAME) + new DownsampleShardPersistentTaskExecutor( + client, + this.indicesService, + DownsampleShardTask.TASK_NAME, + DOWSAMPLE_TASK_THREAD_POOL_NAME + ) ); } @@ -109,8 +114,8 @@ public List getNamedXContent() { return List.of( new NamedXContentRegistry.Entry( PersistentTaskState.class, - new ParseField(RollupShardPersistentTaskState.NAME), - RollupShardPersistentTaskState::fromXContent + new ParseField(DownsampleShardPersistentTaskState.NAME), + DownsampleShardPersistentTaskState::fromXContent ), new NamedXContentRegistry.Entry( PersistentTaskParams.class, @@ 
-125,8 +130,8 @@ public List getNamedWriteables() { return List.of( new NamedWriteableRegistry.Entry( PersistentTaskState.class, - RollupShardPersistentTaskState.NAME, - RollupShardPersistentTaskState::readFromStream + DownsampleShardPersistentTaskState.NAME, + DownsampleShardPersistentTaskState::readFromStream ), new NamedWriteableRegistry.Entry( PersistentTaskParams.class, diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java similarity index 75% rename from x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java rename to x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java index 523e105f9c5bc..460401650aed3 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java @@ -49,12 +49,12 @@ import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.downsample.DownsampleAfterBulkInfo; +import org.elasticsearch.xpack.core.downsample.DownsampleBeforeBulkInfo; import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction; -import org.elasticsearch.xpack.core.rollup.action.RollupAfterBulkInfo; -import org.elasticsearch.xpack.core.rollup.action.RollupBeforeBulkInfo; -import org.elasticsearch.xpack.core.rollup.action.RollupShardIndexerStatus; -import org.elasticsearch.xpack.core.rollup.action.RollupShardPersistentTaskState; -import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; +import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; +import 
org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; +import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; import java.io.Closeable; import java.io.IOException; @@ -71,50 +71,47 @@ /** * An indexer for downsampling that iterates documents collected by {@link TimeSeriesIndexSearcher}, - * computes the rollup buckets and stores the buckets in the downsampled index. + * computes the downsample buckets and stores the buckets in the downsampled index. *

* The documents collected by the {@link TimeSeriesIndexSearcher} are expected to be sorted * by _tsid in ascending order and @timestamp in descending order. */ -class RollupShardIndexer { - - public static final String NAME = "rollup-shard-indexer"; - private static final Logger logger = LogManager.getLogger(RollupShardIndexer.class); - public static final int ROLLUP_BULK_ACTIONS = 10000; - public static final ByteSizeValue ROLLUP_BULK_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB); - public static final ByteSizeValue ROLLUP_MAX_BYTES_IN_FLIGHT = new ByteSizeValue(50, ByteSizeUnit.MB); - private final IndexService indexService; +class DownsampleShardIndexer { + + private static final Logger logger = LogManager.getLogger(DownsampleShardIndexer.class); + public static final int DOWNSAMPLE_BULK_ACTIONS = 10000; + public static final ByteSizeValue DOWNSAMPLE_BULK_SIZE = new ByteSizeValue(1, ByteSizeUnit.MB); + public static final ByteSizeValue DOWNSAMPLE_MAX_BYTES_IN_FLIGHT = new ByteSizeValue(50, ByteSizeUnit.MB); private final IndexShard indexShard; private final Client client; - private final String rollupIndex; + private final String downsampleIndex; private final Engine.Searcher searcher; private final SearchExecutionContext searchExecutionContext; private final DateFieldMapper.DateFieldType timestampField; private final DocValueFormat timestampFormat; private final Rounding.Prepared rounding; private final List fieldValueFetchers; - private final RollupShardTask task; - private final RollupShardPersistentTaskState state; + private final DownsampleShardTask task; + private final DownsampleShardPersistentTaskState state; private volatile boolean abort = false; - ByteSizeValue rollupBulkSize = ROLLUP_BULK_SIZE; - ByteSizeValue rollupMaxBytesInFlight = ROLLUP_MAX_BYTES_IN_FLIGHT; + ByteSizeValue downsampleBulkSize = DOWNSAMPLE_BULK_SIZE; + ByteSizeValue downsampleMaxBytesInFlight = DOWNSAMPLE_MAX_BYTES_IN_FLIGHT; - RollupShardIndexer( - final RollupShardTask task, + 
DownsampleShardIndexer( + final DownsampleShardTask task, final Client client, final IndexService indexService, final ShardId shardId, - final String rollupIndex, + final String downsampleIndex, final DownsampleConfig config, final String[] metrics, final String[] labels, - final RollupShardPersistentTaskState state + final DownsampleShardPersistentTaskState state ) { this.task = task; this.client = client; - this.indexService = indexService; this.indexShard = indexService.getShard(shardId.id()); - this.rollupIndex = rollupIndex; + this.downsampleIndex = downsampleIndex; this.searcher = indexShard.acquireSearcher("downsampling"); this.state = state; Closeable toClose = searcher; @@ -148,8 +145,11 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept } long startTime = client.threadPool().relativeTimeInMillis(); task.setTotalShardDocCount(searcher.getDirectoryReader().numDocs()); - task.setRollupShardIndexerStatus(RollupShardIndexerStatus.STARTED); - task.updatePersistentTaskState(new RollupShardPersistentTaskState(RollupShardIndexerStatus.STARTED, null), ActionListener.noop()); + task.setDownsampleShardIndexerStatus(DownsampleShardIndexerStatus.STARTED); + task.updatePersistentTaskState( + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.STARTED, null), + ActionListener.noop() + ); logger.info("Downsampling task [" + task.getPersistentTaskId() + " on shard " + indexShard.shardId() + " started"); BulkProcessor2 bulkProcessor = createBulkProcessor(); try (searcher; bulkProcessor) { @@ -161,7 +161,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept } logger.info( - "Shard [{}] successfully sent [{}], received source doc [{}], indexed rollup doc [{}], failed [{}], took [{}]", + "Shard [{}] successfully sent [{}], received source doc [{}], indexed downsampled doc [{}], failed [{}], took [{}]", indexShard.shardId(), task.getNumReceived(), task.getNumSent(), @@ -171,9 +171,9 @@ public 
DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept ); if (task.getNumIndexed() != task.getNumSent()) { - task.setRollupShardIndexerStatus(RollupShardIndexerStatus.FAILED); + task.setDownsampleShardIndexerStatus(DownsampleShardIndexerStatus.FAILED); task.updatePersistentTaskState( - new RollupShardPersistentTaskState(RollupShardIndexerStatus.FAILED, null), + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.FAILED, null), ActionListener.noop() ); final String error = "Downsampling task [" @@ -187,7 +187,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept + task.getNumSent() + "]"; logger.info(error); - throw new RollupShardIndexerException(error, false); + throw new DownsampleShardIndexerException(error, false); } if (task.getNumFailed() > 0) { @@ -200,14 +200,17 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept + "]"; logger.info(error); task.updatePersistentTaskState( - new RollupShardPersistentTaskState(RollupShardIndexerStatus.FAILED, null), + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.FAILED, null), ActionListener.noop() ); - throw new RollupShardIndexerException(error, false); + throw new DownsampleShardIndexerException(error, false); } - task.setRollupShardIndexerStatus(RollupShardIndexerStatus.COMPLETED); - task.updatePersistentTaskState(new RollupShardPersistentTaskState(RollupShardIndexerStatus.COMPLETED, null), ActionListener.noop()); + task.setDownsampleShardIndexerStatus(DownsampleShardIndexerStatus.COMPLETED); + task.updatePersistentTaskState( + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.COMPLETED, null), + ActionListener.noop() + ); logger.info("Downsampling task [" + task.getPersistentTaskId() + " on shard " + indexShard.shardId() + " completed"); return new DownsampleIndexerAction.ShardDownsampleResponse(indexShard.shardId(), task.getNumIndexed()); } @@ -222,39 +225,39 @@ private Query 
createQuery() { private void checkCancelled() { if (task.isCancelled()) { logger.warn( - "Shard [{}] rollup abort, sent [{}], indexed [{}], failed[{}]", + "Shard [{}] downsample abort, sent [{}], indexed [{}], failed[{}]", indexShard.shardId(), task.getNumSent(), task.getNumIndexed(), task.getNumFailed() ); - task.setRollupShardIndexerStatus(RollupShardIndexerStatus.CANCELLED); + task.setDownsampleShardIndexerStatus(DownsampleShardIndexerStatus.CANCELLED); task.updatePersistentTaskState( - new RollupShardPersistentTaskState(RollupShardIndexerStatus.CANCELLED, null), + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.CANCELLED, null), ActionListener.noop() ); logger.info("Downsampling task [" + task.getPersistentTaskId() + "] on shard " + indexShard.shardId() + " cancelled"); - throw new RollupShardIndexerException( - new TaskCancelledException(format("Shard %s rollup cancelled", indexShard.shardId())), - format("Shard %s rollup cancelled", indexShard.shardId()), + throw new DownsampleShardIndexerException( + new TaskCancelledException(format("Shard %s downsample cancelled", indexShard.shardId())), + format("Shard %s downsample cancelled", indexShard.shardId()), false ); } if (abort) { logger.warn( - "Shard [{}] rollup abort, sent [{}], indexed [{}], failed[{}]", + "Shard [{}] downsample abort, sent [{}], indexed [{}], failed[{}]", indexShard.shardId(), task.getNumSent(), task.getNumIndexed(), task.getNumFailed() ); - task.setRollupShardIndexerStatus(RollupShardIndexerStatus.FAILED); + task.setDownsampleShardIndexerStatus(DownsampleShardIndexerStatus.FAILED); task.updatePersistentTaskState( - new RollupShardPersistentTaskState(RollupShardIndexerStatus.FAILED, null), + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.FAILED, null), ActionListener.noop() ); - throw new RollupShardIndexerException("Bulk indexing failure", true); + throw new DownsampleShardIndexerException("Bulk indexing failure", true); } } @@ -265,7 +268,7 @@
private BulkProcessor2 createBulkProcessor() { public void beforeBulk(long executionId, BulkRequest request) { task.addNumSent(request.numberOfActions()); task.setBeforeBulkInfo( - new RollupBeforeBulkInfo( + new DownsampleBeforeBulkInfo( client.threadPool().absoluteTimeInMillis(), executionId, request.estimatedSizeInBytes(), @@ -280,7 +283,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon long bulkTookMillis = response.getTook().getMillis(); task.addNumIndexed(request.numberOfActions()); task.setAfterBulkInfo( - new RollupAfterBulkInfo( + new DownsampleAfterBulkInfo( client.threadPool().absoluteTimeInMillis(), executionId, bulkIngestTookMillis, @@ -289,7 +292,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon response.status().getStatus() ) ); - task.updateRollupBulkInfo(bulkIngestTookMillis, bulkTookMillis); + task.updateBulkInfo(bulkIngestTookMillis, bulkTookMillis); if (response.hasFailures()) { List failedItems = Arrays.stream(response.getItems()).filter(BulkItemResponse::isFailed).toList(); @@ -303,7 +306,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon (msg1, msg2) -> Objects.equals(msg1, msg2) ? msg1 : msg1 + "," + msg2 ) ); - logger.error("Shard [{}] failed to populate rollup index. Failures: [{}]", indexShard.shardId(), failures); + logger.error("Shard [{}] failed to populate downsample index. 
Failures: [{}]", indexShard.shardId(), failures); abort = true; } @@ -314,7 +317,7 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure) if (failure != null) { long items = request.numberOfActions(); task.addNumFailed(items); - logger.error(() -> format("Shard [%s] failed to populate rollup index.", indexShard.shardId()), failure); + logger.error(() -> format("Shard [%s] failed to populate downsample index.", indexShard.shardId()), failure); abort = true; } @@ -322,16 +325,16 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure) }; return BulkProcessor2.builder(client::bulk, listener, client.threadPool()) - .setBulkActions(ROLLUP_BULK_ACTIONS) - .setBulkSize(ROLLUP_BULK_SIZE) - .setMaxBytesInFlight(rollupMaxBytesInFlight) + .setBulkActions(DOWNSAMPLE_BULK_ACTIONS) + .setBulkSize(DOWNSAMPLE_BULK_SIZE) + .setMaxBytesInFlight(downsampleMaxBytesInFlight) .setMaxNumberOfRetries(3) .build(); } private class TimeSeriesBucketCollector extends BucketCollector { private final BulkProcessor2 bulkProcessor; - private final RollupBucketBuilder rollupBucketBuilder; + private final DownsampleBucketBuilder downsampleBucketBuilder; private long docsProcessed; private long bucketsCreated; long lastTimestamp = Long.MAX_VALUE; @@ -339,10 +342,10 @@ private class TimeSeriesBucketCollector extends BucketCollector { TimeSeriesBucketCollector(BulkProcessor2 bulkProcessor) { this.bulkProcessor = bulkProcessor; - AbstractDownsampleFieldProducer[] rollupFieldProducers = fieldValueFetchers.stream() - .map(FieldValueFetcher::rollupFieldProducer) + AbstractDownsampleFieldProducer[] fieldProducers = fieldValueFetchers.stream() + .map(FieldValueFetcher::fieldProducer) .toArray(AbstractDownsampleFieldProducer[]::new); - this.rollupBucketBuilder = new RollupBucketBuilder(rollupFieldProducers); + this.downsampleBucketBuilder = new DownsampleBucketBuilder(fieldProducers); } @Override @@ -351,11 +354,11 @@ public LeafBucketCollector 
getLeafCollector(final AggregationExecutionContext ag final DocCountProvider docCountProvider = new DocCountProvider(); docCountProvider.setLeafReaderContext(ctx); - // For each field, return a tuple with the rollup field producer and the field value leaf + // For each field, return a tuple with the downsample field producer and the field value leaf final AbstractDownsampleFieldProducer[] fieldProducers = new AbstractDownsampleFieldProducer[fieldValueFetchers.size()]; final FormattedDocValues[] formattedDocValues = new FormattedDocValues[fieldValueFetchers.size()]; for (int i = 0; i < fieldProducers.length; i++) { - fieldProducers[i] = fieldValueFetchers.get(i).rollupFieldProducer(); + fieldProducers[i] = fieldValueFetchers.get(i).fieldProducer(); formattedDocValues[i] = fieldValueFetchers.get(i).getLeaf(ctx); } @@ -368,7 +371,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException { final int tsidOrd = aggCtx.getTsidOrd(); final long timestamp = timestampField.resolution().roundDownToMillis(aggCtx.getTimestamp()); - boolean tsidChanged = tsidOrd != rollupBucketBuilder.tsidOrd(); + boolean tsidChanged = tsidOrd != downsampleBucketBuilder.tsidOrd(); if (tsidChanged || timestamp < lastHistoTimestamp) { lastHistoTimestamp = Math.max( rounding.round(timestamp), @@ -380,7 +383,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException { if (logger.isTraceEnabled()) { logger.trace( - "Doc: [{}] - _tsid: [{}], @timestamp: [{}}] -> rollup bucket ts: [{}]", + "Doc: [{}] - _tsid: [{}], @timestamp: [{}}] -> downsample bucket ts: [{}]", docId, DocValueFormat.TIME_SERIES_ID.format(tsid), timestampFormat.format(timestamp), @@ -393,7 +396,7 @@ public void collect(int docId, long owningBucketOrd) throws IOException { * - _tsid must be sorted in ascending order * - @timestamp must be sorted in descending order within the same _tsid */ - BytesRef lastTsid = rollupBucketBuilder.tsid(); + BytesRef lastTsid = downsampleBucketBuilder.tsid(); assert 
lastTsid == null || lastTsid.compareTo(tsid) <= 0 : "_tsid is not sorted in ascending order: [" + DocValueFormat.TIME_SERIES_ID.format(lastTsid) @@ -408,29 +411,29 @@ public void collect(int docId, long owningBucketOrd) throws IOException { + "]"; lastTimestamp = timestamp; - if (tsidChanged || rollupBucketBuilder.timestamp() != lastHistoTimestamp) { - // Flush rollup doc if not empty - if (rollupBucketBuilder.isEmpty() == false) { - XContentBuilder doc = rollupBucketBuilder.buildRollupDocument(); + if (tsidChanged || downsampleBucketBuilder.timestamp() != lastHistoTimestamp) { + // Flush downsample doc if not empty + if (downsampleBucketBuilder.isEmpty() == false) { + XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument(); indexBucket(doc); } - // Create new rollup bucket + // Create new downsample bucket if (tsidChanged) { - rollupBucketBuilder.resetTsid(tsid, tsidOrd, lastHistoTimestamp); + downsampleBucketBuilder.resetTsid(tsid, tsidOrd, lastHistoTimestamp); } else { - rollupBucketBuilder.resetTimestamp(lastHistoTimestamp); + downsampleBucketBuilder.resetTimestamp(lastHistoTimestamp); } bucketsCreated++; } final int docCount = docCountProvider.getDocCount(docId); - rollupBucketBuilder.collectDocCount(docCount); + downsampleBucketBuilder.collectDocCount(docCount); // Iterate over all field values and collect the doc_values for this docId for (int i = 0; i < fieldProducers.length; i++) { - AbstractDownsampleFieldProducer rollupFieldProducer = fieldProducers[i]; + AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i]; FormattedDocValues docValues = formattedDocValues[i]; - rollupFieldProducer.collect(docValues, docId); + fieldProducer.collect(docValues, docId); } docsProcessed++; task.setDocsProcessed(docsProcessed); @@ -439,10 +442,10 @@ public void collect(int docId, long owningBucketOrd) throws IOException { } private void indexBucket(XContentBuilder doc) { - IndexRequestBuilder request = client.prepareIndex(rollupIndex); + 
IndexRequestBuilder request = client.prepareIndex(downsampleIndex); request.setSource(doc); if (logger.isTraceEnabled()) { - logger.trace("Indexing rollup doc: [{}]", Strings.toString(doc)); + logger.trace("Indexing downsample doc: [{}]", Strings.toString(doc)); } IndexRequest indexRequest = request.request(); task.setLastIndexingTimestamp(System.currentTimeMillis()); @@ -457,16 +460,21 @@ public void preCollection() { @Override public void postCollection() throws IOException { - // Flush rollup doc if not empty - if (rollupBucketBuilder.isEmpty() == false) { - XContentBuilder doc = rollupBucketBuilder.buildRollupDocument(); + // Flush downsample doc if not empty + if (downsampleBucketBuilder.isEmpty() == false) { + XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument(); indexBucket(doc); } // check cancel after the flush all data checkCancelled(); - logger.info("Shard {} processed [{}] docs, created [{}] rollup buckets", indexShard.shardId(), docsProcessed, bucketsCreated); + logger.info( + "Shard {} processed [{}] docs, created [{}] downsample buckets", + indexShard.shardId(), + docsProcessed, + bucketsCreated + ); } @Override @@ -475,23 +483,23 @@ public ScoreMode scoreMode() { } } - private class RollupBucketBuilder { + private class DownsampleBucketBuilder { private BytesRef tsid; private int tsidOrd = -1; private long timestamp; private int docCount; - private final AbstractDownsampleFieldProducer[] rollupFieldProducers; + private final AbstractDownsampleFieldProducer[] fieldProducers; private final DownsampleFieldSerializer[] groupedProducers; - RollupBucketBuilder(AbstractDownsampleFieldProducer[] rollupFieldProducers) { - this.rollupFieldProducers = rollupFieldProducers; + DownsampleBucketBuilder(AbstractDownsampleFieldProducer[] fieldProducers) { + this.fieldProducers = fieldProducers; /* - * The rollup field producers for aggregate_metric_double all share the same name (this is + * The downsample field producers for 
aggregate_metric_double all share the same name (this is * the name they will be serialized in the target index). We group all field producers by - * name. If grouping yields multiple rollup field producers, we delegate serialization to + * name. If grouping yields multiple downsample field producers, we delegate serialization to * the AggregateMetricFieldSerializer class. */ - groupedProducers = Arrays.stream(rollupFieldProducers) + groupedProducers = Arrays.stream(fieldProducers) .collect(groupingBy(AbstractDownsampleFieldProducer::name)) .entrySet() .stream() @@ -520,7 +528,7 @@ public void resetTsid(BytesRef tsid, int tsidOrd, long timestamp) { public void resetTimestamp(long timestamp) { this.timestamp = timestamp; this.docCount = 0; - for (AbstractDownsampleFieldProducer producer : rollupFieldProducers) { + for (AbstractDownsampleFieldProducer producer : fieldProducers) { producer.reset(); } if (logger.isTraceEnabled()) { @@ -536,7 +544,7 @@ public void collectDocCount(int docCount) { this.docCount += docCount; } - public XContentBuilder buildRollupDocument() throws IOException { + public XContentBuilder buildDownsampleDocument() throws IOException { XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE); builder.startObject(); if (isEmpty()) { diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexerException.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexerException.java similarity index 68% rename from x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexerException.java rename to x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexerException.java index 327c125bbc445..17e746ada8ba9 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexerException.java +++ 
b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexerException.java @@ -9,15 +9,15 @@ import org.elasticsearch.ElasticsearchException; -public class RollupShardIndexerException extends ElasticsearchException { +public class DownsampleShardIndexerException extends ElasticsearchException { private final boolean retriable; - public RollupShardIndexerException(final Throwable cause, final String message, boolean retriable) { + public DownsampleShardIndexerException(final Throwable cause, final String message, boolean retriable) { super(message, cause); this.retriable = retriable; } - public RollupShardIndexerException(final String message, boolean retriable) { + public DownsampleShardIndexerException(final String message, boolean retriable) { super(message); this.retriable = retriable; } diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RollupShardPersistentTaskExecutor.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java similarity index 79% rename from x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RollupShardPersistentTaskExecutor.java rename to x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java index 5e08aa26f2469..8910fca95cdb3 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/RollupShardPersistentTaskExecutor.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java @@ -27,21 +27,21 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.xpack.core.rollup.action.RollupShardIndexerStatus; -import org.elasticsearch.xpack.core.rollup.action.RollupShardPersistentTaskState; -import 
org.elasticsearch.xpack.core.rollup.action.RollupShardTask; +import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; +import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; +import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.Objects; -public class RollupShardPersistentTaskExecutor extends PersistentTasksExecutor { - private static final Logger logger = LogManager.getLogger(RollupShardPersistentTaskExecutor.class); +public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecutor { + private static final Logger logger = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class); private final Client client; private final IndicesService indicesService; - public RollupShardPersistentTaskExecutor( + public DownsampleShardPersistentTaskExecutor( final Client client, final IndicesService indicesService, final String taskName, @@ -59,7 +59,7 @@ protected void nodeOperation( final PersistentTaskState state ) { // NOTE: query the downsampling target index so that we can start the downsampling task from the latest indexed tsid. 
- final SearchRequest searchRequest = new SearchRequest(params.rollupIndex()); + final SearchRequest searchRequest = new SearchRequest(params.downsampleIndex()); searchRequest.source().sort(TimeSeriesIdFieldMapper.NAME, SortOrder.DESC).size(1); searchRequest.preference("_shards:" + params.shardId().id()); client.search( @@ -71,7 +71,11 @@ protected void nodeOperation( ); } - private void fork(final AllocatedPersistentTask task, final DownsampleShardTaskParams params, final SearchHit[] lastRollupTsidHits) { + private void fork( + final AllocatedPersistentTask task, + final DownsampleShardTaskParams params, + final SearchHit[] lastDownsampledTsidHits + ) { client.threadPool().executor(Downsample.DOWSAMPLE_TASK_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -80,37 +84,37 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { - startRollupShardIndexer(task, params, lastRollupTsidHits); + startDownsampleShardIndexer(task, params, lastDownsampledTsidHits); } }); } - private void startRollupShardIndexer( + private void startDownsampleShardIndexer( final AllocatedPersistentTask task, final DownsampleShardTaskParams params, - final SearchHit[] lastRollupTsidHits + final SearchHit[] lastDownsampleTsidHits ) { - final RollupShardPersistentTaskState initialState = lastRollupTsidHits.length == 0 - ? new RollupShardPersistentTaskState(RollupShardIndexerStatus.INITIALIZED, null) - : new RollupShardPersistentTaskState( - RollupShardIndexerStatus.STARTED, - Arrays.stream(lastRollupTsidHits).findFirst().get().field("_tsid").getValue() + final DownsampleShardPersistentTaskState initialState = lastDownsampleTsidHits.length == 0 + ? 
new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) + : new DownsampleShardPersistentTaskState( + DownsampleShardIndexerStatus.STARTED, + Arrays.stream(lastDownsampleTsidHits).findFirst().get().field("_tsid").getValue() ); - final RollupShardIndexer rollupShardIndexer = new RollupShardIndexer( - (RollupShardTask) task, + final DownsampleShardIndexer downsampleShardIndexer = new DownsampleShardIndexer( + (DownsampleShardTask) task, client, getIndexService(indicesService, params), params.shardId(), - params.rollupIndex(), + params.downsampleIndex(), params.downsampleConfig(), params.metrics(), params.labels(), initialState ); try { - rollupShardIndexer.execute(); + downsampleShardIndexer.execute(); task.markAsCompleted(); - } catch (final RollupShardIndexerException e) { + } catch (final DownsampleShardIndexerException e) { if (e.isRetriable()) { logger.error("Downsampling task [" + task.getPersistentTaskId() + " retriable failure [" + e.getMessage() + "]"); task.markAsLocallyAborted(e.getMessage()); @@ -138,12 +142,12 @@ protected AllocatedPersistentTask createTask( final Map headers ) { final DownsampleShardTaskParams params = taskInProgress.getParams(); - return new RollupShardTask( + return new DownsampleShardTask( id, type, action, parentTaskId, - params.rollupIndex(), + params.downsampleIndex(), params.indexStartTimeMillis(), params.indexEndTimeMillis(), params.downsampleConfig(), diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java index 4774dfe038874..a49ef3c306fe8 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardTaskParams.java @@ -17,7 +17,7 @@ import org.elasticsearch.xcontent.ParseField; 
import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; +import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; import java.io.IOException; import java.util.Arrays; @@ -26,7 +26,7 @@ public record DownsampleShardTaskParams( DownsampleConfig downsampleConfig, - String rollupIndex, + String downsampleIndex, long indexStartTimeMillis, long indexEndTimeMillis, ShardId shardId, @@ -34,9 +34,9 @@ public record DownsampleShardTaskParams( String[] labels ) implements PersistentTaskParams { - public static final String NAME = RollupShardTask.TASK_NAME; + public static final String NAME = DownsampleShardTask.TASK_NAME; private static final ParseField DOWNSAMPLE_CONFIG = new ParseField("downsample_config"); - private static final ParseField ROLLUP_INDEX = new ParseField("rollup_index"); + private static final ParseField DOWNSAMPLE_INDEX = new ParseField("rollup_index"); private static final ParseField INDEX_START_TIME_MILLIS = new ParseField("index_start_time_millis"); private static final ParseField INDEX_END_TIME_MILLIS = new ParseField("index_end_time_millis"); private static final ParseField SHARD_ID = new ParseField("shard_id"); @@ -50,7 +50,7 @@ public record DownsampleShardTaskParams( (p, c) -> DownsampleConfig.fromXContent(p), DOWNSAMPLE_CONFIG ); - PARSER.declareString(DownsampleShardTaskParams.Builder::rollupIndex, ROLLUP_INDEX); + PARSER.declareString(DownsampleShardTaskParams.Builder::downsampleIndex, DOWNSAMPLE_INDEX); PARSER.declareLong(DownsampleShardTaskParams.Builder::indexStartTimeMillis, INDEX_START_TIME_MILLIS); PARSER.declareLong(DownsampleShardTaskParams.Builder::indexEndTimeMillis, INDEX_END_TIME_MILLIS); PARSER.declareString(DownsampleShardTaskParams.Builder::shardId, SHARD_ID); @@ -74,7 +74,7 @@ public record DownsampleShardTaskParams( public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException 
{ builder.startObject(); builder.field(DOWNSAMPLE_CONFIG.getPreferredName(), downsampleConfig); - builder.field(ROLLUP_INDEX.getPreferredName(), rollupIndex); + builder.field(DOWNSAMPLE_INDEX.getPreferredName(), downsampleIndex); builder.field(INDEX_START_TIME_MILLIS.getPreferredName(), indexStartTimeMillis); builder.field(INDEX_END_TIME_MILLIS.getPreferredName(), indexEndTimeMillis); builder.field(SHARD_ID.getPreferredName(), shardId); @@ -96,7 +96,7 @@ public TransportVersion getMinimalSupportedVersion() { @Override public void writeTo(StreamOutput out) throws IOException { downsampleConfig.writeTo(out); - out.writeString(rollupIndex); + out.writeString(downsampleIndex); out.writeVLong(indexStartTimeMillis); out.writeVLong(indexEndTimeMillis); shardId.writeTo(out); @@ -130,7 +130,7 @@ public boolean equals(Object o) { return indexStartTimeMillis == that.indexStartTimeMillis && indexEndTimeMillis == that.indexEndTimeMillis && Objects.equals(downsampleConfig, that.downsampleConfig) - && Objects.equals(rollupIndex, that.rollupIndex) + && Objects.equals(downsampleIndex, that.downsampleIndex) && Objects.equals(shardId.id(), that.shardId.id()) && Objects.equals(shardId.getIndexName(), that.shardId.getIndexName()) && Arrays.equals(metrics, that.metrics) @@ -141,7 +141,7 @@ public boolean equals(Object o) { public int hashCode() { int result = Objects.hash( downsampleConfig, - rollupIndex, + downsampleIndex, indexStartTimeMillis, indexEndTimeMillis, shardId.id(), @@ -154,7 +154,7 @@ public int hashCode() { public static class Builder { DownsampleConfig downsampleConfig; - String rollupIndex; + String downsampleIndex; long indexStartTimeMillis; long indexEndTimeMillis; ShardId shardId; @@ -166,8 +166,8 @@ public Builder downsampleConfig(final DownsampleConfig downsampleConfig) { return this; } - public Builder rollupIndex(final String rollupIndex) { - this.rollupIndex = rollupIndex; + public Builder downsampleIndex(final String downsampleIndex) { + this.downsampleIndex = 
downsampleIndex; return this; } @@ -199,7 +199,7 @@ public Builder labels(final List labels) { public DownsampleShardTaskParams build() { return new DownsampleShardTaskParams( downsampleConfig, - rollupIndex, + downsampleIndex, indexStartTimeMillis, indexEndTimeMillis, shardId, diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java index 54a73e564ff87..2788932a228a8 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/FieldValueFetcher.java @@ -31,13 +31,13 @@ class FieldValueFetcher { protected final String name; protected final MappedFieldType fieldType; protected final IndexFieldData fieldData; - protected final AbstractDownsampleFieldProducer rollupFieldProducer; + protected final AbstractDownsampleFieldProducer fieldProducer; protected FieldValueFetcher(String name, MappedFieldType fieldType, IndexFieldData fieldData) { this.name = name; this.fieldType = fieldType; this.fieldData = fieldData; - this.rollupFieldProducer = createRollupFieldProducer(); + this.fieldProducer = createFieldProducer(); } public String name() { @@ -49,11 +49,11 @@ public FormattedDocValues getLeaf(LeafReaderContext context) { return fieldData.load(context).getFormattedValues(format); } - public AbstractDownsampleFieldProducer rollupFieldProducer() { - return rollupFieldProducer; + public AbstractDownsampleFieldProducer fieldProducer() { + return fieldProducer; } - private AbstractDownsampleFieldProducer createRollupFieldProducer() { + private AbstractDownsampleFieldProducer createFieldProducer() { if (fieldType.getMetricType() != null) { return switch (fieldType.getMetricType()) { case GAUGE -> new MetricFieldProducer.GaugeMetricFieldProducer(name()); @@ -81,7 +81,7 @@ static List
create(SearchExecutionContext context, String[] f if (fieldType instanceof AggregateDoubleMetricFieldMapper.AggregateDoubleMetricFieldType aggMetricFieldType) { // If the field is an aggregate_metric_double field, we should load all its subfields - // This is a rollup-of-rollup case + // This is a downsample-of-downsample case for (NumberFieldMapper.NumberFieldType metricSubField : aggMetricFieldType.getMetricFields().values()) { if (context.fieldExistsInIndex(metricSubField.name())) { IndexFieldData fieldData = context.getForField(metricSubField, MappedFieldType.FielddataOperation.SEARCH); diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index b938e728df645..c8a3237f165a6 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -76,9 +76,9 @@ import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.downsample.DownsampleAction; +import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; +import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; -import org.elasticsearch.xpack.core.rollup.action.RollupShardPersistentTaskState; -import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField; import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl; @@ -99,7 +99,7 @@ /** * The master downsample action that coordinates * - creating the downsample index - * - instantiating {@link RollupShardIndexer}s to index 
downsample documents + * - instantiating {@link DownsampleShardIndexer}s to index downsample documents * - cleaning up state */ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAction { @@ -170,7 +170,7 @@ public TransportDownsampleAction( this.metadataCreateIndexService = metadataCreateIndexService; this.indexScopedSettings = indexScopedSettings; this.threadContext = threadPool.getThreadContext(); - this.taskQueue = clusterService.createTaskQueue("rollup", Priority.URGENT, STATE_UPDATE_TASK_EXECUTOR); + this.taskQueue = clusterService.createTaskQueue("downsample", Priority.URGENT, STATE_UPDATE_TASK_EXECUTOR); this.persistentTasksService = persistentTasksService; } @@ -192,7 +192,7 @@ protected void masterOperation( if (hasDocumentLevelPermissions || hasFieldLevelSecurity) { listener.onFailure( new ElasticsearchException( - "Rollup forbidden for index [" + sourceIndexName + "] with document level or field level security settings." + "Downsample forbidden for index [" + sourceIndexName + "] with document level or field level security settings." 
) ); return; @@ -210,7 +210,7 @@ protected void masterOperation( if (IndexSettings.MODE.get(sourceIndexMetadata.getSettings()) != IndexMode.TIME_SERIES) { listener.onFailure( new ElasticsearchException( - "Rollup requires setting [" + "Downsample requires setting [" + IndexSettings.MODE.getKey() + "=" + IndexMode.TIME_SERIES @@ -226,7 +226,7 @@ protected void masterOperation( if (state.blocks().indexBlocked(ClusterBlockLevel.WRITE, sourceIndexName) == false) { listener.onFailure( new ElasticsearchException( - "Rollup requires setting [" + IndexMetadata.SETTING_BLOCKS_WRITE + " = true] for index [" + sourceIndexName + "]" + "Downsample requires setting [" + IndexMetadata.SETTING_BLOCKS_WRITE + " = true] for index [" + sourceIndexName + "]" ) ); return; @@ -274,7 +274,7 @@ protected void masterOperation( .filter(entry -> sourceIndexName.equals(entry.getKey())) .findFirst() .map(mappingMetadata -> mappingMetadata.getValue().sourceAsMap()) - .orElseThrow(() -> new IllegalArgumentException("No mapping found for rollup source index [" + sourceIndexName + "]")); + .orElseThrow(() -> new IllegalArgumentException("No mapping found for downsample source index [" + sourceIndexName + "]")); // 2. 
Extract downsample config from index mappings final MapperService mapperService = indicesService.createIndexMapperServiceForValidation(sourceIndexMetadata); @@ -330,7 +330,7 @@ protected void masterOperation( labelFields ); } else { - listener.onFailure(new ElasticsearchException("Failed to create rollup index [" + downsampleIndexName + "]")); + listener.onFailure(new ElasticsearchException("Failed to create downsample index [" + downsampleIndexName + "]")); } }, e -> { if (e instanceof ResourceAlreadyExistsException) { @@ -385,7 +385,7 @@ private void performShardDownsampling( // NOTE: don't need to wait if the persistent task completed and was removed return true; } - RollupShardPersistentTaskState runningPersistentTaskState = (RollupShardPersistentTaskState) runningTask.getState(); + DownsampleShardPersistentTaskState runningPersistentTaskState = (DownsampleShardPersistentTaskState) runningTask.getState(); return runningPersistentTaskState != null && runningPersistentTaskState.done(); }; var taskListener = new PersistentTasksService.WaitForPersistentTaskListener<>() { @@ -407,7 +407,7 @@ public void onFailure(Exception e) { }; persistentTasksService.sendStartRequest( persistentTaskId, - RollupShardTask.TASK_NAME, + DownsampleShardTask.TASK_NAME, params, ActionListener.wrap( startedTask -> persistentTasksService.waitForPersistentTaskCondition( diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java index d704cf942b695..3fa32c9f86644 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleIndexerAction.java @@ -29,16 +29,16 @@ import org.elasticsearch.transport.TransportService; import 
org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction; -import org.elasticsearch.xpack.core.rollup.action.RollupShardIndexerStatus; -import org.elasticsearch.xpack.core.rollup.action.RollupShardPersistentTaskState; -import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; +import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; +import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; +import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.atomic.AtomicReferenceArray; /** - * A {@link TransportBroadcastAction} that rollups all the shards of a source index into a new rollup index. + * A {@link TransportBroadcastAction} that downsamples all the shards of a source index into a new downsample index. * * TODO: Enforce that we don't retry on another replica if we throw an error after sending some buckets. 
*/ @@ -135,16 +135,16 @@ protected DownsampleIndexerAction.ShardDownsampleResponse shardOperation( Task task ) throws IOException { IndexService indexService = indicesService.indexService(request.shardId().getIndex()); - RollupShardIndexer indexer = new RollupShardIndexer( - (RollupShardTask) task, + DownsampleShardIndexer indexer = new DownsampleShardIndexer( + (DownsampleShardTask) task, client, indexService, request.shardId(), - request.getRollupIndex(), + request.getDownsampleIndex(), request.getRollupConfig(), request.getMetricFields(), request.getLabelFields(), - new RollupShardPersistentTaskState(RollupShardIndexerStatus.INITIALIZED, null) + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) ); return indexer.execute(); } diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java index 8d8956735bc3f..95568e27932a0 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java @@ -84,12 +84,12 @@ import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.downsample.DownsampleAction; import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction; +import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus; +import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState; +import org.elasticsearch.xpack.core.downsample.DownsampleShardTask; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.ilm.RolloverAction; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; -import org.elasticsearch.xpack.core.rollup.action.RollupShardIndexerStatus; 
-import org.elasticsearch.xpack.core.rollup.action.RollupShardPersistentTaskState; -import org.elasticsearch.xpack.core.rollup.action.RollupShardTask; import org.elasticsearch.xpack.ilm.IndexLifecycle; import org.junit.Before; @@ -148,7 +148,7 @@ public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase { private static final long MAX_NUM_BUCKETS = 10; public static final TimeValue TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); - private String sourceIndex, rollupIndex; + private String sourceIndex, downsampleIndex; private long startTime; private int docCount, numOfShards, numOfReplicas; private List dimensionValues; @@ -167,7 +167,7 @@ protected Collection> getPlugins() { @Before public void setup() throws IOException { sourceIndex = randomAlphaOfLength(14).toLowerCase(Locale.ROOT); - rollupIndex = "rollup-" + sourceIndex; + downsampleIndex = "downsample-" + sourceIndex; startTime = randomLongBetween(946769284000L, 1607470084000L); // random date between 2000-2020 docCount = randomIntBetween(1000, 9000); numOfShards = randomIntBetween(1, 1); @@ -240,7 +240,7 @@ public void setup() throws IOException { assertAcked(indicesAdmin().prepareCreate(sourceIndex).setSettings(settings.build()).setMapping(mapping).get()); } - public void testRollupIndex() throws IOException { + public void testDownsampleIndex() throws IOException { DownsampleConfig config = new DownsampleConfig(randomInterval()); SourceSupplier sourceSupplier = () -> { String ts = randomDateForInterval(config.getInterval()); @@ -295,11 +295,11 @@ public void testRollupIndex() throws IOException { }; bulkIndex(sourceSupplier); prepareSourceIndex(sourceIndex, true); - rollup(sourceIndex, rollupIndex, config); - assertRollupIndex(sourceIndex, rollupIndex, config); + downsample(sourceIndex, downsampleIndex, config); + assertDownsampleIndex(sourceIndex, downsampleIndex, config); } - public void testRollupOfRollups() throws IOException { + public void testDownsampleOfDownsample() throws IOException 
{ int intervalMinutes = randomIntBetween(10, 120); DownsampleConfig config = new DownsampleConfig(DateHistogramInterval.minutes(intervalMinutes)); SourceSupplier sourceSupplier = () -> { @@ -332,14 +332,14 @@ public void testRollupOfRollups() throws IOException { // Downsample the source index prepareSourceIndex(sourceIndex, true); - rollup(sourceIndex, rollupIndex, config); - assertRollupIndex(sourceIndex, rollupIndex, config); + downsample(sourceIndex, downsampleIndex, config); + assertDownsampleIndex(sourceIndex, downsampleIndex, config); - // Downsample the rollup index. The downsampling interval is a multiple of the previous downsampling interval. - String rollupIndex2 = rollupIndex + "-2"; + // Downsample the downsample index. The downsampling interval is a multiple of the previous downsampling interval. + String downsampleIndex2 = downsampleIndex + "-2"; DownsampleConfig config2 = new DownsampleConfig(DateHistogramInterval.minutes(intervalMinutes * randomIntBetween(2, 50))); - rollup(rollupIndex, rollupIndex2, config2); - assertRollupIndex(sourceIndex, rollupIndex2, config2); + downsample(downsampleIndex, downsampleIndex2, config2); + assertDownsampleIndex(sourceIndex, downsampleIndex2, config2); } private Date randomDate() { @@ -383,15 +383,15 @@ public void testCopyIndexSettings() throws IOException { }; bulkIndex(sourceSupplier); prepareSourceIndex(sourceIndex, true); - rollup(sourceIndex, rollupIndex, config); + downsample(sourceIndex, downsampleIndex, config); - GetIndexResponse indexSettingsResp = indicesAdmin().prepareGetIndex().addIndices(sourceIndex, rollupIndex).get(); - assertRollupIndexSettings(sourceIndex, rollupIndex, indexSettingsResp); + GetIndexResponse indexSettingsResp = indicesAdmin().prepareGetIndex().addIndices(sourceIndex, downsampleIndex).get(); + assertDownsampleIndexSettings(sourceIndex, downsampleIndex, indexSettingsResp); for (String key : settings.keySet()) { if (LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey().equals(key)) { - 
assertNull(indexSettingsResp.getSetting(rollupIndex, key)); + assertNull(indexSettingsResp.getSetting(downsampleIndex, key)); } else { - assertEquals(settings.get(key), indexSettingsResp.getSetting(rollupIndex, key)); + assertEquals(settings.get(key), indexSettingsResp.getSetting(downsampleIndex, key)); } } } @@ -400,29 +400,29 @@ public void testNullSourceIndexName() { DownsampleConfig config = new DownsampleConfig(randomInterval()); ActionRequestValidationException exception = expectThrows( ActionRequestValidationException.class, - () -> rollup(null, rollupIndex, config) + () -> downsample(null, downsampleIndex, config) ); assertThat(exception.getMessage(), containsString("source index is missing")); } - public void testNullRollupIndexName() { + public void testNullDownsampleIndexName() { DownsampleConfig config = new DownsampleConfig(randomInterval()); ActionRequestValidationException exception = expectThrows( ActionRequestValidationException.class, - () -> rollup(sourceIndex, null, config) + () -> downsample(sourceIndex, null, config) ); assertThat(exception.getMessage(), containsString("target index name is missing")); } - public void testNullRollupConfig() { + public void testNullDownsampleConfig() { ActionRequestValidationException exception = expectThrows( ActionRequestValidationException.class, - () -> rollup(sourceIndex, rollupIndex, null) + () -> downsample(sourceIndex, downsampleIndex, null) ); assertThat(exception.getMessage(), containsString("downsample configuration is missing")); } - public void testRollupSparseMetrics() throws IOException { + public void testDownsampleSparseMetrics() throws IOException { DownsampleConfig config = new DownsampleConfig(randomInterval()); SourceSupplier sourceSupplier = () -> { XContentBuilder builder = XContentFactory.jsonBuilder() @@ -439,32 +439,32 @@ public void testRollupSparseMetrics() throws IOException { }; bulkIndex(sourceSupplier); prepareSourceIndex(sourceIndex, true); - rollup(sourceIndex, rollupIndex, 
config); - assertRollupIndex(sourceIndex, rollupIndex, config); + downsample(sourceIndex, downsampleIndex, config); + assertDownsampleIndex(sourceIndex, downsampleIndex, config); } - public void testCannotRollupToExistingIndex() throws Exception { + public void testCannotDownsampleToExistingIndex() throws Exception { DownsampleConfig config = new DownsampleConfig(randomInterval()); prepareSourceIndex(sourceIndex, true); - // Create an empty index with the same name as the rollup index - assertAcked(indicesAdmin().prepareCreate(rollupIndex).setSettings(indexSettings(1, 0)).get()); + // Create an empty index with the same name as the downsample index + assertAcked(indicesAdmin().prepareCreate(downsampleIndex).setSettings(indexSettings(1, 0)).get()); ResourceAlreadyExistsException exception = expectThrows( ResourceAlreadyExistsException.class, - () -> rollup(sourceIndex, rollupIndex, config) + () -> downsample(sourceIndex, downsampleIndex, config) ); - assertThat(exception.getMessage(), containsString(rollupIndex)); + assertThat(exception.getMessage(), containsString(downsampleIndex)); } - public void testRollupEmptyIndex() throws IOException { + public void testDownsampleEmptyIndex() throws IOException { DownsampleConfig config = new DownsampleConfig(randomInterval()); // Source index has been created in the setup() method prepareSourceIndex(sourceIndex, true); - rollup(sourceIndex, rollupIndex, config); - assertRollupIndex(sourceIndex, rollupIndex, config); + downsample(sourceIndex, downsampleIndex, config); + assertDownsampleIndex(sourceIndex, downsampleIndex, config); } - public void testRollupIndexWithNoMetrics() throws IOException { + public void testDownsampleIndexWithNoMetrics() throws IOException { // Create a source index that contains no metric fields in its mapping String sourceIndex = "no-metrics-idx-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); indicesAdmin().prepareCreate(sourceIndex) @@ -486,25 +486,28 @@ public void 
testRollupIndexWithNoMetrics() throws IOException { DownsampleConfig config = new DownsampleConfig(randomInterval()); prepareSourceIndex(sourceIndex, true); - rollup(sourceIndex, rollupIndex, config); - assertRollupIndex(sourceIndex, rollupIndex, config); + downsample(sourceIndex, downsampleIndex, config); + assertDownsampleIndex(sourceIndex, downsampleIndex, config); } - public void testCannotRollupWriteableIndex() { + public void testCannotDownsampleWriteableIndex() { DownsampleConfig config = new DownsampleConfig(randomInterval()); // Source index has been created in the setup() method and is empty and still writable - Exception exception = expectThrows(ElasticsearchException.class, () -> rollup(sourceIndex, rollupIndex, config)); - assertThat(exception.getMessage(), containsString("Rollup requires setting [index.blocks.write = true] for index")); + Exception exception = expectThrows(ElasticsearchException.class, () -> downsample(sourceIndex, downsampleIndex, config)); + assertThat(exception.getMessage(), containsString("Downsample requires setting [index.blocks.write = true] for index")); } - public void testCannotRollupMissingIndex() { + public void testCannotDownsampleMissingIndex() { DownsampleConfig config = new DownsampleConfig(randomInterval()); - IndexNotFoundException exception = expectThrows(IndexNotFoundException.class, () -> rollup("missing-index", rollupIndex, config)); + IndexNotFoundException exception = expectThrows( + IndexNotFoundException.class, + () -> downsample("missing-index", downsampleIndex, config) + ); assertEquals("missing-index", exception.getIndex().getName()); assertThat(exception.getMessage(), containsString("no such index [missing-index]")); } - public void testCannotRollupWhileOtherRollupInProgress() throws Exception { + public void testCannotDownsampleWhileOtherDownsampleInProgress() throws Exception { DownsampleConfig config = new DownsampleConfig(randomInterval()); SourceSupplier sourceSupplier = () -> 
XContentFactory.jsonBuilder() .startObject() @@ -514,7 +517,7 @@ public void testCannotRollupWhileOtherRollupInProgress() throws Exception { .endObject(); bulkIndex(sourceSupplier); prepareSourceIndex(sourceIndex, true); - var rollupListener = new ActionListener() { + var downsampleListener = new ActionListener() { boolean success; @Override @@ -522,33 +525,33 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) { if (acknowledgedResponse.isAcknowledged()) { success = true; } else { - fail("Failed to receive rollup acknowledgement"); + fail("Failed to receive downsample acknowledgement"); } } @Override public void onFailure(Exception e) { - fail("Rollup failed: " + e.getMessage()); + fail("Downsample failed: " + e.getMessage()); } }; client().execute( DownsampleAction.INSTANCE, - new DownsampleAction.Request(sourceIndex, rollupIndex, TIMEOUT, config), - rollupListener + new DownsampleAction.Request(sourceIndex, downsampleIndex, TIMEOUT, config), + downsampleListener ); assertBusy(() -> { try { - assertEquals(indicesAdmin().prepareGetIndex().addIndices(rollupIndex).get().getIndices().length, 1); + assertEquals(indicesAdmin().prepareGetIndex().addIndices(downsampleIndex).get().getIndices().length, 1); } catch (IndexNotFoundException e) { - fail("rollup index has not been created"); + fail("downsample index has not been created"); } }); - rollup(sourceIndex, rollupIndex, config); - // We must wait until the in-progress rollup ends, otherwise data will not be cleaned up - assertBusy(() -> assertTrue("In progress rollup did not complete", rollupListener.success), 60, TimeUnit.SECONDS); + downsample(sourceIndex, downsampleIndex, config); + // We must wait until the in-progress downsample ends, otherwise data will not be cleaned up + assertBusy(() -> assertTrue("In progress downsample did not complete", downsampleListener.success), 60, TimeUnit.SECONDS); } - public void testRollupDatastream() throws Exception { + public void testDownsampleDatastream() 
throws Exception { DownsampleConfig config = new DownsampleConfig(randomInterval()); String dataStreamName = createDataStream(); @@ -567,21 +570,21 @@ public void testRollupDatastream() throws Exception { String sourceIndex = rollover(dataStreamName).getOldIndex(); prepareSourceIndex(sourceIndex, true); - String rollupIndex = "rollup-" + sourceIndex; - rollup(sourceIndex, rollupIndex, config); - assertRollupIndex(sourceIndex, rollupIndex, config); + String downsampleIndex = "downsample-" + sourceIndex; + downsample(sourceIndex, downsampleIndex, config); + assertDownsampleIndex(sourceIndex, downsampleIndex, config); var r = client().execute(GetDataStreamAction.INSTANCE, new GetDataStreamAction.Request(new String[] { dataStreamName })).get(); assertEquals(1, r.getDataStreams().size()); List indices = r.getDataStreams().get(0).getDataStream().getIndices(); - // Assert that the rollup index has not been added to the data stream - assertTrue(indices.stream().filter(i -> i.getName().equals(rollupIndex)).toList().isEmpty()); + // Assert that the downsample index has not been added to the data stream + assertTrue(indices.stream().filter(i -> i.getName().equals(downsampleIndex)).toList().isEmpty()); // Assert that the source index is still a member of the data stream assertFalse(indices.stream().filter(i -> i.getName().equals(sourceIndex)).toList().isEmpty()); } - public void testCancelRollupIndexer() throws IOException { - // create rollup config and index documents into source index + public void testCancelDownsampleIndexer() throws IOException { + // create downsample config and index documents into source index DownsampleConfig config = new DownsampleConfig(randomInterval()); SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() .startObject() @@ -598,12 +601,12 @@ public void testCancelRollupIndexer() throws IOException { int shardNum = randomIntBetween(0, numOfShards - 1); IndexShard shard = indexService.getShard(shardNum); - RollupShardTask task = new 
RollupShardTask( + DownsampleShardTask task = new DownsampleShardTask( randomLong(), "rollup", "action", TaskId.EMPTY_TASK_ID, - rollupIndex, + downsampleIndex, indexService.getIndexSettings().getTimestampBounds().startTime(), indexService.getIndexSettings().getTimestampBounds().endTime(), config, @@ -614,24 +617,24 @@ public void testCancelRollupIndexer() throws IOException { TaskCancelHelper.cancel(task, "test cancel"); // re-use source index as temp index for test - RollupShardIndexer indexer = new RollupShardIndexer( + DownsampleShardIndexer indexer = new DownsampleShardIndexer( task, client(), indexService, shard.shardId(), - rollupIndex, + downsampleIndex, config, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, - new RollupShardPersistentTaskState(RollupShardIndexerStatus.INITIALIZED, null) + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) ); - RollupShardIndexerException exception = expectThrows(RollupShardIndexerException.class, () -> indexer.execute()); - assertThat(exception.getCause().getMessage(), equalTo("Shard [" + sourceIndex + "][" + shardNum + "] rollup cancelled")); + DownsampleShardIndexerException exception = expectThrows(DownsampleShardIndexerException.class, () -> indexer.execute()); + assertThat(exception.getCause().getMessage(), equalTo("Shard [" + sourceIndex + "][" + shardNum + "] downsample cancelled")); } - public void testRollupBulkFailed() throws IOException { - // create rollup config and index documents into source index + public void testDownsampleBulkFailed() throws IOException { + // create downsample config and index documents into source index DownsampleConfig config = new DownsampleConfig(randomInterval()); SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() .startObject() @@ -647,12 +650,12 @@ public void testRollupBulkFailed() throws IOException { IndexService indexService = indexServices.indexServiceSafe(srcIndex); int shardNum = randomIntBetween(0, 
numOfShards - 1); IndexShard shard = indexService.getShard(shardNum); - RollupShardTask task = new RollupShardTask( + DownsampleShardTask task = new DownsampleShardTask( randomLong(), "rollup", "action", TaskId.EMPTY_TASK_ID, - rollupIndex, + downsampleIndex, indexService.getIndexSettings().getTimestampBounds().startTime(), indexService.getIndexSettings().getTimestampBounds().endTime(), config, @@ -662,22 +665,22 @@ public void testRollupBulkFailed() throws IOException { task.testInit(mock(PersistentTasksService.class), mock(TaskManager.class), randomAlphaOfLength(5), randomIntBetween(1, 5)); // re-use source index as temp index for test - RollupShardIndexer indexer = new RollupShardIndexer( + DownsampleShardIndexer indexer = new DownsampleShardIndexer( task, client(), indexService, shard.shardId(), - rollupIndex, + downsampleIndex, config, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, - new RollupShardPersistentTaskState(RollupShardIndexerStatus.INITIALIZED, null) + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) ); - // block rollup index + // block downsample index assertAcked( - indicesAdmin().preparePutTemplate(rollupIndex) - .setPatterns(List.of(rollupIndex)) + indicesAdmin().preparePutTemplate(downsampleIndex) + .setPatterns(List.of(downsampleIndex)) .setSettings(Settings.builder().put("index.blocks.write", "true").build()) .get() ); @@ -698,7 +701,7 @@ public void testRollupBulkFailed() throws IOException { } public void testTooManyBytesInFlight() throws IOException { - // create rollup config and index documents into source index + // create downsample config and index documents into source index DownsampleConfig config = new DownsampleConfig(randomInterval()); SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() .startObject() @@ -714,12 +717,12 @@ public void testTooManyBytesInFlight() throws IOException { IndexService indexService = indexServices.indexServiceSafe(srcIndex); int 
shardNum = randomIntBetween(0, numOfShards - 1); IndexShard shard = indexService.getShard(shardNum); - RollupShardTask task = new RollupShardTask( + DownsampleShardTask task = new DownsampleShardTask( randomLong(), "rollup", "action", TaskId.EMPTY_TASK_ID, - rollupIndex, + downsampleIndex, indexService.getIndexSettings().getTimestampBounds().startTime(), indexService.getIndexSettings().getTimestampBounds().endTime(), config, @@ -729,27 +732,27 @@ public void testTooManyBytesInFlight() throws IOException { task.testInit(mock(PersistentTasksService.class), mock(TaskManager.class), randomAlphaOfLength(5), randomIntBetween(1, 5)); // re-use source index as temp index for test - RollupShardIndexer indexer = new RollupShardIndexer( + DownsampleShardIndexer indexer = new DownsampleShardIndexer( task, client(), indexService, shard.shardId(), - rollupIndex, + downsampleIndex, config, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, - new RollupShardPersistentTaskState(RollupShardIndexerStatus.INITIALIZED, null) + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) ); /* * Here we set the batch size and the total bytes in flight size to tiny numbers so that we are guaranteed to trigger the bulk - * processor to reject some calls to add(), so that we can make sure RollupShardIndexer keeps trying until success. + * processor to reject some calls to add(), so that we can make sure DownsampleShardIndexer keeps trying until success. 
*/ - indexer.rollupMaxBytesInFlight = ByteSizeValue.ofBytes(1024); - indexer.rollupBulkSize = ByteSizeValue.ofBytes(512); + indexer.downsampleMaxBytesInFlight = ByteSizeValue.ofBytes(1024); + indexer.downsampleBulkSize = ByteSizeValue.ofBytes(512); indexer.execute(); } - public void testRollupStats() throws IOException { + public void testDownsampleStats() throws IOException { final PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); final DownsampleConfig config = new DownsampleConfig(randomInterval()); final SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() @@ -766,12 +769,12 @@ public void testRollupStats() throws IOException { final IndexService indexService = indexServices.indexServiceSafe(resolvedSourceIndex); for (int shardNum = 0; shardNum < numOfShards; shardNum++) { final IndexShard shard = indexService.getShard(shardNum); - final RollupShardTask task = new RollupShardTask( + final DownsampleShardTask task = new DownsampleShardTask( randomLong(), "rollup", "action", TaskId.EMPTY_TASK_ID, - rollupIndex, + downsampleIndex, indexService.getIndexSettings().getTimestampBounds().startTime(), indexService.getIndexSettings().getTimestampBounds().endTime(), config, @@ -780,32 +783,32 @@ public void testRollupStats() throws IOException { ); task.testInit(persistentTasksService, mock(TaskManager.class), randomAlphaOfLength(5), randomIntBetween(1, 5)); - final RollupShardIndexer indexer = new RollupShardIndexer( + final DownsampleShardIndexer indexer = new DownsampleShardIndexer( task, client(), indexService, shard.shardId(), - rollupIndex, + downsampleIndex, config, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, - new RollupShardPersistentTaskState(RollupShardIndexerStatus.INITIALIZED, null) + new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.INITIALIZED, null) ); assertEquals(0.0F, task.getDocsProcessedPercentage(), 0.001); - assertEquals(0L, 
task.getRollupBulkInfo().totalBulkCount()); - assertEquals(0L, task.getRollupBulkInfo().bulkTookSumMillis()); - assertEquals(0L, task.getRollupBulkInfo().bulkIngestSumMillis()); - assertEquals(RollupShardIndexerStatus.INITIALIZED, task.getRollupShardIndexerStatus()); + assertEquals(0L, task.getDownsampleBulkInfo().totalBulkCount()); + assertEquals(0L, task.getDownsampleBulkInfo().bulkTookSumMillis()); + assertEquals(0L, task.getDownsampleBulkInfo().bulkIngestSumMillis()); + assertEquals(DownsampleShardIndexerStatus.INITIALIZED, task.getDownsampleShardIndexerStatus()); final DownsampleIndexerAction.ShardDownsampleResponse executeResponse = indexer.execute(); - assertRollupIndexer(indexService, shardNum, task, executeResponse, task.getTotalShardDocCount()); + assertDownsampleIndexer(indexService, shardNum, task, executeResponse, task.getTotalShardDocCount()); } } - public void testResumeRollup() throws IOException { - // create rollup config and index documents into source index + public void testResumeDownsample() throws IOException { + // create downsample config and index documents into source index DownsampleConfig config = new DownsampleConfig(randomInterval()); SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() .startObject() @@ -822,12 +825,12 @@ public void testResumeRollup() throws IOException { int shardNum = randomIntBetween(0, numOfShards - 1); IndexShard shard = indexService.getShard(shardNum); - RollupShardTask task = new RollupShardTask( + DownsampleShardTask task = new DownsampleShardTask( randomLong(), "rollup", "action", TaskId.EMPTY_TASK_ID, - rollupIndex, + downsampleIndex, indexService.getIndexSettings().getTimestampBounds().startTime(), indexService.getIndexSettings().getTimestampBounds().endTime(), config, @@ -836,17 +839,17 @@ public void testResumeRollup() throws IOException { ); task.testInit(mock(PersistentTasksService.class), mock(TaskManager.class), randomAlphaOfLength(5), randomIntBetween(1, 5)); - RollupShardIndexer 
indexer = new RollupShardIndexer( + DownsampleShardIndexer indexer = new DownsampleShardIndexer( task, client(), indexService, shard.shardId(), - rollupIndex, + downsampleIndex, config, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, - new RollupShardPersistentTaskState( - RollupShardIndexerStatus.STARTED, + new DownsampleShardPersistentTaskState( + DownsampleShardIndexerStatus.STARTED, new BytesRef( new byte[] { 0x01, @@ -875,11 +878,11 @@ public void testResumeRollup() throws IOException { final DownsampleIndexerAction.ShardDownsampleResponse response2 = indexer.execute(); - assertRollupIndexer(indexService, shardNum, task, response2, task.getTotalShardDocCount()); + assertDownsampleIndexer(indexService, shardNum, task, response2, task.getTotalShardDocCount()); } - public void testResumeRollupPartial() throws IOException { - // create rollup config and index documents into source index + public void testResumeDownsamplePartial() throws IOException { + // create downsample config and index documents into source index DownsampleConfig config = new DownsampleConfig(randomInterval()); SourceSupplier sourceSupplier = () -> XContentFactory.jsonBuilder() .startObject() @@ -896,12 +899,12 @@ public void testResumeRollupPartial() throws IOException { int shardNum = randomIntBetween(0, numOfShards - 1); IndexShard shard = indexService.getShard(shardNum); - RollupShardTask task = new RollupShardTask( + DownsampleShardTask task = new DownsampleShardTask( randomLong(), "rollup", "action", TaskId.EMPTY_TASK_ID, - rollupIndex, + downsampleIndex, indexService.getIndexSettings().getTimestampBounds().startTime(), indexService.getIndexSettings().getTimestampBounds().endTime(), config, @@ -910,17 +913,17 @@ public void testResumeRollupPartial() throws IOException { ); task.testInit(mock(PersistentTasksService.class), mock(TaskManager.class), randomAlphaOfLength(5), randomIntBetween(1, 5)); - RollupShardIndexer indexer = new RollupShardIndexer( + 
DownsampleShardIndexer indexer = new DownsampleShardIndexer( task, client(), indexService, shard.shardId(), - rollupIndex, + downsampleIndex, config, new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 }, new String[] {}, - new RollupShardPersistentTaskState( - RollupShardIndexerStatus.STARTED, + new DownsampleShardPersistentTaskState( + DownsampleShardIndexerStatus.STARTED, new BytesRef( new byte[] { 0x01, @@ -955,13 +958,13 @@ public void testResumeRollupPartial() throws IOException { .getHits() .getHits().length; - assertRollupIndexer(indexService, shardNum, task, response2, dim2DocCount); + assertDownsampleIndexer(indexService, shardNum, task, response2, dim2DocCount); } - private static void assertRollupIndexer( + private static void assertDownsampleIndexer( final IndexService indexService, int shardNum, - final RollupShardTask task, + final DownsampleShardTask task, final DownsampleIndexerAction.ShardDownsampleResponse response, long totalShardDocCount ) { @@ -969,16 +972,16 @@ private static void assertRollupIndexer( assertEquals(task.getNumReceived(), totalShardDocCount); assertEquals(indexService.getShard(shardNum).docStats().getCount(), task.getTotalShardDocCount()); assertEquals(100.0D * task.getNumReceived() / task.getTotalShardDocCount(), task.getDocsProcessedPercentage(), 0.001); - assertTrue(task.getRollupBulkInfo().bulkTookSumMillis() >= 0); - assertEquals(task.getRollupBulkInfo().bulkIngestSumMillis(), task.getRollupBulkInfo().maxBulkIngestMillis()); - assertEquals(task.getRollupBulkInfo().bulkIngestSumMillis(), task.getRollupBulkInfo().minBulkIngestMillis()); - assertTrue(task.getRollupBulkInfo().bulkTookSumMillis() >= 0); - assertEquals(task.getRollupBulkInfo().bulkTookSumMillis(), task.getRollupBulkInfo().maxBulkTookMillis()); - assertEquals(task.getRollupBulkInfo().bulkTookSumMillis(), task.getRollupBulkInfo().minBulkTookMillis()); - assertEquals(1L, task.getRollupBulkInfo().totalBulkCount()); + 
assertTrue(task.getDownsampleBulkInfo().bulkTookSumMillis() >= 0); + assertEquals(task.getDownsampleBulkInfo().bulkIngestSumMillis(), task.getDownsampleBulkInfo().maxBulkIngestMillis()); + assertEquals(task.getDownsampleBulkInfo().bulkIngestSumMillis(), task.getDownsampleBulkInfo().minBulkIngestMillis()); + assertTrue(task.getDownsampleBulkInfo().bulkTookSumMillis() >= 0); + assertEquals(task.getDownsampleBulkInfo().bulkTookSumMillis(), task.getDownsampleBulkInfo().maxBulkTookMillis()); + assertEquals(task.getDownsampleBulkInfo().bulkTookSumMillis(), task.getDownsampleBulkInfo().minBulkTookMillis()); + assertEquals(1L, task.getDownsampleBulkInfo().totalBulkCount()); assertEquals(indexService.getIndexSettings().getTimestampBounds().startTime(), task.getIndexStartTimeMillis()); assertEquals(indexService.getIndexSettings().getTimestampBounds().endTime(), task.getIndexEndTimeMillis()); - assertEquals(RollupShardIndexerStatus.COMPLETED, task.getRollupShardIndexerStatus()); + assertEquals(DownsampleShardIndexerStatus.COMPLETED, task.getDownsampleShardIndexerStatus()); assertEquals(task.getNumSent(), task.getNumIndexed()); assertEquals(task.getNumIndexed(), task.getLastBeforeBulkInfo().numberOfActions()); assertTrue(task.getLastBeforeBulkInfo().estimatedSizeInBytes() > 0); @@ -1047,9 +1050,10 @@ private void prepareSourceIndex(final String sourceIndex, boolean blockWrite) { ); } - private void rollup(String sourceIndex, String rollupIndex, DownsampleConfig config) { + private void downsample(String sourceIndex, String downsampleIndex, DownsampleConfig config) { assertAcked( - client().execute(DownsampleAction.INSTANCE, new DownsampleAction.Request(sourceIndex, rollupIndex, TIMEOUT, config)).actionGet() + client().execute(DownsampleAction.INSTANCE, new DownsampleAction.Request(sourceIndex, downsampleIndex, TIMEOUT, config)) + .actionGet() ); } @@ -1064,7 +1068,7 @@ private Aggregations aggregate(final String index, AggregationBuilder aggregatio } 
@SuppressWarnings("unchecked") - private void assertRollupIndex(String sourceIndex, String rollupIndex, DownsampleConfig config) throws IOException { + private void assertDownsampleIndex(String sourceIndex, String downsampleIndex, DownsampleConfig config) throws IOException { // Retrieve field information for the metric fields final GetMappingsResponse getMappingsResponse = indicesAdmin().prepareGetMappings(sourceIndex).get(); final Map sourceIndexMappings = getMappingsResponse.mappings() @@ -1073,7 +1077,7 @@ private void assertRollupIndex(String sourceIndex, String rollupIndex, Downsampl .filter(entry -> sourceIndex.equals(entry.getKey())) .findFirst() .map(mappingMetadata -> mappingMetadata.getValue().sourceAsMap()) - .orElseThrow(() -> new IllegalArgumentException("No mapping found for rollup source index [" + sourceIndex + "]")); + .orElseThrow(() -> new IllegalArgumentException("No mapping found for downsample source index [" + sourceIndex + "]")); final IndexMetadata indexMetadata = clusterAdmin().prepareState().get().getState().getMetadata().index(sourceIndex); final IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -1092,29 +1096,29 @@ private void assertRollupIndex(String sourceIndex, String rollupIndex, Downsampl } }); - assertRollupIndexAggregations(sourceIndex, rollupIndex, config, metricFields, labelFields); + assertDownsampleIndexAggregations(sourceIndex, downsampleIndex, config, metricFields, labelFields); - GetIndexResponse indexSettingsResp = indicesAdmin().prepareGetIndex().addIndices(sourceIndex, rollupIndex).get(); - assertRollupIndexSettings(sourceIndex, rollupIndex, indexSettingsResp); + GetIndexResponse indexSettingsResp = indicesAdmin().prepareGetIndex().addIndices(sourceIndex, downsampleIndex).get(); + assertDownsampleIndexSettings(sourceIndex, downsampleIndex, indexSettingsResp); Map> mappings = (Map>) indexSettingsResp.getMappings() - .get(rollupIndex) + .get(downsampleIndex) .getSourceAsMap() 
.get("properties"); assertFieldMappings(config, metricFields, mappings); - GetMappingsResponse indexMappings = indicesAdmin().getMappings(new GetMappingsRequest().indices(rollupIndex, sourceIndex)) + GetMappingsResponse indexMappings = indicesAdmin().getMappings(new GetMappingsRequest().indices(downsampleIndex, sourceIndex)) .actionGet(); - Map rollupIndexProperties = (Map) indexMappings.mappings() - .get(rollupIndex) + Map downsampleIndexProperties = (Map) indexMappings.mappings() + .get(downsampleIndex) .sourceAsMap() .get("properties"); Map sourceIndexCloneProperties = (Map) indexMappings.mappings() .get(sourceIndex) .sourceAsMap() .get("properties"); - List> labelFieldRollupIndexCloneProperties = (rollupIndexProperties.entrySet() + List> labelFieldDownsampleIndexCloneProperties = (downsampleIndexProperties.entrySet() .stream() .filter(entry -> labelFields.containsKey(entry.getKey())) .toList()); @@ -1122,110 +1126,110 @@ private void assertRollupIndex(String sourceIndex, String rollupIndex, Downsampl .stream() .filter(entry -> labelFields.containsKey(entry.getKey())) .toList()); - assertEquals(labelFieldRollupIndexCloneProperties, labelFieldSourceIndexProperties); + assertEquals(labelFieldDownsampleIndexCloneProperties, labelFieldSourceIndexProperties); } - private void assertRollupIndexAggregations( + private void assertDownsampleIndexAggregations( String sourceIndex, - String rollupIndex, + String downsampleIndex, DownsampleConfig config, Map metricFields, Map labelFields ) { final AggregationBuilder aggregations = buildAggregations(config, metricFields, labelFields, config.getTimestampField()); Aggregations origResp = aggregate(sourceIndex, aggregations); - Aggregations rollupResp = aggregate(rollupIndex, aggregations); - assertEquals(origResp.asMap().keySet(), rollupResp.asMap().keySet()); + Aggregations downsampleResp = aggregate(downsampleIndex, aggregations); + assertEquals(origResp.asMap().keySet(), downsampleResp.asMap().keySet()); StringTerms 
originalTsIdTermsAggregation = (StringTerms) origResp.getAsMap().values().stream().toList().get(0); - StringTerms rollupTsIdTermsAggregation = (StringTerms) rollupResp.getAsMap().values().stream().toList().get(0); + StringTerms downsampleTsIdTermsAggregation = (StringTerms) downsampleResp.getAsMap().values().stream().toList().get(0); originalTsIdTermsAggregation.getBuckets().forEach(originalBucket -> { - StringTerms.Bucket rollupBucket = rollupTsIdTermsAggregation.getBucketByKey(originalBucket.getKeyAsString()); - assertEquals(originalBucket.getAggregations().asList().size(), rollupBucket.getAggregations().asList().size()); + StringTerms.Bucket downsampleBucket = downsampleTsIdTermsAggregation.getBucketByKey(originalBucket.getKeyAsString()); + assertEquals(originalBucket.getAggregations().asList().size(), downsampleBucket.getAggregations().asList().size()); InternalDateHistogram originalDateHistogram = (InternalDateHistogram) originalBucket.getAggregations().asList().get(0); - InternalDateHistogram rollupDateHistogram = (InternalDateHistogram) rollupBucket.getAggregations().asList().get(0); + InternalDateHistogram downsampleDateHistogram = (InternalDateHistogram) downsampleBucket.getAggregations().asList().get(0); List originalDateHistogramBuckets = originalDateHistogram.getBuckets(); - List rollupDateHistogramBuckets = rollupDateHistogram.getBuckets(); + List downsampleDateHistogramBuckets = downsampleDateHistogram.getBuckets(); - assertEquals(originalDateHistogramBuckets.size(), rollupDateHistogramBuckets.size()); + assertEquals(originalDateHistogramBuckets.size(), downsampleDateHistogramBuckets.size()); assertEquals( originalDateHistogramBuckets.stream().map(InternalDateHistogram.Bucket::getKeyAsString).collect(Collectors.toList()), - rollupDateHistogramBuckets.stream().map(InternalDateHistogram.Bucket::getKeyAsString).collect(Collectors.toList()) +
downsampleDateHistogramBuckets.stream().map(InternalDateHistogram.Bucket::getKeyAsString).collect(Collectors.toList()) ); for (int i = 0; i < originalDateHistogramBuckets.size(); ++i) { InternalDateHistogram.Bucket originalDateHistogramBucket = originalDateHistogramBuckets.get(i); - InternalDateHistogram.Bucket rollupDateHistogramBucket = rollupDateHistogramBuckets.get(i); - assertEquals(originalDateHistogramBucket.getKeyAsString(), rollupDateHistogramBucket.getKeyAsString()); + InternalDateHistogram.Bucket downsampleDateHistogramBucket = downsampleDateHistogramBuckets.get(i); + assertEquals(originalDateHistogramBucket.getKeyAsString(), downsampleDateHistogramBucket.getKeyAsString()); Aggregations originalAggregations = originalDateHistogramBucket.getAggregations(); - Aggregations rollupAggregations = rollupDateHistogramBucket.getAggregations(); - assertEquals(originalAggregations.asList().size(), rollupAggregations.asList().size()); + Aggregations downsampleAggregations = downsampleDateHistogramBucket.getAggregations(); + assertEquals(originalAggregations.asList().size(), downsampleAggregations.asList().size()); List nonTopHitsOriginalAggregations = originalAggregations.asList() .stream() .filter(agg -> agg.getType().equals("top_hits") == false) .toList(); - List nonTopHitsRollupAggregations = rollupAggregations.asList() + List nonTopHitsDownsampleAggregations = downsampleAggregations.asList() .stream() .filter(agg -> agg.getType().equals("top_hits") == false) .toList(); - assertEquals(nonTopHitsOriginalAggregations, nonTopHitsRollupAggregations); + assertEquals(nonTopHitsOriginalAggregations, nonTopHitsDownsampleAggregations); List topHitsOriginalAggregations = originalAggregations.asList() .stream() .filter(agg -> agg.getType().equals("top_hits")) .toList(); - List topHitsRollupAggregations = rollupAggregations.asList() + List topHitsDownsampleAggregations = downsampleAggregations.asList() .stream() .filter(agg -> agg.getType().equals("top_hits")) .toList(); - 
assertEquals(topHitsRollupAggregations.size(), topHitsRollupAggregations.size()); + assertEquals(topHitsDownsampleAggregations.size(), topHitsDownsampleAggregations.size()); - for (int j = 0; j < topHitsRollupAggregations.size(); ++j) { + for (int j = 0; j < topHitsDownsampleAggregations.size(); ++j) { InternalTopHits originalTopHits = (InternalTopHits) topHitsOriginalAggregations.get(j); - InternalTopHits rollupTopHits = (InternalTopHits) topHitsRollupAggregations.get(j); + InternalTopHits downsampleTopHits = (InternalTopHits) topHitsDownsampleAggregations.get(j); SearchHit[] originalHits = originalTopHits.getHits().getHits(); - SearchHit[] rollupHits = rollupTopHits.getHits().getHits(); - assertEquals(originalHits.length, rollupHits.length); + SearchHit[] downsampleHits = downsampleTopHits.getHits().getHits(); + assertEquals(originalHits.length, downsampleHits.length); for (int k = 0; k < originalHits.length; ++k) { SearchHit originalHit = originalHits[k]; - SearchHit rollupHit = rollupHits[k]; + SearchHit downsampleHit = downsampleHits[k]; Map originalHitDocumentFields = originalHit.getDocumentFields(); - Map rollupHitDocumentFields = rollupHit.getDocumentFields(); + Map downsampleHitDocumentFields = downsampleHit.getDocumentFields(); List originalFields = originalHitDocumentFields.values().stream().toList(); - List rollupFields = rollupHitDocumentFields.values().stream().toList(); + List downsampleFields = downsampleHitDocumentFields.values().stream().toList(); List originalFieldsList = originalFields.stream().flatMap(x -> x.getValues().stream()).toList(); - List rollupFieldsList = rollupFields.stream().flatMap(x -> x.getValues().stream()).toList(); - if (originalFieldsList.isEmpty() == false && rollupFieldsList.isEmpty() == false) { + List downsampleFieldsList = downsampleFields.stream().flatMap(x -> x.getValues().stream()).toList(); + if (originalFieldsList.isEmpty() == false && downsampleFieldsList.isEmpty() == false) { // NOTE: here we take advantage of the
fact that a label field is indexed also as a metric of type - // `counter`. This way we can actually check that the label value stored in the rollup index + // `counter`. This way we can actually check that the label value stored in the downsample index // is the last value (which is what we store for a metric of type counter) by comparing the metric // field value to the label field value. originalFieldsList.forEach( field -> assertTrue( - "Field [" + field + "] is not included in the rollup fields: " + rollupFieldsList, - rollupFieldsList.contains(field) + "Field [" + field + "] is not included in the downsample fields: " + downsampleFieldsList, + downsampleFieldsList.contains(field) ) ); - rollupFieldsList.forEach( + downsampleFieldsList.forEach( field -> assertTrue( "Field [" + field + "] is not included in the source fields: " + originalFieldsList, originalFieldsList.contains(field) ) ); Object originalLabelValue = originalHit.getDocumentFields().values().stream().toList().get(0).getValue(); - Object rollupLabelValue = rollupHit.getDocumentFields().values().stream().toList().get(0).getValue(); + Object downsampleLabelValue = downsampleHit.getDocumentFields().values().stream().toList().get(0).getValue(); Optional labelAsMetric = nonTopHitsOriginalAggregations.stream() - .filter(agg -> agg.getName().equals("metric_" + rollupTopHits.getName())) + .filter(agg -> agg.getName().equals("metric_" + downsampleTopHits.getName())) .findFirst(); // NOTE: this check is possible only if the label can be indexed as a metric (the label is a numeric field) if (labelAsMetric.isPresent()) { double metricValue = ((Max) labelAsMetric.get()).value(); - assertEquals(metricValue, rollupLabelValue); + assertEquals(metricValue, downsampleLabelValue); assertEquals(metricValue, originalLabelValue); } } @@ -1257,74 +1261,74 @@ private void assertFieldMappings( }); } - private void assertRollupIndexSettings(String sourceIndex, String rollupIndex, GetIndexResponse indexSettingsResp) { +
private void assertDownsampleIndexSettings(String sourceIndex, String downsampleIndex, GetIndexResponse indexSettingsResp) { Settings sourceSettings = indexSettingsResp.settings().get(sourceIndex); - Settings rollupSettings = indexSettingsResp.settings().get(rollupIndex); + Settings downsampleSettings = indexSettingsResp.settings().get(downsampleIndex); - // Assert rollup metadata are set in index settings - assertEquals("success", rollupSettings.get(IndexMetadata.INDEX_DOWNSAMPLE_STATUS_KEY)); + // Assert downsample metadata are set in index settings + assertEquals("success", downsampleSettings.get(IndexMetadata.INDEX_DOWNSAMPLE_STATUS_KEY)); assertNotNull(sourceSettings.get(IndexMetadata.SETTING_INDEX_UUID)); - assertNotNull(rollupSettings.get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_UUID_KEY)); + assertNotNull(downsampleSettings.get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_UUID_KEY)); assertEquals( sourceSettings.get(IndexMetadata.SETTING_INDEX_UUID), - rollupSettings.get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_UUID_KEY) + downsampleSettings.get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_UUID_KEY) ); - assertEquals(sourceIndex, rollupSettings.get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME_KEY)); - assertEquals(sourceSettings.get(IndexSettings.MODE.getKey()), rollupSettings.get(IndexSettings.MODE.getKey())); + assertEquals(sourceIndex, downsampleSettings.get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME_KEY)); + assertEquals(sourceSettings.get(IndexSettings.MODE.getKey()), downsampleSettings.get(IndexSettings.MODE.getKey())); assertNotNull(sourceSettings.get(IndexSettings.TIME_SERIES_START_TIME.getKey())); - assertNotNull(rollupSettings.get(IndexSettings.TIME_SERIES_START_TIME.getKey())); + assertNotNull(downsampleSettings.get(IndexSettings.TIME_SERIES_START_TIME.getKey())); assertEquals( sourceSettings.get(IndexSettings.TIME_SERIES_START_TIME.getKey()), - rollupSettings.get(IndexSettings.TIME_SERIES_START_TIME.getKey()) + 
downsampleSettings.get(IndexSettings.TIME_SERIES_START_TIME.getKey()) ); assertNotNull(sourceSettings.get(IndexSettings.TIME_SERIES_END_TIME.getKey())); - assertNotNull(rollupSettings.get(IndexSettings.TIME_SERIES_END_TIME.getKey())); + assertNotNull(downsampleSettings.get(IndexSettings.TIME_SERIES_END_TIME.getKey())); assertEquals( sourceSettings.get(IndexSettings.TIME_SERIES_END_TIME.getKey()), - rollupSettings.get(IndexSettings.TIME_SERIES_END_TIME.getKey()) + downsampleSettings.get(IndexSettings.TIME_SERIES_END_TIME.getKey()) ); assertNotNull(sourceSettings.get(IndexMetadata.INDEX_ROUTING_PATH.getKey())); - assertNotNull(rollupSettings.get(IndexMetadata.INDEX_ROUTING_PATH.getKey())); + assertNotNull(downsampleSettings.get(IndexMetadata.INDEX_ROUTING_PATH.getKey())); assertEquals( sourceSettings.get(IndexMetadata.INDEX_ROUTING_PATH.getKey()), - rollupSettings.get(IndexMetadata.INDEX_ROUTING_PATH.getKey()) + downsampleSettings.get(IndexMetadata.INDEX_ROUTING_PATH.getKey()) ); assertNotNull(sourceSettings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS)); - assertNotNull(rollupSettings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS)); + assertNotNull(downsampleSettings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS)); assertEquals( sourceSettings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS), - rollupSettings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS) + downsampleSettings.get(IndexMetadata.SETTING_NUMBER_OF_SHARDS) ); assertNotNull(sourceSettings.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS)); - assertNotNull(rollupSettings.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS)); + assertNotNull(downsampleSettings.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS)); assertEquals( sourceSettings.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS), - rollupSettings.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS) + downsampleSettings.get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS) ); - assertNull(rollupSettings.get(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey())); - assertEquals("true", 
rollupSettings.get(IndexMetadata.SETTING_BLOCKS_WRITE)); - assertEquals(sourceSettings.get(IndexMetadata.SETTING_INDEX_HIDDEN), rollupSettings.get(IndexMetadata.SETTING_INDEX_HIDDEN)); + assertNull(downsampleSettings.get(LifecycleSettings.LIFECYCLE_NAME_SETTING.getKey())); + assertEquals("true", downsampleSettings.get(IndexMetadata.SETTING_BLOCKS_WRITE)); + assertEquals(sourceSettings.get(IndexMetadata.SETTING_INDEX_HIDDEN), downsampleSettings.get(IndexMetadata.SETTING_INDEX_HIDDEN)); if (sourceSettings.keySet().contains(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey())) { - assertNotNull(indexSettingsResp.getSetting(rollupIndex, MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey())); + assertNotNull(indexSettingsResp.getSetting(downsampleIndex, MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey())); } if (sourceSettings.keySet().contains(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey()) - && rollupSettings.keySet().contains(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey())) { + && downsampleSettings.keySet().contains(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey())) { assertEquals( sourceSettings.get(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey()), - rollupSettings.get(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey()) + downsampleSettings.get(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey()) ); } else { assertFalse(sourceSettings.keySet().contains(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey())); - assertFalse(rollupSettings.keySet().contains(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey())); + assertFalse(downsampleSettings.keySet().contains(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey())); } } @@ -1437,7 +1441,7 @@ private String createDataStream() throws Exception { return dataStreamName; } - public void testConcurrentRollup() throws IOException, InterruptedException { + public void 
testConcurrentDownsample() throws IOException, InterruptedException { final DownsampleConfig config = new DownsampleConfig(randomInterval()); SourceSupplier sourceSupplier = () -> { String ts = randomDateForInterval(config.getInterval()); @@ -1494,29 +1498,29 @@ public void testConcurrentRollup() throws IOException, InterruptedException { prepareSourceIndex(sourceIndex, true); int n = randomIntBetween(3, 6); - final CountDownLatch rollupComplete = new CountDownLatch(n); + final CountDownLatch downsampleComplete = new CountDownLatch(n); final List targets = new ArrayList<>(); final List threads = new ArrayList<>(); for (int i = 0; i < n; i++) { final String targetIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); targets.add(targetIndex); threads.add(new Thread(() -> { - rollup(sourceIndex, targetIndex, config); - rollupComplete.countDown(); + downsample(sourceIndex, targetIndex, config); + downsampleComplete.countDown(); })); } for (int i = 0; i < n; i++) { threads.get(i).start(); } - assertTrue(rollupComplete.await(30, TimeUnit.SECONDS)); + assertTrue(downsampleComplete.await(30, TimeUnit.SECONDS)); for (int i = 0; i < n; i++) { - assertRollupIndex(sourceIndex, targets.get(i), config); + assertDownsampleIndex(sourceIndex, targets.get(i), config); } } - public void testDuplicateRollupRequest() throws IOException, InterruptedException { + public void testDuplicateDownsampleRequest() throws IOException, InterruptedException { final DownsampleConfig config = new DownsampleConfig(randomInterval()); SourceSupplier sourceSupplier = () -> { String ts = randomDateForInterval(config.getInterval()); @@ -1572,33 +1576,33 @@ public void testDuplicateRollupRequest() throws IOException, InterruptedExceptio bulkIndex(sourceIndex, sourceSupplier, 512); prepareSourceIndex(sourceIndex, true); - final CountDownLatch rollupComplete = new CountDownLatch(2); + final CountDownLatch downsampleComplete = new CountDownLatch(2); final String targetIndex = 
randomAlphaOfLength(10).toLowerCase(Locale.ROOT); AtomicBoolean firstFailed = new AtomicBoolean(false); AtomicBoolean secondFailed = new AtomicBoolean(false); // NOTE: we expect one thread to run the downsample operation and the other one to fail new Thread(() -> { try { - rollup(sourceIndex, targetIndex, config); + downsample(sourceIndex, targetIndex, config); } catch (ResourceAlreadyExistsException e) { firstFailed.set(true); } finally { - rollupComplete.countDown(); + downsampleComplete.countDown(); } }).start(); new Thread(() -> { try { - rollup(sourceIndex, targetIndex, config); + downsample(sourceIndex, targetIndex, config); } catch (ResourceAlreadyExistsException e) { secondFailed.set(true); } finally { - rollupComplete.countDown(); + downsampleComplete.countDown(); } }).start(); - assertTrue(rollupComplete.await(30, TimeUnit.SECONDS)); + assertTrue(downsampleComplete.await(30, TimeUnit.SECONDS)); assertFalse(firstFailed.get() ^ secondFailed.get()); - assertRollupIndex(sourceIndex, targetIndex, config); + assertDownsampleIndex(sourceIndex, targetIndex, config); } }