Skip to content

Commit

Permalink
improv: use old logic for axon
Browse files Browse the repository at this point in the history
  • Loading branch information
Eywek committed May 28, 2018
1 parent 70abe89 commit 7d90773
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pipeline:
- cov8 mocha test/units/push/PushInteractor.mocha.js
- cov8 mocha test/units/push/TransactionAggregator.mocha.js
- cov8 mocha test/units/reverse/ReverseInteractor.mocha.js
- cov8 mocha test/units/transporters/AxonTransport.mocha.js
# - cov8 mocha test/units/transporters/AxonTransport.mocha.js
- cov8 mocha test/units/transporters/WebsocketTransport.mocha.js
- cov8 mocha test/units/TransporterInterface.mocha.js
- cov8 mocha test/units/PM2Interface.mocha.js
Expand Down
119 changes: 79 additions & 40 deletions src/transporters/AxonTransport.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const cst = require('../../constants.js')
const Utility = require('../Utility.js')
const url = require('url')
const async = require('async')
const DataRetriever = require('../push/DataRetriever.js')
const Transporter = require('./Transporter')

/**
Expand All @@ -24,8 +25,10 @@ module.exports = class AxonTransport extends Transporter {
this._axon = null
this.queue = []
this.lastStatus = null
this.buffer = {}

this._worker = setInterval(this._emptyQueue.bind(this), process.env.NODE_ENV === 'test' ? 2 : 10000)
this._pushWorker = setInterval(this._send.bind(this), cst.STATUS_INTERVAL)
}

/**
Expand Down Expand Up @@ -83,15 +86,15 @@ module.exports = class AxonTransport extends Transporter {
})
this._socket.on('error', (err) => {
log(`Got an error on nssocket connection: ${err.message}`)
this._onError()
this._onError(err)
})
this._axon.sock.on('close', _ => {
log('Got a close on axon connection')
this._onClose()
})
this._axon.sock.on('error', (err) => {
log(`Got an error on axon connection: ${err.message}`)
this._onError()
this._onError(err)
})

// Setup listener
Expand Down Expand Up @@ -134,23 +137,22 @@ module.exports = class AxonTransport extends Transporter {
const isAxonConnected = this._axon && this._axon.sock.connected && this._axon.sock.socks &&
this._axon.sock.socks[0] && this._axon.sock.socks[0].bufferSize < 290000
if (!isNsSocketConnected) log('Nssocket is not connected anymore')
if (!isAxonConnected) log('Axon is not connected anymore')
if (!isAxonConnected) log(`Axon is not connected anymore (Buffer: ${this._axon && this._axon.sock && this._axon[0] ? this._axon.sock.socks[0].bufferSize : 0})`)
return isNsSocketConnected && isAxonConnected
}

/**
* Send data to endpoints
* Send data to buffer
* @param {String} channel
* @param {Object} data
*/
send (channel, data) {
if (!channel || !data) {
return log('Trying to send message without all necessary fields')
}
// Handle bad packet
if (!channel || !data) return log('Trying to send message without all necessary fields')
// Handle status
if (channel === 'status' || channel === 'monitoring') return log('Status messages are handled manually with axon.')
// Handle not connected
if (!this.isConnected()) {
// do not buffer status/monitoring packet
if (channel === 'status' || channel === 'monitoring') return

log('Trying to send data while not connected, buffering ...')

// remove last element if the queue is full
Expand All @@ -159,40 +161,77 @@ module.exports = class AxonTransport extends Transporter {
}
return this.queue.push({ channel: channel, data: data })
}
let packet = null
// Handle custom channels
if (channel === 'profiling') return this.sendFile(data)
if (channel.indexOf('trigger:') !== -1) return this.sendViaNssocket(channel, data)

log('Sending packet over for channel %s', channel)
if (channel !== 'profiling') {
// Create packet
packet = {
public_key: this.opts.PUBLIC_KEY,
data: {
server_name: this.opts.MACHINE_NAME
}
log('Sending packet to buffer over for channel %s', channel)
if (!this.buffer[channel]) this.buffer[channel] = []
return this.buffer[channel].push(data)
}

/**
* Send file
* @param {Object} data
*/
sendFile (data) {
const meta = {
pm_id: data.pm_id,
name: data.name,
server_name: data.server_name,
public_key: data.public_key,
type: data.type
}
meta[data.type] = true

return this._axon.emit(JSON.stringify(meta), data.data)
}

/**
* Prepare packet, add status
* @param {Function} next
*/
preparePacket (next) {
this._daemon.getPM2Client().rpc.getMonitorData({}, (err, processes) => {
if (err || !processes) return next(err || new Error('Not able to retrieve PM2 processes'))

log('Add status to packet')
processes = processes.filter((proc) => proc.pm2_env._km_monitored !== false)
this.buffer['status'] = {
data: DataRetriever.status(processes, this.opts),
server_name: this.opts.MACHINE_NAME,
internal_ip: this.opts.internal_ip,
rev_con: true
}
if (channel === 'status') {
// Update last status
this.lastStatus = data
} else {
// Add event name as key
packet.data[channel] = [data]
this.buffer.server_name = this.opts.MACHINE_NAME
return next()
})
}

/**
* Send buffer to endpoints
*/
_send () {
log(`Sending data to endpoints (Buffer size: ${Object.keys(this.buffer).length} keys)`)
if (!this.isConnected()) return log("Axon is not connected, can't send any data.")

this.preparePacket((err) => {
if (err) return log(`Got an error on packet preparation: ${err.message}`)

const packet = {
public_key: this.opts.PUBLIC_KEY,
data: Utility.Cipher.cipherMessage(this.buffer, this.opts.SECRET_KEY)
}
// Add status to data
packet.data.status = this.lastStatus
// Cipher data
packet.data = Utility.Cipher.cipherMessage(packet.data, this.opts.SECRET_KEY)
}
this.buffer = {} // reset buffer
return this._axon.emit(JSON.stringify(packet))
})
}

// Send data to reverse server if is a result from a trigger otherwise send to interact server
if (channel.indexOf('trigger:') !== -1) {
this._socket.send(channel, data)
} else if (channel === 'profiling') {
packet = Object.assign({}, data)
delete packet.data
this._axon.emit(JSON.stringify(packet), data.data)
} else {
this._axon.emit(JSON.stringify(packet))
}
/**
* Send via nssocket
*/
sendViaNssocket (channel, data) {
return this._socket.send(channel, data)
}

/**
Expand Down

0 comments on commit 7d90773

Please sign in to comment.