diff --git a/config/test.exs b/config/test.exs index 900b6fd..6b0105e 100644 --- a/config/test.exs +++ b/config/test.exs @@ -45,6 +45,11 @@ config :amqpx, handler_module: Amqpx.Test.Support.HandleRejectionConsumer, backoff: 10 }, + %{ + handler_module: Amqpx.Test.Support.NoRequeueConsumer, + backoff: 10, + requeue_on_reject: false + }, %{ handler_module: Amqpx.Test.Support.ConsumerConnectionTwo, connection_name: ConnectionTwo @@ -97,6 +102,17 @@ config :amqpx, Amqpx.Test.Support.HandleRejectionConsumer, %{ ] } +config :amqpx, Amqpx.Test.Support.NoRequeueConsumer, %{ + queue: "test-no-requeue", + exchanges: [ + %{ + name: "topic-no-requeue", + type: :topic, + routing_keys: ["amqpx.test-no-requeue"] + } + ] +} + config :amqpx, Amqpx.Test.Support.ConsumerConnectionTwo, %{ queue: "connection-two", exchanges: [ diff --git a/lib/amqp/gen/consumer.ex b/lib/amqp/gen/consumer.ex index 037e123..3c9f334 100644 --- a/lib/amqp/gen/consumer.ex +++ b/lib/amqp/gen/consumer.ex @@ -13,7 +13,8 @@ defmodule Amqpx.Gen.Consumer do :handler_state, prefetch_count: 50, backoff: 5_000, - connection_name: Amqpx.Gen.ConnectionManager + connection_name: Amqpx.Gen.ConnectionManager, + requeue_on_reject: true ] @type state() :: %__MODULE__{} @@ -191,7 +192,8 @@ defmodule Amqpx.Gen.Consumer do %__MODULE__{ handler_module: handler_module, handler_state: handler_state, - backoff: backoff + backoff: backoff, + requeue_on_reject: requeue_on_reject } = state ) do {:ok, handler_state} = handler_module.handle_message(message, meta, handler_state) @@ -206,13 +208,13 @@ defmodule Amqpx.Gen.Consumer do is_message_to_reject = function_exported?(handler_module, :handle_message_rejection, 2) && - redelivered + (!requeue_on_reject || (redelivered && requeue_on_reject)) if is_message_to_reject do handler_module.handle_message_rejection(message, e) end - Basic.reject(state.channel, tag, requeue: !redelivered) + Basic.reject(state.channel, tag, requeue: requeue_on_reject && !redelivered) end) state diff --git a/test/gen_test.exs b/test/gen_test.exs index a00f72c..df027e8 100644 --- a/test/gen_test.exs +++ b/test/gen_test.exs @@ -4,6 +4,7 @@ defmodule Amqpx.Test.AmqpxTest do alias Amqpx.Test.Support.Consumer1 alias Amqpx.Test.Support.Consumer2 alias Amqpx.Test.Support.HandleRejectionConsumer + alias Amqpx.Test.Support.NoRequeueConsumer alias Amqpx.Test.Support.ConsumerConnectionTwo alias Amqpx.Test.Support.Producer1 alias Amqpx.Test.Support.Producer2 @@ -96,17 +97,93 @@ defmodule Amqpx.Test.AmqpxTest do test "e2e: should handle message rejected when handle message fails" do test_pid = self() - error_message = "test_error" with_mock(HandleRejectionConsumer, - handle_message: fn _, _, _ -> raise error_message end, - handle_message_rejection: fn _, error -> send(test_pid, {:ok, error.message}) end + handle_message: fn _, _, _ -> + mock_called = + case Process.get(:times_mock_called) do + nil -> 1 + n -> n + 1 + end + + Process.put(:times_mock_called, mock_called) + + raise "test_error ##{mock_called}" + end, + handle_message_rejection: fn _, error -> + send(test_pid, {:ok, error.message}) + end ) do publish_result = Amqpx.Gen.Producer.publish("topic-rejection", "amqpx.test-rejection", "some-message", redeliver: false) assert publish_result == :ok - assert_receive {:ok, ^error_message}, 1_000 + + # ensure handle_message_rejection is called only the second time + assert_receive {:ok, "test_error #2"}, 1_000 + end + end + + test "e2e: messages should not be re-enqueued when re-enqueue option is disabled" do + test_pid = self() + error_message = "test_error" + + with_mock(NoRequeueConsumer, + handle_message: fn _, _, _ -> + mock_called = + case Process.get(:times_mock_called) do + nil -> 1 + n -> n + 1 + end + + Process.put(:times_mock_called, mock_called) + + send(test_pid, {:handled_message, mock_called}) + raise error_message + end, + handle_message_rejection: fn _, _ -> :ok end + ) do + :ok = Amqpx.Gen.Producer.publish("topic-no-requeue", "amqpx.test-no-requeue", "some-message", redeliver: false) + assert_receive {:handled_message, 1} + refute_receive {:handled_message, 2} + end + end + + test "e2e: handle_message_reject should be called upon first time when re-enqueue option is disabled" do + test_pid = self() + error_message = "test-error-requeue" + + with_mock(NoRequeueConsumer, + handle_message: fn _, _, _ -> + mock_called = + case Process.get(:times_mock_handle_message_called) do + nil -> 1 + n -> n + 1 + end + + Process.put(:times_mock_handle_message_called, mock_called) + + raise "#{error_message} ##{mock_called}" + end, + handle_message_rejection: fn _, error -> + mock_called = + case Process.get(:times_mock_handle_message_rejection_called) do + nil -> 1 + n -> n + 1 + end + + Process.put(:times_mock_handle_message_rejection_called, mock_called) + + send(test_pid, {:handled_message, {error.message, mock_called}}) + :ok + end + ) do + :ok = Amqpx.Gen.Producer.publish("topic-no-requeue", "amqpx.test-no-requeue", "some-message", redeliver: false) + + # ensure the handle_message_rejection is called exactly one time after the handle_message call. + err_msg = {"#{error_message} #1", 1} + assert_receive {:handled_message, ^err_msg} + refute_receive {:handled_message, _}, 1_000 end end diff --git a/test/support/consumer/consumer_no_requeue.ex b/test/support/consumer/consumer_no_requeue.ex new file mode 100644 index 0000000..26be4b9 --- /dev/null +++ b/test/support/consumer/consumer_no_requeue.ex @@ -0,0 +1,21 @@ +defmodule Amqpx.Test.Support.NoRequeueConsumer do + @moduledoc nil + @behaviour Amqpx.Gen.Consumer + + alias Amqpx.Basic + alias Amqpx.Helper + + def setup(channel) do + Helper.declare(channel, Application.fetch_env!(:amqpx, __MODULE__)) + Basic.consume(channel, Application.fetch_env!(:amqpx, __MODULE__)[:queue], self()) + {:ok, %{}} + end + + def handle_message_rejection(_msg, _err) do + :ok + end + + def handle_message(_payload, _meta, _state) do + raise "test error" + end +end