63355: opt: fix duplicate empty JSON value spans for <@ expressions r=angelazxu a=angelazxu

Previously, duplicate spans were produced for empty JSON values such as
{"a": []} and {"a": {}}. This was discovered in the spans produced by
contained-by (<@) and fetch + contained-by expressions.

This commit fixes the issue by allowing only one empty object or array span
to be created per level of nesting in encodedContainedInvertedIndexSpans.
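
A toy, self-contained sketch of the dedup idea (all names below are
hypothetical; the real logic lives in encodedContainedInvertedIndexSpans
and operates on encoded inverted index spans, not strings):

```go
package main

import "fmt"

// walk emits at most one "empty value" span per nesting level, so
// {"a": []} and {"a": {}} no longer produce duplicate spans.
func walk(v interface{}, level int, emptyEmitted map[int]bool, emit func(string)) {
	switch t := v.(type) {
	case map[string]interface{}:
		if len(t) == 0 {
			if !emptyEmitted[level] {
				emit(fmt.Sprintf("empty-object-or-array@level%d", level))
				emptyEmitted[level] = true
			}
			return
		}
		for k, child := range t {
			emit(fmt.Sprintf("key(%q)@level%d", k, level))
			walk(child, level+1, emptyEmitted, emit)
		}
	case []interface{}:
		if len(t) == 0 {
			if !emptyEmitted[level] {
				emit(fmt.Sprintf("empty-object-or-array@level%d", level))
				emptyEmitted[level] = true
			}
			return
		}
		for _, child := range t {
			walk(child, level+1, emptyEmitted, emit)
		}
	default:
		emit(fmt.Sprintf("scalar(%v)@level%d", t, level))
	}
}

func main() {
	doc := map[string]interface{}{"a": []interface{}{}, "b": map[string]interface{}{}}
	walk(doc, 0, map[int]bool{}, func(s string) { fmt.Println(s) })
	// The empty values under "a" and "b" share one empty span at level 1.
}
```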

Informs: #61430

Release note: None

63371: colexec: miscellaneous cleanup r=yuzefovich a=yuzefovich

**colexec: clean up external distinct test a bit**

Release note: None

**colbuilder: enforce unique memory monitor names**

Our disk-spilling infrastructure (namely, `diskSpillerBase`) uses the
name of the limited memory monitor of its in-memory operator to
distinguish the "memory budget exceeded" errors it needs to catch from
the ones it needs to propagate further. Previously, it was possible for
the in-memory chains within two disk-backed sorters created for the
external distinct (one in the fallback strategy and another on top of
the external distinct, when an output ordering is needed) to have the
same memory monitor names. This is now fixed by appending a unique
suffix to all names, and uniqueness is enforced via an invariants
assertion in the test setup.

Note that even with the previous behavior no actual bug could occur,
because the monitors with the same name were on different levels of the
tree with the correct catcher in between. So this commit simply makes
explicit an assumption we previously held implicitly, without fixing any
production bugs.
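
To illustrate the mechanism (a toy sketch only: the real diskSpillerBase
catches errors thrown via panics from memory monitors, and the helper
names below are invented):

```go
package main

import (
	"fmt"
	"strings"
)

// budgetError stands in for the "memory budget exceeded" error; real
// monitors embed their name in the error message.
type budgetError struct{ monitorName string }

func (e budgetError) Error() string {
	return fmt.Sprintf("%s: memory budget exceeded", e.monitorName)
}

// diskSpiller is a toy version of the disk-spilling infrastructure: it
// falls back to the disk-backed operator only when the budget error comes
// from *its own* in-memory monitor. If two monitors shared a name, one
// spiller could mistake another component's error for its own.
type diskSpiller struct {
	inMemoryMonitorName string
	spilled             bool
}

func (d *diskSpiller) next(work func() error) error {
	if err := work(); err != nil {
		if strings.HasPrefix(err.Error(), d.inMemoryMonitorName+":") {
			d.spilled = true // our own budget error: switch to disk
			return nil
		}
		return err // not ours: propagate further
	}
	return nil
}

func main() {
	d := &diskSpiller{inMemoryMonitorName: "hash-agg-7"}
	_ = d.next(func() error { return budgetError{monitorName: "hash-agg-7"} })
	fmt.Println("spilled:", d.spilled) // spilled: true
}
```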

Release note: None

63372: colexec: fix hash aggregator when spilling to disk r=yuzefovich a=yuzefovich

**execinfrapb: introduce aliases for agg funcs and use everywhere**

This commit introduces nicer aliases for the aggregate function
specification constants and uses them throughout the code base.
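
A sketch of the alias pattern (the exact file and list may differ; the
alias names below are the ones visible in this commit's diff):

```go
// Shorter aliases for the protobuf-generated enum values, so that call
// sites can write execinfrapb.Min instead of execinfrapb.AggregatorSpec_MIN.
const (
	AnyNotNull = AggregatorSpec_ANY_NOT_NULL
	Avg        = AggregatorSpec_AVG
	Sum        = AggregatorSpec_SUM
	SumInt     = AggregatorSpec_SUM_INT
	ConcatAgg  = AggregatorSpec_CONCAT_AGG
	CountRows  = AggregatorSpec_COUNT_ROWS
	Count      = AggregatorSpec_COUNT
	Min        = AggregatorSpec_MIN
	Max        = AggregatorSpec_MAX
	BoolAnd    = AggregatorSpec_BOOL_AND
	BoolOr     = AggregatorSpec_BOOL_OR
	StringAgg  = AggregatorSpec_STRING_AGG
	JSONAgg    = AggregatorSpec_JSON_AGG
	XorAgg     = AggregatorSpec_XOR_AGG
)
```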

Release note: None

**colexec: clean up aggregator test cases**

This commit is a test-only change. It cleans up the aggregator test
cases in the following ways:
- removing some of the defaults in favor of explicit settings (easier to
read each test case in isolation)
- reordering the fields to have a uniform assignment order
- inserting any_not_null aggregates for the cases when the input is
ordered (this will be needed by the follow-up commit, which enforces a
particular order on the output); this change simulates how specs are
created in production
- removing a couple of test cases that are impossible in production
(when some columns are unused).

Release note: None

**colexec: fix hash aggregator when spilling to disk**

In some cases the aggregation is expected to maintain a required
ordering so that an explicit sort can be eliminated afterwards. The
required ordering is always a prefix of the ordered grouping columns.
With the introduction of disk spilling for the vectorized hash
aggregator in the 21.1 release, that ordering was no longer maintained
when spilling occurred. In all previous implementations (row-by-row
processors and the in-memory columnar operator) the ordering was
maintained by construction, but with `hashBasedPartitioner` the output
order can be arbitrary.

To fix this issue we now do what we did for the external distinct: we
plan an external sort on top of the external hash aggregator to restore
the required ordering. Note that this only kicks in if spilling to disk
occurred. This required changes to the AggregatorSpec to propagate the
required output ordering.
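
A rough sketch of the planning step (hypothetical shape and names,
mirroring the pattern used for the external distinct in this commit's
diff of external_distinct.go):

```go
// If a required output ordering must survive spilling, wrap the
// disk-backed hash aggregator in a disk-backed sort that restores it.
if len(aggSpec.OutputOrdering.Columns) > 0 {
	maxNumberActivePartitions := calculateMaxNumberActivePartitions(
		flowCtx, args, numRequiredActivePartitions,
	)
	op = createDiskBackedSorter(
		op, outputTypes, aggSpec.OutputOrdering.Columns, maxNumberActivePartitions,
	)
}
```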

Fixes: #63159.

Release note (bug fix): In 21.1 alpha and beta releases, CockroachDB
could in some cases return output in an incorrect order if a query
containing hash aggregation was executed via the vectorized engine and
spilling to temporary storage was required.

Co-authored-by: Angela Xu <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
3 people committed Apr 9, 2021
4 parents 832e8af + a72d1bb + 7d8097a + ef13232 commit 71a023f
Showing 33 changed files with 1,173 additions and 947 deletions.
575 changes: 312 additions & 263 deletions pkg/sql/colexec/aggregators_test.go

Large diffs are not rendered by default.

177 changes: 95 additions & 82 deletions pkg/sql/colexec/colbuilder/execplan.go

Large diffs are not rendered by default.

44 changes: 22 additions & 22 deletions pkg/sql/colexec/colexecagg/aggregate_funcs.go
@@ -29,17 +29,17 @@ import (
// IsAggOptimized returns whether aggFn has an optimized implementation.
func IsAggOptimized(aggFn execinfrapb.AggregatorSpec_Func) bool {
switch aggFn {
-case execinfrapb.AggregatorSpec_ANY_NOT_NULL,
-execinfrapb.AggregatorSpec_AVG,
-execinfrapb.AggregatorSpec_SUM,
-execinfrapb.AggregatorSpec_SUM_INT,
-execinfrapb.AggregatorSpec_CONCAT_AGG,
-execinfrapb.AggregatorSpec_COUNT_ROWS,
-execinfrapb.AggregatorSpec_COUNT,
-execinfrapb.AggregatorSpec_MIN,
-execinfrapb.AggregatorSpec_MAX,
-execinfrapb.AggregatorSpec_BOOL_AND,
-execinfrapb.AggregatorSpec_BOOL_OR:
+case execinfrapb.AnyNotNull,
+execinfrapb.Avg,
+execinfrapb.Sum,
+execinfrapb.SumInt,
+execinfrapb.ConcatAgg,
+execinfrapb.CountRows,
+execinfrapb.Count,
+execinfrapb.Min,
+execinfrapb.Max,
+execinfrapb.BoolAnd,
+execinfrapb.BoolOr:
return true
default:
return false
@@ -225,67 +225,67 @@ func NewAggregateFuncsAlloc(
for i, aggFn := range args.Spec.Aggregations {
var err error
switch aggFn.Func {
-case execinfrapb.AggregatorSpec_ANY_NOT_NULL:
+case execinfrapb.AnyNotNull:
if isHashAgg {
funcAllocs[i], err = newAnyNotNullHashAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
} else {
funcAllocs[i], err = newAnyNotNullOrderedAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
}
-case execinfrapb.AggregatorSpec_AVG:
+case execinfrapb.Avg:
if isHashAgg {
funcAllocs[i], err = newAvgHashAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
} else {
funcAllocs[i], err = newAvgOrderedAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
}
-case execinfrapb.AggregatorSpec_SUM:
+case execinfrapb.Sum:
if isHashAgg {
funcAllocs[i], err = newSumHashAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
} else {
funcAllocs[i], err = newSumOrderedAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
}
-case execinfrapb.AggregatorSpec_SUM_INT:
+case execinfrapb.SumInt:
if isHashAgg {
funcAllocs[i], err = newSumIntHashAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
} else {
funcAllocs[i], err = newSumIntOrderedAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
}
-case execinfrapb.AggregatorSpec_CONCAT_AGG:
+case execinfrapb.ConcatAgg:
if isHashAgg {
funcAllocs[i] = newConcatHashAggAlloc(args.Allocator, allocSize)
} else {
funcAllocs[i] = newConcatOrderedAggAlloc(args.Allocator, allocSize)
}
-case execinfrapb.AggregatorSpec_COUNT_ROWS:
+case execinfrapb.CountRows:
if isHashAgg {
funcAllocs[i] = newCountRowsHashAggAlloc(args.Allocator, allocSize)
} else {
funcAllocs[i] = newCountRowsOrderedAggAlloc(args.Allocator, allocSize)
}
-case execinfrapb.AggregatorSpec_COUNT:
+case execinfrapb.Count:
if isHashAgg {
funcAllocs[i] = newCountHashAggAlloc(args.Allocator, allocSize)
} else {
funcAllocs[i] = newCountOrderedAggAlloc(args.Allocator, allocSize)
}
-case execinfrapb.AggregatorSpec_MIN:
+case execinfrapb.Min:
if isHashAgg {
funcAllocs[i] = newMinHashAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
} else {
funcAllocs[i] = newMinOrderedAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
}
-case execinfrapb.AggregatorSpec_MAX:
+case execinfrapb.Max:
if isHashAgg {
funcAllocs[i] = newMaxHashAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
} else {
funcAllocs[i] = newMaxOrderedAggAlloc(args.Allocator, args.InputTypes[aggFn.ColIdx[0]], allocSize)
}
-case execinfrapb.AggregatorSpec_BOOL_AND:
+case execinfrapb.BoolAnd:
if isHashAgg {
funcAllocs[i] = newBoolAndHashAggAlloc(args.Allocator, allocSize)
} else {
funcAllocs[i] = newBoolAndOrderedAggAlloc(args.Allocator, allocSize)
}
-case execinfrapb.AggregatorSpec_BOOL_OR:
+case execinfrapb.BoolOr:
if isHashAgg {
funcAllocs[i] = newBoolOrHashAggAlloc(args.Allocator, allocSize)
} else {
2 changes: 2 additions & 0 deletions pkg/sql/colexec/colexecargs/BUILD.bazel
@@ -11,13 +11,15 @@ go_library(
deps = [
"//pkg/col/coldata",
"//pkg/sql/colcontainer",
"//pkg/sql/colexecerror",
"//pkg/sql/colexecop",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/parser",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/mon",
"@com_github_cockroachdb_errors//:errors",
"@com_github_marusama_semaphore//:semaphore",
],
)
17 changes: 17 additions & 0 deletions pkg/sql/colexec/colexecargs/op_creation.go
@@ -16,11 +16,13 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
)

@@ -107,6 +109,21 @@ type NewColOperatorResult struct {

var _ execinfra.Releasable = &NewColOperatorResult{}

+// AssertInvariants confirms that all invariants are maintained by
+// NewColOperatorResult.
+func (r *NewColOperatorResult) AssertInvariants() {
+// Check that all memory monitor names are unique (colexec.diskSpillerBase
+// relies on this in order to catch "memory budget exceeded" errors only
+// from "its own" component).
+names := make(map[string]struct{}, len(r.OpMonitors))
+for _, m := range r.OpMonitors {
+if _, seen := names[m.Name()]; seen {
+colexecerror.InternalError(errors.AssertionFailedf("monitor named %q encountered twice", m.Name()))
+}
+names[m.Name()] = struct{}{}
+}
+}

var newColOperatorResultPool = sync.Pool{
New: func() interface{} {
return &NewColOperatorResult{}
3 changes: 2 additions & 1 deletion pkg/sql/colexec/colexectestutils/utils.go
@@ -455,6 +455,8 @@ func RunTestsWithoutAllNullsInjection(
if err != nil {
t.Fatal(err)
}
+// We might short-circuit, so defer the closing of the operator.
+defer closeIfCloser(ctx, t, op)
op.Init()
b := op.Next(ctx)
if b.Length() == 0 {
@@ -495,7 +497,6 @@
assert.False(t, maybeHasNulls(b))
}
}
-closeIfCloser(ctx, t, op)
}
}

73 changes: 37 additions & 36 deletions pkg/sql/colexec/default_agg_test.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexectestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -33,10 +34,8 @@ func TestDefaultAggregateFunc(t *testing.T) {
defer log.Scope(t).Close(t)
testCases := []aggregatorTestCase{
{
-aggFns: []execinfrapb.AggregatorSpec_Func{
-execinfrapb.AggregatorSpec_ANY_NOT_NULL,
-execinfrapb.AggregatorSpec_STRING_AGG,
-},
+name: "StringAgg",
+typs: []*types.T{types.Int, types.String, types.String},
input: colexectestutils.Tuples{
{nil, "a", "1"},
{nil, "b", "2"},
@@ -46,21 +45,21 @@
{1, "f", "6"},
{1, "g", "7"},
},
-groupCols: []uint32{0},
-aggCols: [][]uint32{{0}, {1, 2}},
+aggFns: []execinfrapb.AggregatorSpec_Func{
+execinfrapb.AnyNotNull,
+execinfrapb.StringAgg,
+},
expected: colexectestutils.Tuples{
{nil, "a2b"},
{0, "c4d5e"},
{1, "f7g"},
},
-typs: []*types.T{types.Int, types.String, types.String},
-name: "StringAgg",
+groupCols: []uint32{0},
+aggCols: [][]uint32{{0}, {1, 2}},
},
{
-aggFns: []execinfrapb.AggregatorSpec_Func{
-execinfrapb.AggregatorSpec_ANY_NOT_NULL,
-execinfrapb.AggregatorSpec_STRING_AGG,
-},
+name: "StringAggWithConstDelimiter",
+typs: []*types.T{types.Int, types.String},
input: colexectestutils.Tuples{
{nil, "a"},
{nil, "b"},
@@ -70,24 +69,21 @@
{1, "f"},
{1, "g"},
},
-constArguments: [][]execinfrapb.Expression{nil, {{Expr: "'_'"}}},
-groupCols: []uint32{0},
-aggCols: [][]uint32{{0}, {1}},
+aggFns: []execinfrapb.AggregatorSpec_Func{
+execinfrapb.AnyNotNull,
+execinfrapb.StringAgg,
+},
expected: colexectestutils.Tuples{
{nil, "a_b"},
{0, "c_d_e"},
{1, "f_g"},
},
-typs: []*types.T{types.Int, types.String},
-name: "StringAggWithConstDelimiter",
+groupCols: []uint32{0},
+aggCols: [][]uint32{{0}, {1}},
+constArguments: [][]execinfrapb.Expression{nil, {{Expr: "'_'"}}},
},
{
-aggFns: []execinfrapb.AggregatorSpec_Func{
-execinfrapb.AggregatorSpec_ANY_NOT_NULL,
-execinfrapb.AggregatorSpec_JSON_AGG,
-execinfrapb.AggregatorSpec_JSON_AGG,
-execinfrapb.AggregatorSpec_STRING_AGG,
-},
+name: "JsonAggWithStringAgg",
+typs: []*types.T{types.Int, types.Jsonb, types.String},
input: colexectestutils.Tuples{
{nil, `'{"id": 1}'`, "a"},
@@ -97,36 +93,41 @@
{0, `'{"id": 2}'`, "e"},
{1, `'{"id": 3}'`, "f"},
},
-constArguments: [][]execinfrapb.Expression{nil, nil, nil, {{Expr: "'_'"}}},
-groupCols: []uint32{0},
-aggCols: [][]uint32{{0}, {1}, {2}, {2}},
+aggFns: []execinfrapb.AggregatorSpec_Func{
+execinfrapb.AnyNotNull,
+execinfrapb.JSONAgg,
+execinfrapb.JSONAgg,
+execinfrapb.StringAgg,
+},
expected: colexectestutils.Tuples{
{nil, `'[{"id": 1}, {"id": 2}]'`, `'["a", "b"]'`, "a_b"},
{0, `'[{"id": 1}, {"id": 2}, {"id": 2}]'`, `'["c", "d", "e"]'`, "c_d_e"},
{1, `'[{"id": 3}]'`, `'["f"]'`, "f"},
},
name: "JsonAggWithStringAgg",
groupCols: []uint32{0},
aggCols: [][]uint32{{0}, {1}, {2}, {2}},
constArguments: [][]execinfrapb.Expression{nil, nil, nil, {{Expr: "'_'"}}},
},
{
-aggFns: []execinfrapb.AggregatorSpec_Func{
-execinfrapb.AggregatorSpec_ANY_NOT_NULL,
-execinfrapb.AggregatorSpec_XOR_AGG,
-},
+name: "XorAgg",
+typs: rowenc.TwoIntCols,
input: colexectestutils.Tuples{
{nil, 3},
{nil, 1},
{0, -5},
{0, -1},
{0, 0},
},
-groupCols: []uint32{0},
-aggCols: [][]uint32{{0}, {1}},
+aggFns: []execinfrapb.AggregatorSpec_Func{
+execinfrapb.AnyNotNull,
+execinfrapb.XorAgg,
+},
expected: colexectestutils.Tuples{
{nil, 2},
{0, 4},
},
-typs: []*types.T{types.Int, types.Int},
-name: "XorAgg",
+groupCols: []uint32{0},
+aggCols: [][]uint32{{0}, {1}},
},
}

@@ -166,7 +167,7 @@
}

func BenchmarkDefaultAggregateFunction(b *testing.B) {
-aggFn := execinfrapb.AggregatorSpec_STRING_AGG
+aggFn := execinfrapb.StringAgg
for _, agg := range aggTypes {
for _, numInputRows := range []int{32, 32 * coldata.BatchSize()} {
for _, groupSize := range []int{1, 2, 32, 128, coldata.BatchSize()} {
3 changes: 3 additions & 0 deletions pkg/sql/colexec/external_distinct.go
@@ -112,6 +112,9 @@ func NewExternalDistinct(
// No particular output ordering is required.
return ed
}
+// TODO(yuzefovich): the fact that we're planning an additional external
+// sort isn't accounted for when considering the number of file descriptors
+// to acquire. Not urgent, but it should be fixed.
maxNumberActivePartitions := calculateMaxNumberActivePartitions(flowCtx, args, numRequiredActivePartitions)
return createDiskBackedSorter(ed, inputTypes, outputOrdering.Columns, maxNumberActivePartitions)
}
32 changes: 13 additions & 19 deletions pkg/sql/colexec/external_distinct_test.go
@@ -67,38 +67,35 @@
for tcIdx, tc := range distinctTestCases {
log.Infof(context.Background(), "spillForced=%t/%d", spillForced, tcIdx)
var semsToCheck []semaphore.Semaphore
+var outputOrdering execinfrapb.Ordering
+verifier := colexectestutils.UnorderedVerifier
+// Check that the external distinct and the disk-backed sort
+// were added as Closers.
+numExpectedClosers := 2
+if tc.isOrderedOnDistinctCols {
+outputOrdering = convertDistinctColsToOrdering(tc.distinctCols)
+verifier = colexectestutils.OrderedVerifier
+// The final disk-backed sort must also be added as a
+// Closer.
+numExpectedClosers++
+}
colexectestutils.RunTestsWithTyps(
t,
testAllocator,
[]colexectestutils.Tuples{tc.tuples},
[][]*types.T{tc.typs},
tc.expected,
-// We're using an unordered verifier because the in-memory
-// unordered distinct is free to change the order of the tuples
-// when exporting them into an external distinct.
-colexectestutils.UnorderedVerifier,
+verifier,
func(input []colexecop.Operator) (colexecop.Operator, error) {
// A sorter should never exceed ExternalSorterMinPartitions, even
// during repartitioning. A panic will happen if a sorter requests
// more than this number of file descriptors.
sem := colexecop.NewTestingSemaphore(colexecop.ExternalSorterMinPartitions)
semsToCheck = append(semsToCheck, sem)
-var outputOrdering execinfrapb.Ordering
-if tc.isOrderedOnDistinctCols {
-outputOrdering = convertDistinctColsToOrdering(tc.distinctCols)
-}
distinct, newAccounts, newMonitors, closers, err := createExternalDistinct(
ctx, flowCtx, input, tc.typs, tc.distinctCols, outputOrdering,
queueCfg, sem, nil /* spillingCallbackFn */, numForcedRepartitions,
)
-// Check that the external distinct and the disk-backed sort
-// were added as Closers.
-numExpectedClosers := 2
-if len(outputOrdering.Columns) > 0 {
-// The final disk-backed sort must also be added as a
-// Closer.
-numExpectedClosers++
-}
require.Equal(t, numExpectedClosers, len(closers))
accounts = append(accounts, newAccounts...)
monitors = append(monitors, newMonitors...)
@@ -385,9 +382,6 @@ func createExternalDistinct(
}
args.TestingKnobs.SpillingCallbackFn = spillingCallbackFn
args.TestingKnobs.NumForcedRepartitions = numForcedRepartitions
-// External sorter relies on different memory accounts to
-// understand when to start a new partition, so we will not use
-// the streaming memory account.
result, err := colexecargs.TestNewColOperator(ctx, flowCtx, args)
return result.Op, result.OpAccounts, result.OpMonitors, result.ToClose, err
}