Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do no use x-opaque-id for deduplicating Elastic originating requests #82855

Merged
5 changes: 5 additions & 0 deletions docs/changelog/82855.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 82855
summary: Do no use x-opaque-id for deduplicating elastic originating requests
area: Infra/Logging
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.message.Message;
import org.elasticsearch.common.Strings;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.common.logging.DeprecatedMessage.ELASTIC_ORIGIN_FIELD_NAME;
import static org.elasticsearch.common.logging.DeprecatedMessage.KEY_FIELD_NAME;
import static org.elasticsearch.common.logging.DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME;

Expand All @@ -34,14 +36,15 @@
* passed by a user on a HTTP header.
* This filter works by using a lruKeyCache - a set of keys which prevents a second message with the same key to be logged.
* The lruKeyCache has a size limited to 128, which when breached will remove the oldest entries.
* <p>
* It is possible to disable use of `x-opaque-id` as a key with {@link RateLimitingFilter#setUseXOpaqueIdEnabled(boolean) }
*
* It is possible to disable use of `x-opaque-id` as a key with {@link RateLimitingFilter#setUseXOpaqueId(boolean) }
* @see <a href="https://logging.apache.org/log4j/2.x/manual/filters.htmlf">Log4j2 Filters</a>
*/
@Plugin(name = "RateLimitingFilter", category = Node.CATEGORY, elementType = Filter.ELEMENT_TYPE)
public class RateLimitingFilter extends AbstractFilter {

private volatile boolean useXOpaqueId = true;
// a flag to disable/enable use of xOpaqueId controlled by changing cluster setting
private volatile boolean useXOpaqueIdEnabled = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the addition of "enabled"? The "use" prefix already denotes a boolean.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true - it does not make sens to me now either. I can't remember what I was thinking when renaming..


private final Set<String> lruKeyCache = Collections.newSetFromMap(Collections.synchronizedMap(new LinkedHashMap<>() {
@Override
Expand Down Expand Up @@ -76,7 +79,11 @@ public Result filter(Message message) {

private String getKey(ESLogMessage esLogMessage) {
final String key = esLogMessage.get(KEY_FIELD_NAME);
if (useXOpaqueId) {
final String productOrigin = esLogMessage.get(ELASTIC_ORIGIN_FIELD_NAME);
if (Strings.isNullOrEmpty(productOrigin) == false) {
return productOrigin + key;
}
if (useXOpaqueIdEnabled) {
String xOpaqueId = esLogMessage.get(X_OPAQUE_ID_FIELD_NAME);
return xOpaqueId + key;
}
Expand All @@ -101,7 +108,7 @@ public static RateLimitingFilter createFilter(
return new RateLimitingFilter(match, mismatch);
}

public void setUseXOpaqueId(boolean useXOpaqueId) {
this.useXOpaqueId = useXOpaqueId;
public void setUseXOpaqueIdEnabled(boolean useXOpaqueIdEnabled) {
this.useXOpaqueIdEnabled = useXOpaqueIdEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,20 @@ public void testMessagesAreRateLimitedByXOpaqueId() {
public void testMessagesAreRateLimitedByKeyAndXOpaqueId() {
// Fill up the cache
for (int i = 0; i < 128; i++) {
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key " + i, "opaque-id " + i, "productName", "msg " + i);
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key " + i, "opaque-id " + i, null, "msg " + i);
assertThat("Expected key" + i + " to be accepted", filter.filter(message), equalTo(Result.ACCEPT));
}

// Should be rate-limited because it's still in the cache
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.DENY));

// Filter a message with a previously unseen key, in order to evict key0 as it's the oldest
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 129", "opaque-id 129", "productName", "msg 129");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 129", "opaque-id 129", null, "msg 129");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

// Should be allowed because key 0 was evicted from the cache
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));
}

Expand All @@ -106,18 +106,18 @@ public void testMessagesAreRateLimitedByKeyAndXOpaqueId() {
* independently and checking that a message is not filtered.
*/
public void testVariationsInKeyAndXOpaqueId() {
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
// Rejected because the "x-opaque-id" and "key" values are the same as above
assertThat(filter.filter(message), equalTo(Result.DENY));

message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 1", "opaque-id 0", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 1", "opaque-id 0", null, "msg 0");
// Accepted because the "key" value is different
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 1", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 1", null, "msg 0");
// Accepted because the "x-opaque-id" value is different
assertThat(filter.filter(message), equalTo(Result.ACCEPT));
}
Expand Down Expand Up @@ -150,23 +150,49 @@ public void testFilterCanBeReset() {

public void testMessagesXOpaqueIsIgnoredWhenDisabled() {
RateLimitingFilter filter = new RateLimitingFilter();
filter.setUseXOpaqueId(false);
filter.setUseXOpaqueIdEnabled(false);
filter.start();

// Should NOT be rate-limited because it's not in the cache
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

// Should be rate-limited because it was just added to the cache
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.DENY));

// Should be rate-limited because X-Opaque-Id is not used
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 1", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 1", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.DENY));

// Should NOT be rate-limited because "key 1" it not in the cache
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 1", "opaque-id 1", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 1", "opaque-id 1", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));
}

public void testXOpaqueIdNotBeingUsedFromElasticOriginatingRequests() {
RateLimitingFilter filter = new RateLimitingFilter();
filter.setUseXOpaqueIdEnabled(true);
filter.start();

// Should NOT be rate-limited because it's not in the cache
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key", "opaque-id 0", "kibana", "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

// Should be rate-limited even though the x-opaque-id is unique because it originates from kibana
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key", "opaque-id 1", "kibana", "msg 0");
assertThat(filter.filter(message), equalTo(Result.DENY));

// Should not be rate-limited - it is the first request from beats. (x-opaque-id ignored as it originates from elastic)
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key", "opaque-id 0", "beats", "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

// second request from beats (elastic originating), should be rate-limited
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key", "opaque-id 1", "beats", "msg 0");
assertThat(filter.filter(message), equalTo(Result.DENY));

// request from beats (elastic originating), but with a different key- should not be rate-limited
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key2", "opaque-id 1", "beats", "msg 1");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ public Collection<Object> createComponents(

final RateLimitingFilter rateLimitingFilterForIndexing = new RateLimitingFilter();
// enable on start.
rateLimitingFilterForIndexing.setUseXOpaqueId(USE_X_OPAQUE_ID_IN_FILTERING.get(environment.settings()));
rateLimitingFilterForIndexing.setUseXOpaqueIdEnabled(USE_X_OPAQUE_ID_IN_FILTERING.get(environment.settings()));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(USE_X_OPAQUE_ID_IN_FILTERING, rateLimitingFilterForIndexing::setUseXOpaqueId);
.addSettingsUpdateConsumer(USE_X_OPAQUE_ID_IN_FILTERING, rateLimitingFilterForIndexing::setUseXOpaqueIdEnabled);

final DeprecationIndexingComponent component = DeprecationIndexingComponent.createDeprecationIndexingComponent(
client,
Expand Down