Skip to content

Commit

Permalink
importccl: avoid random_number collisions between KV batches
Browse files Browse the repository at this point in the history
An import may parallelize the work to convert SQL rows into KVs. During
this phase, default expressions are evaluated. Previously, IMPORT's
implementations of random number generators that are evaluated in default
expressions assumed that all of the given row IDs were contiguous. This
is not the case, since a single row converter may be responsible for
converting several non-contiguous batches of rows. This resulted in
random_values colliding between different batches of KV space.

This commit fixes this bug by feeding in the current position and
resetting the random source backing these methods. This ensures that
import treats a new contiguous batch of rows separately.

Release justification: bug fix
Release note (bug fix): Fix a bug where random numbers generated as
default expressions during IMPORT would collide a few hundred rows apart
from each other.
  • Loading branch information
pbardea committed Feb 27, 2021
1 parent 4d24f8a commit 153062e
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 19 deletions.
37 changes: 37 additions & 0 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltestutils"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
Expand Down Expand Up @@ -6733,3 +6734,39 @@ func TestDetachedImport(t *testing.T) {
sqlDB.QueryRow(t, importIntoQueryDetached, simpleOcf).Scan(&jobID)
waitForJobResult(t, tc, jobID, jobs.StatusFailed)
}

// TestUniqueUUID is a regression test for #61203. Before the fix, these
// imports failed with a primary key collision error because duplicate values
// were generated for the defaulted key column.
//
// Note that although there is no guarantee that UUIDs never collide, the
// probability of such a collision is vanishingly low.
func TestUniqueUUID(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	defer sqltestutils.SetTestJobsAdoptInterval()()

	const nodes = 3

	ctx := context.Background()
	tc := testcluster.StartTestCluster(
		t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{}},
	)
	defer tc.Stopper().Stop(ctx)
	sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])

	// Stage the rows of generate_series(1, 20000) as CSV files that every
	// import below reads back.
	sqlDB.Exec(t, `CREATE TABLE data AS SELECT * FROM generate_series(1, 20000);`)
	sqlDB.Exec(t, `EXPORT INTO CSV 'userfile://defaultdb.my_files/export' FROM TABLE data;`)

	// Each case creates a table whose primary key is filled in by a random
	// default expression and then imports the staged rows into it. Exec fails
	// the test if the import reports an error.
	cases := []struct {
		createStmt string
		importStmt string
	}{
		{
			// UUIDs must not collide when importing 20000 rows.
			createStmt: `CREATE TABLE r1 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT);`,
			importStmt: `IMPORT INTO r1 (b) CSV DATA ('userfile://defaultdb.my_files/export/*');`,
		},
		{
			// UUIDs must not collide when a row makes several UUID calls.
			createStmt: `CREATE TABLE r2 (a UUID PRIMARY KEY DEFAULT gen_random_uuid(), b INT, c UUID DEFAULT gen_random_uuid());`,
			importStmt: `IMPORT INTO r2 (b) CSV DATA ('userfile://defaultdb.my_files/export/*');`,
		},
		{
			// Random float keys must not collide.
			createStmt: `CREATE TABLE r3 (a FLOAT PRIMARY KEY DEFAULT random(), b INT);`,
			importStmt: `IMPORT INTO r3 (b) CSV DATA ('userfile://defaultdb.my_files/export/*');`,
		},
	}
	for _, c := range cases {
		sqlDB.Exec(t, c.createStmt)
		sqlDB.Exec(t, c.importStmt)
	}
}
76 changes: 57 additions & 19 deletions pkg/sql/row/expr_walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
// between the frequency of re-seeding and the number of calls to
// Float64() needed upon every resume. Therefore it will be useful to
// tune this parameter.
const reseedRandEveryN = 1000
const reseedRandEveryN = 11

// chunkSizeIncrementRate is the factor by which the size of the chunk of
// sequence values we allocate during an import increases.
Expand All @@ -47,44 +47,64 @@ const maxChunkSize = 100000

// importRand embeds a *rand.Rand and carries pos, the position counter that
// advancePos increments and from which the generator's seed is derived.
type importRand struct {
*rand.Rand
pos int64
pos importRandPosition
}

