84618: sql: add new index_recommendation column r=maryliag a=maryliag

This commit adds a new column index_recommendations
STRING[] to:
crdb_internal.node_statement_statistics
crdb_internal.cluster_statement_statistics
system.statement_statistics
crdb_internal.statement_statistics

Part of #83782

Release note (sql change): Added a new column index_recommendations
to crdb_internal.node_statement_statistics,
crdb_internal.cluster_statement_statistics, system.statement_statistics,
and crdb_internal.statement_statistics.

84674: changefeedccl/schemafeed: ignore unwatched column drops  r=jayshrivastava a=jayshrivastava

Closes #80982

Release note (enterprise change): Previously, if you dropped
a column with the schema_change_policy 'stop', the changefeed
would stop. Dropping a column with a different policy would
result in previous rows being retransmitted with the
dropped column omitted.

In some cases, a changefeed may target specific columns
(a column family) of a table. In these cases, if a non-target
column is dropped, it does not make sense to stop the changefeed
or retransmit values because the column was not visible to
a consumer sink to begin with.

With this change, dropping a non-target column from a
table will not stop the changefeed when the
schema_change_policy is 'stop'. With any other policy,
dropping a non-target column will not trigger a backfill.
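
The rule above can be sketched in a few lines of Go. This is a simplified illustration, not the code from this commit: the `family` type and the signature of `droppedColumnIsWatched` below are hypothetical stand-ins that use plain slices and maps in place of table descriptors, and an empty set of target families is assumed to mean that the whole table is watched.

```go
package main

import "fmt"

// family is a hypothetical stand-in for a column family descriptor.
type family struct {
	name    string
	columns []string
}

// droppedColumnIsWatched reports whether dropping column col affects a
// changefeed that targets the families named in targetFamilies. An empty
// targetFamilies set is taken to mean that the whole table is targeted.
func droppedColumnIsWatched(families []family, targetFamilies map[string]struct{}, col string) bool {
	if len(targetFamilies) == 0 {
		return true
	}
	for _, f := range families {
		if _, ok := targetFamilies[f.name]; !ok {
			continue // This family is not a changefeed target.
		}
		for _, c := range f.columns {
			if c == col {
				return true
			}
		}
	}
	return false
}

func main() {
	families := []family{
		{name: "id_a", columns: []string{"id", "a"}},
		{name: "b_and_c", columns: []string{"b", "c"}},
	}
	targets := map[string]struct{}{"b_and_c": {}}

	fmt.Println(droppedColumnIsWatched(families, targets, "a")) // column outside the target family
	fmt.Println(droppedColumnIsWatched(families, targets, "b")) // column inside the target family
	fmt.Println(droppedColumnIsWatched(families, nil, "a"))     // no families specified
}
```

Under these assumptions, only a drop for which the function returns true would stop the changefeed (policy 'stop') or trigger a backfill (other policies).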

85034: sql: better tracing of apply joins r=yuzefovich a=yuzefovich

**sql: add tracing spans for each iteration of apply join**

This commit creates a separate tracing span for each iteration of apply
join and recursive CTE execution to make the traces easier to digest.
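
As a rough illustration of why per-iteration spans make a trace easier to read, here is a toy sketch. It does not use CockroachDB's tracing package: the `span` type and the `runApplyJoin` function are hypothetical and only model the idea of nesting one child span under the parent for each iteration.

```go
package main

import (
	"fmt"
	"strings"
)

// span is a toy tracing span; it is not CockroachDB's tracing API.
type span struct {
	name     string
	events   []string
	children []*span
}

// child starts a new span nested under s.
func (s *span) child(name string) *span {
	c := &span{name: name}
	s.children = append(s.children, c)
	return c
}

// record attaches a message to the span.
func (s *span) record(msg string) { s.events = append(s.events, msg) }

// render returns the span tree as indented text.
func (s *span) render(depth int) string {
	var b strings.Builder
	indent := strings.Repeat("  ", depth)
	fmt.Fprintf(&b, "%s%s\n", indent, s.name)
	for _, e := range s.events {
		fmt.Fprintf(&b, "%s  - %s\n", indent, e)
	}
	for _, c := range s.children {
		b.WriteString(c.render(depth + 1))
	}
	return b.String()
}

// runApplyJoin records the work for each left row in its own child span,
// so messages from different iterations are not interleaved in one span.
func runApplyJoin(root *span, leftRows []int) {
	for i, row := range leftRows {
		it := root.child(fmt.Sprintf("apply-join iteration %d", i))
		it.record(fmt.Sprintf("evaluating right side for left row %d", row))
	}
}

func main() {
	root := &span{name: "apply-join"}
	runApplyJoin(root, []int{10, 20})
	fmt.Print(root.render(0))
}
```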

Release note: None

**sql: log DistSQL diagrams when tracing is enabled**

This commit makes it so that we always include all DistSQL diagrams in
the trace. This could be especially helpful when executing apply join
iterations to understand the plan that each iteration gets. This commit
also removes the environment variable that previously controlled this
logging, since I don't think anyone has used it in years now that we
have better tools for debugging (like a stmt bundle).

Informs: https://github.com/cockroachlabs/support/issues/1681.

Release note: None

85042: logictest: remove spec-planning configs r=yuzefovich a=yuzefovich

This commit removes the `local-spec-planning`, `fakedist-spec-planning`, and
`5node-spec-planning` logic test configurations since they are not very
useful at the moment. They were introduced to support the new DistSQL
spec factory, but that work has been postponed with no active
development, so it seems silly to run most of the logic tests through
configs that largely duplicate the default ones (because most of the
`Construct*` methods are not implemented in the new factory). Once
active development on the new factory resumes, it'll be pretty easy to
bring them back to life, but for now let's reduce the number of tests we
run without really losing any test coverage.

Release note: None

Co-authored-by: Marylia Gutierrez <[email protected]>
Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
4 people committed Jul 26, 2022
5 parents 6b1c99b + 748c4c0 + 74bccc7 + 490dc83 + d1164b0 commit d7b901d
Showing 47 changed files with 769 additions and 155 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -283,4 +283,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
-version version 22.1-30 set the active cluster version in the format '<major>.<minor>'
+version version 22.1-32 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -214,6 +214,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
-<tr><td><code>version</code></td><td>version</td><td><code>22.1-30</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
+<tr><td><code>version</code></td><td>version</td><td><code>22.1-32</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
189 changes: 189 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -996,6 +996,195 @@ func TestChangefeedUserDefinedTypes(t *testing.T) {
cdcTest(t, testFn)
}

// If the schema_change_policy is 'stop' and we drop columns which are not
// targeted by the changefeed, it should not stop.
func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c WITH schema_change_policy='stop'`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
})

sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`)

assertPayloads(t, cf, []string{
`hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`,
})

// Check that dropping a watched column still stops the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
if _, err := cf.Next(); !testutils.IsError(err, `schema change occurred at`) {
// Pass err itself rather than err.Error() so that a nil error does not panic.
t.Errorf(`expected "schema change occurred at ..." got: %+v`, err)
}
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
}

// If we drop columns which are not targeted by the changefeed, it should not backfill.
func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
})

sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`,
})

// Check that dropping a watched column still backfills.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN c`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b"}}`,
`hasfams.b_and_c: [1]->{"after": {"b": "b1"}}`,
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
}

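// TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets checks a changefeed
// that mixes a column family target with a whole-table target: dropping a
// non-target column is ignored, while dropping a column watched through
// either target still triggers a backfill.
func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {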
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `CREATE TABLE nofams (id int primary key, a string, b string, c string)`)
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)
sqlDB.Exec(t, `INSERT INTO nofams values (0, 'a', 'b', 'c')`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c, TABLE nofams`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
`nofams: [0]->{"after": {"a": "a", "b": "b", "c": "c", "id": 0}}`,
})

// Dropping an unwatched column from hasfams does not affect the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`)
sqlDB.Exec(t, `INSERT INTO nofams VALUES (1, 'a1', 'b1', 'c1')`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`,
`nofams: [1]->{"after": {"a": "a1", "b": "b1", "c": "c1", "id": 1}}`,
})

// Check that dropping a watched column will backfill the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"c": "c"}}`,
`hasfams.b_and_c: [1]->{"after": {"c": "c1"}}`,
})

// Check that dropping a column from the whole-table target also backfills.
sqlDB.Exec(t, `ALTER TABLE nofams DROP COLUMN b`)
assertPayloads(t, cf, []string{
`nofams: [0]->{"after": {"a": "a", "c": "c", "id": 0}}`,
`nofams: [1]->{"after": {"a": "a1", "c": "c1", "id": 1}}`,
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
}

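// TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName checks that
// column drops are matched against the target families of the table being
// altered, even when another targeted table has families with the same names.
func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {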
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `CREATE TABLE alsohasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)
sqlDB.Exec(t, `INSERT INTO alsohasfams values (0, 'a', 'b', 'c')`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c, TABLE alsohasfams FAMILY id_a`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
`alsohasfams.id_a: [0]->{"after": {"a": "a", "id": 0}}`,
})

// Dropping an unwatched column from hasfams does not affect the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`)
sqlDB.Exec(t, `INSERT INTO alsohasfams VALUES (1, 'a1', 'b1', 'c1')`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`,
`alsohasfams.id_a: [1]->{"after": {"a": "a1", "id": 1}}`,
})

// Check that dropping a watched column will backfill the changefeed.
sqlDB.Exec(t, `ALTER TABLE alsohasfams DROP COLUMN a`)
assertPayloads(t, cf, []string{
`alsohasfams.id_a: [0]->{"after": {"id": 0}}`,
`alsohasfams.id_a: [1]->{"after": {"id": 1}}`,
})

// Check that dropping a watched column will backfill the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"c": "c"}}`,
`hasfams.b_and_c: [1]->{"after": {"c": "c1"}}`,
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
}

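// TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies checks that when
// every family of a table is targeted, dropping a column triggers a backfill
// of the family that contained it.
func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {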
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY id_a, TABLE hasfams FAMILY b_and_c`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
`hasfams.id_a: [0]->{"after": {"a": "a", "id": 0}}`,
})

// Check that dropping a watched column will backfill the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
assertPayloads(t, cf, []string{
`hasfams.id_a: [0]->{"after": {"id": 0}}`,
})

// Check that dropping a watched column will backfill the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"c": "c"}}`,
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
}

func TestChangefeedExternalIODisabled(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
21 changes: 20 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/target.go
@@ -26,7 +26,11 @@ type Target struct {
type StatementTimeName string

type targetsByTable struct {
-wholeTable *Target
+// wholeTable is a single target set when the target has no specified column
+// families
+wholeTable *Target
+// byFamilyName is populated when one or more column family targets are
+// specified for the table
byFamilyName map[string]Target
}

@@ -83,6 +87,21 @@ func (ts *Targets) EachTarget(f func(Target) error) error {
return nil
}

// GetSpecifiedColumnFamilies returns a set of watched families
// belonging to the table.
func (ts *Targets) GetSpecifiedColumnFamilies(tableID descpb.ID) map[string]struct{} {
target, exists := ts.m[tableID]
if !exists {
return make(map[string]struct{})
}

families := make(map[string]struct{}, len(target.byFamilyName))
for family := range target.byFamilyName {
families[family] = struct{}{}
}
return families
}

// EachTableID iterates over unique TableIDs referenced in Targets.
func (ts *Targets) EachTableID(f func(descpb.ID) error) error {
for id := range ts.m {
15 changes: 11 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -287,11 +287,18 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
if err != nil {
return err
}

// Detect whether the event corresponds to a primary index change. Also
-// detect whether that primary index change corresponds to any change in
-// the primary key or in the set of visible columns. If it corresponds to
-// no such change, than it may be a column being dropped physically and
-// should not trigger a failure in the `stop` policy.
+// detect whether the change corresponds to any change in the set of visible
+// primary key columns.
+//
+// If a primary key is being changed and there are no changes in the
+// primary key's columns, this may be due to a column which was dropped
+// logically before and is presently being physically dropped.
+//
+// If there is no change in the primary key columns, then a primary key change
+// should not trigger a failure in the `stop` policy because this change is
+// effectively invisible to consumers.
primaryIndexChange, noColumnChanges := isPrimaryKeyChange(events)
if primaryIndexChange && (noColumnChanges ||
f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyStop) {
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/schemafeed/BUILD.bazel
@@ -32,6 +32,7 @@ go_library(
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/storage",
"//pkg/util",
"//pkg/util/contextutil",
"//pkg/util/encoding",
"//pkg/util/hlc",
13 changes: 12 additions & 1 deletion pkg/ccl/changefeedccl/schemafeed/helpers_test.go
@@ -8,7 +8,12 @@

package schemafeed

-import "strings"
+import (
+"strings"
+
+"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
+"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+)

const TestingAllEventFilter = "testing"

@@ -35,3 +40,9 @@ func PrintTableEventType(t tableEventType) string {
}
return strings.Join(strs, "|")
}

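// CreateChangefeedTargets returns Targets containing a single whole-table
// target for tableID. It is exported for use in tests.
func CreateChangefeedTargets(tableID descpb.ID) changefeedbase.Targets {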
targets := changefeedbase.Targets{}
targets.Add(changefeedbase.Target{TableID: tableID})
return targets
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed.go
@@ -560,7 +560,7 @@ func (tf *schemaFeed) validateDescriptor(
Before: lastVersion,
After: desc,
}
-shouldFilter, err := tf.filter.shouldFilter(ctx, e)
+shouldFilter, err := tf.filter.shouldFilter(ctx, e, tf.targets)
log.VEventf(ctx, 1, "validate shouldFilter %v %v", formatEvent(e), shouldFilter)
if err != nil {
return err
62 changes: 60 additions & 2 deletions pkg/ccl/changefeedccl/schemafeed/table_event_filter.go
@@ -14,7 +14,9 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
)

@@ -122,7 +124,9 @@ func classifyTableEvent(e TableEvent) tableEventTypeSet {
// permitted by the filter.
type tableEventFilter map[tableEventType]bool

-func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) (bool, error) {
+func (filter tableEventFilter) shouldFilter(
+ctx context.Context, e TableEvent, targets changefeedbase.Targets,
+) (bool, error) {
et := classifyTableEvent(e)

// Truncation events are not ignored and return an error.
@@ -141,7 +145,19 @@ func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) (
shouldFilter := true
for filterEvent, filterPolicy := range filter {
if et.Contains(filterEvent) && !filterPolicy {
-shouldFilter = false
+// Apply changefeed target-specific filters.
+// In some cases, a drop column event should be filtered out.
+// For example, we may be dropping a column which is not
+// monitored by the changefeed.
+if filterEvent == tableEventDropColumn {
+sf, err := shouldFilterDropColumnEvent(e, targets)
+if err != nil {
+return false, err
+}
+shouldFilter = sf
+} else {
+shouldFilter = false
+}
}
et = et.Clear(filterEvent)
}
@@ -151,6 +167,48 @@ func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) (
return shouldFilter, nil
}

// shouldFilterDropColumnEvent decides if we should filter out a drop column event.
func shouldFilterDropColumnEvent(e TableEvent, targets changefeedbase.Targets) (bool, error) {
if watched, err := droppedColumnIsWatched(e, targets); err != nil {
return false, err
} else if watched {
return false, nil
}
return true, nil
}

// droppedColumnIsWatched returns true if the changefeed targets a column
// which has a drop mutation inside the table event.
func droppedColumnIsWatched(e TableEvent, targets changefeedbase.Targets) (bool, error) {
// If no column families are specified, then all columns are targeted.
specifiedColumnFamiliesForTable := targets.GetSpecifiedColumnFamilies(e.Before.GetID())
if len(specifiedColumnFamiliesForTable) == 0 {
return true, nil
}

var watchedColumnIDs util.FastIntSet
if err := e.Before.ForeachFamily(func(family *descpb.ColumnFamilyDescriptor) error {
if _, ok := specifiedColumnFamiliesForTable[family.Name]; ok {
for _, columnID := range family.ColumnIDs {
watchedColumnIDs.Add(int(columnID))
}
}
return nil
}); err != nil {
return false, err
}

for _, m := range e.After.AllMutations() {
if m.AsColumn() == nil || m.AsColumn().IsHidden() {
continue
}
if m.Dropped() && m.WriteAndDeleteOnly() && watchedColumnIDs.Contains(int(m.AsColumn().GetID())) {
return true, nil
}
}

return false, nil
}

func hasNewVisibleColumnDropBackfillMutation(e TableEvent) (res bool) {
// Make sure that the old descriptor *doesn't* have the same mutation to avoid adding
// the same scan boundary more than once.