diff --git a/cloud/kubernetes/bring-your-own-certs/client.yaml b/cloud/kubernetes/bring-your-own-certs/client.yaml
index 76930e17d005..ff9d929acb96 100644
--- a/cloud/kubernetes/bring-your-own-certs/client.yaml
+++ b/cloud/kubernetes/bring-your-own-certs/client.yaml
@@ -20,7 +20,7 @@ spec:
serviceAccountName: cockroachdb
containers:
- name: cockroachdb-client
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
# Keep a pod open indefinitely so kubectl exec can be used to get a shell to it
# and run cockroach client commands, such as cockroach sql, cockroach node status, etc.
command:
diff --git a/cloud/kubernetes/bring-your-own-certs/cockroachdb-statefulset.yaml b/cloud/kubernetes/bring-your-own-certs/cockroachdb-statefulset.yaml
index 4a2caef477b3..a68c6e2acc13 100644
--- a/cloud/kubernetes/bring-your-own-certs/cockroachdb-statefulset.yaml
+++ b/cloud/kubernetes/bring-your-own-certs/cockroachdb-statefulset.yaml
@@ -153,7 +153,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/client-secure.yaml b/cloud/kubernetes/client-secure.yaml
index 45ec196dc487..dd0de00eea11 100644
--- a/cloud/kubernetes/client-secure.yaml
+++ b/cloud/kubernetes/client-secure.yaml
@@ -32,7 +32,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cockroachdb-client
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/cluster-init-secure.yaml b/cloud/kubernetes/cluster-init-secure.yaml
index 0a22ad3a0f7c..96c373991528 100644
--- a/cloud/kubernetes/cluster-init-secure.yaml
+++ b/cloud/kubernetes/cluster-init-secure.yaml
@@ -34,7 +34,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/cluster-init.yaml b/cloud/kubernetes/cluster-init.yaml
index 6ae55e7c7661..c4da71a1c465 100644
--- a/cloud/kubernetes/cluster-init.yaml
+++ b/cloud/kubernetes/cluster-init.yaml
@@ -10,7 +10,7 @@ spec:
spec:
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
command:
- "/cockroach/cockroach"
diff --git a/cloud/kubernetes/cockroachdb-statefulset-secure.yaml b/cloud/kubernetes/cockroachdb-statefulset-secure.yaml
index 4255dba708d1..dfdad65d6626 100644
--- a/cloud/kubernetes/cockroachdb-statefulset-secure.yaml
+++ b/cloud/kubernetes/cockroachdb-statefulset-secure.yaml
@@ -195,7 +195,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/cockroachdb-statefulset.yaml b/cloud/kubernetes/cockroachdb-statefulset.yaml
index 03e4bebd3c59..a8ddb94fda2f 100644
--- a/cloud/kubernetes/cockroachdb-statefulset.yaml
+++ b/cloud/kubernetes/cockroachdb-statefulset.yaml
@@ -98,7 +98,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/multiregion/client-secure.yaml b/cloud/kubernetes/multiregion/client-secure.yaml
index 34ef54960bbf..aa6c0c117d91 100644
--- a/cloud/kubernetes/multiregion/client-secure.yaml
+++ b/cloud/kubernetes/multiregion/client-secure.yaml
@@ -9,7 +9,7 @@ spec:
serviceAccountName: cockroachdb
containers:
- name: cockroachdb-client
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/multiregion/cluster-init-secure.yaml b/cloud/kubernetes/multiregion/cluster-init-secure.yaml
index fb95c0c04610..a8cb164afc01 100644
--- a/cloud/kubernetes/multiregion/cluster-init-secure.yaml
+++ b/cloud/kubernetes/multiregion/cluster-init-secure.yaml
@@ -11,7 +11,7 @@ spec:
serviceAccountName: cockroachdb
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/multiregion/cockroachdb-statefulset-secure.yaml b/cloud/kubernetes/multiregion/cockroachdb-statefulset-secure.yaml
index 6335666ebb55..369052ec4692 100644
--- a/cloud/kubernetes/multiregion/cockroachdb-statefulset-secure.yaml
+++ b/cloud/kubernetes/multiregion/cockroachdb-statefulset-secure.yaml
@@ -167,7 +167,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
ports:
- containerPort: 26257
diff --git a/cloud/kubernetes/multiregion/eks/cockroachdb-statefulset-secure-eks.yaml b/cloud/kubernetes/multiregion/eks/cockroachdb-statefulset-secure-eks.yaml
index 6f0642a52f67..62920152fca8 100644
--- a/cloud/kubernetes/multiregion/eks/cockroachdb-statefulset-secure-eks.yaml
+++ b/cloud/kubernetes/multiregion/eks/cockroachdb-statefulset-secure-eks.yaml
@@ -185,7 +185,7 @@ spec:
name: cockroach-env
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/performance/cockroachdb-daemonset-insecure.yaml b/cloud/kubernetes/performance/cockroachdb-daemonset-insecure.yaml
index 6918d3cfebd1..407a4b4de9c8 100644
--- a/cloud/kubernetes/performance/cockroachdb-daemonset-insecure.yaml
+++ b/cloud/kubernetes/performance/cockroachdb-daemonset-insecure.yaml
@@ -82,7 +82,7 @@ spec:
hostNetwork: true
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
# TODO: If you configured taints to give CockroachDB exclusive access to nodes, feel free
# to remove the requests and limits sections. If you didn't, you'll need to change these to
diff --git a/cloud/kubernetes/performance/cockroachdb-daemonset-secure.yaml b/cloud/kubernetes/performance/cockroachdb-daemonset-secure.yaml
index fe357858fbc1..fa6ad936163a 100644
--- a/cloud/kubernetes/performance/cockroachdb-daemonset-secure.yaml
+++ b/cloud/kubernetes/performance/cockroachdb-daemonset-secure.yaml
@@ -198,7 +198,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
# TODO: If you configured taints to give CockroachDB exclusive access to nodes, feel free
# to remove the requests and limits sections. If you didn't, you'll need to change these to
diff --git a/cloud/kubernetes/performance/cockroachdb-statefulset-insecure.yaml b/cloud/kubernetes/performance/cockroachdb-statefulset-insecure.yaml
index 6d69c0176b6f..335ff1229134 100644
--- a/cloud/kubernetes/performance/cockroachdb-statefulset-insecure.yaml
+++ b/cloud/kubernetes/performance/cockroachdb-statefulset-insecure.yaml
@@ -141,7 +141,7 @@ spec:
- name: cockroachdb
# NOTE: Always use the most recent version of CockroachDB for the best
# performance and reliability.
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/performance/cockroachdb-statefulset-secure.yaml b/cloud/kubernetes/performance/cockroachdb-statefulset-secure.yaml
index 112331e0fa48..2573b771583b 100644
--- a/cloud/kubernetes/performance/cockroachdb-statefulset-secure.yaml
+++ b/cloud/kubernetes/performance/cockroachdb-statefulset-secure.yaml
@@ -232,7 +232,7 @@ spec:
- name: cockroachdb
# NOTE: Always use the most recent version of CockroachDB for the best
# performance and reliability.
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/v1.6/client-secure.yaml b/cloud/kubernetes/v1.6/client-secure.yaml
index 63ec1c98614e..aac217e7066b 100644
--- a/cloud/kubernetes/v1.6/client-secure.yaml
+++ b/cloud/kubernetes/v1.6/client-secure.yaml
@@ -32,7 +32,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cockroachdb-client
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/v1.6/cluster-init-secure.yaml b/cloud/kubernetes/v1.6/cluster-init-secure.yaml
index 9124fcad638e..a71793fff22d 100644
--- a/cloud/kubernetes/v1.6/cluster-init-secure.yaml
+++ b/cloud/kubernetes/v1.6/cluster-init-secure.yaml
@@ -34,7 +34,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/v1.6/cluster-init.yaml b/cloud/kubernetes/v1.6/cluster-init.yaml
index 3a9f671bb171..228cabd45323 100644
--- a/cloud/kubernetes/v1.6/cluster-init.yaml
+++ b/cloud/kubernetes/v1.6/cluster-init.yaml
@@ -10,7 +10,7 @@ spec:
spec:
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
command:
- "/cockroach/cockroach"
diff --git a/cloud/kubernetes/v1.6/cockroachdb-statefulset-secure.yaml b/cloud/kubernetes/v1.6/cockroachdb-statefulset-secure.yaml
index dbbf6c7fa1c7..c8c76839723d 100644
--- a/cloud/kubernetes/v1.6/cockroachdb-statefulset-secure.yaml
+++ b/cloud/kubernetes/v1.6/cockroachdb-statefulset-secure.yaml
@@ -178,7 +178,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
ports:
- containerPort: 26257
diff --git a/cloud/kubernetes/v1.6/cockroachdb-statefulset.yaml b/cloud/kubernetes/v1.6/cockroachdb-statefulset.yaml
index dc46d55174c6..7d6f00080c78 100644
--- a/cloud/kubernetes/v1.6/cockroachdb-statefulset.yaml
+++ b/cloud/kubernetes/v1.6/cockroachdb-statefulset.yaml
@@ -81,7 +81,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
ports:
- containerPort: 26257
diff --git a/cloud/kubernetes/v1.7/client-secure.yaml b/cloud/kubernetes/v1.7/client-secure.yaml
index 87fbf021bd6e..a99425c59ffc 100644
--- a/cloud/kubernetes/v1.7/client-secure.yaml
+++ b/cloud/kubernetes/v1.7/client-secure.yaml
@@ -32,7 +32,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cockroachdb-client
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/v1.7/cluster-init-secure.yaml b/cloud/kubernetes/v1.7/cluster-init-secure.yaml
index e493ef72becd..9e0d7439ecfb 100644
--- a/cloud/kubernetes/v1.7/cluster-init-secure.yaml
+++ b/cloud/kubernetes/v1.7/cluster-init-secure.yaml
@@ -34,7 +34,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/v1.7/cluster-init.yaml b/cloud/kubernetes/v1.7/cluster-init.yaml
index 7925337105ec..123081e9865f 100644
--- a/cloud/kubernetes/v1.7/cluster-init.yaml
+++ b/cloud/kubernetes/v1.7/cluster-init.yaml
@@ -10,7 +10,7 @@ spec:
spec:
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
command:
- "/cockroach/cockroach"
diff --git a/cloud/kubernetes/v1.7/cockroachdb-statefulset-secure.yaml b/cloud/kubernetes/v1.7/cockroachdb-statefulset-secure.yaml
index 329dcb369c1c..57bfea164c2f 100644
--- a/cloud/kubernetes/v1.7/cockroachdb-statefulset-secure.yaml
+++ b/cloud/kubernetes/v1.7/cockroachdb-statefulset-secure.yaml
@@ -190,7 +190,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
ports:
- containerPort: 26257
diff --git a/cloud/kubernetes/v1.7/cockroachdb-statefulset.yaml b/cloud/kubernetes/v1.7/cockroachdb-statefulset.yaml
index 87cf5bc7893c..8cc94c63277c 100644
--- a/cloud/kubernetes/v1.7/cockroachdb-statefulset.yaml
+++ b/cloud/kubernetes/v1.7/cockroachdb-statefulset.yaml
@@ -93,7 +93,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v24.2.5
+ image: cockroachdb/cockroach:v24.3.0
imagePullPolicy: IfNotPresent
ports:
- containerPort: 26257
diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 1e96c7274080..a03d1d3dbe4c 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -1530,6 +1530,10 @@
APPLICATION | kv.protectedts.reconciliation.num_runs | number of successful reconciliation runs on this node | Count | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | kv.protectedts.reconciliation.records_processed | number of records processed without error during reconciliation on this node | Count | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | kv.protectedts.reconciliation.records_removed | number of records removed during reconciliation runs on this node | Count | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | kv.streamer.batches.in_progress | Number of BatchRequests in progress across all KV Streamer operators | Batches | GAUGE | COUNT | AVG | NONE |
+APPLICATION | kv.streamer.batches.sent | Number of BatchRequests sent across all KV Streamer operators | Batches | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | kv.streamer.batches.throttled | Number of BatchRequests currently being throttled due to reaching the concurrency limit, across all KV Streamer operators | Batches | GAUGE | COUNT | AVG | NONE |
+APPLICATION | kv.streamer.operators.active | Number of KV Streamer operators currently in use | Operators | GAUGE | COUNT | AVG | NONE |
APPLICATION | logical_replication.batch_hist_nanos | Time spent flushing a batch | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | logical_replication.catchup_ranges | Source side ranges undergoing catch up scans (innacurate with multiple LDR jobs) | Ranges | GAUGE | COUNT | AVG | NONE |
APPLICATION | logical_replication.catchup_ranges_by_label | Source side ranges undergoing catch up scans | Ranges | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index f45319a44f26..8eedf8d99ef0 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -2101,6 +2101,7 @@ GO_TARGETS = [
"//pkg/sql/regionliveness:regionliveness",
"//pkg/sql/regions:regions",
"//pkg/sql/regions:regions_test",
+ "//pkg/sql/rolemembershipcache:rolemembershipcache",
"//pkg/sql/roleoption:roleoption",
"//pkg/sql/row:row",
"//pkg/sql/row:row_test",
diff --git a/pkg/cmd/reduce/main.go b/pkg/cmd/reduce/main.go
index 214a99233f5c..73e10cee44ff 100644
--- a/pkg/cmd/reduce/main.go
+++ b/pkg/cmd/reduce/main.go
@@ -47,6 +47,8 @@ var (
}()
flags = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
binary = flags.String("binary", "./cockroach", "path to cockroach binary")
+ httpPort = flags.Int("http-port", 8080, "first port number for HTTP servers in demo")
+ sqlPort = flags.Int("sql-port", 26257, "first port number for SQL servers in demo")
file = flags.String("file", "", "the path to a file containing SQL queries to reduce; required")
outFlag = flags.String("out", "", "if set, the path to a new file where reduced result will be written to")
verbose = flags.Bool("v", false, "print progress to standard output and the original test case output if it is not interesting")
@@ -117,8 +119,8 @@ func main() {
}
reducesql.LogUnknown = *unknown
out, err := reduceSQL(
- *binary, *contains, file, *workers, *verbose, *chunkReductions, *multiRegion,
- *tlp, *costfuzz, *unoptimizedOracle,
+ *binary, *httpPort, *sqlPort, *contains, file, *workers, *verbose,
+ *chunkReductions, *multiRegion, *tlp, *costfuzz, *unoptimizedOracle,
)
if err != nil {
log.Fatal(err)
@@ -136,7 +138,9 @@ func main() {
}
func reduceSQL(
- binary, contains string,
+ binary string,
+ httpPort, sqlPort int,
+ contains string,
file *string,
workers int,
verbose bool,
@@ -292,28 +296,22 @@ SELECT '%[1]s';
}
isInteresting := func(ctx context.Context, sql string) (interesting bool, logOriginalHint func()) {
- // If not multi-region, disable license generation. Do not exit on errors so
- // the entirety of the input SQL is processed.
- var cmd *exec.Cmd
+ args := []string{
+ "demo",
+ "--insecure",
+ "--empty",
+ // Do not exit on errors so the entirety of the input SQL is
+ // processed.
+ "--set=errexit=false",
+ "--format=tsv",
+ fmt.Sprintf("--http-port=%d", httpPort),
+ fmt.Sprintf("--sql-port=%d", sqlPort),
+ }
if multiRegion {
- cmd = exec.CommandContext(ctx, binary,
- "demo",
- "--insecure",
- "--empty",
- "--nodes=9",
- "--multitenant=false",
- "--set=errexit=false",
- "--format=tsv",
- )
- } else {
- cmd = exec.CommandContext(ctx, binary,
- "demo",
- "--insecure",
- "--empty",
- "--set=errexit=false",
- "--format=tsv",
- )
+ args = append(args, "--nodes=9")
+ args = append(args, "--multitenant=false")
}
+ cmd := exec.CommandContext(ctx, binary, args...)
// Disable telemetry.
cmd.Env = []string{"COCKROACH_SKIP_ENABLING_DIAGNOSTIC_REPORTING", "true"}
sql = settings + sql
diff --git a/pkg/cmd/roachtest/clusterstats/exporter.go b/pkg/cmd/roachtest/clusterstats/exporter.go
index 9e4dbbc7cf02..052855014409 100644
--- a/pkg/cmd/roachtest/clusterstats/exporter.go
+++ b/pkg/cmd/roachtest/clusterstats/exporter.go
@@ -144,13 +144,13 @@ func serializeOpenmetricsReport(r ClusterStatRun, labelString *string) (*bytes.B
// Emit summary metrics from Total
for metricName, value := range r.Total {
- buffer.WriteString(fmt.Sprintf("# TYPE %s gauge\n", util.SanitizeMetricName(metricName)))
+ buffer.WriteString(GetOpenmetricsGaugeType(metricName))
buffer.WriteString(fmt.Sprintf("%s{%s} %f %d\n", util.SanitizeMetricName(metricName), *labelString, value, timeutil.Now().UTC().Unix()))
}
// Emit histogram metrics from Stats
for _, stat := range r.Stats {
- buffer.WriteString(fmt.Sprintf("# TYPE %s gauge\n", util.SanitizeMetricName(stat.Tag)))
+ buffer.WriteString(GetOpenmetricsGaugeType(stat.Tag))
for i, timestamp := range stat.Time {
t := timeutil.Unix(0, timestamp)
buffer.WriteString(
@@ -445,3 +445,7 @@ func GetOpenmetricsLabelMap(
}
return defaultMap
}
+
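+// GetOpenmetricsGaugeType returns the OpenMetrics "# TYPE <name> gauge"
+// header line for the sanitized metric name.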
+func GetOpenmetricsGaugeType(metricName string) string {
+ return fmt.Sprintf("# TYPE %s gauge\n", util.SanitizeMetricName(metricName))
+}
diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel
index 8d71854934f0..686f4ff5931d 100644
--- a/pkg/cmd/roachtest/tests/BUILD.bazel
+++ b/pkg/cmd/roachtest/tests/BUILD.bazel
@@ -215,6 +215,7 @@ go_library(
"//pkg/cloud/amazon",
"//pkg/cloud/gcp",
"//pkg/cmd/cmpconn",
+ "//pkg/cmd/roachprod-microbench/util",
"//pkg/cmd/roachtest/cluster",
"//pkg/cmd/roachtest/clusterstats",
"//pkg/cmd/roachtest/grafana",
diff --git a/pkg/cmd/roachtest/tests/sysbench.go b/pkg/cmd/roachtest/tests/sysbench.go
index c7342690ff27..ecba1f68eaca 100644
--- a/pkg/cmd/roachtest/tests/sysbench.go
+++ b/pkg/cmd/roachtest/tests/sysbench.go
@@ -7,6 +7,7 @@ package tests
import (
"bufio"
+ "bytes"
"context"
"encoding/json"
"fmt"
@@ -15,7 +16,9 @@ import (
"strings"
"time"
+ "github.com/cockroachdb/cockroach/pkg/cmd/roachprod-microbench/util"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
+ "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
@@ -193,7 +196,7 @@ func runSysbench(ctx context.Context, t test.Test, c cluster.Cluster, opts sysbe
}
t.Status("exporting results")
- return exportSysbenchResults(t, result.Stdout, start)
+ return exportSysbenchResults(t, c, result.Stdout, start, opts)
}
if opts.usePostgres {
if err := runWorkload(ctx); err != nil {
@@ -281,12 +284,20 @@ type sysbenchMetrics struct {
Reconnects string `json:"reconnects"`
}
-// exportSysbenchResults parses the output of `sysbench` into a JSON file
-// and writes it to the perf directory that roachperf expects. Sysbench does
-// have a way to customize the report output via injecting a custom
-// `sysbench.hooks.report_intermediate` hook, but then we would lose the
-// human-readable output in the test itself.
-func exportSysbenchResults(t test.Test, result string, start time.Time) error {
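+// openmetricsValues is a single sample for one sysbench time series: the raw
+// value and its unix timestamp in seconds.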
+type openmetricsValues struct {
+ Value string
+ Time int64
+}
+
+// exportSysbenchResults parses the output of `sysbench` into a stats
+// file and writes it to the perf directory that roachperf expects. The
+// format of the stats file is dependent on t.ExportOpenmetrics().
+// Sysbench does have a way to customize the report output via injecting
+// a custom `sysbench.hooks.report_intermediate` hook, but then we would
+// lose the human-readable output in the test itself.
+func exportSysbenchResults(
+ t test.Test, c cluster.Cluster, result string, start time.Time, opts sysbenchOptions,
+) error {
// Parse the results into a JSON file that roachperf understands.
// The output of the results look like:
// 1. Start up information.
@@ -309,6 +320,46 @@ func exportSysbenchResults(t test.Test, result string, start time.Time) error {
var snapshotsFound int
s := bufio.NewScanner(strings.NewReader(result))
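+	// Labels attached to each series when exporting openmetrics.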
+ labels := map[string]string{
+ "distribution": opts.distribution,
+ "duration": fmt.Sprintf("%f", opts.duration.Seconds()),
+ "concurrency": fmt.Sprintf("%d", opts.concurrency),
+ "table": fmt.Sprintf("%d", opts.tables),
+ "rows-per-table": fmt.Sprintf("%d", opts.rowsPerTable),
+ "use-postgres": fmt.Sprintf("%t", opts.usePostgres),
+ }
+ labelString := clusterstats.GetOpenmetricsLabelString(t, c, labels)
+ openmetricsMap := make(map[string][]openmetricsValues)
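+	// tick records one parsed sysbench report line, either as openmetrics
+	// samples (openmetricsMap) or as a JSON line (metricBytes), and advances
+	// the timestamp by one second.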
+ tick := func(fields []string, qpsByType []string) error {
+ snapshotTick := sysbenchMetrics{
+ Time: start.Unix(),
+ Threads: fields[1],
+ Transactions: fields[3],
+ Qps: fields[5],
+ ReadQps: qpsByType[0],
+ WriteQps: qpsByType[1],
+ OtherQps: qpsByType[2],
+ P95Latency: fields[10],
+ Errors: fields[12],
+ Reconnects: fields[14],
+ }
+
+ if t.ExportOpenmetrics() {
+ addCurrentSnapshotToOpenmetrics(snapshotTick, openmetricsMap)
+ } else {
+ var snapshotTickBytes []byte
+ snapshotTickBytes, err = json.Marshal(snapshotTick)
+ if err != nil {
+ return errors.Errorf("error marshaling metrics")
+ }
+ snapshotTickBytes = append(snapshotTickBytes, []byte("\n")...)
+ metricBytes = append(metricBytes, snapshotTickBytes...)
+ }
+
+ start = start.Add(time.Second)
+ return nil
+ }
+
for s.Scan() {
if matched := regex.MatchString(s.Text()); !matched {
continue
@@ -328,26 +379,11 @@ func exportSysbenchResults(t test.Test, result string, start time.Time) error {
if len(qpsByType) != 3 {
return errors.Errorf("QPS metrics output in unexpected format, expected 3 fields got: %d", len(qpsByType))
}
- snapshotTick := sysbenchMetrics{
- Time: start.Unix(),
- Threads: fields[1],
- Transactions: fields[3],
- Qps: fields[5],
- ReadQps: qpsByType[0],
- WriteQps: qpsByType[1],
- OtherQps: qpsByType[2],
- P95Latency: fields[10],
- Errors: fields[12],
- Reconnects: fields[14],
- }
- snapshotTickBytes, err := json.Marshal(snapshotTick)
- if err != nil {
- return errors.Errorf("error marshaling metrics")
+ if err := tick(fields, qpsByType); err != nil {
+ return err
}
- metricBytes = append(metricBytes, snapshotTickBytes...)
- metricBytes = append(metricBytes, []byte("\n")...)
- start = start.Add(time.Second)
+
}
// Guard against the possibility that the format changed and we no longer
// get any output.
@@ -363,5 +399,40 @@ func exportSysbenchResults(t test.Test, result string, start time.Time) error {
return err
}
- return os.WriteFile(fmt.Sprintf("%s/stats.json", perfDir), metricBytes, 0666)
+ if t.ExportOpenmetrics() {
+ metricBytes = getOpenmetricsBytes(openmetricsMap, labelString)
+ }
+ return os.WriteFile(fmt.Sprintf("%s/%s", perfDir, roachtestutil.GetBenchmarkMetricsFileName(t)), metricBytes, 0666)
+}
+
+// addCurrentSnapshotToOpenmetrics appends each field of the sysbench snapshot to its corresponding series in openmetricsMap.
+func addCurrentSnapshotToOpenmetrics(
+ metrics sysbenchMetrics, openmetricsMap map[string][]openmetricsValues,
+) {
+ time := metrics.Time
+ openmetricsMap["threads"] = append(openmetricsMap["threads"], openmetricsValues{Value: metrics.Threads, Time: time})
+ openmetricsMap["transactions"] = append(openmetricsMap["transactions"], openmetricsValues{Value: metrics.Transactions, Time: time})
+ openmetricsMap["qps"] = append(openmetricsMap["qps"], openmetricsValues{Value: metrics.Qps, Time: time})
+ openmetricsMap["read_qps"] = append(openmetricsMap["read_qps"], openmetricsValues{Value: metrics.ReadQps, Time: time})
+ openmetricsMap["write_qps"] = append(openmetricsMap["write_qps"], openmetricsValues{Value: metrics.WriteQps, Time: time})
+ openmetricsMap["other_qps"] = append(openmetricsMap["other_qps"], openmetricsValues{Value: metrics.OtherQps, Time: time})
+ openmetricsMap["p95_latency"] = append(openmetricsMap["p95_latency"], openmetricsValues{Value: metrics.P95Latency, Time: time})
+ openmetricsMap["errors"] = append(openmetricsMap["errors"], openmetricsValues{Value: metrics.Errors, Time: time})
+ openmetricsMap["reconnects"] = append(openmetricsMap["reconnects"], openmetricsValues{Value: metrics.Reconnects, Time: time})
+}
+
+// getOpenmetricsBytes serializes openmetricsMap into OpenMetrics text format for writing to the stats file.
+func getOpenmetricsBytes(openmetricsMap map[string][]openmetricsValues, labelString string) []byte {
+ metricsBuf := bytes.NewBuffer([]byte{})
+ for key, values := range openmetricsMap {
+ metricName := util.SanitizeMetricName(key)
+ metricsBuf.WriteString(clusterstats.GetOpenmetricsGaugeType(metricName))
+ for _, value := range values {
+ metricsBuf.WriteString(fmt.Sprintf("%s{%s} %s %d\n", metricName, labelString, value.Value, value.Time))
+ }
+ }
+
+ // Add # EOF at the end for openmetrics
+ metricsBuf.WriteString("# EOF\n")
+ return metricsBuf.Bytes()
}
diff --git a/pkg/kv/kvclient/kvstreamer/BUILD.bazel b/pkg/kv/kvclient/kvstreamer/BUILD.bazel
index 2c46e9b9dd10..5df59942ed37 100644
--- a/pkg/kv/kvclient/kvstreamer/BUILD.bazel
+++ b/pkg/kv/kvclient/kvstreamer/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
srcs = [
"avg_response_estimator.go",
"budget.go",
+ "metrics.go",
"requests_provider.go",
"results_buffer.go",
"size.go",
@@ -30,6 +31,7 @@ go_library(
"//pkg/util/buildutil",
"//pkg/util/humanizeutil",
"//pkg/util/log",
+ "//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/stop",
diff --git a/pkg/kv/kvclient/kvstreamer/metrics.go b/pkg/kv/kvclient/kvstreamer/metrics.go
new file mode 100644
index 000000000000..269b420a763d
--- /dev/null
+++ b/pkg/kv/kvclient/kvstreamer/metrics.go
@@ -0,0 +1,55 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package kvstreamer
+
+import "github.com/cockroachdb/cockroach/pkg/util/metric"
+
+var (
+ metaStreamerCount = metric.Metadata{
+ Name: "kv.streamer.operators.active",
+ Help: "Number of KV Streamer operators currently in use",
+ Measurement: "Operators",
+ Unit: metric.Unit_COUNT,
+ }
+ metaBatchesSent = metric.Metadata{
+ Name: "kv.streamer.batches.sent",
+ Help: "Number of BatchRequests sent across all KV Streamer operators",
+ Measurement: "Batches",
+ Unit: metric.Unit_COUNT,
+ }
+ metaBatchesInProgress = metric.Metadata{
+ Name: "kv.streamer.batches.in_progress",
+ Help: "Number of BatchRequests in progress across all KV Streamer operators",
+ Measurement: "Batches",
+ Unit: metric.Unit_COUNT,
+ }
+ metaBatchesThrottled = metric.Metadata{
+ Name: "kv.streamer.batches.throttled",
+ Help: "Number of BatchRequests currently being throttled due to reaching the concurrency limit, across all KV Streamer operators",
+ Measurement: "Batches",
+ Unit: metric.Unit_COUNT,
+ }
+)
+
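+// Metrics contains the metrics tracked across all KV Streamer operators.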
+type Metrics struct {
+ OperatorsCount *metric.Gauge
+ BatchesSent *metric.Counter
+ BatchesInProgress *metric.Gauge
+ BatchesThrottled *metric.Gauge
+}
+
+var _ metric.Struct = Metrics{}
+
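+// MetricStruct implements the metric.Struct interface.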
+func (Metrics) MetricStruct() {}
+
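+// MakeMetrics instantiates the KV Streamer metrics.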
+func MakeMetrics() Metrics {
+ return Metrics{
+ OperatorsCount: metric.NewGauge(metaStreamerCount),
+ BatchesSent: metric.NewCounter(metaBatchesSent),
+ BatchesInProgress: metric.NewGauge(metaBatchesInProgress),
+ BatchesThrottled: metric.NewGauge(metaBatchesThrottled),
+ }
+}
diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go
index cff6af64457f..11218051b4ca 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer.go
@@ -217,6 +217,7 @@ func (r Result) Release(ctx context.Context) {
// TODO(yuzefovich): support pipelining of Enqueue and GetResults calls.
type Streamer struct {
distSender *kvcoord.DistSender
+ metrics *Metrics
stopper *stop.Stopper
// sd can be nil in tests.
sd *sessiondata.SessionData
@@ -378,6 +379,7 @@ type sendFn func(context.Context, *kvpb.BatchRequest) (*kvpb.BatchResponse, erro
// parameters of all received responses.
func NewStreamer(
distSender *kvcoord.DistSender,
+ metrics *Metrics,
stopper *stop.Stopper,
txn *kv.Txn,
sendFn func(context.Context, *kvpb.BatchRequest) (*kvpb.BatchResponse, error),
@@ -408,6 +410,7 @@ func NewStreamer(
}
s := &Streamer{
distSender: distSender,
+ metrics: metrics,
stopper: stopper,
sd: sd,
headOfLineOnlyFraction: headOfLineOnlyFraction,
@@ -415,6 +418,7 @@ func NewStreamer(
lockStrength: lockStrength,
lockDurability: lockDurability,
}
+ s.metrics.OperatorsCount.Inc(1)
if kvPairsRead == nil {
kvPairsRead = new(int64)
@@ -827,6 +831,7 @@ func (s *Streamer) Close(ctx context.Context) {
// exited.
s.results.close(ctx)
}
+ s.metrics.OperatorsCount.Dec(1)
*s = Streamer{}
}
@@ -891,7 +896,6 @@ func (w *workerCoordinator) mainLoop(ctx context.Context) {
return
}
- w.s.requestsToServe.Lock()
// The coordinator goroutine is the only one that removes requests from
// w.s.requestsToServe, so we can keep the reference to next request
// without holding the lock.
@@ -900,8 +904,11 @@ func (w *workerCoordinator) mainLoop(ctx context.Context) {
// issueRequestsForAsyncProcessing() another request with higher urgency
// is added; however, this is not a problem - we wait for available
// budget here on a best-effort basis.
- nextReq := w.s.requestsToServe.nextLocked()
- w.s.requestsToServe.Unlock()
+ nextReq := func() singleRangeBatch {
+ w.s.requestsToServe.Lock()
+ defer w.s.requestsToServe.Unlock()
+ return w.s.requestsToServe.nextLocked()
+ }()
// If we already have minTargetBytes set on the first request to be
// issued, then use that.
atLeastBytes := nextReq.minTargetBytes
@@ -1069,6 +1076,13 @@ func (w *workerCoordinator) getMaxNumRequestsToIssue(ctx context.Context) (_ int
}
// The whole quota is currently used up, so we blockingly acquire a quota of
// 1.
+ numBatches := func() int64 {
+ w.s.requestsToServe.Lock()
+ defer w.s.requestsToServe.Unlock()
+ return int64(w.s.requestsToServe.lengthLocked())
+ }()
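+	// Consider all pending batches throttled while we block acquiring quota.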
+ w.s.metrics.BatchesThrottled.Inc(numBatches)
+ defer w.s.metrics.BatchesThrottled.Dec(numBatches)
alloc, err := w.asyncSem.Acquire(ctx, 1)
if err != nil {
w.s.results.setError(err)
@@ -1359,6 +1373,10 @@ func (w *workerCoordinator) performRequestAsync(
},
func(ctx context.Context) {
defer w.asyncRequestCleanup(false /* budgetMuAlreadyLocked */)
+ w.s.metrics.BatchesSent.Inc(1)
+ w.s.metrics.BatchesInProgress.Inc(1)
+ defer w.s.metrics.BatchesInProgress.Dec(1)
+
ba := &kvpb.BatchRequest{}
ba.Header.WaitPolicy = w.lockWaitPolicy
ba.Header.TargetBytes = targetBytes
diff --git a/pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go b/pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go
index 0ee45c7b8741..889ac783a408 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go
@@ -80,8 +80,10 @@ func TestStreamerMemoryAccounting(t *testing.T) {
panic(err)
}
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
+ metrics := MakeMetrics()
s := NewStreamer(
s.DistSenderI().(*kvcoord.DistSender),
+ &metrics,
s.AppStopper(),
leafTxn,
func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go
index 275138f13ff3..25b3b63d8288 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer_test.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -50,8 +50,10 @@ func getStreamer(
panic(err)
}
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
+ metrics := kvstreamer.MakeMetrics()
return kvstreamer.NewStreamer(
s.DistSenderI().(*kvcoord.DistSender),
+ &metrics,
s.AppStopper(),
leafTxn,
func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
@@ -113,9 +115,11 @@ func TestStreamerLimitations(t *testing.T) {
})
t.Run("unexpected RootTxn", func(t *testing.T) {
+ metrics := kvstreamer.MakeMetrics()
require.Panics(t, func() {
kvstreamer.NewStreamer(
s.DistSenderI().(*kvcoord.DistSender),
+ &metrics,
s.AppStopper(),
kv.NewTxn(ctx, s.DB(), s.DistSQLPlanningNodeID()),
nil, /* sendFn */
diff --git a/pkg/kv/kvserver/batcheval/cmd_migrate.go b/pkg/kv/kvserver/batcheval/cmd_migrate.go
index 6ff111152357..da262233cea2 100644
--- a/pkg/kv/kvserver/batcheval/cmd_migrate.go
+++ b/pkg/kv/kvserver/batcheval/cmd_migrate.go
@@ -85,10 +85,10 @@ func Migrate(
// Set DoTimelyApplicationToAllReplicas so that migrates are applied on all
// replicas. This is done since MigrateRequests trigger a call to
// waitForApplication (see Replica.executeWriteBatch).
- if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) {
+ if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.V25_1_AddRangeForceFlushKey) ||
+ cArgs.EvalCtx.EvalKnobs().OverrideDoTimelyApplicationToAllReplicas {
pd.Replicated.DoTimelyApplicationToAllReplicas = true
}
-
return pd, nil
}
diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go
index f35246a87a4b..ebd4cdde23af 100644
--- a/pkg/kv/kvserver/flow_control_integration_test.go
+++ b/pkg/kv/kvserver/flow_control_integration_test.go
@@ -17,12 +17,16 @@ import (
"time"
"github.com/cockroachdb/cockroach/pkg/base"
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -5460,6 +5464,221 @@ func TestFlowControlSendQueueRangeRelocate(t *testing.T) {
})
}
+func TestFlowControlSendQueueRangeMigrate(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ctx := context.Background()
+ const numNodes = 3
+ // We're going to be transitioning from startV to endV. Think a cluster of
+ // binaries running vX, but with active version vX-1.
+ startV := clusterversion.PreviousRelease.Version()
+ endV := clusterversion.Latest.Version()
+ settings := cluster.MakeTestingClusterSettingsWithVersions(endV, startV, false)
+ kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
+ // We want to exhaust tokens but not overload the test, so we set the limits
+ // lower (8 and 16 MiB default).
+ kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
+ kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)
+ disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
+ setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
+ for _, serverIdx := range serverIdxs {
+ disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
+ }
+ }
+
+ argsPerServer := make(map[int]base.TestServerArgs)
+ for i := range disableWorkQueueGrantingServers {
+ disableWorkQueueGrantingServers[i].Store(true)
+ argsPerServer[i] = base.TestServerArgs{
+ Settings: settings,
+ Knobs: base.TestingKnobs{
+ Server: &server.TestingKnobs{
+ ClusterVersionOverride: startV,
+ DisableAutomaticVersionUpgrade: make(chan struct{}),
+ },
+ Store: &kvserver.StoreTestingKnobs{
+ EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
+ // Because we are migrating from a version (currently) prior to the
+ // range force flush key version gate, we won't trigger the force
+ // flush via migrate until we're on the endV, which defeats the
+ // purpose of this test. We override the behavior here to allow the
+ // force flush to be triggered on the startV from a Migrate
+ // request.
+ OverrideDoTimelyApplicationToAllReplicas: true,
+ },
+ FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
+ OverrideV2EnabledWhenLeaderLevel: func() kvflowcontrol.V2EnabledWhenLeaderLevel {
+ return kvflowcontrol.V2EnabledWhenLeaderV2Encoding
+ },
+ UseOnlyForScratchRanges: true,
+ OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
+ // Deduct every write as 1 MiB, regardless of how large it
+ // actually is.
+ return kvflowcontrol.Tokens(1 << 20)
+ },
+ // We want to test the behavior of the send queue, so we want to
+ // always have up-to-date stats. This ensures that the send queue
+ // stats are always refreshed on each call to
+ // RangeController.HandleRaftEventRaftMuLocked.
+ OverrideAlwaysRefreshSendStreamStats: true,
+ },
+ },
+ AdmissionControl: &admission.TestingKnobs{
+ DisableWorkQueueFastPath: true,
+ DisableWorkQueueGranting: func() bool {
+ idx := i
+ return disableWorkQueueGrantingServers[idx].Load()
+ },
+ },
+ },
+ }
+ }
+
+ tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
+ ReplicationMode: base.ReplicationManual,
+ ServerArgsPerNode: argsPerServer,
+ })
+ defer tc.Stopper().Stop(ctx)
+
+ k := tc.ScratchRange(t)
+ tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...)
+
+ h := newFlowControlTestHelper(
+ t, tc, "flow_control_integration_v2", /* testdata */
+ kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
+ )
+ h.init(kvflowcontrol.ApplyToAll)
+ defer h.close("send_queue_range_migrate")
+
+ desc, err := tc.LookupRange(k)
+ require.NoError(t, err)
+ h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
+ n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+ h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
+ h.resetV2TokenMetrics(ctx)
+ h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
+
+ store := tc.GetFirstStoreFromServer(t, 0)
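+	// assertVersion checks that both the in-memory and persisted replica
+	// versions match expV.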
+ assertVersion := func(expV roachpb.Version) error {
+ repl, err := store.GetReplica(desc.RangeID)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if gotV := repl.Version(); gotV != expV {
+ return errors.Errorf("expected in-memory version %s, got %s", expV, gotV)
+ }
+
+ sl := stateloader.Make(desc.RangeID)
+ persistedV, err := sl.LoadVersion(ctx, store.TODOEngine())
+ if err != nil {
+ return err
+ }
+ if persistedV != expV {
+ return errors.Errorf("expected persisted version %s, got %s", expV, persistedV)
+ }
+ return nil
+ }
+
+ require.NoError(t, assertVersion(startV))
+
+ migrated := false
+ unregister := batcheval.TestingRegisterMigrationInterceptor(endV, func() {
+ migrated = true
+ })
+ defer unregister()
+
+ h.comment(`
+-- We will exhaust the tokens across all streams while admission is blocked on
+-- n3, using four 1 MiB (deduction, the writes themselves are small) writes. Then,
+-- we will write a 1 MiB put to the range, migrate the range, and write a 1 MiB
+-- put to the migrated range. We expect that the migration will trigger a force
+-- flush of the send queue.`)
+ // Block admission on n3, while allowing every other node to admit.
+ setTokenReturnEnabled(true /* enabled */, 0, 1)
+ setTokenReturnEnabled(false /* enabled */, 2)
+ // Drain the tokens to n3 by blocking admission and issuing the buffer
+ // size of writes to the range.
+ h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+ h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+ h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+ h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+ h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
+
+ h.comment(`(Sending 1 MiB put request to develop a send queue)`)
+ h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+ h.comment(`(Sent 1 MiB put request)`)
+ h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
+ h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
+ testingMkFlowStream(0), testingMkFlowStream(1))
+ h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)
+
+ h.comment(`
+-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
+ h.query(n1, flowSendQueueQueryStr)
+ h.comment(`
+-- Observe the total tracked tokens per-stream on n1, s3's entries will still
+-- be tracked here.`)
+ h.query(n1, `
+ SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
+ FROM crdb_internal.kv_flow_control_handles_v2
+`, "range_id", "store_id", "total_tracked_tokens")
+ h.comment(`
+-- Per-store tokens available from n1, these should reflect the lack of tokens
+-- for s3.`)
+ h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
+
+ h.comment(`-- (Issuing MigrateRequest to range)`)
+ req := &kvpb.MigrateRequest{
+ RequestHeader: kvpb.RequestHeader{
+ Key: desc.StartKey.AsRawKey(),
+ EndKey: desc.EndKey.AsRawKey(),
+ },
+ Version: endV,
+ }
+ kvDB := tc.Servers[0].DB()
+ require.NoError(t, func() error {
+ if _, pErr := kv.SendWrappedWith(ctx, kvDB.GetFactory().NonTransactionalSender(), kvpb.Header{RangeID: desc.RangeID}, req); pErr != nil {
+ return pErr.GoError()
+ }
+ if !migrated {
+ return errors.Errorf("expected migration interceptor to have been called")
+ }
+ return assertVersion(endV)
+ }())
+
+ h.waitForConnectedStreams(ctx, desc.RangeID, 3, 0 /* serverIdx */)
+ h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize */, 0 /* serverIdx */)
+ h.comment(`
+-- Send queue and flow token metrics from n1 post-migrate. The migrate should
+-- have triggered a force flush of the send queue.`)
+ h.query(n1, flowSendQueueQueryStr)
+ h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
+
+ h.comment(`(Sending 1 MiB put request to the migrated range)`)
+ h.put(ctx, roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
+ h.comment(`(Sent 1 MiB put request to the migrated range)`)
+ h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)
+ h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
+ testingMkFlowStream(0), testingMkFlowStream(1))
+
+ h.comment(`
+-- Send queue and flow token metrics from n1 post-migrate and post 1 MiB put.
+-- We expect to see the send queue develop for s3 again.`)
+ h.query(n1, flowSendQueueQueryStr)
+ h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
+
+ h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
+ setTokenReturnEnabled(true /* enabled */, 2)
+ h.waitForAllTokensReturned(ctx, 3, 0 /* serverIdx */)
+ h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize 0 MiB */, 0 /* serverIdx */)
+
+ h.comment(`
+-- Send queue and flow token metrics from n1. All tokens should be returned.`)
+ h.query(n1, flowSendQueueQueryStr)
+ h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
+}
+
type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
diff --git a/pkg/kv/kvserver/kvserverbase/knobs.go b/pkg/kv/kvserver/kvserverbase/knobs.go
index a894ad3a2921..84536ad425db 100644
--- a/pkg/kv/kvserver/kvserverbase/knobs.go
+++ b/pkg/kv/kvserver/kvserverbase/knobs.go
@@ -52,6 +52,16 @@ type BatchEvalTestingKnobs struct {
// its record (which can be resolved synchronously with EndTxn). This is
// useful in certain tests.
DisableTxnAutoGC bool
+
+	// OverrideDoTimelyApplicationToAllReplicas overrides the cluster version
+	// check for the timely replication directive, which force flushes rac2 send
+	// queues to all replicas, for *Migrate* requests only. When set to true,
+	// the directive is always set; when set to false, the default behavior is
+	// used.
+ //
+ // NOTE: This currently only applies to Migrate requests and only ignores the
+ // cluster version.
+ OverrideDoTimelyApplicationToAllReplicas bool
}
// IntentResolverTestingKnobs contains testing helpers that are used during
diff --git a/pkg/kv/kvserver/rangefeed/bench_test.go b/pkg/kv/kvserver/rangefeed/bench_test.go
index bf9066fca64f..ec8f92795122 100644
--- a/pkg/kv/kvserver/rangefeed/bench_test.go
+++ b/pkg/kv/kvserver/rangefeed/bench_test.go
@@ -105,7 +105,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
streams[i] = &noopStream{ctx: ctx, done: make(chan *kvpb.Error, 1)}
ok, _, _ := p.Register(ctx, span, hlc.MinTimestamp, nil,
withDiff, withFiltering, false, /* withOmitRemote */
- streams[i], nil)
+ streams[i])
require.True(b, ok)
}
diff --git a/pkg/kv/kvserver/rangefeed/budget.go b/pkg/kv/kvserver/rangefeed/budget.go
index 7966843c0937..ce43c2e40b1f 100644
--- a/pkg/kv/kvserver/rangefeed/budget.go
+++ b/pkg/kv/kvserver/rangefeed/budget.go
@@ -228,7 +228,7 @@ func (f *FeedBudget) Close(ctx context.Context) {
f.closed.Do(func() {
f.mu.Lock()
f.mu.closed = true
- f.mu.memBudget.Close(ctx)
+ f.mu.memBudget.Clear(ctx)
close(f.stopC)
f.mu.Unlock()
})
diff --git a/pkg/kv/kvserver/rangefeed/buffered_registration.go b/pkg/kv/kvserver/rangefeed/buffered_registration.go
index 6fb9906430fe..909e9283fe12 100644
--- a/pkg/kv/kvserver/rangefeed/buffered_registration.go
+++ b/pkg/kv/kvserver/rangefeed/buffered_registration.go
@@ -81,17 +81,17 @@ func newBufferedRegistration(
blockWhenFull bool,
metrics *Metrics,
stream Stream,
- unregisterFn func(),
+ removeRegFromProcessor func(registration),
) *bufferedRegistration {
br := &bufferedRegistration{
baseRegistration: baseRegistration{
- streamCtx: streamCtx,
- span: span,
- catchUpTimestamp: startTS,
- withDiff: withDiff,
- withFiltering: withFiltering,
- withOmitRemote: withOmitRemote,
- unreg: unregisterFn,
+ streamCtx: streamCtx,
+ span: span,
+ catchUpTimestamp: startTS,
+ withDiff: withDiff,
+ withFiltering: withFiltering,
+ withOmitRemote: withOmitRemote,
+ removeRegFromProcessor: removeRegFromProcessor,
},
metrics: metrics,
stream: stream,
@@ -172,6 +172,7 @@ func (br *bufferedRegistration) Disconnect(pErr *kvpb.Error) {
}
br.mu.disconnected = true
br.stream.SendError(pErr)
+ br.removeRegFromProcessor(br)
}
}
diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go
index 198c638c9af0..de207b7d89a5 100644
--- a/pkg/kv/kvserver/rangefeed/processor.go
+++ b/pkg/kv/kvserver/rangefeed/processor.go
@@ -104,6 +104,11 @@ type Config struct {
// for low-volume system ranges, since the worker pool is small (default 2).
// Only has an effect when Scheduler is used.
Priority bool
+
+ // UnregisterFromReplica is a callback provided from the
+	// UnregisterFromReplica is a callback provided by the
+	// replica that this processor can call when shutting down to
+	// remove itself from the replica.
}
// SetDefaults initializes unset fields in Config to values
@@ -191,7 +196,6 @@ type Processor interface {
withFiltering bool,
withOmitRemote bool,
stream Stream,
- disconnectFn func(),
) (bool, Disconnector, *Filter)
// DisconnectSpanWithErr disconnects all rangefeed registrations that overlap
diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go
index d1a5f5162607..32d0e86cd4f2 100644
--- a/pkg/kv/kvserver/rangefeed/processor_test.go
+++ b/pkg/kv/kvserver/rangefeed/processor_test.go
@@ -70,7 +70,6 @@ func TestProcessorBasic(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
r1Stream,
- func() {},
)
require.True(t, r1OK)
h.syncEventAndRegistrations()
@@ -205,7 +204,6 @@ func TestProcessorBasic(t *testing.T) {
true, /* withFiltering */
false, /* withOmitRemote */
r2Stream,
- func() {},
)
require.True(t, r2OK)
h.syncEventAndRegistrations()
@@ -314,7 +312,6 @@ func TestProcessorBasic(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
r3Stream,
- func() {},
)
require.True(t, r30K)
r3Stream.SendError(kvpb.NewError(fmt.Errorf("disconnection error")))
@@ -336,7 +333,6 @@ func TestProcessorBasic(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
r4Stream,
- func() {},
)
require.False(t, r4OK)
})
@@ -362,7 +358,6 @@ func TestProcessorOmitRemote(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
r1Stream,
- func() {},
)
require.True(t, r1OK)
h.syncEventAndRegistrations()
@@ -388,7 +383,6 @@ func TestProcessorOmitRemote(t *testing.T) {
false, /* withFiltering */
true, /* withOmitRemote */
r2Stream,
- func() {},
)
require.True(t, r2OK)
h.syncEventAndRegistrations()
@@ -442,7 +436,6 @@ func TestProcessorSlowConsumer(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
r1Stream,
- func() {},
)
r2Stream := newTestStream()
p.Register(
@@ -454,7 +447,6 @@ func TestProcessorSlowConsumer(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
r2Stream,
- func() {},
)
h.syncEventAndRegistrations()
require.Equal(t, 2, p.Len())
@@ -551,7 +543,6 @@ func TestProcessorMemoryBudgetExceeded(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
r1Stream,
- func() {},
)
h.syncEventAndRegistrations()
@@ -607,7 +598,6 @@ func TestProcessorMemoryBudgetReleased(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
r1Stream,
- func() {},
)
h.syncEventAndRegistrations()
@@ -689,7 +679,6 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
r1Stream,
- func() {},
)
h.syncEventAndRegistrations()
require.Equal(t, 1, p.Len())
@@ -996,7 +985,7 @@ func TestProcessorConcurrentStop(t *testing.T) {
runtime.Gosched()
s := newTestStream()
p.Register(s.ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
- false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {})
+ false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s)
}()
go func() {
defer wg.Done()
@@ -1068,7 +1057,7 @@ func TestProcessorRegistrationObservesOnlyNewEvents(t *testing.T) {
s := newTestStream()
regs[s] = firstIdx
p.Register(s.ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
- false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s, func() {})
+ false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, s)
regDone <- struct{}{}
}
}()
@@ -1129,7 +1118,6 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
rStream,
- func() {},
)
h.syncEventAndRegistrations()
@@ -1210,7 +1198,6 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
rStream,
- func() {},
)
h.syncEventAndRegistrations()
@@ -1281,7 +1268,6 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
r1Stream,
- func() {},
)
// Non-blocking registration that would consume all events.
@@ -1295,7 +1281,6 @@ func TestBudgetReleaseOnOneStreamError(t *testing.T) {
false, /* withFiltering */
false, /* withOmitRemote */
r2Stream,
- func() {},
)
h.syncEventAndRegistrations()
@@ -1463,7 +1448,7 @@ func TestProcessorBackpressure(t *testing.T) {
// Add a registration.
stream := newTestStream()
ok, _, _ := p.Register(stream.ctx, span, hlc.MinTimestamp, nil, /* catchUpIter */
- false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, stream, nil)
+ false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, stream)
require.True(t, ok)
// Wait for the initial checkpoint.
diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go
index 43c8523ed359..453df77b138d 100644
--- a/pkg/kv/kvserver/rangefeed/registry.go
+++ b/pkg/kv/kvserver/rangefeed/registry.go
@@ -9,6 +9,7 @@ import (
"context"
"fmt"
"sync"
+ "sync/atomic"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -66,23 +67,35 @@ type registration interface {
Range() interval.Range
// ID returns the id field of the registration as a uintptr.
ID() uintptr
- // getUnreg returns the unregisterFn call back of the registration. It should
- // be called when being unregistered from processor.
- getUnreg() func()
+
+ // shouldUnregister returns true if this registration should be unregistered
+ // by unregisterMarkedRegistrations. UnregisterMarkedRegistrations is called
+ // by the rangefeed scheduler when it has been informed of an unregister
+ // request.
+ shouldUnregister() bool
+ // setShouldUnregister sets shouldUnregister to true. Used by the rangefeed
+ // processor in response to an unregister request.
+ setShouldUnregister()
}
// baseRegistration is a common base for all registration types. It is intended
// to be embedded in an actual registration struct.
type baseRegistration struct {
- streamCtx context.Context
- span roachpb.Span
- withDiff bool
- withFiltering bool
- withOmitRemote bool
- unreg func()
+ streamCtx context.Context
+ span roachpb.Span
+ withDiff bool
+ withFiltering bool
+ withOmitRemote bool
+ // removeRegFromProcessor is called to remove the registration from its
+ // processor. This is provided by the creator of the registration and called
+ // during disconnect(). Since it is called during disconnect it must be
+ // non-blocking.
+ removeRegFromProcessor func(registration)
+
catchUpTimestamp hlc.Timestamp // exclusive
id int64 // internal
keys interval.Range
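+	// shouldUnreg is set via setShouldUnregister and consumed by
+	// unregisterMarkedRegistrations.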
+ shouldUnreg atomic.Bool
}
// ID implements interval.Interface.
@@ -123,8 +136,12 @@ func (r *baseRegistration) getWithOmitRemote() bool {
return r.withOmitRemote
}
-func (r *baseRegistration) getUnreg() func() {
- return r.unreg
+func (r *baseRegistration) shouldUnregister() bool {
+ return r.shouldUnreg.Load()
+}
+
+func (r *baseRegistration) setShouldUnregister() {
+ r.shouldUnreg.Store(true)
}
func (r *baseRegistration) getWithDiff() bool {
@@ -337,16 +354,6 @@ func (reg *registry) PublishToOverlapping(
})
}
-// Unregister removes a registration from the registry. It is assumed that the
-// registration has already been disconnected, this is intended only to clean
-// up the registry.
-func (reg *registry) Unregister(ctx context.Context, r registration) {
- reg.metrics.RangeFeedRegistrations.Dec(1)
- if err := reg.tree.Delete(r, false /* fast */); err != nil {
- log.Fatalf(ctx, "%v", err)
- }
-}
-
// DisconnectAllOnShutdown disconnectes all registrations on processor shutdown.
// This is different from normal disconnect as registrations won't be able to
// perform Unregister when processor's work loop is already terminated.
@@ -355,16 +362,9 @@ func (reg *registry) Unregister(ctx context.Context, r registration) {
// TODO: this should be revisited as part of
// https://github.com/cockroachdb/cockroach/issues/110634
func (reg *registry) DisconnectAllOnShutdown(ctx context.Context, pErr *kvpb.Error) {
- reg.metrics.RangeFeedRegistrations.Dec(int64(reg.tree.Len()))
reg.DisconnectWithErr(ctx, all, pErr)
}
-// Disconnect disconnects all registrations that overlap the specified span with
-// a nil error.
-func (reg *registry) Disconnect(ctx context.Context, span roachpb.Span) {
- reg.DisconnectWithErr(ctx, span, nil /* pErr */)
-}
-
// DisconnectWithErr disconnects all registrations that overlap the specified
// span with the provided error.
func (reg *registry) DisconnectWithErr(ctx context.Context, span roachpb.Span, pErr *kvpb.Error) {
@@ -398,7 +398,14 @@ func (reg *registry) forOverlappingRegs(
} else {
reg.tree.DoMatching(matchFn, span.AsRange())
}
+ reg.remove(ctx, toDelete)
+}
+func (reg *registry) remove(ctx context.Context, toDelete []interval.Interface) {
+ // We only ever call remove on values we know exist in the registry, so we
+ // can decrement this metric by the length of the input.
+ reg.metrics.RangeFeedRegistrations.Dec(int64(len(toDelete)))
if len(toDelete) == reg.tree.Len() {
reg.tree.Clear()
} else if len(toDelete) == 1 {
@@ -415,6 +422,24 @@ func (reg *registry) forOverlappingRegs(
}
}
+// unregisterMarkedRegistrations iterates the registry and removes any
+// registrations where shouldUnregister() returns true. This is called by the
+// rangefeed processor in response to an async unregistration request.
+//
+// See the comment on (*ScheduledProcessor).unregisterClientAsync for more
+// details.
+func (reg *registry) unregisterMarkedRegistrations(ctx context.Context) {
+ var toDelete []interval.Interface
+ reg.tree.Do(func(i interval.Interface) (done bool) {
+ r := i.(registration)
+ if r.shouldUnregister() {
+ toDelete = append(toDelete, i)
+ }
+ return false
+ })
+ reg.remove(ctx, toDelete)
+}
+
// waitForCaughtUp waits for all registrations overlapping the given span to
// completely process their internal buffers.
func (reg *registry) waitForCaughtUp(ctx context.Context, span roachpb.Span) error {
diff --git a/pkg/kv/kvserver/rangefeed/registry_helper_test.go b/pkg/kv/kvserver/rangefeed/registry_helper_test.go
index a5b543386b99..56d9780f3593 100644
--- a/pkg/kv/kvserver/rangefeed/registry_helper_test.go
+++ b/pkg/kv/kvserver/rangefeed/registry_helper_test.go
@@ -267,7 +267,7 @@ func newTestRegistration(
false, /* blockWhenFull */
NewMetrics(),
s,
- func() {},
+ func(registration) {},
)
return &testRegistration{
bufferedRegistration: r,
diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go
index c551a6bb1467..ba850f7cf571 100644
--- a/pkg/kv/kvserver/rangefeed/registry_test.go
+++ b/pkg/kv/kvserver/rangefeed/registry_test.go
@@ -195,7 +195,6 @@ func TestRegistryBasic(t *testing.T) {
reg := makeRegistry(NewMetrics())
require.Equal(t, 0, reg.Len())
reg.PublishToOverlapping(ctx, spAB, ev1, logicalOpMetadata{}, nil /* alloc */)
- reg.Disconnect(ctx, spAB)
reg.DisconnectWithErr(ctx, spAB, err1)
rAB := newTestRegistration(spAB, hlc.Timestamp{}, nil, false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */)
@@ -284,7 +283,7 @@ func TestRegistryBasic(t *testing.T) {
require.Equal(t, []*kvpb.RangeFeedEvent{noPrev(ev4), noPrev(ev1)}, rAB.GetAndClearEvents())
// Disconnect from rAB without error.
- reg.Disconnect(ctx, spAB)
+ reg.DisconnectWithErr(ctx, spAB, nil /* pErr */)
require.Nil(t, rAC.WaitForError(t))
require.Nil(t, rAB.WaitForError(t))
require.Equal(t, 1, reg.Len())
@@ -312,9 +311,13 @@ func TestRegistryBasic(t *testing.T) {
require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyC}))
require.False(t, f.NeedPrevVal(roachpb.Span{Key: keyX}))
- // Unregister the rBC registration.
- reg.Unregister(ctx, rBC.bufferedRegistration)
+ // Unregister the rBC registration as if it was being unregistered via the
+ // processor.
+ rBC.setShouldUnregister()
+ reg.unregisterMarkedRegistrations(ctx)
require.Equal(t, 0, reg.Len())
+ require.Equal(t, 0, int(reg.metrics.RangeFeedRegistrations.Value()),
+ "RangeFeedRegistrations metric not zero after all registrations have been removed")
}
func TestRegistryPublishBeneathStartTimestamp(t *testing.T) {
@@ -360,30 +363,30 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) {
func TestRegistrationString(t *testing.T) {
testCases := []struct {
- r baseRegistration
+ r *baseRegistration
exp string
}{
{
- r: baseRegistration{
+ r: &baseRegistration{
span: roachpb.Span{Key: roachpb.Key("a")},
},
exp: `[a @ 0,0+]`,
},
{
- r: baseRegistration{span: roachpb.Span{
+ r: &baseRegistration{span: roachpb.Span{
Key: roachpb.Key("a"), EndKey: roachpb.Key("c")},
},
exp: `[{a-c} @ 0,0+]`,
},
{
- r: baseRegistration{
+ r: &baseRegistration{
span: roachpb.Span{Key: roachpb.Key("d")},
catchUpTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1},
},
exp: `[d @ 0.000000010,1+]`,
},
{
- r: baseRegistration{span: roachpb.Span{
+ r: &baseRegistration{span: roachpb.Span{
Key: roachpb.Key("d"), EndKey: roachpb.Key("z")},
catchUpTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9},
},
diff --git a/pkg/kv/kvserver/rangefeed/scheduled_processor.go b/pkg/kv/kvserver/rangefeed/scheduled_processor.go
index 51a7ad55dce4..e8d01d3a569d 100644
--- a/pkg/kv/kvserver/rangefeed/scheduled_processor.go
+++ b/pkg/kv/kvserver/rangefeed/scheduled_processor.go
@@ -7,6 +7,7 @@ package rangefeed
import (
"context"
+ "sync/atomic"
"time"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -64,6 +65,11 @@ type ScheduledProcessor struct {
// stopper passed by start that is used for firing up async work from scheduler.
stopper *stop.Stopper
txnPushActive bool
+
+ // pendingUnregistrations indicates that the registry may have registrations
+ // that can be unregistered. This is handled outside of the requestQueue to
+ // avoid blocking clients who only need to signal unregistration.
+ pendingUnregistrations atomic.Bool
}
// NewScheduledProcessor creates a new scheduler based rangefeed Processor.
@@ -157,6 +163,16 @@ func (p *ScheduledProcessor) processRequests(ctx context.Context) {
case e := <-p.requestQueue:
e(ctx)
default:
+ if p.pendingUnregistrations.Swap(false) {
+ p.reg.unregisterMarkedRegistrations(ctx)
+ // If we have no more registrations, we can stop this processor. Note
+ // that stopInternal sets p.stopping to true so any register requests
+ // being concurrently enqueued will fail-fast before being added to the
+ // registry.
+ if p.reg.Len() == 0 {
+ p.stopInternal(ctx, nil)
+ }
+ }
return
}
}
@@ -239,6 +255,10 @@ func (p *ScheduledProcessor) cleanup() {
p.taskCancel()
close(p.stoppedC)
p.MemBudget.Close(ctx)
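+ // Let the replica know that this processor has been cleaned up so that it
+ // can release its reference to it, if an unregister callback was provided.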
+ if p.UnregisterFromReplica != nil {
+ p.UnregisterFromReplica(p)
+ }
}
// Stop shuts down the processor and closes all registrations. Safe to call on
@@ -271,14 +291,18 @@ func (p *ScheduledProcessor) DisconnectSpanWithErr(span roachpb.Span, pErr *kvpb
func (p *ScheduledProcessor) sendStop(pErr *kvpb.Error) {
p.enqueueRequest(func(ctx context.Context) {
- p.reg.DisconnectWithErr(ctx, all, pErr)
- // First set stopping flag to ensure that once all registrations are removed
- // processor should stop.
- p.stopping = true
- p.scheduler.StopProcessor()
+ p.stopInternal(ctx, pErr)
})
}
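+// stopInternal disconnects all registrations with pErr, marks the processor
+// as stopping, and asks the scheduler to stop the processor.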
+func (p *ScheduledProcessor) stopInternal(ctx context.Context, pErr *kvpb.Error) {
+ p.reg.DisconnectAllOnShutdown(ctx, pErr)
+ // Set the stopping flag to ensure that the processor stops once all
+ // registrations have been removed.
+ p.stopping = true
+ p.scheduler.StopProcessor()
+}
+
// Register registers the stream over the specified span of keys.
//
// The registration will not observe any events that were consumed before this
@@ -304,7 +328,6 @@ func (p *ScheduledProcessor) Register(
withFiltering bool,
withOmitRemote bool,
stream Stream,
- disconnectFn func(),
) (bool, Disconnector, *Filter) {
// Synchronize the event channel so that this registration doesn't see any
// events that were consumed before this registration was called. Instead,
@@ -319,8 +342,7 @@ func (p *ScheduledProcessor) Register(
} else {
r = newBufferedRegistration(
streamCtx, span.AsRawSpanWithNoLocals(), startTS, catchUpIter, withDiff, withFiltering, withOmitRemote,
- p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, disconnectFn,
- )
+ p.Config.EventChanCap, blockWhenFull, p.Metrics, stream, p.unregisterClientAsync)
}
filter := runRequest(p, func(ctx context.Context, p *ScheduledProcessor) *Filter {
@@ -345,16 +367,7 @@ func (p *ScheduledProcessor) Register(
r.publish(ctx, p.newCheckpointEvent(), nil)
// Run an output loop for the registry.
- runOutputLoop := func(ctx context.Context) {
- r.runOutputLoop(ctx, p.RangeID)
- if p.unregisterClient(r) {
- // unreg callback is set by replica to tear down processors that have
- // zero registrations left and to update event filters.
- if f := r.getUnreg(); f != nil {
- f()
- }
- }
- }
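+ // Registration removal is now requested via unregisterClientAsync when the
+ // registration disconnects, so the output loop no longer unregisters it.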
+ runOutputLoop := func(ctx context.Context) { r.runOutputLoop(ctx, p.RangeID) }
// NB: use ctx, not p.taskCtx, as the registry handles teardown itself.
if err := p.Stopper.RunAsyncTask(ctx, "rangefeed: output loop", runOutputLoop); err != nil {
// If we can't schedule internally, processor is already stopped which
@@ -362,7 +375,6 @@ func (p *ScheduledProcessor) Register(
// registration.
r.Disconnect(kvpb.NewError(err))
r.drainAllocations(ctx)
- p.reg.Unregister(ctx, r)
}
return f
})
@@ -372,13 +384,6 @@ func (p *ScheduledProcessor) Register(
return false, nil, nil
}
-func (p *ScheduledProcessor) unregisterClient(r registration) bool {
- return runRequest(p, func(ctx context.Context, p *ScheduledProcessor) bool {
- p.reg.Unregister(ctx, r)
- return true
- })
-}
-
// ConsumeLogicalOps informs the rangefeed processor of the set of logical
// operations. It returns false if consuming the operations hit a timeout, as
// specified by the EventChanTimeout configuration. If the method returns false,
@@ -637,6 +642,29 @@ func (p *ScheduledProcessor) enqueueRequest(req request) {
}
}
+// unregisterClientAsync instructs the processor to unregister the given
+// registration. To keep this call non-blocking, it does not send an actual
+// request to the request queue.
+//
+// Rather, the registration has its shouldUnregister flag set and the
+// processor's pendingUnregistrations flag is set. During processRequests, if
+// pendingUnregistrations is true, we remove any marked registrations from the
+// registry.
+//
+// We are OK with these being processed out of order since these requests
+// originate from a registration cleanup, so the registration in question is no
+// longer processing events.
+func (p *ScheduledProcessor) unregisterClientAsync(r registration) {
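+ // If the processor has already stopped, there is nothing to unregister;
+ // registrations are torn down on shutdown.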
+ select {
+ case <-p.stoppedC:
+ return
+ default:
+ }
+ r.setShouldUnregister()
+ p.pendingUnregistrations.Store(true)
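+ // Nudge the scheduler so that processRequests runs soon and removes the
+ // marked registrations.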
+ p.scheduler.Enqueue(RequestQueued)
+}
+
func (p *ScheduledProcessor) consumeEvent(ctx context.Context, e *event) {
switch {
case e.ops != nil:
diff --git a/pkg/kv/kvserver/rangefeed/stream_manager_test.go b/pkg/kv/kvserver/rangefeed/stream_manager_test.go
index 274bab21b242..7387ba42693b 100644
--- a/pkg/kv/kvserver/rangefeed/stream_manager_test.go
+++ b/pkg/kv/kvserver/rangefeed/stream_manager_test.go
@@ -221,7 +221,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
stream := sm.NewStream(sID, rID)
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */
- stream, func() {})
+ stream)
require.True(t, registered)
go p.StopWithErr(disconnectErr)
require.Equal(t, 0, testRangefeedCounter.get())
@@ -235,7 +235,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
defer stopper.Stop(ctx)
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */
- stream, func() {})
+ stream)
require.True(t, registered)
sm.AddStream(sID, d)
require.Equal(t, 1, p.Len())
@@ -250,7 +250,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
defer stopper.Stop(ctx)
registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */
false /* withDiff */, false /* withFiltering */, false, /* withOmitRemote */
- stream, func() {})
+ stream)
require.True(t, registered)
sm.AddStream(sID, d)
require.Equal(t, 1, testRangefeedCounter.get())
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index a806530d49a0..4e6e52277236 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -466,7 +466,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
if p != nil {
reg, disconnector, filter := p.Register(streamCtx, span, startTS, catchUpIter, withDiff, withFiltering, withOmitRemote,
- stream, func() { r.maybeDisconnectEmptyRangefeed(p) })
+ stream)
if reg {
// Registered successfully with an existing processor.
// Update the rangefeed filter to avoid filtering ops
@@ -499,20 +499,21 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
desc := r.Desc()
tp := rangefeedTxnPusher{ir: r.store.intentResolver, r: r, span: desc.RSpan()}
cfg := rangefeed.Config{
- AmbientContext: r.AmbientContext,
- Clock: r.Clock(),
- Stopper: r.store.stopper,
- Settings: r.store.ClusterSettings(),
- RangeID: r.RangeID,
- Span: desc.RSpan(),
- TxnPusher: &tp,
- PushTxnsAge: r.store.TestingKnobs().RangeFeedPushTxnsAge,
- EventChanCap: defaultEventChanCap,
- EventChanTimeout: defaultEventChanTimeout,
- Metrics: r.store.metrics.RangeFeedMetrics,
- MemBudget: feedBudget,
- Scheduler: r.store.getRangefeedScheduler(),
- Priority: isSystemSpan, // only takes effect when Scheduler != nil
+ AmbientContext: r.AmbientContext,
+ Clock: r.Clock(),
+ Stopper: r.store.stopper,
+ Settings: r.store.ClusterSettings(),
+ RangeID: r.RangeID,
+ Span: desc.RSpan(),
+ TxnPusher: &tp,
+ PushTxnsAge: r.store.TestingKnobs().RangeFeedPushTxnsAge,
+ EventChanCap: defaultEventChanCap,
+ EventChanTimeout: defaultEventChanTimeout,
+ Metrics: r.store.metrics.RangeFeedMetrics,
+ MemBudget: feedBudget,
+ Scheduler: r.store.getRangefeedScheduler(),
+ Priority: isSystemSpan, // only takes effect when Scheduler != nil
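+ // UnregisterFromReplica is invoked by the processor during cleanup so
+ // that the replica can unset its reference to the processor.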
+ UnregisterFromReplica: r.unsetRangefeedProcessor,
}
p = rangefeed.NewProcessor(cfg)
@@ -547,7 +548,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
// this ensures that the only time the registration fails is during
// server shutdown.
reg, disconnector, filter := p.Register(streamCtx, span, startTS, catchUpIter, withDiff,
- withFiltering, withOmitRemote, stream, func() { r.maybeDisconnectEmptyRangefeed(p) })
+ withFiltering, withOmitRemote, stream)
if !reg {
select {
case <-r.store.Stopper().ShouldQuiesce():
diff --git a/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_migrate b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_migrate
new file mode 100644
index 000000000000..9bd56806b83d
--- /dev/null
+++ b/pkg/kv/kvserver/testdata/flow_control_integration_v2/send_queue_range_migrate
@@ -0,0 +1,164 @@
+echo
+----
+----
+-- We will exhaust the tokens across all streams while admission is blocked on
+-- n3, using a single 4 MiB (deduction, the write itself is small) write. Then,
+-- we will write a 1 MiB put to the range, migrate the range, and write a 1 MiB
+-- put to the migrated range. We expect that the migration will trigger a force
+-- flush of the send queue.
+
+
+(Sending 1 MiB put request to develop a send queue)
+
+
+(Sent 1 MiB put request)
+
+
+-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%send_queue%'
+ AND name != 'kvflowcontrol.send_queue.count'
+ORDER BY name ASC;
+
+ kvflowcontrol.send_queue.bytes | 1.0 MiB
+ kvflowcontrol.send_queue.prevent.count | 0 B
+ kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B
+ kvflowcontrol.send_queue.scheduled.force_flush | 0 B
+ kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 0 B
+ kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B
+ kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B
+
+
+-- Observe the total tracked tokens per-stream on n1, s3's entries will still
+-- be tracked here.
+SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
+ FROM crdb_internal.kv_flow_control_handles_v2
+
+ range_id | store_id | total_tracked_tokens
+-----------+----------+-----------------------
+ 70 | 1 | 0 B
+ 70 | 2 | 0 B
+ 70 | 3 | 4.0 MiB
+
+
+-- Per-store tokens available from n1, these should reflect the lack of tokens
+-- for s3.
+SELECT store_id,
+ crdb_internal.humanize_bytes(available_eval_regular_tokens),
+ crdb_internal.humanize_bytes(available_eval_elastic_tokens),
+ crdb_internal.humanize_bytes(available_send_regular_tokens),
+ crdb_internal.humanize_bytes(available_send_elastic_tokens)
+ FROM crdb_internal.kv_flow_controller_v2
+ ORDER BY store_id ASC;
+
+ store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available
+-----------+------------------------+------------------------+------------------------+-------------------------
+ 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
+ 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
+ 3 | 0 B | -3.0 MiB | 0 B | -2.0 MiB
+
+
+-- (Issuing MigrateRequest to range)
+
+
+-- Send queue and flow token metrics from n1 post-migrate. The migrate should
+-- have triggered a force flush of the send queue.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%send_queue%'
+ AND name != 'kvflowcontrol.send_queue.count'
+ORDER BY name ASC;
+
+ kvflowcontrol.send_queue.bytes | 0 B
+ kvflowcontrol.send_queue.prevent.count | 0 B
+ kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B
+ kvflowcontrol.send_queue.scheduled.force_flush | 0 B
+ kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 1.0 MiB
+ kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B
+ kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B
+SELECT store_id,
+ crdb_internal.humanize_bytes(available_eval_regular_tokens),
+ crdb_internal.humanize_bytes(available_eval_elastic_tokens),
+ crdb_internal.humanize_bytes(available_send_regular_tokens),
+ crdb_internal.humanize_bytes(available_send_elastic_tokens)
+ FROM crdb_internal.kv_flow_controller_v2
+ ORDER BY store_id ASC;
+
+ store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available
+-----------+------------------------+------------------------+------------------------+-------------------------
+ 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
+ 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
+ 3 | 0 B | -3.0 MiB | 0 B | -3.0 MiB
+
+
+(Sending 1 MiB put request to the migrated range)
+
+
+(Sent 1 MiB put request to the migrated range)
+
+
+-- Send queue and flow token metrics from n1 post-migrate and post 1 MiB put.
+-- We expect to see the send queue develop for s3 again.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%send_queue%'
+ AND name != 'kvflowcontrol.send_queue.count'
+ORDER BY name ASC;
+
+ kvflowcontrol.send_queue.bytes | 1.0 MiB
+ kvflowcontrol.send_queue.prevent.count | 0 B
+ kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B
+ kvflowcontrol.send_queue.scheduled.force_flush | 0 B
+ kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 1.0 MiB
+ kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B
+ kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B
+SELECT store_id,
+ crdb_internal.humanize_bytes(available_eval_regular_tokens),
+ crdb_internal.humanize_bytes(available_eval_elastic_tokens),
+ crdb_internal.humanize_bytes(available_send_regular_tokens),
+ crdb_internal.humanize_bytes(available_send_elastic_tokens)
+ FROM crdb_internal.kv_flow_controller_v2
+ ORDER BY store_id ASC;
+
+ store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available
+-----------+------------------------+------------------------+------------------------+-------------------------
+ 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
+ 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
+ 3 | 0 B | -4.0 MiB | 0 B | -3.0 MiB
+
+
+-- (Allowing below-raft admission to proceed on n3.)
+
+
+-- Send queue and flow token metrics from n1. All tokens should be returned.
+SELECT name, crdb_internal.humanize_bytes(value::INT8)
+ FROM crdb_internal.node_metrics
+ WHERE name LIKE '%kvflowcontrol%send_queue%'
+ AND name != 'kvflowcontrol.send_queue.count'
+ORDER BY name ASC;
+
+ kvflowcontrol.send_queue.bytes | 0 B
+ kvflowcontrol.send_queue.prevent.count | 0 B
+ kvflowcontrol.send_queue.scheduled.deducted_bytes | 0 B
+ kvflowcontrol.send_queue.scheduled.force_flush | 0 B
+ kvflowcontrol.tokens.send.elastic.deducted.force_flush_send_queue | 1.0 MiB
+ kvflowcontrol.tokens.send.elastic.deducted.prevent_send_queue | 0 B
+ kvflowcontrol.tokens.send.regular.deducted.prevent_send_queue | 0 B
+SELECT store_id,
+ crdb_internal.humanize_bytes(available_eval_regular_tokens),
+ crdb_internal.humanize_bytes(available_eval_elastic_tokens),
+ crdb_internal.humanize_bytes(available_send_regular_tokens),
+ crdb_internal.humanize_bytes(available_send_elastic_tokens)
+ FROM crdb_internal.kv_flow_controller_v2
+ ORDER BY store_id ASC;
+
+ store_id | eval_regular_available | eval_elastic_available | send_regular_available | send_elastic_available
+-----------+------------------------+------------------------+------------------------+-------------------------
+ 1 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
+ 2 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
+ 3 | 4.0 MiB | 2.0 MiB | 4.0 MiB | 2.0 MiB
+----
+----
+
+# vim:ft=sql
diff --git a/pkg/roachprod/opentelemetry/cockroachdb_metrics.go b/pkg/roachprod/opentelemetry/cockroachdb_metrics.go
index ed8cc82ccde8..dc20ede4bde2 100644
--- a/pkg/roachprod/opentelemetry/cockroachdb_metrics.go
+++ b/pkg/roachprod/opentelemetry/cockroachdb_metrics.go
@@ -431,8 +431,10 @@ var cockroachdbMetrics = map[string]string{
"distsender_batch_responses_cross_zone_bytes": "distsender.batch_responses.cross_zone.bytes",
"distsender_batch_responses_replica_addressed_bytes": "distsender.batch_responses.replica_addressed.bytes",
"distsender_batches": "distsender.batches.total",
+ "distsender_batches_async_in_progress": "distsender.batches.async.in_progress",
"distsender_batches_async_sent": "distsender.batches.async.sent",
"distsender_batches_async_throttled": "distsender.batches.async.throttled",
+ "distsender_batches_async_throttled_cumulative_duration_nanos": "distsender.batches.async.throttled_cumulative_duration_nanos",
"distsender_batches_partial": "distsender.batches.partial",
"distsender_circuit_breaker_replicas_count": "distsender.circuit_breaker.replicas.count",
"distsender_circuit_breaker_replicas_probes_failure": "distsender.circuit_breaker.replicas.probes.failure",
@@ -1013,6 +1015,10 @@ var cockroachdbMetrics = map[string]string{
"kv_replica_write_batch_evaluate_latency_sum": "kv.replica_write_batch_evaluate.latency.sum",
"kv_split_estimated_stats": "kv.split.estimated_stats",
"kv_split_total_bytes_estimates": "kv.split.total_bytes_estimates",
+ "kv_streamer_batches_in_progress": "kv.streamer.batches.in_progress",
+ "kv_streamer_batches_sent": "kv.streamer.batches.sent",
+ "kv_streamer_batches_throttled": "kv.streamer.batches.throttled",
+ "kv_streamer_operators_active": "kv.streamer.operators.active",
"kv_tenant_rate_limit_current_blocked": "kv.tenant_rate_limit.current_blocked",
"kv_tenant_rate_limit_num_tenants": "kv.tenant_rate_limit.num_tenants",
"kv_tenant_rate_limit_read_batches_admitted": "kv.tenant_rate_limit.read_batches_admitted",
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index d823ffeb61a6..b31d88567417 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -117,6 +117,7 @@ go_library(
"//pkg/kv/bulk",
"//pkg/kv/kvclient",
"//pkg/kv/kvclient/kvcoord",
+ "//pkg/kv/kvclient/kvstreamer",
"//pkg/kv/kvclient/kvtenant",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangestats",
@@ -246,6 +247,7 @@ go_library(
"//pkg/sql/privilege",
"//pkg/sql/querycache",
"//pkg/sql/rangeprober",
+ "//pkg/sql/rolemembershipcache",
"//pkg/sql/roleoption",
"//pkg/sql/scheduledlogging",
"//pkg/sql/schemachanger/scdeps",
diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index a2862735a9b3..a9b408f5c22a 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangestats"
@@ -89,6 +90,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/rangeprober"
+ "github.com/cockroachdb/cockroach/pkg/sql/rolemembershipcache"
"github.com/cockroachdb/cockroach/pkg/sql/scheduledlogging"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scdeps"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
@@ -745,6 +747,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.registry.AddMetricStruct(rowMetrics)
internalRowMetrics := sql.NewRowMetrics(true /* internal */)
cfg.registry.AddMetricStruct(internalRowMetrics)
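+ // KV Streamer metrics are created once per SQL server and shared with
+ // streamers via the DistSQL server config (KVStreamerMetrics).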
+ kvStreamerMetrics := kvstreamer.MakeMetrics()
+ cfg.registry.AddMetricStruct(kvStreamerMetrics)
virtualSchemas, err := sql.NewVirtualSchemaHolder(ctx, cfg.Settings)
if err != nil {
@@ -839,6 +843,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
Metrics: &distSQLMetrics,
RowMetrics: &rowMetrics,
InternalRowMetrics: &internalRowMetrics,
+ KVStreamerMetrics: &kvStreamerMetrics,
SQLLivenessReader: cfg.sqlLivenessProvider.CachedReader(),
JobRegistry: jobRegistry,
@@ -983,8 +988,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
VirtualSchemas: virtualSchemas,
HistogramWindowInterval: cfg.HistogramWindowInterval(),
RangeDescriptorCache: cfg.distSender.RangeDescriptorCache(),
- RoleMemberCache: sql.NewMembershipCache(
- serverCacheMemoryMonitor.MakeBoundAccount(), cfg.stopper,
+ RoleMemberCache: rolemembershipcache.NewMembershipCache(
+ serverCacheMemoryMonitor.MakeBoundAccount(), cfg.internalDB, cfg.stopper,
),
SequenceCacheNode: sessiondatapb.NewSequenceCacheNode(),
SessionInitCache: sessioninit.NewCache(
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index 8d962a280c7a..0637cc4be224 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -435,7 +435,6 @@ go_library(
"//pkg/sql/isql",
"//pkg/sql/lex",
"//pkg/sql/lexbase",
- "//pkg/sql/memsize",
"//pkg/sql/mutations",
"//pkg/sql/oidext",
"//pkg/sql/opt",
@@ -469,6 +468,7 @@ go_library(
"//pkg/sql/querycache",
"//pkg/sql/regionliveness",
"//pkg/sql/regions",
+ "//pkg/sql/rolemembershipcache",
"//pkg/sql/roleoption",
"//pkg/sql/row",
"//pkg/sql/rowcontainer",
@@ -587,7 +587,6 @@ go_library(
"//pkg/util/startup",
"//pkg/util/stop",
"//pkg/util/syncutil",
- "//pkg/util/syncutil/singleflight",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/tochar",
diff --git a/pkg/sql/alter_role.go b/pkg/sql/alter_role.go
index 07384fd3bb7b..60b1dfe1361f 100644
--- a/pkg/sql/alter_role.go
+++ b/pkg/sql/alter_role.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
@@ -193,7 +194,7 @@ func (n *alterRoleNode) startExec(params runParams) error {
opName,
params.p.txn,
sessiondata.NodeUserSessionDataOverride,
- fmt.Sprintf("SELECT 1 FROM %s WHERE username = $1", sessioninit.UsersTableName),
+ fmt.Sprintf("SELECT 1 FROM system.public.%s WHERE username = $1", catconstants.UsersTableName),
n.roleName,
)
if err != nil {
@@ -442,19 +443,19 @@ func (n *alterRoleSetNode) startExec(params runParams) error {
}
var deleteQuery = fmt.Sprintf(
- `DELETE FROM %s WHERE database_id = $1 AND role_name = $2`,
- sessioninit.DatabaseRoleSettingsTableName,
+ `DELETE FROM system.public.%s WHERE database_id = $1 AND role_name = $2`,
+ catconstants.DatabaseRoleSettingsTableName,
)
var upsertQuery = fmt.Sprintf(`
-UPSERT INTO %s (database_id, role_name, settings, role_id)
+UPSERT INTO system.public.%s (database_id, role_name, settings, role_id)
VALUES ($1, $2, $3, (
SELECT CASE $2
WHEN '%s' THEN %d
ELSE (SELECT user_id FROM system.users WHERE username = $2)
END
))`,
- sessioninit.DatabaseRoleSettingsTableName, username.EmptyRole, username.EmptyRoleID,
+ catconstants.DatabaseRoleSettingsTableName, username.EmptyRole, username.EmptyRoleID,
)
// Instead of inserting an empty settings array, this function will make
@@ -583,7 +584,7 @@ func (n *alterRoleSetNode) getRoleName(
opName,
params.p.txn,
sessiondata.NodeUserSessionDataOverride,
- fmt.Sprintf("SELECT 1 FROM %s WHERE username = $1", sessioninit.UsersTableName),
+ fmt.Sprintf("SELECT 1 FROM system.public.%s WHERE username = $1", catconstants.UsersTableName),
n.roleName,
)
if err != nil {
@@ -636,8 +637,8 @@ func (n *alterRoleSetNode) makeNewSettings(
params runParams, opName redact.RedactableString, roleName username.SQLUsername,
) (oldSettings []string, newSettings []string, err error) {
var selectQuery = fmt.Sprintf(
- `SELECT settings FROM %s WHERE database_id = $1 AND role_name = $2`,
- sessioninit.DatabaseRoleSettingsTableName,
+ `SELECT settings FROM system.public.%s WHERE database_id = $1 AND role_name = $2`,
+ catconstants.DatabaseRoleSettingsTableName,
)
datums, err := params.p.InternalSQLTxn().QueryRowEx(
params.ctx,
diff --git a/pkg/sql/authorization.go b/pkg/sql/authorization.go
index 04f7268abaeb..ad2a0686c23a 100644
--- a/pkg/sql/authorization.go
+++ b/pkg/sql/authorization.go
@@ -12,7 +12,6 @@ import (
"strings"
"github.com/cockroachdb/cockroach/pkg/keys"
- "github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -21,102 +20,21 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc"
- "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
- "github.com/cockroachdb/cockroach/pkg/sql/isql"
- "github.com/cockroachdb/cockroach/pkg/sql/memsize"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
- "github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
- "github.com/cockroachdb/cockroach/pkg/util/hlc"
- "github.com/cockroachdb/cockroach/pkg/util/log"
- "github.com/cockroachdb/cockroach/pkg/util/mon"
- "github.com/cockroachdb/cockroach/pkg/util/stop"
- "github.com/cockroachdb/cockroach/pkg/util/syncutil"
- "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/errors"
)
-// MembershipCache is a shared cache for role membership information.
-type MembershipCache struct {
- syncutil.Mutex
- tableVersion descpb.DescriptorVersion
- boundAccount mon.BoundAccount
- // userCache is a mapping from username to userRoleMembership.
- userCache map[username.SQLUsername]userRoleMembership
- // populateCacheGroup ensures that there is at most one request in-flight
- // for each key.
- populateCacheGroup *singleflight.Group
- // readTS is the timestamp that was used to populate the cache.
- readTS hlc.Timestamp
- stopper *stop.Stopper
-}
-
-// NewMembershipCache initializes a new MembershipCache.
-func NewMembershipCache(account mon.BoundAccount, stopper *stop.Stopper) *MembershipCache {
- return &MembershipCache{
- boundAccount: account,
- populateCacheGroup: singleflight.NewGroup("lookup role membership", "key"),
- stopper: stopper,
- }
-}
-
-// RunAtCacheReadTS runs an operations at a timestamp that is guaranteed to
-// be consistent with the data in the cache. txn is used to check if the
-// table version matches the cached table version, and if it does, db is
-// used to start a separate transaction at the cached timestamp.
-func (m *MembershipCache) RunAtCacheReadTS(
- ctx context.Context, db descs.DB, txn descs.Txn, f func(context.Context, descs.Txn) error,
-) error {
- tableDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.RoleMembersTableID)
- if err != nil {
- return err
- }
-
- var readTS hlc.Timestamp
- func() {
- m.Lock()
- defer m.Unlock()
- if tableDesc.IsUncommittedVersion() {
- return
- }
- if tableDesc.GetVersion() != m.tableVersion {
- return
- }
- // The cached ts could be from long ago, so use the table modification
- // if it's more recent.
- if m.readTS.Less(tableDesc.GetModificationTime()) {
- readTS = tableDesc.GetModificationTime()
- return
- }
- readTS = m.readTS
- }()
-
- // If there's no cached read timestamp, then use the existing transaction.
- if readTS.IsEmpty() {
- return f(ctx, txn)
- }
-
- // If we found a historical timestamp to use, run in a different transaction.
- return db.DescsTxn(ctx, func(ctx context.Context, newTxn descs.Txn) error {
- if err := newTxn.KV().SetFixedTimestamp(ctx, readTS); err != nil {
- return err
- }
- return f(ctx, newTxn)
- })
-}
-
-// userRoleMembership is a mapping of "rolename" -> "with admin option".
-type userRoleMembership map[username.SQLUsername]bool
-
// AuthorizationAccessor for checking authorization (e.g. desc privileges).
type AuthorizationAccessor interface {
// CheckPrivilegeForTableID verifies that the user has `privilege` on the table
@@ -605,166 +523,7 @@ func (p *planner) MemberOfWithAdminOption(
func MemberOfWithAdminOption(
ctx context.Context, execCfg *ExecutorConfig, txn descs.Txn, member username.SQLUsername,
) (_ map[username.SQLUsername]bool, retErr error) {
- if txn == nil {
- return nil, errors.AssertionFailedf("cannot use MemberOfWithAdminoption without a txn")
- }
-
- roleMembersCache := execCfg.RoleMemberCache
-
- // Lookup table version.
- _, tableDesc, err := descs.PrefixAndTable(
- ctx, txn.Descriptors().ByNameWithLeased(txn.KV()).Get(), &roleMembersTableName,
- )
- if err != nil {
- return nil, err
- }
-
- tableVersion := tableDesc.GetVersion()
- if tableDesc.IsUncommittedVersion() {
- return resolveMemberOfWithAdminOption(ctx, member, txn)
- }
- if txn.SessionData().AllowRoleMembershipsToChangeDuringTransaction {
- defer func() {
- if retErr != nil {
- return
- }
- txn.Descriptors().ReleaseSpecifiedLeases(ctx, []lease.IDVersion{
- {
- Name: tableDesc.GetName(),
- ID: tableDesc.GetID(),
- Version: tableVersion,
- },
- })
- }()
- }
-
- // Check version and maybe clear cache while holding the mutex.
- // We use a closure here so that we release the lock here, then keep
- // going and re-lock if adding the looked-up entry.
- userMapping, found, refreshCache := func() (userRoleMembership, bool, bool) {
- roleMembersCache.Lock()
- defer roleMembersCache.Unlock()
- if roleMembersCache.tableVersion < tableVersion {
- // If the cache is based on an old table version, then update version and
- // drop the map.
- roleMembersCache.tableVersion = tableVersion
- roleMembersCache.userCache = make(map[username.SQLUsername]userRoleMembership)
- roleMembersCache.boundAccount.Empty(ctx)
- return nil, false, true
- } else if roleMembersCache.tableVersion > tableVersion {
- // If the cache is based on a newer table version, then this transaction
- // should not use the cached data.
- return nil, false, true
- }
- userMapping, ok := roleMembersCache.userCache[member]
- return userMapping, ok, len(roleMembersCache.userCache) == 0
- }()
-
- if !refreshCache {
- // The cache always contains entries for every role that exists, so the
- // only time we need to refresh the cache is if it has been invalidated
- // by the tableVersion changing.
- if found {
- return userMapping, nil
- } else {
- return nil, sqlerrors.NewUndefinedUserError(member)
- }
- }
-
- // If `txn` is high priority and in a retry, do not launch the singleflight to
- // populate the cache because it can potentially cause a deadlock (see
- // TestConcurrentGrants/concurrent-GRANTs-high-priority for a repro) and
- // instead just issue a read using `txn` to `system.role_members` table and
- // return the result.
- if txn.KV().UserPriority() == roachpb.MaxUserPriority && txn.KV().Epoch() > 0 {
- return resolveMemberOfWithAdminOption(ctx, member, txn)
- }
-
- // Lookup memberships outside the lock. There will be at most one request
- // in-flight for each user. The role_memberships table version is also part
- // of the request key so that we don't read data from an old version of the
- // table.
- //
- // The singleflight closure uses a fresh transaction to prevent a data race
- // that may occur if the context is cancelled, leading to the outer txn
- // being cleaned up. We set the timestamp of this new transaction to be
- // the same as the outer transaction that already read the descriptor, to
- // ensure that we are reading from the right version of the table.
- newTxnTimestamp := txn.KV().ReadTimestamp()
- future, _ := roleMembersCache.populateCacheGroup.DoChan(ctx,
- fmt.Sprintf("refreshMembershipCache-%d", tableVersion),
- singleflight.DoOpts{
- Stop: roleMembersCache.stopper,
- InheritCancelation: false,
- },
- func(ctx context.Context) (interface{}, error) {
- var allMemberships map[username.SQLUsername]userRoleMembership
- err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, newTxn isql.Txn) error {
- // Run the membership read as high-priority, thereby pushing any intents
- // out of its way. This prevents deadlocks in cases where a GRANT/REVOKE
- // txn, which has already laid a write intent on the
- // `system.role_members` table, waits for `newTxn` and `newTxn`, which
- // attempts to read the same system table, is blocked by the
- // GRANT/REVOKE txn.
- if err := newTxn.KV().SetUserPriority(roachpb.MaxUserPriority); err != nil {
- return err
- }
- err := newTxn.KV().SetFixedTimestamp(ctx, newTxnTimestamp)
- if err != nil {
- return err
- }
- allMemberships, err = resolveAllMemberships(ctx, newTxn)
- if err != nil {
- return err
- }
- return err
- })
- if err != nil {
- return nil, err
- }
- func() {
- // Update membership if the table version hasn't changed.
- roleMembersCache.Lock()
- defer roleMembersCache.Unlock()
- if roleMembersCache.tableVersion != tableVersion {
- // Table version has changed while we were looking: don't cache the data.
- return
- }
-
- // Table version remains the same: update map, unlock, return.
- sizeOfCache := int64(0)
- for _, memberships := range allMemberships {
- sizeOfEntry := int64(len(member.Normalized()))
- for m := range memberships {
- sizeOfEntry += int64(len(m.Normalized()))
- sizeOfEntry += memsize.Bool
- }
- sizeOfCache += sizeOfEntry
- }
- if err := roleMembersCache.boundAccount.Grow(ctx, sizeOfCache); err != nil {
- // If there is no memory available to cache the entry, we can still
- // proceed so that the query has a chance to succeed.
- log.Ops.Warningf(ctx, "no memory available to cache role membership info: %v", err)
- } else {
- roleMembersCache.userCache = allMemberships
- roleMembersCache.readTS = newTxnTimestamp
- }
- }()
- return allMemberships, nil
- })
- var allMemberships map[username.SQLUsername]userRoleMembership
- res := future.WaitForResult(ctx)
- if res.Err != nil {
- return nil, res.Err
- }
- allMemberships = res.Val.(map[username.SQLUsername]userRoleMembership)
- memberships, found := allMemberships[member]
- // The map contains entries for every role that exists, so if it's
- // not present in the map, the role does not exist.
- if !found {
- return nil, sqlerrors.NewUndefinedUserError(member)
- }
- return memberships, nil
+ return execCfg.RoleMemberCache.GetRolesForMember(ctx, txn, member)
}
// EnsureUserOnlyBelongsToRoles grants all the roles in `roles` to `user` and,
@@ -862,144 +621,6 @@ var enableGrantOptionForOwner = settings.RegisterBoolSetting(
settings.WithPublic,
)
-// resolveMemberOfWithAdminOption performs the actual recursive role membership lookup.
-func resolveMemberOfWithAdminOption(
- ctx context.Context, member username.SQLUsername, txn isql.Txn,
-) (map[username.SQLUsername]bool, error) {
- allMemberships, err := resolveAllMemberships(ctx, txn)
- if err != nil {
- return nil, err
- }
-
- // allMemberships will have an entry for every user, so we first verify that
- // the user exists.
- ret, roleExists := allMemberships[member]
- if !roleExists {
- return nil, sqlerrors.NewUndefinedUserError(member)
- }
-
- return ret, nil
-}
-
-// membership represents a parent-child role relationship.
-type membership struct {
- parent, child username.SQLUsername
- isAdmin bool
-}
-
-func resolveAllMemberships(
- ctx context.Context, txn isql.Txn,
-) (map[username.SQLUsername]userRoleMembership, error) {
- memberToDirectParents := make(map[username.SQLUsername][]membership)
- if err := forEachRoleWithMemberships(
- ctx, txn,
- func(ctx context.Context, role username.SQLUsername, memberships []membership) error {
- memberToDirectParents[role] = memberships
- return nil
- },
- ); err != nil {
- return nil, err
- }
- // We need to add entries for the node and public roles, which do not have
- // rows in system.users.
- memberToDirectParents[username.NodeUserName()] = append(
- memberToDirectParents[username.NodeUserName()],
- membership{
- parent: username.AdminRoleName(),
- child: username.NodeUserName(),
- isAdmin: true,
- },
- )
- memberToDirectParents[username.PublicRoleName()] = []membership{}
-
- // Recurse through all roles associated with each user.
- ret := make(map[username.SQLUsername]userRoleMembership)
- for member := range memberToDirectParents {
- memberToAllAncestors := make(map[username.SQLUsername]bool)
- var recurse func(u username.SQLUsername)
- recurse = func(u username.SQLUsername) {
- for _, membership := range memberToDirectParents[u] {
- // If the parent role was seen before, we still might need to update
- // the isAdmin flag for that role, but there's no need to recurse
- // through the role's ancestry again.
- prev, alreadySeen := memberToAllAncestors[membership.parent]
- memberToAllAncestors[membership.parent] = prev || membership.isAdmin
- if !alreadySeen {
- recurse(membership.parent)
- }
- }
- }
- recurse(member)
- ret[member] = memberToAllAncestors
- }
- return ret, nil
-}
-
-func forEachRoleWithMemberships(
- ctx context.Context,
- txn isql.Txn,
- fn func(ctx context.Context, role username.SQLUsername, memberships []membership) error,
-) (retErr error) {
- const query = `
-SELECT
- u.username,
- array_agg(rm.role ORDER BY rank),
- array_agg(rm."isAdmin" ORDER BY rank)
-FROM system.users u
-LEFT OUTER JOIN (
- SELECT *, rank() OVER (PARTITION BY member ORDER BY role)
- FROM system.role_members
-) rm ON u.username = rm.member
-GROUP BY u.username;`
- it, err := txn.QueryIteratorEx(ctx, "read-role-with-memberships", txn.KV(),
- sessiondata.NodeUserSessionDataOverride, query)
- if err != nil {
- return err
- }
- // We have to make sure to close the iterator since we might return from the
- // for loop early (before Next() returns false).
- defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()
-
- var ok bool
- var loopErr error
- for ok, loopErr = it.Next(ctx); ok; ok, loopErr = it.Next(ctx) {
- row := it.Cur()
- child := tree.MustBeDString(row[0])
- childName := username.MakeSQLUsernameFromPreNormalizedString(string(child))
- parentNames := tree.MustBeDArray(row[1])
- isAdmins := tree.MustBeDArray(row[2])
-
- memberships := make([]membership, 0, parentNames.Len())
- for i := 0; i < parentNames.Len(); i++ {
- if parentNames.Array[i] == tree.DNull {
- // A null element means this role has no parents.
- continue
- }
- // The names in the system tables are already normalized.
- parent := tree.MustBeDString(parentNames.Array[i])
- parentName := username.MakeSQLUsernameFromPreNormalizedString(string(parent))
- isAdmin := tree.MustBeDBool(isAdmins.Array[i])
- memberships = append(memberships, membership{
- parent: parentName,
- child: childName,
- isAdmin: bool(isAdmin),
- })
- }
-
- if err := fn(
- ctx,
- childName,
- memberships,
- ); err != nil {
- return err
- }
- }
- if loopErr != nil {
- return loopErr
- }
- return nil
-}
-
// UserHasRoleOption implements the AuthorizationAccessor interface.
func (p *planner) UserHasRoleOption(
ctx context.Context, user username.SQLUsername, roleOption roleoption.Option,
@@ -1028,8 +649,8 @@ func (p *planner) UserHasRoleOption(
ctx, "has-role-option", p.Txn(),
sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(
- `SELECT 1 from %s WHERE option = '%s' AND username = $1 LIMIT 1`,
- sessioninit.RoleOptionsTableName, roleOption.String()), user.Normalized())
+ `SELECT 1 from system.public.%s WHERE option = '%s' AND username = $1 LIMIT 1`,
+ catconstants.RoleOptionsTableName, roleOption.String()), user.Normalized())
if err != nil {
return false, err
}
diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go
index cd8cdff6d4a1..6827dd8ace58 100644
--- a/pkg/sql/colfetcher/index_join.go
+++ b/pkg/sql/colfetcher/index_join.go
@@ -555,6 +555,7 @@ func NewColIndexJoin(
}
kvFetcher = row.NewStreamingKVFetcher(
flowCtx.Cfg.DistSender,
+ flowCtx.Cfg.KVStreamerMetrics,
flowCtx.Stopper(),
txn,
flowCtx.Cfg.Settings,
diff --git a/pkg/sql/create_role.go b/pkg/sql/create_role.go
index b32996063130..d538d605756f 100644
--- a/pkg/sql/create_role.go
+++ b/pkg/sql/create_role.go
@@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
@@ -138,7 +139,7 @@ func (n *CreateRoleNode) startExec(params runParams) error {
opName,
params.p.txn,
sessiondata.NodeUserSessionDataOverride,
- fmt.Sprintf(`select "isRole" from %s where username = $1`, sessioninit.UsersTableName),
+ fmt.Sprintf(`select "isRole" from system.public.%s where username = $1`, catconstants.UsersTableName),
n.roleName,
)
if err != nil {
@@ -153,7 +154,7 @@ func (n *CreateRoleNode) startExec(params runParams) error {
}
// TODO(richardjcai): move hashedPassword column to system.role_options.
- stmt := fmt.Sprintf("INSERT INTO %s VALUES ($1, $2, $3, $4)", sessioninit.UsersTableName)
+ stmt := fmt.Sprintf("INSERT INTO system.public.%s VALUES ($1, $2, $3, $4)", catconstants.UsersTableName)
roleID, err := descidgen.GenerateUniqueRoleID(params.ctx, params.ExecCfg().DB, params.ExecCfg().Codec)
if err != nil {
return err
diff --git a/pkg/sql/descmetadata/BUILD.bazel b/pkg/sql/descmetadata/BUILD.bazel
index f882fdbc42cd..a806ccb824ff 100644
--- a/pkg/sql/descmetadata/BUILD.bazel
+++ b/pkg/sql/descmetadata/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/isql",
"//pkg/sql/schemachanger/scexec",
+ "//pkg/sql/sem/catconstants",
"//pkg/sql/sessiondata",
"//pkg/sql/sessioninit",
"//pkg/sql/ttl/ttlbase",
diff --git a/pkg/sql/descmetadata/metadata_updater.go b/pkg/sql/descmetadata/metadata_updater.go
index 0c753620aca1..c60a821227ce 100644
--- a/pkg/sql/descmetadata/metadata_updater.go
+++ b/pkg/sql/descmetadata/metadata_updater.go
@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
"github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase"
@@ -58,8 +59,8 @@ func (mu metadataUpdater) DeleteDatabaseRoleSettings(ctx context.Context, dbID d
mu.txn.KV(),
sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(
- `DELETE FROM %s WHERE database_id = $1`,
- sessioninit.DatabaseRoleSettingsTableName,
+ `DELETE FROM system.public.%s WHERE database_id = $1`,
+ catconstants.DatabaseRoleSettingsTableName,
),
dbID,
)
diff --git a/pkg/sql/drop_role.go b/pkg/sql/drop_role.go
index b550dbbbb3a3..a2685f4a0722 100644
--- a/pkg/sql/drop_role.go
+++ b/pkg/sql/drop_role.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
@@ -447,8 +448,8 @@ func (n *DropRoleNode) startExec(params runParams) error {
params.p.txn,
sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(
- `DELETE FROM %s WHERE username=$1`,
- sessioninit.RoleOptionsTableName,
+ `DELETE FROM system.public.%s WHERE username=$1`,
+ catconstants.RoleOptionsTableName,
),
normalizedUsername,
)
@@ -462,8 +463,8 @@ func (n *DropRoleNode) startExec(params runParams) error {
params.p.txn,
sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(
- `DELETE FROM %s WHERE role_name = $1`,
- sessioninit.DatabaseRoleSettingsTableName,
+ `DELETE FROM system.public.%s WHERE role_name = $1`,
+ catconstants.DatabaseRoleSettingsTableName,
),
normalizedUsername,
); err != nil {
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index b62e117a35db..96bf7696f099 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -82,6 +82,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
+ "github.com/cockroachdb/cockroach/pkg/sql/rolemembershipcache"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/scheduledlogging"
@@ -1335,7 +1336,7 @@ type ExecutorConfig struct {
RangeDescriptorCache *rangecache.RangeCache
// Role membership cache.
- RoleMemberCache *MembershipCache
+ RoleMemberCache *rolemembershipcache.MembershipCache
// Node-level sequence cache
SequenceCacheNode *sessiondatapb.SequenceCacheNode
diff --git a/pkg/sql/execinfra/BUILD.bazel b/pkg/sql/execinfra/BUILD.bazel
index 91e40c9c244b..8e434e905449 100644
--- a/pkg/sql/execinfra/BUILD.bazel
+++ b/pkg/sql/execinfra/BUILD.bazel
@@ -30,6 +30,7 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
+ "//pkg/kv/kvclient/kvstreamer",
"//pkg/kv/kvclient/rangecache",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/diskmap",
diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go
index ac5d3d69de86..af4df167d836 100644
--- a/pkg/sql/execinfra/server_config.go
+++ b/pkg/sql/execinfra/server_config.go
@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
@@ -128,6 +129,7 @@ type ServerConfig struct {
Metrics *DistSQLMetrics
RowMetrics *rowinfra.Metrics
InternalRowMetrics *rowinfra.Metrics
+ KVStreamerMetrics *kvstreamer.Metrics
// SQLLivenessReader provides access to reading the liveness of sessions.
SQLLivenessReader sqlliveness.Reader
diff --git a/pkg/sql/rolemembershipcache/BUILD.bazel b/pkg/sql/rolemembershipcache/BUILD.bazel
new file mode 100644
index 000000000000..d690fae5c4b4
--- /dev/null
+++ b/pkg/sql/rolemembershipcache/BUILD.bazel
@@ -0,0 +1,28 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "rolemembershipcache",
+ srcs = ["cache.go"],
+ importpath = "github.com/cockroachdb/cockroach/pkg/sql/rolemembershipcache",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/keys",
+ "//pkg/roachpb",
+ "//pkg/security/username",
+ "//pkg/sql/catalog/descpb",
+ "//pkg/sql/catalog/descs",
+ "//pkg/sql/catalog/lease",
+ "//pkg/sql/isql",
+ "//pkg/sql/memsize",
+ "//pkg/sql/sem/tree",
+ "//pkg/sql/sessiondata",
+ "//pkg/sql/sqlerrors",
+ "//pkg/util/hlc",
+ "//pkg/util/log",
+ "//pkg/util/mon",
+ "//pkg/util/stop",
+ "//pkg/util/syncutil",
+ "//pkg/util/syncutil/singleflight",
+ "@com_github_cockroachdb_errors//:errors",
+ ],
+)
diff --git a/pkg/sql/rolemembershipcache/cache.go b/pkg/sql/rolemembershipcache/cache.go
new file mode 100644
index 000000000000..208dff2ce1a0
--- /dev/null
+++ b/pkg/sql/rolemembershipcache/cache.go
@@ -0,0 +1,410 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package rolemembershipcache
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/cockroachdb/cockroach/pkg/keys"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/security/username"
+ "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+ "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+ "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
+ "github.com/cockroachdb/cockroach/pkg/sql/isql"
+ "github.com/cockroachdb/cockroach/pkg/sql/memsize"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+ "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+ "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
+ "github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/mon"
+ "github.com/cockroachdb/cockroach/pkg/util/stop"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
+ "github.com/cockroachdb/errors"
+)
+
+// MembershipCache is a shared cache for role membership information.
+type MembershipCache struct {
+ syncutil.Mutex
+ tableVersion descpb.DescriptorVersion
+ boundAccount mon.BoundAccount
+ // userCache is a mapping from username to userRoleMembership.
+ userCache map[username.SQLUsername]userRoleMembership
+ // populateCacheGroup ensures that there is at most one request in-flight
+ // for each key.
+ populateCacheGroup *singleflight.Group
+ // readTS is the timestamp that was used to populate the cache.
+ readTS hlc.Timestamp
+ // internalDB is used to run the queries to populate the cache.
+ internalDB descs.DB
+ stopper *stop.Stopper
+}
+
+// userRoleMembership is a mapping of "rolename" -> "with admin option".
+type userRoleMembership map[username.SQLUsername]bool
+
+// NewMembershipCache initializes a new MembershipCache.
+func NewMembershipCache(
+ account mon.BoundAccount, internalDB descs.DB, stopper *stop.Stopper,
+) *MembershipCache {
+ return &MembershipCache{
+ boundAccount: account,
+ populateCacheGroup: singleflight.NewGroup("lookup role membership", "key"),
+ internalDB: internalDB,
+ stopper: stopper,
+ }
+}
+
+// RunAtCacheReadTS runs an operation at a timestamp that is guaranteed to
+// be consistent with the data in the cache. txn is used to check if the
+// table version matches the cached table version, and if it does, db is
+// used to start a separate transaction at the cached timestamp.
+func (m *MembershipCache) RunAtCacheReadTS(
+ ctx context.Context, db descs.DB, txn descs.Txn, f func(context.Context, descs.Txn) error,
+) error {
+ tableDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.RoleMembersTableID)
+ if err != nil {
+ return err
+ }
+
+ var readTS hlc.Timestamp
+ func() {
+ m.Lock()
+ defer m.Unlock()
+ if tableDesc.IsUncommittedVersion() {
+ return
+ }
+ if tableDesc.GetVersion() != m.tableVersion {
+ return
+ }
+ // The cached ts could be from long ago, so use the table's modification
+ // time instead if it's more recent.
+ if m.readTS.Less(tableDesc.GetModificationTime()) {
+ readTS = tableDesc.GetModificationTime()
+ return
+ }
+ readTS = m.readTS
+ }()
+
+ // If there's no cached read timestamp, then use the existing transaction.
+ if readTS.IsEmpty() {
+ return f(ctx, txn)
+ }
+
+ // If we found a historical timestamp to use, run in a different transaction.
+ return db.DescsTxn(ctx, func(ctx context.Context, newTxn descs.Txn) error {
+ if err := newTxn.KV().SetFixedTimestamp(ctx, readTS); err != nil {
+ return err
+ }
+ return f(ctx, newTxn)
+ })
+}
+
+// GetRolesForMember looks up all the roles 'member' belongs to (direct and
+// indirect) and returns a map of "role" -> "isAdmin".
+// The "isAdmin" flag applies to both direct and indirect members.
+// Requires a valid transaction to be open.
+func (m *MembershipCache) GetRolesForMember(
+ ctx context.Context, txn descs.Txn, member username.SQLUsername,
+) (_ map[username.SQLUsername]bool, retErr error) {
+ if txn == nil {
+ return nil, errors.AssertionFailedf("cannot use MembershipCache without a txn")
+ }
+
+ // Lookup table version.
+ tableDesc, err := txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.RoleMembersTableID)
+ if err != nil {
+ return nil, err
+ }
+
+ tableVersion := tableDesc.GetVersion()
+ if tableDesc.IsUncommittedVersion() {
+ return resolveRolesForMember(ctx, txn, member)
+ }
+ if txn.SessionData().AllowRoleMembershipsToChangeDuringTransaction {
+ defer func() {
+ if retErr != nil {
+ return
+ }
+ txn.Descriptors().ReleaseSpecifiedLeases(ctx, []lease.IDVersion{
+ {
+ Name: tableDesc.GetName(),
+ ID: tableDesc.GetID(),
+ Version: tableVersion,
+ },
+ })
+ }()
+ }
+
+ // Check the version and maybe clear the cache while holding the mutex.
+ // We use a closure here so that the lock is released before the lookup
+ // below and re-acquired when installing the looked-up entries.
+ userMapping, found, refreshCache := func() (userRoleMembership, bool, bool) {
+ m.Lock()
+ defer m.Unlock()
+ if m.tableVersion < tableVersion {
+ // If the cache is based on an old table version, then update version and
+ // drop the map.
+ m.tableVersion = tableVersion
+ m.userCache = make(map[username.SQLUsername]userRoleMembership)
+ m.boundAccount.Empty(ctx)
+ return nil, false, true
+ } else if m.tableVersion > tableVersion {
+ // If the cache is based on a newer table version, then this transaction
+ // should not use the cached data.
+ return nil, false, true
+ }
+ userMapping, ok := m.userCache[member]
+ return userMapping, ok, len(m.userCache) == 0
+ }()
+
+ if !refreshCache {
+ // The cache always contains entries for every role that exists, so the
+ // only time we need to refresh the cache is if it has been invalidated
+ // by the tableVersion changing.
+ if found {
+ return userMapping, nil
+ } else {
+ return nil, sqlerrors.NewUndefinedUserError(member)
+ }
+ }
+
+ // If `txn` is high priority and in a retry, do not launch the singleflight to
+ // populate the cache because it can potentially cause a deadlock (see
+ // TestConcurrentGrants/concurrent-GRANTs-high-priority for a repro) and
+ // instead just issue a read of the `system.role_members` table using `txn` and
+ // return the result.
+ if txn.KV().UserPriority() == roachpb.MaxUserPriority && txn.KV().Epoch() > 0 {
+ return resolveRolesForMember(ctx, txn, member)
+ }
+
+ // Look up memberships outside the lock. There will be at most one request
+ // in-flight for each version of the table. The role_members table version is
+ // also part of the request key so that we don't read data from an old version
+ // of the table.
+ //
+ // The singleflight closure uses a fresh transaction to prevent a data race
+ // that may occur if the context is cancelled, leading to the outer txn
+ // being cleaned up. We set the timestamp of this new transaction to be
+ // the same as the outer transaction that already read the descriptor, to
+ // ensure that we are reading from the right version of the table.
+ newTxnTimestamp := txn.KV().ReadTimestamp()
+ future, _ := m.populateCacheGroup.DoChan(ctx,
+ fmt.Sprintf("refreshMembershipCache-%d", tableVersion),
+ singleflight.DoOpts{
+ Stop: m.stopper,
+ InheritCancelation: false,
+ },
+ func(ctx context.Context) (interface{}, error) {
+ var allMemberships map[username.SQLUsername]userRoleMembership
+ err = m.internalDB.Txn(ctx, func(ctx context.Context, newTxn isql.Txn) error {
+ // Run the membership read as high-priority, thereby pushing any intents
+ // out of its way. This prevents deadlocks in cases where a GRANT/REVOKE
+ // txn, which has already laid a write intent on the
+ // `system.role_members` table, waits for `newTxn`, while `newTxn`, which
+ // attempts to read the same system table, is blocked by the
+ // GRANT/REVOKE txn.
+ if err := newTxn.KV().SetUserPriority(roachpb.MaxUserPriority); err != nil {
+ return err
+ }
+ err := newTxn.KV().SetFixedTimestamp(ctx, newTxnTimestamp)
+ if err != nil {
+ return err
+ }
+ allMemberships, err = resolveAllMemberships(ctx, newTxn)
+ if err != nil {
+ return err
+ }
+ return err
+ })
+ if err != nil {
+ return nil, err
+ }
+ func() {
+ // Update membership if the table version hasn't changed.
+ m.Lock()
+ defer m.Unlock()
+ if m.tableVersion != tableVersion {
+ // Table version has changed while we were looking: don't cache the data.
+ return
+ }
+
+ // Table version remains the same: update map, unlock, return.
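+ // Compute the approximate memory footprint of the new contents so that
+ // it can be reserved against the bound account before caching.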
+ sizeOfCache := int64(0)
+ for _, memberships := range allMemberships {
+ sizeOfEntry := int64(len(member.Normalized()))
+ for m := range memberships {
+ sizeOfEntry += int64(len(m.Normalized()))
+ sizeOfEntry += memsize.Bool
+ }
+ sizeOfCache += sizeOfEntry
+ }
+ if err := m.boundAccount.Grow(ctx, sizeOfCache); err != nil {
+ // If there is no memory available to cache the entry, we can still
+ // proceed so that the query has a chance to succeed.
+ log.Ops.Warningf(ctx, "no memory available to cache role membership info: %v", err)
+ } else {
+ m.userCache = allMemberships
+ m.readTS = newTxnTimestamp
+ }
+ }()
+ return allMemberships, nil
+ })
+ var allMemberships map[username.SQLUsername]userRoleMembership
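+ // Wait for the in-flight lookup to finish; concurrent callers for the same
+ // table version all share this result.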
+ res := future.WaitForResult(ctx)
+ if res.Err != nil {
+ return nil, res.Err
+ }
+ allMemberships = res.Val.(map[username.SQLUsername]userRoleMembership)
+ memberships, found := allMemberships[member]
+ // The map contains entries for every role that exists, so if it's
+ // not present in the map, the role does not exist.
+ if !found {
+ return nil, sqlerrors.NewUndefinedUserError(member)
+ }
+ return memberships, nil
+}
+
+// resolveRolesForMember performs the actual recursive role membership lookup.
+func resolveRolesForMember(
+ ctx context.Context, txn isql.Txn, member username.SQLUsername,
+) (map[username.SQLUsername]bool, error) {
+ allMemberships, err := resolveAllMemberships(ctx, txn)
+ if err != nil {
+ return nil, err
+ }
+
+ // allMemberships will have an entry for every user, so we first verify that
+ // the user exists.
+ ret, roleExists := allMemberships[member]
+ if !roleExists {
+ return nil, sqlerrors.NewUndefinedUserError(member)
+ }
+
+ return ret, nil
+}
+
+// membership represents a parent-child role relationship.
+type membership struct {
+ parent, child username.SQLUsername
+ isAdmin bool
+}
+
+func resolveAllMemberships(
+ ctx context.Context, txn isql.Txn,
+) (map[username.SQLUsername]userRoleMembership, error) {
+ memberToDirectParents := make(map[username.SQLUsername][]membership)
+ if err := forEachRoleWithMemberships(
+ ctx, txn,
+ func(ctx context.Context, role username.SQLUsername, memberships []membership) error {
+ memberToDirectParents[role] = memberships
+ return nil
+ },
+ ); err != nil {
+ return nil, err
+ }
+ // We need to add entries for the node and public roles, which do not have
+ // rows in system.users.
+ memberToDirectParents[username.NodeUserName()] = append(
+ memberToDirectParents[username.NodeUserName()],
+ membership{
+ parent: username.AdminRoleName(),
+ child: username.NodeUserName(),
+ isAdmin: true,
+ },
+ )
+ memberToDirectParents[username.PublicRoleName()] = []membership{}
+
+ // Recurse through all roles associated with each user.
+ ret := make(map[username.SQLUsername]userRoleMembership)
+ for member := range memberToDirectParents {
+ memberToAllAncestors := make(map[username.SQLUsername]bool)
+ var recurse func(u username.SQLUsername)
+ recurse = func(u username.SQLUsername) {
+ for _, membership := range memberToDirectParents[u] {
+ // If the parent role was seen before, we still might need to update
+ // the isAdmin flag for that role, but there's no need to recurse
+ // through the role's ancestry again.
+ prev, alreadySeen := memberToAllAncestors[membership.parent]
+ memberToAllAncestors[membership.parent] = prev || membership.isAdmin
+ if !alreadySeen {
+ recurse(membership.parent)
+ }
+ }
+ }
+ recurse(member)
+ ret[member] = memberToAllAncestors
+ }
+ return ret, nil
+}
+
+func forEachRoleWithMemberships(
+ ctx context.Context,
+ txn isql.Txn,
+ fn func(ctx context.Context, role username.SQLUsername, memberships []membership) error,
+) (retErr error) {
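+ // The rank() window function assigns each membership row a stable position
+ // so that the two array_agg columns (role and "isAdmin") stay aligned
+ // element by element for each user.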
+ const query = `
+SELECT
+ u.username,
+ array_agg(rm.role ORDER BY rank),
+ array_agg(rm."isAdmin" ORDER BY rank)
+FROM system.users u
+LEFT OUTER JOIN (
+ SELECT *, rank() OVER (PARTITION BY member ORDER BY role)
+ FROM system.role_members
+) rm ON u.username = rm.member
+GROUP BY u.username;`
+ it, err := txn.QueryIteratorEx(ctx, "read-role-with-memberships", txn.KV(),
+ sessiondata.NodeUserSessionDataOverride, query)
+ if err != nil {
+ return err
+ }
+ // We have to make sure to close the iterator since we might return from the
+ // for loop early (before Next() returns false).
+ defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()
+
+ var ok bool
+ var loopErr error
+ for ok, loopErr = it.Next(ctx); ok; ok, loopErr = it.Next(ctx) {
+ row := it.Cur()
+ child := tree.MustBeDString(row[0])
+ childName := username.MakeSQLUsernameFromPreNormalizedString(string(child))
+ parentNames := tree.MustBeDArray(row[1])
+ isAdmins := tree.MustBeDArray(row[2])
+
+ memberships := make([]membership, 0, parentNames.Len())
+ for i := 0; i < parentNames.Len(); i++ {
+ if parentNames.Array[i] == tree.DNull {
+ // A null element means this role has no parents.
+ continue
+ }
+ // The names in the system tables are already normalized.
+ parent := tree.MustBeDString(parentNames.Array[i])
+ parentName := username.MakeSQLUsernameFromPreNormalizedString(string(parent))
+ isAdmin := tree.MustBeDBool(isAdmins.Array[i])
+ memberships = append(memberships, membership{
+ parent: parentName,
+ child: childName,
+ isAdmin: bool(isAdmin),
+ })
+ }
+
+ if err := fn(
+ ctx,
+ childName,
+ memberships,
+ ); err != nil {
+ return err
+ }
+ }
+ if loopErr != nil {
+ return loopErr
+ }
+ return nil
+}
diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go
index 3aaa0c56c6ad..2ab1ecaa2484 100644
--- a/pkg/sql/row/kv_fetcher.go
+++ b/pkg/sql/row/kv_fetcher.go
@@ -182,6 +182,7 @@ func NewKVFetcher(
// If maintainOrdering is true, then diskBuffer must be non-nil.
func NewStreamingKVFetcher(
distSender *kvcoord.DistSender,
+ metrics *kvstreamer.Metrics,
stopper *stop.Stopper,
txn *kv.Txn,
st *cluster.Settings,
@@ -204,6 +205,7 @@ func NewStreamingKVFetcher(
sendFn := makeSendFunc(txn, ext, &batchRequestsIssued)
streamer := kvstreamer.NewStreamer(
distSender,
+ metrics,
stopper,
txn,
sendFn,
diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go
index ad4d4d1e25aa..d556cf39c5c6 100644
--- a/pkg/sql/rowexec/joinreader.go
+++ b/pkg/sql/rowexec/joinreader.go
@@ -557,6 +557,7 @@ func newJoinReader(
singleRowLookup := readerType == indexJoinReaderType || spec.LookupColumnsAreKey
streamingKVFetcher = row.NewStreamingKVFetcher(
flowCtx.Cfg.DistSender,
+ flowCtx.Cfg.KVStreamerMetrics,
flowCtx.Stopper(),
jr.txn,
flowCtx.Cfg.Settings,
diff --git a/pkg/sql/sessioninit/BUILD.bazel b/pkg/sql/sessioninit/BUILD.bazel
index d8ba60ce921f..6abd5b7758f3 100644
--- a/pkg/sql/sessioninit/BUILD.bazel
+++ b/pkg/sql/sessioninit/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sessioninit",
visibility = ["//visibility:public"],
deps = [
+ "//pkg/keys",
"//pkg/security/password",
"//pkg/security/username",
"//pkg/settings",
@@ -16,7 +17,6 @@ go_library(
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
- "//pkg/sql/sem/catconstants",
"//pkg/sql/sem/tree",
"//pkg/util/log",
"//pkg/util/mon",
diff --git a/pkg/sql/sessioninit/cache.go b/pkg/sql/sessioninit/cache.go
index c3c195212b7f..4f7961801412 100644
--- a/pkg/sql/sessioninit/cache.go
+++ b/pkg/sql/sessioninit/cache.go
@@ -10,6 +10,7 @@ import (
"fmt"
"unsafe"
+ "github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/security/password"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -132,11 +133,11 @@ func (a *Cache) GetAuthInfo(
if err := txn.Descriptors().MaybeSetReplicationSafeTS(ctx, txn.KV()); err != nil {
return err
}
- _, usersTableDesc, err = descs.PrefixAndTable(ctx, txn.Descriptors().ByNameWithLeased(txn.KV()).Get(), UsersTableName)
+ usersTableDesc, err = txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.UsersTableID)
if err != nil {
return err
}
- _, roleOptionsTableDesc, err = descs.PrefixAndTable(ctx, txn.Descriptors().ByNameWithLeased(txn.KV()).Get(), RoleOptionsTableName)
+ roleOptionsTableDesc, err = txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.RoleOptionsTableID)
return err
})
if err != nil {
@@ -296,7 +297,7 @@ func (a *Cache) GetDefaultSettings(
err = db.DescsTxn(ctx, func(
ctx context.Context, txn descs.Txn,
) error {
- _, dbRoleSettingsTableDesc, err = descs.PrefixAndTable(ctx, txn.Descriptors().ByNameWithLeased(txn.KV()).Get(), DatabaseRoleSettingsTableName)
+ dbRoleSettingsTableDesc, err = txn.Descriptors().ByIDWithLeased(txn.KV()).Get().Table(ctx, keys.DatabaseRoleSettingsTableID)
if err != nil {
return err
}
diff --git a/pkg/sql/sessioninit/constants.go b/pkg/sql/sessioninit/constants.go
index 5d082fcff9e4..54bca83323ad 100644
--- a/pkg/sql/sessioninit/constants.go
+++ b/pkg/sql/sessioninit/constants.go
@@ -5,20 +5,7 @@
package sessioninit
-import (
- "github.com/cockroachdb/cockroach/pkg/security/username"
- "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
- "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
-)
-
-// UsersTableName represents system.users.
-var UsersTableName = tree.NewTableNameWithSchema("system", catconstants.PublicSchemaName, "users")
-
-// RoleOptionsTableName represents system.role_options.
-var RoleOptionsTableName = tree.NewTableNameWithSchema("system", catconstants.PublicSchemaName, "role_options")
-
-// DatabaseRoleSettingsTableName represents system.database_role_settings.
-var DatabaseRoleSettingsTableName = tree.NewTableNameWithSchema("system", catconstants.PublicSchemaName, "database_role_settings")
+import "github.com/cockroachdb/cockroach/pkg/security/username"
// defaultDatabaseID is used in the settingsCache for entries that should
// apply to all databases.
diff --git a/pkg/sql/syntheticprivilege/constants.go b/pkg/sql/syntheticprivilege/constants.go
index 649e72bb5cf0..af5589cf6fd6 100644
--- a/pkg/sql/syntheticprivilege/constants.go
+++ b/pkg/sql/syntheticprivilege/constants.go
@@ -10,5 +10,5 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)
-// SystemPrivilegesTableName represents system.database_role_settings.
+// SystemPrivilegesTableName represents system.privileges.
var SystemPrivilegesTableName = tree.NewTableNameWithSchema("system", catconstants.PublicSchemaName, "privileges")
diff --git a/pkg/sql/user.go b/pkg/sql/user.go
index b92f2c028eac..164402473ea4 100644
--- a/pkg/sql/user.go
+++ b/pkg/sql/user.go
@@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
- "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
@@ -532,12 +531,10 @@ func RoleExists(ctx context.Context, txn isql.Txn, role username.SQLUsername) (b
return row != nil, nil
}
-var roleMembersTableName = tree.MakeTableNameWithSchema("system", catconstants.PublicSchemaName, "role_members")
-
// BumpRoleMembershipTableVersion increases the table version for the
// role membership table.
func (p *planner) BumpRoleMembershipTableVersion(ctx context.Context) error {
- _, tableDesc, err := p.ResolveMutableTableDescriptor(ctx, &roleMembersTableName, true, tree.ResolveAnyTableKind)
+ tableDesc, err := p.Descriptors().MutableByID(p.Txn()).Table(ctx, keys.RoleMembersTableID)
if err != nil {
return err
}
@@ -550,7 +547,7 @@ func (p *planner) BumpRoleMembershipTableVersion(ctx context.Context) error {
// bumpUsersTableVersion increases the table version for the
// users table.
func (p *planner) bumpUsersTableVersion(ctx context.Context) error {
- _, tableDesc, err := p.ResolveMutableTableDescriptor(ctx, sessioninit.UsersTableName, true, tree.ResolveAnyTableKind)
+ tableDesc, err := p.Descriptors().MutableByID(p.Txn()).Table(ctx, keys.UsersTableID)
if err != nil {
return err
}
@@ -563,7 +560,7 @@ func (p *planner) bumpUsersTableVersion(ctx context.Context) error {
// bumpRoleOptionsTableVersion increases the table version for the
// role_options table.
func (p *planner) bumpRoleOptionsTableVersion(ctx context.Context) error {
- _, tableDesc, err := p.ResolveMutableTableDescriptor(ctx, sessioninit.RoleOptionsTableName, true, tree.ResolveAnyTableKind)
+ tableDesc, err := p.Descriptors().MutableByID(p.Txn()).Table(ctx, keys.RoleOptionsTableID)
if err != nil {
return err
}
@@ -576,7 +573,7 @@ func (p *planner) bumpRoleOptionsTableVersion(ctx context.Context) error {
// bumpDatabaseRoleSettingsTableVersion increases the table version for the
// database_role_settings table.
func (p *planner) bumpDatabaseRoleSettingsTableVersion(ctx context.Context) error {
- _, tableDesc, err := p.ResolveMutableTableDescriptor(ctx, sessioninit.DatabaseRoleSettingsTableName, true, tree.ResolveAnyTableKind)
+ tableDesc, err := p.Descriptors().MutableByID(p.Txn()).Table(ctx, keys.DatabaseRoleSettingsTableID)
if err != nil {
return err
}