// Position a generator at pos: seed it from pos truncated to a multiple of
// reseedRandEveryN, then replay the remaining pos % reseedRandEveryN draws.
func newImportRand(pos int64) *importRand {
func (r *importRand) reseed(pos importRandPosition) {
// adjPos is the nearest multiple of reseedRandEveryN reached by integer
// division; it is the value handed to rand.NewSource.
adjPos := (pos / reseedRandEveryN) * reseedRandEveryN
rnd := rand.New(rand.NewSource(adjPos))
rnd := rand.New(rand.NewSource(int64(adjPos)))
// Discard one Float64 per position between adjPos and pos so the stream
// resumes where a generator seeded at adjPos would be after that many draws.
for i := int(pos % reseedRandEveryN); i > 0; i-- {
_ = rnd.Float64()
}
return &importRand{rnd, pos}

// Install the repositioned generator and remember the position it is at.
r.Rand = rnd
r.pos = pos
}

// advancePos increments the position counter and, whenever the new position
// is a multiple of reseedRandEveryN, swaps in a generator seeded for it.
func (r *importRand) advancePos() {
r.pos++
if r.pos%reseedRandEveryN == 0 {
// Time to reseed.
r.Rand = rand.New(rand.NewSource(r.pos))
r.reseed(r.pos)
}
}

// randomSource is the narrow view of a random generator handed to callers.
// It is exposed as an interface so that callers cannot reach the fields of
// the underlying implementation.
type randomSource interface {
	// Int63 returns a non-negative pseudo-random 63-bit integer as an int64.
	Int63() int64
	// Float64 returns, as a float64, a pseudo-random number in [0.0,1.0).
	Float64() float64
}

// Float64 draws the next float64 from the embedded generator and then
// records the draw through advancePos. It is part of the randomSource
// interface.
func (r *importRand) Float64() float64 {
	next := r.Rand.Float64()
	r.advancePos()
	return next
}

// Int63 draws the next int64 from the embedded generator and then records
// the draw through advancePos. It is part of the randomSource interface.
func (r *importRand) Int63() int64 {
	next := r.Rand.Int63()
	r.advancePos()
	return next
}

func getSeedForImportRand(rowID int64, sourceID int32, numInstances int) int64 {
// importRandPosition uniquely identifies an instance of a call to a random
// function during an import.
type importRandPosition int64

// getSeedForImportRand gives the importRandPosition for the first instance of a
// call to a random function when generating a given row from a given source.
// numInstances refers to the number of random function invocations per row.
func getSeedForImportRand(rowID int64, sourceID int32, numInstances int) importRandPosition {
// We expect r.pos to increment by numInstances for each row.
// Therefore, assuming that rowID increments by 1 for every row,
// we will initialize the position as
// (sourceID << rowIDBits) ^ (rowID * numInstances): the source ID is
// shifted left by rowIDBits and the scaled row ID is XORed into the result.
rowIDWithMultiplier := int64(numInstances) * rowID
return (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
pos := (int64(sourceID) << rowIDBits) ^ rowIDWithMultiplier
return importRandPosition(pos)
}

// For some functions (specifically the volatile ones), we do
Expand Down Expand Up @@ -135,8 +155,14 @@ type CellInfoAnnotation struct {
uniqueRowIDTotal int

// Annotations for rand() and gen_random_uuid().
// randSource should not be used directly, but through getImportRand() instead.
randSource *importRand
randInstancePerRow int
// newBatch keeps track if we're still operating in the same contiguous batch
// of keys. This is important to note since the randSource assumes that calls
// to random functions operate on contiguous rows, and should be reseeded when
// that's not the case.
newBatch bool

// Annotations for next_val().
seqNameToMetadata map[string]*SequenceMetadata
Expand All @@ -149,11 +175,29 @@ func getCellInfoAnnotation(t *tree.Annotations) *CellInfoAnnotation {
}

// reset points the annotation at the given source and row, zeroes
// uniqueRowIDInstance, and records in newBatch whether this row begins a new
// batch relative to the previously recorded source and row.
func (c *CellInfoAnnotation) reset(sourceID int32, rowID int64) {
	// A row continues the current batch only when it comes from the same
	// source and immediately follows the previously seen row. If the source
	// has changed, or the row is not the one right after the last one, we are
	// processing a new batch.
	continuesBatch := c.sourceID == sourceID && c.rowID+1 == rowID
	c.newBatch = !continuesBatch

	c.sourceID, c.rowID = sourceID, rowID
	c.uniqueRowIDInstance = 0
}

// getImportRand returns the random source to use for the current row,
// creating it on first use and repositioning it whenever a new batch of rows
// has started.
//
// The source is reseeded at most once per call: a missing source is treated
// as the start of a new batch, so the first use goes through the same single
// reseed as any other batch boundary. The position is only computed when a
// reseed is actually required.
func (c *CellInfoAnnotation) getImportRand() randomSource {
	if c.randSource == nil {
		c.randSource = &importRand{}
		// A freshly allocated source has no generator yet; force the reseed
		// below regardless of what reset recorded.
		c.newBatch = true
	}
	if c.newBatch {
		// The generator assumes calls come from contiguous rows, so restart
		// it at the position of the first random call of the current row.
		c.randSource.reseed(
			getSeedForImportRand(c.rowID, c.sourceID, c.randInstancePerRow),
		)
		c.newBatch = false
	}
	return c.randSource
}

// We don't want to call unique_rowid() for columns with such default expressions
// because it is not idempotent and has unfortunate overlapping of output
// spans since it puts the uniqueness-ensuring per-generator part (nodeID)
Expand Down Expand Up @@ -201,20 +245,14 @@ func importUniqueRowID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum,

// importRandom returns a DFloat holding one Float64 draw from the random
// source attached to the cell annotation found in evalCtx.Annotations.
// args is not read, and the returned error is always nil.
func importRandom(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
if c.randSource == nil {
c.randSource = newImportRand(getSeedForImportRand(
c.rowID, c.sourceID, c.randInstancePerRow))
}
return tree.NewDFloat(tree.DFloat(c.randSource.Float64())), nil
// Go through getImportRand rather than the randSource field.
importRand := c.getImportRand()
return tree.NewDFloat(tree.DFloat(importRand.Float64())), nil
}

func importGenUUID(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
c := getCellInfoAnnotation(evalCtx.Annotations)
if c.randSource == nil {
c.randSource = newImportRand(getSeedForImportRand(
c.rowID, c.sourceID, c.randInstancePerRow))
}
gen := c.randSource.Int63()
importRand := c.getImportRand()
gen := importRand.Int63()
id := uuid.MakeV4()
id.DeterministicV4(uint64(gen), uint64(1<<63))
return tree.NewDUuid(tree.DUuid{UUID: id}), nil
Expand Down

0 comments on commit 153062e

Please sign in to comment.