Skip to content

Commit

Permalink
Merge pull request #61630 from pbardea/backport20.2-61214
Browse files Browse the repository at this point in the history
release-20.2: importccl: avoid random() collisions between KV batches
  • Loading branch information
pbardea authored Mar 15, 2021
2 parents d4a049e + 8a311c8 commit fc5e756
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 33 deletions.
54 changes: 49 additions & 5 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3648,7 +3648,7 @@ func TestImportDefault(t *testing.T) {

}
})
t.Run("random-related", func(t *testing.T) {
t.Run("random-functions", func(t *testing.T) {
testCases := []struct {
name string
create string
Expand All @@ -3658,19 +3658,19 @@ func TestImportDefault(t *testing.T) {
}{
{
name: "random-multiple",
create: "a INT, b FLOAT DEFAULT random(), c STRING, d FLOAT DEFAULT random()",
create: "a INT, b FLOAT PRIMARY KEY DEFAULT random(), c STRING, d FLOAT DEFAULT random()",
targetCols: []string{"a", "c"},
randomCols: []string{selectNotNull("b"), selectNotNull("d")},
},
{
name: "gen_random_uuid",
create: "a INT, b STRING, c UUID DEFAULT gen_random_uuid()",
create: "a INT, b STRING, c UUID PRIMARY KEY DEFAULT gen_random_uuid(), d UUID DEFAULT gen_random_uuid()",
targetCols: []string{"a", "b"},
randomCols: []string{selectNotNull("c")},
randomCols: []string{selectNotNull("c"), selectNotNull("d")},
},
{
name: "mixed_random_uuid",
create: "a INT, b STRING, c UUID DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()",
create: "a INT, b STRING, c UUID PRIMARY KEY DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()",
targetCols: []string{"a", "b"},
randomCols: []string{selectNotNull("c")},
},
Expand Down Expand Up @@ -3713,6 +3713,50 @@ func TestImportDefault(t *testing.T) {
})
}

// TestUniqueUUID is a regression test for #61203: keys produced by random()
// and gen_random_uuid() default expressions must be unique across a larger
// import. Previously this failed with a primary key collision error since
// duplicate UUIDs were generated across KV batches.
//
// Note: although there is no hard guarantee that UUIDs never collide, the
// probability of such a collision is vanishingly low.
func TestUniqueUUID(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Importing a large amount of data makes this test slow under race.
	skip.UnderRace(t, "slow under race")

	const (
		nodes     = 3
		dataDir   = "userfile://defaultdb.my_files/export"
		dataFiles = dataDir + "/*"
	)
	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{}})
	defer tc.Stopper().Stop(ctx)
	sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])

	// Export enough rows that the IMPORTs below span many reader batches.
	numRows := parallelImporterReaderBatchSize * 100
	sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE data AS SELECT * FROM generate_series(1, %d);`, numRows))
	sqlDB.Exec(t, `EXPORT INTO CSV $1 FROM TABLE data;`, dataDir)

	// Each schema exercises a different shape of random-default column; the
	// IMPORT fails with a duplicate-key error if any generated key repeats.
	for _, tbl := range []struct {
		create, ingest string
	}{
		// A single gen_random_uuid() primary key.
		{
			`CREATE TABLE r1 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT);`,
			`IMPORT INTO r1 (b) CSV DATA ($1);`,
		},
		// Several gen_random_uuid() calls in one table.
		{
			`CREATE TABLE r2 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT, c UUID DEFAULT gen_random_uuid());`,
			`IMPORT INTO r2 (b) CSV DATA ($1);`,
		},
		// A random() float primary key.
		{
			`CREATE TABLE r3 (a FLOAT PRIMARY KEY DEFAULT random(), b INT);`,
			`IMPORT INTO r3 (b) CSV DATA ($1);`,
		},
	} {
		sqlDB.Exec(t, tbl.create)
		sqlDB.Exec(t, tbl.ingest, dataFiles)
	}
}

func TestImportComputed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
109 changes: 81 additions & 28 deletions pkg/sql/row/expr_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,93 @@ import (
// tune this parameter.
const reseedRandEveryN = 1000

// importRandPosition uniquely identifies an instance of a call to a random
// function during an import.
type importRandPosition int64

// distance returns the absolute difference between pos and o, used to detect
// when the row converter has skipped a non-contiguous batch of rows.
func (pos importRandPosition) distance(o importRandPosition) int64 {
	d := int64(pos) - int64(o)
	if d >= 0 {
		return d
	}
	return -d
}

// getPosForRandImport gives the importRandPosition for the first instance of
// a call to a random function when generating a given row from a given
// source. numInstances is the number of random function invocations per row.
// Each row therefore occupies numInstances consecutive positions (assuming
// rowID increments by 1 per row), while the source ID is folded into the high
// bits above rowIDBits to keep distinct sources apart.
func getPosForRandImport(rowID int64, sourceID int32, numInstances int) importRandPosition {
	perRowBase := rowID * int64(numInstances)
	return importRandPosition((int64(sourceID) << rowIDBits) ^ perRowBase)
}

// randomSource is only exposed through an interface to ensure that callers
// don't access the underlying fields directly; all position bookkeeping must
// go through the methods below.
type randomSource interface {
	// Float64 returns, as a float64, a pseudo-random number in [0.0,1.0).
	Float64(c *cellInfoAnnotation) float64
	// Int63 returns a non-negative pseudo-random 63-bit integer as an int64.
	Int63(c *cellInfoAnnotation) int64
}

// Compile-time check that *importRand satisfies randomSource.
var _ randomSource = (*importRand)(nil)

type importRand struct {
*rand.Rand
pos int64
pos importRandPosition
}

func newImportRand(pos int64) *importRand {
func (r *importRand) reseed(pos importRandPosition) {
adjPos := (pos / reseedRandEveryN) * reseedRandEveryN
rnd := rand.New(rand.NewSource(adjPos))
rnd := rand.New(rand.NewSource(int64(adjPos)))
for i := int(pos % reseedRandEveryN); i > 0; i-- {
_ = rnd.Float64()
}
return &importRand{rnd, pos}

r.Rand = rnd
r.pos = pos
}

func (r *importRand) advancePos() {
r.pos++
func (r *importRand) maybeReseed(c *cellInfoAnnotation) {
// newRowPos is the position of the first random function invocation of the
// row we're currently processing. If this is more than c.randInstancePerRow
// away, that means that we've skipped a batch of rows. importRand assumes
// that it operates on a contiguous set of rows when it increments its
// position, so if we skip some rows we need to reseed.
// We may skip rows because a single row converter may be responsible for
// converting several non-contiguous batches of KVs.
newRowPos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow)
rowsSkipped := newRowPos.distance(r.pos) > int64(c.randInstancePerRow)
if rowsSkipped {
// Reseed at the new position, since our internally tracked r.pos is now out
// of sync.
r.reseed(newRowPos)
}
if r.pos%reseedRandEveryN == 0 {
// Time to reseed.
r.Rand = rand.New(rand.NewSource(r.pos))
r.reseed(r.pos)
}
}

func (r *importRand) Float64() float64 {
// Float64 implements the randomSource interface.
func (r *importRand) Float64(c *cellInfoAnnotation) float64 {
r.maybeReseed(c)
randNum := r.Rand.Float64()
r.advancePos()
r.pos++
return randNum
}

func (r *importRand) Int63() int64 {
// Int63 implements the randomSource interface.
func (r *importRand) Int63(c *cellInfoAnnotation) int64 {
r.maybeReseed(c)
randNum := r.Rand.Int63()
r.advancePos()
r.pos++
return randNum
}

func getSeedForImportRand(rowID int64, sourceID int32, numInstances int) int64 {
// We expect r.pos to increment by numInstances for each row.
// Therefore, assuming that rowID increments by 1 for every row,
// we will initialize the position as rowID * numInstances + sourceID << rowIDBits.
rowIDWithMultiplier := int64(numInstances) * rowID
return (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
}

// For some functions (specifically the volatile ones), we do
// not want to use the provided builtin. Instead, we opt for
// our own function definition, which produces deterministic results.
Expand Down Expand Up @@ -102,8 +147,11 @@ type cellInfoAnnotation struct {
rowID int64
uniqueRowIDInstance int
uniqueRowIDTotal int
randSource *importRand
randInstancePerRow int

// Annotations for rand() and gen_random_uuid().
// randSource should not be used directly, but through getImportRand() instead.
randSource randomSource
randInstancePerRow int
}

func getCellInfoAnnotation(t *tree.Annotations) *cellInfoAnnotation {
Expand All @@ -116,6 +164,13 @@ func (c *cellInfoAnnotation) Reset(sourceID int32, rowID int64) {
c.uniqueRowIDInstance = 0
}

// makeImportRand returns a random source seeded for the row that the
// annotation currently points at; the seed position is derived from the
// (sourceID, rowID) pair so that separate sources get disjoint streams.
func makeImportRand(c *cellInfoAnnotation) randomSource {
	r := &importRand{}
	r.reseed(getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow))
	return r
}

// We don't want to call unique_rowid() for columns with such default expressions
// because it is not idempotent and has unfortunate overlapping of output
// spans since it puts the uniqueness-ensuring per-generator part (nodeID)
Expand Down Expand Up @@ -164,19 +219,17 @@ func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum,
func importRandom(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
if c.randSource == nil {
c.randSource = newImportRand(getSeedForImportRand(
c.rowID, c.sourceID, c.randInstancePerRow))
c.randSource = makeImportRand(c)
}
return tree.NewDFloat(tree.DFloat(c.randSource.Float64())), nil
return tree.NewDFloat(tree.DFloat(c.randSource.Float64(c))), nil
}

func importGenUUID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
if c.randSource == nil {
c.randSource = newImportRand(getSeedForImportRand(
c.rowID, c.sourceID, c.randInstancePerRow))
c.randSource = makeImportRand(c)
}
gen := c.randSource.Int63()
gen := c.randSource.Int63(c)
id := uuid.MakeV4()
id.DeterministicV4(uint64(gen), uint64(1<<63))
return tree.NewDUuid(tree.DUuid{UUID: id}), nil
Expand Down

0 comments on commit fc5e756

Please sign in to comment.