Skip to content

Commit

Permalink
Improve EarlyDeprecationindexingIT test reliability (elastic#105696)
Browse files Browse the repository at this point in the history
this test intends to test the bulkProcessor2's request consumer (see DeprecationIndexingComponent#getBulkProcessor) scheduling requests before the startup is completed (flush is enabled). To verify this behaviour the flush has to happen before the templates are loaded. To test this reliably the flush interval in the test should be as small as possible (not hardcoded 5s as of now)

This commit introduces a setting (not meant to be exposed/documented) to allow for the flush interval to be configured. It also adds additional trace logging to help with troubleshooting.

relates elastic#104716
  • Loading branch information
pgomulka committed Apr 16, 2024
1 parent 46b4a73 commit c958ea2
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ restResources {

testClusters.configureEach {
testDistribution = 'DEFAULT'
setting 'cluster.deprecation_indexing.enabled', 'true'
setting 'xpack.security.enabled', 'false'
setting 'xpack.license.self_generated.type', 'trial'
setting 'cluster.deprecation_indexing.enabled', 'true'
setting 'cluster.deprecation_indexing.flush_interval', '1ms'
setting 'logger.org.elasticsearch.xpack.deprecation','TRACE'
setting 'logger.org.elasticsearch.xpack.deprecation.logging','TRACE'
}

// Test clusters run with security disabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.function.Supplier;

import static org.elasticsearch.xpack.deprecation.DeprecationChecks.SKIP_DEPRECATIONS_SETTING;
import static org.elasticsearch.xpack.deprecation.logging.DeprecationIndexingComponent.DEPRECATION_INDEXING_FLUSH_INTERVAL;

/**
* The plugin class for the Deprecation API
Expand Down Expand Up @@ -110,6 +111,11 @@ public Collection<?> createComponents(PluginServices services) {

@Override
public List<Setting<?>> getSettings() {
return List.of(USE_X_OPAQUE_ID_IN_FILTERING, WRITE_DEPRECATION_LOGS_TO_INDEX, SKIP_DEPRECATIONS_SETTING);
return List.of(
USE_X_OPAQUE_ID_IN_FILTERING,
WRITE_DEPRECATION_LOGS_TO_INDEX,
SKIP_DEPRECATIONS_SETTING,
DEPRECATION_INDEXING_FLUSH_INTERVAL
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.deprecation.logging;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Core;
import org.apache.logging.log4j.core.Filter;
Expand All @@ -16,6 +18,7 @@
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.core.Strings;
import org.elasticsearch.xcontent.XContentType;

import java.util.Objects;
Expand All @@ -28,6 +31,7 @@
*/
@Plugin(name = "DeprecationIndexingAppender", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE)
public class DeprecationIndexingAppender extends AbstractAppender {
private static final Logger logger = LogManager.getLogger(DeprecationIndexingAppender.class);
public static final String DEPRECATION_MESSAGES_DATA_STREAM = ".logs-deprecation.elasticsearch-default";

private final Consumer<IndexRequest> requestConsumer;
Expand All @@ -40,9 +44,10 @@ public class DeprecationIndexingAppender extends AbstractAppender {

/**
* Creates a new appender.
* @param name the appender's name
* @param filter a filter to apply directly on the appender
* @param layout the layout to use for formatting message. It must return a JSON string.
*
* @param name the appender's name
* @param filter a filter to apply directly on the appender
* @param layout the layout to use for formatting message. It must return a JSON string.
* @param requestConsumer a callback to handle the actual indexing of the log message.
*/
public DeprecationIndexingAppender(String name, Filter filter, Layout<String> layout, Consumer<IndexRequest> requestConsumer) {
Expand All @@ -56,6 +61,13 @@ public DeprecationIndexingAppender(String name, Filter filter, Layout<String> la
*/
@Override
public void append(LogEvent event) {
logger.trace(
() -> Strings.format(
"Received deprecation log event. Appender is %s. message = %s",
isEnabled ? "enabled" : "disabled",
event.getMessage().getFormattedMessage()
)
);
if (this.isEnabled == false) {
return;
}
Expand All @@ -71,6 +83,7 @@ public void append(LogEvent event) {
/**
* Sets whether this appender is enabled or disabled. When disabled, the appender will
* not perform indexing operations.
*
* @param enabled the enabled status of the appender.
*/
public void setEnabled(boolean enabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.logging.ECSJsonLayout;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.logging.RateLimitingFilter;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.ClientHelper;
Expand All @@ -46,6 +47,13 @@
* It also starts and stops the appender
*/
public class DeprecationIndexingComponent extends AbstractLifecycleComponent implements ClusterStateListener {

public static final Setting<TimeValue> DEPRECATION_INDEXING_FLUSH_INTERVAL = Setting.timeSetting(
"cluster.deprecation_indexing.flush_interval",
TimeValue.timeValueSeconds(5),
Setting.Property.NodeScope
);

private static final Logger logger = LogManager.getLogger(DeprecationIndexingComponent.class);

private final DeprecationIndexingAppender appender;
Expand Down Expand Up @@ -190,6 +198,7 @@ public void enableDeprecationLogIndexing(boolean newEnabled) {
* @return an initialised bulk processor
*/
private BulkProcessor2 getBulkProcessor(Client client, Settings settings) {
TimeValue flushInterval = DEPRECATION_INDEXING_FLUSH_INTERVAL.get(settings);
BulkProcessor2.Listener listener = new DeprecationBulkListener();
return BulkProcessor2.builder((bulkRequest, actionListener) -> {
/*
Expand All @@ -198,13 +207,16 @@ private BulkProcessor2 getBulkProcessor(Client client, Settings settings) {
* in-flight-bytes limit has been exceeded. This means that we don't have to worry about bounding pendingRequestsBuffer.
*/
if (flushEnabled.get()) {
logger.trace("Flush is enabled, sending a bulk request");
client.bulk(bulkRequest, actionListener);
flushBuffer(); // just in case something was missed after the first flush
} else {
logger.trace("Flush is disabled, scheduling a bulk request");

// this is an unbounded queue, so the entry will always be accepted
pendingRequestsBuffer.offer(() -> client.bulk(bulkRequest, actionListener));
}
}, listener, client.threadPool()).setMaxNumberOfRetries(3).setFlushInterval(TimeValue.timeValueSeconds(5)).build();
}, listener, client.threadPool()).setMaxNumberOfRetries(3).setFlushInterval(flushInterval).build();
}

private static class DeprecationBulkListener implements BulkProcessor2.Listener {
Expand Down

0 comments on commit c958ea2

Please sign in to comment.