Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FindTraceID to the spanstore interface #1246

Merged
merged 2 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cmd/query/app/handler_archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ import (

func TestGetArchivedTrace_NotFound(t *testing.T) {
mockReader := &spanstoremocks.Reader{}
mockReader.On("GetTrace", mock.AnythingOfType("model.TraceID")).
mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is context.valueCtx? Shouldn't it be context.Context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an implementation of context.Context, I used the implementation because mock.AnythingOfType doesn't appear to work with interfaces. (See stretchr/testify#519)

Return(nil, spanstore.ErrTraceNotFound).Once()
for _, tc := range []spanstore.Reader{nil, mockReader} {
archiveReader := tc // capture loop var
t.Run(fmt.Sprint(archiveReader), func(t *testing.T) {
withTestServer(t, func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("model.TraceID")).
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
var response structuredResponse
err := getJSON(ts.server.URL+"/api/traces/"+mockTraceID.String(), &response)
Expand All @@ -50,11 +50,11 @@ func TestGetArchivedTrace_NotFound(t *testing.T) {
func TestGetArchivedTraceSuccess(t *testing.T) {
traceID := model.NewTraceID(0, 123456)
mockReader := &spanstoremocks.Reader{}
mockReader.On("GetTrace", mock.AnythingOfType("model.TraceID")).
mockReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
withTestServer(t, func(ts *testServer) {
// make main reader return NotFound
ts.spanReader.On("GetTrace", mock.AnythingOfType("model.TraceID")).
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()
var response structuredTraceResponse
err := getJSON(ts.server.URL+"/api/traces/"+mockTraceID.String(), &response)
Expand All @@ -78,7 +78,7 @@ func TestArchiveTrace_Success(t *testing.T) {
mockWriter.On("WriteSpan", mock.AnythingOfType("*model.Span")).
Return(nil).Times(2)
withTestServer(t, func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("model.TraceID")).
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
var response structuredResponse
err := postJSON(ts.server.URL+"/api/archive/"+mockTraceID.String(), []string{}, &response)
Expand All @@ -91,7 +91,7 @@ func TestArchiveTrace_WriteErrors(t *testing.T) {
mockWriter.On("WriteSpan", mock.AnythingOfType("*model.Span")).
Return(errors.New("cannot save")).Times(2)
withTestServer(t, func(ts *testServer) {
ts.spanReader.On("GetTrace", mock.AnythingOfType("model.TraceID")).
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()
var response structuredResponse
err := postJSON(ts.server.URL+"/api/archive/"+mockTraceID.String(), []string{}, &response)
Expand Down
48 changes: 24 additions & 24 deletions cmd/query/app/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func withTestServer(t *testing.T, doTest func(s *testServer), options ...Handler
func TestGetTraceSuccess(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
readMock.On("GetTrace", mock.AnythingOfType("model.TraceID")).
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()

var response structuredResponse
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestGetTrace(t *testing.T) {
server, readMock, _ := initializeTestServer(HandlerOptions.Tracer(jaegerTracer))
defer server.Close()

readMock.On("GetTrace", model.NewTraceID(0, 0x123456abc)).
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), model.NewTraceID(0, 0x123456abc)).
Return(makeMockTrace(t), nil).Once()

var response structuredResponse
Expand All @@ -273,7 +273,7 @@ func TestGetTrace(t *testing.T) {
func TestGetTraceDBFailure(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
readMock.On("GetTrace", mock.AnythingOfType("model.TraceID")).
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, errStorage).Once()

var response structuredResponse
Expand All @@ -284,7 +284,7 @@ func TestGetTraceDBFailure(t *testing.T) {
func TestGetTraceNotFound(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
readMock.On("GetTrace", mock.AnythingOfType("model.TraceID")).
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()

var response structuredResponse
Expand All @@ -301,7 +301,7 @@ func TestGetTraceAdjustmentFailure(t *testing.T) {
),
)
defer server.Close()
readMock.On("GetTrace", mock.AnythingOfType("model.TraceID")).
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Once()

var response structuredResponse
Expand All @@ -323,7 +323,7 @@ func TestGetTraceBadTraceID(t *testing.T) {
func TestSearchSuccess(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
readMock.On("FindTraces", mock.AnythingOfType("*spanstore.TraceQueryParameters")).
readMock.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")).
Return([]*model.Trace{mockTrace}, nil).Once()

var response structuredResponse
Expand All @@ -335,7 +335,7 @@ func TestSearchSuccess(t *testing.T) {
func TestSearchByTraceIDSuccess(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
readMock.On("GetTrace", mock.AnythingOfType("model.TraceID")).
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Twice()

var response structuredResponse
Expand All @@ -349,9 +349,9 @@ func TestSearchByTraceIDSuccessWithArchive(t *testing.T) {
archiveReadMock := &spanstoremocks.Reader{}
server, readMock, _ := initializeTestServer(HandlerOptions.ArchiveSpanReader(archiveReadMock))
defer server.Close()
readMock.On("GetTrace", mock.AnythingOfType("model.TraceID")).
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Twice()
archiveReadMock.On("GetTrace", mock.AnythingOfType("model.TraceID")).
archiveReadMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(mockTrace, nil).Twice()

var response structuredResponse
Expand All @@ -364,7 +364,7 @@ func TestSearchByTraceIDSuccessWithArchive(t *testing.T) {
func TestSearchByTraceIDNotFound(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
readMock.On("GetTrace", mock.AnythingOfType("model.TraceID")).
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, spanstore.ErrTraceNotFound).Once()

var response structuredResponse
Expand All @@ -378,7 +378,7 @@ func TestSearchByTraceIDFailure(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
whatsamattayou := "https://youtu.be/WrKFOCg13QQ"
readMock.On("GetTrace", mock.AnythingOfType("model.TraceID")).
readMock.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).
Return(nil, fmt.Errorf(whatsamattayou)).Once()

var response structuredResponse
Expand All @@ -395,7 +395,7 @@ func TestSearchModelConversionFailure(t *testing.T) {
),
)
defer server.Close()
readMock.On("FindTraces", mock.AnythingOfType("*spanstore.TraceQueryParameters")).
readMock.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")).
Return([]*model.Trace{mockTrace}, nil).Once()
var response structuredResponse
err := getJSON(server.URL+`/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms`, &response)
Expand All @@ -407,7 +407,7 @@ func TestSearchModelConversionFailure(t *testing.T) {
func TestSearchDBFailure(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
readMock.On("FindTraces", mock.AnythingOfType("*spanstore.TraceQueryParameters")).
readMock.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")).
Return(nil, fmt.Errorf("whatsamattayou")).Once()

var response structuredResponse
Expand Down Expand Up @@ -449,7 +449,7 @@ func TestGetServicesSuccess(t *testing.T) {
server, readMock, _ := initializeTestServer()
defer server.Close()
expectedServices := []string{"trifle", "bling"}
readMock.On("GetServices").Return(expectedServices, nil).Once()
readMock.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once()

var response structuredResponse
err := getJSON(server.URL+"/api/services", &response)
Expand All @@ -462,20 +462,20 @@ func TestGetServicesSuccess(t *testing.T) {
}

func TestGetServicesStorageFailure(t *testing.T) {
server, mock, _ := initializeTestServer()
server, readMock, _ := initializeTestServer()
defer server.Close()
mock.On("GetServices").Return(nil, errStorage).Once()
readMock.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(nil, errStorage).Once()

var response structuredResponse
err := getJSON(server.URL+"/api/services", &response)
assert.Error(t, err)
}

func TestGetOperationsSuccess(t *testing.T) {
server, mock, _ := initializeTestServer()
server, readMock, _ := initializeTestServer()
defer server.Close()
expectedOperations := []string{"", "get"}
mock.On("GetOperations", "abc/trifle").Return(expectedOperations, nil).Once()
readMock.On("GetOperations", mock.AnythingOfType("*context.valueCtx"), "abc/trifle").Return(expectedOperations, nil).Once()

var response structuredResponse
err := getJSON(server.URL+"/api/operations?service=abc%2Ftrifle", &response)
Expand All @@ -497,20 +497,20 @@ func TestGetOperationsNoServiceName(t *testing.T) {
}

func TestGetOperationsStorageFailure(t *testing.T) {
server, mock, _ := initializeTestServer()
server, reaMock, _ := initializeTestServer()
defer server.Close()
mock.On("GetOperations", "trifle").Return(nil, errStorage).Once()
reaMock.On("GetOperations", mock.AnythingOfType("*context.valueCtx"), "trifle").Return(nil, errStorage).Once()

var response structuredResponse
err := getJSON(server.URL+"/api/operations?service=trifle", &response)
assert.Error(t, err)
}

func TestGetOperationsLegacySuccess(t *testing.T) {
server, mock, _ := initializeTestServer()
server, readMock, _ := initializeTestServer()
defer server.Close()
expectedOperations := []string{"", "get"}
mock.On("GetOperations", "abc/trifle").Return(expectedOperations, nil).Once()
readMock.On("GetOperations", mock.AnythingOfType("*context.valueCtx"), "abc/trifle").Return(expectedOperations, nil).Once()

var response structuredResponse
err := getJSON(server.URL+"/api/services/abc%2Ftrifle/operations", &response)
Expand All @@ -523,9 +523,9 @@ func TestGetOperationsLegacySuccess(t *testing.T) {
}

func TestGetOperationsLegacyStorageFailure(t *testing.T) {
server, mock, _ := initializeTestServer()
server, readMock, _ := initializeTestServer()
defer server.Close()
mock.On("GetOperations", "trifle").Return(nil, errStorage).Once()
readMock.On("GetOperations", mock.AnythingOfType("*context.valueCtx"), "trifle").Return(nil, errStorage).Once()

var response structuredResponse
err := getJSON(server.URL+"/api/services/trifle/operations", &response)
Expand Down
37 changes: 26 additions & 11 deletions plugin/storage/cassandra/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,29 +232,44 @@ func validateQuery(p *spanstore.TraceQueryParameters) error {

// FindTraces retrieves traces that match the traceQuery
func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
uniqueTraceIDs, err := s.FindTraceIDs(ctx, traceQuery)
if err != nil {
return nil, err
}
var retMe []*model.Trace
for _, traceID := range uniqueTraceIDs {
jTrace, err := s.GetTrace(ctx, traceID)
if err != nil {
s.logger.Error("Failure to read trace", zap.String("trace_id", traceID.String()), zap.Error(err))
continue
}
retMe = append(retMe, jTrace)
}
return retMe, nil
}

// FindTraceIDs retrieve traceIDs that match the traceQuery
func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
if err := validateQuery(traceQuery); err != nil {
return nil, err
}
if traceQuery.NumTraces == 0 {
traceQuery.NumTraces = defaultNumTraces
}
uniqueTraceIDs, err := s.findTraceIDs(ctx, traceQuery)

dbTraceIDs, err := s.findTraceIDs(ctx, traceQuery)
if err != nil {
return nil, err
}
var retMe []*model.Trace
for traceID := range uniqueTraceIDs {
if len(retMe) >= traceQuery.NumTraces {

var traceIDs []model.TraceID
for t := range dbTraceIDs {
if len(traceIDs) >= traceQuery.NumTraces {
break
}
jTrace, err := s.readTrace(ctx, traceID)
if err != nil {
s.logger.Error("Failure to read trace", zap.String("trace_id", traceID.String()), zap.Error(err))
continue
}
retMe = append(retMe, jTrace)
traceIDs = append(traceIDs, t.ToDomain())
}
return retMe, nil
return traceIDs, nil
}

func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace
return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin, traceQuery.StartTimeMax)
}

func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
return nil, errors.New("not implemented") // TODO: Implement
}

func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime, endTime time.Time) ([]*model.Trace, error) {

childSpan, _ := opentracing.StartSpanFromContext(ctx, "multiRead")
Expand Down
8 changes: 8 additions & 0 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,14 @@ func TestFindTraceIDs(t *testing.T) {
testGet(traceIDAggregation, t)
}

func TestFindTraceIDNotImplemented(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
traceIDs, err := r.reader.FindTraceIDs(context.Background(), nil)
assert.Nil(t, traceIDs)
assert.EqualError(t, err, "not implemented")
})
}

func mockMultiSearchService(r *spanReaderTest) *mock.Call {
multiSearchService := &mocks.MultiSearchService{}
multiSearchService.On("Add", mock.Anything, mock.Anything, mock.Anything).Return(multiSearchService)
Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (r *ingester) FindTraces(ctx context.Context, query *spanstore.TraceQueryPa
return nil, nil
}

func (r *ingester) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
return nil, nil
}

func TestKafkaStorage(t *testing.T) {
if os.Getenv("STORAGE") != "kafka" {
t.Skip("Integration test against kafka skipped; set STORAGE env var to kafka to run this")
Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (m *Store) FindTraces(ctx context.Context, query *spanstore.TraceQueryParam
return retMe, nil
}

func (m *Store) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
return nil, errors.New("not implemented")
}

func (m *Store) validTrace(trace *model.Trace, query *spanstore.TraceQueryParameters) bool {
for _, span := range trace.Spans {
if m.validSpan(span, query) {
Expand Down
8 changes: 8 additions & 0 deletions plugin/storage/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,11 @@ func TestStoreGetTrace(t *testing.T) {
})
}
}

func TestStore_FindTraceIDs(t *testing.T) {
withMemoryStore(func(store *Store) {
traceIDs, err := store.FindTraceIDs(context.Background(), nil)
assert.Nil(t, traceIDs)
assert.EqualError(t, err, "not implemented")
})
}
1 change: 1 addition & 0 deletions storage/spanstore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Reader interface {
GetServices(ctx context.Context) ([]string, error)
GetOperations(ctx context.Context, service string) ([]string, error)
FindTraces(ctx context.Context, query *TraceQueryParameters) ([]*model.Trace, error)
FindTraceIDs(ctx context.Context, query *TraceQueryParameters) ([]model.TraceID, error)
}

// TraceQueryParameters contains parameters of a trace query.
Expand Down
12 changes: 11 additions & 1 deletion storage/spanstore/metrics/decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type ReadMetricsDecorator struct {
spanReader spanstore.Reader
findTracesMetrics *queryMetrics
findTraceIDsMetrics *queryMetrics
getTraceMetrics *queryMetrics
getServicesMetrics *queryMetrics
getOperationsMetrics *queryMetrics
Expand All @@ -36,7 +37,7 @@ type ReadMetricsDecorator struct {
type queryMetrics struct {
Errors metrics.Counter `metric:"requests" tags:"result=err"`
Successes metrics.Counter `metric:"requests" tags:"result=ok"`
Responses metrics.Timer `metric:"responses"` //used as a histogram, not necessary for GetTrace
Responses metrics.Timer `metric:"responses"` // used as a histogram, not necessary for GetTrace
ErrLatency metrics.Timer `metric:"latency" tags:"result=err"`
OKLatency metrics.Timer `metric:"latency" tags:"result=ok"`
}
Expand All @@ -57,6 +58,7 @@ func NewReadMetricsDecorator(spanReader spanstore.Reader, metricsFactory metrics
return &ReadMetricsDecorator{
spanReader: spanReader,
findTracesMetrics: buildQueryMetrics("find_traces", metricsFactory),
findTraceIDsMetrics: buildQueryMetrics("find_trace_ids", metricsFactory),
getTraceMetrics: buildQueryMetrics("get_trace", metricsFactory),
getServicesMetrics: buildQueryMetrics("get_services", metricsFactory),
getOperationsMetrics: buildQueryMetrics("get_operations", metricsFactory),
Expand All @@ -78,6 +80,14 @@ func (m *ReadMetricsDecorator) FindTraces(ctx context.Context, traceQuery *spans
return retMe, err
}

// FindTraceIDs implements spanstore.Reader#FindTraceIDs
func (m *ReadMetricsDecorator) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
start := time.Now()
retMe, err := m.spanReader.FindTraceIDs(ctx, traceQuery)
m.findTraceIDsMetrics.emit(err, time.Since(start), len(retMe))
return retMe, err
}

// GetTrace implements spanstore.Reader#GetTrace
func (m *ReadMetricsDecorator) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
start := time.Now()
Expand Down
Loading