Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
jpkrohling authored Jul 22, 2021
2 parents 5468b51 + f8a6b3c commit abbbcc7
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 18 deletions.
6 changes: 3 additions & 3 deletions cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (b *Builder) CreateAgent(primaryProxy CollectorProxy, logger *zap.Logger, m
if err != nil {
return nil, fmt.Errorf("cannot create processors: %w", err)
}
server := b.HTTPServer.getHTTPServer(primaryProxy.GetManager(), mFactory)
server := b.HTTPServer.getHTTPServer(primaryProxy.GetManager(), mFactory, logger)
b.publishOpts(mFactory)

return NewAgent(processors, server, logger), nil
Expand Down Expand Up @@ -170,11 +170,11 @@ func (b *Builder) getProcessors(rep reporter.Reporter, mFactory metrics.Factory,
}

// GetHTTPServer creates an HTTP server that provides sampling strategies and baggage restrictions to client libraries.
func (c HTTPServerConfiguration) getHTTPServer(manager configmanager.ClientConfigManager, mFactory metrics.Factory) *http.Server {
func (c HTTPServerConfiguration) getHTTPServer(manager configmanager.ClientConfigManager, mFactory metrics.Factory, logger *zap.Logger) *http.Server {
if c.HostPort == "" {
c.HostPort = defaultHTTPServerHostPort
}
return httpserver.NewHTTPServer(c.HostPort, manager, mFactory)
return httpserver.NewHTTPServer(c.HostPort, manager, mFactory, logger)
}

// GetThriftProcessor gets a TBufferedServer backed Processor using the collector configuration
Expand Down
11 changes: 9 additions & 2 deletions cmd/agent/app/httpserver/srv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,27 @@ import (

"github.com/gorilla/mux"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
"github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
)

// NewHTTPServer creates a new server that hosts an HTTP/JSON endpoint for clients
// to query for sampling strategies and baggage restrictions.
func NewHTTPServer(hostPort string, manager configmanager.ClientConfigManager, mFactory metrics.Factory) *http.Server {
func NewHTTPServer(hostPort string, manager configmanager.ClientConfigManager, mFactory metrics.Factory, logger *zap.Logger) *http.Server {
handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{
ConfigManager: manager,
MetricsFactory: mFactory,
LegacySamplingEndpoint: true,
})
r := mux.NewRouter()
handler.RegisterRoutes(r)
return &http.Server{Addr: hostPort, Handler: r}
errorLog, _ := zap.NewStdLogAt(logger, zapcore.ErrorLevel)
return &http.Server{
Addr: hostPort,
Handler: r,
ErrorLog: errorLog,
}
}
3 changes: 2 additions & 1 deletion cmd/agent/app/httpserver/srv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func TestHTTPServer(t *testing.T) {
s := NewHTTPServer(":1", nil, nil)
s := NewHTTPServer(":1", nil, nil, zap.NewNop())
assert.NotNil(t, s)
}
7 changes: 6 additions & 1 deletion cmd/collector/app/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/gorilla/mux"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
Expand All @@ -46,7 +47,11 @@ type HTTPServerParams struct {
func StartHTTPServer(params *HTTPServerParams) (*http.Server, error) {
params.Logger.Info("Starting jaeger-collector HTTP server", zap.String("http host-port", params.HostPort))

server := &http.Server{Addr: params.HostPort}
errorLog, _ := zap.NewStdLogAt(params.Logger, zapcore.ErrorLevel)
server := &http.Server{
Addr: params.HostPort,
ErrorLog: errorLog,
}
if params.TLSConfig.Enabled {
tlsCfg, err := params.TLSConfig.Config(params.Logger) // This checks if the certificates are correctly provided
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion cmd/collector/app/server/zipkin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/rs/cors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/zipkin"
Expand Down Expand Up @@ -56,7 +57,11 @@ func StartZipkinServer(params *ZipkinServerParams) (*http.Server, error) {
return nil, err
}

server := &http.Server{Addr: params.HostPort}
errorLog, _ := zap.NewStdLogAt(params.Logger, zapcore.ErrorLevel)
server := &http.Server{
Addr: params.HostPort,
ErrorLog: errorLog,
}
serveZipkin(server, listener, params)

return server, nil
Expand Down
8 changes: 7 additions & 1 deletion cmd/flags/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
Expand Down Expand Up @@ -107,7 +108,12 @@ func (s *AdminServer) serveWithListener(l net.Listener) {
version.RegisterHandler(s.mux, s.logger)
s.registerPprofHandlers()
recoveryHandler := recoveryhandler.NewRecoveryHandler(s.logger, true)
s.server = &http.Server{Handler: recoveryHandler(s.mux)}
errorLog, _ := zap.NewStdLogAt(s.logger, zapcore.ErrorLevel)
s.server = &http.Server{
Handler: recoveryHandler(s.mux),
ErrorLog: errorLog,
}

s.logger.Info("Starting admin HTTP server", zap.String("http-addr", s.adminHostPort))
go func() {
switch err := s.server.Serve(l); err {
Expand Down
23 changes: 14 additions & 9 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

Expand All @@ -50,6 +51,7 @@ type Server struct {
conn net.Listener
grpcConn net.Listener
httpConn net.Listener
cmuxServer cmux.CMux
grpcServer *grpc.Server
httpServer *http.Server
separatePorts bool
Expand Down Expand Up @@ -78,7 +80,7 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuery
return nil, err
}

httpServer, cancelFunc, err := createHTTPServer(querySvc, metricsQuerySvc, options, tracer, logger)
httpServer, closeGRPCGateway, err := createHTTPServer(querySvc, metricsQuerySvc, options, tracer, logger)
if err != nil {
return nil, err
}
Expand All @@ -92,7 +94,7 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuery
httpServer: httpServer,
separatePorts: grpcPort != httpPort,
unavailableChannel: make(chan healthcheck.Status),
grpcGatewayCancel: cancelFunc,
grpcGatewayCancel: closeGRPCGateway,
}, nil
}

Expand Down Expand Up @@ -145,9 +147,9 @@ func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.
r = r.PathPrefix(queryOpts.BasePath).Subrouter()
}

ctx, cancelFunc := context.WithCancel(context.Background())
ctx, closeGRPCGateway := context.WithCancel(context.Background())
if err := apiv3.RegisterGRPCGateway(ctx, logger, r, queryOpts.BasePath, queryOpts.GRPCHostPort, queryOpts.TLSGRPC); err != nil {
cancelFunc() // make go vet happy
closeGRPCGateway()
return nil, nil, err
}

Expand All @@ -161,20 +163,22 @@ func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.
handler = handlers.CompressHandler(handler)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

errorLog, _ := zap.NewStdLogAt(logger, zapcore.ErrorLevel)
server := &http.Server{
Handler: recoveryHandler(handler),
Handler: recoveryHandler(handler),
ErrorLog: errorLog,
}

if queryOpts.TLSHTTP.Enabled {
tlsCfg, err := queryOpts.TLSHTTP.Config(logger) // This checks if the certificates are correctly provided
if err != nil {
cancelFunc()
closeGRPCGateway()
return nil, nil, err
}
server.TLSConfig = tlsCfg

}
return server, cancelFunc, nil
return server, closeGRPCGateway, nil
}

// initListener initialises listeners of the server
Expand Down Expand Up @@ -222,7 +226,6 @@ func (s *Server) initListener() (cmux.CMux, error) {
s.httpConn = cmuxServer.Match(cmux.Any())

return cmuxServer, nil

}

// Start http, GRPC and cmux servers concurrently
Expand All @@ -231,6 +234,7 @@ func (s *Server) Start() error {
if err != nil {
return err
}
s.cmuxServer = cmuxServer

var tcpPort int
if !s.separatePorts {
Expand Down Expand Up @@ -283,7 +287,7 @@ func (s *Server) Start() error {
s.logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTPHostPort))

err := cmuxServer.Serve()
// TODO: Remove string comparison when https://github.com/soheilhy/cmux/pull/69 is merged
// TODO: find a way to avoid string comparison. Even though cmux has ErrServerClosed, it's not returned here.
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.logger.Error("Could not start multiplexed server", zap.Error(err))
}
Expand All @@ -305,6 +309,7 @@ func (s *Server) Close() error {
s.httpConn.Close()
s.grpcConn.Close()
} else {
s.cmuxServer.Close()
s.conn.Close()
}
return nil
Expand Down

0 comments on commit abbbcc7

Please sign in to comment.