-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Expose endpoint to control streams #430
Conversation
e1ac0e6
to
5511837
Compare
@@ -0,0 +1,5 @@ | |||
fn main() -> Result<(), Box<dyn std::error::Error>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generates the Rust types from the proto
file on cargo build
@@ -0,0 +1,17 @@ | |||
use tonic::Request; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These examples use the exposed Rust types to interact with the gRPC service - these not only serve as examples, but also make it easy to quickly debug by running them
// pub code: String, | ||
// pub start_block_height: Option<u64>, | ||
// pub schema: Option<String>, | ||
// pub provisioned: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No longer used/needed - this is more for the Runner side of things which will be passed from the Control Plane directly
pub indexer_rule: IndexerRule, | ||
} | ||
|
||
impl IndexerConfig { | ||
pub fn get_full_name(&self) -> String { | ||
format!("{}/{}", self.account_id, self.function_name) | ||
} | ||
|
||
pub fn get_hash_id(&self) -> String { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consistent/deterministic ID to use for the BlockStream so it can be persisted across restarts
tonic::include_proto!("blockstreamer"); | ||
} | ||
|
||
pub use blockstreamer::*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exposes the Rust Client types
|
||
Ok(Response::new(response)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I should probably add tests here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
#[tokio::main] | ||
async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
let mut client = BlockStreamerClient::connect("http://[::1]:10000").await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to create a client for each request? Can we just use one client instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a cargo example, a standalone binary which can be executed via cargo --example list_streams
. I created it so we can easily send requests to the server, rather than creating verbose cli commands.
block-streamer/src/block_stream.rs
Outdated
std::cmp::max(last_block_in_index, last_indexed_block) | ||
}); | ||
|
||
tracing::debug!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. So, here is where we'd end up with duplicated reads since each stream has its own lake stream? I can see why we would need to rethink the caching as you had briefly mentioned in the weekly meeting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep exactly - we'd need to create some shared client so it can cache internally, but that's a future problem :)
block-streamer/src/redis.rs
Outdated
@@ -0,0 +1,142 @@ | |||
pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like I saw these changes already. I don't know if a rebase is needed or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
9322584
to
2d570f0
Compare
This PR exposes a gRPC endpoint for the Block Streamer service to provide control over individual Block Streams. There are currently 3 methods: start/stop/list. The functionality across these methods is quite basic, I didn't want to spend too much time fleshing them out as I'm still not certain on how exactly they will be used. I expect them to Change once the Control Plane starts using them.