diff --git a/lib/logflare/backends.ex b/lib/logflare/backends.ex index 85b12005c..f27cf0cca 100644 --- a/lib/logflare/backends.ex +++ b/lib/logflare/backends.ex @@ -12,6 +12,7 @@ defmodule Logflare.Backends do alias Logflare.Backends.SourcesSup alias Logflare.Backends.SourceSup + alias Logflare.Buffers.Buffer alias Logflare.Buffers.MemoryBuffer alias Logflare.LogEvent alias Logflare.Repo @@ -129,10 +130,10 @@ defmodule Logflare.Backends do The ingestion pipeline then pulls from the buffer and dispatches log events to the correct backends. """ @type log_param :: map() - @spec ingest_logs(list(log_param()), Source.t()) :: :ok + @spec ingest_logs([log_param()], Source.t()) :: :ok def ingest_logs(log_events, source) do via = via_source(source, :buffer) - MemoryBuffer.add_many(via, log_events) + Buffer.add_many(MemoryBuffer, via, log_events) :ok end diff --git a/lib/logflare/backends/adaptor/postgres_adaptor.ex b/lib/logflare/backends/adaptor/postgres_adaptor.ex index 7805c524d..c87a46389 100644 --- a/lib/logflare/backends/adaptor/postgres_adaptor.ex +++ b/lib/logflare/backends/adaptor/postgres_adaptor.ex @@ -15,6 +15,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do alias Logflare.Backends alias Logflare.Backends.SourceBackend alias Logflare.Backends.SourceDispatcher + alias Logflare.Buffers.Buffer alias Logflare.Buffers.MemoryBuffer @behaviour Logflare.Backends.Adaptor @@ -160,7 +161,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do @impl GenServer def handle_call({:ingest, log_events}, _from, %{config: _config} = state) do - MemoryBuffer.add_many(state.buffer_pid, log_events) + Buffer.add_many(state.buffer_module, state.buffer_pid, log_events) {:reply, :ok, state} end end diff --git a/lib/logflare/backends/adaptor/webhook_adaptor.ex b/lib/logflare/backends/adaptor/webhook_adaptor.ex index bb5e4e44d..2952a39e2 100644 --- a/lib/logflare/backends/adaptor/webhook_adaptor.ex +++ b/lib/logflare/backends/adaptor/webhook_adaptor.ex @@ -8,6 +8,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do alias Logflare.Backends.Adaptor.WebhookAdaptor alias Logflare.Backends.SourceBackend alias Logflare.Backends.SourceDispatcher + alias Logflare.Buffers.Buffer alias Logflare.Buffers.MemoryBuffer @behaviour Logflare.Backends.Adaptor @@ -71,7 +72,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do @impl GenServer def handle_call({:ingest, log_events}, _from, %{config: _config} = state) do # TODO: queue, send concurrently - MemoryBuffer.add_many(state.buffer_pid, log_events) + Buffer.add_many(state.buffer_module, state.buffer_pid, log_events) {:reply, :ok, state} end diff --git a/lib/logflare/buffers/buffer.ex b/lib/logflare/buffers/buffer.ex index d98dcc485..71f82c997 100644 --- a/lib/logflare/buffers/buffer.ex +++ b/lib/logflare/buffers/buffer.ex @@ -2,10 +2,6 @@ defmodule Logflare.Buffers.Buffer do @moduledoc """ Defines a behaviour for a buffer. """ - @doc """ - Adds a payload to the buffer. - """ - @callback add(identifier(), payload :: term()) :: :ok @doc """ Adds a list of payloads to the buffer. @@ -23,12 +19,46 @@ defmodule Logflare.Buffers.Buffer do @callback length(identifier()) :: non_neg_integer() @doc """ - Returns one item from the buffer + Returns multiple items from the buffer + """ + @callback pop_many(identifier(), non_neg_integer()) :: [term()] + + @doc """ + Adds payload to the buffer. + """ + @spec add(module(), identifier(), term()) :: :ok + def add(mod, ident, payload), + do: mod.add_many(ident, [payload]) + + @doc """ + Adds a list of payloads to the buffer. """ - @callback pop(identifier) :: term() + @spec add_many(module(), identifier(), [term()]) :: :ok + def add_many(mod, ident, payloads) when is_list(payloads), + do: mod.add_many(ident, payloads) + + @doc """ + Clears the buffer and removes all enqueued items. + """ + @spec clear(module(), identifier()) :: :ok + def clear(mod, ident), do: mod.clear(ident) + + @doc """ + Returns the length of the buffer + """ + @spec length(module(), identifier()) :: non_neg_integer() + def length(mod, ident), do: mod.length(ident) + + @doc """ + Returns single item from the buffer + """ + @spec pop(module(), identifier()) :: term() + def pop(mod, ident), do: mod.pop_many(ident, 1) @doc """ Returns multiple items from the buffer """ - @callback pop_many(identifier(), non_neg_integer()) :: [term()] + @spec pop_many(module(), identifier(), non_neg_integer()) :: [term()] + def pop_many(mod, ident, count) when is_integer(count) and count > 0, + do: mod.pop_many(ident, count) end diff --git a/lib/logflare/buffers/buffer_producer.ex b/lib/logflare/buffers/buffer_producer.ex index 8dcc152a8..ec4660923 100644 --- a/lib/logflare/buffers/buffer_producer.ex +++ b/lib/logflare/buffers/buffer_producer.ex @@ -4,6 +4,8 @@ defmodule Logflare.Buffers.BufferProducer do """ use GenStage + alias Logflare.Buffers.Buffer + def start_link(opts) do GenStage.start_link(__MODULE__, opts) end @@ -33,11 +35,11 @@ defmodule Logflare.Buffers.BufferProducer do end defp resolve_demand( - %{buffer_module: module, buffer_pid: pid, demand: prev_demand} = state, + %{demand: prev_demand} = state, new_demand \\ 0 ) do total_demand = prev_demand + new_demand - {:ok, items} = module.pop_many(pid, total_demand) + {:ok, items} = Buffer.pop_many(state.buffer_module, state.buffer_pid, total_demand) {items, %{state | demand: total_demand - length(items)}} end diff --git a/lib/logflare/buffers/memory_buffer.ex b/lib/logflare/buffers/memory_buffer.ex index 91628db86..c74907144 100644 --- a/lib/logflare/buffers/memory_buffer.ex +++ b/lib/logflare/buffers/memory_buffer.ex @@ -1,66 +1,65 @@ defmodule Logflare.Buffers.MemoryBuffer do @moduledoc """ This is an implementation of an in-memory buffer, using `:queue`.any() - All operations are syncronous. + All operations are synchronous. """ - alias Logflare.{Buffers.Buffer, Buffers.MemoryBuffer} - use GenServer - @behaviour Buffer - - # GenServer state and init callbacks - defstruct queue: nil - def start_link(opts \\ []) do - GenServer.start_link(__MODULE__, [], opts) - end + @behaviour Logflare.Buffers.Buffer - @impl true - def init(_) do - {:ok, %MemoryBuffer{queue: :queue.new()}} - end + use GenServer + use TypedStruct # API - @impl Buffer - def add(identifier, payload) do - GenServer.call(identifier, {:add, [payload]}) - end - - @impl Buffer + @impl Logflare.Buffers.Buffer def add_many(identifier, payloads) do GenServer.call(identifier, {:add, payloads}) end - @impl Buffer - def pop(identifier) do - GenServer.call(identifier, {:pop, 1}) - end - - @impl Buffer + @impl Logflare.Buffers.Buffer def pop_many(identifier, number) do GenServer.call(identifier, {:pop, number}) end - @impl Buffer + @impl Logflare.Buffers.Buffer def clear(identifier) do GenServer.call(identifier, :clear) end - @impl Buffer + @impl Logflare.Buffers.Buffer def length(identifier) do GenServer.call(identifier, :length) end + # GenServer state and init callbacks + typedstruct module: State do + @moduledoc false + + field :queue, :queue.queue() | nil, default: :queue.new() + field :proc_name, atom() | binary() | pid(), enforce: true + field :size, non_neg_integer(), default: 0 + end + + def start_link(opts \\ []) do + proc_name = opts[:proc_name] || opts[:name] + + GenServer.start_link(__MODULE__, %{proc_name: proc_name}, opts) + end + + @impl GenServer + def init(%{proc_name: proc_name}) do + {:ok, %State{proc_name: proc_name || self()}} + end + # GenServer callbacks - @impl true + @impl GenServer def handle_call({:add, payloads}, _from, state) do to_join = :queue.from_list(payloads) new_queue = :queue.join(state.queue, to_join) - {:reply, :ok, %{state | queue: new_queue}} + {:reply, :ok, %State{state | queue: new_queue}} end - @impl true def handle_call({:pop, number}, _from, state) do {items, new_queue} = case :queue.len(state.queue) do @@ -75,15 +74,13 @@ defmodule Logflare.Buffers.MemoryBuffer do {:queue.to_list(popped_queue), queue} end - {:reply, {:ok, items}, %{state | queue: new_queue}} + {:reply, {:ok, items}, %State{state | queue: new_queue}} end - @impl true def handle_call(:clear, _from, state) do - {:reply, :ok, %{state | queue: :queue.new()}} + {:reply, :ok, %State{state | queue: :queue.new()}} end - @impl true def handle_call(:length, _from, state) do {:reply, :queue.len(state.queue), state} end diff --git a/test/logflare/buffers/memory_buffer_test.exs b/test/logflare/buffers/memory_buffer_test.exs index 8c018cc6d..dc53170b7 100644 --- a/test/logflare/buffers/memory_buffer_test.exs +++ b/test/logflare/buffers/memory_buffer_test.exs @@ -1,19 +1,22 @@ defmodule Logflare.Queues.MemoryBufferTest do @moduledoc false use ExUnit.Case, async: true - alias Logflare.Buffers.MemoryBuffer + + alias Logflare.Buffers.Buffer + + @subject Logflare.Buffers.MemoryBuffer @job %{some: "job"} setup do - pid = start_supervised!(MemoryBuffer) + pid = start_supervised!(@subject) {:ok, pid: pid} end test "can enqueue jobs", %{pid: pid} do - assert :ok = MemoryBuffer.add(pid, @job) - assert :ok = MemoryBuffer.add_many(pid, [@job]) - assert MemoryBuffer.length(pid) == 2 + assert :ok = Buffer.add(@subject, pid, @job) + assert :ok = Buffer.add_many(@subject, pid, [@job]) + assert Buffer.length(@subject, pid) == 2 end test "can pop n jobs from queue", %{pid: pid} do @@ -21,18 +24,18 @@ defmodule Logflare.Queues.MemoryBufferTest do job2 = %{some: "job2"} job3 = %{some: "job3"} job4 = %{some: "job4"} - assert :ok = MemoryBuffer.add_many(pid, [job1, job2]) - assert :ok = MemoryBuffer.add_many(pid, [job3, job4]) + assert :ok = Buffer.add_many(@subject, pid, [job1, job2]) + assert :ok = Buffer.add_many(@subject, pid, [job3, job4]) # should use fifo - assert {:ok, [%{some: "job1"}, %{some: "job2"}]} = MemoryBuffer.pop_many(pid, 2) - assert {:ok, [%{some: "job3"}, %{some: "job4"}]} = MemoryBuffer.pop_many(pid, 5) - assert {:ok, []} = MemoryBuffer.pop_many(pid, 5) + assert {:ok, [%{some: "job1"}, %{some: "job2"}]} = Buffer.pop_many(@subject, pid, 2) + assert {:ok, [%{some: "job3"}, %{some: "job4"}]} = Buffer.pop_many(@subject, pid, 5) + assert {:ok, []} = Buffer.pop_many(@subject, pid, 5) end test "can clear jobs from queue", %{pid: pid} do - assert :ok = MemoryBuffer.add(pid, @job) - MemoryBuffer.clear(pid) - assert MemoryBuffer.length(pid) == 0 + assert :ok = Buffer.add(@subject, pid, @job) + Buffer.clear(@subject, pid) + assert Buffer.length(@subject, pid) == 0 end end