diff --git a/.gitignore b/.gitignore index 5f6c423e..e6f12915 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ dist node_modules -tmp \ No newline at end of file +tmp +# Level db database +*dbDir* \ No newline at end of file diff --git a/bin/utils.js b/bin/utils.js index 72775d96..1da841e3 100644 --- a/bin/utils.js +++ b/bin/utils.js @@ -144,12 +144,13 @@ class WSSharedDoc extends Y.Doc { */ const getYDoc = (docname, gc = true) => map.setIfUndefined(docs, docname, () => { const doc = new WSSharedDoc(docname) + let docLoadedPromise = null doc.gc = gc if (persistence !== null) { - persistence.bindState(docname, doc) + docLoadedPromise = persistence.bindState(docname, doc) } docs.set(docname, doc) - return doc + return { doc, docLoadedPromise } }) exports.getYDoc = getYDoc @@ -237,10 +238,20 @@ const pingTimeout = 30000 exports.setupWSConnection = (conn, req, { docName = req.url.slice(1).split('?')[0], gc = true } = {}) => { conn.binaryType = 'arraybuffer' // get doc, initialize if it does not exist yet - const doc = getYDoc(docName, gc) + const { doc, docLoadedPromise } = getYDoc(docName, gc) doc.conns.set(conn, new Set()) + + // it might take some time to load the doc from leveldb + // but before then we still need to listen for websocket events + let isDocLoaded = docLoadedPromise ? false : true + let queuedMessages = [] + let isConnectionAlive = true + // listen and reply to events - conn.on('message', /** @param {ArrayBuffer} message */ message => messageListener(conn, doc, new Uint8Array(message))) + conn.on('message', /** @param {ArrayBuffer} message */ message => { + if (isDocLoaded) messageListener(conn, doc, new Uint8Array(message)) + else queuedMessages.push(new Uint8Array(message)) + }) // Check if connection is still alive let pongReceived = true @@ -248,6 +259,7 @@ exports.setupWSConnection = (conn, req, { docName = req.url.slice(1).split('?')[ if (!pongReceived) { if (doc.conns.has(conn)) { closeConn(doc, conn) + isConnectionAlive = false } clearInterval(pingInterval) } else if (doc.conns.has(conn)) { @@ -256,12 +268,14 @@ exports.setupWSConnection = (conn, req, { docName = req.url.slice(1).split('?')[ conn.ping() } catch (e) { closeConn(doc, conn) + isConnectionAlive = false clearInterval(pingInterval) } } }, pingTimeout) conn.on('close', () => { closeConn(doc, conn) + isConnectionAlive = false clearInterval(pingInterval) }) conn.on('pong', () => { @@ -269,7 +283,7 @@ exports.setupWSConnection = (conn, req, { docName = req.url.slice(1).split('?')[ }) // put the following in a variables in a block so the interval handlers don't keep in in // scope - { + const sendSyncStep1 = () =>{ // send sync step 1 const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageSync) @@ -283,4 +297,15 @@ exports.setupWSConnection = (conn, req, { docName = req.url.slice(1).split('?')[ send(doc, conn, encoding.toUint8Array(encoder)) } } + + if (docLoadedPromise) { + docLoadedPromise.then(() => { + if (!isConnectionAlive) return + + isDocLoaded = true + queuedMessages.forEach(message => messageListener(conn, doc, message)) + queuedMessages = null + sendSyncStep1() + }) + } } diff --git a/package.json b/package.json index 5f2a75f9..c74c6288 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "url": "https://github.com/sponsors/dmonad" }, "scripts": { - "start": "node ./bin/server.js", + "start": "HOST=localhost PORT=1234 YPERSISTENCE=./dbDir node ./bin/server.js", "dist": "rm -rf dist && rollup -c && tsc", "lint": "standard && tsc", "test": "npm run lint",