Skip to content

Commit

Permalink
Refactored to use Streams instead of Events
Browse files Browse the repository at this point in the history
  • Loading branch information
Nelson Silva committed Dec 11, 2013
1 parent 5aa5b92 commit 8cba48d
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 116 deletions.
23 changes: 10 additions & 13 deletions lib/src/ajax.dart
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
part of sockjs_client;

class XHREvents extends event.Events {
event.ListenerList get chunk => this["chunk"];
event.ListenerList get finish => this["finish"];
event.ListenerList get timeout => this["timeout"];
}

class StatusEvent {
class StatusEvent extends event.Event {
int status;
String text;
StatusEvent([this.status = 0, this.text = ""]);
StatusEvent(String type, [this.status = 0, this.text = ""]) : super(type);
}

typedef AbstractXHRObject AjaxObjectFactory(String method, String baseUrl, [payload]);

class AbstractXHRObject implements event.Emitter<XHREvents> {
XHREvents on = new XHREvents();
class AbstractXHRObject extends Object with event.Emitter {

HttpRequest xhr;
StreamSubscription changeSubscription;

Stream get onChunk => this["chunk"];
Stream get onFinish => this["finish"];
Stream get onTimeout => this["timeout"];

_start(method, url, payload, {noCredentials: false, headers}) {

try {
Expand All @@ -45,7 +42,7 @@ class AbstractXHRObject implements event.Emitter<XHREvents> {
xhr.open(method, url);
} catch(e) {
// IE raises an exception on wrong port.
on.finish.dispatch(new StatusEvent());
dispatch(new StatusEvent("finish"));
_cleanup();
return;
};
Expand Down Expand Up @@ -76,11 +73,11 @@ class AbstractXHRObject implements event.Emitter<XHREvents> {
} catch (x) {};
// IE does return readystate == 3 for 404 answers.
if (text != null && !text.isEmpty) {
on.chunk.dispatch(new StatusEvent(status, text));
dispatch(new StatusEvent("chunk", status, text));
}
break;
case 4:
on.finish.dispatch(new StatusEvent( xhr.status, xhr.responseText));
dispatch(new StatusEvent("finish", xhr.status, xhr.responseText));
_cleanup(false);
break;
}
Expand Down
34 changes: 15 additions & 19 deletions lib/src/client.dart
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
part of sockjs_client;

class CloseEvent {
class CloseEvent extends event.Event {
int code;
String reason;
bool wasClean;
var lastEvent;

CloseEvent({this.code, this.reason, this.wasClean, this.lastEvent});
CloseEvent({this.code, this.reason, this.wasClean, this.lastEvent}) : super("close");
}

class MessageEvent {
class MessageEvent extends event.Event {
var data;
MessageEvent(this.data);
MessageEvent(this.data) : super("message");
}

class SockJSEvents extends event.Events {
get open => this["open"];
get message => this["message"];
get close => this["close"];
get heartbeat => this["heartbeat"];
}

class Client implements event.Emitter<SockJSEvents> {

SockJSEvents on = new SockJSEvents();
class Client extends Object with event.Emitter {

bool debug;
bool devel;
Expand Down Expand Up @@ -56,7 +47,7 @@ class Client implements event.Emitter<SockJSEvents> {
}

_ir = new InfoReceiver.forURL(_baseUrl);
_ir.on.finish.add((InfoReceiverEvent evt) {
_ir.onFinish.listen((InfoReceiverEvent evt) {
_ir = null;
if (evt.info != null) {
_applyInfo(evt.info);
Expand All @@ -67,6 +58,11 @@ class Client implements event.Emitter<SockJSEvents> {
});
}

Stream get onOpen => this["open"];
Stream get onMessage => this["message"];
Stream get onClose => this["close"];
Stream get onHeartbeat => this["heartbeat"];

send(data) {
if (readyState == CONNECTING) {
throw 'INVALID_STATE_ERR';
Expand Down Expand Up @@ -110,7 +106,7 @@ class Client implements event.Emitter<SockJSEvents> {
}
readyState = CLOSED;

Timer.run(() => on.close.dispatch(close_event));
Timer.run(() => dispatch(close_event));
}

_dispatchOpen() {
Expand All @@ -120,7 +116,7 @@ class Client implements event.Emitter<SockJSEvents> {
_transportTref = null;
}
readyState = OPEN;
on.open.dispatch();
dispatch("open");
} else {
// The server might have been restarted, and lost track of our
// connection.
Expand All @@ -132,14 +128,14 @@ class Client implements event.Emitter<SockJSEvents> {
if (readyState != OPEN) {
return;
}
on.message.dispatch(new MessageEvent(data));
dispatch(new MessageEvent(data));
}

_dispatchHeartbeat() {
if (readyState != OPEN) {
return;
}
on.heartbeat.dispatch();
dispatch("heartbeat");
}

_didMessage(String data) {
Expand Down
60 changes: 8 additions & 52 deletions lib/src/events.dart
Original file line number Diff line number Diff line change
@@ -1,64 +1,20 @@
library events;

typedef void Listener(event);
import "dart:async";

class Event {
String type;
Event(this.type);
}

class Events {
class Emitter {

Map<String, ListenerList> _listeners;
final _evtController = new StreamController<Event>.broadcast();

Events() : _listeners = <String, ListenerList>{};
Stream<Event> operator[] (type) => _evtController.stream.where((e) => e.type == type);

ListenerList operator [](String type) => _listeners.putIfAbsent(type, () {
return new ListenerList(type);
});

removeAllListeners() => _listeners = {};
}

abstract class Emitter<E extends Events>{

E on;

}

class ListenerList {

final String _type;

final List<Listener> _listeners;

ListenerList(this._type) : _listeners = <Listener>[];

ListenerList add(Listener listener) {
_add(listener);
return this;
dispatch(evtOrType) {
var evt = (evtOrType is String) ? new Event(evtOrType) : evtOrType;
_evtController.add(evt);
}

ListenerList remove(Listener listener) {
_remove(listener);
return this;
}

bool dispatch([evt]) {
//assert(evt.type == _type);
_listeners.forEach((l) => l(evt));
}

void _add(Listener listener) {
_listeners.add(listener);
}

void _remove(Listener listener) {
_listeners.removeRange(_listeners.indexOf(listener), 1);
}

int get length => _listeners.length;

bool get isEmpty => _listeners.isEmpty;
}

}
35 changes: 16 additions & 19 deletions lib/src/info.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,17 @@ class Info {
}
}

class InfoReceiverEvent {
class InfoReceiverEvent extends event.Event {
Info info;
num rtt;
InfoReceiverEvent([this.info = null, this.rtt]);
InfoReceiverEvent(String type, [this.info = null, this.rtt]) : super(type);
}

class InfoReceiverEvents extends event.Events {
get finish => this["finish"];
}
abstract class InfoReceiver extends Object with event.Emitter {

abstract class InfoReceiver implements event.Emitter<InfoReceiverEvents> {
InfoReceiverEvents on;
InfoReceiver._();

InfoReceiver() : on = new InfoReceiverEvents();
Stream<event.Event> get onFinish => this["finish"];

factory InfoReceiver.forURL(String baseUrl) {
if (utils.isSameOriginUrl(baseUrl)) {
Expand All @@ -55,38 +52,38 @@ abstract class InfoReceiver implements event.Emitter<InfoReceiverEvents> {

class AjaxInfoReceiver extends InfoReceiver {

AjaxInfoReceiver(String baseUrl, AjaxObjectFactory xhrFactory) {
AjaxInfoReceiver(String baseUrl, AjaxObjectFactory xhrFactory) : super._() {
Timer.run(() => doXhr(baseUrl, xhrFactory));
}

doXhr(String baseUrl, AjaxObjectFactory xhrFactory) {
var t0 = new DateTime.now().millisecondsSinceEpoch;
var xo = xhrFactory('GET', "$baseUrl/info");

var tref = new Timer(new Duration(milliseconds:8000), xo.on.timeout.dispatch);
var tref = new Timer(new Duration(milliseconds:8000), () => dispatch("timeout"));

xo.on.finish.add( (StatusEvent evt) {
xo.onFinish.listen( (StatusEvent evt) {
tref.cancel();
tref = null;
if (evt.status == 200) {
var rtt = new DateTime.now().millisecondsSinceEpoch - t0;
var info = new Info.fromJSON(JSON.decode(evt.text));
on.finish.dispatch(new InfoReceiverEvent(info, rtt));
dispatch(new InfoReceiverEvent("finish", info, rtt));
} else {
on.finish.dispatch(new InfoReceiverEvent());
dispatch(new InfoReceiverEvent("finish"));
}
});
xo.on.timeout.add( (_) {
xo.onTimeout.listen( (_) {
xo.close();
on.finish.dispatch();
dispatch(new InfoReceiverEvent("finish"));
});
}
}


class InfoReceiverIframe extends InfoReceiver {

InfoReceiverIframe (base_url) {
InfoReceiverIframe(base_url) : super._() {
if(document.body == null) {
document.onLoad.listen((_) => go());
} else {
Expand Down Expand Up @@ -123,11 +120,11 @@ class InfoReceiverIframe extends InfoReceiver {

class InfoReceiverFake extends InfoReceiver {

InfoReceiverFake() {
InfoReceiverFake() : super._() {
// It may not be possible to do cross domain AJAX to get the info
// data, for example for IE7. But we want to run JSONP, so let's
// fake the response, with rtt=2s (rto=6s).
new Timer(new Duration(milliseconds:2000), on.finish.dispatch);
new Timer(new Duration(milliseconds:2000), () => dispatch("finish"));
}
}

Expand All @@ -136,7 +133,7 @@ class InfoReceiverFake extends InfoReceiver {
class WInfoReceiverIframe {
WInfoReceiverIframe(ri, _trans_url, baseUrl) {
var ir = new AjaxInfoReceiver(baseUrl, XHRLocalObjectFactory);
ir.on.finish.add( (evt) {
ir.onFinish.listen( (evt) {
ri._didMessage('m${JSON.encode([evt.info, evt.rtt])}');
ri._didClose();
});
Expand Down
12 changes: 6 additions & 6 deletions lib/src/transport/receiver-xhr.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,29 @@ part of sockjs_client;
var buf_pos = 0;

xo = xhrFactory('POST', url);
xo.on.chunk.add((e){
xo.onChunk.listen((e){
if (e.status != 200) return;
while (true) {
var buf = e.text.substring(buf_pos);
var p = buf.indexOf('\n');
if (p == -1) break;
buf_pos += p+1;
var msg = buf.substring(0, p);
on.message.dispatch(new MessageEvent(msg));
dispatch(new MessageEvent(msg));
}
});
xo.on.finish.add((e) {
xo.on.chunk.dispatch(new StatusEvent(e.status, e.text));
xo.onFinish.listen((e) {
dispatch(new StatusEvent("chunk", e.status, e.text));
xo = null;
var reason = (e.status == 200) ? 'network' : 'permanent';
on.close.dispatch(new CloseEvent(reason: reason));
dispatch(new CloseEvent(reason: reason));
});
}

abort() {
if (xo != null) {
xo.close();
on.close.dispatch(new CloseEvent(reason: 'user'));
dispatch(new CloseEvent(reason: 'user'));
xo = null;
}
}
Expand Down
10 changes: 3 additions & 7 deletions lib/src/transport/receiver.dart
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
part of sockjs_client;

class ReceiverEvents extends event.Events {
event.ListenerList get message => this["message"];
event.ListenerList get close => this["close"];
}

class Receiver implements event.Emitter<ReceiverEvents> {
ReceiverEvents on = new ReceiverEvents();
class Receiver extends Object with event.Emitter {
Stream get message => this["message"];
Stream get close => this["close"];
}

typedef Receiver ReceiverFactory(String recvUrl, AjaxObjectFactory xhrFactory);

0 comments on commit 8cba48d

Please sign in to comment.