diff --git a/x-pack/filebeat/input/websocket/input.go b/x-pack/filebeat/input/websocket/input.go index 69810edd944..b4c1b840866 100644 --- a/x-pack/filebeat/input/websocket/input.go +++ b/x-pack/filebeat/input/websocket/input.go @@ -5,15 +5,18 @@ package websocket import ( + "bytes" "context" "errors" "fmt" + "io" "net/url" "reflect" "time" "github.com/google/cel-go/cel" "github.com/gorilla/websocket" + "go.uber.org/zap/zapcore" "google.golang.org/protobuf/types/known/structpb" v2 "github.com/elastic/beats/v7/filebeat/input/v2" @@ -109,7 +112,15 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p headers := formHeader(cfg) c, resp, err := websocket.DefaultDialer.DialContext(ctx, url, headers) if resp != nil && resp.Body != nil { - log.Debugw("websocket connection response", "body", resp.Body) + var buf bytes.Buffer + if log.Core().Enabled(zapcore.DebugLevel) { + const limit = 1e4 + io.CopyN(&buf, resp.Body, limit) + } + if n, _ := io.Copy(io.Discard, resp.Body); n != 0 && buf.Len() != 0 { + buf.WriteString("... truncated") + } + log.Debugw("websocket connection response", "body", &buf) resp.Body.Close() } if err != nil { @@ -119,41 +130,26 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p } defer c.Close() - done := make(chan error) - - go func() { - defer close(done) - for { - _, message, err := c.ReadMessage() - if err != nil { - metrics.errorsTotal.Inc() - if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { - log.Errorw("websocket connection closed", "error", err) - } else { - log.Errorw("failed to read websocket data", "error", err) - } - done <- err - return - } - metrics.receivedBytesTotal.Add(uint64(len(message))) - state["response"] = message - log.Debugw("received websocket message", logp.Namespace("websocket"), string(message)) - err = i.processAndPublishData(ctx, metrics, prg, ast, state, cursor, pub, log) - if err != nil { - metrics.errorsTotal.Inc() - log.Errorw("failed to process and publish data", "error", err) - done <- err - return + for { + _, message, err := c.ReadMessage() + if err != nil { + metrics.errorsTotal.Inc() + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + log.Errorw("websocket connection closed", "error", err) + } else { + log.Errorw("failed to read websocket data", "error", err) } + return err + } + metrics.receivedBytesTotal.Add(uint64(len(message))) + state["response"] = message + log.Debugw("received websocket message", logp.Namespace("websocket"), string(message)) + err = i.processAndPublishData(ctx, metrics, prg, ast, state, cursor, pub, log) + if err != nil { + metrics.errorsTotal.Inc() + log.Errorw("failed to process and publish data", "error", err) + return err } - }() - - // blocks until done is closed, context is cancelled or an error is received - select { - case err := <-done: - return err - case <-ctx.Done(): - return ctx.Err() } }