Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(processors/k8sattributesprocessor): support metadata enrichment based on multiple attributes #8465

Merged
merged 38 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
36f85f7
feat(k8sattributesprocessor): modify pod association structure
Sep 30, 2021
997c346
feat(k8sprocessor): store data in cache based on pod association
Oct 1, 2021
1469116
doc(k8sattributesprocessor): update documentation
Oct 4, 2021
48b8c08
feat(k8sprocessor): treat host.name as every other attribute
Oct 4, 2021
6ea78b3
feat(k8sprocessor): simplify cache logic
Oct 4, 2021
b50b6d6
feat(k8sprocessor): fix pod removal
Oct 5, 2021
ba1df0a
feat(k8sprocessor): fix typo
Oct 5, 2021
c46c248
feat(k8sprocessor): minor cleanup
Oct 5, 2021
194b3e1
feat(k8sprocessor): minor cleanup
Oct 5, 2021
7ae1a77
feat(k8sattributesprocessor): ensure backward compatibility
Oct 13, 2021
977eafd
refactor(k8sattributesprocessor): fix after review
Oct 13, 2021
d66f8b1
feat(processors/k8sattributes): do not expose delimiter
Mar 15, 2022
716c729
Update processor/k8sattributesprocessor/config.go
sumo-drosiek Mar 15, 2022
3dd1abd
Update processor/k8sattributesprocessor/doc.go
sumo-drosiek Mar 15, 2022
99bff93
Update processor/k8sattributesprocessor/factory.go
sumo-drosiek Mar 15, 2022
ec23aae
feat(processors/k8sattributes): update deprecation warnings
Mar 15, 2022
e6dee0f
refactor(processors/k8sattributes): fix lint
Mar 15, 2022
daafea1
refactor(processors/k8sattributes): change PodIdentifier type
Mar 21, 2022
3c3f4a9
refactor(k8sattributesprocessor): change getIdentifiersFromAssoc's ne…
Mar 22, 2022
ee507b9
fix(k8sattributesprocessor): do not use source.Name for connection so…
Mar 22, 2022
c5e30fc
refactor(k8sattributesprocessor): update deprecation log
Mar 22, 2022
3e35a2f
feat(k8sattributesprocessor): reduce PodIdentifierMaxLength to 4
Mar 22, 2022
33f60a4
refactor(k8sattributesprocessor): rename GetPodIdentifierAttribute to…
Mar 22, 2022
ea77bf8
refactor(k8sattributesprocessor): extract resource_attribute and conn…
Mar 22, 2022
6321755
refactor(k8sattributesprocessor): do not use name for connection at all
Mar 22, 2022
875c218
docs(processor/k8sattributeprocessor): udpate
Apr 11, 2022
d8125a3
feat(processors/k8sattributes): fix after rebase
Apr 13, 2022
e44bf48
refactor(k8sattributesprocessor): clean up asso.Name related code
Apr 22, 2022
6ab9edc
refactor: refactor extractPodIDNoAssociations
Apr 22, 2022
f8eb928
refactor: return PodIdentifierAttribute for getConnectionIP
Apr 22, 2022
bbef85e
refactor: review
macdewee May 13, 2022
bdfc814
fix: validation of empty podIdentifier
macdewee May 13, 2022
0efc3a7
Update processor/k8sattributesprocessor/factory.go
sumo-drosiek Jun 28, 2022
fac8f7e
Update processor/k8sattributesprocessor/factory.go
sumo-drosiek Jul 4, 2022
54d1ccb
refactor: clean up
Jul 4, 2022
b80ffbe
chore(changelog): update
Jul 4, 2022
433caca
refactor: clean up
Jul 4, 2022
264ad8e
Update unreleased/drosiek-k8sprocessor.yaml
dmitryax Jul 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion processor/k8sattributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor"

import (
"fmt"

"go.opentelemetry.io/collector/config"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube"
)

// Config defines configuration for k8s attributes processor.
Expand Down Expand Up @@ -50,7 +53,17 @@ type Config struct {
}

func (cfg *Config) Validate() error {
return cfg.APIConfig.Validate()
if err := cfg.APIConfig.Validate(); err != nil {
return err
}

for _, assoc := range cfg.Association {
if len(assoc.Sources) > kube.PodIdentifierMaxLength {
return fmt.Errorf("too many association sources. limit is %v", kube.PodIdentifierMaxLength)
}
}

return nil
}

