-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(subscription): add metrics for cursor #18052
Conversation
f036461
to
5966c6c
Compare
@@ -452,6 +486,7 @@ impl SubscriptionCursor { | |||
} | |||
} | |||
} | |||
self.last_fetch = Instant::now(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we will miss when next
returns early in L449 and L449
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this cursor should be closed after 449 is returned, set last_fetch doesn't seem to make sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this cursor should be closed after 449 is returned, set last_fetch doesn't seem to make sense
I think the cursor will not be closed but will be set to invalid state and remain in the cursor map. Actually after walking through the codes again, I found out that if initiate_query
or try_refill_remaining_rows
return an error, cursor state will not be set to invalid so I am thinking whether we should set the state to invalid in next
instead of next_row
to make sure the all errors are fully covered.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I think we can further improve the error handling via one of the following ideas:
- Auto close the cursor if the cursor is in an invalid state.
- Auto retry/recreate the query stream / RPC when meeting errors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do it in next pr
src/frontend/src/monitor/stats.rs
Outdated
pub valid_subsription_cursor_nums: IntGauge, | ||
pub invalid_subsription_cursor_nums: IntGauge, | ||
pub subscription_cursor_error_count: GenericCounter<AtomicU64>, | ||
pub subscription_cursor_query_duration: HistogramVec, | ||
pub subscription_cursor_declare_duration: HistogramVec, | ||
pub subscription_cursor_fetch_duration: HistogramVec, | ||
pub subscription_cursor_last_fetch_duration: HistogramVec, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is simpler and less error prone if we use Collector
to periodically report the following metrics instead of updating them in cursor manager in an ad-hoc manner:
valid_subsription_cursor_nums
invalid_subsription_cursor_nums
subscription_cursor_last_fetch_duration
You can refer to the StateStoreCollector as an exmaple.
Cursor::Subscription(cursor) => cursor.next(count, handle_args, formats).await, | ||
Cursor::Subscription(cursor) => { | ||
cursor.next(count, handle_args, formats).await.map_err(|e| { | ||
cursor.cursor_metrics.subscription_cursor_error_count.inc(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about updating the metric in fetch_cursor.rs
just like what we did in declare_cursor.rs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we don't distinguish between cursor
and subscription cursor
until next()
, we can only do it here. So we cannot correctly add to the subscription_cursor_fetch_err
counts in fetch_cursor.rs
let row = self.next_row(&handle_args, formats).await?; | ||
self.cursor_metrics | ||
.subscription_cursor_fetch_duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The metric here is not measuring the whole fetch duration. How about doing it in fetch_cursor.rs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may fetch n rows at a time, which in fetch_cursor.rs
statistics may lead to inconsistent metrics and less overhead for methods other than next_row()
@@ -452,6 +486,7 @@ impl SubscriptionCursor { | |||
} | |||
} | |||
} | |||
self.last_fetch = Instant::now(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this cursor should be closed after 449 is returned, set last_fetch doesn't seem to make sense
I think the cursor will not be closed but will be set to invalid state and remain in the cursor map. Actually after walking through the codes again, I found out that if initiate_query
or try_refill_remaining_rows
return an error, cursor state will not be set to invalid so I am thinking whether we should set the state to invalid in next
instead of next_row
to make sure the all errors are fully covered.
@@ -452,6 +486,7 @@ impl SubscriptionCursor { | |||
} | |||
} | |||
} | |||
self.last_fetch = Instant::now(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I think we can further improve the error handling via one of the following ideas:
- Auto close the cursor if the cursor is in an invalid state.
- Auto retry/recreate the query stream / RPC when meeting errors
032c181
to
704e6e4
Compare
704e6e4
to
4cbb0e1
Compare
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
#17992
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.