diff --git a/Cargo.lock b/Cargo.lock index f29f17b0047..78aa333c8d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1805,6 +1805,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "idna" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "idna" version = "0.5.0" @@ -1815,6 +1826,12 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "if_chain" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb56e1aa765b4b4f3aadfab769793b7087bb03a4ea4920644a6d238e2df5b9ed" + [[package]] name = "indexmap" version = "2.2.1" @@ -2034,6 +2051,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matches" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" + [[package]] name = "matchit" version = "0.7.3" @@ -2266,6 +2289,40 @@ dependencies = [ "rand", ] +[[package]] +name = "netdata-collector" +version = "1.1.1" +dependencies = [ + "anyhow", + "netdata_collector", + "tedge_actors", + "tedge_api", + "tedge_config", + "tedge_mqtt_ext", + "tokio", +] + +[[package]] +name = "netdata-plugin" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4ea74900614650952d0e117d21eb06735420cd88bb99607318aca5d49483605" +dependencies = [ + "thiserror", + "validator", +] + +[[package]] +name = "netdata_collector" +version = "1.1.1" +dependencies = [ + "async-trait", + "netdata-plugin", + "tedge_actors", + "tedge_api", + "time", +] + [[package]] name = "nix" version = "0.26.4" @@ -4769,7 +4826,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", - "idna", + "idna 0.5.0", "percent-encoding", ] @@ -4805,6 +4862,49 @@ dependencies = [ "log", ] +[[package]] +name = "validator" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0f08911ab0fee2c5009580f04615fa868898ee57de10692a45da0c3bcc3e5e" +dependencies = [ + "idna 0.2.3", + "lazy_static", + "regex", + "serde", + "serde_derive", + "serde_json", + "url", + "validator_derive", + "validator_types", +] + +[[package]] +name = "validator_derive" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d85135714dba11a1bd0b3eb1744169266f1a38977bf4e3ff5e2e1acb8c2b7eee" +dependencies = [ + "if_chain", + "lazy_static", + "proc-macro-error", + "proc-macro2", + "quote", + "regex", + "syn 1.0.109", + "validator_types", +] + +[[package]] +name = "validator_types" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ded9d97e1d42327632f5f3bae6403c04886e2de3036261ef42deebd931a6a291" +dependencies = [ + "proc-macro2", + "syn 1.0.109", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 309652301b8..d23c85e8543 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "crates/tests/*", "plugins/c8y_firmware_plugin", "plugins/c8y_remote_access_plugin", + "plugins/netdata-collector", "plugins/tedge_apt_plugin", ] resolver = "2" diff --git a/crates/extensions/netdata_collector/Cargo.toml b/crates/extensions/netdata_collector/Cargo.toml new file mode 100644 index 00000000000..2399c023124 --- /dev/null +++ b/crates/extensions/netdata_collector/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "netdata_collector" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = { workspace = true } +netdata-plugin = "0.2.0" +tedge_actors = { workspace = true } +tedge_api = { workspace = true } +time = { workspace = true } + +[lints] +workspace = true diff --git a/crates/extensions/netdata_collector/src/actor.rs b/crates/extensions/netdata_collector/src/actor.rs new file mode 100644 index 00000000000..7edbabd94df --- /dev/null +++ b/crates/extensions/netdata_collector/src/actor.rs @@ -0,0 +1,88 @@ +use crate::message::MetricPoints; +use crate::TEdgeNetDataCollectorBuilder; +use netdata_plugin::collector::Collector; +use netdata_plugin::Chart; +use netdata_plugin::Dimension; +use std::collections::HashMap; +use std::collections::HashSet; +use tedge_actors::Actor; +use tedge_actors::LoggingReceiver; +use tedge_actors::MessageReceiver; +use tedge_actors::RuntimeError; + +pub struct TEdgeNetDataCollector { + pub(crate) input: LoggingReceiver, +} + +impl TEdgeNetDataCollector { + pub fn builder() -> TEdgeNetDataCollectorBuilder { + TEdgeNetDataCollectorBuilder::default() + } +} + +#[async_trait::async_trait] +impl Actor for TEdgeNetDataCollector { + fn name(&self) -> &str { + "NetData" + } + + async fn run(mut self) -> Result<(), RuntimeError> { + let mut writer = std::io::stdout(); + let mut c = Collector::new(&mut writer); + let mut charts = HashMap::new(); + + while let Some(points) = self.input.recv().await { + // Declare any new chart + let updated_charts: HashSet = + points.iter().map(|p| p.chart_id.clone()).collect(); + for chart_id in updated_charts.iter() { + if !charts.contains_key(chart_id) { + let chart = new_chart(chart_id); + c.add_chart(&chart).unwrap(); + charts.insert(chart_id.to_string(), HashSet::new()); + } + } + + // Declare any new dimension + for p in points.iter() { + if let Some(dims) = charts.get_mut(&p.chart_id) { + let dim_id = p.dimension_id.clone(); + if !dims.contains(&dim_id) { + let dim = new_dim(&dim_id); + c.add_dimension(&p.chart_id, &dim).unwrap(); + dims.insert(dim_id); + } + } + } + + // Publish the metrics + for p in points { + c.prepare_value(&p.chart_id, &p.dimension_id, p.value) + .unwrap(); + } + for chart_id in updated_charts { + c.commit_chart(&chart_id).unwrap(); + } + } + + Ok(()) + } +} + +fn new_chart(chart_id: &str) -> Chart { + Chart { + type_id: chart_id, + name: chart_id, + title: chart_id, + units: "units", + ..Default::default() + } +} + +fn new_dim(dim_id: &str) -> Dimension { + Dimension { + id: dim_id, + name: dim_id, + ..Default::default() + } +} diff --git a/crates/extensions/netdata_collector/src/builder.rs b/crates/extensions/netdata_collector/src/builder.rs new file mode 100644 index 00000000000..a8d25df720f --- /dev/null +++ b/crates/extensions/netdata_collector/src/builder.rs @@ -0,0 +1,51 @@ +use crate::MetricPoints; +use crate::TEdgeNetDataCollector; +use std::convert::Infallible; +use tedge_actors::futures::channel::mpsc; +use tedge_actors::Builder; +use tedge_actors::CloneSender; +use tedge_actors::DynSender; +use tedge_actors::LoggingReceiver; +use tedge_actors::MessageSink; +use tedge_actors::RuntimeRequest; +use tedge_actors::RuntimeRequestSink; + +pub struct TEdgeNetDataCollectorBuilder { + input: LoggingReceiver, + input_sender: DynSender, + signal_sender: DynSender, +} + +impl Default for TEdgeNetDataCollectorBuilder { + fn default() -> Self { + let (input_sender, input_receiver) = mpsc::channel(10); + let (signal_sender, signal_receiver) = mpsc::channel(10); + let input = LoggingReceiver::new("NetData".into(), input_receiver, signal_receiver); + + TEdgeNetDataCollectorBuilder { + input, + input_sender: input_sender.into(), + signal_sender: signal_sender.into(), + } + } +} + +impl MessageSink for TEdgeNetDataCollectorBuilder { + fn get_sender(&self) -> DynSender { + self.input_sender.sender_clone() + } +} + +impl RuntimeRequestSink for TEdgeNetDataCollectorBuilder { + fn get_signal_sender(&self) -> DynSender { + self.signal_sender.sender_clone() + } +} + +impl Builder for TEdgeNetDataCollectorBuilder { + type Error = Infallible; + + fn try_build(self) -> Result { + Ok(TEdgeNetDataCollector { input: self.input }) + } +} diff --git a/crates/extensions/netdata_collector/src/lib.rs b/crates/extensions/netdata_collector/src/lib.rs new file mode 100644 index 00000000000..af1ee938b9c --- /dev/null +++ b/crates/extensions/netdata_collector/src/lib.rs @@ -0,0 +1,7 @@ +mod actor; +mod builder; +mod message; + +pub use actor::*; +pub use builder::*; +pub use message::*; diff --git a/crates/extensions/netdata_collector/src/message.rs b/crates/extensions/netdata_collector/src/message.rs new file mode 100644 index 00000000000..5879a919177 --- /dev/null +++ b/crates/extensions/netdata_collector/src/message.rs @@ -0,0 +1,92 @@ +use std::convert::Infallible; +use std::vec; +use tedge_api::measurement::parse_str; +use tedge_api::measurement::MeasurementVisitor; +use tedge_api::measurement::ThinEdgeJsonParserError; + +#[derive(Debug)] +pub struct MetricPoint { + pub chart_id: String, + pub dimension_id: String, + pub value: i64, +} + +#[derive(Debug)] +pub struct MetricPoints { + points: Vec, +} + +impl IntoIterator for MetricPoints { + type Item = MetricPoint; + type IntoIter = vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.points.into_iter() + } +} + +impl MetricPoints { + pub fn parse( + device: &str, + measurement_type: &str, + thin_edge_json: &str, + ) -> Result { + let mut builder = MetricPointsBuilder::new(device, measurement_type); + parse_str(thin_edge_json, &mut builder)?; + Ok(MetricPoints { + points: builder.points, + }) + } + + pub fn iter(&self) -> impl Iterator { + self.points.iter() + } +} + +struct MetricPointsBuilder { + chart_id: String, + points: Vec, +} + +impl MetricPointsBuilder { + fn new(device: &str, measurement_type: &str) -> Self { + let chart_id = format!("tedge.{device}/{measurement_type}"); + MetricPointsBuilder { + chart_id, + points: vec![], + } + } +} + +impl MeasurementVisitor for MetricPointsBuilder { + type Error = Infallible; + + fn visit_timestamp(&mut self, _value: time::OffsetDateTime) -> Result<(), Self::Error> { + // ignored: time is managed by netdata + Ok(()) + } + + fn visit_measurement(&mut self, name: &str, value: f64) -> Result<(), Self::Error> { + self.points.push(MetricPoint { + chart_id: self.chart_id.clone(), + dimension_id: name.to_string(), + value: value.round() as i64, + }); + Ok(()) + } + + fn visit_text_property(&mut self, _name: &str, _value: &str) -> Result<(), Self::Error> { + // ignored: netdata only accepts number + Ok(()) + } + + fn visit_start_group(&mut self, _group: &str) -> Result<(), Self::Error> { + // ignored for now + Ok(()) + } + + fn visit_end_group(&mut self) -> Result<(), Self::Error> { + // ignored for now + Ok(()) + } +} diff --git a/plugins/netdata-collector/Cargo.toml b/plugins/netdata-collector/Cargo.toml new file mode 100644 index 00000000000..6c05730fa8b --- /dev/null +++ b/plugins/netdata-collector/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "netdata-collector" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = { workspace = true } +netdata_collector = { path = "../../crates/extensions/netdata_collector" } +tedge_actors = { workspace = true } +tedge_api = { workspace = true } +tedge_config = { workspace = true } +tedge_mqtt_ext = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread"] } + +[lints] +workspace = true diff --git a/plugins/netdata-collector/src/main.rs b/plugins/netdata-collector/src/main.rs new file mode 100644 index 00000000000..745f231e070 --- /dev/null +++ b/plugins/netdata-collector/src/main.rs @@ -0,0 +1,58 @@ +use netdata_collector::MetricPoints; +use netdata_collector::TEdgeNetDataCollector; +use tedge_actors::MessageSink; +use tedge_actors::Runtime; +use tedge_api::mqtt_topics::Channel; +use tedge_api::mqtt_topics::ChannelFilter; +use tedge_api::mqtt_topics::EntityFilter; +use tedge_api::mqtt_topics::MqttSchema; +use tedge_config::TEdgeConfig; +use tedge_config::TEdgeConfigLocation; +use tedge_mqtt_ext::MqttActorBuilder; +use tedge_mqtt_ext::MqttMessage; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let config = TEdgeConfig::try_new(TEdgeConfigLocation::default())?; + let mqtt_config = config.mqtt_config()?; + let mqtt_schema = MqttSchema::with_root(config.mqtt.topic_root.clone()); + + let mut runtime = Runtime::new(); + let mut mqtt = MqttActorBuilder::new(mqtt_config.with_session_name("tedge-netdata-collect")); + let netdata = TEdgeNetDataCollector::builder(); + + let measurements = mqtt_schema.topics(EntityFilter::AnyEntity, ChannelFilter::Measurement); + netdata.connect_mapped_source(measurements, &mut mqtt, move |msg| { + extract_metrics(&mqtt_schema, msg) + }); + + runtime.spawn(mqtt).await?; + runtime.spawn(netdata).await?; + runtime.run_to_completion().await?; + + Ok(()) +} + +fn extract_metrics( + schema: &MqttSchema, + message: MqttMessage, +) -> impl Iterator { + let Ok((entity, Channel::Measurement { measurement_type })) = + schema.entity_channel_of(&message.topic) + else { + return None.into_iter(); + }; + + let Ok(thin_edge_json) = message.payload_str() else { + return None.into_iter(); + }; + + let device = entity + .default_device_name() + .unwrap_or_else(|| entity.as_str()); + let Ok(points) = MetricPoints::parse(device, &measurement_type, thin_edge_json) else { + return None.into_iter(); + }; + + Some(points).into_iter() +}