diff --git a/pkg/ccl/importccl/import_processor.go b/pkg/ccl/importccl/import_processor.go
index 1b9f8903b915..dce74ddd39dc 100644
--- a/pkg/ccl/importccl/import_processor.go
+++ b/pkg/ccl/importccl/import_processor.go
@@ -156,12 +156,12 @@ func makeInputConverter(
 			spec.Format.MysqlOut, kvCh, spec.WalltimeNanos,
 			int(spec.ReaderParallelism), singleTable, evalCtx)
 	case roachpb.IOFileFormat_Mysqldump:
-		return newMysqldumpReader(ctx, kvCh, spec.Tables, evalCtx)
+		return newMysqldumpReader(ctx, kvCh, spec.WalltimeNanos, spec.Tables, evalCtx)
 	case roachpb.IOFileFormat_PgCopy:
 		return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
 			int(spec.ReaderParallelism), singleTable, evalCtx)
 	case roachpb.IOFileFormat_PgDump:
-		return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.Tables, evalCtx)
+		return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx)
 	case roachpb.IOFileFormat_Avro:
 		return newAvroInputReader(
 			kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
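Why thread the walltime through: if a row's random defaults were seeded purely by the row's position in the input file, re-running IMPORT on the same file would regenerate identical "random" values. A deliberately simplified sketch of the idea (the names here are illustrative, not this PR's API):

```go
package main

import (
	"fmt"
	"math/rand"
)

// seedFor is a simplified stand-in for the seeding scheme in this PR:
// mix the row's position in the input with a per-job time offset.
func seedFor(rowPos, jobOffset int64) int64 {
	return rowPos + jobOffset
}

func main() {
	const rowPos = 7
	// Without a job offset, two imports of the same file repeat the values.
	same := rand.New(rand.NewSource(seedFor(rowPos, 0))).Float64() ==
		rand.New(rand.NewSource(seedFor(rowPos, 0))).Float64()
	// With distinct walltime-derived offsets, the same row draws differently.
	diff := rand.New(rand.NewSource(seedFor(rowPos, 1))).Float64() !=
		rand.New(rand.NewSource(seedFor(rowPos, 2))).Float64()
	fmt.Println(same, diff) // true true (the second with overwhelming probability)
}
```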
diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index becd153f897f..1ebe802711e0 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -3114,14 +3114,6 @@ func TestImportDefault(t *testing.T) {
 			format:          "CSV",
 			expectedResults: [][]string{{"1", "102"}, {"2", "102"}},
 		},
-		{
-			name:          "random",
-			data:          "1\n2",
-			create:        `a INT, b FLOAT DEFAULT random()`,
-			targetCols:    "a",
-			format:        "CSV",
-			expectedError: "unsafe for import",
-		},
 		{
 			name:     "nextval",
 			sequence: "testseq",
@@ -3131,22 +3123,6 @@ func TestImportDefault(t *testing.T) {
 			format:        "CSV",
 			expectedError: "unsafe for import",
 		},
-		{
-			name:          "random_plus_timestamp",
-			data:          "1\n2",
-			create:        "a INT, b INT DEFAULT (100*random())::int + current_timestamp()::int",
-			targetCols:    "a",
-			format:        "CSV",
-			expectedError: "unsafe for import",
-		},
-		{
-			name:          "deep_nesting_with_random",
-			data:          "1\n2",
-			create:        "a INT, b INT DEFAULT (1 + 2 + (100 * round(3 + random())::int)) * 5 + 3",
-			targetCols:    "a",
-			format:        "CSV",
-			expectedError: "unsafe for import",
-		},
 		// TODO (anzoteh96): add AVRO format, and also MySQL and PGDUMP once
 		// IMPORT INTO are supported for these file formats.
 	}
@@ -3336,6 +3312,69 @@ func TestImportDefault(t *testing.T) {
 		}
 	})
+	t.Run("random-related", func(t *testing.T) {
+		testCases := []struct {
+			name       string
+			create     string
+			targetCols []string
+			randomCols []string
+			data       string
+		}{
+			{
+				name:       "random-multiple",
+				create:     "a INT, b FLOAT DEFAULT random(), c STRING, d FLOAT DEFAULT random()",
+				targetCols: []string{"a", "c"},
+				randomCols: []string{selectNotNull("b"), selectNotNull("d")},
+			},
+			{
+				name:       "gen_random_uuid",
+				create:     "a INT, b STRING, c UUID DEFAULT gen_random_uuid()",
+				targetCols: []string{"a", "b"},
+				randomCols: []string{selectNotNull("c")},
+			},
+			{
+				name:       "mixed_random_uuid",
+				create:     "a INT, b STRING, c UUID DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()",
+				targetCols: []string{"a", "b"},
+				randomCols: []string{selectNotNull("c")},
+			},
+			{
+				name:       "random_with_targeted",
+				create:     "a INT, b FLOAT DEFAULT random(), d FLOAT DEFAULT random()",
+				targetCols: []string{"a", "b"},
+				randomCols: []string{selectNotNull("d")},
+				data:       "1,0.37\n2,0.455\n3,3.14\n4,0.246\n5,0.42",
+			},
+			// TODO (anzoteh96): create a test case for AVRO once we manage to
+			// extract targeted columns from the AVRO schema.
+		}
+		for _, test := range testCases {
+			t.Run(test.name, func(t *testing.T) {
+				defer sqlDB.Exec(t, `DROP TABLE t`)
+				sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(%s)`, test.create))
+				fileName := strings.Join(testFiles.files, ", ")
+				if test.data != "" {
+					data = test.data
+					fileName = fmt.Sprintf(`%q`, srv.URL)
+				}
+				// Run the IMPORT 3 times for each test case to ensure that the
+				// values produced in different runs do not overlap.
+				for i := 0; i < 3; i++ {
+					sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA (%s)`,
+						strings.Join(test.targetCols, ", "),
+						fileName))
+				}
+				var numDistinctRows int
+				sqlDB.QueryRow(t,
+					fmt.Sprintf(`SELECT DISTINCT COUNT (*) FROM (%s)`,
+						strings.Join(test.randomCols, " UNION ")),
+				).Scan(&numDistinctRows)
+				var numRows int
+				sqlDB.QueryRow(t, `SELECT COUNT (*) FROM t`).Scan(&numRows)
+				require.Equal(t, numDistinctRows, len(test.randomCols)*numRows)
+			})
+		}
+	})
 }
 
 // goos: darwin
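The new test relies on a `selectNotNull` helper that does not appear in this diff. Judging from the call sites, it builds the per-column subqueries that get UNIONed (deduplicated) and counted. A hypothetical reconstruction, meant to slot into the test file (the table name `t` matches the test's fixture):

```go
// selectNotNull returns a subquery selecting the non-NULL values of col from
// table t. Hypothetical reconstruction based on the call sites above; the
// actual helper is defined elsewhere in import_stmt_test.go.
func selectNotNull(col string) string {
	return fmt.Sprintf(`SELECT %s FROM t WHERE %s IS NOT NULL`, col, col)
}
```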
diff --git a/pkg/ccl/importccl/read_import_base.go b/pkg/ccl/importccl/read_import_base.go
index 6070646db046..ec7759f93749 100644
--- a/pkg/ccl/importccl/read_import_base.go
+++ b/pkg/ccl/importccl/read_import_base.go
@@ -607,6 +607,12 @@ func (p *parallelImporter) flush(ctx context.Context) error {
 	}
 }
 
+func timestampAfterEpoch(walltime int64) uint64 {
+	epoch := time.Date(2015, time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
+	const precision = uint64(10 * time.Microsecond)
+	return uint64(walltime-epoch) / precision
+}
+
 func (p *parallelImporter) importWorker(
 	ctx context.Context,
 	workerID int,
@@ -624,9 +630,7 @@ func (p *parallelImporter) importWorker(
 	}
 
 	var rowNum int64
-	epoch := time.Date(2015, time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
-	const precision = uint64(10 * time.Microsecond)
-	timestamp := uint64(importCtx.walltime-epoch) / precision
+	timestamp := timestampAfterEpoch(importCtx.walltime)
 
 	conv.CompletedRowFn = func() int64 {
 		m := emittedRowLowWatermark(workerID, rowNum, minEmitted)
diff --git a/pkg/ccl/importccl/read_import_mysql.go b/pkg/ccl/importccl/read_import_mysql.go
index ccddc08b6615..cf5c58df6412 100644
--- a/pkg/ccl/importccl/read_import_mysql.go
+++ b/pkg/ccl/importccl/read_import_mysql.go
@@ -50,6 +50,7 @@ type mysqldumpReader struct {
 	tables   map[string]*row.DatumRowConverter
 	kvCh     chan row.KVBatch
 	debugRow func(tree.Datums)
+	walltime int64
 }
 
 var _ inputConverter = &mysqldumpReader{}
@@ -57,10 +58,11 @@ var _ inputConverter = &mysqldumpReader{}
 func newMysqldumpReader(
 	ctx context.Context,
 	kvCh chan row.KVBatch,
+	walltime int64,
 	tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
 	evalCtx *tree.EvalContext,
 ) (*mysqldumpReader, error) {
-	res := &mysqldumpReader{evalCtx: evalCtx, kvCh: kvCh}
+	res := &mysqldumpReader{evalCtx: evalCtx, kvCh: kvCh, walltime: walltime}
 
 	converters := make(map[string]*row.DatumRowConverter, len(tables))
 	for name, table := range tables {
@@ -132,6 +134,7 @@ func (m *mysqldumpReader) readFile(
 				return errors.Errorf("missing schema info for requested table %q", name)
 			}
 			inserts++
+			timestamp := timestampAfterEpoch(m.walltime)
 			rows, ok := i.Rows.(mysql.Values)
 			if !ok {
 				return errors.Errorf(
@@ -156,7 +159,7 @@ func (m *mysqldumpReader) readFile(
 					}
 					conv.Datums[i] = converted
 				}
-				if err := conv.Row(ctx, inputIdx, count); err != nil {
+				if err := conv.Row(ctx, inputIdx, count+int64(timestamp)); err != nil {
 					return err
 				}
 				if m.debugRow != nil {
diff --git a/pkg/ccl/importccl/read_import_mysql_test.go b/pkg/ccl/importccl/read_import_mysql_test.go
index 34ebece36263..d8a7cba8e00d 100644
--- a/pkg/ccl/importccl/read_import_mysql_test.go
+++ b/pkg/ccl/importccl/read_import_mysql_test.go
@@ -46,7 +46,10 @@ func TestMysqldumpDataReader(t *testing.T) {
 	tables := map[string]*execinfrapb.ReadImportDataSpec_ImportTable{"simple": {Desc: table.TableDesc()}}
 
 	kvCh := make(chan row.KVBatch, 10)
-	converter, err := newMysqldumpReader(ctx, kvCh, tables, testEvalCtx)
+	// The walltime passed to the dump reader seeds the generation of unique
+	// rowids and of random() and gen_random_uuid() default expressions. Its
+	// value doesn't matter for this test, so we pass 0.
+	converter, err := newMysqldumpReader(ctx, kvCh, 0 /* walltime */, tables, testEvalCtx)
 
 	if err != nil {
 		t.Fatal(err)
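The extracted `timestampAfterEpoch` helper counts 10µs ticks since a fixed 2015 epoch, and the mysqldump reader now adds that offset to each row's index. A self-contained sketch of the same arithmetic, showing that two jobs started one second apart are offset by 100,000 ticks, which leaves room for up to 100,000 rows per second without row-index collisions:

```go
package main

import (
	"fmt"
	"time"
)

// Same computation as timestampAfterEpoch in this PR: the number of 10µs
// ticks between a fixed 2015 epoch and the given walltime (in nanoseconds).
func timestampAfterEpoch(walltime int64) uint64 {
	epoch := time.Date(2015, time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
	const precision = uint64(10 * time.Microsecond)
	return uint64(walltime-epoch) / precision
}

func main() {
	jobA := time.Date(2020, time.June, 1, 12, 0, 0, 0, time.UTC).UnixNano()
	jobB := jobA + time.Second.Nanoseconds() // a second IMPORT, started 1s later
	a, b := timestampAfterEpoch(jobA), timestampAfterEpoch(jobB)
	fmt.Println(b - a) // 100000: 1s / 10µs ticks between the two jobs' offsets
}
```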
diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go
index af0aef094d34..b9effa188135 100644
--- a/pkg/ccl/importccl/read_import_pgdump.go
+++ b/pkg/ccl/importccl/read_import_pgdump.go
@@ -506,11 +506,12 @@ func getTableName2(u *tree.UnresolvedObjectName) (string, error) {
 }
 
 type pgDumpReader struct {
-	tables map[string]*row.DatumRowConverter
-	descs  map[string]*execinfrapb.ReadImportDataSpec_ImportTable
-	kvCh   chan row.KVBatch
-	opts   roachpb.PgDumpOptions
-	colMap map[*row.DatumRowConverter](map[string]int)
+	tables   map[string]*row.DatumRowConverter
+	descs    map[string]*execinfrapb.ReadImportDataSpec_ImportTable
+	kvCh     chan row.KVBatch
+	opts     roachpb.PgDumpOptions
+	walltime int64
+	colMap   map[*row.DatumRowConverter](map[string]int)
 }
 
 var _ inputConverter = &pgDumpReader{}
@@ -520,6 +521,7 @@ func newPgDumpReader(
 	ctx context.Context,
 	kvCh chan row.KVBatch,
 	opts roachpb.PgDumpOptions,
+	walltime int64,
 	descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
 	evalCtx *tree.EvalContext,
 ) (*pgDumpReader, error) {
@@ -545,11 +547,12 @@ func newPgDumpReader(
 		}
 	}
 	return &pgDumpReader{
-		kvCh:   kvCh,
-		tables: converters,
-		descs:  descs,
-		opts:   opts,
-		colMap: colMap,
+		kvCh:     kvCh,
+		tables:   converters,
+		descs:    descs,
+		opts:     opts,
+		walltime: walltime,
+		colMap:   colMap,
 	}, nil
 }
 
@@ -613,6 +616,7 @@ func (m *pgDumpReader) readFile(
 			// the command "IMPORT INTO table (targetCols) PGDUMP DATA (filename)"
 			expectedColLen = len(conv.VisibleCols)
 		}
+		timestamp := timestampAfterEpoch(m.walltime)
 		values, ok := i.Rows.Select.(*tree.ValuesClause)
 		if !ok {
 			return errors.Errorf("unsupported: %s", i.Rows.Select)
@@ -658,7 +662,7 @@ func (m *pgDumpReader) readFile(
 				}
 				conv.Datums[idx] = converted
 			}
-			if err := conv.Row(ctx, inputIdx, count); err != nil {
+			if err := conv.Row(ctx, inputIdx, count+int64(timestamp)); err != nil {
 				return err
 			}
 		}
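The pgdump reader follows the exact pattern of the mysqldump change above: the timestamp-offset index passed to `conv.Row` becomes the `rowID` that seeds the per-row random source via `getSeedForImportRand` (added in the expr_walker.go hunk below). A standalone sketch of that seed layout; note that `rowIDBits` here is a made-up stand-in for the package constant in `pkg/sql/row`:

```go
package main

import "fmt"

// Hypothetical stand-in for the rowIDBits constant in pkg/sql/row.
const rowIDBits = 48

// Copy of getSeedForImportRand from this diff, for illustration: the
// sourceID occupies the high bits, XORed with the row's starting position
// in the random stream (rowID * numInstances).
func getSeedForImportRand(rowID int64, sourceID int32, numInstances int) int64 {
	rowIDWithMultiplier := int64(numInstances) * rowID
	return (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
}

func main() {
	// With two random() instances per row, consecutive rows start their draws
	// numInstances positions apart, so their draw ranges never overlap.
	for rowID := int64(1); rowID <= 3; rowID++ {
		fmt.Println(getSeedForImportRand(rowID, 1 /* sourceID */, 2 /* instances */))
	}
}
```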
diff --git a/pkg/sql/row/expr_walker.go b/pkg/sql/row/expr_walker.go
index 995ab5e8ca3a..f502077fbaf8 100644
--- a/pkg/sql/row/expr_walker.go
+++ b/pkg/sql/row/expr_walker.go
@@ -12,13 +12,64 @@ package row
 
 import (
 	"context"
+	"math/rand"
 
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 )
 
+// reseedRandEveryN is the number of calls made to the random source before
+// it is reseeded.
+// TODO (anzoteh96): setting reseedRandEveryN presents a tradeoff between
+// the frequency of reseeding and the number of calls to Float64() needed
+// upon every resume, so it will be useful to tune this parameter.
+const reseedRandEveryN = 1000
+
+type importRand struct {
+	*rand.Rand
+	pos int64
+}
+
+func newImportRand(pos int64) *importRand {
+	adjPos := (pos / reseedRandEveryN) * reseedRandEveryN
+	rnd := rand.New(rand.NewSource(adjPos))
+	for i := int(pos % reseedRandEveryN); i > 0; i-- {
+		_ = rnd.Float64()
+	}
+	return &importRand{rnd, pos}
+}
+
+func (r *importRand) advancePos() {
+	r.pos++
+	if r.pos%reseedRandEveryN == 0 {
+		// Time to reseed.
+		r.Rand = rand.New(rand.NewSource(r.pos))
+	}
+}
+
+func (r *importRand) Float64() float64 {
+	randNum := r.Rand.Float64()
+	r.advancePos()
+	return randNum
+}
+
+func (r *importRand) Int63() int64 {
+	randNum := r.Rand.Int63()
+	r.advancePos()
+	return randNum
+}
+
+func getSeedForImportRand(rowID int64, sourceID int32, numInstances int) int64 {
+	// We expect r.pos to increment by numInstances for each row. Therefore,
+	// assuming that rowID increments by 1 for every row, we initialize the
+	// position as (sourceID << rowIDBits) ^ (rowID * numInstances).
+	rowIDWithMultiplier := int64(numInstances) * rowID
+	return (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
+}
+
 // For some functions (specifically the volatile ones), we do
 // not want to use the provided builtin. Instead, we opt for
 // our own function definition, which produces deterministic results.
@@ -51,6 +102,8 @@ type cellInfoAnnotation struct {
 	rowID               int64
 	uniqueRowIDInstance int
 	uniqueRowIDTotal    int
+	randSource          *importRand
+	randInstancePerRow  int
 }
 
 func getCellInfoAnnotation(t *tree.Annotations) *cellInfoAnnotation {
@@ -108,6 +161,27 @@ func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum,
 	return tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | returnIndex)), nil
 }
 
+func importRandom(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
+	c := getCellInfoAnnotation(evalCtx.Annotations)
+	if c.randSource == nil {
+		c.randSource = newImportRand(getSeedForImportRand(
+			c.rowID, c.sourceID, c.randInstancePerRow))
+	}
+	return tree.NewDFloat(tree.DFloat(c.randSource.Float64())), nil
+}
+
+func importGenUUID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
+	c := getCellInfoAnnotation(evalCtx.Annotations)
+	if c.randSource == nil {
+		c.randSource = newImportRand(getSeedForImportRand(
+			c.rowID, c.sourceID, c.randInstancePerRow))
+	}
+	gen := c.randSource.Int63()
+	id := uuid.MakeV4()
+	id.DeterministicV4(uint64(gen), uint64(1<<63))
+	return tree.NewDUuid(tree.DUuid{UUID: id}), nil
+}
+
 // Besides overriding, there are also counters that we want to keep track
 // of as we walk through the expressions in a row (at datumRowConverter creation
 // time). This will be handled by the visitorSideEffect field: it will be
@@ -151,6 +225,37 @@ var supportedImportFuncOverrides = map[string]*customFunc{
 			},
 		),
 	},
+	"random": {
+		visitorSideEffect: func(annot *tree.Annotations) {
+			getCellInfoAnnotation(annot).randInstancePerRow++
+		},
+		override: makeBuiltinOverride(
+			tree.FunDefs["random"],
+			tree.Overload{
+				Types:      tree.ArgTypes{},
+				ReturnType: tree.FixedReturnType(types.Float),
+				Fn:         importRandom,
+				Info:       "Returns a random number between 0 and 1 based on row position and time.",
+				Volatility: tree.VolatilityVolatile,
+			},
+		),
+	},
+	"gen_random_uuid": {
+		visitorSideEffect: func(annot *tree.Annotations) {
+			getCellInfoAnnotation(annot).randInstancePerRow++
+		},
+		override: makeBuiltinOverride(
+			tree.FunDefs["gen_random_uuid"],
+			tree.Overload{
+				Types:      tree.ArgTypes{},
+				ReturnType: tree.FixedReturnType(types.Uuid),
+				Fn:         importGenUUID,
+				Info: "Generates a random UUID based on row position and time, " +
+					"and returns it as a value of UUID type.",
+				Volatility: tree.VolatilityVolatile,
+			},
+		),
+	},
 }
 
 func unsafeExpressionError(err error, msg string, expr string) error {
diff --git a/pkg/sql/row/row_converter.go b/pkg/sql/row/row_converter.go
index 273c9a88b422..d9f2e8022d66 100644
--- a/pkg/sql/row/row_converter.go
+++ b/pkg/sql/row/row_converter.go
@@ -364,13 +364,21 @@ func (c *DatumRowConverter) Row(ctx context.Context, sourceID int32, rowIndex in
 	getCellInfoAnnotation(c.EvalCtx.Annotations).Reset(sourceID, rowIndex)
 	for i := range c.cols {
 		col := &c.cols[i]
-		if !isTargetCol(i) && col.DefaultExpr != nil {
+		if col.DefaultExpr != nil {
+			// Even if the column is targeted, we evaluate the default expression
+			// once and discard the result: when the expression involves random(),
+			// this keeps the number of positions the random source advances per
+			// row equal to the number of random() instances in the row.
+			// TODO (anzoteh96): Optimize this when no expression involves
+			// random(), gen_random_uuid(), or the like.
 			datum, err := c.defaultCache[i].Eval(c.EvalCtx)
-			if err != nil {
-				return errors.Wrapf(
-					err, "error evaluating default expression %q", col.DefaultExprStr())
+			if !isTargetCol(i) {
+				if err != nil {
+					return errors.Wrapf(
+						err, "error evaluating default expression %q", col.DefaultExprStr())
+				}
+				c.Datums[i] = datum
 			}
-			c.Datums[i] = datum
 		}
 	}
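A quick sanity check of the reseeding scheme, with `importRand` copied from the diff for illustration: drawing k values from a source started at position 0 and then continuing yields the same stream as starting fresh at position k. This is what lets a resumed IMPORT reproduce a row's remaining random values by replaying at most reseedRandEveryN-1 draws rather than the whole stream:

```go
package main

import (
	"fmt"
	"math/rand"
)

const reseedRandEveryN = 1000

// importRand copied from this diff, for illustration only.
type importRand struct {
	*rand.Rand
	pos int64
}

func newImportRand(pos int64) *importRand {
	// Seed at the last multiple of reseedRandEveryN, then burn draws to
	// advance to pos, so any position reproduces the same stream.
	adjPos := (pos / reseedRandEveryN) * reseedRandEveryN
	rnd := rand.New(rand.NewSource(adjPos))
	for i := int(pos % reseedRandEveryN); i > 0; i-- {
		_ = rnd.Float64()
	}
	return &importRand{rnd, pos}
}

func (r *importRand) advancePos() {
	r.pos++
	if r.pos%reseedRandEveryN == 0 {
		r.Rand = rand.New(rand.NewSource(r.pos))
	}
}

func (r *importRand) Float64() float64 {
	randNum := r.Rand.Float64()
	r.advancePos()
	return randNum
}

func main() {
	a := newImportRand(0)
	for i := 0; i < 5; i++ {
		a.Float64() // draw and discard the first five values
	}
	b := newImportRand(5)                   // a "resumed" source at position 5
	fmt.Println(a.Float64() == b.Float64()) // true: the streams line up
}
```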