Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-5696] Have a deduplicating job worker #18

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# GHC and Cabal
skykanin marked this conversation as resolved.
Show resolved Hide resolved
/dist/
/dist-newstyle/
/cabal.project.local
Expand Down
1 change: 1 addition & 0 deletions example/Example.hs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ main = do
, ccMaxRunningJobs = 1
, ccProcessJob = processJob
, ccOnException = handleException
, ccMode = Standard
}

-- Add a job to the consumer's queue.
Expand Down
121 changes: 95 additions & 26 deletions src/Database/PostgreSQL/Consumers/Components.hs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ runConsumerWithMaybeIdleSignal cc cs mIdleSignal
initialJobs <- liftBase $ readTVarIO runningJobsInfo
(`fix` initialJobs) $ \loop jobsInfo -> do
-- If jobs are still running, display info about them.
when (not $ M.null jobsInfo) $ do
unless (M.null jobsInfo) $ do
logInfo "Waiting for running jobs" $ object [
"job_id" .= showJobsInfo jobsInfo
]
Expand All @@ -108,7 +108,7 @@ runConsumerWithMaybeIdleSignal cc cs mIdleSignal
-- If jobs info didn't change, wait for it to change.
-- Otherwise loop so it either displays the new info
-- or exits if there are no jobs running anymore.
if (newJobsInfo == jobsInfo)
if newJobsInfo == jobsInfo
then retry
else return $ loop newJobsInfo
where
Expand Down Expand Up @@ -167,7 +167,7 @@ spawnMonitor ConsumerConfig{..} cs cid = forkP "monitor" . forever $ do
if ok
then logInfo_ "Activity of the consumer updated"
else do
logInfo_ $ "Consumer is not registered"
logInfo_ "Consumer is not registered"
throwM ThreadKilled
-- Freeing jobs locked by inactive consumers needs to happen
-- exactly once, otherwise it's possible to free it twice, after
Expand Down Expand Up @@ -257,8 +257,12 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
let subtractJobs = atomically $ do
modifyTVar' runningJobs (subtract batchSize)
void . forkP "batch processor"
. (`finally` subtractJobs) . restore $ do
mapM startJob batch >>= mapM joinJob >>= updateJobs
. (`finally` subtractJobs) . restore $
-- Ensures that we only process one job at a time
-- when running in @'Duplicating'@ mode.
case batch of
Left job -> startJob job >>= joinJob >>= updateJob
Right jobs -> mapM startJob jobs >>= mapM joinJob >>= updateJobs

when (batchSize == limit) $ do
maxBatchSize <- atomically $ do
Expand All @@ -269,7 +273,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore

return (batchSize > 0)

reserveJobs :: Int -> m ([job], Int)
reserveJobs :: Int -> m (Either job [job], Int)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Either is quite confusing. Could you add a comment about the idea behind it so that people don't have to figure out what it's supposed to mean? I wonder if a simple data type wouldn't be better here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And by "better" I mean more readable...

reserveJobs limit = runDBT cs ts $ do
now <- currentTime
n <- runSQL $ smconcat [
Expand All @@ -283,21 +287,39 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
, "RETURNING" <+> mintercalate ", " ccJobSelectors
]
-- Decode lazily as we want the transaction to be as short as possible.
(, n) . F.toList . fmap ccJobFetcher <$> queryResult
(, n) . limitJobs . F.toList . fmap ccJobFetcher <$> queryResult
where
-- Reserve a single job or a list of jobs depending
-- on which @'ccMode'@ the consumer is running in.
limitJobs = case ccMode of
Standard -> Right
Duplicating _field -> Left . head
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem right.

You're picking the first job from the list and assume that it was the one with the highest id later in updateJob, but why? The query above doesn't sort on the id field. But even if you take the highest one, it's not guaranteed that you want to update all jobs with a lower id later (once looking at run_at is fixed in the reservedJobs query).

Uhh, this looks to be more complicated than I first thought it will be (even more so considering my other comment below).

