Skip to content

Commit

Permalink
refactor: worker errors (#2416)
Browse files Browse the repository at this point in the history
  • Loading branch information
mehulmathur16 authored Jul 21, 2024
1 parent e8c1404 commit ddf3fb1
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 41 deletions.
7 changes: 4 additions & 3 deletions src/cli/javascript/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,18 +214,19 @@ mod test {
use rquickjs::{Context, FromJs, IntoJs, Object, Runtime, String as JsString};

use super::*;
use crate::core::error::worker;
use crate::core::http::Response;
use crate::core::worker::{Command, WorkerRequest, WorkerResponse};

fn create_test_response() -> Result<WorkerResponse> {
fn create_test_response() -> worker::Result<WorkerResponse> {
let mut headers = HeaderMap::new();
headers.insert("content-type", "application/json".parse().unwrap());
let response = crate::core::http::Response {
status: reqwest::StatusCode::OK,
headers,
body: Bytes::from("Hello, World!"),
};
let js_response: Result<WorkerResponse> = response.try_into();
let js_response: worker::Result<WorkerResponse> = response.try_into();
js_response
}

Expand All @@ -245,7 +246,7 @@ mod test {
#[test]
fn test_from_js_response() {
let js_response = create_test_response().unwrap();
let response: Result<crate::core::http::Response<Bytes>> = js_response.try_into();
let response: worker::Result<crate::core::http::Response<Bytes>> = js_response.try_into();
assert!(response.is_ok());
let response = response.unwrap();
assert_eq!(response.status, reqwest::StatusCode::OK);
Expand Down
41 changes: 20 additions & 21 deletions src/cli/javascript/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::thread;
use async_graphql_value::ConstValue;
use rquickjs::{Context, Ctx, FromJs, Function, IntoJs, Value};

use crate::core::error::worker;
use crate::core::worker::{Command, Event, WorkerRequest};
use crate::core::{blueprint, WorkerIO};

Expand Down Expand Up @@ -82,7 +83,7 @@ impl Drop for Runtime {

#[async_trait::async_trait]
impl WorkerIO<Event, Command> for Runtime {
async fn call(&self, name: &str, event: Event) -> anyhow::Result<Option<Command>> {
async fn call(&self, name: &str, event: Event) -> worker::Result<Option<Command>> {
let script = self.script.clone();
let name = name.to_string(); // TODO
if let Some(runtime) = &self.tokio_runtime {
Expand All @@ -93,14 +94,14 @@ impl WorkerIO<Event, Command> for Runtime {
})
.await?
} else {
anyhow::bail!("JS Runtime is stopped")
Err(worker::Error::JsRuntimeStopped)
}
}
}

#[async_trait::async_trait]
impl WorkerIO<ConstValue, ConstValue> for Runtime {
async fn call(&self, name: &str, input: ConstValue) -> anyhow::Result<Option<ConstValue>> {
async fn call(&self, name: &str, input: ConstValue) -> worker::Result<Option<ConstValue>> {
let script = self.script.clone();
let name = name.to_string();
let value = serde_json::to_string(&input)?;
Expand All @@ -112,7 +113,7 @@ impl WorkerIO<ConstValue, ConstValue> for Runtime {
})
.await?
} else {
anyhow::bail!("JS Runtime is stopped")
Err(worker::Error::JsRuntimeStopped)
}
}
}
Expand All @@ -139,50 +140,48 @@ fn prepare_args<'js>(ctx: &Ctx<'js>, req: WorkerRequest) -> rquickjs::Result<(Va
Ok((object.into_value(),))
}

fn call(name: String, event: Event) -> anyhow::Result<Option<Command>> {
fn call(name: String, event: Event) -> worker::Result<Option<Command>> {
LOCAL_RUNTIME.with_borrow_mut(|cell| {
let runtime = cell
.get_mut()
.ok_or(anyhow::anyhow!("JS runtime not initialized"))?;
let runtime = cell.get_mut().ok_or(worker::Error::RuntimeNotInitialized)?;
runtime.0.with(|ctx| match event {
Event::Request(req) => {
let fn_as_value = ctx
.globals()
.get::<&str, Function>(name.as_str())
.map_err(|_| anyhow::anyhow!("globalThis not initialized"))?;
.map_err(|e| worker::Error::GlobalThisNotInitialised(e.to_string()))?;

let function = fn_as_value
.as_function()
.ok_or(anyhow::anyhow!("`{name}` is not a function"))?;
.ok_or(worker::Error::InvalidFunction(name))?;

let args = prepare_args(&ctx, req)?;
let args =
prepare_args(&ctx, req).map_err(|e| worker::Error::Rquickjs(e.to_string()))?;
let command: Option<Value> = function.call(args).ok();
command
.map(|output| Command::from_js(&ctx, output))
.transpose()
.map_err(|e| anyhow::anyhow!("deserialize failed: {e}"))
.map_err(|e| worker::Error::DeserializeFailed(e.to_string()))
}
})
})
}

fn execute_inner(name: String, value: String) -> anyhow::Result<ConstValue> {
fn execute_inner(name: String, value: String) -> worker::Result<ConstValue> {
LOCAL_RUNTIME.with_borrow_mut(|cell| {
let runtime = cell
.get_mut()
.ok_or(anyhow::anyhow!("JS runtime not initialized"))?;
let runtime = cell.get_mut().ok_or(worker::Error::RuntimeNotInitialized)?;
runtime.0.with(|ctx| {
let fn_as_value = ctx
.globals()
.get::<_, rquickjs::Function>(&name)
.map_err(|_| anyhow::anyhow!("globalThis not initialized"))?;
.map_err(|e| worker::Error::GlobalThisNotInitialised(e.to_string()))?;

let function = fn_as_value
.as_function()
.ok_or(anyhow::anyhow!("`{}` is not a function", name))?;
let val: String = function.call((value, ))
.map_err(|_| anyhow::anyhow!("unable to parse value from js function: {} maybe because it's not returning a string?", name,))?;
Ok::<_, anyhow::Error>(serde_json::from_str(&val)?)
.ok_or(worker::Error::InvalidFunction(name.clone()))?;
let val: String = function
.call((value,))
.map_err(|e| worker::Error::FunctionValueParseError(e.to_string(), name))?;
Ok::<_, worker::Error>(serde_json::from_str(&val)?)
})
})
}
126 changes: 126 additions & 0 deletions src/core/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::fmt::Display;

use derive_more::From;

#[derive(From, thiserror::Error, Debug)]
pub enum Error {
#[error("Worker Error")]
Worker(worker::Error),
}

pub mod worker {
use std::sync::Arc;

use derive_more::{DebugCustom, From};
use tokio::task::JoinError;

#[derive(From, DebugCustom, Clone)]
pub enum Error {
#[debug(fmt = "Failed to initialize worker")]
InitializationFailed,

#[debug(fmt = "Worker communication error")]
Communication,

#[debug(fmt = "Serde Json Error: {}", _0)]
SerdeJson(Arc<serde_json::Error>),

#[debug(fmt = "Request Clone Failed")]
RequestCloneFailed,

#[debug(fmt = "Hyper Header To Str Error: {}", _0)]
HyperHeaderStr(Arc<hyper::header::ToStrError>),

#[debug(fmt = "JS Runtime Stopped Error")]
JsRuntimeStopped,

#[debug(fmt = "CLI Error : {}", _0)]
CLI(String),

#[debug(fmt = "Join Error : {}", _0)]
Join(Arc<JoinError>),

#[debug(fmt = "Runtime not initialized")]
RuntimeNotInitialized,

#[debug(fmt = "{} is not a function", _0)]
#[from(ignore)]
InvalidFunction(String),

#[debug(fmt = "Rquickjs Error: {}", _0)]
#[from(ignore)]
Rquickjs(String),

#[debug(fmt = "Deserialize Failed: {}", _0)]
#[from(ignore)]
DeserializeFailed(String),

#[debug(fmt = "globalThis not initialized: {}", _0)]
#[from(ignore)]
GlobalThisNotInitialised(String),

#[debug(
fmt = "Error: {}\nUnable to parse value from js function: {} maybe because it's not returning a string?",
_0,
_1
)]
FunctionValueParseError(String, String),

#[debug(fmt = "Error : {}", _0)]
Anyhow(Arc<anyhow::Error>),
}

impl From<serde_json::Error> for Error {
fn from(error: serde_json::Error) -> Self {
Error::SerdeJson(Arc::new(error))
}
}

impl From<hyper::header::ToStrError> for Error {
fn from(error: hyper::header::ToStrError) -> Self {
Error::HyperHeaderStr(Arc::new(error))
}
}

impl From<JoinError> for Error {
fn from(error: JoinError) -> Self {
Error::Join(Arc::new(error))
}
}

impl From<anyhow::Error> for Error {
fn from(error: anyhow::Error) -> Self {
Error::Anyhow(Arc::new(error))
}
}

pub type Result<A> = std::result::Result<A, Error>;
}

impl Display for worker::Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
worker::Error::InitializationFailed => write!(f, "Failed to initialize worker"),
worker::Error::Communication => write!(f, "Worker communication error"),
worker::Error::SerdeJson(error) => write!(f, "Serde Json Error: {}", error),
worker::Error::RequestCloneFailed => write!(f, "Request Clone Failed"),
worker::Error::HyperHeaderStr(error) => {
write!(f, "Hyper Header To Str Error: {}", error)
}
worker::Error::JsRuntimeStopped => write!(f, "JS Runtime Stopped Error"),
worker::Error::CLI(msg) => write!(f, "CLI Error: {}", msg),
worker::Error::Join(error) => write!(f, "Join Error: {}", error),
worker::Error::RuntimeNotInitialized => write!(f, "Runtime not initialized"),
worker::Error::InvalidFunction(function_name) => {
write!(f, "{} is not a function", function_name)
}
worker::Error::Rquickjs(error) => write!(f, "Rquickjs error: {}", error),
worker::Error::DeserializeFailed(error) => write!(f, "Deserialize Failed: {}", error),
worker::Error::GlobalThisNotInitialised(error) => write!(f, "globalThis not initialized: {}", error),
worker::Error::FunctionValueParseError(error, name) => write!(f, "Error: {}\nUnable to parse value from js function: {} maybe because it's not returning a string?", error, name),
worker::Error::Anyhow(msg) => write!(f, "Error: {}", msg),
}
}
}

pub type Result<A, E> = std::result::Result<A, E>;
15 changes: 8 additions & 7 deletions src/core/ir/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::sync::Arc;

use async_graphql::{ErrorExtensions, Value as ConstValue};
use derive_more::From;
use thiserror::Error;

use crate::core::auth;
#[derive(Debug, Error, Clone)]
use crate::core::error::worker;
#[derive(From, Debug, Error, Clone)]
pub enum Error {
#[error("IOException: {0}")]
IOException(String),
Expand All @@ -21,13 +23,18 @@ pub enum Error {
APIValidationError(Vec<String>),

#[error("ExprEvalError: {0}")]
#[from(ignore)]
ExprEvalError(String),

#[error("DeserializeError: {0}")]
#[from(ignore)]
DeserializeError(String),

#[error("Authentication Failure: {0}")]
AuthError(auth::error::Error),

#[error("Worker Error: {0}")]
WorkerError(worker::Error),
}

impl ErrorExtensions for Error {
Expand All @@ -49,12 +56,6 @@ impl ErrorExtensions for Error {
}
}

impl From<auth::error::Error> for Error {
fn from(value: auth::error::Error) -> Self {
Error::AuthError(value)
}
}

impl<'a> From<crate::core::valid::ValidationError<&'a str>> for Error {
fn from(value: crate::core::valid::ValidationError<&'a str>) -> Self {
Error::APIValidationError(
Expand Down
4 changes: 3 additions & 1 deletion src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod data_loader;
pub mod directive;
pub mod document;
pub mod endpoint;
pub mod error;
pub mod generator;
pub mod graphql;
pub mod grpc;
Expand Down Expand Up @@ -46,6 +47,7 @@ use std::hash::Hash;
use std::num::NonZeroU64;

use async_graphql_value::ConstValue;
pub use error::{Error, Result};
use http::Response;
use ir::model::IoId;
pub use mustache::Mustache;
Expand Down Expand Up @@ -90,7 +92,7 @@ pub type EntityCache = dyn Cache<Key = IoId, Value = ConstValue>;
#[async_trait::async_trait]
pub trait WorkerIO<In, Out>: Send + Sync + 'static {
/// Calls a global JS function
async fn call(&self, name: &str, input: In) -> anyhow::Result<Option<Out>>;
async fn call(&self, name: &str, input: In) -> error::worker::Result<Option<Out>>;
}

pub fn is_default<T: Default + Eq>(val: &T) -> bool {
Expand Down
Loading

0 comments on commit ddf3fb1

Please sign in to comment.