Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…99239 #99263 #99278

98980: kvcoord: Add metric to keep track of restarted ranges in rangefeed r=miretskiy a=miretskiy

Add a `distsender.rangefeed.restart_ranges` metric to keep track of the number of ranges restarted due to transient error.

Epic: CRDB-25044
Release note: None

99069: storage/cloud: correct the flag name in implicit credentials error message r=rhu713 a=taroface

When `--external-io-disable-implicit-credentials` is set and the user issues a command with `AUTH=implicit`, the resulting error message has the wrong flag name (`disable` is left out). Searching for that flag name in the docs doesn't return any results. The flag name is corrected in this PR.

Release note: none
Release justification: CLI bug

99077: changefeedccl: Allow timeout override r=miretskiy a=miretskiy

Add timeout URL parameter for schema registry URIs. Prior to this change, all schema registry calls used default time out of 3 seconds.  This PR increases the timeout to 30 seconds, and allows timeout to be specified via `timeout=T` URL parameter.

Informs https://github.com/cockroachlabs/support/issues/2173

Release note (enterprise change): AVRO schema registry URI allow additional `timeout=T` query parameter to change the default timeout for contacting schema registry.

99141: storage: CheckSSTConflict fix for nexting over overlapping points r=jbowens a=itsbilal

Previously, the nexting logic around both iterators being at a range key and not a point key was flawed in that we'd miss ext points that were in between
the current and next sst keys, when we'd next both of them. This change addresses that.

It also addresses other miscellaneous corner cases around stats calculations with overlapping sst/engine range keys and point keys. All these bugs were found with the upcoming CheckSSTConflicts randomized test in #98408.

Epic: none

Release note: None

99146: opt: speed up lookup constraint builder r=mgartner a=mgartner

#### opt: add benchmark with many lookup joins

This commit adds an optimizer benchmark that explores many lookup joins.
It explores many potential lookup joins that do not ultimately get added
to the memo, as well as many lookup joins that do get added to the memo.

Release note: None

#### opt: split HasSingleColumnConstValues into two functions

This commit splits HasSingleColumnConstValues into two functions - one
that returns a boolean if a constraint set constrains a single column to
a set of constant, non-null values, and another function that returns
the constant values. The former is more efficient when the only the
boolean is needed.

Release note: None

#### opt: simplify lookup join constraint builder

This commit reduces computation and allocations when attempting to
build lookup join constraints by performing a simple column ID equality
before more complex computations and allocations.

Release note: None

#### opt: reduce allocations when building lookup join constraints

During the construction of lookup join constraints, two allocations of a
`opt.ColList` have been combined into a single allocation, and
allocation of a `memo.FiltersExpr` to store remaining filters is now
only performed if necessary.

Release note: None

These changes offer a nice speedup for the newly added benchmark:

```
name                         old time/op    new time/op    delta
SlowQueries/slow-query-1-10    15.8ms ± 1%    15.7ms ± 1%     ~     (p=0.690 n=5+5)
SlowQueries/slow-query-2-10     220ms ± 0%     219ms ± 0%     ~     (p=0.095 n=5+5)
SlowQueries/slow-query-3-10    63.0ms ± 1%    62.4ms ± 0%   -0.98%  (p=0.008 n=5+5)
SlowQueries/slow-query-4-10     1.70s ± 1%     1.38s ± 0%  -19.22%  (p=0.008 n=5+5)

name                         old alloc/op   new alloc/op   delta
SlowQueries/slow-query-1-10    7.04MB ± 0%    6.98MB ± 0%   -0.79%  (p=0.008 n=5+5)
SlowQueries/slow-query-2-10    48.7MB ± 0%    48.7MB ± 0%   -0.11%  (p=0.008 n=5+5)
SlowQueries/slow-query-3-10    45.1MB ± 0%    44.9MB ± 0%   -0.55%  (p=0.008 n=5+5)
SlowQueries/slow-query-4-10     878MB ± 0%     737MB ± 0%  -16.03%  (p=0.008 n=5+5)

name                         old allocs/op  new allocs/op  delta
SlowQueries/slow-query-1-10     76.1k ± 0%     75.8k ± 0%   -0.38%  (p=0.008 n=5+5)
SlowQueries/slow-query-2-10      401k ± 0%      400k ± 0%   -0.25%  (p=0.008 n=5+5)
SlowQueries/slow-query-3-10      390k ± 0%      389k ± 0%   -0.21%  (p=0.008 n=5+5)
SlowQueries/slow-query-4-10     18.2M ± 0%     17.4M ± 0%   -4.44%  (p=0.008 n=5+5)
```

