From c0d060f864f7bc7797a04d5c0776b9a0f0c37503 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Tue, 22 Jun 2021 13:37:38 -0700 Subject: [PATCH] [ServiceBus] Cache UpdateDispositionWorkItem Mono (#22317) --- .../servicebus/implementation/ServiceBusReactorReceiver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java index ca77bbf32802c..c9a29d5e15bc9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java @@ -239,7 +239,7 @@ private Mono updateDispositionInternal(String lockToken, DeliveryState del } final UpdateDispositionWorkItem workItem = new UpdateDispositionWorkItem(lockToken, deliveryState, timeout); - final Mono result = Mono.create(sink -> { + final Mono result = Mono.create(sink -> { workItem.start(sink); try { provider.getReactorDispatcher().invoke(() -> { @@ -250,7 +250,7 @@ private Mono updateDispositionInternal(String lockToken, DeliveryState del sink.error(new AmqpException(false, "updateDisposition failed while dispatching to Reactor.", error, handler.getErrorContext(receiver))); } - }); + }).cache(); // cache because closeAsync use `when` to subscribe this Mono again. workItem.setMono(result);