feat: Create initial Block Streamer service #428
Conversation
e6ff42e to fb6f0cf
// TODO do in parallel?
// TODO only list objects without .json extension
We can speed this up by only listing folders/common_prefixes. Currently, we list all matching objects, and if an object is a file, listing it just returns itself.
I'm a tiny bit confused about the application of this function. What kinds of objects are we looking for? By "common_prefixes", do you mean "block." or "shard."? I believe ListObject allows matching the entire prefix against some pattern, using *, ., and ? symbols.
common_prefixes is an S3 concept; it is essentially the 'sub-folders'. list_all_objects will return both 'files' and 'folders', and in the original implementation (which has mostly been carried over here), we would then call list on each file/folder. Listing a file is pointless, as it just returns itself; we only need to list folders - hence the comment.
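For reference, a minimal sketch of a folder-only listing using the aws-sdk-s3 crate, assuming a placeholder bucket/prefix and no pagination handling; the real helper in this PR may differ:

```rust
use aws_sdk_s3::Client;

/// List only the 'sub-folders' (common_prefixes) under a prefix, skipping
/// individual objects entirely. Pagination/continuation tokens are omitted
/// for brevity.
async fn list_folders(client: &Client, bucket: &str, prefix: &str) -> anyhow::Result<Vec<String>> {
    let output = client
        .list_objects_v2()
        .bucket(bucket)
        .prefix(prefix)
        // Supplying a delimiter is what makes S3 group keys into common_prefixes.
        .delimiter("/")
        .send()
        .await?;

    Ok(output
        .common_prefixes
        .unwrap_or_default()
        .into_iter()
        .filter_map(|common_prefix| common_prefix.prefix)
        .collect())
}
```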
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::registry()
Starting a random Block Stream just to serve as an integration test. Eventually this will start an endpoint which provides control over the Block Streams.
@@ -0,0 +1,141 @@
use near_lake_framework::near_indexer_primitives::{
All rules/ code is the same as what we have currently, just copied over to this new crate.
@@ -0,0 +1,79 @@
pub mod matcher;
Ah, except this file, which differs from the original. indexer_rules_type has been merged in here. This will eventually be extracted.
57a8d0f to 9574e54
contract_pattern,
);

// TODO Remove all block heights after start_block_height
This is probably a bug - we fetch all block heights within the day in which the specified block height occurs. If the block height is finalised in the middle of the day, we unnecessarily include all the block heights before it.
We should probably remove all block heights which are lower than start_block_height.
The code comment says all block heights after start. I assume you meant before? Also, is the list of block heights in order? Maybe we can get a slice of the list starting from start_date? Or when we flush to redis, we can filter there.
Ah yes, I meant before.
The list of block heights is in order. Yeah, we can slice it from start_block_height; I just wanted to avoid changing the code too much. We can address it in a future PR :).
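For illustration, since the heights are already sorted, the trimming discussed here could be as small as the following sketch (the function name is illustrative, not the actual code in this PR):

```rust
/// Drop all block heights that fall before the requested start height.
/// Assumes `block_heights` is sorted ascending, as discussed above.
fn trim_block_heights(block_heights: &[u64], start_block_height: u64) -> Vec<u64> {
    // partition_point returns the index of the first height >= start_block_height,
    // so slicing there keeps only the relevant tail of the list.
    let start_index = block_heights.partition_point(|&height| height < start_block_height);
    block_heights[start_index..].to_vec()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn trims_heights_before_start() {
        assert_eq!(trim_block_heights(&[100, 105, 110, 120], 110), vec![110, 120]);
    }
}
```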
}
}

pub async fn get_nearest_block_date(
This previously used JSON RPC, but it has been refactored to use S3 to avoid maintaining multiple clients.
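Roughly, the S3-based lookup amounts to fetching the block's block.json from the public near-lake bucket and converting its header timestamp (nanoseconds) to a date. The bucket name and key layout below follow the standard near-lake data format, but treat the details as an assumption rather than the exact code in this PR:

```rust
use chrono::{DateTime, TimeZone, Utc};

/// Fetch `block.json` for a height from the near-lake bucket and return the
/// UTC datetime it was produced at, derived from `header.timestamp` (nanoseconds).
async fn block_date(
    s3_client: &aws_sdk_s3::Client,
    block_height: u64,
) -> anyhow::Result<DateTime<Utc>> {
    // Standard near-lake key layout: zero-padded height, then block.json.
    let key = format!("{:012}/block.json", block_height);

    let object = s3_client
        .get_object()
        .bucket("near-lake-data-mainnet")
        .key(key)
        .send()
        .await?;
    let bytes = object.body.collect().await?.into_bytes();

    let block: serde_json::Value = serde_json::from_slice(&bytes)?;
    let timestamp_ns = block["header"]["timestamp"]
        .as_u64()
        .ok_or_else(|| anyhow::anyhow!("block header is missing a timestamp"))?;

    Utc.timestamp_opt((timestamp_ns / 1_000_000_000) as i64, 0)
        .single()
        .ok_or_else(|| anyhow::anyhow!("invalid block timestamp"))
}
```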
Lots of code to look through. I believe I understand/see the changes outside of the rule engine, which I need to pore over some more. Some questions, but otherwise looks great! I salute you for your efforts in refactoring coordinator, sir.
}?;

tracing::debug!(
    "Flushing {} block heights from index files to historical Stream for indexer: {}",
Are we still calling it a historical stream?
Ah, no. We only have a single stream, so we don't need to distinguish between historical/real-time. I'll update these logs in a future PR.
let (sender, mut stream) = near_lake_framework::streamer(lake_config);

while let Some(streamer_message) = stream.recv().await {
So, I believe this is where you now continue "real-time" processing? When a new historical backfill request comes in, will we cancel and restart the task?
Yes, and no. This will eventually become 'real-time', but it continues from where the Databricks blocks finished, which can be at most a day behind.
pub fn new(s3_client: T) -> Self {
    DeltaLakeClient {
        s3_client,
        // hardcode to mainnet for
Incomplete comment? Why do we hardcode here?
"for now" lol - I fixed it in this PR https://github.com/near/queryapi/pull/430/files#diff-bd02e799d38dbea2e9e4c46adfbcc107693ec11fb4bffbed84d442a7191b88bbR50
"Block {} not found on S3, attempting to fetch next block", | ||
current_block_height | ||
); | ||
current_block_height += 1; |
So the retry count here isn't that we're retrying to get the same block but instead retrying to find the closest matching block by trying to get the number 1 over? What scenario would we need to try finding the next block over, up to 20 times?
I can't remember the exact reason, but the protocol can sometimes skip blocks, so they aren't guaranteed to be consecutive. In that case we just find the closest block thereafter.
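To make the shape of that loop explicit, here's a simplified sketch with illustrative names and a stubbed-out fetch (not the exact code in this PR):

```rust
const MAX_RETRIES: u64 = 20;

/// Placeholder for the real S3 fetch of `block.json` at a given height.
async fn fetch_block(_block_height: u64) -> anyhow::Result<()> {
    unimplemented!("fetch the block from the lake bucket")
}

/// Walk forward from `start_height` until a block that actually exists on S3 is
/// found. Heights aren't guaranteed to be consecutive because the protocol can
/// skip blocks, so a miss means "try the next height", not "retry the same one".
async fn find_next_existing_block(start_height: u64) -> anyhow::Result<u64> {
    let mut current_block_height = start_height;

    for _ in 0..MAX_RETRIES {
        match fetch_block(current_block_height).await {
            Ok(_) => return Ok(current_block_height),
            Err(_) => {
                tracing::debug!(
                    "Block {} not found on S3, attempting to fetch next block",
                    current_block_height
                );
                current_block_height += 1;
            }
        }
    }

    Err(anyhow::anyhow!(
        "no block found within {} heights of {}",
        MAX_RETRIES,
        start_height
    ))
}
```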
.flat_map(|index_file| index_file.heights)
.collect();

let pattern_has_multiple_contracts = contract_pattern.chars().any(|c| c == ',' || c == '*');
I had a feeling we supported multiple contracts since I've seen code related to it, but I never tried it. I don't think the UI lets you put multiple. Is this functionality we just have partial support for?
The UI does/should let you input multiple :)
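For anyone following along, a rough sketch of how a comma-separated/wildcard contract pattern like the one checked above can be matched; this is only the general idea, and the actual matcher module is more complete:

```rust
/// Returns true if `account_id` matches any entry of a pattern such as
/// "app.near, *.pool.near". Wildcards here only match a leading "*." label;
/// the real matcher handles more cases.
fn matches_contract_pattern(contract_pattern: &str, account_id: &str) -> bool {
    contract_pattern.split(',').map(str::trim).any(|pattern| {
        if pattern == "*" {
            true
        } else if let Some(suffix) = pattern.strip_prefix("*.") {
            account_id.ends_with(&format!(".{suffix}"))
        } else {
            pattern == account_id
        }
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn matches_comma_separated_and_wildcard_patterns() {
        assert!(matches_contract_pattern("app.near, *.pool.near", "app.near"));
        assert!(matches_contract_pattern("app.near, *.pool.near", "astro.pool.near"));
        assert!(!matches_contract_pattern("app.near", "other.near"));
    }
}
```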
redis::Client::open(redis_connection_str).expect("can create redis client")
}

pub fn generate_real_time_stream_key(prefix: &str) -> String {
Are these still used in block-streamer?
No, I left them here in case, but decided to remove them in a future PR https://github.com/near/queryapi/pull/442/files#diff-e09d65b6630cb04bb5d37db13b936ce372b768d6ca0f1eed271d3de305d21003
This PR introduces a new Rust service called Block Streamer. It is essentially a refactored version of Historical Backfill which does not stop after the 'manual filtering' portion of the backfill, but instead continues indefinitely. Eventually, this service will be able to serve requests, allowing control over these Block Streams. I've outlined the major changes below.
trait S3Client

Low-level S3 requests have been abstracted behind a trait. This allows us to use the trait as an argument, allowing the injection of mock implementations in tests. We no longer need to use real access keys, and can make more concrete assertions.
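As an illustration of that testing pattern, a minimal sketch assuming mockall and async-trait, with placeholder method names rather than the actual trait in this PR:

```rust
use async_trait::async_trait;

/// Thin abstraction over the S3 calls the service needs, so tests can inject a
/// mock instead of hitting real buckets with real credentials.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait S3Client {
    async fn get_text_file(&self, bucket: &str, key: &str) -> anyhow::Result<String>;
    async fn list_common_prefixes(&self, bucket: &str, prefix: &str) -> anyhow::Result<Vec<String>>;
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn asserts_against_a_mocked_s3_client() {
        let mut mock_s3_client = MockS3Client::new();
        mock_s3_client
            .expect_get_text_file()
            .returning(|_, _| Ok("{\"heights\":[100,105]}".to_string()));

        let body = mock_s3_client.get_text_file("bucket", "key").await.unwrap();

        assert!(body.contains("heights"));
    }
}
```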
DeltaLakeClient

The majority of the S3-related logic, queryapi_coordinator/src/s3.rs included, has been refactored into DeltaLakeClient. This client encapsulates all logic relating to the interaction/consumption of near-delta-lake S3. Now, only a single method needs to be called from BlockStream in order to get the relevant block heights for a given contract_id/pattern. There's been a fair amount of refactoring across the methods themselves; I've added tests to ensure they still behave as expected.
BlockStream

The refactored version of Historical Backfill. Not much has changed here in this version; the main change is that it now uses DeltaLakeClient. This will eventually be expanded to provide more control over the Stream.
indexer_rules_* & storage

Currently these exist as separate crates within the indexer/ workspace. Rather than creating a workspace for block-streamer, I've added the respective files for each to this single crate. This is probably fine for storage, now called redis, but given indexer_rules_* is also used in the Registry Contract, I'll probably extract it in a later PR to avoid it being duplicated.