Skip to content

Commit

Permalink
Add ActionLimit and a gzip writer pool to handle checkin responses (#…
Browse files Browse the repository at this point in the history
…2994)

Add a sync.Pool to share gzip writers among all resquests in order
to avoid new gzip.Writer allocations. Each writer allocation is around
1.2MB and is a limiting factor of hitting larger scales. Add action
dispatch limiter to control how quickly the checkin endpoint writes
responses in order to make better use of the writer pool.
---------

Co-authored-by: Nicolas Chaulet <[email protected]>
  • Loading branch information
michel-laterman and nchaulet authored Oct 3, 2023
1 parent 8a59a1e commit 644a711
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Add action-dispatcher rate limiter

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Add a sync.Pool to share gzip writers among all resquests in order to avoid new gzip.Writer allocations.
Each writer allocation is around 1.2MB and is a limiting factor of hitting larger scales.
Add a rate limiter to the action-dispatcher used by checkins (disabled by default) to control how responses are written.
# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 2994

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
9 changes: 9 additions & 0 deletions fleet-server.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ fleet:
# # max_connections is the maximum number of connnections per API endpoint
# max_connections: 0
#
# # action_limit is a limiter for the action dispatcher, it is added to control how fast the checkin endpoint writes responses when an action effecting multiple agents is detected.
# # This is done in order to be able to reuse gzip writers if gzip is requested as allocating new writers is expensive (around 1.2MB for a new allocation).
# # If the interval is too high it may negativly effect assumtions around server write timeouts and poll poll durations, if used we expect the value to be around 5ms.
# # An interval value of 0 disables the limiter by using an infinite rate limit (default behavior).
# # Burst controls concurrency and has a larger influence on the gzip writer pool size (default 5).
# action_limit:
# interval: 0
# burst: 5
#
# # endpoint specific limits below
# checkin_limit:
# interval: 1ms
Expand Down
19 changes: 15 additions & 4 deletions internal/pkg/action/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
"golang.org/x/time/rate"

"github.com/rs/zerolog/log"
)
Expand All @@ -33,17 +34,23 @@ func (s Sub) Ch() chan []model.Action {

// Dispatcher tracks agent subscriptions and emits actions to the subscriptions.
type Dispatcher struct {
am monitor.SimpleMonitor
am monitor.SimpleMonitor
limit *rate.Limiter

mx sync.RWMutex
subs map[string]Sub
}

// NewDispatcher creates a Dispatcher using the provided monitor.
func NewDispatcher(am monitor.SimpleMonitor) *Dispatcher {
func NewDispatcher(am monitor.SimpleMonitor, throttle time.Duration, i int) *Dispatcher {
r := rate.Inf
if throttle > 0 {
r = rate.Every(throttle)
}
return &Dispatcher{
am: am,
subs: make(map[string]Sub),
am: am,
limit: rate.NewLimiter(r, i),
subs: make(map[string]Sub),
}
}

Expand Down Expand Up @@ -122,6 +129,10 @@ func (d *Dispatcher) process(ctx context.Context, hits []es.HitT) {
}

for agentID, actions := range agentActions {
if err := d.limit.Wait(ctx); err != nil {
log.Error().Err(err).Msg("action dispatcher rate limit error")
return
}
d.dispatch(ctx, agentID, actions)
}
}
Expand Down
62 changes: 57 additions & 5 deletions internal/pkg/action/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/sqn"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"
)

