From 12c7d5d1019723726653f48630078d9697b704b0 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 11 Nov 2021 12:30:57 +0100 Subject: [PATCH 1/7] Allow multiple endpoints where server is exposed at --- cmd/fleet/server.go | 118 ++++++++++++++++++----------------- internal/pkg/config/input.go | 38 +++++++++-- main.go | 2 +- 3 files changed, 95 insertions(+), 63 deletions(-) diff --git a/cmd/fleet/server.go b/cmd/fleet/server.go index 98ad61630..305ed87db 100644 --- a/cmd/fleet/server.go +++ b/cmd/fleet/server.go @@ -40,8 +40,8 @@ func diagConn(c net.Conn, s http.ConnState) { } func runServer(ctx context.Context, router *httprouter.Router, cfg *config.Server) error { + listeners := cfg.BindEndpoints() - addr := cfg.BindAddress() rdto := cfg.Timeouts.Read wrto := cfg.Timeouts.Write idle := cfg.Timeouts.Idle @@ -49,75 +49,77 @@ func runServer(ctx context.Context, router *httprouter.Router, cfg *config.Serve mhbz := cfg.Limits.MaxHeaderByteSize bctx := func(net.Listener) context.Context { return ctx } - log.Info(). - Str("bind", addr). - Dur("rdTimeout", rdto). - Dur("wrTimeout", wrto). - Msg("server listening") - - server := http.Server{ - Addr: addr, - ReadTimeout: rdto, - WriteTimeout: wrto, - IdleTimeout: idle, - ReadHeaderTimeout: rdhr, - Handler: router, - BaseContext: bctx, - ConnState: diagConn, - MaxHeaderBytes: mhbz, - ErrorLog: errLogger(), - } - - forceCh := make(chan struct{}) - defer close(forceCh) - - // handler to close server - go func() { - select { - case <-ctx.Done(): - log.Debug().Msg("force server close on ctx.Done()") - server.Close() - case <-forceCh: - log.Debug().Msg("go routine forced closed on exit") + for _, addr := range listeners { + log.Info(). + Str("bind", addr). + Dur("rdTimeout", rdto). + Dur("wrTimeout", wrto). + Msg("server listening") + + server := http.Server{ + Addr: addr, + ReadTimeout: rdto, + WriteTimeout: wrto, + IdleTimeout: idle, + ReadHeaderTimeout: rdhr, + Handler: router, + BaseContext: bctx, + ConnState: diagConn, + MaxHeaderBytes: mhbz, + ErrorLog: errLogger(), } - }() - var listenCfg net.ListenConfig + forceCh := make(chan struct{}) + defer close(forceCh) - ln, err := listenCfg.Listen(ctx, "tcp", addr) - if err != nil { - return err - } + // handler to close server + go func() { + select { + case <-ctx.Done(): + log.Debug().Msg("force server close on ctx.Done()") + server.Close() + case <-forceCh: + log.Debug().Msg("go routine forced closed on exit") + } + }() - // Bind the deferred Close() to the stack variable to handle case where 'ln' is wrapped - defer func() { ln.Close() }() + var listenCfg net.ListenConfig - // Conn Limiter must be before the TLS handshake in the stack; - // The server should not eat the cost of the handshake if there - // is no capacity to service the connection. - // Also, it appears the HTTP2 implementation depends on the tls.Listener - // being at the top of the stack. - ln = wrapConnLimitter(ctx, ln, cfg) - - if cfg.TLS != nil && cfg.TLS.IsEnabled() { - commonTlsCfg, err := tlscommon.LoadTLSServerConfig(cfg.TLS) + ln, err := listenCfg.Listen(ctx, "tcp", addr) if err != nil { return err } - server.TLSConfig = commonTlsCfg.ToConfig() - // Must enable http/2 in the configuration explicitly. - // (see https://golang.org/pkg/net/http/#Server.Serve) - server.TLSConfig.NextProtos = []string{"h2", "http/1.1"} + // Bind the deferred Close() to the stack variable to handle case where 'ln' is wrapped + defer func() { ln.Close() }() - ln = tls.NewListener(ln, server.TLSConfig) + // Conn Limiter must be before the TLS handshake in the stack; + // The server should not eat the cost of the handshake if there + // is no capacity to service the connection. + // Also, it appears the HTTP2 implementation depends on the tls.Listener + // being at the top of the stack. + ln = wrapConnLimitter(ctx, ln, cfg) - } else { - log.Warn().Msg("exposed over insecure HTTP; enablement of TLS is strongly recommended") - } + if cfg.TLS != nil && cfg.TLS.IsEnabled() { + commonTlsCfg, err := tlscommon.LoadTLSServerConfig(cfg.TLS) + if err != nil { + return err + } + server.TLSConfig = commonTlsCfg.ToConfig() - if err := server.Serve(ln); err != nil && err != http.ErrServerClosed { - return err + // Must enable http/2 in the configuration explicitly. + // (see https://golang.org/pkg/net/http/#Server.Serve) + server.TLSConfig.NextProtos = []string{"h2", "http/1.1"} + + ln = tls.NewListener(ln, server.TLSConfig) + + } else { + log.Warn().Msg("exposed over insecure HTTP; enablement of TLS is strongly recommended") + } + + if err := server.Serve(ln); err != nil && err != http.ErrServerClosed { + return err + } } return nil diff --git a/internal/pkg/config/input.go b/internal/pkg/config/input.go index b4238fb3e..0a38769ed 100644 --- a/internal/pkg/config/input.go +++ b/internal/pkg/config/input.go @@ -53,10 +53,16 @@ func (c *ServerBulk) InitDefaults() { c.FlushMaxPending = 8 } +type Endpoint struct { + Host string `config:"host"` + Port uint16 `config:"port"` +} + // Server is the configuration for the server type Server struct { - Host string `config:"host"` - Port uint16 `config:"port"` + Host string `config:"host"` // TODO: deprecated use Endpoints + Port uint16 `config:"port"` // TODO: deprecated use Endpoints + Endpoints []Endpoint `config:"endpoints"` TLS *tlscommon.ServerConfig `config:"ssl"` Timeouts ServerTimeouts `config:"timeouts"` Profiler ServerProfiler `config:"profiler"` @@ -80,13 +86,37 @@ func (c *Server) InitDefaults() { c.Bulk.InitDefaults() } +// BindEndpoints returns the binding address for the all HTTP server listeners. +func (c *Server) BindEndpoints() []string { + endpoints := map[string]bool{ + c.BindAddress(): true, + } + + // add each of the endpoints to collection, + for _, ep := range c.Endpoints { + e := bindAddress(ep.Host, ep.Port) + endpoints[e] = true + } + + // we need to get rid of duplicates so we dont have port collision + uniqueEndpoints := make([]string, 0, len(endpoints)) + for ep := range endpoints { + uniqueEndpoints = append(uniqueEndpoints, ep) + } + + return uniqueEndpoints +} + // BindAddress returns the binding address for the HTTP server. func (c *Server) BindAddress() string { - host := c.Host + return bindAddress(c.Host, c.Port) +} + +func bindAddress(host string, port uint16) string { if strings.Count(host, ":") > 1 && strings.Count(host, "]") == 0 { host = "[" + host + "]" } - return fmt.Sprintf("%s:%d", host, c.Port) + return fmt.Sprintf("%s:%d", host, port) } // Input is the input defined by Agent to run Fleet Server. diff --git a/main.go b/main.go index c72cf78b2..27330d702 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/build" ) -const defaultVersion = "8.1.0" +const defaultVersion = "7.16.0" var ( Version string = defaultVersion From 7bc02bed3b5d7fce3a72a07f23e13d6df9aa7ca6 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 11 Nov 2021 14:02:55 +0100 Subject: [PATCH 2/7] internal --- internal/pkg/config/input.go | 36 +++++++++++++++++++----------------- main.go | 2 +- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/internal/pkg/config/input.go b/internal/pkg/config/input.go index 0a38769ed..71502339c 100644 --- a/internal/pkg/config/input.go +++ b/internal/pkg/config/input.go @@ -60,9 +60,10 @@ type Endpoint struct { // Server is the configuration for the server type Server struct { - Host string `config:"host"` // TODO: deprecated use Endpoints - Port uint16 `config:"port"` // TODO: deprecated use Endpoints - Endpoints []Endpoint `config:"endpoints"` + Host string `config:"host"` + Port uint16 `config:"port"` + InternalHost string `config:"internal_host"` + InternalPort uint16 `config:"internal_port"` TLS *tlscommon.ServerConfig `config:"ssl"` Timeouts ServerTimeouts `config:"timeouts"` Profiler ServerProfiler `config:"profiler"` @@ -88,23 +89,15 @@ func (c *Server) InitDefaults() { // BindEndpoints returns the binding address for the all HTTP server listeners. func (c *Server) BindEndpoints() []string { - endpoints := map[string]bool{ - c.BindAddress(): true, - } - - // add each of the endpoints to collection, - for _, ep := range c.Endpoints { - e := bindAddress(ep.Host, ep.Port) - endpoints[e] = true - } + primaryAddress := c.BindAddress() + endpoints := make([]string, 0, 2) + endpoints = append(endpoints, primaryAddress) - // we need to get rid of duplicates so we dont have port collision - uniqueEndpoints := make([]string, 0, len(endpoints)) - for ep := range endpoints { - uniqueEndpoints = append(uniqueEndpoints, ep) + if internalAddress := c.BindInternalAddress(); internalAddress != "" && internalAddress != primaryAddress { + endpoints = append(endpoints, internalAddress) } - return uniqueEndpoints + return endpoints } // BindAddress returns the binding address for the HTTP server. @@ -112,6 +105,15 @@ func (c *Server) BindAddress() string { return bindAddress(c.Host, c.Port) } +// BindInternalAddress returns the binding address for the internal HTTP server. +func (c *Server) BindInternalAddress() string { + if c.InternalPort <= 0 { + return "" + } + + return bindAddress(c.InternalHost, c.InternalPort) +} + func bindAddress(host string, port uint16) string { if strings.Count(host, ":") > 1 && strings.Count(host, "]") == 0 { host = "[" + host + "]" diff --git a/main.go b/main.go index 27330d702..169b8d7c2 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/build" ) -const defaultVersion = "7.16.0" +const defaultVersion = "7.15.0" var ( Version string = defaultVersion From 8ba494dfdde2c76be3822c61d9f58724a3a1e6c2 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Mon, 15 Nov 2021 11:18:17 +0100 Subject: [PATCH 3/7] added logic for internal endpoint --- cmd/fleet/server.go | 19 ++++++++++++++++++- internal/pkg/config/input.go | 3 ++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/cmd/fleet/server.go b/cmd/fleet/server.go index 305ed87db..338da3d21 100644 --- a/cmd/fleet/server.go +++ b/cmd/fleet/server.go @@ -49,6 +49,10 @@ func runServer(ctx context.Context, router *httprouter.Router, cfg *config.Serve mhbz := cfg.Limits.MaxHeaderByteSize bctx := func(net.Listener) context.Context { return ctx } + errChan := make(chan error) + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + for _, addr := range listeners { log.Info(). Str("bind", addr). @@ -117,9 +121,22 @@ func runServer(ctx context.Context, router *httprouter.Router, cfg *config.Serve log.Warn().Msg("exposed over insecure HTTP; enablement of TLS is strongly recommended") } - if err := server.Serve(ln); err != nil && err != http.ErrServerClosed { + log.Debug().Msgf("Listening on %s", addr) + + go func(ctx context.Context, errChan chan error, ln net.Listener) { + if err := server.Serve(ln); err != nil && err != http.ErrServerClosed { + errChan <- err + } + }(cancelCtx, errChan, ln) + + } + + select { + case err := <-errChan: + if err != context.Canceled { return err } + case <-cancelCtx.Done(): } return nil diff --git a/internal/pkg/config/input.go b/internal/pkg/config/input.go index 71502339c..aee6493f6 100644 --- a/internal/pkg/config/input.go +++ b/internal/pkg/config/input.go @@ -93,7 +93,8 @@ func (c *Server) BindEndpoints() []string { endpoints := make([]string, 0, 2) endpoints = append(endpoints, primaryAddress) - if internalAddress := c.BindInternalAddress(); internalAddress != "" && internalAddress != primaryAddress { + if internalAddress := c.BindInternalAddress(); internalAddress != "" && internalAddress != ":0" && internalAddress != primaryAddress { + endpoints = append(endpoints, internalAddress) } From f2c7bfea208a816886a4c941f23703c59dadf39b Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Mon, 15 Nov 2021 11:21:33 +0100 Subject: [PATCH 4/7] version back to where it was --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 169b8d7c2..c72cf78b2 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/build" ) -const defaultVersion = "7.15.0" +const defaultVersion = "8.1.0" var ( Version string = defaultVersion From c7c504208505c49a46906135772fcb7b5d9729fc Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 18 Nov 2021 17:03:13 +0100 Subject: [PATCH 5/7] internal on localhost --- internal/pkg/config/input.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/internal/pkg/config/input.go b/internal/pkg/config/input.go index aee6493f6..7124e430b 100644 --- a/internal/pkg/config/input.go +++ b/internal/pkg/config/input.go @@ -15,6 +15,8 @@ import ( const kDefaultHost = "0.0.0.0" const kDefaultPort = 8220 +const kDefaultInternalHost = "localhost" +const kDefaultInternalPort = 8221 // Policy is the configuration policy to use. type Policy struct { @@ -53,16 +55,10 @@ func (c *ServerBulk) InitDefaults() { c.FlushMaxPending = 8 } -type Endpoint struct { - Host string `config:"host"` - Port uint16 `config:"port"` -} - // Server is the configuration for the server type Server struct { Host string `config:"host"` Port uint16 `config:"port"` - InternalHost string `config:"internal_host"` InternalPort uint16 `config:"internal_port"` TLS *tlscommon.ServerConfig `config:"ssl"` Timeouts ServerTimeouts `config:"timeouts"` @@ -78,6 +74,7 @@ type Server struct { func (c *Server) InitDefaults() { c.Host = kDefaultHost c.Port = kDefaultPort + c.InternalPort = kDefaultInternalPort c.Timeouts.InitDefaults() c.CompressionLevel = flate.BestSpeed c.CompressionThresh = 1024 @@ -94,7 +91,6 @@ func (c *Server) BindEndpoints() []string { endpoints = append(endpoints, primaryAddress) if internalAddress := c.BindInternalAddress(); internalAddress != "" && internalAddress != ":0" && internalAddress != primaryAddress { - endpoints = append(endpoints, internalAddress) } @@ -109,10 +105,10 @@ func (c *Server) BindAddress() string { // BindInternalAddress returns the binding address for the internal HTTP server. func (c *Server) BindInternalAddress() string { if c.InternalPort <= 0 { - return "" + return bindAddress(kDefaultInternalHost, kDefaultInternalPort) } - return bindAddress(c.InternalHost, c.InternalPort) + return bindAddress(kDefaultInternalHost, c.InternalPort) } func bindAddress(host string, port uint16) string { From 505380fdc33961e6581e6aa5e692d982b9070905 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Fri, 19 Nov 2021 09:13:32 +0100 Subject: [PATCH 6/7] Update cmd/fleet/server.go Co-authored-by: Michel Laterman <82832767+michel-laterman@users.noreply.github.com> --- cmd/fleet/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/fleet/server.go b/cmd/fleet/server.go index 338da3d21..c9f2e680b 100644 --- a/cmd/fleet/server.go +++ b/cmd/fleet/server.go @@ -118,7 +118,7 @@ func runServer(ctx context.Context, router *httprouter.Router, cfg *config.Serve ln = tls.NewListener(ln, server.TLSConfig) } else { - log.Warn().Msg("exposed over insecure HTTP; enablement of TLS is strongly recommended") + log.Warn().Msg("Exposed over insecure HTTP; enablement of TLS is strongly recommended") } log.Debug().Msgf("Listening on %s", addr) From 63888ba343cba964bb2f4cc9c8b14f2c711cdc0e Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Fri, 19 Nov 2021 09:37:43 +0100 Subject: [PATCH 7/7] unit tests --- internal/pkg/config/config_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/pkg/config/config_test.go b/internal/pkg/config/config_test.go index bfffcd9da..4c7b5c1c9 100644 --- a/internal/pkg/config/config_test.go +++ b/internal/pkg/config/config_test.go @@ -103,8 +103,9 @@ func TestConfig(t *testing.T) { { Type: "fleet-server", Server: Server{ - Host: "localhost", - Port: 8888, + Host: "localhost", + Port: 8888, + InternalPort: 8221, Timeouts: ServerTimeouts{ Read: 20 * time.Second, ReadHeader: 5 * time.Second,