reservedJobs :: UTCTime -> SQL
reservedJobs now = smconcat [
"SELECT id FROM" <+> raw ccJobsTable
, "WHERE"
, " reserved_by IS NULL"
, " AND run_at IS NOT NULL"
, " AND run_at <= " <?> now
, " ORDER BY run_at"
, "LIMIT" <?> limit
, "FOR UPDATE SKIP LOCKED"
]
reservedJobs now = case ccMode of
Standard -> smconcat [
"SELECT id FROM" <+> raw ccJobsTable
, "WHERE"
, " reserved_by IS NULL"
, " AND run_at IS NOT NULL"
, " AND run_at <= " <?> now
, " ORDER BY run_at"
, "LIMIT" <?> limit
, "FOR UPDATE SKIP LOCKED"
]
Duplicating "id" -> error "Cannot duplicate on the primary key field 'id'"
Duplicating field -> smconcat [
"WITH latest_for_id AS"
, " (SELECT id," <+> raw field <+> "FROM" <+> raw ccJobsTable
, " ORDER BY run_at," <+> raw field <> ", id DESC LIMIT 1 FOR UPDATE SKIP LOCKED),"
, " lock_all AS"
, " (SELECT id," <+> raw field <+> "FROM" <+> raw ccJobsTable
, " WHERE" <+> raw field <+> "= (SELECT" <+> raw field <+> "FROM latest_for_id)"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this almost entirely ignores run_at column? There's no run_at <= <?> now, so this would just process any job, even ones scheduled in the future, but even if the conditions was set, the de-duplicating job worker would not be very efficient at de-duplicating if jobs were scheduled into the future or when ccNotificationChannel is set.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. The mode should lock the group of jobs with the same deduplication id (dedId) that are scheduled to be processed with run_at <= now. If there are other jobs with this dedId scheduled for the future, they should be left alone.

The other problem here is not looking at the reserved_by column. However, introduction of reserved_by check like in the standard case doesn't fully solve the issue because even if a job is still being processed, there might be another row inserted after it started with the same dedId. And now it will be started in parallel to the old one and there's going to be a race :/

Copy link
Contributor

@marco44 marco44 Jul 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sorry, I feel like i'm arriving after the party…
so for what i see:

  • nothing is preventing us from having 2 sessions doing this at almost the same time and and both thinking that they want to work on the same value of "raw field", which i suppose is the dedId mentioned elsewhere ?
  • if we want to be 100% sure we don't have 2 jobs working on the same dedid at the same time, there is a more direct approach: we just lock this dedid. either in memory using an advisory lock (like select pg_advisory_lock(hash(dedid)) or something like that, or we just create a table for this with this dedid as the unique column and PK. When you want to work on a dedid, you insert a record there. when you have finished, you delete it and commit. noone will be able to work on it in the meantime. advisory locks are probably better here… you can tie them to a transaction or not (better in case you want them to be freed on error for instance), and you have the "try" function variants. So maybe it would be simpler to just:
  • find a candidate, get its deduplication id (the select for update skip lock should help us parallelize it), and try locking this dedid in shared memory. then the rest probably becomes simpler and more error proof ?

BTW, maybe i misunderstood how this work, i didn't look at the haskell code all around

, " AND id <= (SELECT id FROM latest_for_id)"
, " FOR UPDATE SKIP LOCKED)"
, "SELECT id FROM lock_all"
]

-- | Spawn each job in a separate thread.
-- Spawn each job in a separate thread.
startJob :: job -> m (job, m (T.Result Result))
startJob job = do
(_, joinFork) <- mask $ \restore -> T.fork $ do
Expand All @@ -311,7 +333,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
unregisterJob tid = atomically $ do
modifyTVar' runningJobsInfo $ M.delete tid

-- | Wait for all the jobs and collect their results.
-- Wait for all the jobs and collect their results.
joinJob :: (job, m (T.Result Result)) -> m (idx, Result)
joinJob (job, joinFork) = joinFork >>= \eres -> case eres of
Right result -> return (ccJobIndex job, result)
Expand All @@ -325,14 +347,61 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
]
return (ccJobIndex job, Failed action)

-- | Update status of the jobs.
-- Update the status of a job running in @'Duplicating'@ mode.
updateJob :: (idx, Result) -> m ()
skykanin marked this conversation as resolved.
Show resolved Hide resolved
updateJob (idx, result) = runDBT cs ts $ do
now <- currentTime
runSQL_ $ case result of
Ok Remove -> deleteQuery
-- TODO: Should we be deduplicating when a job fails with 'Remove' or only
-- remove the failing job?
Failed Remove -> deleteQuery
_ -> retryQuery now (isSuccess result) (getAction result)
where
deleteQuery = "DELETE FROM" <+> raw ccJobsTable <+> "WHERE" <+> raw idxRow <+> "<=" <?> idx

retryQuery now success action = smconcat
[ "UPDATE" <+> raw ccJobsTable <+> "SET"
, " reserved_by = NULL"
, ", run_at = CASE"
, " WHEN FALSE THEN run_at"
, retryToSQL
, " ELSE NULL" -- processed
, " END"
, if success then smconcat
[ ", finished_at = CASE"
, " WHEN id =" <?> idx <+> "THEN" <?> now
, " ELSE NULL"
, " END"
]
else ""
, "WHERE" <+> raw idxRow <+> "<=" <?> idx
]
where
retryToSQL = case action of
RerunAfter int -> "WHEN id =" <?> idx <+> "THEN" <?> now <+> "+" <?> int
RerunAt time -> "WHEN id =" <?> idx <+> "THEN" <?> time
MarkProcessed -> ""
Remove -> error "updateJob: 'Remove' should've been filtered out"

idxRow = case ccMode of
Standard -> error $ "'updateJob' should never be called when ccMode = " <> show Standard
skykanin marked this conversation as resolved.
Show resolved Hide resolved
Duplicating field -> field

isSuccess (Ok _) = True
isSuccess (Failed _) = False

