diff --git a/cmd/cluster-agent/api/server.go b/cmd/cluster-agent/api/server.go index c4f5c0534633a..30346b8cd1465 100644 --- a/cmd/cluster-agent/api/server.go +++ b/cmd/cluster-agent/api/server.go @@ -136,7 +136,7 @@ func StartServer(ctx context.Context, w workloadmeta.Component, taggerComp tagge // event size should be small enough to fit within the grpc max message size maxEventSize := maxMessageSize / 2 pb.RegisterAgentSecureServer(grpcSrv, &serverSecure{ - taggerServer: taggerserver.NewServer(taggerComp, maxEventSize), + taggerServer: taggerserver.NewServer(taggerComp, maxEventSize, cfg.GetInt("remote_tagger.max_concurrent_sync")), }) timeout := pkgconfigsetup.Datadog().GetDuration("cluster_agent.server.idle_timeout_seconds") * time.Second diff --git a/comp/api/api/apiimpl/server_cmd.go b/comp/api/api/apiimpl/server_cmd.go index 2eca0fe7abd09..7e69a99ecc201 100644 --- a/comp/api/api/apiimpl/server_cmd.go +++ b/comp/api/api/apiimpl/server_cmd.go @@ -64,7 +64,7 @@ func (server *apiServer) startCMDServer( pb.RegisterAgentSecureServer(s, &serverSecure{ configService: server.rcService, configServiceMRF: server.rcServiceMRF, - taggerServer: taggerserver.NewServer(server.taggerComp, maxEventSize), + taggerServer: taggerserver.NewServer(server.taggerComp, maxEventSize, cfg.GetInt("remote_tagger.max_concurrent_sync")), taggerComp: server.taggerComp, // TODO(components): decide if workloadmetaServer should be componentized itself workloadmetaServer: workloadmetaServer.NewServer(server.wmeta), diff --git a/comp/core/tagger/server/server.go b/comp/core/tagger/server/server.go index 34a719f32c440..6e7cf4c1fd5c8 100644 --- a/comp/core/tagger/server/server.go +++ b/comp/core/tagger/server/server.go @@ -34,13 +34,15 @@ const ( type Server struct { taggerComponent tagger.Component maxEventSize int + throttler Throttler } // NewServer returns a new Server -func NewServer(t tagger.Component, maxEventSize int) *Server { +func NewServer(t tagger.Component, maxEventSize int, 
maxParallelSync int) *Server { return &Server{ taggerComponent: t, maxEventSize: maxEventSize, + throttler: NewSyncThrottler(uint32(maxParallelSync)), } } @@ -53,6 +55,42 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu return err } + ticker := time.NewTicker(streamKeepAliveInterval) + defer ticker.Stop() + + timeoutRefreshError := make(chan error) + + go func() { + // The remote tagger client has a timeout that closes the + // connection after 10 minutes of inactivity (implemented in + // comp/core/tagger/remote/tagger.go) In order to avoid closing the + // connection and having to open it again, the server will send + // an empty message after 9 minutes of inactivity. The goal is + // only to keep the connection alive without losing the + // protection against “half” closed connections brought by the + // timeout. + for { + select { + case <-out.Context().Done(): + return + + case <-ticker.C: + err = grpc.DoWithTimeout(func() error { + return out.Send(&pb.StreamTagsResponse{ + Events: []*pb.StreamTagsEvent{}, + }) + }, taggerStreamSendTimeout) + + if err != nil { + log.Warnf("error sending tagger keep-alive: %s", err) + s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc() + timeoutRefreshError <- err + return + } + } + } + }() + filterBuilder := types.NewFilterBuilder() for _, prefix := range in.GetPrefixes() { filterBuilder = filterBuilder.Include(types.EntityIDPrefix(prefix)) @@ -62,23 +100,27 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu streamingID := in.GetStreamingID() if streamingID == "" { - // this is done to preserve backward compatibility - // if CLC runner is using an old version, the streaming ID would be an empty string, - // and the server needs to auto-assign a unique id streamingID = uuid.New().String() } - subscriptionID := fmt.Sprintf("streaming-client-%s", streamingID) + + // initBurst is a flag indicating if the initial sync is still in progress or not + 
// true means the sync hasn't yet been finalised + // false means the streaming client has already caught up with the server + initBurst := true + log.Debugf("requesting token from server throttler for streaming id: %q", streamingID) + tk := s.throttler.RequestToken() + defer s.throttler.Release(tk) + subscription, err := s.taggerComponent.Subscribe(subscriptionID, filter) + log.Debugf("cluster tagger has just initiated subscription for %q at time %v", subscriptionID, time.Now().Unix()) if err != nil { + log.Errorf("Failed to subscribe to tagger for subscription %q", subscriptionID) return err } defer subscription.Unsubscribe() - ticker := time.NewTicker(streamKeepAliveInterval) - defer ticker.Stop() - sendFunc := func(chunk []*pb.StreamTagsEvent) error { return grpc.DoWithTimeout(func() error { return out.Send(&pb.StreamTagsResponse{ @@ -114,29 +156,17 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu return err } + if initBurst { + initBurst = false + s.throttler.Release(tk) + log.Infof("cluster tagger has just finished initialization for subscription %q at time %v", subscriptionID, time.Now().Unix()) + } + case <-out.Context().Done(): return nil - // The remote tagger client has a timeout that closes the - // connection after 10 minutes of inactivity (implemented in - // comp/core/tagger/remote/tagger.go) In order to avoid closing the - // connection and having to open it again, the server will send - // an empty message after 9 minutes of inactivity. The goal is - // only to keep the connection alive without losing the - // protection against “half” closed connections brought by the - // timeout. 
- case <-ticker.C: - err = grpc.DoWithTimeout(func() error { - return out.Send(&pb.StreamTagsResponse{ - Events: []*pb.StreamTagsEvent{}, - }) - }, taggerStreamSendTimeout) - - if err != nil { - log.Warnf("error sending tagger keep-alive: %s", err) - s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc() - return err - } + case err = <-timeoutRefreshError: + return err } } } diff --git a/comp/core/tagger/server/syncthrottler.go b/comp/core/tagger/server/syncthrottler.go new file mode 100644 index 0000000000000..5309e97c09a7a --- /dev/null +++ b/comp/core/tagger/server/syncthrottler.go @@ -0,0 +1,62 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package server + +import ( + "sync" + + "github.com/google/uuid" +) + +type token string + +// Throttler provides tokens with throttling logic that limits the number of active tokens at the same time +// When a component is done with a token, it should release the token by calling the Release method +type Throttler interface { + // RequestToken returns a token + RequestToken() token + // Release returns the token back to the throttler + // This method is idempotent (i.e.
invoking it on the same token multiple times will have the same effect) + Release(t token) +} + +// limiter implements the Throttler interface +type limiter struct { + mutex sync.RWMutex + tokensChan chan struct{} + activeRequests map[token]struct{} +} + +// NewSyncThrottler creates and returns a new Throttler +func NewSyncThrottler(maxConcurrentSync uint32) Throttler { + return &limiter{ + mutex: sync.RWMutex{}, + tokensChan: make(chan struct{}, maxConcurrentSync), + activeRequests: make(map[token]struct{}), + } +} + +// RequestToken implements Throttler#RequestToken +func (l *limiter) RequestToken() token { + tk := token(uuid.New().String()) + l.tokensChan <- struct{}{} + + l.mutex.Lock() + defer l.mutex.Unlock() + + l.activeRequests[tk] = struct{}{} + return tk +} + +// Release implements Throttler#Release +func (l *limiter) Release(t token) { + l.mutex.Lock() + defer l.mutex.Unlock() + if _, found := l.activeRequests[t]; found { + <-l.tokensChan + delete(l.activeRequests, t) + } +} diff --git a/comp/core/tagger/server/syncthrottler_test.go b/comp/core/tagger/server/syncthrottler_test.go new file mode 100644 index 0000000000000..a4065184baab9 --- /dev/null +++ b/comp/core/tagger/server/syncthrottler_test.go @@ -0,0 +1,32 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +package server + +import ( + "sync" + "testing" + "time" +) + +func TestSyncThrottler(_ *testing.T) { + + throttler := NewSyncThrottler(3) + + var wg sync.WaitGroup + + for i := 0; i < 30; i++ { + wg.Add(1) + go func() { + defer wg.Done() + t := throttler.RequestToken() + time.Sleep(200 * time.Millisecond) + throttler.Release(t) + throttler.Release(t) // Release method should be idempotent + }() + } + + wg.Wait() +} diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index 8914e8cef3a6c..7bacf70ebd8cc 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -730,6 +730,9 @@ func InitConfig(config pkgconfigmodel.Setup) { config.BindEnvAndSetDefault("clc_runner_server_readheader_timeout", 10) config.BindEnvAndSetDefault("clc_runner_remote_tagger_enabled", false) + // Remote tagger + config.BindEnvAndSetDefault("remote_tagger.max_concurrent_sync", 3) + // Admission controller config.BindEnvAndSetDefault("admission_controller.enabled", false) config.BindEnvAndSetDefault("admission_controller.validation.enabled", true)