diff --git a/pkg/kv/kvserver/client_closed_timestamp_test.go b/pkg/kv/kvserver/client_closed_timestamp_test.go index 52252031218f..cfd72e273439 100644 --- a/pkg/kv/kvserver/client_closed_timestamp_test.go +++ b/pkg/kv/kvserver/client_closed_timestamp_test.go @@ -61,6 +61,8 @@ func TestClosedTimestampWorksWhenRequestsAreSentToNonLeaseHolders(t *testing.T) const closeInterval = 10 * time.Millisecond sqlRunner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '"+ closeInterval.String()+"'") + sqlRunner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '"+ + closeInterval.String()+"'") // To make node3 have a large epoch, synthesize a liveness record for with // epoch 1000 before starting the node. diff --git a/pkg/kv/kvserver/client_metrics_test.go b/pkg/kv/kvserver/client_metrics_test.go index 4ccf87218ce7..ec14a7be0ebd 100644 --- a/pkg/kv/kvserver/client_metrics_test.go +++ b/pkg/kv/kvserver/client_metrics_test.go @@ -352,6 +352,8 @@ func TestStoreMaxBehindNanosOnlyTracksEpochBasedLeases(t *testing.T) { closedTimestampDuration.String()) tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.close_fraction = $1", closedTimestampFraction) + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = $1", + closedTimestampDuration.String()) // Let's get to a point where we know that we have an expiration based lease // with a start time more than some time ago and then we have a max closed diff --git a/pkg/kv/kvserver/closedts/ctpb/service.pb.go b/pkg/kv/kvserver/closedts/ctpb/service.pb.go index e4e34d67af40..7a2f5d6dcd6f 100644 --- a/pkg/kv/kvserver/closedts/ctpb/service.pb.go +++ b/pkg/kv/kvserver/closedts/ctpb/service.pb.go @@ -61,8 +61,9 @@ type Update struct { // to result in a stream failing and a new one being established). Snapshot bool `protobuf:"varint,3,opt,name=snapshot,proto3" json:"snapshot,omitempty"` ClosedTimestamps []Update_GroupUpdate `protobuf:"bytes,4,rep,name=closed_timestamps,json=closedTimestamps,proto3" json:"closed_timestamps"` - // removed contains the set of ranges that are no longer registered on the - // stream and who future updates are no longer applicable to. + // removed contains the set of ranges that are no longer tracked on this + // stream. The closed timestamps in this message and future messages no longer + // apply to these removed ranges. 
// // The field will be empty if snapshot is true, as a snapshot message implies // that all ranges not present in the snapshot's added_or_updated list are no @@ -75,7 +76,7 @@ func (m *Update) Reset() { *m = Update{} } func (m *Update) String() string { return proto.CompactTextString(m) } func (*Update) ProtoMessage() {} func (*Update) Descriptor() ([]byte, []int) { - return fileDescriptor_service_96a1a4bff833e11e, []int{0} + return fileDescriptor_service_458097978ab919fe, []int{0} } func (m *Update) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -118,7 +119,7 @@ func (m *Update_GroupUpdate) Reset() { *m = Update_GroupUpdate{} } func (m *Update_GroupUpdate) String() string { return proto.CompactTextString(m) } func (*Update_GroupUpdate) ProtoMessage() {} func (*Update_GroupUpdate) Descriptor() ([]byte, []int) { - return fileDescriptor_service_96a1a4bff833e11e, []int{0, 0} + return fileDescriptor_service_458097978ab919fe, []int{0, 0} } func (m *Update_GroupUpdate) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -159,7 +160,7 @@ func (m *Update_RangeUpdate) Reset() { *m = Update_RangeUpdate{} } func (m *Update_RangeUpdate) String() string { return proto.CompactTextString(m) } func (*Update_RangeUpdate) ProtoMessage() {} func (*Update_RangeUpdate) Descriptor() ([]byte, []int) { - return fileDescriptor_service_96a1a4bff833e11e, []int{0, 1} + return fileDescriptor_service_458097978ab919fe, []int{0, 1} } func (m *Update_RangeUpdate) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -191,7 +192,7 @@ func (m *Response) Reset() { *m = Response{} } func (m *Response) String() string { return proto.CompactTextString(m) } func (*Response) ProtoMessage() {} func (*Response) Descriptor() ([]byte, []int) { - return fileDescriptor_service_96a1a4bff833e11e, []int{1} + return fileDescriptor_service_458097978ab919fe, []int{1} } func (m *Response) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1290,10 +1291,10 @@ var ( ) func init() { - proto.RegisterFile("kv/kvserver/closedts/ctpb/service.proto", fileDescriptor_service_96a1a4bff833e11e) + proto.RegisterFile("kv/kvserver/closedts/ctpb/service.proto", fileDescriptor_service_458097978ab919fe) } -var fileDescriptor_service_96a1a4bff833e11e = []byte{ +var fileDescriptor_service_458097978ab919fe = []byte{ // 628 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x54, 0xc1, 0x4e, 0xdb, 0x30, 0x18, 0x6e, 0x96, 0x92, 0x56, 0xae, 0x06, 0xcc, 0xda, 0x21, 0x8a, 0xb6, 0xa4, 0x62, 0x82, 0xe5, diff --git a/pkg/kv/kvserver/closedts/ctpb/service.proto b/pkg/kv/kvserver/closedts/ctpb/service.proto index 5a976d87e29f..d141d2725861 100644 --- a/pkg/kv/kvserver/closedts/ctpb/service.proto +++ b/pkg/kv/kvserver/closedts/ctpb/service.proto @@ -72,8 +72,9 @@ message Update { } repeated GroupUpdate closed_timestamps = 4 [(gogoproto.nullable) = false]; - // removed contains the set of ranges that are no longer registered on the - // stream and who future updates are no longer applicable to. + // removed contains the set of ranges that are no longer tracked on this + // stream. The closed timestamps in this message and future messages no longer + // apply to these removed ranges. 
// // The field will be empty if snapshot is true, as a snapshot message implies // that all ranges not present in the snapshot's added_or_updated list are no diff --git a/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel b/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel index 595e61f7c393..2422e36ce748 100644 --- a/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel +++ b/pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel @@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "sidetransport", - srcs = ["sender.go"], + srcs = [ + "receiver.go", + "sender.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport", visibility = ["//visibility:public"], deps = [ + "//pkg/base", "//pkg/clusterversion", "//pkg/kv/kvserver/closedts", "//pkg/kv/kvserver/closedts/ctpb", @@ -19,15 +23,20 @@ go_library( "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", "@org_golang_google_grpc//:go_default_library", ], ) go_test( name = "sidetransport_test", - srcs = ["sender_test.go"], + srcs = [ + "receiver_test.go", + "sender_test.go", + ], embed = [":sidetransport"], deps = [ + "//pkg/base", "//pkg/kv/kvserver/closedts", "//pkg/kv/kvserver/closedts/ctpb", "//pkg/roachpb", @@ -35,8 +44,10 @@ go_test( "//pkg/settings/cluster", "//pkg/util/hlc", "//pkg/util/leaktest", + "//pkg/util/log", "//pkg/util/stop", "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", "@org_golang_google_grpc//:go_default_library", ], diff --git a/pkg/kv/kvserver/closedts/sidetransport/receiver.go b/pkg/kv/kvserver/closedts/sidetransport/receiver.go new file mode 100644 index 000000000000..07b714c39e37 --- /dev/null +++ b/pkg/kv/kvserver/closedts/sidetransport/receiver.go @@ -0,0 +1,345 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sidetransport + +import ( + "context" + "fmt" + "io" + "strings" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +// Receiver is the gRPC server for the closed timestamp side-transport, +// receiving updates from remote nodes. It maintains the set of current +// streaming connections. +type Receiver struct { + log.AmbientContext + stop *stop.Stopper + stores Stores + testingKnobs receiverTestingKnobs + + mu struct { + syncutil.RWMutex + conns map[roachpb.NodeID]*incomingStream + } +} + +// receiverTestingKnobs contains knobs for incomingStreams connected to a +// Receiver. The map is indexed by the sender NodeID. +type receiverTestingKnobs map[roachpb.NodeID]incomingStreamTestingKnobs + +var _ ctpb.SideTransportServer = &Receiver{} + +// NewReceiver creates a Receiver, to be used as a gRPC server with +// ctpb.RegisterClosedTimestampSideTransportServer. 
+func NewReceiver( + nodeID *base.NodeIDContainer, + stop *stop.Stopper, + stores Stores, + testingKnobs receiverTestingKnobs, +) *Receiver { + r := &Receiver{ + stop: stop, + stores: stores, + testingKnobs: testingKnobs, + } + r.AmbientContext.AddLogTag("n", nodeID) + r.mu.conns = make(map[roachpb.NodeID]*incomingStream) + return r +} + +// PushUpdates is the streaming RPC handler. +func (s *Receiver) PushUpdates(stream ctpb.SideTransport_PushUpdatesServer) error { + // Create a steam to service this connection. The stream will call back into + // the Receiver through onFirstMsg to register itself once it finds out the + // sender's node id. + ctx := s.AnnotateCtx(stream.Context()) + return newIncomingStream(s, s.stores).Run(ctx, s.stop, stream) +} + +// GetClosedTimestamp returns the latest closed timestamp that the receiver +// knows for a particular range, together with the LAI needed to have applied in +// order to use this closed timestamp. +// +// leaseholderNode is the last known leaseholder for the range. For efficiency +// reasons, only the closed timestamp info received from that node is checked +// for closed timestamp info about this range. +func (s *Receiver) GetClosedTimestamp( + ctx context.Context, rangeID roachpb.RangeID, leaseholderNode roachpb.NodeID, +) (hlc.Timestamp, ctpb.LAI) { + s.mu.RLock() + conn, ok := s.mu.conns[leaseholderNode] + s.mu.RUnlock() + if !ok { + return hlc.Timestamp{}, 0 + } + return conn.GetClosedTimestamp(ctx, rangeID) +} + +// onFirstMsg is called when the first message on a stream is received. This is +// the point where the stream finds out what node it's receiving data from. +func (s *Receiver) onFirstMsg(ctx context.Context, r *incomingStream, nodeID roachpb.NodeID) error { + s.mu.Lock() + defer s.mu.Unlock() + + log.VEventf(ctx, 2, "n%d opened a closed timestamps side-transport connection", nodeID) + // If we already have a connection from nodeID, we don't accept this one. The + // other one has to be zombie going away soon. The client is expected to retry + // to establish the new connection. + // + // We could figure out a way to signal the existing connection to terminate, + // but it doesn't seem worth it. + if _, ok := s.mu.conns[nodeID]; ok { + return errors.Errorf("connection from n%d already exists", nodeID) + } + s.mu.conns[nodeID] = r + r.testingKnobs = s.testingKnobs[nodeID] + return nil +} + +// onRecvErr is called when one of the inbound streams errors out. The stream is +// removed from the Receiver's collection. +func (s *Receiver) onRecvErr(ctx context.Context, nodeID roachpb.NodeID, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + if err != io.EOF { + log.Warningf(ctx, "closed timestamps side-transport connection dropped from node: %d", nodeID) + } else { + log.VEventf(ctx, 2, "closed timestamps side-transport connection dropped from node: %d (%s)", nodeID, err) + } + if nodeID != 0 { + delete(s.mu.conns, nodeID) + } +} + +// incomingStream represents an inbound connection to a node publishing closed +// timestamp information. It maintains the latest closed timestamps communicated +// by the sender node. +type incomingStream struct { + // The server that created this stream. + server *Receiver + stores Stores + testingKnobs incomingStreamTestingKnobs + // The node that's sending info on this stream. 
+ nodeID roachpb.NodeID + + mu struct { + syncutil.RWMutex + streamState + } +} + +type incomingStreamTestingKnobs struct { + onFirstMsg chan struct{} + onRecvErr func(sender roachpb.NodeID, err error) + onMsg chan *ctpb.Update +} + +// Stores is the interface of *Stores needed by incomingStream. +type Stores interface { + // ForwardSideTransportClosedTimestampForRange forwards the side-transport + // closed timestamp for the local replica(s) of the given range. + ForwardSideTransportClosedTimestampForRange( + ctx context.Context, rangeID roachpb.RangeID, closedTS hlc.Timestamp, lai ctpb.LAI) +} + +func newIncomingStream(s *Receiver, stores Stores) *incomingStream { + r := &incomingStream{ + server: s, + stores: stores, + } + return r +} + +// GetClosedTimestamp returns the latest closed timestamp that the receiver +// knows for a particular range, together with the LAI needed to have applied in +// order to use this closed timestamp. Returns an empty timestamp if the stream +// does not have state for the range. +func (r *incomingStream) GetClosedTimestamp( + ctx context.Context, rangeID roachpb.RangeID, +) (hlc.Timestamp, ctpb.LAI) { + r.mu.RLock() + defer r.mu.RUnlock() + info, ok := r.mu.tracked[rangeID] + if !ok { + return hlc.Timestamp{}, 0 + } + return r.mu.lastClosed[info.policy], info.lai +} + +// processUpdate processes one update received on the stream, updating the local +// state. +func (r *incomingStream) processUpdate(ctx context.Context, msg *ctpb.Update) { + log.VEventf(ctx, 4, "received side-transport update: %v", msg) + + if msg.NodeID == 0 { + log.Fatalf(ctx, "missing NodeID in message: %s", msg) + } + + if msg.NodeID != r.nodeID { + log.Fatalf(ctx, "wrong NodeID; expected %d, got %d", r.nodeID, msg.NodeID) + } + + // Handle the removed ranges. In order to not lose closed ts info, before we + // can remove a range from our tracking, we copy the info about its closed + // timestamp to the local replica(s). Note that it's important to do this + // before updating lastClosed below since, by definition, the closed + // timestamps in this message don't apply to the Removed ranges. + if len(msg.Removed) != 0 { + // Note that we call r.stores.ForwardSideTransportClosedTimestampForRange while holding + // our read lock, not write lock. ForwardSideTransportClosedTimestampForRange will call + // into each Replica, telling it to hold on locally to the the info we're about to + // remove from the stream. We can't do this with the mutex write-locked + // because replicas call GetClosedTimestamp() independently, with r.mu held + // (=> deadlock). + r.mu.RLock() + for _, rangeID := range msg.Removed { + info, ok := r.mu.tracked[rangeID] + if !ok { + log.Fatalf(ctx, "attempting to unregister a missing range: r%d", rangeID) + } + r.stores.ForwardSideTransportClosedTimestampForRange( + ctx, rangeID, r.mu.lastClosed[info.policy], info.lai) + } + r.mu.RUnlock() + } + + r.mu.Lock() + defer r.mu.Unlock() + + // Reset all the state on snapshots. 
+ if msg.Snapshot { + for i := range r.mu.lastClosed { + r.mu.lastClosed[i] = hlc.Timestamp{} + } + r.mu.tracked = make(map[roachpb.RangeID]trackedRange, len(r.mu.tracked)) + } else if msg.SeqNum != r.mu.lastSeqNum+1 { + log.Fatalf(ctx, "expected closed timestamp side-transport message with sequence number "+ + "%d, got %d", r.mu.lastSeqNum+1, msg.SeqNum) + } + r.mu.lastSeqNum = msg.SeqNum + + for _, rng := range msg.AddedOrUpdated { + r.mu.tracked[rng.RangeID] = trackedRange{ + lai: rng.LAI, + policy: rng.Policy, + } + } + for _, rangeID := range msg.Removed { + delete(r.mu.tracked, rangeID) + } + for _, update := range msg.ClosedTimestamps { + r.mu.lastClosed[update.Policy] = update.ClosedTimestamp + } +} + +// Run handles an incoming stream of closed timestamps. +func (r *incomingStream) Run( + ctx context.Context, + stopper *stop.Stopper, + // The gRPC stream with incoming messages. + stream ctpb.SideTransport_PushUpdatesServer, +) error { + // We have to do the stream processing on a separate goroutine because Recv() + // is blocking, with no way to interrupt it other than returning from the RPC + // handler (i.e. this Run function). + // The main goroutine remains in charge of listening for stopper quiescence. + streamDone := make(chan struct{}) + if err := stopper.RunAsyncTask(ctx, "closedts side-transport server conn", func(ctx context.Context) { + // On exit, signal the other goroutine to terminate. + defer close(streamDone) + for { + msg, err := stream.Recv() + if err != nil { + if fn := r.testingKnobs.onRecvErr; fn != nil { + fn(r.nodeID, err) + } + r.server.onRecvErr(ctx, r.nodeID, err) + return + } + + if r.nodeID == 0 { + r.nodeID = msg.NodeID + + if err := r.server.onFirstMsg(ctx, r, r.nodeID); err != nil { + log.Warningf(ctx, "%s", err.Error()) + return + } else if ch := r.testingKnobs.onFirstMsg; ch != nil { + ch <- struct{}{} + } + if !msg.Snapshot { + log.Fatal(ctx, "expected the first message to be a snapshot") + } + } + + r.processUpdate(ctx, msg) + if ch := r.testingKnobs.onMsg; ch != nil { + select { + case ch <- msg: + default: + } + } + } + }); err != nil { + return err + } + + // Block until the client terminates (or there's another stream error) or + // the stopper signals us to bail. + select { + case <-streamDone: + case <-stopper.ShouldQuiesce(): + } + // Returning causes a blocked stream.Recv() (if there still is one) to return. 
+ return nil +} + +func (r *incomingStream) String() string { + r.mu.Lock() + defer r.mu.Unlock() + var s strings.Builder + s.WriteString(fmt.Sprintf("n%d closed timestamps: ", r.nodeID)) + now := timeutil.Now() + rangesByPoicy := make(map[roachpb.RangeClosedTimestampPolicy]*strings.Builder) + for pol, ts := range r.mu.lastClosed { + if pol != 0 { + s.WriteString(", ") + } + policy := roachpb.RangeClosedTimestampPolicy(pol) + s.WriteString(fmt.Sprintf("%s: %s (lead/lag: %s)", policy, ts, now.Sub(ts.GoTime()))) + rangesByPoicy[policy] = &strings.Builder{} + } + s.WriteRune('\n') + for rid, info := range r.mu.tracked { + rangesByPoicy[info.policy].WriteString(fmt.Sprintf("%d, ", rid)) + } + first := true + for policy, sb := range rangesByPoicy { + if !first { + s.WriteRune('\n') + } else { + first = false + } + s.WriteString(fmt.Sprintf("%s tracked: %s", policy, sb.String())) + } + return s.String() +} diff --git a/pkg/kv/kvserver/closedts/sidetransport/receiver_test.go b/pkg/kv/kvserver/closedts/sidetransport/receiver_test.go new file mode 100644 index 000000000000..b33b3baca0d6 --- /dev/null +++ b/pkg/kv/kvserver/closedts/sidetransport/receiver_test.go @@ -0,0 +1,218 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sidetransport + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/stretchr/testify/require" +) + +type mockStores struct { + recording []rangeUpdate + sem chan struct{} +} + +type rangeUpdate struct { + rid roachpb.RangeID + closedTS hlc.Timestamp + lai ctpb.LAI +} + +var _ Stores = &mockStores{} + +func (m *mockStores) ForwardSideTransportClosedTimestampForRange( + ctx context.Context, rangeID roachpb.RangeID, closedTS hlc.Timestamp, lai ctpb.LAI, +) { + upd := rangeUpdate{ + rid: rangeID, + closedTS: closedTS, + lai: lai, + } + m.recording = append(m.recording, upd) + if m.sem != nil { + m.sem <- struct{}{} + <-m.sem + } +} + +func (m *mockStores) getAndClearRecording() []rangeUpdate { + res := m.recording + m.recording = nil + return res +} + +var ts10 = hlc.Timestamp{WallTime: 10} +var ts11 = hlc.Timestamp{WallTime: 11} +var ts12 = hlc.Timestamp{WallTime: 12} +var ts20 = hlc.Timestamp{WallTime: 20, Synthetic: true} +var ts21 = hlc.Timestamp{WallTime: 21, Synthetic: true} +var ts22 = hlc.Timestamp{WallTime: 22, Synthetic: true} +var laiZero = ctpb.LAI(0) + +const lai100 = ctpb.LAI(100) +const lai101 = ctpb.LAI(101) +const lai102 = ctpb.LAI(102) +const lai103 = ctpb.LAI(102) + +func TestIncomingStreamProcessUpdateBasic(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + nid := &base.NodeIDContainer{} + nid.Set(ctx, 1) + stores := &mockStores{} + server := NewReceiver(nid, stopper, stores, receiverTestingKnobs{}) + r := newIncomingStream(server, stores) + 
r.nodeID = 1 + + msg := &ctpb.Update{ + NodeID: 1, + SeqNum: 1, + Snapshot: true, + ClosedTimestamps: []ctpb.Update_GroupUpdate{ + {Policy: roachpb.LAG_BY_CLUSTER_SETTING, ClosedTimestamp: ts10}, + {Policy: roachpb.LEAD_FOR_GLOBAL_READS, ClosedTimestamp: ts20}, + }, + AddedOrUpdated: []ctpb.Update_RangeUpdate{ + {RangeID: 1, LAI: lai100, Policy: roachpb.LAG_BY_CLUSTER_SETTING}, + {RangeID: 2, LAI: lai101, Policy: roachpb.LAG_BY_CLUSTER_SETTING}, + {RangeID: 3, LAI: lai102, Policy: roachpb.LEAD_FOR_GLOBAL_READS}, + }, + Removed: nil, + } + r.processUpdate(ctx, msg) + ts, lai := r.GetClosedTimestamp(ctx, 1) + require.Equal(t, ts10, ts) + require.Equal(t, lai100, lai) + ts, lai = r.GetClosedTimestamp(ctx, 2) + require.Equal(t, ts10, ts) + require.Equal(t, lai101, lai) + ts, lai = r.GetClosedTimestamp(ctx, 3) + require.Equal(t, ts20, ts) + require.Equal(t, lai102, lai) + require.Empty(t, stores.getAndClearRecording()) + + // Remove range 1, update 2 implicitly, update 3 explicitly. + msg = &ctpb.Update{ + NodeID: 1, + SeqNum: 2, + Snapshot: false, + ClosedTimestamps: []ctpb.Update_GroupUpdate{ + {Policy: roachpb.LAG_BY_CLUSTER_SETTING, ClosedTimestamp: ts11}, + {Policy: roachpb.LEAD_FOR_GLOBAL_READS, ClosedTimestamp: ts21}, + }, + AddedOrUpdated: []ctpb.Update_RangeUpdate{ + {RangeID: 3, LAI: lai103, Policy: roachpb.LEAD_FOR_GLOBAL_READS}, + }, + Removed: []roachpb.RangeID{1}, + } + r.processUpdate(ctx, msg) + ts, lai = r.GetClosedTimestamp(ctx, 1) + require.Empty(t, ts) + require.Equal(t, laiZero, lai) + ts, lai = r.GetClosedTimestamp(ctx, 2) + require.Equal(t, ts11, ts) + require.Equal(t, lai101, lai) + ts, lai = r.GetClosedTimestamp(ctx, 3) + require.Equal(t, ts21, ts) + require.Equal(t, lai103, lai) + require.Equal(t, []rangeUpdate{{rid: 1, closedTS: ts10, lai: lai100}}, stores.getAndClearRecording()) + + // Send a snapshot and check that it rests all the state. + msg = &ctpb.Update{ + NodeID: 1, + SeqNum: 3, + Snapshot: true, + ClosedTimestamps: []ctpb.Update_GroupUpdate{ + {Policy: roachpb.LAG_BY_CLUSTER_SETTING, ClosedTimestamp: ts12}, + {Policy: roachpb.LEAD_FOR_GLOBAL_READS, ClosedTimestamp: ts22}, + }, + AddedOrUpdated: []ctpb.Update_RangeUpdate{ + {RangeID: 3, LAI: lai102, Policy: roachpb.LEAD_FOR_GLOBAL_READS}, + {RangeID: 4, LAI: lai100, Policy: roachpb.LAG_BY_CLUSTER_SETTING}, + }, + Removed: nil, + } + r.processUpdate(ctx, msg) + ts, lai = r.GetClosedTimestamp(ctx, 2) + require.Empty(t, ts) + require.Equal(t, laiZero, lai) + ts, lai = r.GetClosedTimestamp(ctx, 3) + require.Equal(t, ts22, ts) + require.Equal(t, lai102, lai) + ts, lai = r.GetClosedTimestamp(ctx, 4) + require.Equal(t, ts12, ts) + require.Equal(t, lai100, lai) + require.Empty(t, stores.getAndClearRecording()) +} + +// Test that when the incomingStream calls into the Stores to update a range, it +// doesn't hold its internal lock. Or, in other words, test that replicas can +// call into the stream while the stream is blocked updating the stores. In +// particular, the replica being updated might be calling into the stream to get +// its closed timestamp (async, for another operation), and it'd better not +// deadlock. 
+func TestIncomingStreamCallsIntoStoresDontHoldLock(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + nid := &base.NodeIDContainer{} + nid.Set(ctx, 1) + stores := &mockStores{} + server := NewReceiver(nid, stopper, stores, receiverTestingKnobs{}) + r := newIncomingStream(server, stores) + r.nodeID = 1 + + // Add a range to the stream. + msg := &ctpb.Update{ + NodeID: 1, SeqNum: 1, Snapshot: true, + ClosedTimestamps: []ctpb.Update_GroupUpdate{ + {Policy: roachpb.LEAD_FOR_GLOBAL_READS, ClosedTimestamp: ts10}, + }, + AddedOrUpdated: []ctpb.Update_RangeUpdate{ + {RangeID: 1, LAI: lai100, Policy: roachpb.LEAD_FOR_GLOBAL_READS}, + }, + Removed: nil, + } + r.processUpdate(ctx, msg) + + // Remove the range and block the removal in the Stores. + ch := make(chan struct{}) + stores.sem = ch + msg = &ctpb.Update{ + NodeID: 1, SeqNum: 2, Snapshot: false, + Removed: []roachpb.RangeID{1}, + } + go r.processUpdate(ctx, msg) + // Wait for the processUpdate to block. + <-ch + // With the update blocked, call into the stream. We're testing that this + // doesn't deadlock. + ts, _ := r.GetClosedTimestamp(ctx, 1) + require.Equal(t, ts10, ts) + // Unblock the process. + ch <- struct{}{} +} diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender.go b/pkg/kv/kvserver/closedts/sidetransport/sender.go index b53860df6e02..1b691c43208f 100644 --- a/pkg/kv/kvserver/closedts/sidetransport/sender.go +++ b/pkg/kv/kvserver/closedts/sidetransport/sender.go @@ -62,16 +62,7 @@ type Sender struct { trackedMu struct { syncutil.Mutex - // lastSeqNum is the sequence number of the last message published. - lastSeqNum ctpb.SeqNum - // lastClosed is the closed timestamp published for each policy in the - // last message. - lastClosed [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp - // tracked maintains the information that was communicated to connections in - // the last sent message (implicitly or explicitly). A range enters this - // structure as soon as it's included in a message, and exits it when it's - // removed through Update.Removed. - tracked map[roachpb.RangeID]trackedRange + streamState } leaseholdersMu struct { @@ -90,6 +81,22 @@ type Sender struct { conns map[roachpb.NodeID]conn } +// streamState encapsulates the state that's tracked by a stream. Both the +// Sender and the Receiver use this struct and, for a given stream, both ends +// are supposed to correspond (modulo message delays), in wonderful symmetry. +type streamState struct { + // lastSeqNum is the sequence number of the last message published. + lastSeqNum ctpb.SeqNum + // lastClosed is the closed timestamp published for each policy in the + // last message. + lastClosed [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp + // tracked maintains the information that was communicated to connections in + // the last sent message (implicitly or explicitly). A range enters this + // structure as soon as it's included in a message, and exits it when it's + // removed through Update.Removed. + tracked map[roachpb.RangeID]trackedRange +} + type connTestingKnobs struct { beforeSend func(destNodeID roachpb.NodeID, msg *ctpb.Update) } @@ -166,6 +173,7 @@ func newSenderWithConnFactory( // This is not know at construction time. 
func (s *Sender) Run(ctx context.Context, nodeID roachpb.NodeID) { s.nodeID = nodeID + waitForUpgrade := !s.st.Version.IsActive(ctx, clusterversion.ClosedTimestampsRaftTransport) confCh := make(chan struct{}, 1) confChanged := func() { @@ -197,8 +205,11 @@ func (s *Sender) Run(ctx context.Context, nodeID roachpb.NodeID) { select { case <-timer.C: timer.Read = true - if !s.st.Version.IsActive(ctx, clusterversion.ClosedTimestampsRaftTransport) { + if waitForUpgrade && !s.st.Version.IsActive(ctx, clusterversion.ClosedTimestampsRaftTransport) { continue + } else if waitForUpgrade { + waitForUpgrade = false + log.Infof(ctx, "closed-timestamps v2 mechanism enabled by cluster version upgrade") } s.publish(ctx) case <-confCh: @@ -252,6 +263,7 @@ func (s *Sender) UnregisterLeaseholder( func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp { s.trackedMu.Lock() defer s.trackedMu.Unlock() + log.VEventf(ctx, 2, "side-transport publishing a new message") msg := &ctpb.Update{ NodeID: s.nodeID, @@ -383,6 +395,7 @@ func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp { }) // Publish the new message to all connections. + log.VEventf(ctx, 4, "side-transport publishing message with closed timestamps: %v (%v)", msg.ClosedTimestamps, msg) s.buf.Push(ctx, msg) // Return the publication time, for tests. @@ -608,7 +621,8 @@ type rpcConn struct { nodeID roachpb.NodeID testingKnobs connTestingKnobs - stream ctpb.SideTransport_PushUpdatesClient + stream ctpb.SideTransport_PushUpdatesClient + lastSent ctpb.SeqNum // cancelStreamCtx cleans up the resources (goroutine) associated with stream. // It needs to be called whenever stream is discarded. cancelStreamCtx context.CancelFunc @@ -638,6 +652,11 @@ func (r *rpcConn) cleanupStream() { r.stream = nil r.cancelStreamCtx() r.cancelStreamCtx = nil + // If we've been disconnected, reset the message sequence. If we ever + // reconnect, we'll ask the buffer for message 1, which was a snapshot. + // Generally, the buffer is not going to have that message any more and so + // we'll generate a new snapshot. + r.lastSent = 0 } // close makes the connection stop sending messages. The run() goroutine will @@ -684,7 +703,6 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) { // On sending errors, we sleep a bit as to not spin on a tripped // circuit-breaker in the Dialer. const sleepOnErr = time.Second - var lastSent ctpb.SeqNum for { if ctx.Err() != nil { return @@ -698,19 +716,8 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) { } var msg *ctpb.Update - - // If we've been disconnected, reset the message sequence. We'll ask the - // buffer for the very first message ever, which was a snapshot. - // Generally, the buffer is not going to have that message any more and - // so we'll generate a snapshot below. Except soon after startup when - // streams are initially established, when the initial message should - // still be in the buffer. - if r.stream == nil { - lastSent = 0 - } - var ok bool - msg, ok = r.producer.buf.GetBySeq(ctx, lastSent+1) + msg, ok = r.producer.buf.GetBySeq(ctx, r.lastSent+1) // We can be signaled to stop in two ways: the buffer can be closed (in // which case all connections must exit), or this connection was closed // via close(). In either case, we quit. @@ -723,20 +730,21 @@ func (r *rpcConn) run(ctx context.Context, stopper *stop.Stopper) { } if msg == nil { - // The sequence number we've requested is no longer in the buffer. 
- // We need to generate a snapshot in order to re-initialize the - // stream. + // The sequence number we've requested is no longer in the buffer. We + // need to generate a snapshot in order to re-initialize the stream. + // The snapshot will give us the sequence number to use for future + // incrementals. msg = r.producer.GetSnapshot() } + r.lastSent = msg.SeqNum - lastSent = msg.SeqNum if fn := r.testingKnobs.beforeSend; fn != nil { fn(r.nodeID, msg) } if err := r.stream.Send(msg); err != nil { if err != io.EOF && everyN.ShouldLog() { log.Warningf(ctx, "failed to send closed timestamp message %d to n%d: %s", - lastSent, r.nodeID, err) + r.lastSent, r.nodeID, err) } // Keep track of the fact that we need a new connection. // diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender_test.go b/pkg/kv/kvserver/closedts/sidetransport/sender_test.go index f9a8bee1b226..f5d60d97068f 100644 --- a/pkg/kv/kvserver/closedts/sidetransport/sender_test.go +++ b/pkg/kv/kvserver/closedts/sidetransport/sender_test.go @@ -12,10 +12,12 @@ package sidetransport import ( "context" + "fmt" "net" "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -23,8 +25,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" "google.golang.org/grpc" ) @@ -33,22 +37,45 @@ import ( type mockReplica struct { storeID roachpb.StoreID rangeID roachpb.RangeID - desc roachpb.RangeDescriptor + mu struct { + syncutil.Mutex + desc roachpb.RangeDescriptor + } canBump bool lai ctpb.LAI policy roachpb.RangeClosedTimestampPolicy } -func (m *mockReplica) StoreID() roachpb.StoreID { return m.storeID } -func (m *mockReplica) GetRangeID() roachpb.RangeID { return m.rangeID } -func (m *mockReplica) Desc() *roachpb.RangeDescriptor { return &m.desc } +var _ Replica = &mockReplica{} + +func (m *mockReplica) StoreID() roachpb.StoreID { return m.storeID } +func (m *mockReplica) GetRangeID() roachpb.RangeID { return m.rangeID } +func (m *mockReplica) Desc() *roachpb.RangeDescriptor { + m.mu.Lock() + defer m.mu.Unlock() + return &m.mu.desc +} func (m *mockReplica) BumpSideTransportClosed( _ context.Context, _ hlc.ClockTimestamp, _ [roachpb.MAX_CLOSED_TIMESTAMP_POLICY]hlc.Timestamp, ) (bool, ctpb.LAI, roachpb.RangeClosedTimestampPolicy) { return m.canBump, m.lai, m.policy } +func (m *mockReplica) removeReplica(nid roachpb.NodeID) { + m.mu.Lock() + defer m.mu.Unlock() + replicas := m.mu.desc.Replicas() + for _, rd := range replicas.Descriptors() { + if rd.NodeID == nid { + replicas.RemoveReplica(rd.NodeID, rd.StoreID) + m.mu.desc.SetReplicas(replicas) + return + } + } + panic(fmt.Sprintf("replica not found for n%d", nid)) +} + // mockConnFactory is a mock implementation of the connFactory interface. 
type mockConnFactory struct{} @@ -81,14 +108,15 @@ func newMockReplica(id roachpb.RangeID, nodes ...roachpb.NodeID) *mockReplica { for _, nodeID := range nodes { desc.AddReplica(nodeID, roachpb.StoreID(nodeID), roachpb.VOTER_FULL) } - return &mockReplica{ + r := &mockReplica{ storeID: 1, rangeID: id, - desc: desc, canBump: true, lai: 5, policy: roachpb.LAG_BY_CLUSTER_SETTING, } + r.mu.desc = desc + return r } func expGroupUpdates(s *Sender, now hlc.ClockTimestamp) []ctpb.Update_GroupUpdate { @@ -105,6 +133,7 @@ func expGroupUpdates(s *Sender, now hlc.ClockTimestamp) []ctpb.Update_GroupUpdat func TestSenderBasic(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() connFactory := &mockConnFactory{} s, stopper := newMockSender(connFactory) @@ -245,70 +274,106 @@ func (s *mockReceiver) getCalled() bool { return s.mu.called } -// mockSideTransportGRPCServer wraps a mockReceiver in a gRPC server listening -// on a network interface. -type mockSideTransportGRPCServer struct { +// sideTransportGRPCServer wraps a Receiver (a real one of a mock) in a gRPC +// server listening on a network interface. +type sideTransportGRPCServer struct { lis net.Listener srv *grpc.Server - receiver *mockReceiver - stopper *stop.Stopper + receiver ctpb.SideTransportServer } -func (s *mockSideTransportGRPCServer) close() { - s.receiver.close() +func (s *sideTransportGRPCServer) Close() { s.srv.Stop() _ /* err */ = s.lis.Close() - s.stopper.Stop(context.Background()) } -func (s *mockSideTransportGRPCServer) addr() net.Addr { +func (s *sideTransportGRPCServer) addr() net.Addr { return s.lis.Addr() } -func newMockSideTransportGRPCServer() (*mockSideTransportGRPCServer, error) { +func newMockSideTransportGRPCServer(stopper *stop.Stopper) (*sideTransportGRPCServer, error) { + receiver := newMockReceiver() + stopper.AddCloser(receiver) + server, err := newMockSideTransportGRPCServerWithOpts(stopper, receiver) + if err != nil { + return nil, err + } + return server, nil +} + +func newMockSideTransportGRPCServerWithOpts( + stopper *stop.Stopper, receiver ctpb.SideTransportServer, +) (*sideTransportGRPCServer, error) { lis, err := net.Listen("tcp", "localhost:") if err != nil { return nil, err } - stopper := stop.NewStopper() clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) grpcServer := rpc.NewServer(rpc.NewInsecureTestingContext(clock, stopper)) - - receiver := newMockReceiver() ctpb.RegisterSideTransportServer(grpcServer, receiver) go func() { _ /* err */ = grpcServer.Serve(lis) }() - return &mockSideTransportGRPCServer{ + server := &sideTransportGRPCServer{ lis: lis, srv: grpcServer, receiver: receiver, - stopper: stopper, - }, nil + } + stopper.AddCloser(server) + return server, nil +} + +func (s *sideTransportGRPCServer) mockReceiver() *mockReceiver { + return s.receiver.(*mockReceiver) } -func (s *mockReceiver) close() { +func (s *mockReceiver) Close() { close(s.stop) } type mockDialer struct { - serverAddr string - mu struct { + mu struct { syncutil.Mutex + addrs map[roachpb.NodeID]string conns []*grpc.ClientConn } } var _ nodeDialer = &mockDialer{} +type nodeAddr struct { + nid roachpb.NodeID + addr string +} + +func newMockDialer(addrs ...nodeAddr) *mockDialer { + d := &mockDialer{} + d.mu.addrs = make(map[roachpb.NodeID]string) + for _, addr := range addrs { + d.mu.addrs[addr.nid] = addr.addr + } + return d +} + +func (m *mockDialer) addOrUpdateNode(nid roachpb.NodeID, addr string) { + m.mu.Lock() + defer m.mu.Unlock() + m.mu.addrs[nid] = addr +} + func (m 
*mockDialer) Dial( ctx context.Context, nodeID roachpb.NodeID, class rpc.ConnectionClass, ) (_ *grpc.ClientConn, _ error) { m.mu.Lock() defer m.mu.Unlock() - c, err := grpc.Dial(m.serverAddr, grpc.WithInsecure()) + addr, ok := m.mu.addrs[nodeID] + if !ok { + return nil, errors.Errorf("node not configured in mockDialer: n%d", nodeID) + } + + c, err := grpc.Dial(addr, grpc.WithInsecure()) if err == nil { m.mu.conns = append(m.mu.conns, c) } @@ -326,12 +391,17 @@ func (m *mockDialer) Close() { // Test that the stopper quiescence interrupts a stream.Send. func TestRPCConnUnblocksOnStopper(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() - srv, err := newMockSideTransportGRPCServer() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + srv, err := newMockSideTransportGRPCServer(stopper) require.NoError(t, err) - defer srv.close() - dialer := &mockDialer{serverAddr: srv.addr().String()} + dialer := newMockDialer(nodeAddr{ + nid: 2, + addr: srv.addr().String(), + }) defer dialer.Close() ch := make(chan struct{}) @@ -376,5 +446,72 @@ func TestRPCConnUnblocksOnStopper(t *testing.T) { // have been unblocked. stopper.Stop(ctx) - require.True(t, srv.receiver.getCalled()) + require.True(t, srv.mockReceiver().getCalled()) +} + +// Test a Sender and Receiver talking gRPC to each other. +func TestSenderReceiverIntegration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + // We're going to create Receivers, corresponding to 3 nodes. Node 1 will also + // be the Sender, so we won't expect a connection to it (the Sender doesn't + // connect to itself). + const numNodes = 3 + receivers := make([]*Receiver, numNodes) + dialer := newMockDialer(nodeAddr{}) + defer dialer.Close() + incomingStreamOnN2FromN1Terminated := make(chan error) + for i := 0; i < numNodes; i++ { + receiverStop := stop.NewStopper() + defer func(i int) { + receiverStop.Stop(ctx) + }(i) + nid := &base.NodeIDContainer{} + nid.Set(ctx, roachpb.NodeID(i+1)) + stores := &mockStores{} + knobs := receiverTestingKnobs{ + roachpb.NodeID(1): { + onFirstMsg: make(chan struct{}), + onMsg: make(chan *ctpb.Update), + }, + } + incomingFromN1Knobs := knobs[1] + switch nid.Get() { + case 1: + // n1 doesn't expect any streams, since the only active sender will be on + // n1 and it's not supposed to connect to the local receiver. + incomingFromN1Knobs.onRecvErr = func(_ roachpb.NodeID, _ error) { + t.Errorf("unexpected receive error on node n%d", nid) + } + case 2: + // n2 gets a special handler. + incomingFromN1Knobs.onRecvErr = func(_ roachpb.NodeID, err error) { + incomingStreamOnN2FromN1Terminated <- err + } + } + knobs[1] = incomingFromN1Knobs + receivers[i] = NewReceiver(nid, receiverStop, stores, knobs) + srv, err := newMockSideTransportGRPCServerWithOpts(receiverStop, receivers[i]) + dialer.addOrUpdateNode(nid.Get(), srv.addr().String()) + require.NoError(t, err) + } + + s, senderStopper := newMockSender(newRPCConnFactory(dialer, connTestingKnobs{})) + defer senderStopper.Stop(ctx) + s.Run(ctx, roachpb.NodeID(1)) + + // Add a replica with replicas on n2 and n3. + r1 := newMockReplica(15, 1, 2, 3) + s.RegisterLeaseholder(ctx, r1, 1 /* leaseSeq */) + // Check that connections to n2,3 are established. 
+ <-receivers[1].testingKnobs[1].onFirstMsg + <-receivers[2].testingKnobs[1].onFirstMsg + // Remove one of the replicas and check that the connection to the respective + // Receiver drops (since there's no other ranges with replicas on n2). + r1.removeReplica(roachpb.NodeID(2)) + <-incomingStreamOnN2FromN1Terminated + // Check that the other Receiver is still receiving updates. + <-receivers[2].testingKnobs[1].onMsg } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 1f48e3c67d9b..054c139a1c92 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1218,6 +1218,9 @@ func (r *Replica) checkExecutionCanProceed( r.maybeExtendLeaseAsync(ctx, st) } }() + var update replicaUpdate + // When we're done, apply the update (if any) after releasing r.mu. + defer update.apply(ctx, r) now := r.Clock().NowAsClockTimestamp() rSpan, err := keys.Range(ba.Requests) @@ -1274,7 +1277,9 @@ func (r *Replica) checkExecutionCanProceed( // If not, can we serve this request on a follower? // TODO(nvanbenschoten): once we make this check cheaper // than leaseGoodToGoRLocked, invert these checks. - if !r.canServeFollowerReadRLocked(ctx, ba, err) { + var ok bool + ok, update = r.canServeFollowerReadRLocked(ctx, ba, err) + if !ok { return st, err } err = nil // ignore error diff --git a/pkg/kv/kvserver/replica_closedts.go b/pkg/kv/kvserver/replica_closedts.go index 5bfc5619e782..3173bfe12b67 100644 --- a/pkg/kv/kvserver/replica_closedts.go +++ b/pkg/kv/kvserver/replica_closedts.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" ) // EmitMLAI registers the replica's last assigned max lease index with the @@ -150,3 +151,36 @@ func (r *Replica) closedTimestampTargetRLocked() hlc.Timestamp { policy := r.closedTimestampPolicyRLocked() return closedts.TargetForPolicy(now, maxClockOffset, lagTargetDuration, policy) } + +// ForwardSideTransportClosedTimestamp forwards +// r.mu.sideTransportClosedTimestamp. It is called by the closed timestamp +// side-transport receiver. +func (r *Replica) ForwardSideTransportClosedTimestamp( + ctx context.Context, closedTS hlc.Timestamp, lai ctpb.LAI, +) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.mu.sideTransportClosedTimestamp.Forward(closedTS) { + if r.mu.sideTransportCloseTimestampLAI > lai { + log.Fatalf(ctx, "received side-transport notification with higher closed timestamp "+ + "but lower LAI: r%d current LAI: %d received LAI: %d", + r.RangeID, r.mu.sideTransportCloseTimestampLAI, lai) + } + r.mu.sideTransportCloseTimestampLAI = lai + } +} + +// getSideTransportClosedTimestamp returns the replica's information about the +// timestamp that was closed by the side-transport. Note that this not include +// r.mu.state.RaftClosedTimestamp. Also note that this might not be the highest +// closed timestamp communicated by the side-transport - the +// ClosedTimestampReceiver should be checked too if an up-to-date value is +// required. +// +// It's the responsibility of the caller to check the returned LAI against the +// replica's applied LAI. If the returned LAI hasn't applied, the closed +// timestamp cannot be used. 
+func (r *Replica) getSideTransportClosedTimestampRLocked() (closedTS hlc.Timestamp, lai ctpb.LAI) { + return r.mu.sideTransportClosedTimestamp, r.mu.sideTransportCloseTimestampLAI +} diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index 650b233cedee..965c97ff7245 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -52,6 +52,22 @@ func BatchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool { return ba.Txn != nil && ba.IsAllTransactional() && ba.IsReadOnly() && !ba.IsLocking() } +// replicaUpdate contains updates to be applied to a replica. It's intended to +// be returned by functions holding r.mu in reader mode, to be applied later +// when the mutex can be taken in write mode. +type replicaUpdate struct { + sideTransportClosedTimestamp hlc.Timestamp + sideTransportClosedLAI ctpb.LAI +} + +// apply copies the information into the replica. This cannot be called with r.mu held. +func (u replicaUpdate) apply(ctx context.Context, r *Replica) { + if u == (replicaUpdate{}) { + return + } + r.ForwardSideTransportClosedTimestamp(ctx, u.sideTransportClosedTimestamp, u.sideTransportClosedLAI) +} + // canServeFollowerReadRLocked tests, when a range lease could not be acquired, // whether the batch can be served as a follower read despite the error. Only // non-locking, read-only requests can be served as follower reads. The batch @@ -59,7 +75,7 @@ func BatchCanBeEvaluatedOnFollower(ba roachpb.BatchRequest) bool { // accepted as a follower read. func (r *Replica) canServeFollowerReadRLocked( ctx context.Context, ba *roachpb.BatchRequest, err error, -) bool { +) (bool, replicaUpdate) { var lErr *roachpb.NotLeaseHolderError eligible := errors.As(err, &lErr) && lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch && @@ -68,23 +84,23 @@ func (r *Replica) canServeFollowerReadRLocked( if !eligible { // We couldn't do anything with the error, propagate it. - return false + return false, replicaUpdate{} } repDesc, err := r.getReplicaDescriptorRLocked() if err != nil { - return false + return false, replicaUpdate{} } switch typ := repDesc.GetType(); typ { case roachpb.VOTER_FULL, roachpb.VOTER_INCOMING, roachpb.NON_VOTER: default: log.Eventf(ctx, "%s replicas cannot serve follower reads", typ) - return false + return false, replicaUpdate{} } requiredFrontier := ba.Txn.RequiredFrontier() - maxClosed, _ := r.maxClosedRLocked(ctx) + maxClosed, _, update := r.maxClosedRLocked(ctx, requiredFrontier /* sufficient */) canServeFollowerRead := requiredFrontier.LessEq(maxClosed) tsDiff := requiredFrontier.GoTime().Sub(maxClosed.GoTime()) if !canServeFollowerRead { @@ -108,7 +124,7 @@ func (r *Replica) canServeFollowerReadRLocked( r.store.cfg.ClosedTimestamp.Storage.(*ctstorage.MultiStorage).StringForNodes(lErr.LeaseHolder.NodeID), ) } - return false + return false, update } // This replica can serve this read! @@ -117,7 +133,7 @@ func (r *Replica) canServeFollowerReadRLocked( // serve reads for that and smaller timestamps forever. log.Eventf(ctx, "%s; query timestamp below closed timestamp by %s", kvbase.FollowerReadServingMsg, -tsDiff) r.store.metrics.FollowerReadsCount.Inc(1) - return true + return true, update } // maxClosed returns the maximum closed timestamp for this range. @@ -131,31 +147,96 @@ func (r *Replica) canServeFollowerReadRLocked( // uses an expiration-based lease. Expiration-based leases do not support the // closed timestamp subsystem. 
A zero-value timestamp will be returned if ok // is false. +// +// TODO(andrei): Remove the bool retval once we remove the old closed timestamp +// mechanism. func (r *Replica) maxClosed(ctx context.Context) (_ hlc.Timestamp, ok bool) { r.mu.RLock() - defer r.mu.RUnlock() - return r.maxClosedRLocked(ctx) + res, ok, update := r.maxClosedRLocked(ctx, hlc.Timestamp{} /* sufficient */) + r.mu.RUnlock() + update.apply(ctx, r) + return res, ok } -func (r *Replica) maxClosedRLocked(ctx context.Context) (_ hlc.Timestamp, ok bool) { - lai := r.mu.state.LeaseAppliedIndex - lease := *r.mu.state.Lease +// maxClosedRLocked is like maxClosed, except that it requires r.mu to be +// rlocked. It also optionally takes a hint: if sufficient is not +// empty, maxClosedRLocked might return a timestamp that's lower than the +// maximum closed timestamp that we know about, as long as the returned +// timestamp is still >= sufficient. This is a performance optimization because +// we can avoid consulting the ClosedTimestampReceiver. +func (r *Replica) maxClosedRLocked( + ctx context.Context, sufficient hlc.Timestamp, +) (_ hlc.Timestamp, ok bool, _ replicaUpdate) { + appliedLAI := ctpb.LAI(r.mu.state.LeaseAppliedIndex) + lease := r.mu.state.Lease initialMaxClosed := r.mu.initialMaxClosed replicaStateClosed := r.mu.state.RaftClosedTimestamp + // Consider the timestamp closed through the side-transport. Such a timestamp + // can be in two places: + // - r.mu.sideTransportClosedTimestamp + // - in the sidetransport.Receiver + // We check the former here. We check the latter further down, only if we have + // to. + var sideTransportClosed hlc.Timestamp + sideTransportClosedMaybe, minLAI := r.getSideTransportClosedTimestampRLocked() + // We can use sideTransportClosedMaybe if we've applied at least up to minLAI. + // The replica could in theory maintain more information about what lower + // timestamps the side transport had closed with lower LAIs, but we don't + // bother. + replicationBehind := appliedLAI < minLAI + if !replicationBehind { + sideTransportClosed = sideTransportClosedMaybe + } + // TODO(andrei): In 21.1 we added support for closed timestamps on ranges with + // expiration-based leases. Once the old closed timestamp transport is gone in + // 21.2, this can go away. if lease.Expiration != nil { - return hlc.Timestamp{}, false + return hlc.Timestamp{}, false, replicaUpdate{} } // Look at the legacy closed timestamp propagation mechanism. maxClosed := r.store.cfg.ClosedTimestamp.Provider.MaxClosed( - lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), ctpb.LAI(lai)) + lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), appliedLAI) maxClosed.Forward(lease.Start.ToTimestamp()) maxClosed.Forward(initialMaxClosed) // Look at the "new" closed timestamp propagation mechanism. maxClosed.Forward(replicaStateClosed) + maxClosed.Forward(sideTransportClosed) + + // If the closed timestamp we know so far is sufficient, we return early + // without consulting the ClosedTimestampReceiver. + if !sufficient.IsEmpty() && sufficient.LessEq(maxClosed) { + return maxClosed, true, replicaUpdate{} + } + + // We now look at sidetransport.Receiver, unless replicationBehind was set; + // the LAIs in the Receiver are >= the one returned by + // getSideTransportClosedTimestampRLocked(), so there's no point in even + // checking. + var update replicaUpdate + // In some tests the lease can be empty, or the ClosedTimestampReceiver might + // not be set. 
+ // TODO(andrei): Remove the ClosedTimestampReceiver == nil protection once the + // multiTestContext goes away. + if !replicationBehind && !lease.Empty() && r.store.cfg.ClosedTimestampReceiver != nil { + otherSideTransportClosed, otherSideTransportLAI := + r.store.cfg.ClosedTimestampReceiver.GetClosedTimestamp(ctx, r.RangeID, lease.Replica.NodeID) + if appliedLAI < otherSideTransportLAI { + otherSideTransportClosed = hlc.Timestamp{} + } + // If otherSideTransportClosed ends up winning, we return it in update so + // that the caller copies it into the Replica. Hopefully, future calls with + // `sufficient` set don't need to go to the Receiver for a while. + if maxClosed.Forward(otherSideTransportClosed) { + update = replicaUpdate{ + sideTransportClosedTimestamp: otherSideTransportClosed, + sideTransportClosedLAI: otherSideTransportLAI, + } + } + } - return maxClosed, true + return maxClosed, true, update } // ClosedTimestampV2 returns the closed timestamp. Unlike MaxClosedTimestamp, it diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 689e278479e9..8e05b7f547d3 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -465,9 +465,15 @@ func (r *Replica) handleInvalidLeaseError( // not be necessary once we break the dependency between closed timestamps // and leases and address the TODO in checkExecutionCanProceed to check the // closed timestamp before consulting the lease. + + var update replicaUpdate + defer update.apply(ctx, r) + r.mu.RLock() defer r.mu.RUnlock() - if r.canServeFollowerReadRLocked(ctx, ba, pErr.GoError()) { + var ok bool + ok, update = r.canServeFollowerReadRLocked(ctx, ba, pErr.GoError()) + if ok { // Follower read possible. Retry command. return nil } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index baeae3ae0931..30cff8dd7d1e 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -649,8 +649,9 @@ type StoreConfig struct { RPCContext *rpc.Context RangeDescriptorCache *rangecache.RangeCache - ClosedTimestamp *container.Container - ClosedTimestampSender *sidetransport.Sender + ClosedTimestamp *container.Container + ClosedTimestampSender *sidetransport.Sender + ClosedTimestampReceiver *sidetransport.Receiver // SQLExecutor is used by the store to execute SQL statements. SQLExecutor sqlutil.InternalExecutor diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index 801aa6b80cd2..9d4bf5f5c9e4 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -119,6 +120,22 @@ func (ls *Stores) RemoveStore(s *Store) { ls.storeMap.Delete(int64(s.Ident.StoreID)) } +// ForwardSideTransportClosedTimestampForRange forwards the side-transport +// closed timestamp for the local replicas of the given range. 
+func (ls *Stores) ForwardSideTransportClosedTimestampForRange( + ctx context.Context, rangeID roachpb.RangeID, closedTS hlc.Timestamp, lai ctpb.LAI, +) { + if err := ls.VisitStores(func(s *Store) error { + r := s.GetReplicaIfExists(rangeID) + if r != nil { + r.ForwardSideTransportClosedTimestamp(ctx, closedTS, lai) + } + return nil + }); err != nil { + log.Fatalf(ctx, "unexpected error: %s", err) + } +} + // VisitStores implements a visitor pattern over stores in the // storeMap. The specified function is invoked with each store in // turn. Care is taken to invoke the visitor func without the lock diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index b18ca98d0a42..2d91727a038a 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -72,6 +72,7 @@ go_library( "//pkg/kv/kvprober", "//pkg/kv/kvserver", "//pkg/kv/kvserver/closedts/container", + "//pkg/kv/kvserver/closedts/ctpb", "//pkg/kv/kvserver/closedts/sidetransport", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", diff --git a/pkg/server/node.go b/pkg/server/node.go index fc2c50571472..3e7162b71326 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -287,6 +287,7 @@ func NewNode( reg *metric.Registry, stopper *stop.Stopper, txnMetrics kvcoord.TxnMetrics, + stores *kvserver.Stores, execCfg *sql.ExecutorConfig, clusterID *base.ClusterIDContainer, ) *Node { @@ -299,7 +300,7 @@ func NewNode( stopper: stopper, recorder: recorder, metrics: makeNodeMetrics(reg, cfg.HistogramWindowInterval), - stores: kvserver.NewStores(cfg.AmbientCtx, cfg.Clock), + stores: stores, txnMetrics: txnMetrics, sqlExec: sqlExec, clusterID: clusterID, diff --git a/pkg/server/server.go b/pkg/server/server.go index 449b45712201..223d24ea4557 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -44,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvprober" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/container" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" @@ -482,6 +483,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { sTS := ts.MakeServer(cfg.AmbientCtx, tsDB, nodeCountFn, cfg.TimeSeriesServerConfig, stopper) ctSender := sidetransport.NewSender(stopper, st, clock, nodeDialer) + stores := kvserver.NewStores(cfg.AmbientCtx, clock) + ctReceiver := sidetransport.NewReceiver(nodeIDContainer, stopper, stores, nil /* testingKnobs */) // The InternalExecutor will be further initialized later, as we create more // of the server's components. There's a circular dependency - many things @@ -537,6 +540,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { RangeDescriptorCache: distSender.RangeDescriptorCache(), TimeSeriesDataStore: tsDB, ClosedTimestampSender: ctSender, + ClosedTimestampReceiver: ctReceiver, // Initialize the closed timestamp subsystem. 
Note that it won't // be ready until it is .Start()ed, but the grpc server can be @@ -585,12 +589,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { node := NewNode( storeCfg, recorder, registry, stopper, - txnMetrics, nil /* execCfg */, &rpcContext.ClusterID) + txnMetrics, stores, nil /* execCfg */, &rpcContext.ClusterID) lateBoundNode = node roachpb.RegisterInternalServer(grpcServer.Server, node) kvserver.RegisterPerReplicaServer(grpcServer.Server, node.perReplicaServer) kvserver.RegisterPerStoreServer(grpcServer.Server, node.perReplicaServer) node.storeCfg.ClosedTimestamp.RegisterClosedTimestampServer(grpcServer.Server) + ctpb.RegisterSideTransportServer(grpcServer.Server, ctReceiver) replicationReporter := reports.NewReporter( db, node.stores, storePool, st, nodeLiveness, internalExecutor)
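The patch above introduces a streamState that the side-transport Sender and Receiver are meant to keep in sync: one closed timestamp per policy group, plus the set of ranges (with their LAIs) to which those timestamps apply, updated by snapshot or sequence-numbered incremental ctpb.Update messages. The following minimal, self-contained Go sketch is not part of the patch; the names RangeID, LAI, Timestamp and Policy are simplified stand-ins for the roachpb/ctpb/hlc types, used only to illustrate how a receiver-side stream applies updates and answers GetClosedTimestamp-style queries.

package main

import "fmt"

// Simplified stand-ins for roachpb.RangeID, ctpb.LAI, hlc.Timestamp and
// roachpb.RangeClosedTimestampPolicy, so the sketch has no dependencies.
type (
    RangeID   int64
    LAI       int64
    Timestamp int64 // wall time only
    Policy    int
)

const (
    LagByClusterSetting Policy = iota
    LeadForGlobalReads
    numPolicies
)

// trackedRange mirrors the per-range state kept on a stream: the LAI that must
// be applied locally before the group closed timestamp is usable, and the
// policy group the range belongs to.
type trackedRange struct {
    lai    LAI
    policy Policy
}

// streamState mirrors the state kept symmetrically by the Sender and the
// Receiver: one closed timestamp per policy, plus the ranges it applies to.
type streamState struct {
    lastSeqNum int64
    lastClosed [numPolicies]Timestamp
    tracked    map[RangeID]trackedRange
}

// update is a cut-down ctpb.Update: either a snapshot or an incremental.
type update struct {
    seqNum           int64
    snapshot         bool
    closedTimestamps map[Policy]Timestamp
    addedOrUpdated   map[RangeID]trackedRange
    removed          []RangeID
}

// apply mirrors the shape of incomingStream.processUpdate: a snapshot resets
// all state, while an incremental must arrive in sequence; removed ranges stop
// being tracked, and future group timestamps no longer apply to them.
func (s *streamState) apply(u update) error {
    if u.snapshot {
        s.lastClosed = [numPolicies]Timestamp{}
        s.tracked = make(map[RangeID]trackedRange)
    } else if u.seqNum != s.lastSeqNum+1 {
        return fmt.Errorf("expected seq %d, got %d", s.lastSeqNum+1, u.seqNum)
    }
    s.lastSeqNum = u.seqNum
    for r, tr := range u.addedOrUpdated {
        s.tracked[r] = tr
    }
    for _, r := range u.removed {
        delete(s.tracked, r)
    }
    for p, ts := range u.closedTimestamps {
        s.lastClosed[p] = ts
    }
    return nil
}

// getClosedTimestamp mirrors incomingStream.GetClosedTimestamp: a range's
// closed timestamp is its policy group's timestamp, usable only once the
// returned LAI has been applied locally.
func (s *streamState) getClosedTimestamp(r RangeID) (Timestamp, LAI, bool) {
    tr, ok := s.tracked[r]
    if !ok {
        return 0, 0, false
    }
    return s.lastClosed[tr.policy], tr.lai, true
}

func main() {
    var s streamState
    _ = s.apply(update{
        seqNum: 1, snapshot: true,
        closedTimestamps: map[Policy]Timestamp{LagByClusterSetting: 10, LeadForGlobalReads: 20},
        addedOrUpdated: map[RangeID]trackedRange{
            1: {lai: 100, policy: LagByClusterSetting},
            2: {lai: 101, policy: LeadForGlobalReads},
        },
    })
    // Incremental: bump both group timestamps and drop r1 without re-listing r2.
    _ = s.apply(update{
        seqNum:           2,
        closedTimestamps: map[Policy]Timestamp{LagByClusterSetting: 11, LeadForGlobalReads: 21},
        removed:          []RangeID{1},
    })
    fmt.Println(s.getClosedTimestamp(2)) // 21 101 true
    fmt.Println(s.getClosedTimestamp(1)) // 0 0 false: r1 was removed from the stream
}

The design point this sketch captures is that closed timestamps are published once per policy group rather than once per range, so incremental messages stay small: a range only reappears in addedOrUpdated when its LAI or policy changes. The sketch deliberately omits one step the real receiver performs: before dropping a removed range, it hands the range's current closed timestamp off to the local replicas via Stores.ForwardSideTransportClosedTimestampForRange, as shown in the receiver.go hunk above.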