From 0eaed19428ec194368685eaa5eda24904e65a936 Mon Sep 17 00:00:00 2001
From: anzoteh96
Date: Wed, 1 Jul 2020 20:53:41 -0400
Subject: [PATCH 1/4] importccl: support `unique_rowid()` as default expression
 for IMPORT INTO

PR #50295 added support for non-targeted columns with constant default
expressions. This PR is a follow-up that adds support for `unique_rowid()`.

Previously, the only support for rowid as a default expression was for the
hidden column, whose value is a function of timestamp, row number, and
source ID (the ID of the processor). To accommodate wider usage of
unique_rowid(), this PR makes unique_rowid a function of timestamp, row
number, source ID, the total number of unique_rowid occurrences in the table
schema, and the instance of each unique_rowid within the row.

In addition, this PR modifies the visitor from #51390 by adding overrides for
volatile functions like unique_rowid. Annotations containing the total number
of unique_rowid occurrences and the unique_rowid instance within the current
row are stored inside evalCtx; they are read and updated when the visitor
walks the default expression at the sanitization stage, and when the default
expression is evaluated for each row.

Partially addresses #48253

Release note (general change): IMPORT INTO now supports `unique_rowid()` as a
default expression.
---
 pkg/ccl/importccl/import_stmt_test.go     |  85 ++++++-
 pkg/ccl/importccl/read_import_base.go     |   2 +-
 pkg/ccl/importccl/testdata/pgdump/geo.sql |   8 +-
 pkg/sql/row/row_converter.go              | 280 +++++++++++++++-------
 4 files changed, 279 insertions(+), 96 deletions(-)

diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index bba91dac1c6c..ff7d3f2adf1b 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -2946,6 +2946,10 @@ func BenchmarkCSVConvertRecord(b *testing.B) {
 	b.ReportAllocs()
 }
 
+func selectNotNull(col string) string {
+	return fmt.Sprintf(`SELECT %s FROM t WHERE %s IS NOT NULL`, col, col)
+}
+
 // Test that IMPORT INTO works when columns with default expressions are present.
 // The default expressions supported by IMPORT INTO are constant expressions,
 // which are literals and functions that always return the same value given the
@@ -2958,6 +2962,10 @@ func TestImportDefault(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	const nodes = 3
+	numFiles := nodes + 2
+	rowsPerFile := 1000
+	rowsPerRaceFile := 16
+	testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerRaceFile)
 
 	ctx := context.Background()
 	baseDir := filepath.Join("testdata", "csv")
@@ -3231,6 +3239,64 @@ func TestImportDefault(t *testing.T) {
 			})
 		}
 	})
+	t.Run("unique_rowid", func(t *testing.T) {
+		const M = int(1e9 + 7) // Remainder for unique_rowid addition.
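+		// Illustrative note (an editorial summary, not part of the original
+		// test): the rowid that IMPORT generates is roughly
+		//   (1 << 63) | ((sourceID << rowIDBits) ^ (total*rowIndex + instance))
+		// (see importUniqueRowID in pkg/sql/row/row_converter.go), so its top
+		// bit is always set; adding two raw unique_rowid() values therefore
+		// overflows, which is why the rowid+rowid case below reduces each one
+		// modulo M first.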
+ testCases := []struct { + name string + create string + targetCols []string + insert string + rowIDCols []string + }{ + { + name: "multiple_unique_rowid", + create: "a INT DEFAULT unique_rowid(), b INT, c STRING, d INT DEFAULT unique_rowid()", + targetCols: []string{"b", "c"}, + insert: "INSERT INTO t (b, c) VALUES (3, 'CAT'), (4, 'DOG')", + rowIDCols: []string{selectNotNull("a"), selectNotNull("d")}, + }, + { + name: "unique_rowid_with_pk", + create: "a INT DEFAULT unique_rowid(), b INT PRIMARY KEY, c STRING", + targetCols: []string{"b", "c"}, + insert: "INSERT INTO t (b, c) VALUES (-3, 'CAT'), (-4, 'DOG')", + rowIDCols: []string{selectNotNull("a")}, + }, + { + // unique_rowid()+unique_rowid() won't work as the rowid produced by import + // has its leftmost bit set to 1, and adding them causes overflow. A way to + // get around is to have each unique_rowid() modulo a number, M. Here M = 1e9+7 + // is used here given that it's big enough and is a prime, which is + // generally effective in avoiding collisions. + name: "rowid+rowid", + create: fmt.Sprintf( + `a INT DEFAULT (unique_rowid() %% %d) + (unique_rowid() %% %d), b INT PRIMARY KEY, c STRING`, M, M), + targetCols: []string{"b", "c"}, + rowIDCols: []string{selectNotNull("a")}, + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + defer sqlDB.Exec(t, `DROP TABLE t`) + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(%s)`, test.create)) + if test.insert != "" { + sqlDB.Exec(t, test.insert) + } + sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA (%s)`, + strings.Join(test.targetCols, ", "), + strings.Join(testFiles.files, ", "))) + var numDistinctRows int + sqlDB.QueryRow(t, + fmt.Sprintf(`SELECT DISTINCT COUNT (*) FROM (%s)`, + strings.Join(test.rowIDCols, " UNION ")), + ).Scan(&numDistinctRows) + var numRows int + sqlDB.QueryRow(t, `SELECT COUNT (*) FROM t`).Scan(&numRows) + require.Equal(t, numDistinctRows, len(test.rowIDCols)*numRows) + }) + + } + }) } // goos: darwin @@ -4369,14 +4435,21 @@ func TestImportPgDumpGeo(t *testing.T) { // Verify both created tables are identical. importCreate := sqlDB.QueryStr(t, "SELECT create_statement FROM [SHOW CREATE importdb.nyc_census_blocks]") - // Families are slightly different due to the geom column being last - // in exec and rowid being last in import, so swap that in import to - // match exec. - importCreate[0][0] = strings.Replace(importCreate[0][0], "geom, rowid", "rowid, geom", 1) + // Families are slightly different due to rowid showing up in exec but + // not import (possibly due to the ALTER TABLE statement that makes + // gid a primary key), so add that into import to match exec. 
+ importCreate[0][0] = strings.Replace(importCreate[0][0], "boroname, geom", "boroname, rowid, geom", 1) sqlDB.CheckQueryResults(t, "SELECT create_statement FROM [SHOW CREATE execdb.nyc_census_blocks]", importCreate) - importSelect := sqlDB.QueryStr(t, "SELECT * FROM importdb.nyc_census_blocks ORDER BY PRIMARY KEY importdb.nyc_census_blocks") - sqlDB.CheckQueryResults(t, "SELECT * FROM execdb.nyc_census_blocks ORDER BY PRIMARY KEY execdb.nyc_census_blocks", importSelect) + importCols := "blkid, popn_total, popn_white, popn_black, popn_nativ, popn_asian, popn_other, boroname" + importSelect := sqlDB.QueryStr(t, fmt.Sprintf( + "SELECT (%s) FROM importdb.nyc_census_blocks ORDER BY PRIMARY KEY importdb.nyc_census_blocks", + importCols, + )) + sqlDB.CheckQueryResults(t, fmt.Sprintf( + "SELECT (%s) FROM execdb.nyc_census_blocks ORDER BY PRIMARY KEY execdb.nyc_census_blocks", + importCols, + ), importSelect) } func TestImportCockroachDump(t *testing.T) { diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go index 1d2340b547e0..6ceb06e1b254 100644 --- a/pkg/ccl/importccl/read_import_base.go +++ b/pkg/ccl/importccl/read_import_base.go @@ -408,7 +408,7 @@ func makeDatumConverter( ctx context.Context, importCtx *parallelImportContext, fileCtx *importFileContext, ) (*row.DatumRowConverter, error) { conv, err := row.NewDatumRowConverter( - ctx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx.Copy(), importCtx.kvCh) + ctx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx, importCtx.kvCh) if err == nil { conv.KvBatch.Source = fileCtx.source } diff --git a/pkg/ccl/importccl/testdata/pgdump/geo.sql b/pkg/ccl/importccl/testdata/pgdump/geo.sql index b0068f68eb2b..3d030809629d 100644 --- a/pkg/ccl/importccl/testdata/pgdump/geo.sql +++ b/pkg/ccl/importccl/testdata/pgdump/geo.sql @@ -1,11 +1,7 @@ --- The two comments below removing gid are there because IMPORT doesn't --- support DEFAULT functions (#48253). This function is otherwise exactly --- what shp2pgsql produces. 
- SET CLIENT_ENCODING TO UTF8; SET STANDARD_CONFORMING_STRINGS TO ON; BEGIN; -CREATE TABLE "nyc_census_blocks" (--gid serial, +CREATE TABLE "nyc_census_blocks" (gid serial, "blkid" varchar(15), "popn_total" float8, "popn_white" float8, @@ -14,7 +10,7 @@ CREATE TABLE "nyc_census_blocks" (--gid serial, "popn_asian" float8, "popn_other" float8, "boroname" varchar(32)); ---ALTER TABLE "nyc_census_blocks" ADD PRIMARY KEY (gid); +ALTER TABLE "nyc_census_blocks" ADD PRIMARY KEY (gid); SELECT AddGeometryColumn('','nyc_census_blocks','geom','26918','MULTIPOLYGON',2); INSERT INTO "nyc_census_blocks" ("blkid","popn_total","popn_white","popn_black","popn_nativ","popn_asian","popn_other","boroname",geom) VALUES ('360850009001000','97','51','32','1','5','8','Staten Island','010600002026690000010000000103000000010000000A00000051AC161881A22141A31409CF1F2A51415F4321458DA2214100102A3F1D2A51418C34807C0BA221414E3E89F5122A5141782D605495A12141780D1CE92A2A51410D1C9C6770A121410F2D6074322A5141441560E0B0A02141A00099C72F2A51412365B4789AA021419F60A7BB342A514160E3E8FA66A0214118B4C0CE402A5141EA4BF3EEC7A12141A3023D61452A514151AC161881A22141A31409CF1F2A5141'); INSERT INTO "nyc_census_blocks" ("blkid","popn_total","popn_white","popn_black","popn_nativ","popn_asian","popn_other","boroname",geom) VALUES ('360850020011000','66','52','2','0','7','5','Staten Island','0106000020266900000100000001030000000100000007000000083B4A6F79A8214127EC57B49926514151B51BB7CEA72141B2EAD6F38A2651416F429640B9A72141449FCB1C89265141163AA64D56A72141B89E2B7C9B26514150509213EDA72141DCC9A351A826514184FA4C6017A82141B9AE24F0AB265141083B4A6F79A8214127EC57B499265141'); diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index d0dd375344f0..81df52867637 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -22,21 +22,136 @@ import ( "github.com/cockroachdb/errors" ) +// For some functions (specifically the volatile ones), we do +// not want to use the provided builtin. Instead, we opt for +// our own function definition, which produces deterministic results. +func makeBuiltinOverride( + builtin *tree.FunctionDefinition, overloads ...tree.Overload, +) *tree.FunctionDefinition { + props := builtin.FunctionProperties + return tree.NewFunctionDefinition( + "import."+builtin.Name, &props, overloads) +} + +type overrideVolatility bool + +const ( + // cellInfoAddr is the address used to store relevant information + // in the Annotation field of evalCtx when evaluating expressions. + cellInfoAddr tree.AnnotationIdx = iota + 1 + // The following constants are the override volatility constants to + // decide whether a default expression can be evaluated at the new + // datum converter stage. Note that overrideErrorTerm is a placeholder + // to be returned when an error is returned at sanitizeExprForImport. 
+ overrideErrorTerm overrideVolatility = false + overrideImmutable overrideVolatility = false + overrideVolatile overrideVolatility = true +) + +type cellInfoAnnotation struct { + sourceID int32 + rowID int64 + uniqueRowIDInstance int + uniqueRowIDTotal int +} + +func getCellInfoAnnotation(t *tree.Annotations) *cellInfoAnnotation { + return t.Get(cellInfoAddr).(*cellInfoAnnotation) +} + +func (c *cellInfoAnnotation) Reset(sourceID int32, rowID int64) { + c.sourceID = sourceID + c.rowID = rowID + c.uniqueRowIDInstance = 0 +} + +// We don't want to call unique_rowid() for columns with such default expressions +// because it is not idempotent and has unfortunate overlapping of output +// spans since it puts the uniqueness-ensuring per-generator part (nodeID) +// in the low-bits. Instead, make our own IDs that attempt to keep each +// generator (sourceID) writing to its own key-space with sequential +// rowIndexes mapping to sequential unique IDs. This is done by putting the +// following as the lower bits, in order to handle the case where there are +// multiple columns with default as `unique_rowid`: +// +// #default_rowid_cols * rowIndex + colPosition (among those with default unique_rowid) +// +// To avoid collisions with the SQL-genenerated IDs (at least for a +// very long time) we also flip the top bit to 1. +// +// Producing sequential keys in non-overlapping spans for each source yields +// observed improvements in ingestion performance of ~2-3x and even more +// significant reductions in required compactions during IMPORT. +// +// TODO(dt): Note that currently some callers (e.g. CSV IMPORT, which can be +// used on a table more than once) offset their rowIndex by a wall-time at +// which their overall job is run, so that subsequent ingestion jobs pick +// different row IDs for the i'th row and don't collide. However such +// time-offset rowIDs mean each row imported consumes some unit of time that +// must then elapse before the next IMPORT could run without colliding e.g. +// a 100m row file would use 10µs/row or ~17min worth of IDs. For now it is +// likely that IMPORT's write-rate is still the limiting factor, but this +// scheme means rowIndexes are very large (1 yr in 10s of µs is about 2^42). +// Finding an alternative scheme for avoiding collisions (like sourceID * +// fileIndex*desc.Version) could improve on this. For now, if this +// best-effort collision avoidance scheme doesn't work in some cases we can +// just recommend an explicit PK as a workaround. +// +// TODO(anzoteh96): As per the issue in #51004, having too many columns with +// default expression unique_rowid() could cause collisions when IMPORTs are run +// too close to each other. It will therefore be nice to fix this problem. +func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) { + c := getCellInfoAnnotation(evalCtx.Annotations) + avoidCollisionsWithSQLsIDs := uint64(1 << 63) + shiftedIndex := int64(c.uniqueRowIDTotal)*c.rowID + int64(c.uniqueRowIDInstance) + returnIndex := (uint64(c.sourceID) << rowIDBits) ^ uint64(shiftedIndex) + c.uniqueRowIDInstance++ + evalCtx.Annotations.Set(cellInfoAddr, c) + return tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | returnIndex)), nil +} + +// Besides overriding, there are also counters that we want to keep track +// of as we walk through the expressions in a row (at datumRowConverter creation +// time). This will be handled by the sideEffect field: it will be called with an +// annotation passed in that changes the counter. 
In the case of unique_rowid, for +// example, we want to keep track of the total number of unique_rowid occurrences +// in a row. +type customFunc struct { + visitorSideEffect func(annotations *tree.Annotations) + override *tree.FunctionDefinition +} + +var useDefaultBuiltin *customFunc + // Given that imports can be retried and resumed, we want to // ensure that the default functions return the same value given // the same arguments, even on retries. Therfore we decide to support // only a limited subset of non-immutable functions, which are // all listed here. -var supportedImportFunctions = map[string]struct{}{ +var supportedImportFuncOverrides = map[string]*customFunc{ // These methods can be supported given that we set the statement // and transaction timestamp to be equal, i.e. the write timestamp. - "current_date": {}, - "current_timestamp": {}, - "localtimestamp": {}, - "now": {}, - "statement_timestamp": {}, - "timeofday": {}, - "transaction_timestamp": {}, + "current_date": useDefaultBuiltin, + "current_timestamp": useDefaultBuiltin, + "localtimestamp": useDefaultBuiltin, + "now": useDefaultBuiltin, + "statement_timestamp": useDefaultBuiltin, + "timeofday": useDefaultBuiltin, + "transaction_timestamp": useDefaultBuiltin, + "unique_rowid": { + visitorSideEffect: func(annot *tree.Annotations) { + getCellInfoAnnotation(annot).uniqueRowIDTotal++ + }, + override: makeBuiltinOverride( + tree.FunDefs["unique_rowid"], + tree.Overload{ + Types: tree.ArgTypes{}, + ReturnType: tree.FixedReturnType(types.Int), + Fn: importUniqueRowID, + Info: "Returns a unique rowid based on row position and time", + }, + ), + }, } func unsafeExpressionError(err error, msg string, expr string) error { @@ -62,7 +177,13 @@ func (e *unsafeErrExpr) Eval(_ *tree.EvalContext) (tree.Datum, error) { // visitor walks the tree and ensures that any expression in the tree // that's not immutable is what we explicitly support. type importDefaultExprVisitor struct { - err error + err error + ctx context.Context + annotations *tree.Annotations + semaCtx *tree.SemaContext + // The volatility flag will be set if there's at least one volatile + // function appearing in the default expression. + volatility overrideVolatility } // VisitPre implements tree.Visitor interface. @@ -79,40 +200,67 @@ func (v *importDefaultExprVisitor) VisitPost(expr tree.Expr) (newExpr tree.Expr) case *tree.FuncExpr: if fn.ResolvedOverload().Volatility > tree.VolatilityImmutable { resolvedFnName := fn.Func.FunctionReference.(*tree.FunctionDefinition).Name - if _, ok := supportedImportFunctions[resolvedFnName]; !ok { + if custom, isSafe := supportedImportFuncOverrides[resolvedFnName]; !isSafe { v.err = errors.Newf(`function %s unsupported by IMPORT INTO`, resolvedFnName) + } else { + if custom == useDefaultBuiltin { + // No override exists, means it's okay to use the definitions given in + // builtin.go. + return expr + } + // Override exists, so we turn the volatility flag of the visitor to true. + // In addition, the sideEffect function needs to be called to update any + // relevant counter (e.g. the total number of occurrences of the + // unique_rowid function in an expression). + v.volatility = overrideVolatile + if custom.visitorSideEffect != nil { + custom.visitorSideEffect(v.annotations) + } + funcExpr := &tree.FuncExpr{ + Func: tree.ResolvableFunctionReference{FunctionReference: custom.override}, + Type: fn.Type, + Exprs: fn.Exprs, + } + // The override must have appropriate overload defined. 
+ overrideExpr, err := funcExpr.TypeCheck(v.ctx, v.semaCtx, fn.ResolvedType()) + if err != nil { + v.err = errors.Wrapf(err, "error overloading function") + } + return overrideExpr } } } return expr } -// SanitizeExprsForImport checks whether default expressions are supported +// sanitizeExprsForImport checks whether default expressions are supported // for import. -func SanitizeExprsForImport( +func sanitizeExprsForImport( ctx context.Context, evalCtx *tree.EvalContext, expr tree.Expr, targetType *types.T, -) (tree.TypedExpr, error) { +) (tree.TypedExpr, overrideVolatility, error) { semaCtx := tree.MakeSemaContext() // If we have immutable expressions, then we can just return it right away. typedExpr, err := sqlbase.SanitizeVarFreeExpr( ctx, expr, targetType, "import_default", &semaCtx, tree.VolatilityImmutable) if err == nil { - return typedExpr, nil + return typedExpr, overrideImmutable, nil } // Now that the expressions are not immutable, we first check that they // are of the correct type before checking for any unsupported functions // for import. typedExpr, err = tree.TypeCheck(ctx, expr, &semaCtx, targetType) if err != nil { - return nil, unsafeExpressionError(err, "type checking error", expr.String()) + return nil, overrideErrorTerm, + unsafeExpressionError(err, "type checking error", expr.String()) } - v := &importDefaultExprVisitor{} + v := &importDefaultExprVisitor{annotations: evalCtx.Annotations} newExpr, _ := tree.WalkExpr(v, typedExpr) if v.err != nil { - return nil, unsafeExpressionError(v.err, "expr walking error", expr.String()) + return nil, overrideErrorTerm, + unsafeExpressionError(v.err, "expr walking error", expr.String()) } - return newExpr.(tree.TypedExpr), nil + return newExpr.(tree.TypedExpr), v.volatility, nil } // KVInserter implements the putter interface. @@ -290,7 +438,6 @@ type DatumRowConverter struct { IsTargetCol map[int]struct{} // The rest of these are derived from tableDesc, just cached here. - hidden int ri Inserter EvalCtx *tree.EvalContext cols []sqlbase.ColumnDescriptor @@ -327,7 +474,7 @@ func NewDatumRowConverter( c := &DatumRowConverter{ tableDesc: immutDesc, KvCh: kvCh, - EvalCtx: evalCtx, + EvalCtx: evalCtx.Copy(), } var targetColDescriptors []sqlbase.ColumnDescriptor @@ -396,43 +543,37 @@ func NewDatumRowConverter( _, ok := isTargetColID[col.ID] return ok } - c.hidden = -1 + annot := make(tree.Annotations, 1) + annot.Set(cellInfoAddr, &cellInfoAnnotation{uniqueRowIDInstance: 0}) + c.EvalCtx.Annotations = &annot for i := range cols { col := &cols[i] - if col.Hidden { - if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || c.hidden != -1 { - return nil, errors.New("unexpected hidden column") - } - c.hidden = i - c.Datums = append(c.Datums, nil) - } else { - if col.DefaultExpr != nil { - // Placeholder for columns with default values that will be evaluated when - // each import row is being created. - c.defaultCache[i], err = SanitizeExprsForImport(ctx, evalCtx, defaultExprs[i], col.Type) - if err != nil { - // This expression may not be safe for import but we don't want to - // call the user out at this stage: targeted columns may not have - // been identified now (e.g. "IMPORT PGDUMP...") and we want to - // throw an error only at the "Row" stage when the targeted columns - // have been identified. 
- c.defaultCache[i] = &unsafeErrExpr{ - err: errors.Wrapf(err, "default expression %s unsafe for import", defaultExprs[i].String()), - } - } else { - // TODO (anzoteh96): currently, all the functions we support are immutable - // or stable, so it's okay for defaultCache to be evaluated here. However, - // when we add support for unique_rowid, it changes for every row and - // therefore can no longer be evaluated right away. We therefore - // need a flag to determine when we can evaluate here. - c.defaultCache[i], err = c.defaultCache[i].Eval(evalCtx) + if col.DefaultExpr != nil { + // Placeholder for columns with default values that will be evaluated when + // each import row is being created. + typedExpr, volatile, err := sanitizeExprsForImport(ctx, c.EvalCtx, defaultExprs[i], col.Type) + if err != nil { + // This expression may not be safe for import but we don't want to + // call the user out at this stage: targeted columns may not have + // been identified now (e.g. "IMPORT PGDUMP...") and we want to + // throw an error only at the "Row" stage when the targeted columns + // have been identified. + c.defaultCache[i] = &unsafeErrExpr{ + err: errors.Wrapf(err, "default expression %s unsafe for import", defaultExprs[i].String()), + } + } else { + c.defaultCache[i] = typedExpr + if volatile == overrideImmutable { + // This default expression isn't volatile, so we can evaluate once + // here and memoize it. + c.defaultCache[i], err = c.defaultCache[i].Eval(c.EvalCtx) if err != nil { return nil, errors.Wrapf(err, "error evaluating default expression") } } - if !isTargetCol(col) { - c.Datums = append(c.Datums, nil) - } + } + if !isTargetCol(col) { + c.Datums = append(c.Datums, nil) } } } @@ -456,41 +597,14 @@ const rowIDBits = 64 - builtins.NodeIDBits // Row inserts kv operations into the current kv batch, and triggers a SendBatch // if necessary. func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex int64) error { - if c.hidden >= 0 { - // We don't want to call unique_rowid() for the hidden PK column because it - // is not idempotent and has unfortunate overlapping of output spans since - // it puts the uniqueness-ensuring per-generator part (nodeID) in the - // low-bits. Instead, make our own IDs that attempt to keep each generator - // (sourceID) writing to its own key-space with sequential rowIndexes - // mapping to sequential unique IDs, by putting the rowID in the lower - // bits. To avoid collisions with the SQL-genenerated IDs (at least for a - // very long time) we also flip the top bit to 1. - // - // Producing sequential keys in non-overlapping spans for each source yields - // observed improvements in ingestion performance of ~2-3x and even more - // significant reductions in required compactions during IMPORT. - // - // TODO(dt): Note that currently some callers (e.g. CSV IMPORT, which can be - // used on a table more than once) offset their rowIndex by a wall-time at - // which their overall job is run, so that subsequent ingestion jobs pick - // different row IDs for the i'th row and don't collide. However such - // time-offset rowIDs mean each row imported consumes some unit of time that - // must then elapse before the next IMPORT could run without colliding e.g. - // a 100m row file would use 10µs/row or ~17min worth of IDs. For now it is - // likely that IMPORT's write-rate is still the limiting factor, but this - // scheme means rowIndexes are very large (1 yr in 10s of µs is about 2^42). 
-		// Finding an alternative scheme for avoiding collisions (like sourceID *
-		// fileIndex*desc.Version) could improve on this. For now, if this
-		// best-effort collision avoidance scheme doesn't work in some cases we can
-		// just recommend an explicit PK as a workaround.
-		avoidCollisionsWithSQLsIDs := uint64(1 << 63)
-		rowID := (uint64(sourceID) << rowIDBits) ^ uint64(rowIndex)
-		c.Datums[c.hidden] = tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | rowID))
+	isTargetCol := func(i int) bool {
+		_, ok := c.IsTargetCol[i]
+		return ok
 	}
-
+	getCellInfoAnnotation(c.EvalCtx.Annotations).Reset(sourceID, rowIndex)
 	for i := range c.cols {
 		col := &c.cols[i]
-		if _, ok := c.IsTargetCol[i]; !ok && !col.Hidden && col.DefaultExpr != nil {
+		if !isTargetCol(i) && col.DefaultExpr != nil {
 			datum, err := c.defaultCache[i].Eval(c.EvalCtx)
 			if err != nil {
 				return errors.Wrapf(

From 88252dfca5d49256dbf0e4004f6e0ae1c195586f Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Thu, 16 Jul 2020 11:19:25 -0700
Subject: [PATCH 2/4] rowflow,colexec: make routers propagate errors to all
 non-closed outputs

This commit changes the way we propagate errors in the hash router so that
the error metadata is sent on all non-closed streams. Previously, we would
send it over only the first non-closed stream, which could cause the
processors in the same stage as the consumer of that single stream to treat
the absence of rows and errors as the input being exhausted successfully,
which is wrong because the input did encounter an error.

The same thing has been happening in the vectorized flow, but there the
problem is less severe: the issue presents itself only when we have wrapped
processors (because the materializers would otherwise prevent the propagation
throughout the whole flow, as described below).

In the vectorized engine we use a panic-catch mechanism of error propagation,
and we end up with the following sequence of events:
1. an operator encounters an error on any node (e.g. `colBatchScan`
encounters an RWUI error on a remote node). It is not an internal vectorized
error, so the operator panics with `colexecerror.ExpectedError`.
2. the panic is caught by one of the catchers (it can be a parallel unordered
synchronizer goroutine, an outbox goroutine, a materializer, or a hash router)
3. that component then decides how to propagate the error further:
3.1 if it is a parallel unordered synchronizer, it cancels all of its inputs
and repanics
3.2 if it is an outbox, the error is sent as metadata, which is received by
an inbox that panics with it
3.3 if it is a materializer, it might swallow the error (this is the reason
the vectorized hash router needs to send the error to all of its outputs);
the swallowing is acceptable if it is the root materializer, though
3.4 if it is a hash router, it cancels all of its outputs and forwards the
error on each of them.

Release note (bug fix): Previously, CockroachDB could return incorrect
results on queries that encountered a ReadWithinUncertaintyInterval error;
this has been fixed.
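To make the intended behavior concrete, here is a minimal, self-contained
sketch (the `output` type and `forwardErr` helper are illustrative stand-ins,
not the router code changed below): an error metadata record is handed to
every output stream that has not been closed, so no downstream consumer can
mistake "no rows and no error" for a successfully exhausted input.

package main

import (
	"errors"
	"fmt"
)

// output is a stand-in for a single router output stream.
type output struct {
	closed   bool
	metadata []error // collected error metadata records
}

// forwardErr replicates the error to every output that is still open.
func forwardErr(outputs []*output, err error) {
	for _, o := range outputs {
		if !o.closed {
			o.metadata = append(o.metadata, err)
		}
	}
}

func main() {
	outs := []*output{{}, {closed: true}, {}}
	forwardErr(outs, errors.New("ReadWithinUncertaintyInterval"))
	for i, o := range outs {
		fmt.Printf("output %d: closed=%t, errors=%v\n", i, o.closed, o.metadata)
	}
}

Non-error metadata, by contrast, still goes to just the first non-closed
stream (see fwdMetadata in the diff below).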
--- pkg/sql/colexec/routers.go | 12 +- pkg/sql/colexec/routers_test.go | 21 +-- pkg/sql/rowexec/processors_test.go | 4 - pkg/sql/rowflow/routers.go | 54 ++++++-- pkg/sql/rowflow/routers_test.go | 199 ++++++++++++++++++----------- 5 files changed, 178 insertions(+), 112 deletions(-) diff --git a/pkg/sql/colexec/routers.go b/pkg/sql/colexec/routers.go index c71ad521a6f2..ed69257710d8 100644 --- a/pkg/sql/colexec/routers.go +++ b/pkg/sql/colexec/routers.go @@ -634,10 +634,10 @@ func newHashRouterWithOutputs( return r } -// cancelOutputs cancels all outputs and forwards the given error to one output -// if non-nil. The only case where the error is not forwarded if no output could -// be canceled due to an error. In this case each output will forward the error -// returned during cancellation. +// cancelOutputs cancels all outputs and forwards the given error to all of +// them if non-nil. The only case where the error is not forwarded is if no +// output could be canceled due to an error. In this case each output will +// forward the error returned during cancellation. func (r *HashRouter) cancelOutputs(ctx context.Context, errToForward error) { for _, o := range r.outputs { if err := colexecerror.CatchVectorizedRuntimeError(func() { @@ -646,10 +646,6 @@ func (r *HashRouter) cancelOutputs(ctx context.Context, errToForward error) { // If there was an error canceling this output, this error can be // forwarded to whoever is calling Next. o.forwardErr(err) - } else { - // Successful cancellation, which means errToForward was also consumed. - // Set it to nil to not forward it to another output. - errToForward = nil } } } diff --git a/pkg/sql/colexec/routers_test.go b/pkg/sql/colexec/routers_test.go index 0f5ed03c022c..0bd6e691d988 100644 --- a/pkg/sql/colexec/routers_test.go +++ b/pkg/sql/colexec/routers_test.go @@ -1097,29 +1097,16 @@ func TestHashRouterRandom(t *testing.T) { require.NoError(t, resultsByOp[i].err) } } - requireOneError := func(t *testing.T, err error) { + requireErrFromEachOutput := func(t *testing.T, err error) { t.Helper() if err == nil { t.Fatal("use requireNoErrors instead") } for i := range resultsByOp { - if err == nil { - // A match was already found. Since we only expect one error, this - // error must be nil. - require.Nil(t, resultsByOp[i].err, "expected error to be nil") - continue - } if resultsByOp[i].err == nil { - // This result has no error but we have not yet found the expected - // error, continue to another result. - continue + t.Fatalf("unexpectedly no error from %d output", i) } require.True(t, testutils.IsError(resultsByOp[i].err, err.Error()), "unexpected error %v", resultsByOp[i].err) - err = nil - } - if err != nil { - // err is set to nil when a match is found. 
- t.Fatal("no matching error found") } } @@ -1156,10 +1143,10 @@ func TestHashRouterRandom(t *testing.T) { } } case hashRouterContextCanceled: - requireOneError(t, context.Canceled) + requireErrFromEachOutput(t, context.Canceled) checkMetadata(t, []string{hashRouterMetadataMsg}) case hashRouterOutputErrorOnAddBatch: - requireOneError(t, errors.New(addBatchErrMsg)) + requireErrFromEachOutput(t, errors.New(addBatchErrMsg)) checkMetadata(t, []string{hashRouterMetadataMsg}) case hashRouterOutputErrorOnNext: // If an error is encountered in Next, it is returned to the caller, diff --git a/pkg/sql/rowexec/processors_test.go b/pkg/sql/rowexec/processors_test.go index 63252cfb9cb5..906a61158c6e 100644 --- a/pkg/sql/rowexec/processors_test.go +++ b/pkg/sql/rowexec/processors_test.go @@ -779,9 +779,6 @@ func TestUncertaintyErrorIsReturned(t *testing.T) { // onerow is a table created to test #51458. The value of the only row in this // table is explicitly set to 2 so that it is routed by hash to a desired // destination. - // TODO(asubiotto): The vectorized execution engine probably has the same - // problem and might need a separate table since the hash functions are - // different. sqlutils.CreateTable(t, dbConn, "onerow", "x INT", 1, sqlutils.ToRowFn(func(_ int) tree.Datum { return tree.NewDInt(tree.DInt(2)) })) _, err := dbConn.Exec(fmt.Sprintf(` ALTER TABLE t SPLIT AT VALUES (10), (20); @@ -833,7 +830,6 @@ func TestUncertaintyErrorIsReturned(t *testing.T) { tableNames: []string{"t"}, }, }, - skip: "https://github.com/cockroachdb/cockroach/issues/51458", }, } diff --git a/pkg/sql/rowflow/routers.go b/pkg/sql/rowflow/routers.go index 6eeff161f8ee..902b87b2bd88 100644 --- a/pkg/sql/rowflow/routers.go +++ b/pkg/sql/rowflow/routers.go @@ -405,32 +405,63 @@ func (rb *routerBase) updateStreamState( } } -// fwdMetadata forwards a metadata record to the first stream that's still -// accepting data. +// fwdMetadata forwards a metadata record to streams that are still accepting +// data. Note that if the metadata record contains an error, it is propagated +// to all non-closed streams whereas all other types of metadata are propagated +// only to the first non-closed stream. func (rb *routerBase) fwdMetadata(meta *execinfrapb.ProducerMetadata) { if meta == nil { log.Fatalf(context.TODO(), "asked to fwd empty metadata") } rb.semaphore <- struct{}{} + defer func() { + <-rb.semaphore + }() + if metaErr := meta.Err; metaErr != nil { + // Forward the error to all non-closed streams. + if rb.fwdErrMetadata(metaErr) { + return + } + } else { + // Forward the metadata to the first non-closed stream. + for i := range rb.outputs { + ro := &rb.outputs[i] + ro.mu.Lock() + if ro.mu.streamStatus != execinfra.ConsumerClosed { + ro.addMetadataLocked(meta) + ro.mu.Unlock() + ro.mu.cond.Signal() + return + } + ro.mu.Unlock() + } + } + // If we got here it means that we couldn't even forward metadata anywhere; + // all streams are closed. + atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed)) +} +// fwdErrMetadata forwards err to all non-closed streams and returns a boolean +// indicating whether it was sent on at least one stream. Note that this method +// assumes that rb.semaphore has been acquired and leaves it up to the caller +// to release it. 
+func (rb *routerBase) fwdErrMetadata(err error) bool { + forwarded := false for i := range rb.outputs { ro := &rb.outputs[i] ro.mu.Lock() if ro.mu.streamStatus != execinfra.ConsumerClosed { + meta := &execinfrapb.ProducerMetadata{Err: err} ro.addMetadataLocked(meta) ro.mu.Unlock() ro.mu.cond.Signal() - <-rb.semaphore - return + forwarded = true + } else { + ro.mu.Unlock() } - ro.mu.Unlock() } - - <-rb.semaphore - // If we got here it means that we couldn't even forward metadata anywhere; - // all streams are closed. - atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed)) + return forwarded } func (rb *routerBase) shouldUseSemaphore() bool { @@ -491,7 +522,8 @@ func (mr *mirrorRouter) Push( aggStatus := mr.aggStatus() if meta != nil { mr.fwdMetadata(meta) - return aggStatus + // fwdMetadata can change the status, re-read it. + return mr.aggStatus() } if aggStatus != execinfra.NeedMoreRows { return aggStatus diff --git a/pkg/sql/rowflow/routers_test.go b/pkg/sql/rowflow/routers_test.go index d0b39c2cf75b..3e82e0c087ed 100644 --- a/pkg/sql/rowflow/routers_test.go +++ b/pkg/sql/rowflow/routers_test.go @@ -442,9 +442,9 @@ func preimageAttack( } } -// Test that metadata records get forwarded by routers. Regardless of the type -// of router, the records are supposed to be forwarded on the first output -// stream that's not closed. +// Test that metadata records get forwarded by routers. Depending on the type +// of the metadata, it might need to be forward to either one or all non-closed +// streams (regardless of the type of the router). func TestMetadataIsForwarded(t *testing.T) { defer leaktest.AfterTest(t)() @@ -473,89 +473,144 @@ func TestMetadataIsForwarded(t *testing.T) { }, } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - chans := make([]execinfra.RowChannel, 2) - recvs := make([]execinfra.RowReceiver, 2) - tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2) - for i := 0; i < 2; i++ { - chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1) - recvs[i] = &chans[i] - tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)} - } - router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs) - - err1 := errors.Errorf("test error 1") - err2 := errors.Errorf("test error 2") - err3 := errors.Errorf("test error 3") - err4 := errors.Errorf("test error 4") - - // Push metadata; it should go to stream 0. - for i := 0; i < 10; i++ { - consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err1}) - if consumerStatus != execinfra.NeedMoreRows { - t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus) + // metaConfigs describe different configuration of metadata handling. It + // assumes the test is working in 4 stages with the following events: + // - stage 1 is finished with ConsumerDone() call on stream 0 + // - stage 2 is finished with ConsumerClosed() call on stream 0 (at this + // point only stream 1 is non-closed) + // - stage 3 is finished with ConsumerClosed() call on stream 1 (at this + // point all streams are closed). + metaConfigs := []struct { + name string + getMeta func(stage int) *execinfrapb.ProducerMetadata + // getReceiverStreamIDs returns the streamIDs of streams that are + // expected to receive the metadata on the given stage. 
+ getReceiverStreamIDs func(stage int) []int + assertExpected func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) + }{ + { + name: "error", + getMeta: func(stage int) *execinfrapb.ProducerMetadata { + return &execinfrapb.ProducerMetadata{ + Err: errors.Errorf("test error %d", stage), } - _, meta := chans[0].Next() - if !errors.Is(meta.Err, err1) { - t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err1) + }, + getReceiverStreamIDs: func(stage int) []int { + switch stage { + case 1, 2: + // Errors are propagated to all non-closed streams. + return []int{0, 1} + default: + // Stream 0 is closed after stage 2, so now only stream 1 + // is expected to receive metadata. + return []int{1} } - } - - chans[0].ConsumerDone() - // Push metadata; it should still go to stream 0. - for i := 0; i < 10; i++ { - consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err2}) - if consumerStatus != execinfra.NeedMoreRows { - t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus) + }, + assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) { + expected := errors.Errorf("test error %d", stage) + if !errors.Is(meta.Err, expected) { + t.Fatalf("stream %d: unexpected meta.Err %v, expected %s", streamID, meta.Err, expected) } - _, meta := chans[0].Next() - if !errors.Is(meta.Err, err2) { - t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err2) + }, + }, + { + name: "non-error", + getMeta: func(stage int) *execinfrapb.ProducerMetadata { + return &execinfrapb.ProducerMetadata{ + RowNum: &execinfrapb.RemoteProducerMetadata_RowNum{RowNum: int32(stage)}, } - } - - chans[0].ConsumerClosed() + }, + getReceiverStreamIDs: func(stage int) []int { + switch stage { + case 1, 2: + return []int{0} + default: + return []int{1} + } + }, + assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) { + if meta.RowNum.RowNum != int32(stage) { + t.Fatalf("streamID %d: unexpected meta %v, expected RowNum=%d in stage %d", streamID, meta, stage, stage) + } + }, + }, + } - // Metadata should switch to going to stream 1 once the new status is - // observed. - testutils.SucceedsSoon(t, func() error { - consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err3}) - if consumerStatus != execinfra.NeedMoreRows { - t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus) + for _, tc := range testCases { + for _, metaConfig := range metaConfigs { + t.Run(fmt.Sprintf("%s/%s", tc.name, metaConfig.name), func(t *testing.T) { + chans := make([]execinfra.RowChannel, 2) + recvs := make([]execinfra.RowReceiver, 2) + tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2) + for i := 0; i < 2; i++ { + chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1) + recvs[i] = &chans[i] + tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)} } - // Receive on stream 1 if there is a message waiting. Metadata may still - // try to go to 0 for a little while. 
- select { - case d := <-chans[1].C: - if !errors.Is(d.Meta.Err, err3) { - t.Fatalf("unexpected meta.Err %v, expected %s", d.Meta.Err, err3) + router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs) + + stage := 1 + for i := 0; i < 10; i++ { + consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage)) + if consumerStatus != execinfra.NeedMoreRows { + t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus) + } + for _, streamID := range metaConfig.getReceiverStreamIDs(stage) { + _, meta := chans[streamID].Next() + metaConfig.assertExpected(streamID, meta, stage) } - return nil - default: - return errors.Errorf("no metadata on stream 1") } - }) + chans[0].ConsumerDone() - chans[1].ConsumerClosed() + stage = 2 + for i := 0; i < 10; i++ { + consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage)) + if consumerStatus != execinfra.NeedMoreRows { + t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus) + } + for _, streamID := range metaConfig.getReceiverStreamIDs(stage) { + _, meta := chans[streamID].Next() + metaConfig.assertExpected(streamID, meta, stage) + } + } + chans[0].ConsumerClosed() - // Start drain the channels in the background. - for i := range chans { - go drainRowChannel(&chans[i]) - } + stage = 3 + testutils.SucceedsSoon(t, func() error { + consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage)) + if consumerStatus != execinfra.NeedMoreRows { + t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus) + } + // Receive on stream 1 if there is a message waiting. Metadata may still + // try to go to 0 for a little while. + select { + case d := <-chans[1].C: + metaConfig.assertExpected(1 /* streamID */, d.Meta, stage) + return nil + default: + return errors.Errorf("no metadata on stream 1") + } + }) + chans[1].ConsumerClosed() - testutils.SucceedsSoon(t, func() error { - consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err4}) - if consumerStatus != execinfra.ConsumerClosed { - return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus) + stage = 4 + // Start drain the channels in the background. + for i := range chans { + go drainRowChannel(&chans[i]) } - return nil - }) + testutils.SucceedsSoon(t, func() error { + consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage)) + if consumerStatus != execinfra.ConsumerClosed { + return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus) + } + return nil + }) - router.ProducerDone() + router.ProducerDone() - wg.Wait() - }) + wg.Wait() + }) + } } } From 60de8f036198f5082537e183f82152f4ebe0568b Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 28 Jul 2020 10:43:25 -0700 Subject: [PATCH 3/4] colexec: re-enable short-circuiting in the hash joiner This commit re-enables short-circuiting logic in the hash joiner when the build side is empty (it was temporarily disabled because of 48785 which has been fixed). 
Release note: None --- pkg/sql/colexec/hashjoiner.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/sql/colexec/hashjoiner.go b/pkg/sql/colexec/hashjoiner.go index 347862ea28ed..16f2d3a9cb5e 100644 --- a/pkg/sql/colexec/hashjoiner.go +++ b/pkg/sql/colexec/hashjoiner.go @@ -256,11 +256,7 @@ func (hj *hashJoiner) Next(ctx context.Context) coldata.Batch { hj.spec.joinType == sqlbase.RightOuterJoin || hj.spec.joinType == sqlbase.LeftSemiJoin || hj.spec.joinType == sqlbase.IntersectAllJoin { - // The short-circuiting behavior is temporarily disabled - // because it causes flakiness of some tests due to #48785 - // (concurrent calls to DrainMeta and Next). - // TODO(asubiotto): remove this once the issue is resolved. - // hj.state = hjDone + hj.state = hjDone continue } } From 94ac60f0aee726306a9c123e533d4b401f68706a Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 28 Jul 2020 12:00:27 -0700 Subject: [PATCH 4/4] sql: skip TestQueryProgress This test started failing more often, so we'll skip it temporarily until we figure it out. Release note: None --- pkg/sql/conn_executor_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 68f8b3fd6bff..4c9aa1cf9609 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -495,6 +496,8 @@ func TestQueryProgress(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.WithIssue(t, 51356) + const rows, kvBatchSize = 1000, 50 defer rowexec.TestingSetScannedRowProgressFrequency(rows / 60)()