diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 4cbd4d564279..360303768d91 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -1034,7 +1034,7 @@ func backupPlanHook( return errors.Wrap(err, "invalid previous backups") } if coveredTime != startTime { - return errors.Wrapf(err, "expected previous backups to cover until time %v, got %v", startTime, coveredTime) + return errors.Errorf("expected previous backups to cover until time %v, got %v", startTime, coveredTime) } } diff --git a/pkg/ccl/importccl/read_import_workload.go b/pkg/ccl/importccl/read_import_workload.go index 151e4b7c5dab..6215686a0110 100644 --- a/pkg/ccl/importccl/read_import_workload.go +++ b/pkg/ccl/importccl/read_import_workload.go @@ -157,7 +157,7 @@ func (w *workloadReader) readFiles( } } if t.Name == `` { - return errors.Wrapf(err, `unknown table %s for generator %s`, conf.Table, meta.Name) + return errors.Errorf(`unknown table %s for generator %s`, conf.Table, meta.Name) } wc := NewWorkloadKVConverter( diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index 598f95505bfd..f1bec751295d 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "//pkg/roachpb", "//pkg/scheduledjobs", "//pkg/security", + "//pkg/server/telemetry", "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/catalog", diff --git a/pkg/jobs/metrics.go b/pkg/jobs/metrics.go index 064ef97bdbd8..00d46df58484 100644 --- a/pkg/jobs/metrics.go +++ b/pkg/jobs/metrics.go @@ -16,6 +16,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/util/metric" io_prometheus_client "github.com/prometheus/client_model/go" ) @@ -150,3 +151,39 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) { // MakeChangefeedMetricsHook allows for registration of changefeed metrics from // ccl code. var MakeChangefeedMetricsHook func(time.Duration) metric.Struct + +// JobTelemetryMetrics is a telemetry metrics for individual job types. +type JobTelemetryMetrics struct { + Successful telemetry.Counter + Failed telemetry.Counter + Canceled telemetry.Counter +} + +// newJobTelemetryMetrics creates a new JobTelemetryMetrics object +// for a given job type name. +func newJobTelemetryMetrics(jobName string) *JobTelemetryMetrics { + return &JobTelemetryMetrics{ + Successful: telemetry.GetCounterOnce(fmt.Sprintf("job.%s.successful", jobName)), + Failed: telemetry.GetCounterOnce(fmt.Sprintf("job.%s.failed", jobName)), + Canceled: telemetry.GetCounterOnce(fmt.Sprintf("job.%s.canceled", jobName)), + } +} + +// getJobTelemetryMetricsArray initializes an array of job related telemetry +// metrics +func getJobTelemetryMetricsArray() [jobspb.NumJobTypes]*JobTelemetryMetrics { + var metrics [jobspb.NumJobTypes]*JobTelemetryMetrics + for i := 0; i < jobspb.NumJobTypes; i++ { + jt := jobspb.Type(i) + if jt == jobspb.TypeUnspecified { // do not track TypeUnspecified + continue + } + typeStr := strings.ToLower(strings.Replace(jt.String(), " ", "_", -1)) + metrics[i] = newJobTelemetryMetrics(typeStr) + } + return metrics +} + +// TelemetryMetrics contains telemetry metrics for different +// job types. +var TelemetryMetrics = getJobTelemetryMetricsArray() diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 101cf0a3fb6e..cb02b5670c78 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -1218,6 +1219,7 @@ func (r *Registry) stepThroughStateMachine( // restarted during the next adopt loop and reverting will be retried. return errors.Wrapf(err, "job %d: could not mark as canceled: %v", job.ID(), jobErr) } + telemetry.Inc(TelemetryMetrics[jobType].Canceled) return errors.WithSecondaryError(errors.Errorf("job %s", status), jobErr) case StatusSucceeded: if jobErr != nil { @@ -1232,6 +1234,7 @@ func (r *Registry) stepThroughStateMachine( // better. return r.stepThroughStateMachine(ctx, execCtx, resumer, job, StatusReverting, errors.Wrapf(err, "could not mark job %d as succeeded", job.ID())) } + telemetry.Inc(TelemetryMetrics[jobType].Successful) return nil case StatusReverting: if err := job.reverted(ctx, nil /* txn */, jobErr, nil /* fn */); err != nil { @@ -1286,6 +1289,7 @@ func (r *Registry) stepThroughStateMachine( // restarted during the next adopt loop and reverting will be retried. return errors.Wrapf(err, "job %d: could not mark as failed: %s", job.ID(), jobErr) } + telemetry.Inc(TelemetryMetrics[jobType].Failed) return jobErr default: return errors.NewAssertionErrorWithWrappedErrf(jobErr, diff --git a/pkg/server/node.go b/pkg/server/node.go index fc2c50571472..232601fc1379 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -233,7 +233,7 @@ func bootstrapCluster( if i == 0 { bootstrapVersion = cv } else if bootstrapVersion != cv { - return nil, errors.Wrapf(err, "found cluster versions %s and %s", bootstrapVersion, cv) + return nil, errors.Errorf("found cluster versions %s and %s", bootstrapVersion, cv) } sIdent := roachpb.StoreIdent{ diff --git a/pkg/sql/logictest/testdata/logic_test/alter_table b/pkg/sql/logictest/testdata/logic_test/alter_table index 0063789d8011..fa3271970a1d 100644 --- a/pkg/sql/logictest/testdata/logic_test/alter_table +++ b/pkg/sql/logictest/testdata/logic_test/alter_table @@ -1688,3 +1688,14 @@ SELECT count(descriptor_id) WHERE descriptor_id = ('test.public.t45985'::REGCLASS)::INT8; ---- 0 + +# Validate that the schema_change_successful metric +query T +SELECT feature_name FROM crdb_internal.feature_usage +WHERE feature_name IN ('job.schema_change.successful', +'job.schema_change.failed') AND +usage_count > 0 +ORDER BY feature_name DESC +---- +job.schema_change.successful +job.schema_change.failed diff --git a/pkg/sql/logictest/testdata/logic_test/distsql_stats b/pkg/sql/logictest/testdata/logic_test/distsql_stats index cfb8ce4f3eb7..eae91a2e8fce 100644 --- a/pkg/sql/logictest/testdata/logic_test/distsql_stats +++ b/pkg/sql/logictest/testdata/logic_test/distsql_stats @@ -1065,3 +1065,18 @@ SHOW STATISTICS USING JSON FOR TABLE greeting_stats statement ok ALTER TABLE greeting_stats INJECT STATISTICS '$stats' + +# Validate that the schema_change_successful metric +query T +SELECT feature_name FROM crdb_internal.feature_usage +WHERE feature_name in ('job.typedesc_schema_change.successful', +'job.schema_change.successful', +'job.create_stats.successful', +'job.auto_create_stats.successful') AND +usage_count > 0 +ORDER BY feature_name DESC +---- +job.typedesc_schema_change.successful +job.schema_change.successful +job.create_stats.successful +job.auto_create_stats.successful diff --git a/pkg/sql/logictest/testdata/logic_test/jobs b/pkg/sql/logictest/testdata/logic_test/jobs index 21214acf5197..79af2e7012cd 100644 --- a/pkg/sql/logictest/testdata/logic_test/jobs +++ b/pkg/sql/logictest/testdata/logic_test/jobs @@ -130,3 +130,14 @@ user testuser # testuser should no longer have the ability to control jobs. statement error pq: user testuser does not have CONTROLJOB privilege PAUSE JOB (SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'testuser2' AND job_type = 'SCHEMA CHANGE GC') + +user root + +# Validate that the schema_change_successful metric +query T +SELECT feature_name FROM crdb_internal.feature_usage +WHERE feature_name in ('job.schema_change.successful') AND +usage_count > 0 +ORDER BY feature_name DESC +---- +job.schema_change.successful diff --git a/pkg/sql/logictest/testdata/logic_test/partial_index b/pkg/sql/logictest/testdata/logic_test/partial_index index 0f72d2d63d7b..234b0c5ebccf 100644 --- a/pkg/sql/logictest/testdata/logic_test/partial_index +++ b/pkg/sql/logictest/testdata/logic_test/partial_index @@ -1795,3 +1795,17 @@ CREATE TABLE t61414_c ( statement ok UPSERT INTO t61414_c (k, a, b, d) VALUES (1, 2, 3, 4) + +# Regression test for #61284. When building partial index DEL column +# expressions, there should not be ambiguous column errors if there exists +# columns in an UPDATE FROM clause that match column names in the partial index +# predicate. + +statement ok +CREATE TABLE t61284 ( + a INT, + INDEX (a) WHERE a > 0 +) + +statement ok +UPDATE t61284 SET a = v.a FROM (VALUES (1), (2)) AS v(a) WHERE t61284.a = v.a diff --git a/pkg/sql/logictest/testdata/logic_test/select b/pkg/sql/logictest/testdata/logic_test/select index e5b88db298bd..716cde581205 100644 --- a/pkg/sql/logictest/testdata/logic_test/select +++ b/pkg/sql/logictest/testdata/logic_test/select @@ -746,3 +746,7 @@ SELECT * FROM crdb_internal.jobs # Cleanup statement ok SET disallow_full_table_scans = false + +# Regression test for #58104. +statement ok +SELECT * FROM pg_catalog.pg_attrdef WHERE (adnum = 1 AND adrelid = 1) OR (adbin = 'foo' AND adrelid = 2) diff --git a/pkg/sql/mutations/mutations.go b/pkg/sql/mutations/mutations.go index 37817be6f409..ba3ef2aeabf9 100644 --- a/pkg/sql/mutations/mutations.go +++ b/pkg/sql/mutations/mutations.go @@ -313,6 +313,8 @@ func statisticsMutator( return stmts, changed } +// foreignKeyMutator is a MultiStatementMutation implementation which adds +// foreign key references between existing columns. func foreignKeyMutator( rng *rand.Rand, stmts []tree.Statement, ) (mutated []tree.Statement, changed bool) { @@ -369,6 +371,10 @@ func foreignKeyMutator( // Choose a random column subset. var fkCols []*tree.ColumnTableDef for _, c := range cols[table.Table] { + if c.Computed.Computed { + // We don't support FK references from computed columns (#46672). + continue + } if usedCols[table.Table][c.Name] { continue } @@ -429,6 +435,10 @@ func foreignKeyMutator( fkCol := fkCols[len(usingCols)] found := false for refI, refCol := range availCols { + if refCol.Computed.Virtual { + // We don't support FK references to virtual columns (#51296). + continue + } fkColType := tree.MustBeStaticallyKnownType(fkCol.Type) refColType := tree.MustBeStaticallyKnownType(refCol.Type) if fkColType.Equivalent(refColType) && colinfo.ColumnTypeIsIndexable(refColType) { diff --git a/pkg/sql/opt/optbuilder/mutation_builder.go b/pkg/sql/opt/optbuilder/mutation_builder.go index 109e84646dc7..6424bed8c4f8 100644 --- a/pkg/sql/opt/optbuilder/mutation_builder.go +++ b/pkg/sql/opt/optbuilder/mutation_builder.go @@ -287,10 +287,9 @@ func (mb *mutationBuilder) buildInputForUpdate( noRowLocking, inScope, ) - mb.outScope = mb.fetchScope // Set list of columns that will be fetched by the input expression. - mb.setFetchColIDs(mb.outScope.cols) + mb.setFetchColIDs(mb.fetchScope.cols) // If there is a FROM clause present, we must join all the tables // together with the table being updated. @@ -299,18 +298,25 @@ func (mb *mutationBuilder) buildInputForUpdate( fromScope := mb.b.buildFromTables(from, noRowLocking, inScope) // Check that the same table name is not used multiple times. - mb.b.validateJoinTableNames(mb.outScope, fromScope) + mb.b.validateJoinTableNames(mb.fetchScope, fromScope) // The FROM table columns can be accessed by the RETURNING clause of the // query and so we have to make them accessible. mb.extraAccessibleCols = fromScope.cols // Add the columns in the FROM scope. + // We create a new scope so that fetchScope is not modified. It will be + // used later to build partial index predicate expressions, and we do + // not want ambiguities with column names in the FROM clause. + mb.outScope = mb.fetchScope.replace() + mb.outScope.appendColumnsFromScope(mb.fetchScope) mb.outScope.appendColumnsFromScope(fromScope) - left := mb.outScope.expr.(memo.RelExpr) + left := mb.fetchScope.expr.(memo.RelExpr) right := fromScope.expr.(memo.RelExpr) mb.outScope.expr = mb.b.factory.ConstructInnerJoin(left, right, memo.TrueFilter, memo.EmptyJoinPrivate) + } else { + mb.outScope = mb.fetchScope } // WHERE diff --git a/pkg/sql/opt/optbuilder/testdata/partial-indexes b/pkg/sql/opt/optbuilder/testdata/partial-indexes index ea840602a2fd..fde352cd0d56 100644 --- a/pkg/sql/opt/optbuilder/testdata/partial-indexes +++ b/pkg/sql/opt/optbuilder/testdata/partial-indexes @@ -339,6 +339,59 @@ update partial_indexes ├── c:7 = 'delete-only' [as=partial_index_put3:13] └── c:7 = 'write-only' [as=partial_index_put4:14] +build +UPDATE partial_indexes SET a = v.a FROM (VALUES (1), (2)) AS v(a) WHERE partial_indexes.a = v.a +---- +update partial_indexes + ├── columns: + ├── fetch columns: a:5 b:6 c:7 + ├── update-mapping: + │ └── column1:9 => a:1 + ├── partial index put columns: partial_index_put1:10 partial_index_put2:11 partial_index_put3:13 partial_index_put4:14 + ├── partial index del columns: partial_index_put1:10 partial_index_del2:12 partial_index_put3:13 partial_index_put4:14 + └── project + ├── columns: partial_index_put1:10 partial_index_put2:11 partial_index_del2:12 partial_index_put3:13 partial_index_put4:14 a:5!null b:6 c:7 crdb_internal_mvcc_timestamp:8 column1:9!null + ├── distinct-on + │ ├── columns: a:5!null b:6 c:7 crdb_internal_mvcc_timestamp:8 column1:9!null + │ ├── grouping columns: a:5!null + │ ├── select + │ │ ├── columns: a:5!null b:6 c:7 crdb_internal_mvcc_timestamp:8 column1:9!null + │ │ ├── inner-join (cross) + │ │ │ ├── columns: a:5!null b:6 c:7 crdb_internal_mvcc_timestamp:8 column1:9!null + │ │ │ ├── scan partial_indexes + │ │ │ │ ├── columns: a:5!null b:6 c:7 crdb_internal_mvcc_timestamp:8 + │ │ │ │ └── partial index predicates + │ │ │ │ ├── secondary: filters + │ │ │ │ │ └── c:7 = 'foo' + │ │ │ │ ├── secondary: filters + │ │ │ │ │ └── (a:5 > b:6) AND (c:7 = 'bar') + │ │ │ │ ├── b: filters + │ │ │ │ │ └── c:7 = 'delete-only' + │ │ │ │ └── b: filters + │ │ │ │ └── c:7 = 'write-only' + │ │ │ ├── values + │ │ │ │ ├── columns: column1:9!null + │ │ │ │ ├── (1,) + │ │ │ │ └── (2,) + │ │ │ └── filters (true) + │ │ └── filters + │ │ └── a:5 = column1:9 + │ └── aggregations + │ ├── first-agg [as=b:6] + │ │ └── b:6 + │ ├── first-agg [as=c:7] + │ │ └── c:7 + │ ├── first-agg [as=crdb_internal_mvcc_timestamp:8] + │ │ └── crdb_internal_mvcc_timestamp:8 + │ └── first-agg [as=column1:9] + │ └── column1:9 + └── projections + ├── c:7 = 'foo' [as=partial_index_put1:10] + ├── (column1:9 > b:6) AND (c:7 = 'bar') [as=partial_index_put2:11] + ├── (a:5 > b:6) AND (c:7 = 'bar') [as=partial_index_del2:12] + ├── c:7 = 'delete-only' [as=partial_index_put3:13] + └── c:7 = 'write-only' [as=partial_index_put4:14] + # Do not error with "column reference is ambiguous" when table column names # match synthesized column names. build diff --git a/pkg/sql/rowenc/testutils.go b/pkg/sql/rowenc/testutils.go index ba0ec75fd38a..c8de9a416649 100644 --- a/pkg/sql/rowenc/testutils.go +++ b/pkg/sql/rowenc/testutils.go @@ -1146,6 +1146,15 @@ func RandCreateTableWithInterleave( } } } + + // colIdx generates numbers that are incorporated into column names. + colIdx := func(ordinal int) int { + if generateColumnIndexNumber != nil { + return int(generateColumnIndexNumber()) + } + return ordinal + } + var interleaveDef *tree.InterleaveDef if interleaveIntoPK != nil && len(interleaveIntoPK.Columns) > 0 { // Make the interleave prefix, which has to be exactly the columns in the @@ -1165,11 +1174,7 @@ func RandCreateTableWithInterleave( // Loop until we generate an indexable column type. var extraCol *tree.ColumnTableDef for { - colIdx := i + prefixLength - if generateColumnIndexNumber != nil { - colIdx = int(generateColumnIndexNumber()) - } - extraCol = randColumnTableDef(rng, tableIdx, colIdx) + extraCol = randColumnTableDef(rng, tableIdx, colIdx(i+prefixLength)) extraColType := tree.MustBeStaticallyKnownType(extraCol.Type) if colinfo.ColumnTypeIsIndexable(extraColType) { break @@ -1205,12 +1210,10 @@ func RandCreateTableWithInterleave( } } else { // Make new defs from scratch. - for i := 0; i < nColumns; i++ { - colIdx := i - if generateColumnIndexNumber != nil { - colIdx = int(generateColumnIndexNumber()) - } - columnDef := randColumnTableDef(rng, tableIdx, colIdx) + nComputedColumns := randutil.RandIntInRange(rng, 0, (nColumns+1)/2) + nNormalColumns := nColumns - nComputedColumns + for i := 0; i < nNormalColumns; i++ { + columnDef := randColumnTableDef(rng, tableIdx, colIdx(i)) columnDefs = append(columnDefs, columnDef) defs = append(defs, columnDef) } @@ -1236,6 +1239,14 @@ func RandCreateTableWithInterleave( } } } + + // Make defs for computed columns. + normalColDefs := columnDefs + for i := nNormalColumns; i < nColumns; i++ { + columnDef := randComputedColumnTableDef(rng, normalColDefs, tableIdx, colIdx(i)) + columnDefs = append(columnDefs, columnDef) + defs = append(defs, columnDef) + } } // Make indexes. @@ -1374,11 +1385,15 @@ func IndexStoringMutator(rng *rand.Rand, stmts []tree.Statement) ([]tree.Stateme } return colMap } - generateStoringCols := func(rng *rand.Rand, tableCols []tree.Name, indexCols map[tree.Name]struct{}) []tree.Name { + generateStoringCols := func(rng *rand.Rand, tableInfo tableInfo, indexCols map[tree.Name]struct{}) []tree.Name { var storingCols []tree.Name - for _, col := range tableCols { - _, ok := indexCols[col] - if ok { + for colOrdinal, col := range tableInfo.columnNames { + if _, ok := indexCols[col]; ok { + // Skip PK columns and columns already in the index. + continue + } + if tableInfo.columnsTableDefs[colOrdinal].Computed.Virtual { + // Virtual columns can't be stored. continue } if rng.Intn(2) == 0 { @@ -1403,7 +1418,7 @@ func IndexStoringMutator(rng *rand.Rand, stmts []tree.Statement) ([]tree.Stateme for _, elem := range ast.Columns { indexCols[elem.Column] = struct{}{} } - ast.Storing = generateStoringCols(rng, tableInfo.columnNames, indexCols) + ast.Storing = generateStoringCols(rng, tableInfo, indexCols) changed = true } case *tree.CreateTable: @@ -1430,7 +1445,7 @@ func IndexStoringMutator(rng *rand.Rand, stmts []tree.Statement) ([]tree.Stateme for _, elem := range idx.Columns { indexCols[elem.Column] = struct{}{} } - idx.Storing = generateStoringCols(rng, tableInfo.columnNames, indexCols) + idx.Storing = generateStoringCols(rng, tableInfo, indexCols) changed = true } } @@ -1493,8 +1508,8 @@ func PartialIndexMutator(rng *rand.Rand, stmts []tree.Statement) ([]tree.Stateme return stmts, changed } -// randColumnTableDef produces a random ColumnTableDef, with a random type and -// nullability. +// randColumnTableDef produces a random ColumnTableDef for a non-computed +// column, with a random type and nullability. func randColumnTableDef(rand *rand.Rand, tableIdx int, colIdx int) *tree.ColumnTableDef { columnDef := &tree.ColumnTableDef{ // We make a unique name for all columns by prefixing them with the table @@ -1506,6 +1521,111 @@ func randColumnTableDef(rand *rand.Rand, tableIdx int, colIdx int) *tree.ColumnT return columnDef } +// randComputedColumnTableDef produces a random ColumnTableDef for a computed +// column (either STORED or VIRTUAL). The computed expressions refer to columns +// in normalColDefs. +func randComputedColumnTableDef( + rng *rand.Rand, normalColDefs []*tree.ColumnTableDef, tableIdx int, colIdx int, +) *tree.ColumnTableDef { + newDef := randColumnTableDef(rng, tableIdx, colIdx) + newDef.Computed.Computed = true + newDef.Computed.Virtual = (rng.Intn(2) == 0) + + if rng.Intn(2) == 0 { + // Try to find a set of numeric columns with the same type; the computed + // expression will be of the form "a+b+c". + var cols []*tree.ColumnTableDef + var fam types.Family + for _, idx := range rng.Perm(len(normalColDefs)) { + x := normalColDefs[idx] + xFam := x.Type.(*types.T).Family() + + if len(cols) == 0 { + switch xFam { + case types.IntFamily, types.FloatFamily, types.DecimalFamily: + fam = xFam + cols = append(cols, x) + } + } else if fam == xFam { + cols = append(cols, x) + if len(cols) > 1 && rng.Intn(2) == 0 { + break + } + } + } + if len(cols) > 1 { + var expr tree.Expr + expr = tree.NewUnresolvedName(string(cols[0].Name)) + for _, x := range cols[1:] { + expr = &tree.BinaryExpr{ + Operator: tree.Plus, + Left: expr, + Right: tree.NewUnresolvedName(string(x.Name)), + } + } + newDef.Type = cols[0].Type + newDef.Computed.Expr = expr + return newDef + } + } + + // Pick a single column and create a computed column that depends on it. + // The expression is as follows: + // - for numeric types (int, float, decimal), the expression is "x+1"; + // - for string type, the expression is "lower(x)"; + // - for types that can be cast to string in computed columns, the expression + // is "lower(x::string)"; + // - otherwise, the expression is "IF(x IS NULL, 'foo', 'bar')". + x := normalColDefs[randutil.RandIntInRange(rng, 0, len(normalColDefs))] + xTyp := x.Type.(*types.T) + + switch xTyp.Family() { + case types.IntFamily, types.FloatFamily, types.DecimalFamily: + newDef.Type = xTyp + newDef.Computed.Expr = &tree.BinaryExpr{ + Operator: tree.Plus, + Left: tree.NewUnresolvedName(string(x.Name)), + Right: RandDatum(rng, xTyp, false /* nullOk */), + } + + case types.StringFamily: + newDef.Type = types.String + newDef.Computed.Expr = &tree.FuncExpr{ + Func: tree.WrapFunction("lower"), + Exprs: tree.Exprs{tree.NewUnresolvedName(string(x.Name))}, + } + + default: + volatility, ok := tree.LookupCastVolatility(xTyp, types.String) + if ok && volatility <= tree.VolatilityImmutable { + // We can cast to string; use lower(x::string) + newDef.Type = types.String + newDef.Computed.Expr = &tree.FuncExpr{ + Func: tree.WrapFunction("lower"), + Exprs: tree.Exprs{ + &tree.CastExpr{ + Expr: tree.NewUnresolvedName(string(x.Name)), + Type: types.String, + }, + }, + } + } else { + // We cannot cast this type to string in a computed column expression. + // Use IF(x IS NULL, 'foo', 'bar'). + newDef.Type = types.String + newDef.Computed.Expr = &tree.IfExpr{ + Cond: &tree.IsNullExpr{ + Expr: tree.NewUnresolvedName(string(x.Name)), + }, + True: RandDatum(rng, types.String, true /* nullOK */), + Else: RandDatum(rng, types.String, true /* nullOK */), + } + } + } + + return newDef +} + // randIndexTableDefFromCols creates an IndexTableDef with a random subset of // the given columns and a random direction. func randIndexTableDefFromCols( diff --git a/pkg/sql/tests/random_schema_test.go b/pkg/sql/tests/random_schema_test.go index f2ba00715037..0b971d663e88 100644 --- a/pkg/sql/tests/random_schema_test.go +++ b/pkg/sql/tests/random_schema_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -46,23 +47,27 @@ func TestCreateRandomSchema(t *testing.T) { t.Fatal(err) } + toStr := func(c tree.Statement) string { + return tree.AsStringWithFlags(c, tree.FmtParsable) + } + rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) for i := 0; i < 100; i++ { - tab := rowenc.RandCreateTable(rng, "table", i) + createTable := rowenc.RandCreateTable(rng, "table", i) setDb(t, db, "test") - _, err := db.Exec(tab.String()) + _, err := db.Exec(toStr(createTable)) if err != nil { - t.Fatal(tab, err) + t.Fatal(createTable, err) } var tabName, tabStmt, secondTabStmt string if err := db.QueryRow(fmt.Sprintf("SHOW CREATE TABLE %s", - tab.Table.String())).Scan(&tabName, &tabStmt); err != nil { + createTable.Table.String())).Scan(&tabName, &tabStmt); err != nil { t.Fatal(err) } - if tabName != tab.Table.String() { - t.Fatalf("found table name %s, expected %s", tabName, tab.Table.String()) + if tabName != createTable.Table.String() { + t.Fatalf("found table name %s, expected %s", tabName, createTable.Table.String()) } // Reparse the show create table statement that's stored in the database. @@ -75,7 +80,7 @@ func TestCreateRandomSchema(t *testing.T) { // Now run the SHOW CREATE TABLE statement we found on a new db and verify // that both tables are the same. - _, err = db.Exec(parsed.AST.String()) + _, err = db.Exec(tree.AsStringWithFlags(parsed.AST, tree.FmtParsable)) if err != nil { t.Fatal(parsed.AST, err) } @@ -85,21 +90,21 @@ func TestCreateRandomSchema(t *testing.T) { t.Fatal(err) } - if tabName != tab.Table.String() { - t.Fatalf("found table name %s, expected %s", tabName, tab.Table.String()) + if tabName != createTable.Table.String() { + t.Fatalf("found table name %s, expected %s", tabName, createTable.Table.String()) } // Reparse the show create table statement that's stored in the database. secondParsed, err := parser.ParseOne(secondTabStmt) if err != nil { t.Fatalf("error parsing show create table: %s", err) } - if parsed.AST.String() != secondParsed.AST.String() { + if toStr(parsed.AST) != toStr(secondParsed.AST) { t.Fatalf("for input statement\n%s\nfound first output\n%q\nbut second output\n%q", - tab.String(), parsed.AST.String(), secondParsed.AST.String()) + toStr(createTable), toStr(parsed.AST), toStr(secondParsed.AST)) } if tabStmt != secondTabStmt { t.Fatalf("for input statement\n%s\nfound first output\n%q\nbut second output\n%q", - tab.String(), tabStmt, secondTabStmt) + toStr(createTable), tabStmt, secondTabStmt) } } } diff --git a/pkg/util/tracing/BUILD.bazel b/pkg/util/tracing/BUILD.bazel index 7be975183dd2..87e35eeee0a7 100644 --- a/pkg/util/tracing/BUILD.bazel +++ b/pkg/util/tracing/BUILD.bazel @@ -67,6 +67,7 @@ go_test( "//pkg/util/iterutil", "//pkg/util/leaktest", "//pkg/util/stop", + "//pkg/util/timeutil", "//pkg/util/tracing/tracingpb", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index 4c3b4b6c5f9d..98e3c237e365 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/logtags" "github.com/gogo/protobuf/types" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" ) // crdbSpan is a span for internal crdb usage. This is used to power SQL session @@ -48,7 +47,12 @@ type crdbSpan struct { // tag's key to a user. logTags *logtags.Buffer - mu crdbSpanMu + mu crdbSpanMu + testing *testingKnob +} + +type testingKnob struct { + clock timeutil.TimeSource } type crdbSpanMu struct { @@ -56,13 +60,20 @@ type crdbSpanMu struct { // duration is initialized to -1 and set on Finish(). duration time.Duration - // recording maintains state once StartRecording() is called. recording struct { // recordingType is the recording type of the ongoing recording, if any. // Its 'load' method may be called without holding the surrounding mutex, // but its 'swap' method requires the mutex. recordingType atomicRecordingType - recordedLogs []opentracing.LogRecord + + logs sizeLimitedBuffer // of *tracingpb.LogRecords + structured sizeLimitedBuffer // of Structured events + + // dropped is true if the span has capped out it's memory limits for + // logs and structured events, and has had to drop some. It's used to + // annotate recordings with the _dropped tag, when applicable. + dropped bool + // children contains the list of child spans started after this Span // started recording. children []*crdbSpan @@ -79,12 +90,27 @@ type crdbSpanMu struct { // those that were set before recording started)? tags opentracing.Tags - structured ring.Buffer // of Structured events - // The Span's associated baggage. baggage map[string]string } +func newSizeLimitedBuffer(limit int64) sizeLimitedBuffer { + return sizeLimitedBuffer{ + limit: limit, + } +} + +type sizeLimitedBuffer struct { + ring.Buffer + size int64 // in bytes + limit int64 // in bytes +} + +func (b *sizeLimitedBuffer) Reset() { + b.Buffer.Reset() + b.size = 0 +} + func (s *crdbSpan) recordingType() RecordingType { if s == nil { return RecordingOff @@ -122,7 +148,10 @@ func (s *crdbSpan) resetRecording() { s.mu.Lock() defer s.mu.Unlock() - s.mu.recording.recordedLogs = nil + s.mu.recording.logs.Reset() + s.mu.recording.structured.Reset() + s.mu.recording.dropped = false + s.mu.recording.children = nil s.mu.recording.remoteSpans = nil } @@ -209,26 +238,54 @@ func (s *crdbSpan) record(msg string) { return } - s.mu.Lock() - defer s.mu.Unlock() - if len(s.mu.recording.recordedLogs) < maxLogsPerSpan { - s.mu.recording.recordedLogs = append(s.mu.recording.recordedLogs, opentracing.LogRecord{ - Timestamp: time.Now(), - Fields: []otlog.Field{ - otlog.String(tracingpb.LogMessageField, msg), - }, - }) + var now time.Time + if s.testing != nil { + now = s.testing.clock.Now() + } else { + now = time.Now() } + logRecord := &tracingpb.LogRecord{ + Time: now, + Fields: []tracingpb.LogRecord_Field{ + {Key: tracingpb.LogMessageField, Value: msg}, + }, + } + + s.recordInternal(logRecord, &s.mu.recording.logs) } func (s *crdbSpan) recordStructured(item Structured) { + s.recordInternal(item, &s.mu.recording.structured) +} + +// sizable is a subset for protoutil.Message, for payloads (log records and +// structured events) that can be recorded. +type sizable interface { + Size() int +} + +func (s *crdbSpan) recordInternal(payload sizable, buffer *sizeLimitedBuffer) { s.mu.Lock() defer s.mu.Unlock() - if s.mu.structured.Len() == maxStructuredEventsPerSpan { - s.mu.structured.RemoveLast() + size := int64(payload.Size()) + if size > buffer.limit { + // The incoming payload alone blows past the memory limit. Let's just + // drop it. + s.mu.recording.dropped = true + return + } + + buffer.size += size + if buffer.size > buffer.limit { + s.mu.recording.dropped = true } - s.mu.structured.AddFirst(item) + for buffer.size > buffer.limit { + first := buffer.GetFirst().(sizable) + buffer.RemoveFirst() + buffer.size -= int64(first.Size()) + } + buffer.AddLast(payload) } func (s *crdbSpan) setBaggageItemAndTag(restrictedKey, value string) { @@ -294,12 +351,15 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan { if s.mu.recording.recordingType.load() == RecordingVerbose { addTag("_verbose", "1") } + if s.mu.recording.dropped { + addTag("_dropped", "1") + } } - if numEvents := s.mu.structured.Len(); numEvents != 0 { + if numEvents := s.mu.recording.structured.Len(); numEvents != 0 { rs.InternalStructured = make([]*types.Any, 0, numEvents) for i := 0; i < numEvents; i++ { - event := s.mu.structured.Get(i).(Structured) + event := s.mu.recording.structured.Get(i).(Structured) item, err := types.MarshalAny(event) if err != nil { // An error here is an error from Marshal; these @@ -330,15 +390,11 @@ func (s *crdbSpan) getRecordingLocked(wantTags bool) tracingpb.RecordedSpan { } } - rs.Logs = make([]tracingpb.LogRecord, len(s.mu.recording.recordedLogs)) - for i, r := range s.mu.recording.recordedLogs { - rs.Logs[i].Time = r.Timestamp - rs.Logs[i].Fields = make([]tracingpb.LogRecord_Field, len(r.Fields)) - for j, f := range r.Fields { - rs.Logs[i].Fields[j] = tracingpb.LogRecord_Field{ - Key: f.Key(), - Value: fmt.Sprint(f.Value()), - } + if numLogs := s.mu.recording.logs.Len(); numLogs != 0 { + rs.Logs = make([]tracingpb.LogRecord, numLogs) + for i := 0; i < numLogs; i++ { + lr := s.mu.recording.logs.Get(i).(*tracingpb.LogRecord) + rs.Logs[i] = *lr } } diff --git a/pkg/util/tracing/shadow.go b/pkg/util/tracing/shadow.go index db7d6f9ffc44..62c6b944aac2 100644 --- a/pkg/util/tracing/shadow.go +++ b/pkg/util/tracing/shadow.go @@ -106,7 +106,7 @@ func makeShadowSpan( func createLightStepTracer(token string) (shadowTracerManager, opentracing.Tracer) { return lightStepManager{}, lightstep.NewTracer(lightstep.Options{ AccessToken: token, - MaxLogsPerSpan: maxLogsPerSpan, + MaxLogsPerSpan: maxLogsPerSpanExternal, MaxBufferedSpans: 10000, UseGRPC: true, }) diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index d1ab1e66e749..db36b5044d7a 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -94,6 +94,12 @@ func (sp *Span) Finish() { // As a performance optimization, GetRecording does not return tags when the // underlying Span is not verbose. Returning tags requires expensive // stringification. +// +// A few internal tags are added to denote span properties: +// +// "_unfinished" The span was never Finish()ed +// "_verbose" The span is a verbose one +// "_dropped" The span dropped recordings due to sizing constraints func (sp *Span) GetRecording() Recording { // It's always valid to get the recording, even for a finished span. return sp.i.GetRecording() @@ -149,7 +155,8 @@ func (sp *Span) IsVerbose() bool { return sp.i.IsVerbose() } -// Record provides a way to record free-form text into verbose spans. +// Record provides a way to record free-form text into verbose spans. Recordings +// may be dropped due to sizing constraints. // // TODO(irfansharif): We don't currently have redactability with trace // recordings (both here, and using RecordStructured above). We'll want to do this @@ -171,7 +178,8 @@ func (sp *Span) Recordf(format string, args ...interface{}) { // RecordStructured adds a Structured payload to the Span. It will be added to // the recording even if the Span is not verbose; however it will be discarded -// if the underlying Span has been optimized out (i.e. is a noop span). +// if the underlying Span has been optimized out (i.e. is a noop span). Payloads +// may also be dropped due to sizing constraints. // // The caller must not mutate the item once RecordStructured has been called. func (sp *Span) RecordStructured(item Structured) { diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index fae2aaa2e192..f6a0c0ff5203 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -18,6 +18,8 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/gogo/protobuf/types" @@ -34,8 +36,14 @@ func TestRecordingString(t *testing.T) { root := tr.StartSpan("root", WithForceRealSpan()) root.SetVerbose(true) root.Record("root 1") - // Hackily fix the timing on the first log message, so that we can check it later. - root.i.crdb.mu.recording.recordedLogs[0].Timestamp = root.i.crdb.startTime.Add(time.Millisecond) + { + // Hackily fix the timing on the first log message, so that we can check it later. + r := root.i.crdb.mu.recording.logs.GetFirst().(*tracingpb.LogRecord) + r.Time = root.i.crdb.startTime.Add(time.Millisecond) + root.i.crdb.mu.recording.logs.RemoveFirst() + root.i.crdb.mu.recording.logs.AddFirst(r) + } + // Sleep a bit so that everything that comes afterwards has higher timestamps // than the one we just assigned. Otherwise the sorting will be screwed up. time.Sleep(10 * time.Millisecond) @@ -214,31 +222,169 @@ func TestSpanRecordStructured(t *testing.T) { `)) } +// TestSpanRecordStructuredLimit tests recording behavior when the size of +// structured data recorded into the span exceeds the configured limit. func TestSpanRecordStructuredLimit(t *testing.T) { tr := NewTracer() sp := tr.StartSpan("root", WithForceRealSpan()) defer sp.Finish() + pad := func(i int) string { return fmt.Sprintf("%06d", i) } + payload := func(i int) Structured { return &types.StringValue{Value: pad(i)} } + + numPayloads := maxStructuredBytesPerSpan / payload(42).Size() const extra = 10 - for i := int32(1); i <= maxStructuredEventsPerSpan+extra; i++ { - sp.RecordStructured(&types.Int32Value{Value: i}) + for i := 1; i <= numPayloads+extra; i++ { + sp.RecordStructured(payload(i)) } + + sp.SetVerbose(true) rec := sp.GetRecording() require.Len(t, rec, 1) - require.Len(t, rec[0].InternalStructured, maxStructuredEventsPerSpan) + require.Len(t, rec[0].InternalStructured, numPayloads) + require.Equal(t, "1", rec[0].Tags["_dropped"]) first := rec[0].InternalStructured[0] last := rec[0].InternalStructured[len(rec[0].InternalStructured)-1] var d1 types.DynamicAny require.NoError(t, types.UnmarshalAny(first, &d1)) - require.IsType(t, (*types.Int32Value)(nil), d1.Message) + require.IsType(t, (*types.StringValue)(nil), d1.Message) + + var res string + require.NoError(t, types.StdStringUnmarshal(&res, first.Value)) + require.Equal(t, pad(extra+1), res) + + require.NoError(t, types.StdStringUnmarshal(&res, last.Value)) + require.Equal(t, pad(numPayloads+extra), res) +} + +// TestSpanRecordLimit tests recording behavior when the amount of data logged +// into the span exceeds the configured limit. +func TestSpanRecordLimit(t *testing.T) { + // Logs include the timestamp, and we want to fix them so they're not + // variably sized (needed for the test below). + clock := &timeutil.ManualTime{} + tr := NewTracer() + tr.testing = &testingKnob{clock} + + sp := tr.StartSpan("root", WithForceRealSpan()) + defer sp.Finish() + sp.SetVerbose(true) + + msg := func(i int) string { return fmt.Sprintf("msg: %10d", i) } + + // Determine the size of a log record by actually recording once. + sp.Record(msg(42)) + logSize := sp.GetRecording()[0].Logs[0].Size() + sp.ResetRecording() + + numLogs := maxLogBytesPerSpan / logSize + const extra = 10 + for i := 1; i <= numLogs+extra; i++ { + sp.Record(msg(i)) + } + + rec := sp.GetRecording() + require.Len(t, rec, 1) + require.Len(t, rec[0].Logs, numLogs) + require.Equal(t, rec[0].Tags["_dropped"], "1") + + first := rec[0].Logs[0] + last := rec[0].Logs[len(rec[0].Logs)-1] + + require.Equal(t, first.Fields[0].Value, msg(extra+1)) + require.Equal(t, last.Fields[0].Value, msg(numLogs+extra)) +} + +// testStructuredImpl is a testing implementation of Structured event. +type testStructuredImpl struct { + *types.Int32Value +} + +var _ Structured = &testStructuredImpl{} - var res int32 - require.NoError(t, types.StdInt32Unmarshal(&res, first.Value)) - require.Equal(t, res, int32(maxStructuredEventsPerSpan+extra)) +func (t *testStructuredImpl) String() string { + return fmt.Sprintf("structured=%d", t.Value) +} + +func newTestStructured(i int) *testStructuredImpl { + return &testStructuredImpl{ + &types.Int32Value{Value: int32(i)}, + } +} + +// TestSpanReset checks that resetting a span clears out existing recordings. +func TestSpanReset(t *testing.T) { + // Logs include the timestamp, and we want to fix them so they're not + // variably sized (needed for the test below). + clock := &timeutil.ManualTime{} + tr := NewTracer() + tr.testing = &testingKnob{clock} + + sp := tr.StartSpan("root", WithForceRealSpan()) + defer sp.Finish() + sp.SetVerbose(true) + + for i := 1; i <= 10; i++ { + if i%2 == 0 { + sp.RecordStructured(newTestStructured(i)) + } else { + sp.Record(fmt.Sprintf("%d", i)) + } + } + + require.NoError(t, TestingCheckRecordedSpans(sp.GetRecording(), ` + span: root + tags: _unfinished=1 _verbose=1 + event: 1 + event: structured=2 + event: 3 + event: structured=4 + event: 5 + event: structured=6 + event: 7 + event: structured=8 + event: 9 + event: structured=10 + `)) + require.NoError(t, TestingCheckRecording(sp.GetRecording(), ` + === operation:root _unfinished:1 _verbose:1 + event:1 + event:structured=2 + event:3 + event:structured=4 + event:5 + event:structured=6 + event:7 + event:structured=8 + event:9 + event:structured=10 + `)) + + sp.ResetRecording() + + require.NoError(t, TestingCheckRecordedSpans(sp.GetRecording(), ` + span: root + tags: _unfinished=1 _verbose=1 + `)) + require.NoError(t, TestingCheckRecording(sp.GetRecording(), ` + === operation:root _unfinished:1 _verbose:1 + `)) + + msg := func(i int) string { return fmt.Sprintf("msg: %010d", i) } + sp.Record(msg(42)) + logSize := sp.GetRecording()[0].Logs[0].Size() + numLogs := maxLogBytesPerSpan / logSize + const extra = 10 + + for i := 1; i <= numLogs+extra; i++ { + sp.Record(msg(i)) + } - require.NoError(t, types.StdInt32Unmarshal(&res, last.Value)) - require.Equal(t, res, int32(extra+1)) + require.Equal(t, sp.GetRecording()[0].Tags["_dropped"], "1") + sp.ResetRecording() + _, found := sp.GetRecording()[0].Tags["_dropped"] + require.False(t, found) } func TestNonVerboseChildSpanRegisteredWithParent(t *testing.T) { diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index 8e69a7ca157f..e581666b3b19 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -41,17 +41,18 @@ import ( const verboseTracingBaggageKey = "sb" const ( - // maxLogsPerSpan limits the number of logs in a Span; use a comfortable - // limit. - maxLogsPerSpan = 1000 - // maxStructuredEventsPerSpan limits the number of structured events in a - // span; use a comfortable limit. - maxStructuredEventsPerSpan = 50 + // maxRecordedBytesPerSpan limits the size of logs and structured in a span; + // use a comfortable limit. + maxLogBytesPerSpan = 256 * (1 << 10) // 256 KiB + maxStructuredBytesPerSpan = 10 * (1 << 10) // 10 KiB // maxChildrenPerSpan limits the number of (direct) child spans in a Span. maxChildrenPerSpan = 1000 // maxSpanRegistrySize limits the number of local root spans tracked in // a Tracer's registry. maxSpanRegistrySize = 5000 + // maxLogsPerSpanExternal limits the number of logs in a Span for external + // tracers (net/trace, lightstep); use a comfortable limit. + maxLogsPerSpanExternal = 1000 ) // These constants are used to form keys to represent tracing context @@ -146,6 +147,8 @@ type Tracer struct { TracingVerbosityIndependentSemanticsIsActive func() bool includeAsyncSpansInRecordings bool // see TestingIncludeAsyncSpansInRecordings + + testing *testingKnob } // NewTracer creates a Tracer. It initially tries to run with minimal overhead @@ -321,7 +324,7 @@ func (t *Tracer) startSpanGeneric( var netTr trace.Trace if t.useNetTrace() { netTr = trace.New("tracing", opName) - netTr.SetMaxEvents(maxLogsPerSpan) + netTr.SetMaxEvents(maxLogsPerSpanExternal) // If LogTags are given, pass them as tags to the shadow span. // Regular tags are populated later, via the top-level Span. @@ -367,8 +370,10 @@ func (t *Tracer) startSpanGeneric( mu: crdbSpanMu{ duration: -1, // unfinished }, + testing: t.testing, } - helper.crdbSpan.mu.structured.Reserve(maxStructuredEventsPerSpan) + helper.crdbSpan.mu.recording.logs = newSizeLimitedBuffer(maxLogBytesPerSpan) + helper.crdbSpan.mu.recording.structured = newSizeLimitedBuffer(maxStructuredBytesPerSpan) helper.span.i = spanInner{ tracer: t, crdb: &helper.crdbSpan, diff --git a/pkg/util/tracing/tracer_test.go b/pkg/util/tracing/tracer_test.go index 9fd05bc9c0aa..dc0cefd11471 100644 --- a/pkg/util/tracing/tracer_test.go +++ b/pkg/util/tracing/tracer_test.go @@ -351,7 +351,7 @@ func TestShadowTracer(t *testing.T) { Port: 65535, Plaintext: true, }, - MaxLogsPerSpan: maxLogsPerSpan, + MaxLogsPerSpan: maxLogsPerSpanExternal, UseGRPC: true, }), },