Skip to content

Commit

Permalink
MODDATAIMP-372 (#141)
Browse files Browse the repository at this point in the history
(cherry picked from commit f3cb9b3)
  • Loading branch information
OleksiiKuzminov authored and KaterynaSenchenko committed Mar 25, 2021
1 parent 409fc6e commit f9e8098
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 46 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## xxxx-xx-xx v2.1.0-SNAPSHOT
* [MODINV-373](https://issues.folio.org/browse/MODINV-373) Ensure exactly once processing for interaction via Kafka.
* [MODDATAIMP-372](https://issues.folio.org/browse/MODDATAIMP-372) Data Import job creates SRS records but not all expected Inventory records

## 2021-03-06 v2.0.1
* [MODPUBSUB-152](https://issues.folio.org/browse/MODPUBSUB-152) Module registration in mod-pubsub fails when MessagingDescriptor contains no publications
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.json.Json;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.impl.KafkaProducerRecordImpl;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -13,7 +14,6 @@
import org.folio.rest.jaxrs.model.AuditMessage;
import org.folio.rest.jaxrs.model.Event;
import org.folio.services.audit.AuditService;
import org.folio.services.impl.KafkaProducersBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand All @@ -28,27 +28,26 @@ public class PublishingServiceImpl implements PublishingService {
private static final int THREAD_POOL_SIZE =
Integer.parseInt(MODULE_SPECIFIC_ARGS.getOrDefault("event.publishing.thread.pool.size", "20"));

private KafkaProducersBuilder manager;
private KafkaConfig kafkaConfig;
private WorkerExecutor executor;
private AuditService auditService;
private Vertx vertx;

public PublishingServiceImpl(@Autowired Vertx vertx,
@Autowired KafkaProducersBuilder manager,
@Autowired KafkaConfig kafkaConfig) {
this.manager = manager;
this.kafkaConfig = kafkaConfig;
this.auditService = AuditService.createProxy(vertx);
this.vertx = vertx;
this.executor = vertx.createSharedWorkerExecutor("event-publishing-thread-pool", THREAD_POOL_SIZE);
}

@Override
public Future<Boolean> sendEvent(Event event, String tenantId) {
Promise<Boolean> promise = Promise.promise();
PubSubConfig config = new PubSubConfig(kafkaConfig.getEnvId(), tenantId, event.getEventType());
executor.<Boolean>executeBlocking(future -> {
executor.executeBlocking(future -> {
try {
manager.getKafkaProducer().write(new KafkaProducerRecordImpl<>(config.getTopicName(), Json.encode(event)), done -> {
KafkaProducer.<String, String>createShared(vertx, config.getTopicName() + "_Producer", kafkaConfig.getProducerProps()).write(new KafkaProducerRecordImpl<>(config.getTopicName(), Json.encode(event)), done -> {
if (done.succeeded()) {
LOGGER.info("Sent {} event with id '{}' to topic {}", event.getEventType(), event.getId(), config.getTopicName());
auditService.saveAuditMessage(constructJsonAuditMessage(event, tenantId, AuditMessage.State.PUBLISHED));
Expand Down

0 comments on commit f9e8098

Please sign in to comment.