Skip to content

Commit

Permalink
kvserver: add closedts side-transport consumer
Browse files Browse the repository at this point in the history
Add the consumer of closed timestamps communicated by the side transport
(i.e. the gRPC server for our new push-based streaming protocol).

This side-transport consumer accumulates closed timestamps communicated
to it by other nodes (the leaseholders of the respective ranges). Its
state is queried whenever a range needs a higher closed timestamp than
what it has locally in the Replica state, at which point the Replica's
state is lazily updated.

Release note: None
Release justification: Needed for GLOBAL tables.
  • Loading branch information
andreimatei committed Mar 5, 2021
1 parent 05a7bec commit e8f925e
Show file tree
Hide file tree
Showing 18 changed files with 969 additions and 93 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func TestClosedTimestampWorksWhenRequestsAreSentToNonLeaseHolders(t *testing.T)
const closeInterval = 10 * time.Millisecond
sqlRunner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '"+
closeInterval.String()+"'")
sqlRunner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '"+
closeInterval.String()+"'")

// To make node3 have a large epoch, synthesize a liveness record for with
// epoch 1000 before starting the node.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ func TestStoreMaxBehindNanosOnlyTracksEpochBasedLeases(t *testing.T) {
closedTimestampDuration.String())
tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.close_fraction = $1",
closedTimestampFraction)
tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = $1",
closedTimestampDuration.String())

// Let's get to a point where we know that we have an expiration based lease
// with a start time more than some time ago and then we have a max closed
Expand Down
17 changes: 9 additions & 8 deletions pkg/kv/kvserver/closedts/ctpb/service.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/kv/kvserver/closedts/ctpb/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ message Update {
}
repeated GroupUpdate closed_timestamps = 4 [(gogoproto.nullable) = false];

// removed contains the set of ranges that are no longer registered on the
// stream and who future updates are no longer applicable to.
// removed contains the set of ranges that are no longer tracked on this
// stream. The closed timestamps in this message and future messages no longer
// apply to these removed ranges.
//
// The field will be empty if snapshot is true, as a snapshot message implies
// that all ranges not present in the snapshot's added_or_updated list are no
Expand Down
15 changes: 13 additions & 2 deletions pkg/kv/kvserver/closedts/sidetransport/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "sidetransport",
srcs = ["sender.go"],
srcs = [
"receiver.go",
"sender.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
Expand All @@ -19,24 +23,31 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@org_golang_google_grpc//:go_default_library",
],
)

go_test(
name = "sidetransport_test",
srcs = ["sender_test.go"],
srcs = [
"receiver_test.go",
"sender_test.go",
],
embed = [":sidetransport"],
deps = [
"//pkg/base",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/settings/cluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
],
Expand Down
Loading

0 comments on commit e8f925e

Please sign in to comment.