Skip to content

Commit

Permalink
Merge pull request #851 from near/main
Browse files Browse the repository at this point in the history
Prod Release 03/07/24
  • Loading branch information
morgsmccauley authored Jul 3, 2024
2 parents b382782 + a9c25c3 commit 932c277
Show file tree
Hide file tree
Showing 68 changed files with 2,200 additions and 1,592 deletions.
1 change: 1 addition & 0 deletions block-streamer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ graphql_client = { version = "0.14.0", features = ["reqwest"] }
lazy_static = "1.4.0"
mockall = "0.11.4"
near-lake-framework = "0.7.8"
pin-project = "1.1.5"
prometheus = "0.13.3"
prost = "0.12.3"
redis = { version = "0.21.5", features = ["tokio-comp", "connection-manager"] }
Expand All @@ -29,7 +30,7 @@ serde_json = "1.0.55"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing-stackdriver = "0.10.0"
tokio = { version = "1.28.0", features = ["full"]}
tokio = { version = "1.28.0", features = ["full", "test-util"]}
tokio-util = "0.7.10"
tokio-stream = "0.1.14"
tonic = "0.10.2"
Expand Down
149 changes: 96 additions & 53 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;

use anyhow::Context;
use near_lake_framework::near_indexer_primitives;
use tokio::task::JoinHandle;
Expand All @@ -12,6 +16,36 @@ use registry_types::Rule;
const LAKE_PREFETCH_SIZE: usize = 100;
const MAX_STREAM_SIZE_WITH_CACHE: u64 = 100;
const DELTA_LAKE_SKIP_ACCOUNTS: [&str; 4] = ["*", "*.near", "*.kaiching", "*.tg"];
const MAX_STREAM_SIZE: u64 = 100;

#[pin_project::pin_project]
pub struct PollCounter<F> {
#[pin]
inner: F,
indexer_name: String,
}

impl<F> PollCounter<F> {
pub fn new(inner: F, indexer_name: String) -> Self {
Self {
inner,
indexer_name,
}
}
}

impl<F: Future> Future for PollCounter<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
metrics::BLOCK_STREAM_UP
.with_label_values(&[&self.indexer_name])
.inc();

