JobExecutor
Note: This documentation covers the latest version in the develop branch.
The JobExecutor takes a Job object and traverses all of the Tasks defined within it to build an execution DAG. The execution plan derives that DAG from the list of Task dependencies by way of transitive reduction.
Building the execution plan determines which Tasks can run asynchronously, based on how the developer defined them. It then generates a CompletableFuture.
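To make the transitive reduction step concrete, here is a minimal sketch. It is not the library's implementation: the dependency map, the DagSketch object, and both function names are assumptions made for illustration, and the graph is assumed to be acyclic.

```scala
object DagSketch {
  // Hypothetical representation: task name -> names of the tasks it depends on.
  type Deps = Map[String, Set[String]]

  // Every task reachable from `start` by following dependency edges.
  def reachable(deps: Deps, start: String): Set[String] = {
    @annotation.tailrec
    def loop(frontier: List[String], seen: Set[String]): Set[String] = frontier match {
      case Nil => seen
      case head :: tail =>
        val next = deps.getOrElse(head, Set.empty[String]).diff(seen)
        loop(next.toList ++ tail, seen ++ next)
    }
    loop(List(start), Set.empty)
  }

  // Transitive reduction: drop a direct dependency when it is already
  // reachable through one of the task's other direct dependencies.
  def transitiveReduction(deps: Deps): Deps =
    deps.map { case (task, direct) =>
      val redundant = direct.filter { d =>
        (direct - d).exists(other => reachable(deps, other).contains(d))
      }
      task -> (direct -- redundant)
    }
}
```

With a depending on b and c, and b depending on c, the edge from a to c is redundant, so the reduced graph keeps only a -> b and b -> c.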
Creation of a JobExecutor is the last step before you actually run a Job. Please make sure to familiarize yourself with creating Tasks and Jobs before continuing on.
Let's assume you've created a Job object, and you're ready to prepare that job for execution. To do so:
val jobExec: JobExecutor = JobExecutor(job)
That's all there is to it.
Controlling the execution of Jobs is where it can get interesting.
When you create a JobExecutor and assign the Job, all you've done is call the constructor and create an object. It is not until you call the queue() method that the DAG is created, an execution plan is formed, and the Job is placed into the JobQueued state, along with all of its Tasks.
JobExecution can take place in one of two ways: either in a blocking state (meaning when you call the run() method, it blocks until complete), or in a non-blocking state (fire-and-forget).
To set the JobExecution to run either blocking or non-blocking, call the setBlocking(flag) method; pass false to run non-blocking. JobExecution runs all Jobs in blocking state unless otherwise set. Note: you can retrieve a JobExecution's blocking status by examining the blocking value.
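The difference between the two modes can be sketched with a plain java.util.concurrent.CompletableFuture. This is an illustration only: BlockingSketch and its run helper are hypothetical names, and the library's own setBlocking/run internals may work differently.

```scala
import java.util.concurrent.CompletableFuture

object BlockingSketch {
  // Hypothetical helper: start `work` asynchronously, then wait for it
  // only when `blocking` is true.
  def run(work: Runnable, blocking: Boolean): CompletableFuture[Void] = {
    val future: CompletableFuture[Void] = CompletableFuture.runAsync(work)
    if (blocking) future.join() // returns once the work has completed
    future
  }
}
```

In the non-blocking case the caller gets the future back right away and can still call join() on it later if it needs to wait.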
Once the JobExecutor has queued the Job for execution, it has created an internal CompletableFuture to execute, with controls that allow you to pause and resume the job.
To run the Job, all that's required is a call to the run() method.
Example:
val task1: Task = TaskBuilder() ... .build()
val job1: Job = JobBuilder() ... .build()
val jobExec: JobExecutor = JobExecutor(job1)
jobExec.queue().run()
This will either run the Job in the background, or block on run() until all of the Tasks associated with it have completed, depending on the blocking setting.
NOTE: Calling queue() will queue the Task list in the order in which it was created. Tasks are never created out-of-order.
The following is a list of Exceptions that could be thrown from the JobExecutor:
- InvalidTaskStateException: indicates a Task status was unexpectedly altered
- InvalidJobExecutionStateException: indicates a Job status was unexpectedly altered
- DuplicateTaskNameException: indicates a Task being queued already exists by the task.name
- InvalidTaskDependencyException: indicates a Task specified a dependsOn that was not yet queued
The first two Exception types are rare, and only really occur when running code out of sync, or if these statuses are modified outside of the JobExecutor class. You rarely need to modify the Task status yourself.
DuplicateTaskNameException can occur when a Task is added to a Job twice, or when two different Tasks share the same name.
InvalidTaskDependencyException can occur when a Task depends on another Task, but the Task it depends on has not yet been queued.
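The two queue-time checks described above can be sketched as a single pass over the Tasks in creation order. Everything here is hypothetical: TaskSpec stands in for a real Task, and the sketch returns an Either instead of throwing the library's exceptions, whose constructors are not shown in this document.

```scala
object QueueCheckSketch {
  // Hypothetical stand-in for a Task: a name plus the names it depends on.
  final case class TaskSpec(name: String, dependsOn: Set[String])

  // Walk the tasks in creation order, tracking the names queued so far.
  // Returns Left(problem) on the first violation, Right(queued names) otherwise.
  def check(tasks: List[TaskSpec]): Either[String, List[String]] =
    tasks.foldLeft[Either[String, List[String]]](Right(Nil)) {
      case (Left(err), _) => Left(err)
      case (Right(queued), t) =>
        if (queued.contains(t.name)) Left(s"duplicate task name: ${t.name}")
        else t.dependsOn.find(d => !queued.contains(d)) match {
          case Some(missing) =>
            Left(s"${t.name} depends on $missing, which is not yet queued")
          case None => Right(queued :+ t.name)
        }
    }
}
```

Because the check follows creation order, a Task that names a dependency defined later in the list is rejected, which mirrors the "not yet queued" wording above.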
Flow control is provided via two mechanisms: pause() and resume().
Calling pause() on the JobExecutor will pause further execution of any Tasks that have not yet been executed. Calling pause() twice in succession will have no effect on the executor.
Calling resume() on the JobExecutor will resume execution of any remaining Tasks that have not yet been executed. Calling resume() twice in succession will have no effect on the executor.
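One way to get the "calling it twice has no effect" behavior is a compare-and-set flag that is consulted before each not-yet-started Task. The PauseGate class below is a hypothetical sketch of that idea, not the JobExecutor's actual mechanism.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object PauseSketch {
  // Hypothetical gate an executor could check before starting the next task.
  final class PauseGate {
    private val paused = new AtomicBoolean(false)

    // Each call returns true only when it actually changed the state,
    // so a repeated pause() or resume() is a no-op.
    def pause(): Boolean = paused.compareAndSet(false, true)
    def resume(): Boolean = paused.compareAndSet(true, false)
    def isPaused: Boolean = paused.get()
  }
}
```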
To cancel a Job that is currently in progress, simply call the cancel(reason: String) method. This will force the CompletableFuture to cancel whatever Task is currently running (interrupting it, essentially), and set the Job status to JobCanceled(reason).
Any Tasks that were in progress will be canceled, as will any further Tasks that were scheduled to run; their status will be set to TaskCanceled(reason).
If you want to get at the CompletableFuture[Void] after it has been created, use the getCompletableFuture() method call.
So, to recap, all that's required to run a Job is a few steps:
- Create a Task
- Define a Job
- Create a JobExecutor
- Queue the JobExecutor
- Set to non-blocking if you want to run the Job in the background, and finally
- Call run() on the JobExecutor.
That's it.
Once the JobExecutor's run() method has completed, you can inspect the Job's status accessor to see the state of the Job after it has run. If no Exception was thrown during the run, the Job should terminate normally. Otherwise, you can find the cause of the problem by matching the Job status against a JobFailed(reason) and extracting the root Exception.
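Matching on the status might look like the sketch below. The status hierarchy is an assumption: only the names JobQueued, JobCanceled and JobFailed appear in this document, and the field types (a String reason for cancellation, a Throwable for failure) as well as the rootCause and describe helpers are guesses made for illustration.

```scala
object StatusSketch {
  // Hypothetical status hierarchy; the real one may have more states
  // and different field types.
  sealed trait JobStatus
  case object JobQueued extends JobStatus
  final case class JobCanceled(reason: String) extends JobStatus
  final case class JobFailed(reason: Throwable) extends JobStatus

  // Follow getCause until the innermost exception is found.
  def rootCause(t: Throwable): Throwable =
    if (t.getCause == null || (t.getCause eq t)) t else rootCause(t.getCause)

  // Pattern match on the status and pull out the interesting part.
  def describe(status: JobStatus): String = status match {
    case JobFailed(reason)   => s"failed: ${rootCause(reason).getMessage}"
    case JobCanceled(reason) => s"canceled: $reason"
    case JobQueued           => "queued"
  }
}
```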
Licensed under the Apache License 2.0 - Lots of help from Reddit - Happy Hacking!