Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Pass Block as a reference to BlockStore::put_block #272

Merged
merged 5 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- refactor: Remove sessions and redundant code. [PR 255](https://github.com/dariusc93/rust-ipfs/pull/255)
- refactor: Move from libipld to ipld-core. [PR 257](https://github.com/dariusc93/rust-ipfs/pull/257)
- chore: Use `Bytes` when creating or using `Block` within bitswap. [PR 264](https://github.com/dariusc93/rust-ipfs/pull/264)
- refactor: Reference `Block` in `BlockStore::put_block`. [PR 272](https://github.com/dariusc93/rust-ipfs/pull/272)
- feat: Passthrough timeout to WantSession::new. [PR 265](https://github.com/dariusc93/rust-ipfs/pull/265)

# 0.11.20
Expand Down
4 changes: 4 additions & 0 deletions src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ impl Block {
&self.data
}

pub fn inner_data(&self) -> &Bytes {
&self.data
}

pub fn into_inner(self) -> (Cid, Bytes) {
(self.cid, self.data)
}
Expand Down
8 changes: 4 additions & 4 deletions src/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ impl std::future::IntoFuture for DagPut {
};
let cid = Cid::new(version, self.codec.into(), hash)?;
let block = Block::new(cid, bytes)?;
let cid = self.dag_ipld.repo.put_block(block).await?;
let cid = self.dag_ipld.repo.put_block(&block).await?;

if let Some(opt) = self.pinned {
if !self.dag_ipld.repo.is_pinned(&cid).await? {
Expand Down Expand Up @@ -1250,7 +1250,7 @@ mod tests {
let (cid, data) = blocks.next().unwrap();
assert_eq!(blocks.next(), None);

ipfs.put_block(Block::new(cid, data).unwrap())
ipfs.put_block(&Block::new(cid, data).unwrap())
.await
.unwrap();

Expand Down Expand Up @@ -1283,7 +1283,7 @@ mod tests {

let total_size = data.len();

ipfs.put_block(Block::new(cid, data).unwrap())
ipfs.put_block(&Block::new(cid, data).unwrap())
.await
.unwrap();

Expand All @@ -1301,7 +1301,7 @@ mod tests {
let node = node.unwrap();
let block = Block::new(node.cid.to_owned(), node.block.to_vec()).unwrap();

ipfs.put_block(block).await.unwrap();
ipfs.put_block(&block).await.unwrap();

cids.push(node.cid.to_owned());
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,7 @@ impl Ipfs {
}

/// Puts a block into the ipfs repo.
pub async fn put_block(&self, block: Block) -> Result<Cid, Error> {
pub async fn put_block(&self, block: &Block) -> Result<Cid, Error> {
self.repo.put_block(block).span(self.span.clone()).await
}

Expand Down Expand Up @@ -2752,7 +2752,7 @@ mod tests {
let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data));
let block = Block::new(cid, data).unwrap();

let cid: Cid = ipfs.put_block(block.clone()).await.unwrap();
let cid: Cid = ipfs.put_block(&block).await.unwrap();
let new_block = ipfs.get_block(&cid).await.unwrap();
assert_eq!(block, new_block);
}
Expand Down
6 changes: 3 additions & 3 deletions src/p2p/bitswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ mod test {

let cid = *block.cid();

repo.put_block(block.clone()).await?;
repo.put_block(&block).await?;

let opt = DialOpts::peer_id(peer2)
.addresses(vec![addr2.clone()])
Expand Down Expand Up @@ -759,7 +759,7 @@ mod test {

let cid = *block.cid();

repo.put_block(block.clone()).await?;
repo.put_block(&block).await?;

let opt = DialOpts::peer_id(peer2)
.addresses(vec![addr2.clone()])
Expand Down Expand Up @@ -816,7 +816,7 @@ mod test {

let cid = *block.cid();

repo.put_block(block.clone()).await?;
repo.put_block(&block).await?;

let opt = DialOpts::peer_id(peer2)
.addresses(vec![addr2.clone()])
Expand Down
2 changes: 1 addition & 1 deletion src/p2p/bitswap/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl WantSession {
tracing::warn!(session = %self.cid, %peer_id, cid = %block.cid(), name = "want_session", "state already putting block into store");
} else {
tracing::info!(%peer_id, cid = %block.cid(), name = "want_session", "storing block");
let fut = self.repo.put_block(block).into_future();
let fut = self.repo.put_block(&block).into_future();
self.state = WantSessionState::PutBlock {
from_peer_id: peer_id,
fut,
Expand Down
2 changes: 1 addition & 1 deletion src/refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ mod tests {
let cid = Cid::try_from(*cid_str).unwrap();
let block = Block::new(cid, data.to_vec()).unwrap();
block.to_ipld().unwrap();
ipfs.put_block(block).await.unwrap();
ipfs.put_block(&block).await.unwrap();
}

ipfs
Expand Down
15 changes: 8 additions & 7 deletions src/repo/blockstore/flatfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl BlockStore for FsBlockStore {

//TODO: Allow multiple puts without holding a lock. We could probably hold a read lock instead
// and revert back to using a broadcast
async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> {
let inner = &mut *self.inner.write().await;
inner.put(block).await
}
Expand Down Expand Up @@ -138,7 +138,8 @@ impl FsBlockStoreInner {
.await?
}

async fn put(&mut self, block: Block) -> Result<(Cid, BlockPut), Error> {
async fn put(&mut self, block: &Block) -> Result<(Cid, BlockPut), Error> {
let block = block.clone();
let target_path = block_path(self.path.clone(), block.cid());
let cid = *block.cid();

Expand Down Expand Up @@ -344,7 +345,7 @@ mod tests {
panic!("block should not be found")
}

let put = store.put(block.clone()).await.unwrap();
let put = store.put(&block).await.unwrap();
assert_eq!(put.0, cid.to_owned());
let contains = store.contains(&cid);
assert!(contains.await.unwrap());
Expand Down Expand Up @@ -375,7 +376,7 @@ mod tests {
block_store.open().await.unwrap();

assert!(!block_store.contains(block.cid()).await.unwrap());
block_store.put(block.clone()).await.unwrap();
block_store.put(&block).await.unwrap();

let block_store = FsBlockStore::new(tmp.clone());
block_store.open().await.unwrap();
Expand All @@ -399,7 +400,7 @@ mod tests {
let data_slice = data.to_vec();
let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data_slice));
let block = Block::new(cid, data_slice).unwrap();
block_store.put(block.clone()).await.unwrap();
block_store.put(&block).await.unwrap();
}

let cids = block_store.list().await.collect::<Vec<_>>().await;
Expand Down Expand Up @@ -449,7 +450,7 @@ mod tests {
let block = block.clone();
async move {
barrier.wait().await;
bs.put(block).await
bs.put(&block).await
}
})
})
Expand Down Expand Up @@ -490,7 +491,7 @@ mod tests {

assert_eq!(single.list().await.collect::<Vec<_>>().await.len(), 0);

single.put(block).await.unwrap();
single.put(&block).await.unwrap();

// compare the multihash since we store the block named as cidv1
assert_eq!(
Expand Down
3 changes: 2 additions & 1 deletion src/repo/blockstore/idb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ impl BlockStore for IdbBlockStore {
rx.await?
}

async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> {
let block = block.clone();
if self.contains(block.cid()).await? {
return Ok((*block.cid(), BlockPut::Existed));
}
Expand Down
23 changes: 14 additions & 9 deletions src/repo/blockstore/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use futures::StreamExt;
use ipld_core::cid::Cid;
use tokio::sync::RwLock;

use bytes::Bytes;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
Expand All @@ -26,7 +27,7 @@ impl std::fmt::Debug for MemBlockStore {
}

struct MemBlockInner {
blocks: HashMap<Cid, Block>,
blocks: HashMap<Cid, Bytes>,
}

impl MemBlockStore {
Expand Down Expand Up @@ -58,8 +59,12 @@ impl BlockStore for MemBlockStore {

async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error> {
let inner = &*self.inner.read().await;
let block = inner.blocks.get(cid).cloned();
Ok(block)
let block = match inner.blocks.get(cid) {
Some(bytes) => Block::new(*cid, bytes.clone())?,
None => return Ok(None),
};

Ok(Some(block))
}

async fn size(&self, cid: &[Cid]) -> Result<Option<usize>, Error> {
Expand All @@ -69,17 +74,17 @@ impl BlockStore for MemBlockStore {
.blocks
.iter()
.filter(|(id, _)| cid.contains(id))
.map(|(_, b)| b.data().len())
.map(|(_, b)| b.len())
.sum(),
))
}

async fn total_size(&self) -> Result<usize, Error> {
let inner = &*self.inner.read().await;
Ok(inner.blocks.values().map(|b| b.data().len()).sum())
Ok(inner.blocks.values().map(|b| b.len()).sum())
}

async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> {
use std::collections::hash_map::Entry;

let inner = &mut *self.inner.write().await;
Expand All @@ -92,7 +97,7 @@ impl BlockStore for MemBlockStore {
Entry::Vacant(ve) => {
trace!("new block");
let cid = *ve.key();
ve.insert(block);
ve.insert(block.inner_data().clone());
Ok((cid, BlockPut::NewBlock))
}
}
Expand Down Expand Up @@ -154,7 +159,7 @@ mod tests {
panic!("block should not be found")
}

let put = store.put(block.clone());
let put = store.put(&block);
assert_eq!(put.await.unwrap().0, cid.to_owned());
let contains = store.contains(&cid);
assert!(contains.await.unwrap());
Expand All @@ -180,7 +185,7 @@ mod tests {
let data_slice = data.to_vec();
let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data_slice));
let block = Block::new(cid, data_slice).unwrap();
mem_store.put(block.clone()).await.unwrap();
mem_store.put(&block).await.unwrap();
assert!(mem_store.contains(block.cid()).await.unwrap());
}

Expand Down
20 changes: 10 additions & 10 deletions src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub trait BlockStore: Debug + Send + Sync + 'static {
/// Get a total size of the block store
async fn total_size(&self) -> Result<usize, Error>;
/// Inserts a block in the blockstore.
async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error>;
async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error>;
/// Removes a block from the blockstore.
async fn remove(&self, cid: &Cid) -> Result<(), Error>;
/// Remove multiple blocks from the blockstore
Expand Down Expand Up @@ -451,7 +451,7 @@ impl Repo {
let mut stream = self.list_blocks().await;
while let Some(cid) = stream.next().await {
match self.get_block_now(&cid).await {
Ok(Some(block)) => match repo.inner.block_store.put(block).await {
Ok(Some(block)) => match repo.inner.block_store.put(&block).await {
Ok(_) => {}
Err(e) => error!("Error migrating {cid}: {e}"),
},
Expand Down Expand Up @@ -607,7 +607,7 @@ impl Repo {
}

/// Puts a block into the block store.
pub fn put_block(&self, block: Block) -> RepoPutBlock {
pub fn put_block<'a>(&self, block: &'a Block) -> RepoPutBlock<'a> {
RepoPutBlock::new(self, block).broadcast_on_new_block(true)
}

Expand Down Expand Up @@ -998,15 +998,15 @@ impl Repo {
}
}

pub struct RepoPutBlock {
pub struct RepoPutBlock<'a> {
repo: Repo,
block: Block,
block: &'a Block,
span: Option<Span>,
broadcast_on_new_block: bool,
}

impl RepoPutBlock {
fn new(repo: &Repo, block: Block) -> Self {
impl<'a> RepoPutBlock<'a> {
fn new(repo: &Repo, block: &'a Block) -> Self {
Self {
repo: repo.clone(),
block,
Expand All @@ -1026,16 +1026,16 @@ impl RepoPutBlock {
}
}

impl IntoFuture for RepoPutBlock {
impl<'a> IntoFuture for RepoPutBlock<'a> {
type IntoFuture = BoxFuture<'static, Self::Output>;
type Output = Result<Cid, Error>;
fn into_future(self) -> Self::IntoFuture {
let block = self.block;
let block = self.block.clone();
let span = self.span.unwrap_or(Span::current());
let span = debug_span!(parent: &span, "put_block", cid = %block.cid());
async move {
let _guard = self.repo.inner.gclock.read().await;
let (cid, res) = self.repo.inner.block_store.put(block.clone()).await?;
let (cid, res) = self.repo.inner.block_store.put(&block).await?;

if let BlockPut::NewBlock = res {
if self.broadcast_on_new_block {
Expand Down
6 changes: 3 additions & 3 deletions src/unixfs/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl Stream for UnixfsAdd {
return;
}
};
let _cid = match repo.put_block(block).await {
let _cid = match repo.put_block(&block).await {
Ok(cid) => cid,
Err(e) => {
yield UnixfsStatus::FailedStatus { written, total_size, error: Some(e) };
Expand All @@ -209,7 +209,7 @@ impl Stream for UnixfsAdd {
return;
}
};
let _cid = match repo.put_block(block).await {
let _cid = match repo.put_block(&block).await {
Ok(cid) => cid,
Err(e) => {
yield UnixfsStatus::FailedStatus { written, total_size, error: Some(e) };
Expand Down Expand Up @@ -249,7 +249,7 @@ impl Stream for UnixfsAdd {
let cid = node.cid.to_owned();
let block = Block::new(cid, node.block.to_vec())?;

repo.put_block(block).await?;
repo.put_block(&block).await?;

cids.push(cid);
}
Expand Down
6 changes: 3 additions & 3 deletions tests/bitswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ async fn bitswap_stress_test() {

let nodes = spawn_nodes::<5>(Topology::Mesh).await;

let block = Block::new(cid, data.clone()).unwrap();

for (i, node) in nodes.iter().enumerate() {
if filter(i) {
node.put_block(Block::new(cid, data.clone()).unwrap())
.await
.unwrap();
node.put_block(&block).await.unwrap();
}
}

Expand Down
Loading