Skip to content

Commit

Permalink
kvserver: add unit testing to side-transport
Browse files Browse the repository at this point in the history
  • Loading branch information
nvanbenschoten committed Mar 2, 2021
1 parent 24bcbf9 commit f2058f5
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 40 deletions.
113 changes: 74 additions & 39 deletions pkg/kv/kvserver/closedts/sidetransport/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import (
// told about ranges that it doesn't care about.
type Sender struct {
stopper *stop.Stopper
dialer *nodedialer.Dialer
st *cluster.Settings
clock *hlc.Clock
nodeID roachpb.NodeID
Expand Down Expand Up @@ -84,11 +83,13 @@ type Sender struct {
// to this buffer signals the connections to send it on their streams.
buf *updatesBuf

// connections contains connections to all nodes with follower replicas.
// connections are added as nodes get replicas for ranges with local leases
// and removed when the respective node no longer has any replicas with
// local leases.
connections map[roachpb.NodeID]*connection
// connFactory is used to establish new connections.
connFactory connFactory
// conns contains connections to all nodes with follower replicas of any of
// the registered leaseholder. connections are added as nodes get replicas
// for ranges with local leases and removed when the respective node no
// longer has any replicas with local leases.
conns map[roachpb.NodeID]conn
}

// trackedRange contains the information that the side-transport last published
Expand Down Expand Up @@ -135,18 +136,25 @@ type Replica interface {
// NewSender creates a Sender. Run must be called on it afterwards to get it to
// start publishing closed timestamps.
func NewSender(
stopper *stop.Stopper, dialer *nodedialer.Dialer, clock *hlc.Clock, st *cluster.Settings,
stopper *stop.Stopper, st *cluster.Settings, clock *hlc.Clock, dialer *nodedialer.Dialer,
) *Sender {
connFactory := newRPCConnFactory(dialer)
return newSenderWithConnFactory(stopper, st, clock, connFactory)
}

func newSenderWithConnFactory(
stopper *stop.Stopper, st *cluster.Settings, clock *hlc.Clock, connFactory connFactory,
) *Sender {
s := &Sender{
stopper: stopper,
dialer: dialer,
buf: newUpdatesBuf(),
clock: clock,
st: st,
stopper: stopper,
st: st,
clock: clock,
buf: newUpdatesBuf(),
connFactory: connFactory,
}
s.trackedMu.tracked = make(map[roachpb.RangeID]trackedRange)
s.leaseholdersMu.leaseholders = make(map[roachpb.RangeID]leaseholder)
s.connections = make(map[roachpb.NodeID]*connection)
s.conns = make(map[roachpb.NodeID]conn)
return s
}

Expand Down Expand Up @@ -239,7 +247,7 @@ func (s *Sender) UnregisterLeaseholder(
}
}

func (s *Sender) publish(ctx context.Context) {
func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp {
s.trackedMu.Lock()
defer s.trackedMu.Unlock()

Expand All @@ -251,9 +259,6 @@ func (s *Sender) publish(ctx context.Context) {
// Determine the message's sequence number.
s.trackedMu.lastSeqNum++
msg.SeqNum = s.trackedMu.lastSeqNum
// The first message produced is essentially a snapshot, since it has no
// previous state to reference.
msg.Snapshot = msg.SeqNum == 1

// Fix the closed timestamps that will be communicated to by this message.
// These timestamps (one per range policy) will apply to all the ranges
Expand Down Expand Up @@ -352,10 +357,10 @@ func (s *Sender) publish(ctx context.Context) {
// Close connections to the nodes that no longer need any info from us
// (because they don't have replicas for any of the ranges with leases on this
// node).
for nodeID, conn := range s.connections {
for nodeID, c := range s.conns {
if !nodesWithFollowers.Contains(int(nodeID)) {
delete(s.connections, nodeID)
conn.close()
delete(s.conns, nodeID)
c.close()
}
}

Expand All @@ -365,15 +370,18 @@ func (s *Sender) publish(ctx context.Context) {
// Note that we don't open a connection to ourselves. The timestamps that
// we're closing are written directly to the sideTransportClosedTimestamp
// fields of the local replicas in BumpSideTransportClosed.
if _, ok := s.connections[nodeID]; !ok && nodeID != s.nodeID {
c := newConnection(s, nodeID, s.dialer, s.buf)
if _, ok := s.conns[nodeID]; !ok && nodeID != s.nodeID {
c := s.connFactory.new(s, nodeID)
c.run(ctx, s.stopper)
s.connections[nodeID] = c
s.conns[nodeID] = c
}
})

// Publish the new message to all connections.
s.buf.Push(ctx, msg)

// Return the publication time, for tests.
return now
}

// GetSnapshot generates an update that contains all the sender's state (as
Expand Down Expand Up @@ -545,29 +553,55 @@ func (b *updatesBuf) Close() {
b.mu.updated.Broadcast()
}

// connection represents a connection to one particular node. The connection
// watches an updatesBuf and streams all the messages to the respective node.
type connection struct {
// connFactory is capable of creating new connections to specific nodes.
type connFactory interface {
new(*Sender, roachpb.NodeID) conn
}

// conn is a side-transport connection to a node. A conn watches an updatesBuf
// and streams all the messages to the respective node.
type conn interface {
run(context.Context, *stop.Stopper)
close()
}

// rpcConnFactory is an implementation of connFactory that establishes
// connections to other nodes using gRPC.
type rpcConnFactory struct {
dialer *nodedialer.Dialer
}

func newRPCConnFactory(dialer *nodedialer.Dialer) connFactory {
return &rpcConnFactory{
dialer: dialer,
}
}

// new implements the connFactory interface.
func (f *rpcConnFactory) new(s *Sender, nodeID roachpb.NodeID) conn {
return newRPCConn(f.dialer, s, nodeID)
}

// rpcConn is an implementation of conn that is implemented using a gRPC stream.
type rpcConn struct {
log.AmbientContext
dialer *nodedialer.Dialer
producer *Sender
nodeID roachpb.NodeID
dialer *nodedialer.Dialer
// buf accumulates messages to be sent to the connection. If the buffer
// overflows (because this stream is disconnected for long enough), we'll have
// to send a snapshot before we can resume sending regular messages.
buf *updatesBuf
stream ctpb.SideTransport_PushUpdatesClient
closed int32 // atomic
}

func newConnection(
p *Sender, nodeID roachpb.NodeID, dialer *nodedialer.Dialer, buf *updatesBuf,
) *connection {
r := &connection{
producer: p,
nodeID: nodeID,
func newRPCConn(
dialer *nodedialer.Dialer, producer *Sender, nodeID roachpb.NodeID,
) conn {
r := &rpcConn{
dialer: dialer,
buf: buf,
producer: producer,
nodeID: nodeID,
}
r.AddLogTag("ctstream", nodeID)
return r
Expand All @@ -576,11 +610,11 @@ func newConnection(
// close makes the connection stop sending messages. The run() goroutine will
// exit asynchronously. The parent Sender is expected to remove this connection
// from its list.
func (r *connection) close() {
func (r *rpcConn) close() {
atomic.StoreInt32(&r.closed, 1)
}

func (r *connection) sendMsg(ctx context.Context, msg *ctpb.Update) error {
func (r *rpcConn) sendMsg(ctx context.Context, msg *ctpb.Update) error {
if r.stream == nil {
conn, err := r.dialer.Dial(ctx, r.nodeID, rpc.SystemClass)
if err != nil {
Expand All @@ -594,7 +628,8 @@ func (r *connection) sendMsg(ctx context.Context, msg *ctpb.Update) error {
return r.stream.Send(msg)
}

func (r *connection) run(ctx context.Context, stopper *stop.Stopper) {
// run implements the conn interface.
func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) {
_ /* err */ = stopper.RunAsyncTask(ctx, fmt.Sprintf("closedts publisher for n%d", r.nodeID),
func(ctx context.Context) {
ctx = r.AnnotateCtx(ctx)
Expand Down Expand Up @@ -623,7 +658,7 @@ func (r *connection) run(ctx context.Context, stopper *stop.Stopper) {
}

var ok bool
msg, ok = r.buf.GetBySeq(ctx, lastSent+1)
msg, ok = r.producer.buf.GetBySeq(ctx, lastSent+1)
// We can be signaled to stop in two ways: the buffer can be closed (in
// which case all connections must exit), or this connection was closed
// via close(). In either case, we quit.
Expand Down
Loading

0 comments on commit f2058f5

Please sign in to comment.