Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(request): Implement circuit breaking #33

Merged
merged 6 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions httpigeon.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "rubocop", "~> 1.21"
spec.add_development_dependency "rubocop-rspec", "~> 2.24"
spec.add_development_dependency "pry", "~> 0.13.1"
spec.add_development_dependency "timecop", "~> 0.9.8"

# For more information and examples about making a new gem, check out our
# guide at: https://bundler.io/guides/creating_gem.html
Expand Down
25 changes: 24 additions & 1 deletion lib/httpigeon.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require "httpigeon/logger"
require "httpigeon/request"
require "httpigeon/response"
require "httpigeon/circuit_breaker/fuse"

module HTTPigeon
extend self
Expand All @@ -18,6 +19,8 @@ module FilterPatterns
CLIENT_SECRET = "/(?'key'(client_?(s|S)?ecret=))(?'value'([^&$])*)/".freeze
end

# Raised by validate_config when the supplied configuration is inconsistent
# (the fuse sleep window is greater than the sample window).
class InvalidConfigurationError < StandardError; end

delegate :default_event_type,
:default_filter_keys,
:redactor_string,
Expand All @@ -26,19 +29,39 @@ module FilterPatterns
:auto_generate_request_id,
:notify_all_exceptions,
:exception_notifier,
:mount_circuit_breaker,
:log_circuit_events,
:fuse_error_codes_watchlist,
:fuse_on_circuit_open,
:fuse_max_failures_count,
:fuse_min_failures_count,
:fuse_failure_rate_threshold,
:fuse_sample_window,
:fuse_open_circuit_sleep_window,
:fuse_on_open_circuit,
to: :configuration

# Builds a fresh HTTPigeon::Configuration, optionally lets the caller
# customize it, validates it and freezes it.
#
# @yieldparam config [HTTPigeon::Configuration] the new, still mutable configuration
# @return [HTTPigeon::Configuration] the frozen configuration
# @raise [InvalidConfigurationError] when validate_config rejects the settings
def configure
  @config = HTTPigeon::Configuration.new
  # Drop the memoized object returned by `configuration` so the delegated
  # readers see this new configuration instead of an earlier one.
  @configuration = nil

  # Yield exactly once, and only when a block was supplied; calling
  # `configure` without a block keeps the defaults.
  yield(@config) if block_given?

  validate_config(@config)

  @config.freeze
end

# Lazily builds and caches a Logger that writes to $stdout.
#
# @return [::Logger] the same logger instance on every call
def stdout_logger
  return @stdout_logger if @stdout_logger

  @stdout_logger = ::Logger.new($stdout)
end

private

# Returns the memoized configuration: the object built by `configure` when
# there is one, otherwise a new default HTTPigeon::Configuration.
def configuration
  return @configuration if @configuration

  @configuration = @config || HTTPigeon::Configuration.new
end

# Ensures the fuse sleep window does not exceed the sample window.
#
# The check reads both values from the `config` argument itself, so the
# object being validated is the one that is compared and reported, not
# whatever configuration the delegated module-level readers currently see.
#
# @param config [#fuse_open_circuit_sleep_window, #fuse_sample_window]
# @raise [InvalidConfigurationError] when the sleep window is greater than the sample window
def validate_config(config)
  sleep_window = config.fuse_open_circuit_sleep_window
  sample_window = config.fuse_sample_window
  return unless sleep_window > sample_window

  raise InvalidConfigurationError, "Fuse sleep window: #{sleep_window} must be less than or equal to sample window: #{sample_window}"
end
end
11 changes: 11 additions & 0 deletions lib/httpigeon/circuit_breaker/errors.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module HTTPigeon
  module CircuitBreaker
    # Base class for the errors defined in this namespace.
    class Error < StandardError; end

    # Error whose message names the service whose circuit is open.
    class CircuitOpenError < Error
      # @param service_id [#to_s] identifier interpolated into the message
      def initialize(service_id)
        message = "Circuit open for service: #{service_id}"
        super(message)
      end
    end
  end
