Skip to content

Commit

Permalink
Reduce amount of subscription data stored in Elixir registry (absinth…
Browse files Browse the repository at this point in the history
…e-graphql#1006)

* Reduce amount of subscription data stored in Elixir registry

* Implement packing pipeline when storing in the registry

* Adjust naming to make it consistent with the rest of the codebase

* Use map_reduce instead of reduce + reverse

* Add tests for PipelineSerializer

* Fix a typo in PipelineSerializer test
  • Loading branch information
zoldar authored Jan 9, 2021
1 parent 37d7281 commit 5e54c50
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 3 deletions.
12 changes: 9 additions & 3 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ defmodule Absinthe.Subscription do

alias __MODULE__

alias Absinthe.Subscription.PipelineSerializer

@doc """
Add Absinthe.Subscription to your process tree.
"""
Expand Down Expand Up @@ -111,7 +113,7 @@ defmodule Absinthe.Subscription do
doc_value = {
doc_id,
%{
initial_phases: doc.initial_phases,
initial_phases: PipelineSerializer.pack(doc.initial_phases),
source: doc.source
}
}
Expand All @@ -138,8 +140,12 @@ defmodule Absinthe.Subscription do
pubsub
|> registry_name
|> Registry.lookup(key)
|> Enum.map(&elem(&1, 1))
|> Map.new()
|> Enum.map(fn match ->
{_, {doc_id, doc}} = match
doc = Map.update!(doc, :initial_phases, &PipelineSerializer.unpack/1)

{doc_id, doc}
end)
end

@doc false
Expand Down
63 changes: 63 additions & 0 deletions lib/absinthe/subscription/pipeline_serializer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule Absinthe.Subscription.PipelineSerializer do
@moduledoc """
Serializer responsible for packing and unpacking pipeline stored in the Elixir registry.
The purpose of this logic is saving memory by deduplicating repeating options - (ETS
backed registry stores them flat in the memory).
"""

alias Absinthe.{Phase, Pipeline}

@type options_label :: {:options, non_neg_integer()}

@type packed_phase_config :: Phase.t() | {Phase.t(), options_label()}

@type options_map :: %{options_label() => Keyword.t()}

@type packed_pipeline :: {:packed, [packed_phase_config()], options_map()}

@spec pack(Pipeline.t()) :: packed_pipeline()
def pack(pipeline) do
{packed_pipeline, options_reverse_map} =
pipeline
|> List.flatten()
|> Enum.map_reduce(%{}, &maybe_pack_phase/2)

options_map = Map.new(options_reverse_map, fn {options, label} -> {label, options} end)

{:packed, packed_pipeline, options_map}
end

@spec unpack(Pipeline.t() | packed_pipeline()) :: Pipeline.t()
def unpack({:packed, pipeline, options_map}) do
Enum.map(pipeline, fn
{phase, {:options, _n} = options_label} ->
{phase, Map.fetch!(options_map, options_label)}

phase ->
phase
end)
end

def unpack([_ | _] = pipeline) do
pipeline
end

defp maybe_pack_phase({phase, options}, options_reverse_map) do
if Map.has_key?(options_reverse_map, options) do
options_label = options_reverse_map[options]

{{phase, options_label}, options_reverse_map}
else
new_index = map_size(options_reverse_map)
options_label = {:options, new_index}
options_reverse_map = Map.put(options_reverse_map, options, options_label)

{{phase, options_label}, options_reverse_map}
end
end

defp maybe_pack_phase(phase, options_reverse_map) do
{phase, options_reverse_map}
end
end
95 changes: 95 additions & 0 deletions test/absinthe/subscription/pipeline_serializer_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
defmodule Absinthe.Subscription.PipelineSerializerTest do
use ExUnit.Case, async: true

alias Absinthe.Pipeline
alias Absinthe.Subscription.PipelineSerializer

defmodule Schema do
use Absinthe.Schema

query do
# Query type must exist
end
end

describe "pack/1" do
test "packs full-fledged pipeline successfully" do
pipeline = Pipeline.for_document(Schema, some: :option)

assert {:packed, [_ | _], %{{:options, 0} => options}} = PipelineSerializer.pack(pipeline)
assert options[:some] == :option
end

test "packs with correct mapping of unique options sets" do
pipeline = [
{Phase1, [option1: :value1]},
Phase2,
{Phase3, [option2: :value2]},
{Phase4, [option1: :value1]}
]

assert {:packed,
[
{Phase1, {:options, 0}},
Phase2,
{Phase3, {:options, 1}},
{Phase4, {:options, 0}}
],
%{{:options, 0} => [option1: :value1], {:options, 1} => [option2: :value2]}} =
PipelineSerializer.pack(pipeline)
end
end

describe "unpack/1" do
test "unpacks full-fledged pipeline successfully" do
packed_pipeline =
Schema
|> Pipeline.for_document(some: :option)
|> PipelineSerializer.pack()

assert [_ | _] = PipelineSerializer.unpack(packed_pipeline)
end

test "leaves unpacked pipeline intact" do
pipeline = Pipeline.for_document(Schema, some: :option)

assert PipelineSerializer.unpack(pipeline) == pipeline
end

test "unpacks with correct options in right spots" do
pipeline = [
{Phase1, [option1: :value1]},
Phase2,
{Phase3, [option2: :value2]},
{Phase4, [option1: :value1]}
]

unpacked =
pipeline
|> PipelineSerializer.pack()
|> PipelineSerializer.unpack()

assert unpacked == pipeline
end
end

test "flattens nested pipeline in full pack/unpack cycle" do
pipeline = [
{Phase1, [option1: :value1]},
Phase2,
[{Phase3, [option2: :value2]}, {Phase4, [option1: :value1]}]
]

unpacked =
pipeline
|> PipelineSerializer.pack()
|> PipelineSerializer.unpack()

assert unpacked == [
{Phase1, [option1: :value1]},
Phase2,
{Phase3, [option2: :value2]},
{Phase4, [option1: :value1]}
]
end
end

0 comments on commit 5e54c50

Please sign in to comment.