Error when retrieving message with SqsListener: Unable to convert to JSON #1288

Open
stormxa-i8c opened this issue Dec 2, 2024 · 4 comments
Labels: component: sqs (SQS integration related issue); status: waiting-for-feedback (Waiting for feedback from issuer)

Comments

@stormxa-i8c

stormxa-i8c commented Dec 2, 2024

Type: Bug

Component: SQS

Describe the bug
Spring Cloud AWS version: 3.2.0

I am testing a simple Spring Integration flow with SQS. The test creates mock SQS queues with LocalStack running in Docker, sends a message to the first queue, lets the flow forward the message to the second queue, and then picks it up from there. The error occurs when the test tries to retrieve the message from the second queue.

[2024-12-02 15:43:51.206] - 27528 ERROR [sdk-async-response-1-9] --- i.a.c.s.l.s.AbstractMessageConvertingMessageSource: Error converting message Message(MessageId=a180ab3b-2278-409c-a42c-0ef150fae893, ReceiptHandle=Y2NjM2U2MzktN2MzOS00YjQ2LWExZDAtY2FlNDdkZTJlNzNmIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6c3Rvcm14YS1yZXN1bHQtcXVldWUgYTE4MGFiM2ItMjI3OC00MDljLWE0MmMtMGVmMTUwZmFlODkzIDE3MzMxNTA2MzEuMTU2MjEyMw==, MD5OfBody=0f5fd36ebc7671562a7b91e47681cc8c, Body={"payload":"error: this is an error","headers":{"replyChannel":"nullChannel","errorChannel":"","id":"0f4fb9c9-c017-a0cb-89a1-61c9d9c6d4fd","timestamp":1733150631035}}, Attributes={ApproximateReceiveCount=1, SentTimestamp=1733150631155, SenderId=000000000000, ApproximateFirstReceiveTimestamp=1733150631156}, MD5OfMessageAttributes=3b4c2e873b0438e910219a49e49df6bc, MessageAttributes={JavaType=MessageAttributeValue(StringValue=org.springframework.messaging.support.GenericMessage, DataType=String), contentType=MessageAttributeValue(StringValue=application/json, DataType=String)}), ignoring.
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `org.springframework.messaging.support.GenericMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 2]
	at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:256)
	at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:183)
	at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:174)
	at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:57)
	at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.convertPayload(AbstractMessagingMessageConverter.java:182)
	at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.toMessagingMessage(AbstractMessagingMessageConverter.java:163)
	at io.awspring.cloud.sqs.listener.source.AbstractMessageConvertingMessageSource.convertMessage(AbstractMessageConvertingMessageSource.java:99)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
	at io.awspring.cloud.sqs.listener.source.AbstractMessageConvertingMessageSource.convertMessages(AbstractMessageConvertingMessageSource.java:90)
	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:58)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:177)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$execute$0(MakeAsyncHttpRequestStage.java:110)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:253)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:167)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.springframework.messaging.support.GenericMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 2]
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67)
	at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1887)
	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:414)
	at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1375)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1508)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:348)
	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:185)
	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:342)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4905)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3848)
	at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:251)
	... 45 common frames omitted
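
A note on reading the exception: the `InvalidDefinitionException` reports that no usable creator (such as a default constructor) was found for `GenericMessage`. As a rough, library-free illustration of that condition, the sketch below uses plain reflection to check for a no-argument constructor. The class names are hypothetical stand-ins, not Spring or Jackson types, and the check only approximates what a JSON mapper does.

```java
public class NoDefaultConstructorDemo {

    // Hypothetical stand-in for an immutable message type: its only
    // constructor takes arguments, so it has no no-arg constructor.
    static final class ImmutableEnvelope {
        final Object payload;
        final String id;

        ImmutableEnvelope(Object payload, String id) {
            this.payload = payload;
            this.id = id;
        }
    }

    // Hypothetical bean-style type that does declare a no-arg constructor.
    static final class MutableBean {
        String payload;

        MutableBean() { }
    }

    // Returns true if the class declares a no-argument constructor.
    static boolean hasDefaultConstructor(Class<?> type) {
        try {
            type.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasDefaultConstructor(ImmutableEnvelope.class)); // prints false
        System.out.println(hasDefaultConstructor(MutableBean.class));       // prints true
    }
}
```

A mapper asked to rebuild the first kind of type from a JSON object has nothing to call unless a creator is registered for it, which matches the wording of the exception above.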

Sample
The main application is as follows:

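import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.LoggingHandler;

@SpringBootApplication
public class SampleIntegrationApplication {

    // Gateway that feeds incoming strings into the integration flow.
    @MessagingGateway(name = "QueueGateway")
    public interface QueueGateway {
        @Gateway(requestChannel = "mainChannel")
        void sendToQueue(String message);
    }

    @Autowired
    QueueGateway gateway;

    @Autowired
    SqsTemplate template;

    // Forwards every message from the input queue into the flow.
    @SqsListener("input-queue")
    public void listenTestQueue(String message) {
        gateway.sendToQueue(message);
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SampleIntegrationApplication.class, args);
    }

    // Keeps only messages containing "error", logs them, and sends them to the output queue.
    @Bean
    public IntegrationFlow sqsFlow() {
        return IntegrationFlow
                .from("mainChannel")
                .filter((String message) -> message.toLowerCase().contains("error"))
                .log(LoggingHandler.Level.WARN, "routing", m -> m)
                // `m` is the full Message<?> here, so the Message itself is used as the SQS payload.
                .handle(m -> template.send(to -> to.queue("output-queue").payload(m)))
                .get();
    }
}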
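
One possible reading of the log above, offered as a hypothesis rather than a confirmed diagnosis: the logged `Body` contains both `payload` and `headers`, and the `JavaType` attribute names `GenericMessage`, which suggests the handler sent the whole message envelope rather than the string inside it. The stdlib-only sketch below shows the difference between the two. `Envelope` and `bodyFor` are hypothetical stand-ins, not Spring or Spring Cloud AWS APIs, and the JSON rendering is deliberately naive (no escaping).

```java
public class PayloadVersusMessageDemo {

    // Hypothetical stand-in for a messaging envelope: a payload plus one header.
    record Envelope(String payload, String id) { }

    // Hypothetical stand-in for a sender: an envelope is rendered as a JSON
    // object, anything else goes out as its plain string form.
    static String bodyFor(Object toSend) {
        if (toSend instanceof Envelope) {
            Envelope e = (Envelope) toSend;
            return "{\"payload\":\"" + e.payload() + "\",\"headers\":{\"id\":\"" + e.id() + "\"}}";
        }
        return String.valueOf(toSend);
    }

    public static void main(String[] args) {
        Envelope m = new Envelope("error: this is an error", "0f4fb9c9");

        // Sending the envelope itself: the receiver gets a wrapped object.
        System.out.println(bodyFor(m));
        // prints {"payload":"error: this is an error","headers":{"id":"0f4fb9c9"}}

        // Sending only the payload: the receiver gets the original string.
        System.out.println(bodyFor(m.payload()));
        // prints error: this is an error
    }
}
```

In the flow above, the analogous change would presumably be `.payload(m.getPayload())` instead of `.payload(m)`. That has not been verified against the reporter's project.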

In the test, the messages are sent to the queue as follows:

    sqsTemplate.send(to -> to.queue("input-queue").payload("error: this is an error"));
    sqsTemplate.send(to -> to.queue("input-queue").payload("This is okay"));

And the test includes the following listener:

@SqsListener("output-queue")
public void receiveStringMessage(String logMessage) {
    // Store the message before releasing the latch so the waiting test thread sees it.
    lastLogMessage = logMessage;
    logger.info("Received message: {}", logMessage);
    messageReceivedLatch.countDown();
}
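
The listener relies on a latch so the test can wait for delivery. The test body is not shown, so the following is only a minimal stdlib sketch of that pattern, assuming the test awaits `messageReceivedLatch` with a timeout. The class and the `awaitMessage` helper are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchAwaitDemo {

    private final CountDownLatch messageReceivedLatch = new CountDownLatch(1);
    private volatile String lastLogMessage;

    // Plays the role of the listener callback: store the value, then signal.
    void receive(String logMessage) {
        lastLogMessage = logMessage;
        messageReceivedLatch.countDown();
    }

    // Waits up to the timeout; returns the message, or null if nothing arrived
    // in time or the waiting thread was interrupted.
    String awaitMessage(long timeout, TimeUnit unit) {
        try {
            return messageReceivedLatch.await(timeout, unit) ? lastLogMessage : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        LatchAwaitDemo demo = new LatchAwaitDemo();
        // Simulate asynchronous delivery on another thread.
        new Thread(() -> demo.receive("error: this is an error")).start();
        System.out.println(demo.awaitMessage(5, TimeUnit.SECONDS));
    }
}
```

Assigning the field before calling `countDown()` matters here: a thread released by the latch is then guaranteed to see the stored value.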

In other discussions, I saw that the use of DevTools may have something to do with this, but the above application does not have DevTools as a dependency.

@maciejwalkowiak
Contributor

Could you please publish this example as a full project I can run locally?

@stormxa-i8c
Author

Hi, I've uploaded the code to: spring-integration-test

@tomazfernandes
Contributor

@stormxa-i8c, this looks more like an issue on the Spring Integration side. Are you able to reproduce the error without Spring Integration?

Did you reach out to them already?

Thanks.

tomazfernandes added the component: sqs and status: waiting-for-feedback labels on Jan 1, 2025
@stormxa-i8c
Author

Hi, sorry for the late response. I had not considered that this might be a Spring Integration issue. I will raise it with them and post the solution here, if there is one.
