Skip to content

Commit

Permalink
Rename many rollup usages in downsample codebase.
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed Aug 15, 2023
1 parent 4a209b4 commit 776718e
Show file tree
Hide file tree
Showing 26 changed files with 693 additions and 664 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.xpack.core.datastreams.DataStreamFeatureSetUsage;
import org.elasticsearch.xpack.core.datastreams.DataStreamLifecycleFeatureSetUsage;
import org.elasticsearch.xpack.core.downsample.DownsampleIndexerAction;
import org.elasticsearch.xpack.core.downsample.DownsampleShardStatus;
import org.elasticsearch.xpack.core.enrich.EnrichFeatureSetUsage;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyStatus;
import org.elasticsearch.xpack.core.eql.EqlFeatureSetUsage;
Expand Down Expand Up @@ -153,7 +154,6 @@
import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction;
import org.elasticsearch.xpack.core.rollup.action.PutRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus;
import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
Expand Down Expand Up @@ -462,7 +462,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new),
new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new),
new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new),
new NamedWriteableRegistry.Entry(Task.Status.class, RollupShardStatus.NAME, RollupShardStatus::new),
new NamedWriteableRegistry.Entry(Task.Status.class, DownsampleShardStatus.NAME, DownsampleShardStatus::new),
// ccr
new NamedWriteableRegistry.Entry(AutoFollowMetadata.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new),
new NamedWriteableRegistry.Entry(Metadata.Custom.class, AutoFollowMetadata.TYPE, AutoFollowMetadata::new),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

package org.elasticsearch.xpack.core.rollup.action;
package org.elasticsearch.xpack.core.downsample;

