Skip to content

Commit

Permalink
server: batch large span stats requests
Browse files Browse the repository at this point in the history
Previously, span stats requests could only request a maximum
of 500 spans at a time, otherwise the request would return an error.

This commit adds transparent batching at both the system tenant's handler
and the secondary tenant's handler. The cluster setting
`server.span_stats.span_batch_limit` is used to control the
size of the batches. The default value has been raised to 1000.

Resolves cockroachdb#105638
Epic: CRDB-30635
Release note (ops change): Requests for database details or table details
from the UI, or usages of `SHOW RANGES WITH DETAILS` are no longer subject
to errors if the number of requested spans is too large.
  • Loading branch information
Zach Lite committed Aug 24, 2023
1 parent 1c1f08d commit e47fd1d
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pkg/roachpb/span_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

// Put span statistics cluster settings here to avoid import cycle.

const DefaultSpanStatsSpanLimit = 500
const DefaultSpanStatsSpanLimit = 1000

// SpanStatsBatchLimit registers the maximum number of spans allowed in a
// span stats request payload.
Expand Down
80 changes: 80 additions & 0 deletions pkg/server/span_stats_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"strconv"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

const MixedVersionErr = "span stats request - unable to service a mixed version request"
Expand Down Expand Up @@ -380,3 +382,81 @@ func isLegacyRequest(req *roachpb.SpanStatsRequest) bool {
// If the start/end key fields are not nil, we have a request using the old request format.
return req.StartKey != nil || req.EndKey != nil
}

// verifySpanStatsRequest returns an error if the request can not be serviced.
// Requests can not be serviced if the active cluster version is less than 23.1,
// or if the request is made using the pre 23.1 format.
func verifySpanStatsRequest(
ctx context.Context, req *roachpb.SpanStatsRequest, version clusterversion.Handle,
) error {

// If the cluster's active version is less than 23.1 return a mixed version error.
if !version.IsActive(ctx, clusterversion.V23_1) {
return errors.New(MixedVersionErr)
}

// If we receive a request using the old format.
if isLegacyRequest(req) {
// We want to force 23.1 callers to use the new format (e.g. Spans field).
if req.NodeID == "0" {
return errors.New(UnexpectedLegacyRequest)
}
// We want to error if we receive a legacy request from a 22.2
// node (e.g. during a mixed-version fanout).
return errors.New(MixedVersionErr)
}

return nil
}

// batchedSpanStats breaks the request spans down into batches that are
// batchSize large. impl is invoked for each batch. Then, responses from
// each batch are merged together and returned to the caller of this function.
func batchedSpanStats(
ctx context.Context,
req *roachpb.SpanStatsRequest,
impl func(
ctx context.Context, req *roachpb.SpanStatsRequest,
) (*roachpb.SpanStatsResponse, error),
batchSize int,
) (*roachpb.SpanStatsResponse, error) {

if len(req.Spans) == 0 {
return &roachpb.SpanStatsResponse{}, nil
}

if len(req.Spans) <= batchSize {
return impl(ctx, req)
}

// Allocate memory once for the required number of batches.
totalSpans := len(req.Spans)
batches := make([][]roachpb.Span, 0, (totalSpans+batchSize-1)/batchSize)
for batchSize < len(req.Spans) {
req.Spans, batches =
req.Spans[batchSize:], append(
batches,
req.Spans[0:batchSize:batchSize],
)
}
batches = append(batches, req.Spans)

res := &roachpb.SpanStatsResponse{}
res.SpanToStats = make(map[string]*roachpb.SpanStats, totalSpans)

for _, batch := range batches {
req.Spans = batch
batchRes, batchErr := impl(ctx, req)
if batchErr != nil {
return nil, batchErr
}

for k, v := range batchRes.SpanToStats {
res.SpanToStats[k] = v
}

res.Errors = append(res.Errors, batchRes.Errors...)
}

return res, nil
}
94 changes: 94 additions & 0 deletions pkg/server/span_stats_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"encoding/binary"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

