Skip to content

Commit

Permalink
feat(ocsge): adapt code to handle ocsge from drom com
Browse files Browse the repository at this point in the history
  • Loading branch information
alexisig committed Nov 13, 2024
1 parent 905911f commit 4ae1076
Show file tree
Hide file tree
Showing 17 changed files with 148 additions and 41 deletions.
2 changes: 1 addition & 1 deletion airflow/dags/ingest_gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def ingest():
"-lco",
"GEOMETRY_NAME=geom",
"-a_srs",
"EPSG:4236",
"EPSG:4326",
"-nln",
"gpu_zone_urba",
"-nlt",
Expand Down
14 changes: 11 additions & 3 deletions airflow/dags/ocsge.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
)
from include.pools import DBT_POOL, OCSGE_STAGING_POOL
from include.shapefile import get_shapefile_fields
from include.utils import multiline_string_to_single_line
from include.utils import get_srid_by_departement_code, multiline_string_to_single_line


def get_paths_from_directory(directory: str) -> list[tuple[str, str]]:
Expand Down Expand Up @@ -119,13 +119,16 @@ def load_shapefiles_to_dw(
departement: str,
loaded_date: int,
table_key: str,
mode: Literal["overwrite", "append"] = "append",
mode: Literal["overwrite", "append"],
table_suffix: str = "",
):
local_path = "/tmp/ocsge.7z"
Container().s3().get_file(path, local_path)
extract_dir = tempfile.mkdtemp()
py7zr.SevenZipFile(local_path, mode="r").extractall(path=extract_dir)

srid = get_srid_by_departement_code(departement)

for file_path, filename in get_paths_from_directory(extract_dir):
if not file_path.endswith(".shp"):
continue
Expand All @@ -146,6 +149,9 @@ def load_shapefiles_to_dw(
)
table_name = variables[table_key]

if table_suffix:
table_name += f"_{table_suffix}"

cmd = [
"ogr2ogr",
"-dialect",
Expand All @@ -157,7 +163,7 @@ def load_shapefiles_to_dw(
"-lco",
"GEOMETRY_NAME=geom",
"-a_srs",
"EPSG:2154",
f"EPSG:{srid}",
"-nlt",
"MULTIPOLYGON",
"-nlt",
Expand Down Expand Up @@ -283,6 +289,8 @@ def ingest_ocsge(path, **context) -> int:
departement=departement,
loaded_date=loaded_date,
table_key="dw_source",
mode="append",
table_suffix=departement,
)

return loaded_date
Expand Down
9 changes: 9 additions & 0 deletions airflow/include/ocsge/sources.json
Original file line number Diff line number Diff line change
Expand Up @@ -458,5 +458,14 @@
"2018": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_LAMB93_D095_2018-01-01/OCS-GE_2-0__SHP_LAMB93_D095_2018-01-01.7z",
"2021": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_LAMB93_D095_2021-01-01/OCS-GE_2-0__SHP_LAMB93_D095_2021-01-01.7z"
}
},
"972": {
"difference": {
"2017_2022": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0_DIFF_SHP_RGAF09UTM20_D972_2017-2022/OCS-GE_2-0_DIFF_SHP_RGAF09UTM20_D972_2017-2022.7z"
},
"occupation_du_sol_et_zone_construite": {
"2017": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2017-01-01/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2017-01-01.7z",
"2022": "https://data.geopf.fr/telechargement/download/OCSGE/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2022-01-01/OCS-GE_2-0__SHP_RGAF09UTM20_D972_2022-01-01.7z"
}
}
}
59 changes: 59 additions & 0 deletions airflow/include/sql/sparte/macros/gpu/zonage_urbanisme.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@

