From 52a6711f8e8a3ba4a6d242ce3cf2c6cf9585c727 Mon Sep 17 00:00:00 2001 From: Alexander Fenster Date: Mon, 22 May 2023 16:15:40 -0700 Subject: [PATCH] test: test ReadRows logic with local gRPC server (#1282) * test: test ReadRows logic with local gRPC server * test: PR feedback * test: fix race condition in initialization * test: PR feedback, renaming a variable for readability * test: add test for asynchronous end() call * test: only set lastScannedRowKey for completed rows * test: refactor, fix lastScannedRowKey logic, PR feedback * test: rename test suite --------- Co-authored-by: danieljbruce --- test/readrows.ts | 330 +++++++++++++++++++++++++++++++++++++ test/utils/readRowsImpl.ts | 299 +++++++++++++++++++++++++++++++++ 2 files changed, 629 insertions(+) create mode 100644 test/readrows.ts create mode 100644 test/utils/readRowsImpl.ts diff --git a/test/readrows.ts b/test/readrows.ts new file mode 100644 index 000000000..3da2cb9cf --- /dev/null +++ b/test/readrows.ts @@ -0,0 +1,330 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {before, describe, it} from 'mocha'; +import {Bigtable, Row, Table} from '../src'; +import * as assert from 'assert'; +import {Transform, PassThrough, pipeline} from 'stream'; + +import {GoogleError} from 'google-gax'; +import {MockServer} from '../src/util/mock-servers/mock-server'; +import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service'; +import {MockService} from '../src/util/mock-servers/mock-service'; +import {debugLog, readRowsImpl} from './utils/readRowsImpl'; + +describe('Bigtable/ReadRows', () => { + let server: MockServer; + let service: MockService; + let bigtable: Bigtable; + let table: Table; + + before(async () => { + // make sure we have everything initialized before starting tests + const port = await new Promise(resolve => { + server = new MockServer(resolve); + }); + bigtable = new Bigtable({ + apiEndpoint: `localhost:${port}`, + }); + table = bigtable.instance('fake-instance').table('fake-table'); + service = new BigtableClientMockService(server); + }); + + it('should create read stream and read synchronously', done => { + // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure + const keyFrom = 0; + const keyTo = 1000; + + service.setService({ + ReadRows: readRowsImpl(keyFrom, keyTo), + }); + + let receivedRowCount = 0; + let lastKeyReceived: number | undefined; + + const readStream = table.createReadStream(); + readStream.on('error', (err: GoogleError) => { + done(err); + }); + readStream.on('data', (row: Row) => { + ++receivedRowCount; + const key = parseInt(row.id); + if (lastKeyReceived && key <= lastKeyReceived) { + done(new Error('Test error: keys are not in order')); + } + lastKeyReceived = key; + debugLog(`received row key ${key}`); + }); + readStream.on('end', () => { + assert.strictEqual(receivedRowCount, keyTo - keyFrom); + assert.strictEqual(lastKeyReceived, keyTo - 1); + 
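+      // Totals are checked only once the stream ends: the mock serves one row for
+      // every key in [keyFrom, keyTo), so each row should have been emitted exactly
+      // once and the last (largest) key observed should be keyTo - 1.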
done(); + }); + }); + + it('should create read stream and read synchronously using Transform stream', done => { + // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure + const keyFrom = 0; + const keyTo = 1000; + + service.setService({ + ReadRows: readRowsImpl(keyFrom, keyTo), + }); + + let receivedRowCount = 0; + let lastKeyReceived: number | undefined; + + // BigTable stream + const readStream = table.createReadStream(); + + // Transform stream + const transform = new Transform({ + objectMode: true, + transform: (row, _encoding, callback) => { + callback(null, row); + }, + }); + + // Final stream + const passThrough = new PassThrough({ + objectMode: true, + }); + + passThrough.on('error', (err: GoogleError) => { + done(err); + }); + passThrough.on('data', (row: Row) => { + ++receivedRowCount; + const key = parseInt(row.id); + if (lastKeyReceived && key <= lastKeyReceived) { + done(new Error('Test error: keys are not in order')); + } + lastKeyReceived = key; + debugLog(`received row key ${key}`); + }); + passThrough.on('end', () => { + assert.strictEqual(receivedRowCount, keyTo - keyFrom); + assert.strictEqual(lastKeyReceived, keyTo - 1); + done(); + }); + + pipeline(readStream, transform, passThrough, () => {}); + }); + + // TODO(@alexander-fenster): enable after https://github.com/googleapis/nodejs-bigtable/issues/607 is fixed + it.skip('should create read stream and read asynchronously using Transform stream', function (done) { + if (process.platform === 'win32') { + this.timeout(60000); // it runs much slower on Windows! + } + + // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure + const keyFrom = 0; + const keyTo = 1000; + + service.setService({ + ReadRows: readRowsImpl(keyFrom, keyTo), + }); + + let receivedRowCount = 0; + let lastKeyReceived: number | undefined; + + // BigTable stream + const readStream = table.createReadStream(); + + // Transform stream + const transform = new Transform({ + objectMode: true, + transform: (row, _encoding, callback) => { + setTimeout(() => { + callback(null, row); + }, 0); + }, + }); + + // Final stream + const passThrough = new PassThrough({ + objectMode: true, + }); + + passThrough.on('error', (err: GoogleError) => { + done(err); + }); + passThrough.on('data', (row: Row) => { + ++receivedRowCount; + const key = parseInt(row.id); + if (lastKeyReceived && key <= lastKeyReceived) { + done(new Error('Test error: keys are not in order')); + } + lastKeyReceived = key; + debugLog(`received row key ${key}`); + }); + passThrough.on('end', () => { + assert.strictEqual(receivedRowCount, keyTo - keyFrom); + assert.strictEqual(lastKeyReceived, keyTo - 1); + done(); + }); + + pipeline(readStream, transform, passThrough, () => {}); + }); + + it('should be able to stop reading from the read stream', done => { + // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure + const keyFrom = 0; + const keyTo = 1000; + // pick any key to stop after + const stopAfter = 42; + + service.setService({ + ReadRows: readRowsImpl(keyFrom, keyTo), + }); + + let receivedRowCount = 0; + let lastKeyReceived: number | undefined; + + const readStream = table.createReadStream({ + // workaround for https://github.com/grpc/grpc-node/issues/2446, remove when fixed + gaxOptions: { + timeout: 3000, + }, + }); + readStream.on('error', (err: GoogleError) => { + done(err); + }); + readStream.on('data', (row: Row) => { + ++receivedRowCount; + const key = parseInt(row.id); 
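+      // Row ids produced by the mock server are zero-padded decimal keys
+      // (see generateChunks in test/utils/readRowsImpl.ts), so parseInt
+      // recovers the numeric key for the ordering check below.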
+ if (lastKeyReceived && key <= lastKeyReceived) { + done(new Error('Test error: keys are not in order')); + } + lastKeyReceived = key; + debugLog(`received row key ${key}`); + + if (receivedRowCount === stopAfter) { + debugLog(`requesting to stop after receiving key ${key}`); + readStream.end(); + } + }); + readStream.on('end', () => { + assert.strictEqual(receivedRowCount, stopAfter); + assert.strictEqual(lastKeyReceived, stopAfter - 1); + done(); + }); + }); + + // TODO(@alexander-fenster): enable after it's fixed + it.skip('should be able to stop reading from the read stream when reading asynchronously', function (done) { + if (process.platform === 'win32') { + this.timeout(60000); // it runs much slower on Windows! + } + + // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure + const keyFrom = 0; + const keyTo = 1000; + // pick any key to stop after + const stopAfter = 420; + + service.setService({ + ReadRows: readRowsImpl(keyFrom, keyTo), + }); + + let receivedRowCount = 0; + let lastKeyReceived: number | undefined; + + // BigTable stream + const readStream = table.createReadStream({ + // workaround for https://github.com/grpc/grpc-node/issues/2446, remove when fixed + gaxOptions: { + timeout: 3000, + }, + }); + + // Transform stream + const transform = new Transform({ + objectMode: true, + transform: (row, _encoding, callback) => { + setTimeout(() => { + callback(null, row); + }, 0); + }, + }); + + // Final stream + const passThrough = new PassThrough({ + objectMode: true, + }); + + passThrough.on('error', (err: GoogleError) => { + done(err); + }); + passThrough.on('data', (row: Row) => { + ++receivedRowCount; + const key = parseInt(row.id); + if (lastKeyReceived && key <= lastKeyReceived) { + done(new Error('Test error: keys are not in order')); + } + lastKeyReceived = key; + debugLog(`received row key ${key}`); + + if (receivedRowCount === stopAfter) { + debugLog(`requesting to stop after receiving key ${key}`); + readStream.end(); + } + }); + passThrough.on('end', () => { + assert.strictEqual(receivedRowCount, stopAfter); + assert.strictEqual(lastKeyReceived, stopAfter - 1); + done(); + }); + + pipeline(readStream, transform, passThrough, () => {}); + }); + + it('should silently resume after server or network error', done => { + // 1000 rows must be enough to reproduce issues with losing the data and to create backpressure + const keyFrom = 0; + const keyTo = 1000; + // the server will error after sending this chunk (not row) + const errorAfterChunkNo = 423; + + service.setService({ + ReadRows: readRowsImpl(keyFrom, keyTo, errorAfterChunkNo), + }); + + let receivedRowCount = 0; + let lastKeyReceived: number | undefined; + + const readStream = table.createReadStream(); + readStream.on('error', (err: GoogleError) => { + done(err); + }); + readStream.on('data', (row: Row) => { + ++receivedRowCount; + const key = parseInt(row.id); + if (lastKeyReceived && key <= lastKeyReceived) { + done(new Error('Test error: keys are not in order')); + } + lastKeyReceived = key; + debugLog(`received row key ${key}`); + }); + readStream.on('end', () => { + assert.strictEqual(receivedRowCount, keyTo - keyFrom); + assert.strictEqual(lastKeyReceived, keyTo - 1); + done(); + }); + }); + + after(async () => { + server.shutdown(() => {}); + }); +}); diff --git a/test/utils/readRowsImpl.ts b/test/utils/readRowsImpl.ts new file mode 100644 index 000000000..3f11dce23 --- /dev/null +++ b/test/utils/readRowsImpl.ts @@ -0,0 +1,299 @@ +// Copyright 2023 Google LLC +// 
+// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {ServerWritableStream} from '@grpc/grpc-js'; +import {protos} from '../../src'; +import {GoogleError, Status} from 'google-gax'; + +const VALUE_SIZE = 1024 * 1024; +// we want each row to be splitted into 2 chunks of different sizes +const CHUNK_SIZE = 1023 * 1024 - 1; +const CHUNK_PER_RESPONSE = 10; + +const DEBUG = process.env.BIGTABLE_TEST_DEBUG === 'true'; + +export function debugLog(text: string) { + if (DEBUG) { + console.log(text); + } +} + +function prettyPrintRequest( + request: protos.google.bigtable.v2.IReadRowsRequest +) { + // pretty-printing important parts of the request. + // doing it field by field because we want to apply .toString() to all key fields + debugLog('received request: {'); + debugLog(` tableName: "${request.tableName}",`); + if (request.rows) { + debugLog(' rows: {'); + if (request.rows.rowKeys) { + debugLog(' rowKeys: ['); + for (const key of request.rows.rowKeys) { + debugLog(` "${key.toString()}",`); + } + debugLog(' ],'); + } + if (request.rows.rowRanges) { + debugLog(' rowRanges: ['); + for (const range of request.rows.rowRanges) { + debugLog(' {'); + if (range.startKeyOpen) { + debugLog(` startKeyOpen: "${range.startKeyOpen.toString()}",`); + } + if (range.startKeyClosed) { + debugLog( + ` startKeyClosed: "${range.startKeyClosed.toString()}",` + ); + } + if (range.endKeyOpen) { + debugLog(` endKeyOpen: "${range.endKeyOpen.toString()}",`); + } + if (range.endKeyClosed) { + debugLog(` endKeyClosed: "${range.endKeyClosed.toString()}",`); + } + debugLog(' },'); + } + debugLog(' ],'); + } + debugLog(' },'); + } + debugLog('}'); +} + +/** Generates chunks for rows in a fake table that match the provided RowSet. + * The fake table contains monotonically increasing zero padded rows + * in the range [keyFrom, keyTo). + */ +function generateChunks(keyFrom: number, keyTo: number) { + debugLog(`generating chunks from ${keyFrom} to ${keyTo}`); + + const chunks: protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[] = []; + for (let key = keyFrom; key < keyTo; ++key) { + // the keys must be increasing, but we also want to keep them readable, + // so we'll use keys 00000000, 00000001, 00000002, etc. 
stored as Buffers + const binaryKey = Buffer.from(key.toString().padStart(8, '0')); + debugLog(`generating chunks for ${key}`); + const rowKey = binaryKey.toString('base64'); + let remainingBytes = VALUE_SIZE; + let chunkCounter = 0; + while (remainingBytes > 0) { + debugLog(` remaining bytes: ${remainingBytes}`); + const chunk: protos.google.bigtable.v2.ReadRowsResponse.ICellChunk = {}; + if (chunkCounter === 0) { + chunk.rowKey = rowKey; + chunk.familyName = { + value: 'family', + }; + chunk.qualifier = { + value: Buffer.from('qualifier').toString('base64'), + }; + } + const thisChunkSize = Math.min(CHUNK_SIZE, remainingBytes); + remainingBytes -= thisChunkSize; + const value = Buffer.from('a'.repeat(remainingBytes)).toString('base64'); + chunk.value = value; + if (remainingBytes === 0) { + debugLog(` setting commit flag for rowKey ${key}`); + chunk.commitRow = true; + } + chunks.push(chunk); + ++chunkCounter; + } + } + debugLog(`generated ${chunks.length} chunks between ${keyFrom} and ${keyTo}`); + return chunks; +} + +function isKeyInRowSet( + stringKey: string, + rowSet?: protos.google.bigtable.v2.IRowSet | null +): boolean { + if (!rowSet) { + return true; + } + // primitive support for row ranges + if (rowSet.rowRanges || rowSet.rowKeys) { + for (const requestKey of rowSet.rowKeys ?? []) { + if (stringKey === requestKey.toString()) { + return true; + } + } + for (const range of rowSet.rowRanges ?? []) { + let startOk = true; + let endOk = true; + if (range.startKeyOpen && range.startKeyOpen.toString() >= stringKey) { + startOk = false; + } + if (range.startKeyClosed && range.startKeyClosed.toString() > stringKey) { + startOk = false; + } + if (range.endKeyOpen && range.endKeyOpen.toString() <= stringKey) { + endOk = false; + } + if (range.endKeyClosed && range.endKeyClosed.toString() < stringKey) { + endOk = false; + } + if (startOk && endOk) { + return true; + } + } + return false; + } + return true; +} + +// Returns an implementation of the server streaming ReadRows call that would return +// monotonically increasing zero padded rows in the range [keyFrom, keyTo). +// The returned implementation can be passed to gRPC server. +export function readRowsImpl( + keyFrom: number, + keyTo: number, + errorAfterChunkNo?: number +): ( + stream: ServerWritableStream< + protos.google.bigtable.v2.IReadRowsRequest, + protos.google.bigtable.v2.IReadRowsResponse + > +) => Promise { + return async ( + stream: ServerWritableStream< + protos.google.bigtable.v2.IReadRowsRequest, + protos.google.bigtable.v2.IReadRowsResponse + > + ): Promise => { + prettyPrintRequest(stream.request); + + let stopWaiting: () => void = () => {}; + let cancelled = false; + // an asynchronous function to write a response object to stream, reused several times below. 
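+    // The write is deferred with setTimeout, and if stream.write() reports
+    // backpressure (returns false) the promise does not resolve until the
+    // 'drain' event fires or stopWaiting() is invoked on cancellation.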
+ // captures `cancelled` variable + const sendResponse = async ( + response: protos.google.bigtable.v2.IReadRowsResponse + ): Promise => { + return new Promise(resolve => { + setTimeout(async () => { + if (cancelled) { + resolve(); + return; + } + const canSendMore = stream.write(response); + if (response.chunks && response.chunks.length > 0) { + debugLog(`sent ${response.chunks.length} chunks`); + } + if (response.lastScannedRowKey) { + const binaryKey = Buffer.from( + response.lastScannedRowKey as string, + 'base64' + ); + const stringKey = binaryKey.toString(); + debugLog(`sent lastScannedRowKey = ${stringKey}`); + } + if (!canSendMore) { + debugLog('awaiting for back pressure'); + await new Promise(resolve => { + stopWaiting = resolve; + stream.once('drain', resolve); + }); + } + resolve(); + }, 0); + }); + }; + + stream.on('cancelled', () => { + debugLog('gRPC server received cancel()'); + cancelled = true; + stopWaiting(); + stream.emit('error', new Error('Cancelled')); + }); + + let chunksSent = 0; + const chunks = generateChunks(keyFrom, keyTo); + let lastScannedRowKey: string | undefined; + let currentResponseChunks: protos.google.bigtable.v2.ReadRowsResponse.ICellChunk[] = + []; + let chunkIdx = 0; + let skipThisRow = false; + for (const chunk of chunks) { + if (cancelled) { + break; + } + + if (chunk.rowKey) { + const binaryKey = Buffer.from(chunk.rowKey as string, 'base64'); + const stringKey = binaryKey.toString(); + + debugLog(`starting row with key ${stringKey}`); + if (isKeyInRowSet(stringKey, stream.request.rows)) { + skipThisRow = false; + } else { + debugLog( + `skipping row with key ${stringKey} because it's out of requested range or keys` + ); + skipThisRow = true; + lastScannedRowKey = chunk.rowKey as string; + } + } + + if (chunk.commitRow) { + debugLog('commit row'); + } + + if (!skipThisRow) { + currentResponseChunks.push(chunk); + ++chunkIdx; + } + if ( + currentResponseChunks.length === CHUNK_PER_RESPONSE || + chunkIdx === errorAfterChunkNo || + // if we skipped a row and set lastScannedRowKey, dump everything and send a separate message with lastScannedRowKey + lastScannedRowKey + ) { + const response: protos.google.bigtable.v2.IReadRowsResponse = { + chunks: currentResponseChunks, + }; + chunksSent += currentResponseChunks.length; + await sendResponse(response); + currentResponseChunks = []; + if (chunkIdx === errorAfterChunkNo) { + debugLog(`sending error after chunk #${chunkIdx}`); + errorAfterChunkNo = undefined; // do not send error for the second time + const error = new GoogleError('Uh oh'); + error.code = Status.ABORTED; + stream.destroy(error); + cancelled = true; + break; + } + } + if (lastScannedRowKey) { + const response: protos.google.bigtable.v2.IReadRowsResponse = { + lastScannedRowKey, + }; + await sendResponse(response); + lastScannedRowKey = undefined; + } + } + if (!cancelled && currentResponseChunks.length > 0) { + const response: protos.google.bigtable.v2.IReadRowsResponse = { + chunks: currentResponseChunks, + lastScannedRowKey, + }; + chunksSent += currentResponseChunks.length; + await sendResponse(response); + } + debugLog(`in total, sent ${chunksSent} chunks`); + stream.end(); + }; +}
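Not applied in this patch, but a natural follow-up: the mock's isKeyInRowSet already handles row ranges, so the same suite could also cover a range-restricted read. A minimal sketch is below, intended to slot into the describe block above and reusing its imports, table and service setup; it assumes createReadStream accepts the usual `ranges` option with plain string bounds, and the key range and expected row count are illustrative only.

  it('should read only the requested key range', done => {
    const keyFrom = 0;
    const keyTo = 1000;

    service.setService({
      ReadRows: readRowsImpl(keyFrom, keyTo),
    });

    // the mock server zero-pads keys to 8 digits, so plain string bounds line up
    const start = '00000100';
    const end = '00000199';
    let receivedRowCount = 0;

    const readStream = table.createReadStream({ranges: [{start, end}]});
    readStream.on('error', (err: GoogleError) => {
      done(err);
    });
    readStream.on('data', (row: Row) => {
      ++receivedRowCount;
      assert.ok(row.id >= start && row.id <= end);
    });
    readStream.on('end', () => {
      // assuming plain string bounds are inclusive on both ends, this range covers 100 rows
      assert.strictEqual(receivedRowCount, 100);
      done();
    });
  });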