Skip to content

Commit

Permalink
Improve short-circuiting downsample execution (#106563)
Browse files Browse the repository at this point in the history
This is relevant in the case multiple downsample api invocations have been executed for the same source index, target index and  fixed interval. Whether the target index is ready, is now also checked just before starting the downsample persistent tasks.

Relates to #106403
  • Loading branch information
martijnvg authored Mar 21, 2024
1 parent 32dbc28 commit 8a7697b
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 26 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/106563.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 106563
summary: Improve short-circuiting downsample execution
area: TSDB
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -234,33 +234,11 @@ protected void masterOperation(
}

final TaskId parentTask = new TaskId(clusterService.localNode().getId(), task.getId());
// Shortcircuit if target index has been downsampled:
// Short circuit if target index has been downsampled:
final String downsampleIndexName = request.getTargetIndex();
IndexMetadata downsampleIndex = state.getMetadata().index(downsampleIndexName);
if (downsampleIndex != null) {
var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings());
if (downsampleStatus == DownsampleTaskStatus.UNKNOWN) {
// This isn't a downsample index, so fail:
listener.onFailure(new ResourceAlreadyExistsException(downsampleIndex.getIndex()));
return;
} else if (downsampleStatus == DownsampleTaskStatus.SUCCESS) {
listener.onResponse(AcknowledgedResponse.TRUE);
return;
}
// In case the write block has been set on the target index means that the shard level downsampling itself was successful,
// but the previous invocation failed later performing settings update, refresh or force merge.
// The write block is used a signal to resume from the refresh part of the downsample api invocation.
if (downsampleIndex.getSettings().get(IndexMetadata.SETTING_BLOCKS_WRITE) != null) {
var refreshRequest = new RefreshRequest(downsampleIndexName);
refreshRequest.setParentTask(parentTask);
client.admin()
.indices()
.refresh(
refreshRequest,
new RefreshDownsampleIndexActionListener(listener, parentTask, downsampleIndexName, request.getWaitTimeout())
);
return;
}
if (canShortCircuit(downsampleIndexName, parentTask, request.getWaitTimeout(), state.metadata(), listener)) {
logger.info("Skipping downsampling, because a previous execution already completed downsampling");
return;
}
try {
MetadataCreateIndexService.validateIndexName(downsampleIndexName, state);
Expand Down Expand Up @@ -356,6 +334,11 @@ protected void masterOperation(
}
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
var metadata = clusterService.state().metadata();
if (canShortCircuit(request.getTargetIndex(), parentTask, request.getWaitTimeout(), metadata, listener)) {
logger.info("Downsample tasks are not created, because a previous execution already completed downsampling");
return;
}
performShardDownsampling(
request,
delegate,
Expand All @@ -374,6 +357,47 @@ protected void masterOperation(
}));
}

/**
* Shortcircuit when another downsample api invocation already completed successfully.
*/
private boolean canShortCircuit(
String targetIndexName,
TaskId parentTask,
TimeValue waitTimeout,
Metadata metadata,
ActionListener<AcknowledgedResponse> listener
) {
IndexMetadata targetIndexMetadata = metadata.index(targetIndexName);
if (targetIndexMetadata == null) {
return false;
}

var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(targetIndexMetadata.getSettings());
if (downsampleStatus == DownsampleTaskStatus.UNKNOWN) {
// This isn't a downsample index, so fail:
listener.onFailure(new ResourceAlreadyExistsException(targetIndexMetadata.getIndex()));
return true;
} else if (downsampleStatus == DownsampleTaskStatus.SUCCESS) {
listener.onResponse(AcknowledgedResponse.TRUE);
return true;
}
// In case the write block has been set on the target index means that the shard level downsampling itself was successful,
// but the previous invocation failed later performing settings update, refresh or force merge.
// The write block is used a signal to resume from the refresh part of the downsample api invocation.
if (targetIndexMetadata.getSettings().get(IndexMetadata.SETTING_BLOCKS_WRITE) != null) {
var refreshRequest = new RefreshRequest(targetIndexMetadata.getIndex().getName());
refreshRequest.setParentTask(parentTask);
client.admin()
.indices()
.refresh(
refreshRequest,
new RefreshDownsampleIndexActionListener(listener, parentTask, targetIndexMetadata.getIndex().getName(), waitTimeout)
);
return true;
}
return false;
}

// 3. downsample index created or already exist (in case of retry). Run downsample indexer persistent task on each shard.
private void performShardDownsampling(
DownsampleAction.Request request,
Expand Down

0 comments on commit 8a7697b

Please sign in to comment.