From fd4860c399c9bb85dcca1b82031899bb44dec86a Mon Sep 17 00:00:00 2001 From: Gabriel Fosse <67290377+majesticio@users.noreply.github.com> Date: Mon, 8 Apr 2024 15:46:13 -0700 Subject: [PATCH] Recover data from Israel (#1100) * removed lodash * Refactor to use client * cleaned up * cleaned up * Add Israel Envista adapter and update adapter name in source configuration * Refactor data filtering and value assignment in israel-envista.js * ignore system wide error code where value is 9999 or -9999 in unifyMeasurements units * Update averaging period to 5 minutes in israel-envista.js --------- Co-authored-by: Gabriel Fosse --- src/adapters/envista.js | 168 ------------------------- src/adapters/israel-envista.js | 217 +++++++++++++++++++++++++++++++++ src/lib/utils.js | 5 + src/sources/il.json | 6 +- 4 files changed, 225 insertions(+), 171 deletions(-) delete mode 100644 src/adapters/envista.js create mode 100644 src/adapters/israel-envista.js diff --git a/src/adapters/envista.js b/src/adapters/envista.js deleted file mode 100644 index d9c35f3a..00000000 --- a/src/adapters/envista.js +++ /dev/null @@ -1,168 +0,0 @@ -'use strict'; - -import { default as baseRequest } from 'request'; -import { REQUEST_TIMEOUT } from '../lib/constants.js'; -import { DateTime } from 'luxon'; -const { difference, flattenDeep } = pkg; -import pkg from 'lodash'; -import { parallel, parallelLimit } from 'async'; -import { convertUnits, unifyMeasurementUnits } from '../lib/utils.js'; - -const headers = { - 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:54.0) Gecko/20100101 Firefox/54.0', - 'Accept': 'text/html,application/json,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', - 'Content-Type': 'text/html; charset=utf-8', - 'envi-data-source': 'MANA', - 'Authorization': 'ApiToken ' + `${process.env.ISRAEL_ENVISTA_TOKEN}` -}; - -const requestHeaders = baseRequest.defaults({ - timeout: REQUEST_TIMEOUT, - rejectUnauthorized: false, // set due to self-signed cert - strictSSL: false, - headers: headers -}); - -export const name = 'envista'; - -export function fetchData (source, cb) { - let regionListUrl = source.url + 'regions'; - requestHeaders(regionListUrl, (err, res, body) => { - if (err || res.statusCode !== 200) { - return cb({message: 'Failure to load data url.'}); - } - - let tasks = []; - const regionList = JSON.parse(body); - regionList.forEach(region => { - tasks.push(handleRegion(source, region)); - }); - - parallel(tasks, (err, results) => { - if (err) { - return cb(err, []); - } - results = flattenDeep(results); - results = convertUnits(results); - return cb(err, {name: 'unused', measurements: results}); - }); - }); -} - -const handleRegion = function (source, region) { - let stationList = region.stations; - return function (done) { - let tasks = []; - stationList.forEach(station => { - if (station.active && hasAcceptedParameters(station)) { - tasks.push(handleStation(source, region.name, station)); - } - }); - - let limit = 15 // Magic number to avoid rate limiting is 16. - parallelLimit(tasks, limit, (err, results) => { - if (err) { - return done(err, []); - } - return done(null, results); - }); - }; -}; - -const handleStation = function (source, regionName, station) { - return function (done) { - let stationUrl = source.url + 'stations/' + station.stationId + '/data/latest'; - requestHeaders(stationUrl, (err, res, body) => { - if (err || res.statusCode !== 200) { - return done(null, []); - } - - const data = JSON.parse(body); - try { - formatData(source, regionName, station, data, (measurements) => { - return done(null, measurements); - }); - } catch (err) { - return done(null, []); - } - }); - }; -}; - -const formatData = function (source, regionName, station, data, cb) { - const base = { - location: station.name, - city: regionName, - coordinates: { - latitude: parseFloat(station.location.latitude), - longitude: parseFloat(station.location.longitude) - }, - averagingPeriod: { unit: 'hours', value: 0.25 }, // Believed to update every 15 minutes - attribution: [{ - name: source.organization, - url: source.url - }] - }; - - const measurements = data.data.map(datapoint => formatChannels(base, station, datapoint)); - return cb(measurements); -}; - -const formatChannels = function (base, station, datapoint) { - base.date = getDate(datapoint.datetime); - const datapoints = datapoint.channels.map(channel => { - if (isAcceptedParameter(channel.name)) { - return getMeasurement(base, station, channel); - } - }); - const filteredData = datapoints.filter(point => (point)); // removes undefined/invalid measurements - return filteredData; -}; - -const hasAcceptedParameters = function (station) { - const stationParameters = station.monitors.map(monitor => monitor.name.toLowerCase().split('.').join("")); - const stationAcceptableParameters = difference(acceptableParameters, stationParameters); - return Boolean(stationAcceptableParameters); -}; - -const isAcceptedParameter = function (parameter) { - return acceptableParameters.includes(parameter.toLowerCase().split('.').join("")); -}; - -const getMeasurement = function (base, station, channel) { - let measurement = Object.assign({}, base); - let parameterName = channel.name.toLowerCase().split('.').join(""); - measurement.parameter = parameterName; - measurement.value = channel.value - measurement.unit = getUnit(station, channel); - measurement = unifyMeasurementUnits(measurement); - return measurement; -}; - -const getUnit = function (station, channel) { - return station.monitors.find(monitor => monitor.channelId === channel.id).units; -}; - -function getDate(value) { - const dt = DateTime.fromISO(value).setZone('Asia/Jerusalem'); - const utc = dt.toUTC().toISO({ suppressMilliseconds: true }); - const local = dt.toISO({suppressMilliseconds: true}) - return { utc, local }; -} - -const acceptableParameters = [ // expanded params can be added by uncommenting these lines - // 'no', - // 'nox', - // 'ws', - // 'wd', - // 'rh', - // 'temp', // unit is °C, change to °F or C ? - // 'benzene', - 'pm25', - 'pm10', - 'co', - 'so2', - 'no2', - 'bc', - 'o3', -]; \ No newline at end of file diff --git a/src/adapters/israel-envista.js b/src/adapters/israel-envista.js new file mode 100644 index 00000000..0e095f94 --- /dev/null +++ b/src/adapters/israel-envista.js @@ -0,0 +1,217 @@ +'use strict'; + +import log from '../lib/logger.js'; +import client from '../lib/requests.js'; + +import { DateTime } from 'luxon'; +import { parallelLimit } from 'async'; +import { convertUnits, unifyMeasurementUnits, acceptableParameters } from '../lib/utils.js'; + + +export const name = 'israel-envista'; + +/** + * Fetches data from the Israel Envista API. + * @param {Object} source - The source configuration object. + * @param {Function} cb - The callback function to handle the fetched data. + */ +export async function fetchData(source, cb) { + const headers = { + Authorization: 'ApiToken ' + `${source.credentials.token}`, + }; + + const regionListUrl = source.url + 'regions'; + + try { + const regionList = await client({ + url: regionListUrl, + headers: headers, + }); + + const tasks = regionList.map((region) => handleRegion(source, region)); + + const results = await Promise.all(tasks); + const flatResults = results.flat(Infinity); + const convertedResults = convertUnits(flatResults); + + log.debug(`Example measurements: ${convertedResults.slice(0,5)} .`); + return cb(null, { name: 'unused', measurements: convertedResults }); + } catch (err) { + log.error(`Error fetching data: ${err.message}`); + return cb({ message: 'Failure to load data url.' }); + } +} + +/** + * Handles the processing of a single region. + * @param {Object} source - The source configuration object. + * @param {Object} region - The region object. + * @returns {Promise} A promise that resolves to an array of measurements for the region. + */ +async function handleRegion(source, region) { + const stationList = region.stations.filter( + (station) => station.active && hasAcceptedParameters(station) + ); + + const limit = 15; // Magic number to avoid rate limiting is 16. + + return new Promise((resolve, reject) => { + parallelLimit( + stationList.map((station) => (callback) => + handleStation(source, region.name, station).then( + (measurements) => callback(null, measurements), + (err) => callback(err) + ) + ), + limit, + (err, results) => { + if (err) { + log.error(`Error in handleRegion: ${err.message}`); + reject(err); + } else { + resolve(results); + } + } + ); + }); +} + +/** + * Handles the processing of a single station. + * @param {Object} source - The source configuration object. + * @param {string} regionName - The name of the region. + * @param {Object} station - The station object. + * @returns {Promise} A promise that resolves to an array of measurements for the station. + */ +async function handleStation(source, regionName, station) { + const headers = { + Authorization: 'ApiToken ' + `${source.credentials.token}`, + }; + + const stationUrl = `${source.url}stations/${station.stationId}/data/latest`; + try { + const data = await client({ + url: stationUrl, + headers: headers, + }); + return new Promise((resolve) => { + formatData(source, regionName, station, data, (measurements) => { + resolve(measurements); + }); + }); + } catch (err) { + log.error(`Error fetching station data: ${err.message}`); + return []; + } +} + +/** + * Formats the data for a single station. + * @param {Object} source - The source configuration object. + * @param {string} regionName - The name of the region. + * @param {Object} station - The station object. + * @param {Object} data - The data object retrieved from the API. + * @param {Function} cb - The callback function to handle the formatted measurements. + */ +function formatData(source, regionName, station, data, cb) { + const base = { + location: station.name, + city: regionName, + coordinates: { + latitude: parseFloat(station.location.latitude), + longitude: parseFloat(station.location.longitude), + }, + averagingPeriod: { unit: 'minutes', value: 5 }, // Updates every 5 minutes + attribution: [ + { + name: source.organization, + url: source.url, + }, + ], + }; + + const timeWindow = DateTime.utc().minus({ hours: 6 }); + + const filteredData = data.data.filter((datapoint) => { + const measurementDateTime = DateTime.fromISO(datapoint.datetime); + return measurementDateTime >= timeWindow; + }); + + const measurements = filteredData + .map((datapoint) => formatChannels(base, station, datapoint)) + .flat() + .filter((measurement) => measurement); + + return cb(measurements); +} + +/** + * Formats the channels for a single datapoint. + * @param {Object} base - The base measurement object. + * @param {Object} station - The station object. + * @param {Object} datapoint - The datapoint object. + * @returns {Array} An array of formatted measurements. + */ +function formatChannels(base, station, datapoint) { + const date = getDate(datapoint.datetime); + + return datapoint.channels + .filter((channel) => isAcceptedParameter(channel.name)) + .map((channel) => ({ + ...base, + ...date, + parameter: channel.name.toLowerCase().split('.').join(''), + value: channel.value, + unit: getUnit(station, channel), + })) + .map(unifyMeasurementUnits); +} + +/** + * Checks if a station has accepted parameters. + * @param {Object} station - The station object. + * @returns {boolean} True if the station has accepted parameters, false otherwise. + */ +function hasAcceptedParameters(station) { + const stationParameters = station.monitors.map((monitor) => + monitor.name.toLowerCase().split('.').join('') + ); + return acceptableParameters.some((param) => + stationParameters.includes(param) + ); +} + +/** + * Checks if a parameter is accepted. + * @param {string} parameter - The parameter to check. + * @returns {boolean} True if the parameter is accepted, false otherwise. + */ +function isAcceptedParameter(parameter) { + return acceptableParameters.includes( + parameter.toLowerCase().split('.').join('') + ); +} + +/** + * Gets the unit for a channel. + * @param {Object} station - The station object. + * @param {Object} channel - The channel object. + * @returns {string} The unit for the channel. + */ +function getUnit(station, channel) { + return station.monitors.find( + (monitor) => monitor.channelId === channel.id + ).units; +} + +/** + * Gets the date object from a datetime string. + * @param {string} value - The datetime string. + * @returns {Object} An object containing the UTC and local date strings. + */ +function getDate(value) { + const dt = DateTime.fromISO(value).setZone('Asia/Jerusalem'); + const utc = dt.toUTC().toISO({ suppressMilliseconds: true }); + const local = dt.toISO({ suppressMilliseconds: true }); + return { date: { utc, local } }; +} diff --git a/src/lib/utils.js b/src/lib/utils.js index c79aca2b..0211694d 100644 --- a/src/lib/utils.js +++ b/src/lib/utils.js @@ -31,6 +31,11 @@ export function convertUnits (input) { return input; } export function unifyMeasurementUnits (m) { if (!m || typeof m.unit !== 'string' || isNaN(+m.value)) return; + // ignore and pass through values that are known error codes + if (m.value === -9999 || m.value === 9999) { + return m; + } + m.unit = m.unit && m.unit.toLowerCase(); switch (m.unit) { diff --git a/src/sources/il.json b/src/sources/il.json index 2e3aac51..a4c4a0fb 100644 --- a/src/sources/il.json +++ b/src/sources/il.json @@ -1,7 +1,7 @@ [ { - "url": "https://api.svivaaqm.net/v1/envista/", - "adapter": "envista", + "url": "https://air-api.sviva.gov.il/v1/envista/", + "adapter": "israel-envista", "name": "israel-envista", "country": "IL", "description": "", @@ -12,6 +12,6 @@ "contacts": [ "info@openaq.org" ], - "active": false + "active": true } ]