{% macro zonage_urbanisme(srid, extent_table) %}
{#
    Build the urban zoning table for a single territory.

    Arguments:
        srid          -- SRID the zone geometries are transformed to; the same
                         value is stored in the `srid_source` column.
        extent_table  -- name of the dbt model (resolved with ref()) whose
                         geometries define the territory. Their union is
                         reduced to its envelope and transformed to SRID 4326
                         before being compared with the raw zone geometries.

    Rows of the `public.gpu_zone_urba` source are kept when both raw dates
    pass `raw_date_starts_with_yyyy`, the geometry is not empty and it
    intersects the extent envelope. Rows are then deduplicated on `checksum`
    (first row by `gpu_timestamp` wins) and rows whose transformed geometry
    is empty are dropped.
#}
{{ config(materialized='table') }}

WITH extent AS (
    -- Bounding box of the whole territory, in SRID 4326.
    SELECT
        ST_transform(ST_Envelope(ST_Union(geom)), 4326) AS geom
    FROM
        {{ ref(extent_table) }}
),

ranked_zones AS (
    SELECT
        gpu_doc_id,
        gpu_status,
        gpu_timestamp::timestamptz AS gpu_timestamp,
        partition,
        libelle,
        NULLIF(libelong, '') AS libelle_long,
        typezone AS type_zone,
        NULLIF(destdomi, '') AS destination_dominante,
        nomfic AS nom_fichier,
        NULLIF(urlfic, '') AS url_fichier,
        NULLIF(insee, '') AS commune_code,
        TO_DATE(NULLIF(datappro, ''), 'YYYYMMDD') AS date_approbation,
        TO_DATE(NULLIF(datvalid, ''), 'YYYYMMDD') AS date_validation,
        NULLIF(idurba, '') AS id_document_urbanisme,
        checksum,
        -- Rank duplicates sharing a checksum; only rank 1 is kept below.
        row_number() OVER (PARTITION BY checksum ORDER BY gpu_timestamp),
        {{ make_valid_multipolygon('ST_transform(geom, ' ~ srid ~ ')') }} AS new_geom,
        {{ srid }} AS srid_source
    FROM
        {{ source('public', 'gpu_zone_urba') }}
    WHERE
        {{ raw_date_starts_with_yyyy('datappro') }}
        AND {{ raw_date_starts_with_yyyy('datvalid') }}
        AND NOT ST_IsEmpty(geom)
        -- Keep only zones touching the territory's bounding box.
        AND ST_Intersects(geom, (SELECT geom FROM extent))
)

SELECT
    gpu_doc_id,
    gpu_status,
    gpu_timestamp,
    partition,
    libelle,
    libelle_long,
    type_zone,
    destination_dominante,
    nom_fichier,
    url_fichier,
    commune_code,
    date_approbation,
    date_validation,
    id_document_urbanisme,
    checksum,
    new_geom AS geom,
    srid_source,
    ST_Area(new_geom) AS surface
FROM
    ranked_zones
WHERE
    row_number = 1
    AND NOT ST_IsEmpty(new_geom)
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ SELECT
zonage_checksum AS zone_urba,
year,
max(departement) AS departement,
sum(st_area(st_transform(geom, 2154))) / 10000 AS area
sum(st_area(st_transform(geom, srid_source))) / 10000 AS area
FROM
{{ ref('occupation_du_sol_zonage_urbanisme') }}
WHERE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ SELECT
date_validation::text as datvalid,
surface / 10000 as area,
{{ make_valid_multipolygon('ST_Transform(geom, 4326)') }} as mpoly,
4326 AS srid_source
srid_source
FROM
{{ ref('zonage_urbanisme') }}
13 changes: 12 additions & 1 deletion airflow/include/sql/sparte/models/gpu/schema.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@

version: 2

valid_srids: &valid_srids
values: [
32620,
2972,
2975,
2154
]

models:
- name: zonage_urbanisme
columns:
Expand All @@ -12,6 +19,10 @@ models:
- unique
- is_valid_geom
- is_not_empty_geom
- name: srid_source
data_tests:
- not_null
- accepted_values: *valid_srids

sources:
- name: public
Expand Down
42 changes: 11 additions & 31 deletions airflow/include/sql/sparte/models/gpu/zonage_urbanisme.sql
Original file line number Diff line number Diff line change
@@ -1,41 +1,21 @@

{{
config(
materialized='table',
indexes=[
{'columns': ['geom'], 'type': 'gist'},
{'columns': ['libelle'], 'type': 'btree'},
{'columns': ['type_zone'], 'type': 'btree'},
{'columns': ['checksum'], 'type': 'btree'}
{'columns': ['checksum'], 'type': 'btree'},
{'columns': ['srid_source'], 'type': 'btree'}
])
}}

SELECT *, ST_Area(geom) as surface FROM (
SELECT
gpu_doc_id,
gpu_status,
gpu_timestamp::timestamptz as gpu_timestamp,
partition,
libelle,
NULLIF(libelong, '') as libelle_long,
typezone as type_zone,
NULLIF(destdomi, '') as destination_dominante,
nomfic as nom_fichier,
NULLIF(urlfic, '') as url_fichier,
NULLIF(insee, '') as commune_code,
TO_DATE(NULLIF(datappro, ''), 'YYYYMMDD') as date_approbation,
TO_DATE(NULLIF(datvalid, ''), 'YYYYMMDD') as date_validation,
NULLIF(idurba, '') as id_document_urbanisme,
checksum,
row_number() OVER (PARTITION BY checksum ORDER BY gpu_timestamp),
{{ make_valid_multipolygon('ST_transform(geom, 2154)') }} as geom,
2154 as srid_source
FROM
{{ source('public', 'gpu_zone_urba') }}
WHERE
{{ raw_date_starts_with_yyyy('datappro') }} AND
{{ raw_date_starts_with_yyyy('datvalid') }} AND
NOT ST_IsEmpty(geom)
) as foo
WHERE row_number = 1
AND NOT ST_IsEmpty(geom)
SELECT * FROM {{ ref('zonage_urbanisme_guadeloupe') }}
UNION ALL
SELECT * FROM {{ ref('zonage_urbanisme_martinique') }}
UNION ALL
SELECT * FROM {{ ref('zonage_urbanisme_guyane') }}
UNION ALL
SELECT * FROM {{ ref('zonage_urbanisme_reunion') }}
UNION ALL
SELECT * FROM {{ ref('zonage_urbanisme_metropole') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{# Urban zoning limited to the extent of the region_guadeloupe model, with geometries in SRID 32620. #}
{{ zonage_urbanisme(srid=32620, extent_table="region_guadeloupe") }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{# Urban zoning limited to the extent of the region_guyane model, with geometries in SRID 2972. #}
{{ zonage_urbanisme(srid=2972, extent_table="region_guyane") }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{# Urban zoning limited to the extent of the region_martinique model, with geometries in SRID 32620. #}
{{ zonage_urbanisme(srid=32620, extent_table="region_martinique") }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{# Urban zoning limited to the extent of the region_metropole model, with geometries in SRID 2154. #}
{{ zonage_urbanisme(srid=2154, extent_table="region_metropole") }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{# Urban zoning limited to the extent of the region_reunion model, with geometries in SRID 2975. #}
{{ zonage_urbanisme(srid=2975, extent_table="region_reunion") }}
2 changes: 1 addition & 1 deletion airflow/include/sql/sparte/models/ocsge/difference.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ SELECT
foo.departement,
foo.uuid,
foo.geom,
2154 AS srid_source,
ST_Srid(foo.geom) AS srid_source,
to_timestamp(foo.loaded_date) AS loaded_date,
st_area(foo.geom) AS surface,
coalesce(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ SELECT
{{ is_artificial('code_cs', 'code_us') }} AS is_artificial,
uuid::uuid,
st_makevalid(geom) AS geom,
2154 AS srid_source
ST_Srid(geom) AS srid_source
FROM
{{ source('public', 'ocsge_occupation_du_sol') }}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ SELECT
year,
departement,
uuid::uuid,
2154 AS srid_source,
st_srid(geom) AS srid_source,
to_timestamp(loaded_date) AS loaded_date,
st_makevalid(geom) AS geom,
st_area(geom) AS surface
Expand Down
10 changes: 10 additions & 0 deletions airflow/include/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,13 @@ def get_dbt_command_from_directory(
directory="${AIRFLOW_HOME}/include/sql/sparte",
) -> str:
return f'cd "{directory}" && ' + cmd


def get_srid_by_departement_code(departement_code: str) -> int:
    """Return the SRID to use for the given departement.

    Departements with a dedicated entry get their own SRID; every other
    code falls back to 2154.

    Args:
        departement_code: Departement code as a string, e.g. ``"972"``.

    Returns:
        32620 for ``"971"`` and ``"972"``, 2972 for ``"973"``, 2975 for
        ``"974"``, and 2154 for any other value.
    """
    dedicated_srids = (
        (("971", "972"), 32620),
        (("973",), 2972),
        (("974",), 2975),
    )
    for codes, srid in dedicated_srids:
        if departement_code in codes:
            return srid
    # No dedicated entry for this code: use the default SRID.
    return 2154

0 comments on commit 4ae1076

Please sign in to comment.