From df3fcec296799b07ba1d292c8f63db036e1b4e6f Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Thu, 18 Nov 2021 09:27:51 +0100 Subject: [PATCH] [filebeat][s3] Add custom parsing script for S3 notifications (#28946) * Add custom parsing script for S3 notifications * Remove unnecessary custom jsmapstr type. It can be used as a regular JS map since its only purpose is to be read. * add docs and changelog entry * Remove commented code * Document script options restriction * Better error if Records are missing in notification * Fix test * Pass notification as string and add xml parsing options for the scripts --- CHANGELOG.next.asciidoc | 2 + .../docs/inputs/input-aws-s3.asciidoc | 255 ++++++++++++++ x-pack/filebeat/input/awss3/config.go | 31 ++ x-pack/filebeat/input/awss3/input.go | 6 +- .../input/awss3/input_benchmark_test.go | 2 +- x-pack/filebeat/input/awss3/script.go | 150 +++++++++ .../input/awss3/script_jss3event_v2.go | 69 ++++ .../input/awss3/script_jss3event_v2_test.go | 60 ++++ x-pack/filebeat/input/awss3/script_session.go | 217 ++++++++++++ .../input/awss3/script_session_test.go | 317 ++++++++++++++++++ x-pack/filebeat/input/awss3/sqs_s3_event.go | 16 +- .../filebeat/input/awss3/sqs_s3_event_test.go | 34 +- 12 files changed, 1148 insertions(+), 11 deletions(-) create mode 100644 x-pack/filebeat/input/awss3/script.go create mode 100644 x-pack/filebeat/input/awss3/script_jss3event_v2.go create mode 100644 x-pack/filebeat/input/awss3/script_jss3event_v2_test.go create mode 100644 x-pack/filebeat/input/awss3/script_session.go create mode 100644 x-pack/filebeat/input/awss3/script_session_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index dcd03b0ee70..bd473812131 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -336,6 +336,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add support for '/var/log/pods/' path for add_kubernetes_metadata processor with `resource_type: pod`. 
{pull}28868[28868] - Add documentation for add_kubernetes_metadata processors `log_path` matcher. {pull}28868[28868] - Add support in aws-s3 input for s3 notification from SNS to SQS. {pull}28800[28800] +- Add support in aws-s3 input for custom script parsing of s3 notifications. {pull}28946[28946] +- Improve error handling in aws-s3 input for malformed s3 notifications. {issue}28828[28828] {pull}28946[28946] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 696a7368e3f..ec7a16cd67b 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -282,6 +282,90 @@ attribute. The default value is 5. If you have configured a dead letter queue then you can set this value to `-1` to disable deletion on failure. +[float] +==== `sqs.notification_parsing_script.source` + +Inline Javascript source code. + +[source,yaml] +---- +sqs.notification_parsing_script.source: > + function parse(notification) { + var evts = []; + var evt = new S3EventV2(); + evt.SetS3BucketName(notification.bucket); + evt.SetS3ObjectKey(notification.path); + evts.push(evt); + return evts; + } +---- + +[float] +==== `sqs.notification_parsing_script.file` + +Path to a script file to load. Relative paths are interpreted as +relative to the `path.config` directory. Globs are expanded. + +This loads `filter.js` from disk. + +[source,yaml] +---- +sqs.notification_parsing_script.file: ${path.config}/filter.js +---- + +[float] +==== `sqs.notification_parsing_script.files` + +List of script files to load. The scripts are concatenated together. +Relative paths are interpreted as relative to the `path.config` directory. +And globs are expanded. + +[float] +==== `sqs.notification_parsing_script.params` + +A dictionary of parameters that are passed to the `register` of the +script. + +Parameters can be passed to the script by adding `params` to the config. 
+This allows for a script to be made reusable. When using `params` the +code must define a `register(params)` function to receive the parameters. + +[source,yaml] +---- +sqs.notification_parsing_script: + params: + provider: aws:s3 + source: > + var params = {provider: ""}; + function register(scriptParams) { + params = scriptParams; + } + function parse(notification) { + var evts = []; + var evt = new S3EventV2(); + evt.SetS3BucketName(notification.bucket); + evt.SetS3ObjectKey(notification.path); + evt.SetProvider(params.provider); + evts.push(evt); + return evts; + } +---- + +[float] +==== `sqs.notification_parsing_script.timeout` + +This sets an execution timeout for the `process` function. When +the `process` function takes longer than the `timeout` period the function +is interrupted. You can set this option to prevent a script from running for +too long (like preventing an infinite `while` loop). By default there is no +timeout. + +[float] +==== `sqs.notification_parsing_script.max_cached_sessions` + +This sets the maximum number of Javascript VM sessions +that will be cached to avoid reallocation. + [float] ==== `sqs.wait_time` @@ -426,6 +510,177 @@ Therefore, when using the polling list of S3 bucket objects method, scaling shou vertical, with a single bigger {beatname_uc} instance and higher `number_of_workers` config value. +[float] +=== SQS Custom Notification Parsing Script + +Under some circumstances you might want to listen to events that are not following +the standard SQS notifications format. To be able to parse them, it is possible to +define a custom script that will take care of processing them and generating the +required list of S3 Events used to download the files. + +The `sqs.notification_parsing_script` executes Javascript code to process an event. +It uses a pure Go implementation of ECMAScript 5.1 and has no external dependencies. 
+
+It can be configured by embedding Javascript in your configuration file or by pointing
+the processor at external file(s). Only one of the options `sqs.notification_parsing_script.source`, `sqs.notification_parsing_script.file`, and `sqs.notification_parsing_script.files`
+can be set at the same time.
+
+The script requires a `parse(notification)` function that receives the notification as
+a raw string and returns a list of `S3EventV2` objects. This raw string can then be
+processed as needed, e.g.: `JSON.parse(n)` or the provided helper for XML `new XMLDecoder(n)`.
+
+If the script defines a `test()` function it will be invoked when it is loaded. Any exceptions thrown will cause the processor to fail to load. This can be used to make assertions about the behavior of the script.
+
+[source,javascript]
+----
+function parse(n) {
+    var m = JSON.parse(n);
+    var evts = [];
+    var files = m.files;
+    var bucket = m.bucket;
+
+    if (!Array.isArray(files) || (files.length == 0) || bucket == null || bucket == "") {
+        return evts;
+    }
+
+    files.forEach(function(f){
+        var evt = new S3EventV2();
+        evt.SetS3BucketName(bucket);
+        evt.SetS3ObjectKey(f.path);
+        evts.push(evt);
+    });
+
+    return evts;
+}
+
+function test() {
+    var events = parse(JSON.stringify({bucket: "aBucket", files: [{path: "path/to/file"}]}));
+    if (events.length !== 1) {
+        throw "expecting one event";
+    }
+    if (events[0].S3.Bucket.Name !== "aBucket") {
+        throw "expected bucket === aBucket";
+    }
+    if (events[0].S3.Object.Key !== "path/to/file") {
+        throw "expected key === path/to/file";
+    }
+}
+----
+
+[float]
+==== S3EventV2 API
+
+The `S3EventV2` object returned by the `parse` method.
+
+[frame="topbot",options="header"]
+|===
+|Method |Description
+
+|`new S3EventV2()`
+|Returns a new `S3EventV2` object.
+
+*Example*: `var evt = new S3EventV2();`
+
+|`SetAWSRegion(string)`
+|Sets the AWS region.
+
+*Example*: `evt.SetAWSRegion("us-east-1");`
+
+|`SetProvider(string)`
+|Sets the provider.
+
+*Example*: `evt.SetProvider("provider");`
+
+|`SetEventName(string)`
+|Sets the event name.
+
+*Example*: `evt.SetEventName("event-type");`
+
+|`SetEventSource(string)`
+|Sets the event source.
+
+*Example*: `evt.SetEventSource("aws:s3");`
+
+|`SetS3BucketName(string)`
+|Sets the bucket name.
+
+*Example*: `evt.SetS3BucketName("bucket-name");`
+
+|`SetS3BucketARN(string)`
+|Sets the bucket ARN.
+
+*Example*: `evt.SetS3BucketARN("bucket-ARN");`
+
+|`SetS3ObjectKey(string)`
+|Sets the object key.
+
+*Example*: `evt.SetS3ObjectKey("path/to/object");`
+
+|===
+
+In order to be able to retrieve an S3 object successfully, at least `S3.Object.Key`
+and `S3.Bucket.Name` properties must be set (using the provided setters). The other
+properties will be used as metadata in the resulting event when available.
+
+[float]
+==== XMLDecoder API
+
+To help with XML decoding, an `XMLDecoder` class is provided.
+
+Example XML input:
+
+[source,xml]
+-------------------------------------------------------------------------------
+<catalog>
+  <book seq="1">
+    <author>William H. Gaddis</author>
+    <title>The Recognitions</title>
+    <review>One of the great seminal American novels of the 20th century.</review>
+  </book>
+</catalog>
+-------------------------------------------------------------------------------
+
+Will produce the following output:
+
+[source,json]
+-------------------------------------------------------------------------------
+{
+  "catalog": {
+    "book": {
+      "author": "William H. Gaddis",
+      "review": "One of the great seminal American novels of the 20th century.",
+      "seq": "1",
+      "title": "The Recognitions"
+    }
+  }
+}
+-------------------------------------------------------------------------------
+
+[frame="topbot",options="header"]
+|===
+|Method |Description
+
+|`new XMLDecoder(string)`
+|Returns a new `XMLDecoder` object to decode the provided `string`.
+
+*Example*: `var dec = new XMLDecoder(n);`
+
+|`PrependHyphenToAttr()`
+|Causes the Decoder to prepend a hyphen (`-`) to all XML attribute names.
+ +*Example*: `dec.PrependHyphenToAttr();` + +|`LowercaseKeys()` +|Causes the Decoder to transform all key name to lowercase. + +*Example*: `dec.LowercaseKeys();` + +|`Decode()` +|Reads the XML string and return a map containing the data. + +*Example*: `var m = dec.Decode();` + +|=== [float] === Metrics diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 5b8308d1771..d25b99a69bd 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -24,6 +24,7 @@ type config struct { VisibilityTimeout time.Duration `config:"visibility_timeout"` SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning. SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it. + SQSScript *scriptConfig `config:"sqs.notification_parsing_script"` FIPSEnabled bool `config:"fips_enabled"` MaxNumberOfMessages int `config:"max_number_of_messages"` QueueURL string `config:"queue_url"` @@ -151,6 +152,36 @@ func (rc *readerConfig) Validate() error { return nil } +type scriptConfig struct { + Source string `config:"source"` // Inline script to execute. + File string `config:"file"` // Source file. + Files []string `config:"files"` // Multiple source files. + Params map[string]interface{} `config:"params"` // Parameters to pass to script. + Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout. + MaxCachedSessions int `config:"max_cached_sessions" validate:"min=0"` // Max. number of cached VM sessions. +} + +// Validate returns an error if one (and only one) option is not set. 
+func (c scriptConfig) Validate() error { + numConfigured := 0 + for _, set := range []bool{c.Source != "", c.File != "", len(c.Files) > 0} { + if set { + numConfigured++ + } + } + + switch { + case numConfigured == 0: + return errors.New("javascript must be defined via 'file', " + + "'files', or inline as 'source'") + case numConfigured > 1: + return errors.New("javascript can be defined in only one of " + + "'file', 'files', or inline as 'source'") + } + + return nil +} + func (rc *readerConfig) InitDefaults() { rc.BufferSize = 16 * humanize.KiByte rc.MaxBytes = 10 * humanize.MiByte diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index edd146b6a23..bf3f8cf28b2 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -186,8 +186,12 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe if len(in.config.FileSelectors) == 0 { fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}} } + script, err := newScriptFromConfig(log.Named("sqs_script"), in.config.SQSScript) + if err != nil { + return nil, err + } s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, fileSelectors) - sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory) + sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, s3EventHandlerFactory) sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler) return sqsReader, nil diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index aabb86b1a6c..ecdc1756ce4 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ 
b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -166,7 +166,7 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR conf := makeBenchmarkConfig(t) s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors) - sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, time.Minute, 5, s3EventHandlerFactory) + sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, s3EventHandlerFactory) sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler) go func() { diff --git a/x-pack/filebeat/input/awss3/script.go b/x-pack/filebeat/input/awss3/script.go new file mode 100644 index 00000000000..812fbe65dc5 --- /dev/null +++ b/x-pack/filebeat/input/awss3/script.go @@ -0,0 +1,150 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "bytes" + "io" + "os" + "path/filepath" + "runtime" + "strings" + + "github.com/dop251/goja" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/paths" +) + +type script struct { + scriptConfig + sessionPool *sessionPool + sourceProg *goja.Program + sourceFile string +} + +// newScriptFromConfig constructs a new Javascript script from the given config +// object. It loads the sources, compiles them, and validates the entry point. 
+func newScriptFromConfig(log *logp.Logger, c *scriptConfig) (*script, error) { + if c == nil { + return nil, nil + } + err := c.Validate() + if err != nil { + return nil, err + } + + var sourceFile string + var sourceCode []byte + + switch { + case c.Source != "": + sourceFile = "inline.js" + sourceCode = []byte(c.Source) + case c.File != "": + sourceFile, sourceCode, err = loadSources(c.File) + case len(c.Files) > 0: + sourceFile, sourceCode, err = loadSources(c.Files...) + } + if err != nil { + return nil, err + } + + // Validate processor source code. + prog, err := goja.Compile(sourceFile, string(sourceCode), true) + if err != nil { + return nil, err + } + + pool, err := newSessionPool(prog, *c) + if err != nil { + return nil, err + } + + return &script{ + scriptConfig: *c, + sessionPool: pool, + sourceProg: prog, + sourceFile: sourceFile, + }, nil +} + +// loadSources loads javascript source from files. +func loadSources(files ...string) (string, []byte, error) { + var sources []string + buf := new(bytes.Buffer) + + readFile := func(path string) error { + if common.IsStrictPerms() { + if err := common.OwnerHasExclusiveWritePerms(path); err != nil { + return err + } + } + + f, err := os.Open(path) + if err != nil { + return errors.Wrapf(err, "failed to open file %v", path) + } + defer f.Close() + + if _, err = io.Copy(buf, f); err != nil { + return errors.Wrapf(err, "failed to read file %v", path) + } + return nil + } + + for _, filePath := range files { + filePath = paths.Resolve(paths.Config, filePath) + + if hasMeta(filePath) { + matches, err := filepath.Glob(filePath) + if err != nil { + return "", nil, err + } + sources = append(sources, matches...) 
+ } else { + sources = append(sources, filePath) + } + } + + if len(sources) == 0 { + return "", nil, errors.Errorf("no sources were found in %v", + strings.Join(files, ", ")) + } + + for _, name := range sources { + if err := readFile(name); err != nil { + return "", nil, err + } + } + + return strings.Join(sources, ";"), buf.Bytes(), nil +} + +// run runs the parse function. It receives a raw notification +// as a string and returns a list of S3 Events describing +// which files are going to be downloaded. +func (p *script) run(n string) ([]s3EventV2, error) { + s := p.sessionPool.Get() + defer p.sessionPool.Put(s) + + return s.runParseFunc(n) +} + +func (p *script) String() string { + return "script=[type=javascript, sources=" + p.sourceFile + "]" +} + +// hasMeta reports whether path contains any of the magic characters +// recognized by Match/Glob. +func hasMeta(path string) bool { + magicChars := `*?[` + if runtime.GOOS != "windows" { + magicChars = `*?[\` + } + return strings.ContainsAny(path, magicChars) +} diff --git a/x-pack/filebeat/input/awss3/script_jss3event_v2.go b/x-pack/filebeat/input/awss3/script_jss3event_v2.go new file mode 100644 index 00000000000..04cea00a08b --- /dev/null +++ b/x-pack/filebeat/input/awss3/script_jss3event_v2.go @@ -0,0 +1,69 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package awss3 + +import ( + "strings" + + "github.com/dop251/goja" + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common/encoding/xml" +) + +func newJSS3EventV2Constructor(s *session) func(call goja.ConstructorCall) *goja.Object { + return func(call goja.ConstructorCall) *goja.Object { + if len(call.Arguments) != 0 { + panic(errors.New("Event constructor don't accept arguments")) + } + return s.vm.ToValue(&s3EventV2{}).(*goja.Object) + } +} + +func (e *s3EventV2) SetAWSRegion(v string) { + e.AWSRegion = v +} + +func (e *s3EventV2) SetProvider(v string) { + e.Provider = v +} + +func (e *s3EventV2) SetEventName(v string) { + e.EventName = v +} + +func (e *s3EventV2) SetEventSource(v string) { + e.EventSource = v +} + +func (e *s3EventV2) SetS3BucketName(v string) { + e.S3.Bucket.Name = v +} + +func (e *s3EventV2) SetS3BucketARN(v string) { + e.S3.Bucket.ARN = v +} + +func (e *s3EventV2) SetS3ObjectKey(v string) { + e.S3.Object.Key = v +} + +func newXMLDecoderConstructor(s *session) func(call goja.ConstructorCall) *goja.Object { + return func(call goja.ConstructorCall) *goja.Object { + if len(call.Arguments) != 1 { + panic(errors.New("Event constructor requires one argument")) + } + + a0 := call.Argument(0).Export() + s0, ok := a0.(string) + + if !ok { + panic(errors.Errorf("Event constructor requires a "+ + "string argument but got %T", a0)) + } + + return s.vm.ToValue(xml.NewDecoder(strings.NewReader(s0))).(*goja.Object) + } +} diff --git a/x-pack/filebeat/input/awss3/script_jss3event_v2_test.go b/x-pack/filebeat/input/awss3/script_jss3event_v2_test.go new file mode 100644 index 00000000000..dc387d95e33 --- /dev/null +++ b/x-pack/filebeat/input/awss3/script_jss3event_v2_test.go @@ -0,0 +1,60 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package awss3 + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +const ( + header = `function parse(n) {` + footer = `}` +) + +var log = logp.NewLogger("test") + +func TestJSS3EventV2(t *testing.T) { + logp.TestingSetup() + + source := ` + var evts = []; + var evt = new S3EventV2(); + evt.SetAWSRegion("region"); + evt.SetProvider("provider"); + evt.SetEventName("name"); + evt.SetEventSource("source"); + evt.SetS3BucketName("bucket"); + evt.SetS3BucketARN("arn"); + evt.SetS3ObjectKey("key"); + evts.push(evt); + return evts; + ` + + p, err := newScriptFromConfig(log, &scriptConfig{Source: header + source + footer}) + if err != nil { + t.Fatal(err) + } + + evts, err := p.run(`{}`) + require.NoError(t, err) + require.Equal(t, 1, len(evts)) + + exp := s3EventV2{ + AWSRegion: "region", + Provider: "provider", + EventName: "name", + EventSource: "source", + } + exp.S3.Bucket.Name = "bucket" + exp.S3.Bucket.ARN = "arn" + exp.S3.Object.Key = "key" + + assert.EqualValues(t, exp, evts[0]) +} diff --git a/x-pack/filebeat/input/awss3/script_session.go b/x-pack/filebeat/input/awss3/script_session.go new file mode 100644 index 00000000000..aad0539665e --- /dev/null +++ b/x-pack/filebeat/input/awss3/script_session.go @@ -0,0 +1,217 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package awss3 + +import ( + "fmt" + "reflect" + "time" + + "github.com/dop251/goja" + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +const ( + logName = "awss3.script" + + entryPointFunction = "parse" + registerFunction = "register" + testFunction = "test" + + timeoutError = "javascript parser execution timeout" +) + +// session is a javascript runtime environment used throughout the life of +// the input instance. +type session struct { + vm *goja.Runtime + log *logp.Logger + parseFunc goja.Callable + timeout time.Duration +} + +func newSession(p *goja.Program, conf scriptConfig, test bool) (*session, error) { + // Create a logger + logger := logp.NewLogger(logName) + + // Setup JS runtime. + s := &session{ + vm: goja.New(), + log: logger, + timeout: conf.Timeout, + } + + // Register common.MapStr as being a simple map[string]interface{} for + // treatment within the JS VM. + s.vm.RegisterSimpleMapType(reflect.TypeOf(common.MapStr(nil)), + func(i interface{}) map[string]interface{} { + return map[string]interface{}(i.(common.MapStr)) + }, + ) + + // Register constructors for 'new S3EventV2' to enable creating them from the JS code. + s.vm.Set("S3EventV2", newJSS3EventV2Constructor(s)) + s.vm.Set("XMLDecoder", newXMLDecoderConstructor(s)) + + if _, err := s.vm.RunProgram(p); err != nil { + return nil, err + } + + if err := s.setParseFunction(); err != nil { + return nil, err + } + + if len(conf.Params) > 0 { + if err := s.registerScriptParams(conf.Params); err != nil { + return nil, err + } + } + + if test { + if err := s.executeTestFunction(); err != nil { + return nil, err + } + } + + return s, nil +} + +// setParseFunction validates that the parse() function exists and stores +// the handle. 
+func (s *session) setParseFunction() error { + parseFunc := s.vm.Get(entryPointFunction) + if parseFunc == nil { + return errors.New("parse function not found") + } + if parseFunc.ExportType().Kind() != reflect.Func { + return errors.New("parse is not a function") + } + if err := s.vm.ExportTo(parseFunc, &s.parseFunc); err != nil { + return errors.Wrap(err, "failed to export parse function") + } + return nil +} + +// registerScriptParams calls the register() function and passes the params. +func (s *session) registerScriptParams(params map[string]interface{}) error { + registerFunc := s.vm.Get(registerFunction) + if registerFunc == nil { + return errors.New("params were provided but no register function was found") + } + if registerFunc.ExportType().Kind() != reflect.Func { + return errors.New("register is not a function") + } + var register goja.Callable + if err := s.vm.ExportTo(registerFunc, ®ister); err != nil { + return errors.Wrap(err, "failed to export register function") + } + if _, err := register(goja.Undefined(), s.vm.ToValue(params)); err != nil { + return errors.Wrap(err, "failed to register script_params") + } + s.log.Debug("Registered params with script") + return nil +} + +// executeTestFunction executes the test() function if it exists. Any exceptions +// will cause the script to fail to load. +func (s *session) executeTestFunction() error { + if testFunc := s.vm.Get(testFunction); testFunc != nil { + if testFunc.ExportType().Kind() != reflect.Func { + return errors.New("test is not a function") + } + var test goja.Callable + if err := s.vm.ExportTo(testFunc, &test); err != nil { + return errors.Wrap(err, "failed to export test function") + } + _, err := test(goja.Undefined(), nil) + if err != nil { + return errors.Wrap(err, "failed in test() function") + } + s.log.Debugf("Successful test() execution for script.") + } + return nil +} + +// runParseFunc executes parse() from the JS script. 
+func (s *session) runParseFunc(n string) (out []s3EventV2, err error) { + defer func() { + if r := recover(); r != nil { + s.log.Errorw("The javascript script caused an unexpected panic "+ + "while parsing a notification. Recovering, but please report this.", + "notification", common.MapStr{"original": n}, + "panic", r, + zap.Stack("stack")) + err = fmt.Errorf("unexpected panic in javascript script: %v", r) + } + }() + + // Interrupt the JS code if execution exceeds timeout. + if s.timeout > 0 { + t := time.AfterFunc(s.timeout, func() { + s.vm.Interrupt(timeoutError) + }) + defer t.Stop() + } + + v, err := s.parseFunc(goja.Undefined(), s.vm.ToValue(n)) + if err != nil { + return nil, fmt.Errorf("failed in parse function: %w", err) + } + + if v.Equals(goja.Undefined()) { + return out, nil + } + + if err := s.vm.ExportTo(v, &out); err != nil { + return nil, fmt.Errorf("can't export returned value: %w", err) + } + + return out, nil +} + +type sessionPool struct { + New func() *session + C chan *session +} + +func newSessionPool(p *goja.Program, c scriptConfig) (*sessionPool, error) { + s, err := newSession(p, c, true) + if err != nil { + return nil, err + } + + pool := sessionPool{ + New: func() *session { + s, _ := newSession(p, c, false) + return s + }, + C: make(chan *session, c.MaxCachedSessions), + } + pool.Put(s) + + return &pool, nil +} + +func (p *sessionPool) Get() *session { + select { + case s := <-p.C: + return s + default: + return p.New() + } +} + +func (p *sessionPool) Put(s *session) { + if s != nil { + select { + case p.C <- s: + default: + } + } +} diff --git a/x-pack/filebeat/input/awss3/script_session_test.go b/x-pack/filebeat/input/awss3/script_session_test.go new file mode 100644 index 00000000000..4cad65b03fd --- /dev/null +++ b/x-pack/filebeat/input/awss3/script_session_test.go @@ -0,0 +1,317 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. 
Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSessionScriptParams(t *testing.T) { + logp.TestingSetup() + + t.Run("register method is optional", func(t *testing.T) { + _, err := newScriptFromConfig(log, &scriptConfig{Source: header + footer}) + if err != nil { + t.Fatal(err) + } + }) + + t.Run("register required for params", func(t *testing.T) { + _, err := newScriptFromConfig(log, &scriptConfig{Source: header + footer, Params: map[string]interface{}{ + "p1": 42, + }, + }) + if assert.Error(t, err) { + assert.Contains(t, err.Error(), "params were provided") + } + }) + + t.Run("register params", func(t *testing.T) { + const script = ` + function register(params) { + if (params["p1"] !== 42) { + throw "invalid p1"; + } + } + + function parse(n) {} + ` + _, err := newScriptFromConfig(log, &scriptConfig{ + Source: script, + Params: map[string]interface{}{ + "p1": 42, + }, + }) + assert.NoError(t, err) + }) +} + +func TestSessionTestFunction(t *testing.T) { + logp.TestingSetup() + + const script = ` + var fail = false; + + function register(params) { + fail = params["fail"]; + } + + function parse(n) { + if (fail) { + throw "intentional failure"; + } + var m = JSON.parse(n); + var e = new S3EventV2(); + e.SetS3ObjectKey(m["hello"]); + return [e]; + } + + function test() { + var n = "{\"hello\": \"earth\"}"; + var evts = parse(n); + + if (evts[0].S3.Object.Key !== "earth") { + throw "invalid key value"; + } + } + ` + + t.Run("test method is optional", func(t *testing.T) { + _, err := newScriptFromConfig(log, &scriptConfig{ + Source: header + footer, + }) + if err != nil { + t.Fatal(err) + } + }) + + t.Run("test success", func(t *testing.T) { + _, err := newScriptFromConfig(log, &scriptConfig{ + 
Source: script, + Params: map[string]interface{}{ + "fail": false, + }, + }) + assert.NoError(t, err) + }) + + t.Run("test failure", func(t *testing.T) { + _, err := newScriptFromConfig(log, &scriptConfig{ + Source: script, + Params: map[string]interface{}{ + "fail": true, + }, + }) + assert.Error(t, err) + }) +} + +func TestSessionTimeout(t *testing.T) { + logp.TestingSetup() + + const runawayLoop = ` + var m = JSON.parse(n); + while (!m.stop) { + m.hello = "world"; + } + ` + + p, err := newScriptFromConfig(log, &scriptConfig{ + Source: header + runawayLoop + footer, + Timeout: 100 * time.Millisecond, + }) + if err != nil { + t.Fatal(err) + } + + n := `{"stop": false}` + + // Execute and expect a timeout. + _, err = p.run(n) + if assert.Error(t, err) { + assert.Contains(t, err.Error(), timeoutError) + } + + // Verify that any internal runtime interrupt state has been cleared. + n = `{"stop": true}` + _, err = p.run(n) + assert.NoError(t, err) +} + +func TestSessionParallel(t *testing.T) { + logp.TestingSetup() + + const script = ` + var m = JSON.parse(n); + var evt = new S3EventV2(); + evt.SetS3ObjectKey(m.hello.world); + return [evt]; + ` + + p, err := newScriptFromConfig(log, &scriptConfig{ + Source: header + script + footer, + }) + if err != nil { + t.Fatal(err) + } + + const goroutines = 10 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + for ctx.Err() == nil { + n := `{"hello":{"world": "hello"}}` + evts, err := p.run(n) + require.NoError(t, err) + require.Equal(t, 1, len(evts)) + assert.Equal(t, "hello", evts[0].S3.Object.Key) + } + }() + } + + time.AfterFunc(time.Second, cancel) + wg.Wait() +} + +func TestCreateS3EventsFromNotification(t *testing.T) { + logp.TestingSetup() + + n := `{ + "cid": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", + "timestamp": 1492726639222, + "fileCount": 4, + "totalSize": 349986221, + "bucket": 
"bucketNNNN", + "pathPrefix": "logs/aaaa-bbbb-cccc-dddd-eeee-ffff", + "files": [ + { + "path": "logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00000.gz", + "size": 90506437, + "checksum": "ffffffffffffffffffff" + }, + { + "path": "logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00001.gz", + "size": 86467594, + "checksum": "ffffffffffffffffffff" + }, + { + "path": "logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00002.gz", + "size": 83893710, + "checksum": "ffffffffffffffffffff" + }, + { + "path": "logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00003.gz", + "size": 89118480, + "checksum": "ffffffffffffffffffff" + } + ] + }` + + const script = ` + function parse(n) { + var m = JSON.parse(n); + var evts = []; + var files = m.files; + var bucket = m.bucket; + + if (!Array.isArray(files) || (files.length == 0) || bucket == null || bucket == "") { + return evts; + } + + files.forEach(function(f){ + var evt = new S3EventV2(); + evt.SetS3BucketName(bucket); + evt.SetS3ObjectKey(f.path); + evts.push(evt); + }); + + return evts; + } +` + s, err := newScriptFromConfig(log, &scriptConfig{Source: script}) + require.NoError(t, err) + + evts, err := s.run(n) + require.NoError(t, err) + require.Equal(t, 4, len(evts)) + + const expectedBucket = "bucketNNNN" + expectedObjectKeys := []string{ + "logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00000.gz", + "logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00001.gz", + "logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00002.gz", + "logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00003.gz", + } + + for i, e := range expectedObjectKeys { + assert.Equal(t, expectedBucket, evts[i].S3.Bucket.Name) + assert.Equal(t, e, evts[i].S3.Object.Key) + } +} + +func TestParseXML(t *testing.T) { + logp.TestingSetup() + + n := ` + bucketNNNN + + logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00000.gz + logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00001.gz + + ` + + const script = ` + function parse(n) { + var dec = new XMLDecoder(n); + var m = dec.Decode(); + var evts = []; + var files = m.record.files.file; + var bucket = 
m.record.bucket; + + if (!Array.isArray(files) || (files.length == 0) || bucket == null || bucket == "") { + return evts; + } + + files.forEach(function(f){ + var evt = new S3EventV2(); + evt.SetS3BucketName(bucket); + evt.SetS3ObjectKey(f.path); + evts.push(evt); + }); + + return evts; + } +` + s, err := newScriptFromConfig(log, &scriptConfig{Source: script}) + require.NoError(t, err) + + evts, err := s.run(n) + require.NoError(t, err) + require.Equal(t, 2, len(evts)) + + const expectedBucket = "bucketNNNN" + expectedObjectKeys := []string{ + "logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00000.gz", + "logs/aaaa-bbbb-cccc-dddd-eeee-ffff/part-00001.gz", + } + + for i, e := range expectedObjectKeys { + assert.Equal(t, expectedBucket, evts[i].S3.Bucket.Name) + assert.Equal(t, e, evts[i].S3.Object.Key) + } +} diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index a89aad7fc12..c906c74fa9e 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -86,9 +86,10 @@ type sqsS3EventProcessor struct { log *logp.Logger warnOnce sync.Once metrics *inputMetrics + script *script } -func newSQSS3EventProcessor(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, sqsVisibilityTimeout time.Duration, maxReceiveCount int, s3 s3ObjectHandlerFactory) *sqsS3EventProcessor { +func newSQSS3EventProcessor(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, script *script, sqsVisibilityTimeout time.Duration, maxReceiveCount int, s3 s3ObjectHandlerFactory) *sqsS3EventProcessor { if metrics == nil { metrics = newInputMetrics(monitoring.NewRegistry(), "") } @@ -99,6 +100,7 @@ func newSQSS3EventProcessor(log *logp.Logger, metrics *inputMetrics, sqs sqsAPI, sqs: sqs, log: log, metrics: metrics, + script: script, } } @@ -185,6 +187,12 @@ func (p *sqsS3EventProcessor) keepalive(ctx context.Context, log *logp.Logger, w } func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, 
error) { + // Check if a parsing script is defined. If so, it takes precedence over + // format autodetection. + if p.script != nil { + return p.script.run(body) + } + // NOTE: If AWS introduces a V3 schema this will need updated to handle that schema. var events s3EventsV2 dec := json.NewDecoder(strings.NewReader(body)) @@ -201,6 +209,12 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro return nil, fmt.Errorf("failed to decode SQS message body as an S3 notification: %w", err) } } + + if events.Records == nil { + p.log.Debugw("Invalid SQS message body: missing Records field", "sqs_message_body", body) + return nil, errors.New("the message is an invalid S3 notification: missing Records field") + } + return p.getS3Info(events) } diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index 9edd5ec4ed9..ad6d30056d4 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -38,7 +38,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, time.Minute, 5, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockS3HandlerFactory) require.NoError(t, p.ProcessSQS(ctx, &msg)) }) @@ -60,7 +60,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&invalidBodyMsg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, time.Minute, 5, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockS3HandlerFactory) err := p.ProcessSQS(ctx, &invalidBodyMsg) require.Error(t, err) t.Log(err) @@ -75,13 +75,13 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI := NewMockSQSAPI(ctrl) 
mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) - emptyRecordsMsg := newSQSMessage() + emptyRecordsMsg := newSQSMessage([]s3EventV2{}...) gomock.InOrder( mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&emptyRecordsMsg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, time.Minute, 5, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockS3HandlerFactory) require.NoError(t, p.ProcessSQS(ctx, &emptyRecordsMsg)) }) @@ -108,7 +108,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, visibilityTimeout, 5, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, visibilityTimeout, 5, mockS3HandlerFactory) require.NoError(t, p.ProcessSQS(ctx, &msg)) }) @@ -127,7 +127,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockS3Handler.EXPECT().ProcessS3Object().Return(errors.New("fake connectivity problem")), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, time.Minute, 5, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockS3HandlerFactory) err := p.ProcessSQS(ctx, &msg) t.Log(err) require.Error(t, err) @@ -154,7 +154,7 @@ func TestSQSS3EventProcessor(t *testing.T) { mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&msg)).Return(nil), ) - p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, time.Minute, 5, mockS3HandlerFactory) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockS3HandlerFactory) err := p.ProcessSQS(ctx, &msg) t.Log(err) require.Error(t, err) @@ -164,7 +164,7 @@ func TestSQSS3EventProcessor(t *testing.T) { func TestSqsProcessor_getS3Notifications(t *testing.T) { logp.TestingSetup() - p := 
newSQSS3EventProcessor(logp.NewLogger(inputName), nil, nil, time.Minute, 5, nil) + p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, nil, nil, time.Minute, 5, nil) t.Run("s3 key is url unescaped", func(t *testing.T) { msg := newSQSMessage(newS3Event("Happy+Face.jpg")) @@ -194,6 +194,24 @@ func TestSqsProcessor_getS3Notifications(t *testing.T) { assert.Equal(t, "arn:aws:s3:::vpc-flow-logs-ks", events[0].S3.Bucket.ARN) assert.Equal(t, "vpc-flow-logs-ks", events[0].S3.Bucket.Name) }) + + t.Run("missing Records fail", func(t *testing.T) { + msg := `{"message":"missing records"}` + _, err := p.getS3Notifications(msg) + require.Error(t, err) + assert.EqualError(t, err, "the message is an invalid S3 notification: missing Records field") + msg = `{"message":"null records", "Records": null}` + _, err = p.getS3Notifications(msg) + require.Error(t, err) + assert.EqualError(t, err, "the message is an invalid S3 notification: missing Records field") + }) + + t.Run("empty Records does not fail", func(t *testing.T) { + msg := `{"Records":[]}` + events, err := p.getS3Notifications(msg) + require.NoError(t, err) + assert.Equal(t, 0, len(events)) + }) } func TestNonRecoverableError(t *testing.T) {