Skip to content

Commit

Permalink
Rework concurrency control to avoid thread locks (#54)
Browse files Browse the repository at this point in the history
This patch was created by Hailey Somerville in #52 (comment). All
the credit to her.

Co-authored-by: Hailey Somerville <[email protected]>
  • Loading branch information
jorgemanrubia and haileys authored Jan 24, 2025
1 parent 7cce173 commit 50186b9
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 28 deletions.
62 changes: 36 additions & 26 deletions lib/action_cable/subscription_adapter/solid_cable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "action_cable/subscription_adapter/base"
require "action_cable/subscription_adapter/channel_prefix"
require "action_cable/subscription_adapter/subscriber_map"
require "concurrent/atomic/semaphore"

module ActionCable
module SubscriptionAdapter
Expand Down Expand Up @@ -38,34 +39,53 @@ def listener
end

class Listener < ::ActionCable::SubscriptionAdapter::SubscriberMap
Stop = Class.new(Exception)

def initialize(event_loop)
super()

@event_loop = event_loop

# Critical section begins with 0 permits. It can be understood as
# being "normally held" by the listener thread. It is released
# for specific sections of code, rather than acquired.
@critical = Concurrent::Semaphore.new(0)

@thread = Thread.new do
Thread.current.abort_on_exception = true
listen
end
end

def listen
loop do
break unless running?

with_polling_volume { broadcast_messages }
begin
instance = interruptible { Rails.application.executor.run! }
with_polling_volume { broadcast_messages }
ensure
instance.complete! if instance
end

interruptible_sleep ::SolidCable.polling_interval
interruptible { sleep ::SolidCable.polling_interval }
end
rescue Stop
ensure
@critical.release
end

def shutdown
self.running = false
wake_up
def interruptible
@critical.release
yield
ensure
@critical.acquire
end

ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
thread&.join
end
def shutdown
@critical.acquire
# We have the critical permit, and so the listen thread must be
# safe to interrupt.
thread.raise(Stop)
@critical.release
thread.join
end

def add_channel(channel, on_success)
Expand All @@ -83,15 +103,7 @@ def invoke_callback(*)

private
attr_reader :event_loop, :thread
attr_writer :running, :last_id

def running?
if defined?(@running)
@running
else
self.running = true
end
end
attr_writer :last_id

def last_id
@last_id ||= ::SolidCable::Message.maximum(:id) || 0
Expand All @@ -102,12 +114,10 @@ def channels
end

def broadcast_messages
Rails.application.executor.wrap do
::SolidCable::Message.broadcastable(channels, last_id).
each do |message|
broadcast(message.channel, message.payload)
self.last_id = message.id
end
::SolidCable::Message.broadcastable(channels, last_id).
each do |message|
broadcast(message.channel, message.payload)
self.last_id = message.id
end
end

Expand Down
3 changes: 1 addition & 2 deletions test/config_stubs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ def executor
end

class ExectorStub
def wrap(&block)
block.call
def run!
end
end
end
Expand Down

0 comments on commit 50186b9

Please sign in to comment.