Skip to content

Commit

Permalink
add useCount to DataSync for efficient count queries
Browse files Browse the repository at this point in the history
  • Loading branch information
mpscholten committed Dec 3, 2024
1 parent 0ac5d13 commit 9bc16f6
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 1 deletion.
43 changes: 43 additions & 0 deletions IHP/DataSync/ControllerImpl.hs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,49 @@ buildMessageHandler ensureRLSEnabled installTableChangeTriggers sendJSON handleC
MVar.takeMVar close
handleMessage CreateCountSubscription { query, requestId } = do
ensureBelowSubscriptionsLimit
tableNameRLS <- ensureRLSEnabled query.table
subscriptionId <- UUID.nextRandom
-- Allocate the close handle as early as possible
-- to make DeleteDataSubscription calls succeed even when the CountSubscription is
-- not fully set up yet
close <- MVar.newEmptyMVar
atomicModifyIORef'' ?state (\state -> state |> modify #subscriptions (HashMap.insert subscriptionId close))
let (theQuery, theParams) = compileQueryWithRenamer (renamer query.table) query
let countQuery = "SELECT COUNT(*) FROM (" <> theQuery <> ") AS _inner"
let
unpackResult :: [(Only Int)] -> Int
unpackResult [(Only value)] = value
unpackResult otherwise = error "DataSync.unpackResult: Expected INT, but got something else"
count <- unpackResult <$> sqlQueryWithRLS countQuery theParams
countRef <- newIORef count
installTableChangeTriggers tableNameRLS
let
callback :: ChangeNotifications.ChangeNotification -> IO ()
callback _ = do
newCount <- unpackResult <$> sqlQueryWithRLS countQuery theParams
lastCount <- readIORef countRef
when (newCount /= count) (sendJSON DidChangeCount { subscriptionId, count = newCount })
let subscribe = PGListener.subscribeJSON (ChangeNotifications.channelName tableNameRLS) callback pgListener
let unsubscribe subscription = PGListener.unsubscribe subscription pgListener
Exception.bracket subscribe unsubscribe \channelSubscription -> do
sendJSON DidCreateCountSubscription { subscriptionId, requestId, count }
MVar.takeMVar close
handleMessage DeleteDataSubscription { requestId, subscriptionId } = do
DataSyncReady { subscriptions } <- getState
case HashMap.lookup subscriptionId subscriptions of
Expand Down
3 changes: 3 additions & 0 deletions IHP/DataSync/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Control.Concurrent.MVar as MVar
data DataSyncMessage
= DataSyncQuery { query :: !DynamicSQLQuery, requestId :: !Int, transactionId :: !(Maybe UUID) }
| CreateDataSubscription { query :: !DynamicSQLQuery, requestId :: !Int }
| CreateCountSubscription { query :: !DynamicSQLQuery, requestId :: !Int }
| DeleteDataSubscription { subscriptionId :: !UUID, requestId :: !Int }
| CreateRecordMessage { table :: !Text, record :: !(HashMap Text Value), requestId :: !Int, transactionId :: !(Maybe UUID) }
| CreateRecordsMessage { table :: !Text, records :: ![HashMap Text Value], requestId :: !Int, transactionId :: !(Maybe UUID) }
Expand All @@ -31,10 +32,12 @@ data DataSyncResponse
| DataSyncError { requestId :: !Int, errorMessage :: !Text }
| FailedToDecodeMessageError { errorMessage :: !Text }
| DidCreateDataSubscription { requestId :: !Int, subscriptionId :: !UUID, result :: ![[Field]] }
| DidCreateCountSubscription { requestId :: !Int, subscriptionId :: !UUID, count :: !Int }
| DidDeleteDataSubscription { requestId :: !Int, subscriptionId :: !UUID }
| DidInsert { subscriptionId :: !UUID, record :: ![Field] }
| DidUpdate { subscriptionId :: !UUID, id :: UUID, changeSet :: !Value }
| DidDelete { subscriptionId :: !UUID, id :: !UUID }
| DidChangeCount { subscriptionId :: !UUID, count :: !Int }
| DidCreateRecord { requestId :: !Int, record :: ![Field] } -- ^ Response to 'CreateRecordMessage'
| DidCreateRecords { requestId :: !Int, records :: ![[Field]] } -- ^ Response to 'CreateRecordsMessage'
| DidUpdateRecord { requestId :: !Int, record :: ![Field] } -- ^ Response to 'UpdateRecordMessage'
Expand Down
2 changes: 2 additions & 0 deletions ihp-datasync-typescript/IHP/DataSync/TypeScript/Compiler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ declare module 'ihp-datasync/react' {
*/
function useQuery<table extends TableName, result>(queryBuilder: QueryBuilder<table, result>, options?: DataSubscriptionOptions): Array<result> | null;

function useCount<table extends TableName>(queryBuilder: QueryBuilder<table, any>): number | null;

/**
* A version of `useQuery` when you only want to fetch a single record.
*
Expand Down
2 changes: 2 additions & 0 deletions ihp-datasync-typescript/Test/Spec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ tests = do
*/
function useQuery<table extends TableName, result>(queryBuilder: QueryBuilder<table, result>, options?: DataSubscriptionOptions): Array<result> | null;

function useCount<table extends TableName>(queryBuilder: QueryBuilder<table, any>): number | null;

/**
* A version of `useQuery` when you only want to fetch a single record.
*
Expand Down
41 changes: 40 additions & 1 deletion lib/IHP/DataSync/react.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import React, { useState, useEffect, useContext, useSyncExternalStore } from 'react';
import React, { useState, useEffect, useContext, useSyncExternalStore, useRef, useMemo } from 'react';
import { DataSubscription, DataSyncController } from './ihp-datasync.js';

// Most IHP apps never use this context because they use session cookies for auth.
Expand Down Expand Up @@ -99,4 +99,43 @@ export class DataSubscriptionStore {
return subscription;
}
}
}

export function useCount(queryBuilder) {
const count = useRef(null);
const getSnapshot = useMemo(() => () => count.current, []);
const subscribe = useMemo(() => (onStoreChange) => {
const controller = DataSyncController.getInstance();
var isActive = true;
var subscriptionId = null;
const onMessage = (message) => {
if (message.tag === 'DidChangeCount' && message.subscriptionId === subscriptionId) {
count.current = message.count;
onStoreChange();
}
};
controller.sendMessage({ tag: 'CreateCountSubscription', query: queryBuilder.query })
.then((response) => {
if (isActive) {
subscriptionId = response.subscriptionId;
count.current = response.count;
onStoreChange();

controller.addEventListener('message', onMessage);
} else {
controller.sendMessage({ tag: 'DeleteDataSubscription', subscriptionId: response.subscriptionId });
}
})

return () => {
isActive = false;

if (subscriptionId) {
controller.sendMessage({ tag: 'DeleteDataSubscription', subscriptionId });
}
controller.removeEventListener('message', onMessage);
}
}, [JSON.stringify(queryBuilder.query)]);

return useSyncExternalStore(subscribe, getSnapshot);
}

0 comments on commit 9bc16f6

Please sign in to comment.