diff --git a/packages/proto/Cargo.toml b/packages/proto/Cargo.toml index 5119e94..066fecf 100644 --- a/packages/proto/Cargo.toml +++ b/packages/proto/Cargo.toml @@ -9,7 +9,11 @@ publish = false [dependencies] tairitsu-utils = { path = "../utils", version = "*" } -ron = "^0.8" serde = { version = "^1", features = ["derive"] } +serde_json = "^1" anyhow = "^1" tokio_wasi = { version = "^1", features = ["full"] } + +sea-orm = { git = "https://github.com/langyo/sea-orm", branch = "proxy-conn", features = [ + "proxy", +] } diff --git a/packages/proto/src/entity/mod.rs b/packages/proto/src/entity/mod.rs new file mode 100644 index 0000000..e8b6291 --- /dev/null +++ b/packages/proto/src/entity/mod.rs @@ -0,0 +1 @@ +pub mod post; diff --git a/packages/proto/src/entity/post.rs b/packages/proto/src/entity/post.rs new file mode 100644 index 0000000..8688460 --- /dev/null +++ b/packages/proto/src/entity/post.rs @@ -0,0 +1,17 @@ +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize)] +#[sea_orm(table_name = "posts")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i64, + + pub title: String, + pub text: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/packages/proto/src/main.rs b/packages/proto/src/main.rs index 5869acb..2b393f3 100644 --- a/packages/proto/src/main.rs +++ b/packages/proto/src/main.rs @@ -1,23 +1,127 @@ -use anyhow::Result; -use std::io; +mod entity; -use tairitsu_utils::types::proto::backend::Msg; +use std::{ + collections::BTreeMap, + sync::{Arc, Mutex}, +}; -#[tokio::main(flavor = "current_thread")] -async fn main() -> Result<()> { - loop { - let mut buffer = String::new(); - let stdin = io::stdin(); - stdin.read_line(&mut buffer)?; +use sea_orm::{ + ActiveValue::Set, Database, DbBackend, DbErr, EntityTrait, ProxyDatabaseTrait, ProxyExecResult, + ProxyRow, Statement, +}; + +use entity::post::{ActiveModel, Entity}; +use tairitsu_utils::types::proto::backend::{RequestMsg, ResponseMsg}; + +#[derive(Debug)] +struct ProxyDb {} - let msg: Msg = ron::from_str(&buffer)?; +impl ProxyDatabaseTrait for ProxyDb { + fn query(&self, statement: Statement) -> Result, DbErr> { + let sql = statement.sql.clone(); + println!( + "{}", + serde_json::to_string(&RequestMsg::Query(sql)).unwrap() + ); - let ret = Msg { - id: msg.id + 1, - data: msg.data + " hahaha", + let mut input = String::new(); + std::io::stdin().read_line(&mut input).unwrap(); + let ret: ResponseMsg = serde_json::from_str(&input).unwrap(); + let ret = match ret { + ResponseMsg::Query(v) => v, + _ => unreachable!("Not a query result"), }; - let ret = ron::to_string(&ret)?; - println!("{}", ret); + let mut rows: Vec = vec![]; + for row in ret { + let mut map: BTreeMap = BTreeMap::new(); + for (k, v) in row.iter() { + map.insert(k.to_owned(), { + if v.is_string() { + sea_orm::Value::String(Some(Box::new(v.as_str().unwrap().to_string()))) + } else if v.is_number() { + sea_orm::Value::BigInt(Some(v.as_i64().unwrap())) + } else if v.is_boolean() { + sea_orm::Value::Bool(Some(v.as_bool().unwrap())) + } else { + unreachable!("Unknown json type") + } + }); + } + rows.push(ProxyRow { values: map }); + } + + Ok(rows) } + + fn execute(&self, statement: Statement) -> Result { + let sql = { + if let Some(values) = statement.values { + // Replace all the '?' with the statement values + let mut new_sql = statement.sql.clone(); + let mark_count = new_sql.matches('?').count(); + for (i, v) in values.0.iter().enumerate() { + if i >= mark_count { + break; + } + new_sql = new_sql.replacen('?', &v.to_string(), 1); + } + + new_sql + } else { + statement.sql + } + }; + + // Send the query to stdout + let msg = RequestMsg::Execute(sql); + let msg = serde_json::to_string(&msg).unwrap(); + println!("{}", msg); + + // Get the result from stdin + let mut input = String::new(); + std::io::stdin().read_line(&mut input).unwrap(); + let ret: ResponseMsg = serde_json::from_str(&input).unwrap(); + let ret = match ret { + ResponseMsg::Execute(v) => v, + _ => unreachable!(), + }; + + Ok(ret) + } +} + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let db = Database::connect_proxy( + DbBackend::Sqlite, + Arc::new(Mutex::new(Box::new(ProxyDb {}))), + ) + .await + .unwrap(); + + let data = ActiveModel { + id: Set(11), + title: Set("Homo".to_owned()), + text: Set("いいよ、来いよ".to_owned()), + }; + Entity::insert(data).exec(&db).await.unwrap(); + let data = ActiveModel { + id: Set(45), + title: Set("Homo".to_owned()), + text: Set("そうだよ".to_owned()), + }; + Entity::insert(data).exec(&db).await.unwrap(); + let data = ActiveModel { + id: Set(14), + title: Set("Homo".to_owned()), + text: Set("悔い改めて".to_owned()), + }; + Entity::insert(data).exec(&db).await.unwrap(); + + let list = Entity::find().all(&db).await.unwrap().to_vec(); + println!( + "{}", + serde_json::to_string(&RequestMsg::Debug(format!("{:?}", list))).unwrap() + ); } diff --git a/packages/utils/Cargo.toml b/packages/utils/Cargo.toml index 885f814..584babb 100644 --- a/packages/utils/Cargo.toml +++ b/packages/utils/Cargo.toml @@ -18,6 +18,7 @@ chrono = { version = "^0.4", features = [ ] } log = "^0.4" serde = { version = "^1", features = ["derive"] } +serde_json = { version = "^1" } strum = "^0.25" strum_macros = "^0.25" uuid = { version = "^1", features = [ @@ -28,3 +29,6 @@ uuid = { version = "^1", features = [ ] } bcrypt = "^0.15" +sea-orm = { git = "https://github.com/langyo/sea-orm", branch = "proxy-conn", features = [ + "proxy", +] } diff --git a/packages/utils/src/types/proto/backend.rs b/packages/utils/src/types/proto/backend.rs index 6032dde..c8091cf 100644 --- a/packages/utils/src/types/proto/backend.rs +++ b/packages/utils/src/types/proto/backend.rs @@ -1,7 +1,20 @@ +use std::collections::BTreeMap; + +use sea_orm::ProxyExecResult; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Msg { - pub id: u32, - pub data: String, +pub enum RequestMsg { + Query(String), + Execute(String), + + Debug(String), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum ResponseMsg { + Query(Vec>), + Execute(ProxyExecResult), + + None, } diff --git a/packages/vm/Cargo.toml b/packages/vm/Cargo.toml index 40dea36..00a9b53 100644 --- a/packages/vm/Cargo.toml +++ b/packages/vm/Cargo.toml @@ -12,8 +12,8 @@ tairitsu-utils = { path = "../utils", version = "*" } anyhow = "^1" bytes = "^1" async-trait = "^0.1" -ron = "^0.8" serde = { version = "^1", features = ["derive"] } +serde_json = "^1" reqwest = { version = "^0.11", features = ["blocking"] } lazy_static = "^1" flume = "^0.11" @@ -22,3 +22,7 @@ async-std = { version = "^1", features = ["attributes", "tokio1"] } wit-component = "^0.16" wasmtime = { version = "^14", features = ["component-model"] } wasmtime-wasi = "^14" +sea-orm = { git = "https://github.com/langyo/sea-orm", branch = "proxy-conn", features = [ + "proxy", +] } +gluesql = "^0.13" diff --git a/packages/vm/src/main.rs b/packages/vm/src/main.rs index 8773adc..b1981bc 100644 --- a/packages/vm/src/main.rs +++ b/packages/vm/src/main.rs @@ -1,21 +1,22 @@ +mod runtime; +mod stream; + use anyhow::Result; use bytes::Bytes; +use std::collections::BTreeMap; -use tairitsu_utils::types::proto::backend::Msg; +use gluesql::{memory_storage::MemoryStorage, prelude::Glue}; +use sea_orm::ProxyExecResult; use wasmtime::{Config, Engine}; use wit_component::ComponentEncoder; -mod runtime; -mod stream; - use runtime::Runtime; +use tairitsu_utils::types::proto::backend::{RequestMsg, ResponseMsg}; #[async_std::main] async fn main() -> Result<()> { // Transfer the wasm binary to wasm component binary - let adapter = include_bytes!("../res/wasi_snapshot_preview1.command.wasm"); - let component = &ComponentEncoder::default() .module(include_bytes!( "../../../target/wasm32-wasi/release/tairitsu-proto.wasm" @@ -32,43 +33,83 @@ async fn main() -> Result<()> { let cwasm = engine.precompile_component(component)?; let cwasm = Bytes::from(cwasm); - // Run the prototype demo + // Create the database connection + println!("Creating database connection..."); + let mem = MemoryStorage::default(); + let mut db = Glue::new(mem); + db.execute( + r#" + CREATE TABLE IF NOT EXISTS posts ( + id INTEGER PRIMARY KEY, + title TEXT NOT NULL, + text TEXT NOT NULL + ) + "#, + )?; + // Run the prototype demo println!("Running prototype demo..."); - let entity_base = Runtime::new(cwasm); - - use std::time::Instant; - let now = Instant::now(); - - let mut threads = Vec::new(); - for index in 0..10 { - let mut entity = entity_base.clone(); - threads.push(std::thread::spawn(move || { - let mut runner = entity.init().unwrap(); - - let tx = runner.tx.clone(); - let rx = runner.rx.clone(); - - std::thread::spawn(move || { - runner.run().unwrap(); - }); - - let data = Msg { - id: 233, - data: "hello".to_string(), - }; - println!("#{index} Sending to vm: {:?}", data); - tx.send(data).unwrap(); - - let msg = rx.recv().unwrap(); - println!("#{index} Received on main: {:?}", msg); - })); - } - - for thread in threads { - thread.join().unwrap(); + let mut runner = Runtime::new(cwasm).init()?; + + let tx = runner.tx.clone(); + let rx = runner.rx.clone(); + + std::thread::spawn(move || { + runner.run().unwrap(); + }); + + while let Ok(msg) = rx.recv() { + match msg { + RequestMsg::Execute(sql) => { + println!("SQL execute: {:?}", sql); + let ret = db.execute(sql)?; + + println!("SQL execute result: {:?}", ret); + let ret = ResponseMsg::Execute(ProxyExecResult { + last_insert_id: 1, + rows_affected: 1, + }); + tx.send(ret)?; + } + RequestMsg::Query(sql) => { + println!("SQL query: {:?}", sql); + + let mut ret = vec![]; + for payload in db.execute(sql)?.iter() { + match payload { + gluesql::prelude::Payload::Select { labels, rows } => { + for row in rows.iter() { + let mut map = BTreeMap::new(); + for (label, column) in labels.iter().zip(row.iter()) { + map.insert( + label.to_owned(), + match column { + gluesql::prelude::Value::I64(val) => { + serde_json::Value::Number((*val).into()) + } + gluesql::prelude::Value::Str(val) => { + serde_json::Value::String(val.to_owned()) + } + _ => unreachable!("Unsupported value: {:?}", column), + }, + ); + } + ret.push(map.into()); + } + } + _ => unreachable!("Unsupported payload: {:?}", payload), + } + } + + println!("SQL query result: {:?}", ret); + let ret = ResponseMsg::Query(ret); + tx.send(ret)?; + } + RequestMsg::Debug(msg) => { + println!("VM Debug: {}", msg); + } + } } - println!("Time elapsed: {:?}", now.elapsed()); Ok(()) } diff --git a/packages/vm/src/runtime.rs b/packages/vm/src/runtime.rs index 76a6bbc..b998a8f 100644 --- a/packages/vm/src/runtime.rs +++ b/packages/vm/src/runtime.rs @@ -12,7 +12,7 @@ use wasmtime_wasi::preview2::{ }; use crate::stream::{HostInputStreamBox, HostOutputStreamBox}; -use tairitsu_utils::types::proto::backend::Msg; +use tairitsu_utils::types::proto::backend::{RequestMsg, ResponseMsg}; struct Ctx { wasi: WasiCtx, @@ -45,8 +45,8 @@ pub struct Runner { component: Component, linker: Linker, - pub tx: Sender, - pub rx: Receiver, + pub tx: Sender, + pub rx: Receiver, } impl Runtime { diff --git a/packages/vm/src/stream.rs b/packages/vm/src/stream.rs index 1cae797..1c0a373 100644 --- a/packages/vm/src/stream.rs +++ b/packages/vm/src/stream.rs @@ -6,10 +6,10 @@ use wasmtime_wasi::preview2::{ HostInputStream, HostOutputStream, StdinStream, StdoutStream, StreamResult, Subscribe, }; -use tairitsu_utils::types::proto::backend::Msg; +use tairitsu_utils::types::proto::backend::{RequestMsg, ResponseMsg}; pub struct InputStream { - pub tasks: Arc>>, + pub tasks: Arc>>, } #[async_trait::async_trait] @@ -25,7 +25,7 @@ impl HostInputStream for InputStream { let mut tasks = self.tasks.lock().unwrap(); if tasks.len() > 0 { let ret = tasks.remove(0); - let ret = ron::to_string(&ret).unwrap() + "\n"; + let ret = serde_json::to_string(&ret).unwrap() + "\n"; let ret = Bytes::from(ret); return Ok(ret); @@ -37,7 +37,7 @@ impl HostInputStream for InputStream { } pub struct OutputStream { - pub tx: Sender, + pub tx: Sender, } #[async_trait::async_trait] @@ -49,7 +49,7 @@ impl Subscribe for OutputStream { impl HostOutputStream for OutputStream { fn write(&mut self, bytes: Bytes) -> StreamResult<()> { let msg = String::from_utf8(bytes.to_vec()).expect("Failed to parse message"); - let msg = ron::from_str::(&msg).expect("Failed to parse message"); + let msg = serde_json::from_str::(&msg).expect("Failed to parse message"); self.tx.send(msg).expect("Failed to send message"); Ok(()) @@ -65,7 +65,7 @@ impl HostOutputStream for OutputStream { } pub struct HostInputStreamBox { - pub tasks: Arc>>, + pub tasks: Arc>>, } impl StdinStream for HostInputStreamBox { @@ -81,7 +81,7 @@ impl StdinStream for HostInputStreamBox { } pub struct HostOutputStreamBox { - pub tx: Sender, + pub tx: Sender, } impl StdoutStream for HostOutputStreamBox {