From 0379b9dbfedcc0412c4df9db710d2f04b5bec4f2 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Tue, 23 Nov 2021 09:12:56 +0100 Subject: [PATCH] Merge pull request #880 from michalpristas/multiple-endpoints Multiple endpoints --- cmd/fleet/server.go | 131 +++++++++++++++++------------ internal/pkg/config/config_test.go | 5 +- internal/pkg/config/input.go | 33 +++++++- 3 files changed, 109 insertions(+), 60 deletions(-) diff --git a/cmd/fleet/server.go b/cmd/fleet/server.go index a28e2b1d4..971534c56 100644 --- a/cmd/fleet/server.go +++ b/cmd/fleet/server.go @@ -39,7 +39,7 @@ func diagConn(c net.Conn, s http.ConnState) { } func runServer(ctx context.Context, router http.Handler, cfg *config.Server) error { - addr := cfg.BindAddress() + listeners := cfg.BindEndpoints() rdto := cfg.Timeouts.Read wrto := cfg.Timeouts.Write idle := cfg.Timeouts.Idle @@ -47,75 +47,94 @@ func runServer(ctx context.Context, router http.Handler, cfg *config.Server) err mhbz := cfg.Limits.MaxHeaderByteSize bctx := func(net.Listener) context.Context { return ctx } - log.Info(). - Str("bind", addr). - Dur("rdTimeout", rdto). - Dur("wrTimeout", wrto). - Msg("server listening") - - server := http.Server{ - Addr: addr, - ReadTimeout: rdto, - WriteTimeout: wrto, - IdleTimeout: idle, - ReadHeaderTimeout: rdhr, - Handler: router, - BaseContext: bctx, - ConnState: diagConn, - MaxHeaderBytes: mhbz, - ErrorLog: errLogger(), - } + errChan := make(chan error) + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() - forceCh := make(chan struct{}) - defer close(forceCh) - - // handler to close server - go func() { - select { - case <-ctx.Done(): - log.Debug().Msg("force server close on ctx.Done()") - server.Close() - case <-forceCh: - log.Debug().Msg("go routine forced closed on exit") + for _, addr := range listeners { + log.Info(). + Str("bind", addr). + Dur("rdTimeout", rdto). + Dur("wrTimeout", wrto). + Msg("server listening") + + server := http.Server{ + Addr: addr, + ReadTimeout: rdto, + WriteTimeout: wrto, + IdleTimeout: idle, + ReadHeaderTimeout: rdhr, + Handler: router, + BaseContext: bctx, + ConnState: diagConn, + MaxHeaderBytes: mhbz, + ErrorLog: errLogger(), } - }() - var listenCfg net.ListenConfig - - ln, err := listenCfg.Listen(ctx, "tcp", addr) - if err != nil { - return err - } + forceCh := make(chan struct{}) + defer close(forceCh) - // Bind the deferred Close() to the stack variable to handle case where 'ln' is wrapped - defer func() { ln.Close() }() + // handler to close server + go func() { + select { + case <-ctx.Done(): + log.Debug().Msg("force server close on ctx.Done()") + server.Close() + case <-forceCh: + log.Debug().Msg("go routine forced closed on exit") + } + }() - // Conn Limiter must be before the TLS handshake in the stack; - // The server should not eat the cost of the handshake if there - // is no capacity to service the connection. - // Also, it appears the HTTP2 implementation depends on the tls.Listener - // being at the top of the stack. - ln = wrapConnLimitter(ctx, ln, cfg) + var listenCfg net.ListenConfig - if cfg.TLS != nil && cfg.TLS.IsEnabled() { - commonTlsCfg, err := tlscommon.LoadTLSServerConfig(cfg.TLS) + ln, err := listenCfg.Listen(ctx, "tcp", addr) if err != nil { return err } - server.TLSConfig = commonTlsCfg.ToConfig() - // Must enable http/2 in the configuration explicitly. - // (see https://golang.org/pkg/net/http/#Server.Serve) - server.TLSConfig.NextProtos = []string{"h2", "http/1.1"} + // Bind the deferred Close() to the stack variable to handle case where 'ln' is wrapped + defer func() { ln.Close() }() - ln = tls.NewListener(ln, server.TLSConfig) + // Conn Limiter must be before the TLS handshake in the stack; + // The server should not eat the cost of the handshake if there + // is no capacity to service the connection. + // Also, it appears the HTTP2 implementation depends on the tls.Listener + // being at the top of the stack. + ln = wrapConnLimitter(ctx, ln, cfg) + + if cfg.TLS != nil && cfg.TLS.IsEnabled() { + commonTlsCfg, err := tlscommon.LoadTLSServerConfig(cfg.TLS) + if err != nil { + return err + } + server.TLSConfig = commonTlsCfg.ToConfig() + + // Must enable http/2 in the configuration explicitly. + // (see https://golang.org/pkg/net/http/#Server.Serve) + server.TLSConfig.NextProtos = []string{"h2", "http/1.1"} + + ln = tls.NewListener(ln, server.TLSConfig) + + } else { + log.Warn().Msg("Exposed over insecure HTTP; enablement of TLS is strongly recommended") + } + + log.Debug().Msgf("Listening on %s", addr) + + go func(ctx context.Context, errChan chan error, ln net.Listener) { + if err := server.Serve(ln); err != nil && err != http.ErrServerClosed { + errChan <- err + } + }(cancelCtx, errChan, ln) - } else { - log.Warn().Msg("exposed over insecure HTTP; enablement of TLS is strongly recommended") } - if err := server.Serve(ln); err != nil && err != http.ErrServerClosed { - return err + select { + case err := <-errChan: + if err != context.Canceled { + return err + } + case <-cancelCtx.Done(): } return nil diff --git a/internal/pkg/config/config_test.go b/internal/pkg/config/config_test.go index fda3bbe0b..6603fa067 100644 --- a/internal/pkg/config/config_test.go +++ b/internal/pkg/config/config_test.go @@ -103,8 +103,9 @@ func TestConfig(t *testing.T) { { Type: "fleet-server", Server: Server{ - Host: "localhost", - Port: 8888, + Host: "localhost", + Port: 8888, + InternalPort: 8221, Timeouts: ServerTimeouts{ Read: 20 * time.Second, ReadHeader: 5 * time.Second, diff --git a/internal/pkg/config/input.go b/internal/pkg/config/input.go index f3a296ad6..e225d66af 100644 --- a/internal/pkg/config/input.go +++ b/internal/pkg/config/input.go @@ -15,6 +15,8 @@ import ( const kDefaultHost = "0.0.0.0" const kDefaultPort = 8220 +const kDefaultInternalHost = "localhost" +const kDefaultInternalPort = 8221 // Policy is the configuration policy to use. type Policy struct { @@ -57,6 +59,7 @@ func (c *ServerBulk) InitDefaults() { type Server struct { Host string `config:"host"` Port uint16 `config:"port"` + InternalPort uint16 `config:"internal_port"` TLS *tlscommon.ServerConfig `config:"ssl"` Timeouts ServerTimeouts `config:"timeouts"` Profiler ServerProfiler `config:"profiler"` @@ -73,6 +76,7 @@ type Server struct { func (c *Server) InitDefaults() { c.Host = kDefaultHost c.Port = kDefaultPort + c.InternalPort = kDefaultInternalPort c.Timeouts.InitDefaults() c.CompressionLevel = flate.BestSpeed c.CompressionThresh = 1024 @@ -83,13 +87,38 @@ func (c *Server) InitDefaults() { c.GC.InitDefaults() } +// BindEndpoints returns the binding address for the all HTTP server listeners. +func (c *Server) BindEndpoints() []string { + primaryAddress := c.BindAddress() + endpoints := make([]string, 0, 2) + endpoints = append(endpoints, primaryAddress) + + if internalAddress := c.BindInternalAddress(); internalAddress != "" && internalAddress != ":0" && internalAddress != primaryAddress { + endpoints = append(endpoints, internalAddress) + } + + return endpoints +} + // BindAddress returns the binding address for the HTTP server. func (c *Server) BindAddress() string { - host := c.Host + return bindAddress(c.Host, c.Port) +} + +// BindInternalAddress returns the binding address for the internal HTTP server. +func (c *Server) BindInternalAddress() string { + if c.InternalPort <= 0 { + return bindAddress(kDefaultInternalHost, kDefaultInternalPort) + } + + return bindAddress(kDefaultInternalHost, c.InternalPort) +} + +func bindAddress(host string, port uint16) string { if strings.Count(host, ":") > 1 && strings.Count(host, "]") == 0 { host = "[" + host + "]" } - return fmt.Sprintf("%s:%d", host, c.Port) + return fmt.Sprintf("%s:%d", host, port) } // Input is the input defined by Agent to run Fleet Server.