// ExtractConfig section allows specifying extraction rules to extract
Expand Down Expand Up @@ -214,13 +227,21 @@ type FieldFilterConfig struct {
// PodAssociationConfig contain single rule how to associate Pod metadata
// with logs, spans and metrics
type PodAssociationConfig struct {
// Deprecated: Sources should be used to provide From and Name.
// If this is set, From and Name are going to be used as Sources' ones
// From represents the source of the association.
// Allowed values are "connection" and "resource_attribute".
From string `mapstructure:"from"`

// Deprecated: Sources should be used to provide From and Name.
// If this is set, From and Name are going to be used as Sources' ones
// Name represents extracted key name.
// e.g. ip, pod_uid, k8s.pod.ip
Name string `mapstructure:"name"`

// List of pod association sources which should be taken
// to identify pod
Sources []PodAssociationSourceConfig `mapstructure:"sources"`
}

// ExcludeConfig represent a list of Pods to exclude
Expand All @@ -232,3 +253,13 @@ type ExcludeConfig struct {
type ExcludePodConfig struct {
Name string `mapstructure:"name"`
}

type PodAssociationSourceConfig struct {
// From represents the source of the association.
// Allowed values are "connection" and "resource_attribute".
From string `mapstructure:"from"`

// Name represents extracted key name.
// e.g. ip, pod_uid, k8s.pod.ip
Name string `mapstructure:"name"`
}
33 changes: 25 additions & 8 deletions processor/k8sattributesprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,38 @@ func TestLoadConfig(t *testing.T) {
},
Association: []PodAssociationConfig{
{
From: "resource_attribute",
Name: "ip",
Sources: []PodAssociationSourceConfig{
{
From: "resource_attribute",
Name: "ip",
},
},
},
{
From: "resource_attribute",
Name: "k8s.pod.ip",
Sources: []PodAssociationSourceConfig{
{
From: "resource_attribute",
Name: "k8s.pod.ip",
},
},
},
{
From: "resource_attribute",
Name: "host.name",
Sources: []PodAssociationSourceConfig{
{
From: "resource_attribute",
Name: "host.name",
},
},
},
{
From: "connection",
Name: "ip",
Sources: []PodAssociationSourceConfig{
{
From: "connection",
Name: "ip",
},
},
},
// Deprecated way
{
From: "resource_attribute",
Name: "k8s.pod.uid",
Expand Down
39 changes: 23 additions & 16 deletions processor/k8sattributesprocessor/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,29 @@
// extracted metadata to the relevant spans, metrics and logs. The processor uses the kubernetes API to discover all pods
// running in a cluster, keeps a record of their IP addresses, pod UIDs and interesting metadata.
// The rules for associating the data passing through the processor (spans, metrics and logs) with specific Pod Metadata are configured via "pod_association" key.
// It represents a list of rules that are executed in the specified order until the first one is able to do the match.
// Each rule is specified as a pair of from (representing the rule type) and name (representing the extracted key name).
// It represents a list of associations that are executed in the specified order until the first one is able to do the match.
//
// Each association is specified as a list of sources of association.
// Sources represents list of rules. All rules are going to be executed and combination of result is going to be a pod metadata cache key.
pmm-sumo marked this conversation as resolved.
Show resolved Hide resolved
// In order to get an association applied, all the data defined in each source has to be successfully fetched from a log, trace or metric.
//
// Each sources rule is specified as a pair of `from` (representing the rule type) and `name` (representing the attribute name if `From` is set to `resource_attribute`).
// Following rule types are available:
// from: "resource_attribute" - allows to specify the attribute name to lookup up in the list of attributes of the received Resource. The specified attribute, if it is present, identifies the Pod that is represented by the Resource.
// (the value can contain either IP address or Pod UID)
// from: "connection" - takes the IP attribute from connection context (if available) and automatically
// associates it with "k8s.pod.ip" attribute
// from: "connection" - takes the IP attribute from connection context (if available)
// from: "resource_attribute" - allows to specify the attribute name to lookup up in the list of attributes of the received Resource.
// Semantic convention should be used for naming.
//
// Pod association configuration.
// pod_association:
// - from: resource_attribute
// name: ip
// - from: resource_attribute
// name: k8s.pod.ip
// - from: resource_attribute
// name: host.name
// - from: connection
// name: ip
// - from: resource_attribute
// name: k8s.pod.uid
// - sources:
// - from: resource_attribute
// name: k8s.pod.ip
// # below association matches for pair `k8s.pod.name` and `k8s.namespace.name`
// - sources:
// - from: resource_attribute
// name: k8s.pod.name
// - from: resource_attribute
// name: k8s.namespace.name
//
// If Pod association rules are not configured resources are associated with metadata only by connection's IP Address.
//
Expand All @@ -52,6 +56,9 @@
// - k8s.node.name
// Not all the attributes are guaranteed to be added.
//
// Only attribute names from `metadata` should be used for pod_association's `resource_attribute`,
// because empty or non-existing values will be ignored.
//
// The following container level attributes require additional attributes to identify a particular container in a pod:
// 1. Container spec attributes - will be set only if container identifying attribute `k8s.container.name` is set
// as a resource attribute (similar to all other attributes, pod has to be identified as well):
Expand Down
40 changes: 40 additions & 0 deletions processor/k8sattributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func createKubernetesProcessor(
kp := &kubernetesprocessor{logger: params.Logger}

warnDeprecatedMetadataConfig(kp.logger, cfg)
warnDeprecatedPodAssociationConfig(kp.logger, cfg)

err := errWrongKeyConfig(cfg)
if err != nil {
Expand Down Expand Up @@ -252,3 +253,42 @@ func errWrongKeyConfig(cfg config.Processor) error {

return nil
}

func warnDeprecatedPodAssociationConfig(logger *zap.Logger, cfg config.Processor) {
oCfg := cfg.(*Config)
deprecated := ""
actual := ""
for _, assoc := range oCfg.Association {
if assoc.From == "" && assoc.Name == "" {
continue
}

deprecated += fmt.Sprintf(`
- from: %s`, assoc.From)
actual += fmt.Sprintf(`
- sources:
- from: %s`, assoc.From)

if assoc.Name != "" {
deprecated += fmt.Sprintf(`
name: %s`, assoc.Name)
}

if assoc.From != kube.ConnectionSource {
actual += fmt.Sprintf(`
name: %s`, assoc.Name)
}
}

if deprecated != "" {
logger.Warn(fmt.Sprintf(`Deprecated pod_association configuration detected. Please replace:

pod_association:%s

with

pod_association:%s

`, deprecated, actual))
}
}
107 changes: 82 additions & 25 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (c *WatchClient) extractNamespaceAttributes(namespace *api_v1.Namespace) ma
return tags
}

func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) {
func (c *WatchClient) podFromAPI(pod *api_v1.Pod) *Pod {
newPod := &Pod{
Name: pod.Name,
Namespace: pod.GetNamespace(),
Expand All @@ -387,41 +387,98 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) {
}
}

return newPod
}

// getIdentifiersFromAssoc returns list of PodIdentifiers for given pod
func (c *WatchClient) getIdentifiersFromAssoc(pod *Pod) []PodIdentifier {
ids := []PodIdentifier{}
for _, assoc := range c.Associations {
ret := PodIdentifier{}
skip := false
for i, source := range assoc.Sources {
// If association configured to take IP address from connection
switch {
case source.From == ConnectionSource:
if pod.Address == "" {
skip = true
break
}
ret[i] = PodIdentifierAttributeFromSource(source, pod.Address)
case source.From == ResourceSource:
attr := ""
switch source.Name {
case conventions.AttributeK8SNamespaceName:
attr = pod.Namespace
case conventions.AttributeK8SPodName:
attr = pod.Name
case conventions.AttributeK8SPodUID:
attr = pod.PodUID
case conventions.AttributeHostName:
attr = pod.Address
default:
if v, ok := pod.Attributes[source.Name]; ok {
attr = v
}
}

if attr == "" {
skip = true
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
break
}
ret[i] = PodIdentifierAttributeFromSource(source, attr)
}
}

if !skip {
ids = append(ids, ret)
}
}

// Ensure backward compatibility
if pod.PodUID != "" {
ids = append(ids, PodIdentifier{
PodIdentifierAttributeFromResourceAttribute(conventions.AttributeK8SPodUID, pod.PodUID),
})
}

if pod.Address != "" {
ids = append(ids, PodIdentifier{
PodIdentifierAttributeFromConnection(pod.Address),
})
}

return ids
}

func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) {
newPod := c.podFromAPI(pod)

c.m.Lock()
defer c.m.Unlock()

if pod.UID != "" {
c.Pods[PodIdentifier(pod.UID)] = newPod
}
if pod.Status.PodIP != "" {
// compare initial scheduled timestamp for existing pod and new pod with same IP
// and only replace old pod if scheduled time of new pod is newer? This should fix
// the case where scheduler has assigned the same IP to a new pod but update event for
// the old pod came in later.
if p, ok := c.Pods[PodIdentifier(pod.Status.PodIP)]; ok {
if p.StartTime != nil && pod.Status.StartTime.Before(p.StartTime) {
for _, id := range c.getIdentifiersFromAssoc(newPod) {
// compare initial scheduled timestamp for existing pod and new pod with same identifier
// and only replace old pod if scheduled time of new pod is newer or equal.
// This should fix the case where scheduler has assigned the same attribtues (like IP address)
// to a new pod but update event for the old pod came in later.
if p, ok := c.Pods[id]; ok {
if p.StartTime != nil && !p.StartTime.Before(pod.Status.StartTime) {
return
}
}
c.Pods[PodIdentifier(pod.Status.PodIP)] = newPod
c.Pods[id] = newPod
}
}

func (c *WatchClient) forgetPod(pod *api_v1.Pod) {
c.m.RLock()
p, ok := c.GetPod(PodIdentifier(pod.Status.PodIP))
c.m.RUnlock()

if ok && p.Name == pod.Name {
c.appendDeleteQueue(PodIdentifier(pod.Status.PodIP), pod.Name)
}
podToRemove := c.podFromAPI(pod)
for _, id := range c.getIdentifiersFromAssoc(podToRemove) {
p, ok := c.GetPod(id)

c.m.RLock()
p, ok = c.GetPod(PodIdentifier(pod.UID))
c.m.RUnlock()

if ok && p.Name == pod.Name {
c.appendDeleteQueue(PodIdentifier(pod.UID), pod.Name)
if ok && p.Name == pod.Name {
c.appendDeleteQueue(id, pod.Name)
}
}
}

Expand Down
Loading