Skip to content

Commit

Permalink
feat(request): Implement circuit breaking
Browse files Browse the repository at this point in the history
  • Loading branch information
2k-joker committed Apr 14, 2024
1 parent 2c64786 commit ead6342
Show file tree
Hide file tree
Showing 14 changed files with 778 additions and 11 deletions.
1 change: 1 addition & 0 deletions httpigeon.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "rubocop", "~> 1.21"
spec.add_development_dependency "rubocop-rspec", "~> 2.24"
spec.add_development_dependency "pry", "~> 0.13.1"
spec.add_development_dependency "timecop", "~> 0.9.8"

# For more information and examples about making a new gem, check out our
# guide at: https://bundler.io/guides/creating_gem.html
Expand Down
25 changes: 24 additions & 1 deletion lib/httpigeon.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "httpigeon/logger"
require "httpigeon/request"
require "httpigeon/response"
require "httpigeon/circuit_breaker/fuse"

module HTTPigeon
extend self
Expand All @@ -18,6 +19,8 @@ module FilterPatterns
CLIENT_SECRET = "/(?'key'(client_?(s|S)?ecret=))(?'value'([^&$])*)/".freeze
end

class InvalidConfigurationError < StandardError; end

delegate :default_event_type,
:default_filter_keys,
:redactor_string,
Expand All @@ -26,19 +29,39 @@ module FilterPatterns
:auto_generate_request_id,
:notify_all_exceptions,
:exception_notifier,
:mount_circuit_breaker,
:log_circuit_events,
:fuse_error_codes_watchlist,
:fuse_on_circuit_open,
:fuse_max_failures_count,
:fuse_min_failures_count,
:fuse_failure_rate_threshold,
:fuse_sample_window,
:fuse_open_circuit_sleep_window,
:fuse_on_open_circuit,
to: :configuration

def configure
@config = HTTPigeon::Configuration.new

yield(@config)
yield(@config) if block_given?

validate_config(@config)

@config.freeze
end

def stdout_logger
@stdout_logger ||= ::Logger.new($stdout)
end

private

def configuration
@configuration ||= @config || HTTPigeon::Configuration.new
end

def validate_config(config)
raise InvalidConfigurationError, "Fuse sleep window: #{config.fuse_open_circuit_sleep_window} must be less than or equal to sample window: #{config.fuse_sample_window}" if fuse_open_circuit_sleep_window > fuse_sample_window
end
end
11 changes: 11 additions & 0 deletions lib/httpigeon/circuit_breaker/errors.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module HTTPigeon
module CircuitBreaker
class Error < StandardError; end

class CircuitOpenError < Error
def initialize(service_id)
super("Circuit open for service: #{service_id}")
end
end
end
end
197 changes: 197 additions & 0 deletions lib/httpigeon/circuit_breaker/fuse.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
require_relative 'errors'
require_relative 'fuse_config'
require_relative 'memory_store'
require_relative '../middleware/circuit_breaker'

module HTTPigeon
module CircuitBreaker
class Fuse
STATE_OPEN = 'open'.freeze
STATE_HALF_OPEN = 'half_open'.freeze
STATE_CLOSED = 'closed'.freeze

def self.from_options(options)
new(FuseConfig.new(options))
end

attr_reader :service_id, :config, :storage

def initialize(config)
@config = config
@service_id = config.service_id.to_s
@storage = CircuitBreaker::MemoryStore.new(config.sample_window)
@open_storage_key = "circuit:#{service_id}:#{STATE_OPEN}"
@half_open_storage_key = "circuit:#{service_id}:#{STATE_HALF_OPEN}"
@state_change_syncer = Mutex.new
end

def execute(request_id: nil)
@request_id = request_id

if open?
record_tripped!

config.on_open_circuit.call(config.null_response, config.circuit_open_error)
else
begin
response = yield
server_maintenance_timeout = response.headers[config.maintenance_mode_header].to_i

if server_maintenance_timeout.positive?
record_failure!
open!(
{
expires_in: server_maintenance_timeout,
# for logging purposes. can't log expires_in because it might be overridden if greater than max
server_maintenance_timeout: server_maintenance_timeout
}
)

return config.on_open_circuit.call(response, config.circuit_open_error)
end

record_success!
response
rescue Faraday::Error => e
record_failure! if e.response_status >= 500 || config.error_codes_watchlist.include?(e.response_status)

raise e
end
end
end

def open?
storage.key?(open_storage_key)
end

def half_open?
storage.key?(half_open_storage_key)
end

def failure_count
storage.get(stat_storage_key(:failure)).to_i
end

def success_count
storage.get(stat_storage_key(:success)).to_i
end

def tripped_count
storage.get(stat_storage_key(:tripped)).to_i
end

def failure_rate
total_stats = success_count + failure_count + tripped_count

return 0.0 unless total_stats.positive?

(total_stats - success_count).to_f / total_stats
end

private

attr_reader :open_storage_key, :half_open_storage_key, :state_change_syncer, :request_id

def failed_request?(response)
response.status.nil? || response.status >= 500 || config.error_codes_watchlist.include?(response.status)
end

def should_open?
return false if failure_count < config.min_failures_count

