From 2361d441d0c919b1bb4a729ad499c58714149ca9 Mon Sep 17 00:00:00 2001 From: Tim Zook Date: Fri, 5 Oct 2012 14:24:29 -0500 Subject: [PATCH] Added license, simplified protocol, cleaned up a lot of code. --- Default (Windows).sublime-keymap | 9 +- Default.sublime-commands | 22 ++ collab/agent.py | 128 ------- collab/client.py | 133 +++---- collab/collabdoc.py | 293 ++++++---------- collab/model.py | 586 +++++++------------------------ collab/server.py | 6 +- collab/session.py | 352 +++++++------------ collab/text.py | 1 - collab/websocket.py | 10 +- collaboration.py | 53 ++- license.txt | 21 ++ readme.md | 20 +- 13 files changed, 497 insertions(+), 1137 deletions(-) create mode 100644 Default.sublime-commands delete mode 100644 collab/agent.py create mode 100644 license.txt diff --git a/Default (Windows).sublime-keymap b/Default (Windows).sublime-keymap index d390097..1f16cfd 100644 --- a/Default (Windows).sublime-keymap +++ b/Default (Windows).sublime-keymap @@ -1,6 +1,7 @@ [ - { "keys": ["ctrl+alt+c"], "command": "connect_to_server" }, - { "keys": ["ctrl+alt+d"], "command": "disconnect_from_server" }, - { "keys": ["ctrl+alt+o"], "command": "open_document" }, - { "keys": ["ctrl+alt+s"], "command": "toggle_server" } + { "keys": ["ctrl+alt+c"], "command": "collab_connect_to_server" }, + { "keys": ["ctrl+alt+d"], "command": "collab_disconnect_from_server" }, + { "keys": ["ctrl+alt+o"], "command": "collab_open_document" }, + { "keys": ["ctrl+alt+s"], "command": "collab_toggle_server" }, + { "keys": ["ctrl+alt+a"], "command": "collab_add_current_document" } ] \ No newline at end of file diff --git a/Default.sublime-commands b/Default.sublime-commands new file mode 100644 index 0000000..fe87ca1 --- /dev/null +++ b/Default.sublime-commands @@ -0,0 +1,22 @@ +[ + { + "caption": "Collaboration: Connect To Server", + "command": "collab_connect_to_server" + }, + { + "caption": "Collaboration: Disconnect From Server", + "command": "collab_disconnect_from_server" + }, + { + "caption": "Collaboration: Toggle Local Server", + "command": "collab_toggle_server" + }, + { + "caption": "Collaboration: Open Document", + "command": "collab_open_document" + }, + { + "caption": "Collaboration: Add Current Document", + "command": "collab_add_current_document" + } +] diff --git a/collab/agent.py b/collab/agent.py deleted file mode 100644 index 0f88402..0000000 --- a/collab/agent.py +++ /dev/null @@ -1,128 +0,0 @@ -import hat, doctypes, time - -class CollabAgent(object): - def __init__(self, data, auth, model, options): - self.auth = auth - self.model = model - self.options = options - self.sessionId = hat.hat() - self.connectTime = time.time() - self.headers = data['headers'] - self.remoteAddress = data['remoteAddress'] - - self.listeners = {} - - self.name = None - - def doAuth(self, actionData, name, userCallback, acceptCallback): - action = actionData if actionData else {} - action['name'] = name - - if name == 'connect': - action['type'] = 'connect' - elif name == 'create': - action['type'] = 'create' - elif name in ['get snapshot', 'get ops', 'open']: - action['type'] = 'read' - elif name == 'submit op': - action['type'] = 'update' - elif name == 'submit meta': - action['type'] = 'update' - elif name == 'delete': - action['type'] = 'delete' - else: - raise Exception("Invalid action name {0}".format(name)) - - responded = False - - def _action_reject(): - if responded: - raise Exception('Multiple accept/reject calls made') - #responded = True - userCallback('forbidden', None) - action['reject'] = _action_reject - - def _action_accept(): - if responded: - raise Exception('Multiple accept/reject calls made') - #responded = True - acceptCallback() - action['accept'] = _action_accept - - return self.auth(self, action) - - def disconnect(self): - [self.model.removeListener(docName, self.listeners[docName]) for docName in self.listeners] - - def getOps(self, docName, start, end, callback): - self.doAuth({'docName':docName, 'start':start, 'end':end}, 'get ops', callback, lambda: self.model.getOps(docName, start, end, callback)) - - def getSnapshot(self, docName, callback): - self.doAuth({'docName':docName}, 'get snapshot', callback, lambda: self.model.getSnapshot(docName, callback)) - - def create(self, docName, type, meta, callback): - if isinstance(type, (str, unicode)): - type = doctypes.types[type] - - meta = {} - - if self.name: - meta['creator'] = self.name - meta['ctime'] = meta['mtime'] = time.time() - - self.doAuth({'docName':docName, 'docType':type, 'meta':meta}, 'create', callback, lambda: self.model.create(docName, type, meta, callback)) - - def submitOp(self, docName, opData, callback): - opData['meta']['source'] = self.sessionId - if 'meta' not in opData or not opData['meta']: opData['meta'] = {} - dupIfSource = opData['dupIfSource'] if 'dupIfSource' in opData and opData['dupIfSource'] else [] - - if 'op' in opData: - self.doAuth({'docName':docName, 'op':opData['op'], 'v':opData['v'], 'meta':opData['meta'], 'dupIfSource':dupIfSource}, 'submit op', callback, lambda: self.model.applyOp(docName, opData, callback)) - else: - self.doAuth({'docName':docName, 'meta':opData['meta']}, 'submit meta', callback, lambda: self.model.applyMetaOp(docName, opData, callback)) - - def delete(self, docName, callback): - self.doAuth({'docName':docName}, 'delete', callback, lambda: self.model.delete(docName, callback)) - - def listen(self, docName, version, listener, callback): - authOps = (lambda c: self.doAuth({'docName':docName, 'start':version, 'end':None}, 'get ops', callback, c)) if version else (lambda c: c()) - - def _do_authops(): - def _do_auth(): - if docName in self.listeners: - if callback: - return callback('Document is already open') - return - self.listeners[docName] = listener - - def _model_listen(error, v): - if error and docName in self.listeners: - del self.listeners[docName] - - if callback: - return callback(error, v) - self.model.listen(docName, version, listener, _model_listen) - - self.doAuth({'docName':docName, 'v':version} if version else {'docName':docName}, 'open', callback, _do_auth) - - authOps(_do_authops) - - def removeListener(self, docName): - if docName not in self.listeners: - raise Exception('Document is not open') - self.model.removeListener(docName, self.listeners[docName]) - del self.listeners[docName] - -def createAgent(model, options): - def _default_auth(agent, action): - if action['type'] in ['connect', 'read', 'create', 'update']: - action['accept']() - else: - action['reject']() - auth = options['auth'] if 'auth' in options else _default_auth - - def _returns(data, callback): - agent = CollabAgent(data, auth, model, options) - return agent.doAuth(None, 'connect', callback, lambda: callback(None, agent)) - return _returns diff --git a/collab/client.py b/collab/client.py index 31982ea..a1eceb9 100644 --- a/collab/client.py +++ b/collab/client.py @@ -1,33 +1,20 @@ -import collabdoc, json, doctypes, connection, websocket +import collabdoc, doctypes, connection, websocket, logging class CollabClient: def __init__(self, host, port): self.docs = {} self.state = 'connecting' - self.lastError = None self.connected = False self.id = None - - def _socket_onopen(reason=''): - self.setState('disconnected', reason) - if reason in ['Closed', 'Stopped by server']: - self.setState('stopped', self.lastError or reason) - - def _socket_onopen(): - self.lastError = self.lastReceivedDoc = self.lastSentDoc = None - self.setState('handshaking') - self.socket = websocket.ClientWebSocket(host, port) - self.socket.on('message', self._socket_message) - self.socket.on('error', lambda e: self.emit('error', e)) - self.socket.on('connecting', lambda: self.setState('connecting')) - self.socket.on('open', _socket_onopen) - self.socket.on('close', _socket_onopen) + self.socket.on('message', self.socket_message) + self.socket.on('error', self.socket_error) + self.socket.on('open', self.socket_open) + self.socket.on('close', self.socket_close) self.socket.start() - self._events = {} def on(self, event, fct): @@ -46,96 +33,68 @@ def emit(self, event, *args): callback(*args) return self - def _socket_message(self, msg): + def socket_open(self): + self.set_state('handshaking') + + def socket_close(self, reason=''): + self.set_state('closed', reason) + self.socket = None + + def socket_error(self, error): + self.emit('error', error) + + def socket_message(self, msg): if 'auth' in msg: - if msg['auth'] == '': - self.lastError = msg['error'] + if msg['auth'] is None or msg['auth'] == '': + logging.warning('Authentication failed: {0}'.format(msg['error'])) self.disconnect() - return self.emit('connect failed', msg.error) else: self.id = msg['auth'] - self.setState('ok') - return - - if 'doc' in msg: - docName = msg['doc'] - self.lastReceivedDoc = docName - else: - msg['doc'] = docName = self.lastReceivedDoc + self.set_state('ok') + return - if docName in self.docs: - self.docs[docName]._onMessage(msg) + if 'doc' in msg and msg['doc'] in self.docs: + self.docs[msg['doc']].on_message(msg) else: - print('Unhandled message {1}'.format(msg)) + logging.error('Unhandled message {0}'.format(msg)) - def setState(self, state, data=None): - if self.state is state: - return + def set_state(self, state, data=None): + if self.state is state: return self.state = state - if state is 'disconnected': + if state is 'closed': self.id = None self.emit(state, data) - for docName in self.docs: - self.docs[docName]._connectionStateChanged(state, data) - def send(self, data): - docName = data['doc'] - - if docName is self.lastSentDoc: - del data['doc'] - else: - self.lastSentDoc = docName - - self.socket.send(json.dumps(data)) + if self.state is not "closed": + self.socket.send(data) def disconnect(self): - self.socket.close() - - def makeDoc(self, name, data, callback): - if name in self.docs: - raise Exception("Doc {1} already open".format(name)) + if self.state is not "closed": + self.socket.close() - doc = collabdoc.CollabDoc(self, name, data) - self.docs[name] = doc - - def _doc_open(error): - if error: - del self.docs[name] - callback(error, doc if not error else None) - doc.open(_doc_open) - - def openExisting(self, docName, callback): - if self.state is 'stopped': - return callback('connection closed') - if docName in self.docs: - return callback(None, self.docs[docName]) - self.makeDoc(docName, {}, callback) - - def open(self, docName, type, callback): - if self.state is 'stopped': + def open(self, name, doctype, callback, **kwargs): + if self.state is 'closed': return callback('connection closed', None) if self.state is 'connecting': - self.on('handshaking', lambda x: self.open(docName, type, callback)) - return + return self.on('ok', lambda x: self.open(name, doctype, callback)) - if isinstance(type, (str, unicode)): - type = doctypes.types.get(type, None) + if name in self.docs: + return callback("doc {0} already open".format(name), None) - if not type: - return callback("OT code for document type missing", None) + if isinstance(doctype, (str, unicode)): + doctype = doctypes.types.get(doctype, None) - if not docName: - raise Exception('Server-generated random doc names are not currently supported') + if not doctype: + return callback("Invalid document type", None) - if docName in self.docs: - doc = self.docs[docName] - if doc.type.name == type.name: - callback(None, doc) - else: - callback('Type mismatch', doc) - return + doc = collabdoc.CollabDoc(self, name, doctype, kwargs.get('snapshot', None)) + self.docs[name] = doc + + doc.open(lambda error, doc: callback(error, doc if not error else None)) + + def closed(self, name): + del self.docs[name] - self.makeDoc(docName, {'create':True, 'type':type.name}, callback) \ No newline at end of file diff --git a/collab/collabdoc.py b/collab/collabdoc.py index 508072c..72859cd 100644 --- a/collab/collabdoc.py +++ b/collab/collabdoc.py @@ -1,31 +1,33 @@ import doctypes, functools class CollabDoc(): - def __init__(self, connection, name, openData): + def __init__(self, connection, name, doctype, snapshot=None): self.connection = connection self.name = name + self.version = 0 + self.snapshot = snapshot + self.state = 'closed' self._events = {} - if not openData: openData = {} - self.version = openData.get('v', 0) - self.snapshot = openData.get('snaphot', None) - self.type = None - if 'type' in openData: - self._setType(openData['type']) + self.doctype = doctype() + if hasattr(doctype, 'api'): + for var in self.doctype.api.__class__.__dict__: + try: + self.__dict__[var] = functools.partial(self.doctype.api.__class__.__dict__[var], self) + except: + self.__dict__[var] = self.doctype.api.__class__.__dict__[var] + if hasattr(self, '_register'): + self._register() + + self.connection.on('closed', lambda data: self.set_state('closed', data)) - self.state = 'closed' - self.created = None - self.autoOpen = False - self._create = openData['create'] if 'create' in openData else False self.inflightOp = None self.inflightCallbacks = [] - self.inflightSubmittedIds = [] self.pendingOp = None self.pendingCallbacks = [] self.serverOps = {} - self._closeCallback = None self._openCallback = None def on(self, event, fct): @@ -44,234 +46,135 @@ def emit(self, event, *args): callback(*args) return self - def _xf(self, client, server): - if hasattr(self.type, 'transformX'): - self.type.transformX(client, server) - else: - client_ = self.type.transform(client, server, 'left') - server_ = self.type.transform(server, client, 'right') - return [client_, server_] - - def _otApply(self, docOp, isRemote): - oldSnapshot = self.snapshot - self.snapshot = self.type.apply(self.snapshot, docOp) - - self.emit('change', docOp, oldSnapshot) - if isRemote: - self.emit('remoteop', docOp, oldSnapshot) - - def _connectionStateChanged(self, state, data): - if state == 'disconnected': - self.state = 'closed' - if self.inflightOp: - self.inflightSubmittedIds.append(self.connection.id) - self.emit('closed') - elif state == 'ok': - if self.autoOpen: - self.open() - elif state == 'stopped': - if self._openCallback: - self._openCallback(data) + def set_state(self, state, data=None): + if self.state is state: return + self.state = state + + if state is 'closed': + if self._openCallback: self._openCallback(data, None) + self.emit(state, data) + + def open(self, callback=None): + if self.state != 'closed': return - def _setType(self, type): - if isinstance(type, (str, unicode)): - type = doctypes.types[type] + self.connection.send({'doc': self.name, 'open': True, 'snapshot': self.snapshot, 'type': self.doctype.name, 'create': True}) + self.set_state('opening') - if type and not hasattr(type, 'compose'): - raise Exception('Support for types without compose() is not implemented') + self._openCallback = callback - self.type = type() - if hasattr(type, 'api'): - for var in type.api.__class__.__dict__: - try: - self.__dict__[var] = functools.partial(type.api.__class__.__dict__[var], self) - except: - self.__dict__[var] = type.api.__class__.__dict__[var] - if hasattr(self, '_register'): - self._register() + def close(self): + self.connection.send({'doc':self.name, 'open':False}) + self.set_state('closed', 'closed by local client') + + def submitOp(self, op, callback): + op = self.doctype.normalize(op) + + self.snapshot = self.doctype.apply(self.snapshot, op) + + if self.pendingOp is not None: + self.pendingOp = self.doctype.compose(self.pendingOp, op) + else: + self.pendingOp = op + + if callback: + self.pendingCallbacks.append(callback) + + self.emit('change', op) + + self.flush() + + def flush(self): + if not (self.connection.state == 'ok' and self.inflightOp is None and self.pendingOp is not None): + return + + self.inflightOp = self.pendingOp + self.inflightCallbacks = self.pendingCallbacks + + self.pendingOp = None + self.pendingCallbacks = [] + + self.connection.send({'doc':self.name, 'op':self.inflightOp, 'v':self.version}) + + def on_message(self, msg): + if msg['doc'] != self.name: + return self.emit('error', "Expected docName '{0}' but got {1}".format(self.name, msg['doc'])) + + def _otApply(docOp, isRemote): + oldSnapshot = self.snapshot + self.snapshot = self.doctype.apply(self.snapshot, docOp) + + self.emit('change', docOp, oldSnapshot) + if isRemote: + self.emit('remoteop', docOp, oldSnapshot) - def _onMessage(self, msg): if 'open' in msg: if msg['open'] == True: - self.state = 'open' - self._create = False - if not self.created: - self.created = msg['create'] if 'create' in msg else False - - if 'type' in msg: - self._setType(msg['type']) - if 'create' in msg and msg['create']: - self.created = True - self.snapshot = self.type.create() + + if 'create' in msg and msg['create'] and not self.snapshot: + self.snapshot = self.doctype.create() else: - if self.created != True: - self.created = False if 'snapshot' in msg: self.snapshot = msg['snapshot'] if 'v' in msg: self.version = msg['v'] - if self.inflightOp: - response = {'doc': self.name, 'op': self.inflightOp, 'v': self.version} - if len(self.inflightSubmittedIds): - response.dupIfSource = self.inflightSubmittedIds - self.connection.send(response) - else: - self.flush() - + self.state = 'open' self.emit('open') if self._openCallback: - self._openCallback(None) + self._openCallback(None, self) + self._openCallback = None elif msg['open'] == False: if 'error' in msg: self.emit('error', msg['error']) if self._openCallback: - self._openCallback(msg['error']) - - self.state = 'closed' - self.emit('closed') + self._openCallback(msg['error'], None) + self._openCallback = None - if self._closeCallback: - self._closeCallback() - self._closeCallback = None + self.set_state('closed', 'closed by remote server') + self.connection.closed(self.name) - elif 'op' in msg and msg['op'] is None and msg['error'] == 'Op already submitted': - pass - - elif ('op' not in msg and 'v' in msg) or ('op' in msg and 'meta' in msg and 'source' in msg['meta'] and msg['meta']['source'] in self.inflightSubmittedIds): + elif 'op' not in msg and 'v' in msg: + if msg['v'] != self.version: + return self.emit('error', "Expected version {0} but got {1}".format(self.version, msg['v'])) + oldInflightOp = self.inflightOp self.inflightOp = None - self.inflightSubmittedIds = [] if 'error' in msg: error = msg['error'] - if self.type.invert: - undo = self.type.invert(oldInflightOp) - if self.pendingOp: - self.pendingOp, undo = self._xf(self.pendingOp, undo) - self._otApply(undo, True) - else: - self.emit('error', "Op apply failed ({0}) and the op could not be reverted".format(error)) + undo = self.doctype.invert(oldInflightOp) + if self.pendingOp: + self.pendingOp, undo = self.doctype.transformX(self.pendingOp, undo) + _otApply(undo, True) for callback in self.inflightCallbacks: - callback(error) + callback(error, None) else: - if not msg['v'] == self.version: - raise Exception('Invalid version from server') - self.serverOps[self.version] = oldInflightOp - self.version+=1 + self.version += 1 for callback in self.inflightCallbacks: callback(None, oldInflightOp) self.flush() - elif 'op' in msg: - if msg['v'] < self.version: - return - - if msg['doc'] != self.name: - return self.emit('error', "Expected docName '{0}' but got {1}".format(self.name, msg['doc'])) + elif 'op' in msg and 'v' in msg: if msg['v'] != self.version: return self.emit('error', "Expected version {0} but got {1}".format(self.version, msg['v'])) op = msg['op'] self.serverOps[self.version] = op - docOp = op if self.inflightOp is not None: - [self.inflightOp, docOp] = self._xf(self.inflightOp, docOp) + [self.inflightOp, op] = self.doctype.transformX(self.inflightOp, op) if self.pendingOp is not None: - [self.pendingOp, docOp] = self._xf(self.pendingOp, docOp) + [self.pendingOp, op] = self.doctype.transformX(self.pendingOp, op) - self.version+=1 - self._otApply(docOp, True) - - elif 'meta' in msg: - path = msg['meta']['path'] - value = msg['meta']['value'] - - if path: - if path[0] == 'shout': - return self.emit('shout', value) - else: - print('Unhandled meta op: {0}'.format(msg)) + self.version += 1 + _otApply(op, True) else: - print('Unhandled document message: {0}'.format(msg)) - - def flush(self): - if not (self.connection.state == 'ok' and self.inflightOp is None and self.pendingOp is not None): - return - - self.inflightOp = self.pendingOp - self.inflightCallbacks = self.pendingCallbacks - - self.pendingOp = None - self.pendingCallbacks = [] - - self.connection.send({'doc':self.name, 'op':self.inflightOp, 'v':self.version}) - - def submitOp(self, op, callback): - if self.type.normalize: - op = self.type.normalize(op) - - self.snapshot = self.type.apply(self.snapshot, op) - - if self.pendingOp is not None: - self.pendingOp = self.type.compose(self.pendingOp, op) - else: - self.pendingOp = op - - if callback: - self.pendingCallbacks.append(callback) - - self.emit('change', op) - - self.flush() #setTimeout(self.flush, 0) -- A timeout is used so if the user sends multiple ops at the same time, they'll be composed & sent together. - - def shout(self, msg): - self.connection.send({'doc':self.name, 'meta':{'path':['shout'], 'value':msg}}) - - def open(self, callback=None): - self.autoOpen = True - if self.state != 'closed': - return - - message = {'doc': self.name, 'open': True} - - if not self.snapshot: - message['snapshot'] = None - if self.type: - message['type'] = self.type.name - if self.version: - message['v'] = self.version - if self._create: - message['create'] = True - - self.connection.send(message) - - self.state = 'opening' - - def tempOpenCallback(error): - self._openCallback = None - if callback: - callback(error) - self._openCallback = tempOpenCallback - - def close(self, callback=None): - self.autoOpen = False - if self.state is 'closed': - return callback() if callback else None - - self.connection.send({'doc':self.name, 'open':False}) - - self.state = 'closed' - - self.emit('closing') - self._closeCallback = callback - + logging.error('Unhandled document message: {0}'.format(msg)) diff --git a/collab/model.py b/collab/model.py index b7affb2..7851b41 100644 --- a/collab/model.py +++ b/collab/model.py @@ -1,502 +1,156 @@ -import doctypes, syncQueue, time, re - -class EventEmitter(object): - def __init__(self): - self._events = {} - - def on(self, event, fct): - if event not in self._events: self._events[event] = [] - self._events[event].append(fct) - return self - - def removeListener(self, event, fct): - if event not in self._events: return self - self._events[event].remove(fct) - return self - - def emit(self, event, *args): - if event not in self._events: return self - for callback in self._events[event]: - callback(*args) - return self - - def listeners(self, event): - return self._events.get(event, []) +import doctypes, syncQueue, time, re, logging class CollabModel(object): - def __init__(self, db, options=None): - self.options = options - self.db = db - - if not self.options: - options = {} + def __init__(self, options=None): + self.options = options if options else {} + self.options.setdefault('numCachedOps', 20) + self.options.setdefault('opsBeforeCommit', 20) + self.options.setdefault('maximumAge', 20) self.docs = {} - self._events = {} - - self.awaitingGetSnapshot = {} - - if 'numCachedOps' not in self.options: - self.options['numCachedOps'] = 20 - if 'opsBeforeCommit' not in self.options: - self.options['opsBeforeCommit'] = 20 + def make_op_queue(self, docname, doc): - if 'maximumAge' not in self.options: - self.options['maximumAge'] = 20 - - def on(self, event, fct): - if event not in self._events: self._events[event] = [] - self._events[event].append(fct) - return self - - def removeListener(self, event, fct): - if event not in self._events: return self - self._events[event].remove(fct) - return self - - def emit(self, event, *args): - if event not in self._events: return self - for callback in self._events[event]: - callback(*args) - return self - - def makeOpQueue(self, docName, doc): - - def _queue_process(opData, callback): + def queue_process(opData, callback): if 'v' not in opData or opData['v'] < 0: - return callback('Version missing') + return callback('Version missing', None) if opData['v'] > doc['v']: - return callback('Op at future version') + return callback('Op at future version', None) if opData['v'] < doc['v'] - self.options['maximumAge']: - return callback('Op too old') - - if 'meta' not in opData or not opData['meta']: - opData['meta'] = {} - opData['meta']['ts'] = time.time() - - def _get_ops(error, ops=None): - if error: - return callback(error) - - if doc['v'] - opData['v'] != len(ops): - print("Could not get old ops in model for document {0}".format(docName)) - print("Expected ops {0} to {1} and got {2} ops".format(opData['v'], doc['v'], len(ops))) - return callback('Internal error') - - if len(ops) > 0: - try: - for oldOp in ops: - if 'meta' in oldOp and 'source' in oldOp['meta'] and 'dupIfSource' in opData and oldOp['meta']['source'] in opData['dupIfSource']: - return callback('Op already submitted') - - opData['op'] = doc['type'].transform(opData['op'], oldOp['op'], 'left') - opData['v']+=1 - except Exception as error: - print(error) - return callback(str(error)) - - try: - snapshot = doc['type'].apply(doc['snapshot'], opData['op']) - except Exception as error: - print(error) - return callback(error) - - if opData['v'] != doc['v']: - print("Version mismatch detected in model. File a ticket - this is a bug.") - print("Expecting {0} == {1}".format(opData['v'], doc['v'])) - return callback('Internal error') - - writeOp = self.db.writeOp if self.db and 'writeOp' in self.db else lambda docName, newOpData, callback: callback() - - def _writeop_callback(error=None): - if error: - print("Error writing ops to database: {0}".format(error)) - return callback(error) - - if 'stats' in self.options and 'writeOp' in self.options['stats']: - self.options['stats']['writeOp']() - - oldSnapshot = doc['snapshot'] - - doc['v'] = opData['v'] + 1 - doc['snapshot'] = snapshot - - doc['ops'].append(opData) - if self.db and len(doc['ops']) > self.options['numCachedOps']: - doc['ops'].pop(0) - - self.emit('applyOp', docName, opData, snapshot, oldSnapshot) - doc['eventEmitter'].emit('op', opData, snapshot, oldSnapshot) - - callback(None, opData['v']) - - # I need a decent strategy here for deciding whether or not to save the snapshot. - # - # The 'right' strategy looks something like "Store the snapshot whenever the snapshot - # is smaller than the accumulated op data". For now, I'll just store it every 20 - # ops or something. (Configurable with doc.committedVersion) - if not doc['snapshotWriteLock'] and doc['committedVersion'] + self.options['opsBeforeCommit'] <= doc['v']: - def write_snappy_error(error=None): - if error: - print("Error writing snapshot {0}. This is nonfatal".format(error)) - self.tryWriteSnapshot(docName, write_snappy_error) - writeOp(docName, opData, _writeop_callback) - - self.getOps(docName, opData['v'], doc['v'], _get_ops) + return callback('Op too old', None) + if opData['v'] < 0: + return callback('Invalid version', None) - return syncQueue.syncQueue(_queue_process) + ops = doc['ops'][(len(doc['ops'])+opData['v']-doc['v']):] - def add(self, docName, error, data, committedVersion, ops, dbMeta): - callbacks = None - if docName in self.awaitingGetSnapshot: - callbacks = self.awaitingGetSnapshot[docName] - del self.awaitingGetSnapshot[docName] + if doc['v'] - opData['v'] != len(ops): + logging.error("Could not get old ops in model for document {1}. Expected ops {1} to {2} and got {3} ops".format(docname, opData['v'], doc['v'], len(ops))) + return callback('Internal error', None) - if not error and docName in self.docs: - error = "doc already exists" + for oldOp in ops: + opData['op'] = doc['type'].transform(opData['op'], oldOp['op'], 'left') + opData['v']+=1 - if error: - if callbacks: - for callback in callbacks: - callback(error) - else: - doc = { - 'snapshot': data['snapshot'], - 'v': data['v'], - 'type': data['type'], - 'meta': data['meta'], - 'ops': ops if ops else [], - 'eventEmitter': EventEmitter(), - 'committedVersion': committedVersion if committedVersion else data['v'], - 'snapshotWriteLock': False, - 'dbMeta': dbMeta - } + newSnapshot = doc['type'].apply(doc['snapshot'], opData['op']) - self.docs[docName] = doc + if opData['v'] != doc['v']: + logging.error("Version mismatch detected in model. File a ticket - this is a bug. Expecting {0} == {1}".format(opData['v'], doc['v'])) + return callback('Internal error', None) - doc['opQueue'] = self.makeOpQueue(docName, doc) - - self.emit('add', docName, data) - if callbacks: - for callback in callbacks: - callback(None, doc) + oldSnapshot = doc['snapshot'] + doc['v'] = opData['v'] + 1 + doc['snapshot'] = newSnapshot + for listener in doc['listeners']: + listener(opData, newSnapshot, oldSnapshot) - return doc - - - - def getOpsInternal(self, docName, start, end, callback): - if not self.db: - return callback('Document does not exist') - - def _getops(error, ops): - if error: - if callback: - return callback(error) - return - - v = start - for op in ops: - v+=1 - op['v'] = v - - if callback: - callback(None, ops) - - self.db.getOps(docName, start, end, _getops) - - - - def load(self, docName, callback): - if docName in self.docs: - if 'stats' in self.options and 'cacheHit' in self.options['stats']: - self.options['stats']['cacheHit']('getSnapshot') - return callback(None, self.docs[docName]) - - if not self.db: - return callback('Document does not exist') - - callbacks = self.awaitingGetSnapshot[docName] - - if callbacks: - return callbacks.append(callback) - - if 'stats' in self.options and 'cacheMiss' in self.options['stats']: - self.options['stats']['cacheMiss']('getSnapshot') - - self.awaitingGetSnapshot[docName] = callbacks - - def _get_snappy(error, data, dbMeta): - if error: - return self.add(docName, error) - - type = doctypes.types[data['type']] - if not type: - print("Type '{0}' missing".format(data.type)) - return callback("Type not found") - data['type'] = type - - committedVersion = data['v'] - - def _get_ops_internal(error, ops): + def save_op_callback(error=None): if error: - return callback(error) - - if len(ops) > 0: - print("Catchup {0} {1} -> {2}".format(docName, data['v'], data['v'] + len(ops))) # not an error? - - try: - for op in ops: - data['snapshot'] = type.apply(data['snapshot'], op['op']) - data['v']+=1 - except Exception as e: - print("Op data invalid for {0}: {1}".format(docName, e)) - return callback('Op data invalid') - - self.emit('load', docName, data) - self.add(docName, error, data, committedVersion, ops, dbMeta) - self.getOpsInternal(docName, data['v'], None, _get_ops_internal) - self.db.getSnapshot(docName, _get_snappy) - - - - def tryWriteSnapshot(self, docName, callback): - if not self.db or not docName in self.docs: - return callback() if callback else None - - doc = self.docs[docName] - - if not doc: - return callback() if callback else None - - if doc['committedVersion'] is doc['v']: - return callback() if callback else None - - if doc['snapshotWriteLock']: - return callback('Another snapshot write is in progress') if callback else None - - doc['snapshotWriteLock'] = True - - if 'stats' in self.options and 'writeSnapshot' in self.options['stats']: - self.options['stats']['writeSnapshot']() - - writeSnapshot = self.db.writeSnapshot if self.db else lambda docName, docData, dbMeta, callback: callback() - - data = { - 'v': doc['v'], - 'meta': doc['meta'], - 'snapshot': doc['snapshot'], - 'type': doc['type'].name + logging.error("Error saving op: {0}".format(error)) + return callback(error, None) + else: + callback(None, opData['v']) + self.save_op(docname, opData, save_op_callback) + + return syncQueue.syncQueue(queue_process) + + def save_op(self, docname, op, callback): + doc = self.docs[docname] + doc['ops'].append(op) + if len(doc['ops']) > self.options['numCachedOps']: + doc['ops'].pop(0) + if not doc['savelock'] and doc['savedversion'] + self.options['opsBeforeCommit'] <= doc['v']: + pass + callback(None) + + def exists(self, docname): + return docname in self.docs + + def add(self, docname, data): + doc = { + 'snapshot': data['snapshot'], + 'v': data['v'], + 'type': data['type'], + 'ops': data['ops'], + 'listeners': [], + 'savelock': False, + 'savedversion': 0, } - - def _write_snappy(error, dbMeta): - doc['snapshotWriteLock'] = False - doc['committedVersion'] = data['v'] - doc['dbMeta'] = dbMeta - return callback(error) if callback else None - - self.writeSnapshot(docName, data, doc['dbMeta'], _write_snappy) - - - - def create(self, docName, type, meta, callback=None): - if callable(meta): - callback = meta - meta = {} - - if not re.match("^[A-Za-z0-9._-]*$", docName): + + doc['opQueue'] = self.make_op_queue(docname, doc) + + self.docs[docname] = doc + + def load(self, docname, callback): + # try: + return callback(None, self.docs[docname]) + # except KeyError: + # return callback('Document does not exist', None) + + # self.loadingdocs = {} + # self.loadingdocs.setdefault(docname, []).append(callback) + # if docname in self.loadingdocs: + # for callback in self.loadingdocs[docname]: + # callback(None, doc) + # del self.loadingdocs[docname] + + def create(self, docname, doctype, snapshot=None, callback=None): + if not re.match("^[A-Za-z0-9._-]*$", docname): return callback('Invalid document name') if callback else None - if docName in self.docs: + if self.exists(docname): return callback('Document already exists') if callback else None - if isinstance(type, (str, unicode)): - type = doctypes.types.get(type, None) - - if not type: - return callback('Type not found') if callback else None + if isinstance(doctype, (str, unicode)): + doctype = doctypes.types.get(doctype, None) + if not doctype: + return callback('Invalid document type') if callback else None + doctype = doctype() data = { - 'snapshot': type().create(), - 'type': type.name, - 'meta': meta if meta else {}, - 'v': 0 + 'snapshot': snapshot if snapshot else doctype.create(), + 'type': doctype, + 'v': 0, + 'ops': [] } + self.add(docname, data) - def done(error=None, dbMeta=None): - if error: - return callback(error) if callback else None - - data['type'] = type() - self.add(docName, None, data, 0, [], dbMeta) - self.emit('create', docName, data) - return callback() if callback else None - - if self.db: - self.db.create(docName, data, done) - else: - done() - - - - def delete(self, docName, callback): - doc = None - if docName in self.docs: - doc = self.docs[docName] - del self.docs[docName] - - def done(error=None): - if not error: - model.emit('delete', docName) - return callback(error) if callback else None - - if self.db: - return self.db.delete(docName, doc['dbMeta'] if doc else None, done) - else: - return done() if doc else done('Document does not exist') - - - - def getOps(self, docName, start, end, callback): - if not start >= 0: - raise Exception('start must be 0+') - - if callable(end): - end, callback = None, end - - ops = None - if docName in self.docs: - ops = self.docs[docName]['ops'] - - version = self.docs[docName]['v'] - - if not end: - end = version - start = min(start, end) - - if start == end: - return callback(None, []) - - base = version - len(ops) - - if not self.db or start >= base: - if 'stats' in self.options and 'cacheHit' in self.options['stats']: - self.options['stats']['cacheHit']('getOps') - - return callback(None, ops[(start - base):(end - base)]) + return callback(None) if callback else None - if 'stats' in self.options and 'cacheMiss' in self.options['stats']: - self.options['stats']['cacheMiss']('getOps') + def delete(self, docname, callback=None): + if docname not in self.docs: raise Exception('delete called but document does not exist') + del self.docs[docname] + return callback(None) if callback else None - return self.getOpsInternal(docName, start, end, callback) + def listen(self, docname, listener, callback=None): + def done(error, doc): + if error: return callback(error, None) if callback else None + doc['listeners'].append(listener) + return callback(None, doc['v']) if callback else None + self.load(docname, done) + def remove_listener(self, docname, listener): + if docname not in self.docs: raise Exception('remove_listener called but document not loaded') + self.docs[docname]['listeners'].remove(listener) + def get_version(self, docname, callback): + self.load(docname, lambda error, doc: callback(error, None if error else doc['v'])) - def getSnapshot(self, docName, callback): - self.load(docName, lambda error, doc=None: callback(error, {'v':doc['v'], 'type':doc['type'], 'snapshot':doc['snapshot'], 'meta':doc['meta']} if doc else None)) - - - - def getVersion(self, docName, callback): - self.load(docName, lambda error, doc=None: callback(error, doc['v'] if doc else None)) + def get_doctype(self, docname, callback): + self.load(docname, lambda error, doc: callback(error, None if error else doc['type'])) + def get_snapshot(self, docname, callback): + self.load(docname, lambda error, doc: callback(error, None if error else doc['snapshot'])) + def get_data(self, docname, callback): + self.load(docname, lambda error, doc: callback(error, None if error else doc)) # Ops are queued before being applied so that the following code applies op C before op B: # model.applyOp 'doc', OPA, -> model.applyOp 'doc', OPB # model.applyOp 'doc', OPC - def applyOp(self, docName, opData, callback=None): - def _load(error, doc): - if error: return callback(error) if callback else None - doc['opQueue'](opData, lambda error, newVersion=None: callback(error, newVersion) if callback else None) - self.load(docName, _load) - - - - def applyMetaOp(self, docName, metaOpData, callback): - path = metaOpData['meta']['path'] - value = metaOpData['meta']['value'] - - def _load(error, doc): - if error: - return callback(error) if callback else None - else: - applied = False - if path[0] == 'shout': - doc['eventEmitter'].emit('op', metaOpData) - applied = True - - if applied: - model.emit('applyMetaOp', docName, path, value) - return callback(None, doc['v']) if callback else None - self.load(docName, _load) - - - - def listen(self, docName, version=None, listener=None, callback=None): - if callable(version): - version, listener, callback = None, version, listener - - def _load(error, doc): - if error: - return callback(error) if callback else None - - if version: - def _getops(error, data): - if error: - return callback(error) if callback else None - - doc['eventEmitter'].on('op', listener) - if callback: - callback(None, version) - - for op in data: - listener(op) - if not listener in doc['eventEmitter'].listeners('op'): - break - - self.getOps(docName, version, None, _getops) - - else: - doc['eventEmitter'].on('op', listener) - return callback(None, doc['v']) if callback else None - self.load(docName, _load) - - - - def removeListener(self, docName, listener): - if docName not in self.docs: - raise Exception('removeListener called but document not loaded') - self.docs[docName]['eventEmitter'].removeListener('op', listener) - - - - def flush(self, callback): - if not self.db: - return callback() if callback else None - - global pendingWrites - pendingWrites = 0 - - for docName in self.docs: - doc = self.docs[docName] - if doc['committedVersion'] < doc['v']: - pendingWrites+=1 - - def _write_it_snappy_like(): - global pendingWrites - pendingWrites-=1 - if pendingWrites == 0 and callback: - callback() - callback = None - self.tryWriteSnapshot(docName, _write_it_snappy_like) - - if pendingWrites == 0 and callback: - callback() - - - - def closeDb(self): - if self.db: - self.db.close() - self.db = None + def applyOp(self, docname, op, callback): + self.load(docname, lambda error, doc: callback(error, None) if error else doc['opQueue'](op, callback)) + + def flush(self, callback=None): + return callback() if callback else None + + def close(self): + self.flush() diff --git a/collab/server.py b/collab/server.py index 1861b70..1ccf010 100644 --- a/collab/server.py +++ b/collab/server.py @@ -16,16 +16,16 @@ def __init__(self, options=None): options = {} self.options = options - self.model = model.CollabModel(None, options) + self.model = model.CollabModel(options) self.host = self.options.get('host', '127.0.0.1') self.port = self.options.get('port', 6633) self.server = websocket.WebSocketServer(self.host, self.port) - self.server.on('connection', lambda connection: session.sessionHandler(connection, agent.createAgent(self.model, self.options))) - self.server.on('close', lambda: self.model.closeDb()) + self.server.on('connection', lambda connection: session.CollabUserSession(connection, self.model)) def run_forever(self): threading.Thread(target=self.server.run_forever).start() def close(self): + self.model.close() self.server.close() diff --git a/collab/session.py b/collab/session.py index b66caec..c20e58d 100644 --- a/collab/session.py +++ b/collab/session.py @@ -1,260 +1,154 @@ -import hat, syncQueue +import hat, syncQueue, logging -class sessionHandler(object): - def __init__(self, session, createAgent): - self.session = session - self.data = {'headers': self.session.headers, 'remoteAddress': self.session.address} - self.agent = None +class CollabUserSession(object): + def __init__(self, connection, model): + self.connection = connection + self.model = model - self.lastSentDoc = None - self.lastReceivedDoc = None + self.docs = {} + self.userid = hat.hat() - self.docState = {} - - self.buffer = [] - self.bufferMsg = lambda msg: self.buffer.append(msg) - self.session.on('message', self.bufferMsg) - - createAgent(self.data, self._agent_message) - - self.session.on('close', self._on_session_close) - - def _agent_message(self, error, agent_): - if error: - self.session.send({'auth':None, 'error':error}) - self.session.stop() + if self.connection.ready(): + self.on_session_create() else: - self.agent = agent_ - self.session.send({'auth':self.agent.sessionId}) + self.connection.on('ok', lambda: self.on_session_create) + self.connection.on('close', self.on_session_close) + self.connection.on('message', self.on_session_message) - self.session.removeListener('message', self.bufferMsg) - [self.handleMessage(msg) for msg in self.buffer] - self.buffer = None - self.session.on('message', self.handleMessage) + def on_session_create(self): + self.connection.send({'auth':self.userid}) - def _on_session_close(self): - if not self.agent: - return - for docName in self.docState: - if 'listener' in self.docState[docName] and self.docState[docName]['listener']: - self.agent.removeListener(docName) - self.docState = None + def on_session_close(self): + for docname in self.docs: + if 'listener' in self.docs[docname]: + self.model.remove_listener(docname, self.docs[docname]['listener']) + self.docs = None - def handleMessage(self, query, callback=None): + def on_session_message(self, query, callback=None): error = None - if not (('doc' in query and query['doc'] is None) or ('doc' in query and isinstance(query['doc'], (str, unicode))) or ('doc' not in query and self.lastReceivedDoc)): - error = 'Invalid docName' + if 'doc' not in query or not isinstance(query['doc'], (str, unicode)): + error = 'doc name invalid or missing' if 'create' in query and query['create'] is not True: error = "'create' must be True or missing" + if 'create' in query and query['create'] is True and 'type' not in query: + error = "create:True requires type specified" + if 'create' not in query and 'type' in query: + error = "'type' must only be set for create commands" if 'open' in query and query['open'] not in [True, False]: error = "'open' must be True, False or missing" - if 'snapshot' in query and query['snapshot'] is not None: - error = "'snapshot' must be None or missing" if 'type' in query and not isinstance(query['type'], (str, unicode)): error = "'type' invalid" if 'v' in query and (not isinstance(query['v'], (int, float)) or query['v'] < 0): error = "'v' invalid" if error: - print("Invalid query {0} from {1}: {2}".format(query, self.agent.sessionId, error)) - self.session.abort() - if callback: - return callback() - else: - return - - if 'doc' in query: - if query['doc'] is None: - query['doc'] = self.lastReceivedDoc = hat.hat() + logging.error("Invalid query {0} from {1}: {2}".format(query, self.userid, error)) + self.connection.abort() + return callback() if callback else None + + if query['doc'] not in self.docs: + self.docs[query['doc']] = {'queue': syncQueue.syncQueue(self.handle_message)} + + self.docs[query['doc']]['queue'](query) + + def handle_message(self, query, callback): + if not self.docs: + return callback() + + if 'open' in query and query['open'] == False: + if 'listener' not in self.docs[query['doc']]: + self.send({'doc':query['doc'], 'open':False, 'error':'Doc is not open'}) else: - self.lastReceivedDoc = query['doc'] - else: - if not self.lastReceivedDoc: - print("msg.doc missing in query {0} from {1}".format(query, self.agent.sessionId)) - return self.session.abort() - query['doc'] = self.lastReceivedDoc - - if query['doc'] not in self.docState: - self.docState[query['doc']] = {} - - if 'queue' not in self.docState[query['doc']]: - def _queue_func(query, callback): - if not self.docState: - return callback() - - if 'open' in query and query['open'] == False: - self.handleClose(query, callback) - elif 'open' in query or ('snapshot' in query and query['snapshot'] is None) or 'create' in query: - self.handleOpenCreateSnapshot(query, callback) - elif 'op' in query or ('meta' in query and 'path' in query['meta']): - self.handleOp(query, callback) - else: - print("Invalid query {0} from {1}".format(json.dumps(query), self.agent.sessionId)) - self.session.abort() - callback() + self.model.remove_listener(query['doc'], self.docs[query['doc']]['listener']) + del self.docs[query['doc']]['listener'] + self.send({'doc':query['doc'], 'open':False}) + callback() - self.docState[query['doc']]['queue'] = syncQueue.syncQueue(_queue_func) + elif 'open' in query or ('snapshot' in query and query['snapshot'] is None) or 'create' in query: + self.handle_opencreatesnapshot(query, callback) - self.docState[query['doc']]['queue'](query) + elif 'op' in query and 'v' in query: + def apply_op(error, appliedVersion): + self.send({'doc':query['doc'], 'v':None, 'error':error} if error else {'doc':query['doc'], 'v':appliedVersion}) + callback() + self.model.applyOp(query['doc'], {'doc':query['doc'], 'v':query['v'], 'op':query['op'], 'source':self.userid}, apply_op) - def send(self, response): - if response['doc'] is self.lastSentDoc: - del response['doc'] else: - self.lastSentDoc = response['doc'] - - if self.session.ready(): - self.session.send(response) - - def open(self, docName, version, callback): - if not self.docState: - return callback('Session closed') - if docName not in self.docState: - self.docState[docName] = {'queue':None, 'listener':None} - if 'listener' in self.docState[docName]: - return callback('Document already open') - - def _doc_listener(opData, snapshot, oldsnapshot): - if 'source' in opData['meta'] and opData['meta']['source'] is self.agent.sessionId: - return - opMsg = {'doc': docName, 'op': opData['op'], 'v': opData['v'], 'meta': opData['meta']} - self.send(opMsg) - - self.docState[docName]['listener'] = _doc_listener - - def _listen(error, v): - if error: - self.docState[docName]['listener'] = None - callback(error, v) - self.agent.listen(docName, version, self.docState[docName]['listener'], _listen) - - def close(self, docName, callback): - if not self.docState: - return callback('Session closed') - if docName not in self.docState: - return callback('Doc does not exist') - - listener = self.docState[docName]['listener'] - if not listener: - return callback('Doc already closed') - - self.agent.removeListener(docName) - self.docState[docName]['listener'] = None - return callback() - - def handleOpenCreateSnapshot(self, query, finished): - docName = query['doc'] - msg = {'doc':docName} - - def callback(error=None): - if error: - if 'open' in msg and msg['open'] == True: - self.close(docName) - if 'open' in query and query['open'] == True: - msg['open'] = False - if 'snapshot' in query: - msg['snapshot'] = None - if 'create' in msg: - del msg['create'] - - msg['error'] = error - - self.send(msg) - finished() - - if 'doc' not in query: - return callback('No docName specified') - - if 'create' in query and query['create'] == True: - if 'type' not in query or not isinstance(query['type'], (str, unicode)): - return callback('create:True requires type specified') - - if 'meta' in query: - if not isinstance(query['meta'], dict): - return callback('meta must be a dict') - - self.docData = None - - def step1Create(): - if 'create' not in query or query['create'] != True: - return step2Snapshot() + logging.error("Invalid query {0} from {1}".format(query, self.userid)) + self.connection.abort() + callback() - if self.docData: - msg['create'] = False - return step2Snapshot() - else: - def _agent_create(error=None): - if error == 'Document already exists': - def _agent_get_snapshot(error, data): - if error: - return callback(error) - self.docData = data - msg['create'] = False - return step2Snapshot() - self.agent.getSnapshot(docName, _agent_get_snapshot) - elif error: - return callback(error) - else: - msg['create'] = True - return step2Snapshot() - return self.agent.create(docName, query['type'] if 'type' in query else None, query['meta'] if 'meta' in query else {}, _agent_create) + def on_remote_message(self, message, snapshot, oldsnapshot): + if message['source'] is self.userid: return + self.send(message) - def step2Snapshot(): - if 'snapshot' not in query or query['snapshot'] != None or ('create' in msg and msg['create'] == True): - return step3Open() + def send(self, msg): + self.connection.send(msg) - if self.docData: - msg['v'] = self.docData['v'] - if not 'type' in query or query['type'] != self.docData['type'].name: - msg['type'] = self.docData['type'].name - msg['snapshot'] = self.docData['snapshot'] - else: - return callback('Document does not exist') + def handle_opencreatesnapshot(self, query, callback): + def finished(message): + if 'error' in message: + if 'create' in query and 'create' not in message: message['create'] = False + if 'snapshot' in query and 'snapshot' not in message: message['snapshot'] = None + if 'open' in query and 'open' not in message: message['open'] = False + self.send(message) + callback() - return step3Open() + def step1Create(message): + if 'create' not in query: + return step2Snapshot(message) + + def model_create(error=None): + if error == 'Document already exists': + message['create'] = False + if 'open' not in query: + message['error'] = error + return finished(message) + return step2Snapshot(message) + elif error: + message['create'] = False + message['error'] = error + return finished(message) + else: + message['create'] = True + return step2Snapshot(message) - def step3Open(): - if 'open' not in query or query['open'] != True: - return callback() + self.model.create(query['doc'], query['type'], query.get('snapshot', None), model_create) - if 'type' in query and self.docData and query['type'] != self.docData['type'].name: - return callback('Type mismatch') + def step2Snapshot(message): + if 'snapshot' not in query or query['snapshot'] is not None or message['create']: + return step3Open(message) - def _open_doc(error, version): + def model_get_data(error, data): if error: - return callback(error) - msg['open'] = True - msg['v'] = version - return callback() - return self.open(docName, query['v'] if 'v' in query else None, _open_doc) - - def _agent_get_snapshot(error, data): - if error and error != 'Document does not exist': - return callback(error) - self.docData = data - return step1Create() - - if query['snapshot'] == None or query['open'] == True: - return self.agent.getSnapshot(query['doc'], _agent_get_snapshot) - else: - return step1Create() - - def handleClose(self, query, callback): - def _close_doc(error=None): - if error: - self.send({'doc':query['doc'], 'open':False, 'error':error}) - else: - self.send({'doc':query['doc'], 'open':False}) - callback() - self.close(query['doc'], _close_doc) - - def handleOp(self, query, callback): - if 'v' not in query: - raise Exception('No version specified') - - opData = {'v':query['v'], 'op':query['op'], 'meta':query['meta'] if 'meta' in query else {}, 'dupIfSource':query['dupIfSource'] if 'dupIfSource' in query else None} - - def _agent_submitop(error, appliedVersion): - self.send({'doc':query['doc'], 'v':None, 'error':error} if error else{'doc':query['doc'], 'v':appliedVersion}) - callback() - self.agent.submitOp(query['doc'], opData, callback if ('op' not in opData and 'meta' in opData and 'path' in opData['meta']) else _agent_submitop) + message['snapshot'] = None + message['error'] = error + return finished(message) + message['v'] = data['v'] + message['snapshot'] = data['snapshot'] + return step3Open(message) + + return self.model.get_data(query['doc'], model_get_data) + + def step3Open(message): + if 'open' not in query: + return finished(message) + + doc = self.docs[query['doc']] + if 'listener' in doc: + message['open'] = True + return finished(message) + + doc['listener'] = self.on_remote_message + + def model_listen(error, v): + if error: + del doc['listener'] + message['open'] = False + message['error'] = error + message['open'] = True + if 'v' not in message: message['v'] = v + return finished(message) + self.model.listen(query['doc'], doc['listener'], model_listen) + + step1Create({'doc':query['doc']}) \ No newline at end of file diff --git a/collab/text.py b/collab/text.py index e420145..ba5bf62 100644 --- a/collab/text.py +++ b/collab/text.py @@ -1,5 +1,4 @@ - class CollabSystem(object): def __init__(self): pass diff --git a/collab/websocket.py b/collab/websocket.py index 3e677e8..48d6755 100644 --- a/collab/websocket.py +++ b/collab/websocket.py @@ -787,7 +787,7 @@ def emit(self, event, *args): def send(self, data): #print('Sending:{0}'.format(data)) - self.sock.send(data) + self.sock.send(json.dumps(data)) def close(self): self.keep_running = False @@ -849,6 +849,7 @@ def run(self): self.ws.setup(self.sock) self._ready = True + self.emit('ok') while self._ready: data = self.ws.recv() @@ -865,11 +866,11 @@ def _handshake(self): try: resp_headers = self._read_headers() except Exception as e: - print("Invalid WebSocket Header") + logging.error("Invalid WebSocket Header") return False if not self._validate_header(resp_headers): - print("Invalid WebSocket Header") + logging.error("Invalid WebSocket Header") return False key = base64.encodestring(hashlib.sha1(resp_headers['sec-websocket-key'] + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").digest()).strip() @@ -930,9 +931,8 @@ def close(self): def send(self, msg): if not self._ready: return - msg = json.dumps(msg) #print('Sending to {0}:{1}'.format(self.address, msg)) - self.ws.send(msg) + self.ws.send(json.dumps(msg)) def ready(self): return self._ready diff --git a/collaboration.py b/collaboration.py index 6eb1e71..25f2b40 100644 --- a/collaboration.py +++ b/collaboration.py @@ -17,6 +17,7 @@ def emit(self, event, *args): if event not in self._events: return for callback in self._events[event]: callback(*args) + return self def on_modified(self, view): self.emit("modified", view) @@ -39,9 +40,6 @@ def on_pre_save(self, view): def on_post_save(self, view): self.emit("post_save", view) - def on_modified(self, view): - self.emit("modified", view) - def on_selection_modified(self, view): self.emit("selection_modified", view) @@ -88,8 +86,7 @@ def emit(self, event, *args): return self def focus(self): - pass - #sublime.set_timeout(lambda: self.window.focus_view(self.view), 0) + sublime.set_timeout(lambda: sublime.active_window().focus_view(self.view), 0) def _on_view_modified(self, view): if not self.view: return @@ -106,6 +103,7 @@ def _on_view_close(self, view): if not self.view: return if view.id() == self.view.id() and self.doc: print("closed "+self.doc.name) + self.doc.close() SublimeListener.removeListener("modified", self._on_view_modified) SublimeListener.removeListener("close", self._on_view_close) self.doc.removeListener('insert', self._on_doc_insert) @@ -140,6 +138,7 @@ def _getText(self): return self.view.substr(sublime.Region(0, self.view.size())).replace('\r\n', '\n') def _replaceText(self, text): + if self._getText() == text: return edit = self.view.begin_edit() self.view.replace(edit, sublime.Region(0, self.view.size()), text) self.view.end_edit(edit) @@ -181,17 +180,34 @@ def open(self, name): return editors[name].focus() client.open(name, 'text', self.open_callback) + def add_current(self, name): + global client + if not client: return + if name in editors: + return editors[name].focus() + view = sublime.active_window().active_view() + client.open(name, 'text', lambda error, doc: self.add_callback(view, error, doc), snapshot=view.substr(sublime.Region(0, view.size()))) + def open_callback(self, error, doc): if error: sublime.error_message("Error opening document: {0}".format(error)) else: sublime.set_timeout(lambda: self.create_editor(doc), 0) + def add_callback(self, view, error, doc): + if error: + sublime.error_message("Error adding document: {0}".format(error)) + else: + sublime.set_timeout(lambda: self.add_editor(view, doc), 0) + def create_editor(self, doc): - global editors view = sublime.active_window().new_file() view.set_scratch(True) view.set_name(doc.name) + self.add_editor(view, doc) + + def add_editor(self, view, doc): + global editors editor = SublimeEditor(view, doc) editor.on('close', lambda: editors.pop(doc.name)) editors[doc.name] = editor @@ -207,20 +223,35 @@ def toggle_server(self): server.run_forever() print("server started") -class ConnectToServerCommand(sublime_plugin.ApplicationCommand, SublimeCollaboration): +class CollabConnectToServerCommand(sublime_plugin.ApplicationCommand, SublimeCollaboration): def run(self): sublime.active_window().show_input_panel("Enter server IP:", "localhost", self.connect, None, None) -class DisconnectFromServerCommand(sublime_plugin.ApplicationCommand, SublimeCollaboration): +class CollabDisconnectFromServerCommand(sublime_plugin.ApplicationCommand, SublimeCollaboration): def run(self): self.disconnect() + def is_enabled(self): + global client + return client -class ToggleServerCommand(sublime_plugin.ApplicationCommand, SublimeCollaboration): +class CollabToggleServerCommand(sublime_plugin.ApplicationCommand, SublimeCollaboration): def run(self): self.toggle_server() -class OpenDocumentCommand(sublime_plugin.ApplicationCommand, SublimeCollaboration): +class CollabOpenDocumentCommand(sublime_plugin.ApplicationCommand, SublimeCollaboration): def run(self): global client if not client: return - sublime.active_window().show_input_panel("Enter document name:", "blag", self.open, None, None) \ No newline at end of file + sublime.active_window().show_input_panel("Enter document name:", "blag", self.open, None, None) + def is_enabled(self): + global client + return client + +class CollabAddCurrentDocumentCommand(sublime_plugin.ApplicationCommand, SublimeCollaboration): + def run(self): + global client + if not client: return + sublime.active_window().show_input_panel("Enter document name:", "blag", self.add_current, None, None) + def is_enabled(self): + global client + return client \ No newline at end of file diff --git a/license.txt b/license.txt new file mode 100644 index 0000000..846faa3 --- /dev/null +++ b/license.txt @@ -0,0 +1,21 @@ +This is free and unencumbered software released into the public domain. +Anyone is free to copy, modify, publish, use, compile, sell, or +distribute this software, either in source code form or as a compiled +binary, for any purpose, commercial or non-commercial, and by any +means. + +In jurisdictions that recognize copyright laws, the author or authors +of this software dedicate any and all copyright interest in the +software to the public domain. We make this dedication for the benefit +of the public at large and to the detriment of our heirs and +successors. We intend this dedication to be an overt act of +relinquishment in perpetuity of all present and future rights to this +software under copyright law. + +The software is provided "as is", without warranty of any kind, +express or implied, including but not limited to the warranties of +merchantibility, fitness for a particular purpose and noninfringement. +In no event shall the authors be liable for any claim, damages or +other liability, whether in an action of contract, tort or otherwise, +arising from, out of or in connection with the software or the use or +other dealings in the software. \ No newline at end of file diff --git a/readme.md b/readme.md index 2ce5381..0c217dc 100644 --- a/readme.md +++ b/readme.md @@ -5,19 +5,23 @@ A real-time collaborative editing plugin for Sublime Text 2. Instructions ------------ -Please note that this plugin is in a very early beta state, and most if not all of the user interface and API will probably be rewritten at some point, so this is not representative of what the final product will be like. +Please note that this plugin is in an early beta state, and much of the user interface and API will probably be rewritten at some point, so this is not representative of what the final product will be like. ### Install Put everything in this repo into a folder named Collaboration under the /Data/Packages/ directory in your Sublime Text 2 folder. ### Usage -Currently the only way to interact with the plugin is through keyboard shortcuts. +You can run commands via keyboard shortcuts or with the command prompt via Ctrl-Shift-P. -#### Keyboard Shortcuts -*ctrl+alt+c* Connect to a server -*ctrl+alt+d* Disconnect from server -*ctrl+alt+o* Open a document on the server -*ctrl+alt+s* Toggle local server on and off +#### Commands +"Collaboration: Connect To Server": Connects to a Sublime Collaboration server. *ctrl+alt+c* +"Collaboration: Disconnect From Server": Disconnect from server. *ctrl+alt+d* +"Collaboration: Toggle Local Server": Toggles local server on and off. *ctrl+alt+s* +"Collaboration: Open Document": Open a new or preexisting document on the server. *ctrl+alt+o* +"Collaboration: Add Current Document": Uploads the currently open document to the server for collaborative editing. *ctrl+alt+a* #### Example -If you're just testing this out, first start a local server with ctrl+alt+s. Then connect to it with ctrl+alt+c. Then open a document with ctrl+alt+o. Currently you can't open the same document in the same Sublime Text process, so you'll need to connect to the server again on another computer to test out the collaborative aspects. It uses port 6633 if you need to make firewall rules. \ No newline at end of file +If you're just testing this out, first toggle on your local server. Then connect to it and open a new document with the "Open Document" command. Currently you can't open the same document in the same Sublime Text process, so you'll need to connect to the server again from another computer to test out the collaborative aspects. It uses port 6633 if you need to make firewall rules. If all goes well, you should see changes in one buffer replicated on the other! + +#### Bugs +If you find something that creates an error or doesn't seem to be working properly, please make an issue about it. There are bound to be errors that I don't catch, so any feedback would be appreciated!