Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KafkaRest as new output for Falco Sidekick #263

Merged
merged 1 commit into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ It works as a single endpoint for as many as you want `Falco` instances :
- [**AWS SNS**](https://aws.amazon.com/sns/features/)
- [**GCP PubSub**](https://cloud.google.com/pubsub)
- [**Apache Kafka**](https://kafka.apache.org/)
- [**Kafka Rest Proxy**](https://docs.confluent.io/platform/current/kafka-rest/index.html)
- [**RabbitMQ**](https://www.rabbitmq.com/)
- [**Azure Event Hubs**](https://azure.microsoft.com/en-in/services/event-hubs/)

Expand Down Expand Up @@ -350,6 +351,13 @@ kafka:
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)

kafkarest:
address: "" # The full URL to the topic (example "http://kafkarest:8082/topics/test")
#version: 2 # Kafka Rest Proxy API version 2|1 (default: 2)
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
# mutualtls: false # if true, checkcert flag will be ignored (server cert will always be checked)
# checkcert: true # check if ssl certificate of the output is valid (default: true)

pagerduty:
routingKey: "" # Pagerduty Routing Key, if not empty, Pagerduty output is enabled
minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
Expand Down Expand Up @@ -699,6 +707,12 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` :
- **KAFKA_MINIMUMPRIORITY**: minimum priority of event for using this output,
order is
`emergency|alert|critical|error|warning|notice|informational|debug or "" (default)`
- **KAFKAREST_ADDRESS**: The full URL to the topic (example "http://kafkarest:8082/topics/test")
- **KAFKAREST_VERSION**: Kafka Rest Proxy API version 2|1 (default: 2)
- **KAFKAREST_MINIMUMPRIORITY** : minimum priority of event for using this output, order is
`emergency|alert|critical|error|warning|notice|informational|debug or "" (default)`
- **KAFKAREST_MUTUALTLS** : enable mutual tls authentication for this output (default: `false`)
- **KAFKAREST_CHECKCERT** : check if ssl certificate of the output is valid (default: `true`)
- **PAGERDUTY_APIKEY**: Pagerduty API Key, if not empty, Pagerduty output is
_enabled_
- **PAGERDUTY_SERVICE**: Service to create an incident (mandatory)
Expand Down
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ func getConfig() *types.Configuration {
v.SetDefault("Kafka.Topic", "")
v.SetDefault("Kafka.MinimumPriority", "")

v.SetDefault("KafkaRest.Address", "")
v.SetDefault("KafkaRest.Version", 2)
v.SetDefault("KafkaRest.MinimumPriority", "")
v.SetDefault("KafkaRest.MutualTls", false)
v.SetDefault("KafkaRest.CheckCert", true)

v.SetDefault("Pagerduty.RoutingKey", "")
v.SetDefault("Pagerduty.MinimumPriority", "")
v.SetDefault("Pagerduty.MutualTls", false)
Expand Down Expand Up @@ -360,6 +366,7 @@ func getConfig() *types.Configuration {
c.GCP.CloudRun.MinimumPriority = checkPriority(c.GCP.CloudRun.MinimumPriority)
c.Googlechat.MinimumPriority = checkPriority(c.Googlechat.MinimumPriority)
c.Kafka.MinimumPriority = checkPriority(c.Kafka.MinimumPriority)
c.KafkaRest.MinimumPriority = checkPriority(c.KafkaRest.MinimumPriority)
c.Pagerduty.MinimumPriority = checkPriority(c.Pagerduty.MinimumPriority)
c.Kubeless.MinimumPriority = checkPriority(c.Kubeless.MinimumPriority)
c.Openfaas.MinimumPriority = checkPriority(c.Openfaas.MinimumPriority)
Expand Down
7 changes: 7 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ kafka:
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)

kafkarest:
address: "" # The full URL to the topic (example "http://kafkarest:8082/topics/test")
#version: 2 # Kafka Rest Proxy API version 2|1 (default: 2)
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
# mutualtls: false # if true, checkcert flag will be ignored (server cert will always be checked)
# checkcert: true # check if ssl certificate of the output is valid (default: true)

pagerduty:
routingKey: "" # Pagerduty Routing Key, if not empty, Pagerduty output is enabled
minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/nats-io/nats.go v1.10.0
github.com/nats-io/stan.go v0.8.3
github.com/prometheus/client_golang v1.9.0
github.com/segmentio/kafka-go v0.4.10
github.com/segmentio/kafka-go v0.4.17
github.com/spf13/viper v1.7.1
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.7.0
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
Expand Down Expand Up @@ -422,6 +423,7 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
Expand Down Expand Up @@ -528,6 +530,8 @@ github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -580,6 +584,8 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/kafka-go v0.4.10 h1:YnI820ZLfh710adINqwuCVtN3wbnLsLnT/+xhI0oooQ=
github.com/segmentio/kafka-go v0.4.10/go.mod h1:BVDwBTF24avtlj4l8/xsWNb4papVeg16+jO6/0qjvhA=
github.com/segmentio/kafka-go v0.4.17 h1:IyqRstL9KUTDb3kyGPOOa5VffokKWSEzN6geJ92dSDY=
github.com/segmentio/kafka-go v0.4.17/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
Expand Down
4 changes: 4 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ func forwardEvent(falcopayload types.FalcoPayload) {
go kafkaClient.KafkaProduce(falcopayload)
}

if config.KafkaRest.Address != "" && (falcopayload.Priority >= types.Priority(config.KafkaRest.MinimumPriority) || falcopayload.Rule == testRule) {
go kafkaRestClient.KafkaRestPost(falcopayload)
}

if config.Pagerduty.RoutingKey != "" && (falcopayload.Priority >= types.Priority(config.Pagerduty.MinimumPriority) || falcopayload.Rule == testRule) {
go pagerdutyClient.PagerdutyPost(falcopayload)
}
Expand Down
11 changes: 11 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
gcpClient *outputs.Client
googleChatClient *outputs.Client
kafkaClient *outputs.Client
kafkaRestClient *outputs.Client
pagerdutyClient *outputs.Client
gcpCloudRunClient *outputs.Client
kubelessClient *outputs.Client
Expand Down Expand Up @@ -362,6 +363,16 @@ func init() {
}
}

if config.KafkaRest.Address != "" {
var err error
kafkaRestClient, err = outputs.NewClient("KafkaRest", config.KafkaRest.Address, config.KafkaRest.MutualTLS, config.KafkaRest.CheckCert, config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
config.KafkaRest.Address = ""
} else {
outputs.EnabledOutputs = append(outputs.EnabledOutputs, "KafkaRest")
}
}

if config.Pagerduty.RoutingKey != "" {
var err error
var url = "https://events.pagerduty.com/v2/enqueue"
Expand Down
64 changes: 64 additions & 0 deletions outputs/kafkarest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package outputs

import (
"encoding/base64"
"encoding/json"
"fmt"
"log"

"github.com/falcosecurity/falcosidekick/types"
)

// Records are the items inside the request wrapper
type Records struct {
Value string `json:"value"`
}

// KafkaRestPayload is the request wrapper for Kafka Rest
type KafkaRestPayload struct {
Records []Records `json:"records"`
}

// KafkaRestPost posts event the Kafka Rest Proxy
func (c *Client) KafkaRestPost(falcopayload types.FalcoPayload) {
c.Stats.KafkaRest.Add(Total, 1)

var version int
switch c.Config.KafkaRest.Version {
case 2:
version = c.Config.KafkaRest.Version
case 1:
version = c.Config.KafkaRest.Version
default:
version = 2
}
falcoMsg, err := json.Marshal(falcopayload)
if err != nil {
c.Stats.KafkaRest.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "kafkarest", "status": Error}).Inc()
log.Printf("[ERROR] : Kafka Rest - %v - %v\n", "failed to marshalling message", err.Error())
return
}

c.ContentType = fmt.Sprintf("application/vnd.kafka.binary.v%d+json", version)

payload := KafkaRestPayload{
Records: []Records{{
Value: base64.StdEncoding.EncodeToString(falcoMsg),
}},
}

err = c.Post(payload)
if err != nil {
go c.CountMetric(Outputs, 1, []string{"output:kafkarest", "status:error"})
c.Stats.KafkaRest.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "kafkarest", "status": Error}).Inc()
log.Printf("[ERROR] : Kafka Rest - %v\n", err.Error())
return
}

// Setting the success status
go c.CountMetric(Outputs, 1, []string{"output:kafkarest", "status:ok"})
c.Stats.KafkaRest.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "kafkarest", "status": OK}).Inc()
}
1 change: 1 addition & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func getInitStats() *types.Statistics {
GCPCloudRun: getOutputNewMap("gcpcloudrun"),
GoogleChat: getOutputNewMap("googlechat"),
Kafka: getOutputNewMap("kafka"),
KafkaRest: getOutputNewMap("kafkarest"),
Pagerduty: getOutputNewMap("pagerduty"),
Kubeless: getOutputNewMap("kubeless"),
Openfaas: getOutputNewMap("openfaas"),
Expand Down
10 changes: 10 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Configuration struct {
GCP gcpOutputConfig
Googlechat GooglechatConfig
Kafka kafkaConfig
KafkaRest KafkaRestConfig
Pagerduty PagerdutyConfig
Kubeless kubelessConfig
Openfaas openfaasConfig
Expand Down Expand Up @@ -332,6 +333,14 @@ type kafkaConfig struct {
MinimumPriority string
}

type KafkaRestConfig struct {
Address string
Version int
MinimumPriority string
CheckCert bool
MutualTLS bool
}

type PagerdutyConfig struct {
RoutingKey string
MinimumPriority string
Expand Down Expand Up @@ -435,6 +444,7 @@ type Statistics struct {
GCPCloudRun *expvar.Map
GoogleChat *expvar.Map
Kafka *expvar.Map
KafkaRest *expvar.Map
Pagerduty *expvar.Map
CloudEvents *expvar.Map
Kubeless *expvar.Map
Expand Down