Skip to content

Commit

Permalink
cria flows de materiazacao da bilhetagem 2
Browse files Browse the repository at this point in the history
  • Loading branch information
pixuimpou committed Jan 13, 2025
1 parent e90c065 commit 8692bab
Show file tree
Hide file tree
Showing 35 changed files with 597 additions and 671 deletions.
2 changes: 2 additions & 0 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
from pipelines.migration.veiculo.flows import * # noqa
from pipelines.serpro.flows import * # noqa
from pipelines.treatment.bilhetagem.flows import * # noqa
from pipelines.treatment.cadastro.flows import * # noqa
from pipelines.treatment.datario.flows import * # noqa
from pipelines.treatment.financeiro.flows import * # noqa
from pipelines.treatment.monitoramento.flows import * # noqa
from pipelines.treatment.planejamento.flows import * # noqa
from pipelines.treatment.validacao_dados_jae.flows import * # noqa
3 changes: 2 additions & 1 deletion pipelines/migration/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
source_type = Parameter("source_type", default=None)
interval_minutes = Parameter("interval_minutes", default=None)
recapture = Parameter("recapture", default=False)
recapture_window_days = Parameter("recapture_window_days", default=1)
recapture_window_days = Parameter("recapture_window_days", default=5)
timestamp = Parameter("timestamp", default=None)

# Parâmetros Pré-tratamento #
Expand All @@ -81,6 +81,7 @@
datetime_filter=current_timestamp,
interval_minutes=interval_minutes,
recapture_window_days=recapture_window_days,
max_recaptures=500,
)

with case(recapture, False):
Expand Down
28 changes: 27 additions & 1 deletion pipelines/treatment/bilhetagem/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import datetime
from enum import Enum

from pipelines.schedules import create_daily_cron
from pipelines.schedules import create_daily_cron, create_hourly_cron
from pipelines.treatment.templates.utils import DBTSelector


Expand All @@ -15,6 +15,32 @@ class constants(Enum): # pylint: disable=c0103
Valores constantes para materialização dos dados de bilhetagem
"""

TRANSACAO_SELECTOR = DBTSelector(
name="transacao",
schedule_cron=create_hourly_cron(),
initial_datetime=datetime(2025, 1, 16, 0, 0, 0),
incremental_delay_hours=1,
)

PASSAGEIRO_HORA_SELECTOR = DBTSelector(
name="passageiro_hora",
schedule_cron=create_hourly_cron(minute=10),
initial_datetime=datetime(2025, 1, 16, 0, 0, 0),
)

GPS_VALIDADOR_SELECTOR = DBTSelector(
name="gps_validador",
schedule_cron=create_hourly_cron(),
initial_datetime=datetime(2025, 1, 16, 0, 0, 0),
incremental_delay_hours=1,
)

INTEGRACAO_SELECTOR = DBTSelector(
name="integracao",
schedule_cron=create_daily_cron(hour=5, minute=15),
initial_datetime=datetime(2025, 1, 16, 0, 0, 0),
)

TRANSACAO_ORDEM_SELECTOR = DBTSelector(
name="transacao_ordem",
schedule_cron=create_daily_cron(hour=6, minute=10),
Expand Down
45 changes: 45 additions & 0 deletions pipelines/treatment/bilhetagem/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,59 @@
DBT: 2024-11-27 2
"""

from pipelines.capture.jae.constants import constants as jae_constants
from pipelines.constants import constants as smtr_constants
from pipelines.migration.br_rj_riodejaneiro_bilhetagem.constants import (
constants as old_constants,
)
from pipelines.schedules import create_daily_cron
from pipelines.treatment.bilhetagem.constants import constants
from pipelines.treatment.cadastro.constants import constants as cadastro_constants
from pipelines.treatment.templates.flows import create_default_materialization_flow

TRANSACAO_MATERIALIZACAO = create_default_materialization_flow(
flow_name="transacao - materializacao",
selector=constants.TRANSACAO_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
wait=[
jae_constants.TRANSACAO_SOURCE.value,
jae_constants.TRANSACAO_RIOCARD_SOURCE.value,
jae_constants.JAE_AUXILIAR_SOURCES.value["produto"],
jae_constants.JAE_AUXILIAR_SOURCES.value["cliente"],
jae_constants.JAE_AUXILIAR_SOURCES.value["gratuidade"],
cadastro_constants.CADASTRO_SELECTOR.value,
],
)

PASSAGEIRO_HORA_MATERIALIZACAO = create_default_materialization_flow(
flow_name="passageiro_hora - materializacao",
selector=constants.PASSAGEIRO_HORA_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
wait=[
constants.TRANSACAO_SELECTOR.value,
],
)

GPS_VALIDADOR_MATERIALIZACAO = create_default_materialization_flow(
flow_name="gps_validador - materializacao",
selector=constants.GPS_VALIDADOR_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
wait=[
cadastro_constants.CADASTRO_SELECTOR.value,
jae_constants.GPS_VALIDADOR_SOURCE.value,
],
)

INTEGRACAO_MATERIALIZACAO = create_default_materialization_flow(
flow_name="integracao - materializacao",
selector=constants.INTEGRACAO_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
wait=[
cadastro_constants.CADASTRO_SELECTOR.value,
jae_constants.INTEGRACAO_SOURCE.value,
],
)

