Skip to content

Commit

Permalink
allow write buffer to server stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Saltykov committed Jan 30, 2024
1 parent 666a374 commit f338988
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
6 changes: 3 additions & 3 deletions packages/grpc-js/src/object-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ export interface IntermediateObjectWritable<T> extends Writable {
}

export interface ObjectWritable<T> extends IntermediateObjectWritable<T> {
_write(chunk: T, encoding: string, callback: Function): void;
write(chunk: T, cb?: Function): boolean;
write(chunk: T, encoding?: any, cb?: Function): boolean;
_write(chunk: T | Buffer, encoding: string, callback: Function): void;
write(chunk: T | Buffer, cb?: Function): boolean;
write(chunk: T | Buffer, encoding?: any, cb?: Function): boolean;
setDefaultEncoding(encoding: string): this;
end(): ReturnType<Writable['end']> extends Writable ? this : void;
end(
Expand Down
35 changes: 29 additions & 6 deletions packages/grpc-js/src/server-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
}

_write(
chunk: ResponseType,
chunk: ResponseType | Buffer,
encoding: string,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
callback: (...args: any[]) => void
Expand Down Expand Up @@ -654,15 +654,38 @@ export class Http2ServerCallStream<
}
}

serializeMessage(value: ResponseType) {
serializeMessage(value: ResponseType | Buffer) {
// TODO(cjihrig): Call compression aware serializeMessage().

if (value instanceof Buffer) {
return this.serializeMessageHandleBuffer(value);
}

const messageBuffer = this.handler.serialize(value);
return this.addCompressionAndLength(messageBuffer);
}

// TODO(cjihrig): Call compression aware serializeMessage().
const byteLength = messageBuffer.byteLength;
private serializeMessageHandleBuffer(value: Buffer): Buffer {
const byteLength = value.byteLength;
// checking if this is a protobuf message or a gRPC frame
if (
byteLength >= 5 &&
(value.readUInt8(0) === 0 || value.readUint8(0) === 1) &&
value.readUInt32BE(1) === byteLength - 5
) {
return value;
}

return this.addCompressionAndLength(value);
}

private addCompressionAndLength(value: Buffer, compressed = false) {
const byteLength = value.byteLength;
const compressionByte = compressed ? 1 : 0;
const output = Buffer.allocUnsafe(byteLength + 5);
output.writeUInt8(0, 0);
output.writeUInt8(compressionByte, 0);
output.writeUInt32BE(byteLength, 1);
messageBuffer.copy(output, 5);
value.copy(output, 5);
return output;
}

Expand Down

0 comments on commit f338988

Please sign in to comment.