Skip to content

Commit

Permalink
Preventing serialization errors in the nodes stats API (elastic#90319) (
Browse files Browse the repository at this point in the history
elastic#90430)

This change prevents serialization errors in the nodes stats API, and adds logging to the ingest counter code
so that the root cause of the problem can be found in the future.
  • Loading branch information
masseyke authored Sep 27, 2022
1 parent 1b5f1fa commit 42f05b9
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 73 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/90319.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 90319
summary: Preventing serialization errors in the nodes stats API
area: Ingest Node
type: bug
issues:
- 77973
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.elasticsearch.ingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.core.Tuple;

Expand All @@ -16,6 +18,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
Expand All @@ -30,6 +33,8 @@ public class CompoundProcessor implements Processor {
public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag";
public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline";

private static final Logger logger = LogManager.getLogger(CompoundProcessor.class);

private final boolean ignoreFailure;
private final List<Processor> processors;
private final List<Processor> onFailureProcessors;
Expand Down Expand Up @@ -191,25 +196,43 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume
final IngestMetric finalMetric = processorsWithMetrics.get(currentProcessor).v2();
final Processor finalProcessor = processorsWithMetrics.get(currentProcessor).v1();
final IngestDocument finalIngestDocument = ingestDocument;
/*
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
* is only executed once.
*/
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
finalMetric.preIngest();
final AtomicBoolean postIngestHasBeenCalled = new AtomicBoolean(false);
try {
finalProcessor.execute(ingestDocument, (result, e) -> {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - finalStartTimeInNanos;
finalMetric.postIngest(ingestTimeInNanos);

if (e != null) {
executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
if (listenerHasBeenCalled.getAndSet(true)) {
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
assert false : "A listener was unexpectedly called more than once";
} else {
if (result != null) {
innerExecute(nextProcessor, result, handler);
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - finalStartTimeInNanos;
finalMetric.postIngest(ingestTimeInNanos);
postIngestHasBeenCalled.set(true);
if (e != null) {
executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
} else {
handler.accept(null, null);
if (result != null) {
innerExecute(nextProcessor, result, handler);
} else {
handler.accept(null, null);
}
}
}
});
} catch (Exception e) {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
finalMetric.postIngest(ingestTimeInNanos);
if (postIngestHasBeenCalled.get()) {
logger.warn("Preventing postIngest from being called more than once", new RuntimeException());
assert false : "Attempt to call postIngest more than once";
} else {
finalMetric.postIngest(ingestTimeInNanos);
}
executeOnFailureOuter(currentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.elasticsearch.ingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.script.DynamicMap;
Expand All @@ -26,6 +28,7 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand All @@ -45,6 +48,8 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP
return value;
});

private static final Logger logger = LogManager.getLogger(ConditionalProcessor.class);

static final String TYPE = "conditional";

private final Script condition;
Expand Down Expand Up @@ -120,15 +125,27 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex

if (matches) {
final long startTimeInNanos = relativeTimeProvider.getAsLong();
/*
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
* is only executed once.
*/
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
metric.preIngest();
processor.execute(ingestDocument, (result, e) -> {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metric.postIngest(ingestTimeInNanos);
if (e != null) {
metric.ingestFailed();
handler.accept(null, e);
if (listenerHasBeenCalled.getAndSet(true)) {
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
assert false : "A listener was unexpectedly called more than once";
} else {
handler.accept(result, null);
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metric.postIngest(ingestTimeInNanos);
if (e != null) {
metric.ingestFailed();
handler.accept(null, e);
} else {
handler.accept(result, null);
}
}
});
} else {
Expand Down
22 changes: 20 additions & 2 deletions server/src/main/java/org/elasticsearch/ingest/IngestMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.elasticsearch.ingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.metrics.CounterMetric;

import java.util.concurrent.TimeUnit;
Expand All @@ -22,6 +24,8 @@
*/
class IngestMetric {

private static final Logger logger = LogManager.getLogger(IngestMetric.class);

/**
* The time it takes to complete the measured item.
*/
Expand Down Expand Up @@ -53,7 +57,19 @@ void preIngest() {
*/
void postIngest(long ingestTimeInNanos) {
    // Decrement the in-flight counter first; a negative result means it has been decremented more often than incremented.
    long current = ingestCurrent.decrementAndGet();
    if (current < 0) {
        /*
         * This ought to never happen. However, if it does, it's incredibly bad because ingestCurrent being negative causes a
         * serialization error that prevents the nodes stats API from working. So we're doing 3 things here:
         * (1) Log a stack trace at warn level so that the Elasticsearch engineering team can track down and fix the source of the
         *     bug if it still exists
         * (2) Throw an AssertionError if assertions are enabled so that we are aware of the bug
         * (3) Increment the counter back up so that we don't hit serialization failures
         *
         * The order matters: the warning must be logged before the assertion fires, which is why there is no separate
         * "assert current >= 0" ahead of this branch. Note that when assertions are enabled the AssertionError is thrown
         * before step (3) and before the time/count updates below, so those only run with assertions disabled.
         */
        logger.warn("Current ingest counter decremented below 0", new RuntimeException());
        assert false : "ingest metric current count double-decremented";
        ingestCurrent.incrementAndGet();
    }
    // Record the elapsed time and count this invocation as completed.
    this.ingestTimeInNanos.inc(ingestTimeInNanos);
    ingestCount.inc();
}
Expand Down Expand Up @@ -84,6 +100,8 @@ void add(IngestMetric metrics) {
IngestStats.Stats createStats() {
    // we track ingestTime at nanosecond resolution, but IngestStats uses millisecond resolution for reporting
    long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(ingestTimeInNanos.count());
    // It is possible for the current count to briefly drop below 0, causing serialization problems. See #90319
    // Clamp the reported value at 0 so that a negative counter is never handed to IngestStats.Stats.
    long currentCount = Math.max(0, ingestCurrent.get());
    return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, currentCount, ingestFailed.count());
}
}
113 changes: 63 additions & 50 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
Expand Down Expand Up @@ -887,60 +888,72 @@ private void innerExecute(
VersionType versionType = indexRequest.versionType();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, id, version, routing, versionType, sourceAsMap);
/*
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
* is only executed once.
*/
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
ingestDocument.executePipeline(pipeline, (result, e) -> {
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
totalMetrics.postIngest(ingestTimeInNanos);
if (e != null) {
totalMetrics.ingestFailed();
handler.accept(e);
} else if (result == null) {
itemDroppedHandler.accept(slot);
handler.accept(null);
if (listenerHasBeenCalled.getAndSet(true)) {
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
assert false : "A listener was unexpectedly called more than once";
} else {
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();

// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
indexRequest.index(metadata.getIndex());
indexRequest.id(metadata.getId());
indexRequest.routing(metadata.getRouting());
indexRequest.version(metadata.getVersion());
if (metadata.getVersionType() != null) {
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
}
Number number;
if ((number = metadata.getIfSeqNo()) != null) {
indexRequest.setIfSeqNo(number.longValue());
}
if ((number = metadata.getIfPrimaryTerm()) != null) {
indexRequest.setIfPrimaryTerm(number.longValue());
}
try {
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
} catch (IllegalArgumentException ex) {
// An IllegalArgumentException can be thrown when an ingest
// processor creates a source map that is self-referencing.
// In that case, we catch and wrap the exception so we can
// include which pipeline failed.
long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
totalMetrics.postIngest(ingestTimeInNanos);
if (e != null) {
totalMetrics.ingestFailed();
handler.accept(
new IllegalArgumentException(
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
ex
)
);
return;
}
Map<String, String> map;
if ((map = metadata.getDynamicTemplates()) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
mergedDynamicTemplates.putAll(map);
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
}
postIngest(ingestDocument, indexRequest);
handler.accept(e);
} else if (result == null) {
itemDroppedHandler.accept(slot);
handler.accept(null);
} else {
org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata();

// it's fine to set all metadata fields all the time, as ingest document holds their starting values
// before ingestion, which might also get modified during ingestion.
indexRequest.index(metadata.getIndex());
indexRequest.id(metadata.getId());
indexRequest.routing(metadata.getRouting());
indexRequest.version(metadata.getVersion());
if (metadata.getVersionType() != null) {
indexRequest.versionType(VersionType.fromString(metadata.getVersionType()));
}
Number number;
if ((number = metadata.getIfSeqNo()) != null) {
indexRequest.setIfSeqNo(number.longValue());
}
if ((number = metadata.getIfPrimaryTerm()) != null) {
indexRequest.setIfPrimaryTerm(number.longValue());
}
try {
boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck();
indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences);
} catch (IllegalArgumentException ex) {
// An IllegalArgumentException can be thrown when an ingest
// processor creates a source map that is self-referencing.
// In that case, we catch and wrap the exception so we can
// include which pipeline failed.
totalMetrics.ingestFailed();
handler.accept(
new IllegalArgumentException(
"Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]",
ex
)
);
return;
}
Map<String, String> map;
if ((map = metadata.getDynamicTemplates()) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
mergedDynamicTemplates.putAll(map);
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
}
postIngest(ingestDocument, indexRequest);

handler.accept(null);
handler.accept(null);
}
}
});
}
Expand Down
27 changes: 22 additions & 5 deletions server/src/main/java/org/elasticsearch/ingest/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.elasticsearch.ingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.script.ScriptService;
Expand All @@ -16,6 +18,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

Expand All @@ -30,6 +33,8 @@ public final class Pipeline {
public static final String ON_FAILURE_KEY = "on_failure";
public static final String META_KEY = "_meta";

private static final Logger logger = LogManager.getLogger(Pipeline.class);

private final String id;
@Nullable
private final String description;
Expand Down Expand Up @@ -113,14 +118,26 @@ public static Pipeline create(
*/
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
final long startTimeInNanos = relativeTimeProvider.getAsLong();
/*
* Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce
* that in all processors and all of the code that they call. If the listener is called more than once it causes problems
* such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener
* is only executed once.
*/
final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false);
metrics.preIngest();
compoundProcessor.execute(ingestDocument, (result, e) -> {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metrics.postIngest(ingestTimeInNanos);
if (e != null) {
metrics.ingestFailed();
if (listenerHasBeenCalled.getAndSet(true)) {
logger.warn("A listener was unexpectedly called more than once", new RuntimeException());
assert false : "A listener was unexpectedly called more than once";
} else {
long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos;
metrics.postIngest(ingestTimeInNanos);
if (e != null) {
metrics.ingestFailed();
}
handler.accept(result, e);
}
handler.accept(result, e);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public void testPostIngestDoubleDecrement() {

// the second postIngest triggers an assertion error
expectThrows(AssertionError.class, () -> metric.postIngest(500000L));
assertThat(-1L, equalTo(metric.createStats().getIngestCurrent()));
// We never allow the reported ingestCurrent to be negative:
assertThat(metric.createStats().getIngestCurrent(), equalTo(0L));
}

}

0 comments on commit 42f05b9

Please sign in to comment.