Skip to content

Commit

Permalink
auto instru: add rc provider (#15008)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmed-mez authored Jan 12, 2023
1 parent 3734232 commit 9e41fde
Show file tree
Hide file tree
Showing 14 changed files with 419 additions and 191 deletions.
13 changes: 9 additions & 4 deletions cmd/cluster-agent/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ func start(cmd *cobra.Command, args []string) error {
}

// Initialize remote configuration
var rcClient *remote.Client
if config.Datadog.GetBool("remote_configuration.enabled") {
rcClient, err := initializeRemoteConfig(mainCtx)
rcClient, err = initializeRemoteConfig(mainCtx)
if err != nil {
log.Errorf("Failed to start remote-configuration: %v", err)
} else {
Expand Down Expand Up @@ -354,10 +355,14 @@ func start(cmd *cobra.Command, args []string) error {
if config.Datadog.GetBool("admission_controller.auto_instrumentation.patcher.enabled") {
patchCtx := admissionpatch.ControllerContext{
IsLeaderFunc: le.IsLeader,
Client: apiCl.Cl,
K8sClient: apiCl.Cl,
RcClient: rcClient,
ClusterName: clusterName,
StopCh: stopCh,
}
admissionpatch.StartControllers(patchCtx, clusterName)
if err := admissionpatch.StartControllers(patchCtx); err != nil {
log.Errorf("Cannot start auto instrumentation patcher: %v", err)
}
} else {
log.Info("Auto instrumentation patcher is disabled")
}
Expand Down Expand Up @@ -454,7 +459,7 @@ func initializeRemoteConfig(ctx context.Context) (*remote.Client, error) {
return nil, fmt.Errorf("unable to start remote-config service: %w", err)
}

rcClient, err := remote.NewClient("cluster-agent", configService, version.AgentVersion, []data.Product{}, time.Second*5)
rcClient, err := remote.NewClient("cluster-agent", configService, version.AgentVersion, []data.Product{data.ProductAPMTracing}, time.Second*5)
if err != nil {
return nil, fmt.Errorf("unable to create local remote-config client: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/clusteragent/admission/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,10 @@ const (

// LibConfigV1AnnotKeyFormat is the format of the library config annotation
LibConfigV1AnnotKeyFormat = "admission.datadoghq.com/%s-lib.config.v1"

// RcIDAnnotKey is the format of the RC ID annotation
RcIDAnnotKeyFormat = "admission.datadoghq.com/%s-lib.rc.id"

// RcRevisionAnnotKey is the format of the RC revision annotation
RcRevisionAnnotKeyFormat = "admission.datadoghq.com/%s-lib.rc.rev"
)
45 changes: 36 additions & 9 deletions pkg/clusteragent/admission/common/lib_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,43 @@ import (

// LibConfig holds the APM library configuration
type LibConfig struct {
Version int `yaml:"version,omitempty" json:"version,omitempty"` // config schema version, not config version
ServiceLanguage string `yaml:"service_language,omitempty" json:"service_language,omitempty"`
Language string `yaml:"library_language" json:"library_language"`
Version string `yaml:"library_version" json:"library_version"`

Tracing *bool `yaml:"tracing_enabled,omitempty" json:"tracing_enabled,omitempty"`
LogInjection *bool `yaml:"log_injection_enabled,omitempty" json:"log_injection_enabled,omitempty"`
HealthMetrics *bool `yaml:"health_metrics_enabled,omitempty" json:"health_metrics_enabled,omitempty"`
RuntimeMetrics *bool `yaml:"runtime_metrics_enabled,omitempty" json:"runtime_metrics_enabled,omitempty"`
TracingSamplingRate *float64 `yaml:"tracing_sampling_rate,omitempty" json:"tracing_sampling_rate,omitempty"`
TracingRateLimit *int `yaml:"tracing_rate_limit,omitempty" json:"tracing_rate_limit,omitempty"`
TracingTags []string `yaml:"tracing_tags,omitempty" json:"tracing_tags,omitempty"`
ServiceName *string `yaml:"service_name,omitempty" json:"service_name,omitempty"`
Env *string `yaml:"env,omitempty" json:"env,omitempty"`

Tracing *bool `yaml:"tracing_enabled,omitempty" json:"tracing_enabled,omitempty"`
LogInjection *bool `yaml:"log_injection_enabled,omitempty" json:"log_injection_enabled,omitempty"`
HealthMetrics *bool `yaml:"health_metrics_enabled,omitempty" json:"health_metrics_enabled,omitempty"`
RuntimeMetrics *bool `yaml:"runtime_metrics_enabled,omitempty" json:"runtime_metrics_enabled,omitempty"`

TracingSamplingRate *float64 `yaml:"tracing_sampling_rate" json:"tracing_sampling_rate,omitempty"`
TracingRateLimit *int `yaml:"tracing_rate_limit" json:"tracing_rate_limit,omitempty"`
TracingTags []string `yaml:"tracing_tags" json:"tracing_tags,omitempty"`

// TODO: Implement the conversion of the following parameters in the ToEnvs() method
TracingServiceMapping []TracingServiceMapEntry `yaml:"tracing_service_mapping" json:"tracing_service_mapping,omitempty"`
TracingAgentTimeout *int `yaml:"tracing_agent_timeout" json:"tracing_agent_timeout,omitempty"`
TracingHeaderTags []TracingHeaderTagEntry `yaml:"tracing_header_tags" json:"tracing_header_tags,omitempty"`
TracingPartialFlushMinSpans *int `yaml:"tracing_partial_flush_min_spans" json:"tracing_partial_flush_min_spans,omitempty"`
TracingDebug *bool `yaml:"tracing_debug" json:"tracing_debug,omitempty"`
TracingLogLevel *string `yaml:"tracing_log_level" json:"tracing_log_level,omitempty"`
TracingMethods []string `yaml:"tracing_methods" json:"tracing_methods,omitempty"`
TracingPropagationStyleInject []string `yaml:"tracing_propagation_style_inject" json:"tracing_propagation_style_inject,omitempty"`
TracingPropagationStyleExtract []string `yaml:"tracing_propagation_style_extract" json:"tracing_propagation_style_extract,omitempty"`
}

// TracingServiceMapEntry holds service mapping config
type TracingServiceMapEntry struct {
FromKey string `yaml:"from_key" json:"from_key"`
ToName string `yaml:"to_name" json:"to_name"`
}

// TracingHeaderTagEntry holds header tags config
type TracingHeaderTagEntry struct {
Header string `yaml:"header" json:"header"`
TagName string `yaml:"tag_name" json:"tag_name"`
}

// ToEnvs converts the config fields into environment variables
Expand Down
104 changes: 104 additions & 0 deletions pkg/clusteragent/admission/patch/file_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver
// +build kubeapiserver

package patch

import (
"encoding/json"
"os"
"time"

"github.com/DataDog/datadog-agent/pkg/util/log"
)

// filePatchProvider this is a stub and will be used for e2e testing only
type filePatchProvider struct {
file string
pollInterval time.Duration
isLeader func() bool
subscribers map[TargetObjKind]chan PatchRequest
lastSuccessfulRefresh time.Time
clusterName string
}

var _ patchProvider = &filePatchProvider{}

func newfileProvider(isLeaderFunc func() bool, clusterName string) *filePatchProvider {
return &filePatchProvider{
file: "/etc/datadog-agent/auto-instru.json",
pollInterval: 15 * time.Second,
isLeader: isLeaderFunc,
subscribers: make(map[TargetObjKind]chan PatchRequest),
clusterName: clusterName,
}
}

func (fpp *filePatchProvider) subscribe(kind TargetObjKind) chan PatchRequest {
ch := make(chan PatchRequest, 10)
fpp.subscribers[kind] = ch
return ch
}

func (fpp *filePatchProvider) start(stopCh <-chan struct{}) {
ticker := time.NewTicker(fpp.pollInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := fpp.refresh(); err != nil {
log.Errorf(err.Error())
}
case <-stopCh:
log.Info("Shutting down patch provider")
return
}
}
}

func (fpp *filePatchProvider) refresh() error {
if !fpp.isLeader() {
log.Debug("Not leader, skipping")
return nil
}
requests, err := fpp.poll()
if err != nil {
return err
}
log.Debugf("Got %d new patch requests", len(requests))
for _, req := range requests {
if err := req.Validate(fpp.clusterName); err != nil {
log.Errorf("Skipping invalid patch request: %s", err)
continue
}
if ch, found := fpp.subscribers[req.K8sTarget.Kind]; found {
log.Infof("Publishing patch requests for target %s", req.K8sTarget)
ch <- req
}
}
fpp.lastSuccessfulRefresh = time.Now()
return nil
}

func (fpp *filePatchProvider) poll() ([]PatchRequest, error) {
info, err := os.Stat(fpp.file)
if err != nil {
return nil, err
}
modTime := info.ModTime()
if fpp.lastSuccessfulRefresh.After(modTime) {
log.Debugf("File %q hasn't changed since the last Successful refresh at %v", fpp.file, fpp.lastSuccessfulRefresh)
return []PatchRequest{}, nil
}
content, err := os.ReadFile(fpp.file)
if err != nil {
return nil, err
}
var requests []PatchRequest
err = json.Unmarshal(content, &requests)
return requests, err
}
59 changes: 26 additions & 33 deletions pkg/clusteragent/admission/patch/patch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,43 @@ const (
type Action string

const (
// ApplyConfig instructs the patcher to apply the patch request
ApplyConfig Action = "apply"
// DisableInjection instructs the patcher to disable library injection
DisableInjection Action = "disable"
// EnableConfig instructs the patcher to apply the patch request
EnableConfig Action = "enable"
// DisableConfig instructs the patcher to disable library injection
DisableConfig Action = "disable"
)

// PatchRequest holds the required data to target a k8s object and apply library configuration
type PatchRequest struct {
Action Action `yaml:"action"`
K8sTarget K8sTarget `yaml:"k8s_target"`
LibID LibID `yaml:"lib_id"`
LibConfig common.LibConfig `yaml:"lib_config"`
ID string `json:"id"`
Revision int64 `json:"revision"`
SchemaVersion string `json:"schema_version"`
Action Action `json:"action"`

// Library parameters
LibConfig common.LibConfig `json:"lib_config"`

// Target k8s object
K8sTarget K8sTarget `json:"k8s_target"`
}

// Validate returns whether a patch request is applicable
func (pr PatchRequest) Validate(clusterName string) error {
if err := pr.K8sTarget.validate(clusterName); err != nil {
return err
if pr.LibConfig.Language == "" {
return errors.New("library language is empty")
}
return pr.LibID.validate()
if pr.LibConfig.Version == "" {
return errors.New("library version is empty")
}
return pr.K8sTarget.validate(clusterName)
}

// K8sTarget represent the targetet k8s object
type K8sTarget struct {
ClusterName string `yaml:"cluster_name"`
Kind TargetObjKind `yaml:"kind"`
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
Cluster string `json:"cluster"`
Kind TargetObjKind `json:"kind"`
Name string `json:"name"`
Namespace string `json:"namespace"`
}

// String returns a string representation of the targeted k8s object
Expand All @@ -63,8 +72,8 @@ func (k K8sTarget) String() string {
}

func (k K8sTarget) validate(clusterName string) error {
if k.ClusterName != clusterName {
return fmt.Errorf("target cluster name %q is different from the local one %q", k.ClusterName, clusterName)
if k.Cluster != clusterName {
return fmt.Errorf("target cluster name %q is different from the local one %q", k.Cluster, clusterName)
}
if k.Name == "" {
return errors.New("target object name is empty")
Expand All @@ -79,19 +88,3 @@ func (k K8sTarget) validate(clusterName string) error {
}
return nil
}

// LibID hold the minimal information to inject a library
type LibID struct {
Language string `yaml:"language"`
Version string `yaml:"version"`
}

func (li LibID) validate() error {
if li.Language == "" {
return errors.New("library language is empty")
}
if li.Version == "" {
return errors.New("library version is empty")
}
return nil
}
Loading

0 comments on commit 9e41fde

Please sign in to comment.