-
Notifications
You must be signed in to change notification settings - Fork 360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WX-1217 Workflow completion callback #7213
Changes from 31 commits
05a30bb
106b669
5fc1e66
3e8500f
a114372
4ff381d
14584e9
a87cf07
891121e
a5454aa
f37d614
04b8ce7
a9c2ce2
27dac91
b190a44
84b6461
5f1e3cb
2870c92
ade1ca2
7cddd10
0270448
a91ddd9
c8ea146
9867876
6c94189
fd1b171
25e2f3a
5392525
9dcfe21
f8983cf
d51bece
029b24c
b291a1d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
The workflow callback is a simple way to integrate Cromwell with an external system. When each workflow reaches a terminal | ||
state, Cromwell will POST a message to a provided URL (see below for schema of this message). Messages are sent for root | ||
workflows only, not subworkflows. | ||
|
||
### Configuration | ||
|
||
This feature will only be used if enabled via config. All config items except `enabled` are optional. | ||
|
||
``` | ||
workflow-state-callback { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we include where in the config file should this go? Top level? In There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like the answer is: yes, top level |
||
enabled: true | ||
num-threads: 5 | ||
endpoint: "http://example.com" | ||
auth.azure: true | ||
request-backoff { | ||
min: "3 seconds", | ||
max: "5 minutes", | ||
multiplier: 1.1 | ||
} | ||
max-retries = 10 | ||
} | ||
``` | ||
|
||
* `enabled`: This boolean controls whether a callback will be attempted or not. | ||
* `num-threads`: The number of threads Cromwell will allocate for performing callbacks. | ||
* `endpoint`: This is the default URL to send the message to. If this is unset, and no URL is set in workflow options, no callback will be sent. | ||
* `auth.azure`: If true, and if Cromwell is running in an Azure environment, Cromwell will include an auth header with bearer token generated from local Azure credentials. | ||
* `request-backoff` and `max-retries`: Include these to override the default retry behavior (default behavior shown here). | ||
|
||
### Workflow Options | ||
|
||
You may choose to override the `endpoint` set in config by including this workflow option: | ||
``` | ||
{ | ||
"workflow_callback_uri": "http://mywebsite.com" | ||
} | ||
``` | ||
|
||
### Callback schema | ||
|
||
Below is an example of a callback request body. | ||
|
||
``` | ||
{ | ||
"workflowId": "00001111-2222-3333-4444-555566667777", | ||
"state": "Succeeded", | ||
"outputs": { | ||
"task1.out": 5, | ||
"task2.out": "/some/file.txt" | ||
} | ||
} | ||
``` | ||
|
||
* `workflowId`: The UUID of the workflow | ||
* `state`: The terminal state of the workflow. The list of possible values is: `Succeeded`, `Failed`, `Aborted` | ||
* `outputs`: The final outputs of the workflow, as would be returned from the `api/workflows/{version}/{id}/outputs` endpoint. Expected to be empty when the workflow is not successful.. | ||
* `failures`: A list of strings describing the workflow's failures. Expected to be empty if the workflow did not fail. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,6 @@ | ||
package cromwell.engine.workflow | ||
|
||
import java.util.concurrent.atomic.AtomicInteger | ||
|
||
import akka.actor.SupervisorStrategy.Stop | ||
import akka.actor._ | ||
import com.typesafe.config.Config | ||
|
@@ -23,6 +22,7 @@ | |
import cromwell.engine.workflow.lifecycle.deletion.DeleteWorkflowFilesActor.{DeleteWorkflowFilesFailedResponse, DeleteWorkflowFilesSucceededResponse, StartWorkflowFilesDeletion} | ||
import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor | ||
import cromwell.engine.workflow.lifecycle.execution.WorkflowExecutionActor._ | ||
import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackActor.PerformCallbackCommand | ||
import cromwell.engine.workflow.lifecycle.finalization.WorkflowFinalizationActor.{StartFinalizationCommand, WorkflowFinalizationFailedResponse, WorkflowFinalizationSucceededResponse} | ||
import cromwell.engine.workflow.lifecycle.finalization.{CopyWorkflowLogsActor, CopyWorkflowOutputsActor, WorkflowFinalizationActor} | ||
import cromwell.engine.workflow.lifecycle.initialization.WorkflowInitializationActor | ||
|
@@ -149,7 +149,7 @@ | |
initializationData: AllBackendInitializationData, | ||
lastStateReached: StateCheckpoint, | ||
effectiveStartableState: StartableState, | ||
workflowFinalOutputs: Set[WomValue] = Set.empty, | ||
workflowFinalOutputs: Option[CallOutputs] = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change was made so that the |
||
workflowAllOutputs: Set[WomValue] = Set.empty, | ||
rootAndSubworkflowIds: Set[WorkflowId] = Set.empty, | ||
failedInitializationAttempts: Int = 0) | ||
|
@@ -176,6 +176,7 @@ | |
ioActor: ActorRef, | ||
serviceRegistryActor: ActorRef, | ||
workflowLogCopyRouter: ActorRef, | ||
workflowCallbackActor: Option[ActorRef], | ||
jobStoreActor: ActorRef, | ||
subWorkflowStoreActor: ActorRef, | ||
callCacheReadActor: ActorRef, | ||
|
@@ -199,6 +200,7 @@ | |
ioActor = ioActor, | ||
serviceRegistryActor = serviceRegistryActor, | ||
workflowLogCopyRouter = workflowLogCopyRouter, | ||
workflowCallbackActor = workflowCallbackActor, | ||
jobStoreActor = jobStoreActor, | ||
subWorkflowStoreActor = subWorkflowStoreActor, | ||
callCacheReadActor = callCacheReadActor, | ||
|
@@ -226,6 +228,7 @@ | |
ioActor: ActorRef, | ||
override val serviceRegistryActor: ActorRef, | ||
workflowLogCopyRouter: ActorRef, | ||
workflowCallbackActor: Option[ActorRef], | ||
jobStoreActor: ActorRef, | ||
subWorkflowStoreActor: ActorRef, | ||
callCacheReadActor: ActorRef, | ||
|
@@ -534,26 +537,27 @@ | |
case _ => // The WMA is waiting for the WorkflowActorWorkComplete message. No extra information needed here. | ||
} | ||
|
||
// Copy/Delete workflow logs | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This section is just a copy/paste of this code to move the brute force workflow options up in scope (outside |
||
if (WorkflowLogger.isEnabled) { | ||
/* | ||
* The submitted workflow options have been previously validated by the CromwellApiHandler. These are | ||
* being recreated so that in case MaterializeWorkflowDescriptor fails, the workflow logs can still | ||
* be copied by accessing the workflow options outside of the EngineWorkflowDescriptor. | ||
*/ | ||
def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions | ||
val system = context.system | ||
val ec = context.system.dispatcher | ||
def bruteForcePathBuilders: Future[List[PathBuilder]] = { | ||
// Protect against path builders that may throw an exception instead of returning a failed future | ||
Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten | ||
} | ||
/* | ||
* The submitted workflow options have been previously validated by the CromwellApiHandler. These are | ||
* being recreated so that even if the MaterializeWorkflowDescriptor fails, the workflow options can still be | ||
* accessed outside of the EngineWorkflowDescriptor. Used for both copying workflow log and sending workflow callbacks. | ||
*/ | ||
def bruteForceWorkflowOptions: WorkflowOptions = sources.workflowOptions | ||
|
||
val system = context.system | ||
val ec = context.system.dispatcher | ||
def bruteForcePathBuilders: Future[List[PathBuilder]] = { | ||
// Protect against path builders that may throw an exception instead of returning a failed future | ||
Future(EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions, pathBuilderFactories)(system))(ec).flatten | ||
} | ||
|
||
val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match { | ||
case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders)) | ||
case None => (bruteForceWorkflowOptions, bruteForcePathBuilders) | ||
} | ||
val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match { | ||
case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders)) | ||
case None => (bruteForceWorkflowOptions, bruteForcePathBuilders) | ||
} | ||
|
||
// Copy/Delete workflow logs | ||
if (WorkflowLogger.isEnabled) { | ||
workflowOptions.get(FinalWorkflowLogDir).toOption match { | ||
case Some(destinationDir) => | ||
pathBuilders | ||
|
@@ -566,6 +570,18 @@ | |
} | ||
} | ||
|
||
// Attempt to perform workflow completion callback | ||
workflowCallbackActor.foreach { wca => | ||
val callbackUri = workflowOptions.get(WorkflowOptions.WorkflowCallbackUri).toOption | ||
wca ! PerformCallbackCommand( | ||
workflowId, | ||
callbackUri, | ||
terminalState.workflowState, | ||
stateData.workflowFinalOutputs.getOrElse(CallOutputs.empty), | ||
nextStateData.lastStateReached.failures.toList.flatMap(_.map(_.getMessage)) | ||
) | ||
} | ||
|
||
// We can't transition from within another transition function, but we can instruct ourselves to with a message: | ||
self ! AwaitMetadataIntegrity | ||
jgainerdewar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
@@ -627,7 +643,7 @@ | |
rootWorkflowId = rootWorkflowId, | ||
rootWorkflowRootPaths = data.initializationData.getWorkflowRoots(), | ||
rootAndSubworkflowIds = data.rootAndSubworkflowIds, | ||
workflowFinalOutputs = data.workflowFinalOutputs, | ||
workflowFinalOutputs = data.workflowFinalOutputs.map(out => out.outputs.values.toSet).getOrElse(Set.empty), | ||
workflowAllOutputs = data.workflowAllOutputs, | ||
pathBuilders = data.workflowDescriptor.get.pathBuilders, | ||
serviceRegistryActor = serviceRegistryActor, | ||
|
@@ -689,7 +705,7 @@ | |
finalizationActor ! StartFinalizationCommand | ||
goto(FinalizingWorkflowState) using data.copy( | ||
lastStateReached = StateCheckpoint (lastStateOverride.getOrElse(stateName), failures), | ||
workflowFinalOutputs = workflowFinalOutputs.outputs.values.toSet, | ||
workflowFinalOutputs = Option(workflowFinalOutputs), | ||
workflowAllOutputs = workflowAllOutputs, | ||
rootAndSubworkflowIds = rootAndSubworkflowIds | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last thought - maybe include the phase "best effort" here to make sure it clear that Cromwell will try, but makes no guarantees about delivery