
feat: Create initial Block Streamer service #428

Merged · 40 commits · Dec 11, 2023

Commits
0121e1c
feat: Create new cargo project for block stream
morgsmccauley Nov 21, 2023
56fcd2a
feat: Add `redis` module
morgsmccauley Nov 21, 2023
50b413e
feat: Add existing historical processing logic
morgsmccauley Nov 21, 2023
998c823
feat: Add supporting modules to make things compile
morgsmccauley Nov 21, 2023
694f04d
refactor: Rename historical processing to `BlockStreamer`
morgsmccauley Nov 21, 2023
503a069
refactor: Rename `IndexerFunction` > `IndexerConfig`
morgsmccauley Nov 21, 2023
af9faf7
feat: Use default log target of module path
morgsmccauley Nov 21, 2023
5682f61
fix: Store stream storage under correct redis key
morgsmccauley Nov 21, 2023
fde65a1
refactor: Remove unnecessary logging target from redis mod
morgsmccauley Nov 21, 2023
4c1b947
feat: Start dummy `BlockStreamer` to test
morgsmccauley Nov 21, 2023
781f3f7
refactor: Remove unneeded redis storage
morgsmccauley Nov 21, 2023
e63824f
refactor: Return `handle` rather than awaiting implicitly
morgsmccauley Nov 21, 2023
76641bd
feat: Start `BlockStreamer` from arbitrary block height
morgsmccauley Nov 21, 2023
93facb4
refactor: Rename `process_historical_messages` to `start_block_stream`
morgsmccauley Nov 22, 2023
34e5049
feat: Log and propagate errors in block stream task
morgsmccauley Nov 22, 2023
ae2eab1
feat: Ignore error in block stream when cancelling
morgsmccauley Nov 22, 2023
57e0c4b
refactor: Make decision between index/metadata more clear
morgsmccauley Nov 22, 2023
b0876b3
chore: Update crates to latest version
morgsmccauley Nov 23, 2023
11a07da
feat: Add mockable s3 client
morgsmccauley Nov 23, 2023
d8116ec
feat: Add `DeltaLakeClient` using s3 client mock
morgsmccauley Nov 23, 2023
aed29b4
feat: Fetch latest_block metadata from delta lake
morgsmccauley Nov 23, 2023
38a1d6a
refactor: Use `DeltaLakeClient` to get metadata for block stream
morgsmccauley Nov 23, 2023
3409801
feat: Add `list_objects` to `s3_client`
morgsmccauley Nov 23, 2023
82300c5
feat: Move s3 logic to `DeltaLakeClient`
morgsmccauley Nov 23, 2023
1f652d1
refactor: Use `DeltaLakeClient` to fetch index files
morgsmccauley Nov 23, 2023
95dd389
refactor: Inline use of bucket/prefix
morgsmccauley Nov 23, 2023
3c1838f
refactor: Simplify use of `continuation_token`
morgsmccauley Nov 23, 2023
1287b7c
refactor: Extract S3 listing from `DeltaLakeClient`
morgsmccauley Nov 23, 2023
e48ba77
test: Test `DeltaLakeClient` listing multi/wildcard
morgsmccauley Nov 23, 2023
cb318fc
refactor: Rename functions to add clarity
morgsmccauley Nov 23, 2023
5431708
test: Listing index files for multiple accounts and wildcard
morgsmccauley Nov 24, 2023
cfdfb7a
refactor: Use consistent naming for contract_id
morgsmccauley Nov 24, 2023
d9100a7
refactor: Return `BlockHeight` instead of raw index files
morgsmccauley Nov 24, 2023
242a997
refactor: Fetch start block date from s3 instead of rpc
morgsmccauley Nov 24, 2023
187d187
chore: Remove unused code
morgsmccauley Nov 24, 2023
a621487
refactor: Make unused methods private
morgsmccauley Nov 24, 2023
9569787
chore: Add more logging
morgsmccauley Nov 24, 2023
9574e54
refactor: Rename `BlockStreamer` to `BlockStream`
morgsmccauley Nov 24, 2023
0531b5f
refactor: Encapsulate block date logic within `DeltaLakeClient`
morgsmccauley Nov 24, 2023
21706bd
chore: Remove unused code
morgsmccauley Nov 26, 2023
3,693 changes: 3,693 additions & 0 deletions block-streamer/Cargo.lock

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions block-streamer/Cargo.toml
@@ -0,0 +1,28 @@
[package]
name = "block-streamer"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.57"
async-trait = "0.1.74"
aws-config = { version = "1.0.0", features = ["behavior-version-latest"] }
aws-smithy-http = "0.60.0"
aws-smithy-types = "1.0.1"
aws-sdk-s3 = "0.39.1"
aws-types = "1.0.0"
borsh = "0.10.2"
chrono = "0.4.25"
futures = "0.3.5"
mockall = "0.11.4"
redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.55"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tokio = { version = "1.28.0", features = ["rt-multi-thread"] }
tokio-util = "0.7.10"
tokio-stream = "0.1.14"
wildmatch = "2.1.1"

near-lake-framework = "0.7.1"
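
The manifest declares tokio with only the rt-multi-thread feature (no macros) and tracing-subscriber with env-filter. A minimal sketch of how these dependencies are typically wired together in a service entry point (not part of this PR; all names below are illustrative assumptions):

fn main() -> anyhow::Result<()> {
    // The `env-filter` feature lets RUST_LOG drive log levels,
    // e.g. RUST_LOG=block_streamer=debug
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    // tokio is declared without the `macros` feature, so build the runtime
    // explicitly rather than using #[tokio::main]
    let runtime = tokio::runtime::Builder::new_multi_thread().build()?;

    runtime.block_on(async {
        tracing::info!("block-streamer starting");
        // Hypothetical: construct the Redis connection manager, DeltaLakeClient,
        // and BlockStreams here, then await their task handles.
        Ok(())
    })
}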
201 changes: 201 additions & 0 deletions block-streamer/src/block_stream.rs
@@ -0,0 +1,201 @@
use crate::indexer_config::IndexerConfig;
use crate::rules::types::indexer_rule_match::ChainId;
use crate::rules::MatchingRule;
use anyhow::{bail, Context};
use near_lake_framework::near_indexer_primitives;
use tokio::task::JoinHandle;

pub struct Task {
    handle: JoinHandle<anyhow::Result<()>>,
    cancellation_token: tokio_util::sync::CancellationToken,
}

pub struct BlockStream {
    task: Option<Task>,
}

impl BlockStream {
    pub fn new() -> Self {
        Self { task: None }
    }

    pub fn start(
        &mut self,
        start_block_height: near_indexer_primitives::types::BlockHeight,
        indexer: IndexerConfig,
        redis_connection_manager: crate::redis::ConnectionManager,
        delta_lake_client: crate::delta_lake_client::DeltaLakeClient<crate::s3_client::S3Client>,
        chain_id: ChainId,
    ) -> anyhow::Result<()> {
        if self.task.is_some() {
            return Err(anyhow::anyhow!("BlockStreamer has already been started"));
        }

        let cancellation_token = tokio_util::sync::CancellationToken::new();
        let cancellation_token_clone = cancellation_token.clone();

        let handle = tokio::spawn(async move {
            tokio::select! {
                _ = cancellation_token_clone.cancelled() => {
                    tracing::info!(
                        "Cancelling existing block stream task for indexer: {}",
                        indexer.get_full_name(),
                    );

                    Ok(())
                },
                result = start_block_stream(
                    start_block_height,
                    indexer.clone(),
                    &redis_connection_manager,
                    &delta_lake_client,
                    &chain_id,
                ) => {
                    result.map_err(|err| {
                        tracing::error!(
                            "Block stream task for indexer: {} stopped due to error: {:?}",
                            indexer.get_full_name(),
                            err,
                        );
                        err
                    })
                }
            }
        });

        self.task = Some(Task {
            handle,
            cancellation_token,
        });

        Ok(())
    }

    pub async fn cancel(&mut self) -> anyhow::Result<()> {
        if let Some(task) = self.task.take() {
            task.cancellation_token.cancel();
            let _ = task.handle.await?;

            return Ok(());
        }

        Err(anyhow::anyhow!(
            "Attempted to cancel already cancelled, or not started, BlockStreamer"
        ))
    }

    pub fn take_handle(&mut self) -> Option<JoinHandle<anyhow::Result<()>>> {
        self.task.take().map(|task| task.handle)
    }
}

pub(crate) async fn start_block_stream(
    start_block_height: near_indexer_primitives::types::BlockHeight,
    indexer: IndexerConfig,
    redis_connection_manager: &crate::redis::ConnectionManager,
    delta_lake_client: &crate::delta_lake_client::DeltaLakeClient<crate::s3_client::S3Client>,
    chain_id: &ChainId,
) -> anyhow::Result<()> {
    tracing::info!(
        "Starting block stream at {start_block_height} for indexer: {}",
        indexer.get_full_name(),
    );

    let latest_block_metadata = delta_lake_client.get_latest_block_metadata().await?;
    let last_indexed_block = latest_block_metadata
        .last_indexed_block
        .parse::<near_indexer_primitives::types::BlockHeight>()?;

    let blocks_from_index = match &indexer.indexer_rule.matching_rule {
        MatchingRule::ActionAny {
            affected_account_id,
            ..
        } => {
            tracing::debug!(
                "Fetching block heights starting from {} from delta lake for indexer: {}",
                start_block_height,
                indexer.get_full_name()
            );

            delta_lake_client
                .list_matching_block_heights(start_block_height, affected_account_id)
                .await
        }
        MatchingRule::ActionFunctionCall { .. } => {
            bail!("ActionFunctionCall matching rule not yet supported for historical processing, function: {:?} {:?}", indexer.account_id, indexer.function_name);
        }
        MatchingRule::Event { .. } => {
            bail!("Event matching rule not yet supported for historical processing, function {:?} {:?}", indexer.account_id, indexer.function_name);
        }
    }?;

    tracing::debug!(
        "Flushing {} block heights from index files to historical Stream for indexer: {}",
        blocks_from_index.len(),
        indexer.get_full_name(),
    );

Collaborator: Are we still calling it a historical stream?

Author: Ah, no. We only have a single stream, so we don't need to distinguish between historical/real-time. I'll update these logs in a future PR.

    for block in &blocks_from_index {
        crate::redis::xadd(
            redis_connection_manager,
            // TODO make configurable
            crate::redis::generate_historical_stream_key(&indexer.get_full_name()),
            &[("block_height", block)],
        )
        .await
        .context("Failed to add block to Redis Stream")?;
    }

    let mut last_indexed_block =
        blocks_from_index
            .last()
            .map_or(last_indexed_block, |&last_block_in_index| {
                // Check for the case where index files are written right after we fetch the last_indexed_block metadata
                std::cmp::max(last_block_in_index, last_indexed_block)
            });

    tracing::debug!(
        "Starting near-lake-framework from {last_indexed_block} for indexer: {}",
        indexer.get_full_name(),
    );

    let lake_config = match &chain_id {
        ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
        ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
    }
    .start_block_height(last_indexed_block)
    .build()
    .context("Failed to build lake config")?;

    let (sender, mut stream) = near_lake_framework::streamer(lake_config);

    while let Some(streamer_message) = stream.recv().await {
Collaborator: So, I believe this is where you now continue "real-time" processing? When a new historical backfill request comes in, will we cancel and restart the task?

Author: Yes, and no. This will eventually become 'real-time', but it continues from where the Databricks blocks finished, which can be at most a day behind.

        let block_height = streamer_message.block.header.height;
        last_indexed_block = block_height;

        let matches = crate::rules::reduce_indexer_rule_matches(
            &indexer.indexer_rule,
            &streamer_message,
            chain_id.clone(),
        );

        if !matches.is_empty() {
            crate::redis::xadd(
                redis_connection_manager,
                crate::redis::generate_historical_stream_key(&indexer.get_full_name()),
                &[("block_height", block_height)],
            )
            .await?;
        }
    }

    drop(sender);

    tracing::debug!(
        "Stopped block stream at {} for indexer: {}",
        last_indexed_block,
        indexer.get_full_name(),
    );

    Ok(())
}
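
Regarding the reviewer's cancel-and-restart question above: cancel() takes and cancels the running task, and start() rejects a second call while a task is registered, so a backfill request can be served by cancelling and then starting again at the new height. A minimal sketch of such a caller (not part of this PR; it assumes the same module context and imports as block_stream.rs above):

async fn restart_stream(
    stream: &mut BlockStream,
    new_start_height: near_indexer_primitives::types::BlockHeight,
    indexer: IndexerConfig,
    redis_connection_manager: crate::redis::ConnectionManager,
    delta_lake_client: crate::delta_lake_client::DeltaLakeClient<crate::s3_client::S3Client>,
    chain_id: ChainId,
) -> anyhow::Result<()> {
    // Ignore the error `cancel()` returns when no stream was running yet;
    // otherwise this trips the CancellationToken and awaits the old task.
    let _ = stream.cancel().await;

    // `cancel()` cleared `self.task`, so `start()` will not hit the
    // "already been started" guard.
    stream.start(
        new_start_height,
        indexer,
        redis_connection_manager,
        delta_lake_client,
        chain_id,
    )
}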