Skip to content

Commit

Permalink
kvs mut example wip, fix hydro-project#785
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Mar 4, 2024
1 parent f8311db commit 3eb3da0
Show file tree
Hide file tree
Showing 15 changed files with 316 additions and 33 deletions.
28 changes: 0 additions & 28 deletions hydroflow/examples/kvs/README.md

This file was deleted.

28 changes: 28 additions & 0 deletions hydroflow/examples/kvs_mut/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
Simple single-node key-value store example based on a join of PUTs and GETs.

Current semantics are:
- PUTs are appended: we remember them all forever
- GETs are only remembered for the current tick, which may not be monotone depending on how they
are consumed.
- GETs for empty keys get no acknowledgement.

Clients accept commands on stdin. Command syntax is as follows:
- `PUT <key>, <value>`
- `GET <key>`
Commands are case-insensitive. All keys and values are treated as `String`s.

## Running the example

To run the example, open 2 terminals.

In one terminal run the server like so:
```
cargo run -p hydroflow --example kvs_mut -- --role server --addr localhost:12346
```

In another terminal run a client:
```
cargo run -p hydroflow --example kvs_mut -- --role client --addr localhost:9090 --server-addr localhost:12346
```

Adding the `--graph <graph_type>` flag to the end of the command lines above will print out a node-and-edge diagram of the program. Supported values for `<graph_type>` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html).
File renamed without changes.
29 changes: 29 additions & 0 deletions hydroflow/examples/kvs_mut/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use regex::Regex;

use crate::protocol::KvsMessage;