getAction (Ok action) = action
getAction (Failed action) = action

-- Update the status of jobs running in @'Standard'@ mode.
updateJobs :: [(idx, Result)] -> m ()
updateJobs results = runDBT cs ts $ do
now <- currentTime
runSQL_ $ smconcat [
"WITH removed AS ("
runSQL_ $ smconcat
[ "WITH removed AS ("
, " DELETE FROM" <+> raw ccJobsTable
, " WHERE id = ANY(" <?> Array1 deletes <+> ")"
, " WHERE id = ANY (" <?> Array1 deletes <+> ")"
, ")"
, "UPDATE" <+> raw ccJobsTable <+> "SET"
, " reserved_by = NULL"
Expand All @@ -345,15 +414,15 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
, " WHEN id = ANY(" <?> Array1 successes <+> ") THEN " <?> now
, " ELSE NULL"
, " END"
, "WHERE id = ANY(" <?> Array1 (map fst updates) <+> ")"
, "WHERE id = ANY (" <?> Array1 (map fst updates) <+> ")"
]
where
retryToSQL now (Left int) ids =
("WHEN id = ANY(" <?> Array1 ids <+> ") THEN " <?> now <> " +" <?> int :)
retryToSQL _ (Right time) ids =
("WHEN id = ANY(" <?> Array1 ids <+> ") THEN" <?> time :)

retries = foldr step M.empty $ map f updates
retries = foldr (step . f) M.empty updates
where
f (idx, result) = case result of
Ok action -> (idx, action)
Expand All @@ -364,7 +433,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore
RerunAfter int -> M.insertWith (++) (Left int) [idx] iretries
RerunAt time -> M.insertWith (++) (Right time) [idx] iretries
Remove -> error
"updateJobs: Remove should've been filtered out"
"updateJobs: 'Remove' should've been filtered out"

successes = foldr step [] updates
where
Expand Down
25 changes: 24 additions & 1 deletion src/Database/PostgreSQL/Consumers/Config.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE ExistentialQuantification #-}
module Database.PostgreSQL.Consumers.Config (
Action(..)
, Mode(..)
, Result(..)
, ConsumerConfig(..)
) where
Expand All @@ -21,12 +22,30 @@ data Action
| RerunAfter Interval
| RerunAt UTCTime
| Remove
deriving (Eq, Ord, Show)
deriving (Eq, Ord, Show)

-- | Result of processing a job.
data Result = Ok Action | Failed Action
deriving (Eq, Ord, Show)

-- | The mode the consumer will run in:
--
-- * @'Standard'@ - Consumer jobs will be run in ascending order
-- based on the __run_at__ field. When jobs are updated,
-- ones that are marked for removal will be deleted.
--
-- * @'Duplicating' field@ - The job with the highest __id__ for
-- the lowest __run_at__ and __field__ value is selected. Then
-- all jobs that have the same __field__ value and a smaller or equal
-- __id__ value are reserved and run. When /one/ of these jobs are removed
-- all jobs with a smaller or equal __field_ value are also deleted. This
-- essentially allows one to race multiple jobs, only applying the result
-- of whichever job finishes first.
--
-- Note: One cannot duplicate on the primary key field named @id@ in the @'ccJobsTable'@.
data Mode = Standard | Duplicating (RawSQL ())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we allow deduplicating on the primary key row of the jobs table like ccMode = Duplicating "id"? If you do this right now you get an ambiguity error in the sql query used for reserving jobs in the consumer because part of the query used in reserveJobs becomes SELECT id, id ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My approach would be not to do that if it's not necessary for proper function now. It can be added later if there's a need for it. And maybe document it somewhere that you can't deduplicate based on fields that are called id.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicating should probably be a non-empty array of expressions as one may want to be able to de-duplicate on more than one expression. And the SQL expression type should be just SQL and not RawSQL ().

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RawSQL () is fine, it's for "sql literals", i.e. values that can't hold parameters.

skykanin marked this conversation as resolved.
Show resolved Hide resolved
deriving (Show)

-- | Config of a consumer.
data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig {
-- | Name of the database table where jobs are stored. The table needs to have
Expand Down Expand Up @@ -118,4 +137,8 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig {
-- Note that if this action throws an exception, the consumer goes
-- down, so it's best to ensure that it doesn't throw.
, ccOnException :: !(SomeException -> job -> m Action)
-- | The mode the consumer will use to reserve jobs.
-- In @'Duplicating'@ mode the SQL expression indicates which field
-- in the jobs table (specified by @'ccJobsTable'@) to select for duplication.
, ccMode :: Mode
}
2 changes: 1 addition & 1 deletion src/Database/PostgreSQL/Consumers/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ finalize m action = do

----------------------------------------

-- | Exception thrown to a thread to stop its execution.
-- Exception thrown to a thread to stop its execution.
-- All exceptions other than 'StopExecution' thrown to
-- threads spawned by 'forkP' and 'gforkP' are propagated
-- back to the parent thread.
Expand Down
Loading