Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Cloudwatch Logs Output #127

Merged
merged 1 commit into from
Nov 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Currently available outputs are :
* [**AWS Lambda**](https://aws.amazon.com/lambda/features/)
* [**AWS SQS**](https://aws.amazon.com/sqs/features/)
* [**AWS SNS**](https://aws.amazon.com/sns/features/)
* [**AWS CloudWatchLogs**](https://aws.amazon.com/cloudwatch/features/)
* **SMTP** (email)
* [**Opsgenie**](https://www.opsgenie.com/)
* [**StatsD**](https://github.com/statsd/statsd) (for monitoring of `falcosidekick`)
Expand Down Expand Up @@ -194,6 +195,10 @@ aws:
# topicarn : "" # SNS TopicArn, if not empty, AWS SNS output is enabled
rawjson: false # Send Raw JSON or parse it (default: false)
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
cloudwatchlogs:
# loggroup : "" # AWS CloudWatch Logs Group name, if not empty, CloudWatch Logs output is enabled
# logstream : "" # AWS CloudWatch Logs Stream name, if empty, Falcosidekick will try to create a log stream
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)

smtp:
# hostport: "" # host:port address of SMTP server, if not empty, SMTP output is enabled
Expand Down Expand Up @@ -252,7 +257,7 @@ googlechat:
kafka:
url: "" # Apache Kafka URL (ex: http://kafka). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# partition: 0 # Partition number of the topic.
# partition: 0 # Partition number of the topic.
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
```

Expand Down Expand Up @@ -332,6 +337,8 @@ The *env vars* "match" field names in *yaml file with this structure (**take car
* **AWS_SNS_TOPICARN** : AWS SNS TopicARN, if not empty, AWS SNS output is *enabled*
* **AWS_SNS_RAWJSON** : Send Raw JSON or parse it (default: false)
* **AWS_SNS_MINIMUMPRIORITY** : minimum priority of event for using this output, order is `emergency|alert|critical|error|warning|notice|informational|debug or "" (default)`
* **AWS_CLOUDWATCHLOGS_LOGGROUP** : AWS CloudWatch Logs Group name, if not empty, CloudWatch Logs output is enabled
* **AWS_CLOUDWATCHLOGS_LOGSTREAM** : AWS CloudWatch Logs Stream name, if empty, FalcoSideKick will try to create a log stream
* **SMTP_HOSTPORT** : "host:port" address of SMTP server, if not empty, SMTP output is *enabled*
* **SMTP_USER** : user to access SMTP server
* **SMTP_PASSWORD** : password to access SMTP server
Expand Down Expand Up @@ -415,6 +422,81 @@ The daemon exposes a `prometheus` endpoint on URI `/metrics`.

The daemon is able to push its metrics to a StatsD/DogstatsD server. See [Configuration](https://github.com/falcosecurity/falcosidekick#configuration) section for how-to.


### AWS Policy example

When using the AWS output you will need to set the AWS keys with some permissions to access the resources you selected to use, like
`SQS`, `Lambda`, `SNS` and `CloudWatchLogs`

#### CloudWatch Logs Sample Policy

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "cloudwacthlogs",
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:DescribeLogStreams",
"logs:PutRetentionPolicy",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
```

#### SQS Sample Policy

```json
{
"Version": "2012-10-17",
"Id": "sqs",
"Statement": [{
"Sid":"sendMessage",
"Effect": "Allow",
"Principal": "*",
"Action": "sqs:SendMessage",
"Resource": "arn:aws:sqs:*:111122223333:queue1"
}]
}
```

#### SNS Sample Policy

```json
{
"Version": "2012-10-17",
"Id": "sns",
"Statement": [{
"Sid":"publish",
"Effect": "Allow",
"Principal": "*",
"Action": "sns:Publish",
"Resource": "arn:aws:sqs:*:111122223333:queue1"
}]
}
```

#### Lambda Sample Policy

```json
{
"Version": "2012-10-17",
"Id": "lambda",
"Statement": [{
"Sid":"invoke",
"Effect": "Allow",
"Principal": "*",
"Action": "lambda:InvokeFunction",
"Resource": "*"
}]
}
```

## Examples

Run you daemon and try (from falco's documentation) :
Expand Down
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ func getConfig() *types.Configuration {
v.SetDefault("AWS.SNS.TopicArn", "")
v.SetDefault("AWS.SNS.MinimumPriority", "")
v.SetDefault("AWS.SNS.RawJSON", false)
v.SetDefault("AWS.CloudWatchLogs.LogGroup", "")
v.SetDefault("AWS.CloudWatchLogs.LogStream", "")
v.SetDefault("AWS.CloudWatchLogs.MinimumPriority", "")
v.SetDefault("SMTP.HostPort", "")
v.SetDefault("SMTP.User", "")
v.SetDefault("SMTP.Password", "")
Expand Down Expand Up @@ -175,6 +178,7 @@ func getConfig() *types.Configuration {
c.AWS.Lambda.MinimumPriority = checkPriority(c.AWS.Lambda.MinimumPriority)
c.AWS.SQS.MinimumPriority = checkPriority(c.AWS.SQS.MinimumPriority)
c.AWS.SNS.MinimumPriority = checkPriority(c.AWS.SNS.MinimumPriority)
c.AWS.CloudWatchLogs.MinimumPriority = checkPriority(c.AWS.CloudWatchLogs.MinimumPriority)
c.Opsgenie.MinimumPriority = checkPriority(c.Opsgenie.MinimumPriority)
c.Webhook.MinimumPriority = checkPriority(c.Webhook.MinimumPriority)
c.Azure.EventHub.MinimumPriority = checkPriority(c.Azure.EventHub.MinimumPriority)
Expand Down
6 changes: 5 additions & 1 deletion config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ aws:
# topicarn : "" # SNS TopicArn, if not empty, AWS SNS output is enabled
rawjson: false # Send Raw JSON or parse it (default: false)
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
cloudwatchlogs:
# loggroup : "" # AWS CloudWatch Logs Group name, if not empty, CloudWatch Logs output is enabled
# logstream : "" # AWS CloudWatch Logs Stream name, if empty, Falcosidekick will try to create a log stream
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)

smtp:
# hostport: "" # host:port address of SMTP server, if not empty, SMTP output is enabled
Expand Down Expand Up @@ -141,5 +145,5 @@ googlechat:
kafka:
url: "" # Apache Kafka URL (ex: http://kafka). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# partition: 0 # Partition number of the topic.
# partition: 0 # Partition number of the topic.
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
5 changes: 5 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,18 @@ func forwardEvent(falcopayload types.FalcoPayload) {
go awsClient.PublishTopic(falcopayload)
}

if config.AWS.CloudWatchLogs.LogGroup != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.AWS.CloudWatchLogs.MinimumPriority)] || falcopayload.Rule == TestRule) {
go awsClient.SendCloudWatchLog(falcopayload)
}

if config.SMTP.HostPort != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.SMTP.MinimumPriority)] || falcopayload.Rule == TestRule) {
go smtpClient.SendMail(falcopayload)
}

if config.Opsgenie.APIKey != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.Opsgenie.MinimumPriority)] || falcopayload.Rule == TestRule) {
go opsgenieClient.OpsgeniePost(falcopayload)
}

