From bb795e26cce07995ca47d950f78cca78274f9957 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Wed, 17 Aug 2022 15:01:00 +0200 Subject: [PATCH] Stabilize the jruby test suite Some threads would stay subscribed to various channel causing state leak across threads. It's still not perfect but should be much more stable. --- lib/redis/subscribe.rb | 4 + test/helper.rb | 2 +- test/redis/publish_subscribe_test.rb | 112 +++++++++++++++------------ 3 files changed, 67 insertions(+), 51 deletions(-) diff --git a/lib/redis/subscribe.rb b/lib/redis/subscribe.rb index de4c6d300..41d612167 100644 --- a/lib/redis/subscribe.rb +++ b/lib/redis/subscribe.rb @@ -34,6 +34,10 @@ def punsubscribe(*channels) call_v([:punsubscribe, *channels]) end + def close + @client.close + end + protected def subscription(start, stop, channels, block, timeout = 0) diff --git a/test/helper.rb b/test/helper.rb index ef5af88d3..38344777e 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -116,7 +116,7 @@ def setup end def teardown - redis&.quit + redis&.close super end diff --git a/test/redis/publish_subscribe_test.rb b/test/redis/publish_subscribe_test.rb index 04a771328..ee3d99961 100644 --- a/test/redis/publish_subscribe_test.rb +++ b/test/redis/publish_subscribe_test.rb @@ -5,6 +5,26 @@ class TestPublishSubscribe < Minitest::Test include Helper::Client + def setup + @threads = {} + super + end + + def teardown + super + @threads.each do |thread, redis| + if redis.subscribed? + redis.unsubscribe + redis.punsubscribe + end + redis.close + begin + thread.join(2) or warn("leaked thread") + rescue RedisClient::ConnectionError + end + end + end + class TestError < StandardError end @@ -12,8 +32,8 @@ def test_subscribe_and_unsubscribe @subscribed = false @unsubscribed = false - thread = Thread.new do - r.subscribe("foo") do |on| + thread = new_thread do |r| + r.subscribe(channel_name) do |on| on.subscribe do |_channel, total| @subscribed = true @t1 = total @@ -36,8 +56,7 @@ def test_subscribe_and_unsubscribe # Wait until the subscription is active before publishing Thread.pass until @subscribed - Redis.new(OPTIONS).publish("foo", "s1") - + redis.publish(channel_name, "s1") thread.join assert @subscribed @@ -51,8 +70,8 @@ def test_psubscribe_and_punsubscribe @subscribed = false @unsubscribed = false - thread = Thread.new do - r.psubscribe("f*") do |on| + thread = new_thread do |r| + r.psubscribe("channel:*") do |on| on.psubscribe do |_pattern, total| @subscribed = true @t1 = total @@ -74,9 +93,7 @@ def test_psubscribe_and_punsubscribe # Wait until the subscription is active before publishing Thread.pass until @subscribed - - Redis.new(OPTIONS).publish("foo", "s1") - + redis.publish(channel_name, "s1") thread.join assert @subscribed @@ -86,56 +103,37 @@ def test_psubscribe_and_punsubscribe assert_equal "s1", @message end - def test_pubsub_with_numpat_subcommand - @subscribed = false - thread = Thread.new do - r.psubscribe("f*") do |on| - on.psubscribe { |_channel, _total| @subscribed = true } - on.pmessage { |_pattern, _channel, _message| r.punsubscribe } - end - end - Thread.pass until @subscribed - redis = Redis.new(OPTIONS) - numpat_result = redis.pubsub(:numpat) - - redis.publish("foo", "s1") - thread.join - - assert_equal redis.pubsub(:numpat), 0 - assert_equal numpat_result, 1 - end - def test_pubsub_with_channels_and_numsub_subcommnads @subscribed = false - thread = Thread.new do - r.subscribe("foo") do |on| + thread = new_thread do |r| + r.subscribe(channel_name) do |on| on.subscribe { |_channel, _total| @subscribed = true } on.message { |_channel, _message| r.unsubscribe } end end Thread.pass until @subscribed - redis = Redis.new(OPTIONS) channels_result = redis.pubsub(:channels) channels_result.delete('__sentinel__:hello') - numsub_result = redis.pubsub(:numsub, 'foo', 'boo') + numsub_result = redis.pubsub(:numsub, channel_name, 'boo') - redis.publish("foo", "s1") + redis.publish(channel_name, "s1") thread.join - assert_equal channels_result, ['foo'] - assert_equal numsub_result, ['foo', 1, 'boo', 0] + assert_includes channels_result, channel_name + assert_equal [channel_name, 1, 'boo', 0], numsub_result end def test_subscribe_connection_usable_after_raise @subscribed = false - thread = Thread.new do - r.subscribe("foo") do |on| + thread = new_thread do |r| + r.subscribe(channel_name) do |on| on.subscribe do |_channel, _total| @subscribed = true end on.message do |_channel, _message| + r.unsubscribe raise TestError end end @@ -145,7 +143,7 @@ def test_subscribe_connection_usable_after_raise # Wait until the subscription is active before publishing Thread.pass until @subscribed - Redis.new(OPTIONS).publish("foo", "s1") + redis.publish(channel_name, "s1") thread.join @@ -155,8 +153,8 @@ def test_subscribe_connection_usable_after_raise def test_psubscribe_connection_usable_after_raise @subscribed = false - thread = Thread.new do - r.psubscribe("f*") do |on| + thread = new_thread do |r| + r.psubscribe("channel:*") do |on| on.psubscribe do |_pattern, _total| @subscribed = true end @@ -171,7 +169,7 @@ def test_psubscribe_connection_usable_after_raise # Wait until the subscription is active before publishing Thread.pass until @subscribed - Redis.new(OPTIONS).publish("foo", "s1") + redis.publish(channel_name, "s1") thread.join @@ -181,12 +179,12 @@ def test_psubscribe_connection_usable_after_raise def test_subscribe_within_subscribe @channels = [] - thread = Thread.new do - r.subscribe("foo") do |on| + thread = new_thread do |r| + r.subscribe(channel_name) do |on| on.subscribe do |channel, _total| @channels << channel - r.subscribe("bar") if channel == "foo" + r.subscribe("bar") if channel == channel_name r.unsubscribe if channel == "bar" end end @@ -194,21 +192,21 @@ def test_subscribe_within_subscribe thread.join - assert_equal ["foo", "bar"], @channels + assert_equal [channel_name, "bar"], @channels end def test_other_commands_within_a_subscribe - r.subscribe("foo") do |on| + r.subscribe(channel_name) do |on| on.subscribe do |_channel, _total| r.set("bar", "s2") - r.unsubscribe("foo") + r.unsubscribe(channel_name) end end end def test_subscribe_without_a_block assert_raises LocalJumpError do - r.subscribe("foo") + r.subscribe(channel_name) end end @@ -245,7 +243,7 @@ def test_subscribe_past_a_timeout def test_subscribe_with_timeout received = false - r.subscribe_with_timeout(LOW_TIMEOUT, "foo") do |on| + r.subscribe_with_timeout(LOW_TIMEOUT, channel_name) do |on| on.message do |_channel, _message| received = true end @@ -257,7 +255,7 @@ def test_subscribe_with_timeout def test_psubscribe_with_timeout received = false - r.psubscribe_with_timeout(LOW_TIMEOUT, "f*") do |on| + r.psubscribe_with_timeout(LOW_TIMEOUT, "channel:*") do |on| on.message do |_channel, _message| received = true end @@ -265,4 +263,18 @@ def test_psubscribe_with_timeout refute received end + + private + + def new_thread(&block) + redis = Redis.new(OPTIONS) + thread = Thread.new(redis, &block) + thread.report_on_exception = false + @threads[thread] = redis + thread + end + + def channel_name + @channel_name ||= "channel:#{rand}" + end end