Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][websocket] - Added runtime URL modification support based on state and cursor values #39997

Merged
merged 11 commits into from
Jun 25, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Introduce log message for not supported annotations for Hints based autodiscover {pull}38213[38213]
- Add persistent volume claim name to volume if available {pull}38839[38839]
- Raw events are now logged to a different file, this prevents potentially sensitive information from leaking into log files {pull}38767[38767]
- Websocket input: Added runtime URL modification support based on state and cursor values {issue}39858[39858] {pull}39997[39997]

*Auditbeat*

Expand Down
24 changes: 24 additions & 0 deletions x-pack/filebeat/docs/inputs/input-websocket.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,30 @@ program: |
})
----

[[input-url-program-websocket]]
[float]
==== `url_program`

If present, this CEL program is executed before the websocket connection is established using the `state` object, including any stored cursor value. It must evaluate to a valid URL. The returned URL is used to make the websocket connection for processing. The program may use cursor values or other state defined values to customize the URL at runtime.

["source","yaml",subs="attributes"]
----
url: ws://testapi:443/v1/streamresults
state:
initial_start_time: "2022-01-01T00:00:00Z"
url_program: |
state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time)
program: |
bytes(state.response).decode_json().as(inner_body,{
"events": {
"message": inner_body.encode_json(),
},
"cursor": {
"since": inner_body.timestamp
}
})
----

[[state-websocket]]
[float]
==== `state`
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/websocket/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
)

type config struct {
// URLProgram is the CEL program to be run once before to prep the url.
URLProgram string `config:"url_program"`
// Program is the CEL program to be run for each polling.
Program string `config:"program"`
// Regexps is the set of regular expression to be made
Expand Down
75 changes: 74 additions & 1 deletion x-pack/filebeat/input/websocket/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"net/url"
"reflect"
"time"

Expand Down Expand Up @@ -97,9 +98,15 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
state["cursor"] = cursor
}

// initialize the input url with the help of the url_program.
url, err := i.getURL(ctx, state, log)
if err != nil {
metrics.errorsTotal.Inc()
return err
}

// websocket client
headers := formHeader(cfg)
url := cfg.URL.String()
c, resp, err := websocket.DefaultDialer.DialContext(ctx, url, headers)
if resp != nil && resp.Body != nil {
log.Debugw("websocket connection response", "body", resp.Body)
Expand Down Expand Up @@ -150,6 +157,72 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
}
}

// getURL initializes the input URL with the help of the url_program.
func (i input) getURL(ctx context.Context, state map[string]interface{}, log *logp.Logger) (string, error) {
var (
url string
err error
)
cfg := i.cfg
if cfg.URLProgram != "" {
state["url"] = cfg.URL.String()
// CEL program which is used to prime/initialize the input url
url_prg, ast, err := newProgram(ctx, cfg.URLProgram, root, nil, log)
if err != nil {
return url, err
}

log.Debugw("cel engine state before url_eval", logp.Namespace("websocket"), "state", redactor{state: state, cfg: cfg.Redact})
start := i.now().In(time.UTC)
url, err = evalURLWith(ctx, url_prg, ast, state, start)
log.Debugw("url_eval result", logp.Namespace("websocket"), "modified_url", url)
if err != nil {
log.Errorw("failed url evaluation", "error", err)
return url, err
}
} else {
url = cfg.URL.String()
}
return url, err
}

func evalURLWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time) (string, error) {
out, _, err := prg.ContextEval(ctx, map[string]interface{}{
// Replace global program "now" with current time. This is necessary
// as the lib.Time now global is static at program instantiation time
// which will persist over multiple evaluations. The lib.Time behaviour
// is correct for mito where CEL program instances live for only a
// single evaluation. Rather than incurring the cost of creating a new
// cel.Program for each evaluation, shadow lib.Time's now with a new
// value for each eval. We retain the lib.Time now global for
// compatibility between CEL programs developed in mito with programs
// run in the input.
"now": now,
root: state,
})
if err != nil {
err = lib.DecoratedError{AST: ast, Err: err}
}
if e := ctx.Err(); e != nil {
err = e
}
if err != nil {
return "", fmt.Errorf("failed eval: %w", err)
}
v, err := out.ConvertToNative(reflect.TypeOf(""))
if err != nil {
return "", fmt.Errorf("failed type conversion: %w", err)
}
switch v := v.(type) {
case string:
_, err = url.Parse(v)
return v, err
default:
// This should never happen.
return "", fmt.Errorf("unexpected native conversion type: %T", v)
}
}

