Skip to content

Commit

Permalink
Merge branch 'master' into MODINVSTOR-1280
Browse files Browse the repository at this point in the history
  • Loading branch information
SvitlanaKovalova1 committed Jan 10, 2025
2 parents 5350223 + 4a2bbae commit 05395b3
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 40 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Service points synchronization: create a verticle ([MODINVSTOR-1245](https://folio-org.atlassian.net/browse/MODINVSTOR-1245))
* Do not return routing service points by default ([MODINVSTOR-1219](https://folio-org.atlassian.net/browse/MODINVSTOR-1219))
* Implement Kafka Event Publishing for Call-Number Type CRUD Operations ([MODINVSTOR-1275](https://folio-org.atlassian.net/browse/MODINVSTOR-1275))
* Extend domain events with eventId and eventTs ([MODINVSTOR-1322](https://folio-org.atlassian.net/browse/MODINVSTOR-1322))

### Bug fixes
* Add item barcode right truncation search index ([MODINVSTOR-1292](https://folio-org.atlassian.net/browse/MODINVSTOR-1292))
Expand Down
2 changes: 1 addition & 1 deletion docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ [email protected]
PGADMIN_DEFAULT_PASSWORD=admin
PGADMIN_PORT=5050
KAFKA_HOST=kafka
KAFKA_PORT=9092
KAFKA_PORT=9093
REPLICATION_FACTOR=1
ENV=folio
DEBUG_PORT=5005
62 changes: 27 additions & 35 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: "3.9"
services:
postgres:
container_name: postgres_mod-inventory-storage
image: postgres:12-alpine
image: postgres:16-alpine
ports:
- "5432:5432"
volumes:
Expand All @@ -18,7 +18,7 @@ services:

pgadmin:
container_name: pgadmin_mod-inventory-storage
image: dpage/pgadmin4:6.7
image: dpage/pgadmin4:8.13
ports:
- ${PGADMIN_PORT}:80
volumes:
Expand All @@ -30,50 +30,42 @@ services:
networks:
- mod-inventory-storage-local

zookeeper:
container_name: zookeeper_mod-inventory-storage
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
kafka:
container_name: kafka_mod-search
image: apache/kafka-native
networks:
- mod-inventory-storage-local

kafka:
container_name: kafka_mod-inventory-storage
image: wurstmeister/kafka:2.13-2.8.1
ports:
- "9092:9092"
- "29092:29092"
- "9093:9093"
environment:
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://:9092,LOCAL://localhost:29092"
KAFKA_BROKER_ID: "1"
KAFKA_LOG_RETENTION_HOURS: "-1"
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENERS: "INTERNAL://:9092,LOCAL://:29092"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_LOG_RETENTION_BYTES: "-1"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "LOCAL:PLAINTEXT,INTERNAL:PLAINTEXT"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
depends_on:
- zookeeper
networks:
- mod-inventory-storage-local
# Configure listeners for both docker and host communication
KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT
# Settings required for KRaft mode
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091
# Listener to use for broker-to-broker communication
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
# Required for a single node cluster
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

kafka-ui:
container_name: kafka-ui_mod-inventory-storage
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui_mod-search
image: ghcr.io/kafbat/kafka-ui:latest
networks:
- mod-inventory-storage-local
ports:
- "8080:8080"
depends_on:
- zookeeper
- kafka
environment:
DYNAMIC_CONFIG_ENABLED: 'true'
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
KAFKA_CLUSTERS_0_JMXPORT: 9997
networks:
- mod-inventory-storage-local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
depends_on:
- kafka

mod-inventory-storage:
container_name: mod-inventory-storage
Expand Down
28 changes: 24 additions & 4 deletions src/main/java/org/folio/services/domainevent/DomainEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.UUID;
import org.apache.commons.lang3.builder.ToStringBuilder;

@JsonInclude(JsonInclude.Include.NON_NULL)
public class DomainEvent<T> {

private UUID eventId;
private Long eventTs;
@JsonProperty("old")
private T oldEntity;
@JsonProperty("new")
Expand All @@ -24,6 +28,8 @@ public class DomainEvent<T> {
@JsonCreator
public DomainEvent(@JsonProperty("old") T oldEntity, @JsonProperty("new") T newEntity,
@JsonProperty("type") DomainEventType type, @JsonProperty("tenant") String tenant) {
this.eventId = UUID.randomUUID();
this.eventTs = System.currentTimeMillis();
this.oldEntity = oldEntity;
this.newEntity = newEntity;
this.type = type;
Expand All @@ -50,14 +56,26 @@ public static <T> DomainEvent<T> reindexEvent(String tenant) {
return new DomainEvent<>(null, null, REINDEX, tenant);
}

public static <T> DomainEvent<T> reindexEvent(String tenant, T newEntity) {
return new DomainEvent<>(null, newEntity, REINDEX, tenant);
}

public static <T> DomainEvent<T> asyncMigrationEvent(T job, String tenant) {
return new DomainEvent<>(null, job, MIGRATION, tenant);
}

public UUID getEventId() {
return eventId;
}

public void setEventId(UUID eventId) {
this.eventId = eventId;
}

public Long getEventTs() {
return eventTs;
}

public void setEventTs(Long eventTs) {
this.eventTs = eventTs;
}

public T getOldEntity() {
return oldEntity;
}
Expand Down Expand Up @@ -93,6 +111,8 @@ public void setTenant(String tenant) {
@Override
public String toString() {
return new ToStringBuilder(this)
.append("eventId", eventId)
.append("eventTs", eventTs)
.append("oldEntity", oldEntity)
.append("newEntity", newEntity)
.append("type", type)
Expand Down
93 changes: 93 additions & 0 deletions src/test/java/org/folio/services/domainevent/DomainEventTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.folio.services.domainevent;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import org.junit.jupiter.api.Test;

class DomainEventTest {

@Test
void testCreateEvent() {
String newEntity = "newEntity";
String tenant = "tenant";
DomainEvent<String> event = DomainEvent.createEvent(newEntity, tenant);

assertNotNull(event.getEventId());
assertNotNull(event.getEventTs());
assertEquals(newEntity, event.getNewEntity());
assertNull(event.getOldEntity());
assertEquals(DomainEventType.CREATE, event.getType());
assertEquals(tenant, event.getTenant());
}

@Test
void testUpdateEvent() {
String oldEntity = "oldEntity";
String newEntity = "newEntity";
String tenant = "tenant";
DomainEvent<String> event = DomainEvent.updateEvent(oldEntity, newEntity, tenant);

assertNotNull(event.getEventId());
assertNotNull(event.getEventTs());
assertEquals(newEntity, event.getNewEntity());
assertEquals(oldEntity, event.getOldEntity());
assertEquals(DomainEventType.UPDATE, event.getType());
assertEquals(tenant, event.getTenant());
}

@Test
void testDeleteEvent() {
String oldEntity = "oldEntity";
String tenant = "tenant";
DomainEvent<String> event = DomainEvent.deleteEvent(oldEntity, tenant);

assertNotNull(event.getEventId());
assertNotNull(event.getEventTs());
assertNull(event.getNewEntity());
assertEquals(oldEntity, event.getOldEntity());
assertEquals(DomainEventType.DELETE, event.getType());
assertEquals(tenant, event.getTenant());
}

@Test
void testDeleteAllEvent() {
String tenant = "tenant";
DomainEvent<String> event = DomainEvent.deleteAllEvent(tenant);

assertNotNull(event.getEventId());
assertNotNull(event.getEventTs());
assertNull(event.getNewEntity());
assertNull(event.getOldEntity());
assertEquals(DomainEventType.DELETE_ALL, event.getType());
assertEquals(tenant, event.getTenant());
}

@Test
void testReindexEvent() {
String tenant = "tenant";
DomainEvent<String> event = DomainEvent.reindexEvent(tenant);

assertNotNull(event.getEventId());
assertNotNull(event.getEventTs());
assertNull(event.getNewEntity());
assertNull(event.getOldEntity());
assertEquals(DomainEventType.REINDEX, event.getType());
assertEquals(tenant, event.getTenant());
}

@Test
void testAsyncMigrationEvent() {
String job = "job";
String tenant = "tenant";
DomainEvent<String> event = DomainEvent.asyncMigrationEvent(job, tenant);

assertNotNull(event.getEventId());
assertNotNull(event.getEventTs());
assertEquals(job, event.getNewEntity());
assertNull(event.getOldEntity());
assertEquals(DomainEventType.MIGRATION, event.getType());
assertEquals(tenant, event.getTenant());
}
}

0 comments on commit 05395b3

Please sign in to comment.