diff --git a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java index a81f8fbc4c..d39b8b4128 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java @@ -191,7 +191,9 @@ public HttpResponse addDocument(String bulkRequest) { } } - public BulkIngestResponse produceDocuments(Map> indexDocs) { + // TODO: temporary synchronized till we produceDocuments supports writing multiple requests in one + // transaction + public synchronized BulkIngestResponse produceDocuments(Map> indexDocs) { int totalDocs = indexDocs.values().stream().mapToInt(List::size).sum(); // we cannot create a generic pool of producers because the kafka API expects the transaction ID @@ -222,6 +224,7 @@ public BulkIngestResponse produceDocuments(Map> indexDo // exit. new RuntimeHalterImpl().handleFatal(new Throwable("KafkaProducer needs to shutdown ", e)); } catch (Exception e) { + LOG.warn("failed transaction with error", e); try { kafkaProducer.abortTransaction(); } catch (ProducerFencedException err) {