type mockMonitor struct {
Expand All @@ -38,7 +39,7 @@ func (m *mockMonitor) GetCheckpoint() sqn.SeqNo {

func TestNewDispatcher(t *testing.T) {
m := &mockMonitor{}
d := NewDispatcher(m)
d := NewDispatcher(m, 0, 0)

assert.NotNil(t, d.am)
assert.NotNil(t, d.subs)
Expand Down Expand Up @@ -67,9 +68,10 @@ func compareActions(t *testing.T, expects, results []model.Action) {

func Test_Dispatcher_Run(t *testing.T) {
tests := []struct {
name string
getMock func() *mockMonitor
expect map[string][]model.Action
name string
throttle time.Duration
getMock func() *mockMonitor
expect map[string][]model.Action
}{{
name: "one agent action",
getMock: func() *mockMonitor {
Expand Down Expand Up @@ -126,6 +128,41 @@ func Test_Dispatcher_Run(t *testing.T) {
Type: "upgrade",
}},
},
}, {
name: "three agent action with limiter",
throttle: 1 * time.Second,
getMock: func() *mockMonitor {
m := &mockMonitor{}
ch := make(chan []es.HitT)
go func() {
ch <- []es.HitT{es.HitT{
Source: json.RawMessage(`{"action_id":"test-action","agents":["agent1","agent2","agent3"],"data":{"key":"value"},"type":"upgrade"}`),
}}
}()
var rch <-chan []es.HitT = ch
m.On("Output").Return(rch)
return m
},
expect: map[string][]model.Action{
"agent1": []model.Action{model.Action{
ActionID: "test-action",
Agents: nil,
Data: json.RawMessage(`{"key":"value"}`),
Type: "upgrade",
}},
"agent2": []model.Action{model.Action{
ActionID: "test-action",
Agents: nil,
Data: json.RawMessage(`{"key":"value"}`),
Type: "upgrade",
}},
"agent3": []model.Action{model.Action{
ActionID: "test-action",
Agents: nil,
Data: json.RawMessage(`{"key":"value"}`),
Type: "upgrade",
}},
},
}, {
name: "one agent action with scheduling",
getMock: func() *mockMonitor {
Expand Down Expand Up @@ -198,8 +235,13 @@ func Test_Dispatcher_Run(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := tt.getMock()
throttle := rate.Inf
if tt.throttle > 0 {
throttle = rate.Every(tt.throttle)
}
d := &Dispatcher{
am: m,
am: m,
limit: rate.NewLimiter(throttle, 1),
subs: map[string]Sub{
"agent1": Sub{
agentID: "agent1",
Expand All @@ -216,6 +258,7 @@ func Test_Dispatcher_Run(t *testing.T) {
},
}

now := time.Now()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
Expand All @@ -224,9 +267,11 @@ func Test_Dispatcher_Run(t *testing.T) {
}()

ticker := time.NewTicker(time.Second * 5)

select {
case actions := <-d.subs["agent1"].Ch():
compareActions(t, tt.expect["agent1"], actions)
// NOTE: agent1 is not rate limited if the action limmiter is enabled for these tests.
case <-ticker.C:
t.Fatal("timeout waiting for subscription on agent1")
}
Expand All @@ -236,6 +281,9 @@ func Test_Dispatcher_Run(t *testing.T) {
select {
case actions := <-d.subs["agent2"].Ch():
compareActions(t, expect, actions)
if tt.throttle != 0 {
assert.GreaterOrEqual(t, time.Now(), now.Add(1*tt.throttle))
}
case <-ticker.C:
t.Fatal("timeout waiting for subscription on agent2")
}
Expand All @@ -246,10 +294,14 @@ func Test_Dispatcher_Run(t *testing.T) {
select {
case actions := <-d.subs["agent3"].Ch():
compareActions(t, expect, actions)
if tt.throttle != 0 {
assert.GreaterOrEqual(t, time.Now(), now.Add(2*tt.throttle))
}
case <-ticker.C:
t.Fatal("timeout waiting for subscription on agent3")
}
}

})
}
}
Expand Down
25 changes: 20 additions & 5 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"reflect"
"sync"
"time"

"github.com/elastic/fleet-server/v7/internal/pkg/action"
Expand Down Expand Up @@ -60,6 +62,11 @@ type CheckinT struct {
gcp monitor.GlobalCheckpointProvider
ad *action.Dispatcher
tr *action.TokenResolver

// gwPool is a gzip.Writer pool intended to lower the amount of writers created when responding to checkin requests.
// gzip.Writer allocations are expensive (~1.2MB each) and can exhaust an instance's memory if a lot of concurrent responses are sent (this occurs when a mass-action such as an upgrade is detected).
// effectiveness of the pool is controlled by rate limiter configured through the limit.action_limit attribute.
gwPool sync.Pool
bulker bulk.Bulk
}

Expand All @@ -83,6 +90,15 @@ func NewCheckinT(
gcp: gcp,
ad: ad,
tr: tr,
gwPool: sync.Pool{
New: func() any {
zipper, err := gzip.NewWriterLevel(io.Discard, cfg.CompressionLevel)
if err != nil {
panic(err)
}
return zipper
},
},
bulker: bulker,
}

Expand Down Expand Up @@ -498,13 +514,12 @@ func (ct *CheckinT) writeResponse(zlog zerolog.Logger, w http.ResponseWriter, r
if len(payload) > compressThreshold && compressionLevel != flate.NoCompression && acceptsEncoding(r, kEncodingGzip) {
wrCounter := datacounter.NewWriterCounter(w)

zipper, err := gzip.NewWriterLevel(wrCounter, compressionLevel)
if err != nil {
return fmt.Errorf("writeResponse new gzip: %w", err)
}
zipper, _ := ct.gwPool.Get().(*gzip.Writer)

w.Header().Set("Content-Encoding", kEncodingGzip)
defer ct.gwPool.Put(zipper)
zipper.Reset(wrCounter)

w.Header().Set("Content-Encoding", kEncodingGzip)
if _, err = zipper.Write(payload); err != nil {
return fmt.Errorf("writeResponse gzip write: %w", err)
}
Expand Down
Loading

0 comments on commit 644a711

Please sign in to comment.