diff --git a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java index 1f8addd6ba..8eb226661e 100644 --- a/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java +++ b/internal/models/signalenrichment/src/main/java/org/eclipse/ditto/internal/models/signalenrichment/DittoCachingSignalEnrichmentFacade.java @@ -219,8 +219,9 @@ private CompletableFuture smartUpdateCachedObject(final SignalEnrich final var nextExpectedThingEventsParameters = new CachingParameters(fieldSelector, thingEvents, invalidateCacheOnPolicyChange, cachingParameters.minAcceptableSeqNr); - result = handleNextExpectedThingEvents(cacheKey, JsonObject.empty(), nextExpectedThingEventsParameters) - .toCompletableFuture(); + result = doCacheLookup(cacheKey, dittoHeaders).thenCompose( + cachedJsonObject -> handleNextExpectedThingEvents(cacheKey, cachedJsonObject, + nextExpectedThingEventsParameters)); } else { // there are twin events; perform smart update result = doCacheLookup(cacheKey, dittoHeaders).thenCompose( diff --git a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java index 4d1c068c42..319dbac615 100644 --- a/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java +++ b/thingsearch/service/src/test/java/org/eclipse/ditto/thingsearch/service/persistence/write/streaming/EnforcementFlowTest.java @@ -253,6 +253,8 @@ public void computeThingCacheValueFromThingEvents() { sourceProbe.sendNext(inputMap); sourceProbe.sendComplete(); + thingsProbe.expectMsgClass(Duration.apply(30, TimeUnit.SECONDS), SudoRetrieveThing.class); + thingsProbe.reply(SudoRetrieveThingResponse.of(JsonObject.empty(), DittoHeaders.empty())); // WHEN: policy is retrieved with up-to-date revisions policiesProbe.expectMsgClass(Duration.apply(30, TimeUnit.SECONDS), SudoRetrievePolicy.class); final var policy = Policy.newBuilder(policyId).setRevision(1).build(); @@ -319,6 +321,10 @@ public void computeThingCacheValueFromThingEventsWhenLastEventWasDeleted() { sourceProbe.sendNext(inputMap); sourceProbe.sendComplete(); + // WHEN + thingsProbe.expectMsgClass(Duration.apply(30, TimeUnit.SECONDS), SudoRetrieveThing.class); + thingsProbe.reply(SudoRetrieveThingResponse.of(JsonObject.empty(), DittoHeaders.empty())); + // THEN: the write model contains up-to-date revisions final AbstractWriteModel deleteModel = sinkProbe.expectNext().get(0); sinkProbe.expectComplete();