Skip to content
This repository has been archived by the owner on Feb 16, 2020. It is now read-only.

[WIP] 0.6 new gekko events #1850

Merged
merged 58 commits into from
May 28, 2018
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
dbb292a
add dedicated events page
askmike Feb 1, 2018
95c8ffb
add events page
askmike Feb 1, 2018
16d9673
streamline events intro
askmike Feb 1, 2018
fc4ac03
typo
askmike Feb 1, 2018
7f41958
catch unsuccessful broadcast
askmike Feb 1, 2018
28326bf
use gekko events for market start and market update
askmike Feb 1, 2018
4c647e2
hook up plugins to market events
askmike Feb 1, 2018
c5dbb78
add eventLogger plugin
askmike Feb 1, 2018
5e459c3
add additional strat events "stratStart" and "stratUpdate"
askmike Feb 2, 2018
c7aedf2
make sure to properly enclose broadcast catch wrap
askmike Feb 5, 2018
875a3b1
rm stratStart event
askmike Feb 6, 2018
7caf0d3
rm all stratEvent docs
askmike Feb 6, 2018
9a4b2b1
remove stratStart subscription
askmike Feb 9, 2018
0c35666
introduce portfolioChange & portfolioValueChange events
askmike Feb 11, 2018
1af48aa
introduce events to describe trades async
askmike Feb 12, 2018
08beb2b
add stratWarmupCompleted event
askmike Feb 12, 2018
9ca7caf
implement stratWarmupCompleted
askmike Feb 13, 2018
d2a2146
implement stratUpdate event
askmike Feb 13, 2018
e78cfbb
error when plugins consume candles too slow
askmike Feb 21, 2018
0d97570
make sure to callback after consuming candle
askmike Feb 21, 2018
d29f785
var cleanup
askmike Feb 22, 2018
3993638
Fix issue with trade events being deferred too long
cmroche Feb 13, 2018
b722b17
force order of market events
askmike Mar 3, 2018
410dff5
remove cpRelay out of performance analyzer
askmike Mar 5, 2018
0eabf08
make sure we dont report on no trades
askmike Mar 5, 2018
decddd8
rm mentions of simulated
askmike Mar 5, 2018
eeec247
defer processCandle until the strat is completely done processing the…
askmike Mar 16, 2018
505ed01
implement tradeInitialized & tradeCompleted events
askmike Mar 19, 2018
4d52ca9
use a LIFO stack based event emittor
askmike Mar 19, 2018
d05b4fe
make all plugins fifo event emitters
askmike Mar 20, 2018
350efd3
refer to blogpost with background information
askmike Mar 20, 2018
e7d7e56
add native gekko indicator results to stratUpdate event
askmike Mar 20, 2018
59b5c70
implement roundtrip, roundtripUpdate & performanceUpdate events
askmike Mar 23, 2018
587ddcb
properly catch no trade scenario
askmike Mar 24, 2018
4df7af2
pass all plugins to gekkoStream
askmike Mar 24, 2018
4995f22
pass all plugins into gekkostream
askmike Mar 24, 2018
479d321
create plugin to handle backtest results
askmike Mar 24, 2018
3847362
only wait for actual candle consumers to handle candles
askmike Mar 25, 2018
cea17ca
only flush events from plugins that actually emit
askmike Mar 25, 2018
a42e5a0
rm the id of the small candle
askmike Mar 25, 2018
5e4d4d8
add stratCandle event
askmike Mar 25, 2018
5e9c670
properly handle stratUpdates
askmike Mar 25, 2018
674af1d
clarify strat events during warmup
askmike Mar 25, 2018
0d6ce81
allow the exporting of raw trades
askmike Mar 25, 2018
7bbb59c
hookup backtest UI to new event flow
askmike Mar 25, 2018
6ae42a2
make sure to print cp final words
askmike Mar 25, 2018
a694b09
upgrade backtest API call to use backtestResultExporter plugin
askmike Mar 25, 2018
2041cd2
update to new backtest api call
askmike Mar 26, 2018
8e2760a
allow for specifying what candle props to return
askmike Mar 26, 2018
a639317
make sure we output the binance error, fix #2037
askmike Apr 1, 2018
c6faee1
Performance Analyzer fixes and features (#2178)
stereohelix May 17, 2018
f948a2d
define relativeYearlyProfit, fix #2190
askmike May 18, 2018
b080ff7
update required node version to 8.11.2
askmike May 24, 2018
7358e55
run appveyor tests using node v9
askmike May 24, 2018
52f6e5d
pull async indicator wrap code out of base strat
askmike May 28, 2018
658bf76
remove cp.js
askmike May 28, 2018
e703963
Merge branch 'pre-v0.6' into feature/sync-gekko-events
askmike May 28, 2018
ea6df42
remove tulind & talib from default deps
askmike May 28, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 18 additions & 33 deletions core/budfox/budfox.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,24 @@ var BudFox = function(config) {

// BudFox data flow:

// relay a marketUpdate event
this.marketDataProvider.on(
'marketUpdate',
e => this.emit('marketUpdate', e)
);

// relay a marketStart event
this.marketDataProvider.on(
'marketStart',
e => this.emit('marketStart', e)
);

// Output the candles
this.candleManager.on(
'candles',
this.pushCandles
);

// on every `tick` retrieve trade data
this.heart.on(
'tick',
Expand All @@ -42,26 +60,7 @@ var BudFox = function(config) {
this.candleManager.processTrades
);

// Output the candles
this.candleManager.on(
'candles',
this.pushCandles
);

this.heart.pump();

// Budfox also reports:

// Trades & last trade
//
// this.marketDataProvider.on(
// 'trades',
// this.broadcast('trades')
// );
// this.marketDataProvider.on(
// 'trades',
// this.broadcastTrade
// );
}

var Readable = require('stream').Readable;
Expand All @@ -76,18 +75,4 @@ BudFox.prototype.pushCandles = function(candles) {
_.each(candles, this.push);
}

// BudFox.prototype.broadcastTrade = function(trades) {
// _.defer(function() {
// this.emit('trade', trades.last);
// }.bind(this));
// }

// BudFox.prototype.broadcast = function(message) {
// return function(payload) {
// _.defer(function() {
// this.emit(message, payload);
// }.bind(this));
// }.bind(this);
// }

module.exports = BudFox;
2 changes: 1 addition & 1 deletion core/budfox/candleCreator.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ CandleCreator.prototype.calculateCandles = function() {

// catch error from high volume getTrades
if (this.lastTrade !== undefined)
// create a string referencing to minute this trade happened in
// create a string referencing the minute this trade happened in
var lastMinute = this.lastTrade.date.format('YYYY-MM-DD HH:mm');

var candles = _.map(this.buckets, function(bucket, name) {
Expand Down
10 changes: 0 additions & 10 deletions core/budfox/candleManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ var Manager = function() {

this.candleCreator
.on('candles', this.relayCandles);

this.messageFirstCandle = _.once(candle => {
cp.firstCandle(candle);
})
};

util.makeEventEmitter(Manager);
Expand All @@ -34,12 +30,6 @@ Manager.prototype.processTrades = function(tradeBatch) {

Manager.prototype.relayCandles = function(candles) {
this.emit('candles', candles);

if(!_.size(candles))
return;

this.messageFirstCandle(_.first(candles));
cp.lastCandle(_.last(candles));
}

module.exports = Manager;
10 changes: 5 additions & 5 deletions core/budfox/marketDataProvider.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ Manager.prototype.retrieve = function() {


Manager.prototype.relayTrades = function(batch) {
this.emit('trades', batch);
this.sendMarketStart(batch);
this.emit('marketUpdate', batch.last.date);

this.sendStartAt(batch);
cp.update(batch.last.date.format());
this.emit('trades', batch);
}

Manager.prototype.sendStartAt = _.once(function(batch) {
cp.startAt(batch.first.date.format())
Manager.prototype.sendMarketStart = _.once(function(batch) {
this.emit('marketStart', batch.first.date);
});

module.exports = Manager;
20 changes: 18 additions & 2 deletions core/candleBatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var CandleBatcher = function(candleSize) {

this.candleSize = candleSize;
this.smallCandles = [];
this.calculatedCandles = [];

_.bindAll(this);
}
Expand All @@ -28,22 +29,37 @@ CandleBatcher.prototype.write = function(candles) {
if(!_.isArray(candles))
throw 'candles is not an array';

this.emitted = 0;

_.each(candles, function(candle) {
this.smallCandles.push(candle);
this.check();
}, this);

return this.emitted;
}

CandleBatcher.prototype.check = function() {
if(_.size(this.smallCandles) % this.candleSize !== 0)
return;

this.emit('candle', this.calculate());
this.emitted++;
this.calculatedCandles.push(this.calculate());
this.smallCandles = [];
}

CandleBatcher.prototype.flush = function() {
_.each(
this.calculatedCandles,
candle => this.emit('candle', candle)
);

this.calculatedCandles = [];
}

CandleBatcher.prototype.calculate = function() {
var first = this.smallCandles.shift();
// remove the id property of the small candle
var { id, ...first } = this.smallCandles.shift();

first.vwp = first.vwp * first.volume;

Expand Down
18 changes: 0 additions & 18 deletions core/cp.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,6 @@ var message = (type, payload) => {
}

var cp = {
// string like: '2016-12-03T22:23:00.000Z'
update: latest => message('update', { latest }),
startAt: startAt => message('startAt', { startAt }),

// object like:
//
// {
// start: '2016-12-03T22:23:00.000Z',
// open: 765,
// high: 765,
// low: 765,
// close: 765,
// vwp: 765,
// volume: 0,
// trades: 0
// }
lastCandle: lastCandle => message('lastCandle', { lastCandle }),
firstCandle: firstCandle => message('firstCandle', { firstCandle }),

// object like:
//
Expand Down
34 changes: 34 additions & 0 deletions core/emitter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Gekko uses a custom event emitter within the GekkoStream (the plugins) to guarantee
// the correct order of events that are triggered by eachother. Turns sync events from
// LIFO into a FIFO stack based model.
//
// More details here: https://forum.gekko.wizb.it/thread-56579.html

const util = require('util');
const events = require('events');
const NativeEventEmitter = events.EventEmitter;

const GekkoEventEmitter = function() {
NativeEventEmitter.call(this);
this.defferedEvents = [];
}

util.inherits(GekkoEventEmitter, NativeEventEmitter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use ES6 extends here, it's reference in the official doc and available since node 4.9.1 https://node.green/

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, will do!


// push to stack
GekkoEventEmitter.prototype.deferredEmit = function(name, payload) {
this.defferedEvents.push({name, payload});
}

// resolve FIFO
GekkoEventEmitter.prototype.broadcastDeferredEmit = function() {
if(this.defferedEvents.length === 0)
return false;

const event = this.defferedEvents.shift();

this.emit(event.name, event.payload);
return true;
}

module.exports = GekkoEventEmitter;
32 changes: 0 additions & 32 deletions core/eventLogger.js

This file was deleted.

84 changes: 69 additions & 15 deletions core/gekkoStream.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,84 @@
// Small writable stream wrapper that
// passes data to all `candleConsumers`.

var Writable = require('stream').Writable;
var _ = require('lodash');
var async = require('async');
const Writable = require('stream').Writable;
const _ = require('lodash');
const async = require('async');
const moment = require('moment');

var util = require('./util');
var env = util.gekkoEnv();
var mode = util.gekkoMode();
const util = require('./util');
const env = util.gekkoEnv();
const mode = util.gekkoMode();
const config = util.getConfig();
const log = require(util.dirs().core + 'log');

var Gekko = function(candleConsumers) {
this.candleConsumers = candleConsumers;
var Gekko = function(plugins) {
this.plugins = plugins;
this.candleConsumers = plugins
.filter(plugin => plugin.processCandle);
Writable.call(this, {objectMode: true});

this.producers = this.plugins
.filter(p => p.meta.emits);

this.finalize = _.bind(this.finalize, this);
}

Gekko.prototype = Object.create(Writable.prototype, {
constructor: { value: Gekko }
});

Gekko.prototype._write = function(chunk, encoding, _done) {
var done = _.after(this.candleConsumers.length, _done);
_.each(this.candleConsumers, function(c) {
c.processCandle(chunk, done);
});
if(config.debug) {
// decorate with more debug information
Gekko.prototype._write = function(chunk, encoding, _done) {

const start = moment();
var relayed = false;
var at = null;

const timer = setTimeout(() => {
if(!relayed)
log.error([
`The plugin "${at}" has not processed a candle for 1 second.`,
`This will cause Gekko to slow down or stop working completely.`
].join(' '));
}, 1000);

const flushEvents = _.after(this.candleConsumers.length, () => {
relayed = true;
clearInterval(timer);
this.flushDefferedEvents();
_done();
});
_.each(this.candleConsumers, function(c) {
at = c.meta.name;
c.processCandle(chunk, flushEvents);
}, this);
}
} else {
// skip decoration
Gekko.prototype._write = function(chunk, encoding, _done) {
const flushEvents = _.after(this.candleConsumers.length, () => {
this.flushDefferedEvents();
_done();
});
_.each(this.candleConsumers, function(c) {
c.processCandle(chunk, flushEvents);
}, this);
}
}

Gekko.prototype.flushDefferedEvents = function() {
const broadcasted = _.find(
this.producers,
producer => producer.broadcastDeferredEmit()
);

// If we braodcasted anything we might have
// triggered more events, recurse until we
// have fully broadcasted everything.
if(broadcasted)
this.flushDefferedEvents();
}

Gekko.prototype.finalize = function() {
Expand All @@ -41,12 +95,12 @@ Gekko.prototype.finalize = function() {

Gekko.prototype.shutdown = function() {
async.eachSeries(
this.candleConsumers,
this.plugins,
function(c, callback) {
if (c.finalize) c.finalize(callback);
else callback();
},
function() {
() => {
// If we are a child process, we signal to the parent to kill the child once it is done
// so that is has time to process all remaining events (and send report data)
if (env === 'child-process') process.send('done');
Expand Down
Loading