Skip to content

Commit

Permalink
⚗️ Try to use proxy connection and gluesql.
Browse files Browse the repository at this point in the history
  • Loading branch information
langyo committed Oct 25, 2023
1 parent b268b08 commit f819768
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 70 deletions.
6 changes: 5 additions & 1 deletion packages/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ publish = false
[dependencies]
tairitsu-utils = { path = "../utils", version = "*" }

ron = "^0.8"
serde = { version = "^1", features = ["derive"] }
serde_json = "^1"
anyhow = "^1"
tokio_wasi = { version = "^1", features = ["full"] }

sea-orm = { git = "https://github.com/langyo/sea-orm", branch = "proxy-conn", features = [
"proxy",
] }
1 change: 1 addition & 0 deletions packages/proto/src/entity/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod post;
17 changes: 17 additions & 0 deletions packages/proto/src/entity/post.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize)]
#[sea_orm(table_name = "posts")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i64,

pub title: String,
pub text: String,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
134 changes: 119 additions & 15 deletions packages/proto/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,127 @@
use anyhow::Result;
use std::io;
mod entity;

use tairitsu_utils::types::proto::backend::Msg;
use std::{
collections::BTreeMap,
sync::{Arc, Mutex},
};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
loop {
let mut buffer = String::new();
let stdin = io::stdin();
stdin.read_line(&mut buffer)?;
use sea_orm::{
ActiveValue::Set, Database, DbBackend, DbErr, EntityTrait, ProxyDatabaseTrait, ProxyExecResult,
ProxyRow, Statement,
};

use entity::post::{ActiveModel, Entity};
use tairitsu_utils::types::proto::backend::{RequestMsg, ResponseMsg};

#[derive(Debug)]
struct ProxyDb {}

let msg: Msg = ron::from_str(&buffer)?;
impl ProxyDatabaseTrait for ProxyDb {
fn query(&self, statement: Statement) -> Result<Vec<ProxyRow>, DbErr> {
let sql = statement.sql.clone();
println!(
"{}",
serde_json::to_string(&RequestMsg::Query(sql)).unwrap()
);

let ret = Msg {
id: msg.id + 1,
data: msg.data + " hahaha",
let mut input = String::new();
std::io::stdin().read_line(&mut input).unwrap();
let ret: ResponseMsg = serde_json::from_str(&input).unwrap();
let ret = match ret {
ResponseMsg::Query(v) => v,
_ => unreachable!("Not a query result"),
};
let ret = ron::to_string(&ret)?;

println!("{}", ret);
let mut rows: Vec<ProxyRow> = vec![];
for row in ret {
let mut map: BTreeMap<String, sea_orm::Value> = BTreeMap::new();
for (k, v) in row.iter() {
map.insert(k.to_owned(), {
if v.is_string() {
sea_orm::Value::String(Some(Box::new(v.as_str().unwrap().to_string())))
} else if v.is_number() {
sea_orm::Value::BigInt(Some(v.as_i64().unwrap()))
} else if v.is_boolean() {
sea_orm::Value::Bool(Some(v.as_bool().unwrap()))
} else {
unreachable!("Unknown json type")
}
});
}
rows.push(ProxyRow { values: map });
}

Ok(rows)
}

fn execute(&self, statement: Statement) -> Result<ProxyExecResult, DbErr> {
let sql = {
if let Some(values) = statement.values {
// Replace all the '?' with the statement values
let mut new_sql = statement.sql.clone();
let mark_count = new_sql.matches('?').count();
for (i, v) in values.0.iter().enumerate() {
if i >= mark_count {
break;
}
new_sql = new_sql.replacen('?', &v.to_string(), 1);
}

new_sql
} else {
statement.sql
}
};

// Send the query to stdout
let msg = RequestMsg::Execute(sql);
let msg = serde_json::to_string(&msg).unwrap();
println!("{}", msg);

// Get the result from stdin
let mut input = String::new();
std::io::stdin().read_line(&mut input).unwrap();
let ret: ResponseMsg = serde_json::from_str(&input).unwrap();
let ret = match ret {
ResponseMsg::Execute(v) => v,
_ => unreachable!(),
};

Ok(ret)
}
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
let db = Database::connect_proxy(
DbBackend::Sqlite,
Arc::new(Mutex::new(Box::new(ProxyDb {}))),
)
.await
.unwrap();

let data = ActiveModel {
id: Set(11),
title: Set("Homo".to_owned()),
text: Set("いいよ、来いよ".to_owned()),
};
Entity::insert(data).exec(&db).await.unwrap();
let data = ActiveModel {
id: Set(45),
title: Set("Homo".to_owned()),
text: Set("そうだよ".to_owned()),
};
Entity::insert(data).exec(&db).await.unwrap();
let data = ActiveModel {
id: Set(14),
title: Set("Homo".to_owned()),
text: Set("悔い改めて".to_owned()),
};
Entity::insert(data).exec(&db).await.unwrap();

let list = Entity::find().all(&db).await.unwrap().to_vec();
println!(
"{}",
serde_json::to_string(&RequestMsg::Debug(format!("{:?}", list))).unwrap()
);
}
4 changes: 4 additions & 0 deletions packages/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ chrono = { version = "^0.4", features = [
] }
log = "^0.4"
serde = { version = "^1", features = ["derive"] }
serde_json = { version = "^1" }
strum = "^0.25"
strum_macros = "^0.25"
uuid = { version = "^1", features = [
Expand All @@ -28,3 +29,6 @@ uuid = { version = "^1", features = [
] }

bcrypt = "^0.15"
sea-orm = { git = "https://github.com/langyo/sea-orm", branch = "proxy-conn", features = [
"proxy",
] }
19 changes: 16 additions & 3 deletions packages/utils/src/types/proto/backend.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
use std::collections::BTreeMap;

use sea_orm::ProxyExecResult;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Msg {
pub id: u32,
pub data: String,
pub enum RequestMsg {
Query(String),
Execute(String),

Debug(String),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ResponseMsg {
Query(Vec<BTreeMap<String, serde_json::Value>>),
Execute(ProxyExecResult),

None,
}
6 changes: 5 additions & 1 deletion packages/vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ tairitsu-utils = { path = "../utils", version = "*" }
anyhow = "^1"
bytes = "^1"
async-trait = "^0.1"
ron = "^0.8"
serde = { version = "^1", features = ["derive"] }
serde_json = "^1"
reqwest = { version = "^0.11", features = ["blocking"] }
lazy_static = "^1"
flume = "^0.11"
Expand All @@ -22,3 +22,7 @@ async-std = { version = "^1", features = ["attributes", "tokio1"] }
wit-component = "^0.16"
wasmtime = { version = "^14", features = ["component-model"] }
wasmtime-wasi = "^14"
sea-orm = { git = "https://github.com/langyo/sea-orm", branch = "proxy-conn", features = [
"proxy",
] }
gluesql = "^0.13"
121 changes: 81 additions & 40 deletions packages/vm/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
mod runtime;
mod stream;

use anyhow::Result;
use bytes::Bytes;
use std::collections::BTreeMap;

use tairitsu_utils::types::proto::backend::Msg;
use gluesql::{memory_storage::MemoryStorage, prelude::Glue};
use sea_orm::ProxyExecResult;
use wasmtime::{Config, Engine};
use wit_component::ComponentEncoder;

mod runtime;
mod stream;

use runtime::Runtime;
use tairitsu_utils::types::proto::backend::{RequestMsg, ResponseMsg};

#[async_std::main]
async fn main() -> Result<()> {
// Transfer the wasm binary to wasm component binary

let adapter = include_bytes!("../res/wasi_snapshot_preview1.command.wasm");

let component = &ComponentEncoder::default()
.module(include_bytes!(
"../../../target/wasm32-wasi/release/tairitsu-proto.wasm"
Expand All @@ -32,43 +33,83 @@ async fn main() -> Result<()> {
let cwasm = engine.precompile_component(component)?;
let cwasm = Bytes::from(cwasm);

// Run the prototype demo
// Create the database connection
println!("Creating database connection...");
let mem = MemoryStorage::default();
let mut db = Glue::new(mem);
db.execute(
r#"
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
text TEXT NOT NULL
)
"#,
)?;

// Run the prototype demo
println!("Running prototype demo...");
let entity_base = Runtime::new(cwasm);

use std::time::Instant;
let now = Instant::now();

let mut threads = Vec::new();
for index in 0..10 {
let mut entity = entity_base.clone();
threads.push(std::thread::spawn(move || {
let mut runner = entity.init().unwrap();

let tx = runner.tx.clone();
let rx = runner.rx.clone();

std::thread::spawn(move || {
runner.run().unwrap();
});

let data = Msg {
id: 233,
data: "hello".to_string(),
};
println!("#{index} Sending to vm: {:?}", data);
tx.send(data).unwrap();

let msg = rx.recv().unwrap();
println!("#{index} Received on main: {:?}", msg);
}));
}

for thread in threads {
thread.join().unwrap();
let mut runner = Runtime::new(cwasm).init()?;

let tx = runner.tx.clone();
let rx = runner.rx.clone();

std::thread::spawn(move || {
runner.run().unwrap();
});

while let Ok(msg) = rx.recv() {
match msg {
RequestMsg::Execute(sql) => {
println!("SQL execute: {:?}", sql);
let ret = db.execute(sql)?;

println!("SQL execute result: {:?}", ret);
let ret = ResponseMsg::Execute(ProxyExecResult {
last_insert_id: 1,
rows_affected: 1,
});
tx.send(ret)?;
}
RequestMsg::Query(sql) => {
println!("SQL query: {:?}", sql);

let mut ret = vec![];
for payload in db.execute(sql)?.iter() {
match payload {
gluesql::prelude::Payload::Select { labels, rows } => {
for row in rows.iter() {
let mut map = BTreeMap::new();
for (label, column) in labels.iter().zip(row.iter()) {
map.insert(
label.to_owned(),
match column {
gluesql::prelude::Value::I64(val) => {
serde_json::Value::Number((*val).into())
}
gluesql::prelude::Value::Str(val) => {
serde_json::Value::String(val.to_owned())
}
_ => unreachable!("Unsupported value: {:?}", column),
},
);
}
ret.push(map.into());
}
}
_ => unreachable!("Unsupported payload: {:?}", payload),
}
}

println!("SQL query result: {:?}", ret);
let ret = ResponseMsg::Query(ret);
tx.send(ret)?;
}
RequestMsg::Debug(msg) => {
println!("VM Debug: {}", msg);
}
}
}
println!("Time elapsed: {:?}", now.elapsed());

Ok(())
}
6 changes: 3 additions & 3 deletions packages/vm/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use wasmtime_wasi::preview2::{
};

use crate::stream::{HostInputStreamBox, HostOutputStreamBox};
use tairitsu_utils::types::proto::backend::Msg;
use tairitsu_utils::types::proto::backend::{RequestMsg, ResponseMsg};

struct Ctx {
wasi: WasiCtx,
Expand Down Expand Up @@ -45,8 +45,8 @@ pub struct Runner {
component: Component,
linker: Linker<Ctx>,

pub tx: Sender<Msg>,
pub rx: Receiver<Msg>,
pub tx: Sender<ResponseMsg>,
pub rx: Receiver<RequestMsg>,
}

impl Runtime {
Expand Down
Loading

0 comments on commit f819768

Please sign in to comment.