if config.Webhook.Address != "" && (priorityMap[strings.ToLower(falcopayload.Priority)] >= priorityMap[strings.ToLower(config.Webhook.MinimumPriority)] || falcopayload.Rule == TestRule) {
go webhookClient.WebhookPost(falcopayload)
}
Expand Down
8 changes: 7 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func init() {
}
}

if config.AWS.Lambda.FunctionName != "" || config.AWS.SQS.URL != "" || config.AWS.SNS.TopicArn != "" {
if config.AWS.Lambda.FunctionName != "" || config.AWS.SQS.URL != "" ||
config.AWS.SNS.TopicArn != "" || config.AWS.CloudWatchLogs.LogGroup != "" {
var err error
awsClient, err = outputs.NewAWSClient(config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
Expand All @@ -204,6 +205,8 @@ func init() {
config.AWS.Lambda.FunctionName = ""
config.AWS.SQS.URL = ""
config.AWS.SNS.TopicArn = ""
config.AWS.CloudWatchLogs.LogGroup = ""
config.AWS.CloudWatchLogs.LogStream = ""
} else {
if config.AWS.Lambda.FunctionName != "" {
enabledOutputsText += "AWSLambda "
Expand All @@ -214,6 +217,9 @@ func init() {
if config.AWS.SNS.TopicArn != "" {
enabledOutputsText += "AWSSNS "
}
if config.AWS.CloudWatchLogs.LogGroup != "" {
enabledOutputsText += "AWSCloudWatchLogs "
}
}
}

Expand Down
82 changes: 81 additions & 1 deletion outputs/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"log"
"net/url"
"os"
"time"

"github.com/falcosecurity/falcosidekick/types"

"github.com/DataDog/datadog-go/statsd"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sqs"
Expand Down Expand Up @@ -124,7 +127,7 @@ func (c *Client) SendMessage(falcopayload types.FalcoPayload) {

log.Printf("[INFO] : %v SQS - Send Message OK (%v)\n", c.OutputType, *resp.MessageId)
go c.CountMetric("outputs", 1, []string{"output:awssqs", "status:ok"})
c.Stats.AWSSQS.Add("ok", 1)
c.Stats.AWSSQS.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssqs", "status": "ok"}).Inc()
}

Expand Down Expand Up @@ -189,3 +192,80 @@ func (c *Client) PublishTopic(falcopayload types.FalcoPayload) {
c.Stats.AWSSNS.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awssns", "status": OK}).Inc()
}

// SendCloudWatchLog sends a message to CloudWatch Log
func (c *Client) SendCloudWatchLog(falcopayload types.FalcoPayload) {
svc := cloudwatchlogs.New(c.AWSSession)

f, _ := json.Marshal(falcopayload)

c.Stats.AWSCloudWatchLogs.Add(Total, 1)

if c.Config.AWS.CloudWatchLogs.LogStream == "" {
streamName := "falcosidekick-logstream"
log.Printf("[INFO] : %v CloudWatchLogs - Log Stream not configured creating one called %s\n", c.OutputType, streamName)
inputLogStream := &cloudwatchlogs.CreateLogStreamInput{
LogGroupName: aws.String(c.Config.AWS.CloudWatchLogs.LogGroup),
LogStreamName: aws.String(streamName),
}

_, err := svc.CreateLogStream(inputLogStream)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == cloudwatchlogs.ErrCodeResourceAlreadyExistsException {
log.Printf("[INFO] : %v CloudWatchLogs - Log Stream %s already exist, reusing...\n", c.OutputType, streamName)
Issif marked this conversation as resolved.
Show resolved Hide resolved
} else {
go c.CountMetric("outputs", 1, []string{"output:awscloudwatchlogs", "status:error"})
c.Stats.AWSCloudWatchLogs.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awscloudwatchlogs", "status": Error}).Inc()
log.Printf("[ERROR] : %v CloudWatchLogs - %v\n", c.OutputType, err.Error())
return
}
}

c.Config.AWS.CloudWatchLogs.LogStream = streamName
}

logevent := &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(f)),
Timestamp: aws.Int64(falcopayload.Time.UnixNano() / int64(time.Millisecond)),
}

input := &cloudwatchlogs.PutLogEventsInput{
LogEvents: []*cloudwatchlogs.InputLogEvent{logevent},
LogGroupName: aws.String(c.Config.AWS.CloudWatchLogs.LogGroup),
LogStreamName: aws.String(c.Config.AWS.CloudWatchLogs.LogStream),
}

var err error
resp := &cloudwatchlogs.PutLogEventsOutput{}
resp, err = c.putLogEvents(svc, input)
if err != nil {
go c.CountMetric("outputs", 1, []string{"output:awscloudwatchlogs", "status:error"})
c.Stats.AWSCloudWatchLogs.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awscloudwatchlogs", "status": Error}).Inc()
log.Printf("[ERROR] : %v CloudWatchLogs - %v\n", c.OutputType, err.Error())
return
}

log.Printf("[INFO] : %v CloudWatchLogs - Send Log OK (%v)\n", c.OutputType, resp.String())
go c.CountMetric("outputs", 1, []string{"output:awscloudwatchlogs", "status:ok"})
c.Stats.AWSCloudWatchLogs.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "awscloudwatchlogs", "status": OK}).Inc()
}