ordem_pagamento_materialize_params = (
old_constants.BILHETAGEM_MATERIALIZACAO_ORDEM_PAGAMENTO_PARAMS.value
)
Expand Down
Empty file.
22 changes: 22 additions & 0 deletions pipelines/treatment/cadastro/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Valores constantes para materialização dos dados de cadastro
"""

from datetime import datetime
from enum import Enum

from pipelines.schedules import create_hourly_cron
from pipelines.treatment.templates.utils import DBTSelector


class constants(Enum): # pylint: disable=c0103
"""
Valores constantes para materialização dos dados de cadastro
"""

CADASTRO_SELECTOR = DBTSelector(
name="cadastro",
schedule_cron=create_hourly_cron(minute=10),
initial_datetime=datetime(2025, 12, 16, 0, 0, 0),
)
22 changes: 22 additions & 0 deletions pipelines/treatment/cadastro/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Flows de tratamento dos dados de cadastro
"""

from pipelines.capture.jae.constants import constants as jae_constants
from pipelines.constants import constants as smtr_constants
from pipelines.treatment.cadastro.constants import constants
from pipelines.treatment.templates.flows import create_default_materialization_flow

CADASTRO_MATERIALIZACAO = create_default_materialization_flow(
flow_name="cadastro - materializacao",
selector=constants.CADASTRO_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
wait=[
jae_constants.JAE_AUXILIAR_SOURCES.value["linha"],
jae_constants.JAE_AUXILIAR_SOURCES.value["operadora_transporte"],
jae_constants.JAE_AUXILIAR_SOURCES.value["pessoa_fisica"],
jae_constants.JAE_AUXILIAR_SOURCES.value["consorcio"],
jae_constants.JAE_AUXILIAR_SOURCES.value["linha_consorcio_operadora_transporte"],
],
)
Empty file.
22 changes: 22 additions & 0 deletions pipelines/treatment/financeiro/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
"""
Valores constantes para materialização dos dados financeiros
"""

from datetime import datetime
from enum import Enum

from pipelines.schedules import create_daily_cron
from pipelines.treatment.templates.utils import DBTSelector


class constants(Enum): # pylint: disable=c0103
"""
Valores constantes para materialização dos dados financeiros
"""

FINANCEIRO_BILHETAGEM_SELECTOR = DBTSelector(
name="financeiro_bilhetagem",
schedule_cron=create_daily_cron(hour=5, minute=15),
initial_datetime=datetime(2025, 12, 16, 0, 0, 0),
)
18 changes: 18 additions & 0 deletions pipelines/treatment/financeiro/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
"""
Flows de tratamento dos dados financeiros
"""

from pipelines.capture.jae.constants import constants as jae_constants
from pipelines.constants import constants as smtr_constants
from pipelines.treatment.cadastro.constants import constants as cadastro_constants
from pipelines.treatment.financeiro.constants import constants
from pipelines.treatment.templates.flows import create_default_materialization_flow

FINANCEIRO_BILHETAGEM_MATERIALIZACAO = create_default_materialization_flow(
flow_name="financeiro_bilhetagem - materializacao",
selector=constants.FINANCEIRO_BILHETAGEM_SELECTOR.value,
agent_label=smtr_constants.RJ_SMTR_AGENT_LABEL.value,
wait=[cadastro_constants.CADASTRO_SELECTOR.value]
+ jae_constants.ORDEM_PAGAMENTO_SOURCES.value.values(),
)
2 changes: 1 addition & 1 deletion queries/models/bilhetagem/gps_validador.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
materialized="incremental",
partition_by={"field": "data", "data_type": "date", "granularity": "day"},
tags=["geolocalizacao"],
schema="br_rj_riodejaneiro_bilhetagem",
)
}}

Expand All @@ -15,7 +16,6 @@ select
id_operadora,
operadora,
id_servico_jae,
-- s.servico,
servico_jae,
descricao_servico_jae,
case
Expand Down
2 changes: 1 addition & 1 deletion queries/models/bilhetagem/gps_validador_van.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
materialized="incremental",
partition_by={"field": "data", "data_type": "date", "granularity": "day"},
tags=["geolocalizacao"],
schema="br_rj_riodejaneiro_bilhetagem",
)
}}

Expand All @@ -15,7 +16,6 @@ select
id_operadora,
operadora,
id_servico_jae,
-- s.servico,
servico_jae,
descricao_servico_jae,
id_veiculo,
Expand Down
2 changes: 1 addition & 1 deletion queries/models/bilhetagem/passageiro_hora.sql
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ select
* except (id_transacao, geo_point_transacao),
count(id_transacao) as quantidade_passageiros,
'{{ var("version") }}' as versao
from {{ ref("aux_passageiros_hora") }}
from {{ ref("aux_passageiro_hora") }}
where
{% if is_incremental() %}
{% if partition_list | length > 0 %} data in ({{ partition_list | join(", ") }})
Expand Down
2 changes: 1 addition & 1 deletion queries/models/bilhetagem/passageiro_tile_hora.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ select
geo.tile_id,
count(id_transacao) as quantidade_passageiros,
'{{ var("version") }}' as versao
from {{ ref("aux_passageiros_hora") }} p
from {{ ref("aux_passageiro_hora") }} p
join {{ ref("aux_h3_res9") }} geo on st_contains(geo.geometry, geo_point_transacao)
where
{% if is_incremental() %}
Expand Down
Loading

0 comments on commit 8692bab

Please sign in to comment.