Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
62071: geo: fix tests to pass under bazel r=rickystewart a=otan

See individual commits for details.

Resolves #61664 

62255: colflow: fix premature stats collection and improve tracking r=yuzefovich a=yuzefovich

This commit achieves two goals:

1. previously, we could call `finishVectorizedStatsCollectors` prematurely
in some edge cases (e.g. when the right side of the hash join is empty,
we could collect the stats from the left input tree too early). This was
caused by the fact that we accumulate the vectorized stats collectors
into a queue which is handled by the outboxes or the root materializer.
This commit begins sharing the responsibility of the collecting stats
with the hash router too. Only the hash router needs updating to achieve
the fix because it is currently the only component that splits an input
stream. The change is also sound because when the hash router exits, all
stats collectors in its input tree should have the final info, and the
hash router can collect correct stats. Those stats are added as another
metadata object to be returned by the last to exit router output.

2. `vectorizedStatsCollectorsQueue` on the flow creator is removed in
favor of more precise tracking of which stats collectors belong to which
tree. The motivation behind this changes is that the follow-up commits
will unify the retrieval of stats and draining of the metadata sources so
that the former always occurred right before the latter.

Also this commits renames `metadataSourcesQueue` to `metadataSources` and
changes the order of arguments in a few functions.

Fixes: #56928.

Release note (bug fix): Previously, CockroachDB when collecting
execution statistics (e.g. when running EXPLAIN ANALYZE) could collect
them prematurely which would result in incorrect stats. This is now
fixed.

62288: colbuilder: use correct processorID for wrapped filterers r=yuzefovich a=yuzefovich

We forgot to use the correct processorID when planning wrapped filterers
which resulted in stats from those processors to be always attributed to
a processor with ID = 0.

Release note: None (no stable release with this bug)

62293: sql: remove some remnants of the heuristics planner r=yuzefovich a=yuzefovich

The heuristic planner has been long gone, but some remnants still
remain. The most notable change is to `createTableNode` where we needed
to synthesize a row ID column in the HP. I also searched for "heuristic
pl" in the code base and found a few other mentions which are also
removed.

Two more are still present since they didn't seem as trivial.

Release note: None

Co-authored-by: Oliver Tan <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
3 people committed Mar 22, 2021
5 parents e6a0d23 + 5c6d96d + 4310252 + abe20c8 + ea6a557 commit 27b4ab1
Show file tree
Hide file tree
Showing 40 changed files with 407 additions and 301 deletions.
48 changes: 45 additions & 3 deletions c-deps/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,51 @@ cmake_external(
visibility = ["//visibility:public"],
)

# TODO(irfansharif): libgeos has not been worked out yet. We'll need to
# similarly ensure the lib/libgeos.so and lib/libegeos_c.so are in the right
# place.
# Define the targets for libgeos.
cmake_external(
name = "libgeos",
cache_entries = {
"CMAKE_BUILD_TYPE": "Release",
"CMAKE_C_FLAGS": "-fPIC",
"CMAKE_CXX_FLAGS": "-fPIC",
},
lib_source = "@geos//:all",
make_commands = [
"mkdir -p libgeos/lib",
"make --no-print-directory geos_c",
] + select({
"@io_bazel_rules_go//go/platform:darwin": [
"cp -L $BUILD_TMPDIR/lib/libgeos.dylib libgeos/lib",
"cp -L $BUILD_TMPDIR/lib/libgeos_c.dylib libgeos/lib",
# TODO(#bazel): install_name_tool is also required here for release.
],
"@io_bazel_rules_go//go/platform:windows": [
# NOTE: Windows ends up in bin/ on the BUILD_TMPDIR.
"cp -L $BUILD_TMPDIR/bin/libgeos.dll libgeos/lib",
"cp -L $BUILD_TMPDIR/bin/libgeos_c.dll libgeos/lib",
],
"//conditions:default": [
"cp -L $BUILD_TMPDIR/lib/libgeos.so libgeos/lib",
"cp -L $BUILD_TMPDIR/lib/libgeos_c.so libgeos/lib",
# TODO(#bazel): patchelf is also required here for release.
],
}),
shared_libraries = select({
"@io_bazel_rules_go//go/platform:darwin": [
"libgeos_c.dylib",
"libgeos.dylib",
],
"@io_bazel_rules_go//go/platform:windows": [
"libgeos_c.dll",
"libgeos.dll",
],
"//conditions:default": [
"libgeos_c.so",
"libgeos.so",
],
}),
visibility = ["//visibility:public"],
)

