Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
84306: asim: add load splits r=kvoli a=kvoli

This patch adds load based splitting to the allocation simulator. It
uses the production code path, `pkg/kv/kvserver/split`, to decide when
and which key to split on. To enable split recommendations from this
package, load events are recorded to the splitter and split suggestions
enqueued into the simulator split queue. Split keys are likewise found
via consulting the split decider first and when not found, the split
queue will instead split evenly (50/50) on the number of keys.

resolves #82630

Release note: None

84451: dev,genbzl: add support for generating syntax diagrams r=ajwerner a=ajwerner

Fixes #84443.

Release note: None

84571: changefeedccl: prerequisite changes for `DROP COLUMN` r=ajwerner a=ajwerner

This is the first two commits from #84563. They are needed to ensure that we don't change the behavior of `DROP COLUMN` when we support it in the declarative schema changer. The issue is that there the protocol is to create a new primary index and swap to it. The column becomes non-public before the index swap, so the primary index swap is no longer a logical schema change of any kind. With this change, we can now detect that and properly restart as opposed to stop.

Also, the newly added testing uncovers some badness in how we classify some other schema changes, and should generally be useful.

Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
3 people committed Jul 18, 2022
4 parents b29f48b + 163713a + 2d9a886 + e6cf908 commit 7191d81
Show file tree
Hide file tree
Showing 37 changed files with 1,613 additions and 160 deletions.
2 changes: 1 addition & 1 deletion dev
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -euo pipefail

# Bump this counter to force rebuilding `dev` on all machines.
DEV_VERSION=42
DEV_VERSION=43

THIS_DIR=$(cd "$(dirname "$0")" && pwd)
BINARY_DIR=$THIS_DIR/bin/dev-versions
Expand Down
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/allocator:allocator",
"//pkg/kv/kvserver/apply:apply",
"//pkg/kv/kvserver/apply:apply_test",
"//pkg/kv/kvserver/asim/config:config",
"//pkg/kv/kvserver/asim/state:state",
"//pkg/kv/kvserver/asim/state:state_test",
"//pkg/kv/kvserver/asim/workload:workload",
Expand Down Expand Up @@ -2189,6 +2190,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/allocator/storepool:get_x_data",
"//pkg/kv/kvserver/apply:get_x_data",
"//pkg/kv/kvserver/asim:get_x_data",
"//pkg/kv/kvserver/asim/config:get_x_data",
"//pkg/kv/kvserver/asim/state:get_x_data",
"//pkg/kv/kvserver/asim/workload:get_x_data",
"//pkg/kv/kvserver/batcheval:get_x_data",
Expand Down
31 changes: 22 additions & 9 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,22 @@ func (f *kvFeed) run(ctx context.Context) (err error) {

highWater := rangeFeedResumeFrontier.Frontier()
boundaryType := jobspb.ResolvedSpan_BACKFILL
if f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyStop {
boundaryType = jobspb.ResolvedSpan_EXIT
} else if events, err := f.tableFeed.Peek(ctx, highWater.Next()); err == nil && isPrimaryKeyChange(events) {
boundaryType = jobspb.ResolvedSpan_RESTART
} else if err != nil {
events, err := f.tableFeed.Peek(ctx, highWater.Next())
if err != nil {
return err
}
// Detect whether the event corresponds to a primary index change. Also
// detect whether that primary index change corresponds to any change in
// the primary key or in the set of visible columns. If it corresponds to
// no such change, then it may be a column being dropped physically and
// should not trigger a failure in the `stop` policy.
primaryIndexChange, noColumnChanges := isPrimaryKeyChange(events)
if primaryIndexChange && (noColumnChanges ||
f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyStop) {
boundaryType = jobspb.ResolvedSpan_RESTART
} else if f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyStop {
boundaryType = jobspb.ResolvedSpan_EXIT
}
// Resolve all of the spans as a boundary if the policy indicates that
// we should do so.
if f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyNoBackfill ||
Expand All @@ -306,13 +315,17 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
}
}

