Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
68678: opt: don't use outer columns as implicit grouping columns r=DrewKimball a=DrewKimball

Previously, the optbuilder logic would add any outer column that is
referenced in a grouping context to the set of grouping columns. This
is correct in some cases, because outer columns are effectively constant,
and can just be removed by norm rules. However, it is incorrect in the
case when there are no grouping columns, e.g. a `ScalarGroupBy`. In that
case, the `ScalarGroupBy` would inadvertently be converted into a `GroupBy`.
This patch modifies the optbuilder to simply ignore outer columns in a
grouping context.

Fixes #68290

Release note: None

68997: stats: use table descriptors instead of IDs r=postamar a=postamar

Previously, the sql stats package inferred various properties about
a table from its ID, such as whether it is a system or a virtual table.
However, the table descriptor is usually readily available and
provides much richer information about a table than its ID.
In particular, this allows us to stop collecting stats for views.

Release note (sql change): table statistics are no longer collected
for views.

69024: colexec: fix bytes corruption for disk-spilled window functions r=DrewKimball a=DrewKimball

This patch fixes the `Truncate` method for bytes columns so that it
updates the offsets to be non-decreasing up to the new `maxSetLength`.
This is necessary in the case when the new `maxSetLength` is greater
than the old one.

This can happen when a window function has a bytes output column and
the `SpillingQueue` that buffers input batches spills to disk. If a
batch has trailing nulls up to the last processed index, and it is
immediately enqueued to disk, `SpillingQueue` does not call `SetLength`
on the batch, so the offsets still need to be updated (in this case by
`Truncate`).

Fixes #60824

Release note: None

69039: github: route pkg/migration PRs to kv-prs sub-team r=irfansharif a=irfansharif

Missed one in #68903.

Release note: None

