[CONTP-549] throttle the number of parallel client initial syncs in the tagger server #31741

Merged
2 changes: 1 addition & 1 deletion cmd/cluster-agent/api/server.go
@@ -136,7 +136,7 @@ func StartServer(ctx context.Context, w workloadmeta.Component, taggerComp tagge
 	// event size should be small enough to fit within the grpc max message size
 	maxEventSize := maxMessageSize / 2
 	pb.RegisterAgentSecureServer(grpcSrv, &serverSecure{
-		taggerServer: taggerserver.NewServer(taggerComp, maxEventSize),
+		taggerServer: taggerserver.NewServer(taggerComp, maxEventSize, cfg.GetInt("remote_tagger.max_concurrent_sync")),
 	})

 	timeout := pkgconfigsetup.Datadog().GetDuration("cluster_agent.server.idle_timeout_seconds") * time.Second
2 changes: 1 addition & 1 deletion comp/api/api/apiimpl/server_cmd.go
@@ -64,7 +64,7 @@ func (server *apiServer) startCMDServer(
 	pb.RegisterAgentSecureServer(s, &serverSecure{
 		configService:    server.rcService,
 		configServiceMRF: server.rcServiceMRF,
-		taggerServer:     taggerserver.NewServer(server.taggerComp, maxEventSize),
+		taggerServer:     taggerserver.NewServer(server.taggerComp, maxEventSize, cfg.GetInt("remote_tagger.max_concurrent_sync")),
 		taggerComp:       server.taggerComp,
 		// TODO(components): decide if workloadmetaServer should be componentized itself
 		workloadmetaServer: workloadmetaServer.NewServer(server.wmeta),
86 changes: 58 additions & 28 deletions comp/core/tagger/server/server.go
@@ -34,13 +34,15 @@ const (
 type Server struct {
 	taggerComponent tagger.Component
 	maxEventSize    int
+	throttler       Throttler
 }

 // NewServer returns a new Server
-func NewServer(t tagger.Component, maxEventSize int) *Server {
+func NewServer(t tagger.Component, maxEventSize int, maxParallelSync int) *Server {
 	return &Server{
 		taggerComponent: t,
 		maxEventSize:    maxEventSize,
+		throttler:       NewSyncThrottler(uint32(maxParallelSync)),
 	}
 }

@@ -53,6 +55,42 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu
 		return err
 	}

+	ticker := time.NewTicker(streamKeepAliveInterval)
+	defer ticker.Stop()
+
+	timeoutRefreshError := make(chan error)
+
+	go func() {

Contributor: I'm wondering if we can create a function to isolate this logic. Also, should it be the client that sends the keepAlive rather than the server?

Contributor (Author): Discussed offline regarding this. In summary, this part of the code was already there; it is just moved up a bit. We can work on refactoring this code in a subsequent PR.


+		// The remote tagger client has a timeout that closes the
+		// connection after 10 minutes of inactivity (implemented in
+		// comp/core/tagger/remote/tagger.go). In order to avoid closing the
+		// connection and having to open it again, the server will send
+		// an empty message after 9 minutes of inactivity. The goal is
+		// only to keep the connection alive without losing the
+		// protection against “half” closed connections brought by the
+		// timeout.
+		for {
+			select {
+			case <-out.Context().Done():
+				return
+
+			case <-ticker.C:
+				err = grpc.DoWithTimeout(func() error {
+					return out.Send(&pb.StreamTagsResponse{
+						Events: []*pb.StreamTagsEvent{},
+					})
+				}, taggerStreamSendTimeout)
+
+				if err != nil {
+					log.Warnf("error sending tagger keep-alive: %s", err)
+					s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc()
+					timeoutRefreshError <- err
+					return
+				}
+			}
+		}
+	}()
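
As a follow-up to the review comment above, a minimal sketch of how the keep-alive logic could be isolated in a helper — the name startKeepAlive, its signature, and the buffered error channel are assumptions, not part of this PR:

```go
// startKeepAlive periodically sends an empty response so the remote tagger
// client's inactivity timeout does not fire, and reports the first send
// failure on the returned channel. The buffer of 1 lets the goroutine exit
// even if the caller has already stopped reading.
func (s *Server) startKeepAlive(out pb.AgentSecure_TaggerStreamEntitiesServer, interval time.Duration) <-chan error {
	errCh := make(chan error, 1)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-out.Context().Done():
				return
			case <-ticker.C:
				err := grpc.DoWithTimeout(func() error {
					return out.Send(&pb.StreamTagsResponse{Events: []*pb.StreamTagsEvent{}})
				}, taggerStreamSendTimeout)
				if err != nil {
					log.Warnf("error sending tagger keep-alive: %s", err)
					s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc()
					errCh <- err
					return
				}
			}
		}
	}()
	return errCh
}
```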

 	filterBuilder := types.NewFilterBuilder()
 	for _, prefix := range in.GetPrefixes() {
 		filterBuilder = filterBuilder.Include(types.EntityIDPrefix(prefix))
@@ -62,23 +100,27 @@

 	streamingID := in.GetStreamingID()
 	if streamingID == "" {
+		// This is done to preserve backward compatibility.
+		// If the CLC runner is using an old version, the streaming ID will be
+		// an empty string, and the server needs to auto-assign a unique ID.
 		streamingID = uuid.New().String()
 	}

 	subscriptionID := fmt.Sprintf("streaming-client-%s", streamingID)

+	// initBurst is a flag indicating whether the initial sync is still in progress:
+	// true means the sync has not yet been finalized;
+	// false means the streaming client has already caught up with the server.
+	initBurst := true

Contributor: Could you add a comment to explain what initBurst is?


log.Debugf("requesting token from server throttler for streaming id: %q", streamingID)
tk := s.throttler.RequestToken()
defer s.throttler.Release(tk)

 	subscription, err := s.taggerComponent.Subscribe(subscriptionID, filter)
+	log.Debugf("cluster tagger has just initiated subscription for %q at time %v", subscriptionID, time.Now().Unix())
 	if err != nil {
 		log.Errorf("Failed to subscribe to tagger for subscription %q", subscriptionID)
 		return err
 	}

 	defer subscription.Unsubscribe()

-	ticker := time.NewTicker(streamKeepAliveInterval)
-	defer ticker.Stop()

 	sendFunc := func(chunk []*pb.StreamTagsEvent) error {
 		return grpc.DoWithTimeout(func() error {
 			return out.Send(&pb.StreamTagsResponse{
@@ -114,29 +156,17 @@ func (s *Server) TaggerStreamEntities(in *pb.StreamTagsRequest, out pb.AgentSecu
 				return err
 			}

+			if initBurst {
+				initBurst = false
+				s.throttler.Release(tk)
+				log.Infof("cluster tagger has just finished initialization for subscription %q at time %v", subscriptionID, time.Now().Unix())
+			}
+
 		case <-out.Context().Done():
 			return nil

-		// The remote tagger client has a timeout that closes the
-		// connection after 10 minutes of inactivity (implemented in
-		// comp/core/tagger/remote/tagger.go) In order to avoid closing the
-		// connection and having to open it again, the server will send
-		// an empty message after 9 minutes of inactivity. The goal is
-		// only to keep the connection alive without losing the
-		// protection against “half” closed connections brought by the
-		// timeout.
-		case <-ticker.C:
-			err = grpc.DoWithTimeout(func() error {
-				return out.Send(&pb.StreamTagsResponse{
-					Events: []*pb.StreamTagsEvent{},
-				})
-			}, taggerStreamSendTimeout)
-
-			if err != nil {
-				log.Warnf("error sending tagger keep-alive: %s", err)
-				s.taggerComponent.GetTaggerTelemetryStore().ServerStreamErrors.Inc()
-				return err
-			}
+		case err = <-timeoutRefreshError:
+			return err
 		}
 	}
 }
62 changes: 62 additions & 0 deletions comp/core/tagger/server/syncthrottler.go
@@ -0,0 +1,62 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package server

import (
	"sync"

	"github.com/google/uuid"
)

type token string

// Throttler provides tokens with throttling logic that limits the number of active tokens at the same time.
// When a component is done with a token, it should release it by calling the Release method.
type Throttler interface {
	// RequestToken returns a token, blocking until one is available
	RequestToken() token
	// Release returns the token back to the throttler.
	// This method is idempotent (i.e. invoking it on the same token multiple times has the same effect as invoking it once).
	Release(t token)
}

// limiter implements the Throttler interface
type limiter struct {
	mutex          sync.RWMutex
	tokensChan     chan struct{} // buffered channel acting as a counting semaphore
	activeRequests map[token]struct{}
}

// NewSyncThrottler creates and returns a new Throttler that allows at most
// maxConcurrentSync tokens to be active at the same time
func NewSyncThrottler(maxConcurrentSync uint32) Throttler {
	return &limiter{
		mutex:          sync.RWMutex{},
		tokensChan:     make(chan struct{}, maxConcurrentSync),
		activeRequests: make(map[token]struct{}),
	}
}

// RequestToken implements Throttler#RequestToken
func (l *limiter) RequestToken() token {
	tk := token(uuid.New().String())
	// this send blocks while the channel buffer is full, i.e. while the
	// maximum number of concurrent syncs is already active
	l.tokensChan <- struct{}{}

	l.mutex.Lock()
	defer l.mutex.Unlock()

	l.activeRequests[tk] = struct{}{}
	return tk
}

// Release implements Throttler#Release
func (l *limiter) Release(t token) {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	if _, found := l.activeRequests[t]; found {
		<-l.tokensChan
		delete(l.activeRequests, t)
	}
}
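
The channel-as-semaphore plus token map is what makes early release safe: the server above releases its token as soon as the initial burst is done and still keeps a deferred Release as a safety net. A minimal standalone sketch of that lifecycle (handleSync is a hypothetical caller, not part of this PR):

```go
func handleSync(th Throttler) {
	tk := th.RequestToken() // blocks if the maximum number of syncs is already in flight
	defer th.Release(tk)    // safety net; idempotent, so a double release is a no-op

	// ... perform the expensive initial sync burst ...

	th.Release(tk) // free the slot early; subsequent streaming is not throttled
	// ... keep streaming without holding a token ...
}
```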
32 changes: 32 additions & 0 deletions comp/core/tagger/server/syncthrottler_test.go
@@ -0,0 +1,32 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package server

import (
	"sync"
	"testing"
	"time"
)

func TestSyncThrottler(_ *testing.T) {
	throttler := NewSyncThrottler(3)

	var wg sync.WaitGroup

	for i := 0; i < 30; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t := throttler.RequestToken()
			time.Sleep(200 * time.Millisecond)
			throttler.Release(t)
			throttler.Release(t) // Release should be idempotent
		}()
	}

	wg.Wait()
}
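
The test above exercises the idempotent-release path but makes no assertions. A variant that actually verifies the concurrency bound could track in-flight tokens with atomic counters — a sketch under the assumption that sync/atomic is added to the imports, not part of this PR:

```go
func TestSyncThrottlerBound(t *testing.T) {
	const maxConcurrent = 3
	throttler := NewSyncThrottler(maxConcurrent)

	var inFlight, maxSeen atomic.Int32
	var wg sync.WaitGroup

	for i := 0; i < 30; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			tk := throttler.RequestToken()
			n := inFlight.Add(1)
			// record the highest concurrency level observed
			for {
				cur := maxSeen.Load()
				if n <= cur || maxSeen.CompareAndSwap(cur, n) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond)
			inFlight.Add(-1)
			throttler.Release(tk)
		}()
	}

	wg.Wait()
	if got := maxSeen.Load(); got > maxConcurrent {
		t.Errorf("observed %d concurrent syncs, want at most %d", got, maxConcurrent)
	}
}
```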
3 changes: 3 additions & 0 deletions pkg/config/setup/config.go
@@ -730,6 +730,9 @@ func InitConfig(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("clc_runner_server_readheader_timeout", 10)
config.BindEnvAndSetDefault("clc_runner_remote_tagger_enabled", false)

// Remote tagger
config.BindEnvAndSetDefault("remote_tagger.max_concurrent_sync", 3)

// Admission controller
config.BindEnvAndSetDefault("admission_controller.enabled", false)
config.BindEnvAndSetDefault("admission_controller.validation.enabled", true)