diff --git a/Cargo.lock b/Cargo.lock index 2068bb5..19f646e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,11 +9,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "getrandom", "once_cell", "version_check", "zerocopy", ] +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -29,6 +60,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chumsky" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eebd66744a15ded14960ab4ccdbfb51ad3b81f51f3f04a80adac98c985396c9" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "cpufeatures" version = "0.2.12" @@ -53,6 +93,10 @@ name = "datakit-filter" version = "0.1.0" dependencies = [ "handlebars", + "jaq-core", + "jaq-interpret", + "jaq-parse", + "jaq-std", "lazy_static", "log", "proxy-wasm", @@ -62,6 +106,15 @@ dependencies = [ "url", ] +[[package]] +name = "deranged" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +dependencies = [ + "powerfmt", +] + [[package]] name = "digest" version = "0.10.7" @@ -72,6 +125,18 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -91,6 +156,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94b22e06ecb0110981051723910cbf0b5f5e09a2062dd7663334ee79a9d1286c" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "handlebars" version = "5.1.2" @@ -114,6 +190,22 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] + +[[package]] +name = "hifijson" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18ae468bcb4dfecf0e4949ee28abbc99076b6a0077f51ddbc94dbfff8e6a870c" + [[package]] name = "idna" version = "0.5.0" @@ -124,12 +216,84 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indexmap" +version = "2.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +dependencies = [ + "equivalent", + "hashbrown 0.14.3", +] + [[package]] name = "itoa" version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jaq-core" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03d6a5713b8f33675abfac79d1db0022a3f28764b2a6b96a185c199ad8dab86d" +dependencies = [ + "aho-corasick", + "base64", + "hifijson", + "jaq-interpret", + "libm", + "log", + "regex", + "time", + "urlencoding", +] + +[[package]] +name = "jaq-interpret" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f569e38e5fc677db8dfda89ee0b4c25b3f53e811b16434fd14bdc5b43fc362ac" +dependencies = [ + "ahash", + "dyn-clone", + "hifijson", + "indexmap", + "jaq-syn", + "once_cell", + "serde_json", +] + +[[package]] +name = "jaq-parse" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef6f8beb9f9922546419e774e24199e8a968f54c63a5a2323c8f3ef3321ace14" +dependencies = [ + "chumsky", + "jaq-syn", +] + +[[package]] +name = "jaq-std" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d7871c59297cbfdd18f6f1bbbafaad24e97fd555ee1e2a1be7a40a5a20f551a" +dependencies = [ + "bincode", + "jaq-parse", + "jaq-syn", +] + +[[package]] +name = "jaq-syn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4d60101fb791b20c982731d848ed6e7d25363656497647c2093b68bd88398d6" +dependencies = [ + "serde", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -142,6 +306,12 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "libm" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" + [[package]] name = "log" version = "0.4.21" @@ -154,6 +324,12 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "once_cell" version = "1.19.0" @@ -211,6 +387,12 @@ dependencies = [ "sha2", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "proc-macro2" version = "1.0.79" @@ -226,7 +408,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "823b744520cd4a54ba7ebacbffe4562e839d6dcd8f89209f96a1ace4f5229cd4" dependencies = [ - "hashbrown", + "hashbrown 0.13.2", "log", ] @@ -239,6 +421,35 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "regex" +version = "1.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" + [[package]] name = "ryu" version = "1.0.17" @@ -327,6 +538,37 @@ dependencies = [ "syn", ] +[[package]] +name = "time" +version = "0.3.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" + +[[package]] +name = "time-macros" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -386,12 +628,24 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "version_check" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + [[package]] name = "zerocopy" version = "0.7.32" diff --git a/datakit-filter/Cargo.toml b/datakit-filter/Cargo.toml index 5d28e4a..ed07c7d 100644 --- a/datakit-filter/Cargo.toml +++ b/datakit-filter/Cargo.toml @@ -19,3 +19,7 @@ serde = { version = "*", features = ["derive"] } lazy_static = "*" "url" = "2.5.0" handlebars = "5.1.0" +jaq-interpret = "1.2.1" +jaq-parse = "1.0.2" +jaq-core = "1.2.1" +jaq-std = "1.2.1" diff --git a/datakit-filter/src/config.rs b/datakit-filter/src/config.rs index 1b47c72..1dfb845 100644 --- a/datakit-filter/src/config.rs +++ b/datakit-filter/src/config.rs @@ -234,13 +234,7 @@ impl Config { pub fn get_config_value serde::Deserialize<'de>>( bt: &BTreeMap, key: &str, - default: T, -) -> T { - match bt.get(key) { - Some(v) => match serde_json::from_value(v.clone()) { - Ok(s) => s, - Err(_) => default, - }, - None => default, - } +) -> Option { + bt.get(key) + .and_then(|v| serde_json::from_value(v.clone()).ok()) } diff --git a/datakit-filter/src/filter.rs b/datakit-filter/src/filter.rs index 8dfdc2e..42ceca1 100644 --- a/datakit-filter/src/filter.rs +++ b/datakit-filter/src/filter.rs @@ -342,6 +342,7 @@ proxy_wasm::main! {{ nodes::register_node("template", Box::new(nodes::template::TemplateFactory {})); nodes::register_node("call", Box::new(nodes::call::CallFactory {})); nodes::register_node("response", Box::new(nodes::response::ResponseFactory {})); + nodes::register_node("jq", Box::new(nodes::jq::JqFactory {})); proxy_wasm::set_log_level(LogLevel::Debug); proxy_wasm::set_root_context(|_| -> Box { diff --git a/datakit-filter/src/nodes.rs b/datakit-filter/src/nodes.rs index 6087b7f..b211cb5 100644 --- a/datakit-filter/src/nodes.rs +++ b/datakit-filter/src/nodes.rs @@ -7,6 +7,7 @@ use std::sync::{Mutex, OnceLock}; use crate::data::{Payload, State, State::*}; pub mod call; +pub mod jq; pub mod response; pub mod template; diff --git a/datakit-filter/src/nodes/call.rs b/datakit-filter/src/nodes/call.rs index ca8f17b..6a54c35 100644 --- a/datakit-filter/src/nodes/call.rs +++ b/datakit-filter/src/nodes/call.rs @@ -117,9 +117,9 @@ impl NodeFactory for CallFactory { bt: &BTreeMap, ) -> Result, String> { Ok(Box::new(CallConfig { - url: get_config_value(bt, "url", String::from("")), - method: get_config_value(bt, "method", String::from("GET")), - timeout: get_config_value(bt, "timeout", 60), + url: get_config_value(bt, "url").unwrap_or_else(|| String::from("")), + method: get_config_value(bt, "method").unwrap_or_else(|| String::from("GET")), + timeout: get_config_value(bt, "timeout").unwrap_or(60), })) } diff --git a/datakit-filter/src/nodes/jq.rs b/datakit-filter/src/nodes/jq.rs new file mode 100644 index 0000000..63fa6e9 --- /dev/null +++ b/datakit-filter/src/nodes/jq.rs @@ -0,0 +1,313 @@ +use jaq_core; +use jaq_interpret::{Ctx, Filter, FilterT, ParseCtx, RcIter, Val}; +use jaq_std; +use proxy_wasm::traits::*; +use serde_json::Value as JsonValue; +use std::any::Any; +use std::collections::BTreeMap; + +use crate::config::get_config_value; +use crate::data::{Payload, State}; +use crate::nodes::{Node, NodeConfig, NodeFactory}; + +#[derive(Clone, Debug)] +pub struct JqConfig { + jq: String, + inputs: Vec, +} + +impl NodeConfig for JqConfig { + fn as_any(&self) -> &dyn Any { + self + } +} + +#[derive(Clone)] +pub struct Jq { + inputs: Vec, + filter: Filter, +} + +impl TryFrom<&JqConfig> for Jq { + type Error = String; + + fn try_from(config: &JqConfig) -> Result { + Jq::new(&config.jq, config.inputs.clone()) + } +} + +struct Errors(Vec); + +impl> From for Errors { + fn from(value: T) -> Self { + Errors(vec![value.into()]) + } +} + +impl Errors { + fn new() -> Self { + Self(vec![]) + } + + fn push(&mut self, e: E) + where + E: Into, + { + self.0.push(e.into()); + } + + fn is_empty(&self) -> bool { + self.0.is_empty() + } + + #[cfg(test)] + fn into_inner(self) -> Vec { + self.0 + } +} + +impl From for State { + fn from(val: Errors) -> Self { + State::Fail(Some(Payload::Error(if val.is_empty() { + // should be unreachable + "unknown jq error".to_string() + } else { + val.0.join(", ") + }))) + } +} + +impl Jq { + fn new(jq: &str, inputs: Vec) -> Result { + let mut defs = ParseCtx::new(inputs.clone()); + + defs.insert_natives(jaq_core::core()); + defs.insert_defs(jaq_std::std()); + + if !defs.errs.is_empty() { + for (err, _) in defs.errs { + log::error!("jq: input error: {err}"); + } + return Err("failed parsing filter inputs".to_string()); + } + + let (parsed, errs) = jaq_parse::parse(jq, jaq_parse::main()); + if !errs.is_empty() { + for err in errs { + log::error!("filter parse error: {err}"); + } + return Err("invalid filter".to_string()); + } + + let Some(parsed) = parsed else { + return Err("parsed filter contains no main handler".to_string()); + }; + + // compile the filter in the context of the given definitions + let filter = defs.compile(parsed); + if !defs.errs.is_empty() { + for (err, _) in defs.errs { + log::error!("filter compile error: {err}"); + } + return Err("filter compilation failed".to_string()); + } + + let inputs = inputs.clone(); + + Ok(Jq { inputs, filter }) + } + + fn exec(&self, inputs: &[Option<&Payload>]) -> Result, Errors> { + if inputs.len() != self.inputs.len() { + return Err(Errors::from(format!( + "invalid number of inputs, expected: {}, got: {}", + self.inputs.len(), + inputs.len() + ))); + } + + let mut errs = Errors::new(); + + let vars_iter = self + .inputs + .iter() + .zip(inputs.iter()) + .map(|(name, input)| -> Val { + match input { + Some(input) => match input.to_json() { + Ok(value) => value.into(), + Err(e) => { + errs.push(format!("jq: input error at {name}: {e}")); + Val::Null + } + }, + None => Val::Null, + } + }); + + let input_iter = { + let iter = std::iter::empty::>(); + let iter = Box::new(iter) as Box>>; + RcIter::new(iter) + }; + let input = Val::Null; + + let ctx = Ctx::new(vars_iter, &input_iter); + + let results: Vec = self + .filter + .run((ctx, input)) + .map(|item| match item { + Ok(v) => v.into(), + Err(e) => { + errs.push(e.to_string()); + JsonValue::Null + } + }) + .collect(); + + if !errs.is_empty() { + return Err(errs); + } + + Ok(results) + } +} + +impl Node for Jq { + fn run(&self, _ctx: &dyn HttpContext, inputs: &[Option<&Payload>]) -> State { + match self.exec(inputs) { + Ok(mut results) => { + State::Done(match results.len() { + // empty + 0 => None, + + // single + 1 => { + let Some(item) = results.pop() else { + unreachable!(); + }; + Some(Payload::Json(item)) + } + + // more than one, return as an array + _ => Some(Payload::Json(results.into())), + }) + } + Err(errs) => errs.into(), + } + } +} + +pub struct JqFactory {} + +impl NodeFactory for JqFactory { + fn new_config( + &self, + _name: &str, + inputs: &[String], + bt: &BTreeMap, + ) -> Result, String> { + Ok(Box::new(JqConfig { + jq: get_config_value(bt, "jq").ok_or_else(|| ".".to_string())?, + inputs: inputs.to_vec(), + })) + } + + fn new_node(&self, config: &dyn NodeConfig) -> Box { + match config.as_any().downcast_ref::() { + Some(cc) => Box::new(Jq::try_from(cc).unwrap()), + None => panic!("incompatible NodeConfig"), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use serde_json::json; + + #[test] + fn filter_sanity() { + let jq = Jq::new("{ a: $a, b: $b }", vec!["a".to_string(), "b".to_string()]); + + let Ok(jq) = jq else { + panic!("jq error"); + }; + + let a = Payload::Json(json!({ + "foo": "bar", + "arr": [1, 2, 3], + })); + + let b = Payload::Json(json!("some text")); + + let inputs = vec![Some(&a), Some(&b)]; + + let res = jq.exec(inputs.as_slice()); + + let Ok(results) = res else { + panic!("unexpected jq error"); + }; + + assert_eq!( + results, + vec![json!({ + "a": { + "foo": "bar", + "arr": [1, 2, 3] + }, + "b": "some text" + })] + ); + } + + #[test] + fn invalid_filter_text() { + let jq = Jq::new("nope!", Vec::new()); + + let Err(e) = jq else { + panic!("expected invalid filter to result in an error"); + }; + + assert_eq!("invalid filter", e.to_string()); + } + + #[test] + fn empty_filter() { + let jq = Jq::new("", vec![]); + + let Err(e) = jq else { + panic!("expected invalid filter to result in an error"); + }; + + assert_eq!("invalid filter", e.to_string()); + } + + #[test] + fn filter_errors() { + let jq = Jq::new("error(\"woops\")", vec![]).unwrap(); + + let res = jq.exec(&[]); + let Err(errs) = res else { + panic!("expected a failure"); + }; + + assert_eq!(errs.into_inner(), vec!["woops"]); + } + + #[test] + fn invalid_number_of_inputs() { + let jq = Jq::new("$foo", vec!["foo".to_string()]).unwrap(); + + let res = jq.exec(&[]); + let Err(errs) = res else { + panic!("expected a failure"); + }; + + assert_eq!( + errs.into_inner(), + vec!["invalid number of inputs, expected: 1, got: 0"] + ); + } +} diff --git a/datakit-filter/src/nodes/response.rs b/datakit-filter/src/nodes/response.rs index a30a04b..1745f66 100644 --- a/datakit-filter/src/nodes/response.rs +++ b/datakit-filter/src/nodes/response.rs @@ -60,7 +60,7 @@ impl NodeFactory for ResponseFactory { bt: &BTreeMap, ) -> Result, String> { Ok(Box::new(ResponseConfig { - status: get_config_value(bt, "status", 200), + status: get_config_value(bt, "status").unwrap_or(200), })) } diff --git a/datakit-filter/src/nodes/template.rs b/datakit-filter/src/nodes/template.rs index ce832f2..bd63263 100644 --- a/datakit-filter/src/nodes/template.rs +++ b/datakit-filter/src/nodes/template.rs @@ -107,8 +107,9 @@ impl NodeFactory for TemplateFactory { ) -> Result, String> { Ok(Box::new(TemplateConfig { inputs: inputs.to_vec(), - template: get_config_value(bt, "template", String::from("")), - content_type: get_config_value(bt, "content_type", String::from("application/json")), + template: get_config_value(bt, "template").unwrap_or_else(|| String::from("")), + content_type: get_config_value(bt, "content_type") + .unwrap_or_else(|| String::from("application/json")), })) }