[indexer alt] add collector tests (MystenLabs#20413)
## Description 

Added tests for the indexer-alt collector, and corrected a debug assertion in
`Pending::is_empty` that could fire spuriously when called on a checkpoint that
still has values left to commit.

## Test plan 

Added tests.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
emmazzz authored Dec 8, 2024
1 parent c9739e6 commit 47945a9
Showing 2 changed files with 168 additions and 11 deletions.
175 changes: 164 additions & 11 deletions crates/sui-indexer-alt-framework/src/pipeline/concurrent/collector.rs
```diff
@@ -30,8 +30,11 @@ struct Pending<H: Handler> {
 impl<H: Handler> Pending<H> {
     /// Whether there are values left to commit from this indexed checkpoint.
     fn is_empty(&self) -> bool {
-        debug_assert!(self.watermark.batch_rows == 0);
-        self.values.is_empty()
+        let empty = self.values.is_empty();
+        if empty {
+            debug_assert!(self.watermark.batch_rows == 0);
+        }
+        empty
     }
 
     /// Adds data from this indexed checkpoint to the `batch`, honoring the handler's bounds on
```
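The gist of the `is_empty` fix, as a standalone sketch (simplified stand-in types, not the framework's actual `Pending<H>`): the old code asserted `watermark.batch_rows == 0` unconditionally, so merely calling `is_empty()` on a checkpoint with rows still waiting would trip the assertion in debug builds, while the invariant only has to hold once the values are fully drained.

```rust
/// Simplified stand-in for the framework's `Pending<H>`; the field semantics
/// here are illustrative, not the real watermark bookkeeping.
struct Pending {
    batch_rows: usize, // rows from this checkpoint not yet handed off in a batch
    values: Vec<u32>,  // values still waiting to be batched
}

impl Pending {
    fn is_empty(&self) -> bool {
        let empty = self.values.is_empty();
        // Only check the invariant once everything is drained; asserting it
        // unconditionally would fire on any partially drained checkpoint.
        if empty {
            debug_assert!(self.batch_rows == 0);
        }
        empty
    }
}

fn main() {
    let pending = Pending { batch_rows: 1, values: vec![42] };
    // Under the old unconditional assert, this call would panic in debug builds.
    assert!(!pending.is_empty());
}
```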
```diff
@@ -135,7 +138,7 @@ pub(super) fn collector<H: Handler + 'static>(
                         pipeline = H::NAME,
                         elapsed_ms = elapsed * 1000.0,
                         rows = batch.len(),
-                        pending = pending_rows,
+                        pending_rows = pending_rows,
                         "Gathered batch",
                     );
 
```
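A small aside on the one-line change above: it renames the log field so it matches the variable it records. With the names aligned, tracing's bare-identifier shorthand would also work; a minimal sketch (assumed usage, not part of this commit):

```rust
use tracing::debug;

fn main() {
    let pending_rows = 42;
    // A bare `pending_rows` is shorthand for `pending_rows = pending_rows`.
    debug!(rows = 10, pending_rows, "Gathered batch");
}
```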
```diff
@@ -220,29 +223,44 @@ mod tests {
     use sui_field_count::FieldCount;
     use sui_types::full_checkpoint_content::CheckpointData;
 
-    use crate::{db, pipeline::Processor};
+    use crate::{
+        db,
+        pipeline::{concurrent::max_chunk_rows, Processor},
+    };
 
     use super::*;
 
-    #[derive(FieldCount)]
+    #[derive(Clone)]
     struct Entry;
 
+    impl FieldCount for Entry {
+        // Fake a large number of fields to test max_chunk_rows.
+        const FIELD_COUNT: usize = 32;
+    }
+
+    use prometheus::Registry;
+    use std::time::Duration;
+    use tokio::sync::mpsc;
+
     struct TestHandler;
     impl Processor for TestHandler {
         type Value = Entry;
-        const NAME: &'static str = "test";
+        const NAME: &'static str = "test_handler";
         const FANOUT: usize = 1;
 
-        fn process(&self, _: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
+        fn process(&self, _checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
             Ok(vec![])
         }
     }
 
     #[async_trait::async_trait]
     impl Handler for TestHandler {
-        const MAX_PENDING_ROWS: usize = 1000;
         const MIN_EAGER_ROWS: usize = 100;
-
-        async fn commit(_: &[Self::Value], _: &mut db::Connection<'_>) -> anyhow::Result<usize> {
+        const MAX_PENDING_ROWS: usize = 10000;
+
+        async fn commit(
+            _values: &[Self::Value],
+            _conn: &mut db::Connection<'_>,
+        ) -> anyhow::Result<usize> {
+            tokio::time::sleep(Duration::from_millis(1000)).await;
             Ok(0)
         }
     }
```
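Why the tests swap `#[derive(FieldCount)]` for a manual impl: `max_chunk_rows` divides a fixed bind-parameter budget by `FIELD_COUNT` (see the second file below), so faking a 32-field row lets the tests hit chunk boundaries with around a thousand entries instead of tens of thousands. A sketch of the difference; the `Row` struct is hypothetical, and the derive is presumed to count the struct's actual fields:

```rust
use sui_field_count::FieldCount;

// With the derive, FIELD_COUNT presumably reflects the real field count:
#[derive(FieldCount)]
struct Row {
    id: u64,         // hypothetical columns, for illustration only
    payload: String,
}

// The tests implement the trait by hand to pretend Entry is a wide row:
#[derive(Clone)]
struct Entry;

impl FieldCount for Entry {
    const FIELD_COUNT: usize = 32; // fake 32 columns to shrink max_chunk_rows
}

fn main() {
    assert_eq!(Row::FIELD_COUNT, 2);
    assert_eq!(Entry::FIELD_COUNT, 32);
}
```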
```diff
@@ -316,4 +334,139 @@ mod tests {
         assert_eq!(received.len(), 3);
         assert!(pending.is_empty());
     }
+
+    #[tokio::test]
+    async fn test_collector_batches_data() {
+        let (processor_tx, processor_rx) = mpsc::channel(10);
+        let (collector_tx, mut collector_rx) = mpsc::channel(10);
+        let metrics = Arc::new(IndexerMetrics::new(&Registry::new()));
+        let cancel = CancellationToken::new();
+
+        let _collector = collector::<TestHandler>(
+            CommitterConfig::default(),
+            None,
+            processor_rx,
+            collector_tx,
+            metrics,
+            cancel.clone(),
+        );
+
+        let max_chunk_rows = max_chunk_rows::<TestHandler>();
+        let part1_length = max_chunk_rows / 2;
+        let part2_length = max_chunk_rows - part1_length - 1;
+
+        // Send test data
+        let test_data = vec![
+            Indexed::new(0, 1, 10, 1000, vec![Entry; part1_length]),
+            Indexed::new(0, 2, 20, 2000, vec![Entry; part2_length]),
+            Indexed::new(0, 3, 30, 3000, vec![Entry, Entry]),
+        ];
+
+        for data in test_data {
+            processor_tx.send(data).await.unwrap();
+        }
+
+        let batch1 = collector_rx.recv().await.unwrap();
+        assert_eq!(batch1.len(), max_chunk_rows);
+
+        let batch2 = collector_rx.recv().await.unwrap();
+        assert_eq!(batch2.len(), 1);
+
+        let batch3 = collector_rx.recv().await.unwrap();
+        assert_eq!(batch3.len(), 0);
+
+        cancel.cancel();
+    }
+
+    #[tokio::test]
+    async fn test_collector_shutdown() {
+        let (processor_tx, processor_rx) = mpsc::channel(10);
+        let (collector_tx, mut collector_rx) = mpsc::channel(10);
+        let metrics = Arc::new(IndexerMetrics::new(&Registry::new()));
+        let cancel = CancellationToken::new();
+
+        let collector = collector::<TestHandler>(
+            CommitterConfig::default(),
+            None,
+            processor_rx,
+            collector_tx,
+            metrics,
+            cancel.clone(),
+        );
+
+        processor_tx
+            .send(Indexed::new(0, 1, 10, 1000, vec![Entry, Entry]))
+            .await
+            .unwrap();
+
+        tokio::time::sleep(Duration::from_millis(200)).await;
+
+        let batch = collector_rx.recv().await.unwrap();
+        assert_eq!(batch.len(), 2);
+
+        // Drop processor sender to simulate shutdown
+        drop(processor_tx);
+
+        // After a short delay, collector should shut down
+        let _ = tokio::time::timeout(Duration::from_millis(500), collector)
+            .await
+            .expect("collector did not shutdown");
+
+        cancel.cancel();
+    }
+
+    #[tokio::test]
+    async fn test_collector_respects_max_pending() {
+        let processor_channel_size = 5; // unit is checkpoint
+        let collector_channel_size = 2; // unit is batch, aka rows / MAX_CHUNK_ROWS
+        let (processor_tx, processor_rx) = mpsc::channel(processor_channel_size);
+        let (collector_tx, _collector_rx) = mpsc::channel(collector_channel_size);
+
+        let metrics = Arc::new(IndexerMetrics::new(&Registry::new()));
+
+        let cancel = CancellationToken::new();
+
+        let _collector = collector::<TestHandler>(
+            CommitterConfig::default(),
+            None,
+            processor_rx,
+            collector_tx,
+            metrics.clone(),
+            cancel.clone(),
+        );
+
+        // Send more data than MAX_PENDING_ROWS plus collector channel buffer
+        let data = Indexed::new(
+            0,
+            1,
+            10,
+            1000,
+            vec![
+                Entry;
+                // Decreasing this number by even 1 would make the test fail.
+                TestHandler::MAX_PENDING_ROWS
+                    + max_chunk_rows::<TestHandler>() * collector_channel_size
+            ],
+        );
+        processor_tx.send(data).await.unwrap();
+
+        tokio::time::sleep(Duration::from_millis(200)).await;
+
+        // Now fill up the processor channel with minimum data to trigger send blocking
+        for _ in 0..processor_channel_size {
+            let more_data = Indexed::new(0, 2, 11, 1000, vec![Entry]);
+            processor_tx.send(more_data).await.unwrap();
+        }
+
+        // Now sending even more data should block because of MAX_PENDING_ROWS limit.
+        let even_more_data = Indexed::new(0, 3, 12, 1000, vec![Entry]);
+
+        let send_result = processor_tx.try_send(even_more_data);
+        assert!(matches!(
+            send_result,
+            Err(mpsc::error::TrySendError::Full(_))
+        ));
+
+        cancel.cancel();
+    }
 }
```
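A back-of-envelope check of the sizing in `test_collector_respects_max_pending`, using the constants the tests fake (`FIELD_COUNT = 32`, `MAX_PENDING_ROWS = 10000`); this mirrors the test's own comment that one row fewer would let the collector keep draining:

```rust
// Arithmetic sketch mirroring the test's sizing; not framework code.
fn main() {
    const FIELD_COUNT: usize = 32; // faked by `impl FieldCount for Entry`
    let max_chunk_rows = i16::MAX as usize / FIELD_COUNT; // 32767 / 32 = 1023
    let collector_channel_size = 2; // batches that can sit in the collector channel
    let max_pending_rows = 10_000; // TestHandler::MAX_PENDING_ROWS

    // Rows the pipeline can absorb before the collector stops pulling from the
    // processor: its own pending buffer plus full batches parked in the channel.
    let absorbed = max_pending_rows + max_chunk_rows * collector_channel_size;
    assert_eq!(absorbed, 12_046);
    // The test sends exactly this many rows, then fills the 5-slot processor
    // channel with single-row checkpoints, so the final try_send sees Full.
}
```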
4 changes: 4 additions & 0 deletions
```diff
@@ -257,5 +257,9 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
 }
 
 const fn max_chunk_rows<H: Handler>() -> usize {
+    // Handle division by zero
+    if H::Value::FIELD_COUNT == 0 {
+        return 0;
+    }
     i16::MAX as usize / H::Value::FIELD_COUNT
 }
```
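For concreteness, the arithmetic behind the batch sizes asserted in `test_collector_batches_data`, assuming `i16::MAX` here reflects a cap on bind parameters per statement:

```rust
// Worked example using the tests' constants; not framework code.
fn main() {
    let field_count = 32; // Entry's faked FIELD_COUNT
    let max_chunk_rows = i16::MAX as usize / field_count; // 32767 / 32
    assert_eq!(max_chunk_rows, 1023);
    // The test sends 511 + 511 + 2 = 1024 rows, so the collector emits one
    // full 1023-row batch followed by a batch carrying the single leftover row.
}
```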