Co-authored-by: Drew Kimball <[email protected]>
Co-authored-by: Marius Posta <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
4 people committed Aug 17, 2021
5 parents f6684cb + 3b4b631 + 7273596 + de91509 + 65ccca6 commit b69e790
Show file tree
Hide file tree
Showing 31 changed files with 248 additions and 139 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@
/pkg/internal/team/ @cockroachdb/test-eng
/pkg/jobs/ @cockroachdb/sql-schema
/pkg/keys/ @cockroachdb/kv-prs
/pkg/migration/ @cockroachdb/kv @cockroachdb/sql-schema
/pkg/migration/ @cockroachdb/kv-prs @cockroachdb/sql-schema
/pkg/multitenant @cockroachdb/unowned
/pkg/release/ @cockroachdb/dev-inf
/pkg/roachpb/ @cockroachdb/kv-prs
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -343,9 +344,10 @@ func backup(
}
var tableStatistics []*stats.TableStatisticProto
for i := range backupManifest.Descriptors {
if tableDesc, _, _, _ := descpb.FromDescriptor(&backupManifest.Descriptors[i]); tableDesc != nil {
if tbl, _, _, _ := descpb.FromDescriptor(&backupManifest.Descriptors[i]); tbl != nil {
tableDesc := tabledesc.NewBuilder(tbl).BuildImmutableTable()
// Collect all the table stats for this table.
tableStatisticsAcc, err := statsCache.GetTableStats(ctx, tableDesc.GetID())
tableStatisticsAcc, err := statsCache.GetTableStats(ctx, tableDesc)
if err != nil {
// Successfully backed up data is more valuable than table stats that can
// be recomputed after restore, and so if we fail to collect the stats of a
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1830,7 +1830,8 @@ func (r *restoreResumer) ReportResults(ctx context.Context, resultsCh chan<- tre
func (r *restoreResumer) notifyStatsRefresherOfNewTables() {
details := r.job.Details().(jobspb.RestoreDetails)
for i := range details.TableDescs {
r.execCfg.StatsRefresher.NotifyMutation(details.TableDescs[i].GetID(), math.MaxInt32 /* rowsAffected */)
desc := tabledesc.NewBuilder(details.TableDescs[i]).BuildImmutableTable()
r.execCfg.StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */)
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2273,7 +2273,8 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor
// rows affected per table, so we use a large number because we want to make
// sure that stats always get created/refreshed here.
for i := range details.Tables {
execCfg.StatsRefresher.NotifyMutation(details.Tables[i].Desc.ID, math.MaxInt32 /* rowsAffected */)
desc := tabledesc.NewBuilder(details.Tables[i].Desc).BuildImmutableTable()
execCfg.StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */)
}

return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/col/coldata/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,9 @@ func (b *Bytes) Truncate(length int) {
// This is a no-op.
return
}
// Ensure that calling Truncate with a length greater than maxSetLength does
// not invalidate the non-decreasing invariant.
b.UpdateOffsetsToBeNonDecreasing(length)
b.data = b.data[:b.offsets[length]]
b.maxSetLength = length
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func getOwnerOfDesc(desc catalog.Descriptor) security.SQLUsername {
if owner.Undefined() {
// If the descriptor is ownerless and the descriptor is part of the system db,
// node is the owner.
if desc.GetID() == keys.SystemDatabaseID || desc.GetParentID() == keys.SystemDatabaseID {
if catalog.IsSystemDescriptor(desc) {
owner = security.NodeUserName()
} else {
// This check is redundant in this case since admin already has privilege
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,3 +477,12 @@ func FormatSafeDescriptorProperties(w *redact.StringBuilder, desc Descriptor) {
w.Printf(", NumDrainingNames: %d", len(drainingNames))
}
}

// IsSystemDescriptor reports whether desc is a system or a reserved
// descriptor: either its own ID is no greater than keys.MaxReservedDescID, or
// its parent ID is keys.SystemDatabaseID.
func IsSystemDescriptor(desc Descriptor) bool {
	// The parent ID is only consulted when the descriptor's own ID falls
	// outside the reserved range.
	return desc.GetID() <= keys.MaxReservedDescID ||
		desc.GetParentID() == keys.SystemDatabaseID
}
2 changes: 1 addition & 1 deletion pkg/sql/catalog/lease/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (m *Manager) PublishMultiple(
if err != nil {
return nil, err
}
expectedVersions[id] = expected
expectedVersions[id] = expected.GetVersion()
}

descs := make(map[descpb.ID]catalog.MutableDescriptor)
Expand Down
14 changes: 6 additions & 8 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,32 +106,30 @@ func (m *Manager) WaitForNoVersion(
}

// WaitForOneVersion returns once there are no unexpired leases on the
// previous version of the descriptor. It returns the current version.
// previous version of the descriptor. It returns the descriptor with the
// current version.
// After returning there can only be versions of the descriptor >= the
// returned version. Lease acquisition (see acquire()) maintains the
// invariant that no new leases for desc.Version-1 will be granted once
// desc.Version exists.
func (m *Manager) WaitForOneVersion(
ctx context.Context, id descpb.ID, retryOpts retry.Options,
) (descpb.DescriptorVersion, error) {
var version descpb.DescriptorVersion
) (desc catalog.Descriptor, _ error) {
for lastCount, r := 0, retry.Start(retryOpts); r.Next(); {
var desc catalog.Descriptor
if err := m.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, m.Codec(), id)
return err
}); err != nil {
return 0, err
return nil, err
}

// Check to see if there are any leases that still exist on the previous
// version of the descriptor.
now := m.storage.clock.Now()
descs := []IDVersion{NewIDVersionPrev(desc.GetName(), desc.GetID(), desc.GetVersion())}
version = desc.GetVersion()
count, err := CountLeases(ctx, m.storage.internalExecutor, descs, now)
if err != nil {
return 0, err
return nil, err
}
if count == 0 {
break
Expand All @@ -141,7 +139,7 @@ func (m *Manager) WaitForOneVersion(
log.Infof(ctx, "waiting for %d leases to expire: desc=%v", count, descs)
}
}
return version, nil
return desc, nil
}

// IDVersion represents a descriptor ID, version pair that are
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2746,8 +2746,7 @@ func (ex *connExecutor) notifyStatsRefresherOfNewTables(ctx context.Context) {
// Initiate a run of CREATE STATISTICS. We use a large number
// for rowsAffected because we want to make sure that stats always get
// created/refreshed here.
ex.planner.execCfg.StatsRefresher.
NotifyMutation(desc.GetID(), math.MaxInt32 /* rowsAffected */)
ex.planner.execCfg.StatsRefresher.NotifyMutation(desc, math.MaxInt32 /* rowsAffected */)
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,7 @@ func (d *deleteNode) BatchedNext(params runParams) (bool, error) {
}

// Possibly initiate a run of CREATE STATISTICS.
params.ExecCfg().StatsRefresher.NotifyMutation(
d.run.td.tableDesc().GetID(),
d.run.td.lastBatchSize,
)
params.ExecCfg().StatsRefresher.NotifyMutation(d.run.td.tableDesc(), d.run.td.lastBatchSize)

return d.run.td.lastBatchSize > 0, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (d *deleteRangeNode) startExec(params runParams) error {
}

// Possibly initiate a run of CREATE STATISTICS.
params.ExecCfg().StatsRefresher.NotifyMutation(d.desc.GetID(), d.rowCount)
params.ExecCfg().StatsRefresher.NotifyMutation(d.desc, d.rowCount)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (dsp *DistSQLPlanner) createStatsPlan(
)

// Estimate the expected number of rows based on existing stats in the cache.
tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(planCtx.ctx, desc.GetID())
tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(planCtx.ctx, desc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/gcjob/gc_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func (r schemaChangeGCResumer) Resume(ctx context.Context, execCtx interface{})
if len(details.InterleavedIndexes) > 0 {
// Before deleting any indexes, ensure that old versions of the table
// descriptor are no longer in use.
if err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, details.InterleavedTable.ID); err != nil {
_, err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, details.InterleavedTable.ID)
if err != nil {
return err
}
interleavedIndexIDs := make([]descpb.IndexID, len(details.InterleavedIndexes))
Expand Down
14 changes: 5 additions & 9 deletions pkg/sql/gcjob/index_garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -44,18 +43,15 @@ func gcIndexes(
// Before deleting any indexes, ensure that old versions of the table descriptor
// are no longer in use. This is necessary in the case of truncate, where we
// schedule a GC Job in the transaction that commits the truncation.
if err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, parentID); err != nil {
parentDesc, err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, parentID)
if err != nil {
return err
}

var parentTable catalog.TableDescriptor
if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
parentTable, err = catalogkv.MustGetTableDescByID(ctx, txn, execCfg.Codec, parentID)
return err
}); err != nil {
return errors.Wrapf(err, "fetching parent table %d", parentID)
parentTable, isTable := parentDesc.(catalog.TableDescriptor)
if !isTable {
return errors.AssertionFailedf("expected descriptor %d to be a table, not %T", parentID, parentDesc)
}

for _, index := range droppedIndexes {
if index.Status != jobspb.SchemaChangeGCProgress_DELETING {
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (n *insertNode) BatchedNext(params runParams) (bool, error) {
}

// Possibly initiate a run of CREATE STATISTICS.
params.ExecCfg().StatsRefresher.NotifyMutation(n.run.ti.tableDesc().GetID(), n.run.ti.lastBatchSize)
params.ExecCfg().StatsRefresher.NotifyMutation(n.run.ti.tableDesc(), n.run.ti.lastBatchSize)

return n.run.ti.lastBatchSize > 0, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/insert_fast_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (n *insertFastPathNode) BatchedNext(params runParams) (bool, error) {
n.run.done = true

// Possibly initiate a run of CREATE STATISTICS.
params.ExecCfg().StatsRefresher.NotifyMutation(n.run.ti.ri.Helper.TableDesc.GetID(), len(n.input))
params.ExecCfg().StatsRefresher.NotifyMutation(n.run.ti.ri.Helper.TableDesc, len(n.input))

return true, nil
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/window
Original file line number Diff line number Diff line change
Expand Up @@ -4279,3 +4279,25 @@ NULL NULL NULL

statement ok
RESET vectorize;

# Regression test for incorrect bytes results when spilling to disk when there
# are multiple partitions and one of the earlier partitions has trailing NULL
# values.
statement ok
DROP TABLE IF EXISTS t;
CREATE TABLE t (x INT, y STRING);
INSERT INTO t VALUES (1, 'NotNull'), (1, NULL), (1, NULL),
(2, 'NotNull'), (2, 'NotNull'), (2, 'NotNull'), (2, 'NotNull');

# Loading a batch with trailing nulls onto disk and then unloading it to
# continue processing should not corrupt the output bytes.
query ITT rowsort
SELECT x, y, first_value(y) OVER (PARTITION BY x ROWS BETWEEN CURRENT ROW AND CURRENT ROW) FROM t;
----
1 NotNull NotNull
1 NULL NULL
1 NULL NULL
2 NotNull NotNull
2 NotNull NotNull
2 NotNull NotNull
2 NotNull NotNull
30 changes: 16 additions & 14 deletions pkg/sql/opt/optbuilder/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,24 @@ func (b *Builder) buildScalar(
// effectively constant) or it is part of a table and we are already
// grouping on the entire PK of that table.
g := inScope.groupby
if !inScope.isOuterColumn(t.id) && !b.allowImplicitGroupingColumn(t.id, g) {
panic(newGroupingError(t.name.ReferenceName()))
if !inScope.isOuterColumn(t.id) {
if !b.allowImplicitGroupingColumn(t.id, g) {
panic(newGroupingError(t.name.ReferenceName()))
}
// We add a new grouping column; these show up both in aggInScope and
// aggOutScope. We only do this when the column is not an outer column;
// otherwise, we may inadvertently convert a ScalarGroupBy to a GroupBy.
//
// Note that normalization rules will trim down the list of grouping
// columns based on FDs, so this is only for the purposes of building a
// valid operator.
aggInCol := g.aggInScope.addColumn(scopeColName(""), t)
b.finishBuildScalarRef(t, inScope, g.aggInScope, aggInCol, nil)
g.groupStrs[symbolicExprStr(t)] = aggInCol

g.aggOutScope.appendColumn(aggInCol)
}

// We add a new grouping column; these show up both in aggInScope and
// aggOutScope.
//
// Note that normalization rules will trim down the list of grouping
// columns based on FDs, so this is only for the purposes of building a
// valid operator.
aggInCol := g.aggInScope.addColumn(scopeColName(""), t)
b.finishBuildScalarRef(t, inScope, g.aggInScope, aggInCol, nil)
g.groupStrs[symbolicExprStr(t)] = aggInCol

g.aggOutScope.appendColumn(aggInCol)

return b.finishBuildScalarRef(t, g.aggOutScope, outScope, outCol, colRefs)
}

Expand Down
Loading

0 comments on commit b69e790

Please sign in to comment.