diff --git a/cloud/kubernetes/bring-your-own-certs/client.yaml b/cloud/kubernetes/bring-your-own-certs/client.yaml
index b74b03972f73..928d9a17e92c 100644
--- a/cloud/kubernetes/bring-your-own-certs/client.yaml
+++ b/cloud/kubernetes/bring-your-own-certs/client.yaml
@@ -20,7 +20,7 @@ spec:
serviceAccountName: cockroachdb
containers:
- name: cockroachdb-client
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
# Keep a pod open indefinitely so kubectl exec can be used to get a shell to it
# and run cockroach client commands, such as cockroach sql, cockroach node status, etc.
command:
diff --git a/cloud/kubernetes/bring-your-own-certs/cockroachdb-statefulset.yaml b/cloud/kubernetes/bring-your-own-certs/cockroachdb-statefulset.yaml
index d563836772d3..bb7361943198 100644
--- a/cloud/kubernetes/bring-your-own-certs/cockroachdb-statefulset.yaml
+++ b/cloud/kubernetes/bring-your-own-certs/cockroachdb-statefulset.yaml
@@ -153,7 +153,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/client-secure.yaml b/cloud/kubernetes/client-secure.yaml
index 865482746577..0825ba7c0618 100644
--- a/cloud/kubernetes/client-secure.yaml
+++ b/cloud/kubernetes/client-secure.yaml
@@ -32,7 +32,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cockroachdb-client
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/cluster-init-secure.yaml b/cloud/kubernetes/cluster-init-secure.yaml
index 37740a41f07c..15d82040f4a8 100644
--- a/cloud/kubernetes/cluster-init-secure.yaml
+++ b/cloud/kubernetes/cluster-init-secure.yaml
@@ -34,7 +34,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/cluster-init.yaml b/cloud/kubernetes/cluster-init.yaml
index 7965eb73dd9d..6590ba127540 100644
--- a/cloud/kubernetes/cluster-init.yaml
+++ b/cloud/kubernetes/cluster-init.yaml
@@ -10,7 +10,7 @@ spec:
spec:
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
command:
- "/cockroach/cockroach"
diff --git a/cloud/kubernetes/cockroachdb-statefulset-secure.yaml b/cloud/kubernetes/cockroachdb-statefulset-secure.yaml
index 404412c2d645..9f7e13eb1b3c 100644
--- a/cloud/kubernetes/cockroachdb-statefulset-secure.yaml
+++ b/cloud/kubernetes/cockroachdb-statefulset-secure.yaml
@@ -195,7 +195,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/cockroachdb-statefulset.yaml b/cloud/kubernetes/cockroachdb-statefulset.yaml
index 92ef0325bdb1..e3e8a7cc8537 100644
--- a/cloud/kubernetes/cockroachdb-statefulset.yaml
+++ b/cloud/kubernetes/cockroachdb-statefulset.yaml
@@ -98,7 +98,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/multiregion/client-secure.yaml b/cloud/kubernetes/multiregion/client-secure.yaml
index 647cf92e2c9a..5d4f02244068 100644
--- a/cloud/kubernetes/multiregion/client-secure.yaml
+++ b/cloud/kubernetes/multiregion/client-secure.yaml
@@ -9,7 +9,7 @@ spec:
serviceAccountName: cockroachdb
containers:
- name: cockroachdb-client
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/multiregion/cluster-init-secure.yaml b/cloud/kubernetes/multiregion/cluster-init-secure.yaml
index 8644f1186a6f..3edebae25d15 100644
--- a/cloud/kubernetes/multiregion/cluster-init-secure.yaml
+++ b/cloud/kubernetes/multiregion/cluster-init-secure.yaml
@@ -11,7 +11,7 @@ spec:
serviceAccountName: cockroachdb
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/multiregion/cockroachdb-statefulset-secure.yaml b/cloud/kubernetes/multiregion/cockroachdb-statefulset-secure.yaml
index 99ef2e6c6c0f..d7878ce7d61c 100644
--- a/cloud/kubernetes/multiregion/cockroachdb-statefulset-secure.yaml
+++ b/cloud/kubernetes/multiregion/cockroachdb-statefulset-secure.yaml
@@ -167,7 +167,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
ports:
- containerPort: 26257
diff --git a/cloud/kubernetes/multiregion/eks/cockroachdb-statefulset-secure-eks.yaml b/cloud/kubernetes/multiregion/eks/cockroachdb-statefulset-secure-eks.yaml
index f550e760d211..dea7298988ff 100644
--- a/cloud/kubernetes/multiregion/eks/cockroachdb-statefulset-secure-eks.yaml
+++ b/cloud/kubernetes/multiregion/eks/cockroachdb-statefulset-secure-eks.yaml
@@ -185,7 +185,7 @@ spec:
name: cockroach-env
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/performance/cockroachdb-daemonset-insecure.yaml b/cloud/kubernetes/performance/cockroachdb-daemonset-insecure.yaml
index 70da01e9d0fe..19c058da5a13 100644
--- a/cloud/kubernetes/performance/cockroachdb-daemonset-insecure.yaml
+++ b/cloud/kubernetes/performance/cockroachdb-daemonset-insecure.yaml
@@ -82,7 +82,7 @@ spec:
hostNetwork: true
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
# TODO: If you configured taints to give CockroachDB exclusive access to nodes, feel free
# to remove the requests and limits sections. If you didn't, you'll need to change these to
diff --git a/cloud/kubernetes/performance/cockroachdb-daemonset-secure.yaml b/cloud/kubernetes/performance/cockroachdb-daemonset-secure.yaml
index 698c300d47dc..eaff5065f2a1 100644
--- a/cloud/kubernetes/performance/cockroachdb-daemonset-secure.yaml
+++ b/cloud/kubernetes/performance/cockroachdb-daemonset-secure.yaml
@@ -198,7 +198,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
# TODO: If you configured taints to give CockroachDB exclusive access to nodes, feel free
# to remove the requests and limits sections. If you didn't, you'll need to change these to
diff --git a/cloud/kubernetes/performance/cockroachdb-statefulset-insecure.yaml b/cloud/kubernetes/performance/cockroachdb-statefulset-insecure.yaml
index 6eb9b03ce254..16bfcda66164 100644
--- a/cloud/kubernetes/performance/cockroachdb-statefulset-insecure.yaml
+++ b/cloud/kubernetes/performance/cockroachdb-statefulset-insecure.yaml
@@ -141,7 +141,7 @@ spec:
- name: cockroachdb
# NOTE: Always use the most recent version of CockroachDB for the best
# performance and reliability.
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/performance/cockroachdb-statefulset-secure.yaml b/cloud/kubernetes/performance/cockroachdb-statefulset-secure.yaml
index cb66b616eb8e..ca8054e7b618 100644
--- a/cloud/kubernetes/performance/cockroachdb-statefulset-secure.yaml
+++ b/cloud/kubernetes/performance/cockroachdb-statefulset-secure.yaml
@@ -232,7 +232,7 @@ spec:
- name: cockroachdb
# NOTE: Always use the most recent version of CockroachDB for the best
# performance and reliability.
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
# TODO: Change these to appropriate values for the hardware that you're running. You can see
# the resources that can be allocated on each of your Kubernetes nodes by running:
diff --git a/cloud/kubernetes/v1.6/client-secure.yaml b/cloud/kubernetes/v1.6/client-secure.yaml
index bd0ef6392626..8b6b20ded119 100644
--- a/cloud/kubernetes/v1.6/client-secure.yaml
+++ b/cloud/kubernetes/v1.6/client-secure.yaml
@@ -32,7 +32,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cockroachdb-client
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/v1.6/cluster-init-secure.yaml b/cloud/kubernetes/v1.6/cluster-init-secure.yaml
index 2e4f29e212a4..d3af9aa321aa 100644
--- a/cloud/kubernetes/v1.6/cluster-init-secure.yaml
+++ b/cloud/kubernetes/v1.6/cluster-init-secure.yaml
@@ -34,7 +34,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/v1.6/cluster-init.yaml b/cloud/kubernetes/v1.6/cluster-init.yaml
index 19d8acc3e1ec..03e5c1ecb1ad 100644
--- a/cloud/kubernetes/v1.6/cluster-init.yaml
+++ b/cloud/kubernetes/v1.6/cluster-init.yaml
@@ -10,7 +10,7 @@ spec:
spec:
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
command:
- "/cockroach/cockroach"
diff --git a/cloud/kubernetes/v1.6/cockroachdb-statefulset-secure.yaml b/cloud/kubernetes/v1.6/cockroachdb-statefulset-secure.yaml
index 11b7df75999b..23539ef8a9f0 100644
--- a/cloud/kubernetes/v1.6/cockroachdb-statefulset-secure.yaml
+++ b/cloud/kubernetes/v1.6/cockroachdb-statefulset-secure.yaml
@@ -178,7 +178,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
ports:
- containerPort: 26257
diff --git a/cloud/kubernetes/v1.6/cockroachdb-statefulset.yaml b/cloud/kubernetes/v1.6/cockroachdb-statefulset.yaml
index 20b6b62590ec..c7c565dc8fb3 100644
--- a/cloud/kubernetes/v1.6/cockroachdb-statefulset.yaml
+++ b/cloud/kubernetes/v1.6/cockroachdb-statefulset.yaml
@@ -81,7 +81,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
ports:
- containerPort: 26257
diff --git a/cloud/kubernetes/v1.7/client-secure.yaml b/cloud/kubernetes/v1.7/client-secure.yaml
index abc5e54c82fb..683488a3e673 100644
--- a/cloud/kubernetes/v1.7/client-secure.yaml
+++ b/cloud/kubernetes/v1.7/client-secure.yaml
@@ -32,7 +32,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cockroachdb-client
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/v1.7/cluster-init-secure.yaml b/cloud/kubernetes/v1.7/cluster-init-secure.yaml
index 19c7586122d3..26b4be34f16f 100644
--- a/cloud/kubernetes/v1.7/cluster-init-secure.yaml
+++ b/cloud/kubernetes/v1.7/cluster-init-secure.yaml
@@ -34,7 +34,7 @@ spec:
mountPath: /cockroach-certs
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
volumeMounts:
- name: client-certs
diff --git a/cloud/kubernetes/v1.7/cluster-init.yaml b/cloud/kubernetes/v1.7/cluster-init.yaml
index 9701bc3fc6d2..f3256cd2df5e 100644
--- a/cloud/kubernetes/v1.7/cluster-init.yaml
+++ b/cloud/kubernetes/v1.7/cluster-init.yaml
@@ -10,7 +10,7 @@ spec:
spec:
containers:
- name: cluster-init
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
command:
- "/cockroach/cockroach"
diff --git a/cloud/kubernetes/v1.7/cockroachdb-statefulset-secure.yaml b/cloud/kubernetes/v1.7/cockroachdb-statefulset-secure.yaml
index 0ca41639cf73..9dc1039845ee 100644
--- a/cloud/kubernetes/v1.7/cockroachdb-statefulset-secure.yaml
+++ b/cloud/kubernetes/v1.7/cockroachdb-statefulset-secure.yaml
@@ -190,7 +190,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
ports:
- containerPort: 26257
diff --git a/cloud/kubernetes/v1.7/cockroachdb-statefulset.yaml b/cloud/kubernetes/v1.7/cockroachdb-statefulset.yaml
index 501b6e6d815c..b4be69a79aa6 100644
--- a/cloud/kubernetes/v1.7/cockroachdb-statefulset.yaml
+++ b/cloud/kubernetes/v1.7/cockroachdb-statefulset.yaml
@@ -93,7 +93,7 @@ spec:
topologyKey: kubernetes.io/hostname
containers:
- name: cockroachdb
- image: cockroachdb/cockroach:v22.1.5
+ image: cockroachdb/cockroach:v22.1.6
imagePullPolicy: IfNotPresent
ports:
- containerPort: 26257
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 38826353bf76..199ca927ddce 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -56,6 +56,7 @@
kv.snapshot_delegation.enabled
boolean
false
set to true to allow snapshots from follower replicas
kv.snapshot_rebalance.max_rate
byte size
32 MiB
the rate limit (bytes/sec) to use for rebalance and upreplication snapshots
kv.snapshot_recovery.max_rate
byte size
32 MiB
the rate limit (bytes/sec) to use for recovery snapshots
+kv.store.admission.provisioned_bandwidth
+byte size
+0 B
+if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s) for each store. It can be overridden on a per-store basis using the --store flag
kv.transaction.max_intents_bytes
integer
4194304
maximum number of bytes used to track locks in transactions
kv.transaction.max_refresh_spans_bytes
integer
4194304
maximum number of bytes used to track refresh spans in serializable transactions
kv.transaction.reject_over_max_intents_budget.enabled
boolean
false
if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed
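For context, a minimal sketch of overriding the new kv.store.admission.provisioned_bandwidth setting from a SQL session. The connection string and driver choice are illustrative assumptions, not part of this change:

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // assumption: any Postgres-wire driver works with CockroachDB
)

func main() {
	// Illustrative connection string; adjust host and certs for a real cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Set the cluster-wide provisioned bandwidth. A per-store value passed via
	// the --store flag's provisioned-rate field overrides this default.
	if _, err := db.Exec(
		`SET CLUSTER SETTING kv.store.admission.provisioned_bandwidth = '125MiB'`,
	); err != nil {
		log.Fatal(err)
	}
}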
diff --git a/pkg/base/store_spec.go b/pkg/base/store_spec.go
index a8d6c77a9a2e..3b1de84d2a7e 100644
--- a/pkg/base/store_spec.go
+++ b/pkg/base/store_spec.go
@@ -161,6 +161,86 @@ func (ss *SizeSpec) Set(value string) error {
return nil
}
+// ProvisionedRateSpec is an optional part of the StoreSpec.
+//
+// TODO(sumeer): We should map the file path specified in the store spec to
+// the disk name. df can be used to map paths to names like /dev/nvme1n1 and
+// /dev/sdb (these examples are from AWS EBS and GCP PD respectively) and the
+// corresponding names produced by disk_counters.go are nvme1n1 and sdb
+// respectively. We need to find or write a platform independent library --
+// see the discussion on
+// https://github.com/cockroachdb/cockroach/pull/86063#pullrequestreview-1074487018.
+// With that change, the ProvisionedRateSpec would only be needed to override
+// the cluster setting when there are heterogeneous bandwidth limits in a
+// cluster (there would be no more DiskName field).
+type ProvisionedRateSpec struct {
+ // DiskName is the name of the disk observed by the code in disk_counters.go
+ // when retrieving stats for this store.
+ DiskName string
+ // ProvisionedBandwidth is the bandwidth provisioned for this store in
+ // bytes/s.
+ ProvisionedBandwidth int64
+}
+
+func newStoreProvisionedRateSpec(
+ field redact.SafeString, value string,
+) (ProvisionedRateSpec, error) {
+ var spec ProvisionedRateSpec
+ used := make(map[string]struct{})
+ for _, split := range strings.Split(value, ":") {
+ if len(split) == 0 {
+ continue
+ }
+ subSplits := strings.Split(split, "=")
+ if len(subSplits) != 2 {
+ return ProvisionedRateSpec{}, errors.Errorf("%s field has invalid value %s", field, value)
+ }
+ subField := subSplits[0]
+ subValue := subSplits[1]
+ if _, ok := used[subField]; ok {
+ return ProvisionedRateSpec{}, errors.Errorf("%s field has duplicate sub-field %s",
+ field, subField)
+ }
+ used[subField] = struct{}{}
+ if len(subField) == 0 {
+ continue
+ }
+ if len(subValue) == 0 {
+ return ProvisionedRateSpec{},
+ errors.Errorf("%s field has no value specified for sub-field %s", field, subField)
+ }
+ switch subField {
+ case "disk-name":
+ spec.DiskName = subValue
+ case "bandwidth":
+ if len(subValue) <= 2 || subValue[len(subValue)-2:] != "/s" {
+ return ProvisionedRateSpec{},
+ errors.Errorf("%s field does not have bandwidth sub-field %s ending in /s",
+ field, subValue)
+ }
+ subValue = subValue[:len(subValue)-2]
+ var err error
+ spec.ProvisionedBandwidth, err = humanizeutil.ParseBytes(subValue)
+ if err != nil {
+ return ProvisionedRateSpec{},
+ errors.Wrapf(err, "could not parse bandwidth in field %s", field)
+ }
+ if spec.ProvisionedBandwidth == 0 {
+ return ProvisionedRateSpec{},
+ errors.Errorf("%s field is trying to set bandwidth to 0", field)
+ }
+ default:
+ return ProvisionedRateSpec{}, errors.Errorf("%s field has unknown sub-field %s",
+ field, subField)
+ }
+ }
+ if len(spec.DiskName) == 0 {
+ return ProvisionedRateSpec{},
+ errors.Errorf("%s field did not specify disk-name", field)
+ }
+ return spec, nil
+}
+
// StoreSpec contains the details that can be specified in the cli pertaining
// to the --store flag.
type StoreSpec struct {
@@ -189,6 +269,8 @@ type StoreSpec struct {
// through to C CCL code to set up encryption-at-rest. Must be set if and
// only if encryption is enabled, otherwise left empty.
EncryptionOptions []byte
+ // ProvisionedRateSpec is optional.
+ ProvisionedRateSpec ProvisionedRateSpec
}
// String returns a fully parsable version of the store spec.
@@ -231,6 +313,16 @@ func (ss StoreSpec) String() string {
fmt.Fprint(&buffer, optsStr)
fmt.Fprint(&buffer, ",")
}
+ if len(ss.ProvisionedRateSpec.DiskName) > 0 {
+ fmt.Fprintf(&buffer, "provisioned-rate=disk-name=%s",
+ ss.ProvisionedRateSpec.DiskName)
+ if ss.ProvisionedRateSpec.ProvisionedBandwidth > 0 {
+ fmt.Fprintf(&buffer, ":bandwidth=%s/s,",
+ humanizeutil.IBytes(ss.ProvisionedRateSpec.ProvisionedBandwidth))
+ } else {
+ fmt.Fprintf(&buffer, ",")
+ }
+ }
// Trim the extra comma from the end if it exists.
if l := buffer.Len(); l > 0 {
buffer.Truncate(l - 1)
@@ -259,7 +351,7 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0-
// NewStoreSpec parses the string passed into a --store flag and returns a
// StoreSpec if it is correctly parsed.
-// There are four possible fields that can be passed in, comma separated:
+// There are five possible fields that can be passed in, comma separated:
// - path=xxx The directory in which the rocks db instance should be
// located; required unless using an in-memory store.
// - type=mem This specifies that the store is an in-memory store instead of
@@ -273,6 +365,10 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0-
// - 20% -> 20% of the available space
// - 0.2 -> 20% of the available space
// - attrs=xxx:yyy:zzz A colon separated list of optional attributes.
+// - provisioned-rate=disk-name=<disk-name>[:bandwidth=<bandwidth-bytes/s>] The
+// provisioned-rate can be used for admission control for operations on the
+// store. The bandwidth is optional, and if unspecified, a cluster setting
+// (kv.store.admission.provisioned_bandwidth) will be used.
// Note that commas are forbidden within any field name or value.
func NewStoreSpec(value string) (StoreSpec, error) {
const pathField = "path"
@@ -399,6 +495,13 @@ func NewStoreSpec(value string) (StoreSpec, error) {
return StoreSpec{}, err
}
ss.PebbleOptions = buf.String()
+ case "provisioned-rate":
+ rateSpec, err := newStoreProvisionedRateSpec("provisioned-rate", value)
+ if err != nil {
+ return StoreSpec{}, err
+ }
+ ss.ProvisionedRateSpec = rateSpec
+
default:
return StoreSpec{}, fmt.Errorf("%s is not a valid store field", field)
}
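To illustrate the parsing above, a minimal sketch that round-trips the new field through NewStoreSpec and String(); it mirrors the new test cases below and assumes the github.com/cockroachdb/cockroach/pkg/base import path:

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/cockroach/pkg/base"
)

func main() {
	// Parse a --store value carrying the new provisioned-rate field.
	ss, err := base.NewStoreSpec("path=/mnt/hda1,provisioned-rate=disk-name=nvme1n1:bandwidth=200MiB/s")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(ss.ProvisionedRateSpec.DiskName)             // nvme1n1
	fmt.Println(ss.ProvisionedRateSpec.ProvisionedBandwidth) // 209715200, i.e. 200 << 20
	// String() re-encodes the spec, including the provisioned-rate field.
	fmt.Println(ss.String())
}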
diff --git a/pkg/base/store_spec_test.go b/pkg/base/store_spec_test.go
index f7fd65984222..28e460568d2b 100644
--- a/pkg/base/store_spec_test.go
+++ b/pkg/base/store_spec_test.go
@@ -151,6 +151,14 @@ target_file_size=2097152`
{"path=/mnt/hda1,type=other", "other is not a valid store type", StoreSpec{}},
{"path=/mnt/hda1,type=mem,size=20GiB", "path specified for in memory store", StoreSpec{}},
+ // provisioned rate
+ {"path=/mnt/hda1,provisioned-rate=disk-name=nvme1n1:bandwidth=200MiB/s", "",
+ StoreSpec{Path: "/mnt/hda1", ProvisionedRateSpec: base.ProvisionedRateSpec{
+ DiskName: "nvme1n1", ProvisionedBandwidth: 200 << 20}}},
+ {"path=/mnt/hda1,provisioned-rate=disk-name=sdb", "", StoreSpec{
+ Path: "/mnt/hda1", ProvisionedRateSpec: base.ProvisionedRateSpec{
+ DiskName: "sdb", ProvisionedBandwidth: 0}}},
+
// RocksDB
{"path=/,rocksdb=key1=val1;key2=val2", "", StoreSpec{Path: "/", RocksDBOptions: "key1=val1;key2=val2"}},
diff --git a/pkg/ccl/streamingccl/event.go b/pkg/ccl/streamingccl/event.go
index d04af2a9b1c6..4491b7cad25f 100644
--- a/pkg/ccl/streamingccl/event.go
+++ b/pkg/ccl/streamingccl/event.go
@@ -50,7 +50,7 @@ type Event interface {
// GetResolvedSpans returns a list of span-time pairs indicating the time for
// which all KV events within that span have been emitted.
- GetResolvedSpans() *[]jobspb.ResolvedSpan
+ GetResolvedSpans() []jobspb.ResolvedSpan
}
// kvEvent is a key value pair that needs to be ingested.
@@ -81,7 +81,7 @@ func (kve kvEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
}
// GetResolvedSpans implements the Event interface.
-func (kve kvEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
+func (kve kvEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}
@@ -111,7 +111,7 @@ func (sste sstableEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
}
// GetResolvedSpans implements the Event interface.
-func (sste sstableEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
+func (sste sstableEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}
@@ -143,7 +143,7 @@ func (dre delRangeEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
}
// GetResolvedSpans implements the Event interface.
-func (dre delRangeEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
+func (dre delRangeEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
return nil
}
@@ -178,8 +178,8 @@ func (ce checkpointEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange {
}
// GetResolvedSpans implements the Event interface.
-func (ce checkpointEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan {
- return &ce.resolvedSpans
+func (ce checkpointEvent) GetResolvedSpans() []jobspb.ResolvedSpan {
+ return ce.resolvedSpans
}
// MakeKVEvent creates an Event from a KV.
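The signature change also simplifies callers: a nil slice ranges as empty, so non-checkpoint events need no nil check before dereferencing. A hedged sketch of a caller (minResolved is a hypothetical helper, not part of this change):

// minResolved returns the smallest resolved timestamp in the event, if any.
func minResolved(ev streamingccl.Event) (hlc.Timestamp, bool) {
	spans := ev.GetResolvedSpans() // nil for KV, SSTable, and DelRange events
	if len(spans) == 0 {
		return hlc.Timestamp{}, false
	}
	minTS := spans[0].Timestamp
	for _, rs := range spans[1:] {
		if rs.Timestamp.Less(minTS) {
			minTS = rs.Timestamp
		}
	}
	return minTS, true
}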
diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go
index 1a4351e1c55d..c2c0e98517b1 100644
--- a/pkg/ccl/streamingccl/streamclient/client_test.go
+++ b/pkg/ccl/streamingccl/streamclient/client_test.go
@@ -124,8 +124,6 @@ func TestGetFirstActiveClient(t *testing.T) {
defer func() {
require.NoError(t, client.Close(context.Background()))
}()
- interceptable, ok := client.(InterceptableStreamClient)
- require.True(t, ok)
streamAddresses := []string{
"randomgen://test0/",
@@ -142,7 +140,7 @@ func TestGetFirstActiveClient(t *testing.T) {
}
// Track dials and error for all but test3 and test4
- interceptable.RegisterDialInterception(func(streamURL *url.URL) error {
+ client.RegisterDialInterception(func(streamURL *url.URL) error {
addr := streamURL.String()
addressDialCount[addr]++
if addr != streamAddresses[3] && addr != streamAddresses[4] {
@@ -151,7 +149,7 @@ func TestGetFirstActiveClient(t *testing.T) {
return nil
})
- client, err := GetFirstActiveClient(context.Background(), streamAddresses)
+ activeClient, err := GetFirstActiveClient(context.Background(), streamAddresses)
require.NoError(t, err)
// Should've dialed the valid schemes up to the 5th one where it should've
@@ -165,7 +163,7 @@ func TestGetFirstActiveClient(t *testing.T) {
require.Equal(t, 0, addressDialCount[streamAddresses[6]])
// The 5th should've succeeded as it was a valid scheme and its Dial succeeded
- require.Equal(t, client.(*randomStreamClient).streamURL.String(), streamAddresses[4])
+ require.Equal(t, activeClient.(*RandomStreamClient).streamURL.String(), streamAddresses[4])
}
// ExampleClientUsage serves as documentation to indicate how a stream
@@ -243,7 +241,7 @@ func ExampleClient() {
case streamingccl.CheckpointEvent:
ingested.Lock()
minTS := hlc.MaxTimestamp
- for _, rs := range *event.GetResolvedSpans() {
+ for _, rs := range event.GetResolvedSpans() {
if rs.Timestamp.Less(minTS) {
minTS = rs.Timestamp
}
diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go
index 397f75941252..a16b9daf32f3 100644
--- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go
+++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -52,15 +52,17 @@ const (
// EventFrequency is the frequency in nanoseconds that the stream will emit
// randomly generated KV events.
EventFrequency = "EVENT_FREQUENCY"
- // KVsPerCheckpoint controls approximately how many KV events should be emitted
- // between checkpoint events.
- KVsPerCheckpoint = "KVS_PER_CHECKPOINT"
+ // EventsPerCheckpoint controls approximately how many data events (KV/SST/DelRange)
+ // should be emitted between checkpoint events.
+ EventsPerCheckpoint = "EVENTS_PER_CHECKPOINT"
// NumPartitions controls the number of partitions the client will stream data
// back on. Each partition will encompass a single table span.
NumPartitions = "NUM_PARTITIONS"
- // DupProbability controls the probability with which we emit duplicate KV
+ // DupProbability controls the probability with which we emit duplicate data
// events.
DupProbability = "DUP_PROBABILITY"
+ // SSTProbability controls the probability with which we emit an SST event.
+ SSTProbability = "SST_PROBABILITY"
// TenantID specifies the ID of the tenant we are ingesting data into. This
// allows the client to prefix the generated KVs with the appropriate tenant
// prefix.
@@ -74,8 +76,8 @@ const (
)
// TODO(dt): just make interceptors a singleton, not the whole client.
-var randomStreamClientSingleton = func() *randomStreamClient {
- c := randomStreamClient{}
+var randomStreamClientSingleton = func() *RandomStreamClient {
+ c := RandomStreamClient{}
c.mu.tableID = 52
return &c
}()
@@ -83,7 +85,7 @@ var randomStreamClientSingleton = func() *randomStreamClient {
// GetRandomStreamClientSingletonForTesting returns the singleton instance of
// the client. This is to be used in testing, when interceptors can be
// registered on the client to observe events.
-func GetRandomStreamClientSingletonForTesting() Client {
+func GetRandomStreamClientSingletonForTesting() *RandomStreamClient {
return randomStreamClientSingleton
}
@@ -99,45 +101,32 @@ type DialInterceptFn func(streamURL *url.URL) error
// Heartbeat.
type HeartbeatInterceptFn func(timestamp hlc.Timestamp)
-// InterceptableStreamClient wraps a Client, and provides a method to register
-// interceptor methods that are run on every streamed Event.
-type InterceptableStreamClient interface {
- Client
-
- // RegisterInterception is how you can register your interceptor to be called
- // from an InterceptableStreamClient.
- RegisterInterception(fn InterceptFn)
-
- // RegisterDialInterception registers an interceptor to be called
- // whenever Dial is called on the client.
- RegisterDialInterception(fn DialInterceptFn)
- // RegisterHeartbeatInterception registers an interceptor to be called
- // whenever Heartbeat is called on the client.
- RegisterHeartbeatInterception(fn HeartbeatInterceptFn)
-
- // ClearInterceptors clears all registered interceptors on the client.
- ClearInterceptors()
-}
+// SSTableMakerFn is a function that generates a RangeFeedSSTable event
+// from a given list of roachpb.KeyValue.
+type SSTableMakerFn func(keyValues []roachpb.KeyValue) roachpb.RangeFeedSSTable
// randomStreamConfig specifies the variables that control the rate and type of
// events that the generated stream emits.
type randomStreamConfig struct {
- valueRange int
- eventFrequency time.Duration
- kvsPerCheckpoint int
- numPartitions int
- dupProbability float64
- tenantID roachpb.TenantID
+ valueRange int
+ eventFrequency time.Duration
+ eventsPerCheckpoint int
+ numPartitions int
+ dupProbability float64
+ sstProbability float64
+
+ tenantID roachpb.TenantID
}
func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
c := randomStreamConfig{
- valueRange: 100,
- eventFrequency: 10 * time.Microsecond,
- kvsPerCheckpoint: 100,
- numPartitions: 1,
- dupProbability: 0.5,
- tenantID: roachpb.SystemTenantID,
+ valueRange: 100,
+ eventFrequency: 10 * time.Microsecond,
+ eventsPerCheckpoint: 30,
+ numPartitions: 1, // TODO(casper): increase this
+ dupProbability: 0.3,
+ sstProbability: 0.2,
+ tenantID: roachpb.SystemTenantID,
}
var err error
@@ -148,16 +137,23 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
}
}
- if kvFreqStr := streamURL.Query().Get(EventFrequency); kvFreqStr != "" {
- kvFreq, err := strconv.Atoi(kvFreqStr)
- c.eventFrequency = time.Duration(kvFreq)
+ if eventFreqStr := streamURL.Query().Get(EventFrequency); eventFreqStr != "" {
+ eventFreq, err := strconv.Atoi(eventFreqStr)
+ c.eventFrequency = time.Duration(eventFreq)
+ if err != nil {
+ return c, err
+ }
+ }
+
+ if eventsPerCheckpointStr := streamURL.Query().Get(EventsPerCheckpoint); eventsPerCheckpointStr != "" {
+ c.eventsPerCheckpoint, err = strconv.Atoi(eventsPerCheckpointStr)
if err != nil {
return c, err
}
}
- if kvsPerCheckpointStr := streamURL.Query().Get(KVsPerCheckpoint); kvsPerCheckpointStr != "" {
- c.kvsPerCheckpoint, err = strconv.Atoi(kvsPerCheckpointStr)
+ if sstProbabilityStr := streamURL.Query().Get(SSTProbability); sstProbabilityStr != "" {
+ c.sstProbability, err = strconv.ParseFloat(sstProbabilityStr, 32)
if err != nil {
return c, err
}
@@ -195,20 +191,88 @@ func (c randomStreamConfig) URL(table int) string {
q := u.Query()
q.Add(ValueRangeKey, strconv.Itoa(c.valueRange))
q.Add(EventFrequency, strconv.Itoa(int(c.eventFrequency)))
- q.Add(KVsPerCheckpoint, strconv.Itoa(c.kvsPerCheckpoint))
+ q.Add(EventsPerCheckpoint, strconv.Itoa(c.eventsPerCheckpoint))
q.Add(NumPartitions, strconv.Itoa(c.numPartitions))
q.Add(DupProbability, fmt.Sprintf("%f", c.dupProbability))
+ q.Add(SSTProbability, fmt.Sprintf("%f", c.sstProbability))
q.Add(TenantID, strconv.Itoa(int(c.tenantID.ToUint64())))
u.RawQuery = q.Encode()
return u.String()
}
-// randomStreamClient is a temporary stream client implementation that generates
+type randomEventGenerator struct {
+ rng *rand.Rand
+ config randomStreamConfig
+ numEventsSinceLastResolved int
+ sstMaker SSTableMakerFn
+ tableDesc *tabledesc.Mutable
+ systemKVs []roachpb.KeyValue
+}
+
+func newRandomEventGenerator(
+ rng *rand.Rand, partitionURL *url.URL, config randomStreamConfig, fn SSTableMakerFn,
+) (*randomEventGenerator, error) {
+ var partitionTableID int
+ partitionTableID, err := strconv.Atoi(partitionURL.Host)
+ if err != nil {
+ return nil, err
+ }
+ tableDesc, systemKVs, err := getDescriptorAndNamespaceKVForTableID(config, descpb.ID(partitionTableID))
+ if err != nil {
+ return nil, err
+ }
+ return &randomEventGenerator{
+ rng: rng,
+ config: config,
+ numEventsSinceLastResolved: 0,
+ sstMaker: fn,
+ tableDesc: tableDesc,
+ systemKVs: systemKVs,
+ }, nil
+}
+
+func (r *randomEventGenerator) generateNewEvent() streamingccl.Event {
+ var event streamingccl.Event
+ if r.numEventsSinceLastResolved == r.config.eventsPerCheckpoint {
+ // Emit a CheckpointEvent.
+ resolvedTime := timeutil.Now()
+ hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()}
+ resolvedSpan := jobspb.ResolvedSpan{Span: r.tableDesc.TableSpan(keys.SystemSQLCodec), Timestamp: hlcResolvedTime}
+ event = streamingccl.MakeCheckpointEvent([]jobspb.ResolvedSpan{resolvedSpan})
+ r.numEventsSinceLastResolved = 0
+ } else {
+ // If there are system KVs to emit, prioritize those.
+ if len(r.systemKVs) > 0 {
+ systemKV := r.systemKVs[0]
+ systemKV.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+ event = streamingccl.MakeKVEvent(systemKV)
+ r.systemKVs = r.systemKVs[1:]
+ return event
+ }
+
+ // Emit SST with given probability.
+ // TODO(casper): add support for DelRange.
+ if prob := r.rng.Float64(); prob < r.config.sstProbability {
+ size := 10 + r.rng.Intn(30)
+ keyVals := make([]roachpb.KeyValue, 0, size)
+ for i := 0; i < size; i++ {
+ keyVals = append(keyVals, makeRandomKey(r.rng, r.config, r.tableDesc))
+ }
+ event = streamingccl.MakeSSTableEvent(r.sstMaker(keyVals))
+ } else {
+ event = streamingccl.MakeKVEvent(makeRandomKey(r.rng, r.config, r.tableDesc))
+ }
+ r.numEventsSinceLastResolved++
+ }
+ return event
+}
+
+// RandomStreamClient is a temporary stream client implementation that generates
// random events.
//
// The client can be configured to return more than one partition via the stream
// URL. Each partition covers a single table span.
-type randomStreamClient struct {
+type RandomStreamClient struct {
config randomStreamConfig
streamURL *url.URL
@@ -221,12 +285,12 @@ type randomStreamClient struct {
interceptors []InterceptFn
dialInterceptors []DialInterceptFn
heartbeatInterceptors []HeartbeatInterceptFn
+ sstMaker SSTableMakerFn
tableID int
}
}
-var _ Client = &randomStreamClient{}
-var _ InterceptableStreamClient = &randomStreamClient{}
+var _ Client = &RandomStreamClient{}
// newRandomStreamClient returns a stream client that generates a random set of
// events on a table with an integer key and integer value for the table with
@@ -243,7 +307,7 @@ func newRandomStreamClient(streamURL *url.URL) (Client, error) {
return c, nil
}
-func (m *randomStreamClient) getNextTableID() int {
+func (m *RandomStreamClient) getNextTableID() int {
m.mu.Lock()
defer m.mu.Unlock()
ret := m.mu.tableID
@@ -251,7 +315,7 @@ func (m *randomStreamClient) getNextTableID() int {
return ret
}
-func (m *randomStreamClient) tableDescForID(tableID int) (*tabledesc.Mutable, error) {
+func (m *RandomStreamClient) tableDescForID(tableID int) (*tabledesc.Mutable, error) {
partitionURI := m.config.URL(tableID)
partitionURL, err := url.Parse(partitionURI)
if err != nil {
@@ -266,12 +330,12 @@ func (m *randomStreamClient) tableDescForID(tableID int) (*tabledesc.Mutable, er
if err != nil {
return nil, err
}
- tableDesc, _, err := m.getDescriptorAndNamespaceKVForTableID(config, descpb.ID(partitionTableID))
+ tableDesc, _, err := getDescriptorAndNamespaceKVForTableID(config, descpb.ID(partitionTableID))
return tableDesc, err
}
// Dial implements Client interface.
-func (m *randomStreamClient) Dial(ctx context.Context) error {
+func (m *RandomStreamClient) Dial(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()
for _, interceptor := range m.mu.dialInterceptors {
@@ -287,7 +351,7 @@ func (m *randomStreamClient) Dial(ctx context.Context) error {
}
// Plan implements the Client interface.
-func (m *randomStreamClient) Plan(ctx context.Context, id streaming.StreamID) (Topology, error) {
+func (m *RandomStreamClient) Plan(ctx context.Context, id streaming.StreamID) (Topology, error) {
topology := make(Topology, 0, m.config.numPartitions)
log.Infof(ctx, "planning random stream for tenant %d", m.config.tenantID)
@@ -315,7 +379,7 @@ func (m *randomStreamClient) Plan(ctx context.Context, id streaming.StreamID) (T
}
// Create implements the Client interface.
-func (m *randomStreamClient) Create(
+func (m *RandomStreamClient) Create(
ctx context.Context, target roachpb.TenantID,
) (streaming.StreamID, error) {
log.Infof(ctx, "creating random stream for tenant %d", target.ToUint64())
@@ -324,7 +388,7 @@ func (m *randomStreamClient) Create(
}
// Heartbeat implements the Client interface.
-func (m *randomStreamClient) Heartbeat(
+func (m *RandomStreamClient) Heartbeat(
ctx context.Context, _ streaming.StreamID, ts hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
m.mu.Lock()
@@ -340,7 +404,7 @@ func (m *randomStreamClient) Heartbeat(
// getDescriptorAndNamespaceKVForTableID returns the namespace and descriptor
// KVs for the table with tableID.
-func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID(
+func getDescriptorAndNamespaceKVForTableID(
config randomStreamConfig, tableID descpb.ID,
) (*tabledesc.Mutable, []roachpb.KeyValue, error) {
tableName := fmt.Sprintf("%s%d", IngestionTablePrefix, tableID)
@@ -387,18 +451,19 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID(
}
// Close implements the Client interface.
-func (m *randomStreamClient) Close(ctx context.Context) error {
+func (m *RandomStreamClient) Close(ctx context.Context) error {
return nil
}
// Subscribe implements the Client interface.
-func (m *randomStreamClient) Subscribe(
+func (m *RandomStreamClient) Subscribe(
ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
) (Subscription, error) {
partitionURL, err := url.Parse(string(spec))
if err != nil {
return nil, err
}
+ // Parse the partition config, including the SST probability option.
config, err := parseRandomStreamConfig(partitionURL)
if err != nil {
return nil, err
@@ -411,85 +476,36 @@ func (m *randomStreamClient) Subscribe(
panic("cannot start random stream client event stream in the future")
}
- var partitionTableID int
- partitionTableID, err = strconv.Atoi(partitionURL.Host)
- if err != nil {
- return nil, err
- }
- log.Infof(ctx, "producing kvs for metadata for table %d for tenant %d based on %q", partitionTableID, config.tenantID, spec)
-
- tableDesc, systemKVs, err := m.getDescriptorAndNamespaceKVForTableID(config, descpb.ID(partitionTableID))
+ // rand is not thread safe, so create a random source for each partition.
+ rng, _ := randutil.NewPseudoRand()
+ m.mu.Lock()
+ reg, err := newRandomEventGenerator(rng, partitionURL, config, m.mu.sstMaker)
+ m.mu.Unlock()
if err != nil {
return nil, err
}
- copyKeyVal := func(keyVal *roachpb.KeyValue) *roachpb.KeyValue {
- res := roachpb.KeyValue{
- Key: make([]byte, len(keyVal.Key)),
- Value: roachpb.Value{
- RawBytes: make([]byte, len(keyVal.Value.RawBytes)),
- Timestamp: keyVal.Value.Timestamp,
- },
- }
- copy(res.Key, keyVal.Key)
- copy(res.Value.RawBytes, keyVal.Value.RawBytes)
- return &res
- }
-
receiveFn := func(ctx context.Context) error {
defer close(eventCh)
- // rand is not thread safe, so create a random source for each partition.
- r := rand.New(rand.NewSource(timeutil.Now().UnixNano()))
- kvInterval := config.eventFrequency
-
- numKVEventsSinceLastResolved := 0
-
- rng, _ := randutil.NewPseudoRand()
-
- var keyValCopy *roachpb.KeyValue
+ dataEventInterval := config.eventFrequency
+ var lastEventCopy streamingccl.Event
for {
var event streamingccl.Event
- if numKVEventsSinceLastResolved == config.kvsPerCheckpoint {
- // Emit a CheckpointEvent.
- resolvedTime := timeutil.Now()
- hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()}
- resolvedSpan := jobspb.ResolvedSpan{Span: tableDesc.TableSpan(keys.SystemSQLCodec), Timestamp: hlcResolvedTime}
- event = streamingccl.MakeCheckpointEvent([]jobspb.ResolvedSpan{resolvedSpan})
- numKVEventsSinceLastResolved = 0
+ if lastEventCopy != nil && rng.Float64() < config.dupProbability {
+ event = duplicateEvent(lastEventCopy)
} else {
- // If there are system KVs to emit, prioritize those.
- if len(systemKVs) > 0 {
- systemKV := systemKVs[0]
- systemKV.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
- event = streamingccl.MakeKVEvent(systemKV)
- systemKVs = systemKVs[1:]
- } else {
- numKVEventsSinceLastResolved++
- // Generate a duplicate KVEvent.
- if rng.Float64() < config.dupProbability && keyValCopy != nil {
- event = streamingccl.MakeKVEvent(*keyValCopy)
- } else {
- event = streamingccl.MakeKVEvent(makeRandomKey(r, config, tableDesc))
- }
- }
- // Create a copy of KeyValue generated as the KeyValue in the event might get modified later.
- keyValCopy = copyKeyVal(event.GetKV())
+ event = reg.generateNewEvent()
}
+ lastEventCopy = duplicateEvent(event)
select {
+ // The event may get modified after being sent to the channel.
case eventCh <- event:
case <-ctx.Done():
return ctx.Err()
}
- if event.Type() == streamingccl.KVEvent {
- // Use the originally generated KeyValue copy as the KeyValue inside the event might
- // get modified by ingestion processor's tenant rekeyer.
- // 'keyValCopy' will only be set when it is a KV event. Copying the 'keyValCopy' again
- // to prevent the event being modified by interceptor again.
- event = streamingccl.MakeKVEvent(*copyKeyVal(keyValCopy))
- }
func() {
m.mu.Lock()
defer m.mu.Unlock()
@@ -497,13 +513,13 @@ func (m *randomStreamClient) Subscribe(
if len(m.mu.interceptors) > 0 {
for _, interceptor := range m.mu.interceptors {
if interceptor != nil {
- interceptor(event, spec)
+ interceptor(duplicateEvent(lastEventCopy), spec)
}
}
}
}()
- time.Sleep(kvInterval)
+ time.Sleep(dataEventInterval)
}
}
@@ -514,7 +530,7 @@ func (m *randomStreamClient) Subscribe(
}
// Complete implements the streamclient.Client interface.
-func (m *randomStreamClient) Complete(
+func (m *RandomStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
) error {
return nil
@@ -594,31 +610,78 @@ func makeRandomKey(
}
}
-// RegisterInterception implements the InterceptableStreamClient interface.
-func (m *randomStreamClient) RegisterInterception(fn InterceptFn) {
+func duplicateEvent(event streamingccl.Event) streamingccl.Event {
+ var dup streamingccl.Event
+ switch event.Type() {
+ case streamingccl.CheckpointEvent:
+ resolvedSpans := make([]jobspb.ResolvedSpan, len(event.GetResolvedSpans()))
+ copy(resolvedSpans, event.GetResolvedSpans())
+ dup = streamingccl.MakeCheckpointEvent(resolvedSpans)
+ case streamingccl.KVEvent:
+ eventKV := event.GetKV()
+ rawBytes := make([]byte, len(eventKV.Value.RawBytes))
+ copy(rawBytes, eventKV.Value.RawBytes)
+ keyVal := roachpb.KeyValue{
+ Key: event.GetKV().Key.Clone(),
+ Value: roachpb.Value{
+ RawBytes: rawBytes,
+ Timestamp: eventKV.Value.Timestamp,
+ },
+ }
+ dup = streamingccl.MakeKVEvent(keyVal)
+ case streamingccl.SSTableEvent:
+ sst := event.GetSSTable()
+ dataCopy := make([]byte, len(sst.Data))
+ copy(dataCopy, sst.Data)
+ dup = streamingccl.MakeSSTableEvent(roachpb.RangeFeedSSTable{
+ Data: dataCopy,
+ Span: sst.Span.Clone(),
+ WriteTS: sst.WriteTS,
+ })
+ default:
+ panic("unsopported event type")
+ }
+ return dup
+}
+
+// RegisterInterception registers an interceptor to be called after
+// an event is emitted from the client.
+func (m *RandomStreamClient) RegisterInterception(fn InterceptFn) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.interceptors = append(m.mu.interceptors, fn)
}
-// RegisterDialInterception implements the InterceptableStreamClient interface.
-func (m *randomStreamClient) RegisterDialInterception(fn DialInterceptFn) {
+// RegisterDialInterception registers an interceptor to be called
+// whenever Dial is called on the client.
+func (m *RandomStreamClient) RegisterDialInterception(fn DialInterceptFn) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.dialInterceptors = append(m.mu.dialInterceptors, fn)
}
-// RegisterHeartbeatInterception implements the InterceptableStreamClient interface.
-func (m *randomStreamClient) RegisterHeartbeatInterception(fn HeartbeatInterceptFn) {
+// RegisterHeartbeatInterception registers an interceptor to be called
+// whenever Heartbeat is called on the client.
+func (m *RandomStreamClient) RegisterHeartbeatInterception(fn HeartbeatInterceptFn) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.heartbeatInterceptors = append(m.mu.heartbeatInterceptors, fn)
}
-// ClearInterceptors implements the InterceptableStreamClient interface.
-func (m *randomStreamClient) ClearInterceptors() {
+// RegisterSSTableGenerator registers a function to be called
+// whenever an SSTable event is to be generated.
+func (m *RandomStreamClient) RegisterSSTableGenerator(fn SSTableMakerFn) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.mu.sstMaker = fn
+}
+
+// ClearInterceptors clears all registered interceptors on the client.
+func (m *RandomStreamClient) ClearInterceptors() {
m.mu.Lock()
defer m.mu.Unlock()
- m.mu.interceptors = make([]InterceptFn, 0)
- m.mu.heartbeatInterceptors = make([]HeartbeatInterceptFn, 0)
+ m.mu.interceptors = m.mu.interceptors[:0]
+ m.mu.heartbeatInterceptors = m.mu.heartbeatInterceptors[:0]
+ m.mu.dialInterceptors = m.mu.dialInterceptors[:0]
+ m.mu.sstMaker = nil
}
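With the client type exported, tests can wire the singleton directly instead of type-asserting to InterceptableStreamClient; a sketch based on the test changes further down (sstMaker is the helper added in stream_ingestion_test.go):

client := streamclient.GetRandomStreamClientSingletonForTesting()
client.ClearInterceptors()
client.RegisterSSTableGenerator(func(keyValues []roachpb.KeyValue) roachpb.RangeFeedSSTable {
	return sstMaker(t, keyValues)
})
client.RegisterInterception(func(event streamingccl.Event, spec streamclient.SubscriptionToken) {
	// Interceptors receive a defensive copy of each emitted event (see
	// duplicateEvent above), so they can inspect it without racing the consumer.
})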
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
index 0c3c8483b57d..9ba472fe4e5e 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
@@ -419,7 +419,6 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error {
}
progress := md.Progress
-
// Keep the recorded highwater empty until some advancement has been made
if sf.highWaterAtStart.Less(highWatermark) {
progress.Progress = &jobspb.Progress_HighWater{
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go
index c33f7f08192b..abb7cd46427e 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go
@@ -302,14 +302,13 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
defer func() {
require.NoError(t, client.Close(context.Background()))
}()
- interceptable, ok := client.(streamclient.InterceptableStreamClient)
- require.True(t, ok)
- defer interceptable.ClearInterceptors()
+
+ client.ClearInterceptors()
// Record heartbeats in a list and terminate the client once the expected
// frontier timestamp has been reached
heartbeats := make([]hlc.Timestamp, 0)
- interceptable.RegisterHeartbeatInterception(func(heartbeatTs hlc.Timestamp) {
+ client.RegisterHeartbeatInterception(func(heartbeatTs hlc.Timestamp) {
heartbeats = append(heartbeats, heartbeatTs)
if tc.expectedFrontierTimestamp.LessEq(heartbeatTs) {
doneCh <- struct{}{}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
index b0b24ae27746..b0e9dd0f18f4 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -428,9 +428,7 @@ func (s *streamIngestionResumer) cancelProducerJob(
streamID, s.job.ID())
if err = client.Complete(ctx, streamID, false /* successfulIngestion */); err != nil {
log.Warningf(ctx, "encountered error when canceling the producer job: %v", err)
- fmt.Println("canceled failure", err)
}
- fmt.Println("cancel sent")
if err = client.Close(ctx); err != nil {
log.Warningf(ctx, "encountered error when closing the stream client: %v", err)
}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
index 10f3b676bc2c..5b0d0040f35e 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -697,7 +697,7 @@ func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error {
}
func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) error {
- resolvedSpans := *event.GetResolvedSpans()
+ resolvedSpans := event.GetResolvedSpans()
if resolvedSpans == nil {
return errors.New("checkpoint event expected to have resolved spans")
}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
index 9137af0fac83..3b62a63314cf 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go
@@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -247,7 +248,8 @@ func TestStreamIngestionProcessor(t *testing.T) {
{ID: "2", SubscriptionToken: p2, Spans: []roachpb.Span{p2Span}},
}
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
- partitions, startTime, []jobspb.ResolvedSpan{}, nil /* interceptEvents */, tenantRekey, mockClient, nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+ partitions, startTime, []jobspb.ResolvedSpan{}, tenantRekey,
+ mockClient, nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
require.NoError(t, err)
emittedRows := readRows(out)
@@ -290,7 +292,8 @@ func TestStreamIngestionProcessor(t *testing.T) {
lastClientStart[token] = clientStartTime
}}
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
- partitions, startTime, checkpoint, nil /* interceptEvents */, tenantRekey, mockClient, nil /* cutoverProvider */, streamingTestingKnobs)
+ partitions, startTime, checkpoint, tenantRekey, mockClient,
+ nil /* cutoverProvider */, streamingTestingKnobs)
require.NoError(t, err)
emittedRows := readRows(out)
@@ -313,7 +316,8 @@ func TestStreamIngestionProcessor(t *testing.T) {
{SubscriptionToken: streamclient.SubscriptionToken("2")},
}
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
- partitions, startTime, []jobspb.ResolvedSpan{}, nil /* interceptEvents */, tenantRekey, &errorStreamClient{}, nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
+ partitions, startTime, []jobspb.ResolvedSpan{}, tenantRekey, &errorStreamClient{},
+ nil /* cutoverProvider */, nil /* streamingTestingKnobs */)
require.NoError(t, err)
// Expect no rows, and just the error.
@@ -445,10 +449,13 @@ func TestRandomClientGeneration(t *testing.T) {
// The random client returns system and table data partitions.
streamClient, err := streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(streamAddr))
require.NoError(t, err)
- id, err := streamClient.Create(ctx, roachpb.MakeTenantID(tenantID))
+
+ randomStreamClient, ok := streamClient.(*streamclient.RandomStreamClient)
+ require.True(t, ok)
+ id, err := randomStreamClient.Create(ctx, roachpb.MakeTenantID(tenantID))
require.NoError(t, err)
- topo, err := streamClient.Plan(ctx, id)
+ topo, err := randomStreamClient.Plan(ctx, id)
require.NoError(t, err)
// One system and two table data partitions.
require.Equal(t, 2 /* numPartitions */, len(topo))
@@ -467,11 +474,17 @@ func TestRandomClientGeneration(t *testing.T) {
nil /* tableRekeys */, []execinfrapb.TenantRekey{tenantRekey}, true /* restoreTenantFromStream */)
require.NoError(t, err)
streamValidator := newStreamClientValidator(rekeyer)
- validator := registerValidatorWithClient(streamValidator)
+
+ randomStreamClient.ClearInterceptors()
+ randomStreamClient.RegisterSSTableGenerator(func(keyValues []roachpb.KeyValue) roachpb.RangeFeedSSTable {
+ return sstMaker(t, keyValues)
+ })
+ randomStreamClient.RegisterInterception(cancelAfterCheckpoints)
+ randomStreamClient.RegisterInterception(validateFnWithValidator(t, streamValidator))
out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB,
- topo, startTime, []jobspb.ResolvedSpan{}, []streamclient.InterceptFn{cancelAfterCheckpoints, validator}, tenantRekey,
- streamClient, noCutover{}, nil /* streamingTestingKnobs*/)
+ topo, startTime, []jobspb.ResolvedSpan{}, tenantRekey,
+ randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/)
require.NoError(t, err)
partitionSpanToTableID := getPartitionSpanToTableID(t, topo)
@@ -537,14 +550,13 @@ func runStreamIngestionProcessor(
partitions streamclient.Topology,
startTime hlc.Timestamp,
checkpoint []jobspb.ResolvedSpan,
- interceptEvents []streamclient.InterceptFn,
tenantRekey execinfrapb.TenantRekey,
mockClient streamclient.Client,
cutoverProvider cutoverProvider,
streamingTestingKnobs *sql.StreamingTestingKnobs,
) (*distsqlutils.RowBuffer, error) {
sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB,
- partitions, startTime, checkpoint, interceptEvents, tenantRekey, mockClient, cutoverProvider, streamingTestingKnobs)
+ partitions, startTime, checkpoint, tenantRekey, mockClient, cutoverProvider, streamingTestingKnobs)
require.NoError(t, err)
sip.Run(ctx)
@@ -567,7 +579,6 @@ func getStreamIngestionProcessor(
partitions streamclient.Topology,
startTime hlc.Timestamp,
checkpoint []jobspb.ResolvedSpan,
- interceptEvents []streamclient.InterceptFn,
tenantRekey execinfrapb.TenantRekey,
mockClient streamclient.Client,
cutoverProvider cutoverProvider,
@@ -624,11 +635,6 @@ func getStreamIngestionProcessor(
sip.cutoverProvider = cutoverProvider
}
- if interceptable, ok := sip.forceClientForTests.(streamclient.InterceptableStreamClient); ok {
- for _, interceptor := range interceptEvents {
- interceptable.RegisterInterception(interceptor)
- }
- }
return sip, out, err
}
@@ -642,33 +648,51 @@ func resolvedSpansMinTS(resolvedSpans []jobspb.ResolvedSpan) hlc.Timestamp {
return minTS
}
-func registerValidatorWithClient(
- validator *streamClientValidator,
+func noteKeyVal(
+ validator *streamClientValidator, keyVal roachpb.KeyValue, spec streamclient.SubscriptionToken,
+) {
+ if validator.rekeyer != nil {
+ rekey, _, err := validator.rekeyer.RewriteKey(keyVal.Key)
+ if err != nil {
+ panic(err.Error())
+ }
+ keyVal.Key = rekey
+ keyVal.Value.ClearChecksum()
+ keyVal.Value.InitChecksum(keyVal.Key)
+ }
+ err := validator.noteRow(string(spec), string(keyVal.Key), string(keyVal.Value.RawBytes),
+ keyVal.Value.Timestamp)
+ if err != nil {
+ panic(err.Error())
+ }
+}
+
+func validateFnWithValidator(
+ t *testing.T, validator *streamClientValidator,
) func(event streamingccl.Event, spec streamclient.SubscriptionToken) {
return func(event streamingccl.Event, spec streamclient.SubscriptionToken) {
switch event.Type() {
case streamingccl.CheckpointEvent:
- resolvedTS := resolvedSpansMinTS(*event.GetResolvedSpans())
+ resolvedTS := resolvedSpansMinTS(event.GetResolvedSpans())
err := validator.noteResolved(string(spec), resolvedTS)
if err != nil {
panic(err.Error())
}
- case streamingccl.KVEvent:
- keyVal := *event.GetKV()
- if validator.rekeyer != nil {
- rekey, _, err := validator.rekeyer.RewriteKey(keyVal.Key)
- if err != nil {
- panic(err.Error())
- }
- keyVal.Key = rekey
- keyVal.Value.ClearChecksum()
- keyVal.Value.InitChecksum(keyVal.Key)
- }
- err := validator.noteRow(string(spec), string(keyVal.Key), string(keyVal.Value.RawBytes),
- keyVal.Value.Timestamp)
- if err != nil {
- panic(err.Error())
+ case streamingccl.SSTableEvent:
+ kvs := storageutils.ScanSST(t, event.GetSSTable().Data)
+ for _, keyVal := range kvs.MVCCKeyValues() {
+ noteKeyVal(validator, roachpb.KeyValue{
+ Key: keyVal.Key.Key,
+ Value: roachpb.Value{
+ RawBytes: keyVal.Value,
+ Timestamp: keyVal.Key.Timestamp,
+ },
+ }, spec)
}
+ case streamingccl.KVEvent:
+ noteKeyVal(validator, *event.GetKV(), spec)
+ case streamingccl.DeleteRangeEvent:
+ panic(errors.New("unsupported event type"))
}
}
}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
index ca6d32f799e0..b6a730d0fe4b 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
@@ -12,6 +12,7 @@ import (
"context"
gosql "database/sql"
"fmt"
+ "sort"
"testing"
"time"
@@ -24,12 +25,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
+ clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -64,6 +67,35 @@ func getTestRandomClientURI(tenantID int) string {
return makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, kvFrequency, dupProbability, tenantID)
}
+func sstMaker(t *testing.T, keyValues []roachpb.KeyValue) roachpb.RangeFeedSSTable {
+ sort.Slice(keyValues, func(i, j int) bool {
+ return keyValues[i].Key.Compare(keyValues[j].Key) < 0
+ })
+ batchTS := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+ kvs := make(storageutils.KVs, 0, len(keyValues))
+ for i, keyVal := range keyValues {
+ if i > 0 && keyVal.Key.Equal(keyValues[i-1].Key) {
+ continue
+ }
+ kvs = append(kvs, storage.MVCCKeyValue{
+ Key: storage.MVCCKey{
+ Key: keyVal.Key,
+ Timestamp: batchTS,
+ },
+ Value: keyVal.Value.RawBytes,
+ })
+ }
+ data, start, end := storageutils.MakeSST(t, clustersettings.MakeTestingClusterSettings(), kvs)
+ return roachpb.RangeFeedSSTable{
+ Data: data,
+ Span: roachpb.Span{
+ Key: start,
+ EndKey: end,
+ },
+ WriteTS: batchTS,
+ }
+}
+
// TestStreamIngestionJobWithRandomClient creates a stream ingestion job that is
// fed KVs from the random stream client. After receiving a certain number of
// resolved timestamp events the test completes the job to tear down the flow,
@@ -96,22 +128,17 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
}}, true /* restoreTenantFromStream */)
require.NoError(t, err)
streamValidator := newStreamClientValidator(rekeyer)
- registerValidator := registerValidatorWithClient(streamValidator)
client := streamclient.GetRandomStreamClientSingletonForTesting()
defer func() {
require.NoError(t, client.Close(ctx))
}()
- interceptEvents := []streamclient.InterceptFn{
- completeJobAfterCheckpoints,
- registerValidator,
- }
- if interceptable, ok := client.(streamclient.InterceptableStreamClient); ok {
- for _, interceptor := range interceptEvents {
- interceptable.RegisterInterception(interceptor)
- }
- } else {
- t.Fatal("expected the random stream client to be interceptable")
- }
+
+ client.ClearInterceptors()
+ client.RegisterInterception(completeJobAfterCheckpoints)
+ client.RegisterInterception(validateFnWithValidator(t, streamValidator))
+ client.RegisterSSTableGenerator(func(keyValues []roachpb.KeyValue) roachpb.RangeFeedSSTable {
+ return sstMaker(t, keyValues)
+ })
var receivedRevertRequest chan struct{}
var allowResponse chan struct{}
diff --git a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go
index 46bb3cb6212f..9c1d17da6808 100644
--- a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go
+++ b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go
@@ -65,7 +65,7 @@ func ResolvedAtLeast(lo hlc.Timestamp) FeedEventPredicate {
if msg.Type() != streamingccl.CheckpointEvent {
return false
}
- return lo.LessEq(minResolvedTimestamp(*msg.GetResolvedSpans()))
+ return lo.LessEq(minResolvedTimestamp(msg.GetResolvedSpans()))
}
}
@@ -114,7 +114,7 @@ func (rf *ReplicationFeed) ObserveResolved(ctx context.Context, lo hlc.Timestamp
require.NoError(rf.t, rf.consumeUntil(ctx, ResolvedAtLeast(lo), func(err error) bool {
return true
}))
- return minResolvedTimestamp(*rf.msg.GetResolvedSpans())
+ return minResolvedTimestamp(rf.msg.GetResolvedSpans())
}
// ObserveError consumes the feed until the feed is exhausted, and the final error should
diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job.go b/pkg/ccl/streamingccl/streamproducer/producer_job.go
index 7970f3fb6b1c..04091c7ba735 100644
--- a/pkg/ccl/streamingccl/streamproducer/producer_job.go
+++ b/pkg/ccl/streamingccl/streamproducer/producer_job.go
@@ -101,7 +101,6 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er
case jobspb.StreamReplicationProgress_FINISHED_SUCCESSFULLY:
return p.releaseProtectedTimestamp(ctx, execCfg)
case jobspb.StreamReplicationProgress_FINISHED_UNSUCCESSFULLY:
- fmt.Println("producer try update cancel requested")
return j.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
ju.UpdateStatus(jobs.StatusCancelRequested)
return nil
diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
index 2d064a854e16..8ce708f0f742 100644
--- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
+++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
@@ -10,7 +10,6 @@ package streamproducer
import (
"context"
- "fmt"
"time"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
@@ -268,7 +267,6 @@ func completeReplicationStream(
md.Progress.RunningStatus = "succeeding this producer job as the corresponding " +
"stream ingestion finished successfully"
} else {
- fmt.Println("producer update stream ingestion status")
md.Progress.GetStreamReplication().StreamIngestionStatus =
jobspb.StreamReplicationProgress_FINISHED_UNSUCCESSFULLY
md.Progress.RunningStatus = "canceling this producer job as the corresponding " +
diff --git a/pkg/cli/cliflags/flags.go b/pkg/cli/cliflags/flags.go
index d4283a200011..afdded387a64 100644
--- a/pkg/cli/cliflags/flags.go
+++ b/pkg/cli/cliflags/flags.go
@@ -938,6 +938,17 @@ memory that the store may consume, for example:
--store=type=mem,size=20GiB
--store=type=mem,size=90%
+
+Optionally, to configure admission control enforcement to prevent disk
+bandwidth saturation, the "provisioned-rate" field can be specified with
+the "disk-name" and an optional "bandwidth". The bandwidth is used to override
+the value of the cluster setting kv.store.admission.provisioned_bandwidth.
+For example:
+