Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved graceful shutdown - Collector #2076

Merged
merged 1 commit into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
Expand All @@ -41,6 +42,8 @@ type Collector struct {
spanWriter spanstore.Writer
strategyStore strategystore.StrategyStore
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers

// state, read only
hServer *http.Server
Expand Down Expand Up @@ -79,57 +82,59 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
Logger: c.logger,
MetricsFactory: c.metricsFactory,
}
zipkinSpansHandler, jaegerBatchesHandler, grpcHandler := handlerBuilder.BuildHandlers()

c.spanProcessor = handlerBuilder.BuildSpanProcessor()
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a factory? zipkinSpansHandler, jaegerBatchesHandler, grpcHandler can implement Factory.CreateHandler()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, probably as part of another PR.

recoveryHandler := recoveryhandler.NewRecoveryHandler(c.logger, true)

if tchServer, err := server.StartThriftServer(&server.ThriftServerParams{
ServiceName: c.serviceName,
Port: builderOpts.CollectorPort,
JaegerBatchesHandler: jaegerBatchesHandler,
ZipkinSpansHandler: zipkinSpansHandler,
JaegerBatchesHandler: c.spanHandlers.JaegerBatchesHandler,
ZipkinSpansHandler: c.spanHandlers.ZipkinSpansHandler,
StrategyStore: c.strategyStore,
Logger: c.logger,
}); err != nil {
c.logger.Fatal("Could not start Thrift collector", zap.Error(err))
c.logger.Fatal("could not start Thrift collector", zap.Error(err))
} else {
c.tchServer = tchServer
}

if grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
Port: builderOpts.CollectorGRPCPort,
Handler: grpcHandler,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: builderOpts.TLS,
SamplingStore: c.strategyStore,
Logger: c.logger,
}); err != nil {
c.logger.Fatal("Could not start gRPC collector", zap.Error(err))
c.logger.Fatal("could not start gRPC collector", zap.Error(err))
} else {
c.grpcServer = grpcServer
}

if httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
Port: builderOpts.CollectorHTTPPort,
Handler: jaegerBatchesHandler,
Handler: c.spanHandlers.JaegerBatchesHandler,
RecoveryHandler: recoveryHandler,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingStore: c.strategyStore,
Logger: c.logger,
}); err != nil {
c.logger.Fatal("Could not start the HTTP server", zap.Error(err))
c.logger.Fatal("could not start the HTTP server", zap.Error(err))
} else {
c.hServer = httpServer
}

if zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{
Port: builderOpts.CollectorZipkinHTTPPort,
Handler: zipkinSpansHandler,
Handler: c.spanHandlers.ZipkinSpansHandler,
RecoveryHandler: recoveryHandler,
AllowedHeaders: builderOpts.CollectorZipkinAllowedHeaders,
AllowedOrigins: builderOpts.CollectorZipkinAllowedOrigins,
Logger: c.logger,
}); err != nil {
c.logger.Fatal("Could not start the Zipkin server", zap.Error(err))
c.logger.Fatal("could not start the Zipkin server", zap.Error(err))
} else {
c.zkServer = zkServer
}
Expand All @@ -154,7 +159,7 @@ func (c *Collector) Close() error {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := c.hServer.Shutdown(timeout)
if err != nil {
c.logger.Error("Failed to stop the main HTTP server", zap.Error(err))
c.logger.Error("failed to stop the main HTTP server", zap.Error(err))
}
defer cancel()
}
Expand All @@ -164,17 +169,21 @@ func (c *Collector) Close() error {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := c.zkServer.Shutdown(timeout)
if err != nil {
c.logger.Error("Failed to stop the Zipkin server", zap.Error(err))
c.logger.Error("failed to stop the Zipkin server", zap.Error(err))
}
defer cancel()
}

// by now, we shouldn't have any in-flight requests anymore
if err := c.spanProcessor.Close(); err != nil {
c.logger.Error("failed to close span processor.", zap.Error(err))
}

