Skip to content

Commit

Permalink
Added support for react 18's suspense feature in DataSync
Browse files Browse the repository at this point in the history
  • Loading branch information
mpscholten committed Dec 28, 2021
1 parent 3d727b1 commit 0d289cb
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 32 deletions.
103 changes: 96 additions & 7 deletions lib/IHP/DataSync/ihp-datasync.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,20 @@ class DataSubscription {
throw new Error("Query passed to `new DataSubscription(..)` doesn't look like a query object. If you're using the `query()` functions to costruct the object, make sure you pass the `.query` property, like this: `new DataSubscription(query('my_table').orderBy('createdAt').query)`");
}
this.query = query;
this.createOnServer();
this.createOnServerPromise = new Promise((resolve, reject) => {
this.resolveCreateOnServer = resolve;
this.rejectCreateOnServer = reject;
});

this.isClosed = false;
this.isConnected = false;
this.connectError = null;
this.subscriptionId = null;
this.subscribers = [];
this.records = [];

this.getRecords = this.getRecords.bind(this);
this.subscribe = this.subscribe.bind(this);
}

async createOnServer() {
Expand All @@ -163,6 +174,9 @@ class DataSubscription {
this.subscriptionId = subscriptionId;

dataSyncController.addEventListener('message', message => {
if (this.isClosed) {
return;
}
if (message.subscriptionId === this.subscriptionId) {
this.receiveUpdate(message);
}
Expand All @@ -171,23 +185,25 @@ class DataSubscription {
dataSyncController.addEventListener('close', this.onDataSyncClosed.bind(this));
dataSyncController.addEventListener('reconnect', this.onDataSyncReconnect.bind(this));

// @ts-expect-error
this.onReady(result);
this.isConnected = true;
this.records = result;

this.resolveCreateOnServer(result);
this.updateSubscribers();
} catch (e) {
throw new Error(e.message + ' while trying to subscribe to:\n' + JSON.stringify(this.query, null, 4));
this.connectError = new Error(e.message + ' while trying to subscribe to:\n' + JSON.stringify(this.query, null, 4));
this.rejectCreateOnServer(this.connectError);
throw this.connectError;
}
}

receiveUpdate(message) {
const tag = message.tag;
if (tag === 'DidUpdate') {
// @ts-expect-error
this.onUpdate(message.id, message.changeSet);
} else if (tag === 'DidInsert') {
// @ts-expect-error
this.onCreate(message.record);
} else if (tag === 'DidDelete') {
// @ts-expect-error
this.onDelete(message.id);
}
}
Expand All @@ -197,19 +213,92 @@ class DataSubscription {
return;
}

// We cannot close the DataSubscription when the subscriptionId is not assigned
if (!this.isClosed && !this.isConnected) {
await this.createOnServerPromise;
this.close();
return;
}

const dataSyncController = DataSyncController.getInstance();
const { subscriptionId } = await dataSyncController.sendMessage({ tag: 'DeleteDataSubscription', subscriptionId: this.subscriptionId });

this.isClosed = true;
this.isConnected = false;

// If a onClose function has been set on this subscription, call it
// Used by the react 18 integration to remove the closed connection from the DataSubscriptionStore
if ('onClose' in this) {
this.onClose();
}
}

onDataSyncClosed() {
this.isClosed = true;
this.isConnected = false;
}

async onDataSyncReconnect() {
await this.createOnServer();
}

onUpdate(id, changeSet) {
this.records = this.records.map(record => {
if (record.id === id) {
return Object.assign({}, record, changeSet);
}

return record;
});

this.updateSubscribers();
}

onCreate(newRecord) {
this.records = [...this.records, newRecord];
this.updateSubscribers();
}

onDelete(id) {
this.records = this.records.filter(record => record.id !== id);
this.updateSubscribers();
}

subscribe(callback) {
if (this.isClosed) {
throw new Error('Trying to subscribe to a DataSubscription that is already closed');
}

this.subscribers.push(callback);

return () => {
this.subscribers.splice(this.subscribers.indexOf(callback), 1);

this.closeIfNotUsed();
}
}

updateSubscribers() {
for (const subscriber of this.subscribers) {
subscriber(this.records);
}
}

getRecords() {
return this.records;
}

/**
* If there's no subscriber on this DataSubscription, we will close it.
*/
closeIfNotUsed() {
const isUsed = this.subscribers.length > 0;
if (isUsed) {
return;
}

this.close();
}
}

function initIHPBackend({ host }) {
Expand Down
34 changes: 9 additions & 25 deletions lib/IHP/DataSync/react.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,21 @@
import React, { useState, useEffect } from 'react';
import { DataSubscription } from './ihp-datasync.js';

// Usage:
//
// const messages = useQuery(query('messages').orderBy('createdAt'));
//
/**
* Returns the result of the current query in real-time. Returns `null` while the data is still being fetched from the server.
* @example
* const messages = useQuery(query('messages').orderBy('createdAt'));
*/
export function useQuery(queryBuilder) {
const [records, setRecords] = useState(null);

useEffect(() => {
const dataSubscription = new DataSubscription(queryBuilder.query);
dataSubscription.createOnServer();

dataSubscription.onReady = setRecords;
dataSubscription.onUpdate = (id, changeSet) => {
setRecords(records => {
for (const record of records) {
if (record.id === id) {
Object.assign(record, changeSet);
break;
}
}

return [...records];
});
}
dataSubscription.onCreate = newRecord => {
setRecords(records => [...records, newRecord]);
};
dataSubscription.onDelete = id => {
setRecords(records => records.filter(record => record.id !== id));
};

return () => { dataSubscription.close() };
// The dataSubscription is automatically closed when the last subscriber on
// the DataSubscription object has been unsubscribed
return dataSubscription.subscribe(setRecords);
}, [
JSON.stringify(queryBuilder.query) /* <-- It's terrible - but it works, we should find a better for this */
])
Expand Down
49 changes: 49 additions & 0 deletions lib/IHP/DataSync/react18.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import React, { useState, useEffect, useSyncExternalStore } from 'react';
import { DataSubscription } from './ihp-datasync.js';

/**
* Returns the result of the current query in real-time. Suspends while the data is still being fetched from the server.
* @example
* const messages = useQuery(query('messages').orderBy('createdAt'));
*/
export function useQuery(queryBuilder) {
const dataSubscription = DataSubscriptionStore.get(queryBuilder.query);

if (dataSubscription.isConnected) {
const records = useSyncExternalStore(dataSubscription.subscribe, dataSubscription.getRecords)
return records;
} else if (dataSubscription.connectError) {
throw dataSubscription.connectError;
} else {
throw dataSubscription.createOnServerPromise;
}
}

export class DataSubscriptionStore {
static queryMap = new Map();
static get(query) {
const strinigifiedQuery = JSON.stringify(query);
const existingSubscription = DataSubscriptionStore.queryMap.get(strinigifiedQuery)

if (existingSubscription) {
return existingSubscription;
} else {
const subscription = new DataSubscription(query);
subscription.createOnServer();
subscription.onClose = () => { DataSubscriptionStore.queryMap.delete(strinigifiedQuery); };

DataSubscriptionStore.queryMap.set(strinigifiedQuery, subscription);

// If the query changes very rapid in `useQuery` it can happen that the `dataSubscription.subscribe`
// is never called at all. In this case we have a unused DataSubscription laying around. We avoid
// to many open connections laying around by trying to close them a second after opening them.
// A second is enough time for react to call the subscribe function. If it's not called by then,
// we most likely deal with a dead subscription, so we close it.
setTimeout(() => {
subscription.closeIfNotUsed();
}, 1000);

return subscription;
}
}
}

0 comments on commit 0d289cb

Please sign in to comment.