Skip to content

Commit

Permalink
GFTP: JSON RPC server mode (#367)
Browse files Browse the repository at this point in the history
Co-authored-by: Przemysław Rekucki <[email protected]>
  • Loading branch information
mfranciszkiewicz and prekucki authored Jun 26, 2020
1 parent 2595933 commit d85c7c2
Show file tree
Hide file tree
Showing 11 changed files with 969 additions and 285 deletions.
437 changes: 233 additions & 204 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions core/gftp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "gftp"
version = "0.1.0"
version = "0.1.1"
authors = ["Golem Factory <[email protected]>"]
edition = "2018"

Expand All @@ -21,6 +21,16 @@ env_logger = "0.7.1"
futures = "0.3"
rand = "0.7.3"
structopt = "0.3.9"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha3 = "0.8.2"
tokio = { version = "0.2", features = ["io-std", "time"] }
thiserror = "1.0.20"
dotenv = "0.15.0"
url = "2.1.1"
url = { version = "2.1.1", features = ["serde"] }


[dev-dependencies]
sha3 = "0.8.2"
tokio = { version = "0.2", features = ["process"] }
tempdir = "0.3.7"
230 changes: 230 additions & 0 deletions core/gftp/examples/gftp-server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
use anyhow::{anyhow, Result};
use futures::future::{FutureExt, LocalBoxFuture};
use gftp::rpc::*;
use sha3::digest::generic_array::GenericArray;
use sha3::Digest;
use std::ffi::OsString;
use std::fs::OpenOptions;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::atomic::{AtomicUsize, Ordering};
use structopt::StructOpt;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{ChildStdin, ChildStdout, Command};

static SEQ: AtomicUsize = AtomicUsize::new(0);
type HashOutput = GenericArray<u8, <sha3::Sha3_512 as Digest>::OutputSize>;

/// Build the GFTP binary, start the daemon and run:
///
/// `cargo run --example gftp-server ../../target/debug/gftp Cargo.toml`
#[derive(StructOpt)]
struct Args {
/// Path to GFTP binary
gftp_bin: PathBuf,
/// File to share
share: PathBuf,
}

trait ReadRpcMessage {
fn read_message(&mut self) -> LocalBoxFuture<Result<RpcMessage>>;
}

trait WriteRpcMessage {
fn write_message(&mut self, msg: RpcMessage) -> LocalBoxFuture<Result<()>>;
}

impl ReadRpcMessage for BufReader<ChildStdout> {
fn read_message(&mut self) -> LocalBoxFuture<Result<RpcMessage>> {
async move {
let mut buffer = String::new();
self.read_line(&mut buffer).await?;
log::info!("[Rx] {}", buffer.trim());
let msg = serde_json::from_str::<RpcMessage>(&buffer)?;
Ok(msg)
}
.boxed_local()
}
}

impl WriteRpcMessage for ChildStdin {
fn write_message(&mut self, msg: RpcMessage) -> LocalBoxFuture<Result<()>> {
async move {
let ser = format!("{}\r\n", serde_json::to_string(&msg)?);
log::info!("[Tx] {}", ser.trim());
self.write_all(ser.as_bytes()).await?;
self.flush();
Ok(())
}
.boxed_local()
}
}

async fn send(
stdin: &mut ChildStdin,
reader: &mut BufReader<ChildStdout>,
req: RpcRequest,
) -> Result<RpcResult> {
let id = SEQ.fetch_add(1, Ordering::Relaxed) as i64;
let msg = RpcMessage::request(Some(&RpcId::Int(id)), req);
stdin.write_message(msg).await?;

let res = reader.read_message().await?;
match res.id {
Some(RpcId::Int(v)) => match v == id {
false => return Err(anyhow!("Invalid response ID: {}, expected {}", v, id)),
_ => (),
},
id => return Err(anyhow!("Invalid response ID: {:?}", id)),
}

match res.body {
RpcBody::Error { error } => return Err(anyhow!("Request {:?} failed: {:?}", id, error)),
RpcBody::Request { .. } => return Err(anyhow!("Unexpected message: {:?}", res)),
RpcBody::Result { result } => Ok(result),
}
}

fn hash_file(path: &Path) -> Result<HashOutput> {
let mut file_src = OpenOptions::new().read(true).open(path)?;

let mut hasher = sha3::Sha3_512::default();
let mut chunk = vec![0; 4096];

while let Ok(count) = file_src.read(&mut chunk[..]) {
hasher.input(&chunk[..count]);
if count != 4096 {
break;
}
}
Ok(hasher.result())
}

#[actix_rt::main]
async fn main() -> Result<()> {
dotenv::dotenv().ok();
std::env::set_var(
"RUST_LOG",
std::env::var("RUST_LOG").unwrap_or("info".into()),
);
env_logger::init();

let args = Args::from_args();
if !args.gftp_bin.exists() {
return Err(anyhow!(
"Gftp binary does not exist: {}",
args.gftp_bin.display()
));
}
if !args.share.exists() {
return Err(anyhow!(
"Shared file does not exist: {}",
args.gftp_bin.display()
));
}

let tmp_dir = tempdir::TempDir::new("gftp-server")?;
let published_hash = hash_file(&args.share)?;

log::info!("spawning server");
let mut child = Command::new(args.gftp_bin)
.arg(OsString::from("server"))
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;

let mut stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let mut reader = BufReader::new(stdout);

log::info!("sending version request");
let req = RpcRequest::Version {};
send(&mut stdin, &mut reader, req).await?;

log::info!("sending publish request");
let files = vec![args.share.clone()];
let req = RpcRequest::Publish { files };
let urls = match send(&mut stdin, &mut reader, req).await? {
RpcResult::Files(files) => files.into_iter().map(|r| r.url).collect::<Vec<_>>(),
result => return Err(anyhow!("Invalid result: {:?}", result)),
};

log::info!("sending close request");
let req = RpcRequest::Close { urls: urls.clone() };
match send(&mut stdin, &mut reader, req).await? {
RpcResult::Statuses(vec) => {
if vec.iter().any(|b| b == &RpcStatusResult::Error) {
return Err(anyhow!("Invalid result: {:?}", vec));
}
}
result => return Err(anyhow!("Invalid result: {:?}", result)),
}

log::info!("sending erroneous close request");
let req = RpcRequest::Close { urls };
match send(&mut stdin, &mut reader, req).await? {
RpcResult::Statuses(vec) => {
if vec.iter().any(|b| b == &RpcStatusResult::Ok) {
return Err(anyhow!("Invalid result: {:?}", vec));
}
}
result => return Err(anyhow!("Invalid result: {:?}", result)),
}

log::info!("sending publish request (for download)");
let files = vec![args.share.clone()];
let req = RpcRequest::Publish { files };
let url = match send(&mut stdin, &mut reader, req).await? {
RpcResult::Files(files) => files
.into_iter()
.map(|r| r.url)
.next()
.ok_or_else(|| anyhow!("Missing URL in response"))?,
result => return Err(anyhow!("Invalid result: {:?}", result)),
};

log::info!("sending download request");
let output_file = tmp_dir.path().join("tmp-download");
let req = RpcRequest::Download {
url,
output_file: output_file.clone(),
};
send(&mut stdin, &mut reader, req).await?;

if hash_file(&output_file)? != published_hash {
return Err(anyhow!("Invalid file hash (receive request)"));
} else {
log::info!("file checksum ok");
}

log::info!("sending receive request");
let output_file = tmp_dir.path().join("tmp-receive");
let req = RpcRequest::Receive {
output_file: output_file.clone(),
};
let url = match send(&mut stdin, &mut reader, req).await? {
RpcResult::File(file_result) => file_result.url,
result => return Err(anyhow!("Invalid result: {:?}", result)),
};

log::info!("sending upload request");
let req = RpcRequest::Upload {
url,
file: args.share,
};
send(&mut stdin, &mut reader, req).await?;

if hash_file(&output_file)? != published_hash {
return Err(anyhow!("Invalid file hash (receive request)"));
} else {
log::info!("file checksum ok");
}

log::info!("sending shutdown request");
let req = RpcRequest::Shutdown {};
send(&mut stdin, &mut reader, req).await?;

child.await?;
Ok(())
}
75 changes: 60 additions & 15 deletions core/gftp/readme.md
Original file line number Diff line number Diff line change
@@ -1,40 +1,85 @@
# Using gftp transfer binary

## Publishing files:
## Publishing files

Start yagna service:
```
```bash
cargo run service run
```

Publish chosen file. Copy file hash from logs.
```
Publish a chosen file (blocking).
```bash
cargo run -p gftp -- publish {file name}
...
Published file [LICENSE] as gftp://0x06bf342e4d1633aac5db38817c2e938e9d6ab7f3/edb0016d9f8bafb54540da34f05a8d510de8114488f23916276bdead05509a53.
...
```

## Downloading files:
Example output:
```json
{"result": [{"file": "Cargo.toml", "url": "gftp://0xf2f32374dde7326be2461b4e16a34adb0afe018f/39dc05a25ea97a1c90166658d93786f3302a51b8e31eb9b26001b615dea7e773"}]}
```

or with `--verbose` (`-v`)
```bash
cargo run -p gftp -- publish {file name} -v
```

```json
{"jsonrpc": "2.0", "id": null, "result": [{"file": "Cargo.toml", "url": "gftp://0xf2f32374dde7326be2461b4e16a34adb0afe018f/39dc05a25ea97a1c90166658d93786f3302a51b8e31eb9b26001b615dea7e773"}]}
```

## Downloading a file

```
cargo run -p gftp -- download \
gftp://0x06bf342e4d1633aac5db38817c2e938e9d6ab7f3/edb0016d9f8bafb54540da34f05a8d510de8114488f23916276bdead05509a53 \
-o workdir/gftp/download.txt
```

## Uploading file
## Uploading a file

Publish file for upload:
Publish file for upload (blocking):

```
cargo run -p gftp -- await-upload workdir/gftp-upload/License
...
[2020-03-03T10:17:57Z INFO gftp] Waiting for file upload [workdir/gftp-upload/License] on url [gftp://0x06bf342e4d1633aac5db38817c2e938e9d6ab7f3/z2IeDvgs1Q1hZ6seR0iSEsKW8kxdxQCK0eoz6DsYVznqJIl5K18NqwJPdLgesY9yR].
...
cargo run -p gftp -- receive workdir/gftp-upload/License
```

Upload file on provider side:
```
cargo run --release -p gftp -- upload LICENSE gftp://0x06bf342e4d1633aac5db38817c2e938e9d6ab7f3/z2IeDvgs1Q1hZ6seR0iSEsKW8kxdxQCK0eoz6DsYVznqJIl5K18NqwJPdLgesY9yR
cargo run -p gftp -- upload LICENSE gftp://0x06bf342e4d1633aac5db38817c2e938e9d6ab7f3/z2IeDvgs1Q1hZ6seR0iSEsKW8kxdxQCK0eoz6DsYVznqJIl5K18NqwJPdLgesY9yR
```

## JSON-RPC 2.0 server

To start the application in JSON RPC server mode, type:

```
cargo run -p gftp -- server
```

JSON RPC messages can be sent to application's stdin. **Each JSON object needs to be terminated with a new line** (`\n`).

### Publish

```json
{"jsonrpc": "2.0", "id": "1", "method": "publish", "params": {"files": ["Cargo.toml"]}}
```

### Download
```json
{"jsonrpc": "2.0", "id": 2, "method": "download", "params": {"url": "gftp://0xf2f32374dde7326be2461b4e16a34adb0afe018f/1d040d4ea83249ec6b8264305365acf3068e095245ea3981de1c4b16782253cc", "output_file": "/home/me/download.bin"}}
```

### AwaitUpload
```json
{"jsonrpc": "2.0", "id": "3", "method": "receive", "params": {"output_file": "/home/me/upload.bin"}}
```

### Upload
```json
{"jsonrpc": "2.0", "id": 4, "method": "upload", "params": {"url": "gftp://0xf2f32374dde7326be2461b4e16a34adb0afe018f/1d040d4ea83249ec6b8264305365acf3068e095245ea3981de1c4b16782253cc", "file": "/etc/passwd"}}
```

## Flags

- `-v`, `--verbose`

Increases output verbosity to match the one in JSON RPC server mode.
Loading

0 comments on commit d85c7c2

Please sign in to comment.