feat: Create initial Block Streamer service (#428)
This PR introduces a new Rust service called Block Streamer. It is
essentially a refactored version of Historical Backfill which, rather
than stopping after the 'manual filtering' portion of the backfill,
continues streaming indefinitely. Eventually, this service will be able
to serve requests, allowing control over these Block Streams. I've
outlined the major changes below.

## `trait S3Client`
Low-level S3 requests have been abstracted behind a `trait`. This allows
us to accept the `trait` as an argument, so mock implementations can be
injected in tests. We no longer need to use real access keys, and we can
make more concrete assertions.
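
As a rough sketch of the pattern (the trait name, method signature, and stubbed values below are illustrative assumptions, not the exact code from this PR), `mockall`'s `automock` combined with `async-trait` generates a mock that tests can program with canned responses:

```rust
use async_trait::async_trait;

// `automock` generates a `MockS3ClientTrait` type for tests; the
// attribute must sit above `#[async_trait]`. Trait and method names
// here are illustrative.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait S3ClientTrait {
    async fn get_text_file(&self, bucket: &str, prefix: &str) -> anyhow::Result<String>;
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test] // assumes tokio's `macros` feature in dev-dependencies
    async fn stubs_s3_without_real_credentials() {
        let mut mock_s3_client = MockS3ClientTrait::new();
        mock_s3_client
            .expect_get_text_file()
            .returning(|_, _| Ok(r#"{"last_indexed_block":"106000000"}"#.to_string()));

        // No AWS credentials needed, and the response is fully controlled,
        // so the assertion can be exact.
        let contents = mock_s3_client
            .get_text_file("some-bucket", "latest_block.json")
            .await
            .unwrap();

        assert!(contents.contains("last_indexed_block"));
    }
}
```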

## `DeltaLakeClient`
The majority of the S3-related logic, `queryapi_coordinator/src/s3.rs`
included, has been refactored into `DeltaLakeClient`. This client
encapsulates all logic relating to the interaction with, and consumption
of, `near-delta-lake` S3 data. Now, only a single method needs to be
called from `BlockStream` to get the relevant block heights for a given
contract_id/pattern. There's been a fair amount of refactoring across
the methods themselves, so I've added tests to ensure they still behave
as expected.
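
Fetching the matching heights now reduces to roughly the following single call (mirroring the usage in `block_stream.rs` below; the wrapper function and the `Vec<u64>` return type are illustrative assumptions):

```rust
// Sketch: one DeltaLakeClient call resolves the near-delta-lake index
// files down to the block heights matching the given account/pattern.
async fn fetch_matching_heights(
    delta_lake_client: &crate::delta_lake_client::DeltaLakeClient<crate::s3_client::S3Client>,
    start_block_height: u64,
    affected_account_id: &str,
) -> anyhow::Result<Vec<u64>> {
    delta_lake_client
        .list_matching_block_heights(start_block_height, affected_account_id)
        .await
}
```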

## `BlockStream`
This is the refactored version of Historical Backfill. Not much has
changed here in this version; the main difference is that it now uses
`DeltaLakeClient`.
This will eventually be expanded to provide more control over the
Stream.
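
Driving the stream looks roughly like this, based on the `BlockStream` API in `block_stream.rs` below (the wrapper function and its arguments are assumed to be constructed elsewhere by the caller):

```rust
use crate::block_stream::BlockStream;
use crate::indexer_config::IndexerConfig;
use crate::rules::types::indexer_rule_match::ChainId;

// Sketch: start a stream, then cancel it later.
async fn run_stream(
    start_block_height: u64,
    indexer_config: IndexerConfig,
    redis_connection_manager: crate::redis::ConnectionManager,
    delta_lake_client: crate::delta_lake_client::DeltaLakeClient<crate::s3_client::S3Client>,
) -> anyhow::Result<()> {
    let mut block_stream = BlockStream::new();

    block_stream.start(
        start_block_height,
        indexer_config,
        redis_connection_manager,
        delta_lake_client,
        ChainId::Mainnet,
    )?;

    // ... later, e.g. when the indexer registration changes:
    block_stream.cancel().await?;

    Ok(())
}
```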

## `indexer_rules_*` & `storage`
Currently these exist as separate crates within the `indexer/`
workspace. Rather than creating a workspace for `block-streamer`, I've
added the respective files for each to the single crate. This is
probably fine for `storage`, now called `redis`, but given that
`indexer_rules_*` is also used in the Registry Contract, I'll probably
extract it in a later PR to avoid duplication.
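
For reference, the merged layout ends up looking roughly like this (module names taken from the `crate::` paths used in `block_stream.rs` below; the exact file arrangement is an assumption):

```rust
// src/main.rs (illustrative)
mod block_stream;
mod delta_lake_client;
mod indexer_config;
mod redis;     // formerly the `storage` crate
mod rules;     // formerly the `indexer_rules_*` crates
mod s3_client;
```
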
morgsmccauley authored Dec 11, 2023
1 parent 98022a1 commit 3a9cfd2
Showing 15 changed files with 5,683 additions and 0 deletions.
3,693 changes: 3,693 additions & 0 deletions block-streamer/Cargo.lock

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions block-streamer/Cargo.toml
@@ -0,0 +1,28 @@
[package]
name = "block-streamer"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1.0.57"
async-trait = "0.1.74"
aws-config = { version = "1.0.0", features = ["behavior-version-latest"]}
aws-smithy-http = "0.60.0"
aws-smithy-types = "1.0.1"
aws-sdk-s3 = "0.39.1"
aws-types = "1.0.0"
borsh = "0.10.2"
chrono = "0.4.25"
futures = "0.3.5"
mockall = "0.11.4"
redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.55"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tokio = { version = "1.28.0", features = ["rt-multi-thread"]}
tokio-util = "0.7.10"
tokio-stream = "0.1.14"
wildmatch = "2.1.1"

near-lake-framework = "0.7.1"
201 changes: 201 additions & 0 deletions block-streamer/src/block_stream.rs
@@ -0,0 +1,201 @@
use crate::indexer_config::IndexerConfig;
use crate::rules::types::indexer_rule_match::ChainId;
use crate::rules::MatchingRule;
use anyhow::{bail, Context};
use near_lake_framework::near_indexer_primitives;
use tokio::task::JoinHandle;

pub struct Task {
    handle: JoinHandle<anyhow::Result<()>>,
    cancellation_token: tokio_util::sync::CancellationToken,
}

pub struct BlockStream {
    task: Option<Task>,
}

impl BlockStream {
    pub fn new() -> Self {
        Self { task: None }
    }

    pub fn start(
        &mut self,
        start_block_height: near_indexer_primitives::types::BlockHeight,
        indexer: IndexerConfig,
        redis_connection_manager: crate::redis::ConnectionManager,
        delta_lake_client: crate::delta_lake_client::DeltaLakeClient<crate::s3_client::S3Client>,
        chain_id: ChainId,
    ) -> anyhow::Result<()> {
        if self.task.is_some() {
            return Err(anyhow::anyhow!("BlockStreamer has already been started"));
        }

        let cancellation_token = tokio_util::sync::CancellationToken::new();
        let cancellation_token_clone = cancellation_token.clone();

        let handle = tokio::spawn(async move {
            tokio::select! {
                _ = cancellation_token_clone.cancelled() => {
                    tracing::info!(
                        "Cancelling existing block stream task for indexer: {}",
                        indexer.get_full_name(),
                    );

                    Ok(())
                },
                result = start_block_stream(
                    start_block_height,
                    indexer.clone(),
                    &redis_connection_manager,
                    &delta_lake_client,
                    &chain_id,
                ) => {
                    result.map_err(|err| {
                        tracing::error!(
                            "Block stream task for indexer: {} stopped due to error: {:?}",
                            indexer.get_full_name(),
                            err,
                        );
                        err
                    })
                }
            }
        });

        self.task = Some(Task {
            handle,
            cancellation_token,
        });

        Ok(())
    }

    pub async fn cancel(&mut self) -> anyhow::Result<()> {
        if let Some(task) = self.task.take() {
            task.cancellation_token.cancel();
            let _ = task.handle.await?;

            return Ok(());
        }

        Err(anyhow::anyhow!(
            "Attempted to cancel already cancelled, or not started, BlockStreamer"
        ))
    }

    pub fn take_handle(&mut self) -> Option<JoinHandle<anyhow::Result<()>>> {
        self.task.take().map(|task| task.handle)
    }
}

pub(crate) async fn start_block_stream(
    start_block_height: near_indexer_primitives::types::BlockHeight,
    indexer: IndexerConfig,
    redis_connection_manager: &crate::redis::ConnectionManager,
    delta_lake_client: &crate::delta_lake_client::DeltaLakeClient<crate::s3_client::S3Client>,
    chain_id: &ChainId,
) -> anyhow::Result<()> {
    tracing::info!(
        "Starting block stream at {start_block_height} for indexer: {}",
        indexer.get_full_name(),
    );

    let latest_block_metadata = delta_lake_client.get_latest_block_metadata().await?;
    let last_indexed_block = latest_block_metadata
        .last_indexed_block
        .parse::<near_indexer_primitives::types::BlockHeight>()?;

    let blocks_from_index = match &indexer.indexer_rule.matching_rule {
        MatchingRule::ActionAny {
            affected_account_id,
            ..
        } => {
            tracing::debug!(
                "Fetching block heights starting from {} from delta lake for indexer: {}",
                start_block_height,
                indexer.get_full_name()
            );

            delta_lake_client
                .list_matching_block_heights(start_block_height, affected_account_id)
                .await
        }
        MatchingRule::ActionFunctionCall { .. } => {
            bail!("ActionFunctionCall matching rule not yet supported for historical processing, function: {:?} {:?}", indexer.account_id, indexer.function_name);
        }
        MatchingRule::Event { .. } => {
            bail!("Event matching rule not yet supported for historical processing, function {:?} {:?}", indexer.account_id, indexer.function_name);
        }
    }?;

    tracing::debug!(
        "Flushing {} block heights from index files to historical Stream for indexer: {}",
        blocks_from_index.len(),
        indexer.get_full_name(),
    );

    for block in &blocks_from_index {
        crate::redis::xadd(
            redis_connection_manager,
            // TODO make configurable
            crate::redis::generate_historical_stream_key(&indexer.get_full_name()),
            &[("block_height", block)],
        )
        .await
        .context("Failed to add block to Redis Stream")?;
    }

    let mut last_indexed_block =
        blocks_from_index
            .last()
            .map_or(last_indexed_block, |&last_block_in_index| {
                // Check for the case where index files are written right after we fetch the last_indexed_block metadata
                std::cmp::max(last_block_in_index, last_indexed_block)
            });

    tracing::debug!(
        "Starting near-lake-framework from {last_indexed_block} for indexer: {}",
        indexer.get_full_name(),
    );

    let lake_config = match &chain_id {
        ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
        ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
    }
    .start_block_height(last_indexed_block)
    .build()
    .context("Failed to build lake config")?;

    let (sender, mut stream) = near_lake_framework::streamer(lake_config);

    while let Some(streamer_message) = stream.recv().await {
        let block_height = streamer_message.block.header.height;
        last_indexed_block = block_height;

        let matches = crate::rules::reduce_indexer_rule_matches(
            &indexer.indexer_rule,
            &streamer_message,
            chain_id.clone(),
        );

        if !matches.is_empty() {
            crate::redis::xadd(
                redis_connection_manager,
                crate::redis::generate_historical_stream_key(&indexer.get_full_name()),
                &[("block_height", block_height)],
            )
            .await?;
        }
    }

    drop(sender);

    tracing::debug!(
        "Stopped block stream at {} for indexer: {}",
        last_indexed_block,
        indexer.get_full_name(),
    );

    Ok(())
}