Skip to content

Commit

Permalink
Merge pull request #1103 from digitallyinduced/datasync
Browse files Browse the repository at this point in the history
DataSync
  • Loading branch information
mpscholten authored Oct 7, 2021
2 parents 958dbb3 + 434c377 commit f01ec32
Show file tree
Hide file tree
Showing 17 changed files with 1,114 additions and 1 deletion.
1 change: 1 addition & 0 deletions .ghci
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
:set -XBlockArguments
:set -XMultiWayIf
:set -XFunctionalDependencies
:set -XTemplateHaskell
:set -fbyte-code
:set -Wno-partial-type-signatures
:set -XPartialTypeSignatures
Expand Down
126 changes: 126 additions & 0 deletions IHP/DataSync/ChangeNotifications.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
module IHP.DataSync.ChangeNotifications
( watchInsertOrUpdateTable
, eventName
, ChangeNotification (..)
) where

import IHP.Prelude
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Database.PostgreSQL.Simple.Notification as PG
import Control.Concurrent.Async
import IHP.ModelSupport
import Data.String.Interpolate.IsString (i)
import Data.Aeson
import Data.Aeson.TH
import qualified Data.Text as Text
import qualified Control.Concurrent.MVar as MVar
import IHP.DataSync.DynamicQuery (transformColumnNamesToFieldNames)
import qualified IHP.DataSync.RowLevelSecurity as RLS

data ChangeNotification
= DidInsert { id :: UUID }
| DidUpdate { id :: UUID, changeSet :: Value }
| DidDelete { id :: UUID }

-- | The table is wrapped as a TableWithRLS to ensure that the RLS has been checked before calling this
watchInsertOrUpdateTable :: (?modelContext :: ModelContext) => RLS.TableWithRLS -> IO (MVar.MVar ChangeNotification)
watchInsertOrUpdateTable table = do
let tableName = table |> get #tableName
let (listenStatement, listenArgs) = ("LISTEN ?", [PG.Identifier (eventName tableName)])

latestNotification <- MVar.newEmptyMVar

async do

withDatabaseConnection \databaseConnection -> do
PG.execute databaseConnection (PG.Query $ cs $ createNotificationFunction tableName) ()

forever do
notification <- withDatabaseConnection \databaseConnection -> do
PG.execute databaseConnection listenStatement listenArgs
PG.getNotification databaseConnection

