diff --git a/README.md b/README.md index dc3a5493a..4911da574 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ Currently available outputs are : * [**AWS Lambda**](https://aws.amazon.com/lambda/features/) * [**AWS SQS**](https://aws.amazon.com/sqs/features/) * [**AWS SNS**](https://aws.amazon.com/sns/features/) +* [**AWS CloudWatchLogs**](https://aws.amazon.com/cloudwatch/features/) * **SMTP** (email) * [**Opsgenie**](https://www.opsgenie.com/) * [**StatsD**](https://github.com/statsd/statsd) (for monitoring of `falcosidekick`) @@ -194,6 +195,10 @@ aws: # topicarn : "" # SNS TopicArn, if not empty, AWS SNS output is enabled rawjson: false # Send Raw JSON or parse it (default: false) # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) + cloudwatchlogs: + # loggroup : "" # AWS CloudWatch Logs Group name, if not empty, CloudWatch Logs output is enabled + # logstream : "" # AWS CloudWatch Logs Stream name, if empty, Falcosidekick will try to create a log stream + # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) smtp: # hostport: "" # host:port address of SMTP server, if not empty, SMTP output is enabled @@ -252,7 +257,7 @@ googlechat: kafka: url: "" # Apache Kafka URL (ex: http://kafka). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled topic: "" # Name of the topic, if not empty, Kafka output is enabled - # partition: 0 # Partition number of the topic. + # partition: 0 # Partition number of the topic. # minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) ``` @@ -332,6 +337,8 @@ The *env vars* "match" field names in *yaml file with this structure (**take car * **AWS_SNS_TOPICARN** : AWS SNS TopicARN, if not empty, AWS SNS output is *enabled* * **AWS_SNS_RAWJSON** : Send Raw JSON or parse it (default: false) * **AWS_SNS_MINIMUMPRIORITY** : minimum priority of event for using this output, order is `emergency|alert|critical|error|warning|notice|informational|debug or "" (default)` +* **AWS_CLOUDWATCHLOGS_LOGGROUP** : AWS CloudWatch Logs Group name, if not empty, CloudWatch Logs output is enabled +* **AWS_CLOUDWATCHLOGS_LOGSTREAM** : AWS CloudWatch Logs Stream name, if empty, FalcoSideKick will try to create a log stream * **SMTP_HOSTPORT** : "host:port" address of SMTP server, if not empty, SMTP output is *enabled* * **SMTP_USER** : user to access SMTP server * **SMTP_PASSWORD** : password to access SMTP server @@ -415,6 +422,81 @@ The daemon exposes a `prometheus` endpoint on URI `/metrics`. The daemon is able to push its metrics to a StatsD/DogstatsD server. See [Configuration](https://github.com/falcosecurity/falcosidekick#configuration) section for how-to. 
+ +### AWS Policy example + +When using the AWS outputs, the AWS credentials you provide need permissions to access the resources you selected, such as +`SQS`, `Lambda`, `SNS` and `CloudWatchLogs`. + +#### CloudWatch Logs Sample Policy + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "cloudwatchlogs", + "Effect": "Allow", + "Action": [ + "logs:CreateLogStream", + "logs:DescribeLogStreams", + "logs:PutRetentionPolicy", + "logs:PutLogEvents" + ], + "Resource": "*" + } + ] +} +``` + +#### SQS Sample Policy + +```json +{ + "Version": "2012-10-17", + "Id": "sqs", + "Statement": [{ + "Sid":"sendMessage", + "Effect": "Allow", + "Principal": "*", + "Action": "sqs:SendMessage", + "Resource": "arn:aws:sqs:*:111122223333:queue1" + }] +} +``` + +#### SNS Sample Policy + +```json +{ + "Version": "2012-10-17", + "Id": "sns", + "Statement": [{ + "Sid":"publish", + "Effect": "Allow", + "Principal": "*", + "Action": "sns:Publish", + "Resource": "arn:aws:sns:*:111122223333:topic1" + }] +} +``` + +#### Lambda Sample Policy + +```json +{ + "Version": "2012-10-17", + "Id": "lambda", + "Statement": [{ + "Sid":"invoke", + "Effect": "Allow", + "Principal": "*", + "Action": "lambda:InvokeFunction", + "Resource": "*" + }] +} +``` + ## Examples Run you daemon and try (from falco's documentation) : diff --git a/config.go b/config.go index 5e98d3704..d5939fe5b 100644 --- a/config.go +++ b/config.go @@ -85,6 +85,9 @@ func getConfig() *types.Configuration { v.SetDefault("AWS.SNS.TopicArn", "") v.SetDefault("AWS.SNS.MinimumPriority", "") v.SetDefault("AWS.SNS.RawJSON", false) + v.SetDefault("AWS.CloudWatchLogs.LogGroup", "") + v.SetDefault("AWS.CloudWatchLogs.LogStream", "") + v.SetDefault("AWS.CloudWatchLogs.MinimumPriority", "") v.SetDefault("SMTP.HostPort", "") v.SetDefault("SMTP.User", "") v.SetDefault("SMTP.Password", "") @@ -175,6 +178,7 @@ func getConfig() *types.Configuration { c.AWS.Lambda.MinimumPriority = checkPriority(c.AWS.Lambda.MinimumPriority) c.AWS.SQS.MinimumPriority = checkPriority(c.AWS.SQS.MinimumPriority) c.AWS.SNS.MinimumPriority = checkPriority(c.AWS.SNS.MinimumPriority) + c.AWS.CloudWatchLogs.MinimumPriority = checkPriority(c.AWS.CloudWatchLogs.MinimumPriority) c.Opsgenie.MinimumPriority = checkPriority(c.Opsgenie.MinimumPriority) c.Webhook.MinimumPriority = checkPriority(c.Webhook.MinimumPriority) c.Azure.EventHub.MinimumPriority = checkPriority(c.Azure.EventHub.MinimumPriority) diff --git a/config_example.yaml b/config_example.yaml index de99d01fa..be4f3510d 100644 --- a/config_example.yaml +++ b/config_example.yaml @@ -83,6 +83,10 @@ aws: # topicarn : "" # SNS TopicArn, if not empty, AWS SNS output is enabled rawjson: false # Send Raw JSON or parse it (default: false) # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) + cloudwatchlogs: + # loggroup : "" # AWS CloudWatch Logs Group name, if not empty, CloudWatch Logs output is enabled + # logstream : "" # AWS CloudWatch Logs Stream name, if empty, Falcosidekick will try to create a log stream + # minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) smtp: # hostport: "" # host:port address of SMTP server, if not empty, SMTP output is enabled @@ -141,5 +145,5 @@ googlechat: kafka: url: "" # Apache Kafka URL (ex: http://kafka). 
Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled topic: "" # Name of the topic, if not empty, Kafka output is enabled - # partition: 0 # Partition number of the topic. + # partition: 0 # Partition number of the topic. # minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) diff --git a/handlers.go b/handlers.go index 4614f2aee..a1c9a55b3 100644 --- a/handlers.go +++ b/handlers.go @@ -184,6 +184,10 @@ func forwardEvent(falcopayload types.FalcoPayload) { go awsClient.PublishTopic(falcopayload) } + if config.AWS.CloudWatchLogs.LogGroup != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.AWS.CloudWatchLogs.MinimumPriority)] || falcopayload.Rule == TestRule) { + go awsClient.SendCloudWatchLog(falcopayload) + } + if config.SMTP.HostPort != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.SMTP.MinimumPriority)] || falcopayload.Rule == TestRule) { go smtpClient.SendMail(falcopayload) } @@ -191,6 +195,7 @@ func forwardEvent(falcopayload types.FalcoPayload) { if config.Opsgenie.APIKey != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.Opsgenie.MinimumPriority)] || falcopayload.Rule == TestRule) { go opsgenieClient.OpsgeniePost(falcopayload) } + if config.Webhook.Address != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.Webhook.MinimumPriority)] || falcopayload.Rule == TestRule) { go webhookClient.WebhookPost(falcopayload) } diff --git a/main.go b/main.go index 2b8badf7d..f8d52c600 100644 --- a/main.go +++ b/main.go @@ -194,7 +194,8 @@ func init() { } } - if config.AWS.Lambda.FunctionName != "" || config.AWS.SQS.URL != "" || config.AWS.SNS.TopicArn != "" { + if config.AWS.Lambda.FunctionName != "" || config.AWS.SQS.URL != "" || + config.AWS.SNS.TopicArn != "" || config.AWS.CloudWatchLogs.LogGroup != "" { var err error awsClient, err = outputs.NewAWSClient(config, stats, promStats, statsdClient, dogstatsdClient) if err != nil { @@ -204,6 +205,8 @@ func init() { config.AWS.Lambda.FunctionName = "" config.AWS.SQS.URL = "" config.AWS.SNS.TopicArn = "" + config.AWS.CloudWatchLogs.LogGroup = "" + config.AWS.CloudWatchLogs.LogStream = "" } else { if config.AWS.Lambda.FunctionName != "" { enabledOutputsText += "AWSLambda " @@ -214,6 +217,9 @@ func init() { if config.AWS.SNS.TopicArn != "" { enabledOutputsText += "AWSSNS " } + if config.AWS.CloudWatchLogs.LogGroup != "" { + enabledOutputsText += "AWSCloudWatchLogs " + } } } diff --git a/outputs/aws.go b/outputs/aws.go index a3c69047f..a9e0454b2 100644 --- a/outputs/aws.go +++ b/outputs/aws.go @@ -7,12 +7,15 @@ import ( "log" "net/url" "os" + "time" "github.com/falcosecurity/falcosidekick/types" "github.com/DataDog/datadog-go/statsd" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/lambda" "github.com/aws/aws-sdk-go/service/sns" "github.com/aws/aws-sdk-go/service/sqs" @@ -124,7 +127,7 @@ func (c *Client) SendMessage(falcopayload types.FalcoPayload) { log.Printf("[INFO] : %v SQS - Send Message OK (%v)\n", c.OutputType, *resp.MessageId) go c.CountMetric("outputs", 1, []string{"output:awssqs", "status:ok"}) - c.Stats.AWSSQS.Add("ok", 1) + 
c.Stats.AWSSQS.Add(OK, 1) c.PromStats.Outputs.With(map[string]string{"destination": "awssqs", "status": "ok"}).Inc() } @@ -189,3 +192,80 @@ func (c *Client) PublishTopic(falcopayload types.FalcoPayload) { c.Stats.AWSSNS.Add(OK, 1) c.PromStats.Outputs.With(map[string]string{"destination": "awssns", "status": OK}).Inc() } + +// SendCloudWatchLog sends a message to CloudWatch Logs +func (c *Client) SendCloudWatchLog(falcopayload types.FalcoPayload) { + svc := cloudwatchlogs.New(c.AWSSession) + + f, _ := json.Marshal(falcopayload) + + c.Stats.AWSCloudWatchLogs.Add(Total, 1) + + if c.Config.AWS.CloudWatchLogs.LogStream == "" { + streamName := "falcosidekick-logstream" + log.Printf("[INFO] : %v CloudWatchLogs - Log Stream not configured, creating one called %s\n", c.OutputType, streamName) + inputLogStream := &cloudwatchlogs.CreateLogStreamInput{ + LogGroupName: aws.String(c.Config.AWS.CloudWatchLogs.LogGroup), + LogStreamName: aws.String(streamName), + } + + _, err := svc.CreateLogStream(inputLogStream) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException { + log.Printf("[INFO] : %v CloudWatchLogs - Log Stream %s already exists, reusing it...\n", c.OutputType, streamName) + } else { + go c.CountMetric("outputs", 1, []string{"output:awscloudwatchlogs", "status:error"}) + c.Stats.AWSCloudWatchLogs.Add(Error, 1) + c.PromStats.Outputs.With(map[string]string{"destination": "awscloudwatchlogs", "status": Error}).Inc() + log.Printf("[ERROR] : %v CloudWatchLogs - %v\n", c.OutputType, err.Error()) + return + } + } + + c.Config.AWS.CloudWatchLogs.LogStream = streamName + } + + logevent := &cloudwatchlogs.InputLogEvent{ + Message: aws.String(string(f)), + Timestamp: aws.Int64(falcopayload.Time.UnixNano() / int64(time.Millisecond)), + } + + input := &cloudwatchlogs.PutLogEventsInput{ + LogEvents: []*cloudwatchlogs.InputLogEvent{logevent}, + LogGroupName: aws.String(c.Config.AWS.CloudWatchLogs.LogGroup), + LogStreamName: aws.String(c.Config.AWS.CloudWatchLogs.LogStream), + } + + var err error + resp := &cloudwatchlogs.PutLogEventsOutput{} + resp, err = c.putLogEvents(svc, input) + if err != nil { + go c.CountMetric("outputs", 1, []string{"output:awscloudwatchlogs", "status:error"}) + c.Stats.AWSCloudWatchLogs.Add(Error, 1) + c.PromStats.Outputs.With(map[string]string{"destination": "awscloudwatchlogs", "status": Error}).Inc() + log.Printf("[ERROR] : %v CloudWatchLogs - %v\n", c.OutputType, err.Error()) + return + } + + log.Printf("[INFO] : %v CloudWatchLogs - Send Log OK (%v)\n", c.OutputType, resp.String()) + go c.CountMetric("outputs", 1, []string{"output:awscloudwatchlogs", "status:ok"}) + c.Stats.AWSCloudWatchLogs.Add(OK, 1) + c.PromStats.Outputs.With(map[string]string{"destination": "awscloudwatchlogs", "status": OK}).Inc() +} + +// putLogEvents attempts to put the log events and, on an invalid sequence token, retries with the expected one. 
+func (c *Client) putLogEvents(svc *cloudwatchlogs.CloudWatchLogs, input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + resp, err := svc.PutLogEvents(input) + if err != nil { + if exception, ok := err.(*cloudwatchlogs.InvalidSequenceTokenException); ok { + log.Printf("[INFO] : %v Refreshing token for LogGroup: %s LogStream: %s", c.OutputType, *input.LogGroupName, *input.LogStreamName) + input.SequenceToken = exception.ExpectedSequenceToken + + return c.putLogEvents(svc, input) + } + + return nil, err + } + + return resp, nil +} diff --git a/stats.go b/stats.go index f58f30630..8987eec87 100644 --- a/stats.go +++ b/stats.go @@ -17,33 +17,34 @@ func getInitStats() *types.Statistics { })) stats = &types.Statistics{ - Requests: getInputNewMap("requests"), - FIFO: getInputNewMap("fifo"), - GRPC: getInputNewMap("grpc"), - Falco: expvar.NewMap("falco.priority"), - Slack: getOutputNewMap("slack"), - Rocketchat: getOutputNewMap("rocketchat"), - Mattermost: getOutputNewMap("mattermost"), - Teams: getOutputNewMap("teams"), - Datadog: getOutputNewMap("datadog"), - Discord: getOutputNewMap("discord"), - Alertmanager: getOutputNewMap("alertmanager"), - Elasticsearch: getOutputNewMap("elasticsearch"), - Loki: getOutputNewMap("loki"), - Nats: getOutputNewMap("nats"), - Influxdb: getOutputNewMap("influxdb"), - AWSLambda: getOutputNewMap("awslambda"), - AWSSQS: getOutputNewMap("awssqs"), - AWSSNS: getOutputNewMap("awssns"), - SMTP: getOutputNewMap("smtp"), - Opsgenie: getOutputNewMap("opsgenie"), - Statsd: getOutputNewMap("statsd"), - Dogstatsd: getOutputNewMap("dogstatsd"), - Webhook: getOutputNewMap("webhook"), - AzureEventHub: getOutputNewMap("azureeventhub"), - GCPPubSub: getOutputNewMap("gcppubsub"), - GoogleChat: getOutputNewMap("googlechat"), - Kafka: getOutputNewMap("kafka"), + Requests: getInputNewMap("requests"), + FIFO: getInputNewMap("fifo"), + GRPC: getInputNewMap("grpc"), + Falco: expvar.NewMap("falco.priority"), + Slack: getOutputNewMap("slack"), + Rocketchat: getOutputNewMap("rocketchat"), + Mattermost: getOutputNewMap("mattermost"), + Teams: getOutputNewMap("teams"), + Datadog: getOutputNewMap("datadog"), + Discord: getOutputNewMap("discord"), + Alertmanager: getOutputNewMap("alertmanager"), + Elasticsearch: getOutputNewMap("elasticsearch"), + Loki: getOutputNewMap("loki"), + Nats: getOutputNewMap("nats"), + Influxdb: getOutputNewMap("influxdb"), + AWSLambda: getOutputNewMap("awslambda"), + AWSSQS: getOutputNewMap("awssqs"), + AWSSNS: getOutputNewMap("awssns"), + AWSCloudWatchLogs: getOutputNewMap("awscloudwatchlogs"), + SMTP: getOutputNewMap("smtp"), + Opsgenie: getOutputNewMap("opsgenie"), + Statsd: getOutputNewMap("statsd"), + Dogstatsd: getOutputNewMap("dogstatsd"), + Webhook: getOutputNewMap("webhook"), + AzureEventHub: getOutputNewMap("azureeventhub"), + GCPPubSub: getOutputNewMap("gcppubsub"), + GoogleChat: getOutputNewMap("googlechat"), + Kafka: getOutputNewMap("kafka"), } stats.Falco.Add("emergency", 0) stats.Falco.Add("alert", 0) diff --git a/types/types.go b/types/types.go index eb69076ec..7e64bc9fd 100644 --- a/types/types.go +++ b/types/types.go @@ -138,6 +138,7 @@ type awsOutputConfig struct { Lambda awsLambdaConfig SQS awsSQSConfig SNS awsSNSConfig + CloudWatchLogs awsCloudWatchLogs } type awsLambdaConfig struct { @@ -158,6 +159,12 @@ type awsSNSConfig struct { MinimumPriority string } +type awsCloudWatchLogs struct { + LogGroup string + LogStream string + MinimumPriority string +} + type smtpOutputConfig struct { HostPort string User 
string @@ -226,33 +233,34 @@ type kafkaConfig struct { // Statistics is a struct to store stastics type Statistics struct { - Requests *expvar.Map - FIFO *expvar.Map - GRPC *expvar.Map - Falco *expvar.Map - Slack *expvar.Map - Mattermost *expvar.Map - Rocketchat *expvar.Map - Teams *expvar.Map - Datadog *expvar.Map - Discord *expvar.Map - Alertmanager *expvar.Map - Elasticsearch *expvar.Map - Loki *expvar.Map - Nats *expvar.Map - Influxdb *expvar.Map - AWSLambda *expvar.Map - AWSSQS *expvar.Map - AWSSNS *expvar.Map - SMTP *expvar.Map - Opsgenie *expvar.Map - Statsd *expvar.Map - Dogstatsd *expvar.Map - Webhook *expvar.Map - AzureEventHub *expvar.Map - GCPPubSub *expvar.Map - GoogleChat *expvar.Map - Kafka *expvar.Map + Requests *expvar.Map + FIFO *expvar.Map + GRPC *expvar.Map + Falco *expvar.Map + Slack *expvar.Map + Mattermost *expvar.Map + Rocketchat *expvar.Map + Teams *expvar.Map + Datadog *expvar.Map + Discord *expvar.Map + Alertmanager *expvar.Map + Elasticsearch *expvar.Map + Loki *expvar.Map + Nats *expvar.Map + Influxdb *expvar.Map + AWSLambda *expvar.Map + AWSSQS *expvar.Map + AWSSNS *expvar.Map + AWSCloudWatchLogs *expvar.Map + SMTP *expvar.Map + Opsgenie *expvar.Map + Statsd *expvar.Map + Dogstatsd *expvar.Map + Webhook *expvar.Map + AzureEventHub *expvar.Map + GCPPubSub *expvar.Map + GoogleChat *expvar.Map + Kafka *expvar.Map } // PromStatistics is a struct to store prometheus metrics
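The `SendCloudWatchLog`/`putLogEvents` additions above boil down to three CloudWatch Logs calls: create the log stream when none is configured, tolerate `ResourceAlreadyExistsException`, and retry `PutLogEvents` with the `ExpectedSequenceToken` when the current token is rejected. The sketch below reproduces that flow standalone with `aws-sdk-go` so it can be tried outside Falcosidekick; the log group and stream names are placeholders, not values taken from this PR.

```go
// Minimal standalone sketch of the CloudWatch Logs flow used by the new output:
// create the log stream if needed, tolerate "already exists", then put a log
// event and retry with the expected sequence token if the current one is invalid.
package main

import (
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)

// putWithTokenRetry mirrors putLogEvents: a rejected sequence token carries the
// expected one in the error, so it is reused for a retry.
func putWithTokenRetry(svc *cloudwatchlogs.CloudWatchLogs, input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
	resp, err := svc.PutLogEvents(input)
	if err != nil {
		if e, ok := err.(*cloudwatchlogs.InvalidSequenceTokenException); ok {
			input.SequenceToken = e.ExpectedSequenceToken
			return svc.PutLogEvents(input)
		}
		return nil, err
	}
	return resp, nil
}

func main() {
	sess := session.Must(session.NewSession())
	svc := cloudwatchlogs.New(sess)

	// Placeholder names; the real output takes them from the loggroup/logstream settings.
	logGroup, logStream := "falcosidekick", "falcosidekick-logstream"

	// Create the stream; an "already exists" error simply means it can be reused.
	_, err := svc.CreateLogStream(&cloudwatchlogs.CreateLogStreamInput{
		LogGroupName:  aws.String(logGroup),
		LogStreamName: aws.String(logStream),
	})
	if err != nil {
		if awsErr, ok := err.(awserr.Error); !ok || awsErr.Code() != cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
			log.Fatalf("create log stream: %v", err)
		}
	}

	_, err = putWithTokenRetry(svc, &cloudwatchlogs.PutLogEventsInput{
		LogGroupName:  aws.String(logGroup),
		LogStreamName: aws.String(logStream),
		LogEvents: []*cloudwatchlogs.InputLogEvent{{
			Message:   aws.String(`{"rule":"Test rule","priority":"Debug"}`),
			Timestamp: aws.Int64(time.Now().UnixNano() / int64(time.Millisecond)),
		}},
	})
	if err != nil {
		log.Fatalf("put log events: %v", err)
	}
	log.Println("log event sent")
}
```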
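To exercise the new output end to end, an event can be posted straight to a running Falcosidekick with `AWS_CLOUDWATCHLOGS_LOGGROUP` set. The sketch below is a hedged example: it assumes the daemon's default listen port (2801) and a Falco-style JSON payload, and the rule name `Test rule` is assumed to match the `TestRule` constant checked in `handlers.go`, which lets the event through regardless of `minimumpriority`. Adjust port and payload if your deployment differs.

```go
// Post a Falco-style test event to a locally running falcosidekick.
package main

import (
	"bytes"
	"log"
	"net/http"
)

func main() {
	// "Test rule" is assumed to match the TestRule constant used by handlers.go,
	// so the event bypasses minimumpriority filtering.
	payload := []byte(`{
	  "output": "A sample event to check the CloudWatch Logs output",
	  "priority": "Error",
	  "rule": "Test rule",
	  "time": "2021-01-01T00:00:00.000000000Z",
	  "output_fields": {"proc.name": "falcosidekick"}
	}`)

	// 2801 is assumed to be the default falcosidekick listen port.
	resp, err := http.Post("http://localhost:2801/", "application/json", bytes.NewBuffer(payload))
	if err != nil {
		log.Fatalf("post event: %v", err)
	}
	defer resp.Body.Close()
	log.Printf("falcosidekick answered with %s", resp.Status)
}
```

If the event was forwarded, the CloudWatch Logs stream should receive the marshalled payload and the `awscloudwatchlogs` counters exposed on `/metrics` should increase.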