diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index c520b8d81091..2983b117b591 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -294,4 +294,4 @@ trace.opentelemetry.collector string address of an OpenTelemetry trace collecto trace.snapshot.rate duration 0s if non-zero, interval at which background trace snapshots are captured tenant-rw trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez tenant-rw trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. tenant-rw -version version 1000023.1-2 set the active cluster version in the format '.' tenant-rw +version version 1000023.1-4 set the active cluster version in the format '.' tenant-rw diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index deb7ab56cfbb..3d571fe77bb5 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -247,6 +247,6 @@
trace.snapshot.rate duration 0s if non-zero, interval at which background trace snapshots are captured Serverless/Dedicated/Self-Hosted
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez Serverless/Dedicated/Self-Hosted
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. Serverless/Dedicated/Self-Hosted
-version version 1000023.1-2 set the active cluster version in the format '<major>.<minor>' Serverless/Dedicated/Self-Hosted
+version version 1000023.1-4 set the active cluster version in the format '<major>.<minor>' Serverless/Dedicated/Self-Hosted
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index 5d9f2d872021..e67c1553eba5 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -553,6 +553,7 @@ ALL_TESTS = [
     "//pkg/sql/stmtdiagnostics:stmtdiagnostics_test",
     "//pkg/sql/syntheticprivilege:syntheticprivilege_test",
     "//pkg/sql/tests:tests_test",
+    "//pkg/sql/ttl/ttlbase:ttlbase_test",
     "//pkg/sql/ttl/ttljob:ttljob_test",
     "//pkg/sql/types:types_disallowed_imports_test",
     "//pkg/sql/types:types_test",
@@ -2010,6 +2011,7 @@ GO_TARGETS = [
     "//pkg/sql/tests:tests",
     "//pkg/sql/tests:tests_test",
     "//pkg/sql/ttl/ttlbase:ttlbase",
+    "//pkg/sql/ttl/ttlbase:ttlbase_test",
     "//pkg/sql/ttl/ttljob:ttljob",
     "//pkg/sql/ttl/ttljob:ttljob_test",
     "//pkg/sql/ttl/ttlschedule:ttlschedule",
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go
index f4ee15192288..723513d90f20 100644
--- a/pkg/clusterversion/cockroach_versions.go
+++ b/pkg/clusterversion/cockroach_versions.go
@@ -542,6 +542,10 @@ const (
 	// Step 1b: Add new version for 23.2 development here.
 	// Do not add new versions to a patch release.
 	// *************************************************
+
+	// V23_2TTLAllowDescPK is the version where TTL tables can have descending
+	// primary keys.
+	V23_2TTLAllowDescPK
 )

 func (k Key) String() string {
@@ -941,6 +945,10 @@ var rawVersionsSingleton = keyedVersions{
 	// Step 2b: Add new version gates for 23.2 development here.
 	// Do not add new versions to a patch release.
 	// *************************************************
+	{
+		Key:     V23_2TTLAllowDescPK,
+		Version: roachpb.Version{Major: 23, Minor: 1, Internal: 4},
+	},
 }

 // developmentBranch must be true on the main development branch but
diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go
index ce5780382dcf..df8fe577fbe4 100644
--- a/pkg/sql/alter_table.go
+++ b/pkg/sql/alter_table.go
@@ -25,7 +25,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
@@ -495,7 +494,7 @@ func (n *alterTableNode) startExec(params runParams) error {
 			}
 			tableDesc := n.tableDesc
-			if t.Column == colinfo.TTLDefaultExpirationColumnName &&
+			if t.Column == catpb.TTLDefaultExpirationColumnName &&
 				tableDesc.HasRowLevelTTL() &&
 				tableDesc.GetRowLevelTTL().HasDurationExpr() {
 				return errors.WithHintf(
@@ -607,7 +606,7 @@ func (n *alterTableNode) startExec(params runParams) error {
 					"column %q in the middle of being dropped", t.GetColumn())
 			}
 			columnName := col.GetName()
-			if columnName == colinfo.TTLDefaultExpirationColumnName &&
+			if columnName == catpb.TTLDefaultExpirationColumnName &&
 				tableDesc.HasRowLevelTTL() &&
 				tableDesc.GetRowLevelTTL().HasDurationExpr() {
 				return pgerror.Newf(
@@ -753,7 +752,7 @@ func (n *alterTableNode) startExec(params runParams) error {
 	case *tree.AlterTableRenameColumn:
 		tableDesc := n.tableDesc
 		columnName := t.Column
-		if columnName == colinfo.TTLDefaultExpirationColumnName &&
+		if columnName == catpb.TTLDefaultExpirationColumnName &&
 			tableDesc.HasRowLevelTTL() &&
 			tableDesc.GetRowLevelTTL().HasDurationExpr() {
 			return pgerror.Newf(
@@ -1841,7 +1840,7 @@ func handleTTLStorageParamChange(
 	// Update default expression on automated column if required.
 	if before.HasDurationExpr() && after.HasDurationExpr() && before.DurationExpr != after.DurationExpr {
-		col, err := catalog.MustFindColumnByName(tableDesc, colinfo.TTLDefaultExpirationColumnName)
+		col, err := catalog.MustFindColumnByName(tableDesc, catpb.TTLDefaultExpirationColumnName)
 		if err != nil {
 			return false, err
 		}
@@ -1883,11 +1882,11 @@ func handleTTLStorageParamChange(
 		// Adding a TTL requires adding the automatic column and deferring the TTL
 		// addition to after the column is successfully added.
 		addTTLMutation = true
-		if catalog.FindColumnByName(tableDesc, colinfo.TTLDefaultExpirationColumnName) != nil {
+		if catalog.FindColumnByName(tableDesc, catpb.TTLDefaultExpirationColumnName) != nil {
 			return false, pgerror.Newf(
 				pgcode.InvalidTableDefinition,
 				"cannot add TTL to table with the %s column already defined",
-				colinfo.TTLDefaultExpirationColumnName,
+				catpb.TTLDefaultExpirationColumnName,
 			)
 		}
 		col, err := rowLevelTTLAutomaticColumnDef(after)
@@ -1933,7 +1932,7 @@ func handleTTLStorageParamChange(
 		// Create the DROP COLUMN job and the associated mutation.
 		dropTTLMutation = true
 		droppedViews, err := dropColumnImpl(params, tn, tableDesc, after, &tree.AlterTableDropColumn{
-			Column: colinfo.TTLDefaultExpirationColumnName,
+			Column: catpb.TTLDefaultExpirationColumnName,
 		})
 		if err != nil {
 			return false, err
diff --git a/pkg/sql/catalog/catpb/BUILD.bazel b/pkg/sql/catalog/catpb/BUILD.bazel
index 3651db0de1a8..24ac54bc37af 100644
--- a/pkg/sql/catalog/catpb/BUILD.bazel
+++ b/pkg/sql/catalog/catpb/BUILD.bazel
@@ -41,7 +41,6 @@ go_library(
         "job_id.go",
         "multiregion.go",
         "privilege.go",
-        "ttl.go",
         ":gen-privilegedescversion-stringer",  # keep
     ],
     embed = [":catpb_go_proto"],
diff --git a/pkg/sql/catalog/catpb/catalog.go b/pkg/sql/catalog/catpb/catalog.go
index 8e5c5be8857d..313edf55fa8d 100644
--- a/pkg/sql/catalog/catpb/catalog.go
+++ b/pkg/sql/catalog/catpb/catalog.go
@@ -99,6 +99,14 @@ func (as *AutoStatsSettings) NoAutoStatsSettingsOverrides() bool {
 	return true
 }

+// TTLDefaultExpirationColumnName is the name of the automatic column that
+// stores the expiration timestamp for TTL.
+const TTLDefaultExpirationColumnName = "crdb_internal_expiration"
+
+// DefaultTTLExpirationExpr is the default TTL expression used when
+// ttl_expiration_expression is not specified.
+var DefaultTTLExpirationExpr = Expression(TTLDefaultExpirationColumnName)
+
 // HasDurationExpr is a utility method to determine if ttl_expire_after was set.
 func (rowLevelTTL *RowLevelTTL) HasDurationExpr() bool {
 	return rowLevelTTL.DurationExpr != ""
 }
@@ -108,3 +116,18 @@ func (rowLevelTTL *RowLevelTTL) HasDurationExpr() bool {
 func (rowLevelTTL *RowLevelTTL) HasExpirationExpr() bool {
 	return rowLevelTTL.ExpirationExpr != ""
 }
+
+// DeletionCronOrDefault returns the DeletionCron or the global default.
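+// For example, a RowLevelTTL with an empty DeletionCron schedules the TTL
+// deletion job on the "@hourly" default below.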
+func (m *RowLevelTTL) DeletionCronOrDefault() string {
+	if override := m.DeletionCron; override != "" {
+		return override
+	}
+	return "@hourly"
+}
+
+// GetTTLExpr returns ttl_expiration_expression if set, and otherwise the
+// default expression referencing the automatic expiration column.
+func (rowLevelTTL *RowLevelTTL) GetTTLExpr() Expression {
+	if rowLevelTTL.HasExpirationExpr() {
+		return rowLevelTTL.ExpirationExpr
+	}
+	return DefaultTTLExpirationExpr
+}
diff --git a/pkg/sql/catalog/colinfo/BUILD.bazel b/pkg/sql/catalog/colinfo/BUILD.bazel
index ed4c935da990..5e08791937dd 100644
--- a/pkg/sql/catalog/colinfo/BUILD.bazel
+++ b/pkg/sql/catalog/colinfo/BUILD.bazel
@@ -13,7 +13,6 @@ go_library(
         "ordering.go",
         "result_columns.go",
         "system_columns.go",
-        "ttl.go",
     ],
     importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo",
     visibility = ["//visibility:public"],
diff --git a/pkg/sql/catalog/colinfo/ttl.go b/pkg/sql/catalog/colinfo/ttl.go
deleted file mode 100644
index 6980e7383ca9..000000000000
--- a/pkg/sql/catalog/colinfo/ttl.go
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright 2022 The Cockroach Authors.
-//
-// Use of this software is governed by the Business Source License
-// included in the file licenses/BSL.txt.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0, included in the file
-// licenses/APL.txt.
-
-package colinfo
-
-import "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
-
-// TTLDefaultExpirationColumnName is the column name representing the expiration
-// column for TTL.
-const TTLDefaultExpirationColumnName = "crdb_internal_expiration"
-
-// DefaultTTLExpirationExpr is default TTL expression when
-// ttl_expiration_expression is not specified
-var DefaultTTLExpirationExpr = catpb.Expression(TTLDefaultExpirationColumnName)
diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go
index 33b136e01f7f..3eb6e6a471c5 100644
--- a/pkg/sql/catalog/lease/lease.go
+++ b/pkg/sql/catalog/lease/lease.go
@@ -483,15 +483,15 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er
 	// If we are to join an in-progress acquisition, it needs to be an acquisition
 	// initiated after this point.
 	// So, we handle two cases:
-	// 1. The first DoChan() call tells us that we didn't join an in-progress
-	//    acquisition. Great, the lease that's being acquired is good.
-	// 2. The first DoChan() call tells us that we did join an in-progress acq.
-	//    We have to wait this acquisition out; it's not good for us. But any
-	//    future acquisition is good, so the next time around the loop it doesn't
-	//    matter if we initiate a request or join an in-progress one.
-	// In both cases, we need to check if the lease we want is still valid because
-	// lease acquisition is done without holding the descriptorState lock, so anything
-	// can happen in between lease acquisition and us getting control again.
+	// 1. The first acquireNodeLease(..) call tells us that we didn't join an
+	//    in-progress acquisition but rather initiated one. Great, the lease
+	//    that's being acquired is good.
+	// 2. The first acquireNodeLease(..) call tells us that we did join an
+	//    in-progress acquisition.
+	//    We have to wait this acquisition out; it's not good for us. But any
+	//    future acquisition is good, so the next time around the loop it doesn't
+	//    matter if we initiate a request or join an in-progress one (because
+	//    either way, it's an acquisition performed after this call).
 	attemptsMade := 0
 	for {
 		// Acquire a fresh lease.
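+		// acquireNodeLease (elided here) reports via didAcquire whether this
+		// call initiated the acquisition (case 1) or joined one that was
+		// already in flight (case 2).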
@@ -501,12 +501,11 @@ func (m *Manager) AcquireFreshestFromStore(ctx context.Context, id descpb.ID) er
 		}

 		if didAcquire {
-			// Case 1: we didn't join an in-progress call and the lease is still
-			// valid.
+			// Case 1: we initiated a lease acquisition call.
 			break
-		} else if attemptsMade > 1 {
-			// Case 2: more than one acquisition has happened and the lease is still
-			// valid.
+		} else if attemptsMade > 0 {
+			// Case 2: we joined an in-progress lease acquisition call but that call
+			// was initiated after we entered this function.
 			break
 		}
 		attemptsMade++
diff --git a/pkg/sql/catalog/tabledesc/ttl.go b/pkg/sql/catalog/tabledesc/ttl.go
index dc7828e62419..2f86b2e7c43a 100644
--- a/pkg/sql/catalog/tabledesc/ttl.go
+++ b/pkg/sql/catalog/tabledesc/ttl.go
@@ -16,7 +16,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
 	"github.com/cockroachdb/cockroach/pkg/sql/parser"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
@@ -94,7 +93,7 @@ func ValidateTTLExpirationExpr(desc catalog.TableDescriptor) error {
 // ValidateTTLExpirationColumn validates that the ttl_expire_after setting, if
 // any, is in a valid state. It requires that the TTLDefaultExpirationColumn
 // exists and has DEFAULT/ON UPDATE clauses.
-func ValidateTTLExpirationColumn(desc catalog.TableDescriptor) error {
+func ValidateTTLExpirationColumn(desc catalog.TableDescriptor, allowDescPK bool) error {
 	if !desc.HasRowLevelTTL() {
 		return nil
 	}
@@ -102,16 +101,16 @@ func ValidateTTLExpirationColumn(desc catalog.TableDescriptor) error {
 		return nil
 	}
 	intervalExpr := desc.GetRowLevelTTL().DurationExpr
-	col, err := catalog.MustFindColumnByTreeName(desc, colinfo.TTLDefaultExpirationColumnName)
+	col, err := catalog.MustFindColumnByTreeName(desc, catpb.TTLDefaultExpirationColumnName)
 	if err != nil {
-		return errors.Wrapf(err, "expected column %s", colinfo.TTLDefaultExpirationColumnName)
+		return errors.Wrapf(err, "expected column %s", catpb.TTLDefaultExpirationColumnName)
 	}
 	expectedStr := `current_timestamp():::TIMESTAMPTZ + ` + string(intervalExpr)
 	if col.GetDefaultExpr() != expectedStr {
 		return pgerror.Newf(
 			pgcode.InvalidTableDefinition,
 			"expected DEFAULT expression of %s to be %s",
-			colinfo.TTLDefaultExpirationColumnName,
+			catpb.TTLDefaultExpirationColumnName,
 			expectedStr,
 		)
 	}
@@ -119,20 +118,22 @@ func ValidateTTLExpirationColumn(desc catalog.TableDescriptor) error {
 		return pgerror.Newf(
 			pgcode.InvalidTableDefinition,
 			"expected ON UPDATE expression of %s to be %s",
-			colinfo.TTLDefaultExpirationColumnName,
+			catpb.TTLDefaultExpirationColumnName,
 			expectedStr,
 		)
 	}

 	// For row-level TTL, only ascending PKs are permitted.
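+	// When the V23_2TTLAllowDescPK version gate is active, this restriction is
+	// lifted: e.g. PRIMARY KEY (id, id2 DESC) with ttl_expire_after becomes
+	// legal (see the row_level_ttl logic tests below).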
-	pk := desc.GetPrimaryIndex()
-	for i := 0; i < pk.NumKeyColumns(); i++ {
-		dir := pk.GetKeyColumnDirection(i)
-		if dir != catenumpb.IndexColumn_ASC {
-			return unimplemented.NewWithIssuef(
-				76912,
-				`non-ascending ordering on PRIMARY KEYs are not supported with row-level TTL`,
-			)
+	if !allowDescPK {
+		pk := desc.GetPrimaryIndex()
+		for i := 0; i < pk.NumKeyColumns(); i++ {
+			dir := pk.GetKeyColumnDirection(i)
+			if dir != catenumpb.IndexColumn_ASC {
+				return unimplemented.NewWithIssuef(
+					76912,
+					`non-ascending ordering on PRIMARY KEYs are not supported with row-level TTL`,
+				)
+			}
 		}
 	}
diff --git a/pkg/sql/catalog/tabledesc/validate.go b/pkg/sql/catalog/tabledesc/validate.go
index 5da17a54cc3f..bfcd66e53248 100644
--- a/pkg/sql/catalog/tabledesc/validate.go
+++ b/pkg/sql/catalog/tabledesc/validate.go
@@ -11,6 +11,7 @@
 import (
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
@@ -862,7 +863,7 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
 	// ValidateRowLevelTTL is also used before the table descriptor is fully
 	// initialized to validate the storage parameters.
 	vea.Report(ValidateTTLExpirationExpr(desc))
-	vea.Report(ValidateTTLExpirationColumn(desc))
+	vea.Report(ValidateTTLExpirationColumn(desc, vea.IsActive(clusterversion.V23_2TTLAllowDescPK)))

 	// Validate that there are no columns with both a foreign key ON UPDATE and an
 	// ON UPDATE expression. This check is made to ensure that we know which ON
diff --git a/pkg/sql/catalog/tabledesc/validate_test.go b/pkg/sql/catalog/tabledesc/validate_test.go
index b0c705eef8d0..1bfc051e435f 100644
--- a/pkg/sql/catalog/tabledesc/validate_test.go
+++ b/pkg/sql/catalog/tabledesc/validate_test.go
@@ -50,7 +50,7 @@ const (
 	// must add a justification for this in the map.
 	todoIAmKnowinglyAddingTechDebt
 	// iSolemnlySwearThisFieldIsValidated means that a field was added to a
-	//validate method.
+	// validate method.
iSolemnlySwearThisFieldIsValidated ) @@ -386,24 +386,25 @@ func TestValidateTableDesc(t *testing.T) { } testData := []struct { - err string - desc descpb.TableDescriptor + err string + desc descpb.TableDescriptor + version clusterversion.Key }{ - {`empty relation name`, - descpb.TableDescriptor{}}, - {`invalid table ID 0`, - descpb.TableDescriptor{ID: 0, Name: "foo"}}, - {`invalid parent ID 0`, - descpb.TableDescriptor{ID: 2, Name: "foo"}}, - {`table must contain at least 1 column`, - descpb.TableDescriptor{ + {err: `empty relation name`, + desc: descpb.TableDescriptor{}}, + {err: `invalid table ID 0`, + desc: descpb.TableDescriptor{ID: 0, Name: "foo"}}, + {err: `invalid parent ID 0`, + desc: descpb.TableDescriptor{ID: 2, Name: "foo"}}, + {err: `table must contain at least 1 column`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", FormatVersion: descpb.InterleavedFormatVersion, }}, - {`empty column name`, - descpb.TableDescriptor{ + {err: `empty column name`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -413,8 +414,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 2, }}, - {`table is encoded using using version 0, but this client only supports version 3`, - descpb.TableDescriptor{ + {err: `table is encoded using using version 0, but this client only supports version 3`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -423,8 +424,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 2, }}, - {`virtual column "virt" is not computed`, - descpb.TableDescriptor{ + {err: `virtual column "virt" is not computed`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -435,8 +436,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`invalid column ID 0`, - descpb.TableDescriptor{ + {err: `invalid column ID 0`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -446,8 +447,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 2, }}, - {`table must contain a primary key`, - descpb.TableDescriptor{ + {err: `table must contain a primary key`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -461,8 +462,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 1, }}, - {`duplicate column name: "bar"`, - descpb.TableDescriptor{ + {err: `duplicate column name: "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -473,8 +474,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`duplicate column name: "bar"`, - descpb.TableDescriptor{ + {err: `duplicate column name: "bar"`, + desc: descpb.TableDescriptor{ ID: catconstants.CrdbInternalBackwardDependenciesTableID, ParentID: 0, Name: "foo", @@ -485,8 +486,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`column "blah" duplicate ID of column "bar": 1`, - descpb.TableDescriptor{ + {err: `column "blah" duplicate ID of column "bar": 1`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -497,8 +498,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 2, }}, - {`at least 1 column family must be specified`, - descpb.TableDescriptor{ + {err: `at least 1 column family must be specified`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -508,8 +509,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 2, }}, - {`column "bar" cannot be hidden and inaccessible`, - descpb.TableDescriptor{ + {err: `column "bar" cannot be hidden and inaccessible`, + desc: 
descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -519,8 +520,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 2, }}, - {`the 0th family must have ID 0`, - descpb.TableDescriptor{ + {err: `the 0th family must have ID 0`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -533,8 +534,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 2, }}, - {`duplicate family name: "baz"`, - descpb.TableDescriptor{ + {err: `duplicate family name: "baz"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -549,8 +550,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 2, }}, - {`family "qux" duplicate ID of family "baz": 0`, - descpb.TableDescriptor{ + {err: `family "qux" duplicate ID of family "baz": 0`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -565,8 +566,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 2, }}, - {`duplicate family name: "baz"`, - descpb.TableDescriptor{ + {err: `duplicate family name: "baz"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -581,8 +582,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 2, }}, - {`mismatched column ID size (1) and name size (0)`, - descpb.TableDescriptor{ + {err: `mismatched column ID size (1) and name size (0)`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -596,8 +597,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 1, }}, - {`family "baz" contains column reference "bar" with unknown ID 2`, - descpb.TableDescriptor{ + {err: `family "baz" contains column reference "bar" with unknown ID 2`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -611,8 +612,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 1, }}, - {`family "baz" column 1 should have name "bar", but found name "qux"`, - descpb.TableDescriptor{ + {err: `family "baz" column 1 should have name "bar", but found name "qux"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -626,8 +627,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 1, }}, - {`column "bar" is not in any column family`, - descpb.TableDescriptor{ + {err: `column "bar" is not in any column family`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -641,8 +642,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 1, }}, - {`column 1 is in both family 0 and 1`, - descpb.TableDescriptor{ + {err: `column 1 is in both family 0 and 1`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -657,8 +658,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 2, }}, - {`virtual computed column "virt" cannot be part of a family`, - descpb.TableDescriptor{ + {err: `virtual computed column "virt" cannot be part of a family`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -674,8 +675,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 3, NextFamilyID: 2, }}, - {`table must contain a primary key`, - descpb.TableDescriptor{ + {err: `table must contain a primary key`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -692,8 +693,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 1, }}, - {`primary index "p_idx" cannot be not visible`, - descpb.TableDescriptor{ + {err: `primary index "p_idx" cannot be not visible`, + desc: descpb.TableDescriptor{ ID: 2, 
ParentID: 1, Name: "foo", @@ -724,8 +725,8 @@ func TestValidateTableDesc(t *testing.T) { NextFamilyID: 1, NextIndexID: 2, }}, - {`invalid index ID 0`, - descpb.TableDescriptor{ + {err: `invalid index ID 0`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -746,8 +747,8 @@ func TestValidateTableDesc(t *testing.T) { NextFamilyID: 1, NextConstraintID: 2, }}, - {`index "bar" must contain at least 1 column`, - descpb.TableDescriptor{ + {err: `index "bar" must contain at least 1 column`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -776,8 +777,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 3, NextConstraintID: 2, }}, - {`mismatched column IDs (1) and names (0)`, - descpb.TableDescriptor{ + {err: `mismatched column IDs (1) and names (0)`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -799,8 +800,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 2, NextConstraintID: 2, }}, - {`mismatched column IDs (1) and names (2)`, - descpb.TableDescriptor{ + {err: `mismatched column IDs (1) and names (2)`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -821,8 +822,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 2, NextConstraintID: 2, }}, - {`duplicate index name: "bar"`, - descpb.TableDescriptor{ + {err: `duplicate index name: "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -850,8 +851,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 3, NextConstraintID: 2, }}, - {`index "blah" duplicate ID of index "bar": 1`, - descpb.TableDescriptor{ + {err: `index "blah" duplicate ID of index "bar": 1`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -880,8 +881,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 2, NextConstraintID: 2, }}, - {`index "bar" contains key column "bar" with unknown ID 2`, - descpb.TableDescriptor{ + {err: `index "bar" contains key column "bar" with unknown ID 2`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -905,8 +906,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 2, NextConstraintID: 2, }}, - {`index "bar" key column ID 1 should have name "bar", but found name "blah"`, - descpb.TableDescriptor{ + {err: `index "bar" key column ID 1 should have name "bar", but found name "blah"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -930,8 +931,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 2, NextConstraintID: 2, }}, - {`mismatched column IDs (1) and directions (0)`, - descpb.TableDescriptor{ + {err: `mismatched column IDs (1) and directions (0)`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -951,8 +952,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 2, NextConstraintID: 2, }}, - {`mismatched STORING column IDs (1) and names (0)`, - descpb.TableDescriptor{ + {err: `mismatched STORING column IDs (1) and names (0)`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -981,8 +982,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 2, NextConstraintID: 2, }}, - {`index "secondary" contains stored column "quux" with unknown ID 123`, - descpb.TableDescriptor{ + {err: `index "secondary" contains stored column "quux" with unknown ID 123`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1018,8 +1019,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 3, NextConstraintID: 2, }}, - {`index "secondary" stored column ID 2 should have name "baz", but found name 
"quux"`, - descpb.TableDescriptor{ + {err: `index "secondary" stored column ID 2 should have name "baz", but found name "quux"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1060,8 +1061,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 3, NextConstraintID: 2, }}, - {`index "secondary" key suffix column ID 123 is invalid`, - descpb.TableDescriptor{ + {err: `index "secondary" key suffix column ID 123 is invalid`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1096,8 +1097,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 3, NextConstraintID: 2, }}, - {`index "primary" contains deprecated foreign key representation`, - descpb.TableDescriptor{ + {err: `index "primary" contains deprecated foreign key representation`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1121,10 +1122,10 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 2, NextConstraintID: 2, }}, - {`at least one of LIST or RANGE partitioning must be used`, + {err: `at least one of LIST or RANGE partitioning must be used`, // Verify that validatePartitioning is hooked up. The rest of these // tests are in TestValidatePartitionion. - descpb.TableDescriptor{ + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1150,8 +1151,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 3, NextConstraintID: 2, }}, - {`index "foo_crdb_internal_bar_shard_5_bar_idx" refers to non-existent shard column "does not exist"`, - descpb.TableDescriptor{ + {err: `index "foo_crdb_internal_bar_shard_5_bar_idx" refers to non-existent shard column "does not exist"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1196,8 +1197,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 3, NextConstraintID: 2, }}, - {`TableID mismatch for unique without index constraint "bar_unique": "1" doesn't match descriptor: "2"`, - descpb.TableDescriptor{ + {err: `TableID mismatch for unique without index constraint "bar_unique": "1" doesn't match descriptor: "2"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1223,8 +1224,8 @@ func TestValidateTableDesc(t *testing.T) { }, }, }}, - {`unique without index constraint "bar_unique" contains unknown column "2"`, - descpb.TableDescriptor{ + {err: `unique without index constraint "bar_unique" contains unknown column "2"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1250,8 +1251,8 @@ func TestValidateTableDesc(t *testing.T) { }, }, }}, - {`unique without index constraint "bar_unique" contains duplicate column "1"`, - descpb.TableDescriptor{ + {err: `unique without index constraint "bar_unique" contains duplicate column "1"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1277,8 +1278,8 @@ func TestValidateTableDesc(t *testing.T) { }, }, }}, - {`empty constraint name`, - descpb.TableDescriptor{ + {err: `empty constraint name`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1308,8 +1309,8 @@ func TestValidateTableDesc(t *testing.T) { }, }, }}, - {`index "sec" cannot store virtual column "c3"`, - descpb.TableDescriptor{ + {err: `index "sec" cannot store virtual column "c3"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1348,8 +1349,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 3, NextConstraintID: 2, }}, - {"", - descpb.TableDescriptor{ + {err: ``, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1432,8 +1433,8 @@ func TestValidateTableDesc(t 
*testing.T) { NextConstraintID: 3, Privileges: catpb.NewBasePrivilegeDescriptor(username.AdminRoleName()), }}, - {`index "sec" cannot store virtual column "c3"`, - descpb.TableDescriptor{ + {err: `index "sec" cannot store virtual column "c3"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1516,8 +1517,8 @@ func TestValidateTableDesc(t *testing.T) { NextConstraintID: 3, Privileges: catpb.NewBasePrivilegeDescriptor(username.AdminRoleName()), }}, - {`index "new_sec" cannot store virtual column "c3"`, - descpb.TableDescriptor{ + {err: `index "new_sec" cannot store virtual column "c3"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1600,8 +1601,8 @@ func TestValidateTableDesc(t *testing.T) { NextConstraintID: 3, Privileges: catpb.NewBasePrivilegeDescriptor(username.AdminRoleName()), }}, - {`index "sec" cannot store virtual column "v"`, - descpb.TableDescriptor{ + {err: `index "sec" cannot store virtual column "v"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1635,8 +1636,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 3, NextConstraintID: 2, }}, - {`index "sec" has column ID 2 present in: [KeyColumnIDs StoreColumnIDs]`, - descpb.TableDescriptor{ + {err: `index "sec" has column ID 2 present in: [KeyColumnIDs StoreColumnIDs]`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1670,8 +1671,8 @@ func TestValidateTableDesc(t *testing.T) { NextIndexID: 3, NextConstraintID: 2, }}, - {`computed column "bar" cannot also have an ON UPDATE expression`, - descpb.TableDescriptor{ + {err: `computed column "bar" cannot also have an ON UPDATE expression`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1690,8 +1691,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 1, }}, - {`both generated identity and on update expression specified for column "bar"`, - descpb.TableDescriptor{ + {err: `both generated identity and on update expression specified for column "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1710,8 +1711,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 1, }}, - {`both generated identity and on update expression specified for column "bar"`, - descpb.TableDescriptor{ + {err: `both generated identity and on update expression specified for column "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1730,8 +1731,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 1, }}, - {`conflicting NULL/NOT NULL declarations for column "bar"`, - descpb.TableDescriptor{ + {err: `conflicting NULL/NOT NULL declarations for column "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1743,8 +1744,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`conflicting NULL/NOT NULL declarations for column "bar"`, - descpb.TableDescriptor{ + {err: `conflicting NULL/NOT NULL declarations for column "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1756,8 +1757,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`both generated identity and computed expression specified for column "bar"`, - descpb.TableDescriptor{ + {err: `both generated identity and computed expression specified for column "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1768,8 +1769,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`both generated identity and 
computed expression specified for column "bar"`, - descpb.TableDescriptor{ + {err: `both generated identity and computed expression specified for column "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1780,8 +1781,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`conflicting NULL/NOT NULL declarations for column "bar"`, - descpb.TableDescriptor{ + {err: `conflicting NULL/NOT NULL declarations for column "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1794,8 +1795,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`conflicting NULL/NOT NULL declarations for column "bar"`, - descpb.TableDescriptor{ + {err: `conflicting NULL/NOT NULL declarations for column "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1808,8 +1809,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`both generated identity and computed expression specified for column "bar"`, - descpb.TableDescriptor{ + {err: `both generated identity and computed expression specified for column "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1821,8 +1822,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`both generated identity and computed expression specified for column "bar"`, - descpb.TableDescriptor{ + {err: `both generated identity and computed expression specified for column "bar"`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1834,8 +1835,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`computed column "bar" cannot also have a DEFAULT expression`, - descpb.TableDescriptor{ + {err: `computed column "bar" cannot also have a DEFAULT expression`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1850,8 +1851,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 2, }}, - {`computed column "bar" cannot also have an ON UPDATE expression`, - descpb.TableDescriptor{ + {err: `computed column "bar" cannot also have an ON UPDATE expression`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1866,8 +1867,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 2, }}, - {`non-index mutation in state BACKFILLING`, - descpb.TableDescriptor{ + {err: `non-index mutation in state BACKFILLING`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1889,8 +1890,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`non-index mutation in state MERGING`, - descpb.TableDescriptor{ + {err: `non-index mutation in state MERGING`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1912,8 +1913,8 @@ func TestValidateTableDesc(t *testing.T) { }, NextColumnID: 3, }}, - {`public index "ruroh" is using the delete preserving encoding`, - descpb.TableDescriptor{ + {err: `public index "ruroh" is using the delete preserving encoding`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1952,8 +1953,8 @@ func TestValidateTableDesc(t *testing.T) { NextFamilyID: 1, NextConstraintID: 2, }}, - {`public index "primary" is using the delete preserving encoding`, - descpb.TableDescriptor{ + {err: `public index "primary" is using the delete preserving encoding`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -1983,8 +1984,8 @@ func TestValidateTableDesc(t *testing.T) { NextConstraintID: 2, }, }, - {`column ID 123 found in depended-on-by references, no such column in this 
relation`, - descpb.TableDescriptor{ + {err: `column ID 123 found in depended-on-by references, no such column in this relation`, + desc: descpb.TableDescriptor{ Name: "foo", ID: 51, ParentID: 1, @@ -2003,8 +2004,8 @@ func TestValidateTableDesc(t *testing.T) { }, }, }, - {`index ID 123 found in depended-on-by references, no such index in this relation`, - descpb.TableDescriptor{ + {err: `index ID 123 found in depended-on-by references, no such index in this relation`, + desc: descpb.TableDescriptor{ Name: "foo", ID: 51, ParentID: 1, @@ -2023,8 +2024,8 @@ func TestValidateTableDesc(t *testing.T) { }, }, }, - {`Setting sql_stats_automatic_collection_enabled may not be set on virtual table`, - descpb.TableDescriptor{ + {err: `Setting sql_stats_automatic_collection_enabled may not be set on virtual table`, + desc: descpb.TableDescriptor{ ID: catconstants.MinVirtualID, ParentID: 1, Name: "foo", @@ -2035,8 +2036,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, AutoStatsSettings: &catpb.AutoStatsSettings{Enabled: &boolTrue}, }}, - {`Setting sql_stats_automatic_collection_enabled may not be set on a view or sequence`, - descpb.TableDescriptor{ + {err: `Setting sql_stats_automatic_collection_enabled may not be set on a view or sequence`, + desc: descpb.TableDescriptor{ Name: "bar", ID: 52, ParentID: 1, @@ -2051,8 +2052,8 @@ func TestValidateTableDesc(t *testing.T) { Privileges: catpb.NewBasePrivilegeDescriptor(username.AdminRoleName()), AutoStatsSettings: &catpb.AutoStatsSettings{Enabled: &boolTrue}, }}, - {`Setting sql_stats_automatic_collection_enabled may not be set on a view or sequence`, - descpb.TableDescriptor{ + {err: `Setting sql_stats_automatic_collection_enabled may not be set on a view or sequence`, + desc: descpb.TableDescriptor{ ID: 51, ParentID: 1, Name: "foo", @@ -2088,8 +2089,8 @@ func TestValidateTableDesc(t *testing.T) { AutoStatsSettings: &catpb.AutoStatsSettings{Enabled: &boolTrue}, }, }, - {`invalid integer value for sql_stats_automatic_collection_min_stale_rows: cannot be set to a negative value: -1`, - descpb.TableDescriptor{ + {err: `invalid integer value for sql_stats_automatic_collection_min_stale_rows: cannot be set to a negative value: -1`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -2100,8 +2101,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, AutoStatsSettings: &catpb.AutoStatsSettings{MinStaleRows: &negativeOne}, }}, - {`invalid float value for sql_stats_automatic_collection_fraction_stale_rows: cannot set to a negative value: -1.000000`, - descpb.TableDescriptor{ + {err: `invalid float value for sql_stats_automatic_collection_fraction_stale_rows: cannot set to a negative value: -1.000000`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -2112,8 +2113,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, AutoStatsSettings: &catpb.AutoStatsSettings{FractionStaleRows: &negativeOneFloat}, }}, - {`row-level TTL expiration expression "missing_col" refers to unknown columns`, - descpb.TableDescriptor{ + {err: `row-level TTL expiration expression "missing_col" refers to unknown columns`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -2143,8 +2144,8 @@ func TestValidateTableDesc(t *testing.T) { ExpirationExpr: catpb.Expression("missing_col"), }, }}, - {`"ttl_expire_after" and/or "ttl_expiration_expression" must be set`, - descpb.TableDescriptor{ + {err: `"ttl_expire_after" and/or "ttl_expiration_expression" must be set`, + desc: descpb.TableDescriptor{ ID: 2, 
ParentID: 1, Name: "foo", @@ -2174,8 +2175,8 @@ func TestValidateTableDesc(t *testing.T) { SelectBatchSize: 5, }, }}, - {`expected column crdb_internal_expiration: column "crdb_internal_expiration" does not exist`, - descpb.TableDescriptor{ + {err: `expected column crdb_internal_expiration: column "crdb_internal_expiration" does not exist`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -2205,8 +2206,8 @@ func TestValidateTableDesc(t *testing.T) { DurationExpr: catpb.Expression("INTERVAL '2 minutes'"), }, }}, - {`expected DEFAULT expression of crdb_internal_expiration to be current_timestamp():::TIMESTAMPTZ + INTERVAL '2 minutes'`, - descpb.TableDescriptor{ + {err: `expected DEFAULT expression of crdb_internal_expiration to be current_timestamp():::TIMESTAMPTZ + INTERVAL '2 minutes'`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -2242,8 +2243,8 @@ func TestValidateTableDesc(t *testing.T) { DurationExpr: catpb.Expression("INTERVAL '2 minutes'"), }, }}, - {`expected ON UPDATE expression of crdb_internal_expiration to be current_timestamp():::TIMESTAMPTZ + INTERVAL '2 minutes'`, - descpb.TableDescriptor{ + {err: `expected ON UPDATE expression of crdb_internal_expiration to be current_timestamp():::TIMESTAMPTZ + INTERVAL '2 minutes'`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -2279,8 +2280,8 @@ func TestValidateTableDesc(t *testing.T) { DurationExpr: catpb.Expression("INTERVAL '2 minutes'"), }, }}, - {`"ttl_select_batch_size" must be at least 1`, - descpb.TableDescriptor{ + {err: `"ttl_select_batch_size" must be at least 1`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -2318,8 +2319,8 @@ func TestValidateTableDesc(t *testing.T) { SelectBatchSize: -2, }, }}, - {`unimplemented: non-ascending ordering on PRIMARY KEYs are not supported with row-level TTL`, - descpb.TableDescriptor{ + {err: `unimplemented: non-ascending ordering on PRIMARY KEYs are not supported with row-level TTL`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -2355,9 +2356,10 @@ func TestValidateTableDesc(t *testing.T) { RowLevelTTL: &catpb.RowLevelTTL{ DurationExpr: catpb.Expression("INTERVAL '2 minutes'"), }, - }}, - {`unknown mutation ID 123 associated with job ID 456`, - descpb.TableDescriptor{ + }, + version: clusterversion.V22_2}, + {err: `unknown mutation ID 123 associated with job ID 456`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -2377,8 +2379,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 1, }}, - {`two job IDs 12345 and 45678 mapped to the same mutation ID 1`, - descpb.TableDescriptor{ + {err: `two job IDs 12345 and 45678 mapped to the same mutation ID 1`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -2419,8 +2421,8 @@ func TestValidateTableDesc(t *testing.T) { NextColumnID: 2, NextFamilyID: 1, }}, - {`invisibility is incompatible with value for not_visible`, - descpb.TableDescriptor{ + {err: `invisibility is incompatible with value for not_visible`, + desc: descpb.TableDescriptor{ ID: 2, ParentID: 1, Name: "foo", @@ -2456,7 +2458,14 @@ func TestValidateTableDesc(t *testing.T) { d.desc.Privileges = catpb.NewBasePrivilegeDescriptor(username.RootUserName()) desc := NewBuilder(&d.desc).BuildImmutableTable() expectedErr := fmt.Sprintf("%s %q (%d): %s", desc.DescriptorType(), desc.GetName(), desc.GetID(), d.err) - err := validate.Self(clusterversion.TestingClusterVersion, desc) + clusterVersion := 
clusterversion.TestingClusterVersion + version := d.version + if version != 0 { + clusterVersion = clusterversion.ClusterVersion{ + Version: clusterversion.ByKey(version), + } + } + err := validate.Self(clusterVersion, desc) if d.err == "" && err != nil { t.Errorf("%d: expected success, but found error: \"%+v\"", i, err) } else if d.err != "" && err == nil { diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index 104d243dffce..c3f7aa472492 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -1474,14 +1474,14 @@ func NewTableDesc( for _, def := range n.Defs { switch def := def.(type) { case *tree.ColumnTableDef: - if def.Name == colinfo.TTLDefaultExpirationColumnName { + if def.Name == catpb.TTLDefaultExpirationColumnName { // If we find the column, make sure it has the expected type. if def.Type.SQLString() != types.TimestampTZ.SQLString() { return nil, pgerror.Newf( pgcode.InvalidTableDefinition, `table %s has TTL defined, but column %s is not a %s`, def.Name, - colinfo.TTLDefaultExpirationColumnName, + catpb.TTLDefaultExpirationColumnName, types.TimestampTZ.SQLString(), ) } @@ -2437,7 +2437,7 @@ func CreateRowLevelTTLScheduledJob( func rowLevelTTLAutomaticColumnDef(ttl *catpb.RowLevelTTL) (*tree.ColumnTableDef, error) { def := &tree.ColumnTableDef{ - Name: colinfo.TTLDefaultExpirationColumnName, + Name: catpb.TTLDefaultExpirationColumnName, Type: types.TimestampTZ, Hidden: true, } diff --git a/pkg/sql/flowinfra/stream_decoder.go b/pkg/sql/flowinfra/stream_decoder.go index 6aeabecc732c..0850791de5f5 100644 --- a/pkg/sql/flowinfra/stream_decoder.go +++ b/pkg/sql/flowinfra/stream_decoder.go @@ -45,8 +45,7 @@ import ( // AddMessage can be called multiple times before getting the rows, but this // will cause data to accumulate internally. type StreamDecoder struct { - types []*types.T - // TODO(yuzefovich): move catenumpb.DatumEncoding into execinfrapb. 
+ types []*types.T encoding []catenumpb.DatumEncoding data []byte numEmptyRows int diff --git a/pkg/sql/logictest/testdata/logic_test/row_level_ttl b/pkg/sql/logictest/testdata/logic_test/row_level_ttl index b54f0c63d7e2..22c97d0545af 100644 --- a/pkg/sql/logictest/testdata/logic_test/row_level_ttl +++ b/pkg/sql/logictest/testdata/logic_test/row_level_ttl @@ -961,9 +961,15 @@ subtest end subtest desc_pk_with_ttl +# TODO(ecwall): remove local-mixed-22.2-23.1 variant in 24.1 +onlyif config local-mixed-22.2-23.1 statement error non-ascending ordering on PRIMARY KEYs are not supported CREATE TABLE tbl_desc_pk_with_ttl (id INT, id2 INT, PRIMARY KEY (id, id2 DESC)) WITH (ttl_expire_after = '10 minutes') +skipif config local-mixed-22.2-23.1 +statement ok +CREATE TABLE tbl_desc_pk_with_ttl (id INT, id2 INT, PRIMARY KEY (id, id2 DESC)) WITH (ttl_expire_after = '10 minutes') + subtest end subtest desc_pk_without_ttl_add_ttl @@ -971,9 +977,15 @@ subtest desc_pk_without_ttl_add_ttl statement ok CREATE TABLE tbl_desc_pk_without_ttl_add_ttl (id INT, id2 INT, PRIMARY KEY (id, id2 DESC)) +# TODO(ecwall): remove local-mixed-22.2-23.1 variant in 24.1 +onlyif config local-mixed-22.2-23.1 statement error non-ascending ordering on PRIMARY KEYs are not supported ALTER TABLE tbl_desc_pk_without_ttl_add_ttl SET (ttl_expire_after = '10 minutes') +skipif config local-mixed-22.2-23.1 +statement ok +ALTER TABLE tbl_desc_pk_without_ttl_add_ttl SET (ttl_expire_after = '10 minutes') + subtest end subtest asc_pk_alter_desc_pk @@ -981,9 +993,15 @@ subtest asc_pk_alter_desc_pk statement ok CREATE TABLE tbl_asc_pk_alter_desc_pk (id INT, id2 INT, PRIMARY KEY (id, id2)) WITH (ttl_expire_after = '10 minutes') +# TODO(ecwall): remove local-mixed-22.2-23.1 variant in 24.1 +onlyif config local-mixed-22.2-23.1 statement error non-ascending ordering on PRIMARY KEYs are not supported ALTER TABLE tbl_asc_pk_alter_desc_pk ALTER PRIMARY KEY USING COLUMNS (id, id2 DESC) +skipif config local-mixed-22.2-23.1 +statement ok +ALTER TABLE tbl_asc_pk_alter_desc_pk ALTER PRIMARY KEY USING COLUMNS (id, id2 DESC) + subtest end subtest create_table_no_ttl_set_ttl_expire_after diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_column_set_not_null.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_column_set_not_null.go index b3e9b3568ab5..67f848c22b96 100644 --- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_column_set_not_null.go +++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_column_set_not_null.go @@ -11,7 +11,7 @@ package scbuildstmt import ( - "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb" @@ -40,7 +40,7 @@ func alterColumnPreChecks(b BuildCtx, tn *tree.TableName, tbl *scpb.Table, colum scpb.ForEachRowLevelTTL(b.QueryByID(tbl.TableID), func( _ scpb.Status, _ scpb.TargetStatus, e *scpb.RowLevelTTL, ) { - if columnName == colinfo.TTLDefaultExpirationColumnName && e.HasDurationExpr() { + if columnName == catpb.TTLDefaultExpirationColumnName && e.HasDurationExpr() { panic(pgerror.Newf( pgcode.InvalidTableDefinition, `cannot alter column %s while ttl_expire_after is set`, diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_primary_key.go 
b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_primary_key.go
index 934ab56a7cec..72551dd341ea 100644
--- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_primary_key.go
+++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_alter_primary_key.go
@@ -418,9 +418,11 @@ func fallBackIfDescColInRowLevelTTLTables(b BuildCtx, tableID catid.DescID, t al
 	// It's a row-level-ttl table. Ensure it has no descending key columns,
 	// and that there are no inbound/outbound foreign keys.
-	for _, col := range t.Columns {
-		if indexColumnDirection(col.Direction) != catenumpb.IndexColumn_ASC {
-			panic(scerrors.NotImplementedErrorf(t.n, "non-ascending ordering on PRIMARY KEYs are not supported"))
+	if !b.ClusterSettings().Version.IsActive(b, clusterversion.V23_2TTLAllowDescPK) {
+		for _, col := range t.Columns {
+			if indexColumnDirection(col.Direction) != catenumpb.IndexColumn_ASC {
+				panic(scerrors.NotImplementedErrorf(t.n, "non-ascending ordering on PRIMARY KEYs are not supported"))
+			}
 		}
 	}
 }
diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_drop_column.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_drop_column.go
index c14a27f85939..6820f6a402c3 100644
--- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_drop_column.go
+++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/alter_table_drop_column.go
@@ -15,7 +15,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/parser"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
@@ -81,7 +81,7 @@ func checkRowLevelTTLColumn(
 	if rowLevelTTL == nil {
 		return
 	}
-	if rowLevelTTL.DurationExpr != "" && n.Column == colinfo.TTLDefaultExpirationColumnName {
+	if rowLevelTTL.DurationExpr != "" && n.Column == catpb.TTLDefaultExpirationColumnName {
 		panic(errors.WithHintf(
 			pgerror.Newf(
 				pgcode.InvalidTableDefinition,
diff --git a/pkg/sql/sqlstats/insights/detector.go b/pkg/sql/sqlstats/insights/detector.go
index c64bd1b3efec..67d28bc02ed4 100644
--- a/pkg/sql/sqlstats/insights/detector.go
+++ b/pkg/sql/sqlstats/insights/detector.go
@@ -81,8 +81,8 @@ func (d *anomalyDetector) isSlow(stmt *Statement) (decision bool) {
 	d.withFingerprintLatencySummary(stmt, func(latencySummary *quantile.Stream) {
 		latencySummary.Insert(stmt.LatencyInSeconds)
-		p50 := latencySummary.Query(0.5)
-		p99 := latencySummary.Query(0.99)
+		p50 := latencySummary.Query(0.5, true)
+		p99 := latencySummary.Query(0.99, true)
 		decision = stmt.LatencyInSeconds >= p99 &&
 			stmt.LatencyInSeconds >= 2*p50 &&
 			stmt.LatencyInSeconds >= AnomalyDetectionLatencyThreshold.Get(&d.settings.SV).Seconds()
@@ -91,7 +91,9 @@ func (d *anomalyDetector) isSlow(stmt *Statement) (decision bool) {
 	return
 }

-func (d *anomalyDetector) GetPercentileValues(id appstatspb.StmtFingerprintID) PercentileValues {
+func (d *anomalyDetector) GetPercentileValues(
+	id appstatspb.StmtFingerprintID, shouldFlush bool,
+) PercentileValues {
+	// latencySummary.Query might modify its own state (Stream.flush), so a read-write lock is necessary.
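+	// Callers pass shouldFlush=false when a recent Query(..., true) call (e.g.
+	// isSlow above) has already flushed the stream; see ss_mem_writer.go below.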
 	d.mu.Lock()
 	defer d.mu.Unlock()
@@ -100,9 +102,9 @@ func (d *anomalyDetector) GetPercentileValues(id appstatspb.StmtFingerprintID) P
 		latencySummary := entry.Value.(latencySummaryEntry).value
 		// If more percentiles are added, update the value of `desiredQuantiles` above
 		// to include the new keys.
-		latencies.P50 = latencySummary.Query(0.5)
-		latencies.P90 = latencySummary.Query(0.9)
-		latencies.P99 = latencySummary.Query(0.99)
+		latencies.P50 = latencySummary.Query(0.5, shouldFlush)
+		latencies.P90 = latencySummary.Query(0.9, shouldFlush)
+		latencies.P99 = latencySummary.Query(0.99, shouldFlush)
 	}
 	return latencies
 }
diff --git a/pkg/sql/sqlstats/insights/insights.go b/pkg/sql/sqlstats/insights/insights.go
index 3d047468ef56..c316ccc5b90b 100644
--- a/pkg/sql/sqlstats/insights/insights.go
+++ b/pkg/sql/sqlstats/insights/insights.go
@@ -155,7 +155,7 @@ type Reader interface {
 }

 type LatencyInformation interface {
-	GetPercentileValues(fingerprintID appstatspb.StmtFingerprintID) PercentileValues
+	GetPercentileValues(fingerprintID appstatspb.StmtFingerprintID, shouldFlush bool) PercentileValues
 }

 type PercentileValues struct {
diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
index 766fa64e6558..bd9da99381bc 100644
--- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
+++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go
@@ -144,7 +144,9 @@ func (s *Container) RecordStatement(
 	// Percentile latencies are only sampled if the latency was above the
 	// AnomalyDetectionLatencyThreshold.
-	latencies := s.latencyInformation.GetPercentileValues(stmtFingerprintID)
+	// The Insights detector already flushes when checking for anomalous latency,
+	// so there is no need to force another flush when retrieving the data here.
+ latencies := s.latencyInformation.GetPercentileValues(stmtFingerprintID, false) latencyInfo := appstatspb.LatencyInfo{ Min: value.ServiceLatency, Max: value.ServiceLatency, diff --git a/pkg/sql/ttl/ttlbase/BUILD.bazel b/pkg/sql/ttl/ttlbase/BUILD.bazel index b3cb815e5a72..ee3ee6588bae 100644 --- a/pkg/sql/ttl/ttlbase/BUILD.bazel +++ b/pkg/sql/ttl/ttlbase/BUILD.bazel @@ -1,12 +1,31 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "ttlbase", - srcs = ["ttl_helpers.go"], + srcs = [ + "ttl_helpers.go", + "ttl_test_util.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase", visibility = ["//visibility:public"], - deps = ["//pkg/sql/lexbase"], + deps = [ + "//pkg/sql/catalog/catenumpb", + "//pkg/sql/catalog/catpb", + ], +) + +go_test( + name = "ttlbase_test", + srcs = ["ttl_helpers_test.go"], + args = ["-test.timeout=295s"], + embed = [":ttlbase"], + deps = [ + "//pkg/sql/catalog/catenumpb", + "//pkg/util/leaktest", + "//pkg/util/log", + "@com_github_stretchr_testify//require", + ], ) get_x_data(name = "get_x_data") diff --git a/pkg/sql/ttl/ttlbase/ttl_helpers.go b/pkg/sql/ttl/ttlbase/ttl_helpers.go index 55a340cb9190..d63642c031b2 100644 --- a/pkg/sql/ttl/ttlbase/ttl_helpers.go +++ b/pkg/sql/ttl/ttlbase/ttl_helpers.go @@ -12,42 +12,164 @@ package ttlbase import ( "bytes" + "strconv" "time" - "github.com/cockroachdb/cockroach/pkg/sql/lexbase" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" ) // DefaultAOSTDuration is the default duration to use in the AS OF SYSTEM TIME // clause used in the SELECT query. const DefaultAOSTDuration = -time.Second * 30 -// SelectTemplate is the format string used to build SELECT queries for the -// TTL job. -const SelectTemplate = `SELECT %[1]s FROM [%[2]d AS tbl_name] -AS OF SYSTEM TIME INTERVAL '%[3]d seconds' -WHERE %[4]s <= $1 -%[5]s%[6]s -ORDER BY %[1]s -LIMIT %[7]v` - -// DeleteTemplate is the format string used to build DELETE queries for the -// TTL job. 
-const DeleteTemplate = `DELETE FROM [%d AS tbl_name] -WHERE %s <= $1 -AND (%s) IN (%s)` - -// MakeColumnNamesSQL converts columns into an escape string -// for an order by clause, e.g.: -// -// {"a", "b"} => a, b -// {"escape-me", "b"} => "escape-me", b -func MakeColumnNamesSQL(columns []string) string { - var b bytes.Buffer - for i, pkColumn := range columns { +var startKeyCompareOps = map[catenumpb.IndexColumn_Direction]string{ + catenumpb.IndexColumn_ASC: ">", + catenumpb.IndexColumn_DESC: "<", +} +var endKeyCompareOps = map[catenumpb.IndexColumn_Direction]string{ + catenumpb.IndexColumn_ASC: "<", + catenumpb.IndexColumn_DESC: ">", +} + +func BuildSelectQuery( + relationName string, + pkColNames []string, + pkColDirs []catenumpb.IndexColumn_Direction, + aostDuration time.Duration, + ttlExpr catpb.Expression, + numStartQueryBounds, numEndQueryBounds int, + limit int64, + startIncl bool, +) string { + numPkCols := len(pkColNames) + if numPkCols == 0 { + panic("pkColNames is empty") + } + if numPkCols != len(pkColDirs) { + panic("different number of pkColNames and pkColDirs") + } + var buf bytes.Buffer + // SELECT + buf.WriteString("SELECT ") + for i := range pkColNames { + if i > 0 { + buf.WriteString(", ") + } + buf.WriteString(pkColNames[i]) + } + // FROM + buf.WriteString("\nFROM ") + buf.WriteString(relationName) + // AS OF SYSTEM TIME + buf.WriteString("\nAS OF SYSTEM TIME INTERVAL '") + buf.WriteString(strconv.Itoa(int(aostDuration.Milliseconds()) / 1000)) + buf.WriteString(" seconds'") + // WHERE + buf.WriteString("\nWHERE ((") + buf.WriteString(string(ttlExpr)) + buf.WriteString(") <= $1)") + writeBounds := func( + numQueryBounds int, + placeholderOffset int, + compareOps map[catenumpb.IndexColumn_Direction]string, + inclusive bool, + ) { + if numQueryBounds > 0 { + buf.WriteString("\nAND (") + for i := 0; i < numQueryBounds; i++ { + isLast := i == numQueryBounds-1 + buf.WriteString("\n (") + for j := 0; j < i; j++ { + buf.WriteString(pkColNames[j]) + buf.WriteString(" = $") + buf.WriteString(strconv.Itoa(j + placeholderOffset)) + buf.WriteString(" AND ") + } + buf.WriteString(pkColNames[i]) + buf.WriteString(" ") + buf.WriteString(compareOps[pkColDirs[i]]) + if isLast && inclusive { + buf.WriteString("=") + } + buf.WriteString(" $") + buf.WriteString(strconv.Itoa(i + placeholderOffset)) + buf.WriteString(")") + if !isLast { + buf.WriteString(" OR") + } + } + buf.WriteString("\n)") + } + } + const endPlaceholderOffset = 2 + writeBounds( + numStartQueryBounds, + endPlaceholderOffset+numEndQueryBounds, + startKeyCompareOps, + startIncl, + ) + writeBounds( + numEndQueryBounds, + endPlaceholderOffset, + endKeyCompareOps, + false, /*inclusive*/ + ) + + // ORDER BY + buf.WriteString("\nORDER BY ") + for i := range pkColNames { if i > 0 { - b.WriteString(", ") + buf.WriteString(", ") + } + buf.WriteString(pkColNames[i]) + buf.WriteString(" ") + buf.WriteString(pkColDirs[i].String()) + } + // LIMIT + buf.WriteString("\nLIMIT ") + buf.WriteString(strconv.Itoa(int(limit))) + return buf.String() +} + +func BuildDeleteQuery( + relationName string, pkColNames []string, ttlExpr catpb.Expression, numRows int, +) string { + if len(pkColNames) == 0 { + panic("pkColNames is empty") + } + var buf bytes.Buffer + // DELETE + buf.WriteString("DELETE FROM ") + buf.WriteString(relationName) + // WHERE + buf.WriteString("\nWHERE ((") + buf.WriteString(string(ttlExpr)) + buf.WriteString(") <= $1)") + if numRows > 0 { + buf.WriteString("\nAND (") + for i := range pkColNames { + if i > 0 { + 
buf.WriteString(", ") + } + buf.WriteString(pkColNames[i]) + } + buf.WriteString(") IN (") + for i := 0; i < numRows; i++ { + if i > 0 { + buf.WriteString(", ") + } + buf.WriteString("(") + for j := range pkColNames { + if j > 0 { + buf.WriteString(", ") + } + buf.WriteString("$") + buf.WriteString(strconv.Itoa(i*len(pkColNames) + j + 2)) + } + buf.WriteString(")") } - lexbase.EncodeRestrictedSQLIdent(&b, pkColumn, lexbase.EncNoFlags) + buf.WriteString(")") } - return b.String() + return buf.String() } diff --git a/pkg/sql/ttl/ttlbase/ttl_helpers_test.go b/pkg/sql/ttl/ttlbase/ttl_helpers_test.go new file mode 100644 index 000000000000..b1f1a689e23a --- /dev/null +++ b/pkg/sql/ttl/ttlbase/ttl_helpers_test.go @@ -0,0 +1,398 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package ttlbase + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +const ( + relationName = "relation_name" + ttlExpr = "expire_at" +) + +func TestBuildSelectQuery(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + desc string + pkColDirs []catenumpb.IndexColumn_Direction + numStartQueryBounds int + numEndQueryBounds int + startIncl bool + expectedQuery string + }{ + { + desc: "ASC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + }, + numStartQueryBounds: 1, + numEndQueryBounds: 1, + expectedQuery: `SELECT col0 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 > $3) +) +AND ( + (col0 < $2) +) +ORDER BY col0 ASC +LIMIT 2`, + }, + { + desc: "DESC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + }, + numStartQueryBounds: 1, + numEndQueryBounds: 1, + expectedQuery: `SELECT col0 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 < $3) +) +AND ( + (col0 > $2) +) +ORDER BY col0 DESC +LIMIT 2`, + }, + { + desc: "ASC empty", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + }, + numStartQueryBounds: 0, + numEndQueryBounds: 0, + expectedQuery: `SELECT col0 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +ORDER BY col0 ASC +LIMIT 2`, + }, + { + desc: "DESC empty", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + }, + numStartQueryBounds: 0, + numEndQueryBounds: 0, + startIncl: true, + expectedQuery: `SELECT col0 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +ORDER BY col0 DESC +LIMIT 2`, + }, + { + desc: "ASC startIncl", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + }, + numStartQueryBounds: 1, + numEndQueryBounds: 1, + startIncl: true, + expectedQuery: `SELECT col0 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 >= $3) +) +AND ( + (col0 < $2) +) +ORDER BY col0 ASC +LIMIT 2`, + }, + { + desc: "DESC startIncl", + pkColDirs: []catenumpb.IndexColumn_Direction{ + 
catenumpb.IndexColumn_DESC, + }, + numStartQueryBounds: 1, + numEndQueryBounds: 1, + startIncl: true, + expectedQuery: `SELECT col0 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 <= $3) +) +AND ( + (col0 > $2) +) +ORDER BY col0 DESC +LIMIT 2`, + }, + { + desc: "ASC ASC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + catenumpb.IndexColumn_ASC, + }, + numStartQueryBounds: 2, + numEndQueryBounds: 2, + expectedQuery: `SELECT col0, col1 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 > $4) OR + (col0 = $4 AND col1 > $5) +) +AND ( + (col0 < $2) OR + (col0 = $2 AND col1 < $3) +) +ORDER BY col0 ASC, col1 ASC +LIMIT 2`, + }, + { + desc: "ASC ASC partial start", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + catenumpb.IndexColumn_ASC, + }, + numStartQueryBounds: 1, + numEndQueryBounds: 2, + expectedQuery: `SELECT col0, col1 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 > $4) +) +AND ( + (col0 < $2) OR + (col0 = $2 AND col1 < $3) +) +ORDER BY col0 ASC, col1 ASC +LIMIT 2`, + }, + { + desc: "ASC ASC partial end", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + catenumpb.IndexColumn_ASC, + }, + numStartQueryBounds: 2, + numEndQueryBounds: 1, + expectedQuery: `SELECT col0, col1 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 > $3) OR + (col0 = $3 AND col1 > $4) +) +AND ( + (col0 < $2) +) +ORDER BY col0 ASC, col1 ASC +LIMIT 2`, + }, + { + desc: "DESC DESC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + catenumpb.IndexColumn_DESC, + }, + numStartQueryBounds: 2, + numEndQueryBounds: 2, + expectedQuery: `SELECT col0, col1 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 < $4) OR + (col0 = $4 AND col1 < $5) +) +AND ( + (col0 > $2) OR + (col0 = $2 AND col1 > $3) +) +ORDER BY col0 DESC, col1 DESC +LIMIT 2`, + }, + { + desc: "DESC DESC partial start", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + catenumpb.IndexColumn_DESC, + }, + numStartQueryBounds: 1, + numEndQueryBounds: 2, + expectedQuery: `SELECT col0, col1 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 < $4) +) +AND ( + (col0 > $2) OR + (col0 = $2 AND col1 > $3) +) +ORDER BY col0 DESC, col1 DESC +LIMIT 2`, + }, + { + desc: "DESC DESC partial end", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + catenumpb.IndexColumn_DESC, + }, + numStartQueryBounds: 2, + numEndQueryBounds: 1, + expectedQuery: `SELECT col0, col1 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 < $3) OR + (col0 = $3 AND col1 < $4) +) +AND ( + (col0 > $2) +) +ORDER BY col0 DESC, col1 DESC +LIMIT 2`, + }, + { + desc: "ASC DESC ASC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + catenumpb.IndexColumn_DESC, + catenumpb.IndexColumn_ASC, + }, + numStartQueryBounds: 3, + numEndQueryBounds: 3, + expectedQuery: `SELECT col0, col1, col2 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 > $5) OR + (col0 = $5 AND col1 < $6) OR + (col0 = $5 AND col1 = $6 AND col2 > $7) +) +AND ( + (col0 < $2) OR + (col0 = $2 AND col1 > $3) OR + (col0 = $2 
AND col1 = $3 AND col2 < $4) +) +ORDER BY col0 ASC, col1 DESC, col2 ASC +LIMIT 2`, + }, + { + desc: "DESC ASC DESC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + catenumpb.IndexColumn_ASC, + catenumpb.IndexColumn_DESC, + }, + numStartQueryBounds: 3, + numEndQueryBounds: 3, + expectedQuery: `SELECT col0, col1, col2 +FROM relation_name +AS OF SYSTEM TIME INTERVAL '-30 seconds' +WHERE ((expire_at) <= $1) +AND ( + (col0 < $5) OR + (col0 = $5 AND col1 > $6) OR + (col0 = $5 AND col1 = $6 AND col2 < $7) +) +AND ( + (col0 > $2) OR + (col0 = $2 AND col1 < $3) OR + (col0 = $2 AND col1 = $3 AND col2 > $4) +) +ORDER BY col0 DESC, col1 ASC, col2 DESC +LIMIT 2`, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + pkColDirs := tc.pkColDirs + pkColNames := GenPKColNames(len(pkColDirs)) + actualQuery := BuildSelectQuery( + relationName, + pkColNames, + pkColDirs, + DefaultAOSTDuration, + ttlExpr, + tc.numStartQueryBounds, + tc.numEndQueryBounds, + 2, /*limit*/ + tc.startIncl, + ) + require.Equal(t, tc.expectedQuery, actualQuery) + }) + } +} + +func TestBuildDeleteQuery(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + desc string + numPKCols int + numRows int + expectedQuery string + }{ + { + desc: "1 PK col - 1 row", + numPKCols: 1, + numRows: 1, + expectedQuery: `DELETE FROM relation_name +WHERE ((expire_at) <= $1) +AND (col0) IN (($2))`, + }, + { + desc: "3 PK cols - 3 rows", + numPKCols: 3, + numRows: 3, + expectedQuery: `DELETE FROM relation_name +WHERE ((expire_at) <= $1) +AND (col0, col1, col2) IN (($2, $3, $4), ($5, $6, $7), ($8, $9, $10))`, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + pkColNames := GenPKColNames(tc.numPKCols) + actualQuery := BuildDeleteQuery( + relationName, + pkColNames, + ttlExpr, + tc.numRows, + ) + require.Equal(t, tc.expectedQuery, actualQuery) + }) + } +} diff --git a/pkg/sql/catalog/catpb/ttl.go b/pkg/sql/ttl/ttlbase/ttl_test_util.go similarity index 51% rename from pkg/sql/catalog/catpb/ttl.go rename to pkg/sql/ttl/ttlbase/ttl_test_util.go index fe2b20736cce..e851e9e281b5 100644 --- a/pkg/sql/catalog/catpb/ttl.go +++ b/pkg/sql/ttl/ttlbase/ttl_test_util.go @@ -1,4 +1,4 @@ -// Copyright 2022 The Cockroach Authors. +// Copyright 2023 The Cockroach Authors. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt. @@ -8,12 +8,15 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package catpb +package ttlbase -// DeletionCronOrDefault returns the DeletionCron or the global default. -func (m *RowLevelTTL) DeletionCronOrDefault() string { - if override := m.DeletionCron; override != "" { - return override +import "fmt" + +// GenPKColNames generates column names col0, col1, col2, etc for tests. 
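+// For example:
+//
+//	GenPKColNames(3) // returns []string{"col0", "col1", "col2"}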
+func GenPKColNames(numPKCols int) []string { + names := make([]string, 0, numPKCols) + for i := 0; i < numPKCols; i++ { + names = append(names, fmt.Sprintf("col%d", i)) } - return "@hourly" + return names } diff --git a/pkg/sql/ttl/ttljob/BUILD.bazel b/pkg/sql/ttl/ttljob/BUILD.bazel index d6fa1ca94143..ea7eed1717a2 100644 --- a/pkg/sql/ttl/ttljob/BUILD.bazel +++ b/pkg/sql/ttl/ttljob/BUILD.bazel @@ -25,12 +25,11 @@ go_library( "//pkg/sql/catalog", "//pkg/sql/catalog/catenumpb", "//pkg/sql/catalog/catpb", - "//pkg/sql/catalog/colinfo", - "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/isql", + "//pkg/sql/lexbase", "//pkg/sql/physicalplan", "//pkg/sql/rowenc", "//pkg/sql/rowexec", @@ -61,8 +60,8 @@ go_test( "ttljob_test.go", ], args = ["-test.timeout=295s"], - embed = [":ttljob"], deps = [ + ":ttljob", "//pkg/base", "//pkg/ccl/kvccl/kvtenantccl", "//pkg/jobs", @@ -77,8 +76,10 @@ go_test( "//pkg/server", "//pkg/sql", "//pkg/sql/catalog", + "//pkg/sql/catalog/catenumpb", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/desctestutils", + "//pkg/sql/isql", "//pkg/sql/lexbase", "//pkg/sql/parser", "//pkg/sql/randgen", diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index 6ff4642c350a..2a7d0423fdfc 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/isql" @@ -150,10 +149,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err return err } - ttlExpr := colinfo.DefaultTTLExpirationExpr - if rowLevelTTL.HasExpirationExpr() { - ttlExpr = "(" + rowLevelTTL.ExpirationExpr + ")" - } + ttlExpr := rowLevelTTL.GetTTLExpr() labelMetrics := rowLevelTTL.LabelMetrics group := ctxgroup.WithContext(ctx) diff --git a/pkg/sql/ttl/ttljob/ttljob_metrics.go b/pkg/sql/ttl/ttljob/ttljob_metrics.go index ae526f574226..7670f44a28f6 100644 --- a/pkg/sql/ttl/ttljob/ttljob_metrics.go +++ b/pkg/sql/ttl/ttljob/ttljob_metrics.go @@ -212,7 +212,7 @@ func (m *rowLevelTTLMetrics) fetchStatistics( }, { opName: fmt.Sprintf("ttl num expired rows stats %s", relationName), - query: `SELECT count(1) FROM [%d AS t] AS OF SYSTEM TIME %s WHERE ` + string(ttlExpr) + ` < $1`, + query: `SELECT count(1) FROM [%d AS t] AS OF SYSTEM TIME %s WHERE (` + string(ttlExpr) + `) < $1`, args: []interface{}{details.Cutoff}, gauge: m.TotalExpiredRows, }, diff --git a/pkg/sql/ttl/ttljob/ttljob_processor.go b/pkg/sql/ttl/ttljob/ttljob_processor.go index a460ef93f8b9..949ede9daeb2 100644 --- a/pkg/sql/ttl/ttljob/ttljob_processor.go +++ b/pkg/sql/ttl/ttljob/ttljob_processor.go @@ -11,6 +11,7 @@ package ttljob import ( + "bytes" "context" "runtime" "sync/atomic" @@ -24,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/lexbase" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -56,6 +58,7 @@ func (t *ttlProcessor) work(ctx context.Context) error { descsCol 
:= flowCtx.Descriptors codec := serverCfg.Codec details := ttlSpec.RowLevelTTLDetails + tableID := details.TableID deleteRateLimit := ttlSpec.DeleteRateLimit deleteRateLimiter := quotapool.NewRateLimiter( @@ -66,28 +69,36 @@ func (t *ttlProcessor) work(ctx context.Context) error { processorRowCount := int64(0) - var relationName string - var pkColumns []string - var pkTypes []*types.T - var colDirs []catenumpb.IndexColumn_Direction - var labelMetrics bool + var ( + relationName string + pkColNames []string + pkColTypes []*types.T + pkColDirs []catenumpb.IndexColumn_Direction + labelMetrics bool + ) if err := serverCfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - desc, err := descsCol.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, details.TableID) + desc, err := descsCol.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, tableID) if err != nil { return err } + var buf bytes.Buffer primaryIndexDesc := desc.GetPrimaryIndex().IndexDesc() - pkColumns = primaryIndexDesc.KeyColumnNames - pkTypes = make([]*types.T, 0, len(primaryIndexDesc.KeyColumnIDs)) + pkColNames = make([]string, 0, len(primaryIndexDesc.KeyColumnNames)) + for _, name := range primaryIndexDesc.KeyColumnNames { + lexbase.EncodeRestrictedSQLIdent(&buf, name, lexbase.EncNoFlags) + pkColNames = append(pkColNames, buf.String()) + buf.Reset() + } + pkColTypes = make([]*types.T, 0, len(primaryIndexDesc.KeyColumnIDs)) for _, id := range primaryIndexDesc.KeyColumnIDs { col, err := catalog.MustFindColumnByID(desc, id) if err != nil { return err } - pkTypes = append(pkTypes, col.GetType()) + pkColTypes = append(pkColTypes, col.GetType()) } - colDirs = primaryIndexDesc.KeyColumnDirections + pkColDirs = primaryIndexDesc.KeyColumnDirections if !desc.HasRowLevelTTL() { return errors.Newf("unable to find TTL on table %s", desc.GetName()) @@ -120,7 +131,7 @@ func (t *ttlProcessor) work(ctx context.Context) error { processorConcurrency = processorSpanCount } err := func() error { - spanChan := make(chan spanToProcess, processorConcurrency) + spanChan := make(chan QueryBounds, processorConcurrency) defer close(spanChan) for i := int64(0); i < processorConcurrency; i++ { group.GoCtx(func(ctx context.Context) error { @@ -130,7 +141,8 @@ func (t *ttlProcessor) work(ctx context.Context) error { ctx, metrics, spanToProcess, - pkColumns, + pkColNames, + pkColDirs, relationName, deleteRateLimiter, ) @@ -153,18 +165,18 @@ func (t *ttlProcessor) work(ctx context.Context) error { var alloc tree.DatumAlloc for _, span := range ttlSpec.Spans { startKey := span.Key - startPK, err := rowenc.DecodeIndexKeyToDatums(codec, pkTypes, colDirs, startKey, &alloc) + startPK, err := rowenc.DecodeIndexKeyToDatums(codec, pkColTypes, pkColDirs, startKey, &alloc) if err != nil { - return errors.Wrapf(err, "decode startKey error pkTypes=%s colDirs=%s key=%x", pkTypes, colDirs, []byte(startKey)) + return errors.Wrapf(err, "decode startKey error key=%x", []byte(startKey)) } endKey := span.EndKey - endPK, err := rowenc.DecodeIndexKeyToDatums(codec, pkTypes, colDirs, endKey, &alloc) + endPK, err := rowenc.DecodeIndexKeyToDatums(codec, pkColTypes, pkColDirs, endKey, &alloc) if err != nil { - return errors.Wrapf(err, "decode endKey error pkTypes=%s colDirs=%s key=%x", pkTypes, colDirs, []byte(startKey)) + return errors.Wrapf(err, "decode endKey error key=%x", []byte(endKey)) } - spanChan <- spanToProcess{ - startPK: startPK, - endPK: endPK, + spanChan <- QueryBounds{ + Start: startPK, + End: endPK, } } return nil @@ -201,7 +213,7 @@ func (t 
*ttlProcessor) work(ctx context.Context) error { ctx, 2, /* level */ "TTL processorRowCount updated jobID=%d processorID=%d sqlInstanceID=%d tableID=%d jobRowCount=%d processorRowCount=%d", - jobID, processorID, sqlInstanceID, details.TableID, rowLevelTTL.JobRowCount, processorRowCount, + jobID, processorID, sqlInstanceID, tableID, rowLevelTTL.JobRowCount, processorRowCount, ) return nil }, @@ -212,8 +224,9 @@ func (t *ttlProcessor) work(ctx context.Context) error { func (t *ttlProcessor) runTTLOnSpan( ctx context.Context, metrics rowLevelTTLMetrics, - spanToProcess spanToProcess, - pkColumns []string, + bounds QueryBounds, + pkColNames []string, + pkColDirs []catenumpb.IndexColumn_Direction, relationName string, deleteRateLimiter *quotapool.RateLimiter, ) (spanRowCount int64, err error) { @@ -224,7 +237,6 @@ func (t *ttlProcessor) runTTLOnSpan( ttlSpec := t.ttlSpec details := ttlSpec.RowLevelTTLDetails - tableID := details.TableID cutoff := details.Cutoff ttlExpr := ttlSpec.TTLExpr flowCtx := t.FlowCtx @@ -243,21 +255,20 @@ func (t *ttlProcessor) runTTLOnSpan( } } - selectBuilder := makeSelectQueryBuilder( - tableID, + selectBuilder := MakeSelectQueryBuilder( cutoff, - pkColumns, + pkColNames, + pkColDirs, relationName, - spanToProcess, + bounds, aostDuration, selectBatchSize, ttlExpr, ) deleteBatchSize := ttlSpec.DeleteBatchSize - deleteBuilder := makeDeleteQueryBuilder( - tableID, + deleteBuilder := MakeDeleteQueryBuilder( cutoff, - pkColumns, + pkColNames, relationName, deleteBatchSize, ttlExpr, @@ -286,7 +297,7 @@ func (t *ttlProcessor) runTTLOnSpan( // Step 1. Fetch some rows we want to delete using a historical // SELECT query. start := timeutil.Now() - expiredRowsPKs, err := selectBuilder.run(ctx, ie) + expiredRowsPKs, hasNext, err := selectBuilder.Run(ctx, ie) metrics.SelectDuration.RecordValue(int64(timeutil.Since(start))) if err != nil { return spanRowCount, errors.Wrapf(err, "error selecting rows to delete") @@ -321,7 +332,7 @@ func (t *ttlProcessor) runTTLOnSpan( defer tokens.Consume() start := timeutil.Now() - batchRowCount, err := deleteBuilder.run(ctx, txn, deleteBatch) + batchRowCount, err := deleteBuilder.Run(ctx, txn, deleteBatch) if err != nil { return err } @@ -342,7 +353,7 @@ func (t *ttlProcessor) runTTLOnSpan( // If we selected less than the select batch size, we have selected every // row and so we end it here. - if numExpiredRows < selectBatchSize { + if !hasNext { break } } diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder.go b/pkg/sql/ttl/ttljob/ttljob_query_builder.go index e09943d689e1..fb9808ef446b 100644 --- a/pkg/sql/ttl/ttljob/ttljob_query_builder.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder.go @@ -16,8 +16,8 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -26,13 +26,18 @@ import ( "github.com/cockroachdb/errors" ) -// selectQueryBuilder is responsible for maintaining state around the -// SELECT portion of the TTL job. -type selectQueryBuilder struct { - tableID descpb.ID - pkColumns []string +type QueryBounds struct { + Start, End tree.Datums +} + +// SelectQueryBuilder is responsible for maintaining state around the SELECT +// portion of the TTL job. 
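+// A minimal usage sketch (hedged; it mirrors how runTTLOnSpan drives the
+// builder, with deleteExpired standing in for the DELETE loop, which is a
+// hypothetical helper and not part of this change):
+//
+//	b := MakeSelectQueryBuilder(cutoff, pkColNames, pkColDirs, relationName,
+//		bounds, aostDuration, selectBatchSize, ttlExpr)
+//	for {
+//		rows, hasNext, err := b.Run(ctx, ie)
+//		if err != nil {
+//			return err
+//		}
+//		deleteExpired(rows)
+//		if !hasNext {
+//			break
+//		}
+//	}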
+type SelectQueryBuilder struct { + relationName string + pkColNames []string + pkColDirs []catenumpb.IndexColumn_Direction selectOpName string - spanToProcess spanToProcess + bounds QueryBounds selectBatchSize int64 aostDuration time.Duration ttlExpr catpb.Expression @@ -45,134 +50,87 @@ type selectQueryBuilder struct { // cachedArgs keeps a cache of args to use in the run query. // The cache is of the form [cutoff, <end key values...>, <start key values...>]. cachedArgs []interface{} - // pkColumnNamesSQL caches the column names of the PK. - pkColumnNamesSQL string - // endPKColumnNamesSQL caches the column names of the ending PK. - endPKColumnNamesSQL string } -type spanToProcess struct { - startPK, endPK tree.Datums -} - -func makeSelectQueryBuilder( - tableID descpb.ID, +func MakeSelectQueryBuilder( cutoff time.Time, - pkColumns []string, + pkColNames []string, + pkColDirs []catenumpb.IndexColumn_Direction, relationName string, - spanToProcess spanToProcess, + bounds QueryBounds, aostDuration time.Duration, selectBatchSize int64, ttlExpr catpb.Expression, -) selectQueryBuilder { - // We will have a maximum of 1 + len(pkColumns)*2 columns, where one - // is reserved for AOST, and len(pkColumns) for both start and end key. - cachedArgs := make([]interface{}, 0, 1+len(pkColumns)*2) +) SelectQueryBuilder { + numPkCols := len(pkColNames) + if numPkCols == 0 { + panic("pkColNames is empty") + } + if numPkCols != len(pkColDirs) { + panic("different number of pkColNames and pkColDirs") + } + // We will have a maximum of 1 + len(pkColNames)*2 query arguments, where one + // is reserved for the cutoff, and up to len(pkColNames) each for the start + // and end keys. + cachedArgs := make([]interface{}, 0, 1+numPkCols*2) cachedArgs = append(cachedArgs, cutoff) - endPK := spanToProcess.endPK + endPK := bounds.End for _, d := range endPK { cachedArgs = append(cachedArgs, d) } - startPK := spanToProcess.startPK + startPK := bounds.Start for _, d := range startPK { cachedArgs = append(cachedArgs, d) } - return selectQueryBuilder{ - tableID: tableID, - pkColumns: pkColumns, + return SelectQueryBuilder{ + relationName: relationName, + pkColNames: pkColNames, + pkColDirs: pkColDirs, selectOpName: fmt.Sprintf("ttl select %s", relationName), - spanToProcess: spanToProcess, + bounds: bounds, aostDuration: aostDuration, selectBatchSize: selectBatchSize, ttlExpr: ttlExpr, - cachedArgs: cachedArgs, - isFirst: true, - pkColumnNamesSQL: ttlbase.MakeColumnNamesSQL(pkColumns), - endPKColumnNamesSQL: ttlbase.MakeColumnNamesSQL(pkColumns[:len(endPK)]), + cachedArgs: cachedArgs, + isFirst: true, } } -func (b *selectQueryBuilder) buildQuery() string { - // Generate the end key clause for SELECT, which always stays the same. - // Start from $2 as $1 is for the now clause. - // The end key of a span is exclusive, so use <. - var endFilterClause string - endPK := b.spanToProcess.endPK - if len(endPK) > 0 { - endFilterClause = fmt.Sprintf(" AND (%s) < (", b.endPKColumnNamesSQL) - for i := range endPK { - if i > 0 { - endFilterClause += ", " - } - endFilterClause += fmt.Sprintf("$%d", i+2) - } - endFilterClause += ")" - } - - startPK := b.spanToProcess.startPK - var filterClause string - if !b.isFirst { - // After the first query, we always want (col1, ...) > (cursor_col_1, ...) - filterClause = fmt.Sprintf("AND (%s) > (", b.pkColumnNamesSQL) - for i := range b.pkColumns { - if i > 0 { - filterClause += ", " - } - // We start from 2 if we don't have an endPK clause, but add len(b.endPK) - // if there is.
- filterClause += fmt.Sprintf("$%d", 2+len(endPK)+i) - } - filterClause += ")" - } else if len(startPK) > 0 { - // For the the first query, we want (col1, ...) >= (cursor_col_1, ...) - filterClause = fmt.Sprintf("AND (%s) >= (", ttlbase.MakeColumnNamesSQL(b.pkColumns[:len(startPK)])) - for i := range startPK { - if i > 0 { - filterClause += ", " - } - // We start from 2 if we don't have an endPK clause, but add len(b.endPK) - // if there is. - filterClause += fmt.Sprintf("$%d", 2+len(endPK)+i) - } - filterClause += ")" - } - - return fmt.Sprintf( - ttlbase.SelectTemplate, - b.pkColumnNamesSQL, - b.tableID, - int64(b.aostDuration.Seconds()), +func (b *SelectQueryBuilder) buildQuery() string { + return ttlbase.BuildSelectQuery( + b.relationName, + b.pkColNames, + b.pkColDirs, + b.aostDuration, b.ttlExpr, - filterClause, - endFilterClause, + len(b.bounds.Start), + len(b.bounds.End), b.selectBatchSize, + b.isFirst, ) } -func (b *selectQueryBuilder) nextQuery() (string, []interface{}) { +var qosLevel = sessiondatapb.TTLLow + +func (b *SelectQueryBuilder) Run( + ctx context.Context, ie isql.Executor, +) (_ []tree.Datums, hasNext bool, _ error) { + var query string if b.isFirst { - q := b.buildQuery() + query = b.buildQuery() b.isFirst = false - return q, b.cachedArgs - } - // All subsequent query strings are the same. - // Populate the cache once, and then maintain it for all subsequent calls. - if b.cachedQuery == "" { - b.cachedQuery = b.buildQuery() + } else { + if b.cachedQuery == "" { + b.cachedQuery = b.buildQuery() + } + query = b.cachedQuery } - return b.cachedQuery, b.cachedArgs -} - -func (b *selectQueryBuilder) run(ctx context.Context, ie isql.Executor) ([]tree.Datums, error) { - q, args := b.nextQuery() // Use a nil txn so that the AOST clause is handled correctly. Currently, // the internal executor will treat a passed-in txn as an explicit txn, so // the AOST clause on the SELECT query would not be interpreted correctly. - qosLevel := sessiondatapb.TTLLow - ret, err := ie.QueryBufferedEx( + rows, err := ie.QueryBufferedEx( ctx, b.selectOpName, nil, /* txn */ @@ -180,38 +138,35 @@ func (b *selectQueryBuilder) run(ctx context.Context, ie isql.Executor) ([]tree. User: username.RootUserName(), QualityOfService: &qosLevel, }, - q, - args..., + query, + b.cachedArgs..., ) if err != nil { - return nil, err + return nil, false, err } - if err := b.moveCursor(ret); err != nil { - return nil, err - } - return ret, nil -} -func (b *selectQueryBuilder) moveCursor(rows []tree.Datums) error { - // Move the cursor forward. - if len(rows) > 0 { - lastRow := rows[len(rows)-1] - b.cachedArgs = b.cachedArgs[:1+len(b.spanToProcess.endPK)] - if len(lastRow) != len(b.pkColumns) { - return errors.AssertionFailedf("expected %d columns for last row, got %d", len(b.pkColumns), len(lastRow)) + numRows := int64(len(rows)) + if numRows > 0 { + // Move the cursor forward if SELECT returns rows. + lastRow := rows[numRows-1] + if len(lastRow) != len(b.pkColNames) { + return nil, false, errors.AssertionFailedf("expected %d columns for last row, got %d", len(b.pkColNames), len(lastRow)) } + b.cachedArgs = b.cachedArgs[:len(b.cachedArgs)-len(b.bounds.Start)] for _, d := range lastRow { b.cachedArgs = append(b.cachedArgs, d) } + b.bounds.Start = lastRow } - return nil + + return rows, numRows == b.selectBatchSize, nil } -// deleteQueryBuilder is responsible for maintaining state around the -// SELECT portion of the TTL job. 
-type deleteQueryBuilder struct { - tableID descpb.ID - pkColumns []string +// DeleteQueryBuilder is responsible for maintaining state around the DELETE +// portion of the TTL job. +type DeleteQueryBuilder struct { + relationName string + pkColNames []string deleteBatchSize int64 deleteOpName string ttlExpr catpb.Expression @@ -224,20 +179,22 @@ type deleteQueryBuilder struct { cachedArgs []interface{} } -func makeDeleteQueryBuilder( - tableID descpb.ID, +func MakeDeleteQueryBuilder( cutoff time.Time, - pkColumns []string, + pkColNames []string, relationName string, deleteBatchSize int64, ttlExpr catpb.Expression, -) deleteQueryBuilder { - cachedArgs := make([]interface{}, 0, 1+int64(len(pkColumns))*deleteBatchSize) +) DeleteQueryBuilder { + if len(pkColNames) == 0 { + panic("pkColNames is empty") + } + cachedArgs := make([]interface{}, 0, 1+int64(len(pkColNames))*deleteBatchSize) cachedArgs = append(cachedArgs, cutoff) - return deleteQueryBuilder{ - tableID: tableID, - pkColumns: pkColumns, + return DeleteQueryBuilder{ + relationName: relationName, + pkColNames: pkColNames, deleteBatchSize: deleteBatchSize, deleteOpName: fmt.Sprintf("ttl delete %s", relationName), ttlExpr: ttlExpr, @@ -245,56 +202,36 @@ func makeDeleteQueryBuilder( } } -func (b *deleteQueryBuilder) buildQuery(numRows int) string { - columnNamesSQL := ttlbase.MakeColumnNamesSQL(b.pkColumns) - var placeholderStr string - for i := 0; i < numRows; i++ { - if i > 0 { - placeholderStr += ", " - } - placeholderStr += "(" - for j := 0; j < len(b.pkColumns); j++ { - if j > 0 { - placeholderStr += ", " - } - placeholderStr += fmt.Sprintf("$%d", 2+i*len(b.pkColumns)+j) - } - placeholderStr += ")" - } - - return fmt.Sprintf( - ttlbase.DeleteTemplate, - b.tableID, +func (b *DeleteQueryBuilder) buildQuery(numRows int) string { + return ttlbase.BuildDeleteQuery( + b.relationName, + b.pkColNames, b.ttlExpr, - columnNamesSQL, - placeholderStr, + numRows, ) } -func (b *deleteQueryBuilder) buildQueryAndArgs(rows []tree.Datums) (string, []interface{}) { - var q string - if int64(len(rows)) == b.deleteBatchSize { +func (b *DeleteQueryBuilder) Run( + ctx context.Context, txn isql.Txn, rows []tree.Datums, +) (int64, error) { + numRows := len(rows) + var query string + if int64(numRows) == b.deleteBatchSize { if b.cachedQuery == "" { - b.cachedQuery = b.buildQuery(len(rows)) + b.cachedQuery = b.buildQuery(numRows) } - q = b.cachedQuery + query = b.cachedQuery } else { - q = b.buildQuery(len(rows)) + query = b.buildQuery(numRows) } + deleteArgs := b.cachedArgs[:1] for _, row := range rows { for _, col := range row { deleteArgs = append(deleteArgs, col) } } - return q, deleteArgs -} -func (b *deleteQueryBuilder) run( - ctx context.Context, txn isql.Txn, rows []tree.Datums, -) (int64, error) { - q, deleteArgs := b.buildQueryAndArgs(rows) - qosLevel := sessiondatapb.TTLLow rowCount, err := txn.ExecEx( ctx, b.deleteOpName, @@ -303,7 +240,7 @@ func (b *deleteQueryBuilder) run( User: username.RootUserName(), QualityOfService: &qosLevel, }, - q, + query, deleteArgs..., ) return int64(rowCount), err diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go index 6b810dfc854f..025cd8d54e0f 100644 --- a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go @@ -8,365 +8,373 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
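+// Note: this file moves to the external ttljob_test package (a black-box
+// test), which is why QueryBounds, MakeSelectQueryBuilder, and
+// MakeDeleteQueryBuilder above are now exported.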
-package ttljob +package ttljob_test import ( + "context" + "fmt" + "strconv" + "strings" "testing" "time" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase" + "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttljob" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/stretchr/testify/require" ) +const ( + relationName = "defaultdb.relation_name" + ttlColName = "expire_at" +) + +var ( + cutoff = time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC) + expireAt = cutoff.AddDate(-1, 0, 0) +) + +func genCreateTableStatement( + pkColNames []string, pkColDirs []catenumpb.IndexColumn_Direction, +) string { + numPKCols := len(pkColNames) + colDefs := make([]string, 0, numPKCols+1) + for i := range pkColNames { + colDefs = append(colDefs, pkColNames[i]+" int") + } + colDefs = append(colDefs, ttlColName+" timestamptz") + pkColDefs := make([]string, 0, numPKCols) + for i := range pkColNames { + var pkColDir catenumpb.IndexColumn_Direction + if pkColDirs != nil { + pkColDir = pkColDirs[i] + } + pkColDefs = append(pkColDefs, pkColNames[i]+" "+pkColDir.String()) + } + return fmt.Sprintf( + "CREATE TABLE %s (%s, PRIMARY KEY(%s))", + relationName, strings.Join(colDefs, ", "), strings.Join(pkColDefs, ", "), + ) +} + +func genInsertStatement(values []string) string { + return fmt.Sprintf( + "INSERT INTO %s VALUES %s", + relationName, strings.Join(values, ", "), + ) +} + func TestSelectQueryBuilder(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mockTime := time.Date(2000, 1, 1, 13, 30, 45, 0, time.UTC) - mockDuration := -10 * time.Second - - type iteration struct { - expectedQuery string - expectedArgs []interface{} - rows []tree.Datums + intsToDatums := func(ints ...int) tree.Datums { + datums := make(tree.Datums, 0, len(ints)) + for _, i := range ints { + datums = append(datums, tree.NewDInt(tree.DInt(i))) + } + return datums } + testCases := []struct { - desc string - b selectQueryBuilder - iterations []iteration + desc string + pkColDirs []catenumpb.IndexColumn_Direction + numRows int + bounds ttljob.QueryBounds + // [iteration][row][val] + iterations [][][]int }{ { - desc: "middle range", - b: makeSelectQueryBuilder( - 1, - mockTime, - []string{"col1", "col2"}, - "relation_name", - spanToProcess{ - startPK: tree.Datums{tree.NewDInt(100), tree.NewDInt(5)}, - endPK: tree.Datums{tree.NewDInt(200), tree.NewDInt(15)}, + desc: "ASC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + }, + iterations: [][][]int{ + { + {0}, + {1}, }, - mockDuration, - 2, - colinfo.TTLDefaultExpirationColumnName, - ), - iterations: []iteration{ + {}, + }, + }, + { + desc: "DESC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + }, + iterations: [][][]int{ { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) >= ($4, $5) AND (col1, col2) < ($2, $3) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(200), tree.NewDInt(15), - tree.NewDInt(100), tree.NewDInt(5), - }, - rows: 
[]tree.Datums{ - {tree.NewDInt(100), tree.NewDInt(12)}, - {tree.NewDInt(105), tree.NewDInt(12)}, - }, + {1}, + {0}, }, + {}, + }, + }, + { + desc: "ASC partial last result", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + }, + numRows: 1, + iterations: [][][]int{ { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(200), tree.NewDInt(15), - tree.NewDInt(105), tree.NewDInt(12), - }, - rows: []tree.Datums{ - {tree.NewDInt(112), tree.NewDInt(19)}, - {tree.NewDInt(180), tree.NewDInt(132)}, - }, + {0}, }, + }, + }, + { + desc: "DESC partial last result", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + }, + numRows: 1, + iterations: [][][]int{ { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(200), tree.NewDInt(15), - tree.NewDInt(180), tree.NewDInt(132), - }, - rows: []tree.Datums{}, + {0}, }, }, }, { - desc: "only one range", - b: makeSelectQueryBuilder( - 1, - mockTime, - []string{"col1", "col2"}, - "table_name", - spanToProcess{}, - mockDuration, - 2, - colinfo.TTLDefaultExpirationColumnName, - ), - iterations: []iteration{ + desc: "ASC start bounds", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + }, + bounds: ttljob.QueryBounds{ + Start: intsToDatums(1), + }, + iterations: [][][]int{ { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 - -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - }, - rows: []tree.Datums{ - {tree.NewDInt(100), tree.NewDInt(12)}, - {tree.NewDInt(105), tree.NewDInt(12)}, - }, + {1}, }, + }, + }, + { + desc: "ASC end bounds", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + }, + bounds: ttljob.QueryBounds{ + End: intsToDatums(1), + }, + iterations: [][][]int{ { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) > ($2, $3) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(105), tree.NewDInt(12), - }, - rows: []tree.Datums{ - {tree.NewDInt(112), tree.NewDInt(19)}, - {tree.NewDInt(180), tree.NewDInt(132)}, - }, + {0}, }, + }, + }, + { + desc: "DESC start bounds", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + }, + bounds: ttljob.QueryBounds{ + Start: intsToDatums(0), + }, + iterations: [][][]int{ { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) > ($2, $3) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(180), tree.NewDInt(132), - }, - rows: []tree.Datums{}, + {0}, }, }, }, { - desc: "one range, but a partial startPK and endPK split", - b: makeSelectQueryBuilder( - 1, - mockTime, - []string{"col1", "col2"}, - "table_name", - spanToProcess{ - startPK: tree.Datums{tree.NewDInt(100)}, - endPK: tree.Datums{tree.NewDInt(181)}, + desc: "DESC end bounds", + pkColDirs: 
[]catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + }, + bounds: ttljob.QueryBounds{ + End: intsToDatums(0), + }, + iterations: [][][]int{ + { + {1}, }, - mockDuration, - 2, - colinfo.TTLDefaultExpirationColumnName, - ), - iterations: []iteration{ + }, + }, + { + desc: "ASC ASC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + catenumpb.IndexColumn_ASC, + }, + iterations: [][][]int{ { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1) >= ($3) AND (col1) < ($2) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(181), - tree.NewDInt(100), - }, - rows: []tree.Datums{ - {tree.NewDInt(100), tree.NewDInt(12)}, - {tree.NewDInt(105), tree.NewDInt(12)}, - }, + {0, 0}, + {0, 1}, }, { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) > ($3, $4) AND (col1) < ($2) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(181), - tree.NewDInt(105), tree.NewDInt(12), - }, - rows: []tree.Datums{ - {tree.NewDInt(112), tree.NewDInt(19)}, - {tree.NewDInt(180), tree.NewDInt(132)}, - }, + {1, 0}, + {1, 1}, }, + {}, + }, + }, + { + desc: "DESC DESC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + catenumpb.IndexColumn_DESC, + }, + iterations: [][][]int{ { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) > ($3, $4) AND (col1) < ($2) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(181), - tree.NewDInt(180), tree.NewDInt(132), - }, - rows: []tree.Datums{}, + {1, 1}, + {1, 0}, }, + { + {0, 1}, + {0, 0}, + }, + {}, }, }, { - desc: "first range", - b: makeSelectQueryBuilder( - 1, - mockTime, - []string{"col1", "col2"}, - "table_name", - spanToProcess{ - endPK: tree.Datums{tree.NewDInt(200), tree.NewDInt(15)}, + desc: "ASC DESC ASC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_ASC, + catenumpb.IndexColumn_DESC, + catenumpb.IndexColumn_ASC, + }, + iterations: [][][]int{ + { + {0, 1, 0}, + {0, 1, 1}, }, - mockDuration, - 2, - colinfo.TTLDefaultExpirationColumnName, - ), - iterations: []iteration{ { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 - AND (col1, col2) < ($2, $3) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(200), tree.NewDInt(15), - }, - rows: []tree.Datums{ - {tree.NewDInt(100), tree.NewDInt(12)}, - {tree.NewDInt(105), tree.NewDInt(12)}, - }, + {0, 0, 0}, + {0, 0, 1}, }, { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(200), tree.NewDInt(15), - tree.NewDInt(105), tree.NewDInt(12), - }, - rows: []tree.Datums{ - {tree.NewDInt(112), tree.NewDInt(19)}, - {tree.NewDInt(180), tree.NewDInt(132)}, - }, + {1, 1, 0}, + {1, 1, 1}, }, { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) -ORDER BY 
col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(200), tree.NewDInt(15), - tree.NewDInt(180), tree.NewDInt(132), - }, - rows: []tree.Datums{}, + {1, 0, 0}, + {1, 0, 1}, }, + {}, }, }, { - desc: "last range", - b: makeSelectQueryBuilder( - 1, - mockTime, - []string{"col1", "col2"}, - "table_name", - spanToProcess{ - startPK: tree.Datums{tree.NewDInt(100), tree.NewDInt(5)}, + desc: "DESC ASC DESC", + pkColDirs: []catenumpb.IndexColumn_Direction{ + catenumpb.IndexColumn_DESC, + catenumpb.IndexColumn_ASC, + catenumpb.IndexColumn_DESC, + }, + iterations: [][][]int{ + { + {1, 0, 1}, + {1, 0, 0}, }, - mockDuration, - 2, - colinfo.TTLDefaultExpirationColumnName, - ), - iterations: []iteration{ { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) >= ($2, $3) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(100), tree.NewDInt(5), - }, - rows: []tree.Datums{ - {tree.NewDInt(100), tree.NewDInt(12)}, - {tree.NewDInt(105), tree.NewDInt(12)}, - }, + {1, 1, 1}, + {1, 1, 0}, }, { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) > ($2, $3) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(105), tree.NewDInt(12), - }, - rows: []tree.Datums{ - {tree.NewDInt(112), tree.NewDInt(19)}, - {tree.NewDInt(180), tree.NewDInt(132)}, - }, + {0, 0, 1}, + {0, 0, 0}, }, { - expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] -AS OF SYSTEM TIME INTERVAL '-10 seconds' -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) > ($2, $3) -ORDER BY col1, col2 -LIMIT 2`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(180), tree.NewDInt(132), - }, - rows: []tree.Datums{}, + {0, 1, 1}, + {0, 1, 0}, }, + {}, }, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - for i, it := range tc.iterations { - q, args := tc.b.nextQuery() - require.Equal(t, it.expectedQuery, q) - require.Equal(t, it.expectedArgs, args) - require.NoError(t, tc.b.moveCursor(it.rows)) - if i >= 1 { - require.NotEmpty(t, tc.b.cachedQuery) + ctx := context.Background() + testCluster := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer testCluster.Stopper().Stop(ctx) + + testServer := testCluster.Server(0) + ie := testServer.InternalExecutor().(*sql.InternalExecutor) + + // Generate pkColNames. + pkColDirs := tc.pkColDirs + numPKCols := len(pkColDirs) + pkColNames := ttlbase.GenPKColNames(numPKCols) + + // Run CREATE TABLE statement. + createTableStatement := genCreateTableStatement(pkColNames, pkColDirs) + _, err := ie.Exec(ctx, "create ttl table", nil, createTableStatement) + require.NoError(t, err) + + // Run INSERT statement. 
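+			// Row generation sketch: each row index i is rendered as a zero-padded
+			// binary string over numPKCols digits, and each digit becomes one PK
+			// column value. For numPKCols=2 and numRows=4 that yields, e.g.:
+			//
+			//	(0, 0, '<expireAt>'), (0, 1, '<expireAt>'), (1, 0, '<expireAt>'), (1, 1, '<expireAt>')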
+ numRows := tc.numRows + maxNumRows := 2 << (numPKCols - 1) + if numRows == 0 { + numRows = maxNumRows + } else if numRows > maxNumRows { + panic("numRows must not exceed maxNumRows") + } + values := make([]string, 0, numRows) + for i := 0; i < numRows; i++ { + format := fmt.Sprintf("%%0%db", numPKCols) + number := fmt.Sprintf(format, i) + value := make([]string, 0, numPKCols+1) + for j := 0; j < numPKCols; j++ { + value = append(value, string(number[j])) + } + value = append(value, "'"+expireAt.Format(time.RFC3339)+"'") + values = append(values, "("+strings.Join(value, ", ")+")") + } + insertStatement := genInsertStatement(values) + _, err = ie.Exec(ctx, "insert ttl table", nil, insertStatement) + require.NoError(t, err) + + // Setup SelectQueryBuilder. + queryBuilder := ttljob.MakeSelectQueryBuilder( + cutoff, + pkColNames, + pkColDirs, + relationName, + tc.bounds, + 0, + 2, + ttlColName, + ) + + // Verify queryBuilder iterations. + i := 0 + expectedIterations := tc.iterations + actualIterations := make([][][]int, 0, len(expectedIterations)) + for ; ; i++ { + const msg = "i=%d" + result, hasNext, err := queryBuilder.Run(ctx, ie) + require.NoErrorf(t, err, msg, i) + actualIteration := make([][]int, 0, len(result)) + for _, datums := range result { + row := make([]int, 0, len(datums)) + for _, datum := range datums { + val := int(*datum.(*tree.DInt)) + row = append(row, val) + } + actualIteration = append(actualIteration, row) + } + actualIterations = append(actualIterations, actualIteration) + require.Greaterf(t, len(expectedIterations), i, msg, i) + require.Equalf(t, expectedIterations[i], actualIteration, msg, i) + if !hasNext { + break + } + } + require.Len(t, expectedIterations, i+1) + + // Verify all selected rows are unique. + for i := range actualIterations { + for j := range actualIterations { + if i != j { + require.NotEqualf(t, actualIterations[i], actualIterations[j], "i=%d j=%d", i, j) + } } } }) @@ -377,119 +385,88 @@ func TestDeleteQueryBuilder(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - mockTime := time.Date(2000, 1, 1, 13, 30, 45, 0, time.UTC) - - type iteration struct { - rows []tree.Datums - - expectedQuery string - expectedArgs []interface{} - } testCases := []struct { - desc string - b deleteQueryBuilder - iterations []iteration + desc string + numPKCols int + numRows int }{ { - desc: "single delete less than batch size", - b: makeDeleteQueryBuilder(1, mockTime, []string{"col1", "col2"}, "table_name", 3, colinfo.TTLDefaultExpirationColumnName), - iterations: []iteration{ - { - rows: []tree.Datums{ - {tree.NewDInt(10), tree.NewDInt(15)}, - {tree.NewDInt(12), tree.NewDInt(16)}, - }, - expectedQuery: `DELETE FROM [1 AS tbl_name] -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) IN (($2, $3), ($4, $5))`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(10), tree.NewDInt(15), - tree.NewDInt(12), tree.NewDInt(16), - }, - }, - }, + desc: "1 PK col - 0 rows", + numPKCols: 1, + numRows: 0, }, { - desc: "multiple deletes", - b: makeDeleteQueryBuilder(1, mockTime, []string{"col1", "col2"}, "table_name", 3, colinfo.TTLDefaultExpirationColumnName), - iterations: []iteration{ - { - rows: []tree.Datums{ - {tree.NewDInt(10), tree.NewDInt(15)}, - {tree.NewDInt(12), tree.NewDInt(16)}, - {tree.NewDInt(12), tree.NewDInt(18)}, - }, - expectedQuery: `DELETE FROM [1 AS tbl_name] -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) IN (($2, $3), ($4, $5), ($6, $7))`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(10),
tree.NewDInt(15), - tree.NewDInt(12), tree.NewDInt(16), - tree.NewDInt(12), tree.NewDInt(18), - }, - }, - { - rows: []tree.Datums{ - {tree.NewDInt(110), tree.NewDInt(115)}, - {tree.NewDInt(112), tree.NewDInt(116)}, - {tree.NewDInt(112), tree.NewDInt(118)}, - }, - expectedQuery: `DELETE FROM [1 AS tbl_name] -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) IN (($2, $3), ($4, $5), ($6, $7))`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(110), tree.NewDInt(115), - tree.NewDInt(112), tree.NewDInt(116), - tree.NewDInt(112), tree.NewDInt(118), - }, - }, - { - rows: []tree.Datums{ - {tree.NewDInt(1210), tree.NewDInt(1215)}, - }, - expectedQuery: `DELETE FROM [1 AS tbl_name] -WHERE crdb_internal_expiration <= $1 -AND (col1, col2) IN (($2, $3))`, - expectedArgs: []interface{}{ - mockTime, - tree.NewDInt(1210), tree.NewDInt(1215), - }, - }, - }, + desc: "1 PK col - 1 row", + numPKCols: 1, + numRows: 1, + }, + { + desc: "3 PK cols - 3 rows", + numPKCols: 3, + numRows: 3, }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - for _, it := range tc.iterations { - q, args := tc.b.buildQueryAndArgs(it.rows) - require.Equal(t, it.expectedQuery, q) - require.Equal(t, it.expectedArgs, args) - } - }) - } -} + ctx := context.Background() -func TestMakeColumnNamesSQL(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) + testCluster := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer testCluster.Stopper().Stop(ctx) - testCases := []struct { - cols []string - expected string - }{ - {[]string{"a"}, "a"}, - {[]string{"index"}, `index`}, - {[]string{"a", "b"}, "a, b"}, - {[]string{"escape-me", "index", "c"}, `"escape-me", index, c`}, - } + testServer := testCluster.Server(0) + ie := testServer.InternalExecutor().(*sql.InternalExecutor) + db := testServer.InternalDB().(*sql.InternalDB) - for _, tc := range testCases { - t.Run(tc.expected, func(t *testing.T) { - require.Equal(t, tc.expected, ttlbase.MakeColumnNamesSQL(tc.cols)) + // Generate pkColNames. + numPKCols := tc.numPKCols + pkColNames := ttlbase.GenPKColNames(numPKCols) + + // Run CREATE TABLE statement. + createTableStatement := genCreateTableStatement(pkColNames, nil) + _, err := ie.Exec(ctx, "create ttl table", nil, createTableStatement) + require.NoError(t, err) + + // Run INSERT statement. + expectedNumRows := tc.numRows + if expectedNumRows > 0 { + values := make([]string, 0, expectedNumRows) + for i := 0; i < expectedNumRows; i++ { + value := make([]string, 0, numPKCols+1) + for j := 0; j < numPKCols; j++ { + value = append(value, strconv.Itoa(i)) + } + value = append(value, "'"+expireAt.Format(time.RFC3339)+"'") + values = append(values, "("+strings.Join(value, ", ")+")") + } + insertStatement := genInsertStatement(values) + _, err = ie.Exec(ctx, "insert ttl table", nil, insertStatement) + require.NoError(t, err) + } + + // Setup DeleteQueryBuilder. + queryBuilder := ttljob.MakeDeleteQueryBuilder( + cutoff, + pkColNames, + relationName, + 2, /* deleteBatchSize */ + ttlColName, + ) + + // Verify rows are deleted. 
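+			// With an empty rows slice below, BuildDeleteQuery receives numRows=0
+			// and omits the IN clause entirely, so the statement is (sketch):
+			//
+			//	DELETE FROM defaultdb.relation_name
+			//	WHERE ((expire_at) <= $1)
+			//
+			// which deletes every expired row in one pass.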
+ rows := make([]tree.Datums, 0, expectedNumRows) + err = db.Txn( + ctx, + func(ctx context.Context, txn isql.Txn) error { + actualNumRows, err := queryBuilder.Run(ctx, txn, rows) + if err != nil { + return err + } + require.Equal(t, int64(expectedNumRows), actualNumRows) + return nil + }, + ) + require.NoError(t, err) }) } } diff --git a/pkg/sql/ttl/ttljob/ttljob_test.go b/pkg/sql/ttl/ttljob/ttljob_test.go index 179db98bc704..4c0ed19124c5 100644 --- a/pkg/sql/ttl/ttljob/ttljob_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_test.go @@ -602,7 +602,7 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) { numSplits int forceNonMultiTenant bool expirationExpression string - addRow func(th *rowLevelTTLTestJobTestHelper, createTableStmt *tree.CreateTable, ts time.Time) + addRow func(th *rowLevelTTLTestJobTestHelper, t *testing.T, createTableStmt *tree.CreateTable, ts time.Time) } // Add some basic one and three column row-level TTL tests. testCases := []testCase{ @@ -669,6 +669,18 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) { "quote-kw-col" TIMESTAMPTZ, text TEXT, PRIMARY KEY (id, other_col, "quote-kw-col") +) WITH (ttl_expire_after = '30 days')`, + numExpiredRows: 1001, + numNonExpiredRows: 5, + }, + { + desc: "three column pk DESC", + createTable: `CREATE TABLE tbl ( + id UUID DEFAULT gen_random_uuid(), + other_col INT, + "quote-kw-col" TIMESTAMPTZ, + text TEXT, + PRIMARY KEY (id, other_col DESC, "quote-kw-col") ) WITH (ttl_expire_after = '30 days')`, numExpiredRows: 1001, numNonExpiredRows: 5, @@ -724,7 +736,7 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) { numExpiredRows: 1001, numNonExpiredRows: 5, expirationExpression: "expire_at", - addRow: func(th *rowLevelTTLTestJobTestHelper, _ *tree.CreateTable, ts time.Time) { + addRow: func(th *rowLevelTTLTestJobTestHelper, t *testing.T, _ *tree.CreateTable, ts time.Time) { th.sqlDB.Exec( t, "INSERT INTO tbl (expire_at) VALUES ($1)", @@ -759,7 +771,7 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) { ) } - defaultAddRow := func(th *rowLevelTTLTestJobTestHelper, createTableStmt *tree.CreateTable, ts time.Time) { + defaultAddRow := func(th *rowLevelTTLTestJobTestHelper, t *testing.T, createTableStmt *tree.CreateTable, ts time.Time) { insertColumns := []string{"crdb_internal_expiration"} placeholders := []string{"$1"} values := []interface{}{ts} @@ -867,10 +879,10 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) { // Add expired and non-expired rows. for i := 0; i < tc.numExpiredRows; i++ { - addRow(th, createTableStmt, timeutil.Now().Add(-time.Hour)) + addRow(th, t, createTableStmt, timeutil.Now().Add(-time.Hour)) } for i := 0; i < tc.numNonExpiredRows; i++ { - addRow(th, createTableStmt, timeutil.Now().Add(time.Hour*24*30)) + addRow(th, t, createTableStmt, timeutil.Now().Add(time.Hour*24*30)) } for _, stmt := range tc.postSetup { @@ -978,7 +990,7 @@ func TestInboundForeignKeyOnDeleteRestrict(t *testing.T) { sqlDB.Exec(t, "INSERT INTO child VALUES (1, 1)") // Force the schedule to execute. 
-	th.waitForScheduledJob(t, jobs.StatusFailed, `delete on table "tbl_name" violates foreign key constraint "child_tbl_id_fkey" on table "child"`)
+	th.waitForScheduledJob(t, jobs.StatusFailed, `delete on table "tbl" violates foreign key constraint "child_tbl_id_fkey" on table "child"`)

 	results := sqlDB.QueryStr(t, "SELECT * FROM tbl")
 	require.Len(t, results, 1)
diff --git a/pkg/sql/ttl/ttlschedule/ttlschedule.go b/pkg/sql/ttl/ttlschedule/ttlschedule.go
index 3d4517a1cdbf..2727e6744014 100644
--- a/pkg/sql/ttl/ttlschedule/ttlschedule.go
+++ b/pkg/sql/ttl/ttlschedule/ttlschedule.go
@@ -214,30 +214,35 @@ func (s rowLevelTTLExecutor) GetCreateScheduleStatement(
 }

 func makeTTLJobDescription(tableDesc catalog.TableDescriptor, tn tree.ObjectName) string {
-	pkColumns := tableDesc.GetPrimaryIndex().IndexDesc().KeyColumnNames
-	pkColumnNamesSQL := ttlbase.MakeColumnNamesSQL(pkColumns)
-	selectQuery := fmt.Sprintf(
-		ttlbase.SelectTemplate,
-		pkColumnNamesSQL,
-		tableDesc.GetID(),
-		int64(ttlbase.DefaultAOSTDuration.Seconds()),
-		"",
-		fmt.Sprintf("AND (%s) > ()", pkColumnNamesSQL),
-		fmt.Sprintf(" AND (%s) < ()", pkColumnNamesSQL),
-		"",
+	relationName := tn.FQString()
+	pkIndex := tableDesc.GetPrimaryIndex().IndexDesc()
+	pkColNames := pkIndex.KeyColumnNames
+	pkColDirs := pkIndex.KeyColumnDirections
+	rowLevelTTL := tableDesc.GetRowLevelTTL()
+	ttlExpirationExpr := rowLevelTTL.GetTTLExpr()
+	numPkCols := len(pkColNames)
+	selectQuery := ttlbase.BuildSelectQuery(
+		relationName,
+		pkColNames,
+		pkColDirs,
+		ttlbase.DefaultAOSTDuration,
+		ttlExpirationExpr,
+		numPkCols,
+		numPkCols,
+		rowLevelTTL.SelectBatchSize,
+		true, /* startIncl */
 	)
-	deleteQuery := fmt.Sprintf(
-		ttlbase.DeleteTemplate,
-		tableDesc.GetID(),
-		"",
-		pkColumnNamesSQL,
-		"",
+	deleteQuery := ttlbase.BuildDeleteQuery(
+		relationName,
+		pkColNames,
+		ttlExpirationExpr,
+		1, /* numRows */
 	)
 	return fmt.Sprintf(`ttl for %s
 -- for each span, iterate to find rows:
 %s
 -- then delete with:
-%s`, tn.FQString(), selectQuery, deleteQuery)
+%s`, relationName, selectQuery, deleteQuery)
 }

 func createRowLevelTTLJob(
@@ -248,7 +253,8 @@
 	ttlArgs catpb.ScheduledRowLevelTTLArgs,
 ) (jobspb.JobID, error) {
 	descsCol := descs.FromTxn(txn)
-	tableDesc, err := descsCol.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, ttlArgs.TableID)
+	tableID := ttlArgs.TableID
+	tableDesc, err := descsCol.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Table(ctx, tableID)
 	if err != nil {
 		return 0, err
 	}
@@ -260,7 +266,7 @@
 			Description: makeTTLJobDescription(tableDesc, tn),
 			Username:    username.NodeUserName(),
 			Details: jobspb.RowLevelTTLDetails{
-				TableID:      ttlArgs.TableID,
+				TableID:      tableID,
 				Cutoff:       timeutil.Now(),
 				TableVersion: tableDesc.GetVersion(),
 			},
diff --git a/pkg/util/quantile/stream.go b/pkg/util/quantile/stream.go
index 34918b706ab1..bb3e490b436e 100644
--- a/pkg/util/quantile/stream.go
+++ b/pkg/util/quantile/stream.go
@@ -166,7 +166,11 @@ func (s *Stream) insert(sample Sample) {
 // Query returns the computed qth percentiles value. If s was created with
 // NewTargeted, and q is not in the set of quantiles provided a priori, Query
 // will return an unspecified result.
-func (s *Stream) Query(q float64) float64 {
+// flush can be a heavy operation, but it is required when you need a precise
+// result because new data was just added (shouldFlush = true).
+// If a slightly stale result is acceptable, or no new data has been added,
+// the flush can be skipped (shouldFlush = false).
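+// For example, a caller polling percentiles in a loop can usually pass
+// shouldFlush = false, while a one-shot read right after a burst of inserts
+// should pass shouldFlush = true.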
+func (s *Stream) Query(q float64, shouldFlush bool) float64 {
 	if !s.flushed() {
 		// Fast path when there hasn't been enough data for a flush;
 		// this also yields better accuracy for small sets of data.
@@ -181,7 +185,9 @@
 		s.maybeSort()
 		return s.b[i].Value
 	}
-	s.flush()
+	if shouldFlush {
+		s.flush()
+	}
 	return s.stream.query(q)
 }
diff --git a/pkg/util/quantile/stream_test.go b/pkg/util/quantile/stream_test.go
index ec2e5fb12993..e0af96c78999 100644
--- a/pkg/util/quantile/stream_test.go
+++ b/pkg/util/quantile/stream_test.go
@@ -61,7 +61,7 @@ func verifyPercsWithAbsoluteEpsilon(t *testing.T, a []float64, s *Stream) {
 			upper = len(a)
 		}
 		w, min, max := a[k-1], a[lower-1], a[upper-1]
-		if g := s.Query(quantile); g < min || g > max {
+		if g := s.Query(quantile, true); g < min || g > max {
 			t.Errorf("q=%f: want %v [%f,%f], got %v", quantile, w, min, max, g)
 		}
 	}
@@ -76,7 +76,7 @@ func verifyLowPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) {
 		lowerRank := int((1 - RelativeEpsilon) * qu * n)
 		upperRank := int(math.Ceil((1 + RelativeEpsilon) * qu * n))
 		w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1]
-		if g := s.Query(qu); g < min || g > max {
+		if g := s.Query(qu, true); g < min || g > max {
 			t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g)
 		}
 	}
@@ -91,7 +91,7 @@ func verifyHighPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) {
 		lowerRank := int((1 - (1+RelativeEpsilon)*(1-qu)) * n)
 		upperRank := int(math.Ceil((1 - (1-RelativeEpsilon)*(1-qu)) * n))
 		w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1]
-		if g := s.Query(qu); g < min || g > max {
+		if g := s.Query(qu, true); g < min || g > max {
 			t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g)
 		}
 	}
@@ -135,7 +135,7 @@ func TestTargetedQuerySmallSampleSize(t *testing.T) {
 		0.90: 5,
 		0.99: 5,
 	} {
-		if got := s.Query(φ); got != want {
+		if got := s.Query(φ, true); got != want {
 			t.Errorf("want %f for φ=%f, got %f", want, φ, got)
 		}
 	}
@@ -200,7 +200,7 @@ func TestUncompressed(t *testing.T) {
 	// Before compression, Query should have 100% accuracy.
 	for quantile := range Targets {
 		w := quantile * 100
-		if g := q.Query(quantile); g != w {
+		if g := q.Query(quantile, true); g != w {
 			t.Errorf("want %f, got %f", w, g)
 		}
 	}
@@ -219,17 +219,45 @@ func TestUncompressedSamples(t *testing.T) {
 func TestUncompressedOne(t *testing.T) {
 	q := NewTargeted(map[float64]float64{0.99: 0.01})
 	q.Insert(3.14)
-	if g := q.Query(0.90); g != 3.14 {
+	if g := q.Query(0.90, true); g != 3.14 {
 		t.Error("want PI, got", g)
 	}
 }

 func TestDefaults(t *testing.T) {
-	if g := NewTargeted(map[float64]float64{0.99: 0.001}).Query(0.99); g != 0 {
+	if g := NewTargeted(map[float64]float64{0.99: 0.001}).Query(0.99, true); g != 0 {
 		t.Errorf("want 0, got %f", g)
 	}
 }

+func TestQueryFlush(t *testing.T) {
+	q := NewTargeted(map[float64]float64{0.99: 0.001})
+	for i := 1; i <= 100; i++ {
+		q.Insert(float64(i))
+	}
+	// A flush after all inserts should make every following Query call
+	// return the same result whether shouldFlush is true or false.
+	q.flush()
+	if p := q.Query(0.90, true); p != 91 {
+		t.Error("want 91, got", p)
+	}
+	if p := q.Query(0.90, false); p != 91 {
+		t.Error("want 91, got", p)
+	}
+
+	// Do an insert without forcing a flush. A Query with shouldFlush
+	// false ignores the new value and returns the same result as before.
+	q.Insert(float64(101))
+	if p := q.Query(0.90, false); p != 91 {
+		t.Error("want 91, got", p)
+	}
+	// A Query with shouldFlush true picks up the new value.
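+	// With the new sample folded in, the 0.9 quantile over 1..101 is 92.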
+	if p := q.Query(0.90, true); p != 92 {
+		t.Error("want 92, got", p)
+	}
+}
+
 func TestByteSize(t *testing.T) {
 	// Empty size is nonzero.
 	q := NewTargeted(Targets)
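To make the new Query contract concrete, here is a minimal usage sketch. It is not part of the change above; it assumes only the exported names visible in this diff (quantile.NewTargeted, Insert, and the two-argument Query) and the pkg/util/quantile import path:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/quantile"
)

func main() {
	// Track the p50 and p99 with per-quantile error tolerances.
	s := quantile.NewTargeted(map[float64]float64{0.50: 0.05, 0.99: 0.001})
	for i := 1; i <= 1000; i++ {
		s.Insert(float64(i))
	}
	// Hot path: skip the potentially heavy flush and accept a value that
	// may not reflect the most recent inserts.
	fmt.Println(s.Query(0.99, false))
	// Reporting path: pay for the flush so buffered samples are merged
	// before the quantile is computed.
	fmt.Println(s.Query(0.99, true))
}

A caller that inserts nothing between two reads can safely pass shouldFlush = false on the second read, as TestQueryFlush above demonstrates.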