From 0eaed19428ec194368685eaa5eda24904e65a936 Mon Sep 17 00:00:00 2001
From: anzoteh96
Date: Wed, 1 Jul 2020 20:53:41 -0400
Subject: [PATCH] importccl: support `unique_rowid()` as default expression
 for IMPORT INTO

PR #50295 added support for non-targeted columns with constant default
expressions. This PR follows up by adding support for `unique_rowid()`.
Previously, rowid was supported as a default expression only for the hidden
column, where it was computed as a function of the timestamp, the row number,
and the source ID (the ID of the processor). To accommodate wider use of
unique_rowid(), this PR makes unique_rowid a function of the timestamp, the
row number, the source ID, the total number of unique_rowid occurrences in
the table schema, and the instance of each unique_rowid within a row. In
addition, this PR extends the visitor introduced in #51390 with overrides
for volatile functions like unique_rowid. Annotations recording the total
number of unique_rowid occurrences and the unique_rowid instance within a
row are stored inside evalCtx; they are read and updated when the visitor
walks the default expression at the sanitization stage, and again when the
default expression is evaluated for each row.

Partially addresses #48253

Release note (general change): IMPORT INTO now supports `unique_rowid()` as
a default expression.
---
 pkg/ccl/importccl/import_stmt_test.go     |  85 ++++++-
 pkg/ccl/importccl/read_import_base.go     |   2 +-
 pkg/ccl/importccl/testdata/pgdump/geo.sql |   8 +-
 pkg/sql/row/row_converter.go              | 280 +++++++++++++++-------
 4 files changed, 279 insertions(+), 96 deletions(-)

diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index bba91dac1c6c..ff7d3f2adf1b 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -2946,6 +2946,10 @@ func BenchmarkCSVConvertRecord(b *testing.B) {
 	b.ReportAllocs()
 }
 
+func selectNotNull(col string) string {
+	return fmt.Sprintf(`SELECT %s FROM t WHERE %s IS NOT NULL`, col, col)
+}
+
 // Test that IMPORT INTO works when columns with default expressions are present.
 // The default expressions supported by IMPORT INTO are constant expressions,
 // which are literals and functions that always return the same value given the
@@ -2958,6 +2962,10 @@ func TestImportDefault(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	const nodes = 3
+	numFiles := nodes + 2
+	rowsPerFile := 1000
+	rowsPerRaceFile := 16
+	testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerRaceFile)
 
 	ctx := context.Background()
 	baseDir := filepath.Join("testdata", "csv")
@@ -3231,6 +3239,64 @@ func TestImportDefault(t *testing.T) {
 			})
 		}
 	})
+	t.Run("unique_rowid", func(t *testing.T) {
+		const M = int(1e9 + 7) // Modulus for unique_rowid addition.
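+		// Each case below IMPORTs into a table with one or more
+		// unique_rowid() default columns, then checks that the generated
+		// IDs are distinct across both rows and columns.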
+		testCases := []struct {
+			name       string
+			create     string
+			targetCols []string
+			insert     string
+			rowIDCols  []string
+		}{
+			{
+				name:       "multiple_unique_rowid",
+				create:     "a INT DEFAULT unique_rowid(), b INT, c STRING, d INT DEFAULT unique_rowid()",
+				targetCols: []string{"b", "c"},
+				insert:     "INSERT INTO t (b, c) VALUES (3, 'CAT'), (4, 'DOG')",
+				rowIDCols:  []string{selectNotNull("a"), selectNotNull("d")},
+			},
+			{
+				name:       "unique_rowid_with_pk",
+				create:     "a INT DEFAULT unique_rowid(), b INT PRIMARY KEY, c STRING",
+				targetCols: []string{"b", "c"},
+				insert:     "INSERT INTO t (b, c) VALUES (-3, 'CAT'), (-4, 'DOG')",
+				rowIDCols:  []string{selectNotNull("a")},
+			},
+			{
+				// unique_rowid()+unique_rowid() won't work as-is: each rowid produced
+				// by import has its leftmost bit set to 1, so adding two of them
+				// overflows. A way around this is to take each unique_rowid() modulo
+				// a number M. Here M = 1e9+7 is used, since it's big enough and is a
+				// prime, which is generally effective in avoiding collisions.
+				name: "rowid+rowid",
+				create: fmt.Sprintf(
+					`a INT DEFAULT (unique_rowid() %% %d) + (unique_rowid() %% %d), b INT PRIMARY KEY, c STRING`, M, M),
+				targetCols: []string{"b", "c"},
+				rowIDCols:  []string{selectNotNull("a")},
+			},
+		}
+		for _, test := range testCases {
+			t.Run(test.name, func(t *testing.T) {
+				defer sqlDB.Exec(t, `DROP TABLE t`)
+				sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(%s)`, test.create))
+				if test.insert != "" {
+					sqlDB.Exec(t, test.insert)
+				}
+				sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA (%s)`,
+					strings.Join(test.targetCols, ", "),
+					strings.Join(testFiles.files, ", ")))
+				var numDistinctRows int
+				sqlDB.QueryRow(t,
+					fmt.Sprintf(`SELECT DISTINCT COUNT (*) FROM (%s)`,
+						strings.Join(test.rowIDCols, " UNION ")),
+				).Scan(&numDistinctRows)
+				var numRows int
+				sqlDB.QueryRow(t, `SELECT COUNT (*) FROM t`).Scan(&numRows)
+				require.Equal(t, numDistinctRows, len(test.rowIDCols)*numRows)
+			})
+		}
+	})
 }
 
 // goos: darwin
@@ -4369,14 +4435,21 @@ func TestImportPgDumpGeo(t *testing.T) {
 
 	// Verify both created tables are identical.
 	importCreate := sqlDB.QueryStr(t, "SELECT create_statement FROM [SHOW CREATE importdb.nyc_census_blocks]")
-	// Families are slightly different due to the geom column being last
-	// in exec and rowid being last in import, so swap that in import to
-	// match exec.
-	importCreate[0][0] = strings.Replace(importCreate[0][0], "geom, rowid", "rowid, geom", 1)
+	// Families are slightly different: rowid shows up in exec but not in
+	// import (possibly due to the ALTER TABLE statement that makes gid a
+	// primary key), so add rowid to import to match exec.
+ importCreate[0][0] = strings.Replace(importCreate[0][0], "boroname, geom", "boroname, rowid, geom", 1) sqlDB.CheckQueryResults(t, "SELECT create_statement FROM [SHOW CREATE execdb.nyc_census_blocks]", importCreate) - importSelect := sqlDB.QueryStr(t, "SELECT * FROM importdb.nyc_census_blocks ORDER BY PRIMARY KEY importdb.nyc_census_blocks") - sqlDB.CheckQueryResults(t, "SELECT * FROM execdb.nyc_census_blocks ORDER BY PRIMARY KEY execdb.nyc_census_blocks", importSelect) + importCols := "blkid, popn_total, popn_white, popn_black, popn_nativ, popn_asian, popn_other, boroname" + importSelect := sqlDB.QueryStr(t, fmt.Sprintf( + "SELECT (%s) FROM importdb.nyc_census_blocks ORDER BY PRIMARY KEY importdb.nyc_census_blocks", + importCols, + )) + sqlDB.CheckQueryResults(t, fmt.Sprintf( + "SELECT (%s) FROM execdb.nyc_census_blocks ORDER BY PRIMARY KEY execdb.nyc_census_blocks", + importCols, + ), importSelect) } func TestImportCockroachDump(t *testing.T) { diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go index 1d2340b547e0..6ceb06e1b254 100644 --- a/pkg/ccl/importccl/read_import_base.go +++ b/pkg/ccl/importccl/read_import_base.go @@ -408,7 +408,7 @@ func makeDatumConverter( ctx context.Context, importCtx *parallelImportContext, fileCtx *importFileContext, ) (*row.DatumRowConverter, error) { conv, err := row.NewDatumRowConverter( - ctx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx.Copy(), importCtx.kvCh) + ctx, importCtx.tableDesc, importCtx.targetCols, importCtx.evalCtx, importCtx.kvCh) if err == nil { conv.KvBatch.Source = fileCtx.source } diff --git a/pkg/ccl/importccl/testdata/pgdump/geo.sql b/pkg/ccl/importccl/testdata/pgdump/geo.sql index b0068f68eb2b..3d030809629d 100644 --- a/pkg/ccl/importccl/testdata/pgdump/geo.sql +++ b/pkg/ccl/importccl/testdata/pgdump/geo.sql @@ -1,11 +1,7 @@ --- The two comments below removing gid are there because IMPORT doesn't --- support DEFAULT functions (#48253). This function is otherwise exactly --- what shp2pgsql produces. 
-
 SET CLIENT_ENCODING TO UTF8;
 SET STANDARD_CONFORMING_STRINGS TO ON;
 BEGIN;
-CREATE TABLE "nyc_census_blocks" (--gid serial,
+CREATE TABLE "nyc_census_blocks" (gid serial,
 "blkid" varchar(15),
 "popn_total" float8,
 "popn_white" float8,
@@ -14,7 +10,7 @@ CREATE TABLE "nyc_census_blocks" (--gid serial,
 "popn_asian" float8,
 "popn_other" float8,
 "boroname" varchar(32));
---ALTER TABLE "nyc_census_blocks" ADD PRIMARY KEY (gid);
+ALTER TABLE "nyc_census_blocks" ADD PRIMARY KEY (gid);
 SELECT AddGeometryColumn('','nyc_census_blocks','geom','26918','MULTIPOLYGON',2);
 INSERT INTO "nyc_census_blocks" ("blkid","popn_total","popn_white","popn_black","popn_nativ","popn_asian","popn_other","boroname",geom) VALUES ('360850009001000','97','51','32','1','5','8','Staten Island','010600002026690000010000000103000000010000000A00000051AC161881A22141A31409CF1F2A51415F4321458DA2214100102A3F1D2A51418C34807C0BA221414E3E89F5122A5141782D605495A12141780D1CE92A2A51410D1C9C6770A121410F2D6074322A5141441560E0B0A02141A00099C72F2A51412365B4789AA021419F60A7BB342A514160E3E8FA66A0214118B4C0CE402A5141EA4BF3EEC7A12141A3023D61452A514151AC161881A22141A31409CF1F2A5141');
 INSERT INTO "nyc_census_blocks" ("blkid","popn_total","popn_white","popn_black","popn_nativ","popn_asian","popn_other","boroname",geom) VALUES ('360850020011000','66','52','2','0','7','5','Staten Island','0106000020266900000100000001030000000100000007000000083B4A6F79A8214127EC57B49926514151B51BB7CEA72141B2EAD6F38A2651416F429640B9A72141449FCB1C89265141163AA64D56A72141B89E2B7C9B26514150509213EDA72141DCC9A351A826514184FA4C6017A82141B9AE24F0AB265141083B4A6F79A8214127EC57B499265141');
diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go
index d0dd375344f0..81df52867637 100644
--- a/pkg/sql/row/row_converter.go
+++ b/pkg/sql/row/row_converter.go
@@ -22,21 +22,136 @@ import (
 	"github.com/cockroachdb/errors"
 )
 
+// For some functions (specifically the volatile ones), we do
+// not want to use the provided builtin. Instead, we opt for
+// our own function definition, which produces deterministic results.
+func makeBuiltinOverride(
+	builtin *tree.FunctionDefinition, overloads ...tree.Overload,
+) *tree.FunctionDefinition {
+	props := builtin.FunctionProperties
+	return tree.NewFunctionDefinition(
+		"import."+builtin.Name, &props, overloads)
+}
+
+type overrideVolatility bool
+
+const (
+	// cellInfoAddr is the address used to store relevant information
+	// in the Annotation field of evalCtx when evaluating expressions.
+	cellInfoAddr tree.AnnotationIdx = iota + 1
+	// The following constants are the override volatility values that
+	// decide whether a default expression can be evaluated once at the
+	// datum-converter creation stage. Note that overrideErrorTerm is a
+	// placeholder returned when sanitizeExprsForImport returns an error.
+	overrideErrorTerm overrideVolatility = false
+	overrideImmutable overrideVolatility = false
+	overrideVolatile  overrideVolatility = true
+)
+
+type cellInfoAnnotation struct {
+	sourceID            int32
+	rowID               int64
+	uniqueRowIDInstance int
+	uniqueRowIDTotal    int
+}
+
+func getCellInfoAnnotation(t *tree.Annotations) *cellInfoAnnotation {
+	return t.Get(cellInfoAddr).(*cellInfoAnnotation)
+}
+
+func (c *cellInfoAnnotation) Reset(sourceID int32, rowID int64) {
+	c.sourceID = sourceID
+	c.rowID = rowID
+	c.uniqueRowIDInstance = 0
+}
+
+// We don't want to call unique_rowid() for columns with such default expressions
+// because it is not idempotent and has unfortunate overlapping of output
+// spans since it puts the uniqueness-ensuring per-generator part (nodeID)
+// in the low-bits. Instead, make our own IDs that attempt to keep each
+// generator (sourceID) writing to its own key-space with sequential
+// rowIndexes mapping to sequential unique IDs. This is done by putting the
+// following as the lower bits, in order to handle the case where there are
+// multiple columns with default as `unique_rowid`:
+//
+// #default_rowid_cols * rowIndex + colPosition (among those with default unique_rowid)
+//
+// To avoid collisions with the SQL-generated IDs (at least for a
+// very long time) we also flip the top bit to 1.
+//
+// Producing sequential keys in non-overlapping spans for each source yields
+// observed improvements in ingestion performance of ~2-3x and even more
+// significant reductions in required compactions during IMPORT.
+//
+// TODO(dt): Note that currently some callers (e.g. CSV IMPORT, which can be
+// used on a table more than once) offset their rowIndex by a wall-time at
+// which their overall job is run, so that subsequent ingestion jobs pick
+// different row IDs for the i'th row and don't collide. However such
+// time-offset rowIDs mean each row imported consumes some unit of time that
+// must then elapse before the next IMPORT could run without colliding e.g.
+// a 100m row file would use 10µs/row or ~17min worth of IDs. For now it is
+// likely that IMPORT's write-rate is still the limiting factor, but this
+// scheme means rowIndexes are very large (1 yr in 10s of µs is about 2^42).
+// Finding an alternative scheme for avoiding collisions (like sourceID *
+// fileIndex*desc.Version) could improve on this. For now, if this
+// best-effort collision avoidance scheme doesn't work in some cases we can
+// just recommend an explicit PK as a workaround.
+//
+// TODO(anzoteh96): As per the issue in #51004, having too many columns with
+// default expression unique_rowid() could cause collisions when IMPORTs are
+// run too close to each other. It would therefore be nice to fix this problem.
+func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
+	c := getCellInfoAnnotation(evalCtx.Annotations)
+	avoidCollisionsWithSQLsIDs := uint64(1 << 63)
+	shiftedIndex := int64(c.uniqueRowIDTotal)*c.rowID + int64(c.uniqueRowIDInstance)
+	returnIndex := (uint64(c.sourceID) << rowIDBits) ^ uint64(shiftedIndex)
+	c.uniqueRowIDInstance++
+	evalCtx.Annotations.Set(cellInfoAddr, c)
+	return tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | returnIndex)), nil
+}
+
+// Besides overriding, there are also counters that we want to keep track
+// of as we walk through the expressions in a row (at datumRowConverter
+// creation time). This is handled by the visitorSideEffect field: it is
+// called with an annotation that updates the relevant counter.
+// In the case of unique_rowid, for example, we want to keep track of the
+// total number of unique_rowid occurrences in a row.
+type customFunc struct {
+	visitorSideEffect func(annotations *tree.Annotations)
+	override          *tree.FunctionDefinition
+}
+
+var useDefaultBuiltin *customFunc
+
 // Given that imports can be retried and resumed, we want to
 // ensure that the default functions return the same value given
 // the same arguments, even on retries. Therefore we decide to support
 // only a limited subset of non-immutable functions, which are
 // all listed here.
-var supportedImportFunctions = map[string]struct{}{
+var supportedImportFuncOverrides = map[string]*customFunc{
 	// These methods can be supported given that we set the statement
 	// and transaction timestamp to be equal, i.e. the write timestamp.
-	"current_date":          {},
-	"current_timestamp":     {},
-	"localtimestamp":        {},
-	"now":                   {},
-	"statement_timestamp":   {},
-	"timeofday":             {},
-	"transaction_timestamp": {},
+	"current_date":          useDefaultBuiltin,
+	"current_timestamp":     useDefaultBuiltin,
+	"localtimestamp":        useDefaultBuiltin,
+	"now":                   useDefaultBuiltin,
+	"statement_timestamp":   useDefaultBuiltin,
+	"timeofday":             useDefaultBuiltin,
+	"transaction_timestamp": useDefaultBuiltin,
+	"unique_rowid": {
+		visitorSideEffect: func(annot *tree.Annotations) {
+			getCellInfoAnnotation(annot).uniqueRowIDTotal++
+		},
+		override: makeBuiltinOverride(
+			tree.FunDefs["unique_rowid"],
+			tree.Overload{
+				Types:      tree.ArgTypes{},
+				ReturnType: tree.FixedReturnType(types.Int),
+				Fn:         importUniqueRowID,
+				Info:       "Returns a unique rowid based on row position and time",
+			},
+		),
+	},
 }
 
 func unsafeExpressionError(err error, msg string, expr string) error {
@@ -62,7 +177,13 @@ func (e *unsafeErrExpr) Eval(_ *tree.EvalContext) (tree.Datum, error) {
 // visitor walks the tree and ensures that any expression in the tree
 // that's not immutable is what we explicitly support.
 type importDefaultExprVisitor struct {
-	err error
+	err         error
+	ctx         context.Context
+	annotations *tree.Annotations
+	semaCtx     *tree.SemaContext
+	// The volatility flag is set if at least one volatile function
+	// appears in the default expression.
+	volatility overrideVolatility
 }
 
 // VisitPre implements tree.Visitor interface.
@@ -79,40 +200,67 @@ func (v *importDefaultExprVisitor) VisitPost(expr tree.Expr) (newExpr tree.Expr)
 	case *tree.FuncExpr:
 		if fn.ResolvedOverload().Volatility > tree.VolatilityImmutable {
 			resolvedFnName := fn.Func.FunctionReference.(*tree.FunctionDefinition).Name
-			if _, ok := supportedImportFunctions[resolvedFnName]; !ok {
+			if custom, isSafe := supportedImportFuncOverrides[resolvedFnName]; !isSafe {
 				v.err = errors.Newf(`function %s unsupported by IMPORT INTO`, resolvedFnName)
+			} else {
+				if custom == useDefaultBuiltin {
+					// No override exists, meaning it's okay to use the definitions
+					// given in builtin.go.
+					return expr
+				}
+				// An override exists, so we set the visitor's volatility flag to
+				// volatile. In addition, the sideEffect function needs to be called
+				// to update any relevant counter (e.g. the total number of
+				// occurrences of the unique_rowid function in an expression).
+				v.volatility = overrideVolatile
+				if custom.visitorSideEffect != nil {
+					custom.visitorSideEffect(v.annotations)
+				}
+				funcExpr := &tree.FuncExpr{
+					Func:  tree.ResolvableFunctionReference{FunctionReference: custom.override},
+					Type:  fn.Type,
+					Exprs: fn.Exprs,
+				}
+				// The override must have an appropriate overload defined.
+				overrideExpr, err := funcExpr.TypeCheck(v.ctx, v.semaCtx, fn.ResolvedType())
+				if err != nil {
+					v.err = errors.Wrapf(err, "error overloading function")
+				}
+				return overrideExpr
 			}
 		}
 	}
 	return expr
 }
 
-// SanitizeExprsForImport checks whether default expressions are supported
+// sanitizeExprsForImport checks whether default expressions are supported
 // for import.
-func SanitizeExprsForImport(
+func sanitizeExprsForImport(
 	ctx context.Context, evalCtx *tree.EvalContext, expr tree.Expr, targetType *types.T,
-) (tree.TypedExpr, error) {
+) (tree.TypedExpr, overrideVolatility, error) {
 	semaCtx := tree.MakeSemaContext()
 	// If the expression is immutable, we can just return it right away.
 	typedExpr, err := sqlbase.SanitizeVarFreeExpr(
 		ctx, expr, targetType, "import_default", &semaCtx, tree.VolatilityImmutable)
 	if err == nil {
-		return typedExpr, nil
+		return typedExpr, overrideImmutable, nil
 	}
 	// Since the expression is not immutable, we first check that it is
 	// of the correct type before checking for any unsupported functions
 	// for import.
 	typedExpr, err = tree.TypeCheck(ctx, expr, &semaCtx, targetType)
 	if err != nil {
-		return nil, unsafeExpressionError(err, "type checking error", expr.String())
+		return nil, overrideErrorTerm,
+			unsafeExpressionError(err, "type checking error", expr.String())
 	}
-	v := &importDefaultExprVisitor{}
+	v := &importDefaultExprVisitor{annotations: evalCtx.Annotations}
 	newExpr, _ := tree.WalkExpr(v, typedExpr)
 	if v.err != nil {
-		return nil, unsafeExpressionError(v.err, "expr walking error", expr.String())
+		return nil, overrideErrorTerm,
+			unsafeExpressionError(v.err, "expr walking error", expr.String())
 	}
-	return newExpr.(tree.TypedExpr), nil
+	return newExpr.(tree.TypedExpr), v.volatility, nil
 }
 
 // KVInserter implements the putter interface.
@@ -290,7 +438,6 @@ type DatumRowConverter struct {
 	IsTargetCol map[int]struct{}
 
 	// The rest of these are derived from tableDesc, just cached here.
-	hidden  int
 	ri      Inserter
 	EvalCtx *tree.EvalContext
 	cols    []sqlbase.ColumnDescriptor
@@ -327,7 +474,7 @@ func NewDatumRowConverter(
 	c := &DatumRowConverter{
 		tableDesc: immutDesc,
 		KvCh:      kvCh,
-		EvalCtx:   evalCtx,
+		EvalCtx:   evalCtx.Copy(),
 	}
 
 	var targetColDescriptors []sqlbase.ColumnDescriptor
@@ -396,43 +543,37 @@ func NewDatumRowConverter(
 		_, ok := isTargetColID[col.ID]
 		return ok
 	}
-	c.hidden = -1
+	annot := make(tree.Annotations, 1)
+	annot.Set(cellInfoAddr, &cellInfoAnnotation{uniqueRowIDInstance: 0})
+	c.EvalCtx.Annotations = &annot
 	for i := range cols {
 		col := &cols[i]
-		if col.Hidden {
-			if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || c.hidden != -1 {
-				return nil, errors.New("unexpected hidden column")
-			}
-			c.hidden = i
-			c.Datums = append(c.Datums, nil)
-		} else {
-			if col.DefaultExpr != nil {
-				// Placeholder for columns with default values that will be evaluated when
-				// each import row is being created.
-				c.defaultCache[i], err = SanitizeExprsForImport(ctx, evalCtx, defaultExprs[i], col.Type)
-				if err != nil {
-					// This expression may not be safe for import but we don't want to
-					// call the user out at this stage: targeted columns may not have
-					// been identified now (e.g. "IMPORT PGDUMP...") and we want to
-					// throw an error only at the "Row" stage when the targeted columns
-					// have been identified.
-					c.defaultCache[i] = &unsafeErrExpr{
-						err: errors.Wrapf(err, "default expression %s unsafe for import", defaultExprs[i].String()),
-					}
-				} else {
-					// TODO (anzoteh96): currently, all the functions we support are immutable
-					// or stable, so it's okay for defaultCache to be evaluated here. However,
-					// when we add support for unique_rowid, it changes for every row and
-					// therefore can no longer be evaluated right away. We therefore
-					// need a flag to determine when we can evaluate here.
-					c.defaultCache[i], err = c.defaultCache[i].Eval(evalCtx)
+		if col.DefaultExpr != nil {
+			// Placeholder for columns with default values that will be evaluated when
+			// each import row is being created.
+			typedExpr, volatile, err := sanitizeExprsForImport(ctx, c.EvalCtx, defaultExprs[i], col.Type)
+			if err != nil {
+				// This expression may not be safe for import but we don't want to
+				// call the user out at this stage: targeted columns may not have
+				// been identified yet (e.g. "IMPORT PGDUMP...") and we want to
+				// throw an error only at the "Row" stage when the targeted columns
+				// have been identified.
+				c.defaultCache[i] = &unsafeErrExpr{
+					err: errors.Wrapf(err, "default expression %s unsafe for import", defaultExprs[i].String()),
+				}
+			} else {
+				c.defaultCache[i] = typedExpr
+				if volatile == overrideImmutable {
+					// This default expression isn't volatile, so we can evaluate it
+					// once here and memoize it.
+					c.defaultCache[i], err = c.defaultCache[i].Eval(c.EvalCtx)
 					if err != nil {
 						return nil, errors.Wrapf(err, "error evaluating default expression")
 					}
 				}
-				if !isTargetCol(col) {
-					c.Datums = append(c.Datums, nil)
-				}
+			}
+			if !isTargetCol(col) {
+				c.Datums = append(c.Datums, nil)
 			}
 		}
 	}
@@ -456,41 +597,14 @@ const rowIDBits = 64 - builtins.NodeIDBits
 
 // Row inserts kv operations into the current kv batch, and triggers a SendBatch
 // if necessary.
 func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex int64) error {
-	if c.hidden >= 0 {
-		// We don't want to call unique_rowid() for the hidden PK column because it
-		// is not idempotent and has unfortunate overlapping of output spans since
-		// it puts the uniqueness-ensuring per-generator part (nodeID) in the
-		// low-bits. Instead, make our own IDs that attempt to keep each generator
-		// (sourceID) writing to its own key-space with sequential rowIndexes
-		// mapping to sequential unique IDs, by putting the rowID in the lower
-		// bits. To avoid collisions with the SQL-genenerated IDs (at least for a
-		// very long time) we also flip the top bit to 1.
-		//
-		// Producing sequential keys in non-overlapping spans for each source yields
-		// observed improvements in ingestion performance of ~2-3x and even more
-		// significant reductions in required compactions during IMPORT.
-		//
-		// TODO(dt): Note that currently some callers (e.g. CSV IMPORT, which can be
-		// used on a table more than once) offset their rowIndex by a wall-time at
-		// which their overall job is run, so that subsequent ingestion jobs pick
-		// different row IDs for the i'th row and don't collide. However such
-		// time-offset rowIDs mean each row imported consumes some unit of time that
-		// must then elapse before the next IMPORT could run without colliding e.g.
-		// a 100m row file would use 10µs/row or ~17min worth of IDs. For now it is
-		// likely that IMPORT's write-rate is still the limiting factor, but this
-		// scheme means rowIndexes are very large (1 yr in 10s of µs is about 2^42).
- // Finding an alternative scheme for avoiding collisions (like sourceID * - // fileIndex*desc.Version) could improve on this. For now, if this - // best-effort collision avoidance scheme doesn't work in some cases we can - // just recommend an explicit PK as a workaround. - avoidCollisionsWithSQLsIDs := uint64(1 << 63) - rowID := (uint64(sourceID) << rowIDBits) ^ uint64(rowIndex) - c.Datums[c.hidden] = tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | rowID)) + isTargetCol := func(i int) bool { + _, ok := c.IsTargetCol[i] + return ok } - + getCellInfoAnnotation(c.EvalCtx.Annotations).Reset(sourceID, rowIndex) for i := range c.cols { col := &c.cols[i] - if _, ok := c.IsTargetCol[i]; !ok && !col.Hidden && col.DefaultExpr != nil { + if !isTargetCol(i) && col.DefaultExpr != nil { datum, err := c.defaultCache[i].Eval(c.EvalCtx) if err != nil { return errors.Wrapf(
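Note: as an illustration of the ID scheme described in the comments above, the
following standalone Go sketch (not part of the patch) mirrors the arithmetic
of the patched importUniqueRowID. It assumes builtins.NodeIDBits is 15 (so
rowIDBits is 49); the helper name importRowID and the example values are
hypothetical, chosen only to show the layout.

	package main

	import "fmt"

	// rowIDBits mirrors `64 - builtins.NodeIDBits` in the patch, under the
	// assumption that NodeIDBits = 15.
	const rowIDBits = 64 - 15

	// importRowID mirrors the arithmetic of the patched importUniqueRowID:
	// with `total` unique_rowid() default columns in the schema, the
	// `instance`-th occurrence within row `rowID` of source `sourceID` gets
	// total*rowID + instance in the low bits, the sourceID in the high bits,
	// and the top bit forced to 1 to stay clear of SQL-generated rowids.
	func importRowID(sourceID int32, rowID int64, total, instance int) uint64 {
		avoidCollisionsWithSQLsIDs := uint64(1 << 63)
		shiftedIndex := int64(total)*rowID + int64(instance)
		returnIndex := (uint64(sourceID) << rowIDBits) ^ uint64(shiftedIndex)
		return avoidCollisionsWithSQLsIDs | returnIndex
	}

	func main() {
		// A table with two unique_rowid() columns (total = 2): the two
		// instances in row 7 of source 3 receive adjacent, distinct IDs
		// within source 3's key-space.
		fmt.Println(importRowID(3, 7, 2, 0)) // instance 0
		fmt.Println(importRowID(3, 7, 2, 1)) // instance 1
	}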