Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][websocket] - Added runtime URL modification support based on state and cursor values #39997

Merged
merged 11 commits into from
Jun 25, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Introduce log message for not supported annotations for Hints based autodiscover {pull}38213[38213]
- Add persistent volume claim name to volume if available {pull}38839[38839]
- Raw events are now logged to a different file, this prevents potentially sensitive information from leaking into log files {pull}38767[38767]
- Websocket input: Added runtime URL modification support based on state and cursor values {issue}39858[39858] {pull}39997[39997]

*Auditbeat*

Expand Down
32 changes: 32 additions & 0 deletions x-pack/filebeat/docs/inputs/input-websocket.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,38 @@ program: |
})
----

[[input-initializer-program-websocket]]
[float]
==== `input_initializer_program`

The input initializer program is executed once when the input is started. This program is useful for initializing the input URL with custom values. It shares the same state with the main `CEL program` and hence can utilize cursor values or other state defined values to customize the URL at runtime. This program must always
set the `url` field value and has to be wrapped inside `curly braces` as shown in the example.
ShourieG marked this conversation as resolved.
Show resolved Hide resolved

["source","yaml",subs="attributes"]
----
url: ws://testapi:443/v1/streamresults
state:
initial_start_time: "2022-01-01T00:00:00Z"
input_initializer_program: |
{
"url" : (
has(state.cursor) && has(state.cursor.since) ?
state.url+"?since="+ state.cursor.since
:
state.url+"?since="+ state.initial_start_time
)
}
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
program: |
bytes(state.response).decode_json().as(inner_body,{
"events": {
"message": inner_body.encode_json(),
},
"cursor": {
"since": inner_body.timestamp
}
})
----

[[state-websocket]]
[float]
==== `state`
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/websocket/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
)

type config struct {
// InputInitializerProgram is the CEL program to be run once before to prep the url.
InputInitializerProgram string `config:"input_initializer_program"`
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
// Program is the CEL program to be run for each polling.
Program string `config:"program"`
// Regexps is the set of regular expression to be made
Expand Down
41 changes: 40 additions & 1 deletion x-pack/filebeat/input/websocket/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,15 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
state["cursor"] = cursor
}

// initialize the input url with the help of the input_initializer_program.
url, err := i.initializeInputURL(ctx, state, log)
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
metrics.errorsTotal.Inc()
return err
}

// websocket client
headers := formHeader(cfg)
url := cfg.URL.String()
c, resp, err := websocket.DefaultDialer.DialContext(ctx, url, headers)
if resp != nil && resp.Body != nil {
log.Debugw("websocket connection response", "body", resp.Body)
Expand Down Expand Up @@ -150,6 +156,39 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
}
}

// initializeInputURL initializes the input with the help of the input_initializer_program.
func (i input) initializeInputURL(ctx context.Context, state map[string]interface{}, log *logp.Logger) (string, error) {
var url string
cfg := i.cfg
if cfg.InputInitializerProgram != "" {
state["url"] = cfg.URL.String()
// CEL program which is used to prime/initialize the input url
input_initializer_prg, ast, err := newProgram(ctx, cfg.InputInitializerProgram, root, nil, log)
if err != nil {
return url, err
}

log.Debugw("cel engine state before input_initializer_eval", logp.Namespace("websocket"), "state", redactor{state: state, cfg: cfg.Redact})
start := i.now().In(time.UTC)
state, err := evalWith(ctx, input_initializer_prg, ast, state, start)
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
log.Debugw("cel engine state after input_initializer_eval", logp.Namespace("websocket"), "state", redactor{state: state, cfg: cfg.Redact})
if err != nil {
log.Errorw("failed input_initializer evaluation", "error", err)
return url, err
}

if u, ok := state["url"].(string); ok {
url = u
} else {
return url, fmt.Errorf("unexpected type returned for evaluation url: %T", state["url"])
}
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
delete(state, "url")
} else {
url = cfg.URL.String()
}
return url, nil
}

