From 1a63a470d726b43a5bea4b95ac6262f15281b34d Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sun, 17 Dec 2023 13:31:18 +0000 Subject: [PATCH 1/4] server: bump up concurrent download calls These still only get executed at the compaction concurrency limit but having more of them in the queue will allow us to utilize higher compaction concurrency if it is configured. Release note: none. Epic: none. --- pkg/server/span_download.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/server/span_download.go b/pkg/server/span_download.go index 305d3f4ca176..695f9a9ef328 100644 --- a/pkg/server/span_download.go +++ b/pkg/server/span_download.go @@ -109,7 +109,9 @@ func (s *systemStatusServer) localDownloadSpan( } return nil } - for i := 0; i < 4; i++ { + + const downloadWaiters = 8 + for i := 0; i < downloadWaiters; i++ { grp.GoCtx(downloader) } return grp.Wait() From 6ff1b8bb3edd6104e3e0379c1809c3cdfb688acd Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sun, 17 Dec 2023 15:53:23 +0000 Subject: [PATCH 2/4] storage: add API to increase/decrease compaction concurrency This adds a new method to the Engine API to increase or decrease the compaction concurrency by some amount. This is added along-side the existing API for setting the compaction concurrency to a specific amount, as it allows callers to increase or decrease concurrency without needing to know the current value. Release note: none. Epic: none. --- pkg/storage/engine.go | 4 ++++ pkg/storage/pebble.go | 16 ++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index b25544e1ab42..514f29565061 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -1086,6 +1086,10 @@ type Engine interface { // concurrency. It returns the previous compaction concurrency. SetCompactionConcurrency(n uint64) uint64 + // AdjustCompactionConcurrency is used to adjust the engine's compaction + // concurrency. 
+ AdjustCompactionConcurrency(delta int64) (uint64, error) + // SetStoreID informs the engine of the store ID, once it is known. // Used to show the store ID in logs and to initialize the shared object // creator ID (if shared object storage is configured). diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 87ebbe937e14..e27a3ba0ff4b 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -930,6 +930,22 @@ func (p *Pebble) SetCompactionConcurrency(n uint64) uint64 { return prevConcurrency } +// AdjustCompactionConcurrency adjusts the compaction concurrency up or down by +// the passed delta, down to a minimum of 1. Attempts to reduce it below 1 will +// return an error. +func (p *Pebble) AdjustCompactionConcurrency(delta int64) (uint64, error) { + for { + current := atomic.LoadUint64(&p.atomic.compactionConcurrency) + adjusted := int64(current) + delta + if adjusted < 1 { + return 0, fmt.Errorf("compaction concurrency cannot be less than 1") + } + if atomic.CompareAndSwapUint64(&p.atomic.compactionConcurrency, current, uint64(adjusted)) { + return uint64(adjusted), nil + } + } +} + // SetStoreID adds the store id to pebble logs. func (p *Pebble) SetStoreID(ctx context.Context, storeID int32) error { if p == nil { From c623233d2f94ed4f835c43492b195c3a06173f9d Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Mon, 18 Dec 2023 11:00:30 -0500 Subject: [PATCH 3/4] go.mod: bump Pebble to 48b54c29d8fe https://github.com/cockroachdb/pebble/commit/48b54c29 sstable: fix incorrect range key mask in virtualLast() Informs #116330. Release note: None. 
Epic: none --- DEPS.bzl | 6 +++--- build/bazelutil/distdir_files.bzl | 2 +- go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index eef7a08a3fd8..309a40557c51 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -1613,10 +1613,10 @@ def go_deps(): patches = [ "@com_github_cockroachdb_cockroach//build/patches:com_github_cockroachdb_pebble.patch", ], - sha256 = "c63d15d21281f795e2cd016da02fdfa7a64a3365909fabd8d5581636af4fe7e0", - strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20231214172447-ab4952c5f87b", + sha256 = "1878bb40f322c5c93bb7db26b6287219eb56507fc59b82292fcd4d2187639a16", + strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20231218155426-48b54c29d8fe", urls = [ - "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20231214172447-ab4952c5f87b.zip", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20231218155426-48b54c29d8fe.zip", ], ) go_repository( diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 42cd9fdcbc95..820feba1f4f7 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -322,7 +322,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/gostdlib/com_github_cockroachdb_gostdlib-v1.19.0.zip": "c4d516bcfe8c07b6fc09b8a9a07a95065b36c2855627cb3514e40c98f872b69e", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/logtags/com_github_cockroachdb_logtags-v0.0.0-20230118201751-21c54148d20b.zip": "ca7776f47e5fecb4c495490a679036bfc29d95bd7625290cfdb9abb0baf97476", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/metamorphic/com_github_cockroachdb_metamorphic-v0.0.0-20231108215700-4ba948b56895.zip": "28c8cf42192951b69378cf537be5a9a43f2aeb35542908cc4fe5f689505853ea", - 
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20231214172447-ab4952c5f87b.zip": "c63d15d21281f795e2cd016da02fdfa7a64a3365909fabd8d5581636af4fe7e0", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20231218155426-48b54c29d8fe.zip": "1878bb40f322c5c93bb7db26b6287219eb56507fc59b82292fcd4d2187639a16", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/redact/com_github_cockroachdb_redact-v1.1.5.zip": "11b30528eb0dafc8bc1a5ba39d81277c257cbe6946a7564402f588357c164560", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/returncheck/com_github_cockroachdb_returncheck-v0.0.0-20200612231554-92cdbca611dd.zip": "ce92ba4352deec995b1f2eecf16eba7f5d51f5aa245a1c362dfe24c83d31f82b", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/stress/com_github_cockroachdb_stress-v0.0.0-20220803192808-1806698b1b7b.zip": "3fda531795c600daf25532a4f98be2a1335cd1e5e182c72789bca79f5f69fcc1", diff --git a/go.mod b/go.mod index f4f23320950a..e0f8c7112d3d 100644 --- a/go.mod +++ b/go.mod @@ -113,7 +113,7 @@ require ( github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 github.com/cockroachdb/gostdlib v1.19.0 github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b - github.com/cockroachdb/pebble v0.0.0-20231214172447-ab4952c5f87b + github.com/cockroachdb/pebble v0.0.0-20231218155426-48b54c29d8fe github.com/cockroachdb/redact v1.1.5 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b diff --git a/go.sum b/go.sum index 6f3f06c4ccc1..2dfa81f22747 100644 --- a/go.sum +++ b/go.sum @@ -493,8 +493,8 @@ github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZe github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod 
h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA= github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA= -github.com/cockroachdb/pebble v0.0.0-20231214172447-ab4952c5f87b h1:r3BhKev3k3GhdEGl9PMSO1MBZu5P1D+JXAcXI4O21UY= -github.com/cockroachdb/pebble v0.0.0-20231214172447-ab4952c5f87b/go.mod h1:BHuaMa/lK7fUe75BlsteiiTu8ptIG+qSAuDtGMArP18= +github.com/cockroachdb/pebble v0.0.0-20231218155426-48b54c29d8fe h1:ZBhPcgWjnfy2PFWlvPlcOXAfAQqOIdpfksijpKiMWcc= +github.com/cockroachdb/pebble v0.0.0-20231218155426-48b54c29d8fe/go.mod h1:BHuaMa/lK7fUe75BlsteiiTu8ptIG+qSAuDtGMArP18= github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= From acff1f2e90660689e225ad378bd05850b131a366 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Sun, 17 Dec 2023 16:00:07 +0000 Subject: [PATCH 4/4] server: dynamically increase compaction concurrency during downloads Early in an online restore, a workload may not be able to utilize all CPU due to the high latency of accessing still-remote data meaning that many or most queries spend most of their time waiting rather than executing. During this phase, increasing the compaction concurrency can expedite getting data downloaded, to reduce that latency and improve the workload performance. However later in the download phase, when more of the most accessed data has been downloaded, the workload itself may be able to execute fast enough that it requires the majority of available CPU. At this point, excessive CPU usage by download compactions will actually negatively impact the workload performance. 
Thus it is desirable to maximize compaction concurrency when CPU is available but reduce it when it becomes scarce. This change introduces an additional goroutine to the download call that monitors CPU usage and adjusts compaction concurrency up and down based on the CPU usage being below 65% or above 80% respectively. Release note: none. Epic: none. --- pkg/server/span_download.go | 65 ++++++++++++++++++++++++++++++++++++- pkg/storage/engine.go | 6 ++-- pkg/storage/pebble.go | 9 +++-- 3 files changed, 71 insertions(+), 9 deletions(-) diff --git a/pkg/server/span_download.go b/pkg/server/span_download.go index 695f9a9ef328..57502b9f3c14 100644 --- a/pkg/server/span_download.go +++ b/pkg/server/span_download.go @@ -12,6 +12,7 @@ package server import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -19,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/srverrors" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -101,19 +103,80 @@ func (s *systemStatusServer) localDownloadSpan( return nil }) + const downloadWaiters = 16 + downloadersDone := make(chan struct{}, downloadWaiters) + downloader := func(ctx context.Context) error { for sp := range spanCh { if err := store.TODOEngine().Download(ctx, sp); err != nil { return err } } + downloadersDone <- struct{}{} return nil } - const downloadWaiters = 8 for i := 0; i < downloadWaiters; i++ { grp.GoCtx(downloader) } + + grp.GoCtx(func(ctx context.Context) error { + var added int64 + // Remove any additional concurrency we've added when we exit. + // + // TODO(dt,radu): Ideally we'd adjust a separate limit that applies only + // to download compactions, so that this does not fight with manual calls + // to SetConcurrentCompactions. 
+ defer func() { + if added != 0 { + adjusted := store.TODOEngine().AdjustCompactionConcurrency(-added) + log.Infof(ctx, "downloads complete; reset compaction concurrency to %d", adjusted) + } + }() + + const maxAddedConcurrency, lowCPU, highCPU, initialIncrease = 16, 0.65, 0.8, 8 + + // Begin by bumping up the concurrency by 8, then start watching the CPU + // usage and adjusting up or down based on CPU until downloading finishes. + store.TODOEngine().AdjustCompactionConcurrency(initialIncrease) + added += initialIncrease + + t := time.NewTicker(time.Second * 15) + defer t.Stop() + ctxDone := ctx.Done() + + var waitersExited int + for { + select { + case <-ctxDone: + return ctx.Err() + case <-downloadersDone: + waitersExited++ + // Return and stop managing added concurrency if the workers are done. + if waitersExited >= downloadWaiters { + return nil + } + case <-t.C: + cpu := s.sqlServer.cfg.RuntimeStatSampler.GetCPUCombinedPercentNorm() + if cpu > highCPU && added > 0 { + // If CPU is high and we have added any additional concurrency, we + // should reduce our added concurrency to make sure CPU is available + // for the execution of foreground traffic. + adjusted := store.TODOEngine().AdjustCompactionConcurrency(-1) + added-- + log.Infof(ctx, "decreasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% > %.0f%%", added, adjusted, cpu*100, highCPU*100) + } else if cpu < lowCPU { + // If CPU is low, we should use it to do additional downloading. 
+ if added < maxAddedConcurrency { + adjusted := store.TODOEngine().AdjustCompactionConcurrency(1) + added++ + log.Infof(ctx, "increasing additional compaction concurrency to %d (%d total) due cpu usage %.0f%% < %.0f%%", added, adjusted, cpu*100, lowCPU*100) + } + } + } + } + }) + return grp.Wait() }) } diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 514f29565061..00b3f43f9f48 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -1086,9 +1086,9 @@ type Engine interface { // concurrency. It returns the previous compaction concurrency. SetCompactionConcurrency(n uint64) uint64 - // AdjustCompactionConcurrency is used to adjust the engine's compaction - // concurrency. - AdjustCompactionConcurrency(delta int64) (uint64, error) + // AdjustCompactionConcurrency adjusts the compaction concurrency up or down by + // the passed delta, down to a minimum of 1. + AdjustCompactionConcurrency(delta int64) uint64 // SetStoreID informs the engine of the store ID, once it is known. // Used to show the store ID in logs and to initialize the shared object diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index e27a3ba0ff4b..ef98d3d26cd6 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -931,17 +931,16 @@ func (p *Pebble) SetCompactionConcurrency(n uint64) uint64 { } // AdjustCompactionConcurrency adjusts the compaction concurrency up or down by -// the passed delta, down to a minimum of 1. Attempts to reduce it below 1 will -// return an error. -func (p *Pebble) AdjustCompactionConcurrency(delta int64) (uint64, error) { +// the passed delta, down to a minimum of 1. 
+func (p *Pebble) AdjustCompactionConcurrency(delta int64) uint64 { for { current := atomic.LoadUint64(&p.atomic.compactionConcurrency) adjusted := int64(current) + delta if adjusted < 1 { - return 0, fmt.Errorf("compaction concurrency cannot be less than 1") + adjusted = 1 } if atomic.CompareAndSwapUint64(&p.atomic.compactionConcurrency, current, uint64(adjusted)) { - return uint64(adjusted), nil + return uint64(adjusted) } } }