Eventstream message pipeline service


Eventstream is a pipeline service for storing and re-sending events inside a system.

go get -v -u github.com/geniusrabbit/eventstream/cmd/eventstream

Run the eventstream service in Docker

docker run -d -it --rm -v "$(pwd)/custom.config.hcl:/config.hcl" \
  geniusrabbit/eventstream
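The config file resolves its connection strings from environment variables (the {{@env:...}} placeholders in the example below), so a fuller invocation passes them into the container. A minimal sketch: the variable names match the config example, but the connection URL formats are assumptions, not the service's documented schemes.

docker run -d -it --rm \
  -v "$(pwd)/custom.config.hcl:/config.hcl" \
  -e CLICKHOUSE_STORE_CONNECT="clickhouse://clickhouse:9000/stat" \
  -e KAFKA_EVENTS_CONNECT="kafka://kafka:9092?topics=events" \
  -e NATS_SOURCE_CONNECT="nats://nats:4222?topics=events" \
  geniusrabbit/eventstream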

Source list

  • Kafka
  • NATS & NATS stream
  • Redis stream

Storage list

  • ClickHouse
  • Vertica
  • Kafka
  • NATS
  • Redis stream

Config example

Two config file formats are supported: YAML and HCL.

stores {
  clickhouse_1 {
    connect = "{{@env:CLICKHOUSE_STORE_CONNECT}}"
    buffer = 1000
    init_query = [<<Q
      CREATE TABLE IF NOT EXISTS stat.testlog (
         timestamp        DateTime
       , datemark         Date default toDate(timestamp)
       , service          String
       , msg              String
       , error            String
       , created_at       DateTime default now()
      ) Engine=Memory COMMENT 'The test table';
    Q]
  }
  kafka_1 {
    connect = "{{@env:KAFKA_EVENTS_CONNECT}}"
  }
}

// A source can be any supported stream service: Kafka, NATS, etc.
sources {
  nats_1 {
    connect = "{{@env:NATS_SOURCE_CONNECT}}"
    format  = "json"
  }
}

// Streams are pipelines that connect a source to a destination store
streams {
  log_1 {
    store  = "clickhouse_1"
    source = "nats_1"
    target = "testlog"
    // Optional if the field names in the log and in the message are the same
    // Transforms into:
    //   INSERT INTO testlog (service, msg, error, timestamp) VALUES($srv, $msg, $err, @toDateTime($timestamp))
    fields = "service=srv,msg,error=err,timestamp=@toDateTime({{timestamp:date}})"
    where  = "srv == \"main\""
    metrics = [
      {
        name = "log.counter"
        type = "counter"
        tags {
          server  = "{{srv}}"
        }
      }
    ]
  }
  kafka_retranslate {
    store  = "kafka_1"
    source = "nats_1"
    targets = [
      {
        fields = {
          server = "{{srv}}"
          timestamp = "{{timestamp}}"
        }
        where = "type = \"statistic\""
      }
    ]
    where = "srv = \"events\""
  }
}
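For comparison, the same stores/sources/streams shape in YAML. This is a minimal sketch assuming the YAML keys mirror the HCL names one-to-one; check the repository's own YAML examples for the authoritative layout.

stores:
  clickhouse_1:
    connect: "{{@env:CLICKHOUSE_STORE_CONNECT}}"
    buffer: 1000

sources:
  nats_1:
    connect: "{{@env:NATS_SOURCE_CONNECT}}"
    format: json

streams:
  log_1:
    store: clickhouse_1
    source: nats_1
    target: testlog
    fields: "service=srv,msg,error=err,timestamp=@toDateTime({{timestamp:date}})"
    where: 'srv == "main"'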

Metrics

Metrics help analyze events during processing and monitor stream state. Every stream can report metrics declared with the metrics keyword.

Example:

metrics = [
  {
    name = "log.counter"
    type = "counter"
    tags { server = "{{srv}}" }
  },
  {
    name = "actions.counter"
    type = "counter"
    tags { action = "{{action}}" }
  },
  {...}
]

All metrics are exposed at the /metrics URL using the Prometheus protocol. To activate them, define the profile connection port:

SERVER_PROFILE_MODE=net
SERVER_PROFILE_LISTEN=:6060
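With the profile listener on :6060, Prometheus (or plain curl) can scrape the endpoint directly; localhost here is just an assumption about where the service runs:

curl "http://localhost:6060/metrics"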

Health check

curl "http://hostname:port/health-check"
{"status":"OK"}

TODO

  • Add processing of custom error metrics
  • Add MySQL database storage
  • Add PostgreSQL database storage
  • Add MongoDB database storage
  • Add HTTP/Ping driver storage
  • Add Redis database storage
  • Prepare eventstream as a framework extension
  • Add Kafka stream writer support
  • Add NATS stream writer support
  • Add Redis stream source/storage support
  • Add RabbitMQ stream source/storage support
  • Add health check API
  • Add customizable Prometheus metrics
  • Add 'where' stream condition (https://github.com/Knetic/govaluate)
  • Ack messages only on success
  • Buffer all data until it is stored
  • Add HCL config support