Skip to content

Commit

Permalink
importccl: avoid random_number collisions between KV batches
Browse files Browse the repository at this point in the history
An import may parallelize the work to convert SQL rows into KVs. During
this phase, default expressions are evaluated. Previously, IMPORT's
implementations of the random number generators that are evaluated in
default expressions assumed that all of the given row IDs were contiguous.
This is not the case, since a single row converter may be responsible for
converting several non-contiguous batches of rows. This resulted in random
values colliding between different batches of KV space.

This commit fixes this bug by feeding in the current position and
resetting the random source backing these methods. This ensures that
import treats a new contiguous batch of rows separately.

Release justification: bug fix
Release note (bug fix): Fix a bug where random numbers generated as
default expressions during IMPORT would collide a few hundred rows apart
from each other.
  • Loading branch information
pbardea committed Mar 12, 2021
1 parent b4caa1a commit 8a311c8
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 33 deletions.
54 changes: 49 additions & 5 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3648,7 +3648,7 @@ func TestImportDefault(t *testing.T) {

}
})
t.Run("random-related", func(t *testing.T) {
t.Run("random-functions", func(t *testing.T) {
testCases := []struct {
name string
create string
Expand All @@ -3658,19 +3658,19 @@ func TestImportDefault(t *testing.T) {
}{
{
name: "random-multiple",
create: "a INT, b FLOAT DEFAULT random(), c STRING, d FLOAT DEFAULT random()",
create: "a INT, b FLOAT PRIMARY KEY DEFAULT random(), c STRING, d FLOAT DEFAULT random()",
targetCols: []string{"a", "c"},
randomCols: []string{selectNotNull("b"), selectNotNull("d")},
},
{
name: "gen_random_uuid",
create: "a INT, b STRING, c UUID DEFAULT gen_random_uuid()",
create: "a INT, b STRING, c UUID PRIMARY KEY DEFAULT gen_random_uuid(), d UUID DEFAULT gen_random_uuid()",
targetCols: []string{"a", "b"},
randomCols: []string{selectNotNull("c")},
randomCols: []string{selectNotNull("c"), selectNotNull("d")},
},
{
name: "mixed_random_uuid",
create: "a INT, b STRING, c UUID DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()",
create: "a INT, b STRING, c UUID PRIMARY KEY DEFAULT gen_random_uuid(), d FLOAT DEFAULT random()",
targetCols: []string{"a", "b"},
randomCols: []string{selectNotNull("c")},
},
Expand Down Expand Up @@ -3713,6 +3713,50 @@ func TestImportDefault(t *testing.T) {
})
}

// TestUniqueUUID is a regression test for #61203. It imports a larger data set
// into tables whose primary keys default to gen_random_uuid() or random(), and
// relies on those imports succeeding to show that the generated keys are
// unique. This would previously fail with a primary key collision error since
// duplicate UUIDs were generated.
//
// Note that although there is no guarantee that UUIDs do not collide, the
// probability of such a collision is vanishingly low.
func TestUniqueUUID(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// This test explicitly imports a large amount of data, which makes it slow
	// under race.
	skip.UnderRace(t, "slow under race")

	const (
		nodes     = 3
		dataDir   = "userfile://defaultdb.my_files/export"
		dataFiles = dataDir + "/*"
	)
	ctx := context.Background()
	serverArgs := base.TestServerArgs{}
	cluster := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: serverArgs})
	defer cluster.Stopper().Stop(ctx)
	runner := sqlutils.MakeSQLRunner(cluster.Conns[0])

	// Generate the input rows and export them so they can be imported below.
	numRows := parallelImporterReaderBatchSize * 100
	runner.Exec(t, fmt.Sprintf(`CREATE TABLE data AS SELECT * FROM generate_series(1, %d);`, numRows))
	runner.Exec(t, `EXPORT INTO CSV $1 FROM TABLE data;`, dataDir)

	// Each case creates a table keyed on a randomly generated default and then
	// imports every exported row into it; a key collision fails the IMPORT.
	cases := []struct {
		createStmt string
		importStmt string
	}{
		{
			// UUID primary keys must not collide.
			createStmt: `CREATE TABLE r1 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT);`,
			importStmt: `IMPORT INTO r1 (b) CSV DATA ($1);`,
		},
		{
			// UUID primary keys must not collide when the table has several
			// gen_random_uuid() calls per row.
			createStmt: `CREATE TABLE r2 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT, c UUID DEFAULT gen_random_uuid());`,
			importStmt: `IMPORT INTO r2 (b) CSV DATA ($1);`,
		},
		{
			// random() primary keys must not collide.
			createStmt: `CREATE TABLE r3 (a FLOAT PRIMARY KEY DEFAULT random(), b INT);`,
			importStmt: `IMPORT INTO r3 (b) CSV DATA ($1);`,
		},
	}
	for _, tc := range cases {
		runner.Exec(t, tc.createStmt)
		runner.Exec(t, tc.importStmt, dataFiles)
	}
}

func TestImportComputed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
109 changes: 81 additions & 28 deletions pkg/sql/row/expr_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,93 @@ import (
// tune this parameter.
const reseedRandEveryN = 1000

// importRandPosition uniquely identifies one invocation of a random function
// during an import.
type importRandPosition int64

// distance returns the absolute difference between pos and o.
//
// The result is never negative. If the true distance does not fit in an int64
// (the two positions are far apart and of opposite sign), the result saturates
// at the maximum int64 value instead of wrapping around to a negative number.
func (pos importRandPosition) distance(o importRandPosition) int64 {
	const maxInt64 = 1<<63 - 1

	hi, lo := int64(pos), int64(o)
	if hi < lo {
		hi, lo = lo, hi
	}
	// Subtracting in uint64 yields the exact gap even when a signed
	// subtraction of hi and lo would overflow.
	gap := uint64(hi) - uint64(lo)
	if gap > maxInt64 {
		return maxInt64
	}
	return int64(gap)
}

// getPosForRandImport computes the importRandPosition of the first call to a
// random function made while generating row rowID from source sourceID.
// numInstances is the number of random function invocations per row.
func getPosForRandImport(rowID int64, sourceID int32, numInstances int) importRandPosition {
	// r.pos is expected to advance by numInstances for each row, so, assuming
	// rowID increases by 1 per row, the row contributes rowID * numInstances.
	rowPart := rowID * int64(numInstances)
	// The source ID is shifted left by rowIDBits and combined with the row part
	// using XOR.
	sourcePart := int64(sourceID) << rowIDBits
	return importRandPosition(sourcePart ^ rowPart)
}

// randomSource is only exposed through an interface to ensure that callers
// don't access the underlying fields of the implementation directly.
type randomSource interface {
// Float64 returns, as a float64, a pseudo-random number in [0.0,1.0).
// c is the cell info annotation supplied by the caller.
Float64(c *cellInfoAnnotation) float64
// Int63 returns a non-negative pseudo-random 63-bit integer as an int64.
// c is the cell info annotation supplied by the caller.
Int63(c *cellInfoAnnotation) int64
}

var _ randomSource = (*importRand)(nil)

type importRand struct {
*rand.Rand
pos int64
pos importRandPosition
}

func newImportRand(pos int64) *importRand {
func (r *importRand) reseed(pos importRandPosition) {
adjPos := (pos / reseedRandEveryN) * reseedRandEveryN
rnd := rand.New(rand.NewSource(adjPos))
rnd := rand.New(rand.NewSource(int64(adjPos)))
for i := int(pos % reseedRandEveryN); i > 0; i-- {
_ = rnd.Float64()
}
return &importRand{rnd, pos}

r.Rand = rnd
r.pos = pos
}

func (r *importRand) advancePos() {
r.pos++
func (r *importRand) maybeReseed(c *cellInfoAnnotation) {
// newRowPos is the position of the first random function invocation of the
// row we're currently processing. If this is more than c.randInstancePerRow
// away, that means that we've skipped a batch of rows. importRand assumes
// that it operates on a contiguous set of rows when it increments its
// position, so if we skip some rows we need to reseed.
// We may skip rows because a single row converter may be responsible for
// converting several non-contiguous batches of KVs.
newRowPos := getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow)
rowsSkipped := newRowPos.distance(r.pos) > int64(c.randInstancePerRow)
if rowsSkipped {
// Reseed at the new position, since our internally tracked r.pos is now out
// of sync.
r.reseed(newRowPos)
}
if r.pos%reseedRandEveryN == 0 {
// Time to reseed.
r.Rand = rand.New(rand.NewSource(r.pos))
r.reseed(r.pos)
}
}

func (r *importRand) Float64() float64 {
// Float64 implements the randomSource interface.
func (r *importRand) Float64(c *cellInfoAnnotation) float64 {
r.maybeReseed(c)
randNum := r.Rand.Float64()
r.advancePos()
r.pos++
return randNum
}

func (r *importRand) Int63() int64 {
// Int63 implements the randomSource interface.
func (r *importRand) Int63(c *cellInfoAnnotation) int64 {
r.maybeReseed(c)
randNum := r.Rand.Int63()
r.advancePos()
r.pos++
return randNum
}

// getSeedForImportRand derives the starting position for an importRand from a
// row ID, a source ID, and the number of random function invocations per row.
func getSeedForImportRand(rowID int64, sourceID int32, numInstances int) int64 {
	// r.pos is expected to grow by numInstances per row, so, with rowID growing
	// by 1 per row, the row contributes rowID * numInstances.
	scaledRowID := rowID * int64(numInstances)
	// The source ID is shifted left by rowIDBits and XORed with the row part.
	sourceBits := int64(sourceID) << rowIDBits
	return sourceBits ^ scaledRowID
}

// For some functions (specifically the volatile ones), we do
// not want to use the provided builtin. Instead, we opt for
// our own function definition, which produces deterministic results.
Expand Down Expand Up @@ -102,8 +147,11 @@ type cellInfoAnnotation struct {
rowID int64
uniqueRowIDInstance int
uniqueRowIDTotal int
randSource *importRand
randInstancePerRow int

// Annotations for rand() and gen_random_uuid().
// randSource should not be used directly, but through getImportRand() instead.
randSource randomSource
randInstancePerRow int
}

func getCellInfoAnnotation(t *tree.Annotations) *cellInfoAnnotation {
Expand All @@ -116,6 +164,13 @@ func (c *cellInfoAnnotation) Reset(sourceID int32, rowID int64) {
c.uniqueRowIDInstance = 0
}

// makeImportRand builds a randomSource backed by an importRand that has been
// reseeded at the position derived from the row ID, source ID, and per-row
// random instance count currently recorded in c.
func makeImportRand(c *cellInfoAnnotation) randomSource {
	src := new(importRand)
	src.reseed(getPosForRandImport(c.rowID, c.sourceID, c.randInstancePerRow))
	return src
}

// We don't want to call unique_rowid() for columns with such default expressions
// because it is not idempotent and has unfortunate overlapping of output
// spans since it puts the uniqueness-ensuring per-generator part (nodeID)
Expand Down Expand Up @@ -164,19 +219,17 @@ func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum,
func importRandom(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
if c.randSource == nil {
c.randSource = newImportRand(getSeedForImportRand(
c.rowID, c.sourceID, c.randInstancePerRow))
c.randSource = makeImportRand(c)
}
return tree.NewDFloat(tree.DFloat(c.randSource.Float64())), nil
return tree.NewDFloat(tree.DFloat(c.randSource.Float64(c))), nil
}

func importGenUUID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
if c.randSource == nil {
c.randSource = newImportRand(getSeedForImportRand(
c.rowID, c.sourceID, c.randInstancePerRow))
c.randSource = makeImportRand(c)
}
gen := c.randSource.Int63()
gen := c.randSource.Int63(c)
id := uuid.MakeV4()
id.DeterministicV4(uint64(gen), uint64(1<<63))
return tree.NewDUuid(tree.DUuid{UUID: id}), nil
Expand Down

0 comments on commit 8a311c8

Please sign in to comment.