Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importccl: avoid random() collisions between KV batches #61214

Merged
merged 1 commit into from
Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 49 additions & 5 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3968,7 +3968,7 @@ func TestImportDefault(t *testing.T) {

}
})
t.Run("random-related", func(t *testing.T) {
t.Run("random-functions", func(t *testing.T) {
testCases := []struct {
name string
create string
Expand All @@ -3978,19 +3978,19 @@ func TestImportDefault(t *testing.T) {
}{
{
name: "random-multiple",
create: "a INT, b FLOAT DEFAULT random(), c STRING, d FLOAT DEFAULT random()",
create: "a INT, b FLOAT PRIMARY KEY DEFAULT random(), c STRING, d FLOAT DEFAULT random()",
targetCols: []string{"a", "c"},
randomCols: []string{selectNotNull("b"), selectNotNull("d")},
},
{
name: "gen_random_uuid",
create: "a INT, b STRING, c UUID DEFAULT gen_random_uuid()",
create: "a INT, b STRING, c UUID PRIMARY KEY DEFAULT gen_random_uuid(), d UUID DEFAULT gen_random_uuid()",
targetCols: []string{"a", "b"},
randomCols: []string{selectNotNull("c")},
randomCols: []string{selectNotNull("c"), selectNotNull("d")},
},
{
name: "mixed_random_uuid",
create: "a INT, b STRING, c UUID DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()",
create: "a INT, b STRING, c UUID PRIMARY KEY DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()",
targetCols: []string{"a", "b"},
randomCols: []string{selectNotNull("c")},
},
Expand Down Expand Up @@ -4033,6 +4033,50 @@ func TestImportDefault(t *testing.T) {
})
}

// This is a regression test for #61203. We test that the random() keys are
// unique on a larger data set. This would previously fail with a primary key
// collision error since we would generate duplicate UUIDs.
//
// Note: that although there is no guarantee that UUIDs do not collide, the
// probability of such a collision is vanishingly low.
func TestUniqueUUID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// This test is slow under race since it explicitly tried to import a large
// amount of data.
skip.UnderRace(t, "slow under race")

const (
nodes = 3
dataDir = "userfile://defaultdb.my_files/export"
dataFiles = dataDir + "/*"
)
ctx := context.Background()
args := base.TestServerArgs{}
tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: args})
defer tc.Stopper().Stop(ctx)
connDB := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(connDB)

dataSize := parallelImporterReaderBatchSize * 100

sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE data AS SELECT * FROM generate_series(1, %d);`, dataSize))
sqlDB.Exec(t, `EXPORT INTO CSV $1 FROM TABLE data;`, dataDir)

// Ensure that UUIDs do not collide when importing 20000 rows.
sqlDB.Exec(t, `CREATE TABLE r1 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT);`)
sqlDB.Exec(t, `IMPORT INTO r1 (b) CSV DATA ($1);`, dataFiles)

// Ensure that UUIDs do not collide when importing into a table with several UUID calls.
sqlDB.Exec(t, `CREATE TABLE r2 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT, c UUID DEFAULT gen_random_uuid());`)
sqlDB.Exec(t, `IMPORT INTO r2 (b) CSV DATA ($1);`, dataFiles)

// Ensure that random keys do not collide.
sqlDB.Exec(t, `CREATE TABLE r3 (a FLOAT PRIMARY KEY DEFAULT random(), b INT);`)
sqlDB.Exec(t, `IMPORT INTO r3 (b) CSV DATA ($1);`, dataFiles)
}

func TestImportDefaultNextVal(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
107 changes: 79 additions & 28 deletions pkg/sql/row/expr_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,48 +45,93 @@ const chunkSizeIncrementRate = 10
const initialChunkSize = 10
const maxChunkSize = 100000

// importRandPosition uniquely identifies an instance to a call to a random
// function during an import.
type importRandPosition int64

func (pos importRandPosition) distance(o importRandPosition) int64 {
diff := int64(pos) - int64(o)
if diff < 0 {
return -diff
}
return diff
}

// getPosForRandImport gives the importRandPosition for the first instance of a
// call to a random function when generating a given row from a given source.
// numInstances refers to the number of random function invocations per row.
func getPosForRandImport(rowID int64, sourceID int32, numInstances int) importRandPosition {
// We expect r.pos to increment by numInstances for each row.
// Therefore, assuming that rowID increments by 1 for every row,
// we will initialize the position as rowID * numInstances + sourceID << rowIDBits.
rowIDWithMultiplier := int64(numInstances) * rowID
pos := (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
return importRandPosition(pos)
}

// randomSource is only exposed through an interface to ensure that caller's
// don't access underlying field.
type randomSource interface {
// Float64 returns, as a float64, a pseudo-random number in [0.0,1.0).
Float64(c *CellInfoAnnotation) float64
// Int63 returns a non-negative pseudo-random 63-bit integer as an int64.
Int63(c *CellInfoAnnotation) int64
}

var _ randomSource = (*importRand)(nil)

type importRand struct {
*rand.Rand
pos int64
pos importRandPosition
}

func newImportRand(pos int64) *importRand {
func (r *importRand) reseed(pos importRandPosition) {
adjPos := (pos / reseedRandEveryN) * reseedRandEveryN
rnd := rand.New(rand.NewSource(adjPos))
rnd := rand.New(rand.NewSource(int64(adjPos)))
for i := int(pos % reseedRandEveryN); i > 0; i-- {
_ = rnd.Float64()
}
return &importRand{rnd, pos}
}

func (r *importRand) advancePos() {
r.pos++
r.Rand = rnd
r.pos = pos
}

func (r *importRand) maybeReseed(c *CellInfoAnnotation) {
// newRowPos is the position of the first random function invocation of the
// row we're currently processing. If this is more than c.randInstancePerRow
// away, that means that we've skipped a batch of rows. importRand assumes
// that it operates on a contiguous set of rows when it increments its
// position, so if we skip some rows we need to reseed.
// We may skip rows because a single row converter may be responsible for
// converting several non-contiguous batches of KVs.
newRowPos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow)
rowsSkipped := newRowPos.distance(r.pos) > int64(c.randInstancePerRow)
if rowsSkipped {
// Reseed at the new position, since our internally tracked r.pos is now out
// of sync.
r.reseed(newRowPos)
}
if r.pos%reseedRandEveryN == 0 {
// Time to reseed.
r.Rand = rand.New(rand.NewSource(r.pos))
r.reseed(r.pos)
}
}

func (r *importRand) Float64() float64 {
// Float64 implements the randomSource interface.
func (r *importRand) Float64(c *CellInfoAnnotation) float64 {
r.maybeReseed(c)
randNum := r.Rand.Float64()
r.advancePos()
r.pos++
return randNum
}

func (r *importRand) Int63() int64 {
// Int63 implements the randomSource interface.
func (r *importRand) Int63(c *CellInfoAnnotation) int64 {
r.maybeReseed(c)
randNum := r.Rand.Int63()
r.advancePos()
r.pos++
return randNum
}

func getSeedForImportRand(rowID int64, sourceID int32, numInstances int) int64 {
// We expect r.pos to increment by numInstances for each row.
// Therefore, assuming that rowID increments by 1 for every row,
// we will initialize the position as rowID * numInstances + sourceID << rowIDBits.
rowIDWithMultiplier := int64(numInstances) * rowID
return (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
}

// For some functions (specifically the volatile ones), we do
// not want to use the provided builtin. Instead, we opt for
// our own function definition, which produces deterministic results.
Expand Down Expand Up @@ -135,7 +180,8 @@ type CellInfoAnnotation struct {
uniqueRowIDTotal int

// Annotations for rand() and gen_random_uuid().
randSource *importRand
// randSource should not be used directly, but through getImportRand() instead.
randSource randomSource
randInstancePerRow int

// Annotations for next_val().
Expand All @@ -154,6 +200,13 @@ func (c *CellInfoAnnotation) reset(sourceID int32, rowID int64) {
c.uniqueRowIDInstance = 0
}

func makeImportRand(c *CellInfoAnnotation) randomSource {
pos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow)
randSource := &importRand{}
randSource.reseed(pos)
return randSource
}

// We don't want to call unique_rowid() for columns with such default expressions
// because it is not idempotent and has unfortunate overlapping of output
// spans since it puts the uniqueness-ensuring per-generator part (nodeID)
Expand Down Expand Up @@ -202,19 +255,17 @@ func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum,
func importRandom(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
if c.randSource == nil {
c.randSource = newImportRand(getSeedForImportRand(
c.rowID, c.sourceID, c.randInstancePerRow))
c.randSource = makeImportRand(c)
}
return tree.NewDFloat(tree.DFloat(c.randSource.Float64())), nil
return tree.NewDFloat(tree.DFloat(c.randSource.Float64(c))), nil
}

func importGenUUID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
if c.randSource == nil {
c.randSource = newImportRand(getSeedForImportRand(
c.rowID, c.sourceID, c.randInstancePerRow))
c.randSource = makeImportRand(c)
}
gen := c.randSource.Int63()
gen := c.randSource.Int63(c)
id := uuid.MakeV4()
id.DeterministicV4(uint64(gen), uint64(1<<63))
return tree.NewDUuid(tree.DUuid{UUID: id}), nil
Expand Down