From f68643490956508cf836d93546ecf013fd98b86a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraci=20Paix=C3=A3o=20Kr=C3=B6hling?= Date: Wed, 12 Feb 2020 11:29:04 +0100 Subject: [PATCH] Add support for graceful shutdown of collector. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juraci Paixão Kröhling --- cmd/collector/app/collector.go | 37 ++++++++++++------- .../app/handler/grpc_handler_test.go | 4 ++ .../app/handler/thrift_span_handler_test.go | 3 ++ cmd/collector/app/processor/span.go | 3 ++ cmd/collector/app/server/test.go | 4 ++ cmd/collector/app/span_handler_builder.go | 28 +++++++++----- .../app/span_handler_builder_test.go | 10 +++-- cmd/collector/app/span_processor.go | 5 ++- cmd/collector/app/span_processor_test.go | 20 ++++++---- 9 files changed, 77 insertions(+), 37 deletions(-) diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index 59ab4372d4b..bc8f3664781 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" "github.com/jaegertracing/jaeger/cmd/collector/app/server" "github.com/jaegertracing/jaeger/pkg/healthcheck" @@ -41,6 +42,8 @@ type Collector struct { spanWriter spanstore.Writer strategyStore strategystore.StrategyStore hCheck *healthcheck.HealthCheck + spanProcessor processor.SpanProcessor + spanHandlers *SpanHandlers // state, read only hServer *http.Server @@ -79,57 +82,59 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { Logger: c.logger, MetricsFactory: c.metricsFactory, } - zipkinSpansHandler, jaegerBatchesHandler, grpcHandler := handlerBuilder.BuildHandlers() + + c.spanProcessor = handlerBuilder.BuildSpanProcessor() + c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor) recoveryHandler := 
recoveryhandler.NewRecoveryHandler(c.logger, true) if tchServer, err := server.StartThriftServer(&server.ThriftServerParams{ ServiceName: c.serviceName, Port: builderOpts.CollectorPort, - JaegerBatchesHandler: jaegerBatchesHandler, - ZipkinSpansHandler: zipkinSpansHandler, + JaegerBatchesHandler: c.spanHandlers.JaegerBatchesHandler, + ZipkinSpansHandler: c.spanHandlers.ZipkinSpansHandler, StrategyStore: c.strategyStore, Logger: c.logger, }); err != nil { - c.logger.Fatal("Could not start Thrift collector", zap.Error(err)) + c.logger.Fatal("could not start Thrift collector", zap.Error(err)) } else { c.tchServer = tchServer } if grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{ Port: builderOpts.CollectorGRPCPort, - Handler: grpcHandler, + Handler: c.spanHandlers.GRPCHandler, TLSConfig: builderOpts.TLS, SamplingStore: c.strategyStore, Logger: c.logger, }); err != nil { - c.logger.Fatal("Could not start gRPC collector", zap.Error(err)) + c.logger.Fatal("could not start gRPC collector", zap.Error(err)) } else { c.grpcServer = grpcServer } if httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{ Port: builderOpts.CollectorHTTPPort, - Handler: jaegerBatchesHandler, + Handler: c.spanHandlers.JaegerBatchesHandler, RecoveryHandler: recoveryHandler, HealthCheck: c.hCheck, MetricsFactory: c.metricsFactory, SamplingStore: c.strategyStore, Logger: c.logger, }); err != nil { - c.logger.Fatal("Could not start the HTTP server", zap.Error(err)) + c.logger.Fatal("could not start the HTTP server", zap.Error(err)) } else { c.hServer = httpServer } if zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{ Port: builderOpts.CollectorZipkinHTTPPort, - Handler: zipkinSpansHandler, + Handler: c.spanHandlers.ZipkinSpansHandler, RecoveryHandler: recoveryHandler, AllowedHeaders: builderOpts.CollectorZipkinAllowedHeaders, AllowedOrigins: builderOpts.CollectorZipkinAllowedOrigins, Logger: c.logger, }); err != nil { - c.logger.Fatal("Could not start 
the Zipkin server", zap.Error(err)) + c.logger.Fatal("could not start the Zipkin server", zap.Error(err)) } else { c.zkServer = zkServer } @@ -154,7 +159,7 @@ func (c *Collector) Close() error { timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) err := c.hServer.Shutdown(timeout) if err != nil { - c.logger.Error("Failed to stop the main HTTP server", zap.Error(err)) + c.logger.Error("failed to stop the main HTTP server", zap.Error(err)) } defer cancel() } @@ -164,17 +169,21 @@ func (c *Collector) Close() error { timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) err := c.zkServer.Shutdown(timeout) if err != nil { - c.logger.Error("Failed to stop the Zipkin server", zap.Error(err)) + c.logger.Error("failed to stop the Zipkin server", zap.Error(err)) } defer cancel() } - // by now, we shouldn't have any in-flight requests anymore + if c.spanProcessor != nil { + if err := c.spanProcessor.Close(); err != nil { + c.logger.Error("failed to close span processor", zap.Error(err)) + } + } if c.spanWriter != nil { if closer, ok := c.spanWriter.(io.Closer); ok { err := closer.Close() // SpanWriter if err != nil { - c.logger.Error("Failed to close span writer", zap.Error(err)) + c.logger.Error("failed to close span writer", zap.Error(err)) } } } diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index f969fbc4b3b..83a3dcb0100 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -57,6 +57,10 @@ func (p *mockSpanProcessor) reset() { p.spans = nil } +func (p *mockSpanProcessor) Close() error { + return nil +} + func initializeGRPCTestServer(t *testing.T, beforeServe func(s *grpc.Server)) (*grpc.Server, net.Addr) { server := grpc.NewServer() beforeServe(server) diff --git a/cmd/collector/app/handler/thrift_span_handler_test.go b/cmd/collector/app/handler/thrift_span_handler_test.go index 
150645ba18f..55ba10cd4ce 100644 --- a/cmd/collector/app/handler/thrift_span_handler_test.go +++ b/cmd/collector/app/handler/thrift_span_handler_test.go @@ -76,6 +76,9 @@ func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ processor.S } return retMe, nil } +func (s *shouldIErrorProcessor) Close() error { + return nil +} func TestZipkinSpanHandler(t *testing.T) { testChunks := []struct { diff --git a/cmd/collector/app/processor/span.go b/cmd/collector/app/processor/span.go index 65ca34da22d..3ca1e8ea292 100644 --- a/cmd/collector/app/processor/span.go +++ b/cmd/collector/app/processor/span.go @@ -15,6 +15,8 @@ package processor import ( + "io" + "github.com/jaegertracing/jaeger/model" ) @@ -28,6 +30,7 @@ type SpansOptions struct { type SpanProcessor interface { // ProcessSpans processes model spans and return with either a list of true/false success or an error ProcessSpans(mSpans []*model.Span, options SpansOptions) ([]bool, error) + io.Closer } // InboundTransport identifies the transport used to receive spans. 
diff --git a/cmd/collector/app/server/test.go b/cmd/collector/app/server/test.go index 7a4bf7eaf47..76173eae0e6 100644 --- a/cmd/collector/app/server/test.go +++ b/cmd/collector/app/server/test.go @@ -29,6 +29,10 @@ func (s mockSamplingStore) GetSamplingStrategy(serviceName string) (*sampling.Sa type mockSpanProcessor struct { } +func (p *mockSpanProcessor) Close() error { + return nil +} + func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, _ processor.SpansOptions) ([]bool, error) { return []bool{}, nil } diff --git a/cmd/collector/app/span_handler_builder.go b/cmd/collector/app/span_handler_builder.go index 8c49395ae94..46fa5728eb5 100644 --- a/cmd/collector/app/span_handler_builder.go +++ b/cmd/collector/app/span_handler_builder.go @@ -22,6 +22,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/handler" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" zs "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -35,17 +36,20 @@ type SpanHandlerBuilder struct { MetricsFactory metrics.Factory } -// BuildHandlers builds span handlers (Zipkin, Jaeger) -func (b *SpanHandlerBuilder) BuildHandlers() ( - handler.ZipkinSpansHandler, - handler.JaegerBatchesHandler, - *handler.GRPCHandler, -) { +// SpanHandlers holds instances to the span handlers built by the SpanHandlerBuilder +type SpanHandlers struct { + ZipkinSpansHandler handler.ZipkinSpansHandler + JaegerBatchesHandler handler.JaegerBatchesHandler + GRPCHandler *handler.GRPCHandler +} + +// BuildSpanProcessor builds the span processor to be used with the handlers +func (b *SpanHandlerBuilder) BuildSpanProcessor() processor.SpanProcessor { hostname, _ := os.Hostname() svcMetrics := b.metricsFactory() hostMetrics := svcMetrics.Namespace(metrics.NSOptions{Tags: map[string]string{"host": hostname}}) - spanProcessor := NewSpanProcessor( + return 
NewSpanProcessor( b.SpanWriter, Options.ServiceMetrics(svcMetrics), Options.HostMetrics(hostMetrics), @@ -58,9 +62,15 @@ func (b *SpanHandlerBuilder) BuildHandlers() ( Options.DynQueueSizeMemory(b.CollectorOpts.DynQueueSizeMemory), ) - return handler.NewZipkinSpanHandler(b.Logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)), +} + +// BuildHandlers builds span handlers (Zipkin, Jaeger) +func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor) *SpanHandlers { + return &SpanHandlers{ + handler.NewZipkinSpanHandler(b.Logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)), handler.NewJaegerSpanHandler(b.Logger, spanProcessor), - handler.NewGRPCHandler(b.Logger, spanProcessor) + handler.NewGRPCHandler(b.Logger, spanProcessor), + } } func defaultSpanFilter(*model.Span) bool { diff --git a/cmd/collector/app/span_handler_builder_test.go b/cmd/collector/app/span_handler_builder_test.go index e6313c405b7..772f9ce0835 100644 --- a/cmd/collector/app/span_handler_builder_test.go +++ b/cmd/collector/app/span_handler_builder_test.go @@ -50,10 +50,12 @@ func TestNewSpanHandlerBuilder(t *testing.T) { MetricsFactory: metrics.NullFactory, } - zipkin, jaeger, grpc := builder.BuildHandlers() - assert.NotNil(t, zipkin) - assert.NotNil(t, jaeger) - assert.NotNil(t, grpc) + spanProcessor := builder.BuildSpanProcessor() + spanHandlers := builder.BuildHandlers(spanProcessor) + assert.NotNil(t, spanHandlers.ZipkinSpansHandler) + assert.NotNil(t, spanHandlers.JaegerBatchesHandler) + assert.NotNil(t, spanHandlers.GRPCHandler) + assert.NotNil(t, spanProcessor) } func TestDefaultSpanFilter(t *testing.T) { diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 5f2d77e49a6..5d3c214eb66 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -126,10 +126,11 @@ func newSpanProcessor(spanWriter spanstore.Writer, opts ...Option) *spanProcesso return &sp } -// Stop 
halts the span processor and all its go-routines. -func (sp *spanProcessor) Stop() { +// Close halts the span processor and all its go-routines; it always returns nil. +func (sp *spanProcessor) Close() error { close(sp.stopCh) sp.queue.Stop() + return nil } func (sp *spanProcessor) saveSpan(span *model.Span) { diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index e9184007103..118833a954b 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -37,8 +37,12 @@ import ( zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -var _ (io.Closer) = (*fakeSpanWriter)(nil) -var blackListedService = "zoidberg" +var ( + _ io.Closer = (*fakeSpanWriter)(nil) + _ io.Closer = (*spanProcessor)(nil) + + blackListedService = "zoidberg" +) func TestBySvcMetrics(t *testing.T) { allowedService := "bender" @@ -215,7 +219,6 @@ func makeJaegerSpan(service string, rootSpan bool, debugEnabled bool) (*jaeger.S func TestSpanProcessor(t *testing.T) { w := &fakeSpanWriter{} p := NewSpanProcessor(w, Options.QueueSize(1)).(*spanProcessor) - defer p.Stop() res, err := p.ProcessSpans([]*model.Span{ { @@ -226,6 +229,7 @@ func TestSpanProcessor(t *testing.T) { }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) assert.NoError(t, err) assert.Equal(t, []bool{true}, res) + assert.NoError(t, p.Close()) } func TestSpanProcessorErrors(t *testing.T) { @@ -251,7 +255,7 @@ func TestSpanProcessorErrors(t *testing.T) { assert.NoError(t, err) assert.Equal(t, []bool{true}, res) - p.Stop() + assert.NoError(t, p.Close()) assert.Equal(t, map[string]string{ "level": "error", @@ -282,7 +286,7 @@ func TestSpanProcessorBusy(t *testing.T) { Options.QueueSize(1), Options.ReportBusy(true), ).(*spanProcessor) - defer p.Stop() + defer func() { assert.NoError(t, p.Close()) }() // block the writer so that the first span is read from the queue and blocks the processor, // and eiher the second or the third span is rejected since the queue capacity is just 1. 
@@ -317,7 +321,7 @@ func TestSpanProcessorWithNilProcess(t *testing.T) { w := &fakeSpanWriter{} p := NewSpanProcessor(w, Options.ServiceMetrics(serviceMetrics)).(*spanProcessor) - defer p.Stop() + defer func() { assert.NoError(t, p.Close()) }() p.saveSpan(&model.Span{}) @@ -335,7 +339,7 @@ func TestSpanProcessorWithCollectorTags(t *testing.T) { w := &fakeSpanWriter{} p := NewSpanProcessor(w, Options.CollectorTags(testCollectorTags)).(*spanProcessor) - defer p.Stop() + defer func() { assert.NoError(t, p.Close()) }() span := &model.Span{ Process: model.NewProcess("unit-test-service", []model.KeyValue{}), @@ -363,7 +367,7 @@ func TestSpanProcessorCountSpan(t *testing.T) { w := &fakeSpanWriter{} p := NewSpanProcessor(w, Options.HostMetrics(m), Options.DynQueueSizeMemory(1000)).(*spanProcessor) p.background(10*time.Millisecond, p.updateGauges) - defer p.Stop() + defer func() { assert.NoError(t, p.Close()) }() p.processSpan(&model.Span{}) assert.NotEqual(t, uint64(0), p.bytesProcessed)