From 0a7972c185ba032f25eb36dafa8681f7a9b4b81e Mon Sep 17 00:00:00 2001 From: P1llus Date: Tue, 5 May 2020 06:51:43 +0200 Subject: [PATCH 01/27] MVP for http input --- x-pack/filebeat/include/list.go | 1 + x-pack/filebeat/input/httpinput/config.go | 41 ++++ x-pack/filebeat/input/httpinput/input.go | 269 ++++++++++++++++++++++ 3 files changed, 311 insertions(+) create mode 100644 x-pack/filebeat/input/httpinput/config.go create mode 100644 x-pack/filebeat/input/httpinput/input.go diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index 25a600616c9f..1279857f0524 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -11,6 +11,7 @@ import ( _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/googlepubsub" + _ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpinput" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" diff --git a/x-pack/filebeat/input/httpinput/config.go b/x-pack/filebeat/input/httpinput/config.go new file mode 100644 index 000000000000..e5ec9fd2ccc9 --- /dev/null +++ b/x-pack/filebeat/input/httpinput/config.go @@ -0,0 +1,41 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpinput + +// Config contains information about httpjson configuration +type config struct { + UseSSL bool `config:"ssl"` + SSLCertificate string `config:"ssl_certificate"` + SSLKey string `config:"ssl_key"` + SSLCA string `config:"ssl_certificate_authorities"` + BasicAuth bool `config:"basic_auth"` + Username string `config:"username"` + Password string `config:"password"` + ResponseCode int `config:"response_code"` + ResponseBody string `config:"response_body"` + ResponseHeaders string `config:"response_headers"` + ListenAddress string `config:"listen_address"` + ListenPort string `config:"listen_port"` + URL string `config:"url"` + Prefix string `config:"prefix"` +} + +func defaultConfig() config { + var c config + c.UseSSL = false + c.SSLCertificate = "/home/marius/go/src/github.com/P1llus/beats/x-pack/filebeat/server.crt" + c.SSLKey = "/home/marius/go/src/github.com/P1llus/beats/x-pack/filebeat/server.key" + c.SSLCA = "" + c.BasicAuth = false + c.Username = "" + c.Password = "" + c.ResponseCode = 200 + c.ResponseBody = `{"message": "success"}` + c.ResponseHeaders = `{"Content-Type"=>"application/json"}` + c.ListenAddress = "" + c.ListenPort = "8000" + c.URL = "/" + return c +} diff --git a/x-pack/filebeat/input/httpinput/input.go b/x-pack/filebeat/input/httpinput/input.go new file mode 100644 index 000000000000..622e0f41774c --- /dev/null +++ b/x-pack/filebeat/input/httpinput/input.go @@ -0,0 +1,269 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpinput + +import ( + "context" + "encoding/json" + "io/ioutil" + "net/http" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/filebeat/channel" + "github.com/elastic/beats/v7/filebeat/input" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +const ( + inputName = "httpinput" +) + +func init() { + err := input.Register(inputName, NewInput) + if err != nil { + panic(errors.Wrapf(err, "failed to register %v input", inputName)) + } +} + +type HttpInput struct { + config + log *logp.Logger + outlet channel.Outleter // Output of received messages. + inputCtx context.Context // Wraps the Done channel from parent input.Context. + + workerCtx context.Context // Worker goroutine context. It's cancelled when the input stops or the worker exits. + workerCancel context.CancelFunc // Used to signal that the worker should stop. + workerOnce sync.Once // Guarantees that the worker goroutine is only started once. + workerWg sync.WaitGroup // Waits on worker goroutine. + httpServer *http.Server // The currently running HTTP instance + httpMux *http.ServeMux // Current HTTP Handler + httpRequest http.Request // Current Request + httpResponse http.ResponseWriter // Current ResponseWriter +} + +// NewInput creates a new httpjson input +func NewInput( + cfg *common.Config, + connector channel.Connector, + inputContext input.Context, +) (input.Input, error) { + // Extract and validate the input's configuration. + conf := defaultConfig() + if err := cfg.Unpack(&conf); err != nil { + return nil, err + } + // Build outlet for events. + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: inputContext.DynamicFields, + }, + }) + if err != nil { + return nil, err + } + + // Wrap input.Context's Done channel with a context.Context. This goroutine + // stops with the parent closes the Done channel. + inputCtx, cancelInputCtx := context.WithCancel(context.Background()) + go func() { + defer cancelInputCtx() + select { + case <-inputContext.Done: + case <-inputCtx.Done(): + } + }() + + // If the input ever needs to be made restartable, then context would need + // to be recreated with each restart. + workerCtx, workerCancel := context.WithCancel(inputCtx) + + in := &HttpInput{ + config: conf, + log: logp.NewLogger("httpinput"), + outlet: out, + inputCtx: inputCtx, + workerCtx: workerCtx, + workerCancel: workerCancel, + } + + in.log.Info("Initialized httpinput input.") + return in, nil +} + +// Run starts the input worker then returns. Only the first invocation +// will ever start the worker. +func (in *HttpInput) Run() { + in.workerOnce.Do(func() { + in.workerWg.Add(1) + go func() { + in.log.Info("httpinput worker has started.") + defer in.log.Info("httpinput worker has stopped.") + defer in.workerWg.Done() + defer in.workerCancel() + if err := in.run(); err != nil { + in.log.Error(err) + return + } + }() + }) +} + +func (in *HttpInput) run() error { + var err error + // Create worker context + ctx, cancel := context.WithCancel(in.workerCtx) + defer cancel() + + // Initialize the HTTP server + err = in.createServer() + + if err != nil && err != http.ErrServerClosed { + in.log.Fatalf("HTTP Server could not start, error: %v", err) + } + + // Infinite Loop waiting for agent to stop + for { + select { + case <-ctx.Done(): + return nil + } + } + return err +} + +// Stop stops the misp input and waits for it to fully stop. +func (in *HttpInput) Stop() { + in.httpServer.Shutdown(in.workerCtx) + in.workerCancel() + in.workerWg.Wait() +} + +// Wait is an alias for Stop. +func (in *HttpInput) Wait() { + in.Stop() +} + +// Create a response to the request +func (in *HttpInput) apiResponse(w http.ResponseWriter, r *http.Request) { + + // Storing for validation + in.httpRequest = *r + in.httpResponse = w + + // Validates request, writes response directly on error. + objmap := in.validateRequest() + + if objmap == nil || len(objmap) == 0 { + in.log.Error("Request could not be processed") + return + } + ok := in.outlet.OnEvent(beat.Event{ + Timestamp: time.Now().UTC(), + Fields: common.MapStr{ + "message": "testing", + in.config.Prefix: objmap, + }, + }) + + if !ok { + return + } + + // On success, returns the configured response parameters + w.Write([]byte(in.config.ResponseBody)) + w.WriteHeader(in.config.ResponseCode) +} + +func (in *HttpInput) createServer() error { + // Merge listening address and port + var address strings.Builder + address.WriteString(in.config.ListenAddress + ":" + in.config.ListenPort) + + in.httpMux = http.NewServeMux() + in.httpMux.HandleFunc(in.config.URL, in.apiResponse) + + if in.config.UseSSL == true { + in.httpServer = &http.Server{Addr: address.String(), Handler: in.httpMux} + return in.httpServer.ListenAndServeTLS(in.config.SSLCertificate, in.config.SSLKey) + } + if in.config.UseSSL == false { + in.httpServer = &http.Server{Addr: address.String(), Handler: in.httpMux} + return in.httpServer.ListenAndServe() + } + return errors.New("SSL settings missing") +} + +func (in *HttpInput) validateRequest() map[string]interface{} { + // Check auth settings and credentials + if in.config.BasicAuth == true { + if in.config.Username == "" || in.config.Password == "" { + in.log.Fatal("Username and password required when basicauth is enabled") + return nil + } + + username, password, _ := in.httpRequest.BasicAuth() + if in.config.Username != username || in.config.Password != password { + in.httpResponse.WriteHeader(http.StatusUnauthorized) + in.httpResponse.Write([]byte(`{"message": "Incorrect username or password"}`)) + return nil + } + } + + // Only allow POST requests + if in.httpRequest.Method != http.MethodPost { + in.httpResponse.WriteHeader(http.StatusMethodNotAllowed) + in.httpResponse.Write([]byte(`{"message": "only post request supported"}`)) + return nil + } + + // Only allow JSON + if in.httpRequest.Header.Get("Content-Type") != "application/json" { + in.httpResponse.WriteHeader(http.StatusUnsupportedMediaType) + in.httpResponse.Write([]byte(`{"message": "wrong content-type header"}`)) + return nil + } + + // Only accept JSON in return + if in.httpRequest.Header.Get("Accept") != "application/json" { + in.httpResponse.WriteHeader(http.StatusNotAcceptable) + in.httpResponse.Write([]byte(`{"message": "wrong accept header"}`)) + return nil + } + + if in.httpRequest.Body == http.NoBody { + in.httpResponse.WriteHeader(http.StatusNotAcceptable) + in.httpResponse.Write([]byte(`{"message": "empty body"}`)) + return nil + } + + // Write full []byte to string + body, err := ioutil.ReadAll(in.httpRequest.Body) + + // If body cannot be read + if err != nil { + in.httpResponse.WriteHeader(http.StatusInternalServerError) + in.httpResponse.Write([]byte(`{"message": "failure"}`)) + return nil + } + + // Declare interface for request body + objmap := make(map[string]interface{}) + + err = json.Unmarshal(body, &objmap) + + // If body can be read, but not converted to JSON + if err != nil { + in.httpResponse.WriteHeader(http.StatusBadRequest) + in.httpResponse.Write([]byte(`{"message": "malformed JSON body"}`)) + return nil + } + return objmap +} From c9852081f2ac11af352b805502f602249c42e7cf Mon Sep 17 00:00:00 2001 From: P1llus Date: Tue, 5 May 2020 06:56:05 +0200 Subject: [PATCH 02/27] adding temp error message on event send failure --- x-pack/filebeat/input/httpinput/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/httpinput/input.go b/x-pack/filebeat/input/httpinput/input.go index 622e0f41774c..28a10651d552 100644 --- a/x-pack/filebeat/input/httpinput/input.go +++ b/x-pack/filebeat/input/httpinput/input.go @@ -174,7 +174,7 @@ func (in *HttpInput) apiResponse(w http.ResponseWriter, r *http.Request) { }) if !ok { - return + in.log.Error("Failed to send event") } // On success, returns the configured response parameters From 5102356e1bedeb8ac7e7b79a36ae54eb9f4c740f Mon Sep 17 00:00:00 2001 From: P1llus Date: Tue, 5 May 2020 07:10:02 +0200 Subject: [PATCH 03/27] updated comment that was there from old input --- x-pack/filebeat/input/httpinput/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/httpinput/input.go b/x-pack/filebeat/input/httpinput/input.go index 28a10651d552..0553f73737e5 100644 --- a/x-pack/filebeat/input/httpinput/input.go +++ b/x-pack/filebeat/input/httpinput/input.go @@ -139,7 +139,7 @@ func (in *HttpInput) run() error { return err } -// Stop stops the misp input and waits for it to fully stop. +// Stops HTTP input and waits for it to finish func (in *HttpInput) Stop() { in.httpServer.Shutdown(in.workerCtx) in.workerCancel() From 7c824c9e6851b16a34154251e4338e1cc81d6825 Mon Sep 17 00:00:00 2001 From: P1llus Date: Tue, 5 May 2020 16:02:08 +0200 Subject: [PATCH 04/27] modify config --- x-pack/filebeat/input/httpinput/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/httpinput/config.go b/x-pack/filebeat/input/httpinput/config.go index e5ec9fd2ccc9..da4887390ba5 100644 --- a/x-pack/filebeat/input/httpinput/config.go +++ b/x-pack/filebeat/input/httpinput/config.go @@ -33,7 +33,7 @@ func defaultConfig() config { c.Password = "" c.ResponseCode = 200 c.ResponseBody = `{"message": "success"}` - c.ResponseHeaders = `{"Content-Type"=>"application/json"}` + c.ResponseHeaders = `{"Content-Type": "application/json"}` c.ListenAddress = "" c.ListenPort = "8000" c.URL = "/" From c94b6b91911240cb50e5b94cdd0c9154dba8119a Mon Sep 17 00:00:00 2001 From: P1llus Date: Wed, 6 May 2020 13:08:46 +0200 Subject: [PATCH 05/27] changing default config --- x-pack/filebeat/input/httpinput/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/httpinput/config.go b/x-pack/filebeat/input/httpinput/config.go index da4887390ba5..a4eae23787e7 100644 --- a/x-pack/filebeat/input/httpinput/config.go +++ b/x-pack/filebeat/input/httpinput/config.go @@ -25,8 +25,8 @@ type config struct { func defaultConfig() config { var c config c.UseSSL = false - c.SSLCertificate = "/home/marius/go/src/github.com/P1llus/beats/x-pack/filebeat/server.crt" - c.SSLKey = "/home/marius/go/src/github.com/P1llus/beats/x-pack/filebeat/server.key" + c.SSLCertificate = "" + c.SSLKey = "" c.SSLCA = "" c.BasicAuth = false c.Username = "" From aaafd65ed87940ead55e1ddfce592193e28958d6 Mon Sep 17 00:00:00 2001 From: P1llus Date: Thu, 7 May 2020 20:16:49 +0200 Subject: [PATCH 06/27] cleaning up the code and refactor checks to its own functions --- x-pack/filebeat/input/httpinput/config.go | 1 + x-pack/filebeat/input/httpinput/input.go | 176 +++++++++++++++------- 2 files changed, 120 insertions(+), 57 deletions(-) diff --git a/x-pack/filebeat/input/httpinput/config.go b/x-pack/filebeat/input/httpinput/config.go index a4eae23787e7..bd0bce4109e2 100644 --- a/x-pack/filebeat/input/httpinput/config.go +++ b/x-pack/filebeat/input/httpinput/config.go @@ -37,5 +37,6 @@ func defaultConfig() config { c.ListenAddress = "" c.ListenPort = "8000" c.URL = "/" + c.Prefix = "json" return c } diff --git a/x-pack/filebeat/input/httpinput/input.go b/x-pack/filebeat/input/httpinput/input.go index 0553f73737e5..6b6f111ddd9a 100644 --- a/x-pack/filebeat/input/httpinput/input.go +++ b/x-pack/filebeat/input/httpinput/input.go @@ -12,6 +12,7 @@ import ( "strings" "sync" "time" + "fmt" "github.com/pkg/errors" @@ -47,6 +48,7 @@ type HttpInput struct { httpMux *http.ServeMux // Current HTTP Handler httpRequest http.Request // Current Request httpResponse http.ResponseWriter // Current ResponseWriter + eventObject *map[string]interface{} // Current event object } // NewInput creates a new httpjson input @@ -151,109 +153,150 @@ func (in *HttpInput) Wait() { in.Stop() } +func (in *HttpInput) createServer() error { + // Merge listening address and port + var address strings.Builder + address.WriteString(in.config.ListenAddress + ":" + in.config.ListenPort) + + in.httpMux = http.NewServeMux() + in.httpMux.HandleFunc(in.config.URL, in.apiResponse) + + if in.config.UseSSL == true { + in.httpServer = &http.Server{Addr: address.String(), Handler: in.httpMux} + return in.httpServer.ListenAndServeTLS(in.config.SSLCertificate, in.config.SSLKey) + } + if in.config.UseSSL == false { + in.httpServer = &http.Server{Addr: address.String(), Handler: in.httpMux} + return in.httpServer.ListenAndServe() + } + return errors.New("SSL settings missing") +} + // Create a response to the request func (in *HttpInput) apiResponse(w http.ResponseWriter, r *http.Request) { + var err string + var status uint // Storing for validation in.httpRequest = *r in.httpResponse = w // Validates request, writes response directly on error. - objmap := in.validateRequest() + status, err = in.createEvent() - if objmap == nil || len(objmap) == 0 { - in.log.Error("Request could not be processed") + if err != "" || status != 0 { + in.sendResponse(status, err) return } + + // On success, returns the configured response parameters + in.sendResponse(http.StatusOK, in.config.ResponseBody) +} + +func (in *HttpInput) createEvent() (uint, string) { + var err string + var status uint + + status, err = in.validateRequest() + + // Check if any of the validations failed, and if so, return them + if err != "" || status != 0 { + return status, err + } + + // Create the event ok := in.outlet.OnEvent(beat.Event{ Timestamp: time.Now().UTC(), Fields: common.MapStr{ "message": "testing", - in.config.Prefix: objmap, + in.config.Prefix: in.eventObject, }, }) + // If event cannot be sent if !ok { - in.log.Error("Failed to send event") + return http.StatusInternalServerError, in.createErrorMessage("unable to send event") } - // On success, returns the configured response parameters - w.Write([]byte(in.config.ResponseBody)) - w.WriteHeader(in.config.ResponseCode) + return 0, "" } -func (in *HttpInput) createServer() error { - // Merge listening address and port - var address strings.Builder - address.WriteString(in.config.ListenAddress + ":" + in.config.ListenPort) - - in.httpMux = http.NewServeMux() - in.httpMux.HandleFunc(in.config.URL, in.apiResponse) +func (in *HttpInput) validateRequest() (uint, string) { + // Only allow POST requests + var err string + var status uint - if in.config.UseSSL == true { - in.httpServer = &http.Server{Addr: address.String(), Handler: in.httpMux} - return in.httpServer.ListenAndServeTLS(in.config.SSLCertificate, in.config.SSLKey) + // Check auth settings and credentials + if in.config.BasicAuth == true { + status, err = in.validateAuth() } - if in.config.UseSSL == false { - in.httpServer = &http.Server{Addr: address.String(), Handler: in.httpMux} - return in.httpServer.ListenAndServe() + + if err != "" && status != 0 { + return status, err } - return errors.New("SSL settings missing") -} -func (in *HttpInput) validateRequest() map[string]interface{} { - // Check auth settings and credentials - if in.config.BasicAuth == true { - if in.config.Username == "" || in.config.Password == "" { - in.log.Fatal("Username and password required when basicauth is enabled") - return nil - } + // Validate headers + status, err = in.validateHeader() - username, password, _ := in.httpRequest.BasicAuth() - if in.config.Username != username || in.config.Password != password { - in.httpResponse.WriteHeader(http.StatusUnauthorized) - in.httpResponse.Write([]byte(`{"message": "Incorrect username or password"}`)) - return nil - } + if err != "" && status != 0 { + return status, err } - // Only allow POST requests - if in.httpRequest.Method != http.MethodPost { - in.httpResponse.WriteHeader(http.StatusMethodNotAllowed) - in.httpResponse.Write([]byte(`{"message": "only post request supported"}`)) - return nil + // Validate body + status, err = in.validateBody() + + if err != "" && status != 0 { + return status, err } + return 0, "" + +} + +func (in *HttpInput) validateHeader() (uint, string) { // Only allow JSON if in.httpRequest.Header.Get("Content-Type") != "application/json" { - in.httpResponse.WriteHeader(http.StatusUnsupportedMediaType) - in.httpResponse.Write([]byte(`{"message": "wrong content-type header"}`)) - return nil + return http.StatusUnsupportedMediaType, in.createErrorMessage("wrong content-type header, expecting application/json") } // Only accept JSON in return if in.httpRequest.Header.Get("Accept") != "application/json" { - in.httpResponse.WriteHeader(http.StatusNotAcceptable) - in.httpResponse.Write([]byte(`{"message": "wrong accept header"}`)) - return nil + return http.StatusNotAcceptable, in.createErrorMessage("wrong accept header, expecting application/json") + } + return 0, "" +} + +func (in *HttpInput) validateAuth() (uint, string) { + // Check if username or password is missing + if in.config.Username == "" || in.config.Password == "" { + return http.StatusUnauthorized, in.createErrorMessage("Username and password required when basicauth is enabled") + } + + // Check if username and password combination is correct + username, password, _ := in.httpRequest.BasicAuth() + if in.config.Username != username || in.config.Password != password { + return http.StatusUnauthorized, in.createErrorMessage("Incorrect username or password") } + return 0, "" +} + +func (in *HttpInput) validateBody() (uint, string) { + // Checks if body is empty if in.httpRequest.Body == http.NoBody { - in.httpResponse.WriteHeader(http.StatusNotAcceptable) - in.httpResponse.Write([]byte(`{"message": "empty body"}`)) - return nil + return http.StatusNotAcceptable, in.createErrorMessage("body can not be empty") } + // Write full []byte to string body, err := ioutil.ReadAll(in.httpRequest.Body) // If body cannot be read if err != nil { - in.httpResponse.WriteHeader(http.StatusInternalServerError) - in.httpResponse.Write([]byte(`{"message": "failure"}`)) - return nil + return http.StatusInternalServerError, in.createErrorMessage("unable to read body") } + // Declare interface for request body objmap := make(map[string]interface{}) @@ -261,9 +304,28 @@ func (in *HttpInput) validateRequest() map[string]interface{} { // If body can be read, but not converted to JSON if err != nil { - in.httpResponse.WriteHeader(http.StatusBadRequest) - in.httpResponse.Write([]byte(`{"message": "malformed JSON body"}`)) - return nil + return http.StatusBadRequest, in.createErrorMessage("malformed JSON body") } - return objmap + // Assign the current Unmarshaled object when no errors + in.eventObject = &objmap + + return 0, "" +} + +func (in *HttpInput) validateMethod() (uint, string) { + // Ensure HTTP method is POST + if in.httpRequest.Method != http.MethodPost { + return http.StatusMethodNotAllowed, in.createErrorMessage("only POST requests supported") + } + + return 0, "" +} + +func (in *HttpInput) createErrorMessage(r string ) string { + return fmt.Sprintf(`{"message": "%v"}`, r) +} + +func (in *HttpInput) sendResponse(h uint, b string) { + in.httpResponse.WriteHeader(int(h)) + in.httpResponse.Write([]byte(b)) } From 458dcb50b4beb69d53c7ae68df8dbf5990a7f52d Mon Sep 17 00:00:00 2001 From: P1llus Date: Thu, 7 May 2020 20:46:14 +0200 Subject: [PATCH 07/27] mage fmt --- x-pack/filebeat/input/httpinput/config.go | 2 +- x-pack/filebeat/input/httpinput/input.go | 32 +++++++++++------------ 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/x-pack/filebeat/input/httpinput/config.go b/x-pack/filebeat/input/httpinput/config.go index bd0bce4109e2..6fa25b58ddcd 100644 --- a/x-pack/filebeat/input/httpinput/config.go +++ b/x-pack/filebeat/input/httpinput/config.go @@ -19,7 +19,7 @@ type config struct { ListenAddress string `config:"listen_address"` ListenPort string `config:"listen_port"` URL string `config:"url"` - Prefix string `config:"prefix"` + Prefix string `config:"prefix"` } func defaultConfig() config { diff --git a/x-pack/filebeat/input/httpinput/input.go b/x-pack/filebeat/input/httpinput/input.go index 6b6f111ddd9a..749094d2ee1a 100644 --- a/x-pack/filebeat/input/httpinput/input.go +++ b/x-pack/filebeat/input/httpinput/input.go @@ -7,12 +7,12 @@ package httpinput import ( "context" "encoding/json" + "fmt" "io/ioutil" "net/http" "strings" "sync" "time" - "fmt" "github.com/pkg/errors" @@ -40,15 +40,15 @@ type HttpInput struct { outlet channel.Outleter // Output of received messages. inputCtx context.Context // Wraps the Done channel from parent input.Context. - workerCtx context.Context // Worker goroutine context. It's cancelled when the input stops or the worker exits. - workerCancel context.CancelFunc // Used to signal that the worker should stop. - workerOnce sync.Once // Guarantees that the worker goroutine is only started once. - workerWg sync.WaitGroup // Waits on worker goroutine. - httpServer *http.Server // The currently running HTTP instance - httpMux *http.ServeMux // Current HTTP Handler - httpRequest http.Request // Current Request - httpResponse http.ResponseWriter // Current ResponseWriter - eventObject *map[string]interface{} // Current event object + workerCtx context.Context // Worker goroutine context. It's cancelled when the input stops or the worker exits. + workerCancel context.CancelFunc // Used to signal that the worker should stop. + workerOnce sync.Once // Guarantees that the worker goroutine is only started once. + workerWg sync.WaitGroup // Waits on worker goroutine. + httpServer *http.Server // The currently running HTTP instance + httpMux *http.ServeMux // Current HTTP Handler + httpRequest http.Request // Current Request + httpResponse http.ResponseWriter // Current ResponseWriter + eventObject *map[string]interface{} // Current event object } // NewInput creates a new httpjson input @@ -203,13 +203,13 @@ func (in *HttpInput) createEvent() (uint, string) { if err != "" || status != 0 { return status, err } - + // Create the event ok := in.outlet.OnEvent(beat.Event{ Timestamp: time.Now().UTC(), Fields: common.MapStr{ - "message": "testing", - in.config.Prefix: in.eventObject, + "message": "testing", + in.config.Prefix: in.eventObject, }, }) @@ -243,7 +243,7 @@ func (in *HttpInput) validateRequest() (uint, string) { } // Validate body - status, err = in.validateBody() + status, err = in.validateBody() if err != "" && status != 0 { return status, err @@ -287,7 +287,6 @@ func (in *HttpInput) validateBody() (uint, string) { return http.StatusNotAcceptable, in.createErrorMessage("body can not be empty") } - // Write full []byte to string body, err := ioutil.ReadAll(in.httpRequest.Body) @@ -296,7 +295,6 @@ func (in *HttpInput) validateBody() (uint, string) { return http.StatusInternalServerError, in.createErrorMessage("unable to read body") } - // Declare interface for request body objmap := make(map[string]interface{}) @@ -321,7 +319,7 @@ func (in *HttpInput) validateMethod() (uint, string) { return 0, "" } -func (in *HttpInput) createErrorMessage(r string ) string { +func (in *HttpInput) createErrorMessage(r string) string { return fmt.Sprintf(`{"message": "%v"}`, r) } From f51160eba11a0f7377d6448a80d753998a70b01a Mon Sep 17 00:00:00 2001 From: P1llus Date: Sat, 9 May 2020 21:14:45 +0200 Subject: [PATCH 08/27] Changed name from httpinput to http_endpoint, moved http server to its own file, separated server creation and running, simplified context on run, changed to tlscommon for config --- x-pack/filebeat/include/list.go | 2 +- x-pack/filebeat/input/http_endpoint/config.go | 40 ++++ .../input/http_endpoint/httpserver.go | 81 +++++++ .../{httpinput => http_endpoint}/input.go | 201 +++++++----------- x-pack/filebeat/input/httpinput/config.go | 42 ---- 5 files changed, 203 insertions(+), 163 deletions(-) create mode 100644 x-pack/filebeat/input/http_endpoint/config.go create mode 100644 x-pack/filebeat/input/http_endpoint/httpserver.go rename x-pack/filebeat/input/{httpinput => http_endpoint}/input.go (62%) delete mode 100644 x-pack/filebeat/input/httpinput/config.go diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index afd5f2cc90f5..a5719d18fc73 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -11,7 +11,7 @@ import ( _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/googlepubsub" - _ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpinput" + _ "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go new file mode 100644 index 000000000000..7eac2752c7fb --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -0,0 +1,40 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpendpoint + +import ( + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" +) + +// Config contains information about httpjson configuration +type config struct { + TLS *tlscommon.ServerConfig `config:"ssl"` + ClientAuth bool `config:"ssl.client_auth"` + BasicAuth bool `config:"basic_auth"` + Username string `config:"username"` + Password string `config:"password"` + ResponseCode int `config:"response_code" validate:"positive"` + ResponseBody string `config:"response_body"` + ResponseHeaders string `config:"response_headers"` + ListenAddress string `config:"listen_address"` + ListenPort string `config:"listen_port"` + URL string `config:"url"` + Prefix string `config:"prefix"` +} + +func defaultConfig() config { + return config{ + BasicAuth: false, + Username: "", + Password: "", + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ResponseHeaders: `{"Content-Type": "application/json"}`, + ListenAddress: "127.0.0.1", + ListenPort: "8000", + URL: "/", + Prefix: "json", + } +} diff --git a/x-pack/filebeat/input/http_endpoint/httpserver.go b/x-pack/filebeat/input/http_endpoint/httpserver.go new file mode 100644 index 000000000000..84b6fd6aa1d3 --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/httpserver.go @@ -0,0 +1,81 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpendpoint + +import ( + "context" + "crypto/tls" + "fmt" + "net/http" + + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type HttpServer struct { + server *http.Server + ctx context.Context + stop context.CancelFunc + done chan struct{} +} + +func (h *HttpServer) Start() error { + go func() { + if h.server.TLSConfig != nil { + logp.Info("Starting HTTPS server on %s", h.server.Addr) + //certificate is already loaded. That's why the parameters are empty + err := h.server.ListenAndServeTLS("", "") + if err != nil && err != http.ErrServerClosed { + logp.Critical("Unable to start HTTPS server due to error: %v", err) + } + } else { + logp.Info("Starting HTTP server on %s", h.server.Addr) + err := h.server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + logp.Critical("Unable to start HTTP server due to error: %v", err) + } + } + }() + + return nil +} + +func (h *HttpServer) Stop() { + logp.Info("Stopping HTTP server") + close(h.done) + h.stop() + h.server.Shutdown(h.ctx) +} + +func createServer(in *HttpEndpoint) (*HttpServer, error) { + server := &http.Server{ + Addr: in.config.ListenAddress + ":" + in.config.ListenPort, + } + + http.HandleFunc(in.config.URL, in.apiResponse) + tlsConfig, err := tlscommon.LoadTLSServerConfig(in.config.TLS) + + if err != nil { + return nil, err + } + + if tlsConfig != nil { + server.TLSConfig = tlsConfig.BuildModuleConfig(in.config.ListenAddress) + if !in.config.ClientAuth { + server.TLSConfig.ClientAuth = tls.NoClientCert + } + } + + ctx, cancel := context.WithCancel(context.TODO()) + h := &HttpServer{ + done: make(chan struct{}), + ctx: ctx, + stop: cancel, + } + fmt.Printf("%+v\n", defaultConfig) + h.server = server + + return h, nil +} diff --git a/x-pack/filebeat/input/httpinput/input.go b/x-pack/filebeat/input/http_endpoint/input.go similarity index 62% rename from x-pack/filebeat/input/httpinput/input.go rename to x-pack/filebeat/input/http_endpoint/input.go index 749094d2ee1a..c0d2721e4cbc 100644 --- a/x-pack/filebeat/input/httpinput/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -2,15 +2,15 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package httpinput +package httpendpoint import ( + "bytes" "context" "encoding/json" "fmt" "io/ioutil" "net/http" - "strings" "sync" "time" @@ -24,7 +24,7 @@ import ( ) const ( - inputName = "httpinput" + inputName = "http_endpoint" ) func init() { @@ -34,7 +34,7 @@ func init() { } } -type HttpInput struct { +type HttpEndpoint struct { config log *logp.Logger outlet channel.Outleter // Output of received messages. @@ -44,8 +44,7 @@ type HttpInput struct { workerCancel context.CancelFunc // Used to signal that the worker should stop. workerOnce sync.Once // Guarantees that the worker goroutine is only started once. workerWg sync.WaitGroup // Waits on worker goroutine. - httpServer *http.Server // The currently running HTTP instance - httpMux *http.ServeMux // Current HTTP Handler + server *HttpServer // Server instance httpRequest http.Request // Current Request httpResponse http.ResponseWriter // Current ResponseWriter eventObject *map[string]interface{} // Current event object @@ -62,6 +61,7 @@ func NewInput( if err := cfg.Unpack(&conf); err != nil { return nil, err } + // Build outlet for events. out, err := connector.ConnectWith(cfg, beat.ClientConfig{ Processing: beat.ProcessingConfig{ @@ -87,164 +87,97 @@ func NewInput( // to be recreated with each restart. workerCtx, workerCancel := context.WithCancel(inputCtx) - in := &HttpInput{ + in := &HttpEndpoint{ config: conf, - log: logp.NewLogger("httpinput"), + log: logp.NewLogger(inputName), outlet: out, inputCtx: inputCtx, workerCtx: workerCtx, workerCancel: workerCancel, } - in.log.Info("Initialized httpinput input.") + // Create an instance of the HTTP server with the beat context + in.server, err = createServer(in) + if err != nil { + return nil, err + } + + in.log.Infof("Initialized %v input on %v:%v", inputName, in.config.ListenAddress, in.config.ListenPort) return in, nil } // Run starts the input worker then returns. Only the first invocation // will ever start the worker. -func (in *HttpInput) Run() { +func (in *HttpEndpoint) Run() { in.workerOnce.Do(func() { in.workerWg.Add(1) - go func() { - in.log.Info("httpinput worker has started.") - defer in.log.Info("httpinput worker has stopped.") - defer in.workerWg.Done() - defer in.workerCancel() - if err := in.run(); err != nil { - in.log.Error(err) - return - } - }() + go in.run() + in.workerWg.Done() }) } -func (in *HttpInput) run() error { - var err error - // Create worker context - ctx, cancel := context.WithCancel(in.workerCtx) - defer cancel() - - // Initialize the HTTP server - err = in.createServer() - - if err != nil && err != http.ErrServerClosed { - in.log.Fatalf("HTTP Server could not start, error: %v", err) +func (in *HttpEndpoint) run() { + defer in.log.Infof("%v worker has stopped.", inputName) + err := in.server.Start() + if err != nil { + return } - // Infinite Loop waiting for agent to stop - for { - select { - case <-ctx.Done(): - return nil - } - } - return err + return } // Stops HTTP input and waits for it to finish -func (in *HttpInput) Stop() { - in.httpServer.Shutdown(in.workerCtx) +func (in *HttpEndpoint) Stop() { in.workerCancel() in.workerWg.Wait() } // Wait is an alias for Stop. -func (in *HttpInput) Wait() { +func (in *HttpEndpoint) Wait() { in.Stop() } -func (in *HttpInput) createServer() error { - // Merge listening address and port - var address strings.Builder - address.WriteString(in.config.ListenAddress + ":" + in.config.ListenPort) - - in.httpMux = http.NewServeMux() - in.httpMux.HandleFunc(in.config.URL, in.apiResponse) - - if in.config.UseSSL == true { - in.httpServer = &http.Server{Addr: address.String(), Handler: in.httpMux} - return in.httpServer.ListenAndServeTLS(in.config.SSLCertificate, in.config.SSLKey) - } - if in.config.UseSSL == false { - in.httpServer = &http.Server{Addr: address.String(), Handler: in.httpMux} - return in.httpServer.ListenAndServe() - } - return errors.New("SSL settings missing") -} - -// Create a response to the request -func (in *HttpInput) apiResponse(w http.ResponseWriter, r *http.Request) { - var err string - var status uint - - // Storing for validation - in.httpRequest = *r - in.httpResponse = w - - // Validates request, writes response directly on error. - status, err = in.createEvent() - - if err != "" || status != 0 { - in.sendResponse(status, err) - return - } - - // On success, returns the configured response parameters - in.sendResponse(http.StatusOK, in.config.ResponseBody) -} - -func (in *HttpInput) createEvent() (uint, string) { +// Validates the incoming request and creates a new event upon success +func (in *HttpEndpoint) sendEvent() (uint, string) { var err string var status uint status, err = in.validateRequest() - - // Check if any of the validations failed, and if so, return them if err != "" || status != 0 { return status, err } - // Create the event - ok := in.outlet.OnEvent(beat.Event{ + event := in.outlet.OnEvent(beat.Event{ Timestamp: time.Now().UTC(), Fields: common.MapStr{ - "message": "testing", in.config.Prefix: in.eventObject, }, }) - - // If event cannot be sent - if !ok { + if !event { return http.StatusInternalServerError, in.createErrorMessage("unable to send event") } return 0, "" } -func (in *HttpInput) validateRequest() (uint, string) { - // Only allow POST requests +// Runs all validations for each request +func (in *HttpEndpoint) validateRequest() (uint, string) { var err string var status uint - // Check auth settings and credentials - if in.config.BasicAuth == true { + if in.config.BasicAuth { status, err = in.validateAuth() } - if err != "" && status != 0 { return status, err } - // Validate headers status, err = in.validateHeader() - if err != "" && status != 0 { return status, err } - // Validate body status, err = in.validateBody() - if err != "" && status != 0 { return status, err } @@ -253,26 +186,24 @@ func (in *HttpInput) validateRequest() (uint, string) { } -func (in *HttpInput) validateHeader() (uint, string) { - // Only allow JSON +// Validate that only supported Accept and Content type headers are used +func (in *HttpEndpoint) validateHeader() (uint, string) { if in.httpRequest.Header.Get("Content-Type") != "application/json" { return http.StatusUnsupportedMediaType, in.createErrorMessage("wrong content-type header, expecting application/json") } - // Only accept JSON in return if in.httpRequest.Header.Get("Accept") != "application/json" { return http.StatusNotAcceptable, in.createErrorMessage("wrong accept header, expecting application/json") } return 0, "" } -func (in *HttpInput) validateAuth() (uint, string) { - // Check if username or password is missing +// Validate if headers are current and authentication is successful +func (in *HttpEndpoint) validateAuth() (uint, string) { if in.config.Username == "" || in.config.Password == "" { return http.StatusUnauthorized, in.createErrorMessage("Username and password required when basicauth is enabled") } - // Check if username and password combination is correct username, password, _ := in.httpRequest.BasicAuth() if in.config.Username != username || in.config.Password != password { return http.StatusUnauthorized, in.createErrorMessage("Incorrect username or password") @@ -281,37 +212,36 @@ func (in *HttpInput) validateAuth() (uint, string) { return 0, "" } -func (in *HttpInput) validateBody() (uint, string) { - // Checks if body is empty +// Validates that body is not empty, not a list of objects and valid JSON +func (in *HttpEndpoint) validateBody() (uint, string) { + var isObject string if in.httpRequest.Body == http.NoBody { return http.StatusNotAcceptable, in.createErrorMessage("body can not be empty") } - // Write full []byte to string body, err := ioutil.ReadAll(in.httpRequest.Body) - - // If body cannot be read if err != nil { return http.StatusInternalServerError, in.createErrorMessage("unable to read body") } - // Declare interface for request body - objmap := make(map[string]interface{}) + isObject = in.isObjectOrList(body) + if isObject == "list" { + return http.StatusBadRequest, in.createErrorMessage("List of JSON objects is not supported") + } + objmap := make(map[string]interface{}) err = json.Unmarshal(body, &objmap) - - // If body can be read, but not converted to JSON if err != nil { return http.StatusBadRequest, in.createErrorMessage("malformed JSON body") } - // Assign the current Unmarshaled object when no errors + in.eventObject = &objmap return 0, "" } -func (in *HttpInput) validateMethod() (uint, string) { - // Ensure HTTP method is POST +// Ensure only valid HTTP Methods used +func (in *HttpEndpoint) validateMethod() (uint, string) { if in.httpRequest.Method != http.MethodPost { return http.StatusMethodNotAllowed, in.createErrorMessage("only POST requests supported") } @@ -319,11 +249,42 @@ func (in *HttpInput) validateMethod() (uint, string) { return 0, "" } -func (in *HttpInput) createErrorMessage(r string) string { +func (in *HttpEndpoint) createErrorMessage(r string) string { return fmt.Sprintf(`{"message": "%v"}`, r) } -func (in *HttpInput) sendResponse(h uint, b string) { +func (in *HttpEndpoint) isObjectOrList(b []byte) string { + obj := bytes.TrimLeft(b, " \t\r\n") + if len(obj) > 0 && obj[0] == '{' { + return "object" + } + + if len(obj) > 0 && obj[0] == '[' { + return "list" + } + + return "" +} + +func (in *HttpEndpoint) sendResponse(h uint, b string) { in.httpResponse.WriteHeader(int(h)) in.httpResponse.Write([]byte(b)) } + +// Validates incoming requests and sends a new event upon success +func (in *HttpEndpoint) apiResponse(w http.ResponseWriter, r *http.Request) { + var err string + var status uint + + in.httpRequest = *r + in.httpResponse = w + + // This triggers both validation and event creation + status, err = in.sendEvent() + if err != "" || status != 0 { + in.sendResponse(status, err) + return + } + + in.sendResponse(http.StatusOK, in.config.ResponseBody) +} diff --git a/x-pack/filebeat/input/httpinput/config.go b/x-pack/filebeat/input/httpinput/config.go deleted file mode 100644 index 6fa25b58ddcd..000000000000 --- a/x-pack/filebeat/input/httpinput/config.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package httpinput - -// Config contains information about httpjson configuration -type config struct { - UseSSL bool `config:"ssl"` - SSLCertificate string `config:"ssl_certificate"` - SSLKey string `config:"ssl_key"` - SSLCA string `config:"ssl_certificate_authorities"` - BasicAuth bool `config:"basic_auth"` - Username string `config:"username"` - Password string `config:"password"` - ResponseCode int `config:"response_code"` - ResponseBody string `config:"response_body"` - ResponseHeaders string `config:"response_headers"` - ListenAddress string `config:"listen_address"` - ListenPort string `config:"listen_port"` - URL string `config:"url"` - Prefix string `config:"prefix"` -} - -func defaultConfig() config { - var c config - c.UseSSL = false - c.SSLCertificate = "" - c.SSLKey = "" - c.SSLCA = "" - c.BasicAuth = false - c.Username = "" - c.Password = "" - c.ResponseCode = 200 - c.ResponseBody = `{"message": "success"}` - c.ResponseHeaders = `{"Content-Type": "application/json"}` - c.ListenAddress = "" - c.ListenPort = "8000" - c.URL = "/" - c.Prefix = "json" - return c -} From 1ccb3c2a5c0149c6a084c4dad69b39a487a46acf Mon Sep 17 00:00:00 2001 From: P1llus Date: Tue, 12 May 2020 10:51:47 +0200 Subject: [PATCH 09/27] updated code based on PR comments. Changed clientauth after confirming it is not enforced anymore, modified go routines around starting the HTTP server, fixed some typos and test code --- x-pack/filebeat/input/http_endpoint/config.go | 1 - .../input/http_endpoint/httpserver.go | 26 ++++++------------- x-pack/filebeat/input/http_endpoint/input.go | 23 +++++++--------- 3 files changed, 18 insertions(+), 32 deletions(-) diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 7eac2752c7fb..557a41bc8435 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -11,7 +11,6 @@ import ( // Config contains information about httpjson configuration type config struct { TLS *tlscommon.ServerConfig `config:"ssl"` - ClientAuth bool `config:"ssl.client_auth"` BasicAuth bool `config:"basic_auth"` Username string `config:"username"` Password string `config:"password"` diff --git a/x-pack/filebeat/input/http_endpoint/httpserver.go b/x-pack/filebeat/input/http_endpoint/httpserver.go index 84b6fd6aa1d3..6ab444ea01c2 100644 --- a/x-pack/filebeat/input/http_endpoint/httpserver.go +++ b/x-pack/filebeat/input/http_endpoint/httpserver.go @@ -6,8 +6,6 @@ package httpendpoint import ( "context" - "crypto/tls" - "fmt" "net/http" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" @@ -15,36 +13,33 @@ import ( ) type HttpServer struct { + log *logp.Logger server *http.Server ctx context.Context stop context.CancelFunc - done chan struct{} } -func (h *HttpServer) Start() error { +func (h *HttpServer) Start() { go func() { if h.server.TLSConfig != nil { - logp.Info("Starting HTTPS server on %s", h.server.Addr) + h.log.Infof("Starting HTTPS server on %s", h.server.Addr) //certificate is already loaded. That's why the parameters are empty err := h.server.ListenAndServeTLS("", "") if err != nil && err != http.ErrServerClosed { - logp.Critical("Unable to start HTTPS server due to error: %v", err) + h.log.Fatalf("Unable to start HTTPS server due to error: %v", err) } } else { - logp.Info("Starting HTTP server on %s", h.server.Addr) + h.log.Infof("Starting HTTP server on %s", h.server.Addr) err := h.server.ListenAndServe() if err != nil && err != http.ErrServerClosed { - logp.Critical("Unable to start HTTP server due to error: %v", err) + h.log.Fatalf("Unable to start HTTP server due to error: %v", err) } } }() - - return nil } func (h *HttpServer) Stop() { - logp.Info("Stopping HTTP server") - close(h.done) + h.log.Info("Stopping HTTP server") h.stop() h.server.Shutdown(h.ctx) } @@ -56,25 +51,20 @@ func createServer(in *HttpEndpoint) (*HttpServer, error) { http.HandleFunc(in.config.URL, in.apiResponse) tlsConfig, err := tlscommon.LoadTLSServerConfig(in.config.TLS) - if err != nil { return nil, err } if tlsConfig != nil { server.TLSConfig = tlsConfig.BuildModuleConfig(in.config.ListenAddress) - if !in.config.ClientAuth { - server.TLSConfig.ClientAuth = tls.NoClientCert - } } ctx, cancel := context.WithCancel(context.TODO()) h := &HttpServer{ - done: make(chan struct{}), ctx: ctx, stop: cancel, + log: logp.NewLogger("http_server"), } - fmt.Printf("%+v\n", defaultConfig) h.server = server return h, nil diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index c0d2721e4cbc..896009468217 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -112,16 +112,13 @@ func (in *HttpEndpoint) Run() { in.workerOnce.Do(func() { in.workerWg.Add(1) go in.run() - in.workerWg.Done() }) } func (in *HttpEndpoint) run() { + defer in.workerWg.Done() defer in.log.Infof("%v worker has stopped.", inputName) - err := in.server.Start() - if err != nil { - return - } + in.server.Start() return } @@ -154,7 +151,7 @@ func (in *HttpEndpoint) sendEvent() (uint, string) { }, }) if !event { - return http.StatusInternalServerError, in.createErrorMessage("unable to send event") + return http.StatusInternalServerError, in.createErrorMessage("Unable to send event") } return 0, "" @@ -189,11 +186,11 @@ func (in *HttpEndpoint) validateRequest() (uint, string) { // Validate that only supported Accept and Content type headers are used func (in *HttpEndpoint) validateHeader() (uint, string) { if in.httpRequest.Header.Get("Content-Type") != "application/json" { - return http.StatusUnsupportedMediaType, in.createErrorMessage("wrong content-type header, expecting application/json") + return http.StatusUnsupportedMediaType, in.createErrorMessage("Wrong Content-Type header, expecting application/json") } if in.httpRequest.Header.Get("Accept") != "application/json" { - return http.StatusNotAcceptable, in.createErrorMessage("wrong accept header, expecting application/json") + return http.StatusNotAcceptable, in.createErrorMessage("Wrong Accept header, expecting application/json") } return 0, "" } @@ -216,12 +213,12 @@ func (in *HttpEndpoint) validateAuth() (uint, string) { func (in *HttpEndpoint) validateBody() (uint, string) { var isObject string if in.httpRequest.Body == http.NoBody { - return http.StatusNotAcceptable, in.createErrorMessage("body can not be empty") + return http.StatusNotAcceptable, in.createErrorMessage("Body cannot be empty") } body, err := ioutil.ReadAll(in.httpRequest.Body) if err != nil { - return http.StatusInternalServerError, in.createErrorMessage("unable to read body") + return http.StatusInternalServerError, in.createErrorMessage("Unable to read body") } isObject = in.isObjectOrList(body) @@ -232,7 +229,7 @@ func (in *HttpEndpoint) validateBody() (uint, string) { objmap := make(map[string]interface{}) err = json.Unmarshal(body, &objmap) if err != nil { - return http.StatusBadRequest, in.createErrorMessage("malformed JSON body") + return http.StatusBadRequest, in.createErrorMessage("Malformed JSON body") } in.eventObject = &objmap @@ -243,7 +240,7 @@ func (in *HttpEndpoint) validateBody() (uint, string) { // Ensure only valid HTTP Methods used func (in *HttpEndpoint) validateMethod() (uint, string) { if in.httpRequest.Method != http.MethodPost { - return http.StatusMethodNotAllowed, in.createErrorMessage("only POST requests supported") + return http.StatusMethodNotAllowed, in.createErrorMessage("Only POST requests supported") } return 0, "" @@ -258,7 +255,7 @@ func (in *HttpEndpoint) isObjectOrList(b []byte) string { if len(obj) > 0 && obj[0] == '{' { return "object" } - + if len(obj) > 0 && obj[0] == '[' { return "list" } From c185b4862eaa0471bbc3f7d1bd045f141f19c965 Mon Sep 17 00:00:00 2001 From: P1llus Date: Tue, 12 May 2020 14:17:08 +0200 Subject: [PATCH 10/27] change packagename --- x-pack/filebeat/input/http_endpoint/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 896009468217..e7a65804a800 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package httpendpoint +package http_endpoint import ( "bytes" From 72a8652711c481f641b05636200307853a48c761 Mon Sep 17 00:00:00 2001 From: P1llus Date: Tue, 12 May 2020 14:18:38 +0200 Subject: [PATCH 11/27] change packagename --- x-pack/filebeat/input/http_endpoint/config.go | 2 +- x-pack/filebeat/input/http_endpoint/httpserver.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 557a41bc8435..10b967d657af 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package httpendpoint +package http_endpoint import ( "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" diff --git a/x-pack/filebeat/input/http_endpoint/httpserver.go b/x-pack/filebeat/input/http_endpoint/httpserver.go index 6ab444ea01c2..3a43f081a29f 100644 --- a/x-pack/filebeat/input/http_endpoint/httpserver.go +++ b/x-pack/filebeat/input/http_endpoint/httpserver.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package httpendpoint +package http_endpoint import ( "context" From 7db39a9d3d8effc18b9aae96b6178e62984c6a24 Mon Sep 17 00:00:00 2001 From: P1llus Date: Thu, 14 May 2020 09:25:57 +0200 Subject: [PATCH 12/27] updated code to be more idiomatic with comments from noemi --- x-pack/filebeat/input/http_endpoint/input.go | 49 +++++++++----------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index e7a65804a800..f0821da8c9b8 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -45,8 +45,6 @@ type HttpEndpoint struct { workerOnce sync.Once // Guarantees that the worker goroutine is only started once. workerWg sync.WaitGroup // Waits on worker goroutine. server *HttpServer // Server instance - httpRequest http.Request // Current Request - httpResponse http.ResponseWriter // Current ResponseWriter eventObject *map[string]interface{} // Current event object } @@ -135,11 +133,11 @@ func (in *HttpEndpoint) Wait() { } // Validates the incoming request and creates a new event upon success -func (in *HttpEndpoint) sendEvent() (uint, string) { +func (in *HttpEndpoint) sendEvent(w http.ResponseWriter, r *http.Request) (uint, string) { var err string var status uint - status, err = in.validateRequest() + status, err = in.validateRequest(w, r) if err != "" || status != 0 { return status, err } @@ -158,23 +156,23 @@ func (in *HttpEndpoint) sendEvent() (uint, string) { } // Runs all validations for each request -func (in *HttpEndpoint) validateRequest() (uint, string) { +func (in *HttpEndpoint) validateRequest(w http.ResponseWriter, r *http.Request) (uint, string) { var err string var status uint if in.config.BasicAuth { - status, err = in.validateAuth() + status, err = in.validateAuth(w, r) } if err != "" && status != 0 { return status, err } - status, err = in.validateHeader() + status, err = in.validateHeader(w, r) if err != "" && status != 0 { return status, err } - status, err = in.validateBody() + status, err = in.validateBody(w, r) if err != "" && status != 0 { return status, err } @@ -184,24 +182,24 @@ func (in *HttpEndpoint) validateRequest() (uint, string) { } // Validate that only supported Accept and Content type headers are used -func (in *HttpEndpoint) validateHeader() (uint, string) { - if in.httpRequest.Header.Get("Content-Type") != "application/json" { +func (in *HttpEndpoint) validateHeader(w http.ResponseWriter, r *http.Request) (uint, string) { + if r.Header.Get("Content-Type") != "application/json" { return http.StatusUnsupportedMediaType, in.createErrorMessage("Wrong Content-Type header, expecting application/json") } - if in.httpRequest.Header.Get("Accept") != "application/json" { + if r.Header.Get("Accept") != "application/json" { return http.StatusNotAcceptable, in.createErrorMessage("Wrong Accept header, expecting application/json") } return 0, "" } // Validate if headers are current and authentication is successful -func (in *HttpEndpoint) validateAuth() (uint, string) { +func (in *HttpEndpoint) validateAuth(w http.ResponseWriter, r *http.Request) (uint, string) { if in.config.Username == "" || in.config.Password == "" { return http.StatusUnauthorized, in.createErrorMessage("Username and password required when basicauth is enabled") } - username, password, _ := in.httpRequest.BasicAuth() + username, password, _ := r.BasicAuth() if in.config.Username != username || in.config.Password != password { return http.StatusUnauthorized, in.createErrorMessage("Incorrect username or password") } @@ -210,13 +208,13 @@ func (in *HttpEndpoint) validateAuth() (uint, string) { } // Validates that body is not empty, not a list of objects and valid JSON -func (in *HttpEndpoint) validateBody() (uint, string) { +func (in *HttpEndpoint) validateBody(w http.ResponseWriter, r *http.Request) (uint, string) { var isObject string - if in.httpRequest.Body == http.NoBody { + if r.Body == http.NoBody { return http.StatusNotAcceptable, in.createErrorMessage("Body cannot be empty") } - body, err := ioutil.ReadAll(in.httpRequest.Body) + body, err := ioutil.ReadAll(r.Body) if err != nil { return http.StatusInternalServerError, in.createErrorMessage("Unable to read body") } @@ -238,8 +236,8 @@ func (in *HttpEndpoint) validateBody() (uint, string) { } // Ensure only valid HTTP Methods used -func (in *HttpEndpoint) validateMethod() (uint, string) { - if in.httpRequest.Method != http.MethodPost { +func (in *HttpEndpoint) validateMethod(w http.ResponseWriter, r *http.Request) (uint, string) { + if r.Method != http.MethodPost { return http.StatusMethodNotAllowed, in.createErrorMessage("Only POST requests supported") } @@ -263,9 +261,9 @@ func (in *HttpEndpoint) isObjectOrList(b []byte) string { return "" } -func (in *HttpEndpoint) sendResponse(h uint, b string) { - in.httpResponse.WriteHeader(int(h)) - in.httpResponse.Write([]byte(b)) +func (in *HttpEndpoint) sendResponse(w http.ResponseWriter, h uint, b string) { + w.WriteHeader(int(h)) + w.Write([]byte(b)) } // Validates incoming requests and sends a new event upon success @@ -273,15 +271,12 @@ func (in *HttpEndpoint) apiResponse(w http.ResponseWriter, r *http.Request) { var err string var status uint - in.httpRequest = *r - in.httpResponse = w - // This triggers both validation and event creation - status, err = in.sendEvent() + status, err = in.sendEvent(w, r) if err != "" || status != 0 { - in.sendResponse(status, err) + in.sendResponse(w, status, err) return } - in.sendResponse(http.StatusOK, in.config.ResponseBody) + in.sendResponse(w, http.StatusOK, in.config.ResponseBody) } From 04be474d9e97b8f16c73db755c6d7cb851e04041 Mon Sep 17 00:00:00 2001 From: P1llus Date: Thu, 14 May 2020 09:41:59 +0200 Subject: [PATCH 13/27] removing variable declaration to be more idiomatic --- x-pack/filebeat/input/http_endpoint/input.go | 26 ++++++-------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index f0821da8c9b8..75b09570bb18 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -134,10 +134,7 @@ func (in *HttpEndpoint) Wait() { // Validates the incoming request and creates a new event upon success func (in *HttpEndpoint) sendEvent(w http.ResponseWriter, r *http.Request) (uint, string) { - var err string - var status uint - - status, err = in.validateRequest(w, r) + status, err := in.validateRequest(w, r) if err != "" || status != 0 { return status, err } @@ -157,17 +154,14 @@ func (in *HttpEndpoint) sendEvent(w http.ResponseWriter, r *http.Request) (uint, // Runs all validations for each request func (in *HttpEndpoint) validateRequest(w http.ResponseWriter, r *http.Request) (uint, string) { - var err string - var status uint - if in.config.BasicAuth { - status, err = in.validateAuth(w, r) - } - if err != "" && status != 0 { - return status, err + status, err := in.validateAuth(w, r) + if err != "" && status != 0 { + return status, err + } } - status, err = in.validateHeader(w, r) + status, err := in.validateHeader(w, r) if err != "" && status != 0 { return status, err } @@ -209,7 +203,6 @@ func (in *HttpEndpoint) validateAuth(w http.ResponseWriter, r *http.Request) (ui // Validates that body is not empty, not a list of objects and valid JSON func (in *HttpEndpoint) validateBody(w http.ResponseWriter, r *http.Request) (uint, string) { - var isObject string if r.Body == http.NoBody { return http.StatusNotAcceptable, in.createErrorMessage("Body cannot be empty") } @@ -219,7 +212,7 @@ func (in *HttpEndpoint) validateBody(w http.ResponseWriter, r *http.Request) (ui return http.StatusInternalServerError, in.createErrorMessage("Unable to read body") } - isObject = in.isObjectOrList(body) + isObject := in.isObjectOrList(body) if isObject == "list" { return http.StatusBadRequest, in.createErrorMessage("List of JSON objects is not supported") } @@ -268,11 +261,8 @@ func (in *HttpEndpoint) sendResponse(w http.ResponseWriter, h uint, b string) { // Validates incoming requests and sends a new event upon success func (in *HttpEndpoint) apiResponse(w http.ResponseWriter, r *http.Request) { - var err string - var status uint - // This triggers both validation and event creation - status, err = in.sendEvent(w, r) + status, err := in.sendEvent(w, r) if err != "" || status != 0 { in.sendResponse(w, status, err) return From aa27ac965dcffaf91ca6c24c67eaee489967a989 Mon Sep 17 00:00:00 2001 From: P1llus Date: Fri, 15 May 2020 10:58:30 +0200 Subject: [PATCH 14/27] adding basic test, currently not working --- .../tests/system/test_http_endpoint.py | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 x-pack/filebeat/tests/system/test_http_endpoint.py diff --git a/x-pack/filebeat/tests/system/test_http_endpoint.py b/x-pack/filebeat/tests/system/test_http_endpoint.py new file mode 100644 index 000000000000..d3322281156c --- /dev/null +++ b/x-pack/filebeat/tests/system/test_http_endpoint.py @@ -0,0 +1,35 @@ +import requests +import sys +import os + +sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../filebeat/tests/system/')) +from filebeat import BaseTest + + +class Test(BaseTest): + """ + Test filebeat with the http_endpoint input + """ + + def test_http_endpoint_without_ssl(self): + """ + Test http_endpoint input with HTTP events. + """ + host = "127.0.0.1" + port = 8081 + input_raw = """ +- type: http_endpoint + enabled: true + listen_address: {} + listen_port: {} +""" + + input_raw = input_raw.format(host, port) + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + + filebeat = self.start_beat() + + self.wait_until(lambda: self.log_contains("Starting HTTP server on 127.0.0.1:8081")) \ No newline at end of file From 4d2cc8b8da7651ffc428262236c36a78205b00e4 Mon Sep 17 00:00:00 2001 From: P1llus Date: Fri, 15 May 2020 16:32:37 +0200 Subject: [PATCH 15/27] forgot to add method validation --- x-pack/filebeat/input/http_endpoint/input.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 75b09570bb18..71f53da747ba 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -161,7 +161,12 @@ func (in *HttpEndpoint) validateRequest(w http.ResponseWriter, r *http.Request) } } - status, err := in.validateHeader(w, r) + status, err := in.validateMethod(w, r) + if err != "" && status != 0 { + return status, err + } + + status, err = in.validateHeader(w, r) if err != "" && status != 0 { return status, err } From 07c00d6788b1286c472a308b0604c5a770d9476b Mon Sep 17 00:00:00 2001 From: P1llus Date: Fri, 15 May 2020 16:32:52 +0200 Subject: [PATCH 16/27] make ALLL the tests! --- .../tests/system/test_http_endpoint.py | 195 +++++++++++++++++- 1 file changed, 189 insertions(+), 6 deletions(-) diff --git a/x-pack/filebeat/tests/system/test_http_endpoint.py b/x-pack/filebeat/tests/system/test_http_endpoint.py index d3322281156c..9c4a93de523d 100644 --- a/x-pack/filebeat/tests/system/test_http_endpoint.py +++ b/x-pack/filebeat/tests/system/test_http_endpoint.py @@ -1,8 +1,13 @@ +import jinja2 import requests import sys import os +import json +import time +from requests.auth import HTTPBasicAuth + +sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../filebeat/tests/system')) -sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../filebeat/tests/system/')) from filebeat import BaseTest @@ -10,26 +15,204 @@ class Test(BaseTest): """ Test filebeat with the http_endpoint input """ + @classmethod + def setUpClass(self): + self.beat_name = "filebeat" + self.beat_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../")) + + super(BaseTest, self).setUpClass() + + def setUp(self): + super(BaseTest, self).setUp() + + # Hack to make jinja2 have the right paths + self.template_env = jinja2.Environment( + loader=jinja2.FileSystemLoader([ + os.path.abspath(os.path.join(self.beat_path, "../../filebeat")), + os.path.abspath(os.path.join(self.beat_path, "../../libbeat")) + ]) + ) - def test_http_endpoint_without_ssl(self): + def get_config(self, options=None): """ - Test http_endpoint input with HTTP events. + General function so that we do not have to define settings each time """ host = "127.0.0.1" port = 8081 input_raw = """ - type: http_endpoint enabled: true - listen_address: {} - listen_port: {} + listen_address: {} + listen_port: {} """ + if options: + input_raw = '\n'.join([input_raw, options]) + self.beat_name = "filebeat" + self.beat_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../")) input_raw = input_raw.format(host, port) self.render_config_template( input_raw=input_raw, inputs=False, ) + self.host = host + self.port = port + self.prefix = 'testmessage' + self.url = f"http://{host}:{port}/" + + def test_http_endpoint_request(self): + """ + Test http_endpoint input with HTTP events. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload)) + + self.wait_until(lambda: self.output_count(lambda x: x >= 1)) + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert output[0]["input.type"] == "http_endpoint" + assert output[0][f"json.{self.prefix}"] == message + assert r.text == '{"message": "success"}' + + def test_http_endpoint_wrong_content_header(self): + """ + Test http_endpoint input with wrong content header. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/xml", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload)) + + filebeat.check_kill_and_wait() + + assert r.status_code == 415 + assert r.text == '{"message": "Wrong Content-Type header, expecting application/json"}' + + def test_http_endpoint_wrong_accept_header(self): + """ + Test http_endpoint input with wrong accept header. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/xml"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload)) + + filebeat.check_kill_and_wait() + assert r.status_code == 406 + assert r.text == '{"message": "Wrong Accept header, expecting application/json"}' + + def test_http_endpoint_missing_auth_value(self): + """ + Test http_endpoint input with missing basic auth values. + """ + options = """ + basic_auth: true + username: testuser + password: +""" + self.get_config(options) filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload), auth=HTTPBasicAuth('testuser','something')) + + filebeat.check_kill_and_wait() + + assert r.status_code == 401 + assert r.text == '{"message": "Username and password required when basicauth is enabled"}' + + def test_http_endpoint_wrong_auth_value(self): + """ + Test http_endpoint input with wrong basic auth values. + """ + options = """ + basic_auth: true + username: testuser + password: testpassword +""" + self.get_config(options) + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=json.dumps(payload), auth=HTTPBasicAuth('testuser','qwerty')) + + filebeat.check_kill_and_wait() + + assert r.status_code == 401 + assert r.text == '{"message": "Incorrect username or password"}' + + def test_http_endpoint_empty_body(self): + """ + Test http_endpoint input with empty body. + """ + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data="") + + filebeat.check_kill_and_wait() + + assert r.status_code == 406 + assert r.text == '{"message": "Body cannot be empty"}' + + def test_http_endpoint_malformed_json(self): + """ + Test http_endpoint input with malformed body. + """ + + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + payload = '{"message::":: "something"}' + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.post(self.url, headers=headers, data=payload) + + filebeat.check_kill_and_wait() + + assert r.status_code == 400 + assert r.text == '{"message": "Malformed JSON body"}' + + def test_http_endpoint_get_request(self): + """ + Test http_endpoint input with GET request. + """ + + self.get_config() + filebeat = self.start_beat() + self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + message = "somerandommessage" + payload = {self.prefix: message} + headers = {"Content-Type": "application/json", "Accept": "application/json"} + r = requests.get(self.url, headers=headers, data=json.dumps(payload)) + + filebeat.check_kill_and_wait() - self.wait_until(lambda: self.log_contains("Starting HTTP server on 127.0.0.1:8081")) \ No newline at end of file + assert r.status_code == 405 + assert r.text == '{"message": "Only POST requests supported"}' \ No newline at end of file From a59503ee3f996b2c16fa206ff62e6374ae6ab475 Mon Sep 17 00:00:00 2001 From: P1llus Date: Sun, 17 May 2020 15:30:11 +0200 Subject: [PATCH 17/27] added changes from PR comments and mage fmt --- x-pack/filebeat/input/http_endpoint/config.go | 42 ++++----- .../input/http_endpoint/httpserver.go | 12 ++- x-pack/filebeat/input/http_endpoint/input.go | 94 ++++++++++--------- .../tests/system/test_http_endpoint.py | 9 +- 4 files changed, 83 insertions(+), 74 deletions(-) diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 10b967d657af..faf422bde056 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -10,30 +10,30 @@ import ( // Config contains information about httpjson configuration type config struct { - TLS *tlscommon.ServerConfig `config:"ssl"` - BasicAuth bool `config:"basic_auth"` - Username string `config:"username"` - Password string `config:"password"` - ResponseCode int `config:"response_code" validate:"positive"` - ResponseBody string `config:"response_body"` - ResponseHeaders string `config:"response_headers"` - ListenAddress string `config:"listen_address"` - ListenPort string `config:"listen_port"` - URL string `config:"url"` - Prefix string `config:"prefix"` + TLS *tlscommon.ServerConfig `config:"ssl"` + BasicAuth bool `config:"basic_auth"` + Username string `config:"username"` + Password string `config:"password"` + ResponseCode int `config:"response_code" validate:"positive"` + ResponseBody string `config:"response_body"` + ResponseHeader string `config:"response_headers"` + ListenAddress string `config:"listen_address"` + ListenPort string `config:"listen_port"` + URL string `config:"url"` + Prefix string `config:"prefix"` } func defaultConfig() config { return config{ - BasicAuth: false, - Username: "", - Password: "", - ResponseCode: 200, - ResponseBody: `{"message": "success"}`, - ResponseHeaders: `{"Content-Type": "application/json"}`, - ListenAddress: "127.0.0.1", - ListenPort: "8000", - URL: "/", - Prefix: "json", + BasicAuth: false, + Username: "", + Password: "", + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ResponseHeader: `{"Content-Type": "application/json"}`, + ListenAddress: "127.0.0.1", + ListenPort: "8000", + URL: "/", + Prefix: "json", } } diff --git a/x-pack/filebeat/input/http_endpoint/httpserver.go b/x-pack/filebeat/input/http_endpoint/httpserver.go index 3a43f081a29f..de1c8a73fe84 100644 --- a/x-pack/filebeat/input/http_endpoint/httpserver.go +++ b/x-pack/filebeat/input/http_endpoint/httpserver.go @@ -7,6 +7,7 @@ package http_endpoint import ( "context" "net/http" + "time" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" "github.com/elastic/beats/v7/libbeat/logp" @@ -41,15 +42,20 @@ func (h *HttpServer) Start() { func (h *HttpServer) Stop() { h.log.Info("Stopping HTTP server") h.stop() - h.server.Shutdown(h.ctx) + if err := h.server.Shutdown(h.ctx); err != nil { + h.log.Fatalf("Unable to stop HTTP server due to error: %v", err) + } } func createServer(in *HttpEndpoint) (*HttpServer, error) { + mux := http.NewServeMux() + responseHandler := http.HandlerFunc(in.apiResponse) + mux.Handle(in.config.URL, in.validateRequest(responseHandler)) server := &http.Server{ Addr: in.config.ListenAddress + ":" + in.config.ListenPort, + Handler: mux, } - http.HandleFunc(in.config.URL, in.apiResponse) tlsConfig, err := tlscommon.LoadTLSServerConfig(in.config.TLS) if err != nil { return nil, err @@ -59,7 +65,7 @@ func createServer(in *HttpEndpoint) (*HttpServer, error) { server.TLSConfig = tlsConfig.BuildModuleConfig(in.config.ListenAddress) } - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) h := &HttpServer{ ctx: ctx, stop: cancel, diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 71f53da747ba..779059703e30 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -46,6 +46,7 @@ type HttpEndpoint struct { workerWg sync.WaitGroup // Waits on worker goroutine. server *HttpServer // Server instance eventObject *map[string]interface{} // Current event object + finalHandler http.HandlerFunc } // NewInput creates a new httpjson input @@ -101,6 +102,7 @@ func NewInput( } in.log.Infof("Initialized %v input on %v:%v", inputName, in.config.ListenAddress, in.config.ListenPort) + return in, nil } @@ -132,13 +134,8 @@ func (in *HttpEndpoint) Wait() { in.Stop() } -// Validates the incoming request and creates a new event upon success -func (in *HttpEndpoint) sendEvent(w http.ResponseWriter, r *http.Request) (uint, string) { - status, err := in.validateRequest(w, r) - if err != "" || status != 0 { - return status, err - } - +// If middleware validation successed, event is sent +func (in *HttpEndpoint) sendEvent(w http.ResponseWriter, r *http.Request) { event := in.outlet.OnEvent(beat.Event{ Timestamp: time.Now().UTC(), Fields: common.MapStr{ @@ -146,38 +143,60 @@ func (in *HttpEndpoint) sendEvent(w http.ResponseWriter, r *http.Request) (uint, }, }) if !event { - return http.StatusInternalServerError, in.createErrorMessage("Unable to send event") + in.sendResponse(w, http.StatusInternalServerError, in.createErrorMessage("Unable to send event")) + return } - return 0, "" + return } -// Runs all validations for each request -func (in *HttpEndpoint) validateRequest(w http.ResponseWriter, r *http.Request) (uint, string) { - if in.config.BasicAuth { - status, err := in.validateAuth(w, r) - if err != "" && status != 0 { - return status, err - } - } +func (in *HttpEndpoint) sendResponse(w http.ResponseWriter, h uint, b string) { + w.WriteHeader(int(h)) + w.Write([]byte(b)) +} - status, err := in.validateMethod(w, r) - if err != "" && status != 0 { - return status, err +// Triggers if middleware validation returns successful +func (in *HttpEndpoint) apiResponse(w http.ResponseWriter, r *http.Request) { + objmap := make(map[string]string) + json.Unmarshal([]byte(in.config.ResponseHeader), &objmap) + for k, v := range objmap { + w.Header().Set(k, v) } + in.sendEvent(w, r) + in.sendResponse(w, uint(in.config.ResponseCode), in.config.ResponseBody) +} - status, err = in.validateHeader(w, r) - if err != "" && status != 0 { - return status, err - } +// Runs all validations for each request +func (in *HttpEndpoint) validateRequest(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if in.config.BasicAuth { + status, err := in.validateAuth(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } + } - status, err = in.validateBody(w, r) - if err != "" && status != 0 { - return status, err - } + status, err := in.validateMethod(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } - return 0, "" + status, err = in.validateHeader(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } + status, err = in.validateBody(w, r) + if err != "" && status != 0 { + in.sendResponse(w, status, err) + return + } + + next.ServeHTTP(w, r) + }) } // Validate that only supported Accept and Content type headers are used @@ -258,20 +277,3 @@ func (in *HttpEndpoint) isObjectOrList(b []byte) string { return "" } - -func (in *HttpEndpoint) sendResponse(w http.ResponseWriter, h uint, b string) { - w.WriteHeader(int(h)) - w.Write([]byte(b)) -} - -// Validates incoming requests and sends a new event upon success -func (in *HttpEndpoint) apiResponse(w http.ResponseWriter, r *http.Request) { - // This triggers both validation and event creation - status, err := in.sendEvent(w, r) - if err != "" || status != 0 { - in.sendResponse(w, status, err) - return - } - - in.sendResponse(w, http.StatusOK, in.config.ResponseBody) -} diff --git a/x-pack/filebeat/tests/system/test_http_endpoint.py b/x-pack/filebeat/tests/system/test_http_endpoint.py index 9c4a93de523d..2479b9694eb3 100644 --- a/x-pack/filebeat/tests/system/test_http_endpoint.py +++ b/x-pack/filebeat/tests/system/test_http_endpoint.py @@ -50,7 +50,7 @@ def get_config(self, options=None): input_raw = '\n'.join([input_raw, options]) self.beat_name = "filebeat" self.beat_path = os.path.abspath( - os.path.join(os.path.dirname(__file__), "../../")) + os.path.join(os.path.dirname(__file__), "../../")) input_raw = input_raw.format(host, port) self.render_config_template( @@ -136,7 +136,8 @@ def test_http_endpoint_missing_auth_value(self): message = "somerandommessage" payload = {self.prefix: message} headers = {"Content-Type": "application/json", "Accept": "application/json"} - r = requests.post(self.url, headers=headers, data=json.dumps(payload), auth=HTTPBasicAuth('testuser','something')) + r = requests.post(self.url, headers=headers, data=json.dumps( + payload), auth=HTTPBasicAuth('testuser', 'something')) filebeat.check_kill_and_wait() @@ -159,7 +160,7 @@ def test_http_endpoint_wrong_auth_value(self): message = "somerandommessage" payload = {self.prefix: message} headers = {"Content-Type": "application/json", "Accept": "application/json"} - r = requests.post(self.url, headers=headers, data=json.dumps(payload), auth=HTTPBasicAuth('testuser','qwerty')) + r = requests.post(self.url, headers=headers, data=json.dumps(payload), auth=HTTPBasicAuth('testuser', 'qwerty')) filebeat.check_kill_and_wait() @@ -215,4 +216,4 @@ def test_http_endpoint_get_request(self): filebeat.check_kill_and_wait() assert r.status_code == 405 - assert r.text == '{"message": "Only POST requests supported"}' \ No newline at end of file + assert r.text == '{"message": "Only POST requests supported"}' From 23a5c525231294c5375b7d6cc6d3cfe76b06654b Mon Sep 17 00:00:00 2001 From: P1llus Date: Sun, 17 May 2020 15:59:12 +0200 Subject: [PATCH 18/27] small change on response header and adding documentation --- .../docs/inputs/input-http-endpoint.asciidoc | 122 ++++++++++++++++++ x-pack/filebeat/input/http_endpoint/config.go | 2 +- .../input/http_endpoint/httpserver.go | 2 +- 3 files changed, 124 insertions(+), 2 deletions(-) create mode 100644 x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc new file mode 100644 index 000000000000..6b52884ac6e6 --- /dev/null +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -0,0 +1,122 @@ +[role="xpack"] + +:type: http_endpoint + +[id="{beatname_lc}-input-{type}"] +=== HTTP Endpoint input + +++++ +HTTP Endpoint +++++ + +beta[] + +Use the `http_endpoint` input to create a HTTP listener that can receive incoming HTTP POST requests. + +This input can for example be used to receive incoming webhooks from a third-party application or service. + +Example configurations: + +Basic example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 +---- + +Custom response example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + response_code: 200 + response_body: '{"message": "success"}' + response_header: '{"Content-Type": "application/json"}' + url: "/" + prefix: "json" +---- + +Basic auth and SSL example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: http_endpoint + enabled: true + listen_address: 192.168.1.1 + listen_port: 8080 + ssl.enabled: true + ssl.certificate: "/home/user/server.pem" + ssl.key: "/home/user/server.key" + ssl.verification_mode: "none" + ssl.certificate_authority: "/home/user/ca.pem" + basic_auth: true + username: someuser + password: somepassword +---- + + +==== Configuration options + +The `http_endpoint` input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +[float] +==== `basic_auth` + +Enables or disables HTTP basic auth for each incoming request. If enabled then `username` and `password` will also need to be configured. + +[float] +==== `username` + +If `basic_auth` is enabled, this is the username used for authentication against the HTTP listener. Requires `password` to also be set. + +[float] +==== `password` + +If `basic_auth` is eanbled, this is the password used for authentication against the HTTP listener. Requires `username` to also be set. + +[float] +==== `response_code` + +The HTTP response code returned upon success. Should be in the 2XX range. + +[float] +==== `response_header` + +The response header returned upon success. + +[float] +==== `response_body` + +The response body returned upon success. + +[float] +==== `listen_address` + +If multiple interfaces is present the `listen_address` can be set to control which IP address the listener binds to. Defaults to `127.0.0.1`. + +[float] +==== `listen_port` + +Which port the listener binds to. Defaults to 8000 + +[float] +==== `url` + +This options specific which URL path to accept requests on. Defaults to `/` + +[float] +==== `prefix` + +This option specifies which prefix the incoming request will be mapped to. + +[id="{beatname_lc}-input-{type}-common-options"] +include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] + +:type!: diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index faf422bde056..4536f46ea2fd 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -16,7 +16,7 @@ type config struct { Password string `config:"password"` ResponseCode int `config:"response_code" validate:"positive"` ResponseBody string `config:"response_body"` - ResponseHeader string `config:"response_headers"` + ResponseHeader string `config:"response_header"` ListenAddress string `config:"listen_address"` ListenPort string `config:"listen_port"` URL string `config:"url"` diff --git a/x-pack/filebeat/input/http_endpoint/httpserver.go b/x-pack/filebeat/input/http_endpoint/httpserver.go index de1c8a73fe84..68325caaeb48 100644 --- a/x-pack/filebeat/input/http_endpoint/httpserver.go +++ b/x-pack/filebeat/input/http_endpoint/httpserver.go @@ -52,7 +52,7 @@ func createServer(in *HttpEndpoint) (*HttpServer, error) { responseHandler := http.HandlerFunc(in.apiResponse) mux.Handle(in.config.URL, in.validateRequest(responseHandler)) server := &http.Server{ - Addr: in.config.ListenAddress + ":" + in.config.ListenPort, + Addr: in.config.ListenAddress + ":" + in.config.ListenPort, Handler: mux, } From 219eb08a39e22b96fa3dfa8fba547c9ce7ab4be7 Mon Sep 17 00:00:00 2001 From: P1llus Date: Mon, 18 May 2020 11:32:26 +0200 Subject: [PATCH 19/27] including new input docs --- filebeat/docs/filebeat-options.asciidoc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc index 6f9a49d43fd4..784bd8e0986a 100644 --- a/filebeat/docs/filebeat-options.asciidoc +++ b/filebeat/docs/filebeat-options.asciidoc @@ -47,6 +47,7 @@ You can configure {beatname_uc} to use the following inputs: * <<{beatname_lc}-input-container>> * <<{beatname_lc}-input-docker>> * <<{beatname_lc}-input-google-pubsub>> +* <<{beatname_lc}-input-http-endpoint>> * <<{beatname_lc}-input-httpjson>> * <<{beatname_lc}-input-kafka>> * <<{beatname_lc}-input-log>> @@ -73,6 +74,8 @@ include::inputs/input-docker.asciidoc[] include::../../x-pack/filebeat/docs/inputs/input-google-pubsub.asciidoc[] +include::../../x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc[] + include::../../x-pack/filebeat/docs/inputs/input-httpjson.asciidoc[] include::inputs/input-kafka.asciidoc[] From 7f6abed62971a48381ea7b6e0aec2b89f6e18d49 Mon Sep 17 00:00:00 2001 From: P1llus Date: Mon, 18 May 2020 11:32:59 +0200 Subject: [PATCH 20/27] changing to older string formatting to support python version in nosetest --- .../tests/system/test_http_endpoint.py | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/x-pack/filebeat/tests/system/test_http_endpoint.py b/x-pack/filebeat/tests/system/test_http_endpoint.py index 2479b9694eb3..4203918a693e 100644 --- a/x-pack/filebeat/tests/system/test_http_endpoint.py +++ b/x-pack/filebeat/tests/system/test_http_endpoint.py @@ -3,7 +3,6 @@ import sys import os import json -import time from requests.auth import HTTPBasicAuth sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../filebeat/tests/system')) @@ -60,7 +59,7 @@ def get_config(self, options=None): self.host = host self.port = port self.prefix = 'testmessage' - self.url = f"http://{host}:{port}/" + self.url = "http://{}:{}/".format(host, port) def test_http_endpoint_request(self): """ @@ -68,7 +67,7 @@ def test_http_endpoint_request(self): """ self.get_config() filebeat = self.start_beat() - self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) message = "somerandommessage" payload = {self.prefix: message} @@ -81,7 +80,7 @@ def test_http_endpoint_request(self): output = self.read_output() assert output[0]["input.type"] == "http_endpoint" - assert output[0][f"json.{self.prefix}"] == message + assert output[0]["json.{}".format(self.prefix)] == message assert r.text == '{"message": "success"}' def test_http_endpoint_wrong_content_header(self): @@ -90,7 +89,7 @@ def test_http_endpoint_wrong_content_header(self): """ self.get_config() filebeat = self.start_beat() - self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) message = "somerandommessage" payload = {self.prefix: message} @@ -108,7 +107,7 @@ def test_http_endpoint_wrong_accept_header(self): """ self.get_config() filebeat = self.start_beat() - self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) message = "somerandommessage" payload = {self.prefix: message} @@ -131,7 +130,7 @@ def test_http_endpoint_missing_auth_value(self): """ self.get_config(options) filebeat = self.start_beat() - self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) message = "somerandommessage" payload = {self.prefix: message} @@ -155,7 +154,7 @@ def test_http_endpoint_wrong_auth_value(self): """ self.get_config(options) filebeat = self.start_beat() - self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) message = "somerandommessage" payload = {self.prefix: message} @@ -173,7 +172,7 @@ def test_http_endpoint_empty_body(self): """ self.get_config() filebeat = self.start_beat() - self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) headers = {"Content-Type": "application/json", "Accept": "application/json"} r = requests.post(self.url, headers=headers, data="") @@ -190,7 +189,7 @@ def test_http_endpoint_malformed_json(self): self.get_config() filebeat = self.start_beat() - self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) payload = '{"message::":: "something"}' headers = {"Content-Type": "application/json", "Accept": "application/json"} r = requests.post(self.url, headers=headers, data=payload) @@ -207,7 +206,7 @@ def test_http_endpoint_get_request(self): self.get_config() filebeat = self.start_beat() - self.wait_until(lambda: self.log_contains(f"Starting HTTP server on {self.host}:{self.port}")) + self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) message = "somerandommessage" payload = {self.prefix: message} headers = {"Content-Type": "application/json", "Accept": "application/json"} From f6032a46c98c24f2855b98dc304c2ec13f864243 Mon Sep 17 00:00:00 2001 From: P1llus Date: Mon, 18 May 2020 11:41:32 +0200 Subject: [PATCH 21/27] wrong doc reference? --- filebeat/docs/filebeat-options.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc index 784bd8e0986a..456588808544 100644 --- a/filebeat/docs/filebeat-options.asciidoc +++ b/filebeat/docs/filebeat-options.asciidoc @@ -47,7 +47,7 @@ You can configure {beatname_uc} to use the following inputs: * <<{beatname_lc}-input-container>> * <<{beatname_lc}-input-docker>> * <<{beatname_lc}-input-google-pubsub>> -* <<{beatname_lc}-input-http-endpoint>> +* <<{beatname_lc}-input-http_endpoint>> * <<{beatname_lc}-input-httpjson>> * <<{beatname_lc}-input-kafka>> * <<{beatname_lc}-input-log>> From e1880406c177f6353c777d8574fa9e676c07e796 Mon Sep 17 00:00:00 2001 From: P1llus Date: Tue, 19 May 2020 09:38:59 +0200 Subject: [PATCH 22/27] removing response header, updating docs and modify based on PR comments --- .../docs/inputs/input-http-endpoint.asciidoc | 6 ------ x-pack/filebeat/input/http_endpoint/config.go | 1 - x-pack/filebeat/input/http_endpoint/input.go | 21 ++++++------------- 3 files changed, 6 insertions(+), 22 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc index 6b52884ac6e6..2a949b01d26a 100644 --- a/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc @@ -37,7 +37,6 @@ Custom response example: listen_port: 8080 response_code: 200 response_body: '{"message": "success"}' - response_header: '{"Content-Type": "application/json"}' url: "/" prefix: "json" ---- @@ -86,11 +85,6 @@ If `basic_auth` is eanbled, this is the password used for authentication against The HTTP response code returned upon success. Should be in the 2XX range. -[float] -==== `response_header` - -The response header returned upon success. - [float] ==== `response_body` diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 4536f46ea2fd..bb25b7f6e38a 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -16,7 +16,6 @@ type config struct { Password string `config:"password"` ResponseCode int `config:"response_code" validate:"positive"` ResponseBody string `config:"response_body"` - ResponseHeader string `config:"response_header"` ListenAddress string `config:"listen_address"` ListenPort string `config:"listen_port"` URL string `config:"url"` diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 779059703e30..8c6211f8147a 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -119,8 +119,6 @@ func (in *HttpEndpoint) run() { defer in.workerWg.Done() defer in.log.Infof("%v worker has stopped.", inputName) in.server.Start() - - return } // Stops HTTP input and waits for it to finish @@ -144,28 +142,21 @@ func (in *HttpEndpoint) sendEvent(w http.ResponseWriter, r *http.Request) { }) if !event { in.sendResponse(w, http.StatusInternalServerError, in.createErrorMessage("Unable to send event")) - return } - - return -} - -func (in *HttpEndpoint) sendResponse(w http.ResponseWriter, h uint, b string) { - w.WriteHeader(int(h)) - w.Write([]byte(b)) } // Triggers if middleware validation returns successful func (in *HttpEndpoint) apiResponse(w http.ResponseWriter, r *http.Request) { - objmap := make(map[string]string) - json.Unmarshal([]byte(in.config.ResponseHeader), &objmap) - for k, v := range objmap { - w.Header().Set(k, v) - } in.sendEvent(w, r) + w.Header().Add("Content-Type", "application/json") in.sendResponse(w, uint(in.config.ResponseCode), in.config.ResponseBody) } +func (in *HttpEndpoint) sendResponse(w http.ResponseWriter, h uint, b string) { + w.WriteHeader(int(h)) + w.Write([]byte(b)) +} + // Runs all validations for each request func (in *HttpEndpoint) validateRequest(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { From 94d2e272b63dc04f40ffeaa6e82b2dbcdf3e8572 Mon Sep 17 00:00:00 2001 From: P1llus Date: Tue, 19 May 2020 10:36:17 +0200 Subject: [PATCH 23/27] needed to remove responseheader from defaultconf as well --- x-pack/filebeat/input/http_endpoint/config.go | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index bb25b7f6e38a..9ac1eb91e6f4 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -29,7 +29,6 @@ func defaultConfig() config { Password: "", ResponseCode: 200, ResponseBody: `{"message": "success"}`, - ResponseHeader: `{"Content-Type": "application/json"}`, ListenAddress: "127.0.0.1", ListenPort: "8000", URL: "/", From 111b1e3646a69df1e16a631e5fcb260f2c1cbd73 Mon Sep 17 00:00:00 2001 From: P1llus Date: Tue, 19 May 2020 11:00:05 +0200 Subject: [PATCH 24/27] mage fmt update --- x-pack/filebeat/input/http_endpoint/config.go | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 9ac1eb91e6f4..1a470ace10c3 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -10,28 +10,28 @@ import ( // Config contains information about httpjson configuration type config struct { - TLS *tlscommon.ServerConfig `config:"ssl"` - BasicAuth bool `config:"basic_auth"` - Username string `config:"username"` - Password string `config:"password"` - ResponseCode int `config:"response_code" validate:"positive"` - ResponseBody string `config:"response_body"` - ListenAddress string `config:"listen_address"` - ListenPort string `config:"listen_port"` - URL string `config:"url"` - Prefix string `config:"prefix"` + TLS *tlscommon.ServerConfig `config:"ssl"` + BasicAuth bool `config:"basic_auth"` + Username string `config:"username"` + Password string `config:"password"` + ResponseCode int `config:"response_code" validate:"positive"` + ResponseBody string `config:"response_body"` + ListenAddress string `config:"listen_address"` + ListenPort string `config:"listen_port"` + URL string `config:"url"` + Prefix string `config:"prefix"` } func defaultConfig() config { return config{ - BasicAuth: false, - Username: "", - Password: "", - ResponseCode: 200, - ResponseBody: `{"message": "success"}`, - ListenAddress: "127.0.0.1", - ListenPort: "8000", - URL: "/", - Prefix: "json", + BasicAuth: false, + Username: "", + Password: "", + ResponseCode: 200, + ResponseBody: `{"message": "success"}`, + ListenAddress: "127.0.0.1", + ListenPort: "8000", + URL: "/", + Prefix: "json", } } From fce4e8ea4b1d20861d1a7bec54c6abc35a222832 Mon Sep 17 00:00:00 2001 From: P1llus Date: Mon, 25 May 2020 08:45:02 +0200 Subject: [PATCH 25/27] validation for response body added --- x-pack/filebeat/input/http_endpoint/config.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 1a470ace10c3..d11e4c59c682 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -5,6 +5,8 @@ package http_endpoint import ( + "encoding/json" + "errors" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) @@ -35,3 +37,11 @@ func defaultConfig() config { Prefix: "json", } } + +func (c *config) Validate() error { + if !json.Valid([]byte(c.ResponseBody)) { + return errors.New("response_body must be valid JSON") + } + + return nil +} From c9b9a81af093b1b9639dae9648233b2bcc334c9f Mon Sep 17 00:00:00 2001 From: P1llus Date: Mon, 25 May 2020 09:43:20 +0200 Subject: [PATCH 26/27] Mage fmt update --- x-pack/filebeat/input/http_endpoint/config.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index d11e4c59c682..0626f5e2afd8 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -7,6 +7,7 @@ package http_endpoint import ( "encoding/json" "errors" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" ) @@ -39,9 +40,9 @@ func defaultConfig() config { } func (c *config) Validate() error { - if !json.Valid([]byte(c.ResponseBody)) { - return errors.New("response_body must be valid JSON") - } + if !json.Valid([]byte(c.ResponseBody)) { + return errors.New("response_body must be valid JSON") + } - return nil + return nil } From d17e5d0d3549aba50eddcf2faeecd64307fb1341 Mon Sep 17 00:00:00 2001 From: P1llus Date: Mon, 25 May 2020 13:31:01 +0200 Subject: [PATCH 27/27] updated changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 27b1701f875e..34cd7d5a1c57 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -309,6 +309,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Improve ECS categorization field mappings in system module. {issue}16031[16031] {pull}18065[18065] - Change the `json.*` input settings implementation to merge parsed json objects with existing objects in the event instead of fully replacing them. {pull}17958[17958] - Improve ECS categorization field mappings in osquery module. {issue}16176[16176] {pull}17881[17881] +- Added http_endpoint input{pull}18298[18298] *Heartbeat*