diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go
index f19add2b194e..d112e3b7425d 100644
--- a/pkg/ccl/importccl/import_stmt_test.go
+++ b/pkg/ccl/importccl/import_stmt_test.go
@@ -3968,7 +3968,7 @@ func TestImportDefault(t *testing.T) {
 		}
 	})
 
-	t.Run("random-related", func(t *testing.T) {
+	t.Run("random-functions", func(t *testing.T) {
 		testCases := []struct {
 			name       string
 			create     string
@@ -3978,19 +3978,19 @@
 		}{
 			{
 				name:       "random-multiple",
-				create:     "a INT, b FLOAT DEFAULT random(), c STRING, d FLOAT DEFAULT random()",
+				create:     "a INT, b FLOAT PRIMARY KEY DEFAULT random(), c STRING, d FLOAT DEFAULT random()",
 				targetCols: []string{"a", "c"},
 				randomCols: []string{selectNotNull("b"), selectNotNull("d")},
 			},
 			{
 				name:       "gen_random_uuid",
-				create:     "a INT, b STRING, c UUID DEFAULT gen_random_uuid()",
+				create:     "a INT, b STRING, c UUID PRIMARY KEY DEFAULT gen_random_uuid(), d UUID DEFAULT gen_random_uuid()",
 				targetCols: []string{"a", "b"},
-				randomCols: []string{selectNotNull("c")},
+				randomCols: []string{selectNotNull("c"), selectNotNull("d")},
 			},
 			{
 				name:       "mixed_random_uuid",
-				create:     "a INT, b STRING, c UUID DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()",
+				create:     "a INT, b STRING, c UUID PRIMARY KEY DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()",
 				targetCols: []string{"a", "b"},
 				randomCols: []string{selectNotNull("c")},
 			},
@@ -4033,6 +4033,50 @@ func TestImportDefault(t *testing.T) {
 	})
 }
 
+// This is a regression test for #61203. We test that the random() keys are
+// unique on a larger data set. This would previously fail with a primary key
+// collision error since we would generate duplicate UUIDs.
+//
+// Note that although there is no guarantee that UUIDs do not collide, the
+// probability of such a collision is vanishingly low.
+func TestUniqueUUID(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// This test is slow under race since it explicitly tries to import a large
+	// amount of data.
+	skip.UnderRace(t, "slow under race")
+
+	const (
+		nodes     = 3
+		dataDir   = "userfile://defaultdb.my_files/export"
+		dataFiles = dataDir + "/*"
+	)
+	ctx := context.Background()
+	args := base.TestServerArgs{}
+	tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: args})
+	defer tc.Stopper().Stop(ctx)
+	connDB := tc.Conns[0]
+	sqlDB := sqlutils.MakeSQLRunner(connDB)
+
+	dataSize := parallelImporterReaderBatchSize * 100
+
+	sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE data AS SELECT * FROM generate_series(1, %d);`, dataSize))
+	sqlDB.Exec(t, `EXPORT INTO CSV $1 FROM TABLE data;`, dataDir)
+
+	// Ensure that UUIDs do not collide when importing 20000 rows.
+	sqlDB.Exec(t, `CREATE TABLE r1 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT);`)
+	sqlDB.Exec(t, `IMPORT INTO r1 (b) CSV DATA ($1);`, dataFiles)
+
+	// Ensure that UUIDs do not collide when importing into a table with several UUID calls.
+	sqlDB.Exec(t, `CREATE TABLE r2 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT, c UUID DEFAULT gen_random_uuid());`)
+	sqlDB.Exec(t, `IMPORT INTO r2 (b) CSV DATA ($1);`, dataFiles)
+
+	// Ensure that random keys do not collide.
+	sqlDB.Exec(t, `CREATE TABLE r3 (a FLOAT PRIMARY KEY DEFAULT random(), b INT);`)
+	sqlDB.Exec(t, `IMPORT INTO r3 (b) CSV DATA ($1);`, dataFiles)
+}
+
 func TestImportDefaultNextVal(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
diff --git a/pkg/sql/row/expr_walker.go b/pkg/sql/row/expr_walker.go
index 26c7102bc996..ffb82249690e 100644
--- a/pkg/sql/row/expr_walker.go
+++ b/pkg/sql/row/expr_walker.go
@@ -45,48 +45,93 @@ const chunkSizeIncrementRate = 10
 const initialChunkSize = 10
 const maxChunkSize = 100000
 
+// importRandPosition uniquely identifies an instance of a call to a random
+// function during an import.
+type importRandPosition int64
+
+func (pos importRandPosition) distance(o importRandPosition) int64 {
+	diff := int64(pos) - int64(o)
+	if diff < 0 {
+		return -diff
+	}
+	return diff
+}
+
+// getPosForRandImport gives the importRandPosition for the first instance of a
+// call to a random function when generating a given row from a given source.
+// numInstances refers to the number of random function invocations per row.
+func getPosForRandImport(rowID int64, sourceID int32, numInstances int) importRandPosition {
+	// We expect r.pos to increment by numInstances for each row.
+	// Therefore, assuming that rowID increments by 1 for every row,
+	// we will initialize the position as rowID * numInstances + sourceID << rowIDBits.
+	rowIDWithMultiplier := int64(numInstances) * rowID
+	pos := (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
+	return importRandPosition(pos)
+}
+
+// randomSource is only exposed through an interface to ensure that callers
+// don't access the underlying fields directly.
+type randomSource interface {
+	// Float64 returns, as a float64, a pseudo-random number in [0.0,1.0).
+	Float64(c *CellInfoAnnotation) float64
+	// Int63 returns a non-negative pseudo-random 63-bit integer as an int64.
+	Int63(c *CellInfoAnnotation) int64
+}
+
+var _ randomSource = (*importRand)(nil)
+
 type importRand struct {
 	*rand.Rand
-	pos int64
+	pos importRandPosition
 }
 
-func newImportRand(pos int64) *importRand {
+func (r *importRand) reseed(pos importRandPosition) {
 	adjPos := (pos / reseedRandEveryN) * reseedRandEveryN
-	rnd := rand.New(rand.NewSource(adjPos))
+	rnd := rand.New(rand.NewSource(int64(adjPos)))
 	for i := int(pos % reseedRandEveryN); i > 0; i-- {
 		_ = rnd.Float64()
 	}
-	return &importRand{rnd, pos}
-}
 
-func (r *importRand) advancePos() {
-	r.pos++
+	r.Rand = rnd
+	r.pos = pos
+}
+
+func (r *importRand) maybeReseed(c *CellInfoAnnotation) {
+	// newRowPos is the position of the first random function invocation of the
+	// row we're currently processing. If this is more than c.randInstancePerRow
+	// away, that means that we've skipped a batch of rows. importRand assumes
+	// that it operates on a contiguous set of rows when it increments its
+	// position, so if we skip some rows we need to reseed.
+	// We may skip rows because a single row converter may be responsible for
+	// converting several non-contiguous batches of KVs.
+	newRowPos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow)
+	rowsSkipped := newRowPos.distance(r.pos) > int64(c.randInstancePerRow)
+	if rowsSkipped {
+		// Reseed at the new position, since our internally tracked r.pos is now out
+		// of sync.
+		r.reseed(newRowPos)
+	}
 	if r.pos%reseedRandEveryN == 0 {
-		// Time to reseed.
-		r.Rand = rand.New(rand.NewSource(r.pos))
+		r.reseed(r.pos)
 	}
 }
 
-func (r *importRand) Float64() float64 {
+// Float64 implements the randomSource interface.
+func (r *importRand) Float64(c *CellInfoAnnotation) float64 {
+	r.maybeReseed(c)
 	randNum := r.Rand.Float64()
-	r.advancePos()
+	r.pos++
 	return randNum
 }
 
-func (r *importRand) Int63() int64 {
+// Int63 implements the randomSource interface.
+func (r *importRand) Int63(c *CellInfoAnnotation) int64 {
+	r.maybeReseed(c)
 	randNum := r.Rand.Int63()
-	r.advancePos()
+	r.pos++
 	return randNum
 }
 
-func getSeedForImportRand(rowID int64, sourceID int32, numInstances int) int64 {
-	// We expect r.pos to increment by numInstances for each row.
-	// Therefore, assuming that rowID increments by 1 for every row,
-	// we will initialize the position as rowID * numInstances + sourceID << rowIDBits.
-	rowIDWithMultiplier := int64(numInstances) * rowID
-	return (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
-}
-
 // For some functions (specifically the volatile ones), we do
 // not want to use the provided builtin. Instead, we opt for
 // our own function definition, which produces deterministic results.
@@ -135,7 +180,8 @@ type CellInfoAnnotation struct {
 	uniqueRowIDTotal    int
 
 	// Annotations for rand() and gen_random_uuid().
-	randSource         *importRand
+	// randSource should not be used directly, but through getImportRand() instead.
+	randSource         randomSource
 	randInstancePerRow int
 
 	// Annotations for next_val().
@@ -154,6 +200,13 @@ func (c *CellInfoAnnotation) reset(sourceID int32, rowID int64) {
 	c.uniqueRowIDInstance = 0
 }
 
+func makeImportRand(c *CellInfoAnnotation) randomSource {
+	pos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow)
+	randSource := &importRand{}
+	randSource.reseed(pos)
+	return randSource
+}
+
 // We don't want to call unique_rowid() for columns with such default expressions
 // because it is not idempotent and has unfortunate overlapping of output
 // spans since it puts the uniqueness-ensuring per-generator part (nodeID)
@@ -202,19 +255,17 @@ func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum,
 func importRandom(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
 	c := getCellInfoAnnotation(evalCtx.Annotations)
 	if c.randSource == nil {
-		c.randSource = newImportRand(getSeedForImportRand(
-			c.rowID, c.sourceID, c.randInstancePerRow))
+		c.randSource = makeImportRand(c)
 	}
-	return tree.NewDFloat(tree.DFloat(c.randSource.Float64())), nil
+	return tree.NewDFloat(tree.DFloat(c.randSource.Float64(c))), nil
 }
 
 func importGenUUID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
 	c := getCellInfoAnnotation(evalCtx.Annotations)
 	if c.randSource == nil {
-		c.randSource = newImportRand(getSeedForImportRand(
-			c.rowID, c.sourceID, c.randInstancePerRow))
+		c.randSource = makeImportRand(c)
 	}
-	gen := c.randSource.Int63()
+	gen := c.randSource.Int63(c)
 	id := uuid.MakeV4()
 	id.DeterministicV4(uint64(gen), uint64(1<<63))
 	return tree.NewDUuid(tree.DUuid{UUID: id}), nil
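
For reviewers who want to see the seeding scheme in isolation: the standalone Go sketch below illustrates the idea behind getPosForRandImport and importRand.reseed, namely that each random-function invocation gets a deterministic position derived from (sourceID, rowID, instances per row), and the PRNG is seeded at the enclosing reseed boundary and fast-forwarded to that exact position. The constant values (reseedEveryN, rowIDBits) and all names here are illustrative assumptions, not the actual definitions from expr_walker.go.

package main

import (
	"fmt"
	"math/rand"
)

// Assumed values for illustration only; the real constants (reseedRandEveryN,
// rowIDBits) are defined in pkg/sql/row/expr_walker.go.
const (
	reseedEveryN = 1000
	rowIDBits    = 48
)

// position mirrors the idea behind getPosForRandImport: derive a unique PRNG
// position for the first random-function call of a given row.
func position(rowID int64, sourceID int32, instancesPerRow int) int64 {
	return (int64(sourceID) << rowIDBits) ^ (int64(instancesPerRow) * rowID)
}

// seededRand mirrors the idea behind importRand.reseed: seed at the enclosing
// reseed boundary, then discard pos%reseedEveryN draws so the next value
// corresponds exactly to position pos.
func seededRand(pos int64) *rand.Rand {
	boundary := (pos / reseedEveryN) * reseedEveryN
	rnd := rand.New(rand.NewSource(boundary))
	for i := pos % reseedEveryN; i > 0; i-- {
		_ = rnd.Float64()
	}
	return rnd
}

func main() {
	const instancesPerRow = 2 // e.g. two random() columns per row
	// Rows 10 and 500000 from source 1 map to distinct positions, so their
	// draws come from distinct offsets of a deterministic stream, even if
	// different workers convert them in non-contiguous batches.
	for _, rowID := range []int64{10, 500000} {
		pos := position(rowID, 1, instancesPerRow)
		rnd := seededRand(pos)
		fmt.Printf("row %d: pos=%d values=%.6f %.6f\n", rowID, pos, rnd.Float64(), rnd.Float64())
	}
}

This is the same property that maybeReseed preserves when a row converter jumps between non-contiguous batches: it recomputes the position for the new row and reseeds there, instead of continuing from a stale offset.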