func isPrimaryKeyChange(events []schemafeed.TableEvent) bool {
func isPrimaryKeyChange(
events []schemafeed.TableEvent,
) (isPrimaryIndexChange, hasNoColumnChanges bool) {
hasNoColumnChanges = true
for _, ev := range events {
if schemafeed.IsPrimaryIndexChange(ev) {
return true
if ok, noColumnChange := schemafeed.IsPrimaryIndexChange(ev); ok {
isPrimaryIndexChange = true
hasNoColumnChanges = hasNoColumnChanges && noColumnChange
}
}
return false
return isPrimaryIndexChange, isPrimaryIndexChange && hasNoColumnChanges
}

// filterCheckpointSpans filters spans which have already been completed,
Expand Down
17 changes: 17 additions & 0 deletions pkg/ccl/changefeedccl/schemafeed/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
name = "schemafeed",
srcs = [
"gen-tableEventType-stringer", # keep
"metrics.go",
"schema_feed.go",
"table_event_filter.go",
Expand Down Expand Up @@ -45,20 +47,26 @@ go_test(
name = "schemafeed_test",
size = "medium",
srcs = [
"helpers_test.go",
"main_test.go",
"schema_feed_test.go",
"table_event_filter_datadriven_test.go",
"table_event_filter_test.go",
],
data = glob(["testdata/**"]),
embed = [":schemafeed"],
deps = [
"//pkg/base",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/changefeedccl/schemafeed/schematestutils",
"//pkg/ccl/utilccl",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
Expand All @@ -70,10 +78,19 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//proto",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)

stringer(
name = "gen-tableEventType-stringer",
src = "table_event_filter.go",
additional_args = ["--trimprefix=tableEvent"],
typ = "tableEventType",
)

get_x_data(name = "get_x_data")
37 changes: 37 additions & 0 deletions pkg/ccl/changefeedccl/schemafeed/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package schemafeed

import "strings"

// TestingAllEventFilter is the key under which this test helper file registers
// an extra entry in schemaChangeEventFilters (see init below).
const TestingAllEventFilter = "testing"

func init() {
schemaChangeEventFilters[TestingAllEventFilter] = tableEventFilter{
tableEventDropColumn: false,
tableEventAddColumnWithBackfill: false,
tableEventAddColumnNoBackfill: false,
tableEventUnknown: false,
tableEventPrimaryKeyChange: false,
tableEventLocalityRegionalByRowChange: false,
tableEventAddHiddenColumn: false,
}
}

// ClassifyEvent exposes the unexported classifyTableEvent under an exported
// name.
var ClassifyEvent = classifyTableEvent

// PrintTableEventType renders t, interpreted as a bitmask, as the
// "|"-separated String() forms of its set bits, ordered from the least to the
// most significant bit. It returns the empty string when no bit is set.
func PrintTableEventType(t tableEventType) string {
	// tableEventType is declared as a uint64, so visit all 64 bits.
	const numBits = 64
	var strs []string
	for i := 0; i < numBits; i++ {
		// In Go `&` and `<<` have the same precedence and associate left to
		// right, so `t&1<<i` means `(t&1)<<i`. Build the single-bit value
		// first and then test it against t.
		if bit := tableEventType(1) << i; t&bit != 0 {
			strs = append(strs, bit.String())
		}
	}
	return strings.Join(strs, "|")
}
155 changes: 104 additions & 51 deletions pkg/ccl/changefeedccl/schemafeed/table_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package schemafeed

import (
"context"
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand All @@ -19,33 +20,38 @@ import (

type tableEventType uint64

//go:generate stringer --type tableEventType --trimprefix tableEvent

const (
tableEventTypeUnknown tableEventType = 0
tableEventTypeAddColumnNoBackfill tableEventType = 1 << (iota - 1)
tableEventTypeAddColumnWithBackfill
tableEventTypeDropColumn
tableEventUnknown tableEventType = iota
tableEventAddColumnNoBackfill
tableEventAddColumnWithBackfill
tableEventDropColumn
tableEventTruncate
tableEventPrimaryKeyChange
tableEventLocalityRegionalByRowChange
tableEventAddHiddenColumn
numEventTypes int = iota
)

// tableEventTypeSet is a bit set of tableEventType values. A non-zero event
// type e is represented by bit e-1 (see tableEventType.mask); the zero event
// type contributes no bit.
type tableEventTypeSet uint64

var (
defaultTableEventFilter = tableEventFilter{
tableEventTypeDropColumn: false,
tableEventTypeAddColumnWithBackfill: false,
tableEventTypeAddColumnNoBackfill: true,
tableEventTypeUnknown: true,
tableEventDropColumn: false,
tableEventAddColumnWithBackfill: false,
tableEventAddColumnNoBackfill: true,
tableEventUnknown: true,
tableEventPrimaryKeyChange: false,
tableEventLocalityRegionalByRowChange: false,
tableEventAddHiddenColumn: true,
}

columnChangeTableEventFilter = tableEventFilter{
tableEventTypeDropColumn: false,
tableEventTypeAddColumnWithBackfill: false,
tableEventTypeAddColumnNoBackfill: false,
tableEventTypeUnknown: true,
tableEventDropColumn: false,
tableEventAddColumnWithBackfill: false,
tableEventAddColumnNoBackfill: false,
tableEventUnknown: true,
tableEventPrimaryKeyChange: false,
tableEventLocalityRegionalByRowChange: false,
tableEventAddHiddenColumn: true,
Expand All @@ -59,46 +65,56 @@ var (

// Contains returns true if the receiver includes the given event
// types.
func (e tableEventType) Contains(event tableEventType) bool {
return e&event == event
func (e tableEventTypeSet) Contains(event tableEventType) bool {
return e&event.mask() != 0
}

// mask converts a single event type into its one-bit set representation:
// event type e maps to bit e-1. The zero event type has no bit and yields the
// empty set.
func (e tableEventType) mask() tableEventTypeSet {
	var bit tableEventTypeSet
	if e != 0 {
		bit = 1 << (e - 1)
	}
	return bit
}

// Clear returns a new tableEventTypeSet with the given event type
// cleared.
func (e tableEventType) Clear(event tableEventType) tableEventType {
return e & (^event)
func (e tableEventTypeSet) Clear(event tableEventType) tableEventTypeSet {
return e & (^event.mask())
}

func classifyTableEvent(e TableEvent) tableEventType {
et := tableEventTypeUnknown
if primaryKeyChanged(e) {
et = et | tableEventPrimaryKeyChange
}

if newVisibleColumnBackfillComplete(e) {
et = et | tableEventTypeAddColumnWithBackfill
}
// empty reports whether the set has no bits set, i.e. contains no event types.
func (e tableEventTypeSet) empty() bool { return e == 0 }

if newHiddenColumnBackfillComplete(e) {
et = et | tableEventAddHiddenColumn
func (e tableEventTypeSet) String() string {
if e.empty() {
return tableEventUnknown.String()
}

if newVisibleColumnNoBackfill(e) {
et = et | tableEventTypeAddColumnNoBackfill
}

if hasNewVisibleColumnDropBackfillMutation(e) {
et = et | tableEventTypeDropColumn
}

if tableTruncated(e) {
et = et | tableEventTruncate
var strs []string
for et := tableEventType(1); int(et) < numEventTypes; et++ {
if e.Contains(et) {
strs = append(strs, et.String())
}
}
return strings.Join(strs, "|")
}

if regionalByRowChanged(e) {
et = et | tableEventLocalityRegionalByRowChange
// classifyTableEvent runs every known classification predicate against e, in
// a fixed order, and returns the set of event types whose predicate matched.
// The result is the empty set when no predicate matches.
func classifyTableEvent(e TableEvent) tableEventTypeSet {
	type classifier struct {
		eventType tableEventType
		matches   func(event TableEvent) bool
	}
	classifiers := [...]classifier{
		{tableEventPrimaryKeyChange, primaryKeyChanged},
		{tableEventAddColumnWithBackfill, newVisibleColumnBackfillComplete},
		{tableEventAddHiddenColumn, newHiddenColumnBackfillComplete},
		{tableEventAddColumnNoBackfill, newVisibleColumnNoBackfill},
		{tableEventDropColumn, hasNewVisibleColumnDropBackfillMutation},
		{tableEventTruncate, tableTruncated},
		{tableEventLocalityRegionalByRowChange, regionalByRowChanged},
	}

	var matched tableEventTypeSet
	for i := range classifiers {
		if cl := &classifiers[i]; cl.matches(e) {
			matched |= cl.eventType.mask()
		}
	}
	return matched
}

Expand All @@ -114,8 +130,8 @@ func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) (
return false, errors.Errorf(`"%s" was truncated`, e.Before.GetName())
}

if et == tableEventTypeUnknown {
shouldFilter, ok := filter[tableEventTypeUnknown]
if et.empty() {
shouldFilter, ok := filter[tableEventUnknown]
if !ok {
return false, errors.AssertionFailedf("policy does not specify how to handle event type %v", et)
}
Expand Down Expand Up @@ -205,24 +221,61 @@ func regionalByRowChanged(e TableEvent) bool {
return e.Before.IsLocalityRegionalByRow() != e.After.IsLocalityRegionalByRow()
}

// hasNewPrimaryIndexWithNoVisibleColumnChanges reports whether the event
// switches the table to a primary index with a different ID while leaving
// both the key columns (same IDs in the same positions) and the set of public
// stored columns unchanged.
//
// It returns false if the primary index ID did not change, if the key columns
// differ, if the public stored column sets differ, or if a stored column
// cannot be resolved in its table descriptor. In the last case the absence of
// a visible change cannot be established, so the conservative answer is
// given.
func hasNewPrimaryIndexWithNoVisibleColumnChanges(e TableEvent) bool {
	before, after := e.Before.GetPrimaryIndex(), e.After.GetPrimaryIndex()
	if before.GetID() == after.GetID() ||
		before.NumKeyColumns() != after.NumKeyColumns() {
		return false
	}
	for i, n := 0, before.NumKeyColumns(); i < n; i++ {
		if before.GetKeyColumnID(i) != after.GetKeyColumnID(i) {
			return false
		}
	}
	// collectPublicStoredColumns gathers the IDs of the index's stored columns
	// that are public in tab. ok is false when a stored column ID cannot be
	// looked up in tab.
	collectPublicStoredColumns := func(
		idx catalog.Index, tab catalog.TableDescriptor,
	) (cols catalog.TableColSet, ok bool) {
		for i, n := 0, idx.NumPrimaryStoredColumns(); i < n; i++ {
			colID := idx.GetStoredColumnID(i)
			col, err := tab.FindColumnWithID(colID)
			if err != nil || col == nil {
				// Previously the lookup result was used unchecked; calling
				// Public() on a missing column would presumably dereference
				// nil.
				return cols, false
			}
			if col.Public() {
				cols.Add(colID)
			}
		}
		return cols, true
	}
	storedBefore, ok := collectPublicStoredColumns(before, e.Before)
	if !ok {
		return false
	}
	storedAfter, ok := collectPublicStoredColumns(after, e.After)
	if !ok {
		return false
	}
	// Equal sizes plus an empty difference means the two sets are identical.
	return storedBefore.Len() == storedAfter.Len() &&
		storedBefore.Difference(storedAfter).Empty()
}

// IsPrimaryIndexChange returns true if the event corresponds to a change
// in the primary index.
func IsPrimaryIndexChange(e TableEvent) bool {
et := classifyTableEvent(e)
return et.Contains(tableEventPrimaryKeyChange)
// in the primary index. It also returns whether the primary index change
// corresponds to any change in the visible column set or key ordering.
// This is useful because when the declarative schema changer drops a column,
// it does so by adding a new primary index with the column excluded and
// then swaps to the new primary index. The column logically disappears
// before the index swap occurs. We want to detect the case of this index
// swap and not stop changefeeds which are programmed to stop upon schema
// changes.
func IsPrimaryIndexChange(e TableEvent) (isPrimaryIndexChange, noVisibleOrderOrColumnChanges bool) {
isPrimaryIndexChange = classifyTableEvent(e).Contains(tableEventPrimaryKeyChange)
if isPrimaryIndexChange {
noVisibleOrderOrColumnChanges = hasNewPrimaryIndexWithNoVisibleColumnChanges(e)
}
return isPrimaryIndexChange, noVisibleOrderOrColumnChanges
}

// IsOnlyPrimaryIndexChange returns true if the event corresponds
// to a change in the primary index and _only_ a change in the primary
// index.
func IsOnlyPrimaryIndexChange(e TableEvent) bool {
et := classifyTableEvent(e)
return et == tableEventPrimaryKeyChange
return classifyTableEvent(e) == tableEventPrimaryKeyChange.mask()
}

// IsRegionalByRowChange returns true if the event corresponds to a
// change in the table's locality to or from RegionalByRow.
func IsRegionalByRowChange(e TableEvent) bool {
et := classifyTableEvent(e)
return et.Contains(tableEventLocalityRegionalByRowChange)
return classifyTableEvent(e).Contains(tableEventLocalityRegionalByRowChange)
}
Loading

0 comments on commit 7191d81

Please sign in to comment.