Skip to content

Commit

Permalink
Stabilize the jruby test suite
Browse files Browse the repository at this point in the history
Some threads would stay subscribed to various channel
causing state leak across threads.

It's still not perfect but should be much more stable.
  • Loading branch information
byroot committed Aug 17, 2022
1 parent 325da9d commit bb795e2
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 51 deletions.
4 changes: 4 additions & 0 deletions lib/redis/subscribe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ def punsubscribe(*channels)
call_v([:punsubscribe, *channels])
end

def close
@client.close
end

protected

def subscription(start, stop, channels, block, timeout = 0)
Expand Down
2 changes: 1 addition & 1 deletion test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def setup
end

def teardown
redis&.quit
redis&.close
super
end

Expand Down
112 changes: 62 additions & 50 deletions test/redis/publish_subscribe_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,35 @@
class TestPublishSubscribe < Minitest::Test
include Helper::Client

def setup
@threads = {}
super
end

def teardown
super
@threads.each do |thread, redis|
if redis.subscribed?
redis.unsubscribe
redis.punsubscribe
end
redis.close
begin
thread.join(2) or warn("leaked thread")
rescue RedisClient::ConnectionError
end
end
end

class TestError < StandardError
end

def test_subscribe_and_unsubscribe
@subscribed = false
@unsubscribed = false

thread = Thread.new do
r.subscribe("foo") do |on|
thread = new_thread do |r|
r.subscribe(channel_name) do |on|
on.subscribe do |_channel, total|
@subscribed = true
@t1 = total
Expand All @@ -36,8 +56,7 @@ def test_subscribe_and_unsubscribe
# Wait until the subscription is active before publishing
Thread.pass until @subscribed

Redis.new(OPTIONS).publish("foo", "s1")

redis.publish(channel_name, "s1")
thread.join

assert @subscribed
Expand All @@ -51,8 +70,8 @@ def test_psubscribe_and_punsubscribe
@subscribed = false
@unsubscribed = false

thread = Thread.new do
r.psubscribe("f*") do |on|
thread = new_thread do |r|
r.psubscribe("channel:*") do |on|
on.psubscribe do |_pattern, total|
@subscribed = true
@t1 = total
Expand All @@ -74,9 +93,7 @@ def test_psubscribe_and_punsubscribe

# Wait until the subscription is active before publishing
Thread.pass until @subscribed

Redis.new(OPTIONS).publish("foo", "s1")

redis.publish(channel_name, "s1")
thread.join

assert @subscribed
Expand All @@ -86,56 +103,37 @@ def test_psubscribe_and_punsubscribe
assert_equal "s1", @message
end

def test_pubsub_with_numpat_subcommand
@subscribed = false
thread = Thread.new do
r.psubscribe("f*") do |on|
on.psubscribe { |_channel, _total| @subscribed = true }
on.pmessage { |_pattern, _channel, _message| r.punsubscribe }
end
end
Thread.pass until @subscribed
redis = Redis.new(OPTIONS)
numpat_result = redis.pubsub(:numpat)

redis.publish("foo", "s1")
thread.join

assert_equal redis.pubsub(:numpat), 0
assert_equal numpat_result, 1
end

def test_pubsub_with_channels_and_numsub_subcommnads
@subscribed = false
thread = Thread.new do
r.subscribe("foo") do |on|
thread = new_thread do |r|
r.subscribe(channel_name) do |on|
on.subscribe { |_channel, _total| @subscribed = true }
on.message { |_channel, _message| r.unsubscribe }
end
end
Thread.pass until @subscribed
redis = Redis.new(OPTIONS)
channels_result = redis.pubsub(:channels)
channels_result.delete('__sentinel__:hello')
numsub_result = redis.pubsub(:numsub, 'foo', 'boo')
numsub_result = redis.pubsub(:numsub, channel_name, 'boo')

redis.publish("foo", "s1")
redis.publish(channel_name, "s1")
thread.join

assert_equal channels_result, ['foo']
assert_equal numsub_result, ['foo', 1, 'boo', 0]
assert_includes channels_result, channel_name
assert_equal [channel_name, 1, 'boo', 0], numsub_result
end

def test_subscribe_connection_usable_after_raise
@subscribed = false

thread = Thread.new do
r.subscribe("foo") do |on|
thread = new_thread do |r|
r.subscribe(channel_name) do |on|
on.subscribe do |_channel, _total|
@subscribed = true
end

on.message do |_channel, _message|
r.unsubscribe
raise TestError
end
end
Expand All @@ -145,7 +143,7 @@ def test_subscribe_connection_usable_after_raise
# Wait until the subscription is active before publishing
Thread.pass until @subscribed

Redis.new(OPTIONS).publish("foo", "s1")
redis.publish(channel_name, "s1")

thread.join

Expand All @@ -155,8 +153,8 @@ def test_subscribe_connection_usable_after_raise
def test_psubscribe_connection_usable_after_raise
@subscribed = false

thread = Thread.new do
r.psubscribe("f*") do |on|
thread = new_thread do |r|
r.psubscribe("channel:*") do |on|
on.psubscribe do |_pattern, _total|
@subscribed = true
end
Expand All @@ -171,7 +169,7 @@ def test_psubscribe_connection_usable_after_raise
# Wait until the subscription is active before publishing
Thread.pass until @subscribed

Redis.new(OPTIONS).publish("foo", "s1")
redis.publish(channel_name, "s1")

thread.join

Expand All @@ -181,34 +179,34 @@ def test_psubscribe_connection_usable_after_raise
def test_subscribe_within_subscribe
@channels = []

thread = Thread.new do
r.subscribe("foo") do |on|
thread = new_thread do |r|
r.subscribe(channel_name) do |on|
on.subscribe do |channel, _total|
@channels << channel

r.subscribe("bar") if channel == "foo"
r.subscribe("bar") if channel == channel_name
r.unsubscribe if channel == "bar"
end
end
end

thread.join

assert_equal ["foo", "bar"], @channels
assert_equal [channel_name, "bar"], @channels
end

def test_other_commands_within_a_subscribe
r.subscribe("foo") do |on|
r.subscribe(channel_name) do |on|
on.subscribe do |_channel, _total|
r.set("bar", "s2")
r.unsubscribe("foo")
r.unsubscribe(channel_name)
end
end
end

def test_subscribe_without_a_block
assert_raises LocalJumpError do
r.subscribe("foo")
r.subscribe(channel_name)
end
end

Expand Down Expand Up @@ -245,7 +243,7 @@ def test_subscribe_past_a_timeout
def test_subscribe_with_timeout
received = false

r.subscribe_with_timeout(LOW_TIMEOUT, "foo") do |on|
r.subscribe_with_timeout(LOW_TIMEOUT, channel_name) do |on|
on.message do |_channel, _message|
received = true
end
Expand All @@ -257,12 +255,26 @@ def test_subscribe_with_timeout
def test_psubscribe_with_timeout
received = false

r.psubscribe_with_timeout(LOW_TIMEOUT, "f*") do |on|
r.psubscribe_with_timeout(LOW_TIMEOUT, "channel:*") do |on|
on.message do |_channel, _message|
received = true
end
end

refute received
end

private

def new_thread(&block)
redis = Redis.new(OPTIONS)
thread = Thread.new(redis, &block)
thread.report_on_exception = false
@threads[thread] = redis
thread
end

def channel_name
@channel_name ||= "channel:#{rand}"
end
end

0 comments on commit bb795e2

Please sign in to comment.