feat: Expose health info from Block Stream/Executor RPC (#889)
This PR exposes a `health` field on the Block Stream/Executor info, which can
be accessed via RPC. The intent of this field is for Coordinator to monitor it
and act accordingly. I wanted to raise this work first so that the overall
change does not become too large.

Essentially, `health` contains only a single `enum` describing the "state" of
the process, but this can be expanded over time as needed.
morgsmccauley authored Jul 18, 2024
1 parent f2cdc78 commit 29bde3c
Showing 13 changed files with 330 additions and 49 deletions.
22 changes: 22 additions & 0 deletions block-streamer/proto/block_streamer.proto
@@ -108,4 +108,26 @@ message StreamInfo {
string function_name = 4;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 5;
// Contains health information for the Block Stream
Health health = 6;
}

// Contains health information for the Block Stream
message Health {
// The processing state of the block stream
ProcessingState processing_state = 1;
// When the health info was last updated
uint64 updated_at_timestamp_secs = 2;
}

enum ProcessingState {
UNSPECIFIED = 0;
// Not started, or has been stopped
IDLE = 1;
// Running as expected
RUNNING = 2;
// Waiting for some internal condition to be met before continuing
WAITING = 3;
// Stopped due to some unknown error
STALLED = 4;
}
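
The `updated_at_timestamp_secs` field also lets a consumer detect when the health info itself has gone stale, for example if the monitoring loop has died. A rough sketch, assuming the generated `Health` type and a hypothetical 30-second threshold (the monitor below refreshes roughly every 5 seconds):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical threshold: health is refreshed roughly every 5 seconds, so
/// anything much older suggests the monitor itself has stopped.
const STALE_HEALTH_THRESHOLD_SECS: u64 = 30;

fn health_is_stale(health: &blockstreamer::Health) -> bool {
    let now_secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);

    now_secs.saturating_sub(health.updated_at_timestamp_secs) > STALE_HEALTH_THRESHOLD_SECS
}
```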
170 changes: 155 additions & 15 deletions block-streamer/src/block_stream.rs
@@ -1,7 +1,9 @@
use std::cmp::Ordering;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::time::SystemTime;

use anyhow::Context;
use futures::StreamExt;
@@ -52,16 +54,43 @@ impl<F: Future> Future for PollCounter<F> {
}

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
stream_handle: JoinHandle<anyhow::Result<()>>,
monitor_handle: JoinHandle<()>,
cancellation_token: tokio_util::sync::CancellationToken,
}

/// Represents the processing state of a block stream
#[derive(Clone)]
pub enum ProcessingState {
/// Block Stream is not currently active but can be started. Either has not been started or was
/// stopped.
Idle,

/// Block Stream is actively processing blocks.
Running,

/// Block Stream has been intentionally/internally paused due to some condition, i.e. back pressure on
/// the Redis Stream.
Waiting,

/// Block Stream has stalled due to an error or other condition. Must be manually
/// restarted.
Stalled,
}

#[derive(Clone)]
pub struct BlockStreamHealth {
pub processing_state: ProcessingState,
pub last_updated: SystemTime,
}

pub struct BlockStream {
task: Option<Task>,
pub indexer_config: IndexerConfig,
pub chain_id: ChainId,
pub version: u64,
pub redis_stream: String,
health: Arc<Mutex<BlockStreamHealth>>,
}

impl BlockStream {
@@ -77,23 +106,107 @@ impl BlockStream {
chain_id,
version,
redis_stream,
health: Arc::new(Mutex::new(BlockStreamHealth {
processing_state: ProcessingState::Idle,
last_updated: SystemTime::now(),
})),
}
}

pub fn start(
&mut self,
pub fn health(&self) -> anyhow::Result<BlockStreamHealth> {
match self.health.lock() {
Ok(health) => Ok(health.clone()),
Err(e) => Err(anyhow::anyhow!("Failed to acquire health lock: {:?}", e)),
}
}

fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) -> JoinHandle<()> {
tokio::spawn({
let config = self.indexer_config.clone();
let health = self.health.clone();
let redis_stream = self.redis_stream.clone();

async move {
let mut last_processed_block =
redis.get_last_processed_block(&config).await.unwrap();

loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

let new_last_processed_block =
if let Ok(block) = redis.get_last_processed_block(&config).await {
block
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to fetch last processed block"
);
continue;
};

let stream_size = if let Ok(stream_size) =
redis.get_stream_length(redis_stream.clone()).await
{
stream_size
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to fetch stream size"
);
continue;
};

let mut health_lock = if let Ok(health) = health.lock() {
health
} else {
tracing::warn!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Failed to acquire health lock"
);
continue;
};

match new_last_processed_block.cmp(&last_processed_block) {
Ordering::Less => {
tracing::error!(
account_id = config.account_id.as_str(),
function_name = config.function_name,
"Last processed block should not decrease"
);

health_lock.processing_state = ProcessingState::Stalled;
}
Ordering::Equal if stream_size >= Some(MAX_STREAM_SIZE) => {
health_lock.processing_state = ProcessingState::Waiting;
}
Ordering::Equal => {
health_lock.processing_state = ProcessingState::Stalled;
}
Ordering::Greater => {
health_lock.processing_state = ProcessingState::Running;
}
};

health_lock.last_updated = SystemTime::now();

last_processed_block = new_last_processed_block;
}
}
})
}
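
The `match` inside the monitoring loop is the heart of the health logic. Restated as a pure function, the transition table looks like this; a sketch only, not part of the diff, with `max_stream_size` standing in for the existing `MAX_STREAM_SIZE` constant:

```rust
use std::cmp::Ordering;

// Sketch of the same decision table used by start_health_monitoring_task.
// Note that None < Some(_) for the Option<u64> block heights, matching the
// behaviour of the cmp call in the loop above.
fn derive_state(
    previous: Option<u64>,
    current: Option<u64>,
    stream_size: Option<u64>,
    max_stream_size: u64,
) -> ProcessingState {
    match current.cmp(&previous) {
        // The last processed block should never move backwards.
        Ordering::Less => ProcessingState::Stalled,
        // No progress while the Redis stream is at capacity: back pressure.
        Ordering::Equal if stream_size >= Some(max_stream_size) => ProcessingState::Waiting,
        // No progress and no back pressure: something is stuck.
        Ordering::Equal => ProcessingState::Stalled,
        // Height advanced: healthy.
        Ordering::Greater => ProcessingState::Running,
    }
}
```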

fn start_block_stream_task(
&self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
}

let cancellation_token = tokio_util::sync::CancellationToken::new();

let handle = tokio::spawn({
cancellation_token: tokio_util::sync::CancellationToken,
) -> JoinHandle<anyhow::Result<()>> {
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let indexer_config = self.indexer_config.clone();
let chain_id = self.chain_id.clone();
@@ -137,10 +250,35 @@ impl BlockStream {
}
}
}
});
})
}

pub fn start(
&mut self,
start_block_height: near_indexer_primitives::types::BlockHeight,
redis: Arc<RedisClient>,
reciever_blocks_processor: Arc<ReceiverBlocksProcessor>,
lake_s3_client: SharedLakeS3Client,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
}

let cancellation_token = tokio_util::sync::CancellationToken::new();

let monitor_handle = self.start_health_monitoring_task(redis.clone());

let stream_handle = self.start_block_stream_task(
start_block_height,
redis,
reciever_blocks_processor,
lake_s3_client,
cancellation_token.clone(),
);

self.task = Some(Task {
handle,
stream_handle,
monitor_handle,
cancellation_token,
});

@@ -149,8 +287,9 @@

pub async fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(task) = self.task.take() {
task.monitor_handle.abort();
task.cancellation_token.cancel();
let _ = task.handle.await?;
let _ = task.stream_handle.await?;

// Fails if metric doesn't exist, i.e. task was never polled
let _ = metrics::BLOCK_STREAM_UP
@@ -167,6 +306,7 @@ impl BlockStream {

#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "block_stream"
skip_all,
fields(
account_id = indexer.account_id.as_str(),
22 changes: 22 additions & 0 deletions block-streamer/src/redis.rs
@@ -28,6 +28,18 @@ impl RedisCommandsImpl {
Ok(Self { connection })
}

pub async fn get<T>(&self, key: T) -> Result<Option<u64>, RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
{
tracing::debug!("GET: {:?}", key);

redis::cmd("GET")
.arg(key)
.query_async(&mut self.connection.clone())
.await
}

pub async fn xadd<T, U>(&self, stream_key: T, fields: &[(String, U)]) -> Result<(), RedisError>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
@@ -133,6 +145,16 @@ impl RedisClientImpl {
.context("Failed to set last processed block")
}

pub async fn get_last_processed_block(
&self,
indexer_config: &IndexerConfig,
) -> anyhow::Result<Option<u64>> {
self.commands
.get(indexer_config.last_processed_block_key())
.await
.context("Failed to set last processed block")
}

pub async fn get_stream_length(&self, stream: String) -> anyhow::Result<Option<u64>> {
self.commands.xlen(stream).await
}
52 changes: 47 additions & 5 deletions block-streamer/src/server/block_streamer_service.rs
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::SystemTime;

use near_lake_framework::near_indexer_primitives;
use tonic::{Request, Response, Status};
@@ -21,6 +22,30 @@ pub struct BlockStreamerService {
block_streams: Mutex<HashMap<String, block_stream::BlockStream>>,
}

impl From<block_stream::BlockStreamHealth> for blockstreamer::Health {
fn from(health: block_stream::BlockStreamHealth) -> Self {
blockstreamer::Health {
processing_state: match health.processing_state {
block_stream::ProcessingState::Running => {
blockstreamer::ProcessingState::Running as i32
}
block_stream::ProcessingState::Idle => blockstreamer::ProcessingState::Idle as i32,
block_stream::ProcessingState::Stalled => {
blockstreamer::ProcessingState::Stalled as i32
}
block_stream::ProcessingState::Waiting => {
blockstreamer::ProcessingState::Waiting as i32
}
},
updated_at_timestamp_secs: health
.last_updated
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
}
}
}
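
A minimal test sketch for the conversion above, assuming the module paths used in this file; it only pins the enum mapping and the epoch-seconds conversion:

```rust
#[cfg(test)]
mod health_conversion_tests {
    use super::*;
    use std::time::{Duration, SystemTime};

    #[test]
    fn maps_state_and_timestamp() {
        let health = block_stream::BlockStreamHealth {
            processing_state: block_stream::ProcessingState::Waiting,
            last_updated: SystemTime::UNIX_EPOCH + Duration::from_secs(1_000),
        };

        let proto: blockstreamer::Health = health.into();

        assert_eq!(
            proto.processing_state,
            blockstreamer::ProcessingState::Waiting as i32
        );
        assert_eq!(proto.updated_at_timestamp_secs, 1_000);
    }
}
```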

impl BlockStreamerService {
pub fn new(
redis: std::sync::Arc<crate::redis::RedisClient>,
@@ -77,11 +102,17 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService {
});

if let Some((stream_id, stream)) = stream_entry {
let stream_health = stream.health().map_err(|err| {
tracing::error!(?err, "Failed to get health of block stream");
Status::internal("Failed to get health of block stream")
})?;

Ok(Response::new(StreamInfo {
stream_id: stream_id.to_string(),
account_id: stream.indexer_config.account_id.to_string(),
function_name: stream.indexer_config.function_name.to_string(),
version: stream.version,
health: Some(stream_health.into()),
}))
} else {
Err(Status::not_found(format!(
@@ -210,11 +241,22 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService {

let block_streams: Vec<StreamInfo> = lock
.values()
.map(|block_stream| StreamInfo {
stream_id: block_stream.indexer_config.get_hash_id(),
account_id: block_stream.indexer_config.account_id.to_string(),
function_name: block_stream.indexer_config.function_name.clone(),
version: block_stream.version,
.map(|block_stream| {
let stream_health = block_stream
.health()
.map_err(|err| {
tracing::error!(?err, "Failed to get health of block stream");
Status::internal("Failed to get health of block stream")
})
.ok();

StreamInfo {
stream_id: block_stream.indexer_config.get_hash_id(),
account_id: block_stream.indexer_config.account_id.to_string(),
function_name: block_stream.indexer_config.function_name.to_string(),
version: block_stream.version,
health: stream_health.map(|health| health.into()),
}
})
.collect();
