Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
93302: colflow: fix a possible test flake as well as misc test cleanup r=yuzefovich a=yuzefovich

This commit fixes a possible test flake where some tests that use `RandomDataOp` with `Selection=true` option could take really long time. This could occur if the selection probability is selected randomly to be tiny which results in zero-length batches continuously being generated. We now enforce 0.01 lower bound on that probability.

Additionally, this commit does the following cleanup:
- it fixes the formatting of a comment in a test which was messed up by the updated Go formatter with the upgrade to 1.19 version.
- it unexports `RandomDataOp` and returns the type schema directly from the constructor (the only reason it was previously exported).

Fixes: cockroachdb#92876.

Release note: None

93303: sql: avoid an allocation in SetIndexRecommendations r=yuzefovich a=yuzefovich

This commit fixes the code in `SetIndexRecommendations` to not create a copy of `optPlanningCtx` - we can just use the one that we have on the `planner` directly by taking its pointer. This is what we do in all other places.
```
name                           old time/op    new time/op    delta
Select1/Cockroach-24              158µs ± 5%     159µs ± 5%    ~     (p=0.631 n=10+10)
Select1/MultinodeCockroach-24     164µs ± 2%     165µs ± 2%    ~     (p=0.842 n=9+10)

name                           old alloc/op   new alloc/op   delta
Select1/Cockroach-24             23.0kB ± 1%    22.3kB ± 1%  -3.41%  (p=0.000 n=10+10)
Select1/MultinodeCockroach-24    22.5kB ± 2%    21.7kB ± 2%  -3.29%  (p=0.000 n=10+9)

name                           old allocs/op  new allocs/op  delta
Select1/Cockroach-24                208 ± 2%       205 ± 1%  -1.11%  (p=0.006 n=10+10)
Select1/MultinodeCockroach-24       184 ± 0%       182 ± 0%  -1.14%  (p=0.000 n=9+8)
```

Epic: None

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Dec 9, 2022
3 parents 4a641ac + 369e2bb + 02bea87 commit 62cfe1a
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 64 deletions.
38 changes: 20 additions & 18 deletions pkg/col/coldatatestutils/random_testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,12 @@ const (
defaultNumBatches = 4
)

