From b3581847c8c244adb453160fc3be39707d8182b8 Mon Sep 17 00:00:00 2001 From: Peter Deng Date: Tue, 17 Nov 2020 10:39:14 +0800 Subject: [PATCH 1/5] allow customized monitoring api to be added --- libbeat/api/routes.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/libbeat/api/routes.go b/libbeat/api/routes.go index 8cb35cfc23c..c353cada3e4 100644 --- a/libbeat/api/routes.go +++ b/libbeat/api/routes.go @@ -30,6 +30,8 @@ import ( type handlerFunc func(http.ResponseWriter, *http.Request) type lookupFunc func(string) *monitoring.Namespace +var handlerFuncMap = make(map[string]handlerFunc) + // NewWithDefaultRoutes creates a new server with default API routes. func NewWithDefaultRoutes(log *logp.Logger, config *common.Config, ns lookupFunc) (*Server, error) { mux := http.NewServeMux() @@ -38,6 +40,10 @@ func NewWithDefaultRoutes(log *logp.Logger, config *common.Config, ns lookupFunc mux.HandleFunc("/state", makeAPIHandler(ns("state"))) mux.HandleFunc("/stats", makeAPIHandler(ns("stats"))) mux.HandleFunc("/dataset", makeAPIHandler(ns("dataset"))) + + for api, h := range handlerFuncMap { + mux.HandleFunc(api, h) + } return New(log, mux, config) } @@ -73,3 +79,8 @@ func prettyPrint(w http.ResponseWriter, data common.MapStr, u *url.URL) { fmt.Fprintf(w, data.String()) } } + +// AddHandlerFunc provides interface to add customized handlerFunc +func AddHandlerFunc(api string, h handlerFunc) { + handlerFuncMap[api] = h +} From bfbed501d211ddd6f9dc9d7fc3746ed8224710b8 Mon Sep 17 00:00:00 2001 From: Peter Deng Date: Tue, 17 Nov 2020 11:32:36 +0800 Subject: [PATCH 2/5] update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fba1bca3d95..aadd98cd81e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -557,6 +557,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Added "add_network_direction" processor for determining perimeter-based network direction. {pull}23076[23076] - Added new `rate_limit` processor for enforcing rate limits on event throughput. {pull}22883[22883] - Allow node/namespace metadata to be disabled on kubernetes metagen and ensure add_kubernetes_metadata honors host {pull}23012[23012] +- Add support for customized monitoring API. {pull}22605[22605] *Auditbeat* From b1d3bc97d8b32541939dc3e03039f0b8381e1fca Mon Sep 17 00:00:00 2001 From: Peter Deng Date: Fri, 11 Dec 2020 13:58:06 +0800 Subject: [PATCH 3/5] support register/deregister routes dynamically and refactor test case --- libbeat/api/routes.go | 95 ++++++++++++++++++++++++++++++++------ libbeat/api/server_test.go | 59 +++++++++++++++++------ 2 files changed, 127 insertions(+), 27 deletions(-) diff --git a/libbeat/api/routes.go b/libbeat/api/routes.go index c353cada3e4..2aad3fa2363 100644 --- a/libbeat/api/routes.go +++ b/libbeat/api/routes.go @@ -21,6 +21,8 @@ import ( "fmt" "net/http" "net/url" + "strings" + "sync" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" @@ -30,21 +32,37 @@ import ( type handlerFunc func(http.ResponseWriter, *http.Request) type lookupFunc func(string) *monitoring.Namespace -var handlerFuncMap = make(map[string]handlerFunc) +var ( + routes = &Routes{ + routes: make(map[string]handlerFunc), + smux: http.NewServeMux(), + } +) + +func init() { + routes.smux.HandleFunc("/", routes.handle) +} // NewWithDefaultRoutes creates a new server with default API routes. func NewWithDefaultRoutes(log *logp.Logger, config *common.Config, ns lookupFunc) (*Server, error) { - mux := http.NewServeMux() - - mux.HandleFunc("/", makeRootAPIHandler(makeAPIHandler(ns("info")))) - mux.HandleFunc("/state", makeAPIHandler(ns("state"))) - mux.HandleFunc("/stats", makeAPIHandler(ns("stats"))) - mux.HandleFunc("/dataset", makeAPIHandler(ns("dataset"))) - - for api, h := range handlerFuncMap { - mux.HandleFunc(api, h) + defaultRoutes := map[string]handlerFunc{ + "/": makeRootAPIHandler(makeAPIHandler(ns("info"))), + "/state": makeAPIHandler(ns("state")), + "/stats": makeAPIHandler(ns("stats")), + "/dataset": makeAPIHandler(ns("dataset")), } - return New(log, mux, config) + for api, h := range defaultRoutes { + if err := routes.register(api, h); err != nil { + return nil, err + } + } + if log == nil { + log = logp.NewLogger("") + } + if routes.log == nil { + routes.log = log + } + return New(log, routes.smux, config) } func makeRootAPIHandler(handler handlerFunc) handlerFunc { @@ -80,7 +98,56 @@ func prettyPrint(w http.ResponseWriter, data common.MapStr, u *url.URL) { } } -// AddHandlerFunc provides interface to add customized handlerFunc -func AddHandlerFunc(api string, h handlerFunc) { - handlerFuncMap[api] = h +type Routes struct { + routes map[string]handlerFunc + log *logp.Logger + smux *http.ServeMux + mux sync.RWMutex +} + +func (d *Routes) handle(w http.ResponseWriter, r *http.Request) { + d.mux.RLock() + defer d.mux.RUnlock() + + if h, exist := d.routes[r.URL.Path]; exist { + h(w, r) + return + } + + http.NotFound(w, r) +} + +func (d *Routes) register(api string, h handlerFunc) error { + d.mux.Lock() + defer d.mux.Unlock() + if !strings.HasPrefix(api, "/") { + return fmt.Errorf("route should starts with /") + } + if _, exist := d.routes[api]; exist { + err := fmt.Errorf("route %s is already in use", api) + d.log.Error(err.Error()) + return err + } + d.routes[api] = h + return nil +} + +func (d *Routes) deregister(api string) error { + d.mux.Lock() + defer d.mux.Unlock() + if _, exist := d.routes[api]; !exist { + return fmt.Errorf("route %s is not registered", api) + } + delete(d.routes, api) + return nil +} + +// Register registers an API and its http handler +func Register(api string, h handlerFunc) error { + return routes.register(api, h) +} + +// Deregister deregisters an API +func Deregister(api string) error { + return routes.deregister(api) } diff --git a/libbeat/api/server_test.go b/libbeat/api/server_test.go index e4df0f5b4e3..dfa2e2c707f 100644 --- a/libbeat/api/server_test.go +++ b/libbeat/api/server_test.go @@ -27,6 +27,7 @@ import ( "runtime" "testing" + "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -98,15 +99,8 @@ func TestSocket(t *testing.T) { }() c := client(sockFile) + verifyApi("http://unix/echo-hello", c.Get, verifyBodyString(t, "ehlo!")) - r, err := c.Get("http://unix/echo-hello") - require.NoError(t, err) - defer r.Body.Close() - - body, err := ioutil.ReadAll(r.Body) - require.NoError(t, err) - - assert.Equal(t, "ehlo!", string(body)) fi, err := os.Stat(sockFile) assert.Equal(t, socketFileMode, fi.Mode().Perm()) }) @@ -167,14 +161,53 @@ func TestHTTP(t *testing.T) { go s.Start() defer s.Stop() - r, err := http.Get("http://" + s.l.Addr().String() + "/echo-hello") - require.NoError(t, err) - defer r.Body.Close() + verifyApi("http://"+s.l.Addr().String()+"/echo-hello", http.Get, verifyBodyString(t, "ehlo!")) +} + +func TestDynamicRoutes(t *testing.T) { + // select a random free port. + url := "http://localhost:0" - body, err := ioutil.ReadAll(r.Body) + cfg := common.MustNewConfigFrom(map[string]interface{}{ + "host": url, + }) + s, err := NewWithDefaultRoutes(nil, cfg, func(string) *monitoring.Namespace { + return &monitoring.Namespace{} + }) require.NoError(t, err) + go s.Start() + defer s.Stop() + + api := "/echo-hello" + uri := "http://" + s.l.Addr().String() + api + require.NoError(t, Register(api, simpleHandler)) + require.EqualError(t, Register(api, simpleHandler), fmt.Sprintf("route %s is already in use", api)) + verifyApi(uri, http.Get, verifyBodyString(t, "ehlo!")) + + require.NoError(t, Deregister(api)) + require.EqualError(t, Deregister(api), fmt.Sprintf("route %s is not registered", api)) + verifyApi(uri, http.Get, verifyBodyString(t, "404 page not found\n")) +} + +type verifyFunc func(resp *http.Response, err error) + +func verifyApi(url string, visit func(url string) (resp *http.Response, err error), verify verifyFunc) { + verify(visit(url)) +} + +func verifyBodyString(t *testing.T, s string) verifyFunc { + return func(r *http.Response, err error) { + require.NoError(t, err) + defer r.Body.Close() + + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + assert.Equal(t, s, string(body)) + } +} - assert.Equal(t, "ehlo!", string(body)) +func simpleHandler(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "ehlo!") } func simpleMux() *http.ServeMux { From ebc290c24557060119aa36f997ef322a7a4274e7 Mon Sep 17 00:00:00 2001 From: Peter Deng Date: Wed, 16 Dec 2020 12:48:54 +0800 Subject: [PATCH 4/5] Revert "support register/deregister routes dynamically and refactor test case" This reverts commit 7d6142a75d06800cf4a6f03b93cbeacd6e10c467. --- libbeat/api/routes.go | 95 ++++++-------------------------------- libbeat/api/server_test.go | 59 ++++++----------------- 2 files changed, 27 insertions(+), 127 deletions(-) diff --git a/libbeat/api/routes.go b/libbeat/api/routes.go index 2aad3fa2363..c353cada3e4 100644 --- a/libbeat/api/routes.go +++ b/libbeat/api/routes.go @@ -21,8 +21,6 @@ import ( "fmt" "net/http" "net/url" - "strings" - "sync" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" @@ -32,37 +30,21 @@ import ( type handlerFunc func(http.ResponseWriter, *http.Request) type lookupFunc func(string) *monitoring.Namespace -var ( - routes = &Routes{ - routes: make(map[string]handlerFunc), - smux: http.NewServeMux(), - } -) - -func init() { - routes.smux.HandleFunc("/", routes.handle) -} +var handlerFuncMap = make(map[string]handlerFunc) // NewWithDefaultRoutes creates a new server with default API routes. func NewWithDefaultRoutes(log *logp.Logger, config *common.Config, ns lookupFunc) (*Server, error) { - defaultRoutes := map[string]handlerFunc{ - "/": makeRootAPIHandler(makeAPIHandler(ns("info"))), - "/state": makeAPIHandler(ns("state")), - "/stats": makeAPIHandler(ns("stats")), - "/dataset": makeAPIHandler(ns("dataset")), - } - for api, h := range defaultRoutes { - if err := routes.register(api, h); err != nil { - return nil, err - } - } - if log == nil { - log = logp.NewLogger("") - } - if routes.log == nil { - routes.log = log + mux := http.NewServeMux() + + mux.HandleFunc("/", makeRootAPIHandler(makeAPIHandler(ns("info")))) + mux.HandleFunc("/state", makeAPIHandler(ns("state"))) + mux.HandleFunc("/stats", makeAPIHandler(ns("stats"))) + mux.HandleFunc("/dataset", makeAPIHandler(ns("dataset"))) + + for api, h := range handlerFuncMap { + mux.HandleFunc(api, h) } - return New(log, routes.smux, config) + return New(log, mux, config) } func makeRootAPIHandler(handler handlerFunc) handlerFunc { @@ -98,56 +80,7 @@ func prettyPrint(w http.ResponseWriter, data common.MapStr, u *url.URL) { } } -type Routes struct { - routes map[string]handlerFunc - log *logp.Logger - smux *http.ServeMux - mux sync.RWMutex -} - -func (d *Routes) handle(w http.ResponseWriter, r *http.Request) { - d.mux.RLock() - defer d.mux.RUnlock() - - if h, exist := d.routes[r.URL.Path]; exist { - h(w, r) - return - } - - http.NotFound(w, r) -} - -func (d *Routes) register(api string, h handlerFunc) error { - d.mux.Lock() - defer d.mux.Unlock() - if !strings.HasPrefix(api, "/") { - return fmt.Errorf("route should starts with /") - } - if _, exist := d.routes[api]; exist { - err := fmt.Errorf("route %s is already in use", api) - d.log.Error(err.Error()) - return err - } - d.routes[api] = h - return nil -} - -func (d *Routes) deregister(api string) error { - d.mux.Lock() - defer d.mux.Unlock() - if _, exist := d.routes[api]; !exist { - return fmt.Errorf("route %s is not registered", api) - } - delete(d.routes, api) - return nil -} - -// Register registers an API and its http handler -func Register(api string, h handlerFunc) error { - return routes.register(api, h) -} - -// Deregister deregisters an API -func Deregister(api string) error { - return routes.deregister(api) +// AddHandlerFunc provides interface to add customized handlerFunc +func AddHandlerFunc(api string, h handlerFunc) { + handlerFuncMap[api] = h } diff --git a/libbeat/api/server_test.go b/libbeat/api/server_test.go index dfa2e2c707f..e4df0f5b4e3 100644 --- a/libbeat/api/server_test.go +++ b/libbeat/api/server_test.go @@ -27,7 +27,6 @@ import ( "runtime" "testing" - "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -99,8 +98,15 @@ func TestSocket(t *testing.T) { }() c := client(sockFile) - verifyApi("http://unix/echo-hello", c.Get, verifyBodyString(t, "ehlo!")) + r, err := c.Get("http://unix/echo-hello") + require.NoError(t, err) + defer r.Body.Close() + + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) + + assert.Equal(t, "ehlo!", string(body)) fi, err := os.Stat(sockFile) assert.Equal(t, socketFileMode, fi.Mode().Perm()) }) @@ -161,53 +167,14 @@ func TestHTTP(t *testing.T) { go s.Start() defer s.Stop() - verifyApi("http://"+s.l.Addr().String()+"/echo-hello", http.Get, verifyBodyString(t, "ehlo!")) -} - -func TestDynamicRoutes(t *testing.T) { - // select a random free port. - url := "http://localhost:0" - - cfg := common.MustNewConfigFrom(map[string]interface{}{ - "host": url, - }) - s, err := NewWithDefaultRoutes(nil, cfg, func(string) *monitoring.Namespace { - return &monitoring.Namespace{} - }) + r, err := http.Get("http://" + s.l.Addr().String() + "/echo-hello") require.NoError(t, err) - go s.Start() - defer s.Stop() - - api := "/echo-hello" - uri := "http://" + s.l.Addr().String() + api - require.NoError(t, Register(api, simpleHandler)) - require.EqualError(t, Register(api, simpleHandler), fmt.Sprintf("route %s is already in use", api)) - verifyApi(uri, http.Get, verifyBodyString(t, "ehlo!")) - - require.NoError(t, Deregister(api)) - require.EqualError(t, Deregister(api), fmt.Sprintf("route %s is not registered", api)) - verifyApi(uri, http.Get, verifyBodyString(t, "404 page not found\n")) -} - -type verifyFunc func(resp *http.Response, err error) - -func verifyApi(url string, visit func(url string) (resp *http.Response, err error), verify verifyFunc) { - verify(visit(url)) -} + defer r.Body.Close() -func verifyBodyString(t *testing.T, s string) verifyFunc { - return func(r *http.Response, err error) { - require.NoError(t, err) - defer r.Body.Close() - - body, err := ioutil.ReadAll(r.Body) - require.NoError(t, err) - assert.Equal(t, s, string(body)) - } -} + body, err := ioutil.ReadAll(r.Body) + require.NoError(t, err) -func simpleHandler(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "ehlo!") + assert.Equal(t, "ehlo!", string(body)) } func simpleMux() *http.ServeMux { From 8222f53e76095be288b2645b1b25bedf9c30fdd5 Mon Sep 17 00:00:00 2001 From: Peter Deng Date: Wed, 16 Dec 2020 12:56:29 +0800 Subject: [PATCH 5/5] check handler func existence before adding it --- libbeat/api/routes.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libbeat/api/routes.go b/libbeat/api/routes.go index c353cada3e4..bc72347cf28 100644 --- a/libbeat/api/routes.go +++ b/libbeat/api/routes.go @@ -81,6 +81,10 @@ func prettyPrint(w http.ResponseWriter, data common.MapStr, u *url.URL) { } // AddHandlerFunc provides interface to add customized handlerFunc -func AddHandlerFunc(api string, h handlerFunc) { +func AddHandlerFunc(api string, h handlerFunc) error { + if _, exist := handlerFuncMap[api]; exist { + return fmt.Errorf("%s already exist", api) + } handlerFuncMap[api] = h + return nil }