-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Capture errors thrown within Coordinator #515
Conversation
50f43ce
to
ea3a37c
Compare
pub fn connect(block_streamer_url: String) -> anyhow::Result<Self> { | ||
let channel = Channel::from_shared(block_streamer_url) | ||
.context("Block Streamer URL is invalid")? | ||
.connect_lazy(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defer connection to when calls are made. This allows us to consolidate retries (in the code below) rather than also having retry logic here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. So basically when we make the call, the channel connection AND the call success is within the same retry loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's correct
synchronise_executors(&indexer_registry, &executors_handler), | ||
synchronise_block_streams(&indexer_registry, &redis_client, &block_streams_handler), | ||
async { | ||
sleep(CONTROL_LOOP_THROTTLE_SECONDS).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forcing a minimum loop duration
exponential_retry(|| async { | ||
let response = self | ||
.client | ||
.clone() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cloning a gRPC/tonic client is cheap and allows us to avoid holding a mutable reference - https://docs.rs/tonic/latest/tonic/transport/channel/struct.Channel.html#multiplexing-requests
ea3a37c
to
6a7c208
Compare
pub fn connect(block_streamer_url: String) -> anyhow::Result<Self> { | ||
let channel = Channel::from_shared(block_streamer_url) | ||
.context("Block Streamer URL is invalid")? | ||
.connect_lazy(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. So basically when we make the call, the channel connection AND the call success is within the same retry loop?
|
||
Ok(response.into_inner().streams) | ||
pub async fn list(&self) -> anyhow::Result<Vec<StreamInfo>> { | ||
exponential_retry(|| async { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does exponential retry have a limit or does it increase forever? Might be good to have an upper limit so that we don't necessarily have to restart coordinator too if the error was say block streamer or runner side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No it will just increase forever, good call, we should cap the delay seconds, i'll do that in a follow up PR.
.clone() | ||
.stop_stream(Request::new(request.clone())) | ||
.await | ||
.map_err(|e| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if a stop fails, is swallowed, and the subsequent start (e.g. an indexer update) is successful? Is there a mechanism in block stream to prevent duplicates? It may be worth skipping with continue if we don't have any duplication mechanisms in place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Block Streamer service will throw an error if we try to start the same stream for a given indexer
Currently, errors thrown within Coordinator V2 will bubble up to
main()
and cause the entire application to exit. This PR captures those errors, handling them accordingly.Errors are handled in either of the following ways:
I expect this behaviour to evolve over time as we learn more about the system, the important thing here is that Coordinator will not crash on errors.