// PutLogEvents will attempt to execute and handle invalid tokens.
func (c *Client) putLogEvents(svc *cloudwatchlogs.CloudWatchLogs, input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
resp, err := svc.PutLogEvents(input)
if err != nil {
if exception, ok := err.(*cloudwatchlogs.InvalidSequenceTokenException); ok {
log.Printf("[INFO] : %v Refreshing token for LogGroup: %s LogStream: %s", c.OutputType, *input.LogGroupName, *input.LogStreamName)
input.SequenceToken = exception.ExpectedSequenceToken

return c.putLogEvents(svc, input)
}

return nil, err
}

return resp, nil
}
55 changes: 28 additions & 27 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,34 @@ func getInitStats() *types.Statistics {
}))

stats = &types.Statistics{
Requests: getInputNewMap("requests"),
FIFO: getInputNewMap("fifo"),
GRPC: getInputNewMap("grpc"),
Falco: expvar.NewMap("falco.priority"),
Slack: getOutputNewMap("slack"),
Rocketchat: getOutputNewMap("rocketchat"),
Mattermost: getOutputNewMap("mattermost"),
Teams: getOutputNewMap("teams"),
Datadog: getOutputNewMap("datadog"),
Discord: getOutputNewMap("discord"),
Alertmanager: getOutputNewMap("alertmanager"),
Elasticsearch: getOutputNewMap("elasticsearch"),
Loki: getOutputNewMap("loki"),
Nats: getOutputNewMap("nats"),
Influxdb: getOutputNewMap("influxdb"),
AWSLambda: getOutputNewMap("awslambda"),
AWSSQS: getOutputNewMap("awssqs"),
AWSSNS: getOutputNewMap("awssns"),
SMTP: getOutputNewMap("smtp"),
Opsgenie: getOutputNewMap("opsgenie"),
Statsd: getOutputNewMap("statsd"),
Dogstatsd: getOutputNewMap("dogstatsd"),
Webhook: getOutputNewMap("webhook"),
AzureEventHub: getOutputNewMap("azureeventhub"),
GCPPubSub: getOutputNewMap("gcppubsub"),
GoogleChat: getOutputNewMap("googlechat"),
Kafka: getOutputNewMap("kafka"),
Requests: getInputNewMap("requests"),
FIFO: getInputNewMap("fifo"),
GRPC: getInputNewMap("grpc"),
Falco: expvar.NewMap("falco.priority"),
Slack: getOutputNewMap("slack"),
Rocketchat: getOutputNewMap("rocketchat"),
Mattermost: getOutputNewMap("mattermost"),
Teams: getOutputNewMap("teams"),
Datadog: getOutputNewMap("datadog"),
Discord: getOutputNewMap("discord"),
Alertmanager: getOutputNewMap("alertmanager"),
Elasticsearch: getOutputNewMap("elasticsearch"),
Loki: getOutputNewMap("loki"),
Nats: getOutputNewMap("nats"),
Influxdb: getOutputNewMap("influxdb"),
AWSLambda: getOutputNewMap("awslambda"),
AWSSQS: getOutputNewMap("awssqs"),
AWSSNS: getOutputNewMap("awssns"),
AWSCloudWatchLogs: getOutputNewMap("awscloudwatchlogs"),
SMTP: getOutputNewMap("smtp"),
Opsgenie: getOutputNewMap("opsgenie"),
Statsd: getOutputNewMap("statsd"),
Dogstatsd: getOutputNewMap("dogstatsd"),
Webhook: getOutputNewMap("webhook"),
AzureEventHub: getOutputNewMap("azureeventhub"),
GCPPubSub: getOutputNewMap("gcppubsub"),
GoogleChat: getOutputNewMap("googlechat"),
Kafka: getOutputNewMap("kafka"),
}
stats.Falco.Add("emergency", 0)
stats.Falco.Add("alert", 0)
Expand Down
Loading