Skip to content

Commit

Permalink
Merge pull request #1 from googleapis/main
Browse files Browse the repository at this point in the history
test: Make the ReadRows service in tests more modular (googleapis#1462)
  • Loading branch information
kevkim-codes authored Sep 27, 2024
2 parents 433a8e3 + cf6906b commit b3663e8
Show file tree
Hide file tree
Showing 4 changed files with 409 additions and 507 deletions.
216 changes: 125 additions & 91 deletions test/readrows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,50 @@
// limitations under the License.

import {before, describe, it} from 'mocha';
import {Bigtable, protos, Row, Table} from '../src';
import {Bigtable, Row, Table} from '../src';
import * as assert from 'assert';
import {Transform, PassThrough, pipeline} from 'stream';

import {GoogleError} from 'google-gax';
import {MockServer} from '../src/util/mock-servers/mock-server';
import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service';
import {MockService} from '../src/util/mock-servers/mock-service';
import {debugLog, readRowsImpl} from './utils/readRowsImpl';
import {ServerWritableStream, UntypedHandleCall} from '@grpc/grpc-js';
import {readRowsImpl2} from './utils/readRowsImpl2';
import {ReadRowsImpl} from './utils/readRowsImpl';

import {
ReadRowsServiceParameters,
ReadRowsWritableStream,
} from '../test/utils/readRowsServiceParameters';
import * as mocha from 'mocha';

const DEBUG = process.env.BIGTABLE_TEST_DEBUG === 'true';

function debugLog(text: string) {
if (DEBUG) {
console.log(text);
}
}

// Define parameters for a standard Bigtable Mock service
const VALUE_SIZE = 1024 * 1024;
// we want each row to be split into 2 chunks of different sizes
const CHUNK_SIZE = 1023 * 1024 - 1;
const CHUNKS_PER_RESPONSE = 10;
const STANDARD_KEY_FROM = 0;
// 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
const STANDARD_KEY_TO = 1000;
const STANDARD_SERVICE_WITHOUT_ERRORS: ReadRowsServiceParameters = {
keyFrom: STANDARD_KEY_FROM,
keyTo: STANDARD_KEY_TO,
valueSize: VALUE_SIZE,
chunkSize: CHUNK_SIZE,
chunksPerResponse: CHUNKS_PER_RESPONSE,
debugLog,
};

type PromiseVoid = Promise<void>;
interface ServerImplementationInterface {
(
server: ServerWritableStream<
protos.google.bigtable.v2.IReadRowsRequest,
protos.google.bigtable.v2.IReadRowsResponse
>
): PromiseVoid;
(server: ReadRowsWritableStream): PromiseVoid;
}

describe('Bigtable/ReadRows', () => {
Expand All @@ -53,15 +77,21 @@ describe('Bigtable/ReadRows', () => {
service = new BigtableClientMockService(server);
});

it('should create read stream and read synchronously', function (done) {
this.timeout(60000);
// helper function because some tests run slower
// on Windows and need a longer timeout
function setWindowsTestTimeout(test: mocha.Context) {
if (process.platform === 'win32') {
test.timeout(60000); // it runs much slower on Windows!
}
}

// 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
const keyFrom = 0;
const keyTo = 1000;
it('should create read stream and read synchronously', function (done) {
setWindowsTestTimeout(this);

service.setService({
ReadRows: readRowsImpl(keyFrom, keyTo) as any,
ReadRows: ReadRowsImpl.createService(
STANDARD_SERVICE_WITHOUT_ERRORS
) as ServerImplementationInterface,
});

let receivedRowCount = 0;
Expand All @@ -81,19 +111,17 @@ describe('Bigtable/ReadRows', () => {
debugLog(`received row key ${key}`);
});
readStream.on('end', () => {
assert.strictEqual(receivedRowCount, keyTo - keyFrom);
assert.strictEqual(lastKeyReceived, keyTo - 1);
assert.strictEqual(receivedRowCount, STANDARD_KEY_TO - STANDARD_KEY_FROM);
assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1);
done();
});
});

it('should create read stream and read synchronously using Transform stream', done => {
// 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
const keyFrom = 0;
const keyTo = 1000;

service.setService({
ReadRows: readRowsImpl(keyFrom, keyTo) as any,
ReadRows: ReadRowsImpl.createService(
STANDARD_SERVICE_WITHOUT_ERRORS
) as ServerImplementationInterface,
});

let receivedRowCount = 0;
Expand Down Expand Up @@ -128,25 +156,20 @@ describe('Bigtable/ReadRows', () => {
debugLog(`received row key ${key}`);
});
passThrough.on('end', () => {
assert.strictEqual(receivedRowCount, keyTo - keyFrom);
assert.strictEqual(lastKeyReceived, keyTo - 1);
assert.strictEqual(receivedRowCount, STANDARD_KEY_TO - STANDARD_KEY_FROM);
assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1);
done();
});

pipeline(readStream, transform, passThrough, () => {});
});

it('should create read stream and read asynchronously using Transform stream', function (done) {
if (process.platform === 'win32') {
this.timeout(60000); // it runs much slower on Windows!
}

// 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
const keyFrom = 0;
const keyTo = 1000;

setWindowsTestTimeout(this);
service.setService({
ReadRows: readRowsImpl(keyFrom, keyTo) as any,
ReadRows: ReadRowsImpl.createService(
STANDARD_SERVICE_WITHOUT_ERRORS
) as ServerImplementationInterface,
});

let receivedRowCount = 0;
Expand Down Expand Up @@ -183,23 +206,22 @@ describe('Bigtable/ReadRows', () => {
debugLog(`received row key ${key}`);
});
passThrough.on('end', () => {
assert.strictEqual(receivedRowCount, keyTo - keyFrom);
assert.strictEqual(lastKeyReceived, keyTo - 1);
assert.strictEqual(receivedRowCount, STANDARD_KEY_TO - STANDARD_KEY_FROM);
assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1);
done();
});

pipeline(readStream, transform, passThrough, () => {});
});

it('should be able to stop reading from the read stream', done => {
// 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
const keyFrom = 0;
const keyTo = 1000;
// pick any key to stop after
const stopAfter = 42;

service.setService({
ReadRows: readRowsImpl(keyFrom, keyTo) as any,
ReadRows: ReadRowsImpl.createService(
STANDARD_SERVICE_WITHOUT_ERRORS
) as ServerImplementationInterface,
});

let receivedRowCount = 0;
Expand Down Expand Up @@ -232,18 +254,14 @@ describe('Bigtable/ReadRows', () => {

// TODO: enable after https://github.com/googleapis/nodejs-bigtable/issues/1286 is fixed
it('should be able to stop reading from the read stream when reading asynchronously', function (done) {
if (process.platform === 'win32') {
this.timeout(600000); // it runs much slower on Windows!
}

// 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
const keyFrom = 0;
const keyTo = 1000;
setWindowsTestTimeout(this);
// pick any key to stop after
const stopAfter = 420;

service.setService({
ReadRows: readRowsImpl(keyFrom, keyTo) as any,
ReadRows: ReadRowsImpl.createService(
STANDARD_SERVICE_WITHOUT_ERRORS
) as ServerImplementationInterface,
});

let receivedRowCount = 0;
Expand Down Expand Up @@ -294,61 +312,77 @@ describe('Bigtable/ReadRows', () => {
pipeline(readStream, transform, passThrough, () => {});
});

it('should silently resume after server or network error', done => {
// 1000 rows must be enough to reproduce issues with losing the data and to create backpressure
const keyFrom = 0;
const keyTo = 1000;
// the server will error after sending this chunk (not row)
const errorAfterChunkNo = 423;

service.setService({
ReadRows: readRowsImpl(keyFrom, keyTo, errorAfterChunkNo) as any,
});

let receivedRowCount = 0;
let lastKeyReceived: number | undefined;

const readStream = table.createReadStream();
readStream.on('error', (err: GoogleError) => {
done(err);
});
readStream.on('data', (row: Row) => {
++receivedRowCount;
const key = parseInt(row.id);
if (lastKeyReceived && key <= lastKeyReceived) {
done(new Error('Test error: keys are not in order'));
}
lastKeyReceived = key;
debugLog(`received row key ${key}`);
describe('should silently resume after server or network error', () => {
function runTest(done: Mocha.Done, errorAfterChunkNo: number) {
service.setService({
ReadRows: ReadRowsImpl.createService({
keyFrom: STANDARD_KEY_FROM,
keyTo: STANDARD_KEY_TO,
valueSize: VALUE_SIZE,
chunkSize: CHUNK_SIZE,
chunksPerResponse: CHUNKS_PER_RESPONSE,
errorAfterChunkNo,
debugLog,
}) as ServerImplementationInterface,
});
let receivedRowCount = 0;
let lastKeyReceived: number | undefined;

const readStream = table.createReadStream();
readStream.on('error', (err: GoogleError) => {
done(err);
});
readStream.on('data', (row: Row) => {
++receivedRowCount;
const key = parseInt(row.id);
if (lastKeyReceived && key <= lastKeyReceived) {
done(new Error('Test error: keys are not in order'));
}
lastKeyReceived = key;
debugLog(`received row key ${key}`);
});
readStream.on('end', () => {
assert.strictEqual(
receivedRowCount,
STANDARD_KEY_TO - STANDARD_KEY_FROM
);
assert.strictEqual(lastKeyReceived, STANDARD_KEY_TO - 1);
done();
});
}
it('with an error at a fixed position', function (done) {
setWindowsTestTimeout(this);
// Emits an error after enough chunks have been pushed to create back pressure
runTest(done, 423);
});
readStream.on('end', () => {
assert.strictEqual(receivedRowCount, keyTo - keyFrom);
assert.strictEqual(lastKeyReceived, keyTo - 1);
done();
it('with an error at a random position', function (done) {
this.timeout(200000);
// Emits an error after a random number of chunks.
const errorAfterChunkNo = Math.floor(Math.random() * 1000);
runTest(done, errorAfterChunkNo);
});
});

it('should return row data in the right order', done => {
// 150 rows must be enough to reproduce issues with losing the data and to create backpressure
const keyFrom = undefined;
const keyTo = undefined;
// the server will error after sending this chunk (not row)
const errorAfterChunkNo = 100;
it('should return row data in the right order', function (done) {
setWindowsTestTimeout(this);
const dataResults = [];

// TODO: Do not use `any` here, make it a more specific type and address downstream implications on the mock server.
// keyTo and keyFrom are not provided so they will be determined from
// the request that is passed in.
service.setService({
ReadRows: readRowsImpl2(
keyFrom,
keyTo,
errorAfterChunkNo
) as ServerImplementationInterface,
ReadRows: ReadRowsImpl.createService({
errorAfterChunkNo: 100, // the server will error after sending this chunk (not row)
valueSize: 1,
chunkSize: 1,
chunksPerResponse: 1,
debugLog,
}) as ServerImplementationInterface,
});
const sleep = (ms: number) => {
return new Promise(resolve => setTimeout(resolve, ms));
};
(async () => {
try {
// 150 rows must be enough to reproduce issues with losing the data and to create backpressure
const stream = table.createReadStream({
start: '00000000',
end: '00000150',
Expand Down
Loading

0 comments on commit b3663e8

Please sign in to comment.