diff --git a/.dockerignore b/.dockerignore index c1225fa5a5..8029661c66 100644 --- a/.dockerignore +++ b/.dockerignore @@ -6,6 +6,8 @@ datascience +feeder/node_modules + web/node_modules web/build web/public/dist diff --git a/.gitignore b/.gitignore index a4be10a024..6509d25e78 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ mongodata src grib2json +feeder/node_modules web/public/dist web/node_modules .ipynb_checkpoints/ diff --git a/circle.yml b/circle.yml index d61e1931c8..dc64f77bb5 100644 --- a/circle.yml +++ b/circle.yml @@ -14,6 +14,11 @@ dependencies: - sleep 20 - docker-compose logs web - curl --fail -i http://localhost:8000/ + - curl --fail -i http://localhost:8000/health + - curl --fail -i http://localhost:8000/v1/co2?countryCode=FR + - curl --fail -i http://localhost:8000/v1/exchanges?countryCode=FR + - curl --fail -i http://localhost:8000/v1/production?countryCode=FR + - curl --fail -i http://localhost:8000/v2/co2LastDay?countryCode=FR test: override: - echo "no test" diff --git a/feeder/Dockerfile b/feeder/Dockerfile index 27f4ed19b4..1724ae527b 100644 --- a/feeder/Dockerfile +++ b/feeder/Dockerfile @@ -9,6 +9,9 @@ WORKDIR /home/feeder # Only add requirements to enable cached builds when it is unchanged ADD feeder/requirements.txt /home/feeder/requirements.txt RUN pip install -r requirements.txt +# Same with package.json +ADD feeder/package.json /home/feeder/package.json +RUN npm install # Add the rest ADD feeder /home/feeder ADD shared /home/shared diff --git a/feeder/feeder.py b/feeder/feeder.py index 782b830478..73fd0064d3 100644 --- a/feeder/feeder.py +++ b/feeder/feeder.py @@ -1,7 +1,7 @@ import arrow import glob import pymongo -import json, logging, os, schedule, time +import json, logging, os, schedule, subprocess, time import requests import snappy @@ -447,19 +447,27 @@ def fetch_weather(): except: logger.exception('fetch_weather()') +def postprocess(): + try: + subprocess.check_call(['node', 'push_cache.js'], shell=False) + except: + logger.exception('postprocess()') + +def fetch_electricity(): + # Fetch all electricity data + fetch_weather() + fetch_consumptions() + fetch_productions() + fetch_exchanges() + fetch_price() + postprocess() + migrate(db, validate_production) schedule.every(15).minutes.do(fetch_weather) -schedule.every(INTERVAL_SECONDS).seconds.do(fetch_consumptions) -schedule.every(INTERVAL_SECONDS).seconds.do(fetch_productions) -schedule.every(INTERVAL_SECONDS).seconds.do(fetch_exchanges) -schedule.every(INTERVAL_SECONDS).seconds.do(fetch_price) +schedule.every(INTERVAL_SECONDS).seconds.do(fetch_electricity) -fetch_weather() -fetch_consumptions() -fetch_productions() -fetch_exchanges() -fetch_price() +fetch_electricity() while True: schedule.run_pending() diff --git a/feeder/package.json b/feeder/package.json new file mode 100644 index 0000000000..137e35fe14 --- /dev/null +++ b/feeder/package.json @@ -0,0 +1,21 @@ +{ + "name": "electricitymap-feeder", + "version": "0.0.1", + "description": "", + "dependencies": { + "async": "^2.0.1", + "d3": "^4.4.0", + "mathjs": "^3.5.1", + "memcached": "^2.2.2", + "moment": "^2.15.2", + "mongodb": "^2.2.9", + "opbeat": "^3.21.0", + "snappy": "^5.0.5" + }, + "repository": { + "type": "git", + "url": "https://github.com/corradio/electricitymap.git" + }, + "scripts": { + } +} diff --git a/feeder/push_cache.js b/feeder/push_cache.js new file mode 100644 index 0000000000..3d24892777 --- /dev/null +++ b/feeder/push_cache.js @@ -0,0 +1,77 @@ +var isProduction = process.env.ENV === 'production'; + +console.log('Starting push_cache..'); + +// * Opbeat (must be the first thing started) +if (isProduction) { + var opbeat = require('opbeat').start({ + appId: 'c36849e44e', + organizationId: '093c53b0da9d43c4976cd0737fe0f2b1', + secretToken: process.env['OPBEAT_SECRET'] + }); + app.use(opbeat.middleware.express()) +} +function handleError(err) { + if (!err) return; + if (opbeat) opbeat.captureError(err); + console.error(err); +} + +// Require +var async = require('async'); +var d3 = require('d3'); +var moment = require('moment'); + +// Custom modules +global.__base = __dirname; +var db = require('../shared/database'); + +db.connect(function (err, _) { + if (err) throw (err); + // cache key accessors + var CACHE_KEY_PREFIX_HISTORY_CO2 = 'HISTORY_'; + + // Compute values for the last 24 hours, step 5min + var now = moment(); + var before = moment(now).subtract(1, 'day'); + var dates = d3.timeMinute.every(5).range(before.toDate(), now.toDate()); + + var tasks = dates.map(function(d) { + return function (callback) { + return db.queryLastValuesBeforeDatetime(d, callback) + }; + }); + // Use a `series` call to avoid running out of memory + console.log('Querying state history..'); + return async.series(tasks, function (err, objs) { + if (err) { + return handleError(err); + } + // Iterate for each country + console.log('Pushing CO2 histories..'); + countryCodes = d3.keys(objs[objs.length - 1].countries); + countryCodes.forEach(function (countryCode) { + // Get a timeseries of CO2 + // and dedup by datetime + var dict = {}; + objs.forEach(function (d) { + var c = d.countries[countryCode]; + if (!c) return; + dict[c.datetime] = c; + }); + var ts = d3.values(dict) + .sort(function (x, y) { return d3.ascending(x.datetime, y.datetime); }); + // Push to cache + db.setCache( + CACHE_KEY_PREFIX_HISTORY_CO2 + countryCode, + ts, 24 * 3600, + function (err) { + if (err) handleError(err); + }); + }); + // done + console.log('Done.') + process.exit(); + }); + +}); diff --git a/shared/database.js b/shared/database.js new file mode 100644 index 0000000000..6bb049632c --- /dev/null +++ b/shared/database.js @@ -0,0 +1,307 @@ +var exports = module.exports = {}; + +if (__base) module.paths.push(__base + '/node_modules'); + +// * Modules +var async = require('async'); +var d3 = require('d3'); +var snappy = require('snappy'); +var Memcached = require('memcached'); +var moment = require('moment'); + +// * Custom +var co2eq_parameters = require('./co2eq_parameters'); +var co2lib = require('./co2eq'); + +// * Cache +memcachedClient = new Memcached(process.env['MEMCACHED_HOST']); + +// * Database +var mongoGfsCollection; +var mongoExchangeCollection; +var mongoPriceCollection; +var mongoProductionCollection; +exports.connect = function (callback) { + require('mongodb').MongoClient.connect(process.env['MONGO_URL'], function(err, db) { + if (err) throw (err); + mongoGfsCollection = db.collection('gfs'); + mongoExchangeCollection = db.collection('exchange'); + mongoPriceCollection = db.collection('price'); + mongoProductionCollection = db.collection('production'); + callback(err, db); + }); +} + +// * Cache methods +exports.getCached = function (key, callback, cacheSeconds, asyncComputeFunction) { + memcachedClient.get(key, function (err, obj) { + if (err && asyncComputeFunction) { + console.error(err); + } + if (!asyncComputeFunction || obj) { + return callback(err, obj); + } else { + return asyncComputeFunction(function (err, obj) { + if (!err) { + memcachedClient.set(key, obj, cacheSeconds, function (err) { + if (err) console.error(err); + }); + } + return callback(err, obj); + }); + } + }); +} +exports.setCache = function (key, obj, cacheSeconds, callback) { + return memcachedClient.set(key, obj, cacheSeconds, function (err) { + callback(err); + }); +} + +// * Database methods +function processDatabaseResults(countries, exchanges, prices) { + // Assign exchanges to countries + d3.entries(exchanges).forEach(function(entry) { + sortedCountryCodes = entry.key.split('->'); + entry.value.countryCodes = sortedCountryCodes; + if (!countries[sortedCountryCodes[0]]) countries[sortedCountryCodes[0]] = { + countryCode: sortedCountryCodes[0] + }; + if (!countries[sortedCountryCodes[1]]) countries[sortedCountryCodes[1]] = { + countryCode: sortedCountryCodes[1] + }; + var country1 = countries[sortedCountryCodes[0]]; + var country2 = countries[sortedCountryCodes[1]]; + if (!country1.exchange) country1.exchange = {}; + if (!country2.exchange) country2.exchange = {}; + country1.exchange[sortedCountryCodes[1]] = entry.value.netFlow * -1.0; + country2.exchange[sortedCountryCodes[0]] = entry.value.netFlow; + }); + + // Assign prices to countries + d3.entries(prices).forEach(function(entry) { + if (countries[entry.key]) + countries[entry.key].price = { + datetime: entry.value.datetime, + value: entry.value.price + } + }); + + // Add countryCode + d3.keys(countries).forEach(function(k) { + if (!countries[k]) + countries[k] = {countryCode: k}; + country = countries[k]; + }); + // Compute aggregates + d3.values(countries).forEach(function(country) { + country.maxProduction = + d3.max(d3.values(country.production)); + country.totalProduction = + d3.sum(d3.values(country.production)); + country.maxStorage = + d3.max(d3.values(country.storage || {})); + country.totalStorage = + d3.sum(d3.values(country.storage || {})); + country.totalImport = + d3.sum(d3.values(country.exchange), function(d) { + return d >= 0 ? d : 0; + }) || 0; + country.totalExport = + d3.sum(d3.values(country.exchange), function(d) { + return d <= 0 ? -d : 0; + }) || 0; + country.totalNetExchange = country.totalImport - country.totalExport; + country.maxExport = + -Math.min(d3.min(d3.values(country.exchange)), 0) || 0; + country.maxImport = + Math.max(d3.max(d3.values(country.exchange)), 0) || 0; + }); + + computeCo2(countries, exchanges); + + return {countries: countries, exchanges: exchanges}; +} +function computeCo2(countries, exchanges) { + var assignments = co2lib.compute(countries); + d3.entries(countries).forEach(function(o) { + o.value.co2intensity = assignments[o.key]; + }); + d3.values(countries).forEach(function(country) { + country.exchangeCo2Intensities = {}; + d3.keys(country.exchange).forEach(function(k) { + // Note that for imports of countries with unknown co2intensity + // the current country co2intensity is used (see co2eq.js) + country.exchangeCo2Intensities[k] = + country.exchange[k] > 0 ? + (assignments[k] || country.co2intensity) : + country.co2intensity; + }); + country.productionCo2Intensities = {}; + d3.keys(country.production).forEach(function(k) { + country.productionCo2Intensities[k] = co2eq_parameters.footprintOf( + k, country.countryCode); + }) + }); + d3.values(exchanges).forEach(function(exchange) { + exchange.co2intensity = countries[exchange.countryCodes[exchange.netFlow > 0 ? 0 : 1]].co2intensity; + }); +} +exports.elementQuery = function (keyName, keyValue, minDate, maxDate) { + var query = { datetime: exports.rangeQuery(minDate, maxDate) }; + query[keyName] = keyValue + return query; +} +exports.rangeQuery = function (minDate, maxDate) { + var query = { }; + if (minDate) query['$gte'] = minDate; + if (maxDate) query['$lte'] = maxDate; + return query; +} +exports.queryElements = function (keyName, keyValues, collection, minDate, maxDate, callback) { + tasks = {}; + keyValues.forEach(function(k) { + tasks[k] = function(callback) { + return collection.findOne( + exports.elementQuery(keyName, k, minDate, maxDate), + { sort: [['datetime', -1]] }, + callback); + }; + }); + return async.parallel(tasks, callback); +} +exports.queryLastValuesBeforeDatetime = function (datetime, callback) { + var minDate = (moment(datetime) || moment.utc()).subtract(24, 'hours').toDate(); + var maxDate = datetime ? new Date(datetime) : undefined; + // Get list of countries, exchanges, and prices in db + return async.parallel([ + function(callback) { + mongoProductionCollection.distinct('countryCode', + {datetime: exports.rangeQuery(minDate, maxDate)}, callback); + }, + function(callback) { + mongoExchangeCollection.distinct('sortedCountryCodes', + {datetime: exports.rangeQuery(minDate, maxDate)}, callback); + }, + function(callback) { + mongoPriceCollection.distinct('countryCode', + {datetime: exports.rangeQuery(minDate, maxDate)}, callback); + }, + ], function(err, results) { + if (err) return callback(err); + productionCountryCodes = results[0]; // production keys + sortedCountryCodes = results[1]; // exchange keys + priceCountryCodes = results[2]; // price keys + // Query productions + exchanges + async.parallel([ + function(callback) { + return exports.queryElements('countryCode', productionCountryCodes, + mongoProductionCollection, minDate, maxDate, callback); + }, + function(callback) { + return exports.queryElements('sortedCountryCodes', sortedCountryCodes, + mongoExchangeCollection, minDate, maxDate, callback); + }, + function(callback) { + return exports.queryElements('countryCode', priceCountryCodes, + mongoPriceCollection, minDate, maxDate, callback); + }, + ], function(err, results) { + if (err) return callback(err); + countries = results[0]; + exchanges = results[1]; + prices = results[2]; + // This can crash, so we to try/catch + try { + result = processDatabaseResults(countries, exchanges, prices); + } catch(err) { + callback(err); + } + callback(err, result); + }); + }); +} +exports.queryLastValues = function (callback) { + return exports.queryLastValuesBeforeDatetime(undefined, callback); +} +function queryGfsAt(key, refTime, targetTime, callback) { + refTime = moment(refTime).toDate(); + targetTime = moment(targetTime).toDate(); + return mongoGfsCollection.findOne({ key, refTime, targetTime }, callback); +} +function queryLastGfsBefore(key, datetime, callback) { + return mongoGfsCollection.findOne( + { key, targetTime: exports.rangeQuery( + moment(datetime).subtract(2, 'hours').toDate(), datetime) }, + { sort: [['refTime', -1], ['targetTime', -1]] }, + callback); +} +function queryLastGfsAfter(key, datetime, callback) { + return mongoGfsCollection.findOne( + { key, targetTime: exports.rangeQuery(datetime, + moment(datetime).add(2, 'hours').toDate()) }, + { sort: [['refTime', -1], ['targetTime', 1]] }, + callback); +} +function decompressGfs(obj, callback) { + if (!obj) return callback(null, null); + return snappy.uncompress(obj, { asBuffer: true }, function (err, obj) { + if (err) return callback(err); + return callback(err, JSON.parse(obj)); + }); +} +function queryForecasts(key, datetime, callback) { + function fetchBefore(callback) { + return queryLastGfsBefore(key, now, callback); + }; + function fetchAfter(callback) { + return queryLastGfsAfter(key, now, callback); + }; + return async.parallel([fetchBefore, fetchAfter], callback); +} +function getParsedForecasts(key, datetime, useCache, callback) { + // Fetch two forecasts, using the cache if possible + var kb = key + '_before'; + var ka = key + '_after'; + function getCache(key, useCache, callback) { + if (!useCache) return callback(null, {}); + return memcachedClient.getMulti([kb, ka], callback); + } + getCache(key, useCache, function (err, data) { + if (err) { + return callback(err); + } else if (!data || !data[kb] || !data[ka]) { + // Nothing in cache, proceed as planned + return queryForecasts(key, datetime, function(err, objs) { + if (err) return callback(err); + if (!objs[0] || !objs[1]) return callback(null, null); + // Store raw (compressed) values in cache + if (useCache) { + var lifetime = parseInt( + (moment(objs[1]['targetTime']).toDate().getTime() - (new Date()).getTime()) / 1000.0); + memcachedClient.set(kb, objs[0]['data'].buffer, lifetime, handleError); + memcachedClient.set(ka, objs[1]['data'].buffer, lifetime, handleError); + } + // Decompress + return async.parallel([ + function(callback) { return decompressGfs(objs[0]['data'].buffer, callback); }, + function(callback) { return decompressGfs(objs[1]['data'].buffer, callback); } + ], function(err, objs) { + if (err) return callback(err); + // Return to sender + return callback(null, {'forecasts': objs, 'cached': false}); + }); + }) + } else { + // Decompress data, to be able to reconstruct a database object + return async.parallel([ + function(callback) { return decompressGfs(data[kb], callback); }, + function(callback) { return decompressGfs(data[ka], callback); } + ], function(err, objs) { + if (err) return callback(err); + // Reconstruct database object and return to sender + return callback(null, {'forecasts': objs, 'cached': true}); + }); + } + }); +} diff --git a/web/server.js b/web/server.js index 98765aa00b..dc02c06d62 100644 --- a/web/server.js +++ b/web/server.js @@ -21,12 +21,10 @@ var Memcached = require('memcached'); var moment = require('moment'); var MongoClient = require('mongodb').MongoClient; //var statsd = require('node-statsd'); // TODO: Remove -var snappy = require('snappy'); // Custom modules global.__base = __dirname; -var co2eq_parameters = require('../shared/co2eq_parameters'); -var co2lib = require('../shared/co2eq'); +var db = require('../shared/database') var app = express(); var server = http.Server(app); @@ -62,12 +60,10 @@ function handleError(err) { // * Database var mongoProductionCollection; var mongoExchangeCollection; -MongoClient.connect(process.env['MONGO_URL'], function(err, db) { +db.connect(function(err, db) { if (err) throw (err); console.log('Connected to database'); - mongoGfsCollection = db.collection('gfs'); mongoExchangeCollection = db.collection('exchange'); - mongoPriceCollection = db.collection('price'); mongoProductionCollection = db.collection('production'); // Start the application @@ -85,254 +81,6 @@ MongoClient.connect(process.env['MONGO_URL'], function(err, db) { // handleError(error); // }); -// * Database methods -function processDatabaseResults(countries, exchanges, prices) { - // Assign exchanges to countries - d3.entries(exchanges).forEach(function(entry) { - sortedCountryCodes = entry.key.split('->'); - entry.value.countryCodes = sortedCountryCodes; - if (!countries[sortedCountryCodes[0]]) countries[sortedCountryCodes[0]] = { - countryCode: sortedCountryCodes[0] - }; - if (!countries[sortedCountryCodes[1]]) countries[sortedCountryCodes[1]] = { - countryCode: sortedCountryCodes[1] - }; - var country1 = countries[sortedCountryCodes[0]]; - var country2 = countries[sortedCountryCodes[1]]; - if (!country1.exchange) country1.exchange = {}; - if (!country2.exchange) country2.exchange = {}; - country1.exchange[sortedCountryCodes[1]] = entry.value.netFlow * -1.0; - country2.exchange[sortedCountryCodes[0]] = entry.value.netFlow; - }); - - // Assign prices to countries - d3.entries(prices).forEach(function(entry) { - if (countries[entry.key]) - countries[entry.key].price = { - datetime: entry.value.datetime, - value: entry.value.price - } - }); - - // Add countryCode - d3.keys(countries).forEach(function(k) { - if (!countries[k]) - countries[k] = {countryCode: k}; - country = countries[k]; - }); - // Compute aggregates - d3.values(countries).forEach(function(country) { - country.maxProduction = - d3.max(d3.values(country.production)); - country.totalProduction = - d3.sum(d3.values(country.production)); - country.maxStorage = - d3.max(d3.values(country.storage || {})); - country.totalStorage = - d3.sum(d3.values(country.storage || {})); - country.totalImport = - d3.sum(d3.values(country.exchange), function(d) { - return d >= 0 ? d : 0; - }) || 0; - country.totalExport = - d3.sum(d3.values(country.exchange), function(d) { - return d <= 0 ? -d : 0; - }) || 0; - country.totalNetExchange = country.totalImport - country.totalExport; - country.maxExport = - -Math.min(d3.min(d3.values(country.exchange)), 0) || 0; - country.maxImport = - Math.max(d3.max(d3.values(country.exchange)), 0) || 0; - }); - - computeCo2(countries, exchanges); - - return {countries: countries, exchanges: exchanges}; -} -function computeCo2(countries, exchanges) { - var assignments = co2lib.compute(countries); - d3.entries(countries).forEach(function(o) { - o.value.co2intensity = assignments[o.key]; - }); - d3.values(countries).forEach(function(country) { - country.exchangeCo2Intensities = {}; - d3.keys(country.exchange).forEach(function(k) { - // Note that for imports of countries with unknown co2intensity - // the current country co2intensity is used (see co2eq.js) - country.exchangeCo2Intensities[k] = - country.exchange[k] > 0 ? - (assignments[k] || country.co2intensity) : - country.co2intensity; - }); - country.productionCo2Intensities = {}; - d3.keys(country.production).forEach(function(k) { - country.productionCo2Intensities[k] = co2eq_parameters.footprintOf( - k, country.countryCode); - }) - }); - d3.values(exchanges).forEach(function(exchange) { - exchange.co2intensity = countries[exchange.countryCodes[exchange.netFlow > 0 ? 0 : 1]].co2intensity; - }); -} -function elementQuery(keyName, keyValue, minDate, maxDate) { - var query = { datetime: rangeQuery(minDate, maxDate) }; - query[keyName] = keyValue - return query; -} -function rangeQuery(minDate, maxDate) { - var query = { }; - if (minDate) query['$gte'] = minDate; - if (maxDate) query['$lte'] = maxDate; - return query; -} -function queryElements(keyName, keyValues, collection, minDate, maxDate, callback) { - tasks = {}; - keyValues.forEach(function(k) { - tasks[k] = function(callback) { - return collection.findOne( - elementQuery(keyName, k, minDate, maxDate), - { sort: [['datetime', -1]] }, - callback); - }; - }); - return async.parallel(tasks, callback); -} -function queryLastValuesBeforeDatetime(datetime, callback) { - var minDate = (moment(datetime) || moment.utc()).subtract(24, 'hours').toDate(); - var maxDate = datetime ? new Date(datetime) : undefined; - // Get list of countries, exchanges, and prices in db - return async.parallel([ - function(callback) { - mongoProductionCollection.distinct('countryCode', - {datetime: rangeQuery(minDate, maxDate)}, callback); - }, - function(callback) { - mongoExchangeCollection.distinct('sortedCountryCodes', - {datetime: rangeQuery(minDate, maxDate)}, callback); - }, - function(callback) { - mongoPriceCollection.distinct('countryCode', - {datetime: rangeQuery(minDate, maxDate)}, callback); - }, - ], function(err, results) { - if (err) return callback(err); - productionCountryCodes = results[0]; // production keys - sortedCountryCodes = results[1]; // exchange keys - priceCountryCodes = results[2]; // price keys - // Query productions + exchanges - async.parallel([ - function(callback) { - return queryElements('countryCode', productionCountryCodes, - mongoProductionCollection, minDate, maxDate, callback); - }, - function(callback) { - return queryElements('sortedCountryCodes', sortedCountryCodes, - mongoExchangeCollection, minDate, maxDate, callback); - }, - function(callback) { - return queryElements('countryCode', priceCountryCodes, - mongoPriceCollection, minDate, maxDate, callback); - }, - ], function(err, results) { - if (err) return callback(err); - countries = results[0]; - exchanges = results[1]; - prices = results[2]; - // This can crash, so we to try/catch - try { - result = processDatabaseResults(countries, exchanges, prices); - } catch(err) { - callback(err); - } - callback(err, result); - }); - }); -} -function queryLastValues(callback) { - return queryLastValuesBeforeDatetime(undefined, callback); -} -function queryGfsAt(key, refTime, targetTime, callback) { - refTime = moment(refTime).toDate(); - targetTime = moment(targetTime).toDate(); - return mongoGfsCollection.findOne({ key, refTime, targetTime }, callback); -} -function queryLastGfsBefore(key, datetime, callback) { - return mongoGfsCollection.findOne( - { key, targetTime: rangeQuery( - moment(datetime).subtract(2, 'hours').toDate(), datetime) }, - { sort: [['refTime', -1], ['targetTime', -1]] }, - callback); -} -function queryLastGfsAfter(key, datetime, callback) { - return mongoGfsCollection.findOne( - { key, targetTime: rangeQuery(datetime, - moment(datetime).add(2, 'hours').toDate()) }, - { sort: [['refTime', -1], ['targetTime', 1]] }, - callback); -} -function decompressGfs(obj, callback) { - if (!obj) return callback(null, null); - return snappy.uncompress(obj, { asBuffer: true }, function (err, obj) { - if (err) return callback(err); - return callback(err, JSON.parse(obj)); - }); -} -function queryForecasts(key, datetime, callback) { - function fetchBefore(callback) { - return queryLastGfsBefore(key, now, callback); - }; - function fetchAfter(callback) { - return queryLastGfsAfter(key, now, callback); - }; - return async.parallel([fetchBefore, fetchAfter], callback); -} -function getParsedForecasts(key, datetime, useCache, callback) { - // Fetch two forecasts, using the cache if possible - var kb = key + '_before'; - var ka = key + '_after'; - function getCache(key, useCache, callback) { - if (!useCache) return callback(null, {}); - return memcachedClient.getMulti([kb, ka], callback); - } - getCache(key, useCache, function (err, data) { - if (err) { - return callback(err); - } else if (!data || !data[kb] || !data[ka]) { - // Nothing in cache, proceed as planned - return queryForecasts(key, datetime, function(err, objs) { - if (err) return callback(err); - if (!objs[0] || !objs[1]) return callback(null, null); - // Store raw (compressed) values in cache - if (useCache) { - var lifetime = parseInt( - (moment(objs[1]['targetTime']).toDate().getTime() - (new Date()).getTime()) / 1000.0); - memcachedClient.set(kb, objs[0]['data'].buffer, lifetime, handleError); - memcachedClient.set(ka, objs[1]['data'].buffer, lifetime, handleError); - } - // Decompress - return async.parallel([ - function(callback) { return decompressGfs(objs[0]['data'].buffer, callback); }, - function(callback) { return decompressGfs(objs[1]['data'].buffer, callback); } - ], function(err, objs) { - if (err) return callback(err); - // Return to sender - return callback(null, {'forecasts': objs, 'cached': false}); - }); - }) - } else { - // Decompress data, to be able to reconstruct a database object - return async.parallel([ - function(callback) { return decompressGfs(data[kb], callback); }, - function(callback) { return decompressGfs(data[ka], callback); } - ], function(err, objs) { - if (err) return callback(err); - // Reconstruct database object and return to sender - return callback(null, {'forecasts': objs, 'cached': true}); - }); - } - }); -} - // * Routes app.get('/v1/wind', function(req, res) { var t0 = (new Date().getTime()); @@ -404,48 +152,35 @@ app.get('/v1/state', function(req, res) { //statsdClient.increment('v1_state_GET'); var t0 = new Date().getTime(); function returnObj(obj, cached) { - if (cached) //statsdClient.increment('v1_state_GET_HIT_CACHE'); var deltaMs = new Date().getTime() - t0; - res.json({status: 'ok', data: obj, took: deltaMs + 'ms', cached: cached}); - //statsdClient.timing('state_GET', deltaMs); + res.json({status: 'ok', data: obj, took: deltaMs + 'ms'}); } if (req.query.datetime) { // Ignore requests in the future if (moment(req.query.datetime) > moment.now()) - returnObj({countries: {}, exchanges: {}}, false); - queryLastValuesBeforeDatetime(req.query.datetime, function (err, result) { + returnObj({countries: {}, exchanges: {}}); + db.queryLastValuesBeforeDatetime(req.query.datetime, function (err, result) { if (err) { //statsdClient.increment('state_GET_ERROR'); handleError(err); res.status(500).json({error: 'Unknown database error'}); } else { - returnObj(result, false); + returnObj(result); } }); } else { - memcachedClient.get('state', function (err, data) { - if (err) { - if (opbeat) - opbeat.captureError(err); - console.error(err); } - if (data) returnObj(data, true); - else { - queryLastValues(function (err, result) { - if (err) { - //statsdClient.increment('state_GET_ERROR'); - handleError(err); - res.status(500).json({error: 'Unknown database error'}); - } else { - memcachedClient.set('state', result, 5 * 60, function(err) { - if (err) { - handleError(err); - } - }); - returnObj(result, false); - } - }); - } - }); + return db.getCached('state', + function (err, data) { + if (err) { + if (opbeat) + opbeat.captureError(err); + console.error(err); + res.status(500).json({error: 'Unknown database error'}); + } + if (data) returnObj(data); + }, + 5 * 60, + db.queryLastValues); } }); app.get('/v1/co2', function(req, res) { @@ -488,7 +223,7 @@ app.get('/v1/co2', function(req, res) { .filter(function(d) { return d.types.indexOf('country') != -1; }); if (obj.length) { countryCode = obj[0].short_name; - queryLastValues(onCo2Computed); + db.queryLastValues(onCo2Computed); } else { console.error('Geocoder returned no usable results'); @@ -501,7 +236,7 @@ app.get('/v1/co2', function(req, res) { res.status(500).json({error: 'Error while geocoding'}); }); } else { - queryLastValues(onCo2Computed); + db.queryLastValues(onCo2Computed); } } else { res.status(400).json({'error': 'Missing arguments "lon" and "lat" or "countryCode"'}) @@ -517,7 +252,7 @@ app.get('/v1/exchanges', function(req, res) { var maxDate = datetime ? new Date(datetime) : undefined; var minDate = (moment(maxDate) || moment.utc()).subtract(24, 'hours').toDate(); mongoExchangeCollection.distinct('sortedCountryCodes', - {datetime: rangeQuery(minDate, maxDate)}, + {datetime: db.rangeQuery(minDate, maxDate)}, function(err, sortedCountryCodes) { if (err) { handleError(err); @@ -528,7 +263,7 @@ app.get('/v1/exchanges', function(req, res) { var from = arr[0]; var to = arr[1]; return (from === countryCode || to === countryCode); }); - queryElements('sortedCountryCodes', sortedCountryCodes, + db.queryElements('sortedCountryCodes', sortedCountryCodes, mongoExchangeCollection, minDate, maxDate, function(err, data) { if (err) { @@ -551,7 +286,7 @@ app.get('/v1/production', function(req, res) { var maxDate = datetime ? new Date(datetime) : undefined; var minDate = (moment(maxDate) || moment.utc()).subtract(24, 'hours').toDate(); mongoProductionCollection.findOne( - elementQuery('countryCode', countryCode, minDate, maxDate), + db.elementQuery('countryCode', countryCode, minDate, maxDate), { sort: [['datetime', -1]] }, function(err, doc) { if (err) { @@ -606,56 +341,26 @@ app.get('/v2/gfs/:key', function(req, res) { }); app.get('/v2/co2LastDay', function(req, res) { + // TODO: Remove + res.redirect(301, '/v2/history?countryCode=' + req.query.countryCode); +}); +app.get('/v2/history', function(req, res) { var countryCode = req.query.countryCode; if (!countryCode) return res.status(400).send('countryCode required'); - var cacheKey = 'co2LastDay_' + countryCode; - - function returnData(data, cached) { - res.json({ - 'data': data, - 'cached': cached - }) - }; - return memcachedClient.get(cacheKey, function (err, data) { - if (err) { - if (opbeat) - opbeat.captureError(err); - console.error(err); - } - if (data) returnData(data, true); - else { - var now = moment(); - var before = moment(now).subtract(1, 'day'); - var dates = [now]; - while (dates[dates.length - 1] > before) - dates.push(moment(dates[dates.length - 1]).subtract(30, 'minute')); - var tasks = dates.map(function(d) { - return function(callback) { - return queryLastValuesBeforeDatetime(d, callback) - }; - }); - return async.parallel(tasks, function(err, objs) { - if (err) { - handleError(err); - return res.status(500).send('Unknown server error'); - } - // Find unique entries - var dict = {}; - objs.forEach(function(d) { - if (d.countries[countryCode]) - dict[d.countries[countryCode].datetime] = d.countries[countryCode]; - }); - var data = d3.values(dict).sort(function(x, y) { return d3.ascending(x.datetime, y.datetime); }); - memcachedClient.set(cacheKey, data, 15 * 60, function(err) { - if (err) { - handleError(err); - } - }); - returnData(data, false); - }); - } - }); + return db.getCached('HISTORY_' + countryCode, + function (err, data) { + if (err) { + if (opbeat) + opbeat.captureError(err); + console.error(err); + res.status(500).send('Unknown database error'); + // } else if (!data) { + // res.status(500).send('No data was found'); + } else { + res.json({ 'data': data }) + } + }); }); // *** UNVERSIONED ***