Merge #34383
34383: sql: delete local implementations of planNodes r=jordanlewis a=jordanlewis

This PR deletes the remaining users of the local planNode execution engine and removes the duplicate implementations of those planNodes that already have DistSQL equivalents.

Closes #33173.

Co-authored-by: Jordan Lewis <[email protected]>
craig[bot] and jordanlewis committed Feb 19, 2019
2 parents 8f0299f + b799599 commit 2ebd51d
Showing 60 changed files with 300 additions and 2,670 deletions.
4 changes: 2 additions & 2 deletions pkg/jobs/registry.go
@@ -366,7 +366,7 @@ func (r *Registry) maybeCancelJobs(ctx context.Context, nl NodeLiveness) {

func (r *Registry) cleanupOldJobs(ctx context.Context, olderThan time.Time) error {
const stmt = `SELECT id, payload FROM system.jobs WHERE status IN ($1, $2, $3) AND created < $4 ORDER BY created LIMIT 1000`
rows, _ /* cols */, err := r.ex.Query(
rows, err := r.ex.Query(
ctx, "gc-jobs", nil /* txn */, stmt, StatusFailed, StatusSucceeded, StatusCanceled, olderThan,
)
if err != nil {
@@ -600,7 +600,7 @@ func AddResumeHook(fn ResumeHookFn) {

func (r *Registry) maybeAdoptJob(ctx context.Context, nl NodeLiveness) error {
const stmt = `SELECT id, payload, progress IS NULL FROM system.jobs WHERE status IN ($1, $2) ORDER BY created DESC`
rows, _ /* cols */, err := r.ex.Query(
rows, err := r.ex.Query(
ctx, "adopt-job", nil /* txn */, stmt, StatusPending, StatusRunning,
)
if err != nil {
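The two hunks above reflect a signature change that recurs throughout this diff: the internal executor's Query no longer returns the result column metadata, only the rows. A minimal sketch of the new calling convention — the surrounding function and statement are hypothetical, only the shape of the Query call comes from the diff — might look like this:

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql"
)

// gcOldJobs is a hypothetical caller illustrating the updated convention.
func gcOldJobs(ctx context.Context, ex *sql.InternalExecutor) error {
	const stmt = `SELECT id FROM system.jobs WHERE status = $1 LIMIT 10`

	// Before this commit, callers also received the column metadata:
	//   rows, _ /* cols */, err := ex.Query(ctx, "gc-jobs", nil /* txn */, stmt, "failed")
	// After this commit, callers that only need the datums get just rows and err
	// (assuming rows is a slice of tree.Datums, as the callers in this diff suggest):
	rows, err := ex.Query(ctx, "gc-jobs", nil /* txn */, stmt, "failed")
	if err != nil {
		return err
	}
	for _, row := range rows {
		_ = row // inspect each returned row here
	}
	return nil
}
```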
4 changes: 2 additions & 2 deletions pkg/server/server_update.go
@@ -138,7 +138,7 @@ func (s *Server) upgradeStatus(ctx context.Context) (bool, error) {
}

// Check if auto upgrade is enabled at current version.
datums, _, err := s.internalExecutor.Query(
datums, err := s.internalExecutor.Query(
ctx, "read-downgrade", nil, /* txn */
"SELECT value FROM system.settings WHERE name = 'cluster.preserve_downgrade_option';",
)
@@ -162,7 +162,7 @@ func (s *Server) upgradeStatus(ctx context.Context) (bool, error) {
// (which returns the version from the KV store as opposed to the possibly
// lagging settings subsystem).
func (s *Server) clusterVersion(ctx context.Context) (string, error) {
datums, _, err := s.internalExecutor.Query(
datums, err := s.internalExecutor.Query(
ctx, "show-version", nil, /* txn */
"SHOW CLUSTER SETTING version;",
)
4 changes: 2 additions & 2 deletions pkg/server/updates.go
@@ -346,7 +346,7 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic
// Read the system.settings table to determine the settings for which we have
// explicitly set values -- the in-memory SV has the set and default values
// flattened for quick reads, but we'd rather only report the non-defaults.
if datums, _, err := s.internalExecutor.Query(
if datums, err := s.internalExecutor.Query(
ctx, "read-setting", nil /* txn */, "SELECT name FROM system.settings",
); err != nil {
log.Warningf(ctx, "failed to read settings: %s", err)
@@ -358,7 +358,7 @@ func (s *Server) getReportingInfo(ctx context.Context) *diagnosticspb.Diagnostic
}
}

if datums, _, err := s.internalExecutor.Query(
if datums, err := s.internalExecutor.Query(
ctx,
"read-zone-configs",
nil, /* txn */
2 changes: 1 addition & 1 deletion pkg/sql/authorization.go
@@ -247,7 +247,7 @@ func (p *planner) resolveMemberOfWithAdminOption(
}
visited[m] = struct{}{}

rows, _ /* cols */, err := p.ExecCfg().InternalExecutor.Query(
rows, err := p.ExecCfg().InternalExecutor.Query(
ctx, "expand-roles", nil /* txn */, lookupRolesStmt, m,
)
if err != nil {
64 changes: 25 additions & 39 deletions pkg/sql/check.go
@@ -49,7 +49,6 @@ func validateCheckExpr(
}
lim := &tree.Limit{Count: tree.NewDInt(1)}
stmt := &tree.Select{Select: sel, Limit: lim}

queryStr := tree.AsStringWithFlags(stmt, tree.FmtParsable)
log.Infof(ctx, "Validating check constraint %q with query %q", expr.String(), queryStr)

@@ -178,34 +177,27 @@ func (p *planner) validateForeignKey(
query,
)

rows, err := p.delegateQuery(ctx, "ALTER TABLE VALIDATE", query, nil, nil)
plan, err := p.delegateQuery(ctx, "ALTER TABLE VALIDATE", query, nil, nil)
if err != nil {
return err
}

rows, err = p.optimizePlan(ctx, rows, allColumns(rows))
plan, err = p.optimizePlan(ctx, plan, allColumns(plan))
if err != nil {
return err
}
defer rows.Close(ctx)
defer plan.Close(ctx)

params := runParams{
ctx: ctx,
extendedEvalCtx: &p.extendedEvalCtx,
p: p,
}
if err := startPlan(params, rows); err != nil {
return err
}
next, err := rows.Next(params)
rows, err := p.runWithDistSQL(ctx, plan)
if err != nil {
return err
}
defer rows.Close(ctx)

if next {
if rows.Len() > 0 {
return pgerror.NewErrorf(pgerror.CodeForeignKeyViolationError,
"foreign key violation: MATCH FULL does not allow mixing of null and nonnull values %s for %s",
rows.Values(), srcIdx.ForeignKey.Name,
rows.At(0), srcIdx.ForeignKey.Name,
)
}
}
@@ -217,42 +209,36 @@ func (p *planner) validateForeignKey(
query,
)

rows, err := p.delegateQuery(ctx, "ALTER TABLE VALIDATE", query, nil, nil)
plan, err := p.delegateQuery(ctx, "ALTER TABLE VALIDATE", query, nil, nil)
if err != nil {
return err
}

rows, err = p.optimizePlan(ctx, rows, allColumns(rows))
plan, err = p.optimizePlan(ctx, plan, allColumns(plan))
if err != nil {
return err
}
defer rows.Close(ctx)
defer plan.Close(ctx)

params := runParams{
ctx: ctx,
extendedEvalCtx: &p.extendedEvalCtx,
p: p,
}
if err := startPlan(params, rows); err != nil {
return err
}
next, err := rows.Next(params)
rows, err := p.runWithDistSQL(ctx, plan)
if err != nil {
return err
}
defer rows.Close(ctx)

if next {
values := rows.Values()
var pairs bytes.Buffer
for i := range values {
if i > 0 {
pairs.WriteString(", ")
}
pairs.WriteString(fmt.Sprintf("%s=%v", srcIdx.ColumnNames[i], values[i]))
if rows.Len() == 0 {
return nil
}

values := rows.At(0)
var pairs bytes.Buffer
for i := range values {
if i > 0 {
pairs.WriteString(", ")
}
return pgerror.NewErrorf(pgerror.CodeForeignKeyViolationError,
"foreign key violation: %q row %s has no match in %q",
srcTable.Name, pairs.String(), targetTable.Name)
pairs.WriteString(fmt.Sprintf("%s=%v", srcIdx.ColumnNames[i], values[i]))
}
return nil
return pgerror.NewErrorf(pgerror.CodeForeignKeyViolationError,
"foreign key violation: %q row %s has no match in %q",
srcTable.Name, pairs.String(), targetTable.Name)
}
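The net effect of the check.go changes is that validateForeignKey no longer drives the plan by hand with startPlan/Next/Values; it now runs the validation query through DistSQL and inspects a buffered result. A rough sketch of the new consumption pattern — with the row container reduced to a hypothetical interface, since the concrete type returned by runWithDistSQL is not shown in these hunks — could look like this:

```go
package example

import (
	"context"
	"fmt"
)

// bufferedRows is a hypothetical stand-in for the container returned by
// planner.runWithDistSQL in this commit; the real type exposes at least
// these methods, judging from the calls in the diff above.
type bufferedRows interface {
	Len() int
	At(i int) []interface{} // the real container yields tree.Datums
	Close(ctx context.Context)
}

// checkNoMatches shows the shape of the new code path: run the validation
// query, then report a violation if any row came back. Names are illustrative.
func checkNoMatches(ctx context.Context, run func(context.Context) (bufferedRows, error)) error {
	rows, err := run(ctx)
	if err != nil {
		return err
	}
	defer rows.Close(ctx)

	if rows.Len() == 0 {
		return nil // no violating rows: the constraint validates
	}
	return fmt.Errorf("validation failed on row %v", rows.At(0))
}
```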
2 changes: 1 addition & 1 deletion pkg/sql/copy.go
@@ -334,7 +334,7 @@ func (c *copyMachine) insertRows(ctx context.Context) (retErr error) {
extendedEvalCtx: &c.p.extendedEvalCtx,
p: &c.p,
}
if err := startPlan(params, insertNode); err != nil {
if err := startExec(params, insertNode); err != nil {
return err
}
rows, err := countRowsAffected(params, insertNode)
2 changes: 1 addition & 1 deletion pkg/sql/crdb_internal.go
@@ -1739,7 +1739,7 @@ CREATE TABLE crdb_internal.zones (
return 0, "", fmt.Errorf("object with ID %d does not exist", id)
}

rows, _ /* cols */, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query(
rows, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.Query(
ctx, "crdb-internal-zones-table", p.txn, `SELECT id, config FROM system.zones`)
if err != nil {
return err
2 changes: 1 addition & 1 deletion pkg/sql/create_stats.go
@@ -322,7 +322,7 @@ func checkRunningJobs(ctx context.Context, job *jobs.Job, p *planner) error {
}
const stmt = `SELECT id, payload FROM system.jobs WHERE status IN ($1, $2) ORDER BY created`

rows, _ /* cols */, err := p.ExecCfg().InternalExecutor.Query(
rows, err := p.ExecCfg().InternalExecutor.Query(
ctx, "get-jobs", nil /* txn */, stmt, jobs.StatusPending, jobs.StatusRunning,
)
if err != nil {
58 changes: 3 additions & 55 deletions pkg/sql/distinct.go
@@ -17,13 +17,10 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/pkg/errors"
)

// distinctNode de-duplicates rows returned by a wrapped planNode.
@@ -42,8 +39,6 @@ type distinctNode struct {
// Subset of distinctOnColIdxs on which the input guarantees an ordering.
// All rows that are equal on these columns appear contiguously in the input.
columnsInOrder util.FastIntSet

run *rowSourceToPlanNode
}

// distinct constructs a distinctNode.
@@ -190,65 +185,18 @@ func (p *planner) distinct(
}

func (n *distinctNode) startExec(params runParams) error {
flowCtx := &distsqlrun.FlowCtx{
EvalCtx: params.EvalContext(),
}

cols := make([]int, len(planColumns(n.plan)))
for i := range cols {
cols[i] = i
}

spec := createDistinctSpec(n, cols)

input, err := makePlanNodeToRowSource(n.plan, params, false)
if err != nil {
return err
}
if len(spec.DistinctColumns) == 0 {
return errors.New("cannot initialize a distinctNode with 0 columns")
}

// Normally, startExec isn't recursive, since it's invoked for all nodes using
// the planTree walker. And as normal, the walker will startExec the source
// of this distinct.
// But, we also need to startExec our planNodeToRowSource to properly
// initialize it. That won't get touched via the planNode walker, so we have
// to do it recursively here.
if err := input.startExec(params); err != nil {
return err
}
if err := input.InitWithOutput(&distsqlpb.PostProcessSpec{}, nil); err != nil {
return err
}

post := &distsqlpb.PostProcessSpec{} // post is not used as we only use the processor for the core distinct logic.
var output distsqlrun.RowReceiver // output is never used as distinct is only run as a RowSource.

proc, err := distsqlrun.NewDistinct(flowCtx, 0 /* processorID */, spec, input, post, output)
if err != nil {
return err
}

n.run = makeRowSourceToPlanNode(proc, nil /* forwarder */, planColumns(n), nil /* originalPlanNode */)

n.run.source.Start(params.ctx)

return nil
panic("distinctNode can't be called in local mode")
}

func (n *distinctNode) Next(params runParams) (bool, error) {
return n.run.Next(params)
panic("distinctNode can't be called in local mode")
}

func (n *distinctNode) Values() tree.Datums {
return n.run.Values()
panic("distinctNode can't be called in local mode")
}

func (n *distinctNode) Close(ctx context.Context) {
if n.run != nil {
n.run.Close(ctx)
}
n.plan.Close(ctx)
}

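The distinct.go hunk shows the pattern this PR applies to every planNode that now exists only for planning: the local startExec/Next/Values entry points are replaced with panics, while Close keeps tearing down the wrapped input. A toy illustration of that shape — not the actual planNode interface, whose method signatures are wider — might be:

```go
package example

import "context"

// planOnlyNode mimics the shape distinctNode takes after this commit: it still
// participates in planning and cleanup, but any attempt to execute it locally
// panics, because execution is expected to go through the DistSQL engine.
// The embedded interface and signatures here are simplified stand-ins.
type planOnlyNode struct {
	input interface{ Close(context.Context) }
}

func (n *planOnlyNode) startExec() error      { panic("can't be called in local mode") }
func (n *planOnlyNode) Next() (bool, error)   { panic("can't be called in local mode") }
func (n *planOnlyNode) Values() []interface{} { panic("can't be called in local mode") }

// Close still does real work: it releases the wrapped input plan.
func (n *planOnlyNode) Close(ctx context.Context) {
	n.input.Close(ctx)
}
```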
10 changes: 2 additions & 8 deletions pkg/sql/distsql_physical_planner.go
@@ -862,7 +862,7 @@ func initTableReaderSpec(
*s = distsqlpb.TableReaderSpec{
Table: *n.desc.TableDesc(),
Reverse: n.reverse,
IsCheck: n.run.isCheck,
IsCheck: n.isCheck,
Visibility: n.colCfg.visibility.toDistSQLScanVisibility(),

// Retain the capacity of the spans slice.
@@ -877,7 +877,7 @@
// When a TableReader is running scrub checks, do not allow a
// post-processor. This is because the outgoing stream is a fixed
// format (distsqlrun.ScrubTypes).
if n.run.isCheck {
if n.isCheck {
return s, distsqlpb.PostProcessSpec{}, nil
}

@@ -2447,12 +2447,6 @@ func (dsp *DistSQLPlanner) wrapPlan(planCtx *PlanningCtx, n planNode) (PhysicalP
// Don't continue recursing into explain nodes - they need to be left
// alone since they handle their own planning later.
return false, nil
case *deleteNode:
// DeleteNode currently uses its scanNode directly, if it exists. This
// is a bit tough to fix, so for now, don't try to recurse through
// deleteNodes.
// TODO(jordan): fix deleteNode to stop doing that.
return false, nil
}
if !seenTop {
// We know we're wrapping the first node, so ignore it.
43 changes: 43 additions & 0 deletions pkg/sql/distsqlrun/hashjoiner.go
@@ -642,6 +642,49 @@ func (h *hashJoiner) receiveNext(
} else if row == nil {
return nil, nil, false, nil
}
// We explicitly check whether the row contains a NULL value on an
// equality column. The reason is the way NULL equality is expected to
// behave (i.e. NULL != NULL) combined with the fact that we use the
// encoding of a given row as the key into our bucket: if we encounter a
// NULL row while building the hashmap, we must store it so it can be
// emitted for RIGHT OUTER joins, but if we encounter another NULL row
// while probing the left stream, matching it against the first NULL row
// would be incorrect.
//
// If we have the following:
// CREATE TABLE t(x INT); INSERT INTO t(x) VALUES (NULL);
// | x |
// ------
// | NULL |
//
// For the following query:
// SELECT * FROM t AS a FULL OUTER JOIN t AS b USING(x);
//
// We expect:
// | x |
// ------
// | NULL |
// | NULL |
//
// The following example illustrates the behavior when joining on two
// or more columns, where only one of them contains NULL.
// If we have the following:
// CREATE TABLE t(x INT, y INT);
// INSERT INTO t(x, y) VALUES (44,51), (NULL,52);
// | x | y |
// ------
// | 44 | 51 |
// | NULL | 52 |
//
// For the following query:
// SELECT * FROM t AS a FULL OUTER JOIN t AS b USING(x, y);
//
// We expect:
// | x | y |
// ------
// | 44 | 51 |
// | NULL | 52 |
// | NULL | 52 |
hasNull := false
for _, c := range h.eqCols[side] {
if row[c].IsNull() {
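The comment added above boils down to one concrete check: before hashing or probing, scan the row's equality columns for NULLs and treat such rows specially so that two NULL rows never match each other. A simplified sketch of that check — the real hashJoiner operates on sqlbase.EncDatumRow and calls row[c].IsNull(), here reduced to plain slices — might be:

```go
package example

// hasNullEqCol reports whether any equality column of the row is NULL.
// It mirrors the loop that follows the comment above
// (hasNull := false; for _, c := range h.eqCols[side] { if row[c].IsNull() { ... } }),
// with the row and column types simplified for illustration.
func hasNullEqCol(row []interface{}, eqCols []int) bool {
	for _, c := range eqCols {
		if row[c] == nil { // stand-in for EncDatum.IsNull()
			return true
		}
	}
	return false
}
```

A row flagged this way is still stored so it can be emitted for outer joins, but it is never allowed to match another NULL row during probing, which is what produces the two separate NULL output rows in the FULL OUTER JOIN examples above.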