// processAndPublishData processes the data in state, updates the cursor and publishes it to the publisher.
// the CEL program here only executes a single time, since the websocket connection is persistent and events are received and processed in real time.
func (i *input) processAndPublishData(ctx context.Context, metrics *inputMetrics, prg cel.Program, ast *cel.Ast,
Expand Down
99 changes: 98 additions & 1 deletion x-pack/filebeat/input/websocket/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package websocket

import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

//nolint:gosec // These are test tokens and are not used in production code.
const (
basicToken = "dXNlcjpwYXNz"
bearerToken = "BXNlcjpwYVVz"
Expand Down Expand Up @@ -399,6 +401,101 @@ var inputTests = []struct {
},
}

var urlEvalTests = []struct {
name string
config map[string]interface{}
time func() time.Time
want string
}{
{
name: "cursor based url modification",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"url_program": `has(state.cursor) && has(state.cursor.since) ? state.url+"?since="+ state.cursor.since : state.url`,
"state": map[string]interface{}{
"cursor": map[string]interface{}{
"since": "2017-08-17T14:54:12",
},
},
},
want: "ws://testapi/getresults?since=2017-08-17T14:54:12",
},
{
name: "cursor based url modification using simplified query",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"url_program": `state.url + "?since=" + state.?cursor.since.orValue(state.url)`,
"state": map[string]interface{}{
"cursor": map[string]interface{}{
"since": "2017-08-17T14:54:12",
},
},
},
want: "ws://testapi/getresults?since=2017-08-17T14:54:12",
},
{
name: "url modification with no cursor",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"url_program": `has(state.cursor) && has(state.cursor.since) ? state.url+"?since="+ state.cursor.since: state.url+"?since="+ state.initial_start_time`,
"state": map[string]interface{}{
"initial_start_time": "2022-01-01T00:00:00Z",
},
},
want: "ws://testapi/getresults?since=2022-01-01T00:00:00Z",
},
{
name: "url modification with no cursor, using simplified query",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"url_program": `state.url + "?since=" + state.?cursor.since.orValue(state.initial_start_time)`,
"state": map[string]interface{}{
"initial_start_time": "2022-01-01T00:00:00Z",
},
},
want: "ws://testapi/getresults?since=2022-01-01T00:00:00Z",
},
}

func TestURLEval(t *testing.T) {
logp.TestingSetup()
for _, test := range urlEvalTests {
t.Run(test.name, func(t *testing.T) {

cfg := conf.MustNewConfigFrom(test.config)

conf := config{}
conf.Redact = &redact{}
err := cfg.Unpack(&conf)
if err != nil {
t.Fatalf("unexpected error unpacking config: %v", err)
}

name := input{}.Name()
if name != "websocket" {
t.Errorf(`unexpected input name: got:%q want:"websocket"`, name)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

var state map[string]interface{}
if conf.State == nil {
state = make(map[string]interface{})
} else {
state = conf.State
}

response, err := input{test.time, conf}.getURL(ctx, state, logp.NewLogger("websocket_url_eval_test"))
if err != nil && !errors.Is(err, context.Canceled) {
t.Errorf("unexpected error from running input: got:%v want:%v", err, nil)
}

assert.Equal(t, test.want, response)
})
}
}

func TestInput(t *testing.T) {
// tests will ignore context cancelled errors, since they are expected
ctxCancelledError := fmt.Errorf("context canceled")
Expand Down Expand Up @@ -432,7 +529,7 @@ func TestInput(t *testing.T) {
t.Fatalf("unexpected error running test: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()

v2Ctx := v2.Context{
Expand Down
Loading