import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -22,7 +22,7 @@
* This class includes statistics collected by the downsampling task after
* a bulk indexing operation ends.
*/
public record RollupAfterBulkInfo(
public record DownsampleAfterBulkInfo(
long currentTimeMillis,
long executionId,
long lastIngestTookInMillis,
Expand All @@ -40,11 +40,11 @@ public record RollupAfterBulkInfo(
private static final ParseField HAS_FAILURES = new ParseField("has_failures");
private static final ParseField REST_STATUS_CODE = new ParseField("rest_status_code");

private static final ConstructingObjectParser<RollupAfterBulkInfo, Void> PARSER;
private static final ConstructingObjectParser<DownsampleAfterBulkInfo, Void> PARSER;
static {
PARSER = new ConstructingObjectParser<>(
NAME,
args -> new RollupAfterBulkInfo(
args -> new DownsampleAfterBulkInfo(
(Long) args[0],
(Long) args[1],
(Long) args[2],
Expand All @@ -62,7 +62,7 @@ public record RollupAfterBulkInfo(
PARSER.declareInt(ConstructingObjectParser.constructorArg(), REST_STATUS_CODE);
}

public RollupAfterBulkInfo(final StreamInput in) throws IOException {
public DownsampleAfterBulkInfo(final StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readBoolean(), in.readVInt());
}

Expand Down Expand Up @@ -93,7 +93,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder.endObject();
}

public static RollupAfterBulkInfo fromXContent(XContentParser parser) throws IOException {
public static DownsampleAfterBulkInfo fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

package org.elasticsearch.xpack.core.rollup.action;
package org.elasticsearch.xpack.core.downsample;

import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -22,7 +22,7 @@
* This class includes statistics collected by the downsampling task before
* a bulk indexing operation starts.
*/
public record RollupBeforeBulkInfo(long currentTimeMillis, long executionId, long estimatedSizeInBytes, int numberOfActions)
public record DownsampleBeforeBulkInfo(long currentTimeMillis, long executionId, long estimatedSizeInBytes, int numberOfActions)
implements
NamedWriteable,
ToXContentObject {
Expand All @@ -34,11 +34,11 @@ public record RollupBeforeBulkInfo(long currentTimeMillis, long executionId, lon
private static final ParseField ESTIMATED_SIZE_IN_BYTES = new ParseField("estimated_size_in_bytes");
private static final ParseField NUMBER_OF_ACTIONS = new ParseField("number_of_actions");

private static final ConstructingObjectParser<RollupBeforeBulkInfo, Void> PARSER;
private static final ConstructingObjectParser<DownsampleBeforeBulkInfo, Void> PARSER;
static {
PARSER = new ConstructingObjectParser<>(
NAME,
args -> new RollupBeforeBulkInfo((Long) args[0], (Long) args[1], (Long) args[2], (Integer) args[3])
args -> new DownsampleBeforeBulkInfo((Long) args[0], (Long) args[1], (Long) args[2], (Integer) args[3])
);

PARSER.declareLong(ConstructingObjectParser.constructorArg(), CURRENT_TIME_IN_MILLIS);
Expand All @@ -47,7 +47,7 @@ public record RollupBeforeBulkInfo(long currentTimeMillis, long executionId, lon
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_ACTIONS);
}

public RollupBeforeBulkInfo(final StreamInput in) throws IOException {
public DownsampleBeforeBulkInfo(final StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVInt());
}

Expand All @@ -74,7 +74,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder.endObject();
}

public static RollupBeforeBulkInfo fromXContent(XContentParser parser) throws IOException {
public static DownsampleBeforeBulkInfo fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

package org.elasticsearch.xpack.core.rollup.action;
package org.elasticsearch.xpack.core.downsample;

import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -22,7 +22,7 @@
* This class includes statistics collected by the downsampling task
* for bulk indexing operations.
*/
public record RollupBulkInfo(
public record DownsampleBulkInfo(
long totalBulkCount,
long bulkIngestSumMillis,
long maxBulkIngestMillis,
Expand All @@ -42,11 +42,11 @@ public record RollupBulkInfo(
private static final ParseField MAX_BULK_TOOK_MILLIS = new ParseField("max_bulk_took_millis");
private static final ParseField MIN_BULK_TOOK_MILLIS = new ParseField("min_bulk_took_millis");

private static final ConstructingObjectParser<RollupBulkInfo, Void> PARSER;
private static final ConstructingObjectParser<DownsampleBulkInfo, Void> PARSER;
static {
PARSER = new ConstructingObjectParser<>(
NAME,
args -> new RollupBulkInfo(
args -> new DownsampleBulkInfo(
(Long) args[0],
(Long) args[1],
(Long) args[2],
Expand All @@ -66,11 +66,11 @@ public record RollupBulkInfo(
PARSER.declareLong(ConstructingObjectParser.constructorArg(), MIN_BULK_TOOK_MILLIS);
}

public RollupBulkInfo(final StreamInput in) throws IOException {
public DownsampleBulkInfo(final StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
}

public static RollupBulkInfo fromXContext(XContentParser parser) throws IOException {
public static DownsampleBulkInfo fromXContext(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
* 2.0.
*/

package org.elasticsearch.xpack.core.rollup.action;
package org.elasticsearch.xpack.core.downsample;

import java.util.concurrent.atomic.AtomicLong;

public class RollupBulkStats {
public class DownsampleBulkStats {
private final AtomicLong totalBulkCount = new AtomicLong(0);
private final AtomicLong bulkIngestSumMillis = new AtomicLong(0);
private final AtomicLong maxBulkIngestMillis = new AtomicLong(-1);
Expand Down Expand Up @@ -39,10 +39,10 @@ private static long max(long newValue, long existingValue) {
}

/**
* @return An instance of {@link RollupBulkInfo} including rollup bulk indexing statistics.
* @return An instance of {@link DownsampleBulkInfo} including rollup bulk indexing statistics.
*/
public RollupBulkInfo getRollupBulkInfo() {
return new RollupBulkInfo(
public DownsampleBulkInfo getRollupBulkInfo() {
return new DownsampleBulkInfo(
this.totalBulkCount.get(),
this.bulkIngestSumMillis.get(),
Math.max(0, this.maxBulkIngestMillis.get()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.rollup.action.RollupShardTask;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -249,7 +248,7 @@ public ShardDownsampleRequest(final ShardId shardId, final Request request) {
this.request = request;
}

public String getRollupIndex() {
public String getDownsampleIndex() {
return request.getDownsampleRequest().getTargetIndex();
}

Expand Down Expand Up @@ -277,7 +276,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new RollupShardTask(
return new DownsampleShardTask(
id,
type,
action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

package org.elasticsearch.xpack.core.rollup.action;
package org.elasticsearch.xpack.core.downsample;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -16,15 +16,15 @@
/**
* Status of the rollup indexer task
*/
public enum RollupShardIndexerStatus implements Writeable {
public enum DownsampleShardIndexerStatus implements Writeable {
INITIALIZED,
STARTED,
FAILED,
COMPLETED,
CANCELLED;

public static RollupShardIndexerStatus readFromStream(final StreamInput in) throws IOException {
return in.readEnum(RollupShardIndexerStatus.class);
public static DownsampleShardIndexerStatus readFromStream(final StreamInput in) throws IOException {
return in.readEnum(DownsampleShardIndexerStatus.class);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.downsample;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

/**
* @param downsampleShardIndexerStatus An instance of {@link DownsampleShardIndexerStatus} with the downsampleShardIndexerStatus of
* the downsample task
* @param tsid The latest successfully processed tsid component of a tuple (tsid, timestamp)
*/
public record DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus downsampleShardIndexerStatus, BytesRef tsid)
implements
PersistentTaskState {

public static final String NAME = DownsampleShardTask.TASK_NAME;
private static final ParseField ROLLUP_SHARD_INDEXER_STATUS = new ParseField("status");
private static final ParseField TSID = new ParseField("tsid");

public static final ObjectParser<DownsampleShardPersistentTaskState.Builder, Void> PARSER = new ObjectParser<>(NAME);

static {
PARSER.declareField(
DownsampleShardPersistentTaskState.Builder::status,
(p, c) -> DownsampleShardIndexerStatus.valueOf(p.textOrNull()),
ROLLUP_SHARD_INDEXER_STATUS,
ObjectParser.ValueType.STRING
);
PARSER.declareField(
DownsampleShardPersistentTaskState.Builder::tsid,
(p, c) -> new BytesRef(p.textOrNull()),
TSID,
ObjectParser.ValueType.STRING
);
}

public DownsampleShardPersistentTaskState(final StreamInput in) throws IOException {
this(DownsampleShardIndexerStatus.readFromStream(in), in.readBytesRef());
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ROLLUP_SHARD_INDEXER_STATUS.getPreferredName(), downsampleShardIndexerStatus);
if (tsid != null) {
builder.field(TSID.getPreferredName(), tsid.utf8ToString());
}
return builder.endObject();
}

@Override
public String getWriteableName() {
return DownsampleShardTask.TASK_NAME;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
downsampleShardIndexerStatus.writeTo(out);
out.writeBytesRef(tsid);
}

public DownsampleShardIndexerStatus downsampleShardIndexerStatus() {
return downsampleShardIndexerStatus;
}

public boolean done() {
return DownsampleShardIndexerStatus.COMPLETED.equals(downsampleShardIndexerStatus)
|| DownsampleShardIndexerStatus.CANCELLED.equals(downsampleShardIndexerStatus)
|| DownsampleShardIndexerStatus.FAILED.equals(downsampleShardIndexerStatus);
}

public boolean started() {
return DownsampleShardIndexerStatus.STARTED.equals(downsampleShardIndexerStatus);
}

public boolean cancelled() {
return DownsampleShardIndexerStatus.CANCELLED.equals(downsampleShardIndexerStatus);
}

public boolean failed() {
return DownsampleShardIndexerStatus.FAILED.equals(downsampleShardIndexerStatus);
}

public static DownsampleShardPersistentTaskState readFromStream(final StreamInput in) throws IOException {
return new DownsampleShardPersistentTaskState(DownsampleShardIndexerStatus.readFromStream(in), in.readBytesRef());
}

public static DownsampleShardPersistentTaskState fromXContent(final XContentParser parser) throws IOException {
final DownsampleShardPersistentTaskState.Builder builder = new DownsampleShardPersistentTaskState.Builder();
PARSER.parse(parser, builder, null);
return builder.build();
}

public static class Builder {
private DownsampleShardIndexerStatus status;
private BytesRef tsid;

public Builder status(final DownsampleShardIndexerStatus status) {
this.status = status;
return this;
}

public Builder tsid(final BytesRef tsid) {
this.tsid = tsid;
return this;
}

public DownsampleShardPersistentTaskState build() {
return new DownsampleShardPersistentTaskState(status, tsid);
}
}
}
Loading

0 comments on commit 776718e

Please sign in to comment.