diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 0faaae00a120..fa1bd341d8e9 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -366,7 +366,7 @@ func (r *Registry) maybeCancelJobs(ctx context.Context, nl NodeLiveness) { func (r *Registry) cleanupOldJobs(ctx context.Context, olderThan time.Time) error { const stmt = `SELECT id, payload FROM system.jobs WHERE status IN ($1, $2, $3) AND created < $4 ORDER BY created LIMIT 1000` - rows, _ /* cols */, err := r.ex.Query( + rows, err := r.ex.Query( ctx, "gc-jobs", nil /* txn */, stmt, StatusFailed, StatusSucceeded, StatusCanceled, olderThan, ) if err != nil { @@ -600,7 +600,7 @@ func AddResumeHook(fn ResumeHookFn) { func (r *Registry) maybeAdoptJob(ctx context.Context, nl NodeLiveness) error { const stmt = `SELECT id, payload, progress IS NULL FROM system.jobs WHERE status IN ($1, $2) ORDER BY created DESC` - rows, _ /* cols */, err := r.ex.Query( + rows, err := r.ex.Query( ctx, "adopt-job", nil /* txn */, stmt, StatusPending, StatusRunning, ) if err != nil { diff --git a/pkg/server/server_update.go b/pkg/server/server_update.go index d1a26ab20413..21c6ceffbec7 100644 --- a/pkg/server/server_update.go +++ b/pkg/server/server_update.go @@ -138,7 +138,7 @@ func (s *Server) upgradeStatus(ctx context.Context) (bool, error) { } // Check if auto upgrade is enabled at current version. - datums, _, err := s.internalExecutor.Query( + datums, err := s.internalExecutor.Query( ctx, "read-downgrade", nil, /* txn */ "SELECT value FROM system.settings WHERE name = 'cluster.preserve_downgrade_option';", ) @@ -162,7 +162,7 @@ func (s *Server) upgradeStatus(ctx context.Context) (bool, error) { // (which returns the version from the KV store as opposed to the possibly // lagging settings subsystem). func (s *Server) clusterVersion(ctx context.Context) (string, error) { - datums, _, err := s.internalExecutor.Query( + datums, err := s.internalExecutor.Query( ctx, "show-version", nil, /* txn */ "SHOW CLUSTER SETTING version;", ) diff --git a/pkg/server/updates.go b/pkg/server/updates.go index 99288b610371..9a6951048e5a 100644 --- a/pkg/server/updates.go +++ b/pkg/server/updates.go @@ -346,7 +346,7 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic // Read the system.settings table to determine the settings for which we have // explicitly set values -- the in-memory SV has the set and default values // flattened for quick reads, but we'd rather only report the non-defaults. 
- if datums, _, err := s.internalExecutor.Query( + if datums, err := s.internalExecutor.Query( ctx, "read-setting", nil /* txn */, "SELECT name FROM system.settings", ); err != nil { log.Warningf(ctx, "failed to read settings: %s", err) @@ -358,7 +358,7 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic } } - if datums, _, err := s.internalExecutor.Query( + if datums, err := s.internalExecutor.Query( ctx, "read-zone-configs", nil, /* txn */ diff --git a/pkg/sql/authorization.go b/pkg/sql/authorization.go index 5944a7303f0b..1395a2ad647c 100644 --- a/pkg/sql/authorization.go +++ b/pkg/sql/authorization.go @@ -247,7 +247,7 @@ func (p *planner) resolveMemberOfWithAdminOption( } visited[m] = struct{}{} - rows, _ /* cols */, err := p.ExecCfg().InternalExecutor.Query( + rows, err := p.ExecCfg().InternalExecutor.Query( ctx, "expand-roles", nil /* txn */, lookupRolesStmt, m, ) if err != nil { diff --git a/pkg/sql/check.go b/pkg/sql/check.go index 49fbaabd6909..b6c19aa6f040 100644 --- a/pkg/sql/check.go +++ b/pkg/sql/check.go @@ -49,7 +49,6 @@ func validateCheckExpr( } lim := &tree.Limit{Count: tree.NewDInt(1)} stmt := &tree.Select{Select: sel, Limit: lim} - queryStr := tree.AsStringWithFlags(stmt, tree.FmtParsable) log.Infof(ctx, "Validating check constraint %q with query %q", expr.String(), queryStr) @@ -178,34 +177,27 @@ func (p *planner) validateForeignKey( query, ) - rows, err := p.delegateQuery(ctx, "ALTER TABLE VALIDATE", query, nil, nil) + plan, err := p.delegateQuery(ctx, "ALTER TABLE VALIDATE", query, nil, nil) if err != nil { return err } - rows, err = p.optimizePlan(ctx, rows, allColumns(rows)) + plan, err = p.optimizePlan(ctx, plan, allColumns(plan)) if err != nil { return err } - defer rows.Close(ctx) + defer plan.Close(ctx) - params := runParams{ - ctx: ctx, - extendedEvalCtx: &p.extendedEvalCtx, - p: p, - } - if err := startPlan(params, rows); err != nil { - return err - } - next, err := rows.Next(params) + rows, err := p.runWithDistSQL(ctx, plan) if err != nil { return err } + defer rows.Close(ctx) - if next { + if rows.Len() > 0 { return pgerror.NewErrorf(pgerror.CodeForeignKeyViolationError, "foreign key violation: MATCH FULL does not allow mixing of null and nonnull values %s for %s", - rows.Values(), srcIdx.ForeignKey.Name, + rows.At(0), srcIdx.ForeignKey.Name, ) } } @@ -217,42 +209,36 @@ func (p *planner) validateForeignKey( query, ) - rows, err := p.delegateQuery(ctx, "ALTER TABLE VALIDATE", query, nil, nil) + plan, err := p.delegateQuery(ctx, "ALTER TABLE VALIDATE", query, nil, nil) if err != nil { return err } - rows, err = p.optimizePlan(ctx, rows, allColumns(rows)) + plan, err = p.optimizePlan(ctx, plan, allColumns(plan)) if err != nil { return err } - defer rows.Close(ctx) + defer plan.Close(ctx) - params := runParams{ - ctx: ctx, - extendedEvalCtx: &p.extendedEvalCtx, - p: p, - } - if err := startPlan(params, rows); err != nil { - return err - } - next, err := rows.Next(params) + rows, err := p.runWithDistSQL(ctx, plan) if err != nil { return err } + defer rows.Close(ctx) - if next { - values := rows.Values() - var pairs bytes.Buffer - for i := range values { - if i > 0 { - pairs.WriteString(", ") - } - pairs.WriteString(fmt.Sprintf("%s=%v", srcIdx.ColumnNames[i], values[i])) + if rows.Len() == 0 { + return nil + } + + values := rows.At(0) + var pairs bytes.Buffer + for i := range values { + if i > 0 { + pairs.WriteString(", ") } - return pgerror.NewErrorf(pgerror.CodeForeignKeyViolationError, - "foreign key violation: %q row %s has no 
match in %q", - srcTable.Name, pairs.String(), targetTable.Name) + pairs.WriteString(fmt.Sprintf("%s=%v", srcIdx.ColumnNames[i], values[i])) } - return nil + return pgerror.NewErrorf(pgerror.CodeForeignKeyViolationError, + "foreign key violation: %q row %s has no match in %q", + srcTable.Name, pairs.String(), targetTable.Name) } diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go index ee84e2f838be..1880310ccc52 100644 --- a/pkg/sql/copy.go +++ b/pkg/sql/copy.go @@ -334,7 +334,7 @@ func (c *copyMachine) insertRows(ctx context.Context) (retErr error) { extendedEvalCtx: &c.p.extendedEvalCtx, p: &c.p, } - if err := startPlan(params, insertNode); err != nil { + if err := startExec(params, insertNode); err != nil { return err } rows, err := countRowsAffected(params, insertNode) diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 393d9bfc03d6..d161eec1318d 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1739,7 +1739,7 @@ CREATE TABLE crdb_internal.zones ( return 0, "", fmt.Errorf("object with ID %d does not exist", id) } - rows, _ /* cols */, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( + rows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( ctx, "crdb-internal-zones-table", p.txn, `SELECT id, config FROM system.zones`) if err != nil { return err diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index ef5498404535..b6c3d659cde0 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -322,7 +322,7 @@ func checkRunningJobs(ctx context.Context, job *jobs.Job, p *planner) error { } const stmt = `SELECT id, payload FROM system.jobs WHERE status IN ($1, $2) ORDER BY created` - rows, _ /* cols */, err := p.ExecCfg().InternalExecutor.Query( + rows, err := p.ExecCfg().InternalExecutor.Query( ctx, "get-jobs", nil /* txn */, stmt, jobs.StatusPending, jobs.StatusRunning, ) if err != nil { diff --git a/pkg/sql/distinct.go b/pkg/sql/distinct.go index 3fea97930b25..7b09c4cd139a 100644 --- a/pkg/sql/distinct.go +++ b/pkg/sql/distinct.go @@ -17,13 +17,10 @@ package sql import ( "context" - "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb" - "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/types" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/pkg/errors" ) // distinctNode de-duplicates rows returned by a wrapped planNode. @@ -42,8 +39,6 @@ type distinctNode struct { // Subset of distinctOnColIdxs on which the input guarantees an ordering. // All rows that are equal on these columns appear contiguously in the input. columnsInOrder util.FastIntSet - - run *rowSourceToPlanNode } // distinct constructs a distinctNode. @@ -190,65 +185,18 @@ func (p *planner) distinct( } func (n *distinctNode) startExec(params runParams) error { - flowCtx := &distsqlrun.FlowCtx{ - EvalCtx: params.EvalContext(), - } - - cols := make([]int, len(planColumns(n.plan))) - for i := range cols { - cols[i] = i - } - - spec := createDistinctSpec(n, cols) - - input, err := makePlanNodeToRowSource(n.plan, params, false) - if err != nil { - return err - } - if len(spec.DistinctColumns) == 0 { - return errors.New("cannot initialize a distinctNode with 0 columns") - } - - // Normally, startExec isn't recursive, since it's invoked for all nodes using - // the planTree walker. And as normal, the walker will startExec the source - // of this distinct. 
- // But, we also need to startExec our planNodeToRowSource to properly - // initialize it. That won't get touched via the planNode walker, so we have - // to do it recursively here. - if err := input.startExec(params); err != nil { - return err - } - if err := input.InitWithOutput(&distsqlpb.PostProcessSpec{}, nil); err != nil { - return err - } - - post := &distsqlpb.PostProcessSpec{} // post is not used as we only use the processor for the core distinct logic. - var output distsqlrun.RowReceiver // output is never used as distinct is only run as a RowSource. - - proc, err := distsqlrun.NewDistinct(flowCtx, 0 /* processorID */, spec, input, post, output) - if err != nil { - return err - } - - n.run = makeRowSourceToPlanNode(proc, nil /* forwarder */, planColumns(n), nil /* originalPlanNode */) - - n.run.source.Start(params.ctx) - - return nil + panic("distinctNode can't be called in local mode") } func (n *distinctNode) Next(params runParams) (bool, error) { - return n.run.Next(params) + panic("distinctNode can't be called in local mode") } func (n *distinctNode) Values() tree.Datums { - return n.run.Values() + panic("distinctNode can't be called in local mode") } func (n *distinctNode) Close(ctx context.Context) { - if n.run != nil { - n.run.Close(ctx) - } n.plan.Close(ctx) } diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index a01a3db7b53d..c1f0a8d65518 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -862,7 +862,7 @@ func initTableReaderSpec( *s = distsqlpb.TableReaderSpec{ Table: *n.desc.TableDesc(), Reverse: n.reverse, - IsCheck: n.run.isCheck, + IsCheck: n.isCheck, Visibility: n.colCfg.visibility.toDistSQLScanVisibility(), // Retain the capacity of the spans slice. @@ -877,7 +877,7 @@ func initTableReaderSpec( // When a TableReader is running scrub checks, do not allow a // post-processor. This is because the outgoing stream is a fixed // format (distsqlrun.ScrubTypes). - if n.run.isCheck { + if n.isCheck { return s, distsqlpb.PostProcessSpec{}, nil } @@ -2447,12 +2447,6 @@ func (dsp *DistSQLPlanner) wrapPlan(planCtx *PlanningCtx, n planNode) (PhysicalP // Don't continue recursing into explain nodes - they need to be left // alone since they handle their own planning later. return false, nil - case *deleteNode: - // DeleteNode currently uses its scanNode directly, if it exists. This - // is a bit tough to fix, so for now, don't try to recurse through - // deleteNodes. - // TODO(jordan): fix deleteNode to stop doing that. - return false, nil } if !seenTop { // We know we're wrapping the first node, so ignore it. diff --git a/pkg/sql/distsqlrun/hashjoiner.go b/pkg/sql/distsqlrun/hashjoiner.go index a157ac55d4a7..a292b847e85a 100644 --- a/pkg/sql/distsqlrun/hashjoiner.go +++ b/pkg/sql/distsqlrun/hashjoiner.go @@ -642,6 +642,49 @@ func (h *hashJoiner) receiveNext( } else if row == nil { return nil, nil, false, nil } + // We make the explicit check for whether or not the row contained a NULL value + // on an equality column. The reasoning here is because of the way we expect + // NULL equality checks to behave (i.e. NULL != NULL) and the fact that we + // use the encoding of any given row as key into our bucket. Thus if we + // encountered a NULL row when building the hashmap we have to store in + // order to use it for RIGHT OUTER joins but if we encounter another + // NULL row when going through the left stream (probing phase), matching + // this with the first NULL row would be incorrect. 
+ // + // If we have the following: + // CREATE TABLE t(x INT); INSERT INTO t(x) VALUES (NULL); + // | x | + // ------ + // | NULL | + // + // For the following query: + // SELECT * FROM t AS a FULL OUTER JOIN t AS b USING(x); + // + // We expect: + // | x | + // ------ + // | NULL | + // | NULL | + // + // The following example illustrates the behavior when joining on two + // or more columns, and only one of them contains NULL. + // If we have the following: + // CREATE TABLE t(x INT, y INT); + // INSERT INTO t(x, y) VALUES (44,51), (NULL,52); + // | x | y | + // ------ + // | 44 | 51 | + // | NULL | 52 | + // + // For the following query: + // SELECT * FROM t AS a FULL OUTER JOIN t AS b USING(x, y); + // + // We expect: + // | x | y | + // ------ + // | 44 | 51 | + // | NULL | 52 | + // | NULL | 52 | hasNull := false for _, c := range h.eqCols[side] { if row[c].IsNull() { diff --git a/pkg/sql/filter.go b/pkg/sql/filter.go index 8ff506ddc37d..f05f3e5d0034 100644 --- a/pkg/sql/filter.go +++ b/pkg/sql/filter.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/types" - "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" ) // filterNode implements a filtering stage. It is intended to be used @@ -56,26 +55,13 @@ func (f *filterNode) startExec(runParams) error { // Next implements the planNode interface. func (f *filterNode) Next(params runParams) (bool, error) { - for { - if next, err := f.source.plan.Next(params); !next { - return false, err - } - - params.extendedEvalCtx.PushIVarContainer(f) - passesFilter, err := sqlbase.RunFilter(f.filter, params.EvalContext()) - params.extendedEvalCtx.PopIVarContainer() - if err != nil { - return false, err - } + panic("filterNode cannot be run in local mode") +} - if passesFilter { - return true, nil - } - // Row was filtered out; grab the next row. - } +func (f *filterNode) Values() tree.Datums { + panic("filterNode cannot be run in local mode") } -func (f *filterNode) Values() tree.Datums { return f.source.plan.Values() } func (f *filterNode) Close(ctx context.Context) { f.source.plan.Close(ctx) } func (f *filterNode) computePhysicalProps(evalCtx *tree.EvalContext) { diff --git a/pkg/sql/group.go b/pkg/sql/group.go index 0317cb592d2b..759ba77c38eb 100644 --- a/pkg/sql/group.go +++ b/pkg/sql/group.go @@ -60,8 +60,6 @@ type groupNode struct { funcs []*aggregateFuncHolder props physicalProps - - run groupRun } // groupBy constructs a planNode "complex" consisting of a groupNode and other @@ -308,166 +306,16 @@ func (p *planner) groupBy( return plan, group, nil } -// groupRun contains the run-time state for groupNode during local execution. -type groupRun struct { - // The set of bucket keys. We add buckets as we are processing input rows, and - // we remove them as we are outputting results. - buckets map[string]struct{} - populated bool - sourceEmpty bool - - lastOrderedGroupKey tree.Datums - consumedGroupKey bool - - // The current result row. - values tree.Datums - - // gotOneRow becomes true after one result row has been produced. - // Used in conjunction with needOnlyOneRow. - gotOneRow bool - - scratch []byte -} - -// matchLastGroupKey takes a row and matches it with the row stored by -// lastOrderedGroupKey. It returns true if the two rows are equal on the -// grouping columns, and false otherwise.
-func (n *groupNode) matchLastGroupKey(ctx *tree.EvalContext, row tree.Datums) bool { - for _, i := range n.orderedGroupCols { - if n.run.lastOrderedGroupKey[i].Compare(ctx, row[i]) != 0 { - return false - } - } - return true -} - -// accumulateRow takes a row and accumulates it into all the aggregate -// functions. -func (n *groupNode) accumulateRow(params runParams, values tree.Datums) error { - bucket := n.run.scratch - for _, idx := range n.groupCols { - var err error - bucket, err = sqlbase.EncodeDatumKeyAscending(bucket, values[idx]) - if err != nil { - return err - } - } - - n.run.buckets[string(bucket)] = struct{}{} - - // Feed the aggregateFuncHolders for this bucket the non-grouped values. - for _, f := range n.funcs { - if f.hasFilter() && values[f.filterRenderIdx] != tree.DBoolTrue { - continue - } - - var value tree.Datum - if f.argRenderIdx != noRenderIdx { - value = values[f.argRenderIdx] - } - - if err := f.add(params.ctx, params.EvalContext(), bucket, value); err != nil { - return err - } - } - - n.run.scratch = bucket[:0] - n.run.gotOneRow = true - - return nil -} - func (n *groupNode) startExec(params runParams) error { - // TODO(peter): This memory isn't being accounted for. The similar code in - // sql/distsqlrun/aggregator.go does account for the memory. - n.run.buckets = make(map[string]struct{}) - return nil + panic("groupNode cannot be run in local mode") } func (n *groupNode) Next(params runParams) (bool, error) { - // We're going to accumulate rows from n.plan until it's either exhausted or - // the ordered group columns change. Subsequent calls to Next will return the - // result of each bucket, then continue accumulating n.plan when there are no - // more buckets. - for (!n.run.populated || len(n.run.buckets) == 0) && !n.run.sourceEmpty { - // We've finished consuming the old buckets. - n.run.populated = false - - if !n.run.consumedGroupKey && n.run.lastOrderedGroupKey != nil { - if err := n.accumulateRow(params, n.run.lastOrderedGroupKey); err != nil { - return false, err - } - n.run.consumedGroupKey = true - } - - next := false - if err := params.p.cancelChecker.Check(); err != nil { - return false, err - } - if !(n.needOnlyOneRow && n.run.gotOneRow) { - var err error - next, err = n.plan.Next(params) - if err != nil { - return false, err - } - } - if !next { - n.run.sourceEmpty = true - n.setupOutput() - break - } - - values := n.plan.Values() - if n.run.lastOrderedGroupKey == nil { - n.run.lastOrderedGroupKey = make(tree.Datums, len(values)) - copy(n.run.lastOrderedGroupKey, values) - n.run.consumedGroupKey = true - } - if !n.matchLastGroupKey(params.EvalContext(), values) { - copy(n.run.lastOrderedGroupKey, values) - n.run.consumedGroupKey = false - n.run.populated = true - n.setupOutput() - break - } - - // Add row to bucket. - if err := n.accumulateRow(params, values); err != nil { - return false, err - } - } - - if len(n.run.buckets) == 0 { - return false, nil - } - var bucket string - // Pick an arbitrary bucket. - for bucket = range n.run.buckets { - break - } - // TODO(peter): Deleting from the n.run.buckets is fairly slow. The similar - // code in distsqlrun.aggregator performs a single step of copying all of the - // buckets to a slice and then releasing the buckets map. - delete(n.run.buckets, bucket) - for i, f := range n.funcs { - aggregateFunc, ok := f.run.buckets[bucket] - if !ok { - // No input for this bucket (possible if f has a FILTER). - // In most cases the result is NULL but there are exceptions - // (like COUNT). 
- aggregateFunc = f.create(params.EvalContext(), nil /* arguments */) - } - var err error - n.run.values[i], err = aggregateFunc.Result() - if err != nil { - return false, err - } - } - return true, nil + panic("groupNode cannot be run in local mode") } func (n *groupNode) Values() tree.Datums { - return n.run.values + panic("groupNode cannot be run in local mode") } func (n *groupNode) Close(ctx context.Context) { @@ -475,18 +323,6 @@ func (n *groupNode) Close(ctx context.Context) { for _, f := range n.funcs { f.close(ctx) } - n.run.buckets = nil -} - -// setupOutput runs once after all the input rows have been processed. It sets -// up the necessary state to start iterating through the buckets in Next(). -func (n *groupNode) setupOutput() { - if len(n.run.buckets) < 1 && n.isScalar { - n.run.buckets[""] = struct{}{} - } - if n.run.values == nil { - n.run.values = make(tree.Datums, len(n.funcs)) - } } // requiresIsDistinctFromNullFilter returns whether a @@ -852,35 +688,3 @@ func (a *aggregateFuncHolder) close(ctx context.Context) { a.run.bucketsMemAcc.Close(ctx) } - -// add accumulates one more value for a particular bucket into an aggregation -// function. -func (a *aggregateFuncHolder) add( - ctx context.Context, evalCtx *tree.EvalContext, bucket []byte, d tree.Datum, -) error { - // NB: the compiler *should* optimize `myMap[string(myBytes)]`. See: - // https://github.com/golang/go/commit/f5f5a8b6209f84961687d993b93ea0d397f5d5bf - - if a.run.seen != nil { - encoded, err := sqlbase.EncodeDatumKeyAscending(bucket, d) - if err != nil { - return err - } - if _, ok := a.run.seen[string(encoded)]; ok { - // skip - return nil - } - if err := a.run.bucketsMemAcc.Grow(ctx, int64(len(encoded))); err != nil { - return err - } - a.run.seen[string(encoded)] = struct{}{} - } - - impl, ok := a.run.buckets[string(bucket)] - if !ok { - impl = a.create(evalCtx, a.arguments) - a.run.buckets[string(bucket)] = impl - } - - return impl.Add(ctx, d) -} diff --git a/pkg/sql/index_join.go b/pkg/sql/index_join.go index 8c65ef19f9f8..da4eb2c15ea3 100644 --- a/pkg/sql/index_join.go +++ b/pkg/sql/index_join.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" - "github.com/cockroachdb/cockroach/pkg/util/log" ) // indexJoinNode implements joining of results from an index with the rows @@ -240,64 +239,16 @@ type indexJoinRun struct { colIDtoRowIndex map[sqlbase.ColumnID]int } -const indexJoinBatchSize = 100 - func (n *indexJoinNode) startExec(params runParams) error { - return n.table.startExec(params) + panic("indexJoinNode cannot be run in local mode") } func (n *indexJoinNode) Next(params runParams) (bool, error) { - // Loop looking up the next row. We either are going to pull a row from the - // table or a batch of rows from the index. If we pull a batch of rows from - // the index we perform another iteration of the loop looking for rows in the - // table. This outer loop is necessary because a batch of rows from the index - // might all be filtered when the resulting rows are read from the table. - for tableLookup := (len(n.table.spans) > 0); true; tableLookup = true { - // First, try to pull a row from the table. - if tableLookup { - next, err := n.table.Next(params) - if err != nil { - return false, err - } - if next { - return true, nil - } - } - - // The table is out of rows. Pull primary keys from the index. 
- n.table.run.scanInitialized = false - n.table.spans = n.table.spans[:0] - - for len(n.table.spans) < indexJoinBatchSize { - if next, err := n.index.Next(params); !next { - // The index is out of rows or an error occurred. - if err != nil { - return false, err - } - if len(n.table.spans) == 0 { - // The index is out of rows. - return false, nil - } - break - } - vals := n.index.Values() - primaryIndexSpan, _, err := sqlbase.EncodeIndexSpan( - n.table.desc.TableDesc(), n.table.index, n.run.colIDtoRowIndex, vals, n.run.primaryKeyPrefix) - if err != nil { - return false, err - } - n.table.spans = append(n.table.spans, primaryIndexSpan) - } - - if log.V(3) { - log.Infof(params.ctx, "table scan: %s", sqlbase.PrettySpans(n.table.index, n.table.spans, 0)) - } - } - return false, nil + panic("indexJoinNode cannot be run in local mode") } func (n *indexJoinNode) Values() tree.Datums { - return n.table.Values() + panic("indexJoinNode cannot be run in local mode") } func (n *indexJoinNode) Close(ctx context.Context) { diff --git a/pkg/sql/information_schema.go b/pkg/sql/information_schema.go index a985923ab326..92de78cdd72c 100644 --- a/pkg/sql/information_schema.go +++ b/pkg/sql/information_schema.go @@ -1429,7 +1429,7 @@ func forEachRole( ctx context.Context, p *planner, fn func(username string, isRole bool) error, ) error { query := `SELECT username, "isRole" FROM system.users` - rows, _ /* cols */, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( + rows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( ctx, "read-roles", p.txn, query, ) if err != nil { @@ -1454,7 +1454,7 @@ func forEachRoleMembership( ctx context.Context, p *planner, fn func(role, member string, isAdmin bool) error, ) error { query := `SELECT "role", "member", "isAdmin" FROM system.role_members` - rows, _ /* cols */, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( + rows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( ctx, "read-members", p.txn, query, ) if err != nil { diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 694969ca8de9..f7cd99344ce8 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -230,6 +230,19 @@ func (ie *internalExecutorImpl) initConnEx( // If txn is not nil, the statement will be executed in the respective txn. func (ie *InternalExecutor) Query( ctx context.Context, opName string, txn *client.Txn, stmt string, qargs ...interface{}, +) ([]tree.Datums, error) { + datums, _, err := ie.queryInternal( + ctx, opName, txn, + internalExecRootSession, + SessionArgs{}, + stmt, qargs...) + return datums, err +} + +// QueryWithCols is like Query, but it also returns the computed ResultColumns +// of the input query. +func (ie *InternalExecutor) QueryWithCols( + ctx context.Context, opName string, txn *client.Txn, stmt string, qargs ...interface{}, ) ([]tree.Datums, sqlbase.ResultColumns, error) { return ie.queryInternal( ctx, opName, txn, @@ -273,7 +286,7 @@ func (ie *InternalExecutor) QueryWithUser( func (ie *InternalExecutor) QueryRow( ctx context.Context, opName string, txn *client.Txn, stmt string, qargs ...interface{}, ) (tree.Datums, error) { - rows, _, err := ie.Query(ctx, opName, txn, stmt, qargs...) + rows, err := ie.Query(ctx, opName, txn, stmt, qargs...) if err != nil { return nil, err } @@ -329,6 +342,19 @@ func (ie *InternalExecutor) ExecWithUser( // If txn is not nil, the statement will be executed in the respective txn. 
func (ie *SessionBoundInternalExecutor) Query( ctx context.Context, opName string, txn *client.Txn, stmt string, qargs ...interface{}, +) ([]tree.Datums, error) { + rows, _, err := ie.impl.queryInternal( + ctx, opName, txn, + internalExecInheritSession, + SessionArgs{}, + stmt, qargs...) + return rows, err +} + +// QueryWithCols is like Query, but it also returns the computed ResultColumns +// of the input query. +func (ie *SessionBoundInternalExecutor) QueryWithCols( + ctx context.Context, opName string, txn *client.Txn, stmt string, qargs ...interface{}, ) ([]tree.Datums, sqlbase.ResultColumns, error) { return ie.impl.queryInternal( ctx, opName, txn, diff --git a/pkg/sql/internal_test.go b/pkg/sql/internal_test.go index c5f03d211e26..d4b775b726db 100644 --- a/pkg/sql/internal_test.go +++ b/pkg/sql/internal_test.go @@ -27,7 +27,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -199,7 +198,7 @@ func TestInternalExecAppNameInitialization(t *testing.T) { type testInternalExecutor interface { Query( ctx context.Context, opName string, txn *client.Txn, stmt string, qargs ...interface{}, - ) ([]tree.Datums, sqlbase.ResultColumns, error) + ) ([]tree.Datums, error) Exec( ctx context.Context, opName string, txn *client.Txn, stmt string, qargs ...interface{}, ) (int, error) @@ -212,7 +211,7 @@ func testInternalExecutorAppNameInitialization( ie testInternalExecutor, ) { // Check that the application_name is set properly in the executor. - if rows, _, err := ie.Query(context.TODO(), "test-query", nil, + if rows, err := ie.Query(context.TODO(), "test-query", nil, "SHOW application_name"); err != nil { t.Fatal(err) } else if len(rows) != 1 { @@ -225,7 +224,7 @@ func testInternalExecutorAppNameInitialization( // have this keep running until we cancel it below. errChan := make(chan error) go func() { - _, _, err := ie.Query(context.TODO(), + _, err := ie.Query(context.TODO(), "test-query", nil, /* txn */ "SELECT pg_sleep(1337666)") @@ -241,7 +240,7 @@ func testInternalExecutorAppNameInitialization( // When it does, we capture the query ID. var queryID string testutils.SucceedsSoon(t, func() error { - rows, _, err := ie.Query(context.TODO(), + rows, err := ie.Query(context.TODO(), "find-query", nil, /* txn */ // We need to assemble the magic string so that this SELECT @@ -270,7 +269,7 @@ func testInternalExecutorAppNameInitialization( }) // Check that the query shows up in the internal tables without error. - if rows, _, err := ie.Query(context.TODO(), "find-query", nil, + if rows, err := ie.Query(context.TODO(), "find-query", nil, "SELECT application_name FROM crdb_internal.node_queries WHERE query LIKE '%337' || '666%'"); err != nil { t.Fatal(err) } else if len(rows) != 1 { @@ -295,7 +294,7 @@ func testInternalExecutorAppNameInitialization( } // Now check that it was properly registered in statistics. 
- if rows, _, err := ie.Query(context.TODO(), "find-query", nil, + if rows, err := ie.Query(context.TODO(), "find-query", nil, "SELECT application_name FROM crdb_internal.node_statement_statistics WHERE key LIKE 'SELECT' || ' pg_sleep(%'"); err != nil { t.Fatal(err) } else if len(rows) != 1 { diff --git a/pkg/sql/join.go b/pkg/sql/join.go index e9ed4444cdcc..e4e50ddc4e52 100644 --- a/pkg/sql/join.go +++ b/pkg/sql/join.go @@ -17,15 +17,12 @@ package sql import ( "context" "fmt" - "unsafe" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" - "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/types" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/mon" ) // joinNode is a planNode whose rows are the result of an inner or @@ -53,8 +50,6 @@ type joinNode struct { // columns contains the metadata for the results of this node. columns sqlbase.ResultColumns - - run joinRun } // makeJoinPredicate builds a joinPredicate from a join condition. Also returns @@ -132,22 +127,6 @@ func (p *planner) makeJoinNode( pred: pred, columns: pred.info.SourceColumns, } - - n.run.buffer = &RowBuffer{ - RowContainer: rowcontainer.NewRowContainer( - p.EvalContext().Mon.MakeBoundAccount(), sqlbase.ColTypeInfoFromResCols(planColumns(n)), 0, - ), - } - - n.run.bucketsMemAcc = p.EvalContext().Mon.MakeBoundAccount() - n.run.buckets = buckets{ - buckets: make(map[string]*bucket), - rowContainer: rowcontainer.NewRowContainer( - p.EvalContext().Mon.MakeBoundAccount(), - sqlbase.ColTypeInfoFromResCols(planColumns(n.right.plan)), - 0, - ), - } return n } @@ -371,387 +350,26 @@ func (p *planner) makeJoin( return planDataSource{info: rInfo, plan: r}, nil } -// joinRun contains the run-time state of joinNode during local execution. -type joinRun struct { - // output contains the last generated row of results from this node. - output tree.Datums - - // buffer is our intermediate row store where we effectively 'stash' a batch - // of results at once, this is then used for subsequent calls to Next() and - // Values(). - buffer *RowBuffer - - buckets buckets - bucketsMemAcc mon.BoundAccount - - // emptyRight contain tuples of NULL values to use on the right for left and - // full outer joins when the on condition fails. - // This is also used for semi and anti joins, in which case it is always nil. - emptyRight tree.Datums - - // emptyLeft contains tuples of NULL values to use on the left for right and - // full outer joins when the on condition fails. - emptyLeft tree.Datums - - // finishedOutput indicates that we've finished writing all of the rows for - // this join and that we can quit as soon as our buffer is empty. - finishedOutput bool -} - func (n *joinNode) startExec(params runParams) error { - if err := n.hashJoinStart(params); err != nil { - return err - } - - // Pre-allocate the space for output rows. - n.run.output = make(tree.Datums, len(n.columns)) - - // If needed, pre-allocate left and right rows of NULL tuples for when the - // join predicate fails to match. 
- if n.joinType == sqlbase.LeftOuterJoin || n.joinType == sqlbase.FullOuterJoin { - n.run.emptyRight = make(tree.Datums, len(planColumns(n.right.plan))) - for i := range n.run.emptyRight { - n.run.emptyRight[i] = tree.DNull - } - } - if n.joinType == sqlbase.RightOuterJoin || n.joinType == sqlbase.FullOuterJoin { - n.run.emptyLeft = make(tree.Datums, len(planColumns(n.left.plan))) - for i := range n.run.emptyLeft { - n.run.emptyLeft[i] = tree.DNull - } - } - - return nil -} - -func (n *joinNode) hashJoinStart(params runParams) error { - var scratch []byte - // Load all the rows from the right side and build our hashmap. - ctx := params.ctx - for { - hasRow, err := n.right.plan.Next(params) - if err != nil { - return err - } - if !hasRow { - break - } - row := n.right.plan.Values() - encoding, _, err := n.pred.encode(scratch, row, n.pred.rightEqualityIndices) - if err != nil { - return err - } - - if err := n.run.buckets.AddRow(ctx, &n.run.bucketsMemAcc, encoding, row); err != nil { - return err - } - - scratch = encoding[:0] - } - if n.joinType == sqlbase.FullOuterJoin || n.joinType == sqlbase.RightOuterJoin { - return n.run.buckets.InitSeen(ctx, &n.run.bucketsMemAcc) - } - return nil + panic("joinNode cannot be run in local mode") } // Next implements the planNode interface. func (n *joinNode) Next(params runParams) (res bool, err error) { - // If results available from from previously computed results, we just - // return true. - if n.run.buffer.Next() { - return true, nil - } - - // If the buffer is empty and we've finished outputting, we're done. - if n.run.finishedOutput { - return false, nil - } - - wantUnmatchedLeft := n.joinType == sqlbase.LeftOuterJoin || - n.joinType == sqlbase.FullOuterJoin || - n.joinType == sqlbase.LeftAntiJoin - wantUnmatchedRight := n.joinType == sqlbase.RightOuterJoin || n.joinType == sqlbase.FullOuterJoin - - if len(n.run.buckets.Buckets()) == 0 { - if !wantUnmatchedLeft { - // No rows on right; don't even try. - return false, nil - } - } - - // Compute next batch of matching rows. - var scratch []byte - for { - if err := params.p.cancelChecker.Check(); err != nil { - return false, err - } - - leftHasRow, err := n.left.plan.Next(params) - if err != nil { - return false, err - } - if !leftHasRow { - break - } - - lrow := n.left.plan.Values() - encoding, containsNull, err := n.pred.encode(scratch, lrow, n.pred.leftEqualityIndices) - if err != nil { - return false, err - } - - // We make the explicit check for whether or not lrow contained a NULL value - // on an equality column. The reasoning here is because of the way we expect - // NULL equality checks to behave (i.e. NULL != NULL) and the fact that we - // use the encoding of any given row as key into our bucket. Thus if we - // encountered a NULL row when building the hashmap we have to store in - // order to use it for RIGHT OUTER joins but if we encounter another - // NULL row when going through the left stream (probing phase), matching - // this with the first NULL row would be incorrect. - // - // If we have have the following: - // CREATE TABLE t(x INT); INSERT INTO t(x) VALUES (NULL); - // | x | - // ------ - // | NULL | - // - // For the following query: - // SELECT * FROM t AS a FULL OUTER JOIN t AS b USING(x); - // - // We expect: - // | x | - // ------ - // | NULL | - // | NULL | - // - // The following examples illustrates the behavior when joining on two - // or more columns, and only one of them contains NULL. 
- // If we have have the following: - // CREATE TABLE t(x INT, y INT); - // INSERT INTO t(x, y) VALUES (44,51), (NULL,52); - // | x | y | - // ------ - // | 44 | 51 | - // | NULL | 52 | - // - // For the following query: - // SELECT * FROM t AS a FULL OUTER JOIN t AS b USING(x, y); - // - // We expect: - // | x | y | - // ------ - // | 44 | 51 | - // | NULL | 52 | - // | NULL | 52 | - if containsNull { - if !wantUnmatchedLeft { - scratch = encoding[:0] - // Failed to match -- no matching row, nothing to do. - continue - } - // We append an empty right row to the left row, adding the result - // to our buffer for the subsequent call to Next(). - n.pred.prepareRow(n.run.output, lrow, n.run.emptyRight) - if _, err := n.run.buffer.AddRow(params.ctx, n.run.output); err != nil { - return false, err - } - return n.run.buffer.Next(), nil - } - - b, ok := n.run.buckets.Fetch(encoding) - if !ok { - if !wantUnmatchedLeft { - scratch = encoding[:0] - continue - } - // Left or full outer join: unmatched rows are padded with NULLs. - // Given that we did not find a matching right row we append an - // empty right row to the left row, adding the result to our buffer - // for the subsequent call to Next(). - n.pred.prepareRow(n.run.output, lrow, n.run.emptyRight) - if _, err := n.run.buffer.AddRow(params.ctx, n.run.output); err != nil { - return false, err - } - return n.run.buffer.Next(), nil - } - - // We iterate through all the rows in the bucket attempting to match the - // on condition, if the on condition passes we add it to the buffer. - foundMatch := false - for idx, rrow := range b.Rows() { - passesOnCond, err := n.pred.eval(params.EvalContext(), lrow, rrow) - if err != nil { - return false, err - } - - if !passesOnCond { - continue - } - foundMatch = true - if n.joinType == sqlbase.JoinType_LEFT_ANTI { - // For anti-join, we want to output the left rows that don't have a - // match. Since we found a match, we can skip this row. - break - } - - if n.joinType == sqlbase.JoinType_LEFT_SEMI { - // Semi-joins only output the left row. - n.pred.prepareRow(n.run.output, lrow, nil) - } else { - n.pred.prepareRow(n.run.output, lrow, rrow) - } - if wantUnmatchedRight { - // Mark the row as seen if we need to retrieve the rows - // without matches for right or full joins later. - b.MarkSeen(idx) - } - if _, err := n.run.buffer.AddRow(params.ctx, n.run.output); err != nil { - return false, err - } - if n.joinType == sqlbase.JoinType_LEFT_SEMI { - // For semi-joins, we only output the left row once, even if it matches - // multiple rows. - break - } - } - if !foundMatch && wantUnmatchedLeft { - // If none of the rows matched the on condition and we are computing a - // left outer, full outer, or anti join, we need to add a row with an - // empty right side. - n.pred.prepareRow(n.run.output, lrow, n.run.emptyRight) - if _, err := n.run.buffer.AddRow(params.ctx, n.run.output); err != nil { - return false, err - } - } - if n.run.buffer.Next() { - return true, nil - } - scratch = encoding[:0] - } - - // no more lrows, we go through the unmatched rows in the internal hashmap. 
- if !wantUnmatchedRight { - return false, nil - } - - for _, b := range n.run.buckets.Buckets() { - for idx, rrow := range b.Rows() { - if err := params.p.cancelChecker.Check(); err != nil { - return false, err - } - if !b.Seen(idx) { - n.pred.prepareRow(n.run.output, n.run.emptyLeft, rrow) - if _, err := n.run.buffer.AddRow(params.ctx, n.run.output); err != nil { - return false, err - } - } - } - } - n.run.finishedOutput = true - - return n.run.buffer.Next(), nil + panic("joinNode cannot be run in local mode") } // Values implements the planNode interface. func (n *joinNode) Values() tree.Datums { - return n.run.buffer.Values() + panic("joinNode cannot be run in local mode") } // Close implements the planNode interface. func (n *joinNode) Close(ctx context.Context) { - n.run.buffer.Close(ctx) - n.run.buffer = nil - n.run.buckets.Close(ctx) - n.run.bucketsMemAcc.Close(ctx) - n.right.plan.Close(ctx) n.left.plan.Close(ctx) } -// bucket here is the set of rows for a given group key (comprised of -// columns specified by the join constraints), 'seen' is used to determine if -// there was a matching row in the opposite stream. -type bucket struct { - rows []tree.Datums - seen []bool -} - -func (b *bucket) Seen(i int) bool { - return b.seen[i] -} - -func (b *bucket) Rows() []tree.Datums { - return b.rows -} - -func (b *bucket) MarkSeen(i int) { - b.seen[i] = true -} - -func (b *bucket) AddRow(row tree.Datums) { - b.rows = append(b.rows, row) -} - -type buckets struct { - buckets map[string]*bucket - rowContainer *rowcontainer.RowContainer -} - -func (b *buckets) Buckets() map[string]*bucket { - return b.buckets -} - -func (b *buckets) AddRow( - ctx context.Context, acc *mon.BoundAccount, encoding []byte, row tree.Datums, -) error { - bk, ok := b.buckets[string(encoding)] - if !ok { - bk = &bucket{} - } - - rowCopy, err := b.rowContainer.AddRow(ctx, row) - if err != nil { - return err - } - if err := acc.Grow(ctx, rowcontainer.SizeOfDatums); err != nil { - return err - } - bk.AddRow(rowCopy) - - if !ok { - b.buckets[string(encoding)] = bk - } - return nil -} - -const sizeOfBoolSlice = unsafe.Sizeof([]bool{}) -const sizeOfBool = unsafe.Sizeof(true) - -// InitSeen initializes the seen array for each of the buckets. It must be run -// before the buckets' seen state is used. -func (b *buckets) InitSeen(ctx context.Context, acc *mon.BoundAccount) error { - for _, bucket := range b.buckets { - if err := acc.Grow( - ctx, int64(sizeOfBoolSlice+uintptr(len(bucket.rows))*sizeOfBool), - ); err != nil { - return err - } - bucket.seen = make([]bool, len(bucket.rows)) - } - return nil -} - -func (b *buckets) Close(ctx context.Context) { - b.rowContainer.Close(ctx) - b.rowContainer = nil - b.buckets = nil -} - -func (b *buckets) Fetch(encoding []byte) (*bucket, bool) { - bk, ok := b.buckets[string(encoding)] - return bk, ok -} - // commonColumns returns the names of columns common on the // right and left sides, for use by NATURAL JOIN. func commonColumns(left, right *sqlbase.DataSourceInfo) tree.NameList { diff --git a/pkg/sql/join_predicate.go b/pkg/sql/join_predicate.go index e2af2142a630..d0ce2a87650f 100644 --- a/pkg/sql/join_predicate.go +++ b/pkg/sql/join_predicate.go @@ -266,23 +266,6 @@ func (p *joinPredicate) IndexedVarNodeFormatter(idx int) tree.NodeFormatter { return p.rightInfo.NodeFormatter(idx - p.numLeftCols) } -// eval for joinPredicate runs the on condition across the columns that do -// not participate in the equality (the equality columns are checked -// in the join algorithm already). 
-// Returns true if there is no on condition or the on condition accepts the -// row. -func (p *joinPredicate) eval(ctx *tree.EvalContext, leftRow, rightRow tree.Datums) (bool, error) { - if p.onCond != nil { - copy(p.curRow[:len(leftRow)], leftRow) - copy(p.curRow[len(leftRow):], rightRow) - ctx.PushIVarContainer(p.iVarHelper.Container()) - pred, err := sqlbase.RunFilter(p.onCond, ctx) - ctx.PopIVarContainer() - return pred, err - } - return true, nil -} - // getNeededColumns figures out the columns needed for the two // sources. This takes into account both the equality columns and the // predicate expression. @@ -309,30 +292,6 @@ func (p *joinPredicate) getNeededColumns(neededJoined []bool) ([]bool, []bool) { return leftNeeded, rightNeeded } -// prepareRow prepares the output row by combining values from the -// input data sources. -func (p *joinPredicate) prepareRow(result, leftRow, rightRow tree.Datums) { - copy(result[:len(leftRow)], leftRow) - copy(result[len(leftRow):], rightRow) -} - -// encode returns the encoding of a row from a given side (left or right), -// according to the columns specified by the equality constraints. -func (p *joinPredicate) encode(b []byte, row tree.Datums, cols []int) ([]byte, bool, error) { - var err error - containsNull := false - for _, colIdx := range cols { - if row[colIdx] == tree.DNull { - containsNull = true - } - b, err = sqlbase.EncodeDatumKeyAscending(b, row[colIdx]) - if err != nil { - return nil, false, err - } - } - return b, containsNull, nil -} - // usingColumns captures the information about equality columns // from USING and NATURAL JOIN statements. type usingColumn struct { diff --git a/pkg/sql/join_test.go b/pkg/sql/join_test.go index ce02051d1362..d8d17ac0398c 100644 --- a/pkg/sql/join_test.go +++ b/pkg/sql/join_test.go @@ -17,14 +17,11 @@ package sql import ( "context" "fmt" - "reflect" "testing" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/internal/client" - "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -129,185 +126,3 @@ func TestInterleavedNodes(t *testing.T) { } } } - -func TestSemiAntiJoin(t *testing.T) { - defer leaktest.AfterTest(t)() - - ctx := context.TODO() - s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop(ctx) - - sqlutils.CreateTestInterleavedHierarchy(t, sqlDB) - sqlutils.CreateTable( - t, sqlDB, "t1", "a INT PRIMARY KEY, b INT, c INT", 10, - sqlutils.ToRowFn( - sqlutils.RowIdxFn, - func(row int) tree.Datum { - return tree.NewDInt(tree.DInt(row * row)) - }, - func(row int) tree.Datum { - return tree.NewDInt(tree.DInt(row * row * row)) - }, - ), - ) - sqlutils.CreateTable( - t, sqlDB, "t2", "a INT PRIMARY KEY, b INT", 10, - sqlutils.ToRowFn( - func(row int) tree.Datum { - return tree.NewDInt(tree.DInt(row + 5)) - }, - func(row int) tree.Datum { - return tree.NewDInt(tree.DInt((row - 5) * (row - 5))) - }, - ), - ) - testCases := []struct { - typ sqlbase.JoinType - left, right string - using tree.NameList - expected []string - }{ - { // 0 - typ: sqlbase.JoinType_LEFT_SEMI, - left: "t1", - right: "t2", - using: tree.NameList{"a"}, - expected: []string{ - "[6 36 216]", - "[7 49 343]", - "[8 64 512]", - "[9 81 729]", - "[10 100 1000]", - }, - }, - { // 1 - typ: 
sqlbase.JoinType_LEFT_ANTI, - left: "t1", - right: "t2", - using: tree.NameList{"a"}, - expected: []string{ - "[1 1 1]", - "[2 4 8]", - "[3 9 27]", - "[4 16 64]", - "[5 25 125]", - }, - }, - { // 2 - typ: sqlbase.JoinType_LEFT_SEMI, - left: "t1", - right: "t2", - using: tree.NameList{"b"}, - expected: []string{ - "[1 1 1]", - "[2 4 8]", - "[3 9 27]", - "[4 16 64]", - "[5 25 125]", - }, - }, - { // 3 - typ: sqlbase.JoinType_LEFT_ANTI, - left: "t1", - right: "t2", - using: tree.NameList{"b"}, - expected: []string{ - "[6 36 216]", - "[7 49 343]", - "[8 64 512]", - "[9 81 729]", - "[10 100 1000]", - }, - }, - { // 4 - typ: sqlbase.JoinType_LEFT_SEMI, - left: "t2", - right: "t1", - using: tree.NameList{"a"}, - expected: []string{ - "[6 16]", - "[7 9]", - "[8 4]", - "[9 1]", - "[10 0]", - }, - }, - { // 5 - typ: sqlbase.JoinType_LEFT_ANTI, - left: "t2", - right: "t1", - using: tree.NameList{"a"}, - expected: []string{ - "[11 1]", - "[12 4]", - "[13 9]", - "[14 16]", - "[15 25]", - }, - }, - } - - for i, tc := range testCases { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - err := kvDB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { - left, err := newTestScanNode(kvDB, tc.left) - if err != nil { - return err - } - right, err := newTestScanNode(kvDB, tc.right) - if err != nil { - return err - } - - execCfg := s.ExecutorConfig().(ExecutorConfig) - p, cleanup := newInternalPlanner( - "TestSemiAntiJoin", txn, security.RootUser, &MemoryMetrics{}, &execCfg, - ) - defer cleanup() - leftSrc := planDataSource{ - plan: left, - info: &sqlbase.DataSourceInfo{SourceColumns: planColumns(left)}, - } - rightSrc := planDataSource{ - plan: right, - info: &sqlbase.DataSourceInfo{SourceColumns: planColumns(right)}, - } - pred, _, err := p.makeJoinPredicate( - ctx, leftSrc.info, rightSrc.info, tc.typ, - &tree.UsingJoinCond{Cols: tc.using}, - ) - if err != nil { - return err - } - join := p.makeJoinNode(planDataSource{plan: left}, planDataSource{plan: right}, pred) - params := runParams{ - ctx: ctx, - extendedEvalCtx: p.ExtendedEvalContext(), - p: p, - } - defer join.Close(ctx) - if err := startPlan(params, join); err != nil { - return err - } - var res []string - for { - ok, err := join.Next(params) - if err != nil { - return err - } - if !ok { - break - } - res = append(res, fmt.Sprintf("%v", join.Values())) - } - if !reflect.DeepEqual(res, tc.expected) { - t.Errorf("expected:\n%v\n got:\n%v", tc.expected, res) - } - return nil - }) - if err != nil { - t.Fatal(err) - } - }) - } -} diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index 4c5a3c90b669..452006916aa1 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -1790,7 +1790,7 @@ SELECT "descID", version, expiration FROM system.public.lease AS OF SYSTEM TIME // The retry is required because of errors caused by node restarts. Retry 30 times. if err := retry.WithMaxAttempts(ctx, retryOptions, 30, func() error { var err error - rows, _, err = m.LeaseStore.execCfg.InternalExecutor.Query( + rows, err = m.LeaseStore.execCfg.InternalExecutor.Query( ctx, "read orphaned table leases", nil /*txn*/, sqlQuery) return err }); err != nil { diff --git a/pkg/sql/limit.go b/pkg/sql/limit.go index 7cbbf02ea93d..bbdf437868df 100644 --- a/pkg/sql/limit.go +++ b/pkg/sql/limit.go @@ -32,8 +32,6 @@ type limitNode struct { evaluated bool count int64 offset int64 - - run limitRun } // limit constructs a limitNode based on the LIMIT and OFFSET clauses. 
@@ -74,39 +72,17 @@ func (p *planner) Limit(ctx context.Context, n *tree.Limit) (*limitNode, error) return &res, nil } -// limitRun contains the state of limitNode during local execution. -type limitRun struct { - rowIndex int64 -} - func (n *limitNode) startExec(params runParams) error { - return n.evalLimit(params.EvalContext()) + panic("limitNode cannot be run in local mode") } func (n *limitNode) Next(params runParams) (bool, error) { - // n.rowIndex is the 0-based index of the next row. - // We don't do (n.rowIndex >= n.offset + n.count) to avoid overflow (count can be MaxInt64). - if n.run.rowIndex-n.offset >= n.count { - return false, nil - } - - for { - if next, err := n.plan.Next(params); !next { - return false, err - } - - n.run.rowIndex++ - if n.run.rowIndex > n.offset { - // Row within limits, return it. - break - } - - // Fetch the next row. - } - return true, nil + panic("limitNode cannot be run in local mode") } -func (n *limitNode) Values() tree.Datums { return n.plan.Values() } +func (n *limitNode) Values() tree.Datums { + panic("limitNode cannot be run in local mode") +} func (n *limitNode) Close(ctx context.Context) { n.plan.Close(ctx) diff --git a/pkg/sql/logictest/testdata/planner_test/insert b/pkg/sql/logictest/testdata/planner_test/insert index 37352b44a394..c4f40b019d82 100644 --- a/pkg/sql/logictest/testdata/planner_test/insert +++ b/pkg/sql/logictest/testdata/planner_test/insert @@ -361,7 +361,6 @@ count · · │ count 1 └── sort · · │ order +column2 - │ strategy top 1 └── values · · · size 2 columns, 2 rows @@ -376,7 +375,6 @@ count · · │ count 1 └── sort · · │ order +column2 - │ strategy top 1 └── values · · · size 2 columns, 2 rows @@ -391,6 +389,5 @@ count · · │ count 1 └── sort · · │ order +column2 - │ strategy top 1 └── values · · · size 2 columns, 2 rows diff --git a/pkg/sql/logictest/testdata/planner_test/limit b/pkg/sql/logictest/testdata/planner_test/limit index 2c611057e1dc..c083cee8cca9 100644 --- a/pkg/sql/logictest/testdata/planner_test/limit +++ b/pkg/sql/logictest/testdata/planner_test/limit @@ -31,7 +31,6 @@ limit · · (w) │ count 100 · · └── sort · · (w) +w │ order +w · · - │ strategy top 100 · · └── render · · (w) · │ render 0 test.public.t.w · · └── scan · · (k[omitted], v[omitted], w) k!=NULL; key(k) @@ -100,7 +99,6 @@ limit · · (sum) · │ count 10 · · └── sort · · (sum) · │ order -v · · - │ strategy top 10 · · └── group · · (sum, v) weak-key(v) │ aggregate 0 sum(w) · · │ aggregate 1 v · · diff --git a/pkg/sql/logictest/testdata/planner_test/order_by b/pkg/sql/logictest/testdata/planner_test/order_by index 4b6b9c4b191b..ebb878f8d90d 100644 --- a/pkg/sql/logictest/testdata/planner_test/order_by +++ b/pkg/sql/logictest/testdata/planner_test/order_by @@ -30,15 +30,14 @@ sort · · query TTT EXPLAIN SELECT a, b FROM t ORDER BY b LIMIT 2 ---- -limit · · - │ count 2 - └── sort · · - │ order +b - │ strategy top 2 - └── render · · - └── scan · · -· table t@primary -· spans ALL +limit · · + │ count 2 + └── sort · · + │ order +b + └── render · · + └── scan · · +· table t@primary +· spans ALL query TTTTT EXPLAIN (VERBOSE) SELECT DISTINCT c FROM t ORDER BY b LIMIT 2 @@ -48,7 +47,6 @@ limit · · (c) weak- └── distinct · · (c) weak-key(c) └── sort · · (c) · │ order +b · · - │ strategy iterative · · └── render · · (c, b) · │ render 0 test.public.t.c · · │ render 1 test.public.t.b · · diff --git a/pkg/sql/logictest/testdata/planner_test/select_index b/pkg/sql/logictest/testdata/planner_test/select_index index f02f960e4157..b70af34343a2 100644 --- 
a/pkg/sql/logictest/testdata/planner_test/select_index +++ b/pkg/sql/logictest/testdata/planner_test/select_index @@ -1085,14 +1085,13 @@ SELECT tree, field, description FROM [ EXPLAIN (VERBOSE) SELECT * FROM noncover ORDER BY c LIMIT 1000000 ] ---- -limit · · - │ count 1000000 - └── sort · · - │ order +c - │ strategy top 1000000 - └── scan · · -· table noncover@primary -· spans ALL +limit · · + │ count 1000000 + └── sort · · + │ order +c + └── scan · · +· table noncover@primary +· spans ALL # ------------------------------------------------------------------------------ # These tests verify that while we are joining an index with the table, we diff --git a/pkg/sql/logictest/testdata/planner_test/show_trace b/pkg/sql/logictest/testdata/planner_test/show_trace index 64f1ea423604..78e5df99d9fa 100644 --- a/pkg/sql/logictest/testdata/planner_test/show_trace +++ b/pkg/sql/logictest/testdata/planner_test/show_trace @@ -268,10 +268,10 @@ SET tracing = on,kv,results; DELETE FROM t.kv; SET tracing = off query TT SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] ---- -flow Scan /Table/54/{1-2} +table reader Scan /Table/54/{1-2} dist sender send querying next range at /Table/54/1 dist sender send r20: sending batch 1 Scan to (n1,s1):1 -flow fetched: /kv/primary/1/v -> /2 +table reader fetched: /kv/primary/1/v -> /2 flow Del /Table/54/2/2/0 flow Del /Table/54/1/1/0 dist sender send querying next range at /Table/54/1/1/0 diff --git a/pkg/sql/lookup_join.go b/pkg/sql/lookup_join.go index 6a2c026cb085..488cd28098fa 100644 --- a/pkg/sql/lookup_join.go +++ b/pkg/sql/lookup_join.go @@ -17,7 +17,6 @@ package sql import ( "context" - "github.com/cockroachdb/cockroach/pkg/sql/opt/exec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" ) @@ -43,130 +42,21 @@ type lookupJoinNode struct { onCond tree.TypedExpr props physicalProps - - run lookupJoinRun -} - -// lookupJoinRun is the state for the local execution path for lookup join. -// -// We have no local execution path; we fall back on doing a full table scan and -// using the joinNode to do the join. Note that this can be significantly worse -// than not having lookup joins at all, because no filters are being pushed into -// the scan as constraints. -// -// This path is temporary and only exists to avoid failures (especially in logic -// tests) when DistSQL is not being used. -type lookupJoinRun struct { - n *joinNode } func (lj *lookupJoinNode) startExec(params runParams) error { - // Make sure the table node has a span (full scan). - var err error - lj.table.spans, err = spansFromConstraint( - lj.table.desc, lj.table.index, nil /* constraint */, exec.ColumnOrdinalSet{}, - false /* forDelete */) - if err != nil { - return err - } - - // Create a joinNode that joins the input and the table. Note that startExec - // will be called on lj.input and lj.table. - - leftSrc := planDataSource{ - info: &sqlbase.DataSourceInfo{SourceColumns: planColumns(lj.input)}, - plan: lj.input, - } - - indexColIDs := lj.table.index.ColumnIDs - if !lj.table.index.Unique { - // Add implicit key columns. - indexColIDs = append(indexColIDs, lj.table.index.ExtraColumnIDs...) - } - - // The lookup side may not output all the index columns on which we are doing - // the lookup. We need them to be produced so that we can refer to them in the - // join predicate. So we find any such instances and adjust the scan node - // accordingly. 
- for i := range lj.keyCols { - colID := indexColIDs[i] - if _, ok := lj.table.colIdxMap[colID]; !ok { - // Tricky case: the lookup join doesn't output this column so we can't - // refer to it; we have to add it. - n := lj.table - colPos := len(n.cols) - var colDesc *sqlbase.ColumnDescriptor - for i := range n.desc.Columns { - if n.desc.Columns[i].ID == colID { - colDesc = &n.desc.Columns[i] - break - } - } - n.cols = append(n.cols, *colDesc) - n.resultColumns = append( - n.resultColumns, - leftSrc.info.SourceColumns[lj.keyCols[i]], - ) - n.colIdxMap[colID] = colPos - n.valNeededForCol.Add(colPos) - n.run.row = make([]tree.Datum, len(n.cols)) - n.filterVars = tree.MakeIndexedVarHelper(n, len(n.cols)) - // startExec was already called for the node, run it again. - if err := n.startExec(params); err != nil { - return err - } - } - } - - if err := lj.table.startExec(params); err != nil { - return err - } - - rightSrc := planDataSource{ - info: &sqlbase.DataSourceInfo{SourceColumns: planColumns(lj.table)}, - plan: lj.table, - } - - pred, _, err := params.p.makeJoinPredicate( - context.TODO(), leftSrc.info, rightSrc.info, lj.joinType, nil, /* cond */ - ) - if err != nil { - return err - } - - // Program the equalities implied by keyCols. - for i := range lj.keyCols { - colID := indexColIDs[i] - pred.addEquality(leftSrc.info, lj.keyCols[i], rightSrc.info, lj.table.colIdxMap[colID]) - } - - onAndExprs := splitAndExpr(params.EvalContext(), lj.onCond, nil /* exprs */) - for _, e := range onAndExprs { - if e != tree.DBoolTrue && !pred.tryAddEqualityFilter(e, leftSrc.info, rightSrc.info) { - pred.onCond = mergeConj(pred.onCond, e) - } - } - lj.run.n = params.p.makeJoinNode(leftSrc, rightSrc, pred) - if err := lj.run.n.startExec(params); err != nil { - return err - } - return lj.table.startExec(params) + panic("lookupJoinNode cannot be run in local mode") } func (lj *lookupJoinNode) Next(params runParams) (bool, error) { - return lj.run.n.Next(params) + panic("lookupJoinNode cannot be run in local mode") } func (lj *lookupJoinNode) Values() tree.Datums { - // Chop off any values we may have tacked onto the table scanNode. 
- return lj.run.n.Values()[:len(lj.columns)] + panic("lookupJoinNode cannot be run in local mode") } func (lj *lookupJoinNode) Close(ctx context.Context) { - if lj.run.n != nil { - lj.run.n.Close(ctx) - } else { - lj.input.Close(ctx) - lj.table.Close(ctx) - } + lj.input.Close(ctx) + lj.table.Close(ctx) } diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace b/pkg/sql/opt/exec/execbuilder/testdata/show_trace index 6810d230ff10..9d5bd41acdcc 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace @@ -264,10 +264,10 @@ SET tracing = on,kv,results; DELETE FROM t.kv; SET tracing = off query TT SELECT operation, message FROM [SHOW KV TRACE FOR SESSION] ---- -flow Scan /Table/54/{1-2} +table reader Scan /Table/54/{1-2} dist sender send querying next range at /Table/54/1 dist sender send r20: sending batch 1 Scan to (n1,s1):1 -flow fetched: /kv/primary/1/v -> /2 +table reader fetched: /kv/primary/1/v -> /2 flow Del /Table/54/2/2/0 flow Del /Table/54/1/1/0 dist sender send querying next range at /Table/54/1/1/0 diff --git a/pkg/sql/opt_decompose_test.go b/pkg/sql/opt_decompose_test.go index a35f77257ac5..0935cd424f84 100644 --- a/pkg/sql/opt_decompose_test.go +++ b/pkg/sql/opt_decompose_test.go @@ -72,7 +72,6 @@ func makeSelectNode(t *testing.T, p *planner) *renderNode { numColumns := len(sel.sourceInfo[0].SourceColumns) sel.ivarHelper = tree.MakeIndexedVarHelper(sel, numColumns) p.extendedEvalCtx.IVarContainer = sel - sel.run.curSourceRow = make(tree.Datums, numColumns) return sel } diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 70d99c3a74d8..99d25d08853b 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -93,7 +93,7 @@ func (ef *execFactory) ConstructScan( return newZeroNode(scan.resultColumns), nil } scan.index = indexDesc - scan.run.isSecondaryIndex = (indexDesc != &tabDesc.PrimaryIndex) + scan.isSecondaryIndex = (indexDesc != &tabDesc.PrimaryIndex) scan.hardLimit = hardLimit scan.reverse = reverse scan.maxResults = maxResults @@ -541,7 +541,7 @@ func (ef *execFactory) ConstructIndexJoin( primaryIndex := tabDesc.GetPrimaryIndex() tableScan.index = &primaryIndex - tableScan.run.isSecondaryIndex = false + tableScan.isSecondaryIndex = false tableScan.disableBatchLimit() primaryKeyColumns, colIDtoRowIndex := processIndexJoinColumns(tableScan, scan) @@ -584,7 +584,7 @@ func (ef *execFactory) ConstructLookupJoin( } tableScan.index = indexDesc - tableScan.run.isSecondaryIndex = (indexDesc != &tabDesc.PrimaryIndex) + tableScan.isSecondaryIndex = (indexDesc != &tabDesc.PrimaryIndex) n := &lookupJoinNode{ input: input.(planNode), @@ -631,7 +631,7 @@ func (ef *execFactory) constructScanForZigzag( } scan.index = indexDesc - scan.run.isSecondaryIndex = (indexDesc.ID != tableDesc.PrimaryIndex.ID) + scan.isSecondaryIndex = (indexDesc.ID != tableDesc.PrimaryIndex.ID) return scan, nil } diff --git a/pkg/sql/opt_index_selection.go b/pkg/sql/opt_index_selection.go index f9d77c2fc4ac..fa23f0abdf59 100644 --- a/pkg/sql/opt_index_selection.go +++ b/pkg/sql/opt_index_selection.go @@ -229,7 +229,7 @@ func (p *planner) selectIndex( c := candidates[0] s.index = c.index s.specifiedIndex = nil - s.run.isSecondaryIndex = (c.index != &s.desc.PrimaryIndex) + s.isSecondaryIndex = (c.index != &s.desc.PrimaryIndex) constraint := c.ic.Constraint() diff --git a/pkg/sql/opt_limits.go b/pkg/sql/opt_limits.go index b37776d81005..6300327c2765 100644 --- a/pkg/sql/opt_limits.go +++ b/pkg/sql/opt_limits.go @@ 
-73,14 +73,6 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) { p.applyLimit(n.plan, getLimit(count, n.offset), false /* soft */) case *sortNode: - if n.needSort && numRows != math.MaxInt64 { - v := p.newSortValues(n.ordering, planColumns(n.plan), int(numRows)) - if soft { - n.run.sortStrategy = newIterativeSortStrategy(v) - } else { - n.run.sortStrategy = newSortTopKStrategy(v, numRows) - } - } if n.needSort { // We can't propagate the limit, because the sort // potentially needs all rows. diff --git a/pkg/sql/pg_catalog.go b/pkg/sql/pg_catalog.go index ca43a1782ee8..b63b9deadc52 100644 --- a/pkg/sql/pg_catalog.go +++ b/pkg/sql/pg_catalog.go @@ -991,7 +991,7 @@ CREATE TABLE pg_catalog.pg_description ( p *planner, dbContext *DatabaseDescriptor, addRow func(...tree.Datum) error) error { - comments, _, err := p.extendedEvalCtx.ExecCfg.InternalExecutor.Query( + comments, err := p.extendedEvalCtx.ExecCfg.InternalExecutor.Query( ctx, "select-comments", p.EvalContext().Txn, diff --git a/pkg/sql/pgwire/conn_test.go b/pkg/sql/pgwire/conn_test.go index 42d33714e858..8a6797bf766c 100644 --- a/pkg/sql/pgwire/conn_test.go +++ b/pkg/sql/pgwire/conn_test.go @@ -223,7 +223,7 @@ func processPgxStartup(ctx context.Context, s serverutils.TestServerInterface, c func execQuery( ctx context.Context, query string, s serverutils.TestServerInterface, c *conn, ) error { - rows, cols, err := s.InternalExecutor().(sqlutil.InternalExecutor).Query( + rows, cols, err := s.InternalExecutor().(sqlutil.InternalExecutor).QueryWithCols( ctx, "test", nil /* txn */, query, ) if err != nil { diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 2bdda16b4460..c1e29b68f331 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -426,14 +426,6 @@ func (p *planTop) close(ctx context.Context) { } } -// start starts the plan. -func (p *planTop) start(params runParams) error { - if err := p.evalSubqueries(params); err != nil { - return err - } - return startPlan(params, p.plan) -} - // columns retrieves the plan's columns. func (p *planTop) columns() sqlbase.ResultColumns { return planColumns(p.plan) @@ -455,20 +447,6 @@ func (p *planTop) collectSpans(params runParams) (readSpans, writeSpans roachpb. return readSpans, writeSpans, nil } -// startPlan starts the given plan and all its sub-query nodes. -func startPlan(params runParams, plan planNode) error { - // Now start execution. - if err := startExec(params, plan); err != nil { - return err - } - - // Finally, trigger limit propagation through the plan. The actual - // LIMIT values will have been evaluated by startExec(). - params.p.setUnlimited(plan) - - return nil -} - // autoCommitNode is implemented by planNodes that might be able to commit the // KV txn in which they operate. Some nodes might want to do this to take // advantage of the 1PC optimization in case they're running as an implicit diff --git a/pkg/sql/plan_node_to_row_source.go b/pkg/sql/plan_node_to_row_source.go index 2fa3f725c015..2891d696a312 100644 --- a/pkg/sql/plan_node_to_row_source.go +++ b/pkg/sql/plan_node_to_row_source.go @@ -135,14 +135,6 @@ func (p *planNodeToRowSource) InternalClose() { } } -func (p *planNodeToRowSource) startExec(_ runParams) error { - // If we're getting startExec'd, it means we're running in local mode - so we - // mark ourselves already started, since local mode will have taken care of - // starting the child nodes of this node. 
- p.started = true - return nil -} - func (p *planNodeToRowSource) Next() (sqlbase.EncDatumRow, *distsqlrun.ProducerMetadata) { if p.State == distsqlrun.StateRunning && p.fastPath { var count int diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index f54a52f5337b..ed9714bf3d4b 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/querycache" "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/types" @@ -35,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/logtags" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/pkg/errors" @@ -532,6 +534,57 @@ func (p *planner) prepareForDistSQLSupportCheck() { p.setUnlimited(p.curPlan.plan) } +// runWithDistSQL runs a planNode tree synchronously via DistSQL, returning the +// results in a RowContainer. There's no streaming on this, so use sparingly. +// In general, you should always prefer to use the internal executor if you can. +func (p *planner) runWithDistSQL( + ctx context.Context, plan planNode, +) (*rowcontainer.RowContainer, error) { + params := runParams{ + ctx: ctx, + extendedEvalCtx: &p.extendedEvalCtx, + p: p, + } + // Create the DistSQL plan for the input. + planCtx := params.extendedEvalCtx.DistSQLPlanner.NewPlanningCtx(ctx, params.extendedEvalCtx, params.p.txn) + log.VEvent(ctx, 1, "creating DistSQL plan") + physPlan, err := planCtx.ExtendedEvalCtx.DistSQLPlanner.createPlanForNode(planCtx, plan) + if err != nil { + return nil, err + } + planCtx.ExtendedEvalCtx.DistSQLPlanner.FinalizePlan(planCtx, &physPlan) + columns := planColumns(plan) + + // Initialize a row container for the DistSQL execution engine to write into. + ci := sqlbase.ColTypeInfoFromResCols(columns) + rows := rowcontainer.NewRowContainer(*p.extendedEvalCtx.ActiveMemAcc, ci, 0 /* rowCapacity */) + rowResultWriter := NewRowResultWriter(rows) + recv := MakeDistSQLReceiver( + ctx, + rowResultWriter, + tree.Rows, + p.ExecCfg().RangeDescriptorCache, + p.ExecCfg().LeaseHolderCache, + p.txn, + func(ts hlc.Timestamp) { + _ = p.ExecCfg().Clock.Update(ts) + }, + p.extendedEvalCtx.Tracing, + ) + defer recv.Release() + + // Copy the evalCtx, as dsp.Run() might change it. + evalCtxCopy := p.extendedEvalCtx + // Run the plan, writing to the row container we initialized earlier. + p.extendedEvalCtx.DistSQLPlanner.Run( + planCtx, p.txn, &physPlan, recv, &evalCtxCopy, nil /* finishedSetupFn */) + if rowResultWriter.Err() != nil { + rows.Close(ctx) + return nil, rowResultWriter.Err() + } + return rows, nil +} + // txnModesSetter is an interface used by SQL execution to influence the current // transaction. type txnModesSetter interface { diff --git a/pkg/sql/render.go b/pkg/sql/render.go index f433fe494df8..0f95074edc5d 100644 --- a/pkg/sql/render.go +++ b/pkg/sql/render.go @@ -80,8 +80,6 @@ type renderNode struct { // modified by index selection. 
props physicalProps - run renderRun - // This struct must be allocated on the heap and its location stay // stable after construction because it implements // IndexedVarContainer and the IndexedVar objects in sub-expressions @@ -344,7 +342,7 @@ func (p *planner) SelectClause( // IndexedVarEval implements the tree.IndexedVarContainer interface. func (r *renderNode) IndexedVarEval(idx int, ctx *tree.EvalContext) (tree.Datum, error) { - return r.run.curSourceRow[idx].Eval(ctx) + panic("renderNode can't be run in local mode") } // IndexedVarResolvedType implements the tree.IndexedVarContainer interface. @@ -357,33 +355,18 @@ func (r *renderNode) IndexedVarNodeFormatter(idx int) tree.NodeFormatter { return r.sourceInfo[0].NodeFormatter(idx) } -// renderRun contains the run-time state of renderNode during local execution. -type renderRun struct { - // The current source row, with one value per source column. - // populated by Next(), used by renderRow(). - curSourceRow tree.Datums - - // The rendered row, with one value for each render expression. - // populated by Next(). - row tree.Datums -} - func (r *renderNode) startExec(runParams) error { - return nil + panic("renderNode can't be run in local mode") } func (r *renderNode) Next(params runParams) (bool, error) { - if next, err := r.source.plan.Next(params); !next { - return false, err - } - - r.run.curSourceRow = r.source.plan.Values() + panic("renderNode can't be run in local mode") +} - err := r.renderRow(params.EvalContext()) - return err == nil, err +func (r *renderNode) Values() tree.Datums { + panic("renderNode can't be run in local mode") } -func (r *renderNode) Values() tree.Datums { return r.run.row } func (r *renderNode) Close(ctx context.Context) { r.source.plan.Close(ctx) } // initFrom initializes the table node, given the parsed select expression @@ -566,22 +549,6 @@ func (r *renderNode) resetRenderColumns(exprs []tree.TypedExpr, cols sqlbase.Res r.columns = cols } -// renderRow renders the row by evaluating the render expressions. -func (r *renderNode) renderRow(evalCtx *tree.EvalContext) error { - if r.run.row == nil { - r.run.row = make([]tree.Datum, len(r.render)) - } - evalCtx.IVarContainer = r - for i, e := range r.render { - var err error - r.run.row[i], err = e.Eval(evalCtx) - if err != nil { - return err - } - } - return nil -} - // computePhysicalPropsForRender computes ordering information for the // render node, given ordering information for the "from" node. // diff --git a/pkg/sql/row_buffer.go b/pkg/sql/row_buffer.go deleted file mode 100644 index ff3e7f49852e..000000000000 --- a/pkg/sql/row_buffer.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2016 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. - -package sql - -import ( - "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" -) - -// RowBuffer is a buffer for rows of DTuples. 
Rows must be added using -// AddRow(), once the work is done the Close() method must be called to release -// the allocated memory. -// -// This is intended for nodes where it is simpler to compute a batch of rows at -// once instead of maintaining internal state in order to operate correctly -// under the constraints imposed by Next() and Values() under the planNode -// interface. -type RowBuffer struct { - *rowcontainer.RowContainer - output tree.Datums -} - -// Values here is analogous to Values() as defined under planNode. -// -// Available after Next(), result only valid until the next call to Next() -func (rb *RowBuffer) Values() tree.Datums { - return rb.output -} - -// Next here is analogous to Next() as defined under planNode, if no pre-computed -// results were buffered in prior to the call we return false. Else we stage the -// next output value for the subsequent call to Values(). -func (rb *RowBuffer) Next() bool { - if rb.Len() == 0 { - return false - } - rb.output = rb.At(0) - rb.PopFirst() - return true -} diff --git a/pkg/sql/row_source_to_plan_node.go b/pkg/sql/row_source_to_plan_node.go index e9139d2f8d75..1f1bf415d6e1 100644 --- a/pkg/sql/row_source_to_plan_node.go +++ b/pkg/sql/row_source_to_plan_node.go @@ -30,6 +30,8 @@ type rowSourceToPlanNode struct { source distsqlrun.RowSource forwarder metadataForwarder + // originalPlanNode is the original planNode that the wrapped RowSource got + // planned for. originalPlanNode planNode planCols sqlbase.ResultColumns diff --git a/pkg/sql/scan.go b/pkg/sql/scan.go index e27c5ef4253f..fbf3b3a9b8aa 100644 --- a/pkg/sql/scan.go +++ b/pkg/sql/scan.go @@ -23,12 +23,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsqlpb" "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" "github.com/cockroachdb/cockroach/pkg/sql/privilege" - "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/types" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/pkg/errors" ) @@ -103,7 +101,11 @@ type scanNode struct { // Should be set to true if sqlbase.ParallelScans is true. parallelScansEnabled bool - run scanRun + isSecondaryIndex bool + + // Indicates if this scanNode will do a physical data check. This is + // only true when running SCRUB commands. + isCheck bool // This struct must be allocated on the heap and its location stay // stable after construction because it implements @@ -180,7 +182,7 @@ func (p *planner) Scan() *scanNode { var _ tree.IndexedVarContainer = &scanNode{} func (n *scanNode) IndexedVarEval(idx int, ctx *tree.EvalContext) (tree.Datum, error) { - return n.run.row[idx].Eval(ctx) + panic("scanNode can't be run in local mode") } func (n *scanNode) IndexedVarResolvedType(idx int) types.T { @@ -191,36 +193,8 @@ func (n *scanNode) IndexedVarNodeFormatter(idx int) tree.NodeFormatter { return (*tree.Name)(&n.resultColumns[idx].Name) } -// scanRun contains the run-time state of scanNode during local execution. -type scanRun struct { - // Contains values for the current row. There is a 1-1 correspondence - // between resultColumns and values in row. - row tree.Datums - - // the index of the current row. - rowIndex int64 - - scanInitialized bool - isSecondaryIndex bool - - // Indicates if this scanNode will do a physical data check. This is - // only true when running SCRUB commands. 
- isCheck bool - - fetcher row.Fetcher -} - func (n *scanNode) startExec(params runParams) error { - tableArgs := row.FetcherTableArgs{ - Desc: n.desc, - Index: n.index, - ColIdxMap: n.colIdxMap, - IsSecondaryIndex: n.run.isSecondaryIndex, - Cols: n.cols, - ValNeededForCol: n.valNeededForCol.Copy(), - } - return n.run.fetcher.Init(n.reverse, false, /* returnRangeInfo */ - false /* isCheck */, ¶ms.p.alloc, tableArgs) + panic("scanNode can't be run in local mode") } func (n *scanNode) Close(context.Context) { @@ -229,35 +203,11 @@ func (n *scanNode) Close(context.Context) { } func (n *scanNode) Next(params runParams) (bool, error) { - tracing.AnnotateTrace() - if !n.run.scanInitialized { - if err := n.initScan(params); err != nil { - return false, err - } - } - - // We fetch one row at a time until we find one that passes the filter. - for n.hardLimit == 0 || n.run.rowIndex < n.hardLimit { - var err error - n.run.row, _, _, err = n.run.fetcher.NextRowDecoded(params.ctx) - if err != nil || n.run.row == nil { - return false, err - } - params.extendedEvalCtx.IVarContainer = n - passesFilter, err := sqlbase.RunFilter(n.filter, params.EvalContext()) - if err != nil { - return false, err - } - if passesFilter { - n.run.rowIndex++ - return true, nil - } - } - return false, nil + panic("scanNode can't be run in local mode") } func (n *scanNode) Values() tree.Datums { - return n.run.row + panic("scanNode can't be run in local mode") } // disableBatchLimit disables the kvfetcher batch limits. Used for index-join, @@ -282,27 +232,6 @@ func (n *scanNode) canParallelize() bool { n.parallelScansEnabled } -// initScan sets up the rowFetcher and starts a scan. -func (n *scanNode) initScan(params runParams) error { - limitHint := n.limitHint() - limitBatches := true - if n.canParallelize() || n.disableBatchLimits { - limitBatches = false - } - if err := n.run.fetcher.StartScan( - params.ctx, - params.p.txn, - n.spans, - limitBatches, - limitHint, - params.p.extendedEvalCtx.Tracing.KVTracingEnabled(), - ); err != nil { - return err - } - n.run.scanInitialized = true - return nil -} - func (n *scanNode) limitHint() int64 { var limitHint int64 if n.hardLimit != 0 { @@ -479,7 +408,6 @@ func (n *scanNode) initDescDefaults(planDeps planDependencies, colCfg scanColumn if len(n.cols) > 0 { n.valNeededForCol.AddRange(0, len(n.cols)-1) } - n.run.row = make([]tree.Datum, len(n.cols)) n.filterVars = tree.MakeIndexedVarHelper(n, len(n.cols)) return nil } diff --git a/pkg/sql/scrub_constraint.go b/pkg/sql/scrub_constraint.go index c4f1c10f85cb..6a9f0cf748e1 100644 --- a/pkg/sql/scrub_constraint.go +++ b/pkg/sql/scrub_constraint.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" @@ -47,9 +48,9 @@ type sqlCheckConstraintCheckOperation struct { // sqlCheckConstraintCheckRun contains the run-time state for // sqlCheckConstraintCheckOperation during local execution. 
type sqlCheckConstraintCheckRun struct { - started bool - rows planNode - hasRowsLeft bool + started bool + rows *rowcontainer.RowContainer + rowIndex int } func newSQLCheckConstraintCheckOperation( @@ -90,49 +91,53 @@ func (o *sqlCheckConstraintCheckOperation) Start(params runParams) error { // use the tableDesc we have, but this is a rare operation and the benefit // would be marginal compared to the work of the actual query, so the added // complexity seems unjustified. - rows, err := params.p.SelectClause(ctx, sel, nil /* orderBy */, nil, /* limit */ + plan, err := params.p.SelectClause(ctx, sel, nil /* orderBy */, nil, /* limit */ nil /* with */, nil /* desiredTypes */, publicColumns) if err != nil { return err } - rows, err = params.p.optimizePlan(ctx, rows, allColumns(rows)) + plan, err = params.p.optimizePlan(ctx, plan, allColumns(plan)) if err != nil { return err } - if err := startPlan(params, rows); err != nil { + planCtx := params.extendedEvalCtx.DistSQLPlanner.NewPlanningCtx(ctx, params.extendedEvalCtx, params.p.txn) + physPlan, err := scrubPlanDistSQL(ctx, planCtx, plan) + if err != nil { return err } - - // Collect all the columns. - for i := range o.tableDesc.Columns { - o.columns = append(o.columns, &o.tableDesc.Columns[i]) + columns := planColumns(plan) + columnTypes := make([]sqlbase.ColumnType, len(columns)) + for i := range planColumns(plan) { + columnTypes[i], err = sqlbase.DatumTypeToColumnType(columns[i].Typ) + if err != nil { + return err + } } - // Find the row indexes for all of the primary index columns. - if o.primaryColIdxs, err = getPrimaryColIdxs(o.tableDesc, o.columns); err != nil { + rows, err := scrubRunDistSQL(ctx, planCtx, params.p, physPlan, columnTypes) + if err != nil { + rows.Close(ctx) return err } o.run.started = true o.run.rows = rows - // Begin the first unit of work. This prepares the hasRowsLeft flag - // for the first iteration. - o.run.hasRowsLeft, err = o.run.rows.Next(params) + + // Collect all the columns. + for i := range o.tableDesc.Columns { + o.columns = append(o.columns, &o.tableDesc.Columns[i]) + } + // Find the row indexes for all of the primary index columns. + o.primaryColIdxs, err = getPrimaryColIdxs(o.tableDesc, o.columns) return err } // Next implements the checkOperation interface. func (o *sqlCheckConstraintCheckOperation) Next(params runParams) (tree.Datums, error) { - row := o.run.rows.Values() + row := o.run.rows.At(o.run.rowIndex) + o.run.rowIndex++ timestamp := tree.MakeDTimestamp( params.extendedEvalCtx.GetStmtTimestamp(), time.Nanosecond) - // Start the next unit of work. This is required so during the next - // call to Done() it is known whether there are any rows left. - var err error - if o.run.hasRowsLeft, err = o.run.rows.Next(params); err != nil { - return nil, err - } - var primaryKeyDatums tree.Datums for _, rowIdx := range o.primaryColIdxs { primaryKeyDatums = append(primaryKeyDatums, row[rowIdx]) @@ -171,7 +176,7 @@ func (o *sqlCheckConstraintCheckOperation) Started() bool { // Done implements the checkOperation interface. func (o *sqlCheckConstraintCheckOperation) Done(ctx context.Context) bool { - return o.run.rows == nil || !o.run.hasRowsLeft + return o.run.rows == nil || o.run.rowIndex >= o.run.rows.Len() } // Close implements the checkOperation interface. 
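For context: the scrub and validation call sites above now share one pattern — build the planNode, run it through the new runWithDistSQL helper added in pkg/sql/planner.go, and then walk the buffered rowcontainer.RowContainer by index instead of pulling rows through planNode.Next. A minimal sketch of that consumer side, assuming the usual pkg/sql planner context (the function name consumeBufferedRows and the elided error handling are illustrative only, not part of the patch):

    // consumeBufferedRows is a hypothetical helper showing how callers in
    // package sql consume the container returned by runWithDistSQL.
    func consumeBufferedRows(ctx context.Context, p *planner, plan planNode) error {
        rows, err := p.runWithDistSQL(ctx, plan)
        if err != nil {
            return err
        }
        // Every result row is buffered in memory; release it when done.
        defer rows.Close(ctx)
        for i := 0; i < rows.Len(); i++ {
            row := rows.At(i) // tree.Datums for the i-th buffered row
            _ = row
        }
        return nil
    }

Since runWithDistSQL buffers the full result set with no streaming, this pattern only suits small internal result sets like the scrub and foreign-key validation queries above; as its comment notes, general-purpose callers should prefer the internal executor.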
diff --git a/pkg/sql/scrub_physical.go b/pkg/sql/scrub_physical.go index 424254f5741c..47711a95e9d1 100644 --- a/pkg/sql/scrub_physical.go +++ b/pkg/sql/scrub_physical.go @@ -110,7 +110,7 @@ func (o *physicalCheckOperation) Start(params runParams) error { NoIndexJoin: true, } scan := params.p.Scan() - scan.run.isCheck = true + scan.isCheck = true colCfg := scanColumnsConfig{wantedColumns: columnIDs, addUnwantedAsHidden: true} if err := scan.initTable(ctx, params.p, o.tableDesc, indexFlags, colCfg); err != nil { return err diff --git a/pkg/sql/sem/tree/datum.go b/pkg/sql/sem/tree/datum.go index 333e888a75f7..64c47c3e85b1 100644 --- a/pkg/sql/sem/tree/datum.go +++ b/pkg/sql/sem/tree/datum.go @@ -142,13 +142,6 @@ type Datums []Datum // Len returns the number of Datum values. func (d Datums) Len() int { return len(d) } -// Reverse reverses the order of the Datum values. -func (d Datums) Reverse() { - for i, j := 0, d.Len()-1; i < j; i, j = i+1, j-1 { - d[i], d[j] = d[j], d[i] - } -} - // Format implements the NodeFormatter interface. func (d *Datums) Format(ctx *FmtCtx) { ctx.WriteByte('(') diff --git a/pkg/sql/sem/tree/eval.go b/pkg/sql/sem/tree/eval.go index ec58ed2710ba..5d6a6e89f2e8 100644 --- a/pkg/sql/sem/tree/eval.go +++ b/pkg/sql/sem/tree/eval.go @@ -2371,6 +2371,11 @@ type EvalPlanner interface { // this interface are always "session-bound" - they inherit session variables // from a parent session. type SessionBoundInternalExecutor interface { + // Query is part of the sqlutil.InternalExecutor interface. + Query( + ctx context.Context, opName string, txn *client.Txn, stmt string, qargs ...interface{}, + ) ([]Datums, error) + // QueryRow is part of the sqlutil.InternalExecutor interface. QueryRow( ctx context.Context, opName string, txn *client.Txn, stmt string, qargs ...interface{}, diff --git a/pkg/sql/sem/tree/expr.go b/pkg/sql/sem/tree/expr.go index d973fcfc0b1f..0a8faa08e9bd 100644 --- a/pkg/sql/sem/tree/expr.go +++ b/pkg/sql/sem/tree/expr.go @@ -1272,18 +1272,6 @@ func (node *FuncExpr) GetAggregateConstructor() func(*EvalContext, Datums) Aggre } } -// GetWindowConstructor returns a window function constructor if the -// FuncExpr is a built-in window function. -func (node *FuncExpr) GetWindowConstructor() func(*EvalContext) WindowFunc { - if node.fn == nil || node.fn.WindowFunc == nil { - return nil - } - return func(evalCtx *EvalContext) WindowFunc { - types := typesOfExprs(node.Exprs) - return node.fn.WindowFunc(types, evalCtx) - } -} - func typesOfExprs(exprs Exprs) []types.T { types := make([]types.T, len(exprs)) for i, expr := range exprs { diff --git a/pkg/sql/show_stats.go b/pkg/sql/show_stats.go index 80aa57695861..afe42a8aa4f3 100644 --- a/pkg/sql/show_stats.go +++ b/pkg/sql/show_stats.go @@ -65,7 +65,7 @@ func (p *planner) ShowTableStats(ctx context.Context, n *tree.ShowTableStats) (p // - convert column IDs to column names // - if the statistic has a histogram, we return the statistic ID as a // "handle" which can be used with SHOW HISTOGRAM. 
- rows, _ /* cols */, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( + rows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( ctx, "read-table-stats", p.txn, diff --git a/pkg/sql/sort.go b/pkg/sql/sort.go index 2f80ed03e3d0..8a3a29c93f7c 100644 --- a/pkg/sql/sort.go +++ b/pkg/sql/sort.go @@ -15,12 +15,9 @@ package sql import ( - "container/heap" "context" - "sort" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" - "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/types" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" @@ -37,8 +34,6 @@ type sortNode struct { ordering sqlbase.ColumnOrdering needSort bool - - run sortRun } // orderBy constructs a sortNode based on the ORDER BY clause. @@ -215,92 +210,20 @@ func (p *planner) orderBy( return &sortNode{columns: columns, ordering: ordering}, nil } -// sortRun contains the run-time state of sortNode during local -// execution. -type sortRun struct { - sortStrategy sortingStrategy - valueIter valueIterator -} - func (n *sortNode) startExec(runParams) error { - return nil + panic("sortNode cannot be run in local mode") } func (n *sortNode) Next(params runParams) (bool, error) { - cancelChecker := params.p.cancelChecker - - for n.needSort { - if vn, ok := n.plan.(*valuesNode); ok { - // The plan we wrap is already a values node. Just sort it. - v := &sortValues{ - ordering: n.ordering, - rows: vn.rows, - evalCtx: params.EvalContext(), - } - n.run.sortStrategy = newSortAllStrategy(v) - n.run.sortStrategy.Finish(params.ctx, cancelChecker) - // Sorting is done. Relinquish the reference on the row container, - // so as to avoid closing it twice. - n.run.sortStrategy = nil - // Fall through -- the remainder of the work is done by the - // valuesNode itself. - n.needSort = false - break - } else if n.run.sortStrategy == nil { - v := params.p.newSortValues(n.ordering, planColumns(n.plan), 0 /*capacity*/) - n.run.sortStrategy = newSortAllStrategy(v) - } - - if err := cancelChecker.Check(); err != nil { - return false, err - } - - // TODO(andrei): If we're scanning an index with a prefix matching an - // ordering prefix, we should only accumulate values for equal fields - // in this prefix, then sort the accumulated chunk and output. - // TODO(irfansharif): matching column ordering speed-ups from distsql, - // when implemented, could be repurposed and used here. - next, err := n.plan.Next(params) - if err != nil { - return false, err - } - if !next { - n.run.sortStrategy.Finish(params.ctx, cancelChecker) - n.run.valueIter = n.run.sortStrategy - n.needSort = false - break - } - - values := n.plan.Values() - if err := n.run.sortStrategy.Add(params.ctx, values); err != nil { - return false, err - } - } - - // Check again, in case sort returned early due to a cancellation. - if err := cancelChecker.Check(); err != nil { - return false, err - } - - if n.run.valueIter == nil { - n.run.valueIter = n.plan - } - return n.run.valueIter.Next(params) + panic("sortNode cannot be run in local mode") } func (n *sortNode) Values() tree.Datums { - // If an ordering expression was used the number of columns in each row might - // differ from the number of columns requested, so trim the result. 
- return n.run.valueIter.Values()[:len(n.columns)] + panic("sortNode cannot be run in local mode") } func (n *sortNode) Close(ctx context.Context) { n.plan.Close(ctx) - if n.run.sortStrategy != nil { - n.run.sortStrategy.Close(ctx) - } - // n.run.valueIter points to either n.plan or n.run.sortStrategy and thus has already - // been closed. } func ensureColumnOrderable(c sqlbase.ResultColumn) error { @@ -516,340 +439,3 @@ func (p *planner) colIndex(numOriginalCols int, expr tree.Expr, context string) } return int(ord), nil } - -// valueIterator provides iterative access to a value source's values and -// debug values. It is a subset of the planNode interface, so all methods -// should conform to the comments expressed in the planNode definition. -type valueIterator interface { - Next(runParams) (bool, error) - Values() tree.Datums - Close(ctx context.Context) -} - -type sortingStrategy interface { - valueIterator - // Add adds a single value to the sortingStrategy. It guarantees that - // if it decided to store the provided value, that it will make a deep - // copy of it. - Add(context.Context, tree.Datums) error - // Finish terminates the sorting strategy, allowing for postprocessing - // after all values have been provided to the strategy. The method should - // not be called more than once, and should only be called after all Add - // calls have occurred. - Finish(context.Context, *sqlbase.CancelChecker) -} - -// sortAllStrategy reads in all values into the wrapped valuesNode and -// uses sort.Sort to sort all values in-place. It has a worst-case time -// complexity of O(n*log(n)) and a worst-case space complexity of O(n). -// -// The strategy is intended to be used when all values need to be sorted. -type sortAllStrategy struct { - vNode *sortValues -} - -func newSortAllStrategy(vNode *sortValues) sortingStrategy { - return &sortAllStrategy{ - vNode: vNode, - } -} - -func (ss *sortAllStrategy) Add(ctx context.Context, values tree.Datums) error { - _, err := ss.vNode.rows.AddRow(ctx, values) - return err -} - -func (ss *sortAllStrategy) Finish(ctx context.Context, cancelChecker *sqlbase.CancelChecker) { - ss.vNode.SortAll(cancelChecker) -} - -func (ss *sortAllStrategy) Next(params runParams) (bool, error) { - return ss.vNode.Next(params) -} - -func (ss *sortAllStrategy) Values() tree.Datums { - return ss.vNode.Values() -} - -func (ss *sortAllStrategy) Close(ctx context.Context) { - ss.vNode.Close(ctx) -} - -// iterativeSortStrategy reads in all values into the wrapped sortValues -// and turns the underlying slice into a min-heap. It then pops a value -// off of the heap for each call to Next, meaning that it only needs to -// sort the number of values needed, instead of the entire slice. If the -// underlying value source provides n rows and the strategy produce only -// k rows, it has a worst-case time complexity of O(n + k*log(n)) and a -// worst-case space complexity of O(n). -// -// The strategy is intended to be used when an unknown number of values -// need to be sorted, but that most likely not all values need to be sorted. 
-type iterativeSortStrategy struct { - vNode *sortValues - lastVal tree.Datums - nextRowIdx int -} - -func newIterativeSortStrategy(vNode *sortValues) sortingStrategy { - return &iterativeSortStrategy{ - vNode: vNode, - } -} - -func (ss *iterativeSortStrategy) Add(ctx context.Context, values tree.Datums) error { - _, err := ss.vNode.rows.AddRow(ctx, values) - return err -} - -func (ss *iterativeSortStrategy) Finish(context.Context, *sqlbase.CancelChecker) { - ss.vNode.InitMinHeap() -} - -func (ss *iterativeSortStrategy) Next(runParams) (bool, error) { - if ss.vNode.Len() == 0 { - return false, nil - } - ss.lastVal = ss.vNode.PopValues() - ss.nextRowIdx++ - return true, nil -} - -func (ss *iterativeSortStrategy) Values() tree.Datums { - return ss.lastVal -} - -func (ss *iterativeSortStrategy) Close(ctx context.Context) { - ss.vNode.Close(ctx) -} - -// sortTopKStrategy creates a max-heap in its wrapped sortValues and keeps -// this heap populated with only the top k values seen. It accomplishes this -// by comparing new values (before the deep copy) with the top of the heap. -// If the new value is less than the current top, the top will be replaced -// and the heap will be fixed. If not, the new value is dropped. When finished, -// all values in the heap are popped, sorting the values correctly in-place. -// It has a worst-case time complexity of O(n*log(k)) and a worst-case space -// complexity of O(k). -// -// The strategy is intended to be used when exactly k values need to be sorted, -// where k is known before sorting begins. -// -// TODO(nvanbenschoten): There are better algorithms that can achieve a sorted -// top k in a worst-case time complexity of O(n + k*log(k)) while maintaining -// a worst-case space complexity of O(k). For instance, the top k can be found -// in linear time, and then this can be sorted in linearithmic time. -type sortTopKStrategy struct { - vNode *sortValues - topK int64 -} - -func newSortTopKStrategy(vNode *sortValues, topK int64) sortingStrategy { - ss := &sortTopKStrategy{ - vNode: vNode, - topK: topK, - } - ss.vNode.InitMaxHeap() - return ss -} - -func (ss *sortTopKStrategy) Add(ctx context.Context, values tree.Datums) error { - switch { - case int64(ss.vNode.Len()) < ss.topK: - // The first k values all go into the max-heap. - if err := ss.vNode.PushValues(ctx, values); err != nil { - return err - } - case ss.vNode.ValuesLess(values, ss.vNode.rows.At(0)): - // Once the heap is full, only replace the top - // value if a new value is less than it. If so - // replace and fix the heap. - if err := ss.vNode.rows.Replace(ctx, 0, values); err != nil { - return err - } - heap.Fix(ss.vNode, 0) - } - return nil -} - -func (ss *sortTopKStrategy) Finish(ctx context.Context, cancelChecker *sqlbase.CancelChecker) { - // Pop all values in the heap, resulting in the inverted ordering - // being sorted in reverse. Therefore, the slice is ordered correctly - // in-place. - for ss.vNode.Len() > 0 { - if cancelChecker.Check() != nil { - return - } - heap.Pop(ss.vNode) - } - ss.vNode.ResetLen() -} - -func (ss *sortTopKStrategy) Next(params runParams) (bool, error) { - return ss.vNode.Next(params) -} - -func (ss *sortTopKStrategy) Values() tree.Datums { - return ss.vNode.Values() -} - -func (ss *sortTopKStrategy) Close(ctx context.Context) { - ss.vNode.Close(ctx) -} - -// TODO(pmattis): If the result set is large, we might need to perform the -// sort on disk. There is no point in doing this while we're buffering the -// entire result set in memory. 
If/when we start streaming results back to -// the client we should revisit. -// -// type onDiskSortStrategy struct{} - -// sortValues is a reduced form of valuesNode for use in a sortStrategy. -type sortValues struct { - // invertSorting inverts the sorting predicate. - invertSorting bool - // ordering indicates the desired ordering of each column in the rows - ordering sqlbase.ColumnOrdering - // rows contains the columns during sorting. - rows *rowcontainer.RowContainer - - // evalCtx is needed because we use datum.Compare() which needs it, - // and the sort.Interface and heap.Interface do not allow - // introducing extra parameters into the Less() method. - evalCtx *tree.EvalContext - - // rowsPopped is used for heaps, it indicates the number of rows - // that were "popped". These rows are still part of the underlying - // rowcontainer.RowContainer, in the range [rows.Len()-n.rowsPopped, - // rows.Len). - rowsPopped int - - // nextRow is used while iterating. - nextRow int -} - -var _ valueIterator = &sortValues{} -var _ sort.Interface = &sortValues{} -var _ heap.Interface = &sortValues{} - -func (p *planner) newSortValues( - ordering sqlbase.ColumnOrdering, columns sqlbase.ResultColumns, capacity int, -) *sortValues { - return &sortValues{ - ordering: ordering, - rows: rowcontainer.NewRowContainer( - p.EvalContext().Mon.MakeBoundAccount(), - sqlbase.ColTypeInfoFromResCols(columns), - capacity, - ), - evalCtx: p.EvalContext(), - } -} - -// Values implement the valuesIterator interface. -func (n *sortValues) Values() tree.Datums { - return n.rows.At(n.nextRow - 1) -} - -// Next implements the valuesIterator interface. -func (n *sortValues) Next(runParams) (bool, error) { - if n.nextRow >= n.rows.Len() { - return false, nil - } - n.nextRow++ - return true, nil -} - -// Close implements the valuesIterator interface. -func (n *sortValues) Close(ctx context.Context) { - n.rows.Close(ctx) -} - -// Len implements the sort.Interface interface. -func (n *sortValues) Len() int { - return n.rows.Len() - n.rowsPopped -} - -// ValuesLess returns the comparison result between the two provided -// Datums slices in the context of the sortValues ordering. -func (n *sortValues) ValuesLess(ra, rb tree.Datums) bool { - return sqlbase.CompareDatums(n.ordering, n.evalCtx, ra, rb) < 0 -} - -// Less implements the sort.Interface interface. -func (n *sortValues) Less(i, j int) bool { - // TODO(pmattis): An alternative to this type of field-based comparison would - // be to construct a sort-key per row using encodeTableKey(). Using a - // sort-key approach would likely fit better with a disk-based sort. - ra, rb := n.rows.At(i), n.rows.At(j) - return n.invertSorting != n.ValuesLess(ra, rb) -} - -// Swap implements the sort.Interface interface. -func (n *sortValues) Swap(i, j int) { - n.rows.Swap(i, j) -} - -// Push implements the heap.Interface interface. -func (n *sortValues) Push(x interface{}) { - // We can't push to the heap via heap.Push because that doesn't - // allow us to return an error. Instead we use PushValues(), which - // adds the value *then* calls heap.Push. By the time control - // arrives here, the value is already added, so there is nothing - // left to do. -} - -// PushValues pushes the given Datums value into the heap -// representation of the sortValues. -func (n *sortValues) PushValues(ctx context.Context, values tree.Datums) error { - _, err := n.rows.AddRow(ctx, values) - // We still need to call heap.Push() to sort the heap. 
- heap.Push(n, nil) - return err -} - -// Pop implements the heap.Interface interface. -func (n *sortValues) Pop() interface{} { - if n.rowsPopped >= n.rows.Len() { - panic("no more rows to pop") - } - n.rowsPopped++ - // Returning a Datums as an interface{} involves an allocation. Luckily, the - // value of Pop is only used for the return value of heap.Pop, which we can - // avoid using. - return nil -} - -// PopValues pops the top Datums value off the heap representation -// of the sortValues. We avoid using heap.Pop() directly to -// avoid allocating an interface{}. -func (n *sortValues) PopValues() tree.Datums { - heap.Pop(n) - // Return the last popped row. - return n.rows.At(n.rows.Len() - n.rowsPopped) -} - -// ResetLen resets the length to that of the underlying row -// container. This resets the effect that popping values had on the -// sortValues's visible length. -func (n *sortValues) ResetLen() { - n.rowsPopped = 0 -} - -// SortAll sorts all values in the sortValues.rows slice. -func (n *sortValues) SortAll(cancelChecker *sqlbase.CancelChecker) { - n.invertSorting = false - sqlbase.Sort(n, cancelChecker) -} - -// InitMaxHeap initializes the sortValues.rows slice as a max-heap. -func (n *sortValues) InitMaxHeap() { - n.invertSorting = true - heap.Init(n) -} - -// InitMinHeap initializes the sortValues.rows slice as a min-heap. -func (n *sortValues) InitMinHeap() { - n.invertSorting = false - heap.Init(n) -} diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index bfc9c5bde13d..3fb69fa11144 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -47,6 +47,15 @@ type InternalExecutor interface { // If txn is not nil, the statement will be executed in the respective txn. Query( ctx context.Context, opName string, txn *client.Txn, statement string, qargs ...interface{}, + ) ([]tree.Datums, error) + + // QueryWithCols executes the supplied SQL statement and returns the resulting + // rows and their column types. + // The statement is executed as the root user. + // + // If txn is not nil, the statement will be executed in the respective txn. 
+ QueryWithCols( + ctx context.Context, opName string, txn *client.Txn, statement string, qargs ...interface{}, ) ([]tree.Datums, sqlbase.ResultColumns, error) // QueryRow is like Query, except it returns a single row, or nil if not row is diff --git a/pkg/sql/stats/automatic_stats.go b/pkg/sql/stats/automatic_stats.go index 627b6d028425..c8f9e580ad57 100644 --- a/pkg/sql/stats/automatic_stats.go +++ b/pkg/sql/stats/automatic_stats.go @@ -223,7 +223,7 @@ func (r *Refresher) ensureAllTables(ctx context.Context, settings *settings.Valu return } - rows, _ /* columns */, err := r.ex.Query( + rows, err := r.ex.Query( ctx, "get-tables", nil, /* txn */ diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go index 692599d7def2..a2a5ec2f7b0b 100644 --- a/pkg/sql/stats/stats_cache.go +++ b/pkg/sql/stats/stats_cache.go @@ -297,7 +297,7 @@ FROM system.table_statistics WHERE "tableID" = $1 ORDER BY "createdAt" DESC ` - rows, _ /* cols */, err := sc.SQLExecutor.Query( + rows, err := sc.SQLExecutor.Query( ctx, "get-table-statistics", nil /* txn */, getTableStatisticsStmt, tableID, ) if err != nil { diff --git a/pkg/sql/subquery.go b/pkg/sql/subquery.go index 4321a9e067f7..87603d87a0db 100644 --- a/pkg/sql/subquery.go +++ b/pkg/sql/subquery.go @@ -16,14 +16,11 @@ package sql import ( "context" - "fmt" "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/types" - "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" - "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -56,147 +53,6 @@ func (p *planner) EvalSubquery(expr *tree.Subquery) (result tree.Datum, err erro return s.result, nil } -func (p *planTop) evalSubqueries(params runParams) error { - for i := range p.subqueryPlans { - sq := &p.subqueryPlans[i] - if sq.started { - // Already started. Nothing to do. - continue - } - - if !sq.expanded { - return pgerror.NewAssertionErrorf("subquery %d (%q) was not expanded properly", i+1, sq.subquery) - } - - if log.V(2) { - log.Infof(params.ctx, "starting subquery %d (%q)", i+1, sq.subquery) - } - - if err := startPlan(params, sq.plan); err != nil { - return err - } - sq.started = true - res, err := sq.doEval(params) - if err != nil { - return err - } - sq.result = res - } - return nil -} - -func (s *subquery) doEval(params runParams) (result tree.Datum, err error) { - switch s.execMode { - case distsqlrun.SubqueryExecModeExists: - // For EXISTS expressions, all we want to know is if there is at least one - // row. - hasRow, err := s.plan.Next(params) - if err != nil { - return nil, err - } - return tree.MakeDBool(tree.DBool(hasRow)), nil - - case distsqlrun.SubqueryExecModeAllRows, distsqlrun.SubqueryExecModeAllRowsNormalized: - var rows tree.DTuple - next, err := s.plan.Next(params) - for ; next; next, err = s.plan.Next(params) { - values := s.plan.Values() - switch len(values) { - case 1: - // This seems hokey, but if we don't do this then the subquery expands - // to a tuple of tuples instead of a tuple of values and an expression - // like "k IN (SELECT foo FROM bar)" will fail because we're comparing - // a single value against a tuple. - rows.D = append(rows.D, values[0]) - default: - // The result from plan.Values() is only valid until the next call to - // plan.Next(), so make a copy. 
- typ := s.subquery.ResolvedType().(types.TTuple) - valuesCopy := tree.NewDTupleWithLen(typ, len(values)) - copy(valuesCopy.D, values) - rows.D = append(rows.D, valuesCopy) - } - } - if err != nil { - return nil, err - } - - if ok, dir := s.subqueryTupleOrdering(); ok { - if dir == encoding.Descending { - rows.D.Reverse() - } - rows.SetSorted() - } - if s.execMode == distsqlrun.SubqueryExecModeAllRowsNormalized { - rows.Normalize(params.EvalContext()) - } - return &rows, nil - - case distsqlrun.SubqueryExecModeOneRow: - hasRow, err := s.plan.Next(params) - if err != nil { - return nil, err - } - if !hasRow { - return tree.DNull, nil - } - values := s.plan.Values() - switch len(values) { - case 1: - result = values[0] - default: - // We can skip initializing the Types sub-field here: it will be - // populated upon first access to DTuple.ResolvedType(), as per - // contract of DTuple.typ. - typ := s.subquery.ResolvedType().(types.TTuple) - valuesCopy := tree.NewDTupleWithLen(typ, len(values)) - copy(valuesCopy.D, values) - result = valuesCopy - } - return result, nil - - default: - panic(fmt.Sprintf("unexpected subqueryExecMode: %d", s.execMode)) - } -} - -// subqueryTupleOrdering returns whether the rows of the subquery are ordered -// such that the resulting subquery tuple can be considered fully sorted. -// For this to happen, the columns in the subquery must be sorted in the same -// direction and with the same order of precedence that the tuple will have. The -// method will return a boolean specifying whether the result is in sorted order, -// and if so, will specify which direction it is sorted in. -// -// TODO(knz): This will not work for subquery renders that are not row dependent -// like -// SELECT 1 IN (SELECT 1 ORDER BY 1) -// because even if they are included in an ORDER BY clause, they will not be part -// of the plan.Ordering(). -func (s *subquery) subqueryTupleOrdering() (bool, encoding.Direction) { - // Columns must be sorted in the order that they appear in the render - // and which they will later appear in the resulting tuple. - desired := make(sqlbase.ColumnOrdering, len(planColumns(s.plan))) - for i := range desired { - desired[i] = sqlbase.ColumnOrderInfo{ - ColIdx: i, - Direction: encoding.Ascending, - } - } - - // Check Ascending direction. - order := planPhysicalProps(s.plan) - match := order.computeMatch(desired) - if match == len(desired) { - return true, encoding.Ascending - } - // Check Descending direction. - match = order.reverse().computeMatch(desired) - if match == len(desired) { - return true, encoding.Descending - } - return false, 0 -} - // analyzeSubqueries finds tree.Subquery syntax nodes; for each one, it builds // an initial plan, adds an entry in planTop.subqueryPlans, and annotates the // Subquery node with a type and a link (Idx) to that entry. diff --git a/pkg/sql/subquery_test.go b/pkg/sql/subquery_test.go deleted file mode 100644 index 73e668fb0dd7..000000000000 --- a/pkg/sql/subquery_test.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2015 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. 
See the License for the specific language governing -// permissions and limitations under the License. - -package sql - -import ( - "context" - "testing" - - "github.com/cockroachdb/cockroach/pkg/sql/parser" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" -) - -// Test that starting the subqueries returns an error if the evaluation of a -// subquery returns an error. -func TestStartSubqueriesReturnsError(t *testing.T) { - defer leaktest.AfterTest(t)() - sql := "SELECT 1 WHERE (SELECT crdb_internal.force_error('xxx', 'forced') > 0)" - p := makeTestPlanner() - stmt, err := parser.ParseOne(sql) - if err != nil { - t.Fatal(err) - } - p.stmt = &Statement{Statement: stmt} - if err := p.makePlan(context.TODO()); err != nil { - t.Fatal(err) - } - params := runParams{ctx: context.TODO(), p: p, extendedEvalCtx: &p.extendedEvalCtx} - if err := p.curPlan.start(params); !testutils.IsError(err, `forced`) { - t.Fatalf("expected error from force_error(), got: %v", err) - } - p.curPlan.close(context.TODO()) -} diff --git a/pkg/sql/table.go b/pkg/sql/table.go index d170f0afbc5c..fafceb04f6e3 100644 --- a/pkg/sql/table.go +++ b/pkg/sql/table.go @@ -59,7 +59,7 @@ type namespaceKey struct { // system.namespace. func (p *planner) getAllNames(ctx context.Context) (map[sqlbase.ID]namespaceKey, error) { namespace := map[sqlbase.ID]namespaceKey{} - rows, _ /* cols */, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( + rows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( ctx, "get-all-names", p.txn, `SELECT id, "parentID", name FROM system.namespace`, ) diff --git a/pkg/sql/union.go b/pkg/sql/union.go index 4d722872329f..2e286d120fd0 100644 --- a/pkg/sql/union.go +++ b/pkg/sql/union.go @@ -65,8 +65,7 @@ import ( type unionNode struct { // right and left are the data source operands. // right is read first, to populate the `emit` field. - right, left planNode - rightClosed, leftClosed bool + right, left planNode // columns contains the metadata for the results of this node. columns sqlbase.ResultColumns @@ -76,16 +75,11 @@ type unionNode struct { // emitAll is a performance optimization for UNION ALL. When set // the union logic avoids the `emit` logic entirely. emitAll bool - // emit contains the rows seen on the right so far and performs the - // selection/filtering logic. - emit unionNodeEmit // unionType is the type of operation (UNION, INTERSECT, EXCEPT) unionType tree.UnionType // all indicates if the operation is the ALL or DISTINCT version all bool - - run unionRun } // Union constructs a planNode from a UNION/INTERSECT/EXCEPT expression. @@ -107,27 +101,14 @@ func (p *planner) Union( func (p *planner) newUnionNode( typ tree.UnionType, all bool, left, right planNode, ) (planNode, error) { - var emitAll = false - var emit unionNodeEmit + emitAll := false switch typ { case tree.UnionOp: if all { emitAll = true - } else { - emit = make(unionNodeEmitDistinct) } case tree.IntersectOp: - if all { - emit = make(intersectNodeEmitAll) - } else { - emit = make(intersectNodeEmitDistinct) - } case tree.ExceptOp: - if all { - emit = make(exceptNodeEmitAll) - } else { - emit = make(exceptNodeEmitDistinct) - } default: return nil, errors.Errorf("%v is not supported", typ) } @@ -177,179 +158,25 @@ func (p *planner) newUnionNode( columns: unionColumns, inverted: inverted, emitAll: emitAll, - emit: emit, unionType: typ, all: all, } return node, nil } -// unionRun contains the run-time state of unionNode during local execution. 
-type unionRun struct { - // scratch is a preallocated buffer for formatting the key of the - // current row on the right. - scratch []byte -} - func (n *unionNode) startExec(params runParams) error { - n.run.scratch = make([]byte, 0) - return nil + panic("unionNode cannot be run in local mode") } func (n *unionNode) Next(params runParams) (bool, error) { - if err := params.p.cancelChecker.Check(); err != nil { - return false, err - } - if !n.rightClosed { - return n.readRight(params) - } - if !n.leftClosed { - return n.readLeft(params) - } - return false, nil + panic("unionNode cannot be run in local mode") } func (n *unionNode) Values() tree.Datums { - if !n.rightClosed { - return n.right.Values() - } - if !n.leftClosed { - return n.left.Values() - } - return nil + panic("unionNode cannot be run in local mode") } func (n *unionNode) Close(ctx context.Context) { n.right.Close(ctx) n.left.Close(ctx) } - -func (n *unionNode) readRight(params runParams) (bool, error) { - next, err := n.right.Next(params) - for ; next; next, err = n.right.Next(params) { - if n.emitAll { - return true, nil - } - n.run.scratch = n.run.scratch[:0] - if n.run.scratch, err = sqlbase.EncodeDatumsKeyAscending( - n.run.scratch, n.right.Values()); err != nil { - return false, err - } - // TODO(dan): Sending the entire encodeDTuple to be stored in the map would - // use a lot of memory for big rows or big resultsets. Consider using a hash - // of the bytes instead. - if n.emit.emitRight(n.run.scratch) { - return true, nil - } - } - if err != nil { - return false, err - } - - n.rightClosed = true - return n.readLeft(params) -} - -func (n *unionNode) readLeft(params runParams) (bool, error) { - next, err := n.left.Next(params) - for ; next; next, err = n.left.Next(params) { - if n.emitAll { - return true, nil - } - n.run.scratch = n.run.scratch[:0] - if n.run.scratch, err = sqlbase.EncodeDatumsKeyAscending( - n.run.scratch, n.left.Values()); err != nil { - return false, err - } - if n.emit.emitLeft(n.run.scratch) { - return true, nil - } - } - if err != nil { - return false, err - } - - n.leftClosed = true - return false, nil -} - -// unionNodeEmit represents the emitter logic for one of the six combinations of -// UNION/INTERSECT/EXCEPT and ALL/DISTINCT. As right and then left are iterated, -// state is kept and used to compute the set operation as well as distinctness. -type unionNodeEmit interface { - emitRight([]byte) bool - emitLeft([]byte) bool -} - -type unionNodeEmitDistinct map[string]int -type intersectNodeEmitAll map[string]int -type intersectNodeEmitDistinct map[string]int -type exceptNodeEmitAll map[string]int -type exceptNodeEmitDistinct map[string]int - -// NB: the compiler optimizes out the string allocation in -// `myMap[string(myBytes)]`. 
See: -// https://github.com/golang/go/commit/f5f5a8b6209f84961687d993b93ea0d397f5d5bf -func (e unionNodeEmitDistinct) emitRight(b []byte) bool { - _, ok := e[string(b)] - e[string(b)] = 1 - return !ok -} - -func (e unionNodeEmitDistinct) emitLeft(b []byte) bool { - _, ok := e[string(b)] - e[string(b)] = 1 - return !ok -} - -func (e intersectNodeEmitAll) emitRight(b []byte) bool { - e[string(b)]++ - return false -} - -func (e intersectNodeEmitAll) emitLeft(b []byte) bool { - if v, ok := e[string(b)]; ok && v > 0 { - e[string(b)]-- - return true - } - return false -} - -func (e intersectNodeEmitDistinct) emitRight(b []byte) bool { - e[string(b)]++ - return false -} - -func (e intersectNodeEmitDistinct) emitLeft(b []byte) bool { - if v, ok := e[string(b)]; ok && v > 0 { - e[string(b)] = 0 - return true - } - return false -} - -func (e exceptNodeEmitAll) emitRight(b []byte) bool { - e[string(b)]++ - return false -} - -func (e exceptNodeEmitAll) emitLeft(b []byte) bool { - if v, ok := e[string(b)]; ok && v > 0 { - e[string(b)]-- - return false - } - return true -} - -func (e exceptNodeEmitDistinct) emitRight(b []byte) bool { - e[string(b)]++ - return false -} - -func (e exceptNodeEmitDistinct) emitLeft(b []byte) bool { - if _, ok := e[string(b)]; !ok { - e[string(b)] = 0 - return true - } - return false -} diff --git a/pkg/sql/user.go b/pkg/sql/user.go index ed6f71d200ce..c8e656193433 100644 --- a/pkg/sql/user.go +++ b/pkg/sql/user.go @@ -51,7 +51,7 @@ func GetUserHashedPassword( // The map value is true if the map key is a role, false if it is a user. func (p *planner) GetAllUsersAndRoles(ctx context.Context) (map[string]bool, error) { query := `SELECT username,"isRole" FROM system.users` - rows, _ /* cols */, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( + rows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query( ctx, "read-users", p.txn, query) if err != nil { return nil, err diff --git a/pkg/sql/values_test.go b/pkg/sql/values_test.go index 83721fd4503b..283ab083d64d 100644 --- a/pkg/sql/values_test.go +++ b/pkg/sql/values_test.go @@ -157,7 +157,7 @@ func TestValues(t *testing.T) { t.Fatalf("%d: unexpected error in optimizePlan: %v", i, err) } params := runParams{ctx: ctx, p: p, extendedEvalCtx: &p.extendedEvalCtx} - if err := startPlan(params, plan); err != nil { + if err := startExec(params, plan); err != nil { t.Fatalf("%d: unexpected error in Start: %v", i, err) } var rows []tree.Datums diff --git a/pkg/sql/walk.go b/pkg/sql/walk.go index c61f5e65d6ca..253c11037169 100644 --- a/pkg/sql/walk.go +++ b/pkg/sql/walk.go @@ -355,12 +355,6 @@ func (v *planVisitor) visitInternal(plan planNode, name string) { order.addOrderColumn(o.ColIdx, o.Direction) } v.observer.attr(name, "order", order.AsString(columns)) - switch ss := n.run.sortStrategy.(type) { - case *iterativeSortStrategy: - v.observer.attr(name, "strategy", "iterative") - case *sortTopKStrategy: - v.observer.attr(name, "strategy", fmt.Sprintf("top %d", ss.topK)) - } } n.plan = v.visit(n.plan) diff --git a/pkg/sql/window.go b/pkg/sql/window.go index 6b8dea81ac7f..0525168f54a1 100644 --- a/pkg/sql/window.go +++ b/pkg/sql/window.go @@ -17,17 +17,13 @@ package sql import ( "context" "fmt" - "sort" - "unsafe" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" - "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" 
"github.com/cockroachdb/cockroach/pkg/sql/sem/types" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" - "github.com/cockroachdb/cockroach/pkg/util/duration" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/mon" ) @@ -162,8 +158,7 @@ type windowRun struct { wrappedRenderVals *rowcontainer.RowContainer // The populated values for this windowNode. - values valuesNode - populated bool + values valuesNode windowValues [][]tree.Datum curRowIdx int @@ -173,48 +168,15 @@ type windowRun struct { } func (n *windowNode) startExec(params runParams) error { - n.run.windowsAcc = params.EvalContext().Mon.MakeBoundAccount() - - return nil + panic("windowNode can't be run in local mode") } func (n *windowNode) Next(params runParams) (bool, error) { - for !n.run.populated { - if err := params.p.cancelChecker.Check(); err != nil { - return false, err - } - - next, err := n.plan.Next(params) - if err != nil { - return false, err - } - if !next { - n.run.populated = true - if err := n.computeWindows(params.ctx, params.EvalContext()); err != nil { - return false, err - } - n.run.values.rows = rowcontainer.NewRowContainer( - params.EvalContext().Mon.MakeBoundAccount(), - sqlbase.ColTypeInfoFromResCols(n.run.values.columns), - n.run.wrappedRenderVals.Len(), - ) - if err := n.populateValues(params.ctx, params.EvalContext()); err != nil { - return false, err - } - break - } - - values := n.plan.Values() - if _, err := n.run.wrappedRenderVals.AddRow(params.ctx, values); err != nil { - return false, err - } - } - - return n.run.values.Next(params) + panic("windowNode can't be run in local mode") } func (n *windowNode) Values() tree.Datums { - return n.run.values.Values() + panic("windowNode can't be run in local mode") } func (n *windowNode) Close(ctx context.Context) { @@ -592,366 +554,6 @@ func (n *windowNode) replaceIndexVarsAndAggFuncs(s *renderNode) { } } -type partitionSorter struct { - evalCtx *tree.EvalContext - rows indexedRows - windowDefVals *rowcontainer.RowContainer - ordering sqlbase.ColumnOrdering -} - -// partitionSorter implements the sort.Interface interface. -func (n *partitionSorter) Len() int { return n.rows.Len() } -func (n *partitionSorter) Swap(i, j int) { - n.rows.rows[i], n.rows.rows[j] = n.rows.rows[j], n.rows.rows[i] -} -func (n *partitionSorter) Less(i, j int) bool { return n.Compare(i, j) < 0 } - -// partitionSorter implements the PeerGroupChecker interface. -func (n *partitionSorter) InSameGroup(i, j int) (bool, error) { return n.Compare(i, j) == 0, nil } - -func (n *partitionSorter) Compare(i, j int) int { - ra, rb := n.rows.rows[i], n.rows.rows[j] - defa, defb := n.windowDefVals.At(ra.idx), n.windowDefVals.At(rb.idx) - for _, o := range n.ordering { - da := defa[o.ColIdx] - db := defb[o.ColIdx] - if c := da.Compare(n.evalCtx, db); c != 0 { - if o.Direction != encoding.Ascending { - return -c - } - return c - } - } - return 0 -} - -type allPeers struct{} - -// allPeers implements the PeerGroupChecker interface. -func (allPeers) InSameGroup(i, j int) (bool, error) { return true, nil } - -// computeWindows populates n.run.windowValues, adding a column of values to the -// 2D-slice for each window function in n.funcs. This needs to be performed -// all at once because in order to compute the result of a window function -// for any single row, we need to have access to all rows at the same time. -// -// The state shared between rows while computing all window functions for a -// single row is not easily extracted for two reasons: -// 1. 
window functions can define different partitioning attributes -// 2. window functions can define different column orderings within partitions -// -// The general structure is: -// for each window function -// compute partitions -// for each partition -// sort partition -// evaluate window frame over partition per cell, keeping track of peer groups -func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalContext) error { - rowCount := n.run.wrappedRenderVals.Len() - if rowCount == 0 { - return nil - } - - windowCount := len(n.funcs) - - winValSz := uintptr(rowCount) * unsafe.Sizeof([]tree.Datum{}) - winAllocSz := uintptr(rowCount*windowCount) * unsafe.Sizeof(tree.Datum(nil)) - if err := n.run.windowsAcc.Grow(ctx, int64(winValSz+winAllocSz)); err != nil { - return err - } - - n.run.windowValues = make([][]tree.Datum, rowCount) - windowAlloc := make([]tree.Datum, rowCount*windowCount) - for i := range n.run.windowValues { - n.run.windowValues[i] = windowAlloc[i*windowCount : (i+1)*windowCount] - } - - var scratchBytes []byte - var scratchDatum []tree.Datum - for windowIdx, windowFn := range n.funcs { - frameRun := &tree.WindowFrameRun{ - ArgIdxStart: windowFn.argIdxStart, - ArgCount: windowFn.argCount, - RowIdx: 0, - FilterColIdx: windowFn.filterColIdx, - } - if n.run.windowFrames[windowIdx] != nil { - frameRun.Frame = n.run.windowFrames[windowIdx] - // OffsetExpr's must be integer expressions not containing any variables, aggregate functions, or window functions, - // so we need to make sure these expressions are evaluated before using offsets. - bounds := frameRun.Frame.Bounds - if bounds.StartBound.HasOffset() { - typedStartOffset := bounds.StartBound.OffsetExpr.(tree.TypedExpr) - dStartOffset, err := typedStartOffset.Eval(evalCtx) - if err != nil { - return err - } - if dStartOffset == tree.DNull { - return pgerror.NewErrorf(pgerror.CodeNullValueNotAllowedError, "frame starting offset must not be null") - } - if isNegative(evalCtx, dStartOffset) { - if frameRun.Frame.Mode == tree.RANGE { - return pgerror.NewErrorf(pgerror.CodeInvalidWindowFrameOffsetError, "invalid preceding or following size in window function") - } - return pgerror.NewErrorf(pgerror.CodeInvalidWindowFrameOffsetError, "frame starting offset must not be negative") - } - frameRun.StartBoundOffset = dStartOffset - } - if bounds.EndBound != nil && bounds.EndBound.HasOffset() { - typedEndOffset := bounds.EndBound.OffsetExpr.(tree.TypedExpr) - dEndOffset, err := typedEndOffset.Eval(evalCtx) - if err != nil { - return err - } - if dEndOffset == tree.DNull { - return pgerror.NewErrorf(pgerror.CodeNullValueNotAllowedError, "frame ending offset must not be null") - } - if isNegative(evalCtx, dEndOffset) { - if frameRun.Frame.Mode == tree.RANGE { - return pgerror.NewErrorf(pgerror.CodeInvalidWindowFrameOffsetError, "invalid preceding or following size in window function") - } - return pgerror.NewErrorf(pgerror.CodeInvalidWindowFrameOffsetError, "frame ending offset must not be negative") - } - frameRun.EndBoundOffset = dEndOffset - } - if frameRun.RangeModeWithOffsets() { - frameRun.OrdColIdx = windowFn.columnOrdering[0].ColIdx - frameRun.OrdDirection = windowFn.columnOrdering[0].Direction - - colTyp := windowFn.ordColTyp - // Type of offset depends on the ordering column's type. - offsetTyp := colTyp - if types.IsDateTimeType(colTyp) { - // For datetime related ordering columns, offset must be an Interval. 
- offsetTyp = types.Interval - } - plusOp, minusOp, found := tree.WindowFrameRangeOps{}.LookupImpl(colTyp, offsetTyp) - if !found { - return pgerror.NewErrorf(pgerror.CodeWindowingError, "given logical offset cannot be combined with ordering column") - } - frameRun.PlusOp, frameRun.MinusOp = plusOp, minusOp - } - } - - partitions := make(map[string]indexedRows) - - if len(windowFn.partitionIdxs) == 0 { - // If no partition indexes are included for the window function, all - // rows are added to the same partition, which need to be pre-allocated. - sz := int64(uintptr(rowCount)*unsafe.Sizeof(indexedRow{}) + unsafe.Sizeof(indexedRows{})) - if err := n.run.windowsAcc.Grow(ctx, sz); err != nil { - return err - } - partitions[""] = indexedRows{rows: make([]indexedRow, rowCount)} - } - - if num := len(windowFn.partitionIdxs); num > cap(scratchDatum) { - sz := int64(uintptr(num) * unsafe.Sizeof(tree.Datum(nil))) - if err := n.run.windowsAcc.Grow(ctx, sz); err != nil { - return err - } - scratchDatum = make([]tree.Datum, num) - } else { - scratchDatum = scratchDatum[:num] - } - - // Partition rows into separate partitions based on hash values of the - // window function's PARTITION BY attribute. - // - // TODO(nvanbenschoten): Window functions with the same window definition - // can share partition and sorting work. - // See Cao et al. [http://vldb.org/pvldb/vol5/p1244_yucao_vldb2012.pdf] - for rowI := 0; rowI < rowCount; rowI++ { - row := n.run.wrappedRenderVals.At(rowI) - // We need the whole row and not just arguments to window functions since - // in RANGE mode we might need access to the column over which the rows - // are sorted, and all such columns come after all arguments to window - // functions. - entry := indexedRow{idx: rowI, row: row} - if len(windowFn.partitionIdxs) == 0 { - // If no partition indexes are included for the window function, all - // rows are added to the same partition. - partitions[""].rows[rowI] = entry - } else { - // If the window function has partition indexes, we hash the values of each - // of these indexes for each row, and partition based on this hashed value. - for i, idx := range windowFn.partitionIdxs { - scratchDatum[i] = row[idx] - } - - encoded, err := sqlbase.EncodeDatumsKeyAscending(scratchBytes, scratchDatum) - if err != nil { - return err - } - - sz := int64(uintptr(len(encoded)) + unsafe.Sizeof(entry)) - if err := n.run.windowsAcc.Grow(ctx, sz); err != nil { - return err - } - partition := partitions[string(encoded)] - partition.rows = append(partition.rows, entry) - partitions[string(encoded)] = partition - scratchBytes = encoded[:0] - } - } - - // For each partition, perform necessary sorting based on the window function's - // ORDER BY attribute. After this, perform the window function computation for - // each tuple and save the result in n.run.windowValues. - // - // TODO(nvanbenschoten) - // - Investigate inter- and intra-partition parallelism - // - Investigate more efficient aggregation techniques - // * Removable Cumulative - // * Segment Tree - // See Leis et al. [http://www.vldb.org/pvldb/vol8/p1058-leis.pdf] - for _, partition := range partitions { - builtin := windowFn.expr.GetWindowConstructor()(evalCtx) - defer builtin.Close(ctx, evalCtx) - - var peerGrouper tree.PeerGroupChecker - if windowFn.columnOrdering != nil { - // If an ORDER BY clause is provided, order the partition and use the - // sorter as our PeerGroupChecker. 
- sorter := &partitionSorter{ - evalCtx: evalCtx, - rows: partition, - windowDefVals: n.run.wrappedRenderVals, - ordering: windowFn.columnOrdering, - } - // The sort needs to be deterministic because multiple window functions with - // syntactically equivalent ORDER BY clauses in their window definitions - // need to be guaranteed to be evaluated in the same order, even if the - // ORDER BY *does not* uniquely determine an ordering. In the future, this - // could be guaranteed by only performing a single pass over a sorted partition - // for functions with syntactically equivalent PARTITION BY and ORDER BY clauses. - sort.Sort(sorter) - peerGrouper = sorter - } else { - // If ORDER BY clause is not provided, all rows are peers. - peerGrouper = allPeers{} - } - - frameRun.Rows = partition - frameRun.RowIdx = 0 - - if !frameRun.IsDefaultFrame() { - // We have a custom frame not equivalent to default one, so if we have - // an aggregate function, we want to reset it for each row. - // Not resetting is an optimization since we're not computing - // the result over the whole frame but only as a result of the current - // row and previous results of aggregation. - builtins.ShouldReset(builtin) - } - - if err := frameRun.PeerHelper.Init(frameRun, peerGrouper); err != nil { - return err - } - frameRun.CurRowPeerGroupNum = 0 - - for frameRun.RowIdx < partition.Len() { - // Perform calculations on each row in the current peer group. - peerGroupEndIdx := frameRun.PeerHelper.GetFirstPeerIdx(frameRun.CurRowPeerGroupNum) + frameRun.PeerHelper.GetRowCount(frameRun.CurRowPeerGroupNum) - for ; frameRun.RowIdx < peerGroupEndIdx; frameRun.RowIdx++ { - res, err := builtin.Compute(ctx, evalCtx, frameRun) - if err != nil { - return err - } - - // This may overestimate, because WindowFuncs may perform internal caching. - sz := res.Size() - if err := n.run.windowsAcc.Grow(ctx, int64(sz)); err != nil { - return err - } - - // Save result into n.run.windowValues, indexed by original row index. - valRowIdx := partition.rows[frameRun.RowIdx].idx - n.run.windowValues[valRowIdx][windowIdx] = res - } - if err := frameRun.PeerHelper.Update(frameRun); err != nil { - return err - } - frameRun.CurRowPeerGroupNum++ - } - } - } - return nil -} - -// isNegative returns whether offset is negative. -func isNegative(evalCtx *tree.EvalContext, offset tree.Datum) bool { - switch o := offset.(type) { - case *tree.DInt: - return *o < 0 - case *tree.DDecimal: - return o.Negative - case *tree.DFloat: - return *o < 0 - case *tree.DInterval: - return o.Compare(evalCtx, &tree.DInterval{Duration: duration.Duration{}}) < 0 - default: - panic("unexpected offset type") - } -} - -// populateValues populates n.run.values with final datum values after computing -// window result values in n.run.windowValues. -func (n *windowNode) populateValues(ctx context.Context, evalCtx *tree.EvalContext) error { - rowCount := n.run.wrappedRenderVals.Len() - row := make(tree.Datums, len(n.windowRender)) - for i := 0; i < rowCount; i++ { - wrappedRow := n.run.wrappedRenderVals.At(i) - - n.run.curRowIdx = i // Point all windowFuncHolders to the correct row values. - curColIdx := 0 - curFnIdx := 0 - for j := range row { - if curWindowRender := n.windowRender[j]; curWindowRender == nil { - // If the windowRender at this index is nil, propagate the datum - // directly from the wrapped planNode. It wasn't changed by windowNode. 
- row[j] = wrappedRow[curColIdx] - curColIdx++ - } else { - // If the windowRender is not nil, ignore 0 or more columns from the wrapped - // planNode. These were used as arguments to window functions all beneath - // a single windowRender. - // SELECT rank() over () from t; -> ignore 0 from wrapped values - // SELECT (rank() over () + avg(b) over ()) from t; -> ignore 1 from wrapped values - // SELECT (avg(a) over () + avg(b) over ()) from t; -> ignore 2 from wrapped values - for ; curFnIdx < len(n.funcs); curFnIdx++ { - windowFn := n.funcs[curFnIdx] - if windowFn.argIdxStart != curColIdx { - break - } - curColIdx += windowFn.argCount - } - // Instead, we evaluate the current window render, which depends on at least - // one window function, at the given row. - evalCtx.PushIVarContainer(&n.colAndAggContainer) - res, err := curWindowRender.Eval(evalCtx) - evalCtx.PopIVarContainer() - if err != nil { - return err - } - row[j] = res - } - } - - if _, err := n.run.values.rows.AddRow(ctx, row); err != nil { - return err - } - } - - // Done using the output of computeWindows, release memory and clear - // accounts. - n.run.wrappedRenderVals.Close(ctx) - n.run.wrappedRenderVals = nil - n.run.windowValues = nil - n.run.windowsAcc.Close(ctx) - - return nil -} - type extractWindowFuncsVisitor struct { n *windowNode @@ -1149,39 +751,3 @@ func (c *windowNodeColAndAggContainer) IndexedVarNodeFormatter(idx int) tree.Nod // Avoid duplicating the type annotation by calling .Format directly. return c.sourceInfo.NodeFormatter(idx) } - -// indexedRows are rows with the corresponding indices. -type indexedRows struct { - rows []indexedRow -} - -// Len implements tree.IndexedRows interface. -func (ir indexedRows) Len() int { - return len(ir.rows) -} - -// GetRow implements tree.IndexedRows interface. -func (ir indexedRows) GetRow(_ context.Context, idx int) (tree.IndexedRow, error) { - return ir.rows[idx], nil -} - -// indexedRow is a row with a corresponding index. -type indexedRow struct { - idx int - row tree.Datums -} - -// GetIdx implements tree.IndexedRow interface. -func (ir indexedRow) GetIdx() int { - return ir.idx -} - -// GetDatum implements tree.IndexedRow interface. -func (ir indexedRow) GetDatum(colIdx int) (tree.Datum, error) { - return ir.row[colIdx], nil -} - -// GetDatums implements tree.IndexedRow interface. -func (ir indexedRow) GetDatums(firstColIdx, lastColIdx int) (tree.Datums, error) { - return ir.row[firstColIdx:lastColIdx], nil -}
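
Reviewer note: the union.go hunks above delete the only place where the UNION/INTERSECT/EXCEPT row bookkeeping was spelled out (readRight/readLeft plus the unionNodeEmit map types). The standalone Go sketch below restates that bookkeeping for reference; it is illustrative only and not part of the change. Plain string keys stand in for the sqlbase.EncodeDatumsKeyAscending output the real code used, and the emitter and run names and the sample data are invented for this sketch. As in the removed code, the node's right operand is consumed first to build up state, then left rows are emitted against it; the node's existing inverted field (untouched by this diff) deals with which SQL operand maps to which side.

// Sketch (illustrative only): set-operation emit bookkeeping mirroring the
// deleted unionNodeEmit implementations.
package main

import "fmt"

// emitter mirrors the removed unionNodeEmit interface: rows from the node's
// right operand are fed in first, then rows from the left operand are emitted
// (or not) based on the accumulated state.
type emitter interface {
	emitRight(key string) bool
	emitLeft(key string) bool
}

// unionDistinct emits a row the first time its key is seen on either side.
// (UNION ALL needed no emitter; the removed emitAll flag short-circuited it.)
type unionDistinct map[string]int

func (e unionDistinct) emitRight(k string) bool { _, seen := e[k]; e[k] = 1; return !seen }
func (e unionDistinct) emitLeft(k string) bool  { _, seen := e[k]; e[k] = 1; return !seen }

// intersectAll counts right-side keys and emits a left row only while an
// unconsumed right-side occurrence remains. The removed DISTINCT variant
// differed only in zeroing the count on the first left-side match.
type intersectAll map[string]int

func (e intersectAll) emitRight(k string) bool { e[k]++; return false }
func (e intersectAll) emitLeft(k string) bool {
	if e[k] > 0 {
		e[k]--
		return true
	}
	return false
}

// exceptAll counts right-side keys and emits a left row only once the matching
// right-side occurrences are used up. The removed DISTINCT variant emitted a
// left key only the first time it was seen, and only if it never appeared on
// the right.
type exceptAll map[string]int

func (e exceptAll) emitRight(k string) bool { e[k]++; return false }
func (e exceptAll) emitLeft(k string) bool {
	if e[k] > 0 {
		e[k]--
		return false
	}
	return true
}

// run mirrors the removed readRight/readLeft ordering: drain the right input
// first, then the left, collecting whatever the emitter lets through.
func run(e emitter, right, left []string) []string {
	var out []string
	for _, k := range right {
		if e.emitRight(k) {
			out = append(out, k)
		}
	}
	for _, k := range left {
		if e.emitLeft(k) {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	right := []string{"a", "a", "b"}
	left := []string{"a", "b", "b", "c"}
	fmt.Println(run(make(unionDistinct), right, left)) // [a b c]
	fmt.Println(run(make(intersectAll), right, left))  // [a b]
	fmt.Println(run(make(exceptAll), right, left))     // [b c]
}

As the removed TODO noted, keying the map on whole encoded rows can use a lot of memory for wide rows or large result sets; the DistSQL execution that replaces this local path handles that concern outside this file.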
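
The window.go hunks likewise delete the whole local computeWindows pipeline (partition by hashed PARTITION BY values, a deterministic per-partition sort via partitionSorter, then per-row evaluation that tracks peer groups). The sketch below is a minimal, self-contained restatement of that partition, sort, and peer-group pass for one hypothetical function, RANK() OVER (PARTITION BY dept ORDER BY salary DESC). The row struct, the rank rule, and the sample data are assumptions made for illustration only; the removed code worked over tree.Datums, encoded partition keys, and tree.WindowFrameRun, and additionally validated frame offsets and accounted for memory, none of which is shown here.

// Sketch (illustrative only): partition -> sort -> peer-group evaluation,
// loosely following the structure of the deleted computeWindows.
package main

import (
	"fmt"
	"sort"
)

type row struct {
	dept   string
	salary int
}

func main() {
	rows := []row{
		{"eng", 100}, {"eng", 120}, {"eng", 120}, {"sales", 90}, {"sales", 80},
	}

	// Compute partitions: bucket row indices by the PARTITION BY key, much as
	// the removed code hashed the PARTITION BY columns of each row.
	partitions := map[string][]int{}
	for i, r := range rows {
		partitions[r.dept] = append(partitions[r.dept], i)
	}

	rank := make([]int, len(rows))
	for _, part := range partitions {
		// Sort each partition by the ORDER BY clause (salary DESC).
		sort.Slice(part, func(a, b int) bool { return rows[part[a]].salary > rows[part[b]].salary })

		// Walk the partition one peer group at a time: rows with equal ORDER BY
		// values are peers and receive the same result, the invariant that the
		// removed partitionSorter.InSameGroup and PeerHelper bookkeeping upheld.
		i := 0
		for i < len(part) {
			j := i
			for j < len(part) && rows[part[j]].salary == rows[part[i]].salary {
				j++
			}
			for k := i; k < j; k++ {
				rank[part[k]] = i + 1 // RANK: peers share a rank, later groups skip ahead.
			}
			i = j
		}
	}

	// Results are written back by original row index, mirroring how the removed
	// code stored each result in windowValues[rowIdx][windowIdx].
	for i, r := range rows {
		fmt.Printf("%-5s %3d -> rank %d\n", r.dept, r.salary, rank[i])
	}
}

The removed comment also required the partition sort to be deterministic so that multiple window functions with syntactically equivalent ORDER BY clauses were evaluated over rows in the same order; the sketch sidesteps that by computing only a single function.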