diff --git a/examples/simple.js b/examples/simple.js index 90a1de2..0f29edd 100644 --- a/examples/simple.js +++ b/examples/simple.js @@ -1,29 +1,57 @@ -import Whisper from "../src"; +import Whisper from '../src'; const app = new Whisper(); -app.listen(3456); +app.listen(3456, () => console.log(`app started at 3456`)); -app.use(async (ctx, next) => { - const buf = ctx.data; +const logMiddleware = async (ctx, next) => { + // step 1: log start + const startedAt = Date.now(); - // handle request - ctx.start = buf.toString("utf8", 0, 2); + await next(); + + // step 4: log end + const endAt = Date.now(); + console.log(ctx.data); + console.log(`req: ${ctx.req}`); + console.log(`res: ${ctx.res}`); + console.log(`session ${ctx.session.id}: seq ${ctx.no} success ${endAt - startedAt} ms`); +}; + +const dataMiddleware = async (ctx, next) => { + // step 2: handle request + ctx.req = ctx.data.toString('utf8', 0, 2); await next(); - // send data back to client - // body could be string or buf - // body also can be a stream, like file stream - ctx.body = "haha"; + // step 3: send data + ctx.res = `haha ${ctx.req}`; + ctx.send(ctx.res); +}; + +app.use(logMiddleware); +app.use(dataMiddleware); + +app.on('close', session => { + console.log(`session ${session.id}: closed`); }); -app.use(async (ctx, next) => { - await next(); - console.log(ctx.start); +app.on('timeout', session => { + console.log(`session ${session.id}: timeout`); +}); + +app.on('end', session => { + console.log(`session ${session.id}: end`); +}); + +app.on('error', session => { + console.log(`session ${session.id}: error`); }); /** * test data */ +// request // 23 23 02 FE 45 52 52 30 38 30 33 30 30 30 +// response +// haha ## diff --git a/package.json b/package.json index 46bee45..0b62dfd 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,7 @@ "prepublishOnly": "npm run build", "release": "standard-version && git push --follow-tags origin master", "test:cov": "jest --coverage && npm run lint", - "test": "npm run lint && jest" + "test": "npm run lint && jest --silent" }, "commitlint": { "extends": [ diff --git a/src/context.js b/src/context.js index 355cb92..820fc02 100644 --- a/src/context.js +++ b/src/context.js @@ -1,64 +1,17 @@ -import util from "util"; - -import delegate from "delegates"; - -import Session from "./session"; +import delegate from 'delegates'; /** * Context class. */ export default class Context { - app; + no; data; session; - socket; - - constructor(session, data) { - if (!session instanceof Session) - throw new TypeError("whisper context should attached to a session"); - if (!data) throw new Error("whisper context should have request data"); - - this.seq = session.genSeq(); - this.data = data; - this.session = session; - this.app = session.app; - this.socket = session.socket; - } - - /** - * Default error handling. - * - * @param {Error} err - * @api private - */ - - onerror(err) { - // don't do anything if there is no error. - // this allows you to pass `this.onerror` - // to node-style callbacks. - if (null === err) return; - if (!(err instanceof Error)) err = new Error(util.format("non-error thrown: %j", err)); - - // delegate - this.app.emit("error", err, this); - } } -/** - * Socket delegation. - */ - -delegate(Context.prototype, "socket") - .getter("bufferSize") - .getter("bytesRead") - .getter("bytesWritten") - .getter("localAddress") - .getter("localPort") - .getter("remoteAddress") - .getter("remotePort") - .method("address") - .method("pause") - .method("resume") - .method("setTimeout") - .method("write"); +delegate(Context.prototype, 'session') + .access('app') + .access('socket') + .method('send') + .method('onerror'); diff --git a/src/index.js b/src/index.js index d4b1e41..4ae2974 100644 --- a/src/index.js +++ b/src/index.js @@ -1,16 +1,15 @@ -import Emitter from "events"; -import net from "net"; -import util from "util"; -import Stream from "stream"; +import Emitter from 'events'; +import net from 'net'; +import util from 'util'; -import compose from "koa-compose"; -import Debugger from "debug"; -import only from "only"; +import compose from 'koa-compose'; +import Debugger from 'debug'; +import only from 'only'; -import Session from "./session"; -import Context from "./context"; +import Session from './session'; +import Context from './context'; -const debug = new Debugger("whisper"); +const debug = new Debugger('whisper'); /** * Expose `Application` class. @@ -28,7 +27,8 @@ export default class Application extends Emitter { super(); this.middleware = []; - this.env = process.env.NODE_ENV || "development"; + this.sessions = []; + this.env = process.env.NODE_ENV || 'development'; if (util.inspect.custom) { this[util.inspect.custom] = this.inspect; @@ -46,7 +46,7 @@ export default class Application extends Emitter { */ listen(...args) { - debug("listen"); + debug('listen'); const server = net.createServer(this.callback()); return server.listen(...args); } @@ -60,7 +60,7 @@ export default class Application extends Emitter { */ toJSON() { - return only(this, ["env"]); + return only(this, ['env']); } /** @@ -85,8 +85,8 @@ export default class Application extends Emitter { */ use(fn) { - if (typeof fn !== "function") throw new TypeError("middleware must be a function!"); - debug("use %s", fn._name || fn.name || "-"); + if (typeof fn !== 'function') throw new TypeError('middleware must be a function!'); + debug('use %s', fn._name || fn.name || '-'); this.middleware.push(fn); return this; } @@ -102,35 +102,58 @@ export default class Application extends Emitter { callback() { const fn = compose(this.middleware); - if (!this.listenerCount("error")) this.on("error", this.onerror); + if (!this.listenerCount('error')) this.on('error', this.onerror); const handleConnect = socket => { // create session while new connection - const session = new Session(this, socket); - socket.on("data", data => { - const ctx = new Context(session, data); + const session = this.createSession(socket); + + // hanlders + const handleEnd = () => session.end(); + const handleClose = () => session.close(); + const handleError = err => session.onerror(err); + const handleTimeout = () => session.timeout(); + const handleData = data => { + const ctx = this.createContext(session, data); this.handleRequest(ctx, fn); - }); + }; + + socket.on('close', handleClose); + socket.on('data', handleData); + socket.on('end', handleEnd); + socket.on('error', handleError); + socket.on('timeout', handleTimeout); }; return handleConnect; } + /** + * broad cast data to all client or clients match filter + * filt is a function + * filt = session => { return true; } + * + * @param {*} data string/buffer/json + */ + broadcast(data, filt = () => true) { + debug('broadcast'); + this.sessions.filter(filt).forEach(session => session.send(data)); + } + /** * Handle request in callback. * + * @param {Context} ctx + * @param {Function} fnMiddleware * @api private */ handleRequest(ctx, fnMiddleware) { - debug("data comming with seq %s", ctx.seq); + debug('data comming with seq no %s', ctx.no); debug(ctx.data); const onerror = err => ctx.onerror(err); - const handleResponse = () => respond(ctx); - return fnMiddleware(ctx) - .then(handleResponse) - .catch(onerror); + return fnMiddleware(ctx).catch(onerror); } /** @@ -141,44 +164,70 @@ export default class Application extends Emitter { */ onerror(err) { - if (!(err instanceof Error)) throw new TypeError(util.format("non-error thrown: %j", err)); + if (!(err instanceof Error)) throw new TypeError(util.format('non-error thrown: %j', err)); if (this.silent) return; const msg = err.stack || err.toString(); console.error(); - console.error(msg.replace(/^/gm, " ")); + console.error(msg.replace(/^/gm, ' ')); console.error(); } -} -/** - * Response helper. - */ + /** + * add session to list + * + * @param {*} session + * @api private + */ + add(session) { + const index = this.sessions.indexOf(session); + if (index === -1) this.sessions.push(session); + return session; + } -function respond(ctx) { - const { socket, dumb, body } = ctx; + /** + * remove session from list + * + * @param {*} session + * @api private + */ + remove(session) { + const index = this.sessions.indexOf(session); + if (index !== -1) this.sessions.splice(index, 1); + return session; + } - // allow bypassing whisper - if (dumb) return; + /** + * Initialize a new session. + * @param {Socket} socket + * + * @api private + */ - // socket close - if (!socket.writable) return; + createSession(socket) { + const session = new Session(); + session.app = this; + session.socket = socket; - // no body - if (!body) { - return debug("no body"); + return this.add(session); } - debug("respond body"); - debug(body); + /** + * Initialize a new context. + * @param {Socket} socket + * @param {Session} session + * @param {*} data + * + * @api private + */ - // responses - if (Buffer.isBuffer(body)) return socket.write(body); - if ("string" === typeof body) return socket.write(body); - if (body instanceof Stream) return body.pipe(socket); + createContext(session, data) { + const ctx = new Context(); + ctx.data = data; + ctx.session = session; + ctx.no = session.genSeq(); - // body: json - const str = JSON.stringify(body); - socket.write(str); + return ctx; + } } diff --git a/src/index.test.js b/src/index.test.js index 9fb626d..c4f95ee 100644 --- a/src/index.test.js +++ b/src/index.test.js @@ -1,11 +1,99 @@ -import Whisper from "./index"; +import net from 'net'; -describe("app", () => { - test("should set development env when NODE_ENV missing", () => { +import Whisper from './index'; + +describe('app', () => { + test('should listen on timeout', done => { + const app = new Whisper(); + const connect = app.callback(); + const s1 = new net.Socket(); + connect(s1); + app.on('timeout', session => { + expect(session).toBe(app.sessions[0]); + done(); + }); + + s1.emit('timeout'); + }); + + test('should listen on error after timeout', done => { + const app = new Whisper(); + const connect = app.callback(); + const s1 = new net.Socket(); + connect(s1); + + app.on('error', err => { + expect(err.message).toBe('session timeout'); + done(); + }); + + s1.emit('timeout'); + }); + + test('should listen on close after error', done => { + const app = new Whisper(); + const connect = app.callback(); + const s1 = new net.Socket(); + connect(s1); + s1.connect('baidu.com:80'); + + const s = app.sessions[0]; + app.on('close', session => { + expect(session).toBe(s); + expect(app.sessions).toHaveLength(0); + s1.destroy(); + done(); + }); + + s1.emit('error'); + }); + + test('should manage session list', () => { + const app = new Whisper(); + const connect = app.callback(); + const s1 = new net.Socket(); + const s2 = new net.Socket(); + connect(s1); + connect(s2); + + expect(app.sessions).toHaveLength(2); + s1.emit('close'); + expect(app.sessions).toHaveLength(1); + }); + + test('should have write session list', () => { + const app = new Whisper(); + const connect = app.callback(); + const s1 = new net.Socket(); + const s2 = new net.Socket(); + connect(s1); + connect(s2); + + expect(app.sessions).toHaveLength(2); + }); + + test('should set development env when NODE_ENV missing', () => { const NODE_ENV = process.env.NODE_ENV; - process.env.NODE_ENV = ""; + process.env.NODE_ENV = ''; const app = new Whisper(); process.env.NODE_ENV = NODE_ENV; - expect(app.env).toEqual("development"); + expect(app.env).toEqual('development'); + }); + + test('should broad cast to all clients', () => { + const app = new Whisper(); + const connect = app.callback(); + const s1 = new net.Socket(); + const s2 = new net.Socket(); + s1.write = jest.fn(); + s2.write = jest.fn(); + s1.writable = true; + s2.writable = true; + connect(s1); + connect(s2); + + app.broadcast('some data'); + expect(s1.write).toBeCalledWith('some data'); + expect(s2.write).toBeCalledWith('some data'); }); }); diff --git a/src/session.js b/src/session.js index 309b22c..f1d1525 100644 --- a/src/session.js +++ b/src/session.js @@ -1,25 +1,126 @@ -import shortid from "shortid"; +import Stream from 'stream'; +import util from 'util'; + +import Debugger from 'debug'; +import shortid from 'shortid'; + +const debug = new Debugger('whisper'); /** * Session class. */ export default class Session { + createdAt; + closedAt; id; app; socket; - constructor(app, socket) { - if (!app) throw new Error("whisper session should attached to an app"); - if (!socket) throw new Error("whisper session should attached to a socket"); - + constructor() { + this.createdAt = new Date(); this.id = shortid.generate(); - this.app = app; - this.socket = socket; this._lastSeq = 0; } + /** + * send data to client + * + * @param {*} ctx + * @api public + */ + send(body) { + const socket = this.socket; + + // no socket + if (!socket) return; + + // socket close + if (!socket.writable) return; + + // no body + if (!body) { + return debug('no body'); + } + + debug('bodyponse body'); + debug(body); + + // body: string, buffer, stream + if (Buffer.isBuffer(body)) return socket.write(body); + if ('string' === typeof body) return socket.write(body); + if (body instanceof Stream) return body.pipe(socket); + + // body: json + const str = JSON.stringify(body); + socket.write(str); + } + + /** + * generate seq no + * + * @api private + */ genSeq() { return this._lastSeq++; } + + /** + * timeout session + * + * @api private + */ + timeout() { + debug('session timeout'); + + // delegate + this.app.emit('timeout', this); + this.socket.destroy(new Error('session timeout')); + } + + /** + * end session + * + * @api private + */ + end() { + debug('session end'); + + // delegate + this.app.remove(this); + this.app.emit('end', this); + } + + /** + * close session + * + * @api private + */ + close() { + debug('session close'); + + // delegate + this.app.remove(this); + this.app.emit('close', this); + } + + /** + * Default error handling. + * + * @param {Error} err + * @api private + */ + + onerror(err) { + debug('session error'); + + // don't do anything if there is no error. + // this allows you to pass `this.onerror` + // to node-style callbacks. + if (null === err) return; + if (!(err instanceof Error)) err = new Error(util.format('non-error thrown: %j', err)); + + // delegate + this.app.emit('error', err, this); + } } diff --git a/src/session.test.js b/src/session.test.js index a752585..8bb4265 100644 --- a/src/session.test.js +++ b/src/session.test.js @@ -1,7 +1,7 @@ -import Session from "./session"; +import Session from './session'; -describe("session", () => { - test("should auto increase seq", () => { +describe('session', () => { + test('should auto increase seq', () => { const app = {}; const socket = {}; const s = new Session(app, socket); @@ -10,4 +10,12 @@ describe("session", () => { const seq1 = s.genSeq(); expect(seq1).toBe(1); }); + + test('should send to client', () => { + const session = new Session(); + session.socket = { writable: true, write: jest.fn() }; + + session.send('some data'); + expect(session.socket.write).toBeCalledWith('some data'); + }); });