
[BUG] ASB consumer stops with "ReactorDispatcher instance is closed", only recovers after restart #25085

Closed
p4p4 opened this issue Oct 28, 2021 · 15 comments

Comments

@p4p4

p4p4 commented Oct 28, 2021

Describe the bug
The behaviour looks quite similar to this issue, although we are using recent library versions:
Azure/azure-service-bus-java#335

Multiple independent applications reading from different topics/queues stopped consuming messages from the Service Bus at roughly the same time (see the increase in active messages).
[screenshot: active message counts increasing across the affected applications]

We recently upgraded to 'azure-messaging-servicebus' version 7.4.1 and did not see this behaviour before, but shortly after upgrading it happened with two different Service Bus namespaces.

Exception or Stack Trace

The logs of "orderdataservice" showed the following errors:

ReactorDispatcher instance is closed.
...
Cannot add credits to closed link: dgl-s1-order-topic/subscriptions/dgl-s1-orderdataservice_e0f4d2_1635333545378
...
Operator called default onErrorDropped 
thrown_cause_extendedStackTrace
java.lang.IllegalStateException: Cannot add credits to closed link: dgl-s1-order-topic/subscriptions/dgl-s1-orderdataservice_e0f4d2_1635333545378
	at com.azure.core.amqp.implementation.ReactorReceiver.addCredits(ReactorReceiver.java:176) ~[azure-core-amqp-2.3.2.jar:2.3.2]
	at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.checkAndAddCredits(ServiceBusReceiveLinkProcessor.java:537) ~[azure-messaging-servicebus-7.4.1.jar:7.4.1]
	at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.onNext(ServiceBusReceiveLinkProcessor.java:242) ~[azure-messaging-servicebus-7.4.1.jar:7.4.1]
	at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.onNext(ServiceBusReceiveLinkProcessor.java:43) ~[azure-messaging-servicebus-7.4.1.jar:7.4.1]
	at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.onNext(FluxRepeatPredicate.java:86) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.Operators$MonoSubscriber.request(Operators.java:1906) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.SerializedSubscriber.request(SerializedSubscriber.java:151) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.9.jar:3.4.9]
	at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.requestUpstream(ServiceBusReceiveLinkProcessor.java:413) ~[azure-messaging-servicebus-7.4.1.jar:7.4.1]
	at com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor.lambda$onNext$5(ServiceBusReceiveLinkProcessor.java:237) ~[azure-messaging-servicebus-7.4.1.jar:7.4.1]
	at reactor.core.publisher.LambdaSubscriber.onComplete(LambdaSubscriber.java:132) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.onComplete(FluxSubscribeOn.java:166) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxDistinct$DistinctFuseableSubscriber.onComplete(FluxDistinct.java:501) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:805) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:898) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxReplay$ReplaySubscriber.onComplete(FluxReplay.java:1273) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxDistinctUntilChanged$DistinctUntilChangedSubscriber.onComplete(FluxDistinctUntilChanged.java:173) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replayNormal(FluxReplay.java:805) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.FluxReplay$SizeBoundReplayBuffer.replay(FluxReplay.java:898) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.ReplayProcessor.tryEmitComplete(ReplayProcessor.java:465) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.SinkManySerialized.tryEmitComplete(SinkManySerialized.java:64) ~[reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.publisher.InternalManySink.emitComplete(InternalManySink.java:68) ~[reactor-core-3.4.9.jar:3.4.9]
	at com.azure.core.amqp.implementation.handler.Handler.close(Handler.java:132) ~[azure-core-amqp-2.3.2.jar:2.3.2]
	at com.azure.core.amqp.implementation.handler.LinkHandler.handleRemoteLinkClosed(LinkHandler.java:115) ~[azure-core-amqp-2.3.2.jar:2.3.2]
	at com.azure.core.amqp.implementation.handler.LinkHandler.onLinkRemoteClose(LinkHandler.java:61) ~[azure-core-amqp-2.3.2.jar:2.3.2]
	at com.azure.core.amqp.implementation.handler.ReceiveLinkHandler.onLinkRemoteClose(ReceiveLinkHandler.java:193) ~[azure-core-amqp-2.3.2.jar:2.3.2]
	at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:176) ~[proton-j-0.33.8.jar:?]
	at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) ~[proton-j-0.33.8.jar:?]
	at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324) ~[proton-j-0.33.8.jar:?]
	at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291) ~[proton-j-0.33.8.jar:?]
	at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:86) ~[azure-core-amqp-2.3.2.jar:2.3.2]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) [reactor-core-3.4.9.jar:3.4.9]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) [reactor-core-3.4.9.jar:3.4.9]
	at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]

The logs of "userdataservice" look similar:

ReactorDispatcher instance is closed.
...
Cannot add credits to closed link: dgl-s1-cdc-webhook-queue_a9432f_1634858677706
...
lockToken[eeac4396-6bb3-4e3e-b376-bcff2ecd15fc]. state[Accepted{}]. Cannot update disposition with no link.
...
Operator called default onErrorDropped
...
ReactorSender connectionId[MF_28cd08_1635368968533] linkName[dgl-s1-cdc-webhook-queuedgl-s1-cdc-webhook-queue]: Waiting for send and receive handler to be ACTIVE
Retries exhausted: 3/3

To Reproduce

  1. Run the consumer application.
  2. Wait for a (possibly temporary) network issue.
  3. The consumers do not recover automatically.
  4. Restart the consumers -> everything works again.

Code Snippet
The consumer is created as a Spring bean as follows and is not closed anywhere:

    @Bean
    public static ServiceBusProcessorClient serviceBusAfsQueueConsumer(
            @Value("${servicebus.afsQueue.connectionString}") String connectionString,
            @Value("${servicebus.afsQueue.name}") String afsQueueName,
            @Value("${servicebus.afsQueue.maxConcurrentThreads:#{1}}") int maxConcurrentThreads,
            Consumer<ServiceBusReceivedMessageContext> afsQueueAsbConsumer,
            Consumer<ServiceBusErrorContext> asbErrorLoggingConsumer) {

        ServiceBusProcessorClient client = new ServiceBusClientBuilder()
                .connectionString(connectionString)
                .processor()
                .disableAutoComplete()
                .queueName(afsQueueName)
                .maxConcurrentCalls(maxConcurrentThreads)
                .processMessage(afsQueueAsbConsumer)
                .processError(asbErrorLoggingConsumer)
                .buildProcessorClient();
        client.start();
        return client;
    }
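
For reference, the two injected consumers are ordinary beans along these lines (a simplified sketch, not our exact production code; handle(...) and log stand in for the real business logic and logger):

    // uses java.util.function.Consumer and classes from com.azure.messaging.servicebus
    @Bean
    public Consumer<ServiceBusReceivedMessageContext> afsQueueAsbConsumer() {
        return context -> {
            ServiceBusReceivedMessage message = context.getMessage();
            // business logic for the received payload (placeholder)
            handle(message.getBody().toString());
            // auto-complete is disabled on the processor, so settle the message explicitly
            context.complete();
        };
    }

    @Bean
    public Consumer<ServiceBusErrorContext> asbErrorLoggingConsumer() {
        // logs every error surfaced by the processor (entity path plus exception)
        return errorContext -> log.error("ASB error on {}",
                errorContext.getEntityPath(), errorContext.getException());
    }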

Expected behavior
Consumers recover by themselves after network issues and do not get closed.
(Note that our code never explicitly closes the processor clients.)

Setup (please complete the following information):

  • Java 11
  • 'com.azure', name: 'azure-core', version: '1.20.0'
  • 'com.azure', name: 'azure-messaging-servicebus', version: '7.4.1'
  • running in docker in kubernetes cluster on Azure cloud (AKS)

Additional context

  • affected servicebus namespace names: epp-staging, epp-prod
  • date of incidents: Oct 27/28 2021

Information Checklist
Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added

Please let me know if I can provide more information to you.

@ghost ghost added the needs-triage, customer-reported, and question labels Oct 28, 2021
@p4p4
Author

p4p4 commented Oct 29, 2021

The issue happened again for two applications. One thing I noticed about all the problems since the library upgrade from 7.2.0 to 7.4.1 is that only applications that connect to more than one Service Bus resource seem to be affected, but maybe it is a coincidence.

Affected applications:

  • Orderdataservice reads from 1 topic subscription and 1 queue
  • Arvatofinanceservice reads from 1 topic and writes to 1 queue
  • userdataservice writes to 1 queue, reads from 1 queue and 1 topic subscription

Two other services, which each read from just one ASB resource, have not been affected by this problem so far.

@joshfree joshfree added the Client and Service Bus labels Oct 29, 2021
@ghost ghost removed the needs-triage label Oct 29, 2021
@joshfree
Member

Thanks for reporting this @p4p4. @anuchandy can you please follow up?

/cc @ki1729

@p4p4
Author

p4p4 commented Nov 3, 2021

According to Azure Support (TrackingID#2110280050001032), there was an OS update of the Service Bus at the time of the incidents, which also led to server errors. Possibly this was the root cause of the problem, and the SDK client did not recover from it.

The support referred to

@anuchandy
Member

Hi Patrick, thank you for the extra context on the service upgrade. Sharing the SDK DEBUG-level logging during the time frame (around ~20 minutes before and after the incident) will help us understand the activity on the link and what could have led the receiver to stop.

@p4p4
Author

p4p4 commented Nov 4, 2021

Hi Anu,
The last received message was logged at 2021-11-01 21:44:03.970 (UTC+0).

Error Logs during that time:
[screenshot: error log entries around the time of the last received message]

All logs at level INFO and above:
graylog-search-result-absolute-2021-11-01T21_20_00.000Z-2021-11-01T22_05_00.000Z.csv

Unfortunately, DEBUG logging was not active for the com.azure.* packages, but maybe you'll find the INFO logs useful as well.
I can enable DEBUG logging now (roughly as sketched below) and provide logs when the problem appears again.
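
For reference, this is roughly how the DEBUG level can be raised at runtime — a sketch assuming SLF4J backed by logback-classic, with an illustrative class name (declaratively, a com.azure logger at DEBUG level in logback.xml, or logging.level.com.azure=DEBUG in Spring Boot, has the same effect):

    // sketch: programmatically raise all Azure SDK loggers to DEBUG (logback-classic assumed)
    import ch.qos.logback.classic.Level;
    import ch.qos.logback.classic.Logger;
    import org.slf4j.LoggerFactory;

    public final class AzureSdkDebugLogging {
        public static void enable() {
            // the cast is safe only when logback-classic is the active SLF4J binding
            Logger azureLogger = (Logger) LoggerFactory.getLogger("com.azure");
            azureLogger.setLevel(Level.DEBUG);
        }
    }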

@jiyongseong
Member

Hi. My customer is experiencing the same issue. Any update on this?

@anuchandy
Member

@p4p4 Unfortunately, there is not enough info in the INFO log to confirm why this happened. The only thing I could find is a graceful closure of the endpoint (without error); the only known (fixed) issue we could map it to is this one: https://github.com/Azure/azure-sdk-for-java/blob/azure-core-amqp_2.3.3/sdk/core/azure-core-amqp/CHANGELOG.md#bugs-fixed, which is addressed in SB 7.4.2, but you're on 7.4.1. Again, I'm unable to confirm this since the error pattern is visible only at DEBUG level.

@jiyongseong the "ReactorDispatcher instance is closed" message is misleading (I think we suppressed it in a recent version because it isn't actually the cause of the errors; I need to check). If you have any DEBUG logs, we can see what happened before this exception.

@jiyongseong
Member

@anuchandy I got the log file from the customer and attached it: gaspicmsapp.log.zip

@anuchandy
Member

@p4p4 multiple reliability issues have been fixed since 7.4.1, including fixes in the recovery route that the SDK goes through during service upgrades. Closing this ticket for now; please try out the latest 7.7.x, and if you still run into an issue, please reopen (with DEBUG logs).

@hargut

hargut commented Apr 26, 2022

Hi @anuchandy,

unfortunately, with version 7.7.0 we are still seeing the error containing the following information:

  {"az.sdk.message":"","exception":"lockToken[07c24088-a743-4a5b-bfd8-c62badd66349]. state[Accepted{}]. Cannot update disposition with no link.","lockToken":"07c24088-a743-4a5b-bfd8-c62badd66349","deliveryState":"Accepted{}"}

We have also opened a ticket and supplied the debug log details under TrackingID#2202140050000661, but it seems that the details did not reach the right team or got stuck somewhere...

Thank you for all your efforts and enhancements in the stability area.

Best regards,
Harald

@anuchandy
Member

Hi @hargut, is this an intermittent error that does not stop the receiver, or does it leave the receiver in a state where it no longer produces messages?

I think what is happening here is: the message processing took some time, and by the time the application is ready to complete, the original link that delivered the message no longer exists (e.g., due to a transient error), and the library has created / is creating a new link to continue receiving. It is not possible to complete a message on a link different from the one it was received on.

Here is a related comment #26761 (comment)

@anuchandy
Member

I double-checked with the Service Bus team. Currently, completing a message relies on the lock token; each AMQP link object tracks the tokens associated with the messages it produces. So by service design, once the AMQP link is closed (due to a transient error, timeout, etc.), those records no longer exist, and completion of the related messages is rejected. As usual, uncompleted messages will be redelivered (as long as delivery_count <= max_delivery_count).

The error message you saw above ("Cannot update disposition with no link") is a client-side error, raised because the client identified that the link no longer exists.

The general recommendation is that an application taking more than a couple of minutes to process a message should be designed to handle re-delivery of that message.
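
A minimal sketch of that pattern in a processMessage handler — alreadyProcessed, process, and LOGGER are placeholders for application logic, and the broad catch is used because the exact exception type surfaced by complete() can vary:

    // uses java.util.function.Consumer and classes from com.azure.messaging.servicebus
    Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
        ServiceBusReceivedMessage message = context.getMessage();
        // idempotency guard: skip work already done for a redelivered copy of the message
        if (!alreadyProcessed(message.getMessageId())) {
            process(message);
        }
        try {
            context.complete();
        } catch (Exception e) {
            // if the original link is gone ("Cannot update disposition with no link"),
            // completion fails here; the service will redeliver the message and the
            // guard above keeps the retry harmless
            LOGGER.warn("Could not settle message {}, expecting redelivery", message.getMessageId(), e);
        }
    };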

@hargut

hargut commented May 3, 2022

Hi @anuchandy,

thank you very much for your clarifications.

As the stack trace shows that this is related to the .complete() call on the ServiceBusReceiverAsyncClient, we will change the flow to continue on this specific error and allow for re-delivery of the message.

Have a great time.

Best regards,
Harald

@maksatbolatov

Hi @hargut

Is there any progress on this issue? We are facing the same problem.

@srikarasr

Hi @p4p4, if we define the @Bean with processMessage and processError arguments, how do we pass these arguments when autowiring the bean?

@github-actions github-actions bot locked and limited conversation to collaborators Apr 11, 2023