BigQuery: `to_dataframe` respects `progress_bar_type` when used with BQ Storage API #7697
Conversation
Changed the title from "KeyboardInterrupt during to_dataframe (with BQ Storage API) no longer hangs" to "to_dataframe respects progress_bar_type with BQ Storage API".
Force-pushed from 0d96dbd to 9812563.
Force-pushed from ce0a302 to 6ed43f0.
Changed the title from "to_dataframe respects progress_bar_type with BQ Storage API" to "to_dataframe respects progress_bar_type when used with BQ Storage API".
Force-pushed from 6ed43f0 to b82eb6d.
@@ -1274,6 +1275,16 @@ def __repr__(self):
        return "Row({}, {})".format(self._xxx_values, f2i)


class _FakeQueue(object):
Should this be more closely named according to its use?
@@ -1408,7 +1426,23 @@ def _to_dataframe_bqstorage_stream(
        # the end using manually-parsed schema.
        return pandas.concat(frames)[columns]

    def _to_dataframe_bqstorage(self, bqstorage_client, dtypes):
    def _process_progress_updates(self, progress_queue, progress_bar):
Does this work well for large tables as the number of updates grows, since you're potentially walking it every `_PROGRESS_INTERVAL` seconds?
I decreased `_PROGRESS_INTERVAL` to account for the fact that we get way too many updates in a second, but you're right that with large tables (many streams) this becomes worse.
Originally I tried having a constant loop of updates for `tqdm`, but there were some locking issues with writing to stderr/stdout outside of the main thread.
Right now I handle this by very likely dropping updates when the queue fills up, meaning the progress bar is grossly inaccurate for large tables. Maybe what I should do instead is have 2 queues. 🤔 One queue used by the workers, and a "sum" thread to add up the counts and send them to the main thread every `_PROGRESS_INTERVAL` via a different queue.
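The two-queue idea could be sketched roughly like this. All names and the interval value here are illustrative assumptions, not the library's actual code: download workers put raw row counts on a worker queue, and a background "sum" thread drains it and forwards a single aggregated total to the main thread each interval.

```python
import queue
import threading

_PROGRESS_INTERVAL = 0.2  # assumed value; the real constant lives in the library


def sum_worker(worker_queue, main_queue, stop_event):
    """Aggregate raw per-chunk row counts and forward one total per interval."""
    total = 0
    while not stop_event.is_set():
        stop_event.wait(_PROGRESS_INTERVAL)
        # Drain everything the download workers queued since the last pass.
        while True:
            try:
                total += worker_queue.get_nowait()
            except queue.Empty:
                break
        main_queue.put(total)  # a single aggregated update for the main thread
    # Final drain so updates queued during shutdown are not lost.
    while True:
        try:
            total += worker_queue.get_nowait()
        except queue.Empty:
            break
    main_queue.put(total)
```

Because only the sum thread touches the main-thread queue, the main thread sees at most one update per interval regardless of how many streams are downloading.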
I suppose the problem is also bounded by the largest dataframe users can fit in RAM. Our truly large tables aren't really going to be pulled into dataframes without excessive projection or filtering. Perhaps this is a non-issue for now.
* Add unit test for progress bar.
* Add test for full queue.
The worker queue is processed in a background thread, so it's more likely to be able to keep up with the workers that are adding to it.
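The drop-on-full behavior mentioned earlier in the thread can be sketched like this (a minimal illustration, not the actual implementation): a bounded queue plus `put_nowait` means a slow reader simply loses updates instead of blocking the download workers.

```python
import queue


def report_progress(progress_queue, num_rows):
    """Send a progress update, silently dropping it if the queue is full."""
    try:
        progress_queue.put_nowait(num_rows)
    except queue.Full:
        pass  # drop the update; the bar may undercount, but workers never block


q = queue.Queue(maxsize=2)
for _ in range(5):
    report_progress(q, 100)
print(q.qsize())  # only 2 of the 5 updates survive
```

This trades accuracy for liveness, which matches the observation above that the bar can be grossly inaccurate for large tables.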
Force-pushed from b82eb6d to 71112b0.
When the BigQuery Storage API was used in `to_dataframe`, the `progress_bar_type` argument was ignored. This PR fixes that by creating a concurrent queue that worker threads can send progress updates to. A fake queue is created for the case with no progress bar, to prevent filling a queue that is never read from.

This fix depends on:

* Add page iterator to ReadRowsStream #7680, which adds `.pages` to BQ Storage. Must be merged and released. Released in `google-cloud-bigquery-storage` version 0.4.0!
* BigQuery: ensure that `KeyboardInterrupt` during `to_dataframe` no longer hangs. #7698, which adds a loop over `.pages` to fix an issue with `KeyboardInterrupt` (discovered while working on this progress bar feature).

Closes #7654