diff --git a/apps/transport/lib/db/contact.ex b/apps/transport/lib/db/contact.ex index c56e6d5521..ff3f0efe78 100644 --- a/apps/transport/lib/db/contact.ex +++ b/apps/transport/lib/db/contact.ex @@ -29,7 +29,15 @@ defmodule DB.Contact do field(:phone_number, DB.Encrypted.Binary) field(:secondary_phone_number, DB.Encrypted.Binary) field(:last_login_at, :utc_datetime_usec) - field(:creation_source, Ecto.Enum, values: [:"automation:import_contact_point", :admin, :datagouv_oauth_login]) + + field(:creation_source, Ecto.Enum, + values: [ + :"automation:import_contact_point", + :"automation:import_gbfs_feed_contact_email", + :admin, + :datagouv_oauth_login + ] + ) timestamps(type: :utc_datetime_usec) diff --git a/apps/transport/lib/db/notification_subscription.ex b/apps/transport/lib/db/notification_subscription.ex index 85366e9f28..5810c8cd6c 100644 --- a/apps/transport/lib/db/notification_subscription.ex +++ b/apps/transport/lib/db/notification_subscription.ex @@ -20,7 +20,8 @@ defmodule DB.NotificationSubscription do :user, :"automation:promote_producer_space", :"automation:migrate_from_reuser_to_producer", - :"automation:import_contact_point" + :"automation:import_contact_point", + :"automation:gbfs_feed_contact_email" ] ) @@ -127,6 +128,44 @@ defmodule DB.NotificationSubscription do |> DB.Repo.all() end + @doc """ + Creates producer subscriptions for a given dataset, contact and notification source for all reasons. 
+ """ + def create_producer_subscriptions(%DB.Dataset{id: dataset_id}, %DB.Contact{id: contact_id}, source) do + existing_reasons = + DB.NotificationSubscription.base_query() + |> where( + [notification_subscription: ns], + ns.dataset_id == ^dataset_id and ns.role == :producer and ns.contact_id == ^contact_id + ) + |> select([notification_subscription: ns], ns.reason) + |> DB.Repo.all() + |> MapSet.new() + + Transport.NotificationReason.subscribable_reasons_related_to_datasets(:producer) + |> MapSet.new() + |> MapSet.difference(existing_reasons) + |> Enum.each(fn reason -> + DB.NotificationSubscription.insert!(%{ + role: :producer, + source: source, + reason: reason, + contact_id: contact_id, + dataset_id: dataset_id + }) + end) + end + + @doc """ + Given a dataset, contact and notification source, delete other producer subscriptions for the same dataset and source. + """ + def delete_other_producers_subscriptions(%DB.Dataset{id: dataset_id}, %DB.Contact{id: contact_id}, source) do + DB.NotificationSubscription.base_query() + |> where([notification_subscription: ns], ns.dataset_id == ^dataset_id and ns.contact_id != ^contact_id) + |> where([notification_subscription: ns], ns.role == :producer and ns.source == ^source) + |> DB.Repo.delete_all() + end + defp validate_reason_is_allowed_for_subscriptions(changeset) do reason = get_field(changeset, :reason) diff --git a/apps/transport/lib/jobs/import_dataset_contact_points_job.ex b/apps/transport/lib/jobs/import_dataset_contact_points_job.ex index aaeb235094..24a833a896 100644 --- a/apps/transport/lib/jobs/import_dataset_contact_points_job.ex +++ b/apps/transport/lib/jobs/import_dataset_contact_points_job.ex @@ -17,6 +17,8 @@ defmodule Transport.Jobs.ImportDatasetContactPointsJob do # The number of workers to run in parallel @task_concurrency 5 + # The source when creating a contact + @contact_source :"automation:import_contact_point" # The notification subscription source when creating/deleting subscriptions 
@notification_subscription_source :"automation:import_contact_point" @@ -89,40 +91,13 @@ defmodule Transport.Jobs.ImportDatasetContactPointsJob do defp update_contact_point(%DB.Dataset{} = dataset, %{"email" => _, "name" => _} = contact_point) do contact = find_or_create_contact(contact_point) - create_contact_point_subscriptions(dataset, contact) - delete_other_contact_points_subscriptions(dataset, contact) - end - - defp create_contact_point_subscriptions(%DB.Dataset{id: dataset_id}, %DB.Contact{id: contact_id}) do - existing_reasons = - DB.NotificationSubscription.base_query() - |> where( - [notification_subscription: ns], - ns.dataset_id == ^dataset_id and ns.role == :producer and ns.contact_id == ^contact_id - ) - |> select([notification_subscription: ns], ns.reason) - |> DB.Repo.all() - |> MapSet.new() - - Transport.NotificationReason.subscribable_reasons_related_to_datasets(:producer) - |> MapSet.new() - |> MapSet.difference(existing_reasons) - |> Enum.each(fn reason -> - DB.NotificationSubscription.insert!(%{ - role: :producer, - source: @notification_subscription_source, - reason: reason, - contact_id: contact_id, - dataset_id: dataset_id - }) - end) - end + DB.NotificationSubscription.create_producer_subscriptions(dataset, contact, @notification_subscription_source) - defp delete_other_contact_points_subscriptions(%DB.Dataset{id: dataset_id}, %DB.Contact{id: contact_id}) do - DB.NotificationSubscription.base_query() - |> where([notification_subscription: ns], ns.dataset_id == ^dataset_id and ns.contact_id != ^contact_id) - |> where([notification_subscription: ns], ns.role == :producer and ns.source == ^@notification_subscription_source) - |> DB.Repo.delete_all() + DB.NotificationSubscription.delete_other_producers_subscriptions( + dataset, + contact, + @notification_subscription_source + ) end defp find_or_create_contact(%{"email" => email, "name" => name}) do @@ -131,7 +106,7 @@ defmodule Transport.Jobs.ImportDatasetContactPointsJob do contact nil -> -
Map.merge(guess_identity(name), %{email: email, creation_source: :"automation:import_contact_point"}) + Map.merge(guess_identity(name), %{email: email, creation_source: @contact_source}) |> DB.Contact.insert!() end end diff --git a/apps/transport/lib/jobs/import_gbfs_feed_contact_email_job.ex b/apps/transport/lib/jobs/import_gbfs_feed_contact_email_job.ex new file mode 100644 index 0000000000..1e58fbf9f5 --- /dev/null +++ b/apps/transport/lib/jobs/import_gbfs_feed_contact_email_job.ex @@ -0,0 +1,91 @@ +defmodule Transport.Jobs.ImportGBFSFeedContactEmailJob do + @moduledoc """ + Reuse `feed_contact_email` from GBFS feed. + + Use these email addresses to find or create a contact and subscribe this contact + to producer subscriptions for this dataset. + + When a `feed_contact_point` was previously set and has been removed, we delete old subscriptions. + """ + use Oban.Worker, max_attempts: 3 + import Ecto.Query + require Logger + + # The source when creating a contact + @contact_source :"automation:import_gbfs_feed_contact_email" + # The notification subscription source when creating/deleting subscriptions + @notification_subscription_source :"automation:gbfs_feed_contact_email" + + @impl Oban.Worker + def perform(%Oban.Job{}) do + gbfs_feed_contact_emails() |> Enum.each(&update_feed_contact_email/1) + end + + def update_feed_contact_email( + %{ + resource_url: _, + dataset_id: dataset_id, + feed_contact_email: _ + } = params + ) do + dataset = DB.Repo.get!(DB.Dataset, dataset_id) + contact = find_or_create_contact(params) + DB.NotificationSubscription.create_producer_subscriptions(dataset, contact, @notification_subscription_source) + + DB.NotificationSubscription.delete_other_producers_subscriptions( + dataset, + contact, + @notification_subscription_source + ) + end + + defp find_or_create_contact(%{resource_url: resource_url, feed_contact_email: feed_contact_email}) do + case DB.Repo.get_by(DB.Contact, email_hash: String.downcase(feed_contact_email)) do + 
%DB.Contact{} = contact -> + contact + + nil -> + %{mailing_list_title: contact_title(resource_url), email: feed_contact_email, creation_source: @contact_source} + |> DB.Contact.insert!() + end + end + + @doc """ + iex> contact_title("https://api.cyclocity.fr/contracts/nantes/gbfs/gbfs.json") + "Équipe technique GBFS JC Decaux" + iex> contact_title("https://example.com/gbfs.json") + "Équipe technique GBFS Example" + iex> contact_title("https://404.fr") + "Équipe technique GBFS" + """ + def contact_title(resource_url) do + operator_name = Transport.GBFSMetadata.operator(resource_url) || "" + "Équipe technique GBFS #{operator_name}" |> String.trim() + end + + @doc """ + Finds feed contact emails for GBFS feeds. + Uses the metadata collected by `Transport.GBFSMetadata` over the last week. + """ + def gbfs_feed_contact_emails do + last_week = DateTime.utc_now() |> DateTime.add(-7, :day) + + DB.ResourceMetadata.base_query() + |> join(:inner, [metadata: m], r in DB.Resource, on: r.id == m.resource_id, as: :resource) + |> where([resource: r], r.format == "gbfs") + |> where( + [metadata: m], + m.inserted_at >= ^last_week and fragment("?->'system_details' \\? 
'feed_contact_email'", m.metadata) + ) + |> select([metadata: m, resource: r], %{ + resource_id: r.id, + resource_url: r.url, + dataset_id: r.dataset_id, + feed_contact_email: + last_value(fragment("?->'system_details'->> 'feed_contact_email'", m.metadata)) + |> over(partition_by: m.resource_id, order_by: m.resource_id) + }) + |> distinct(true) + |> DB.Repo.all() + end +end diff --git a/apps/transport/test/transport/jobs/import_gbfs_feed_contact_point_job_test.exs b/apps/transport/test/transport/jobs/import_gbfs_feed_contact_point_job_test.exs new file mode 100644 index 0000000000..0b4c58115e --- /dev/null +++ b/apps/transport/test/transport/jobs/import_gbfs_feed_contact_point_job_test.exs @@ -0,0 +1,239 @@ +defmodule Transport.Test.Transport.Jobs.ImportGBFSFeedContactEmailJobTest do + use ExUnit.Case, async: true + import DB.Factory + import Ecto.Query + use Oban.Testing, repo: DB.Repo + alias Transport.Jobs.ImportGBFSFeedContactEmailJob + doctest ImportGBFSFeedContactEmailJob, import: true + + @producer_reasons Transport.NotificationReason.subscribable_reasons_related_to_datasets(:producer) |> MapSet.new() + + setup do + Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo) + end + + test "gbfs_feed_contact_emails" do + gbfs_1 = insert(:resource, format: "gbfs") + gbfs_2 = insert(:resource, format: "gbfs") + gbfs_3 = insert(:resource, format: "gbfs") + gbfs_4 = insert(:resource, format: "gbfs") + + ten_days_ago = DateTime.utc_now() |> DateTime.add(-10, :day) + five_days_ago = DateTime.utc_now() |> DateTime.add(-5, :day) + + # `gbfs_1` is relevant but should not be duplicated + insert(:resource_metadata, + resource_id: gbfs_1.id, + metadata: %{system_details: %{feed_contact_email: "gbfs1_old@example.com"}}, + inserted_at: five_days_ago + ) + + insert(:resource_metadata, + resource_id: gbfs_1.id, + metadata: %{system_details: %{feed_contact_email: gbfs_1_email = "gbfs1@example.com"}} + ) + + insert(:resource_metadata, + resource_id: gbfs_1.id, + metadata: %{system_details: 
%{feed_contact_email: gbfs_1_email}} + ) + + # `gbfs_4` should be included + insert(:resource_metadata, + resource_id: gbfs_4.id, + metadata: %{system_details: %{feed_contact_email: gbfs_4_email = "gbfs4@example.com"}} + ) + + # Ignored: too old + insert(:resource_metadata, + resource_id: gbfs_2.id, + metadata: %{system_details: %{feed_contact_email: "gbfs2@example.com"}}, + inserted_at: ten_days_ago + ) + + # Ignored: no feed_contact_email + insert(:resource_metadata, + resource_id: gbfs_3.id, + metadata: %{system_details: %{foo: 42}}, + inserted_at: five_days_ago + ) + + result = ImportGBFSFeedContactEmailJob.gbfs_feed_contact_emails() + assert Enum.count(result) == 2 + + assert [ + %{ + resource_id: gbfs_1.id, + feed_contact_email: gbfs_1_email, + dataset_id: gbfs_1.dataset_id, + resource_url: gbfs_1.url + }, + %{ + resource_id: gbfs_4.id, + feed_contact_email: gbfs_4_email, + dataset_id: gbfs_4.dataset_id, + resource_url: gbfs_4.url + } + ] + |> MapSet.new() == MapSet.new(result) + end + + describe "update_feed_contact_email" do + test "creates producer subscriptions for an existing contact with a subscription" do + %DB.Contact{id: contact_id} = gbfs_contact = insert_contact() + %DB.Dataset{id: dataset_id} = dataset = insert(:dataset) + + insert(:notification_subscription, + dataset_id: dataset.id, + contact_id: gbfs_contact.id, + role: :producer, + reason: :expiration, + source: :user + ) + + ImportGBFSFeedContactEmailJob.update_feed_contact_email(%{ + resource_url: "https://example.com/gbfs.json", + dataset_id: dataset_id, + feed_contact_email: gbfs_contact.email + }) + + assert @producer_reasons == + DB.NotificationSubscription.base_query() + |> where( + [notification_subscription: ns], + ns.dataset_id == ^dataset_id and ns.role == :producer and ns.contact_id == ^contact_id + ) + |> select([notification_subscription: ns], ns.reason) + |> DB.Repo.all() + |> MapSet.new() + + # Kept the already existing subscription made by the user (`source: :user`) and 
created + # the remaining producer reasons. + assert [ + %{count: Enum.count(@producer_reasons) - 1, source: :"automation:gbfs_feed_contact_email"}, + %{count: 1, source: :user} + ] == + DB.NotificationSubscription.base_query() + |> where( + [notification_subscription: ns], + ns.dataset_id == ^dataset_id and ns.role == :producer and ns.contact_id == ^contact_id + ) + |> select([notification_subscription: ns], %{source: ns.source, count: count(ns.id)}) + |> group_by([notification_subscription: ns], ns.source) + |> DB.Repo.all() + end + + test "creates a new contact and producer subscriptions, deletes the previous GBFS contact subscriptions" do + %DB.Dataset{id: dataset_id} = dataset = insert(:dataset) + previous_gbfs_contact = insert_contact() + email = "john@example.fr" + + previous_gbfs_contact_ns = + insert(:notification_subscription, + dataset_id: dataset.id, + contact_id: previous_gbfs_contact.id, + role: :producer, + reason: :expiration, + source: :"automation:gbfs_feed_contact_email" + ) + + ImportGBFSFeedContactEmailJob.update_feed_contact_email(%{ + resource_url: "https://example.com/gbfs.json", + dataset_id: dataset_id, + feed_contact_email: email + }) + + %DB.Contact{email: ^email, creation_source: :"automation:import_gbfs_feed_contact_email"} = + contact = DB.Repo.get_by(DB.Contact, mailing_list_title: "Équipe technique GBFS Example") + + assert nil == DB.Repo.reload(previous_gbfs_contact_ns) + assert MapSet.new([]) == subscribed_reasons(dataset, previous_gbfs_contact) + assert @producer_reasons == subscribed_reasons(dataset, contact) + end + + test "does nothing if the subscriptions are already in place, for another source" do + gbfs_contact = insert_contact() + other_producer = insert_contact() + dataset = insert(:dataset) + + subscriptions = + Enum.map(@producer_reasons, fn reason -> + insert(:notification_subscription, + dataset_id: dataset.id, + contact_id: gbfs_contact.id, + role: :producer, + reason: reason, + source: :user + ) + end) + + # 
Another producer subscription does not interfere + other_ns = + insert(:notification_subscription, + dataset_id: dataset.id, + contact_id: other_producer.id, + role: :producer, + reason: :expiration, + source: :user + ) + + ImportGBFSFeedContactEmailJob.update_feed_contact_email(%{ + resource_url: "https://example.com/gbfs.json", + dataset_id: dataset.id, + feed_contact_email: gbfs_contact.email + }) + + # Subscriptions are still there and did not change + assert other_ns == DB.Repo.reload(other_ns) + assert subscriptions == DB.Repo.reload(subscriptions) + end + end + + test "perform" do + gbfs_1 = insert(:resource, dataset: insert(:dataset), format: "gbfs", url: "https://example.com/gbfs.json") + gbfs_2 = insert(:resource, dataset: insert(:dataset), format: "gbfs") + %DB.Contact{id: existing_gbfs_contact_id, email: gbfs_2_email} = existing_gbfs_contact = insert_contact() + + five_days_ago = DateTime.utc_now() |> DateTime.add(-5, :day) + + insert(:resource_metadata, + resource_id: gbfs_1.id, + metadata: %{system_details: %{feed_contact_email: gbfs_1_email = "gbfs1@example.com"}}, + inserted_at: five_days_ago + ) + + insert(:resource_metadata, + resource_id: gbfs_2.id, + metadata: %{system_details: %{feed_contact_email: gbfs_2_email}} + ) + + assert :ok == perform_job(ImportGBFSFeedContactEmailJob, %{}) + + assert [first_contact, new_contact] = DB.Contact |> DB.Repo.all() |> Enum.sort_by(& &1.inserted_at) + + assert %DB.Contact{id: ^existing_gbfs_contact_id, email: ^gbfs_2_email} = first_contact + assert "Example" == Transport.GBFSMetadata.operator(gbfs_1.url) + assert %DB.Contact{email: ^gbfs_1_email, mailing_list_title: "Équipe technique GBFS Example"} = new_contact + + # Subscriptions have been created: + # - `new_contact` for `gbfs_1`'s dataset only + # - `existing_gbfs_contact` for `gbfs_2`'s dataset only + assert @producer_reasons == subscribed_reasons(%DB.Dataset{id: gbfs_1.dataset_id}, new_contact) + assert MapSet.new([]) == subscribed_reasons(%DB.Dataset{id:
gbfs_2.dataset_id}, new_contact) + + assert MapSet.new([]) == subscribed_reasons(%DB.Dataset{id: gbfs_1.dataset_id}, existing_gbfs_contact) + assert @producer_reasons == subscribed_reasons(%DB.Dataset{id: gbfs_2.dataset_id}, existing_gbfs_contact) + end + + defp subscribed_reasons(%DB.Dataset{id: dataset_id}, %DB.Contact{id: contact_id}) do + DB.NotificationSubscription.base_query() + |> where( + [notification_subscription: ns], + ns.dataset_id == ^dataset_id and ns.role == :producer and ns.contact_id == ^contact_id and + ns.source == :"automation:gbfs_feed_contact_email" + ) + |> select([notification_subscription: ns], ns.reason) + |> DB.Repo.all() + |> MapSet.new() + end +end diff --git a/config/runtime.exs b/config/runtime.exs index 915119a8ac..12a415b7b0 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -155,6 +155,8 @@ oban_prod_crontab = [ {"15 8 * 3,6,11 1", Transport.Jobs.PeriodicReminderProducersNotificationJob}, {"15 5 * * *", Transport.Jobs.ImportDatasetFollowersJob}, {"20 5 * * *", Transport.Jobs.ImportDatasetContactPointsJob}, + # Should be ideally executed after `GBFSMultiValidationDispatcherJob` to use fresh metadata + {"30 8 * * *", Transport.Jobs.ImportGBFSFeedContactEmailJob}, {"30 5 * * *", Transport.Jobs.ImportDatasetMonthlyMetricsJob}, {"45 5 * * *", Transport.Jobs.ImportResourceMonthlyMetricsJob}, {"0 8 * * *", Transport.Jobs.WarnUserInactivityJob},