diff --git a/.gitignore b/.gitignore index 2fd2c86..814d49e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +# GHC and Cabal /dist/ /dist-newstyle/ /cabal.project.local diff --git a/example/Example.hs b/example/Example.hs index 5887ff9..c2f50fd 100644 --- a/example/Example.hs +++ b/example/Example.hs @@ -102,6 +102,7 @@ main = do , ccMaxRunningJobs = 1 , ccProcessJob = processJob , ccOnException = handleException + , ccMode = Standard } -- Add a job to the consumer's queue. diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index 144422d..726aeab 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -95,7 +95,7 @@ runConsumerWithMaybeIdleSignal cc cs mIdleSignal initialJobs <- liftBase $ readTVarIO runningJobsInfo (`fix` initialJobs) $ \loop jobsInfo -> do -- If jobs are still running, display info about them. - when (not $ M.null jobsInfo) $ do + unless (M.null jobsInfo) $ do logInfo "Waiting for running jobs" $ object [ "job_id" .= showJobsInfo jobsInfo ] @@ -108,7 +108,7 @@ runConsumerWithMaybeIdleSignal cc cs mIdleSignal -- If jobs info didn't change, wait for it to change. -- Otherwise loop so it either displays the new info -- or exits if there are no jobs running anymore. - if (newJobsInfo == jobsInfo) + if newJobsInfo == jobsInfo then retry else return $ loop newJobsInfo where @@ -167,7 +167,7 @@ spawnMonitor ConsumerConfig{..} cs cid = forkP "monitor" . forever $ do if ok then logInfo_ "Activity of the consumer updated" else do - logInfo_ $ "Consumer is not registered" + logInfo_ "Consumer is not registered" throwM ThreadKilled -- Freeing jobs locked by inactive consumers needs to happen -- exactly once, otherwise it's possible to free it twice, after @@ -257,8 +257,12 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore let subtractJobs = atomically $ do modifyTVar' runningJobs (subtract batchSize) void . forkP "batch processor" - . (`finally` subtractJobs) . restore $ do - mapM startJob batch >>= mapM joinJob >>= updateJobs + . (`finally` subtractJobs) . restore $ + -- Ensures that we only process one job at a time + -- when running in @'Duplicating'@ mode. + case batch of + Left job -> startJob job >>= joinJob >>= updateJob + Right jobs -> mapM startJob jobs >>= mapM joinJob >>= updateJobs when (batchSize == limit) $ do maxBatchSize <- atomically $ do @@ -269,7 +273,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore return (batchSize > 0) - reserveJobs :: Int -> m ([job], Int) + reserveJobs :: Int -> m (Either job [job], Int) reserveJobs limit = runDBT cs ts $ do now <- currentTime n <- runSQL $ smconcat [ @@ -283,21 +287,39 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , "RETURNING" <+> mintercalate ", " ccJobSelectors ] -- Decode lazily as we want the transaction to be as short as possible. - (, n) . F.toList . fmap ccJobFetcher <$> queryResult + (, n) . limitJobs . F.toList . fmap ccJobFetcher <$> queryResult where + -- Reserve a single job or a list of jobs depending + -- on which @'ccMode'@ the consumer is running in. + limitJobs = case ccMode of + Standard -> Right + Duplicating _field -> Left . head reservedJobs :: UTCTime -> SQL - reservedJobs now = smconcat [ - "SELECT id FROM" <+> raw ccJobsTable - , "WHERE" - , " reserved_by IS NULL" - , " AND run_at IS NOT NULL" - , " AND run_at <= " now - , " ORDER BY run_at" - , "LIMIT" limit - , "FOR UPDATE SKIP LOCKED" - ] + reservedJobs now = case ccMode of + Standard -> smconcat [ + "SELECT id FROM" <+> raw ccJobsTable + , "WHERE" + , " reserved_by IS NULL" + , " AND run_at IS NOT NULL" + , " AND run_at <= " now + , " ORDER BY run_at" + , "LIMIT" limit + , "FOR UPDATE SKIP LOCKED" + ] + Duplicating "id" -> error "Cannot duplicate on the primary key field 'id'" + Duplicating field -> smconcat [ + "WITH latest_for_id AS" + , " (SELECT id," <+> raw field <+> "FROM" <+> raw ccJobsTable + , " ORDER BY run_at," <+> raw field <> ", id DESC LIMIT 1 FOR UPDATE SKIP LOCKED)," + , " lock_all AS" + , " (SELECT id," <+> raw field <+> "FROM" <+> raw ccJobsTable + , " WHERE" <+> raw field <+> "= (SELECT" <+> raw field <+> "FROM latest_for_id)" + , " AND id <= (SELECT id FROM latest_for_id)" + , " FOR UPDATE SKIP LOCKED)" + , "SELECT id FROM lock_all" + ] - -- | Spawn each job in a separate thread. + -- Spawn each job in a separate thread. startJob :: job -> m (job, m (T.Result Result)) startJob job = do (_, joinFork) <- mask $ \restore -> T.fork $ do @@ -311,7 +333,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore unregisterJob tid = atomically $ do modifyTVar' runningJobsInfo $ M.delete tid - -- | Wait for all the jobs and collect their results. + -- Wait for all the jobs and collect their results. joinJob :: (job, m (T.Result Result)) -> m (idx, Result) joinJob (job, joinFork) = joinFork >>= \eres -> case eres of Right result -> return (ccJobIndex job, result) @@ -325,14 +347,61 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore ] return (ccJobIndex job, Failed action) - -- | Update status of the jobs. + -- Update the status of a job running in @'Duplicating'@ mode. + updateJob :: (idx, Result) -> m () + updateJob (idx, result) = runDBT cs ts $ do + now <- currentTime + runSQL_ $ case result of + Ok Remove -> deleteQuery + -- TODO: Should we be deduplicating when a job fails with 'Remove' or only + -- remove the failing job? + Failed Remove -> deleteQuery + _ -> retryQuery now (isSuccess result) (getAction result) + where + deleteQuery = "DELETE FROM" <+> raw ccJobsTable <+> "WHERE" <+> raw idxRow <+> "<=" idx + + retryQuery now success action = smconcat + [ "UPDATE" <+> raw ccJobsTable <+> "SET" + , " reserved_by = NULL" + , ", run_at = CASE" + , " WHEN FALSE THEN run_at" + , retryToSQL + , " ELSE NULL" -- processed + , " END" + , if success then smconcat + [ ", finished_at = CASE" + , " WHEN id =" idx <+> "THEN" now + , " ELSE NULL" + , " END" + ] + else "" + , "WHERE" <+> raw idxRow <+> "<=" idx + ] + where + retryToSQL = case action of + RerunAfter int -> "WHEN id =" idx <+> "THEN" now <+> "+" int + RerunAt time -> "WHEN id =" idx <+> "THEN" time + MarkProcessed -> "" + Remove -> error "updateJob: 'Remove' should've been filtered out" + + idxRow = case ccMode of + Standard -> error $ "'updateJob' should never be called when ccMode = " <> show Standard + Duplicating field -> field + + isSuccess (Ok _) = True + isSuccess (Failed _) = False + + getAction (Ok action) = action + getAction (Failed action) = action + + -- Update the status of jobs running in @'Standard'@ mode. updateJobs :: [(idx, Result)] -> m () updateJobs results = runDBT cs ts $ do now <- currentTime - runSQL_ $ smconcat [ - "WITH removed AS (" + runSQL_ $ smconcat + [ "WITH removed AS (" , " DELETE FROM" <+> raw ccJobsTable - , " WHERE id = ANY(" Array1 deletes <+> ")" + , " WHERE id = ANY (" Array1 deletes <+> ")" , ")" , "UPDATE" <+> raw ccJobsTable <+> "SET" , " reserved_by = NULL" @@ -345,7 +414,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , " WHEN id = ANY(" Array1 successes <+> ") THEN " now , " ELSE NULL" , " END" - , "WHERE id = ANY(" Array1 (map fst updates) <+> ")" + , "WHERE id = ANY (" Array1 (map fst updates) <+> ")" ] where retryToSQL now (Left int) ids = @@ -353,7 +422,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore retryToSQL _ (Right time) ids = ("WHEN id = ANY(" Array1 ids <+> ") THEN" time :) - retries = foldr step M.empty $ map f updates + retries = foldr (step . f) M.empty updates where f (idx, result) = case result of Ok action -> (idx, action) @@ -364,7 +433,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore RerunAfter int -> M.insertWith (++) (Left int) [idx] iretries RerunAt time -> M.insertWith (++) (Right time) [idx] iretries Remove -> error - "updateJobs: Remove should've been filtered out" + "updateJobs: 'Remove' should've been filtered out" successes = foldr step [] updates where diff --git a/src/Database/PostgreSQL/Consumers/Config.hs b/src/Database/PostgreSQL/Consumers/Config.hs index a45370b..bd13b8a 100644 --- a/src/Database/PostgreSQL/Consumers/Config.hs +++ b/src/Database/PostgreSQL/Consumers/Config.hs @@ -1,6 +1,7 @@ {-# LANGUAGE ExistentialQuantification #-} module Database.PostgreSQL.Consumers.Config ( Action(..) + , Mode(..) , Result(..) , ConsumerConfig(..) ) where @@ -21,12 +22,30 @@ data Action | RerunAfter Interval | RerunAt UTCTime | Remove - deriving (Eq, Ord, Show) + deriving (Eq, Ord, Show) -- | Result of processing a job. data Result = Ok Action | Failed Action deriving (Eq, Ord, Show) +-- | The mode the consumer will run in: +-- +-- * @'Standard'@ - Consumer jobs will be run in ascending order +-- based on the __run_at__ field. When jobs are updated, +-- ones that are marked for removal will be deleted. +-- +-- * @'Duplicating' field@ - The job with the highest __id__ for +-- the lowest __run_at__ and __field__ value is selected. Then +-- all jobs that have the same __field__ value and a smaller or equal +-- __id__ value are reserved and run. When /one/ of these jobs are removed +-- all jobs with a smaller or equal __field_ value are also deleted. This +-- essentially allows one to race multiple jobs, only applying the result +-- of whichever job finishes first. +-- +-- Note: One cannot duplicate on the primary key field named @id@ in the @'ccJobsTable'@. +data Mode = Standard | Duplicating (RawSQL ()) + deriving (Show) + -- | Config of a consumer. data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig { -- | Name of the database table where jobs are stored. The table needs to have @@ -118,4 +137,8 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig { -- Note that if this action throws an exception, the consumer goes -- down, so it's best to ensure that it doesn't throw. , ccOnException :: !(SomeException -> job -> m Action) +-- | The mode the consumer will use to reserve jobs. +-- In @'Duplicating'@ mode the SQL expression indicates which field +-- in the jobs table (specified by @'ccJobsTable'@) to select for duplication. +, ccMode :: Mode } diff --git a/src/Database/PostgreSQL/Consumers/Utils.hs b/src/Database/PostgreSQL/Consumers/Utils.hs index a6bd8e2..e079602 100644 --- a/src/Database/PostgreSQL/Consumers/Utils.hs +++ b/src/Database/PostgreSQL/Consumers/Utils.hs @@ -27,7 +27,7 @@ finalize m action = do ---------------------------------------- --- | Exception thrown to a thread to stop its execution. +-- Exception thrown to a thread to stop its execution. -- All exceptions other than 'StopExecution' thrown to -- threads spawned by 'forkP' and 'gforkP' are propagated -- back to the parent thread. diff --git a/test/Test.hs b/test/Test.hs index 289364f..1f351c3 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -11,6 +11,7 @@ module Main where +import Data.Monoid.Utils import Database.PostgreSQL.Consumers import Database.PostgreSQL.PQTypes import Database.PostgreSQL.PQTypes.Checks @@ -37,7 +38,7 @@ import System.Exit import TextShow import qualified Data.Text as T -import qualified Test.HUnit as T +import qualified Test.HUnit as Test data TestEnvSt = TestEnvSt { teCurrentTime :: UTCTime @@ -64,33 +65,113 @@ modifyTestTime :: (MonadState TestEnvSt m) => (UTCTime -> UTCTime) -> m () modifyTestTime modtime = modify (\te -> te { teCurrentTime = modtime . teCurrentTime $ te }) runTestEnv :: ConnectionSourceM (LogT IO) -> Logger -> TestEnv a -> IO a -runTestEnv connSource logger m = - (runLogT "consumers-test" logger defaultLogLevel) - . (runDBT connSource defaultTransactionSettings) - . (\m' -> fst <$> (runStateT m' $ TestEnvSt (UTCTime (ModifiedJulianDay 0) 0) 0)) +runTestEnv connSource logger = + runLogT "consumers-test" logger defaultLogLevel + . runDBT connSource defaultTransactionSettings + . (\m' -> evalStateT m' (TestEnvSt (UTCTime (ModifiedJulianDay 0) 0) 0)) . unTestEnv - $ m main :: IO () -main = void $ T.runTestTT $ T.TestCase test +main = do + connSource <- connectToDB + void . Test.runTestTT $ + Test.TestList + [ Test.TestLabel "Test standard (non-duplicating) consumer config" $ Test.TestCase (test connSource) + , Test.TestLabel "Test duplicating consumer config" $ Test.TestCase (testDuplicating connSource) + ] -test :: IO () -test = do +connectToDB :: IO (ConnectionSource [MonadBase IO, MonadMask]) +connectToDB = do connString <- getArgs >>= \case connString : _args -> return $ T.pack connString [] -> lookupEnv "GITHUB_ACTIONS" >>= \case Just "true" -> return "host=postgres user=postgres password=postgres" _ -> printUsage >> exitFailure - let connSettings = defaultConnectionSettings - { csConnInfo = connString } - ConnectionSource connSource = simpleSource connSettings + pure $ simpleSource defaultConnectionSettings { csConnInfo = connString } + where + printUsage = do + prog <- getProgName + putStrLn $ "Usage: " <> prog <> " " - withSimpleStdOutLogger $ \logger -> +testDuplicating :: ConnectionSource [MonadBase IO, MonadMask] -> IO () +testDuplicating (ConnectionSource connSource) = + withStdOutLogger $ \logger -> runTestEnv connSource logger $ do + createTables + idleSignal <- liftIO newEmptyTMVarIO + let rows = 15 + putJob rows "consumers_test_duplicating_jobs" "consumers_test_duplicating_chan" *> commit + + -- Move time forward 2 hours, because job is scheduled 1 hour into future + modifyTestTime . addUTCTime $ 2*60*60 + finalize (localDomain "process" $ + runConsumerWithIdleSignal duplicatingConsumerConfig connSource idleSignal) $ + waitUntilTrue idleSignal + currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show) + + runSQL_ "SELECT COUNT(*) from person_test" + rowcount0 :: Int64 <- fetchOne runIdentity + + runSQL_ "SELECT COUNT(*) from consumers_test_duplicating_jobs" + rowcount1 :: Int64 <- fetchOne runIdentity + + liftIO $ Test.assertEqual "Number of rows in person_test is 2×rows" (2 * rows) (fromIntegral rowcount0) + liftIO $ Test.assertEqual "Job in consumers_test_duplicating_jobs should be completed" 0 rowcount1 + + dropTables + where + + tables = [duplicatingConsumersTable, duplicatingJobsTable, personTable] + + migrations = createTableMigration <$> tables + + createTables :: TestEnv () + createTables = do + migrateDatabase defaultExtrasOptions + {- extensions -} [] {- composites -} [] {- domains -} [] + tables migrations + checkDatabase defaultExtrasOptions + {- composites -} [] {- domains -} [] + tables + + dropTables :: TestEnv () + dropTables = do + migrateDatabase defaultExtrasOptions + {- extensions -} [] {- composites -} [] {- domains -} [] {- tables -} [] + [ dropTableMigration duplicatingJobsTable + , dropTableMigration duplicatingConsumersTable + , dropTableMigration personTable + ] + + duplicatingConsumerConfig = ConsumerConfig + { ccJobsTable = "consumers_test_duplicating_jobs" + , ccConsumersTable = "consumers_test_duplicating_consumers" + , ccJobSelectors = ["id", "countdown"] + , ccJobFetcher = id + , ccJobIndex = snd + , ccNotificationChannel = Just "consumers_test_duplicating_chan" + -- select some small timeout + , ccNotificationTimeout = 100 * 1000 -- msec + , ccMaxRunningJobs = 20 + , ccProcessJob = insertNRows . snd + , ccOnException = \err (idx, _) -> handleException err idx + , ccMode = Duplicating "countdown" + } + +insertNRows :: Int32 -> TestEnv Result +insertNRows count = do + replicateM_ (fromIntegral count) $ do + runSQL_ "INSERT INTO person_test (name, age) VALUES ('Anna', 20)" + notify "consumers_test_duplicating_chan" "" + pure $ Ok Remove + +test :: ConnectionSource [MonadBase IO, MonadMask] -> IO () +test (ConnectionSource connSource) = + withStdOutLogger $ \logger -> runTestEnv connSource logger $ do createTables - idleSignal <- liftIO $ atomically $ newEmptyTMVar - putJob 10 >> commit + idleSignal <- liftIO newEmptyTMVarIO + putJob 10 "consumers_test_jobs" "consumers_test_chan" *> commit forM_ [1..10::Int] $ \_ -> do -- Move time forward 2hours, because jobs are scheduled 1 hour into future @@ -101,7 +182,7 @@ test = do currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show) -- Each job creates 2 new jobs, so there should be 1024 jobs in table. - runSQL_ $ "SELECT COUNT(*) from consumers_test_jobs" + runSQL_ "SELECT COUNT(*) from consumers_test_jobs" rowcount0 :: Int64 <- fetchOne runIdentity -- Move time 2 hours forward modifyTestTime $ addUTCTime (2*60*60) @@ -109,22 +190,16 @@ test = do runConsumerWithIdleSignal consumerConfig connSource idleSignal) $ do waitUntilTrue idleSignal -- Jobs are designed to double only 10 times, so there should be no jobs left now. - runSQL_ $ "SELECT COUNT(*) from consumers_test_jobs" + runSQL_ "SELECT COUNT(*) from consumers_test_jobs" rowcount1 :: Int64 <- fetchOne runIdentity - liftIO $ T.assertEqual "Number of jobs in table after 10 steps is 1024" 1024 rowcount0 - liftIO $ T.assertEqual "Number of jobs in table after 11 steps is 0" 0 rowcount1 + liftIO $ Test.assertEqual "Number of jobs in table consumers_test_jobs 10 steps is 1024" 1024 rowcount0 + liftIO $ Test.assertEqual "Number of jobs in table consumers_test_jobs 11 steps is 0" 0 rowcount1 dropTables where - waitUntilTrue tmvar = whileM_ (not <$> (liftIO $ atomically $ takeTMVar tmvar)) $ return () - - printUsage = do - prog <- getProgName - putStrLn $ "Usage: " <> prog <> " " tables = [consumersTable, jobsTable] -- NB: order of migrations is important. - migrations = [ createTableMigration consumersTable - , createTableMigration jobsTable ] + migrations = createTableMigration <$> tables createTables :: TestEnv () createTables = do @@ -140,44 +215,48 @@ test = do migrateDatabase defaultExtrasOptions {- extensions -} [] {- composites -} [] {- domains -} [] {- tables -} [] [ dropTableMigration jobsTable - , dropTableMigration consumersTable ] + , dropTableMigration consumersTable + ] consumerConfig = ConsumerConfig { ccJobsTable = "consumers_test_jobs" , ccConsumersTable = "consumers_test_consumers" , ccJobSelectors = ["id", "countdown"] , ccJobFetcher = id - , ccJobIndex = \(i::Int64, _::Int32) -> i + , ccJobIndex = fst , ccNotificationChannel = Just "consumers_test_chan" -- select some small timeout , ccNotificationTimeout = 100 * 1000 -- 100 msec , ccMaxRunningJobs = 20 - , ccProcessJob = processJob - , ccOnException = handleException + , ccProcessJob = processJob "consumers_test_jobs" "consumers_test_chan" + , ccOnException = \err (idx, _) -> handleException err idx + , ccMode = Standard } - putJob :: Int32 -> TestEnv () - putJob countdown = localDomain "put" $ do - now <- currentTime - runSQL_ $ "INSERT INTO consumers_test_jobs " - <> "(run_at, finished_at, reserved_by, attempts, countdown) " - <> "VALUES (" now <> " + interval '1 hour', NULL, NULL, 0, " countdown <> ")" - notify "consumers_test_chan" "" - - processJob :: (Int64, Int32) -> TestEnv Result - processJob (_idx, countdown) = do - when (countdown > 0) $ do - putJob (countdown - 1) - putJob (countdown - 1) - commit - return (Ok Remove) - - handleException :: SomeException -> (Int64, Int32) -> TestEnv Action - handleException exc (idx, _countdown) = do - logAttention_ $ - "Job #" <> showt idx <> " failed with: " <> showt exc - return . RerunAfter $ imicroseconds 500000 +waitUntilTrue :: MonadIO m => TMVar Bool -> m () +waitUntilTrue tmvar = whileM_ (not <$> liftIO (atomically $ takeTMVar tmvar)) $ pure () + +putJob :: Int32 -> SQL -> Channel -> TestEnv () +putJob countdown tableName notifyChan = localDomain "put" $ do + now <- currentTime + runSQL_ $ "INSERT INTO" <+> tableName + <+> "(run_at, finished_at, reserved_by, attempts, countdown)" + <+> "VALUES (" now <+> "+ interval '1 hour', NULL, NULL, 0, " countdown <> ")" + notify notifyChan "" +processJob :: SQL -> Channel -> (Int64, Int32) -> TestEnv Result +processJob tableName notifyChan (_idx, countdown) = do + when (countdown > 0) $ do + putJob (countdown - 1) tableName notifyChan + putJob (countdown - 1) tableName notifyChan + commit + return (Ok Remove) + +handleException :: SomeException -> Int64 -> TestEnv Action +handleException exc idx = do + logAttention_ $ + "Job #" <> showt idx <> " failed with: " <> showt exc + return . RerunAfter $ imicroseconds 500000 jobsTable :: Table jobsTable = @@ -208,17 +287,95 @@ jobsTable = ] } +personTable :: Table +personTable = + tblTable + { tblName = "person_test" + , tblVersion = 1 + , tblColumns = + [ tblColumn { colName = "id" + , colType = BigSerialT + , colNullable = False } + , tblColumn { colName = "name" + , colType = TextT + , colNullable = False } + , tblColumn { colName = "age" + , colType = IntegerT + , colNullable = False } + ] + , tblPrimaryKey = pkOnColumn "id" + } + +duplicatingJobsTable :: Table +duplicatingJobsTable = + tblTable + { tblName = "consumers_test_duplicating_jobs" + , tblVersion = 1 + , tblColumns = + [ tblColumn { colName = "id" + , colType = BigSerialT + , colNullable = False } + , tblColumn { colName = "run_at" + , colType = TimestampWithZoneT + , colNullable = True } + , tblColumn { colName = "finished_at" + , colType = TimestampWithZoneT + , colNullable = True } + , tblColumn { colName = "reserved_by" + , colType = BigIntT + , colNullable = True } + , tblColumn { colName = "attempts" + , colType = IntegerT + , colNullable = False } + + -- Non-obligatory field "countdown". Really more of a count + -- and not a countdown, but name is kept to that we can reuse + -- `putJob` function. + , tblColumn { colName = "countdown" + , colType = IntegerT + , colNullable = False } + ] + , tblPrimaryKey = pkOnColumn "id" + , tblForeignKeys = [ + (fkOnColumn "reserved_by" "consumers_test_duplicating_consumers" "id") { + fkOnDelete = ForeignKeySetNull + } + ] + } + consumersTable :: Table consumersTable = tblTable { tblName = "consumers_test_consumers" , tblVersion = 1 , tblColumns = - [ tblColumn { colName = "id", colType = BigSerialT + [ tblColumn { colName = "id" + , colType = BigSerialT + , colNullable = False } + , tblColumn { colName = "name" + , colType = TextT + , colNullable = False } + , tblColumn { colName = "last_activity" + , colType = TimestampWithZoneT + , colNullable = False } + ] + , tblPrimaryKey = pkOnColumn "id" + } + +duplicatingConsumersTable :: Table +duplicatingConsumersTable = + tblTable + { tblName = "consumers_test_duplicating_consumers" + , tblVersion = 1 + , tblColumns = + [ tblColumn { colName = "id" + , colType = BigSerialT , colNullable = False } - , tblColumn { colName = "name", colType = TextT + , tblColumn { colName = "name" + , colType = TextT , colNullable = False } - , tblColumn { colName = "last_activity", colType = TimestampWithZoneT + , tblColumn { colName = "last_activity" + , colType = TimestampWithZoneT , colNullable = False } ] , tblPrimaryKey = pkOnColumn "id"