Skip to content

Commit

Permalink
initial code
Browse files Browse the repository at this point in the history
  • Loading branch information
Robbie Pallas committed Sep 17, 2015
1 parent 5968ce9 commit a754dbd
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 0 deletions.
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
// Package entry point: re-exports whatever `./lib` exports.
module.exports = require('./lib');
26 changes: 26 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
var Async = require('async');
var Fs = require('fs');
var MongoClient = require('mongodb').MongoClient;

var Loader = require('./loader');

/**
 * Public API of the library.
 */
exports = module.exports = {
    /**
     * Opens a MongoDB connection, lists the entries of `sourcePath`, hands
     * each entry to the loader and finally closes the connection.
     *
     * @param {string} sourcePath - Directory whose entries are passed, one by
     *     one, to the loader.
     * @param {Object} options
     * @param {string} options.mongoURI - Connection string given to
     *     `MongoClient.connect`.
     * @param {Function} callback - Receives the outcome of `Async.auto`.
     */
    load: function (sourcePath, options, callback) {
        // NOTE(review): the `(callback, results)` argument order of dependent
        // tasks matches async v1.x; confirm the installed version.
        Async.auto({
            db: function (callback) {
                console.log('opening db connection');
                MongoClient.connect(options.mongoURI, callback);
            },
            subDirectory: function (callback) {
                Fs.readdir(sourcePath, callback);
            },
            loadData: ['db', 'subDirectory', function (callback, results) {
                // The loader resolves paths from `options.sourcePath`, so the
                // source directory has to be forwarded explicitly.
                var loadDirectory = Loader.load(results.db, {sourcePath: sourcePath});

                Async.each(results.subDirectory, loadDirectory, function (err) {
                    if (!err) {
                        return callback();
                    }
                    // `cleanUp` depends on this task and is skipped when it
                    // fails, so the connection must be released here.
                    console.log('closing db connection');
                    results.db.close(function () {
                        callback(err);
                    });
                });
            }],
            cleanUp: ['db', 'loadData', function (callback, results) {
                console.log('closing db connection');
                results.db.close(callback);
            }]
        }, callback);
    }
};
31 changes: 31 additions & 0 deletions lib/loader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
var Async = require('async');
var Csv = require('csv-streamify');
var Es = require('event-stream');
var Fs = require('fs');

var Streams = require('./streams');

/**
 * Builds an iterator (suitable for `Async.each`) that imports every file of
 * one sub-directory into the database.
 *
 * Each file is read as a stream, parsed as CSV, passed through
 * `Streams.transform(directory)`, grouped by `Streams.batch(200)` and written
 * with `Streams.insert(db)`.
 *
 * @param {Object} db - Database handle forwarded to `Streams.insert`.
 * @param {Object} options
 * @param {string} options.sourcePath - Directory that contains `directory`.
 * @returns {Function} `function (directory, callback)`; `callback` is called
 *     once all files of `directory` finished, or with the first error.
 */
var load = function (db, options) {
    return function (directory, callback) {
        var basePath = options.sourcePath + '/' + directory;

        Async.waterfall([
            function (callback) {
                Fs.readdir(basePath, callback);
            },
            function (files, callback) {
                console.log('loading ' + files.length + ' files from ' + directory);

                Async.each(files, function (file, callback) {
                    var source = Fs.createReadStream(basePath + '/' + file);
                    var parser = Csv({objectMode: true, columns: true});
                    var transformer = Streams.transform(directory);
                    var batcher = Streams.batch(200);
                    var inserter = Streams.insert(db);

                    // Several stages may fail, and 'end' may still fire after
                    // an error; make sure the callback runs exactly once.
                    var finished = false;
                    var done = function (err) {
                        if (finished) {
                            return;
                        }
                        finished = true;
                        if (err) {
                            // Release the file handle of the aborted import.
                            source.destroy();
                        }
                        callback(err);
                    };

                    // `pipe` does not forward errors, so every stage needs
                    // its own listener.
                    [source, parser, transformer, batcher, inserter].forEach(function (stage) {
                        stage.on('error', done);
                    });
                    inserter.on('end', function () {
                        done();
                    });

                    source
                        .pipe(parser)
                        .pipe(transformer)
                        .pipe(batcher)
                        .pipe(inserter);
                }, callback);
            }
        ], callback);
    };
};

// Exposes the `load` iterator factory defined above.
module.exports = {
load: load
};

25 changes: 25 additions & 0 deletions lib/streams/batch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
var batch = function (batchSize) {
batchSize = batchSize || 1000;
var batch = [];

return Es.through(
function write (data) {
batch.push(data);
if (batch.length === batchSize) {
this.emit('data', batch);
batch = [];
}
},
function end () {
if (batch.length) {
this.emit('data', batch);
batch = [];
}
this.emit('end');
}
);
};

// Exposes the `batch` stream factory defined above.
// NOTE(review): lib/loader.js calls `Streams.batch` on `require('./streams')`,
// but no lib/streams/index.js aggregating these modules is among the files of
// this commit; confirm one exists.
module.exports = {
batch: batch
};
19 changes: 19 additions & 0 deletions lib/streams/insert.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
var insert = function (db) {
return Es.map(
function (data, callback) {
if (data.length) {
var bulk = db.collection('hnet').initializeUnorderedBulkOp();
data.forEach(function (doc) {
bulk.insert(doc);
});
bulk.execute(callback);
} else {
callback();
}
}
);
};

// Exposes the `insert` stream factory defined above.
module.exports = {
insert: insert
};
11 changes: 11 additions & 0 deletions lib/streams/transform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
var transform = function (directory) {
return Es.map(function (data, callback) {
data.siteRef = Mapping[directory];
data.epoch = parseInt((data.TheTime - 25569) * 86400) + 6 * 3600;
callback(null, data);
});
};

// Exposes the `transform` stream factory defined above.
module.exports = {
transform: transform
};

0 comments on commit a754dbd

Please sign in to comment.