Skip to content

Commit

Permalink
[corechecks] Support custom tags in every corechecks (DataDog#2723)
Browse files Browse the repository at this point in the history
* [aggregator] store custom tags in a checkSender.

This is mainly used to attach tags read in checks configuration
to an instanciated sender.

Those custom tags will be appended to each send (event, metric, service).

* [aggregator] unit tests for custom tags in `checkSender`.

* [corechecks] set custom tags read in check config to the sender.

* [corechecks] unify the usage of checks custom tags.

Some checks were already appending their custom tags to the metrics,
events and services check sent.

This commit removes this as it's now done by the `checkSender`.

* [corechecks] comment about the tags configuration field while implementing a new core check.

* [corechecks] missed some manually added custom tags in Docker check.

* [corechecks] missed some manually added custom tags in kube & snmp unit tests.

* [corechecks] missed some manually added custom tags in network (linux) and kube apiserver unit tests.

* [corechecks] custom tags unit/inte tests changes due to new implementation.

* [docs] add `tags` field into agent/config.md

* [docs] revert a typo fix to avoid merge conflict.

* Add release note entry.

* [corechecks] remove some manually added custom tags in containerd check.

* [corechecks] no need to checks for tags added in the configuration during tests.

* Restore instance tags in integration tests.

* Review feedbacks on comments and improvements.

* Revert "Restore instance tags in integration tests."

This reverts commit 17c2a76.
  • Loading branch information
remeh authored Jan 23, 2019
1 parent 6294d58 commit 12cf59d
Show file tree
Hide file tree
Showing 29 changed files with 232 additions and 140 deletions.
1 change: 1 addition & 0 deletions docs/agent/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ advanced options in the `instance` section:
that should run less frequently than the default 15 seconds interval
* `empty_default_hostname`: submit metrics, events and service checks with no
hostname when set to `true`
* `tags`: send custom tags in addition to the tags sent by the check.

## Removed options

Expand Down
5 changes: 5 additions & 0 deletions pkg/aggregator/mocksender/mocked_methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func (m *MockSender) Commit() {
m.Called()
}

//SetCheckCustomTags enables the set of check custom tags mock call.
func (m *MockSender) SetCheckCustomTags(tags []string) {
m.Called(tags)
}

//GetMetricStats enables the get metric stats mock call.
func (m *MockSender) GetMetricStats() map[string]int64 {
m.Called()
Expand Down
1 change: 1 addition & 0 deletions pkg/aggregator/mocksender/mocksender.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (m *MockSender) SetupAcceptAll() {
m.On("Event", mock.AnythingOfType("metrics.Event")).Return()
m.On("GetMetricStats", mock.AnythingOfType("map[string]int64")).Return()
m.On("DisableDefaultHostname", mock.AnythingOfType("bool")).Return()
m.On("SetCheckCustomTags", mock.AnythingOfType("[]string")).Return()
m.On("Commit").Return()
}

Expand Down
14 changes: 13 additions & 1 deletion pkg/aggregator/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Sender interface {
Event(e metrics.Event)
GetMetricStats() map[string]int64
DisableDefaultHostname(disable bool)
SetCheckCustomTags(tags []string)
}

type metricStats struct {
Expand All @@ -61,6 +62,7 @@ type checkSender struct {
smsOut chan<- senderMetricSample
serviceCheckOut chan<- metrics.ServiceCheck
eventOut chan<- metrics.Event
checkTags []string
}

type senderMetricSample struct {
Expand Down Expand Up @@ -152,6 +154,12 @@ func (s *checkSender) DisableDefaultHostname(disable bool) {
s.defaultHostnameDisabled = disable
}

// SetCheckCustomTags stores the tags set in the check configuration file.
// They will be appended to each send (metric, event and service)
func (s *checkSender) SetCheckCustomTags(tags []string) {
s.checkTags = tags
}

// Commit commits the metric samples that were added during a check run
// Should be called at the end of every check run
func (s *checkSender) Commit() {
Expand Down Expand Up @@ -191,6 +199,8 @@ func (s *checkSender) SendRawMetricSample(sample *metrics.MetricSample) {
}

func (s *checkSender) sendMetricSample(metric string, value float64, hostname string, tags []string, mType metrics.MetricType) {
tags = append(tags, s.checkTags...)

log.Trace(mType.String(), " sample: ", metric, ": ", value, " for hostname: ", hostname, " tags: ", tags)

metricSample := &metrics.MetricSample{
Expand Down Expand Up @@ -267,7 +277,7 @@ func (s *checkSender) ServiceCheck(checkName string, status metrics.ServiceCheck
Status: status,
Host: hostname,
Ts: time.Now().Unix(),
Tags: tags,
Tags: append(tags, s.checkTags...),
Message: message,
}

Expand All @@ -284,6 +294,8 @@ func (s *checkSender) ServiceCheck(checkName string, status metrics.ServiceCheck

// Event submits an event
func (s *checkSender) Event(e metrics.Event) {
e.Tags = append(e.Tags, s.checkTags...)

log.Trace("Event submitted: ", e.Title, " for hostname: ", e.Host, " tags: ", e.Tags)

if e.Host == "" && !s.defaultHostnameDisabled {
Expand Down
120 changes: 120 additions & 0 deletions pkg/aggregator/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"sync"
"testing"
"time"

// 3p

Expand Down Expand Up @@ -133,6 +134,125 @@ func TestGetSenderDefaultHostname(t *testing.T) {
assert.Equal(t, false, checksender.defaultHostnameDisabled)
}

func TestGetSenderAddCheckCustomTagsMetrics(t *testing.T) {
resetAggregator()
InitAggregator(nil, "testhostname", "")

senderMetricSampleChan := make(chan senderMetricSample, 10)
serviceCheckChan := make(chan metrics.ServiceCheck, 10)
eventChan := make(chan metrics.Event, 10)
checkSender := newCheckSender(checkID1, "", senderMetricSampleChan, serviceCheckChan, eventChan)

// no custom tags
checkSender.sendMetricSample("metric.test", 42.0, "testhostname", nil, metrics.CounterType)
sms := <-senderMetricSampleChan
assert.Nil(t, sms.metricSample.Tags)

// only tags added by the check
checkTags := []string{"check:tag1", "check:tag2"}
checkSender.sendMetricSample("metric.test", 42.0, "testhostname", checkTags, metrics.CounterType)
sms = <-senderMetricSampleChan
assert.Equal(t, checkTags, sms.metricSample.Tags)

// simulate tags in the configuration file
customTags := []string{"custom:tag1", "custom:tag2"}
checkSender.SetCheckCustomTags(customTags)
assert.Len(t, checkSender.checkTags, 2)

// only tags coming from the configuration file
checkSender.sendMetricSample("metric.test", 42.0, "testhostname", nil, metrics.CounterType)
sms = <-senderMetricSampleChan
assert.Equal(t, customTags, sms.metricSample.Tags)

// tags added by the check + tags coming from the configuration file
checkSender.sendMetricSample("metric.test", 42.0, "testhostname", checkTags, metrics.CounterType)
sms = <-senderMetricSampleChan
assert.Equal(t, append(checkTags, customTags...), sms.metricSample.Tags)
}

func TestGetSenderAddCheckCustomTagsService(t *testing.T) {
resetAggregator()
InitAggregator(nil, "testhostname", "")

senderMetricSampleChan := make(chan senderMetricSample, 10)
serviceCheckChan := make(chan metrics.ServiceCheck, 10)
eventChan := make(chan metrics.Event, 10)
checkSender := newCheckSender(checkID1, "", senderMetricSampleChan, serviceCheckChan, eventChan)

// no custom tags
checkSender.ServiceCheck("test", metrics.ServiceCheckOK, "testhostname", nil, "test message")
sc := <-serviceCheckChan
assert.Nil(t, sc.Tags)

// only tags added by the check
checkTags := []string{"check:tag1", "check:tag2"}
checkSender.ServiceCheck("test", metrics.ServiceCheckOK, "testhostname", checkTags, "test message")
sc = <-serviceCheckChan
assert.Equal(t, checkTags, sc.Tags)

// simulate tags in the configuration file
customTags := []string{"custom:tag1", "custom:tag2"}
checkSender.SetCheckCustomTags(customTags)
assert.Len(t, checkSender.checkTags, 2)

// only tags coming from the configuration file
checkSender.ServiceCheck("test", metrics.ServiceCheckOK, "testhostname", nil, "test message")
sc = <-serviceCheckChan
assert.Equal(t, customTags, sc.Tags)

// tags added by the check + tags coming from the configuration file
checkSender.ServiceCheck("test", metrics.ServiceCheckOK, "testhostname", checkTags, "test message")
sc = <-serviceCheckChan
assert.Equal(t, append(checkTags, customTags...), sc.Tags)
}

func TestGetSenderAddCheckCustomTagsEvent(t *testing.T) {
resetAggregator()
InitAggregator(nil, "testhostname", "")

senderMetricSampleChan := make(chan senderMetricSample, 10)
serviceCheckChan := make(chan metrics.ServiceCheck, 10)
eventChan := make(chan metrics.Event, 10)
checkSender := newCheckSender(checkID1, "", senderMetricSampleChan, serviceCheckChan, eventChan)

event := metrics.Event{
Title: "title",
Host: "testhostname",
Ts: time.Now().Unix(),
Text: "text",
Tags: nil,
}

// no custom tags
checkSender.Event(event)
e := <-eventChan
assert.Nil(t, e.Tags)

// only tags added by the check
checkTags := []string{"check:tag1", "check:tag2"}
event.Tags = checkTags
checkSender.Event(event)
e = <-eventChan
assert.Equal(t, checkTags, e.Tags)

// simulate tags in the configuration file
customTags := []string{"custom:tag1", "custom:tag2"}
checkSender.SetCheckCustomTags(customTags)
assert.Len(t, checkSender.checkTags, 2)

// only tags coming from the configuration file
event.Tags = nil
checkSender.Event(event)
e = <-eventChan
assert.Equal(t, customTags, e.Tags)

// tags added by the check + tags coming from the configuration file
event.Tags = checkTags
checkSender.Event(event)
e = <-eventChan
assert.Equal(t, append(checkTags, customTags...), e.Tags)
}

func TestCheckSenderInterface(t *testing.T) {
senderMetricSampleChan := make(chan senderMetricSample, 10)
serviceCheckChan := make(chan metrics.ServiceCheck, 10)
Expand Down
9 changes: 5 additions & 4 deletions pkg/autodiscovery/integration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ type Config struct {

// CommonInstanceConfig holds the reserved fields for the yaml instance data
type CommonInstanceConfig struct {
MinCollectionInterval int `yaml:"min_collection_interval"`
EmptyDefaultHostname bool `yaml:"empty_default_hostname"`
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
MinCollectionInterval int `yaml:"min_collection_interval"`
EmptyDefaultHostname bool `yaml:"empty_default_hostname"`
Tags []string `yaml:"tags"`
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
}

// Equal determines whether the passed config is the same
Expand Down
14 changes: 14 additions & 0 deletions pkg/collector/corechecks/checkbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
// Integration warnings are handled via the Warn and Warnf methods
// that forward the warning to the logger and send the warning to
// the collector for display in the status page and the web UI.
//
// If custom tags are set in the instance configuration, they will
// be automatically appended to each send done by this check.
type CheckBase struct {
checkName string
checkID check.ID
Expand Down Expand Up @@ -84,6 +87,17 @@ func (c *CheckBase) CommonConfigure(instance integration.Data) error {
}
s.DisableDefaultHostname(true)
}

// Set custom tags configured for this check
if len(commonOptions.Tags) > 0 {
s, err := aggregator.GetSender(c.checkID)
if err != nil {
log.Errorf("failed to retrieve a sender for check %s: %s", string(c.ID()), err)
return err
}
s.SetCheckCustomTags(commonOptions.Tags)
}

return nil
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/collector/corechecks/cluster/kubernetes_apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ const (

// KubeASConfig is the config of the API server.
type KubeASConfig struct {
Tags []string `yaml:"tags"`
CollectEvent bool `yaml:"collect_events"`
CollectOShiftQuotas bool `yaml:"collect_openshift_clusterquotas"`
FilteredEventType []string `yaml:"filtered_event_types"`
Expand Down Expand Up @@ -279,7 +278,7 @@ func (k *KubeASCheck) parseComponentStatus(sender aggregator.Sender, componentsS
log.Debug("API Server component's structure is not expected")
continue
}
tagComp := append(k.instance.Tags, fmt.Sprintf("component:%s", component.Name))
tagComp := []string{fmt.Sprintf("component:%s", component.Name)}
for _, condition := range component.Conditions {
status_check := metrics.ServiceCheckUnknown

Expand Down Expand Up @@ -341,7 +340,6 @@ ITER_EVENTS:
k.Warnf("Error while formatting bundled events, %s. Not submitting", err.Error())
continue
}
datadogEv.Tags = append(datadogEv.Tags, k.instance.Tags...)
sender.Event(datadogEv)
}
return nil
Expand Down
24 changes: 10 additions & 14 deletions pkg/collector/corechecks/cluster/kubernetes_apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,33 +89,31 @@ func TestParseComponentStatus(t *testing.T) {

// FIXME: use the factory instead
kubeASCheck := &KubeASCheck{
instance: &KubeASConfig{
Tags: []string{"test"},
},
instance: &KubeASConfig{},
CheckBase: core.NewCheckBase(kubernetesAPIServerCheckName),
KubeAPIServerHostname: "hostname",
}

mocked := mocksender.NewMockSender(kubeASCheck.ID())
mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckOK, "hostname", []string{"test", "component:Zookeeper"}, "")
mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckOK, "hostname", []string{"component:Zookeeper"}, "")
kubeASCheck.parseComponentStatus(mocked, expected)

mocked.AssertNumberOfCalls(t, "ServiceCheck", 1)
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckOK, "hostname", []string{"test", "component:Zookeeper"}, "")
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckOK, "hostname", []string{"component:Zookeeper"}, "")

err := kubeASCheck.parseComponentStatus(mocked, unExpected)
assert.EqualError(t, err, "metadata structure has changed. Not collecting API Server's Components status")
mocked.AssertNotCalled(t, "ServiceCheck", "kube_apiserver_controlplane.up")

mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckCritical, "hostname", []string{"test", "component:ETCD"}, "")
mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckCritical, "hostname", []string{"component:ETCD"}, "")
kubeASCheck.parseComponentStatus(mocked, unHealthy)
mocked.AssertNumberOfCalls(t, "ServiceCheck", 2)
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckCritical, "hostname", []string{"test", "component:ETCD"}, "")
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckCritical, "hostname", []string{"component:ETCD"}, "")

mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckUnknown, "hostname", []string{"test", "component:DCA"}, "")
mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckUnknown, "hostname", []string{"component:DCA"}, "")
kubeASCheck.parseComponentStatus(mocked, unknown)
mocked.AssertNumberOfCalls(t, "ServiceCheck", 3)
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckUnknown, "hostname", []string{"test", "component:DCA"}, "")
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckUnknown, "hostname", []string{"component:DCA"}, "")

empty_resp := kubeASCheck.parseComponentStatus(mocked, empty)
assert.Nil(t, empty_resp, "metadata structure has changed. Not collecting API Server's Components status")
Expand Down Expand Up @@ -159,7 +157,6 @@ func TestProcessBundledEvents(t *testing.T) {

kubeASCheck := &KubeASCheck{
instance: &KubeASConfig{
Tags: []string{"test"},
FilteredEventType: []string{"ignored"},
},
CheckBase: core.NewCheckBase(kubernetesAPIServerCheckName),
Expand Down Expand Up @@ -195,7 +192,7 @@ func TestProcessBundledEvents(t *testing.T) {
Title: "Events from the machine-blue Node",
Text: "%%% \n30 **MissingClusterDNS**: MountVolume.SetUp succeeded\n \n _Events emitted by the kubelet seen at " + time.Unix(709675200, 0).String() + "_ \n\n %%%",
Priority: "normal",
Tags: []string{"test", "namespace:default", "source_component:kubelet"},
Tags: []string{"namespace:default", "source_component:kubelet"},
AggregationKey: "kubernetes_apiserver:e63e74fa-f566-11e7-9749-0e4863e1cbf4",
SourceTypeName: "kubernetes",
Ts: 709675200,
Expand Down Expand Up @@ -223,7 +220,7 @@ func TestProcessBundledEvents(t *testing.T) {
Title: "Events from the machine-blue Node",
Text: "%%% \n30 **MissingClusterDNS**: MountVolume.SetUp succeeded\n \n _Events emitted by the kubelet seen at " + time.Unix(709675200, 0).String() + "_ \n\n %%%",
Priority: "normal",
Tags: []string{"test", "namespace:default", "source_component:kubelet"},
Tags: []string{"namespace:default", "source_component:kubelet"},
AggregationKey: "kubernetes_apiserver:e63e74fa-f566-11e7-9749-0e4863e1cbf4",
SourceTypeName: "kubernetes",
Ts: 709675200,
Expand All @@ -248,7 +245,6 @@ func TestProcessEvent(t *testing.T) {

kubeASCheck := &KubeASCheck{
instance: &KubeASConfig{
Tags: []string{"test"},
FilteredEventType: []string{"ignored"},
},
CheckBase: core.NewCheckBase(kubernetesAPIServerCheckName),
Expand All @@ -264,7 +260,7 @@ func TestProcessEvent(t *testing.T) {
Title: "Events from the dca-789976f5d7-2ljx6 ReplicaSet",
Text: "%%% \n2 **Scheduled**: Successfully assigned dca-789976f5d7-2ljx6 to ip-10-0-0-54\n \n _New events emitted by the default-scheduler seen at " + time.Unix(709662600000, 0).String() + "_ \n\n %%%",
Priority: "normal",
Tags: []string{"test", "source_component:default-scheduler", "namespace:default"},
Tags: []string{"source_component:default-scheduler", "namespace:default"},
AggregationKey: "kubernetes_apiserver:e6417a7f-f566-11e7-9749-0e4863e1cbf4",
SourceTypeName: "kubernetes",
Ts: 709662600,
Expand Down
2 changes: 1 addition & 1 deletion pkg/collector/corechecks/cluster/kubernetes_openshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (k *KubeASCheck) retrieveOShiftClusterQuotas() ([]osq.ClusterResourceQuota,
// reportClusterQuotas reports metrics on OpenShift ClusterResourceQuota objects
func (k *KubeASCheck) reportClusterQuotas(quotas []osq.ClusterResourceQuota, sender aggregator.Sender) {
for _, quota := range quotas {
quotaTags := append(k.instance.Tags, fmt.Sprintf("clusterquota:%s", quota.Name))
quotaTags := []string{fmt.Sprintf("clusterquota:%s", quota.Name)}
remaining := computeQuotaRemaining(quota.Status.Total.Used, quota.Status.Total.Hard)

k.reportQuota(quota.Status.Total.Hard, "openshift.clusterquota", "limit", quotaTags, sender)
Expand Down
4 changes: 2 additions & 2 deletions pkg/collector/corechecks/cluster/kubernetes_openshift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestReportClusterQuotas(t *testing.T) {
json.Unmarshal(raw, &list)
require.Len(t, list.Items, 1)

var instanceCfg = []byte("tags: [customtag]")
var instanceCfg = []byte("")
var initCfg = []byte("")
kubeASCheck := KubernetesASFactory().(*KubeASCheck)
err = kubeASCheck.Configure(instanceCfg, initCfg)
Expand All @@ -38,7 +38,7 @@ func TestReportClusterQuotas(t *testing.T) {
mocked.AssertNumberOfCalls(t, "Gauge", 9*3)

// Total
expectedTags := []string{"customtag", "clusterquota:multiproj-test"}
expectedTags := []string{"clusterquota:multiproj-test"}

mocked.AssertMetric(t, "Gauge", "openshift.clusterquota.cpu.limit", 3.0, "", expectedTags)
mocked.AssertMetric(t, "Gauge", "openshift.clusterquota.cpu.used", 0.6, "", expectedTags)
Expand Down
Loading

0 comments on commit 12cf59d

Please sign in to comment.