From 5054c4b7850ad43cdf777028ea5d6bae08c73f6e Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Tue, 26 Mar 2024 16:58:42 +0530 Subject: [PATCH 01/13] refactor: remove unwanted methods inside ssb client and clear clippy warnings --- runtime/lite/examples/basic-producer.rs | 17 +-- runtime/lite/src/consumer.rs | 15 ++- runtime/lite/src/context.rs | 25 ++-- .../modules/kuska_ssb_client/client/client.rs | 107 +++++------------- .../modules/kuska_ssb_client/client/mod.rs | 4 +- .../client/response_parser.rs | 2 +- .../modules/kuska_ssb_client/client/tests.rs | 6 +- .../modules/kuska_ssb_client/client/types.rs | 17 --- runtime/lite/src/modules/logger/mod.rs | 1 + runtime/lite/src/modules/storage/mod.rs | 1 + runtime/lite/src/modules/storage/storage.rs | 6 +- .../src/modules/wasmtime_wasi_module/mod.rs | 6 +- 12 files changed, 69 insertions(+), 138 deletions(-) diff --git a/runtime/lite/examples/basic-producer.rs b/runtime/lite/examples/basic-producer.rs index 215eae65..a3ac5b37 100644 --- a/runtime/lite/examples/basic-producer.rs +++ b/runtime/lite/examples/basic-producer.rs @@ -42,19 +42,6 @@ async fn main() { println!(" Hash: {block_hash}"); println!(" Extrinsics:"); - // Code for automate transfer - // use subxt_signer::sr25519::dev; - // if block_number == 10 { - // let dest = dev::bob().public_key().into(); - // let tx = polkadot::tx().balances().transfer_allow_death(dest, 10_000); - // let from = dev::alice(); - // let _events = api - // .tx() - // .sign_and_submit_then_watch_default(&tx, &from) - // .await - // .unwrap(); - // } - // Log each of the extrinsic with it's associated events: let extrinsics = block.extrinsics().await.unwrap(); for ext in extrinsics.iter() { @@ -81,11 +68,9 @@ async fn main() { name: None, }; - let result = client + let _ = client .publish(&value.to_string(), Some(vec![menttion])) .await; - // assert!(result.is_ok()); - // break 'outer; } None => (), } diff --git a/runtime/lite/src/consumer.rs b/runtime/lite/src/consumer.rs index 22c8b765..3e6ef14e 100644 --- a/runtime/lite/src/consumer.rs +++ b/runtime/lite/src/consumer.rs @@ -2,12 +2,12 @@ use kuska_ssb::keystore::read_patchwork_config; use runtime::{ common::RequestBody, logger::CoreLogger, + state_manager::GlobalState, modules::kuska_ssb_client::client::Client, storage::{CoreStorage, Storage}, Ctx, Logger, }; use std::{ - borrow::{Borrow, BorrowMut}, net::{TcpListener, TcpStream}, sync::{Arc, Mutex}, }; @@ -20,9 +20,13 @@ use dotenv::dotenv; async fn main() { dotenv().ok(); let db = CoreStorage::new("runtime").unwrap(); + let logger = CoreLogger::new(Some("./workflow")); + let state_manager = GlobalState::new(logger.clone()); + let context = Arc::new(Mutex::new(Context::new( - CoreLogger::new(Some("./workflow")), + logger, db, + state_manager ))); let secret = std::env::var("SECRET").unwrap_or_else(|_| { @@ -39,7 +43,10 @@ async fn main() { let mut client = Client::new(Some(key), "0.0.0.0".to_string(), port) .await .unwrap(); - client.live_feed_with_execution_of_workflow(true, ssb_context).await.unwrap(); + client + .live_feed_with_execution_of_workflow(true, ssb_context) + .await + .unwrap(); }); // Spawn the HTTP server task @@ -77,7 +84,7 @@ fn handle_client(mut stream: TcpStream, ctx: Arc>) { db.insert(&body.pub_id.clone(), body).unwrap(); logger.info("Data inserted successfully"); - + // println!("Received data: {:?}", body); // Respond to the client (optional) let response = "Data received!"; diff --git a/runtime/lite/src/context.rs b/runtime/lite/src/context.rs index 3935de4c..802aa72e 100644 --- a/runtime/lite/src/context.rs +++ b/runtime/lite/src/context.rs @@ -1,33 +1,40 @@ -use std::sync::{Mutex, MutexGuard, RwLockReadGuard}; +use std::sync::{Mutex, MutexGuard}; -use async_std::sync::RwLock; +use crate::{logger::CoreLogger, state_manager::{GlobalState, WorkflowState}, storage::CoreStorage}; -use crate::{state_manager::CoreLogger, storage::CoreStorage}; +type GlobalStateManager = GlobalState; pub trait Ctx: Send + 'static { - fn get_logger(&self) -> std::sync::MutexGuard; - fn get_db(&self) -> std::sync::MutexGuard; + fn get_logger(&self) -> MutexGuard; + fn get_db(&self) -> MutexGuard; + fn get_state_manager(&self) -> MutexGuard; } pub struct Context { pub logger: Mutex, - pub db : Mutex + pub db : Mutex, + pub state_manager : Mutex, } impl Ctx for Context { - fn get_logger(&self) -> std::sync::MutexGuard { + fn get_logger(&self) -> MutexGuard { self.logger.lock().unwrap() } - fn get_db(&self) -> std::sync::MutexGuard { + fn get_db(&self) -> MutexGuard { self.db.lock().unwrap() } + fn get_state_manager(&self) -> MutexGuard{ + self.state_manager.lock().unwrap() + } + } impl Context { - pub fn new(logger: CoreLogger, db: CoreStorage) -> Self { + pub fn new(logger: CoreLogger, db: CoreStorage, state_manager: GlobalStateManager) -> Self { Context { logger: Mutex::new(logger), db: Mutex::new(db), + state_manager: Mutex::new(state_manager), } } } diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs index d5154a5d..a30a5c8b 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs @@ -5,7 +5,7 @@ use crate::{ }; use super::*; -use kuska_ssb::api::dto::content::Mention; +use kuska_ssb::api::dto::content::{Mention, Post}; impl Client { async fn get_async<'a, T, F>(&mut self, req_no: RequestNo, f: F) -> Result @@ -18,7 +18,7 @@ impl Client { if id == req_no { match msg { RecvMsg::RpcResponse(_type, body) => { - return f(&body).map_err(|err| err.into()); + return f(&body); } RecvMsg::ErrorResponse(message) => { return std::result::Result::Err(Box::new(AppError::new(message))); @@ -29,7 +29,7 @@ impl Client { } } - async fn print_source_until_eof<'a, T, F>(&mut self, req_no: RequestNo, f: F) -> Result> + async fn get_responses<'a, T, F>(&mut self, req_no: RequestNo, f: F) -> Result> where F: Fn(&[u8]) -> Result, T: Debug + serde::Deserialize<'a>, @@ -45,10 +45,7 @@ impl Client { let display = f(&body); match display { - Ok(display) => { - println!("response=> {:#?}", display); - response.push(display); - } + Ok(display) => response.push(display), Err(err) => { let body = std::str::from_utf8(&body).unwrap(); @@ -79,7 +76,7 @@ impl Client { ip: String, port: String, ) -> Result { - let server_pk = id.replace("=.ed25519", "").replace("@", ""); + let server_pk = id.replace("=.ed25519", "").replace('@', ""); let server_pk = ed25519::PublicKey::from_slice(&base64::decode(&server_pk)?).expect("bad public key"); @@ -104,12 +101,6 @@ impl Client { pub async fn new(configs: Option, ip: String, port: String) -> Result { match configs { Some(config) => { - // let public_key = - // PublicKey::from_slice(&base64::decode(&config.public_key)?).unwrap(); - // let secret_key = - // SecretKey::from_slice(&base64::decode(&config.secret_key)?).unwrap(); - // let id = config.id; - Self::ssb_handshake(config.pk, config.sk, config.id, ip, port).await } None => { @@ -132,19 +123,13 @@ impl Client { } pub async fn get(&mut self, msg_id: &str) -> Result { - let msg_id = if msg_id == "any" { - "%TL34NIX8JpMJN+ubHWx6cRhIwEal8VqHdKVg2t6lFcg=.sha256".to_string() - } else { - msg_id.to_string() - }; - - let req_id = self.api.get_req_send(&msg_id).await?; + let req_id = self.api.get_req_send(msg_id).await?; let msg = self.get_async(req_id, message_res_parse).await?; Ok(msg) } - pub async fn user(&mut self, live: bool, user_id: &str) -> Result> { + pub async fn get_feed_by_user(&mut self, live: bool, user_id: &str) -> Result> { let user_id = match user_id { "me" => self.whoami().await?, _ => user_id.to_string(), @@ -153,7 +138,7 @@ impl Client { let args = CreateHistoryStreamIn::new(user_id).live(live); let req_id = self.api.create_history_stream_req_send(&args).await?; - let feed = self.print_source_until_eof(req_id, feed_res_parse).await?; + let feed = self.get_responses(req_id, feed_res_parse).await?; Ok(feed) } @@ -162,7 +147,7 @@ impl Client { let args = CreateStreamIn::default().live(live); let req_id = self.api.create_feed_stream_req_send(&args).await?; - let feed = self.print_source_until_eof(req_id, feed_res_parse).await?; + let feed = self.get_responses(req_id, feed_res_parse).await?; Ok(feed) } @@ -175,57 +160,21 @@ impl Client { let args = CreateStreamIn::default().live(live); let req_id = self.api.create_feed_stream_req_send(&args).await?; - let _feed = self.execute_workflow_by_event(req_id, ctx).await?; - - Ok(()) - } - - pub async fn latest(&mut self) -> Result<()> { - let req_id = self.api.latest_req_send().await?; - self.print_source_until_eof(req_id, latest_res_parse) - .await?; - - Ok(()) - } - - pub async fn private(&mut self, user_id: &str) -> Result<()> { - let user_id = match user_id { - "me" => self.whoami().await?, - _ => user_id.to_string(), - }; - - let sk = self.get_secret_key(); - let show_private = |body: &[u8]| { - let msg = feed_res_parse(body)?.into_message()?; - if let serde_json::Value::String(content) = msg.content() { - if is_privatebox(&content) { - let ret = privatebox_decipher(&content, &sk)?.unwrap_or("".to_string()); - return Ok(ret); - } - } - return Ok("".to_string()); - }; - - let args = CreateHistoryStreamIn::new(user_id.to_string()); - let req_id = self.api.create_history_stream_req_send(&args).await?; - - self.print_source_until_eof(req_id, show_private).await?; + self.execute_workflow_by_event(req_id, ctx).await?; Ok(()) } - pub async fn create_invite(&mut self) -> Result<()> { + pub async fn create_invite(&mut self) -> Result> { let req_id = self.api.invite_create_req_send(1).await?; - - self.print_source_until_eof(req_id, invite_create).await?; - - Ok(()) + let responses = self.get_responses(req_id, invite_create).await?; + Ok(responses) } - pub async fn accept_invite(&mut self, invite_code: &str) -> Result<()> { - let req_id = self.api.invite_use_req_send(invite_code).await?; - self.print_source_until_eof(req_id, invite_create).await?; - Ok(()) + pub async fn accept_invite(&mut self, invite_code: &str) -> Result> { + let req_id = self.api.invite_use_req_send(invite_code).await?; + let responses = self.get_responses(req_id, invite_create).await?; + Ok(responses) } pub async fn publish(&mut self, msg: &str, mention: Option>) -> Result<()> { @@ -262,7 +211,6 @@ impl Client { req_no: RequestNo, ctx: Arc>, ) -> Result<()> { - let mut response = vec![]; let mut is_synced = false; @@ -277,14 +225,13 @@ impl Client { match display { Ok(display) => { if is_synced { - match serde_json::from_value::( + match serde_json::from_value::( display.value.get("content").unwrap().clone(), ) { Ok(x) => { match serde_json::from_str::(&x.text) { Ok(mut event) => { - response.push(display); println!("{:#?}", event); let ctx = ctx.lock().unwrap(); @@ -297,12 +244,14 @@ impl Client { "data" : crate::common::combine_values(&mut event, &body.input), "allowed_hosts": body.allowed_hosts }); - wasmtime_wasi_module::run_workflow( - serde_json::to_value(data).unwrap(), - body.wasm, - 0, - "hello", - ); + let _ = + wasmtime_wasi_module::run_workflow( + serde_json::to_value(data) + .unwrap(), + body.wasm, + 0, + "hello", + ); } Err(e) => logger.error(&e.to_string()), } @@ -327,12 +276,12 @@ impl Client { } } RecvMsg::ErrorResponse(message) => { - return std::result::Result::Err(Box::new(AppError::new(message))); + return std::result::Result::Err(Box::new(AppError::new(message))) } - RecvMsg::CancelStreamResponse() => {} _ => {} } } } } } + diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/mod.rs b/runtime/lite/src/modules/kuska_ssb_client/client/mod.rs index 0cd1b521..0507c14b 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/mod.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/mod.rs @@ -3,7 +3,6 @@ extern crate kuska_ssb; extern crate base64; - use std::fmt::Debug; use async_std::net::TcpStream; @@ -16,11 +15,12 @@ use kuska_ssb::{ api::ApiCaller, crypto::ed25519::PublicKey, discovery::ssb_net_id, - feed::{is_privatebox, privatebox_decipher, Feed, Message}, + feed::{Feed, Message}, keystore::{from_patchwork_local, OwnedIdentity}, rpc::{RecvMsg, RequestNo, RpcReader, RpcWriter}, }; +#[allow(clippy::module_inception)] mod client; mod errors; mod response_parser; diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/response_parser.rs b/runtime/lite/src/modules/kuska_ssb_client/client/response_parser.rs index 8e5b1ebe..c2a87457 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/response_parser.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/response_parser.rs @@ -7,7 +7,7 @@ pub fn message_res_parse(body: &[u8]) -> Result { Ok(Message::from_slice(body)?) } pub fn feed_res_parse(body: &[u8]) -> Result { - Ok(Feed::from_slice(&body)?) + Ok(Feed::from_slice(body)?) } pub fn latest_res_parse(body: &[u8]) -> Result { diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs index aabbd573..f52c6354 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs @@ -1,6 +1,6 @@ #[cfg(test)] mod tests { - use crate::modules::kuska_ssb_client::client::{types, Client, UserConfig}; + use crate::modules::kuska_ssb_client::client::{types, Client}; use dotenv::dotenv; use kuska_ssb::keystore::read_patchwork_config; @@ -121,7 +121,7 @@ mod tests { // wait for server to publish async_std::task::sleep(std::time::Duration::from_secs(1)).await; - let feed = client.user(false, &config.id).await.unwrap(); + let feed = client.get_feed_by_user(false, &config.id).await.unwrap(); let event = feed.last().unwrap().value.clone(); let message = event.get("content").unwrap(); @@ -230,8 +230,6 @@ mod tests { #[tokio::test] #[ignore] async fn test_event_subscription() { - use super::*; - dotenv().ok(); let secret = std::env::var("SECRET").unwrap_or_else(|_| { diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/types.rs b/runtime/lite/src/modules/kuska_ssb_client/client/types.rs index 7d58e540..685d5850 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/types.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/types.rs @@ -1,4 +1,3 @@ -use kuska_ssb::api::dto::content::Mention; use serde::{Deserialize, Serialize}; use super::*; @@ -15,12 +14,6 @@ pub struct Event { pub body: String, } -pub struct UserConfig { - pub public_key: String, - pub secret_key: String, - pub id: String, -} - #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct Content { #[serde(rename = "type")] @@ -28,13 +21,3 @@ pub struct Content { pub text: String, // mentions: Option>, } - -impl UserConfig { - pub fn new(public_key: &str, secret_key: &str, id: &str) -> Self { - Self { - public_key: public_key.to_string(), - secret_key: secret_key.to_string(), - id: id.to_string(), - } - } -} diff --git a/runtime/lite/src/modules/logger/mod.rs b/runtime/lite/src/modules/logger/mod.rs index ce0131ac..fb7be6c9 100644 --- a/runtime/lite/src/modules/logger/mod.rs +++ b/runtime/lite/src/modules/logger/mod.rs @@ -1,3 +1,4 @@ +#[allow(clippy::module_inception)] pub mod logger; pub mod traits; diff --git a/runtime/lite/src/modules/storage/mod.rs b/runtime/lite/src/modules/storage/mod.rs index 6cb2bb1d..dd060ff8 100644 --- a/runtime/lite/src/modules/storage/mod.rs +++ b/runtime/lite/src/modules/storage/mod.rs @@ -2,6 +2,7 @@ /// does: pub mod traits; pub use traits::*; +#[allow(clippy::module_inception)] pub mod storage; pub use storage::*; pub mod test; diff --git a/runtime/lite/src/modules/storage/storage.rs b/runtime/lite/src/modules/storage/storage.rs index b8b3f062..1843e26e 100644 --- a/runtime/lite/src/modules/storage/storage.rs +++ b/runtime/lite/src/modules/storage/storage.rs @@ -95,7 +95,7 @@ impl Storage for CoreStorage { /// The `set_data` function returns a `Result<(), Error>`. fn set_data(&self, key: &str, value: Vec) -> Result<(), CustomError> { let serialized_value = value; - self.db.put(key, &serialized_value)?; + self.db.put(key, serialized_value)?; Ok(()) } @@ -115,7 +115,7 @@ impl Storage for CoreStorage { /// The `modify_data` function is returning a `Result<(), Error>`. fn modify_data(&self, key: &str, value: Vec) -> Result<(), CustomError> { let _existing_data = self.get_data(key)?; - self.db.put(key, &value)?; + self.db.put(key, value)?; Ok(()) } @@ -151,7 +151,7 @@ impl Storage for CoreStorage { /// /// The `store_wasm` function is returning a `Result<(), Error>`. fn store_wasm(&self, key: &str, wasm: &[u8]) -> Result<(), CustomError> { - self.db.put(key, &wasm)?; + self.db.put(key, wasm)?; Ok(()) } diff --git a/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs b/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs index 54d641db..e6a283a8 100644 --- a/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs +++ b/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs @@ -40,7 +40,7 @@ fn run_workflow_helper( let cache = DB::open_default(format!("./.cache/{:?}", id)).unwrap(); let prev_internal_state_data = if !restart { - let prev_internal_state_data: Value = match cache.get(&hash_key.as_bytes()).unwrap() { + let prev_internal_state_data: Value = match cache.get(hash_key.as_bytes()).unwrap() { Some(data) => serde_json::from_slice(&data).unwrap(), None => serde_json::json!([]), }; @@ -267,7 +267,7 @@ fn run_workflow_helper( let mut bytes: Vec = Vec::new(); serde_json::to_writer(&mut bytes, &state_output).unwrap(); - cache.put(&hash_key.as_bytes(), bytes).unwrap(); + cache.put(hash_key.as_bytes(), bytes).unwrap(); } else { state_manager .update_result(workflow_index, res.result.clone(), true) @@ -276,7 +276,7 @@ fn run_workflow_helper( let state_result = serde_json::json!({ "success" : res }); let mut bytes: Vec = Vec::new(); serde_json::to_writer(&mut bytes, &state_result).unwrap(); - cache.put(&hash_key.as_bytes(), bytes).unwrap(); + cache.put(hash_key.as_bytes(), bytes).unwrap(); } Ok(res) From d0a2c7c93a5e50e05a3a4f830e6823032b69de38 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Wed, 27 Mar 2024 11:40:17 +0530 Subject: [PATCH 02/13] fix: create new type `Event` to use in publish event method and pass missing fields --- .../modules/kuska_ssb_client/client/client.rs | 14 +++--- .../modules/kuska_ssb_client/client/tests.rs | 45 ++++++------------- .../modules/kuska_ssb_client/client/types.rs | 31 ++++++++++--- 3 files changed, 46 insertions(+), 44 deletions(-) diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs index a30a5c8b..4f35a4f5 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs @@ -100,9 +100,7 @@ impl Client { pub async fn new(configs: Option, ip: String, port: String) -> Result { match configs { - Some(config) => { - Self::ssb_handshake(config.pk, config.sk, config.id, ip, port).await - } + Some(config) => Self::ssb_handshake(config.pk, config.sk, config.id, ip, port).await, None => { let OwnedIdentity { pk, sk, id } = from_patchwork_local().await.expect("read local secret"); @@ -189,12 +187,14 @@ impl Client { Ok(()) } - pub async fn publish_event(&mut self, msg: &str, mention: Option>) -> Result<()> { + pub async fn publish_event(&mut self, event: Event) -> Result<()> { let _req_id = self .api .publish_req_send(TypedMessage::Event { - text: msg.to_string(), - mentions: mention, + event: event.event, + section: event.section, + content: event.content, + mentions: event.mentions, }) .await?; @@ -211,7 +211,6 @@ impl Client { req_no: RequestNo, ctx: Arc>, ) -> Result<()> { - let mut is_synced = false; loop { @@ -284,4 +283,3 @@ impl Client { } } } - diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs index ba748a78..979b01bd 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs @@ -1,8 +1,9 @@ #[cfg(test)] mod tests { - use kuska_ssb::keystore::read_patchwork_config; - use crate::modules::kuska_ssb_client::client::{types, Client}; + use crate::modules::kuska_ssb_client::client::Client; use dotenv::dotenv; + use kuska_ssb::keystore::read_patchwork_config; + use serde_json::{json, Value}; // ssb-server should keep running for testing /* configure the env variables such as ssb-sercret file path, ip and port where @@ -93,7 +94,7 @@ mod tests { #[ignore] // returns list of feeds posted by particular user async fn test_user_method() { - use types::Event; + // use types::Event; dotenv().ok(); let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { @@ -109,10 +110,10 @@ mod tests { .await .unwrap(); - let old_event = Event { - id: "1".to_string(), - body: "hello_world_event".to_string(), - }; + let old_event = json!({ + "id": "1".to_string(), + "body": "hello_world_event".to_string(), + }); let value = serde_json::to_value(old_event.clone()).unwrap(); @@ -134,7 +135,7 @@ mod tests { let feed_text = message.get("text").unwrap(); let feed_text: String = serde_json::from_value(feed_text.clone()).unwrap(); - let new_event: Event = serde_json::from_str(&feed_text).unwrap(); + let new_event: Value = serde_json::from_str(&feed_text).unwrap(); // let event = serde_json::from_value(event).unwrap(); assert_eq!(old_event, new_event); } @@ -197,10 +198,10 @@ mod tests { let feed = client.feed(false).await.unwrap(); let prev_len = feed.len(); - let old_event = types::Event { - id: "1".to_string(), - body: "hello_world_event".to_string(), - }; + let old_event = json!({ + "id": "1".to_string(), + "body": "hello_world_event".to_string(), + }); let value = serde_json::to_value(old_event.clone()).unwrap(); @@ -223,7 +224,7 @@ mod tests { let feed_text = message.get("text").unwrap(); let feed_text: String = serde_json::from_value(feed_text.clone()).unwrap(); - let new_event: types::Event = serde_json::from_str(&feed_text).unwrap(); + let new_event: Value = serde_json::from_str(&feed_text).unwrap(); assert_eq!(old_event, new_event); } @@ -333,22 +334,4 @@ mod tests { let res = client.create_invite().await; assert!(res.is_ok()) } - - #[async_std::test] - #[ignore] - async fn test_accept_invite() { - dotenv::dotenv().ok(); - let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) - }); - let port = std::env::var("CONSUMER_PORT").unwrap_or_else(|_| 8015.to_string()); - let mut file = async_std::fs::File::open(secret).await.unwrap(); - let key = read_patchwork_config(&mut file).await.unwrap(); - let mut client = Client::new(Some(key), "0.0.0.0".to_string(), port) - .await - .unwrap(); - let res = client.accept_invite("172.28.0.4:8008:@hkYrVBGtWm5+xeRYDzsL9u6b0cM2rtcYs4NiiZQEVLs=.ed25519~BERengMsq9t2ovXHBZgiFtKDlcvAYQTXSPk/JAw+3zQ=").await; - assert!(res.is_ok()) - } } diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/types.rs b/runtime/lite/src/modules/kuska_ssb_client/client/types.rs index 685d5850..b7762386 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/types.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/types.rs @@ -1,17 +1,38 @@ -use serde::{Deserialize, Serialize}; - use super::*; +use kuska_ssb::api::dto::content::Mention; +use serde::{Deserialize, Serialize}; + pub struct Client { pub api: ApiCaller, pub rpc_reader: RpcReader, pub sk: SecretKey, } -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[derive(Debug, Serialize, Deserialize)] pub struct Event { - pub id: String, - pub body: String, + pub event: String, + pub section: String, + pub content: String, + + #[serde(skip_serializing_if = "Option::is_none")] + pub mentions: Option>, +} + +impl Event { + pub fn new( + event: String, + section: String, + content: String, + mentions: Option>, + ) -> Self { + Self { + event, + section, + content, + mentions, + } + } } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] From 62176476d65c62115d9673712d90fdecad94da4b Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Wed, 27 Mar 2024 15:55:19 +0530 Subject: [PATCH 03/13] chore: add state manager to the context & optimize wasm runtime and state manager --- runtime/lite/src/consumer.rs | 10 +- .../modules/kuska_ssb_client/client/client.rs | 21 +++- runtime/lite/src/modules/logger/logger.rs | 2 - runtime/lite/src/modules/state_manager/mod.rs | 4 +- .../lite/src/modules/state_manager/traits.rs | 2 +- runtime/lite/src/modules/storage/storage.rs | 41 +------ runtime/lite/src/modules/storage/test.rs | 31 ------ runtime/lite/src/modules/storage/traits.rs | 6 +- .../src/modules/wasmtime_wasi_module/mod.rs | 65 ++++------- .../src/modules/wasmtime_wasi_module/tests.rs | 105 ++++++++++++------ 10 files changed, 124 insertions(+), 163 deletions(-) diff --git a/runtime/lite/src/consumer.rs b/runtime/lite/src/consumer.rs index 13c8206f..d1bb4866 100644 --- a/runtime/lite/src/consumer.rs +++ b/runtime/lite/src/consumer.rs @@ -2,8 +2,8 @@ use kuska_ssb::keystore::read_patchwork_config; use runtime::{ common::RequestBody, logger::CoreLogger, - state_manager::GlobalState, modules::kuska_ssb_client::client::Client, + state_manager::GlobalState, storage::{CoreStorage, Storage}, Ctx, Logger, }; @@ -23,11 +23,7 @@ async fn main() { let logger = CoreLogger::new(Some("./workflow")); let state_manager = GlobalState::new(logger.clone()); - let context = Arc::new(Mutex::new(Context::new( - logger, - db, - state_manager - ))); + let context = Arc::new(Mutex::new(Context::new(logger, db, state_manager))); let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { let home_dir = dirs::home_dir().unwrap(); @@ -82,7 +78,7 @@ fn handle_client(mut stream: TcpStream, ctx: Arc>) { logger.info("Data Deseriased"); let db = ctx.get_db(); - db.insert(&body.pub_id.clone(), body).unwrap(); + db.insert_request_body(&body.pub_id.clone(), body).unwrap(); logger.info("Data inserted successfully"); // println!("Received data: {:?}", body); diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs index 4f35a4f5..77a26d48 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs @@ -1,7 +1,10 @@ use std::sync::{Arc, Mutex}; use crate::{ - modules::logger::Logger, modules::storage::Storage, modules::wasmtime_wasi_module, Ctx, + modules::{ + logger::Logger, state_manager::GlobalStateManager, storage::Storage, wasmtime_wasi_module, + }, + Ctx, }; use super::*; @@ -236,20 +239,30 @@ impl Client { let ctx = ctx.lock().unwrap(); let db = ctx.get_db(); let logger = ctx.get_logger(); + let state_manager = ctx.get_state_manager(); - match db.get(&x.mentions.unwrap()[0].link) { + match db.get_request_body( + &x.mentions.unwrap()[0].link, + ) { Ok(body) => { let data = serde_json::json!({ "data" : crate::common::combine_values(&mut event, &body.input), "allowed_hosts": body.allowed_hosts }); + + let workflow_index = ctx + .get_state_manager() + .new_workflow(0, "hello"); + let _ = wasmtime_wasi_module::run_workflow( + state_manager, + logger, serde_json::to_value(data) .unwrap(), body.wasm, - 0, - "hello", + workflow_index, + false, ); } Err(e) => logger.error(&e.to_string()), diff --git a/runtime/lite/src/modules/logger/logger.rs b/runtime/lite/src/modules/logger/logger.rs index 1b2eed59..83736b9d 100644 --- a/runtime/lite/src/modules/logger/logger.rs +++ b/runtime/lite/src/modules/logger/logger.rs @@ -28,14 +28,12 @@ impl CoreLogger { let file = match log_file { Some(file) => OpenOptions::new() .write(true) - .append(true) .create(true) .open(file) .unwrap(), None => OpenOptions::new() .create(true) .write(true) - .append(true) .open("./workflows.log") .unwrap(), }; diff --git a/runtime/lite/src/modules/state_manager/mod.rs b/runtime/lite/src/modules/state_manager/mod.rs index 88b777db..92db5cee 100644 --- a/runtime/lite/src/modules/state_manager/mod.rs +++ b/runtime/lite/src/modules/state_manager/mod.rs @@ -11,6 +11,7 @@ pub use logger::{CoreLogger, Logger}; use super::logger; +#[derive(Debug)] pub struct GlobalState { workflows: Vec, logger: U, @@ -27,10 +28,11 @@ impl GlobalState { impl GlobalStateManager for GlobalState { - fn new_workflow(&mut self, workflow_id: usize, workflow_name: &str) { + fn new_workflow(&mut self, workflow_id: usize, workflow_name: &str) -> usize{ self.workflows .push(WorkflowState::new(workflow_id, workflow_name)); self.logger.info(&format!("[new workflow created with id:{}]", workflow_id)); + self.workflows.len() - 1 } fn get_state_data(&self, workflow_index: usize) -> Result> { diff --git a/runtime/lite/src/modules/state_manager/traits.rs b/runtime/lite/src/modules/state_manager/traits.rs index eaee9490..8e755d7c 100644 --- a/runtime/lite/src/modules/state_manager/traits.rs +++ b/runtime/lite/src/modules/state_manager/traits.rs @@ -12,7 +12,7 @@ pub trait WorkflowStateManager{ } pub trait GlobalStateManager { - fn new_workflow(&mut self, workflow_id: usize, workflow_name: &str); // returns index(used as id also) + fn new_workflow(&mut self, workflow_id: usize, workflow_name: &str) -> usize; // returns index(used as id also) fn get_state_data(&self, workflow_index: usize) -> Result>; fn update_running(&mut self, workflow_index: usize) -> Result<()>; fn update_paused(&mut self, workflow_index: usize, output: Option) -> Result<()>; diff --git a/runtime/lite/src/modules/storage/storage.rs b/runtime/lite/src/modules/storage/storage.rs index 1843e26e..14cbef5c 100644 --- a/runtime/lite/src/modules/storage/storage.rs +++ b/runtime/lite/src/modules/storage/storage.rs @@ -36,7 +36,7 @@ impl From for CustomError { impl From for io::Error { fn from(error: CustomError) -> Self { - match error { + match error { CustomError::RocksDB(e) => io::Error::new(io::ErrorKind::Other, e), CustomError::Io(e) => e, CustomError::Custom(e) => io::Error::new(io::ErrorKind::Other, e), @@ -94,8 +94,8 @@ impl Storage for CoreStorage { /// /// The `set_data` function returns a `Result<(), Error>`. fn set_data(&self, key: &str, value: Vec) -> Result<(), CustomError> { - let serialized_value = value; - self.db.put(key, serialized_value)?; + // let serialized_value = value; + self.db.put(key, value)?; Ok(()) } @@ -137,25 +137,6 @@ impl Storage for CoreStorage { Ok(()) } - /// The function `store_wasm` stores a WebAssembly binary file in a key-value database. - /// - /// Arguments: - /// - /// * `key`: The `key` parameter is a reference to a string that represents the key under which the - /// WebAssembly binary will be stored in the database. - /// * `wasm_path`: The `wasm_path` parameter in the `store_wasm` function represents the file path - /// to the WebAssembly (Wasm) file that you want to store in the database. This function reads the - /// contents of the Wasm file as bytes and stores them in the database with the specified key. - /// - /// Returns: - /// - /// The `store_wasm` function is returning a `Result<(), Error>`. - fn store_wasm(&self, key: &str, wasm: &[u8]) -> Result<(), CustomError> { - self.db.put(key, wasm)?; - - Ok(()) - } - /// The function `get_wasm` retrieves a WebAssembly module from a database using a given key. /// /// Arguments: @@ -168,24 +149,14 @@ impl Storage for CoreStorage { /// The `get_wasm` function returns a `Result` containing either a vector of `u8` bytes or an /// `Error`. - fn get_wasm(&self, key: &str) -> Result, CustomError> { - match self.db.get(key) { - Ok(Some(retrieved_wasm_bytes)) => Ok(retrieved_wasm_bytes), - Ok(None) => Err(CustomError::Custom(format!( - "WASM module not found with key: {:?}", - key - ))), - Err(err) => Err(CustomError::RocksDB(err)), - } - } - - fn insert(&self, key: &str, value: crate::common::RequestBody) -> Result<(), CustomError> { + fn insert_request_body(&self, key: &str, value: crate::common::RequestBody) -> Result<(), CustomError> { let bytes = serde_json::to_vec(&value).map_err(|e| CustomError::Custom(e.to_string()))?; self.db .put(key, bytes) .map_err(|e| CustomError::Custom(e.to_string())) } - fn get(&self, key: &str) -> Result { + + fn get_request_body(&self, key: &str) -> Result { let res = self .db .get(key) diff --git a/runtime/lite/src/modules/storage/test.rs b/runtime/lite/src/modules/storage/test.rs index b80232d6..d872c189 100644 --- a/runtime/lite/src/modules/storage/test.rs +++ b/runtime/lite/src/modules/storage/test.rs @@ -94,35 +94,4 @@ mod tests { fs::remove_dir_all(std::path::Path::new("test5")).unwrap(); result.unwrap(); } - - /// The test function stores a WebAssembly file in a database and then retrieves it to compare with - /// the original file. - #[test] - fn test_store_and_get_wasm() { - let core_storage = CoreStorage::new("test8").unwrap(); - let wasm_bytes = vec![0x00, 0x61, 0x01]; - - let key = "boilerplate"; - core_storage.store_wasm(key, &wasm_bytes).unwrap(); - - let retrieved_wasm = core_storage.get_wasm(key).unwrap(); - fs::remove_dir_all(std::path::Path::new("test8")).unwrap(); - assert_eq!(retrieved_wasm, wasm_bytes) - } - - /// The test function is checking if an error is raised when trying to retrieve a WebAssembly module - /// with a different key than the one it was stored with. - #[test] - #[should_panic] - fn test_get_wasm_with_different_key() { - let core_storage = CoreStorage::new("test9").unwrap(); - let wasm_bytes = vec![0x00, 0x61, 0x01]; - - let key = "boilerplate"; - core_storage.store_wasm(key, &wasm_bytes).unwrap(); - - let result = core_storage.get_wasm("hello"); - fs::remove_dir_all(std::path::Path::new("test9")).unwrap(); - result.unwrap(); - } } diff --git a/runtime/lite/src/modules/storage/traits.rs b/runtime/lite/src/modules/storage/traits.rs index 15176bc7..49409641 100644 --- a/runtime/lite/src/modules/storage/traits.rs +++ b/runtime/lite/src/modules/storage/traits.rs @@ -5,8 +5,6 @@ pub trait Storage { fn set_data(&self, key: &str, value: Vec) -> Result<(), CustomError>; fn modify_data(&self, key: &str, value: Vec) -> Result<(), CustomError>; fn delete_data(&self, key: &str) -> Result<(), CustomError>; - fn store_wasm(&self, key: &str, wasm : &[u8]) -> Result<(), CustomError>; - fn get_wasm(&self, key: &str) -> Result, CustomError>; - fn insert(&self, key: &str, value: RequestBody) -> Result<(), CustomError>; - fn get(&self, key: &str) -> Result; + fn insert_request_body(&self, key: &str, value: RequestBody) -> Result<(), CustomError>; + fn get_request_body(&self, key: &str) -> Result; } diff --git a/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs b/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs index e6a283a8..f1a5e455 100644 --- a/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs +++ b/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; +use std::sync::MutexGuard; pub mod help; pub use help::*; mod tests; @@ -13,7 +14,7 @@ use std::sync::{Arc, Mutex}; mod types; pub use types::*; -use logger::{CoreLogger, Logger}; +use logger::Logger; use rocksdb::DB; use wasi_common::WasiCtx; use wasi_experimental_http_wasmtime::{HttpCtx, HttpState}; @@ -23,23 +24,24 @@ use wasmtime_wasi::sync::WasiCtxBuilder; use super::logger; -#[allow(dead_code)] -fn run_workflow_helper( +pub fn run_workflow( + mut state_manager: MutexGuard>, + logger: MutexGuard, data: Value, wasm_file: Vec, - hash_key: String, - state_manager: &mut GlobalState, workflow_index: usize, - restart: bool, // ignores the cache - logger: U, + ignore_cache: bool, // ignores the cache ) -> Result { - let id = state_manager + let workflow_id = state_manager .get_state_data(workflow_index) .unwrap() .get_id(); - let cache = DB::open_default(format!("./.cache/{:?}", id)).unwrap(); - let prev_internal_state_data = if !restart { + let hash_key = digest(format!("{:?}{:?}", data, workflow_id)); + + let cache = DB::open_default(format!("./.cache/{:?}", workflow_id)).unwrap(); + + let prev_internal_state_data = if !ignore_cache { let prev_internal_state_data: Value = match cache.get(hash_key.as_bytes()).unwrap() { Some(data) => serde_json::from_slice(&data).unwrap(), None => serde_json::json!([]), @@ -48,7 +50,7 @@ fn run_workflow_helper( // returns the main output without passing the state data to the workflow if let Some(output) = prev_internal_state_data.get("success") { state_manager.update_running(workflow_index).unwrap(); - logger.warn(&format!("[workflow:{id} cached result used]")); + logger.warn(&format!("[workflow:{workflow_id} cached result used]")); state_manager .update_result(workflow_index, output.clone(), true) .unwrap(); @@ -120,7 +122,7 @@ fn run_workflow_helper( let output_2: Arc>> = Arc::new(Mutex::new(Vec::new())); let output_ = output_2.clone(); - let logger_cln = Arc::new(Mutex::new(logger)); + let logger_cln = Arc::new(Mutex::new(logger.clone())); linker .func_wrap( @@ -142,21 +144,25 @@ fn run_workflow_helper( ExecutionState::Init => { logger_cln.lock().unwrap().info(&format!( "[workflow:{:?} task[{}...] ]", - id, task_state_data.action_name + workflow_id, task_state_data.action_name )); } ExecutionState::Running => { logger_cln.lock().unwrap().info(&format!( "[workflow:{:?} task[{}:{}] running]", - id, task_state_data.task_index, task_state_data.action_name + workflow_id, + task_state_data.task_index, + task_state_data.action_name )); } ExecutionState::Paused => { logger_cln.lock().unwrap().warn(&format!( "[workflow:{:?} task[{}:{}] paused]", - id, task_state_data.task_index, task_state_data.action_name + workflow_id, + task_state_data.task_index, + task_state_data.action_name )); } @@ -167,14 +173,14 @@ fn run_workflow_helper( -1 => { logger_cln.lock().unwrap().info(&format!( "[workflow:{:?} task[{}] success]", - id, task_state_data.action_name + workflow_id, task_state_data.action_name )); } _ => { logger_cln.lock().unwrap().info(&format!( "[workflow:{:?} task[{}:{}] success]", - id, + workflow_id, task_state_data.task_index, task_state_data.action_name )); @@ -188,7 +194,7 @@ fn run_workflow_helper( ExecutionState::Failed => { logger_cln.lock().unwrap().error(&format!( "[workflow:{:?} task[{}:{}] failed[{}]]", - id, + workflow_id, task_state_data.task_index, task_state_data.action_name, task_state_data.error.unwrap() @@ -281,26 +287,3 @@ fn run_workflow_helper( Ok(res) } - -pub fn run_workflow( - data: Value, - wasm_file: Vec, - workflow_id: usize, - workflow_name: &str, -) -> Result { - let logger = CoreLogger::new(Some("./workflow.log")); - let mut state_manager = GlobalState::new(logger.clone()); - - state_manager.new_workflow(workflow_id, workflow_name); - - let digest = digest(format!("{:?}{:?}", data, workflow_name)); - run_workflow_helper( - data, - wasm_file, - digest, - &mut state_manager, - 0, - false, - logger, - ) -} diff --git a/runtime/lite/src/modules/wasmtime_wasi_module/tests.rs b/runtime/lite/src/modules/wasmtime_wasi_module/tests.rs index dfef33d9..0c92b450 100644 --- a/runtime/lite/src/modules/wasmtime_wasi_module/tests.rs +++ b/runtime/lite/src/modules/wasmtime_wasi_module/tests.rs @@ -1,43 +1,74 @@ -#[async_std::test] -async fn test_hello_world() { - let path = std::env::var("WORKFLOW_WASM") - .unwrap_or("../../workflow/examples/hello_world.wasm".to_string()); +#[cfg(test)] +mod tests { + use crate::context::Ctx; + use crate::logger::CoreLogger; + use crate::state_manager::{GlobalState, GlobalStateManager}; + use crate::storage::CoreStorage; + use crate::wasmtime_wasi_module::run_workflow; - let wasm = std::fs::read(&path).unwrap(); + #[async_std::test] + async fn test_hello_world() { + let logger = CoreLogger::new(Some("./workflow-1.log")); + let ctx = crate::context::Context::new( + logger.clone(), + CoreStorage::new("workflow_db_1").unwrap(), + GlobalState::new(logger), + ); - let input = serde_json::json!({ - "allowed_hosts": [], - "data": { - "hello" : "world" - } - }); + let path = std::env::var("WORKFLOW_WASM") + .unwrap_or("../../workflow/examples/hello_world.wasm".to_string()); - let result = super::run_workflow(input, wasm, 1,"hello_world").unwrap(); + let wasm = std::fs::read(&path).unwrap(); - assert!(result.result.to_string().contains("Hello")); -} + let input = serde_json::json!({ + "allowed_hosts": [], + "data": { + "hello" : "world" + } + }); + + let logger = ctx.get_logger(); + let mut state_manager = ctx.get_state_manager(); + + let index = state_manager.new_workflow(1, "hello_world"); + + let result = run_workflow(state_manager, logger, input.clone(), wasm, index, true).unwrap(); + + assert!(result.result.to_string().contains("Hello")); + } + + #[async_std::test] + async fn test_employee_salary() { + let logger = CoreLogger::new(Some("./workflow-2.log")); + let ctx = crate::context::Context::new( + logger.clone(), + CoreStorage::new("workflow_db_2").unwrap(), + GlobalState::new(logger), + ); + + let path = std::env::var("WORKFLOW_WASM") + .unwrap_or("../../workflow/examples/employee_salary_state_managed.wasm".to_string()); + let wasm = std::fs::read(&path).unwrap(); + + let server = test_util::post("127.0.0.1:1234").await; + let input = serde_json::json!({ + "allowed_hosts": [ + server.uri() + ], + "data": { + "role":"Software Developer", + } + }); + + let logger = ctx.get_logger(); + let mut state_manager = ctx.get_state_manager(); + + let index = state_manager.new_workflow(2, "employee_salary"); -#[async_std::test] -async fn test_employee_salary() { - - let path = std::env::var("WORKFLOW_WASM").unwrap_or( - "../../workflow/examples/employee_salary_state_managed.wasm".to_string(), - ); - let wasm = std::fs::read(&path).unwrap(); - - let server = test_util::post("127.0.0.1:1234").await; - let input = serde_json::json!({ - "allowed_hosts": [ - server.uri() - ], - "data": { - "role":"Software Developer", - } - }); - - let result = super::run_workflow(input.clone(), wasm, 2, "employee_salary").unwrap(); - assert!(result - .result - .to_string() - .contains("Salary creditted for emp id 1 from Hugobyte")); + let result = run_workflow(state_manager, logger, input.clone(), wasm, index, true).unwrap(); + assert!(result + .result + .to_string() + .contains("Salary creditted for emp id 1 from Hugobyte")); + } } From 23fb88c694c80d476d03e2a705992cb7b5fae044 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Wed, 27 Mar 2024 16:47:09 +0530 Subject: [PATCH 04/13] refactor: fix spelling mistakes, format Cargo.toml and remove unwanted files --- runtime/lite/Cargo.toml | 110 ++++++----- runtime/lite/src/consumer.rs | 2 +- runtime/lite/src/modules/common/mod.rs | 4 +- .../modules/kuska_ssb_client/client/client.rs | 4 +- .../modules/kuska_ssb_client/client/tests.rs | 2 +- runtime/lite/src/modules/logger/tests.rs | 4 +- runtime/lite/src/modules/state_manager/mod.rs | 186 +++--------------- .../lite/src/modules/state_manager/tests.rs | 132 +++++++++++++ .../src/modules/wasmtime_wasi_module/help.rs | 11 -- .../src/modules/wasmtime_wasi_module/mod.rs | 11 +- 10 files changed, 235 insertions(+), 231 deletions(-) create mode 100644 runtime/lite/src/modules/state_manager/tests.rs delete mode 100644 runtime/lite/src/modules/wasmtime_wasi_module/help.rs diff --git a/runtime/lite/Cargo.toml b/runtime/lite/Cargo.toml index c1fa7dba..b20ae044 100644 --- a/runtime/lite/Cargo.toml +++ b/runtime/lite/Cargo.toml @@ -1,64 +1,74 @@ -[package] -name = "runtime" -authors = ["The HugoByte Team "] -edition = "2021" -repository = "https://github.com/hugobyte/aurras.git" -version = "0.0.1" - - [[bin]] -name = "consumer" -path = "src/consumer.rs" +name = 'consumer' +path = 'src/consumer.rs' + +[package] +name = 'runtime' +authors = ['The HugoByte Team '] +edition = '2021' +repository = 'https://github.com/hugobyte/aurras.git' +version = '0.0.1' [lib] -path = "src/lib.rs" +path = 'src/lib.rs' [dependencies] -kuska-ssb = { git = "https://github.com/shanithkk/ssb.git", branch = "fix-invite-accept-issue" } -kuska-handshake = { git = "https://github.com/Kuska-ssb/handshake.git", features = [ - "async_std", -] } -# kuska-handshake = "0.2.0" -kuska-sodiumoxide = "0.2.5-0" -base64 = "0.11.0" -# hex = "0.4.0" -async-std = { version = "1.12.0", features = ["unstable", "attributes"] } -log = "0.4.8" -serde = { version = "1.0.104", features = ["derive"] } -serde_json = { version = "1.0.48", features = [ - "preserve_order", - "arbitrary_precision", -] } -dirs = "2.0" - -# rand = "0.7.3" -# wasmtime-wasi-module = { path = "/Users/shanithkk/Hugobyte/learn/rust/search/wasmtime-wasi-module" } +kuska-sodiumoxide = '0.2.5-0' +base64 = '0.11.0' +log = '0.4.8' +dirs = '2.0' +subxt-signer = '0.34.0' +slog = '2.7.0' +slog-async = '2.8.0' +slog-term = '2.9.1' +wasmtime = '0.35.0' +anyhow = '1.0.80' +wasmtime-wasi = '0.35.0' +wasi-common = '0.35.0' +serde_derive = '1.0.81' +http = '0.2' +bytes = '1' +wasi-experimental-http-wasmtime = '0.10.0' +sha256 = '1.5.0' +rocksdb = '0.18.0' +dotenv = '0.15.0' -tokio = { version = "1.36.0", features = ["macros", "time", "rt-multi-thread"] } -subxt-signer = "0.34.0" +[dependencies.kuska-ssb] +git = 'https://github.com/shanithkk/ssb.git' +branch = 'fix-invite-accept-issue' -slog = "2.7.0" -slog-async = "2.8.0" -slog-term = "2.9.1" +[dependencies.kuska-handshake] +git = 'https://github.com/Kuska-ssb/handshake.git' +features = ['async_std'] -wasmtime = "0.35.0" -anyhow = "1.0.80" -# wiremock = "0.5.17" -wasmtime-wasi = "0.35.0" -wasi-common = "0.35.0" -serde_derive = "1.0.81" -http = "0.2" -bytes = "1" -wasi-experimental-http-wasmtime = "0.10.0" -sha256 = "1.5.0" -test_util = { path = "../../workflow/test_util" } +[dependencies.async-std] +version = '1.12.0' +features = [ + 'unstable', + 'attributes', +] -rocksdb = "0.18.0" +[dependencies.serde] +version = '1.0.104' +features = ['derive'] +[dependencies.serde_json] +version = '1.0.48' +features = [ + 'preserve_order', + 'arbitrary_precision', +] -# tempfile = "3.10.1" -dotenv = "0.15.0" +[dependencies.tokio] +version = '1.36.0' +features = [ + 'macros', + 'time', + 'rt-multi-thread', +] +[dependencies.test_util] +path = '../../workflow/test_util' [dev-dependencies] -subxt = "0.34.0" +subxt = '0.34.0' diff --git a/runtime/lite/src/consumer.rs b/runtime/lite/src/consumer.rs index d1bb4866..06323d69 100644 --- a/runtime/lite/src/consumer.rs +++ b/runtime/lite/src/consumer.rs @@ -75,7 +75,7 @@ fn handle_client(mut stream: TcpStream, ctx: Arc>) { let ctx = ctx.lock().unwrap(); let logger = ctx.get_logger().clone(); - logger.info("Data Deseriased"); + logger.info("Data Deserialized"); let db = ctx.get_db(); db.insert_request_body(&body.pub_id.clone(), body).unwrap(); diff --git a/runtime/lite/src/modules/common/mod.rs b/runtime/lite/src/modules/common/mod.rs index cc4fd5c5..5a3de043 100644 --- a/runtime/lite/src/modules/common/mod.rs +++ b/runtime/lite/src/modules/common/mod.rs @@ -6,7 +6,7 @@ pub struct RequestBody { pub wasm: Vec, pub invite: String, pub pub_id: String, - pub allowed_hosts : Option>, + pub allowed_hosts: Option>, pub input: Value, } @@ -24,4 +24,4 @@ pub fn combine_values(dest: &mut serde_json::Value, src: &serde_json::Value) { } (_, _) => panic!("update_with only works with two serde_json::Value::Object s"), } -} \ No newline at end of file +} diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs index 77a26d48..fafd0fda 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs @@ -83,9 +83,9 @@ impl Client { let server_pk = ed25519::PublicKey::from_slice(&base64::decode(&server_pk)?).expect("bad public key"); - let server_ipport = format!("{}:{}", ip, port); + let server_ip_port = format!("{}:{}", ip, port); - let mut socket = TcpStream::connect(server_ipport).await?; + let mut socket = TcpStream::connect(server_ip_port).await?; let handshake = handshake_client(&mut socket, ssb_net_id(), pk, sk.clone(), server_pk).await?; diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs index 979b01bd..0a0ba167 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs @@ -6,7 +6,7 @@ mod tests { use serde_json::{json, Value}; // ssb-server should keep running for testing - /* configure the env variables such as ssb-sercret file path, ip and port where + /* configure the env variables such as ssb-secret file path, ip and port where ssb-server is running in .env file */ // use `cargo test -- --ignored` command for testing diff --git a/runtime/lite/src/modules/logger/tests.rs b/runtime/lite/src/modules/logger/tests.rs index 0a21a4c5..31c2ff61 100644 --- a/runtime/lite/src/modules/logger/tests.rs +++ b/runtime/lite/src/modules/logger/tests.rs @@ -27,7 +27,7 @@ fn test_writing_to_log_file() { } #[test] -fn test_logger_in_multi_threads(){ +fn test_logger_in_multi_threads() { let logger = CoreLogger::new(Some("test3.log")); let mut handles = vec![]; @@ -44,4 +44,4 @@ fn test_logger_in_multi_threads(){ } fs::remove_file("test3.log").unwrap(); -} \ No newline at end of file +} diff --git a/runtime/lite/src/modules/state_manager/mod.rs b/runtime/lite/src/modules/state_manager/mod.rs index 92db5cee..e72112d1 100644 --- a/runtime/lite/src/modules/state_manager/mod.rs +++ b/runtime/lite/src/modules/state_manager/mod.rs @@ -5,11 +5,13 @@ use serde_json::Value; pub mod traits; pub mod types; +pub use logger::{CoreLogger, Logger}; pub use traits::*; pub use types::*; -pub use logger::{CoreLogger, Logger}; use super::logger; +#[cfg(test)] +mod tests; #[derive(Debug)] pub struct GlobalState { @@ -26,12 +28,12 @@ impl GlobalState { } } -impl GlobalStateManager for GlobalState -{ - fn new_workflow(&mut self, workflow_id: usize, workflow_name: &str) -> usize{ +impl GlobalStateManager for GlobalState { + fn new_workflow(&mut self, workflow_id: usize, workflow_name: &str) -> usize { self.workflows .push(WorkflowState::new(workflow_id, workflow_name)); - self.logger.info(&format!("[new workflow created with id:{}]", workflow_id)); + self.logger + .info(&format!("[new workflow created with id:{}]", workflow_id)); self.workflows.len() - 1 } @@ -47,9 +49,15 @@ impl GlobalStateManager for GlobalState if self.workflows.len() <= workflow_index { Err(anyhow!("index out of bound")) } else { - self.logger.info(&format!("[workflow:{} starting...]", self.workflows[workflow_index].get_id())); + self.logger.info(&format!( + "[workflow:{} starting...]", + self.workflows[workflow_index].get_id() + )); self.workflows[workflow_index].update_running()?; - self.logger.info(&format!("[workflow:{} running]", self.workflows[workflow_index].get_id())); + self.logger.info(&format!( + "[workflow:{} running]", + self.workflows[workflow_index].get_id() + )); Ok(()) } } @@ -59,7 +67,10 @@ impl GlobalStateManager for GlobalState Err(anyhow!("index out of bound")) } else { self.workflows[workflow_index].update_paused(output)?; - self.logger.warn(&format!("[workflow:{} paused]", self.workflows[workflow_index].get_id())); + self.logger.warn(&format!( + "[workflow:{} paused]", + self.workflows[workflow_index].get_id() + )); Ok(()) } } @@ -75,157 +86,22 @@ impl GlobalStateManager for GlobalState } else { self.workflows[workflow_index].update_result(result.clone(), is_success)?; - if is_success{ - self.logger.info(&format!("[workflow:{} execution success✅]", self.workflows[workflow_index].get_id())); - }else{ + if is_success { + self.logger.info(&format!( + "[workflow:{} execution success✅]", + self.workflows[workflow_index].get_id() + )); + } else { let id = self.workflows[workflow_index].get_id(); - self.logger.error(&format!("[workflow:{} execution failed❌]", id)); - let result: String = serde_json::from_value(result.get("Err").unwrap().clone()).unwrap(); - self.logger.error(&format!("[workflow:{} result: {}]", id, result)); + self.logger + .error(&format!("[workflow:{} execution failed❌]", id)); + let result: String = + serde_json::from_value(result.get("Err").unwrap().clone()).unwrap(); + self.logger + .error(&format!("[workflow:{} result: {}]", id, result)); } Ok(()) } } - - } - -#[cfg(test)] -mod tests { - use super::*; - #[test] - fn test_new_workflow() { - let logger = CoreLogger::new(Some("./test_log_1.log")); - let mut global_state= GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - let new_workflow_state = WorkflowState::new(0, "test_workflow"); - assert_eq!(global_state.workflows[0], new_workflow_state); - std::fs::remove_file("./test_log_1.log").unwrap() - } - - #[test] - fn test_get_state_data_pass() { - let logger = CoreLogger::new(Some("./test_log_2.log")); - let mut global_state = GlobalState::new(logger); - - global_state.new_workflow(0, "test_workflow"); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_id(), 0); - assert_eq!(state_data.get_workflow_name(), "test_workflow"); - assert_eq!(state_data.get_execution_state(), ExecutionState::Init); - assert!(state_data.get_result().is_err()); - - - global_state.update_running(0).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_id(), 0); - assert_eq!(state_data.get_workflow_name(), "test_workflow"); - assert_eq!(state_data.get_execution_state(), ExecutionState::Running); - assert!(state_data.get_result().is_err()); - - // without result - global_state.update_paused(0, None).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_id(), 0); - assert_eq!(state_data.get_workflow_name(), "test_workflow"); - assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); - assert!(state_data.get_result().is_err()); - - - global_state.update_running(0).unwrap(); - // with result - let data = Value::String("some result".to_string()); - global_state.update_paused(0, Some(data.clone())).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_id(), 0); - assert_eq!(state_data.get_workflow_name(), "test_workflow"); - assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); - assert_eq!(state_data.get_result().unwrap(), data); - - global_state.update_running(0).unwrap(); - global_state.update_result(0, data.clone(), true).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_id(), 0); - assert_eq!(state_data.get_workflow_name(), "test_workflow"); - assert_eq!(state_data.get_execution_state(), ExecutionState::Success); - assert_eq!(state_data.get_result().unwrap(), data); - std::fs::remove_file("./test_log_2.log").unwrap() - } - - #[test] - #[should_panic="index out of bound"] - fn test_get_state_data_fail(){ - let logger = CoreLogger::new(Some("./test_log_3.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - std::fs::remove_file("./test_log_3.log").unwrap(); - global_state.get_state_data(1).unwrap(); - } - - #[test] - fn test_update_running_pass(){ - let logger = CoreLogger::new(Some("./test_log_4.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - global_state.update_running(0).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_execution_state(), ExecutionState::Running); - std::fs::remove_file("./test_log_4.log").unwrap() - } - - #[test] - #[should_panic="index out of bound"] - fn test_update_running_fail(){ - let logger = CoreLogger::new(Some("./test_log_5.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - std::fs::remove_file("./test_log_5.log").unwrap(); - global_state.update_running(1).unwrap(); - } - - #[test] - fn test_update_paused_pass(){ - let logger = CoreLogger::new(Some("./test_log_6.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - global_state.update_paused(0, None).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); - std::fs::remove_file("./test_log_6.log").unwrap(); - } - - #[test] - #[should_panic="index out of bound"] - fn test_update_paused_fail(){ - let logger = CoreLogger::new(Some("./test_log_7.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - std::fs::remove_file("./test_log_7.log").unwrap(); - global_state.update_paused(1, None).unwrap(); - } - - #[test] - fn test_update_result_pass(){ - let logger = CoreLogger::new(Some("./test_log_8.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - global_state.update_running(0).unwrap(); - let data = Value::String("some result".to_string()); - global_state.update_result(0, data.clone(), true).unwrap(); - let state_data = global_state.get_state_data(0).unwrap(); - std::fs::remove_file("./test_log_8.log").unwrap(); - assert_eq!(state_data.get_result().unwrap(), data); - } - - #[test] - #[should_panic="index out of bound"] - fn test_update_result_fail(){ - let logger = CoreLogger::new(Some("./test_log_9.log")); - let mut global_state = GlobalState::new(logger); - global_state.new_workflow(0, "test_workflow"); - std::fs::remove_file("./test_log_9.log").unwrap(); - global_state.update_result(1, Value::Null, true).unwrap(); - } - -} \ No newline at end of file diff --git a/runtime/lite/src/modules/state_manager/tests.rs b/runtime/lite/src/modules/state_manager/tests.rs new file mode 100644 index 00000000..8aabc198 --- /dev/null +++ b/runtime/lite/src/modules/state_manager/tests.rs @@ -0,0 +1,132 @@ +use super::*; +#[test] +fn test_new_workflow() { + let logger = CoreLogger::new(Some("./test_log_1.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + let new_workflow_state = WorkflowState::new(0, "test_workflow"); + assert_eq!(global_state.workflows[0], new_workflow_state); + std::fs::remove_file("./test_log_1.log").unwrap() +} + +#[test] +fn test_get_state_data_pass() { + let logger = CoreLogger::new(Some("./test_log_2.log")); + let mut global_state = GlobalState::new(logger); + + global_state.new_workflow(0, "test_workflow"); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_id(), 0); + assert_eq!(state_data.get_workflow_name(), "test_workflow"); + assert_eq!(state_data.get_execution_state(), ExecutionState::Init); + assert!(state_data.get_result().is_err()); + + global_state.update_running(0).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_id(), 0); + assert_eq!(state_data.get_workflow_name(), "test_workflow"); + assert_eq!(state_data.get_execution_state(), ExecutionState::Running); + assert!(state_data.get_result().is_err()); + + // without result + global_state.update_paused(0, None).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_id(), 0); + assert_eq!(state_data.get_workflow_name(), "test_workflow"); + assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); + assert!(state_data.get_result().is_err()); + + global_state.update_running(0).unwrap(); + // with result + let data = Value::String("some result".to_string()); + global_state.update_paused(0, Some(data.clone())).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_id(), 0); + assert_eq!(state_data.get_workflow_name(), "test_workflow"); + assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); + assert_eq!(state_data.get_result().unwrap(), data); + + global_state.update_running(0).unwrap(); + global_state.update_result(0, data.clone(), true).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_id(), 0); + assert_eq!(state_data.get_workflow_name(), "test_workflow"); + assert_eq!(state_data.get_execution_state(), ExecutionState::Success); + assert_eq!(state_data.get_result().unwrap(), data); + std::fs::remove_file("./test_log_2.log").unwrap() +} + +#[test] +#[should_panic = "index out of bound"] +fn test_get_state_data_fail() { + let logger = CoreLogger::new(Some("./test_log_3.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + std::fs::remove_file("./test_log_3.log").unwrap(); + global_state.get_state_data(1).unwrap(); +} + +#[test] +fn test_update_running_pass() { + let logger = CoreLogger::new(Some("./test_log_4.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + global_state.update_running(0).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_execution_state(), ExecutionState::Running); + std::fs::remove_file("./test_log_4.log").unwrap() +} + +#[test] +#[should_panic = "index out of bound"] +fn test_update_running_fail() { + let logger = CoreLogger::new(Some("./test_log_5.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + std::fs::remove_file("./test_log_5.log").unwrap(); + global_state.update_running(1).unwrap(); +} + +#[test] +fn test_update_paused_pass() { + let logger = CoreLogger::new(Some("./test_log_6.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + global_state.update_paused(0, None).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + assert_eq!(state_data.get_execution_state(), ExecutionState::Paused); + std::fs::remove_file("./test_log_6.log").unwrap(); +} + +#[test] +#[should_panic = "index out of bound"] +fn test_update_paused_fail() { + let logger = CoreLogger::new(Some("./test_log_7.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + std::fs::remove_file("./test_log_7.log").unwrap(); + global_state.update_paused(1, None).unwrap(); +} + +#[test] +fn test_update_result_pass() { + let logger = CoreLogger::new(Some("./test_log_8.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + global_state.update_running(0).unwrap(); + let data = Value::String("some result".to_string()); + global_state.update_result(0, data.clone(), true).unwrap(); + let state_data = global_state.get_state_data(0).unwrap(); + std::fs::remove_file("./test_log_8.log").unwrap(); + assert_eq!(state_data.get_result().unwrap(), data); +} + +#[test] +#[should_panic = "index out of bound"] +fn test_update_result_fail() { + let logger = CoreLogger::new(Some("./test_log_9.log")); + let mut global_state = GlobalState::new(logger); + global_state.new_workflow(0, "test_workflow"); + std::fs::remove_file("./test_log_9.log").unwrap(); + global_state.update_result(1, Value::Null, true).unwrap(); +} diff --git a/runtime/lite/src/modules/wasmtime_wasi_module/help.rs b/runtime/lite/src/modules/wasmtime_wasi_module/help.rs deleted file mode 100644 index 9fdbc87e..00000000 --- a/runtime/lite/src/modules/wasmtime_wasi_module/help.rs +++ /dev/null @@ -1,11 +0,0 @@ -// use wiremock::MockServer; - -// async fn create_server(add: &str) -> MockServer { -// let listener = std::net::TcpListener::bind(add).unwrap(); -// MockServer::builder().listener(listener).start().await -// } - -// pub async fn post(address: &str) -> MockServer { -// let server = create_server(address).await; -// server -// } diff --git a/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs b/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs index f1a5e455..ea2286cd 100644 --- a/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs +++ b/runtime/lite/src/modules/wasmtime_wasi_module/mod.rs @@ -1,16 +1,13 @@ -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::sync::MutexGuard; -pub mod help; -pub use help::*; -mod tests; - use crate::modules::state_manager::{ ExecutionState, GlobalState, GlobalStateManager, WorkflowState, }; +use serde::{Deserialize, Serialize}; +use serde_json::Value; use sha256::digest; +use std::sync::MutexGuard; use std::sync::{Arc, Mutex}; +mod tests; mod types; pub use types::*; From d2d57f6d38fdc70b0a20f1a90084a7a8ec1111c9 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Thu, 28 Mar 2024 11:44:53 +0530 Subject: [PATCH 05/13] chore: remove all print statements and log instead --- runtime/lite/Cargo.toml | 110 ++++++++++-------- runtime/lite/examples/basic-producer.rs | 21 ++-- runtime/lite/src/consumer.rs | 20 ++-- .../modules/kuska_ssb_client/client/client.rs | 15 +-- .../modules/kuska_ssb_client/client/tests.rs | 6 - runtime/lite/src/modules/storage/test.rs | 2 - 6 files changed, 95 insertions(+), 79 deletions(-) diff --git a/runtime/lite/Cargo.toml b/runtime/lite/Cargo.toml index c1fa7dba..b20ae044 100644 --- a/runtime/lite/Cargo.toml +++ b/runtime/lite/Cargo.toml @@ -1,64 +1,74 @@ -[package] -name = "runtime" -authors = ["The HugoByte Team "] -edition = "2021" -repository = "https://github.com/hugobyte/aurras.git" -version = "0.0.1" - - [[bin]] -name = "consumer" -path = "src/consumer.rs" +name = 'consumer' +path = 'src/consumer.rs' + +[package] +name = 'runtime' +authors = ['The HugoByte Team '] +edition = '2021' +repository = 'https://github.com/hugobyte/aurras.git' +version = '0.0.1' [lib] -path = "src/lib.rs" +path = 'src/lib.rs' [dependencies] -kuska-ssb = { git = "https://github.com/shanithkk/ssb.git", branch = "fix-invite-accept-issue" } -kuska-handshake = { git = "https://github.com/Kuska-ssb/handshake.git", features = [ - "async_std", -] } -# kuska-handshake = "0.2.0" -kuska-sodiumoxide = "0.2.5-0" -base64 = "0.11.0" -# hex = "0.4.0" -async-std = { version = "1.12.0", features = ["unstable", "attributes"] } -log = "0.4.8" -serde = { version = "1.0.104", features = ["derive"] } -serde_json = { version = "1.0.48", features = [ - "preserve_order", - "arbitrary_precision", -] } -dirs = "2.0" - -# rand = "0.7.3" -# wasmtime-wasi-module = { path = "/Users/shanithkk/Hugobyte/learn/rust/search/wasmtime-wasi-module" } +kuska-sodiumoxide = '0.2.5-0' +base64 = '0.11.0' +log = '0.4.8' +dirs = '2.0' +subxt-signer = '0.34.0' +slog = '2.7.0' +slog-async = '2.8.0' +slog-term = '2.9.1' +wasmtime = '0.35.0' +anyhow = '1.0.80' +wasmtime-wasi = '0.35.0' +wasi-common = '0.35.0' +serde_derive = '1.0.81' +http = '0.2' +bytes = '1' +wasi-experimental-http-wasmtime = '0.10.0' +sha256 = '1.5.0' +rocksdb = '0.18.0' +dotenv = '0.15.0' -tokio = { version = "1.36.0", features = ["macros", "time", "rt-multi-thread"] } -subxt-signer = "0.34.0" +[dependencies.kuska-ssb] +git = 'https://github.com/shanithkk/ssb.git' +branch = 'fix-invite-accept-issue' -slog = "2.7.0" -slog-async = "2.8.0" -slog-term = "2.9.1" +[dependencies.kuska-handshake] +git = 'https://github.com/Kuska-ssb/handshake.git' +features = ['async_std'] -wasmtime = "0.35.0" -anyhow = "1.0.80" -# wiremock = "0.5.17" -wasmtime-wasi = "0.35.0" -wasi-common = "0.35.0" -serde_derive = "1.0.81" -http = "0.2" -bytes = "1" -wasi-experimental-http-wasmtime = "0.10.0" -sha256 = "1.5.0" -test_util = { path = "../../workflow/test_util" } +[dependencies.async-std] +version = '1.12.0' +features = [ + 'unstable', + 'attributes', +] -rocksdb = "0.18.0" +[dependencies.serde] +version = '1.0.104' +features = ['derive'] +[dependencies.serde_json] +version = '1.0.48' +features = [ + 'preserve_order', + 'arbitrary_precision', +] -# tempfile = "3.10.1" -dotenv = "0.15.0" +[dependencies.tokio] +version = '1.36.0' +features = [ + 'macros', + 'time', + 'rt-multi-thread', +] +[dependencies.test_util] +path = '../../workflow/test_util' [dev-dependencies] -subxt = "0.34.0" +subxt = '0.34.0' diff --git a/runtime/lite/examples/basic-producer.rs b/runtime/lite/examples/basic-producer.rs index a3ac5b37..b115ccd4 100644 --- a/runtime/lite/examples/basic-producer.rs +++ b/runtime/lite/examples/basic-producer.rs @@ -1,11 +1,15 @@ use dotenv::dotenv; use kuska_ssb::{api::dto::content::Mention, keystore::read_patchwork_config}; use runtime::kuska_ssb_client::client::Client; +use runtime::logger::{CoreLogger, Logger}; #[tokio::main] async fn main() { dotenv().ok(); - println!("start"); + + let logger = CoreLogger::new(Some("./ssb-producer.log")); + + logger.info("starting producer..."); let secret = std::env::var("PRODUCER_SECRET").unwrap_or_else(|_| { let home_dir = dirs::home_dir().unwrap(); std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) @@ -21,6 +25,8 @@ async fn main() { .await .unwrap(); + logger.info("producer successfully started✅"); + use subxt::{OnlineClient, PolkadotConfig}; #[subxt::subxt(runtime_metadata_path = "./src/modules/utils/polkadot_metadata_small.scale")] @@ -38,9 +44,9 @@ async fn main() { let block_number = block.header().number; let block_hash = block.hash(); - println!("Block #{block_number}:"); - println!(" Hash: {block_hash}"); - println!(" Extrinsics:"); + logger.info(&format!( + "Block #{block_number} Hash: {block_hash} Extrinsics:" + )); // Log each of the extrinsic with it's associated events: let extrinsics = block.extrinsics().await.unwrap(); @@ -56,20 +62,21 @@ async fn main() { let from_addr = transfer.from.to_string(); let to_addr = transfer.from.to_string(); let amount = transfer.amount; - println!("{from_addr:?}"); + + logger.info(&format!("Transfer: {from_addr} -> {to_addr} {amount}")); let value = format!( "{{\"from\":\"{}\",\"to\":\"{}\",\"amount\":\"{}\"}}", from_addr, to_addr, amount ); - let menttion = Mention { + let mention = Mention { link: pub_address.clone(), name: None, }; let _ = client - .publish(&value.to_string(), Some(vec![menttion])) + .publish(&value.to_string(), Some(vec![mention])) .await; } None => (), diff --git a/runtime/lite/src/consumer.rs b/runtime/lite/src/consumer.rs index 13c8206f..c64e1452 100644 --- a/runtime/lite/src/consumer.rs +++ b/runtime/lite/src/consumer.rs @@ -20,11 +20,13 @@ use dotenv::dotenv; async fn main() { dotenv().ok(); let db = CoreStorage::new("runtime").unwrap(); - let logger = CoreLogger::new(Some("./workflow")); + let logger = CoreLogger::new(Some("./ssb-consumer.log")); let state_manager = GlobalState::new(logger.clone()); + logger.info("starting consumer..."); + let context = Arc::new(Mutex::new(Context::new( - logger, + logger.clone(), db, state_manager ))); @@ -38,11 +40,16 @@ async fn main() { let key = read_patchwork_config(&mut file).await.unwrap(); let ssb_context = context.clone(); + // Spawn the SSB feed listener task - tokio::spawn(async { + tokio::spawn(async move{ let mut client = Client::new(Some(key), "0.0.0.0".to_string(), port) .await .unwrap(); + + let logger = ssb_context.clone().lock().unwrap().get_logger().clone(); + logger.info("consumer successfully started✅"); + client .live_feed_with_execution_of_workflow(true, ssb_context) .await @@ -52,14 +59,14 @@ async fn main() { // Spawn the HTTP server task tokio::spawn(async move { let listener = TcpListener::bind("127.0.0.1:8080").expect("Failed to bind to address"); - println!("Listening on 127.0.0.1:8080..."); + logger.info("Listening on 127.0.0.1:8080..."); for stream in listener.incoming() { match stream { Ok(stream) => { handle_client(stream, context.clone()); } Err(e) => { - eprintln!("Error accepting connection: {}", e); + logger.error(&format!("Error accepting connection: {}", e)); } } } @@ -79,13 +86,12 @@ fn handle_client(mut stream: TcpStream, ctx: Arc>) { let ctx = ctx.lock().unwrap(); let logger = ctx.get_logger().clone(); - logger.info("Data Deseriased"); + logger.info("Data Deserialized"); let db = ctx.get_db(); db.insert(&body.pub_id.clone(), body).unwrap(); logger.info("Data inserted successfully"); - // println!("Received data: {:?}", body); // Respond to the client (optional) let response = "Data received!"; stream diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs index 4f35a4f5..32c2f2cf 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs @@ -215,6 +215,9 @@ impl Client { loop { let (id, msg) = self.rpc_reader.recv().await?; + let ctx = ctx.lock().unwrap(); + let db = ctx.get_db(); + let logger = ctx.get_logger(); if id == req_no { match msg { @@ -231,11 +234,9 @@ impl Client { match serde_json::from_str::(&x.text) { Ok(mut event) => { - println!("{:#?}", event); - let ctx = ctx.lock().unwrap(); - let db = ctx.get_db(); - let logger = ctx.get_logger(); + + logger.info(&format!("Event: {:#?}", event)); match db.get(&x.mentions.unwrap()[0].link) { Ok(body) => { @@ -255,10 +256,10 @@ impl Client { Err(e) => logger.error(&e.to_string()), } } - Err(e) => println!("{:#?}", e), + Err(e) => logger.error(&e.to_string()), } } - Err(e) => println!("{:#?}", e), + Err(e) => logger.error(&e.to_string()), } } } @@ -266,7 +267,7 @@ impl Client { let body = std::str::from_utf8(&body).unwrap(); if body == "{\"sync\":true}" { - println!("Syncing Successful"); + logger.info("Syncing Successful"); is_synced = true; } else { return std::result::Result::Err(err); diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs index 979b01bd..77ba146c 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs @@ -269,11 +269,6 @@ mod tests { let block = block.unwrap(); let block_number = block.header().number; - let block_hash = block.hash(); - - println!("Block #{block_number}:"); - println!(" Hash: {block_hash}"); - println!(" Extrinsics:"); if block_number == 10 { let dest = dev::bob().public_key().into(); @@ -300,7 +295,6 @@ mod tests { let from_addr = transfer.from.to_string(); let to_addr = transfer.from.to_string(); let amount = transfer.amount; - println!("{from_addr:?}"); let value = format!( "{{\"from\":\"{}\",\"to\":\"{}\",\"amount\":\"{}\"}}", diff --git a/runtime/lite/src/modules/storage/test.rs b/runtime/lite/src/modules/storage/test.rs index b80232d6..7c8f2e7e 100644 --- a/runtime/lite/src/modules/storage/test.rs +++ b/runtime/lite/src/modules/storage/test.rs @@ -15,7 +15,6 @@ mod tests { while retries < 3 { if let Err(_) = std::fs::remove_file(lock_file_path) { - println!("Failed to remove lock file: {}", lock_file_path); retries += 1; // Wait for 1 second before retrying @@ -41,7 +40,6 @@ mod tests { .set_data("test_key", b"test_value".to_vec()) .unwrap(); let result = core_storage.get_data("test_key").unwrap(); - println!("{:?}", result); let deserialized_value: Vec = result; fs::remove_dir_all(std::path::Path::new("test2")).unwrap(); assert_eq!(deserialized_value, b"test_value"); From 83b418ba7b043e8aa79170432e6eed5140c18287 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Thu, 28 Mar 2024 18:32:39 +0530 Subject: [PATCH 06/13] chore: update docker composer file with healer image --- runtime/lite/scripts/docker-compose.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/runtime/lite/scripts/docker-compose.yml b/runtime/lite/scripts/docker-compose.yml index 45a65661..bfa56756 100644 --- a/runtime/lite/scripts/docker-compose.yml +++ b/runtime/lite/scripts/docker-compose.yml @@ -29,4 +29,10 @@ services: ports: - "8015:8008" # Map container port 8088 to host port 8082 restart: on-failure + + healer: + image: ahdinosaur/healer + volumes: + - /var/run/docker.sock:/tmp/docker.sock + restart: unless-stopped \ No newline at end of file From 07726ae4a8108422905d93e04eef43d2ccad9d08 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Thu, 28 Mar 2024 22:43:05 +0530 Subject: [PATCH 07/13] fix: change corestorage db name in consumer to fix conflicts --- runtime/lite/src/consumer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/lite/src/consumer.rs b/runtime/lite/src/consumer.rs index db794f08..9f2d6590 100644 --- a/runtime/lite/src/consumer.rs +++ b/runtime/lite/src/consumer.rs @@ -19,7 +19,7 @@ use dotenv::dotenv; #[tokio::main] async fn main() { dotenv().ok(); - let db = CoreStorage::new("runtime").unwrap(); + let db = CoreStorage::new("runtime-db").unwrap(); let logger = CoreLogger::new(Some("./ssb-consumer.log")); let state_manager = GlobalState::new(logger.clone()); From bca5c81eb01dc11139ffec19118304996d824f09 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Thu, 28 Mar 2024 23:31:29 +0530 Subject: [PATCH 08/13] chore: remove event type in ssb client publish event --- .../modules/kuska_ssb_client/client/client.rs | 10 +++---- .../modules/kuska_ssb_client/client/types.rs | 27 ------------------- 2 files changed, 5 insertions(+), 32 deletions(-) diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs index 32c2f2cf..8f8c328a 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs @@ -187,14 +187,14 @@ impl Client { Ok(()) } - pub async fn publish_event(&mut self, event: Event) -> Result<()> { + pub async fn publish_event(&mut self, event: &str, section: &str, content: &str, mentions: Option>) -> Result<()> { let _req_id = self .api .publish_req_send(TypedMessage::Event { - event: event.event, - section: event.section, - content: event.content, - mentions: event.mentions, + event: event.to_string(), + section: section.to_string(), + content: content.to_string(), + mentions, }) .await?; diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/types.rs b/runtime/lite/src/modules/kuska_ssb_client/client/types.rs index b7762386..d8e495e8 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/types.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/types.rs @@ -1,6 +1,5 @@ use super::*; -use kuska_ssb::api::dto::content::Mention; use serde::{Deserialize, Serialize}; pub struct Client { @@ -9,32 +8,6 @@ pub struct Client { pub sk: SecretKey, } -#[derive(Debug, Serialize, Deserialize)] -pub struct Event { - pub event: String, - pub section: String, - pub content: String, - - #[serde(skip_serializing_if = "Option::is_none")] - pub mentions: Option>, -} - -impl Event { - pub fn new( - event: String, - section: String, - content: String, - mentions: Option>, - ) -> Self { - Self { - event, - section, - content, - mentions, - } - } -} - #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct Content { #[serde(rename = "type")] From b4a9e05bc624954fcb574831b81140540e790d5a Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Thu, 28 Mar 2024 23:37:14 +0530 Subject: [PATCH 09/13] fix: remove unwanted comments --- runtime/lite/src/modules/kuska_ssb_client/client/tests.rs | 2 -- runtime/lite/src/modules/kuska_ssb_client/client/types.rs | 1 - 2 files changed, 3 deletions(-) diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs index 77ba146c..baf1beea 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs @@ -94,7 +94,6 @@ mod tests { #[ignore] // returns list of feeds posted by particular user async fn test_user_method() { - // use types::Event; dotenv().ok(); let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { @@ -136,7 +135,6 @@ mod tests { let feed_text: String = serde_json::from_value(feed_text.clone()).unwrap(); let new_event: Value = serde_json::from_str(&feed_text).unwrap(); - // let event = serde_json::from_value(event).unwrap(); assert_eq!(old_event, new_event); } diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/types.rs b/runtime/lite/src/modules/kuska_ssb_client/client/types.rs index d8e495e8..c0a02c68 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/types.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/types.rs @@ -13,5 +13,4 @@ pub struct Content { #[serde(rename = "type")] pub types: String, pub text: String, - // mentions: Option>, } From 9036afcfad3c5609e0ab97a4223458a7f4573d17 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Thu, 28 Mar 2024 23:47:30 +0530 Subject: [PATCH 10/13] fix: remove duplicate dependencies --- runtime/lite/src/consumer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/runtime/lite/src/consumer.rs b/runtime/lite/src/consumer.rs index c549a327..827a8e7d 100644 --- a/runtime/lite/src/consumer.rs +++ b/runtime/lite/src/consumer.rs @@ -4,7 +4,6 @@ use runtime::{ logger::CoreLogger, state_manager::GlobalState, modules::kuska_ssb_client::client::Client, - state_manager::GlobalState, storage::{CoreStorage, Storage}, Ctx, Logger, }; From 0964f79aceaeecaf0bd807df684fdf52a534f774 Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Fri, 29 Mar 2024 01:13:44 +0530 Subject: [PATCH 11/13] chore: fix locking issue while running workflow --- runtime/lite/src/modules/kuska_ssb_client/client/client.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs index 7ee0ffb1..8eb31028 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs @@ -227,7 +227,7 @@ impl Client { let ctx = ctx.lock().unwrap(); let db = ctx.get_db(); let logger = ctx.get_logger(); - let state_manager = ctx.get_state_manager(); + let mut state_manager = ctx.get_state_manager(); if id == req_no { match msg { @@ -255,8 +255,7 @@ impl Client { "allowed_hosts": body.allowed_hosts }); - let workflow_index = ctx - .get_state_manager() + let workflow_index = state_manager .new_workflow(0, "hello"); let _ = From 0a5852b4f581e079ab432637e3ea0e288995872f Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Fri, 29 Mar 2024 15:45:45 +0530 Subject: [PATCH 12/13] chore: remove all hardcoded values and create constant value --- runtime/lite/examples/basic-producer.rs | 9 +- runtime/lite/src/consumer.rs | 25 ++--- runtime/lite/src/modules/common/constants.rs | 13 ++- .../modules/kuska_ssb_client/client/client.rs | 4 +- .../modules/kuska_ssb_client/client/tests.rs | 99 +++++++++++-------- runtime/lite/src/modules/mod.rs | 4 +- 6 files changed, 88 insertions(+), 66 deletions(-) diff --git a/runtime/lite/examples/basic-producer.rs b/runtime/lite/examples/basic-producer.rs index 1994d1e3..136df173 100644 --- a/runtime/lite/examples/basic-producer.rs +++ b/runtime/lite/examples/basic-producer.rs @@ -1,7 +1,9 @@ use dotenv::dotenv; use kuska_ssb::{api::dto::content::Mention, keystore::read_patchwork_config}; +use runtime::constants::DEFAULT_PRODUCER_SECRET_DIR; use runtime::kuska_ssb_client::client::Client; use runtime::logger::{CoreLogger, Logger}; +use runtime::common::constants::{DEFAULT_PRODUCER_IP, DEFAULT_PRODUCER_PORT}; #[tokio::main] async fn main() { @@ -11,12 +13,11 @@ async fn main() { logger.info("starting producer..."); let secret = std::env::var("PRODUCER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + DEFAULT_PRODUCER_SECRET_DIR.to_string() }); - let port = std::env::var("PRODUCER_PORT").unwrap_or("8014".to_string()); - let ip = std::env::var("PRODUCER_IP").unwrap_or("0.0.0.0".to_string()); + let port = std::env::var("PRODUCER_PORT").unwrap_or(DEFAULT_PRODUCER_PORT.to_string()); + let ip = std::env::var("PRODUCER_IP").unwrap_or(DEFAULT_PRODUCER_IP.to_string()); let pub_address = std::env::var("PUB_ADDRESS").expect("Pub address must be provided"); let invite = std::env::var("PRODUCER_INVITE").expect("invite address must be provided"); diff --git a/runtime/lite/src/consumer.rs b/runtime/lite/src/consumer.rs index c05b57fc..9af63bee 100644 --- a/runtime/lite/src/consumer.rs +++ b/runtime/lite/src/consumer.rs @@ -1,11 +1,9 @@ use kuska_ssb::keystore::read_patchwork_config; use runtime::{ - common::{constants::FOLLOW_LIST_KEY, RequestBody}, - logger::CoreLogger, - modules::kuska_ssb_client::client::Client, - state_manager::GlobalState, - storage::{CoreStorage, Storage}, - Ctx, Logger, + common::{ + constants::{DEFAULT_CONSUMER_IP, DEFAULT_CONSUMER_PORT, FOLLOW_LIST_KEY, HTTP_SERVER_ADDRESS}, + RequestBody, + }, constants::DEFAULT_CONSUMER_SECRET_DIR, logger::CoreLogger, modules::kuska_ssb_client::client::Client, state_manager::GlobalState, storage::{CoreStorage, Storage}, Ctx, Logger }; use std::{ net::{TcpListener, TcpStream}, @@ -28,20 +26,17 @@ async fn main() { let context = Arc::new(Mutex::new(Context::new(logger.clone(), db, state_manager))); let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + DEFAULT_CONSUMER_SECRET_DIR.to_string() }); - let port = std::env::var("CONSUMER_PORT").unwrap_or_else(|_| 8008.to_string()); - let ip = std::env::var("CONSUMER_IP").unwrap_or("0.0.0.0".to_string()); + let port = std::env::var("CONSUMER_PORT").unwrap_or_else(|_| DEFAULT_CONSUMER_PORT.to_string()); + let ip = std::env::var("CONSUMER_IP").unwrap_or(DEFAULT_CONSUMER_IP.to_string()); let mut file = async_std::fs::File::open(secret).await.unwrap(); let key = read_patchwork_config(&mut file).await.unwrap(); let ssb_context = context.clone(); let (channel_invite_tx, channel_invite_rx) = std::sync::mpsc::channel::(); - let mut client = Client::new(Some(key), ip, port) - .await - .unwrap(); + let mut client = Client::new(Some(key), ip, port).await.unwrap(); // Spawn the SSB feed listener task tokio::spawn(async move { let logger = ssb_context.clone().lock().unwrap().get_logger().clone(); @@ -60,8 +55,8 @@ async fn main() { // Spawn the HTTP server task tokio::spawn(async move { - let listener = TcpListener::bind("127.0.0.1:8080").expect("Failed to bind to address"); - logger.info("Listening on 127.0.0.1:8080..."); + let listener = TcpListener::bind(HTTP_SERVER_ADDRESS).expect("Failed to bind to address"); + logger.info(&format!("Listening on {HTTP_SERVER_ADDRESS}...")); for stream in listener.incoming() { match stream { Ok(stream) => { diff --git a/runtime/lite/src/modules/common/constants.rs b/runtime/lite/src/modules/common/constants.rs index 5fef5f90..100b6d92 100644 --- a/runtime/lite/src/modules/common/constants.rs +++ b/runtime/lite/src/modules/common/constants.rs @@ -1,2 +1,11 @@ - -pub const FOLLOW_LIST_KEY: &str = "follow_list"; \ No newline at end of file +pub const FOLLOW_LIST_KEY: &str = "follow_list"; +pub const HTTP_SERVER_ADDRESS: &str = "127.0.0.1:8080"; +pub const DEFAULT_CONSUMER_IP: &str = "0.0.0.0"; +pub const DEFAULT_PRODUCER_IP: &str = "0.0.0.0"; +pub const DEFAULT_PUB_IP: &str = "0.0.0.0"; +pub const DEFAULT_PUB_PORT: &str = "8013"; +pub const DEFAULT_CONSUMER_PORT: &str = "8014"; +pub const DEFAULT_PRODUCER_PORT: &str = "8015"; +pub const DEFAULT_CONSUMER_SECRET_DIR: &str = "scripts/secret/consumer_secret"; +pub const DEFAULT_PRODUCER_SECRET_DIR: &str = "scripts/secret/producer_secret"; +pub const DEFAULT_PUB_SECRET_DIR: &str = "scripts/secret/pubs_secret"; \ No newline at end of file diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs index 3dab43e6..378a58ba 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/client.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/client.rs @@ -164,9 +164,9 @@ impl Client { Ok(()) } - pub async fn create_invite(&mut self) -> Result> { + pub async fn create_invite(&mut self) -> Result { let req_id = self.api.invite_create_req_send(1).await?; - let responses = self.get_responses(req_id, invite_create).await?; + let responses = self.get_async(req_id, invite_create).await?; Ok(responses) } diff --git a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs index 740cdfac..2ef632a9 100644 --- a/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs +++ b/runtime/lite/src/modules/kuska_ssb_client/client/tests.rs @@ -1,5 +1,10 @@ #[cfg(test)] mod tests { + use crate::common::constants::{ + DEFAULT_CONSUMER_IP, DEFAULT_CONSUMER_PORT, DEFAULT_CONSUMER_SECRET_DIR, + DEFAULT_PRODUCER_IP, DEFAULT_PRODUCER_PORT, DEFAULT_PRODUCER_SECRET_DIR, + }; + use crate::constants::{DEFAULT_PUB_IP, DEFAULT_PUB_PORT, DEFAULT_PUB_SECRET_DIR}; use crate::modules::kuska_ssb_client::client::Client; use dotenv::dotenv; use kuska_ssb::keystore::read_patchwork_config; @@ -15,12 +20,15 @@ mod tests { async fn test_client() { dotenv().ok(); + dbg!(std::env::current_dir().unwrap()); + let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + DEFAULT_CONSUMER_SECRET_DIR.to_string() }); - let ssb_port = std::env::var("CONSUMER_PORT").unwrap_or_else(|_| 8008.to_string()); - let ssb_ip = std::env::var("CONSUMER_IP").unwrap_or_else(|_| "0.0.0.0".to_string()); + let ssb_port = + std::env::var("CONSUMER_PORT").unwrap_or_else(|_| DEFAULT_CONSUMER_PORT.to_string()); + let ssb_ip = + std::env::var("CONSUMER_IP").unwrap_or_else(|_| DEFAULT_CONSUMER_IP.to_string()); let mut file = async_std::fs::File::open(secret).await.unwrap(); let config = read_patchwork_config(&mut file).await.unwrap(); @@ -33,11 +41,12 @@ mod tests { dotenv().ok(); let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + DEFAULT_CONSUMER_SECRET_DIR.to_string() }); - let ssb_port = std::env::var("CONSUMER_PORT").unwrap_or_else(|_| 8008.to_string()); - let ssb_ip = std::env::var("CONSUMER_IP").unwrap_or_else(|_| "0.0.0.0".to_string()); + let ssb_port = + std::env::var("CONSUMER_PORT").unwrap_or_else(|_| DEFAULT_CONSUMER_PORT.to_string()); + let ssb_ip = + std::env::var("CONSUMER_IP").unwrap_or_else(|_| DEFAULT_CONSUMER_IP.to_string()); let mut file = async_std::fs::File::open(secret).await.unwrap(); let config = read_patchwork_config(&mut file).await.unwrap(); @@ -51,11 +60,12 @@ mod tests { dotenv().ok(); let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + DEFAULT_CONSUMER_SECRET_DIR.to_string() }); - let ssb_port = std::env::var("CONSUMER_PORT").unwrap_or_else(|_| 8008.to_string()); - let ssb_ip = std::env::var("CONSUMER_IP").unwrap_or_else(|_| "0.0.0.0".to_string()); + let ssb_port = + std::env::var("CONSUMER_PORT").unwrap_or_else(|_| DEFAULT_CONSUMER_PORT.to_string()); + let ssb_ip = + std::env::var("CONSUMER_IP").unwrap_or_else(|_| DEFAULT_CONSUMER_IP.to_string()); let mut file = async_std::fs::File::open(secret).await.unwrap(); let config = read_patchwork_config(&mut file).await.unwrap(); @@ -74,11 +84,12 @@ mod tests { dotenv().ok(); let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + DEFAULT_CONSUMER_SECRET_DIR.to_string() }); - let ssb_port = std::env::var("CONSUMER_PORT").unwrap_or_else(|_| 8008.to_string()); - let ssb_ip = std::env::var("CONSUMER_IP").unwrap_or_else(|_| "0.0.0.0".to_string()); + let ssb_port = + std::env::var("CONSUMER_PORT").unwrap_or_else(|_| DEFAULT_CONSUMER_PORT.to_string()); + let ssb_ip = + std::env::var("CONSUMER_IP").unwrap_or_else(|_| DEFAULT_CONSUMER_IP.to_string()); let mut file = async_std::fs::File::open(secret).await.unwrap(); let config = read_patchwork_config(&mut file).await.unwrap(); @@ -97,11 +108,12 @@ mod tests { dotenv().ok(); let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + DEFAULT_CONSUMER_SECRET_DIR.to_string() }); - let ssb_port = std::env::var("CONSUMER_PORT").unwrap_or_else(|_| 8008.to_string()); - let ssb_ip = std::env::var("CONSUMER_IP").unwrap_or_else(|_| "0.0.0.0".to_string()); + let ssb_port = + std::env::var("CONSUMER_PORT").unwrap_or_else(|_| DEFAULT_CONSUMER_PORT.to_string()); + let ssb_ip = + std::env::var("CONSUMER_IP").unwrap_or_else(|_| DEFAULT_CONSUMER_IP.to_string()); let mut file = async_std::fs::File::open(secret).await.unwrap(); let config = read_patchwork_config(&mut file).await.unwrap(); @@ -144,12 +156,13 @@ mod tests { async fn test_close() { dotenv().ok(); - let secret = std::env::var("PRODUCER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { + DEFAULT_CONSUMER_SECRET_DIR.to_string() }); - let ssb_port = std::env::var("PRODUCER_PORT").unwrap_or_else(|_| 8008.to_string()); - let ssb_ip = std::env::var("CONSUMER_IP").unwrap_or_else(|_| "0.0.0.0".to_string()); + let ssb_port = + std::env::var("CONSUMER_PORT").unwrap_or_else(|_| DEFAULT_CONSUMER_PORT.to_string()); + let ssb_ip = + std::env::var("CONSUMER_IP").unwrap_or_else(|_| DEFAULT_CONSUMER_IP.to_string()); let mut file = async_std::fs::File::open(secret).await.unwrap(); let config = read_patchwork_config(&mut file).await.unwrap(); @@ -164,12 +177,13 @@ mod tests { async fn test_feed() { dotenv().ok(); - let secret = std::env::var("PRODUCER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { + DEFAULT_CONSUMER_SECRET_DIR.to_string() }); - let ssb_port = std::env::var("PRODUCER_PORT").unwrap_or_else(|_| 8008.to_string()); - let ssb_ip = std::env::var("CONSUMER_IP").unwrap_or_else(|_| "0.0.0.0".to_string()); + let ssb_port = + std::env::var("CONSUMER_PORT").unwrap_or_else(|_| DEFAULT_CONSUMER_PORT.to_string()); + let ssb_ip = + std::env::var("CONSUMER_IP").unwrap_or_else(|_| DEFAULT_CONSUMER_IP.to_string()); let mut file = async_std::fs::File::open(secret).await.unwrap(); let config = read_patchwork_config(&mut file).await.unwrap(); @@ -184,11 +198,12 @@ mod tests { dotenv().ok(); let secret = std::env::var("PRODUCER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + DEFAULT_PRODUCER_SECRET_DIR.to_string() }); - let ssb_port = std::env::var("PRODUCER_PORT").unwrap_or_else(|_| 8008.to_string()); - let ssb_ip = std::env::var("CONSUMER_IP").unwrap_or_else(|_| "0.0.0.0".to_string()); + let ssb_port = + std::env::var("PRODUCER_PORT").unwrap_or_else(|_| DEFAULT_PRODUCER_PORT.to_string()); + let ssb_ip = + std::env::var("PRODUCER_IP").unwrap_or_else(|_| DEFAULT_PRODUCER_IP.to_string()); let mut file = async_std::fs::File::open(secret).await.unwrap(); let config = read_patchwork_config(&mut file).await.unwrap(); @@ -232,11 +247,12 @@ mod tests { dotenv().ok(); let secret = std::env::var("CONSUMER_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + DEFAULT_CONSUMER_SECRET_DIR.to_string() }); - let ssb_port = std::env::var("CONSUMER_PORT").unwrap_or_else(|_| 8008.to_string()); - let ssb_ip = std::env::var("CONSUMER_IP").unwrap_or_else(|_| "0.0.0.0".to_string()); + let ssb_port = + std::env::var("CONSUMER_PORT").unwrap_or_else(|_| DEFAULT_CONSUMER_PORT.to_string()); + let ssb_ip = + std::env::var("CONSUMER_IP").unwrap_or_else(|_| DEFAULT_CONSUMER_IP.to_string()); let mut file = async_std::fs::File::open(secret).await.unwrap(); let config = read_patchwork_config(&mut file).await.unwrap(); //TODO @@ -314,13 +330,12 @@ mod tests { async fn test_create_invite() { dotenv::dotenv().ok(); let secret = std::env::var("PUB_SECRET").unwrap_or_else(|_| { - let home_dir = dirs::home_dir().unwrap(); - std::format!("{}/.ssb/secret", home_dir.to_string_lossy()) + DEFAULT_PUB_SECRET_DIR.to_string() }); - let port = std::env::var("PUB_PORT").unwrap_or_else(|_| 8013.to_string()); + let port = std::env::var("PUB_PORT").unwrap_or_else(|_| DEFAULT_PUB_PORT.to_string()); let mut file = async_std::fs::File::open(secret).await.unwrap(); let key = read_patchwork_config(&mut file).await.unwrap(); - let mut client = Client::new(Some(key), "0.0.0.0".to_string(), port) + let mut client = Client::new(Some(key), DEFAULT_PUB_IP.to_string(), port) .await .unwrap(); let res = client.create_invite().await; diff --git a/runtime/lite/src/modules/mod.rs b/runtime/lite/src/modules/mod.rs index 33f66bce..9efd7f8b 100644 --- a/runtime/lite/src/modules/mod.rs +++ b/runtime/lite/src/modules/mod.rs @@ -4,4 +4,6 @@ pub mod logger; pub mod wasmtime_wasi_module; pub mod state_manager; pub mod storage; -pub mod common; \ No newline at end of file +pub mod common; + +pub use common::*; \ No newline at end of file From 682051683813b341d3c30a884a4810f06dd9910e Mon Sep 17 00:00:00 2001 From: ajaykumargdr Date: Fri, 29 Mar 2024 15:47:11 +0530 Subject: [PATCH 13/13] fix: remove unwanted spaces --- runtime/lite/src/modules/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/runtime/lite/src/modules/mod.rs b/runtime/lite/src/modules/mod.rs index 9efd7f8b..05f6b7ef 100644 --- a/runtime/lite/src/modules/mod.rs +++ b/runtime/lite/src/modules/mod.rs @@ -1,4 +1,3 @@ - pub mod kuska_ssb_client; pub mod logger; pub mod wasmtime_wasi_module;