Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub-emulator throw error when publish 50k messages. #992

Closed
mrdulin opened this issue May 6, 2020 · 8 comments
Closed

pubsub-emulator throw error when publish 50k messages. #992

mrdulin opened this issue May 6, 2020 · 8 comments
Assignees
Labels
api: pubsub Issues related to the googleapis/nodejs-pubsub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@mrdulin
Copy link

mrdulin commented May 6, 2020

Environment details

  • OS: uname -a => Darwin US_C02WG0GXHV2V 17.7.0 Darwin Kernel Version 17.7.0: Thu Jan 23 07:05:23 PST 2020; root:xnu-4570.71.69~1/RELEASE_X86_64 x86_64
  • Node.js version: node -v => v10.16.2
  • npm version: npm -v => 6.14.4
  • @google-cloud/pubsub version: "@google-cloud/pubsub": "^1.7.3"

Description

When I tried to publish 50k messages, the pubsub-emulator throw below error infinitely:

Error stack

[pubsub] May 06, 2020 2:47:20 PM io.grpc.netty.NettyServerHandler onStreamError
[pubsub] 警告: Stream Error
[pubsub] io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
[pubsub]        at io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:149)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:356)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1000)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:956)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:512)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:518)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream(Http2ConnectionHandler.java:599)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.processRstStreamWriteResult(Http2ConnectionHandler.java:872)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.access$1000(Http2ConnectionHandler.java:66)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:796)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:793)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:502)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:476)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:415)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:540)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:529)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:101)
[pubsub]        at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:703)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338)
[pubsub]        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:428)
[pubsub]        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:939)
[pubsub]        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360)
[pubsub]        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:906)
[pubsub]        at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1370)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.handler.logging.LoggingHandler.flush(LoggingHandler.java:265)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:978)
[pubsub]        at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:253)
[pubsub]        at io.grpc.netty.WriteQueue.flush(WriteQueue.java:118)
[pubsub]        at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32)
[pubsub]        at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44)
[pubsub]        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
[pubsub]        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
[pubsub]        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
[pubsub]        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
[pubsub]        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[pubsub]        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[pubsub]        at java.lang.Thread.run(Thread.java:748)
[pubsub] 
[pubsub] May 06, 2020 2:47:20 PM io.grpc.netty.NettyServerHandler onStreamError
[pubsub] 警告: Stream Error
[pubsub] io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
[pubsub]        at io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:149)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:356)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1000)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:956)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:512)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:518)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream(Http2ConnectionHandler.java:599)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.processRstStreamWriteResult(Http2ConnectionHandler.java:872)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.access$1000(Http2ConnectionHandler.java:66)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:796)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:793)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:502)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:476)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:415)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:540)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:529)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:101)
[pubsub]        at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:703)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338)
[pubsub]        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:428)
[pubsub]        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:939)
[pubsub]        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360)
[pubsub]        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:906)
[pubsub]        at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1370)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.handler.logging.LoggingHandler.flush(LoggingHandler.java:265)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:978)
[pubsub]        at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:253)
[pubsub]        at io.grpc.netty.WriteQueue.flush(WriteQueue.java:118)
[pubsub]        at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32)
[pubsub]        at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44)
[pubsub]        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
[pubsub]        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
[pubsub]        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
[pubsub]        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
[pubsub]        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[pubsub]        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[pubsub]        at java.lang.Thread.run(Thread.java:748)
[pubsub] 

My publisher also throws many same errors:

{ Error: Retry total timeout exceeded before any response was received
    at repeat (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:83:23)
    at Timeout.setTimeout (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:124:13)
    at ontimeout (timers.js:436:11)
    at tryOnTimeout (timers.js:300:5)
    at listOnTimeout (timers.js:263:5)
    at Timer.processTimers (timers.js:223:10) code: 4 }
{ Error: Retry total timeout exceeded before any response was received
    at repeat (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:83:23)
    at Timeout.setTimeout (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:124:13)
    at ontimeout (timers.js:436:11)
    at tryOnTimeout (timers.js:300:5)
    at listOnTimeout (timers.js:263:5)
    at Timer.processTimers (timers.js:223:10) code: 4 }

My subscriber throw below errors:

