diff --git a/lib/IHP/DataSync/ihp-datasync.js b/lib/IHP/DataSync/ihp-datasync.js index 1b6d79bfd..2c1e76e90 100644 --- a/lib/IHP/DataSync/ihp-datasync.js +++ b/lib/IHP/DataSync/ihp-datasync.js @@ -150,9 +150,20 @@ class DataSubscription { throw new Error("Query passed to `new DataSubscription(..)` doesn't look like a query object. If you're using the `query()` functions to costruct the object, make sure you pass the `.query` property, like this: `new DataSubscription(query('my_table').orderBy('createdAt').query)`"); } this.query = query; - this.createOnServer(); + this.createOnServerPromise = new Promise((resolve, reject) => { + this.resolveCreateOnServer = resolve; + this.rejectCreateOnServer = reject; + }); + this.isClosed = false; + this.isConnected = false; + this.connectError = null; this.subscriptionId = null; + this.subscribers = []; + this.records = []; + + this.getRecords = this.getRecords.bind(this); + this.subscribe = this.subscribe.bind(this); } async createOnServer() { @@ -163,6 +174,9 @@ class DataSubscription { this.subscriptionId = subscriptionId; dataSyncController.addEventListener('message', message => { + if (this.isClosed) { + return; + } if (message.subscriptionId === this.subscriptionId) { this.receiveUpdate(message); } @@ -171,23 +185,25 @@ class DataSubscription { dataSyncController.addEventListener('close', this.onDataSyncClosed.bind(this)); dataSyncController.addEventListener('reconnect', this.onDataSyncReconnect.bind(this)); - // @ts-expect-error - this.onReady(result); + this.isConnected = true; + this.records = result; + + this.resolveCreateOnServer(result); + this.updateSubscribers(); } catch (e) { - throw new Error(e.message + ' while trying to subscribe to:\n' + JSON.stringify(this.query, null, 4)); + this.connectError = new Error(e.message + ' while trying to subscribe to:\n' + JSON.stringify(this.query, null, 4)); + this.rejectCreateOnServer(this.connectError); + throw this.connectError; } } receiveUpdate(message) { const tag = message.tag; if (tag === 'DidUpdate') { - // @ts-expect-error this.onUpdate(message.id, message.changeSet); } else if (tag === 'DidInsert') { - // @ts-expect-error this.onCreate(message.record); } else if (tag === 'DidDelete') { - // @ts-expect-error this.onDelete(message.id); } } @@ -197,19 +213,92 @@ class DataSubscription { return; } + // We cannot close the DataSubscription when the subscriptionId is not assigned + if (!this.isClosed && !this.isConnected) { + await this.createOnServerPromise; + this.close(); + return; + } + const dataSyncController = DataSyncController.getInstance(); const { subscriptionId } = await dataSyncController.sendMessage({ tag: 'DeleteDataSubscription', subscriptionId: this.subscriptionId }); this.isClosed = true; + this.isConnected = false; + + // If a onClose function has been set on this subscription, call it + // Used by the react 18 integration to remove the closed connection from the DataSubscriptionStore + if ('onClose' in this) { + this.onClose(); + } } onDataSyncClosed() { this.isClosed = true; + this.isConnected = false; } async onDataSyncReconnect() { await this.createOnServer(); } + + onUpdate(id, changeSet) { + this.records = this.records.map(record => { + if (record.id === id) { + return Object.assign({}, record, changeSet); + } + + return record; + }); + + this.updateSubscribers(); + } + + onCreate(newRecord) { + this.records = [...this.records, newRecord]; + this.updateSubscribers(); + } + + onDelete(id) { + this.records = this.records.filter(record => record.id !== id); + this.updateSubscribers(); + } + + subscribe(callback) { + if (this.isClosed) { + throw new Error('Trying to subscribe to a DataSubscription that is already closed'); + } + + this.subscribers.push(callback); + + return () => { + this.subscribers.splice(this.subscribers.indexOf(callback), 1); + + this.closeIfNotUsed(); + } + } + + updateSubscribers() { + for (const subscriber of this.subscribers) { + subscriber(this.records); + } + } + + getRecords() { + return this.records; + } + + /** + * If there's no subscriber on this DataSubscription, we will close it. + */ + closeIfNotUsed() { + const isUsed = this.subscribers.length > 0; + if (isUsed) { + return; + } + + this.close(); + } } function initIHPBackend({ host }) { diff --git a/lib/IHP/DataSync/react.js b/lib/IHP/DataSync/react.js index dbfc11bdf..bf1ea2a72 100644 --- a/lib/IHP/DataSync/react.js +++ b/lib/IHP/DataSync/react.js @@ -1,37 +1,21 @@ import React, { useState, useEffect } from 'react'; import { DataSubscription } from './ihp-datasync.js'; -// Usage: -// -// const messages = useQuery(query('messages').orderBy('createdAt')); -// +/** + * Returns the result of the current query in real-time. Returns `null` while the data is still being fetched from the server. + * @example + * const messages = useQuery(query('messages').orderBy('createdAt')); + */ export function useQuery(queryBuilder) { const [records, setRecords] = useState(null); useEffect(() => { const dataSubscription = new DataSubscription(queryBuilder.query); + dataSubscription.createOnServer(); - dataSubscription.onReady = setRecords; - dataSubscription.onUpdate = (id, changeSet) => { - setRecords(records => { - for (const record of records) { - if (record.id === id) { - Object.assign(record, changeSet); - break; - } - } - - return [...records]; - }); - } - dataSubscription.onCreate = newRecord => { - setRecords(records => [...records, newRecord]); - }; - dataSubscription.onDelete = id => { - setRecords(records => records.filter(record => record.id !== id)); - }; - - return () => { dataSubscription.close() }; + // The dataSubscription is automatically closed when the last subscriber on + // the DataSubscription object has been unsubscribed + return dataSubscription.subscribe(setRecords); }, [ JSON.stringify(queryBuilder.query) /* <-- It's terrible - but it works, we should find a better for this */ ]) diff --git a/lib/IHP/DataSync/react18.js b/lib/IHP/DataSync/react18.js new file mode 100644 index 000000000..dbb8af1de --- /dev/null +++ b/lib/IHP/DataSync/react18.js @@ -0,0 +1,49 @@ +import React, { useState, useEffect, useSyncExternalStore } from 'react'; +import { DataSubscription } from './ihp-datasync.js'; + +/** + * Returns the result of the current query in real-time. Suspends while the data is still being fetched from the server. + * @example + * const messages = useQuery(query('messages').orderBy('createdAt')); + */ +export function useQuery(queryBuilder) { + const dataSubscription = DataSubscriptionStore.get(queryBuilder.query); + + if (dataSubscription.isConnected) { + const records = useSyncExternalStore(dataSubscription.subscribe, dataSubscription.getRecords) + return records; + } else if (dataSubscription.connectError) { + throw dataSubscription.connectError; + } else { + throw dataSubscription.createOnServerPromise; + } +} + +export class DataSubscriptionStore { + static queryMap = new Map(); + static get(query) { + const strinigifiedQuery = JSON.stringify(query); + const existingSubscription = DataSubscriptionStore.queryMap.get(strinigifiedQuery) + + if (existingSubscription) { + return existingSubscription; + } else { + const subscription = new DataSubscription(query); + subscription.createOnServer(); + subscription.onClose = () => { DataSubscriptionStore.queryMap.delete(strinigifiedQuery); }; + + DataSubscriptionStore.queryMap.set(strinigifiedQuery, subscription); + + // If the query changes very rapid in `useQuery` it can happen that the `dataSubscription.subscribe` + // is never called at all. In this case we have a unused DataSubscription laying around. We avoid + // to many open connections laying around by trying to close them a second after opening them. + // A second is enough time for react to call the subscribe function. If it's not called by then, + // we most likely deal with a dead subscription, so we close it. + setTimeout(() => { + subscription.closeIfNotUsed(); + }, 1000); + + return subscription; + } + } +} \ No newline at end of file