Epic: None


99154: ui: stop polling in stmt fingerprint details page, change default sort on stmts r=maryliag a=xinhaoz

See individual commits.

https://www.loom.com/share/17569db4a0c04a968dabbc4421d429bf

99169: kv: unflake TestDelegateSnapshot r=kvoli a=andrewbaptist

Fixes: #96841
Fixes: #96525

Previously this test would assume that all snapshots came from the sending of snapshots through the AdminChangeReplicasRequest which end up as type OTHER. However occassionally we get a spurious raft snapshot which makes this test flaky. This change ignores any raft snapshots that are sent.

Epic: none
Release note: None

99172: upgrades: hardcode descriptors in system_rbr_indexes r=JeffSwenson a=JeffSwenson

Previously, if a change was made to the system.sql_instances, system.lease, or system.sqlliveness bootstrap schema, it would change the behavior of the upgrade attached to the V23_1_SystemRbrReadNew version gate.

Now, the content of the descriptors is hard coded in the upgrade so that the behavior is not accidentally changed in the future.

Fixes: #99074

Release note: None

99180: builtins: add builtin functions which cast to OID to the distSQL block list r=michae2,cucaroach a=msirek

Distributed SQL which executes functions or casts to OID rely on `planner` receiver functions to execute internal SQL to get information about the OID from system tables. If these casts occur on a remote processor, the `planner` is not accessible and a dummy planner is used, which does not implement these receiver functions. To prevent internal errors, these casts or problem functions are added to a distSQL block list by `distSQLExprCheckVisitor`. A cast to an OID can also be done via a builtin function of the same name as the target type, e.g. `regproc`. These builtins do not currently have `DistsqlBlocklist` set, allowing distributed execution.

The solution is to mark `DistsqlBlocklist` as true for any builtin function which casts to an OID type.

Fixes #98373

Release note: None

99239: appstats: fix percentile greater than max latency r=maryliag a=maryliag

Part Of #99070
When an execution happens, its latency is added to a stream and then ordered so percentiles can be queried.
When getting the percentile values, we don't have the timestamp of when each value was added, meaning when we query the stream we could be getting values from a previous aggregation timestamp, if the current windows has very few executions (the stream has a limit, so we only have the most recent execution, but if the statement is not run frequently this stream can have old data).

The way this information is stored will need to be changed, but for now a patchy solution was added so we don't have the case where we show percentiles greater than the actual max.

Release note (bug fix): Add a check so percentiles are never greater than the max latency value.

99263: roachtest: copyfrom fix cluster package install r=aliher1911 a=aliher1911

When installing packages, look on cluster remoteness as proxy for arch instead of roachtest runtime which is runs different arch.

Epic: none

Release note: None

99278: sql: fix update helper optional from clause r=rytaft a=lyang24

fixes #98662

sanity testing:
output
<img width="265" alt="Screen Shot 2023-03-22 at 1 01 10 PM" src="https://user-images.githubusercontent.com/20375035/227027849-34f34bb4-d52b-4de4-8a5b-456ee8b27f1b.png">
sample sql
<img width="874" alt="Screen Shot 2023-03-22 at 1 10 15 PM" src="https://user-images.githubusercontent.com/20375035/227027874-3f6515f4-fdbe-4c64-9d6f-03eb9b5c67f3.png">


Release note (sql change): fix helper message on update sql to correctly position the optional from cause.

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Ryan Kuo <[email protected]>
Co-authored-by: Bilal Akhtar <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Xin Hao Zhang <[email protected]>
Co-authored-by: Andrew Baptist <[email protected]>
Co-authored-by: Jeff <[email protected]>
Co-authored-by: Mark Sirek <[email protected]>
Co-authored-by: maryliag <[email protected]>
Co-authored-by: Oleg Afanasyev <[email protected]>
Co-authored-by: Eric.Yang <[email protected]>
  • Loading branch information
12 people committed Mar 22, 2023
13 parents c2460f1 + c9e4529 + 1b8d40f + 6ca38f2 + 82267b1 + 37901ef + d8f099b + 8316ddc + b292729 + b837c30 + 7ab6c32 + 7040439 + 26c719d commit 5fc479d
Show file tree
Hide file tree
Showing 38 changed files with 810 additions and 242 deletions.
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type AggMetrics struct {
RunningCount *aggmetric.AggGauge
BatchReductionCount *aggmetric.AggGauge
InternalRetryMessageCount *aggmetric.AggGauge
SchemaRegistrations *aggmetric.AggCounter
SchemaRegistryRetries *aggmetric.AggCounter

// There is always at least 1 sliMetrics created for defaultSLI scope.
Expand Down Expand Up @@ -118,6 +119,7 @@ type sliMetrics struct {
RunningCount *aggmetric.Gauge
BatchReductionCount *aggmetric.Gauge
InternalRetryMessageCount *aggmetric.Gauge
SchemaRegistrations *aggmetric.Counter
SchemaRegistryRetries *aggmetric.Counter
}

Expand Down Expand Up @@ -502,6 +504,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Retries",
Unit: metric.Unit_COUNT,
}
metaSchemaRegistryRegistrations := metric.Metadata{
Name: "changefeed.schema_registry.registrations",
Help: "Number of registration attempts with the schema registry",
Measurement: "Registrations",
Unit: metric.Unit_COUNT,
}
// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
Expand Down Expand Up @@ -555,6 +563,7 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
BatchReductionCount: b.Gauge(metaBatchReductionCount),
InternalRetryMessageCount: b.Gauge(metaInternalRetryMessageCount),
SchemaRegistryRetries: b.Counter(metaSchemaRegistryRetriesCount),
SchemaRegistrations: b.Counter(metaSchemaRegistryRegistrations),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
Expand Down Expand Up @@ -611,6 +620,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
BatchReductionCount: a.BatchReductionCount.AddChild(scope),
InternalRetryMessageCount: a.InternalRetryMessageCount.AddChild(scope),
SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope),
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
}

a.mu.sliMetrics[scope] = sm
Expand Down
47 changes: 36 additions & 11 deletions pkg/ccl/changefeedccl/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"io"
"net/url"
"path"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
Expand Down Expand Up @@ -60,23 +61,29 @@ type confluentSchemaRegistry struct {

var _ schemaRegistry = (*confluentSchemaRegistry)(nil)

type schemaRegistryParams map[string][]byte
type schemaRegistryParams struct {
params map[string][]byte
timeout time.Duration
}

func (s schemaRegistryParams) caCert() []byte {
return s[changefeedbase.RegistryParamCACert]
return s.params[changefeedbase.RegistryParamCACert]
}

func (s schemaRegistryParams) clientCert() []byte {
return s[changefeedbase.RegistryParamClientCert]
return s.params[changefeedbase.RegistryParamClientCert]
}

func (s schemaRegistryParams) clientKey() []byte {
return s[changefeedbase.RegistryParamClientKey]
return s.params[changefeedbase.RegistryParamClientKey]
}

func getAndDeleteParams(u *url.URL) (schemaRegistryParams, error) {
const timeoutParam = "timeout"
const defaultSchemaRegistryTimeout = 30 * time.Second

func getAndDeleteParams(u *url.URL) (*schemaRegistryParams, error) {
query := u.Query()
s := make(schemaRegistryParams, 3)
s := schemaRegistryParams{params: make(map[string][]byte, 3)}
for _, k := range []string{
changefeedbase.RegistryParamCACert,
changefeedbase.RegistryParamClientCert,
Expand All @@ -87,14 +94,26 @@ func getAndDeleteParams(u *url.URL) (schemaRegistryParams, error) {
if err != nil {
return nil, errors.Wrapf(err, "param %s must be base 64 encoded", k)
}
s[k] = decoded
s.params[k] = decoded
query.Del(k)
}
}

if strTimeout := query.Get(timeoutParam); strTimeout != "" {
dur, err := time.ParseDuration(strTimeout)
if err != nil {
return nil, err
}
s.timeout = dur
} else {
// Default timeout in httputil is way too low. Use something more reasonable.
s.timeout = defaultSchemaRegistryTimeout
}

// remove crdb query params to ensure compatibility with schema
// registry implementation
u.RawQuery = query.Encode()
return s, nil
return &s, nil
}

func newConfluentSchemaRegistry(
Expand Down Expand Up @@ -140,14 +159,17 @@ func newConfluentSchemaRegistry(
// Setup the httputil.Client to use when dialing Confluent schema registry. If `ca_cert`
// is set as a query param in the registry URL, client should trust the corresponding
// cert while dialing. Otherwise, use the DefaultClient.
func setupHTTPClient(baseURL *url.URL, s schemaRegistryParams) (*httputil.Client, error) {
if len(s) == 0 {
return httputil.DefaultClient, nil
func setupHTTPClient(baseURL *url.URL, s *schemaRegistryParams) (*httputil.Client, error) {
if len(s.params) == 0 {
return httputil.NewClientWithTimeout(s.timeout), nil
}

httpClient, err := newClientFromTLSKeyPair(s.caCert(), s.clientCert(), s.clientKey())
if err != nil {
return nil, err
}
httpClient.Timeout = s.timeout

if baseURL.Scheme == "http" {
log.Warningf(context.Background(), "TLS configuration provided but schema registry %s uses HTTP", baseURL)
}
Expand Down Expand Up @@ -217,6 +239,9 @@ func (r *confluentSchemaRegistry) RegisterSchemaForSubject(
if err != nil {
return 0, err
}
if r.sliMetrics != nil {
r.sliMetrics.SchemaRegistrations.Inc(1)
}
return id, nil
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/ccl/changefeedccl/schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ package changefeedccl
import (
"context"
"errors"
"net/url"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
Expand All @@ -34,6 +36,24 @@ func TestConfluentSchemaRegistry(t *testing.T) {
_, err := newConfluentSchemaRegistry(url, nil, nil)
require.Error(t, err)
})

t.Run("configure timeout", func(t *testing.T) {
regServer := cdctest.StartTestSchemaRegistry()
defer regServer.Close()
r, err := newConfluentSchemaRegistry(regServer.URL(), nil, nil)
require.NoError(t, err)
require.Equal(t, defaultSchemaRegistryTimeout, r.client.Timeout)

// add explicit timeout param.
u, err := url.Parse(regServer.URL())
require.NoError(t, err)
values := u.Query()
values.Set(timeoutParam, "42ms")
u.RawQuery = values.Encode()
r, err = newConfluentSchemaRegistry(u.String(), nil, nil)
require.NoError(t, err)
require.Equal(t, 42*time.Millisecond, r.client.Timeout)
})
}

type mockExternalConnectionProvider map[string]string
Expand Down Expand Up @@ -122,6 +142,7 @@ func TestConfluentSchemaRegistryRetryMetrics(t *testing.T) {
}
return nil
})
require.EqualValues(t, 0, sliMetrics.SchemaRegistrations.Value())
cancel()
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/amazon/aws_kms.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func MakeAWSKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS, e
case cloud.AuthParamImplicit:
if env.KMSConfig().DisableImplicitCredentials {
return nil, errors.New(
"implicit credentials disallowed for s3 due to --external-io-implicit-credentials flag")
"implicit credentials disallowed for s3 due to --external-io-disable-implicit-credentials flag")
}
opts.SharedConfigState = session.SharedConfigEnable
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/amazon/s3_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func MakeS3Storage(
case cloud.AuthParamImplicit:
if args.IOConf.DisableImplicitCredentials {
return nil, errors.New(
"implicit credentials disallowed for s3 due to --external-io-implicit-credentials flag")
"implicit credentials disallowed for s3 due to --external-io-disable-implicit-credentials flag")
}
default:
return nil, errors.Errorf("unsupported value %s for %s", conf.Auth, cloud.AuthParam)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/azure/azure_kms.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func MakeAzureKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS,
case cloudpb.AzureAuth_IMPLICIT:
if env.KMSConfig().DisableImplicitCredentials {
return nil, errors.New(
"implicit credentials disallowed for azure due to --external-io-implicit-credentials flag")
"implicit credentials disallowed for azure due to --external-io-disable-implicit-credentials flag")
}
// The Default credential supports env vars and managed identity magic.
// We rely on the former for testing and the latter in prod.
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func makeAzureStorage(
case cloudpb.AzureAuth_IMPLICIT:
if args.IOConf.DisableImplicitCredentials {
return nil, errors.New(
"implicit credentials disallowed for azure due to --external-io-implicit-credentials flag")
"implicit credentials disallowed for azure due to --external-io-disable-implicit-credentials flag")
}
// The Default credential supports env vars and managed identity magic.
// We rely on the former for testing and the latter in prod.
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/gcp/gcp_kms.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func MakeGCSKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS, e
case cloud.AuthParamImplicit:
if env.KMSConfig().DisableImplicitCredentials {
return nil, errors.New(
"implicit credentials disallowed for gcs due to --external-io-implicit-credentials flag")
"implicit credentials disallowed for gcs due to --external-io-disable-implicit-credentials flag")
}
// If implicit credentials used, no client options needed.
default:
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/gcp/gcp_kms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,6 @@ func TestGCSKMSDisallowImplicitCredentials(t *testing.T) {
Settings: gcpKMSTestSettings,
ExternalIOConfig: &base.ExternalIODirConfig{DisableImplicitCredentials: true}})
require.True(t, testutils.IsError(err,
"implicit credentials disallowed for gcs due to --external-io-implicit-credentials flag"),
"implicit credentials disallowed for gcs due to --external-io-disable-implicit-credentials flag"),
)
}
19 changes: 4 additions & 15 deletions pkg/cmd/roachtest/tests/copyfrom.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package tests
import (
"context"
"fmt"
"runtime"
"strings"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
Expand Down Expand Up @@ -60,22 +59,12 @@ CREATE INDEX l_sk_pk ON lineitem (l_suppkey, l_partkey);
`

func initTest(ctx context.Context, t test.Test, c cluster.Cluster, sf int) {
if runtime.GOOS == "linux" {
if err := repeatRunE(
ctx, t, c, c.All(), "update apt-get", `sudo apt-get -qq update`,
); err != nil {
t.Fatal(err)
}
if err := repeatRunE(
ctx,
t,
c,
c.All(),
"install dependencies",
`sudo apt-get install -qq postgresql`,
); err != nil {
if !c.IsLocal() {
if err := c.Install(ctx, t.L(), c.All(), "postgresql"); err != nil {
t.Fatal(err)
}
} else {
t.L().Printf("when running locally, ensure that psql is installed")
}
csv := fmt.Sprintf(tpchLineitemFmt, sf)
c.Run(ctx, c.Node(1), "rm -f /tmp/lineitem-table.csv")
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ This counts the number of ranges with an active rangefeed that are performing ca
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedRestartRanges = metric.Metadata{
Name: "distsender.rangefeed.restart_ranges",
Help: `Number of ranges that were restarted due to transient errors`,
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedRestartStuck = metric.Metadata{
Name: "distsender.rangefeed.restart_stuck",
Help: `Number of times a rangefeed was restarted due to not receiving ` +
Expand Down Expand Up @@ -239,6 +245,7 @@ type DistSenderMetrics struct {
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
Expand All @@ -260,6 +267,7 @@ func makeDistSenderMetrics() DistSenderMetrics {
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
}
for i := range m.MethodCounts {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,8 @@ func (m *rangefeedMuxer) receiveEventsFromNode(
func (m *rangefeedMuxer) restartActiveRangeFeed(
ctx context.Context, active *activeMuxRangeFeed, reason error,
) error {
m.ds.metrics.RangefeedRestartRanges.Inc(1)

if log.V(1) {
log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v",
active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), reason)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ func (ds *DistSender) partialRangeFeed(
if err != nil {
return err
}
ds.metrics.RangefeedRestartRanges.Inc(1)
if errInfo.evict {
token.Evict(ctx)
token = rangecache.EvictionToken{}
Expand Down
47 changes: 46 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestEvalAddSSTable(t *testing.T) {
noShadowBelow int64 // DisallowShadowingBelow
requireReqTS bool // AddSSTableRequireAtRequestTimestamp
expect kvs
ignoreExpect bool
expectErr interface{} // error type, substring, substring slice, or true (any)
expectErrRace interface{}
expectStatsEst bool // expect MVCCStats.ContainsEstimates, don't check stats
Expand Down Expand Up @@ -1040,6 +1041,48 @@ func TestEvalAddSSTable(t *testing.T) {
sst: kvs{rangeKV("a", "b", 7, "")},
expectErr: &kvpb.WriteTooOldError{},
},
"DisallowConflict allows overlapping sst range tombstones": {
noConflict: true,
data: kvs{pointKV("ib", 6, "foo"), pointKV("if", 6, "foo"), pointKV("it", 6, "foo"), rangeKV("i", "j", 5, "")},
sst: kvs{rangeKV("ia", "irc", 8, ""), rangeKV("ie", "iu", 9, ""), pointKV("ic", 7, "foo"), pointKV("iq", 8, "foo")},
ignoreExpect: true,
},
"DisallowConflict does not miss deleted ext keys": {
noConflict: true,
data: kvs{pointKV("c", 6, "foo"), pointKV("d", 6, "foo"), pointKV("e", 6, "foo"), rangeKV("bb", "j", 5, "")},
sst: kvs{rangeKV("b", "k", 8, ""), pointKV("cc", 9, "foo"), pointKV("dd", 7, "foo"), pointKV("ee", 7, "foo")},
ignoreExpect: true,
},
"DisallowConflict does not miss deleted ext keys 2": {
noConflict: true,
data: kvs{pointKV("kr", 7, "foo"), pointKV("krj", 7, "foo"), pointKV("ksq", 7, "foo"), pointKV("ku", 6, "foo")},
sst: kvs{rangeKV("ke", "l", 11, ""), pointKV("kr", 8, "bar"), pointKV("ksxk", 9, "bar")},
ignoreExpect: true,
},
"DisallowConflict does not miss deleted ext keys 3": {
noConflict: true,
data: kvs{pointKV("xe", 5, "foo"), pointKV("xg", 6, "foo"), pointKV("xh", 7, "foo"), rangeKV("xf", "xk", 5, "")},
sst: kvs{pointKV("xeqn", 10, "foo"), pointKV("xh", 12, "foo"), rangeKV("x", "xp", 11, "")},
ignoreExpect: true,
},
"DisallowConflict does not miss deleted ext keys 4": {
noConflict: true,
data: kvs{pointKV("xh", 7, "foo")},
sst: kvs{pointKV("xh", 12, "foo"), rangeKV("x", "xp", 11, "")},
ignoreExpect: true,
},
"DisallowConflict does not repeatedly count ext value deleted by ext range": {
noConflict: true,
data: kvs{rangeKV("bf", "bjs", 7, ""), pointKV("bbeg", 6, "foo"), pointKV("bf", 6, "foo"), pointKV("bl", 6, "foo")},
sst: kvs{pointKV("bbtq", 11, "foo"), pointKV("bbw", 11, "foo"), pointKV("bc", 11, "foo"), pointKV("bl", 12, "foo")},
ignoreExpect: true,
},
"DisallowConflict does not miss sst range keys after overlapping point": {
noConflict: true,
data: kvs{pointKV("oe", 8, "foo"), pointKV("oi", 8, "foo"), rangeKV("o", "omk", 7, ""), pointKV("od", 6, "foo")},
sst: kvs{pointKV("oe", 11, "foo"), pointKV("oih", 12, "foo"), rangeKV("ods", "ogvh", 10, ""), rangeKV("ogvh", "ohl", 10, ""), rangeKV("ogvh", "ohl", 9, "")},
ignoreExpect: true,
},
}
testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) {
testutils.RunValues(t, "RewriteConcurrency", []interface{}{0, 8}, func(t *testing.T, c interface{}) {
Expand Down Expand Up @@ -1198,7 +1241,9 @@ func TestEvalAddSSTable(t *testing.T) {
}

// Scan resulting data from engine.
require.Equal(t, expect, storageutils.ScanEngine(t, engine))
if !tc.ignoreExpect {
require.Equal(t, expect, storageutils.ScanEngine(t, engine))
}

// Check that stats were updated correctly.
if tc.expectStatsEst {
Expand Down
Loading

0 comments on commit 5fc479d

Please sign in to comment.