diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java
index e3128fd1b904b..e4f4f88254977 100644
--- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java
+++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.action.datastreams.CreateDataStreamAction;
 import org.elasticsearch.action.datastreams.GetDataStreamAction;
 import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
+import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
 import org.elasticsearch.action.datastreams.lifecycle.ExplainIndexDataStreamLifecycle;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
@@ -405,7 +406,7 @@ public void testErrorRecordingOnRollover() throws Exception {
 
         assertBusy(() -> {
             String writeIndexName = getBackingIndices(dataStreamName).get(1);
-            String writeIndexRolloverError = null;
+            ErrorEntry writeIndexRolloverError = null;
             Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
 
             for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
@@ -416,7 +417,7 @@
             }
 
             assertThat(writeIndexRolloverError, is(notNullValue()));
-            assertThat(writeIndexRolloverError, containsString("maximum normal shards open"));
+            assertThat(writeIndexRolloverError.error(), containsString("maximum normal shards open"));
         });
 
         // let's reset the cluster max shards per node limit to allow rollover to proceed and check the error store is empty
@@ -497,7 +498,7 @@ public void testErrorRecordingOnRetention() throws Exception {
             String writeIndex = backingIndices.get(1).getName();
             assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
 
-            String recordedRetentionExecutionError = null;
+            ErrorEntry recordedRetentionExecutionError = null;
             Iterable<DataStreamLifecycleService> lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class);
 
             for (DataStreamLifecycleService lifecycleService : lifecycleServices) {
@@ -508,7 +509,7 @@
             }
 
             assertThat(recordedRetentionExecutionError, is(notNullValue()));
-            assertThat(recordedRetentionExecutionError, containsString("blocked by: [FORBIDDEN/5/index read-only (api)"));
+            assertThat(recordedRetentionExecutionError.error(), containsString("blocked by: [FORBIDDEN/5/index read-only (api)"));
         });
 
         // let's mark the index as writeable and make sure it's deleted and the error store is empty
diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java
index b3156849c148a..6ff50d88aeb05 100644
--- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java
+++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java
@@ -49,6 +49,7 @@
 import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
@@ -252,7 +253,9 @@ public void testExplainLifecycleForIndicesWithErrors() throws Exception {
                 // index has not been rolled over yet
                 assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue());
 
-                assertThat(explainIndex.getError(), containsString("maximum normal shards open"));
+                assertThat(explainIndex.getError(), notNullValue());
+                assertThat(explainIndex.getError().error(), containsString("maximum normal shards open"));
+                assertThat(explainIndex.getError().retryCount(), greaterThanOrEqualTo(1));
             }
         });
 
diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
index a845b75450366..2cf44dc0e3218 100644
--- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
@@ -141,6 +141,7 @@ public List<Setting<?>> getSettings() {
         pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING);
         pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
         pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);
+        pluginSettings.add(DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING);
         return pluginSettings;
     }
 
@@ -155,7 +156,7 @@ public Collection<?> createComponents(PluginServices services) {
         );
         this.updateTimeSeriesRangeService.set(updateTimeSeriesRangeService);
         components.add(this.updateTimeSeriesRangeService.get());
-        errorStoreInitialisationService.set(new DataStreamLifecycleErrorStore());
+        errorStoreInitialisationService.set(new DataStreamLifecycleErrorStore(services.threadPool()::absoluteTimeInMillis));
         dataLifecycleInitialisationService.set(
             new DataStreamLifecycleService(
                 settings,
diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java
index c0b517fa2ca9c..47589fd7276f4 100644
--- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java
@@ -9,12 +9,14 @@
 package org.elasticsearch.datastreams.lifecycle;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.core.Nullable;
 
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.LongSupplier;
 
 import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
 
@@ -26,7 +28,12 @@ public class DataStreamLifecycleErrorStore {
 
     public static final int MAX_ERROR_MESSAGE_LENGTH = 1000;
 
-    private final ConcurrentMap<String, String> indexNameToError = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ErrorEntry> indexNameToError = new ConcurrentHashMap<>();
+    private final LongSupplier nowSupplier;
+
+    public DataStreamLifecycleErrorStore(LongSupplier nowSupplier) {
+        this.nowSupplier = nowSupplier;
+    }
 
     /**
      * Records a string representation of the provided exception for the provided index.
@@ -35,13 +42,24 @@ public class DataStreamLifecycleErrorStore {
      * Returns the previously recorded error for the provided index, or null otherwise.
      */
     @Nullable
-    public String recordError(String indexName, Exception e) {
+    public ErrorEntry recordError(String indexName, Exception e) {
         String exceptionToString = Strings.toString((builder, params) -> {
             ElasticsearchException.generateThrowableXContent(builder, EMPTY_PARAMS, e);
             return builder;
         });
-        String recordedError = Strings.substring(exceptionToString, 0, MAX_ERROR_MESSAGE_LENGTH);
-        return indexNameToError.put(indexName, recordedError);
+        String newError = Strings.substring(exceptionToString, 0, MAX_ERROR_MESSAGE_LENGTH);
+        ErrorEntry existingError = indexNameToError.get(indexName);
+        long recordedTimestamp = nowSupplier.getAsLong();
+        if (existingError == null) {
+            indexNameToError.put(indexName, new ErrorEntry(recordedTimestamp, newError, recordedTimestamp, 0));
+        } else {
+            if (existingError.error().equals(newError)) {
+                indexNameToError.put(indexName, ErrorEntry.incrementRetryCount(existingError, nowSupplier));
+            } else {
+                indexNameToError.put(indexName, new ErrorEntry(recordedTimestamp, newError, recordedTimestamp, 0));
+            }
+        }
+        return existingError;
     }
 
     /**
@@ -62,7 +80,7 @@ public void clearStore() {
      * Retrieves the recorded error for the provided index.
      */
     @Nullable
-    public String getError(String indexName) {
+    public ErrorEntry getError(String indexName) {
         return indexNameToError.get(indexName);
     }
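
A quick usage sketch of the new store semantics (hypothetical driver class and a deterministic fake clock; assumes this module on the classpath): re-recording the same failure increments the retry count and refreshes the last-recorded timestamp while preserving the first-occurrence timestamp, and a different failure resets the entry.

import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;

import java.util.concurrent.atomic.AtomicLong;

public class ErrorStoreSemanticsSketch {
    public static void main(String[] args) {
        AtomicLong clock = new AtomicLong(1_000L); // deterministic stand-in for threadPool::absoluteTimeInMillis
        DataStreamLifecycleErrorStore store = new DataStreamLifecycleErrorStore(clock::get);

        store.recordError("idx-1", new IllegalStateException("boom")); // first occurrence -> retryCount 0
        clock.addAndGet(5_000L);
        store.recordError("idx-1", new IllegalStateException("boom")); // same error again -> retryCount 1

        ErrorEntry entry = store.getError("idx-1");
        assert entry.firstOccurrenceTimestamp() == 1_000L; // preserved from the first recording
        assert entry.recordedTimestamp() == 6_000L;        // refreshed on the retry
        assert entry.retryCount() == 1;

        store.recordError("idx-1", new IllegalStateException("different failure")); // new error -> entry resets
        assert store.getError("idx-1").retryCount() == 0;
    }
}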
diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
index 5d2d95a3dc954..03d1340c14dbb 100644
--- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
@@ -29,6 +29,7 @@
 import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsAction;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
 import org.elasticsearch.action.downsample.DownsampleAction;
 import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
@@ -127,6 +128,19 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
         Setting.Property.Dynamic,
         Setting.Property.NodeScope
     );
+    /**
+     * This setting controls how often we signal that an index is in the error state when it comes to its data stream lifecycle
+     * progression.
+     * The signalling is currently logging at the `error` level but in the future it can signify other types of signalling.
+     */
+    public static final Setting<Integer> DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING = Setting.intSetting(
+        "data_streams.lifecycle.signalling.error_retry_interval",
+        10,
+        1,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
     public static final String DOWNSAMPLED_INDEX_PREFIX = "downsample-";
 
     private static final Logger logger = LogManager.getLogger(DataStreamLifecycleService.class);
@@ -156,6 +170,10 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
     private final MasterServiceTaskQueue swapSourceWithDownsampleIndexQueue;
     private volatile ByteSizeValue targetMergePolicyFloorSegment;
     private volatile int targetMergePolicyFactor;
+    /**
+     * The number of retries for a particular index and error after which DSL will emit a signal (e.g. log statement)
+     */
+    private volatile int signallingErrorRetryInterval;
 
     private static final SimpleBatchedExecutor FORCE_MERGE_STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
@@ -194,6 +212,7 @@ public DataStreamLifecycleService(
         this.pollInterval = DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
         this.targetMergePolicyFloorSegment = DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING.get(settings);
         this.targetMergePolicyFactor = DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING.get(settings);
+        this.signallingErrorRetryInterval = DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING.get(settings);
         this.rolloverConfiguration = clusterService.getClusterSettings()
             .get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING);
         this.forceMergeClusterStateUpdateTaskQueue = clusterService.createTaskQueue(
@@ -221,6 +240,8 @@ public void init() {
             .addSettingsUpdateConsumer(DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING, this::updateMergePolicyFactor);
         clusterService.getClusterSettings()
             .addSettingsUpdateConsumer(DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING, this::updateMergePolicyFloorSegment);
+        clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING, this::updateSignallingRetryThreshold);
     }
 
     @Override
@@ -526,7 +547,8 @@ private void downsampleIndexOnce(DataStreamLifecycle.Downsampling.Round round, S
                 "Data stream lifecycle encountered an error trying to downsample index [%s]. Data stream lifecycle will "
                     + "attempt to downsample the index on its next run.",
                 sourceIndex
-            )
+            ),
+            signallingErrorRetryInterval
         ),
         (req, reqListener) -> downsampleIndex(request, reqListener)
     );
@@ -553,20 +575,22 @@ private Set<Index> evaluateDownsampleStatus(
         if (currentRound.equals(lastRound)) {
             // target downsampling index exists and is not a downsampling index (name clash?)
             // we fail now but perhaps we should just randomise the name?
-            String previousError = errorStore.getError(indexName);
-
-            errorStore.recordError(indexName, new ResourceAlreadyExistsException(downsampleIndexName));
-            // To avoid spamming our logs, we only want to log the error once.
-            if (previousError == null || previousError.equals(errorStore.getError(indexName)) == false) {
-                logger.error(
-                    "Data stream lifecycle service is unable to downsample backing index [{}] for data stream [{}] and "
-                        + "donwsampling round [{}] because the target downsample index [{}] already exists",
+
+            recordAndLogError(
+                indexName,
+                errorStore,
+                new ResourceAlreadyExistsException(downsampleIndexName),
+                String.format(
+                    Locale.ROOT,
+                    "Data stream lifecycle service is unable to downsample backing index [%s] for data "
+                        + "stream [%s] and downsampling round [%s] because the target downsample index [%s] already exists",
                     indexName,
                     dataStream.getName(),
                     currentRound,
                     downsampleIndexName
-                );
-            }
+                ),
+                signallingErrorRetryInterval
+            );
         }
         yield affectedIndices;
     }
@@ -625,7 +649,8 @@ private void replaceBackingIndexWithDownsampleIndexOnce(DataStream dataStream, S
                 backingIndexName,
                 downsampleIndexName,
                 dataStream
-            )
+            ),
+            signallingErrorRetryInterval
         ),
         (req, reqListener) -> {
             logger.trace(
@@ -660,7 +685,8 @@ private void deleteIndexOnce(String indexName, String reason) {
             DeleteIndexAction.NAME,
             indexName,
             errorStore,
-            Strings.format("Data stream lifecycle encountered an error trying to delete index [%s]", indexName)
+            Strings.format("Data stream lifecycle encountered an error trying to delete index [%s]", indexName),
+            signallingErrorRetryInterval
         ),
         (req, reqListener) -> deleteIndex(deleteIndexRequest, reason, reqListener)
     );
@@ -677,7 +703,8 @@ private void addIndexBlockOnce(String indexName) {
             AddIndexBlockAction.NAME,
             indexName,
             errorStore,
-            Strings.format("Data stream lifecycle service encountered an error trying to mark index [%s] as readonly", indexName)
+            Strings.format("Data stream lifecycle service encountered an error trying to mark index [%s] as readonly", indexName),
+            signallingErrorRetryInterval
         ),
         (req, reqListener) -> addIndexBlock(addIndexBlockRequest, reqListener)
     );
@@ -740,7 +767,8 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
             RolloverAction.NAME,
             writeIndex.getName(),
             errorStore,
-            Strings.format("Data stream lifecycle encountered an error trying to rollover data steam [%s]", dataStream.getName())
+            Strings.format("Data stream lifecycle encountered an error trying to rollover data stream [%s]", dataStream.getName()),
+            signallingErrorRetryInterval
         ),
         (req, reqListener) -> rolloverDataStream(writeIndex.getName(), rolloverRequest, reqListener)
     );
@@ -837,7 +865,8 @@ private Set<Index> maybeExecuteForceMerge(ClusterState state, List<Index> indice
                 "Data stream lifecycle encountered an error trying to to update settings [%s] for index [%s]",
                 updateMergePolicySettingsRequest.settings().keySet(),
                 indexName
-            )
+            ),
+            signallingErrorRetryInterval
         ),
         (req, reqListener) -> updateIndexSetting(updateMergePolicySettingsRequest, reqListener)
     );
@@ -855,7 +884,8 @@ private Set<Index> maybeExecuteForceMerge(ClusterState state, List<Index> indice
             "Data stream lifecycle encountered an error trying to force merge index [%s]. Data stream lifecycle will "
                 + "attempt to force merge the index on its next run.",
             indexName
-        )
+        ),
+        signallingErrorRetryInterval
         ),
         (req, reqListener) -> forceMergeIndex(forceMergeRequest, reqListener)
     );
@@ -1174,17 +1204,20 @@ static class ErrorRecordingActionListener implements ActionListener<Void> {
         private final String targetIndex;
         private final DataStreamLifecycleErrorStore errorStore;
         private final String errorLogMessage;
+        private final int signallingErrorRetryThreshold;
 
         ErrorRecordingActionListener(
             String actionName,
             String targetIndex,
             DataStreamLifecycleErrorStore errorStore,
-            String errorLogMessage
+            String errorLogMessage,
+            int signallingErrorRetryThreshold
         ) {
             this.actionName = actionName;
             this.targetIndex = targetIndex;
             this.errorStore = errorStore;
             this.errorLogMessage = errorLogMessage;
+            this.signallingErrorRetryThreshold = signallingErrorRetryThreshold;
         }
 
         @Override
@@ -1195,21 +1228,64 @@ public void onResponse(Void unused) {
 
         @Override
         public void onFailure(Exception e) {
-            recordAndLogError(targetIndex, errorStore, e, errorLogMessage);
+            recordAndLogError(targetIndex, errorStore, e, errorLogMessage, signallingErrorRetryThreshold);
         }
     }
 
     /**
     * Records the provided error for the index in the error store and logs the error message at `ERROR` level if the error for the index
-     * is different to what's already in the error store.
-     * This allows us to not spam the logs and only log new errors when we're about to record them in the store.
+     * is different to what's already in the error store or if the same error was in the error store for a number of retries divisible by
+     * the provided signallingErrorRetryThreshold (i.e. we log at level `error` every signallingErrorRetryThreshold retries, if the error
+     * stays the same).
+     * This allows us to not spam the logs, but signal to the logs if DSL is not making progress.
      */
-    static void recordAndLogError(String targetIndex, DataStreamLifecycleErrorStore errorStore, Exception e, String logMessage) {
-        String previousError = errorStore.recordError(targetIndex, e);
-        if (previousError == null || previousError.equals(errorStore.getError(targetIndex)) == false) {
+    static void recordAndLogError(
+        String targetIndex,
+        DataStreamLifecycleErrorStore errorStore,
+        Exception e,
+        String logMessage,
+        int signallingErrorRetryThreshold
+    ) {
+        ErrorEntry previousError = errorStore.recordError(targetIndex, e);
+        ErrorEntry currentError = errorStore.getError(targetIndex);
+        if (previousError == null || (currentError != null && previousError.error().equals(currentError.error()) == false)) {
             logger.error(logMessage, e);
         } else {
-            logger.trace(logMessage, e);
+            if (currentError != null) {
+                if (currentError.retryCount() % signallingErrorRetryThreshold == 0) {
+                    logger.error(
+                        String.format(
+                            Locale.ROOT,
+                            "%s\nFailing since [%d], operation retried [%d] times",
+                            logMessage,
+                            currentError.firstOccurrenceTimestamp(),
+                            currentError.retryCount()
+                        ),
+                        e
+                    );
+                } else {
+                    logger.trace(
+                        String.format(
+                            Locale.ROOT,
+                            "%s\nFailing since [%d], operation retried [%d] times",
+                            logMessage,
+                            currentError.firstOccurrenceTimestamp(),
+                            currentError.retryCount()
+                        ),
+                        e
+                    );
+                }
+            } else {
+                logger.trace(
+                    String.format(
+                        Locale.ROOT,
+                        "Index [%s] encountered error [%s] but there's no record in the error store anymore",
+                        targetIndex,
+                        logMessage
+                    ),
+                    e
+                );
+            }
         }
     }
 
@@ -1240,6 +1316,10 @@ private void updateMergePolicyFactor(int newFactor) {
         this.targetMergePolicyFactor = newFactor;
     }
 
+    public void updateSignallingRetryThreshold(int retryThreshold) {
+        this.signallingErrorRetryInterval = retryThreshold;
+    }
+
     private void cancelJob() {
         if (scheduler.get() != null) {
             scheduler.get().remove(LIFECYCLE_JOB_NAME);
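
The signalling cadence above, extracted into a standalone sketch (class and method names are illustrative, not Elasticsearch API): an error is logged at ERROR when it first occurs or changes, re-surfaced at ERROR every `threshold` retries thereafter, and kept at TRACE in between.

public final class SignallingCadenceSketch {
    enum Level { ERROR, TRACE }

    // Mirrors the branching in recordAndLogError above.
    static Level levelFor(String previousError, String currentError, int retryCount, int threshold) {
        if (previousError == null || previousError.equals(currentError) == false) {
            return Level.ERROR; // first occurrence, or the error changed
        }
        return retryCount % threshold == 0 ? Level.ERROR : Level.TRACE; // periodic re-signalling
    }

    public static void main(String[] args) {
        // With the default threshold of 10, retries 10 and 20 re-signal at ERROR.
        for (int retry = 1; retry <= 20; retry++) {
            System.out.println("retry " + retry + " -> " + levelFor("same error", "same error", retry, 10));
        }
    }
}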
diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java
index ace52cd4c3f8b..c1255cc9e3a72 100644
--- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java
+++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java
@@ -8,6 +8,7 @@
 
 package org.elasticsearch.datastreams.lifecycle;
 
+import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
 import org.elasticsearch.test.ESTestCase;
 import org.junit.Before;
 
@@ -27,11 +28,11 @@ public class DataStreamLifecycleErrorStoreTests extends ESTestCase {
 
     @Before
     public void setupServices() {
-        errorStore = new DataStreamLifecycleErrorStore();
+        errorStore = new DataStreamLifecycleErrorStore(System::currentTimeMillis);
     }
 
     public void testRecordAndRetrieveError() {
-        String existingRecordedError = errorStore.recordError("test", new NullPointerException("testing"));
+        ErrorEntry existingRecordedError = errorStore.recordError("test", new NullPointerException("testing"));
         assertThat(existingRecordedError, is(nullValue()));
         assertThat(errorStore.getError("test"), is(notNullValue()));
         assertThat(errorStore.getAllIndices().size(), is(1));
@@ -39,7 +40,7 @@ public void testRecordAndRetrieveError() {
 
         existingRecordedError = errorStore.recordError("test", new IllegalStateException("bad state"));
         assertThat(existingRecordedError, is(notNullValue()));
-        assertThat(existingRecordedError, containsString("testing"));
+        assertThat(existingRecordedError.error(), containsString("testing"));
     }
 
     public void testRetrieveAfterClear() {
@@ -80,6 +81,6 @@ public void testRecordedErrorIsMaxOneThousandChars() {
         NullPointerException exceptionWithLongMessage = new NullPointerException(randomAlphaOfLength(2000));
         errorStore.recordError("test", exceptionWithLongMessage);
         assertThat(errorStore.getError("test"), is(notNullValue()));
-        assertThat(errorStore.getError("test").length(), is(MAX_ERROR_MESSAGE_LENGTH));
+        assertThat(errorStore.getError("test").error().length(), is(MAX_ERROR_MESSAGE_LENGTH));
     }
 }
diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java
index fd9664dd94493..bd6100c95b412 100644
--- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java
+++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java
@@ -23,6 +23,7 @@
 import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
 import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
 import org.elasticsearch.action.downsample.DownsampleAction;
 import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
@@ -103,6 +104,7 @@
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.createDataStream;
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING;
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING;
+import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING;
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DOWNSAMPLED_INDEX_PREFIX;
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY;
 import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.ONE_HUNDRED_MB;
@@ -135,6 +137,7 @@ public void setupServices() {
         builtInClusterSettings.add(DataStreamLifecycleService.DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING);
         builtInClusterSettings.add(DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING);
         builtInClusterSettings.add(DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING);
+        builtInClusterSettings.add(DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING);
         ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, builtInClusterSettings);
 
         clusterService = createClusterService(threadPool, clusterSettings);
@@ -162,7 +165,7 @@ public void setupServices() {
             clock,
             threadPool,
             () -> now,
-            new DataStreamLifecycleErrorStore(),
+            new DataStreamLifecycleErrorStore(() -> now),
             allocationService
         );
         clientDelegate = null;
@@ -1307,9 +1310,9 @@ public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exc
         dataStreamLifecycleService.maybeExecuteDownsampling(clusterService.state(), dataStream, List.of(firstGenIndex));
 
         assertThat(clientSeenRequests.size(), is(0));
-        String error = dataStreamLifecycleService.getErrorStore().getError(firstGenIndexName);
+        ErrorEntry error = dataStreamLifecycleService.getErrorStore().getError(firstGenIndexName);
         assertThat(error, notNullValue());
-        assertThat(error, containsString("resource_already_exists_exception"));
+        assertThat(error.error(), containsString("resource_already_exists_exception"));
     }
 
     public void testTimeSeriesIndicesStillWithinTimeBounds() {
diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/ExplainDataStreamLifecycleResponseTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/ExplainDataStreamLifecycleResponseTests.java
index 3d9ec173e7116..0c10e3964e168 100644
--- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/ExplainDataStreamLifecycleResponseTests.java
+++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/ExplainDataStreamLifecycleResponseTests.java
@@ -12,6 +12,7 @@
 import org.elasticsearch.action.admin.indices.rollover.MinPrimaryShardDocsCondition;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
 import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
+import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
 import org.elasticsearch.action.datastreams.lifecycle.ExplainIndexDataStreamLifecycle;
 import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -103,7 +104,21 @@ public void testToXContent() throws IOException {
                 assertThat(explainIndexMap.get("generation_time"), is(nullValue()));
             }
             assertThat(explainIndexMap.get("lifecycle"), is(Map.of("enabled", true))); // empty lifecycle
-            assertThat(explainIndexMap.get("error"), is(explainIndex.getError()));
+            if (explainIndex.getError() != null) {
+                Map<String, Object> errorObject = (Map<String, Object>) explainIndexMap.get("error");
+                assertThat(errorObject.get(ErrorEntry.MESSAGE_FIELD.getPreferredName()), is(explainIndex.getError().error()));
+                assertThat(
+                    errorObject.get(ErrorEntry.FIRST_OCCURRENCE_MILLIS_FIELD.getPreferredName()),
+                    is(explainIndex.getError().firstOccurrenceTimestamp())
+                );
+                assertThat(
+                    errorObject.get(ErrorEntry.LAST_RECORDED_MILLIS_FIELD.getPreferredName()),
+                    is(explainIndex.getError().recordedTimestamp())
+                );
+                assertThat(errorObject.get(ErrorEntry.RETRY_COUNT_FIELD.getPreferredName()), is(explainIndex.getError().retryCount()));
+            } else {
+                assertThat(explainIndexMap.get("error"), is(nullValue()));
+            }
         }
     }
 
@@ -155,7 +170,21 @@ public void testToXContent() throws IOException {
             } else {
                 assertThat(explainIndexMap.get("generation_time"), is(nullValue()));
             }
-            assertThat(explainIndexMap.get("error"), is(explainIndex.getError()));
+            if (explainIndex.getError() != null) {
+                Map<String, Object> errorObject = (Map<String, Object>) explainIndexMap.get("error");
+                assertThat(errorObject.get(ErrorEntry.MESSAGE_FIELD.getPreferredName()), is(explainIndex.getError().error()));
+                assertThat(
+                    errorObject.get(ErrorEntry.FIRST_OCCURRENCE_MILLIS_FIELD.getPreferredName()),
+                    is(explainIndex.getError().firstOccurrenceTimestamp())
+                );
+                assertThat(
+                    errorObject.get(ErrorEntry.LAST_RECORDED_MILLIS_FIELD.getPreferredName()),
+                    is(explainIndex.getError().recordedTimestamp())
+                );
+                assertThat(errorObject.get(ErrorEntry.RETRY_COUNT_FIELD.getPreferredName()), is(explainIndex.getError().retryCount()));
+            } else {
+                assertThat(explainIndexMap.get("error"), is(nullValue()));
+            }
 
             Map<String, Object> lifecycleRollover = (Map<String, Object>) ((Map<String, Object>) explainIndexMap.get("lifecycle")).get(
"rollover" @@ -173,7 +202,14 @@ public void testToXContent() throws IOException { randomBoolean() ? now + TimeValue.timeValueDays(1).getMillis() : null, null, lifecycle, - randomBoolean() ? new NullPointerException("bad times").getMessage() : null + randomBoolean() + ? new ErrorEntry( + System.currentTimeMillis(), + new NullPointerException("bad times").getMessage(), + System.currentTimeMillis(), + randomIntBetween(0, 30) + ) + : null ); Response response = new Response(List.of(explainIndexWithNullGenerationDate), null); @@ -223,7 +259,14 @@ private static ExplainIndexDataStreamLifecycle createRandomIndexDataStreamLifecy randomBoolean() ? now + TimeValue.timeValueDays(1).getMillis() : null, randomBoolean() ? TimeValue.timeValueMillis(now) : null, lifecycle, - randomBoolean() ? new NullPointerException("bad times").getMessage() : null + randomBoolean() + ? new ErrorEntry( + System.currentTimeMillis(), + new NullPointerException("bad times").getMessage(), + System.currentTimeMillis(), + randomIntBetween(0, 30) + ) + : null ); } diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml index 51f2980671add..b52c860a812ee 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml @@ -2,7 +2,7 @@ "Explain backing index lifecycle": - skip: version: " - 8.10.99" - reason: "Data stream lifecycle was GA in 8.11" + reason: "Data stream lifecycle was released as tech preview in 8.11" features: allowed_warnings - do: allowed_warnings: diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index f265ebb240dcf..032d1b2499588 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -149,6 +149,7 @@ static TransportVersion def(int id) { public static final TransportVersion IP_ADDRESS_WRITEABLE = def(8_524_00_0); public static final TransportVersion PRIMARY_TERM_ADDED = def(8_525_00_0); public static final TransportVersion CLUSTER_FEATURES_ADDED = def(8_526_00_0); + public static final TransportVersion DSL_ERROR_STORE_INFORMATION_ENHANCED = def(8_527_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ErrorEntry.java b/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ErrorEntry.java new file mode 100644 index 0000000000000..79c59314d7425 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ErrorEntry.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */
+
+package org.elasticsearch.action.datastreams.lifecycle;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.function.LongSupplier;
+
+/**
+ * Represents the recorded error for an index that Data Stream Lifecycle Service encountered.
+ */
+public record ErrorEntry(long firstOccurrenceTimestamp, String error, long recordedTimestamp, int retryCount)
+    implements
+        Writeable,
+        ToXContentObject {
+
+    public ErrorEntry(StreamInput in) throws IOException {
+        this(in.readLong(), in.readString(), in.readLong(), in.readInt());
+    }
+
+    public static final ParseField FIRST_OCCURRENCE_FIELD = new ParseField("first_occurrence");
+    public static final ParseField FIRST_OCCURRENCE_MILLIS_FIELD = new ParseField("first_occurrence_millis");
+    public static final ParseField MESSAGE_FIELD = new ParseField("message");
+    public static final ParseField LAST_RECORDED_MILLIS_FIELD = new ParseField("last_recorded_millis");
+    public static final ParseField LAST_RECORDED_FIELD = new ParseField("last_recorded");
+    public static final ParseField RETRY_COUNT_FIELD = new ParseField("retry_count");
+
+    /**
+     * Creates a new ErrorEntry with the same first occurrence timestamp and error message as the provided existing record, but with a
+     * fresh timestamp for the latest occurrence and an incremented retry count.
+     */
+    public static ErrorEntry incrementRetryCount(ErrorEntry existingRecord, LongSupplier nowSupplier) {
+        return new ErrorEntry(
+            existingRecord.firstOccurrenceTimestamp(),
+            existingRecord.error(),
+            nowSupplier.getAsLong(),
+            existingRecord.retryCount() + 1
+        );
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.humanReadableField(
+            FIRST_OCCURRENCE_MILLIS_FIELD.getPreferredName(),
+            FIRST_OCCURRENCE_FIELD.getPreferredName(),
+            new TimeValue(firstOccurrenceTimestamp)
+        );
+        builder.field(MESSAGE_FIELD.getPreferredName(), error);
+        builder.humanReadableField(
+            LAST_RECORDED_MILLIS_FIELD.getPreferredName(),
+            LAST_RECORDED_FIELD.getPreferredName(),
+            new TimeValue(recordedTimestamp)
+        );
+        builder.field(RETRY_COUNT_FIELD.getPreferredName(), retryCount);
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeLong(firstOccurrenceTimestamp);
+        out.writeString(error);
+        out.writeLong(recordedTimestamp);
+        out.writeInt(retryCount);
+    }
+}
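
For illustration, the object shape of the new `error` field as rendered by the toXContent method above (a sketch assuming the server classes on the classpath; the JSON in the comment is approximate):

import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
import org.elasticsearch.common.Strings;

public class ErrorEntryRenderingSketch {
    public static void main(String[] args) {
        ErrorEntry entry = new ErrorEntry(1_696_000_000_000L, "maximum normal shards open", 1_696_000_600_000L, 3);
        // Roughly: {"first_occurrence_millis":1696000000000,"message":"maximum normal shards open",
        //           "last_recorded_millis":1696000600000,"retry_count":3}
        // (the human-readable "first_occurrence"/"last_recorded" variants appear only when ?human=true)
        System.out.println(Strings.toString(entry));
    }
}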
diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycle.java b/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycle.java
index c6d8cc3f085a2..ea78f43a54b21 100644
--- a/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycle.java
+++ b/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycle.java
@@ -23,6 +23,8 @@
 import java.util.Objects;
 import java.util.function.Supplier;
 
+import static org.elasticsearch.TransportVersions.DSL_ERROR_STORE_INFORMATION_ENHANCED;
+
 /**
  * Encapsulates the information that describes an index from its data stream lifecycle perspective.
  */
@@ -50,7 +52,7 @@ public class ExplainIndexDataStreamLifecycle implements Writeable, ToXContentObj
     @Nullable
     private final DataStreamLifecycle lifecycle;
     @Nullable
-    private final String error;
+    private final ErrorEntry error;
     private Supplier<Long> nowSupplier = System::currentTimeMillis;
 
     public ExplainIndexDataStreamLifecycle(
@@ -60,7 +62,7 @@ public ExplainIndexDataStreamLifecycle(
         @Nullable Long rolloverDate,
         @Nullable TimeValue generationDate,
         @Nullable DataStreamLifecycle lifecycle,
-        @Nullable String error
+        @Nullable ErrorEntry error
     ) {
         this.index = index;
         this.managedByLifecycle = managedByLifecycle;
@@ -79,7 +81,12 @@ public ExplainIndexDataStreamLifecycle(StreamInput in) throws IOException {
             this.rolloverDate = in.readOptionalLong();
             this.generationDateMillis = in.readOptionalLong();
             this.lifecycle = in.readOptionalWriteable(DataStreamLifecycle::new);
-            this.error = in.readOptionalString();
+            if (in.getTransportVersion().onOrAfter(DSL_ERROR_STORE_INFORMATION_ENHANCED)) {
+                this.error = in.readOptionalWriteable(ErrorEntry::new);
+            } else {
+                String bwcErrorMessage = in.readOptionalString();
+                this.error = new ErrorEntry(-1L, bwcErrorMessage, -1L, -1);
+            }
         } else {
             this.indexCreationDate = null;
             this.rolloverDate = null;
@@ -123,7 +130,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nulla
             lifecycle.toXContent(builder, params, rolloverConfiguration);
         }
         if (this.error != null) {
-            builder.field(ERROR_FIELD.getPreferredName(), error);
+            if (error.firstOccurrenceTimestamp() != -1L && error.recordedTimestamp() != -1L && error.retryCount() != -1) {
+                builder.field(ERROR_FIELD.getPreferredName(), error);
+            } else {
+                // bwc for error field being a string
+                builder.field(ERROR_FIELD.getPreferredName(), error.error());
+            }
         }
         builder.endObject();
@@ -139,7 +151,12 @@ public void writeTo(StreamOutput out) throws IOException {
             out.writeOptionalLong(rolloverDate);
             out.writeOptionalLong(generationDateMillis);
             out.writeOptionalWriteable(lifecycle);
-            out.writeOptionalString(error);
+            if (out.getTransportVersion().onOrAfter(DSL_ERROR_STORE_INFORMATION_ENHANCED)) {
+                out.writeOptionalWriteable(error);
+            } else {
+                String errorMessage = error != null ? error.error() : null;
+                out.writeOptionalString(errorMessage);
+            }
         }
     }
 
@@ -202,7 +219,7 @@ public DataStreamLifecycle getLifecycle() {
         return lifecycle;
     }
 
-    public String getError() {
+    public ErrorEntry getError() {
         return error;
     }
 
diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ErrorEntryTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ErrorEntryTests.java
new file mode 100644
index 0000000000000..c96f849899254
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ErrorEntryTests.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.datastreams.lifecycle;
+
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.is;
+
+public class ErrorEntryTests extends ESTestCase {
+
+    public void testIncrementRetryCount() {
+        long now = System.currentTimeMillis();
+        ErrorEntry existingRecord = new ErrorEntry(now, "error message", now, 0);
+
+        long newOccurrenceTimestamp = now + 2L;
+        ErrorEntry newEntry = ErrorEntry.incrementRetryCount(existingRecord, () -> newOccurrenceTimestamp);
+
+        assertThat(newEntry.firstOccurrenceTimestamp(), is(existingRecord.firstOccurrenceTimestamp()));
+        assertThat(newEntry.error(), is(existingRecord.error()));
+        assertThat(newEntry.recordedTimestamp(), is(newOccurrenceTimestamp));
+        assertThat(newEntry.retryCount(), is(1));
+    }
+
+}
diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycleTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycleTests.java
index b3b8b6b753cf2..7087b677673e7 100644
--- a/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycleTests.java
+++ b/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainIndexDataStreamLifecycleTests.java
@@ -31,7 +31,14 @@ public void testGetGenerationTime() {
             randomBoolean() ? now + TimeValue.timeValueDays(1).getMillis() : null,
             null,
             new DataStreamLifecycle(),
-            randomBoolean() ? new NullPointerException("bad times").getMessage() : null
+            randomBoolean()
+                ? new ErrorEntry(
+                    System.currentTimeMillis(),
+                    new NullPointerException("bad times").getMessage(),
+                    System.currentTimeMillis(),
+                    randomIntBetween(0, 30)
+                )
+                : null
         );
         assertThat(explainIndexDataStreamLifecycle.getGenerationTime(() -> now + 50L), is(nullValue()));
         explainIndexDataStreamLifecycle = new ExplainIndexDataStreamLifecycle(
@@ -41,7 +48,14 @@ public void testGetGenerationTime() {
             randomBoolean() ? now + TimeValue.timeValueDays(1).getMillis() : null,
             TimeValue.timeValueMillis(now + 100),
             new DataStreamLifecycle(),
-            randomBoolean() ? new NullPointerException("bad times").getMessage() : null
+            randomBoolean()
+                ? new ErrorEntry(
+                    System.currentTimeMillis(),
+                    new NullPointerException("bad times").getMessage(),
+                    System.currentTimeMillis(),
+                    randomIntBetween(0, 30)
+                )
+                : null
         );
         assertThat(explainIndexDataStreamLifecycle.getGenerationTime(() -> now + 500L), is(TimeValue.timeValueMillis(400)));
     }
@@ -189,7 +203,14 @@ private static ExplainIndexDataStreamLifecycle createManagedIndexDataStreamLifec
             randomBoolean() ? now + TimeValue.timeValueDays(1).getMillis() : null,
             TimeValue.timeValueMillis(now),
             lifecycle,
-            randomBoolean() ? new NullPointerException("bad times").getMessage() : null
+            randomBoolean()
+                ? new ErrorEntry(
+                    System.currentTimeMillis(),
+                    new NullPointerException("bad times").getMessage(),
+                    System.currentTimeMillis(),
+                    randomIntBetween(0, 30)
+                )
+                : null
         );
     }
 
diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java
index cdddd0a5e5fe0..4b7a9f46431a5 100644
--- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java
+++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java
@@ -18,6 +18,7 @@
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.datastreams.GetDataStreamAction;
+import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
 import org.elasticsearch.action.downsample.DownsampleConfig;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.WriteRequest;
@@ -245,7 +246,10 @@ private Map<String, String> collectErrorsFromStoreAsMap() {
             DataStreamLifecycleErrorStore errorStore = lifecycleService.getErrorStore();
             List<String> allIndices = errorStore.getAllIndices();
             for (var index : allIndices) {
-                indicesAndErrors.put(index, errorStore.getError(index));
+                ErrorEntry error = errorStore.getError(index);
+                if (error != null) {
+                    indicesAndErrors.put(index, error.error());
+                }
             }
         }
         return indicesAndErrors;
diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java
index 6e68557bae2e1..93cd03060842b 100644
--- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java
+++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java
@@ -15,6 +15,7 @@
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.datastreams.CreateDataStreamAction;
 import org.elasticsearch.action.datastreams.GetDataStreamAction;
+import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataStream;
@@ -169,7 +170,10 @@ private Map<String, String> collectErrorsFromStoreAsMap() {
             DataStreamLifecycleErrorStore errorStore = lifecycleService.getErrorStore();
             List<String> allIndices = errorStore.getAllIndices();
             for (var index : allIndices) {
-                indicesAndErrors.put(index, errorStore.getError(index));
+                ErrorEntry error = errorStore.getError(index);
+                if (error != null) {
+                    indicesAndErrors.put(index, error.error());
+                }
             }
         }
         return indicesAndErrors;
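
End to end, the enhancement means a caller of the explain API can see not just that an index is failing its lifecycle actions, but for how long and after how many attempts. A hypothetical consumer-side helper (illustrative only, not part of this change; assumes getIndex() returns the backing index name):

import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
import org.elasticsearch.action.datastreams.lifecycle.ExplainIndexDataStreamLifecycle;

public class StuckIndexReporter {
    // Summarises one explain entry using the enriched error information.
    static String describe(ExplainIndexDataStreamLifecycle explainIndex) {
        ErrorEntry error = explainIndex.getError();
        if (error == null) {
            return explainIndex.getIndex() + ": no lifecycle errors";
        }
        long stuckForMillis = error.recordedTimestamp() - error.firstOccurrenceTimestamp();
        return explainIndex.getIndex()
            + ": failing for "
            + stuckForMillis
            + "ms across "
            + error.retryCount()
            + " retries: "
            + error.error();
    }
}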