From 8cba48df75ffdf27acb9e62124078db11958fb7d Mon Sep 17 00:00:00 2001 From: Nelson Silva Date: Wed, 11 Dec 2013 15:48:04 +0000 Subject: [PATCH] Refactored to use Streams instead of Events --- lib/src/ajax.dart | 23 +++++------ lib/src/client.dart | 34 ++++++++-------- lib/src/events.dart | 60 ++++------------------------- lib/src/info.dart | 35 ++++++++--------- lib/src/transport/receiver-xhr.dart | 12 +++--- lib/src/transport/receiver.dart | 10 ++--- 6 files changed, 58 insertions(+), 116 deletions(-) diff --git a/lib/src/ajax.dart b/lib/src/ajax.dart index fbdab3c..4ca9843 100644 --- a/lib/src/ajax.dart +++ b/lib/src/ajax.dart @@ -1,25 +1,22 @@ part of sockjs_client; -class XHREvents extends event.Events { - event.ListenerList get chunk => this["chunk"]; - event.ListenerList get finish => this["finish"]; - event.ListenerList get timeout => this["timeout"]; -} - -class StatusEvent { +class StatusEvent extends event.Event { int status; String text; - StatusEvent([this.status = 0, this.text = ""]); + StatusEvent(String type, [this.status = 0, this.text = ""]) : super(type); } typedef AbstractXHRObject AjaxObjectFactory(String method, String baseUrl, [payload]); -class AbstractXHRObject implements event.Emitter { - XHREvents on = new XHREvents(); +class AbstractXHRObject extends Object with event.Emitter { HttpRequest xhr; StreamSubscription changeSubscription; + Stream get onChunk => this["chunk"]; + Stream get onFinish => this["finish"]; + Stream get onTimeout => this["timeout"]; + _start(method, url, payload, {noCredentials: false, headers}) { try { @@ -45,7 +42,7 @@ class AbstractXHRObject implements event.Emitter { xhr.open(method, url); } catch(e) { // IE raises an exception on wrong port. - on.finish.dispatch(new StatusEvent()); + dispatch(new StatusEvent("finish")); _cleanup(); return; }; @@ -76,11 +73,11 @@ class AbstractXHRObject implements event.Emitter { } catch (x) {}; // IE does return readystate == 3 for 404 answers. if (text != null && !text.isEmpty) { - on.chunk.dispatch(new StatusEvent(status, text)); + dispatch(new StatusEvent("chunk", status, text)); } break; case 4: - on.finish.dispatch(new StatusEvent( xhr.status, xhr.responseText)); + dispatch(new StatusEvent("finish", xhr.status, xhr.responseText)); _cleanup(false); break; } diff --git a/lib/src/client.dart b/lib/src/client.dart index 78f02e1..b6d7ff7 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -1,29 +1,20 @@ part of sockjs_client; -class CloseEvent { +class CloseEvent extends event.Event { int code; String reason; bool wasClean; var lastEvent; - CloseEvent({this.code, this.reason, this.wasClean, this.lastEvent}); + CloseEvent({this.code, this.reason, this.wasClean, this.lastEvent}) : super("close"); } -class MessageEvent { +class MessageEvent extends event.Event { var data; - MessageEvent(this.data); + MessageEvent(this.data) : super("message"); } -class SockJSEvents extends event.Events { - get open => this["open"]; - get message => this["message"]; - get close => this["close"]; - get heartbeat => this["heartbeat"]; -} - -class Client implements event.Emitter { - - SockJSEvents on = new SockJSEvents(); +class Client extends Object with event.Emitter { bool debug; bool devel; @@ -56,7 +47,7 @@ class Client implements event.Emitter { } _ir = new InfoReceiver.forURL(_baseUrl); - _ir.on.finish.add((InfoReceiverEvent evt) { + _ir.onFinish.listen((InfoReceiverEvent evt) { _ir = null; if (evt.info != null) { _applyInfo(evt.info); @@ -67,6 +58,11 @@ class Client implements event.Emitter { }); } + Stream get onOpen => this["open"]; + Stream get onMessage => this["message"]; + Stream get onClose => this["close"]; + Stream get onHeartbeat => this["heartbeat"]; + send(data) { if (readyState == CONNECTING) { throw 'INVALID_STATE_ERR'; @@ -110,7 +106,7 @@ class Client implements event.Emitter { } readyState = CLOSED; - Timer.run(() => on.close.dispatch(close_event)); + Timer.run(() => dispatch(close_event)); } _dispatchOpen() { @@ -120,7 +116,7 @@ class Client implements event.Emitter { _transportTref = null; } readyState = OPEN; - on.open.dispatch(); + dispatch("open"); } else { // The server might have been restarted, and lost track of our // connection. @@ -132,14 +128,14 @@ class Client implements event.Emitter { if (readyState != OPEN) { return; } - on.message.dispatch(new MessageEvent(data)); + dispatch(new MessageEvent(data)); } _dispatchHeartbeat() { if (readyState != OPEN) { return; } - on.heartbeat.dispatch(); + dispatch("heartbeat"); } _didMessage(String data) { diff --git a/lib/src/events.dart b/lib/src/events.dart index d05f4fd..0fb0619 100644 --- a/lib/src/events.dart +++ b/lib/src/events.dart @@ -1,64 +1,20 @@ library events; -typedef void Listener(event); +import "dart:async"; class Event { String type; Event(this.type); } -class Events { +class Emitter { - Map _listeners; + final _evtController = new StreamController.broadcast(); - Events() : _listeners = {}; + Stream operator[] (type) => _evtController.stream.where((e) => e.type == type); - ListenerList operator [](String type) => _listeners.putIfAbsent(type, () { - return new ListenerList(type); - }); - - removeAllListeners() => _listeners = {}; -} - -abstract class Emitter{ - - E on; - -} - -class ListenerList { - - final String _type; - - final List _listeners; - - ListenerList(this._type) : _listeners = []; - - ListenerList add(Listener listener) { - _add(listener); - return this; + dispatch(evtOrType) { + var evt = (evtOrType is String) ? new Event(evtOrType) : evtOrType; + _evtController.add(evt); } - - ListenerList remove(Listener listener) { - _remove(listener); - return this; - } - - bool dispatch([evt]) { - //assert(evt.type == _type); - _listeners.forEach((l) => l(evt)); - } - - void _add(Listener listener) { - _listeners.add(listener); - } - - void _remove(Listener listener) { - _listeners.removeRange(_listeners.indexOf(listener), 1); - } - - int get length => _listeners.length; - - bool get isEmpty => _listeners.isEmpty; -} - +} \ No newline at end of file diff --git a/lib/src/info.dart b/lib/src/info.dart index 3b96a5f..389d711 100644 --- a/lib/src/info.dart +++ b/lib/src/info.dart @@ -16,20 +16,17 @@ class Info { } } -class InfoReceiverEvent { +class InfoReceiverEvent extends event.Event { Info info; num rtt; - InfoReceiverEvent([this.info = null, this.rtt]); + InfoReceiverEvent(String type, [this.info = null, this.rtt]) : super(type); } -class InfoReceiverEvents extends event.Events { - get finish => this["finish"]; -} +abstract class InfoReceiver extends Object with event.Emitter { -abstract class InfoReceiver implements event.Emitter { - InfoReceiverEvents on; + InfoReceiver._(); - InfoReceiver() : on = new InfoReceiverEvents(); + Stream get onFinish => this["finish"]; factory InfoReceiver.forURL(String baseUrl) { if (utils.isSameOriginUrl(baseUrl)) { @@ -55,7 +52,7 @@ abstract class InfoReceiver implements event.Emitter { class AjaxInfoReceiver extends InfoReceiver { - AjaxInfoReceiver(String baseUrl, AjaxObjectFactory xhrFactory) { + AjaxInfoReceiver(String baseUrl, AjaxObjectFactory xhrFactory) : super._() { Timer.run(() => doXhr(baseUrl, xhrFactory)); } @@ -63,22 +60,22 @@ class AjaxInfoReceiver extends InfoReceiver { var t0 = new DateTime.now().millisecondsSinceEpoch; var xo = xhrFactory('GET', "$baseUrl/info"); - var tref = new Timer(new Duration(milliseconds:8000), xo.on.timeout.dispatch); + var tref = new Timer(new Duration(milliseconds:8000), () => dispatch("timeout")); - xo.on.finish.add( (StatusEvent evt) { + xo.onFinish.listen( (StatusEvent evt) { tref.cancel(); tref = null; if (evt.status == 200) { var rtt = new DateTime.now().millisecondsSinceEpoch - t0; var info = new Info.fromJSON(JSON.decode(evt.text)); - on.finish.dispatch(new InfoReceiverEvent(info, rtt)); + dispatch(new InfoReceiverEvent("finish", info, rtt)); } else { - on.finish.dispatch(new InfoReceiverEvent()); + dispatch(new InfoReceiverEvent("finish")); } }); - xo.on.timeout.add( (_) { + xo.onTimeout.listen( (_) { xo.close(); - on.finish.dispatch(); + dispatch(new InfoReceiverEvent("finish")); }); } } @@ -86,7 +83,7 @@ class AjaxInfoReceiver extends InfoReceiver { class InfoReceiverIframe extends InfoReceiver { - InfoReceiverIframe (base_url) { + InfoReceiverIframe(base_url) : super._() { if(document.body == null) { document.onLoad.listen((_) => go()); } else { @@ -123,11 +120,11 @@ class InfoReceiverIframe extends InfoReceiver { class InfoReceiverFake extends InfoReceiver { - InfoReceiverFake() { + InfoReceiverFake() : super._() { // It may not be possible to do cross domain AJAX to get the info // data, for example for IE7. But we want to run JSONP, so let's // fake the response, with rtt=2s (rto=6s). - new Timer(new Duration(milliseconds:2000), on.finish.dispatch); + new Timer(new Duration(milliseconds:2000), () => dispatch("finish")); } } @@ -136,7 +133,7 @@ class InfoReceiverFake extends InfoReceiver { class WInfoReceiverIframe { WInfoReceiverIframe(ri, _trans_url, baseUrl) { var ir = new AjaxInfoReceiver(baseUrl, XHRLocalObjectFactory); - ir.on.finish.add( (evt) { + ir.onFinish.listen( (evt) { ri._didMessage('m${JSON.encode([evt.info, evt.rtt])}'); ri._didClose(); }); diff --git a/lib/src/transport/receiver-xhr.dart b/lib/src/transport/receiver-xhr.dart index 6c29867..812b050 100644 --- a/lib/src/transport/receiver-xhr.dart +++ b/lib/src/transport/receiver-xhr.dart @@ -8,7 +8,7 @@ part of sockjs_client; var buf_pos = 0; xo = xhrFactory('POST', url); - xo.on.chunk.add((e){ + xo.onChunk.listen((e){ if (e.status != 200) return; while (true) { var buf = e.text.substring(buf_pos); @@ -16,21 +16,21 @@ part of sockjs_client; if (p == -1) break; buf_pos += p+1; var msg = buf.substring(0, p); - on.message.dispatch(new MessageEvent(msg)); + dispatch(new MessageEvent(msg)); } }); - xo.on.finish.add((e) { - xo.on.chunk.dispatch(new StatusEvent(e.status, e.text)); + xo.onFinish.listen((e) { + dispatch(new StatusEvent("chunk", e.status, e.text)); xo = null; var reason = (e.status == 200) ? 'network' : 'permanent'; - on.close.dispatch(new CloseEvent(reason: reason)); + dispatch(new CloseEvent(reason: reason)); }); } abort() { if (xo != null) { xo.close(); - on.close.dispatch(new CloseEvent(reason: 'user')); + dispatch(new CloseEvent(reason: 'user')); xo = null; } } diff --git a/lib/src/transport/receiver.dart b/lib/src/transport/receiver.dart index b121e3c..e507db0 100644 --- a/lib/src/transport/receiver.dart +++ b/lib/src/transport/receiver.dart @@ -1,12 +1,8 @@ part of sockjs_client; -class ReceiverEvents extends event.Events { - event.ListenerList get message => this["message"]; - event.ListenerList get close => this["close"]; -} - -class Receiver implements event.Emitter { - ReceiverEvents on = new ReceiverEvents(); +class Receiver extends Object with event.Emitter { + Stream get message => this["message"]; + Stream get close => this["close"]; } typedef Receiver ReceiverFactory(String recvUrl, AjaxObjectFactory xhrFactory); \ No newline at end of file