diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender.go b/pkg/kv/kvserver/closedts/sidetransport/sender.go
index b72cd457c550..b1174f7b03d2 100644
--- a/pkg/kv/kvserver/closedts/sidetransport/sender.go
+++ b/pkg/kv/kvserver/closedts/sidetransport/sender.go
@@ -56,7 +56,6 @@ import (
 // told about ranges that it doesn't care about.
 type Sender struct {
 	stopper *stop.Stopper
-	dialer  *nodedialer.Dialer
 	st      *cluster.Settings
 	clock   *hlc.Clock
 	nodeID  roachpb.NodeID
@@ -84,11 +83,13 @@ type Sender struct {
 	// to this buffer signals the connections to send it on their streams.
 	buf *updatesBuf
 
-	// connections contains connections to all nodes with follower replicas.
-	// connections are added as nodes get replicas for ranges with local leases
-	// and removed when the respective node no longer has any replicas with
-	// local leases.
-	connections map[roachpb.NodeID]*connection
+	// connFactory is used to establish new connections.
+	connFactory connFactory
+	// conns contains connections to all nodes with follower replicas of any of
+	// the registered leaseholders. Connections are added as nodes get replicas
+	// for ranges with local leases and removed when the respective node no
+	// longer has any replicas with local leases.
+	conns map[roachpb.NodeID]conn
 }
 
 // trackedRange contains the information that the side-transport last published
@@ -135,18 +136,25 @@ type Replica interface {
 // NewSender creates a Sender. Run must be called on it afterwards to get it to
 // start publishing closed timestamps.
 func NewSender(
-	stopper *stop.Stopper, dialer *nodedialer.Dialer, clock *hlc.Clock, st *cluster.Settings,
+	stopper *stop.Stopper, st *cluster.Settings, clock *hlc.Clock, dialer *nodedialer.Dialer,
+) *Sender {
+	connFactory := newRPCConnFactory(dialer)
+	return newSenderWithConnFactory(stopper, st, clock, connFactory)
+}
+
+func newSenderWithConnFactory(
+	stopper *stop.Stopper, st *cluster.Settings, clock *hlc.Clock, connFactory connFactory,
 ) *Sender {
 	s := &Sender{
-		stopper: stopper,
-		dialer:  dialer,
-		buf:     newUpdatesBuf(),
-		clock:   clock,
-		st:      st,
+		stopper:     stopper,
+		st:          st,
+		clock:       clock,
+		buf:         newUpdatesBuf(),
+		connFactory: connFactory,
 	}
 	s.trackedMu.tracked = make(map[roachpb.RangeID]trackedRange)
 	s.leaseholdersMu.leaseholders = make(map[roachpb.RangeID]leaseholder)
-	s.connections = make(map[roachpb.NodeID]*connection)
+	s.conns = make(map[roachpb.NodeID]conn)
 	return s
 }
 
@@ -239,7 +247,7 @@ func (s *Sender) UnregisterLeaseholder(
 	}
 }
 
-func (s *Sender) publish(ctx context.Context) {
+func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp {
 	s.trackedMu.Lock()
 	defer s.trackedMu.Unlock()
 
@@ -251,9 +259,6 @@ func (s *Sender) publish(ctx context.Context) {
 	// Determine the message's sequence number.
 	s.trackedMu.lastSeqNum++
 	msg.SeqNum = s.trackedMu.lastSeqNum
-	// The first message produced is essentially a snapshot, since it has no
-	// previous state to reference.
-	msg.Snapshot = msg.SeqNum == 1
 
 	// Fix the closed timestamps that will be communicated to by this message.
 	// These timestamps (one per range policy) will apply to all the ranges
@@ -352,10 +357,10 @@ func (s *Sender) publish(ctx context.Context) {
 	// Close connections to the nodes that no longer need any info from us
 	// (because they don't have replicas for any of the ranges with leases on this
 	// node).
-	for nodeID, conn := range s.connections {
+	for nodeID, c := range s.conns {
 		if !nodesWithFollowers.Contains(int(nodeID)) {
-			delete(s.connections, nodeID)
-			conn.close()
+			delete(s.conns, nodeID)
+			c.close()
 		}
 	}
 
@@ -365,15 +370,18 @@ func (s *Sender) publish(ctx context.Context) {
 		// Note that we don't open a connection to ourselves. The timestamps that
 		// we're closing are written directly to the sideTransportClosedTimestamp
 		// fields of the local replicas in BumpSideTransportClosed.
-		if _, ok := s.connections[nodeID]; !ok && nodeID != s.nodeID {
-			c := newConnection(s, nodeID, s.dialer, s.buf)
+		if _, ok := s.conns[nodeID]; !ok && nodeID != s.nodeID {
+			c := s.connFactory.new(s, nodeID)
 			c.run(ctx, s.stopper)
-			s.connections[nodeID] = c
+			s.conns[nodeID] = c
 		}
 	})
 
 	// Publish the new message to all connections.
 	s.buf.Push(ctx, msg)
+
+	// Return the publication time, for tests.
+	return now
 }
 
 // GetSnapshot generates an update that contains all the sender's state (as
@@ -545,29 +553,55 @@ func (b *updatesBuf) Close() {
 	b.mu.updated.Broadcast()
 }
 
-// connection represents a connection to one particular node. The connection
-// watches an updatesBuf and streams all the messages to the respective node.
-type connection struct {
+// connFactory is capable of creating new connections to specific nodes.
+type connFactory interface {
+	new(*Sender, roachpb.NodeID) conn
+}
+
+// conn is a side-transport connection to a node. A conn watches an updatesBuf
+// and streams all the messages to the respective node.
+type conn interface {
+	run(context.Context, *stop.Stopper)
+	close()
+}
+
+// rpcConnFactory is an implementation of connFactory that establishes
+// connections to other nodes using gRPC.
+type rpcConnFactory struct {
+	dialer *nodedialer.Dialer
+}
+
+func newRPCConnFactory(dialer *nodedialer.Dialer) connFactory {
+	return &rpcConnFactory{
+		dialer: dialer,
+	}
+}
+
+// new implements the connFactory interface.
+func (f *rpcConnFactory) new(s *Sender, nodeID roachpb.NodeID) conn {
+	return newRPCConn(f.dialer, s, nodeID)
+}
+
+// rpcConn is an implementation of conn that sends updates over a gRPC stream.
+type rpcConn struct {
 	log.AmbientContext
+	dialer   *nodedialer.Dialer
 	producer *Sender
 	nodeID   roachpb.NodeID
-	dialer   *nodedialer.Dialer
 	// buf accumulates messages to be sent to the connection. If the buffer
 	// overflows (because this stream is disconnected for long enough), we'll have
 	// to send a snapshot before we can resume sending regular messages.
-	buf      *updatesBuf
 	stream ctpb.SideTransport_PushUpdatesClient
 	closed int32 // atomic
 }
 
-func newConnection(
-	p *Sender, nodeID roachpb.NodeID, dialer *nodedialer.Dialer, buf *updatesBuf,
-) *connection {
-	r := &connection{
-		producer: p,
-		nodeID:   nodeID,
+func newRPCConn(
+	dialer *nodedialer.Dialer, producer *Sender, nodeID roachpb.NodeID,
+) conn {
+	r := &rpcConn{
 		dialer:   dialer,
-		buf:      buf,
+		producer: producer,
+		nodeID:   nodeID,
 	}
 	r.AddLogTag("ctstream", nodeID)
 	return r
@@ -576,11 +610,11 @@ func newConnection(
 // close makes the connection stop sending messages. The run() goroutine will
 // exit asynchronously. The parent Sender is expected to remove this connection
 // from its list.
-func (r *connection) close() {
+func (r *rpcConn) close() {
 	atomic.StoreInt32(&r.closed, 1)
 }
 
-func (r *connection) sendMsg(ctx context.Context, msg *ctpb.Update) error {
+func (r *rpcConn) sendMsg(ctx context.Context, msg *ctpb.Update) error {
 	if r.stream == nil {
 		conn, err := r.dialer.Dial(ctx, r.nodeID, rpc.SystemClass)
 		if err != nil {
@@ -594,7 +628,8 @@ func (r *connection) sendMsg(ctx context.Context, msg *ctpb.Update) error {
 	return r.stream.Send(msg)
 }
 
-func (r *connection) run(ctx context.Context, stopper *stop.Stopper) {
+// run implements the conn interface.
+func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) {
 	_ /* err */ = stopper.RunAsyncTask(ctx, fmt.Sprintf("closedts publisher for n%d", r.nodeID),
 		func(ctx context.Context) {
 			ctx = r.AnnotateCtx(ctx)
@@ -623,7 +658,7 @@ func (r *connection) run(ctx context.Context, stopper *stop.Stopper) {
 			}
 
 			var ok bool
-			msg, ok = r.buf.GetBySeq(ctx, lastSent+1)
+			msg, ok = r.producer.buf.GetBySeq(ctx, lastSent+1)
 			// We can be signaled to stop in two ways: the buffer can be closed (in
 			// which case all connections must exit), or this connection was closed
 			// via close(). In either case, we quit.
diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender_test.go b/pkg/kv/kvserver/closedts/sidetransport/sender_test.go
new file mode 100644
index 000000000000..b50acc64199f
--- /dev/null
+++ b/pkg/kv/kvserver/closedts/sidetransport/sender_test.go
@@ -0,0 +1,209 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package sidetransport
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/stretchr/testify/require"
+)
+
+// mockReplica is a mock implementation of the Replica interface.
+type mockReplica struct {
+	storeID roachpb.StoreID
+	rangeID roachpb.RangeID
+	desc    roachpb.RangeDescriptor
+
+	canBump bool
+	lai     ctpb.LAI
+	policy  roachpb.RangeClosedTimestampPolicy
+}
+
+func (m *mockReplica) StoreID() roachpb.StoreID       { return m.storeID }
+func (m *mockReplica) GetRangeID() roachpb.RangeID    { return m.rangeID }
+func (m *mockReplica) Desc() *roachpb.RangeDescriptor { return &m.desc }
+func (m *mockReplica) BumpSideTransportClosed(
+	_ context.Context,
+	_ hlc.ClockTimestamp,
+	_ [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp,
+) (bool, ctpb.LAI, roachpb.RangeClosedTimestampPolicy) {
+	return m.canBump, m.lai, m.policy
+}
+
+// mockConnFactory is a mock implementation of the connFactory interface.
+type mockConnFactory struct{}
+
+func (f *mockConnFactory) new(_ *Sender, nodeID roachpb.NodeID) conn {
+	return &mockConn{nodeID: nodeID}
+}
+
+// mockConn is a mock implementation of the conn interface.
+type mockConn struct {
+	nodeID  roachpb.NodeID
+	running bool
+	closed  bool
+}
+
+func (c *mockConn) run(context.Context, *stop.Stopper) { c.running = true }
+func (c *mockConn) close()                             { c.closed = true }
+
+func newMockSender() (*Sender, *stop.Stopper) {
+	stopper := stop.NewStopper()
+	st := cluster.MakeTestingClusterSettings()
+	clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
+	connFactory := &mockConnFactory{}
+	s := newSenderWithConnFactory(stopper, st, clock, connFactory)
+	s.nodeID = 1 // usually set in (*Sender).Run
+	return s, stopper
+}
+
+func newMockReplica(id roachpb.RangeID, nodes ...roachpb.NodeID) *mockReplica {
+	var desc roachpb.RangeDescriptor
+	desc.RangeID = id
+	for _, nodeID := range nodes {
+		desc.AddReplica(nodeID, roachpb.StoreID(nodeID), roachpb.VOTER_FULL)
+	}
+	return &mockReplica{
+		storeID: 1,
+		rangeID: id,
+		desc:    desc,
+		canBump: true,
+		lai:     5,
+		policy:  roachpb.LAG_BY_CLUSTER_SETTING,
+	}
+}
+
+func expGroupUpdates(s *Sender, now hlc.ClockTimestamp) []ctpb.Update_GroupUpdate {
+	maxClockOffset := s.clock.MaxOffset()
+	lagTargetDuration := closedts.TargetDuration.Get(&s.st.SV)
+	targetForPolicy := func(pol roachpb.RangeClosedTimestampPolicy) hlc.Timestamp {
+		return closedts.TargetForPolicy(now, maxClockOffset, lagTargetDuration, pol)
+	}
+	return []ctpb.Update_GroupUpdate{
+		{roachpb.LAG_BY_CLUSTER_SETTING, targetForPolicy(roachpb.LAG_BY_CLUSTER_SETTING)},
+		{roachpb.LEAD_FOR_GLOBAL_READS, targetForPolicy(roachpb.LEAD_FOR_GLOBAL_READS)},
+	}
+}
+
+func TestSenderBasic(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	ctx := context.Background()
+	s, stopper := newMockSender()
+	defer stopper.Stop(ctx)
+
+	// No leaseholders.
+	now := s.publish(ctx)
+	require.Len(t, s.trackedMu.tracked, 0)
+	require.Len(t, s.leaseholdersMu.leaseholders, 0)
+	require.Len(t, s.conns, 0)
+
+	require.Equal(t, ctpb.SeqNum(1), s.trackedMu.lastSeqNum)
+	up, ok := s.buf.GetBySeq(ctx, 1)
+	require.True(t, ok)
+	require.Equal(t, roachpb.NodeID(1), up.NodeID)
+	require.Equal(t, ctpb.SeqNum(1), up.SeqNum)
+	require.Equal(t, false, up.Snapshot)
+	require.Equal(t, expGroupUpdates(s, now), up.ClosedTimestamps)
+	require.Nil(t, up.Removed)
+	require.Nil(t, up.AddedOrUpdated)
+
+	// Add a leaseholder that can close.
+	r1 := newMockReplica(15, 1, 2, 3)
+	s.RegisterLeaseholder(ctx, r1, 1)
+	now = s.publish(ctx)
+	require.Len(t, s.trackedMu.tracked, 1)
+	require.Equal(t, map[roachpb.RangeID]trackedRange{
+		15: trackedRange{lai: 5, policy: roachpb.LAG_BY_CLUSTER_SETTING},
+	}, s.trackedMu.tracked)
+	require.Len(t, s.leaseholdersMu.leaseholders, 1)
+	require.Len(t, s.conns, 2)
+
+	require.Equal(t, ctpb.SeqNum(2), s.trackedMu.lastSeqNum)
+	up, ok = s.buf.GetBySeq(ctx, 2)
+	require.True(t, ok)
+	require.Equal(t, roachpb.NodeID(1), up.NodeID)
+	require.Equal(t, ctpb.SeqNum(2), up.SeqNum)
+	require.Equal(t, false, up.Snapshot)
+	require.Equal(t, expGroupUpdates(s, now), up.ClosedTimestamps)
+	require.Nil(t, up.Removed)
+	require.Equal(t, []ctpb.Update_RangeUpdate{
+		{RangeID: 15, LAI: 5, Policy: roachpb.LAG_BY_CLUSTER_SETTING},
+	}, up.AddedOrUpdated)
+
+	c2, ok := s.conns[2]
+	require.True(t, ok)
+	require.Equal(t, &mockConn{nodeID: 2, running: true, closed: false}, c2.(*mockConn))
+	c3, ok := s.conns[3]
+	require.True(t, ok)
+	require.Equal(t, &mockConn{nodeID: 3, running: true, closed: false}, c3.(*mockConn))
+
+	// The leaseholder can not close the next timestamp.
+	r1.canBump = false
+	now = s.publish(ctx)
+	require.Len(t, s.trackedMu.tracked, 0)
+	require.Len(t, s.leaseholdersMu.leaseholders, 1)
+	require.Len(t, s.conns, 2)
+
+	require.Equal(t, ctpb.SeqNum(3), s.trackedMu.lastSeqNum)
+	up, ok = s.buf.GetBySeq(ctx, 3)
+	require.True(t, ok)
+	require.Equal(t, roachpb.NodeID(1), up.NodeID)
+	require.Equal(t, ctpb.SeqNum(3), up.SeqNum)
+	require.Equal(t, false, up.Snapshot)
+	require.Equal(t, expGroupUpdates(s, now), up.ClosedTimestamps)
+	require.Equal(t, []roachpb.RangeID{15}, up.Removed)
+	require.Nil(t, up.AddedOrUpdated)
+
+	// The leaseholder loses its lease.
+	s.UnregisterLeaseholder(ctx, 1, 15)
+	now = s.publish(ctx)
+	require.Len(t, s.trackedMu.tracked, 0)
+	require.Len(t, s.leaseholdersMu.leaseholders, 0)
+	require.Len(t, s.conns, 0)
+
+	require.Equal(t, ctpb.SeqNum(4), s.trackedMu.lastSeqNum)
+	up, ok = s.buf.GetBySeq(ctx, 4)
+	require.True(t, ok)
+	require.Equal(t, roachpb.NodeID(1), up.NodeID)
+	require.Equal(t, ctpb.SeqNum(4), up.SeqNum)
+	require.Equal(t, false, up.Snapshot)
+	require.Equal(t, expGroupUpdates(s, now), up.ClosedTimestamps)
+	require.Nil(t, up.Removed)
+	require.Nil(t, up.AddedOrUpdated)
+
+	require.True(t, c2.(*mockConn).closed)
+	require.True(t, c3.(*mockConn).closed)
+}
+
+func TestSenderConnectionChanges(t *testing.T) {
+	// TODO: Two ranges.
+	// Add follower for range 1: 2, 3.
+	// - check conns to 2 and 3.
+	// Add follower for range 2: 3, 4.
+	// - check conns to 2, 3, 4.
+	// Remove followers for range 1: 2, 3.
+	// - check conns to 3, 4.
+	// Remove follower for range 2: 3.
+	// - check conns to 4.
+}
+
+func TestSenderSameRangeDifferentStores(t *testing.T) {
+	// TODO: Two replicas, different stores, same range.
+}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index e75f1ea6236f..7f4ceca532b1 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -480,7 +480,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	}
 	sTS := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, stopper)
 
-	ctSender := sidetransport.NewSender(stopper, nodeDialer, clock, st)
+	ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer)
 
 	// The InternalExecutor will be further initialized later, as we create more
 	// of the server's components. There's a circular dependency - many things