Skip to content

Commit

Permalink
Fix propagation issues for context in the new obsreport usage (#625)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Mar 12, 2020
1 parent aa893e8 commit 4f385cd
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 90 deletions.
7 changes: 3 additions & 4 deletions exporter/exporterhelper/metricshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,15 @@ func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetric

func pushMetricsWithObservability(next PushMetricsData, exporterName string) PushMetricsData {
return func(ctx context.Context, md consumerdata.MetricsData) (int, error) {
exporterCtx, span := obsreport.StartMetricsExportOp(ctx, exporterName)
numDroppedTimeSeries, err := next(exporterCtx, md)
ctx = obsreport.StartMetricsExportOp(ctx, exporterName)
numDroppedTimeSeries, err := next(ctx, md)

// TODO: this is not ideal: it should come from the next function itself.
// temporarily loading it from internal format. Once full switch is done
// to new metrics will remove this.
numReceivedTimeSeries, numPoints := measureMetricsExport(md)

obsreport.EndMetricsExportOp(
exporterCtx, span, numPoints, numReceivedTimeSeries, numDroppedTimeSeries, err)
obsreport.EndMetricsExportOp(ctx, numPoints, numReceivedTimeSeries, numDroppedTimeSeries, err)
return numDroppedTimeSeries, err
}
}
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/tracehelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ func NewTraceExporter(
// the observability signals during the pusher execution.
func (p traceDataPusher) withObservability(exporterName string) traceDataPusher {
return func(ctx context.Context, td consumerdata.TraceData) (int, error) {
exporterCtx, span := obsreport.StartTraceDataExportOp(ctx, exporterName)
ctx = obsreport.StartTraceDataExportOp(ctx, exporterName)
// Forward the data to the next consumer (this pusher is the next).
droppedSpans, err := p(exporterCtx, td)
droppedSpans, err := p(ctx, td)

// TODO: this is not ideal: it should come from the next function itself.
// temporarily loading it from internal format. Once full switch is done
// to new metrics will remove this.
numSpans := len(td.Spans)
obsreport.EndTraceDataExportOp(exporterCtx, span, numSpans, droppedSpans, err)
obsreport.EndTraceDataExportOp(ctx, numSpans, droppedSpans, err)
return droppedSpans, err
}
}
Expand Down Expand Up @@ -173,15 +173,15 @@ func NewTraceExporterV2(
// the observability signals during the pusher execution.
func (p traceV2DataPusher) withObservability(exporterName string) traceV2DataPusher {
return func(ctx context.Context, td data.TraceData) (int, error) {
exporterCtx, span := obsreport.StartTraceDataExportOp(ctx, exporterName)
ctx = obsreport.StartTraceDataExportOp(ctx, exporterName)
// Forward the data to the next consumer (this pusher is the next).
droppedSpans, err := p(exporterCtx, td)
droppedSpans, err := p(ctx, td)

// TODO: this is not ideal: it should come from the next function itself.
// temporarily loading it from internal format. Once full switch is done
// to new metrics will remove this.
numSpans := td.SpanCount()
obsreport.EndTraceDataExportOp(exporterCtx, span, numSpans, droppedSpans, err)
obsreport.EndTraceDataExportOp(ctx, numSpans, droppedSpans, err)
return droppedSpans, err
}
}
7 changes: 4 additions & 3 deletions obsreport/obsreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,22 @@ var (
)

// SetParentLink tries to retrieve a span from parentCtx and if one exists
// sets its SpanID, TraceID as a link in the span provided. It returns
// true only if it retrieved a parent span from the context.
// sets its SpanID, TraceID as a link to the Span from the provided context.
// It returns true only if it retrieved a parent span from the context.
//
// This is typically used when the parentCtx may already have a trace and is
// long lived (eg.: an gRPC stream, or TCP connection) and one desires distinct
// traces for individual operations under the long lived trace associated to
// the parentCtx. This function is a helper that encapsulates the work of
// linking the short lived trace/span to the longer one.
func SetParentLink(parentCtx context.Context, span *trace.Span) bool {
func SetParentLink(ctx, parentCtx context.Context) bool {
parentSpanFromRPC := trace.FromContext(parentCtx)
if parentSpanFromRPC == nil {
return false
}

psc := parentSpanFromRPC.SpanContext()
span := trace.FromContext(ctx)
span.AddLink(trace.Link{
SpanID: psc.SpanID,
TraceID: psc.TraceID,
Expand Down
15 changes: 6 additions & 9 deletions obsreport/obsreport_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
func StartTraceDataExportOp(
operationCtx context.Context,
exporter string,
) (context.Context, *trace.Span) {
) context.Context {
return traceExportDataOp(
operationCtx,
exporter,
Expand All @@ -82,7 +82,6 @@ func StartTraceDataExportOp(
// StartTraceDataExportOp.
func EndTraceDataExportOp(
exporterCtx context.Context,
span *trace.Span,
numExportedSpans int,
numDroppedSpans int, // TODO: For legacy measurements, to be removed in the future.
err error,
Expand All @@ -94,7 +93,6 @@ func EndTraceDataExportOp(

endExportOp(
exporterCtx,
span,
numExportedSpans,
err,
configmodels.TracesDataType,
Expand All @@ -107,7 +105,7 @@ func EndTraceDataExportOp(
func StartMetricsExportOp(
operationCtx context.Context,
exporter string,
) (context.Context, *trace.Span) {
) context.Context {
return traceExportDataOp(
operationCtx,
exporter,
Expand All @@ -118,7 +116,6 @@ func StartMetricsExportOp(
// StartMetricsExportOp.
func EndMetricsExportOp(
exporterCtx context.Context,
span *trace.Span,
numExportedPoints int,
numExportedTimeSeries int, // TODO: For legacy measurements, to be removed in the future.
numDroppedTimeSeries int, // TODO: For legacy measurements, to be removed in the future.
Expand All @@ -131,7 +128,6 @@ func EndMetricsExportOp(

endExportOp(
exporterCtx,
span,
numExportedPoints,
err,
configmodels.MetricsDataType,
Expand Down Expand Up @@ -164,15 +160,15 @@ func traceExportDataOp(
exporterCtx context.Context,
exporterName string,
operationSuffix string,
) (context.Context, *trace.Span) {
) context.Context {
spanName := exporterPrefix + exporterName + operationSuffix
return trace.StartSpan(exporterCtx, spanName)
ctx, _ := trace.StartSpan(exporterCtx, spanName)
return ctx
}

// endExportOp records the observability signals at the end of an operation.
func endExportOp(
exporterCtx context.Context,
span *trace.Span,
numExportedItems int,
err error,
dataType configmodels.DataType,
Expand Down Expand Up @@ -201,6 +197,7 @@ func endExportOp(
failedToSendMeasure.M(int64(numFailedToSend)))
}

span := trace.FromContext(exporterCtx)
// End span according to errors.
if span.IsRecordingEvents() {
var sentItemsKey, failedToSendItemsKey string
Expand Down
15 changes: 6 additions & 9 deletions obsreport/obsreport_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func StartTraceDataReceiveOp(
operationCtx context.Context,
receiver string,
transport string,
) (context.Context, *trace.Span) {
) context.Context {
return traceReceiveTraceDataOp(
operationCtx,
receiver,
Expand All @@ -87,7 +87,6 @@ func StartTraceDataReceiveOp(
// StartTraceDataReceiveOp.
func EndTraceDataReceiveOp(
receiverCtx context.Context,
span *trace.Span,
format string,
numReceivedSpans int,
err error,
Expand All @@ -105,7 +104,6 @@ func EndTraceDataReceiveOp(

endReceiveOp(
receiverCtx,
span,
format,
numReceivedSpans,
err,
Expand All @@ -120,7 +118,7 @@ func StartMetricsReceiveOp(
operationCtx context.Context,
receiver string,
transport string,
) (context.Context, *trace.Span) {
) context.Context {
return traceReceiveTraceDataOp(
operationCtx,
receiver,
Expand All @@ -132,7 +130,6 @@ func StartMetricsReceiveOp(
// StartMetricsReceiveOp.
func EndMetricsReceiveOp(
receiverCtx context.Context,
span *trace.Span,
format string,
numReceivedPoints int,
numReceivedTimeSeries int, // For legacy measurements.
Expand All @@ -150,7 +147,6 @@ func EndMetricsReceiveOp(

endReceiveOp(
receiverCtx,
span,
format,
numReceivedPoints,
err,
Expand Down Expand Up @@ -194,18 +190,17 @@ func traceReceiveTraceDataOp(
receiverName string,
transport string,
operationSuffix string,
) (context.Context, *trace.Span) {
) context.Context {
spanName := receiverPrefix + receiverName + operationSuffix
ctx, span := trace.StartSpan(receiverCtx, spanName)
span.AddAttributes(trace.StringAttribute(
TransportKey, transport))
return ctx, span
return ctx
}

// endReceiveOp records the observability signals at the end of an operation.
func endReceiveOp(
receiverCtx context.Context,
span *trace.Span,
format string,
numReceivedItems int,
err error,
Expand All @@ -218,6 +213,8 @@ func endReceiveOp(
numRefused = numReceivedItems
}

span := trace.FromContext(receiverCtx)

if useNew {
var acceptedMeasure, refusedMeasure *stats.Int64Measure
switch dataType {
Expand Down
18 changes: 5 additions & 13 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,11 @@ func Test_obsreport_ReceiveTraceDataOp(t *testing.T) {
errs := []error{nil, errFake}
rcvdSpans := []int{13, 42}
for i, err := range errs {
ctx, span := StartTraceDataReceiveOp(receiverCtx, receiver, transport)
ctx := StartTraceDataReceiveOp(receiverCtx, receiver, transport)
assert.NotNil(t, ctx)
assert.NotNil(t, span)

EndTraceDataReceiveOp(
ctx,
span,
format,
rcvdSpans[i],
err)
Expand Down Expand Up @@ -169,13 +167,11 @@ func Test_obsreport_ReceiveMetricsOp(t *testing.T) {
rcvdMetricPts := []int{23, 29}
rcvdTimeSeries := []int{2, 3}
for i, err := range errs {
ctx, span := StartMetricsReceiveOp(receiverCtx, receiver, transport)
ctx := StartMetricsReceiveOp(receiverCtx, receiver, transport)
assert.NotNil(t, ctx)
assert.NotNil(t, span)

EndMetricsReceiveOp(
ctx,
span,
format,
rcvdMetricPts[i],
rcvdTimeSeries[i],
Expand Down Expand Up @@ -238,17 +234,15 @@ func Test_obsreport_ExportTraceDataOp(t *testing.T) {
errs := []error{nil, errFake}
numExportedSpans := []int{22, 14}
for i, err := range errs {
ctx, span := StartTraceDataExportOp(exporterCtx, exporter)
ctx := StartTraceDataExportOp(exporterCtx, exporter)
assert.NotNil(t, ctx)
assert.NotNil(t, span)

var numDroppedSpans int
if err != nil {
numDroppedSpans = numExportedSpans[i]
}

EndTraceDataExportOp(
ctx, span, numExportedSpans[i], numDroppedSpans, err)
EndTraceDataExportOp(ctx, numExportedSpans[i], numDroppedSpans, err)
}

spans := ss.PullAllSpans()
Expand Down Expand Up @@ -304,9 +298,8 @@ func Test_obsreport_ExportMetricsOp(t *testing.T) {
toSendMetricPts := []int{17, 23}
toSendTimeSeries := []int{3, 5}
for i, err := range errs {
ctx, span := StartMetricsExportOp(exporterCtx, exporter)
ctx := StartMetricsExportOp(exporterCtx, exporter)
assert.NotNil(t, ctx)
assert.NotNil(t, span)

var numDroppedTimeSeires int
if err != nil {
Expand All @@ -315,7 +308,6 @@ func Test_obsreport_ExportMetricsOp(t *testing.T) {

EndMetricsExportOp(
ctx,
span,
toSendMetricPts[i],
toSendTimeSeries[i],
numDroppedTimeSeires,
Expand Down
43 changes: 20 additions & 23 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,26 +329,24 @@ func consumeTraceData(
}

func (jr *jReceiver) SubmitBatches(batches []*jaeger.Batch, options handler.SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
ctx := context.Background()
receiverCtx := obsreport.ReceiverContext(
ctx, jr.instanceName, collectorHTTPTransport, collectorReceiverTagValue)
_, span := obsreport.StartTraceDataReceiveOp(
receiverCtx, jr.instanceName, collectorHTTPTransport)
ctx := obsreport.ReceiverContext(
context.Background(), jr.instanceName, collectorHTTPTransport, collectorReceiverTagValue)
ctx = obsreport.StartTraceDataReceiveOp(
ctx, jr.instanceName, collectorHTTPTransport)

jbsr, numSpans, err := consumeTraceData(receiverCtx, batches, jr.nextConsumer)
obsreport.EndTraceDataReceiveOp(receiverCtx, span, thriftFormat, numSpans, err)
jbsr, numSpans, err := consumeTraceData(ctx, batches, jr.nextConsumer)
obsreport.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)

return jbsr, err
}

func (jtr *jTchannelReceiver) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
receiverCtx := obsreport.ReceiverContext(
ctx, jtr.instanceName, collectorTChannelTransport, tchannelCollectorReceiverTagValue)
_, span := obsreport.StartTraceDataReceiveOp(
ctx, jtr.instanceName, collectorTChannelTransport)
func (jtr *jTchannelReceiver) SubmitBatches(thriftCtx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
ctx := obsreport.ReceiverContext(
thriftCtx, jtr.instanceName, collectorTChannelTransport, tchannelCollectorReceiverTagValue)
ctx = obsreport.StartTraceDataReceiveOp(ctx, jtr.instanceName, collectorTChannelTransport)

jbsr, numSpans, err := consumeTraceData(receiverCtx, batches, jtr.nextConsumer)
obsreport.EndTraceDataReceiveOp(receiverCtx, span, thriftFormat, numSpans, err)
jbsr, numSpans, err := consumeTraceData(ctx, batches, jtr.nextConsumer)
obsreport.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)

return jbsr, err
}
Expand All @@ -366,14 +364,14 @@ func (jr *jReceiver) EmitZipkinBatch(spans []*zipkincore.Span) error {
// EmitBatch implements cmd/agent/reporter.Reporter and it forwards
// Jaeger spans received by the Jaeger agent processor.
func (jr *jReceiver) EmitBatch(batch *jaeger.Batch) error {
_, span := obsreport.StartTraceDataReceiveOp(
context.Background(), jr.instanceName, agentTransport)
ctx := obsreport.StartTraceDataReceiveOp(
jr.defaultAgentCtx, jr.instanceName, agentTransport)
// TODO: call below never returns error it remove from the signature
td, _ := jaegertranslator.ThriftBatchToOCProto(batch)
td.SourceFormat = "jaeger"

err := jr.nextConsumer.ConsumeTraceData(jr.defaultAgentCtx, td)
obsreport.EndTraceDataReceiveOp(jr.defaultAgentCtx, span, thriftFormat, len(batch.Spans), err)
err := jr.nextConsumer.ConsumeTraceData(ctx, td)
obsreport.EndTraceDataReceiveOp(ctx, thriftFormat, len(batch.Spans), err)

return err
}
Expand All @@ -398,17 +396,16 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
ctx = client.NewContext(ctx, c)
}

receiverCtx := obsreport.ReceiverContext(
ctx = obsreport.ReceiverContext(
ctx, jr.instanceName, grpcTransport, collectorReceiverTagValue)
_, span := obsreport.StartTraceDataReceiveOp(
ctx, jr.instanceName, grpcTransport)
ctx = obsreport.StartTraceDataReceiveOp(ctx, jr.instanceName, grpcTransport)

// TODO: the function below never returns error, change its interface.
td, _ := jaegertranslator.ProtoBatchToOCProto(r.GetBatch())
td.SourceFormat = "jaeger"

err := jr.nextConsumer.ConsumeTraceData(receiverCtx, td)
obsreport.EndTraceDataReceiveOp(receiverCtx, span, protobufFormat, len(r.GetBatch().Spans), err)
err := jr.nextConsumer.ConsumeTraceData(ctx, td)
obsreport.EndTraceDataReceiveOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 4f385cd

Please sign in to comment.