From a0a52d12220b850031ab5d19e4d6b5278bb2c4ce Mon Sep 17 00:00:00 2001
From: Antoine Augusti
Date: Tue, 26 Nov 2024 09:17:39 +0100
Subject: [PATCH] Import GBFS stations data into GeoData (#4325)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* GeoDataImport: add slug column
* Add GBFSStationsToGeoData job
* Ignore virtual stations/stations with invalid coordinates
* Set Ecto type for slug as Ecto.Enum
---
 ...geo_data _import.ex => geo_data_import.ex} |   1 +
 apps/transport/lib/jobs/geo_data/base.ex      |  81 ++++++--
 .../lib/jobs/geo_data/bnlc_to_geo_data.ex     |   7 +-
 .../geo_data/gbfs_stations_to_geo_data.ex     |  93 +++++++++
 .../lib/jobs/geo_data/irve_to_geo_data.ex     |   5 +-
 .../lib/jobs/geo_data/lez_to_geo_data.ex      |   5 +-
 .../geo_data/parkings_relais_to_geo_data.ex   |   5 +-
 ...0241120132849_geo_data_import_add_slug.exs |  11 +
 .../jobs/geo_data/bnlc_to_geodata_test.exs    |   8 +-
 .../gbfs_stations_to_geo_data_test.exs        | 193 ++++++++++++++++++
 .../jobs/geo_data/irve_to_geodata_test.exs    |   8 +-
 .../jobs/geo_data/lez_to_geo_data_test.exs    |   8 +-
 .../parkings_relais_to_geo_data_test.exs      |   8 +-
 config/runtime.exs                            |   1 +
 14 files changed, 392 insertions(+), 42 deletions(-)
 rename apps/transport/lib/db/geo_data/{geo_data _import.ex => geo_data_import.ex} (88%)
 create mode 100644 apps/transport/lib/jobs/geo_data/gbfs_stations_to_geo_data.ex
 create mode 100644 apps/transport/priv/repo/migrations/20241120132849_geo_data_import_add_slug.exs
 create mode 100644 apps/transport/test/transport/jobs/geo_data/gbfs_stations_to_geo_data_test.exs

diff --git a/apps/transport/lib/db/geo_data/geo_data _import.ex b/apps/transport/lib/db/geo_data/geo_data_import.ex
similarity index 88%
rename from apps/transport/lib/db/geo_data/geo_data _import.ex
rename to apps/transport/lib/db/geo_data/geo_data_import.ex
index 518d338af0..f50b0a3ce0 100644
--- a/apps/transport/lib/db/geo_data/geo_data _import.ex
+++ b/apps/transport/lib/db/geo_data/geo_data_import.ex
@@ -8,6 +8,7 @@ defmodule DB.GeoDataImport do
 
   typed_schema "geo_data_import" do
     belongs_to(:resource_history, DB.ResourceHistory)
+    field(:slug, Ecto.Enum, values: Transport.ConsolidatedDataset.geo_data_datasets() ++ [:gbfs_stations])
     has_many(:geo_data, DB.GeoData)
 
     timestamps(type: :utc_datetime_usec)
diff --git a/apps/transport/lib/jobs/geo_data/base.ex b/apps/transport/lib/jobs/geo_data/base.ex
index 5e441186c5..be91b1fcaf 100644
--- a/apps/transport/lib/jobs/geo_data/base.ex
+++ b/apps/transport/lib/jobs/geo_data/base.ex
@@ -1,61 +1,103 @@
 defmodule Transport.Jobs.BaseGeoData do
   @moduledoc """
   Shared methods for GeoData import jobs.
+
+  Provides methods to import/replace data for:
+  - consolidated datasets, which should be replaced when we have a newer resource history
+  - anything else, identified by a slug, which should be replaced every time
   """
   require Logger
 
+  @import_timeout :timer.seconds(60)
+  @consolidated_datasets_slugs Transport.ConsolidatedDataset.geo_data_datasets()
+
+  def insert_data(geo_data_import_id, prepare_data_for_insert_fn) do
+    prepare_data_for_insert_fn.(geo_data_import_id)
+    |> Stream.each(&DB.Repo.insert_all(DB.GeoData, &1))
+    |> Stream.run()
+  end
+
   def insert_data(body, geo_data_import_id, prepare_data_for_insert_fn) do
     body
     |> prepare_data_for_insert_fn.(geo_data_import_id)
-    |> Stream.chunk_every(1000)
+    |> Stream.chunk_every(1_000)
     |> Stream.each(&DB.Repo.insert_all(DB.GeoData, &1))
     |> Stream.run()
   end
 
-  def needs_import?(
-        %DB.ResourceHistory{id: latest_resource_history_id},
-        %DB.GeoDataImport{resource_history_id: resource_history_id}
-      ),
-      do: latest_resource_history_id != resource_history_id
+  defp needs_import?(
+         %DB.ResourceHistory{id: latest_resource_history_id},
+         %DB.GeoDataImport{resource_history_id: resource_history_id}
+       ),
+       do: latest_resource_history_id != resource_history_id
 
-  def needs_import?(_, nil), do: true
+  defp needs_import?(_, nil), do: true
 
-  def import_replace_data(%DB.Resource{id: resource_id, dataset_id: dataset_id}, prepare_data_for_insert_fn) do
+  # For a static resource, associated with a consolidated dataset (BNLC, BNZFE, IRVE, etc.),
+  # we rely on the relevant `DB.ResourceHistory` to determine if we should replace the data.
+  def import_replace_data(slug, prepare_data_for_insert_fn) when slug in @consolidated_datasets_slugs do
+    %DB.Resource{id: resource_id, dataset_id: dataset_id} = Transport.ConsolidatedDataset.resource(slug)
     latest_resource_history = DB.ResourceHistory.latest_resource_history(resource_id)
     current_geo_data_import = DB.GeoDataImport.dataset_latest_geo_data_import(dataset_id)
 
     if needs_import?(latest_resource_history, current_geo_data_import) do
       Logger.info("New content detected...update content")
-      perform_import(current_geo_data_import, latest_resource_history, prepare_data_for_insert_fn)
+      perform_import(current_geo_data_import, latest_resource_history, slug, prepare_data_for_insert_fn)
     end
 
     :ok
   end
 
+  # For a non-static resource: we don't rely on a `DB.ResourceHistory` associated with a static
+  # resource to determine if we should replace the data; it should always be replaced.
+  def import_replace_data(slug, prepare_data_for_insert_fn) when is_atom(slug) do
+    current_geo_data_import = DB.Repo.get_by(DB.GeoDataImport, slug: slug)
+
+    Logger.info("geo_data for a slug is always replaced… Updating content for #{slug}")
+    perform_import(current_geo_data_import, slug, prepare_data_for_insert_fn)
+
+    :ok
+  end
+
+  # For a consolidated dataset, we rely on the latest resource history content to perform the import.
   defp perform_import(
          current_geo_data_import,
          %DB.ResourceHistory{id: latest_resource_history_id, payload: %{"permanent_url" => permanent_url}},
+         slug,
          prepare_data_for_insert_fn
-       ) do
+       )
+       when slug in @consolidated_datasets_slugs do
     DB.Repo.transaction(
       fn ->
         unless is_nil(current_geo_data_import) do
-          # thanks to cascading delete, it will also clean geo_data table corresponding entries
+          # Thanks to cascading delete, it will also clean the corresponding geo_data table entries
          current_geo_data_import |> DB.Repo.delete!()
         end
 
-        %{id: geo_data_import_id} = DB.Repo.insert!(%DB.GeoDataImport{resource_history_id: latest_resource_history_id})
+        %{id: geo_data_import_id} =
+          %DB.GeoDataImport{resource_history_id: latest_resource_history_id, slug: slug} |> DB.Repo.insert!()
+
         http_client = Transport.Shared.Wrapper.HTTPoison.impl()
-        %{status_code: 200, body: body} = http_client.get!(permanent_url)
+        %HTTPoison.Response{status_code: 200, body: body} = http_client.get!(permanent_url)
         insert_data(body, geo_data_import_id, prepare_data_for_insert_fn)
       end,
-      timeout: 60_000
+      timeout: @import_timeout
     )
   end
 
-  # keep 6 digits for WGS 84, see https://en.wikipedia.org/wiki/Decimal_degrees#Precision
-  def parse_coordinate(s) do
-    s |> string_to_float() |> Float.round(6)
+  defp perform_import(current_geo_data_import, slug, prepare_data_for_insert_fn) when is_atom(slug) do
+    DB.Repo.transaction(
+      fn ->
+        unless is_nil(current_geo_data_import) do
+          # Thanks to cascading delete, it will also clean the corresponding geo_data table entries
+          current_geo_data_import |> DB.Repo.delete!()
+        end
+
+        %DB.GeoDataImport{id: geo_data_import_id} = DB.Repo.insert!(%DB.GeoDataImport{slug: slug})
+        insert_data(geo_data_import_id, prepare_data_for_insert_fn)
+      end,
+      timeout: @import_timeout
+    )
   end
 
   def prepare_csv_data_for_import(body, prepare_data_fn, opts \\ []) do
@@ -75,6 +117,11 @@ defmodule Transport.Jobs.BaseGeoData do
     |> Stream.map(prepare_data_fn)
   end
 
+  # keep 6 digits for WGS 84, see https://en.wikipedia.org/wiki/Decimal_degrees#Precision
+  def parse_coordinate(s) do
+    s |> string_to_float() |> Float.round(6)
+  end
+
   # remove spaces (U+0020) and non-break spaces (U+00A0) from the string
   defp string_to_float(s), do: s |> String.trim() |> String.replace([" ", " "], "") |> String.to_float()
 end
diff --git a/apps/transport/lib/jobs/geo_data/bnlc_to_geo_data.ex b/apps/transport/lib/jobs/geo_data/bnlc_to_geo_data.ex
index d598071adf..2001c97604 100644
--- a/apps/transport/lib/jobs/geo_data/bnlc_to_geo_data.ex
+++ b/apps/transport/lib/jobs/geo_data/bnlc_to_geo_data.ex
@@ -1,17 +1,14 @@
 defmodule Transport.Jobs.BNLCToGeoData do
   @moduledoc """
   Job in charge of taking the content of the BNLC (Base Nationale de Covoiturage) and storing it
-  in the geo_data table
+  in the `geo_data` table
   """
   use Oban.Worker, max_attempts: 3
   require Logger
 
   @impl Oban.Worker
   def perform(%{}) do
-    Transport.ConsolidatedDataset.resource(:bnlc)
-    |> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)
-
-    :ok
+    Transport.Jobs.BaseGeoData.import_replace_data(:bnlc, &prepare_data_for_insert/2)
   end
 
   def prepare_data_for_insert(body, geo_data_import_id) do
diff --git a/apps/transport/lib/jobs/geo_data/gbfs_stations_to_geo_data.ex b/apps/transport/lib/jobs/geo_data/gbfs_stations_to_geo_data.ex
new file mode 100644
index 0000000000..5267dafd16
--- /dev/null
+++ b/apps/transport/lib/jobs/geo_data/gbfs_stations_to_geo_data.ex
@@ -0,0 +1,93 @@
+defmodule Transport.Jobs.GBFSStationsToGeoData do
+  @moduledoc """
+  Job in charge of importing GBFS stations data (usually bike docks) into the `geo_data` table.
+
+  It ignores virtual stations and stations with invalid coordinates.
+  """
+  use Oban.Worker, max_attempts: 3
+  import Ecto.Query
+
+  # The number of workers to run in parallel when fetching GBFS feeds
+  @task_concurrency 5
+  @task_timeout :timer.seconds(15)
+
+  @impl Oban.Worker
+  def perform(%Oban.Job{}) do
+    Transport.Jobs.BaseGeoData.import_replace_data(:gbfs_stations, &prepare_data_for_insert/1)
+  end
+
+  def prepare_data_for_insert(geo_data_import_id) do
+    relevant_gbfs_urls()
+    |> Task.async_stream(
+      fn url -> prepare_stations_data(url, geo_data_import_id) end,
+      max_concurrency: @task_concurrency,
+      on_timeout: :kill_task,
+      timeout: @task_timeout
+    )
+    |> Stream.filter(fn {status, _} -> status == :ok end)
+    |> Stream.map(fn {:ok, value} -> value end)
+  end
+
+  def prepare_stations_data(gbfs_url, geo_data_import_id) do
+    with {:ok, %HTTPoison.Response{status_code: 200, body: body}} <- http_client().get(gbfs_url),
+         {:ok, json} <- Jason.decode(body),
+         {:ok, feed_url} <- {:ok, Transport.GBFSMetadata.feed_url_by_name(json, :station_information)},
+         {:feed_exists, true} <- {:feed_exists, not is_nil(feed_url)},
+         {:ok, %HTTPoison.Response{status_code: 200, body: body}} <- http_client().get(feed_url),
+         {:ok, json} <- Jason.decode(body) do
+      json["data"]["stations"]
+      |> Enum.reject(&(virtual_station?(&1) or missing_coordinates?(&1)))
+      |> Enum.map(fn station ->
+        %{
+          geo_data_import_id: geo_data_import_id,
+          geom: %Geo.Point{coordinates: {station["lon"], station["lat"]}, srid: 4326},
+          payload: %{
+            capacity: station["capacity"],
+            name: station_name(station)
+          }
+        }
+      end)
+    else
+      _ -> []
+    end
+  end
+
+  defp virtual_station?(%{"is_virtual_station" => true}), do: true
+  defp virtual_station?(%{}), do: false
+
+  defp missing_coordinates?(%{"lat" => lat, "lon" => lon}) do
+    is_nil(lon) or is_nil(lat)
+  end
+
+  defp missing_coordinates?(%{}), do: true
+
+  # From GBFS 1.1 until 2.3
+  defp station_name(%{"name" => name}) when is_binary(name), do: name
+
+  # From GBFS 3.0 onwards
+  defp station_name(%{"name" => names}) do
+    names |> hd() |> Map.get("text")
+  end
+
+  @doc """
+  Fetches relevant GBFS feeds that we know have stations data, based on the metadata we compute.
+  We look at the last week of metadata to guard against potential upstream downtime or
+  issues while computing metadata on our side.
+  """
+  def relevant_gbfs_urls do
+    a_week_ago = DateTime.utc_now() |> DateTime.add(-7, :day)
+
+    DB.ResourceMetadata.base_query()
+    |> join(:inner, [metadata: m], r in DB.Resource, on: r.id == m.resource_id, as: :resource)
+    |> where([resource: r], r.format == "gbfs")
+    |> where([metadata: m], m.inserted_at >= ^a_week_ago and fragment("?->'types' \\? 'stations'", m.metadata))
+    |> select(
+      [resource: r, metadata: m],
+      last_value(r.url) |> over(partition_by: m.resource_id, order_by: m.resource_id)
+    )
+    |> distinct(true)
+    |> DB.Repo.all()
+  end
+
+  defp http_client, do: Transport.Shared.Wrapper.HTTPoison.impl()
+end
diff --git a/apps/transport/lib/jobs/geo_data/irve_to_geo_data.ex b/apps/transport/lib/jobs/geo_data/irve_to_geo_data.ex
index 1ee100680e..883e47fec3 100644
--- a/apps/transport/lib/jobs/geo_data/irve_to_geo_data.ex
+++ b/apps/transport/lib/jobs/geo_data/irve_to_geo_data.ex
@@ -7,10 +7,7 @@ defmodule Transport.Jobs.IRVEToGeoData do
 
   @impl Oban.Worker
   def perform(%Oban.Job{}) do
-    Transport.ConsolidatedDataset.resource(:irve)
-    |> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)
-
-    :ok
+    Transport.Jobs.BaseGeoData.import_replace_data(:irve, &prepare_data_for_insert/2)
   end
 
   def prepare_data_for_insert(body, geo_data_import_id) do
diff --git a/apps/transport/lib/jobs/geo_data/lez_to_geo_data.ex b/apps/transport/lib/jobs/geo_data/lez_to_geo_data.ex
index fdeb0b9d8b..f145f2599f 100644
--- a/apps/transport/lib/jobs/geo_data/lez_to_geo_data.ex
+++ b/apps/transport/lib/jobs/geo_data/lez_to_geo_data.ex
@@ -7,10 +7,7 @@ defmodule Transport.Jobs.LowEmissionZonesToGeoData do
 
   @impl Oban.Worker
   def perform(%{}) do
-    Transport.ConsolidatedDataset.resource(:zfe)
-    |> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)
-
-    :ok
+    Transport.Jobs.BaseGeoData.import_replace_data(:zfe, &prepare_data_for_insert/2)
   end
 
   def prepare_data_for_insert(body, geo_data_import_id) do
diff --git a/apps/transport/lib/jobs/geo_data/parkings_relais_to_geo_data.ex b/apps/transport/lib/jobs/geo_data/parkings_relais_to_geo_data.ex
index e084674d3e..af77db2aeb 100644
--- a/apps/transport/lib/jobs/geo_data/parkings_relais_to_geo_data.ex
+++ b/apps/transport/lib/jobs/geo_data/parkings_relais_to_geo_data.ex
@@ -6,10 +6,7 @@ defmodule Transport.Jobs.ParkingsRelaisToGeoData do
 
   @impl Oban.Worker
   def perform(%{}) do
-    Transport.ConsolidatedDataset.resource(:parkings_relais)
-    |> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)
-
-    :ok
+    Transport.Jobs.BaseGeoData.import_replace_data(:parkings_relais, &prepare_data_for_insert/2)
   end
 
   defp pr_count(""), do: 0
diff --git a/apps/transport/priv/repo/migrations/20241120132849_geo_data_import_add_slug.exs b/apps/transport/priv/repo/migrations/20241120132849_geo_data_import_add_slug.exs
new file mode 100644
index 0000000000..25d3767919
--- /dev/null
+++ b/apps/transport/priv/repo/migrations/20241120132849_geo_data_import_add_slug.exs
@@ -0,0 +1,11 @@
+defmodule DB.Repo.Migrations.GeoDataImportAddSlug do
+  use Ecto.Migration
+
+  def change do
+    alter table(:geo_data_import) do
+      add(:slug, :string)
+    end
+
+    create(unique_index(:geo_data_import, [:slug]))
+  end
+end
diff --git a/apps/transport/test/transport/jobs/geo_data/bnlc_to_geodata_test.exs b/apps/transport/test/transport/jobs/geo_data/bnlc_to_geodata_test.exs
index 15d2d30d6a..e4dd238162 100644
--- a/apps/transport/test/transport/jobs/geo_data/bnlc_to_geodata_test.exs
+++ b/apps/transport/test/transport/jobs/geo_data/bnlc_to_geodata_test.exs
@@ -71,7 +71,9 @@ defmodule Transport.Jobs.BNLCToGeoDataTest do
     assert :ok = perform_job(BNLCToGeoData, %{})
 
     # data is imported
-    [%{id: geo_data_import_1, resource_history_id: ^id_0}] = DB.GeoDataImport |> DB.Repo.all()
+    [%DB.GeoDataImport{id: geo_data_import_1, resource_history_id: ^id_0, slug: :bnlc}] =
+      DB.GeoDataImport |> DB.Repo.all()
+
     assert DB.GeoData
|> DB.Repo.all() |> Enum.count() == 2 # relaunch job @@ -92,7 +94,9 @@ defmodule Transport.Jobs.BNLCToGeoDataTest do assert :ok = perform_job(BNLCToGeoData, %{}) # geo_data and geo_data_import are updated accordingly - [%{id: geo_data_import_2, resource_history_id: ^id_1}] = DB.GeoDataImport |> DB.Repo.all() + [%DB.GeoDataImport{id: geo_data_import_2, resource_history_id: ^id_1, slug: :bnlc}] = + DB.GeoDataImport |> DB.Repo.all() + assert geo_data_import_2 !== geo_data_import_1 [%{geo_data_import_id: ^geo_data_import_2}, %{geo_data_import_id: ^geo_data_import_2}] = DB.GeoData |> DB.Repo.all() diff --git a/apps/transport/test/transport/jobs/geo_data/gbfs_stations_to_geo_data_test.exs b/apps/transport/test/transport/jobs/geo_data/gbfs_stations_to_geo_data_test.exs new file mode 100644 index 0000000000..0b948a36ea --- /dev/null +++ b/apps/transport/test/transport/jobs/geo_data/gbfs_stations_to_geo_data_test.exs @@ -0,0 +1,193 @@ +defmodule Transport.Jobs.GBFSStationsToGeoDataTest do + use ExUnit.Case, async: true + use Oban.Testing, repo: DB.Repo + alias Transport.Jobs.GBFSStationsToGeoData + import DB.Factory + import Mox + + setup do + Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo) + end + + setup :verify_on_exit! + + test "relevant_gbfs_urls" do + gbfs_1 = insert(:resource, format: "gbfs", url: "https://example.com/gbfs_1") + gbfs_2 = insert(:resource, format: "gbfs", url: "https://example.com/gbfs_2") + gbfs_3 = insert(:resource, format: "gbfs", url: "https://example.com/gbfs_3") + gbfs_4 = insert(:resource, format: "gbfs", url: "https://example.com/gbfs_4") + + ten_days_ago = DateTime.utc_now() |> DateTime.add(-10, :day) + five_days_ago = DateTime.utc_now() |> DateTime.add(-5, :day) + + # `gbfs_1` is relevant but should not be duplicated + insert(:resource_metadata, resource_id: gbfs_1.id, metadata: %{types: ["stations"]}, inserted_at: five_days_ago) + insert(:resource_metadata, resource_id: gbfs_1.id, metadata: %{types: ["stations"]}) + # `gbfs_4` should be included: stations + free floating + insert(:resource_metadata, resource_id: gbfs_4.id, metadata: %{types: ["free_floating", "stations"]}) + + # Ignored: too old + insert(:resource_metadata, resource_id: gbfs_2.id, metadata: %{types: ["stations"]}, inserted_at: ten_days_ago) + # Ignored: no stations + insert(:resource_metadata, + resource_id: gbfs_3.id, + metadata: %{types: ["free_floating"]}, + inserted_at: five_days_ago + ) + + result = GBFSStationsToGeoData.relevant_gbfs_urls() + assert Enum.count(result) == 2 + assert [gbfs_1.url, gbfs_4.url] |> MapSet.new() == result |> MapSet.new() + end + + describe "perform" do + test "imports stations data from 2 feeds" do + assert DB.GeoData |> DB.Repo.all() |> Enum.empty?() + assert DB.GeoDataImport |> DB.Repo.all() |> Enum.empty?() + + gbfs_1 = insert(:resource, format: "gbfs", url: "https://example.com/gbfs_1") + gbfs_2 = insert(:resource, format: "gbfs", url: "https://example.com/gbfs_2") + station_info_url_1 = "https://example.com/station_info_url_1" + station_info_url_2 = "https://example.com/station_info_url_2" + + insert(:resource_metadata, + resource_id: gbfs_1.id, + metadata: %{types: ["stations"]}, + inserted_at: DateTime.utc_now() |> DateTime.add(-5, :day) + ) + + insert(:resource_metadata, resource_id: gbfs_2.id, metadata: %{types: ["free_floating", "stations"]}) + + assert [gbfs_1.url, gbfs_2.url] |> MapSet.new() == GBFSStationsToGeoData.relevant_gbfs_urls() |> MapSet.new() + + setup_responses(%{ + gbfs_1.url => %{ + "version" => "3.0", + "data" => %{"feeds" => [%{"name" => 
"station_information", "url" => station_info_url_1}]} + }, + station_info_url_1 => %{ + "version" => "3.0", + "data" => %{ + "stations" => [ + %{ + "capacity" => 30, + "is_valet_station" => false, + "is_virtual_station" => false, + "lat" => 47.26095, + "lon" => -2.3353, + "name" => [ + %{ + "language" => "fr", + "text" => "Rond-point" + } + ] + } + ] + } + }, + gbfs_2.url => %{ + "version" => "3.0", + "data" => %{"feeds" => [%{"name" => "station_information", "url" => station_info_url_2}]} + }, + station_info_url_2 => %{ + "version" => "3.0", + "data" => %{ + "stations" => [ + %{ + "capacity" => 20, + "is_valet_station" => false, + "is_virtual_station" => false, + "lat" => 45.4542, + "lon" => 2.6278, + "name" => [ + %{ + "language" => "fr", + "text" => "Gare" + } + ] + }, + # Ignored: virtual station + %{ + "capacity" => 10, + "is_virtual_station" => true, + "lat" => 2, + "lon" => 1, + "name" => [ + %{ + "language" => "fr", + "text" => "Bistrot" + } + ] + }, + # Ignored: latitude is nil + %{ + "capacity" => 10, + "lat" => nil, + "lon" => 1, + "name" => [ + %{ + "language" => "fr", + "text" => "Pub" + } + ] + }, + # Ignored: no coordinates + %{ + "capacity" => 10, + "name" => [ + %{ + "language" => "fr", + "text" => "Bar" + } + ] + } + ] + } + } + }) + + assert :ok == perform_job(GBFSStationsToGeoData, %{}) + + %DB.GeoDataImport{id: geo_data_import_id} = DB.Repo.get_by!(DB.GeoDataImport, slug: :gbfs_stations) + + assert [ + %DB.GeoData{ + geom: %Geo.Point{coordinates: {2.6278, 45.4542}, srid: 4326}, + payload: %{"capacity" => 20, "name" => "Gare"}, + geo_data_import_id: ^geo_data_import_id + }, + %DB.GeoData{ + geom: %Geo.Point{coordinates: {-2.3353, 47.26095}, srid: 4326}, + payload: %{"capacity" => 30, "name" => "Rond-point"}, + geo_data_import_id: ^geo_data_import_id + } + ] = DB.GeoData |> DB.Repo.all() |> Enum.sort_by(& &1.payload["capacity"]) + end + + test "replaces existing data" do + geo_data_import = %DB.GeoDataImport{slug: :gbfs_stations} |> DB.Repo.insert!() + + %DB.GeoData{ + geom: %Geo.Point{coordinates: {2.6278, 45.4542}, srid: 4326}, + payload: %{"capacity" => 20, "name" => "Gare"}, + geo_data_import_id: geo_data_import.id + } + |> DB.Repo.insert!() + + # No GBFS metadata for stations, existing data should be deleted after running the job + + assert [] == GBFSStationsToGeoData.relevant_gbfs_urls() + + assert :ok == perform_job(GBFSStationsToGeoData, %{}) + + assert DB.GeoData |> DB.Repo.all() |> Enum.empty?() + end + end + + defp setup_responses(responses) do + expect(Transport.HTTPoison.Mock, :get, Enum.count(responses), fn url -> + body = Map.fetch!(responses, url) |> Jason.encode!() + {:ok, %HTTPoison.Response{status_code: 200, body: body, headers: [{"content-type", "application/json"}]}} + end) + end +end diff --git a/apps/transport/test/transport/jobs/geo_data/irve_to_geodata_test.exs b/apps/transport/test/transport/jobs/geo_data/irve_to_geodata_test.exs index dc73358b70..357833af33 100644 --- a/apps/transport/test/transport/jobs/geo_data/irve_to_geodata_test.exs +++ b/apps/transport/test/transport/jobs/geo_data/irve_to_geodata_test.exs @@ -96,7 +96,9 @@ defmodule Transport.Jobs.IRVEToGeoDataTest do assert :ok = perform_job(IRVEToGeoData, %{}) # data is imported - [%{id: geo_data_import_1, resource_history_id: ^id_1}] = DB.GeoDataImport |> DB.Repo.all() + [%DB.GeoDataImport{id: geo_data_import_1, resource_history_id: ^id_1, slug: :irve}] = + DB.GeoDataImport |> DB.Repo.all() + assert DB.GeoData |> DB.Repo.all() |> Enum.count() == 2 # relaunch job @@ -117,7 +119,9 @@ 
defmodule Transport.Jobs.IRVEToGeoDataTest do assert :ok = perform_job(IRVEToGeoData, %{}) # geo_data and geo_data_import are updated accordingly - [%{id: geo_data_import_2, resource_history_id: ^id_2}] = DB.GeoDataImport |> DB.Repo.all() + [%DB.GeoDataImport{id: geo_data_import_2, resource_history_id: ^id_2, slug: :irve}] = + DB.GeoDataImport |> DB.Repo.all() + assert geo_data_import_2 !== geo_data_import_1 [%{geo_data_import_id: ^geo_data_import_2}, %{geo_data_import_id: ^geo_data_import_2}] = DB.GeoData |> DB.Repo.all() diff --git a/apps/transport/test/transport/jobs/geo_data/lez_to_geo_data_test.exs b/apps/transport/test/transport/jobs/geo_data/lez_to_geo_data_test.exs index 48c3493e45..f840a8fb45 100644 --- a/apps/transport/test/transport/jobs/geo_data/lez_to_geo_data_test.exs +++ b/apps/transport/test/transport/jobs/geo_data/lez_to_geo_data_test.exs @@ -93,7 +93,9 @@ defmodule Transport.Jobs.LowEmissionZonesToGeoDataTest do assert :ok = perform_job(LowEmissionZonesToGeoData, %{}) # data is imported - [%{id: geo_data_import_1, resource_history_id: ^id_0}] = DB.GeoDataImport |> DB.Repo.all() + [%DB.GeoDataImport{id: geo_data_import_1, resource_history_id: ^id_0, slug: :zfe}] = + DB.GeoDataImport |> DB.Repo.all() + assert DB.GeoData |> DB.Repo.all() |> Enum.count() == 1 # relaunch job @@ -114,7 +116,9 @@ defmodule Transport.Jobs.LowEmissionZonesToGeoDataTest do assert :ok = perform_job(LowEmissionZonesToGeoData, %{}) # geo_data and geo_data_import are updated accordingly - [%{id: geo_data_import_2, resource_history_id: ^id_1}] = DB.GeoDataImport |> DB.Repo.all() + [%DB.GeoDataImport{id: geo_data_import_2, resource_history_id: ^id_1, slug: :zfe}] = + DB.GeoDataImport |> DB.Repo.all() + assert geo_data_import_2 !== geo_data_import_1 [%{geo_data_import_id: ^geo_data_import_2}] = DB.GeoData |> DB.Repo.all() diff --git a/apps/transport/test/transport/jobs/geo_data/parkings_relais_to_geo_data_test.exs b/apps/transport/test/transport/jobs/geo_data/parkings_relais_to_geo_data_test.exs index 94d9bd377f..366cd20d33 100644 --- a/apps/transport/test/transport/jobs/geo_data/parkings_relais_to_geo_data_test.exs +++ b/apps/transport/test/transport/jobs/geo_data/parkings_relais_to_geo_data_test.exs @@ -66,7 +66,9 @@ defmodule Transport.Jobs.ParkingsRelaisToGeoDataTest do assert :ok = perform_job(ParkingsRelaisToGeoData, %{}) # data is imported - [%{id: geo_data_import_1, resource_history_id: ^id_0}] = DB.GeoDataImport |> DB.Repo.all() + [%DB.GeoDataImport{id: geo_data_import_1, resource_history_id: ^id_0, slug: :parkings_relais}] = + DB.GeoDataImport |> DB.Repo.all() + assert DB.GeoData |> DB.Repo.all() |> Enum.count() == 1 # relaunch job @@ -87,7 +89,9 @@ defmodule Transport.Jobs.ParkingsRelaisToGeoDataTest do assert :ok = perform_job(ParkingsRelaisToGeoData, %{}) # geo_data and geo_data_import are updated accordingly - [%{id: geo_data_import_2, resource_history_id: ^id_1}] = DB.GeoDataImport |> DB.Repo.all() + [%DB.GeoDataImport{id: geo_data_import_2, resource_history_id: ^id_1, slug: :parkings_relais}] = + DB.GeoDataImport |> DB.Repo.all() + assert geo_data_import_2 !== geo_data_import_1 [%{geo_data_import_id: ^geo_data_import_2}] = DB.GeoData |> DB.Repo.all() diff --git a/config/runtime.exs b/config/runtime.exs index 5942be0b01..915119a8ac 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -121,6 +121,7 @@ oban_prod_crontab = [ {"30 */6 * * *", Transport.Jobs.ParkingsRelaisToGeoData}, {"30 */6 * * *", Transport.Jobs.LowEmissionZonesToGeoData}, {"30 */6 * * *", 
Transport.Jobs.IRVEToGeoData},
+  {"30 6 * * *", Transport.Jobs.GBFSStationsToGeoData},
   {"15 10 * * *", Transport.Jobs.DatabaseBackupReplicationJob},
   {"0 7 * * *", Transport.Jobs.GTFSRTMultiValidationDispatcherJob},
   {"30 7 * * *", Transport.Jobs.GBFSMultiValidationDispatcherJob},
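
Note (not part of the patch): a minimal sketch of how another non-consolidated source could reuse the slug-based `Transport.Jobs.BaseGeoData.import_replace_data/2` introduced here. The module name, the :example_things slug and the payload fields are hypothetical; a real job would also need its slug added to the `Ecto.Enum` values of `DB.GeoDataImport.slug` and an entry in the Oban crontab above.

defmodule Transport.Jobs.ExampleThingsToGeoData do
  @moduledoc """
  Hypothetical job illustrating the slug-based import path: for non-consolidated slugs,
  `import_replace_data/2` expects a 1-arity function receiving only the `geo_data_import` id
  and returning an enumerable of lists of rows ready for `DB.Repo.insert_all/2`.
  """
  use Oban.Worker, max_attempts: 3

  @impl Oban.Worker
  def perform(%Oban.Job{}) do
    # Data for this slug is always replaced, no `DB.ResourceHistory` comparison involved
    Transport.Jobs.BaseGeoData.import_replace_data(:example_things, &prepare_data_for_insert/1)
  end

  def prepare_data_for_insert(geo_data_import_id) do
    # Each element is a batch passed to `DB.Repo.insert_all(DB.GeoData, batch)`,
    # mirroring the row shape used by GBFSStationsToGeoData (geom + free-form payload)
    [
      [
        %{
          geo_data_import_id: geo_data_import_id,
          geom: %Geo.Point{coordinates: {2.3522, 48.8566}, srid: 4326},
          payload: %{name: "example"}
        }
      ]
    ]
  end
end

Consolidated-dataset slugs (:bnlc, :irve, :zfe, :parkings_relais) keep the 2-arity variant instead, where the downloaded resource body is passed along with the import id, as in `BNLCToGeoData.prepare_data_for_insert/2`.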