Skip to content

Commit

Permalink
connect checks agent and agent kernel to schedule python checks
Browse files Browse the repository at this point in the history
  • Loading branch information
GustavoCaso committed Oct 15, 2024
1 parent 89cc97e commit 54957e1
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 99 deletions.
97 changes: 96 additions & 1 deletion cmd/checks-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,26 @@ package start

import (
"context"
"io"
"os"
"os/signal"
"syscall"
"time"

"github.com/cenkalti/backoff"
"github.com/spf13/cobra"
"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/aggregator/diagnosesendermanager"
"github.com/DataDog/datadog-agent/comp/aggregator/diagnosesendermanager/diagnosesendermanagerimpl"
"github.com/DataDog/datadog-agent/comp/api/authtoken/fetchonlyimpl"
"github.com/DataDog/datadog-agent/comp/collector/collector"
"github.com/DataDog/datadog-agent/comp/collector/collector/collectorimpl"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
auProto "github.com/DataDog/datadog-agent/comp/core/autodiscovery/proto"
"github.com/DataDog/datadog-agent/comp/core/config"
grpcClient "github.com/DataDog/datadog-agent/comp/core/grpcClient/def"
grpcClientfx "github.com/DataDog/datadog-agent/comp/core/grpcClient/fx"
"github.com/DataDog/datadog-agent/comp/core/hostname/hostnameimpl"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
logfx "github.com/DataDog/datadog-agent/comp/core/log/fx"
Expand All @@ -30,6 +38,7 @@ import (
"github.com/DataDog/datadog-agent/comp/serializer/compression/compressionimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
pkgcollector "github.com/DataDog/datadog-agent/pkg/collector"
"github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
"github.com/DataDog/datadog-agent/pkg/serializer"
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
Expand Down Expand Up @@ -92,6 +101,10 @@ func RunChecksAgent(cliParams *CLIParams, defaultConfPath string, fct interface{
}),
compressionimpl.Module(),
hostnameimpl.Module(),

// grpc Client
grpcClientfx.Module(),
fetchonlyimpl.Module(),
)
}

