Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve EarlyDeprecationindexingIT test reliability #105696

Merged
merged 4 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ restResources {

testClusters.configureEach {
testDistribution = 'DEFAULT'
setting 'cluster.deprecation_indexing.enabled', 'true'
setting 'xpack.security.enabled', 'false'
setting 'xpack.license.self_generated.type', 'trial'
setting 'cluster.deprecation_indexing.enabled', 'true'
setting 'cluster.deprecation_indexing.flush_interval', '1ms'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems infeasibly short. Would 100ms be a bit closer to real-world usage?

Copy link
Contributor Author

@pgomulka pgomulka Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The smaller the interval, the better. We want the flush to happen as soon as possible (to make sure it occurs before the templates are loaded).
On my laptop it takes <500ms from the indexRequest being added to the bulk processor to the templates being loaded. It could be that on a CI machine it would be faster?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only concern is that 1ms might disrupt normal ES operation, but if it works — fine

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

disrupt normal ES working

This is the goal. Indexing should not happen that early in startup, so there is a queue of those too-early requests to be processed later.

setting 'logger.org.elasticsearch.xpack.deprecation','TRACE'
thecoop marked this conversation as resolved.
Show resolved Hide resolved
setting 'logger.org.elasticsearch.xpack.deprecation.logging','TRACE'
}

// Test clusters run with security disabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.function.Supplier;

import static org.elasticsearch.xpack.deprecation.DeprecationChecks.SKIP_DEPRECATIONS_SETTING;
import static org.elasticsearch.xpack.deprecation.logging.DeprecationIndexingComponent.DEPRECATION_INDEXING_FLUSH_INTERVAL;

/**
* The plugin class for the Deprecation API
Expand Down Expand Up @@ -110,6 +111,11 @@ public Collection<?> createComponents(PluginServices services) {

@Override
public List<Setting<?>> getSettings() {
return List.of(USE_X_OPAQUE_ID_IN_FILTERING, WRITE_DEPRECATION_LOGS_TO_INDEX, SKIP_DEPRECATIONS_SETTING);
return List.of(
USE_X_OPAQUE_ID_IN_FILTERING,
WRITE_DEPRECATION_LOGS_TO_INDEX,
SKIP_DEPRECATIONS_SETTING,
DEPRECATION_INDEXING_FLUSH_INTERVAL
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.deprecation.logging;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Core;
import org.apache.logging.log4j.core.Filter;
Expand All @@ -16,6 +18,7 @@
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.core.Strings;
import org.elasticsearch.xcontent.XContentType;

import java.util.Objects;
Expand All @@ -28,6 +31,7 @@
*/
@Plugin(name = "DeprecationIndexingAppender", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE)
public class DeprecationIndexingAppender extends AbstractAppender {
private static final Logger logger = LogManager.getLogger(DeprecationIndexingAppender.class);
public static final String DEPRECATION_MESSAGES_DATA_STREAM = ".logs-deprecation.elasticsearch-default";

private final Consumer<IndexRequest> requestConsumer;
Expand All @@ -40,9 +44,10 @@ public class DeprecationIndexingAppender extends AbstractAppender {

/**
* Creates a new appender.
* @param name the appender's name
* @param filter a filter to apply directly on the appender
* @param layout the layout to use for formatting message. It must return a JSON string.
*
* @param name the appender's name
* @param filter a filter to apply directly on the appender
* @param layout the layout to use for formatting message. It must return a JSON string.
* @param requestConsumer a callback to handle the actual indexing of the log message.
*/
public DeprecationIndexingAppender(String name, Filter filter, Layout<String> layout, Consumer<IndexRequest> requestConsumer) {
Expand All @@ -56,6 +61,13 @@ public DeprecationIndexingAppender(String name, Filter filter, Layout<String> la
*/
@Override
public void append(LogEvent event) {
logger.trace(
() -> Strings.format(
"Received deprecation log event. Appender is %s. message = %s",
isEnabled ? "enabled" : "disabled",
event.getMessage().getFormattedMessage()
)
);
if (this.isEnabled == false) {
return;
}
Expand All @@ -66,11 +78,13 @@ public void append(LogEvent event) {
.opType(DocWriteRequest.OpType.CREATE);

this.requestConsumer.accept(request);
logger.trace(() -> "sent");
}

/**
* Sets whether this appender is enabled or disabled. When disabled, the appender will
* not perform indexing operations.
*
* @param enabled the enabled status of the appender.
*/
public void setEnabled(boolean enabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.logging.ECSJsonLayout;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.logging.RateLimitingFilter;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.ClientHelper;
Expand All @@ -46,6 +47,13 @@
* It also starts and stops the appender
*/
public class DeprecationIndexingComponent extends AbstractLifecycleComponent implements ClusterStateListener {

public static final Setting<TimeValue> DEPRECATION_INDEXING_FLUSH_INTERVAL = Setting.timeSetting(
"cluster.deprecation_indexing.flush_interval",
TimeValue.timeValueSeconds(5),
Setting.Property.NodeScope
);

private static final Logger logger = LogManager.getLogger(DeprecationIndexingComponent.class);

private final DeprecationIndexingAppender appender;
Expand Down Expand Up @@ -190,6 +198,7 @@ public void enableDeprecationLogIndexing(boolean newEnabled) {
* @return an initialised bulk processor
*/
private BulkProcessor2 getBulkProcessor(Client client, Settings settings) {
TimeValue flushInterval = DEPRECATION_INDEXING_FLUSH_INTERVAL.get(settings);
BulkProcessor2.Listener listener = new DeprecationBulkListener();
return BulkProcessor2.builder((bulkRequest, actionListener) -> {
/*
Expand All @@ -198,13 +207,16 @@ private BulkProcessor2 getBulkProcessor(Client client, Settings settings) {
* in-flight-bytes limit has been exceeded. This means that we don't have to worry about bounding pendingRequestsBuffer.
*/
if (flushEnabled.get()) {
logger.trace(() -> "Flush is enabled, sending a bulk request");
thecoop marked this conversation as resolved.
Show resolved Hide resolved
client.bulk(bulkRequest, actionListener);
flushBuffer(); // just in case something was missed after the first flush
} else {
logger.trace(() -> "Flush is disabled, scheduling a bulk request");

// this is an unbounded queue, so the entry will always be accepted
pendingRequestsBuffer.offer(() -> client.bulk(bulkRequest, actionListener));
}
}, listener, client.threadPool()).setMaxNumberOfRetries(3).setFlushInterval(TimeValue.timeValueSeconds(5)).build();
}, listener, client.threadPool()).setMaxNumberOfRetries(3).setFlushInterval(flushInterval).build();
}

private static class DeprecationBulkListener implements BulkProcessor2.Listener {
Expand Down