Sources for all ETL of the Smart Emission Platform. This ETL was originally developed for the Smart Emission Project Nijmegen and the Intemo Josene sensor device (2015-2017). To accommodate other sensor devices, such as the EU JRC AirSensEUR and LuftDaten.info kits, the ETL framework was generalized (2018/2019).
The ETL uses deployment-specific variables for databases, passwords, etc. (not stored in GitHub).
All ETL is developed using Stetl, a Python framework and programming model for any ETL process. The essence of Stetl is that each ETL process is a chain of linked Input, Filter and Output Python classes, specified in a Stetl config file.
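To make the chain idea concrete, here is a minimal plain-Python sketch of data flowing from an Input through Filters to an Output. It is not Stetl's API: the class names, the `read`/`invoke`/`write` methods and `run_chain` are hypothetical stand-ins, and in Stetl the chain is declared in the .cfg file rather than wired up in code.

```python
from typing import Dict, List

Record = Dict[str, float]


class Input:
    """Produces records; here from an in-memory list instead of an API or database."""

    def __init__(self, records: List[Record]) -> None:
        self.records = records

    def read(self) -> List[Record]:
        return list(self.records)


class Filter:
    """Transforms records; here it drops records that lack a 'value' field."""

    def invoke(self, records: List[Record]) -> List[Record]:
        return [r for r in records if "value" in r]


class Output:
    """Consumes records; here it collects them instead of writing to PostGIS."""

    def __init__(self) -> None:
        self.written: List[Record] = []

    def write(self, records: List[Record]) -> None:
        self.written.extend(records)


def run_chain(source: Input, filters: List[Filter], sink: Output) -> Output:
    """Pass the data from the Input through each Filter in order to the Output."""
    packet = source.read()
    for f in filters:
        packet = f.invoke(packet)
    sink.write(packet)
    return sink


out = run_chain(Input([{"value": 1.0}, {"other": 2.0}]), [Filter()], Output())
```

Keeping each component behind a small interface is what lets the same Filters be reused with different Inputs and Outputs, which is the point of configuring chains per ETL process.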
The .sh files each invoke a Stetl ETL process via Docker, using a Stetl config (.cfg) file specific to that ETL process.
Additional Python files implement ETL modules that are not part of the Stetl framework; they are available in the Python smartem package.
All ETL processes are invoked using the same SE Stetl Docker image.
They can be scheduled via Kubernetes or cron.
The Docker image is hosted on Docker Hub as smartemission/se-stetl.
The main ETL consists of multiple steps, described below.
The SE ETL follows a "pull" model: in Step 1 (Harvesters), raw sensor data is "harvested" from data collector servers and other sensor networks.
The following harvester ETL configs/processes are available:
- Harvester Whale: get all raw timeseries sensor-values from the Whale API for Intemo Jose sensor devices, see harvester_whale.cfg
- Harvester Influx: get all raw timeseries sensor-values from an InfluxDB, initially for AirSensEUR (ASE) devices, see harvester_influx.cfg
As a result, all raw sensor data is stored in PostGIS using the schema db-schema-raw.sql. The raw data fetched by the Harvesters is further processed in Step 2, the Refiner.
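The "pull" model can be illustrated with a small incremental harvest loop: for each device, only samples newer than the last harvested timestamp are fetched and stored. This is a hypothetical sketch; `harvest`, `fetch`, `last_seen` and the in-memory `store` are illustrative names, whereas the real harvesters read from the Whale API or InfluxDB and write to PostGIS.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Tuple

# A sample is (timestamp, raw value); real harvested records are richer.
Sample = Tuple[datetime, float]


def harvest(devices: List[str],
            fetch: Callable[[str, datetime], List[Sample]],
            last_seen: Dict[str, datetime],
            store: Dict[str, List[Sample]]) -> int:
    """Pull samples newer than each device's last harvested timestamp.

    Returns the number of new samples stored; last_seen is updated so a
    subsequent run only pulls data that arrived in the meantime.
    """
    total = 0
    for device in devices:
        since = last_seen.get(device, datetime.min)
        new = [s for s in fetch(device, since) if s[0] > since]
        if new:
            store.setdefault(device, []).extend(new)
            last_seen[device] = max(ts for ts, _ in new)
            total += len(new)
    return total


# Usage with a stand-in for the remote API (made-up data).
t0 = datetime(2019, 1, 1, 12, 0)
REMOTE = {"dev1": [(t0, 1.0), (t0 + timedelta(minutes=1), 2.0)]}


def fake_fetch(device: str, since: datetime) -> List[Sample]:
    return [s for s in REMOTE.get(device, []) if s[0] > since]


last_seen: Dict[str, datetime] = {}
store: Dict[str, List[Sample]] = {}
first = harvest(["dev1", "dev2"], fake_fetch, last_seen, store)   # 2 new samples
second = harvest(["dev1", "dev2"], fake_fetch, last_seen, store)  # nothing new
```

Tracking a per-device high-water mark keeps each scheduled run cheap and makes re-running a harvester safe, since already-stored samples are skipped.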
In Step 2 (Refiner), all raw harvested timeseries data is "refined". Refinement involves the following:
- validation: remove outliers (pre and post)
- conversion: convert raw sensor values to standard units (e.g. temperature milliKelvin to degree Celsius)
- calibration: calibrate raw sensor gas values to standard units using ANN (e.g. resistance/Ohm to AQ µg/m³ concentration)
- aggregation: make hourly average values for each sensor ("uurwaarden", Dutch for hourly values)
See refiner.cfg and smartem/refiner. In particular, the above steps are driven by the type of sensor device. The learning process for ANN calibration is implemented under smartem/calibrator.
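The conversion, validation and aggregation steps can be sketched as below, with rules looked up per device type and sensor. The `RULES` table, the `example_device` type, the valid range and the milliKelvin input are hypothetical illustrations; ANN calibration is deliberately left out, and this is not the smartem/refiner implementation.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean
from typing import Callable, Dict, List, Tuple

# Hypothetical rules per device type and sensor: (converter, valid range after conversion).
RULES: Dict[str, Dict[str, Tuple[Callable[[float], float], Tuple[float, float]]]] = {
    "example_device": {
        "temperature": (lambda mk: mk / 1000.0 - 273.15, (-40.0, 60.0)),  # milliKelvin -> deg C
    },
}


def refine(device_type: str, sensor: str,
           samples: List[Tuple[datetime, float]]) -> Dict[datetime, float]:
    """Convert, validate and aggregate raw samples into hourly averages."""
    convert, (low, high) = RULES[device_type][sensor]
    per_hour: Dict[datetime, List[float]] = defaultdict(list)
    for ts, raw in samples:
        value = convert(raw)                    # conversion to standard units
        if low <= value <= high:                # validation: drop outliers
            hour = ts.replace(minute=0, second=0, microsecond=0)
            per_hour[hour].append(value)
    return {hour: mean(values) for hour, values in sorted(per_hour.items())}


hourly = refine("example_device", "temperature", [
    (datetime(2019, 1, 1, 12, 10), 293150.0),   # about 20.0 deg C
    (datetime(2019, 1, 1, 12, 40), 295150.0),   # about 22.0 deg C
    (datetime(2019, 1, 1, 12, 50), 500000.0),   # about 226.85 deg C: outlier, dropped
    (datetime(2019, 1, 1, 13, 5), 290150.0),    # about 17.0 deg C
])
```

Validating after conversion lets the valid range be expressed in familiar units; a real refiner may also validate raw values before conversion.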
As a result of this step, sensor data timeseries (hour values) are stored both in PostGIS (schema db-schema-refined.sql) and in InfluxDB.
In the publishing step, all refined/aggregated timeseries data is published to various IoT/SWE services. The following publishers are available:
- SOSPublisher - publish to a remote SOS via the SOS-T(ransactional) protocol, see sospublisher.cfg
- STAPublisher - publish to a remote SensorThings API (STA) via REST, see stapublisher.cfg
All publication/output ETL uses plain Python string templates (no need for Jinja2 yet) with parameter substitution, e.g. smartem/publisher/sostemplates for SOS and smartem/publisher/statemplates for STA.
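A template with parameter substitution can be as small as the sketch below, which uses the standard library's `string.Template`. Whether the smartem templates use `string.Template` or another plain-string mechanism is an assumption here, and the SensorThings Observation body and `render_observation` are hypothetical examples, not the contents of smartem/publisher/statemplates.

```python
from string import Template

# Hypothetical SensorThings Observation template; the actual templates are
# in smartem/publisher/statemplates and may differ.
OBSERVATION = Template(
    '{"phenomenonTime": "$time", "result": $value, '
    '"Datastream": {"@iot.id": $datastream_id}}'
)


def render_observation(time: str, value: float, datastream_id: int) -> str:
    """Fill in the template; substitute() raises KeyError if a parameter is missing."""
    return OBSERVATION.substitute(time=time, value=value, datastream_id=datastream_id)


body = render_observation("2019-01-01T12:00:00Z", 21.0, 42)
```

`$`-placeholders leave the JSON braces untouched, so no escaping is needed, and there is no extra dependency such as Jinja2.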
NB: explicit publication to WFS and WMS is not required: these services directly use the refined timeseries tables and Postgres VIEWs from Step 2.
The Last step is special: it is a pass-through from the Raw Sensor API to a single table holding the (refined) last values of all sensors, used by the SOS emulation API (sosemu). This ETL process exists for historical reasons: initially no SOS or STA was available, but the project needed last values to develop the SmartApp.
- Last: get and convert the last sensor values for all devices, see last.cfg.
As a result, this sensor data is stored in PostGIS using the schema db-schema-last.sql (currently only for Intemo Josene devices).
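Keeping a single "last values" table amounts to an upsert keyed on device and sensor that only overwrites when a newer reading arrives. The sketch below shows that logic in memory; `update_last` and the reading tuples are hypothetical, and the real process writes to the PostGIS table instead.

```python
from datetime import datetime
from typing import Dict, Iterable, Tuple

Key = Tuple[str, str]                 # (device_id, sensor_name)
LastValue = Tuple[datetime, float]    # (timestamp, value)


def update_last(last: Dict[Key, LastValue],
                readings: Iterable[Tuple[str, str, datetime, float]]) -> None:
    """Keep only the most recent value per (device, sensor), like an upsert."""
    for device, sensor, ts, value in readings:
        current = last.get((device, sensor))
        if current is None or ts > current[0]:
            last[(device, sensor)] = (ts, value)


last: Dict[Key, LastValue] = {}
update_last(last, [
    ("dev1", "temperature", datetime(2019, 1, 1, 12, 5), 20.5),
    ("dev1", "temperature", datetime(2019, 1, 1, 12, 0), 20.0),  # older, ignored
    ("dev1", "noise", datetime(2019, 1, 1, 12, 1), 45.0),
])
```

Comparing timestamps rather than blindly overwriting makes the update safe when readings arrive out of order.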
To collect reference data and generate the ANN Calibration Estimator, three additional ETL processes were added later in the project (Dec 2016):
- Extractor: to extract raw (Jose) Sensor Values from the Harvested (Step 1) RawDBInput into InfluxDB
- Harvester_RIVM: to extract calibrated gas samples (hour averages) from RIVM LML SOS into InfluxDB
The above two datasets in InfluxDB are used to generate the ANN Calibration Estimator object by running the Calibrator ETL process:
- Calibrator: to read/merge RIVM and Jose values from InfluxDB to create the ANN Estimator object (pickled)
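The "merge" part of the Calibrator can be pictured as an inner join on the hour timestamp, pairing raw Jose values with the RIVM reference value for the same hour. This sketch assumes both datasets are already aggregated per hour and held in dicts; `merge_training_rows`, the `no2_raw` field and the numbers are made up for illustration. Training and pickling the ANN estimator are not shown.

```python
from datetime import datetime
from typing import Dict, List, Tuple

Features = Dict[str, float]   # raw Jose sensor values for one hour (hypothetical field names)


def merge_training_rows(jose: Dict[datetime, Features],
                        rivm: Dict[datetime, float]) -> List[Tuple[datetime, Features, float]]:
    """Inner join on the hour: keep only hours present in both datasets."""
    return [(hour, jose[hour], rivm[hour]) for hour in sorted(jose.keys() & rivm.keys())]


jose = {
    datetime(2019, 1, 1, 12): {"no2_raw": 310.0},
    datetime(2019, 1, 1, 13): {"no2_raw": 295.0},
}
rivm = {datetime(2019, 1, 1, 13): 24.0, datetime(2019, 1, 1, 14): 22.0}
rows = merge_training_rows(jose, rivm)
```

Each resulting row is a (time, features, target) triple, the shape a supervised learner needs; hours missing from either source are dropped rather than imputed.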
Deployment options are described per ETL process below. Deployment mainly involves setting the proper environment variables.
The convention is that each variable name used in a config file becomes an environment variable name with the stetl_ prefix. For example, pg_database within last.cfg becomes stetl_pg_database within a K8s or other Docker deployment.
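The naming convention can be expressed as a simple mapping: select the stetl_-prefixed environment variables and strip the prefix to recover the config variable names. This is an illustration of the convention only, not Stetl's own implementation; `config_args_from_env` is a hypothetical helper.

```python
import os
from typing import Dict, Mapping, Optional

PREFIX = "stetl_"


def config_args_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return {config_name: value} for every stetl_-prefixed variable."""
    source = os.environ if env is None else env
    return {name[len(PREFIX):]: value
            for name, value in source.items()
            if name.startswith(PREFIX)}


args = config_args_from_env({"stetl_pg_database": "gis", "HOME": "/root"})
```

The prefix keeps the ETL settings apart from unrelated variables (such as HOME) that are present in every container.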
Config file: last.cfg.
The following environment vars need to be set, either via docker-compose or Kubernetes.
Environment variable |
---|
stetl_raw_device_url_1 |
stetl_raw_device_url_2 |
stetl_intemo_token |
stetl_pg_host |
stetl_pg_database |
stetl_pg_user |
stetl_pg_password |
stetl_pg_schema_rt |
stetl_pg_schema_calibrated |
Config file: harvester_whale.cfg.
The following environment vars need to be set, either via docker-compose or Kubernetes.
Environment variable |
---|
stetl_raw_device_url_1 |
stetl_raw_device_url_2 |
stetl_intemo_token |
stetl_pg_host |
stetl_pg_database |
stetl_pg_user |
stetl_pg_password |
stetl_pg_schema_raw |
stetl_max_proc_time_secs |
stetl_api_interval_secs |
Config file: harvester_rivm.cfg.
The following environment vars need to be set, either via docker-compose or Kubernetes.
Environment variable |
---|
stetl_rivm_sos_base_url |
stetl_max_proc_time_secs |
stetl_api_interval_secs |
stetl_pg_host |
stetl_pg_database |
stetl_pg_user |
stetl_pg_password |
stetl_pg_schema_raw |
stetl_pg_schema_harvest_rivm |
stetl_influx_host |
stetl_influx_port |
stetl_influx_se_database |
stetl_influx_se_measurement_rivm |
stetl_influx_se_writer |
stetl_influx_se_writer_password |
Config file: harvester_influx.cfg.
The following environment vars need to be set, either via docker-compose or Kubernetes.
Environment variable |
---|
stetl_influx_dc1_host |
stetl_influx_port |
stetl_influx_as_database |
stetl_influx_as_reader |
stetl_influx_as_reader_password |
stetl_max_proc_time_secs |
stetl_pg_host |
stetl_pg_database |
stetl_pg_user |
stetl_pg_password |
stetl_pg_schema_raw |
Config file: refiner.cfg.
The following environment vars need to be set, either via docker-compose or Kubernetes.
Environment variable |
---|
stetl_refiner_max_input_records |
stetl_refiner_raw_read_once |
stetl_pg_host |
stetl_pg_database |
stetl_pg_user |
stetl_pg_password |
stetl_pg_schema_raw |
stetl_pg_schema_refined |
stetl_pg_schema_calibrated |
stetl_influx_host |
stetl_influx_port |
stetl_influx_se_database |
stetl_influx_se_measurement_refined |
stetl_influx_se_writer |
stetl_influx_se_writer_password |
Config file: stapublisher.cfg.
The following environment vars need to be set, either via docker-compose or Kubernetes.
Environment variable |
---|
stetl_pg_host |
stetl_pg_database |
stetl_pg_user |
stetl_pg_password |
stetl_pg_schema_refined |
stetl_stapublisher_max_input_records |
stetl_sta_host |
stetl_sta_port |
stetl_sta_path |
stetl_sta_user |
stetl_sta_password |
Config file: sospublisher.cfg.
The following environment vars need to be set, either via docker-compose or Kubernetes.
Environment variable |
---|
stetl_pg_host |
stetl_pg_database |
stetl_pg_user |
stetl_pg_password |
stetl_pg_schema_refined |
stetl_sospublisher_max_input_records |
stetl_sos_host |
stetl_sos_port |
stetl_sos_path |
Config file: extractor.cfg.
The following environment vars need to be set, either via docker-compose or Kubernetes.
Environment variable |
---|
stetl_extractor_max_input_records |
stetl_extractor_raw_read_once |
stetl_pg_host |
stetl_pg_database |
stetl_pg_user |
stetl_pg_password |
stetl_pg_schema_raw |
stetl_pg_schema_refined |
stetl_pg_schema_extracted |
stetl_influx_host |
stetl_influx_port |
stetl_influx_se_database |
stetl_influx_se_measurement_extract |
stetl_influx_se_writer |
stetl_influx_se_writer_password |
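Before starting a container, it can help to check that all variables listed for a config are set. The sketch below copies the lists for last.cfg and sospublisher.cfg from the tables above; `REQUIRED` and `missing_vars` are hypothetical helpers, and the other configs can be added the same way.

```python
import os
from typing import Dict, List, Mapping, Optional

# Required variables for two of the configs, copied from the tables above.
REQUIRED: Dict[str, List[str]] = {
    "last.cfg": [
        "stetl_raw_device_url_1", "stetl_raw_device_url_2", "stetl_intemo_token",
        "stetl_pg_host", "stetl_pg_database", "stetl_pg_user", "stetl_pg_password",
        "stetl_pg_schema_rt", "stetl_pg_schema_calibrated",
    ],
    "sospublisher.cfg": [
        "stetl_pg_host", "stetl_pg_database", "stetl_pg_user", "stetl_pg_password",
        "stetl_pg_schema_refined", "stetl_sospublisher_max_input_records",
        "stetl_sos_host", "stetl_sos_port", "stetl_sos_path",
    ],
}


def missing_vars(config: str, env: Optional[Mapping[str, str]] = None) -> List[str]:
    """List required variables that are unset or empty for the given config file."""
    source = os.environ if env is None else env
    return [name for name in REQUIRED[config] if not source.get(name)]


missing = missing_vars("sospublisher.cfg", {"stetl_pg_host": "db", "stetl_sos_port": ""})
```

Treating empty values as missing catches a common docker-compose mistake where a variable is declared but never filled in.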