Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] add collecting tags for rds metricset #16605

Merged
merged 18 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add filtering option for prometheus collector. {pull}16420[16420]
- Add metricsets based on Ceph Manager Daemon to the `ceph` module. {issue}7723[7723] {pull}16254[16254]
- Release `statsd` module as GA. {pull}16447[16447] {issue}14280[14280]
- Add collecting tags and tags_filter for rds metricset in aws module. {pull}16605[16605] {issue}16358[16358]
- Add OpenMetrics Metricbeat module {pull}16596[16596]
- Add `cloudfoundry` module to send events from Cloud Foundry. {pull}16671[16671]
- Add `redisenterprise` module. {pull}16482[16482] {issue}15269[15269]
Expand Down
5 changes: 4 additions & 1 deletion metricbeat/docs/modules/aws.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Currently, we have `billing`, `cloudwatch`, `dynamodb`, `ebs`, `ec2`, `elb`,
`lambda`, `rds`, `s3_daily_storage`, `s3_request`, `sns`, `sqs` and `usage`
metricset in `aws` module.

Collecting `tags` for `ec2`, `cloudwatch`, and metricset created based on
Collecting `tags` for `ec2`, `rds`, `cloudwatch`, and metricset created based on
`cloudwatch` using light module is supported.

* *tags.*: Tag key value pairs from aws resources. A tag is a label that user assigns to an AWS resource.
Expand Down Expand Up @@ -301,6 +301,9 @@ metricbeat.modules:
- module: aws
period: 60s
credential_profile_name: test-mb
tags_filter:
- key: "dept"
value: "eng"
metricsets:
- rds
----
Expand Down
3 changes: 3 additions & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ metricbeat.modules:
- module: aws
period: 60s
credential_profile_name: test-mb
tags_filter:
- key: "dept"
value: "eng"
metricsets:
- rds

Expand Down
3 changes: 3 additions & 0 deletions x-pack/metricbeat/module/aws/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,8 @@
- module: aws
period: 60s
credential_profile_name: test-mb
tags_filter:
- key: "dept"
value: "eng"
metricsets:
- rds
2 changes: 1 addition & 1 deletion x-pack/metricbeat/module/aws/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Currently, we have `billing`, `cloudwatch`, `dynamodb`, `ebs`, `ec2`, `elb`,
`lambda`, `rds`, `s3_daily_storage`, `s3_request`, `sns`, `sqs` and `usage`
metricset in `aws` module.

Collecting `tags` for `ec2`, `cloudwatch`, and metricset created based on
Collecting `tags` for `ec2`, `rds`, `cloudwatch`, and metricset created based on
`cloudwatch` using light module is supported.

* *tags.*: Tag key value pairs from aws resources. A tag is a label that user assigns to an AWS resource.
Expand Down
16 changes: 13 additions & 3 deletions x-pack/metricbeat/module/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go-v2/service/iam"
"github.com/aws/aws-sdk-go-v2/service/rds"
"github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi"
"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/pkg/errors"
Expand All @@ -23,9 +24,10 @@ import (

// Config defines all required and optional parameters for aws metricsets
type Config struct {
Period time.Duration `config:"period" validate:"nonzero,required"`
Regions []string `config:"regions"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
Period time.Duration `config:"period" validate:"nonzero,required"`
Regions []string `config:"regions"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
TagsFilter []Tag `config:"tags_filter"`
}

// MetricSet is the base metricset for all aws metricsets
Expand All @@ -37,6 +39,7 @@ type MetricSet struct {
AwsConfig *awssdk.Config
AccountName string
AccountID string
TagsFilter []Tag
}

// Tag holds a configuration specific for ec2 and cloudwatch metricset.
Expand Down Expand Up @@ -84,6 +87,7 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
BaseMetricSet: base,
Period: config.Period,
AwsConfig: &awsConfig,
TagsFilter: config.TagsFilter,
}

// Get IAM account name
Expand Down Expand Up @@ -194,6 +198,12 @@ func CheckTagFiltersExist(tagsFilter []Tag, tags interface{}) bool {
tagKeys = append(tagKeys, *tag.Key)
tagValues = append(tagValues, *tag.Value)
}
case []rds.Tag:
tagsRDS := tags.([]rds.Tag)
for _, tag := range tagsRDS {
tagKeys = append(tagKeys, *tag.Key)
tagValues = append(tagValues, *tag.Value)
}
}

