From 96cdbd5b542c86ef490358ee55183978ee3f3601 Mon Sep 17 00:00:00 2001 From: Aleksandar Stanchev Date: Mon, 15 Jan 2024 13:44:59 +0200 Subject: [PATCH 1/2] fix _policy enrichment on ThingCreated event Signed-off-by: Aleksandar Stanchev --- .../signalenrichment/DittoCachingSignalEnrichmentFacade.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java index 1f8addd6ba..8eb226661e 100644 --- a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java +++ b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java @@ -219,8 +219,9 @@ private CompletableFuture smartUpdateCachedObject(final SignalEnrich final var nextExpectedThingEventsParameters = new CachingParameters(fieldSelector, thingEvents, invalidateCacheOnPolicyChange, cachingParameters.minAcceptableSeqNr); - result = handleNextExpectedThingEvents(cacheKey, JsonObject.empty(), nextExpectedThingEventsParameters) - .toCompletableFuture(); + result = doCacheLookup(cacheKey, dittoHeaders).thenCompose( + cachedJsonObject -> handleNextExpectedThingEvents(cacheKey, cachedJsonObject, + nextExpectedThingEventsParameters)); } else { // there are twin events; perform smart update result = doCacheLookup(cacheKey, dittoHeaders).thenCompose( From 242aa1d796c528ede3b7ca5f0d240d8536116240 Mon Sep 17 00:00:00 2001 From: Aleksandar Stanchev Date: Mon, 15 Jan 2024 14:45:02 +0200 Subject: [PATCH 2/2] update EnforcementFlowTest tests to supports updated cache retrieval for _policy enrichment Signed-off-by: Aleksandar Stanchev --- .../persistence/write/streaming/EnforcementFlowTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java index 4d1c068c42..319dbac615 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java @@ -253,6 +253,8 @@ public void computeThingCacheValueFromThingEvents() { sourceProbe.sendNext(inputMap); sourceProbe.sendComplete(); + thingsProbe.expectMsgClass(Duration.apply(30, TimeUnit.SECONDS), SudoRetrieveThing.class); + thingsProbe.reply(SudoRetrieveThingResponse.of(JsonObject.empty(), DittoHeaders.empty())); // WHEN: policy is retrieved with up-to-date revisions policiesProbe.expectMsgClass(Duration.apply(30, TimeUnit.SECONDS), SudoRetrievePolicy.class); final var policy = Policy.newBuilder(policyId).setRevision(1).build(); @@ -319,6 +321,10 @@ public void computeThingCacheValueFromThingEventsWhenLastEventWasDeleted() { sourceProbe.sendNext(inputMap); sourceProbe.sendComplete(); + // WHEN + thingsProbe.expectMsgClass(Duration.apply(30, TimeUnit.SECONDS), SudoRetrieveThing.class); + thingsProbe.reply(SudoRetrieveThingResponse.of(JsonObject.empty(), DittoHeaders.empty())); + // THEN: the write model contains up-to-date revisions final AbstractWriteModel deleteModel = sinkProbe.expectNext().get(0); sinkProbe.expectComplete();