From f9e8098fa12d33e7ecec0a2fa94a3cf8b6b4a668 Mon Sep 17 00:00:00 2001 From: Oleksii Kuzminov <40833111+OleksiiKuzminov@users.noreply.github.com> Date: Thu, 25 Mar 2021 10:19:32 +0200 Subject: [PATCH] MODDATAIMP-372 (#141) (cherry picked from commit f3cb9b30b0a4cd5570202061f268cbf047997620) --- NEWS.md | 1 + .../services/impl/KafkaProducersBuilder.java | 40 ------------------- .../publish/PublishingServiceImpl.java | 11 +++-- 3 files changed, 6 insertions(+), 46 deletions(-) delete mode 100644 mod-pubsub-server/src/main/java/org/folio/services/impl/KafkaProducersBuilder.java diff --git a/NEWS.md b/NEWS.md index b471c52b..261109df 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,6 @@ ## xxxx-xx-xx v2.1.0-SNAPSHOT * [MODINV-373](https://issues.folio.org/browse/MODINV-373) Ensure exactly once processing for interaction via Kafka. +* [MODDATAIMP-372](https://issues.folio.org/browse/MODDATAIMP-372) Data Import job creates SRS records but not all expected Inventory records ## 2021-03-06 v2.0.1 * [MODPUBSUB-152](https://issues.folio.org/browse/MODPUBSUB-152) Module registration in mod-pubsub fails when MessagingDescriptor contains no publications diff --git a/mod-pubsub-server/src/main/java/org/folio/services/impl/KafkaProducersBuilder.java b/mod-pubsub-server/src/main/java/org/folio/services/impl/KafkaProducersBuilder.java deleted file mode 100644 index 8beac1e4..00000000 --- a/mod-pubsub-server/src/main/java/org/folio/services/impl/KafkaProducersBuilder.java +++ /dev/null @@ -1,40 +0,0 @@ -package org.folio.services.impl; - -import io.vertx.core.Vertx; -import io.vertx.kafka.client.producer.KafkaProducer; -import org.apache.commons.collections4.list.UnmodifiableList; -import org.folio.kafka.KafkaConfig; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import java.util.Iterator; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static com.google.common.collect.Iterators.cycle; -import static org.folio.rest.RestVerticle.MODULE_SPECIFIC_ARGS; - -@Component -public class KafkaProducersBuilder { - - // number of producers to be created is equal to allocated thread pool - private static final int NUMBER_OF_PRODUCERS = - Integer.parseInt(MODULE_SPECIFIC_ARGS.getOrDefault("event.publishing.thread.pool.size", "20")); - private Iterator> producerIterator; - - public KafkaProducersBuilder(@Autowired Vertx vertx, @Autowired KafkaConfig config) { - List> producers = - Stream.generate(() -> KafkaProducer.create(vertx, config.getProducerProps())) - .limit(NUMBER_OF_PRODUCERS) - .collect(Collectors.toList()); - producerIterator = cycle(new UnmodifiableList<>(producers)); - } - - public KafkaProducer getKafkaProducer() { - return Stream.generate(producerIterator::next) - .filter(producer -> !producer.writeQueueFull()) - .findAny() - .orElseThrow(() -> new RuntimeException("No eligible Kafka Producer available")); - } -} diff --git a/mod-pubsub-server/src/main/java/org/folio/services/publish/PublishingServiceImpl.java b/mod-pubsub-server/src/main/java/org/folio/services/publish/PublishingServiceImpl.java index fd19f0a1..188c5afd 100644 --- a/mod-pubsub-server/src/main/java/org/folio/services/publish/PublishingServiceImpl.java +++ b/mod-pubsub-server/src/main/java/org/folio/services/publish/PublishingServiceImpl.java @@ -5,6 +5,7 @@ import io.vertx.core.Vertx; import io.vertx.core.WorkerExecutor; import io.vertx.core.json.Json; +import io.vertx.kafka.client.producer.KafkaProducer; import io.vertx.kafka.client.producer.impl.KafkaProducerRecordImpl; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -13,7 +14,6 @@ import org.folio.rest.jaxrs.model.AuditMessage; import org.folio.rest.jaxrs.model.Event; import org.folio.services.audit.AuditService; -import org.folio.services.impl.KafkaProducersBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -28,17 +28,16 @@ public class PublishingServiceImpl implements PublishingService { private static final int THREAD_POOL_SIZE = Integer.parseInt(MODULE_SPECIFIC_ARGS.getOrDefault("event.publishing.thread.pool.size", "20")); - private KafkaProducersBuilder manager; private KafkaConfig kafkaConfig; private WorkerExecutor executor; private AuditService auditService; + private Vertx vertx; public PublishingServiceImpl(@Autowired Vertx vertx, - @Autowired KafkaProducersBuilder manager, @Autowired KafkaConfig kafkaConfig) { - this.manager = manager; this.kafkaConfig = kafkaConfig; this.auditService = AuditService.createProxy(vertx); + this.vertx = vertx; this.executor = vertx.createSharedWorkerExecutor("event-publishing-thread-pool", THREAD_POOL_SIZE); } @@ -46,9 +45,9 @@ public PublishingServiceImpl(@Autowired Vertx vertx, public Future sendEvent(Event event, String tenantId) { Promise promise = Promise.promise(); PubSubConfig config = new PubSubConfig(kafkaConfig.getEnvId(), tenantId, event.getEventType()); - executor.executeBlocking(future -> { + executor.executeBlocking(future -> { try { - manager.getKafkaProducer().write(new KafkaProducerRecordImpl<>(config.getTopicName(), Json.encode(event)), done -> { + KafkaProducer.createShared(vertx, config.getTopicName() + "_Producer", kafkaConfig.getProducerProps()).write(new KafkaProducerRecordImpl<>(config.getTopicName(), Json.encode(event)), done -> { if (done.succeeded()) { LOGGER.info("Sent {} event with id '{}' to topic {}", event.getEventType(), event.getId(), config.getTopicName()); auditService.saveAuditMessage(constructJsonAuditMessage(event, tenantId, AuditMessage.State.PUBLISHED));