From 792002464a32a53b0c0bdded46709e7690a91d11 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 23 Sep 2022 14:50:27 -0500 Subject: [PATCH 1/4] Preventing serialization errors in the nodes stats API --- .../ingest/CompoundProcessor.java | 41 +++++-- .../ingest/ConditionalProcessor.java | 29 ++++- .../elasticsearch/ingest/IngestMetric.java | 18 ++- .../elasticsearch/ingest/IngestService.java | 113 ++++++++++-------- .../org/elasticsearch/ingest/Pipeline.java | 27 ++++- 5 files changed, 157 insertions(+), 71 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 3d5637c36f012..74700ddd6d4e5 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -8,6 +8,8 @@ package org.elasticsearch.ingest; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.core.Tuple; @@ -16,6 +18,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -30,6 +33,8 @@ public class CompoundProcessor implements Processor { public static final String ON_FAILURE_PROCESSOR_TAG_FIELD = "on_failure_processor_tag"; public static final String ON_FAILURE_PIPELINE_FIELD = "on_failure_pipeline"; + private static final Logger logger = LogManager.getLogger(CompoundProcessor.class); + private final boolean ignoreFailure; private final List processors; private final List onFailureProcessors; @@ -191,25 +196,43 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsume final IngestMetric finalMetric = processorsWithMetrics.get(currentProcessor).v2(); final Processor finalProcessor = processorsWithMetrics.get(currentProcessor).v1(); final IngestDocument finalIngestDocument = ingestDocument; + /* + * Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce + * that in all processors and all of the code that they call. If the listener is called more than once it causes problems + * such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener + * is only executed once. + */ + final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false); finalMetric.preIngest(); + final AtomicBoolean postIngestHasBeenCalled = new AtomicBoolean(false); try { finalProcessor.execute(ingestDocument, (result, e) -> { - long ingestTimeInNanos = relativeTimeProvider.getAsLong() - finalStartTimeInNanos; - finalMetric.postIngest(ingestTimeInNanos); - - if (e != null) { - executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e); + if (listenerHasBeenCalled.getAndSet(true)) { + logger.warn("A listener was unexpectedly called more than once", new RuntimeException()); + assert false : "A listener was unexpectedly called more than once"; } else { - if (result != null) { - innerExecute(nextProcessor, result, handler); + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - finalStartTimeInNanos; + finalMetric.postIngest(ingestTimeInNanos); + postIngestHasBeenCalled.set(true); + if (e != null) { + executeOnFailureOuter(finalCurrentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e); } else { - handler.accept(null, null); + if (result != null) { + innerExecute(nextProcessor, result, handler); + } else { + handler.accept(null, null); + } } } }); } catch (Exception e) { long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; - finalMetric.postIngest(ingestTimeInNanos); + if (postIngestHasBeenCalled.get()) { + logger.warn("Preventing postIngest from being called more than once", new RuntimeException()); + assert false : "Attempt to call postIngest more than once"; + } else { + finalMetric.postIngest(ingestTimeInNanos); + } executeOnFailureOuter(currentProcessor, finalIngestDocument, handler, finalProcessor, finalMetric, e); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index 528bb402a59e8..858fe5e675b36 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -8,6 +8,8 @@ package org.elasticsearch.ingest; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.script.DynamicMap; @@ -26,6 +28,7 @@ import java.util.ListIterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -45,6 +48,8 @@ public class ConditionalProcessor extends AbstractProcessor implements WrappingP return value; }); + private static final Logger logger = LogManager.getLogger(ConditionalProcessor.class); + static final String TYPE = "conditional"; private final Script condition; @@ -120,15 +125,27 @@ public void execute(IngestDocument ingestDocument, BiConsumer { - long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; - metric.postIngest(ingestTimeInNanos); - if (e != null) { - metric.ingestFailed(); - handler.accept(null, e); + if (listenerHasBeenCalled.getAndSet(true)) { + logger.warn("A listener was unexpectedly called more than once", new RuntimeException()); + assert false : "A listener was unexpectedly called more than once"; } else { - handler.accept(result, null); + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + metric.postIngest(ingestTimeInNanos); + if (e != null) { + metric.ingestFailed(); + handler.accept(null, e); + } else { + handler.accept(result, null); + } } }); } else { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java index de26acff1a024..5265e5c8079d5 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java @@ -8,6 +8,8 @@ package org.elasticsearch.ingest; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.common.metrics.CounterMetric; import java.util.concurrent.TimeUnit; @@ -22,6 +24,8 @@ */ class IngestMetric { + private static final Logger logger = LogManager.getLogger(IngestMetric.class); + /** * The time it takes to complete the measured item. */ @@ -53,7 +57,19 @@ void preIngest() { */ void postIngest(long ingestTimeInNanos) { long current = ingestCurrent.decrementAndGet(); - assert current >= 0 : "ingest metric current count double-decremented"; + if (current < 0) { + /* + * This ought to never happen. However if it does, it's incredibly bad because ingestCurrent being negative causes a + * serialization error that prevents the nodes stats API from working. So we're doing 3 things here: + * (1) Log a stack trace at warn level so that the Elasticsearch engineering team can track down and fix the source of the + * bug if it still exists + * (2) Throw an AssertionError if assertions are enabled so that we are aware of the bug + * (3) Increment the counter back up so that we don't hit serialization failures + */ + logger.warn("Current ingest counter decremented below 0", new RuntimeException()); + assert false : "ingest metric current count double-decremented"; + ingestCurrent.incrementAndGet(); + } this.ingestTimeInNanos.inc(ingestTimeInNanos); ingestCount.inc(); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index d2e671550a39a..b58594edd26ac 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -72,6 +72,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -884,60 +885,72 @@ private void innerExecute( VersionType versionType = indexRequest.versionType(); Map sourceAsMap = indexRequest.sourceAsMap(); IngestDocument ingestDocument = new IngestDocument(index, id, version, routing, versionType, sourceAsMap); + /* + * Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce + * that in all processors and all of the code that they call. If the listener is called more than once it causes problems + * such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener + * is only executed once. + */ + final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false); ingestDocument.executePipeline(pipeline, (result, e) -> { - long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; - totalMetrics.postIngest(ingestTimeInNanos); - if (e != null) { - totalMetrics.ingestFailed(); - handler.accept(e); - } else if (result == null) { - itemDroppedHandler.accept(slot); - handler.accept(null); + if (listenerHasBeenCalled.getAndSet(true)) { + logger.warn("A listener was unexpectedly called more than once", new RuntimeException()); + assert false : "A listener was unexpectedly called more than once"; } else { - org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata(); - - // it's fine to set all metadata fields all the time, as ingest document holds their starting values - // before ingestion, which might also get modified during ingestion. - indexRequest.index(metadata.getIndex()); - indexRequest.id(metadata.getId()); - indexRequest.routing(metadata.getRouting()); - indexRequest.version(metadata.getVersion()); - if (metadata.getVersionType() != null) { - indexRequest.versionType(VersionType.fromString(metadata.getVersionType())); - } - Number number; - if ((number = metadata.getIfSeqNo()) != null) { - indexRequest.setIfSeqNo(number.longValue()); - } - if ((number = metadata.getIfPrimaryTerm()) != null) { - indexRequest.setIfPrimaryTerm(number.longValue()); - } - try { - boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck(); - indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences); - } catch (IllegalArgumentException ex) { - // An IllegalArgumentException can be thrown when an ingest - // processor creates a source map that is self-referencing. - // In that case, we catch and wrap the exception so we can - // include which pipeline failed. + long ingestTimeInNanos = System.nanoTime() - startTimeInNanos; + totalMetrics.postIngest(ingestTimeInNanos); + if (e != null) { totalMetrics.ingestFailed(); - handler.accept( - new IllegalArgumentException( - "Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", - ex - ) - ); - return; - } - Map map; - if ((map = metadata.getDynamicTemplates()) != null) { - Map mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates()); - mergedDynamicTemplates.putAll(map); - indexRequest.setDynamicTemplates(mergedDynamicTemplates); - } - postIngest(ingestDocument, indexRequest); + handler.accept(e); + } else if (result == null) { + itemDroppedHandler.accept(slot); + handler.accept(null); + } else { + org.elasticsearch.script.Metadata metadata = ingestDocument.getMetadata(); + + // it's fine to set all metadata fields all the time, as ingest document holds their starting values + // before ingestion, which might also get modified during ingestion. + indexRequest.index(metadata.getIndex()); + indexRequest.id(metadata.getId()); + indexRequest.routing(metadata.getRouting()); + indexRequest.version(metadata.getVersion()); + if (metadata.getVersionType() != null) { + indexRequest.versionType(VersionType.fromString(metadata.getVersionType())); + } + Number number; + if ((number = metadata.getIfSeqNo()) != null) { + indexRequest.setIfSeqNo(number.longValue()); + } + if ((number = metadata.getIfPrimaryTerm()) != null) { + indexRequest.setIfPrimaryTerm(number.longValue()); + } + try { + boolean ensureNoSelfReferences = ingestDocument.doNoSelfReferencesCheck(); + indexRequest.source(ingestDocument.getSource(), indexRequest.getContentType(), ensureNoSelfReferences); + } catch (IllegalArgumentException ex) { + // An IllegalArgumentException can be thrown when an ingest + // processor creates a source map that is self-referencing. + // In that case, we catch and wrap the exception so we can + // include which pipeline failed. + totalMetrics.ingestFailed(); + handler.accept( + new IllegalArgumentException( + "Failed to generate the source document for ingest pipeline [" + pipeline.getId() + "]", + ex + ) + ); + return; + } + Map map; + if ((map = metadata.getDynamicTemplates()) != null) { + Map mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates()); + mergedDynamicTemplates.putAll(map); + indexRequest.setDynamicTemplates(mergedDynamicTemplates); + } + postIngest(ingestDocument, indexRequest); - handler.accept(null); + handler.accept(null); + } } }); } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index e9d78bd9f1003..8061e7a90aee9 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -8,6 +8,8 @@ package org.elasticsearch.ingest; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.core.Nullable; import org.elasticsearch.script.ScriptService; @@ -16,6 +18,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.LongSupplier; @@ -30,6 +33,8 @@ public final class Pipeline { public static final String ON_FAILURE_KEY = "on_failure"; public static final String META_KEY = "_meta"; + private static final Logger logger = LogManager.getLogger(Pipeline.class); + private final String id; @Nullable private final String description; @@ -113,14 +118,26 @@ public static Pipeline create( */ public void execute(IngestDocument ingestDocument, BiConsumer handler) { final long startTimeInNanos = relativeTimeProvider.getAsLong(); + /* + * Our assumption is that the listener passed to the processor is only ever called once. However, there is no way to enforce + * that in all processors and all of the code that they call. If the listener is called more than once it causes problems + * such as the metrics being wrong. The listenerHasBeenCalled variable is used to make sure that the code in the listener + * is only executed once. + */ + final AtomicBoolean listenerHasBeenCalled = new AtomicBoolean(false); metrics.preIngest(); compoundProcessor.execute(ingestDocument, (result, e) -> { - long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; - metrics.postIngest(ingestTimeInNanos); - if (e != null) { - metrics.ingestFailed(); + if (listenerHasBeenCalled.getAndSet(true)) { + logger.warn("A listener was unexpectedly called more than once", new RuntimeException()); + assert false : "A listener was unexpectedly called more than once"; + } else { + long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; + metrics.postIngest(ingestTimeInNanos); + if (e != null) { + metrics.ingestFailed(); + } + handler.accept(result, e); } - handler.accept(result, e); }); } From 30c9d83444b93ce1de843a91c1119e1bdbf39285 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 23 Sep 2022 15:18:12 -0500 Subject: [PATCH 2/4] Update docs/changelog/90319.yaml --- docs/changelog/90319.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/90319.yaml diff --git a/docs/changelog/90319.yaml b/docs/changelog/90319.yaml new file mode 100644 index 0000000000000..c0a667b8ee3e6 --- /dev/null +++ b/docs/changelog/90319.yaml @@ -0,0 +1,5 @@ +pr: 90319 +summary: Preventing serialization errors in the nodes stats API +area: Ingest Node +type: bug +issues: [] From b0ce8261475804d002c71045374b4894d53ae9d7 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 23 Sep 2022 15:22:14 -0500 Subject: [PATCH 3/4] Update docs/changelog/90319.yaml --- docs/changelog/90319.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/changelog/90319.yaml b/docs/changelog/90319.yaml index c0a667b8ee3e6..0d94afcd1c12e 100644 --- a/docs/changelog/90319.yaml +++ b/docs/changelog/90319.yaml @@ -2,4 +2,5 @@ pr: 90319 summary: Preventing serialization errors in the nodes stats API area: Ingest Node type: bug -issues: [] +issues: + - 77973 From 58f84bdbd17520ad74e5d630502fe1f0df716333 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 26 Sep 2022 08:29:41 -0500 Subject: [PATCH 4/4] Never reporting a negative ingest current --- .../src/main/java/org/elasticsearch/ingest/IngestMetric.java | 4 +++- .../test/java/org/elasticsearch/ingest/IngestMetricTests.java | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java index 5265e5c8079d5..13c57d9e06b28 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestMetric.java @@ -100,6 +100,8 @@ void add(IngestMetric metrics) { IngestStats.Stats createStats() { // we track ingestTime at nanosecond resolution, but IngestStats uses millisecond resolution for reporting long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(ingestTimeInNanos.count()); - return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, ingestCurrent.get(), ingestFailed.count()); + // It is possible for the current count to briefly drop below 0, causing serialization problems. See #90319 + long currentCount = Math.max(0, ingestCurrent.get()); + return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis, currentCount, ingestFailed.count()); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestMetricTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestMetricTests.java index d4fd0d6c0ef52..06fa24e493949 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestMetricTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestMetricTests.java @@ -42,7 +42,8 @@ public void testPostIngestDoubleDecrement() { // the second postIngest triggers an assertion error expectThrows(AssertionError.class, () -> metric.postIngest(500000L)); - assertThat(-1L, equalTo(metric.createStats().getIngestCurrent())); + // We never allow the reported ingestCurrent to be negative: + assertThat(metric.createStats().getIngestCurrent(), equalTo(0L)); } }