Skip to content

Commit

Permalink
Reduce log level for pipeline failure
Browse files Browse the repository at this point in the history
Today we log `failed to execute pipeline for a bulk request` at `ERROR` level
if an attempt to run an ingest pipeline fails. A failure here is commonly due
to an `EsRejectedExecutionException`. We also feed such failures back to the
client and record the rejection in the threadpool statistics.

In line with elastic#51459 there is no need to log failures within actions so noisily
and with such urgency. It is better to leave it up to the client to react
accordingly. Typically an `EsRejectedExecutionException` should result in the
client backing off and retrying, so a failure here is not normally fatal enough
to justify an `ERROR` log at all.

This commit reduces the log level for this message to `DEBUG`.
  • Loading branch information
DaveCTurner committed Mar 24, 2020
1 parent c576488 commit fbb2a2f
Showing 1 changed file with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
*/
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {

private static final Logger logger = LogManager.getLogger(TransportBulkAction.class);

private final ThreadPool threadPool;
private final AutoCreateIndex autoCreateIndex;
private final ClusterService clusterService;
Expand Down Expand Up @@ -233,7 +235,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
} else {
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
createIndex(index, bulkRequest.timeout(), new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
Expand Down Expand Up @@ -634,7 +636,7 @@ private long relativeTime() {
return relativeTimeProvider.getAsLong();
}

void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
private void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
final long ingestStartTimeInNanos = System.nanoTime();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(
Expand All @@ -643,7 +645,7 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen
bulkRequestModifier::markItemAsFailed,
(originalThread, exception) -> {
if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception);
logger.debug("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
Expand Down Expand Up @@ -692,7 +694,7 @@ public boolean isForceExecution() {

static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {

private static final Logger LOGGER = LogManager.getLogger(BulkRequestModifier.class);
private static final Logger logger = LogManager.getLogger(BulkRequestModifier.class);

final BulkRequest bulkRequest;
final SparseFixedBitSet failedSlots;
Expand Down Expand Up @@ -774,7 +776,7 @@ synchronized void markItemAsDropped(int slot) {

synchronized void markItemAsFailed(int slot, Exception e) {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
indexRequest.getPipeline(), indexRequest.index(), indexRequest.id()), e);

// We hit a error during preprocessing a request, so we:
Expand Down

0 comments on commit fbb2a2f

Please sign in to comment.