diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java index 1c9321737ab5f..c56885eded38f 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java @@ -21,6 +21,8 @@ import java.util.List; public class DocumentMapper { + static final NodeFeature INDEX_SORTING_ON_NESTED = new NodeFeature("mapper.index_sorting_on_nested"); + private final String type; private final CompressedXContent mappingSource; private final MappingLookup mappingLookup; @@ -29,8 +31,6 @@ public class DocumentMapper { private final IndexVersion indexVersion; private final Logger logger; - static final NodeFeature INDEX_SORTING_ON_NESTED = new NodeFeature("mapper.index_sorting_on_nested"); - /** * Create a new {@link DocumentMapper} that holds empty mappings. * @param mapperService the mapper service that holds the needed components @@ -72,9 +72,11 @@ public static DocumentMapper createEmpty(MapperService mapperService) { : "provided source [" + source + "] differs from mapping [" + mapping.toCompressedXContent() + "]"; } - private void maybeLogDebug(Exception ex) { + private void maybeLog(Exception ex) { if (logger.isDebugEnabled()) { logger.debug("Error while parsing document: " + ex.getMessage(), ex); + } else if (IntervalThrottler.DOCUMENT_PARSING_FAILURE.accept()) { + logger.error("Error while parsing document: " + ex.getMessage(), ex); } } @@ -125,7 +127,7 @@ public ParsedDocument parse(SourceToParse source) throws DocumentParsingExceptio try { return documentParser.parseDocument(source, mappingLookup); } catch (Exception e) { - maybeLogDebug(e); + maybeLog(e); throw e; } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/IntervalThrottler.java b/server/src/main/java/org/elasticsearch/index/mapper/IntervalThrottler.java new file mode 100644 index 
0000000000000..ffc35d9eaf769 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/mapper/IntervalThrottler.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.mapper; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Throttles tracked operations based on a time interval, restricting them to 1 per N seconds. + */ +enum IntervalThrottler { + DOCUMENT_PARSING_FAILURE(60); + +
 static final int MILLISECONDS_IN_SECOND = 1000; + + private final Acceptor acceptor; + + IntervalThrottler(long intervalSeconds) { + acceptor = new Acceptor(intervalSeconds * MILLISECONDS_IN_SECOND); + } + + /** + * @return true if the operation gets accepted, false if throttled. + */ + boolean accept() { + return acceptor.accept(); + } + + // Defined separately for testing. + static class Acceptor { + private final long intervalMillis; + private final AtomicBoolean lastAcceptedGuard = new AtomicBoolean(false); + private volatile long lastAcceptedTimeMillis = 0; + + Acceptor(long intervalMillis) { + this.intervalMillis = intervalMillis; + } + + boolean accept() { + final long now = System.currentTimeMillis(); + // Check without guarding first, to reduce contention. + if (now - lastAcceptedTimeMillis > intervalMillis) { + // Try to take the guard; if another thread holds it, that thread handles this interval. + if (lastAcceptedGuard.compareAndSet(false, true)) { + try { + // Repeat check under guard protection, so that only one message gets written per interval. 
+ if (now - lastAcceptedTimeMillis > intervalMillis) { + lastAcceptedTimeMillis = now; + return true; + } + } finally { + // Reset guard. + lastAcceptedGuard.set(false); + } + } + } + return false; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DocumentMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/DocumentMapperTests.java index db9fdead949de..b2ba3d60d2174 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DocumentMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DocumentMapperTests.java @@ -9,12 +9,18 @@ package org.elasticsearch.index.mapper; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.core.KeywordAnalyzer; import org.apache.lucene.analysis.core.WhitespaceAnalyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.logging.MockAppender; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; @@ -493,4 +499,35 @@ public void testDeeplyNestedMapping() throws Exception { } } } + + public void testParsingErrorLogging() throws Exception { + MockAppender appender = new MockAppender("mock_appender"); + appender.start(); + Logger testLogger = LogManager.getLogger(DocumentMapper.class); + Loggers.addAppender(testLogger, appender); + Level originalLogLevel = testLogger.getLevel(); + Loggers.setLevel(testLogger, Level.ERROR); + + try { + DocumentMapper doc = createDocumentMapper(mapping(b -> b.startObject("value").field("type", "integer").endObject())); + + DocumentParsingException e = expectThrows( + 
DocumentParsingException.class, + () -> doc.parse(source(b -> b.field("value", "foo"))) + ); + assertThat(e.getMessage(), containsString("failed to parse field [value] of type [integer] in document with id '1'")); + LogEvent event = appender.getLastEventAndReset(); + if (event != null) { + assertThat(event.getMessage().getFormattedMessage(), containsString(e.getMessage())); + } + + e = expectThrows(DocumentParsingException.class, () -> doc.parse(source(b -> b.field("value", "foo")))); + assertThat(e.getMessage(), containsString("failed to parse field [value] of type [integer] in document with id '1'")); + assertThat(appender.getLastEventAndReset(), nullValue()); + } finally { + Loggers.setLevel(testLogger, originalLogLevel); + Loggers.removeAppender(testLogger, appender); + appender.stop(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/IntervalThrottlerTests.java b/server/src/test/java/org/elasticsearch/index/mapper/IntervalThrottlerTests.java new file mode 100644 index 0000000000000..25fd614524441 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/mapper/IntervalThrottlerTests.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.mapper; + +import org.elasticsearch.test.ESTestCase; + +public class IntervalThrottlerTests extends ESTestCase { + + public void testThrottling() throws Exception { + var throttler = new IntervalThrottler.Acceptor(10); + assertTrue(throttler.accept()); + assertFalse(throttler.accept()); + assertFalse(throttler.accept()); + + Thread.sleep(20); + assertTrue(throttler.accept()); + assertFalse(throttler.accept()); + assertFalse(throttler.accept()); + } +}