diff --git a/packages/grpc-js/src/call-stream.ts b/packages/grpc-js/src/call-stream.ts index c987d9320..7c06c335c 100644 --- a/packages/grpc-js/src/call-stream.ts +++ b/packages/grpc-js/src/call-stream.ts @@ -26,6 +26,7 @@ import {Filter} from './filter'; import {FilterStackFactory} from './filter-stack'; import {Metadata} from './metadata'; import {ObjectDuplex, WriteCallback} from './object-stream'; +import {StreamDecoder} from './stream-decoder'; const {HTTP2_HEADER_STATUS, HTTP2_HEADER_CONTENT_TYPE, NGHTTP2_CANCEL} = http2.constants; @@ -77,12 +78,6 @@ export type Call = { EmitterAugmentation1<'status', StatusObject>& ObjectDuplex; -enum ReadState { - NO_DATA, - READING_SIZE, - READING_MESSAGE -} - export class Http2CallStream extends Duplex implements Call { credentials: CallCredentials = CallCredentials.createEmpty(); filterStack: Filter; @@ -92,13 +87,7 @@ export class Http2CallStream extends Duplex implements Call { private pendingWriteCallback: WriteCallback|null = null; private pendingFinalCallback: Function|null = null; - private readState: ReadState = ReadState.NO_DATA; - private readCompressFlag: Buffer = Buffer.alloc(1); - private readPartialSize: Buffer = Buffer.alloc(4); - private readSizeRemaining = 4; - private readMessageSize = 0; - private readPartialMessage: Buffer[] = []; - private readMessageRemaining = 0; + private decoder = new StreamDecoder(); private isReadFilterPending = false; private canPush = false; @@ -292,62 +281,10 @@ export class Http2CallStream extends Duplex implements Call { }); stream.on('trailers', this.handleTrailers.bind(this)); stream.on('data', (data: Buffer) => { - let readHead = 0; - let toRead: number; - while (readHead < data.length) { - switch (this.readState) { - case ReadState.NO_DATA: - this.readCompressFlag = data.slice(readHead, readHead + 1); - readHead += 1; - this.readState = ReadState.READING_SIZE; - this.readPartialSize.fill(0); - this.readSizeRemaining = 4; - this.readMessageSize = 0; - this.readMessageRemaining = 0; - this.readPartialMessage = []; - break; - case ReadState.READING_SIZE: - toRead = Math.min(data.length - readHead, this.readSizeRemaining); - data.copy( - this.readPartialSize, 4 - this.readSizeRemaining, readHead, - readHead + toRead); - this.readSizeRemaining -= toRead; - readHead += toRead; - // readSizeRemaining >=0 here - if (this.readSizeRemaining === 0) { - this.readMessageSize = this.readPartialSize.readUInt32BE(0); - this.readMessageRemaining = this.readMessageSize; - if (this.readMessageRemaining > 0) { - this.readState = ReadState.READING_MESSAGE; - } else { - this.tryPush(Buffer.concat( - [this.readCompressFlag, this.readPartialSize])); - this.readState = ReadState.NO_DATA; - } - } - break; - case ReadState.READING_MESSAGE: - toRead = - Math.min(data.length - readHead, this.readMessageRemaining); - this.readPartialMessage.push( - data.slice(readHead, readHead + toRead)); - this.readMessageRemaining -= toRead; - readHead += toRead; - // readMessageRemaining >=0 here - if (this.readMessageRemaining === 0) { - // At this point, we have read a full message - const framedMessageBuffers = [ - this.readCompressFlag, this.readPartialSize - ].concat(this.readPartialMessage); - const framedMessage = Buffer.concat( - framedMessageBuffers, this.readMessageSize + 5); - this.tryPush(framedMessage); - this.readState = ReadState.NO_DATA; - } - break; - default: - throw new Error('This should never happen'); - } + const message = this.decoder.write(data); + + if (message !== null) { + this.tryPush(message); } }); stream.on('end', () => { diff --git a/packages/grpc-js/src/stream-decoder.ts b/packages/grpc-js/src/stream-decoder.ts new file mode 100644 index 000000000..3a8e91aec --- /dev/null +++ b/packages/grpc-js/src/stream-decoder.ts @@ -0,0 +1,98 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +enum ReadState { + NO_DATA, + READING_SIZE, + READING_MESSAGE +} + + +export class StreamDecoder { + private readState: ReadState = ReadState.NO_DATA; + private readCompressFlag: Buffer = Buffer.alloc(1); + private readPartialSize: Buffer = Buffer.alloc(4); + private readSizeRemaining = 4; + private readMessageSize = 0; + private readPartialMessage: Buffer[] = []; + private readMessageRemaining = 0; + + + write(data: Buffer): Buffer|null { + let readHead = 0; + let toRead: number; + + while (readHead < data.length) { + switch (this.readState) { + case ReadState.NO_DATA: + this.readCompressFlag = data.slice(readHead, readHead + 1); + readHead += 1; + this.readState = ReadState.READING_SIZE; + this.readPartialSize.fill(0); + this.readSizeRemaining = 4; + this.readMessageSize = 0; + this.readMessageRemaining = 0; + this.readPartialMessage = []; + break; + case ReadState.READING_SIZE: + toRead = Math.min(data.length - readHead, this.readSizeRemaining); + data.copy( + this.readPartialSize, 4 - this.readSizeRemaining, readHead, + readHead + toRead); + this.readSizeRemaining -= toRead; + readHead += toRead; + // readSizeRemaining >=0 here + if (this.readSizeRemaining === 0) { + this.readMessageSize = this.readPartialSize.readUInt32BE(0); + this.readMessageRemaining = this.readMessageSize; + if (this.readMessageRemaining > 0) { + this.readState = ReadState.READING_MESSAGE; + } else { + const message = Buffer.concat( + [this.readCompressFlag, this.readPartialSize], 5); + + this.readState = ReadState.NO_DATA; + return message; + } + } + break; + case ReadState.READING_MESSAGE: + toRead = Math.min(data.length - readHead, this.readMessageRemaining); + this.readPartialMessage.push(data.slice(readHead, readHead + toRead)); + this.readMessageRemaining -= toRead; + readHead += toRead; + // readMessageRemaining >=0 here + if (this.readMessageRemaining === 0) { + // At this point, we have read a full message + const framedMessageBuffers = [ + this.readCompressFlag, this.readPartialSize + ].concat(this.readPartialMessage); + const framedMessage = + Buffer.concat(framedMessageBuffers, this.readMessageSize + 5); + + this.readState = ReadState.NO_DATA; + return framedMessage; + } + break; + default: + throw new Error('Unexpected read state'); + } + } + + return null; + } +}