Skip to content

Commit

Permalink
Skip Oban when embedded in External Panel UI (Centerdata)
Browse files Browse the repository at this point in the history
  • Loading branch information
mellelieuwes committed Dec 14, 2023
1 parent e33553f commit e3da85e
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 52 deletions.
2 changes: 1 addition & 1 deletion core/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ config :phoenix_inline_svg,

config :core, Oban,
repo: Core.Repo,
queues: [default: 5, email_dispatchers: 1, email_delivery: 1, data_donation_delivery: 1],
queues: [default: 5, email_dispatchers: 1, email_delivery: 1, storage_delivery: 1],
plugins: [
{Oban.Plugins.Cron,
crontab: [
Expand Down
19 changes: 2 additions & 17 deletions core/systems/assignment/crew_page.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,11 @@ defmodule Systems.Assignment.CrewPage do
modal: nil
)
|> update_panel_info(session)
|> observe_storage_events()
|> observe_view_model()
|> update_flow()
}
end

def observe_storage_events(%{assigns: %{id: id, current_user: %{id: user_id}}} = socket) do
storage_pubsub_key = "crewpage:#{id}:user:#{user_id}"

if Phoenix.LiveView.connected?(socket) do
:ok = Phoenix.PubSub.subscribe(Core.PubSub, storage_pubsub_key)
Logger.warn("Subscribing on topic: #{storage_pubsub_key}")
end

assign(socket, storage_pubsub_key: storage_pubsub_key)
end

def handle_view_model_updated(socket) do
socket
|> update_flow()
Expand Down Expand Up @@ -107,7 +95,6 @@ defmodule Systems.Assignment.CrewPage do
# This is een temp solution before better integrating the donation protocol with Centerdata
#
def handle_info(%{storage_event: %{panel: _, form: _} = event}, socket) do
Logger.warn("handle_info: #{inspect(event)}")
{:noreply, socket |> send_event(:flow, "show_panel_form", event)}
end

Expand Down Expand Up @@ -144,8 +131,7 @@ defmodule Systems.Assignment.CrewPage do
assigns: %{
panel_info: panel_info,
model: assignment,
remote_ip: remote_ip,
storage_pubsub_key: storage_pubsub_key
remote_ip: remote_ip
}
} = socket,
key,
Expand All @@ -154,8 +140,7 @@ defmodule Systems.Assignment.CrewPage do
meta_data = %{
remote_ip: remote_ip,
timestamp: Timestamp.now() |> DateTime.to_unix(),
key: key,
pubsub_key: storage_pubsub_key
key: key
}

if storage_info = Storage.Private.storage_info(assignment) do
Expand Down
1 change: 0 additions & 1 deletion core/systems/assignment/crew_work_view.ex
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ defmodule Systems.Assignment.CrewWorkView do
socket
) do
child = prepare_child(socket, :panel_form, module, params)
Logger.warn("show_child: #{inspect(child)}")
{:noreply, socket |> show_child(child)}
end

Expand Down
4 changes: 4 additions & 0 deletions core/systems/assignment/external_panel_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ defmodule Systems.Assignment.ExternalPanelController do

defp add_panel_info(conn, params) do
panel_info = %{
embedded?: is_embedded(params),
participant: get_participant(params),
query_string: params
}
Expand Down Expand Up @@ -96,4 +97,7 @@ defmodule Systems.Assignment.ExternalPanelController do
defp get_locale(%{"language" => language}), do: language
defp get_locale(%{"locale" => locale}), do: locale
defp get_locale(_), do: nil

defp is_embedded(%{"panel" => "liss"}), do: true
defp is_embedded(_), do: false
end
28 changes: 18 additions & 10 deletions core/systems/storage/_public.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
defmodule Systems.Storage.Public do
require Logger

alias Systems.{
Rate,
Storage
}

def store(
%{key: key, backend: backend, endpoint: endpoint},
panel_info,
%{embedded?: embedded?} = panel_info,
data,
%{remote_ip: remote_ip} = meta_data
) do
Expand All @@ -15,15 +17,21 @@ defmodule Systems.Storage.Public do
# raises error when request is denied
Rate.Public.request_permission(key, remote_ip, packet_size)

%{
backend: backend,
endpoint: endpoint,
panel_info: panel_info,
data: data,
meta_data: meta_data
}
|> Storage.Delivery.new()
|> Oban.insert()
if embedded? do
# submit data in current process
Logger.warn("[Storage.Public] deliver directly")
Storage.Delivery.deliver(backend, endpoint, panel_info, data, meta_data)
else
%{
backend: backend,
endpoint: endpoint,
panel_info: panel_info,
data: data,
meta_data: meta_data
}
|> Storage.Delivery.new()
|> Oban.insert()
end
end
end

Expand Down
15 changes: 4 additions & 11 deletions core/systems/storage/centerdata/backend.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ defmodule Systems.Storage.Centerdata.Backend do
require Logger

def store(
%{"url" => url} = _endpoint,
%{url: url} = _endpoint,
%{
"query_string" => %{
query_string: %{
"quest" => quest,
"varname1" => varname1,
"respondent" => respondent,
Expand All @@ -15,7 +15,7 @@ defmodule Systems.Storage.Centerdata.Backend do
}
} = _panel_info,
data,
%{"pubsub_key" => pubsub_key} = _meta_data
_meta_data
) do
Logger.warn("Centerdata store: respondent=#{respondent}")

Expand All @@ -38,13 +38,6 @@ defmodule Systems.Storage.Centerdata.Backend do
}
}

Logger.warn("Broadcasting on topic: #{pubsub_key}")

result =
Phoenix.PubSub.broadcast(Core.PubSub, pubsub_key, %{
storage_event: %{panel: :centerdata, form: form}
})

Logger.warn("Broadcast result: #{inspect(result)}")
send(self(), %{storage_event: %{panel: :centerdata, form: form}})
end
end
27 changes: 15 additions & 12 deletions core/systems/storage/delivery.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Systems.Storage.Delivery do
end

use Oban.Worker,
queue: :data_donation_delivery,
queue: :storage_delivery,
priority: 1,
max_attempts: 3,
unique: [period: 30]
Expand All @@ -25,17 +25,20 @@ defmodule Systems.Storage.Delivery do
end
end

defp deliver(
%{
"backend" => backend,
"endpoint" => endpoint,
"panel_info" => panel_info,
"data" => data,
"meta_data" => meta_data
} = job
) do
Logger.warn("[Storage.Delivery] deliver: #{inspect(job)}")
def deliver(backend, endpoint, panel_info, data, meta_data) do
Logger.warn("[Storage.Delivery] deliver")
backend.store(endpoint, panel_info, data, meta_data)
end

String.to_existing_atom(backend).store(endpoint, panel_info, data, meta_data)
def deliver(
%{
"backend" => backend,
"endpoint" => endpoint,
"panel_info" => panel_info,
"data" => data,
"meta_data" => meta_data
} = _job
) do
deliver(String.to_existing_atom(backend), endpoint, panel_info, data, meta_data)
end
end

0 comments on commit e3da85e

Please sign in to comment.