From dc49dcedc954bfd36e81e07055907d6dea09b5d6 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Tue, 14 Jul 2020 15:26:25 +0200 Subject: [PATCH] Move http_endpoint input to v2 input API (#19815) --- x-pack/filebeat/include/list.go | 1 - .../filebeat/input/default-inputs/inputs.go | 4 +- x-pack/filebeat/input/http_endpoint/config.go | 6 + .../filebeat/input/http_endpoint/handler.go | 109 +++++++ .../input/http_endpoint/httpserver.go | 77 ----- x-pack/filebeat/input/http_endpoint/input.go | 284 +++++------------- .../filebeat/input/http_endpoint/validate.go | 45 +++ .../tests/system/test_http_endpoint.py | 21 +- 8 files changed, 241 insertions(+), 306 deletions(-) create mode 100644 x-pack/filebeat/input/http_endpoint/handler.go delete mode 100644 x-pack/filebeat/input/http_endpoint/httpserver.go create mode 100644 x-pack/filebeat/input/http_endpoint/validate.go diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index 1e2831bb5999..fbc3c8ca7c6d 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -11,7 +11,6 @@ import ( _ "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/googlepubsub" - _ "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/s3" diff --git a/x-pack/filebeat/input/default-inputs/inputs.go b/x-pack/filebeat/input/default-inputs/inputs.go index afac3c2e61cf..da27367a109c 100644 --- a/x-pack/filebeat/input/default-inputs/inputs.go +++ b/x-pack/filebeat/input/default-inputs/inputs.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" + "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" ) @@ -23,7 +24,8 @@ func Init(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { return []v2.Plugin{ - o365audit.Plugin(log, store), cloudfoundry.Plugin(), + http_endpoint.Plugin(), + o365audit.Plugin(log, store), } } diff --git a/x-pack/filebeat/input/http_endpoint/config.go b/x-pack/filebeat/input/http_endpoint/config.go index 0626f5e2afd8..41e97489cecc 100644 --- a/x-pack/filebeat/input/http_endpoint/config.go +++ b/x-pack/filebeat/input/http_endpoint/config.go @@ -44,5 +44,11 @@ func (c *config) Validate() error { return errors.New("response_body must be valid JSON") } + if c.BasicAuth { + if c.Username == "" || c.Password == "" { + return errors.New("Username and password required when basicauth is enabled") + } + } + return nil } diff --git a/x-pack/filebeat/input/http_endpoint/handler.go b/x-pack/filebeat/input/http_endpoint/handler.go new file mode 100644 index 000000000000..ff31a08e9bd6 --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/handler.go @@ -0,0 +1,109 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "time" + + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type httpHandler struct { + log *logp.Logger + publisher stateless.Publisher + + messageField string + responseCode int + responseBody string +} + +var errBodyEmpty = errors.New("Body cannot be empty") +var errUnsupportedType = errors.New("Only JSON objects are accepted") + +// Triggers if middleware validation returns successful +func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) { + obj, status, err := httpReadJsonObject(r.Body) + if err != nil { + w.Header().Add("Content-Type", "application/json") + sendErrorResponse(w, status, err) + return + } + + h.publishEvent(obj) + w.Header().Add("Content-Type", "application/json") + h.sendResponse(w, h.responseCode, h.responseBody) +} + +func (h *httpHandler) sendResponse(w http.ResponseWriter, status int, message string) { + w.WriteHeader(status) + io.WriteString(w, message) +} + +func (h *httpHandler) publishEvent(obj common.MapStr) { + event := beat.Event{ + Timestamp: time.Now().UTC(), + Fields: common.MapStr{ + h.messageField: obj, + }, + } + + h.publisher.Publish(event) +} + +func withValidator(v validator, handler http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if status, err := v.ValidateHeader(r); status != 0 && err != nil { + sendErrorResponse(w, status, err) + } else { + handler(w, r) + } + } +} + +func sendErrorResponse(w http.ResponseWriter, status int, err error) { + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(status) + fmt.Fprintf(w, `{"message": %q}`, err.Error()) +} + +func httpReadJsonObject(body io.Reader) (obj common.MapStr, status int, err error) { + if body == http.NoBody { + return nil, http.StatusNotAcceptable, errBodyEmpty + } + + contents, err := ioutil.ReadAll(body) + if err != nil { + return nil, http.StatusInternalServerError, fmt.Errorf("failed reading body: %w", err) + } + + if !isObject(contents) { + return nil, http.StatusBadRequest, errUnsupportedType + } + + obj = common.MapStr{} + if err := json.Unmarshal(contents, &obj); err != nil { + return nil, http.StatusBadRequest, fmt.Errorf("Malformed JSON body: %w", err) + } + + return obj, 0, nil +} + +func isObject(b []byte) bool { + obj := bytes.TrimLeft(b, " \t\r\n") + if len(obj) > 0 && obj[0] == '{' { + return true + } + return false +} diff --git a/x-pack/filebeat/input/http_endpoint/httpserver.go b/x-pack/filebeat/input/http_endpoint/httpserver.go deleted file mode 100644 index 68325caaeb48..000000000000 --- a/x-pack/filebeat/input/http_endpoint/httpserver.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package http_endpoint - -import ( - "context" - "net/http" - "time" - - "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" - "github.com/elastic/beats/v7/libbeat/logp" -) - -type HttpServer struct { - log *logp.Logger - server *http.Server - ctx context.Context - stop context.CancelFunc -} - -func (h *HttpServer) Start() { - go func() { - if h.server.TLSConfig != nil { - h.log.Infof("Starting HTTPS server on %s", h.server.Addr) - //certificate is already loaded. That's why the parameters are empty - err := h.server.ListenAndServeTLS("", "") - if err != nil && err != http.ErrServerClosed { - h.log.Fatalf("Unable to start HTTPS server due to error: %v", err) - } - } else { - h.log.Infof("Starting HTTP server on %s", h.server.Addr) - err := h.server.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - h.log.Fatalf("Unable to start HTTP server due to error: %v", err) - } - } - }() -} - -func (h *HttpServer) Stop() { - h.log.Info("Stopping HTTP server") - h.stop() - if err := h.server.Shutdown(h.ctx); err != nil { - h.log.Fatalf("Unable to stop HTTP server due to error: %v", err) - } -} - -func createServer(in *HttpEndpoint) (*HttpServer, error) { - mux := http.NewServeMux() - responseHandler := http.HandlerFunc(in.apiResponse) - mux.Handle(in.config.URL, in.validateRequest(responseHandler)) - server := &http.Server{ - Addr: in.config.ListenAddress + ":" + in.config.ListenPort, - Handler: mux, - } - - tlsConfig, err := tlscommon.LoadTLSServerConfig(in.config.TLS) - if err != nil { - return nil, err - } - - if tlsConfig != nil { - server.TLSConfig = tlsConfig.BuildModuleConfig(in.config.ListenAddress) - } - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - h := &HttpServer{ - ctx: ctx, - stop: cancel, - log: logp.NewLogger("http_server"), - } - h.server = server - - return h, nil -} diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 555880fab6d3..c79fd2cba22b 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -5,259 +5,119 @@ package http_endpoint import ( - "bytes" - "context" - "encoding/json" + "crypto/tls" "fmt" - "io/ioutil" + "net" "net/http" - "sync" - "time" - "github.com/pkg/errors" - - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" - "github.com/elastic/beats/v7/libbeat/beat" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/go-concert/ctxtool" ) const ( inputName = "http_endpoint" ) -func init() { - err := input.Register(inputName, NewInput) - if err != nil { - panic(errors.Wrapf(err, "failed to register %v input", inputName)) - } +type httpEndpoint struct { + config config + addr string + tlsConfig *tls.Config } -type HttpEndpoint struct { - config - log *logp.Logger - outlet channel.Outleter // Output of received messages. - inputCtx context.Context // Wraps the Done channel from parent input.Context. - - workerCtx context.Context // Worker goroutine context. It's cancelled when the input stops or the worker exits. - workerCancel context.CancelFunc // Used to signal that the worker should stop. - workerOnce sync.Once // Guarantees that the worker goroutine is only started once. - workerWg sync.WaitGroup // Waits on worker goroutine. - server *HttpServer // Server instance - eventObject *map[string]interface{} // Current event object - finalHandler http.HandlerFunc +func Plugin() v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Beta, + Deprecated: false, + Manager: stateless.NewInputManager(configure), + } } -// NewInput creates a new httpjson input -func NewInput( - cfg *common.Config, - connector channel.Connector, - inputContext input.Context, -) (input.Input, error) { - // Extract and validate the input's configuration. +func configure(cfg *common.Config) (stateless.Input, error) { conf := defaultConfig() if err := cfg.Unpack(&conf); err != nil { return nil, err } - // Build outlet for events. - out, err := connector.Connect(cfg) - if err != nil { + return newHTTPEndpoint(conf) +} + +func newHTTPEndpoint(config config) (*httpEndpoint, error) { + if err := config.Validate(); err != nil { return nil, err } - // Wrap input.Context's Done channel with a context.Context. This goroutine - // stops with the parent closes the Done channel. - inputCtx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Done: - case <-inputCtx.Done(): - } - }() + addr := fmt.Sprintf("%v:%v", config.ListenAddress, config.ListenPort) - // If the input ever needs to be made restartable, then context would need - // to be recreated with each restart. - workerCtx, workerCancel := context.WithCancel(inputCtx) - - in := &HttpEndpoint{ - config: conf, - log: logp.NewLogger(inputName), - outlet: out, - inputCtx: inputCtx, - workerCtx: workerCtx, - workerCancel: workerCancel, - } - - // Create an instance of the HTTP server with the beat context - in.server, err = createServer(in) + var tlsConfig *tls.Config + tlsConfigBuilder, err := tlscommon.LoadTLSServerConfig(config.TLS) if err != nil { return nil, err } - - in.log.Infof("Initialized %v input on %v:%v", inputName, in.config.ListenAddress, in.config.ListenPort) - - return in, nil -} - -// Run starts the input worker then returns. Only the first invocation -// will ever start the worker. -func (in *HttpEndpoint) Run() { - in.workerOnce.Do(func() { - in.workerWg.Add(1) - go in.run() - }) -} - -func (in *HttpEndpoint) run() { - defer in.workerWg.Done() - defer in.log.Infof("%v worker has stopped.", inputName) - in.server.Start() -} - -// Stops HTTP input and waits for it to finish -func (in *HttpEndpoint) Stop() { - in.workerCancel() - in.workerWg.Wait() -} - -// Wait is an alias for Stop. -func (in *HttpEndpoint) Wait() { - in.Stop() -} - -// If middleware validation successed, event is sent -func (in *HttpEndpoint) sendEvent(w http.ResponseWriter, r *http.Request) { - event := in.outlet.OnEvent(beat.Event{ - Timestamp: time.Now().UTC(), - Fields: common.MapStr{ - in.config.Prefix: in.eventObject, - }, - }) - if !event { - in.sendResponse(w, http.StatusInternalServerError, in.createErrorMessage("Unable to send event")) + if tlsConfigBuilder != nil { + tlsConfig = tlsConfigBuilder.BuildModuleConfig(addr) } -} -// Triggers if middleware validation returns successful -func (in *HttpEndpoint) apiResponse(w http.ResponseWriter, r *http.Request) { - in.sendEvent(w, r) - w.Header().Add("Content-Type", "application/json") - in.sendResponse(w, uint(in.config.ResponseCode), in.config.ResponseBody) + return &httpEndpoint{ + config: config, + tlsConfig: tlsConfig, + addr: addr, + }, nil } -func (in *HttpEndpoint) sendResponse(w http.ResponseWriter, h uint, b string) { - w.WriteHeader(int(h)) - w.Write([]byte(b)) -} - -// Runs all validations for each request -func (in *HttpEndpoint) validateRequest(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if in.config.BasicAuth { - status, err := in.validateAuth(w, r) - if err != "" && status != 0 { - in.sendResponse(w, status, err) - return - } - } - - status, err := in.validateMethod(w, r) - if err != "" && status != 0 { - in.sendResponse(w, status, err) - return - } +func (*httpEndpoint) Name() string { return inputName } - status, err = in.validateHeader(w, r) - if err != "" && status != 0 { - in.sendResponse(w, status, err) - return - } - - status, err = in.validateBody(w, r) - if err != "" && status != 0 { - in.sendResponse(w, status, err) - return - } - - next.ServeHTTP(w, r) - }) -} - -// Validate that only supported Accept and Content type headers are used -func (in *HttpEndpoint) validateHeader(w http.ResponseWriter, r *http.Request) (uint, string) { - if r.Header.Get("Content-Type") != "application/json" { - return http.StatusUnsupportedMediaType, in.createErrorMessage("Wrong Content-Type header, expecting application/json") - } - - return 0, "" -} - -// Validate if headers are current and authentication is successful -func (in *HttpEndpoint) validateAuth(w http.ResponseWriter, r *http.Request) (uint, string) { - if in.config.Username == "" || in.config.Password == "" { - return http.StatusUnauthorized, in.createErrorMessage("Username and password required when basicauth is enabled") - } - - username, password, _ := r.BasicAuth() - if in.config.Username != username || in.config.Password != password { - return http.StatusUnauthorized, in.createErrorMessage("Incorrect username or password") - } - - return 0, "" -} - -// Validates that body is not empty, not a list of objects and valid JSON -func (in *HttpEndpoint) validateBody(w http.ResponseWriter, r *http.Request) (uint, string) { - if r.Body == http.NoBody { - return http.StatusNotAcceptable, in.createErrorMessage("Body cannot be empty") - } - - body, err := ioutil.ReadAll(r.Body) +func (e *httpEndpoint) Test(_ v2.TestContext) error { + l, err := net.Listen("tcp", e.addr) if err != nil { - return http.StatusInternalServerError, in.createErrorMessage("Unable to read body") + return err } + return l.Close() +} - isObject := in.isObjectOrList(body) - if isObject == "list" { - return http.StatusBadRequest, in.createErrorMessage("List of JSON objects is not supported") - } +func (e *httpEndpoint) Run(ctx v2.Context, publisher stateless.Publisher) error { + log := ctx.Logger.With("address", e.addr) - objmap := make(map[string]interface{}) - err = json.Unmarshal(body, &objmap) - if err != nil { - return http.StatusBadRequest, in.createErrorMessage("Malformed JSON body") + validator := &apiValidator{ + basicAuth: e.config.BasicAuth, + username: e.config.Username, + password: e.config.Password, + method: http.MethodPost, + contentType: "application/json", } - in.eventObject = &objmap - - return 0, "" -} - -// Ensure only valid HTTP Methods used -func (in *HttpEndpoint) validateMethod(w http.ResponseWriter, r *http.Request) (uint, string) { - if r.Method != http.MethodPost { - return http.StatusMethodNotAllowed, in.createErrorMessage("Only POST requests supported") + handler := &httpHandler{ + log: log, + publisher: publisher, + messageField: e.config.Prefix, + responseCode: e.config.ResponseCode, + responseBody: e.config.ResponseBody, } - return 0, "" -} - -func (in *HttpEndpoint) createErrorMessage(r string) string { - return fmt.Sprintf(`{"message": "%v"}`, r) -} + mux := http.NewServeMux() + mux.HandleFunc(e.config.URL, withValidator(validator, handler.apiResponse)) + server := &http.Server{Addr: e.addr, TLSConfig: e.tlsConfig, Handler: mux} + _, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() { + server.Close() + }) + defer cancel() -func (in *HttpEndpoint) isObjectOrList(b []byte) string { - obj := bytes.TrimLeft(b, " \t\r\n") - if len(obj) > 0 && obj[0] == '{' { - return "object" + var err error + if server.TLSConfig != nil { + log.Infof("Starting HTTPS server on %s", server.Addr) + //certificate is already loaded. That's why the parameters are empty + err = server.ListenAndServeTLS("", "") + } else { + log.Infof("Starting HTTP server on %s", server.Addr) + err = server.ListenAndServe() } - if len(obj) > 0 && obj[0] == '[' { - return "list" + if err != nil && err != http.ErrServerClosed { + return fmt.Errorf("Unable to start server due to error: %w", err) } - - return "" + return nil } diff --git a/x-pack/filebeat/input/http_endpoint/validate.go b/x-pack/filebeat/input/http_endpoint/validate.go new file mode 100644 index 000000000000..86ce115f8025 --- /dev/null +++ b/x-pack/filebeat/input/http_endpoint/validate.go @@ -0,0 +1,45 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package http_endpoint + +import ( + "errors" + "fmt" + "net/http" +) + +type validator interface { + // ValidateHeader checks the HTTP headers for compliance. The body must not + // be touched. + ValidateHeader(*http.Request) (int, error) +} + +type apiValidator struct { + basicAuth bool + username, password string + method string + contentType string +} + +var errIncorrectUserOrPass = errors.New("Incorrect username or password") + +func (v *apiValidator) ValidateHeader(r *http.Request) (int, error) { + if v.basicAuth { + username, password, _ := r.BasicAuth() + if v.username != username || v.password != password { + return http.StatusUnauthorized, errIncorrectUserOrPass + } + } + + if v.method != "" && v.method != r.Method { + return http.StatusMethodNotAllowed, fmt.Errorf("Only %v requests supported", v.method) + } + + if v.contentType != "" && r.Header.Get("Content-Type") != v.contentType { + return http.StatusUnsupportedMediaType, fmt.Errorf("Wrong Content-Type header, expecting %v", v.contentType) + } + + return 0, nil +} diff --git a/x-pack/filebeat/tests/system/test_http_endpoint.py b/x-pack/filebeat/tests/system/test_http_endpoint.py index 8d0b863b70b9..89ac33032662 100644 --- a/x-pack/filebeat/tests/system/test_http_endpoint.py +++ b/x-pack/filebeat/tests/system/test_http_endpoint.py @@ -79,9 +79,9 @@ def test_http_endpoint_request(self): output = self.read_output() + assert r.text == '{"message": "success"}' assert output[0]["input.type"] == "http_endpoint" assert output[0]["json.{}".format(self.prefix)] == message - assert r.text == '{"message": "success"}' def test_http_endpoint_wrong_content_header(self): """ @@ -112,18 +112,8 @@ def test_http_endpoint_missing_auth_value(self): """ self.get_config(options) filebeat = self.start_beat() - self.wait_until(lambda: self.log_contains("Starting HTTP server on {}:{}".format(self.host, self.port))) - - message = "somerandommessage" - payload = {self.prefix: message} - headers = {"Content-Type": "application/json", "Accept": "application/json"} - r = requests.post(self.url, headers=headers, data=json.dumps( - payload), auth=HTTPBasicAuth('testuser', 'something')) - - filebeat.check_kill_and_wait() - - assert r.status_code == 401 - assert r.text == '{"message": "Username and password required when basicauth is enabled"}' + self.wait_until(lambda: self.log_contains("Username and password required when basicauth is enabled")) + filebeat.kill_and_wait() def test_http_endpoint_wrong_auth_value(self): """ @@ -178,8 +168,9 @@ def test_http_endpoint_malformed_json(self): filebeat.check_kill_and_wait() + print("response:", r.status_code, r.text) assert r.status_code == 400 - assert r.text == '{"message": "Malformed JSON body"}' + assert r.text.startswith('{"message": "Malformed JSON body:') def test_http_endpoint_get_request(self): """ @@ -193,7 +184,7 @@ def test_http_endpoint_get_request(self): payload = {self.prefix: message} headers = {"Content-Type": "application/json", "Accept": "application/json"} r = requests.get(self.url, headers=headers, data=json.dumps(payload)) - + print("response:", r.status_code, r.text) filebeat.check_kill_and_wait() assert r.status_code == 405