
Commit

Doc parsing error logging with throttling (elastic#117828) (elastic#117968)

* Throttled doc parsing error logging

* add test

* move throttler to separate class

* small changes

* refactor unittest

* fix test
kkrik-es authored Dec 4, 2024
1 parent 3c988de commit 516688a
Showing 4 changed files with 136 additions and 4 deletions.
server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java
@@ -21,6 +21,8 @@
import java.util.List;

public class DocumentMapper {
+    static final NodeFeature INDEX_SORTING_ON_NESTED = new NodeFeature("mapper.index_sorting_on_nested");
+
    private final String type;
    private final CompressedXContent mappingSource;
    private final MappingLookup mappingLookup;
@@ -29,8 +31,6 @@ public class DocumentMapper {
    private final IndexVersion indexVersion;
    private final Logger logger;

-    static final NodeFeature INDEX_SORTING_ON_NESTED = new NodeFeature("mapper.index_sorting_on_nested");
-
    /**
     * Create a new {@link DocumentMapper} that holds empty mappings.
     * @param mapperService the mapper service that holds the needed components
@@ -72,9 +72,11 @@ public static DocumentMapper createEmpty(MapperService mapperService) {
            : "provided source [" + source + "] differs from mapping [" + mapping.toCompressedXContent() + "]";
    }

-    private void maybeLogDebug(Exception ex) {
+    private void maybeLog(Exception ex) {
        if (logger.isDebugEnabled()) {
            logger.debug("Error while parsing document: " + ex.getMessage(), ex);
+        } else if (IntervalThrottler.DOCUMENT_PARSING_FAILURE.accept()) {
+            logger.error("Error while parsing document: " + ex.getMessage(), ex);
        }
    }

@@ -125,7 +127,7 @@ public ParsedDocument parse(SourceToParse source) throws DocumentParsingException
        try {
            return documentParser.parseDocument(source, mappingLookup);
        } catch (Exception e) {
-            maybeLogDebug(e);
+            maybeLog(e);
            throw e;
        }
    }

server/src/main/java/org/elasticsearch/index/mapper/IntervalThrottler.java (new file)
@@ -0,0 +1,66 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.index.mapper;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Throttles tracked operations based on a time interval, restricting them to 1 per N seconds.
 */
enum IntervalThrottler {
    DOCUMENT_PARSING_FAILURE(60);

    static final int MILLISECONDS_IN_SECOND = 1000;

    private final Acceptor acceptor;

    IntervalThrottler(long intervalSeconds) {
        acceptor = new Acceptor(intervalSeconds * MILLISECONDS_IN_SECOND);
    }

    /**
     * @return true if the operation gets accepted, false if throttled.
     */
    boolean accept() {
        return acceptor.accept();
    }

    // Defined separately for testing.
    static class Acceptor {
        private final long intervalMillis;
        private final AtomicBoolean lastAcceptedGuard = new AtomicBoolean(false);
        private volatile long lastAcceptedTimeMillis = 0;

        Acceptor(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }

        boolean accept() {
            final long now = System.currentTimeMillis();
            // Check without guarding first, to reduce contention.
            if (now - lastAcceptedTimeMillis > intervalMillis) {
                // Check if another concurrent operation succeeded.
                if (lastAcceptedGuard.compareAndSet(false, true)) {
                    try {
                        // Repeat check under guard protection, so that only one message gets written per interval.
                        if (now - lastAcceptedTimeMillis > intervalMillis) {
                            lastAcceptedTimeMillis = now;
                            return true;
                        }
                    } finally {
                        // Reset guard.
                        lastAcceptedGuard.set(false);
                    }
                }
            }
            return false;
        }
    }
}
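
For context, a minimal sketch of how a caller is expected to gate repetitive logging with this throttler, mirroring the maybeLog change in DocumentMapper above. The class name and method below are illustrative only and not part of the commit; the sketch assumes it lives in the same org.elasticsearch.index.mapper package, since IntervalThrottler is package-private.

package org.elasticsearch.index.mapper;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Illustrative caller: unthrottled at DEBUG, otherwise at most one ERROR entry per interval.
class ThrottledParsingErrorLogger {
    private static final Logger LOGGER = LogManager.getLogger(ThrottledParsingErrorLogger.class);

    void onParsingFailure(Exception ex) {
        if (LOGGER.isDebugEnabled()) {
            // Debug logging is not throttled: every failure is logged.
            LOGGER.debug("Error while parsing document: " + ex.getMessage(), ex);
        } else if (IntervalThrottler.DOCUMENT_PARSING_FAILURE.accept()) {
            // DOCUMENT_PARSING_FAILURE(60) accepts at most one operation per 60 seconds.
            LOGGER.error("Error while parsing document: " + ex.getMessage(), ex);
        }
    }
}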

server/src/test/java/org/elasticsearch/index/mapper/DocumentMapperTests.java
@@ -9,12 +9,18 @@

package org.elasticsearch.index.mapper;

+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LogEvent;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.logging.MockAppender;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
@@ -493,4 +499,35 @@ public void testDeeplyNestedMapping() throws Exception {
            }
        }
    }
+
+    public void testParsingErrorLogging() throws Exception {
+        MockAppender appender = new MockAppender("mock_appender");
+        appender.start();
+        Logger testLogger = LogManager.getLogger(DocumentMapper.class);
+        Loggers.addAppender(testLogger, appender);
+        Level originalLogLevel = testLogger.getLevel();
+        Loggers.setLevel(testLogger, Level.ERROR);
+
+        try {
+            DocumentMapper doc = createDocumentMapper(mapping(b -> b.startObject("value").field("type", "integer").endObject()));
+
+            DocumentParsingException e = expectThrows(
+                DocumentParsingException.class,
+                () -> doc.parse(source(b -> b.field("value", "foo")))
+            );
+            assertThat(e.getMessage(), containsString("failed to parse field [value] of type [integer] in document with id '1'"));
+            LogEvent event = appender.getLastEventAndReset();
+            if (event != null) {
+                assertThat(event.getMessage().getFormattedMessage(), containsString(e.getMessage()));
+            }
+
+            e = expectThrows(DocumentParsingException.class, () -> doc.parse(source(b -> b.field("value", "foo"))));
+            assertThat(e.getMessage(), containsString("failed to parse field [value] of type [integer] in document with id '1'"));
+            assertThat(appender.getLastEventAndReset(), nullValue());
+        } finally {
+            Loggers.setLevel(testLogger, originalLogLevel);
+            Loggers.removeAppender(testLogger, appender);
+            appender.stop();
+        }
+    }
}

server/src/test/java/org/elasticsearch/index/mapper/IntervalThrottlerTests.java (new file)
@@ -0,0 +1,27 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.index.mapper;

import org.elasticsearch.test.ESTestCase;

public class IntervalThrottlerTests extends ESTestCase {

    public void testThrottling() throws Exception {
        var throttler = new IntervalThrottler.Acceptor(10);
        assertTrue(throttler.accept());
        assertFalse(throttler.accept());
        assertFalse(throttler.accept());

        Thread.sleep(20);
        assertTrue(throttler.accept());
        assertFalse(throttler.accept());
        assertFalse(throttler.accept());
    }
}
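
The comments in Acceptor.accept() describe a double-checked pattern: the compare-and-set guard plus the re-check mean that, even under concurrent calls, at most one caller per interval gets true. A possible way to exercise that property is sketched below; it is not part of the commit, uses only the JDK, and assumes it sits in the same package so it can reach the package-private Acceptor, like the test above.

package org.elasticsearch.index.mapper;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demonstration: many threads race on accept(), but only one wins per interval.
class AcceptorConcurrencyDemo {
    public static void main(String[] args) throws Exception {
        var acceptor = new IntervalThrottler.Acceptor(10_000); // 10-second interval
        var accepted = new AtomicInteger();
        var start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 8; i++) {
            pool.execute(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (acceptor.accept()) {
                    accepted.incrementAndGet();
                }
            });
        }
        start.countDown(); // release all threads at once
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("accepted = " + accepted.get()); // expected: 1
    }
}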
