From 4a2bbaee66f4fa6c0f911bf4d5a7d376c62cf828 Mon Sep 17 00:00:00 2001 From: Pavlo Smahin Date: Thu, 9 Jan 2025 14:01:25 +0200 Subject: [PATCH] feat: extend domain events with eventId and eventTs (#1133) Closes: MODINVSTOR-1322 --- NEWS.md | 1 + docker/.env | 2 +- docker/docker-compose.yml | 62 ++++++------- .../services/domainevent/DomainEvent.java | 28 +++++- .../services/domainevent/DomainEventTest.java | 93 +++++++++++++++++++ 5 files changed, 146 insertions(+), 40 deletions(-) create mode 100644 src/test/java/org/folio/services/domainevent/DomainEventTest.java diff --git a/NEWS.md b/NEWS.md index 4a5b86175..d330a42ba 100644 --- a/NEWS.md +++ b/NEWS.md @@ -16,6 +16,7 @@ * Service points synchronization: create a verticle ([MODINVSTOR-1245](https://folio-org.atlassian.net/browse/MODINVSTOR-1245)) * Do not return routing service points by default ([MODINVSTOR-1219](https://folio-org.atlassian.net/browse/MODINVSTOR-1219)) * Implement Kafka Event Publishing for Call-Number Type CRUD Operations ([MODINVSTOR-1275](https://folio-org.atlassian.net/browse/MODINVSTOR-1275)) +* Extend domain events with eventId and eventTs ([MODINVSTOR-1322](https://folio-org.atlassian.net/browse/MODINVSTOR-1322)) ### Bug fixes * Add item barcode right truncation search index ([MODINVSTOR-1292](https://folio-org.atlassian.net/browse/MODINVSTOR-1292)) diff --git a/docker/.env b/docker/.env index aae0d08d9..13a144e62 100644 --- a/docker/.env +++ b/docker/.env @@ -8,7 +8,7 @@ PGADMIN_DEFAULT_EMAIL=user@domain.com PGADMIN_DEFAULT_PASSWORD=admin PGADMIN_PORT=5050 KAFKA_HOST=kafka -KAFKA_PORT=9092 +KAFKA_PORT=9093 REPLICATION_FACTOR=1 ENV=folio DEBUG_PORT=5005 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 19ff478ee..ec9fafc2e 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -3,7 +3,7 @@ version: "3.9" services: postgres: container_name: postgres_mod-inventory-storage - image: postgres:12-alpine + image: postgres:16-alpine ports: - "5432:5432" volumes: @@ -18,7 +18,7 @@ services: pgadmin: container_name: pgadmin_mod-inventory-storage - image: dpage/pgadmin4:6.7 + image: dpage/pgadmin4:8.13 ports: - ${PGADMIN_PORT}:80 volumes: @@ -30,50 +30,42 @@ services: networks: - mod-inventory-storage-local - zookeeper: - container_name: zookeeper_mod-inventory-storage - image: wurstmeister/zookeeper:3.4.6 - ports: - - "2181:2181" + kafka: + container_name: kafka_mod-search + image: apache/kafka-native networks: - mod-inventory-storage-local - - kafka: - container_name: kafka_mod-inventory-storage - image: wurstmeister/kafka:2.13-2.8.1 ports: - "9092:9092" - - "29092:29092" + - "9093:9093" environment: - KAFKA_ADVERTISED_LISTENERS: "INTERNAL://:9092,LOCAL://localhost:29092" - KAFKA_BROKER_ID: "1" - KAFKA_LOG_RETENTION_HOURS: "-1" - KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - KAFKA_LISTENERS: "INTERNAL://:9092,LOCAL://:29092" - KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL" - KAFKA_LOG_RETENTION_BYTES: "-1" - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "LOCAL:PLAINTEXT,INTERNAL:PLAINTEXT" - KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" - depends_on: - - zookeeper - networks: - - mod-inventory-storage-local + # Configure listeners for both docker and host communication + KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093 + KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT + # Settings required for KRaft mode + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091 + # Listener to use for broker-to-broker communication + KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER + # Required for a single node cluster + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 kafka-ui: - container_name: kafka-ui_mod-inventory-storage - image: provectuslabs/kafka-ui:latest + container_name: kafka-ui_mod-search + image: ghcr.io/kafbat/kafka-ui:latest + networks: + - mod-inventory-storage-local ports: - "8080:8080" - depends_on: - - zookeeper - - kafka environment: + DYNAMIC_CONFIG_ENABLED: 'true' KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 - KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181 - KAFKA_CLUSTERS_0_JMXPORT: 9997 - networks: - - mod-inventory-storage-local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093 + depends_on: + - kafka mod-inventory-storage: container_name: mod-inventory-storage diff --git a/src/main/java/org/folio/services/domainevent/DomainEvent.java b/src/main/java/org/folio/services/domainevent/DomainEvent.java index 64200e72e..b5d9aa4ad 100644 --- a/src/main/java/org/folio/services/domainevent/DomainEvent.java +++ b/src/main/java/org/folio/services/domainevent/DomainEvent.java @@ -10,10 +10,14 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.UUID; import org.apache.commons.lang3.builder.ToStringBuilder; @JsonInclude(JsonInclude.Include.NON_NULL) public class DomainEvent { + + private UUID eventId; + private Long eventTs; @JsonProperty("old") private T oldEntity; @JsonProperty("new") @@ -24,6 +28,8 @@ public class DomainEvent { @JsonCreator public DomainEvent(@JsonProperty("old") T oldEntity, @JsonProperty("new") T newEntity, @JsonProperty("type") DomainEventType type, @JsonProperty("tenant") String tenant) { + this.eventId = UUID.randomUUID(); + this.eventTs = System.currentTimeMillis(); this.oldEntity = oldEntity; this.newEntity = newEntity; this.type = type; @@ -50,14 +56,26 @@ public static DomainEvent reindexEvent(String tenant) { return new DomainEvent<>(null, null, REINDEX, tenant); } - public static DomainEvent reindexEvent(String tenant, T newEntity) { - return new DomainEvent<>(null, newEntity, REINDEX, tenant); - } - public static DomainEvent asyncMigrationEvent(T job, String tenant) { return new DomainEvent<>(null, job, MIGRATION, tenant); } + public UUID getEventId() { + return eventId; + } + + public void setEventId(UUID eventId) { + this.eventId = eventId; + } + + public Long getEventTs() { + return eventTs; + } + + public void setEventTs(Long eventTs) { + this.eventTs = eventTs; + } + public T getOldEntity() { return oldEntity; } @@ -93,6 +111,8 @@ public void setTenant(String tenant) { @Override public String toString() { return new ToStringBuilder(this) + .append("eventId", eventId) + .append("eventTs", eventTs) .append("oldEntity", oldEntity) .append("newEntity", newEntity) .append("type", type) diff --git a/src/test/java/org/folio/services/domainevent/DomainEventTest.java b/src/test/java/org/folio/services/domainevent/DomainEventTest.java new file mode 100644 index 000000000..4cb765969 --- /dev/null +++ b/src/test/java/org/folio/services/domainevent/DomainEventTest.java @@ -0,0 +1,93 @@ +package org.folio.services.domainevent; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.junit.jupiter.api.Test; + +class DomainEventTest { + + @Test + void testCreateEvent() { + String newEntity = "newEntity"; + String tenant = "tenant"; + DomainEvent event = DomainEvent.createEvent(newEntity, tenant); + + assertNotNull(event.getEventId()); + assertNotNull(event.getEventTs()); + assertEquals(newEntity, event.getNewEntity()); + assertNull(event.getOldEntity()); + assertEquals(DomainEventType.CREATE, event.getType()); + assertEquals(tenant, event.getTenant()); + } + + @Test + void testUpdateEvent() { + String oldEntity = "oldEntity"; + String newEntity = "newEntity"; + String tenant = "tenant"; + DomainEvent event = DomainEvent.updateEvent(oldEntity, newEntity, tenant); + + assertNotNull(event.getEventId()); + assertNotNull(event.getEventTs()); + assertEquals(newEntity, event.getNewEntity()); + assertEquals(oldEntity, event.getOldEntity()); + assertEquals(DomainEventType.UPDATE, event.getType()); + assertEquals(tenant, event.getTenant()); + } + + @Test + void testDeleteEvent() { + String oldEntity = "oldEntity"; + String tenant = "tenant"; + DomainEvent event = DomainEvent.deleteEvent(oldEntity, tenant); + + assertNotNull(event.getEventId()); + assertNotNull(event.getEventTs()); + assertNull(event.getNewEntity()); + assertEquals(oldEntity, event.getOldEntity()); + assertEquals(DomainEventType.DELETE, event.getType()); + assertEquals(tenant, event.getTenant()); + } + + @Test + void testDeleteAllEvent() { + String tenant = "tenant"; + DomainEvent event = DomainEvent.deleteAllEvent(tenant); + + assertNotNull(event.getEventId()); + assertNotNull(event.getEventTs()); + assertNull(event.getNewEntity()); + assertNull(event.getOldEntity()); + assertEquals(DomainEventType.DELETE_ALL, event.getType()); + assertEquals(tenant, event.getTenant()); + } + + @Test + void testReindexEvent() { + String tenant = "tenant"; + DomainEvent event = DomainEvent.reindexEvent(tenant); + + assertNotNull(event.getEventId()); + assertNotNull(event.getEventTs()); + assertNull(event.getNewEntity()); + assertNull(event.getOldEntity()); + assertEquals(DomainEventType.REINDEX, event.getType()); + assertEquals(tenant, event.getTenant()); + } + + @Test + void testAsyncMigrationEvent() { + String job = "job"; + String tenant = "tenant"; + DomainEvent event = DomainEvent.asyncMigrationEvent(job, tenant); + + assertNotNull(event.getEventId()); + assertNotNull(event.getEventTs()); + assertEquals(job, event.getNewEntity()); + assertNull(event.getOldEntity()); + assertEquals(DomainEventType.MIGRATION, event.getType()); + assertEquals(tenant, event.getTenant()); + } +}