end
205 changes: 205 additions & 0 deletions lib/httpigeon/circuit_breaker/fuse.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
require_relative 'errors'
require_relative 'fuse_config'
require_relative 'memory_store'
require_relative '../middleware/circuit_breaker'

module HTTPigeon
module CircuitBreaker
# Circuit breaker ("fuse") guarding calls to a single service.
#
# Success / failure / tripped counters and the open / half-open flags live in
# a CircuitBreaker::MemoryStore created with the configured sample window.
# State transitions are serialized through a Mutex.
class Fuse
  STATE_OPEN = 'open'.freeze
  STATE_HALF_OPEN = 'half_open'.freeze
  STATE_CLOSED = 'closed'.freeze

  # Builds a fuse from a plain options hash by wrapping it in a FuseConfig.
  def self.from_options(options)
    new(FuseConfig.new(options))
  end

  attr_reader :service_id, :config, :storage

  # @param config [FuseConfig] thresholds, windows and callbacks for this fuse
  def initialize(config)
    @config = config
    @service_id = config.service_id.to_s
    @storage = CircuitBreaker::MemoryStore.new(config.sample_window)
    @open_storage_key = "circuit:#{service_id}:#{STATE_OPEN}"
    @half_open_storage_key = "circuit:#{service_id}:#{STATE_HALF_OPEN}"
    @state_change_syncer = Mutex.new
  end

  # Runs the given block (the actual request) through the fuse.
  #
  # * Circuit open: the block is not run; a "tripped" stat is recorded and the
  #   result of `config.on_open_circuit` is returned.
  # * Response carries a positive maintenance-mode timeout header: a failure
  #   is recorded, the circuit is opened for that long and the result of
  #   `config.on_open_circuit` is returned.
  # * Otherwise a success is recorded and the response is returned.
  # * Faraday::Error: a failure is recorded when the error has no status
  #   (timeout, connection failure), a 5xx status or a watch-listed status;
  #   the error is always re-raised.
  #
  # @param request_id [String, nil] only used to tag circuit log events
  # @yieldreturn [#headers] the response of the guarded request
  def execute(request_id: nil)
    @request_id = request_id

    if open?
      record_tripped!

      config.on_open_circuit.call(config.null_response, config.circuit_open_error)
    else
      begin
        response = yield
        server_maintenance_timeout = response.headers[config.maintenance_mode_header].to_i

        if server_maintenance_timeout.positive?
          record_failure!
          open!(
            {
              expires_in: server_maintenance_timeout,
              # for logging purposes. can't log expires_in because it might be overridden if greater than max
              server_maintenance_timeout: server_maintenance_timeout
            }
          )

          return config.on_open_circuit.call(response, config.circuit_open_error)
        end

        record_success!
        response
      rescue Faraday::Error => e
        # response_status is nil when no response was received; comparing nil
        # with >= would raise NoMethodError and hide the original error.
        record_failure! if failed_status?(e.response_status)

        raise
      end
    end
  end

  # @return [Boolean] whether the "open" flag is present in storage
  def open?
    storage.key?(open_storage_key)
  end

  # @return [Boolean] whether the "half open" flag is present in storage
  def half_open?
    storage.key?(half_open_storage_key)
  end

  # @return [Integer] failures currently recorded in storage
  def failure_count
    storage.get(stat_storage_key(:failure)).to_i
  end

  # @return [Integer] successes currently recorded in storage
  def success_count
    storage.get(stat_storage_key(:success)).to_i
  end

  # @return [Integer] executions skipped because the circuit was open
  def tripped_count
    storage.get(stat_storage_key(:tripped)).to_i
  end

  # Share of recorded executions that were not successes (failures plus
  # tripped executions), or 0.0 when nothing has been recorded.
  #
  # @return [Float]
  def failure_rate
    total_stats = success_count + failure_count + tripped_count

    return 0.0 unless total_stats.positive?

    (total_stats - success_count).to_f / total_stats
  end

  # Resets the underlying storage while holding the state-change lock.
  def reset!
    state_change_syncer.synchronize { storage.reset! }
  end

  private

  attr_reader :open_storage_key, :half_open_storage_key, :state_change_syncer, :request_id

  # Whether an HTTP status counts as a failure: missing, 5xx, or watch-listed.
  def failed_status?(status)
    status.nil? || status >= 500 || config.error_codes_watchlist.include?(status)
  end

  # Same rule as failed_status?, applied to a response object.
  def failed_request?(response)
    failed_status?(response.status)
  end

  # The circuit should open once the minimum number of failures is reached
  # AND either the maximum failure count or the failure-rate threshold is hit.
  def should_open?
    return false if failure_count < config.min_failures_count

    failure_count >= config.max_failures_count || failure_rate >= config.failure_rate_threshold
  end

  def close!(opts = {})
    state_change_syncer.synchronize do
      # We only close the circuit if there has been at least one successful request during the current sample window
      return unless success_count.positive?

      # For the circuit to be closable, it must NOT be open AND
      # it must be currently half open (i.e. half_open_storage_key must be present)
      # Otherwise, we return early
      return unless !open? && storage.delete(half_open_storage_key)

      # reset failures count for current sample window
      # so that we can only trip the circuit if we reach the min failures threshold again
      storage.delete(stat_storage_key(:failure))
    end

    log_circuit_event('circuit_closed', STATE_CLOSED, opts)
  end

  def open!(opts = {})
    state_change_syncer.synchronize do
      return if open?

      trip!(type: :full, **opts)

      # reset failures count for current sample window so that the circuit doesn't re-open immediately
      # if a request fails while in half_open state
      storage.delete(stat_storage_key(:failure))
    end

    # don't log expires_in key as it may be overridden if greater than max;
    # build a filtered copy instead of mutating the caller's hash
    loggable_opts = opts.reject { |key, _| key == :expires_in }
    log_circuit_event('circuit_opened', STATE_OPEN, loggable_opts)
  end

  def half_open!(opts = {})
    state_change_syncer.synchronize do
      return if open? || half_open?

      trip!(type: :partial, **opts)
    end

    log_circuit_event('circuit_half_opened', STATE_HALF_OPEN, opts)
  end

  # Writes the state flags. A :full trip sets both the open flag (sleep
  # window) and the half-open flag (sample window); a :partial trip sets only
  # the half-open flag. Entries in `opts` override the default expiry.
  def trip!(type:, **opts)
    if type == :full
      storage.set(open_storage_key, true, { expires_in: config.open_circuit_sleep_window }.merge(opts))
      storage.set(half_open_storage_key, true, { expires_in: config.sample_window }.merge(opts))
    elsif type == :partial
      storage.set(half_open_storage_key, true, { expires_in: config.sample_window }.merge(opts))
    end
  end

  def record_success!
    record_stat(:success)

    close! if half_open?
  end

  def record_failure!
    record_stat(:failure)

    open! if should_open? && (!half_open? || !open?)
    half_open! if !half_open? && failure_count >= config.min_failures_count
  end

  def record_tripped!
    record_stat(:tripped)
    log_circuit_event('execution_skipped', STATE_OPEN)
  end

  def record_stat(outcome, value = 1)
    storage.increment(stat_storage_key(outcome), value, expires_in: config.sample_window)
  end

  def stat_storage_key(outcome)
    "run_stat:#{service_id}:#{outcome}"
  end

  # Emits a circuit event when HTTPigeon.log_circuit_events is enabled. The
  # event goes to HTTPigeon.event_logger when one is set, otherwise to the
  # stdout logger as JSON; never to both.
  def log_circuit_event(event, status, payload = {})
    return unless HTTPigeon.log_circuit_events

    payload = {
      event_type: "httpigeon.fuse.#{event}",
      service_id: service_id,
      request_id: request_id,
      circuit_state: status,
      success_count: success_count,
      failure_count: failure_count,
      failure_rate: failure_rate,
      recorded_at: Time.now.to_i
    }.merge(payload).compact

    event_logger = HTTPigeon.event_logger
    if event_logger
      event_logger.log(payload)
    else
      HTTPigeon.stdout_logger.log(1, payload.to_json)
    end
  end
