44901: colexec: add "recursive" merging to external sort r=yuzefovich a=yuzefovich

**colexec: add "recursive" merging to external sort**

Previously, the external sorter merged all partitions at once (i.e.
there was a single merger). This is problematic because each partition
uses some amount of RAM for its buffer, and we want to make sure that all
partitions together do not exceed the memory limit.

This is now addressed by splitting the previous "merging" stage into three:
1. A new "repeated merging" stage is responsible for merging all
current partitions and spilling new partitions to disk until only one is
left. This might be performed while we're still consuming the input.
2. A new "final merging" stage sets up an emitter that can merge all
the remaining partitions. This occurs only when the input has been fully
consumed and we can merge all the partitions at once, without having to
spill to disk.
3. A new "emitting" stage simply emits the output.
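
As a rough illustration of the three stages (not the actual colexec code), the Go sketch below simulates them with in-memory slices standing in for on-disk partitions. The names `mergePartitions`, `externalSort`, `runSize`, and `maxPartitions` are hypothetical and chosen only for this example.

```go
package main

import (
	"fmt"
	"sort"
)

// mergePartitions merges already-sorted partitions into one sorted slice.
// A linear scan picks the smallest head element; a heap would also work.
func mergePartitions(parts [][]int) []int {
	var out []int
	idx := make([]int, len(parts)) // read position within each partition
	for {
		best := -1
		for p := range parts {
			if idx[p] == len(parts[p]) {
				continue // this partition is exhausted
			}
			if best == -1 || parts[p][idx[p]] < parts[best][idx[best]] {
				best = p
			}
		}
		if best == -1 {
			return out // every partition is exhausted
		}
		out = append(out, parts[best][idx[best]])
		idx[best]++
	}
}

// externalSort is a hypothetical simulation: slices stand in for partitions
// that a real external sorter would keep on disk.
func externalSort(input []int, runSize, maxPartitions int) []int {
	var parts [][]int
	for start := 0; start < len(input); start += runSize {
		end := start + runSize
		if end > len(input) {
			end = len(input)
		}
		// Sort one chunk of the input and treat it as a new partition.
		run := append([]int(nil), input[start:end]...)
		sort.Ints(run)
		parts = append(parts, run)

		// "Repeated merging": once the partition limit is reached, merge all
		// current partitions into a single new one while the input is still
		// being consumed.
		if len(parts) == maxPartitions {
			parts = [][]int{mergePartitions(parts)}
		}
	}
	// "Final merging" and "emitting": the input is fully consumed and the
	// remaining partitions fit under the limit, so merge them all at once.
	return mergePartitions(parts)
}

func main() {
	fmt.Println(externalSort([]int{9, 3, 7, 1, 8, 2, 6, 4, 5}, 2, 3))
	// prints "[1 2 3 4 5 6 7 8 9]"
}
```

With `maxPartitions` set to 3, no more than three partitions are ever merged at the same time, which is the property the memory limit depends on.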

Addresses: #44727.

Release note: None

**colexec: fix external sorter memory accounting**

Previously, there was a problem with inputPartitioningOperator (which
divides up the input into "partitions" once a partition exceeds the
provided memory limit) because usage of Allocator.Clear is not
compatible with Allocator.PerformOperation. This is now fixed by
explicitly tracking the memory used by the input batches with
RetainBatch against a "standalone" allocator. A standalone budget is
used to avoid double counting the memory of the batches (they are
already accounted for by the unlimitedAllocator of the sorter).

Fixes: #45043.

Release note: None

45184: jobs: trigger job adoption on registry Run() r=pbardea a=pbardea

To trigger the adoption of a job quickly, make the job registry listen
on a channel that is signaled whenever a job is run.
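
A minimal Go sketch of this pattern follows. It is loosely modeled on the registry change in this commit, but the `registry` type, its fields, and the `demo` helper are simplified, hypothetical names: a worker loop waits on a stop channel, an adoption channel, and a timer, so a signal on the adoption channel triggers work without waiting for the interval to elapse.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// registry is a simplified, hypothetical stand-in for a job registry.
type registry struct {
	adoptionCh chan struct{} // signaled when a job is run
	stopCh     chan struct{} // closed to stop the worker
	adopted    chan string   // records what triggered each adoption attempt
}

func newRegistry() *registry {
	return &registry{
		adoptionCh: make(chan struct{}),
		stopCh:     make(chan struct{}),
		adopted:    make(chan string, 1),
	}
}

// start launches the worker loop. Adoption happens either when signaled or
// when adoptInterval elapses, whichever comes first.
func (r *registry) start(adoptInterval time.Duration) {
	go func() {
		for {
			select {
			case <-r.stopCh:
				return
			case <-r.adoptionCh:
				r.adopted <- "signal"
			case <-time.After(adoptInterval):
				r.adopted <- "timer"
			}
		}
	}()
}

// run signals the worker that a job was scheduled, giving up if the context
// is canceled before the worker receives the signal.
func (r *registry) run(ctx context.Context) error {
	select {
	case r.adoptionCh <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo uses a very long interval so that only the signal can trigger adoption.
func demo() string {
	r := newRegistry()
	defer close(r.stopCh)
	r.start(time.Hour)
	if err := r.run(context.Background()); err != nil {
		return "error: " + err.Error()
	}
	return <-r.adopted
}

func main() {
	fmt.Println(demo()) // prints "signal"
}
```

Because the adoption channel in this sketch is unbuffered, `run` blocks until the worker is ready to receive, which is why it also selects on the context.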

Release note: None

45244: colexec: miscellaneous HashRouter fixes r=yuzefovich a=asubiotto

This commit fixes several bugs and discomforts related to the HashRouter
observed when running tpch queries.
1) RetainBatch and ReleaseBatch weren't accounting for the selection vector,
   resulting in a slight memory leak.
2) HashRouter disk spilling infrastructure was not being cleaned up on
   successful termination.
3) HashRouter memory monitor names were not unique; they now contain the output
   StreamIDs. Example: hash-router-[1,2,3]-unlimited.
4) A big memory accounting leak was happening when returning an in-memory
   pending batch. This batch is a staging area that is flushed to disk when
   full, but returned if a read occurs before that. The batch was being
   returned without changing the memory account. Since batches are unsafe for
   reuse, the HashRouter should call ReleaseBatch in this case.
5) Tests ignored errors returned by Run, resulting in hard-to-decipher
   failures where no output was returned. Run is now checked for errors before
   the output is verified.
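
The accounting rule behind items 1 and 4 can be sketched as follows: whatever size is added when a batch is retained must be subtracted when it is released, and that size should include the selection vector. The `batch` and `account` types below are hypothetical simplifications (fixed-width int64 columns only), not the real coldata or Allocator types.

```go
package main

import "fmt"

const (
	sizeOfUint16 = 2 // bytes per selection vector entry
	sizeOfInt64  = 8 // bytes per column value in this simplified model
)

// batch is a hypothetical, simplified columnar batch.
type batch struct {
	cols [][]int64
	sel  []uint16 // selection vector
}

// batchBytes estimates the footprint of a batch from slice capacities,
// including the selection vector.
func batchBytes(b batch) int64 {
	size := int64(cap(b.sel)) * sizeOfUint16
	for _, col := range b.cols {
		size += int64(cap(col)) * sizeOfInt64
	}
	return size
}

// account is a hypothetical stand-in for a memory account.
type account struct{ used int64 }

func (a *account) retainBatch(b batch)  { a.used += batchBytes(b) }
func (a *account) releaseBatch(b batch) { a.used -= batchBytes(b) }

// demo retains and then releases one batch and reports the usage after each.
func demo() (afterRetain, afterRelease int64) {
	b := batch{
		cols: [][]int64{make([]int64, 4), make([]int64, 4)},
		sel:  make([]uint16, 4),
	}
	var acc account
	acc.retainBatch(b)
	afterRetain = acc.used
	acc.releaseBatch(b)
	return afterRetain, acc.used
}

func main() {
	afterRetain, afterRelease := demo()
	fmt.Println(afterRetain, afterRelease) // prints "72 0"
}
```

Because both directions use the same size function, the account returns to zero. If a batch is handed back to a caller without a matching release, as in item 4, the account keeps its bytes and the usage drifts upward.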

Release note: None (bug fixes related to vectorize=experimental_on)

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Alfonso Subiotto Marques <[email protected]>
4 people committed Feb 20, 2020
4 parents 1cd0905 + b344b39 + a721067 + 171282c commit 9fc70cc
Showing 12 changed files with 361 additions and 114 deletions.
12 changes: 11 additions & 1 deletion pkg/jobs/jobs_test.go
@@ -1680,7 +1680,7 @@ func TestJobInTxn(t *testing.T) {
defer func(oldInterval time.Duration) {
jobs.DefaultAdoptInterval = oldInterval
}(jobs.DefaultAdoptInterval)
-jobs.DefaultAdoptInterval = 10 * time.Millisecond
+jobs.DefaultAdoptInterval = 5 * time.Second

ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
@@ -1757,6 +1757,8 @@ func TestJobInTxn(t *testing.T) {
})

t.Run("rollback txn", func(t *testing.T) {
+start := timeutil.Now()
+
txn, err := sqlDB.Begin()
require.NoError(t, err)
_, err = txn.Exec("BACKUP tobeaborted TO doesnotmattter")
@@ -1773,9 +1775,13 @@ func TestJobInTxn(t *testing.T) {
sqlRunner.Exec(t, "SHOW JOB WHEN COMPLETE $1", *job.ID())
require.Equal(t, int32(0), atomic.LoadInt32(&hasRun),
"job has run in transaction before txn commit")
+
+require.True(t, timeutil.Since(start) < jobs.DefaultAdoptInterval, "job should have been adopted immediately")
})

t.Run("normal success", func(t *testing.T) {
+start := timeutil.Now()
+
// Now let's actually commit the transaction and check that the job ran.
txn, err := sqlDB.Begin()
require.NoError(t, err)
@@ -1791,9 +1797,12 @@ func TestJobInTxn(t *testing.T) {
require.Equal(t, int32(1), atomic.LoadInt32(&hasRun),
"more than one job ran")
require.Equal(t, "", j.Payload().Error)
+
+require.True(t, timeutil.Since(start) < jobs.DefaultAdoptInterval, "job should have been adopted immediately")
})

t.Run("one of the queued jobs fails", func(t *testing.T) {
+start := timeutil.Now()
txn, err := sqlDB.Begin()
require.NoError(t, err)

@@ -1807,5 +1816,6 @@ func TestJobInTxn(t *testing.T) {
// Now let's actually commit the transaction and check that there is a
// failure.
require.Error(t, txn.Commit())
+require.True(t, timeutil.Since(start) < jobs.DefaultAdoptInterval, "job should have been adopted immediately")
})
}
46 changes: 30 additions & 16 deletions pkg/jobs/registry.go
@@ -89,15 +89,16 @@ type NodeLiveness interface {
// node simply behaves as though its leniency period is 0. Epoch-based
// nodes will see time-based nodes delay the act of stealing a job.
type Registry struct {
-ac log.AmbientContext
-stopper *stop.Stopper
-db *client.DB
-ex sqlutil.InternalExecutor
-clock *hlc.Clock
-nodeID *base.NodeIDContainer
-settings *cluster.Settings
-planFn planHookMaker
-metrics Metrics
+ac log.AmbientContext
+stopper *stop.Stopper
+db *client.DB
+ex sqlutil.InternalExecutor
+clock *hlc.Clock
+nodeID *base.NodeIDContainer
+settings *cluster.Settings
+planFn planHookMaker
+metrics Metrics
+adoptionCh chan struct{}

// if non-empty, indicates path to file that prevents any job adoptions.
preventAdoptionFile string
@@ -165,6 +166,7 @@ func MakeRegistry(
settings: settings,
planFn: planFn,
preventAdoptionFile: preventAdoptionFile,
+adoptionCh: make(chan struct{}),
}
r.mu.epoch = 1
r.mu.jobs = make(map[int64]context.CancelFunc)
@@ -263,6 +265,12 @@ func (r *Registry) Run(ctx context.Context, ex sqlutil.InternalExecutor, jobs []
log.Infof(ctx, "scheduled jobs %+v", jobs)
buf := bytes.Buffer{}
for i, id := range jobs {
+select {
+case r.adoptionCh <- struct{}{}:
+case <-ctx.Done():
+return ctx.Err()
+}
+
if i > 0 {
buf.WriteString(",")
}
@@ -393,19 +401,25 @@ func (r *Registry) Start(
}
})

+maybeAdoptJobs := func(ctx context.Context) {
+if r.adoptionDisabled(ctx) {
+r.cancelAll(ctx)
+return
+}
+if err := r.maybeAdoptJob(ctx, nl); err != nil {
+log.Errorf(ctx, "error while adopting jobs: %s", err)
+}
+}
+
stopper.RunWorker(context.Background(), func(ctx context.Context) {
for {
select {
case <-stopper.ShouldStop():
return
+case <-r.adoptionCh:
+maybeAdoptJobs(ctx)
case <-time.After(adoptInterval):
-if r.adoptionDisabled(ctx) {
-r.cancelAll(ctx)
-continue
-}
-if err := r.maybeAdoptJob(ctx, nl); err != nil {
-log.Errorf(ctx, "error while adopting jobs: %s", err)
-}
+maybeAdoptJobs(ctx)
}
}
})
16 changes: 11 additions & 5 deletions pkg/sql/colexec/allocator.go
@@ -55,11 +55,14 @@ func (a *Allocator) NewMemBatch(types []coltypes.T) coldata.Batch {
return a.NewMemBatchWithSize(types, int(coldata.BatchSize()))
}

+func (a *Allocator) selVectorSize(capacity int) int64 {
+return int64(capacity * sizeOfUint16)
+}
+
// NewMemBatchWithSize allocates a new in-memory coldata.Batch with the given
// column size.
func (a *Allocator) NewMemBatchWithSize(types []coltypes.T, size int) coldata.Batch {
-selVectorSize := size * sizeOfUint16
-estimatedStaticMemoryUsage := int64(estimateBatchSizeBytes(types, size) + selVectorSize)
+estimatedStaticMemoryUsage := a.selVectorSize(size) + int64(estimateBatchSizeBytes(types, size))
if err := a.acc.Grow(a.ctx, estimatedStaticMemoryUsage); err != nil {
execerror.VectorizedInternalPanic(err)
}
@@ -71,7 +74,7 @@ func (a *Allocator) NewMemBatchWithSize(types []coltypes.T, size int) coldata.Ba
// through PerformOperation. Use this if you want to explicitly manage the
// memory accounted for.
func (a *Allocator) RetainBatch(b coldata.Batch) {
-if err := a.acc.Grow(a.ctx, getVecsSize(b.ColVecs())); err != nil {
+if err := a.acc.Grow(a.ctx, a.selVectorSize(cap(b.Selection()))+getVecsSize(b.ColVecs())); err != nil {
execerror.VectorizedInternalPanic(err)
}
}
@@ -81,7 +84,7 @@ func (a *Allocator) RetainBatch(b coldata.Batch) {
// Flow.Cleanup. Use this if you want to explicitly manage the memory used. An
// example of a use case is releasing a batch before writing it to disk.
func (a *Allocator) ReleaseBatch(b coldata.Batch) {
-a.acc.Shrink(a.ctx, getVecsSize(b.ColVecs()))
+a.acc.Shrink(a.ctx, a.selVectorSize(cap(b.Selection()))+getVecsSize(b.ColVecs()))
}

// NewMemColumn returns a new coldata.Vec, initialized with a length.
@@ -154,8 +157,11 @@ func (a *Allocator) Used() int64 {
}

// Clear clears up the memory account of the allocator.
+// WARNING: usage of this method is *not* compatible with using
+// PerformOperation. Use this only in combination with RetainBatch /
+// ReleaseBatch.
func (a *Allocator) Clear() {
-a.acc.Clear(a.ctx)
+a.acc.Shrink(a.ctx, a.acc.Used())
}

const (
37 changes: 37 additions & 0 deletions pkg/sql/colexec/execplan.go
@@ -17,6 +17,7 @@ import (
"reflect"

"github.com/cockroachdb/cockroach/pkg/col/coltypes"
+"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/typeconv"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
@@ -93,6 +94,7 @@ type NewColOperatorArgs struct {
Inputs []Operator
StreamingMemAccount *mon.BoundAccount
ProcessorConstructor execinfra.ProcessorConstructor
+DiskQueueCfg colcontainer.DiskQueueCfg
TestingKnobs struct {
// UseStreamingMemAccountForBuffering specifies whether to use
// StreamingMemAccount when creating buffering operators and should only be
@@ -108,6 +110,9 @@ type NewColOperatorArgs struct {
// DiskSpillingDisabled specifies whether only in-memory operators should
// be created.
DiskSpillingDisabled bool
+// MaxNumberPartitions determines the maximum number of "active"
+// partitions for Partitioner interface.
+MaxNumberPartitions int
}
}

@@ -762,15 +767,22 @@ func NewColOperator(
ctx, result.createBufferingUnlimitedMemAccount(
ctx, flowCtx, monitorNamePrefix,
))
+standaloneAllocator := NewAllocator(
+ctx, result.createStandaloneMemAccount(
+ctx, flowCtx, monitorNamePrefix,
+))
diskQueuesUnlimitedAllocator := NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(
ctx, flowCtx, monitorNamePrefix+"-disk-queues",
))
return newExternalSorter(
unlimitedAllocator,
+standaloneAllocator,
input, inputTypes, core.Sorter.OutputOrdering,
execinfra.GetWorkMemLimit(flowCtx.Cfg),
+args.TestingKnobs.MaxNumberPartitions,
diskQueuesUnlimitedAllocator,
+args.DiskQueueCfg,
)
},
args.TestingKnobs.SpillingCallbackFn,
@@ -996,6 +1008,31 @@ func (r *NewColOperatorResult) createBufferingUnlimitedMemAccount(
return &bufferingMemAccount
}

+// createStandaloneMemAccount instantiates an unlimited memory monitor and a
+// memory account that have a standalone budget. This means that the memory
+// registered with these objects is *not* reported to the root monitor (i.e.
+// it will not count towards max-sql-memory). Use it only when the memory in
+// use is accounted for with a different memory monitor. The receiver is
+// updated to have references to both objects.
+func (r *NewColOperatorResult) createStandaloneMemAccount(
+ctx context.Context, flowCtx *execinfra.FlowCtx, name string,
+) *mon.BoundAccount {
+standaloneMemMonitor := mon.MakeMonitor(
+name+"-standalone",
+mon.MemoryResource,
+nil, /* curCount */
+nil, /* maxHist */
+-1, /* increment: use default increment */
+math.MaxInt64, /* noteworthy */
+flowCtx.Cfg.Settings,
+)
+r.BufferingOpMemMonitors = append(r.BufferingOpMemMonitors, &standaloneMemMonitor)
+standaloneMemMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))
+standaloneMemAccount := standaloneMemMonitor.MakeBoundAccount()
+r.BufferingOpMemAccounts = append(r.BufferingOpMemAccounts, &standaloneMemAccount)
+return &standaloneMemAccount
+}
+
func (r *NewColOperatorResult) planFilterExpr(
ctx context.Context,
evalCtx *tree.EvalContext,