From 69b61528ec034116d45b18d4dde2c2e3902bf572 Mon Sep 17 00:00:00 2001 From: Oleg Pudeyev Date: Tue, 27 Aug 2024 09:51:15 -0400 Subject: [PATCH] Move tracing rate limiters to Core Dynamic instrumentation will use the TokenBucket rate limiter. To facilitate this, the basic rate limiters are being moved from tracing to core. --- lib/datadog/core/rate_limiter.rb | 183 +++++++++++++++++ lib/datadog/tracing/sampling/rate_limiter.rb | 185 ------------------ lib/datadog/tracing/sampling/span/ext.rb | 2 +- lib/datadog/tracing/sampling/span/rule.rb | 2 +- sig/datadog/core/rate_limiter.rbs | 35 ++++ sig/datadog/tracing/sampling/rate_limiter.rbs | 37 ---- .../sampling => core}/rate_limiter_spec.rb | 4 +- .../tracing/sampling/rule_sampler_spec.rb | 10 +- 8 files changed, 227 insertions(+), 231 deletions(-) create mode 100644 lib/datadog/core/rate_limiter.rb delete mode 100644 lib/datadog/tracing/sampling/rate_limiter.rb create mode 100644 sig/datadog/core/rate_limiter.rbs delete mode 100644 sig/datadog/tracing/sampling/rate_limiter.rbs rename spec/datadog/{tracing/sampling => core}/rate_limiter_spec.rb (97%) diff --git a/lib/datadog/core/rate_limiter.rb b/lib/datadog/core/rate_limiter.rb new file mode 100644 index 00000000000..0b749fcee2b --- /dev/null +++ b/lib/datadog/core/rate_limiter.rb @@ -0,0 +1,183 @@ +# frozen_string_literal: true + +require_relative 'utils/time' + +module Datadog + module Core + # Checks for rate limiting on a resource. + class RateLimiter + # Checks if resource of specified size can be + # conforms with the current limit. + # + # Implementations of this method are not guaranteed + # to be side-effect free. + # + # @return [Boolean] whether a resource conforms with the current limit + def allow?(size); end + + # The effective rate limiting ratio based on + # recent calls to `allow?`. + # + # @return [Float] recent allowance ratio + def effective_rate; end + end + + # Implementation of the Token Bucket metering algorithm + # for rate limiting. + # + # @see https://en.wikipedia.org/wiki/Token_bucket Token bucket + class TokenBucket < RateLimiter + attr_reader :rate, :max_tokens + + # @param rate [Numeric] Allowance rate, in units per second + # if rate is negative, always allow + # if rate is zero, never allow + # @param max_tokens [Numeric] Limit of available tokens + def initialize(rate, max_tokens = rate) + super() + + raise ArgumentError, "rate must be a number: #{rate}" unless rate.is_a?(Numeric) + raise ArgumentError, "max_tokens must be a number: #{max_tokens}" unless max_tokens.is_a?(Numeric) + + @rate = rate + @max_tokens = max_tokens + + @tokens = max_tokens + @total_messages = 0 + @conforming_messages = 0 + @prev_conforming_messages = nil + @prev_total_messages = nil + @current_window = nil + + @last_refill = Core::Utils::Time.get_time + end + + # Checks if a message of provided +size+ + # conforms with the current bucket limit. + # + # If it does, return +true+ and remove +size+ + # tokens from the bucket. + # If it does not, return +false+ without affecting + # the tokens from the bucket. + # + # @return [Boolean] +true+ if message conforms with current bucket limit + def allow?(size) + allowed = should_allow?(size) + update_rate_counts(allowed) + allowed + end + + # Ratio of 'conformance' per 'total messages' checked + # averaged for the past 2 buckets + # + # Returns +1.0+ when no messages have been checked yet. + # + # @return [Float] Conformance ratio, between +[0,1]+ + def effective_rate + return 0.0 if @rate.zero? + return 1.0 if @rate < 0 || @total_messages.zero? + + return current_window_rate if @prev_conforming_messages.nil? || @prev_total_messages.nil? + + (@conforming_messages.to_f + @prev_conforming_messages.to_f) / (@total_messages + @prev_total_messages) + end + + # Ratio of 'conformance' per 'total messages' checked + # on this bucket + # + # Returns +1.0+ when no messages have been checked yet. + # + # @return [Float] Conformance ratio, between +[0,1]+ + def current_window_rate + return 1.0 if @total_messages.zero? + + @conforming_messages.to_f / @total_messages + end + + # @return [Numeric] number of tokens currently available + def available_tokens + @tokens + end + + private + + def refill_since_last_message + now = Core::Utils::Time.get_time + elapsed = now - @last_refill + + # Update the number of available tokens, but ensure we do not exceed the max + # we return the min of tokens + rate*elapsed, or max tokens + refill_tokens(@rate * elapsed) + + @last_refill = now + end + + def refill_tokens(size) + @tokens += size + @tokens = @max_tokens if @tokens > @max_tokens + end + + def increment_total_count + @total_messages += 1 + end + + def increment_conforming_count + @conforming_messages += 1 + end + + def should_allow?(size) + # rate limit of 0 blocks everything + return false if @rate.zero? + + # negative rate limit disables rate limiting + return true if @rate < 0 + + refill_since_last_message + + # if tokens < 1 we don't allow? + return false if @tokens < size + + @tokens -= size + + true + end + + # Sets and Updates the past two 1 second windows for which + # the rate limiter must compute it's rate over and updates + # the total count, and conforming message count if +allowed+ + def update_rate_counts(allowed) + now = Core::Utils::Time.get_time + + # No tokens have been seen yet, start a new window + if @current_window.nil? + @current_window = now + # If more than 1 second has past since last window, reset + elsif now - @current_window >= 1 + @prev_conforming_messages = @conforming_messages + @prev_total_messages = @total_messages + @conforming_messages = 0 + @total_messages = 0 + @current_window = now + end + + increment_conforming_count if allowed + + increment_total_count + end + end + + # {Datadog::Core::RateLimiter} that accepts all resources, + # with no limits. + class UnlimitedLimiter < RateLimiter + # @return [Boolean] always +true+ + def allow?(_) + true + end + + # @return [Float] always 100% + def effective_rate + 1.0 + end + end + end +end diff --git a/lib/datadog/tracing/sampling/rate_limiter.rb b/lib/datadog/tracing/sampling/rate_limiter.rb deleted file mode 100644 index 40424f146fc..00000000000 --- a/lib/datadog/tracing/sampling/rate_limiter.rb +++ /dev/null @@ -1,185 +0,0 @@ -# frozen_string_literal: true - -require_relative '../../core/utils/time' - -module Datadog - module Tracing - module Sampling - # Checks for rate limiting on a resource. - class RateLimiter - # Checks if resource of specified size can be - # conforms with the current limit. - # - # Implementations of this method are not guaranteed - # to be side-effect free. - # - # @return [Boolean] whether a resource conforms with the current limit - def allow?(size); end - - # The effective rate limiting ratio based on - # recent calls to `allow?`. - # - # @return [Float] recent allowance ratio - def effective_rate; end - end - - # Implementation of the Token Bucket metering algorithm - # for rate limiting. - # - # @see https://en.wikipedia.org/wiki/Token_bucket Token bucket - class TokenBucket < RateLimiter - attr_reader :rate, :max_tokens - - # @param rate [Numeric] Allowance rate, in units per second - # if rate is negative, always allow - # if rate is zero, never allow - # @param max_tokens [Numeric] Limit of available tokens - def initialize(rate, max_tokens = rate) - super() - - raise ArgumentError, "rate must be a number: #{rate}" unless rate.is_a?(Numeric) - raise ArgumentError, "max_tokens must be a number: #{max_tokens}" unless max_tokens.is_a?(Numeric) - - @rate = rate - @max_tokens = max_tokens - - @tokens = max_tokens - @total_messages = 0 - @conforming_messages = 0 - @prev_conforming_messages = nil - @prev_total_messages = nil - @current_window = nil - - @last_refill = Core::Utils::Time.get_time - end - - # Checks if a message of provided +size+ - # conforms with the current bucket limit. - # - # If it does, return +true+ and remove +size+ - # tokens from the bucket. - # If it does not, return +false+ without affecting - # the tokens from the bucket. - # - # @return [Boolean] +true+ if message conforms with current bucket limit - def allow?(size) - allowed = should_allow?(size) - update_rate_counts(allowed) - allowed - end - - # Ratio of 'conformance' per 'total messages' checked - # averaged for the past 2 buckets - # - # Returns +1.0+ when no messages have been checked yet. - # - # @return [Float] Conformance ratio, between +[0,1]+ - def effective_rate - return 0.0 if @rate.zero? - return 1.0 if @rate < 0 || @total_messages.zero? - - return current_window_rate if @prev_conforming_messages.nil? || @prev_total_messages.nil? - - (@conforming_messages.to_f + @prev_conforming_messages.to_f) / (@total_messages + @prev_total_messages) - end - - # Ratio of 'conformance' per 'total messages' checked - # on this bucket - # - # Returns +1.0+ when no messages have been checked yet. - # - # @return [Float] Conformance ratio, between +[0,1]+ - def current_window_rate - return 1.0 if @total_messages.zero? - - @conforming_messages.to_f / @total_messages - end - - # @return [Numeric] number of tokens currently available - def available_tokens - @tokens - end - - private - - def refill_since_last_message - now = Core::Utils::Time.get_time - elapsed = now - @last_refill - - # Update the number of available tokens, but ensure we do not exceed the max - # we return the min of tokens + rate*elapsed, or max tokens - refill_tokens(@rate * elapsed) - - @last_refill = now - end - - def refill_tokens(size) - @tokens += size - @tokens = @max_tokens if @tokens > @max_tokens - end - - def increment_total_count - @total_messages += 1 - end - - def increment_conforming_count - @conforming_messages += 1 - end - - def should_allow?(size) - # rate limit of 0 blocks everything - return false if @rate.zero? - - # negative rate limit disables rate limiting - return true if @rate < 0 - - refill_since_last_message - - # if tokens < 1 we don't allow? - return false if @tokens < size - - @tokens -= size - - true - end - - # Sets and Updates the past two 1 second windows for which - # the rate limiter must compute it's rate over and updates - # the total count, and conforming message count if +allowed+ - def update_rate_counts(allowed) - now = Core::Utils::Time.get_time - - # No tokens have been seen yet, start a new window - if @current_window.nil? - @current_window = now - # If more than 1 second has past since last window, reset - elsif now - @current_window >= 1 - @prev_conforming_messages = @conforming_messages - @prev_total_messages = @total_messages - @conforming_messages = 0 - @total_messages = 0 - @current_window = now - end - - increment_conforming_count if allowed - - increment_total_count - end - end - - # {Datadog::Tracing::Sampling::RateLimiter} that accepts all resources, - # with no limits. - class UnlimitedLimiter < RateLimiter - # @return [Boolean] always +true+ - def allow?(_) - true - end - - # @return [Float] always 100% - def effective_rate - 1.0 - end - end - end - end -end diff --git a/lib/datadog/tracing/sampling/span/ext.rb b/lib/datadog/tracing/sampling/span/ext.rb index 5b04dfa6d1d..cc43505dd91 100644 --- a/lib/datadog/tracing/sampling/span/ext.rb +++ b/lib/datadog/tracing/sampling/span/ext.rb @@ -9,7 +9,7 @@ module Ext # Accept all spans (100% retention). DEFAULT_SAMPLE_RATE = 1.0 # Unlimited. - # @see Datadog::Tracing::Sampling::TokenBucket + # @see Datadog::Core::TokenBucket DEFAULT_MAX_PER_SECOND = -1 # Sampling decision method used to come to the sampling decision for this span diff --git a/lib/datadog/tracing/sampling/span/rule.rb b/lib/datadog/tracing/sampling/span/rule.rb index 797b355518c..827e17a6821 100644 --- a/lib/datadog/tracing/sampling/span/rule.rb +++ b/lib/datadog/tracing/sampling/span/rule.rb @@ -30,7 +30,7 @@ def initialize( @rate_limit = rate_limit @sampler = Sampling::RateSampler.new(sample_rate) - @rate_limiter = Sampling::TokenBucket.new(rate_limit) + @rate_limiter = Core::TokenBucket.new(rate_limit) end # This method should only be invoked for spans that are part diff --git a/sig/datadog/core/rate_limiter.rbs b/sig/datadog/core/rate_limiter.rbs new file mode 100644 index 00000000000..25fca42b282 --- /dev/null +++ b/sig/datadog/core/rate_limiter.rbs @@ -0,0 +1,35 @@ +module Datadog + module Core + class RateLimiter + def allow?: (untyped size) -> nil + def effective_rate: () -> nil + end + class TokenBucket < RateLimiter + attr_reader rate: untyped + + attr_reader max_tokens: untyped + def initialize: (untyped rate, ?untyped max_tokens) -> void + def allow?: (untyped size) -> untyped + def effective_rate: () -> (::Float | untyped) + def current_window_rate: () -> (::Float | untyped) + def available_tokens: () -> untyped + + private + + def refill_since_last_message: () -> untyped + + def refill_tokens: (untyped size) -> untyped + + def increment_total_count: () -> untyped + + def increment_conforming_count: () -> untyped + + def should_allow?: (untyped size) -> (false | true) + def update_rate_counts: (untyped allowed) -> untyped + end + class UnlimitedLimiter < RateLimiter + def allow?: (untyped _) -> true + def effective_rate: () -> ::Float + end + end +end diff --git a/sig/datadog/tracing/sampling/rate_limiter.rbs b/sig/datadog/tracing/sampling/rate_limiter.rbs deleted file mode 100644 index 63e86785d35..00000000000 --- a/sig/datadog/tracing/sampling/rate_limiter.rbs +++ /dev/null @@ -1,37 +0,0 @@ -module Datadog - module Tracing - module Sampling - class RateLimiter - def allow?: (untyped size) -> nil - def effective_rate: () -> nil - end - class TokenBucket < RateLimiter - attr_reader rate: untyped - - attr_reader max_tokens: untyped - def initialize: (untyped rate, ?untyped max_tokens) -> void - def allow?: (untyped size) -> untyped - def effective_rate: () -> (::Float | untyped) - def current_window_rate: () -> (::Float | untyped) - def available_tokens: () -> untyped - - private - - def refill_since_last_message: () -> untyped - - def refill_tokens: (untyped size) -> untyped - - def increment_total_count: () -> untyped - - def increment_conforming_count: () -> untyped - - def should_allow?: (untyped size) -> (false | true) - def update_rate_counts: (untyped allowed) -> untyped - end - class UnlimitedLimiter < RateLimiter - def allow?: (untyped _) -> true - def effective_rate: () -> ::Float - end - end - end -end diff --git a/spec/datadog/tracing/sampling/rate_limiter_spec.rb b/spec/datadog/core/rate_limiter_spec.rb similarity index 97% rename from spec/datadog/tracing/sampling/rate_limiter_spec.rb rename to spec/datadog/core/rate_limiter_spec.rb index bff9f01d4c1..11bbd1c146d 100644 --- a/spec/datadog/tracing/sampling/rate_limiter_spec.rb +++ b/spec/datadog/core/rate_limiter_spec.rb @@ -1,8 +1,8 @@ require 'spec_helper' -require 'datadog/tracing/sampling/rate_limiter' +require 'datadog/core/rate_limiter' -RSpec.describe Datadog::Tracing::Sampling::TokenBucket do +RSpec.describe Datadog::Core::TokenBucket do subject(:bucket) { described_class.new(rate, max_tokens) } let(:rate) { 1 } diff --git a/spec/datadog/tracing/sampling/rule_sampler_spec.rb b/spec/datadog/tracing/sampling/rule_sampler_spec.rb index 80ad835debd..119d63ae42d 100644 --- a/spec/datadog/tracing/sampling/rule_sampler_spec.rb +++ b/spec/datadog/tracing/sampling/rule_sampler_spec.rb @@ -9,7 +9,7 @@ RSpec.describe Datadog::Tracing::Sampling::RuleSampler do let(:rule_sampler) { described_class.new(rules, rate_limiter: rate_limiter, default_sampler: default_sampler) } let(:rules) { [] } - let(:rate_limiter) { instance_double(Datadog::Tracing::Sampling::RateLimiter) } + let(:rate_limiter) { instance_double(Datadog::Core::RateLimiter) } let(:default_sampler) { instance_double(Datadog::Tracing::Sampling::RateByServiceSampler) } let(:effective_rate) { 0.9 } let(:allow?) { true } @@ -32,7 +32,7 @@ describe '#initialize' do subject(:rule_sampler) { described_class.new(rules) } - it { expect(rule_sampler.rate_limiter).to be_a(Datadog::Tracing::Sampling::TokenBucket) } + it { expect(rule_sampler.rate_limiter).to be_a(Datadog::Core::TokenBucket) } it { expect(rule_sampler.default_sampler).to be_a(Datadog::Tracing::Sampling::RateByServiceSampler) } context 'with rate_limit ENV' do @@ -41,7 +41,7 @@ .and_return(20.0) end - it { expect(rule_sampler.rate_limiter).to be_a(Datadog::Tracing::Sampling::TokenBucket) } + it { expect(rule_sampler.rate_limiter).to be_a(Datadog::Core::TokenBucket) } end context 'with default_sample_rate ENV' do @@ -58,13 +58,13 @@ context 'with rate_limit' do subject(:rule_sampler) { described_class.new(rules, rate_limit: 1.0) } - it { expect(rule_sampler.rate_limiter).to be_a(Datadog::Tracing::Sampling::TokenBucket) } + it { expect(rule_sampler.rate_limiter).to be_a(Datadog::Core::TokenBucket) } end context 'with nil rate_limit' do subject(:rule_sampler) { described_class.new(rules, rate_limit: nil) } - it { expect(rule_sampler.rate_limiter).to be_a(Datadog::Tracing::Sampling::UnlimitedLimiter) } + it { expect(rule_sampler.rate_limiter).to be_a(Datadog::Core::UnlimitedLimiter) } end context 'with default_sample_rate' do