Skip to content

Commit

Permalink
Enable tracing of Cassandra queries
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
Yuri Shkuro committed Sep 4, 2018
1 parent 5a67504 commit bae8e25
Show file tree
Hide file tree
Showing 15 changed files with 177 additions and 115 deletions.
22 changes: 12 additions & 10 deletions cmd/query/app/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package app

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -153,7 +154,7 @@ func (aH *APIHandler) route(route string, args ...interface{}) string {
}

func (aH *APIHandler) getServices(w http.ResponseWriter, r *http.Request) {
services, err := aH.spanReader.GetServices()
services, err := aH.spanReader.GetServices(r.Context())
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand All @@ -168,7 +169,7 @@ func (aH *APIHandler) getOperationsLegacy(w http.ResponseWriter, r *http.Request
vars := mux.Vars(r)
// given how getOperationsLegacy is bound to URL route, serviceParam cannot be empty
service, _ := url.QueryUnescape(vars[serviceParam])
operations, err := aH.spanReader.GetOperations(service)
operations, err := aH.spanReader.GetOperations(r.Context(), service)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand All @@ -186,7 +187,7 @@ func (aH *APIHandler) getOperations(w http.ResponseWriter, r *http.Request) {
return
}
}
operations, err := aH.spanReader.GetOperations(service)
operations, err := aH.spanReader.GetOperations(r.Context(), service)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand All @@ -206,12 +207,12 @@ func (aH *APIHandler) search(w http.ResponseWriter, r *http.Request) {
var uiErrors []structuredError
var tracesFromStorage []*model.Trace
if len(tQuery.traceIDs) > 0 {
tracesFromStorage, uiErrors, err = aH.tracesByIDs(tQuery.traceIDs)
tracesFromStorage, uiErrors, err = aH.tracesByIDs(r.Context(), tQuery.traceIDs)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
} else {
tracesFromStorage, err = aH.spanReader.FindTraces(&tQuery.TraceQueryParameters)
tracesFromStorage, err = aH.spanReader.FindTraces(r.Context(), &tQuery.TraceQueryParameters)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand All @@ -233,11 +234,11 @@ func (aH *APIHandler) search(w http.ResponseWriter, r *http.Request) {
aH.writeJSON(w, r, &structuredRes)
}

func (aH *APIHandler) tracesByIDs(traceIDs []model.TraceID) ([]*model.Trace, []structuredError, error) {
func (aH *APIHandler) tracesByIDs(ctx context.Context, traceIDs []model.TraceID) ([]*model.Trace, []structuredError, error) {
var errors []structuredError
retMe := make([]*model.Trace, 0, len(traceIDs))
for _, traceID := range traceIDs {
if trace, err := trace(traceID, aH.spanReader, aH.archiveSpanReader); err != nil {
if trace, err := trace(ctx, traceID, aH.spanReader, aH.archiveSpanReader); err != nil {
if err != spanstore.ErrTraceNotFound {
return nil, nil, err
}
Expand Down Expand Up @@ -399,7 +400,7 @@ func (aH *APIHandler) withTraceFromReader(
if !ok {
return
}
trace, err := trace(traceID, reader, backupReader)
trace, err := trace(r.Context(), traceID, reader, backupReader)
if err == spanstore.ErrTraceNotFound {
aH.handleError(w, err, http.StatusNotFound)
return
Expand All @@ -411,16 +412,17 @@ func (aH *APIHandler) withTraceFromReader(
}

// trace loads the trace identified by traceID from reader, passing ctx along
// to the reader call. If reader reports spanstore.ErrTraceNotFound and a
// backupReader is configured (non-nil), the lookup is retried against
// backupReader and its result is returned instead. With a nil backupReader the
// not-found error is returned unchanged. Any other error from reader is
// returned without consulting backupReader.
func trace(
ctx context.Context,
traceID model.TraceID,
reader spanstore.Reader,
backupReader spanstore.Reader,
) (*model.Trace, error) {
trace, err := reader.GetTrace(traceID)
trace, err := reader.GetTrace(ctx, traceID)
if err == spanstore.ErrTraceNotFound {
if backupReader == nil {
return nil, err
}
trace, err = backupReader.GetTrace(traceID)
trace, err = backupReader.GetTrace(ctx, traceID)
}
return trace, err
}
Expand Down
16 changes: 9 additions & 7 deletions plugin/storage/cassandra/savetracetest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"context"
"time"

"github.com/uber/jaeger-lib/metrics"
Expand Down Expand Up @@ -50,7 +51,8 @@ func main() {
logger.Info("Saved span", zap.String("spanID", getSomeSpan().SpanID.String()))
}
s := getSomeSpan()
trace, err := spanReader.GetTrace(s.TraceID)
ctx := context.Background()
trace, err := spanReader.GetTrace(ctx, s.TraceID)
if err != nil {
logger.Fatal("Failed to read", zap.Error(err))
} else {
Expand All @@ -63,27 +65,27 @@ func main() {
StartTimeMax: time.Now().Add(time.Hour),
}
logger.Info("Check main query")
queryAndPrint(spanReader, tqp)
queryAndPrint(ctx, spanReader, tqp)

tqp.OperationName = "opName"
logger.Info("Check query with operation")
queryAndPrint(spanReader, tqp)
queryAndPrint(ctx, spanReader, tqp)

tqp.Tags = map[string]string{
"someKey": "someVal",
}
logger.Info("Check query with operation name and tags")
queryAndPrint(spanReader, tqp)
queryAndPrint(ctx, spanReader, tqp)

tqp.DurationMin = 0
tqp.DurationMax = time.Hour
tqp.Tags = map[string]string{}
logger.Info("check query with duration")
queryAndPrint(spanReader, tqp)
queryAndPrint(ctx, spanReader, tqp)
}

func queryAndPrint(spanReader *cSpanStore.SpanReader, tqp *spanstore.TraceQueryParameters) {
traces, err := spanReader.FindTraces(tqp)
func queryAndPrint(ctx context.Context, spanReader *cSpanStore.SpanReader, tqp *spanstore.TraceQueryParameters) {
traces, err := spanReader.FindTraces(ctx, tqp)
if err != nil {
logger.Fatal("Failed to query", zap.Error(err))
} else {
Expand Down
89 changes: 66 additions & 23 deletions plugin/storage/cassandra/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
package spanstore

import (
"context"
"time"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
Expand Down Expand Up @@ -132,17 +136,27 @@ func NewSpanReader(
}

// GetServices returns all services traced by Jaeger
func (s *SpanReader) GetServices() ([]string, error) {
func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) {
return s.serviceNamesReader()

}

// GetOperations returns all operations for a specific service traced by Jaeger
func (s *SpanReader) GetOperations(service string) ([]string, error) {
func (s *SpanReader) GetOperations(ctx context.Context, service string) ([]string, error) {
return s.operationNamesReader(service)
}

func (s *SpanReader) readTrace(traceID dbmodel.TraceID) (*model.Trace, error) {
func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) {
span, ctx := startSpanForQuery(ctx, "readTrace", querySpanByTraceID)
defer span.Finish()
span.SetTag("traceID", traceID)

trace, err := s.readTraceInSpan(ctx, traceID)
logErrorToSpan(span, err)
return trace, err
}

func (s *SpanReader) readTraceInSpan(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) {
start := time.Now()
q := s.session.Query(querySpanByTraceID, traceID)
i := q.Iter()
Expand Down Expand Up @@ -191,8 +205,8 @@ func (s *SpanReader) readTrace(traceID dbmodel.TraceID) (*model.Trace, error) {
}

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (s *SpanReader) GetTrace(traceID model.TraceID) (*model.Trace, error) {
return s.readTrace(dbmodel.TraceIDFromDomain(traceID))
func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID))
}

func validateQuery(p *spanstore.TraceQueryParameters) error {
Expand All @@ -218,14 +232,14 @@ func validateQuery(p *spanstore.TraceQueryParameters) error {
}

// FindTraces retrieves traces that match the traceQuery
func (s *SpanReader) FindTraces(traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
if err := validateQuery(traceQuery); err != nil {
return nil, err
}
if traceQuery.NumTraces == 0 {
traceQuery.NumTraces = defaultNumTraces
}
uniqueTraceIDs, err := s.findTraceIDs(traceQuery)
uniqueTraceIDs, err := s.findTraceIDs(ctx, traceQuery)
if err != nil {
return nil, err
}
Expand All @@ -234,7 +248,7 @@ func (s *SpanReader) FindTraces(traceQuery *spanstore.TraceQueryParameters) ([]*
if len(retMe) >= traceQuery.NumTraces {
break
}
jTrace, err := s.readTrace(traceID)
jTrace, err := s.readTrace(ctx, traceID)
if err != nil {
s.logger.Error("Failure to read trace", zap.String("trace_id", traceID.String()), zap.Error(err))
continue
Expand All @@ -244,18 +258,18 @@ func (s *SpanReader) FindTraces(traceQuery *spanstore.TraceQueryParameters) ([]*
return retMe, nil
}

func (s *SpanReader) findTraceIDs(traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
if traceQuery.DurationMin != 0 || traceQuery.DurationMax != 0 {
return s.queryByDuration(traceQuery)
return s.queryByDuration(ctx, traceQuery)
}

if traceQuery.OperationName != "" {
traceIds, err := s.queryByServiceNameAndOperation(traceQuery)
traceIds, err := s.queryByServiceNameAndOperation(ctx, traceQuery)
if err != nil {
return nil, err
}
if len(traceQuery.Tags) > 0 {
tagTraceIds, err := s.queryByTagsAndLogs(traceQuery)
tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery)
if err != nil {
return nil, err
}
Expand All @@ -267,12 +281,15 @@ func (s *SpanReader) findTraceIDs(traceQuery *spanstore.TraceQueryParameters) (d
return traceIds, nil
}
if len(traceQuery.Tags) > 0 {
return s.queryByTagsAndLogs(traceQuery)
return s.queryByTagsAndLogs(ctx, traceQuery)
}
return s.queryByService(traceQuery)
return s.queryByService(ctx, traceQuery)
}

func (s *SpanReader) queryByTagsAndLogs(tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag)
defer span.Finish()

results := make([]dbmodel.UniqueTraceIDs, 0, len(tq.Tags))
for k, v := range tq.Tags {
query := s.session.Query(
Expand All @@ -284,7 +301,8 @@ func (s *SpanReader) queryByTagsAndLogs(tq *spanstore.TraceQueryParameters) (dbm
model.TimeAsEpochMicroseconds(tq.StartTimeMax),
tq.NumTraces*limitMultiple,
).PageSize(0)
t, err := s.executeQuery(query, s.metrics.queryTagIndex)
// TODO should have span per iteration
t, err := s.executeQuery(span, query, s.metrics.queryTagIndex)
if err != nil {
return nil, err
}
Expand All @@ -293,7 +311,10 @@ func (s *SpanReader) queryByTagsAndLogs(tq *spanstore.TraceQueryParameters) (dbm
return dbmodel.IntersectTraceIDs(results), nil
}

func (s *SpanReader) queryByDuration(traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByDuration", queryByDuration)
defer span.Finish()

results := dbmodel.UniqueTraceIDs{}

minDurationMicros := traceQuery.DurationMin.Nanoseconds() / int64(time.Microsecond/time.Nanosecond)
Expand All @@ -316,7 +337,8 @@ func (s *SpanReader) queryByDuration(traceQuery *spanstore.TraceQueryParameters)
minDurationMicros,
maxDurationMicros,
traceQuery.NumTraces*limitMultiple)
t, err := s.executeQuery(query, s.metrics.queryDurationIndex)
// TODO should have span for each iteration
t, err := s.executeQuery(span, query, s.metrics.queryDurationIndex)
if err != nil {
return nil, err
}
Expand All @@ -331,7 +353,9 @@ func (s *SpanReader) queryByDuration(traceQuery *spanstore.TraceQueryParameters)
return results, nil
}

func (s *SpanReader) queryByServiceNameAndOperation(tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName)
defer span.Finish()
query := s.session.Query(
queryByServiceAndOperationName,
tq.ServiceName,
Expand All @@ -340,21 +364,23 @@ func (s *SpanReader) queryByServiceNameAndOperation(tq *spanstore.TraceQueryPara
model.TimeAsEpochMicroseconds(tq.StartTimeMax),
tq.NumTraces*limitMultiple,
).PageSize(0)
return s.executeQuery(query, s.metrics.queryServiceOperationIndex)
return s.executeQuery(span, query, s.metrics.queryServiceOperationIndex)
}

func (s *SpanReader) queryByService(tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByService", queryByServiceName)
defer span.Finish()
query := s.session.Query(
queryByServiceName,
tq.ServiceName,
model.TimeAsEpochMicroseconds(tq.StartTimeMin),
model.TimeAsEpochMicroseconds(tq.StartTimeMax),
tq.NumTraces*limitMultiple,
).PageSize(0)
return s.executeQuery(query, s.metrics.queryServiceNameIndex)
return s.executeQuery(span, query, s.metrics.queryServiceNameIndex)
}

func (s *SpanReader) executeQuery(query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
start := time.Now()
i := query.Iter()
retMe := dbmodel.UniqueTraceIDs{}
Expand All @@ -365,8 +391,25 @@ func (s *SpanReader) executeQuery(query cassandra.Query, tableMetrics *casMetric
err := i.Close()
tableMetrics.Emit(err, time.Since(start))
if err != nil {
logErrorToSpan(span, err)
s.logger.Error("Failed to exec query", zap.Error(err))
return nil, err
}
return retMe, nil
}

// startSpanForQuery starts an OpenTracing span called name via
// opentracing.StartSpanFromContext and tags it with the database statement
// (query), the database type ("cassandra") and the component ("gocql").
//
// It returns the new span together with the context produced by
// StartSpanFromContext. The caller is responsible for finishing the span.
func startSpanForQuery(ctx context.Context, name, query string) (opentracing.Span, context.Context) {
	querySpan, spanCtx := opentracing.StartSpanFromContext(ctx, name)

	// Tags are set in a fixed order: statement, database type, component.
	ottag.DBStatement.Set(querySpan, query)
	ottag.DBType.Set(querySpan, "cassandra")
	ottag.Component.Set(querySpan, "gocql")

	return querySpan, spanCtx
}

// logErrorToSpan records err on span: it sets the standard error tag to true
// and appends a log entry carrying the error. A nil err is a no-op, so callers
// may invoke it unconditionally after an operation.
func logErrorToSpan(span opentracing.Span, err error) {
	if err != nil {
		ottag.Error.Set(span, true)
		span.LogFields(otlog.Error(err))
	}
}
Loading

0 comments on commit bae8e25

Please sign in to comment.