address comments
emmazzz committed Dec 8, 2024
1 parent dc79bba commit aa05fe3
Showing 1 changed file with 19 additions and 15 deletions.
@@ -223,7 +223,10 @@ mod tests {
use sui_field_count::FieldCount;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{db, pipeline::{concurrent::max_chunk_rows, Processor}};
use crate::{
db,
pipeline::{concurrent::max_chunk_rows, Processor},
};

use super::*;

@@ -252,6 +255,7 @@ mod tests {

#[async_trait::async_trait]
impl Handler for TestHandler {
const MAX_PENDING_ROWS: usize = 10000;
async fn commit(
_values: &[Self::Value],
_conn: &mut db::Connection<'_>,
@@ -381,7 +385,7 @@ mod tests {
let metrics = Arc::new(IndexerMetrics::new(&Registry::new()));
let cancel = CancellationToken::new();

let _collector = collector::<TestHandler>(
let collector = collector::<TestHandler>(
CommitterConfig::default(),
None,
processor_rx,
@@ -404,16 +408,17 @@
drop(processor_tx);

// After a short delay, collector should shut down
tokio::time::sleep(Duration::from_millis(200)).await;
assert!(collector_rx.try_recv().is_err());
let _ = tokio::time::timeout(Duration::from_millis(500), collector)
.await
.expect("collector did not shutdown");

cancel.cancel();
}

#[tokio::test]
async fn test_collector_respects_max_pending() {
let processor_channel_size = 5; // unit is checkpoint
let collector_channel_size = 10; // unit is batch, aka rows / MAX_CHUNK_ROWS
let collector_channel_size = 2; // unit is batch, aka rows / MAX_CHUNK_ROWS
let (processor_tx, processor_rx) = mpsc::channel(processor_channel_size);
let (collector_tx, _collector_rx) = mpsc::channel(collector_channel_size);

@@ -438,30 +443,29 @@
1000,
vec![
Entry;
// Decreasing this number by even 1 would make the test fail.
TestHandler::MAX_PENDING_ROWS
+ max_chunk_rows::<TestHandler>() * collector_channel_size
],
);
processor_tx.send(data).await.unwrap();

tokio::time::sleep(Duration::from_millis(200)).await;

// Now fill up the processor channel with minimum data to trigger send blocking
for _ in 0..processor_channel_size {
let more_data = Indexed::new(0, 2, 11, 1000, vec![Entry]);
processor_tx.send(more_data).await.unwrap();
}

// Now sending even more data should block.
// Now sending even more data should block because of MAX_PENDING_ROWS limit.
let even_more_data = Indexed::new(0, 3, 12, 1000, vec![Entry]);

let send_result = tokio::time::timeout(
Duration::from_millis(2000),
processor_tx.send(even_more_data),
)
.await;
assert!(
send_result.is_err(),
"Send should timeout due to MAX_PENDING_ROWS limit"
);
let send_result = processor_tx.try_send(even_more_data);
assert!(matches!(
send_result,
Err(mpsc::error::TrySendError::Full(_))
));

cancel.cancel();
}
