Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a dummy downstairs test for TCP buffer fillup #1069

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 203 additions & 2 deletions upstairs/src/dummy_downstairs_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ pub(crate) mod protocol_test {
uuid: Uuid::new_v4(),
read_only: false,

extent_count: 10,
extent_size: Block::new_512(10),
// 8M region
extent_count: 32,
extent_size: Block::new_512(512),

gen_numbers: vec![0u64; 10],
flush_numbers: vec![0u64; 10],
Expand Down Expand Up @@ -632,6 +633,25 @@ pub(crate) mod protocol_test {
}
}

fn make_4m_blank_read_response() -> Vec<crucible_protocol::ReadResponse> {
(0..(7 * 1024 * 1024 / 512))
.map(|i| {
let data = vec![0u8; 512];
let hash = crucible_common::integrity_hash(&[&data]);

crucible_protocol::ReadResponse {
eid: 0,
offset: Block::new_512(i),
data: BytesMut::from(&data[..]),
block_contexts: vec![BlockContext {
hash,
encryption_context: None,
}],
}
})
.collect()
}

/// Filter the first element that matches some predicate out of a list
pub fn filter_out<T, P>(l: &mut Vec<T>, pred: P) -> Option<T>
where
Expand Down Expand Up @@ -2964,4 +2984,185 @@ pub(crate) mod protocol_test {

Ok(())
}

/// Submit a huge amount of IO all at once to ensure that the Upstairs
/// doesn't stall by allowing the TCP buffers to fill up.
#[tokio::test(flavor = "multi_thread")]
async fn test_upstairs_stall() -> Result<()> {
let harness = Arc::new(TestHarness::new().await?);

let (_jh1, mut ds1_messages) =
harness.ds1().await.spawn_message_receiver().await;
let (_jh2, mut ds2_messages) =
harness.ds2.spawn_message_receiver().await;
let (_jh3, mut ds3_messages) =
harness.ds3.spawn_message_receiver().await;

// For each downstairs, spawn a task that will return read responses as
// fast as possible.

let _ds1_task: tokio::task::JoinHandle<Result<()>> = {
let harness = harness.clone();
tokio::spawn(async move {
let upstairs_id = harness.guest.get_uuid().await.unwrap();
let session_id = harness
.ds1()
.await
.upstairs_session_id
.lock()
.await
.unwrap();

loop {
match ds1_messages.recv().await {
Some(m) => match m {
Message::ReadRequest { job_id, .. } => {
let ds1 = harness.ds1().await;
let mut fw = ds1.fw.lock().await;

fw.send(Message::ReadResponse {
upstairs_id,
session_id,
job_id,
responses: Ok(
make_4m_blank_read_response(),
),
})
.await
.unwrap();
}

_ => {
panic!("unexpected message!");
}
},

None => {
// channel closed,
break;
}
}
}

Ok(())
})
};

let _ds2_task: tokio::task::JoinHandle<Result<()>> = {
let harness = harness.clone();
tokio::spawn(async move {
let upstairs_id = harness.guest.get_uuid().await.unwrap();
let session_id = harness
.ds1()
.await
.upstairs_session_id
.lock()
.await
.unwrap();

loop {
match ds2_messages.recv().await {
Some(m) => match m {
Message::ReadRequest { job_id, .. } => {
let mut fw = harness.ds2.fw.lock().await;

fw.send(Message::ReadResponse {
upstairs_id,
session_id,
job_id,
responses: Ok(
make_4m_blank_read_response(),
),
})
.await
.unwrap();
}

_ => {
panic!("unexpected message!");
}
},

None => {
// channel closed,
break;
}
}
}

Ok(())
})
};

let _ds3_task: tokio::task::JoinHandle<Result<()>> = {
let harness = harness.clone();
tokio::spawn(async move {
let upstairs_id = harness.guest.get_uuid().await.unwrap();
let session_id = harness
.ds1()
.await
.upstairs_session_id
.lock()
.await
.unwrap();

loop {
match ds3_messages.recv().await {
Some(m) => match m {
Message::ReadRequest { job_id, .. } => {
let mut fw = harness.ds3.fw.lock().await;

fw.send(Message::ReadResponse {
upstairs_id,
session_id,
job_id,
responses: Ok(
make_4m_blank_read_response(),
),
})
.await
.unwrap();
}

_ => {
panic!("unexpected message!");
}
},

None => {
// channel closed,
break;
}
}
}

Ok(())
})
};

// Then, submit a huge amount of IO all at the same time (by spawning
// tokio tasks) and await it finishing.

const IO_DEPTH: usize = 200;

let futures: Vec<_> = (0..IO_DEPTH)
.map(|_| {
let harness = harness.clone();
tokio::spawn(async move {
let buffer = Buffer::new(7 * 1024 * 1024);
harness
.guest
.read(Block::new_512(0), buffer)
.await
.unwrap();
})
})
.collect();

for future in futures {
future.await.unwrap();
}

Ok(())
}
}
Loading