func makeSpans(n int) roachpb.Spans {
spans := make([]roachpb.Span, 0, n)

for i := 0; i < n; i++ {
startKey := make(roachpb.Key, 8)
endKey := make(roachpb.Key, 8)
binary.LittleEndian.PutUint64(startKey, uint64(i))
binary.LittleEndian.PutUint64(endKey, uint64(i+1))
spans = append(spans, roachpb.Span{Key: startKey, EndKey: endKey})
}

return spans
}

func TestSpanStatsBatching(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

type testCase struct {
nSpans int
limit int
expectedBatches int
}

testCases := []testCase{
{nSpans: 0, limit: 100, expectedBatches: 0},
{nSpans: 10, limit: 100, expectedBatches: 1},
{nSpans: 100, limit: 100, expectedBatches: 1},
{nSpans: 101, limit: 100, expectedBatches: 2},
{nSpans: 200, limit: 100, expectedBatches: 2},
}

for _, tcase := range testCases {
numBatches := 0
mockSpanStatsEndpoint := func(
ctx context.Context,
req *roachpb.SpanStatsRequest,
) (*roachpb.SpanStatsResponse, error) {
numBatches++

res := &roachpb.SpanStatsResponse{}
res.SpanToStats = make(map[string]*roachpb.SpanStats)

// Provide a response for all spans, and an error per
// invocation. This lets us confirm that all batched
// responses make their way back to the caller.
for _, sp := range req.Spans {
res.SpanToStats[sp.String()] = &roachpb.SpanStats{}
}
res.Errors = append(res.Errors, "some error")
return res, nil
}

req := &roachpb.SpanStatsRequest{}
req.Spans = makeSpans(tcase.nSpans)
res, err := batchedSpanStats(ctx, req, mockSpanStatsEndpoint, tcase.limit)
require.NoError(t, err)

// Assert that the mocked span stats function is invoked the correct
// number of times.
require.Equal(t, tcase.expectedBatches, numBatches)

// Assert that the responses from each batch are returned to the caller.
require.Equal(t, tcase.nSpans, len(res.SpanToStats))

// Assert that the errors from each batch are returned to the caller.
require.Equal(t, tcase.expectedBatches, len(res.Errors))
}

}
31 changes: 11 additions & 20 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand Down Expand Up @@ -3625,7 +3624,13 @@ func (s *statusServer) SpanStats(
// already returns a proper gRPC error status.
return nil, err
}
return s.sqlServer.tenantConnect.SpanStats(ctx, req)

if err := verifySpanStatsRequest(ctx, req, s.st.Version); err != nil {
return nil, err
}

batchSize := int(roachpb.SpanStatsBatchLimit.Get(&s.st.SV))
return batchedSpanStats(ctx, req, s.sqlServer.tenantConnect.SpanStats, batchSize)
}

func (s *systemStatusServer) SpanStats(
Expand All @@ -3639,26 +3644,12 @@ func (s *systemStatusServer) SpanStats(
return nil, err
}

// If the cluster's active version is less than 23.1 return a mixed version error.
if !s.st.Version.IsActive(ctx, clusterversion.V23_1) {
return nil, errors.New(MixedVersionErr)
}

// If we receive a request using the old format.
if isLegacyRequest(req) {
// We want to force 23.1 callers to use the new format (e.g. Spans field).
if req.NodeID == "0" {
return nil, errors.New(UnexpectedLegacyRequest)
}
// We want to error if we receive a legacy request from a 22.2
// node (e.g. during a mixed-version fanout).
return nil, errors.New(MixedVersionErr)
}
if len(req.Spans) > int(roachpb.SpanStatsBatchLimit.Get(&s.st.SV)) {
return nil, errors.Newf(exceedSpanLimitPlaceholder, len(req.Spans), int(roachpb.SpanStatsBatchLimit.Get(&s.st.SV)))
if err := verifySpanStatsRequest(ctx, req, s.st.Version); err != nil {
return nil, err
}

return s.getSpanStatsInternal(ctx, req)
batchSize := int(roachpb.SpanStatsBatchLimit.Get(&s.st.SV))
return batchedSpanStats(ctx, req, s.getSpanStatsInternal, batchSize)
}

// Diagnostics returns an anonymized diagnostics report.
Expand Down

0 comments on commit e47fd1d

Please sign in to comment.