Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce log level for pipeline failure #54097

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
*/
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {

private static final Logger logger = LogManager.getLogger(TransportBulkAction.class);

private final ThreadPool threadPool;
private final AutoCreateIndex autoCreateIndex;
private final ClusterService clusterService;
Expand Down Expand Up @@ -233,7 +235,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
} else {
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
for (String index : autoCreateIndices) {
createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
createIndex(index, bulkRequest.timeout(), new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
Expand Down Expand Up @@ -634,7 +636,7 @@ private long relativeTime() {
return relativeTimeProvider.getAsLong();
}

void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
private void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) {
final long ingestStartTimeInNanos = System.nanoTime();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
ingestService.executeBulkRequest(
Expand All @@ -643,7 +645,7 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen
bulkRequestModifier::markItemAsFailed,
(originalThread, exception) -> {
if (exception != null) {
logger.error("failed to execute pipeline for a bulk request", exception);
logger.debug("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
Expand Down Expand Up @@ -692,7 +694,7 @@ public boolean isForceExecution() {

static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {

private static final Logger LOGGER = LogManager.getLogger(BulkRequestModifier.class);
private static final Logger logger = LogManager.getLogger(BulkRequestModifier.class);

final BulkRequest bulkRequest;
final SparseFixedBitSet failedSlots;
Expand Down Expand Up @@ -774,7 +776,7 @@ synchronized void markItemAsDropped(int slot) {

synchronized void markItemAsFailed(int slot, Exception e) {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
indexRequest.getPipeline(), indexRequest.index(), indexRequest.id()), e);

// We hit a error during preprocessing a request, so we:
Expand Down