diff --git a/test/readrows.ts b/test/readrows.ts index 7c43c0add..2b0056338 100644 --- a/test/readrows.ts +++ b/test/readrows.ts @@ -13,7 +13,7 @@ // limitations under the License. import {before, describe, it} from 'mocha'; -import {Bigtable, protos, Row, Table} from '../src'; +import {Bigtable, Row, Table} from '../src'; import * as assert from 'assert'; import {Transform, PassThrough, pipeline} from 'stream'; @@ -21,18 +21,42 @@ import {GoogleError} from 'google-gax'; import {MockServer} from '../src/util/mock-servers/mock-server'; import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service'; import {MockService} from '../src/util/mock-servers/mock-service'; -import {debugLog, readRowsImpl} from './utils/readRowsImpl'; -import {ServerWritableStream, UntypedHandleCall} from '@grpc/grpc-js'; -import {readRowsImpl2} from './utils/readRowsImpl2'; +import {ReadRowsImpl} from './utils/readRowsImpl'; + +import { + ReadRowsServiceParameters, + ReadRowsWritableStream, +} from '../test/utils/readRowsServiceParameters'; +import * as mocha from 'mocha'; + +const DEBUG = process.env.BIGTABLE_TEST_DEBUG === 'true'; + +function debugLog(text: string) { + if (DEBUG) { + console.log(text); + } +} + +// Define parameters for a standard Bigtable Mock service +const VALUE_SIZE = 1024 * 1024; +// we want each row to be split into 2 chunks of different sizes +const CHUNK_SIZE = 1023 * 1024 - 1; +const CHUNKS_PER_RESPONSE = 10; +const STANDARD_KEY_FROM = 0; +// 1000 rows must be enough to reproduce issues with losing the data and to create backpressure +const STANDARD_KEY_TO = 1000; +const STANDARD_SERVICE_WITHOUT_ERRORS: ReadRowsServiceParameters = { + keyFrom: STANDARD_KEY_FROM, + keyTo: STANDARD_KEY_TO, + valueSize: VALUE_SIZE, + chunkSize: CHUNK_SIZE, + chunksPerResponse: CHUNKS_PER_RESPONSE, + debugLog, +}; type PromiseVoid = Promise; interface ServerImplementationInterface { - ( - server: ServerWritableStream< - protos.google.bigtable.v2.IReadRowsRequest, - protos.google.bigtable.v2.IReadRowsResponse - > - ): PromiseVoid; + (server: ReadRowsWritableStream): PromiseVoid; } describe('Bigtable/ReadRows', () => { @@ -53,15 +77,21 @@ describe('Bigtable/ReadRows', () => { service = new BigtableClientMockService(server); }); - it('should create read stream and read synchronously', function (done) { - this.timeout(60000); + // helper function because some tests run slower + // on Windows and need a longer timeout + function setWindowsTestTimeout(test: mocha.Context) { + if (process.platform === 'win32') { + test.timeout(60000); // it runs much slower on Windows! 
+ } + } - // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure - const keyFrom = 0; - const keyTo = 1000; + it('should create read stream and read synchronously', function (done) { + setWindowsTestTimeout(this); service.setService({ - ReadRows: readRowsImpl(keyFrom, keyTo) as any, + ReadRows: ReadRowsImpl.createService( + STANDARD_SERVICE_WITHOUT_ERRORS + ) as ServerImplementationInterface, }); let receivedRowCount = 0; @@ -81,19 +111,17 @@ describe('Bigtable/ReadRows', () => { debugLog(`received row key ${key}`); }); readStream.on('end', () => { - assert.strictEqual(receivedRowCount, keyTo - keyFrom); - assert.strictEqual(lastKeyReceived, keyTo - 1); + assert.strictEqual(receivedRowCount, STANDARD_KEY_TO - STANDARD_KEY_FROM); + assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1); done(); }); }); it('should create read stream and read synchronously using Transform stream', done => { - // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure - const keyFrom = 0; - const keyTo = 1000; - service.setService({ - ReadRows: readRowsImpl(keyFrom, keyTo) as any, + ReadRows: ReadRowsImpl.createService( + STANDARD_SERVICE_WITHOUT_ERRORS + ) as ServerImplementationInterface, }); let receivedRowCount = 0; @@ -128,8 +156,8 @@ describe('Bigtable/ReadRows', () => { debugLog(`received row key ${key}`); }); passThrough.on('end', () => { - assert.strictEqual(receivedRowCount, keyTo - keyFrom); - assert.strictEqual(lastKeyReceived, keyTo - 1); + assert.strictEqual(receivedRowCount, STANDARD_KEY_TO - STANDARD_KEY_FROM); + assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1); done(); }); @@ -137,16 +165,11 @@ describe('Bigtable/ReadRows', () => { }); it('should create read stream and read asynchronously using Transform stream', function (done) { - if (process.platform === 'win32') { - this.timeout(60000); // it runs much slower on Windows! 
- } - - // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure - const keyFrom = 0; - const keyTo = 1000; - + setWindowsTestTimeout(this); service.setService({ - ReadRows: readRowsImpl(keyFrom, keyTo) as any, + ReadRows: ReadRowsImpl.createService( + STANDARD_SERVICE_WITHOUT_ERRORS + ) as ServerImplementationInterface, }); let receivedRowCount = 0; @@ -183,8 +206,8 @@ describe('Bigtable/ReadRows', () => { debugLog(`received row key ${key}`); }); passThrough.on('end', () => { - assert.strictEqual(receivedRowCount, keyTo - keyFrom); - assert.strictEqual(lastKeyReceived, keyTo - 1); + assert.strictEqual(receivedRowCount, STANDARD_KEY_TO - STANDARD_KEY_FROM); + assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1); done(); }); @@ -192,14 +215,13 @@ describe('Bigtable/ReadRows', () => { }); it('should be able to stop reading from the read stream', done => { - // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure - const keyFrom = 0; - const keyTo = 1000; // pick any key to stop after const stopAfter = 42; service.setService({ - ReadRows: readRowsImpl(keyFrom, keyTo) as any, + ReadRows: ReadRowsImpl.createService( + STANDARD_SERVICE_WITHOUT_ERRORS + ) as ServerImplementationInterface, }); let receivedRowCount = 0; @@ -232,18 +254,14 @@ describe('Bigtable/ReadRows', () => { // TODO: enable after https://github.com/googleapis/nodejs-bigtable/issues/1286 is fixed it('should be able to stop reading from the read stream when reading asynchronously', function (done) { - if (process.platform === 'win32') { - this.timeout(600000); // it runs much slower on Windows! - } - - // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure - const keyFrom = 0; - const keyTo = 1000; + setWindowsTestTimeout(this); // pick any key to stop after const stopAfter = 420; service.setService({ - ReadRows: readRowsImpl(keyFrom, keyTo) as any, + ReadRows: ReadRowsImpl.createService( + STANDARD_SERVICE_WITHOUT_ERRORS + ) as ServerImplementationInterface, }); let receivedRowCount = 0; @@ -294,61 +312,77 @@ describe('Bigtable/ReadRows', () => { pipeline(readStream, transform, passThrough, () => {}); }); - it('should silently resume after server or network error', done => { - // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure - const keyFrom = 0; - const keyTo = 1000; - // the server will error after sending this chunk (not row) - const errorAfterChunkNo = 423; - - service.setService({ - ReadRows: readRowsImpl(keyFrom, keyTo, errorAfterChunkNo) as any, - }); - - let receivedRowCount = 0; - let lastKeyReceived: number | undefined; - - const readStream = table.createReadStream(); - readStream.on('error', (err: GoogleError) => { - done(err); - }); - readStream.on('data', (row: Row) => { - ++receivedRowCount; - const key = parseInt(row.id); - if (lastKeyReceived && key <= lastKeyReceived) { - done(new Error('Test error: keys are not in order')); - } - lastKeyReceived = key; - debugLog(`received row key ${key}`); + describe('should silently resume after server or network error', () => { + function runTest(done: Mocha.Done, errorAfterChunkNo: number) { + service.setService({ + ReadRows: ReadRowsImpl.createService({ + keyFrom: STANDARD_KEY_FROM, + keyTo: STANDARD_KEY_TO, + valueSize: VALUE_SIZE, + chunkSize: CHUNK_SIZE, + chunksPerResponse: CHUNKS_PER_RESPONSE, + errorAfterChunkNo, + debugLog, + }) as ServerImplementationInterface, + }); + let receivedRowCount = 0; + 
let lastKeyReceived: number | undefined; + + const readStream = table.createReadStream(); + readStream.on('error', (err: GoogleError) => { + done(err); + }); + readStream.on('data', (row: Row) => { + ++receivedRowCount; + const key = parseInt(row.id); + if (lastKeyReceived && key <= lastKeyReceived) { + done(new Error('Test error: keys are not in order')); + } + lastKeyReceived = key; + debugLog(`received row key ${key}`); + }); + readStream.on('end', () => { + assert.strictEqual( + receivedRowCount, + STANDARD_KEY_TO - STANDARD_KEY_FROM + ); + assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1); + done(); + }); + } + it('with an error at a fixed position', function (done) { + setWindowsTestTimeout(this); + // Emits an error after enough chunks have been pushed to create back pressure + runTest(done, 423); }); - readStream.on('end', () => { - assert.strictEqual(receivedRowCount, keyTo - keyFrom); - assert.strictEqual(lastKeyReceived, keyTo - 1); - done(); + it('with an error at a random position', function (done) { + this.timeout(200000); + // Emits an error after a random number of chunks. + const errorAfterChunkNo = Math.floor(Math.random() * 1000); + runTest(done, errorAfterChunkNo); }); }); - - it('should return row data in the right order', done => { - // 150 rows must be enough to reproduce issues with losing the data and to create backpressure - const keyFrom = undefined; - const keyTo = undefined; - // the server will error after sending this chunk (not row) - const errorAfterChunkNo = 100; + it('should return row data in the right order', function (done) { + setWindowsTestTimeout(this); const dataResults = []; - // TODO: Do not use `any` here, make it a more specific type and address downstream implications on the mock server. + // keyTo and keyFrom are not provided so they will be determined from + // the request that is passed in. service.setService({ - ReadRows: readRowsImpl2( - keyFrom, - keyTo, - errorAfterChunkNo - ) as ServerImplementationInterface, + ReadRows: ReadRowsImpl.createService({ + errorAfterChunkNo: 100, // the server will error after sending this chunk (not row) + valueSize: 1, + chunkSize: 1, + chunksPerResponse: 1, + debugLog, + }) as ServerImplementationInterface, }); const sleep = (ms: number) => { return new Promise(resolve => setTimeout(resolve, ms)); }; (async () => { try { + // 150 rows must be enough to reproduce issues with losing the data and to create backpressure const stream = table.createReadStream({ start: '00000000', end: '00000150', diff --git a/test/utils/readRowsImpl.ts b/test/utils/readRowsImpl.ts index b103bb54e..51c22f0bd 100644 --- a/test/utils/readRowsImpl.ts +++ b/test/utils/readRowsImpl.ts @@ -12,25 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-import {ServerWritableStream} from '@grpc/grpc-js'; import {protos} from '../../src'; import {GoogleError, Status} from 'google-gax'; +import { + ChunkGeneratorParameters, + DebugLog, + ReadRowsServiceParameters, + ReadRowsWritableStream, +} from './readRowsServiceParameters'; +import {google} from '../../protos/protos'; +import IRowRange = google.bigtable.v2.IRowRange; -const VALUE_SIZE = 1024 * 1024; -// we want each row to be splitted into 2 chunks of different sizes -const CHUNK_SIZE = 1023 * 1024 - 1; -const CHUNK_PER_RESPONSE = 10; - -const DEBUG = process.env.BIGTABLE_TEST_DEBUG === 'true'; - -export function debugLog(text: string) { - if (DEBUG) { - console.log(text); - } -} - +// Generate documentation for this function +/** Pretty prints the request object. + * @param request The request object to pretty print. + * @param debugLog A function that logs debug messages. + */ function prettyPrintRequest( - request: protos.google.bigtable.v2.IReadRowsRequest + request: protos.google.bigtable.v2.IReadRowsRequest, + debugLog: DebugLog ) { // pretty-printing important parts of the request. // doing it field by field because we want to apply .toString() to all key fields @@ -75,8 +75,16 @@ function prettyPrintRequest( /** Generates chunks for rows in a fake table that match the provided RowSet. * The fake table contains monotonically increasing zero padded rows * in the range [keyFrom, keyTo). + * @param chunkGeneratorParameters The parameters for generating chunks. + * @param debugLog The logging function for printing test output. + * @returns {protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[]} The generated chunks. */ -function generateChunks(keyFrom: number, keyTo: number) { +function generateChunks( + chunkGeneratorParameters: ChunkGeneratorParameters, + debugLog: DebugLog +): protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[] { + const keyFrom = chunkGeneratorParameters.keyFrom; + const keyTo = chunkGeneratorParameters.keyTo; debugLog(`generating chunks from ${keyFrom} to ${keyTo}`); const chunks: protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[] = []; @@ -86,7 +94,7 @@ function generateChunks(keyFrom: number, keyTo: number) { const binaryKey = Buffer.from(key.toString().padStart(8, '0')); debugLog(`generating chunks for ${key}`); const rowKey = binaryKey.toString('base64'); - let remainingBytes = VALUE_SIZE; + let remainingBytes = chunkGeneratorParameters.valueSize; let chunkCounter = 0; while (remainingBytes > 0) { debugLog(` remaining bytes: ${remainingBytes}`); @@ -100,7 +108,10 @@ function generateChunks(keyFrom: number, keyTo: number) { value: Buffer.from('qualifier').toString('base64'), }; } - const thisChunkSize = Math.min(CHUNK_SIZE, remainingBytes); + const thisChunkSize = Math.min( + chunkGeneratorParameters.chunkSize, + remainingBytes + ); remainingBytes -= thisChunkSize; const value = Buffer.from('a'.repeat(remainingBytes)).toString('base64'); chunk.value = value; @@ -116,6 +127,11 @@ function generateChunks(keyFrom: number, keyTo: number) { return chunks; } +/** Checks if the given key is in the provided RowSet. + * @param stringKey The key to check. + * @param rowSet The RowSet to check against. + * @returns {boolean} True if the key is in the RowSet, false otherwise. 
+ */ function isKeyInRowSet( stringKey: string, rowSet?: protos.google.bigtable.v2.IRowSet | null @@ -154,80 +170,218 @@ function isKeyInRowSet( return true; } -// Returns an implementation of the server streaming ReadRows call that would return -// monotonically increasing zero padded rows in the range [keyFrom, keyTo). -// The returned implementation can be passed to gRPC server. -export function readRowsImpl( - keyFrom: number, - keyTo: number, - errorAfterChunkNo?: number -): ( - stream: ServerWritableStream< - protos.google.bigtable.v2.IReadRowsRequest, - protos.google.bigtable.v2.IReadRowsResponse - > -) => Promise { - return async ( - stream: ServerWritableStream< - protos.google.bigtable.v2.IReadRowsRequest, - protos.google.bigtable.v2.IReadRowsResponse - > - ): Promise => { - prettyPrintRequest(stream.request); +/** Gets the key value for the given property specified in the request. + * @param request The request object to get the key value from. + * @param property The property from the request to get the value from. + * @returns {string | undefined} The key value from the request. + */ +function getKeyValue( + request: protos.google.bigtable.v2.IReadRowsRequest, + property: keyof IRowRange +) { + if ( + request?.rows?.rowRanges && + request?.rows?.rowRanges[0] && + request?.rows?.rowRanges[0][property]?.toString() + ) { + return request?.rows?.rowRanges[0][property]?.toString(); + } + return undefined; +} + +/** Gets the key from the request object. + * @param stream The stream object to get the key from. + * @param keySelectionParameters The parameters for selecting the key. + * @returns {number} The selected key for generating chunks + */ +function getSelectedKey( + request: protos.google.bigtable.v2.IReadRowsRequest, + keySelectionParameters: { + keyOpenProperty: keyof IRowRange; + keyClosedProperty: keyof IRowRange; + defaultKey?: number; + } +) { + const keyRequestOpen = getKeyValue( + request, + keySelectionParameters.keyOpenProperty + ); + const keyRequestClosed = getKeyValue( + request, + keySelectionParameters.keyClosedProperty + ); + const defaultKey = keySelectionParameters.defaultKey; + return defaultKey === undefined + ? keyRequestClosed === undefined + ? parseInt(keyRequestOpen as string) + 1 + : parseInt(keyRequestClosed as string) + : defaultKey; +} + +/** Generates chunks for rows in a fake table that match the provided RowSet. + * The fake table contains monotonically increasing zero padded rows + * in the range [keyFrom, keyTo). + * @param request The request object to generate chunks from. + * @param serviceParameters The parameters for generating chunks. + * @param debugLog A function that logs debug messages. + * @returns {protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[]} The generated chunks. + */ +function generateChunksFromRequest( + request: protos.google.bigtable.v2.IReadRowsRequest, + serviceParameters: ReadRowsServiceParameters +) { + return generateChunks( + { + keyFrom: getSelectedKey(request, { + keyOpenProperty: 'startKeyOpen', + keyClosedProperty: 'startKeyClosed', + defaultKey: serviceParameters.keyFrom, + }), + keyTo: getSelectedKey(request, { + keyOpenProperty: 'endKeyOpen', + keyClosedProperty: 'endKeyClosed', + defaultKey: serviceParameters.keyTo, + }), + chunkSize: serviceParameters.chunkSize, + valueSize: serviceParameters.valueSize, + }, + serviceParameters.debugLog + ); +} + +/** A class that handles the ReadRows request. + * @param stream The stream object that is passed into the request. 
+ * @param debugLog A function that logs debug messages. + */ +class ReadRowsRequestHandler { + public cancelled: boolean; + public stopWaiting: () => void; + constructor( + readonly stream: ReadRowsWritableStream, + readonly debugLog: DebugLog + ) { + this.cancelled = false; + this.stopWaiting = () => {}; + } - let stopWaiting: () => void = () => {}; - let cancelled = false; + /** Sends the response object to the stream. + * @param response The response object to send. + */ + async sendResponse( + response: protos.google.bigtable.v2.IReadRowsResponse + ): Promise { // an asynchronous function to write a response object to stream, reused several times below. // captures `cancelled` variable - const sendResponse = async ( - response: protos.google.bigtable.v2.IReadRowsResponse - ): Promise => { - return new Promise(resolve => { - setTimeout(async () => { - if (cancelled) { - resolve(); - return; - } - const canSendMore = stream.write(response); - if (response.chunks && response.chunks.length > 0) { - debugLog(`sent ${response.chunks.length} chunks`); - } - if (response.lastScannedRowKey) { - const binaryKey = Buffer.from( - response.lastScannedRowKey as string, - 'base64' - ); - const stringKey = binaryKey.toString(); - debugLog(`sent lastScannedRowKey = ${stringKey}`); - } - if (!canSendMore) { - debugLog('awaiting for back pressure'); - await new Promise(resolve => { - stopWaiting = resolve; - stream.once('drain', resolve); - }); - } + return new Promise(resolve => { + const debugLog = this.debugLog; + const stream = this.stream; + setTimeout(async () => { + if (this.cancelled) { resolve(); - }, 0); - }); + return; + } + const canSendMore = stream.write(response); + if (response.chunks && response.chunks.length > 0) { + debugLog(`sent ${response.chunks.length} chunks`); + } + // Log a message if the lastScannedRowKey is provided. + if (response.lastScannedRowKey) { + const binaryKey = Buffer.from( + response.lastScannedRowKey as string, + 'base64' + ); + const stringKey = binaryKey.toString(); + debugLog(`sent lastScannedRowKey = ${stringKey}`); + } + if (!canSendMore) { + // Before doing any more writing with the stream, drain the stream. + debugLog('awaiting for back pressure'); + await new Promise(resolve => { + this.stopWaiting = resolve; + stream.once('drain', resolve); + }); + } + resolve(); + }, 0); + }); + } +} + +/** Implementation of the server streaming ReadRows call. + * The implementation returns monotonically increasing zero padded rows + * in the range [keyFrom, keyTo) if they are provided. Instances of this object + * are used to store data that needs to be shared between multiple requests. + * For instance, the service ignores the errorAfterChunkNo value after the + * service has already emitted an error. + * + * @param serviceParameters The parameters for the implementation. + */ +export class ReadRowsImpl { + private errorAfterChunkNo?: number; + + /** + * Constructor for creating the ReadRows service. Constructor is private to + * encourage use of createService with the factory pattern to restrict the + * way that the service is created for better encapsulation. 
+ * + * @param serviceParameters The parameters for creating the service + * @private + */ + private constructor(readonly serviceParameters: ReadRowsServiceParameters) { + this.errorAfterChunkNo = serviceParameters.errorAfterChunkNo; + } + + /** + Factory method that returns an implementation of the server handling streaming + ReadRows calls that would return monotonically increasing zero padded rows + in the range [keyFrom, keyTo). The returned implementation can be passed to + gRPC server. + @param serviceParameters The parameters for creating the service + */ + static createService(serviceParameters: ReadRowsServiceParameters) { + return async (stream: ReadRowsWritableStream): Promise => { + await new ReadRowsImpl(serviceParameters).handleRequest(stream); }; + } + /** Handles the ReadRows request. + * @param stream The stream object that is passed into the request. + */ + private async handleRequest(stream: ReadRowsWritableStream) { + const debugLog = this.serviceParameters.debugLog; + prettyPrintRequest(stream.request, debugLog); + const readRowsRequestHandler = new ReadRowsRequestHandler(stream, debugLog); stream.on('cancelled', () => { debugLog('gRPC server received cancel()'); - cancelled = true; - stopWaiting(); + readRowsRequestHandler.cancelled = true; + readRowsRequestHandler.stopWaiting(); stream.emit('error', new Error('Cancelled')); }); + const chunks = generateChunksFromRequest( + stream.request, + this.serviceParameters + ); + await this.sendAllChunks(readRowsRequestHandler, chunks); + } + /** Sends all chunks to the stream. + * @param readRowsRequestHandler The handler for the request. + * @param chunks The chunks to send. + */ + private async sendAllChunks( + readRowsRequestHandler: ReadRowsRequestHandler, + chunks: protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[] + ) { + const stream = readRowsRequestHandler.stream; + const debugLog = readRowsRequestHandler.debugLog; let chunksSent = 0; - const chunks = generateChunks(keyFrom, keyTo); let lastScannedRowKey: string | undefined; let currentResponseChunks: protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[] = []; let chunkIdx = 0; let skipThisRow = false; for (const chunk of chunks) { - if (cancelled) { + if (readRowsRequestHandler.cancelled) { break; } @@ -256,8 +410,9 @@ export function readRowsImpl( ++chunkIdx; } if ( - currentResponseChunks.length === CHUNK_PER_RESPONSE || - chunkIdx === errorAfterChunkNo || + currentResponseChunks.length === + this.serviceParameters.chunksPerResponse || + chunkIdx === this.errorAfterChunkNo || // if we skipped a row and set lastScannedRowKey, dump everything and send a separate message with lastScannedRowKey lastScannedRowKey ) { @@ -265,15 +420,15 @@ export function readRowsImpl( chunks: currentResponseChunks, }; chunksSent += currentResponseChunks.length; - await sendResponse(response); + await readRowsRequestHandler.sendResponse(response); currentResponseChunks = []; - if (chunkIdx === errorAfterChunkNo) { + if (chunkIdx === this.errorAfterChunkNo) { debugLog(`sending error after chunk #${chunkIdx}`); - errorAfterChunkNo = undefined; // do not send error for the second time + this.errorAfterChunkNo = undefined; // do not send error for the second time const error = new GoogleError('Uh oh'); error.code = Status.ABORTED; stream.emit('error', error); - cancelled = true; + readRowsRequestHandler.cancelled = true; break; } } @@ -281,19 +436,19 @@ export function readRowsImpl( const response: protos.google.bigtable.v2.IReadRowsResponse = { lastScannedRowKey, }; - await 
sendResponse(response); + await readRowsRequestHandler.sendResponse(response); lastScannedRowKey = undefined; } } - if (!cancelled && currentResponseChunks.length > 0) { + if (!readRowsRequestHandler.cancelled && currentResponseChunks.length > 0) { const response: protos.google.bigtable.v2.IReadRowsResponse = { chunks: currentResponseChunks, lastScannedRowKey, }; chunksSent += currentResponseChunks.length; - await sendResponse(response); + await readRowsRequestHandler.sendResponse(response); } debugLog(`in total, sent ${chunksSent} chunks`); stream.end(); - }; + } } diff --git a/test/utils/readRowsImpl2.ts b/test/utils/readRowsImpl2.ts deleted file mode 100644 index e8397fd63..000000000 --- a/test/utils/readRowsImpl2.ts +++ /dev/null @@ -1,332 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import {ServerWritableStream} from '@grpc/grpc-js'; -import {protos} from '../../src'; -import {GoogleError, Status} from 'google-gax'; - -const VALUE_SIZE = 1; -// we want each row to be splitted into 2 chunks of different sizes -const CHUNK_SIZE = 1; -const CHUNK_PER_RESPONSE = 1; - -const DEBUG = process.env.BIGTABLE_TEST_DEBUG === 'true'; - -export function debugLog(text: string) { - if (DEBUG) { - console.log(text); - } -} - -// TODO: Simplify this, perhaps use shared code from readRowsImpl. -function prettyPrintRequest( - request: protos.google.bigtable.v2.IReadRowsRequest -) { - // pretty-printing important parts of the request. - // doing it field by field because we want to apply .toString() to all key fields - debugLog('received request: {'); - debugLog(` tableName: "${request.tableName}",`); - if (request.rows) { - debugLog(' rows: {'); - if (request.rows.rowKeys) { - debugLog(' rowKeys: ['); - for (const key of request.rows.rowKeys) { - debugLog(` "${key.toString()}",`); - } - debugLog(' ],'); - } - if (request.rows.rowRanges) { - debugLog(' rowRanges: ['); - for (const range of request.rows.rowRanges) { - debugLog(' {'); - if (range.startKeyOpen) { - debugLog(` startKeyOpen: "${range.startKeyOpen.toString()}",`); - } - if (range.startKeyClosed) { - debugLog( - ` startKeyClosed: "${range.startKeyClosed.toString()}",` - ); - } - if (range.endKeyOpen) { - debugLog(` endKeyOpen: "${range.endKeyOpen.toString()}",`); - } - if (range.endKeyClosed) { - debugLog(` endKeyClosed: "${range.endKeyClosed.toString()}",`); - } - debugLog(' },'); - } - debugLog(' ],'); - } - debugLog(' },'); - } - debugLog('}'); -} - -/** Generates chunks for rows in a fake table that match the provided RowSet. - * The fake table contains monotonically increasing zero padded rows - * in the range [keyFrom, keyTo). 
- */ -function generateChunks(keyFrom: number, keyTo: number) { - debugLog(`generating chunks from ${keyFrom} to ${keyTo}`); - - const chunks: protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[] = []; - for (let key = keyFrom; key < keyTo; ++key) { - // the keys must be increasing, but we also want to keep them readable, - // so we'll use keys 00000000, 00000001, 00000002, etc. stored as Buffers - const binaryKey = Buffer.from(key.toString().padStart(8, '0')); - debugLog(`generating chunks for ${key}`); - const rowKey = binaryKey.toString('base64'); - let remainingBytes = VALUE_SIZE; - let chunkCounter = 0; - while (remainingBytes > 0) { - debugLog(` remaining bytes: ${remainingBytes}`); - const chunk: protos.google.bigtable.v2.ReadRowsResponse.ICellChunk = {}; - if (chunkCounter === 0) { - chunk.rowKey = rowKey; - chunk.familyName = { - value: 'family', - }; - chunk.qualifier = { - value: Buffer.from('qualifier').toString('base64'), - }; - } - const thisChunkSize = Math.min(CHUNK_SIZE, remainingBytes); - remainingBytes -= thisChunkSize; - const value = Buffer.from('a'.repeat(remainingBytes)).toString('base64'); - chunk.value = value; - if (remainingBytes === 0) { - debugLog(` setting commit flag for rowKey ${key}`); - chunk.commitRow = true; - } - chunks.push(chunk); - ++chunkCounter; - } - } - debugLog(`generated ${chunks.length} chunks between ${keyFrom} and ${keyTo}`); - return chunks; -} - -function isKeyInRowSet( - stringKey: string, - rowSet?: protos.google.bigtable.v2.IRowSet | null -): boolean { - if (!rowSet) { - return true; - } - // primitive support for row ranges - if (rowSet.rowRanges || rowSet.rowKeys) { - for (const requestKey of rowSet.rowKeys ?? []) { - if (stringKey === requestKey.toString()) { - return true; - } - } - for (const range of rowSet.rowRanges ?? []) { - let startOk = true; - let endOk = true; - if (range.startKeyOpen && range.startKeyOpen.toString() >= stringKey) { - startOk = false; - } - if (range.startKeyClosed && range.startKeyClosed.toString() > stringKey) { - startOk = false; - } - if (range.endKeyOpen && range.endKeyOpen.toString() <= stringKey) { - endOk = false; - } - if (range.endKeyClosed && range.endKeyClosed.toString() < stringKey) { - endOk = false; - } - if (startOk && endOk) { - return true; - } - } - return false; - } - return true; -} - -// Returns an implementation of the server streaming ReadRows call that would return -// monotonically increasing zero padded rows in the range [keyFrom, keyTo). -// The returned implementation can be passed to gRPC server. -// TODO: Remove optional keyFrom, keyTo from the server. No test uses them. Remove them from this test as well. -// TODO: Address the excessive number of if statements. -// TODO: Perhaps group the if statements into classes so that they can be unit tested. -export function readRowsImpl2( - keyFrom?: number, - keyTo?: number, - errorAfterChunkNo?: number -): ( - stream: ServerWritableStream< - protos.google.bigtable.v2.IReadRowsRequest, - protos.google.bigtable.v2.IReadRowsResponse - > -) => Promise { - return async ( - stream: ServerWritableStream< - protos.google.bigtable.v2.IReadRowsRequest, - protos.google.bigtable.v2.IReadRowsResponse - > - ): Promise => { - console.log('Server received request'); - prettyPrintRequest(stream.request); - - let stopWaiting: () => void = () => {}; - // an asynchronous function to write a response object to stream, reused several times below. 
- // captures `cancelled` variable - const sendResponse = async ( - response: protos.google.bigtable.v2.IReadRowsResponse - ): Promise => { - return new Promise(resolve => { - setTimeout(async () => { - const canSendMore = stream.write(response); - if (response.chunks && response.chunks.length > 0) { - debugLog(`sent ${response.chunks.length} chunks`); - } - if (response.lastScannedRowKey) { - const binaryKey = Buffer.from( - response.lastScannedRowKey as string, - 'base64' - ); - const stringKey = binaryKey.toString(); - debugLog(`sent lastScannedRowKey = ${stringKey}`); - } - if (!canSendMore) { - debugLog('awaiting for back pressure'); - await new Promise(resolve => { - stopWaiting = resolve; - stream.once('drain', resolve); - }); - } - resolve(); - }, 0); - }); - }; - - let chunksSent = 0; - let keyFromRequestClosed: any; - if ( - stream?.request?.rows?.rowRanges && - stream?.request?.rows?.rowRanges[0] && - stream?.request?.rows?.rowRanges[0]?.startKeyClosed?.toString() - ) { - keyFromRequestClosed = - stream?.request?.rows?.rowRanges[0]?.startKeyClosed?.toString(); - } - let keyFromRequestOpen: any; - if ( - stream?.request?.rows?.rowRanges && - stream?.request?.rows?.rowRanges[0] && - stream?.request?.rows?.rowRanges[0]?.startKeyOpen?.toString() - ) { - keyFromRequestOpen = - stream?.request?.rows?.rowRanges[0]?.startKeyOpen?.toString(); - } - let keyToRequestClosed: any; - if ( - stream?.request?.rows?.rowRanges && - stream?.request?.rows?.rowRanges[0] && - stream?.request?.rows?.rowRanges[0]?.endKeyClosed?.toString() - ) { - keyToRequestClosed = - stream?.request?.rows?.rowRanges[0]?.endKeyClosed?.toString(); - } - let keyToRequestOpen; - if ( - stream?.request?.rows?.rowRanges && - stream?.request?.rows?.rowRanges[0] && - stream?.request?.rows?.rowRanges[0]?.endKeyOpen?.toString() - ) { - keyToRequestOpen = - stream?.request?.rows?.rowRanges[0]?.endKeyOpen?.toString(); - } - const keyFromUsed = keyFrom - ? keyFrom - : keyFromRequestClosed - ? parseInt(keyFromRequestClosed as string) - : parseInt(keyFromRequestOpen as string) + 1; - const keyToUsed = keyTo - ? keyTo - : keyToRequestClosed - ? 
parseInt(keyToRequestClosed as string) - : parseInt(keyToRequestOpen as string) + 1; - const chunks = generateChunks(keyFromUsed, keyToUsed); - let lastScannedRowKey: string | undefined; - let currentResponseChunks: protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[] = - []; - let chunkIdx = 0; - let skipThisRow = false; - for (const chunk of chunks) { - if (chunk.rowKey) { - const binaryKey = Buffer.from(chunk.rowKey as string, 'base64'); - const stringKey = binaryKey.toString(); - - debugLog(`starting row with key ${stringKey}`); - if (isKeyInRowSet(stringKey, stream.request.rows)) { - skipThisRow = false; - } else { - debugLog( - `skipping row with key ${stringKey} because it's out of requested range or keys` - ); - skipThisRow = true; - lastScannedRowKey = chunk.rowKey as string; - } - } - - if (chunk.commitRow) { - debugLog('commit row'); - } - - if (!skipThisRow) { - currentResponseChunks.push(chunk); - ++chunkIdx; - } - if ( - currentResponseChunks.length === CHUNK_PER_RESPONSE || - chunkIdx === errorAfterChunkNo || - // if we skipped a row and set lastScannedRowKey, dump everything and send a separate message with lastScannedRowKey - lastScannedRowKey - ) { - const response: protos.google.bigtable.v2.IReadRowsResponse = { - chunks: currentResponseChunks, - }; - chunksSent += currentResponseChunks.length; - await sendResponse(response); - currentResponseChunks = []; - if (chunkIdx === errorAfterChunkNo) { - debugLog(`sending error after chunk #${chunkIdx}`); - errorAfterChunkNo = undefined; // do not send error for the second time - const error = new GoogleError('Uh oh'); - error.code = Status.ABORTED; - stream.emit('error', error); - } - } - if (lastScannedRowKey) { - const response: protos.google.bigtable.v2.IReadRowsResponse = { - lastScannedRowKey, - }; - await sendResponse(response); - lastScannedRowKey = undefined; - } - } - if (currentResponseChunks.length > 0) { - const response: protos.google.bigtable.v2.IReadRowsResponse = { - chunks: currentResponseChunks, - lastScannedRowKey, - }; - chunksSent += currentResponseChunks.length; - await sendResponse(response); - } - debugLog(`in total, sent ${chunksSent} chunks`); - stream.end(); - }; -} diff --git a/test/utils/readRowsServiceParameters.ts b/test/utils/readRowsServiceParameters.ts new file mode 100644 index 000000000..bf9b2c076 --- /dev/null +++ b/test/utils/readRowsServiceParameters.ts @@ -0,0 +1,45 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {ServerWritableStream} from '@grpc/grpc-js'; +import {protos} from '../../src'; + +/** + * This file contains the parameters for the readRowsService. 
+ */
+
+interface SharedReadRowsParameters {
+  chunkSize: number; // The size of each chunk that the server pushes back
+  valueSize: number; // The total number of value bytes generated per row, split across chunks
+}
+
+export type DebugLog = (message: string) => void;
+
+export interface ReadRowsServiceParameters extends SharedReadRowsParameters {
+  keyFrom?: number; // The first row key the service generates data for; derived from the request if omitted
+  keyTo?: number; // The row key generation stops before (exclusive); derived from the request if omitted
+  errorAfterChunkNo?: number; // The chunk index after which the server emits an error
+  chunksPerResponse: number; // The maximum number of chunks sent in a single ReadRowsResponse
+  debugLog: DebugLog;
+}
+
+export interface ChunkGeneratorParameters extends SharedReadRowsParameters {
+  keyFrom: number; // The first row in the generated chunks will start with this key
+  keyTo: number; // Row generation stops before this key (exclusive upper bound)
+}
+
+export type ReadRowsWritableStream = ServerWritableStream<
+  protos.google.bigtable.v2.IReadRowsRequest,
+  protos.google.bigtable.v2.IReadRowsResponse
+>;
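
Usage sketch: the refactored mock service above is configured entirely through a
ReadRowsServiceParameters object and created with the ReadRowsImpl.createService factory.
The sketch below shows how a test could exercise a non-standard parameter set. It assumes
the fixtures already present in test/readrows.ts (service, table, debugLog, assert and
ServerImplementationInterface); the test name, the SMALL_ROWS_WITH_ERROR constant and its
numbers are made up for illustration.

// A hypothetical parameter set: tiny rows, one chunk per row, with a single
// server error injected after the 50th chunk.
const SMALL_ROWS_WITH_ERROR: ReadRowsServiceParameters = {
  keyFrom: 0,
  keyTo: 100,
  valueSize: 1, // one byte of value per row
  chunkSize: 1, // so every row fits in a single chunk
  chunksPerResponse: 1, // one chunk per ReadRowsResponse message
  errorAfterChunkNo: 50, // the service aborts once, then serves the retry without errors
  debugLog,
};

it('should read all rows despite an injected error', done => {
  service.setService({
    ReadRows: ReadRowsImpl.createService(
      SMALL_ROWS_WITH_ERROR
    ) as ServerImplementationInterface,
  });
  let receivedRowCount = 0;
  const readStream = table.createReadStream();
  readStream.on('error', done);
  readStream.on('data', () => ++receivedRowCount);
  readStream.on('end', () => {
    assert.strictEqual(receivedRowCount, 100);
    done();
  });
});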
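
A note on the backpressure handling in ReadRowsRequestHandler.sendResponse: it follows the
standard Node.js Writable contract, in which write() returning false means the destination's
internal buffer is full and the writer should pause until a 'drain' event fires. A minimal,
self-contained sketch of that idiom (the writeAll helper name is made up for illustration):

import {Writable} from 'stream';

// Writes items one at a time, pausing whenever the destination signals backpressure.
async function writeAll(dest: Writable, items: string[]): Promise<void> {
  for (const item of items) {
    const canSendMore = dest.write(item);
    if (!canSendMore) {
      // The buffer is full; wait for 'drain' before writing the next item.
      await new Promise<void>(resolve => dest.once('drain', resolve));
    }
  }
  dest.end();
}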