From ad326c11e31e27ced0cabf02bd9616e029425d10 Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Fri, 29 Sep 2023 03:04:53 +0200
Subject: [PATCH 1/6] settingswatcher: move an inline function to a method

(This function/method will also be deleted in a later commit.)

Release note: None
---
 .../settingswatcher/settings_watcher.go      | 45 ++++++++++---------
 1 file changed, 25 insertions(+), 20 deletions(-)

diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go
index 1a589a6cf128..89662a32afc1 100644
--- a/pkg/server/settingswatcher/settings_watcher.go
+++ b/pkg/server/settingswatcher/settings_watcher.go
@@ -45,7 +45,12 @@ type SettingsWatcher struct {
 	f        *rangefeed.Factory
 	stopper  *stop.Stopper
 	dec      RowDecoder
-	storage  Storage
+
+	// storage is used to persist a local cache of the setting
+	// overrides, for use when a node starts up before KV is ready.
+	storage Storage
+	// snapshot is what goes into the local cache.
+	snapshot []roachpb.KeyValue
 
 	overridesMonitor OverridesMonitor
 
@@ -194,24 +199,6 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
 	if s.storage != nil {
 		bufferSize = settings.MaxSettings * 3
 	}
-	var snapshot []roachpb.KeyValue // used with storage
-	maybeUpdateSnapshot := func(update rangefeedcache.Update) {
-		// Only record the update to the buffer if we're writing to storage.
-		if s.storage == nil ||
-			// and the update has some new information to write.
-			(update.Type == rangefeedcache.IncrementalUpdate && len(update.Events) == 0) {
-			return
-		}
-		eventKVs := rangefeedbuffer.EventsToKVs(update.Events,
-			rangefeedbuffer.RangeFeedValueEventToKV)
-		switch update.Type {
-		case rangefeedcache.CompleteUpdate:
-			snapshot = eventKVs
-		case rangefeedcache.IncrementalUpdate:
-			snapshot = rangefeedbuffer.MergeKVs(snapshot, eventKVs)
-		}
-		s.storage.SnapshotKVs(ctx, snapshot)
-	}
 	c := rangefeedcache.NewWatcher(
 		"settings-watcher", s.clock, s.f,
@@ -223,7 +210,7 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
 		},
 		func(ctx context.Context, update rangefeedcache.Update) {
 			noteUpdate(update)
-			maybeUpdateSnapshot(update)
+			s.maybeUpdateSnapshot(ctx, update)
 		},
 		s.testingWatcherKnobs,
 	)
@@ -255,6 +242,24 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
 	}
 }
 
+func (s *SettingsWatcher) maybeUpdateSnapshot(ctx context.Context, update rangefeedcache.Update) {
+	// Only record the update to the buffer if we're writing to storage.
+	if s.storage == nil ||
+		// and the update has some new information to write.
+		(update.Type == rangefeedcache.IncrementalUpdate && len(update.Events) == 0) {
+		return
+	}
+	eventKVs := rangefeedbuffer.EventsToKVs(update.Events,
+		rangefeedbuffer.RangeFeedValueEventToKV)
+	switch update.Type {
+	case rangefeedcache.CompleteUpdate:
+		s.snapshot = eventKVs
+	case rangefeedcache.IncrementalUpdate:
+		s.snapshot = rangefeedbuffer.MergeKVs(s.snapshot, eventKVs)
+	}
+	s.storage.SnapshotKVs(ctx, s.snapshot)
+}
+
 // TestingRestart restarts the rangefeeds and waits for the initial
 // update after the rangefeed update to be processed.
 func (s *SettingsWatcher) TestingRestart() {

From ece6d5715f7b23de619b0c90c3863c77142561b3 Mon Sep 17 00:00:00 2001
From: Stan Rosenberg
Date: Fri, 22 Sep 2023 17:08:48 -0400
Subject: [PATCH 2/6] roachtest: harmonize GCE and AWS machine types

Previously, a (performance) roachtest executed in GCE and AWS may have
used a different memory (per CPU) multiplier and/or cpu family, e.g.,
cascade lake vs ice lake.
In the best case, this resulted in different performance baselines on an
otherwise equivalent machine type. In the worst case, this resulted in
OOMs due to VMs in AWS having half as much memory per CPU.

This change harmonizes GCE and AWS machine types by making them as
isomorphic as possible wrt memory, cpu family and price. The following
heuristics are used depending on the specified `MemPerCPU`: `Standard`
yields 4GB/cpu, `High` yields 8GB/cpu, `Auto` yields 4GB/cpu up to and
including 16 vCPUs, then 2GB/cpu. `Low` is supported _only_ in GCE.

Consequently, `n2-standard` maps to `m6i`, `n2-highmem` maps to `r6i`,
`n2-custom` maps to `c6i`, modulo local SSDs in which case `m6id` is
used, etc. Note, we also force `--gce-min-cpu-platform` to `Ice Lake`;
the isomorphic AWS machine types run exclusively on `Ice Lake`.

Roachprod is extended to show cpu family and architecture on `List`.
Cost estimation now correctly deals with _custom_ machine types.

Finally, we change the default zone allocation in GCE from exclusively
`us-east1-b` to ~25% `us-central1-b` and ~75% `us-east1-b`. This is
intended to balance the quotas for local SSDs until we eventually
switch to PD-SSDs.

Epic: none
Fixes: #106570

Release note: None
---
 pkg/cmd/roachprod/main.go                      |  32 ++-
 pkg/cmd/roachtest/cluster.go                   |   7 +-
 pkg/cmd/roachtest/cluster_test.go              | 231 ++++++++++++++++++
 pkg/cmd/roachtest/spec/cluster_spec.go         |  15 +-
 pkg/cmd/roachtest/spec/machine_type.go         | 132 +++++++---
 .../tests/admission_control_database_drop.go   |   1 -
 .../tests/admission_control_index_backfill.go  |   1 -
 pkg/cmd/roachtest/tests/restore.go             |   2 +-
 pkg/roachprod/cloud/cluster_cloud.go           |   2 +-
 pkg/roachprod/vm/aws/aws.go                    |   8 +-
 pkg/roachprod/vm/gce/gcloud.go                 |  66 ++++-
 pkg/roachprod/vm/vm.go                         |  35 ++-
 pkg/roachprod/vm/vm_test.go                    |  23 ++
 13 files changed, 479 insertions(+), 76 deletions(-)

diff --git a/pkg/cmd/roachprod/main.go b/pkg/cmd/roachprod/main.go
index a2c415f53d69..7f61d4f019d7 100644
--- a/pkg/cmd/roachprod/main.go
+++ b/pkg/cmd/roachprod/main.go
@@ -279,12 +279,22 @@ hosts file.
 			}
 		} else {
 			machineType := func(clusterVMs vm.List) string {
-				res := clusterVMs[0].MachineType
-				// Display CPU architecture, other than amd64 (default).
-				if arch := clusterVMs[0].Labels["arch"]; arch != "" && arch != string(vm.ArchAMD64) {
-					res += fmt.Sprintf(" [%s]", arch)
+				return clusterVMs[0].MachineType
+			}
+			cpuArch := func(clusterVMs vm.List) string {
+				// Display CPU architecture and family.
+				if clusterVMs[0].CPUArch == "" {
+					// N.B. Either a local cluster or unsupported cloud provider.
+					return ""
+				}
+				if clusterVMs[0].CPUFamily != "" {
+					return clusterVMs[0].CPUFamily
+				}
+				if clusterVMs[0].CPUArch != vm.ArchAMD64 {
+					return string(clusterVMs[0].CPUArch)
 				}
-				return res
+				// AMD64 is the default, so don't display it.
+				return ""
 			}
 			// Align columns left and separate with at least two spaces.
 			tw := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', tabwriter.AlignRight)
@@ -293,14 +303,14 @@ hosts file.
 			// [1] https://github.com/golang/go/issues/12073
 
 			// Print header.
-			fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t\n",
-				"Cluster", "Clouds", "Size", "VM",
+			fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t\n",
+				"Cluster", "Clouds", "Size", "VM", "Arch",
 				color.HiWhiteString("$/hour"), color.HiWhiteString("$ Spent"),
 				color.HiWhiteString("Uptime"), color.HiWhiteString("TTL"), color.HiWhiteString("$/TTL"))
 
 			// Print separator.
- fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t\n", - "", "", "", + fmt.Fprintf(tw, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t\n", + "", "", "", "", color.HiWhiteString(""), color.HiWhiteString(""), color.HiWhiteString(""), color.HiWhiteString(""), color.HiWhiteString("")) @@ -312,8 +322,8 @@ hosts file. } else { // N.B. Tabwriter doesn't support per-column alignment. It looks odd to have the cluster names right-aligned, // so we make it left-aligned. - fmt.Fprintf(tw, "%s\t%s\t%d\t%s", name+strings.Repeat(" ", maxClusterName-len(name)), c.Clouds(), - len(c.VMs), machineType(c.VMs)) + fmt.Fprintf(tw, "%s\t%s\t%d\t%s\t%s", name+strings.Repeat(" ", maxClusterName-len(name)), c.Clouds(), + len(c.VMs), machineType(c.VMs), cpuArch(c.VMs)) if !c.IsLocal() { colorByCostBucket := func(cost float64) func(string, ...interface{}) string { switch { diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go index a532ae927ca5..156f69ccfe0f 100644 --- a/pkg/cmd/roachtest/cluster.go +++ b/pkg/cmd/roachtest/cluster.go @@ -597,6 +597,9 @@ func MachineTypeToCPUs(s string) int { if _, err := fmt.Sscanf(s, "n2-highcpu-%d", &v); err == nil { return v } + if _, err := fmt.Sscanf(s, "n2-custom-%d", &v); err == nil { + return v + } if _, err := fmt.Sscanf(s, "n2-highmem-%d", &v); err == nil { return v } @@ -650,9 +653,7 @@ func MachineTypeToCPUs(s string) int { // TODO(pbardea): Non-default Azure machine types are not supported // and will return unknown machine type error. - fmt.Fprintf(os.Stderr, "unknown machine type: %s\n", s) - os.Exit(1) - return -1 + panic(fmt.Sprintf("unknown machine type: %s\n", s)) } type nodeSelector interface { diff --git a/pkg/cmd/roachtest/cluster_test.go b/pkg/cmd/roachtest/cluster_test.go index 6483fd466dfe..5b57ba1d7651 100644 --- a/pkg/cmd/roachtest/cluster_test.go +++ b/pkg/cmd/roachtest/cluster_test.go @@ -11,6 +11,7 @@ package main import ( + "fmt" "testing" "time" @@ -168,6 +169,9 @@ func TestClusterMachineType(t *testing.T) { {"n2-standard-32", 32}, {"n2-standard-64", 64}, {"n2-standard-96", 96}, + {"n2-highmem-8", 8}, + {"n2-highcpu-16-2048", 16}, + {"n2-custom-32-65536", 32}, {"t2a-standard-2", 2}, {"t2a-standard-4", 4}, {"t2a-standard-8", 8}, @@ -185,6 +189,233 @@ func TestClusterMachineType(t *testing.T) { } } +type machineTypeTestCase struct { + cpus int + mem spec.MemPerCPU + localSSD bool + arch vm.CPUArch + expectedMachineType string + expectedArch vm.CPUArch +} + +func TestAWSMachineType(t *testing.T) { + testCases := []machineTypeTestCase{} + + xlarge := func(cpus int) string { + var size string + switch { + case cpus <= 2: + size = "large" + case cpus <= 4: + size = "xlarge" + case cpus <= 8: + size = "2xlarge" + case cpus <= 16: + size = "4xlarge" + case cpus <= 32: + size = "8xlarge" + case cpus <= 48: + size = "12xlarge" + case cpus <= 64: + size = "16xlarge" + case cpus <= 96: + size = "24xlarge" + default: + size = "24xlarge" + } + return size + } + + addAMD := func(mem spec.MemPerCPU) { + family := func() string { + switch mem { + case spec.Auto: + return "m6i" + case spec.Standard: + return "m6i" + case spec.High: + return "r6i" + } + return "" + } + + for _, arch := range []vm.CPUArch{vm.ArchAMD64, vm.ArchFIPS} { + family := family() + + testCases = append(testCases, machineTypeTestCase{1, mem, false, arch, + fmt.Sprintf("%s.%s", family, xlarge(1)), arch}) + testCases = append(testCases, machineTypeTestCase{1, mem, true, arch, + fmt.Sprintf("%sd.%s", family, xlarge(1)), arch}) + for i := 2; i <= 128; i += 2 { + if i > 16 && mem == spec.Auto { 
+ family = "c6i" + } + testCases = append(testCases, machineTypeTestCase{i, mem, false, arch, + fmt.Sprintf("%s.%s", family, xlarge(i)), arch}) + testCases = append(testCases, machineTypeTestCase{i, mem, true, arch, + fmt.Sprintf("%sd.%s", family, xlarge(i)), arch}) + } + } + } + addARM := func(mem spec.MemPerCPU) { + fallback := false + var family string + + switch mem { + case spec.Auto: + family = "m7g" + case spec.Standard: + family = "m7g" + case spec.High: + family = "r6i" + fallback = true + } + + if fallback { + testCases = append(testCases, machineTypeTestCase{1, mem, false, vm.ArchARM64, + fmt.Sprintf("%s.%s", family, xlarge(1)), vm.ArchAMD64}) + testCases = append(testCases, machineTypeTestCase{1, mem, true, vm.ArchARM64, + fmt.Sprintf("%sd.%s", family, xlarge(1)), vm.ArchAMD64}) + } else { + testCases = append(testCases, machineTypeTestCase{1, mem, false, vm.ArchARM64, + fmt.Sprintf("%s.%s", family, xlarge(1)), vm.ArchARM64}) + testCases = append(testCases, machineTypeTestCase{1, mem, true, vm.ArchARM64, + fmt.Sprintf("%sd.%s", family, xlarge(1)), vm.ArchARM64}) + } + for i := 2; i <= 128; i += 2 { + if i > 16 && mem == spec.Auto { + family = "c7g" + } + fallback = fallback || i > 64 + + if fallback { + if mem == spec.Auto { + family = "c6i" + } else if mem == spec.Standard { + family = "m6i" + } + // Expect fallback to AMD64. + testCases = append(testCases, machineTypeTestCase{i, mem, false, vm.ArchARM64, + fmt.Sprintf("%s.%s", family, xlarge(i)), vm.ArchAMD64}) + testCases = append(testCases, machineTypeTestCase{i, mem, true, vm.ArchARM64, + fmt.Sprintf("%sd.%s", family, xlarge(i)), vm.ArchAMD64}) + } else { + testCases = append(testCases, machineTypeTestCase{i, mem, false, vm.ArchARM64, + fmt.Sprintf("%s.%s", family, xlarge(i)), vm.ArchARM64}) + testCases = append(testCases, machineTypeTestCase{i, mem, true, vm.ArchARM64, + fmt.Sprintf("%sd.%s", family, xlarge(i)), vm.ArchARM64}) + } + } + } + for _, mem := range []spec.MemPerCPU{spec.Auto, spec.Standard, spec.High} { + addAMD(mem) + addARM(mem) + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("%d/%s/%t/%s", tc.cpus, tc.mem, tc.localSSD, tc.arch), func(t *testing.T) { + machineType, selectedArch := spec.AWSMachineType(tc.cpus, tc.mem, tc.localSSD, tc.arch) + + require.Equal(t, tc.expectedMachineType, machineType) + require.Equal(t, tc.expectedArch, selectedArch) + }) + } + // spec.Low is not supported. + require.Panics(t, func() { spec.AWSMachineType(4, spec.Low, false, vm.ArchAMD64) }) + require.Panics(t, func() { spec.AWSMachineType(16, spec.Low, false, vm.ArchARM64) }) +} + +func TestGCEMachineType(t *testing.T) { + testCases := []machineTypeTestCase{} + + addAMD := func(mem spec.MemPerCPU) { + series := func() string { + switch mem { + case spec.Auto: + return "standard" + case spec.Standard: + return "standard" + case spec.High: + return "highmem" + case spec.Low: + return "highcpu" + } + return "" + } + + for _, arch := range []vm.CPUArch{vm.ArchAMD64, vm.ArchFIPS} { + series := series() + + testCases = append(testCases, machineTypeTestCase{1, mem, false, arch, + fmt.Sprintf("n2-%s-%d", series, 2), arch}) + for i := 2; i <= 128; i += 2 { + if i > 16 && mem == spec.Auto { + // n2-custom with 2GB per CPU. 
+ testCases = append(testCases, machineTypeTestCase{i, mem, false, arch, + fmt.Sprintf("n2-custom-%d-%d", i, i*2048), arch}) + } else { + testCases = append(testCases, machineTypeTestCase{i, mem, false, arch, + fmt.Sprintf("n2-%s-%d", series, i), arch}) + } + } + } + } + addARM := func(mem spec.MemPerCPU) { + fallback := false + var series string + + switch mem { + case spec.Auto: + series = "standard" + case spec.Standard: + series = "standard" + case spec.High: + fallback = true + series = "highmem" + case spec.Low: + fallback = true + series = "highcpu" + } + + if fallback { + testCases = append(testCases, machineTypeTestCase{1, mem, false, vm.ArchARM64, + fmt.Sprintf("n2-%s-%d", series, 2), vm.ArchAMD64}) + } else { + testCases = append(testCases, machineTypeTestCase{1, mem, false, vm.ArchARM64, + fmt.Sprintf("t2a-%s-%d", series, 1), vm.ArchARM64}) + } + for i := 2; i <= 128; i += 2 { + fallback = fallback || i > 48 || (i > 16 && mem == spec.Auto) + + if fallback { + expectedMachineType := fmt.Sprintf("n2-%s-%d", series, i) + if i > 16 && mem == spec.Auto { + expectedMachineType = fmt.Sprintf("n2-custom-%d-%d", i, i*2048) + } + // Expect fallback to AMD64. + testCases = append(testCases, machineTypeTestCase{i, mem, false, vm.ArchARM64, + expectedMachineType, vm.ArchAMD64}) + } else { + testCases = append(testCases, machineTypeTestCase{i, mem, false, vm.ArchARM64, + fmt.Sprintf("t2a-%s-%d", series, i), vm.ArchARM64}) + } + } + } + for _, mem := range []spec.MemPerCPU{spec.Auto, spec.Standard, spec.High, spec.Low} { + addAMD(mem) + addARM(mem) + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("%d/%s/%s", tc.cpus, tc.mem, tc.arch), func(t *testing.T) { + machineType, selectedArch := spec.GCEMachineType(tc.cpus, tc.mem, tc.arch) + + require.Equal(t, tc.expectedMachineType, machineType) + require.Equal(t, tc.expectedArch, selectedArch) + }) + } +} + func TestCmdLogFileName(t *testing.T) { ts := time.Date(2000, 1, 1, 15, 4, 12, 0, time.Local) diff --git a/pkg/cmd/roachtest/spec/cluster_spec.go b/pkg/cmd/roachtest/spec/cluster_spec.go index 7022fe783171..0c29b55de2ca 100644 --- a/pkg/cmd/roachtest/spec/cluster_spec.go +++ b/pkg/cmd/roachtest/spec/cluster_spec.go @@ -172,10 +172,18 @@ func getGCEOpts( localSSD bool, RAID0 bool, terminateOnMigration bool, - minCPUPlatform, volumeType string, + minCPUPlatform string, + arch vm.CPUArch, + volumeType string, ) vm.ProviderOpts { opts := gce.DefaultProviderOpts() opts.MachineType = machineType + if arch == vm.ArchARM64 { + // ARM64 machines don't support minCPUPlatform. + opts.MinCPUPlatform = "" + } else if minCPUPlatform != "" { + opts.MinCPUPlatform = minCPUPlatform + } if volumeSize != 0 { opts.PDVolumeSize = volumeSize } @@ -191,7 +199,6 @@ func getGCEOpts( opts.UseMultipleDisks = !RAID0 } opts.TerminateOnMigration = terminateOnMigration - opts.MinCPUPlatform = minCPUPlatform if volumeType != "" { opts.PDVolumeType = volumeType } @@ -258,7 +265,7 @@ func (s *ClusterSpec) RoachprodOpts( // based on the cloud and CPU count. 
switch s.Cloud { case AWS: - machineType, selectedArch = AWSMachineType(s.CPUs, s.Mem, arch) + machineType, selectedArch = AWSMachineType(s.CPUs, s.Mem, s.PreferLocalSSD && s.VolumeSize == 0, arch) case GCE: machineType, selectedArch = GCEMachineType(s.CPUs, s.Mem, arch) case Azure: @@ -324,7 +331,7 @@ func (s *ClusterSpec) RoachprodOpts( case GCE: providerOpts = getGCEOpts(machineType, zones, s.VolumeSize, ssdCount, createVMOpts.SSDOpts.UseLocalSSD, s.RAID0, s.TerminateOnMigration, - s.GCEMinCPUPlatform, s.GCEVolumeType, + s.GCEMinCPUPlatform, vm.ParseArch(createVMOpts.Arch), s.GCEVolumeType, ) case Azure: providerOpts = getAzureOpts(machineType, zones) diff --git a/pkg/cmd/roachtest/spec/machine_type.go b/pkg/cmd/roachtest/spec/machine_type.go index 533874e4529f..8239ae522599 100644 --- a/pkg/cmd/roachtest/spec/machine_type.go +++ b/pkg/cmd/roachtest/spec/machine_type.go @@ -16,26 +16,50 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachprod/vm" ) -// AWSMachineType selects a machine type given the desired number of CPUs and -// memory per CPU ratio. Also returns the architecture of the selected machine type. -func AWSMachineType(cpus int, mem MemPerCPU, arch vm.CPUArch) (string, vm.CPUArch) { - // TODO(erikgrinaker): These have significantly less RAM than - // their GCE counterparts. Consider harmonizing them. - family := "c6id" // 2 GB RAM per CPU +// AWSMachineType selects a machine type given the desired number of CPUs, memory per CPU, +// support for locally-attached SSDs and CPU architecture. It returns a compatible machine type and its architecture. +// +// When MemPerCPU is Standard, the memory per CPU ratio is 4 GB. For High, it is 8 GB. +// For Auto, it's 4 GB up to and including 16 CPUs, then 2 GB. Low is not supported. +// +// N.B. in some cases, the selected architecture and machine type may be different from the requested one. E.g., +// graviton3 with >= 24xlarge (96 vCPUs) isn't available, so we fall back to (c|m|r)6i.24xlarge. +// N.B. cpus is expected to be an even number; validation is deferred to a specific cloud provider. +// +// At the time of writing, the intel machines are all third-generation Xeon, "Ice Lake" which are isomorphic to +// GCE's n2-(standard|highmem|custom) _with_ --minimum-cpu-platform="Intel Ice Lake" (roachprod's default). +func AWSMachineType( + cpus int, mem MemPerCPU, shouldSupportLocalSSD bool, arch vm.CPUArch, +) (string, vm.CPUArch) { + family := "m6i" // 4 GB RAM per CPU selectedArch := vm.ArchAMD64 + if arch == vm.ArchFIPS { + // N.B. FIPS is available in any AMD64 machine configuration. selectedArch = vm.ArchFIPS } else if arch == vm.ArchARM64 { - family = "c7g" // 2 GB RAM per CPU (graviton3) + family = "m7g" // 4 GB RAM per CPU (graviton3) selectedArch = vm.ArchARM64 } - if mem == High { - family = "m6i" // 4 GB RAM per CPU + switch mem { + case Auto: + if cpus > 16 { + family = "c6i" // 2 GB RAM per CPU + + if arch == vm.ArchARM64 { + family = "c7g" // 2 GB RAM per CPU (graviton3) + } + } + case Standard: + // nothing to do, family is already configured as per above + case High: + family = "r6i" // 8 GB RAM per CPU + // N.B. graviton3 doesn't support x8 memory multiplier, so we fall back. 
if arch == vm.ArchARM64 { - family = "m7g" // 4 GB RAM per CPU (graviton3) + selectedArch = vm.ArchAMD64 } - } else if mem == Low { + case Low: panic("low memory per CPU not available for AWS") } @@ -58,59 +82,97 @@ func AWSMachineType(cpus int, mem MemPerCPU, arch vm.CPUArch) (string, vm.CPUArc case cpus <= 96: size = "24xlarge" default: - panic(fmt.Sprintf("no aws machine type with %d cpus", cpus)) + // N.B. some machines can go up to 192 vCPUs, but we never exceed 96 in tests. + size = "24xlarge" } - - // There is no m7g.24xlarge, fall back to m6i.24xlarge. - if family == "m7g" && size == "24xlarge" { - family = "m6i" + // There is no m7g.24xlarge (or c7g.24xlarge), fall back to (c|m|r)6i.24xlarge. + if selectedArch == vm.ArchARM64 && size == "24xlarge" { + switch mem { + case Auto: + family = "c6i" + case Standard: + family = "m6i" + case High: + family = "r6i" + } selectedArch = vm.ArchAMD64 } - // There is no c7g.24xlarge, fall back to c6id.24xlarge. - if family == "c7g" && size == "24xlarge" { - family = "c6id" - selectedArch = vm.ArchAMD64 + if shouldSupportLocalSSD { + // All of the above instance families can be modified to support local SSDs by appending "d". + family += "d" } return fmt.Sprintf("%s.%s", family, size), selectedArch } -// GCEMachineType selects a machine type given the desired number of CPUs and -// memory per CPU ratio. Also returns the architecture of the selected machine type. +// GCEMachineType selects a machine type given the desired number of CPUs, memory per CPU, and CPU architecture. +// It returns a compatible machine type and its architecture. +// +// When MemPerCPU is Standard, the memory per CPU ratio is 4 GB. For High, it is 8 GB. +// For Auto, it's 4 GB up to and including 16 CPUs, then 2 GB. Low is 1 GB. +// +// N.B. in some cases, the selected architecture and machine type may be different from the requested one. E.g., +// single CPU machines are not available, so we fall back to dual CPU machines. +// N.B. cpus is expected to be an even number; validation is deferred to a specific cloud provider. +// +// At the time of writing, the intel machines are all third-generation xeon, "Ice Lake" assuming +// --minimum-cpu-platform="Intel Ice Lake" (roachprod's default). This is isomorphic to AWS's m6i or c6i. +// The only exception is low memory machines (n2-highcpu-xxx), which aren't available in AWS. func GCEMachineType(cpus int, mem MemPerCPU, arch vm.CPUArch) (string, vm.CPUArch) { - // TODO(peter): This is awkward: at or below 16 cpus, use n2-standard so that - // the machines have a decent amount of RAM. We could use custom machine - // configurations, but the rules for the amount of RAM per CPU need to be - // determined (you can't request any arbitrary amount of RAM). series := "n2" selectedArch := vm.ArchAMD64 + if arch == vm.ArchFIPS { + // N.B. FIPS is available in any AMD64 machine configuration. selectedArch = vm.ArchFIPS + } else if arch == vm.ArchARM64 { + selectedArch = vm.ArchARM64 + series = "t2a" // Ampere Altra } var kind string switch mem { case Auto: if cpus > 16 { - kind = "highcpu" + // We'll use 2GB RAM per CPU for custom machines. + kind = "custom" + if arch == vm.ArchARM64 { + // T2A doesn't support custom, fall back to n2. 
+ series = "n2" + selectedArch = vm.ArchAMD64 + } } else { kind = "standard" } case Standard: - kind = "standard" // 3.75 GB RAM per CPU + kind = "standard" // 4 GB RAM per CPU case High: - kind = "highmem" // 6.5 GB RAM per CPU + kind = "highmem" // 8 GB RAM per CPU + if arch == vm.ArchARM64 { + // T2A doesn't support highmem, fall back to n2. + series = "n2" + selectedArch = vm.ArchAMD64 + } case Low: - kind = "highcpu" // 0.9 GB RAM per CPU + kind = "highcpu" // 1 GB RAM per CPU + if arch == vm.ArchARM64 { + // T2A doesn't support highcpu, fall back to n2. + series = "n2" + selectedArch = vm.ArchAMD64 + } } - if arch == vm.ArchARM64 && mem == Auto && cpus <= 48 { - series = "t2a" - kind = "standard" - selectedArch = vm.ArchARM64 + // T2A doesn't support cpus > 48, fall back to n2. + if selectedArch == vm.ArchARM64 && cpus > 48 { + series = "n2" + selectedArch = vm.ArchAMD64 } - // N.B. n2 family does not support single CPU machines. + // N.B. n2 does not support single CPU machines. if series == "n2" && cpus == 1 { cpus = 2 } + if kind == "custom" { + // We use 2GB RAM per CPU for custom machines. + return fmt.Sprintf("%s-custom-%d-%d", series, cpus, 2048*cpus), selectedArch + } return fmt.Sprintf("%s-%s-%d", series, kind, cpus), selectedArch } diff --git a/pkg/cmd/roachtest/tests/admission_control_database_drop.go b/pkg/cmd/roachtest/tests/admission_control_database_drop.go index 1124001e9a4a..fbf33b1d0a19 100644 --- a/pkg/cmd/roachtest/tests/admission_control_database_drop.go +++ b/pkg/cmd/roachtest/tests/admission_control_database_drop.go @@ -36,7 +36,6 @@ func registerDatabaseDrop(r registry.Registry) { spec.Cloud(spec.GCE), ) clusterSpec.InstanceType = "n2-standard-8" - clusterSpec.GCEMinCPUPlatform = "Intel Ice Lake" clusterSpec.GCEVolumeType = "pd-ssd" r.Add(registry.TestSpec{ diff --git a/pkg/cmd/roachtest/tests/admission_control_index_backfill.go b/pkg/cmd/roachtest/tests/admission_control_index_backfill.go index 3b8f566d84b3..7eebc99018f4 100644 --- a/pkg/cmd/roachtest/tests/admission_control_index_backfill.go +++ b/pkg/cmd/roachtest/tests/admission_control_index_backfill.go @@ -37,7 +37,6 @@ func registerIndexBackfill(r registry.Registry) { spec.Cloud(spec.GCE), ) clusterSpec.InstanceType = "n2-standard-8" - clusterSpec.GCEMinCPUPlatform = "Intel Ice Lake" clusterSpec.GCEVolumeType = "pd-ssd" r.Add(registry.TestSpec{ diff --git a/pkg/cmd/roachtest/tests/restore.go b/pkg/cmd/roachtest/tests/restore.go index 5572bcee2994..ab46d517cd66 100644 --- a/pkg/cmd/roachtest/tests/restore.go +++ b/pkg/cmd/roachtest/tests/restore.go @@ -504,7 +504,7 @@ func (hw hardwareSpecs) makeClusterSpecs(r registry.Registry, backupCloud string // https://github.com/cockroachdb/cockroach/issues/98783. // // TODO(srosenberg): Remove this workaround when 98783 is addressed. 
- s.InstanceType, _ = spec.AWSMachineType(s.CPUs, s.Mem, vm.ArchAMD64) + s.InstanceType, _ = spec.AWSMachineType(s.CPUs, s.Mem, s.PreferLocalSSD && s.VolumeSize == 0, vm.ArchAMD64) s.InstanceType = strings.Replace(s.InstanceType, "d.", ".", 1) s.Arch = vm.ArchAMD64 } diff --git a/pkg/roachprod/cloud/cluster_cloud.go b/pkg/roachprod/cloud/cluster_cloud.go index ef2c3731befa..bdec1c011615 100644 --- a/pkg/roachprod/cloud/cluster_cloud.go +++ b/pkg/roachprod/cloud/cluster_cloud.go @@ -154,7 +154,7 @@ func (c *Cluster) PrintDetails(logger *logger.Logger) { logger.Printf("(no expiration)") } for _, vm := range c.VMs { - logger.Printf(" %s\t%s\t%s\t%s", vm.Name, vm.DNS, vm.PrivateIP, vm.PublicIP) + logger.Printf(" %s\t%s\t%s\t%s\t%s\t%s\t%s", vm.Name, vm.DNS, vm.PrivateIP, vm.PublicIP, vm.MachineType, vm.CPUArch, vm.CPUFamily) } } diff --git a/pkg/roachprod/vm/aws/aws.go b/pkg/roachprod/vm/aws/aws.go index 04434cd91e56..965bbc0c6c96 100644 --- a/pkg/roachprod/vm/aws/aws.go +++ b/pkg/roachprod/vm/aws/aws.go @@ -864,9 +864,10 @@ func (p *Provider) listRegion( var data struct { Reservations []struct { Instances []struct { - InstanceID string `json:"InstanceId"` - LaunchTime string - Placement struct { + InstanceID string `json:"InstanceId"` + Architecture string + LaunchTime string + Placement struct { AvailabilityZone string } PrivateDNSName string `json:"PrivateDnsName"` @@ -982,6 +983,7 @@ func (p *Provider) listRegion( RemoteUser: opts.RemoteUserName, VPC: in.VpcID, MachineType: in.InstanceType, + CPUArch: vm.ParseArch(in.Architecture), Zone: in.Placement.AvailabilityZone, NonBootAttachedVolumes: nonBootableVolumes, } diff --git a/pkg/roachprod/vm/gce/gcloud.go b/pkg/roachprod/vm/gce/gcloud.go index f0255d1ca132..98040d40ca47 100644 --- a/pkg/roachprod/vm/gce/gcloud.go +++ b/pkg/roachprod/vm/gce/gcloud.go @@ -126,6 +126,8 @@ type jsonVM struct { ProvisioningModel string } MachineType string + // CPU platform corresponding to machine type; see https://cloud.google.com/compute/docs/cpu-platforms + CPUPlatform string SelfLink string Zone string instanceDisksResponse @@ -168,6 +170,7 @@ func (jsonVM *jsonVM) toVM( } machineType := lastComponent(jsonVM.MachineType) + cpuPlatform := jsonVM.CPUPlatform zone := lastComponent(jsonVM.Zone) remoteUser := config.SharedUser if !opts.useSharedUser { @@ -238,6 +241,8 @@ func (jsonVM *jsonVM) toVM( RemoteUser: remoteUser, VPC: vpc, MachineType: machineType, + CPUArch: vm.ParseArch(cpuPlatform), + CPUFamily: strings.Replace(strings.ToLower(cpuPlatform), "intel ", "", 1), Zone: zone, Project: project, NonBootAttachedVolumes: volumes, @@ -253,10 +258,10 @@ type jsonAuth struct { // DefaultProviderOpts returns a new gce.ProviderOpts with default values set. func DefaultProviderOpts() *ProviderOpts { return &ProviderOpts{ - // projects needs space for one project, which is set by the flags for - // commands that accept a single project. + // N.B. we set minCPUPlatform to "Intel Ice Lake" by default because it's readily available in the majority of GCE + // regions. Furthermore, it gets us closer to AWS instances like m6i which exclusively run Ice Lake. 
MachineType: "n2-standard-4", - MinCPUPlatform: "", + MinCPUPlatform: "Intel Ice Lake", Zones: nil, Image: DefaultImage, SSDCount: 1, @@ -803,7 +808,7 @@ func (o *ProviderOpts) ConfigureCreateFlags(flags *pflag.FlagSet) { flags.StringVar(&o.MachineType, ProviderName+"-machine-type", "n2-standard-4", "Machine type (see https://cloud.google.com/compute/docs/machine-types)") - flags.StringVar(&o.MinCPUPlatform, ProviderName+"-min-cpu-platform", "", + flags.StringVar(&o.MinCPUPlatform, ProviderName+"-min-cpu-platform", "Intel Ice Lake", "Minimum CPU platform (see https://cloud.google.com/compute/docs/instances/specify-min-cpu-platform)") flags.StringVar(&o.Image, ProviderName+"-image", DefaultImage, "Image to use to create the vm, "+ @@ -982,6 +987,10 @@ func (p *Provider) Create( } } } + if providerOpts.MinCPUPlatform != "" { + l.Printf("WARNING: --gce-min-cpu-platform is ignored for T2A instances") + providerOpts.MinCPUPlatform = "" + } } //TODO(srosenberg): remove this once we have a better way to detect ARM64 machines if useArmAMI { @@ -1145,7 +1154,8 @@ func (p *Provider) Create( // N.B. Only n1, n2 and c2 instances are supported since we don't typically use other instance types. // Consult https://cloud.google.com/compute/docs/disks/#local_ssd_machine_type_restrictions for other types of instances. func AllowedLocalSSDCount(machineType string) ([]int, error) { - machineTypes := regexp.MustCompile(`^([cn])(\d+)-.+-(\d+)$`) + // E.g., n2-standard-4, n2-custom-8-16384. + machineTypes := regexp.MustCompile(`^([cn])(\d+)-[a-z]+-(\d+)(?:-\d+)?$`) matches := machineTypes.FindStringSubmatch(machineType) if len(matches) >= 3 { @@ -1189,7 +1199,7 @@ func AllowedLocalSSDCount(machineType string) ([]int, error) { } } } - return nil, fmt.Errorf("unsupported machine type: %q", machineType) + return nil, fmt.Errorf("unsupported machine type: %q, matches: %v", machineType, matches) } // N.B. neither boot disk nor additional persistent disks are assigned VM labels by default. 
@@ -1529,16 +1539,48 @@ func populateCostPerHour(l *logger.Logger, vms vm.List) error { }, }, }, - Preemptible: vm.Preemptible, - MachineType: &cloudbilling.MachineType{ - PredefinedMachineType: &cloudbilling.PredefinedMachineType{ - MachineType: machineType, - }, - }, + Preemptible: vm.Preemptible, PersistentDisks: []*cloudbilling.PersistentDisk{}, Region: zone[:len(zone)-2], }, } + if !strings.Contains(machineType, "custom") { + workload.ComputeVmWorkload.MachineType = &cloudbilling.MachineType{ + PredefinedMachineType: &cloudbilling.PredefinedMachineType{ + MachineType: machineType, + }, + } + } else { + decodeCustomType := func() (string, int64, int64, error) { + parts := strings.Split(machineType, "-") + decodeErr := errors.Newf("invalid custom machineType %s", machineType) + if len(parts) != 4 { + return "", 0, 0, decodeErr + } + series, cpus, memory := parts[0], parts[2], parts[3] + cpusInt, parseErr := strconv.Atoi(cpus) + if parseErr != nil { + return "", 0, 0, decodeErr + } + memoryInt, parseErr := strconv.Atoi(memory) + if parseErr != nil { + return "", 0, 0, decodeErr + } + return series, int64(cpusInt), int64(memoryInt), nil + } + series, cpus, memory, err := decodeCustomType() + if err != nil { + l.Errorf("Error estimating VM costs (will continue without): %v", err) + continue + } + workload.ComputeVmWorkload.MachineType = &cloudbilling.MachineType{ + CustomMachineType: &cloudbilling.CustomMachineType{ + MachineSeries: series, + VirtualCpuCount: cpus, + MemorySizeGb: float64(memory / 1024), + }, + } + } for _, v := range vm.NonBootAttachedVolumes { workload.ComputeVmWorkload.PersistentDisks = append(workload.ComputeVmWorkload.PersistentDisks, &cloudbilling.PersistentDisk{ DiskSize: &cloudbilling.Usage{ diff --git a/pkg/roachprod/vm/vm.go b/pkg/roachprod/vm/vm.go index ae2090edc11d..dbccb822cdb5 100644 --- a/pkg/roachprod/vm/vm.go +++ b/pkg/roachprod/vm/vm.go @@ -41,13 +41,36 @@ const ( // TagArch is the CPU architecture tag const. TagArch = "arch" - ArchARM64 = CPUArch("arm64") - ArchAMD64 = CPUArch("amd64") - ArchFIPS = CPUArch("fips") + ArchARM64 = CPUArch("arm64") + ArchAMD64 = CPUArch("amd64") + ArchFIPS = CPUArch("fips") + ArchUnknown = CPUArch("unknown") ) type CPUArch string +// ParseArch parses a string into a CPUArch using a simple, non-exhaustive heuristic. +// Supported input values were extracted from the following CLI tools/binaries: file, gcloud, aws +func ParseArch(s string) CPUArch { + if s == "" { + return ArchUnknown + } + arch := strings.ToLower(s) + + if strings.Contains(arch, "amd64") || strings.Contains(arch, "x86_64") || + strings.Contains(arch, "intel") { + return ArchAMD64 + } + if strings.Contains(arch, "arm64") || strings.Contains(arch, "aarch64") || + strings.Contains(arch, "ampere") || strings.Contains(arch, "graviton") { + return ArchARM64 + } + if strings.Contains(arch, "fips") { + return ArchFIPS + } + return ArchUnknown +} + // GetDefaultLabelMap returns a label map for a common set of labels. func GetDefaultLabelMap(opts CreateOpts) map[string]string { // Add architecture override tag, only if it was specified. @@ -100,7 +123,11 @@ type VM struct { // their public or private IP. VPC string `json:"vpc"` MachineType string `json:"machine_type"` - Zone string `json:"zone"` + // When available, either vm.ArchAMD64 or vm.ArchARM64. + CPUArch CPUArch `json:"cpu_architecture"` + // When available, 'Haswell', 'Skylake', etc. 
+ CPUFamily string `json:"cpu_family"` + Zone string `json:"zone"` // Project represents the project to which this vm belongs, if the VM is in a // cloud that supports project (i.e. GCE). Empty otherwise. Project string `json:"project"` diff --git a/pkg/roachprod/vm/vm_test.go b/pkg/roachprod/vm/vm_test.go index da9a5729adfa..9f1a5e5e4530 100644 --- a/pkg/roachprod/vm/vm_test.go +++ b/pkg/roachprod/vm/vm_test.go @@ -156,3 +156,26 @@ func TestSanitizeLabel(t *testing.T) { }) } } + +func TestParseArch(t *testing.T) { + cases := []struct { + arch string + expected CPUArch + }{ + {"amd64", ArchAMD64}, + {"arm64", ArchARM64}, + {"Intel", ArchAMD64}, + {"x86_64", ArchAMD64}, + {"aarch64", ArchARM64}, + {"Intel Cascade Lake", ArchAMD64}, + {"Ampere Altra", ArchARM64}, + // E.g., GCE returns this when VM is still being provisioned. + {"Unknown CPU Platform", ArchUnknown}, + } + + for _, c := range cases { + t.Run(c.arch, func(t *testing.T) { + assert.EqualValues(t, c.expected, ParseArch(c.arch)) + }) + } +} From 5086cd766a0b4c6fe88b9d7add79166bf5c140fa Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Thu, 28 Sep 2023 19:32:04 +0000 Subject: [PATCH 3/6] server: change x-locality log from vevent to vinfo The X-locality log events were added in #104585 to the Node batch receive path, to alert when localities were misconfigured. In some clusters, especially test clusters, these events are unnecessarily verbose in traces. Change the log from `VEvent(5)` to `VInfo(5)` in the node batch path. Part of: #110648 Epic: none Release note: None --- pkg/server/node.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/server/node.go b/pkg/server/node.go index c1d9bd5fc0d5..85f9aabf937c 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -1385,23 +1385,23 @@ func (n *Node) getLocalityComparison( ) roachpb.LocalityComparisonType { gossip := n.storeCfg.Gossip if gossip == nil { - log.VEventf(ctx, 2, "gossip is not configured") + log.VInfof(ctx, 2, "gossip is not configured") return roachpb.LocalityComparisonType_UNDEFINED } gatewayNodeDesc, err := gossip.GetNodeDescriptor(gatewayNodeID) if err != nil { - log.VEventf(ctx, 2, + log.VInfof(ctx, 2, "failed to perform look up for node descriptor %v", err) return roachpb.LocalityComparisonType_UNDEFINED } comparisonResult, regionErr, zoneErr := n.Descriptor.Locality.CompareWithLocality(gatewayNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 5, "unable to determine if the given nodes are cross region %v", regionErr) + log.VInfof(ctx, 5, "unable to determine if the given nodes are cross region %v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 5, "unable to determine if the given nodes are cross zone %v", zoneErr) + log.VInfof(ctx, 5, "unable to determine if the given nodes are cross zone %v", zoneErr) } return comparisonResult From 3412f90a6002d7206e8ebe3a186aa1fd3b6fa5cd Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Thu, 28 Sep 2023 19:35:10 +0000 Subject: [PATCH 4/6] kvcoord: change x-locality log from vevent to vinfo The X-locality log events were added in #103963 for the dist sender, to alert when localities were misconfigured. In some clusters, especially test clusters, these events are unnecessarily verbose in traces. Change the log from `VEvent(5)` to `VInfo(5)` in the dist sender path. 
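To make the trade-off concrete, here is a minimal sketch; the helper name is
invented for illustration and is not part of this change. `log.VEventf`
records the message as an event on the active trace span even when the
verbosity level is not enabled, which is what made traces noisy, while
`log.VInfof` is gated on the verbosity level and stays quiet by default.

```go
package kvcoord

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// logLocalityMismatch is a hypothetical helper contrasting the two calls.
func logLocalityMismatch(ctx context.Context, err error) {
	// Recorded into the current trace span (if any) even when verbosity
	// level 5 is not enabled, so it shows up in collected traces.
	log.VEventf(ctx, 5, "unable to determine if the given nodes are cross region %v", err)
	// Only emitted when verbosity level 5 is enabled (e.g. via vmodule),
	// which keeps traces lean by default.
	log.VInfof(ctx, 5, "unable to determine if the given nodes are cross region %v", err)
}
```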
Resolves: #110648 Epic: none Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 0986e38d35f5..bbcc42c7804c 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -2653,10 +2653,10 @@ func (ds *DistSender) getLocalityComparison( comparisonResult, regionErr, zoneErr := gatewayNodeDesc.Locality.CompareWithLocality(destinationNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 5, "unable to determine if the given nodes are cross region %v", regionErr) + log.VInfof(ctx, 5, "unable to determine if the given nodes are cross region %v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 5, "unable to determine if the given nodes are cross zone %v", zoneErr) + log.VInfof(ctx, 5, "unable to determine if the given nodes are cross zone %v", zoneErr) } return comparisonResult } From 81ebdaf7c659663f89d0da1028f683753e1c71fa Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Fri, 29 Sep 2023 02:21:24 +0200 Subject: [PATCH 5/6] server,settingswatcher: properly evict entries from the local persisted cache (For context, on each node there is a local persisted cache of cluster setting customizations. This exists to ensure that configured values can be used even before a node has fully started up and can start reading customizations from `system.settings`.) Prior to this patch, entries were never evicted from the local persisted cache: when a cluster setting was reset, any previously saved entry in the cache would remain there. This is a very old bug, which was long hidden and was recently revealed when commit 2f5d717178a87b42fa94c20a226cc491d185455d was merged. In a nutshell, before this recent commit the code responsible to load the entries from the cache didn't fully work and so the stale entries were never restored from the cache. That commit fixed the loader code, and so the stale entries became active, which made the old bug visible. To fix the old bug, this present commit modifies the settings watcher to preserve KV deletion events, and propagates them to the persisted cache. (There is no release note because there is no user-facing release where the bug was visible.) Release note: None Co-authored-by: Steven Danna --- pkg/server/settings_cache.go | 16 ++++- pkg/server/settings_cache_test.go | 59 +++++++++++++++++++ .../settingswatcher/settings_watcher.go | 1 + .../settings_watcher_external_test.go | 9 ++- 4 files changed, 83 insertions(+), 2 deletions(-) diff --git a/pkg/server/settings_cache.go b/pkg/server/settings_cache.go index d3110d6ed6a9..2ac7bf68528b 100644 --- a/pkg/server/settings_cache.go +++ b/pkg/server/settings_cache.go @@ -99,10 +99,23 @@ var _ settingswatcher.Storage = (*settingsCacheWriter)(nil) func storeCachedSettingsKVs(ctx context.Context, eng storage.Engine, kvs []roachpb.KeyValue) error { batch := eng.NewBatch() defer batch.Close() + + // Remove previous entries -- they are now stale. + if _, _, _, err := storage.MVCCDeleteRange(ctx, batch, + keys.LocalStoreCachedSettingsKeyMin, + keys.LocalStoreCachedSettingsKeyMax, + 0 /* no limit */, hlc.Timestamp{}, storage.MVCCWriteOptions{}, false /* returnKeys */); err != nil { + return err + } + + // Now we can populate the cache with new entries. 
for _, kv := range kvs { kv.Value.Timestamp = hlc.Timestamp{} // nb: Timestamp is not part of checksum + cachedSettingsKey := keys.StoreCachedSettingsKey(kv.Key) + // A new value is added, or an existing value is updated. + log.VEventf(ctx, 1, "storing cached setting: %s -> %+v", cachedSettingsKey, kv.Value) if err := storage.MVCCPut( - ctx, batch, keys.StoreCachedSettingsKey(kv.Key), hlc.Timestamp{}, kv.Value, storage.MVCCWriteOptions{}, + ctx, batch, cachedSettingsKey, hlc.Timestamp{}, kv.Value, storage.MVCCWriteOptions{}, ); err != nil { return err } @@ -151,6 +164,7 @@ func initializeCachedSettings( " skipping settings updates.") } settingKey := settings.InternalKey(settingKeyS) + log.VEventf(ctx, 1, "loaded cached setting: %s -> %+v", settingKey, val) if err := updater.Set(ctx, settingKey, val); err != nil { log.Warningf(ctx, "setting %q to %v failed: %+v", settingKey, val, err) } diff --git a/pkg/server/settings_cache_test.go b/pkg/server/settings_cache_test.go index c1cc463ac911..04f42296092d 100644 --- a/pkg/server/settings_cache_test.go +++ b/pkg/server/settings_cache_test.go @@ -13,6 +13,7 @@ package server import ( "context" "fmt" + "strings" "testing" "time" @@ -24,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -146,3 +148,60 @@ Actual: %+v return nil }) } + +func TestCachedSettingDeletionIsPersisted(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + hasKey := func(kvs []roachpb.KeyValue, key string) bool { + for _, kv := range kvs { + if strings.Contains(string(kv.Key), key) { + return true + } + } + return false + } + + ctx := context.Background() + + ts, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant, + }) + defer ts.Stopper().Stop(ctx) + db := sqlutils.MakeSQLRunner(sqlDB) + + // Make the test faster. + st := ts.ClusterSettings() + closedts.TargetDuration.Override(ctx, &st.SV, 10*time.Millisecond) + closedts.SideTransportCloseInterval.Override(ctx, &st.SV, 10*time.Millisecond) + kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, 10*time.Millisecond) + + // Customize a setting. + db.Exec(t, `SET CLUSTER SETTING ui.display_timezone = 'America/New_York'`) + // The setting won't propagate to the store until the setting watcher caches + // up with the rangefeed, which might take a while. + testutils.SucceedsSoon(t, func() error { + store, err := ts.GetStores().(*kvserver.Stores).GetStore(1) + require.NoError(t, err) + settings, err := loadCachedSettingsKVs(context.Background(), store.TODOEngine()) + require.NoError(t, err) + if !hasKey(settings, `ui.display_timezone`) { + return errors.New("cached setting not found") + } + return nil + }) + + // Reset the setting. + db.Exec(t, `RESET CLUSTER SETTING ui.display_timezone`) + // Check that the setting is eventually deleted from the store. 
+ testutils.SucceedsSoon(t, func() error { + store, err := ts.GetStores().(*kvserver.Stores).GetStore(1) + require.NoError(t, err) + settings, err := loadCachedSettingsKVs(context.Background(), store.TODOEngine()) + require.NoError(t, err) + if hasKey(settings, `ui.display_timezone`) { + return errors.New("cached setting was still found") + } + return nil + }) +} diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index 89662a32afc1..abc3eec6c3ef 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -199,6 +199,7 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { if s.storage != nil { bufferSize = settings.MaxSettings * 3 } + c := rangefeedcache.NewWatcher( "settings-watcher", s.clock, s.f, diff --git a/pkg/server/settingswatcher/settings_watcher_external_test.go b/pkg/server/settingswatcher/settings_watcher_external_test.go index 5e92467ec235..209be02d5f4a 100644 --- a/pkg/server/settingswatcher/settings_watcher_external_test.go +++ b/pkg/server/settingswatcher/settings_watcher_external_test.go @@ -216,7 +216,14 @@ type fakeStorage struct { func (f *fakeStorage) SnapshotKVs(ctx context.Context, kvs []roachpb.KeyValue) { f.Lock() defer f.Unlock() - f.kvs = kvs + nonDeletions := make([]roachpb.KeyValue, 0, len(kvs)) + for _, kv := range kvs { + if !kv.Value.IsPresent() { + continue + } + nonDeletions = append(nonDeletions, kv) + } + f.kvs = nonDeletions f.numWrites++ } From c5af6243bde3fdf75e51f7728914e01267a935cb Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 28 Sep 2023 18:00:43 +0200 Subject: [PATCH 6/6] settingswatcher: write-through to the persisted cache Prior to this patch, the rangefeed watcher over `system.settings` was updating the in-RAM value store before it propagated the updates to the persisted local cache. In fact, the update to the persisted local cache was lagging quite a bit behind, because the rangefeed watcher would buffer updates and only flush them after a while. As a result, the following sequence was possible: 1. client updates a cluster setting. 2. server is immediately shut down. The persisted cache has not been updated yet. 3. server is restarted. For a short while (until the settings watcher has caught up), the old version of the setting remains active. This recall of ghost values of a setting was simply a bug. This patch fixes that, by ensuring that the persisted cache is written through before the in-RAM value store. By doing this, we give up on batching updates to the persisted local store. This is deemed acceptable because cluster settings are not updated frequently. 
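To illustrate the ordering this commit establishes, here is a minimal,
self-contained sketch; the type and function names are invented for the
example and are not part of the patch. The point is simply that the update
reaches the persistence callback before it becomes visible in memory, so an
abrupt shutdown between the two steps can no longer resurrect a stale value
on restart.

```go
package main

import "fmt"

// settingUpdate stands in for a decoded row from system.settings.
type settingUpdate struct{ key, val string }

// apply persists the update to the local cache first and only then
// publishes it to the in-RAM value store. If the process dies between
// the two steps, a restart re-reads the cache and still sees the new
// value; the reverse order could briefly bring back the old value.
func apply(persist, publish func(settingUpdate), u settingUpdate) {
	persist(u)
	publish(u)
}

func main() {
	apply(
		func(u settingUpdate) { fmt.Println("persisted:", u.key, "=", u.val) },
		func(u settingUpdate) { fmt.Println("published:", u.key, "=", u.val) },
		settingUpdate{key: "cluster.organization", val: "Example Org"},
	)
}
```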
Release note: None --- pkg/server/server.go | 12 ++- .../settingswatcher/settings_watcher.go | 40 +++++----- pkg/server/testing_knobs.go | 4 + pkg/settings/integration_tests/BUILD.bazel | 1 + .../integration_tests/settings_test.go | 74 +++++++++++++++++++ 5 files changed, 107 insertions(+), 24 deletions(-) diff --git a/pkg/server/server.go b/pkg/server/server.go index b53e41eefad9..f2b03d10c81d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -2143,8 +2143,16 @@ func (s *topLevelServer) PreStart(ctx context.Context) error { return err } - if err := s.node.tenantSettingsWatcher.Start(workersCtx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil { - return errors.Wrap(err, "failed to initialize the tenant settings watcher") + startSettingsWatcher := true + if serverKnobs := s.cfg.TestingKnobs.Server; serverKnobs != nil { + if serverKnobs.(*TestingKnobs).DisableSettingsWatcher { + startSettingsWatcher = false + } + } + if startSettingsWatcher { + if err := s.node.tenantSettingsWatcher.Start(workersCtx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil { + return errors.Wrap(err, "failed to initialize the tenant settings watcher") + } } if err := s.tenantCapabilitiesWatcher.Start(ctx); err != nil { return errors.Wrap(err, "initializing tenant capabilities") diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index abc3eec6c3ef..44ea86e75a32 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -211,7 +211,6 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { }, func(ctx context.Context, update rangefeedcache.Update) { noteUpdate(update) - s.maybeUpdateSnapshot(ctx, update) }, s.testingWatcherKnobs, ) @@ -243,24 +242,6 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { } } -func (s *SettingsWatcher) maybeUpdateSnapshot(ctx context.Context, update rangefeedcache.Update) { - // Only record the update to the buffer if we're writing to storage. - if s.storage == nil || - // and the update has some new information to write. - (update.Type == rangefeedcache.IncrementalUpdate && len(update.Events) == 0) { - return - } - eventKVs := rangefeedbuffer.EventsToKVs(update.Events, - rangefeedbuffer.RangeFeedValueEventToKV) - switch update.Type { - case rangefeedcache.CompleteUpdate: - s.snapshot = eventKVs - case rangefeedcache.IncrementalUpdate: - s.snapshot = rangefeedbuffer.MergeKVs(s.snapshot, eventKVs) - } - s.storage.SnapshotKVs(ctx, s.snapshot) -} - // TestingRestart restarts the rangefeeds and waits for the initial // update after the rangefeed update to be processed. func (s *SettingsWatcher) TestingRestart() { @@ -276,11 +257,13 @@ func (s *SettingsWatcher) TestingRestart() { func (s *SettingsWatcher) handleKV( ctx context.Context, kv *kvpb.RangeFeedValue, ) rangefeedbuffer.Event { - var alloc tree.DatumAlloc - settingKeyS, val, tombstone, err := s.dec.DecodeRow(roachpb.KeyValue{ + rkv := roachpb.KeyValue{ Key: kv.Key, Value: kv.Value, - }, &alloc) + } + + var alloc tree.DatumAlloc + settingKeyS, val, tombstone, err := s.dec.DecodeRow(rkv, &alloc) if err != nil { // This should never happen: the rangefeed should only ever deliver valid SQL rows. err = errors.NewAssertionErrorWithWrappedErrf(err, "failed to decode settings row %v", kv.Key) @@ -301,6 +284,19 @@ func (s *SettingsWatcher) handleKV( } } + // Ensure that the update is persisted to the local cache before we + // propagate the value to the in-RAM store. 
This ensures the latest + // value will be reloaded from the cache if the service is + // interrupted abruptly after the new value is seen by a client. + // + // Note: it is because we really want the cache to be updated before + // the in-RAM store that we do this here instead of batching the + // updates in the onUpdate rangefeed function. + if s.storage != nil { + s.snapshot = rangefeedbuffer.MergeKVs(s.snapshot, []roachpb.KeyValue{rkv}) + s.storage.SnapshotKVs(ctx, s.snapshot) + } + s.maybeSet(ctx, settingKey, settingsValue{ val: val, ts: kv.Value.Timestamp, diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go index 9eaa7980c0b8..dcd77718bf28 100644 --- a/pkg/server/testing_knobs.go +++ b/pkg/server/testing_knobs.go @@ -161,6 +161,10 @@ type TestingKnobs struct { // DialNodeCallback is used to mock dial errors when dialing a node. It is // invoked by the dialNode method of server.serverIterator. DialNodeCallback func(ctx context.Context, nodeID roachpb.NodeID) error + + // DisableSettingsWatcher disables the watcher that monitors updates + // to system.settings. + DisableSettingsWatcher bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/settings/integration_tests/BUILD.bazel b/pkg/settings/integration_tests/BUILD.bazel index 7bbc72b18446..ad3ed76d2fea 100644 --- a/pkg/settings/integration_tests/BUILD.bazel +++ b/pkg/settings/integration_tests/BUILD.bazel @@ -19,5 +19,6 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/settings/integration_tests/settings_test.go b/pkg/settings/integration_tests/settings_test.go index 8b1910849d21..44443683a1b9 100644 --- a/pkg/settings/integration_tests/settings_test.go +++ b/pkg/settings/integration_tests/settings_test.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -24,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" ) const strKey = "testing.str" @@ -286,3 +288,75 @@ func TestSettingsShowAll(t *testing.T) { t.Fatalf("show all did not find the test keys: %q", rows) } } + +func TestSettingsPersistenceEndToEnd(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + // We're going to restart the test server, but expecting storage to + // persist. Define a sticky VFS for this purpose. + stickyVFSRegistry := server.NewStickyVFSRegistry() + serverKnobs := &server.TestingKnobs{ + StickyVFSRegistry: stickyVFSRegistry, + } + serverArgs := base.TestServerArgs{ + DefaultTestTenant: base.TestIsSpecificToStorageLayerAndNeedsASystemTenant, + StoreSpecs: []base.StoreSpec{ + {InMemory: true, StickyVFSID: "1"}, + }, + Knobs: base.TestingKnobs{ + Server: serverKnobs, + }, + } + + ts, sqlDB, _ := serverutils.StartServer(t, serverArgs) + defer ts.Stopper().Stop(ctx) + db := sqlutils.MakeSQLRunner(sqlDB) + + // We need a custom value for the cluster setting that's guaranteed + // to be different from the default. So check that it's not equal to + // the default always. 
+ const differentValue = `something` + + setting, _ := settings.LookupForLocalAccessByKey("cluster.organization", true) + s := setting.(*settings.StringSetting) + st := ts.ClusterSettings() + require.NotEqual(t, s.Get(&st.SV), differentValue) + origValue := db.QueryStr(t, `SHOW CLUSTER SETTING cluster.organization`)[0][0] + + // Customize the setting. + db.Exec(t, `SET CLUSTER SETTING cluster.organization = $1`, differentValue) + newValue := db.QueryStr(t, `SHOW CLUSTER SETTING cluster.organization`)[0][0] + + // Restart the server; verify the setting customization is preserved. + // For this we disable the settings watcher, to ensure that + // only the value loaded by the local persisted cache is used. + ts.Stopper().Stop(ctx) + serverKnobs.DisableSettingsWatcher = true + ts, sqlDB, _ = serverutils.StartServer(t, serverArgs) + defer ts.Stopper().Stop(ctx) + db = sqlutils.MakeSQLRunner(sqlDB) + + db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{newValue}}) + + // Restart the server to make the setting writable again. + ts.Stopper().Stop(ctx) + serverKnobs.DisableSettingsWatcher = false + ts, sqlDB, _ = serverutils.StartServer(t, serverArgs) + defer ts.Stopper().Stop(ctx) + db = sqlutils.MakeSQLRunner(sqlDB) + + // Reset the setting, then check the original value is restored. + db.Exec(t, `RESET CLUSTER SETTING cluster.organization`) + db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{origValue}}) + + // Restart the server; verify the original value is still there. + ts.Stopper().Stop(ctx) + ts, sqlDB, _ = serverutils.StartServer(t, serverArgs) + defer ts.Stopper().Stop(ctx) + db = sqlutils.MakeSQLRunner(sqlDB) + + db.CheckQueryResults(t, `SHOW CLUSTER SETTING cluster.organization`, [][]string{{origValue}}) +}
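
As a closing illustration of the machine-type harmonization in PATCH 2, the
sketch below is a hypothetical standalone program (not part of the series)
that calls the two selectors with the same spec; the expected results in the
comments follow from the heuristics described in that commit message.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
	"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
)

func main() {
	// 8 vCPUs, Auto memory per CPU, no local SSD, amd64:
	// both clouds land on a 4GB/cpu Ice Lake shape.
	gceType, _ := spec.GCEMachineType(8, spec.Auto, vm.ArchAMD64)
	awsType, _ := spec.AWSMachineType(8, spec.Auto, false /* shouldSupportLocalSSD */, vm.ArchAMD64)
	fmt.Println(gceType, awsType) // n2-standard-8 m6i.2xlarge

	// 32 vCPUs with high memory (8GB/cpu), local SSD on AWS.
	gceType, _ = spec.GCEMachineType(32, spec.High, vm.ArchAMD64)
	awsType, _ = spec.AWSMachineType(32, spec.High, true /* shouldSupportLocalSSD */, vm.ArchAMD64)
	fmt.Println(gceType, awsType) // n2-highmem-32 r6id.8xlarge
}
```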