Buffer should destructure to Vec when single-referenced
Until now, Buffer offered no means of extracting the internal `Vec<u8>`
when, say, a Volume::read() operation had completed.  This made it
impossible for Crucible consumers to reuse the `Vec` buffer, forcing an
otherwise unnecessary allocation.
pfmooney authored Dec 22, 2023
1 parent 7c1281b commit fc7a1ad
Showing 9 changed files with 311 additions and 320 deletions.
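To illustrate the reuse pattern this change enables, here is a minimal sketch (not part of the diff) of a caller that reads consecutive blocks while recycling a single allocation: the `Vec<u8>` is recovered with `into_vec()` once the `Buffer` is single-referenced again, then handed back to `Buffer::from_vec()` for the next read. The helper function, its error type, and the block-size plumbing are assumptions for illustration; only `Buffer::from_vec`, `Buffer::into_vec`, `Block::new`, and the `BlockIO` read call appear in the diff below.

```rust
use crucible::{Block, BlockIO, Buffer, CrucibleError};

// Sketch only: read two consecutive blocks while reusing one allocation.
async fn read_two_blocks<T: BlockIO>(
    guest: &T,
    block_size: u64,
) -> Result<(), CrucibleError> {
    let mut vec = vec![0u8; block_size as usize];

    for block_idx in 0..2u64 {
        let buffer = Buffer::from_vec(vec);
        let offset = Block::new(block_idx, block_size.trailing_zeros());
        guest.read(offset, buffer.clone()).await?;

        // Once the clone handed to read() has been dropped, the buffer is
        // single-referenced again and the Vec can be taken back for reuse
        // on the next iteration instead of allocating a fresh one.
        vec = buffer.into_vec().unwrap();
        // ... consume the block data in `vec` here ...
    }

    Ok(())
}
```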
6 changes: 3 additions & 3 deletions crudd/src/main.rs
@@ -180,7 +180,7 @@ async fn cmd_read<T: BlockIO>(
// So say we have an offset of 5. we're misaligned by 5 bytes, so we
// read 5 bytes we don't need. we skip those 5 bytes then write
// the rest to the output
let bytes = buffer.as_vec().await;
let bytes = buffer.into_vec().unwrap();
output.write_all(
&bytes[offset_misalignment as usize
..(offset_misalignment + alignment_bytes) as usize],
@@ -314,7 +314,7 @@ async fn write_remainder_and_finalize<'a, T: BlockIO>(
crucible.read(uflow_offset, uflow_r_buf.clone()).await?;

// Copy it into w_buf
let r_bytes = uflow_r_buf.as_vec().await;
let r_bytes = uflow_r_buf.into_vec().unwrap();
w_buf[n_read..n_read + uflow_backfill]
.copy_from_slice(&r_bytes[uflow_remainder as usize..]);

@@ -400,7 +400,7 @@ async fn cmd_write<T: BlockIO>(
let offset = Block::new(block_idx, native_block_size.trailing_zeros());
crucible.read(offset, buffer.clone()).await?;

let mut w_vec = buffer.as_vec().await.clone();
let mut w_vec = buffer.into_vec().unwrap();
// Write our data into the buffer
let bytes_read = input.read(
&mut w_vec[offset_misalignment as usize
5 changes: 2 additions & 3 deletions crutest/src/cli.rs
@@ -224,13 +224,12 @@ async fn cli_read(
let offset = Block::new(block_index as u64, ri.block_size.trailing_zeros());
let length: usize = size * ri.block_size as usize;

let vec: Vec<u8> = vec![255; length];
let data = crucible::Buffer::from_vec(vec);
let data = crucible::Buffer::from_vec(vec![255; length]);

println!("Read at block {:5}, len:{:7}", offset.value, data.len());
guest.read(offset, data.clone()).await?;

let mut dl = data.as_vec().await.to_vec();
let mut dl = data.into_vec().unwrap();
match validate_vec(
dl.clone(),
block_index,
49 changes: 20 additions & 29 deletions crutest/src/main.rs
@@ -1121,11 +1121,10 @@ async fn verify_volume(
};

let length: usize = next_io_blocks * ri.block_size as usize;
let vec: Vec<u8> = vec![255; length];
let data = crucible::Buffer::from_vec(vec);
let data = crucible::Buffer::from_vec(vec![255; length]);
guest.read(offset, data.clone()).await?;

let dl = data.as_vec().await.to_vec();
let dl = data.into_vec().unwrap();
match validate_vec(
dl,
block_index,
@@ -1361,11 +1360,10 @@ async fn balloon_workload(
guest.flush(None).await?;

let length: usize = size * ri.block_size as usize;
let vec: Vec<u8> = vec![255; length];
let data = crucible::Buffer::from_vec(vec);
let data = crucible::Buffer::from_vec(vec![255; length]);
guest.read(offset, data.clone()).await?;

let dl = data.as_vec().await.to_vec();
let dl = data.into_vec().unwrap();
match validate_vec(
dl,
block_index,
@@ -1579,8 +1577,7 @@ async fn generic_workload(
} else {
// Read (+ verify)
let length: usize = size * ri.block_size as usize;
let vec: Vec<u8> = vec![255; length];
let data = crucible::Buffer::from_vec(vec);
let data = crucible::Buffer::from_vec(vec![255; length]);
if !quiet {
match wtq {
WhenToQuit::Count { count } => {
@@ -1605,7 +1602,7 @@
}
guest.read(offset, data.clone()).await?;

let dl = data.as_vec().await.to_vec();
let dl = data.into_vec().unwrap();
match validate_vec(
dl,
block_index,
@@ -2217,13 +2214,12 @@ async fn one_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {
guest.write(offset, data).await?;

let length: usize = size * ri.block_size as usize;
let vec: Vec<u8> = vec![255; length];
let data = crucible::Buffer::from_vec(vec);
let data = crucible::Buffer::from_vec(vec![255; length]);

println!("Read at block {:5}, len:{:7}", offset.value, data.len());
guest.read(offset, data.clone()).await?;

let dl = data.as_vec().await.to_vec();
let dl = data.into_vec().unwrap();
match validate_vec(dl, block_index, &mut ri.write_log, ri.block_size, false)
{
ValidateStatus::Bad | ValidateStatus::InRange => {
@@ -2372,11 +2368,10 @@ async fn write_flush_read_workload(
guest.flush(None).await?;

let length: usize = size * ri.block_size as usize;
let vec: Vec<u8> = vec![255; length];
let data = crucible::Buffer::from_vec(vec);
let data = crucible::Buffer::from_vec(vec![255; length]);
guest.read(offset, data.clone()).await?;

let dl = data.as_vec().await.to_vec();
let dl = data.into_vec().unwrap();
match validate_vec(
dl,
block_index,
@@ -2534,8 +2529,7 @@ async fn repair_workload(
} else {
// Read
let length: usize = size * ri.block_size as usize;
let vec: Vec<u8> = vec![255; length];
let data = crucible::Buffer::from_vec(vec);
let data = crucible::Buffer::from_vec(vec![255; length]);
println!(
"{:>0width$}/{:>0width$} Read \
block {:>bw$} len {:>sw$}",
@@ -2547,7 +2541,7 @@
bw = block_width,
sw = size_width,
);
guest.read(offset, data.clone()).await?;
guest.read(offset, data).await?;
}
}
}
@@ -2615,10 +2609,9 @@ async fn demo_workload(
} else {
// Read
let length: usize = size * ri.block_size as usize;
let vec: Vec<u8> = vec![255; length];
let data = crucible::Buffer::from_vec(vec);
let data = crucible::Buffer::from_vec(vec![255; length]);

let future = guest.read(offset, data.clone());
let future = guest.read(offset, data);
futureslist.push(future);
}
}
@@ -2677,13 +2670,12 @@ async fn span_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {
guest.flush(None).await?;

let length: usize = 2 * ri.block_size as usize;
let vec: Vec<u8> = vec![99; length];
let data = crucible::Buffer::from_vec(vec);
let data = crucible::Buffer::from_vec(vec![99; length]);

println!("Sending a read spanning two extents");
guest.read(offset, data.clone()).await?;

let dl = data.as_vec().await.to_vec();
let dl = data.into_vec().unwrap();
match validate_vec(dl, block_index, &mut ri.write_log, ri.block_size, false)
{
ValidateStatus::Bad | ValidateStatus::InRange => {
@@ -2718,11 +2710,10 @@ async fn big_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {
guest.flush(None).await?;

let length: usize = ri.block_size as usize;
let vec: Vec<u8> = vec![255; length];
let data = crucible::Buffer::from_vec(vec);
let data = crucible::Buffer::from_vec(vec![255; length]);
guest.read(offset, data.clone()).await?;

let dl = data.as_vec().await.to_vec();
let dl = data.into_vec().unwrap();
match validate_vec(
dl,
block_index,
@@ -2848,8 +2839,8 @@ async fn dep_workload(guest: &Arc<Guest>, ri: &mut RegionInfo) -> Result<()> {
let future = guest.write_to_byte_offset(my_offset, data);
futureslist.push(future);
} else {
let vec: Vec<u8> = vec![0; ri.block_size as usize];
let data = crucible::Buffer::from_vec(vec);
let data =
crucible::Buffer::from_vec(vec![0; ri.block_size as usize]);

println!(
"Loop:{} send read {} @ offset:{} len:{}",
5 changes: 2 additions & 3 deletions pantry/src/pantry.rs
@@ -298,8 +298,7 @@ impl PantryEntry {
.read_from_byte_offset(offset, buffer.clone())
.await?;

let response = buffer.as_vec().await;
Ok(response.clone())
Ok(buffer.into_vec().unwrap())
}

pub async fn scrub(&self) -> Result<(), CrucibleError> {
@@ -341,7 +340,7 @@ impl PantryEntry {
.read_from_byte_offset(start, data.clone())
.await?;

hasher.update(&*data.as_vec().await);
hasher.update(&data.into_vec().unwrap())
}

let digest = hex::encode(hasher.finalize());
79 changes: 25 additions & 54 deletions upstairs/src/block_req.rs
@@ -11,41 +11,34 @@ use tokio::sync::oneshot;
#[derive(Debug)]
pub(crate) struct BlockReq {
pub op: BlockOp,
sender: oneshot::Sender<Result<(), CrucibleError>>,
pub res: BlockRes,
}

impl BlockReq {
pub fn new(
op: BlockOp,
sender: oneshot::Sender<Result<(), CrucibleError>>,
) -> BlockReq {
Self { op, sender }
}

/// Return a copy of the block op
pub fn op(&self) -> BlockOp {
self.op.clone()
}

/// Consume this BlockReq and send Ok to the receiver
#[must_use]
#[derive(Debug)]
pub(crate) struct BlockRes(Option<oneshot::Sender<Result<(), CrucibleError>>>);
impl BlockRes {
/// Consume this BlockRes and send Ok to the receiver
pub fn send_ok(self) {
self.send_result(Ok(()))
}

/// Consume this BlockReq and send an Err to the receiver
/// Consume this BlockRes and send an Err to the receiver
pub fn send_err(self, e: CrucibleError) {
self.send_result(Err(e))
}

/// Consume this BlockReq and send a Result to the receiver
pub fn send_result(self, r: Result<(), CrucibleError>) {
/// Consume this BlockRes and send a Result to the receiver
fn send_result(mut self, r: Result<(), CrucibleError>) {
// XXX this eats the result!
let _ = self.sender.send(r);
let _ = self.0.take().expect("sender was populated").send(r);
}

/// Consume this BlockReq and return the inner oneshot sender
pub fn take_sender(self) -> oneshot::Sender<Result<(), CrucibleError>> {
self.sender
}
impl Drop for BlockRes {
fn drop(&mut self) {
// Dropping a BlockRes without issuing a completion would mean the
// associated waiter would be stuck waiting forever for a result.
assert!(self.0.is_none(), "result should be sent for BlockRes");
}
}

@@ -61,10 +54,10 @@ pub(crate) struct BlockReqWaiter {
}

impl BlockReqWaiter {
pub fn new(
recv: oneshot::Receiver<Result<(), CrucibleError>>,
) -> BlockReqWaiter {
Self { recv }
/// Create associated `BlockReqWaiter`/`BlockRes` pair
pub fn pair() -> (BlockReqWaiter, BlockRes) {
let (send, recv) = oneshot::channel();
(Self { recv }, BlockRes(Some(send)))
}

/// Consume this BlockReqWaiter and wait on the message
@@ -75,7 +68,7 @@ impl BlockReqWaiter {
}
}

#[allow(dead_code)]
#[cfg(test)]
pub fn try_wait(&mut self) -> Option<Result<(), CrucibleError>> {
match self.recv.try_recv() {
Ok(v) => Some(v),
@@ -93,42 +86,20 @@ impl BlockReqWaiter {
mod test {
use super::*;

#[tokio::test]
async fn test_blockreqwaiter_send() {
let (send, recv) = oneshot::channel();
let brw = BlockReqWaiter::new(recv);

send.send(Ok(())).unwrap();

brw.wait().await.unwrap();
}

#[tokio::test]
async fn test_blockreq_and_blockreqwaiter() {
let (send, recv) = oneshot::channel();

let op = BlockOp::Flush {
snapshot_details: None,
};
let br = BlockReq::new(op, send);
let brw = BlockReqWaiter::new(recv);
let (brw, res) = BlockReqWaiter::pair();

br.send_ok();
res.send_ok();

brw.wait().await.unwrap();
}

#[tokio::test]
async fn test_blockreq_and_blockreqwaiter_err() {
let (send, recv) = oneshot::channel();

let op = BlockOp::Flush {
snapshot_details: None,
};
let br = BlockReq::new(op, send);
let brw = BlockReqWaiter::new(recv);
let (brw, res) = BlockReqWaiter::pair();

br.send_err(CrucibleError::UpstairsInactive);
res.send_err(CrucibleError::UpstairsInactive);

assert!(brw.wait().await.is_err());
}
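For the `block_req.rs` rework above, here is a minimal sketch (assumed usage, not code from this commit) of how the new `BlockReqWaiter::pair()` ties a waiter to its completion handle inside the upstairs crate. The struct-literal construction of `BlockReq`, the spawned task, and the `wait()` return type are assumptions for illustration; what the diff itself establishes is that a `BlockRes` must be consumed exactly once via `send_ok()`/`send_err()`, since its `Drop` impl asserts that a result was sent.

```rust
// Sketch only; assumes BlockOp, BlockReq, BlockReqWaiter, and CrucibleError
// are in scope as they are in upstairs/src/block_req.rs.
async fn submit_and_wait(op: BlockOp) -> Result<(), CrucibleError> {
    let (brw, res) = BlockReqWaiter::pair();
    let req = BlockReq { op, res };

    // Hand the request to whatever services it; that side must consume
    // req.res exactly once, otherwise the Drop assertion on BlockRes fires.
    tokio::spawn(async move {
        // ... perform the requested operation ...
        req.res.send_ok();
    });

    // The submitter waits on the paired oneshot for the outcome.
    brw.wait().await
}
```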