-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
dispatcher_configs.go
177 lines (149 loc) · 5.07 KB
/
dispatcher_configs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
//go:build clusterchecks
// +build clusterchecks
package clusterchecks
import (
"github.com/DataDog/datadog-agent/pkg/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
"github.com/DataDog/datadog-agent/pkg/collector/check"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
)
// getAllConfigs returns every configuration currently known to the store,
// regardless of dispatching state. Used for reporting purposes.
func (d *dispatcher) getAllConfigs() ([]integration.Config, error) {
	d.store.RLock()
	defer d.store.RUnlock()

	configs := makeConfigArray(d.store.digestToConfig)
	return configs, nil
}
// getState builds a StateResponse snapshot of the dispatcher: warmup status,
// dangling (unassigned) configs, and the configs held by each known node.
func (d *dispatcher) getState() (types.StateResponse, error) {
	d.store.RLock()
	defer d.store.RUnlock()

	state := types.StateResponse{
		Warmup:   !d.store.active,
		Dangling: makeConfigArray(d.store.danglingConfigs),
	}
	for _, nodeStore := range d.store.nodes {
		state.Nodes = append(state.Nodes, types.StateNodeResponse{
			Name:    nodeStore.name,
			Configs: makeConfigArray(nodeStore.digestToConfig),
		})
	}
	return state, nil
}
// addConfig registers a configuration in the store and assigns it to the
// node named targetNodeName. An empty targetNodeName parks the config in
// danglingConfigs instead, to be dispatched later once nodes are available.
// If the config was previously assigned to a different node, it is removed
// from that node after being added to the new one.
func (d *dispatcher) addConfig(config integration.Config, targetNodeName string) {
	d.store.Lock()
	defer d.store.Unlock()

	// Register config
	digest := config.Digest()
	d.store.digestToConfig[digest] = config
	// Map every per-instance check ID back to this config's digest,
	// and expose the scheduling decision through the configsInfo metric.
	for _, instance := range config.Instances {
		checkID := check.BuildID(config.Name, instance, config.InitConfig)
		d.store.idToDigest[checkID] = digest
		if targetNodeName != "" {
			configsInfo.Set(1.0, targetNodeName, string(checkID), le.JoinLeaderValue)
		}
	}

	// No target node specified: store in danglingConfigs
	if targetNodeName == "" {
		danglingConfigs.Inc(le.JoinLeaderValue)
		d.store.danglingConfigs[digest] = config
		return
	}

	// Look up the node currently holding this digest (may be none) and
	// get-or-create the target node's store.
	currentNode, foundCurrent := d.store.getNodeStore(d.store.digestToNode[digest])
	targetNode := d.store.getOrCreateNodeStore(targetNodeName, "")

	// Dispatch to target node
	targetNode.Lock()
	targetNode.addConfig(config)
	targetNode.Unlock()
	d.store.digestToNode[digest] = targetNodeName

	// Remove config from previous node if found
	// We double-check the config actually changed nodes, to
	// prevent de-scheduling the check we just scheduled.
	// See https://github.com/DataDog/datadog-agent/pull/3023
	if foundCurrent && currentNode != targetNode {
		currentNode.Lock()
		currentNode.removeConfig(digest)
		currentNode.Unlock()
	}
}
// removeConfig deletes a configuration from every store index and, when the
// config was assigned to a node, de-schedules it from that node as well.
// It is a no-op safe to call for unknown digests.
func (d *dispatcher) removeConfig(digest string) {
	d.store.Lock()
	defer d.store.Unlock()

	node, found := d.store.getNodeStore(d.store.digestToNode[digest])
	delete(d.store.digestToNode, digest)
	delete(d.store.digestToConfig, digest)
	delete(d.store.danglingConfigs, digest)

	// Drop every check-ID mapping pointing at this digest, and clear the
	// per-check scheduling metric for the node it was assigned to.
	for k, v := range d.store.idToDigest {
		if v == digest {
			// BUGFIX: node is nil when the config was dangling (never
			// assigned to a node); dereferencing node.name here panicked.
			if found {
				configsInfo.Delete(node.name, string(k), le.JoinLeaderValue)
			}
			delete(d.store.idToDigest, k)
		}
	}

	// Remove from node configs if assigned
	if found {
		node.Lock()
		node.removeConfig(digest)
		node.Unlock()
	}
}
// shouldDispatchDanling reports whether dispatching dangling configs is
// currently possible: there must be at least one dangling config and at
// least one registered node to receive it.
func (d *dispatcher) shouldDispatchDanling() bool {
	d.store.RLock()
	defer d.store.RUnlock()
	return len(d.store.danglingConfigs) > 0 && len(d.store.nodes) > 0
}
// retrieveAndClearDangling returns the dangling configs held by the store,
// emptying the dangling set and resetting its gauge in the same critical
// section so no config can be observed twice.
func (d *dispatcher) retrieveAndClearDangling() []integration.Config {
	d.store.Lock()
	defer d.store.Unlock()

	dangling := makeConfigArray(d.store.danglingConfigs)
	d.store.clearDangling()
	danglingConfigs.Set(0, le.JoinLeaderValue)
	return dangling
}
// patchConfiguration transforms the configuration from AD into a config
// ready to use by node agents. It does the following changes:
//   - empty the ADIdentifiers array, to avoid node-agents detecting them as templates
//   - clear the ClusterCheck boolean
//   - add the empty_default_hostname option to all instances
//   - inject the extra tags (including `cluster_name` if set) in all instances
//
// On error, the input config is returned unmodified along with the error.
func (d *dispatcher) patchConfiguration(in integration.Config) (integration.Config, error) {
	out := in
	out.ADIdentifiers = nil
	out.ClusterCheck = false

	// Work on a fresh instances slice so the edits below don't show up
	// through the caller's copy of the config.
	out.Instances = make([]integration.Data, len(in.Instances))
	copy(out.Instances, in.Instances)

	for idx := range out.Instances {
		if err := out.Instances[idx].SetField("empty_default_hostname", true); err != nil {
			return in, err
		}
		// Inject extra tags if not empty
		if len(d.extraTags) > 0 {
			if err := out.Instances[idx].MergeAdditionalTags(d.extraTags); err != nil {
				return in, err
			}
		}
	}
	return out, nil
}
// getConfigAndDigest looks up a check by its checkID and returns the
// matching configuration and its digest (zero values when unknown).
func (d *dispatcher) getConfigAndDigest(checkID string) (integration.Config, string) {
	d.store.RLock()
	defer d.store.RUnlock()

	digest := d.store.idToDigest[check.ID(checkID)]
	config := d.store.digestToConfig[digest]
	return config, digest
}