Skip to content

Commit

Permalink
Use a cache instead of sync.Pool in script processor (#19562) (#19693)
Browse files Browse the repository at this point in the history
This updates the script processor to keep a simple cache of Javascript VM
sessions instead of relying on sync.Pool for this task. The size of this
cache can be controlled by the new `max_cached_sessions` option.

The reasoning behind this change is to avoid sync.Pool discarding and
re-allocating new sessions every garbage collection cycle. For large
Javascript pipelines, allocating a new session is a very expensive
operation that can take hundreds of milliseconds or even seconds to
complete. This has a severe impact on ingestion rates.

(cherry picked from commit 7930f9e)
  • Loading branch information
adriansr authored Jul 14, 2020
1 parent b80c7ef commit 19b1aec
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Add the `ignore_failure` configuration option to the dissect processor. {pull}19464[19464]
- Add the `overwrite_keys` configuration option to the dissect processor. {pull}19464[19464]
- Add support to trim captured values in the dissect processor. {pull}19464[19464]
- Add the `max_cached_sessions` option to the script processor. {pull}19562[19562]

*Auditbeat*

Expand Down
3 changes: 3 additions & 0 deletions libbeat/processors/script/docs/script.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ is interrupted. You can set this option to prevent a script from running for
too long (like preventing an infinite `while` loop). By default there is no
timeout.

`max_cached_sessions`:: This sets the maximum number of Javascript VM sessions
that will be cached to avoid reallocation. The default is `4`.

[float]
==== Event API

Expand Down
18 changes: 10 additions & 8 deletions libbeat/processors/script/javascript/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import (

// Config defines the Javascript source files to use for the processor.
type Config struct {
Tag string `config:"tag"` // Processor ID for debug and metrics.
Source string `config:"source"` // Inline script to execute.
File string `config:"file"` // Source file.
Files []string `config:"files"` // Multiple source files.
Params map[string]interface{} `config:"params"` // Parameters to pass to script.
Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout.
TagOnException string `config:"tag_on_exception"` // Tag to add to events when an exception happens.
Tag string `config:"tag"` // Processor ID for debug and metrics.
Source string `config:"source"` // Inline script to execute.
File string `config:"file"` // Source file.
Files []string `config:"files"` // Multiple source files.
Params map[string]interface{} `config:"params"` // Parameters to pass to script.
Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout.
TagOnException string `config:"tag_on_exception"` // Tag to add to events when an exception happens.
MaxCachedSessions int `config:"max_cached_sessions" validate:"min=0"` // Max. number of cached VM sessions.
}

// Validate returns an error unless one (and only one) option is set.
Expand All @@ -57,6 +58,7 @@ func (c Config) Validate() error {

func defaultConfig() Config {
return Config{
TagOnException: "_js_exception",
TagOnException: "_js_exception",
MaxCachedSessions: 4,
}
}
40 changes: 28 additions & 12 deletions libbeat/processors/script/javascript/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package javascript

import (
"reflect"
"sync"
"time"

"github.com/dop251/goja"
Expand Down Expand Up @@ -83,17 +82,25 @@ type session struct {
}

func newSession(p *goja.Program, conf Config, test bool) (*session, error) {
// Create a logger
logger := logp.NewLogger(logName)
if conf.Tag != "" {
logger = logger.With("instance_id", conf.Tag)
}
// Measure load times
start := time.Now()
defer func() {
took := time.Now().Sub(start)
logger.Debugf("Load of javascript pipeline took %v", took)
}()
// Setup JS runtime.
s := &session{
vm: goja.New(),
log: logp.NewLogger(logName),
log: logger,
makeEvent: newBeatEventV0,
timeout: conf.Timeout,
tagOnException: conf.TagOnException,
}
if conf.Tag != "" {
s.log = s.log.With("instance_id", conf.Tag)
}

// Register modules.
for _, registerModule := range sessionHooks {
Expand Down Expand Up @@ -266,7 +273,8 @@ func init() {
}

type sessionPool struct {
pool *sync.Pool
New func() *session
C chan *session
}

func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) {
Expand All @@ -275,24 +283,32 @@ func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) {
return nil, err
}

pool := &sync.Pool{
New: func() interface{} {
pool := sessionPool{
New: func() *session {
s, _ := newSession(p, c, false)
return s
},
C: make(chan *session, c.MaxCachedSessions),
}
pool.Put(s)

return &sessionPool{pool}, nil
return &pool, nil
}

func (p *sessionPool) Get() *session {
s, _ := p.pool.Get().(*session)
return s
select {
case s := <-p.C:
return s
default:
return p.New()
}
}

func (p *sessionPool) Put(s *session) {
if s != nil {
p.pool.Put(s)
select {
case p.C <- s:
default:
}
}
}

0 comments on commit 19b1aec

Please sign in to comment.