Skip to content

Commit

Permalink
Improve short-circuiting downsample execution
Browse files Browse the repository at this point in the history
This is relevant in the case multiple downsample api invocations have been executed for the same source index, target index and  fixed interval. Whether the target index is ready, is now also checked just before starting the downsample persistent tasks.

Relates to elastic#106403
  • Loading branch information
martijnvg committed Mar 20, 2024
1 parent 8357d2b commit 28cea02
Showing 1 changed file with 49 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,31 +234,12 @@ protected void masterOperation(
}

final TaskId parentTask = new TaskId(clusterService.localNode().getId(), task.getId());
// Shortcircuit if target index has been downsampled:
// Short circuit if target index has been downsampled:
final String downsampleIndexName = request.getTargetIndex();
IndexMetadata downsampleIndex = state.getMetadata().index(downsampleIndexName);
if (downsampleIndex != null) {
var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings());
if (downsampleStatus == DownsampleTaskStatus.UNKNOWN) {
// This isn't a downsample index, so fail:
listener.onFailure(new ResourceAlreadyExistsException(downsampleIndex.getIndex()));
return;
} else if (downsampleStatus == DownsampleTaskStatus.SUCCESS) {
listener.onResponse(AcknowledgedResponse.TRUE);
return;
}
// In case the write block has been set on the target index means that the shard level downsampling itself was successful,
// but the previous invocation failed later performing settings update, refresh or force merge.
// The write block is used a signal to resume from the refresh part of the downsample api invocation.
if (downsampleIndex.getSettings().get(IndexMetadata.SETTING_BLOCKS_WRITE) != null) {
var refreshRequest = new RefreshRequest(downsampleIndexName);
refreshRequest.setParentTask(parentTask);
client.admin()
.indices()
.refresh(
refreshRequest,
new RefreshDownsampleIndexActionListener(listener, parentTask, downsampleIndexName, request.getWaitTimeout())
);
if (canShortCircuit(downsampleIndex, parentTask, request.getWaitTimeout(), listener)) {
logger.info("Skipping downsampling, because a previous execution already completed downsampling");
return;
}
}
Expand Down Expand Up @@ -345,6 +326,7 @@ protected void masterOperation(
request,
delegate,
sourceIndexMetadata,
null,
downsampleIndexName,
parentTask,
metricFields,
Expand All @@ -356,10 +338,12 @@ protected void masterOperation(
}
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
var targetIndexMetadata = state.getMetadata().index(downsampleIndexName);
performShardDownsampling(
request,
delegate,
sourceIndexMetadata,
targetIndexMetadata,
downsampleIndexName,
parentTask,
metricFields,
Expand All @@ -374,17 +358,60 @@ protected void masterOperation(
}));
}

/**
* Shortcircuit when another downsample api invocation already completed successfully.
*/
private boolean canShortCircuit(
IndexMetadata targetIndexMetadata,
TaskId parentTask,
TimeValue waitTimeout,
ActionListener<AcknowledgedResponse> listener
) {
var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(targetIndexMetadata.getSettings());
if (downsampleStatus == DownsampleTaskStatus.UNKNOWN) {
// This isn't a downsample index, so fail:
listener.onFailure(new ResourceAlreadyExistsException(targetIndexMetadata.getIndex()));
return true;
} else if (downsampleStatus == DownsampleTaskStatus.SUCCESS) {
listener.onResponse(AcknowledgedResponse.TRUE);
return true;
}
// In case the write block has been set on the target index means that the shard level downsampling itself was successful,
// but the previous invocation failed later performing settings update, refresh or force merge.
// The write block is used a signal to resume from the refresh part of the downsample api invocation.
if (targetIndexMetadata.getSettings().get(IndexMetadata.SETTING_BLOCKS_WRITE) != null) {
var refreshRequest = new RefreshRequest(targetIndexMetadata.getIndex().getName());
refreshRequest.setParentTask(parentTask);
client.admin()
.indices()
.refresh(
refreshRequest,
new RefreshDownsampleIndexActionListener(listener, parentTask, targetIndexMetadata.getIndex().getName(), waitTimeout)
);
return true;
}
return false;
}

// 3. downsample index created or already exist (in case of retry). Run downsample indexer persistent task on each shard.
private void performShardDownsampling(
DownsampleAction.Request request,
ActionListener<AcknowledgedResponse> listener,
IndexMetadata sourceIndexMetadata,
IndexMetadata targetIndexMetadata,
String downsampleIndexName,
TaskId parentTask,
List<String> metricFields,
List<String> labelFields,
List<String> dimensionFields
) {
if (targetIndexMetadata != null) {
if (canShortCircuit(targetIndexMetadata, parentTask, request.getWaitTimeout(), listener)) {
logger.info("Downsample tasks are not created, because a previous execution already completed downsampling");
return;
}
}

final int numberOfShards = sourceIndexMetadata.getNumberOfShards();
final Index sourceIndex = sourceIndexMetadata.getIndex();
// NOTE: before we set the number of replicas to 0, as a result here we are
Expand Down

0 comments on commit 28cea02

Please sign in to comment.