From 8c36f081f39fa1cf1c6a38a410aa69be0c60ab8b Mon Sep 17 00:00:00 2001 From: Przemyslaw Gomulka Date: Tue, 16 Apr 2024 18:18:46 +0200 Subject: [PATCH] Improve EarlyDeprecationindexingIT test reliability backport(#105696) #107540 this test intends to test the bulkProcessor2's request consumer (see DeprecationIndexingComponent#getBulkProcessor) scheduling requests before the startup is completed (flush is enabled). To verify this behaviour the flush has to happen before the templates are loaded. To test this reliably the flush interval in the test should be as small as possible (not hardcoded 5s as of now) This commit introduces a setting (not meant to be exposed/documented) to allow for the flush interval to be configured. It also adds additional trace logging to help with troubleshooting. relates #104716 --- .../qa/early-deprecation-rest/build.gradle | 5 ++++- .../xpack/deprecation/Deprecation.java | 8 +++++++- .../logging/DeprecationIndexingAppender.java | 19 ++++++++++++++++--- .../logging/DeprecationIndexingComponent.java | 14 +++++++++++++- 4 files changed, 40 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/deprecation/qa/early-deprecation-rest/build.gradle b/x-pack/plugin/deprecation/qa/early-deprecation-rest/build.gradle index ab09c31d6f80c..2d8859bdcea3d 100644 --- a/x-pack/plugin/deprecation/qa/early-deprecation-rest/build.gradle +++ b/x-pack/plugin/deprecation/qa/early-deprecation-rest/build.gradle @@ -27,9 +27,12 @@ restResources { testClusters.configureEach { testDistribution = 'DEFAULT' - setting 'cluster.deprecation_indexing.enabled', 'true' setting 'xpack.security.enabled', 'false' setting 'xpack.license.self_generated.type', 'trial' + setting 'cluster.deprecation_indexing.enabled', 'true' + setting 'cluster.deprecation_indexing.flush_interval', '1ms' + setting 'logger.org.elasticsearch.xpack.deprecation','TRACE' + setting 'logger.org.elasticsearch.xpack.deprecation.logging','TRACE' } // Test clusters run with security disabled diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/Deprecation.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/Deprecation.java index 4e2c9da25e78b..85b7c89e7cb85 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/Deprecation.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/Deprecation.java @@ -34,6 +34,7 @@ import java.util.function.Supplier; import static org.elasticsearch.xpack.deprecation.DeprecationChecks.SKIP_DEPRECATIONS_SETTING; +import static org.elasticsearch.xpack.deprecation.logging.DeprecationIndexingComponent.DEPRECATION_INDEXING_FLUSH_INTERVAL; /** * The plugin class for the Deprecation API @@ -110,6 +111,11 @@ public Collection createComponents(PluginServices services) { @Override public List> getSettings() { - return List.of(USE_X_OPAQUE_ID_IN_FILTERING, WRITE_DEPRECATION_LOGS_TO_INDEX, SKIP_DEPRECATIONS_SETTING); + return List.of( + USE_X_OPAQUE_ID_IN_FILTERING, + WRITE_DEPRECATION_LOGS_TO_INDEX, + SKIP_DEPRECATIONS_SETTING, + DEPRECATION_INDEXING_FLUSH_INTERVAL + ); } } diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingAppender.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingAppender.java index edd9a85862b01..22637b1640b51 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingAppender.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingAppender.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.deprecation.logging; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.Appender; import org.apache.logging.log4j.core.Core; import org.apache.logging.log4j.core.Filter; @@ -16,6 +18,7 @@ import org.apache.logging.log4j.core.config.plugins.Plugin; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.core.Strings; import org.elasticsearch.xcontent.XContentType; import java.util.Objects; @@ -28,6 +31,7 @@ */ @Plugin(name = "DeprecationIndexingAppender", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE) public class DeprecationIndexingAppender extends AbstractAppender { + private static final Logger logger = LogManager.getLogger(DeprecationIndexingAppender.class); public static final String DEPRECATION_MESSAGES_DATA_STREAM = ".logs-deprecation.elasticsearch-default"; private final Consumer requestConsumer; @@ -40,9 +44,10 @@ public class DeprecationIndexingAppender extends AbstractAppender { /** * Creates a new appender. - * @param name the appender's name - * @param filter a filter to apply directly on the appender - * @param layout the layout to use for formatting message. It must return a JSON string. + * + * @param name the appender's name + * @param filter a filter to apply directly on the appender + * @param layout the layout to use for formatting message. It must return a JSON string. * @param requestConsumer a callback to handle the actual indexing of the log message. */ public DeprecationIndexingAppender(String name, Filter filter, Layout layout, Consumer requestConsumer) { @@ -56,6 +61,13 @@ public DeprecationIndexingAppender(String name, Filter filter, Layout la */ @Override public void append(LogEvent event) { + logger.trace( + () -> Strings.format( + "Received deprecation log event. Appender is %s. message = %s", + isEnabled ? "enabled" : "disabled", + event.getMessage().getFormattedMessage() + ) + ); if (this.isEnabled == false) { return; } @@ -71,6 +83,7 @@ public void append(LogEvent event) { /** * Sets whether this appender is enabled or disabled. When disabled, the appender will * not perform indexing operations. + * * @param enabled the enabled status of the appender. */ public void setEnabled(boolean enabled) { diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java index 6a59a6832c91f..29041b0c58434 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.logging.ECSJsonLayout; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.RateLimitingFilter; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.xpack.core.ClientHelper; @@ -46,6 +47,13 @@ * It also starts and stops the appender */ public class DeprecationIndexingComponent extends AbstractLifecycleComponent implements ClusterStateListener { + + public static final Setting DEPRECATION_INDEXING_FLUSH_INTERVAL = Setting.timeSetting( + "cluster.deprecation_indexing.flush_interval", + TimeValue.timeValueSeconds(5), + Setting.Property.NodeScope + ); + private static final Logger logger = LogManager.getLogger(DeprecationIndexingComponent.class); private final DeprecationIndexingAppender appender; @@ -190,6 +198,7 @@ public void enableDeprecationLogIndexing(boolean newEnabled) { * @return an initialised bulk processor */ private BulkProcessor2 getBulkProcessor(Client client, Settings settings) { + TimeValue flushInterval = DEPRECATION_INDEXING_FLUSH_INTERVAL.get(settings); BulkProcessor2.Listener listener = new DeprecationBulkListener(); return BulkProcessor2.builder((bulkRequest, actionListener) -> { /* @@ -198,13 +207,16 @@ private BulkProcessor2 getBulkProcessor(Client client, Settings settings) { * in-flight-bytes limit has been exceeded. This means that we don't have to worry about bounding pendingRequestsBuffer. */ if (flushEnabled.get()) { + logger.trace("Flush is enabled, sending a bulk request"); client.bulk(bulkRequest, actionListener); flushBuffer(); // just in case something was missed after the first flush } else { + logger.trace("Flush is disabled, scheduling a bulk request"); + // this is an unbounded queue, so the entry will always be accepted pendingRequestsBuffer.offer(() -> client.bulk(bulkRequest, actionListener)); } - }, listener, client.threadPool()).setMaxNumberOfRetries(3).setFlushInterval(TimeValue.timeValueSeconds(5)).build(); + }, listener, client.threadPool()).setMaxNumberOfRetries(3).setFlushInterval(flushInterval).build(); } private static class DeprecationBulkListener implements BulkProcessor2.Listener {