Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[corechecks] Support custom tags in every corechecks #2723

Merged
merged 18 commits into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
fdc706f
[aggregator] store custom tags in a checkSender.
remeh Nov 29, 2018
65c2dc3
[aggregator] unit tests for custom tags in `checkSender`.
remeh Nov 29, 2018
bb211cb
[corechecks] set custom tags read in check config to the sender.
remeh Nov 29, 2018
c9dbc2c
[corechecks] unify the usage of checks custom tags.
remeh Nov 29, 2018
db03f64
[corechecks] comment about the tags configuration field while impleme…
remeh Nov 29, 2018
d2a6531
[corechecks] missed some manually added custom tags in Docker check.
remeh Nov 29, 2018
3218a94
[corechecks] missed some manually added custom tags in kube & snmp un…
remeh Nov 29, 2018
2882804
[corechecks] missed some manually added custom tags in network (linux…
remeh Nov 29, 2018
e8087dd
[corechecks] custom tags unit/inte tests changes due to new implement…
remeh Nov 30, 2018
8313dc0
[docs] add `tags` field into agent/config.md
remeh Nov 30, 2018
b34b8da
[docs] revert a typo fix to avoid merge conflict.
remeh Nov 30, 2018
a379b0b
Add release note entry.
remeh Dec 3, 2018
f34cdda
Merge branch 'master' into remeh/corechecks-custom-tags
remeh Jan 8, 2019
cb72b2b
[corechecks] remove some manually added custom tags in containerd check.
remeh Jan 8, 2019
1bd099a
[corechecks] no need to checks for tags added in the configuration du…
remeh Jan 8, 2019
17c2a76
Restore instance tags in integration tests.
remeh Jan 14, 2019
81b5621
Review feedbacks on comments and improvements.
remeh Jan 14, 2019
dd9fef4
Revert "Restore instance tags in integration tests."
remeh Jan 15, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/agent/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ advanced options in the `instance` section:
that should run less frequently than the default 15 seconds interval
* `empty_default_hostname`: submit metrics, events and service checks with no
hostname when set to `true`
* `tags`: send custom tags in addition to the tags sent by the check.

## Removed options

Expand Down
5 changes: 5 additions & 0 deletions pkg/aggregator/mocksender/mocked_methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func (m *MockSender) Commit() {
m.Called()
}

//SetCheckCustomTags enables the set of check custom tags mock call.
func (m *MockSender) SetCheckCustomTags(tags []string) {
m.Called(tags)
}

//GetMetricStats enables the get metric stats mock call.
func (m *MockSender) GetMetricStats() map[string]int64 {
m.Called()
Expand Down
1 change: 1 addition & 0 deletions pkg/aggregator/mocksender/mocksender.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (m *MockSender) SetupAcceptAll() {
m.On("Event", mock.AnythingOfType("metrics.Event")).Return()
m.On("GetMetricStats", mock.AnythingOfType("map[string]int64")).Return()
m.On("DisableDefaultHostname", mock.AnythingOfType("bool")).Return()
m.On("SetCheckCustomTags", mock.AnythingOfType("[]string")).Return()
m.On("Commit").Return()
}

Expand Down
14 changes: 13 additions & 1 deletion pkg/aggregator/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Sender interface {
Event(e metrics.Event)
GetMetricStats() map[string]int64
DisableDefaultHostname(disable bool)
SetCheckCustomTags(tags []string)
}

type metricStats struct {
Expand All @@ -61,6 +62,7 @@ type checkSender struct {
smsOut chan<- senderMetricSample
serviceCheckOut chan<- metrics.ServiceCheck
eventOut chan<- metrics.Event
checkTags []string
}

type senderMetricSample struct {
Expand Down Expand Up @@ -152,6 +154,12 @@ func (s *checkSender) DisableDefaultHostname(disable bool) {
s.defaultHostnameDisabled = disable
}

// SetCheckCustomTags stores the tags set in the check configuration file.
// They will be appended to each send (metric, event and service)
func (s *checkSender) SetCheckCustomTags(tags []string) {
s.checkTags = tags
}

// Commit commits the metric samples that were added during a check run
// Should be called at the end of every check run
func (s *checkSender) Commit() {
Expand Down Expand Up @@ -191,6 +199,8 @@ func (s *checkSender) SendRawMetricSample(sample *metrics.MetricSample) {
}

func (s *checkSender) sendMetricSample(metric string, value float64, hostname string, tags []string, mType metrics.MetricType) {
tags = append(tags, s.checkTags...)

log.Trace(mType.String(), " sample: ", metric, ": ", value, " for hostname: ", hostname, " tags: ", tags)

metricSample := &metrics.MetricSample{
Expand Down Expand Up @@ -267,7 +277,7 @@ func (s *checkSender) ServiceCheck(checkName string, status metrics.ServiceCheck
Status: status,
Host: hostname,
Ts: time.Now().Unix(),
Tags: tags,
Tags: append(tags, s.checkTags...),
Message: message,
}

Expand All @@ -284,6 +294,8 @@ func (s *checkSender) ServiceCheck(checkName string, status metrics.ServiceCheck

// Event submits an event
func (s *checkSender) Event(e metrics.Event) {
e.Tags = append(e.Tags, s.checkTags...)

log.Trace("Event submitted: ", e.Title, " for hostname: ", e.Host, " tags: ", e.Tags)

if e.Host == "" && !s.defaultHostnameDisabled {
Expand Down
120 changes: 120 additions & 0 deletions pkg/aggregator/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"sync"
"testing"
"time"

// 3p

Expand Down Expand Up @@ -133,6 +134,125 @@ func TestGetSenderDefaultHostname(t *testing.T) {
assert.Equal(t, false, checksender.defaultHostnameDisabled)
}

func TestGetSenderAddCheckCustomTagsMetrics(t *testing.T) {
resetAggregator()
InitAggregator(nil, "testhostname", "")

senderMetricSampleChan := make(chan senderMetricSample, 10)
serviceCheckChan := make(chan metrics.ServiceCheck, 10)
eventChan := make(chan metrics.Event, 10)
checkSender := newCheckSender(checkID1, "", senderMetricSampleChan, serviceCheckChan, eventChan)

// no custom tags
checkSender.sendMetricSample("metric.test", 42.0, "testhostname", nil, metrics.CounterType)
sms := <-senderMetricSampleChan
assert.Nil(t, sms.metricSample.Tags)

// only tags added by the check
checkTags := []string{"check:tag1", "check:tag2"}
checkSender.sendMetricSample("metric.test", 42.0, "testhostname", checkTags, metrics.CounterType)
sms = <-senderMetricSampleChan
assert.Equal(t, checkTags, sms.metricSample.Tags)

// simulate tags in the configuration file
customTags := []string{"custom:tag1", "custom:tag2"}
checkSender.SetCheckCustomTags(customTags)
assert.Len(t, checkSender.checkTags, 2)

// only tags coming from the configuration file
checkSender.sendMetricSample("metric.test", 42.0, "testhostname", nil, metrics.CounterType)
sms = <-senderMetricSampleChan
assert.Equal(t, customTags, sms.metricSample.Tags)

// tags added by the check + tags coming from the configuration file
checkSender.sendMetricSample("metric.test", 42.0, "testhostname", checkTags, metrics.CounterType)
sms = <-senderMetricSampleChan
assert.Equal(t, append(checkTags, customTags...), sms.metricSample.Tags)
}

func TestGetSenderAddCheckCustomTagsService(t *testing.T) {
resetAggregator()
InitAggregator(nil, "testhostname", "")

senderMetricSampleChan := make(chan senderMetricSample, 10)
serviceCheckChan := make(chan metrics.ServiceCheck, 10)
eventChan := make(chan metrics.Event, 10)
checkSender := newCheckSender(checkID1, "", senderMetricSampleChan, serviceCheckChan, eventChan)

// no custom tags
checkSender.ServiceCheck("test", metrics.ServiceCheckOK, "testhostname", nil, "test message")
sc := <-serviceCheckChan
assert.Nil(t, sc.Tags)

// only tags added by the check
checkTags := []string{"check:tag1", "check:tag2"}
checkSender.ServiceCheck("test", metrics.ServiceCheckOK, "testhostname", checkTags, "test message")
sc = <-serviceCheckChan
assert.Equal(t, checkTags, sc.Tags)

// simulate tags in the configuration file
customTags := []string{"custom:tag1", "custom:tag2"}
checkSender.SetCheckCustomTags(customTags)
assert.Len(t, checkSender.checkTags, 2)

// only tags coming from the configuration file
checkSender.ServiceCheck("test", metrics.ServiceCheckOK, "testhostname", nil, "test message")
sc = <-serviceCheckChan
assert.Equal(t, customTags, sc.Tags)

// tags added by the check + tags coming from the configuration file
checkSender.ServiceCheck("test", metrics.ServiceCheckOK, "testhostname", checkTags, "test message")
sc = <-serviceCheckChan
assert.Equal(t, append(checkTags, customTags...), sc.Tags)
}

func TestGetSenderAddCheckCustomTagsEvent(t *testing.T) {
resetAggregator()
InitAggregator(nil, "testhostname", "")

senderMetricSampleChan := make(chan senderMetricSample, 10)
serviceCheckChan := make(chan metrics.ServiceCheck, 10)
eventChan := make(chan metrics.Event, 10)
checkSender := newCheckSender(checkID1, "", senderMetricSampleChan, serviceCheckChan, eventChan)

event := metrics.Event{
Title: "title",
Host: "testhostname",
Ts: time.Now().Unix(),
Text: "text",
Tags: nil,
}

// no custom tags
checkSender.Event(event)
e := <-eventChan
assert.Nil(t, e.Tags)

// only tags added by the check
checkTags := []string{"check:tag1", "check:tag2"}
event.Tags = checkTags
checkSender.Event(event)
e = <-eventChan
assert.Equal(t, checkTags, e.Tags)

// simulate tags in the configuration file
customTags := []string{"custom:tag1", "custom:tag2"}
checkSender.SetCheckCustomTags(customTags)
assert.Len(t, checkSender.checkTags, 2)

// only tags coming from the configuration file
event.Tags = nil
checkSender.Event(event)
e = <-eventChan
assert.Equal(t, customTags, e.Tags)

// tags added by the check + tags coming from the configuration file
event.Tags = checkTags
checkSender.Event(event)
e = <-eventChan
assert.Equal(t, append(checkTags, customTags...), e.Tags)
}

func TestCheckSenderInterface(t *testing.T) {
senderMetricSampleChan := make(chan senderMetricSample, 10)
serviceCheckChan := make(chan metrics.ServiceCheck, 10)
Expand Down
5 changes: 3 additions & 2 deletions pkg/autodiscovery/integration/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ type Config struct {

// CommonInstanceConfig holds the reserved fields for the yaml instance data
type CommonInstanceConfig struct {
MinCollectionInterval int `yaml:"min_collection_interval"`
EmptyDefaultHostname bool `yaml:"empty_default_hostname"`
MinCollectionInterval int `yaml:"min_collection_interval"`
EmptyDefaultHostname bool `yaml:"empty_default_hostname"`
Tags []string `yaml:"tags"`
}

// Equal determines whether the passed config is the same
Expand Down
15 changes: 15 additions & 0 deletions pkg/collector/corechecks/checkbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ import (
// Integration warnings are handled via the Warn and Warnf methods
// that forward the warning to the logger and send the warning to
// the collector for display in the status page and the web UI.
//
// If custom tags are set in the check configuration file,
xvello marked this conversation as resolved.
Show resolved Hide resolved
// they will be automatically appended to each send done by this
// check.
type CheckBase struct {
checkName string
checkID check.ID
Expand Down Expand Up @@ -84,6 +88,17 @@ func (c *CheckBase) CommonConfigure(instance integration.Data) error {
}
s.DisableDefaultHostname(true)
}

// Set custom tags configured for this check
if commonOptions.Tags != nil && len(commonOptions.Tags) > 0 {
xvello marked this conversation as resolved.
Show resolved Hide resolved
s, err := aggregator.GetSender(c.checkID)
if err != nil {
log.Errorf("failed to retrieve a sender for check %s: %s", string(c.ID()), err)
return err
}
s.SetCheckCustomTags(commonOptions.Tags)
}

return nil
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/collector/corechecks/cluster/kubernetes_apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ const (

// KubeASConfig is the config of the API server.
type KubeASConfig struct {
Tags []string `yaml:"tags"`
CollectEvent bool `yaml:"collect_events"`
CollectOShiftQuotas bool `yaml:"collect_openshift_clusterquotas"`
FilteredEventType []string `yaml:"filtered_event_types"`
Expand Down Expand Up @@ -279,7 +278,7 @@ func (k *KubeASCheck) parseComponentStatus(sender aggregator.Sender, componentsS
log.Debug("API Server component's structure is not expected")
continue
}
tagComp := append(k.instance.Tags, fmt.Sprintf("component:%s", component.Name))
tagComp := []string{fmt.Sprintf("component:%s", component.Name)}
for _, condition := range component.Conditions {
status_check := metrics.ServiceCheckUnknown

Expand Down Expand Up @@ -341,7 +340,6 @@ ITER_EVENTS:
k.Warnf("Error while formatting bundled events, %s. Not submitting", err.Error())
continue
}
datadogEv.Tags = append(datadogEv.Tags, k.instance.Tags...)
sender.Event(datadogEv)
}
return nil
Expand Down
24 changes: 10 additions & 14 deletions pkg/collector/corechecks/cluster/kubernetes_apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,33 +89,31 @@ func TestParseComponentStatus(t *testing.T) {

// FIXME: use the factory instead
kubeASCheck := &KubeASCheck{
instance: &KubeASConfig{
Tags: []string{"test"},
},
instance: &KubeASConfig{},
CheckBase: core.NewCheckBase(kubernetesAPIServerCheckName),
KubeAPIServerHostname: "hostname",
}

mocked := mocksender.NewMockSender(kubeASCheck.ID())
mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckOK, "hostname", []string{"test", "component:Zookeeper"}, "")
mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckOK, "hostname", []string{"component:Zookeeper"}, "")
kubeASCheck.parseComponentStatus(mocked, expected)

mocked.AssertNumberOfCalls(t, "ServiceCheck", 1)
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckOK, "hostname", []string{"test", "component:Zookeeper"}, "")
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckOK, "hostname", []string{"component:Zookeeper"}, "")

err := kubeASCheck.parseComponentStatus(mocked, unExpected)
assert.EqualError(t, err, "metadata structure has changed. Not collecting API Server's Components status")
mocked.AssertNotCalled(t, "ServiceCheck", "kube_apiserver_controlplane.up")

mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckCritical, "hostname", []string{"test", "component:ETCD"}, "")
mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckCritical, "hostname", []string{"component:ETCD"}, "")
kubeASCheck.parseComponentStatus(mocked, unHealthy)
mocked.AssertNumberOfCalls(t, "ServiceCheck", 2)
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckCritical, "hostname", []string{"test", "component:ETCD"}, "")
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckCritical, "hostname", []string{"component:ETCD"}, "")

mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckUnknown, "hostname", []string{"test", "component:DCA"}, "")
mocked.On("ServiceCheck", "kube_apiserver_controlplane.up", metrics.ServiceCheckUnknown, "hostname", []string{"component:DCA"}, "")
kubeASCheck.parseComponentStatus(mocked, unknown)
mocked.AssertNumberOfCalls(t, "ServiceCheck", 3)
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckUnknown, "hostname", []string{"test", "component:DCA"}, "")
mocked.AssertServiceCheck(t, "kube_apiserver_controlplane.up", metrics.ServiceCheckUnknown, "hostname", []string{"component:DCA"}, "")

empty_resp := kubeASCheck.parseComponentStatus(mocked, empty)
assert.Nil(t, empty_resp, "metadata structure has changed. Not collecting API Server's Components status")
Expand Down Expand Up @@ -159,7 +157,6 @@ func TestProcessBundledEvents(t *testing.T) {

kubeASCheck := &KubeASCheck{
instance: &KubeASConfig{
Tags: []string{"test"},
FilteredEventType: []string{"ignored"},
},
CheckBase: core.NewCheckBase(kubernetesAPIServerCheckName),
Expand Down Expand Up @@ -195,7 +192,7 @@ func TestProcessBundledEvents(t *testing.T) {
Title: "Events from the machine-blue Node",
Text: "%%% \n30 **MissingClusterDNS**: MountVolume.SetUp succeeded\n \n _Events emitted by the kubelet seen at " + time.Unix(709675200, 0).String() + "_ \n\n %%%",
Priority: "normal",
Tags: []string{"test", "namespace:default", "source_component:kubelet"},
Tags: []string{"namespace:default", "source_component:kubelet"},
AggregationKey: "kubernetes_apiserver:e63e74fa-f566-11e7-9749-0e4863e1cbf4",
SourceTypeName: "kubernetes",
Ts: 709675200,
Expand All @@ -222,7 +219,7 @@ func TestProcessBundledEvents(t *testing.T) {
Title: "Events from the machine-blue Node",
Text: "%%% \n30 **MissingClusterDNS**: MountVolume.SetUp succeeded\n \n _Events emitted by the kubelet seen at " + time.Unix(709675200, 0).String() + "_ \n\n %%%",
Priority: "normal",
Tags: []string{"test", "namespace:default", "source_component:kubelet"},
Tags: []string{"namespace:default", "source_component:kubelet"},
AggregationKey: "kubernetes_apiserver:e63e74fa-f566-11e7-9749-0e4863e1cbf4",
SourceTypeName: "kubernetes",
Ts: 709675200,
Expand All @@ -247,7 +244,6 @@ func TestProcessEvent(t *testing.T) {

kubeASCheck := &KubeASCheck{
instance: &KubeASConfig{
Tags: []string{"test"},
FilteredEventType: []string{"ignored"},
},
CheckBase: core.NewCheckBase(kubernetesAPIServerCheckName),
Expand All @@ -263,7 +259,7 @@ func TestProcessEvent(t *testing.T) {
Title: "Events from the dca-789976f5d7-2ljx6 ReplicaSet",
Text: "%%% \n2 **Scheduled**: Successfully assigned dca-789976f5d7-2ljx6 to ip-10-0-0-54\n \n _New events emitted by the default-scheduler seen at " + time.Unix(709662600000, 0).String() + "_ \n\n %%%",
Priority: "normal",
Tags: []string{"test", "source_component:default-scheduler", "namespace:default"},
Tags: []string{"source_component:default-scheduler", "namespace:default"},
AggregationKey: "kubernetes_apiserver:e6417a7f-f566-11e7-9749-0e4863e1cbf4",
SourceTypeName: "kubernetes",
Ts: 709662600,
Expand Down
2 changes: 1 addition & 1 deletion pkg/collector/corechecks/cluster/kubernetes_openshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (k *KubeASCheck) retrieveOShiftClusterQuotas() ([]osq.ClusterResourceQuota,
// reportClusterQuotas reports metrics on OpenShift ClusterResourceQuota objects
func (k *KubeASCheck) reportClusterQuotas(quotas []osq.ClusterResourceQuota, sender aggregator.Sender) {
for _, quota := range quotas {
quotaTags := append(k.instance.Tags, fmt.Sprintf("clusterquota:%s", quota.Name))
quotaTags := []string{fmt.Sprintf("clusterquota:%s", quota.Name)}
remaining := computeQuotaRemaining(quota.Status.Total.Used, quota.Status.Total.Hard)

k.reportQuota(quota.Status.Total.Hard, "openshift.clusterquota", "limit", quotaTags, sender)
Expand Down
4 changes: 2 additions & 2 deletions pkg/collector/corechecks/cluster/kubernetes_openshift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestReportClusterQuotas(t *testing.T) {
json.Unmarshal(raw, &list)
require.Len(t, list.Items, 1)

var instanceCfg = []byte("tags: [customtag]")
var instanceCfg = []byte("")
var initCfg = []byte("")
kubeASCheck := KubernetesASFactory().(*KubeASCheck)
err = kubeASCheck.Configure(instanceCfg, initCfg)
Expand All @@ -38,7 +38,7 @@ func TestReportClusterQuotas(t *testing.T) {
mocked.AssertNumberOfCalls(t, "Gauge", 9*3)

// Total
expectedTags := []string{"customtag", "clusterquota:multiproj-test"}
expectedTags := []string{"clusterquota:multiproj-test"}

mocked.AssertMetric(t, "Gauge", "openshift.clusterquota.cpu.limit", 3.0, "", expectedTags)
mocked.AssertMetric(t, "Gauge", "openshift.clusterquota.cpu.used", 0.6, "", expectedTags)
Expand Down
Loading