diff --git a/core/config/config.exs b/core/config/config.exs index af91257d5..1e500f815 100644 --- a/core/config/config.exs +++ b/core/config/config.exs @@ -34,7 +34,7 @@ config :phoenix_inline_svg, config :core, Oban, repo: Core.Repo, - queues: [default: 5, email_dispatchers: 1, email_delivery: 1, data_donation_delivery: 1], + queues: [default: 5, email_dispatchers: 1, email_delivery: 1, storage_delivery: 1], plugins: [ {Oban.Plugins.Cron, crontab: [ diff --git a/core/systems/assignment/crew_page.ex b/core/systems/assignment/crew_page.ex index f4523de58..ad6d382ec 100644 --- a/core/systems/assignment/crew_page.ex +++ b/core/systems/assignment/crew_page.ex @@ -39,23 +39,11 @@ defmodule Systems.Assignment.CrewPage do modal: nil ) |> update_panel_info(session) - |> observe_storage_events() |> observe_view_model() |> update_flow() } end - def observe_storage_events(%{assigns: %{id: id, current_user: %{id: user_id}}} = socket) do - storage_pubsub_key = "crewpage:#{id}:user:#{user_id}" - - if Phoenix.LiveView.connected?(socket) do - :ok = Phoenix.PubSub.subscribe(Core.PubSub, storage_pubsub_key) - Logger.warn("Subscribing on topic: #{storage_pubsub_key}") - end - - assign(socket, storage_pubsub_key: storage_pubsub_key) - end - def handle_view_model_updated(socket) do socket |> update_flow() @@ -107,7 +95,6 @@ defmodule Systems.Assignment.CrewPage do # This is een temp solution before better integrating the donation protocol with Centerdata # def handle_info(%{storage_event: %{panel: _, form: _} = event}, socket) do - Logger.warn("handle_info: #{inspect(event)}") {:noreply, socket |> send_event(:flow, "show_panel_form", event)} end @@ -144,8 +131,7 @@ defmodule Systems.Assignment.CrewPage do assigns: %{ panel_info: panel_info, model: assignment, - remote_ip: remote_ip, - storage_pubsub_key: storage_pubsub_key + remote_ip: remote_ip } } = socket, key, @@ -154,8 +140,7 @@ defmodule Systems.Assignment.CrewPage do meta_data = %{ remote_ip: remote_ip, timestamp: Timestamp.now() |> DateTime.to_unix(), - key: key, - pubsub_key: storage_pubsub_key + key: key } if storage_info = Storage.Private.storage_info(assignment) do diff --git a/core/systems/assignment/crew_work_view.ex b/core/systems/assignment/crew_work_view.ex index b8ac4df74..b459e066e 100644 --- a/core/systems/assignment/crew_work_view.ex +++ b/core/systems/assignment/crew_work_view.ex @@ -171,7 +171,6 @@ defmodule Systems.Assignment.CrewWorkView do socket ) do child = prepare_child(socket, :panel_form, module, params) - Logger.warn("show_child: #{inspect(child)}") {:noreply, socket |> show_child(child)} end diff --git a/core/systems/assignment/external_panel_controller.ex b/core/systems/assignment/external_panel_controller.ex index 1031b262a..d7af3e519 100644 --- a/core/systems/assignment/external_panel_controller.ex +++ b/core/systems/assignment/external_panel_controller.ex @@ -61,6 +61,7 @@ defmodule Systems.Assignment.ExternalPanelController do defp add_panel_info(conn, params) do panel_info = %{ + embedded?: is_embedded(params), participant: get_participant(params), query_string: params } @@ -96,4 +97,7 @@ defmodule Systems.Assignment.ExternalPanelController do defp get_locale(%{"language" => language}), do: language defp get_locale(%{"locale" => locale}), do: locale defp get_locale(_), do: nil + + defp is_embedded(%{"panel" => "liss"}), do: true + defp is_embedded(_), do: false end diff --git a/core/systems/storage/_public.ex b/core/systems/storage/_public.ex index 3d9d01e22..1c7a50749 100644 --- a/core/systems/storage/_public.ex +++ b/core/systems/storage/_public.ex @@ -1,4 +1,6 @@ defmodule Systems.Storage.Public do + require Logger + alias Systems.{ Rate, Storage @@ -6,7 +8,7 @@ defmodule Systems.Storage.Public do def store( %{key: key, backend: backend, endpoint: endpoint}, - panel_info, + %{embedded?: embedded?} = panel_info, data, %{remote_ip: remote_ip} = meta_data ) do @@ -15,15 +17,21 @@ defmodule Systems.Storage.Public do # raises error when request is denied Rate.Public.request_permission(key, remote_ip, packet_size) - %{ - backend: backend, - endpoint: endpoint, - panel_info: panel_info, - data: data, - meta_data: meta_data - } - |> Storage.Delivery.new() - |> Oban.insert() + if embedded? do + # submit data in current process + Logger.warn("[Storage.Public] deliver directly") + Storage.Delivery.deliver(backend, endpoint, panel_info, data, meta_data) + else + %{ + backend: backend, + endpoint: endpoint, + panel_info: panel_info, + data: data, + meta_data: meta_data + } + |> Storage.Delivery.new() + |> Oban.insert() + end end end diff --git a/core/systems/storage/centerdata/backend.ex b/core/systems/storage/centerdata/backend.ex index 17cbd5100..616d2da69 100644 --- a/core/systems/storage/centerdata/backend.ex +++ b/core/systems/storage/centerdata/backend.ex @@ -4,9 +4,9 @@ defmodule Systems.Storage.Centerdata.Backend do require Logger def store( - %{"url" => url} = _endpoint, + %{url: url} = _endpoint, %{ - "query_string" => %{ + query_string: %{ "quest" => quest, "varname1" => varname1, "respondent" => respondent, @@ -15,7 +15,7 @@ defmodule Systems.Storage.Centerdata.Backend do } } = _panel_info, data, - %{"pubsub_key" => pubsub_key} = _meta_data + _meta_data ) do Logger.warn("Centerdata store: respondent=#{respondent}") @@ -38,13 +38,6 @@ defmodule Systems.Storage.Centerdata.Backend do } } - Logger.warn("Broadcasting on topic: #{pubsub_key}") - - result = - Phoenix.PubSub.broadcast(Core.PubSub, pubsub_key, %{ - storage_event: %{panel: :centerdata, form: form} - }) - - Logger.warn("Broadcast result: #{inspect(result)}") + send(self(), %{storage_event: %{panel: :centerdata, form: form}}) end end diff --git a/core/systems/storage/delivery.ex b/core/systems/storage/delivery.ex index cebf03589..1dfc495ad 100644 --- a/core/systems/storage/delivery.ex +++ b/core/systems/storage/delivery.ex @@ -5,7 +5,7 @@ defmodule Systems.Storage.Delivery do end use Oban.Worker, - queue: :data_donation_delivery, + queue: :storage_delivery, priority: 1, max_attempts: 3, unique: [period: 30] @@ -25,17 +25,20 @@ defmodule Systems.Storage.Delivery do end end - defp deliver( - %{ - "backend" => backend, - "endpoint" => endpoint, - "panel_info" => panel_info, - "data" => data, - "meta_data" => meta_data - } = job - ) do - Logger.warn("[Storage.Delivery] deliver: #{inspect(job)}") + def deliver(backend, endpoint, panel_info, data, meta_data) do + Logger.warn("[Storage.Delivery] deliver") + backend.store(endpoint, panel_info, data, meta_data) + end - String.to_existing_atom(backend).store(endpoint, panel_info, data, meta_data) + def deliver( + %{ + "backend" => backend, + "endpoint" => endpoint, + "panel_info" => panel_info, + "data" => data, + "meta_data" => meta_data + } = _job + ) do + deliver(String.to_existing_atom(backend), endpoint, panel_info, data, meta_data) end end