diff --git a/pkg/col/coldatatestutils/random_testutils.go b/pkg/col/coldatatestutils/random_testutils.go index 144db1b79cf0..4fd05ff6d1f1 100644 --- a/pkg/col/coldatatestutils/random_testutils.go +++ b/pkg/col/coldatatestutils/random_testutils.go @@ -298,12 +298,12 @@ const ( defaultNumBatches = 4 ) -// RandomDataOpArgs are arguments passed in to RandomDataOp. All arguments are +// RandomDataOpArgs are arguments passed in to randomDataOp. All arguments are // optional (refer to the constants above this struct definition for the // defaults). Bools are false by default. type RandomDataOpArgs struct { // DeterministicTyps, if set, overrides MaxSchemaLength and disables type - // randomization, forcing the RandomDataOp to use this schema. + // randomization, forcing the randomDataOp to use this schema. DeterministicTyps []*types.T // MaxSchemaLength is the maximum length of the operator's schema, which will // be at least one type. @@ -322,9 +322,9 @@ type RandomDataOpArgs struct { BatchAccumulator func(ctx context.Context, b coldata.Batch, typs []*types.T) } -// RandomDataOp is an operator that generates random data according to +// randomDataOp is an operator that generates random data according to // RandomDataOpArgs. Call GetBuffer to get all data that was returned. -type RandomDataOp struct { +type randomDataOp struct { ctx context.Context allocator *colmem.Allocator batchAccumulator func(ctx context.Context, b coldata.Batch, typs []*types.T) @@ -337,12 +337,12 @@ type RandomDataOp struct { nulls bool } -var _ colexecop.Operator = &RandomDataOp{} +var _ colexecop.Operator = &randomDataOp{} -// NewRandomDataOp creates a new RandomDataOp. +// NewRandomDataOp creates a new randomDataOp. 
func NewRandomDataOp( allocator *colmem.Allocator, rng *rand.Rand, args RandomDataOpArgs, -) *RandomDataOp { +) (colexecop.Operator, []*types.T) { var ( maxSchemaLength = defaultMaxSchemaLength batchSize = coldata.BatchSize() @@ -366,7 +366,7 @@ func NewRandomDataOp( typs[i] = randgen.RandType(rng) } } - return &RandomDataOp{ + return &randomDataOp{ allocator: allocator, batchAccumulator: args.BatchAccumulator, typs: typs, @@ -375,16 +375,16 @@ func NewRandomDataOp( numBatches: numBatches, selection: args.Selection, nulls: args.Nulls, - } + }, typs } // Init is part of the colexecop.Operator interface. -func (o *RandomDataOp) Init(ctx context.Context) { +func (o *randomDataOp) Init(ctx context.Context) { o.ctx = ctx } // Next is part of the colexecop.Operator interface. -func (o *RandomDataOp) Next() coldata.Batch { +func (o *randomDataOp) Next() coldata.Batch { if o.numReturned == o.numBatches { // Done. b := coldata.ZeroBatch @@ -400,6 +400,13 @@ func (o *RandomDataOp) Next() coldata.Batch { ) if o.selection { selProbability = o.rng.Float64() + // Ensure a reasonable lower bound on the probability of selecting a + // tuple into the batch. If we don't do this, it might be possible for + // us to spin for a very long time in the loop below before we get a + // non-zero length batch if this probability is tiny. + if selProbability < 0.01 { + selProbability = 0.01 + } } if o.nulls && o.rng.Float64() > 0.1 { // Even if nulls are desired, in 10% of cases create a batch with no @@ -421,18 +428,13 @@ func (o *RandomDataOp) Next() coldata.Batch { } // ChildCount implements the execopnode.OpNode interface. -func (o *RandomDataOp) ChildCount(verbose bool) int { +func (o *randomDataOp) ChildCount(verbose bool) int { return 0 } // Child implements the execopnode.OpNode interface. 
-func (o *RandomDataOp) Child(nth int, verbose bool) execopnode.OpNode { +func (o *randomDataOp) Child(nth int, verbose bool) execopnode.OpNode { colexecerror.InternalError(errors.AssertionFailedf("invalid index %d", nth)) // This code is unreachable, but the compiler cannot infer that. return nil } - -// Typs returns the output types of the RandomDataOp. -func (o *RandomDataOp) Typs() []*types.T { - return o.typs -} diff --git a/pkg/sql/colcontainer/diskqueue_test.go b/pkg/sql/colcontainer/diskqueue_test.go index cb3002d60c40..3cd73e717272 100644 --- a/pkg/sql/colcontainer/diskqueue_test.go +++ b/pkg/sql/colcontainer/diskqueue_test.go @@ -68,7 +68,7 @@ func TestDiskQueue(t *testing.T) { prefix, diskQueueCacheMode, alwaysCompress, suffix, numBatches), func(t *testing.T) { // Create random input. batches := make([]coldata.Batch, 0, numBatches) - op := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{ + op, typs := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{ NumBatches: cap(batches), BatchSize: 1 + rng.Intn(coldata.BatchSize()), Nulls: true, @@ -77,7 +77,6 @@ func TestDiskQueue(t *testing.T) { }, }) op.Init(ctx) - typs := op.Typs() queueCfg.SetCacheMode(diskQueueCacheMode) if !rewindable { diff --git a/pkg/sql/colexec/colexecutils/spilling_buffer_test.go b/pkg/sql/colexec/colexecutils/spilling_buffer_test.go index d58aa7903890..a9fbd13fbcc5 100644 --- a/pkg/sql/colexec/colexecutils/spilling_buffer_test.go +++ b/pkg/sql/colexec/colexecutils/spilling_buffer_test.go @@ -90,7 +90,7 @@ func TestSpillingBuffer(t *testing.T) { // window into them. var tuples *AppendOnlyBufferedBatch // Create random input. 
- op := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{ + op, _ := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{ NumBatches: numBatches, BatchSize: inputBatchSize, Nulls: true, diff --git a/pkg/sql/colexec/colexecutils/spilling_queue_test.go b/pkg/sql/colexec/colexecutils/spilling_queue_test.go index 87e826c98676..8636d4a84c19 100644 --- a/pkg/sql/colexec/colexecutils/spilling_queue_test.go +++ b/pkg/sql/colexec/colexecutils/spilling_queue_test.go @@ -89,7 +89,7 @@ func TestSpillingQueue(t *testing.T) { // tuples and will be comparing against a window into them. var tuples *AppendOnlyBufferedBatch // Create random input. - op := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{ + op, typs := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{ NumBatches: numBatches, BatchSize: inputBatchSize, Nulls: true, @@ -104,7 +104,6 @@ func TestSpillingQueue(t *testing.T) { }, }) op.Init(ctx) - typs := op.Typs() queueCfg.SetCacheMode(diskQueueCacheMode) queueCfg.TestingKnobs.AlwaysCompress = alwaysCompress @@ -261,7 +260,7 @@ func TestSpillingQueueDidntSpill(t *testing.T) { rng, _ := randutil.NewTestRand() numBatches := int(spillingQueueInitialItemsLen)*(1+rng.Intn(4)) + rng.Intn(int(spillingQueueInitialItemsLen)) - op := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{ + op, typs := coldatatestutils.NewRandomDataOp(testAllocator, rng, coldatatestutils.RandomDataOpArgs{ // TODO(yuzefovich): for some types (e.g. types.MakeArray(types.Int)) // the memory estimation diverges from 0 after Enqueue() / Dequeue() // sequence. Figure it out. 
@@ -272,7 +271,6 @@ func TestSpillingQueueDidntSpill(t *testing.T) { }) op.Init(ctx) - typs := op.Typs() // Choose a memory limit such that at most two batches can be kept in the // in-memory buffer at a time (single batch is not enough because the queue // delays the release of the memory by one batch). diff --git a/pkg/sql/colflow/colrpc/colrpc_test.go b/pkg/sql/colflow/colrpc/colrpc_test.go index 336dc3cf4e00..445490b56e9e 100644 --- a/pkg/sql/colflow/colrpc/colrpc_test.go +++ b/pkg/sql/colflow/colrpc/colrpc_test.go @@ -270,7 +270,7 @@ func TestOutboxInbox(t *testing.T) { inputMemAcc := testMemMonitor.MakeBoundAccount() defer inputMemAcc.Close(outboxCtx) - input := coldatatestutils.NewRandomDataOp( + input, _ := coldatatestutils.NewRandomDataOp( colmem.NewAllocator(outboxCtx, &inputMemAcc, coldata.StandardColumnFactory), rng, args, ) outboxMemAcc := testMemMonitor.MakeBoundAccount() @@ -682,7 +682,7 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) { serverStreamNotification = <-mockServer.InboundStreams serverStream = serverStreamNotification.Stream typs = []*types.T{types.Int} - input = coldatatestutils.NewRandomDataOp( + input, _ = coldatatestutils.NewRandomDataOp( testAllocator, rng, coldatatestutils.RandomDataOpArgs{ diff --git a/pkg/sql/colflow/vectorized_flow_shutdown_test.go b/pkg/sql/colflow/vectorized_flow_shutdown_test.go index 5a38a4b4f63b..e1771ffb8ae7 100644 --- a/pkg/sql/colflow/vectorized_flow_shutdown_test.go +++ b/pkg/sql/colflow/vectorized_flow_shutdown_test.go @@ -66,41 +66,37 @@ var ( // synchronizer which then outputs all the data into a materializer. 
// The resulting scheme looks as follows: // -// Remote Node | Local Node -// | -// -> output -> Outbox -> | -> Inbox -> | -// | | -// -// Hash Router -> output -> Outbox -> | -> Inbox -> | -// -// | | -// -> output -> Outbox -> | -> Inbox -> | -// | -> Synchronizer -> materializer -> FlowCoordinator -// Outbox -> | -> Inbox -> | -// | -// Outbox -> | -> Inbox -> | -// | -// Outbox -> | -> Inbox -> | +// | Remote Node | Local Node +// | | +// | -> output -> Outbox -> | -> Inbox -> | +// | | | +// | Hash Router -> output -> Outbox -> | -> Inbox -> | +// | | | +// | -> output -> Outbox -> | -> Inbox -> | +// | | -> Synchronizer -> materializer -> FlowCoordinator +// | Outbox -> | -> Inbox -> | +// | | +// | Outbox -> | -> Inbox -> | +// | | +// | Outbox -> | -> Inbox -> | // // Also, with 50% probability, another remote node with the chain of an Outbox // and Inbox is placed between the synchronizer and materializer. The resulting // scheme then looks as follows: // -// Remote Node | Another Remote Node | Local Node -// | | -// -> output -> Outbox -> | -> Inbox -> | -// | | | | -// -// Hash Router -> output -> Outbox -> | -> Inbox -> | -// -// | | | | -// -> output -> Outbox -> | -> Inbox -> | -// | | -> Synchronizer -> Outbox -> | -> Inbox -> materializer -> FlowCoordinator -// Outbox -> | -> Inbox -> | -// | | | -// Outbox -> | -> Inbox -> | -// | | | -// Outbox -> | -> Inbox -> | +// | Remote Node | Another Remote Node | Local Node +// | | | +// | -> output -> Outbox -> | -> Inbox -> | +// | | | | | +// | Hash Router -> output -> Outbox -> | -> Inbox -> | +// | | | | | +// | -> output -> Outbox -> | -> Inbox -> | +// | | | -> Synchronizer -> Outbox -> | -> Inbox -> materializer -> FlowCoordinator +// | Outbox -> | -> Inbox -> | +// | | | | +// | Outbox -> | -> Inbox -> | +// | | | | +// | Outbox -> | -> Inbox -> | // // Remote nodes are simulated by having separate contexts and separate outbox // registries. 
@@ -148,10 +144,10 @@ func TestVectorizedFlowShutdown(t *testing.T) { } rng, _ := randutil.NewTestRand() var ( - err error - wg sync.WaitGroup - typs = []*types.T{types.Int} - hashRouterInput = coldatatestutils.NewRandomDataOp( + err error + wg sync.WaitGroup + typs = []*types.T{types.Int} + hashRouterInput, _ = coldatatestutils.NewRandomDataOp( testAllocator, rng, coldatatestutils.RandomDataOpArgs{ diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 626a5627fe5f..25f599e414f1 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -737,9 +737,7 @@ func (m execNodeTraceMetadata) annotateExplain( func (ih *instrumentationHelper) SetIndexRecommendations( ctx context.Context, idxRec *idxrecommendations.IndexRecCache, planner *planner, isInternal bool, ) { - opc := planner.optPlanningCtx - opc.reset(ctx) - stmtType := opc.p.stmt.AST.StatementType() + stmtType := planner.stmt.AST.StatementType() reset := false var recommendations []indexrec.Rec @@ -750,6 +748,8 @@ func (ih *instrumentationHelper) SetIndexRecommendations( stmtType, isInternal, ) { + opc := &planner.optPlanningCtx + opc.reset(ctx) f := opc.optimizer.Factory() evalCtx := opc.p.EvalContext() f.Init(ctx, evalCtx, opc.catalog)