GBFS: subscribe operators to producer notifications (#…
AntoineAugusti authored Nov 27, 2024
1 parent b37399d commit 89eeafb
Showing 6 changed files with 390 additions and 36 deletions.
10 changes: 9 additions & 1 deletion apps/transport/lib/db/contact.ex
@@ -29,7 +29,15 @@ defmodule DB.Contact do
field(:phone_number, DB.Encrypted.Binary)
field(:secondary_phone_number, DB.Encrypted.Binary)
field(:last_login_at, :utc_datetime_usec)
- field(:creation_source, Ecto.Enum, values: [:"automation:import_contact_point", :admin, :datagouv_oauth_login])
+
+ field(:creation_source, Ecto.Enum,
+   values: [
+     :"automation:import_contact_point",
+     :"automation:import_gbfs_feed_contact_email",
+     :admin,
+     :datagouv_oauth_login
+   ]
+ )

timestamps(type: :utc_datetime_usec)

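The new `:"automation:import_gbfs_feed_contact_email"` value lets contacts created by the GBFS job further down be traced back to their origin. As a minimal sketch (not part of this commit), such contacts could later be listed with the usual Ecto query helpers used in this codebase:

import Ecto.Query

# All contacts created automatically from a GBFS feed_contact_email
DB.Contact
|> where(creation_source: :"automation:import_gbfs_feed_contact_email")
|> DB.Repo.all()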
41 changes: 40 additions & 1 deletion apps/transport/lib/db/notification_subscription.ex
@@ -20,7 +20,8 @@ defmodule DB.NotificationSubscription do
:user,
:"automation:promote_producer_space",
:"automation:migrate_from_reuser_to_producer",
:"automation:import_contact_point"
:"automation:import_contact_point",
:"automation:gbfs_feed_contact_email"
]
)

@@ -127,6 +128,44 @@
|> DB.Repo.all()
end

@doc """
Creates producer subscriptions for a given dataset, contact and notification source for all reasons.
"""
def create_producer_subscriptions(%DB.Dataset{id: dataset_id}, %DB.Contact{id: contact_id}, source) do
existing_reasons =
DB.NotificationSubscription.base_query()
|> where(
[notification_subscription: ns],
ns.dataset_id == ^dataset_id and ns.role == :producer and ns.contact_id == ^contact_id
)
|> select([notification_subscription: ns], ns.reason)
|> DB.Repo.all()
|> MapSet.new()

Transport.NotificationReason.subscribable_reasons_related_to_datasets(:producer)
|> MapSet.new()
|> MapSet.difference(existing_reasons)
|> Enum.each(fn reason ->
DB.NotificationSubscription.insert!(%{
role: :producer,
source: source,
reason: reason,
contact_id: contact_id,
dataset_id: dataset_id
})
end)
end

@doc """
Given a dataset, contact and notification source, delete other producer subscriptions for the same dataset and source.
"""
def delete_other_producers_subscriptions(%DB.Dataset{id: dataset_id}, %DB.Contact{id: contact_id}, source) do
DB.NotificationSubscription.base_query()
|> where([notification_subscription: ns], ns.dataset_id == ^dataset_id and ns.contact_id != ^contact_id)
|> where([notification_subscription: ns], ns.role == :producer and ns.source == ^source)
|> DB.Repo.delete_all()
end

defp validate_reason_is_allowed_for_subscriptions(changeset) do
reason = get_field(changeset, :reason)

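Both helpers above are new in this commit and are shared by the contact-point import job and the new GBFS job: `create_producer_subscriptions/3` subscribes the contact as a producer to every dataset-related reason it does not already have, while `delete_other_producers_subscriptions/3` removes producer subscriptions that the same source previously created for other contacts on the dataset. A hedged usage sketch (the surrounding variables are illustrative):

dataset = DB.Repo.get!(DB.Dataset, dataset_id)
contact = DB.Repo.get!(DB.Contact, contact_id)
source = :"automation:gbfs_feed_contact_email"

# Subscribe the contact as a producer for every missing reason on the dataset
DB.NotificationSubscription.create_producer_subscriptions(dataset, contact, source)
# Drop producer subscriptions previously created from the same source for other contacts
DB.NotificationSubscription.delete_other_producers_subscriptions(dataset, contact, source)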
43 changes: 9 additions & 34 deletions apps/transport/lib/jobs/import_dataset_contact_points_job.ex
@@ -17,6 +17,8 @@ defmodule Transport.Jobs.ImportDatasetContactPointsJob do

# The number of workers to run in parallel
@task_concurrency 5
+ # The source when creating a contact
+ @contact_source :"automation:import_contact_point"
# The notification subscription source when creating/deleting subscriptions
@notification_subscription_source "automation:import_contact_point"

@@ -89,40 +91,13 @@

defp update_contact_point(%DB.Dataset{} = dataset, %{"email" => _, "name" => _} = contact_point) do
contact = find_or_create_contact(contact_point)
- create_contact_point_subscriptions(dataset, contact)
- delete_other_contact_points_subscriptions(dataset, contact)
- end
-
- defp create_contact_point_subscriptions(%DB.Dataset{id: dataset_id}, %DB.Contact{id: contact_id}) do
- existing_reasons =
- DB.NotificationSubscription.base_query()
- |> where(
- [notification_subscription: ns],
- ns.dataset_id == ^dataset_id and ns.role == :producer and ns.contact_id == ^contact_id
- )
- |> select([notification_subscription: ns], ns.reason)
- |> DB.Repo.all()
- |> MapSet.new()
-
- Transport.NotificationReason.subscribable_reasons_related_to_datasets(:producer)
- |> MapSet.new()
- |> MapSet.difference(existing_reasons)
- |> Enum.each(fn reason ->
- DB.NotificationSubscription.insert!(%{
- role: :producer,
- source: @notification_subscription_source,
- reason: reason,
- contact_id: contact_id,
- dataset_id: dataset_id
- })
- end)
- end
+ DB.NotificationSubscription.create_producer_subscriptions(dataset, contact, @notification_subscription_source)

- defp delete_other_contact_points_subscriptions(%DB.Dataset{id: dataset_id}, %DB.Contact{id: contact_id}) do
- DB.NotificationSubscription.base_query()
- |> where([notification_subscription: ns], ns.dataset_id == ^dataset_id and ns.contact_id != ^contact_id)
- |> where([notification_subscription: ns], ns.role == :producer and ns.source == ^@notification_subscription_source)
- |> DB.Repo.delete_all()
+ DB.NotificationSubscription.delete_other_producers_subscriptions(
+   dataset,
+   contact,
+   @notification_subscription_source
+ )
end

defp find_or_create_contact(%{"email" => email, "name" => name}) do
@@ -131,7 +106,7 @@ defmodule Transport.Jobs.ImportDatasetContactPointsJob do
contact

nil ->
- Map.merge(guess_identity(name), %{email: email, creation_source: :"automation:import_contact_point"})
+ Map.merge(guess_identity(name), %{email: email, creation_source: @contact_source})
|> DB.Contact.insert!()
end
end
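Two different sources are at play in this job: `@contact_source` tags the `DB.Contact` record itself (its `creation_source` enum from the first file above), while `@notification_subscription_source` tags the subscriptions created or deleted on that contact's behalf. The job now delegates subscription bookkeeping to the shared `DB.NotificationSubscription` helpers instead of keeping its own copies. As a sketch, assuming this job is an Oban worker like the ImportGBFSFeedContactEmailJob below, it could be run on demand with:

Transport.Jobs.ImportDatasetContactPointsJob.new(%{}) |> Oban.insert()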
91 changes: 91 additions & 0 deletions apps/transport/lib/jobs/import_gbfs_feed_contact_email_job.ex
@@ -0,0 +1,91 @@
defmodule Transport.Jobs.ImportGBFSFeedContactEmailJob do
@moduledoc """
Reuse `feed_contact_email` from GBFS feeds.
Use these email addresses to find or create a contact and subscribe this contact
to producer subscriptions for this dataset.
When a `feed_contact_email` was previously set and has been removed, we delete old subscriptions.
"""
use Oban.Worker, max_attempts: 3
import Ecto.Query
require Logger

# The source when creating a contact
@contact_source :"automation:import_gbfs_feed_contact_email"
# The notification subscription source when creating/deleting subscriptions
@notification_subscription_source :"automation:gbfs_feed_contact_email"

@impl Oban.Worker
def perform(%Oban.Job{}) do
gbfs_feed_contact_emails() |> Enum.each(&update_feed_contact_email/1)
end

def update_feed_contact_email(
%{
resource_url: _,
dataset_id: dataset_id,
feed_contact_email: _
} = params
) do
dataset = DB.Repo.get!(DB.Dataset, dataset_id)
contact = find_or_create_contact(params)
DB.NotificationSubscription.create_producer_subscriptions(dataset, contact, @notification_subscription_source)

DB.NotificationSubscription.delete_other_producers_subscriptions(
dataset,
contact,
@notification_subscription_source
)
end

defp find_or_create_contact(%{resource_url: resource_url, feed_contact_email: feed_contact_email}) do
case DB.Repo.get_by(DB.Contact, email_hash: String.downcase(feed_contact_email)) do
%DB.Contact{} = contact ->
contact

nil ->
%{mailing_list_title: contact_title(resource_url), email: feed_contact_email, creation_source: @contact_source}
|> DB.Contact.insert!()
end
end

@doc """
iex> contact_title("https://api.cyclocity.fr/contracts/nantes/gbfs/gbfs.json")
"Équipe technique GBFS JC Decaux"
iex> contact_title("https://example.com/gbfs.json")
"Équipe technique GBFS Example"
iex> contact_title("https://404.fr")
"Équipe technique GBFS"
"""
def contact_title(resource_url) do
operator_name = Transport.GBFSMetadata.operator(resource_url) || ""
"Équipe technique GBFS #{operator_name}" |> String.trim()
end

@doc """
Finds feed contact emails for GBFS feeds.
Uses the metadata collected by `Transport.GBFSMetadata` over the last week.
"""
def gbfs_feed_contact_emails do
last_week = DateTime.utc_now() |> DateTime.add(-7, :day)

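# Keep GBFS resources whose metadata from the last week exposes a
# `system_details.feed_contact_email` key (the JSONB `?` operator checks key
# existence), then pick one email per resource with a window function.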
DB.ResourceMetadata.base_query()
|> join(:inner, [metadata: m], r in DB.Resource, on: r.id == m.resource_id, as: :resource)
|> where([resource: r], r.format == "gbfs")
|> where(
[metadata: m],
m.inserted_at >= ^last_week and fragment("?->'system_details' \\? 'feed_contact_email'", m.metadata)
)
|> select([metadata: m, resource: r], %{
resource_id: r.id,
resource_url: r.url,
dataset_id: r.dataset_id,
feed_contact_email:
last_value(fragment("?->'system_details'->> 'feed_contact_email'", m.metadata))
|> over(partition_by: m.resource_id, order_by: m.resource_id)
})
|> distinct(true)
|> DB.Repo.all()
end
end
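A minimal usage sketch for the new job; how it is scheduled belongs to configuration files not shown in this excerpt, and the values below are illustrative:

# Run the import once, on demand
{:ok, _job} = Transport.Jobs.ImportGBFSFeedContactEmailJob.new(%{}) |> Oban.insert()

# gbfs_feed_contact_emails/0 returns one map per GBFS resource, shaped like:
# %{resource_id: 123, resource_url: "https://example.com/gbfs.json", dataset_id: 456,
#   feed_contact_email: "tech@example.com"}
# update_feed_contact_email/1 then finds or creates the contact for that email and keeps
# its producer subscriptions for the dataset in sync.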