// RandomDataOpArgs are arguments passed in to RandomDataOp. All arguments are
// RandomDataOpArgs are arguments passed in to randomDataOp. All arguments are
// optional (refer to the constants above this struct definition for the
// defaults). Bools are false by default.
type RandomDataOpArgs struct {
// DeterministicTyps, if set, overrides MaxSchemaLength and disables type
// randomization, forcing the RandomDataOp to use this schema.
// randomization, forcing the randomDataOp to use this schema.
DeterministicTyps []*types.T
// MaxSchemaLength is the maximum length of the operator's schema, which will
// be at least one type.
Expand All @@ -322,9 +322,9 @@ type RandomDataOpArgs struct {
BatchAccumulator func(ctx context.Context, b coldata.Batch, typs []*types.T)
}

// RandomDataOp is an operator that generates random data according to
// randomDataOp is an operator that generates random data according to
// RandomDataOpArgs. Call GetBuffer to get all data that was returned.
type RandomDataOp struct {
type randomDataOp struct {
ctx context.Context
allocator *colmem.Allocator
batchAccumulator func(ctx context.Context, b coldata.Batch, typs []*types.T)
Expand All @@ -337,12 +337,12 @@ type RandomDataOp struct {
nulls bool
}

var _ colexecop.Operator = &RandomDataOp{}
var _ colexecop.Operator = &randomDataOp{}

// NewRandomDataOp creates a new RandomDataOp.
// NewRandomDataOp creates a new randomDataOp.
func NewRandomDataOp(
allocator *colmem.Allocator, rng *rand.Rand, args RandomDataOpArgs,
) *RandomDataOp {
) (colexecop.Operator, []*types.T) {
var (
maxSchemaLength = defaultMaxSchemaLength
batchSize = coldata.BatchSize()
Expand All @@ -366,7 +366,7 @@ func NewRandomDataOp(
typs[i] = randgen.RandType(rng)
}
}
return &RandomDataOp{
return &randomDataOp{
allocator: allocator,
batchAccumulator: args.BatchAccumulator,
typs: typs,
Expand All @@ -375,16 +375,16 @@ func NewRandomDataOp(
numBatches: numBatches,
selection: args.Selection,
nulls: args.Nulls,
}
}, typs
}

// Init is part of the colexecop.Operator interface.
func (o *RandomDataOp) Init(ctx context.Context) {
func (o *randomDataOp) Init(ctx context.Context) {
o.ctx = ctx
}

// Next is part of the colexecop.Operator interface.
func (o *RandomDataOp) Next() coldata.Batch {
func (o *randomDataOp) Next() coldata.Batch {
if o.numReturned == o.numBatches {
// Done.
b := coldata.ZeroBatch
Expand All @@ -400,6 +400,13 @@ func (o *RandomDataOp) Next() coldata.Batch {
)
if o.selection {
selProbability = o.rng.Float64()
// Ensure a reasonable lower bound on the probability of selecting a
// tuple into the batch. If we don't do this, it might be possible for
// us to spin for very long time in the loop below before we get a
// non-zero length batch if this probability is tiny.
if selProbability < 0.01 {
selProbability = 0.01
}
}
if o.nulls && o.rng.Float64() > 0.1 {
// Even if nulls are desired, in 10% of cases create a batch with no
Expand All @@ -421,18 +428,13 @@ func (o *RandomDataOp) Next() coldata.Batch {
}

// ChildCount implements the execopnode.OpNode interface.
func (o *RandomDataOp) ChildCount(verbose bool) int {
func (o *randomDataOp) ChildCount(verbose bool) int {
return 0
}

// Child implements the execopnode.OpNode interface.
func (o *RandomDataOp) Child(nth int, verbose bool) execopnode.OpNode {
func (o *randomDataOp) Child(nth int, verbose bool) execopnode.OpNode {
colexecerror.InternalError(errors.AssertionFailedf("invalid index %d", nth))
// This code is unreachable, but the compiler cannot infer that.
return nil
}

// Typs returns the output types of the RandomDataOp.
func (o *RandomDataOp) Typs() []*types.T {
return o.typs
}
3 changes: 1 addition & 2 deletions pkg/sql/colcontainer/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestDiskQueue(t *testing.T) {
prefix, diskQueueCacheMode, alwaysCompress, suffix, numBatches), func(t *testing.T) {
// Create random input.
batches := make([]coldata.Batch, 0, numBatches)
op := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{
op, typs := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{
NumBatches: cap(batches),
BatchSize: 1 + rng.Intn(coldata.BatchSize()),
Nulls: true,
Expand All @@ -77,7 +77,6 @@ func TestDiskQueue(t *testing.T) {
},
})
op.Init(ctx)
typs := op.Typs()

queueCfg.SetCacheMode(diskQueueCacheMode)
if !rewindable {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecutils/spilling_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestSpillingBuffer(t *testing.T) {
// window into them.
var tuples *AppendOnlyBufferedBatch
// Create random input.
op := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{
op, _ := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{
NumBatches: numBatches,
BatchSize: inputBatchSize,
Nulls: true,
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/colexec/colexecutils/spilling_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestSpillingQueue(t *testing.T) {
// tuples and will be comparing against a window into them.
var tuples *AppendOnlyBufferedBatch
// Create random input.
op := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{
op, typs := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{
NumBatches: numBatches,
BatchSize: inputBatchSize,
Nulls: true,
Expand All @@ -104,7 +104,6 @@ func TestSpillingQueue(t *testing.T) {
},
})
op.Init(ctx)
typs := op.Typs()

queueCfg.SetCacheMode(diskQueueCacheMode)
queueCfg.TestingKnobs.AlwaysCompress = alwaysCompress
Expand Down Expand Up @@ -261,7 +260,7 @@ func TestSpillingQueueDidntSpill(t *testing.T) {

rng, _ := randutil.NewTestRand()
numBatches := int(spillingQueueInitialItemsLen)*(1+rng.Intn(4)) + rng.Intn(int(spillingQueueInitialItemsLen))
op := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{
op, typs := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{
// TODO(yuzefovich): for some types (e.g. types.MakeArray(types.Int))
// the memory estimation diverges from 0 after Enqueue() / Dequeue()
// sequence. Figure it out.
Expand All @@ -272,7 +271,6 @@ func TestSpillingQueueDidntSpill(t *testing.T) {
})
op.Init(ctx)

typs := op.Typs()
// Choose a memory limit such that at most two batches can be kept in the
// in-memory buffer at a time (single batch is not enough because the queue
// delays the release of the memory by one batch).
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colflow/colrpc/colrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestOutboxInbox(t *testing.T) {

inputMemAcc := testMemMonitor.MakeBoundAccount()
defer inputMemAcc.Close(outboxCtx)
input := coldatatestutils.NewRandomDataOp(
input, _ := coldatatestutils.NewRandomDataOp(
colmem.NewAllocator(outboxCtx, &inputMemAcc, coldata.StandardColumnFactory), rng, args,
)
outboxMemAcc := testMemMonitor.MakeBoundAccount()
Expand Down Expand Up @@ -682,7 +682,7 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {
serverStreamNotification = <-mockServer.InboundStreams
serverStream = serverStreamNotification.Stream
typs = []*types.T{types.Int}
input = coldatatestutils.NewRandomDataOp(
input, _ = coldatatestutils.NewRandomDataOp(
testAllocator,
rng,
coldatatestutils.RandomDataOpArgs{
Expand Down
64 changes: 30 additions & 34 deletions pkg/sql/colflow/vectorized_flow_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,41 +66,37 @@ var (
// synchronizer which then outputs all the data into a materializer.
// The resulting scheme looks as follows:
//
// Remote Node | Local Node
// |
// -> output -> Outbox -> | -> Inbox -> |
// | |
//
// Hash Router -> output -> Outbox -> | -> Inbox -> |
//
// | |
// -> output -> Outbox -> | -> Inbox -> |
// | -> Synchronizer -> materializer -> FlowCoordinator
// Outbox -> | -> Inbox -> |
// |
// Outbox -> | -> Inbox -> |
// |
// Outbox -> | -> Inbox -> |
// | Remote Node | Local Node
// | |
// | -> output -> Outbox -> | -> Inbox -> |
// | | |
// | Hash Router -> output -> Outbox -> | -> Inbox -> |
// | | |
// | -> output -> Outbox -> | -> Inbox -> |
// | | -> Synchronizer -> materializer -> FlowCoordinator
// | Outbox -> | -> Inbox -> |
// | |
// | Outbox -> | -> Inbox -> |
// | |
// | Outbox -> | -> Inbox -> |
//
// Also, with 50% probability, another remote node with the chain of an Outbox
// and Inbox is placed between the synchronizer and materializer. The resulting
// scheme then looks as follows:
//
// Remote Node | Another Remote Node | Local Node
// | |
// -> output -> Outbox -> | -> Inbox -> |
// | | | |
//
// Hash Router -> output -> Outbox -> | -> Inbox -> |
//
// | | | |
// -> output -> Outbox -> | -> Inbox -> |
// | | -> Synchronizer -> Outbox -> | -> Inbox -> materializer -> FlowCoordinator
// Outbox -> | -> Inbox -> |
// | | |
// Outbox -> | -> Inbox -> |
// | | |
// Outbox -> | -> Inbox -> |
// | Remote Node | Another Remote Node | Local Node
// | | |
// | -> output -> Outbox -> | -> Inbox -> |
// | | | | |
// | Hash Router -> output -> Outbox -> | -> Inbox -> |
// | | | | |
// | -> output -> Outbox -> | -> Inbox -> |
// | | | -> Synchronizer -> Outbox -> | -> Inbox -> materializer -> FlowCoordinator
// | Outbox -> | -> Inbox -> |
// | | | |
// | Outbox -> | -> Inbox -> |
// | | | |
// | Outbox -> | -> Inbox -> |
//
// Remote nodes are simulated by having separate contexts and separate outbox
// registries.
Expand Down Expand Up @@ -148,10 +144,10 @@ func TestVectorizedFlowShutdown(t *testing.T) {
}
rng, _ := randutil.NewTestRand()
var (
err error
wg sync.WaitGroup
typs = []*types.T{types.Int}
hashRouterInput = coldatatestutils.NewRandomDataOp(
err error
wg sync.WaitGroup
typs = []*types.T{types.Int}
hashRouterInput, _ = coldatatestutils.NewRandomDataOp(
testAllocator,
rng,
coldatatestutils.RandomDataOpArgs{
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,9 +737,7 @@ func (m execNodeTraceMetadata) annotateExplain(
func (ih *instrumentationHelper) SetIndexRecommendations(
ctx context.Context, idxRec *idxrecommendations.IndexRecCache, planner *planner, isInternal bool,
) {
opc := planner.optPlanningCtx
opc.reset(ctx)
stmtType := opc.p.stmt.AST.StatementType()
stmtType := planner.stmt.AST.StatementType()

reset := false
var recommendations []indexrec.Rec
Expand All @@ -750,6 +748,8 @@ func (ih *instrumentationHelper) SetIndexRecommendations(
stmtType,
isInternal,
) {
opc := &planner.optPlanningCtx
opc.reset(ctx)
f := opc.optimizer.Factory()
evalCtx := opc.p.EvalContext()
f.Init(ctx, evalCtx, opc.catalog)
Expand Down

0 comments on commit 62cfe1a

Please sign in to comment.