diff --git a/packages/grpc-js/src/server-interceptors.ts b/packages/grpc-js/src/server-interceptors.ts index c03f3028c..405f526bf 100644 --- a/packages/grpc-js/src/server-interceptors.ts +++ b/packages/grpc-js/src/server-interceptors.ts @@ -39,6 +39,11 @@ function trace(text: string) { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); } +interface GrpcFrame { + infoBytes: Buffer; + payload: Buffer; +} + export interface ServerMetadataListener { (metadata: Metadata, next: (metadata: Metadata) => void): void; } @@ -613,16 +618,15 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa * @param value * @returns */ - private serializeMessage(value: any) { - const messageBuffer = this.handler.serialize(value); - const byteLength = messageBuffer.byteLength; - const output = Buffer.allocUnsafe(byteLength + 5); + private serializeMessage(value: any): GrpcFrame { + const payload = this.handler.serialize(value); + + const infoBytes = Buffer.allocUnsafe(5); /* Note: response compression is currently not supported, so this * compressed bit is always 0. */ - output.writeUInt8(0, 0); - output.writeUInt32BE(byteLength, 1); - messageBuffer.copy(output, 5); - return output; + infoBytes.writeUInt8(0, 0); + infoBytes.writeUInt32BE(payload.byteLength, 1); + return { infoBytes, payload }; } private decompressMessage( @@ -740,47 +744,70 @@ export class BaseServerInterceptingCall implements ServerInterceptingCallInterfa }; this.stream.respond(headers, defaultResponseOptions); } + private createFrameWriteErrorCallback( + callback: () => void + ): (err: unknown) => void { + let errored = false; + let written = 0; + return (error: unknown) => { + if (errored) { + return; + } + if (error) { + this.sendStatus({ + code: Status.INTERNAL, + details: `Error writing message: ${getErrorMessage(error)}`, + metadata: null, + }); + errored = true; + return; + } + written++; + if (written !== 2) { + return; + } + this.callEventTracker?.addMessageSent(); + callback(); + }; + } sendMessage(message: any, callback: () => void): void { if (this.checkCancelled()) { return; } - let response: Buffer; + let frame: GrpcFrame; try { - response = this.serializeMessage(message); + frame = this.serializeMessage(message); } catch (e) { this.sendStatus({ code: Status.INTERNAL, details: `Error serializing response: ${getErrorMessage(e)}`, - metadata: null + metadata: null, }); return; } if ( this.maxSendMessageSize !== -1 && - response.length - 5 > this.maxSendMessageSize + frame.payload.length > this.maxSendMessageSize ) { this.sendStatus({ code: Status.RESOURCE_EXHAUSTED, - details: `Sent message larger than max (${response.length} vs. ${this.maxSendMessageSize})`, - metadata: null + details: `Sent message larger than max (${frame.payload.length} vs. ${this.maxSendMessageSize})`, + metadata: null, }); return; } this.maybeSendMetadata(); - trace('Request to ' + this.handler.path + ' sent data frame of size ' + response.length); - this.stream.write(response, error => { - if (error) { - this.sendStatus({ - code: Status.INTERNAL, - details: `Error writing message: ${getErrorMessage(error)}`, - metadata: null - }); - return; - } - this.callEventTracker?.addMessageSent(); - callback(); - }); + trace( + 'Request to ' + + this.handler.path + + ' sent data frame of size ' + + frame.payload.length + ); + + const cb = this.createFrameWriteErrorCallback(callback); + this.stream.write(frame.infoBytes, cb); + this.stream.write(frame.payload, cb); } sendStatus(status: PartialStatusObject): void { if (this.checkCancelled()) {