Expand All @@ -102,6 +115,7 @@ func start(
_ diagnosesendermanager.Component,
collector collector.Component,
sender sender.SenderManager,
grpcClient grpcClient.Component,
) error {

// Main context passed to components
Expand All @@ -112,7 +126,10 @@ func start(
// TODO: figure out how to initialize checks context
// check.InitializeInventoryChecksContext(invChecks)

pkgcollector.InitCheckScheduler(optional.NewOption(collector), sender, optional.NewNoneOption[integrations.Component]())
scheduler := pkgcollector.InitCheckScheduler(optional.NewOption(collector), sender, optional.NewNoneOption[integrations.Component]())

// Start the scheduler
go startScheduler(grpcClient, scheduler, log)

stopCh := make(chan struct{})
go handleSignals(stopCh, log)
Expand Down Expand Up @@ -180,3 +197,81 @@ func StopAgent(cancel context.CancelFunc, log log.Component) {
log.Info("See ya!")
log.Flush()
}

// autodiscoveryStream bundles the autodiscovery config stream client with the
// function that cancels the context the stream was opened with.
type autodiscoveryStream struct {
// autodiscoveryStream is the stream configs are received from. It is nil
// until initStream succeeds, and startScheduler resets it to nil after a
// Recv error so that a new stream gets established.
autodiscoveryStream core.AgentSecure_AutodiscoveryStreamConfigClient
// autodiscoveryStreamCancel cancels the context of the current stream.
autodiscoveryStreamCancel context.CancelFunc
}

// initStream opens the autodiscovery config stream on grpcClient, retrying
// with an exponential backoff (500ms initial interval, capped at 5 minutes,
// no overall deadline) until the call succeeds or the client context is done.
//
// On success the stream and the cancel function of its context are stored on
// a and nil is returned. When the client context is done, the context's error
// is returned and a is left untouched.
func (a *autodiscoveryStream) initStream(grpcClient grpcClient.Component, log log.Component) error {
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.InitialInterval = 500 * time.Millisecond
	expBackoff.MaxInterval = 5 * time.Minute
	// Zero means the backoff never gives up on its own.
	expBackoff.MaxElapsedTime = 0

	return backoff.Retry(func() error {
		select {
		case <-grpcClient.Context().Done():
			// The permanent error must wrap a non-nil error: backoff.Retry
			// hands back the wrapped Err, so an empty PermanentError would
			// surface to the caller as nil even though no stream was set.
			return &backoff.PermanentError{Err: grpcClient.Context().Err()}
		default:
		}

		streamCtx, streamCancelCtx := grpcClient.NewStreamContext()

		stream, err := grpcClient.AutodiscoveryStreamConfig(streamCtx, nil)
		if err != nil {
			// Release the context of the failed attempt; a fresh one is
			// created on the next retry.
			streamCancelCtx()

			log.Infof("unable to establish stream, will possibly retry: %s", err)
			// We need to handle the case that the kernel agent dies
			return err
		}

		a.autodiscoveryStream = stream
		a.autodiscoveryStreamCancel = streamCancelCtx

		log.Info("autodiscovery stream established successfully")
		return nil
	}, expBackoff)
}

// startScheduler consumes autodiscovery config updates streamed through
// grpcClient and forwards them to scheduler: configs flagged SCHEDULE are
// passed to scheduler.Schedule, configs flagged UNSCHEDULE to
// scheduler.Unschedule. Configs with any other event type are ignored.
//
// The stream is (re)established whenever it is missing or Recv fails. The
// function blocks and only returns once the grpc client context is done, so
// it is meant to be run in its own goroutine.
func startScheduler(grpcClient grpcClient.Component, scheduler *pkgcollector.CheckScheduler, log log.Component) {
	// Start a stream using the grpc Client to consume autodiscovery updates for the different configurations
	adStream := &autodiscoveryStream{}

	for {
		// Stop instead of spinning on reconnect attempts once the client
		// context is done.
		select {
		case <-grpcClient.Context().Done():
			if adStream.autodiscoveryStream != nil && adStream.autodiscoveryStreamCancel != nil {
				adStream.autodiscoveryStreamCancel()
			}
			return
		default:
		}

		if adStream.autodiscoveryStream == nil {
			if err := adStream.initStream(grpcClient, log); err != nil {
				log.Warnf("error received trying to start stream: %s", err)
				continue
			}

			// Never call Recv on a missing stream; go back to the context
			// check above and retry from there.
			if adStream.autodiscoveryStream == nil {
				continue
			}
		}

		streamConfigs, err := adStream.autodiscoveryStream.Recv()
		if err != nil {
			// Drop the broken stream so the next iteration opens a new one.
			adStream.autodiscoveryStreamCancel()
			adStream.autodiscoveryStream = nil

			if err != io.EOF {
				log.Warnf("error received from autodiscovery stream: %s", err)
			}

			continue
		}

		scheduleConfigs := []integration.Config{}
		unscheduleConfigs := []integration.Config{}

		for _, config := range streamConfigs.Configs {
			switch config.EventType {
			case core.ConfigEventType_SCHEDULE:
				scheduleConfigs = append(scheduleConfigs, auProto.AutodiscoveryConfigFromprotobufConfig(config))
			case core.ConfigEventType_UNSCHEDULE:
				unscheduleConfigs = append(unscheduleConfigs, auProto.AutodiscoveryConfigFromprotobufConfig(config))
			}
		}

		scheduler.Schedule(scheduleConfigs)
		scheduler.Unschedule(unscheduleConfigs)
	}
}
93 changes: 93 additions & 0 deletions comp/core/autodiscovery/proto/proto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package proto

import (
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
)

// ProtobufConfigFromAutodiscoveryConfig converts an autodiscovery
// integration.Config into its protobuf core.Config counterpart.
//
// Instances and advanced AD identifiers are copied element by element; the
// remaining fields are assigned as-is. The EventType of the returned message
// is not set by this function.
func ProtobufConfigFromAutodiscoveryConfig(config *integration.Config) *core.Config {
	pbInstances := make([][]byte, 0, len(config.Instances))
	for i := range config.Instances {
		pbInstances = append(pbInstances, []byte(config.Instances[i]))
	}

	pbAdvancedIDs := make([]*core.AdvancedADIdentifier, 0, len(config.AdvancedADIdentifiers))
	for i := range config.AdvancedADIdentifiers {
		src := config.AdvancedADIdentifiers[i]

		kubeService := &core.KubeNamespacedName{
			Name:      src.KubeService.Name,
			Namespace: src.KubeService.Namespace,
		}
		kubeEndpoints := &core.KubeNamespacedName{
			Name:      src.KubeEndpoints.Name,
			Namespace: src.KubeEndpoints.Namespace,
		}

		pbAdvancedIDs = append(pbAdvancedIDs, &core.AdvancedADIdentifier{
			KubeService:   kubeService,
			KubeEndpoints: kubeEndpoints,
		})
	}

	pbConfig := &core.Config{
		Name:                    config.Name,
		Instances:               pbInstances,
		InitConfig:              config.InitConfig,
		MetricConfig:            config.MetricConfig,
		LogsConfig:              config.LogsConfig,
		AdIdentifiers:           config.ADIdentifiers,
		AdvancedAdIdentifiers:   pbAdvancedIDs,
		Provider:                config.Provider,
		ServiceId:               config.ServiceID,
		TaggerEntity:            config.TaggerEntity,
		ClusterCheck:            config.ClusterCheck,
		NodeName:                config.NodeName,
		Source:                  config.Source,
		IgnoreAutodiscoveryTags: config.IgnoreAutodiscoveryTags,
		MetricsExcluded:         config.MetricsExcluded,
		LogsExcluded:            config.LogsExcluded,
	}

	return pbConfig
}

// AutodiscoveryConfigFromprotobufConfig converts a protobuf core.Config into
// an autodiscovery integration.Config.
//
// Instances and advanced AD identifiers are copied element by element; the
// remaining fields are assigned as-is. KubeService and KubeEndpoints are
// pointers on the protobuf side, so they are read through the generated
// getters: an identifier with either of them unset yields an empty
// KubeNamespacedName instead of a nil pointer dereference.
func AutodiscoveryConfigFromprotobufConfig(config *core.Config) integration.Config {
	instances := make([]integration.Data, 0, len(config.Instances))
	for _, instance := range config.Instances {
		instances = append(instances, integration.Data(instance))
	}

	advancedAdIdentifiers := make([]integration.AdvancedADIdentifier, 0, len(config.AdvancedAdIdentifiers))
	for _, advancedAdIdentifier := range config.AdvancedAdIdentifiers {
		// Getters are safe on nil receivers and return zero values.
		kubeService := advancedAdIdentifier.GetKubeService()
		kubeEndpoints := advancedAdIdentifier.GetKubeEndpoints()

		advancedAdIdentifiers = append(advancedAdIdentifiers, integration.AdvancedADIdentifier{
			KubeService: integration.KubeNamespacedName{
				Name:      kubeService.GetName(),
				Namespace: kubeService.GetNamespace(),
			},
			KubeEndpoints: integration.KubeNamespacedName{
				Name:      kubeEndpoints.GetName(),
				Namespace: kubeEndpoints.GetNamespace(),
			},
		})
	}

	return integration.Config{
		Name:                    config.Name,
		Instances:               instances,
		InitConfig:              config.InitConfig,
		MetricConfig:            config.MetricConfig,
		LogsConfig:              config.LogsConfig,
		ADIdentifiers:           config.AdIdentifiers,
		AdvancedADIdentifiers:   advancedAdIdentifiers,
		Provider:                config.Provider,
		ServiceID:               config.ServiceId,
		TaggerEntity:            config.TaggerEntity,
		ClusterCheck:            config.ClusterCheck,
		NodeName:                config.NodeName,
		Source:                  config.Source,
		IgnoreAutodiscoveryTags: config.IgnoreAutodiscoveryTags,
		MetricsExcluded:         config.MetricsExcluded,
		LogsExcluded:            config.LogsExcluded,
	}
}
52 changes: 0 additions & 52 deletions comp/core/autodiscovery/server/proto.go

This file was deleted.

5 changes: 3 additions & 2 deletions comp/core/autodiscovery/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/proto"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
"github.com/DataDog/datadog-agent/pkg/util/grpc"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand Down Expand Up @@ -37,7 +38,7 @@ func (s *Server) StreamConfig(out pb.AgentSecure_AutodiscoveryStreamConfigServer
return nil
}

protobufConfig := protobufConfigFromAutodiscoveryConfig(config)
protobufConfig := proto.ProtobufConfigFromAutodiscoveryConfig(config)
protobufConfig.EventType = pb.ConfigEventType_SCHEDULE

err := grpc.DoWithTimeout(func() error {
Expand All @@ -55,7 +56,7 @@ func (s *Server) StreamConfig(out pb.AgentSecure_AutodiscoveryStreamConfigServer
return nil
}

protobufConfig := protobufConfigFromAutodiscoveryConfig(config)
protobufConfig := proto.ProtobufConfigFromAutodiscoveryConfig(config)
protobufConfig.EventType = pb.ConfigEventType_UNSCHEDULE

err := grpc.DoWithTimeout(func() error {
Expand Down
48 changes: 4 additions & 44 deletions comp/logs/agent/agentimpl/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
auProto "github.com/DataDog/datadog-agent/comp/core/autodiscovery/proto"
configComponent "github.com/DataDog/datadog-agent/comp/core/config"
flaretypes "github.com/DataDog/datadog-agent/comp/core/flare/types"
grpcClient "github.com/DataDog/datadog-agent/comp/core/grpcClient/def"
Expand Down Expand Up @@ -220,7 +221,7 @@ func (a *logAgent) start(context.Context) error {
a.autodiscoveryStream = nil

if err != io.EOF {
a.log.Warnf("error received from autodiscovery stream workloadmeta: %s", err)
a.log.Warnf("error received from autodiscovery stream: %s", err)
}

continue
Expand All @@ -231,9 +232,9 @@ func (a *logAgent) start(context.Context) error {

for _, config := range streamConfigs.Configs {
if config.EventType == core.ConfigEventType_SCHEDULE {
scheduleConfigs = append(scheduleConfigs, autodiscoveryConfigFromprotobufConfig(config))
scheduleConfigs = append(scheduleConfigs, auProto.AutodiscoveryConfigFromprotobufConfig(config))
} else if config.EventType == core.ConfigEventType_UNSCHEDULE {
unscheduleConfigs = append(unscheduleConfigs, autodiscoveryConfigFromprotobufConfig(config))
unscheduleConfigs = append(unscheduleConfigs, auProto.AutodiscoveryConfigFromprotobufConfig(config))
}
}

Expand All @@ -256,47 +257,6 @@ func (a *logAgent) start(context.Context) error {
return nil
}

// autodiscoveryConfigFromprotobufConfig converts a protobuf core.Config into
// an autodiscovery integration.Config.
//
// KubeService and KubeEndpoints are pointers on the protobuf side, so they
// are read through the generated getters: an identifier with either of them
// unset yields an empty KubeNamespacedName instead of a nil pointer
// dereference.
func autodiscoveryConfigFromprotobufConfig(config *core.Config) integration.Config {
	instances := make([]integration.Data, 0, len(config.Instances))
	for _, instance := range config.Instances {
		instances = append(instances, integration.Data(instance))
	}

	advancedAdIdentifiers := make([]integration.AdvancedADIdentifier, 0, len(config.AdvancedAdIdentifiers))
	for _, advancedAdIdentifier := range config.AdvancedAdIdentifiers {
		// Getters are safe on nil receivers and return zero values.
		kubeService := advancedAdIdentifier.GetKubeService()
		kubeEndpoints := advancedAdIdentifier.GetKubeEndpoints()

		advancedAdIdentifiers = append(advancedAdIdentifiers, integration.AdvancedADIdentifier{
			KubeService: integration.KubeNamespacedName{
				Name:      kubeService.GetName(),
				Namespace: kubeService.GetNamespace(),
			},
			KubeEndpoints: integration.KubeNamespacedName{
				Name:      kubeEndpoints.GetName(),
				Namespace: kubeEndpoints.GetNamespace(),
			},
		})
	}

	return integration.Config{
		Name:                    config.Name,
		Instances:               instances,
		InitConfig:              config.InitConfig,
		MetricConfig:            config.MetricConfig,
		LogsConfig:              config.LogsConfig,
		ADIdentifiers:           config.AdIdentifiers,
		AdvancedADIdentifiers:   advancedAdIdentifiers,
		Provider:                config.Provider,
		ServiceID:               config.ServiceId,
		TaggerEntity:            config.TaggerEntity,
		ClusterCheck:            config.ClusterCheck,
		NodeName:                config.NodeName,
		Source:                  config.Source,
		IgnoreAutodiscoveryTags: config.IgnoreAutodiscoveryTags,
		MetricsExcluded:         config.MetricsExcluded,
		LogsExcluded:            config.LogsExcluded,
	}
}

func (a *logAgent) initStream() error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 500 * time.Millisecond
Expand Down

0 comments on commit 54957e1

Please sign in to comment.