Skip to content

Commit

Permalink
Add requeue_on_reject configuration option (#112)
Browse files Browse the repository at this point in the history
* Add requeue_on_reject config option

* Add requeue_on_reject config option

* Make sure to invoke handle_message_rejection when the requeue_on_reject option is set to false, too.

* Improved tests, making sure calls order is checked

* Removed not needed option while testing NoRequeueConsumer
  • Loading branch information
cando authored Jul 15, 2022
1 parent 61f6d85 commit 8396a93
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 8 deletions.
16 changes: 16 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ config :amqpx,
handler_module: Amqpx.Test.Support.HandleRejectionConsumer,
backoff: 10
},
%{
handler_module: Amqpx.Test.Support.NoRequeueConsumer,
backoff: 10,
requeue_on_reject: false
},
%{
handler_module: Amqpx.Test.Support.ConsumerConnectionTwo,
connection_name: ConnectionTwo
Expand Down Expand Up @@ -97,6 +102,17 @@ config :amqpx, Amqpx.Test.Support.HandleRejectionConsumer, %{
]
}

config :amqpx, Amqpx.Test.Support.NoRequeueConsumer, %{
queue: "test-no-requeue",
exchanges: [
%{
name: "topic-no-requeue",
type: :topic,
routing_keys: ["amqpx.test-no-requeue"]
}
]
}

config :amqpx, Amqpx.Test.Support.ConsumerConnectionTwo, %{
queue: "connection-two",
exchanges: [
Expand Down
10 changes: 6 additions & 4 deletions lib/amqp/gen/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ defmodule Amqpx.Gen.Consumer do
:handler_state,
prefetch_count: 50,
backoff: 5_000,
connection_name: Amqpx.Gen.ConnectionManager
connection_name: Amqpx.Gen.ConnectionManager,
requeue_on_reject: true
]

@type state() :: %__MODULE__{}
Expand Down Expand Up @@ -191,7 +192,8 @@ defmodule Amqpx.Gen.Consumer do
%__MODULE__{
handler_module: handler_module,
handler_state: handler_state,
backoff: backoff
backoff: backoff,
requeue_on_reject: requeue_on_reject
} = state
) do
{:ok, handler_state} = handler_module.handle_message(message, meta, handler_state)
Expand All @@ -206,13 +208,13 @@ defmodule Amqpx.Gen.Consumer do

is_message_to_reject =
function_exported?(handler_module, :handle_message_rejection, 2) &&
redelivered
(!requeue_on_reject || (redelivered && requeue_on_reject))

if is_message_to_reject do
handler_module.handle_message_rejection(message, e)
end

Basic.reject(state.channel, tag, requeue: !redelivered)
Basic.reject(state.channel, tag, requeue: requeue_on_reject && !redelivered)
end)

state
Expand Down
85 changes: 81 additions & 4 deletions test/gen_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Amqpx.Test.AmqpxTest do
alias Amqpx.Test.Support.Consumer1
alias Amqpx.Test.Support.Consumer2
alias Amqpx.Test.Support.HandleRejectionConsumer
alias Amqpx.Test.Support.NoRequeueConsumer
alias Amqpx.Test.Support.ConsumerConnectionTwo
alias Amqpx.Test.Support.Producer1
alias Amqpx.Test.Support.Producer2
Expand Down Expand Up @@ -96,17 +97,93 @@ defmodule Amqpx.Test.AmqpxTest do

test "e2e: should handle message rejected when handle message fails" do
test_pid = self()
error_message = "test_error"

with_mock(HandleRejectionConsumer,
handle_message: fn _, _, _ -> raise error_message end,
handle_message_rejection: fn _, error -> send(test_pid, {:ok, error.message}) end
handle_message: fn _, _, _ ->
mock_called =
case Process.get(:times_mock_called) do
nil -> 1
n -> n + 1
end

Process.put(:times_mock_called, mock_called)

raise "test_error ##{mock_called}"
end,
handle_message_rejection: fn _, error ->
send(test_pid, {:ok, error.message})
end
) do
publish_result =
Amqpx.Gen.Producer.publish("topic-rejection", "amqpx.test-rejection", "some-message", redeliver: false)

assert publish_result == :ok
assert_receive {:ok, ^error_message}, 1_000

# ensure handle_message_rejection is called only the second time
assert_receive {:ok, "test_error #2"}, 1_000
end
end

test "e2e: messages should not be re-enqueued when re-enqueue option is disabled" do
test_pid = self()
error_message = "test_error"

with_mock(NoRequeueConsumer,
handle_message: fn _, _, _ ->
mock_called =
case Process.get(:times_mock_called) do
nil -> 1
n -> n + 1
end

Process.put(:times_mock_called, mock_called)

send(test_pid, {:handled_message, mock_called})
raise error_message
end,
handle_message_rejection: fn _, _ -> :ok end
) do
:ok = Amqpx.Gen.Producer.publish("topic-no-requeue", "amqpx.test-no-requeue", "some-message", redeliver: false)
assert_receive {:handled_message, 1}
refute_receive {:handled_message, 2}
end
end

test "e2e: handle_message_reject should be called upon first time when re-enqueue option is disabled" do
test_pid = self()
error_message = "test-error-requeue"

with_mock(NoRequeueConsumer,
handle_message: fn _, _, _ ->
mock_called =
case Process.get(:times_mock_handle_message_called) do
nil -> 1
n -> n + 1
end

Process.put(:times_mock_handle_message_called, mock_called)

raise "#{error_message} ##{mock_called}"
end,
handle_message_rejection: fn _, error ->
mock_called =
case Process.get(:times_mock_handle_message_rejection_called) do
nil -> 1
n -> n + 1
end

Process.put(:times_mock_handle_message_rejection_called, mock_called)

send(test_pid, {:handled_message, {error.message, mock_called}})
:ok
end
) do
:ok = Amqpx.Gen.Producer.publish("topic-no-requeue", "amqpx.test-no-requeue", "some-message", redeliver: false)

# ensure the handle_message_rejection is called exactly one time after the handle_message call.
err_msg = {"#{error_message} #1", 1}
assert_receive {:handled_message, ^err_msg}
refute_receive {:handled_message, _}, 1_000
end
end

Expand Down
21 changes: 21 additions & 0 deletions test/support/consumer/consumer_no_requeue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Amqpx.Test.Support.NoRequeueConsumer do
@moduledoc nil
@behaviour Amqpx.Gen.Consumer

alias Amqpx.Basic
alias Amqpx.Helper

def setup(channel) do
Helper.declare(channel, Application.fetch_env!(:amqpx, __MODULE__))
Basic.consume(channel, Application.fetch_env!(:amqpx, __MODULE__)[:queue], self())
{:ok, %{}}
end

def handle_message_rejection(_msg, _err) do
:ok
end

def handle_message(_payload, _meta, _state) do
raise "test error"
end
end

0 comments on commit 8396a93

Please sign in to comment.