refactor: Merge sync logic to provide fine-grained life-cycle control #762
Conversation
```diff
-.map_err(|e| {
-    tracing::error!(stream_id, "Failed to stop stream\n{e:?}");
-});
+.context(format!("Failed to stop stream: {stream_id}"))?;
```
Propagate the error up rather than swallowing it, so we can handle it in `Synchroniser`.
```rust
@@ -13,6 +13,25 @@ pub struct IndexerConfig {
    pub created_at_block_height: u64,
}

#[cfg(test)]
impl Default for IndexerConfig {
```
Got sick of continuously writing this in tests. It's only available in test builds, as it shouldn't be used in release builds.
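For illustration, a minimal sketch of what such a test-only impl could look like; every field except `created_at_block_height` is an assumption for the example:

```rust
// Gated behind `cfg(test)` so it can't leak into release builds.
#[cfg(test)]
impl Default for IndexerConfig {
    fn default() -> Self {
        Self {
            // Hypothetical values: only `created_at_block_height` is
            // visible in the diff above, the other fields are assumed.
            account_id: "morgs.near".parse().unwrap(),
            function_name: "test_indexer".to_string(),
            created_at_block_height: 0,
        }
    }
}
```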
```rust
@@ -48,14 +54,65 @@ impl IndexerStateManagerImpl {
        Self { redis_client }
    }

    pub async fn migrate(&self, registry: &IndexerRegistry) -> anyhow::Result<()> {
```
This serves two purposes:

1. Migrating from `OldIndexerState` to the new `IndexerState`, and
2. Ensuring all state objects are added to the Redis Set (handled via `set_state()`)

We need 2. to be able to list the current `IndexerState`s; we'd otherwise need to scan the DB, which is not performant.

This migration could leave us in a bad state, as we use the "existence" of the state object to signify its `SynchronisationState`, i.e. new/existing. By using the current registry to update the existing states, we may end up writing state for an Indexer which was registered but not yet synchronised (i.e. registered while Coordinator was down). We would then incorrectly treat this Indexer as "existing" rather than "new". At this point this isn't necessarily a problem, as the new/existing flows aren't too different, but it is handled in the `synchroniser.sync_existing_block_stream()` method, which I'll call out.
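A rough sketch of the shape of that migration, not the PR's actual implementation; `get_old_state`, `set_state`, and the iterable registry are hypothetical helpers assumed for the example:

```rust
pub async fn migrate(&self, registry: &IndexerRegistry) -> anyhow::Result<()> {
    for config in registry.iter() {
        // 1. Convert any `OldIndexerState` into the new `IndexerState`,
        //    falling back to a fresh state if none exists yet.
        let state = match self.get_old_state(config).await? {
            Some(old) => IndexerState::from(old),
            None => IndexerState::new(config),
        };

        // 2. `set_state()` writes the state object *and* SADDs its key to
        //    the Redis Set, so states can be listed without a costly SCAN.
        self.set_state(config, state).await?;
    }

    Ok(())
}
```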
```rust
// FIX `IndexerConfig` does not exist after an Indexer is deleted, and we need a way to
// construct the state key without it. But, this isn't ideal as we now have two places which
// define this key - we need to consolidate these somehow.
pub fn get_state_key(&self) -> String {
```
As described in the comment, this isn't ideal; I'll need to re-think it, but I didn't want to bloat the current PR.
Should Config and State share some interface to guarantee certain fields which can constitute the key, such as accountID and functionName? By state key, we really mean the key value in the set, right?
Yup, but we can't do exactly that in Rust.

My current thought is to have a common trait, probably `RedisKeyProvider`, with some default methods provided which generate the keys using `account_id` and `function_name`. But we can't require fields, we can only require methods, so the best we can do is have methods for each of those. That still leaves room for error, but I think it's both obvious enough not to be abused, and better than this :)

> By state key, we really mean the key value in the set right?

Yes, that's right
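Something along these lines; this is a sketch of the idea only, and the exact key format is an assumption:

```rust
trait RedisKeyProvider {
    // Traits can't require fields, so accessor methods are the closest we
    // can get to guaranteeing `account_id`/`function_name` exist.
    fn account_id(&self) -> &str;
    fn function_name(&self) -> &str;

    // Default method: both `IndexerConfig` and `IndexerState` would then
    // derive their keys from this single definition.
    fn get_state_key(&self) -> String {
        format!("{}/{}:state", self.account_id(), self.function_name())
    }
}
```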
```rust
@@ -0,0 +1,1186 @@
use registry_types::StartBlock;
```
This file is huge, but it's mostly tests.
```rust
}

// FIX if this fails, then subsequent control loops will perpetually fail since the
// above will error with ALREADY_EXISTS
```
Will handle this in a follow-up.
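One possible shape for that follow-up (an assumption, not what this PR does): treat `ALREADY_EXISTS` from the gRPC start call as success so subsequent control loops can make progress:

```rust
match client.start_stream(request).await {
    Ok(_) => Ok(()),
    // The stream is already running, which is the state we wanted anyway.
    Err(status) if status.code() == tonic::Code::AlreadyExists => Ok(()),
    Err(status) => Err(status.into()),
}
```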
```rust
if state.block_stream_synced_at.is_none() {
    // NOTE: A value of `None` would suggest that `state` was created before initialisation,
    // which is currently not possible, but may be in future
    tracing::warn!("Existing block stream has no previous sync state, treating as new");
```
As mentioned above with the migration, this is the new/existing case we need to handle. The migration may end up with new indexers being treated as existing, but we'd know that's the case if `block_stream_synced_at` is `None`.
Super cool work! I think the synchronization logic is much more readily understood now, and block streams and executors feel more logically tied together.
```rust
    state,
    executor.cloned(),
    block_stream.cloned(),
))
```
If a return is used in a for loop, does the loop just continue on instead of breaking and returning? My impression is that we should be continuing through the loop, as we have more work to do, so the return here is confusing me a bit.
Oh, I didn't even notice this. I'm assuming you're talking about the lack of `;`?

In the context of a for loop, the implicit return value is ignored, so in this case the loop will continue. If I were to add `return`, it would break the loop.
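A standalone illustration of that distinction (not code from the PR):

```rust
fn sync_all(ids: &[u32]) {
    for id in ids {
        // Trailing expression without `;`: its `()` value is ignored and
        // the loop continues with the next iteration.
        sync_one(id)
        // Writing `return sync_one(id);` instead would exit `sync_all`
        // entirely, ending the loop early.
    }
}

fn sync_one(id: &u32) {
    println!("syncing {id}");
}
```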
coordinator/src/main.rs

```rust
@@ -63,27 +72,11 @@ async fn main() -> anyhow::Result<()> {

    loop {
        let indexer_registry = registry.fetch().await?;
        indexer_state_manager.migrate(&indexer_registry).await?;
```
This only needs to be successful once, right? Once it's done, we just remove it? I'm assuming you have it in the loop so you can manually resolve any problems while Coordinator is up.
Good point, yeah, this doesn't even need to be in the loop; there isn't actually any benefit to it being there 😅 I'll move it.
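i.e. something like this (a sketch of the suggested change, based on the diff above):

```rust
// One-off migration, run once before entering the control loop.
let indexer_registry = registry.fetch().await?;
indexer_state_manager.migrate(&indexer_registry).await?;

loop {
    let indexer_registry = registry.fetch().await?;
    // ... synchronise against the freshly fetched registry ...
}
```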
coordinator/src/main.rs

```rust
    Ok(())
}
)?;
tokio::try_join!(synchroniser.sync(), async {
```
It seems like all sync tasks are designed to succeed and instead just log errors. How do we enable visibility of sync errors beyond manually checking logs?
Do you mean like Grafana? We'd hook into those errors directly. Eventually I think we'd want to short-circuit the sync for that Indexer, store some state, and then act accordingly. But we're not quite there yet.
```rust
}

self.state_manager.delete_state(state).await?;
```
Is this where we would call resource clean-up? Since Block Streamer is technically responsible for creating the Redis block height stream, who is going to delete the Redis stream? This will become more important when we migrate to storing blocks on Redis instead of heights.
Coordinator should delete the stream, just like it does when we republish the Indexer. The Redis stream is created implicitly when it is `XADD`ed to; Block Streamer doesn't explicitly create it.
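For illustration, a minimal sketch using the `redis` crate (assuming its async and stream support is enabled); the stream key and field names are made-up examples:

```rust
use redis::AsyncCommands;

async fn example(conn: &mut redis::aio::MultiplexedConnection) -> redis::RedisResult<()> {
    // Block Streamer side: XADD creates the stream implicitly on first use.
    let _id: String = conn
        .xadd("morgs.near/test:block_stream", "*", &[("block_height", 12345)])
        .await?;

    // Coordinator side: deleting the key deletes the stream.
    let _: () = conn.del("morgs.near/test:block_stream").await?;

    Ok(())
}
```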
To synchronise Block Streams/Executors, we compare the latest registry with the rest of the system. Components are started/stopped as needed, but we lack control over life-cycle events such as registration/deletion. As we build towards Coordinator managing provisioning, we need a way to hook into these life-cycle events so we can provision/de-provision accordingly.

This PR refactors the current synchronisation logic to provide greater control over these life-cycle events. The existing Block Stream/Executor sync logic has been merged into a single `Synchroniser` struct, which now manages new/existing/deleted indexers as a whole, providing the required life-cycle hooks described above. Merging the sync logic allows us to control when Block Streams/Executors are started/stopped, so we can do things before and after, specifically de-/provisioning. I wanted to do this refactor first to make reviewing easier; I'll follow it up with the provisioning changes next.

To achieve the above, each Indexer now has its own `IndexerState` object in Redis. After synchronising a newly registered Indexer we write its state object, and after removing it we delete the state object. This means we have the following states, allowing us to handle each accordingly:

- New: the Indexer is in the registry but has no state object
- Existing: the Indexer is in the registry and has a state object
- Deleted: the Indexer has a state object but is no longer in the registry

The synchronisation logic is essentially the same, except that it is now driven off the persistent Redis state rather than the current Block Stream/Executor state.
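Conceptually, the sync decision then looks something like this; `SynchronisationState` is named in the migration discussion above, but the variant shapes here are assumptions based on the description:

```rust
enum SynchronisationState {
    // In the registry, no `IndexerState` yet: provision, start, then write state.
    New(IndexerConfig),
    // In the registry with persisted state: reconcile Block Stream/Executor.
    Existing(IndexerConfig, IndexerState),
    // State persists but the Indexer was removed from the registry:
    // stop components, de-provision, then delete the state object.
    Deleted(IndexerState),
}
```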