From d775ec7673bf5af3635103a699e900e7762c78f4 Mon Sep 17 00:00:00 2001 From: anzoteh96 Date: Wed, 1 Jul 2020 20:53:41 -0400 Subject: [PATCH] importccl: support `unique_rowid()` as default expression for IMPORT INTO The PR #50295 supports non-targeted columns with constant expression. This PR is a follow up to that in adding support to `unique_rowid()`. This is done by assigning the same value of `rowid` that was generated at the `IMPORT stage (row converter) as the default value. Release note (general change): IMPORT INTO now supports `unique_rowid()` as a default expression. --- pkg/ccl/importccl/import_stmt_test.go | 82 ++++++- pkg/ccl/importccl/read_import_pgdump.go | 2 +- pkg/ccl/importccl/testdata/pgdump/geo.sql | 8 +- pkg/sql/row/row_converter.go | 249 +++++++++++++++------- 4 files changed, 252 insertions(+), 89 deletions(-) diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index bba91dac1c6c..a41f90fa65cd 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -2946,6 +2946,10 @@ func BenchmarkCSVConvertRecord(b *testing.B) { b.ReportAllocs() } +func selectNotNull(col string) string { + return fmt.Sprintf(`SELECT %s FROM t WHERE %s IS NOT NULL`, col, col) +} + // Test that IMPORT INTO works when columns with default expressions are present. // The default expressions supported by IMPORT INTO are constant expressions, // which are literals and functions that always return the same value given the @@ -2958,6 +2962,10 @@ func TestImportDefault(t *testing.T) { defer log.Scope(t).Close(t) const nodes = 3 + numFiles := nodes + 2 + rowsPerFile := 1000 + rowsPerRaceFile := 16 + testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerRaceFile) ctx := context.Background() baseDir := filepath.Join("testdata", "csv") @@ -3231,6 +3239,59 @@ func TestImportDefault(t *testing.T) { }) } }) + t.Run("unique_rowid", func(t *testing.T) { + const M = int(1e9 + 7) // Remainder for unique_rowid addition. + testCases := []struct { + name string + create string + targetCols []string + insert string + rowIDCols []string + }{ + { + name: "multiple_unique_rowid", + create: "a INT DEFAULT unique_rowid(), b INT, c STRING, d INT DEFAULT unique_rowid()", + targetCols: []string{"b", "c"}, + insert: "INSERT INTO t (b, c) VALUES (3, 'CAT'), (4, 'DOG')", + rowIDCols: []string{selectNotNull("a"), selectNotNull("d")}, + }, + { + name: "unique_rowid_with_pk", + create: "a INT DEFAULT unique_rowid(), b INT PRIMARY KEY, c STRING", + targetCols: []string{"b", "c"}, + insert: "INSERT INTO t (b, c) VALUES (-3, 'CAT'), (-4, 'DOG')", + rowIDCols: []string{selectNotNull("a")}, + }, + { + name: "rowid+rowid", + create: fmt.Sprintf( + `a INT DEFAULT (unique_rowid() %% %d) + (unique_rowid() %% %d), b INT PRIMARY KEY, c STRING`, M, M), + targetCols: []string{"b", "c"}, + rowIDCols: []string{selectNotNull("a")}, + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + defer sqlDB.Exec(t, `DROP TABLE t`) + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(%s)`, test.create)) + if test.insert != "" { + sqlDB.Exec(t, test.insert) + } + sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA (%s)`, + strings.Join(test.targetCols, ", "), + strings.Join(testFiles.files, ", "))) + var numDistinctRows int + sqlDB.QueryRow(t, + fmt.Sprintf(`SELECT DISTINCT COUNT (*) FROM (%s)`, + strings.Join(test.rowIDCols, " UNION ")), + ).Scan(&numDistinctRows) + var numRows int + sqlDB.QueryRow(t, `SELECT COUNT (*) FROM t`).Scan(&numRows) + require.Equal(t, numDistinctRows, len(test.rowIDCols)*numRows) + }) + + } + }) } // goos: darwin @@ -4369,14 +4430,23 @@ func TestImportPgDumpGeo(t *testing.T) { // Verify both created tables are identical. importCreate := sqlDB.QueryStr(t, "SELECT create_statement FROM [SHOW CREATE importdb.nyc_census_blocks]") - // Families are slightly different due to the geom column being last - // in exec and rowid being last in import, so swap that in import to - // match exec. - importCreate[0][0] = strings.Replace(importCreate[0][0], "geom, rowid", "rowid, geom", 1) + // Families are slightly different due to that rowid shows up in exec + // but not import (possibly due to the ALTER TABLE statement that makes + // gid a primary key), so add that into import to match exec. + importCreate[0][0] = strings.Replace(importCreate[0][0], "boroname, geom", "boroname, rowid, geom", 1) sqlDB.CheckQueryResults(t, "SELECT create_statement FROM [SHOW CREATE execdb.nyc_census_blocks]", importCreate) - importSelect := sqlDB.QueryStr(t, "SELECT * FROM importdb.nyc_census_blocks ORDER BY PRIMARY KEY importdb.nyc_census_blocks") - sqlDB.CheckQueryResults(t, "SELECT * FROM execdb.nyc_census_blocks ORDER BY PRIMARY KEY execdb.nyc_census_blocks", importSelect) + // Drop the comparison of gid for import vs exec, then check that gid + // in import is indeed valid rowid. + importCols := "blkid, popn_total, popn_white, popn_black, popn_nativ, popn_asian, popn_other, boroname" + importSelect := sqlDB.QueryStr(t, fmt.Sprintf( + "SELECT (%s) FROM importdb.nyc_census_blocks ORDER BY PRIMARY KEY importdb.nyc_census_blocks", + importCols, + )) + sqlDB.CheckQueryResults(t, fmt.Sprintf( + "SELECT (%s) FROM execdb.nyc_census_blocks ORDER BY PRIMARY KEY execdb.nyc_census_blocks", + importCols, + ), importSelect) } func TestImportCockroachDump(t *testing.T) { diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go index 12fe406cd597..a765b9097070 100644 --- a/pkg/ccl/importccl/read_import_pgdump.go +++ b/pkg/ccl/importccl/read_import_pgdump.go @@ -508,7 +508,7 @@ func newPgDumpReader( for i, col := range table.Desc.VisibleColumns() { colSubMap[col.Name] = i } - conv, err := row.NewDatumRowConverter(ctx, table.Desc, targetCols, evalCtx, kvCh) + conv, err := row.NewDatumRowConverter(ctx, table.Desc, targetCols, evalCtx.Copy(), kvCh) if err != nil { return nil, err } diff --git a/pkg/ccl/importccl/testdata/pgdump/geo.sql b/pkg/ccl/importccl/testdata/pgdump/geo.sql index b0068f68eb2b..3d030809629d 100644 --- a/pkg/ccl/importccl/testdata/pgdump/geo.sql +++ b/pkg/ccl/importccl/testdata/pgdump/geo.sql @@ -1,11 +1,7 @@ --- The two comments below removing gid are there because IMPORT doesn't --- support DEFAULT functions (#48253). This function is otherwise exactly --- what shp2pgsql produces. - SET CLIENT_ENCODING TO UTF8; SET STANDARD_CONFORMING_STRINGS TO ON; BEGIN; -CREATE TABLE "nyc_census_blocks" (--gid serial, +CREATE TABLE "nyc_census_blocks" (gid serial, "blkid" varchar(15), "popn_total" float8, "popn_white" float8, @@ -14,7 +10,7 @@ CREATE TABLE "nyc_census_blocks" (--gid serial, "popn_asian" float8, "popn_other" float8, "boroname" varchar(32)); ---ALTER TABLE "nyc_census_blocks" ADD PRIMARY KEY (gid); +ALTER TABLE "nyc_census_blocks" ADD PRIMARY KEY (gid); SELECT AddGeometryColumn('','nyc_census_blocks','geom','26918','MULTIPOLYGON',2); INSERT INTO "nyc_census_blocks" ("blkid","popn_total","popn_white","popn_black","popn_nativ","popn_asian","popn_other","boroname",geom) VALUES ('360850009001000','97','51','32','1','5','8','Staten Island','010600002026690000010000000103000000010000000A00000051AC161881A22141A31409CF1F2A51415F4321458DA2214100102A3F1D2A51418C34807C0BA221414E3E89F5122A5141782D605495A12141780D1CE92A2A51410D1C9C6770A121410F2D6074322A5141441560E0B0A02141A00099C72F2A51412365B4789AA021419F60A7BB342A514160E3E8FA66A0214118B4C0CE402A5141EA4BF3EEC7A12141A3023D61452A514151AC161881A22141A31409CF1F2A5141'); INSERT INTO "nyc_census_blocks" ("blkid","popn_total","popn_white","popn_black","popn_nativ","popn_asian","popn_other","boroname",geom) VALUES ('360850020011000','66','52','2','0','7','5','Staten Island','0106000020266900000100000001030000000100000007000000083B4A6F79A8214127EC57B49926514151B51BB7CEA72141B2EAD6F38A2651416F429640B9A72141449FCB1C89265141163AA64D56A72141B89E2B7C9B26514150509213EDA72141DCC9A351A826514184FA4C6017A82141B9AE24F0AB265141083B4A6F79A8214127EC57B499265141'); diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go index d0dd375344f0..56aa0bd366d8 100644 --- a/pkg/sql/row/row_converter.go +++ b/pkg/sql/row/row_converter.go @@ -22,21 +22,117 @@ import ( "github.com/cockroachdb/errors" ) +// For some functions (specifically the volatile ones), we do +// not want to use the provided builtin. Instead, we opt for +// our own function definition, which produces deterministic results. +func makeBuiltinOverride( + builtin *tree.FunctionDefinition, overloads ...tree.Overload, +) *tree.FunctionDefinition { + props := builtin.FunctionProperties + return tree.NewFunctionDefinition( + "import."+builtin.Name, &props, overloads) +} + +// These constants are addresses used to store relevant information +// in the Annotation field of evalCtx when evaluating expressions. +const ( + cellInfoAddr tree.AnnotationIdx = iota + 1 +) + +type cellInfoAnnot struct { + sourceID int32 + rowID int64 + uniqueRowIDInst int + uniqueRowIDTotal int +} + +// We don't want to call unique_rowid() for columns with such default expressions +// because it is not idempotent and has unfortunate overlapping of output +// spans since it puts the uniqueness-ensuring per-generator part (nodeID) +// in the low-bits. Instead, make our own IDs that attempt to keep each +// generator (sourceID) writing to its own key-space with sequential +// rowIndexes mapping to sequential unique IDs. This is done by putting the +// following as the lower bits, in order to handle the case where there are +// multiple columns with default as `unique_rowid`: +// +// #default_rowid_cols * rowIndex + colPosition (among those with default unique_rowid) +// +// To avoid collisions with the SQL-genenerated IDs (at least for a +// very long time) we also flip the top bit to 1. +// +// Producing sequential keys in non-overlapping spans for each source yields +// observed improvements in ingestion performance of ~2-3x and even more +// significant reductions in required compactions during IMPORT. +// +// TODO(dt): Note that currently some callers (e.g. CSV IMPORT, which can be +// used on a table more than once) offset their rowIndex by a wall-time at +// which their overall job is run, so that subsequent ingestion jobs pick +// different row IDs for the i'th row and don't collide. However such +// time-offset rowIDs mean each row imported consumes some unit of time that +// must then elapse before the next IMPORT could run without colliding e.g. +// a 100m row file would use 10µs/row or ~17min worth of IDs. For now it is +// likely that IMPORT's write-rate is still the limiting factor, but this +// scheme means rowIndexes are very large (1 yr in 10s of µs is about 2^42). +// Finding an alternative scheme for avoiding collisions (like sourceID * +// fileIndex*desc.Version) could improve on this. For now, if this +// best-effort collision avoidance scheme doesn't work in some cases we can +// just recommend an explicit PK as a workaround. +// +// TODO(anzoteh96): As per the issue in #51004, having too many columns with +// default expression unique_rowid() could cause collisions when IMPORTs are run +// too close to each other. It will therefore be nice to fix this problem. +func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) { + c := (*evalCtx.Annotations).Get(cellInfoAddr).(cellInfoAnnot) + avoidCollisionsWithSQLsIDs := uint64(1 << 63) + shiftedIndex := int64(c.uniqueRowIDTotal)*c.rowID + int64(c.uniqueRowIDInst) + returnIndex := (uint64(c.sourceID) << rowIDBits) ^ uint64(shiftedIndex) + c.uniqueRowIDInst++ + (*evalCtx.Annotations).Set(cellInfoAddr, c) + return tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | returnIndex)), nil +} + +// Besides overriding, there are also counters that we want to keep track +// of as we walk through the expressions in a row (at datumRowConverter creation +// time). This will be handled by the sideEffect field: it will be called with an +// annotation passed in that changes the counter. In the case of unique_rowid, for +// example, we want to keep track of the total number of unique_rowid occurences +// in a row. +type customFunc struct { + sideEffect func(annotations *tree.Annotations) + override *tree.FunctionDefinition +} + // Given that imports can be retried and resumed, we want to // ensure that the default functions return the same value given // the same arguments, even on retries. Therfore we decide to support // only a limited subset of non-immutable functions, which are // all listed here. -var supportedImportFunctions = map[string]struct{}{ +var supportedImportFuncOverrides = map[string]*customFunc{ // These methods can be supported given that we set the statement // and transaction timestamp to be equal, i.e. the write timestamp. - "current_date": {}, - "current_timestamp": {}, - "localtimestamp": {}, - "now": {}, - "statement_timestamp": {}, - "timeofday": {}, - "transaction_timestamp": {}, + "current_date": nil, + "current_timestamp": nil, + "localtimestamp": nil, + "now": nil, + "statement_timestamp": nil, + "timeofday": nil, + "transaction_timestamp": nil, + "unique_rowid": { + sideEffect: func(annot *tree.Annotations) { + c := annot.Get(cellInfoAddr).(cellInfoAnnot) + c.uniqueRowIDTotal++ + annot.Set(cellInfoAddr, c) + }, + override: makeBuiltinOverride( + tree.FunDefs["unique_rowid"], + tree.Overload{ + Types: tree.ArgTypes{}, + ReturnType: tree.FixedReturnType(types.Int), + Fn: importUniqueRowID, + Info: "Returns a unique rowid based on row position and time", + }, + ), + }, } func unsafeExpressionError(err error, msg string, expr string) error { @@ -62,7 +158,11 @@ func (e *unsafeErrExpr) Eval(_ *tree.EvalContext) (tree.Datum, error) { // visitor walks the tree and ensures that any expression in the tree // that's not immutable is what we explicitly support. type importDefaultExprVisitor struct { - err error + err error + ctx context.Context + annot *tree.Annotations + semaCtx *tree.SemaContext + volatile bool } // VisitPre implements tree.Visitor interface. @@ -79,8 +179,30 @@ func (v *importDefaultExprVisitor) VisitPost(expr tree.Expr) (newExpr tree.Expr) case *tree.FuncExpr: if fn.ResolvedOverload().Volatility > tree.VolatilityImmutable { resolvedFnName := fn.Func.FunctionReference.(*tree.FunctionDefinition).Name - if _, ok := supportedImportFunctions[resolvedFnName]; !ok { + if custom, isSafe := supportedImportFuncOverrides[resolvedFnName]; !isSafe { v.err = errors.Newf(`function %s unsupported by IMPORT INTO`, resolvedFnName) + } else { + // No override exists, means it's okay to use the definitions given in builtin.go. + if custom == nil { + return expr + } + // Override exists, so we turn the volatile flag of the visitor to true. + // In addition, the sideEffect function needs to be called to update any + // relevant counter (e.g. the total number of occurrences of the unique_rowid + // function in an expression). + v.volatile = true + custom.sideEffect(v.annot) + funcExpr := &tree.FuncExpr{ + Func: tree.ResolvableFunctionReference{FunctionReference: custom.override}, + Type: fn.Type, + Exprs: fn.Exprs, + } + // The override must have appropriate overload defined. + overrideExpr, err := funcExpr.TypeCheck(v.ctx, v.semaCtx, fn.ResolvedType()) + if err != nil { + v.err = errors.Wrapf(err, "error overloading function") + } + return overrideExpr } } } @@ -91,28 +213,28 @@ func (v *importDefaultExprVisitor) VisitPost(expr tree.Expr) (newExpr tree.Expr) // for import. func SanitizeExprsForImport( ctx context.Context, evalCtx *tree.EvalContext, expr tree.Expr, targetType *types.T, -) (tree.TypedExpr, error) { +) (tree.TypedExpr, bool, error) { semaCtx := tree.MakeSemaContext() // If we have immutable expressions, then we can just return it right away. typedExpr, err := sqlbase.SanitizeVarFreeExpr( ctx, expr, targetType, "import_default", &semaCtx, tree.VolatilityImmutable) if err == nil { - return typedExpr, nil + return typedExpr, false, nil } // Now that the expressions are not immutable, we first check that they // are of the correct type before checking for any unsupported functions // for import. typedExpr, err = tree.TypeCheck(ctx, expr, &semaCtx, targetType) if err != nil { - return nil, unsafeExpressionError(err, "type checking error", expr.String()) + return nil, false, unsafeExpressionError(err, "type checking error", expr.String()) } - v := &importDefaultExprVisitor{} + v := &importDefaultExprVisitor{annot: evalCtx.Annotations} newExpr, _ := tree.WalkExpr(v, typedExpr) if v.err != nil { - return nil, unsafeExpressionError(v.err, "expr walking error", expr.String()) + return nil, false, unsafeExpressionError(v.err, "expr walking error", expr.String()) } - return newExpr.(tree.TypedExpr), nil + return newExpr.(tree.TypedExpr), v.volatile, nil } // KVInserter implements the putter interface. @@ -290,7 +412,6 @@ type DatumRowConverter struct { IsTargetCol map[int]struct{} // The rest of these are derived from tableDesc, just cached here. - hidden int ri Inserter EvalCtx *tree.EvalContext cols []sqlbase.ColumnDescriptor @@ -396,43 +517,42 @@ func NewDatumRowConverter( _, ok := isTargetColID[col.ID] return ok } - c.hidden = -1 + hidden := -1 + annot := make(tree.Annotations, 1) + annot.Set(cellInfoAddr, cellInfoAnnot{uniqueRowIDInst: 0}) + c.EvalCtx.Annotations = &annot for i := range cols { col := &cols[i] if col.Hidden { - if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || c.hidden != -1 { + if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || hidden != -1 { return nil, errors.New("unexpected hidden column") } - c.hidden = i - c.Datums = append(c.Datums, nil) - } else { - if col.DefaultExpr != nil { - // Placeholder for columns with default values that will be evaluated when - // each import row is being created. - c.defaultCache[i], err = SanitizeExprsForImport(ctx, evalCtx, defaultExprs[i], col.Type) - if err != nil { - // This expression may not be safe for import but we don't want to - // call the user out at this stage: targeted columns may not have - // been identified now (e.g. "IMPORT PGDUMP...") and we want to - // throw an error only at the "Row" stage when the targeted columns - // have been identified. - c.defaultCache[i] = &unsafeErrExpr{ - err: errors.Wrapf(err, "default expression %s unsafe for import", defaultExprs[i].String()), - } - } else { - // TODO (anzoteh96): currently, all the functions we support are immutable - // or stable, so it's okay for defaultCache to be evaluated here. However, - // when we add support for unique_rowid, it changes for every row and - // therefore can no longer be evaluated right away. We therefore - // need a flag to determine when we can evaluate here. + hidden = i + } + if col.DefaultExpr != nil { + // Placeholder for columns with default values that will be evaluated when + // each import row is being created. + typedExpr, volatile, err := SanitizeExprsForImport(ctx, evalCtx, defaultExprs[i], col.Type) + if err != nil { + // This expression may not be safe for import but we don't want to + // call the user out at this stage: targeted columns may not have + // been identified now (e.g. "IMPORT PGDUMP...") and we want to + // throw an error only at the "Row" stage when the targeted columns + // have been identified. + c.defaultCache[i] = &unsafeErrExpr{ + err: errors.Wrapf(err, "default expression %s unsafe for import", defaultExprs[i].String()), + } + } else { + c.defaultCache[i] = typedExpr + if !volatile { c.defaultCache[i], err = c.defaultCache[i].Eval(evalCtx) if err != nil { return nil, errors.Wrapf(err, "error evaluating default expression") } } - if !isTargetCol(col) { - c.Datums = append(c.Datums, nil) - } + } + if !isTargetCol(col) { + c.Datums = append(c.Datums, nil) } } } @@ -456,41 +576,18 @@ const rowIDBits = 64 - builtins.NodeIDBits // Row inserts kv operations into the current kv batch, and triggers a SendBatch // if necessary. func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex int64) error { - if c.hidden >= 0 { - // We don't want to call unique_rowid() for the hidden PK column because it - // is not idempotent and has unfortunate overlapping of output spans since - // it puts the uniqueness-ensuring per-generator part (nodeID) in the - // low-bits. Instead, make our own IDs that attempt to keep each generator - // (sourceID) writing to its own key-space with sequential rowIndexes - // mapping to sequential unique IDs, by putting the rowID in the lower - // bits. To avoid collisions with the SQL-genenerated IDs (at least for a - // very long time) we also flip the top bit to 1. - // - // Producing sequential keys in non-overlapping spans for each source yields - // observed improvements in ingestion performance of ~2-3x and even more - // significant reductions in required compactions during IMPORT. - // - // TODO(dt): Note that currently some callers (e.g. CSV IMPORT, which can be - // used on a table more than once) offset their rowIndex by a wall-time at - // which their overall job is run, so that subsequent ingestion jobs pick - // different row IDs for the i'th row and don't collide. However such - // time-offset rowIDs mean each row imported consumes some unit of time that - // must then elapse before the next IMPORT could run without colliding e.g. - // a 100m row file would use 10µs/row or ~17min worth of IDs. For now it is - // likely that IMPORT's write-rate is still the limiting factor, but this - // scheme means rowIndexes are very large (1 yr in 10s of µs is about 2^42). - // Finding an alternative scheme for avoiding collisions (like sourceID * - // fileIndex*desc.Version) could improve on this. For now, if this - // best-effort collision avoidance scheme doesn't work in some cases we can - // just recommend an explicit PK as a workaround. - avoidCollisionsWithSQLsIDs := uint64(1 << 63) - rowID := (uint64(sourceID) << rowIDBits) ^ uint64(rowIndex) - c.Datums[c.hidden] = tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | rowID)) + isTargetCol := func(i int) bool { + _, ok := c.IsTargetCol[i] + return ok } - + annot := c.EvalCtx.Annotations.Get(cellInfoAddr).(cellInfoAnnot) + annot.uniqueRowIDInst = 0 + annot.sourceID = sourceID + annot.rowID = rowIndex + c.EvalCtx.Annotations.Set(cellInfoAddr, annot) for i := range c.cols { col := &c.cols[i] - if _, ok := c.IsTargetCol[i]; !ok && !col.Hidden && col.DefaultExpr != nil { + if !isTargetCol(i) && col.DefaultExpr != nil { datum, err := c.defaultCache[i].Eval(c.EvalCtx) if err != nil { return errors.Wrapf(