Skip to content

Commit

Permalink
Merge pull request elastic#880 from michalpristas/multiple-endpoints
Browse files Browse the repository at this point in the history
Multiple endpoints
  • Loading branch information
michalpristas committed Dec 7, 2021
1 parent 02a9752 commit 0379b9d
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 60 deletions.
131 changes: 75 additions & 56 deletions cmd/fleet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,83 +39,102 @@ func diagConn(c net.Conn, s http.ConnState) {
}

func runServer(ctx context.Context, router http.Handler, cfg *config.Server) error {
addr := cfg.BindAddress()
listeners := cfg.BindEndpoints()
rdto := cfg.Timeouts.Read
wrto := cfg.Timeouts.Write
idle := cfg.Timeouts.Idle
rdhr := cfg.Timeouts.ReadHeader
mhbz := cfg.Limits.MaxHeaderByteSize
bctx := func(net.Listener) context.Context { return ctx }

log.Info().
Str("bind", addr).
Dur("rdTimeout", rdto).
Dur("wrTimeout", wrto).
Msg("server listening")

server := http.Server{
Addr: addr,
ReadTimeout: rdto,
WriteTimeout: wrto,
IdleTimeout: idle,
ReadHeaderTimeout: rdhr,
Handler: router,
BaseContext: bctx,
ConnState: diagConn,
MaxHeaderBytes: mhbz,
ErrorLog: errLogger(),
}
errChan := make(chan error)
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()

forceCh := make(chan struct{})
defer close(forceCh)

// handler to close server
go func() {
select {
case <-ctx.Done():
log.Debug().Msg("force server close on ctx.Done()")
server.Close()
case <-forceCh:
log.Debug().Msg("go routine forced closed on exit")
for _, addr := range listeners {
log.Info().
Str("bind", addr).
Dur("rdTimeout", rdto).
Dur("wrTimeout", wrto).
Msg("server listening")

server := http.Server{
Addr: addr,
ReadTimeout: rdto,
WriteTimeout: wrto,
IdleTimeout: idle,
ReadHeaderTimeout: rdhr,
Handler: router,
BaseContext: bctx,
ConnState: diagConn,
MaxHeaderBytes: mhbz,
ErrorLog: errLogger(),
}
}()

var listenCfg net.ListenConfig

ln, err := listenCfg.Listen(ctx, "tcp", addr)
if err != nil {
return err
}
forceCh := make(chan struct{})
defer close(forceCh)

// Bind the deferred Close() to the stack variable to handle case where 'ln' is wrapped
defer func() { ln.Close() }()
// handler to close server
go func() {
select {
case <-ctx.Done():
log.Debug().Msg("force server close on ctx.Done()")
server.Close()
case <-forceCh:
log.Debug().Msg("go routine forced closed on exit")
}
}()

// Conn Limiter must be before the TLS handshake in the stack;
// The server should not eat the cost of the handshake if there
// is no capacity to service the connection.
// Also, it appears the HTTP2 implementation depends on the tls.Listener
// being at the top of the stack.
ln = wrapConnLimitter(ctx, ln, cfg)
var listenCfg net.ListenConfig

if cfg.TLS != nil && cfg.TLS.IsEnabled() {
commonTlsCfg, err := tlscommon.LoadTLSServerConfig(cfg.TLS)
ln, err := listenCfg.Listen(ctx, "tcp", addr)
if err != nil {
return err
}
server.TLSConfig = commonTlsCfg.ToConfig()

// Must enable http/2 in the configuration explicitly.
// (see https://golang.org/pkg/net/http/#Server.Serve)
server.TLSConfig.NextProtos = []string{"h2", "http/1.1"}
// Bind the deferred Close() to the stack variable to handle case where 'ln' is wrapped
defer func() { ln.Close() }()

ln = tls.NewListener(ln, server.TLSConfig)
// Conn Limiter must be before the TLS handshake in the stack;
// The server should not eat the cost of the handshake if there
// is no capacity to service the connection.
// Also, it appears the HTTP2 implementation depends on the tls.Listener
// being at the top of the stack.
ln = wrapConnLimitter(ctx, ln, cfg)

if cfg.TLS != nil && cfg.TLS.IsEnabled() {
commonTlsCfg, err := tlscommon.LoadTLSServerConfig(cfg.TLS)
if err != nil {
return err
}
server.TLSConfig = commonTlsCfg.ToConfig()

// Must enable http/2 in the configuration explicitly.
// (see https://golang.org/pkg/net/http/#Server.Serve)
server.TLSConfig.NextProtos = []string{"h2", "http/1.1"}

ln = tls.NewListener(ln, server.TLSConfig)

} else {
log.Warn().Msg("Exposed over insecure HTTP; enablement of TLS is strongly recommended")
}

log.Debug().Msgf("Listening on %s", addr)

go func(ctx context.Context, errChan chan error, ln net.Listener) {
if err := server.Serve(ln); err != nil && err != http.ErrServerClosed {
errChan <- err
}
}(cancelCtx, errChan, ln)

} else {
log.Warn().Msg("exposed over insecure HTTP; enablement of TLS is strongly recommended")
}

if err := server.Serve(ln); err != nil && err != http.ErrServerClosed {
return err
select {
case err := <-errChan:
if err != context.Canceled {
return err
}
case <-cancelCtx.Done():
}

return nil
Expand Down
5 changes: 3 additions & 2 deletions internal/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ func TestConfig(t *testing.T) {
{
Type: "fleet-server",
Server: Server{
Host: "localhost",
Port: 8888,
Host: "localhost",
Port: 8888,
InternalPort: 8221,
Timeouts: ServerTimeouts{
Read: 20 * time.Second,
ReadHeader: 5 * time.Second,
Expand Down
33 changes: 31 additions & 2 deletions internal/pkg/config/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

const kDefaultHost = "0.0.0.0"
const kDefaultPort = 8220
const kDefaultInternalHost = "localhost"
const kDefaultInternalPort = 8221

// Policy is the configuration policy to use.
type Policy struct {
Expand Down Expand Up @@ -57,6 +59,7 @@ func (c *ServerBulk) InitDefaults() {
type Server struct {
Host string `config:"host"`
Port uint16 `config:"port"`
InternalPort uint16 `config:"internal_port"`
TLS *tlscommon.ServerConfig `config:"ssl"`
Timeouts ServerTimeouts `config:"timeouts"`
Profiler ServerProfiler `config:"profiler"`
Expand All @@ -73,6 +76,7 @@ type Server struct {
func (c *Server) InitDefaults() {
c.Host = kDefaultHost
c.Port = kDefaultPort
c.InternalPort = kDefaultInternalPort
c.Timeouts.InitDefaults()
c.CompressionLevel = flate.BestSpeed
c.CompressionThresh = 1024
Expand All @@ -83,13 +87,38 @@ func (c *Server) InitDefaults() {
c.GC.InitDefaults()
}

// BindEndpoints returns the binding address for the all HTTP server listeners.
func (c *Server) BindEndpoints() []string {
primaryAddress := c.BindAddress()
endpoints := make([]string, 0, 2)
endpoints = append(endpoints, primaryAddress)

if internalAddress := c.BindInternalAddress(); internalAddress != "" && internalAddress != ":0" && internalAddress != primaryAddress {
endpoints = append(endpoints, internalAddress)
}

return endpoints
}

// BindAddress returns the binding address for the HTTP server.
func (c *Server) BindAddress() string {
host := c.Host
return bindAddress(c.Host, c.Port)
}

// BindInternalAddress returns the binding address for the internal HTTP server.
func (c *Server) BindInternalAddress() string {
if c.InternalPort <= 0 {
return bindAddress(kDefaultInternalHost, kDefaultInternalPort)
}

return bindAddress(kDefaultInternalHost, c.InternalPort)
}

func bindAddress(host string, port uint16) string {
if strings.Count(host, ":") > 1 && strings.Count(host, "]") == 0 {
host = "[" + host + "]"
}
return fmt.Sprintf("%s:%d", host, c.Port)
return fmt.Sprintf("%s:%d", host, port)
}

// Input is the input defined by Agent to run Fleet Server.
Expand Down

0 comments on commit 0379b9d

Please sign in to comment.