Skip to content

Commit

Permalink
feat(inventory-reindex-records): Add boundsWith into reindex instance…
Browse files Browse the repository at this point in the history
… event (#1068)

* feat(inventory-reindex-records): Add boundsWith into reindex instance event

- Rework database query
- Change object sent to kafka

Implements: MODINVSTOR-1249
  • Loading branch information
viacheslavkol authored Sep 4, 2024
1 parent c41f8c5 commit ed806c0
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 71 deletions.
29 changes: 29 additions & 0 deletions src/main/java/org/folio/persist/InstanceRepository.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package org.folio.persist;

import static org.folio.rest.impl.BoundWithPartApi.BOUND_WITH_TABLE;
import static org.folio.rest.impl.HoldingsStorageApi.HOLDINGS_RECORD_TABLE;
import static org.folio.rest.impl.ItemStorageApi.ITEM_TABLE;
import static org.folio.rest.persist.PgUtil.postgresClient;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.RowStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
Expand Down Expand Up @@ -110,4 +115,28 @@ public Future<Response> getInstanceSet(boolean instance, boolean holdingsRecords
return Future.failedFuture(e);
}
}

public Future<List<String>> getReindexInstances(String fromId, String toId, boolean notConsortiumCentralTenant) {
var sql = new StringBuilder("SELECT i.jsonb || jsonb_build_object('isBoundWith', EXISTS(SELECT 1 FROM ");
sql.append(postgresClientFuturized.getFullTableName(BOUND_WITH_TABLE));
sql.append(" as bw JOIN ");
sql.append(postgresClientFuturized.getFullTableName(ITEM_TABLE));
sql.append(" as it ON it.id = bw.itemid JOIN ");
sql.append(postgresClientFuturized.getFullTableName(HOLDINGS_RECORD_TABLE));
sql.append(" as hr ON hr.id = bw.holdingsrecordid WHERE hr.instanceId = i.id LIMIT 1)) FROM ");
sql.append(postgresClientFuturized.getFullTableName(INSTANCE_TABLE));
sql.append(" i WHERE i.id >= '").append(fromId).append("' AND i.id <= '").append(toId).append("'");
if (notConsortiumCentralTenant) {
sql.append(" AND i.jsonb->>'source' NOT LIKE 'CONSORTIUM-%'");
}
sql.append(";");

return postgresClient.select(sql.toString()).map(rows -> {
var resultList = new LinkedList<String>();
for (var row : rows) {
resultList.add(row.getJsonObject(0).encode());
}
return resultList;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.vertx.core.Future.succeededFuture;
import static org.apache.logging.log4j.LogManager.getLogger;
import static org.folio.InventoryKafkaTopic.INSTANCE;
import static org.folio.InventoryKafkaTopic.REINDEX_RECORDS;
import static org.folio.rest.tools.utils.TenantTool.tenantId;

import io.vertx.core.Context;
Expand All @@ -20,19 +21,23 @@

public class InstanceDomainEventPublisher extends AbstractDomainEventPublisher<Instance, Instance> {
private static final Logger log = getLogger(InstanceDomainEventPublisher.class);

private final CommonDomainEventPublisher<String> instanceReindexPublisher;

public InstanceDomainEventPublisher(Context context, Map<String, String> okapiHeaders) {
super(new InstanceRepository(context, okapiHeaders),
new CommonDomainEventPublisher<>(context, okapiHeaders,
INSTANCE.fullTopicName(tenantId(okapiHeaders))));
instanceReindexPublisher = new CommonDomainEventPublisher<>(context, okapiHeaders,
REINDEX_RECORDS.fullTopicName(tenantId(okapiHeaders)));
}

public Future<Void> publishReindexInstances(String key, List<Instance> instances) {
public Future<Void> publishReindexInstances(String key, List<String> instances) {
if (CollectionUtils.isEmpty(instances) || StringUtils.isBlank(key)) {
return succeededFuture();
}

return domainEventService.publishReindexRecords(key, PublishReindexRecords.RecordType.INSTANCE, instances);
return instanceReindexPublisher.publishReindexRecords(key, PublishReindexRecords.RecordType.INSTANCE, instances);
}

public Future<Void> publishInstancesCreated(List<Instance> instances) {
Expand Down
18 changes: 2 additions & 16 deletions src/main/java/org/folio/services/instance/InstanceService.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.folio.persist.InstanceRepository;
import org.folio.rest.jaxrs.model.Instance;
import org.folio.rest.jaxrs.resource.InstanceStorage;
import org.folio.rest.persist.Criteria.Criteria;
import org.folio.rest.persist.Criteria.Criterion;
import org.folio.rest.persist.PostgresClient;
import org.folio.rest.support.CqlQuery;
import org.folio.rest.support.HridManager;
Expand Down Expand Up @@ -203,25 +201,13 @@ public Future<Response> deleteInstances(String cql) {
}

public Future<Void> publishReindexInstanceRecords(String rangeId, String fromId, String toId) {
var criteriaFrom = new Criteria().setJSONB(false)
.addField("id").setOperation(">=").setVal(fromId);
var criteriaTo = new Criteria().setJSONB(false)
.addField("id").setOperation("<=").setVal(toId);
final Criterion criterion = new Criterion(criteriaFrom)
.addCriterion(criteriaTo);

return consortiumService.getConsortiumData(okapiHeaders)
.map(consortiumDataOptional -> consortiumDataOptional
.map(consortiumData -> isCentralTenantId(okapiHeaders.get(TENANT), consortiumData))
.orElse(false))
.compose(isCentralTenant -> {
if (Boolean.TRUE.equals(isCentralTenant)) {
return instanceRepository.get(criterion);
}
var nonConsortia = new Criteria()
.addField("'source'").setOperation("NOT LIKE").setVal("CONSORTIUM-");
criterion.addCriterion(nonConsortia);
return instanceRepository.get(criterion);
var notConsortiumCentralTenant = Boolean.FALSE.equals(isCentralTenant);
return instanceRepository.getReindexInstances(fromId, toId, notConsortiumCentralTenant);
})
.compose(instances -> domainEventPublisher.publishReindexInstances(rangeId, instances));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static org.folio.rest.support.http.InterfaceUrls.materialTypesStorageUrl;
import static org.folio.utility.ModuleUtility.getClient;
import static org.folio.utility.RestUtility.TENANT_ID;
import static org.folio.utility.RestUtility.USER_TENANTS_PATH;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

Expand Down Expand Up @@ -60,6 +59,7 @@ public abstract class TestBaseWithInventoryUtil extends TestBase {
public static final UUID SECOND_FLOOR_LOCATION_ID = UUID.randomUUID();
public static final UUID THIRD_FLOOR_LOCATION_ID = UUID.randomUUID();
public static final UUID FOURTH_FLOOR_LOCATION_ID = UUID.randomUUID();
public static final String USER_TENANTS_PATH = "/user-tenants?limit=1";
protected static final String PERMANENT_LOCATION_ID_KEY = "permanentLocationId";
protected static final String TEMPORARY_LOCATION_ID_KEY = "temporaryLocationId";
protected static final String EFFECTIVE_LOCATION_ID_KEY = "effectiveLocationId";
Expand Down Expand Up @@ -302,7 +302,7 @@ protected static JsonObject contributor(UUID contributorNameTypeId, String name)
.put("name", name);
}

protected static JsonObject createInstanceRequest(
public static JsonObject createInstanceRequest(
UUID id,
String source,
String title,
Expand All @@ -321,7 +321,9 @@ protected static JsonObject createInstanceRequest(
instanceToCreate.put("source", source);
instanceToCreate.put("identifiers", identifiers);
instanceToCreate.put("contributors", contributors);
instanceToCreate.put("instanceTypeId", instanceTypeId.toString());
if (instanceTypeId != null) {
instanceToCreate.put("instanceTypeId", instanceTypeId.toString());
}
instanceToCreate.put("tags", new JsonObject().put("tagList", tags));
instanceToCreate.put("_version", 1);
return instanceToCreate;
Expand Down
18 changes: 9 additions & 9 deletions src/test/java/org/folio/rest/impl/BaseIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import static javax.ws.rs.core.MediaType.TEXT_PLAIN;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.folio.postgres.testing.PostgresTesterContainer.getImageName;
import static org.folio.rest.api.TestBaseWithInventoryUtil.USER_TENANTS_PATH;
import static org.folio.utility.RestUtility.TENANT_ID;
import static org.folio.utility.RestUtility.USER_TENANTS_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -137,6 +137,14 @@ protected static Future<TestResponse> doRequest(HttpClient client, HttpMethod me
});
}

public static void mockUserTenantsForNonConsortiumMember() {
var emptyUserTenantsCollection = new JsonObject()
.put("userTenants", JsonArray.of());
wm.stubFor(WireMock.get(USER_TENANTS_PATH)
.withHeader(XOkapiHeaders.TENANT, equalToIgnoreCase(TENANT_ID))
.willReturn(WireMock.ok().withBody(emptyUserTenantsCollection.encodePrettily())));
}

protected static Handler<AsyncResult<TestResponse>> verifyStatus(VertxTestContext ctx, HttpStatus expectedStatus) {
return ctx.succeeding(response -> ctx.verify(() -> assertEquals(expectedStatus.toInt(), response.status())));
}
Expand Down Expand Up @@ -182,14 +190,6 @@ public static JsonObject pojo2JsonObject(Object entity) {
return TestBase.pojo2JsonObject(entity);
}

private static void mockUserTenantsForNonConsortiumMember() {
JsonObject emptyUserTenantsCollection = new JsonObject()
.put("userTenants", JsonArray.of());
wm.stubFor(WireMock.get(USER_TENANTS_PATH)
.withHeader(XOkapiHeaders.TENANT, equalToIgnoreCase(TENANT_ID))
.willReturn(WireMock.ok().withBody(emptyUserTenantsCollection.encodePrettily())));
}

private static Future<TestResponse> enableTenant(String tenant, VertxTestContext ctx, HttpClient client) {
return doPost(client, "/_/tenant", tenant, BaseIntegrationTest.getJob(false))
.map(buffer -> buffer.jsonBody().getString("id"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
package org.folio.rest.impl;

import static org.folio.HttpStatus.HTTP_CREATED;
import static org.folio.persist.InstanceRepository.INSTANCE_TABLE;
import static org.folio.rest.api.TestBaseWithInventoryUtil.createInstanceRequest;
import static org.folio.rest.impl.BoundWithPartApi.BOUND_WITH_TABLE;
import static org.folio.rest.support.AwaitConfiguration.awaitAtMost;
import static org.folio.utility.ModuleUtility.vertxUrl;
import static org.folio.utility.RestUtility.TENANT_ID;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.junit5.VertxTestContext;
import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.assertj.core.api.Assertions;
import org.folio.rest.jaxrs.model.Holding;
import org.folio.rest.jaxrs.model.Item;
import org.folio.rest.jaxrs.model.PublishReindexRecords;
import org.folio.rest.jaxrs.model.RecordIdsRange;
import org.folio.rest.persist.PostgresClient;
import org.folio.rest.support.builders.ItemRequestBuilder;
import org.folio.rest.support.messages.matchers.EventMessageMatchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -44,30 +51,96 @@ void post_shouldReturn201_whenPublishingRecordsForReindex(String table,
Vertx vertx,
VertxTestContext ctx) {
var postgresClient = PostgresClient.getInstance(vertx, TENANT_ID);
Future.all(
records.stream()
.map(record -> postgresClient.save(table, record))
.toList()
)
.onFailure(ctx::failNow)
.onSuccess(id -> ctx.completeNow());

HttpClient client = vertx.createHttpClient();
var client = vertx.createHttpClient();
var rangeId = UUID.randomUUID().toString();
var publishRequestBody = new PublishReindexRecords()
.withId(rangeId)
.withRecordType(recordType)
.withRecordIdsRange(
new RecordIdsRange().withFrom(RECORD1_ID).withTo(RECORD2_ID));
postgresClient.save(table, RECORD1_ID, records.get(0))
.compose(r -> postgresClient.save(table, RECORD2_ID, records.get(1)))
.compose(r -> doPost(client, "/inventory-reindex-records/publish", pojo2JsonObject(publishRequestBody)))
.onComplete(ctx.succeeding(response -> ctx.verify(() -> assertEquals(HTTP_CREATED.toInt(), response.status()))))
.onComplete(ctx.succeeding(response -> ctx.completeNow()));

awaitAtMost().until(() -> KAFKA_CONSUMER.getMessagesForReindexRecord(rangeId),
hasSize(1));
assertThat(KAFKA_CONSUMER.getMessagesForReindexRecord(rangeId),
EVENT_MESSAGE_MATCHERS.hasReindexEventMessageFor());
}

@Test
@SneakyThrows
void post_shouldReturn201_whenPublishingInstancesForReindex(Vertx vertx,
VertxTestContext ctx) {
//prepare data for instances, holdings, items and boundWith
mockUserTenantsForNonConsortiumMember();
var client = vertx.createHttpClient();
var mainInstanceId = UUID.randomUUID();
var holdingsId = UUID.randomUUID();
var itemId = UUID.randomUUID();
var anotherInstanceId = UUID.randomUUID();
var consortiumInstanceId = UUID.randomUUID();

var mainInstance = createInstanceRequest(mainInstanceId, "TEST", "mm", new JsonArray(), new JsonArray(),
null, new JsonArray());
var holding = new JsonObject()
.put("id", holdingsId)
.put("instanceId", mainInstanceId);
var item = new ItemRequestBuilder()
.withId(itemId)
.forHolding(holdingsId)
.create();
var boundWith = new JsonObject().put("itemId", itemId).put("holdingsRecordId", holdingsId);
var anotherInstance = createInstanceRequest(anotherInstanceId, "TEST", "am", new JsonArray(), new JsonArray(),
null, new JsonArray());
var consortiumInstance = createInstanceRequest(consortiumInstanceId, "CONSORTIUM-TEST", "cm", new JsonArray(),
new JsonArray(), null, new JsonArray());

//prepare request
var rangeId = UUID.randomUUID().toString();
//sort created instance ids to have all three falling into request range
var instanceIds = Stream.of(mainInstanceId, consortiumInstanceId, anotherInstanceId)
.map(UUID::toString).sorted().toList();
var publishRequestBody = new PublishReindexRecords()
.withId(rangeId)
.withRecordType(PublishReindexRecords.RecordType.INSTANCE)
.withRecordIdsRange(
new RecordIdsRange().withFrom(instanceIds.get(0)).withTo(instanceIds.get(2)));

doPost(client, "/inventory-reindex-records/publish", pojo2JsonObject(publishRequestBody))
.onComplete(verifyStatus(ctx, HTTP_CREATED))
.onComplete(ctx.succeeding(response -> ctx.verify(() -> {
var jsonItem = new JsonObject();
jsonItem.put("id", rangeId);
reindexMessagePublished(jsonItem);
})))
var postgresClient = PostgresClient.getInstance(vertx, TENANT_ID);
//save entities sequentially
postgresClient.save(INSTANCE_TABLE, mainInstanceId.toString(), mainInstance)
//holdings, item, boundWith entities are all needed to fill the isBoundWith flag in kafka event
.compose(r -> postgresClient.save(HOLDING_TABLE, holdingsId.toString(), holding))
.compose(r -> postgresClient.save(ITEM_TABLE, itemId.toString(), item))
.compose(r -> postgresClient.save(INSTANCE_TABLE, anotherInstanceId.toString(), anotherInstance))
//create instance with consortium source to verify it's ignored
.compose(r -> postgresClient.save(INSTANCE_TABLE, consortiumInstanceId.toString(), consortiumInstance))
.compose(r -> postgresClient.save(BOUND_WITH_TABLE, boundWith))
//trigger records publishing
.compose(r -> doPost(client, "/inventory-reindex-records/publish", pojo2JsonObject(publishRequestBody)))
.onComplete(ctx.succeeding(response -> ctx.verify(() -> assertEquals(HTTP_CREATED.toInt(), response.status()))))
.onComplete(ctx.succeeding(response -> ctx.completeNow()));

//verify kafka event
awaitAtMost().until(() -> KAFKA_CONSUMER.getMessagesForReindexRecord(rangeId),
hasSize(1));

mainInstance.put("isBoundWith", true);
anotherInstance.put("isBoundWith", false);

var kafkaMessages = KAFKA_CONSUMER.getMessagesForReindexRecord(rangeId);

assertThat(kafkaMessages,
EVENT_MESSAGE_MATCHERS.hasReindexEventMessageFor());
var event = kafkaMessages.stream().toList().get(0).getBody();
var records = event.getJsonArray("records").stream()
.map(o -> new JsonObject((String) o))
.toList();
Assertions.assertThat(event.getString("recordType")).isEqualTo(PublishReindexRecords.RecordType.INSTANCE.value());
Assertions.assertThat(records).contains(mainInstance, anotherInstance);
}

private static Stream<Arguments> reindexTypesProvider() {
Expand All @@ -82,13 +155,4 @@ private static Stream<Arguments> reindexTypesProvider() {
List.of(new Holding().withId(RECORD1_ID), new Holding().withId(RECORD2_ID)))
);
}

private void reindexMessagePublished(JsonObject record) {
var id = record.getString("id");
awaitAtMost().until(() -> KAFKA_CONSUMER.getMessagesForReindexRecords(List.of(id)),
hasSize(1));

assertThat(KAFKA_CONSUMER.getMessagesForReindexRecords(List.of(id)),
EVENT_MESSAGE_MATCHERS.hasReindexEventMessageFor(record));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class EventMessage {
String tenant;
JsonObject newRepresentation;
JsonObject oldRepresentation;
JsonObject body;
Map<String, String> headers;

public static EventMessage fromConsumerRecord(
Expand All @@ -25,6 +26,7 @@ public static EventMessage fromConsumerRecord(
value.getString("tenant"),
value.getJsonObject("new"),
value.getJsonObject("old"),
value,
kafkaHeadersToMap(consumerRecord.headers()));
}
}
Loading

0 comments on commit ed806c0

Please sign in to comment.