// processAndPublishData processes the data in state, updates the cursor and publishes it to the publisher.
// the CEL program here only executes a single time, since the websocket connection is persistent and events are received and processed in real time.
func (i *input) processAndPublishData(ctx context.Context, metrics *inputMetrics, prg cel.Program, ast *cel.Ast,
Expand Down
162 changes: 161 additions & 1 deletion x-pack/filebeat/input/websocket/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/http/httptest"
"reflect"
"strings"
"sync"
"testing"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

//nolint:gosec // These are test tokens and are not used in production code.
const (
basicToken = "dXNlcjpwYXNz"
bearerToken = "BXNlcjpwYVVz"
Expand Down Expand Up @@ -399,6 +401,164 @@ var inputTests = []struct {
},
}

var inputInitializerTests = []struct {
name string
config map[string]interface{}
time func() time.Time
want string
wantErr error
}{
{
name: "cursor based url modification",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"input_initializer_program": `
{
"url" : (
has(state.cursor) && has(state.cursor.since) ?
state.url+"?since="+ state.cursor.since
:
state.url
)
}`,
"state": map[string]interface{}{
"cursor": map[string]interface{}{
"since": "2017-08-17T14:54:12",
},
},
},
want: "ws://testapi/getresults?since=2017-08-17T14:54:12",
},
{
name: "url modification with no cursor",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"input_initializer_program": `
{
"url" : (
has(state.cursor) && has(state.cursor.since) ?
state.url+"?since="+ state.cursor.since
:
state.url+"?since="+ state.initial_start_time
)
}`,
"state": map[string]interface{}{
"initial_start_time": "2022-01-01T00:00:00Z",
},
},
want: "ws://testapi/getresults?since=2022-01-01T00:00:00Z",
},
{
name: "missing state variable",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"input_initializer_program": `
{
"url" : (
has(state.cursor) && has(state.cursor.since) ?
state.url+"?since="+ state.cursor.since
:
state.url+"?since="+ state.start_time
)
}`,
"state": map[string]interface{}{
"initial_start_time": "2022-01-01T00:00:00Z",
},
},
wantErr: fmt.Errorf("failed eval: ERROR: <input>:4:51: no such key: start_time"),
},
{
name: "missing cursor variable",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"input_initializer_program": `
{
"url" : (
has(state.cursor) ?
state.url+"?since="+ state.cursor.since
:
state.url+"?since="+ state.initial_start_time
)
}`,
"state": map[string]interface{}{
"cursor": map[string]interface{}{
"timestamp": "2017-08-17T14:54:12",
},
"initial_start_time": "2022-01-01T00:00:00Z",
},
},
wantErr: fmt.Errorf("failed eval: ERROR: <input>:4:24: no such key: since"),
},
{
name: "missing curly braces in program definition",
config: map[string]interface{}{
"url": "ws://testapi/getresults",
"input_initializer_program": `
"url" : (
has(state.cursor) && has(state.cursor.since) ?
state.url+"?since="+ state.cursor.since
:
state.url+"?since="+ state.initial_start_time
)`,
"state": map[string]interface{}{
"cursor": map[string]interface{}{
"since": "2017-08-17T14:54:12",
},
"initial_start_time": "2022-01-01T00:00:00Z",
},
},
wantErr: fmt.Errorf("failed compilation: ERROR: <input>:2:11: Syntax error: mismatched input ':' expecting <EOF>"),
},
}

func TestInputInitializer(t *testing.T) {
// tests will ignore context cancelled errors, since they are expected
ctxCancelledError := fmt.Errorf("context canceled")
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
logp.TestingSetup()
for _, test := range inputInitializerTests {
t.Run(test.name, func(t *testing.T) {

cfg := conf.MustNewConfigFrom(test.config)

conf := config{}
conf.Redact = &redact{}
err := cfg.Unpack(&conf)
if err != nil {
if test.wantErr != nil {
assert.EqualError(t, err, test.wantErr.Error())
return
}
t.Fatalf("unexpected error unpacking config: %v", err)
}

name := input{}.Name()
if name != "websocket" {
t.Errorf(`unexpected input name: got:%q want:"websocket"`, name)
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

var state map[string]interface{}
if conf.State == nil {
state = make(map[string]interface{})
} else {
state = conf.State
}

response, err := input{test.time, conf}.initializeInputURL(ctx, state, logp.NewLogger("websocket_input_initializer_test"))
if (fmt.Sprint(err) != fmt.Sprint(ctxCancelledError)) && (strings.Split(fmt.Sprint(err), "\n")[0] != fmt.Sprint(test.wantErr)) {
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
t.Errorf("unexpected error from running input: got:%v want:%v", err, test.wantErr)
}
if test.wantErr != nil {
return
}

assert.Equal(t, test.want, response)
})
}
}

func TestInput(t *testing.T) {
// tests will ignore context cancelled errors, since they are expected
ctxCancelledError := fmt.Errorf("context canceled")
Expand Down Expand Up @@ -432,7 +592,7 @@ func TestInput(t *testing.T) {
t.Fatalf("unexpected error running test: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ShourieG marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()

v2Ctx := v2.Context{
Expand Down
Loading