Skip to content

Commit

Permalink
[dbnode] Remove per-series traces on read queries (#3074)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles authored Jan 10, 2021
1 parent 4bad816 commit 644bac3
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 51 deletions.
36 changes: 4 additions & 32 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ func (s *service) fetchReadResults(
defer sp.Finish()

for idx := range response.Elements {
segments, rpcErr := s.readEncodedResult(ctx, nsID, encodedDataResults[idx])
segments, rpcErr := s.readEncodedResult(ctx, encodedDataResults[idx])
if rpcErr != nil {
return rpcErr
}
Expand Down Expand Up @@ -1024,7 +1024,7 @@ func (s *service) FetchBatchRaw(tctx thrift.Context, req *rpc.FetchBatchRawReque
continue
}

segments, rpcErr := s.readEncodedResult(ctx, nsID, encodedResults[i].result)
segments, rpcErr := s.readEncodedResult(ctx, encodedResults[i].result)
if rpcErr != nil {
rawResult.Err = rpcErr
if tterrors.IsBadRequestError(rawResult.Err) {
Expand Down Expand Up @@ -1101,7 +1101,7 @@ func (s *service) FetchBatchRawV2(tctx thrift.Context, req *rpc.FetchBatchRawV2R
continue
}

segments, rpcErr := s.readEncodedResult(ctx, nsIdx, encodedResult)
segments, rpcErr := s.readEncodedResult(ctx, encodedResult)
if rpcErr != nil {
rawResult.Err = rpcErr
if tterrors.IsBadRequestError(rawResult.Err) {
Expand Down Expand Up @@ -2308,18 +2308,8 @@ func (s *service) newPooledID(

func (s *service) readEncodedResult(
ctx context.Context,
nsID ident.ID,
encoded [][]xio.BlockReader,
) ([]*rpc.Segments, *rpc.Error) {
ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchReadSingleResult)
if sampled {
sp.LogFields(
opentracinglog.String("id", nsID.String()),
opentracinglog.Int("segmentCount", len(encoded)),
)
}
defer sp.Finish()

segments := s.pools.segmentsArray.Get()
segments = segmentsArr(segments).grow(len(encoded))
segments = segments[:0]
Expand All @@ -2328,7 +2318,7 @@ func (s *service) readEncodedResult(
}))

for _, readers := range encoded {
segment, err := s.readEncodedResultSegment(ctx, nsID, readers)
segment, err := s.readEncodedResultSegment(readers)
if err != nil {
return nil, err
}
Expand All @@ -2342,12 +2332,8 @@ func (s *service) readEncodedResult(
}

func (s *service) readEncodedResultSegment(
ctx context.Context,
nsID ident.ID,
readers []xio.BlockReader,
) (*rpc.Segments, *rpc.Error) {
ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.FetchReadSegment)
defer sp.Finish()
converted, err := convert.ToSegments(readers)
if err != nil {
return nil, convert.ToRPCError(err)
Expand All @@ -2356,20 +2342,6 @@ func (s *service) readEncodedResultSegment(
return nil, nil
}

if sampled {
sp.LogFields(
opentracinglog.String("id", nsID.String()),
opentracinglog.Int("blockCount", len(readers)),
opentracinglog.Int("unmergedCount", len(converted.Segments.Unmerged)),
)

if converted.Segments.Merged != nil {
sp.LogFields(
opentracinglog.Int64("mergedBlockSize", converted.Segments.Merged.GetBlockSize()),
opentracinglog.Int64("mergedStartTime", converted.Segments.Merged.GetStartTime()),
)
}
}
return converted.Segments, nil
}

Expand Down
13 changes: 5 additions & 8 deletions src/dbnode/network/server/tchannelthrift/node/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1651,15 +1651,12 @@ func TestServiceFetchTagged(t *testing.T) {

sp.Finish()
spans := mtr.FinishedSpans()
require.Len(t, spans, 8)

require.Len(t, spans, 4)
assert.Equal(t, tracepoint.FetchReadEncoded, spans[0].OperationName)
assert.Equal(t, tracepoint.FetchReadSegment, spans[1].OperationName)
assert.Equal(t, tracepoint.FetchReadSingleResult, spans[2].OperationName)
assert.Equal(t, tracepoint.FetchReadSegment, spans[3].OperationName)
assert.Equal(t, tracepoint.FetchReadSingleResult, spans[4].OperationName)
assert.Equal(t, tracepoint.FetchReadResults, spans[5].OperationName)
assert.Equal(t, tracepoint.FetchTagged, spans[6].OperationName)
assert.Equal(t, "root", spans[7].OperationName)
assert.Equal(t, tracepoint.FetchReadResults, spans[1].OperationName)
assert.Equal(t, tracepoint.FetchTagged, spans[2].OperationName)
assert.Equal(t, "root", spans[3].OperationName)
}

func TestServiceFetchTaggedIsOverloaded(t *testing.T) {
Expand Down
11 changes: 0 additions & 11 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,17 +927,6 @@ func (d *db) ReadEncoded(
return nil, err
}

ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.DBReadEncoded)
if sampled {
sp.LogFields(
opentracinglog.String("namespace", namespace.String()),
opentracinglog.String("id", id.String()),
xopentracing.Time("start", start),
xopentracing.Time("end", end),
)
}

defer sp.Finish()
return n.ReadEncoded(ctx, id, start, end)
}

Expand Down

0 comments on commit 644bac3

Please sign in to comment.