From 163713a69f5962bc75a23f2c09751be376e7ecac Mon Sep 17 00:00:00 2001
From: Austen McClernon
Date: Wed, 22 Jun 2022 21:51:15 +0000
Subject: [PATCH 1/4] asim: add load splits

This patch adds load based splitting to the allocation simulator. It uses the production code path, `pkg/kv/kvserver/split`, to decide when and which key to split on. To enable split recommendations from this package, load events are recorded to the splitter and split suggestions are enqueued into the simulator split queue. Split keys are likewise found by consulting the split decider first; when no load-based split key is found, the split queue will instead split evenly (50/50) on the number of keys.

resolves #82630

Release note: None
--- pkg/BUILD.bazel | 2 + pkg/kv/kvserver/asim/BUILD.bazel | 4 +- pkg/kv/kvserver/asim/asim.go | 33 +++-- pkg/kv/kvserver/asim/asim_test.go | 20 +-- pkg/kv/kvserver/asim/config/BUILD.bazel | 11 ++ pkg/kv/kvserver/asim/{ => config}/settings.go | 50 +++++-- pkg/kv/kvserver/asim/metrics_tracker_test.go | 3 +- pkg/kv/kvserver/asim/replicate_queue.go | 34 +++-- pkg/kv/kvserver/asim/replicate_queue_test.go | 10 +- pkg/kv/kvserver/asim/state/BUILD.bazel | 5 + pkg/kv/kvserver/asim/state/config_loader.go | 4 +- pkg/kv/kvserver/asim/state/helpers.go | 3 +- pkg/kv/kvserver/asim/state/impl.go | 91 ++++++++++-- pkg/kv/kvserver/asim/state/load.go | 30 +++- pkg/kv/kvserver/asim/state/split_decider.go | 128 +++++++++++++++++ .../kvserver/asim/state/split_decider_test.go | 135 ++++++++++++++++++ pkg/kv/kvserver/asim/state/state.go | 25 ++++ pkg/kv/kvserver/asim/state/state_test.go | 36 +++-- 18 files changed, 554 insertions(+), 70 deletions(-) create mode 100644 pkg/kv/kvserver/asim/config/BUILD.bazel rename pkg/kv/kvserver/asim/{ => config}/settings.go (71%) create mode 100644 pkg/kv/kvserver/asim/state/split_decider.go create mode 100644 pkg/kv/kvserver/asim/state/split_decider_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 48cb73eab4ec..451d39f27be7 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -992,6 +992,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver/allocator:allocator", "//pkg/kv/kvserver/apply:apply", "//pkg/kv/kvserver/apply:apply_test", + "//pkg/kv/kvserver/asim/config:config", "//pkg/kv/kvserver/asim/state:state", "//pkg/kv/kvserver/asim/state:state_test", "//pkg/kv/kvserver/asim/workload:workload", @@ -2189,6 +2190,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/allocator/storepool:get_x_data", "//pkg/kv/kvserver/apply:get_x_data", "//pkg/kv/kvserver/asim:get_x_data", + "//pkg/kv/kvserver/asim/config:get_x_data", "//pkg/kv/kvserver/asim/state:get_x_data", "//pkg/kv/kvserver/asim/workload:get_x_data", "//pkg/kv/kvserver/batcheval:get_x_data", diff --git a/pkg/kv/kvserver/asim/BUILD.bazel b/pkg/kv/kvserver/asim/BUILD.bazel index 424ff31d4f5c..3ea3a7f96e62 100644 --- a/pkg/kv/kvserver/asim/BUILD.bazel +++ b/pkg/kv/kvserver/asim/BUILD.bazel @@ -8,13 +8,13 @@ go_library( "metrics_tracker.go", "pacer.go", "replicate_queue.go", - "settings.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim", visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvserver/allocator/allocatorimpl", "//pkg/kv/kvserver/allocator/storepool", + "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/state", "//pkg/kv/kvserver/asim/workload", "//pkg/roachpb", @@ -34,9 +34,11 @@ go_test( ], embed = [":asim"], deps = [ + "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/state", "//pkg/kv/kvserver/asim/workload", "//pkg/roachpb", + "//pkg/testutils/skip", "//pkg/util/timeutil",
"@com_github_stretchr_testify//require", ], diff --git a/pkg/kv/kvserver/asim/asim.go b/pkg/kv/kvserver/asim/asim.go index 2dcc1d7e79e8..8fe401059bd2 100644 --- a/pkg/kv/kvserver/asim/asim.go +++ b/pkg/kv/kvserver/asim/asim.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -52,7 +53,7 @@ func NewSimulator( initialState state.State, exchange state.Exchange, changer state.Changer, - settings SimulationSettings, + settings *config.SimulationSettings, metrics *MetricsTracker, ) *Simulator { pacers := make(map[state.StoreID]ReplicaPacer) @@ -62,14 +63,14 @@ func NewSimulator( rqs[storeID] = NewReplicateQueue( storeID, changer, - ReplicaChangeDelayFn(settings), + settings.ReplicaChangeDelayFn(), initialState.MakeAllocator(storeID), start, ) sqs[storeID] = NewSplitQueue( storeID, changer, - RangeSplitDelayFn(settings), + settings.RangeSplitDelayFn(), settings.RangeSizeSplitThreshold, start, ) @@ -177,6 +178,23 @@ func (s *Simulator) tickStoreClocks(tick time.Time) { // processing. func (s *Simulator) tickQueues(ctx context.Context, tick time.Time, state state.State) { for storeID := range state.Stores() { + + // Tick the split queue. + s.sqs[storeID].Tick(ctx, tick, state) + // Tick the replicate queue. + s.rqs[storeID].Tick(ctx, tick, state) + + // Tick changes that may have been enqueued with a lower completion + // than the current tick, from the queues. + s.changer.Tick(tick, state) + + // Try adding suggested load splits that are pending for this store. + for _, rangeID := range state.LoadSplitterFor(storeID).ClearSplitKeys() { + if r, ok := state.LeaseHolderReplica(rangeID); ok { + s.sqs[storeID].MaybeAdd(ctx, r, state) + } + } + for { r := s.pacers[storeID].Next(tick) if r == nil { @@ -190,15 +208,6 @@ func (s *Simulator) tickQueues(ctx context.Context, tick time.Time, state state. continue } - // Tick the split queue. - s.sqs[storeID].Tick(ctx, tick, state) - // Tick the replicate queue. - s.rqs[storeID].Tick(ctx, tick, state) - - // Tick changes that may have been enqueued with a lower completion - // than the current tick, from the queues. - s.changer.Tick(tick, state) - // Try adding the replica to the split queue. s.sqs[storeID].MaybeAdd(ctx, r, state) // Try adding the replica to the replicate queue. 
diff --git a/pkg/kv/kvserver/asim/asim_test.go b/pkg/kv/kvserver/asim/asim_test.go index f33c8c7de393..c370f5945513 100644 --- a/pkg/kv/kvserver/asim/asim_test.go +++ b/pkg/kv/kvserver/asim/asim_test.go @@ -20,15 +20,17 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) func TestRunAllocatorSimulator(t *testing.T) { ctx := context.Background() - settings := asim.DefaultSimulationSettings() + settings := config.DefaultSimulationSettings() start := state.TestingStartTime() end := start.Add(1000 * time.Second) interval := 10 * time.Second @@ -75,19 +77,19 @@ func testPreGossipStores(s state.State, exchange state.Exchange, at time.Time) { } // TestAllocatorSimulatorSpeed tests that the simulation runs at a rate of at -// least 5 simulated minutes per wall clock second (1:50) for a 32 node +// least 1.67 simulated minutes per wall clock second (1:100) for a 32 node // cluster, with 32000 replicas. The workload is generating 16000 keys per // second with a uniform distribution. -// NB: In practice, on a single thread N2 GCP VM, this completes with a minimum -// run of 1350ms, approximately 16x faster (1:444) than what this test asserts. -// The limit is set much higher due to --stress and inconsistent processor -// speeds. The speedup is not linear w.r.t replica or store count. -// TODO(kvoli,lidorcarmel): If this test flakes on CI --stress --race, decrease -// the stores, or decrease replicasPerStore. func TestAllocatorSimulatorSpeed(t *testing.T) { ctx := context.Background() + + skipString := "Skipping test under (?stress|?race) as it asserts on speed of the run." + skip.UnderStress(t, skipString) + skip.UnderStressRace(t, skipString) + skip.UnderRace(t, skipString) + start := state.TestingStartTime() - settings := asim.DefaultSimulationSettings() + settings := config.DefaultSimulationSettings() // Run each simulation for 5 minutes. end := start.Add(5 * time.Minute) diff --git a/pkg/kv/kvserver/asim/config/BUILD.bazel b/pkg/kv/kvserver/asim/config/BUILD.bazel new file mode 100644 index 000000000000..0c0dee0f3484 --- /dev/null +++ b/pkg/kv/kvserver/asim/config/BUILD.bazel @@ -0,0 +1,11 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "config", + srcs = ["settings.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config", + visibility = ["//visibility:public"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/asim/settings.go b/pkg/kv/kvserver/asim/config/settings.go similarity index 71% rename from pkg/kv/kvserver/asim/settings.go rename to pkg/kv/kvserver/asim/config/settings.go index c29220ecb54d..9846acf46f0a 100644 --- a/pkg/kv/kvserver/asim/settings.go +++ b/pkg/kv/kvserver/asim/config/settings.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
-package asim +package config import "time" @@ -23,11 +23,17 @@ const ( defaultPacerMaxIterIterval = 1 * time.Second defaultStateExchangeInterval = 10 * time.Second defaultStateExchangeDelay = 500 * time.Millisecond + defaultSplitQPSThreshold = 2500 + defaultSplitQPSRetention = 10 * time.Minute + defaultSeed = 42 ) // SimulationSettings controls // WIP: Thread these settings through to each of the sim parts. type SimulationSettings struct { + // Seed is the random source that will be used for any simulator components + // that accept a seed. + Seed int64 // ReplicaChangeBaseDelay is the base delay for all replica movements // (add,remove). It accounts for a fixed overhead of initiating a replica // movement. @@ -65,11 +71,18 @@ type SimulationSettings struct { // StateExchangeDelay is the delay between sending a state update and all // other stores receiving the update. StateExchangeDelay time.Duration + // SplitQPSThreshold is the threshold above which a range will be a + // candidate for load based splitting. + SplitQPSThreshold float64 + // SplitQPSRetention is the duration which recorded load will be retained + // and factored into load based splitting decisions. + SplitQPSRetention time.Duration } // DefaultSimulationSettings returns a set of default settings for simulation. -func DefaultSimulationSettings() SimulationSettings { - return SimulationSettings{ +func DefaultSimulationSettings() *SimulationSettings { + return &SimulationSettings{ + Seed: defaultSeed, ReplicaChangeBaseDelay: defaultReplicaChangeBaseDelay, ReplicaAddRate: defaultReplicaAddDelayFactor, SplitQueueDelay: defaultSplitQueueDelay, @@ -80,18 +93,18 @@ func DefaultSimulationSettings() SimulationSettings { PacerMaxIterIterval: defaultPacerMaxIterIterval, StateExchangeInterval: defaultStateExchangeInterval, StateExchangeDelay: defaultStateExchangeDelay, + SplitQPSThreshold: defaultSplitQPSThreshold, + SplitQPSRetention: defaultSplitQPSRetention, } } // ReplicaChangeDelayFn returns a function which calculates the delay for // adding a replica based on the range size. -func ReplicaChangeDelayFn( - settings SimulationSettings, -) func(rangeSize int64, add bool) time.Duration { +func (s *SimulationSettings) ReplicaChangeDelayFn() func(rangeSize int64, add bool) time.Duration { return func(rangeSize int64, add bool) time.Duration { - delay := settings.ReplicaChangeBaseDelay + delay := s.ReplicaChangeBaseDelay if add { - delay += (time.Duration(rangeSize/(1024*1024)) / time.Duration(settings.ReplicaAddRate)) + delay += (time.Duration(rangeSize/(1024*1024)) / time.Duration(s.ReplicaAddRate)) } return delay } @@ -99,8 +112,25 @@ func ReplicaChangeDelayFn( // RangeSplitDelayFn returns a function which calculates the delay for // splitting a range. -func RangeSplitDelayFn(settings SimulationSettings) func() time.Duration { +func (s *SimulationSettings) RangeSplitDelayFn() func() time.Duration { return func() time.Duration { - return settings.SplitQueueDelay + return s.SplitQueueDelay + } +} + +// SplitQPSThresholdFn returns a function that returns the current QPS split +// threshold for load based splitting of a range. +func (s *SimulationSettings) SplitQPSThresholdFn() func() float64 { + return func() float64 { + return s.SplitQPSThreshold + } +} + +// SplitQPSRetentionFn returns a function that returns the current QPS +// retention duration for load recorded against a range, used in load based +// split decisions. 
+func (s *SimulationSettings) SplitQPSRetentionFn() func() time.Duration { + return func() time.Duration { + return s.SplitQPSRetention } } diff --git a/pkg/kv/kvserver/asim/metrics_tracker_test.go b/pkg/kv/kvserver/asim/metrics_tracker_test.go index 6db2b6741303..f740b364edd9 100644 --- a/pkg/kv/kvserver/asim/metrics_tracker_test.go +++ b/pkg/kv/kvserver/asim/metrics_tracker_test.go @@ -18,6 +18,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/stretchr/testify/require" @@ -105,7 +106,7 @@ func Example_rebalance() { func Example_workload() { ctx := context.Background() start := state.TestingStartTime() - settings := asim.DefaultSimulationSettings() + settings := config.DefaultSimulationSettings() end := start.Add(200 * time.Second) interval := 10 * time.Second rwg := make([]workload.Generator, 1) diff --git a/pkg/kv/kvserver/asim/replicate_queue.go b/pkg/kv/kvserver/asim/replicate_queue.go index 16feacf33ecf..6c60d853fb37 100644 --- a/pkg/kv/kvserver/asim/replicate_queue.go +++ b/pkg/kv/kvserver/asim/replicate_queue.go @@ -170,7 +170,7 @@ func (rq *replicateQueue) MaybeAdd( return true } -// Tick proceses updates in the ReplicateQueue. Only one replica is +// Tick processes updates in the ReplicateQueue. Only one replica is // processed at a time and the duration taken to process a replica depends // on the action taken. Replicas in the queue are processed in order of // priority, then in FIFO order on ties. The Tick function currently only @@ -273,7 +273,7 @@ func NewSplitQueue( // MaybeAdd proposes a range for being split. If it meets the criteria it is // enqueued. func (sq *splitQueue) MaybeAdd(ctx context.Context, replica state.Replica, state state.State) bool { - priority := sq.shouldSplit(replica.Range(), state) + priority := sq.shouldSplit(sq.lastTick, replica.Range(), state) if priority < 1 { return false } @@ -288,7 +288,7 @@ func (sq *splitQueue) MaybeAdd(ctx context.Context, replica state.Replica, state return true } -// Tick proceses updates in the split queue. Only one range is processed at a +// Tick processes updates in the split queue. Only one range is processed at a // time and the duration taken to process a replica depends on the action // taken. Replicas in the queue are processed in order of priority, then in // FIFO order on ties. The tick currently only considers size based range @@ -309,13 +309,13 @@ func (sq *splitQueue) Tick(ctx context.Context, tick time.Time, s state.State) { return } - // Check whether the range satifies the split criteria, since it may have + // Check whether the range satisfies the split criteria, since it may have // changed since it was enqueued. - if sq.shouldSplit(rng.RangeID(), s) < 1 { + if sq.shouldSplit(tick, rng.RangeID(), s) < 1 { return } - splitKey, ok := findKeySpanSplit(s, rng.RangeID()) + splitKey, ok := sq.findKeySpanSplit(tick, s, rng.RangeID()) if !ok { return } @@ -338,19 +338,35 @@ func (sq *splitQueue) Tick(ctx context.Context, tick time.Time, s state.State) { // shouldSplit returns whether a range should be split into two. When the // floating point number returned is greater than or equal to 1, it should be // split with that priority, else it shouldn't. 
-func (sq *splitQueue) shouldSplit(rangeID state.RangeID, s state.State) float64 { +func (sq *splitQueue) shouldSplit(tick time.Time, rangeID state.RangeID, s state.State) float64 { rng, ok := s.Range(rangeID) if !ok { return 0 } - return float64(rng.Size()) / float64(sq.splitThreshold) + // Check whether we should split this range based on load. + if _, ok := s.LoadSplitterFor(sq.storeID).SplitKey(tick, rangeID); ok { + return 2.0 + } + + // Check whether we should split this range based on size. + overfullBytesThreshold := float64(rng.Size()) / float64(sq.splitThreshold) + + return overfullBytesThreshold } // findKeySpanSplit returns a key that may be used for splitting a range into // two. It will return the key that divides the range into an equal number of // keys on the lhs and rhs. -func findKeySpanSplit(s state.State, rangeID state.RangeID) (state.Key, bool) { +func (sq *splitQueue) findKeySpanSplit( + tick time.Time, s state.State, rangeID state.RangeID, +) (state.Key, bool) { + // Try and use the split key suggested by the load based splitter, if one + // exists. + if loadSplitKey, ok := s.LoadSplitterFor(sq.storeID).SplitKey(tick, rangeID); ok { + return loadSplitKey, true + } + start, end, ok := s.RangeSpan(rangeID) if !ok { return start, false diff --git a/pkg/kv/kvserver/asim/replicate_queue_test.go b/pkg/kv/kvserver/asim/replicate_queue_test.go index 009cb882b7be..8cbf0af9b41c 100644 --- a/pkg/kv/kvserver/asim/replicate_queue_test.go +++ b/pkg/kv/kvserver/asim/replicate_queue_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -60,7 +61,7 @@ func TestReplicateQueue(t *testing.T) { start := state.TestingStartTime() ctx := context.Background() testingStore := state.StoreID(1) - testSettings := DefaultSimulationSettings() + testSettings := config.DefaultSimulationSettings() // NB: This test assumes 5s interval/changes for simplification purposes. testSettings.StateExchangeInterval = 5 * time.Second @@ -127,7 +128,7 @@ func TestReplicateQueue(t *testing.T) { rq := NewReplicateQueue( store.StoreID(), changer, - ReplicaChangeDelayFn(testSettings), + testSettings.ReplicaChangeDelayFn(), s.MakeAllocator(store.StoreID()), start, ) @@ -181,7 +182,8 @@ func TestReplicateQueue(t *testing.T) { func TestSplitQueue(t *testing.T) { start := state.TestingStartTime() ctx := context.Background() - testSettings := DefaultSimulationSettings() + testSettings := config.DefaultSimulationSettings() + // NB: This test assume 5 second split queue delays for simplification. 
testSettings.SplitQueueDelay = 5 * time.Second @@ -300,7 +302,7 @@ func TestSplitQueue(t *testing.T) { sq := NewSplitQueue( store.StoreID(), changer, - RangeSplitDelayFn(testSettings), + testSettings.RangeSplitDelayFn(), tc.splitThreshold, start, ) diff --git a/pkg/kv/kvserver/asim/state/BUILD.bazel b/pkg/kv/kvserver/asim/state/BUILD.bazel index 5da2f42c2d9f..b53a0747037c 100644 --- a/pkg/kv/kvserver/asim/state/BUILD.bazel +++ b/pkg/kv/kvserver/asim/state/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "helpers.go", "impl.go", "load.go", + "split_decider.go", "state.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state", @@ -20,8 +21,10 @@ go_library( "//pkg/kv/kvserver/allocator", "//pkg/kv/kvserver/allocator/allocatorimpl", "//pkg/kv/kvserver/allocator/storepool", + "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/workload", "//pkg/kv/kvserver/liveness/livenesspb", + "//pkg/kv/kvserver/split", "//pkg/roachpb", "//pkg/settings/cluster", "//pkg/util/hlc", @@ -39,10 +42,12 @@ go_test( "change_test.go", "config_loader_test.go", "exchange_test.go", + "split_decider_test.go", "state_test.go", ], embed = [":state"], deps = [ + "//pkg/kv/kvserver/asim/config", "//pkg/kv/kvserver/asim/workload", "//pkg/roachpb", "@com_github_stretchr_testify//require", diff --git a/pkg/kv/kvserver/asim/state/config_loader.go b/pkg/kv/kvserver/asim/state/config_loader.go index 338d967e35e7..9a57820ef977 100644 --- a/pkg/kv/kvserver/asim/state/config_loader.go +++ b/pkg/kv/kvserver/asim/state/config_loader.go @@ -10,6 +10,8 @@ package state +import "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + // SingleRegionConfig is a simple cluster config with a single region and 3 // zones, all have the same number of nodes. var SingleRegionConfig = ClusterInfo{ @@ -109,7 +111,7 @@ type ClusterInfo struct { // LoadConfig loads a predefined configuration which contains cluster // information such as regions, zones, etc. func LoadConfig(c ClusterInfo) State { - s := newState() + s := newState(config.DefaultSimulationSettings()) // A new state has a single range - add the replica load for that range. 
s.load[1] = &ReplicaLoadCounter{} s.clusterinfo = c diff --git a/pkg/kv/kvserver/asim/state/helpers.go b/pkg/kv/kvserver/asim/state/helpers.go index 66f35356c644..8349dfc8aa04 100644 --- a/pkg/kv/kvserver/asim/state/helpers.go +++ b/pkg/kv/kvserver/asim/state/helpers.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -84,7 +85,7 @@ func NewTestState( replicas map[Key][]StoreID, leaseholders map[Key]StoreID, ) State { - state := newState() + state := newState(config.DefaultSimulationSettings()) for i := 0; i < nodes; i++ { node := state.AddNode() for j := 0; j < storesPerNode; j++ { diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index 363c20bbb4ff..29cb4e1a158c 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -31,10 +32,12 @@ type state struct { nodes map[NodeID]*node stores map[StoreID]*store load map[RangeID]ReplicaLoad + loadsplits map[StoreID]LoadSplitter ranges *rmap clusterinfo ClusterInfo usageInfo *ClusterUsageInfo clock ManualSimClock + settings *config.SimulationSettings // Unique ID generators for Nodes and Stores. These are incremented // pre-assignment. So that IDs start from 1. @@ -43,17 +46,19 @@ type state struct { } // NewState returns an implementation of the State interface. -func NewState() State { - return newState() +func NewState(settings *config.SimulationSettings) State { + return newState(settings) } -func newState() *state { +func newState(settings *config.SimulationSettings) *state { return &state{ - nodes: make(map[NodeID]*node), - stores: make(map[StoreID]*store), - load: make(map[RangeID]ReplicaLoad), - ranges: newRMap(), - usageInfo: newClusterUsageInfo(), + nodes: make(map[NodeID]*node), + stores: make(map[StoreID]*store), + load: map[RangeID]ReplicaLoad{FirstRangeID: &ReplicaLoadCounter{}}, + loadsplits: make(map[StoreID]LoadSplitter), + ranges: newRMap(), + usageInfo: newClusterUsageInfo(), + settings: config.DefaultSimulationSettings(), } } @@ -301,6 +306,12 @@ func (s *state) AddStore(nodeID NodeID) (Store, bool) { node.stores = append(node.stores, storeID) s.stores[storeID] = store + // Add a range load splitter for this store. + s.loadsplits[storeID] = NewSplitDecider(s.settings.Seed, + s.settings.SplitQPSThresholdFn(), + s.settings.SplitQPSRetentionFn(), + ) + return store, true } @@ -489,21 +500,28 @@ func (s *state) SplitRange(splitKey Key) (Range, Range, bool) { // Update the range map state. ranges.rangeTree.ReplaceOrInsert(r) ranges.rangeMap[r.rangeID] = r - s.load[r.rangeID] = &ReplicaLoadCounter{} - // Update the range size to be split 50/50 between the lhs and rhs. 
+ // Update the range size to be split 50/50 between the lhs and rhs. Also + // split the replica load that is recorded 50/50 between the lhs and rhs. // NB: This is a simplifying assumption. predecessorRange.size /= 2 r.size = predecessorRange.size + if predecessorLoad, ok := s.load[predecessorRange.rangeID]; ok { + s.load[r.rangeID] = predecessorLoad.Split() + } // If there are existing replicas for the LHS of the split, then also // create replicas on the same stores for the RHS. for storeID, replica := range predecessorRange.replicas { s.AddReplica(rangeID, storeID) - // The successor range's leaseholder was on this store, copy the - // leaseholder store over for the new split range. if replica.HoldsLease() { + // The successor range's leaseholder was on this store, copy the + // leaseholder store over for the new split range. s.TransferLease(rangeID, storeID) + + // Reset the recorded load split statistics on the predecessor + // range. + s.loadsplits[storeID].ResetRange(predecessorRange.rangeID) } } @@ -532,9 +550,12 @@ func (s *state) TransferLease(rangeID RangeID, storeID StoreID) bool { // Remove the old leaseholder. oldLeaseHolderID := rng.leaseholder - for _, repl := range rng.replicas { + for oldStoreID, repl := range rng.replicas { if repl.replicaID == oldLeaseHolderID { repl.holdsLease = false + // Reset the load stats on the old range, within the old + // leaseholder store. + s.loadsplits[oldStoreID].ResetRange(rangeID) } } @@ -606,6 +627,14 @@ func (s *state) applyLoad(rng *rng, le workload.LoadEvent) { // Note that deletes are not supported currently, we are also assuming data // is not compacted. rng.size += le.WriteSize + + // Record the load against the splitter for the store which holds a lease + // for this range, if one exists. + store, ok := s.LeaseholderStore(rng.rangeID) + if !ok { + return + } + s.loadsplits[store.StoreID()].Record(s.clock.Now(), rng.rangeID, le) } func (s *state) updateStoreCapacities() { @@ -681,6 +710,42 @@ func (s *state) MakeAllocator(storeID StoreID) allocatorimpl.Allocator { ) } +// LeaseHolderReplica returns the replica which holds a lease for the range +// with ID RangeID, if the range exists, otherwise returning false. +func (s *state) LeaseHolderReplica(rangeID RangeID) (Replica, bool) { + rng, ok := s.ranges.rangeMap[rangeID] + if !ok { + return nil, false + } + + for _, replica := range rng.replicas { + if replica.holdsLease { + return replica, true + } + } + return nil, false +} + +// LeaseholderStore returns the store which holds a lease for the range with ID +// RangeID, if the range and store exist, otherwise returning false. +func (s *state) LeaseholderStore(rangeID RangeID) (Store, bool) { + replica, ok := s.LeaseHolderReplica(rangeID) + if !ok { + return nil, false + } + + store, ok := s.stores[replica.StoreID()] + if !ok { + return nil, false + } + return store, true +} + +// LoadSplitterFor returns the load splitter for the Store with ID StoreID. +func (s *state) LoadSplitterFor(storeID StoreID) LoadSplitter { + return s.loadsplits[storeID] +} + // node is an implementation of the Node interface. type node struct { nodeID NodeID diff --git a/pkg/kv/kvserver/asim/state/load.go b/pkg/kv/kvserver/asim/state/load.go index 105b967ccae9..e2c37609b538 100644 --- a/pkg/kv/kvserver/asim/state/load.go +++ b/pkg/kv/kvserver/asim/state/load.go @@ -24,6 +24,14 @@ type ReplicaLoad interface { // Load translates the recorded load events into usage information of the // replica. 
Load() allocator.RangeUsageInfo + // Split halves the load of the ReplicaLoad this method is called on and + // assigns the other half to a new ReplicaLoad that is returned i.e. 50/50. + Split() ReplicaLoad +} + +// LoadEventQPS returns the QPS for a given workload event. +func LoadEventQPS(le workload.LoadEvent) float64 { + return float64(le.Reads) + float64(le.Writes) } // ReplicaLoadCounter is the sum of all key accesses and size of bytes, both written @@ -37,6 +45,7 @@ type ReplicaLoadCounter struct { WriteBytes int64 ReadKeys int64 ReadBytes int64 + QPS float64 } // ApplyLoad applies a load event onto a replica load counter. @@ -45,6 +54,7 @@ func (rl *ReplicaLoadCounter) ApplyLoad(le workload.LoadEvent) { rl.ReadKeys += le.Reads rl.WriteBytes += le.WriteSize rl.WriteKeys += le.Writes + rl.QPS += LoadEventQPS(le) } // Load translates the recorded key accesses and size into range usage @@ -52,11 +62,29 @@ func (rl *ReplicaLoadCounter) ApplyLoad(le workload.LoadEvent) { func (rl *ReplicaLoadCounter) Load() allocator.RangeUsageInfo { return allocator.RangeUsageInfo{ LogicalBytes: rl.WriteBytes, - QueriesPerSecond: float64(rl.WriteKeys + rl.ReadKeys), + QueriesPerSecond: rl.QPS, WritesPerSecond: float64(rl.WriteKeys), } } +// Split halves the load of the ReplicaLoad this method is called on and +// assigns the other half to a new ReplicaLoad that is returned i.e. 50/50. +func (rl *ReplicaLoadCounter) Split() ReplicaLoad { + rl.WriteKeys /= 2 + rl.WriteBytes /= 2 + rl.ReadKeys /= 2 + rl.ReadBytes /= 2 + rl.QPS /= 2 + + return &ReplicaLoadCounter{ + WriteKeys: rl.WriteKeys, + WriteBytes: rl.WriteBytes, + ReadKeys: rl.ReadKeys, + ReadBytes: rl.ReadBytes, + QPS: rl.QPS, + } +} + // Capacity returns the store capacity for the store with id storeID. It // aggregates the load from each replica within the store. func Capacity(state State, storeID StoreID) roachpb.StoreCapacity { diff --git a/pkg/kv/kvserver/asim/state/split_decider.go b/pkg/kv/kvserver/asim/state/split_decider.go new file mode 100644 index 000000000000..1f6e08bb593a --- /dev/null +++ b/pkg/kv/kvserver/asim/state/split_decider.go @@ -0,0 +1,128 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package state + +import ( + "math/rand" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" + "github.com/cockroachdb/cockroach/pkg/roachpb" +) + +// LoadSplitter provides an abstraction for load based splitting. It records +// load against ranges, suggested ranges to be split and possible split keys +// for ranges. +type LoadSplitter interface { + // Record records a workload event at the time given, against the range + // with ID RangeID. + Record(time.Time, RangeID, workload.LoadEvent) bool + // SplitKey returns whether split key and true if a valid split key exists + // given the recorded load, otherwise returning false. + SplitKey(time.Time, RangeID) (Key, bool) + // ClearSplitKeys returns a suggested list of ranges that should be split + // due to load. Calling this function resets the list of suggestions. + ClearSplitKeys() []RangeID + // ResetRange resets the collected statistics for a Range with ID RangeID. 
+ ResetRange(rangeID RangeID) +} + +// SplitDecider implements the LoadSplitter interface. +type SplitDecider struct { + deciders map[RangeID]*split.Decider + suggestions []RangeID + qpsThreshold func() float64 + qpsRetention func() time.Duration + seed int64 +} + +// NewSplitDecider returns a new SplitDecider. +func NewSplitDecider( + seed int64, qpsThresholdFn func() float64, qpsRetentionFn func() time.Duration, +) *SplitDecider { + return &SplitDecider{ + deciders: make(map[RangeID]*split.Decider), + suggestions: []RangeID{}, + seed: seed, + qpsThreshold: qpsThresholdFn, + qpsRetention: qpsRetentionFn, + } +} + +func (s *SplitDecider) newDecider() *split.Decider { + rand := rand.New(rand.NewSource(s.seed)) + + intN := func(n int) int { + return rand.Intn(n) + } + + decider := &split.Decider{} + split.Init(decider, intN, s.qpsThreshold, s.qpsRetention) + return decider +} + +// Record records a workload event at the time given, against the range +// with ID RangeID. +func (s *SplitDecider) Record(tick time.Time, rangeID RangeID, le workload.LoadEvent) bool { + decider := s.deciders[rangeID] + + if decider == nil { + decider = s.newDecider() + s.deciders[rangeID] = decider + } + + qps := LoadEventQPS(le) + shouldSplit := decider.Record(tick, int(qps), func() roachpb.Span { + return roachpb.Span{ + Key: Key(le.Key).ToRKey().AsRawKey(), + } + }) + + if shouldSplit { + s.suggestions = append(s.suggestions, rangeID) + } + + return shouldSplit +} + +// SplitKey returns whether split key and true if a valid split key exists +// given the recorded load, otherwise returning false. +func (s *SplitDecider) SplitKey(tick time.Time, rangeID RangeID) (Key, bool) { + decider := s.deciders[rangeID] + if decider == nil { + return InvalidKey, false + } + + key := decider.MaybeSplitKey(tick) + if key == nil { + return InvalidKey, false + } + + return ToKey(key), true +} + +// ClearSplitKeys returns a suggested list of ranges that should be split due +// to load. Calling this function resets the list of suggestions. +func (s *SplitDecider) ClearSplitKeys() []RangeID { + suggestions := make([]RangeID, len(s.suggestions)) + for i, suggestion := range s.suggestions { + suggestions[i] = suggestion + } + + s.suggestions = []RangeID{} + return suggestions +} + +// ResetRange resets the collected statistics for a Range with ID RangeID. +func (s *SplitDecider) ResetRange(rangeID RangeID) { + s.deciders[rangeID] = s.newDecider() +} diff --git a/pkg/kv/kvserver/asim/state/split_decider_test.go b/pkg/kv/kvserver/asim/state/split_decider_test.go new file mode 100644 index 000000000000..8bcc7c9d3b2a --- /dev/null +++ b/pkg/kv/kvserver/asim/state/split_decider_test.go @@ -0,0 +1,135 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package state + +import ( + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" + "github.com/stretchr/testify/require" +) + +var testingSequence = []Key{10, 1, 9, 2, 8, 3, 4, 7, 5, 6} + +func TestSplitDecider(t *testing.T) { + testingSeed := 42 + testingThreshold := func() float64 { return 2500 } + testingRetention := func() time.Duration { return 60 * time.Second } + startTime := TestingStartTime() + decider := NewSplitDecider(int64(testingSeed), testingThreshold, testingRetention) + + // A decider should be created for a range when a load event is first + // recorded against it. + require.Nil(t, decider.deciders[1]) + decider.Record(startTime, 1, workload.LoadEvent{Key: 1, Reads: 1}) + require.NotNil(t, decider.deciders[1]) + + // No valid split key should be found when there has been below threshold + // load. + splitKey, found := decider.SplitKey(startTime, 1) + require.False(t, found) + require.Equal(t, InvalidKey, splitKey) + + // No ranges should have been accumulated as suggestions for splitting. + suggestions := decider.ClearSplitKeys() + require.Empty(t, suggestions) + sequence := testingSequence + + // Register load greater than the threshold. + for i := 0; int64(i) < int64(testingRetention()/time.Second); i++ { + for j := 0; j < int(testingThreshold())+100; j++ { + decider.Record( + OffsetTick(startTime, int64(i)), + 1, + workload.LoadEvent{Key: int64(sequence[j%len(sequence)]), Reads: 1}, + ) + } + } + + // There should now be 1 suggested range for splitting which corresponds to + // the midpoint of the testing sequence. + require.Equal(t, []RangeID{1}, decider.ClearSplitKeys()) + splitKey, found = decider.SplitKey(startTime.Add(testingRetention()), 1) + require.True(t, found) + require.Equal(t, Key(6), splitKey) + + // After clearing the split keys, it should now return no new suggestions. 
+ require.Equal(t, []RangeID{}, decider.ClearSplitKeys()) +} + +func TestSplitDeciderWorkload(t *testing.T) { + testingSeed := 42 + testingRangeID := FirstRangeID + startTime := TestingStartTime() + + testCases := []struct { + desc string + ticks []int64 + sequence []Key + qps int64 + threshold float64 + retention time.Duration + expectedSplitKey Key + expectedOk bool + }{ + { + desc: "no split, low qps", + ticks: []int64{20, 40, 60, 80}, + sequence: testingSequence, + qps: 1000, + threshold: 2500, + retention: 120 * time.Second, + expectedSplitKey: InvalidKey, + expectedOk: false, + }, + { + desc: "split, load split evenly left/right of 6", + ticks: []int64{20, 40, 60, 80}, + sequence: testingSequence, + qps: 3000, + threshold: 2500, + retention: 120 * time.Second, + expectedSplitKey: 6, + expectedOk: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + splitDecider := NewSplitDecider( + int64(testingSeed), + func() float64 { return tc.threshold }, + func() time.Duration { return tc.retention }, + ) + lastTick := int64(0) + + for _, tick := range tc.ticks { + tickDelta := tick - lastTick + lastTick = tick + + for loadEventIdx := 0; loadEventIdx < int(tickDelta)*int(tc.qps); loadEventIdx++ { + loadEvent := workload.LoadEvent{ + Key: int64(tc.sequence[loadEventIdx%len(tc.sequence)]), + Reads: 1, + } + splitDecider.Record(OffsetTick(startTime, tick), RangeID(testingRangeID), loadEvent) + } + } + splitKey, ok := splitDecider.SplitKey(OffsetTick(startTime, tc.ticks[len(tc.ticks)-1]), RangeID(testingRangeID)) + require.Equal(t, tc.expectedOk, ok) + require.Equal(t, tc.expectedSplitKey, splitKey) + if tc.expectedOk { + require.GreaterOrEqual(t, len(splitDecider.ClearSplitKeys()), 1) + } + }) + } +} diff --git a/pkg/kv/kvserver/asim/state/state.go b/pkg/kv/kvserver/asim/state/state.go index 3ff03560e532..e8c193a686e9 100644 --- a/pkg/kv/kvserver/asim/state/state.go +++ b/pkg/kv/kvserver/asim/state/state.go @@ -12,6 +12,7 @@ package state import ( "fmt" + "strconv" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" @@ -68,6 +69,12 @@ type State interface { RangeCount() int64 // Replicas returns all replicas that exist on a store. Replicas(StoreID) []Replica + // LeaseHolderReplica returns the replica which holds a lease for the range + // with ID RangeID, if the range exists, otherwise returning false. + LeaseHolderReplica(RangeID) (Replica, bool) + // LeaseholderStore returns the store which holds a lease for the range with ID + // RangeID, if the range and store exist, otherwise returning false. + LeaseholderStore(RangeID) (Store, bool) // AddNode modifies the state to include one additional node. This cannot // fail. The new Node is returned. AddNode() Node @@ -144,6 +151,8 @@ type State interface { // the allocator and storepool should both be separated out of this // interface, instead using it to populate themselves. MakeAllocator(StoreID) allocatorimpl.Allocator + // LoadSplitterFor returns the load splitter for the Store with ID StoreID. + LoadSplitterFor(StoreID) LoadSplitter } // Node is a container for stores and is part of a cluster. @@ -257,6 +266,18 @@ func (k Key) ToRKey() roachpb.RKey { return roachpb.RKey(fmt.Sprintf(keyFmt, k)) } +// ToKey converts a roachpb formatted key into a simulator int64 key. 
+func ToKey(key roachpb.Key) Key { + stringKey := key.String() + stringKey = stringKey[1 : len(stringKey)-1] + var convertedKey int64 + convertedKey, err := strconv.ParseInt(stringKey, 10, 0) + if err != nil { + return InvalidKey + } + return Key(convertedKey) +} + // defaultSpanConfig is the span config applied by default to all ranges, // unless overwritten. var defaultSpanConfig roachpb.SpanConfig = roachpb.SpanConfig{ @@ -265,3 +286,7 @@ var defaultSpanConfig roachpb.SpanConfig = roachpb.SpanConfig{ NumReplicas: 3, NumVoters: 3, } + +// FirstRangeID is the constant for the ID assigned to the first range within +// the keyspace. +const FirstRangeID = 1 diff --git a/pkg/kv/kvserver/asim/state/state_test.go b/pkg/kv/kvserver/asim/state/state_test.go index 066d3961e83c..c7af54e62faa 100644 --- a/pkg/kv/kvserver/asim/state/state_test.go +++ b/pkg/kv/kvserver/asim/state/state_test.go @@ -13,13 +13,14 @@ package state import ( "testing" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/stretchr/testify/require" ) func TestStateUpdates(t *testing.T) { - s := NewState() + s := NewState(config.DefaultSimulationSettings()) node := s.AddNode() s.AddStore(node.NodeID()) require.Equal(t, 1, len(s.Nodes())) @@ -30,7 +31,7 @@ func TestStateUpdates(t *testing.T) { // for any replicas that existed on the pre-split range. It also checks that // the post-split keys are correct. func TestRangeSplit(t *testing.T) { - s := newState() + s := newState(config.DefaultSimulationSettings()) k1 := MinKey r1 := s.rangeFor(k1) @@ -57,7 +58,7 @@ func TestRangeSplit(t *testing.T) { } func TestRangeMap(t *testing.T) { - s := newState() + s := newState(config.DefaultSimulationSettings()) // Assert that the first range is correctly initialized upon creation of a // new state. @@ -93,7 +94,7 @@ func TestRangeMap(t *testing.T) { // TestValidTransfer asserts that ValidTransfer behaves correctly. func TestValidTransfer(t *testing.T) { - s := NewState() + s := NewState(config.DefaultSimulationSettings()) _, r1, _ := s.SplitRange(1) @@ -126,7 +127,7 @@ func TestValidTransfer(t *testing.T) { // TestTransferLease asserts that the state is correctly updated following a // valid lease transfer. func TestTransferLease(t *testing.T) { - s := NewState() + s := NewState(config.DefaultSimulationSettings()) _, r1, _ := s.SplitRange(1) @@ -154,7 +155,7 @@ func TestTransferLease(t *testing.T) { // TestValidReplicaTarget asserts that CanAddReplica and CanRemoveReplica // behave correctly under various scenarios. func TestValidReplicaTarget(t *testing.T) { - s := NewState() + s := NewState(config.DefaultSimulationSettings()) _, r1, _ := s.SplitRange(1) @@ -193,7 +194,7 @@ func TestValidReplicaTarget(t *testing.T) { } func TestAddReplica(t *testing.T) { - s := NewState() + s := NewState(config.DefaultSimulationSettings()) _, r1, _ := s.SplitRange(1) _, r2, _ := s.SplitRange(2) @@ -218,7 +219,7 @@ func TestAddReplica(t *testing.T) { // TestWorkloadApply asserts that applying workload on a key, will be reflected // on the leaseholder for the range that key is contained within. 
func TestWorkloadApply(t *testing.T) { - s := NewState() + s := NewState(config.DefaultSimulationSettings()) n1 := s.AddNode() s1, _ := s.AddStore(n1.NodeID()) @@ -262,3 +263,22 @@ func TestWorkloadApply(t *testing.T) { expectedLoad.QueriesPerSecond *= 10 require.Equal(t, expectedLoad, sc3) } + +// TestKeyTranslation asserts that key encoding between roachpb keys and +// simulator int64 keys are correct. +func TestKeyTranslation(t *testing.T) { + for add := Key(1); add <= MaxKey; add *= 2 { + key := MinKey + add + rkey := key.ToRKey() + mappedKey := ToKey(rkey.AsRawKey()) + require.Equal( + t, + key, + mappedKey, + "unexpected mapping %d (key) -> %s (rkey) -> %d (mapped)", + key, + rkey, + mappedKey, + ) + } +} From 2d9a886638278090e6d96f4bc34a0cb8c9cb1ff1 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 14 Jul 2022 16:33:44 -0400 Subject: [PATCH 2/4] dev,genbzl: add support for generating syntax diagrams Release note: None --- dev | 2 +- pkg/cmd/dev/generate.go | 12 ++ pkg/gen/BUILD.bazel | 6 + pkg/gen/bnf.bzl | 256 ++++++++++++++++++++++++++++++++++++++ pkg/gen/diagrams.bzl | 253 +++++++++++++++++++++++++++++++++++++ pkg/gen/gen.bzl | 14 +++ pkg/gen/genbzl/targets.go | 15 ++- 7 files changed, 553 insertions(+), 5 deletions(-) create mode 100644 pkg/gen/bnf.bzl create mode 100644 pkg/gen/diagrams.bzl diff --git a/dev b/dev index 16df7c20d428..6298cf77c05e 100755 --- a/dev +++ b/dev @@ -3,7 +3,7 @@ set -euo pipefail # Bump this counter to force rebuilding `dev` on all machines. -DEV_VERSION=42 +DEV_VERSION=43 THIS_DIR=$(cd "$(dirname "$0")" && pwd) BINARY_DIR=$THIS_DIR/bin/dev-versions diff --git a/pkg/cmd/dev/generate.go b/pkg/cmd/dev/generate.go index 0462dd8970de..5d98dbe1c4a8 100644 --- a/pkg/cmd/dev/generate.go +++ b/pkg/cmd/dev/generate.go @@ -37,6 +37,8 @@ func makeGenerateCmd(runE func(cmd *cobra.Command, args []string) error) *cobra. dev generate bazel # DEPS.bzl and BUILD.bazel files dev generate cgo # files that help non-Bazel systems (IDEs, go) link to our C dependencies dev generate docs # generates documentation + dev generate diagrams # generates syntax diagrams + dev generate bnf # generates syntax bnf files dev generate go # generates go code (execgen, stringer, protobufs, etc.), plus everything 'cgo' generates dev generate go_nocgo # generates go code (execgen, stringer, protobufs, etc.) 
dev generate protobuf # *.pb.go files (subset of 'dev generate go') @@ -73,6 +75,8 @@ func (d *dev) generate(cmd *cobra.Command, targets []string) error { "parser": d.generateParser, "optgen": d.generateOptGen, "schemachanger": d.generateSchemaChanger, + "diagrams": d.generateDiagrams, + "bnf": d.generateBNF, } if len(targets) == 0 { @@ -199,6 +203,14 @@ func (d *dev) generateSchemaChanger(cmd *cobra.Command) error { return d.generateTarget(cmd.Context(), "//pkg/gen:schemachanger") } +func (d *dev) generateDiagrams(cmd *cobra.Command) error { + return d.generateTarget(cmd.Context(), "//pkg/gen:diagrams") +} + +func (d *dev) generateBNF(cmd *cobra.Command) error { + return d.generateTarget(cmd.Context(), "//pkg/gen:bnf") +} + func (d *dev) generateTarget(ctx context.Context, target string) error { if err := d.exec.CommandContextInheritingStdStreams( ctx, "bazel", "run", target, diff --git a/pkg/gen/BUILD.bazel b/pkg/gen/BUILD.bazel index de9b72777c75..a8e260bfb580 100644 --- a/pkg/gen/BUILD.bazel +++ b/pkg/gen/BUILD.bazel @@ -1,6 +1,8 @@ load( ":gen.bzl", "bindata", + "bnf", + "diagrams", "docs", "execgen", "gen", @@ -33,6 +35,10 @@ parser() schemachanger() +diagrams() + +bnf() + gen( name = "gen", srcs = [ diff --git a/pkg/gen/bnf.bzl b/pkg/gen/bnf.bzl new file mode 100644 index 000000000000..5458603cd3b6 --- /dev/null +++ b/pkg/gen/bnf.bzl @@ -0,0 +1,256 @@ +# Generated by genbzl + +BNF_SRCS = [ + "//docs/generated/sql/bnf:abort_stmt.bnf", + "//docs/generated/sql/bnf:add_column.bnf", + "//docs/generated/sql/bnf:add_constraint.bnf", + "//docs/generated/sql/bnf:alter_backup.bnf", + "//docs/generated/sql/bnf:alter_changefeed.bnf", + "//docs/generated/sql/bnf:alter_column.bnf", + "//docs/generated/sql/bnf:alter_database_add_region_stmt.bnf", + "//docs/generated/sql/bnf:alter_database_add_super_region.bnf", + "//docs/generated/sql/bnf:alter_database_alter_super_region.bnf", + "//docs/generated/sql/bnf:alter_database_drop_region.bnf", + "//docs/generated/sql/bnf:alter_database_drop_super_region.bnf", + "//docs/generated/sql/bnf:alter_database_owner.bnf", + "//docs/generated/sql/bnf:alter_database_primary_region.bnf", + "//docs/generated/sql/bnf:alter_database_stmt.bnf", + "//docs/generated/sql/bnf:alter_database_survival_goal_stmt.bnf", + "//docs/generated/sql/bnf:alter_database_to_schema_stmt.bnf", + "//docs/generated/sql/bnf:alter_ddl_stmt.bnf", + "//docs/generated/sql/bnf:alter_default_privileges_stmt.bnf", + "//docs/generated/sql/bnf:alter_index_partition_by.bnf", + "//docs/generated/sql/bnf:alter_index_stmt.bnf", + "//docs/generated/sql/bnf:alter_partition_stmt.bnf", + "//docs/generated/sql/bnf:alter_primary_key.bnf", + "//docs/generated/sql/bnf:alter_range_relocate_stmt.bnf", + "//docs/generated/sql/bnf:alter_range_stmt.bnf", + "//docs/generated/sql/bnf:alter_rename_view_stmt.bnf", + "//docs/generated/sql/bnf:alter_role_stmt.bnf", + "//docs/generated/sql/bnf:alter_scatter_index_stmt.bnf", + "//docs/generated/sql/bnf:alter_scatter_stmt.bnf", + "//docs/generated/sql/bnf:alter_schema.bnf", + "//docs/generated/sql/bnf:alter_sequence.bnf", + "//docs/generated/sql/bnf:alter_sequence_options_stmt.bnf", + "//docs/generated/sql/bnf:alter_sequence_owner_stmt.bnf", + "//docs/generated/sql/bnf:alter_sequence_set_schema_stmt.bnf", + "//docs/generated/sql/bnf:alter_stmt.bnf", + "//docs/generated/sql/bnf:alter_table.bnf", + "//docs/generated/sql/bnf:alter_table_locality_stmt.bnf", + "//docs/generated/sql/bnf:alter_table_owner_stmt.bnf", + "//docs/generated/sql/bnf:alter_table_partition_by.bnf", + 
"//docs/generated/sql/bnf:alter_table_reset_storage_param.bnf", + "//docs/generated/sql/bnf:alter_table_set_schema_stmt.bnf", + "//docs/generated/sql/bnf:alter_table_set_storage_param.bnf", + "//docs/generated/sql/bnf:alter_table_stmt.bnf", + "//docs/generated/sql/bnf:alter_tenant_csetting_stmt.bnf", + "//docs/generated/sql/bnf:alter_type.bnf", + "//docs/generated/sql/bnf:alter_view.bnf", + "//docs/generated/sql/bnf:alter_view_owner_stmt.bnf", + "//docs/generated/sql/bnf:alter_view_set_schema_stmt.bnf", + "//docs/generated/sql/bnf:alter_zone_database_stmt.bnf", + "//docs/generated/sql/bnf:alter_zone_index_stmt.bnf", + "//docs/generated/sql/bnf:alter_zone_partition_stmt.bnf", + "//docs/generated/sql/bnf:alter_zone_range_stmt.bnf", + "//docs/generated/sql/bnf:alter_zone_table_stmt.bnf", + "//docs/generated/sql/bnf:analyze_stmt.bnf", + "//docs/generated/sql/bnf:backup.bnf", + "//docs/generated/sql/bnf:begin_stmt.bnf", + "//docs/generated/sql/bnf:begin_transaction.bnf", + "//docs/generated/sql/bnf:cancel_all_jobs_stmt.bnf", + "//docs/generated/sql/bnf:cancel_job.bnf", + "//docs/generated/sql/bnf:cancel_query.bnf", + "//docs/generated/sql/bnf:cancel_session.bnf", + "//docs/generated/sql/bnf:cancel_stmt.bnf", + "//docs/generated/sql/bnf:check_column_level.bnf", + "//docs/generated/sql/bnf:check_table_level.bnf", + "//docs/generated/sql/bnf:close_cursor_stmt.bnf", + "//docs/generated/sql/bnf:col_qualification.bnf", + "//docs/generated/sql/bnf:column_def.bnf", + "//docs/generated/sql/bnf:comment.bnf", + "//docs/generated/sql/bnf:commit_transaction.bnf", + "//docs/generated/sql/bnf:copy_from_stmt.bnf", + "//docs/generated/sql/bnf:create_as_col_qual_list.bnf", + "//docs/generated/sql/bnf:create_as_constraint_def.bnf", + "//docs/generated/sql/bnf:create_changefeed_stmt.bnf", + "//docs/generated/sql/bnf:create_database_stmt.bnf", + "//docs/generated/sql/bnf:create_ddl_stmt.bnf", + "//docs/generated/sql/bnf:create_extension_stmt.bnf", + "//docs/generated/sql/bnf:create_func_stmt.bnf", + "//docs/generated/sql/bnf:create_index_stmt.bnf", + "//docs/generated/sql/bnf:create_index_with_storage_param.bnf", + "//docs/generated/sql/bnf:create_inverted_index_stmt.bnf", + "//docs/generated/sql/bnf:create_role_stmt.bnf", + "//docs/generated/sql/bnf:create_schedule_for_backup_stmt.bnf", + "//docs/generated/sql/bnf:create_schema_stmt.bnf", + "//docs/generated/sql/bnf:create_sequence_stmt.bnf", + "//docs/generated/sql/bnf:create_stats_stmt.bnf", + "//docs/generated/sql/bnf:create_stmt.bnf", + "//docs/generated/sql/bnf:create_table_as_stmt.bnf", + "//docs/generated/sql/bnf:create_table_stmt.bnf", + "//docs/generated/sql/bnf:create_table_with_storage_param.bnf", + "//docs/generated/sql/bnf:create_type.bnf", + "//docs/generated/sql/bnf:create_view_stmt.bnf", + "//docs/generated/sql/bnf:deallocate_stmt.bnf", + "//docs/generated/sql/bnf:declare_cursor_stmt.bnf", + "//docs/generated/sql/bnf:default_value_column_level.bnf", + "//docs/generated/sql/bnf:delete_stmt.bnf", + "//docs/generated/sql/bnf:discard_stmt.bnf", + "//docs/generated/sql/bnf:drop_column.bnf", + "//docs/generated/sql/bnf:drop_constraint.bnf", + "//docs/generated/sql/bnf:drop_database.bnf", + "//docs/generated/sql/bnf:drop_ddl_stmt.bnf", + "//docs/generated/sql/bnf:drop_index.bnf", + "//docs/generated/sql/bnf:drop_owned_by_stmt.bnf", + "//docs/generated/sql/bnf:drop_role_stmt.bnf", + "//docs/generated/sql/bnf:drop_schedule_stmt.bnf", + "//docs/generated/sql/bnf:drop_schema.bnf", + "//docs/generated/sql/bnf:drop_sequence_stmt.bnf", + 
"//docs/generated/sql/bnf:drop_stmt.bnf", + "//docs/generated/sql/bnf:drop_table.bnf", + "//docs/generated/sql/bnf:drop_type.bnf", + "//docs/generated/sql/bnf:drop_view.bnf", + "//docs/generated/sql/bnf:execute_stmt.bnf", + "//docs/generated/sql/bnf:experimental_audit.bnf", + "//docs/generated/sql/bnf:explain_analyze_stmt.bnf", + "//docs/generated/sql/bnf:explain_stmt.bnf", + "//docs/generated/sql/bnf:explainable_stmt.bnf", + "//docs/generated/sql/bnf:export_stmt.bnf", + "//docs/generated/sql/bnf:family_def.bnf", + "//docs/generated/sql/bnf:fetch_cursor_stmt.bnf", + "//docs/generated/sql/bnf:for_locking.bnf", + "//docs/generated/sql/bnf:foreign_key_column_level.bnf", + "//docs/generated/sql/bnf:foreign_key_table_level.bnf", + "//docs/generated/sql/bnf:generic_set.bnf", + "//docs/generated/sql/bnf:grant_stmt.bnf", + "//docs/generated/sql/bnf:import_csv.bnf", + "//docs/generated/sql/bnf:import_dump.bnf", + "//docs/generated/sql/bnf:import_into.bnf", + "//docs/generated/sql/bnf:index_def.bnf", + "//docs/generated/sql/bnf:insert_rest.bnf", + "//docs/generated/sql/bnf:insert_stmt.bnf", + "//docs/generated/sql/bnf:iso_level.bnf", + "//docs/generated/sql/bnf:joined_table.bnf", + "//docs/generated/sql/bnf:legacy_begin_stmt.bnf", + "//docs/generated/sql/bnf:legacy_end_stmt.bnf", + "//docs/generated/sql/bnf:legacy_transaction_stmt.bnf", + "//docs/generated/sql/bnf:like_table_option_list.bnf", + "//docs/generated/sql/bnf:limit_clause.bnf", + "//docs/generated/sql/bnf:move_cursor_stmt.bnf", + "//docs/generated/sql/bnf:not_null_column_level.bnf", + "//docs/generated/sql/bnf:offset_clause.bnf", + "//docs/generated/sql/bnf:on_conflict.bnf", + "//docs/generated/sql/bnf:opt_frame_clause.bnf", + "//docs/generated/sql/bnf:opt_locality.bnf", + "//docs/generated/sql/bnf:opt_persistence_temp_table.bnf", + "//docs/generated/sql/bnf:opt_with_storage_parameter_list.bnf", + "//docs/generated/sql/bnf:pause_all_jobs_stmt.bnf", + "//docs/generated/sql/bnf:pause_job.bnf", + "//docs/generated/sql/bnf:pause_schedule.bnf", + "//docs/generated/sql/bnf:pause_stmt.bnf", + "//docs/generated/sql/bnf:preparable_stmt.bnf", + "//docs/generated/sql/bnf:prepare_stmt.bnf", + "//docs/generated/sql/bnf:primary_key_column_level.bnf", + "//docs/generated/sql/bnf:primary_key_table_level.bnf", + "//docs/generated/sql/bnf:reassign_owned_by_stmt.bnf", + "//docs/generated/sql/bnf:refresh_materialized_views.bnf", + "//docs/generated/sql/bnf:release_savepoint.bnf", + "//docs/generated/sql/bnf:rename_column.bnf", + "//docs/generated/sql/bnf:rename_constraint.bnf", + "//docs/generated/sql/bnf:rename_database.bnf", + "//docs/generated/sql/bnf:rename_index.bnf", + "//docs/generated/sql/bnf:rename_sequence.bnf", + "//docs/generated/sql/bnf:rename_table.bnf", + "//docs/generated/sql/bnf:reset_csetting_stmt.bnf", + "//docs/generated/sql/bnf:reset_session_stmt.bnf", + "//docs/generated/sql/bnf:reset_stmt.bnf", + "//docs/generated/sql/bnf:restore.bnf", + "//docs/generated/sql/bnf:resume_all_jobs_stmt.bnf", + "//docs/generated/sql/bnf:resume_job.bnf", + "//docs/generated/sql/bnf:resume_schedule.bnf", + "//docs/generated/sql/bnf:resume_stmt.bnf", + "//docs/generated/sql/bnf:revoke_stmt.bnf", + "//docs/generated/sql/bnf:rollback_transaction.bnf", + "//docs/generated/sql/bnf:routine_body_stmt.bnf", + "//docs/generated/sql/bnf:routine_return_stmt.bnf", + "//docs/generated/sql/bnf:row_source_extension_stmt.bnf", + "//docs/generated/sql/bnf:savepoint_stmt.bnf", + "//docs/generated/sql/bnf:scrub_database_stmt.bnf", + "//docs/generated/sql/bnf:scrub_stmt.bnf", 
+ "//docs/generated/sql/bnf:scrub_table_stmt.bnf", + "//docs/generated/sql/bnf:select_clause.bnf", + "//docs/generated/sql/bnf:select_stmt.bnf", + "//docs/generated/sql/bnf:set_cluster_setting.bnf", + "//docs/generated/sql/bnf:set_csetting_stmt.bnf", + "//docs/generated/sql/bnf:set_exprs_internal.bnf", + "//docs/generated/sql/bnf:set_local_stmt.bnf", + "//docs/generated/sql/bnf:set_operation.bnf", + "//docs/generated/sql/bnf:set_or_reset_csetting_stmt.bnf", + "//docs/generated/sql/bnf:set_rest.bnf", + "//docs/generated/sql/bnf:set_rest_more.bnf", + "//docs/generated/sql/bnf:set_session_stmt.bnf", + "//docs/generated/sql/bnf:set_transaction.bnf", + "//docs/generated/sql/bnf:set_transaction_stmt.bnf", + "//docs/generated/sql/bnf:show_backup.bnf", + "//docs/generated/sql/bnf:show_cluster_setting.bnf", + "//docs/generated/sql/bnf:show_columns_stmt.bnf", + "//docs/generated/sql/bnf:show_constraints_stmt.bnf", + "//docs/generated/sql/bnf:show_create_schedules_stmt.bnf", + "//docs/generated/sql/bnf:show_create_stmt.bnf", + "//docs/generated/sql/bnf:show_databases_stmt.bnf", + "//docs/generated/sql/bnf:show_default_privileges_stmt.bnf", + "//docs/generated/sql/bnf:show_enums.bnf", + "//docs/generated/sql/bnf:show_full_scans.bnf", + "//docs/generated/sql/bnf:show_grants_stmt.bnf", + "//docs/generated/sql/bnf:show_indexes_stmt.bnf", + "//docs/generated/sql/bnf:show_jobs.bnf", + "//docs/generated/sql/bnf:show_keys.bnf", + "//docs/generated/sql/bnf:show_local_or_tenant_csettings_stmt.bnf", + "//docs/generated/sql/bnf:show_locality.bnf", + "//docs/generated/sql/bnf:show_locality_stmt.bnf", + "//docs/generated/sql/bnf:show_partitions_stmt.bnf", + "//docs/generated/sql/bnf:show_range_for_row_stmt.bnf", + "//docs/generated/sql/bnf:show_ranges_stmt.bnf", + "//docs/generated/sql/bnf:show_regions.bnf", + "//docs/generated/sql/bnf:show_roles_stmt.bnf", + "//docs/generated/sql/bnf:show_savepoint_status.bnf", + "//docs/generated/sql/bnf:show_schedules.bnf", + "//docs/generated/sql/bnf:show_schemas.bnf", + "//docs/generated/sql/bnf:show_sequences.bnf", + "//docs/generated/sql/bnf:show_session_stmt.bnf", + "//docs/generated/sql/bnf:show_sessions.bnf", + "//docs/generated/sql/bnf:show_statements.bnf", + "//docs/generated/sql/bnf:show_stats.bnf", + "//docs/generated/sql/bnf:show_survival_goal_stmt.bnf", + "//docs/generated/sql/bnf:show_tables.bnf", + "//docs/generated/sql/bnf:show_trace.bnf", + "//docs/generated/sql/bnf:show_transactions_stmt.bnf", + "//docs/generated/sql/bnf:show_transfer_stmt.bnf", + "//docs/generated/sql/bnf:show_types_stmt.bnf", + "//docs/generated/sql/bnf:show_users_stmt.bnf", + "//docs/generated/sql/bnf:show_var.bnf", + "//docs/generated/sql/bnf:show_zone_stmt.bnf", + "//docs/generated/sql/bnf:simple_select_clause.bnf", + "//docs/generated/sql/bnf:sort_clause.bnf", + "//docs/generated/sql/bnf:split_index_at.bnf", + "//docs/generated/sql/bnf:split_table_at.bnf", + "//docs/generated/sql/bnf:stmt.bnf", + "//docs/generated/sql/bnf:stmt_block.bnf", + "//docs/generated/sql/bnf:stmt_without_legacy_transaction.bnf", + "//docs/generated/sql/bnf:table_clause.bnf", + "//docs/generated/sql/bnf:table_constraint.bnf", + "//docs/generated/sql/bnf:table_ref.bnf", + "//docs/generated/sql/bnf:transaction_stmt.bnf", + "//docs/generated/sql/bnf:truncate_stmt.bnf", + "//docs/generated/sql/bnf:unique_column_level.bnf", + "//docs/generated/sql/bnf:unique_table_level.bnf", + "//docs/generated/sql/bnf:unsplit_index_at.bnf", + "//docs/generated/sql/bnf:unsplit_table_at.bnf", + 
"//docs/generated/sql/bnf:update_stmt.bnf", + "//docs/generated/sql/bnf:upsert_stmt.bnf", + "//docs/generated/sql/bnf:use_stmt.bnf", + "//docs/generated/sql/bnf:validate_constraint.bnf", + "//docs/generated/sql/bnf:values_clause.bnf", + "//docs/generated/sql/bnf:window_definition.bnf", + "//docs/generated/sql/bnf:with_clause.bnf", +] diff --git a/pkg/gen/diagrams.bzl b/pkg/gen/diagrams.bzl new file mode 100644 index 000000000000..bb7fb86140ae --- /dev/null +++ b/pkg/gen/diagrams.bzl @@ -0,0 +1,253 @@ +# Generated by genbzl + +DIAGRAMS_SRCS = [ + "//docs/generated/sql/bnf:abort.html", + "//docs/generated/sql/bnf:add_column.html", + "//docs/generated/sql/bnf:add_constraint.html", + "//docs/generated/sql/bnf:alter.html", + "//docs/generated/sql/bnf:alter_backup.html", + "//docs/generated/sql/bnf:alter_changefeed.html", + "//docs/generated/sql/bnf:alter_column.html", + "//docs/generated/sql/bnf:alter_database.html", + "//docs/generated/sql/bnf:alter_database_add_region.html", + "//docs/generated/sql/bnf:alter_database_add_super_region.html", + "//docs/generated/sql/bnf:alter_database_alter_super_region.html", + "//docs/generated/sql/bnf:alter_database_drop_region.html", + "//docs/generated/sql/bnf:alter_database_drop_super_region.html", + "//docs/generated/sql/bnf:alter_database_owner.html", + "//docs/generated/sql/bnf:alter_database_primary_region.html", + "//docs/generated/sql/bnf:alter_database_survival_goal.html", + "//docs/generated/sql/bnf:alter_database_to_schema.html", + "//docs/generated/sql/bnf:alter_ddl.html", + "//docs/generated/sql/bnf:alter_default_privileges.html", + "//docs/generated/sql/bnf:alter_index.html", + "//docs/generated/sql/bnf:alter_index_partition_by.html", + "//docs/generated/sql/bnf:alter_partition.html", + "//docs/generated/sql/bnf:alter_primary_key.html", + "//docs/generated/sql/bnf:alter_range.html", + "//docs/generated/sql/bnf:alter_range_relocate.html", + "//docs/generated/sql/bnf:alter_rename_view.html", + "//docs/generated/sql/bnf:alter_role.html", + "//docs/generated/sql/bnf:alter_scatter.html", + "//docs/generated/sql/bnf:alter_scatter_index.html", + "//docs/generated/sql/bnf:alter_schema.html", + "//docs/generated/sql/bnf:alter_sequence.html", + "//docs/generated/sql/bnf:alter_sequence_options.html", + "//docs/generated/sql/bnf:alter_sequence_owner.html", + "//docs/generated/sql/bnf:alter_sequence_set_schema.html", + "//docs/generated/sql/bnf:alter_table.html", + "//docs/generated/sql/bnf:alter_table_locality.html", + "//docs/generated/sql/bnf:alter_table_owner.html", + "//docs/generated/sql/bnf:alter_table_partition_by.html", + "//docs/generated/sql/bnf:alter_table_reset_storage_param.html", + "//docs/generated/sql/bnf:alter_table_set_schema.html", + "//docs/generated/sql/bnf:alter_table_set_storage_param.html", + "//docs/generated/sql/bnf:alter_tenant_csetting.html", + "//docs/generated/sql/bnf:alter_type.html", + "//docs/generated/sql/bnf:alter_view.html", + "//docs/generated/sql/bnf:alter_view_owner.html", + "//docs/generated/sql/bnf:alter_view_set_schema.html", + "//docs/generated/sql/bnf:alter_zone_database.html", + "//docs/generated/sql/bnf:alter_zone_index.html", + "//docs/generated/sql/bnf:alter_zone_partition.html", + "//docs/generated/sql/bnf:alter_zone_range.html", + "//docs/generated/sql/bnf:alter_zone_table.html", + "//docs/generated/sql/bnf:analyze.html", + "//docs/generated/sql/bnf:backup.html", + "//docs/generated/sql/bnf:begin.html", + "//docs/generated/sql/bnf:begin_transaction.html", + "//docs/generated/sql/bnf:cancel.html", + 
"//docs/generated/sql/bnf:cancel_all_jobs.html", + "//docs/generated/sql/bnf:cancel_job.html", + "//docs/generated/sql/bnf:cancel_query.html", + "//docs/generated/sql/bnf:cancel_session.html", + "//docs/generated/sql/bnf:check_column_level.html", + "//docs/generated/sql/bnf:check_table_level.html", + "//docs/generated/sql/bnf:close_cursor.html", + "//docs/generated/sql/bnf:col_qualification.html", + "//docs/generated/sql/bnf:column_def.html", + "//docs/generated/sql/bnf:comment.html", + "//docs/generated/sql/bnf:commit_transaction.html", + "//docs/generated/sql/bnf:copy_from.html", + "//docs/generated/sql/bnf:create.html", + "//docs/generated/sql/bnf:create_as_col_qual_list.html", + "//docs/generated/sql/bnf:create_as_constraint_def.html", + "//docs/generated/sql/bnf:create_changefeed.html", + "//docs/generated/sql/bnf:create_database.html", + "//docs/generated/sql/bnf:create_ddl.html", + "//docs/generated/sql/bnf:create_extension.html", + "//docs/generated/sql/bnf:create_func.html", + "//docs/generated/sql/bnf:create_index.html", + "//docs/generated/sql/bnf:create_index_with_storage_param.html", + "//docs/generated/sql/bnf:create_inverted_index.html", + "//docs/generated/sql/bnf:create_role.html", + "//docs/generated/sql/bnf:create_schedule_for_backup.html", + "//docs/generated/sql/bnf:create_schema.html", + "//docs/generated/sql/bnf:create_sequence.html", + "//docs/generated/sql/bnf:create_stats.html", + "//docs/generated/sql/bnf:create_table.html", + "//docs/generated/sql/bnf:create_table_as.html", + "//docs/generated/sql/bnf:create_table_with_storage_param.html", + "//docs/generated/sql/bnf:create_type.html", + "//docs/generated/sql/bnf:create_view.html", + "//docs/generated/sql/bnf:deallocate.html", + "//docs/generated/sql/bnf:declare_cursor.html", + "//docs/generated/sql/bnf:default_value_column_level.html", + "//docs/generated/sql/bnf:delete.html", + "//docs/generated/sql/bnf:discard.html", + "//docs/generated/sql/bnf:drop.html", + "//docs/generated/sql/bnf:drop_column.html", + "//docs/generated/sql/bnf:drop_constraint.html", + "//docs/generated/sql/bnf:drop_database.html", + "//docs/generated/sql/bnf:drop_ddl.html", + "//docs/generated/sql/bnf:drop_index.html", + "//docs/generated/sql/bnf:drop_owned_by.html", + "//docs/generated/sql/bnf:drop_role.html", + "//docs/generated/sql/bnf:drop_schedule.html", + "//docs/generated/sql/bnf:drop_schema.html", + "//docs/generated/sql/bnf:drop_sequence.html", + "//docs/generated/sql/bnf:drop_table.html", + "//docs/generated/sql/bnf:drop_type.html", + "//docs/generated/sql/bnf:drop_view.html", + "//docs/generated/sql/bnf:execute.html", + "//docs/generated/sql/bnf:experimental_audit.html", + "//docs/generated/sql/bnf:explain.html", + "//docs/generated/sql/bnf:explain_analyze.html", + "//docs/generated/sql/bnf:explainable.html", + "//docs/generated/sql/bnf:export.html", + "//docs/generated/sql/bnf:family_def.html", + "//docs/generated/sql/bnf:fetch_cursor.html", + "//docs/generated/sql/bnf:for_locking.html", + "//docs/generated/sql/bnf:foreign_key_column_level.html", + "//docs/generated/sql/bnf:foreign_key_table_level.html", + "//docs/generated/sql/bnf:generic_set.html", + "//docs/generated/sql/bnf:grant.html", + "//docs/generated/sql/bnf:import_csv.html", + "//docs/generated/sql/bnf:import_dump.html", + "//docs/generated/sql/bnf:import_into.html", + "//docs/generated/sql/bnf:index_def.html", + "//docs/generated/sql/bnf:insert.html", + "//docs/generated/sql/bnf:insert_rest.html", + "//docs/generated/sql/bnf:iso_level.html", + 
"//docs/generated/sql/bnf:joined_table.html", + "//docs/generated/sql/bnf:legacy_begin.html", + "//docs/generated/sql/bnf:legacy_end.html", + "//docs/generated/sql/bnf:legacy_transaction.html", + "//docs/generated/sql/bnf:like_table_option_list.html", + "//docs/generated/sql/bnf:limit_clause.html", + "//docs/generated/sql/bnf:move_cursor.html", + "//docs/generated/sql/bnf:not_null_column_level.html", + "//docs/generated/sql/bnf:offset_clause.html", + "//docs/generated/sql/bnf:on_conflict.html", + "//docs/generated/sql/bnf:opt_frame_clause.html", + "//docs/generated/sql/bnf:opt_locality.html", + "//docs/generated/sql/bnf:opt_persistence_temp_table.html", + "//docs/generated/sql/bnf:opt_with_storage_parameter_list.html", + "//docs/generated/sql/bnf:pause.html", + "//docs/generated/sql/bnf:pause_all_jobs.html", + "//docs/generated/sql/bnf:pause_job.html", + "//docs/generated/sql/bnf:pause_schedule.html", + "//docs/generated/sql/bnf:preparable.html", + "//docs/generated/sql/bnf:prepare.html", + "//docs/generated/sql/bnf:primary_key_column_level.html", + "//docs/generated/sql/bnf:primary_key_table_level.html", + "//docs/generated/sql/bnf:reassign_owned_by.html", + "//docs/generated/sql/bnf:refresh_materialized_views.html", + "//docs/generated/sql/bnf:release_savepoint.html", + "//docs/generated/sql/bnf:rename_column.html", + "//docs/generated/sql/bnf:rename_constraint.html", + "//docs/generated/sql/bnf:rename_database.html", + "//docs/generated/sql/bnf:rename_index.html", + "//docs/generated/sql/bnf:rename_sequence.html", + "//docs/generated/sql/bnf:rename_table.html", + "//docs/generated/sql/bnf:reset.html", + "//docs/generated/sql/bnf:reset_csetting.html", + "//docs/generated/sql/bnf:reset_session.html", + "//docs/generated/sql/bnf:restore.html", + "//docs/generated/sql/bnf:resume.html", + "//docs/generated/sql/bnf:resume_all_jobs.html", + "//docs/generated/sql/bnf:resume_job.html", + "//docs/generated/sql/bnf:resume_schedule.html", + "//docs/generated/sql/bnf:revoke.html", + "//docs/generated/sql/bnf:rollback_transaction.html", + "//docs/generated/sql/bnf:routine_body.html", + "//docs/generated/sql/bnf:routine_return.html", + "//docs/generated/sql/bnf:row_source_extension.html", + "//docs/generated/sql/bnf:savepoint.html", + "//docs/generated/sql/bnf:scrub.html", + "//docs/generated/sql/bnf:scrub_database.html", + "//docs/generated/sql/bnf:scrub_table.html", + "//docs/generated/sql/bnf:select.html", + "//docs/generated/sql/bnf:select_clause.html", + "//docs/generated/sql/bnf:set_cluster_setting.html", + "//docs/generated/sql/bnf:set_csetting.html", + "//docs/generated/sql/bnf:set_exprs_internal.html", + "//docs/generated/sql/bnf:set_local.html", + "//docs/generated/sql/bnf:set_operation.html", + "//docs/generated/sql/bnf:set_or_reset_csetting.html", + "//docs/generated/sql/bnf:set_rest.html", + "//docs/generated/sql/bnf:set_rest_more.html", + "//docs/generated/sql/bnf:set_session.html", + "//docs/generated/sql/bnf:set_transaction.html", + "//docs/generated/sql/bnf:show_backup.html", + "//docs/generated/sql/bnf:show_cluster_setting.html", + "//docs/generated/sql/bnf:show_columns.html", + "//docs/generated/sql/bnf:show_constraints.html", + "//docs/generated/sql/bnf:show_create.html", + "//docs/generated/sql/bnf:show_create_schedules.html", + "//docs/generated/sql/bnf:show_databases.html", + "//docs/generated/sql/bnf:show_default_privileges.html", + "//docs/generated/sql/bnf:show_enums.html", + "//docs/generated/sql/bnf:show_full_scans.html", + "//docs/generated/sql/bnf:show_grants.html", + 
"//docs/generated/sql/bnf:show_indexes.html", + "//docs/generated/sql/bnf:show_jobs.html", + "//docs/generated/sql/bnf:show_keys.html", + "//docs/generated/sql/bnf:show_local_or_tenant_csettings.html", + "//docs/generated/sql/bnf:show_locality.html", + "//docs/generated/sql/bnf:show_partitions.html", + "//docs/generated/sql/bnf:show_range_for_row.html", + "//docs/generated/sql/bnf:show_ranges.html", + "//docs/generated/sql/bnf:show_regions.html", + "//docs/generated/sql/bnf:show_roles.html", + "//docs/generated/sql/bnf:show_savepoint_status.html", + "//docs/generated/sql/bnf:show_schedules.html", + "//docs/generated/sql/bnf:show_schemas.html", + "//docs/generated/sql/bnf:show_sequences.html", + "//docs/generated/sql/bnf:show_session.html", + "//docs/generated/sql/bnf:show_sessions.html", + "//docs/generated/sql/bnf:show_statements.html", + "//docs/generated/sql/bnf:show_stats.html", + "//docs/generated/sql/bnf:show_survival_goal.html", + "//docs/generated/sql/bnf:show_tables.html", + "//docs/generated/sql/bnf:show_trace.html", + "//docs/generated/sql/bnf:show_transactions.html", + "//docs/generated/sql/bnf:show_transfer.html", + "//docs/generated/sql/bnf:show_types.html", + "//docs/generated/sql/bnf:show_users.html", + "//docs/generated/sql/bnf:show_var.html", + "//docs/generated/sql/bnf:show_zone.html", + "//docs/generated/sql/bnf:simple_select_clause.html", + "//docs/generated/sql/bnf:sort_clause.html", + "//docs/generated/sql/bnf:split_index_at.html", + "//docs/generated/sql/bnf:split_table_at.html", + "//docs/generated/sql/bnf:stmt.html", + "//docs/generated/sql/bnf:stmt_block.html", + "//docs/generated/sql/bnf:stmt_without_legacy_transaction.html", + "//docs/generated/sql/bnf:table_clause.html", + "//docs/generated/sql/bnf:table_constraint.html", + "//docs/generated/sql/bnf:table_ref.html", + "//docs/generated/sql/bnf:transaction.html", + "//docs/generated/sql/bnf:truncate.html", + "//docs/generated/sql/bnf:unique_column_level.html", + "//docs/generated/sql/bnf:unique_table_level.html", + "//docs/generated/sql/bnf:unsplit_index_at.html", + "//docs/generated/sql/bnf:unsplit_table_at.html", + "//docs/generated/sql/bnf:update.html", + "//docs/generated/sql/bnf:upsert.html", + "//docs/generated/sql/bnf:use.html", + "//docs/generated/sql/bnf:validate_constraint.html", + "//docs/generated/sql/bnf:values_clause.html", + "//docs/generated/sql/bnf:window_definition.html", + "//docs/generated/sql/bnf:with_clause.html", +] diff --git a/pkg/gen/gen.bzl b/pkg/gen/gen.bzl index 367991521389..b94e36a59dbb 100644 --- a/pkg/gen/gen.bzl +++ b/pkg/gen/gen.bzl @@ -29,6 +29,8 @@ load(":protobuf.bzl", "PROTOBUF_SRCS") load(":stringer.bzl", "STRINGER_SRCS") load(":parser.bzl", "PARSER_SRCS") load(":schemachanger.bzl", "SCHEMACHANGER_SRCS") +load(":diagrams.bzl", "DIAGRAMS_SRCS") +load(":bnf.bzl", "BNF_SRCS") # GeneratedFileInfo provides two pieces of information to the _hoist_files # rule. 
It provides the set of files to be hoisted via the generated_files @@ -295,5 +297,17 @@ def schemachanger(): srcs = SCHEMACHANGER_SRCS, ) +def diagrams(): + _hoist_no_prefix( + name = "diagrams", + srcs = DIAGRAMS_SRCS, + ) + +def bnf(): + _hoist_no_prefix( + name = "bnf", + srcs = BNF_SRCS, + ) + def gen(name, srcs): _hoist_files(name = name, data = srcs, tags = ["no-remote-exec"]) diff --git a/pkg/gen/genbzl/targets.go b/pkg/gen/genbzl/targets.go index 563dc522d8b5..11cb0ca2e7e8 100644 --- a/pkg/gen/genbzl/targets.go +++ b/pkg/gen/genbzl/targets.go @@ -53,11 +53,17 @@ let targets = attr("exec_tools", "(opt|lang)gen", kind("genrule rule", {{ .All in let og = labels("outs", $targets) in $og - filter(".*:.*(-gen|gen-).*", $og)`, }, + { + target: "diagrams", + query: `labels("outs", //docs/generated/sql/bnf:svg)`, + }, + { + target: "bnf", + query: `labels("outs", //docs/generated/sql/bnf:bnf)`, + }, { target: "docs", - query: ` -kind("generated file", //docs/...:*) - - labels("outs", //docs/generated/sql/bnf:svg)`, + query: `kind("generated file", //docs/...:*) - ({{ template "diagrams" $ }})`, }, { target: "parser", @@ -79,7 +85,6 @@ in ($all ^ labels("out", kind("_gomock_prog_gen rule", {{ .All }}))) + //pkg/testutils/lint/passes/errcheck:errcheck_excludes.txt + //build/bazelutil:test_force_build_cdeps.txt + //build/bazelutil:test_stamping.txt - + labels("outs", //docs/generated/sql/bnf:svg) `, doNotGenerate: true, }, @@ -101,6 +106,8 @@ kind("generated file", {{ .All }}) - ( + ({{ template "excluded" $ }}) + ({{ template "parser" $ }}) + ({{ template "schemachanger" $ }}) + + ({{ template "diagrams" $ }}) + + ({{ template "bnf" $ }}) )`, }, } From e0c33b444ca887eb6775fb77af6602e050cebaf4 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 2 Jun 2022 19:02:18 -0400 Subject: [PATCH 3/4] changefeedccl/schemafeed: add datadriven test This change also refactors some of the event classifications Release note: None --- pkg/ccl/changefeedccl/schemafeed/BUILD.bazel | 17 ++ .../changefeedccl/schemafeed/helpers_test.go | 37 ++++ .../schemafeed/table_event_filter.go | 113 ++++++------ .../table_event_filter_datadriven_test.go | 166 ++++++++++++++++++ .../schemafeed/table_event_filter_test.go | 8 +- .../schemafeed/tableeventtype_string.go | 30 ++++ .../schemafeed/testdata/add_column | 42 +++++ .../schemafeed/testdata/drop_column | 28 +++ pkg/cmd/roachtest/tests/cdc.go | 33 ++-- pkg/gen/stringer.bzl | 1 + 10 files changed, 401 insertions(+), 74 deletions(-) create mode 100644 pkg/ccl/changefeedccl/schemafeed/helpers_test.go create mode 100644 pkg/ccl/changefeedccl/schemafeed/table_event_filter_datadriven_test.go create mode 100644 pkg/ccl/changefeedccl/schemafeed/tableeventtype_string.go create mode 100644 pkg/ccl/changefeedccl/schemafeed/testdata/add_column create mode 100644 pkg/ccl/changefeedccl/schemafeed/testdata/drop_column diff --git a/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel b/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel index a266ced0970f..a01b601a4ed0 100644 --- a/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel @@ -1,9 +1,11 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("//build:STRINGER.bzl", "stringer") go_library( name = "schemafeed", srcs = [ + "gen-tableEventType-stringer", # keep "metrics.go", "schema_feed.go", "table_event_filter.go", @@ -45,20 +47,26 @@ go_test( name = "schemafeed_test", size = "medium", srcs = [ + "helpers_test.go", 
"main_test.go", "schema_feed_test.go", + "table_event_filter_datadriven_test.go", "table_event_filter_test.go", ], + data = glob(["testdata/**"]), embed = [":schemafeed"], deps = [ "//pkg/base", + "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/ccl/changefeedccl/schemafeed/schematestutils", "//pkg/ccl/utilccl", + "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", + "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/tabledesc", @@ -70,10 +78,19 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", + "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//proto", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", ], ) +stringer( + name = "gen-tableEventType-stringer", + src = "table_event_filter.go", + additional_args = ["--trimprefix=tableEvent"], + typ = "tableEventType", +) + get_x_data(name = "get_x_data") diff --git a/pkg/ccl/changefeedccl/schemafeed/helpers_test.go b/pkg/ccl/changefeedccl/schemafeed/helpers_test.go new file mode 100644 index 000000000000..91f61db019a1 --- /dev/null +++ b/pkg/ccl/changefeedccl/schemafeed/helpers_test.go @@ -0,0 +1,37 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package schemafeed + +import "strings" + +const TestingAllEventFilter = "testing" + +func init() { + schemaChangeEventFilters[TestingAllEventFilter] = tableEventFilter{ + tableEventDropColumn: false, + tableEventAddColumnWithBackfill: false, + tableEventAddColumnNoBackfill: false, + tableEventUnknown: false, + tableEventPrimaryKeyChange: false, + tableEventLocalityRegionalByRowChange: false, + tableEventAddHiddenColumn: false, + } +} + +var ClassifyEvent = classifyTableEvent + +func PrintTableEventType(t tableEventType) string { + var strs []string + for i := 0; i < 63; i++ { + if t&1< +// Creates a schemafeed with the targets specified as the input with the +// provided ID. +// +// - "pop" f= +// Pop all events from the schemafeed with the given ID. +// The structure of the events looks like as follows: +// +// t 1->2: Unknown +// t 2->3: Unknown +// t 3->4: Unknown +// t 4->5: Unknown +// t 5->6: Unknown +// t 6->7: PrimaryKeyChange +// t 7->8: Unknown +// t 8->9: Unknown +// +// The first column is the name of the table in question. +// The second is the version transition. The third indicates +// the event classification. +// +func TestDataDriven(t *testing.T) { + defer leaktest.AfterTest(t)() + + defer log.Scope(t).Close(t) + + testData := testutils.TestDataPath(t, "") + datadriven.Walk(t, testData, func(t *testing.T, path string) { + ctx := context.Background() + ts, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + // TODO(ajwerner): Otherwise the test gets skipped due to some CCL warning. 
+ DisableDefaultTestTenant: true, + }) + tdb := sqlutils.MakeSQLRunner(sqlDB) + defer ts.Stopper().Stop(ctx) + ctx, cancel := ts.Stopper().WithCancelOnQuiesce(ctx) + defer cancel() + schemaFeeds := map[int]schemafeed.SchemaFeed{} + parseTargets := func(t *testing.T, in string) (targets changefeedbase.Targets) { + tables := strings.Fields(in) + for _, tab := range tables { + var tableID descpb.ID + var statementTimeName changefeedbase.StatementTimeName + tdb.QueryRow(t, "SELECT $1::regclass::int, $1::regclass::string", tab).Scan( + &tableID, &statementTimeName) + targets.Add(changefeedbase.Target{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: tableID, + FamilyName: "primary", + StatementTimeName: statementTimeName, + }) + } + return targets + } + errCh := make(chan error, 1) + defer func() { + select { + case err := <-errCh: + t.Fatalf("unexpected error: %v", err) + default: + } + }() + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + select { + case err := <-errCh: + d.Fatalf(t, "unexpected error: %v", err) + default: + } + t.Log(d.Cmd) + switch d.Cmd { + case "exec": + if _, err := sqlDB.Exec(d.Input); err != nil { + d.Fatalf(t, "failed to exec: %v", err) + } + return "" + case "create": + var i int + d.ScanArgs(t, "f", &i) + if _, exists := schemaFeeds[i]; exists { + d.Fatalf(t, "feed %d already created", i) + } + cfg := &ts.SQLServer().(*sql.Server).GetExecutorConfig().DistSQLSrv.ServerConfig + now := ts.Clock().Now() + targets := parseTargets(t, d.Input) + f := schemafeed.New(ctx, cfg, schemafeed.TestingAllEventFilter, targets, now, nil, changefeedbase.CanHandle{ + MultipleColumnFamilies: true, + VirtualColumns: true, + }) + schemaFeeds[i] = f + + go func() { + if err := f.Run(ctx); !assert.NoError(t, err) { + select { + case errCh <- err: + default: + } + } + }() + return "" + case "pop": + var i int + d.ScanArgs(t, "f", &i) + sf, exists := schemaFeeds[i] + if !exists { + d.Fatalf(t, "feed %d does not exist", i) + } + events, err := sf.Pop(ctx, ts.Clock().Now()) + if err != nil { + return err.Error() + } + var buf strings.Builder + for i, ev := range events { + if i > 0 { + buf.WriteString("\n") + } + _, _ = fmt.Fprintf(&buf, "%s %d->%d: %s", + ev.After.GetName(), ev.Before.GetVersion(), ev.After.GetVersion(), + schemafeed.ClassifyEvent(ev), + ) + } + return buf.String() + default: + d.Fatalf(t, "unexpected command %s", d.Cmd) + return "" + } + }) + }) +} diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go index 14615c7196b5..723a8653e8ac 100644 --- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go +++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go @@ -243,10 +243,10 @@ func TestTableEventFilterErrorsWithIncompletePolicy(t *testing.T) { dropColBackfill := schematestutils.AddColumnDropBackfillMutation incompleteFilter := tableEventFilter{ - // tableEventTypeDropColumn: false, - tableEventTypeAddColumnWithBackfill: false, - tableEventTypeAddColumnNoBackfill: true, - // tableEventTypeUnknown: true, + // tableEventDropColumn: false, + tableEventAddColumnWithBackfill: false, + tableEventAddColumnNoBackfill: true, + // tableEventUnknown: true, tableEventPrimaryKeyChange: false, } dropColEvent := TableEvent{ diff --git a/pkg/ccl/changefeedccl/schemafeed/tableeventtype_string.go b/pkg/ccl/changefeedccl/schemafeed/tableeventtype_string.go new file mode 100644 index 000000000000..520149527a94 --- /dev/null +++ 
b/pkg/ccl/changefeedccl/schemafeed/tableeventtype_string.go @@ -0,0 +1,30 @@ +// Code generated by "stringer"; DO NOT EDIT. + +package schemafeed + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[tableEventUnknown-0] + _ = x[tableEventAddColumnNoBackfill-1] + _ = x[tableEventAddColumnWithBackfill-2] + _ = x[tableEventDropColumn-3] + _ = x[tableEventTruncate-4] + _ = x[tableEventPrimaryKeyChange-5] + _ = x[tableEventLocalityRegionalByRowChange-6] + _ = x[tableEventAddHiddenColumn-7] +} + +const _tableEventType_name = "UnknownAddColumnNoBackfillAddColumnWithBackfillDropColumnTruncatePrimaryKeyChangeLocalityRegionalByRowChangeAddHiddenColumn" + +var _tableEventType_index = [...]uint8{0, 7, 26, 47, 57, 65, 81, 108, 123} + +func (i tableEventType) String() string { + if i >= tableEventType(len(_tableEventType_index)-1) { + return "tableEventType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _tableEventType_name[_tableEventType_index[i]:_tableEventType_index[i+1]] +} diff --git a/pkg/ccl/changefeedccl/schemafeed/testdata/add_column b/pkg/ccl/changefeedccl/schemafeed/testdata/add_column new file mode 100644 index 000000000000..d8e2973bdbb8 --- /dev/null +++ b/pkg/ccl/changefeedccl/schemafeed/testdata/add_column @@ -0,0 +1,42 @@ +exec +CREATE TABLE t (i INT PRIMARY KEY) +---- + +create f=1 +t +---- + +exec +ALTER TABLE t ADD COLUMN j INT; +---- + +pop f=1 +---- +t 1->2: Unknown +t 2->3: Unknown +t 3->4: AddColumnNoBackfill + +exec +ALTER TABLE t ADD COLUMN k INT DEFAULT 42; +---- + +pop f=1 +---- +t 4->5: Unknown +t 5->6: Unknown +t 6->7: Unknown +t 7->8: Unknown +t 8->9: AddColumnWithBackfill|PrimaryKeyChange +t 9->10: Unknown +t 10->11: Unknown + +exec +SET use_declarative_schema_changer=off; +ALTER TABLE t ADD COLUMN l INT NOT NULL DEFAULT 42; +---- + +pop f=1 +---- +t 11->12: Unknown +t 12->13: Unknown +t 13->14: AddColumnWithBackfill diff --git a/pkg/ccl/changefeedccl/schemafeed/testdata/drop_column b/pkg/ccl/changefeedccl/schemafeed/testdata/drop_column new file mode 100644 index 000000000000..df51ec2788fc --- /dev/null +++ b/pkg/ccl/changefeedccl/schemafeed/testdata/drop_column @@ -0,0 +1,28 @@ +exec +CREATE TABLE t (i INT PRIMARY KEY, j INT, k INT, l INT) +---- + +create f=1 +t +---- + +exec +ALTER TABLE t DROP COLUMN j; +---- + +pop f=1 +---- +t 1->2: DropColumn +t 2->3: Unknown +t 3->4: AddHiddenColumn + +exec +SET use_declarative_schema_changer=off; +ALTER TABLE t DROP COLUMN k; +---- + +pop f=1 +---- +t 4->5: DropColumn +t 5->6: Unknown +t 6->7: AddHiddenColumn diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index d41fdfd12e9a..3470d0682401 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -500,26 +500,19 @@ func runCDCSchemaRegistry(ctx context.Context, t test.Test, c cluster.Cluster) { t.Fatal(err) } - if _, err := db.Exec(`INSERT INTO foo VALUES (1)`); err != nil { - t.Fatal(err) - } - if _, err := db.Exec(`ALTER TABLE foo ADD COLUMN b STRING`); err != nil { - t.Fatal(err) - } - if _, err := db.Exec(`INSERT INTO foo VALUES (2, '2')`); err != nil { - t.Fatal(err) - } - if _, err := db.Exec(`ALTER TABLE foo ADD COLUMN c INT`); err != nil { - t.Fatal(err) - } - if _, err := db.Exec(`INSERT INTO foo VALUES (3, '3', 3)`); err != nil { - t.Fatal(err) - } - if _, err := db.Exec(`ALTER TABLE foo DROP COLUMN b`); err != nil { - t.Fatal(err) - } - if _, err := 
db.Exec(`INSERT INTO foo VALUES (4, 4)`); err != nil {
-		t.Fatal(err)
+	for _, stmt := range []string{
+		`INSERT INTO foo VALUES (1)`,
+		`ALTER TABLE foo ADD COLUMN b STRING`,
+		`INSERT INTO foo VALUES (2, '2')`,
+		`ALTER TABLE foo ADD COLUMN c INT`,
+		`INSERT INTO foo VALUES (3, '3', 3)`,
+		`ALTER TABLE foo DROP COLUMN b`,
+		`INSERT INTO foo VALUES (4, 4)`,
+	} {
+		t.L().Printf("Executing SQL: %s", stmt)
+		if _, err := db.Exec(stmt); err != nil {
+			t.Fatalf("failed to execute %s: %v", stmt, err)
+		}
 	}
 	// There are various internal races and retries in changefeeds that can
diff --git a/pkg/gen/stringer.bzl b/pkg/gen/stringer.bzl
index f9bcc4078388..276d6322af30 100644
--- a/pkg/gen/stringer.bzl
+++ b/pkg/gen/stringer.bzl
@@ -2,6 +2,7 @@ STRINGER_SRCS = [
     "//pkg/base:testclusterreplicationmode_string.go",
+    "//pkg/ccl/changefeedccl/schemafeed:tableeventtype_string.go",
     "//pkg/ccl/sqlproxyccl:errorcode_string.go",
     "//pkg/cli:keytype_string.go",
     "//pkg/clusterversion:key_string.go",

From e6cf9084c8740320fe12b391a66d9f37d355ef7e Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Mon, 18 Jul 2022 02:13:09 -0400
Subject: [PATCH 4/4] changefeedccl: properly handle drop column index swaps

This change adds the capability for changefeeds to detect primary index
changes which do not correspond to changes in the set of columns or the
ordering of the key columns, as will be used in a later commit by the
`DROP COLUMN` schema changes in the declarative schema changer.

Release note: None
---
 pkg/ccl/changefeedccl/kvfeed/kv_feed.go     | 31 +++++++++----
 .../schemafeed/table_event_filter.go        | 46 +++++++++++++++++--
 .../table_event_filter_datadriven_test.go   |  3 ++
 .../schemafeed/table_event_filter_test.go   |  3 +-
 .../schemafeed/testdata/alter_primary_key   | 37 +++++++++++++++
 5 files changed, 107 insertions(+), 13 deletions(-)
 create mode 100644 pkg/ccl/changefeedccl/schemafeed/testdata/alter_primary_key

diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
index a8f206e51028..3d4ed2ebcd05 100644
--- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -283,13 +283,22 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 	highWater := rangeFeedResumeFrontier.Frontier()
 	boundaryType := jobspb.ResolvedSpan_BACKFILL
-	if f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyStop {
-		boundaryType = jobspb.ResolvedSpan_EXIT
-	} else if events, err := f.tableFeed.Peek(ctx, highWater.Next()); err == nil && isPrimaryKeyChange(events) {
-		boundaryType = jobspb.ResolvedSpan_RESTART
-	} else if err != nil {
+	events, err := f.tableFeed.Peek(ctx, highWater.Next())
+	if err != nil {
 		return err
 	}
+	// Detect whether the event corresponds to a primary index change. Also
+	// detect whether that primary index change corresponds to any change in
+	// the primary key or in the set of visible columns. If it corresponds to
+	// no such change, then it may be a column being dropped physically and
+	// should not trigger a failure in the `stop` policy.
+	primaryIndexChange, noColumnChanges := isPrimaryKeyChange(events)
+	if primaryIndexChange && (noColumnChanges ||
+		f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyStop) {
+		boundaryType = jobspb.ResolvedSpan_RESTART
+	} else if f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyStop {
+		boundaryType = jobspb.ResolvedSpan_EXIT
+	}
 	// Resolve all of the spans as a boundary if the policy indicates that
 	// we should do so.
 	if f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyNoBackfill ||
@@ -306,13 +315,17 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 	}
 }

-func isPrimaryKeyChange(events []schemafeed.TableEvent) bool {
+func isPrimaryKeyChange(
+	events []schemafeed.TableEvent,
+) (isPrimaryIndexChange, hasNoColumnChanges bool) {
+	hasNoColumnChanges = true
 	for _, ev := range events {
-		if schemafeed.IsPrimaryIndexChange(ev) {
-			return true
+		if ok, noColumnChange := schemafeed.IsPrimaryIndexChange(ev); ok {
+			isPrimaryIndexChange = true
+			hasNoColumnChanges = hasNoColumnChanges && noColumnChange
 		}
 	}
-	return false
+	return isPrimaryIndexChange, isPrimaryIndexChange && hasNoColumnChanges
 }

 // filterCheckpointSpans filters spans which have already been completed,
diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go
index 7eddfdb6ecd2..b7f3c519e387 100644
--- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go
+++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go
@@ -221,10 +221,50 @@ func regionalByRowChanged(e TableEvent) bool {
 	return e.Before.IsLocalityRegionalByRow() != e.After.IsLocalityRegionalByRow()
 }

+func hasNewPrimaryIndexWithNoVisibleColumnChanges(e TableEvent) bool {
+	before, after := e.Before.GetPrimaryIndex(), e.After.GetPrimaryIndex()
+	if before.GetID() == after.GetID() ||
+		before.NumKeyColumns() != after.NumKeyColumns() {
+		return false
+	}
+	for i, n := 0, before.NumKeyColumns(); i < n; i++ {
+		if before.GetKeyColumnID(i) != after.GetKeyColumnID(i) {
+			return false
+		}
+	}
+	collectPublicStoredColumns := func(
+		idx catalog.Index, tab catalog.TableDescriptor,
+	) (cols catalog.TableColSet) {
+		for i, n := 0, idx.NumPrimaryStoredColumns(); i < n; i++ {
+			colID := idx.GetStoredColumnID(i)
+			col, _ := tab.FindColumnWithID(colID)
+			if col.Public() {
+				cols.Add(colID)
+			}
+		}
+		return cols
+	}
+	storedBefore := collectPublicStoredColumns(before, e.Before)
+	storedAfter := collectPublicStoredColumns(after, e.After)
+	return storedBefore.Len() == storedAfter.Len() &&
+		storedBefore.Difference(storedAfter).Empty()
+}
+
 // IsPrimaryIndexChange returns true if the event corresponds
-// in the primary index.
-func IsPrimaryIndexChange(e TableEvent) bool {
-	return classifyTableEvent(e).Contains(tableEventPrimaryKeyChange)
+// in the primary index. It also returns whether the primary index change
+// corresponds to any change in the visible column set or key ordering.
+// This is useful because when the declarative schema changer drops a column,
+// it does so by adding a new primary index with the column excluded and
+// then swaps to the new primary index. The column logically disappears
+// before the index swap occurs. We want to detect the case of this index
+// swap and not stop changefeeds which are programmed to stop upon schema
+// changes.
+func IsPrimaryIndexChange(e TableEvent) (isPrimaryIndexChange, noVisibleOrderOrColumnChanges bool) {
+	isPrimaryIndexChange = classifyTableEvent(e).Contains(tableEventPrimaryKeyChange)
+	if isPrimaryIndexChange {
+		noVisibleOrderOrColumnChanges = hasNewPrimaryIndexWithNoVisibleColumnChanges(e)
+	}
+	return isPrimaryIndexChange, noVisibleOrderOrColumnChanges
 }

 // IsOnlyPrimaryIndexChange returns to true if the event corresponds
diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_datadriven_test.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_datadriven_test.go
index 4fe3f3cdef94..57e1decc152d 100644
--- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_datadriven_test.go
+++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_datadriven_test.go
@@ -155,6 +155,9 @@ func TestDataDriven(t *testing.T) {
 					ev.After.GetName(), ev.Before.GetVersion(), ev.After.GetVersion(),
 					schemafeed.ClassifyEvent(ev),
 				)
+				if _, noColumnChanges := schemafeed.IsPrimaryIndexChange(ev); noColumnChanges {
+					buf.WriteString(" (no column changes)")
+				}
 			}
 			return buf.String()
 		default:
diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go
index 723a8653e8ac..c6bb55ab805b 100644
--- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go
+++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go
@@ -146,7 +146,8 @@ func TestTableEventIsPrimaryIndexChange(t *testing.T) {
 		},
 	} {
 		t.Run(c.name, func(t *testing.T) {
-			require.Equalf(t, c.exp, IsPrimaryIndexChange(c.e), "event %v", c.e)
+			got, _ := IsPrimaryIndexChange(c.e)
+			require.Equalf(t, c.exp, got, "event %v", c.e)
 		})
 	}
 }
diff --git a/pkg/ccl/changefeedccl/schemafeed/testdata/alter_primary_key b/pkg/ccl/changefeedccl/schemafeed/testdata/alter_primary_key
new file mode 100644
index 000000000000..8e8921d829ef
--- /dev/null
+++ b/pkg/ccl/changefeedccl/schemafeed/testdata/alter_primary_key
@@ -0,0 +1,37 @@
+exec
+CREATE TABLE t (i INT PRIMARY KEY, j INT NOT NULL, k INT NOT NULL)
+----
+
+create f=1
+t
+----
+
+exec
+ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (j, k);
+----
+
+pop f=1
+----
+t 1->2: Unknown
+t 2->3: Unknown
+t 3->4: Unknown
+t 4->5: Unknown
+t 5->6: Unknown
+t 6->7: PrimaryKeyChange
+t 7->8: Unknown
+t 8->9: Unknown
+
+exec
+ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (k, j);
+----
+
+pop f=1
+----
+t 9->10: Unknown
+t 10->11: Unknown
+t 11->12: Unknown
+t 12->13: Unknown
+t 13->14: Unknown
+t 14->15: PrimaryKeyChange
+t 15->16: Unknown
+t 16->17: Unknown