Skip to content

Commit

Permalink
importccl: support random() and gen_random_uuid() as default expression
Browse files Browse the repository at this point in the history
This PR follows up from cockroachdb#50295 to support random() and gen_random_uuid() as
default expression. Following the visitor and overriding patterns on previous
PRs, the random() expression is supported by periodically re-seeding (say,
every N rows) and making sure that when we resume (after a failed import),
we return to the position where it's last re-seeded. The seed is determined
by the row number, timestamp, and source ID.

Release note (general change): random() and gen_random_uuid() are supported
as default expressions for IMPORT.
  • Loading branch information
anzoteh96 committed Aug 3, 2020
1 parent b6cdf0f commit 879c970
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 26 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func ingestKvs(
} else {
prog.ResumePos[file] = idx
}
prog.ResumePos[file] = adjustedResumePos(prog.ResumePos[file])
prog.CompletedFraction[file] = math.Float32frombits(atomic.LoadUint32(&writtenFraction[offset]))
}
progCh <- prog
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/import_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ func TestImportHonorsResumePosition(t *testing.T) {
// (BulkAdderFlushesEveryBatch), then the progress resport must be emitted every
// batchSize rows (possibly out of order), starting from our initial resumePos
for prog := range progCh {
if !t.Failed() && prog.ResumePos[0] < (rp+int64(batchSize)) {
adjustedPos := adjustedResumePos(rp + int64(batchSize))
if !t.Failed() && prog.ResumePos[0] < adjustedPos {
t.Logf("unexpected progress resume pos: %d", prog.ResumePos[0])
t.Fail()
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ const (
avroSchemaURI = "schema_uri"
)

// resumeRowMultiple is so that each resume row index must be a multiple of
// resumeRowMultiple, and that every row is re-seeded at those places.
const resumeRowMultiple = 20

// For the purpose of recording the resume position, we want to resume at
// the last position we reseeded so we round down to the lowest resumeRowMultiple.
func adjustedResumePos(resumePos int64) int64 {
return (resumePos / resumeRowMultiple) * resumeRowMultiple
}

var importOptionExpectValues = map[string]sql.KVStringOptValidate{
csvDelimiter: sql.KVStringOptRequireValue,
csvComment: sql.KVStringOptRequireValue,
Expand Down
62 changes: 38 additions & 24 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3067,14 +3067,6 @@ func TestImportDefault(t *testing.T) {
format: "CSV",
expectedResults: [][]string{{"1", "102"}, {"2", "102"}},
},
{
name: "random",
data: "1\n2",
create: `a INT, b FLOAT DEFAULT random()`,
targetCols: "a",
format: "CSV",
expectedError: "unsafe for import",
},
{
name: "nextval",
sequence: "testseq",
Expand All @@ -3084,22 +3076,6 @@ func TestImportDefault(t *testing.T) {
format: "CSV",
expectedError: "unsafe for import",
},
{
name: "random_plus_timestamp",
data: "1\n2",
create: "a INT, b INT DEFAULT (100*random())::int + current_timestamp()::int",
targetCols: "a",
format: "CSV",
expectedError: "unsafe for import",
},
{
name: "deep_nesting_with_random",
data: "1\n2",
create: "a INT, b INT DEFAULT (1 + 2 + (100 * round(3 + random())::int)) * 5 + 3",
targetCols: "a",
format: "CSV",
expectedError: "unsafe for import",
},
// TODO (anzoteh96): add AVRO format, and also MySQL and PGDUMP once
// IMPORT INTO are supported for these file formats.
}
Expand Down Expand Up @@ -3289,6 +3265,44 @@ func TestImportDefault(t *testing.T) {

}
})
t.Run("random-related", func(t *testing.T) {
testCases := []struct {
name string
create string
targetCols []string
randomCols []string
}{
{
name: "random",
create: "a INT, b FLOAT DEFAULT random(), c STRING",
targetCols: []string{"a", "c"},
randomCols: []string{selectNotNull("b")},
},
{
name: "gen_random_uuid",
create: "a INT, b STRING, c UUID DEFAULT gen_random_uuid()",
targetCols: []string{"a", "b"},
randomCols: []string{selectNotNull("c")},
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
defer sqlDB.Exec(t, `DROP TABLE t`)
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE t(%s)`, test.create))
sqlDB.Exec(t, fmt.Sprintf(`IMPORT INTO t (%s) CSV DATA (%s)`,
strings.Join(test.targetCols, ", "),
strings.Join(testFiles.files, ", ")))
var numDistinctRows int
sqlDB.QueryRow(t,
fmt.Sprintf(`SELECT DISTINCT COUNT (*) FROM (%s)`,
strings.Join(test.randomCols, " UNION ")),
).Scan(&numDistinctRows)
var numRows int
sqlDB.QueryRow(t, `SELECT COUNT (*) FROM t`).Scan(&numRows)
require.Equal(t, numDistinctRows, len(test.randomCols)*numRows)
})
}
})
}

// goos: darwin
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,14 +619,17 @@ func (p *parallelImporter) importWorker(
conv.KvBatch.Progress = batch.progress
for batchIdx, record := range batch.data {
rowNum = batch.startPos + int64(batchIdx)
rowIndex := int64(timestamp) + rowNum
if batchIdx%resumeRowMultiple == 0 {
row.ReseedForImport(conv.KvBatch.Source, rowIndex)
}
if err := consumer.FillDatums(record, rowNum, conv); err != nil {
if err = handleCorruptRow(ctx, fileCtx, err); err != nil {
return err
}
continue
}

rowIndex := int64(timestamp) + rowNum
if err := conv.Row(ctx, conv.KvBatch.Source, rowIndex); err != nil {
return newImportRowError(err, fmt.Sprintf("%v", record), rowNum)
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/sql/row/expr_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ package row

import (
"context"
"math/rand"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -108,6 +110,17 @@ func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum,
return tree.NewDInt(tree.DInt(avoidCollisionsWithSQLsIDs | returnIndex)), nil
}

func importRandom(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
return tree.NewDFloat(tree.DFloat(rand.Float64())), nil
}

func importGenUUID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
gen := rand.Int63()
id := uuid.MakeV4()
id.DeterministicV4(uint64(gen), uint64(1<<63))
return tree.NewDUuid(tree.DUuid{UUID: id}), nil
}

// Besides overriding, there are also counters that we want to keep track
// of as we walk through the expressions in a row (at datumRowConverter creation
// time). This will be handled by the visitorSideEffect field: it will be
Expand Down Expand Up @@ -151,6 +164,33 @@ var supportedImportFuncOverrides = map[string]*customFunc{
},
),
},
"random": {
visitorSideEffect: func(annot *tree.Annotations) {},
override: makeBuiltinOverride(
tree.FunDefs["random"],
tree.Overload{
Types: tree.ArgTypes{},
ReturnType: tree.FixedReturnType(types.Float),
Fn: importRandom,
Info: "Returns a random number between 0 and 1 based on row position and time.",
Volatility: tree.VolatilityVolatile,
},
),
},
"gen_random_uuid": {
visitorSideEffect: func(annot *tree.Annotations) {},
override: makeBuiltinOverride(
tree.FunDefs["gen_random_uuid"],
tree.Overload{
Types: tree.ArgTypes{},
ReturnType: tree.FixedReturnType(types.Uuid),
Fn: importGenUUID,
Info: "Generates a random UUID based on row position and time, " +
"and returns it as a value of UUID type.",
Volatility: tree.VolatilityVolatile,
},
),
},
}

func unsafeExpressionError(err error, msg string, expr string) error {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/row/row_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package row

import (
"context"
"math/rand"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
Expand Down Expand Up @@ -178,6 +179,12 @@ type KVBatch struct {
KVs []roachpb.KeyValue
}

// ReseedForImport is a method to reset the state of the rand library whenever
// being called.
func ReseedForImport(sourceID int32, rowIndex int64) {
rand.Seed((int64(sourceID) << rowIDBits) + rowIndex)
}

// DatumRowConverter converts Datums into kvs and streams it to the destination
// channel.
type DatumRowConverter struct {
Expand Down

0 comments on commit 879c970

Please sign in to comment.