case decode (cs $ get #notificationData notification) of
Just notification -> MVar.putMVar latestNotification notification

Nothing -> pure ()

pure latestNotification

-- | Returns the sql code to set up a database trigger. Mainly used by 'watchInsertOrUpdateTable'.
createNotificationFunction :: Text -> Text
createNotificationFunction tableName = [i|
BEGIN;

CREATE OR REPLACE FUNCTION #{functionName}() RETURNS TRIGGER AS $$
DECLARE
updated_values jsonb;
BEGIN
CASE TG_OP
WHEN 'UPDATE' THEN
SELECT jsonb_object_agg(n.key, n.value)
INTO updated_values
FROM jsonb_each(to_jsonb(OLD)) o
JOIN jsonb_each(to_jsonb(NEW)) n USING (key)
WHERE n.value IS DISTINCT FROM o.value;

PERFORM pg_notify(
'#{eventName tableName}',
json_build_object(
'UPDATE', NEW.id::text,
'CHANGESET', updated_values
)::text
);
WHEN 'DELETE' THEN
PERFORM pg_notify(
'#{eventName tableName}',
(json_build_object('DELETE', OLD.id)::text)
);
WHEN 'INSERT' THEN
PERFORM pg_notify(
'#{eventName tableName}',
json_build_object('INSERT', NEW.id)::text
);
END CASE;
RETURN new;
END;
$$ language plpgsql;

DROP TRIGGER IF EXISTS #{insertTriggerName} ON #{tableName};
DROP TRIGGER IF EXISTS #{updateTriggerName} ON #{tableName};
DROP TRIGGER IF EXISTS #{deleteTriggerName} ON #{tableName};


CREATE TRIGGER #{insertTriggerName} AFTER INSERT ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();
CREATE TRIGGER #{updateTriggerName} AFTER UPDATE ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();
CREATE TRIGGER #{deleteTriggerName} AFTER DELETE ON "#{tableName}" FOR EACH ROW EXECUTE PROCEDURE #{functionName}();

COMMIT;

|]

where
functionName = "notify_did_change_" <> tableName
insertTriggerName = "did_insert_" <> tableName
updateTriggerName = "did_update_" <> tableName
deleteTriggerName = "did_delete_" <> tableName

-- | Returns the event name of the event that the pg notify trigger dispatches
eventName :: Text -> Text
eventName tableName = "did_change_" <> tableName


instance FromJSON ChangeNotification where
parseJSON = withObject "ChangeNotification" $ \values -> insert values <|> update values <|> delete values
where
insert values = do
id <- values .: "INSERT"
pure DidInsert { id }
update values = do
id <- values .: "UPDATE"
changeSet <- values .: "CHANGESET"
pure $ DidUpdate id (transformColumnNamesToFieldNames changeSet)
delete values = DidDelete <$> values .: "DELETE"

$(deriveToJSON defaultOptions 'DidInsert)
174 changes: 174 additions & 0 deletions IHP/DataSync/Controller.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
{-# LANGUAGE UndecidableInstances #-}
module IHP.DataSync.Controller where

import IHP.ControllerPrelude
import qualified Control.Exception as Exception
import qualified IHP.Log as Log
import qualified Data.Aeson as Aeson

import Data.Aeson.TH
import Data.Aeson
import qualified IHP.QueryBuilder as QueryBuilder
import qualified Database.PostgreSQL.Simple as PG
import qualified Database.PostgreSQL.Simple.FromField as PG
import qualified Database.PostgreSQL.Simple.FromRow as PG
import qualified Database.PostgreSQL.Simple.ToField as PG
import qualified Database.PostgreSQL.Simple.Types as PG
import qualified Database.PostgreSQL.Simple.Notification as PG
import qualified Data.HashMap.Strict as HashMap
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent.MVar as MVar
import IHP.DataSync.Types
import IHP.DataSync.RowLevelSecurity
import IHP.DataSync.DynamicQuery
import IHP.DataSync.DynamicQueryCompiler
import qualified IHP.DataSync.ChangeNotifications as ChangeNotifications

instance (
PG.ToField (PrimaryKey (GetTableName CurrentUserRecord))
, Show (PrimaryKey (GetTableName CurrentUserRecord))
, HasNewSessionUrl CurrentUserRecord
, Typeable CurrentUserRecord
, HasField "id" CurrentUserRecord (Id' (GetTableName CurrentUserRecord))
) => WSApp DataSyncController where
initialState = DataSyncController

run = do
setState DataSyncReady { subscriptions = HashMap.empty }

let maybeUserId = get #id <$> currentUserOrNothing

let
handleMessage :: DataSyncMessage -> IO ()
handleMessage DataSyncQuery { query, requestId } = do
ensureRLSEnabled (get #table query)

let (theQuery, theParams) = compileQuery query

result :: [[Field]] <- withRLS $ sqlQuery theQuery theParams

sendJSON DataSyncResult { result, requestId }

handleMessage CreateDataSubscription { query, requestId } = do
tableNameRLS <- ensureRLSEnabled (get #table query)

subscriptionId <- UUID.nextRandom

let (theQuery, theParams) = compileQuery query

result :: [[Field]] <- withRLS $ sqlQuery theQuery theParams

let tableName = get #table query

-- We need to keep track of all the ids of entities we're watching to make
-- sure that we only send update notifications to clients that can actually
-- access the record (e.g. if a RLS policy denies access)
let watchedRecordIds = recordIds result

-- Store it in IORef as an INSERT requires us to add an id
watchedRecordIdsRef <- newIORef watchedRecordIds

notificationStream <- ChangeNotifications.watchInsertOrUpdateTable tableNameRLS

streamReader <- async do
forever do
notification <- MVar.takeMVar notificationStream
case notification of
ChangeNotifications.DidInsert { id } -> do
-- The new record could not be accessible to the current user with a RLS policy
-- E.g. it could be a new record in a 'projects' table, but the project belongs
-- to a different user, and thus the current user should not be able to see it.
--
-- To honor the RLS policies we therefore need to fetch the record as the current user
-- If the result set is empty, we know the record is not accesible to us
newRecord :: [[Field]] <- withRLS $ sqlQuery "SELECT * FROM ? WHERE id = ? LIMIT 1" (PG.Identifier (get #table query), id)
case headMay newRecord of
Just record -> do
-- Add the new record to 'watchedRecordIdsRef'
-- Otherwise the updates and deletes will not be dispatched to the client
modifyIORef watchedRecordIdsRef (id:)
sendJSON DidInsert { subscriptionId, record }
Nothing -> pure ()
ChangeNotifications.DidUpdate { id, changeSet } -> do
-- Only send the notifcation if the deleted record was part of the initial
-- results set
isWatchingRecord <- elem id <$> readIORef watchedRecordIdsRef
when isWatchingRecord do
sendJSON DidUpdate { subscriptionId, id, changeSet }
ChangeNotifications.DidDelete { id } -> do
-- Only send the notifcation if the deleted record was part of the initial
-- results set
isWatchingRecord <- elem id <$> readIORef watchedRecordIdsRef
when isWatchingRecord do
sendJSON DidDelete { subscriptionId, id }
modifyIORef ?state (\state -> state |> modify #subscriptions (HashMap.insert subscriptionId Subscription { id = subscriptionId, tableWatcher = streamReader }))
sendJSON DidCreateDataSubscription { subscriptionId, requestId, result }
handleMessage DeleteDataSubscription { requestId, subscriptionId } = do
DataSyncReady { subscriptions } <- getState
let maybeSubscription :: Maybe Subscription = HashMap.lookup subscriptionId subscriptions
-- Cancel table watcher
case maybeSubscription of
Just subscription -> cancel (get #tableWatcher subscription)
Nothing -> pure ()
modifyIORef ?state (\state -> state |> modify #subscriptions (HashMap.delete subscriptionId))
sendJSON DidDeleteDataSubscription { subscriptionId, requestId }
forever do
message <- Aeson.decode <$> receiveData @LByteString
case message of
Just decodedMessage -> do
let requestId = get #requestId decodedMessage
-- Handle the messages in an async way
-- This increases throughput as multiple queries can be fetched
-- in parallel
async do
result <- Exception.try (handleMessage decodedMessage)
case result of
Left (e :: Exception.SomeException) -> do
Log.error (tshow e)
sendJSON DataSyncError { requestId }
Right result -> pure ()
pure ()
Nothing -> sendJSON FailedToDecodeMessageError
onClose = cleanupAllSubscriptions
cleanupAllSubscriptions :: (?state :: IORef DataSyncController) => IO ()
cleanupAllSubscriptions = do
state <- getState
case state of
DataSyncReady { subscriptions } -> do
forEach (subscriptions |> HashMap.elems) \subscription -> do
cancel (get #tableWatcher subscription)
pure ()
_ -> pure ()
sendJSON payload = sendTextData (Aeson.encode payload)
queryFieldNamesToColumnNames :: SQLQuery -> SQLQuery
queryFieldNamesToColumnNames sqlQuery = sqlQuery
|> modify #orderByClause (map convertOrderByClause)
where
convertOrderByClause OrderByClause { orderByColumn, orderByDirection } = OrderByClause { orderByColumn = cs (fieldNameToColumnName (cs orderByColumn)), orderByDirection }
$(deriveFromJSON defaultOptions 'DataSyncQuery)
$(deriveToJSON defaultOptions 'DataSyncResult)
instance SetField "subscriptions" DataSyncController (HashMap UUID Subscription) where
setField subscriptions record = record { subscriptions }
Loading

0 comments on commit f01ec32

Please sign in to comment.