Merge #73697 #74921
73697: kvclient: ignore stale lease information from lagging replicas r=aayushshah15 a=aayushshah15

This commit makes the DistSender ignore the lease information returned in
`NotLeaseHolderError`s coming from replicas that have a stale view of the
range descriptor (characterized by an older `DescriptorGeneration` on the
replica).

Not doing so before was hazardous because, if we received an NLHE that pointed
to a replica that did not belong to the cached descriptor, we'd trigger a cache
eviction. This assumed that the replica returning the error had a fresher view
of the range than what we had in the cache, which is not always true. This
meant that we'd keep doing range lookups and subsequent evictions until the
lagging replica caught up to the current state of the range.

Fixes #72772

Release note (bug fix): A bug that caused high SQL tail latencies during
background rebalancing in the cluster has been fixed.
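
For intuition, here is a minimal, self-contained sketch of the guard described above. It is not the actual DistSender change (that lives in `sendToReplicas` in the diff below); the helper `useLeaseFromNLHE` and its zero-value handling are assumptions made for this illustration, reduced to the generation comparison alone:

```go
package main

import "fmt"

// RangeGeneration stands in for the monotonically increasing generation
// carried by a range descriptor.
type RangeGeneration int64

// useLeaseFromNLHE reports whether the lease information attached to a
// NotLeaseHolderError should be trusted. If the replica returning the error
// has an older view of the range than the cache (errGen < cachedGen), its
// lease hint is ignored and no cache eviction is triggered. A zero errGen
// models replicas that don't populate the field, in which case the previous
// (trusting) behavior is preserved.
func useLeaseFromNLHE(cachedGen, errGen RangeGeneration) bool {
	if errGen == 0 {
		return true
	}
	return errGen >= cachedGen
}

func main() {
	fmt.Println(useLeaseFromNLHE(5, 3)) // false: lagging replica, keep the cached routing info
	fmt.Println(useLeaseFromNLHE(5, 5)) // true: up-to-date replica, safe to update the cache
}
```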

74921: sql,kvstreamer: use Streamer with multiple column families r=yuzefovich a=yuzefovich

This commit takes advantage of the recently introduced `WholeRowsOfSize`
field of `BatchRequest`s (which ensures that SQL rows are not split across
multiple responses when multiple column families are present) to support the
multi-column-family case in the `Streamer`.

Addresses: #54680.

Release note: None
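
As an illustration of the guarantee being relied on (a standalone sketch, not the KV-layer implementation; `kvPair`, `trimToWholeRows`, and the byte accounting are assumptions made for this example), `WholeRowsOfSize` can be thought of as trimming a size-limited response back to the last complete SQL row, so that the column families of one row never straddle two responses:

```go
package main

import "fmt"

// kvPair is a stand-in for a KV belonging to a SQL row; KVs with the same
// rowID are column families of the same row.
type kvPair struct {
	rowID int
	bytes int
}

// trimToWholeRows keeps as many complete rows as fit under targetBytes.
// The first row is always kept so the response can make progress even when
// a single row exceeds the budget.
func trimToWholeRows(kvs []kvPair, targetBytes int) []kvPair {
	var used, cut int
	for i := 0; i < len(kvs); {
		// Find the end of the current row and its total size.
		j, rowBytes := i, 0
		for j < len(kvs) && kvs[j].rowID == kvs[i].rowID {
			rowBytes += kvs[j].bytes
			j++
		}
		if used+rowBytes > targetBytes && cut > 0 {
			break // stop at the previous row boundary instead of splitting this row
		}
		used += rowBytes
		cut, i = j, j
	}
	return kvs[:cut]
}

func main() {
	kvs := []kvPair{{1, 10}, {1, 10}, {2, 10}, {2, 10}}
	fmt.Println(len(trimToWholeRows(kvs, 25))) // 2: row 1 fits entirely, row 2 is deferred to the next response
}
```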


Co-authored-by: Aayush Shah <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Jan 27, 2022
3 parents bb826d2 + 92f0260 + e51323b commit ba04380
Showing 13 changed files with 304 additions and 90 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2146,10 +2146,10 @@ func (ds *DistSender) sendToReplicas(

var updatedLeaseholder bool
if tErr.Lease != nil {
updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease)
updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease, tErr.DescriptorGeneration)
} else if tErr.LeaseHolder != nil {
// tErr.LeaseHolder might be set when tErr.Lease isn't.
routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder)
routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder, tErr.DescriptorGeneration)
updatedLeaseholder = true
}
// Move the new leaseholder to the head of the queue for the next
106 changes: 106 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -971,6 +971,112 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) {
require.LessOrEqual(t, callsToNode2, 11)
}

// TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration tests that a
// NotLeaseHolderError received from a replica that has a stale range descriptor
// version is ignored, and the next replica is attempted.
func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
tracer := tracing.NewTracer()
ctx, finishAndGetRecording := tracing.ContextWithRecordingSpan(
context.Background(), tracer, "test",
)
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
g := makeGossip(t, stopper, rpcContext)
for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() {
require.NoError(t, g.AddInfoProto(
gossip.MakeNodeIDKey(n.NodeID),
newNodeDesc(n.NodeID),
gossip.NodeDescriptorTTL,
))
}

oldGeneration := roachpb.RangeGeneration(1)
newGeneration := roachpb.RangeGeneration(2)
desc := roachpb.RangeDescriptor{
RangeID: 1,
Generation: newGeneration,
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
InternalReplicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, StoreID: 1, ReplicaID: 1},
{NodeID: 2, StoreID: 2, ReplicaID: 2},
{NodeID: 3, StoreID: 3, ReplicaID: 3},
},
}
// Ambiguous lease refers to a replica that is incompatible with the cached
// range descriptor.
ambiguousLease := roachpb.Lease{
Replica: roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4, ReplicaID: 4},
}
cachedLease := roachpb.Lease{
Replica: desc.InternalReplicas[1],
}

// The cache starts with a lease on node 2, so the first request will be
// routed there. That replica will reply with an NLHE with an old descriptor
// generation value, which should make the DistSender try the next replica.
var calls []roachpb.NodeID
sendFn := func(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) {
calls = append(calls, ba.Replica.NodeID)
if ba.Replica.NodeID == 2 {
reply := &roachpb.BatchResponse{}
err := &roachpb.NotLeaseHolderError{Lease: &ambiguousLease, DescriptorGeneration: oldGeneration}
reply.Error = roachpb.NewError(err)
return reply, nil
}
require.Equal(t, ba.Replica.NodeID, roachpb.NodeID(1))
return ba.CreateReply(), nil
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracer},
Clock: clock,
NodeDescs: g,
RPCContext: rpcContext,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(sendFn),
},
RangeDescriptorDB: threeReplicaMockRangeDescriptorDB,
NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)),
Settings: cluster.MakeTestingClusterSettings(),
}
ds := NewDistSender(cfg)

ds.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: cachedLease,
})

get := roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */)
_, pErr := kv.SendWrapped(ctx, ds, get)
require.Nil(t, pErr)

require.Equal(t, int64(0), ds.Metrics().RangeLookups.Count())
// We expect to back off and retry the same replica 11 times when we get an
// NLHE with stale info. See `sameReplicaRetryLimit`.
require.Equal(t, int64(11), ds.Metrics().NextReplicaErrCount.Count())
require.Equal(t, int64(11), ds.Metrics().NotLeaseHolderErrCount.Count())

// Ensure that we called Node 2 11 times and then finally called Node 1.
var expectedCalls []roachpb.NodeID
for i := 0; i < 11; i++ {
expectedCalls = append(expectedCalls, roachpb.NodeID(2))
}
expectedCalls = append(expectedCalls, roachpb.NodeID(1))
require.Equal(t, expectedCalls, calls)

require.Regexp(
t,
"backing off due to .* stale info",
finishAndGetRecording().String(),
)
}

func TestDistSenderRetryOnTransportErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
@@ -52,6 +52,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_stretchr_testify//require",
],
21 changes: 11 additions & 10 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -142,12 +142,7 @@ func (r Result) Release(ctx context.Context) {
}
}

// Streamer provides a streaming oriented API for reading from the KV layer. At
// the moment the Streamer only works when SQL rows are comprised of a single KV
// (i.e. a single column family).
// TODO(yuzefovich): lift the restriction on a single column family once KV is
// updated so that rows are never split across different BatchResponses when
// TargetBytes limitBytes is exceeded.
// Streamer provides a streaming oriented API for reading from the KV layer.
//
// The example usage is roughly as follows:
//
@@ -207,9 +202,10 @@ type Streamer struct {
distSender *kvcoord.DistSender
stopper *stop.Stopper

mode OperationMode
hints Hints
budget *budget
mode OperationMode
hints Hints
maxKeysPerRow int32
budget *budget

coordinator workerCoordinator
coordinatorStarted bool
@@ -353,7 +349,10 @@ func NewStreamer(
// Hints can be used to hint the aggressiveness of the caching policy. In
// particular, it can be used to disable caching when the client knows that all
// looked-up keys are unique (e.g. in the case of an index-join).
func (s *Streamer) Init(mode OperationMode, hints Hints) {
//
// maxKeysPerRow indicates the maximum number of KV pairs that comprise a single
// SQL row (i.e. the number of column families in the index being scanned).
func (s *Streamer) Init(mode OperationMode, hints Hints, maxKeysPerRow int) {
if mode != OutOfOrder {
panic(errors.AssertionFailedf("only OutOfOrder mode is supported"))
}
@@ -362,6 +361,7 @@ func (s *Streamer) Init(mode OperationMode, hints Hints) {
panic(errors.AssertionFailedf("only unique requests are currently supported"))
}
s.hints = hints
s.maxKeysPerRow = int32(maxKeysPerRow)
s.waitForResults = make(chan struct{}, 1)
}

@@ -1008,6 +1008,7 @@ func (w *workerCoordinator) performRequestAsync(
ba.Header.WaitPolicy = w.lockWaitPolicy
ba.Header.TargetBytes = targetBytes
ba.Header.AllowEmpty = !headOfLine
ba.Header.WholeRowsOfSize = w.s.maxKeysPerRow
// TODO(yuzefovich): consider setting MaxSpanRequestKeys whenever
// applicable (#67885).
ba.AdmissionHeader = w.requestAdmissionHeader
96 changes: 91 additions & 5 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -14,6 +14,7 @@ import (
"context"
"fmt"
"math"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -29,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
"github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
)
@@ -64,21 +66,21 @@ func TestStreamerLimitations(t *testing.T) {
t.Run("InOrder mode unsupported", func(t *testing.T) {
require.Panics(t, func() {
streamer := getStreamer()
streamer.Init(InOrder, Hints{UniqueRequests: true})
streamer.Init(InOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
})
})

t.Run("non-unique requests unsupported", func(t *testing.T) {
require.Panics(t, func() {
streamer := getStreamer()
streamer.Init(OutOfOrder, Hints{UniqueRequests: false})
streamer.Init(OutOfOrder, Hints{UniqueRequests: false}, 1 /* maxKeysPerRow */)
})
})

t.Run("invalid enqueueKeys", func(t *testing.T) {
streamer := getStreamer()
defer streamer.Close()
streamer.Init(OutOfOrder, Hints{UniqueRequests: true})
streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
// Use a single request but two keys which is invalid.
reqs := []roachpb.RequestUnion{{Value: &roachpb.RequestUnion_Get{}}}
enqueueKeys := []int{0, 1}
@@ -88,7 +90,7 @@
t.Run("pipelining unsupported", func(t *testing.T) {
streamer := getStreamer()
defer streamer.Close()
streamer.Init(OutOfOrder, Hints{UniqueRequests: true})
streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
get := roachpb.NewGet(roachpb.Key("key"), false /* forUpdate */)
reqs := []roachpb.RequestUnion{{
Value: &roachpb.RequestUnion_Get{
@@ -232,7 +234,7 @@ func TestStreamerBudgetErrorInEnqueue(t *testing.T) {
getStreamer := func(limitBytes int64) *Streamer {
acc.Clear(ctx)
s := getStreamer(ctx, s, limitBytes, &acc)
s.Init(OutOfOrder, Hints{UniqueRequests: true})
s.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
return s
}

@@ -336,3 +338,87 @@ func TestStreamerCorrectlyDiscardsResponses(t *testing.T) {
})
}
}

// TestStreamerWideRows verifies that the Streamer works correctly with
// large rows and multiple column families. The goal is to make sure that KVs
// from different rows are not intertwined.
func TestStreamerWideRows(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Start a cluster with a large --max-sql-memory parameter so that the
// Streamer doesn't hit the root budget exceeded error.
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
SQLMemoryPoolSize: 1 << 30, /* 1GiB */
})
ctx := context.Background()
defer s.Stopper().Stop(ctx)

const blobSize = 10 * initialAvgResponseSize
const numRows = 2

_, err := db.Exec("CREATE TABLE t (pk INT PRIMARY KEY, k INT, blob1 STRING, blob2 STRING, INDEX (k), FAMILY (pk, k, blob1), FAMILY (blob2))")
require.NoError(t, err)
for i := 0; i < numRows; i++ {
if i > 0 {
// Split each row into a separate range.
_, err = db.Exec(fmt.Sprintf("ALTER TABLE t SPLIT AT VALUES(%d)", i))
require.NoError(t, err)
}
_, err = db.Exec(fmt.Sprintf("INSERT INTO t SELECT %d, 1, repeat('a', %d), repeat('b', %d)", i, blobSize, blobSize))
require.NoError(t, err)
}

// Populate the range cache.
_, err = db.Exec("SELECT count(*) from t")
require.NoError(t, err)

// Perform an index join to read large blobs.
query := "SELECT count(*), sum(length(blob1)), sum(length(blob2)) FROM t@t_k_idx WHERE k = 1"
const concurrency = 3
// Different values for the distsql_workmem setting allow us to exercise the
// behavior in some degenerate cases (e.g. a small value results in a single
// KV exceeding the limit).
for _, workmem := range []int{
3 * blobSize / 2,
3 * blobSize,
4 * blobSize,
} {
t.Run(fmt.Sprintf("workmem=%s", humanize.Bytes(uint64(workmem))), func(t *testing.T) {
_, err = db.Exec(fmt.Sprintf("SET distsql_workmem = '%dB'", workmem))
require.NoError(t, err)
var wg sync.WaitGroup
wg.Add(concurrency)
errCh := make(chan error, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
row := db.QueryRow(query)
var count, sum1, sum2 int
if err := row.Scan(&count, &sum1, &sum2); err != nil {
errCh <- err
return
}
if count != numRows {
errCh <- errors.Newf("expected %d rows, read %d", numRows, count)
return
}
if sum1 != numRows*blobSize {
errCh <- errors.Newf("expected total length %d of blob1, read %d", numRows*blobSize, sum1)
return
}
if sum2 != numRows*blobSize {
errCh <- errors.Newf("expected total length %d of blob2, read %d", numRows*blobSize, sum2)
return
}
}()
}
wg.Wait()
close(errCh)
err, ok := <-errCh
if ok {
t.Fatal(err)
}
})
}
}