# Define the build target for libroach.
#
Expand Down
5 changes: 5 additions & 0 deletions c-deps/REPOSITORIES.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ def c_deps():
path = "c-deps/proj",
build_file_content = BUILD_ALL_CONTENT,
)
native.new_local_repository(
name = "geos",
path = "c-deps/geos",
build_file_content = BUILD_ALL_CONTENT,
)
native.new_local_repository(
name = "protobuf",
path = "c-deps/protobuf",
Expand Down
2 changes: 1 addition & 1 deletion pkg/geo/geogfn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ go_test(
"topology_operations_test.go",
"unary_operators_test.go",
],
data = ["@cockroach//c-deps:libgeos"],
embed = [":geogfn"],
tags = ["broken_in_bazel"],
deps = [
"//pkg/geo",
"//pkg/geo/geoprojbase",
Expand Down
3 changes: 1 addition & 2 deletions pkg/geo/geoindex/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ go_test(
"s2_geometry_index_test.go",
"utils_test.go",
],
data = glob(["testdata/**"]),
data = glob(["testdata/**"]) + ["@cockroach//c-deps:libgeos"],
embed = [":geoindex"],
tags = ["broken_in_bazel"],
deps = [
"//pkg/geo",
"//pkg/geo/geogfn",
Expand Down
2 changes: 1 addition & 1 deletion pkg/geo/geomfn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ go_test(
"validity_check_test.go",
"voronoi_test.go",
],
data = ["@cockroach//c-deps:libgeos"],
embed = [":geomfn"],
tags = ["broken_in_bazel"],
deps = [
"//pkg/geo",
"//pkg/geo/geopb",
Expand Down
3 changes: 2 additions & 1 deletion pkg/geo/geos/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/geo/geos",
visibility = ["//visibility:public"],
deps = [
"//pkg/build/bazel",
"//pkg/docs",
"//pkg/geo/geopb",
"@com_github_cockroachdb_errors//:errors",
Expand All @@ -67,8 +68,8 @@ go_test(
name = "geos_test",
size = "small",
srcs = ["geos_test.go"],
data = ["@cockroach//c-deps:libgeos"],
embed = [":geos"],
tags = ["broken_in_bazel"],
deps = [
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
8 changes: 8 additions & 0 deletions pkg/geo/geos/geos.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package geos

import (
"os"
"path"
"path/filepath"
"runtime"
"sync"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/build/bazel"
"github.com/cockroachdb/cockroach/pkg/docs"
"github.com/cockroachdb/cockroach/pkg/geo/geopb"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -135,6 +137,12 @@ func findLibraryDirectories(flagLibraryDirectoryValue string, crdbBinaryLoc stri
),
findLibraryDirectoriesInParentingDirectories(cwd)...,
)
// Account for the libraries to be in a bazel runfile path.
if bazel.BuiltWithBazel() {
if p, err := bazel.Runfile(path.Join("c-deps", "libgeos", "lib")); err == nil {
locs = append(locs, p)
}
}
return locs
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"sort_chunks.go",
"sort_utils.go",
"sorttopk.go",
"stats.go",
"tuple_proj_op.go",
"unordered_distinct.go",
"utils.go",
Expand Down
12 changes: 8 additions & 4 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ func wrapRowSources(
input,
inputTypes[i],
nil, /* output */
nil, /* getStats */
metadataSources,
nil, /* toClose */
nil, /* getStats */
nil, /* cancelFlow */
)
if err != nil {
Expand Down Expand Up @@ -770,7 +770,9 @@ func NewColOperator(
result.ColumnTypes = make([]*types.T, len(spec.Input[0].ColumnTypes))
copy(result.ColumnTypes, spec.Input[0].ColumnTypes)
result.Op = inputs[0]
if err := result.planAndMaybeWrapFilter(ctx, flowCtx, evalCtx, args, core.Filterer.Filter, factory); err != nil {
if err := result.planAndMaybeWrapFilter(
ctx, flowCtx, evalCtx, args, spec.ProcessorID, core.Filterer.Filter, factory,
); err != nil {
return r, err
}

Expand Down Expand Up @@ -1070,7 +1072,7 @@ func NewColOperator(

if !core.HashJoiner.OnExpr.Empty() && core.HashJoiner.Type == descpb.InnerJoin {
if err = result.planAndMaybeWrapFilter(
ctx, flowCtx, evalCtx, args, core.HashJoiner.OnExpr, factory,
ctx, flowCtx, evalCtx, args, spec.ProcessorID, core.HashJoiner.OnExpr, factory,
); err != nil {
return r, err
}
Expand Down Expand Up @@ -1124,7 +1126,7 @@ func NewColOperator(

if onExpr != nil {
if err = result.planAndMaybeWrapFilter(
ctx, flowCtx, evalCtx, args, *onExpr, factory,
ctx, flowCtx, evalCtx, args, spec.ProcessorID, *onExpr, factory,
); err != nil {
return r, err
}
Expand Down Expand Up @@ -1370,6 +1372,7 @@ func (r opResult) planAndMaybeWrapFilter(
flowCtx *execinfra.FlowCtx,
evalCtx *tree.EvalContext,
args *colexecargs.NewColOperatorArgs,
processorID int32,
filter execinfrapb.Expression,
factory coldata.ColumnFactory,
) error {
Expand All @@ -1393,6 +1396,7 @@ func (r opResult) planAndMaybeWrapFilter(
Filter: filter,
},
},
ProcessorID: processorID,
ResultTypes: args.Spec.ResultTypes,
}
return r.createAndWrapRowSource(
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/colbuilder/execplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ func TestNewColOperatorExpectedTypeSchema(t *testing.T) {
r.Op,
[]*types.T{types.Int},
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* getStats */
nil, /* metadataSources */
nil, /* toClose */
nil, /* cancelFlow */
)
require.NoError(t, err)
Expand Down
22 changes: 12 additions & 10 deletions pkg/sql/colexec/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ type Materializer struct {
// trailing metadata state, which is meant only for internal metadata
// generation.
type drainHelper struct {
sources execinfrapb.MetadataSources
// If unset, the drainHelper wasn't Start()'ed, so all operations on it
// are noops.
ctx context.Context
ctx context.Context

getStats func() []*execinfrapb.ComponentStats
sources execinfrapb.MetadataSources

bufferedMeta []execinfrapb.ProducerMetadata
getStats func() []*execinfrapb.ComponentStats
}

var _ execinfra.RowSource = &drainHelper{}
Expand All @@ -90,11 +92,11 @@ var drainHelperPool = sync.Pool{
}

func newDrainHelper(
sources execinfrapb.MetadataSources, getStats func() []*execinfrapb.ComponentStats,
getStats func() []*execinfrapb.ComponentStats, sources execinfrapb.MetadataSources,
) *drainHelper {
d := drainHelperPool.Get().(*drainHelper)
d.sources = sources
d.getStats = getStats
d.sources = sources
return d
}

Expand Down Expand Up @@ -180,10 +182,10 @@ var materializerEmptyPostProcessSpec = &execinfrapb.PostProcessSpec{}
// columnar data coming from input to return it as rows.
// Arguments:
// - typs is the output types scheme.
// - metadataSourcesQueue are all of the metadata sources that are planned on
// the same node as the Materializer and that need to be drained.
// - getStats (when tracing is enabled) returns all of the execution statistics
// of operators which the materializer is responsible for.
// - metadataSources are all of the metadata sources that are planned on the
// same node as the Materializer and that need to be drained.
// - cancelFlow should return the context cancellation function that cancels
// the context of the flow (i.e. it is Flow.ctxCancel). It should only be
// non-nil in case of a root Materializer (i.e. not when we're wrapping a row
Expand All @@ -196,17 +198,17 @@ func NewMaterializer(
input colexecop.Operator,
typs []*types.T,
output execinfra.RowReceiver,
metadataSourcesQueue []execinfrapb.MetadataSource,
toClose []colexecop.Closer,
getStats func() []*execinfrapb.ComponentStats,
metadataSources []execinfrapb.MetadataSource,
toClose []colexecop.Closer,
cancelFlow func() context.CancelFunc,
) (*Materializer, error) {
m := materializerPool.Get().(*Materializer)
*m = Materializer{
ProcessorBase: m.ProcessorBase,
input: input,
typs: typs,
drainHelper: newDrainHelper(metadataSourcesQueue, getStats),
drainHelper: newDrainHelper(getStats, metadataSources),
converter: colconv.NewAllVecToDatumConverter(len(typs)),
row: make(rowenc.EncDatumRow, len(typs)),
closers: toClose,
Expand Down
14 changes: 7 additions & 7 deletions pkg/sql/colexec/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func TestColumnarizeMaterialize(t *testing.T) {
c,
typs,
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* getStats */
nil, /* metadataSources */
nil, /* toClose */
nil, /* cancelFlow */
)
if err != nil {
Expand Down Expand Up @@ -152,9 +152,9 @@ func BenchmarkMaterializer(b *testing.B) {
input,
typs,
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* getStats */
nil, /* metadataSources */
nil, /* toClose */
nil, /* cancelFlow */
)
if err != nil {
Expand Down Expand Up @@ -207,9 +207,9 @@ func TestMaterializerNextErrorAfterConsumerDone(t *testing.T) {
&colexecop.CallbackOperator{},
nil, /* typ */
nil, /* output */
nil, /* getStats */
[]execinfrapb.MetadataSource{metadataSource},
nil, /* toClose */
nil, /* getStats */
nil, /* cancelFlow */
)
require.NoError(t, err)
Expand Down Expand Up @@ -256,9 +256,9 @@ func BenchmarkColumnarizeMaterialize(b *testing.B) {
c,
types,
nil, /* output */
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* getStats */
nil, /* metadataSources */
nil, /* toClose */
nil, /* cancelFlow */
)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ func (s *ParallelUnorderedSynchronizer) Child(nth int, verbose bool) execinfra.O
type SynchronizerInput struct {
// Op is the input Operator.
Op colexecop.Operator
// StatsCollectors are all vectorized stats collectors in the input tree.
// The field is currently being used *only* to track all of the stats
// collectors in the input tree, and the synchronizers should *not* access
// it themselves.
// TODO(yuzefovich): actually move the logic of getting stats into the
// synchronizers.
StatsCollectors []VectorizedStatsCollector
// MetadataSources are metadata sources in the input tree that should be
// drained in the same goroutine as Op.
MetadataSources execinfrapb.MetadataSources
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/colexec/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexec

import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
)

// VectorizedStatsCollector is the common interface implemented by collectors.
type VectorizedStatsCollector interface {
colexecop.Operator
GetStats() *execinfrapb.ComponentStats
}
4 changes: 2 additions & 2 deletions pkg/sql/colexec/types_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ func TestSQLTypesIntegration(t *testing.T) {
arrowOp,
typs,
output,
nil, /* metadataSourcesQueue */
nil, /* toClose */
nil, /* getStats */
nil, /* metadataSources */
nil, /* toClose */
nil, /* cancelFlow */
)
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colflow/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_library(
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/treeprinter",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
Loading

0 comments on commit 27b4ab1

Please sign in to comment.