for _, tagFilter := range tagsFilter {
Expand Down
16 changes: 4 additions & 12 deletions x-pack/metricbeat/module/aws/ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func init() {
// interface methods except for Fetch.
type MetricSet struct {
*aws.MetricSet
TagsFilter []aws.Tag `config:"tags_filter"`
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
Expand All @@ -56,15 +55,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, errors.Wrap(err, "error creating aws metricset")
}

config := struct {
Tags []aws.Tag `config:"tags_filter"`
}{}

err = base.Module().UnpackConfig(&config)
if err != nil {
return nil, errors.Wrap(err, "error unpack raw module config using UnpackConfig")
}

// Check if period is set to be multiple of 60s or 300s
remainder300 := int(metricSet.Period.Seconds()) % 300
remainder60 := int(metricSet.Period.Seconds()) % 60
Expand All @@ -76,8 +66,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

return &MetricSet{
MetricSet: metricSet,
TagsFilter: config.Tags,
MetricSet: metricSet,
}, nil
}

Expand Down Expand Up @@ -201,6 +190,9 @@ func (m *MetricSet) createCloudWatchEvents(getMetricDataResults []cloudwatch.Met
// If tag filter doesn't exist in tagKeys/tagValues,
// then do not report this event/instance.
if exists := aws.CheckTagFiltersExist(m.TagsFilter, tags); !exists {
// if tag filter doesn't exist, remove this event initial
// entry to avoid report an empty event.
delete(events, instanceID)
continue
}
}
Expand Down
140 changes: 139 additions & 1 deletion x-pack/metricbeat/module/aws/ec2/ec2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ func TestCreateCloudWatchEventsDedotTags(t *testing.T) {

metricSet := MetricSet{
&aws.MetricSet{},
nil,
}
events, err := metricSet.createCloudWatchEvents(getMetricDataOutput, instancesOutputs, "us-west-1")
assert.NoError(t, err)
Expand All @@ -222,6 +221,145 @@ func TestCreateCloudWatchEventsDedotTags(t *testing.T) {
assert.Equal(t, expectedEvent.MetricSetFields["tags"], events[instanceID].ModuleFields["tags"])
}

func TestCreateCloudWatchEventsWithTagsFilter(t *testing.T) {
expectedEvent := mb.Event{
RootFields: common.MapStr{
"cloud": common.MapStr{
"region": regionName,
"provider": "aws",
"instance": common.MapStr{"id": "i-123"},
"machine": common.MapStr{"type": "t2.medium"},
"availability_zone": "us-west-1a",
},
},
MetricSetFields: common.MapStr{
"cpu": common.MapStr{
"total": common.MapStr{"pct": 0.25},
},
"instance": common.MapStr{
"image": common.MapStr{"id": "image-123"},
"core": common.MapStr{"count": int64(1)},
"threads_per_core": int64(1),
"state": common.MapStr{"code": int64(16), "name": "running"},
"monitoring": common.MapStr{"state": "disabled"},
"public": common.MapStr{
"dns_name": "ec2-1-2-3-4.us-west-1.compute.amazonaws.com",
"ip": "1.2.3.4",
},
"private": common.MapStr{
"dns_name": "ip-5-6-7-8.us-west-1.compute.internal",
"ip": "5.6.7.8",
},
},
"tags": common.MapStr{
"app_kubernetes_io/name": "foo",
"helm_sh/chart": "foo-chart",
},
},
}

svcEC2Mock := &MockEC2Client{}
instanceIDs, instancesOutputs, err := getInstancesPerRegion(svcEC2Mock)
assert.NoError(t, err)
assert.Equal(t, 1, len(instanceIDs))
instanceID := instanceIDs[0]
assert.Equal(t, instanceID, instanceID)
timestamp := time.Now()

getMetricDataOutput := []cloudwatch.MetricDataResult{
{
Id: &id1,
Label: &label1,
Values: []float64{0.25},
Timestamps: []time.Time{timestamp},
},
{
Id: &id2,
Label: &label2,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id3,
Label: &label3,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id4,
Label: &label4,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
}

metricSet := MetricSet{
&aws.MetricSet{
TagsFilter: []aws.Tag{{
Key: "app.kubernetes.io/name",
Value: "foo",
}},
},
}
events, err := metricSet.createCloudWatchEvents(getMetricDataOutput, instancesOutputs, "us-west-1")

assert.NoError(t, err)
assert.Equal(t, 1, len(events))
assert.Equal(t, expectedEvent.RootFields, events[instanceID].RootFields)
assert.Equal(t, expectedEvent.MetricSetFields["cpu"], events[instanceID].MetricSetFields["cpu"])
assert.Equal(t, expectedEvent.MetricSetFields["instance"], events[instanceID].MetricSetFields["instance"])
assert.Equal(t, expectedEvent.MetricSetFields["tags"], events[instanceID].ModuleFields["tags"])
}

func TestCreateCloudWatchEventsWithNotMatchingTagsFilter(t *testing.T) {
svcEC2Mock := &MockEC2Client{}
instanceIDs, instancesOutputs, err := getInstancesPerRegion(svcEC2Mock)
assert.NoError(t, err)
assert.Equal(t, 1, len(instanceIDs))
instanceID := instanceIDs[0]
assert.Equal(t, instanceID, instanceID)
timestamp := time.Now()

getMetricDataOutput := []cloudwatch.MetricDataResult{
{
Id: &id1,
Label: &label1,
Values: []float64{0.25},
Timestamps: []time.Time{timestamp},
},
{
Id: &id2,
Label: &label2,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id3,
Label: &label3,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
{
Id: &id4,
Label: &label4,
Values: []float64{0.0},
Timestamps: []time.Time{timestamp},
},
}

metricSet := MetricSet{
&aws.MetricSet{
TagsFilter: []aws.Tag{{
Key: "app_kubernetes_io/name",
Value: "not_foo",
}},
},
}
events, err := metricSet.createCloudWatchEvents(getMetricDataOutput, instancesOutputs, "us-west-1")
assert.NoError(t, err)
assert.Equal(t, 0, len(events))
}

func TestConstructMetricQueries(t *testing.T) {
name := "InstanceId"
dim := cloudwatch.Dimension{
Expand Down
52 changes: 31 additions & 21 deletions x-pack/metricbeat/module/aws/rds/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,71 @@
"aws": {
"rds": {
"aurora_bin_log_replica_lag": 0,
"aurora_replica.lag.ms": 19.683,
"aurora_replica.lag_max.ms": 19.651500701904297,
"aurora_replica.lag_min.ms": 19.651500701904297,
"aurora_replica.lag_max.ms": 19.108999252319336,
"aurora_replica.lag_min.ms": 19.108999252319336,
"cache_hit_ratio.buffer": 100,
"cache_hit_ratio.result_set": 0,
"cpu": {
"total": {
"pct": 0.035
"pct": 0.04
}
},
"database_connections": 0,
"db_instance.class": "db.r5.large",
"db_instance": {
"arn": "arn:aws:rds:us-east-1:428152502467:db:database-1-instance-1",
"class": "db.r5.large",
"identifier": "database-1-instance-1",
"status": "available"
},
"db_instance.identifier": "database-1-instance-1",
"deadlocks": 0,
"disk_usage": {
"bin_log.bytes": 0
},
"engine_uptime.sec": 278598.25,
"free_local_storage.bytes": 33112842240,
"freeable_memory.bytes": 4879514624,
"engine_uptime.sec": 2704277,
"free_local_storage.bytes": 31745863680,
"freeable_memory.bytes": 4634234880,
"latency": {
"commit": 2.9322999999999997,
"commit": 5.270933333333333,
"ddl": 0,
"delete": 0,
"dml": 0.08325833333333334,
"insert": 0.08325833333333334,
"select": 0.20890321021571023,
"dml": 0.1624,
"insert": 0.1624,
"select": 0.17333862433862435,
"update": 0
},
"login_failures": 0,
"queries": 7.807070323085375,
"queries": 9.11833836203304,
"throughput": {
"commit": 0.2500020925349523,
"commit": 0.5000916834753039,
"ddl": 0,
"delete": 0,
"dml": 0.2500020925349523,
"insert": 0.2500020925349523,
"network": 1.7498192019524124,
"network_receive": 0.8749096009762062,
"network_transmit": 0.8749096009762062,
"select": 2.8751975752392616,
"dml": 0.5000916834753039,
"insert": 0.5000916834753039,
"network": 1.4,
"network_receive": 0.7,
"network_transmit": 0.7,
"select": 3.150577605894414,
"update": 0
},
"transactions": {
"active": 0,
"blocked": 0
}
},
"tags": {
"created-by": "ks",
"dept": "engr"
}
},
"cloud": {
"account": {
"id": "428152502467",
"name": "elastic-beats"
},
"availability_zone": "us-east-1b",
"provider": "aws",
"region": "eu-west-1"
"region": "us-east-1"
},
"event": {
"dataset": "aws.rds",
Expand Down
Loading