diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index ad78cba9..5d2876cf 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -36,4 +36,4 @@ - [Intersect Options](./advanced/intersect_options.md) - [Guides](./guides/README.md) - [Cardano => Kafka](./guides/cardano_2_kafka.md) - - [Custom networks](./guides/connecting_to_custom_networks.md) \ No newline at end of file + - [Custom networks](./guides/connecting_to_custom_networks.md) diff --git a/book/src/guides/connecting_to_custom_networks.md b/book/src/guides/connecting_to_custom_networks.md index 55061371..40b209ed 100644 --- a/book/src/guides/connecting_to_custom_networks.md +++ b/book/src/guides/connecting_to_custom_networks.md @@ -27,17 +27,17 @@ adahandle_policy = String Some details on the cofigurataion: -| Name | DataType | Description | -| :--- | :--- | :--- | -| byron_epoch_length | u32 | .... | -| byron_slot_length | u32 | .... | -| byron_known_slot | u32 | .... | -| byron_known_hash | String | .... | -| byron_known_time | u64 | .... | -| shelley_epoch_length | u32 | .... | -| shelley_slot_length | u32 | .... | -| shelley_known_slot | u32 | .... | -| shelley_known_hash | String | .... | -| shelley_known_time | u64 | .... | -| address_hrp | String | .... | -| adahandle_policy | String | Minting policy of AdaHandle on the network. | \ No newline at end of file +| Name | DataType | Description | +| :------------------- | :------- | :------------------------------------------ | +| byron_epoch_length | u32 | .... | +| byron_slot_length | u32 | .... | +| byron_known_slot | u32 | .... | +| byron_known_hash | String | .... | +| byron_known_time | u64 | .... | +| shelley_epoch_length | u32 | .... | +| shelley_slot_length | u32 | .... | +| shelley_known_slot | u32 | .... | +| shelley_known_hash | String | .... | +| shelley_known_time | u64 | .... | +| address_hrp | String | .... | +| adahandle_policy | String | Minting policy of AdaHandle on the network. | diff --git a/book/src/sinks/redis_streams.md b/book/src/sinks/redis_streams.md index 3e54c5f2..163d1825 100644 --- a/book/src/sinks/redis_streams.md +++ b/book/src/sinks/redis_streams.md @@ -32,4 +32,4 @@ stream_strategy = "ByEventType" - `type`: the literal value `Redis`. - `redis_server`: the redis server in the format `redis://[][:]@[:port][/]` - `stream_name` : the name of the redis stream for StreamStrategy `None`, default is "oura" if not specified -- `stream_strategy` : `None` or `ByEventType` \ No newline at end of file +- `stream_strategy` : `None` or `ByEventType` diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs index 53c96215..06776c15 100644 --- a/src/bin/oura/daemon.rs +++ b/src/bin/oura/daemon.rs @@ -124,13 +124,11 @@ enum Sink { #[cfg(feature = "aws")] AwsS3(AwsS3Config), - #[cfg(feature = "redissink")] Redis(RedisConfig), - + #[cfg(feature = "gcp")] GcpPubSub(GcpPubSubConfig), - } fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc) -> BootstrapResult { @@ -165,7 +163,6 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc) -> Boot #[cfg(feature = "gcp")] Sink::GcpPubSub(c) => WithUtils::new(c, utils).bootstrap(input), - } } diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 45aa6aed..8741c07b 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -31,4 +31,4 @@ pub mod aws_s3; pub mod redis; #[cfg(feature = "gcp")] -pub mod gcp_pubsub; \ No newline at end of file +pub mod gcp_pubsub; diff --git a/src/sinks/redis/mod.rs b/src/sinks/redis/mod.rs index fcae13f3..28290f4e 100644 --- a/src/sinks/redis/mod.rs +++ b/src/sinks/redis/mod.rs @@ -1,3 +1,3 @@ mod run; mod setup; -pub use setup::*; \ No newline at end of file +pub use setup::*; diff --git a/src/sinks/redis/run.rs b/src/sinks/redis/run.rs index 63d26f1c..782fbc70 100644 --- a/src/sinks/redis/run.rs +++ b/src/sinks/redis/run.rs @@ -1,27 +1,24 @@ #![allow(unused_variables)] -use std::sync::Arc; +use super::StreamStrategy; +use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error}; use serde::Serialize; -use serde_json::{json}; -use crate::{pipelining::StageReceiver, utils::Utils, Error, model::Event}; -use super::{StreamStrategy}; +use serde_json::json; +use std::sync::Arc; #[derive(Serialize)] pub struct RedisRecord { - pub event: Event, + pub event: Event, pub key: String, } impl From for RedisRecord { fn from(event: Event) -> Self { let key = key(&event); - RedisRecord { - event, - key, - } + RedisRecord { event, key } } } -fn key(event : &Event) -> String { +fn key(event: &Event) -> String { if let Some(fingerprint) = &event.fingerprint { fingerprint.clone() } else { @@ -30,26 +27,34 @@ fn key(event : &Event) -> String { } pub fn producer_loop( - input : StageReceiver, - utils : Arc, - conn : &mut redis::Connection, - stream_strategy : StreamStrategy, - redis_stream : String, + input: StageReceiver, + utils: Arc, + conn: &mut redis::Connection, + stream_strategy: StreamStrategy, + redis_stream: String, ) -> Result<(), Error> { for event in input.iter() { utils.track_sink_progress(&event); let payload = RedisRecord::from(event); - let stream : String; - match stream_strategy { - StreamStrategy::ByEventType => { - stream = payload.event.data.clone().to_string().to_lowercase(); - } - _ => { - stream = redis_stream.clone(); - } - } - log::debug!("Stream: {:?}, Key: {:?}, Event: {:?}", stream, payload.key, payload.event); - let _ : () = redis::cmd("XADD").arg(stream).arg("*").arg(&[(payload.key,json!(payload.event).to_string())]).query(conn)?; + + let stream = match stream_strategy { + StreamStrategy::ByEventType => payload.event.data.clone().to_string().to_lowercase(), + _ => redis_stream.clone(), + }; + + log::debug!( + "Stream: {:?}, Key: {:?}, Event: {:?}", + stream, + payload.key, + payload.event + ); + + let _: () = redis::cmd("XADD") + .arg(stream) + .arg("*") + .arg(&[(payload.key, json!(payload.event).to_string())]) + .query(conn)?; } + Ok(()) -} \ No newline at end of file +} diff --git a/src/sinks/redis/setup.rs b/src/sinks/redis/setup.rs index c41d30d3..671c85dd 100644 --- a/src/sinks/redis/setup.rs +++ b/src/sinks/redis/setup.rs @@ -1,4 +1,4 @@ -use redis::{Client}; +use redis::Client; use serde::Deserialize; use crate::{ @@ -11,14 +11,14 @@ use super::run::*; #[derive(Debug, Clone, Deserialize)] pub enum StreamStrategy { ByEventType, - None -} + None, +} #[derive(Debug, Deserialize)] pub struct Config { - pub redis_server : String, - pub stream_strategy : Option, - pub stream_name : Option, + pub redis_server: String, + pub stream_strategy: Option, + pub stream_name: Option, } impl SinkProvider for WithUtils { @@ -26,17 +26,24 @@ impl SinkProvider for WithUtils { let client = Client::open(self.inner.redis_server.clone())?; let mut connection = client.get_connection()?; log::debug!("Connected to Redis Database!"); + let stream_strategy = match self.inner.stream_strategy.clone() { - Some(strategy) => { - strategy - }, - _ => StreamStrategy::None + Some(strategy) => strategy, + _ => StreamStrategy::None, }; - let redis_stream = self.inner.stream_name.clone().unwrap_or("oura".to_string()); + + let redis_stream = self + .inner + .stream_name + .clone() + .unwrap_or_else(|| "oura".to_string()); + let utils = self.utils.clone(); let handle = std::thread::spawn(move || { - producer_loop(input, utils, &mut connection, stream_strategy, redis_stream).expect("redis sink loop failed"); + producer_loop(input, utils, &mut connection, stream_strategy, redis_stream) + .expect("redis sink loop failed"); }); + Ok(handle) } }