[2020-05-06T06:29:08.932Z]Received message 46689:
Data: message payload 3998
Attributes: {}
[2020-05-06T06:29:08.932Z]Received message 46690:
Data: message payload 3999
Attributes: {}
ERROR: Error: Failed to "acknowledge" for 500 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "acknowledge" for 100 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "acknowledge" for 200 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 400 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 500 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 100 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 200 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to connect to channel. Reason: Failed to connect before the deadline

Actually, I don't know it's a bug or I am using pubsub incorrectly. So I decide to post this as a github issue. I found there are some questions on stackoverflow and issues on github:

But didn't find a solution.

Steps to reproduce

I made a minimal code example for reproducing it. Please tell what's going on? Thanks!

Repo: https://github.com/mrdulin/nodejs-gcp/tree/master/src/pubsub/pubsub-emulator

  1. Start pubsub-emulator
gcloud beta emulators pubsub start --project=$PROJECT_ID
  1. Create topic:
npx ts-node ./publisher.ts create pubsub-emulator-t1
  1. Create a subscription for the topic:
npx ts-node ./subscriber.ts create pubsub-emulator-t1 pubsub-emulator-t1-sub
  1. Listen for the messages:
npx ts-node ./subscriber.ts receive pubsub-emulator-t1-sub
  1. Publish 50k messages:
npx ts-node ./publisher.ts publish pubsub-emulator-t1

Additional information

I also got the error: Retry total timeout exceeded before any response was received in production environments. So it may not be a problem of pubsub-emulator

@google/pubsub version: "@google-cloud/pubsub": "^1.6.0"

That's why I am trying to make an example to reproduce it. But can't figure out what's going on.
The only way I can reproduce this error now is to post 50k messages.

Test cases: https://gist.github.com/mrdulin/79f1689a9baaafaef90fcad42646bf6d

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/nodejs-pubsub API. label May 6, 2020
@yoshi-automation yoshi-automation added the triage me I really want to be triaged. label May 6, 2020
@meredithslota meredithslota added priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. and removed triage me I really want to be triaged. labels May 8, 2020
@feywind
Copy link
Collaborator

feywind commented May 8, 2020

@mrdulin Thanks for the detailed write-up! Just based on the Java error trace at the top, I'm guessing this is something with the Pub/Sub emulator, but I'll try out your repro steps.

@mrdulin
Copy link
Author

mrdulin commented May 11, 2020

@feywind ok. Thanks. But I also hit this issue in the production environment. If I publish 50K messages without using Promise.all, just use for loop and async/await. The issue is gone.
But it will block the execution of subsequent logic because of async/await until all messages have been published.

const n = 50 * 1000;
for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    await topic.publish(dataBuffer)
}
// some logic ...
res.json(data);

Any suggestions about how to publish a huge amount of messages without blocking the execution of subsequent logic? Do I need to use child_process or some queue like bull to publish the huge amount of messages in the background without blocking request/response workflow of the API? This means I need to respond to the front-end as soon as possible, the 50k messages should be the background tasks.

@feywind
Copy link
Collaborator

feywind commented May 19, 2020

Realistically you should be able to just fire and forget (unless the Node process is terminated or something) so it does kind of seem like there's some library issue there. We're going to be pulling in some much newer versions of the pieces under the Pub/Sub library soon with the 2.0 release of the Pub/Sub library. I'm hoping that will help with some of these issues. I think the plan is to release that this week.

@feywind
Copy link
Collaborator

feywind commented May 21, 2020

@google-cloud/pubsub 2.0.0 is now released, so if you're on Node 10+, it might be worth trying that to see if the newer versions of gRPC/gax help anything.

@mrdulin
Copy link
Author

mrdulin commented May 22, 2020

@feywind ok. thanks. I will try it.

@stephenplusplus
Copy link
Contributor

@mrdulin Just checking in to see if you're still experiencing these issues?

@feywind
Copy link
Collaborator

feywind commented Jul 21, 2020

There hasn't been any movement on this for a while, so I'm going to go ahead and close it. (Please feel free to re-open if you need any other help with it.)

@feywind feywind closed this as completed Jul 21, 2020
@mrdulin
Copy link
Author

mrdulin commented Aug 7, 2020

@stephenplusplus @feywind The issue is gone when I upgraded the client library to "@google-cloud/pubsub": "^2.3.0"

feywind pushed a commit to feywind/nodejs-pubsub that referenced this issue Nov 12, 2024
Added a test case for the new close function
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the googleapis/nodejs-pubsub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.
Projects
None yet
Development

No branches or pull requests

5 participants