diff --git a/IHP/ControllerPrelude.hs b/IHP/ControllerPrelude.hs
index 94ad62eaa..c1edf1f03 100644
--- a/IHP/ControllerPrelude.hs
+++ b/IHP/ControllerPrelude.hs
@@ -25,6 +25,7 @@ module IHP.ControllerPrelude
, module IHP.Modal.Types
, module IHP.Modal.ControllerFunctions
, module IHP.Controller.Layout
+ , module IHP.Job.Types
, Only (..)
) where
import IHP.Prelude
@@ -55,4 +56,6 @@ import IHP.Controller.Context
import IHP.Controller.Layout
import IHP.Modal.Types
-import IHP.Modal.ControllerFunctions
\ No newline at end of file
+import IHP.Modal.ControllerFunctions
+
+import IHP.Job.Types
\ No newline at end of file
diff --git a/IHP/IDE/CodeGen/Controller.hs b/IHP/IDE/CodeGen/Controller.hs
index d013d31f6..3826c41e0 100644
--- a/IHP/IDE/CodeGen/Controller.hs
+++ b/IHP/IDE/CodeGen/Controller.hs
@@ -10,6 +10,7 @@ import IHP.IDE.CodeGen.View.NewMail
import IHP.IDE.CodeGen.View.NewAction
import IHP.IDE.CodeGen.View.NewApplication
import IHP.IDE.CodeGen.View.NewMigration
+import IHP.IDE.CodeGen.View.NewJob
import IHP.IDE.CodeGen.Types
import IHP.IDE.CodeGen.ControllerGenerator as ControllerGenerator
import IHP.IDE.CodeGen.ScriptGenerator as ScriptGenerator
@@ -17,6 +18,7 @@ import IHP.IDE.CodeGen.ViewGenerator as ViewGenerator
import IHP.IDE.CodeGen.MailGenerator as MailGenerator
import IHP.IDE.CodeGen.ActionGenerator as ActionGenerator
import IHP.IDE.CodeGen.ApplicationGenerator as ApplicationGenerator
+import IHP.IDE.CodeGen.JobGenerator as JobGenerator
import IHP.IDE.ToolServer.Helper.Controller
import qualified System.Process as Process
import qualified System.Directory as Directory
@@ -157,6 +159,22 @@ instance Controller CodeGenController where
openEditor path 0 0
redirectTo GeneratorsAction
+ action NewJobAction = do
+ let jobName = paramOrDefault "" "name"
+ let applicationName = paramOrDefault "Web" "applicationName"
+ controllers <- findControllers applicationName
+ applications <- findApplications
+ plan <- JobGenerator.buildPlan jobName applicationName
+ render NewJobView { .. }
+
+ action CreateJobAction = do
+ let jobName = paramOrDefault "" "name"
+ let applicationName = "Web"
+ (Right plan) <- JobGenerator.buildPlan jobName applicationName
+ executePlan plan
+ setSuccessMessage "Job generated"
+ redirectTo GeneratorsAction
+
action OpenControllerAction = do
let name = param "name"
case name |> Inflector.toCamelCased True of
diff --git a/IHP/IDE/CodeGen/JobGenerator.hs b/IHP/IDE/CodeGen/JobGenerator.hs
new file mode 100644
index 000000000..998332fcf
--- /dev/null
+++ b/IHP/IDE/CodeGen/JobGenerator.hs
@@ -0,0 +1,79 @@
+module IHP.IDE.CodeGen.JobGenerator (buildPlan, buildPlan', JobConfig (..)) where
+
+import IHP.Prelude
+import IHP.HaskellSupport
+import qualified Data.Text as Text
+import qualified Data.Text.IO as Text
+import IHP.ViewSupport
+import qualified System.Process as Process
+import IHP.IDE.CodeGen.Types
+import qualified IHP.IDE.SchemaDesigner.Parser as SchemaDesigner
+import IHP.IDE.SchemaDesigner.Types
+import qualified Text.Countable as Countable
+
+data JobConfig = JobConfig
+ { applicationName :: Text
+ , tableName :: Text -- E.g. create_container_jobs
+ , modelName :: Text -- E.g. CreateContainerJob
+ } deriving (Eq, Show)
+
+buildPlan :: Text -> Text -> IO (Either Text [GeneratorAction])
+buildPlan jobName applicationName =
+ if null jobName
+ then pure $ Left "Job name can be empty"
+ else do
+ let jobConfig = JobConfig
+ { applicationName
+ , tableName = jobName
+ , modelName = tableNameToModelName jobName
+ }
+ pure $ Right $ buildPlan' jobConfig
+
+-- E.g. qualifiedMailModuleName config "Confirmation" == "Web.Mail.Users.Confirmation"
+qualifiedJobModuleName :: JobConfig -> Text
+qualifiedJobModuleName config =
+ get #applicationName config <> ".Job." <> unqualifiedJobModuleName config
+
+unqualifiedJobModuleName :: JobConfig -> Text
+unqualifiedJobModuleName config = Text.replace "Job" "" (get #modelName config)
+
+buildPlan' :: JobConfig -> [GeneratorAction]
+buildPlan' config =
+ let
+ name = get #modelName config
+ tableName = modelNameToTableName name
+ nameWithSuffix = if "Job" `isSuffixOf` name
+ then name
+ else name <> "Job" --e.g. "Test" -> "TestJob"
+ nameWithoutSuffix = if "Job" `isSuffixOf` name
+ then Text.replace "Job" "" name
+ else name --e.g. "TestJob" -> "Test""
+
+ job =
+ ""
+ <> "module " <> qualifiedJobModuleName config <> " where\n"
+ <> "import " <> get #applicationName config <> ".Controller.Prelude\n"
+ <> "\n"
+ <> "instance Job " <> nameWithSuffix <> " where\n"
+ <> " perform " <> name <> " { .. } = do\n"
+ <> " putStrLn \"Hello World!\"\n"
+
+ schemaSql =
+ ""
+ <> "CREATE TABLE " <> tableName <> " (\n"
+ <> " id UUID DEFAULT uuid_generate_v4() PRIMARY KEY NOT NULL,\n"
+ <> " created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n"
+ <> " updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n"
+ <> " status JOB_STATUS DEFAULT 'job_status_not_started' NOT NULL,\n"
+ <> " last_error TEXT DEFAULT NULL,\n"
+ <> " attempts_count INT DEFAULT 0 NOT NULL,\n"
+ <> " locked_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,\n"
+ <> " locked_by UUID DEFAULT NULL\n"
+ <> ");\n"
+ in
+ [ EnsureDirectory { directory = get #applicationName config <> "/Job" }
+ , CreateFile { filePath = get #applicationName config <> "/Job/" <> nameWithoutSuffix <> ".hs", fileContent = job }
+ , AppendToFile { filePath = "Application/Schema.sql", fileContent = schemaSql }
+ , AppendToMarker { marker = "-- Job Imports", filePath = get #applicationName config <> "/Worker.hs", fileContent = ("import " <> qualifiedJobModuleName config) }
+ --, AddImport { filePath = get #applicationName config <> "/Controller/" <> controllerName <> ".hs", fileContent = "import " <> qualifiedViewModuleName config nameWithoutSuffix }
+ ]
diff --git a/IHP/IDE/CodeGen/View/Generators.hs b/IHP/IDE/CodeGen/View/Generators.hs
index 5ba838b49..312e08839 100644
--- a/IHP/IDE/CodeGen/View/Generators.hs
+++ b/IHP/IDE/CodeGen/View/Generators.hs
@@ -16,44 +16,26 @@ instance View GeneratorsView where
|]
+ where
+ generator :: Text -> Text -> Html -> Html
+ generator name path icon = [hsx|
+
+ {icon}
+ {name}
+
+ |]
renderPlan (Left error) = [hsx|{error}|]
renderPlan (Right actions) = [hsx|{forEach actions renderGeneratorAction}
|]
diff --git a/IHP/IDE/CodeGen/View/NewJob.hs b/IHP/IDE/CodeGen/View/NewJob.hs
new file mode 100644
index 000000000..305a10988
--- /dev/null
+++ b/IHP/IDE/CodeGen/View/NewJob.hs
@@ -0,0 +1,65 @@
+module IHP.IDE.CodeGen.View.NewJob where
+
+import IHP.ViewPrelude
+import IHP.IDE.SchemaDesigner.Types
+import IHP.IDE.ToolServer.Types
+import IHP.IDE.ToolServer.Layout
+import IHP.IDE.SchemaDesigner.View.Layout
+import IHP.IDE.CodeGen.Types
+import IHP.IDE.CodeGen.View.Generators (renderPlan)
+import qualified Data.Text as Text
+import qualified Data.Text.IO as Text
+
+data NewJobView = NewJobView
+ { plan :: Either Text [GeneratorAction]
+ , jobName :: Text
+ , applicationName :: Text
+ , applications :: [Text]
+ }
+
+instance View NewJobView where
+ html NewJobView { .. } = [hsx|
+
+ {renderFlashMessages}
+
+
+ {if isEmpty jobName then renderEmpty else renderPreview}
+ {unless (isEmpty jobName) (renderPlan plan)}
+
+
+
+ |]
+ where
+ renderEmpty = [hsx|
+
+ |]
+ renderApplicationOptions = forM_ applications (\x -> [hsx||])
+ renderApplicationSelector = [hsx|
+ |]
+ renderPreview = [hsx|
+
+ |]
diff --git a/IHP/IDE/ToolServer/Layout.hs b/IHP/IDE/ToolServer/Layout.hs
index fac183ac9..a62d3b8c3 100644
--- a/IHP/IDE/ToolServer/Layout.hs
+++ b/IHP/IDE/ToolServer/Layout.hs
@@ -131,3 +131,6 @@ globeIcon = preEscapedToHtml [plain||]
+
+-- | https://github.com/encharm/Font-Awesome-SVG-PNG/blob/master/white/svg/cogs.svg
+cogsIcon = preEscapedToHtml [plain||]
\ No newline at end of file
diff --git a/IHP/IDE/ToolServer/Types.hs b/IHP/IDE/ToolServer/Types.hs
index 19f13c15f..a4a9d3bc8 100644
--- a/IHP/IDE/ToolServer/Types.hs
+++ b/IHP/IDE/ToolServer/Types.hs
@@ -91,6 +91,7 @@ data CodeGenController
| NewActionAction
| NewApplicationAction
| NewMigrationAction
+ | NewJobAction
| CreateControllerAction
| CreateScriptAction
| CreateViewAction
@@ -98,6 +99,7 @@ data CodeGenController
| CreateActionAction
| CreateApplicationAction
| CreateMigrationAction
+ | CreateJobAction
| OpenControllerAction
deriving (Eq, Show, Data)
diff --git a/IHP/Job/Queue.hs b/IHP/Job/Queue.hs
new file mode 100644
index 000000000..296458c11
--- /dev/null
+++ b/IHP/Job/Queue.hs
@@ -0,0 +1,186 @@
+{-|
+Module: IHP.Job.Queue
+Description: Functions to operate on the Job Queue Database
+Copyright: (c) digitally induced GmbH, 2020
+-}
+module IHP.Job.Queue where
+
+import IHP.Prelude
+import IHP.Job.Types
+import qualified Database.PostgreSQL.Simple as PG
+import qualified Database.PostgreSQL.Simple.Types as PG
+import qualified Database.PostgreSQL.Simple.FromField as PG
+import qualified Database.PostgreSQL.Simple.ToField as PG
+import qualified Database.PostgreSQL.Simple.Notification as PG
+import qualified Control.Concurrent.Async as Async
+import IHP.ModelSupport
+import IHP.QueryBuilder
+import IHP.Controller.Param
+
+-- | Lock and fetch the next available job. In case no job is available returns Nothing.
+--
+-- The lock is set on the job row in an atomic way.
+--
+-- The job status is set to JobStatusRunning, lockedBy will be set to the worker id and the attemptsCount is incremented.
+--
+-- __Example:__ Locking a SendMailJob
+--
+-- > let workerId :: UUID = "faa5ba30-1d76-4adf-bf01-2d1f95cddc04"
+-- > job <- fetchNextJob @SendMailJob workerId
+--
+-- After you're done with the job, call 'jobDidFail' or 'jobDidSucceed' to make it available to the queue again.
+fetchNextJob :: forall job.
+ ( ?modelContext :: ModelContext
+ , job ~ GetModelByTableName (GetTableName job)
+ , FilterPrimaryKey (GetTableName job)
+ , FromRow job
+ , Show (PrimaryKey (GetTableName job))
+ , PG.FromField (PrimaryKey (GetTableName job))
+ , KnownSymbol (GetTableName job)
+ ) => UUID -> IO (Maybe job)
+fetchNextJob workerId = do
+ let query = "UPDATE ? SET status = ?, locked_at = NOW(), locked_by = ?, attempts_count = attempts_count + 1 WHERE id IN (SELECT id FROM ? WHERE (status = ?) OR (status = ? AND updated_at < NOW() + interval '30 seconds') AND locked_by IS NULL ORDER BY created_at LIMIT 1 FOR UPDATE) RETURNING id"
+ let params = (PG.Identifier (tableName @job), JobStatusRunning, workerId, PG.Identifier (tableName @job), JobStatusNotStarted, JobStatusRetry)
+
+ result :: [PG.Only (Id job)] <- sqlQuery query params
+ case result of
+ [] -> pure Nothing
+ [PG.Only id] -> Just <$> fetch id
+ otherwise -> error (show otherwise)
+
+-- | Calls a callback every time something is inserted, updated or deleted in a given database table.
+--
+-- In the background this function creates a database trigger to notify this function about table changes
+-- using pg_notify. When there are existing triggers, it will silently recreate them. So this will most likely
+-- not fail.
+--
+-- This function returns a Async. Call 'cancel' on the async to stop watching the database.
+--
+-- __Example:__
+--
+-- > watchInsertOrUpdateTable "projects" do
+-- > putStrLn "Something changed in the projects table"
+--
+-- Now insert something into the @projects@ table. E.g. by running @make psql@ and then running @INSERT INTO projects (id, name) VALUES (DEFAULT, 'New project');@
+-- You will see that @"Something changed in the projects table"@ is printed onto the screen.
+--
+watchForJob :: (?modelContext :: ModelContext) => Text -> IO () -> IO (Async.Async ())
+watchForJob tableName handleJob = do
+ sqlExec (PG.Query $ cs $ createNotificationTrigger tableName) ()
+
+ let listenStatement = "LISTEN " <> PG.Query (cs $ eventName tableName)
+ Async.asyncBound do
+ forever do
+ notification <- withDatabaseConnection \databaseConnection -> do
+ PG.execute databaseConnection listenStatement ()
+ PG.getNotification databaseConnection
+
+ handleJob
+
+createNotificationTrigger :: Text -> Text
+createNotificationTrigger tableName = "CREATE OR REPLACE FUNCTION " <> functionName <> "() RETURNS TRIGGER AS $$"
+ <> "BEGIN\n"
+ <> " PERFORM pg_notify('" <> eventName tableName <> "', '');\n"
+ <> " RETURN new;"
+ <> "END;\n"
+ <> "$$ language plpgsql;"
+ <> "DROP TRIGGER IF EXISTS " <> insertTriggerName <> " ON " <> tableName <> "; CREATE TRIGGER " <> insertTriggerName <> " AFTER INSERT ON \"" <> tableName <> "\" FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " <> functionName <> "();\n"
+ <> "DROP TRIGGER IF EXISTS " <> updateTriggerName <> " ON " <> tableName <> "; CREATE TRIGGER " <> updateTriggerName <> " AFTER UPDATE ON \"" <> tableName <> "\" FOR EACH ROW WHEN (NEW.status = 'job_status_not_started' OR NEW.status = 'job_status_retry') EXECUTE PROCEDURE " <> functionName <> "();\n"
+ where
+ functionName = "notify_job_queued_" <> tableName
+ insertTriggerName = "did_insert_job_" <> tableName
+ updateTriggerName = "did_update_job_" <> tableName
+
+-- | Retuns the event name of the event that the pg notify trigger dispatches
+eventName :: Text -> Text
+eventName tableName = "job_available_" <> tableName
+
+-- | Called when a job failed. Sets the job status to 'JobStatusFailed' or 'JobStatusRetry' (if more attempts are possible) and resets 'lockedBy'
+jobDidFail :: forall job.
+ ( job ~ GetModelByTableName (GetTableName job)
+ , SetField "lockedBy" job (Maybe UUID)
+ , SetField "status" job JobStatus
+ , SetField "updatedAt" job UTCTime
+ , HasField "attemptsCount" job Int
+ , SetField "lastError" job (Maybe Text)
+ , Job job
+ , CanUpdate job
+ , Show job
+ , ?modelContext :: ModelContext
+ ) => job -> SomeException -> IO ()
+jobDidFail job exception = do
+ updatedAt <- getCurrentTime
+
+ putStrLn ("Failed job with exception: " <> tshow exception)
+
+ let ?job = job
+ let canRetry = get #attemptsCount job < maxAttempts
+ let status = if canRetry then JobStatusRetry else JobStatusFailed
+ job
+ |> set #status status
+ |> set #lockedBy Nothing
+ |> set #updatedAt updatedAt
+ |> set #lastError (Just (tshow exception))
+ |> updateRecord
+
+ pure ()
+
+-- | Called when a job succeeded. Sets the job status to 'JobStatusSucceded' and resets 'lockedBy'
+jobDidSucceed :: forall job.
+ ( job ~ GetModelByTableName (GetTableName job)
+ , SetField "lockedBy" job (Maybe UUID)
+ , SetField "status" job JobStatus
+ , SetField "updatedAt" job UTCTime
+ , HasField "attemptsCount" job Int
+ , SetField "lastError" job (Maybe Text)
+ , Job job
+ , CanUpdate job
+ , Show job
+ , ?modelContext :: ModelContext
+ ) => job -> IO ()
+jobDidSucceed job = do
+ putStrLn "Succeeded job"
+ updatedAt <- getCurrentTime
+ job
+ |> set #status JobStatusSucceeded
+ |> set #lockedBy Nothing
+ |> set #updatedAt updatedAt
+ |> updateRecord
+
+ pure ()
+
+-- | Mapping for @JOB_STATUS@:
+--
+-- > CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry');
+instance PG.FromField JobStatus where
+ fromField field (Just "job_status_not_started") = pure JobStatusNotStarted
+ fromField field (Just "job_status_running") = pure JobStatusRunning
+ fromField field (Just "job_status_failed") = pure JobStatusFailed
+ fromField field (Just "job_status_succeeded") = pure JobStatusSucceeded
+ fromField field (Just "job_status_retry") = pure JobStatusRetry
+ fromField field (Just value) = PG.returnError PG.ConversionFailed field ("Unexpected value for enum value. Got: " <> cs value)
+ fromField field Nothing = PG.returnError PG.UnexpectedNull field "Unexpected null for enum value"
+
+-- The default state is @not started@
+instance Default JobStatus where
+ def = JobStatusNotStarted
+
+-- | Mapping for @JOB_STATUS@:
+--
+-- > CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry');
+instance PG.ToField JobStatus where
+ toField JobStatusNotStarted = PG.toField ("job_status_not_started" :: Text)
+ toField JobStatusRunning = PG.toField ("job_status_running" :: Text)
+ toField JobStatusFailed = PG.toField ("job_status_failed" :: Text)
+ toField JobStatusSucceeded = PG.toField ("job_status_succeeded" :: Text)
+ toField JobStatusRetry = PG.toField ("job_status_retry" :: Text)
+
+instance InputValue JobStatus where
+ inputValue JobStatusNotStarted = "job_status_not_started" :: Text
+ inputValue JobStatusRunning = "job_status_running" :: Text
+ inputValue JobStatusFailed = "job_status_failed" :: Text
+ inputValue JobStatusSucceeded = "job_status_succeeded" :: Text
+ inputValue JobStatusRetry = "job_status_retry" :: Text
+
+instance IHP.Controller.Param.ParamReader JobStatus where
+ readParameter = IHP.Controller.Param.enumParamReader
\ No newline at end of file
diff --git a/IHP/Job/Runner.hs b/IHP/Job/Runner.hs
new file mode 100644
index 000000000..c0e2a291f
--- /dev/null
+++ b/IHP/Job/Runner.hs
@@ -0,0 +1,134 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-|
+Module: IHP.Job.Runner
+Description: Functions to run jobs
+Copyright: (c) digitally induced GmbH, 2020
+-}
+module IHP.Job.Runner where
+
+import IHP.Prelude
+import IHP.ControllerPrelude
+import IHP.ScriptSupport
+import IHP.Job.Types
+import qualified IHP.Job.Queue as Queue
+import qualified Control.Exception as Exception
+import qualified Database.PostgreSQL.Simple as PG
+import qualified Database.PostgreSQL.Simple.Types as PG
+import qualified Database.PostgreSQL.Simple.FromField as PG
+import qualified Data.UUID.V4 as UUID
+import qualified Control.Concurrent as Concurrent
+import qualified Control.Concurrent.Async as Async
+import qualified System.Posix.Signals as Signals
+import qualified System.Exit as Exit
+
+runJobWorkers :: [JobWorker] -> Script
+runJobWorkers jobWorkers = do
+ workerId <- UUID.nextRandom
+ allJobs <- newIORef []
+ let oneSecond = 1000000
+
+ putStrLn ("Starting worker " <> tshow workerId)
+
+ let jobWorkerArgs = JobWorkerArgs { allJobs, workerId, modelContext = ?modelContext, frameworkConfig = ?context}
+
+ threadId <- Concurrent.myThreadId
+ exitSignalsCount <- newIORef 0
+ let catchHandler = do
+ exitSignalsCount' <- readIORef exitSignalsCount
+ modifyIORef exitSignalsCount ((+) 1)
+ allJobs' <- readIORef allJobs
+ allJobsCompleted <- allJobs'
+ |> mapM Async.poll
+ >>= pure . filter isNothing
+ >>= pure . null
+ if allJobsCompleted
+ then Concurrent.throwTo threadId Exit.ExitSuccess
+ else if exitSignalsCount' == 0
+ then do
+ putStrLn "Waiting for jobs to complete. CTRL+C again to force exit"
+ forEach allJobs' Async.wait
+ Concurrent.throwTo threadId Exit.ExitSuccess
+ else if exitSignalsCount' == 1 then do
+ putStrLn "Canceling all running jobs. CTRL+C again to force exit"
+ forEach allJobs' Async.cancel
+ Concurrent.throwTo threadId Exit.ExitSuccess
+ else Concurrent.throwTo threadId Exit.ExitSuccess
+
+
+ Signals.installHandler Signals.sigINT (Signals.Catch catchHandler) Nothing
+ Signals.installHandler Signals.sigTERM (Signals.Catch catchHandler) Nothing
+
+ jobWorkers
+ |> mapM (\(JobWorker listenAndRun)-> listenAndRun jobWorkerArgs)
+ >>= Async.waitAnyCancel
+
+ pure ()
+
+
+worker :: forall job.
+ ( job ~ GetModelByTableName (GetTableName job)
+ , FilterPrimaryKey (GetTableName job)
+ , FromRow job
+ , Show (PrimaryKey (GetTableName job))
+ , PG.FromField (PrimaryKey (GetTableName job))
+ , KnownSymbol (GetTableName job)
+ , SetField "attemptsCount" job Int
+ , SetField "lockedBy" job (Maybe UUID)
+ , SetField "status" job JobStatus
+ , SetField "updatedAt" job UTCTime
+ , HasField "attemptsCount" job Int
+ , SetField "lastError" job (Maybe Text)
+ , Job job
+ , CanUpdate job
+ , Show job
+ ) => JobWorker
+worker = JobWorker (jobWorkerFetchAndRunLoop @job)
+
+
+jobWorkerFetchAndRunLoop :: forall job.
+ ( job ~ GetModelByTableName (GetTableName job)
+ , FilterPrimaryKey (GetTableName job)
+ , FromRow job
+ , Show (PrimaryKey (GetTableName job))
+ , PG.FromField (PrimaryKey (GetTableName job))
+ , KnownSymbol (GetTableName job)
+ , SetField "attemptsCount" job Int
+ , SetField "lockedBy" job (Maybe UUID)
+ , SetField "status" job JobStatus
+ , SetField "updatedAt" job UTCTime
+ , HasField "attemptsCount" job Int
+ , SetField "lastError" job (Maybe Text)
+ , Job job
+ , CanUpdate job
+ , Show job
+ ) => JobWorkerArgs -> IO (Async.Async ())
+jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do
+ let ?context = frameworkConfig
+ let ?modelContext = modelContext
+
+ -- This loop schedules all jobs that are in the queue.
+ -- It will be initally be called when first starting up this job worker
+ -- and after that it will be called when something has been inserted into the queue (or changed to retry)
+ let startLoop = do
+ asyncJob <- Async.async do
+ Exception.mask $ \restore -> do
+ maybeJob <- Queue.fetchNextJob @job workerId
+ case maybeJob of
+ Just job -> do
+ putStrLn ("Starting job: " <> tshow job)
+ resultOrException <- Exception.try (restore (perform job))
+ case resultOrException of
+ Left exception -> Queue.jobDidFail job exception
+ Right _ -> Queue.jobDidSucceed job
+
+ startLoop
+ Nothing -> pure ()
+ modifyIORef allJobs (asyncJob:)
+
+ -- Start all jobs in the queue
+ startLoop
+
+ -- Start a job when a new job is added to the table or when it's set to retry
+ watcher <- Queue.watchForJob (tableName @job) startLoop
+
+ pure watcher
diff --git a/IHP/Job/Types.hs b/IHP/Job/Types.hs
new file mode 100644
index 000000000..e3283bbe8
--- /dev/null
+++ b/IHP/Job/Types.hs
@@ -0,0 +1,40 @@
+module IHP.Job.Types
+( Job (..)
+, JobWorkerArgs (..)
+, JobWorker (..)
+, JobStatus (..)
+, Worker (..)
+)
+where
+
+import IHP.Prelude
+import IHP.FrameworkConfig
+import qualified Control.Concurrent.Async as Async
+
+class Job job where
+ perform :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => job -> IO ()
+
+ maxAttempts :: (?job :: job) => Int
+ maxAttempts = 10
+
+class Worker application where
+ workers :: application -> [JobWorker]
+
+data JobWorkerArgs = JobWorkerArgs
+ { allJobs :: IORef [Async.Async ()]
+ , workerId :: UUID
+ , modelContext :: ModelContext
+ , frameworkConfig :: FrameworkConfig }
+
+newtype JobWorker = JobWorker (JobWorkerArgs -> IO (Async.Async ()))
+
+-- | Mapping for @JOB_STATUS@. The DDL statement for this can be found in IHPSchema.sql:
+--
+-- > CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry');
+data JobStatus
+ = JobStatusNotStarted
+ | JobStatusRunning
+ | JobStatusFailed
+ | JobStatusSucceeded
+ | JobStatusRetry
+ deriving (Eq, Show, Read, Enum)
\ No newline at end of file
diff --git a/IHP/SchemaCompiler.hs b/IHP/SchemaCompiler.hs
index 9ac7469e5..f96780a91 100644
--- a/IHP/SchemaCompiler.hs
+++ b/IHP/SchemaCompiler.hs
@@ -142,6 +142,8 @@ compileTypes options schema@(Schema statements) =
<> "import qualified Data.Aeson\n"
<> "import Database.PostgreSQL.Simple.Types (Query (Query), Binary ( .. ))\n"
<> "import qualified Database.PostgreSQL.Simple.Types\n"
+ <> "import IHP.Job.Types\n"
+ <> "import IHP.Job.Queue ()\n"
compileStatementPreview :: [Statement] -> Statement -> Text
compileStatementPreview statements statement = let ?schema = Schema statements in compileStatement previewCompilerOptions statement
diff --git a/IHP/Server.hs b/IHP/Server.hs
index c93d02820..17b8cbd88 100644
--- a/IHP/Server.hs
+++ b/IHP/Server.hs
@@ -29,8 +29,11 @@ import qualified IHP.AutoRefresh as AutoRefresh
import qualified IHP.AutoRefresh.Types as AutoRefresh
import qualified IHP.WebSocket as WS
import IHP.LibDir
+import qualified IHP.Job.Runner as Job
+import qualified IHP.Job.Types as Job
+import qualified Control.Concurrent.Async as Async
-run :: (FrontController RootApplication) => ConfigBuilder -> IO ()
+run :: (FrontController RootApplication, Job.Worker RootApplication) => ConfigBuilder -> IO ()
run configBuilder = do
frameworkConfig@(FrameworkConfig { environment, appPort, dbPoolMaxConnections, dbPoolIdleTime, databaseUrl, sessionCookie, requestLoggerMiddleware }) <- buildFrameworkConfig configBuilder
session <- Vault.newKey
@@ -57,13 +60,22 @@ run configBuilder = do
|> Warp.setPort appPort
in Warp.runSettings settings
else Warp.runEnv appPort
- runServer $
- staticMiddleware $
- sessionMiddleware $
- ihpWebsocketMiddleware $
- requestLoggerMiddleware $
- methodOverridePost $
- application
+
+ let jobWorkers = Job.workers RootApplication
+ let withBackgroundWorkers app =
+ if isDevelopment && not (isEmpty jobWorkers)
+ then Async.withAsync (let ?context = frameworkConfig in Job.runJobWorkers jobWorkers) (\_ -> app)
+ else app
+
+
+ withBackgroundWorkers do
+ runServer $
+ staticMiddleware $
+ sessionMiddleware $
+ ihpWebsocketMiddleware $
+ requestLoggerMiddleware $
+ methodOverridePost $
+ application
{-# INLINE run #-}
ihpWebsocketMiddleware :: (?applicationContext :: ApplicationContext) => Middleware
diff --git a/IHP/ViewPrelude.hs b/IHP/ViewPrelude.hs
index 42e36f7af..baa41bdf2 100644
--- a/IHP/ViewPrelude.hs
+++ b/IHP/ViewPrelude.hs
@@ -28,7 +28,8 @@ module IHP.ViewPrelude (
module IHP.Controller.Context,
module IHP.Controller.Layout,
module IHP.Modal.Types,
- module IHP.Modal.ViewFunctions
+ module IHP.Modal.ViewFunctions,
+ module IHP.Job.Types
) where
import IHP.Prelude
@@ -55,4 +56,5 @@ import IHP.Controller.Context
import IHP.Controller.Layout
import IHP.Modal.Types
-import IHP.Modal.ViewFunctions
\ No newline at end of file
+import IHP.Modal.ViewFunctions
+import IHP.Job.Types
\ No newline at end of file
diff --git a/Test/IDE/CodeGeneration/JobGenerator.hs b/Test/IDE/CodeGeneration/JobGenerator.hs
new file mode 100644
index 000000000..d2baa7fb2
--- /dev/null
+++ b/Test/IDE/CodeGeneration/JobGenerator.hs
@@ -0,0 +1,45 @@
+{-|
+Module: Test.IDE.CodeGeneration.JobGenerator
+Copyright: (c) digitally induced GmbH, 2020
+-}
+module Test.IDE.CodeGeneration.JobGenerator where
+
+import Test.Hspec
+import IHP.Prelude
+import qualified IHP.IDE.CodeGen.JobGenerator as JobGenerator
+import IHP.ViewPrelude (cs, plain)
+import qualified Text.Megaparsec as Megaparsec
+import IHP.IDE.CodeGen.Types
+import IHP.IDE.SchemaDesigner.Types
+import IHP.NameSupport
+
+
+tests = do
+ describe "Job Generator" do
+ it "should build a job with name \"CreateContainerJobs\"" do
+ let applicationName = "Web"
+ let tableName = "create_container_jobs"
+ let modelName = "CreateContainerJob"
+ let config = JobGenerator.JobConfig { .. }
+ let builtPlan = JobGenerator.buildPlan' config
+
+ builtPlan `shouldBe` [
+ EnsureDirectory {directory = "Web/Job"}
+ , CreateFile {filePath = "Web/Job/CreateContainer.hs", fileContent = "module Web.Job.CreateContainer where\nimport Web.Controller.Prelude\n\ninstance Job CreateContainerJob where\n perform CreateContainerJob { .. } = do\n putStrLn \"Hello World!\"\n"}
+ , AppendToFile {filePath = "Application/Schema.sql", fileContent = "CREATE TABLE create_container_jobs (\n id UUID DEFAULT uuid_generate_v4() PRIMARY KEY NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n status JOB_STATUS DEFAULT 'job_status_not_started' NOT NULL,\n last_error TEXT DEFAULT NULL,\n attempts_count INT DEFAULT 0 NOT NULL,\n locked_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,\n locked_by UUID DEFAULT NULL\n);\n"}
+ , AppendToMarker {marker = "-- Job Imports", filePath = "Web/Worker.hs", fileContent = "import Web.Job.CreateContainer"}
+ ]
+
+ it "should support other applications" do
+ let applicationName = "Admin"
+ let tableName = "create_container_jobs"
+ let modelName = "CreateContainerJob"
+ let config = JobGenerator.JobConfig { .. }
+ let builtPlan = JobGenerator.buildPlan' config
+
+ builtPlan `shouldBe` [
+ EnsureDirectory {directory = "Admin/Job"}
+ , CreateFile {filePath = "Admin/Job/CreateContainer.hs", fileContent = "module Admin.Job.CreateContainer where\nimport Admin.Controller.Prelude\n\ninstance Job CreateContainerJob where\n perform CreateContainerJob { .. } = do\n putStrLn \"Hello World!\"\n"}
+ , AppendToFile {filePath = "Application/Schema.sql", fileContent = "CREATE TABLE create_container_jobs (\n id UUID DEFAULT uuid_generate_v4() PRIMARY KEY NOT NULL,\n created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,\n status JOB_STATUS DEFAULT 'job_status_not_started' NOT NULL,\n last_error TEXT DEFAULT NULL,\n attempts_count INT DEFAULT 0 NOT NULL,\n locked_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,\n locked_by UUID DEFAULT NULL\n);\n"}
+ , AppendToMarker {marker = "-- Job Imports", filePath = "Admin/Worker.hs", fileContent = "import Admin.Job.CreateContainer"}
+ ]
diff --git a/Test/Main.hs b/Test/Main.hs
index f01829ec2..04b30a0da 100644
--- a/Test/Main.hs
+++ b/Test/Main.hs
@@ -22,6 +22,7 @@ import qualified Test.ValidationSupport.ValidateFieldSpec
import qualified Test.IDE.CodeGeneration.ControllerGenerator
import qualified Test.IDE.CodeGeneration.ViewGenerator
import qualified Test.IDE.CodeGeneration.MailGenerator
+import qualified Test.IDE.CodeGeneration.JobGenerator
import qualified Test.HtmlSupport.QQSpec
import qualified Test.HtmlSupport.ParserSpec
import qualified Test.NameSupportSpec
@@ -39,6 +40,7 @@ main = hspec do
Test.IDE.CodeGeneration.ControllerGenerator.tests
Test.IDE.CodeGeneration.ViewGenerator.tests
Test.IDE.CodeGeneration.MailGenerator.tests
+ Test.IDE.CodeGeneration.JobGenerator.tests
Test.HtmlSupport.QQSpec.tests
Test.NameSupportSpec.tests
Test.HaskellSupportSpec.tests
diff --git a/ihp.cabal b/ihp.cabal
index c1f07ae86..f2b73c13a 100644
--- a/ihp.cabal
+++ b/ihp.cabal
@@ -258,6 +258,11 @@ library IHPFramework
, IHP.Version
, IHP.Point
, Paths_ihp
+ , IHP.IDE.CodeGen.JobGenerator
+ , IHP.IDE.CodeGen.View.NewJob
+ , IHP.Job.Queue
+ , IHP.Job.Runner
+ , IHP.Job.Types
executable RunDevServer
import: shared-properties
diff --git a/lib/IHP/IHPSchema.sql b/lib/IHP/IHPSchema.sql
index a58db1c1f..9da57bb0b 100644
--- a/lib/IHP/IHPSchema.sql
+++ b/lib/IHP/IHPSchema.sql
@@ -1,2 +1,5 @@
-- Provides all the default settings for a IHP database in development mode
-CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
\ No newline at end of file
+CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
+
+-- Used by IHP.Job
+CREATE TYPE JOB_STATUS AS ENUM ('job_status_not_started', 'job_status_running', 'job_status_failed', 'job_status_succeeded', 'job_status_retry');
\ No newline at end of file
diff --git a/lib/IHP/Makefile.dist b/lib/IHP/Makefile.dist
index 80fa2d424..50eb87e31 100644
--- a/lib/IHP/Makefile.dist
+++ b/lib/IHP/Makefile.dist
@@ -178,6 +178,22 @@ build/Script/Main/%.hs: Application/Script/%.hs
echo "import Application.Script.$* (run)" >> $@
echo "main = runScript Config.config run" >> $@
+build/bin/RunJobs: build/RunJobs.hs
+ mkdir -p build/bin
+ ghc -O2 -main-is 'RunJobs.main' ${GHC_OPTIONS} ${PROD_GHC_OPTIONS} $< -o $@ -odir build -hidir build
+
+build/RunJobs.hs: build/Generated/Types.hs
+ echo "module RunJobs (main) where" > $@
+ echo "import Application.Script.Prelude" >> $@
+ echo "import IHP.ScriptSupport" >> $@
+ echo "import IHP.Job.Runner" >> $@
+ echo "import qualified Config" >> $@
+ echo "import Main" >> $@
+ echo "main :: IO ()" >> $@
+ echo "main = runScript Config.config run" >> $@
+ echo "run :: Script" >> $@
+ echo "run = runJobWorkers (workers RootApplication)" >> $@
+
hie.yaml: # Configuration for haskell-language-server
echo "cradle:" > hie.yaml
echo " bios:" >> hie.yaml