diff --git a/cmd/cluster-agent/api/server.go b/cmd/cluster-agent/api/server.go
index c4f5c0534633a7..30346b8cd14656 100644
--- a/cmd/cluster-agent/api/server.go
+++ b/cmd/cluster-agent/api/server.go
@@ -136,7 +136,7 @@ func StartServer(ctx context.Context, w workloadmeta.Component, taggerComp tagge
 	// event size should be small enough to fit within the grpc max message size
 	maxEventSize := maxMessageSize / 2
 	pb.RegisterAgentSecureServer(grpcSrv, &serverSecure{
-		taggerServer: taggerserver.NewServer(taggerComp, maxEventSize),
+		taggerServer: taggerserver.NewServer(taggerComp, maxEventSize, cfg.GetInt("remote_tagger.max_concurrent_sync")),
 	})
 
 	timeout := pkgconfigsetup.Datadog().GetDuration("cluster_agent.server.idle_timeout_seconds") * time.Second
diff --git a/comp/api/api/apiimpl/server_cmd.go b/comp/api/api/apiimpl/server_cmd.go
index 2eca0fe7abd09b..7e69a99ecc201f 100644
--- a/comp/api/api/apiimpl/server_cmd.go
+++ b/comp/api/api/apiimpl/server_cmd.go
@@ -64,7 +64,7 @@ func (server *apiServer) startCMDServer(
 	pb.RegisterAgentSecureServer(s, &serverSecure{
 		configService:    server.rcService,
 		configServiceMRF: server.rcServiceMRF,
-		taggerServer:     taggerserver.NewServer(server.taggerComp, maxEventSize),
+		taggerServer:     taggerserver.NewServer(server.taggerComp, maxEventSize, cfg.GetInt("remote_tagger.max_concurrent_sync")),
 		taggerComp:       server.taggerComp,
 		// TODO(components): decide if workloadmetaServer should be componentized itself
 		workloadmetaServer: workloadmetaServer.NewServer(server.wmeta),
diff --git a/comp/core/tagger/server/server.go b/comp/core/tagger/server/server.go
index 34a719f32c440a..d35017c9ce25c3 100644
--- a/comp/core/tagger/server/server.go
+++ b/comp/core/tagger/server/server.go
@@ -34,13 +34,15 @@ const (
 type Server struct {
 	taggerComponent tagger.Component
 	maxEventSize    int
+	throttler       Throttler
 }
 
 // NewServer returns a new Server
-func NewServer(t tagger.Component, maxEventSize int) *Server {
+func NewServer(t tagger.Component, maxEventSize int, maxParallelSync int) *Server {
 	return &Server{
 		taggerComponent: t,
 		maxEventSize:    maxEventSize,
+		throttler:       NewSyncThrottler(uint32(maxParallelSync)),
 	}
 }
 
@@ -53,6 +55,42 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu
 		return err
 	}
 
+	ticker := time.NewTicker(streamKeepAliveInterval)
+	defer ticker.Stop()
+
+	timeoutRefreshError := make(chan error)
+
+	go func() {
+		// The remote tagger client has a timeout that closes the
+		// connection after 10 minutes of inactivity (implemented in
+		// comp/core/tagger/remote/tagger.go). In order to avoid closing the
+		// connection and having to open it again, the server will send
+		// an empty message after 9 minutes of inactivity. The goal is
+		// only to keep the connection alive without losing the
+		// protection against "half-closed" connections brought by the
+		// timeout.
+		for {
+			select {
+			case <-out.Context().Done():
+				return
+
+			case <-ticker.C:
+				err = grpc.DoWithTimeout(func() error {
+					return out.Send(&pb.StreamTagsResponse{
+						Events: []*pb.StreamTagsEvent{},
+					})
+				}, taggerStreamSendTimeout)
+
+				if err != nil {
+					log.Warnf("error sending tagger keep-alive: %s", err)
+					s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc()
+					timeoutRefreshError <- err
+					return
+				}
+			}
+		}
+	}()
+
 	filterBuilder := types.NewFilterBuilder()
 	for _, prefix := range in.GetPrefixes() {
 		filterBuilder = filterBuilder.Include(types.EntityIDPrefix(prefix))
@@ -62,23 +100,23 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu
 
 	streamingID := in.GetStreamingID()
 	if streamingID == "" {
-		// this is done to preserve backward compatibility
-		// if CLC runner is using an old version, the streaming ID would be an empty string,
-		// and the server needs to auto-assign a unique id
 		streamingID = uuid.New().String()
 	}
-
 	subscriptionID := fmt.Sprintf("streaming-client-%s", streamingID)
+
+	initBurst := true
+	tk := s.throttler.RequestToken()
+	defer s.throttler.Release(tk)
+
 	subscription, err := s.taggerComponent.Subscribe(subscriptionID, filter)
+	log.Debugf("cluster tagger has just initiated subscription for %q at time %v", subscriptionID, time.Now().Unix())
 	if err != nil {
+		log.Errorf("Failed to subscribe to tagger for subscription %q", subscriptionID)
 		return err
 	}
 	defer subscription.Unsubscribe()
 
-	ticker := time.NewTicker(streamKeepAliveInterval)
-	defer ticker.Stop()
-
 	sendFunc := func(chunk []*pb.StreamTagsEvent) error {
 		return grpc.DoWithTimeout(func() error {
 			return out.Send(&pb.StreamTagsResponse{
@@ -114,29 +152,17 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu
 				return err
 			}
 
+			if initBurst {
+				initBurst = false
+				s.throttler.Release(tk)
+				log.Infof("cluster tagger has just finished initialization for subscription %q at time %v", subscriptionID, time.Now().Unix())
+			}
+
 		case <-out.Context().Done():
 			return nil
 
-		// The remote tagger client has a timeout that closes the
-		// connection after 10 minutes of inactivity (implemented in
-		// comp/core/tagger/remote/tagger.go). In order to avoid closing the
-		// connection and having to open it again, the server will send
-		// an empty message after 9 minutes of inactivity. The goal is
-		// only to keep the connection alive without losing the
-		// protection against "half-closed" connections brought by the
-		// timeout.
-		case <-ticker.C:
-			err = grpc.DoWithTimeout(func() error {
-				return out.Send(&pb.StreamTagsResponse{
-					Events: []*pb.StreamTagsEvent{},
-				})
-			}, taggerStreamSendTimeout)
-
-			if err != nil {
-				log.Warnf("error sending tagger keep-alive: %s", err)
-				s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc()
-				return err
-			}
+		case err = <-timeoutRefreshError:
+			return err
 		}
 	}
 }
diff --git a/comp/core/tagger/server/syncthrottler.go b/comp/core/tagger/server/syncthrottler.go
new file mode 100644
index 00000000000000..5309e97c09a7a6
--- /dev/null
+++ b/comp/core/tagger/server/syncthrottler.go
@@ -0,0 +1,62 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package server
+
+import (
+	"sync"
+
+	"github.com/google/uuid"
+)
+
+type token string
+
+// Throttler provides tokens with throttling logic that limits the number of active tokens at the same time.
+// When a component is done with a token, it should release the token by calling the Release method.
+type Throttler interface {
+	// RequestToken returns a token
+	RequestToken() token
+	// Release returns the token back to the throttler.
+	// This method is idempotent (i.e. invoking it on the same token multiple times will have the same effect).
+	Release(t token)
+}
+
+// limiter implements the Throttler interface
+type limiter struct {
+	mutex          sync.RWMutex
+	tokensChan     chan struct{}
+	activeRequests map[token]struct{}
+}
+
+// NewSyncThrottler creates and returns a new Throttler
+func NewSyncThrottler(maxConcurrentSync uint32) Throttler {
+	return &limiter{
+		mutex:          sync.RWMutex{},
+		tokensChan:     make(chan struct{}, maxConcurrentSync),
+		activeRequests: make(map[token]struct{}),
+	}
+}
+
+// RequestToken implements Throttler#RequestToken
+func (l *limiter) RequestToken() token {
+	tk := token(uuid.New().String())
+	l.tokensChan <- struct{}{}
+
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+
+	l.activeRequests[tk] = struct{}{}
+	return tk
+}
+
+// Release implements Throttler#Release
+func (l *limiter) Release(t token) {
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+	if _, found := l.activeRequests[t]; found {
+		<-l.tokensChan
+		delete(l.activeRequests, t)
+	}
+}
diff --git a/comp/core/tagger/server/syncthrottler_test.go b/comp/core/tagger/server/syncthrottler_test.go
new file mode 100644
index 00000000000000..0c1d488c8298a3
--- /dev/null
+++ b/comp/core/tagger/server/syncthrottler_test.go
@@ -0,0 +1,27 @@
+package server
+
+import (
+	"testing"
+	"time"
+)
+
+func TestSyncThrottler(t *testing.T) {
+	throttler := NewSyncThrottler(3)
+
+	t1 := throttler.RequestToken()
+	t2 := throttler.RequestToken()
+	t3 := throttler.RequestToken()
+
+	go func() {
+		time.Sleep(1 * time.Second)
+		throttler.Release(t3)
+	}()
+
+	t4 := throttler.RequestToken() // this should block until token t3 is released
+	throttler.Release(t4)
+
+	throttler.Release(t4) // releasing a token that was already released should be ok (idempotent)
+
+	throttler.Release(t1)
+	throttler.Release(t2)
+}
diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go
index 53ef845aa8fac8..55cd7ae704b5cb 100644
--- a/pkg/config/setup/config.go
+++ b/pkg/config/setup/config.go
@@ -729,6 +729,9 @@ func InitConfig(config pkgconfigmodel.Setup) {
 	config.BindEnvAndSetDefault("clc_runner_server_readheader_timeout", 10)
 	config.BindEnvAndSetDefault("clc_runner_remote_tagger_enabled", false)
 
+	// Remote tagger
+	config.BindEnvAndSetDefault("remote_tagger.max_concurrent_sync", 4)
+
 	// Admission controller
 	config.BindEnvAndSetDefault("admission_controller.enabled", false)
 	config.BindEnvAndSetDefault("admission_controller.validation.enabled", true)
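
Reviewer note (not part of the patch): the server.go change moves the keep-alive ping out of the main receive loop into its own goroutine, which reports a failed Send back over the timeoutRefreshError channel so the loop can return the error. A minimal, self-contained sketch of that pattern follows; the names keepAlive and send, the injected failure, and the shortened interval are illustrative only, not the agent's API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// keepAlive pings via send on every tick and forwards the first failure
// over the returned channel, mirroring the timeoutRefreshError wiring.
func keepAlive(ctx context.Context, interval time.Duration, send func() error) <-chan error {
	errCh := make(chan error, 1)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return // stream finished; nothing to report
			case <-ticker.C:
				if err := send(); err != nil {
					errCh <- err
					return
				}
			}
		}
	}()
	return errCh
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	calls := 0
	errCh := keepAlive(ctx, 50*time.Millisecond, func() error {
		calls++
		if calls == 3 {
			return errors.New("stream closed") // simulate a failed Send
		}
		fmt.Println("keep-alive sent")
		return nil
	})

	// The real receive loop selects on the stream context and this channel,
	// returning the error exactly as `case err = <-timeoutRefreshError` does.
	fmt.Println("stream aborted:", <-errCh)
}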
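Reviewer note (not part of the patch): NewSyncThrottler bounds how many clients may run their initial tag sync at once by using a buffered channel as a counting semaphore; its capacity is remote_tagger.max_concurrent_sync, so RequestToken blocks once that many tokens are outstanding. A standalone sketch of the same semaphore idea, with an invented five-client workload and sleep standing in for streaming the initial burst:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxConcurrentSync = 2 // stands in for remote_tagger.max_concurrent_sync
	tokens := make(chan struct{}, maxConcurrentSync)

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			tokens <- struct{}{}        // RequestToken: blocks while maxConcurrentSync syncs are in flight
			defer func() { <-tokens }() // Release: frees the slot for the next waiting client
			fmt.Printf("client %d: initial sync running (%d slot(s) in use)\n", id, len(tokens))
			time.Sleep(100 * time.Millisecond) // stand-in for sending the initial tag burst
		}(i)
	}
	wg.Wait()
}

On top of this channel, the diff's limiter tracks outstanding tokens in a UUID-keyed map so that Release is idempotent: the bare channel alone would over-release a slot if the deferred Release ran after the explicit post-initialization Release in TaggerStreamEntities.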