From 54957e1350f0ad4b8cbc40e593359a31d2815324 Mon Sep 17 00:00:00 2001 From: Gustavo Caso Date: Tue, 15 Oct 2024 13:22:50 +0200 Subject: [PATCH] connect checks agent and agent kernel to schedule python checks --- cmd/checks-agent/subcommands/start/command.go | 97 ++++++++++++++++++- comp/core/autodiscovery/proto/proto.go | 93 ++++++++++++++++++ comp/core/autodiscovery/server/proto.go | 52 ---------- comp/core/autodiscovery/server/server.go | 5 +- comp/logs/agent/agentimpl/agent.go | 48 +-------- 5 files changed, 196 insertions(+), 99 deletions(-) create mode 100644 comp/core/autodiscovery/proto/proto.go delete mode 100644 comp/core/autodiscovery/server/proto.go diff --git a/cmd/checks-agent/subcommands/start/command.go b/cmd/checks-agent/subcommands/start/command.go index 8fe9fa2b2c52a..e20a8039dfdda 100644 --- a/cmd/checks-agent/subcommands/start/command.go +++ b/cmd/checks-agent/subcommands/start/command.go @@ -8,18 +8,26 @@ package start import ( "context" + "io" "os" "os/signal" "syscall" + "time" + "github.com/cenkalti/backoff" "github.com/spf13/cobra" "go.uber.org/fx" "github.com/DataDog/datadog-agent/comp/aggregator/diagnosesendermanager" "github.com/DataDog/datadog-agent/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl" + "github.com/DataDog/datadog-agent/comp/api/authtoken/fetchonlyimpl" "github.com/DataDog/datadog-agent/comp/collector/collector" "github.com/DataDog/datadog-agent/comp/collector/collector/collectorimpl" + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" + auProto "github.com/DataDog/datadog-agent/comp/core/autodiscovery/proto" "github.com/DataDog/datadog-agent/comp/core/config" + grpcClient "github.com/DataDog/datadog-agent/comp/core/grpcClient/def" + grpcClientfx "github.com/DataDog/datadog-agent/comp/core/grpcClient/fx" "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl" log "github.com/DataDog/datadog-agent/comp/core/log/def" logfx "github.com/DataDog/datadog-agent/comp/core/log/fx" @@ -30,6 +38,7 @@ import ( "github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" pkgcollector "github.com/DataDog/datadog-agent/pkg/collector" + "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" "github.com/DataDog/datadog-agent/pkg/serializer" "github.com/DataDog/datadog-agent/pkg/status/health" "github.com/DataDog/datadog-agent/pkg/util/fxutil" @@ -92,6 +101,10 @@ func RunChecksAgent(cliParams *CLIParams, defaultConfPath string, fct interface{ }), compressionimpl.Module(), hostnameimpl.Module(), + + // grpc Client + grpcClientfx.Module(), + fetchonlyimpl.Module(), ) } @@ -102,6 +115,7 @@ func start( _ diagnosesendermanager.Component, collector collector.Component, sender sender.SenderManager, + grpcClient grpcClient.Component, ) error { // Main context passed to components @@ -112,7 +126,10 @@ func start( // TODO: figure out how to initial.ize checks context // check.InitializeInventoryChecksContext(invChecks) - pkgcollector.InitCheckScheduler(optional.NewOption(collector), sender, optional.NewNoneOption[integrations.Component]()) + scheduler := pkgcollector.InitCheckScheduler(optional.NewOption(collector), sender, optional.NewNoneOption[integrations.Component]()) + + // Start the scheduler + go startScheduler(grpcClient, scheduler, log) stopCh := make(chan struct{}) go handleSignals(stopCh, log) @@ -180,3 +197,81 @@ func StopAgent(cancel context.CancelFunc, log log.Component) { log.Info("See ya!") log.Flush() } + +type autodiscoveryStream struct { + autodiscoveryStream core.AgentSecure_AutodiscoveryStreamConfigClient + autodiscoveryStreamCancel context.CancelFunc +} + +func (a *autodiscoveryStream) initStream(grpcClient grpcClient.Component, log log.Component) error { + expBackoff := backoff.NewExponentialBackOff() + expBackoff.InitialInterval = 500 * time.Millisecond + expBackoff.MaxInterval = 5 * time.Minute + expBackoff.MaxElapsedTime = 0 * time.Minute + + return backoff.Retry(func() error { + select { + case <-grpcClient.Context().Done(): + return &backoff.PermanentError{} + default: + } + + streamCtx, streamCancelCtx := grpcClient.NewStreamContext() + + stream, err := grpcClient.AutodiscoveryStreamConfig(streamCtx, nil) + if err != nil { + log.Infof("unable to establish stream, will possibly retry: %s", err) + // We need to handle the case that the kernel agent dies + return err + } + + a.autodiscoveryStream = stream + a.autodiscoveryStreamCancel = streamCancelCtx + + log.Info("autodiscovery stream established successfully") + return nil + }, expBackoff) +} + +func startScheduler(grpcClient grpcClient.Component, scheduler *pkgcollector.CheckScheduler, log log.Component) { + // Start a stream using the grpc Client to consume autodiscovery updates for the different configurations + autodiscoveryStream := &autodiscoveryStream{} + + for { + if autodiscoveryStream.autodiscoveryStream == nil { + err := autodiscoveryStream.initStream(grpcClient, log) + if err != nil { + log.Warnf("error received trying to start stream: %s", err) + continue + } + } + + streamConfigs, err := autodiscoveryStream.autodiscoveryStream.Recv() + + if err != nil { + autodiscoveryStream.autodiscoveryStreamCancel() + + autodiscoveryStream.autodiscoveryStream = nil + + if err != io.EOF { + log.Warnf("error received from autodiscovery stream: %s", err) + } + + continue + } + + scheduleConfigs := []integration.Config{} + unscheduleConfigs := []integration.Config{} + + for _, config := range streamConfigs.Configs { + if config.EventType == core.ConfigEventType_SCHEDULE { + scheduleConfigs = append(scheduleConfigs, auProto.AutodiscoveryConfigFromprotobufConfig(config)) + } else if config.EventType == core.ConfigEventType_UNSCHEDULE { + unscheduleConfigs = append(unscheduleConfigs, auProto.AutodiscoveryConfigFromprotobufConfig(config)) + } + } + + scheduler.Schedule(scheduleConfigs) + scheduler.Unschedule(unscheduleConfigs) + } +} diff --git a/comp/core/autodiscovery/proto/proto.go b/comp/core/autodiscovery/proto/proto.go new file mode 100644 index 0000000000000..f03c1f2ad66d1 --- /dev/null +++ b/comp/core/autodiscovery/proto/proto.go @@ -0,0 +1,93 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package proto + +import ( + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" + "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" +) + +func ProtobufConfigFromAutodiscoveryConfig(config *integration.Config) *core.Config { + instances := [][]byte{} + + for _, instance := range config.Instances { + instances = append(instances, []byte(instance)) + } + + advancedAdIdentifiers := make([]*core.AdvancedADIdentifier, 0, len(config.AdvancedADIdentifiers)) + for _, advancedAdIdentifier := range config.AdvancedADIdentifiers { + advancedAdIdentifiers = append(advancedAdIdentifiers, &core.AdvancedADIdentifier{ + KubeService: &core.KubeNamespacedName{ + Name: advancedAdIdentifier.KubeService.Name, + Namespace: advancedAdIdentifier.KubeService.Namespace, + }, + KubeEndpoints: &core.KubeNamespacedName{ + Name: advancedAdIdentifier.KubeEndpoints.Name, + Namespace: advancedAdIdentifier.KubeEndpoints.Namespace, + }, + }) + } + + return &core.Config{ + Name: config.Name, + Instances: instances, + InitConfig: config.InitConfig, + MetricConfig: config.MetricConfig, + LogsConfig: config.LogsConfig, + AdIdentifiers: config.ADIdentifiers, + AdvancedAdIdentifiers: advancedAdIdentifiers, + Provider: config.Provider, + ServiceId: config.ServiceID, + TaggerEntity: config.TaggerEntity, + ClusterCheck: config.ClusterCheck, + NodeName: config.NodeName, + Source: config.Source, + IgnoreAutodiscoveryTags: config.IgnoreAutodiscoveryTags, + MetricsExcluded: config.MetricsExcluded, + LogsExcluded: config.LogsExcluded, + } +} + +func AutodiscoveryConfigFromprotobufConfig(config *core.Config) integration.Config { + instances := []integration.Data{} + + for _, instance := range config.Instances { + instances = append(instances, integration.Data(instance)) + } + + advancedAdIdentifiers := make([]integration.AdvancedADIdentifier, 0, len(config.AdvancedAdIdentifiers)) + for _, advancedAdIdentifier := range config.AdvancedAdIdentifiers { + advancedAdIdentifiers = append(advancedAdIdentifiers, integration.AdvancedADIdentifier{ + KubeService: integration.KubeNamespacedName{ + Name: advancedAdIdentifier.KubeService.Name, + Namespace: advancedAdIdentifier.KubeService.Namespace, + }, + KubeEndpoints: integration.KubeNamespacedName{ + Name: advancedAdIdentifier.KubeEndpoints.Name, + Namespace: advancedAdIdentifier.KubeEndpoints.Namespace, + }, + }) + } + + return integration.Config{ + Name: config.Name, + Instances: instances, + InitConfig: config.InitConfig, + MetricConfig: config.MetricConfig, + LogsConfig: config.LogsConfig, + ADIdentifiers: config.AdIdentifiers, + AdvancedADIdentifiers: advancedAdIdentifiers, + Provider: config.Provider, + ServiceID: config.ServiceId, + TaggerEntity: config.TaggerEntity, + ClusterCheck: config.ClusterCheck, + NodeName: config.NodeName, + Source: config.Source, + IgnoreAutodiscoveryTags: config.IgnoreAutodiscoveryTags, + MetricsExcluded: config.MetricsExcluded, + LogsExcluded: config.LogsExcluded, + } +} diff --git a/comp/core/autodiscovery/server/proto.go b/comp/core/autodiscovery/server/proto.go deleted file mode 100644 index 247cd133bcd6c..0000000000000 --- a/comp/core/autodiscovery/server/proto.go +++ /dev/null @@ -1,52 +0,0 @@ -// Unless explicitly stated otherwise all files in this repository are licensed -// under the Apache License Version 2.0. -// This product includes software developed at Datadog (https://www.datadoghq.com/). -// Copyright 2016-2020 Datadog, Inc. - -package server - -import ( - "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" - pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" -) - -func protobufConfigFromAutodiscoveryConfig(config *integration.Config) *pb.Config { - instances := [][]byte{} - - for _, instance := range config.Instances { - instances = append(instances, []byte(instance)) - } - - advancedAdIdentifiers := make([]*pb.AdvancedADIdentifier, 0, len(config.AdvancedADIdentifiers)) - for _, advancedAdIdentifier := range config.AdvancedADIdentifiers { - advancedAdIdentifiers = append(advancedAdIdentifiers, &pb.AdvancedADIdentifier{ - KubeService: &pb.KubeNamespacedName{ - Name: advancedAdIdentifier.KubeService.Name, - Namespace: advancedAdIdentifier.KubeService.Namespace, - }, - KubeEndpoints: &pb.KubeNamespacedName{ - Name: advancedAdIdentifier.KubeEndpoints.Name, - Namespace: advancedAdIdentifier.KubeEndpoints.Namespace, - }, - }) - } - - return &pb.Config{ - Name: config.Name, - Instances: instances, - InitConfig: config.InitConfig, - MetricConfig: config.MetricConfig, - LogsConfig: config.LogsConfig, - AdIdentifiers: config.ADIdentifiers, - AdvancedAdIdentifiers: advancedAdIdentifiers, - Provider: config.Provider, - ServiceId: config.ServiceID, - TaggerEntity: config.TaggerEntity, - ClusterCheck: config.ClusterCheck, - NodeName: config.NodeName, - Source: config.Source, - IgnoreAutodiscoveryTags: config.IgnoreAutodiscoveryTags, - MetricsExcluded: config.MetricsExcluded, - LogsExcluded: config.LogsExcluded, - } -} diff --git a/comp/core/autodiscovery/server/server.go b/comp/core/autodiscovery/server/server.go index 004e50fb54c47..b8a45f2e1ad26 100644 --- a/comp/core/autodiscovery/server/server.go +++ b/comp/core/autodiscovery/server/server.go @@ -9,6 +9,7 @@ import ( "time" "github.com/DataDog/datadog-agent/comp/core/autodiscovery" + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/proto" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core" "github.com/DataDog/datadog-agent/pkg/util/grpc" "github.com/DataDog/datadog-agent/pkg/util/log" @@ -37,7 +38,7 @@ func (s *Server) StreamConfig(out pb.AgentSecure_AutodiscoveryStreamConfigServer return nil } - protobufConfig := protobufConfigFromAutodiscoveryConfig(config) + protobufConfig := proto.ProtobufConfigFromAutodiscoveryConfig(config) protobufConfig.EventType = pb.ConfigEventType_SCHEDULE err := grpc.DoWithTimeout(func() error { @@ -55,7 +56,7 @@ func (s *Server) StreamConfig(out pb.AgentSecure_AutodiscoveryStreamConfigServer return nil } - protobufConfig := protobufConfigFromAutodiscoveryConfig(config) + protobufConfig := proto.ProtobufConfigFromAutodiscoveryConfig(config) protobufConfig.EventType = pb.ConfigEventType_UNSCHEDULE err := grpc.DoWithTimeout(func() error { diff --git a/comp/logs/agent/agentimpl/agent.go b/comp/logs/agent/agentimpl/agent.go index b7f5159f14bb5..eba1d1b2975a1 100644 --- a/comp/logs/agent/agentimpl/agent.go +++ b/comp/logs/agent/agentimpl/agent.go @@ -19,6 +19,7 @@ import ( "go.uber.org/fx" "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" + auProto "github.com/DataDog/datadog-agent/comp/core/autodiscovery/proto" configComponent "github.com/DataDog/datadog-agent/comp/core/config" flaretypes "github.com/DataDog/datadog-agent/comp/core/flare/types" grpcClient "github.com/DataDog/datadog-agent/comp/core/grpcClient/def" @@ -220,7 +221,7 @@ func (a *logAgent) start(context.Context) error { a.autodiscoveryStream = nil if err != io.EOF { - a.log.Warnf("error received from autodiscovery stream workloadmeta: %s", err) + a.log.Warnf("error received from autodiscovery stream: %s", err) } continue @@ -231,9 +232,9 @@ func (a *logAgent) start(context.Context) error { for _, config := range streamConfigs.Configs { if config.EventType == core.ConfigEventType_SCHEDULE { - scheduleConfigs = append(scheduleConfigs, autodiscoveryConfigFromprotobufConfig(config)) + scheduleConfigs = append(scheduleConfigs, auProto.AutodiscoveryConfigFromprotobufConfig(config)) } else if config.EventType == core.ConfigEventType_UNSCHEDULE { - unscheduleConfigs = append(unscheduleConfigs, autodiscoveryConfigFromprotobufConfig(config)) + unscheduleConfigs = append(unscheduleConfigs, auProto.AutodiscoveryConfigFromprotobufConfig(config)) } } @@ -256,47 +257,6 @@ func (a *logAgent) start(context.Context) error { return nil } -func autodiscoveryConfigFromprotobufConfig(config *core.Config) integration.Config { - instances := []integration.Data{} - - for _, instance := range config.Instances { - instances = append(instances, integration.Data(instance)) - } - - advancedAdIdentifiers := make([]integration.AdvancedADIdentifier, 0, len(config.AdvancedAdIdentifiers)) - for _, advancedAdIdentifier := range config.AdvancedAdIdentifiers { - advancedAdIdentifiers = append(advancedAdIdentifiers, integration.AdvancedADIdentifier{ - KubeService: integration.KubeNamespacedName{ - Name: advancedAdIdentifier.KubeService.Name, - Namespace: advancedAdIdentifier.KubeService.Namespace, - }, - KubeEndpoints: integration.KubeNamespacedName{ - Name: advancedAdIdentifier.KubeEndpoints.Name, - Namespace: advancedAdIdentifier.KubeEndpoints.Namespace, - }, - }) - } - - return integration.Config{ - Name: config.Name, - Instances: instances, - InitConfig: config.InitConfig, - MetricConfig: config.MetricConfig, - LogsConfig: config.LogsConfig, - ADIdentifiers: config.AdIdentifiers, - AdvancedADIdentifiers: advancedAdIdentifiers, - Provider: config.Provider, - ServiceID: config.ServiceId, - TaggerEntity: config.TaggerEntity, - ClusterCheck: config.ClusterCheck, - NodeName: config.NodeName, - Source: config.Source, - IgnoreAutodiscoveryTags: config.IgnoreAutodiscoveryTags, - MetricsExcluded: config.MetricsExcluded, - LogsExcluded: config.LogsExcluded, - } -} - func (a *logAgent) initStream() error { expBackoff := backoff.NewExponentialBackOff() expBackoff.InitialInterval = 500 * time.Millisecond