-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(bigquery): add better timers around every API call #8626
feat(bigquery): add better timers around every API call #8626
Conversation
b3236fc
to
6a2a3d4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had some trouble reviewing this because there's so much copied / moved code, ended up only skimming thru most of the lineage and usage changes. Looks very good to me, my main concerns around assertions in the perf timer. I think we should avoid erroring due to the perf time and instead note something went wrong in the report, if possible
try: | ||
projects = BigQueryDataDictionary.get_projects(conn) | ||
projects = self.bigquery_data_dictionary.get_projects() | ||
except Exception as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might change my mind after seeing the data dictionary, but I think it might make sense to catch this in the data dictionary method rather than here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if it makes a big difference but I moved it as suggested.
bigquery_audit_metadata_query_template: Callable[ | ||
[ | ||
str, # dataset: str | ||
bool, # use_date_sharded_tables: bool | ||
Optional[int], # limit: Optional[int] = None | ||
], | ||
str, | ||
], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is fine here, but in general would like to avoid passing complex functions because it's kinda ugly and hard to extend
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree
if rate_limiter: | ||
with rate_limiter: | ||
for entry in query_job: | ||
with current_timer.pause_timer(): | ||
yield entry | ||
else: | ||
for entry in query_job: | ||
with current_timer.pause_timer(): | ||
yield entry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is pretty awkward. Maybe we can set the rate limiter to an empty context manager or put the decision whether to rate limit inside the rate limiter (would prob have to wrap it then)? Not sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refractored it slightly. I think it looks better now.
if i == 0: | ||
logger.info( | ||
f"Starting log load from GCP Logging for {client.project}" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems unnecessary -- can gain this info from the next log
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removing this
self.report = report | ||
|
||
def get_client(self) -> bigquery.Client: | ||
assert self.bq_client is not None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be necessary, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, removing this method altogether.
assert ( | ||
not self.paused and not self.end_time | ||
), "Can not pause a paused/stopped timer" | ||
assert ( | ||
self.start_time is not None | ||
), "Can not pause a timer that hasn't started. Did you forget to start the timer ?" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For safety, since we're not great about wrapping everything with try / excepts, what do you think about a logger warning or error here instead? I would rather complete ingestion with faulty timers than have an uncaught exception. Or perhaps we can store an error state in this object, so it can be seen in the report and we know not to trust the time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. Let me add warning logs in this case, and also add the error state indicator for reported time in such case.
if self.paused: # Entering paused timer context, NO OP | ||
pass | ||
else: | ||
self.start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if self.paused: # Entering paused timer context, NO OP | |
pass | |
else: | |
self.start() | |
if not self.paused: | |
self.start() |
if self.paused: | ||
self.paused = False | ||
|
||
def pause_timer(self) -> "PerfTimer": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thoughts on renaming this to just pause()
. Generally it's clear that the object being paused is a timer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense.
time.sleep(1) | ||
assert round(timer.elapsed_seconds()) == 1 | ||
|
||
assert round(timer.elapsed_seconds()) == 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert round(timer.elapsed_seconds()) == 1 | |
assert pytest.approx(timer.elapsed_seconds()) == 1 |
Can do the same for others
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Looks like a legitimate failure on |
@asikowitz the test is fixed. |
Tested with bigquery connector-tests for accuracy.
Primary changes -
BigQueryDataDictionary
toBigQuerySchemaApi
, added timers for the calls in this class.BigQueryAuditLogApi
, added timers for the calls in this class.Refractors:
BigQueryConnectionConfig
class.BigQueryLineageExtractor
.bigquery_v2/queries.py
Functional changes -
Checklist