Skip to content

Commit

Permalink
[indexer alt] add collector tests
Browse files Browse the repository at this point in the history
  • Loading branch information
emmazzz committed Nov 25, 2024
1 parent 252c332 commit 24abd0a
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 3 deletions.
155 changes: 152 additions & 3 deletions crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ struct Pending<H: Handler> {
impl<H: Handler> Pending<H> {
/// Whether there are values left to commit from this indexed checkpoint.
fn is_empty(&self) -> bool {
debug_assert!(self.watermark.batch_rows == 0);
self.values.is_empty()
let empty = self.values.is_empty();
if empty {
debug_assert!(self.watermark.batch_rows == 0);
}
empty
}

/// Adds data from this indexed checkpoint to the `batch`, honoring the handler's bounds on
Expand Down Expand Up @@ -132,7 +135,7 @@ pub(super) fn collector<H: Handler + 'static>(
pipeline = H::NAME,
elapsed_ms = elapsed * 1000.0,
rows = batch.len(),
pending = pending_rows,
pending_rows = pending_rows,
"Gathered batch",
);

Expand Down Expand Up @@ -179,3 +182,149 @@ pub(super) fn collector<H: Handler + 'static>(
}
})
}

#[cfg(test)]
mod tests {
use crate::{db, pipeline::Processor};

use super::*;
use std::time::Duration;
use prometheus::Registry;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::sync::mpsc;

struct TestHandler;
impl Processor for TestHandler {
type Value = u64;
const NAME: &'static str = "test_handler";
const FANOUT: usize = 1;

fn process(&self, _checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
Ok(vec![])
}
}

#[async_trait::async_trait]
impl Handler for TestHandler {

const MIN_EAGER_ROWS: usize = 3;
const MAX_CHUNK_ROWS: usize = 4;
const MAX_PENDING_ROWS: usize = 10;

async fn commit(_values: &[Self::Value], _conn: &mut db::Connection<'_>)
-> anyhow::Result<usize> {
tokio::time::sleep(Duration::from_millis(1000)).await;
Ok(0)
}
}

#[tokio::test]
async fn test_collector_batches_data() {
let (processor_tx, processor_rx) = mpsc::channel(10);
let (collector_tx, mut collector_rx) = mpsc::channel(10);
let metrics = Arc::new(IndexerMetrics::new(&Registry::new()));
let cancel = CancellationToken::new();

let _collector = collector::<TestHandler>(
PipelineConfig::default(),
processor_rx,
collector_tx,
metrics,
cancel.clone(),
);

// Send test data
let test_data = vec![
Indexed::new(0, 1, 10, 1000, vec![1, 2]),
Indexed::new(0, 2, 20, 2000, vec![3, 4]),
Indexed::new(0, 3, 30, 3000, vec![5, 6]),
];

for data in test_data {
processor_tx.send(data).await.unwrap();
}

let batch1 = collector_rx.recv().await.unwrap();
assert_eq!(batch1.len(), 4);

let batch2 = collector_rx.recv().await.unwrap();
assert_eq!(batch2.len(), 2);

let batch3 = collector_rx.recv().await.unwrap();
assert_eq!(batch3.len(), 0);

cancel.cancel();
}

#[tokio::test]
async fn test_collector_shutdown() {
let (processor_tx, processor_rx) = mpsc::channel(10);
let (collector_tx, mut collector_rx) = mpsc::channel(10);
let metrics = Arc::new(IndexerMetrics::new(&Registry::new()));
let cancel = CancellationToken::new();

let _collector = collector::<TestHandler>(
PipelineConfig::default(),
processor_rx,
collector_tx,
metrics,
cancel.clone(),
);

processor_tx.send(Indexed::new(0, 1, 10, 1000, vec![1, 2])).await.unwrap();

let batch = collector_rx.recv().await.unwrap();
assert_eq!(batch.len(), 2);

// Drop processor sender to simulate shutdown
drop(processor_tx);

// After a short delay, collector should shut down
tokio::time::sleep(Duration::from_millis(200)).await;
assert!(collector_rx.try_recv().is_err());

cancel.cancel();
}

#[tokio::test]
async fn test_collector_respects_max_pending() {
let processor_channel_size = 5; // unit is checkpoint
let collector_channel_size = 10; // unit is batch, aka rows / MAX_CHUNK_ROWS
let (processor_tx, processor_rx) = mpsc::channel(processor_channel_size);
let (collector_tx, _collector_rx) = mpsc::channel(collector_channel_size);

let metrics = Arc::new(IndexerMetrics::new(&Registry::new()));

let cancel = CancellationToken::new();

let _collector = collector::<TestHandler>(
PipelineConfig::default().with_collect_interval(Duration::from_secs(10)),
processor_rx,
collector_tx,
metrics.clone(),
cancel.clone(),
);

// Send more data than MAX_PENDING_ROWS plus collector channel buffer
let data = Indexed::new(0, 1, 10, 1000, vec![1; TestHandler::MAX_PENDING_ROWS + TestHandler::MAX_CHUNK_ROWS * collector_channel_size]);
processor_tx.send(data).await.unwrap();

// Now fill up the processor channel with minimum data to trigger send blocking
for _ in 0..processor_channel_size {
let more_data = Indexed::new(0, 2, 11, 1000, vec![1]);
processor_tx.send(more_data).await.unwrap();
}

// Now sending even more data should block.
let even_more_data = Indexed::new(0, 3, 12, 1000, vec![1]);

let send_result = tokio::time::timeout(
Duration::from_millis(2000),
processor_tx.send(even_more_data)
).await;
assert!(send_result.is_err(), "Send should timeout due to MAX_PENDING_ROWS limit");

cancel.cancel();
}
}

17 changes: 17 additions & 0 deletions crates/sui-indexer-alt/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ pub struct PipelineConfig {
pub skip_watermark: bool,
}

impl Default for PipelineConfig {
fn default() -> Self {
Self {
write_concurrency: 5,
collect_interval: Duration::from_millis(500),
watermark_interval: Duration::from_millis(500),
skip_watermark: false,
}
}
}

impl PipelineConfig {
pub fn with_collect_interval(self, interval: Duration) -> Self {
Self { collect_interval: interval, ..self }
}
}

/// Processed values associated with a single checkpoint. This is an internal type used to
/// communicate between the processor and the collector parts of the pipeline.
struct Indexed<P: Processor> {
Expand Down

0 comments on commit 24abd0a

Please sign in to comment.