Skip to content

Commit

Permalink
suricata: consume from multiple queues
Browse files Browse the repository at this point in the history
  • Loading branch information
markuskont committed Mar 25, 2023
1 parent aa9a021 commit 000a77d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 59 deletions.
121 changes: 64 additions & 57 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -147,61 +148,67 @@ func run(cmd *cobra.Command, args []string) {
}

if viper.GetBool("suricata.enabled") {
shardsSuricata, err := processing.NewDataMapShards(
poolCtx,
viper.GetInt("workers.suricata.correlate"),
"suricata events",
)
if err != nil {
log.Fatal(err)
keys := viper.GetStringSlice("suricata.redis.key.input")
if len(keys) == 0 {
log.Fatal("Suricata redis key missing")
}
defer shardsSuricata.Close()

if err := processing.CorrelateSuricataEvents(processing.SuricataCorrelateConfig{
ConfigStreamWorkers: processing.ConfigStreamWorkers{
Name: "correlate suricata",
Workers: viper.GetInt("workers.suricata.correlate"),
Pool: pool,
Ctx: poolCtx,
Logger: log,
},
InputEventShards: shardsSuricata,
CorrelatedEventShards: shardsCorrelations,
Output: processing.ConfigStreamRedis{
Client: redis.NewClient(&redis.Options{
Addr: viper.GetString("suricata.redis.host"),
DB: viper.GetInt("suricata.redis.db"),
Password: viper.GetString("suricata.redis.password"),
}),
Key: viper.GetString("suricata.redis.key.output"),
},
}); err != nil {
log.Fatal(err)
}

balancerSuricata, err := shardsSuricata.Handler("community_id")
if err != nil {
log.Fatal(err)
}
if err := processing.ConsumeRedis(processing.ConfigConsumeRedis{
ConfigStreamRedis: processing.ConfigStreamRedis{
Client: redis.NewClient(&redis.Options{
Addr: viper.GetString("suricata.redis.host"),
DB: viper.GetInt("suricata.redis.db"),
Password: viper.GetString("suricata.redis.password"),
}),
Key: viper.GetString("suricata.redis.key.input"),
},
Handler: balancerSuricata,
ConfigStreamWorkers: processing.ConfigStreamWorkers{
Name: "consume suricata",
Workers: viper.GetInt("workers.suricata.consume"),
Pool: pool,
Ctx: poolCtx,
Logger: log,
},
}); err != nil {
log.Fatal(err)
for _, keyInput := range keys {
shardsSuricata, err := processing.NewDataMapShards(
poolCtx,
viper.GetInt("workers.suricata.correlate"),
fmt.Sprintf("suricata events - %s", keyInput),
)
if err != nil {
log.Fatal(err)
}
defer shardsSuricata.Close()

if err := processing.CorrelateSuricataEvents(processing.SuricataCorrelateConfig{
ConfigStreamWorkers: processing.ConfigStreamWorkers{
Name: "correlate suricata",
Workers: viper.GetInt("workers.suricata.correlate"),
Pool: pool,
Ctx: poolCtx,
Logger: log,
},
InputEventShards: shardsSuricata,
CorrelatedEventShards: shardsCorrelations,
Output: processing.ConfigStreamRedis{
Client: redis.NewClient(&redis.Options{
Addr: viper.GetString("suricata.redis.host"),
DB: viper.GetInt("suricata.redis.db"),
Password: viper.GetString("suricata.redis.password"),
}),
Key: keyInput + "_" + viper.GetString("suricata.redis.key.output_suffix"),
},
}); err != nil {
log.Fatal(err)
}

balancerSuricata, err := shardsSuricata.Handler("community_id")
if err != nil {
log.Fatal(err)
}
if err := processing.ConsumeRedis(processing.ConfigConsumeRedis{
ConfigStreamRedis: processing.ConfigStreamRedis{
Client: redis.NewClient(&redis.Options{
Addr: viper.GetString("suricata.redis.host"),
DB: viper.GetInt("suricata.redis.db"),
Password: viper.GetString("suricata.redis.password"),
}),
Key: keyInput,
},
Handler: balancerSuricata,
ConfigStreamWorkers: processing.ConfigStreamWorkers{
Name: "consume suricata",
Workers: viper.GetInt("workers.suricata.consume"),
Pool: pool,
Ctx: poolCtx,
Logger: log,
},
}); err != nil {
log.Fatal(err)
}
}
}

Expand Down Expand Up @@ -307,9 +314,9 @@ func init() {
pFlags.String("suricata-redis-password", "", "Password for EVE stream. Empty value disables authentication.")
viper.BindPFlag("suricata.redis.password", pFlags.Lookup("suricata-redis-password"))

pFlags.String("suricata-redis-key-input", "suricata", "Redis key for EVE stream.")
pFlags.StringSlice("suricata-redis-key-input", []string{"suricata"}, "Redis key for EVE stream.")
viper.BindPFlag("suricata.redis.key.input", pFlags.Lookup("suricata-redis-key-input"))

pFlags.String("suricata-redis-key-output", "suricata-edr", "Redis key for EVE stream.")
viper.BindPFlag("suricata.redis.key.output", pFlags.Lookup("suricata-redis-key-output"))
pFlags.String("suricata-redis-key-output-suffix", "edr", "Redis key for EVE stream.")
viper.BindPFlag("suricata.redis.key.output_suffix", pFlags.Lookup("suricata-redis-key-output-suffix"))
}
3 changes: 2 additions & 1 deletion processing/stream_consume_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func ConsumeRedis(c ConfigConsumeRedis) error {

c.Pool.Go(func() error {
lctx := c.Logger.
WithField("worker", worker)
WithField("worker", worker).
WithField("queue", c.Key)

lctx.Info("worker setting up")
loop:
Expand Down
3 changes: 2 additions & 1 deletion processing/stream_suricata.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func CorrelateSuricataEvents(c SuricataCorrelateConfig) error {
c.Pool.Go(func() error {
lctx := c.Logger.
WithField("worker", worker).
WithField("stream", "suricata")
WithField("stream", "suricata").
WithField("queue", c.Output.Key)
lctx.Info("worker setting up")

chEvents := c.InputEventShards.Channels[worker]
Expand Down

0 comments on commit 000a77d

Please sign in to comment.