119349: kvcoord: add `Transport.Reset()` r=erikgrinaker a=erikgrinaker

Extracted from #118943.

---

This resets the transport back to the first replica in the list.

Epic: none
Release note: None
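
For illustration, a minimal sketch of the retry pattern `Reset()` enables: a hypothetical caller (not the actual DistSender integration, which is part of #118943) that, once the transport is exhausted, rewinds to the first replica and makes another pass over the list. `sendWithRetry` and its parameters are invented for this example; the `Transport` methods it uses (`IsExhausted`, `SendNext`, `Reset`) are the ones in the interface below, assuming the `kvcoord`/`kvpb` imports used throughout these files.

	// sendWithRetry is a hypothetical illustration, not CockroachDB code. It
	// cycles through the transport's replicas and, after exhausting them,
	// resets to the first replica (in the transport's current ordering) for
	// another pass.
	func sendWithRetry(
		ctx context.Context, t kvcoord.Transport, ba *kvpb.BatchRequest, passes int,
	) (*kvpb.BatchResponse, error) {
		var lastErr error
		for p := 0; p < passes; p++ {
			for !t.IsExhausted() {
				br, err := t.SendNext(ctx, ba)
				if err == nil {
					return br, nil
				}
				lastErr = err
			}
			t.Reset() // start the next pass from the first replica
		}
		return nil, lastErr
	}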

119370: kvserver: deflake TestLeaseholdersRejectClockUpdateWithJump r=nvanbenschoten a=pav-kv

Fixes #119362
Epic: none
Release note: none

Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
3 people committed Feb 20, 2024
3 parents 3161ca9 + 72eec2a + ca57086 commit 44e45c7
Showing 7 changed files with 92 additions and 49 deletions.
32 changes: 3 additions & 29 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go
@@ -100,31 +100,21 @@ func (c *internalClientCounts) Inc(ic rpc.RestrictedInternalClient) {
 }
 
 type countConnectionsTransport struct {
-	wrapped             kvcoord.Transport
+	kvcoord.Transport
 	counts              *internalClientCounts
 	wrapRangeFeedClient wrapRangeFeedClientFn
 	rfStreamEnabled     bool
 }
 
 var _ kvcoord.Transport = (*countConnectionsTransport)(nil)
 
-func (c *countConnectionsTransport) IsExhausted() bool {
-	return c.wrapped.IsExhausted()
-}
-
-func (c *countConnectionsTransport) SendNext(
-	ctx context.Context, request *kvpb.BatchRequest,
-) (*kvpb.BatchResponse, error) {
-	return c.wrapped.SendNext(ctx, request)
-}
-
 type testFeedCtxKey struct{}
 type useMuxRangeFeedCtxKey struct{}
 
 func (c *countConnectionsTransport) NextInternalClient(
 	ctx context.Context,
 ) (rpc.RestrictedInternalClient, error) {
-	client, err := c.wrapped.NextInternalClient(ctx)
+	client, err := c.Transport.NextInternalClient(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -149,22 +139,6 @@ func (c *countConnectionsTransport) NextInternalClient(
 	return tc, nil
 }
 
-func (c *countConnectionsTransport) NextReplica() roachpb.ReplicaDescriptor {
-	return c.wrapped.NextReplica()
-}
-
-func (c *countConnectionsTransport) SkipReplica() {
-	c.wrapped.SkipReplica()
-}
-
-func (c *countConnectionsTransport) MoveToFront(descriptor roachpb.ReplicaDescriptor) bool {
-	return c.wrapped.MoveToFront(descriptor)
-}
-
-func (c *countConnectionsTransport) Release() {
-	c.wrapped.Release()
-}
-
 func makeTransportFactory(
 	rfStreamEnabled bool, counts *internalClientCounts, wrapFn wrapRangeFeedClientFn,
 ) func(kvcoord.TransportFactory) kvcoord.TransportFactory {
@@ -175,7 +149,7 @@
 		return nil, err
 	}
 	countingTransport := &countConnectionsTransport{
-		wrapped:             transport,
+		Transport:           transport,
 		rfStreamEnabled:     rfStreamEnabled,
 		counts:              counts,
 		wrapRangeFeedClient: wrapFn,
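
The change above swaps explicit per-method forwarding for Go struct embedding: with `kvcoord.Transport` embedded, every interface method is promoted from the wrapped value automatically, so the wrapper only declares the methods it actually intercepts (`NextInternalClient`). A self-contained sketch of that pattern, with hypothetical names:

	package main

	import "fmt"

	type Greeter interface {
		Hello() string
		Bye() string
	}

	type base struct{}

	func (base) Hello() string { return "hello" }
	func (base) Bye() string   { return "bye" }

	// wrapper embeds the Greeter interface: Hello is intercepted, while Bye
	// is forwarded to the embedded value automatically.
	type wrapper struct {
		Greeter
	}

	func (w wrapper) Hello() string { return "wrapped " + w.Greeter.Hello() }

	func main() {
		var g Greeter = wrapper{Greeter: base{}}
		fmt.Println(g.Hello()) // wrapped hello
		fmt.Println(g.Bye())   // bye
	}

A side benefit of embedding is that new interface methods (such as `Reset()`) no longer require touching every test wrapper.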
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -216,6 +216,10 @@ func (l *simpleTransportAdapter) MoveToFront(replica roachpb.ReplicaDescriptor)
 	return false
 }
 
+func (l *simpleTransportAdapter) Reset() {
+	l.nextReplicaIdx = 0
+}
+
 func (l *simpleTransportAdapter) Release() {}
 
 func makeGossip(t *testing.T, stopper *stop.Stopper, rpcContext *rpc.Context) *gossip.Gossip {
12 changes: 12 additions & 0 deletions pkg/kv/kvclient/kvcoord/mocks_generated_test.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
@@ -193,6 +193,10 @@ func (*firstNErrorTransport) MoveToFront(roachpb.ReplicaDescriptor) bool {
 	return true
 }
 
+func (f *firstNErrorTransport) Reset() {
+	f.numSent = 0
+}
+
 // TestComplexScenarios verifies various complex success/failure scenarios by
 // mocking sendOne.
 func TestComplexScenarios(t *testing.T) {
11 changes: 11 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport.go
@@ -85,6 +85,11 @@ type Transport interface {
 	// transport.
 	MoveToFront(roachpb.ReplicaDescriptor) bool
 
+	// Reset moves back to the first replica in the transport, according to the
+	// current ordering. This may not be the same replica as MoveToFront if it was
+	// called prior, which places the replica at the next rather than first index.
+	Reset()
+
 	// Release releases any resources held by this Transport.
 	Release()
 }
@@ -283,6 +288,10 @@ func (gt *grpcTransport) MoveToFront(replica roachpb.ReplicaDescriptor) bool {
 	return false
 }
 
+func (gt *grpcTransport) Reset() {
+	gt.nextReplicaIdx = 0
+}
+
 // splitHealthy splits the grpcTransport's replica slice into healthy replica
 // and unhealthy replica, based on their connection state. Healthy replicas will
 // be rearranged first in the replicas slice, and unhealthy replicas will be
@@ -394,4 +403,6 @@ func (s *senderTransport) MoveToFront(replica roachpb.ReplicaDescriptor) bool {
 	return true
 }
 
+func (s *senderTransport) Reset() {}
+
 func (s *senderTransport) Release() {}
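
To make the `Reset()` doc comment concrete: `MoveToFront` places a replica at the next-to-be-tried index, not index 0, so a later `Reset()` rewinds to the head of the reordered list. Below is a simplified, standalone model of the index bookkeeping; the `MoveToFront` body here is inferred from the behavior exercised in `TestTransportReset` (next file), not copied from the real implementation.

	package main

	import "fmt"

	// miniTransport models the replica iteration: next points at the replica
	// that would be tried next; MoveToFront swaps a replica into that slot;
	// Reset rewinds to the start of the current ordering.
	type miniTransport struct {
		replicas []int
		next     int
	}

	func (t *miniTransport) IsExhausted() bool { return t.next >= len(t.replicas) }
	func (t *miniTransport) SkipReplica()      { t.next++ }
	func (t *miniTransport) Reset()            { t.next = 0 }

	func (t *miniTransport) MoveToFront(r int) bool {
		for i := range t.replicas {
			if t.replicas[i] == r {
				if i < t.next {
					t.next-- // replica was already tried; retry it next
				}
				t.replicas[i], t.replicas[t.next] = t.replicas[t.next], t.replicas[i]
				return true
			}
		}
		return false
	}

	func main() {
		t := &miniTransport{replicas: []int{1, 2, 3}}
		t.SkipReplica()
		t.SkipReplica()  // tried 1 and 2; next would be 3
		t.MoveToFront(1) // order is now [2, 1, 3], next points at 1
		t.Reset()
		fmt.Println(t.replicas[t.next]) // 2, not 1: Reset honors the new order
	}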
44 changes: 44 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
@@ -106,6 +106,50 @@ func TestTransportMoveToFront(t *testing.T) {
 	require.Equal(t, 1, gt.nextReplicaIdx)
 }
 
+func TestTransportReset(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	rd1 := roachpb.ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1}
+	rd2 := roachpb.ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2}
+	rd3 := roachpb.ReplicaDescriptor{NodeID: 3, StoreID: 3, ReplicaID: 3}
+	gt := grpcTransport{replicas: []roachpb.ReplicaDescriptor{rd1, rd2, rd3}}
+
+	// Reset should be a noop when positioned at start.
+	require.Equal(t, rd1, gt.NextReplica())
+	gt.Reset()
+	require.Equal(t, rd1, gt.NextReplica())
+
+	// Reset should move back to front when in the middle.
+	gt.SkipReplica()
+	require.Equal(t, rd2, gt.NextReplica())
+	gt.Reset()
+	require.Equal(t, rd1, gt.NextReplica())
+
+	// Reset should move back to front when exhausted.
+	gt.SkipReplica()
+	gt.SkipReplica()
+	gt.SkipReplica()
+	require.True(t, gt.IsExhausted())
+	gt.Reset()
+	require.False(t, gt.IsExhausted())
+	require.Equal(t, rd1, gt.NextReplica())
+
+	// MoveToFront will reorder replicas by moving the replica to the next index.
+	// Reset moves to the start of the modified ordering.
+	gt.SkipReplica()
+	gt.SkipReplica()
+	require.True(t, gt.MoveToFront(rd1))
+	gt.Reset()
+	require.Equal(t, rd2, gt.NextReplica())
+	gt.SkipReplica()
+	require.Equal(t, rd1, gt.NextReplica())
+	gt.SkipReplica()
+	require.Equal(t, rd3, gt.NextReplica())
+	gt.SkipReplica()
+	require.True(t, gt.IsExhausted())
+}
+
 // TestSpanImport tests that the gRPC transport ingests trace information that
 // came from gRPC responses (via tracingpb.RecordedSpan on the batch responses).
 func TestSpanImport(t *testing.T) {
34 changes: 14 additions & 20 deletions pkg/kv/kvserver/client_replica_test.go
@@ -178,7 +178,10 @@ func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) {
 	require.NoError(t, err)
 
 	manual.Pause()
-	ts1 := s.Clock().Now()
+	ts1 := hlc.Timestamp{WallTime: manual.UnixNano()}
+	// NB: it's possible that HLC ran in front of manual.Now() after the Pause()
+	// call. Particularly, if the wall clock regressed during Pause(), and there
+	// was a concurrent Now() with a pre-regression higher timestamp. See #119362.
 
 	key := roachpb.Key("a")
 	incArgs := incrementArgs(key, 5)
@@ -188,37 +191,28 @@
 	const numCmds = 3
 	clockOffset := s.Clock().MaxOffset() / numCmds
 	for i := int64(1); i <= numCmds; i++ {
-		ts := hlc.ClockTimestamp(ts1.Add(i*clockOffset.Nanoseconds(), 0))
-		if _, err := kv.SendWrappedWith(context.Background(), store.TestSender(), kvpb.Header{Now: ts}, incArgs); err != nil {
-			t.Fatal(err)
-		}
+		_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), kvpb.Header{
+			Now: hlc.ClockTimestamp(ts1.Add(i*clockOffset.Nanoseconds(), 0)),
+		}, incArgs)
+		require.NoError(t, pErr.GoError())
 	}
 
 	// Expect the clock to advance.
 	ts2 := s.Clock().Now()
-	if expAdvance, advance := ts2.GoTime().Sub(ts1.GoTime()), numCmds*clockOffset; advance != expAdvance {
-		t.Fatalf("expected clock to advance %s; got %s", expAdvance, advance)
-	}
+	require.Equal(t, numCmds*clockOffset, ts2.GoTime().Sub(ts1.GoTime()))
 
 	// Once the accumulated offset reaches MaxOffset, commands will be rejected.
 	tsFuture := hlc.ClockTimestamp(ts1.Add(s.Clock().MaxOffset().Nanoseconds()+1, 0))
 	_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), kvpb.Header{Now: tsFuture}, incArgs)
-	if !testutils.IsPError(pErr, "remote wall time is too far ahead") {
-		t.Fatalf("unexpected error %v", pErr)
-	}
+	require.True(t, testutils.IsPError(pErr, "remote wall time is too far ahead"))
 
 	// The clock did not advance and the final command was not executed.
 	ts3 := s.Clock().Now()
-	if advance := ts3.GoTime().Sub(ts2.GoTime()); advance != 0 {
-		t.Fatalf("expected clock not to advance, but it advanced by %s", advance)
-	}
+	require.Zero(t, ts3.GoTime().Sub(ts2.GoTime()))
 	valRes, err := storage.MVCCGet(context.Background(), store.TODOEngine(), key, ts3,
 		storage.MVCCGetOptions{})
-	if err != nil {
-		t.Fatal(err)
-	}
-	if a, e := mustGetInt(valRes.Value), incArgs.Increment*numCmds; a != e {
-		t.Errorf("expected %d, got %d", e, a)
-	}
+	require.NoError(t, err)
+	require.Equal(t, incArgs.Increment*numCmds, mustGetInt(valRes.Value))
 }
 
 // TestTxnPutOutOfOrder tests a case where a put operation of an older
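
Why the deflake works, in a toy model (a conceptual sketch, not CockroachDB's `hlc` package): a hybrid logical clock's `Now()` is monotonic, returning at least the last value it handed out, so if the wall clock regresses around `Pause()`, a concurrent pre-regression reading can leave the HLC ahead of the paused manual clock. Deriving `ts1` from `manual.UnixNano()` anchors the test to the paused source instead of that possibly-ahead value.

	package main

	import (
		"fmt"
		"sync"
	)

	// monotonicClock is a toy model of an HLC's wall-time behavior: Now()
	// returns the max of the physical source and the last value handed out,
	// so it never regresses even if the physical clock jumps backwards.
	type monotonicClock struct {
		mu       sync.Mutex
		physical func() int64
		last     int64
	}

	func (c *monotonicClock) Now() int64 {
		c.mu.Lock()
		defer c.mu.Unlock()
		if t := c.physical(); t > c.last {
			c.last = t
		} else {
			c.last++ // logical tick to preserve monotonicity
		}
		return c.last
	}

	func main() {
		wall := int64(100)
		c := &monotonicClock{physical: func() int64 { return wall }}
		fmt.Println(c.Now()) // 100
		wall = 90            // physical clock regresses (e.g. around Pause)
		fmt.Println(c.Now()) // 101: the clock is now ahead of the wall time
	}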
