From e297981537778db177b4a252f790109ba630b511 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 25 Mar 2020 13:39:03 -0700
Subject: [PATCH 1/6] colexec: fix resetting of buffered groups

Release justification: bug fixes and low-risk updates to new functionality.

Previously, whenever we needed to reset the buffered groups, we would close the spilling queue and then create a new one when needed. This is overkill: we can simply reset the spilling queues we already have, which reduces the number of allocations and improves performance.

Release note: None
---
 pkg/sql/colexec/mergejoiner.go | 38 +++++++++++------------------
 pkg/sql/colexec/mergejoiner_tmpl.go | 12 ++++++---
 2 files changed, 22 insertions(+), 28 deletions(-)

diff --git a/pkg/sql/colexec/mergejoiner.go b/pkg/sql/colexec/mergejoiner.go
index b79a0f8d1b3a..5e0ad53adbc5 100644
--- a/pkg/sql/colexec/mergejoiner.go
+++ b/pkg/sql/colexec/mergejoiner.go
@@ -102,8 +102,8 @@ type mjBufferedGroup struct { } func (bg *mjBufferedGroup) reset(ctx context.Context) { - if err := bg.close(ctx); err != nil { - execerror.VectorizedInternalPanic(err) + if bg.spillingQueue != nil { + bg.spillingQueue.reset(ctx) } bg.numTuples = 0 }
@@ -441,10 +441,18 @@ func (o *mergeJoinBase) initWithOutputBatchSize(outBatchSize int) { o.outputBatchSize = 1<<16 - 1 } + o.proberState.lBufferedGroup.spillingQueue = newSpillingQueue( + o.unlimitedAllocator, o.left.sourceTypes, o.memoryLimit, + o.diskQueueCfg, o.fdSemaphore, coldata.BatchSize(), o.diskAcc, + ) o.proberState.lBufferedGroup.firstTuple = make([]coldata.Vec, len(o.left.sourceTypes)) for colIdx, colType := range o.left.sourceTypes { o.proberState.lBufferedGroup.firstTuple[colIdx] = o.unlimitedAllocator.NewMemColumn(colType, 1) } + o.proberState.rBufferedGroup.spillingQueue = newRewindableSpillingQueue( + o.unlimitedAllocator, o.right.sourceTypes, o.memoryLimit, + o.diskQueueCfg, o.fdSemaphore, coldata.BatchSize(), o.diskAcc, + ) o.proberState.rBufferedGroup.firstTuple = make([]coldata.Vec, len(o.right.sourceTypes)) for colIdx, colType := range o.right.sourceTypes { o.proberState.rBufferedGroup.firstTuple[colIdx] = o.unlimitedAllocator.NewMemColumn(colType, 1)
@@ -485,12 +493,6 @@ func (o *mergeJoinBase) appendToBufferedGroup( if input == &o.left { sourceTypes = o.left.sourceTypes bufferedGroup = &o.proberState.lBufferedGroup - if bufferedGroup.spillingQueue == nil { - bufferedGroup.spillingQueue = newSpillingQueue( - o.unlimitedAllocator, o.left.sourceTypes, o.memoryLimit, - o.diskQueueCfg, o.fdSemaphore, coldata.BatchSize(), o.diskAcc, - ) - } // TODO(yuzefovich): uncomment when spillingQueue actually copies the // enqueued batches when those are kept in memory. //if o.scratch.lBufferedGroupBatch == nil {
@@ -500,12 +502,6 @@ } else { sourceTypes = o.right.sourceTypes bufferedGroup = &o.proberState.rBufferedGroup - if bufferedGroup.spillingQueue == nil { - bufferedGroup.spillingQueue = newRewindableSpillingQueue( - o.unlimitedAllocator, o.right.sourceTypes, o.memoryLimit, - o.diskQueueCfg, o.fdSemaphore, coldata.BatchSize(), o.diskAcc, - ) - } // TODO(yuzefovich): uncomment when spillingQueue actually copies the // enqueued batches when those are kept in memory.
//if o.scratch.rBufferedGroupBatch == nil { @@ -703,17 +699,11 @@ func (o *mergeJoinBase) Close(ctx context.Context) error { } } } - if o.proberState.lBufferedGroup.spillingQueue != nil { - if err := o.proberState.lBufferedGroup.close(ctx); err != nil { - lastErr = err - } - o.proberState.lBufferedGroup.spillingQueue = nil + if err := o.proberState.lBufferedGroup.close(ctx); err != nil { + lastErr = err } - if o.proberState.rBufferedGroup.spillingQueue != nil { - if err := o.proberState.rBufferedGroup.close(ctx); err != nil { - lastErr = err - } - o.proberState.rBufferedGroup.spillingQueue = nil + if err := o.proberState.rBufferedGroup.close(ctx); err != nil { + lastErr = err } return lastErr } diff --git a/pkg/sql/colexec/mergejoiner_tmpl.go b/pkg/sql/colexec/mergejoiner_tmpl.go index 751f7e478350..fdd55948f980 100644 --- a/pkg/sql/colexec/mergejoiner_tmpl.go +++ b/pkg/sql/colexec/mergejoiner_tmpl.go @@ -1487,11 +1487,15 @@ func (o *mergeJoin_JOIN_TYPE_STRINGOp) Next(ctx context.Context) coldata.Batch { return o.output } case mjDone: - if err := o.proberState.lBufferedGroup.close(ctx); err != nil { - execerror.VectorizedInternalPanic(err) + // Note that resetting of buffered groups will close disk queues + // (if there are any). + if o.proberState.lBufferedGroupNeedToReset { + o.proberState.lBufferedGroup.reset(ctx) + o.proberState.lBufferedGroupNeedToReset = false } - if err := o.proberState.rBufferedGroup.close(ctx); err != nil { - execerror.VectorizedInternalPanic(err) + if o.proberState.rBufferedGroupNeedToReset { + o.proberState.rBufferedGroup.reset(ctx) + o.proberState.rBufferedGroupNeedToReset = false } return coldata.ZeroBatch default: From e09543ff91bda2ab7aa551cd9d1f5b1471c3fc9b Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 25 Mar 2020 19:48:18 -0700 Subject: [PATCH 2/6] logictest: make results of queries in vectorize test deterministic Release justification: non-production code changes. Several queries in `vectorize` logic test could have produced different results, and this is now fixed. Release note: None --- pkg/sql/logictest/testdata/logic_test/vectorize | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/sql/logictest/testdata/logic_test/vectorize b/pkg/sql/logictest/testdata/logic_test/vectorize index 82965fdbf60f..e4627f1b3f23 100644 --- a/pkg/sql/logictest/testdata/logic_test/vectorize +++ b/pkg/sql/logictest/testdata/logic_test/vectorize @@ -11,7 +11,7 @@ statement ok INSERT INTO a SELECT g//2, g, g FROM generate_series(0,2000) g(g) query II -SELECT a, CASE WHEN a = 0 THEN 0 WHEN a = 1 THEN 3 ELSE 5 END FROM a LIMIT 6 +SELECT a, CASE WHEN a = 0 THEN 0 WHEN a = 1 THEN 3 ELSE 5 END FROM a ORDER BY 1, 2 LIMIT 6 ---- 0 0 0 0 @@ -71,7 +71,7 @@ SELECT count(*) FROM (SELECT DISTINCT a FROM a) 1001 query III -SELECT * FROM a LIMIT 10 +SELECT * FROM a ORDER BY 1, 2 LIMIT 10 ---- 0 0 0 0 1 1 @@ -85,7 +85,7 @@ SELECT * FROM a LIMIT 10 4 9 9 query II -SELECT DISTINCT(a), b FROM a LIMIT 10 +SELECT DISTINCT(a), b FROM a ORDER BY 1, 2 LIMIT 10 ---- 0 0 0 1 @@ -99,7 +99,7 @@ SELECT DISTINCT(a), b FROM a LIMIT 10 4 9 # Simple filter. -query I +query I rowsort SELECT b FROM a WHERE b < 3 ---- 0 @@ -107,7 +107,7 @@ SELECT b FROM a WHERE b < 3 2 # Mixed type comparison -query IB +query IB rowsort SELECT c, c > 1 FROM a LIMIT 3 ---- 0 false @@ -129,7 +129,7 @@ SELECT a, b FROM nulls WHERE a <= b # Filter on the result of a projection. 
query II -SELECT a, b FROM a WHERE a * 2 < b LIMIT 5 +SELECT a, b FROM a WHERE a * 2 < b ORDER BY 1, 2 LIMIT 5 ---- 0 1 1 3
@@ -182,7 +182,7 @@ SELECT (a + 1.0::DECIMAL)::INT FROM a LIMIT 1 # Operations with constants on the left work. query I -SELECT 5 - a FROM a LIMIT 3 +SELECT 5 - a FROM a ORDER BY 1 DESC LIMIT 3 ---- 5 5 # Constant projections. query II -SELECT 5, a FROM a LIMIT 3 +SELECT 5, a FROM a ORDER BY 2 LIMIT 3 ---- 5 0 5 0

From 60a38465b7eda6a487e7bc0578ed7ff82e345dcb Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 25 Mar 2020 15:54:44 -0700
Subject: [PATCH 3/6] colexec: further optimize hash aggregator

Release justification: low-risk update to new functionality (it is low-risk because it does not change anything fundamentally; it only improves the way we handle allocations and clear internal state).

This commit optimizes how the hash aggregator works with selection vectors. Previously, we maintained a map from hash code (`uint64`) to a slice of ints, which resulted in creating a new int slice for every hash code the hash aggregator ever encountered during its run. However, we process at most (about) `batchTupleLimit` tuples at once, so we can have at most that many different hash codes at a time. This observation allows us to get by with a constant number of int slices. To accommodate this, we introduce a map from hash code to a "slot" in `[][]int`, and the map is maintained so that it only contains the hash codes that still need to be processed; once a hash code has been processed, its entry is deleted. This way, both the map and the number of int slices stay constant throughout the run of the hash aggregator.

This commit also refactors `makeAggregateFuncs` to separate out the creation of the output types (this has some impact on the performance of the hash aggregator, which creates aggregate functions for every group).
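To make the scheme concrete, here is a rough, illustrative Go sketch of the hash-code-to-slot bookkeeping. The names mirror the `sels`, `hashCodeToSelsSlot`, and `nextSelsSlot` fields added in hash_aggregator.go below, but this is a simplified stand-in rather than the actual implementation:

    package main

    import "fmt"

    // selPool is a simplified stand-in for the scratch state described above:
    // a fixed pool of int slices plus a transient map from hash code to the
    // slot currently assigned to it.
    type selPool struct {
    	sels               [][]int        // fixed pool of selection vectors, reused across batches
    	hashCodeToSelsSlot map[uint64]int // only holds hash codes that still need processing
    	nextSelsSlot       int            // in the real code this restarts from 0 for each batch
    }

    func newSelPool(maxTuplesPerBatch int) *selPool {
    	return &selPool{
    		sels:               make([][]int, maxTuplesPerBatch),
    		hashCodeToSelsSlot: make(map[uint64]int),
    	}
    }

    // add records that the tuple at index tupleIdx hashed to hashCode,
    // assigning a slot to the hash code the first time it is seen.
    func (p *selPool) add(hashCode uint64, tupleIdx int) {
    	slot, ok := p.hashCodeToSelsSlot[hashCode]
    	if !ok {
    		slot = p.nextSelsSlot
    		p.hashCodeToSelsSlot[hashCode] = slot
    		p.nextSelsSlot++
    	}
    	p.sels[slot] = append(p.sels[slot], tupleIdx)
    }

    // process hands the selection vector for hashCode to fn, then truncates the
    // slot (keeping its capacity) and deletes the map entry, so neither the map
    // nor the number of slices grows over the lifetime of the aggregator.
    func (p *selPool) process(hashCode uint64, fn func(sel []int)) {
    	slot, ok := p.hashCodeToSelsSlot[hashCode]
    	if !ok {
    		return // already handled via an earlier tuple with the same hash code
    	}
    	fn(p.sels[slot])
    	p.sels[slot] = p.sels[slot][:0]
    	delete(p.hashCodeToSelsSlot, hashCode)
    }

    func main() {
    	p := newSelPool(4)
    	for i, h := range []uint64{7, 3, 7, 3} {
    		p.add(h, i)
    	}
    	for _, h := range []uint64{7, 3, 7, 3} {
    		p.process(h, func(sel []int) { fmt.Println(h, sel) })
    	}
    	// Prints 7 [0 2] and 3 [1 3]; the repeated hash codes are no-ops.
    }

The slot indirection is what lets the slices keep their capacity from batch to batch while the map stays small.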
Release note: None --- pkg/sql/colexec/aggregator.go | 55 +++++++++++---- pkg/sql/colexec/aggregator_test.go | 36 +++++----- pkg/sql/colexec/hash_aggregator.go | 108 +++++++++++++++++++---------- 3 files changed, 130 insertions(+), 69 deletions(-) diff --git a/pkg/sql/colexec/aggregator.go b/pkg/sql/colexec/aggregator.go index 9f9104f1465f..b9fd275dd0c9 100644 --- a/pkg/sql/colexec/aggregator.go +++ b/pkg/sql/colexec/aggregator.go @@ -211,8 +211,13 @@ func NewOrderedAggregator( isScalar: isScalar, } - a.aggregateFuncs, a.outputTypes, err = makeAggregateFuncs(a.allocator, aggTypes, aggFns) - + a.aggregateFuncs, err = makeAggregateFuncs(a.allocator, aggTypes, aggFns) + if err != nil { + return nil, errors.AssertionFailedf( + "this error should have been checked in isAggregateSupported\n%+v", err, + ) + } + a.outputTypes, err = makeAggregateFuncsOutputTypes(aggTypes, aggFns) if err != nil { return nil, errors.AssertionFailedf( "this error should have been checked in isAggregateSupported\n%+v", err, @@ -224,9 +229,8 @@ func NewOrderedAggregator( func makeAggregateFuncs( allocator *Allocator, aggTyps [][]coltypes.T, aggFns []execinfrapb.AggregatorSpec_Func, -) ([]aggregateFunc, []coltypes.T, error) { +) ([]aggregateFunc, error) { funcs := make([]aggregateFunc, len(aggFns)) - outTyps := make([]coltypes.T, len(aggFns)) for i := range aggFns { var err error @@ -250,26 +254,46 @@ func makeAggregateFuncs( case execinfrapb.AggregatorSpec_BOOL_OR: funcs[i] = newBoolOrAgg() default: - return nil, nil, errors.Errorf("unsupported columnar aggregate function %s", aggFns[i].String()) + return nil, errors.Errorf("unsupported columnar aggregate function %s", aggFns[i].String()) } + if err != nil { + return nil, err + } + } + + return funcs, nil +} + +func makeAggregateFuncsOutputTypes( + aggTyps [][]coltypes.T, aggFns []execinfrapb.AggregatorSpec_Func, +) ([]coltypes.T, error) { + outTyps := make([]coltypes.T, len(aggFns)) + + for i := range aggFns { // Set the output type of the aggregate. switch aggFns[i] { case execinfrapb.AggregatorSpec_COUNT_ROWS, execinfrapb.AggregatorSpec_COUNT: // TODO(jordan): this is a somewhat of a hack. The aggregate functions // should come with their own output types, somehow. outTyps[i] = coltypes.Int64 - default: + case + execinfrapb.AggregatorSpec_ANY_NOT_NULL, + execinfrapb.AggregatorSpec_AVG, + execinfrapb.AggregatorSpec_SUM, + execinfrapb.AggregatorSpec_SUM_INT, + execinfrapb.AggregatorSpec_MIN, + execinfrapb.AggregatorSpec_MAX, + execinfrapb.AggregatorSpec_BOOL_AND, + execinfrapb.AggregatorSpec_BOOL_OR: // Output types are the input types for now. outTyps[i] = aggTyps[i][0] - } - - if err != nil { - return nil, nil, err + default: + return nil, errors.Errorf("unsupported columnar aggregate function %s", aggFns[i].String()) } } - return funcs, outTyps, nil + return outTyps, nil } func (a *orderedAggregator) initWithOutputBatchSize(outputSize int) { @@ -470,7 +494,7 @@ func isAggregateSupported( return false, errors.Newf("sum_int is only supported on Int64 through vectorized") } } - _, outputTypes, err := makeAggregateFuncs( + _, err = makeAggregateFuncs( nil, /* allocator */ [][]coltypes.T{aggTypes}, []execinfrapb.AggregatorSpec_Func{aggFn}, @@ -478,6 +502,13 @@ func isAggregateSupported( if err != nil { return false, err } + outputTypes, err := makeAggregateFuncsOutputTypes( + [][]coltypes.T{aggTypes}, + []execinfrapb.AggregatorSpec_Func{aggFn}, + ) + if err != nil { + return false, err + } _, retType, err := execinfrapb.GetAggregateInfo(aggFn, inputTypes...) 
if err != nil { return false, err diff --git a/pkg/sql/colexec/aggregator_test.go b/pkg/sql/colexec/aggregator_test.go index 0189258e7d73..f10fa7eff541 100644 --- a/pkg/sql/colexec/aggregator_test.go +++ b/pkg/sql/colexec/aggregator_test.go @@ -702,6 +702,7 @@ func BenchmarkAggregator(b *testing.B) { rng, _ := randutil.NewPseudoRand() ctx := context.Background() + const bytesFixedLength = 8 for _, aggFn := range []execinfrapb.AggregatorSpec_Func{ execinfrapb.AggregatorSpec_ANY_NOT_NULL, execinfrapb.AggregatorSpec_AVG, @@ -716,12 +717,18 @@ func BenchmarkAggregator(b *testing.B) { fName := execinfrapb.AggregatorSpec_Func_name[int32(aggFn)] b.Run(fName, func(b *testing.B) { for _, agg := range aggTypes { - for _, typ := range []coltypes.T{coltypes.Int64, coltypes.Decimal} { + for typIdx, typ := range []coltypes.T{coltypes.Int64, coltypes.Decimal, coltypes.Bytes} { for _, groupSize := range []int{1, 2, coldata.BatchSize() / 2, coldata.BatchSize()} { for _, hasNulls := range []bool{false, true} { for _, numInputBatches := range []int{64} { if aggFn == execinfrapb.AggregatorSpec_BOOL_AND || aggFn == execinfrapb.AggregatorSpec_BOOL_OR { typ = coltypes.Bool + if typIdx > 0 { + // We don't need to run the benchmark of bool_and and + // bool_or multiple times, so we skip all runs except + // for the first one. + continue + } } b.Run(fmt.Sprintf("%s/%s/groupSize=%d/hasNulls=%t/numInputBatches=%d", agg.name, typ.String(), groupSize, hasNulls, numInputBatches), @@ -740,29 +747,18 @@ func BenchmarkAggregator(b *testing.B) { } groups[i] = int64(curGroup) } + nullProb := 0.0 if hasNulls { - nulls := cols[1].Nulls() - for i := 0; i < nTuples; i++ { - if rng.Float64() < nullProbability { - nulls.SetNull(i) - } - } + nullProb = nullProbability } - switch typ { - case coltypes.Int64: + coldata.RandomVec(rng, typ, bytesFixedLength, cols[1], nTuples, nullProb) + if typ == coltypes.Int64 && aggFn == execinfrapb.AggregatorSpec_SUM { + // Summation of random Int64 values can lead to + // overflow, and we will panic. To go around it, we + // restrict the range of values. vals := cols[1].Int64() for i := range vals { - vals[i] = rng.Int63() % 1024 - } - case coltypes.Decimal: - vals := cols[1].Decimal() - for i := range vals { - vals[i].SetInt64(rng.Int63() % 1024) - } - case coltypes.Bool: - vals := cols[1].Bool() - for i := range vals { - vals[i] = rng.Float64() < 0.5 + vals[i] = vals[i] % 1024 } } source := newChunkingBatchSource(colTypes, cols, nTuples) diff --git a/pkg/sql/colexec/hash_aggregator.go b/pkg/sql/colexec/hash_aggregator.go index 1414b8e6e0f4..a243a4fad631 100644 --- a/pkg/sql/colexec/hash_aggregator.go +++ b/pkg/sql/colexec/hash_aggregator.go @@ -79,8 +79,7 @@ type hashAggregator struct { valCols []uint32 // batchTupleLimit limits the number of tuples the aggregator will buffer - // before it starts to perform aggregation. The maximum value of this field - // is math.MaxUint16 - coldata.BatchSize(). + // before it starts to perform aggregation. batchTupleLimit int // state stores the current state of hashAggregator. @@ -88,6 +87,8 @@ type hashAggregator struct { scratch struct { coldata.Batch + // vecs stores "unwrapped" batch. + vecs []coldata.Vec // sels stores the intermediate selection vector for each hash code. It // is maintained in such a way that when for a particular hashCode @@ -95,13 +96,20 @@ type hashAggregator struct { // length 0. 
Also, onlineAgg() method will reset all modified slices to // have zero length once it is done processing all tuples in the batch, // this allows us to not reset the slices for all possible hash codes. - // TODO(yuzefovich): instead of having a map from hashCode to []int - // (which could result in having many int slices), we could use - // constant number of such slices (probably batchTupleLimit of them) - // and have a map from hashCode to an index in [][]int that would do + // + // Instead of having a map from hashCode to []int (which could result + // in having many int slices), we are using a constant number of such + // slices and have a map from hashCode to a "slot" in sels that does // the "translation." The key insight here is that we will have at most - // batchTupleLimit different hashCodes at once. - sels map[uint64][]int + // batchTupleLimit (plus - possibly - constant excess) different + // hashCodes at once. + sels [][]int + // hashCodeToSelsSlot is a mapping from the hashCode to a slot in sels + // slice. New keys are added to this map when building the selections + // when new hashCode is encountered, and keys are deleted from the map + // in online aggregation phase once the tuples with the corresponding + // hash codes have been processed. + hashCodeToSelsSlot map[uint64]int // group is a boolean vector where "true" represent the beginning of a group // in the column. It is shared among all aggregation functions. Since @@ -123,6 +131,8 @@ type hashAggregator struct { // bufferedBatch because in the worst case where all keys in the grouping // columns are distinct, we need to store every single key in the input. keyMapping coldata.Batch + // keyMappingVecs stores "unwrapped" keyMapping batch. + keyMappingVecs []coldata.Vec output struct { coldata.Batch @@ -155,6 +165,7 @@ type hashAggregator struct { // hashBuffer stores hash values for each tuple in the buffered batch. hashBuffer []uint64 + alloc hashAggFuncsAlloc cancelChecker CancelChecker decimalScratch decimalOverloadScratch } @@ -213,7 +224,7 @@ func NewHashAggregator( } } - _, outputTypes, err := makeAggregateFuncs(allocator, aggTyps, aggFns) + outputTypes, err := makeAggregateFuncsOutputTypes(aggTyps, aggFns) if err != nil { return nil, errors.AssertionFailedf( "this error should have been checked in isAggregateSupported\n%+v", err, @@ -258,14 +269,19 @@ func (op *hashAggregator) Init() { // to accommodate the case where sometimes number of buffered tuples exceeds // op.batchTupleLimit. This is because we perform checks after appending the // input tuples to the scratch buffer. - op.scratch.Batch = - op.allocator.NewMemBatchWithSize(op.valTypes, op.batchTupleLimit+coldata.BatchSize()) - op.scratch.sels = make(map[uint64][]int) - op.scratch.group = make([]bool, op.batchTupleLimit+coldata.BatchSize()) - + maxBufferedTuples := op.batchTupleLimit + coldata.BatchSize() + op.scratch.Batch = op.allocator.NewMemBatchWithSize(op.valTypes, maxBufferedTuples) + op.scratch.vecs = op.scratch.ColVecs() + op.scratch.sels = make([][]int, maxBufferedTuples) + op.scratch.hashCodeToSelsSlot = make(map[uint64]int) + op.scratch.group = make([]bool, maxBufferedTuples) + // Eventually, op.keyMapping will contain as many tuples as there are + // groups in the input, but we don't know that number upfront, so we + // allocate it with some reasonably sized constant capacity. 
op.keyMapping = op.allocator.NewMemBatchWithSize(op.groupTypes, op.batchTupleLimit) + op.keyMappingVecs = op.keyMapping.ColVecs() - op.hashBuffer = make([]uint64, op.batchTupleLimit+coldata.BatchSize()) + op.hashBuffer = make([]uint64, maxBufferedTuples) } func (op *hashAggregator) Next(ctx context.Context) coldata.Batch { @@ -364,9 +380,9 @@ func (op *hashAggregator) bufferBatch(ctx context.Context) bool { break } bufferedTupleCount += batchSize - op.allocator.PerformOperation(op.scratch.ColVecs(), func() { + op.allocator.PerformOperation(op.scratch.vecs, func() { for i, colIdx := range op.valCols { - op.scratch.ColVec(i).Append( + op.scratch.vecs[i].Append( coldata.SliceArgs{ ColType: op.valTypes[i], Src: b.ColVec(int(colIdx)), @@ -393,7 +409,7 @@ func (op *hashAggregator) buildSelectionForEachHashCode(ctx context.Context) { rehash(ctx, hashBuffer, op.valTypes[colIdx], - op.scratch.ColVec(int(colIdx)), + op.scratch.vecs[colIdx], nKeys, nil, /* sel */ op.cancelChecker, @@ -408,13 +424,17 @@ func (op *hashAggregator) buildSelectionForEachHashCode(ctx context.Context) { // they all are of zero length here (see the comment for op.scratch.sels // for context). + nextSelsSlot := 0 // We can use selIdx to index into op.scratch since op.scratch never has a // a selection vector. for selIdx, hashCode := range hashBuffer { - if _, ok := op.scratch.sels[hashCode]; !ok { - op.scratch.sels[hashCode] = make([]int, 0) + selsSlot, ok := op.scratch.hashCodeToSelsSlot[hashCode] + if !ok { + selsSlot = nextSelsSlot + op.scratch.hashCodeToSelsSlot[hashCode] = selsSlot + nextSelsSlot++ } - op.scratch.sels[hashCode] = append(op.scratch.sels[hashCode], selIdx) + op.scratch.sels[selsSlot] = append(op.scratch.sels[selsSlot], selIdx) } } @@ -422,18 +442,15 @@ func (op *hashAggregator) buildSelectionForEachHashCode(ctx context.Context) { // aggFunctions for each group if it doesn't not exist. Then it calls Compute() // on each aggregation function to perform aggregation. func (op *hashAggregator) onlineAgg() { - // Unwrap colvecs to avoid calling .ColVec() in a tight loop each time. - keyMappingVecs := op.keyMapping.ColVecs() - scratchBufferVecs := op.scratch.ColVecs() - for _, hashCode := range op.hashBuffer { - remaining := op.scratch.sels[hashCode] - if len(remaining) == 0 { + selsSlot, ok := op.scratch.hashCodeToSelsSlot[hashCode] + if !ok { // It is possible that multiple tuples have the same hashCode, and // we process all such tuples when we encounter the first of these // tuples. continue } + remaining := op.scratch.sels[selsSlot] var anyMatched bool @@ -460,7 +477,7 @@ func (op *hashAggregator) onlineAgg() { op.aggFuncMap[hashCode] = make([]*hashAggFuncs, 0, 1) } - // Stage 2: Build aggregate function that doesn't exist then perform + // Stage 2: Build aggregate function that doesn't exist, then perform // aggregation on the newly created aggregate function. for len(remaining) > 0 { // Record the selection vector index of the beginning of the group. @@ -468,16 +485,17 @@ func (op *hashAggregator) onlineAgg() { // Build new agg functions. keyIdx := op.keyMapping.Length() - aggFunc := &hashAggFuncs{keyIdx: keyIdx} + aggFunc := op.alloc.newHashAggFuncs() + aggFunc.keyIdx = keyIdx // Store the key of the current aggregating group into keyMapping. 
- op.allocator.PerformOperation(keyMappingVecs, func() { + op.allocator.PerformOperation(op.keyMappingVecs, func() { for keyIdx, colIdx := range op.groupCols { // TODO(azhng): Try to preallocate enough memory so instead of // .Append() we can use execgen.SET to improve the // performance. - keyMappingVecs[keyIdx].Append(coldata.SliceArgs{ - Src: scratchBufferVecs[colIdx], + op.keyMappingVecs[keyIdx].Append(coldata.SliceArgs{ + Src: op.scratch.vecs[colIdx], ColType: op.valTypes[colIdx], DestIdx: aggFunc.keyIdx, SrcStartIdx: remaining[0], @@ -487,8 +505,7 @@ func (op *hashAggregator) onlineAgg() { op.keyMapping.SetLength(keyIdx + 1) }) - aggFunc.fns, _, _ = - makeAggregateFuncs(op.allocator, op.aggTypes, op.aggFuncs) + aggFunc.fns, _ = makeAggregateFuncs(op.allocator, op.aggTypes, op.aggFuncs) op.aggFuncMap[hashCode] = append(op.aggFuncMap[hashCode], aggFunc) // Select rest of the tuples that matches the current key. We don't need @@ -510,7 +527,9 @@ func (op *hashAggregator) onlineAgg() { // We have processed all tuples with this hashCode, so we should reset // the length of the corresponding slice. - op.scratch.sels[hashCode] = op.scratch.sels[hashCode][:0] + op.scratch.sels[selsSlot] = op.scratch.sels[selsSlot][:0] + // We also need to delete the hashCode from the mapping. + delete(op.scratch.hashCodeToSelsSlot, hashCode) } } @@ -533,8 +552,6 @@ func (op *hashAggregator) reset(ctx context.Context) { op.keyMapping.ResetInternalBatch() op.keyMapping.SetLength(0) - - op.scratch.sels = make(map[uint64][]int) } // hashAggFuncs stores the aggregation functions for the corresponding @@ -560,3 +577,20 @@ func (v *hashAggFuncs) compute(b coldata.Batch, aggCols [][]uint32) { fn.Compute(b, aggCols[fnIdx]) } } + +const hashAggFuncsAllocSize = 16 + +// hashAggFuncsAlloc is a utility struct that batches allocations of +// hashAggFuncs. +type hashAggFuncsAlloc struct { + buf []hashAggFuncs +} + +func (a *hashAggFuncsAlloc) newHashAggFuncs() *hashAggFuncs { + if len(a.buf) == 0 { + a.buf = make([]hashAggFuncs, hashAggFuncsAllocSize) + } + ret := &a.buf[0] + a.buf = a.buf[1:] + return ret +} From ffe79b121e24c3ff4688161f3c2d3745be8ef46c Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Fri, 27 Mar 2020 08:33:04 -0400 Subject: [PATCH 4/6] cli,build: remove backtrace support Support for Backtrace Labs out of process tracers (i.e. ptrace) has been disabled since Nov 2016. We never found this integration useful. It is past time to remove it. 
Release justification: removal of unused code Release note: None --- Gopkg.lock | 9 --- build/builder.sh | 6 -- pkg/cli/backtrace.go | 123 ------------------------------- pkg/cli/backtrace_unsupported.go | 19 ----- pkg/cli/context.go | 4 - pkg/cli/demo.go | 6 +- pkg/cli/start.go | 3 +- vendor | 2 +- 8 files changed, 4 insertions(+), 168 deletions(-) delete mode 100644 pkg/cli/backtrace.go delete mode 100644 pkg/cli/backtrace_unsupported.go diff --git a/Gopkg.lock b/Gopkg.lock index 2c2b1ee679e6..dc82b5fdcda8 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -293,14 +293,6 @@ pruneopts = "UT" revision = "4b99d0c2c99ec77eb3a42344d206a88997957495" -[[projects]] - branch = "master" - digest = "1:9163b457fd33634a890c83fba9f0fd20b8991101b55490fa705b1074a7bd2cdb" - name = "github.com/backtrace-labs/go-bcd" - packages = ["."] - pruneopts = "UT" - revision = "5d8e01b2f0438922289238fd3ba043761daf1102" - [[projects]] branch = "master" digest = "1:927f2f9632311e72cc76586aebff836c12c0132150afc2fe251f616e41522595" @@ -2023,7 +2015,6 @@ "github.com/aws/aws-sdk-go/service/s3", "github.com/aws/aws-sdk-go/service/s3/s3manager", "github.com/axiomhq/hyperloglog", - "github.com/backtrace-labs/go-bcd", "github.com/benesch/cgosymbolizer", "github.com/biogo/store/llrb", "github.com/cenkalti/backoff", diff --git a/build/builder.sh b/build/builder.sh index d81302a79861..2080cefb299a 100755 --- a/build/builder.sh +++ b/build/builder.sh @@ -126,12 +126,6 @@ if test -e "${alternates_file}"; then vols="${vols} --volume=${alternates_path}:${alternates_path}${cached_volume_mode}" fi -backtrace_dir=${cockroach_toplevel}/../../cockroachlabs/backtrace -if test -d "${backtrace_dir}"; then - vols="${vols} --volume=${backtrace_dir}:/opt/backtrace${cached_volume_mode}" - vols="${vols} --volume=${backtrace_dir}/cockroach.cf:${container_home}/.coroner.cf${cached_volume_mode}" -fi - if [ "${BUILDER_HIDE_GOPATH_SRC:-}" != "1" ]; then vols="${vols} --volume=${gopath0}/src:/go/src${cached_volume_mode}" fi diff --git a/pkg/cli/backtrace.go b/pkg/cli/backtrace.go deleted file mode 100644 index 182d2996b560..000000000000 --- a/pkg/cli/backtrace.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2016 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -// +build linux freebsd - -package cli - -import ( - "context" - "fmt" - "os" - "path/filepath" - "time" - - "github.com/backtrace-labs/go-bcd" - "github.com/cockroachdb/cockroach/pkg/build" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/stop" - "golang.org/x/sys/unix" -) - -// Currently disabled as backtrace appears to be obscuring problems when test -// clusters encounter panics. See #10872. -const backtraceEnabled = false - -func initBacktrace(logDir string, options ...stop.Option) *stop.Stopper { - if !backtraceEnabled { - return stop.NewStopper(options...) - } - - ctx := context.TODO() - - const ptracePath = "/opt/backtrace/bin/ptrace" - if _, err := os.Stat(ptracePath); err != nil { - log.Infof(ctx, "backtrace disabled: %s", err) - return stop.NewStopper(options...) - } - - if err := bcd.EnableTracing(); err != nil { - log.Infof(ctx, "unable to enable backtrace: %s", err) - return stop.NewStopper(options...) 
- } - - bcd.UpdateConfig(bcd.GlobalConfig{ - PanicOnKillFailure: true, - ResendSignal: true, - RateLimit: time.Second * 3, - SynchronousPut: true, - }) - - // Use the default tracer implementation. - // false: Exclude system goroutines. - tracer := bcd.New(bcd.NewOptions{ - IncludeSystemGs: false, - }) - if err := tracer.SetOutputPath(logDir, 0755); err != nil { - log.Infof(ctx, "unable to set output path: %s", err) - // Not a fatal error, continue. - } - - // Enable WARNING log output from the tracer. - tracer.AddOptions(nil, "-L", "WARNING") - - info := build.GetInfo() - tracer.AddKV(nil, "cgo-compiler", info.CgoCompiler) - tracer.AddKV(nil, "go-version", info.GoVersion) - tracer.AddKV(nil, "platform", info.Platform) - tracer.AddKV(nil, "type", info.Type) - tracer.AddKV(nil, "tag", info.Tag) - tracer.AddKV(nil, "time", info.Time) - - // Register for traces on signal reception. - tracer.SetSigset( - unix.SIGABRT, - unix.SIGBUS, - unix.SIGFPE, - unix.SIGILL, - unix.SIGSEGV, - ) - bcd.Register(tracer) - - // Hook log.Fatal*. - log.SetExitFunc(false /* hideStack */, func(code int) { - _ = bcd.Trace(tracer, fmt.Errorf("exit %d", code), nil) - os.Exit(code) - }) - - options = append(options, - stop.OnPanic(func(val interface{}) { - err, ok := val.(error) - if !ok { - err = fmt.Errorf("%v", val) - } - _ = bcd.Trace(tracer, err, nil) - panic(val) - })) - - stopper := stop.NewStopper(options...) - - // Internally, backtrace uses an external program (/opt/backtrace/bin/ptrace) - // to generate traces. We direct the stdout for this program to a file for - // debugging our usage of backtrace. - if f, err := os.OpenFile(filepath.Join(logDir, "backtrace.out"), - os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666); err != nil { - log.Infof(ctx, "unable to open: %s", err) - } else { - stopper.AddCloser(stop.CloserFn(func() { - f.Close() - })) - tracer.SetPipes(nil, f) - } - - tracer.SetLogLevel(bcd.LogMax) - log.Infof(ctx, "backtrace enabled") - return stopper -} diff --git a/pkg/cli/backtrace_unsupported.go b/pkg/cli/backtrace_unsupported.go deleted file mode 100644 index d639f5b61a69..000000000000 --- a/pkg/cli/backtrace_unsupported.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2016 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -// +build !linux,!freebsd - -package cli - -import "github.com/cockroachdb/cockroach/pkg/util/stop" - -func initBacktrace(logDir string, options ...stop.Option) *stop.Stopper { - return stop.NewStopper(options...) -} diff --git a/pkg/cli/context.go b/pkg/cli/context.go index 923d32ae811b..df78026aaa6f 100644 --- a/pkg/cli/context.go +++ b/pkg/cli/context.go @@ -127,7 +127,6 @@ func initCLIDefaults() { startCtx.listeningURLFile = "" startCtx.pidFile = "" startCtx.inBackground = false - startCtx.backtraceOutputDir = "" quitCtx.serverDecommission = false @@ -325,9 +324,6 @@ var startCtx struct { // logging settings specific to file logging. logDir log.DirName - - // directory to use for logging backtrace outputs. - backtraceOutputDir string } // quitCtx captures the command-line parameters of the `quit` command. 
diff --git a/pkg/cli/demo.go b/pkg/cli/demo.go index 0772c6dcfb92..1a0ab3f3b810 100644 --- a/pkg/cli/demo.go +++ b/pkg/cli/demo.go @@ -293,10 +293,8 @@ func testServerArgsForTransientCluster(nodeID roachpb.NodeID, joinAddr string) b storeSpec.StickyInMemoryEngineID = fmt.Sprintf("demo-node%d", nodeID) args := base.TestServerArgs{ - PartOfCluster: true, - Stopper: initBacktrace( - fmt.Sprintf("%s/demo-node%d", startCtx.backtraceOutputDir, nodeID), - ), + PartOfCluster: true, + Stopper: stop.NewStopper(), JoinAddr: joinAddr, DisableTLSForHTTP: true, StoreSpecs: []base.StoreSpec{storeSpec}, diff --git a/pkg/cli/start.go b/pkg/cli/start.go index b40c10c28b1e..b1afa3b55d9b 100644 --- a/pkg/cli/start.go +++ b/pkg/cli/start.go @@ -1199,7 +1199,6 @@ func setupAndInitializeLoggingAndProfiling( if p := logOutputDirectory(); p != "" { outputDirectory = p } - startCtx.backtraceOutputDir = outputDirectory serverCfg.GoroutineDumpDirName = filepath.Join(outputDirectory, base.GoroutineDumpDir) serverCfg.HeapProfileDirName = filepath.Join(outputDirectory, base.HeapProfileDir) @@ -1250,7 +1249,7 @@ func setupAndInitializeLoggingAndProfiling( // Disable Stopper task tracking as performing that call site tracking is // moderately expensive (certainly outweighing the infrequent benefit it // provides). - stopper = initBacktrace(outputDirectory) + stopper = stop.NewStopper() log.Event(ctx, "initialized profiles") return stopper, nil diff --git a/vendor b/vendor index 96685713ba9f..5a9ddedc78e7 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 96685713ba9f84f506042b5f2d56e881ee2d9202 +Subproject commit 5a9ddedc78e7c3705ed08d68eb9873e766ad1f6d From 7f611599ffe33566b1df49859f45abb9d148c0fd Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Fri, 27 Mar 2020 10:50:32 -0400 Subject: [PATCH 5/6] build: do not fail RocksDB build on compiler warnings Disable `-Werror` for the RocksDB build which has recently started complaining about a missing exception specification in a jemalloc header with the newest version of Xcode. Release justification: low risk change to remove developer build irritation. Should be a no-op for production builds. Release note: None --- Makefile | 3 +-- c-deps/rocksdb-rebuild | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 997bc8f86f36..9f780720a22e 100644 --- a/Makefile +++ b/Makefile @@ -642,8 +642,7 @@ $(ROCKSDB_DIR)/Makefile: $(C_DEPS_DIR)/rocksdb-rebuild | bin/.submodules-initial -DSNAPPY_LIBRARIES=$(LIBSNAPPY) -DSNAPPY_INCLUDE_DIR="$(SNAPPY_SRC_DIR);$(SNAPPY_DIR)" -DWITH_SNAPPY=ON \ $(if $(use-stdmalloc),,-DJEMALLOC_LIBRARIES=$(LIBJEMALLOC) -DJEMALLOC_INCLUDE_DIR=$(JEMALLOC_DIR)/include -DWITH_JEMALLOC=ON) \ -DCMAKE_BUILD_TYPE=$(if $(ENABLE_ROCKSDB_ASSERTIONS),Debug,Release) \ - -DFAIL_ON_WARNINGS=$(if $(findstring windows,$(XGOOS)),0,1) \ - -DUSE_RTTI=1 + -DUSE_RTTI=1 -DFAIL_ON_WARNINGS=0 $(SNAPPY_DIR)/Makefile: $(C_DEPS_DIR)/snappy-rebuild | bin/.submodules-initialized rm -rf $(SNAPPY_DIR) diff --git a/c-deps/rocksdb-rebuild b/c-deps/rocksdb-rebuild index af91aaa8c5b5..e2c3eec6ef5f 100644 --- a/c-deps/rocksdb-rebuild +++ b/c-deps/rocksdb-rebuild @@ -1,4 +1,4 @@ Bump the version below when changing rocksdb CMake flags. Search for "BUILD ARTIFACT CACHING" in build/common.mk for rationale. -13 +14 From f9a536c53b6a69b2ea29f7d2acef85947e33c673 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 27 Mar 2020 07:49:22 -0700 Subject: [PATCH 6/6] workload: enhance querybench Release justification: non-production code changes. 
The `querybench` workload has been enhanced to support queries that consist of multiple statements on a single line. Previously, the workload would fail on such queries because it attempted to `Prepare` the whole line, which would error out. We still attempt to prepare the whole line, but if that fails, we store the plain query text and execute it with `DB.Query` rather than `Stmt.Query`. A simplified sketch of this prepare-with-fallback flow appears after the patch below.

Release note: None
---
 pkg/workload/querybench/query_bench.go | 49 ++++++++++++++++++--------
 pkg/workload/querybench/tpch-queries | 4 +--
 2 files changed, 36 insertions(+), 17 deletions(-)

diff --git a/pkg/workload/querybench/query_bench.go b/pkg/workload/querybench/query_bench.go
index 6469b7a2ab69..ccc0bc74a94c 100644
--- a/pkg/workload/querybench/query_bench.go
+++ b/pkg/workload/querybench/query_bench.go
@@ -134,16 +134,17 @@ func (g *queryBench) Ops(urls []string, reg *histogram.Registry) (workload.Query stmts := make([]namedStmt, len(g.queries)) for i, query := range g.queries { - stmt, err := db.Prepare(query) - if err != nil { - return workload.QueryLoad{}, errors.Wrapf(err, "failed to prepare query %q", query) - } stmts[i] = namedStmt{ // TODO(solon): Allow specifying names in the query file rather than using // the entire query as the name. name: fmt.Sprintf("%2d: %s", i+1, query), - stmt: stmt, } + stmt, err := db.Prepare(query) + if err != nil { + stmts[i].query = query + continue + } + stmts[i].preparedStmt = stmt } maxNumStmts := 0
@@ -192,7 +193,11 @@ func GetQueries(path string) ([]string, error) { type namedStmt struct { name string - stmt *gosql.Stmt + // We will try to Prepare the statement, and if that succeeds, the prepared + // statement will be stored in `preparedStmt', otherwise, we will store + // plain query in 'query'.
+ preparedStmt *gosql.Stmt + query string } type queryBenchWorker struct { @@ -221,15 +226,31 @@ func (o *queryBenchWorker) run(ctx context.Context) error { stmt := o.stmts[o.stmtIdx%len(o.stmts)] o.stmtIdx++ - rows, err := stmt.stmt.Query() - if err != nil { - return err - } - defer rows.Close() - for rows.Next() { + exhaustRows := func(execFn func() (*gosql.Rows, error)) error { + rows, err := execFn() + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + } + if err := rows.Err(); err != nil { + return err + } + return nil } - if err := rows.Err(); err != nil { - return err + if stmt.preparedStmt != nil { + if err := exhaustRows(func() (*gosql.Rows, error) { + return stmt.preparedStmt.Query() + }); err != nil { + return err + } + } else { + if err := exhaustRows(func() (*gosql.Rows, error) { + return o.db.Query(stmt.query) + }); err != nil { + return err + } } elapsed := timeutil.Since(start) if o.verbose { diff --git a/pkg/workload/querybench/tpch-queries b/pkg/workload/querybench/tpch-queries index b72840d215d4..852528b0bb24 100644 --- a/pkg/workload/querybench/tpch-queries +++ b/pkg/workload/querybench/tpch-queries @@ -44,9 +44,7 @@ SELECT c_count, count(*) AS custdist FROM ( SELECT c_custkey, count(o_orderkey) SELECT 100.00 * sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 END) / sum(l_extendedprice * (1 - l_discount)) AS promo_revenue FROM lineitem, part WHERE l_partkey = p_partkey AND l_shipdate >= DATE '1995-09-01' AND l_shipdate < DATE '1995-09-01' + INTERVAL '1' MONTH -- query 15 -CREATE VIEW revenue0 (supplier_no, total_revenue) AS SELECT l_suppkey, sum(l_extendedprice * (1 - l_discount)) FROM lineitem WHERE l_shipdate >= DATE '1996-01-01' AND l_shipdate < DATE '1996-01-01' + INTERVAL '3' MONTH GROUP BY l_suppkey -SELECT s_suppkey, s_name, s_address, s_phone, total_revenue FROM supplier, revenue0 WHERE s_suppkey = supplier_no AND total_revenue = ( SELECT max(total_revenue) FROM revenue0) ORDER BY s_suppkey -DROP VIEW revenue0 +CREATE VIEW revenue0 (supplier_no, total_revenue) AS SELECT l_suppkey, sum(l_extendedprice * (1 - l_discount)) FROM lineitem WHERE l_shipdate >= DATE '1996-01-01' AND l_shipdate < DATE '1996-01-01' + INTERVAL '3' MONTH GROUP BY l_suppkey; SELECT s_suppkey, s_name, s_address, s_phone, total_revenue FROM supplier, revenue0 WHERE s_suppkey = supplier_no AND total_revenue = ( SELECT max(total_revenue) FROM revenue0) ORDER BY s_suppkey; DROP VIEW revenue0 -- query 16 SELECT p_brand, p_type, p_size, count(distinct ps_suppkey) AS supplier_cnt FROM partsupp, part WHERE p_partkey = ps_partkey AND p_brand <> 'Brand#45' AND p_type NOT LIKE 'MEDIUM POLISHED%' AND p_size IN (49, 14, 23, 45, 19, 3, 36, 9) AND ps_suppkey NOT IN ( SELECT s_suppkey FROM supplier WHERE s_comment LIKE '%Customer%Complaints%') GROUP BY p_brand, p_type, p_size ORDER BY supplier_cnt DESC, p_brand, p_type, p_size
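As a companion to the querybench change above, here is a rough sketch of the prepare-with-fallback execution flow the commit message describes, written against Go's database/sql. The driver import and connection string are illustrative assumptions, and the code is simplified relative to the workload's actual Ops/run methods:

    package main

    import (
    	"database/sql"
    	"log"

    	_ "github.com/lib/pq" // any Postgres-wire driver works; this particular choice is an assumption
    )

    // namedStmt mirrors the idea in query_bench.go: keep the prepared statement
    // when Prepare succeeds, otherwise fall back to the raw query text.
    type namedStmt struct {
    	name         string
    	preparedStmt *sql.Stmt // non-nil only if Prepare succeeded
    	query        string    // used when the line could not be prepared
    }

    func makeStmt(db *sql.DB, query string) namedStmt {
    	s := namedStmt{name: query}
    	if prepared, err := db.Prepare(query); err == nil {
    		s.preparedStmt = prepared
    	} else {
    		// Lines containing multiple statements typically fail to prepare;
    		// keep the text and run it through db.Query instead.
    		s.query = query
    	}
    	return s
    }

    // runOnce executes the statement and drains the rows, which is all a
    // benchmark worker needs to do with the result set.
    func runOnce(db *sql.DB, s namedStmt) error {
    	var (
    		rows *sql.Rows
    		err  error
    	)
    	if s.preparedStmt != nil {
    		rows, err = s.preparedStmt.Query()
    	} else {
    		rows, err = db.Query(s.query)
    	}
    	if err != nil {
    		return err
    	}
    	defer rows.Close()
    	for rows.Next() {
    	}
    	return rows.Err()
    }

    func main() {
    	// The connection string is a placeholder for wherever the workload points.
    	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/tpch?sslmode=disable")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()
    	// A multi-statement line: Prepare fails, so db.Query runs the plain text.
    	if err := runOnce(db, makeStmt(db, "SELECT 1; SELECT 2")); err != nil {
    		log.Fatal(err)
    	}
    }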