diff --git a/t/integration/test_canvas.py b/t/integration/test_canvas.py index 27b5a06f760..34368cdb616 100644 --- a/t/integration/test_canvas.py +++ b/t/integration/test_canvas.py @@ -17,25 +17,32 @@ return_exception, return_priority, second_order_replace1, tsum) +RETRYABLE_EXCEPTIONS = (OSError, ConnectionError, TimeoutError) + + +def is_retryable_exception(exc): + return isinstance(exc, RETRYABLE_EXCEPTIONS) + + TIMEOUT = 120 class test_link_error: - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_link_error_eager(self): exception = ExpectedException("Task expected to fail", "test") result = fail.apply(args=("test", ), link_error=return_exception.s()) actual = result.get(timeout=TIMEOUT, propagate=False) assert actual == exception - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_link_error(self): exception = ExpectedException("Task expected to fail", "test") result = fail.apply(args=("test", ), link_error=return_exception.s()) actual = result.get(timeout=TIMEOUT, propagate=False) assert actual == exception - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_link_error_callback_error_callback_retries_eager(self): exception = ExpectedException("Task expected to fail", "test") result = fail.apply( @@ -44,7 +51,7 @@ def test_link_error_callback_error_callback_retries_eager(self): ) assert result.get(timeout=TIMEOUT, propagate=False) == exception - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_link_error_callback_retries(self): exception = ExpectedException("Task expected to fail", "test") result = fail.apply_async( @@ -53,7 +60,7 @@ def test_link_error_callback_retries(self): ) assert result.get(timeout=TIMEOUT, propagate=False) == exception - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_link_error_using_signature_eager(self): fail = signature('t.integration.tasks.fail', args=("test", )) retrun_exception = signature('t.integration.tasks.return_exception') @@ -63,7 +70,7 @@ def test_link_error_using_signature_eager(self): exception = ExpectedException("Task expected to fail", "test") assert (fail.apply().get(timeout=TIMEOUT, propagate=False), True) == (exception, True) - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_link_error_using_signature(self): fail = signature('t.integration.tasks.fail', args=("test", )) retrun_exception = signature('t.integration.tasks.return_exception') @@ -76,17 +83,17 @@ def test_link_error_using_signature(self): class test_chain: - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_simple_chain(self, manager): c = add.s(4, 4) | add.s(8) | add.s(16) assert c().get(timeout=TIMEOUT) == 32 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_single_chain(self, manager): c = chain(add.s(3, 4))() assert c.get(timeout=TIMEOUT) == 7 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_complex_chain(self, manager): c = ( add.s(2, 2) | ( @@ -97,7 +104,7 @@ def test_complex_chain(self, manager): res = c() assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_group_results_in_chain(self, manager): # This adds in an explicit test for the special case added in commit # 1e3fcaa969de6ad32b52a3ed8e74281e5e5360e6 @@ -129,7 +136,7 @@ def test_chain_on_error(self, manager): with pytest.raises(ExpectedException): res.parent.get(propagate=True) - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_chain_inside_group_receives_arguments(self, manager): c = ( add.s(5, 6) | @@ -138,7 +145,7 @@ def test_chain_inside_group_receives_arguments(self, manager): res = c() assert res.get(timeout=TIMEOUT) == [14, 14] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_eager_chain_inside_task(self, manager): from .tasks import chain_add @@ -149,7 +156,7 @@ def test_eager_chain_inside_task(self, manager): chain_add.app.conf.task_always_eager = prev - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_group_chord_group_chain(self, manager): from celery.five import bytes_if_py2 @@ -176,7 +183,7 @@ def test_group_chord_group_chain(self, manager): assert set(redis_messages[4:]) == after_items redis_connection.delete('redis-echo') - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_group_result_not_has_cache(self, manager): t1 = identity.si(1) t2 = identity.si(2) @@ -186,7 +193,7 @@ def test_group_result_not_has_cache(self, manager): result = task.delay() assert result.get(timeout=TIMEOUT) == [1, 2, [3, 4]] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_second_order_replace(self, manager): from celery.five import bytes_if_py2 @@ -206,7 +213,7 @@ def test_second_order_replace(self, manager): expected_messages = [b'In A', b'In B', b'In/Out C', b'Out B', b'Out A'] assert redis_messages == expected_messages - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_parent_ids(self, manager, num=10): assert_ping(manager) @@ -274,7 +281,7 @@ def test_chain_error_handler_with_eta(self, manager): result = c.get() assert result == 10 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_groupresult_serialization(self, manager): """Test GroupResult is correctly serialized to save in the result backend""" @@ -288,7 +295,7 @@ def test_groupresult_serialization(self, manager): assert len(result) == 2 assert isinstance(result[0][1], list) - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_chain_of_task_a_group_and_a_chord(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -303,7 +310,7 @@ def test_chain_of_task_a_group_and_a_chord(self, manager): res = c() assert res.get(timeout=TIMEOUT) == 8 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_chain_of_chords_as_groups_chained_to_a_task_with_two_tasks(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -320,7 +327,7 @@ def test_chain_of_chords_as_groups_chained_to_a_task_with_two_tasks(self, manage res = c() assert res.get(timeout=TIMEOUT) == 12 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_chain_of_chords_with_two_tasks(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -336,7 +343,7 @@ def test_chain_of_chords_with_two_tasks(self, manager): res = c() assert res.get(timeout=TIMEOUT) == 12 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_chain_of_a_chord_and_a_group_with_two_tasks(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -352,7 +359,7 @@ def test_chain_of_a_chord_and_a_group_with_two_tasks(self, manager): res = c() assert res.get(timeout=TIMEOUT) == [6, 6] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_chain_of_a_chord_and_a_task_and_a_group(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -367,7 +374,7 @@ def test_chain_of_a_chord_and_a_task_and_a_group(self, manager): res = c() assert res.get(timeout=TIMEOUT) == [6, 6] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_chain_of_a_chord_and_two_tasks_and_a_group(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -383,7 +390,7 @@ def test_chain_of_a_chord_and_two_tasks_and_a_group(self, manager): res = c() assert res.get(timeout=TIMEOUT) == [7, 7] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_chain_of_a_chord_and_three_tasks_and_a_group(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -403,14 +410,14 @@ def test_chain_of_a_chord_and_three_tasks_and_a_group(self, manager): class test_result_set: - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_result_set(self, manager): assert_ping(manager) rs = ResultSet([add.delay(1, 1), add.delay(2, 2)]) assert rs.get(timeout=TIMEOUT) == [2, 4] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_result_set_error(self, manager): assert_ping(manager) @@ -422,7 +429,7 @@ def test_result_set_error(self, manager): class test_group: - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_ready_with_exception(self, manager): if not manager.app.conf.result_backend.startswith('redis'): raise pytest.skip('Requires redis result backend.') @@ -432,7 +439,7 @@ def test_ready_with_exception(self, manager): while not result.ready(): pass - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_empty_group_result(self, manager): if not manager.app.conf.result_backend.startswith('redis'): raise pytest.skip('Requires redis result backend.') @@ -444,7 +451,7 @@ def test_empty_group_result(self, manager): task = GroupResult.restore(result.id) assert task.results == [] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_parent_ids(self, manager): assert_ping(manager) @@ -464,7 +471,7 @@ def test_parent_ids(self, manager): assert parent_id == expected_parent_id assert value == i + 2 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_nested_group(self, manager): assert_ping(manager) @@ -482,7 +489,7 @@ def test_nested_group(self, manager): assert res.get(timeout=TIMEOUT) == [11, 101, 1001, 2001] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_large_group(self, manager): assert_ping(manager) @@ -507,7 +514,7 @@ def assert_ping(manager): class test_chord: - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_redis_subscribed_channels_leak(self, manager): if not manager.app.conf.result_backend.startswith('redis'): raise pytest.skip('Requires redis result backend.') @@ -548,7 +555,7 @@ def test_redis_subscribed_channels_leak(self, manager): assert channels_after_count == initial_channels_count assert set(channels_after) == set(initial_channels) - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_replaced_nested_chord(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -568,7 +575,7 @@ def test_replaced_nested_chord(self, manager): res1 = c1() assert res1.get(timeout=TIMEOUT) == [29, 38] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_add_to_chord(self, manager): if not manager.app.conf.result_backend.startswith('redis'): raise pytest.skip('Requires redis result backend.') @@ -577,7 +584,7 @@ def test_add_to_chord(self, manager): res = c() assert res.get() == [0, 5, 6, 7] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_add_chord_to_chord(self, manager): if not manager.app.conf.result_backend.startswith('redis'): raise pytest.skip('Requires redis result backend.') @@ -586,7 +593,7 @@ def test_add_chord_to_chord(self, manager): res = c() assert res.get() == [0, 5 + 6 + 7] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_eager_chord_inside_task(self, manager): from .tasks import chord_add @@ -597,7 +604,7 @@ def test_eager_chord_inside_task(self, manager): chord_add.app.conf.task_always_eager = prev - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_group_chain(self, manager): if not manager.app.conf.result_backend.startswith('redis'): raise pytest.skip('Requires redis result backend.') @@ -609,7 +616,7 @@ def test_group_chain(self, manager): res = c() assert res.get(timeout=TIMEOUT) == [12, 13, 14, 15] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_nested_group_chain(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -635,7 +642,7 @@ def test_nested_group_chain(self, manager): res = c() assert res.get(timeout=TIMEOUT) == 11 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_single_task_header(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -664,7 +671,7 @@ def test_empty_header_chord(self, manager): res2 = c2() assert res2.get(timeout=TIMEOUT) == [] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_nested_chord(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -698,7 +705,7 @@ def test_nested_chord(self, manager): res = c() assert [[[[3, 3], 4], 5], 6] == res.get(timeout=TIMEOUT) - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_parent_ids(self, manager): if not manager.app.conf.result_backend.startswith('redis'): raise pytest.skip('Requires redis result backend.') @@ -713,7 +720,7 @@ def test_parent_ids(self, manager): ) self.assert_parentids_chord(g(), expected_root_id) - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_parent_ids__OR(self, manager): if not manager.app.conf.result_backend.startswith('redis'): raise pytest.skip('Requires redis result backend.') @@ -817,7 +824,7 @@ def test_chord_on_error(self, manager): assert len([cr for cr in chord_results if cr[2] != states.SUCCESS] ) == 1 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_parallel_chords(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -831,7 +838,7 @@ def test_parallel_chords(self, manager): assert r.get(timeout=TIMEOUT) == [10, 10] - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_chord_in_chords_with_chains(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -862,7 +869,7 @@ def test_chord_in_chords_with_chains(self, manager): assert r.get(timeout=TIMEOUT) == 4 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_chain_chord_chain_chord(self, manager): # test for #2573 try: @@ -888,7 +895,7 @@ def test_chain_chord_chain_chord(self, manager): res = c.delay() assert res.get(timeout=TIMEOUT) == 7 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_large_header(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -899,7 +906,7 @@ def test_large_header(self, manager): res = c.delay() assert res.get(timeout=TIMEOUT) == 499500 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_chain_to_a_chord_with_large_header(self, manager): try: manager.app.backend.ensure_chords_allowed() @@ -910,12 +917,12 @@ def test_chain_to_a_chord_with_large_header(self, manager): res = c.delay() assert res.get(timeout=TIMEOUT) == 1000 - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_priority(self, manager): c = chain(return_priority.signature(priority=3))() assert c.get(timeout=TIMEOUT) == "Priority: 3" - @pytest.mark.flaky(reruns=5, reruns_delay=1) + @pytest.mark.flaky(reruns=5, reruns_delay=1, cause=is_retryable_exception) def test_priority_chain(self, manager): c = return_priority.signature(priority=3) | return_priority.signature(priority=5) assert c().get(timeout=TIMEOUT) == "Priority: 5"