Skip to content

Commit

Permalink
feat(network): refactor + creation of Network, Channel and Source
Browse files Browse the repository at this point in the history
affects: patois.api, @tao.js/socket.io, @tao.js/utils, @tao.js/core

- Network refactors the inniards of Kernel of the handler network so it can be reused
  - Network has "Control" methods for setting context which allows
    - sending a control message along when an AC is set
    - using a forwardAppCtx that will be called when a handler returns an AC
  - Network has `use` method to add middleware called when an AC is set to "handle" it
- Channel allows channeling ACs destined for the channel as when an AC is set from a source of ACs
- Source allows setting ACs on the network without them bouncing back to the source
  • Loading branch information
eudaimos committed Jul 24, 2019
1 parent 03e0131 commit ad392a0
Show file tree
Hide file tree
Showing 14 changed files with 789 additions and 321 deletions.
225 changes: 156 additions & 69 deletions examples/patois.api/src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,76 +116,159 @@ TAO.addAsyncHandler({ t: 'Space', a: 'Enter', o: 'Portal' }, (tao, data) => {
// outgoing: { t: 'Space', a: ['List', 'Enter', 'Fail'], o: 'Portal' },
// });

function initClientTAO(clientTAO, id) {
clientTAO.addInterceptHandler({}, (tao, data) => {
console.log(`clientTAO[${id}].handling:`, tao);
});
// taoKoa.bridge({
// inline: [
// { t: 'Space', a: 'Stored' },
// { t: 'Space', a: 'Enter' }
// ],
// });

clientTAO.addInlineHandler(
{ t: 'Space', a: 'Find', o: 'Portal' },
async (tao, { Find }) => {
try {
const data = await (!Find || !Find._id
? spaces.findSpaces()
: spaces.getSpace(Find._id));
return new AppCtx(
'Space',
Array.isArray(data) ? 'List' : 'Enter',
tao.o,
{
Space: data
}
);
} catch (apiErr) {
console.error('Failed to retrieve Space:', apiErr);
return new AppCtx('Space', 'Fail', tao.o, {
Fail: {
on: tao.a,
message: apiErr.message,
Find
}
});
}
// For client Interaction
TAO.addInlineHandler(
{ t: 'Space', a: 'Find', o: 'Portal' },
async (tao, { Find }) => {
try {
const data = await (!Find || !Find._id
? spaces.findSpaces()
: spaces.getSpace(Find._id));
// return new AppCtx(
// 'Space',
// Array.isArray(data) ? 'List' : 'Enter',
// tao.o,
// {
// Space: data
// }
// );
// return new AppCtx('Space', Array.isArray(data) ? 'Fetch' : 'Retrieve', tao.o, { Space: data });
return new AppCtx(
'Space',
Array.isArray(data) ? 'List' : 'Enter',
tao.o,
{ Space: data }
);
} catch (apiErr) {
console.error('Failed to retrieve Space:', apiErr);
return new AppCtx('Space', 'Fail', tao.o, {
Fail: {
on: tao.a,
message: apiErr.message,
Find
}
});
}
);
}
);

clientTAO.addInterceptHandler(
{ t: 'Space', a: 'Find', o: 'Portal' },
async (tao, data) => {
if (!data.Find || !data.Find._id) {
// don't check cache
return;
}
const Space = await redis.getItem(tao.t, data.Find._id);
if (!Space || !Space._id) {
// cache miss
console.log('CACHE MISS! on:', data.Find._id);
return;
}
// use the cache hit to go to the next AppCtx in the protocol chain
console.log('CACHE HIT on:', Space._id);
return new AppCtx('Space', 'Enter', 'Portal', Space);
TAO.addInterceptHandler(
{ t: 'Space', a: 'Find', o: 'Portal' },
async (tao, data) => {
if (!data.Find || !data.Find._id) {
// don't check cache
return;
}
);
const Space = await redis.getItem(tao.t, data.Find._id);
if (!Space || !Space._id) {
// cache miss
console.log('CACHE MISS! on:', data.Find._id);
return;
}
// use the cache hit to go to the next AppCtx in the protocol chain
console.log('CACHE HIT on:', Space._id);
// return new AppCtx('Space', 'Retrieve', 'Portal', Space);
return new AppCtx('Space', 'Enter', 'Portal', Space);
}
);

clientTAO.addInlineHandler({ t: 'Space', a: 'Update' }, saveSpaceHandler);
clientTAO.addInlineHandler({ t: 'Space', a: 'Add' }, saveSpaceHandler);
clientTAO.addInlineHandler({ t: 'Space', a: 'Stored' }, (tao, data) => {
return new AppCtx('Space', 'Enter', tao.o, { Space: data.Space });
});
TAO.addInlineHandler({ t: 'Space', a: 'Update' }, saveSpaceHandler);
TAO.addInlineHandler({ t: 'Space', a: 'Add' }, saveSpaceHandler);
TAO.addInlineHandler({ t: 'Space', a: 'Stored' }, (tao, data) => {
console.log('server::stored handler::data:', data);
return new AppCtx('Space', 'Enter', tao.o, { Space: data.Space });
});

const bridgeToGlobal = utils.inlineBridge(
clientTAO,
TAO,
{ t: 'Space', a: 'Stored' },
{ t: 'Space', a: 'Enter' }
);
const retrieveHandler = (tao, data) =>
new AppCtx('Space', 'Enter', tao.o, data);
const fetchHandler = (tao, data) => new AppCtx('Space', 'List', tao.o, data);

const bridgeToClient = utils.inlineBridge(TAO, clientTAO, {
t: 'Space',
a: 'Tracked',
o: 'Portal'
});
function initClientTAO(clientTAO, id) {
const clientHandler = (tao, data) => {
console.log(`clientTAO[${id}].handling:`, tao);
};
clientTAO.addInterceptHandler({}, clientHandler);

// clientTAO.addInlineHandler({ t: 'Space', a: 'Retrieve', o: 'Portal' },
// retrieveHandler
// );

// clientTAO.addInlineHandler({ t: 'Space', a: 'Fetch', o: 'Portal' },
// fetchHandler
// );

// clientTAO.addInlineHandler(
// { t: 'Space', a: 'Find', o: 'Portal' },
// async (tao, { Find }) => {
// try {
// const data = await (!Find || !Find._id
// ? spaces.findSpaces()
// : spaces.getSpace(Find._id));
// return new AppCtx(
// 'Space',
// Array.isArray(data) ? 'List' : 'Enter',
// tao.o,
// {
// Space: data
// }
// );
// } catch (apiErr) {
// console.error('Failed to retrieve Space:', apiErr);
// return new AppCtx('Space', 'Fail', tao.o, {
// Fail: {
// on: tao.a,
// message: apiErr.message,
// Find
// }
// });
// }
// }
// );

// clientTAO.addInterceptHandler(
// { t: 'Space', a: 'Find', o: 'Portal' },
// async (tao, data) => {
// if (!data.Find || !data.Find._id) {
// // don't check cache
// return;
// }
// const Space = await redis.getItem(tao.t, data.Find._id);
// if (!Space || !Space._id) {
// // cache miss
// console.log('CACHE MISS! on:', data.Find._id);
// return;
// }
// // use the cache hit to go to the next AppCtx in the protocol chain
// console.log('CACHE HIT on:', Space._id);
// return new AppCtx('Space', 'Enter', 'Portal', Space);
// }
// );

// clientTAO.addInlineHandler({ t: 'Space', a: 'Update' }, saveSpaceHandler);
// clientTAO.addInlineHandler({ t: 'Space', a: 'Add' }, saveSpaceHandler);
// clientTAO.addInlineHandler({ t: 'Space', a: 'Stored' }, (tao, data) => {
// return new AppCtx('Space', 'Enter', tao.o, { Space: data.Space });
// });

// const bridgeToGlobal = utils.inlineBridge(
// clientTAO,
// TAO,
// { t: 'Space', a: 'Stored' },
// { t: 'Space', a: 'Enter' }
// );

// const bridgeToClient = utils.inlineBridge(TAO, clientTAO, {
// t: 'Space',
// a: 'Tracked',
// o: 'Portal'
// });

// const forwardSpaceTracked = (tao, data) => {
// clientTAO.setCtx(tao, data);
Expand All @@ -199,12 +282,16 @@ function initClientTAO(clientTAO, id) {

return () => {
console.log('disconnected client - removing TAO handler');
bridgeToClient();
bridgeToGlobal();
// TAO.removeInlineHandler(
// { t: 'Space', a: 'Tracked', o: 'Portal' },
// forwardSpaceTracked
// );
// bridgeToClient();
// bridgeToGlobal();
// // TAO.removeInlineHandler(
// // { t: 'Space', a: 'Tracked', o: 'Portal' },
// // forwardSpaceTracked
// // );
clientTAO.removeInterceptHandler({}, clientHandler);
// clientTAO.removeInlineHandler({ t: 'Space', a: 'Retrieve', o: 'Portal' }, retrieveHandler);
// clientTAO.removeInlineHandler({ t: 'Space', a: 'Fetch', o: 'Portal' }, fetchHandler);
clientTAO = null;
};
}

Expand Down
3 changes: 2 additions & 1 deletion packages/tao-socket-io/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"@tao.js/core": "^0.6.2"
},
"devDependencies": {
"@tao.js/core": "file:../tao"
"@tao.js/core": "file:../tao",
"@tao.js/utils": "file:../tao-utils"
}
}
67 changes: 44 additions & 23 deletions packages/tao-socket-io/src/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Kernel } from '@tao.js/core';
// import { Kernel } from '@tao.js/core';
import { Channel, Source } from '@tao.js/utils';

const DEFAULT_NAMESPACE = 'tao';
const SOURCE_PROP = '__$source$__';
Expand All @@ -9,43 +10,59 @@ const ON_EVENT = IS_SERVER ? 1 : 0;
const EVENTS = ['fromServer', 'fromClient'];

const socketHandler = socket => (tao, data) => {
if (!data[tao.o] || data[tao.o][SOURCE_PROP] !== SOCKET_SOURCE) {
// console.log(`sending to socket[${socket.id}]:`, tao);
if (IS_SERVER) {
socket.emit(EVENTS[EMIT_EVENT], { tao, data });
} else {
if (!data[tao.o] || data[tao.o][SOURCE_PROP] !== SOCKET_SOURCE) {
// console.log(`sending to socket[${socket.id}]:`, tao);
socket.emit(EVENTS[EMIT_EVENT], { tao, data });
}
}
};

function decorateNetwork(TAO, socket) {
const fromHandler = handler =>
socket.on(EVENTS[ON_EVENT], ({ tao, data }) => handler(tao, data));
const toEmit = (tao, data) => socket.emit(EVENTS[EMIT_EVENT], { tao, data });
const source = new Source(TAO, fromHandler, toEmit);
return source;
}

function decorateSocket(TAO, socket, forwardACs) {
console.log('TAO is instanceof Channel:', TAO instanceof Channel);
socket.on(EVENTS[ON_EVENT], ({ tao, data }) => {
// console.log(`socket[${socket.id}] received event:`, tao);
if (!forwardACs) {
TAO.setCtx(tao, data);
return;
}
const datum = Object.assign({}, data);
if (!datum[tao.o]) {
datum[tao.o] = {};
}
datum[tao.o][SOURCE_PROP] = SOCKET_SOURCE;
TAO.setCtx(tao, datum);
// if (!forwardACs) {
// if (IS_SERVER) {
TAO.setCtx(tao, data);
// return;
// }
// const datum = Object.assign({}, data);
// if (!datum[tao.o]) {
// datum[tao.o] = {};
// }
// datum[tao.o][SOURCE_PROP] = SOCKET_SOURCE;
// TAO.setCtx(tao, datum);
});

if (IS_SERVER) {
if (forwardACs) {
const handler = socketHandler(socket);
TAO.addInlineHandler({}, handler);
socket.on('disconnect', () => {
TAO.removeInlineHandler({}, handler);
});
}
// if (forwardACs) {
const handler = socketHandler(socket);
TAO.addInlineHandler({}, handler);
socket.on('disconnect', () => {
TAO.removeInlineHandler({}, handler);
});
// }
} else {
TAO.addAsyncHandler({}, socketHandler(socket));
}
}

const ioMiddleware = (TAO, onConnect) => (socket, next) => {
if (onConnect && typeof onConnect === 'function') {
let clientTAO = new Kernel();
// let clientTAO = new Kernel();
// let clientTAO = TAO.clone();
let clientTAO = new Channel(TAO, socket.id);
decorateSocket(clientTAO, socket, true);
let onDisconnect = onConnect(clientTAO, socket.id);
socket.on('disconnect', reason => {
Expand All @@ -55,8 +72,9 @@ const ioMiddleware = (TAO, onConnect) => (socket, next) => {
clientTAO = null;
onDisconnect = null;
});
} else {
decorateSocket(TAO, socket);
}
decorateSocket(TAO, socket);

if (next && typeof next === 'function') {
return next();
Expand All @@ -69,7 +87,10 @@ export default function wireTaoJsToSocketIO(TAO, io, opts = {}) {
if (io && typeof io === 'function') {
const host = opts.host || '';
const socket = io(`${host}/${ns}`);
decorateSocket(TAO, socket, true);
// decorateSocket(TAO, socket, true);
// let source = new Source(TAO);
// decorateSocket(source, socket, true);
const source = decorateNetwork(TAO, socket);
return socket;
}
} else {
Expand Down
Loading

0 comments on commit ad392a0

Please sign in to comment.