From bae8e25dd0289a7a6cac414870cc7eb2dff4407b Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 1 Sep 2018 00:51:40 -0400 Subject: [PATCH] Enable tracing of Cassandra queries Signed-off-by: Yuri Shkuro --- cmd/query/app/handler.go | 22 ++--- .../storage/cassandra/savetracetest/main.go | 16 ++-- plugin/storage/cassandra/spanstore/reader.go | 89 ++++++++++++++----- .../cassandra/spanstore/reader_test.go | 13 +-- plugin/storage/es/spanstore/reader.go | 8 +- plugin/storage/es/spanstore/reader_test.go | 31 +++---- .../es/spanstore/service_operation_test.go | 5 +- .../storage/integration/integration_test.go | 11 +-- plugin/storage/integration/kafka_test.go | 11 +-- plugin/storage/memory/memory.go | 9 +- plugin/storage/memory/memory_test.go | 15 ++-- storage/spanstore/interface.go | 9 +- storage/spanstore/metrics/decorator.go | 17 ++-- storage/spanstore/metrics/decorator_test.go | 17 ++-- storage/spanstore/mocks/Reader.go | 19 ++-- 15 files changed, 177 insertions(+), 115 deletions(-) diff --git a/cmd/query/app/handler.go b/cmd/query/app/handler.go index 36ad56e5e52..512dcde1a76 100644 --- a/cmd/query/app/handler.go +++ b/cmd/query/app/handler.go @@ -15,6 +15,7 @@ package app import ( + "context" "encoding/json" "fmt" "net/http" @@ -153,7 +154,7 @@ func (aH *APIHandler) route(route string, args ...interface{}) string { } func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) { - services, err := aH.spanReader.GetServices() + services, err := aH.spanReader.GetServices(r.Context()) if aH.handleError(w, err, http.StatusInternalServerError) { return } @@ -168,7 +169,7 @@ func (aH *APIHandler) getOperationsLegacy(w http.ResponseWriter, r *http.Request vars := mux.Vars(r) // given how getOperationsLegacy is bound to URL route, serviceParam cannot be empty service, _ := url.QueryUnescape(vars[serviceParam]) - operations, err := aH.spanReader.GetOperations(service) + operations, err := aH.spanReader.GetOperations(r.Context(), service) if aH.handleError(w, err, http.StatusInternalServerError) { return } @@ -186,7 +187,7 @@ func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) { return } } - operations, err := aH.spanReader.GetOperations(service) + operations, err := aH.spanReader.GetOperations(r.Context(), service) if aH.handleError(w, err, http.StatusInternalServerError) { return } @@ -206,12 +207,12 @@ func (aH *APIHandler) search(w http.ResponseWriter, r *http.Request) { var uiErrors []structuredError var tracesFromStorage []*model.Trace if len(tQuery.traceIDs) > 0 { - tracesFromStorage, uiErrors, err = aH.tracesByIDs(tQuery.traceIDs) + tracesFromStorage, uiErrors, err = aH.tracesByIDs(r.Context(), tQuery.traceIDs) if aH.handleError(w, err, http.StatusInternalServerError) { return } } else { - tracesFromStorage, err = aH.spanReader.FindTraces(&tQuery.TraceQueryParameters) + tracesFromStorage, err = aH.spanReader.FindTraces(r.Context(), &tQuery.TraceQueryParameters) if aH.handleError(w, err, http.StatusInternalServerError) { return } @@ -233,11 +234,11 @@ func (aH *APIHandler) search(w http.ResponseWriter, r *http.Request) { aH.writeJSON(w, r, &structuredRes) } -func (aH *APIHandler) tracesByIDs(traceIDs []model.TraceID) ([]*model.Trace, []structuredError, error) { +func (aH *APIHandler) tracesByIDs(ctx context.Context, traceIDs []model.TraceID) ([]*model.Trace, []structuredError, error) { var errors []structuredError retMe := make([]*model.Trace, 0, len(traceIDs)) for _, traceID := range traceIDs { - if trace, err := trace(traceID, aH.spanReader, aH.archiveSpanReader); err != nil { + if trace, err := trace(ctx, traceID, aH.spanReader, aH.archiveSpanReader); err != nil { if err != spanstore.ErrTraceNotFound { return nil, nil, err } @@ -399,7 +400,7 @@ func (aH *APIHandler) withTraceFromReader( if !ok { return } - trace, err := trace(traceID, reader, backupReader) + trace, err := trace(r.Context(), traceID, reader, backupReader) if err == spanstore.ErrTraceNotFound { aH.handleError(w, err, http.StatusNotFound) return @@ -411,16 +412,17 @@ func (aH *APIHandler) withTraceFromReader( } func trace( + ctx context.Context, traceID model.TraceID, reader spanstore.Reader, backupReader spanstore.Reader, ) (*model.Trace, error) { - trace, err := reader.GetTrace(traceID) + trace, err := reader.GetTrace(ctx, traceID) if err == spanstore.ErrTraceNotFound { if backupReader == nil { return nil, err } - trace, err = backupReader.GetTrace(traceID) + trace, err = backupReader.GetTrace(ctx, traceID) } return trace, err } diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index 5c6805daf84..73c1ffe8d59 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -15,6 +15,7 @@ package main import ( + "context" "time" "github.com/uber/jaeger-lib/metrics" @@ -50,7 +51,8 @@ func main() { logger.Info("Saved span", zap.String("spanID", getSomeSpan().SpanID.String())) } s := getSomeSpan() - trace, err := spanReader.GetTrace(s.TraceID) + ctx := context.Background() + trace, err := spanReader.GetTrace(ctx, s.TraceID) if err != nil { logger.Fatal("Failed to read", zap.Error(err)) } else { @@ -63,27 +65,27 @@ func main() { StartTimeMax: time.Now().Add(time.Hour), } logger.Info("Check main query") - queryAndPrint(spanReader, tqp) + queryAndPrint(ctx, spanReader, tqp) tqp.OperationName = "opName" logger.Info("Check query with operation") - queryAndPrint(spanReader, tqp) + queryAndPrint(ctx, spanReader, tqp) tqp.Tags = map[string]string{ "someKey": "someVal", } logger.Info("Check query with operation name and tags") - queryAndPrint(spanReader, tqp) + queryAndPrint(ctx, spanReader, tqp) tqp.DurationMin = 0 tqp.DurationMax = time.Hour tqp.Tags = map[string]string{} logger.Info("check query with duration") - queryAndPrint(spanReader, tqp) + queryAndPrint(ctx, spanReader, tqp) } -func queryAndPrint(spanReader *cSpanStore.SpanReader, tqp *spanstore.TraceQueryParameters) { - traces, err := spanReader.FindTraces(tqp) +func queryAndPrint(ctx context.Context, spanReader *cSpanStore.SpanReader, tqp *spanstore.TraceQueryParameters) { + traces, err := spanReader.FindTraces(ctx, tqp) if err != nil { logger.Fatal("Failed to query", zap.Error(err)) } else { diff --git a/plugin/storage/cassandra/spanstore/reader.go b/plugin/storage/cassandra/spanstore/reader.go index 26b62d3c4eb..f4762f9c18f 100644 --- a/plugin/storage/cassandra/spanstore/reader.go +++ b/plugin/storage/cassandra/spanstore/reader.go @@ -15,8 +15,12 @@ package spanstore import ( + "context" "time" + "github.com/opentracing/opentracing-go" + ottag "github.com/opentracing/opentracing-go/ext" + otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" @@ -132,17 +136,27 @@ func NewSpanReader( } // GetServices returns all services traced by Jaeger -func (s *SpanReader) GetServices() ([]string, error) { +func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { return s.serviceNamesReader() } // GetOperations returns all operations for a specific service traced by Jaeger -func (s *SpanReader) GetOperations(service string) ([]string, error) { +func (s *SpanReader) GetOperations(ctx context.Context, service string) ([]string, error) { return s.operationNamesReader(service) } -func (s *SpanReader) readTrace(traceID dbmodel.TraceID) (*model.Trace, error) { +func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) { + span, ctx := startSpanForQuery(ctx, "readTrace", querySpanByTraceID) + defer span.Finish() + span.SetTag("traceID", traceID) + + trace, err := s.readTraceInSpan(ctx, traceID) + logErrorToSpan(span, err) + return trace, err +} + +func (s *SpanReader) readTraceInSpan(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) { start := time.Now() q := s.session.Query(querySpanByTraceID, traceID) i := q.Iter() @@ -191,8 +205,8 @@ func (s *SpanReader) readTrace(traceID dbmodel.TraceID) (*model.Trace, error) { } // GetTrace takes a traceID and returns a Trace associated with that traceID -func (s *SpanReader) GetTrace(traceID model.TraceID) (*model.Trace, error) { - return s.readTrace(dbmodel.TraceIDFromDomain(traceID)) +func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { + return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID)) } func validateQuery(p *spanstore.TraceQueryParameters) error { @@ -218,14 +232,14 @@ func validateQuery(p *spanstore.TraceQueryParameters) error { } // FindTraces retrieves traces that match the traceQuery -func (s *SpanReader) FindTraces(traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) { +func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) { if err := validateQuery(traceQuery); err != nil { return nil, err } if traceQuery.NumTraces == 0 { traceQuery.NumTraces = defaultNumTraces } - uniqueTraceIDs, err := s.findTraceIDs(traceQuery) + uniqueTraceIDs, err := s.findTraceIDs(ctx, traceQuery) if err != nil { return nil, err } @@ -234,7 +248,7 @@ func (s *SpanReader) FindTraces(traceQuery *spanstore.TraceQueryParameters) ([]* if len(retMe) >= traceQuery.NumTraces { break } - jTrace, err := s.readTrace(traceID) + jTrace, err := s.readTrace(ctx, traceID) if err != nil { s.logger.Error("Failure to read trace", zap.String("trace_id", traceID.String()), zap.Error(err)) continue @@ -244,18 +258,18 @@ func (s *SpanReader) FindTraces(traceQuery *spanstore.TraceQueryParameters) ([]* return retMe, nil } -func (s *SpanReader) findTraceIDs(traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { +func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { if traceQuery.DurationMin != 0 || traceQuery.DurationMax != 0 { - return s.queryByDuration(traceQuery) + return s.queryByDuration(ctx, traceQuery) } if traceQuery.OperationName != "" { - traceIds, err := s.queryByServiceNameAndOperation(traceQuery) + traceIds, err := s.queryByServiceNameAndOperation(ctx, traceQuery) if err != nil { return nil, err } if len(traceQuery.Tags) > 0 { - tagTraceIds, err := s.queryByTagsAndLogs(traceQuery) + tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery) if err != nil { return nil, err } @@ -267,12 +281,15 @@ func (s *SpanReader) findTraceIDs(traceQuery *spanstore.TraceQueryParameters) (d return traceIds, nil } if len(traceQuery.Tags) > 0 { - return s.queryByTagsAndLogs(traceQuery) + return s.queryByTagsAndLogs(ctx, traceQuery) } - return s.queryByService(traceQuery) + return s.queryByService(ctx, traceQuery) } -func (s *SpanReader) queryByTagsAndLogs(tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { +func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { + span, ctx := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag) + defer span.Finish() + results := make([]dbmodel.UniqueTraceIDs, 0, len(tq.Tags)) for k, v := range tq.Tags { query := s.session.Query( @@ -284,7 +301,8 @@ func (s *SpanReader) queryByTagsAndLogs(tq *spanstore.TraceQueryParameters) (dbm model.TimeAsEpochMicroseconds(tq.StartTimeMax), tq.NumTraces*limitMultiple, ).PageSize(0) - t, err := s.executeQuery(query, s.metrics.queryTagIndex) + // TODO should have span per iteration + t, err := s.executeQuery(span, query, s.metrics.queryTagIndex) if err != nil { return nil, err } @@ -293,7 +311,10 @@ func (s *SpanReader) queryByTagsAndLogs(tq *spanstore.TraceQueryParameters) (dbm return dbmodel.IntersectTraceIDs(results), nil } -func (s *SpanReader) queryByDuration(traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { +func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { + span, ctx := startSpanForQuery(ctx, "queryByDuration", queryByDuration) + defer span.Finish() + results := dbmodel.UniqueTraceIDs{} minDurationMicros := traceQuery.DurationMin.Nanoseconds() / int64(time.Microsecond/time.Nanosecond) @@ -316,7 +337,8 @@ func (s *SpanReader) queryByDuration(traceQuery *spanstore.TraceQueryParameters) minDurationMicros, maxDurationMicros, traceQuery.NumTraces*limitMultiple) - t, err := s.executeQuery(query, s.metrics.queryDurationIndex) + // TODO should have span for each iteration + t, err := s.executeQuery(span, query, s.metrics.queryDurationIndex) if err != nil { return nil, err } @@ -331,7 +353,9 @@ func (s *SpanReader) queryByDuration(traceQuery *spanstore.TraceQueryParameters) return results, nil } -func (s *SpanReader) queryByServiceNameAndOperation(tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { +func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { + span, ctx := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName) + defer span.Finish() query := s.session.Query( queryByServiceAndOperationName, tq.ServiceName, @@ -340,10 +364,12 @@ func (s *SpanReader) queryByServiceNameAndOperation(tq *spanstore.TraceQueryPara model.TimeAsEpochMicroseconds(tq.StartTimeMax), tq.NumTraces*limitMultiple, ).PageSize(0) - return s.executeQuery(query, s.metrics.queryServiceOperationIndex) + return s.executeQuery(span, query, s.metrics.queryServiceOperationIndex) } -func (s *SpanReader) queryByService(tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { +func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { + span, ctx := startSpanForQuery(ctx, "queryByService", queryByServiceName) + defer span.Finish() query := s.session.Query( queryByServiceName, tq.ServiceName, @@ -351,10 +377,10 @@ func (s *SpanReader) queryByService(tq *spanstore.TraceQueryParameters) (dbmodel model.TimeAsEpochMicroseconds(tq.StartTimeMax), tq.NumTraces*limitMultiple, ).PageSize(0) - return s.executeQuery(query, s.metrics.queryServiceNameIndex) + return s.executeQuery(span, query, s.metrics.queryServiceNameIndex) } -func (s *SpanReader) executeQuery(query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) { +func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) { start := time.Now() i := query.Iter() retMe := dbmodel.UniqueTraceIDs{} @@ -365,8 +391,25 @@ func (s *SpanReader) executeQuery(query cassandra.Query, tableMetrics *casMetric err := i.Close() tableMetrics.Emit(err, time.Since(start)) if err != nil { + logErrorToSpan(span, err) s.logger.Error("Failed to exec query", zap.Error(err)) return nil, err } return retMe, nil } + +func startSpanForQuery(ctx context.Context, name, query string) (opentracing.Span, context.Context) { + span, ctx := opentracing.StartSpanFromContext(ctx, name) + ottag.DBStatement.Set(span, query) + ottag.DBType.Set(span, "cassandra") + ottag.Component.Set(span, "gocql") + return span, ctx +} + +func logErrorToSpan(span opentracing.Span, err error) { + if err == nil { + return + } + ottag.Error.Set(span, true) + span.LogFields(otlog.Error(err)) +} diff --git a/plugin/storage/cassandra/spanstore/reader_test.go b/plugin/storage/cassandra/spanstore/reader_test.go index 10265311f32..8499aa6800a 100644 --- a/plugin/storage/cassandra/spanstore/reader_test.go +++ b/plugin/storage/cassandra/spanstore/reader_test.go @@ -15,6 +15,7 @@ package spanstore import ( + "context" "errors" "strings" "testing" @@ -59,7 +60,7 @@ var _ spanstore.Reader = &SpanReader{} // check API conformance func TestSpanReaderGetServices(t *testing.T) { withSpanReader(func(r *spanReaderTest) { r.reader.serviceNamesReader = func() ([]string, error) { return []string{"service-a"}, nil } - s, err := r.reader.GetServices() + s, err := r.reader.GetServices(context.Background()) assert.NoError(t, err) assert.Equal(t, []string{"service-a"}, s) }) @@ -68,7 +69,7 @@ func TestSpanReaderGetServices(t *testing.T) { func TestSpanReaderGetOperations(t *testing.T) { withSpanReader(func(r *spanReaderTest) { r.reader.operationNamesReader = func(string) ([]string, error) { return []string{"operation-a"}, nil } - s, err := r.reader.GetOperations("service-x") + s, err := r.reader.GetOperations(context.Background(), "service-x") assert.NoError(t, err) assert.Equal(t, []string{"operation-a"}, s) }) @@ -117,7 +118,7 @@ func TestSpanReaderGetTrace(t *testing.T) { r.session.On("Query", mock.AnythingOfType("string"), matchEverything()).Return(query) - trace, err := r.reader.GetTrace(model.TraceID{}) + trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) if testCase.expectedErr == "" { assert.NoError(t, err) assert.NotNil(t, trace) @@ -143,7 +144,7 @@ func TestSpanReaderGetTrace_TraceNotFound(t *testing.T) { r.session.On("Query", mock.AnythingOfType("string"), matchEverything()).Return(query) - trace, err := r.reader.GetTrace(model.TraceID{}) + trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) assert.Nil(t, trace) assert.EqualError(t, err, "trace not found") }) @@ -151,7 +152,7 @@ func TestSpanReaderGetTrace_TraceNotFound(t *testing.T) { func TestSpanReaderFindTracesBadRequest(t *testing.T) { withSpanReader(func(r *spanReaderTest) { - _, err := r.reader.FindTraces(nil) + _, err := r.reader.FindTraces(context.Background(), nil) assert.Error(t, err) }) } @@ -353,7 +354,7 @@ func TestSpanReaderFindTraces(t *testing.T) { queryParams.DurationMax = time.Minute * 3 } - res, err := r.reader.FindTraces(queryParams) + res, err := r.reader.FindTraces(context.Background(), queryParams) if testCase.expectedError == "" { assert.NoError(t, err) assert.Len(t, res, testCase.expectedCount, "expecting certain number of traces") diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 752d09a1b37..106588ab8af 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -114,7 +114,7 @@ func newSpanReader(client es.Client, logger *zap.Logger, maxLookback time.Durati } // GetTrace takes a traceID and returns a Trace associated with that traceID -func (s *SpanReader) GetTrace(traceID model.TraceID) (*model.Trace, error) { +func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { currentTime := time.Now() traces, err := s.multiRead([]string{traceID.String()}, currentTime.Add(-s.maxLookback), currentTime) if err != nil { @@ -167,14 +167,14 @@ func (s *SpanReader) indicesForTimeRange(indexName string, startTime time.Time, } // GetServices returns all services traced by Jaeger, ordered by frequency -func (s *SpanReader) GetServices() ([]string, error) { +func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { currentTime := time.Now() jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime) return s.serviceOperationStorage.getServices(jaegerIndices) } // GetOperations returns all operations for a specific service traced by Jaeger -func (s *SpanReader) GetOperations(service string) ([]string, error) { +func (s *SpanReader) GetOperations(ctx context.Context, service string) ([]string, error) { currentTime := time.Now() jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime) return s.serviceOperationStorage.getOperations(jaegerIndices, service) @@ -193,7 +193,7 @@ func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string, } // FindTraces retrieves traces that match the traceQuery -func (s *SpanReader) FindTraces(traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) { +func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) { if err := validateQuery(traceQuery); err != nil { return nil, err } diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index e1ace550b19..593d3c9df2f 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -15,6 +15,7 @@ package spanstore import ( + "context" "encoding/json" "errors" "testing" @@ -135,7 +136,7 @@ func TestSpanReader_GetTrace(t *testing.T) { }, }, nil) - trace, err := r.reader.GetTrace(model.NewTraceID(0, 1)) + trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NoError(t, err) require.NotNil(t, trace) @@ -166,7 +167,7 @@ func TestSpanReader_SearchAfter(t *testing.T) { }, }, nil).Times(2) - trace, err := r.reader.GetTrace(model.NewTraceID(0, 1)) + trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.NoError(t, err) require.NotNil(t, trace) @@ -185,7 +186,7 @@ func TestSpanReader_GetTraceQueryError(t *testing.T) { Return(&elastic.MultiSearchResult{ Responses: []*elastic.SearchResult{}, }, nil) - trace, err := r.reader.GetTrace(model.NewTraceID(0, 1)) + trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.EqualError(t, err, "No trace with that ID found") require.Nil(t, trace) }) @@ -204,7 +205,7 @@ func TestSpanReader_GetTraceNilHits(t *testing.T) { }, }, nil) - trace, err := r.reader.GetTrace(model.NewTraceID(0, 1)) + trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.EqualError(t, err, "No trace with that ID found") require.Nil(t, trace) }) @@ -227,7 +228,7 @@ func TestSpanReader_GetTraceInvalidSpanError(t *testing.T) { }, }, nil) - trace, err := r.reader.GetTrace(model.NewTraceID(0, 1)) + trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.Error(t, err, "invalid span") require.Nil(t, trace) }) @@ -251,7 +252,7 @@ func TestSpanReader_GetTraceSpanConversionError(t *testing.T) { }, }, nil) - trace, err := r.reader.GetTrace(model.NewTraceID(0, 1)) + trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) require.Error(t, err, "span conversion error, because lacks elements") require.Nil(t, trace) }) @@ -392,9 +393,9 @@ func testGet(typ string, t *testing.T) { func returnSearchFunc(typ string, r *spanReaderTest) ([]string, error) { if typ == servicesAggregation { - return r.reader.GetServices() + return r.reader.GetServices(context.Background()) } else if typ == operationsAggregation { - return r.reader.GetOperations("someService") + return r.reader.GetOperations(context.Background(), "someService") } else if typ == traceIDAggregation { return r.reader.findTraceIDs(&spanstore.TraceQueryParameters{}) } @@ -460,7 +461,7 @@ func TestSpanReader_FindTraces(t *testing.T) { NumTraces: 1, } - traces, err := r.reader.FindTraces(traceQuery) + traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NoError(t, err) assert.Len(t, traces, 1) @@ -504,7 +505,7 @@ func TestSpanReader_FindTracesInvalidQuery(t *testing.T) { StartTimeMax: time.Now(), } - traces, err := r.reader.FindTraces(traceQuery) + traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.Error(t, err) assert.Nil(t, traces) }) @@ -536,7 +537,7 @@ func TestSpanReader_FindTracesAggregationFailure(t *testing.T) { StartTimeMax: time.Now(), } - traces, err := r.reader.FindTraces(traceQuery) + traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.Error(t, err) assert.Nil(t, traces) }) @@ -570,7 +571,7 @@ func TestSpanReader_FindTracesNoTraceIDs(t *testing.T) { StartTimeMax: time.Now(), } - traces, err := r.reader.FindTraces(traceQuery) + traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.NoError(t, err) assert.Len(t, traces, 0) }) @@ -603,7 +604,7 @@ func TestSpanReader_FindTracesReadTraceFailure(t *testing.T) { StartTimeMax: time.Now(), } - traces, err := r.reader.FindTraces(traceQuery) + traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.EqualError(t, err, "read error") assert.Len(t, traces, 0) }) @@ -641,7 +642,7 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) { StartTimeMax: time.Now(), } - traces, err := r.reader.FindTraces(traceQuery) + traces, err := r.reader.FindTraces(context.Background(), traceQuery) require.Error(t, err) assert.Len(t, traces, 0) }) @@ -912,7 +913,7 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { NumTraces: 2, } - services, err := r.reader.FindTraces(traceQuery) + services, err := r.reader.FindTraces(context.Background(), traceQuery) require.NoError(t, err) assert.Empty(t, services) }) diff --git a/plugin/storage/es/spanstore/service_operation_test.go b/plugin/storage/es/spanstore/service_operation_test.go index e3517738b47..3a92300bd1e 100644 --- a/plugin/storage/es/spanstore/service_operation_test.go +++ b/plugin/storage/es/spanstore/service_operation_test.go @@ -15,6 +15,7 @@ package spanstore import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -105,7 +106,7 @@ func TestSpanReader_GetServicesEmptyIndex(t *testing.T) { Return(&elastic.MultiSearchResult{ Responses: []*elastic.SearchResult{}, }, nil) - services, err := r.reader.GetServices() + services, err := r.reader.GetServices(context.Background()) require.NoError(t, err) assert.Empty(t, services) }) @@ -119,7 +120,7 @@ func TestSpanReader_GetOperationsEmptyIndex(t *testing.T) { Return(&elastic.MultiSearchResult{ Responses: []*elastic.SearchResult{}, }, nil) - services, err := r.reader.GetOperations("foo") + services, err := r.reader.GetOperations(context.Background(), "foo") require.NoError(t, err) assert.Empty(t, services) }) diff --git a/plugin/storage/integration/integration_test.go b/plugin/storage/integration/integration_test.go index 1700e55e1b9..fffd0265311 100644 --- a/plugin/storage/integration/integration_test.go +++ b/plugin/storage/integration/integration_test.go @@ -16,6 +16,7 @@ package integration import ( "bytes" + "context" "encoding/json" "fmt" "io/ioutil" @@ -114,7 +115,7 @@ func (s *StorageIntegration) testGetServices(t *testing.T) { var actual []string found := s.waitForCondition(t, func(t *testing.T) bool { - actual, err := s.SpanReader.GetServices() + actual, err := s.SpanReader.GetServices(context.Background()) require.NoError(t, err) return assert.ObjectsAreEqualValues(expected, actual) }) @@ -136,7 +137,7 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) { var actual *model.Trace found := s.waitForCondition(t, func(t *testing.T) bool { var err error - actual, err = s.SpanReader.GetTrace(expectedTraceID) + actual, err = s.SpanReader.GetTrace(context.Background(), expectedTraceID) return err == nil && len(actual.Spans) == len(expected.Spans) }) if !assert.True(t, found) { @@ -154,7 +155,7 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { var actual []string found := s.waitForCondition(t, func(t *testing.T) bool { var err error - actual, err = s.SpanReader.GetOperations("example-service-1") + actual, err = s.SpanReader.GetOperations(context.Background(), "example-service-1") require.NoError(t, err) return assert.ObjectsAreEqualValues(expected, actual) }) @@ -175,7 +176,7 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) { var actual *model.Trace found := s.waitForCondition(t, func(t *testing.T) bool { var err error - actual, err = s.SpanReader.GetTrace(expectedTraceID) + actual, err = s.SpanReader.GetTrace(context.Background(), expectedTraceID) if err != nil { t.Log(err) } @@ -224,7 +225,7 @@ func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.Tr var traces []*model.Trace found := s.waitForCondition(t, func(t *testing.T) bool { var err error - traces, err = s.SpanReader.FindTraces(query) + traces, err = s.SpanReader.FindTraces(context.Background(), query) if err == nil && tracesMatch(t, traces, expected) { return true } diff --git a/plugin/storage/integration/kafka_test.go b/plugin/storage/integration/kafka_test.go index 0d8c539bfbb..479fed11766 100644 --- a/plugin/storage/integration/kafka_test.go +++ b/plugin/storage/integration/kafka_test.go @@ -15,6 +15,7 @@ package integration import ( + "context" "os" "strconv" "testing" @@ -96,19 +97,19 @@ type ingester struct { traceStore *memory.Store } -func (r *ingester) GetTrace(traceID model.TraceID) (*model.Trace, error) { - return r.traceStore.GetTrace(traceID) +func (r *ingester) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { + return r.traceStore.GetTrace(ctx, traceID) } -func (r *ingester) GetServices() ([]string, error) { +func (r *ingester) GetServices(ctx context.Context) ([]string, error) { return nil, nil } -func (r *ingester) GetOperations(service string) ([]string, error) { +func (r *ingester) GetOperations(ctx context.Context, service string) ([]string, error) { return nil, nil } -func (r *ingester) FindTraces(query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { +func (r *ingester) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { return nil, nil } diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index c8fa9a06b63..e82c7210c09 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -15,6 +15,7 @@ package memory import ( + "context" "errors" "sync" "time" @@ -146,7 +147,7 @@ func (m *Store) WriteSpan(span *model.Span) error { } // GetTrace gets a trace -func (m *Store) GetTrace(traceID model.TraceID) (*model.Trace, error) { +func (m *Store) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { m.RLock() defer m.RUnlock() retMe := m.traces[traceID] @@ -157,7 +158,7 @@ func (m *Store) GetTrace(traceID model.TraceID) (*model.Trace, error) { } // GetServices returns a list of all known services -func (m *Store) GetServices() ([]string, error) { +func (m *Store) GetServices(ctx context.Context) ([]string, error) { m.RLock() defer m.RUnlock() var retMe []string @@ -168,7 +169,7 @@ func (m *Store) GetServices() ([]string, error) { } // GetOperations returns the operations of a given service -func (m *Store) GetOperations(service string) ([]string, error) { +func (m *Store) GetOperations(ctx context.Context, service string) ([]string, error) { m.RLock() defer m.RUnlock() if operations, ok := m.operations[service]; ok { @@ -182,7 +183,7 @@ func (m *Store) GetOperations(service string) ([]string, error) { } // FindTraces returns all traces in the query parameters are satisfied by a trace's span -func (m *Store) FindTraces(query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { +func (m *Store) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { m.RLock() defer m.RUnlock() var retMe []*model.Trace diff --git a/plugin/storage/memory/memory_test.go b/plugin/storage/memory/memory_test.go index e9e0a9a3813..f5e10e258c8 100644 --- a/plugin/storage/memory/memory_test.go +++ b/plugin/storage/memory/memory_test.go @@ -15,6 +15,7 @@ package memory import ( + "context" "testing" "time" @@ -198,7 +199,7 @@ func TestStoreWithLimit(t *testing.T) { func TestStoreGetTraceSuccess(t *testing.T) { withPopulatedMemoryStore(func(store *Store) { - trace, err := store.GetTrace(testingSpan.TraceID) + trace, err := store.GetTrace(context.Background(), testingSpan.TraceID) assert.NoError(t, err) assert.Len(t, trace.Spans, 1) assert.Equal(t, testingSpan, trace.Spans[0]) @@ -207,7 +208,7 @@ func TestStoreGetTraceSuccess(t *testing.T) { func TestStoreGetTraceFailure(t *testing.T) { withPopulatedMemoryStore(func(store *Store) { - trace, err := store.GetTrace(model.TraceID{}) + trace, err := store.GetTrace(context.Background(), model.TraceID{}) assert.EqualError(t, err, errTraceNotFound.Error()) assert.Nil(t, trace) }) @@ -215,7 +216,7 @@ func TestStoreGetTraceFailure(t *testing.T) { func TestStoreGetServices(t *testing.T) { withPopulatedMemoryStore(func(store *Store) { - serviceNames, err := store.GetServices() + serviceNames, err := store.GetServices(context.Background()) assert.NoError(t, err) assert.Len(t, serviceNames, 1) assert.EqualValues(t, testingSpan.Process.ServiceName, serviceNames[0]) @@ -224,7 +225,7 @@ func TestStoreGetServices(t *testing.T) { func TestStoreGetOperationsFound(t *testing.T) { withPopulatedMemoryStore(func(store *Store) { - operations, err := store.GetOperations(testingSpan.Process.ServiceName) + operations, err := store.GetOperations(context.Background(), testingSpan.Process.ServiceName) assert.NoError(t, err) assert.Len(t, operations, 1) assert.EqualValues(t, testingSpan.OperationName, operations[0]) @@ -233,7 +234,7 @@ func TestStoreGetOperationsFound(t *testing.T) { func TestStoreGetOperationsNotFound(t *testing.T) { withPopulatedMemoryStore(func(store *Store) { - operations, err := store.GetOperations("notAService") + operations, err := store.GetOperations(context.Background(), "notAService") assert.NoError(t, err) assert.Len(t, operations, 0) }) @@ -241,7 +242,7 @@ func TestStoreGetOperationsNotFound(t *testing.T) { func TestStoreGetEmptyTraceSet(t *testing.T) { withPopulatedMemoryStore(func(store *Store) { - traces, err := store.FindTraces(&spanstore.TraceQueryParameters{}) + traces, err := store.FindTraces(context.Background(), &spanstore.TraceQueryParameters{}) assert.NoError(t, err) assert.Len(t, traces, 0) }) @@ -313,7 +314,7 @@ func TestStoreGetTrace(t *testing.T) { for _, testS := range testStruct { withPopulatedMemoryStore(func(store *Store) { testS.query.NumTraces = 10 - traces, err := store.FindTraces(testS.query) + traces, err := store.FindTraces(context.Background(), testS.query) assert.NoError(t, err) if testS.traceFound { assert.Len(t, traces, 1) diff --git a/storage/spanstore/interface.go b/storage/spanstore/interface.go index 4b005d53ae8..eee40f1a57d 100644 --- a/storage/spanstore/interface.go +++ b/storage/spanstore/interface.go @@ -15,6 +15,7 @@ package spanstore import ( + "context" "errors" "time" @@ -33,10 +34,10 @@ var ( // Reader finds and loads traces and other data from storage. type Reader interface { - GetTrace(traceID model.TraceID) (*model.Trace, error) - GetServices() ([]string, error) - GetOperations(service string) ([]string, error) - FindTraces(query *TraceQueryParameters) ([]*model.Trace, error) + GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) + GetServices(ctx context.Context) ([]string, error) + GetOperations(ctx context.Context, service string) ([]string, error) + FindTraces(ctx context.Context, query *TraceQueryParameters) ([]*model.Trace, error) } // TraceQueryParameters contains parameters of a trace query. diff --git a/storage/spanstore/metrics/decorator.go b/storage/spanstore/metrics/decorator.go index f82f0cb3139..e277253c206 100644 --- a/storage/spanstore/metrics/decorator.go +++ b/storage/spanstore/metrics/decorator.go @@ -15,6 +15,7 @@ package metrics import ( + "context" "time" "github.com/uber/jaeger-lib/metrics" @@ -72,33 +73,33 @@ func buildQueryMetrics(namespace string, metricsFactory metrics.Factory) *queryM } // FindTraces implements spanstore.Reader#FindTraces -func (m *ReadMetricsDecorator) FindTraces(traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) { +func (m *ReadMetricsDecorator) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) { start := time.Now() - retMe, err := m.spanReader.FindTraces(traceQuery) + retMe, err := m.spanReader.FindTraces(ctx, traceQuery) m.findTracesMetrics.emit(err, time.Since(start), len(retMe)) return retMe, err } // GetTrace implements spanstore.Reader#GetTrace -func (m *ReadMetricsDecorator) GetTrace(traceID model.TraceID) (*model.Trace, error) { +func (m *ReadMetricsDecorator) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { start := time.Now() - retMe, err := m.spanReader.GetTrace(traceID) + retMe, err := m.spanReader.GetTrace(ctx, traceID) m.getTraceMetrics.emit(err, time.Since(start), 1) return retMe, err } // GetServices implements spanstore.Reader#GetServices -func (m *ReadMetricsDecorator) GetServices() ([]string, error) { +func (m *ReadMetricsDecorator) GetServices(ctx context.Context) ([]string, error) { start := time.Now() - retMe, err := m.spanReader.GetServices() + retMe, err := m.spanReader.GetServices(ctx) m.getServicesMetrics.emit(err, time.Since(start), len(retMe)) return retMe, err } // GetOperations implements spanstore.Reader#GetOperations -func (m *ReadMetricsDecorator) GetOperations(service string) ([]string, error) { +func (m *ReadMetricsDecorator) GetOperations(ctx context.Context, service string) ([]string, error) { start := time.Now() - retMe, err := m.spanReader.GetOperations(service) + retMe, err := m.spanReader.GetOperations(ctx, service) m.getOperationsMetrics.emit(err, time.Since(start), len(retMe)) return retMe, err } diff --git a/storage/spanstore/metrics/decorator_test.go b/storage/spanstore/metrics/decorator_test.go index dbc68d97419..9be83f7a696 100644 --- a/storage/spanstore/metrics/decorator_test.go +++ b/storage/spanstore/metrics/decorator_test.go @@ -15,6 +15,7 @@ package metrics_test import ( + "context" "errors" "testing" @@ -33,13 +34,13 @@ func TestSuccessfulUnderlyingCalls(t *testing.T) { mockReader := mocks.Reader{} mrs := NewReadMetricsDecorator(&mockReader, mf) mockReader.On("GetServices").Return([]string{}, nil) - mrs.GetServices() + mrs.GetServices(context.Background()) mockReader.On("GetOperations", "something").Return([]string{}, nil) - mrs.GetOperations("something") + mrs.GetOperations(context.Background(), "something") mockReader.On("GetTrace", model.TraceID{}).Return(&model.Trace{}, nil) - mrs.GetTrace(model.TraceID{}) + mrs.GetTrace(context.Background(), model.TraceID{}) mockReader.On("FindTraces", &spanstore.TraceQueryParameters{}).Return([]*model.Trace{}, nil) - mrs.FindTraces(&spanstore.TraceQueryParameters{}) + mrs.FindTraces(context.Background(), &spanstore.TraceQueryParameters{}) counters, gauges := mf.Snapshot() expecteds := map[string]int64{ "get_operations.attempts": 1, @@ -90,13 +91,13 @@ func TestFailingUnderlyingCalls(t *testing.T) { mockReader := mocks.Reader{} mrs := NewReadMetricsDecorator(&mockReader, mf) mockReader.On("GetServices").Return(nil, errors.New("Failure")) - mrs.GetServices() + mrs.GetServices(context.Background()) mockReader.On("GetOperations", "something").Return(nil, errors.New("Failure")) - mrs.GetOperations("something") + mrs.GetOperations(context.Background(), "something") mockReader.On("GetTrace", model.TraceID{}).Return(nil, errors.New("Failure")) - mrs.GetTrace(model.TraceID{}) + mrs.GetTrace(context.Background(), model.TraceID{}) mockReader.On("FindTraces", &spanstore.TraceQueryParameters{}).Return(nil, errors.New("Failure")) - mrs.FindTraces(&spanstore.TraceQueryParameters{}) + mrs.FindTraces(context.Background(), &spanstore.TraceQueryParameters{}) counters, gauges := mf.Snapshot() expecteds := map[string]int64{ "get_operations.attempts": 1, diff --git a/storage/spanstore/mocks/Reader.go b/storage/spanstore/mocks/Reader.go index 236eef8bc7c..7653970609c 100644 --- a/storage/spanstore/mocks/Reader.go +++ b/storage/spanstore/mocks/Reader.go @@ -14,9 +14,14 @@ package mocks -import mock "github.com/stretchr/testify/mock" -import model "github.com/jaegertracing/jaeger/model" -import spanstore "github.com/jaegertracing/jaeger/storage/spanstore" +import ( + "context" + + mock "github.com/stretchr/testify/mock" + + model "github.com/jaegertracing/jaeger/model" + spanstore "github.com/jaegertracing/jaeger/storage/spanstore" +) // Reader is an autogenerated mock type for the Reader type type Reader struct { @@ -24,7 +29,7 @@ type Reader struct { } // FindTraces provides a mock function with given fields: query -func (_m *Reader) FindTraces(query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { +func (_m *Reader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { ret := _m.Called(query) var r0 []*model.Trace @@ -47,7 +52,7 @@ func (_m *Reader) FindTraces(query *spanstore.TraceQueryParameters) ([]*model.Tr } // GetOperations provides a mock function with given fields: service -func (_m *Reader) GetOperations(service string) ([]string, error) { +func (_m *Reader) GetOperations(ctx context.Context, service string) ([]string, error) { ret := _m.Called(service) var r0 []string @@ -70,7 +75,7 @@ func (_m *Reader) GetOperations(service string) ([]string, error) { } // GetServices provides a mock function with given fields: -func (_m *Reader) GetServices() ([]string, error) { +func (_m *Reader) GetServices(ctx context.Context) ([]string, error) { ret := _m.Called() var r0 []string @@ -93,7 +98,7 @@ func (_m *Reader) GetServices() ([]string, error) { } // GetTrace provides a mock function with given fields: traceID -func (_m *Reader) GetTrace(traceID model.TraceID) (*model.Trace, error) { +func (_m *Reader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { ret := _m.Called(traceID) var r0 *model.Trace