let this = self.project();
this.inner.poll(cx)
}
}

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
Expand Down Expand Up @@ -45,7 +79,7 @@ impl BlockStream {
pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
) -> anyhow::Result<()> {
Expand All @@ -54,42 +88,49 @@ impl BlockStream {
}

let cancellation_token = tokio_util::sync::CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
let redis_stream = self.redis_stream.clone();

let handle = tokio::spawn(async move {
tokio::select! {
_ = cancellation_token_clone.cancelled() => {
tracing::info!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Cancelling block stream task",
);

Ok(())
},
result = start_block_stream(

let handle = tokio::spawn({
let cancellation_token = cancellation_token.clone();
let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
let redis_stream = self.redis_stream.clone();

async move {
let block_stream_future = start_block_stream(
start_block_height,
&indexer_config,
redis_client,
redis,
delta_lake_client,
lake_s3_client,
&chain_id,
LAKE_PREFETCH_SIZE,
redis_stream
) => {
result.map_err(|err| {
tracing::error!(
redis_stream,
);

let block_stream_future =
PollCounter::new(block_stream_future, indexer_config.get_full_name());

tokio::select! {
_ = cancellation_token.cancelled() => {
tracing::info!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Block stream task stopped due to error: {:?}",
err,
"Cancelling block stream task",
);
err
})

Ok(())
},
result = block_stream_future => {
result.map_err(|err| {
tracing::error!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
"Block stream task stopped due to error: {:?}",
err,
);
err
})
}
}
}
});
Expand All @@ -107,6 +148,10 @@ impl BlockStream {
task.cancellation_token.cancel();
let _ = task.handle.await?;

// Fails if metric doesn't exist, i.e. task was never polled
let _ = metrics::BLOCK_STREAM_UP
.remove_label_values(&[&self.indexer_config.get_full_name()]);

return Ok(());
}

Expand All @@ -129,7 +174,7 @@ impl BlockStream {
pub(crate) async fn start_block_stream(
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: &IndexerConfig,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
chain_id: &ChainId,
Expand All @@ -145,7 +190,7 @@ pub(crate) async fn start_block_stream(
let last_indexed_delta_lake_block = process_delta_lake_blocks(
start_block_height,
delta_lake_client,
redis_client.clone(),
redis.clone(),
indexer,
redis_stream.clone(),
)
Expand All @@ -156,7 +201,7 @@ pub(crate) async fn start_block_stream(
last_indexed_delta_lake_block,
lake_s3_client,
lake_prefetch_size,
redis_client,
redis,
indexer,
redis_stream,
chain_id,
Expand All @@ -175,7 +220,7 @@ pub(crate) async fn start_block_stream(
async fn process_delta_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
) -> anyhow::Result<u64> {
Expand Down Expand Up @@ -230,10 +275,10 @@ async fn process_delta_lake_blocks(

for block_height in &blocks_from_index {
let block_height = block_height.to_owned();
redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
redis
.publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE)
.await?;
redis_client
redis
.set_last_processed_block(indexer, block_height)
.await?;
}
Expand All @@ -253,7 +298,7 @@ async fn process_near_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
lake_s3_client: crate::lake_s3_client::SharedLakeS3Client,
lake_prefetch_size: usize,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
redis: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
chain_id: &ChainId,
Expand All @@ -278,7 +323,7 @@ async fn process_near_lake_blocks(
let block_height = streamer_message.block.header.height;
last_indexed_block = block_height;

redis_client
redis
.set_last_processed_block(indexer, block_height)
.await?;

Expand All @@ -289,18 +334,14 @@ async fn process_near_lake_blocks(
);

if !matches.is_empty() {
if let Ok(Some(stream_length)) =
redis_client.get_stream_length(redis_stream.clone()).await
{
if let Ok(Some(stream_length)) = redis.get_stream_length(redis_stream.clone()).await {
if stream_length <= MAX_STREAM_SIZE_WITH_CACHE {
redis_client
.cache_streamer_message(&streamer_message)
.await?;
redis.cache_streamer_message(&streamer_message).await?;
}
}

redis_client
.publish_block(indexer, redis_stream.clone(), block_height)
redis
.publish_block(indexer, redis_stream.clone(), block_height, MAX_STREAM_SIZE)
.await?;
}
}
Expand Down Expand Up @@ -355,29 +396,30 @@ mod tests {
.expect_list_matching_block_heights()
.returning(|_, _| Ok(vec![107503702, 107503703]));

let mut mock_redis_client = crate::redis::RedisClient::default();
mock_redis_client
let mut mock_redis = crate::redis::RedisClient::default();
mock_redis
.expect_publish_block()
.with(
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503702, 107503703, 107503705]),
predicate::always(),
)
.returning(|_, _, _| Ok(()))
.returning(|_, _, _, _| Ok(()))
.times(3);
mock_redis_client
mock_redis
.expect_set_last_processed_block()
.with(
predicate::always(),
predicate::in_iter([107503702, 107503703, 107503704, 107503705]),
)
.returning(|_, _| Ok(()))
.times(4);
mock_redis_client
mock_redis
.expect_cache_streamer_message()
.with(predicate::always())
.returning(|_| Ok(()));
mock_redis_client
mock_redis
.expect_get_stream_length()
.with(predicate::eq("stream key".to_string()))
.returning(|_| Ok(Some(10)));
Expand All @@ -397,7 +439,7 @@ mod tests {
start_block_stream(
91940840,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_redis),
std::sync::Arc::new(mock_delta_lake_client),
mock_lake_s3_client,
&ChainId::Mainnet,
Expand Down Expand Up @@ -435,8 +477,9 @@ mod tests {
predicate::always(),
predicate::eq("stream key".to_string()),
predicate::in_iter([107503705]),
predicate::always(),
)
.returning(|_, _, _| Ok(()))
.returning(|_, _, _, _| Ok(()))
.times(1);
mock_redis_client
.expect_set_last_processed_block()
Expand Down
4 changes: 2 additions & 2 deletions block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
"Starting Block Streamer"
);

let redis_client = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);
let redis = std::sync::Arc::new(redis::RedisClient::connect(&redis_url).await?);

let aws_config = aws_config::from_env().load().await;
let s3_config = aws_sdk_s3::Config::from(&aws_config);
Expand All @@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> {

tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server"));

server::init(&grpc_port, redis_client, delta_lake_client, lake_s3_client).await?;
server::init(&grpc_port, redis, delta_lake_client, lake_s3_client).await?;

Ok(())
}
6 changes: 6 additions & 0 deletions block-streamer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ lazy_static! {
&["level"]
)
.unwrap();
pub static ref BLOCK_STREAM_UP: IntCounterVec = register_int_counter_vec!(
"queryapi_block_streamer_block_stream_up",
"A continuously increasing counter to indicate the block stream is up",
&["indexer"]
)
.unwrap();
}

pub struct LogCounter;
Expand Down
Loading

0 comments on commit 932c277

Please sign in to comment.