From 7d148f6894e9ef49f2c50a3037dd10ceda410311 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Fri, 26 Feb 2021 17:18:33 -0500 Subject: [PATCH] importccl: avoid random_number collisions between KV batches An import may parallelize the work to convert SQL rows into KVs. During this phase, default expressions are evaluated. Previously, IMPORT's implementations of random number generates that are evaluated in default expressions assumed that all of the given row IDs were contiguous. This is not the case since 1 row converter may be responsible for converting several non-contiguous batches of rows. This resulted in random_values colliding between different batches of KV space. This commit fixes this bug by feeding in the current position and resetting the random source backing these methods. This ensures that import treats a new contiguous batch of rows separately. Release justification: bug fix Release note (bug fix): Fix a bug where random numbers generated as default expressions during IMPORT would collide a few hundred rows apart from each-other. --- pkg/ccl/importccl/import_stmt_test.go | 54 +++++++++++-- pkg/sql/row/expr_walker.go | 107 +++++++++++++++++++------- 2 files changed, 128 insertions(+), 33 deletions(-) diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index f19add2b194e..d112e3b7425d 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -3968,7 +3968,7 @@ func TestImportDefault(t *testing.T) { } }) - t.Run("random-related", func(t *testing.T) { + t.Run("random-functions", func(t *testing.T) { testCases := []struct { name string create string @@ -3978,19 +3978,19 @@ func TestImportDefault(t *testing.T) { }{ { name: "random-multiple", - create: "a INT, b FLOAT DEFAULT random(), c STRING, d FLOAT DEFAULT random()", + create: "a INT, b FLOAT PRIMARY KEY DEFAULT random(), c STRING, d FLOAT DEFAULT random()", targetCols: []string{"a", "c"}, randomCols: []string{selectNotNull("b"), selectNotNull("d")}, }, { name: "gen_random_uuid", - create: "a INT, b STRING, c UUID DEFAULT gen_random_uuid()", + create: "a INT, b STRING, c UUID PRIMARY KEY DEFAULT gen_random_uuid(), d UUID DEFAULT gen_random_uuid()", targetCols: []string{"a", "b"}, - randomCols: []string{selectNotNull("c")}, + randomCols: []string{selectNotNull("c"), selectNotNull("d")}, }, { name: "mixed_random_uuid", - create: "a INT, b STRING, c UUID DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()", + create: "a INT, b STRING, c UUID PRIMARY KEY DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()", targetCols: []string{"a", "b"}, randomCols: []string{selectNotNull("c")}, }, @@ -4033,6 +4033,50 @@ func TestImportDefault(t *testing.T) { }) } +// This is a regression test for #61203. We test that the random() keys are +// unique on a larger data set. This would previously fail with a primary key +// collision error since we would generate duplicate UUIDs. +// +// Note: that although there is no guarantee that UUIDs do not collide, the +// probability of such a collision is vanishingly low. +func TestUniqueUUID(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // This test is slow under race since it explicitly tried to import a large + // amount of data. + skip.UnderRace(t, "slow under race") + + const ( + nodes = 3 + dataDir = "userfile://defaultdb.my_files/export" + dataFiles = dataDir + "/*" + ) + ctx := context.Background() + args := base.TestServerArgs{} + tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: args}) + defer tc.Stopper().Stop(ctx) + connDB := tc.Conns[0] + sqlDB := sqlutils.MakeSQLRunner(connDB) + + dataSize := parallelImporterReaderBatchSize * 100 + + sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE data AS SELECT * FROM generate_series(1, %d);`, dataSize)) + sqlDB.Exec(t, `EXPORT INTO CSV $1 FROM TABLE data;`, dataDir) + + // Ensure that UUIDs do not collide when importing 20000 rows. + sqlDB.Exec(t, `CREATE TABLE r1 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT);`) + sqlDB.Exec(t, `IMPORT INTO r1 (b) CSV DATA ($1);`, dataFiles) + + // Ensure that UUIDs do not collide when importing into a table with several UUID calls. + sqlDB.Exec(t, `CREATE TABLE r2 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT, c UUID DEFAULT gen_random_uuid());`) + sqlDB.Exec(t, `IMPORT INTO r2 (b) CSV DATA ($1);`, dataFiles) + + // Ensure that random keys do not collide. + sqlDB.Exec(t, `CREATE TABLE r3 (a FLOAT PRIMARY KEY DEFAULT random(), b INT);`) + sqlDB.Exec(t, `IMPORT INTO r3 (b) CSV DATA ($1);`, dataFiles) +} + func TestImportDefaultNextVal(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/sql/row/expr_walker.go b/pkg/sql/row/expr_walker.go index 26c7102bc996..ffb82249690e 100644 --- a/pkg/sql/row/expr_walker.go +++ b/pkg/sql/row/expr_walker.go @@ -45,48 +45,93 @@ const chunkSizeIncrementRate = 10 const initialChunkSize = 10 const maxChunkSize = 100000 +// importRandPosition uniquely identifies an instance to a call to a random +// function during an import. +type importRandPosition int64 + +func (pos importRandPosition) distance(o importRandPosition) int64 { + diff := int64(pos) - int64(o) + if diff < 0 { + return -diff + } + return diff +} + +// getPosForRandImport gives the importRandPosition for the first instance of a +// call to a random function when generating a given row from a given source. +// numInstances refers to the number of random function invocations per row. +func getPosForRandImport(rowID int64, sourceID int32, numInstances int) importRandPosition { + // We expect r.pos to increment by numInstances for each row. + // Therefore, assuming that rowID increments by 1 for every row, + // we will initialize the position as rowID * numInstances + sourceID << rowIDBits. + rowIDWithMultiplier := int64(numInstances) * rowID + pos := (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier + return importRandPosition(pos) +} + +// randomSource is only exposed through an interface to ensure that caller's +// don't access underlying field. +type randomSource interface { + // Float64 returns, as a float64, a pseudo-random number in [0.0,1.0). + Float64(c *CellInfoAnnotation) float64 + // Int63 returns a non-negative pseudo-random 63-bit integer as an int64. + Int63(c *CellInfoAnnotation) int64 +} + +var _ randomSource = (*importRand)(nil) + type importRand struct { *rand.Rand - pos int64 + pos importRandPosition } -func newImportRand(pos int64) *importRand { +func (r *importRand) reseed(pos importRandPosition) { adjPos := (pos / reseedRandEveryN) * reseedRandEveryN - rnd := rand.New(rand.NewSource(adjPos)) + rnd := rand.New(rand.NewSource(int64(adjPos))) for i := int(pos % reseedRandEveryN); i > 0; i-- { _ = rnd.Float64() } - return &importRand{rnd, pos} -} -func (r *importRand) advancePos() { - r.pos++ + r.Rand = rnd + r.pos = pos +} + +func (r *importRand) maybeReseed(c *CellInfoAnnotation) { + // newRowPos is the position of the first random function invocation of the + // row we're currently processing. If this is more than c.randInstancePerRow + // away, that means that we've skipped a batch of rows. importRand assumes + // that it operates on a contiguous set of rows when it increments its + // position, so if we skip some rows we need to reseed. + // We may skip rows because a single row converter may be responsible for + // converting several non-contiguous batches of KVs. + newRowPos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow) + rowsSkipped := newRowPos.distance(r.pos) > int64(c.randInstancePerRow) + if rowsSkipped { + // Reseed at the new position, since our internally tracked r.pos is now out + // of sync. + r.reseed(newRowPos) + } if r.pos%reseedRandEveryN == 0 { - // Time to reseed. - r.Rand = rand.New(rand.NewSource(r.pos)) + r.reseed(r.pos) } } -func (r *importRand) Float64() float64 { +// Float64 implements the randomSource interface. +func (r *importRand) Float64(c *CellInfoAnnotation) float64 { + r.maybeReseed(c) randNum := r.Rand.Float64() - r.advancePos() + r.pos++ return randNum } -func (r *importRand) Int63() int64 { +// Int63 implements the randomSource interface. +func (r *importRand) Int63(c *CellInfoAnnotation) int64 { + r.maybeReseed(c) randNum := r.Rand.Int63() - r.advancePos() + r.pos++ return randNum } -func getSeedForImportRand(rowID int64, sourceID int32, numInstances int) int64 { - // We expect r.pos to increment by numInstances for each row. - // Therefore, assuming that rowID increments by 1 for every row, - // we will initialize the position as rowID * numInstances + sourceID << rowIDBits. - rowIDWithMultiplier := int64(numInstances) * rowID - return (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier -} - // For some functions (specifically the volatile ones), we do // not want to use the provided builtin. Instead, we opt for // our own function definition, which produces deterministic results. @@ -135,7 +180,8 @@ type CellInfoAnnotation struct { uniqueRowIDTotal int // Annotations for rand() and gen_random_uuid(). - randSource *importRand + // randSource should not be used directly, but through getImportRand() instead. + randSource randomSource randInstancePerRow int // Annotations for next_val(). @@ -154,6 +200,13 @@ func (c *CellInfoAnnotation) reset(sourceID int32, rowID int64) { c.uniqueRowIDInstance = 0 } +func makeImportRand(c *CellInfoAnnotation) randomSource { + pos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow) + randSource := &importRand{} + randSource.reseed(pos) + return randSource +} + // We don't want to call unique_rowid() for columns with such default expressions // because it is not idempotent and has unfortunate overlapping of output // spans since it puts the uniqueness-ensuring per-generator part (nodeID) @@ -202,19 +255,17 @@ func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, func importRandom(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) { c := getCellInfoAnnotation(evalCtx.Annotations) if c.randSource == nil { - c.randSource = newImportRand(getSeedForImportRand( - c.rowID, c.sourceID, c.randInstancePerRow)) + c.randSource = makeImportRand(c) } - return tree.NewDFloat(tree.DFloat(c.randSource.Float64())), nil + return tree.NewDFloat(tree.DFloat(c.randSource.Float64(c))), nil } func importGenUUID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) { c := getCellInfoAnnotation(evalCtx.Annotations) if c.randSource == nil { - c.randSource = newImportRand(getSeedForImportRand( - c.rowID, c.sourceID, c.randInstancePerRow)) + c.randSource = makeImportRand(c) } - gen := c.randSource.Int63() + gen := c.randSource.Int63(c) id := uuid.MakeV4() id.DeterministicV4(uint64(gen), uint64(1<<63)) return tree.NewDUuid(tree.DUuid{UUID: id}), nil