Skip to content

Commit

Permalink
Cache time series in feeder (electricitymaps#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
corradio authored Feb 1, 2017
1 parent cd65aab commit 9f403ff
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 345 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

datascience

feeder/node_modules

web/node_modules
web/build
web/public/dist
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
mongodata
src
grib2json
feeder/node_modules
web/public/dist
web/node_modules
.ipynb_checkpoints/
Expand Down
5 changes: 5 additions & 0 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ dependencies:
- sleep 20
- docker-compose logs web
- curl --fail -i http://localhost:8000/
- curl --fail -i http://localhost:8000/health
- curl --fail -i http://localhost:8000/v1/co2?countryCode=FR
- curl --fail -i http://localhost:8000/v1/exchanges?countryCode=FR
- curl --fail -i http://localhost:8000/v1/production?countryCode=FR
- curl --fail -i http://localhost:8000/v2/co2LastDay?countryCode=FR
test:
override:
- echo "no test"
3 changes: 3 additions & 0 deletions feeder/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ WORKDIR /home/feeder
# Only add requirements to enable cached builds when it is unchanged
ADD feeder/requirements.txt /home/feeder/requirements.txt
RUN pip install -r requirements.txt
# Same with package.json
ADD feeder/package.json /home/feeder/package.json
RUN npm install
# Add the rest
ADD feeder /home/feeder
ADD shared /home/shared
Expand Down
28 changes: 18 additions & 10 deletions feeder/feeder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import arrow
import glob
import pymongo
import json, logging, os, schedule, time
import json, logging, os, schedule, subprocess, time
import requests
import snappy

Expand Down Expand Up @@ -447,19 +447,27 @@ def fetch_weather():
except:
logger.exception('fetch_weather()')

def postprocess():
try:
subprocess.check_call(['node', 'push_cache.js'], shell=False)
except:
logger.exception('postprocess()')

def fetch_electricity():
# Fetch all electricity data
fetch_weather()
fetch_consumptions()
fetch_productions()
fetch_exchanges()
fetch_price()
postprocess()

migrate(db, validate_production)

schedule.every(15).minutes.do(fetch_weather)
schedule.every(INTERVAL_SECONDS).seconds.do(fetch_consumptions)
schedule.every(INTERVAL_SECONDS).seconds.do(fetch_productions)
schedule.every(INTERVAL_SECONDS).seconds.do(fetch_exchanges)
schedule.every(INTERVAL_SECONDS).seconds.do(fetch_price)
schedule.every(INTERVAL_SECONDS).seconds.do(fetch_electricity)

fetch_weather()
fetch_consumptions()
fetch_productions()
fetch_exchanges()
fetch_price()
fetch_electricity()

while True:
schedule.run_pending()
Expand Down
21 changes: 21 additions & 0 deletions feeder/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "electricitymap-feeder",
"version": "0.0.1",
"description": "",
"dependencies": {
"async": "^2.0.1",
"d3": "^4.4.0",
"mathjs": "^3.5.1",
"memcached": "^2.2.2",
"moment": "^2.15.2",
"mongodb": "^2.2.9",
"opbeat": "^3.21.0",
"snappy": "^5.0.5"
},
"repository": {
"type": "git",
"url": "https://github.com/corradio/electricitymap.git"
},
"scripts": {
}
}
77 changes: 77 additions & 0 deletions feeder/push_cache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
var isProduction = process.env.ENV === 'production';

console.log('Starting push_cache..');

// * Opbeat (must be the first thing started)
if (isProduction) {
var opbeat = require('opbeat').start({
appId: 'c36849e44e',
organizationId: '093c53b0da9d43c4976cd0737fe0f2b1',
secretToken: process.env['OPBEAT_SECRET']
});
app.use(opbeat.middleware.express())
}
function handleError(err) {
if (!err) return;
if (opbeat) opbeat.captureError(err);
console.error(err);
}

// Require
var async = require('async');
var d3 = require('d3');
var moment = require('moment');

// Custom modules
global.__base = __dirname;
var db = require('../shared/database');

db.connect(function (err, _) {
if (err) throw (err);
// cache key accessors
var CACHE_KEY_PREFIX_HISTORY_CO2 = 'HISTORY_';

// Compute values for the last 24 hours, step 5min
var now = moment();
var before = moment(now).subtract(1, 'day');
var dates = d3.timeMinute.every(5).range(before.toDate(), now.toDate());

var tasks = dates.map(function(d) {
return function (callback) {
return db.queryLastValuesBeforeDatetime(d, callback)
};
});
// Use a `series` call to avoid running out of memory
console.log('Querying state history..');
return async.series(tasks, function (err, objs) {
if (err) {
return handleError(err);
}
// Iterate for each country
console.log('Pushing CO2 histories..');
countryCodes = d3.keys(objs[objs.length - 1].countries);
countryCodes.forEach(function (countryCode) {
// Get a timeseries of CO2
// and dedup by datetime
var dict = {};
objs.forEach(function (d) {
var c = d.countries[countryCode];
if (!c) return;
dict[c.datetime] = c;
});
var ts = d3.values(dict)
.sort(function (x, y) { return d3.ascending(x.datetime, y.datetime); });
// Push to cache
db.setCache(
CACHE_KEY_PREFIX_HISTORY_CO2 + countryCode,
ts, 24 * 3600,
function (err) {
if (err) handleError(err);
});
});
// done
console.log('Done.')
process.exit();
});

});
Loading

0 comments on commit 9f403ff

Please sign in to comment.