SqsAsyncClient is failing unpredictably during streaming #1466

Closed
jaylynstoesz opened this issue Oct 9, 2019 · 6 comments
Labels
closing-soon: This issue will close in 4 days unless further comments are made.
investigating: This issue is being investigated and/or work is in progress to resolve the issue.
response-requested: Waiting on additional info and feedback. Will move to "closing-soon" in 10 days.

Comments

@jaylynstoesz

I'm reading from SQS via Alpakka using the async client. It randomly stops receiving messages from the queue, at intervals ranging from a few minutes to a few hours. This seems to happen more often when volume is higher, though not necessarily during peak times.

Expected Behavior

My service continuously polls messages from SQS, processes them, and deletes them from the queue. Invalid messages are sent to a dead letter queue. In the attached screenshot of the queue monitoring dashboard, I would expect the NumberOfMessagesReceived graph to match the NumberOfMessagesSent graph.

Current Behavior

The SQS client randomly throws java.io.IOException: Server failed to send complete response and stops polling messages.
[Screenshot of the queue monitoring dashboard, taken 2019-10-09]

Steps to Reproduce (for bugs)

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.sqs.javadsl.SqsSource
import akka.stream.alpakka.sqs.SqsSourceSettings
import akka.stream.javadsl.Sink
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.SqsAsyncClient

//...

fun subscribe() {
    /* Initialize Akka */
    val actorSystem = ActorSystem.create()

    /* Provision async SQS client */
    val sqsAsyncClient = SqsAsyncClient
        .builder()
        .httpClientBuilder(NettyNioAsyncHttpClient.builder())
        .region(Region.US_EAST_1)
        .build()

    /* Provision Alpakka SQS module (Akka Streams) */
    val source = SqsSource.create(
        "https://sqs.us-east-1.amazonaws.com/000000000000/queue-name",
        SqsSourceSettings.Defaults(),
        sqsAsyncClient
    )

    /* Akka receives between 1k - 1.5k messages per minute from SQS */
    source
        .map { message: Message -> message.messageId() }
        .runWith(Sink.ignore(), ActorMaterializer.create(actorSystem))
        .handle { _, error ->
            /* Unpredictably returns java.io.IOException: Server failed to send complete response */
            error?.printStackTrace()
        }
}

Error stack trace:

java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException
	at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:61)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:75)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryErrorIfNeeded(AsyncRetryableStage.java:175)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:126)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:107)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.onError(MakeAsyncHttpRequestStage.java:249)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.lambda$notifyIfResponseNotCompleted$2(ResponseHandler.java:397)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:180)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.notifyIfResponseNotCompleted(ResponseHandler.java:397)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.channelInactive(ResponseHandler.java:148)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
	at io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:167)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:223)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:390)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:355)
	at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1078)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1403)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:827)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:405)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)
Caused by: software.amazon.awssdk.core.exception.SdkClientException
	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97)
	at software.amazon.awssdk.core.internal.util.ThrowableUtils.asSdkException(ThrowableUtils.java:98)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:125)
	... 52 more
Caused by: java.io.IOException: Server failed to send complete response
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.notifyIfResponseNotCompleted(ResponseHandler.java:396)
	... 43 more
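
The trace above nests the real failure two levels deep: a CompletionException wraps an SdkClientException, which in turn wraps the IOException. A minimal sketch of how a handler could look through that chain before deciding how to react is shown here. It uses only the JDK; the helper names `findCause` and `isIncompleteResponse` are hypothetical and are not part of the AWS SDK or Alpakka, and a plain RuntimeException stands in for SdkClientException so that the sketch runs without the SDK on the classpath.

```kotlin
import java.io.IOException
import java.util.concurrent.CompletionException

/** Walks the cause chain and returns the first throwable of type [T], or null if none is found. */
inline fun <reified T : Throwable> findCause(error: Throwable?): T? {
    var current = error
    val seen = HashSet<Throwable>() // guards against cyclic cause chains
    while (current != null && seen.add(current)) {
        if (current is T) return current
        current = current.cause
    }
    return null
}

/** True when the chain contains the IOException message reported in this issue. */
fun isIncompleteResponse(error: Throwable?): Boolean =
    findCause<IOException>(error)?.message == "Server failed to send complete response"

fun main() {
    // Stand-in for the SDK failure: CompletionException -> RuntimeException -> IOException.
    // RuntimeException is a placeholder for SdkClientException (assumption for this sketch).
    val simulated = CompletionException(
        RuntimeException(IOException("Server failed to send complete response"))
    )
    println(isIncompleteResponse(simulated))                           // prints "true"
    println(isIncompleteResponse(IllegalStateException("unrelated")))  // prints "false"
}
```

In the reproduction code, a check like this could replace the bare `error?.printStackTrace()` inside `.handle { _, error -> ... }`, so that this specific failure can be logged or counted separately from other errors.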

Context

This service is designed to stream business-critical accounting events in order to respond to charges against customer accounts in real time. So, if the service stops polling SQS messages, this prevents us from responding as events occur and could incur significant costs to the business.

Your Environment

Kotlin: 1.3.41
Netty & SQS Java SDK: 2.9.15

CC: @debora-ito re: #1122 (comment)

@zoewangg
Contributor

Thank you for the detailed report. I will try to reproduce it.

@zoewangg added the investigating label Oct 14, 2019
@zoewangg
Contributor

Unfortunately, I was not able to reproduce the java.io.IOException: Server failed to send complete response error with the SQS async client. I noticed you are using SqsSourceSettings.Defaults(), which sets waitTimeSeconds to 20 seconds, so the connection should be long-lived.

When you said it stopped receiving messages from the queue, did you mean that all requests failed with "Server failed to send complete response", or that the client got stuck somehow?

I will continue investigating.

@zoewangg
Contributor

#1476 has been merged to address the issue of unreusable connections being reused. The change will be included in the next release.

@zoewangg
Contributor

Hi @jaylynstoesz, a fix has been released as part of 2.9.24. Could you try with the latest version and see if the issue is resolved for your use case?
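
For reference, picking up the fix could look like the fragment below. This is only a sketch that assumes a Gradle build using the Kotlin DSL; the original report does not say which build tool is in use, so adapt it to Maven or the Groovy DSL as needed. It raises both SDK modules named in the report from 2.9.15 to 2.9.24.

```kotlin
// build.gradle.kts (assumed build setup, not taken from the report)
dependencies {
    // SQS service client, which provides SqsAsyncClient
    implementation("software.amazon.awssdk:sqs:2.9.24")
    // Netty-based async HTTP client, which provides NettyNioAsyncHttpClient
    implementation("software.amazon.awssdk:netty-nio-client:2.9.24")
}
```

Keeping both modules on the same version avoids mixing an older Netty client with a newer SDK core.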

@zoewangg
Contributor

Hi @jaylynstoesz are you still seeing the issue?

@zoewangg added the closing-soon and needs-response labels Oct 30, 2019
@zoewangg
Contributor

zoewangg commented Nov 5, 2019

Closing the issue due to no response. Feel free to reopen if you continue to see the issue with the latest version.

@zoewangg closed this as completed Nov 5, 2019
@debora-ito added the response-requested label and removed the needs-response label Feb 24, 2020