Skip to content

Commit

Permalink
refactor: work through conversions
Browse files Browse the repository at this point in the history
  • Loading branch information
zeeshanlakhani committed Dec 16, 2022
1 parent 801422d commit e745dba
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 189 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ authors = ["Brooklyn Zelenka <[email protected]>"]
[lib]
path = "src/lib.rs"
bench = false
doctest = false
doctest = true

[[bin]]
name = "ipvm"
Expand All @@ -33,7 +33,6 @@ required-features = ["test_utils"]
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
cid = "0.9"
clap = { version = "4.0", features = ["derive"] }
derive_more = "0.99.17"
diesel = { version = "2.0", features = ["sqlite"] }
Expand All @@ -58,7 +57,7 @@ ucan = "0.1"
url = "2.3"
wasmer = { version = "3.1", features = ["compiler"] }
wasmer-compiler-singlepass = "3.1.0"
wasmer-middlewares = "3.1.0"
wasmer-middlewares = "3.1"

[dev-dependencies]
criterion = "0.4"
Expand Down
2 changes: 0 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ async fn main() -> Result<()> {
.execute(&mut conn)
.expect("Error saving new post");

//todo!("advertise receipt");

let res_copy = res.clone().into_bytes();

println!("Wasm CID: {closure_cid}");
Expand Down
6 changes: 1 addition & 5 deletions src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ use crate::network::{
swarm::ComposedBehaviour,
};
use anyhow::Result;
use libp2p::{identity::Keypair, request_response::ResponseChannel, Multiaddr, PeerId, Swarm};
use libp2p::{request_response::ResponseChannel, Multiaddr, PeerId, Swarm};
use std::collections::HashSet;
use tokio::sync::{mpsc, oneshot};

#[derive(Clone)]
pub struct Client {
sender: mpsc::Sender<Command>,
peer_id: PeerId,
}

impl Client {
Expand All @@ -20,13 +19,10 @@ impl Client {
) -> Result<(Self, mpsc::Receiver<Event>, EventLoop)> {
let (command_sender, command_receiver) = mpsc::channel(1);
let (event_sender, event_receiver) = mpsc::channel(1);
let keypair = Keypair::generate_ed25519();
let peer_id = keypair.public().to_peer_id();

Ok((
Client {
sender: command_sender,
peer_id,
},
event_receiver,
EventLoop::new(swarm, command_receiver, event_sender),
Expand Down
121 changes: 66 additions & 55 deletions src/workflow/closure.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
//! The smallest unit of work in IPVM

use crate::workflow::pointer::{InvokedTaskPointer, Promise, Status};
use crate::workflow::pointer::{InvokedTaskPointer, Promise, Status, OK_BRANCH};
use anyhow::anyhow;
use libipld::{
cbor::DagCborCodec, cid::multibase::Base, prelude::Encode, serde as ipld_serde, Cid, Ipld, Link,
cbor::DagCborCodec, cid::multibase::Base, codec::Codec, serde as ipld_serde, Cid, Ipld, Link,
};
use multihash::{Code, MultihashDigest};
use std::{collections::btree_map::BTreeMap, convert::TryFrom, fmt};
use url::Url;

const WITH_KEY: &str = "with";
const DO_KEY: &str = "do";
const INPUTS_KEY: &str = "inputs";

/// The suspended representation of the smallest unit of work in IPVM
///
/// # Example
///
/// ```
/// use libipld::Ipld;
/// use url::Url;
Expand All @@ -19,7 +25,7 @@ use url::Url;
/// Closure {
/// resource: Url::parse("ipfs://bafkreihf37goitzzlatlhwgiadb2wxkmn4k2edremzfjsm7qhnoxwlfstm").expect("IPFS URL"),
/// action: Action::from("wasm/run"),
/// inputs: Input::from(Ipld::Null),
/// inputs: Input::try_from(Ipld::Null).unwrap(),
/// };
/// ```
#[derive(Clone, Debug, PartialEq)]
Expand All @@ -41,21 +47,21 @@ impl TryFrom<Closure> for Link<Closure> {
type Error = anyhow::Error;

fn try_from(closure: Closure) -> Result<Link<Closure>, Self::Error> {
let mut closure_bytes = Vec::new();
<Closure as Into<Ipld>>::into(closure).encode(DagCborCodec, &mut closure_bytes)?;
let ipld: Ipld = closure.into();
let bytes = DagCborCodec.encode(&ipld)?;
Ok(Link::new(Cid::new_v1(
DagCborCodec.into(),
Code::Sha3_256.digest(&closure_bytes),
Code::Sha3_256.digest(&bytes),
)))
}
}

impl From<Closure> for Ipld {
fn from(closure: Closure) -> Self {
Ipld::Map(BTreeMap::from([
("with".to_string(), Ipld::String(closure.resource.into())),
("do ".to_string(), closure.action.into()),
("inputs".to_string(), closure.inputs.into()),
(WITH_KEY.into(), Ipld::String(closure.resource.into())),
(DO_KEY.into(), closure.action.into()),
(INPUTS_KEY.into(), closure.inputs.into()),
]))
}
}
Expand All @@ -64,27 +70,37 @@ impl TryFrom<Ipld> for Closure {
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
match ipld {
Ipld::Map(assoc) => Ok(Closure {
action: Action::try_from(assoc.get("do").ok_or(anyhow!("Bad"))?.clone())
.or_else(|_| Err(anyhow!("Bad")))?,

inputs: Input::from(assoc.get("inputs").ok_or(anyhow!("Bad"))?.clone()),
resource: match assoc.get("with").ok_or(anyhow!("Bad"))? {
Ipld::Link(cid) => cid
.to_string_of_base(Base::Base32HexLower)
.or(Err(anyhow!("Bad")))
.and_then(|txt| {
Url::parse(format!("{}{}", "ipfs://", txt).as_str())
.or(Err(anyhow!("Bad")))
}),
Ipld::String(txt) => Url::parse(txt.as_str()).or(Err(anyhow!("Bad"))),
_ => Err(anyhow!("Bad")),
}?,
}),

_ => Err(anyhow!("Bad")),
}
let map: BTreeMap<String, Ipld> = ipld_serde::from_ipld(ipld)?;
let action = Action::try_from(
map.get(DO_KEY)
.ok_or_else(|| anyhow!("No do action set."))?
.to_owned(),
)?;
let inputs = Input::try_from(
map.get(INPUTS_KEY)
.ok_or_else(|| anyhow!("No inputs key."))?
.to_owned(),
)?;

let resource = match map.get(WITH_KEY) {
Some(Ipld::Link(cid)) => cid
.to_string_of_base(Base::Base32HexLower)
.map_err(|e| anyhow!("Failed to encode CID into multibase string: {e}"))
.and_then(|txt| {
Url::parse(format!("{}{}", "ipfs://", txt).as_str())
.map_err(|e| anyhow!("Failed to parse URL: {e}"))
}),
Some(Ipld::String(txt)) => {
Url::parse(txt.as_str()).map_err(|e| anyhow!("Failed to parse URL: {e}"))
}
_ => Err(anyhow!("No resource/with set.")),
}?;

Ok(Closure {
resource,
action,
inputs,
})
}
}

Expand Down Expand Up @@ -115,31 +131,26 @@ impl From<Promise> for Input {
}
}

impl From<Ipld> for Input {
fn from(ipld: Ipld) -> Input {
match ipld {
Ipld::Map(ref map) => {
if map.len() != 1 {
return Input::IpldData(ipld);
}
match map.get("ucan/ok") {
Some(Ipld::List(pointer)) => {
if let Ok(invoked_task) =
InvokedTaskPointer::try_from(Ipld::List(pointer.clone()))
{
Input::Deferred(Promise {
branch_selector: Some(Status::Success),
invoked_task,
})
} else {
Input::IpldData(ipld)
}
}

_ => Input::IpldData(ipld),
}
}
_ => Input::IpldData(ipld),
impl TryFrom<Ipld> for Input {
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
let Ok(map) = ipld_serde::from_ipld::<BTreeMap<String, Ipld>>(ipld.clone()) else {
return Ok(Input::IpldData(ipld))
};

if map.len() > 1 {
map.get(OK_BRANCH)
.map_or(Ok(Input::IpldData(ipld)), |ipld| {
let pointer = ipld_serde::from_ipld(ipld.clone())?;
let invoked_task = InvokedTaskPointer::try_from(Ipld::List(pointer))?;
Ok(Input::Deferred(Promise {
result: Some(Status::Success),
invoked_task,
}))
})
} else {
Ok(Input::IpldData(ipld))
}
}
}
Expand Down
15 changes: 13 additions & 2 deletions src/workflow/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Configuration module

use anyhow::anyhow;
use libipld::{serde as ipld_serde, Ipld};
use std::{collections::BTreeMap, default::Default, time::Duration};

Expand Down Expand Up @@ -72,8 +73,18 @@ impl TryFrom<Ipld> for Resources {
type Error = anyhow::Error;

fn try_from(ipld: Ipld) -> Result<Self, Self::Error> {
let fuel = ipld_serde::from_ipld(ipld.get(FUEL_KEY)?.to_owned())?;
let time = ipld_serde::from_ipld(ipld.take(TIMEOUT_KEY)?)?;
let map: BTreeMap<String, Ipld> = ipld_serde::from_ipld(ipld)?;
let fuel = ipld_serde::from_ipld(
map.get(FUEL_KEY)
.ok_or_else(|| anyhow!("No fuel set."))?
.to_owned(),
)?;

let time = ipld_serde::from_ipld(
map.get(TIMEOUT_KEY)
.ok_or_else(|| anyhow!("No timeout set."))?
.to_owned(),
)?;

Ok(Resources { fuel, time })
}
Expand Down
Loading

0 comments on commit e745dba

Please sign in to comment.