feat: Introduce Redis Streams sink (txpipe#253)
Sbcdn authored and kodemill committed Jun 24, 2022
1 parent f99d346 commit 2dfa385
Showing 9 changed files with 226 additions and 11 deletions.
79 changes: 70 additions & 9 deletions Cargo.lock

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -56,6 +56,9 @@ tokio = { version = "1.18.2", optional = true, features = ["rt"] }
# required for CI to complete successfully
openssl = { version = "0.10", optional = true, features = ["vendored"] }

# redis support
redis = { version = "0.21.5", optional = true, features = ["tokio-comp"] }

# features: gcp
cloud-pubsub = { version = "0.8.0", optional = true }
async-recursion = { version = "1.0.0", optional = true }
@@ -68,4 +71,5 @@ kafkasink = ["kafka", "openssl"]
elasticsink = ["elasticsearch", "tokio"]
fingerprint = ["murmur3"]
aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
redissink = ["redis", "tokio"]
gcp = ["cloud-pubsub", "tokio", "async-recursion"]
2 changes: 1 addition & 1 deletion README.md
@@ -75,7 +75,7 @@ Oura is in its essence just a pipeline for processing events. Each stage of the
- [x] Kafka topic
- [x] Elasticsearch index / data stream
- [x] Rotating log files with compression
- [ ] Redis streams
- [x] Redis streams
- [x] AWS SQS queue
- [x] AWS Lambda call
- [x] AWS S3 objects
35 changes: 35 additions & 0 deletions book/src/sinks/redis_streams.md
@@ -0,0 +1,35 @@
# Redis Streams

A sink that implements a _Redis Streams_ producer. The sink supports different stream strategies.

Events can be sent either to a single stream or to multiple streams, one per event type. In both modes, Redis assigns each entry the standard `<millisecondsTime>-<sequenceNumber>` unique ID. With stream strategy `None`, a single Redis stream is used for all events; its name can be set via `stream_name` and defaults to `oura`. Stream strategy `ByEventType` writes each event type to its own Redis stream, named after the event type in lowercase. By applying filters it is possible to control which streams are created.

If the fingerprint filter is active, the sink uses each event's fingerprint as the entry key; otherwise, the lowercased event type name is used.

## Configuration

_Single Stream Mode:_

```toml
[sink]
type = "Redis"
redis_server = "redis://default:@127.0.0.1:6379/0"
stream_name = "mystream"
stream_strategy = "None"
```

_Multi Stream Mode:_

```toml
[sink]
type = "Redis"
redis_server = "redis://default:@127.0.0.1:6379/0"
stream_strategy = "ByEventType"
```

- `type`: the literal value `Redis`.
- `redis_server`: the Redis server, in the format `redis://[<username>][:<password>]@<hostname>[:port][/<db>]`.
- `stream_name`: the name of the Redis stream used with stream strategy `None`; defaults to `oura` if not specified.
- `stream_strategy`: either `None` or `ByEventType`.
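
To check the sink end to end, entries can be read back with Redis' standard `XREAD` command. Below is a minimal consumer sketch (not part of this commit), assuming the single-stream defaults above (server at `127.0.0.1:6379`, stream `oura`) and using the same `redis` crate the sink depends on:

```rust
use redis::Value;

fn main() -> redis::RedisResult<()> {
    let client = redis::Client::open("redis://default:@127.0.0.1:6379/0")?;
    let mut conn = client.get_connection()?;

    // Block for up to 5 seconds waiting for entries on the `oura` stream,
    // starting from `$` (only entries added after this call).
    let reply: Value = redis::cmd("XREAD")
        .arg("BLOCK")
        .arg(5000)
        .arg("STREAMS")
        .arg("oura")
        .arg("$")
        .query(&mut conn)?;

    // Each entry carries its auto-generated `<millisecondsTime>-<sequenceNumber>`
    // id plus one field: the event key (fingerprint or lowercased event type)
    // mapped to the JSON-encoded event.
    println!("{:?}", reply);
    Ok(())
}
```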
12 changes: 12 additions & 0 deletions src/bin/oura/daemon.rs
@@ -44,6 +44,9 @@ use oura::sinks::aws_lambda::Config as AwsLambdaConfig;
#[cfg(feature = "aws")]
use oura::sinks::aws_s3::Config as AwsS3Config;

#[cfg(feature = "redissink")]
use oura::sinks::redis::Config as RedisConfig;

#[cfg(feature = "gcp")]
use oura::sinks::gcp_pubsub::Config as GcpPubSubConfig;

@@ -121,8 +124,13 @@ enum Sink {
#[cfg(feature = "aws")]
AwsS3(AwsS3Config),

#[cfg(feature = "redissink")]
Redis(RedisConfig),

#[cfg(feature = "gcp")]
GcpPubSub(GcpPubSubConfig),
}

fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> BootstrapResult {
@@ -152,8 +160,12 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc<Utils>) -> BootstrapResult {
#[cfg(feature = "aws")]
Sink::AwsS3(c) => WithUtils::new(c, utils).bootstrap(input),

#[cfg(feature = "redissink")]
Sink::Redis(c) => WithUtils::new(c, utils).bootstrap(input),

#[cfg(feature = "gcp")]
Sink::GcpPubSub(c) => WithUtils::new(c, utils).bootstrap(input),
}
}

5 changes: 4 additions & 1 deletion src/sinks/mod.rs
@@ -27,5 +27,8 @@ pub mod aws_lambda;
#[cfg(feature = "aws")]
pub mod aws_s3;

#[cfg(feature = "redissink")]
pub mod redis;

#[cfg(feature = "gcp")]
pub mod gcp_pubsub;
3 changes: 3 additions & 0 deletions src/sinks/redis/mod.rs
@@ -0,0 +1,3 @@
mod run;
mod setup;
pub use setup::*;
55 changes: 55 additions & 0 deletions src/sinks/redis/run.rs
@@ -0,0 +1,55 @@
use std::sync::Arc;

use serde::Serialize;
use serde_json::json;

use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error};

use super::StreamStrategy;

/// A single entry to be written to a Redis stream.
#[derive(Serialize)]
pub struct RedisRecord {
    pub event: Event,
    pub key: String,
}

impl From<Event> for RedisRecord {
    fn from(event: Event) -> Self {
        let key = key(&event);
        RedisRecord { event, key }
    }
}

/// Uses the event fingerprint as the entry key when the fingerprint filter
/// is active; falls back to the lowercased event type name otherwise.
fn key(event: &Event) -> String {
    if let Some(fingerprint) = &event.fingerprint {
        fingerprint.clone()
    } else {
        event.data.clone().to_string().to_lowercase()
    }
}

pub fn producer_loop(
    input: StageReceiver,
    utils: Arc<Utils>,
    conn: &mut redis::Connection,
    stream_strategy: StreamStrategy,
    redis_stream: String,
) -> Result<(), Error> {
    for event in input.iter() {
        utils.track_sink_progress(&event);

        let payload = RedisRecord::from(event);

        // Pick the target stream: one stream per event type, or the single
        // configured stream for every event.
        let stream = match stream_strategy {
            StreamStrategy::ByEventType => payload.event.data.clone().to_string().to_lowercase(),
            _ => redis_stream.clone(),
        };

        log::debug!(
            "Stream: {:?}, Key: {:?}, Event: {:?}",
            stream,
            payload.key,
            payload.event
        );

        // XADD <stream> * <key> <json-event>; the `*` tells Redis to assign
        // the `<millisecondsTime>-<sequenceNumber>` entry id itself.
        let _: () = redis::cmd("XADD")
            .arg(stream)
            .arg("*")
            .arg(&[(payload.key, json!(payload.event).to_string())])
            .query(conn)?;
    }

    Ok(())
}
42 changes: 42 additions & 0 deletions src/sinks/redis/setup.rs
@@ -0,0 +1,42 @@
use redis::Client;
use serde::Deserialize;

use crate::{
    pipelining::{BootstrapResult, SinkProvider, StageReceiver},
    utils::WithUtils,
};

use super::run::*;

#[derive(Debug, Clone, Deserialize)]
pub enum StreamStrategy {
    ByEventType,
    None,
}

#[derive(Debug, Deserialize)]
pub struct Config {
    pub redis_server: String,
    pub stream_strategy: Option<StreamStrategy>,
    pub stream_name: Option<String>,
}

impl SinkProvider for WithUtils<Config> {
    fn bootstrap(&self, input: StageReceiver) -> BootstrapResult {
        let client = Client::open(self.inner.redis_server.clone())?;
        let mut connection = client.get_connection()?;
        log::debug!("Connected to Redis Database!");

        // Fall back to the single-stream strategy when none is configured.
        let stream_strategy = self
            .inner
            .stream_strategy
            .clone()
            .unwrap_or(StreamStrategy::None);

        // Default stream name for the single-stream strategy.
        let redis_stream = self
            .inner
            .stream_name
            .clone()
            .unwrap_or_else(|| "oura".to_string());

        let utils = self.utils.clone();
        let handle = std::thread::spawn(move || {
            producer_loop(input, utils, &mut connection, stream_strategy, redis_stream)
                .expect("redis sink loop failed");
        });

        Ok(handle)
    }
}
