Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ackSource fails when acknowledging to IBM MQ #2865

Closed
haroldpirum opened this issue May 16, 2022 · 4 comments
Closed

ackSource fails when acknowledging to IBM MQ #2865

haroldpirum opened this issue May 16, 2022 · 4 comments
Labels

Comments

@haroldpirum
Copy link
Contributor

haroldpirum commented May 16, 2022

I'm trying to connect to an ibm mq source using ack mode and am seeing the following error when attempting to batch up the messages. Is this a known issue?

[ERROR] [05/16/2022 11:36:42.681] [AckSourceApp-akka.actor.default-dispatcher-4] [JmsAckSourceStage$JmsAckSourceStageLogic(akka://AckSourceApp)] Error closing jms session
com.ibm.msg.client.jms.DetailedIllegalStateException: JMSCC0033: A synchronous method call is not permitted when a session is being used asynchronously: 'acknowledge'.
The JMS specification does not permit the use of a session for synchronous methods when asynchronous message delivery is running.
Create a separate session if you wish to use both synchronous methods and asynchronous delivery simultaneously.
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:78)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at com.ibm.msg.client.commonservices.j2se.NLSServices.createException(NLSServices.java:319)
	at com.ibm.msg.client.commonservices.nls.NLSServices.createException(NLSServices.java:226)
	at com.ibm.msg.client.jms.internal.JmsErrorUtils.createException(JmsErrorUtils.java:126)
	at com.ibm.msg.client.jms.internal.JmsSessionImpl.checkSynchronousUsage(JmsSessionImpl.java:2923)
	at com.ibm.msg.client.jms.internal.JmsMessageImpl.acknowledge(JmsMessageImpl.java:2170)
	at com.ibm.jms.JMSMessage.acknowledge(JMSMessage.java:478)
	at akka.stream.alpakka.jms.impl.JmsAckSession.$anonfun$ack$1(Sessions.scala:92)
	at akka.stream.alpakka.jms.impl.JmsAckSession.drainAcks(Sessions.scala:123)
	at akka.stream.alpakka.jms.impl.JmsAckSession.stopMessageListenerAndCloseSession(Sessions.scala:100)
	at akka.stream.alpakka.jms.impl.JmsAckSession.closeSession(Sessions.scala:94
[mqsample.tar.gz](https://github.com/akka/alpakka/files/8699391/mqsample.tar.gz)
)
	at akka.stream.alpakka.jms.impl.JmsConnector.closeSession(JmsConnector.scala:290)
	at akka.stream.alpakka.jms.impl.JmsConnector.$anonfun$closeSessionsAsync$2(JmsConnector.scala:280)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:678)
	at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:295)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
com.ibm.msg.client.jms.DetailedIllegalStateException: JMSCC0033: A synchronous method call is not permitted when a session is being used asynchronously: 'acknowledge'.
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:78)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at com.ibm.msg.client.commonservices.j2se.NLSServices.createException(NLSServices.java:319)
	at com.ibm.msg.client.commonservices.nls.NLSServices.createException(NLSServices.java:226)
	at com.ibm.msg.client.jms.internal.JmsErrorUtils.createException(JmsErrorUtils.java:126)
	at com.ibm.msg.client.jms.internal.JmsSessionImpl.checkSynchronousUsage(JmsSessionImpl.java:2923)
	at com.ibm.msg.client.jms.internal.JmsMessageImpl.acknowledge(JmsMessageImpl.java:2170)
	at com.ibm.jms.JMSMessage.acknowledge(JMSMessage.java:478)
	at akka.stream.alpakka.jms.impl.JmsAckSession.$anonfun$ack$1(Sessions.scala:92)
	at akka.stream.alpakka.jms.impl.JmsAckSession.drainAcks(Sessions.scala:123)
	at akka.stream.alpakka.jms.impl.JmsConnector.onTimer(JmsConnector.scala:218)
	at akka.stream.alpakka.jms.impl.JmsConnector.onTimer$(JmsConnector.scala:216)
	at akka.stream.alpakka.jms.impl.SourceStageLogic.onTimer(SourceStageLogic.scala:40)
	at akka.stream.stage.TimerGraphStageLogic.onInternalTimer(GraphStage.scala:1665)
	at akka.stream.stage.TimerGraphStageLogic.$anonfun$getTimerAsyncCallback$1(GraphStage.scala:1654)
	at akka.stream.stage.TimerGraphStageLogic.$anonfun$getTimerAsyncCallback$1$adapted(GraphStage.scala:1654)
	at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:467)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:517)
@ennru ennru added the p:jms label May 20, 2022
@ennru
Copy link
Member

ennru commented May 28, 2022

This problem has not been reported before AFAIK. Did you find out how to avoid this in Alpakka?

@haroldpirum
Copy link
Contributor Author

I got around this by using transactional consumer and increasing parallelization by increasing sessions. Each session processes a single message at a time.

@leviramsey
Copy link
Contributor

This appears to be caused by an interaction between #2695 and IBM MQ.

@ennru
Copy link
Member

ennru commented Feb 29, 2024

Fixed with

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants