From 0f424ea828b7adb7e687868f0e3564eab017f4ab Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 23 Mar 2021 22:43:23 +0100 Subject: [PATCH 01/11] utilccl,kvccl: add IsEnterpriseEnabled for faster license checks `utilccl.CheckEnterpriseEnabled()` is used to check whether a valid enterprise license exists for a given feature. If no valid license is found, it returns an error with specific details. However, `kvccl` used this function in follower read hot paths, and instantiating an error when follower reads are unavailable could have significant overhead -- see e.g. #62447. This patch adds `IsEnterpriseEnabled()`, which has the same behavior as `CheckEnterpriseEnabled()` but returns a boolean instead. This is significantly faster since we can avoid instantiating a custom error each time. `kvccl` is also updated to use this in hot paths. Release note: None --- .../kvccl/kvfollowerreadsccl/followerreads.go | 12 ++++- pkg/ccl/utilccl/license_check.go | 49 ++++++++++++++++--- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go index 636abd8ae5d2..899679349e22 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go @@ -77,16 +77,26 @@ func getGlobalReadsLead(clock *hlc.Clock) time.Duration { return clock.MaxOffset() } +// checkEnterpriseEnabled checks whether the enterprise feature for follower +// reads is enabled, returning a detailed error if not. It is not suitable for +// use in hot paths since a new error may be instantiated on each call. func checkEnterpriseEnabled(clusterID uuid.UUID, st *cluster.Settings) error { org := sql.ClusterOrganization.Get(&st.SV) return utilccl.CheckEnterpriseEnabled(st, clusterID, org, "follower reads") } +// isEnterpriseEnabled is faster than checkEnterpriseEnabled, and suitable +// for hot paths. +func isEnterpriseEnabled(clusterID uuid.UUID, st *cluster.Settings) bool { + org := sql.ClusterOrganization.Get(&st.SV) + return utilccl.IsEnterpriseEnabled(st, clusterID, org, "follower reads") +} + func checkFollowerReadsEnabled(clusterID uuid.UUID, st *cluster.Settings) bool { if !kvserver.FollowerReadsEnabled.Get(&st.SV) { return false } - return checkEnterpriseEnabled(clusterID, st) == nil + return isEnterpriseEnabled(clusterID, st) } func evalFollowerReadOffset(clusterID uuid.UUID, st *cluster.Settings) (time.Duration, error) { diff --git a/pkg/ccl/utilccl/license_check.go b/pkg/ccl/utilccl/license_check.go index b3e27afd149c..d2c494f7ac34 100644 --- a/pkg/ccl/utilccl/license_check.go +++ b/pkg/ccl/utilccl/license_check.go @@ -57,6 +57,11 @@ const ( testingEnterpriseEnabled = 1 ) +// errEnterpriseRequired is returned by check() when the caller does +// not request detailed errors. +var errEnterpriseRequired = pgerror.New(pgcode.CCLValidLicenseRequired, + "a valid enterprise license is required") + // TestingEnableEnterprise allows overriding the license check in tests. func TestingEnableEnterprise() func() { before := atomic.LoadInt32(&testingEnterprise) @@ -78,11 +83,21 @@ func TestingDisableEnterprise() func() { // CheckEnterpriseEnabled returns a non-nil error if the requested enterprise // feature is not enabled, including information or a link explaining how to // enable it. +// +// This should not be used in hot paths, since an unavailable feature will +// result in a new error being instantiated for every call -- use +// IsEnterpriseEnabled() instead. 
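+//
+// A minimal sketch of a typical call site (the variables and the "backup"
+// feature name here are illustrative, not part of this change):
+//
+//	if err := CheckEnterpriseEnabled(st, clusterID, org, "backup"); err != nil {
+//		return err
+//	}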
func CheckEnterpriseEnabled(st *cluster.Settings, cluster uuid.UUID, org, feature string) error { - if atomic.LoadInt32(&testingEnterprise) == testingEnterpriseEnabled { - return nil - } - return checkEnterpriseEnabledAt(st, timeutil.Now(), cluster, org, feature) + return checkEnterpriseEnabledAt(st, timeutil.Now(), cluster, org, feature, true /* withDetails */) +} + +// IsEnterpriseEnabled returns whether the requested enterprise feature is +// enabled. It is faster than CheckEnterpriseEnabled, since it does not return +// details about why the feature is unavailable, and can therefore be used in +// hot paths. +func IsEnterpriseEnabled(st *cluster.Settings, cluster uuid.UUID, org, feature string) bool { + return checkEnterpriseEnabledAt( + st, timeutil.Now(), cluster, org, feature, false /* withDetails */) == nil } func init() { @@ -114,8 +129,11 @@ func TimeToEnterpriseLicenseExpiry( } func checkEnterpriseEnabledAt( - st *cluster.Settings, at time.Time, cluster uuid.UUID, org, feature string, + st *cluster.Settings, at time.Time, cluster uuid.UUID, org, feature string, withDetails bool, ) error { + if atomic.LoadInt32(&testingEnterprise) == testingEnterpriseEnabled { + return nil + } var lic *licenseccl.License // FIXME(tschottdorf): see whether it makes sense to cache the decoded // license. @@ -125,7 +143,7 @@ func checkEnterpriseEnabledAt( return err } } - return check(lic, at, cluster, org, feature) + return check(lic, at, cluster, org, feature, withDetails) } func getLicenseType(st *cluster.Settings) (string, error) { @@ -149,9 +167,15 @@ func decode(s string) (*licenseccl.License, error) { return lic, err } -// check returns an error if the license is empty or not currently valid. -func check(l *licenseccl.License, at time.Time, cluster uuid.UUID, org, feature string) error { +// check returns an error if the license is empty or not currently valid. If +// withDetails is false, a generic error message is returned for performance. +func check( + l *licenseccl.License, at time.Time, cluster uuid.UUID, org, feature string, withDetails bool, +) error { if l == nil { + if !withDetails { + return errEnterpriseRequired + } // TODO(dt): link to some stable URL that then redirects to a helpful page // that explains what to do here. link := "https://cockroachlabs.com/pricing?cluster=" @@ -168,6 +192,9 @@ func check(l *licenseccl.License, at time.Time, cluster uuid.UUID, org, feature // suddenly throwing errors at them. if l.ValidUntilUnixSec > 0 && l.Type != licenseccl.License_Enterprise { if expiration := timeutil.Unix(l.ValidUntilUnixSec, 0); at.After(expiration) { + if !withDetails { + return errEnterpriseRequired + } licensePrefix := redact.SafeString("") switch l.Type { case licenseccl.License_NonCommercial: @@ -190,6 +217,9 @@ func check(l *licenseccl.License, at time.Time, cluster uuid.UUID, org, feature if strings.EqualFold(l.OrganizationName, org) { return nil } + if !withDetails { + return errEnterpriseRequired + } return pgerror.Newf(pgcode.CCLValidLicenseRequired, "license valid only for %q", l.OrganizationName) } @@ -201,6 +231,9 @@ func check(l *licenseccl.License, at time.Time, cluster uuid.UUID, org, feature } // no match, so compose an error message. 
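+	// Hot-path callers only need a yes/no answer; when details were not
+	// requested, return the shared sentinel error rather than composing the
+	// message below.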
+	if !withDetails {
+		return errEnterpriseRequired
+	}
 	var matches bytes.Buffer
 	for i, c := range l.ClusterID {
 		if i > 0 {

From a7c1f374ffbf9dd552382aaa2e495b24bb34d645 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Tue, 23 Mar 2021 23:11:49 +0100
Subject: [PATCH 02/11] utilccl: cache license decoding

Previously, the `utilccl` package would decode the license from the
base64-encoded Protobuf representation in settings every time it was
needed, which was sufficient for its uses.

However, recently there's been a need to check whether enterprise
features are enabled in hot paths (e.g. with follower reads as seen in
#62447), making the decoding cost too great.

This patch adds `cluster.Settings.Cache` as a shared cache, and uses it
to cache decoded licenses with a private key type.

Release note: None
---
 pkg/ccl/utilccl/license_check.go         | 60 ++++++++++++++----------
 pkg/ccl/utilccl/license_check_test.go    |  2 +-
 pkg/ccl/utilccl/license_test.go          |  4 +-
 pkg/settings/cluster/cluster_settings.go |  5 ++
 4 files changed, 43 insertions(+), 28 deletions(-)

diff --git a/pkg/ccl/utilccl/license_check.go b/pkg/ccl/utilccl/license_check.go
index d2c494f7ac34..8d4e101367f8 100644
--- a/pkg/ccl/utilccl/license_check.go
+++ b/pkg/ccl/utilccl/license_check.go
@@ -62,6 +62,10 @@ const (
 var errEnterpriseRequired = pgerror.New(pgcode.CCLValidLicenseRequired,
 	"a valid enterprise license is required")
 
+// licenseCacheKey is used to cache licenses in cluster.Settings.Cache,
+// keeping the entries private.
+type licenseCacheKey string
+
 // TestingEnableEnterprise allows overriding the license check in tests.
 func TestingEnableEnterprise() func() {
 	before := atomic.LoadInt32(&testingEnterprise)
@@ -112,19 +116,12 @@ func init() {
 func TimeToEnterpriseLicenseExpiry(
 	ctx context.Context, st *cluster.Settings, asOf time.Time,
 ) (time.Duration, error) {
-	var lic *licenseccl.License
-	// FIXME(tschottdorf): see whether it makes sense to cache the decoded
-	// license.
-	if str := enterpriseLicense.Get(&st.SV); str != "" {
-		var err error
-		if lic, err = decode(str); err != nil {
-			return 0, err
-		}
-	} else {
-		return 0, nil
+	license, err := getLicense(st)
+	if err != nil || license == nil {
+		return 0, err
 	}
 
-	expiration := timeutil.Unix(lic.ValidUntilUnixSec, 0)
+	expiration := timeutil.Unix(license.ValidUntilUnixSec, 0)
 	return expiration.Sub(asOf), nil
 }
 
@@ -134,28 +131,41 @@ func checkEnterpriseEnabledAt(
 	if atomic.LoadInt32(&testingEnterprise) == testingEnterpriseEnabled {
 		return nil
 	}
-	var lic *licenseccl.License
-	// FIXME(tschottdorf): see whether it makes sense to cache the decoded
-	// license.
-	if str := enterpriseLicense.Get(&st.SV); str != "" {
-		var err error
-		if lic, err = decode(str); err != nil {
-			return err
-		}
+	license, err := getLicense(st)
+	if err != nil {
+		return err
 	}
-	return check(lic, at, cluster, org, feature, withDetails)
+	return check(license, at, cluster, org, feature, withDetails)
 }
 
-func getLicenseType(st *cluster.Settings) (string, error) {
+// getLicense fetches the license from the given settings, using Settings.Cache
+// to cache the decoded license (if any). The returned license must not be
+// modified by the caller.
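+//
+// The cache is keyed on the raw base64 setting string, so updating the
+// license setting naturally results in a fresh decode and a new cache entry.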
+func getLicense(st *cluster.Settings) (*licenseccl.License, error) {
 	str := enterpriseLicense.Get(&st.SV)
 	if str == "" {
-		return "None", nil
+		return nil, nil
 	}
-	lic, err := decode(str)
+	cacheKey := licenseCacheKey(str)
+	if cachedLicense, ok := st.Cache.Load(cacheKey); ok {
+		return cachedLicense.(*licenseccl.License), nil
+	}
+	license, err := decode(str)
+	if err != nil {
+		return nil, err
+	}
+	st.Cache.Store(cacheKey, license)
+	return license, nil
+}
+
+func getLicenseType(st *cluster.Settings) (string, error) {
+	license, err := getLicense(st)
 	if err != nil {
 		return "", err
+	} else if license == nil {
+		return "None", nil
 	}
-	return lic.Type.String(), nil
+	return license.Type.String(), nil
 }
 
 // decode attempts to read a base64 encoded License.
@@ -164,7 +174,7 @@ func decode(s string) (*licenseccl.License, error) {
 	if err != nil {
 		return nil, pgerror.WithCandidateCode(err, pgcode.Syntax)
 	}
-	return lic, err
+	return lic, nil
 }
 
 // check returns an error if the license is empty or not currently valid. If
diff --git a/pkg/ccl/utilccl/license_check_test.go b/pkg/ccl/utilccl/license_check_test.go
index 206b6e6f250b..055bfe8f8227 100644
--- a/pkg/ccl/utilccl/license_check_test.go
+++ b/pkg/ccl/utilccl/license_check_test.go
@@ -63,7 +63,7 @@ func TestSettingAndCheckingLicense(t *testing.T) {
 		if err := updater.Set("enterprise.license", tc.lic, "s"); err != nil {
 			t.Fatal(err)
 		}
-		err := checkEnterpriseEnabledAt(st, tc.checkTime, tc.checkCluster, "", "")
+		err := checkEnterpriseEnabledAt(st, tc.checkTime, tc.checkCluster, "", "", true)
 		if !testutils.IsError(err, tc.err) {
 			l, _ := decode(tc.lic)
 			t.Fatalf("%d: lic %v, update by %T, checked by %s at %s, got %q", i, l, updater, tc.checkCluster, tc.checkTime, err)
diff --git a/pkg/ccl/utilccl/license_test.go b/pkg/ccl/utilccl/license_test.go
index 23f9441dd1d6..adc064a55d2b 100644
--- a/pkg/ccl/utilccl/license_test.go
+++ b/pkg/ccl/utilccl/license_test.go
@@ -83,7 +83,7 @@ func TestLicense(t *testing.T) {
 			}
 		}
 		if err := check(
-			lic, tc.checkTime, tc.checkCluster, tc.checkOrg, "",
+			lic, tc.checkTime, tc.checkCluster, tc.checkOrg, "", true,
 		); !testutils.IsError(err, tc.err) {
 			t.Fatalf("%d: lic for %s to %s, checked by %s at %s.\n got %q",
 				i, tc.grantedTo, tc.expiration, tc.checkCluster, tc.checkTime, err)
@@ -108,7 +108,7 @@ func TestExpiredLicenseLanguage(t *testing.T) {
 		Type:              licenseccl.License_Evaluation,
 		ValidUntilUnixSec: 1,
 	}
-	err := check(lic, timeutil.Now(), uuid.MakeV4(), "", "RESTORE")
+	err := check(lic, timeutil.Now(), uuid.MakeV4(), "", "RESTORE", true)
 	expected := "Use of RESTORE requires an enterprise license. Your evaluation license expired on " +
 		"January 1, 1970. If you're interested in getting a new license, please contact " +
 		"subscriptions@cockroachlabs.com and we can help you out."
diff --git a/pkg/settings/cluster/cluster_settings.go b/pkg/settings/cluster/cluster_settings.go
index aa0cd7e7a238..13291e15d4d6 100644
--- a/pkg/settings/cluster/cluster_settings.go
+++ b/pkg/settings/cluster/cluster_settings.go
@@ -12,6 +12,7 @@ package cluster
 
 import (
 	"context"
+	"sync"
 	"sync/atomic"
 
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
@@ -51,6 +52,10 @@ type Settings struct {
 	// Setting the active cluster version has a very specific, intended usage
 	// pattern. Look towards the interface itself for more commentary.
 	Version clusterversion.Handle
+
+	// Cache can be used for arbitrary caching, e.g. to cache decoded
+	// enterprise licenses for utilccl.CheckEnterpriseEnabled().
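+	//
+	// The zero value of sync.Map is ready to use and is safe for concurrent
+	// access without additional locking. Entries are never evicted, so the
+	// cache should only hold small, bounded state.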
+ Cache sync.Map } // TelemetryOptOut is a place for controlling whether to opt out of telemetry or not. From 5d1dbb5e67233bc07d730e1838ac80940424be37 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Wed, 31 Mar 2021 12:32:19 +0100 Subject: [PATCH 03/11] testutils: add skip.UnderBazel and skip.UnderBazelWithIssue This is to skip individual tests under bazel. This seems a bit more fine-grained than the broken_in_bazel tag in the bazel configuration but also allows us to avoid skipping tests that work outside of bazel in our main test suite. Release note: None --- pkg/cmd/skip-test/main.go | 22 +++++++++++++++++++--- pkg/testutils/skip/BUILD.bazel | 1 + pkg/testutils/skip/skip.go | 12 ++++++++++++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pkg/cmd/skip-test/main.go b/pkg/cmd/skip-test/main.go index 6b45f65c2d2e..bbc33f043654 100644 --- a/pkg/cmd/skip-test/main.go +++ b/pkg/cmd/skip-test/main.go @@ -45,6 +45,11 @@ var flagUnderRace = flag.Bool( false, "if true, only skip under race", ) +var flagUnderBazel = flag.Bool( + "under_bazel", + false, + "if true, only skip under bazel", +) func main() { flag.Parse() @@ -81,6 +86,10 @@ func main() { log.Fatalf("expected test to be of format `TestName` or `pkg/to/test:TestName`, found %s", arg) } + if *flagUnderBazel && *flagUnderRace { + log.Fatal("cannot use both -under_race and -under_bazel") + } + // Check git status is clean. if err := spawn("git", "diff", "--exit-code"); err != nil { log.Fatal(errors.Wrap(err, "git state may not be clean, please use `git stash` or commit changes before proceeding.")) @@ -131,9 +140,11 @@ func main() { if err := spawn("git", "add", fileName); err != nil { log.Fatal(errors.Wrapf(err, "failed to add %s to commit", fileName)) } - var underRaceStr string + var modifierStr string if *flagUnderRace { - underRaceStr = " under race" + modifierStr = " under race" + } else if *flagUnderBazel { + modifierStr = " under bazel" } commitMsg := fmt.Sprintf(`%s: skip %s%s @@ -146,7 +157,7 @@ Generated by bin/skip-test. 
 Release justification: non-production code changes
 
 Release note: None
-`, pkgPrefix, testName, underRaceStr, issueNum, *flagReason)
+`, pkgPrefix, testName, modifierStr, issueNum, *flagReason)
 	if err := spawn("git", "commit", "-m", commitMsg); err != nil {
 		log.Fatal(errors.Wrapf(err, "failed to commit %s", fileName))
 	}
@@ -208,6 +219,11 @@ func replaceFile(fileName, testName string, issueNum int) {
 			newLines,
 			fmt.Sprintf(`skip.UnderRaceWithIssue(t, %d, "%s")`, issueNum, *flagReason),
 		)
+	} else if *flagUnderBazel {
+		newLines = append(
+			newLines,
+			fmt.Sprintf(`skip.UnderBazelWithIssue(t, %d, "%s")`, issueNum, *flagReason),
+		)
 	} else {
 		newLines = append(
 			newLines,
diff --git a/pkg/testutils/skip/BUILD.bazel b/pkg/testutils/skip/BUILD.bazel
index 4ea8aaececc7..5f9a6a694c66 100644
--- a/pkg/testutils/skip/BUILD.bazel
+++ b/pkg/testutils/skip/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/testutils/skip",
     visibility = ["//visibility:public"],
    deps = [
+        "//pkg/build/bazel",
         "//pkg/util",
         "//pkg/util/envutil",
     ],
diff --git a/pkg/testutils/skip/skip.go b/pkg/testutils/skip/skip.go
index 010e0e5f31cd..ca87583f4d18 100644
--- a/pkg/testutils/skip/skip.go
+++ b/pkg/testutils/skip/skip.go
@@ -15,6 +15,7 @@ import (
 	"fmt"
 	"testing"
 
+	"github.com/cockroachdb/cockroach/pkg/build/bazel"
 	"github.com/cockroachdb/cockroach/pkg/util"
 )
 
@@ -66,6 +67,17 @@ func UnderRaceWithIssue(t SkippableTest, githubIssueID int, args ...interface{})
 	}
 }
 
+// UnderBazelWithIssue skips this test if we are building inside bazel,
+// logging the given issue ID as the reason.
+func UnderBazelWithIssue(t SkippableTest, githubIssueID int, args ...interface{}) {
+	t.Helper()
+	if bazel.BuiltWithBazel() {
+		t.Skip(append([]interface{}{fmt.Sprintf(
+			"disabled under bazel. issue: https://github.com/cockroachdb/cockroach/issues/%d", githubIssueID,
+		)}, args...))
+	}
+}
+
 // UnderShort skips this test if the -short flag is specified.
 func UnderShort(t SkippableTest, args ...interface{}) {
 	t.Helper()

From 4648747e644240780b71e8c1cbf2e485cbd59c37 Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Wed, 31 Mar 2021 16:02:53 +0100
Subject: [PATCH 04/11] kvserver: use UnderBazelWithIssue for TestPushTxnHeartbeatTimeout skip

This test appears to work outside of bazel, so let's limit where we
skip it.

Release note: None
---
 pkg/kv/kvserver/replica_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index c216f43079f5..887b557022c6 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -5594,7 +5594,7 @@ func TestPushTxnQueryPusheeHasNewerVersion(t *testing.T) {
 // heartbeat within its transaction liveness threshold can be pushed/aborted.
func TestPushTxnHeartbeatTimeout(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 62860, "flaky test") + skip.UnderBazelWithIssue(t, 62860, "flaky test") defer log.Scope(t).Close(t) tc := testContext{manualClock: hlc.NewManualClock(123)} stopper := stop.NewStopper() From fbb28980c01e3ddcd598e6fb68768450d1724260 Mon Sep 17 00:00:00 2001 From: richardjcai Date: Mon, 29 Mar 2021 15:10:54 -0400 Subject: [PATCH 05/11] workload: add idle-conns flag for adding idle connections to tpcc Release note: None --- pkg/workload/tpcc/tpcc.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go index 5079730ce2c8..d7e0dae1c6c9 100644 --- a/pkg/workload/tpcc/tpcc.go +++ b/pkg/workload/tpcc/tpcc.go @@ -44,6 +44,8 @@ type tpcc struct { nowString []byte numConns int + idleConns int + // Used in non-uniform random data generation. cLoad is the value of C at load // time. cCustomerID is the value of C for the customer id generator. cItemID // is the value of C for the item id generator. See 2.1.6. @@ -187,6 +189,7 @@ var tpccMeta = workload.Meta{ `Number of connections. Defaults to --warehouses * %d (except in nowait mode, where it defaults to --workers`, numConnsPerWarehouse, )) + g.flags.IntVar(&g.idleConns, `idle-conns`, 0, `Number of idle connections. Defaults to 0`) g.flags.IntVar(&g.partitions, `partitions`, 1, `Partition tables`) g.flags.IntVar(&g.clientPartitions, `client-partitions`, 0, `Make client behave as if the tables are partitioned, but does not actually partition underlying data. Requires --partition-affinity.`) g.flags.IntSliceVar(&g.affinityPartitions, `partition-affinity`, nil, `Run load generator against specific partition (requires partitions). `+ @@ -622,6 +625,7 @@ func (w *tpcc) Ops( MaxConnsPerPool: 50, } fmt.Printf("Initializing %d connections...\n", w.numConns) + dbs := make([]*workload.MultiConnPool, len(urls)) var g errgroup.Group for i := range urls { @@ -672,6 +676,21 @@ func (w *tpcc) Ops( } } + fmt.Printf("Initializing %d idle connections...\n", w.idleConns) + var conns []*pgx.Conn + for i := 0; i < w.idleConns; i++ { + for _, url := range urls { + connConfig, err := pgx.ParseURI(url) + if err != nil { + return workload.QueryLoad{}, err + } + conn, err := pgx.Connect(connConfig) + if err != nil { + return workload.QueryLoad{}, err + } + conns = append(conns, conn) + } + } fmt.Printf("Initializing %d workers and preparing statements...\n", w.workers) ql := workload.QueryLoad{SQLDatabase: sqlDatabase} ql.WorkerFns = make([]func(context.Context) error, 0, w.workers) @@ -723,6 +742,15 @@ func (w *tpcc) Ops( for _, tx := range allTxs { reg.GetHandle().Get(tx.name) } + + // Close idle connections. 
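+	// The connections opened above exist only to occupy server-side
+	// sessions; release them when the workload is torn down.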
+ ql.Close = func(context context.Context) { + for _, conn := range conns { + if err := conn.Close(); err != nil { + log.Warningf(ctx, "%v", err) + } + } + } return ql, nil } From db1929fe7b5fd39f5f6e3e0cf07a29ea0bab2465 Mon Sep 17 00:00:00 2001 From: Eric Harmeling Date: Wed, 31 Mar 2021 11:58:40 -0400 Subject: [PATCH 06/11] Added CACHE to SEQUENCE syntax diagrams Release justification: non-production code changes Release note: None --- docs/generated/sql/bnf/alter_sequence_options_stmt.bnf | 4 ++-- docs/generated/sql/bnf/create_sequence_stmt.bnf | 4 ++-- docs/generated/sql/bnf/stmt_block.bnf | 1 + pkg/sql/parser/sql.y | 5 ++--- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/generated/sql/bnf/alter_sequence_options_stmt.bnf b/docs/generated/sql/bnf/alter_sequence_options_stmt.bnf index b396c7e22f8d..cd0e4bcf2fa1 100644 --- a/docs/generated/sql/bnf/alter_sequence_options_stmt.bnf +++ b/docs/generated/sql/bnf/alter_sequence_options_stmt.bnf @@ -1,3 +1,3 @@ alter_sequence_options_stmt ::= - 'ALTER' 'SEQUENCE' sequence_name ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) )* ) - | 'ALTER' 'SEQUENCE' 'IF' 'EXISTS' sequence_name ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) )* ) + 'ALTER' 'SEQUENCE' sequence_name ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'CACHE' integer | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'CACHE' integer | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) )* ) + | 'ALTER' 'SEQUENCE' 'IF' 'EXISTS' sequence_name ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'CACHE' integer | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'CACHE' integer | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) )* ) diff --git a/docs/generated/sql/bnf/create_sequence_stmt.bnf b/docs/generated/sql/bnf/create_sequence_stmt.bnf index 9d2b1cfcd281..af8975c963c4 100644 --- a/docs/generated/sql/bnf/create_sequence_stmt.bnf +++ 
b/docs/generated/sql/bnf/create_sequence_stmt.bnf @@ -1,3 +1,3 @@ create_sequence_stmt ::= - 'CREATE' opt_temp 'SEQUENCE' sequence_name ( ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) )* ) | ) - | 'CREATE' opt_temp 'SEQUENCE' 'IF' 'NOT' 'EXISTS' sequence_name ( ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) )* ) | ) + 'CREATE' opt_temp 'SEQUENCE' sequence_name ( ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'CACHE' integer | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'CACHE' integer | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) )* ) | ) + | 'CREATE' opt_temp 'SEQUENCE' 'IF' 'NOT' 'EXISTS' sequence_name ( ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'CACHE' integer | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) ( ( ( 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_name | 'CACHE' integer | 'INCREMENT' integer | 'INCREMENT' 'BY' integer | 'MINVALUE' integer | 'NO' 'MINVALUE' | 'MAXVALUE' integer | 'NO' 'MAXVALUE' | 'START' integer | 'START' 'WITH' integer | 'VIRTUAL' ) ) )* ) | ) diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 5412a4387075..65b13f6a3cbf 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -2560,6 +2560,7 @@ sequence_option_elem ::= 'NO' 'CYCLE' | 'OWNED' 'BY' 'NONE' | 'OWNED' 'BY' column_path + | 'CACHE' signed_iconst64 | 'INCREMENT' signed_iconst64 | 'INCREMENT' 'BY' signed_iconst64 | 'MINVALUE' signed_iconst64 diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 4e7f1b5de6d0..d51b6b584261 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -2279,7 +2279,7 @@ username_or_sconst: { // We use UsernameValidation because username_or_sconst and role_spec // are only used for usernames of existing accounts, not when - // creating new users or roles. + // creating new users or roles. 
 		$$.val, _ = security.MakeSQLUsernameFromUserInput($1, security.UsernameValidation)
 	}
 
@@ -6838,8 +6838,7 @@ sequence_option_elem:
       return 1
     }
     $$.val = tree.SequenceOption{Name: tree.SeqOptOwnedBy, ColumnItemVal: columnItem}
   }
-| CACHE signed_iconst64 { /* SKIP DOC */
-    x := $2.int64()
+| CACHE signed_iconst64 { x := $2.int64()
     $$.val = tree.SequenceOption{Name: tree.SeqOptCache, IntVal: &x} }
 | INCREMENT signed_iconst64 { x := $2.int64()
     $$.val = tree.SequenceOption{Name: tree.SeqOptIncrement, IntVal: &x} }

From 04c09fea322be55e25b064fe434b8c6264a1ab77 Mon Sep 17 00:00:00 2001
From: Aayush Shah
Date: Tue, 30 Mar 2021 18:14:21 -0400
Subject: [PATCH 07/11] kvserver: deflake TestFollowerReadsWithStaleDescriptor

A preceding change (#62696) introduced a flaky update to this test.
Prior to that change, this test was using 2 voting replicas but that
change tried to make it use 1 voter and 1 non-voter instead (as a
litmus test for the new syntax added in #62696).

The test rebalances a replica away from a node and ensures that a
historical read sent immediately afterwards gets re-routed to the
leaseholder replica, since the receiving store had its replica
destroyed. However, when we're using a non-voter in this test, that
non-voter may not have learned about this replication change by the
time it receives this historical query, and that fails the assertion.

This commit re-organizes the test and fixes the flake.

Release note: None
---
 .../kvfollowerreadsccl/followerreads_test.go  | 34 ++++++++++++++-----
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
index 0ee1190a5eaf..5a0203e1f8b6 100644
--- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
+++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -540,8 +540,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
 	n1 := sqlutils.MakeSQLRunner(tc.Conns[0])
 	n1.Exec(t, `CREATE DATABASE t`)
 	n1.Exec(t, `CREATE TABLE test (k INT PRIMARY KEY)`)
-	n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VOTERS VALUES (ARRAY[1], 1)`)
-	n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE NON_VOTERS VALUES (ARRAY[2], 1)`)
+	n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VOTERS VALUES (ARRAY[1,2], 1)`)
 	// Speed up closing of timestamps, in order to be able to use
 	// follower_read_timestamp().
 	// Every 0.2s we'll close the timestamp from 0.4s ago. We'll attempt follower reads
@@ -571,12 +570,31 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
 	require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
 	require.Equal(t, []roachpb.ReplicaDescriptor{
 		{NodeID: 1, StoreID: 1, ReplicaID: 1},
-		{NodeID: 2, StoreID: 2, ReplicaID: 2, Type: roachpb.ReplicaTypeNonVoter()},
+		{NodeID: 2, StoreID: 2, ReplicaID: 2},
 	}, entry.Desc().Replicas().Descriptors())
 
-	// Relocate the follower. n2 will no longer have a replica.
-	n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VOTERS VALUES (ARRAY[1,3], 1)`)
-	n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE NON_VOTERS VALUES (ARRAY[], 1)`)
+	// Remove the follower and add a new non-voter to n3. n2 will no longer have a
+	// replica.
+	n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VOTERS VALUES (ARRAY[1], 1)`)
+	n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE NON_VOTERS VALUES (ARRAY[3], 1)`)
+
+	// Wait until the new non-voter is upreplicated to n3.
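+	// Otherwise, the historical query below could reach n3 before its
+	// replica is initialized, failing the follower-read assertion.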
+	testutils.SucceedsSoon(
+		t, func() error {
+			return tc.Server(2).GetStores().(*kvserver.Stores).VisitStores(
+				func(s *kvserver.Store) error {
+					repl := s.LookupReplica(tablePrefix)
+					if repl == nil {
+						return errors.Errorf("no replica found on store %s", s)
+					}
+					if !repl.IsInitialized() {
+						return errors.Errorf("non-voter not initialized")
+					}
+					return nil
+				},
+			)
+		},
+	)
 
 	// Execute the query again and assert the cache is updated. This query will
 	// not be executed as a follower read since it attempts to use n2 which
@@ -585,7 +603,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
 	n4.Exec(t, historicalQuery)
 	// As a sanity check, verify that this was not a follower read.
 	rec := <-recCh
-	require.False(t, kv.OnlyFollowerReads(rec), "query was not served through follower reads: %s", rec)
+	require.False(t, kv.OnlyFollowerReads(rec), "query was served through follower reads: %s", rec)
 	// Check that the cache was properly updated.
 	entry = n4Cache.GetCached(ctx, tablePrefix, false /* inverted */)
 	require.NotNil(t, entry)
@@ -593,7 +611,7 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
 	require.Equal(t, roachpb.StoreID(1), entry.Lease().Replica.StoreID)
 	require.Equal(t, []roachpb.ReplicaDescriptor{
 		{NodeID: 1, StoreID: 1, ReplicaID: 1},
-		{NodeID: 3, StoreID: 3, ReplicaID: 3},
+		{NodeID: 3, StoreID: 3, ReplicaID: 3, Type: roachpb.ReplicaTypeNonVoter()},
 	}, entry.Desc().Replicas().Descriptors())
 
 	// Make a note of the follower reads metric on n3. We'll check that it was

From 830b02267f9d9b2669df77b6e674974d32f336b7 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 31 Mar 2021 11:51:32 -0700
Subject: [PATCH 08/11] colexecerror: catch panics from packages in sql/sem folder

Previously, we would only catch panics from the `sql/sem/tree` package.
Recently sqlsmith encountered a crash because of a panic in the
`sql/sem/builtins` package, and I believe it is reasonable to catch
panics from that package as well as from `sql/sem/transform`, so we
will now be catching based on the `sql/sem` prefix.

Release note: None
---
 pkg/sql/colexecerror/error.go | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/pkg/sql/colexecerror/error.go b/pkg/sql/colexecerror/error.go
index 28a3c4957575..1f5b060c14f2 100644
--- a/pkg/sql/colexecerror/error.go
+++ b/pkg/sql/colexecerror/error.go
@@ -92,12 +92,18 @@ func CatchVectorizedRuntimeError(operation func()) (retErr error) {
 	return retErr
 }
 
+// We allow-list the packages from which it is safe to catch panics (which is
+// the case when the code doesn't update shared state and doesn't manipulate
+// locks).
+//
+// Multiple actual packages can share a single prefix constant defined below,
+// and panics from all such packages are caught.
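+//
+// For example, sqlSemPackagesPrefix covers sql/sem/tree, sql/sem/builtins,
+// and sql/sem/transform alike.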
const ( colPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/col" execinfraPackagePrefix = "github.com/cockroachdb/cockroach/pkg/sql/execinfra" sqlColPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/sql/col" sqlRowPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/sql/row" - treePackagePrefix = "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + sqlSemPackagesPrefix = "github.com/cockroachdb/cockroach/pkg/sql/sem" ) // shouldCatchPanic checks whether the panic that was emitted from @@ -126,7 +132,7 @@ func shouldCatchPanic(panicEmittedFrom string) bool { strings.HasPrefix(panicEmittedFrom, execinfraPackagePrefix) || strings.HasPrefix(panicEmittedFrom, sqlColPackagesPrefix) || strings.HasPrefix(panicEmittedFrom, sqlRowPackagesPrefix) || - strings.HasPrefix(panicEmittedFrom, treePackagePrefix) + strings.HasPrefix(panicEmittedFrom, sqlSemPackagesPrefix) } // StorageError is an error that was created by a component below the sql From e0efca8403d547da641abcc44d56ea0989386566 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 30 Mar 2021 13:49:22 -0400 Subject: [PATCH 09/11] tenantrate: add "test" that reports IOPS estimations This change adds a "test" facility which takes the description of a uniform workload (read percentage, read size, write size) and prints out an estimation of the sustained IOPS and burst IO. This will allow a better understanding of how changes to the settings or the mechanism translate into IOPS changes. Release note: None --- pkg/kv/kvserver/tenantrate/BUILD.bazel | 1 + pkg/kv/kvserver/tenantrate/helpers_test.go | 23 ++++++ pkg/kv/kvserver/tenantrate/limiter_test.go | 78 ++++++++++++++++++- .../tenantrate/testdata/estimate_iops | 49 ++++++++++++ 4 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 pkg/kv/kvserver/tenantrate/testdata/estimate_iops diff --git a/pkg/kv/kvserver/tenantrate/BUILD.bazel b/pkg/kv/kvserver/tenantrate/BUILD.bazel index 6c86f3295e93..b39c508e21e6 100644 --- a/pkg/kv/kvserver/tenantrate/BUILD.bazel +++ b/pkg/kv/kvserver/tenantrate/BUILD.bazel @@ -43,6 +43,7 @@ go_test( "//pkg/util/timeutil", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", + "@com_github_dustin_go_humanize//:go-humanize", "@com_github_stretchr_testify//require", "@in_gopkg_yaml_v2//:yaml_v2", ], diff --git a/pkg/kv/kvserver/tenantrate/helpers_test.go b/pkg/kv/kvserver/tenantrate/helpers_test.go index 81f29ec0ff94..96d4701b91fd 100644 --- a/pkg/kv/kvserver/tenantrate/helpers_test.go +++ b/pkg/kv/kvserver/tenantrate/helpers_test.go @@ -24,3 +24,26 @@ func OverrideSettingsWithRateLimits(settings *cluster.Settings, rl LimitConfigs) writeRateLimit.Override(&settings.SV, int64(rl.WriteBytes.Rate)) writeBurstLimit.Override(&settings.SV, rl.WriteBytes.Burst) } + +// DefaultLimitConfigs returns the configuration that corresponds to the default +// setting values. 
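+//
+// The values are read from each setting's Default() accessor, so they stay
+// in sync with the production defaults automatically.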
+func DefaultLimitConfigs() LimitConfigs { + return LimitConfigs{ + ReadRequests: LimitConfig{ + Rate: Limit(readRequestRateLimit.Default()), + Burst: readRequestBurstLimit.Default(), + }, + WriteRequests: LimitConfig{ + Rate: Limit(writeRequestRateLimit.Default()), + Burst: writeRequestBurstLimit.Default(), + }, + ReadBytes: LimitConfig{ + Rate: Limit(readRateLimit.Default()), + Burst: readBurstLimit.Default(), + }, + WriteBytes: LimitConfig{ + Rate: Limit(writeRateLimit.Default()), + Burst: writeBurstLimit.Default(), + }, + } +} diff --git a/pkg/kv/kvserver/tenantrate/limiter_test.go b/pkg/kv/kvserver/tenantrate/limiter_test.go index 1c94d70bb958..87ddf3ff609f 100644 --- a/pkg/kv/kvserver/tenantrate/limiter_test.go +++ b/pkg/kv/kvserver/tenantrate/limiter_test.go @@ -15,6 +15,7 @@ import ( "bytes" "context" "fmt" + "math" "regexp" "sort" "strings" @@ -30,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" + "github.com/dustin/go-humanize" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" ) @@ -105,10 +107,11 @@ var testStateCommands = map[string]func(*testState, *testing.T, *datadriven.Test "metrics": (*testState).metrics, "get_tenants": (*testState).getTenants, "release_tenants": (*testState).releaseTenants, + "estimate_iops": (*testState).estimateIOPS, } func (ts *testState) run(t *testing.T, d *datadriven.TestData) string { - if !ts.initialized && d.Cmd != "init" { + if !ts.initialized && d.Cmd != "init" && d.Cmd != "estimate_iops" { d.Fatalf(t, "expected init as first command, got %q", d.Cmd) } if f, ok := testStateCommands[d.Cmd]; ok { @@ -486,6 +489,79 @@ func (ts *testState) releaseTenants(t *testing.T, d *datadriven.TestData) string return ts.FormatTenants() } +// estimateIOPS takes in the description of a workload and produces an estimate +// of the IOPS for that workload (under the default settings). +// +// For example: +// +// estimate_iops +// readpercentage: 50 +// readsize: 4096 +// writesize: 4096 +// ---- +// Mixed workload (50% reads; 4.0 KiB reads; 4.0 KiB writes): 256 sustained IOPS, 256 burst. +// +func (ts *testState) estimateIOPS(t *testing.T, d *datadriven.TestData) string { + var workload struct { + ReadPercentage int + ReadSize int + WriteSize int + } + if err := yaml.UnmarshalStrict([]byte(d.Input), &workload); err != nil { + d.Fatalf(t, "failed to parse workload information: %v", err) + } + if workload.ReadPercentage < 0 || workload.ReadPercentage > 100 { + d.Fatalf(t, "Invalid read percentage %d", workload.ReadPercentage) + } + limits := tenantrate.DefaultLimitConfigs() + + calculateIOPS := func(readRate, readBytesRate, writeRate, writeBytesRate float64) float64 { + readIOPS := math.Min(readRate, readBytesRate/float64(workload.ReadSize)) + writeIOPS := math.Min(writeRate, writeBytesRate/float64(workload.WriteSize)) + // The reads and writes are rate-limited separately; our workload will be + // bottlenecked on one of them. 
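+		// If reads make up p% of operations and the read limits alone allow
+		// readIOPS, total throughput is capped at readIOPS*100/p, and
+		// analogously for writes. For pure-read or pure-write workloads one
+		// denominator is zero, the division yields +Inf, and math.Min picks
+		// the other, meaningful bound.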
+ return math.Min( + writeIOPS*100.0/float64(100-workload.ReadPercentage), + readIOPS*100.0/float64(workload.ReadPercentage), + ) + } + + sustained := calculateIOPS( + float64(limits.ReadRequests.Rate), float64(limits.ReadBytes.Rate), + float64(limits.WriteRequests.Rate), float64(limits.WriteBytes.Rate), + ) + + burst := calculateIOPS( + float64(limits.ReadRequests.Burst), float64(limits.ReadBytes.Burst), + float64(limits.WriteRequests.Burst), float64(limits.WriteBytes.Burst), + ) + fmtFloat := func(val float64) string { + if val < 10 { + return fmt.Sprintf("%.1f", val) + } + return fmt.Sprintf("%.0f", val) + } + switch workload.ReadPercentage { + case 0: + return fmt.Sprintf( + "Write-only workload (%s writes): %s sustained IOPS, %s burst.", + humanize.IBytes(uint64(workload.WriteSize)), fmtFloat(sustained), fmtFloat(burst), + ) + case 100: + return fmt.Sprintf( + "Read-only workload (%s reads): %s sustained IOPS, %s burst.", + humanize.IBytes(uint64(workload.ReadSize)), fmtFloat(sustained), fmtFloat(burst), + ) + default: + return fmt.Sprintf( + "Mixed workload (%d%% reads; %s reads; %s writes): %s sustained IOPS, %s burst.", + workload.ReadPercentage, + humanize.IBytes(uint64(workload.ReadSize)), humanize.IBytes(uint64(workload.WriteSize)), + fmtFloat(sustained), fmtFloat(burst), + ) + } +} + func (rs *testState) FormatRunning() string { var states []string for _, ls := range rs.running { diff --git a/pkg/kv/kvserver/tenantrate/testdata/estimate_iops b/pkg/kv/kvserver/tenantrate/testdata/estimate_iops new file mode 100644 index 000000000000..4054e0b0f01a --- /dev/null +++ b/pkg/kv/kvserver/tenantrate/testdata/estimate_iops @@ -0,0 +1,49 @@ +estimate_iops +readpercentage: 100 +readsize: 4096 +---- +Read-only workload (4.0 KiB reads): 128 sustained IOPS, 512 burst. + +estimate_iops +readpercentage: 100 +readsize: 65536 +---- +Read-only workload (64 KiB reads): 16 sustained IOPS, 256 burst. + +estimate_iops +readpercentage: 100 +readsize: 1048576 +---- +Read-only workload (1.0 MiB reads): 1.0 sustained IOPS, 16 burst. + +estimate_iops +readpercentage: 0 +writesize: 4096 +---- +Write-only workload (4.0 KiB writes): 128 sustained IOPS, 512 burst. + +estimate_iops +readpercentage: 0 +writesize: 65536 +---- +Write-only workload (64 KiB writes): 8.0 sustained IOPS, 128 burst. + +estimate_iops +readpercentage: 0 +writesize: 1048576 +---- +Write-only workload (1.0 MiB writes): 0.5 sustained IOPS, 8.0 burst. + +estimate_iops +readpercentage: 50 +readsize: 4096 +writesize: 4096 +---- +Mixed workload (50% reads; 4.0 KiB reads; 4.0 KiB writes): 256 sustained IOPS, 1024 burst. + +estimate_iops +readpercentage: 90 +readsize: 4096 +writesize: 4096 +---- +Mixed workload (90% reads; 4.0 KiB reads; 4.0 KiB writes): 142 sustained IOPS, 569 burst. From bb3d37c3f2313270affd0516e90c22857e239c95 Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Wed, 31 Mar 2021 14:45:29 -0500 Subject: [PATCH 10/11] build: install essential build tools in teamcity build agents In #62815, we migrated from an alternative way of installing golang, the `longsleep/golang-backports` deb repo, to the currently recommended install method found at https://golang.org/doc/install -- namely, we download a tarball and then just unzip it in the right spot. This works perfectly, *except* that the deb package had a dependency on build tools like `gcc` and `make`, and certain build configurations had come to depend on their global installation (namely, all those that don't use `builder.sh` to run a build). 
This resulted in a couple of failures being reported:

* https://teamcity.cockroachdb.com/buildConfiguration/Cockroach_ExampleORMs/2834741
* https://teamcity.cockroachdb.com/buildConfiguration/Cockroach_UnitTests_Acceptance/2834732

We just install
[`build-essential`](https://packages.ubuntu.com/xenial/build-essential)
here, which is the easiest way to get all of that stuff.

Release note: None
---
 build/packer/teamcity-agent.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/build/packer/teamcity-agent.sh b/build/packer/teamcity-agent.sh
index cbf8cbbf2103..eca335fd0ce4 100644
--- a/build/packer/teamcity-agent.sh
+++ b/build/packer/teamcity-agent.sh
@@ -31,6 +31,7 @@ apt-get update --yes
 apt-get install --yes sudo
 
 apt-get install --yes \
+  build-essential \
   curl \
   docker-ce \
   docker-compose \

From 60f03a7109228ea93e64fa5aec90d1b1655cd1b1 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Thu, 25 Mar 2021 21:14:20 -0700
Subject: [PATCH 11/11] colserde: fix the edge case with nulls handling

When serializing the data of Bool, Bytes, Int, and Float types when
they don't have any nulls in the vector, we don't explicitly specify
the null bitmap. Previously, when deserializing such vectors with no
nulls we would simply call `UnsetNulls` on the `coldata.Nulls` object
that is currently present. However, it is possible that the already
present nulls object cannot support the desired batch length. This
could lead to index out of bounds accesses.

Note that in the vast majority of cases this likely doesn't happen in
practice because we check `MaybeHasNulls`, and that would return
`false`, making us omit the null checking code.

Release note (bug fix): Previously, CockroachDB could encounter an
internal error in rare circumstances when executing queries via the
vectorized engine that operate on columns of BOOL, BYTES, INT, and
FLOAT types that have a mix of NULL and non-NULL values.
---
 pkg/col/coldata/nulls.go                     |  6 ++
 pkg/col/coldata/vec.eg.go                    |  3 +
 pkg/col/coldata/vec_tmpl.go                  |  3 +
 pkg/col/coldatatestutils/random_testutils.go | 18 +++---
 pkg/col/colserde/arrowbatchconverter.go      | 14 +++-
 pkg/col/colserde/arrowbatchconverter_test.go | 67 ++++++++++++--------
 pkg/col/colserde/record_batch_test.go        | 16 ++---
 pkg/sql/colcontainer/diskqueue_test.go       | 25 ++++----
 8 files changed, 93 insertions(+), 59 deletions(-)

diff --git a/pkg/col/coldata/nulls.go b/pkg/col/coldata/nulls.go
index 826e49ad36b4..557a8afe8edf 100644
--- a/pkg/col/coldata/nulls.go
+++ b/pkg/col/coldata/nulls.go
@@ -311,6 +311,12 @@ func (n *Nulls) Slice(start int, end int) Nulls {
 	return s
 }
 
+// MaxNumElements returns the maximum number of elements that this Nulls can
+// accommodate.
+func (n *Nulls) MaxNumElements() int {
+	return len(n.nulls) * 8
+}
+
 // NullBitmap returns the null bitmap.
 func (n *Nulls) NullBitmap() []byte {
 	return n.nulls
 }
diff --git a/pkg/col/coldata/vec.eg.go b/pkg/col/coldata/vec.eg.go
index 3285856e050b..db0e3d774ee2 100644
--- a/pkg/col/coldata/vec.eg.go
+++ b/pkg/col/coldata/vec.eg.go
@@ -1172,6 +1172,9 @@ func SetValueAt(v Vec, elem interface{}, rowIdx int) {
 
 // GetValueAt is an inefficient helper to get the value in a Vec when the type
 // is unknown.
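+// A NULL value is returned as nil.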
func GetValueAt(v Vec, rowIdx int) interface{} { + if v.Nulls().NullAt(rowIdx) { + return nil + } t := v.Type() switch v.CanonicalTypeFamily() { case types.BoolFamily: diff --git a/pkg/col/coldata/vec_tmpl.go b/pkg/col/coldata/vec_tmpl.go index 179ac44f630a..044404c2b836 100644 --- a/pkg/col/coldata/vec_tmpl.go +++ b/pkg/col/coldata/vec_tmpl.go @@ -267,6 +267,9 @@ func SetValueAt(v Vec, elem interface{}, rowIdx int) { // GetValueAt is an inefficient helper to get the value in a Vec when the type // is unknown. func GetValueAt(v Vec, rowIdx int) interface{} { + if v.Nulls().NullAt(rowIdx) { + return nil + } t := v.Type() switch v.CanonicalTypeFamily() { // {{range .}} diff --git a/pkg/col/coldatatestutils/random_testutils.go b/pkg/col/coldatatestutils/random_testutils.go index 706f65e609da..2707bba159b5 100644 --- a/pkg/col/coldatatestutils/random_testutils.go +++ b/pkg/col/coldatatestutils/random_testutils.go @@ -380,18 +380,16 @@ func (o *RandomDataOp) Next(context.Context) coldata.Batch { selProbability float64 nullProbability float64 ) + if o.selection { + selProbability = o.rng.Float64() + } + if o.nulls && o.rng.Float64() > 0.1 { + // Even if nulls are desired, in 10% of cases create a batch with no + // nulls at all. + nullProbability = o.rng.Float64() + } for { - if o.selection { - selProbability = o.rng.Float64() - } - if o.nulls { - nullProbability = o.rng.Float64() - } - b := RandomBatchWithSel(o.allocator, o.rng, o.typs, o.batchSize, nullProbability, selProbability) - if !o.selection { - b.SetSelection(false) - } if b.Length() == 0 { // Don't return a zero-length batch until we return o.numBatches batches. continue diff --git a/pkg/col/colserde/arrowbatchconverter.go b/pkg/col/colserde/arrowbatchconverter.go index ffb227697074..4979aa10477f 100644 --- a/pkg/col/colserde/arrowbatchconverter.go +++ b/pkg/col/colserde/arrowbatchconverter.go @@ -483,6 +483,18 @@ func handleNulls(arr array.Interface, vec coldata.Vec, batchLength int) { // For types with the canonical type family of Bool, Bytes, Int, or // Float, when there are no nulls, we have a null bitmap with zero // length. - vec.Nulls().UnsetNulls() + if vec.Nulls().MaxNumElements() < batchLength { + // The current null bitmap doesn't have enough space, so we need to + // allocate a new one. + // + // Note that this has likely occurred because on the previous batch + // there were some nulls and we replaced the null bitmap with the + // arrowBitmap which happened to be of insufficient capacity for the + // current batch. + nulls := coldata.NewNulls(batchLength) + vec.SetNulls(&nulls) + } else { + vec.Nulls().UnsetNulls() + } } } diff --git a/pkg/col/colserde/arrowbatchconverter_test.go b/pkg/col/colserde/arrowbatchconverter_test.go index 69faf977462f..9f6a343275fe 100644 --- a/pkg/col/colserde/arrowbatchconverter_test.go +++ b/pkg/col/colserde/arrowbatchconverter_test.go @@ -61,35 +61,27 @@ func TestArrowBatchConverterRandom(t *testing.T) { coldata.AssertEquivalentBatches(t, expected, actual) } -// roundTripBatch is a helper function that round trips a batch through the -// ArrowBatchConverter and RecordBatchSerializer. Make sure to copy the input -// batch before passing it to this function to assert equality. +// roundTripBatch is a helper function that pushes the source batch through the +// ArrowBatchConverter and RecordBatchSerializer. The result is written to dest. 
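+// Reusing the same dest batch across calls mirrors how the converter is
+// exercised in the tests below and in the production setting.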
func roundTripBatch( - b coldata.Batch, - c *colserde.ArrowBatchConverter, - r *colserde.RecordBatchSerializer, - typs []*types.T, -) (coldata.Batch, error) { + src, dest coldata.Batch, c *colserde.ArrowBatchConverter, r *colserde.RecordBatchSerializer, +) error { var buf bytes.Buffer - arrowDataIn, err := c.BatchToArrow(b) + arrowDataIn, err := c.BatchToArrow(src) if err != nil { - return nil, err + return err } - _, _, err = r.Serialize(&buf, arrowDataIn, b.Length()) + _, _, err = r.Serialize(&buf, arrowDataIn, src.Length()) if err != nil { - return nil, err + return err } var arrowDataOut []*array.Data batchLength, err := r.Deserialize(&arrowDataOut, buf.Bytes()) if err != nil { - return nil, err + return err } - actual := testAllocator.NewMemBatchWithFixedCapacity(typs, batchLength) - if err := c.ArrowToBatch(arrowDataOut, batchLength, actual); err != nil { - return nil, err - } - return actual, nil + return c.ArrowToBatch(arrowDataOut, batchLength, dest) } func TestRecordBatchRoundtripThroughBytes(t *testing.T) { @@ -98,26 +90,45 @@ func TestRecordBatchRoundtripThroughBytes(t *testing.T) { rng, _ := randutil.NewPseudoRand() for run := 0; run < 10; run++ { var typs []*types.T - var b coldata.Batch + var src coldata.Batch if rng.Float64() < 0.1 { // In 10% of cases we'll use a zero length schema. - b = testAllocator.NewMemBatchWithFixedCapacity(typs, rng.Intn(coldata.BatchSize())+1) - b.SetLength(b.Capacity()) + src = testAllocator.NewMemBatchWithFixedCapacity(typs, rng.Intn(coldata.BatchSize())+1) + src.SetLength(src.Capacity()) } else { - typs, b = randomBatch(testAllocator) + typs, src = randomBatch(testAllocator) } + dest := testAllocator.NewMemBatchWithMaxCapacity(typs) c, err := colserde.NewArrowBatchConverter(typs) require.NoError(t, err) r, err := colserde.NewRecordBatchSerializer(typs) require.NoError(t, err) - // Make a copy of the original batch because the converter modifies and - // casts data without copying for performance reasons. - expected := coldatatestutils.CopyBatch(b, typs, testColumnFactory) - actual, err := roundTripBatch(b, c, r, typs) - require.NoError(t, err) + // Reuse the same destination batch as well as the ArrowBatchConverter + // and RecordBatchSerializer in order to simulate how these things are + // used in the production setting. + for i := 0; i < 10; i++ { + require.NoError(t, roundTripBatch(src, dest, c, r)) + + coldata.AssertEquivalentBatches(t, src, dest) + // Check that we can actually read each tuple from the destination + // batch. + for _, vec := range dest.ColVecs() { + for tupleIdx := 0; tupleIdx < dest.Length(); tupleIdx++ { + coldata.GetValueAt(vec, tupleIdx) + } + } - coldata.AssertEquivalentBatches(t, expected, actual) + // Generate the new source batch. + nullProbability := rng.Float64() + if rng.Float64() < 0.1 { + // In some cases, make sure that there are no nulls at all. 
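+				// This exercises the no-nulls serialization path (zero-length
+				// null bitmap) against a destination whose previous bitmap may
+				// be too short -- the exact case fixed by this patch.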
+ nullProbability = 0 + } + capacity := rng.Intn(coldata.BatchSize()) + 1 + length := rng.Intn(capacity) + src = coldatatestutils.RandomBatch(testAllocator, rng, typs, capacity, length, nullProbability) + } } } diff --git a/pkg/col/colserde/record_batch_test.go b/pkg/col/colserde/record_batch_test.go index f66c2b04a90c..3a9091ef44ea 100644 --- a/pkg/col/colserde/record_batch_test.go +++ b/pkg/col/colserde/record_batch_test.go @@ -302,8 +302,9 @@ func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) { rng, _ := randutil.NewPseudoRand() typs := []*types.T{types.Bytes} - b := testAllocator.NewMemBatchWithFixedCapacity(typs, coldata.BatchSize()) - bytesVec := b.ColVec(0).Bytes() + src := testAllocator.NewMemBatchWithMaxCapacity(typs) + dest := testAllocator.NewMemBatchWithMaxCapacity(typs) + bytesVec := src.ColVec(0).Bytes() maxValueLen := coldata.BytesInitialAllocationFactor * 8 value := make([]byte, maxValueLen) for i := 0; i < coldata.BatchSize(); i++ { @@ -312,17 +313,16 @@ func TestRecordBatchSerializerDeserializeMemoryEstimate(t *testing.T) { require.NoError(t, err) bytesVec.Set(i, value) } - b.SetLength(coldata.BatchSize()) - - originalMemorySize := colmem.GetBatchMemSize(b) + src.SetLength(coldata.BatchSize()) c, err := colserde.NewArrowBatchConverter(typs) require.NoError(t, err) r, err := colserde.NewRecordBatchSerializer(typs) require.NoError(t, err) - b, err = roundTripBatch(b, c, r, typs) - require.NoError(t, err) - newMemorySize := colmem.GetBatchMemSize(b) + require.NoError(t, roundTripBatch(src, dest, c, r)) + + originalMemorySize := colmem.GetBatchMemSize(src) + newMemorySize := colmem.GetBatchMemSize(dest) // We expect that the original and the new memory sizes are relatively close // to each other (do not differ by more than a third). We cannot guarantee diff --git a/pkg/sql/colcontainer/diskqueue_test.go b/pkg/sql/colcontainer/diskqueue_test.go index 640ebc15d736..1fdb73e5547c 100644 --- a/pkg/sql/colcontainer/diskqueue_test.go +++ b/pkg/sql/colcontainer/diskqueue_test.go @@ -104,20 +104,22 @@ func TestDiskQueue(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(directories)) - // Run verification. + // Run verification. We reuse the same batch to dequeue into + // since that is the common pattern. + dest := coldata.NewMemBatch(typs, testColumnFactory) for { - b := op.Next(ctx) - require.NoError(t, q.Enqueue(ctx, b)) - if b.Length() == 0 { + src := op.Next(ctx) + require.NoError(t, q.Enqueue(ctx, src)) + if src.Length() == 0 { break } if rng.Float64() < dequeuedProbabilityBeforeAllEnqueuesAreDone { - if ok, err := q.Dequeue(ctx, b); !ok { + if ok, err := q.Dequeue(ctx, dest); !ok { t.Fatal("queue incorrectly considered empty") } else if err != nil { t.Fatal(err) } - coldata.AssertEquivalentBatches(t, batches[0], b) + coldata.AssertEquivalentBatches(t, batches[0], dest) batches = batches[1:] } } @@ -127,25 +129,24 @@ func TestDiskQueue(t *testing.T) { } for i := 0; i < numReadIterations; i++ { batchIdx := 0 - b := coldata.NewMemBatch(typs, testColumnFactory) for batchIdx < len(batches) { - if ok, err := q.Dequeue(ctx, b); !ok { + if ok, err := q.Dequeue(ctx, dest); !ok { t.Fatal("queue incorrectly considered empty") } else if err != nil { t.Fatal(err) } - coldata.AssertEquivalentBatches(t, batches[batchIdx], b) + coldata.AssertEquivalentBatches(t, batches[batchIdx], dest) batchIdx++ } if testReuseCache { // Trying to Enqueue after a Dequeue should return an error in these // CacheModes. 
- require.Error(t, q.Enqueue(ctx, b)) + require.Error(t, q.Enqueue(ctx, dest)) } - if ok, err := q.Dequeue(ctx, b); ok { - if b.Length() != 0 { + if ok, err := q.Dequeue(ctx, dest); ok { + if dest.Length() != 0 { t.Fatal("queue should be empty") } } else if err != nil {