// the span processor is closed
if c.spanWriter != nil {
if closer, ok := c.spanWriter.(io.Closer); ok {
err := closer.Close() // SpanWriter
if err != nil {
c.logger.Error("Failed to close span writer", zap.Error(err))
c.logger.Error("failed to close span writer", zap.Error(err))
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (p *mockSpanProcessor) reset() {
p.spans = nil
}

// Close is a no-op that always returns nil. It gives the mock the Close
// method that processor.SpanProcessor requires through its embedded io.Closer.
func (p *mockSpanProcessor) Close() error {
return nil
}

func initializeGRPCTestServer(t *testing.T, beforeServe func(s *grpc.Server)) (*grpc.Server, net.Addr) {
server := grpc.NewServer()
beforeServe(server)
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/app/handler/thrift_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ processor.S
}
return retMe, nil
}
// Close is a no-op that always returns nil. It gives the test processor the
// Close method that processor.SpanProcessor requires through its embedded io.Closer.
func (s *shouldIErrorProcessor) Close() error {
return nil
}

func TestZipkinSpanHandler(t *testing.T) {
testChunks := []struct {
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/app/processor/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package processor

import (
"io"

"github.com/jaegertracing/jaeger/model"
)

Expand All @@ -28,6 +30,7 @@ type SpansOptions struct {
type SpanProcessor interface {
// ProcessSpans processes model spans and returns either a list of true/false success values or an error.
ProcessSpans(mSpans []*model.Span, options SpansOptions) ([]bool, error)
// io.Closer adds Close() error so that the owner of the processor can shut it
// down; the collector calls it from its own Close before closing the span writer.
io.Closer
}

// InboundTransport identifies the transport used to receive spans.
Expand Down
4 changes: 4 additions & 0 deletions cmd/collector/app/server/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (s mockSamplingStore) GetSamplingStrategy(serviceName string) (*sampling.Sa
// mockSpanProcessor is a field-less test double; its ProcessSpans and Close
// methods below return fixed values.
type mockSpanProcessor struct {
}

// Close is a no-op that always returns nil; the mock holds no resources.
func (p *mockSpanProcessor) Close() error {
return nil
}

// ProcessSpans ignores both arguments and reports an empty, non-nil result
// slice together with a nil error.
func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, _ processor.SpansOptions) ([]bool, error) {
	results := []bool{}
	return results, nil
}
28 changes: 19 additions & 9 deletions cmd/collector/app/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
zs "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -35,17 +36,20 @@ type SpanHandlerBuilder struct {
MetricsFactory metrics.Factory
}

// BuildHandlers builds span handlers (Zipkin, Jaeger)
func (b *SpanHandlerBuilder) BuildHandlers() (
handler.ZipkinSpansHandler,
handler.JaegerBatchesHandler,
*handler.GRPCHandler,
) {
// SpanHandlers holds the span handler instances built by the SpanHandlerBuilder.
// BuildHandlers fills this struct with a positional (unkeyed) literal, so the
// field order here must stay in sync with that literal.
type SpanHandlers struct {
// ZipkinSpansHandler is handed to the Thrift server and the Zipkin HTTP server.
ZipkinSpansHandler handler.ZipkinSpansHandler
// JaegerBatchesHandler is handed to the Thrift server and the collector HTTP server.
JaegerBatchesHandler handler.JaegerBatchesHandler
// GRPCHandler is handed to the gRPC server.
GRPCHandler *handler.GRPCHandler
}

// BuildSpanProcessor builds the span processor to be used with the handlers
func (b *SpanHandlerBuilder) BuildSpanProcessor() processor.SpanProcessor {
hostname, _ := os.Hostname()
svcMetrics := b.metricsFactory()
hostMetrics := svcMetrics.Namespace(metrics.NSOptions{Tags: map[string]string{"host": hostname}})

spanProcessor := NewSpanProcessor(
return NewSpanProcessor(
b.SpanWriter,
Options.ServiceMetrics(svcMetrics),
Options.HostMetrics(hostMetrics),
Expand All @@ -58,9 +62,15 @@ func (b *SpanHandlerBuilder) BuildHandlers() (
Options.DynQueueSizeMemory(b.CollectorOpts.DynQueueSizeMemory),
)

return handler.NewZipkinSpanHandler(b.Logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)),
}

// BuildHandlers builds span handlers (Zipkin, Jaeger)
func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor) *SpanHandlers {
return &SpanHandlers{
handler.NewZipkinSpanHandler(b.Logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)),
handler.NewJaegerSpanHandler(b.Logger, spanProcessor),
handler.NewGRPCHandler(b.Logger, spanProcessor)
handler.NewGRPCHandler(b.Logger, spanProcessor),
}
}

func defaultSpanFilter(*model.Span) bool {
Expand Down
10 changes: 6 additions & 4 deletions cmd/collector/app/span_handler_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ func TestNewSpanHandlerBuilder(t *testing.T) {
MetricsFactory: metrics.NullFactory,
}

zipkin, jaeger, grpc := builder.BuildHandlers()
assert.NotNil(t, zipkin)
assert.NotNil(t, jaeger)
assert.NotNil(t, grpc)
spanProcessor := builder.BuildSpanProcessor()
spanHandlers := builder.BuildHandlers(spanProcessor)
assert.NotNil(t, spanHandlers.ZipkinSpansHandler)
assert.NotNil(t, spanHandlers.JaegerBatchesHandler)
assert.NotNil(t, spanHandlers.GRPCHandler)
assert.NotNil(t, spanProcessor)
}

func TestDefaultSpanFilter(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,11 @@ func newSpanProcessor(spanWriter spanstore.Writer, opts ...Option) *spanProcesso
return &sp
}

// Stop halts the span processor and all its go-routines.
func (sp *spanProcessor) Stop() {
func (sp *spanProcessor) Close() error {
close(sp.stopCh)
sp.queue.Stop()

return nil
}

func (sp *spanProcessor) saveSpan(span *model.Span) {
Expand Down
20 changes: 12 additions & 8 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ import (
zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

var _ (io.Closer) = (*fakeSpanWriter)(nil)
var blackListedService = "zoidberg"
var (
_ io.Closer = (*fakeSpanWriter)(nil)
_ io.Closer = (*spanProcessor)(nil)

blackListedService = "zoidberg"
)

func TestBySvcMetrics(t *testing.T) {
allowedService := "bender"
Expand Down Expand Up @@ -215,7 +219,6 @@ func makeJaegerSpan(service string, rootSpan bool, debugEnabled bool) (*jaeger.S
func TestSpanProcessor(t *testing.T) {
w := &fakeSpanWriter{}
p := NewSpanProcessor(w, Options.QueueSize(1)).(*spanProcessor)
defer p.Stop()

res, err := p.ProcessSpans([]*model.Span{
{
Expand All @@ -226,6 +229,7 @@ func TestSpanProcessor(t *testing.T) {
}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)
assert.NoError(t, p.Close())
}

func TestSpanProcessorErrors(t *testing.T) {
Expand All @@ -251,7 +255,7 @@ func TestSpanProcessorErrors(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)

p.Stop()
assert.NoError(t, p.Close())

assert.Equal(t, map[string]string{
"level": "error",
Expand Down Expand Up @@ -282,7 +286,7 @@ func TestSpanProcessorBusy(t *testing.T) {
Options.QueueSize(1),
Options.ReportBusy(true),
).(*spanProcessor)
defer p.Stop()
defer assert.NoError(t, p.Close())

// block the writer so that the first span is read from the queue and blocks the processor,
// and either the second or the third span is rejected since the queue capacity is just 1.
Expand Down Expand Up @@ -317,7 +321,7 @@ func TestSpanProcessorWithNilProcess(t *testing.T) {

w := &fakeSpanWriter{}
p := NewSpanProcessor(w, Options.ServiceMetrics(serviceMetrics)).(*spanProcessor)
defer p.Stop()
defer assert.NoError(t, p.Close())

p.saveSpan(&model.Span{})

Expand All @@ -335,7 +339,7 @@ func TestSpanProcessorWithCollectorTags(t *testing.T) {

w := &fakeSpanWriter{}
p := NewSpanProcessor(w, Options.CollectorTags(testCollectorTags)).(*spanProcessor)
defer p.Stop()
defer assert.NoError(t, p.Close())

span := &model.Span{
Process: model.NewProcess("unit-test-service", []model.KeyValue{}),
Expand Down Expand Up @@ -363,7 +367,6 @@ func TestSpanProcessorCountSpan(t *testing.T) {
w := &fakeSpanWriter{}
p := NewSpanProcessor(w, Options.HostMetrics(m), Options.DynQueueSizeMemory(1000)).(*spanProcessor)
p.background(10*time.Millisecond, p.updateGauges)
defer p.Stop()

p.processSpan(&model.Span{})
assert.NotEqual(t, uint64(0), p.bytesProcessed)
Expand All @@ -378,6 +381,7 @@ func TestSpanProcessorCountSpan(t *testing.T) {
}

assert.Fail(t, "gauge hasn't been updated within a reasonable amount of time")
assert.NoError(t, p.Close())
}

func TestUpdateDynQueueSize(t *testing.T) {
Expand Down