52247: importccl: support random() and gen_random_uuid() as default expressions r=pbardea a=Anzoteh96

This PR follows up on cockroachdb#50295 to support random() and gen_random_uuid() as default expressions.

Following the visitor and override patterns of the previous PRs, random() is supported by periodically re-seeding the random source: after every N combined calls to random() and gen_random_uuid() the source is re-seeded, and when an import resumes after a failure, the source is rebuilt from the most recent re-seed point and fast-forwarded to where it left off. The seed is determined by the row number, the timestamp, and the source ID.
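
To make that concrete, here is a minimal, self-contained sketch of the resumable generator (illustrative only: `resumableRand`, `reseedEveryN`, and the demo positions are simplified stand-ins for the committed `importRand` and `reseedRandEveryN`):

```go
package main

import (
	"fmt"
	"math/rand"
)

const reseedEveryN = 1000 // stand-in for reseedRandEveryN

// resumableRand mirrors the idea behind importRand: its state is fully
// determined by its position in the stream.
type resumableRand struct {
	rnd *rand.Rand
	pos int64
}

// newResumableRand seeds at the last reseed boundary at or below pos,
// then discards values to fast-forward to pos exactly.
func newResumableRand(pos int64) *resumableRand {
	adj := (pos / reseedEveryN) * reseedEveryN
	rnd := rand.New(rand.NewSource(adj))
	for i := pos % reseedEveryN; i > 0; i-- {
		_ = rnd.Float64()
	}
	return &resumableRand{rnd, pos}
}

func (r *resumableRand) next() float64 {
	v := r.rnd.Float64()
	r.pos++
	if r.pos%reseedEveryN == 0 {
		r.rnd = rand.New(rand.NewSource(r.pos)) // periodic re-seed
	}
	return v
}

func main() {
	a := newResumableRand(0)
	for i := 0; i < 1500; i++ {
		a.next()
	}
	// Rebuilding at position 1500 (as a resumed import would) rejoins
	// the exact same stream.
	b := newResumableRand(1500)
	fmt.Println(a.next() == b.next(), a.next() == b.next()) // true true
}
```

Re-seeding on a fixed stride bounds the fast-forward work on resume to at most N discarded values, at the cost of drawing from a fresh seed every N calls.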

Release note (general change): random() and gen_random_uuid() are supported
as default expressions for IMPORT.

Co-authored-by: anzoteh96 <[email protected]>
craig[bot] and anzoteh96 committed Aug 17, 2020
2 parents 3e24cae + 8a7e1bf commit 7b1abbf
Showing 8 changed files with 214 additions and 48 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/import_processor.go
@@ -156,12 +156,12 @@ func makeInputConverter(
spec.Format.MysqlOut, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, evalCtx)
case roachpb.IOFileFormat_Mysqldump:
- return newMysqldumpReader(ctx, kvCh, spec.Tables, evalCtx)
+ return newMysqldumpReader(ctx, kvCh, spec.WalltimeNanos, spec.Tables, evalCtx)
case roachpb.IOFileFormat_PgCopy:
return newPgCopyReader(spec.Format.PgCopy, kvCh, spec.WalltimeNanos,
int(spec.ReaderParallelism), singleTable, evalCtx)
case roachpb.IOFileFormat_PgDump:
- return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.Tables, evalCtx)
+ return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.WalltimeNanos, spec.Tables, evalCtx)
case roachpb.IOFileFormat_Avro:
return newAvroInputReader(
kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
87 changes: 63 additions & 24 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -3114,14 +3114,6 @@ func TestImportDefault(t *testing.T) {
format: "CSV",
expectedResults: [][]string{{"1", "102"}, {"2", "102"}},
},
- {
- name: "random",
- data: "1\n2",
- create: `a INT, b FLOAT DEFAULT random()`,
- targetCols: "a",
- format: "CSV",
- expectedError: "unsafe for import",
- },
{
name: "nextval",
sequence: "testseq",
@@ -3131,22 +3123,6 @@
format: "CSV",
expectedError: "unsafe for import",
},
- {
- name: "random_plus_timestamp",
- data: "1\n2",
- create: "a INT, b INT DEFAULT (100*random())::int + current_timestamp()::int",
- targetCols: "a",
- format: "CSV",
- expectedError: "unsafe for import",
- },
- {
- name: "deep_nesting_with_random",
- data: "1\n2",
- create: "a INT, b INT DEFAULT (1 + 2 + (100 * round(3 + random())::int)) * 5 + 3",
- targetCols: "a",
- format: "CSV",
- expectedError: "unsafe for import",
- },
// TODO (anzoteh96): add AVRO format, and also MySQL and PGDUMP once
// IMPORT INTO is supported for these file formats.
}
@@ -3336,6 +3312,69 @@

}
})
t.Run("random-related", func(t *testing.T) {
testCases := []struct {
name string
create string
targetCols []string
randomCols []string
data string
}{
{
name: "random-multiple",
create: "a INT, b FLOAT DEFAULT random(), c STRING, d FLOAT DEFAULT random()",
targetCols: []string{"a", "c"},
randomCols: []string{selectNotNull("b"), selectNotNull("d")},
},
{
name: "gen_random_uuid",
create: "a INT, b STRING, c UUID DEFAULT gen_random_uuid()",
targetCols: []string{"a", "b"},
randomCols: []string{selectNotNull("c")},
},
{
name: "mixed_random_uuid",
create: "a INT, b STRING, c UUID DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()",
targetCols: []string{"a", "b"},
randomCols: []string{selectNotNull("c")},
},
{
name: "random_with_targeted",
create: "a INT, b FLOAT DEFAULT random(), d FLOAT DEFAULT random()",
targetCols: []string{"a", "b"},
randomCols: []string{selectNotNull("d")},
data: "1,0.37\n2,0.455\n3,3.14\n4,0.246\n5,0.42",
},
// TODO (anzoteh96): create a testcase for AVRO once we manage to extract
// targeted columns from the AVRO schema.
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
defer sqlDB.Exec(t, `DROP TABLE t`)
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(%s)`, test.create))
fileName := strings.Join(testFiles.files, ", ")
if test.data != "" {
data = test.data
fileName = fmt.Sprintf(`%q`, srv.URL)
}
// Let's do 3 IMPORTs for each test case to ensure that the values produced
// do not overlap.
for i := 0; i < 3; i++ {
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA (%s)`,
strings.Join(test.targetCols, ", "),
fileName))
}
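// Across the 3 IMPORTs, every random column must contain only distinct
// values, and the values must also be distinct across columns: the UNION
// of the per-column selections therefore has exactly
// len(randomCols) * numRows rows, which is what we assert below.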
var numDistinctRows int
sqlDB.QueryRow(t,
fmt.Sprintf(`SELECT DISTINCT COUNT (*) FROM (%s)`,
strings.Join(test.randomCols, " UNION ")),
).Scan(&numDistinctRows)
var numRows int
sqlDB.QueryRow(t, `SELECT COUNT (*) FROM t`).Scan(&numRows)
require.Equal(t, numDistinctRows, len(test.randomCols)*numRows)
})
}
})
}

// goos: darwin
10 changes: 7 additions & 3 deletions pkg/ccl/importccl/read_import_base.go
@@ -607,6 +607,12 @@ func (p *parallelImporter) flush(ctx context.Context) error {
}
}

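// timestampAfterEpoch returns the number of 10-microsecond units elapsed
// since 2015-01-01 UTC. The readers add it to the per-file row count so
// that successive IMPORTs of the same data produce different row positions
// (and hence different seeds) for the default-expression overrides.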
func timestampAfterEpoch(walltime int64) uint64 {
epoch := time.Date(2015, time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
const precision = uint64(10 * time.Microsecond)
return uint64(walltime-epoch) / precision
}

func (p *parallelImporter) importWorker(
ctx context.Context,
workerID int,
@@ -624,9 +630,7 @@ func (p *parallelImporter) importWorker(
}

var rowNum int64
- epoch := time.Date(2015, time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
- const precision = uint64(10 * time.Microsecond)
- timestamp := uint64(importCtx.walltime-epoch) / precision
+ timestamp := timestampAfterEpoch(importCtx.walltime)

conv.CompletedRowFn = func() int64 {
m := emittedRowLowWatermark(workerID, rowNum, minEmitted)
7 changes: 5 additions & 2 deletions pkg/ccl/importccl/read_import_mysql.go
@@ -50,17 +50,19 @@ type mysqldumpReader struct {
tables map[string]*row.DatumRowConverter
kvCh chan row.KVBatch
debugRow func(tree.Datums)
walltime int64
}

var _ inputConverter = &mysqldumpReader{}

func newMysqldumpReader(
ctx context.Context,
kvCh chan row.KVBatch,
walltime int64,
tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
evalCtx *tree.EvalContext,
) (*mysqldumpReader, error) {
- res := &mysqldumpReader{evalCtx: evalCtx, kvCh: kvCh}
+ res := &mysqldumpReader{evalCtx: evalCtx, kvCh: kvCh, walltime: walltime}

converters := make(map[string]*row.DatumRowConverter, len(tables))
for name, table := range tables {
@@ -132,6 +134,7 @@ func (m *mysqldumpReader) readFile(
return errors.Errorf("missing schema info for requested table %q", name)
}
inserts++
timestamp := timestampAfterEpoch(m.walltime)
rows, ok := i.Rows.(mysql.Values)
if !ok {
return errors.Errorf(
@@ -156,7 +159,7 @@
}
conv.Datums[i] = converted
}
- if err := conv.Row(ctx, inputIdx, count); err != nil {
+ if err := conv.Row(ctx, inputIdx, count+int64(timestamp)); err != nil {
return err
}
if m.debugRow != nil {
5 changes: 4 additions & 1 deletion pkg/ccl/importccl/read_import_mysql_test.go
@@ -46,7 +46,10 @@ func TestMysqldumpDataReader(t *testing.T) {
tables := map[string]*execinfrapb.ReadImportDataSpec_ImportTable{"simple": {Desc: table.TableDesc()}}

kvCh := make(chan row.KVBatch, 10)
- converter, err := newMysqldumpReader(ctx, kvCh, tables, testEvalCtx)
+ // When creating a new dump reader, we need to pass in the walltime that is
+ // used as a parameter for generating unique_rowid(), random(), and
+ // gen_random_uuid() default expressions. Here the parameter doesn't matter,
+ // so we pass in 0.
+ converter, err := newMysqldumpReader(ctx, kvCh, 0 /* walltime */, tables, testEvalCtx)

if err != nil {
t.Fatal(err)
26 changes: 15 additions & 11 deletions pkg/ccl/importccl/read_import_pgdump.go
@@ -506,11 +506,12 @@ func getTableName2(u *tree.UnresolvedObjectName) (string, error) {
}

type pgDumpReader struct {
- tables map[string]*row.DatumRowConverter
- descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable
- kvCh chan row.KVBatch
- opts roachpb.PgDumpOptions
- colMap map[*row.DatumRowConverter](map[string]int)
+ tables map[string]*row.DatumRowConverter
+ descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable
+ kvCh chan row.KVBatch
+ opts roachpb.PgDumpOptions
+ walltime int64
+ colMap map[*row.DatumRowConverter](map[string]int)
}

var _ inputConverter = &pgDumpReader{}
@@ -520,6 +521,7 @@ func newPgDumpReader(
ctx context.Context,
kvCh chan row.KVBatch,
opts roachpb.PgDumpOptions,
walltime int64,
descs map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
evalCtx *tree.EvalContext,
) (*pgDumpReader, error) {
@@ -545,11 +547,12 @@
}
}
return &pgDumpReader{
- kvCh: kvCh,
- tables: converters,
- descs: descs,
- opts: opts,
- colMap: colMap,
+ kvCh: kvCh,
+ tables: converters,
+ descs: descs,
+ opts: opts,
+ walltime: walltime,
+ colMap: colMap,
}, nil
}

@@ -613,6 +616,7 @@ func (m *pgDumpReader) readFile(
// the command "IMPORT INTO table (targetCols) PGDUMP DATA (filename)"
expectedColLen = len(conv.VisibleCols)
}
timestamp := timestampAfterEpoch(m.walltime)
values, ok := i.Rows.Select.(*tree.ValuesClause)
if !ok {
return errors.Errorf("unsupported: %s", i.Rows.Select)
@@ -658,7 +662,7 @@
}
conv.Datums[idx] = converted
}
- if err := conv.Row(ctx, inputIdx, count); err != nil {
+ if err := conv.Row(ctx, inputIdx, count+int64(timestamp)); err != nil {
return err
}
}
105 changes: 105 additions & 0 deletions pkg/sql/row/expr_walker.go
@@ -12,13 +12,64 @@ package row

import (
"context"
"math/rand"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

// reseedRandEveryN is the number of calls before reseeding happens.
// TODO (anzoteh96): setting reseedRandEveryN presents the tradeoff
// between the frequency of re-seeding and the number of calls to
// Float64() needed upon every resume. Therefore it will be useful to
// tune this parameter.
const reseedRandEveryN = 1000

type importRand struct {
*rand.Rand
pos int64
}

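// newImportRand returns a source positioned at pos: it seeds at the
// largest multiple of reseedRandEveryN not exceeding pos, then discards
// values until the stream has advanced to pos exactly. This is what lets
// a resumed import rejoin the stream mid-way at bounded cost.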
func newImportRand(pos int64) *importRand {
adjPos := (pos / reseedRandEveryN) * reseedRandEveryN
rnd := rand.New(rand.NewSource(adjPos))
for i := int(pos % reseedRandEveryN); i > 0; i-- {
_ = rnd.Float64()
}
return &importRand{rnd, pos}
}

func (r *importRand) advancePos() {
r.pos++
if r.pos%reseedRandEveryN == 0 {
// Time to reseed.
r.Rand = rand.New(rand.NewSource(r.pos))
}
}

func (r *importRand) Float64() float64 {
randNum := r.Rand.Float64()
r.advancePos()
return randNum
}

func (r *importRand) Int63() int64 {
randNum := r.Rand.Int63()
r.advancePos()
return randNum
}

func getSeedForImportRand(rowID int64, sourceID int32, numInstances int) int64 {
// We expect r.pos to increment by numInstances for each row.
// Therefore, assuming that rowID increments by 1 for every row,
// we initialize the position as (sourceID << rowIDBits) XOR
// (rowID * numInstances), matching the expression below.
rowIDWithMultiplier := int64(numInstances) * rowID
return (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
}
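// For illustration (hypothetical values; rowIDBits is defined elsewhere
// in this file): with rowIDBits = 49 and two random defaults per row
// (numInstances = 2), row 5 from source 1 starts at (1 << 49) ^ 10.
// Consecutive rows of one source sit numInstances apart, and distinct
// sources occupy disjoint high-bit ranges, so their streams do not
// collide for realistic row counts.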

// For some functions (specifically the volatile ones), we do
// not want to use the provided builtin. Instead, we opt for
// our own function definition, which produces deterministic results.
@@ -51,6 +102,8 @@ type cellInfoAnnotation struct {
rowID int64
uniqueRowIDInstance int
uniqueRowIDTotal int
randSource *importRand
randInstancePerRow int
}

func getCellInfoAnnotation(t *tree.Annotations) *cellInfoAnnotation {
@@ -108,6 +161,27 @@ func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum,
return tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | returnIndex)), nil
}

func importRandom(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
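// Lazily create the source on first use, seeded from the current row's
// position; later rows keep advancing this same source.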
if c.randSource == nil {
c.randSource = newImportRand(getSeedForImportRand(
c.rowID, c.sourceID, c.randInstancePerRow))
}
return tree.NewDFloat(tree.DFloat(c.randSource.Float64())), nil
}

func importGenUUID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
if c.randSource == nil {
c.randSource = newImportRand(getSeedForImportRand(
c.rowID, c.sourceID, c.randInstancePerRow))
}
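// Derive the UUID deterministically from the seeded stream: its random
// bits are a function of gen (taken as a fraction of 1<<63), so a
// resumed import regenerates identical UUIDs.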
gen := c.randSource.Int63()
id := uuid.MakeV4()
id.DeterministicV4(uint64(gen), uint64(1<<63))
return tree.NewDUuid(tree.DUuid{UUID: id}), nil
}

// Besides overriding, there are also counters that we want to keep track
// of as we walk through the expressions in a row (at datumRowConverter creation
// time). This will be handled by the visitorSideEffect field: it will be
@@ -151,6 +225,37 @@ var supportedImportFuncOverrides = map[string]*customFunc{
},
),
},
"random": {
visitorSideEffect: func(annot *tree.Annotations) {
getCellInfoAnnotation(annot).randInstancePerRow++
},
override: makeBuiltinOverride(
tree.FunDefs["random"],
tree.Overload{
Types: tree.ArgTypes{},
ReturnType: tree.FixedReturnType(types.Float),
Fn: importRandom,
Info: "Returns a random number between 0 and 1 based on row position and time.",
Volatility: tree.VolatilityVolatile,
},
),
},
"gen_random_uuid": {
visitorSideEffect: func(annot *tree.Annotations) {
getCellInfoAnnotation(annot).randInstancePerRow++
},
override: makeBuiltinOverride(
tree.FunDefs["gen_random_uuid"],
tree.Overload{
Types: tree.ArgTypes{},
ReturnType: tree.FixedReturnType(types.Uuid),
Fn: importGenUUID,
Info: "Generates a random UUID based on row position and time, " +
"and returns it as a value of UUID type.",
Volatility: tree.VolatilityVolatile,
},
),
},
}

func unsafeExpressionError(err error, msg string, expr string) error {
