test: test ReadRows logic with local gRPC server (googleapis#1282)
* test: test ReadRows logic with local gRPC server
* test: PR feedback
* test: fix race condition in initialization
* test: PR feedback, renaming a variable for readability
* test: add test for asynchronous end() call
* test: only set lastScannedRowKey for completed rows
* test: refactor, fix lastScannedRowKey logic, PR feedback
* test: rename test suite

Co-authored-by: danieljbruce <[email protected]>
1 parent 6bda5be · commit 52a6711
Showing 2 changed files with 629 additions and 0 deletions.
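
Every test below follows the same skeleton: boot the in-process gRPC mock, point a Bigtable client at it, install a ReadRows handler, and consume a read stream from a fake table. Condensed to its core (the wiring is taken from the diff below; wrapping it in an async function is for illustration only):

import {Bigtable} from '../src';
import {MockServer} from '../src/util/mock-servers/mock-server';
import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service';
import {readRowsImpl} from './utils/readRowsImpl';

async function example() {
  let server!: MockServer;
  // MockServer invokes the callback with its port once it is listening
  const port = await new Promise<string>(resolve => {
    server = new MockServer(resolve);
  });
  const bigtable = new Bigtable({apiEndpoint: `localhost:${port}`});
  const table = bigtable.instance('fake-instance').table('fake-table');

  const service = new BigtableClientMockService(server);
  service.setService({ReadRows: readRowsImpl(0, 10)}); // serve row keys 0..9

  table
    .createReadStream()
    .on('data', row => console.log(row.id))
    .on('end', () => server.shutdown(() => {}));
}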
@@ -0,0 +1,330 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {after, before, describe, it} from 'mocha';
import {Bigtable, Row, Table} from '../src';
import * as assert from 'assert';
import {Transform, PassThrough, pipeline} from 'stream';

import {GoogleError} from 'google-gax';
import {MockServer} from '../src/util/mock-servers/mock-server';
import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service';
import {MockService} from '../src/util/mock-servers/mock-service';
import {debugLog, readRowsImpl} from './utils/readRowsImpl';
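
// Note: based on its usage below, readRowsImpl(keyFrom, keyTo, errorAfterChunkNo?)
// is expected to return a ReadRows handler that streams rows with keys in
// [keyFrom, keyTo) and, when errorAfterChunkNo is set, fails once after sending
// that many chunks so that transparent retries can be exercised; debugLog prints
// trace output for debugging test runs. See the sketch after the suite.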

describe('Bigtable/ReadRows', () => {
  let server: MockServer;
  let service: MockService;
  let bigtable: Bigtable;
  let table: Table;

  before(async () => {
    // make sure we have everything initialized before starting tests
    const port = await new Promise<string>(resolve => {
      server = new MockServer(resolve);
    });
    bigtable = new Bigtable({
      apiEndpoint: `localhost:${port}`,
    });
    table = bigtable.instance('fake-instance').table('fake-table');
    service = new BigtableClientMockService(server);
  });

  it('should create read stream and read synchronously', done => {
    // 1000 rows should be enough to reproduce data loss issues and to create backpressure
    const keyFrom = 0;
    const keyTo = 1000;

    service.setService({
      ReadRows: readRowsImpl(keyFrom, keyTo),
    });

    let receivedRowCount = 0;
    let lastKeyReceived: number | undefined;

    const readStream = table.createReadStream();
    readStream.on('error', (err: GoogleError) => {
      done(err);
    });
    readStream.on('data', (row: Row) => {
      ++receivedRowCount;
      const key = parseInt(row.id);
      // explicit undefined check: the first valid key is 0, which is falsy
      if (lastKeyReceived !== undefined && key <= lastKeyReceived) {
        done(new Error('Test error: keys are not in order'));
      }
      lastKeyReceived = key;
      debugLog(`received row key ${key}`);
    });
    readStream.on('end', () => {
      assert.strictEqual(receivedRowCount, keyTo - keyFrom);
      assert.strictEqual(lastKeyReceived, keyTo - 1);
      done();
    });
  });

  it('should create read stream and read synchronously using Transform stream', done => {
    // 1000 rows should be enough to reproduce data loss issues and to create backpressure
    const keyFrom = 0;
    const keyTo = 1000;

    service.setService({
      ReadRows: readRowsImpl(keyFrom, keyTo),
    });

    let receivedRowCount = 0;
    let lastKeyReceived: number | undefined;

    // Bigtable stream
    const readStream = table.createReadStream();

    // Transform stream that passes rows through unchanged
    const transform = new Transform({
      objectMode: true,
      transform: (row, _encoding, callback) => {
        callback(null, row);
      },
    });

    // Final stream
    const passThrough = new PassThrough({
      objectMode: true,
    });

    passThrough.on('error', (err: GoogleError) => {
      done(err);
    });
    passThrough.on('data', (row: Row) => {
      ++receivedRowCount;
      const key = parseInt(row.id);
      if (lastKeyReceived !== undefined && key <= lastKeyReceived) {
        done(new Error('Test error: keys are not in order'));
      }
      lastKeyReceived = key;
      debugLog(`received row key ${key}`);
    });
    passThrough.on('end', () => {
      assert.strictEqual(receivedRowCount, keyTo - keyFrom);
      assert.strictEqual(lastKeyReceived, keyTo - 1);
      done();
    });

    // errors are handled via the 'error' listener on passThrough,
    // so the pipeline callback is left as a no-op
    pipeline(readStream, transform, passThrough, () => {});
  });

  // TODO(@alexander-fenster): enable after https://github.com/googleapis/nodejs-bigtable/issues/607 is fixed
  it.skip('should create read stream and read asynchronously using Transform stream', function (done) {
    if (process.platform === 'win32') {
      this.timeout(60000); // it runs much slower on Windows!
    }

    // 1000 rows should be enough to reproduce data loss issues and to create backpressure
    const keyFrom = 0;
    const keyTo = 1000;

    service.setService({
      ReadRows: readRowsImpl(keyFrom, keyTo),
    });

    let receivedRowCount = 0;
    let lastKeyReceived: number | undefined;

    // Bigtable stream
    const readStream = table.createReadStream();

    // Transform stream that defers each row to the next tick
    // to simulate asynchronous processing
    const transform = new Transform({
      objectMode: true,
      transform: (row, _encoding, callback) => {
        setTimeout(() => {
          callback(null, row);
        }, 0);
      },
    });

    // Final stream
    const passThrough = new PassThrough({
      objectMode: true,
    });

    passThrough.on('error', (err: GoogleError) => {
      done(err);
    });
    passThrough.on('data', (row: Row) => {
      ++receivedRowCount;
      const key = parseInt(row.id);
      if (lastKeyReceived !== undefined && key <= lastKeyReceived) {
        done(new Error('Test error: keys are not in order'));
      }
      lastKeyReceived = key;
      debugLog(`received row key ${key}`);
    });
    passThrough.on('end', () => {
      assert.strictEqual(receivedRowCount, keyTo - keyFrom);
      assert.strictEqual(lastKeyReceived, keyTo - 1);
      done();
    });

    pipeline(readStream, transform, passThrough, () => {});
  });

  it('should be able to stop reading from the read stream', done => {
    // 1000 rows should be enough to reproduce data loss issues and to create backpressure
    const keyFrom = 0;
    const keyTo = 1000;
    // pick any key to stop after
    const stopAfter = 42;

    service.setService({
      ReadRows: readRowsImpl(keyFrom, keyTo),
    });

    let receivedRowCount = 0;
    let lastKeyReceived: number | undefined;

    const readStream = table.createReadStream({
      // workaround for https://github.com/grpc/grpc-node/issues/2446, remove when fixed
      gaxOptions: {
        timeout: 3000,
      },
    });
    readStream.on('error', (err: GoogleError) => {
      done(err);
    });
    readStream.on('data', (row: Row) => {
      ++receivedRowCount;
      const key = parseInt(row.id);
      if (lastKeyReceived !== undefined && key <= lastKeyReceived) {
        done(new Error('Test error: keys are not in order'));
      }
      lastKeyReceived = key;
      debugLog(`received row key ${key}`);

      if (receivedRowCount === stopAfter) {
        debugLog(`requesting to stop after receiving key ${key}`);
        readStream.end();
      }
    });
    readStream.on('end', () => {
      assert.strictEqual(receivedRowCount, stopAfter);
      assert.strictEqual(lastKeyReceived, stopAfter - 1);
      done();
    });
  });

  // TODO(@alexander-fenster): enable after it's fixed
  it.skip('should be able to stop reading from the read stream when reading asynchronously', function (done) {
    if (process.platform === 'win32') {
      this.timeout(60000); // it runs much slower on Windows!
    }

    // 1000 rows should be enough to reproduce data loss issues and to create backpressure
    const keyFrom = 0;
    const keyTo = 1000;
    // pick any key to stop after
    const stopAfter = 420;

    service.setService({
      ReadRows: readRowsImpl(keyFrom, keyTo),
    });

    let receivedRowCount = 0;
    let lastKeyReceived: number | undefined;

    // Bigtable stream
    const readStream = table.createReadStream({
      // workaround for https://github.com/grpc/grpc-node/issues/2446, remove when fixed
      gaxOptions: {
        timeout: 3000,
      },
    });

    // Transform stream that defers each row to the next tick
    const transform = new Transform({
      objectMode: true,
      transform: (row, _encoding, callback) => {
        setTimeout(() => {
          callback(null, row);
        }, 0);
      },
    });

    // Final stream
    const passThrough = new PassThrough({
      objectMode: true,
    });

    passThrough.on('error', (err: GoogleError) => {
      done(err);
    });
    passThrough.on('data', (row: Row) => {
      ++receivedRowCount;
      const key = parseInt(row.id);
      if (lastKeyReceived !== undefined && key <= lastKeyReceived) {
        done(new Error('Test error: keys are not in order'));
      }
      lastKeyReceived = key;
      debugLog(`received row key ${key}`);

      if (receivedRowCount === stopAfter) {
        debugLog(`requesting to stop after receiving key ${key}`);
        readStream.end();
      }
    });
    passThrough.on('end', () => {
      assert.strictEqual(receivedRowCount, stopAfter);
      assert.strictEqual(lastKeyReceived, stopAfter - 1);
      done();
    });

    pipeline(readStream, transform, passThrough, () => {});
  });

  it('should silently resume after server or network error', done => {
    // 1000 rows should be enough to reproduce data loss issues and to create backpressure
    const keyFrom = 0;
    const keyTo = 1000;
    // the server will error after sending this chunk (not row)
    const errorAfterChunkNo = 423;

    service.setService({
      ReadRows: readRowsImpl(keyFrom, keyTo, errorAfterChunkNo),
    });

    let receivedRowCount = 0;
    let lastKeyReceived: number | undefined;

    const readStream = table.createReadStream();
    readStream.on('error', (err: GoogleError) => {
      done(err);
    });
    readStream.on('data', (row: Row) => {
      ++receivedRowCount;
      const key = parseInt(row.id);
      if (lastKeyReceived !== undefined && key <= lastKeyReceived) {
        done(new Error('Test error: keys are not in order'));
      }
      lastKeyReceived = key;
      debugLog(`received row key ${key}`);
    });
    readStream.on('end', () => {
      // despite the injected error, the stream should transparently
      // retry and deliver every row exactly once
      assert.strictEqual(receivedRowCount, keyTo - keyFrom);
      assert.strictEqual(lastKeyReceived, keyTo - 1);
      done();
    });
  });

  after(async () => {
    // wait for the server to finish shutting down before the process exits
    await new Promise<void>(resolve => server.shutdown(resolve));
  });
});
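
The diff for the second changed file (presumably the test/utils/readRowsImpl.ts helper imported above) did not render. For orientation only, here is a heavily simplified sketch of what such a helper could look like, inferred from how the suite calls it; the one-chunk-per-row encoding, zero-padded keys, resume handling, and one-shot UNAVAILABLE error are all illustrative assumptions, not the actual implementation:

// A sketch only, not the real helper.
import {ServerWritableStream, status} from '@grpc/grpc-js';

export function debugLog(text: string) {
  // assumed: prints only when test debugging is switched on
  if (process.env.BIGTABLE_TEST_DEBUG) {
    console.log(text);
  }
}

export function readRowsImpl(
  keyFrom: number,
  keyTo: number,
  errorAfterChunkNo?: number
) {
  // the closure persists across retried calls, so the error fires only once
  let errorInjected = false;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  return (stream: ServerWritableStream<any, any>) => {
    // resume from the key after the last delivered one if the retried request
    // narrows the row range (the request shape here is an assumption)
    let from = keyFrom;
    const range = stream.request?.rows?.rowRanges?.[0];
    if (range?.startKeyOpen) {
      from = parseInt(range.startKeyOpen.toString()) + 1;
    }
    let chunkNo = 0;
    for (let key = from; key < keyTo; ++key) {
      // zero-pad keys so lexicographic and numeric order agree
      const rowKey = String(key).padStart(8, '0');
      // one CellChunk per row: key, one cell, and commitRow in a single piece
      stream.write({
        chunks: [
          {
            rowKey: Buffer.from(rowKey),
            familyName: {value: 'family'},
            qualifier: {value: Buffer.from('qualifier')},
            timestampMicros: 0,
            value: Buffer.from('value'),
            commitRow: true,
          },
        ],
      });
      ++chunkNo;
      if (
        !errorInjected &&
        errorAfterChunkNo !== undefined &&
        chunkNo >= errorAfterChunkNo
      ) {
        errorInjected = true;
        // fail once with a retriable status so the client resumes transparently
        stream.emit('error', {
          code: status.UNAVAILABLE,
          message: 'injected error',
        });
        return;
      }
    }
    stream.end();
  };
}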