From a903f2909e8b9e08bea434cd9a8e043b16063675 Mon Sep 17 00:00:00 2001 From: Jorge Manrubia Date: Wed, 8 Jan 2025 14:59:40 +0100 Subject: [PATCH] Rework concurrency control to avoid thread locks This patch was created by Hailey Somerville in https://github.com/rails/solid_cable/pull/52#issuecomment-2537366280. All the credit to her. Co-authored-by: Hailey Somerville --- .../subscription_adapter/solid_cable.rb | 62 +++++++++++-------- test/config_stubs.rb | 3 +- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/lib/action_cable/subscription_adapter/solid_cable.rb b/lib/action_cable/subscription_adapter/solid_cable.rb index c0e012a..f712dee 100644 --- a/lib/action_cable/subscription_adapter/solid_cable.rb +++ b/lib/action_cable/subscription_adapter/solid_cable.rb @@ -3,6 +3,7 @@ require "action_cable/subscription_adapter/base" require "action_cable/subscription_adapter/channel_prefix" require "action_cable/subscription_adapter/subscriber_map" +require "concurrent/atomic/semaphore" module ActionCable module SubscriptionAdapter @@ -38,34 +39,53 @@ def listener end class Listener < ::ActionCable::SubscriptionAdapter::SubscriberMap + Stop = Class.new(Exception) + def initialize(event_loop) super() @event_loop = event_loop + # Critical section begins with 0 permits. It can be understood as + # being "normally held" by the listener thread. It is released + # for specific sections of code, rather than acquired. + @critical = Concurrent::Semaphore.new(0) + @thread = Thread.new do - Thread.current.abort_on_exception = true listen end end def listen loop do - break unless running? - - with_polling_volume { broadcast_messages } + begin + instance = interruptible { Rails.application.executor.run! } + with_polling_volume { broadcast_messages } + ensure + instance.complete! if instance + end - interruptible_sleep ::SolidCable.polling_interval + interruptible { sleep ::SolidCable.polling_interval } end + rescue Stop + ensure + @critical.release end - def shutdown - self.running = false - wake_up + def interruptible + @critical.release + yield + ensure + @critical.acquire + end - ActiveSupport::Dependencies.interlock.permit_concurrent_loads do - thread&.join - end + def shutdown + @critical.acquire + # We have the critical permit, and so the listen thread must be + # safe to interrupt. + thread.raise(Stop) + @critical.release + thread.join end def add_channel(channel, on_success) @@ -83,15 +103,7 @@ def invoke_callback(*) private attr_reader :event_loop, :thread - attr_writer :running, :last_id - - def running? - if defined?(@running) - @running - else - self.running = true - end - end + attr_writer :last_id def last_id @last_id ||= ::SolidCable::Message.maximum(:id) || 0 @@ -102,12 +114,10 @@ def channels end def broadcast_messages - Rails.application.executor.wrap do - ::SolidCable::Message.broadcastable(channels, last_id). - each do |message| - broadcast(message.channel, message.payload) - self.last_id = message.id - end + ::SolidCable::Message.broadcastable(channels, last_id). + each do |message| + broadcast(message.channel, message.payload) + self.last_id = message.id end end diff --git a/test/config_stubs.rb b/test/config_stubs.rb index f6e78af..84a30f4 100644 --- a/test/config_stubs.rb +++ b/test/config_stubs.rb @@ -18,8 +18,7 @@ def executor end class ExectorStub - def wrap(&block) - block.call + def run! end end end