end
end
end
67 changes: 67 additions & 0 deletions lib/httpigeon/circuit_breaker/fuse_config.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
require 'faraday'

module HTTPigeon
module CircuitBreaker
# Faraday response built with status 503 that keeps a reference to the
# original response and exception it was created for.
class NullResponse < Faraday::Response
  attr_reader :api_response, :exception

  # @param response [#headers, nil] original response; its headers are reused when present
  # @param exception [Exception, nil] error associated with this response
  def initialize(response = nil, exception = nil)
    @api_response = response
    @exception = exception

    forwarded_headers = response&.headers || {}
    super(status: 503, response_headers: forwarded_headers)
  end
end

# Settings for a single fuse. Per-fuse options win; anything not supplied
# falls back to the matching HTTPigeon.fuse_* value.
class FuseConfig
  DEFAULT_MM_TIMEOUT_HEADER = 'X-Maintenance-Mode-Timeout'.freeze

  attr_reader :max_failures_count,
              :min_failures_count,
              :failure_rate_threshold,
              :sample_window,
              :open_circuit_sleep_window,
              :on_open_circuit,
              :error_codes_watchlist,
              :maintenance_mode_header,
              :service_id

  # @param fuse_options [Hash] per-fuse overrides; :service_id is mandatory
  # @raise [ArgumentError] when :service_id is missing or blank
  def initialize(fuse_options = {})
    service = fuse_options[:service_id].presence
    raise ArgumentError, 'service_id is required' unless service

    @service_id = service

    # Each of these uses the per-fuse value when given, else HTTPigeon.fuse_<name>.
    %i[
      max_failures_count
      min_failures_count
      failure_rate_threshold
      sample_window
      open_circuit_sleep_window
    ].each do |setting|
      resolved = fuse_options[setting] || HTTPigeon.public_send(:"fuse_#{setting}")
      instance_variable_set(:"@#{setting}", resolved)
    end

    # The watchlist is the union of the per-fuse and global status codes.
    per_fuse_codes = fuse_options[:error_codes_watchlist].to_a
    global_codes = HTTPigeon.fuse_error_codes_watchlist.to_a
    @error_codes_watchlist = per_fuse_codes | global_codes

    @maintenance_mode_header = fuse_options[:maintenance_mode_header] || DEFAULT_MM_TIMEOUT_HEADER

    # Use the globally configured handler when it is callable; otherwise
    # fall back to building a NullResponse.
    global_handler = HTTPigeon.fuse_on_circuit_open
    @on_open_circuit =
      if global_handler.respond_to?(:call)
        global_handler
      else
        lambda { |api_response, exception| null_response(api_response, exception) }
      end
  end

  # @return [Hash] the plain settings (the on_open_circuit callback is not included)
  def to_h
    %i[
      service_id
      max_failures_count
      min_failures_count
      failure_rate_threshold
      sample_window
      open_circuit_sleep_window
      error_codes_watchlist
      maintenance_mode_header
    ].each_with_object({}) do |setting, settings|
      settings[setting] = public_send(setting)
    end
  end

  # @return [NullResponse] wrapper around the given response and exception
  def null_response(api_response = nil, exception = nil)
    NullResponse.new(api_response, exception)
  end

  # @return [CircuitOpenError] error naming this fuse's service
  def circuit_open_error
    CircuitOpenError.new(service_id)
  end
end
end
end
Loading
Loading