diff --git a/DEPS.bzl b/DEPS.bzl index 8ae23f702a4e..09e4ad4ab63d 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -1611,6 +1611,16 @@ def go_deps(): "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/stress/com_github_cockroachdb_stress-v0.0.0-20220803192808-1806698b1b7b.zip", ], ) + go_repository( + name = "com_github_cockroachdb_tokenbucket", + build_file_proto_mode = "disable_global", + importpath = "github.com/cockroachdb/tokenbucket", + sha256 = "7711efac97ce89c704e84b30c6c73887040d8ba50ce5b286e7113f4af7012339", + strip_prefix = "github.com/cockroachdb/tokenbucket@v0.0.0-20230613231145-182959a1fad6", + urls = [ + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/tokenbucket/com_github_cockroachdb_tokenbucket-v0.0.0-20230613231145-182959a1fad6.zip", + ], + ) go_repository( name = "com_github_cockroachdb_tools", build_file_proto_mode = "disable_global", diff --git a/build/bazelutil/distdir_files.bzl b/build/bazelutil/distdir_files.bzl index 57bce652909d..25991376c1ff 100644 --- a/build/bazelutil/distdir_files.bzl +++ b/build/bazelutil/distdir_files.bzl @@ -320,6 +320,7 @@ DISTDIR_FILES = { "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/stress/com_github_cockroachdb_stress-v0.0.0-20220803192808-1806698b1b7b.zip": "3fda531795c600daf25532a4f98be2a1335cd1e5e182c72789bca79f5f69fcc1", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/tablewriter/com_github_cockroachdb_tablewriter-v0.0.5-0.20200105123400-bd15540e8847.zip": "79daf1c29ec50cdd8dd1ea33f8a814963646a45a2ebe22742d652579340ebde0", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/teamcity/com_github_cockroachdb_teamcity-v0.0.0-20180905144921-8ca25c33eb11.zip": "9df6b028c9fb5bff7bdad844bda504356945fd6d3cd583c50f68d8b8e85060f6", + "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/tokenbucket/com_github_cockroachdb_tokenbucket-v0.0.0-20230613231145-182959a1fad6.zip": "7711efac97ce89c704e84b30c6c73887040d8ba50ce5b286e7113f4af7012339", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/tools/com_github_cockroachdb_tools-v0.0.0-20211112185054-642e51449b40.zip": "37a3737dd23768b4997b2f0341d625658f5862cdbf808f7fbf3a7f9fd25913a7", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/ttycolor/com_github_cockroachdb_ttycolor-v0.0.0-20210902133924-c7d7dcdde4e8.zip": "1260533510c89abd6d8af573a40f0246f6865d5091144dea509b2c48e7c61614", "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/vitess/com_github_cockroachdb_vitess-v0.0.0-20210218160543-54524729cc82.zip": "71f14e67f9396930d978d85c47b853f5cc4ce340e53cf88bf7d731b8428b2f77", diff --git a/build/teamcity/cockroach/ci/tests/lint_impl.sh b/build/teamcity/cockroach/ci/tests/lint_impl.sh index 6356b38513c9..fddec1f7a398 100755 --- a/build/teamcity/cockroach/ci/tests/lint_impl.sh +++ b/build/teamcity/cockroach/ci/tests/lint_impl.sh @@ -3,7 +3,6 @@ set -xeuo pipefail # GCAssert requirements -- start -export PATH="$(dirname $(bazel run @go_sdk//:bin/go --run_under=realpath)):$PATH" bazel run //pkg/gen:code bazel run //pkg/cmd/generate-cgo:generate-cgo --run_under="cd $(bazel info workspace) && " # GCAssert requirements -- end diff --git a/go.mod b/go.mod index 6f68520028b3..8b25b066a542 100644 --- a/go.mod +++ b/go.mod @@ -120,6 +120,7 @@ require ( github.com/cockroachdb/redact v1.1.5 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd github.com/cockroachdb/stress v0.0.0-20220803192808-1806698b1b7b + github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6 github.com/cockroachdb/tools v0.0.0-20211112185054-642e51449b40 github.com/cockroachdb/ttycolor v0.0.0-20210902133924-c7d7dcdde4e8 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd diff --git a/go.sum b/go.sum index f1ef2f6f025a..c227a1c9bb65 100644 --- a/go.sum +++ b/go.sum @@ -499,6 +499,8 @@ github.com/cockroachdb/tablewriter v0.0.5-0.20200105123400-bd15540e8847 h1:c7yLg github.com/cockroachdb/tablewriter v0.0.5-0.20200105123400-bd15540e8847/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA= github.com/cockroachdb/teamcity v0.0.0-20180905144921-8ca25c33eb11 h1:UAqRo5xPCyTtZznAJ9iPVpDUFxGI0a6QWtQ8E+zwJRg= github.com/cockroachdb/teamcity v0.0.0-20180905144921-8ca25c33eb11/go.mod h1:3299Mt0Q7PkqGqbsxhvbrTpMqRyIcZ6OMw4IEmiO09g= +github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6 h1:DJK8W/iB+s/qkTtmXSrHA49lp5O3OsR7E6z4byOLy34= +github.com/cockroachdb/tokenbucket v0.0.0-20230613231145-182959a1fad6/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/cockroachdb/tools v0.0.0-20211112185054-642e51449b40 h1:qVTb3XEv+7VVjPetop8yGbN95HfCeqZx+VMveeSJPZw= github.com/cockroachdb/tools v0.0.0-20211112185054-642e51449b40/go.mod h1:cllxeV+TYc387/XzQRnDg6YThHoDzFewovWffzAm37Q= github.com/cockroachdb/ttycolor v0.0.0-20210902133924-c7d7dcdde4e8 h1:Hli+oX84dKq44sLVCcsGKqifm5Lg9J8VoJ2P3h9iPdI= diff --git a/pkg/cmd/dev/lint.go b/pkg/cmd/dev/lint.go index 5f46a52411a4..aa6037a4c8aa 100644 --- a/pkg/cmd/dev/lint.go +++ b/pkg/cmd/dev/lint.go @@ -53,6 +53,26 @@ func (d *dev) lint(cmd *cobra.Command, commandLine []string) error { } } + lintEnv := os.Environ() + + if !short { + // First, generate code to make sure GCAssert and any other + // tests that depend on generated code still work. + if err := d.generateGo(cmd); err != nil { + return err + } + // We also need `CC` and `CXX` set appropriately. + cc, err := d.exec.LookPath("cc") + if err != nil { + return fmt.Errorf("`cc` is not installed; needed for `TestGCAssert` (%w)", err) + } + cc = strings.TrimSpace(cc) + d.log.Printf("export CC=%s", cc) + d.log.Printf("export CXX=%s", cc) + envWithCc := []string{"CC=" + cc, "CXX=" + cc} + lintEnv = append(envWithCc, lintEnv...) + } + var args []string // NOTE the --config=test here. It's very important we compile the test binary with the // appropriate stuff (gotags, etc.) @@ -80,13 +100,11 @@ func (d *dev) lint(cmd *cobra.Command, commandLine []string) error { if !strings.HasPrefix(pkg, "./") { pkg = "./" + pkg } - env := os.Environ() envvar := fmt.Sprintf("PKG=%s", pkg) d.log.Printf("export %s", envvar) - env = append(env, envvar) - return d.exec.CommandContextWithEnv(ctx, env, "bazel", args...) + lintEnv = append(lintEnv, envvar) } - err := d.exec.CommandContextInheritingStdStreams(ctx, "bazel", args...) + err := d.exec.CommandContextWithEnv(ctx, lintEnv, "bazel", args...) if err != nil { return err } diff --git a/pkg/cmd/dev/testdata/datadriven/lint b/pkg/cmd/dev/testdata/datadriven/lint index 0c7da9396adb..30f15c4575ad 100644 --- a/pkg/cmd/dev/testdata/datadriven/lint +++ b/pkg/cmd/dev/testdata/datadriven/lint @@ -1,6 +1,12 @@ exec dev lint ---- +bazel run //pkg/gen:code +bazel info workspace --color=no +bazel run //pkg/cmd/generate-cgo:generate-cgo '--run_under=cd crdb-checkout && ' +which cc +export CC= +export CXX= bazel run --config=test //build/bazelutil:lint -- -test.v bazel build //pkg/cmd/cockroach-short //pkg/cmd/dev //pkg/obsservice/cmd/obsservice //pkg/cmd/roachprod //pkg/cmd/roachtest --//build/toolchains:nogo_flag @@ -12,17 +18,36 @@ bazel run --config=test //build/bazelutil:lint -- -test.v -test.short -test.time exec dev lint pkg/cmd/dev ---- +bazel run //pkg/gen:code +bazel info workspace --color=no +bazel run //pkg/cmd/generate-cgo:generate-cgo '--run_under=cd crdb-checkout && ' +which cc +export CC= +export CXX= export PKG=./pkg/cmd/dev bazel run --config=test //build/bazelutil:lint -- -test.v +bazel build //pkg/cmd/cockroach-short //pkg/cmd/dev //pkg/obsservice/cmd/obsservice //pkg/cmd/roachprod //pkg/cmd/roachtest --//build/toolchains:nogo_flag exec dev lint -f TestLowercaseFunctionNames --cpus 4 ---- +bazel run //pkg/gen:code +bazel info workspace --color=no +bazel run //pkg/cmd/generate-cgo:generate-cgo '--run_under=cd crdb-checkout && ' +which cc +export CC= +export CXX= bazel run --config=test //build/bazelutil:lint --local_cpu_resources=4 -- -test.v -test.run Lint/TestLowercaseFunctionNames exec dev lint --cpus 4 ---- +bazel run //pkg/gen:code +bazel info workspace --color=no +bazel run //pkg/cmd/generate-cgo:generate-cgo '--run_under=cd crdb-checkout && ' +which cc +export CC= +export CXX= bazel run --config=test //build/bazelutil:lint --local_cpu_resources=4 -- -test.v bazel build //pkg/cmd/cockroach-short //pkg/cmd/dev //pkg/obsservice/cmd/obsservice //pkg/cmd/roachprod //pkg/cmd/roachtest --//build/toolchains:nogo_flag --local_cpu_resources=4 diff --git a/pkg/kv/kvserver/tenantrate/BUILD.bazel b/pkg/kv/kvserver/tenantrate/BUILD.bazel index 23921fd27552..ea684f60cc43 100644 --- a/pkg/kv/kvserver/tenantrate/BUILD.bazel +++ b/pkg/kv/kvserver/tenantrate/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_tokenbucket//:tokenbucket", ], ) diff --git a/pkg/kv/kvserver/tenantrate/limiter.go b/pkg/kv/kvserver/tenantrate/limiter.go index 247f96cc8df8..5426bbef4712 100644 --- a/pkg/kv/kvserver/tenantrate/limiter.go +++ b/pkg/kv/kvserver/tenantrate/limiter.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/tokenbucket" ) // Limiter is used to rate-limit KV requests for a given tenant. @@ -151,7 +152,7 @@ func (rl *limiter) RecordRead(ctx context.Context, respInfo tenantcostmodel.Resp amount := tb.config.ReadBatchUnits amount += float64(respInfo.ReadCount()) * tb.config.ReadRequestUnits amount += float64(respInfo.ReadBytes()) * tb.config.ReadUnitsPerByte - tb.Adjust(quotapool.Tokens(-amount)) + tb.Adjust(tokenbucket.Tokens(-amount)) // Do not notify the head of the queue. In the best case we did not disturb // the time at which it can be fulfilled and in the worst case, we made it // further in the future. @@ -174,7 +175,7 @@ func (rl *limiter) updateConfig(config Config) { rl.qp.Update(func(res quotapool.Resource) (shouldNotify bool) { tb := res.(*tokenBucket) tb.config = config - tb.UpdateConfig(quotapool.TokensPerSecond(config.Rate), quotapool.Tokens(config.Burst)) + tb.UpdateConfig(tokenbucket.TokensPerSecond(config.Rate), tokenbucket.Tokens(config.Burst)) return true }) } @@ -182,7 +183,7 @@ func (rl *limiter) updateConfig(config Config) { // tokenBucket represents the token bucket for KV Compute Units and its // associated configuration. It implements quotapool.Resource. type tokenBucket struct { - quotapool.TokenBucket + tokenbucket.TokenBucket config Config } @@ -190,8 +191,8 @@ type tokenBucket struct { var _ quotapool.Resource = (*tokenBucket)(nil) func (tb *tokenBucket) init(config Config, timeSource timeutil.TimeSource) { - tb.TokenBucket.Init( - quotapool.TokensPerSecond(config.Rate), quotapool.Tokens(config.Burst), timeSource, + tb.TokenBucket.InitWithNowFn( + tokenbucket.TokensPerSecond(config.Rate), tokenbucket.Tokens(config.Burst), timeSource.Now, ) tb.config = config } @@ -236,7 +237,7 @@ func (req *waitRequest) Acquire( // value, in case the quota pool is in debt and the read should block. needed = 0 } - return tb.TryToFulfill(quotapool.Tokens(needed)) + return tb.TryToFulfill(tokenbucket.Tokens(needed)) } // ShouldWait is part of quotapool.Request. diff --git a/pkg/testutils/lint/BUILD.bazel b/pkg/testutils/lint/BUILD.bazel index a8df5787b81a..a9c222cd7935 100644 --- a/pkg/testutils/lint/BUILD.bazel +++ b/pkg/testutils/lint/BUILD.bazel @@ -21,7 +21,7 @@ go_test( "nightly_lint_test.go", ], args = ["-test.timeout=295s"], - data = glob(["testdata/**"]), + data = glob(["testdata/**"]) + ["@go_sdk//:files"], embed = [":lint"], embedsrcs = ["gcassert_paths.txt"], gotags = ["lint"], diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 00d4e9021ac0..02c25c977dbc 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -44,6 +44,12 @@ const cockroachDB = "github.com/cockroachdb/cockroach" //go:embed gcassert_paths.txt var rawGcassertPaths string +func init() { + if bazel.BuiltWithBazel() { + bazel.SetGoEnv() + } +} + func dirCmd( dir string, name string, args ...string, ) (*exec.Cmd, *bytes.Buffer, stream.Filter, error) { diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel index 0b2a1ab6f69a..2639bac4da5c 100644 --- a/pkg/util/admission/BUILD.bazel +++ b/pkg/util/admission/BUILD.bazel @@ -36,13 +36,13 @@ go_library( "//pkg/util/humanizeutil", "//pkg/util/log", "//pkg/util/metric", - "//pkg/util/quotapool", "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_redact//:redact", + "@com_github_cockroachdb_tokenbucket//:tokenbucket", ], ) @@ -76,13 +76,13 @@ go_test( "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/metric", - "//pkg/util/quotapool", "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_redact//:redact", + "@com_github_cockroachdb_tokenbucket//:tokenbucket", "@com_github_guptarohit_asciigraph//:asciigraph", "@com_github_stretchr_testify//require", ], diff --git a/pkg/util/admission/elastic_cpu_granter.go b/pkg/util/admission/elastic_cpu_granter.go index f7eb1be88967..02a16af28313 100644 --- a/pkg/util/admission/elastic_cpu_granter.go +++ b/pkg/util/admission/elastic_cpu_granter.go @@ -20,9 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" - "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/tokenbucket" ) // We don't want the ability for an admitted a request to be able to run @@ -108,7 +108,7 @@ type elasticCPUGranter struct { // GrantCoordinator, which is ordered before the one in WorkQueue, since // requester.granted() is called while holding GrantCoordinator.mu. syncutil.Mutex - tb *quotapool.TokenBucket + tb *tokenbucket.TokenBucket tbLastReset time.Time tbReset *util.EveryN utilizationLimit float64 @@ -122,8 +122,8 @@ var _ granter = &elasticCPUGranter{} func newElasticCPUGranter( ambientCtx log.AmbientContext, st *cluster.Settings, metrics *elasticCPUGranterMetrics, ) *elasticCPUGranter { - tokenBucket := "apool.TokenBucket{} - tokenBucket.Init(0, 0, timeutil.DefaultTimeSource{}) + tokenBucket := &tokenbucket.TokenBucket{} + tokenBucket.InitWithNowFn(0, 0, timeutil.Now) return newElasticCPUGranterWithTokenBucket(ambientCtx, st, metrics, tokenBucket) } @@ -131,7 +131,7 @@ func newElasticCPUGranterWithTokenBucket( ambientCtx log.AmbientContext, st *cluster.Settings, metrics *elasticCPUGranterMetrics, - tokenBucket *quotapool.TokenBucket, + tokenBucket *tokenbucket.TokenBucket, ) *elasticCPUGranter { e := &elasticCPUGranter{ ctx: ambientCtx.AnnotateCtx(context.Background()), @@ -156,7 +156,7 @@ func (e *elasticCPUGranter) tryGet(count int64) (granted bool) { e.mu.Lock() defer e.mu.Unlock() - granted, _ = e.mu.tb.TryToFulfill(quotapool.Tokens(count)) + granted, _ = e.mu.tb.TryToFulfill(tokenbucket.Tokens(count)) return granted } @@ -170,7 +170,7 @@ func (e *elasticCPUGranter) returnGrantWithoutGrantingElsewhere(count int64) { e.mu.Lock() defer e.mu.Unlock() - e.mu.tb.Adjust(quotapool.Tokens(count)) + e.mu.tb.Adjust(tokenbucket.Tokens(count)) } // tookWithoutPermission implements granter. @@ -178,7 +178,7 @@ func (e *elasticCPUGranter) tookWithoutPermission(count int64) { e.mu.Lock() defer e.mu.Unlock() - e.mu.tb.Adjust(quotapool.Tokens(-count)) + e.mu.tb.Adjust(tokenbucket.Tokens(-count)) } // continueGrantChain implements granter. @@ -212,7 +212,7 @@ func (e *elasticCPUGranter) setUtilizationLimit(utilizationLimit float64) { // rate := utilizationLimit * float64(int64(runtime.GOMAXPROCS(0))*time.Second.Nanoseconds()) e.mu.utilizationLimit = utilizationLimit - e.mu.tb.UpdateConfig(quotapool.TokensPerSecond(rate), quotapool.Tokens(rate)) + e.mu.tb.UpdateConfig(tokenbucket.TokensPerSecond(rate), tokenbucket.Tokens(rate)) if now := timeutil.Now(); now.Sub(e.mu.tbLastReset) > 15*time.Second { // TODO(irfansharif): make this is a cluster setting? // Periodically reset the token bucket. This is just defense-in-depth // and at worst, over-admits. We've seen production clusters where the diff --git a/pkg/util/admission/elastic_cpu_granter_test.go b/pkg/util/admission/elastic_cpu_granter_test.go index 1099cc3cf758..8f8f9b9ddfc9 100644 --- a/pkg/util/admission/elastic_cpu_granter_test.go +++ b/pkg/util/admission/elastic_cpu_granter_test.go @@ -21,9 +21,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/tokenbucket" "github.com/stretchr/testify/require" ) @@ -72,7 +72,7 @@ func TestElasticCPUGranter(t *testing.T) { elasticCPUGranterMetrics *elasticCPUGranterMetrics elasticCPUGranter *elasticCPUGranter mt *timeutil.ManualTime - tokenBucket *quotapool.TokenBucket + tokenBucket *tokenbucket.TokenBucket ) printTokenBucket := func() string { @@ -128,8 +128,8 @@ func TestElasticCPUGranter(t *testing.T) { elasticCPURequester = &testElasticCPURequester{} elasticCPUGranterMetrics = makeElasticCPUGranterMetrics() mt = timeutil.NewManualTime(t0) - tokenBucket = "apool.TokenBucket{} - tokenBucket.Init(0, 0, mt) + tokenBucket = &tokenbucket.TokenBucket{} + tokenBucket.InitWithNowFn(0, 0, mt.Now) elasticCPUGranter = newElasticCPUGranterWithTokenBucket( log.MakeTestingAmbientCtxWithNewTracer(), cluster.MakeTestingClusterSettings(), diff --git a/pkg/util/quotapool/BUILD.bazel b/pkg/util/quotapool/BUILD.bazel index bf3cddcb2347..7162a93a8c06 100644 --- a/pkg/util/quotapool/BUILD.bazel +++ b/pkg/util/quotapool/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "intpool.go", "notify_queue.go", "quotapool.go", - "token_bucket.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/util/quotapool", visibility = ["//visibility:public"], @@ -19,6 +18,7 @@ go_library( "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", + "@com_github_cockroachdb_tokenbucket//:tokenbucket", ], ) @@ -32,7 +32,6 @@ go_test( "intpool_test.go", "node_size_test.go", "notify_queue_test.go", - "token_bucket_test.go", ], args = ["-test.timeout=55s"], embed = [":quotapool"], diff --git a/pkg/util/quotapool/int_rate.go b/pkg/util/quotapool/int_rate.go index 0df950193b06..8cf4ab99e236 100644 --- a/pkg/util/quotapool/int_rate.go +++ b/pkg/util/quotapool/int_rate.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/tokenbucket" ) // Limit defines a rate in terms of quota per second. @@ -43,9 +44,9 @@ type RateLimiter struct { // If rate == Inf() then any bursts are allowed, and acquisition does not block. func NewRateLimiter(name string, rate Limit, burst int64, options ...Option) *RateLimiter { rl := &RateLimiter{} - tb := &TokenBucket{} + tb := &tokenbucket.TokenBucket{} rl.qp = New(name, tb, options...) - tb.Init(TokensPerSecond(rate), Tokens(burst), rl.qp.timeSource) + tb.InitWithNowFn(tokenbucket.TokensPerSecond(rate), tokenbucket.Tokens(burst), rl.qp.timeSource.Now) rl.isInf.Set(math.IsInf(float64(rate), 1)) return rl } @@ -99,8 +100,8 @@ func (rl *RateLimiter) AdmitN(n int64) bool { func (rl *RateLimiter) UpdateLimit(rate Limit, burst int64) { rl.qp.Update(func(res Resource) (shouldNotify bool) { rl.isInf.Set(math.IsInf(float64(rate), 1)) - tb := res.(*TokenBucket) - tb.UpdateConfig(TokensPerSecond(rate), Tokens(burst)) + tb := res.(*tokenbucket.TokenBucket) + tb.UpdateConfig(tokenbucket.TokensPerSecond(rate), tokenbucket.Tokens(burst)) return true }) } @@ -116,8 +117,8 @@ type RateAlloc struct { // methods on the RateAlloc after this call. func (ra *RateAlloc) Return() { ra.rl.qp.Update(func(res Resource) (shouldNotify bool) { - tb := res.(*TokenBucket) - tb.Adjust(Tokens(ra.alloc)) + tb := res.(*tokenbucket.TokenBucket) + tb.Adjust(tokenbucket.Tokens(ra.alloc)) return true }) ra.rl.putRateAlloc((*rateAlloc)(ra)) @@ -156,8 +157,8 @@ func (rl *RateLimiter) putRateRequest(r *rateRequest) { func (i *rateRequest) Acquire( ctx context.Context, res Resource, ) (fulfilled bool, tryAgainAfter time.Duration) { - tb := res.(*TokenBucket) - return tb.TryToFulfill(Tokens(i.want)) + tb := res.(*tokenbucket.TokenBucket) + return tb.TryToFulfill(tokenbucket.Tokens(i.want)) } func (i *rateRequest) ShouldWait() bool { diff --git a/pkg/util/quotapool/token_bucket.go b/pkg/util/quotapool/token_bucket.go deleted file mode 100644 index dbd8a0a346dc..000000000000 --- a/pkg/util/quotapool/token_bucket.go +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package quotapool - -import ( - "time" - - "github.com/cockroachdb/cockroach/pkg/util/timeutil" -) - -// Tokens are abstract units (usually units of work). -type Tokens float64 - -// TokensPerSecond is the rate of token replenishment. -type TokensPerSecond float64 - -// TokenBucket implements the basic accounting for a token bucket. -// -// A token bucket has a rate of replenishment and a burst limit. Tokens are -// replenished over time, up to the burst limit. -// -// The token bucket keeps track of the current amount and updates it as time -// passes. The bucket can go into debt (i.e. negative current amount). -type TokenBucket struct { - rate TokensPerSecond - burst Tokens - timeSource timeutil.TimeSource - - current Tokens - lastUpdated time.Time - - exhaustedStart time.Time - exhaustedMicros int64 -} - -// Init the token bucket. -func (tb *TokenBucket) Init(rate TokensPerSecond, burst Tokens, timeSource timeutil.TimeSource) { - *tb = TokenBucket{ - rate: rate, - burst: burst, - timeSource: timeSource, - current: burst, - lastUpdated: timeSource.Now(), - } -} - -// Update moves the time forward, accounting for the replenishment since the -// last update. -func (tb *TokenBucket) Update() { - now := tb.timeSource.Now() - if since := now.Sub(tb.lastUpdated); since > 0 { - tb.current += Tokens(float64(tb.rate) * since.Seconds()) - - if tb.current > tb.burst { - tb.current = tb.burst - } - tb.lastUpdated = now - tb.updateExhaustedMicros() - } -} - -// UpdateConfig updates the rate and burst limits. The change in burst will be -// applied to the current token quantity. For example, if the RateLimiter -// currently had 5 available tokens and the burst is updated from 10 to 20, the -// amount will increase to 15. Similarly, if the burst is decreased by 10, the -// current quota will decrease accordingly, potentially putting the limiter into -// debt. -func (tb *TokenBucket) UpdateConfig(rate TokensPerSecond, burst Tokens) { - tb.Update() - - burstDelta := burst - tb.burst - tb.rate = rate - tb.burst = burst - - tb.current += burstDelta - tb.updateExhaustedMicros() -} - -// Reset resets the current tokens to whatever the burst is. -func (tb *TokenBucket) Reset() { - tb.current = tb.burst - tb.updateExhaustedMicros() -} - -// Adjust returns tokens to the bucket (positive delta) or accounts for a debt -// of tokens (negative delta). -func (tb *TokenBucket) Adjust(delta Tokens) { - tb.Update() - tb.current += delta - if tb.current > tb.burst { - tb.current = tb.burst - } - tb.updateExhaustedMicros() -} - -// TryToFulfill either removes the given amount if is available, or returns a -// time after which the request should be retried. -func (tb *TokenBucket) TryToFulfill(amount Tokens) (fulfilled bool, tryAgainAfter time.Duration) { - tb.Update() - - // Deal with the case where the request is larger than the burst size. In - // this case we'll allow the acquisition to complete if and when the current - // value is equal to the burst. If the acquisition succeeds, it will put the - // limiter into debt. - want := amount - if want > tb.burst { - want = tb.burst - } - if delta := want - tb.current; delta > 0 { - // Compute the time it will take to get to the needed capacity. - timeDelta := time.Duration((float64(delta) * float64(time.Second)) / float64(tb.rate)) - if timeDelta < time.Nanosecond { - timeDelta = time.Nanosecond - } - return false, timeDelta - } - - tb.current -= amount - tb.updateExhaustedMicros() - return true, 0 -} - -// Exhausted returns the cumulative duration over which this token bucket was -// exhausted. Exported only for metrics. -func (tb *TokenBucket) Exhausted() time.Duration { - var ongoingExhaustionMicros int64 - if !tb.exhaustedStart.IsZero() { - ongoingExhaustionMicros = tb.timeSource.Now().Sub(tb.exhaustedStart).Microseconds() - } - return time.Duration(tb.exhaustedMicros+ongoingExhaustionMicros) * time.Microsecond -} - -// Available returns the currently available tokens (can be -ve). Exported only -// for metrics. -func (tb *TokenBucket) Available() Tokens { - return tb.current -} - -func (tb *TokenBucket) updateExhaustedMicros() { - now := tb.timeSource.Now() - if tb.current <= 0 && tb.exhaustedStart.IsZero() { - tb.exhaustedStart = now - } - if tb.current > 0 && !tb.exhaustedStart.IsZero() { - tb.exhaustedMicros += now.Sub(tb.exhaustedStart).Microseconds() - tb.exhaustedStart = time.Time{} - } -} - -// TestingInternalParameters returns the refill rate (configured), burst tokens -// (configured), and number of available tokens where available <= burst. It's -// used in tests. -func (tb *TokenBucket) TestingInternalParameters() (rate TokensPerSecond, burst, available Tokens) { - tb.Update() - return tb.rate, tb.burst, tb.current -} diff --git a/pkg/util/quotapool/token_bucket_test.go b/pkg/util/quotapool/token_bucket_test.go deleted file mode 100644 index 0e02a156a693..000000000000 --- a/pkg/util/quotapool/token_bucket_test.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2021 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package quotapool - -import ( - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/util/timeutil" -) - -func TestTokenBucket(t *testing.T) { - t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) - mt := timeutil.NewManualTime(t0) - - var tb TokenBucket - tb.Init(10, 20, mt) - - check := func(expected Tokens) { - t.Helper() - const eps = 1e-10 - tb.Update() - if delta := tb.Available() - expected; delta > eps || delta < -eps { - t.Fatalf("expected current amount %v, got %v", expected, tb.current) - } - } - - checkFulfill := func(amount Tokens, expected time.Duration) { - t.Helper() - ok, tryAgainAfter := tb.TryToFulfill(amount) - if ok { - if expected != 0 { - t.Fatalf("expected not to be fulfilled") - } - } else { - if expected == 0 { - t.Fatalf("expected to be fulfilled") - } else if tryAgainAfter.Round(time.Microsecond) != expected.Round(time.Microsecond) { - t.Fatalf("expected tryAgainAfter %v, got %v", expected, tryAgainAfter) - } - } - } - - checkExhausted := func(expDur time.Duration) { - t.Helper() - if got := tb.Exhausted(); got != expDur { - t.Fatalf("expected exhausted duration %s, got %s", expDur, got) - } - } - - check(20) - tb.Adjust(-10) - check(10) - tb.Adjust(5) - check(15) - tb.Adjust(20) - check(20) - - mt.Advance(time.Second) - check(20) - tb.Adjust(-15) - check(5) - - mt.Advance(time.Second) - check(15) - mt.Advance(time.Second) - check(20) - - checkFulfill(15, 0) - checkFulfill(15, time.Second) - - mt.Advance(10 * time.Second) - // Now put the bucket into debt with a huge ask. - checkFulfill(120, 0) - checkFulfill(10, 11*time.Second) - - mt.Advance(100 * time.Second) - - // A full bucket should remain full. - tb.UpdateConfig(100, 1000) - checkFulfill(1000, 0) - checkFulfill(100, 1*time.Second) - - tb.UpdateConfig(10, 20) - check(-980) - checkFulfill(20, 100*time.Second) - - // Verify that resetting the bucket resets it to the burst size. - tb.Reset() - check(20) - - tb.UpdateConfig(100, 100) - tb.Reset() - check(100) - - // Ensure that the exhaustion metric behaves as expected. - initialExhausted := tb.Exhausted() - // Put the token bucket into debt. - tb.Adjust(-110) - check(-10) - // Advance the clock by 20ms, but it should still be in debt. - mt.Advance(20 * time.Millisecond) - check(-8) - // Verify that we've accumulated this 20ms into our exhaustion value. - checkExhausted(initialExhausted + 20*time.Millisecond) - // Advance the clock by just enough to no longer be exhausted. - mt.Advance(90 * time.Millisecond) - check(1) - // Verify that we've accumulated the 90ms into our exhaustion metric. - checkExhausted(initialExhausted + (20+90)*time.Millisecond) - // Add more tokens by advancing the clock. - mt.Advance(200 * time.Millisecond) - check(21) - // Check that our exhaustion duration is unchanged, since we've stayed in - // the positive. - checkExhausted(initialExhausted + (20+90)*time.Millisecond) -}