From 51d2b515cb3bca66d280df4325974ea60bd96557 Mon Sep 17 00:00:00 2001 From: Ygor Castor Date: Mon, 27 Jul 2020 16:02:27 +0200 Subject: [PATCH] Making EventStore persistent subscriber options configurable --- docker-compose.yml | 10 +++ .../adapters/extreme/subscription.ex | 50 +++++++++++--- .../extreme/subscriptions_supervisor.ex | 3 +- lib/commanded/event_store/extreme.ex | 36 +++++++++- mix.exs | 2 +- test/event_store/subscription_test.exs | 69 +++++++++++++++++++ 6 files changed, 156 insertions(+), 14 deletions(-) create mode 100644 docker-compose.yml diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..5d0d442 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,10 @@ +version: "3" +services: + eventstore: + image: eventstore/eventstore + environment: + - EVENTSTORE_RUN_PROJECTIONS=all + - EVENTSTORE_START_STANDARD_PROJECTIONS=true + ports: + - "2113:2113" + - "1113:1113" diff --git a/lib/commanded/event_store/adapters/extreme/subscription.ex b/lib/commanded/event_store/adapters/extreme/subscription.ex index 64b92c4..8ec8bf6 100644 --- a/lib/commanded/event_store/adapters/extreme/subscription.ex +++ b/lib/commanded/event_store/adapters/extreme/subscription.ex @@ -22,6 +22,16 @@ defmodule Commanded.EventStore.Adapters.Extreme.Subscription do :serializer, :stream, :start_from, + :message_timeout_milliseconds, + :record_statistics, + :live_buffer_size, + :read_batch_size, + :buffer_size, + :max_retry_count, + :prefer_round_robin, + :checkpoint_after_time, + :checkpoint_max_count, + :checkpoint_min_count, :subscriber_max_count, :subscriber, :subscriber_ref, @@ -44,6 +54,16 @@ defmodule Commanded.EventStore.Adapters.Extreme.Subscription do serializer: serializer, subscriber: subscriber, start_from: Keyword.get(opts, :start_from), + message_timeout_milliseconds: Keyword.get(opts, :message_timeout_milliseconds, 10_000), + record_statistics: Keyword.get(opts, :record_statistics, false), + live_buffer_size: Keyword.get(opts, :live_buffer_size, 500), + read_batch_size: Keyword.get(opts, :read_batch_size, 20), + buffer_size: Keyword.get(opts, :buffer_size, 500), + max_retry_count: Keyword.get(opts, :max_retry_count, 10), + prefer_round_robin: Keyword.get(opts, :prefer_round_robin, false), + checkpoint_after_time: Keyword.get(opts, :checkpoint_after_time, 1_000), + checkpoint_max_count: Keyword.get(opts, :checkpoint_max_count, 500), + checkpoint_min_count: Keyword.get(opts, :checkpoint_min_count, 1), subscriber_max_count: Keyword.get(opts, :subscriber_max_count, 1), retry_interval: subscription_retry_interval() } @@ -201,6 +221,16 @@ defmodule Commanded.EventStore.Adapters.Extreme.Subscription do name: name, stream: stream, start_from: start_from, + message_timeout_milliseconds: message_timeout_milliseconds, + record_statistics: record_statistics, + live_buffer_size: live_buffer_size, + read_batch_size: read_batch_size, + buffer_size: buffer_size, + max_retry_count: max_retry_count, + prefer_round_robin: prefer_round_robin, + checkpoint_after_time: checkpoint_after_time, + checkpoint_max_count: checkpoint_max_count, + checkpoint_min_count: checkpoint_min_count, subscriber_max_count: subscriber_max_count } = state @@ -217,16 +247,16 @@ defmodule Commanded.EventStore.Adapters.Extreme.Subscription do event_stream_id: stream, resolve_link_tos: true, start_from: start_from, - message_timeout_milliseconds: 10_000, - record_statistics: false, - live_buffer_size: 500, - read_batch_size: 20, - buffer_size: 500, - max_retry_count: 10, - prefer_round_robin: false, - checkpoint_after_time: 1_000, - checkpoint_max_count: 500, - checkpoint_min_count: 1, + message_timeout_milliseconds: message_timeout_milliseconds, + record_statistics: record_statistics, + live_buffer_size: live_buffer_size, + read_batch_size: read_batch_size, + buffer_size: buffer_size, + max_retry_count: max_retry_count, + prefer_round_robin: prefer_round_robin, + checkpoint_after_time: checkpoint_after_time, + checkpoint_max_count: checkpoint_max_count, + checkpoint_min_count: checkpoint_min_count, subscriber_max_count: subscriber_max_count ) diff --git a/lib/commanded/event_store/adapters/extreme/subscriptions_supervisor.ex b/lib/commanded/event_store/adapters/extreme/subscriptions_supervisor.ex index 85f28a3..05d21f6 100644 --- a/lib/commanded/event_store/adapters/extreme/subscriptions_supervisor.ex +++ b/lib/commanded/event_store/adapters/extreme/subscriptions_supervisor.ex @@ -47,12 +47,13 @@ defmodule Commanded.EventStore.Adapters.Extreme.SubscriptionsSupervisor do {:error, {:already_started, _pid}} -> case Keyword.get(opts, :subscriber_max_count) do - nil -> + subscriber_count when subscriber_count in [nil, 1] -> {:error, :subscription_already_exists} subscriber_max_count -> if index < subscriber_max_count - 1 do start_subscription( + event_store, stream, subscription_name, subscriber, diff --git a/lib/commanded/event_store/extreme.ex b/lib/commanded/event_store/extreme.ex index 27a1a6c..d302ea8 100644 --- a/lib/commanded/event_store/extreme.ex +++ b/lib/commanded/event_store/extreme.ex @@ -142,6 +142,23 @@ defmodule Commanded.EventStore.Adapters.Extreme do ) end + @impl Commanded.EventStore.Adapter + def subscribe_to(adapter_meta, stream_uuid, subscription_name, subscriber, start_from, options) do + event_store = server_name(adapter_meta) + stream = stream_name(adapter_meta, stream_uuid) + serializer = serializer(adapter_meta) + opts = subscription_options(start_from, options) + + SubscriptionsSupervisor.start_subscription( + event_store, + stream, + subscription_name, + subscriber, + serializer, + opts + ) + end + @impl Commanded.EventStore.Adapter def ack_event(_adapter_meta, subscription, %RecordedEvent{event_number: event_number}) do Subscription.ack(subscription, event_number) @@ -457,10 +474,25 @@ defmodule Commanded.EventStore.Adapters.Extreme do end end - defp subscription_options(start_from) do + defp subscription_options(start_from, options \\ []) do [ - start_from: start_from + start_from: start_from, + message_timeout_milliseconds: Keyword.get(options, :message_timeout_milliseconds), + record_statistics: Keyword.get(options, :record_statistics), + live_buffer_size: Keyword.get(options, :live_buffer_size), + read_batch_size: Keyword.get(options, :read_batch_size), + buffer_size: Keyword.get(options, :buffer_size), + max_retry_count: Keyword.get(options, :max_retry_count), + prefer_round_robin: Keyword.get(options, :prefer_round_robin), + checkpoint_after_time: Keyword.get(options, :checkpoint_after_time), + checkpoint_max_count: Keyword.get(options, :checkpoint_max_count), + checkpoint_min_count: Keyword.get(options, :checkpoint_min_count), + subscriber_max_count: Keyword.get(options, :subscriber_max_count) ] + |> Enum.reject(fn elmn -> + {_, value} = elmn + is_nil(value) + end) end # Event store supports the following special values for expected version: diff --git a/mix.exs b/mix.exs index a4f5cc4..b2bd3ee 100644 --- a/mix.exs +++ b/mix.exs @@ -37,7 +37,7 @@ defmodule Commanded.EventStore.Adapters.Extreme.Mixfile do defp deps do [ - {:commanded, "~> 1.1"}, + {:commanded, "~> 1.2"}, {:extreme, "~> 0.13"}, # Optional dependencies diff --git a/test/event_store/subscription_test.exs b/test/event_store/subscription_test.exs index ab3707d..40d8506 100644 --- a/test/event_store/subscription_test.exs +++ b/test/event_store/subscription_test.exs @@ -4,5 +4,74 @@ defmodule Commanded.EventStore.Adapters.Extreme.SubscriptionTest do use Commanded.ExtremeTestCase use Commanded.EventStore.SubscriptionTestCase, event_store: Extreme + describe "multiple subscribers to stream" do + test "should receive `:subscribed` message if the subscriber limit is not reached ", %{ + event_store: event_store, + event_store_meta: event_store_meta + } do + eventstore_options = [subscriber_max_count: 2] + + {:ok, first_subscription} = + event_store.subscribe_to( + event_store_meta, + "stream_multiple_1", + "subscriber", + self(), + :origin, + eventstore_options + ) + + {:ok, second_subscription} = + event_store.subscribe_to( + event_store_meta, + "stream_multiple_1", + "subscriber", + self(), + :origin, + eventstore_options + ) + + assert_receive {:subscribed, ^first_subscription} + assert_receive {:subscribed, ^second_subscription} + end + + test "should receive `:too_many_subscribers` message if the subscriber limit is reached ", %{ + event_store: event_store, + event_store_meta: event_store_meta + } do + eventstore_options = [subscriber_max_count: 2] + + {:ok, _first_subscription} = + event_store.subscribe_to( + event_store_meta, + "stream_multiple_2", + "subscriber", + self(), + :origin, + eventstore_options + ) + + {:ok, _second_subscription} = + event_store.subscribe_to( + event_store_meta, + "stream_multiple_2", + "subscriber", + self(), + :origin, + eventstore_options + ) + + assert {:error, :too_many_subscribers} == + event_store.subscribe_to( + event_store_meta, + "stream_multiple_2", + "subscriber", + self(), + :origin, + eventstore_options + ) + end + end + defp event_store_wait(_default \\ nil), do: 5_000 end