pub fn parse_command(line: String) -> Option<KvsMessage> {
let re = Regex::new(r"([A-z]+)\s+(.+)").unwrap();
let caps = re.captures(line.as_str())?;

let binding = caps.get(1).unwrap().as_str().to_uppercase();
let cmdstr = binding.as_str();
let args = caps.get(2).unwrap().as_str();
match cmdstr {
"PUT" => {
let kv = args.split_once(',')?;
Some(KvsMessage::Put {
key: kv.0.trim().to_string(),
value: Some(kv.1.trim().to_string()),
})
}
"DELETE" => Some(KvsMessage::Put {
key: args.trim().to_string(),
value: None,
}),
"GET" => Some(KvsMessage::Get {
key: args.trim().to_string(),
}),
_ => None,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ fn test() {
use hydroflow::util::{run_cargo_example, wait_for_process_output};

let (_server, _, mut server_stdout) =
run_cargo_example("kvs", "--role server --addr 127.0.0.1:2051");
run_cargo_example("kvs_mut", "--role server --addr 127.0.0.1:2051");

let (_client, mut client_stdin, mut client_stdout) = run_cargo_example(
"kvs",
"kvs_mut",
"--role client --addr 127.0.0.1:2052 --server-addr 127.0.0.1:2051",
);

Expand All @@ -74,7 +74,7 @@ fn test() {
client_stdin.write_all(b"PUT a,7\n").unwrap();

let (_client2, mut client2_stdin, mut client2_stdout) = run_cargo_example(
"kvs",
"kvs_mut",
"--role client --addr 127.0.0.1:2053 --server-addr 127.0.0.1:2051",
);

Expand Down
37 changes: 37 additions & 0 deletions hydroflow/examples/kvs_mut/protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::net::SocketAddr;

use hydroflow_macro::DemuxEnum;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize, DemuxEnum)]
pub enum KvsMessage {
Put { key: String, value: Option<String> },
Get { key: String },
}

#[derive(Clone, Debug, DemuxEnum)]
pub enum KvsMessageWithAddr {
Put {
key: String,
value: Option<String>,
addr: SocketAddr,
},
Get {
key: String,
addr: SocketAddr,
},
}
impl KvsMessageWithAddr {
pub fn from_message(message: KvsMessage, addr: SocketAddr) -> Self {
match message {
KvsMessage::Put { key, value } => Self::Put { key, value, addr },
KvsMessage::Get { key } => Self::Get { key, addr },
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct KvsResponse {
pub key: String,
pub value: String,
}
52 changes: 52 additions & 0 deletions hydroflow/examples/kvs_mut/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::{PersistenceKeyed, UdpSink, UdpStream};

use crate::protocol::{KvsMessageWithAddr, KvsResponse};
use crate::Opts;

pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
println!("Server live!");

let mut hf: Hydroflow = hydroflow_syntax! {
// Setup network channels.
network_send = dest_sink_serde(outbound);
network_recv = source_stream_serde(inbound)
-> _upcast(Some(Delta))
-> map(Result::unwrap)
-> inspect(|(msg, addr)| println!("Message received {:?} from {:?}", msg, addr))
-> map(|(msg, addr)| KvsMessageWithAddr::from_message(msg, addr))
-> demux_enum::<KvsMessageWithAddr>();
puts = network_recv[Put];
gets = network_recv[Get];

// Store puts mutably (supporting deletion)
puts
-> flat_map(|(key, value, _addr)| {
match value {
Some(value) => PersistenceKeyed::Persist(key, value),
None => PersistenceKeyed::Delete(key),
}
})
-> persist_mut_keyed()
-> [0]lookup;
gets -> [1]lookup;
// Join PUTs and GETs by key, persisting the PUTs.
lookup = join::<'tick, 'tick>();

// Send GET responses back to the client address.
lookup
-> inspect(|tup| println!("Found a match: {:?}", tup))
-> map(|(key, (value, client_addr))| (KvsResponse { key, value }, client_addr))
-> network_send;
};

if let Some(graph) = opts.graph {
let serde_graph = hf
.meta_graph()
.expect("No graph found, maybe failed to parse.");
serde_graph.open_graph(graph, opts.write_config).unwrap();
}

hf.run_async().await.unwrap();
}
34 changes: 34 additions & 0 deletions hydroflow/examples/kvs_pubsub/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
Simple single-node key-value store example based on a join of PUTs and GETs.

Current semantics are:
- PUTs are appended: we remember them all forever
- GETs are only remembered for the current tick, which may not be monotone depending on how they
are consumed.
- GETs for empty keys get no acknowledgement.

Clients accept commands on stdin. Command syntax is as follows:
- `PUT <key>, <value>`
- `GET <key>`
Commands are case-insensitive. All keys and values are treated as `String`s.

## Pubsub?

This KVS actually acts as a publish-subscribe service, because deleting old values, in the simplest case,
is not monotonic. So therefore a read on a particular key will receive future writes to that key.
For a more traditional, and non-monotonic KVS, see the `kvs_mut` example.

## Running the example

To run the example, open 2 terminals.

In one terminal run the server like so:
```
cargo run -p hydroflow --example kvs_pubsub -- --role server --addr localhost:12346
```

In another terminal run a client:
```
cargo run -p hydroflow --example kvs_pubsub -- --role client --addr localhost:9090 --server-addr localhost:12346
```

Adding the `--graph <graph_type>` flag to the end of the command lines above will print out a node-and-edge diagram of the program. Supported values for `<graph_type>` include [mermaid](https://mermaid-js.github.io/) and [dot](https://graphviz.org/doc/info/lang.html).
41 changes: 41 additions & 0 deletions hydroflow/examples/kvs_pubsub/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::net::SocketAddr;

use hydroflow::hydroflow_syntax;
use hydroflow::util::{UdpSink, UdpStream};

use crate::helpers::parse_command;
use crate::protocol::KvsResponse;
use crate::Opts;

pub(crate) async fn run_client(
outbound: UdpSink,
inbound: UdpStream,
server_addr: SocketAddr,
opts: Opts,
) {
println!("Client live!");

let mut hf = hydroflow_syntax! {
// set up channels
outbound_chan = dest_sink_serde(outbound);
inbound_chan = source_stream_serde(inbound) -> map(Result::unwrap);

// read in commands from stdin and forward to server
source_stdin()
-> filter_map(|line| parse_command(line.unwrap()))
-> map(|msg| { (msg, server_addr) })
-> outbound_chan;

// print inbound msgs
inbound_chan -> for_each(|(response, _addr): (KvsResponse, _)| println!("Got a Response: {:?}", response));
};

if let Some(graph) = opts.graph {
let serde_graph = hf
.meta_graph()
.expect("No graph found, maybe failed to parse.");
serde_graph.open_graph(graph, opts.write_config).unwrap();
}

hf.run_async().await.unwrap();
}
File renamed without changes.
90 changes: 90 additions & 0 deletions hydroflow/examples/kvs_pubsub/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::net::SocketAddr;

use clap::{Parser, ValueEnum};
use client::run_client;
use hydroflow::lang::graph::{WriteConfig, WriteGraphType};
use hydroflow::util::{bind_udp_bytes, ipv4_resolve};
use server::run_server;

mod client;
mod helpers;
mod protocol;
mod server;

#[derive(Clone, ValueEnum, Debug)]
enum Role {
Client,
Server,
}

#[derive(Parser, Debug)]
struct Opts {
#[clap(value_enum, long)]
role: Role,
#[clap(long, value_parser = ipv4_resolve)]
addr: Option<SocketAddr>,
#[clap(long, value_parser = ipv4_resolve)]
server_addr: Option<SocketAddr>,
#[clap(long)]
graph: Option<WriteGraphType>,
#[clap(flatten)]
write_config: Option<WriteConfig>,
}

#[hydroflow::main]
async fn main() {
let opts = Opts::parse();
let addr = opts.addr.unwrap();

match opts.role {
Role::Client => {
let (outbound, inbound, _) = bind_udp_bytes(addr).await;
println!("Client is bound to {:?}", addr);
println!("Attempting to connect to server at {:?}", opts.server_addr);
run_client(outbound, inbound, opts.server_addr.unwrap(), opts).await;
}
Role::Server => {
let (outbound, inbound, _) = bind_udp_bytes(addr).await;
println!("Listening on {:?}", opts.addr.unwrap());
run_server(outbound, inbound, opts).await;
}
}
}

#[test]
fn test() {
use std::io::Write;

use hydroflow::util::{run_cargo_example, wait_for_process_output};

let (_server, _, mut server_stdout) =
run_cargo_example("kvs_pubsub", "--role server --addr 127.0.0.1:2051");

let (_client, mut client_stdin, mut client_stdout) = run_cargo_example(
"kvs_pubsub",
"--role client --addr 127.0.0.1:2052 --server-addr 127.0.0.1:2051",
);

let mut server_output = String::new();
wait_for_process_output(&mut server_output, &mut server_stdout, "Server live!");

let mut client_output = String::new();
wait_for_process_output(&mut client_output, &mut client_stdout, "Client live!");

client_stdin.write_all(b"PUT a,7\n").unwrap();

let (_client2, mut client2_stdin, mut client2_stdout) = run_cargo_example(
"kvs_pubsub",
"--role client --addr 127.0.0.1:2053 --server-addr 127.0.0.1:2051",
);

let mut client2_output = String::new();
wait_for_process_output(&mut client2_output, &mut client2_stdout, "Client live!");

client2_stdin.write_all(b"GET a\n").unwrap();
wait_for_process_output(
&mut client2_output,
&mut client2_stdout,
r#"Got a Response: KvsResponse \{ key: "a", value: "7" \}"#,
);
}
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion hydroflow_lang/src/graph/ops/persist_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::{

/// `persist_mut()` is similar to `persist()` except that it also enables deletions.
/// `persist_mut()` expects an input of type `Persistence<T>`, and it is this enumeration that enables the user to communicate deletion.
/// Deletions/persists hapepn in the order they are received in the stream. For example, [Persist(1), Delete(1), Persist(1)] will result ina a single '1' value being stored.
/// Deletions/persists happen in the order they are received in the stream. For example, [Persist(1), Delete(1), Persist(1)] will result ina a single '1' value being stored.
///
/// ```hydroflow
/// source_iter([hydroflow::util::Persistence::Persist(1), hydroflow::util::Persistence::Persist(2), hydroflow::util::Persistence::Delete(1)])
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_lang/src/graph/ops/persist_mut_keyed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::{

/// `persist_mut_keyed()` is similar to `persist_mut()` except that it also enables key-based deletions
/// `persist_mut()` expects an input of type `PersistenceKeyed<T>`, and it is this enumeration that enables the user to communicate deletion.
/// Deletions/persists hapepn in the order they are received in the stream. For example, [Persist(1), Delete(1), Persist(1)] will result ina a single '1' value being stored.
/// Deletions/persists happen in the order they are received in the stream. For example, [Persist(1), Delete(1), Persist(1)] will result ina a single '1' value being stored.
///
/// ```hydroflow
/// source_iter([hydroflow::util::PersistenceKeyed::Persist(0, 1), hydroflow::util::PersistenceKeyed::Persist(1, 1), hydroflow::util::PersistenceKeyed::Delete(1)])
Expand Down

0 comments on commit 3eb3da0

Please sign in to comment.