From e6af2b62c9545387e0bb2039af9695a47822f7f4 Mon Sep 17 00:00:00 2001 From: Thomas Labarussias Date: Mon, 12 Jun 2023 14:44:06 +0200 Subject: [PATCH] add OpenObserve output Signed-off-by: Thomas Labarussias --- README.md | 31 +++++++++++++++++++++++++++++++ config.go | 12 ++++++++++++ config_example.yaml | 12 ++++++++++++ handlers.go | 4 ++++ main.go | 11 +++++++++++ outputs/openobserve.go | 40 ++++++++++++++++++++++++++++++++++++++++ stats.go | 1 + types/types.go | 15 +++++++++++++++ 8 files changed, 126 insertions(+) create mode 100644 outputs/openobserve.go diff --git a/README.md b/README.md index c1813be75..ab2061db4 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ It works as a single endpoint for as many as you want `Falco` instances : - [**Grafana**](https://grafana.com/) (annotations) - **Syslog** - [**Zincsearch**](https://docs.zincsearch.com/) +- [**OpenObserve**](https://openobserve.ai) ### Object Storage @@ -614,6 +615,18 @@ n8n: # headerauthvalue: "" # Header Auth Value to authenticate with N8N # checkcert: true # check if ssl certificate of the output is valid (default: true) # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) + +openobserve: + # hostport: "" # http://{domain or ip}:{port}, if not empty, OpenObserve output is enabled + # organizationName: "default" # Organization name (default: default) + # streamName: "falco" # Stream name (default: falco) + # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) + # mutualtls: false # if true, checkcert flag will be ignored (server cert will always be checked) + # checkcert: true # check if ssl certificate of the output is valid (default: true) + # username: "a" # use this username to authenticate to OpenObserve if the username is not empty (default: "") + # password: "" # use this password to authenticate to OpenObserve if the password is not empty (default: "") + # customHeaders: # Custom headers to add in POST, useful for Authentication + # key: value ``` Usage : @@ -1115,6 +1128,24 @@ order is - **N8N_PASSWORD**: Header Auth Value to authenticate with N8N - **N8N_CHECKCERT**: check if ssl certificate of the output is valid (default: true) - **N8N_MINIMUMPRIORITY**: minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) +- **OPENOBSERVE_HOSTPORT** : Elasticsearch http://host:port, if not `empty`, + OpenObserve is _enabled_ +- **OPENOBSERVE_ORGANIZATIONNAME** : Organization name (default: default) +- **OPENOBSERVE_STREAMNAME** : Stream name (default: falco) +- **OPENOBSERVE_MINIMUMPRIORITY** : minimum priority of event for using this + output, order is + `emergency|alert|critical|error|warning|notice|informational|debug or "" (default)` + `monthly`, `annually`, `none` +- **OPENOBSERVE_MUTUALTLS** : enable mutual tls authentication for this output (default: + `false`) +- **OPENOBSERVE_CHECKCERT** : check if ssl certificate of the output is valid (default: + `true`) +- **OPENOBSERVE_USERNAME** : use this username to authenticate to OpenObserve if the + username is not empty (default: "") +- **OPENOBSERVE_PASSWORD** : use this password to authenticate to OpenObserve if the + password is not empty (default: "") +- **OPENOBSERVE_CUSTOMHEADERS** : a list of comma separated custom headers to add, + syntax is "key:value,key:value" #### Slack/Rocketchat/Mattermost/Googlechat Message Formatting diff --git a/config.go b/config.go index 8f4be579d..8985aefd4 100644 --- a/config.go +++ b/config.go @@ -24,6 +24,7 @@ func getConfig() *types.Configuration { Grafana: types.GrafanaOutputConfig{CustomHeaders: make(map[string]string)}, Loki: types.LokiOutputConfig{CustomHeaders: make(map[string]string)}, Elasticsearch: types.ElasticsearchOutputConfig{CustomHeaders: make(map[string]string)}, + OpenObserve: types.OpenObserveConfig{CustomHeaders: make(map[string]string)}, Webhook: types.WebhookOutputConfig{CustomHeaders: make(map[string]string)}, Alertmanager: types.AlertmanagerOutputConfig{ExtraLabels: make(map[string]string), ExtraAnnotations: make(map[string]string)}, CloudEvents: types.CloudEventsOutputConfig{Extensions: make(map[string]string)}, @@ -442,6 +443,15 @@ func getConfig() *types.Configuration { v.SetDefault("Telegram.MinimumPriority", "") v.SetDefault("Telegram.CheckCert", true) + v.SetDefault("OpenObserve.HostPort", "") + v.SetDefault("OpenObserve.OrganizationName", "default") + v.SetDefault("OpenObserve.StreamName", "falco") + v.SetDefault("OpenObserve.MinimumPriority", "") + v.SetDefault("OpenObserve.MutualTls", false) + v.SetDefault("OpenObserve.CheckCert", true) + v.SetDefault("OpenObserve.Username", "") + v.SetDefault("OpenObserve.Password", "") + v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) v.AutomaticEnv() if *configFile != "" { @@ -621,6 +631,8 @@ func getConfig() *types.Configuration { c.TimescaleDB.MinimumPriority = checkPriority(c.TimescaleDB.MinimumPriority) c.Redis.MinimumPriority = checkPriority(c.Redis.MinimumPriority) c.Telegram.MinimumPriority = checkPriority(c.Telegram.MinimumPriority) + c.N8N.MinimumPriority = checkPriority(c.N8N.MinimumPriority) + c.OpenObserve.MinimumPriority = checkPriority(c.OpenObserve.MinimumPriority) c.Slack.MessageFormatTemplate = getMessageFormatTemplate("Slack", c.Slack.MessageFormat) c.Rocketchat.MessageFormatTemplate = getMessageFormatTemplate("Rocketchat", c.Rocketchat.MessageFormat) diff --git a/config_example.yaml b/config_example.yaml index fd5558edc..695b3732d 100644 --- a/config_example.yaml +++ b/config_example.yaml @@ -443,3 +443,15 @@ n8n: # headerauthvalue: "" # Header Auth Value to authenticate with N8N # checkcert: true # check if ssl certificate of the output is valid (default: true) # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) + +openobserve: + # hostport: "" # http://{domain or ip}:{port}, if not empty, OpenObserve output is enabled + # organizationName: "default" # Organization name (default: default) + # streamName: "falco" # Stream name (default: falco) + # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) + # mutualtls: false # if true, checkcert flag will be ignored (server cert will always be checked) + # checkcert: true # check if ssl certificate of the output is valid (default: true) + # username: "a" # use this username to authenticate to OpenObserve if the username is not empty (default: "") + # password: "" # use this password to authenticate to OpenObserve if the password is not empty (default: "") + # customHeaders: # Custom headers to add in POST, useful for Authentication + # key: value \ No newline at end of file diff --git a/handlers.go b/handlers.go index 4d566a1b5..794a4ba2d 100644 --- a/handlers.go +++ b/handlers.go @@ -394,4 +394,8 @@ func forwardEvent(falcopayload types.FalcoPayload) { if config.N8N.Address != "" && (falcopayload.Priority >= types.Priority(config.N8N.MinimumPriority) || falcopayload.Rule == testRule) { go n8nClient.N8NPost(falcopayload) } + + if config.OpenObserve.HostPort != "" && (falcopayload.Priority >= types.Priority(config.OpenObserve.MinimumPriority) || falcopayload.Rule == testRule) { + go openObserveClient.OpenObservePost(falcopayload) + } } diff --git a/main.go b/main.go index afca67d9f..912332382 100644 --- a/main.go +++ b/main.go @@ -68,6 +68,7 @@ var ( redisClient *outputs.Client telegramClient *outputs.Client n8nClient *outputs.Client + openObserveClient *outputs.Client statsdClient, dogstatsdClient *statsd.Client config *types.Configuration @@ -703,6 +704,16 @@ func init() { } } + if config.OpenObserve.HostPort != "" { + var err error + openObserveClient, err = outputs.NewClient("OpenObserve", config.OpenObserve.HostPort+"/api/"+config.OpenObserve.OrganizationName+"/"+config.OpenObserve.StreamName+"/_multi", config.OpenObserve.MutualTLS, config.OpenObserve.CheckCert, config, stats, promStats, statsdClient, dogstatsdClient) + if err != nil { + config.OpenObserve.HostPort = "" + } else { + outputs.EnabledOutputs = append(outputs.EnabledOutputs, "OpenObserve") + } + } + log.Printf("[INFO] : Falco Sidekick version: %s\n", GetVersionInfo().GitVersion) log.Printf("[INFO] : Enabled Outputs : %s\n", outputs.EnabledOutputs) diff --git a/outputs/openobserve.go b/outputs/openobserve.go new file mode 100644 index 000000000..dee8759ee --- /dev/null +++ b/outputs/openobserve.go @@ -0,0 +1,40 @@ +package outputs + +import ( + "log" + + "github.com/falcosecurity/falcosidekick/types" +) + +// OpenObservePost posts event to OpenObserve +func (c *Client) OpenObservePost(falcopayload types.FalcoPayload) { + c.Stats.OpenObserve.Add(Total, 1) + + if c.Config.OpenObserve.Username != "" && c.Config.OpenObserve.Password != "" { + c.httpClientLock.Lock() + defer c.httpClientLock.Unlock() + c.BasicAuth(c.Config.OpenObserve.Username, c.Config.OpenObserve.Password) + } + + for i, j := range c.Config.OpenObserve.CustomHeaders { + c.AddHeader(i, j) + } + + if err := c.Post(falcopayload); err != nil { + c.setOpenObserveErrorMetrics() + log.Printf("[ERROR] : OpenObserve - %v\n", err) + return + } + + // Setting the success status + go c.CountMetric(Outputs, 1, []string{"output:openobserve", "status:ok"}) + c.Stats.OpenObserve.Add(OK, 1) + c.PromStats.Outputs.With(map[string]string{"destination": "openobserve", "status": OK}).Inc() +} + +// setOpenObserveErrorMetrics set the error stats +func (c *Client) setOpenObserveErrorMetrics() { + go c.CountMetric(Outputs, 1, []string{"output:openobserve", "status:error"}) + c.Stats.OpenObserve.Add(Error, 1) + c.PromStats.Outputs.With(map[string]string{"destination": "openobserve", "status": Error}).Inc() +} diff --git a/stats.go b/stats.go index 2ca0d10ad..c7b581eba 100644 --- a/stats.go +++ b/stats.go @@ -79,6 +79,7 @@ func getInitStats() *types.Statistics { Redis: getOutputNewMap("redis"), Telegram: getOutputNewMap("telegram"), N8N: getOutputNewMap("n8n"), + OpenObserve: getOutputNewMap("openobserve"), } stats.Falco.Add(outputs.Emergency, 0) stats.Falco.Add(outputs.Alert, 0) diff --git a/types/types.go b/types/types.go index 0e7b30423..fbf3cfe16 100644 --- a/types/types.go +++ b/types/types.go @@ -102,6 +102,7 @@ type Configuration struct { Redis RedisConfig Telegram TelegramConfig N8N N8NConfig + OpenObserve OpenObserveConfig } // SlackOutputConfig represents parameters for Slack @@ -675,6 +676,19 @@ type N8NConfig struct { CheckCert bool } +// OpenObserveConfig represents config parameters for OpenObserve +type OpenObserveConfig struct { + HostPort string + OrganizationName string + StreamName string + MinimumPriority string + Username string + Password string + CheckCert bool + MutualTLS bool + CustomHeaders map[string]string +} + // Statistics is a struct to store stastics type Statistics struct { Requests *expvar.Map @@ -738,6 +752,7 @@ type Statistics struct { Redis *expvar.Map Telegram *expvar.Map N8N *expvar.Map + OpenObserve *expvar.Map } // PromStatistics is a struct to store prometheus metrics