Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[functionbeat] Allow Kinesis to deploy using the CLI. #10116

Merged
merged 8 commits into from
Jan 25, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 53 additions & 2 deletions x-pack/functionbeat/_meta/beat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ functionbeat.provider.aws.functions:
type: sqs

# Description of the method to help identify them when you run multiples functions.
description: "lambda function for sqs events"
description: "lambda function for SQS events"

# Concurrency, is the reserved number of instances for that function.
# Default is 5.
Expand All @@ -80,8 +80,9 @@ functionbeat.provider.aws.functions:
#fields:
# env: staging

# List of cloudwatch log group registered to that function.
# List of SQS queues.
triggers:
# Arn for the SQS queue.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# Define custom processors for this function.
Expand All @@ -92,3 +93,53 @@ functionbeat.provider.aws.functions:
# max_depth: 1
# target: ""
# overwrite_keys: false
#

# Create a function that accepts events from Kinesis streams.
- name: kinesis
enabled: false
type: kinesis

# Description of the method to help identify them when you run multiples functions.
description: "lambda function for Kinesis events"

# Concurrency, is the reserved number of instances for that function.
# Default is 5.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function, the configured size must be a factor of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# Define custom processors for this function.
#processors:
# - decode_json_fields:
# fields: ["message"]
# process_array: false
# max_depth: 1
# target: ""
# overwrite_keys: false

# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
#batch_size: 100

# Starting position is where to start reading events from the Kinesis stream.
# Default is trim_horizon.
#starting_position: "trim_horizon"
55 changes: 55 additions & 0 deletions x-pack/functionbeat/_meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ functionbeat.provider.aws.functions:
#fields:
# env: staging

# List of SQS queues.
triggers:
# Arn for the SQS queue.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# Define custom processors for this function.
#processors:
# - decode_json_fields:
Expand All @@ -89,3 +94,53 @@ functionbeat.provider.aws.functions:
# max_depth: 1
# target: ""
# overwrite_keys: false
#

# Create a function that accepts events from Kinesis streams.
- name: kinesis
enabled: false
type: kinesis

# Description of the method to help identify them when you run multiples functions.
description: "lambda function for Kinesis events"

# Concurrency, is the reserved number of instances for that function.
# Default is 5.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function, the configured size must be a factor of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# Define custom processors for this function.
#processors:
# - decode_json_fields:
# fields: ["message"]
# process_array: false
# max_depth: 1
# target: ""
# overwrite_keys: false

# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
#batch_size: 100

# Starting position is where to start reading events from the Kinesis stream.
# Default is trim_horizon.
#starting_position: "trim_horizon"
15 changes: 15 additions & 0 deletions x-pack/functionbeat/docs/config-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ are:

`cloudwatch_logs`:: Collects events from CloudWatch logs.
`sqs`:: Collects data from Amazon Simple Queue Service (SQS).
`kinesis`:: Collects data from a Kinesis stream.

[float]
[id="{beatname_lc}-description"]
Expand Down Expand Up @@ -125,3 +126,17 @@ default is 128 MiB.

The dead letter queue to use for messages that can't be processed successfully.
Set this option to an ARN that points to an SQS queue.

[float]
[id="{beatname_lc}-batch-size"]
==== `batch_size`

The number of events to read from a Kinesis stream, the minimal values is 100 and the maximun is
10000. The default is 100.

[float]
[id="{beatname_lc}-starting-position"]
==== `starting_position`

The starting position to read from a Kinesis stream, valids values are `trim_horizon` and `latest`.
The default is trim_horizon.
55 changes: 53 additions & 2 deletions x-pack/functionbeat/functionbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ functionbeat.provider.aws.functions:
type: sqs

# Description of the method to help identify them when you run multiples functions.
description: "lambda function for sqs events"
description: "lambda function for SQS events"

# Concurrency, is the reserved number of instances for that function.
# Default is 5.
Expand All @@ -80,8 +80,9 @@ functionbeat.provider.aws.functions:
#fields:
# env: staging

# List of cloudwatch log group registered to that function.
# List of SQS queues.
triggers:
# Arn for the SQS queue.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# Define custom processors for this function.
Expand All @@ -92,6 +93,56 @@ functionbeat.provider.aws.functions:
# max_depth: 1
# target: ""
# overwrite_keys: false
#

# Create a function that accepts events from Kinesis streams.
- name: kinesis
enabled: false
type: kinesis

# Description of the method to help identify them when you run multiples functions.
description: "lambda function for Kinesis events"

# Concurrency, is the reserved number of instances for that function.
# Default is 5.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function, the configured size must be a factor of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# Define custom processors for this function.
#processors:
# - decode_json_fields:
# fields: ["message"]
# process_array: false
# max_depth: 1
# target: ""
# overwrite_keys: false

# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
#batch_size: 100

# Starting position is where to start reading events from the Kinesis stream.
# Default is trim_horizon.
#starting_position: "trim_horizon"

#================================ General ======================================

Expand Down
55 changes: 55 additions & 0 deletions x-pack/functionbeat/functionbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ functionbeat.provider.aws.functions:
#fields:
# env: staging

# List of SQS queues.
triggers:
# Arn for the SQS queue.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# Define custom processors for this function.
#processors:
# - decode_json_fields:
Expand All @@ -89,6 +94,56 @@ functionbeat.provider.aws.functions:
# max_depth: 1
# target: ""
# overwrite_keys: false
#

# Create a function that accepts events from Kinesis streams.
- name: kinesis
enabled: false
type: kinesis

# Description of the method to help identify them when you run multiples functions.
description: "lambda function for Kinesis events"

# Concurrency, is the reserved number of instances for that function.
# Default is 5.
#
# Note: There is a hard limit of 1000 functions of any kind per account.
#concurrency: 5

# The maximum memory allocated for this function, the configured size must be a factor of 64.
# There is a hard limit of 3008MiB for each function. Default is 128MiB.
#memory_size: 128MiB

# Dead letter queue configuration, this must be set to an ARN pointing to a SQS queue.
#dead_letter_config.target_arn:

# Optional fields that you can specify to add additional information to the
# output. Fields can be scalar values, arrays, dictionaries, or any nested
# combination of these.
#fields:
# env: staging

# Define custom processors for this function.
#processors:
# - decode_json_fields:
# fields: ["message"]
# process_array: false
# max_depth: 1
# target: ""
# overwrite_keys: false

# List of Kinesis streams.
triggers:
# Arn for the Kinesis stream.
- event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents

# batch_size is the number of events read in a batch.
# Default is 10.
#batch_size: 100

# Starting position is where to start reading events from the Kinesis stream.
# Default is trim_horizon.
#starting_position: "trim_horizon"

#================================ General =====================================

Expand Down
39 changes: 23 additions & 16 deletions x-pack/functionbeat/provider/aws/cli_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type AWSLambdaFunction struct {
}

type installer interface {
Policies() []cloudformation.AWSIAMRole_Policy
Template() *cloudformation.Template
LambdaConfig() *lambdaConfig
}
Expand Down Expand Up @@ -84,6 +85,27 @@ func (c *CLIManager) template(function installer, name, codeLoc string) *cloudfo
// Documentation: https://docs.aws.amazon.com/AWSCloudFormation/latest/APIReference/Welcome.html
// Intrinsic function reference: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference.html

// Default policies to writes logs from the Lambda.
policies := []cloudformation.AWSIAMRole_Policy{
cloudformation.AWSIAMRole_Policy{
PolicyName: cloudformation.Join("-", []string{"fnb", "lambda", name}),
PolicyDocument: map[string]interface{}{
"Statement": []map[string]interface{}{
map[string]interface{}{
"Action": []string{"logs:CreateLogStream", "Logs:PutLogEvents"},
"Effect": "Allow",
"Resource": []string{
cloudformation.Sub("arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/" + name + ":*"),
},
},
},
},
},
}

// Merge any specific policies from the service.
policies = append(policies, function.Policies()...)

// Create the roles for the lambda.
template := cloudformation.NewTemplate()
// doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-role.html
Expand All @@ -106,22 +128,7 @@ func (c *CLIManager) template(function installer, name, codeLoc string) *cloudfo
RoleName: "functionbeat-lambda-" + name,
// Allow the lambda to write log to cloudwatch logs.
// doc: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-policy.html
Policies: []cloudformation.AWSIAMRole_Policy{
cloudformation.AWSIAMRole_Policy{
PolicyName: cloudformation.Join("-", []string{"fnb", "lambda", name}),
PolicyDocument: map[string]interface{}{
"Statement": []map[string]interface{}{
map[string]interface{}{
"Action": []string{"logs:CreateLogStream", "Logs:PutLogEvents"},
"Effect": "Allow",
"Resource": []string{
cloudformation.Sub("arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/" + name + ":*"),
},
},
},
},
},
},
Policies: policies,
}

// Configure the Dead letter, any failed events will be send to the configured amazon resource name.
Expand Down
5 changes: 5 additions & 0 deletions x-pack/functionbeat/provider/aws/cloudwatch_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,8 @@ func (c *CloudwatchLogs) Template() *cloudformation.Template {
func (c *CloudwatchLogs) LambdaConfig() *lambdaConfig {
return c.config.LambdaConfig
}

// Policies returns a slice of policy to add to the lambda.
func (c *CloudwatchLogs) Policies() []cloudformation.AWSIAMRole_Policy {
return []cloudformation.AWSIAMRole_Policy{}
}
Loading