createReadStream doesn't return all data when the stream is paused and resumed #607
Comments
Hello! Here's what I've tried that works as expected:

```js
const Transform = require('stream').Transform

const processStream = new Transform({
  objectMode: true,
  transform(row, enc, next) {
    setTimeout(() => {
      // Something async
      next(null, row)
    }, 1000)
  },
})

let numRowsReceived = 0

table.createReadStream()
  .pipe(processStream)
  .on('data', () => numRowsReceived++)
  .on('end', () => {
    // Verify numRowsReceived is expected number
    console.log('All results processed successfully!')
  })
```

The built-in Transform stream seems perfectly suited to what you're after. Let me know if this will work for you. |
Hello, what I want is to mimic the behavior of an iterator in Java; that's why I wrote the class I mentioned. |
Hello @stephenplusplus, I tried what you told me, and I also tried multiple other approaches (pipe/unpipe, cork/uncork, and pause/resume), but I still wasn't able to pause and resume the stream and get the correct data. |
I am facing the same issue, with inconsistent results. Most of the time the rows retrieved match the expected rows, but critically not always, and not for all of the streams running at the time. I also tried using [...]. Thanks |
@stephenplusplus Here is a log based on the snippet you gave above, with a timeout of 1 ms.

With Emulator: [Attempt-2 and Attempt-3 logs, plus Attempt-2 and Attempt-3 with 100k records, attached]

Without Emulator (live data): [Attempt-2 and Attempt-3 logs, plus Attempt-2 and Attempt-3 with 100k records, attached]

It seems like an issue with the Emulator; it is working fine with live data.

```js
const rowsToInsert = [];
for (let index = 0; index < 50000; index++) {
  rowsToInsert.push({
    key: `tablet#a0b81f74#${index}`,
    data: {
      stats_summary: {
        name: `name-${index}`,
        value: index
      }
    }
  })
}
```
 |
We found this behavior today while developing a new system with Bigtable. Using the most recent client, against a live Bigtable instance (our staging instance), when I try to stream a key range with 66,003 rows, the stream stops at 66,000 rows if the consumer is async, but returns all 66,003 rows if it's immediate. edit: I should add our version information here
I have been able to replicate with this basic setup:

```ts
const startTime = Date.now();
let rowCount = 0;

const transformer = new Transform({
  objectMode: true,
  transform(chunk: any, _encoding, callback) {
    rowCount++;
    if (process.env.DELAY_TRANSFORM === 'true') {
      setTimeout(() => {
        callback(null, chunk);
      }, 0);
    } else {
      callback(null, chunk);
    }
  },
});

const output = new Writable({
  objectMode: true,
  write(_chunk, _encoding, callback) {
    callback();
  },
});

const readStream = await eventsTable.createReadStream({
  decode: false,
  filter: [{ family: 'metadata', column: { cellLimit: 1 } }],
  start: `${sessionKey}|`,
  end: `${sessionKey}||`,
});

await new Promise<void>((resolve, reject) => {
  pipeline(
    readStream,
    transformer,
    output,
    err => {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    },
  );
});

const elapsed = Date.now() - startTime;
console.info(`Finished reading ${rowCount} rows in ${elapsed}ms`);
```

When executing with
But with
First, note the three missing rows. Second, the execution takes 10 times longer! I was expecting somewhat more time, since the stream pauses for (presumably) backpressure in the transformer, but 10x seems excessive for what is nearly an instant pass-through of each row. We can ignore the excessive slowdown for now (fortunately this system is not in a performance-critical path). What we are having trouble understanding is how to resolve the three missing rows (in this example). Could there be a pause happening after the 66,000th row is received that never gets resumed, so the stream never finishes completely? edit: I ran this same script, but with the [...]. I also ran it with the end set to accept up to 20,052, and the stream ends at 20,048. |
Did a bit more testing this morning: when I use a key range with a nice even number of rows (e.g. 60,000 or 20,000), all rows are returned. More irregular expected row counts (like 60,003 or 20,050) result in missing data. |
I was just forwarded this, and it appears to be the issue we're observing. It sounds like it is fixed in newer Node (which we are working towards), and there may be a workaround following from that. I will test this and report back. |
We're doing final validation that the Node upgrade resolves this problem over the next two days, but at this point it seems safe to say it is that Node issue, and that the only resolution is to upgrade to at least 15.x to pick up the streaming fixes. I will follow up after we are confident in the resolution. We appreciate the attention this received (on the support side, presumably what led to the label shifts) while we continued investigating and found the true cause. |
We are working on a fix for this in the client library; we have been able to reproduce the issue. |
@jlogsdon Thank you for the workaround update! As we will likely drop Node 12 (but continue to support Node 14+) we'll still have to address this in the library (as per @igorbernstein2's comment) but it's great to know there's an interim solution for users who are able to upgrade. |
That's good to hear, as it seems upgrading to Node 15 did not completely resolve this issue. We were able to get our validator shipped, ran some testing today, and found we're still frequently missing rows from the end of streams (upwards of 50% of the time!). I noticed something while testing this against one of the specific stream ranges that failed. First, a bit of background about our data: we have an [...]. Now, when I create a read stream that filters down to just the [...]. Hopefully this context is useful for the team. Looking forward to seeing a potential fix here, but we will be looking at alternate avenues on our side in the interim. edit: For anyone unable to upgrade who might be looking for a solution before the client can be fixed, we successfully worked around the streaming issue by building an async generator that fetches a chunk of rows at a time, pushes those out of the generator, and then repeats until the fetch returns a partial or empty chunk. |
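The chunked workaround described in that edit can be sketched roughly as follows. `fetchChunk` is a hypothetical stand-in for a paged read (e.g. a size-limited row fetch keyed off the last row seen); it is not a real client API.

```javascript
// Workaround sketch: instead of one long-lived stream, fetch bounded
// pages and yield their rows one at a time. `fetchChunk(lastKey, n)` is
// a hypothetical helper that returns up to n rows starting after lastKey.
async function* rowsByChunk(fetchChunk, chunkSize) {
  let lastKey; // undefined means "start from the beginning"
  while (true) {
    const rows = await fetchChunk(lastKey, chunkSize);
    yield* rows;
    if (rows.length < chunkSize) return; // partial or empty page: done
    lastKey = rows[rows.length - 1].key;
  }
}
```

Each page is fully materialized before any of its rows are processed, so a slow consumer can never race the underlying stream; the trade-off is one extra round trip per `chunkSize` rows.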
We've narrowed down the issue to a race condition in a stream Transform: Line 885 in 436e778
The transform (toRowStream) references the state of the previous node in the pipeline (chunktransformer) as a signal to stop itself from emitting elements. Under most circumstances this works fine. However, when the gRPC stream is producing elements faster than the consumer is consuming them (i.e. a Transform that invokes the callback in a setTimeout), the intermediate transforms start buffering elements. Thus the chunktransformer can be done while there are still elements in the pipeline, and any elements left in the pipeline end up getting dropped. We are working on a fix, but the buffering angle makes this a bit difficult:
In one case we need to ignore the buffered data, and in the other we need to drain it. I apologize for the inconvenience and will update this ticket as soon as we have news. |
We have a potential fix in this pr: Would it be possible for you to confirm that it fixes your use case? |
Yup! Here's a before test to show the issue still exists on release 4.5.1 (expected 66,003, which were returned with a sync stream):
and then after pulling that PR in:
Doesn't look like it helped :( I pulled that PR in by cloning the repo, running |
Thanks for the quick response. We are having trouble reproducing the issue with the fix applied. Just to double check that we are running the same code. May I ask you to re-run your test, but this time to use a published version of the client? Specifically:
Thanks for helping us work through this |
@jlogsdon friendly ping |
Ah, missed that last reply! I should be able to try again today or tomorrow. |
I was able to do some testing and it looks fixed! I wasn't able to use the same data that I had previously, unfortunately, as it has fallen out of its retention period, but I tested a few other prefixes with anywhere from 2k to 30k rows and always got the correct number of results! |
Great to hear! We will cut a release; please let us know if you encounter any other issues. |
Hello,
I have an issue regarding the createReadStream method in the Node.js Bigtable client. In my situation I need a way to get only one item from the stream, process it, then get the next one, and so on, without storing all the data in memory. The Bigtable client for Node.js doesn't provide anything to do this, so I use createReadStream and pause the stream once I get an item/row from the table.
This worked fine for small data sets, but when I tried it on larger data (about 500k rows) I didn't retrieve all the rows the table has for my query (the 'end' event is emitted before I get all the results from the table).
My code uses a class similar to this code; the class is supposed to return a promise that resolves to the next item from the stream, or undefined if the stream is finished.
I also tried using once() to listen for the data event only one time on each call to next(), but I still didn't get all the data that I should.
I really hope you solve this issue or provide another way to get one item at a time from Bigtable.
Environment details
@google-cloud/bigtable version: 2.3.1
I also use the Bigtable emulator.
Steps to reproduce
Thanks!