Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
weizijun committed Nov 7, 2022
1 parent 1d28c3b commit 558bf0a
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
public class RollupShardStatus implements Task.Status {
public static final String NAME = "rollup-index-shard";
private static final ParseField SHARD_FIELD = new ParseField("shard");
private static final ParseField STATUS_FIELD = new ParseField("status");
private static final ParseField START_TIME_FIELD = new ParseField("start_time");
private static final ParseField IN_NUM_DOCS_RECEIVED_FIELD = new ParseField("in_num_docs_received");
private static final ParseField OUT_NUM_DOCS_SENT_FIELD = new ParseField("out_num_docs_sent");
Expand All @@ -36,19 +35,17 @@ public class RollupShardStatus implements Task.Status {

private final ShardId shardId;
private final long rollupStart;
private Status status;
private AtomicLong numReceived = new AtomicLong(0);
private AtomicLong numSent = new AtomicLong(0);
private AtomicLong numIndexed = new AtomicLong(0);
private AtomicLong numFailed = new AtomicLong(0);
private final AtomicLong numReceived;
private final AtomicLong numSent;
private final AtomicLong numIndexed;
private final AtomicLong numFailed;

private static final ConstructingObjectParser<RollupShardStatus, Void> PARSER;
static {
PARSER = new ConstructingObjectParser<>(
NAME,
args -> new RollupShardStatus(
ShardId.fromString((String) args[0]),
Status.valueOf((String) args[1]),
Instant.parse((String) args[2]).toEpochMilli(),
new AtomicLong((Long) args[3]),
new AtomicLong((Long) args[4]),
Expand All @@ -58,7 +55,6 @@ public class RollupShardStatus implements Task.Status {
);

PARSER.declareString(constructorArg(), SHARD_FIELD);
PARSER.declareString(constructorArg(), STATUS_FIELD);
PARSER.declareString(constructorArg(), START_TIME_FIELD);
PARSER.declareLong(constructorArg(), IN_NUM_DOCS_RECEIVED_FIELD);
PARSER.declareLong(constructorArg(), OUT_NUM_DOCS_SENT_FIELD);
Expand All @@ -68,7 +64,6 @@ public class RollupShardStatus implements Task.Status {

public RollupShardStatus(StreamInput in) throws IOException {
shardId = new ShardId(in);
status = in.readEnum(Status.class);
rollupStart = in.readLong();
numReceived = new AtomicLong(in.readLong());
numSent = new AtomicLong(in.readLong());
Expand All @@ -78,47 +73,29 @@ public RollupShardStatus(StreamInput in) throws IOException {

public RollupShardStatus(
ShardId shardId,
Status status,
long rollupStart,
AtomicLong numReceived,
AtomicLong numSent,
AtomicLong numIndexed,
AtomicLong numFailed
) {
this.shardId = shardId;
this.status = status;
this.rollupStart = rollupStart;
this.numReceived = numReceived;
this.numSent = numSent;
this.numIndexed = numIndexed;
this.numFailed = numFailed;
}

public RollupShardStatus(ShardId shardId) {
status = Status.STARTED;
public RollupShardStatus(ShardId shardId, AtomicLong numReceived, AtomicLong numSent, AtomicLong numIndexed, AtomicLong numFailed) {
this.shardId = shardId;
this.rollupStart = System.currentTimeMillis();
}

public void init(AtomicLong numReceived, AtomicLong numSent, AtomicLong numIndexed, AtomicLong numFailed) {
this.numReceived = numReceived;
this.numSent = numSent;
this.numIndexed = numIndexed;
this.numFailed = numFailed;
}

public void setFinished() {
this.status = Status.FINISHED;
}

public boolean isCancelled() {
return this.status == Status.ABORT;
}

public void setCancelled() {
this.status = Status.ABORT;
}

public static RollupShardStatus fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
Expand All @@ -127,7 +104,6 @@ public static RollupShardStatus fromXContent(XContentParser parser) throws IOExc
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SHARD_FIELD.getPreferredName(), shardId);
builder.field(STATUS_FIELD.getPreferredName(), status);
builder.field(START_TIME_FIELD.getPreferredName(), Instant.ofEpochMilli(rollupStart).toString());
builder.field(IN_NUM_DOCS_RECEIVED_FIELD.getPreferredName(), numReceived.get());
builder.field(OUT_NUM_DOCS_SENT_FIELD.getPreferredName(), numSent.get());
Expand All @@ -145,7 +121,6 @@ public String getWriteableName() {
@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeEnum(status);
out.writeLong(rollupStart);
out.writeLong(numReceived.get());
out.writeLong(numSent.get());
Expand All @@ -165,7 +140,6 @@ public boolean equals(Object o) {
return rollupStart == that.rollupStart
&& Objects.equals(shardId.getIndexName(), that.shardId.getIndexName())
&& Objects.equals(shardId.id(), that.shardId.id())
&& status == that.status
&& Objects.equals(numReceived.get(), that.numReceived.get())
&& Objects.equals(numSent.get(), that.numSent.get())
&& Objects.equals(numIndexed.get(), that.numIndexed.get())
Expand All @@ -178,7 +152,6 @@ public int hashCode() {
shardId.getIndexName(),
shardId.id(),
rollupStart,
status,
numReceived.get(),
numSent.get(),
numIndexed.get(),
Expand All @@ -190,22 +163,4 @@ public int hashCode() {
public String toString() {
return Strings.toString(this);
}

public enum Status {
STARTED,
FINISHED,
ABORT
}

public void setNumSent(AtomicLong numSent) {
this.numSent = numSent;
}

public void setNumIndexed(AtomicLong numIndexed) {
this.numIndexed = numIndexed;
}

public void setNumFailed(AtomicLong numFailed) {
this.numFailed = numFailed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@
import org.elasticsearch.xpack.core.rollup.RollupField;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class RollupShardTask extends CancellableTask {
private String rollupIndex;
private DownsampleConfig config;
private volatile RollupShardStatus status;
private final RollupShardStatus status;
private final AtomicLong numReceived = new AtomicLong(0);
private final AtomicLong numSent = new AtomicLong(0);
private final AtomicLong numIndexed = new AtomicLong(0);
private final AtomicLong numFailed = new AtomicLong(0);

public RollupShardTask(
long id,
Expand All @@ -33,7 +38,7 @@ public RollupShardTask(
super(id, type, action, RollupField.NAME + "_" + rollupIndex + "[" + shardId.id() + "]", parentTask, headers);
this.rollupIndex = rollupIndex;
this.config = config;
this.status = new RollupShardStatus(shardId);
this.status = new RollupShardStatus(shardId, numReceived, numSent, numIndexed, numFailed);
}

public String getRollupIndex() {
Expand All @@ -49,8 +54,19 @@ public Status getStatus() {
return status;
}

@Override
public void onCancelled() {
status.setCancelled();
public AtomicLong getNumReceived() {
return numReceived;
}

public AtomicLong getNumSent() {
return numSent;
}

public AtomicLong getNumIndexed() {
return numIndexed;
}

public AtomicLong getNumFailed() {
return numFailed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ protected Reader<RollupShardStatus> instanceReader() {
@Override
protected RollupShardStatus createTestInstance() {
RollupShardStatus rollupShardStatus = new RollupShardStatus(
new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(5))
);
rollupShardStatus.init(
new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(5)),
new AtomicLong(randomNonNegativeLong()),
new AtomicLong(randomNonNegativeLong()),
new AtomicLong(randomNonNegativeLong()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.downsample.DownsampleConfig;
import org.elasticsearch.xpack.core.downsample.RollupIndexerAction;
import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus;
import org.elasticsearch.xpack.core.rollup.action.RollupShardTask;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -92,15 +92,16 @@ class RollupShardIndexer {
private final String[] metricFields;
private final String[] labelFields;
private final Map<String, FieldValueFetcher> fieldValueFetchers;
private final AtomicLong numReceived = new AtomicLong();
private final AtomicLong numSent = new AtomicLong();
private final AtomicLong numIndexed = new AtomicLong();
private final AtomicLong numFailed = new AtomicLong();
private final AtomicLong numReceived;
private final AtomicLong numSent;
private final AtomicLong numIndexed;
private final AtomicLong numFailed;

private final RollupShardStatus status;
private final RollupShardTask task;
private volatile boolean abort = false;

RollupShardIndexer(
RollupShardStatus status,
RollupShardTask task,
Client client,
IndexService indexService,
ShardId shardId,
Expand All @@ -110,16 +111,18 @@ class RollupShardIndexer {
String[] metricFields,
String[] labelFields
) {
this.status = status;
this.task = task;
this.numReceived = task.getNumReceived();
this.numSent = task.getNumSent();
this.numIndexed = task.getNumIndexed();
this.numFailed = task.getNumFailed();
this.client = client;
this.indexShard = indexService.getShard(shardId.id());
this.config = config;
this.rollupIndex = rollupIndex;
this.dimensionFields = dimensionFields;
this.metricFields = metricFields;
this.labelFields = labelFields;

this.status.init(numReceived, numSent, numIndexed, numFailed);
this.searcher = indexShard.acquireSearcher("downsampling");
Closeable toClose = searcher;
try {
Expand Down Expand Up @@ -174,12 +177,11 @@ public RollupIndexerAction.ShardRollupResponse execute() throws IOException {
);
}

status.setFinished();
return new RollupIndexerAction.ShardRollupResponse(indexShard.shardId(), numIndexed.get());
}

private void checkCancelled() {
if (status.isCancelled()) {
if (task.isCancelled() || abort) {
logger.warn(
"Shard [{}] rollup abort, sent [{}], indexed [{}], failed[{}]",
indexShard.shardId(),
Expand Down Expand Up @@ -218,7 +220,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
logger.error("Shard [{}] failed to populate rollup index. Failures: [{}]", indexShard.shardId(), failures);

// cancel rollup task
status.setCancelled();
abort = true;
}
}

Expand All @@ -230,7 +232,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
logger.error(() -> format("Shard [%s] failed to populate rollup index.", indexShard.shardId()), failure);

// cancel rollup task
status.setCancelled();
abort = true;
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.downsample.RollupIndexerAction;
import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus;
import org.elasticsearch.xpack.core.rollup.action.RollupShardTask;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -133,7 +133,7 @@ protected RollupIndexerAction.ShardRollupResponse shardOperation(RollupIndexerAc
throws IOException {
IndexService indexService = indicesService.indexService(request.shardId().getIndex());
RollupShardIndexer indexer = new RollupShardIndexer(
(RollupShardStatus) task.getStatus(),
(RollupShardTask) task,
client,
indexService,
request.shardId(),
Expand Down Expand Up @@ -201,6 +201,7 @@ protected void finishHim() {

@Override
protected void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, int shardIndex, Exception e) {
// when this shard operation failed, cancel other shard operations
cancelOtherShardIndexers();
super.onOperation(shard, shardIt, shardIndex, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskCancelHelper;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
Expand All @@ -78,7 +80,7 @@
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import org.elasticsearch.xpack.core.rollup.action.RollupShardStatus;
import org.elasticsearch.xpack.core.rollup.action.RollupShardTask;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import org.elasticsearch.xpack.rollup.Rollup;
import org.junit.Before;
Expand All @@ -100,6 +102,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
Expand Down Expand Up @@ -593,11 +596,21 @@ public void testCancelRollupIndexer() throws IOException {
IndexService indexService = indexServices.indexServiceSafe(srcIndex);
int shardNum = randomIntBetween(0, numOfShards - 1);
IndexShard shard = indexService.getShard(shardNum);
RollupShardStatus status = new RollupShardStatus(shard.shardId());
RollupShardTask task = new RollupShardTask(
randomLong(),
"rollup",
"action",
TaskId.EMPTY_TASK_ID,
rollupIndex,
config,
emptyMap(),
shard.shardId()
);
TaskCancelHelper.cancel(task, "test cancel");

// re-use source index as temp index for test
RollupShardIndexer indexer = new RollupShardIndexer(
status,
task,
client(),
indexService,
shard.shardId(),
Expand All @@ -607,7 +620,6 @@ public void testCancelRollupIndexer() throws IOException {
new String[] { FIELD_NUMERIC_1, FIELD_NUMERIC_2 },
new String[] {}
);
status.setCancelled();

TaskCancelledException exception = expectThrows(TaskCancelledException.class, () -> indexer.execute());
assertThat(exception.getMessage(), equalTo("Shard [" + sourceIndex + "][" + shardNum + "] rollup cancelled"));
Expand Down

0 comments on commit 558bf0a

Please sign in to comment.