Skip to content

Commit

Permalink
make the kafka producer single threaded
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker committed Oct 23, 2023
1 parent a411f8a commit 146df94
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ public HttpResponse addDocument(String bulkRequest) {
}
}

public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDocs) {
// TODO: temporary synchronized till we produceDocuments supports writing multiple requests in one
// transaction
public synchronized BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDocs) {
int totalDocs = indexDocs.values().stream().mapToInt(List::size).sum();

// we cannot create a generic pool of producers because the kafka API expects the transaction ID
Expand Down Expand Up @@ -222,6 +224,7 @@ public BulkIngestResponse produceDocuments(Map<String, List<Trace.Span>> indexDo
// exit.
new RuntimeHalterImpl().handleFatal(new Throwable("KafkaProducer needs to shutdown ", e));
} catch (Exception e) {
LOG.warn("failed transaction with error", e);
try {
kafkaProducer.abortTransaction();
} catch (ProducerFencedException err) {
Expand Down

0 comments on commit 146df94

Please sign in to comment.