diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 107c2e3ef..8dcfb8bfd 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -863,7 +863,9 @@ export class Http2ServerCallStream< await this.pushOrBufferMessage(readable, decompressedMessage); } pendingMessageProcessing = false; - this.stream.resume(); + if (this.canPush) { + this.stream.resume(); + } await maybePushEnd(); }); diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index 48b305ef4..2b6ac3eb3 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -228,6 +228,63 @@ describe('Server', () => { }); }); + describe('reading backpressure', () => { + let client: ServiceClient; + const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); + const echoService = loadProtoFile(protoFile) + .EchoService as ServiceClientConstructor; + + const serviceImplementation = { + echo(call: ServerUnaryCall, callback: sendUnaryData) { + callback(null, call.request); + }, + echoBidiStream(call: ServerDuplexStream) { + setTimeout(() => { + call.resume() + call.end(); + + const http2Stream = (call as any).call as http2.ServerHttp2Stream; + const messagesToPush = (http2Stream as any).messagesToPush as Array; + assert.strictEqual(messagesToPush.length < 100, true); + }, 500); + }, + }; + + beforeEach(done => { + server.addService(echoService.service, serviceImplementation); + + server.bindAsync( + 'localhost:0', + ServerCredentials.createInsecure(), + (err, port) => { + assert.ifError(err); + client = new echoService( + `localhost:${port}`, + grpc.credentials.createInsecure() + ); + server.start(); + done(); + } + ); + }); + + afterEach(done => { + client.close(); + server.tryShutdown(done); + }); + + it('Should cancel open calls after the grace period ends', done => { + const call = client.echoBidiStream(); + call.on('end', () => { + done(); + }); + call.resume() + for (let i = 0; i < 10000; i++) { + call.write({value: 'abc'}); + } + }); + }); + describe('drain', () => { let client: ServiceClient; let portNumber: number;