Import GBFS stations data into GeoData (#4325)
* GeoDataImport: add a slug column

* Add GBFSStationsToGeoData job

* Ignore virtual stations/stations with invalid coordinates

* Set Ecto type for slug as Ecto.Enum
AntoineAugusti authored Nov 26, 2024
1 parent affa80f commit a0a52d1
Showing 14 changed files with 392 additions and 42 deletions.
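
As a side note, the new GBFS job can be exercised in the same way as the other geo_data Oban jobs; a minimal sketch, assuming the standard Oban setup already used by this application (the cron/scheduler entry for the job is not part of this diff):

# Hypothetical manual run, e.g. from an IEx session.
%{}
|> Transport.Jobs.GBFSStationsToGeoData.new()
|> Oban.insert()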
@@ -8,6 +8,7 @@ defmodule DB.GeoDataImport do

typed_schema "geo_data_import" do
belongs_to(:resource_history, DB.ResourceHistory)
field(:slug, Ecto.Enum, values: Transport.ConsolidatedDataset.geo_data_datasets() ++ [:gbfs_stations])
has_many(:geo_data, DB.GeoData)

timestamps(type: :utc_datetime_usec)
81 changes: 64 additions & 17 deletions apps/transport/lib/jobs/geo_data/base.ex
@@ -1,61 +1,103 @@
defmodule Transport.Jobs.BaseGeoData do
@moduledoc """
Shared methods for GeoData import jobs.
Provides methods to import/replace data for:
- consolidated datasets which should be replaced when we have a newer resource history
- anything else, identified by a slug, which should be replaced every time
"""
require Logger

@import_timeout :timer.seconds(60)
@consolidated_datasets_slugs Transport.ConsolidatedDataset.geo_data_datasets()

def insert_data(geo_data_import_id, prepare_data_for_insert_fn) do
prepare_data_for_insert_fn.(geo_data_import_id)
|> Stream.each(&DB.Repo.insert_all(DB.GeoData, &1))
|> Stream.run()
end

def insert_data(body, geo_data_import_id, prepare_data_for_insert_fn) do
body
|> prepare_data_for_insert_fn.(geo_data_import_id)
|> Stream.chunk_every(1000)
|> Stream.chunk_every(1_000)
|> Stream.each(&DB.Repo.insert_all(DB.GeoData, &1))
|> Stream.run()
end

def needs_import?(
%DB.ResourceHistory{id: latest_resource_history_id},
%DB.GeoDataImport{resource_history_id: resource_history_id}
),
do: latest_resource_history_id != resource_history_id
defp needs_import?(
%DB.ResourceHistory{id: latest_resource_history_id},
%DB.GeoDataImport{resource_history_id: resource_history_id}
),
do: latest_resource_history_id != resource_history_id

def needs_import?(_, nil), do: true
defp needs_import?(_, nil), do: true

def import_replace_data(%DB.Resource{id: resource_id, dataset_id: dataset_id}, prepare_data_for_insert_fn) do
# For a static resource, associated to a consolidated dataset (BNLC, BNZFE, IRVE etc)
# We rely on the relevant `DB.ResourceHistory` to determine if we should replace the data.
def import_replace_data(slug, prepare_data_for_insert_fn) when slug in @consolidated_datasets_slugs do
%DB.Resource{id: resource_id, dataset_id: dataset_id} = Transport.ConsolidatedDataset.resource(slug)
latest_resource_history = DB.ResourceHistory.latest_resource_history(resource_id)
current_geo_data_import = DB.GeoDataImport.dataset_latest_geo_data_import(dataset_id)

if needs_import?(latest_resource_history, current_geo_data_import) do
Logger.info("New content detected...update content")
perform_import(current_geo_data_import, latest_resource_history, prepare_data_for_insert_fn)
perform_import(current_geo_data_import, latest_resource_history, slug, prepare_data_for_insert_fn)
end

:ok
end

# For a non-static resource: we don't rely on a `DB.ResourceHistory` associated to a static
# resource to determine if we should replace the data, it should always be replaced.
def import_replace_data(slug, prepare_data_for_insert_fn) when is_atom(slug) do
current_geo_data_import = DB.Repo.get_by(DB.GeoDataImport, slug: slug)

Logger.info("geo_data for a slug is always replaced… Updating content for #{slug}")
perform_import(current_geo_data_import, slug, prepare_data_for_insert_fn)

:ok
end

# For a consolidated dataset, we rely on the latest resource history content to perform the import.
defp perform_import(
current_geo_data_import,
%DB.ResourceHistory{id: latest_resource_history_id, payload: %{"permanent_url" => permanent_url}},
slug,
prepare_data_for_insert_fn
) do
)
when slug in @consolidated_datasets_slugs do
DB.Repo.transaction(
fn ->
unless is_nil(current_geo_data_import) do
# thanks to cascading delete, it will also clean geo_data table corresponding entries
# Thanks to cascading delete, it will also clean geo_data table corresponding entries
current_geo_data_import |> DB.Repo.delete!()
end

%{id: geo_data_import_id} = DB.Repo.insert!(%DB.GeoDataImport{resource_history_id: latest_resource_history_id})
%{id: geo_data_import_id} =
%DB.GeoDataImport{resource_history_id: latest_resource_history_id, slug: slug} |> DB.Repo.insert!()

http_client = Transport.Shared.Wrapper.HTTPoison.impl()
%{status_code: 200, body: body} = http_client.get!(permanent_url)
%HTTPoison.Response{status_code: 200, body: body} = http_client.get!(permanent_url)
insert_data(body, geo_data_import_id, prepare_data_for_insert_fn)
end,
timeout: 60_000
timeout: @import_timeout
)
end

# keep 6 digits for WGS 84, see https://en.wikipedia.org/wiki/Decimal_degrees#Precision
def parse_coordinate(s) do
s |> string_to_float() |> Float.round(6)
defp perform_import(current_geo_data_import, slug, prepare_data_for_insert_fn) when is_atom(slug) do
DB.Repo.transaction(
fn ->
unless is_nil(current_geo_data_import) do
# Thanks to cascading delete, it will also clean geo_data table corresponding entries
current_geo_data_import |> DB.Repo.delete!()
end

%DB.GeoDataImport{id: geo_data_import_id} = DB.Repo.insert!(%DB.GeoDataImport{slug: slug})
insert_data(geo_data_import_id, prepare_data_for_insert_fn)
end,
timeout: @import_timeout
)
end

def prepare_csv_data_for_import(body, prepare_data_fn, opts \\ []) do
@@ -75,6 +117,11 @@ defmodule Transport.Jobs.BaseGeoData do
|> Stream.map(prepare_data_fn)
end

# keep 6 digits for WGS 84, see https://en.wikipedia.org/wiki/Decimal_degrees#Precision
def parse_coordinate(s) do
s |> string_to_float() |> Float.round(6)
end

# remove spaces (U+0020) and non-break spaces (U+00A0) from the string
defp string_to_float(s), do: s |> String.trim() |> String.replace([" ", " "], "") |> String.to_float()
end
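
In summary, the refactored `import_replace_data/2` is now always called with a slug; the calls below mirror the ones made by the individual jobs in this commit (only the way they are shown side by side here is illustrative):

# Consolidated datasets (BNLC, IRVE, ZFE, parkings relais): the slug resolves to a
# resource, and data is replaced only when a newer DB.ResourceHistory exists.
# The prepare function has arity 2: it receives the downloaded body and the import id.
Transport.Jobs.BaseGeoData.import_replace_data(:bnlc, &Transport.Jobs.BNLCToGeoData.prepare_data_for_insert/2)

# Any other slug (GBFS stations here): there is no resource history, data is replaced
# on every run, and the arity-1 prepare function fetches its own data from the import id.
Transport.Jobs.BaseGeoData.import_replace_data(
  :gbfs_stations,
  &Transport.Jobs.GBFSStationsToGeoData.prepare_data_for_insert/1
)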
7 changes: 2 additions & 5 deletions apps/transport/lib/jobs/geo_data/bnlc_to_geo_data.ex
@@ -1,17 +1,14 @@
defmodule Transport.Jobs.BNLCToGeoData do
@moduledoc """
Job in charge of taking the content of the BNLC (Base Nationale de Covoiturage) and storing it
in the geo_data table
in the `geo_data` table
"""
use Oban.Worker, max_attempts: 3
require Logger

@impl Oban.Worker
def perform(%{}) do
Transport.ConsolidatedDataset.resource(:bnlc)
|> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)

:ok
Transport.Jobs.BaseGeoData.import_replace_data(:bnlc, &prepare_data_for_insert/2)
end

def prepare_data_for_insert(body, geo_data_import_id) do
93 changes: 93 additions & 0 deletions apps/transport/lib/jobs/geo_data/gbfs_stations_to_geo_data.ex
@@ -0,0 +1,93 @@
defmodule Transport.Jobs.GBFSStationsToGeoData do
@moduledoc """
Job in charge of importing GBFS stations data (docks for bikes usually) to the `geo_data` table.
It ignores virtual stations and stations with invalid coordinates.
"""
use Oban.Worker, max_attempts: 3
import Ecto.Query

# The number of workers to run in parallel when fetching GBFS feeds
@task_concurrency 5
@task_timeout :timer.seconds(15)

@impl Oban.Worker
def perform(%Oban.Job{}) do
Transport.Jobs.BaseGeoData.import_replace_data(:gbfs_stations, &prepare_data_for_insert/1)
end

def prepare_data_for_insert(geo_data_import_id) do
relevant_gbfs_urls()
|> Task.async_stream(
fn url -> prepare_stations_data(url, geo_data_import_id) end,
max_concurrency: @task_concurrency,
on_timeout: :kill_task,
timeout: @task_timeout
)
|> Stream.filter(fn {status, _} -> status == :ok end)
|> Stream.map(fn {:ok, value} -> value end)
end

def prepare_stations_data(gbfs_url, geo_data_import_id) do
with {:ok, %HTTPoison.Response{status_code: 200, body: body}} <- http_client().get(gbfs_url),
{:ok, json} <- Jason.decode(body),
{:ok, feed_url} <- {:ok, Transport.GBFSMetadata.feed_url_by_name(json, :station_information)},
{:feed_exists, true} <- {:feed_exists, not is_nil(feed_url)},
{:ok, %HTTPoison.Response{status_code: 200, body: body}} <- http_client().get(feed_url),
{:ok, json} <- Jason.decode(body) do
json["data"]["stations"]
|> Enum.reject(&(virtual_station?(&1) or missing_coordinates?(&1)))
|> Enum.map(fn station ->
%{
geo_data_import_id: geo_data_import_id,
geom: %Geo.Point{coordinates: {station["lon"], station["lat"]}, srid: 4326},
payload: %{
capacity: station["capacity"],
name: station_name(station)
}
}
end)
else
_ -> []
end
end

defp virtual_station?(%{"is_virtual_station" => true}), do: true
defp virtual_station?(%{}), do: false

defp missing_coordinates?(%{"lat" => lat, "lon" => lon}) do
is_nil(lon) or is_nil(lat)
end

defp missing_coordinates?(%{}), do: true

# From GBFS 1.1 until 2.3
defp station_name(%{"name" => name}) when is_binary(name), do: name

# From GBFS 3.0 onwards
defp station_name(%{"name" => names}) do
names |> hd() |> Map.get("text")
end

@doc """
Fetches relevant GBFS feeds which we know have stations data, based on the metadata we compute.
Look at the last week of metadata to prevent potential upstream downtimes/issues
while computing metadata on our side.
"""
def relevant_gbfs_urls do
a_week_ago = DateTime.utc_now() |> DateTime.add(-7, :day)

DB.ResourceMetadata.base_query()
|> join(:inner, [metadata: m], r in DB.Resource, on: r.id == m.resource_id, as: :resource)
|> where([resource: r], r.format == "gbfs")
|> where([metadata: m], m.inserted_at >= ^a_week_ago and fragment("?->'types' \\? 'stations'", m.metadata))
|> select(
[resource: r, metadata: m],
last_value(r.url) |> over(partition_by: m.resource_id, order_by: m.resource_id)
)
|> distinct(true)
|> DB.Repo.all()
end

defp http_client, do: Transport.Shared.Wrapper.HTTPoison.impl()
end
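
To make the filtering concrete, here is a small sketch of a `station_information` payload as handled by `prepare_stations_data/2`; the station values are invented, only the field names come from the GBFS specification used above:

# Hypothetical GBFS 2.x "data" => "stations" content.
stations = [
  # kept: real coordinates, not a virtual station
  %{"station_id" => "a", "name" => "Gare", "lat" => 48.8566, "lon" => 2.3522, "capacity" => 20},
  # rejected by virtual_station?/1
  %{"station_id" => "b", "name" => "Zone libre", "lat" => 48.85, "lon" => 2.35, "is_virtual_station" => true},
  # rejected by missing_coordinates?/1
  %{"station_id" => "c", "name" => "Dépôt", "lat" => nil, "lon" => nil}
]

# From GBFS 3.0 onwards, "name" is a list of localized texts and station_name/1
# picks the first entry, e.g. %{"name" => [%{"text" => "Gare", "language" => "fr"}]}.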
5 changes: 1 addition & 4 deletions apps/transport/lib/jobs/geo_data/irve_to_geo_data.ex
@@ -7,10 +7,7 @@ defmodule Transport.Jobs.IRVEToGeoData do

@impl Oban.Worker
def perform(%Oban.Job{}) do
Transport.ConsolidatedDataset.resource(:irve)
|> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)

:ok
Transport.Jobs.BaseGeoData.import_replace_data(:irve, &prepare_data_for_insert/2)
end

def prepare_data_for_insert(body, geo_data_import_id) do
5 changes: 1 addition & 4 deletions apps/transport/lib/jobs/geo_data/lez_to_geo_data.ex
@@ -7,10 +7,7 @@ defmodule Transport.Jobs.LowEmissionZonesToGeoData do

@impl Oban.Worker
def perform(%{}) do
Transport.ConsolidatedDataset.resource(:zfe)
|> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)

:ok
Transport.Jobs.BaseGeoData.import_replace_data(:zfe, &prepare_data_for_insert/2)
end

def prepare_data_for_insert(body, geo_data_import_id) do
@@ -6,10 +6,7 @@ defmodule Transport.Jobs.ParkingsRelaisToGeoData do

@impl Oban.Worker
def perform(%{}) do
Transport.ConsolidatedDataset.resource(:parkings_relais)
|> Transport.Jobs.BaseGeoData.import_replace_data(&prepare_data_for_insert/2)

:ok
Transport.Jobs.BaseGeoData.import_replace_data(:parkings_relais, &prepare_data_for_insert/2)
end

defp pr_count(""), do: 0
@@ -0,0 +1,11 @@
defmodule DB.Repo.Migrations.GeoDataImportAddSlug do
use Ecto.Migration

def change do
alter table(:geo_data_import) do
add(:slug, :string)
end

create(unique_index(:geo_data_import, [:slug]))
end
end
@@ -71,7 +71,9 @@ defmodule Transport.Jobs.BNLCToGeoDataTest do
assert :ok = perform_job(BNLCToGeoData, %{})

# data is imported
[%{id: geo_data_import_1, resource_history_id: ^id_0}] = DB.GeoDataImport |> DB.Repo.all()
[%DB.GeoDataImport{id: geo_data_import_1, resource_history_id: ^id_0, slug: :bnlc}] =
DB.GeoDataImport |> DB.Repo.all()

assert DB.GeoData |> DB.Repo.all() |> Enum.count() == 2

# relaunch job
@@ -92,7 +94,9 @@ defmodule Transport.Jobs.BNLCToGeoDataTest do
assert :ok = perform_job(BNLCToGeoData, %{})

# geo_data and geo_data_import are updated accordingly
[%{id: geo_data_import_2, resource_history_id: ^id_1}] = DB.GeoDataImport |> DB.Repo.all()
[%DB.GeoDataImport{id: geo_data_import_2, resource_history_id: ^id_1, slug: :bnlc}] =
DB.GeoDataImport |> DB.Repo.all()

assert geo_data_import_2 !== geo_data_import_1

[%{geo_data_import_id: ^geo_data_import_2}, %{geo_data_import_id: ^geo_data_import_2}] = DB.GeoData |> DB.Repo.all()