Skip to content

Commit

Permalink
Add Shutdown() function to processors (#379)
Browse files Browse the repository at this point in the history
Shutdown() method is added so that we can implement proper
pipeline flushing during collector shutdown. This will be done
in a future PR.
  • Loading branch information
tigrannajaryan authored Oct 8, 2019
1 parent 3a0aa77 commit 01d8d94
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 3 deletions.
5 changes: 5 additions & 0 deletions processor/attributesprocessor/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ func (a *attributesProcessor) GetCapabilities() processor.Capabilities {
return processor.Capabilities{MutatesConsumedData: true}
}

// Shutdown is invoked during service shutdown.
func (a *attributesProcessor) Shutdown() error {
return nil
}

func insertAttribute(action attributeAction, attributesMap map[string]*tracepb.AttributeValue) {
// Insert is only performed when the target key does not already exist
// in the attribute map.
Expand Down
6 changes: 6 additions & 0 deletions processor/nodebatcherprocessor/node_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ func (b *batcher) GetCapabilities() processor.Capabilities {
return processor.Capabilities{MutatesConsumedData: false}
}

// Shutdown is invoked during service shutdown.
func (b *batcher) Shutdown() error {
// TODO: flush accumulated data.
return nil
}

func (b *batcher) genBucketID(node *commonpb.Node, resource *resourcepb.Resource, spanFormat string) string {
h := sha256.New()
if node != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (tsp *tracesamplerprocessor) GetCapabilities() processor.Capabilities {
return processor.Capabilities{MutatesConsumedData: false}
}

// Shutdown is invoked during service shutdown.
func (tsp *tracesamplerprocessor) Shutdown() error {
return nil
}

// hash is a murmur3 hash function, see http://en.wikipedia.org/wiki/MurmurHash.
func hash(key []byte, seed uint32) (hash uint32) {
const (
Expand Down
15 changes: 13 additions & 2 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,27 @@ import (
"github.com/open-telemetry/opentelemetry-collector/consumer"
)

// Processor defines the common functions that must be implemented by TraceProcessor
// and MetricsProcessor.
type Processor interface {
// GetCapabilities must return the capabilities of the processor.
GetCapabilities() Capabilities

// Shutdown is invoked during service shutdown. Typically used to flush the data
// that may be accumulated in the processor.
Shutdown() error
}

// TraceProcessor composes TraceConsumer with some additional processor-specific functions.
type TraceProcessor interface {
consumer.TraceConsumer
GetCapabilities() Capabilities
Processor
}

// MetricsProcessor composes MetricsConsumer with some additional processor-specific functions.
type MetricsProcessor interface {
consumer.MetricsConsumer
GetCapabilities() Capabilities
Processor
}

// Capabilities describes the capabilities of TraceProcessor or MetricsProcessor.
Expand Down
5 changes: 5 additions & 0 deletions processor/processortest/nop_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (np *nopProcessor) GetCapabilities() processor.Capabilities {
return processor.Capabilities{MutatesConsumedData: false}
}

// Shutdown is invoked during service shutdown.
func (np *nopProcessor) Shutdown() error {
return nil
}

// NewNopTraceProcessor creates an TraceProcessor that just pass the received data to the nextTraceProcessor.
func NewNopTraceProcessor(nextTraceProcessor consumer.TraceConsumer) consumer.TraceConsumer {
return &nopProcessor{nextTraceProcessor: nextTraceProcessor}
Expand Down
6 changes: 6 additions & 0 deletions processor/queuedprocessor/queued_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ func (sp *queuedSpanProcessor) GetCapabilities() processor.Capabilities {
return processor.Capabilities{MutatesConsumedData: false}
}

// Shutdown is invoked during service shutdown.
func (sp *queuedSpanProcessor) Shutdown() error {
// TODO: flush the queue.
return nil
}

func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) {
startTime := time.Now()
err := sp.sender.ConsumeTraceData(item.ctx, item.td)
Expand Down
5 changes: 5 additions & 0 deletions processor/spanprocessor/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (sp *spanProcessor) GetCapabilities() processor.Capabilities {
return processor.Capabilities{MutatesConsumedData: true}
}

// Shutdown is invoked during service shutdown.
func (sp *spanProcessor) Shutdown() error {
return nil
}

func (sp *spanProcessor) nameSpan(span *tracepb.Span) {
// Note: There was a separate proposal for creating the string.
// With benchmarking, strings.Builder is faster than the proposal.
Expand Down
5 changes: 5 additions & 0 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,11 @@ func (tsp *tailSamplingSpanProcessor) GetCapabilities() processor.Capabilities {
return processor.Capabilities{MutatesConsumedData: false}
}

// Shutdown is invoked during service shutdown.
func (tsp *tailSamplingSpanProcessor) Shutdown() error {
return nil
}

func (tsp *tailSamplingSpanProcessor) dropTrace(traceID traceKey, deletionTime time.Time) {
var trace *sampling.TraceData
if d, ok := tsp.idToTrace.Load(traceID); ok {
Expand Down
5 changes: 5 additions & 0 deletions processor/tailsamplingprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,8 @@ func (p *mockSpanProcessor) ConsumeTraceData(ctx context.Context, td consumerdat
func (p *mockSpanProcessor) GetCapabilities() processor.Capabilities {
return processor.Capabilities{MutatesConsumedData: false}
}

// Shutdown is invoked during service shutdown.
func (p *mockSpanProcessor) Shutdown() error {
return nil
}
3 changes: 2 additions & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ func (app *Application) shutdownPipelines() {
app.logger.Info("Stopping receivers...")
app.builtReceivers.StopAll()

// TODO: shutdown processors
// TODO: shutdown processors by calling Shutdown() for each processor in the
// order they are arranged in the pipeline.

app.logger.Info("Shutting down exporters...")
app.exporters.ShutdownAll()
Expand Down

0 comments on commit 01d8d94

Please sign in to comment.