From 644a711b5f8ff109b49509673d6f7112642f4c28 Mon Sep 17 00:00:00 2001 From: Michel Laterman <82832767+michel-laterman@users.noreply.github.com> Date: Tue, 3 Oct 2023 13:19:36 -0700 Subject: [PATCH] Add ActionLimit and a gzip writer pool to handle checkin responses (#2994) Add a sync.Pool to share gzip writers among all requests in order to avoid new gzip.Writer allocations. Each writer allocation is around 1.2MB and is a limiting factor of hitting larger scales. Add action dispatch limiter to control how quickly the checkin endpoint writes responses in order to make better use of the writer pool. --------- Co-authored-by: Nicolas Chaulet --- ...66-Add-action-dispatcher-rate-limiter.yaml | 35 ++++++ fleet-server.reference.yml | 9 ++ internal/pkg/action/dispatcher.go | 19 +++- internal/pkg/action/dispatcher_test.go | 62 ++++++++++- internal/pkg/api/handleCheckin.go | 25 ++++- internal/pkg/api/handleCheckin_test.go | 100 ++++++++++++++++++ internal/pkg/config/env_defaults.go | 8 ++ internal/pkg/config/limits.go | 2 + internal/pkg/config/timeouts.go | 2 +- internal/pkg/server/fleet.go | 2 +- 10 files changed, 248 insertions(+), 16 deletions(-) create mode 100644 changelog/fragments/1696358866-Add-action-dispatcher-rate-limiter.yaml diff --git a/changelog/fragments/1696358866-Add-action-dispatcher-rate-limiter.yaml b/changelog/fragments/1696358866-Add-action-dispatcher-rate-limiter.yaml new file mode 100644 index 000000000..04c4c74f3 --- /dev/null +++ b/changelog/fragments/1696358866-Add-action-dispatcher-rate-limiter.yaml @@ -0,0 +1,35 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security 
of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Add action-dispatcher rate limiter + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + Add a sync.Pool to share gzip writers among all requests in order to avoid new gzip.Writer allocations. + Each writer allocation is around 1.2MB and is a limiting factor of hitting larger scales. + Add a rate limiter to the action-dispatcher used by checkins (disabled by default) to control how responses are written. + +# Affected component; a word indicating the component this changeset affects. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: 2994 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. 
+#issue: https://github.com/owner/repo/1234 diff --git a/fleet-server.reference.yml b/fleet-server.reference.yml index 338f163fc..67af0bfea 100644 --- a/fleet-server.reference.yml +++ b/fleet-server.reference.yml @@ -146,6 +146,15 @@ fleet: # # max_connections is the maximum number of connnections per API endpoint # max_connections: 0 # +# # action_limit is a limiter for the action dispatcher, it is added to control how fast the checkin endpoint writes responses when an action affecting multiple agents is detected. +# # This is done in order to be able to reuse gzip writers if gzip is requested as allocating new writers is expensive (around 1.2MB for a new allocation). +# # If the interval is too high it may negatively affect assumptions around server write timeouts and poll durations, if used we expect the value to be around 5ms. +# # An interval value of 0 disables the limiter by using an infinite rate limit (default behavior). +# # Burst controls concurrency and has a larger influence on the gzip writer pool size (default 5). +# action_limit: +# interval: 0 +# burst: 5 +# # # endpoint specific limits below # checkin_limit: # interval: 1ms diff --git a/internal/pkg/action/dispatcher.go b/internal/pkg/action/dispatcher.go index bfd6e1df1..59fb51a3c 100644 --- a/internal/pkg/action/dispatcher.go +++ b/internal/pkg/action/dispatcher.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" + "golang.org/x/time/rate" "github.com/rs/zerolog/log" ) @@ -33,17 +34,23 @@ func (s Sub) Ch() chan []model.Action { // Dispatcher tracks agent subscriptions and emits actions to the subscriptions. type Dispatcher struct { - am monitor.SimpleMonitor + am monitor.SimpleMonitor + limit *rate.Limiter mx sync.RWMutex subs map[string]Sub } // NewDispatcher creates a Dispatcher using the provided monitor. 
-func NewDispatcher(am monitor.SimpleMonitor) *Dispatcher { +func NewDispatcher(am monitor.SimpleMonitor, throttle time.Duration, i int) *Dispatcher { + r := rate.Inf + if throttle > 0 { + r = rate.Every(throttle) + } return &Dispatcher{ - am: am, - subs: make(map[string]Sub), + am: am, + limit: rate.NewLimiter(r, i), + subs: make(map[string]Sub), } } @@ -122,6 +129,10 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) { } for agentID, actions := range agentActions { + if err := d.limit.Wait(ctx); err != nil { + log.Error().Err(err).Msg("action dispatcher rate limit error") + return + } d.dispatch(ctx, agentID, actions) } } diff --git a/internal/pkg/action/dispatcher_test.go b/internal/pkg/action/dispatcher_test.go index f5eedace7..6fd152eb2 100644 --- a/internal/pkg/action/dispatcher_test.go +++ b/internal/pkg/action/dispatcher_test.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/sqn" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "golang.org/x/time/rate" ) type mockMonitor struct { @@ -38,7 +39,7 @@ func (m *mockMonitor) GetCheckpoint() sqn.SeqNo { func TestNewDispatcher(t *testing.T) { m := &mockMonitor{} - d := NewDispatcher(m) + d := NewDispatcher(m, 0, 0) assert.NotNil(t, d.am) assert.NotNil(t, d.subs) @@ -67,9 +68,10 @@ func compareActions(t *testing.T, expects, results []model.Action) { func Test_Dispatcher_Run(t *testing.T) { tests := []struct { - name string - getMock func() *mockMonitor - expect map[string][]model.Action + name string + throttle time.Duration + getMock func() *mockMonitor + expect map[string][]model.Action }{{ name: "one agent action", getMock: func() *mockMonitor { @@ -126,6 +128,41 @@ func Test_Dispatcher_Run(t *testing.T) { Type: "upgrade", }}, }, + }, { + name: "three agent action with limiter", + throttle: 1 * time.Second, + getMock: func() *mockMonitor { + m := &mockMonitor{} + ch := make(chan []es.HitT) + go func() { + ch <- []es.HitT{es.HitT{ + Source: 
json.RawMessage(`{"action_id":"test-action","agents":["agent1","agent2","agent3"],"data":{"key":"value"},"type":"upgrade"}`), + }} + }() + var rch <-chan []es.HitT = ch + m.On("Output").Return(rch) + return m + }, + expect: map[string][]model.Action{ + "agent1": []model.Action{model.Action{ + ActionID: "test-action", + Agents: nil, + Data: json.RawMessage(`{"key":"value"}`), + Type: "upgrade", + }}, + "agent2": []model.Action{model.Action{ + ActionID: "test-action", + Agents: nil, + Data: json.RawMessage(`{"key":"value"}`), + Type: "upgrade", + }}, + "agent3": []model.Action{model.Action{ + ActionID: "test-action", + Agents: nil, + Data: json.RawMessage(`{"key":"value"}`), + Type: "upgrade", + }}, + }, }, { name: "one agent action with scheduling", getMock: func() *mockMonitor { @@ -198,8 +235,13 @@ func Test_Dispatcher_Run(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { m := tt.getMock() + throttle := rate.Inf + if tt.throttle > 0 { + throttle = rate.Every(tt.throttle) + } d := &Dispatcher{ - am: m, + am: m, + limit: rate.NewLimiter(throttle, 1), subs: map[string]Sub{ "agent1": Sub{ agentID: "agent1", @@ -216,6 +258,7 @@ func Test_Dispatcher_Run(t *testing.T) { }, } + now := time.Now() ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { @@ -224,9 +267,11 @@ func Test_Dispatcher_Run(t *testing.T) { }() ticker := time.NewTicker(time.Second * 5) + select { case actions := <-d.subs["agent1"].Ch(): compareActions(t, tt.expect["agent1"], actions) + // NOTE: agent1 is not rate limited if the action limiter is enabled for these tests. 
case <-ticker.C: t.Fatal("timeout waiting for subscription on agent1") } @@ -236,6 +281,9 @@ func Test_Dispatcher_Run(t *testing.T) { select { case actions := <-d.subs["agent2"].Ch(): compareActions(t, expect, actions) + if tt.throttle != 0 { + assert.GreaterOrEqual(t, time.Now(), now.Add(1*tt.throttle)) + } case <-ticker.C: t.Fatal("timeout waiting for subscription on agent2") } @@ -246,10 +294,14 @@ func Test_Dispatcher_Run(t *testing.T) { select { case actions := <-d.subs["agent3"].Ch(): compareActions(t, expect, actions) + if tt.throttle != 0 { + assert.GreaterOrEqual(t, time.Now(), now.Add(2*tt.throttle)) + } case <-ticker.C: t.Fatal("timeout waiting for subscription on agent3") } } + }) } } diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index 7f3f666d8..fb72fe381 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -12,9 +12,11 @@ import ( "encoding/json" "errors" "fmt" + "io" "math/rand" "net/http" "reflect" + "sync" "time" "github.com/elastic/fleet-server/v7/internal/pkg/action" @@ -60,6 +62,11 @@ type CheckinT struct { gcp monitor.GlobalCheckpointProvider ad *action.Dispatcher tr *action.TokenResolver + + // gwPool is a gzip.Writer pool intended to lower the amount of writers created when responding to checkin requests. + // gzip.Writer allocations are expensive (~1.2MB each) and can exhaust an instance's memory if a lot of concurrent responses are sent (this occurs when a mass-action such as an upgrade is detected). + // effectiveness of the pool is controlled by rate limiter configured through the limit.action_limit attribute. 
+ gwPool sync.Pool bulker bulk.Bulk } @@ -83,6 +90,15 @@ func NewCheckinT( gcp: gcp, ad: ad, tr: tr, + gwPool: sync.Pool{ + New: func() any { + zipper, err := gzip.NewWriterLevel(io.Discard, cfg.CompressionLevel) + if err != nil { + panic(err) + } + return zipper + }, + }, bulker: bulker, } @@ -498,13 +514,12 @@ func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r if len(payload) > compressThreshold && compressionLevel != flate.NoCompression && acceptsEncoding(r, kEncodingGzip) { wrCounter := datacounter.NewWriterCounter(w) - zipper, err := gzip.NewWriterLevel(wrCounter, compressionLevel) - if err != nil { - return fmt.Errorf("writeResponse new gzip: %w", err) - } + zipper, _ := ct.gwPool.Get().(*gzip.Writer) - w.Header().Set("Content-Encoding", kEncodingGzip) + defer ct.gwPool.Put(zipper) + zipper.Reset(wrCounter) + w.Header().Set("Content-Encoding", kEncodingGzip) if _, err = zipper.Write(payload); err != nil { return fmt.Errorf("writeResponse gzip write: %w", err) } diff --git a/internal/pkg/api/handleCheckin_test.go b/internal/pkg/api/handleCheckin_test.go index 102d9c82d..50dd09a1d 100644 --- a/internal/pkg/api/handleCheckin_test.go +++ b/internal/pkg/api/handleCheckin_test.go @@ -7,8 +7,11 @@ package api import ( + "compress/flate" "context" "encoding/json" + "net/http" + "net/http/httptest" "testing" "time" @@ -27,6 +30,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) func TestConvertActions(t *testing.T) { @@ -478,3 +482,99 @@ func TestProcessUpgradeDetails(t *testing.T) { }) } } + +func Test_CheckinT_writeResponse(t *testing.T) { + tests := []struct { + name string + req *http.Request + respHeader string + }{{ + name: "no compression", + req: &http.Request{}, + respHeader: "", + }, { + name: "with compression", + req: &http.Request{ + Header: http.Header{ + "Accept-Encoding": []string{"gzip"}, + }, + }, + respHeader: "gzip", + }} + + verCon := 
mustBuildConstraints("8.0.0") + cfg := &config.Server{ + CompressionLevel: flate.BestSpeed, + CompressionThresh: 1, + } + + ct := NewCheckinT(verCon, cfg, nil, nil, nil, nil, nil, nil, ftesting.NewMockBulk()) + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + wr := httptest.NewRecorder() + err := ct.writeResponse(testlog.SetLogger(t), wr, test.req, &model.Agent{}, CheckinResponse{ + Action: "checkin", + }) + resp := wr.Result() + defer resp.Body.Close() + require.NoError(t, err) + assert.Equal(t, test.respHeader, resp.Header.Get("Content-Encoding")) + }) + } +} + +func Benchmark_CheckinT_writeResponse(b *testing.B) { + verCon := mustBuildConstraints("8.0.0") + cfg := &config.Server{ + CompressionLevel: flate.BestSpeed, + CompressionThresh: 1, + } + ct := NewCheckinT(verCon, cfg, nil, nil, nil, nil, nil, nil, ftesting.NewMockBulk()) + + logger := testlog.SetLogger(b) + req := &http.Request{ + Header: http.Header{ + "Accept-Encoding": []string{"gzip"}, + }, + } + agent := &model.Agent{} + resp := CheckinResponse{ + Action: "checkin", + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := ct.writeResponse(logger, httptest.NewRecorder(), req, agent, resp) + require.NoError(b, err) + } +} + +func BenchmarkParallel_CheckinT_writeResponse(b *testing.B) { + verCon := mustBuildConstraints("8.0.0") + cfg := &config.Server{ + CompressionLevel: flate.BestSpeed, + CompressionThresh: 1, + } + ct := NewCheckinT(verCon, cfg, nil, nil, nil, nil, nil, nil, ftesting.NewMockBulk()) + + logger := testlog.SetLogger(b) + req := &http.Request{ + Header: http.Header{ + "Accept-Encoding": []string{"gzip"}, + }, + } + agent := &model.Agent{} + resp := CheckinResponse{ + Action: "checkin", + } + + b.ResetTimer() + b.SetParallelism(100) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + err := ct.writeResponse(logger, httptest.NewRecorder(), req, agent, resp) + require.NoError(b, err) + } + }) +} diff --git a/internal/pkg/config/env_defaults.go 
b/internal/pkg/config/env_defaults.go index 99e43dda0..b93be0219 100644 --- a/internal/pkg/config/env_defaults.go +++ b/internal/pkg/config/env_defaults.go @@ -26,6 +26,9 @@ const ( defaultMaxConnections = 0 // no limit defaultPolicyThrottle = time.Millisecond * 5 + defaultActionInterval = 0 // no throttle + defaultActionBurst = 5 + defaultCheckinInterval = time.Millisecond defaultCheckinBurst = 1000 defaultCheckinMax = 0 @@ -122,6 +125,7 @@ type serverLimitDefaults struct { PolicyThrottle time.Duration `config:"policy_throttle"` MaxConnections int `config:"max_connections"` + ActionLimit limit `config:"action_limit"` CheckinLimit limit `config:"checkin_limit"` ArtifactLimit limit `config:"artifact_limit"` EnrollLimit limit `config:"enroll_limit"` @@ -139,6 +143,10 @@ func defaultserverLimitDefaults() *serverLimitDefaults { PolicyThrottle: defaultPolicyThrottle, MaxConnections: defaultMaxConnections, + ActionLimit: limit{ + Interval: defaultActionInterval, + Burst: defaultActionBurst, + }, CheckinLimit: limit{ Interval: defaultCheckinInterval, Burst: defaultCheckinBurst, diff --git a/internal/pkg/config/limits.go b/internal/pkg/config/limits.go index 21c36b6bf..70c7a089b 100644 --- a/internal/pkg/config/limits.go +++ b/internal/pkg/config/limits.go @@ -21,6 +21,7 @@ type ServerLimits struct { MaxHeaderByteSize int `config:"max_header_byte_size"` MaxConnections int `config:"max_connections"` + ActionLimit Limit `config:"action_limit"` CheckinLimit Limit `config:"checkin_limit"` ArtifactLimit Limit `config:"artifact_limit"` EnrollLimit Limit `config:"enroll_limit"` @@ -51,6 +52,7 @@ func (c *ServerLimits) LoadLimits(limits *envLimits) { c.PolicyThrottle = l.PolicyThrottle } + c.ActionLimit = mergeEnvLimit(c.ActionLimit, l.ActionLimit) c.CheckinLimit = mergeEnvLimit(c.CheckinLimit, l.CheckinLimit) c.ArtifactLimit = mergeEnvLimit(c.ArtifactLimit, l.ArtifactLimit) c.EnrollLimit = mergeEnvLimit(c.EnrollLimit, l.EnrollLimit) diff --git a/internal/pkg/config/timeouts.go 
b/internal/pkg/config/timeouts.go index ca0cc84a9..faf5860c9 100644 --- a/internal/pkg/config/timeouts.go +++ b/internal/pkg/config/timeouts.go @@ -57,7 +57,7 @@ func (c *ServerTimeouts) InitDefaults() { // Long poll timeout, will be short-circuited on policy change c.CheckinLongPoll = 5 * time.Minute - // Jitter subtracted from c.CheckinLongPoll. Disabled if zero. + // Jitter subtracted from c.CheckinLongPoll. Disabled if zero. c.CheckinJitter = 30 * time.Second // MaxPoll is the maximum allowed value for a long poll when the client specified poll_timeout value is used. diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go index 0d6a26a42..fa489d8ee 100644 --- a/internal/pkg/server/fleet.go +++ b/internal/pkg/server/fleet.go @@ -505,7 +505,7 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro } g.Go(loggedRunFunc(ctx, "Revision monitor", am.Run)) - ad = action.NewDispatcher(am) + ad = action.NewDispatcher(am, cfg.Inputs[0].Server.Limits.ActionLimit.Interval, cfg.Inputs[0].Server.Limits.ActionLimit.Burst) g.Go(loggedRunFunc(ctx, "Revision dispatcher", ad.Run)) tr, err = action.NewTokenResolver(bulker) if err != nil {