failure_count >= config.max_failures_count || failure_rate >= config.failure_rate_threshold
end

def close!(opts = {})
state_change_syncer.synchronize do
# We only close the circuit if there have been at least one successful request during the current sample window
return unless success_count.positive?

# For the circuit to be closable, it must NOT be open AND
# it must be currently half open (i.e half_open_storage_key must be true)
# Otherwise, we return early
return unless !open? && storage.delete(half_open_storage_key)

# reset failures count for current sample window
# so that we can only trip the circuit if we reach the min failures threshold again
storage.delete(stat_storage_key(:failure))
end

log_circuit_event('circuit_closed', STATE_CLOSED, opts)
end

def open!(opts = {})
state_change_syncer.synchronize do
return if open?

trip!(type: :full, **opts)
end

opts.delete(:expires_in) # don't log expires_in key as it may be overridden if greater than max
log_circuit_event('circuit_opened', STATE_OPEN, opts)
end

def half_open!(opts = {})
state_change_syncer.synchronize do
return if open? || half_open?

trip!(type: :partial, **opts)
end

log_circuit_event('circuit_half_opened', STATE_HALF_OPEN, opts)
end

def trip!(type:, **opts)
if type == :full
storage.set(open_storage_key, true, { expires_in: config.open_circuit_sleep_window }.merge(opts))
storage.set(half_open_storage_key, true, { expires_in: config.sample_window }.merge(opts))
elsif type == :partial
storage.set(half_open_storage_key, true, { expires_in: config.sample_window }.merge(opts))
end
end

def record_success!
record_stat(:success)

close! if half_open?
end

def record_failure!
record_stat(:failure)

open! if should_open? && (!half_open? || !open?)
half_open! if !half_open? && failure_count >= config.min_failures_count
end

def record_tripped!
record_stat(:tripped)
log_circuit_event('execution_skipped', STATE_OPEN)
end

def record_stat(outcome, value = 1)
storage.increment(stat_storage_key(outcome), value, expires_in: config.sample_window)
end

def stat_storage_key(outcome)
"run_stat:#{service_id}:#{outcome}"
end

def log_circuit_event(event, status, payload = {})
return unless HTTPigeon.log_circuit_events

payload = {
event_type: "httpigeon.fuse.#{event}",
service_id: service_id,
request_id: request_id,
circuit_state: status,
success_count: success_count,
failure_count: failure_count,
failure_rate: failure_rate,
recorded_at: Time.now.to_i
}.merge(payload).compact

HTTPigeon.event_logger&.log(payload) || HTTPigeon.stdout_logger.log(1, payload.to_json)
end
end
end
end
67 changes: 67 additions & 0 deletions lib/httpigeon/circuit_breaker/fuse_config.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
require 'faraday'

module HTTPigeon
module CircuitBreaker
class NullResponse < Faraday::Response
attr_reader :api_response, :exception

def initialize(response = nil, exception = nil)
@api_response = response
@exception = exception
super(status: 503, response_headers: response&.headers || {})
end
end

class FuseConfig
DEFAULT_MM_TIMEOUT_HEADER = 'X-Maintenance-Mode-Timeout'.freeze

attr_reader :max_failures_count,
:min_failures_count,
:failure_rate_threshold,
:sample_window,
:open_circuit_sleep_window,
:on_open_circuit,
:error_codes_watchlist,
:maintenance_mode_header,
:service_id

def initialize(fuse_options = {})
@service_id = fuse_options[:service_id].presence || raise(ArgumentError, 'service_id is required')
@max_failures_count = fuse_options[:max_failures_count] || HTTPigeon.fuse_max_failures_count
@min_failures_count = fuse_options[:min_failures_count] || HTTPigeon.fuse_min_failures_count
@failure_rate_threshold = fuse_options[:failure_rate_threshold] || HTTPigeon.fuse_failure_rate_threshold
@sample_window = fuse_options[:sample_window] || HTTPigeon.fuse_sample_window
@open_circuit_sleep_window = fuse_options[:open_circuit_sleep_window] || HTTPigeon.fuse_open_circuit_sleep_window
@error_codes_watchlist = fuse_options[:error_codes_watchlist].to_a | HTTPigeon.fuse_error_codes_watchlist.to_a
@maintenance_mode_header = fuse_options[:maintenance_mode_header] || DEFAULT_MM_TIMEOUT_HEADER

@on_open_circuit = if HTTPigeon.fuse_on_circuit_open.respond_to?(:call)
HTTPigeon.fuse_on_circuit_open
else
->(api_response, exception) { null_response(api_response, exception) }
end
end

def to_h
{
service_id: service_id,
max_failures_count: max_failures_count,
min_failures_count: min_failures_count,
failure_rate_threshold: failure_rate_threshold,
sample_window: sample_window,
open_circuit_sleep_window: open_circuit_sleep_window,
error_codes_watchlist: error_codes_watchlist,
maintenance_mode_header: maintenance_mode_header
}
end

def null_response(api_response = nil, exception = nil)
NullResponse.new(api_response, exception)
end

def circuit_open_error
CircuitOpenError.new(service_id)
end
end
end
end
Loading

0 comments on commit ead6342

Please sign in to comment.