From 98f18e689960d64e5debb773669ce71c139df1b7 Mon Sep 17 00:00:00 2001 From: Marc Scholten Date: Thu, 26 Nov 2020 17:05:14 +0100 Subject: [PATCH 1/4] Added IHP.Job --- IHP/ControllerPrelude.hs | 5 +- IHP/IDE/CodeGen/Controller.hs | 18 +++ IHP/IDE/CodeGen/JobGenerator.hs | 79 ++++++++++ IHP/IDE/CodeGen/View/Generators.hs | 50 ++----- IHP/IDE/CodeGen/View/NewJob.hs | 65 +++++++++ IHP/IDE/ToolServer/Layout.hs | 3 + IHP/IDE/ToolServer/Types.hs | 2 + IHP/Job/Queue.hs | 186 ++++++++++++++++++++++++ IHP/Job/Runner.hs | 133 +++++++++++++++++ IHP/Job/Types.hs | 40 +++++ IHP/SchemaCompiler.hs | 2 + IHP/Server.hs | 28 +++- IHP/ViewPrelude.hs | 6 +- Test/IDE/CodeGeneration/JobGenerator.hs | 45 ++++++ Test/Main.hs | 2 + lib/IHP/IHPSchema.sql | 5 +- lib/IHP/Makefile.dist | 16 ++ 17 files changed, 639 insertions(+), 46 deletions(-) create mode 100644 IHP/IDE/CodeGen/JobGenerator.hs create mode 100644 IHP/IDE/CodeGen/View/NewJob.hs create mode 100644 IHP/Job/Queue.hs create mode 100644 IHP/Job/Runner.hs create mode 100644 IHP/Job/Types.hs create mode 100644 Test/IDE/CodeGeneration/JobGenerator.hs diff --git a/IHP/ControllerPrelude.hs b/IHP/ControllerPrelude.hs index 94ad62eaa..c1edf1f03 100644 --- a/IHP/ControllerPrelude.hs +++ b/IHP/ControllerPrelude.hs @@ -25,6 +25,7 @@ module IHP.ControllerPrelude , module IHP.Modal.Types , module IHP.Modal.ControllerFunctions , module IHP.Controller.Layout + , module IHP.Job.Types , Only (..) ) where import IHP.Prelude @@ -55,4 +56,6 @@ import IHP.Controller.Context import IHP.Controller.Layout import IHP.Modal.Types -import IHP.Modal.ControllerFunctions \ No newline at end of file +import IHP.Modal.ControllerFunctions + +import IHP.Job.Types \ No newline at end of file diff --git a/IHP/IDE/CodeGen/Controller.hs b/IHP/IDE/CodeGen/Controller.hs index d013d31f6..3826c41e0 100644 --- a/IHP/IDE/CodeGen/Controller.hs +++ b/IHP/IDE/CodeGen/Controller.hs @@ -10,6 +10,7 @@ import IHP.IDE.CodeGen.View.NewMail import IHP.IDE.CodeGen.View.NewAction import IHP.IDE.CodeGen.View.NewApplication import IHP.IDE.CodeGen.View.NewMigration +import IHP.IDE.CodeGen.View.NewJob import IHP.IDE.CodeGen.Types import IHP.IDE.CodeGen.ControllerGenerator as ControllerGenerator import IHP.IDE.CodeGen.ScriptGenerator as ScriptGenerator @@ -17,6 +18,7 @@ import IHP.IDE.CodeGen.ViewGenerator as ViewGenerator import IHP.IDE.CodeGen.MailGenerator as MailGenerator import IHP.IDE.CodeGen.ActionGenerator as ActionGenerator import IHP.IDE.CodeGen.ApplicationGenerator as ApplicationGenerator +import IHP.IDE.CodeGen.JobGenerator as JobGenerator import IHP.IDE.ToolServer.Helper.Controller import qualified System.Process as Process import qualified System.Directory as Directory @@ -157,6 +159,22 @@ instance Controller CodeGenController where openEditor path 0 0 redirectTo GeneratorsAction + action NewJobAction = do + let jobName = paramOrDefault "" "name" + let applicationName = paramOrDefault "Web" "applicationName" + controllers <- findControllers applicationName + applications <- findApplications + plan <- JobGenerator.buildPlan jobName applicationName + render NewJobView { .. } + + action CreateJobAction = do + let jobName = paramOrDefault "" "name" + let applicationName = "Web" + (Right plan) <- JobGenerator.buildPlan jobName applicationName + executePlan plan + setSuccessMessage "Job generated" + redirectTo GeneratorsAction + action OpenControllerAction = do let name = param "name" case name |> Inflector.toCamelCased True of diff --git a/IHP/IDE/CodeGen/JobGenerator.hs b/IHP/IDE/CodeGen/JobGenerator.hs new file mode 100644 index 000000000..4dda1bf88 --- /dev/null +++ b/IHP/IDE/CodeGen/JobGenerator.hs @@ -0,0 +1,79 @@ +module IHP.IDE.CodeGen.JobGenerator (buildPlan, buildPlan', JobConfig (..)) where + +import IHP.Prelude +import IHP.HaskellSupport +import qualified Data.Text as Text +import qualified Data.Text.IO as Text +import IHP.ViewSupport +import qualified System.Process as Process +import IHP.IDE.CodeGen.Types +import qualified IHP.IDE.SchemaDesigner.Parser as SchemaDesigner +import IHP.IDE.SchemaDesigner.Types +import qualified Text.Countable as Countable + +data JobConfig = JobConfig + { applicationName :: Text + , tableName :: Text -- | E.g. create_container_jobs + , modelName :: Text -- | E.g. CreateContainerJob + } deriving (Eq, Show) + +buildPlan :: Text -> Text -> IO (Either Text [GeneratorAction]) +buildPlan jobName applicationName = + if null jobName + then pure $ Left "Job name can be empty" + else do + let jobConfig = JobConfig + { applicationName + , tableName = jobName + , modelName = tableNameToModelName jobName + } + pure $ Right $ buildPlan' jobConfig + +-- E.g. qualifiedMailModuleName config "Confirmation" == "Web.Mail.Users.Confirmation" +qualifiedJobModuleName :: JobConfig -> Text +qualifiedJobModuleName config = + get #applicationName config <> ".Job." <> unqualifiedJobModuleName config + +unqualifiedJobModuleName :: JobConfig -> Text +unqualifiedJobModuleName config = Text.replace "Job" "" (get #modelName config) + +buildPlan' :: JobConfig -> [GeneratorAction] +buildPlan' config = + let + name = get #modelName config + tableName = modelNameToTableName name + nameWithSuffix = if "Job" `isSuffixOf` name + then name + else name <> "Job" --e.g. "Test" -> "TestJob" + nameWithoutSuffix = if "Job" `isSuffixOf` name + then Text.replace "Job" "" name + else name --e.g. "TestJob" -> "Test"" + + job = + "" + <> "module " <> qualifiedJobModuleName config <> " where\n" + <> "import " <> get #applicationName config <> ".Controller.Prelude\n" + <> "\n" + <> "instance Job " <> nameWithSuffix <> " where\n" + <> " perform " <> name <> " { .. } = do\n" + <> " putStrLn \"Hello World!\"\n" + + schemaSql = + "" + <> "CREATE TABLE " <> tableName <> " (\n" + <> " id UUID DEFAULT uuid_generate_v4() PRIMARY KEY NOT NULL,\n" + <> " created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n" + <> " updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n" + <> " status JOB_STATUS DEFAULT 'job_status_not_started' NOT NULL,\n" + <> " last_error TEXT DEFAULT NULL,\n" + <> " attempts_count INT DEFAULT 0 NOT NULL,\n" + <> " locked_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,\n" + <> " locked_by UUID DEFAULT NULL\n" + <> ");\n" + in + [ EnsureDirectory { directory = get #applicationName config <> "/Job" } + , CreateFile { filePath = get #applicationName config <> "/Job/" <> nameWithoutSuffix <> ".hs", fileContent = job } + , AppendToFile { filePath = "Application/Schema.sql", fileContent = schemaSql } + , AppendToMarker { marker = "-- Job Imports", filePath = get #applicationName config <> "/Worker.hs", fileContent = ("import " <> qualifiedJobModuleName config) } + --, AddImport { filePath = get #applicationName config <> "/Controller/" <> controllerName <> ".hs", fileContent = "import " <> qualifiedViewModuleName config nameWithoutSuffix } + ] diff --git a/IHP/IDE/CodeGen/View/Generators.hs b/IHP/IDE/CodeGen/View/Generators.hs index 5ba838b49..312e08839 100644 --- a/IHP/IDE/CodeGen/View/Generators.hs +++ b/IHP/IDE/CodeGen/View/Generators.hs @@ -16,44 +16,26 @@ instance View GeneratorsView where
{renderFlashMessages}
- -
{copyIcon}
-
Controller
-
- - -
{copyIcon}
-
Action
-
- - -
{copyIcon}
-
View
-
- - -
{copyIcon}
-
Mail
-
- - -
{copyIcon}
-
Script
-
- - -
{databaseIcon}
-
Migration
-
- - -
{copyIcon}
-
Application
-
+ {generator "Controller" (pathTo NewControllerAction) copyIcon} + {generator "Action" (pathTo NewActionAction) copyIcon} + {generator "View" (pathTo NewViewAction) copyIcon} + {generator "Mail" (pathTo NewMailAction) copyIcon} + {generator "Background Job" (pathTo NewJobAction) cogsIcon} + {generator "Script" (pathTo NewScriptAction) copyIcon} + {generator "Migration" (pathTo NewMigrationAction) databaseIcon} + {generator "Application" (pathTo NewApplicationAction) copyIcon}
|] + where + generator :: Text -> Text -> Html -> Html + generator name path icon = [hsx| + +
{icon}
+
{name}
+
+ |] renderPlan (Left error) = [hsx|{error}|] renderPlan (Right actions) = [hsx|
{forEach actions renderGeneratorAction}
|] diff --git a/IHP/IDE/CodeGen/View/NewJob.hs b/IHP/IDE/CodeGen/View/NewJob.hs new file mode 100644 index 000000000..305a10988 --- /dev/null +++ b/IHP/IDE/CodeGen/View/NewJob.hs @@ -0,0 +1,65 @@ +module IHP.IDE.CodeGen.View.NewJob where + +import IHP.ViewPrelude +import IHP.IDE.SchemaDesigner.Types +import IHP.IDE.ToolServer.Types +import IHP.IDE.ToolServer.Layout +import IHP.IDE.SchemaDesigner.View.Layout +import IHP.IDE.CodeGen.Types +import IHP.IDE.CodeGen.View.Generators (renderPlan) +import qualified Data.Text as Text +import qualified Data.Text.IO as Text + +data NewJobView = NewJobView + { plan :: Either Text [GeneratorAction] + , jobName :: Text + , applicationName :: Text + , applications :: [Text] + } + +instance View NewJobView where + html NewJobView { .. } = [hsx| +
+ {renderFlashMessages} +
+
+ {if isEmpty jobName then renderEmpty else renderPreview} + {unless (isEmpty jobName) (renderPlan plan)} +
+
+
+ |] + where + renderEmpty = [hsx| +
+ {when (length applications /= 1) renderApplicationSelector} + + +
+ |] + renderApplicationOptions = forM_ applications (\x -> [hsx||]) + renderApplicationSelector = [hsx| + |] + renderPreview = [hsx| +
+
{applicationName}.Job.{jobName}
+ + + + + +
+ |] diff --git a/IHP/IDE/ToolServer/Layout.hs b/IHP/IDE/ToolServer/Layout.hs index fac183ac9..a62d3b8c3 100644 --- a/IHP/IDE/ToolServer/Layout.hs +++ b/IHP/IDE/ToolServer/Layout.hs @@ -131,3 +131,6 @@ globeIcon = preEscapedToHtml [plain||] + +-- | https://github.com/encharm/Font-Awesome-SVG-PNG/blob/master/white/svg/cogs.svg +cogsIcon = preEscapedToHtml [plain||] \ No newline at end of file diff --git a/IHP/IDE/ToolServer/Types.hs b/IHP/IDE/ToolServer/Types.hs index 19f13c15f..a4a9d3bc8 100644 --- a/IHP/IDE/ToolServer/Types.hs +++ b/IHP/IDE/ToolServer/Types.hs @@ -91,6 +91,7 @@ data CodeGenController | NewActionAction | NewApplicationAction | NewMigrationAction + | NewJobAction | CreateControllerAction | CreateScriptAction | CreateViewAction @@ -98,6 +99,7 @@ data CodeGenController | CreateActionAction | CreateApplicationAction | CreateMigrationAction + | CreateJobAction | OpenControllerAction deriving (Eq, Show, Data) diff --git a/IHP/Job/Queue.hs b/IHP/Job/Queue.hs new file mode 100644 index 000000000..296458c11 --- /dev/null +++ b/IHP/Job/Queue.hs @@ -0,0 +1,186 @@ +{-| +Module: IHP.Job.Queue +Description: Functions to operate on the Job Queue Database +Copyright: (c) digitally induced GmbH, 2020 +-} +module IHP.Job.Queue where + +import IHP.Prelude +import IHP.Job.Types +import qualified Database.PostgreSQL.Simple as PG +import qualified Database.PostgreSQL.Simple.Types as PG +import qualified Database.PostgreSQL.Simple.FromField as PG +import qualified Database.PostgreSQL.Simple.ToField as PG +import qualified Database.PostgreSQL.Simple.Notification as PG +import qualified Control.Concurrent.Async as Async +import IHP.ModelSupport +import IHP.QueryBuilder +import IHP.Controller.Param + +-- | Lock and fetch the next available job. In case no job is available returns Nothing. +-- +-- The lock is set on the job row in an atomic way. +-- +-- The job status is set to JobStatusRunning, lockedBy will be set to the worker id and the attemptsCount is incremented. +-- +-- __Example:__ Locking a SendMailJob +-- +-- > let workerId :: UUID = "faa5ba30-1d76-4adf-bf01-2d1f95cddc04" +-- > job <- fetchNextJob @SendMailJob workerId +-- +-- After you're done with the job, call 'jobDidFail' or 'jobDidSucceed' to make it available to the queue again. +fetchNextJob :: forall job. + ( ?modelContext :: ModelContext + , job ~ GetModelByTableName (GetTableName job) + , FilterPrimaryKey (GetTableName job) + , FromRow job + , Show (PrimaryKey (GetTableName job)) + , PG.FromField (PrimaryKey (GetTableName job)) + , KnownSymbol (GetTableName job) + ) => UUID -> IO (Maybe job) +fetchNextJob workerId = do + let query = "UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE (status = ?) OR (status = ? AND updated_at < NOW() + interval '30 seconds') AND locked_by IS NULL ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id" + let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry) + + result :: [PG.Only (Id job)] <- sqlQuery query params + case result of + [] -> pure Nothing + [PG.Only id] -> Just <$> fetch id + otherwise -> error (show otherwise) + +-- | Calls a callback every time something is inserted, updated or deleted in a given database table. +-- +-- In the background this function creates a database trigger to notify this function about table changes +-- using pg_notify. When there are existing triggers, it will silently recreate them. So this will most likely +-- not fail. +-- +-- This function returns a Async. Call 'cancel' on the async to stop watching the database. +-- +-- __Example:__ +-- +-- > watchInsertOrUpdateTable "projects" do +-- > putStrLn "Something changed in the projects table" +-- +-- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@ +-- You will see that @"Something changed in the projects table"@ is printed onto the screen. +-- +watchForJob :: (?modelContext :: ModelContext) => Text -> IO () -> IO (Async.Async ()) +watchForJob tableName handleJob = do + sqlExec (PG.Query $ cs $ createNotificationTrigger tableName) () + + let listenStatement = "LISTEN " <> PG.Query (cs $ eventName tableName) + Async.asyncBound do + forever do + notification <- withDatabaseConnection \databaseConnection -> do + PG.execute databaseConnection listenStatement () + PG.getNotification databaseConnection + + handleJob + +createNotificationTrigger :: Text -> Text +createNotificationTrigger tableName = "CREATE OR REPLACE FUNCTION " <> functionName <> "() RETURNS TRIGGER AS $$" + <> "BEGIN\n" + <> " PERFORM pg_notify('" <> eventName tableName <> "', '');\n" + <> " RETURN new;" + <> "END;\n" + <> "$$ language plpgsql;" + <> "DROP TRIGGER IF EXISTS " <> insertTriggerName <> " ON " <> tableName <> "; CREATE TRIGGER " <> insertTriggerName <> " AFTER INSERT ON \"" <> tableName <> "\" FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " <> functionName <> "();\n" + <> "DROP TRIGGER IF EXISTS " <> updateTriggerName <> " ON " <> tableName <> "; CREATE TRIGGER " <> updateTriggerName <> " AFTER UPDATE ON \"" <> tableName <> "\" FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " <> functionName <> "();\n" + where + functionName = "notify_job_queued_" <> tableName + insertTriggerName = "did_insert_job_" <> tableName + updateTriggerName = "did_update_job_" <> tableName + +-- | Retuns the event name of the event that the pg notify trigger dispatches +eventName :: Text -> Text +eventName tableName = "job_available_" <> tableName + +-- | Called when a job failed. Sets the job status to 'JobStatusFailed' or 'JobStatusRetry' (if more attempts are possible) and resets 'lockedBy' +jobDidFail :: forall job. + ( job ~ GetModelByTableName (GetTableName job) + , SetField "lockedBy" job (Maybe UUID) + , SetField "status" job JobStatus + , SetField "updatedAt" job UTCTime + , HasField "attemptsCount" job Int + , SetField "lastError" job (Maybe Text) + , Job job + , CanUpdate job + , Show job + , ?modelContext :: ModelContext + ) => job -> SomeException -> IO () +jobDidFail job exception = do + updatedAt <- getCurrentTime + + putStrLn ("Failed job with exception: " <> tshow exception) + + let ?job = job + let canRetry = get #attemptsCount job < maxAttempts + let status = if canRetry then JobStatusRetry else JobStatusFailed + job + |> set #status status + |> set #lockedBy Nothing + |> set #updatedAt updatedAt + |> set #lastError (Just (tshow exception)) + |> updateRecord + + pure () + +-- | Called when a job succeeded. Sets the job status to 'JobStatusSucceded' and resets 'lockedBy' +jobDidSucceed :: forall job. + ( job ~ GetModelByTableName (GetTableName job) + , SetField "lockedBy" job (Maybe UUID) + , SetField "status" job JobStatus + , SetField "updatedAt" job UTCTime + , HasField "attemptsCount" job Int + , SetField "lastError" job (Maybe Text) + , Job job + , CanUpdate job + , Show job + , ?modelContext :: ModelContext + ) => job -> IO () +jobDidSucceed job = do + putStrLn "Succeeded job" + updatedAt <- getCurrentTime + job + |> set #status JobStatusSucceeded + |> set #lockedBy Nothing + |> set #updatedAt updatedAt + |> updateRecord + + pure () + +-- | Mapping for @JOB_STATUS@: +-- +-- > CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry'); +instance PG.FromField JobStatus where + fromField field (Just "job_status_not_started") = pure JobStatusNotStarted + fromField field (Just "job_status_running") = pure JobStatusRunning + fromField field (Just "job_status_failed") = pure JobStatusFailed + fromField field (Just "job_status_succeeded") = pure JobStatusSucceeded + fromField field (Just "job_status_retry") = pure JobStatusRetry + fromField field (Just value) = PG.returnError PG.ConversionFailed field ("Unexpected value for enum value. Got: " <> cs value) + fromField field Nothing = PG.returnError PG.UnexpectedNull field "Unexpected null for enum value" + +-- The default state is @not started@ +instance Default JobStatus where + def = JobStatusNotStarted + +-- | Mapping for @JOB_STATUS@: +-- +-- > CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry'); +instance PG.ToField JobStatus where + toField JobStatusNotStarted = PG.toField ("job_status_not_started" :: Text) + toField JobStatusRunning = PG.toField ("job_status_running" :: Text) + toField JobStatusFailed = PG.toField ("job_status_failed" :: Text) + toField JobStatusSucceeded = PG.toField ("job_status_succeeded" :: Text) + toField JobStatusRetry = PG.toField ("job_status_retry" :: Text) + +instance InputValue JobStatus where + inputValue JobStatusNotStarted = "job_status_not_started" :: Text + inputValue JobStatusRunning = "job_status_running" :: Text + inputValue JobStatusFailed = "job_status_failed" :: Text + inputValue JobStatusSucceeded = "job_status_succeeded" :: Text + inputValue JobStatusRetry = "job_status_retry" :: Text + +instance IHP.Controller.Param.ParamReader JobStatus where + readParameter = IHP.Controller.Param.enumParamReader \ No newline at end of file diff --git a/IHP/Job/Runner.hs b/IHP/Job/Runner.hs new file mode 100644 index 000000000..7062a7582 --- /dev/null +++ b/IHP/Job/Runner.hs @@ -0,0 +1,133 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-| +Module: IHP.Job.Runner +Description: Functions to run jobs +Copyright: (c) digitally induced GmbH, 2020 +-} +module IHP.Job.Runner where + +import Application.Script.Prelude +import IHP.Job.Types +import qualified IHP.Job.Queue as Queue +import Application.Helper.Controller +import qualified Control.Exception as Exception +import qualified Database.PostgreSQL.Simple as PG +import qualified Database.PostgreSQL.Simple.Types as PG +import qualified Database.PostgreSQL.Simple.FromField as PG +import qualified Data.UUID.V4 as UUID +import qualified Control.Concurrent as Concurrent +import qualified Control.Concurrent.Async as Async +import qualified System.Posix.Signals as Signals +import qualified System.Exit as Exit + +runJobWorkers :: [JobWorker] -> Script +runJobWorkers jobWorkers = do + workerId <- UUID.nextRandom + allJobs <- newIORef [] + let oneSecond = 1000000 + + putStrLn ("Starting worker " <> tshow workerId) + + let jobWorkerArgs = JobWorkerArgs { allJobs, workerId, modelContext = ?modelContext, frameworkConfig = ?context} + + threadId <- Concurrent.myThreadId + exitSignalsCount <- newIORef 0 + let catchHandler = do + exitSignalsCount' <- readIORef exitSignalsCount + modifyIORef exitSignalsCount ((+) 1) + allJobs' <- readIORef allJobs + allJobsCompleted <- allJobs' + |> mapM Async.poll + >>= pure . filter isNothing + >>= pure . null + if allJobsCompleted + then Concurrent.throwTo threadId Exit.ExitSuccess + else if exitSignalsCount' == 0 + then do + putStrLn "Waiting for jobs to complete. CTRL+C again to force exit" + forEach allJobs' Async.wait + Concurrent.throwTo threadId Exit.ExitSuccess + else if exitSignalsCount' == 1 then do + putStrLn "Canceling all running jobs. CTRL+C again to force exit" + forEach allJobs' Async.cancel + Concurrent.throwTo threadId Exit.ExitSuccess + else Concurrent.throwTo threadId Exit.ExitSuccess + + + Signals.installHandler Signals.sigINT (Signals.Catch catchHandler) Nothing + Signals.installHandler Signals.sigTERM (Signals.Catch catchHandler) Nothing + + jobWorkers + |> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs) + >>= Async.waitAnyCancel + + pure () + + +worker :: forall job. + ( job ~ GetModelByTableName (GetTableName job) + , FilterPrimaryKey (GetTableName job) + , FromRow job + , Show (PrimaryKey (GetTableName job)) + , PG.FromField (PrimaryKey (GetTableName job)) + , KnownSymbol (GetTableName job) + , SetField "attemptsCount" job Int + , SetField "lockedBy" job (Maybe UUID) + , SetField "status" job JobStatus + , SetField "updatedAt" job UTCTime + , HasField "attemptsCount" job Int + , SetField "lastError" job (Maybe Text) + , Job job + , CanUpdate job + , Show job + ) => JobWorker +worker = JobWorker (jobWorkerFetchAndRunLoop @job) + + +jobWorkerFetchAndRunLoop :: forall job. + ( job ~ GetModelByTableName (GetTableName job) + , FilterPrimaryKey (GetTableName job) + , FromRow job + , Show (PrimaryKey (GetTableName job)) + , PG.FromField (PrimaryKey (GetTableName job)) + , KnownSymbol (GetTableName job) + , SetField "attemptsCount" job Int + , SetField "lockedBy" job (Maybe UUID) + , SetField "status" job JobStatus + , SetField "updatedAt" job UTCTime + , HasField "attemptsCount" job Int + , SetField "lastError" job (Maybe Text) + , Job job + , CanUpdate job + , Show job + ) => JobWorkerArgs -> IO (Async.Async ()) +jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do + let ?context = frameworkConfig + let ?modelContext = modelContext + + -- This loop schedules all jobs that are in the queue. + -- It will be initally be called when first starting up this job worker + -- and after that it will be called when something has been inserted into the queue (or changed to retry) + let startLoop = do + asyncJob <- Async.async do + Exception.mask $ \restore -> do + maybeJob <- Queue.fetchNextJob @job workerId + case maybeJob of + Just job -> do + putStrLn ("Starting job: " <> tshow job) + resultOrException <- Exception.try (restore (perform job)) + case resultOrException of + Left exception -> Queue.jobDidFail job exception + Right _ -> Queue.jobDidSucceed job + + startLoop + Nothing -> pure () + modifyIORef allJobs (asyncJob:) + + -- Start all jobs in the queue + startLoop + + -- Start a job when a new job is added to the table or when it's set to retry + watcher <- Queue.watchForJob (tableName @job) startLoop + + pure watcher diff --git a/IHP/Job/Types.hs b/IHP/Job/Types.hs new file mode 100644 index 000000000..e3283bbe8 --- /dev/null +++ b/IHP/Job/Types.hs @@ -0,0 +1,40 @@ +module IHP.Job.Types +( Job (..) +, JobWorkerArgs (..) +, JobWorker (..) +, JobStatus (..) +, Worker (..) +) +where + +import IHP.Prelude +import IHP.FrameworkConfig +import qualified Control.Concurrent.Async as Async + +class Job job where + perform :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => job -> IO () + + maxAttempts :: (?job :: job) => Int + maxAttempts = 10 + +class Worker application where + workers :: application -> [JobWorker] + +data JobWorkerArgs = JobWorkerArgs + { allJobs :: IORef [Async.Async ()] + , workerId :: UUID + , modelContext :: ModelContext + , frameworkConfig :: FrameworkConfig } + +newtype JobWorker = JobWorker (JobWorkerArgs -> IO (Async.Async ())) + +-- | Mapping for @JOB_STATUS@. The DDL statement for this can be found in IHPSchema.sql: +-- +-- > CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry'); +data JobStatus + = JobStatusNotStarted + | JobStatusRunning + | JobStatusFailed + | JobStatusSucceeded + | JobStatusRetry + deriving (Eq, Show, Read, Enum) \ No newline at end of file diff --git a/IHP/SchemaCompiler.hs b/IHP/SchemaCompiler.hs index 1ded89385..ca8d6816e 100644 --- a/IHP/SchemaCompiler.hs +++ b/IHP/SchemaCompiler.hs @@ -141,6 +141,8 @@ compileTypes options schema@(Schema statements) = <> "import qualified Data.Aeson\n" <> "import Database.PostgreSQL.Simple.Types (Query (Query), Binary ( .. ))\n" <> "import qualified Database.PostgreSQL.Simple.Types\n" + <> "import IHP.Job.Types\n" + <> "import IHP.Job.Queue ()\n" compileStatementPreview :: [Statement] -> Statement -> Text compileStatementPreview statements statement = let ?schema = Schema statements in compileStatement previewCompilerOptions statement diff --git a/IHP/Server.hs b/IHP/Server.hs index 7c7f1cdb4..baa413402 100644 --- a/IHP/Server.hs +++ b/IHP/Server.hs @@ -29,8 +29,11 @@ import qualified IHP.AutoRefresh as AutoRefresh import qualified IHP.AutoRefresh.Types as AutoRefresh import qualified IHP.WebSocket as WS import IHP.LibDir +import qualified IHP.Job.Runner as Job +import qualified IHP.Job.Types as Job +import qualified Control.Concurrent.Async as Async -run :: (FrontController RootApplication) => ConfigBuilder -> IO () +run :: (FrontController RootApplication, Job.Worker RootApplication) => ConfigBuilder -> IO () run configBuilder = do frameworkConfig@(FrameworkConfig { environment, appPort, dbPoolMaxConnections, dbPoolIdleTime, databaseUrl, sessionCookie, requestLoggerMiddleware }) <- buildFrameworkConfig configBuilder session <- Vault.newKey @@ -57,13 +60,22 @@ run configBuilder = do |> Warp.setPort appPort in Warp.runSettings settings else Warp.runEnv appPort - runServer $ - staticMiddleware $ - sessionMiddleware $ - ihpWebsocketMiddleware $ - requestLoggerMiddleware $ - methodOverridePost $ - application + + let jobWorkers = Job.workers RootApplication + let withBackgroundWorkers app = + if isDevelopment && not (isEmpty jobWorkers) + then Async.withAsync (let ?context = frameworkConfig in Job.runJobWorkers jobWorkers) (\_ -> app) + else app + + + withBackgroundWorkers do + runServer $ + staticMiddleware $ + sessionMiddleware $ + ihpWebsocketMiddleware $ + requestLoggerMiddleware $ + methodOverridePost $ + application ihpWebsocketMiddleware :: (?applicationContext :: ApplicationContext) => Middleware ihpWebsocketMiddleware (next :: Application) (request :: Request) respond = do diff --git a/IHP/ViewPrelude.hs b/IHP/ViewPrelude.hs index 42e36f7af..baa41bdf2 100644 --- a/IHP/ViewPrelude.hs +++ b/IHP/ViewPrelude.hs @@ -28,7 +28,8 @@ module IHP.ViewPrelude ( module IHP.Controller.Context, module IHP.Controller.Layout, module IHP.Modal.Types, - module IHP.Modal.ViewFunctions + module IHP.Modal.ViewFunctions, + module IHP.Job.Types ) where import IHP.Prelude @@ -55,4 +56,5 @@ import IHP.Controller.Context import IHP.Controller.Layout import IHP.Modal.Types -import IHP.Modal.ViewFunctions \ No newline at end of file +import IHP.Modal.ViewFunctions +import IHP.Job.Types \ No newline at end of file diff --git a/Test/IDE/CodeGeneration/JobGenerator.hs b/Test/IDE/CodeGeneration/JobGenerator.hs new file mode 100644 index 000000000..d2baa7fb2 --- /dev/null +++ b/Test/IDE/CodeGeneration/JobGenerator.hs @@ -0,0 +1,45 @@ +{-| +Module: Test.IDE.CodeGeneration.JobGenerator +Copyright: (c) digitally induced GmbH, 2020 +-} +module Test.IDE.CodeGeneration.JobGenerator where + +import Test.Hspec +import IHP.Prelude +import qualified IHP.IDE.CodeGen.JobGenerator as JobGenerator +import IHP.ViewPrelude (cs, plain) +import qualified Text.Megaparsec as Megaparsec +import IHP.IDE.CodeGen.Types +import IHP.IDE.SchemaDesigner.Types +import IHP.NameSupport + + +tests = do + describe "Job Generator" do + it "should build a job with name \"CreateContainerJobs\"" do + let applicationName = "Web" + let tableName = "create_container_jobs" + let modelName = "CreateContainerJob" + let config = JobGenerator.JobConfig { .. } + let builtPlan = JobGenerator.buildPlan' config + + builtPlan `shouldBe` [ + EnsureDirectory {directory = "Web/Job"} + , CreateFile {filePath = "Web/Job/CreateContainer.hs", fileContent = "module Web.Job.CreateContainer where\nimport Web.Controller.Prelude\n\ninstance Job CreateContainerJob where\n perform CreateContainerJob { .. } = do\n putStrLn \"Hello World!\"\n"} + , AppendToFile {filePath = "Application/Schema.sql", fileContent = "CREATE TABLE create_container_jobs (\n id UUID DEFAULT uuid_generate_v4() PRIMARY KEY NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n status JOB_STATUS DEFAULT 'job_status_not_started' NOT NULL,\n last_error TEXT DEFAULT NULL,\n attempts_count INT DEFAULT 0 NOT NULL,\n locked_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,\n locked_by UUID DEFAULT NULL\n);\n"} + , AppendToMarker {marker = "-- Job Imports", filePath = "Web/Worker.hs", fileContent = "import Web.Job.CreateContainer"} + ] + + it "should support other applications" do + let applicationName = "Admin" + let tableName = "create_container_jobs" + let modelName = "CreateContainerJob" + let config = JobGenerator.JobConfig { .. } + let builtPlan = JobGenerator.buildPlan' config + + builtPlan `shouldBe` [ + EnsureDirectory {directory = "Admin/Job"} + , CreateFile {filePath = "Admin/Job/CreateContainer.hs", fileContent = "module Admin.Job.CreateContainer where\nimport Admin.Controller.Prelude\n\ninstance Job CreateContainerJob where\n perform CreateContainerJob { .. } = do\n putStrLn \"Hello World!\"\n"} + , AppendToFile {filePath = "Application/Schema.sql", fileContent = "CREATE TABLE create_container_jobs (\n id UUID DEFAULT uuid_generate_v4() PRIMARY KEY NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n status JOB_STATUS DEFAULT 'job_status_not_started' NOT NULL,\n last_error TEXT DEFAULT NULL,\n attempts_count INT DEFAULT 0 NOT NULL,\n locked_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,\n locked_by UUID DEFAULT NULL\n);\n"} + , AppendToMarker {marker = "-- Job Imports", filePath = "Admin/Worker.hs", fileContent = "import Admin.Job.CreateContainer"} + ] diff --git a/Test/Main.hs b/Test/Main.hs index f01829ec2..04b30a0da 100644 --- a/Test/Main.hs +++ b/Test/Main.hs @@ -22,6 +22,7 @@ import qualified Test.ValidationSupport.ValidateFieldSpec import qualified Test.IDE.CodeGeneration.ControllerGenerator import qualified Test.IDE.CodeGeneration.ViewGenerator import qualified Test.IDE.CodeGeneration.MailGenerator +import qualified Test.IDE.CodeGeneration.JobGenerator import qualified Test.HtmlSupport.QQSpec import qualified Test.HtmlSupport.ParserSpec import qualified Test.NameSupportSpec @@ -39,6 +40,7 @@ main = hspec do Test.IDE.CodeGeneration.ControllerGenerator.tests Test.IDE.CodeGeneration.ViewGenerator.tests Test.IDE.CodeGeneration.MailGenerator.tests + Test.IDE.CodeGeneration.JobGenerator.tests Test.HtmlSupport.QQSpec.tests Test.NameSupportSpec.tests Test.HaskellSupportSpec.tests diff --git a/lib/IHP/IHPSchema.sql b/lib/IHP/IHPSchema.sql index a58db1c1f..9da57bb0b 100644 --- a/lib/IHP/IHPSchema.sql +++ b/lib/IHP/IHPSchema.sql @@ -1,2 +1,5 @@ -- Provides all the default settings for a IHP database in development mode -CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; \ No newline at end of file +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + +-- Used by IHP.Job +CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry'); \ No newline at end of file diff --git a/lib/IHP/Makefile.dist b/lib/IHP/Makefile.dist index 80fa2d424..50eb87e31 100644 --- a/lib/IHP/Makefile.dist +++ b/lib/IHP/Makefile.dist @@ -178,6 +178,22 @@ build/Script/Main/%.hs: Application/Script/%.hs echo "import Application.Script.$* (run)" >> $@ echo "main = runScript Config.config run" >> $@ +build/bin/RunJobs: build/RunJobs.hs + mkdir -p build/bin + ghc -O2 -main-is 'RunJobs.main' ${GHC_OPTIONS} ${PROD_GHC_OPTIONS} $< -o $@ -odir build -hidir build + +build/RunJobs.hs: build/Generated/Types.hs + echo "module RunJobs (main) where" > $@ + echo "import Application.Script.Prelude" >> $@ + echo "import IHP.ScriptSupport" >> $@ + echo "import IHP.Job.Runner" >> $@ + echo "import qualified Config" >> $@ + echo "import Main" >> $@ + echo "main :: IO ()" >> $@ + echo "main = runScript Config.config run" >> $@ + echo "run :: Script" >> $@ + echo "run = runJobWorkers (workers RootApplication)" >> $@ + hie.yaml: # Configuration for haskell-language-server echo "cradle:" > hie.yaml echo " bios:" >> hie.yaml From ce2973be0867e51a6517d155f7874e24a7e3a91c Mon Sep 17 00:00:00 2001 From: Marc Scholten Date: Thu, 26 Nov 2020 17:20:03 +0100 Subject: [PATCH 2/4] Fixed indentation --- IHP/Job/Runner.hs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/IHP/Job/Runner.hs b/IHP/Job/Runner.hs index 7062a7582..4b5086723 100644 --- a/IHP/Job/Runner.hs +++ b/IHP/Job/Runner.hs @@ -109,20 +109,20 @@ jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do -- It will be initally be called when first starting up this job worker -- and after that it will be called when something has been inserted into the queue (or changed to retry) let startLoop = do - asyncJob <- Async.async do - Exception.mask $ \restore -> do - maybeJob <- Queue.fetchNextJob @job workerId - case maybeJob of - Just job -> do - putStrLn ("Starting job: " <> tshow job) - resultOrException <- Exception.try (restore (perform job)) - case resultOrException of - Left exception -> Queue.jobDidFail job exception - Right _ -> Queue.jobDidSucceed job - - startLoop - Nothing -> pure () - modifyIORef allJobs (asyncJob:) + asyncJob <- Async.async do + Exception.mask $ \restore -> do + maybeJob <- Queue.fetchNextJob @job workerId + case maybeJob of + Just job -> do + putStrLn ("Starting job: " <> tshow job) + resultOrException <- Exception.try (restore (perform job)) + case resultOrException of + Left exception -> Queue.jobDidFail job exception + Right _ -> Queue.jobDidSucceed job + + startLoop + Nothing -> pure () + modifyIORef allJobs (asyncJob:) -- Start all jobs in the queue startLoop From 54cc16995806132eee4ab182f5b9d71b6cbd0280 Mon Sep 17 00:00:00 2001 From: Marc Scholten Date: Fri, 11 Dec 2020 08:56:43 +0100 Subject: [PATCH 3/4] Fixed imports --- IHP/Job/Runner.hs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/IHP/Job/Runner.hs b/IHP/Job/Runner.hs index 4b5086723..c0e2a291f 100644 --- a/IHP/Job/Runner.hs +++ b/IHP/Job/Runner.hs @@ -6,10 +6,11 @@ Copyright: (c) digitally induced GmbH, 2020 -} module IHP.Job.Runner where -import Application.Script.Prelude +import IHP.Prelude +import IHP.ControllerPrelude +import IHP.ScriptSupport import IHP.Job.Types import qualified IHP.Job.Queue as Queue -import Application.Helper.Controller import qualified Control.Exception as Exception import qualified Database.PostgreSQL.Simple as PG import qualified Database.PostgreSQL.Simple.Types as PG From 80846113b9cd9a545f2dad162f49db8073c1b6fe Mon Sep 17 00:00:00 2001 From: Marc Scholten Date: Fri, 11 Dec 2020 10:34:10 +0100 Subject: [PATCH 4/4] Fixed build --- IHP/IDE/CodeGen/JobGenerator.hs | 4 ++-- ihp.cabal | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/IHP/IDE/CodeGen/JobGenerator.hs b/IHP/IDE/CodeGen/JobGenerator.hs index 4dda1bf88..998332fcf 100644 --- a/IHP/IDE/CodeGen/JobGenerator.hs +++ b/IHP/IDE/CodeGen/JobGenerator.hs @@ -13,8 +13,8 @@ import qualified Text.Countable as Countable data JobConfig = JobConfig { applicationName :: Text - , tableName :: Text -- | E.g. create_container_jobs - , modelName :: Text -- | E.g. CreateContainerJob + , tableName :: Text -- E.g. create_container_jobs + , modelName :: Text -- E.g. CreateContainerJob } deriving (Eq, Show) buildPlan :: Text -> Text -> IO (Either Text [GeneratorAction]) diff --git a/ihp.cabal b/ihp.cabal index c1f07ae86..f2b73c13a 100644 --- a/ihp.cabal +++ b/ihp.cabal @@ -258,6 +258,11 @@ library IHPFramework , IHP.Version , IHP.Point , Paths_ihp + , IHP.IDE.CodeGen.JobGenerator + , IHP.IDE.CodeGen.View.NewJob + , IHP.Job.Queue + , IHP.Job.Runner + , IHP.Job.Types executable RunDevServer import: shared-properties