[POC] One-way block streamer (#15)
plauche authored Jan 25, 2023
1 parent ceb5174 commit 6e6ace0
Showing 15 changed files with 5,157 additions and 0 deletions.
4,659 changes: 4,659 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions Cargo.toml
@@ -0,0 +1,8 @@
[workspace]

members = [
"car-utility",
"block-ship",
"block-streamer",
"local-dev-environment/desktop/radio-service"
]
36 changes: 36 additions & 0 deletions DESIGN.md
@@ -0,0 +1,36 @@
This is an overview of the current data transfer protocol implemented in the `block-streamer/` application and a few open questions for the future.

## Current Design

The current design and implementation of `block-streamer/` is intended to be a very simple way to transmit a file in IPFS across a radio link and reassemble it on the receiving side using block data.

The file to be transmitted is read into 50-byte (TBD: configurable) blocks. Each block consists of a CID, data, and links to other CIDs (if it is a root node). Each block is serialized into a single binary blob, which is broken up into 40-byte (TBD: configurable) chunks. Each chunk consists of a CID marker (the first 4 bytes of the CID), a chunk offset, and data. A header message containing the block's CID is transmitted to the receiver first, followed by the chunks of the block's data+links, which are then reassembled in order. The current implementation is able to handle a DAG with a depth of two and can reassemble blocks sent out of order, but it cannot yet handle chunks sent out of order.
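The chunk layout described above can be sketched as follows. This mirrors the `TransmissionChunk` type added in `block-ship/src/types.rs` with the current placeholder sizes (4-byte CID marker, 40-byte chunks); the helper function `chunk_block` is illustrative rather than part of this commit.

```rust
/// First 4 bytes of the block's CID, used to associate a chunk
/// with its block header on the receiving side (placeholder size).
const CID_MARKER_LEN: usize = 4;
/// Serialized block blobs are split into 40-byte chunks (placeholder size).
const CHUNK_SIZE: usize = 40;

struct TransmissionChunk {
    cid_marker: Vec<u8>, // CID_MARKER_LEN bytes of the block's CID
    chunk_offset: u16,   // position of this chunk within the block's blob
    data: Vec<u8>,       // up to CHUNK_SIZE bytes of the serialized block
}

/// Illustrative helper: split one serialized block blob into chunks.
fn chunk_block(cid: &[u8], blob: &[u8]) -> Vec<TransmissionChunk> {
    let cid_marker = cid[..CID_MARKER_LEN].to_vec();
    (0u16..)
        .zip(blob.chunks(CHUNK_SIZE))
        .map(|(chunk_offset, data)| TransmissionChunk {
            cid_marker: cid_marker.clone(),
            chunk_offset,
            data: data.to_vec(),
        })
        .collect()
}
```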

*Current magic numbers and CID marker are placeholders to get things working, not final decisions.*

*Why not use the CAR transport around blocks?*

In this initial implementation the CAR transport is not used. The reasoning was that this IPFS implementation should be designed for exchanging data over constrained communications links. This means it is likely that blocks will be transmitted individually, or even broken up into smaller chunks. There did not seem to be an immediate advantage to packaging these blocks up into a CAR, only to break that CAR up again into smaller chunks for transmission, when the blocks themselves could be transmitted as-is. However, the CAR transport may still prove to be useful in this system in the future.

*Why chunk blocks (hash+data) down to payload size?*

The [lab radio hardware](https://www.adafruit.com/product/3076) currently used in developing this system has a [strict payload size limit of 60 bytes](https://github.com/adafruit/RadioHead/blob/master/RH_RF69.h#L346-L347). While this radio may be more restrictive than typical satellite radios, it seems prudent to work under stricter requirements to ensure this system can scale both up and down to different payload limits. When sending individual 60-byte blocks, the payload is already mostly consumed by the CID (36 bytes). This 60% overhead is not exactly efficient, so the decision was made to break blocks down into chunks containing a CID marker (4 bytes), a chunk offset (2 bytes), and a data blob, minimizing overhead to improve efficiency.
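A quick back-of-the-envelope check of the overhead figures above, assuming the 60-byte payload limit and a 36-byte serialized CID:

```rust
/// Payload limit (bytes) of the lab radio.
const PAYLOAD_LIMIT: usize = 60;
/// A typical serialized CID is 36 bytes.
const CID_LEN: usize = 36;
/// Chunk header: 4-byte CID marker + 2-byte chunk offset.
const CHUNK_HEADER_LEN: usize = 4 + 2;

/// Percentage of each payload consumed by the given per-payload header.
fn overhead_pct(header_len: usize) -> usize {
    header_len * 100 / PAYLOAD_LIMIT
}
```

With whole blocks the CID alone consumes `overhead_pct(CID_LEN)` = 60% of every payload; with chunking only `overhead_pct(CHUNK_HEADER_LEN)` = 10% is overhead, leaving 54 of 60 bytes for data.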

## Future Design Decisions

*Are there existing UDP data transfer protocols we can borrow from or use as-is?*

The current protocol for chunking/sending/assembling blocks was intentionally made simple to better understand the block transmission problem. It is very possible that an existing protocol built on UDP may provide the necessary chunking functionality, or at least functional pieces which can be built on.

Existing protocols which should be further investigated:
- [UDT](https://en.wikipedia.org/wiki/UDP-based_Data_Transfer_Protocol)
- [QUIC](https://www.chromium.org/quic/)
- [CoAP](https://en.wikipedia.org/wiki/Constrained_Application_Protocol)

*How should the system handle specific data requests?*

A crucial part of this system will be correctly handling the transmission of a file across multiple communications passes and dealing with lossy communication links, so the ability to request specific pieces of a DAG will be required. There are a number of different methods for specifying these pieces, such as by individual CID, bitmask, bloom filter, or sub-graph. This work will likely begin with a simple proof of concept implementing individual CID requests, followed by an analysis of the tradeoffs of the other specification methods.

*Formal protocol messages*

The current implementation is a very simple one-way stream of block chunks. The future functional system will need to implement a formalized protocol with defined messages which allow for interactions such as requesting a specific CID or indicating that a CID has been received correctly. These will likely be created as required when implementing additional protocol functionality.
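One possible shape for such a formalized protocol is sketched below. This is purely hypothetical: the `RequestCid` and `AckCid` variants are NOT part of this commit, and only illustrate how the existing one-way `TransmissionMessage` enum (in `block-ship/src/types.rs`) might grow request/acknowledge interactions.

```rust
/// Hypothetical future protocol message set; the first two variants
/// correspond to the existing `TransmissionMessage`, the rest are
/// illustrative assumptions, not implemented anywhere in this commit.
enum ProtocolMessage {
    /// Announce a block: full CID, sent before its chunks.
    Cid(Vec<u8>),
    /// One chunk of a serialized block.
    Chunk {
        cid_marker: Vec<u8>,
        chunk_offset: u16,
        data: Vec<u8>,
    },
    /// Ask the transmitter to (re)send a specific block (future).
    RequestCid(Vec<u8>),
    /// Indicate a block was received and reassembled correctly (future).
    AckCid(Vec<u8>),
}
```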
1 change: 1 addition & 0 deletions Dockerfile
@@ -6,3 +6,4 @@ RUN apt-get update && \
RUN curl -Lo protoc.zip "https://github.com/protocolbuffers/protobuf/releases/download/v21.12/protoc-21.12-linux-x86_64.zip"
RUN unzip -q protoc.zip -d /usr/local
RUN chmod a+x /usr/local/bin/protoc
ENV PROTOC=/usr/local/bin/protoc
18 changes: 18 additions & 0 deletions block-ship/Cargo.toml
@@ -0,0 +1,18 @@
[package]
name = "block-ship"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1"
bytes = "1.1"
cid = {version = "0.9", features = ["scale-codec"] }
futures = "0.3.21"
iroh-unixfs = "0.2.0"
parity-scale-codec = { version = "3.0.0", default-features = false, features = ["derive"] }
parity-scale-codec-derive = "3.1.3"
rand = "0.8.5"
tokio = { version = "1", features = ["fs", "io-util"] }
tracing = "0.1"
82 changes: 82 additions & 0 deletions block-ship/src/chunking.rs
@@ -0,0 +1,82 @@
use crate::types::BlockWrapper;

use anyhow::Result;
use cid::Cid;
use futures::TryStreamExt;
use iroh_unixfs::{
builder::{File, FileBuilder},
Block,
};
use parity_scale_codec::Encode;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::collections::BTreeMap;
use std::path::PathBuf;
use tokio::fs::File as TokioFile;
use tokio::io::AsyncWriteExt;
use tracing::{info, warn};

// TODO: Refactor this and chunks_to_path so that they actually do what their names say
// and so that they can be tested directly against each other
pub async fn path_to_chunks(path: &PathBuf) -> Result<Vec<Vec<u8>>> {
let file: File = FileBuilder::new()
.path(path)
.fixed_chunker(50)
// This will decrease the width of the underlying tree
// but the logic isn't ready on the receiving end
// and the current CID size means that two links will
// still overrun the lab radio packet size
// .degree(2)
.build()
.await?;

let mut blocks: Vec<_> = file.encode().await?.try_collect().await?;

let mut payloads = vec![];

info!("{:?} broken into {} blocks", path.as_path(), blocks.len());

// This randomly shuffles the order of the blocks (prior to chunking)
// in order to exercise reassembly on the receiver side.
blocks.shuffle(&mut thread_rng());

for block in blocks {
let wrapper = BlockWrapper::from_block(block)?;
let chunks = wrapper.to_chunks()?;
for c in chunks {
payloads.push(c.encode());
}
}

Ok(payloads)
}

pub async fn chunks_to_path(
path: &PathBuf,
root: &Block,
blocks: &BTreeMap<Cid, Block>,
) -> Result<bool> {
// First check if all CIDs exist
for c in root.links().iter() {
if !blocks.contains_key(c) {
info!("Missing cid {}, wait for more data", c);
return Ok(false);
}
}

let mut output_file = TokioFile::create(path).await?;
for cid in root.links().iter() {
if let Some(data) = blocks.get(cid) {
output_file.write_all(data.data()).await?;
} else {
// Missing a CID, so the file isn't ready yet. We shouldn't
// get here because of the CID check above, but we verify
// again anyway
warn!("Still missing a cid...");
return Ok(false);
}
}
output_file.flush().await?;

Ok(true)
}
2 changes: 2 additions & 0 deletions block-ship/src/lib.rs
@@ -0,0 +1,2 @@
pub mod chunking;
pub mod types;
127 changes: 127 additions & 0 deletions block-ship/src/types.rs
@@ -0,0 +1,127 @@
use anyhow::{anyhow, Result};
use bytes::Bytes;
use cid::Cid;
use iroh_unixfs::Block;
use parity_scale_codec::{Decode, Encode};
use parity_scale_codec_derive::{Decode as ParityDecode, Encode as ParityEncode};

const CHUNK_SIZE: usize = 40;

#[derive(Clone, Debug, ParityDecode, ParityEncode)]
pub struct TransmissionChunk {
pub cid_marker: Vec<u8>,
pub chunk_offset: u16,
pub data: Vec<u8>,
}

#[derive(Clone, Debug, ParityDecode, ParityEncode)]
pub enum TransmissionMessage {
Cid(Vec<u8>),
Chunk(TransmissionChunk),
}

#[derive(Debug, Eq, PartialEq)]
pub struct BlockWrapper {
pub cid: Vec<u8>,
pub payload: BlockPayload,
}

#[derive(Debug, Eq, ParityDecode, ParityEncode, PartialEq)]
pub struct BlockPayload {
pub data: Vec<u8>,
pub links: Vec<Vec<u8>>,
}

impl BlockWrapper {
pub fn to_block(&self) -> Result<Block> {
let mut links = vec![];
for l in &self.payload.links {
links.push(Cid::try_from(l.clone())?);
}
Ok(Block::new(
Cid::try_from(self.cid.clone())?,
Bytes::from(self.payload.data.clone()),
links,
))
}

pub fn from_block(block: Block) -> Result<Self> {
let mut links = vec![];
for l in block.links() {
links.push(l.to_bytes());
}

// Right now we're ignoring the data attached to the root nodes
// because the current assembly method doesn't require it
// and it saves a decent amount of payload weight
let data = if !links.is_empty() {
vec![]
} else {
block.data().to_vec()
};

Ok(BlockWrapper {
cid: block.cid().to_bytes(),
payload: BlockPayload { data, links },
})
}

pub fn to_chunks(&self) -> Result<Vec<TransmissionMessage>> {
let cid_marker = &self.cid[..4];
let mut chunks = vec![];

chunks.push(TransmissionMessage::Cid(self.cid.clone()));

let encoded_payload = self.payload.encode();
for (offset, chunk) in (0_u16..).zip(encoded_payload.chunks(CHUNK_SIZE)) {
chunks.push(TransmissionMessage::Chunk(TransmissionChunk {
cid_marker: cid_marker.to_vec(),
chunk_offset: offset,
data: chunk.to_vec(),
}));
}

Ok(chunks)
}

// TODO: This should probably verify the hash against the data
pub fn from_chunks(cid: &[u8], messages: &[TransmissionChunk]) -> Result<Self> {
let blob: Vec<u8> = messages.iter().flat_map(|c| c.data.clone()).collect();
if let Ok(payload) = BlockPayload::decode(&mut blob.as_slice()) {
return Ok(BlockWrapper {
cid: cid.to_owned(),
payload,
});
}
Err(anyhow!("Failed to find payload"))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
pub fn test_chunk_and_rebuild_block() {
let cid = vec![1, 2, 4, 5, 6];
let wrapper = BlockWrapper {
cid: cid.clone(),
payload: BlockPayload {
data: vec![4, 5, 6],
links: vec![vec![1], vec![2]],
},
};

let messages = wrapper.to_chunks().unwrap();
let chunks: Vec<TransmissionChunk> = messages
.iter()
.filter_map(|mes| match mes {
TransmissionMessage::Chunk(chunk) => Some(chunk.clone()),
TransmissionMessage::Cid(_) => None,
})
.collect();
dbg!(&chunks);
let rebuilt = BlockWrapper::from_chunks(&cid, &chunks).unwrap();
assert_eq!(wrapper, rebuilt);
}
}
21 changes: 21 additions & 0 deletions block-streamer/Cargo.toml
@@ -0,0 +1,21 @@
[package]
name = "block-streamer"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1"
iroh-unixfs = "0.2.0"
tokio = { version = "1", features = ["fs", "io-util"] }
cid = {version = "0.9", features = ["scale-codec"] }
futures = "0.3.21"
clap = { version = "4.0.15", features = ["derive"] }
rand = "0.8.5"
parity-scale-codec = { version = "3.0.0", default-features = false, features = ["derive"] }
parity-scale-codec-derive = "3.1.3"
bytes = "1.1"
tracing = "0.1"
tracing-subscriber = "0.3"
block-ship = { path = "../block-ship" }
2 changes: 2 additions & 0 deletions block-streamer/Cross.toml
@@ -0,0 +1,2 @@
[target.armv7-unknown-linux-gnueabihf]
dockerfile = "../Dockerfile"
17 changes: 17 additions & 0 deletions block-streamer/README.md
@@ -0,0 +1,17 @@
# Overview

The `block-streamer` application provides the foundation for creating and exploring a new data transfer protocol designed for the IPFS-in-space scenario. The current implementation is intentionally simple: it reads in the contents of a file, breaks the contents up into blocks, and transmits those blocks in UDP packets to a receiver. The blocks are serialized into binary data using the Parity SCALE format. The receiver listens for the stream of blocks, attempts to find the root block, and then waits until all links in the root are satisfied before assembling the file. This simple and naive approach to IPFS data transfer is intended to lay a foundation of point-to-point block streaming to be iterated on in future project milestones.

## Usage

First start the receiving/listening instance:

$ cargo run -- receive /path/to/new/file 127.0.0.1:8080

This command will start an instance of the `block-streamer` which is listening at `127.0.0.1:8080` and will attempt to assemble the blocks it receives into a file located at `/path/to/new/file`.

Next start the transmitting instance:

$ cargo run -- transmit /path/to/file 127.0.0.1:8080

This command will start an instance of `block-streamer` which will break up the file at `/path/to/file` into blocks, and then transmit those blocks in UDP packets to `127.0.0.1:8080`. Currently the blocks are sorted into random order prior to transmission in order to exercise the assembly functionality on the listening side.
1 change: 1 addition & 0 deletions block-streamer/src/lib.rs
@@ -0,0 +1 @@

59 changes: 59 additions & 0 deletions block-streamer/src/main.rs
@@ -0,0 +1,59 @@
mod receive;
mod transmit;

use crate::receive::receive;
use crate::transmit::transmit;
use anyhow::Result;
use clap::{Parser, Subcommand};
use std::path::PathBuf;
use tracing::Level;

#[derive(Parser, Debug, Clone)]
#[clap(version, long_about = None, propagate_version = true)]
#[clap(about = "Transmit/Receive IPFS block stream")]
pub struct Cli {
#[clap(subcommand)]
command: Commands,
}

#[derive(Subcommand, Debug, Clone)]
enum Commands {
#[clap(about = "Transmit a file")]
Transmit {
/// The path to a file to be transmitted
path: PathBuf,
/// The address to transmit the file to
target_address: String,
},
#[clap(about = "Receive a file")]
Receive {
/// The path to a file where received blocks will be output
path: PathBuf,
/// The address to listen for the file on
listen_address: String,
},
}

impl Cli {
pub async fn run(&self) -> Result<()> {
match &self.command {
Commands::Transmit {
path,
target_address,
} => transmit(path, target_address).await?,
Commands::Receive {
path,
listen_address,
} => receive(path, listen_address).await?,
}
Ok(())
}
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();

let cli = Cli::parse();
cli.run().await
}