Skip to content

Commit

Permalink
rename: deduplication -> duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
skykanin committed Apr 20, 2023
1 parent 28ab14b commit f1e7cad
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/Database/PostgreSQL/Consumers/Components.hs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
-- reserve one job at a time.
(batch, batchSize) <- reserveJobs $ case ccMode of
Standard -> limit
Deduplicating _ -> 1
Duplicating _ -> 1
when (batchSize > 0) $ do
logInfo "Processing batch" $ object [
"batch_size" .= batchSize
Expand Down Expand Up @@ -301,7 +301,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
, "LIMIT" <?> limit
, "FOR UPDATE SKIP LOCKED"
]
Deduplicating field -> smconcat [
Duplicating field -> smconcat [
"WITH latest_for_id AS"
, " (SELECT id," <+> field <+> "FROM" <+> raw ccJobsTable
, " ORDER BY run_at," <+> field <> ", id" <+> "DESC LIMIT" <?> limit <+> "FOR UPDATE SKIP LOCKED),"
Expand Down
6 changes: 3 additions & 3 deletions src/Database/PostgreSQL/Consumers/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ data Result = Ok Action | Failed Action
deriving (Eq, Ord, Show)

-- | The mode the consumer will run in.
data Mode = Standard | Deduplicating SQL
data Mode = Standard | Duplicating SQL
deriving (Show)

-- | Config of a consumer.
Expand Down Expand Up @@ -124,7 +124,7 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig {
-- down, so it's best to ensure that it doesn't throw.
, ccOnException :: !(SomeException -> job -> m Action)
-- | The mode the consumer will use to reserve jobs.
-- In 'Deduplicating' mode the SQL expression indicates which field
-- to select for deduplication.
-- In 'Duplicating' mode the SQL expression indicates which field
-- to select for duplication.
, ccMode :: Mode
}
46 changes: 23 additions & 23 deletions test/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ main = do
void . Test.runTestTT $
Test.TestList
[ Test.TestLabel "Test standard consumer config" $ Test.TestCase (test connSource)
, Test.TestLabel "Test deduplicating consumer config" $ Test.TestCase (testDeduplicating connSource)
, Test.TestLabel "Test duplicating consumer config" $ Test.TestCase (testDuplicating connSource)
]

-- | Connect to the postgres database
Expand All @@ -95,34 +95,34 @@ connectToDB = do
prog <- getProgName
putStrLn $ "Usage: " <> prog <> " <connection info string>"

testDeduplicating :: ConnectionSource [MonadBase IO, MonadMask] -> IO ()
testDeduplicating (ConnectionSource connSource) =
testDuplicating :: ConnectionSource [MonadBase IO, MonadMask] -> IO ()
testDuplicating (ConnectionSource connSource) =
withSimpleStdOutLogger $ \logger -> runTestEnv connSource logger $ do
createTables
idleSignal <- liftIO newEmptyTMVarIO
let rows = 15
putJob rows "consumers_test_deduplicating_jobs" "consumers_test_deduplicating_chan" *> commit
putJob rows "consumers_test_duplicating_jobs" "consumers_test_duplicating_chan" *> commit

-- Move time forward 2hours, because job is scheduled 1 hour into future
modifyTestTime . addUTCTime $ 2*60*60
finalize (localDomain "process" $
runConsumerWithIdleSignal deduplicatingConsumerConfig connSource idleSignal) $
runConsumerWithIdleSignal duplicatingConsumerConfig connSource idleSignal) $
waitUntilTrue idleSignal
currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . show)

runSQL_ "SELECT COUNT(*) from person_test"
rowcount0 :: Int64 <- fetchOne runIdentity

runSQL_ "SELECT COUNT(*) from consumers_test_deduplicating_jobs"
runSQL_ "SELECT COUNT(*) from consumers_test_duplicating_jobs"
rowcount1 :: Int64 <- fetchOne runIdentity

