diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index d3a28b379533..fcda80c5bd34 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -2146,10 +2146,10 @@ func (ds *DistSender) sendToReplicas(
 				var updatedLeaseholder bool
 				if tErr.Lease != nil {
-					updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease)
+					updatedLeaseholder = routing.UpdateLease(ctx, tErr.Lease, tErr.DescriptorGeneration)
 				} else if tErr.LeaseHolder != nil {
 					// tErr.LeaseHolder might be set when tErr.Lease isn't.
-					routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder)
+					routing.UpdateLeaseholder(ctx, *tErr.LeaseHolder, tErr.DescriptorGeneration)
 					updatedLeaseholder = true
 				}
 				// Move the new leaseholder to the head of the queue for the next
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
index 6d4f4e0170ad..58b7fa1edcb0 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -971,6 +971,112 @@ func TestDistSenderMovesOnFromReplicaWithStaleLease(t *testing.T) {
 	require.LessOrEqual(t, callsToNode2, 11)
 }
 
+// TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration tests that a
+// NotLeaseHolderError received from a replica that has a stale range descriptor
+// generation is ignored, and the next replica is attempted.
+func TestDistSenderIgnoresNLHEBasedOnOldRangeGeneration(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	tracer := tracing.NewTracer()
+	ctx, finishAndGetRecording := tracing.ContextWithRecordingSpan(
+		context.Background(), tracer, "test",
+	)
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
+	rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
+	g := makeGossip(t, stopper, rpcContext)
+	for _, n := range testUserRangeDescriptor3Replicas.Replicas().VoterDescriptors() {
+		require.NoError(t, g.AddInfoProto(
+			gossip.MakeNodeIDKey(n.NodeID),
+			newNodeDesc(n.NodeID),
+			gossip.NodeDescriptorTTL,
+		))
+	}
+
+	oldGeneration := roachpb.RangeGeneration(1)
+	newGeneration := roachpb.RangeGeneration(2)
+	desc := roachpb.RangeDescriptor{
+		RangeID:    1,
+		Generation: newGeneration,
+		StartKey:   roachpb.RKeyMin,
+		EndKey:     roachpb.RKeyMax,
+		InternalReplicas: []roachpb.ReplicaDescriptor{
+			{NodeID: 1, StoreID: 1, ReplicaID: 1},
+			{NodeID: 2, StoreID: 2, ReplicaID: 2},
+			{NodeID: 3, StoreID: 3, ReplicaID: 3},
+		},
+	}
+	// Ambiguous lease refers to a replica that is incompatible with the cached
+	// range descriptor.
+	ambiguousLease := roachpb.Lease{
+		Replica: roachpb.ReplicaDescriptor{NodeID: 4, StoreID: 4, ReplicaID: 4},
+	}
+	cachedLease := roachpb.Lease{
+		Replica: desc.InternalReplicas[1],
+	}
+
+	// The cache starts with a lease on node 2, so the first request will be
+	// routed there. That replica will reply with an NLHE with an old descriptor
+	// generation value, which should make the DistSender try the next replica.
+ var calls []roachpb.NodeID + sendFn := func(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) { + calls = append(calls, ba.Replica.NodeID) + if ba.Replica.NodeID == 2 { + reply := &roachpb.BatchResponse{} + err := &roachpb.NotLeaseHolderError{Lease: &ambiguousLease, DescriptorGeneration: oldGeneration} + reply.Error = roachpb.NewError(err) + return reply, nil + } + require.Equal(t, ba.Replica.NodeID, roachpb.NodeID(1)) + return ba.CreateReply(), nil + } + + cfg := DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracer}, + Clock: clock, + NodeDescs: g, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(sendFn), + }, + RangeDescriptorDB: threeReplicaMockRangeDescriptorDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), + } + ds := NewDistSender(cfg) + + ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + get := roachpb.NewGet(roachpb.Key("a"), false /* forUpdate */) + _, pErr := kv.SendWrapped(ctx, ds, get) + require.Nil(t, pErr) + + require.Equal(t, int64(0), ds.Metrics().RangeLookups.Count()) + // We expect to backoff and retry the same replica 11 times when we get an + // NLHE with stale info. See `sameReplicaRetryLimit`. + require.Equal(t, int64(11), ds.Metrics().NextReplicaErrCount.Count()) + require.Equal(t, int64(11), ds.Metrics().NotLeaseHolderErrCount.Count()) + + // Ensure that we called Node 2 11 times and then finally called Node 1. + var expectedCalls []roachpb.NodeID + for i := 0; i < 11; i++ { + expectedCalls = append(expectedCalls, roachpb.NodeID(2)) + } + expectedCalls = append(expectedCalls, roachpb.NodeID(1)) + require.Equal(t, expectedCalls, calls) + + require.Regexp( + t, + "backing off due to .* stale info", + finishAndGetRecording().String(), + ) +} + func TestDistSenderRetryOnTransportErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvclient/kvstreamer/BUILD.bazel b/pkg/kv/kvclient/kvstreamer/BUILD.bazel index 508f70955d90..ae5ad00d565a 100644 --- a/pkg/kv/kvclient/kvstreamer/BUILD.bazel +++ b/pkg/kv/kvclient/kvstreamer/BUILD.bazel @@ -52,6 +52,7 @@ go_test( "//pkg/util/log", "//pkg/util/mon", "//pkg/util/randutil", + "@com_github_cockroachdb_errors//:errors", "@com_github_dustin_go_humanize//:go-humanize", "@com_github_stretchr_testify//require", ], diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 2d411e1a72b3..63a13e6dd8df 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -142,12 +142,7 @@ func (r Result) Release(ctx context.Context) { } } -// Streamer provides a streaming oriented API for reading from the KV layer. At -// the moment the Streamer only works when SQL rows are comprised of a single KV -// (i.e. a single column family). -// TODO(yuzefovich): lift the restriction on a single column family once KV is -// updated so that rows are never split across different BatchResponses when -// TargetBytes limitBytes is exceeded. +// Streamer provides a streaming oriented API for reading from the KV layer. 
// // The example usage is roughly as follows: // @@ -207,9 +202,10 @@ type Streamer struct { distSender *kvcoord.DistSender stopper *stop.Stopper - mode OperationMode - hints Hints - budget *budget + mode OperationMode + hints Hints + maxKeysPerRow int32 + budget *budget coordinator workerCoordinator coordinatorStarted bool @@ -353,7 +349,10 @@ func NewStreamer( // Hints can be used to hint the aggressiveness of the caching policy. In // particular, it can be used to disable caching when the client knows that all // looked-up keys are unique (e.g. in the case of an index-join). -func (s *Streamer) Init(mode OperationMode, hints Hints) { +// +// maxKeysPerRow indicates the maximum number of KV pairs that comprise a single +// SQL row (i.e. the number of column families in the index being scanned). +func (s *Streamer) Init(mode OperationMode, hints Hints, maxKeysPerRow int) { if mode != OutOfOrder { panic(errors.AssertionFailedf("only OutOfOrder mode is supported")) } @@ -362,6 +361,7 @@ func (s *Streamer) Init(mode OperationMode, hints Hints) { panic(errors.AssertionFailedf("only unique requests are currently supported")) } s.hints = hints + s.maxKeysPerRow = int32(maxKeysPerRow) s.waitForResults = make(chan struct{}, 1) } @@ -1008,6 +1008,7 @@ func (w *workerCoordinator) performRequestAsync( ba.Header.WaitPolicy = w.lockWaitPolicy ba.Header.TargetBytes = targetBytes ba.Header.AllowEmpty = !headOfLine + ba.Header.WholeRowsOfSize = w.s.maxKeysPerRow // TODO(yuzefovich): consider setting MaxSpanRequestKeys whenever // applicable (#67885). ba.AdmissionHeader = w.requestAdmissionHeader diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index 7b5ac7d45caf..bc65005148d0 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "math" + "sync" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -29,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/errors" "github.com/dustin/go-humanize" "github.com/stretchr/testify/require" ) @@ -64,21 +66,21 @@ func TestStreamerLimitations(t *testing.T) { t.Run("InOrder mode unsupported", func(t *testing.T) { require.Panics(t, func() { streamer := getStreamer() - streamer.Init(InOrder, Hints{UniqueRequests: true}) + streamer.Init(InOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */) }) }) t.Run("non-unique requests unsupported", func(t *testing.T) { require.Panics(t, func() { streamer := getStreamer() - streamer.Init(OutOfOrder, Hints{UniqueRequests: false}) + streamer.Init(OutOfOrder, Hints{UniqueRequests: false}, 1 /* maxKeysPerRow */) }) }) t.Run("invalid enqueueKeys", func(t *testing.T) { streamer := getStreamer() defer streamer.Close() - streamer.Init(OutOfOrder, Hints{UniqueRequests: true}) + streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */) // Use a single request but two keys which is invalid. 
 		reqs := []roachpb.RequestUnion{{Value: &roachpb.RequestUnion_Get{}}}
 		enqueueKeys := []int{0, 1}
@@ -88,7 +90,7 @@ func TestStreamerLimitations(t *testing.T) {
 	t.Run("pipelining unsupported", func(t *testing.T) {
 		streamer := getStreamer()
 		defer streamer.Close()
-		streamer.Init(OutOfOrder, Hints{UniqueRequests: true})
+		streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
 		get := roachpb.NewGet(roachpb.Key("key"), false /* forUpdate */)
 		reqs := []roachpb.RequestUnion{{
 			Value: &roachpb.RequestUnion_Get{
@@ -232,7 +234,7 @@ func TestStreamerBudgetErrorInEnqueue(t *testing.T) {
 	getStreamer := func(limitBytes int64) *Streamer {
 		acc.Clear(ctx)
 		s := getStreamer(ctx, s, limitBytes, &acc)
-		s.Init(OutOfOrder, Hints{UniqueRequests: true})
+		s.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
 		return s
 	}
 
@@ -336,3 +338,87 @@ func TestStreamerCorrectlyDiscardsResponses(t *testing.T) {
 		})
 	}
 }
+
+// TestStreamerWideRows verifies that the Streamer works correctly with
+// large rows and multiple column families. The goal is to make sure that KVs
+// from different rows are not intertwined.
+func TestStreamerWideRows(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// Start a cluster with a large --max-sql-memory parameter so that the
+	// Streamer doesn't hit the root budget exceeded error.
+	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
+		SQLMemoryPoolSize: 1 << 30, /* 1GiB */
+	})
+	ctx := context.Background()
+	defer s.Stopper().Stop(ctx)
+
+	const blobSize = 10 * initialAvgResponseSize
+	const numRows = 2
+
+	_, err := db.Exec("CREATE TABLE t (pk INT PRIMARY KEY, k INT, blob1 STRING, blob2 STRING, INDEX (k), FAMILY (pk, k, blob1), FAMILY (blob2))")
+	require.NoError(t, err)
+	for i := 0; i < numRows; i++ {
+		if i > 0 {
+			// Split each row into a separate range.
+			_, err = db.Exec(fmt.Sprintf("ALTER TABLE t SPLIT AT VALUES(%d)", i))
+			require.NoError(t, err)
+		}
+		_, err = db.Exec(fmt.Sprintf("INSERT INTO t SELECT %d, 1, repeat('a', %d), repeat('b', %d)", i, blobSize, blobSize))
+		require.NoError(t, err)
+	}
+
+	// Populate the range cache.
+	_, err = db.Exec("SELECT count(*) from t")
+	require.NoError(t, err)
+
+	// Perform an index join to read large blobs.
+	query := "SELECT count(*), sum(length(blob1)), sum(length(blob2)) FROM t@t_k_idx WHERE k = 1"
+	const concurrency = 3
+	// Different values for the distsql_workmem setting allow us to exercise the
+	// behavior in some degenerate cases (e.g. a small value results in a single
+	// KV exceeding the limit).
+ for _, workmem := range []int{ + 3 * blobSize / 2, + 3 * blobSize, + 4 * blobSize, + } { + t.Run(fmt.Sprintf("workmem=%s", humanize.Bytes(uint64(workmem))), func(t *testing.T) { + _, err = db.Exec(fmt.Sprintf("SET distsql_workmem = '%dB'", workmem)) + require.NoError(t, err) + var wg sync.WaitGroup + wg.Add(concurrency) + errCh := make(chan error, concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + row := db.QueryRow(query) + var count, sum1, sum2 int + if err := row.Scan(&count, &sum1, &sum2); err != nil { + errCh <- err + return + } + if count != numRows { + errCh <- errors.Newf("expected %d rows, read %d", numRows, count) + return + } + if sum1 != numRows*blobSize { + errCh <- errors.Newf("expected total length %d of blob1, read %d", numRows*blobSize, sum1) + return + } + if sum2 != numRows*blobSize { + errCh <- errors.Newf("expected total length %d of blob2, read %d", numRows*blobSize, sum2) + return + } + }() + } + wg.Wait() + close(errCh) + err, ok := <-errCh + if ok { + t.Fatal(err) + } + }) + } +} diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index 29e068bcda1f..086c585f51f2 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -379,7 +379,9 @@ func (et *EvictionToken) syncRLocked( // It's legal to pass in a lease with a zero Sequence; it will be treated as a // speculative lease and considered newer than any existing lease (and then in // turn will be overridden by any subsequent update). -func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool { +func (et *EvictionToken) UpdateLease( + ctx context.Context, l *roachpb.Lease, descGeneration roachpb.RangeGeneration, +) bool { rdc := et.rdc rdc.rangeCache.Lock() defer rdc.rangeCache.Unlock() @@ -388,7 +390,7 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool if !stillValid { return false } - ok, newEntry := cachedEntry.updateLease(l) + ok, newEntry := cachedEntry.updateLease(l, descGeneration) if !ok { return false } @@ -407,11 +409,13 @@ func (et *EvictionToken) UpdateLease(ctx context.Context, l *roachpb.Lease) bool // a full lease. This is called when a likely leaseholder is known, but not a // full lease. The lease we'll insert into the cache will be considered // "speculative". -func (et *EvictionToken) UpdateLeaseholder(ctx context.Context, lh roachpb.ReplicaDescriptor) { +func (et *EvictionToken) UpdateLeaseholder( + ctx context.Context, lh roachpb.ReplicaDescriptor, descGeneration roachpb.RangeGeneration, +) { // Notice that we don't initialize Lease.Sequence, which will make // entry.LeaseSpeculative() return true. l := &roachpb.Lease{Replica: lh} - et.UpdateLease(ctx, l) + et.UpdateLease(ctx, l, descGeneration) } // EvictLease evicts information about the current lease from the cache, if the @@ -1303,11 +1307,15 @@ func compareEntryLeases(a, b *CacheEntry) int { // This means that the passed-in lease is older than the lease already in the // entry. // -// If the new leaseholder is not a replica in the descriptor, we assume the -// lease information to be more recent than the entry's descriptor, and we -// return true, nil. The caller should evict the receiver from the cache, but -// it'll have to do extra work to figure out what to insert instead. 
-func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *CacheEntry) { +// If the new leaseholder is not a replica in the descriptor, and the error is +// coming from a replica with range descriptor generation at least as high as +// the cache's, we deduce the lease information to be more recent than the +// entry's descriptor, and we return true, nil. The caller should evict the +// receiver from the cache, but it'll have to do extra work to figure out what +// to insert instead. +func (e *CacheEntry) updateLease( + l *roachpb.Lease, descGeneration roachpb.RangeGeneration, +) (updated bool, newEntry *CacheEntry) { // If l is older than what the entry has (or the same), return early. // // This method handles speculative leases: a new lease with a sequence of 0 is @@ -1326,11 +1334,21 @@ func (e *CacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry *Cach return false, e } - // Check whether the lease we were given is compatible with the replicas in - // the descriptor. If it's not, the descriptor must be really stale, and the - // RangeCacheEntry needs to be evicted. + // If the lease is incompatible with the cached descriptor and the error is + // coming from a replica that has a non-stale descriptor, the cached + // descriptor must be stale and the RangeCacheEntry needs to be evicted. _, ok := e.desc.GetReplicaDescriptorByID(l.Replica.ReplicaID) if !ok { + // If the error is coming from a replica that has a stale range descriptor, + // we cannot trigger a cache eviction since this means we've rebalanced the + // old leaseholder away. If we were to evict here, we'd keep evicting until + // this replica applied the new lease. Not updating the cache here means + // that we'll end up trying all voting replicas until we either hit the new + // leaseholder or hit a replica that has accurate knowledge of the + // leaseholder. 
+ if descGeneration != 0 && descGeneration < e.desc.Generation { + return false, nil + } return true, nil } diff --git a/pkg/kv/kvclient/rangecache/range_cache_test.go b/pkg/kv/kvclient/rangecache/range_cache_test.go index a88c7c27404a..0fe5fc1bd9cb 100644 --- a/pkg/kv/kvclient/rangecache/range_cache_test.go +++ b/pkg/kv/kvclient/rangecache/range_cache_test.go @@ -1570,13 +1570,16 @@ func TestRangeCacheUpdateLease(t *testing.T) { StoreID: 4, ReplicaID: 4, } + + staleRangeGeneration := roachpb.RangeGeneration(2) + nonStaleRangeGeneration := roachpb.RangeGeneration(3) desc1 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, EndKey: roachpb.RKeyMax, InternalReplicas: []roachpb.ReplicaDescriptor{ rep1, rep2, }, - Generation: 0, + Generation: nonStaleRangeGeneration, } desc2 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, @@ -1584,7 +1587,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { InternalReplicas: []roachpb.ReplicaDescriptor{ rep2, rep3, }, - Generation: 1, + Generation: nonStaleRangeGeneration + 1, } desc3 := roachpb.RangeDescriptor{ StartKey: roachpb.RKeyMin, @@ -1592,7 +1595,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { InternalReplicas: []roachpb.ReplicaDescriptor{ rep1, rep2, }, - Generation: 2, + Generation: nonStaleRangeGeneration + 2, } startKey := desc1.StartKey @@ -1621,7 +1624,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { Sequence: 1, } oldTok := tok - ok := tok.UpdateLease(ctx, l) + ok := tok.UpdateLease(ctx, l, 0 /* descGeneration */) require.True(t, ok) require.Equal(t, oldTok.Desc(), tok.Desc()) require.Equal(t, &l.Replica, tok.Leaseholder()) @@ -1643,13 +1646,21 @@ func TestRangeCacheUpdateLease(t *testing.T) { require.True(t, ri.lease.Empty()) require.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, ri.ClosedTimestampPolicy()) - // Check that trying to update the lease to a non-member replica results - // in the entry's eviction and the token's invalidation. + // Check that trying to update the lease to a non-member replica results in + // the entry's eviction and the token's invalidation if the descriptor + // generation in the error is not older than the cached descriptor generation. l = &roachpb.Lease{ Replica: repNonMember, Sequence: 2, } - ok = tok.UpdateLease(ctx, l) + // Check that there's no eviction if the range desc generation in the error is + // stale. + ok = tok.UpdateLease(ctx, l, staleRangeGeneration) + require.False(t, ok) + require.True(t, tok.Valid()) + + // However, expect an eviction when the error's desc generation is non-stale. + ok = tok.UpdateLease(ctx, l, nonStaleRangeGeneration) require.False(t, ok) require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) @@ -1672,10 +1683,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { Desc: desc2, Lease: roachpb.Lease{}, }) - ok = tok.UpdateLease(ctx, - // Specify a lease compatible with desc2. - &roachpb.Lease{Replica: rep2, Sequence: 3}, - ) + ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep2, Sequence: 3}, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, tok) require.Equal(t, &desc2, tok.Desc()) @@ -1690,7 +1698,7 @@ func TestRangeCacheUpdateLease(t *testing.T) { }) // This time try to specify a lease that's not compatible with the desc. The // entry should end up evicted from the cache. 
- ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}) + ok = tok.UpdateLease(ctx, &roachpb.Lease{Replica: rep3, Sequence: 4}, 0 /* descGeneration */) require.False(t, ok) require.False(t, tok.Valid()) ri = cache.GetCached(ctx, startKey, false /* inverted */) @@ -1734,7 +1742,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 1, } - ok, e := e.updateLease(l) + ok, e := e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1743,7 +1751,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1755,7 +1763,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1765,7 +1773,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Replica.Equal(*e.Leaseholder())) @@ -1775,7 +1783,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep1, Sequence: 2, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.NotNil(t, e.Leaseholder()) require.True(t, l.Equal(*e.Lease())) @@ -1785,7 +1793,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 1, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.False(t, ok) require.False(t, l.Equal(*e.Lease())) @@ -1794,7 +1802,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: rep2, Sequence: 2, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1804,7 +1812,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Sequence: 2, } require.True(t, l.Equal(e.Lease())) - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.False(t, ok) require.True(t, l.Equal(e.Lease())) @@ -1814,7 +1822,7 @@ func TestRangeCacheEntryUpdateLease(t *testing.T) { Replica: repNonMember, Sequence: 0, } - ok, e = e.updateLease(l) + ok, e = e.updateLease(l, 0 /* descGeneration */) require.True(t, ok) require.Nil(t, e) } diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go index 7eadd4a8084c..ca9172ea364e 100644 --- a/pkg/kv/kvserver/replica_range_lease.go +++ b/pkg/kv/kvserver/replica_range_lease.go @@ -913,8 +913,9 @@ func newNotLeaseHolderError( l roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor, msg string, ) *roachpb.NotLeaseHolderError { err := &roachpb.NotLeaseHolderError{ - RangeID: rangeDesc.RangeID, - CustomMsg: msg, + RangeID: rangeDesc.RangeID, + DescriptorGeneration: rangeDesc.Generation, + CustomMsg: msg, } if proposerStoreID != 0 { err.Replica, _ = rangeDesc.GetReplicaDescriptor(proposerStoreID) diff --git a/pkg/roachpb/errors.proto b/pkg/roachpb/errors.proto index 2cbe3d23db6b..5e46b6c79962 100644 --- a/pkg/roachpb/errors.proto +++ b/pkg/roachpb/errors.proto @@ -56,6 +56,12 @@ message NotLeaseHolderError { optional roachpb.Lease lease = 4; 
optional int64 range_id = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "RangeID", (gogoproto.casttype) = "RangeID"]; + // The range descriptor generation of the replica the error originated from. + // Used by the DistSender's RangeCache to determine whether the error was + // returned because the replica had a stale understanding of who the + // leaseholder is. + optional int64 descriptor_generation = 6 [(gogoproto.nullable) = false, + (gogoproto.customname) = "DescriptorGeneration", (gogoproto.casttype) = "RangeGeneration"]; // If set, the Error() method will return this instead of composing its // regular spiel. Useful because we reuse this error when rejecting a command // because the lease under which its application was attempted is different diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index a6b02c60cf93..f6d92810c3ab 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -242,7 +242,8 @@ type cFetcher struct { // maxKeysPerRow memoizes the maximum number of keys per row in the index // we're fetching from. This is used to calculate the kvBatchFetcher's - // firstBatchLimit. + // firstBatchLimit as well as by the ColIndexJoin when it is using the + // Streamer API. maxKeysPerRow int // True if the index key must be decoded. This is only false if there are no diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index 830cc3d3cf74..fab7a415d915 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -144,6 +144,7 @@ func (s *ColIndexJoin) Init(ctx context.Context) { s.streamerInfo.Streamer.Init( kvstreamer.OutOfOrder, kvstreamer.Hints{UniqueRequests: true}, + s.cf.maxKeysPerRow, ) } } @@ -455,23 +456,13 @@ func NewColIndexJoin( useStreamer := row.CanUseStreamer(ctx, flowCtx.EvalCtx.Settings) && !spec.MaintainOrdering if useStreamer { - // TODO(yuzefovich): remove this conditional once multiple column - // families are supported. - if maxKeysPerRow, err := tableArgs.desc.KeysPerRow(tableArgs.index.GetID()); err != nil { - return nil, err - } else if maxKeysPerRow > 1 { - // Currently, the streamer only supports cases with a single column - // family. - useStreamer = false - } else { - if streamerBudgetAcc == nil { - return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired") - } - // Keep the quarter of the memory limit for the output batch of the - // cFetcher, and we'll give the remaining three quarters to the - // streamer budget below. - memoryLimit = int64(math.Ceil(float64(memoryLimit) / 4.0)) + if streamerBudgetAcc == nil { + return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired") } + // Keep the quarter of the memory limit for the output batch of the + // cFetcher, and we'll give the remaining three quarters to the streamer + // budget below. + memoryLimit = int64(math.Ceil(float64(memoryLimit) / 4.0)) } fetcher := cFetcherPool.Get().(*cFetcher) diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index 550db6a32803..2390d50a66e1 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -26,7 +26,7 @@ import ( // CanUseStreamer returns whether the kvstreamer.Streamer API should be used. func CanUseStreamer(ctx context.Context, settings *cluster.Settings) bool { // TODO(yuzefovich): remove the version gate in 22.2 cycle. 
- return settings.Version.IsActive(ctx, clusterversion.TargetBytesAvoidExcess) && + return settings.Version.IsActive(ctx, clusterversion.ScanWholeRows) && useStreamerEnabled.Get(&settings.SV) } diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 80c79b410aa1..90a07e29431a 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -130,6 +130,7 @@ type joinReader struct { unlimitedMemMonitor *mon.BytesMonitor budgetAcc mon.BoundAccount budgetLimit int64 + maxKeysPerRow int } input execinfra.RowSource @@ -462,29 +463,22 @@ func newJoinReader( jr.batchSizeBytes = jr.strategy.getLookupRowsBatchSizeHint(flowCtx.EvalCtx.SessionData()) if jr.usesStreamer { - maxKeysPerRow, err := jr.desc.KeysPerRow(jr.index.GetID()) + // jr.batchSizeBytes will be used up by the input batch, and we'll give + // everything else to the streamer budget. Note that budgetLimit will + // always be positive given that memoryLimit is at least 8MiB and + // batchSizeBytes is at most 4MiB. + jr.streamerInfo.budgetLimit = memoryLimit - jr.batchSizeBytes + // We need to use an unlimited monitor for the streamer's budget since + // the streamer itself is responsible for staying under the limit. + jr.streamerInfo.unlimitedMemMonitor = mon.NewMonitorInheritWithLimit( + "joinreader-streamer-unlimited" /* name */, math.MaxInt64, flowCtx.EvalCtx.Mon, + ) + jr.streamerInfo.unlimitedMemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{}) + jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() + jr.streamerInfo.maxKeysPerRow, err = jr.desc.KeysPerRow(jr.index.GetID()) if err != nil { return nil, err } - if maxKeysPerRow > 1 { - // Currently, the streamer only supports cases with a single column - // family. - jr.usesStreamer = false - } else { - // jr.batchSizeBytes will be used up by the input batch, and we'll - // give everything else to the streamer budget. Note that - // budgetLimit will always be positive given that memoryLimit is at - // least 8MiB and batchSizeBytes is at most 4MiB. - jr.streamerInfo.budgetLimit = memoryLimit - jr.batchSizeBytes - // We need to use an unlimited monitor for the streamer's budget - // since the streamer itself is responsible for staying under the - // limit. - jr.streamerInfo.unlimitedMemMonitor = mon.NewMonitorInheritWithLimit( - "joinreader-streamer-unlimited" /* name */, math.MaxInt64, flowCtx.EvalCtx.Mon, - ) - jr.streamerInfo.unlimitedMemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{}) - jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() - } } // TODO(radu): verify the input types match the index key types @@ -1039,6 +1033,7 @@ func (jr *joinReader) Start(ctx context.Context) { jr.streamerInfo.Streamer.Init( kvstreamer.OutOfOrder, kvstreamer.Hints{UniqueRequests: true}, + jr.streamerInfo.maxKeysPerRow, ) } jr.runningState = jrReadingInput
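As an illustration of the new range cache behavior introduced above, the following is a minimal standalone Go sketch (not part of the patch) of the decision rule that (*CacheEntry).updateLease in pkg/kv/kvclient/rangecache/range_cache.go now applies when a NotLeaseHolderError names a leaseholder that is not in the cached descriptor. The local RangeGeneration type and the helper name shouldEvictOnIncompatibleLease are hypothetical, invented purely for illustration.

// Standalone model of the eviction rule: an incompatible leaseholder only
// evicts the cached descriptor when the error comes from a replica whose
// descriptor generation is at least as new as the cached one.
package main

import "fmt"

// RangeGeneration models roachpb.RangeGeneration; 0 means the sender did not
// populate the field (e.g. an older node), in which case the pre-existing
// behavior (evict) is preserved.
type RangeGeneration int64

// shouldEvictOnIncompatibleLease reports whether the cached range descriptor
// should be evicted given the generation carried by the error and the
// generation of the cached descriptor.
func shouldEvictOnIncompatibleLease(errDescGen, cachedDescGen RangeGeneration) bool {
	// A sender with an older descriptor may simply not know about a rebalance
	// that removed the old leaseholder; keep the cache entry and let the
	// DistSender try the remaining replicas instead of evicting repeatedly.
	if errDescGen != 0 && errDescGen < cachedDescGen {
		return false
	}
	// Otherwise the sender's view is at least as new as ours, so the cached
	// descriptor must be stale: evict and perform a fresh range lookup.
	return true
}

func main() {
	fmt.Println(shouldEvictOnIncompatibleLease(1, 2)) // false: error carries stale info
	fmt.Println(shouldEvictOnIncompatibleLease(3, 2)) // true: cached descriptor is stale
	fmt.Println(shouldEvictOnIncompatibleLease(0, 2)) // true: generation unknown, keep old behavior
}

The zero-generation fallback mirrors the descGeneration != 0 guard in the patch: errors from nodes that predate the new DescriptorGeneration field continue to trigger an eviction, exactly as before.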