From 8a7697bdc962e4265ad9248ad80adb26086517cc Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 21 Mar 2024 11:37:34 +0100 Subject: [PATCH] Improve short-circuiting downsample execution (#106563) This is relevant when multiple downsample api invocations have been executed for the same source index, target index and fixed interval. Whether the target index is ready is now also checked just before starting the downsample persistent tasks. Relates to #106403 --- docs/changelog/106563.yaml | 5 ++ .../downsample/TransportDownsampleAction.java | 76 ++++++++++++------- 2 files changed, 55 insertions(+), 26 deletions(-) create mode 100644 docs/changelog/106563.yaml diff --git a/docs/changelog/106563.yaml b/docs/changelog/106563.yaml new file mode 100644 index 0000000000000..79476f909a04c --- /dev/null +++ b/docs/changelog/106563.yaml @@ -0,0 +1,5 @@ +pr: 106563 +summary: Improve short-circuiting downsample execution +area: TSDB +type: enhancement +issues: [] diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index 5debe5d2edfc9..0570d93441be1 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -234,33 +234,11 @@ protected void masterOperation( } final TaskId parentTask = new TaskId(clusterService.localNode().getId(), task.getId()); - // Shortcircuit if target index has been downsampled: + // Short circuit if target index has been downsampled: final String downsampleIndexName = request.getTargetIndex(); - IndexMetadata downsampleIndex = state.getMetadata().index(downsampleIndexName); - if (downsampleIndex != null) { - var downsampleStatus = 
IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()); - if (downsampleStatus == DownsampleTaskStatus.UNKNOWN) { - // This isn't a downsample index, so fail: - listener.onFailure(new ResourceAlreadyExistsException(downsampleIndex.getIndex())); - return; - } else if (downsampleStatus == DownsampleTaskStatus.SUCCESS) { - listener.onResponse(AcknowledgedResponse.TRUE); - return; - } - // In case the write block has been set on the target index means that the shard level downsampling itself was successful, - // but the previous invocation failed later performing settings update, refresh or force merge. - // The write block is used a signal to resume from the refresh part of the downsample api invocation. - if (downsampleIndex.getSettings().get(IndexMetadata.SETTING_BLOCKS_WRITE) != null) { - var refreshRequest = new RefreshRequest(downsampleIndexName); - refreshRequest.setParentTask(parentTask); - client.admin() - .indices() - .refresh( - refreshRequest, - new RefreshDownsampleIndexActionListener(listener, parentTask, downsampleIndexName, request.getWaitTimeout()) - ); - return; - } + if (canShortCircuit(downsampleIndexName, parentTask, request.getWaitTimeout(), state.metadata(), listener)) { + logger.info("Skipping downsampling, because a previous execution already completed downsampling"); + return; } try { MetadataCreateIndexService.validateIndexName(downsampleIndexName, state); @@ -356,6 +334,11 @@ protected void masterOperation( } }, e -> { if (e instanceof ResourceAlreadyExistsException) { + var metadata = clusterService.state().metadata(); + if (canShortCircuit(request.getTargetIndex(), parentTask, request.getWaitTimeout(), metadata, listener)) { + logger.info("Downsample tasks are not created, because a previous execution already completed downsampling"); + return; + } performShardDownsampling( request, delegate, @@ -374,6 +357,47 @@ protected void masterOperation( })); } + /** + * Short circuit when another downsample api invocation already 
completed successfully. + */ + private boolean canShortCircuit( + String targetIndexName, + TaskId parentTask, + TimeValue waitTimeout, + Metadata metadata, + ActionListener listener + ) { + IndexMetadata targetIndexMetadata = metadata.index(targetIndexName); + if (targetIndexMetadata == null) { + return false; + } + + var downsampleStatus = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(targetIndexMetadata.getSettings()); + if (downsampleStatus == DownsampleTaskStatus.UNKNOWN) { + // This isn't a downsample index, so fail: + listener.onFailure(new ResourceAlreadyExistsException(targetIndexMetadata.getIndex())); + return true; + } else if (downsampleStatus == DownsampleTaskStatus.SUCCESS) { + listener.onResponse(AcknowledgedResponse.TRUE); + return true; + } + // If the write block has been set on the target index, it means that the shard level downsampling itself was successful, + // but the previous invocation failed later while performing settings update, refresh or force merge. + // The write block is used as a signal to resume from the refresh part of the downsample api invocation. + if (targetIndexMetadata.getSettings().get(IndexMetadata.SETTING_BLOCKS_WRITE) != null) { + var refreshRequest = new RefreshRequest(targetIndexMetadata.getIndex().getName()); + refreshRequest.setParentTask(parentTask); + client.admin() + .indices() + .refresh( + refreshRequest, + new RefreshDownsampleIndexActionListener(listener, parentTask, targetIndexMetadata.getIndex().getName(), waitTimeout) + ); + return true; + } + return false; + } + // 3. downsample index created or already exist (in case of retry). Run downsample indexer persistent task on each shard. private void performShardDownsampling( DownsampleAction.Request request,