liftIO $ Test.assertEqual "Number of rows in person_test is 2×rows" (2 * rows) (fromIntegral rowcount0)
liftIO $ Test.assertEqual "Job in consumers_test_deduplicating_jobs should be completed" 0 rowcount1
liftIO $ Test.assertEqual "Job in consumers_test_duplicating_jobs should be completed" 0 rowcount1

dropTables
where

tables = [deduplicatingConsumersTable, deduplicatingJobsTable, personTable]
tables = [duplicatingConsumersTable, duplicatingJobsTable, personTable]

migrations = createTableMigration <$> tables

Expand All @@ -139,31 +139,31 @@ testDeduplicating (ConnectionSource connSource) =
dropTables = do
migrateDatabase defaultExtrasOptions
{- extensions -} [] {- composites -} [] {- domains -} [] {- tables -} []
[ dropTableMigration deduplicatingJobsTable
, dropTableMigration deduplicatingConsumersTable
[ dropTableMigration duplicatingJobsTable
, dropTableMigration duplicatingConsumersTable
, dropTableMigration personTable
]

deduplicatingConsumerConfig = ConsumerConfig
{ ccJobsTable = "consumers_test_deduplicating_jobs"
, ccConsumersTable = "consumers_test_deduplicating_consumers"
duplicatingConsumerConfig = ConsumerConfig
{ ccJobsTable = "consumers_test_duplicating_jobs"
, ccConsumersTable = "consumers_test_duplicating_consumers"
, ccJobSelectors = ["id", "countdown"]
, ccJobFetcher = id
, ccJobIndex = fst
, ccNotificationChannel = Just "consumers_test_deduplicating_chan"
, ccNotificationChannel = Just "consumers_test_duplicating_chan"
-- select some small timeout
, ccNotificationTimeout = 100 * 1000 -- 100 msec
, ccMaxRunningJobs = 20
, ccProcessJob = insertNRows . snd
, ccOnException = \err (idx, _) -> handleException err idx
, ccMode = Deduplicating "countdown"
, ccMode = Duplicating "countdown"
}

insertNRows :: Int32 -> TestEnv Result
insertNRows count = do
replicateM_ (fromIntegral count) $ do
runSQL_ "INSERT INTO person_test (name, age) VALUES ('Anna', 20)"
notify "consumers_test_deduplicating_chan" ""
notify "consumers_test_duplicating_chan" ""
pure $ Ok Remove

test :: ConnectionSource [MonadBase IO, MonadMask] -> IO ()
Expand Down Expand Up @@ -304,10 +304,10 @@ personTable =
, tblPrimaryKey = pkOnColumn "id"
}

deduplicatingJobsTable :: Table
deduplicatingJobsTable =
duplicatingJobsTable :: Table
duplicatingJobsTable =
tblTable
{ tblName = "consumers_test_deduplicating_jobs"
{ tblName = "consumers_test_duplicating_jobs"
, tblVersion = 1
, tblColumns =
[ tblColumn { colName = "id", colType = BigSerialT
Expand All @@ -329,7 +329,7 @@ deduplicatingJobsTable =
]
, tblPrimaryKey = pkOnColumn "id"
, tblForeignKeys = [
(fkOnColumn "reserved_by" "consumers_test_deduplicating_consumers" "id") {
(fkOnColumn "reserved_by" "consumers_test_duplicating_consumers" "id") {
fkOnDelete = ForeignKeySetNull
}
]
Expand All @@ -351,10 +351,10 @@ consumersTable =
, tblPrimaryKey = pkOnColumn "id"
}

deduplicatingConsumersTable :: Table
deduplicatingConsumersTable =
duplicatingConsumersTable :: Table
duplicatingConsumersTable =
tblTable
{ tblName = "consumers_test_deduplicating_consumers"
{ tblName = "consumers_test_duplicating_consumers"
, tblVersion = 1
, tblColumns =
[ tblColumn { colName = "id", colType = BigSerialT
Expand Down

0 comments on commit f1e7cad

Please sign in to comment.