Skip to content

Commit

Permalink
Fixed #35
Browse files Browse the repository at this point in the history
Changed the way default options are managed
#2 (comment)) and added key to improve performances
  • Loading branch information
claustres committed Oct 21, 2019
1 parent 8b89f75 commit 532ce74
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 65 deletions.
121 changes: 64 additions & 57 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@ import { LocalService, RemoteService } from './service'

const debug = makeDebug('feathers-distributed')

export default function init (options) {
export default function init (options = {}) {
return function () {
const distributionOptions = Object.assign(
{
publicationDelay: 5000,
middlewares: {},
cote: {
helloInterval: 10000,
checkInterval: 20000,
nodeTimeout: 30000,
masterTimeout: 60000
}
},
options
)
const app = this
app.coteOptions = Object.assign({
helloInterval: 10000,
checkInterval: 20000,
nodeTimeout: 30000,
masterTimeout: 60000,
log: false,
basePort: 10000
}, options.cote)
const distributionOptions = Object.assign({
publicationDelay: 10000,
middlewares: {}
}, options)
const isInternalService = service => {
// Default is to expose all services
if (!distributionOptions.services) return false
Expand All @@ -34,57 +34,29 @@ export default function init (options) {
if (typeof distributionOptions.remoteServices === 'function') return distributionOptions.remoteServices(service)
else return distributionOptions.remoteServices.includes(service.path)
}
const app = this
// Because options are forwarded and assigned to defaults options of services allocate an empty object if nothing is provided
app.coteOptions = distributionOptions.cote || {}
// Change default base port for automated port finding
portfinder.basePort = app.coteOptions.basePort || 10000
app.cote = (distributionOptions.cote ? makeCote(distributionOptions.cote) : makeCote())
portfinder.basePort = app.coteOptions.basePort
// Setup cote with options
app.cote = makeCote(app.coteOptions)
// We need to uniquely identify the app to avoid infinite loop by registering our own services
app.uuid = uuid()
debug('Initializing feathers-distributed')
debug('Initializing feathers-distributed with cote options', app.coteOptions)

// This publisher publishes an event each time a local app service is registered
app.servicePublisher = new app.cote.Publisher(
{
name: 'feathers services publisher',
namespace: 'services',
broadcasts: ['service']
},
Object.assign({ log: false }, app.coteOptions)
)
// Also each time a new node pops up so that it does not depend of the initialization order of the apps
app.servicePublisher.on('cote:added', data => {
// console.log(data)
// Add a timeout so that the subscriber has been initialized on the node
setTimeout(_ => {
Object.getOwnPropertyNames(app.services).forEach(path => {
const service = app.services[path]
if (service.remote) return
const serviceDescriptor = { uuid: app.uuid, path }
// Skip internal services
if (isInternalService(serviceDescriptor)) {
debug('Ignoring local service on path ' + serviceDescriptor.path)
return
}
app.servicePublisher.publish('service', { uuid: app.uuid, path })
debug('Republished local service on path ' + path)
})
}, distributionOptions.publicationDelay)
})
// This subscriber listen to an event each time a remote app service has been registered
app.serviceSubscriber = new app.cote.Subscriber(
{
name: 'feathers services subscriber',
namespace: 'services',
subscribesTo: ['service']
},
Object.assign({ log: false }, app.coteOptions)
)
app.serviceSubscriber = new app.cote.Subscriber({
name: 'feathers services subscriber',
namespace: 'services',
key: 'services',
subscribesTo: ['application', 'service']
}, app.coteOptions)
debug('Services subscriber ready for app with uuid ' + app.uuid)
// When a remote service is declared create the local proxy interface to it
app.serviceSubscriber.on('service', serviceDescriptor => {
// Do not register our own services
if (serviceDescriptor.uuid === app.uuid) return
if (serviceDescriptor.uuid === app.uuid) {
debug('Ignoring local service registration on path ' + serviceDescriptor.path)
return
}
// Skip already registered services
const service = app.service(serviceDescriptor.path)
if (service) {
Expand Down Expand Up @@ -117,6 +89,41 @@ export default function init (options) {
// dispatch an event internally through node so that async processes can run
app.emit('service', serviceDescriptor)
})
// This publisher publishes an event each time a local app or service is registered
app.servicePublisher = new app.cote.Publisher({
name: 'feathers services publisher',
namespace: 'services',
key: 'services',
broadcasts: ['application', 'service']
}, app.coteOptions)
debug('Services publisher ready for app with uuid ' + app.uuid)
// Also each time a new app pops up so that it does not depend of the initialization order of the apps
app.serviceSubscriber.on('application', applicationDescriptor => {
// Not required for our own app
if (applicationDescriptor.uuid === app.uuid) {
debug('Ignoring local services republication for app ' + app.uuid)
return
}
debug('Republishing local services of app ' + app.uuid + ' for remote app ' + applicationDescriptor.uuid)
Object.getOwnPropertyNames(app.services).forEach(path => {
const service = app.services[path]
if (service.remote) return
const serviceDescriptor = { uuid: app.uuid, path }
// Skip internal services
if (isInternalService(serviceDescriptor)) {
debug('Ignoring local service republication on path ' + serviceDescriptor.path)
return
}
app.servicePublisher.publish('service', serviceDescriptor)
debug('Republished local service on path ' + path)
})
})
// Tell others apps I'm here
// Add a timeout so that the publisher/subscriber has been initialized on the node
setTimeout(_ => {
app.servicePublisher.publish('application', { uuid: app.uuid })
debug('Published local app with uuid ' + app.uuid)
}, distributionOptions.publicationDelay)

// We replace the use method to inject service publisher/responder
const superUse = app.use
Expand Down
22 changes: 15 additions & 7 deletions src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@ class RemoteService {
// Create the request manager to remote ones for this service
this.requester = new app.cote.Requester({
name: path + ' requester',
namespace: path,
namespace: path,
key: path,
requests: ['find', 'get', 'create', 'update', 'patch', 'remove']
}, Object.assign({ log: false }, app.coteOptions))
}, app.coteOptions)
this.path = path
debug('Requester created for remote service on path ' + this.path)
// Create the subscriber to listen to events from other nodes
this.serviceEventsSubscriber = new app.cote.Subscriber({
name: path + ' events subscriber',
namespace: path,
namespace: path,
key: path,
subscribesTo: ['created', 'updated', 'patched', 'removed']
}, { log: false })
}, app.coteOptions)
this.serviceEventsSubscriber.on('created', object => {
debug('Dispatching created remote service event on path ' + path, object)
this.emit('created', object)
Expand Down Expand Up @@ -118,7 +120,12 @@ class LocalService extends cote.Responder {
constructor (options) {
const app = options.app
const path = options.path
super({ name: path + ' responder', namespace: path, respondsTo: ['find', 'get', 'create', 'update', 'patch', 'remove'] }, { log: false })
super({
name: path + ' responder',
namespace: path,
key: path,
respondsTo: ['find', 'get', 'create', 'update', 'patch', 'remove']
}, app.coteOptions)
debug('Responder created for local service on path ' + path)
const service = app.service(path)

Expand Down Expand Up @@ -163,9 +170,10 @@ class LocalService extends cote.Responder {
// Dispatch events to other nodes
this.serviceEventsPublisher = new app.cote.Publisher({
name: path + ' events publisher',
namespace: path,
namespace: path,
key: path,
broadcasts: ['created', 'updated', 'patched', 'removed']
}, Object.assign({ log: false }, app.coteOptions))
}, app.coteOptions)
service.on('created', object => {
debug('Publishing created local service event on path ' + path, object)
this.serviceEventsPublisher.publish('created', object)
Expand Down
3 changes: 2 additions & 1 deletion test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ describe('feathers-distributed', () => {
middlewares: { after: express.errorHandler() },
// Distribute only the users service
services: (service) => service.path.endsWith('users'),
publicationDelay: 5000,
cote: { // Use cote defaults
helloInterval: 2000,
checkInterval: 4000,
Expand Down Expand Up @@ -262,7 +263,7 @@ describe('feathers-distributed', () => {
it('ensure local service hooks have been called with the remote service flag', () => {
expect(hookFromRemote).beTrue()
})

it('ensure middleware can been called on local service', async () => {
const url = 'http://localhost:' + (8080 + gateway) + '/users'
await request.get(url)
Expand Down

0 comments on commit 532ce74

Please sign in to comment.