WX-1110[risk=low] Added endpoint to fetch failed tasks by workflow id #7165

Merged 18 commits on Jul 17, 2023.

Commits (18)
b0efcf5
wx-1110 saving current progress, need to translate entries to events …
JVThomas May 25, 2023
978304d
WX-1110 end to end path building successfully, need to test if it wor…
JVThomas Jun 9, 2023
bec897b
WX-1110 added new metadata endpoint, partially implemented swagger do…
JVThomas Jun 9, 2023
c3b5f0e
WX-1110 corrections to SQL query and endpoint flow (need to test if a…
JVThomas Jun 22, 2023
53bd3f8
WX-1110 added test for new MetadataSlickDatabase function
JVThomas Jun 26, 2023
e6ba45e
WX-1110 recombined metadata entry transformation logic
JVThomas Jun 27, 2023
3f648a3
WX-1110 added content checks on entries for db test
JVThomas Jun 27, 2023
ff6b590
WX-1110 created method to handle identifier parsing for postgres
JVThomas Jun 28, 2023
5e71659
WX-1110 corrections to seq insertions, test corrections
JVThomas Jun 28, 2023
55559ef
WX-1110 updated query to resolve for PG and non-PG dbs, updated test
JVThomas Jul 7, 2023
4b62b90
WX-1110 added test for failed task event for MetadataBuilderActor
JVThomas Jul 10, 2023
f8fa61e
WX-1110 added endpoint test for failed jobs
JVThomas Jul 10, 2023
f5bcb3e
Merge branch 'develop' into WX-1110
JVThomas Jul 10, 2023
6f4137b
WX-1110 minor update to swagger endpoint page
JVThomas Jul 10, 2023
27d9461
WX-1110 typo corrections, comment cleanup
JVThomas Jul 10, 2023
c6105cc
WX-1110 doc updates, mock response and test corrections, added row co…
JVThomas Jul 12, 2023
4235421
Merge branch 'develop' into WX-1110
JVThomas Jul 13, 2023
ccc601b
WX-1110 corrected query to correctly handle failed fetches against ta…
JVThomas Jul 14, 2023
@@ -515,4 +515,9 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
override def getMetadataTableSizeInformation()(implicit ec: ExecutionContext): Future[Option[InformationSchemaEntry]] = {
runAction(dataAccess.metadataTableSizeInformation())
}

override def getFailedJobsMetadataWithWorkflowId(rootWorkflowId: String)(implicit ec: ExecutionContext): Future[Vector[MetadataEntry]] = {
val isPostgres = databaseConfig.getValue("db.driver").toString.toLowerCase().contains("postgres")
runLobAction(dataAccess.failedJobsMetadataWithWorkflowId(rootWorkflowId, isPostgres))
}
}
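
The new override decides between the Postgres and non-Postgres query variants purely by looking for the substring "postgres" in the configured db.driver value. A minimal sketch of that check follows (not part of the diff; the config snippet is a hypothetical example, not a real deployment's settings):

    // Sketch only: illustrates the substring check used by
    // getFailedJobsMetadataWithWorkflowId. The config keys and values below are
    // hypothetical examples.
    import com.typesafe.config.ConfigFactory

    object DriverCheckSketch extends App {
      val databaseConfig = ConfigFactory.parseString(
        """db {
          |  driver = "org.postgresql.Driver"
          |  url = "jdbc:postgresql://localhost:5432/cromwell"
          |}""".stripMargin)

      // Mirrors the check above: any driver string containing "postgres"
      // selects the pg_largeobject-aware query variant.
      val isPostgres = databaseConfig.getValue("db.driver").toString.toLowerCase().contains("postgres")

      println(isPostgres) // true; a driver such as "com.mysql.cj.jdbc.Driver" would yield false
    }

Matching on the driver string keeps the method agnostic of which Slick profile is loaded, at the cost of relying on a substring heuristic.
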
@@ -310,6 +310,84 @@ trait MetadataEntryComponent {
}).headOption
}

def failedJobsMetadataWithWorkflowId(rootWorkflowId: String, isPostgres: Boolean) = {
val getMetadataEntryResult = GetResult(r => {
MetadataEntry(r.<<, r.<<, r.<<, r.<<, r.<<, r.nextClobOption().map(clob => new SerialClob(clob)), r.<<, r.<<, r.<<)
})

def dbIdentifierWrapper(identifier: String, isPostgres: Boolean) = {
if(isPostgres) s"${'"'}$identifier${'"'}" else identifier
}

def dbMetadataValueColCheckName(isPostgres: Boolean): String = {
if(isPostgres) "obj.data" else "METADATA_VALUE"
}

def targetCallsSelectStatement(callFqn: String, scatterIndex: String, retryAttempt: String): String = {
s"SELECT ${callFqn}, MAX(COALESCE(${scatterIndex}, 0)) as maxScatter, MAX(COALESCE(${retryAttempt}, 0)) AS maxRetry"
}

def pgObjectInnerJoinStatement(isPostgres: Boolean, metadataValColName: String): String = {
if(isPostgres) s"INNER JOIN pg_largeobject obj ON me.${metadataValColName} = cast(obj.loid as text)" else ""
}

val workflowUuid = dbIdentifierWrapper("WORKFLOW_EXECUTION_UUID", isPostgres)
val callFqn = dbIdentifierWrapper("CALL_FQN", isPostgres)
val scatterIndex = dbIdentifierWrapper("JOB_SCATTER_INDEX", isPostgres)
val retryAttempt = dbIdentifierWrapper("JOB_RETRY_ATTEMPT", isPostgres)
val metadataKey = dbIdentifierWrapper("METADATA_KEY", isPostgres)
val metadataValueType = dbIdentifierWrapper("METADATA_VALUE_TYPE", isPostgres)
val metadataTimestamp = dbIdentifierWrapper("METADATA_TIMESTAMP", isPostgres)
val metadataJournalId = dbIdentifierWrapper("METADATA_JOURNAL_ID", isPostgres)
val rootUuid = dbIdentifierWrapper("ROOT_WORKFLOW_EXECUTION_UUID", isPostgres)
val metadataValue = dbIdentifierWrapper("METADATA_VALUE", isPostgres)
val metadataEntry = dbIdentifierWrapper("METADATA_ENTRY", isPostgres)
val wmse = dbIdentifierWrapper("WORKFLOW_METADATA_SUMMARY_ENTRY", isPostgres)
val resultSetColumnNames = s"me.${workflowUuid}, me.${callFqn}, me.${scatterIndex}, me.${retryAttempt}, me.${metadataKey}, me.${metadataValue}, me.${metadataValueType}, me.${metadataTimestamp}, me.${metadataJournalId}"

val query = sql"""
SELECT #${resultSetColumnNames}
[Review comment from a Collaborator]

What's up with these hash signs?

[Reply from the Contributor Author (JVThomas)]

It's Slick's way of handling string interpolation. You might ask "Why not use ${}?" It's because Slick uses that notation specifically to bind values in a query.

So something like `tableName.colName = ${val}` would work, but `SELECT ${tableName}.${colName}` would throw an error, since you're declaring an identifier rather than providing a value.

This also caught me off guard, and it's only briefly covered in the Slick docs.

[A minimal sketch contrasting the two interpolators follows the full query below.]


FROM #${metadataEntry} me
INNER JOIN (
#${targetCallsSelectStatement(callFqn, scatterIndex, retryAttempt)}
FROM #${metadataEntry} me
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
AND #${callFqn} IS NOT NULL
GROUP BY #${callFqn}
) AS targetCalls
ON me.#${callFqn} = targetCalls.#${callFqn}
LEFT JOIN (
SELECT DISTINCT #${callFqn}
FROM #${metadataEntry} me
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
AND me.#${metadataKey} = 'subWorkflowId'
GROUP BY #${callFqn}
) AS avoidedCalls
ON me.#${callFqn} = avoidedCalls.#${callFqn}
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
#${pgObjectInnerJoinStatement(isPostgres, metadataValue)}
WHERE avoidedCalls.#${callFqn} IS NULL
AND (me.#${metadataKey} in ('executionStatus', 'backendStatus') AND #${dbMetadataValueColCheckName(isPostgres)} = 'Failed')
AND (
(COALESCE(me.#${retryAttempt}, 0) = targetCalls.maxRetry AND me.#${scatterIndex} IS NULL)
OR (COALESCE(me.#${retryAttempt}, 0) = targetCalls.maxRetry AND me.#${scatterIndex} = targetCalls.maxScatter)
)
GROUP BY #${resultSetColumnNames}
HAVING me.#${workflowUuid} IN (
SELECT DISTINCT wmse.#${workflowUuid}
FROM #${wmse} wmse
WHERE wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId
)
"""

query.as(getMetadataEntryResult)
}
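
To make the interpolation discussion above concrete, here is a minimal sketch (not part of the diff) contrasting Slick's value-binding $ with literal splicing #$; the table and column names are placeholders:

    // Sketch only: contrasts Slick's value-binding interpolation ($) with
    // literal splicing (#$). Table and column names are placeholders.
    import slick.jdbc.PostgresProfile.api._

    object SlickInterpolationSketch {
      def queries(tableName: String, colName: String, value: String) = {
        // $value becomes a bind parameter ("?"), which is what you want for values.
        val byValue = sql"SELECT * FROM MY_TABLE WHERE MY_COL = $value"

        // Identifiers cannot be bound as parameters; $tableName here would render
        // roughly as "SELECT * FROM ?" and fail at execution time. #$ splices the
        // string directly into the statement text, which is why the failed-jobs
        // query above uses #${...} for the (possibly quoted) table and column
        // names and plain ${...} only for the rootWorkflowId value.
        val byIdentifier = sql"SELECT * FROM #$tableName WHERE #$colName = $value"

        (byValue, byIdentifier)
      }
    }
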

private[this] def metadataEntryHasMetadataKeysLike(metadataEntry: MetadataEntries,
metadataKeysToFilterFor: List[String],
metadataKeysToFilterOut: List[String]): Rep[Boolean] = {
@@ -197,4 +197,6 @@ trait MetadataSqlDatabase extends SqlDatabase
def countWorkflowsLeftToDeleteThatEndedOnOrBeforeThresholdTimestamp(workflowEndTimestampThreshold: Timestamp)(implicit ec: ExecutionContext): Future[Int]

def getMetadataTableSizeInformation()(implicit ec: ExecutionContext): Future[Option[InformationSchemaEntry]]

def getFailedJobsMetadataWithWorkflowId(rootWorkflowId: String)(implicit ec: ExecutionContext): Future[Vector[MetadataEntry]]
}
32 changes: 31 additions & 1 deletion docs/api/RESTAPI.md

(Generated file; the diff is not rendered in this view.)

20 changes: 20 additions & 0 deletions engine/src/main/resources/swagger/cromwell.yaml
@@ -510,6 +510,26 @@ paths:
$ref: '#/responses/NotFound'
'500':
$ref: '#/responses/ServerError'
'/api/workflows/{version}/{id}/metadata/failed-jobs':
get:
operationId: failed-jobs
summary: Get call-level metadata of failed tasks for a specified root workflow
parameters:
- $ref: '#/parameters/versionParam'
- $ref: '#/parameters/singleId'
tags:
- Workflows
responses:
'200':
description: Successful request
schema:
$ref: '#/definitions/WorkflowMetadataResponse'
'400':
$ref: '#/responses/BadRequest'
'404':
$ref: '#/responses/NotFound'
'500':
$ref: '#/responses/ServerError'
'/api/workflows/{version}/{id}/metadata':
get:
operationId: metadata
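
A hedged usage sketch for trying the new route against a local server (host, port, API version, and workflow id are placeholders; authentication, if configured, is omitted):

    // Sketch only: calls the new failed-jobs endpoint with the JDK HTTP client.
    // Host, port, and the workflow id are placeholders for a local server.
    import java.net.URI
    import java.net.http.{HttpClient, HttpRequest, HttpResponse}

    object FailedJobsEndpointSketch extends App {
      val workflowId = "00000000-0000-0000-0000-000000000000" // some root workflow id

      val request = HttpRequest.newBuilder()
        .uri(URI.create(s"http://localhost:8000/api/workflows/v1/$workflowId/metadata/failed-jobs"))
        .GET()
        .build()

      val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())

      // Per the swagger entry above: 200 with call-level metadata of failed tasks,
      // 400 for a malformed request, 404 for an unknown workflow, 500 on server error.
      println(s"HTTP ${response.statusCode()}")
      println(response.body())
    }
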
@@ -69,6 +69,17 @@ trait MetadataRouteSupport extends HttpInstrumentation {
}
}
},
encodeResponse {
path("workflows" / Segment / Segment / "metadata" / "failed-jobs") { (_, possibleWorkflowId) =>
instrumentRequest {
metadataLookup(
possibleWorkflowId,
(w: WorkflowId) => FetchFailedJobsMetadataWithWorkflowId(w),
serviceRegistryActor
)
}
}
},
encodeResponse {
path("workflows" / Segment / Segment / "metadata") { (_, possibleWorkflowId) =>
instrumentRequest {
@@ -212,8 +223,7 @@ object MetadataRouteSupport {
}

def completeMetadataBuilderResponse(response: Future[MetadataJsonResponse]): Route = {
onComplete(response) {
case Success(r: SuccessfulMetadataJsonResponse) => complete(r.responseJson)
onComplete(response) { case Success(r: SuccessfulMetadataJsonResponse) => complete(r.responseJson)
case Success(r: FailedMetadataJsonResponse) => r.reason.errorRequest(StatusCodes.InternalServerError)
case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse
case Failure(e: UnrecognizedWorkflowException) => e.failRequest(StatusCodes.NotFound)
@@ -36,6 +36,7 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
events: Seq[MetadataEvent],
expectedRes: String,
metadataBuilderActorName: String,
failedTasks: Boolean = false
): Future[Assertion] = {
val mockReadMetadataWorkerActor = TestProbe("mockReadMetadataWorkerActor")
def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props
@@ -45,13 +46,17 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
props = MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000),
name = metadataBuilderActorName,
)

val response = mba.ask(action).mapTo[MetadataJsonResponse]
mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action)
mockReadMetadataWorkerActor.reply(MetadataLookupResponse(queryReply, events))
mockReadMetadataWorkerActor.reply(
if(failedTasks) FetchFailedJobsMetadataLookupResponse(events) else MetadataLookupResponse(queryReply, events)
)
response map { r => r shouldBe a [SuccessfulMetadataJsonResponse] }
response.mapTo[SuccessfulMetadataJsonResponse] map { b => b.responseJson shouldBe expectedRes.parseJson}
}


def assertMetadataFailureResponse(action: MetadataServiceAction,
metadataServiceResponse: MetadataServiceResponse,
expectedException: Exception,
@@ -95,37 +100,37 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
// We'll use a Query instead of a SingleWorkflowMetadataGet, so we expect the WorkflowID this time:
val expectedRes =
s"""{
| "calls": {
| "callB": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 0
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 2,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 3,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }],
| "callA": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }]
| },
| "NOT_CHECKED": "NOT_CHECKED",
| "id": "$workflowA"
|}""".stripMargin
| "calls": {
| "callB": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 0
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 2,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 3,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }],
| "callA": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }]
| },
| "NOT_CHECKED": "NOT_CHECKED",
| "id": "$workflowA"
|}""".stripMargin

val mdQuery = MetadataQuery(workflowA, None, None, None, None, expandSubWorkflows = false)
val queryAction = GetMetadataAction(mdQuery)
@@ -134,7 +139,7 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
queryReply = mdQuery,
events = workflowAEvents,
expectedRes = expectedRes,
metadataBuilderActorName = "mba-scope-tree",
metadataBuilderActorName = "mba-scope-tree"
)
}

@@ -162,6 +167,7 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
eventMaker: WorkflowId => (String, MetadataValue, OffsetDateTime) => MetadataEvent =
makeEvent,
metadataBuilderActorName: String,
isFailedTaskFetch: Boolean = false
): Future[Assertion] = {

val events = eventList map { e => (e._1, MetadataValue(e._2), e._3) } map Function.tupled(eventMaker(workflow))
@@ -172,6 +178,74 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
assertMetadataResponse(queryAction, mdQuery, events, expectedRes, metadataBuilderActorName)
}

it should "build the call list for failed tasks when prompted" in {

def makeEvent(workflow: WorkflowId, key: Option[MetadataJobKey]) = {
MetadataEvent(MetadataKey(workflow, key, "NOT_CHECKED"), MetadataValue("NOT_CHECKED"))
}

val workflowA = WorkflowId.randomId()

val workflowACalls = List(
Option(MetadataJobKey("callB", Option(1), 3)),
Option(MetadataJobKey("callB", None, 1)),
Option(MetadataJobKey("callB", Option(1), 2)),
Option(MetadataJobKey("callA", None, 1)),
Option(MetadataJobKey("callB", Option(1), 1)),
Option(MetadataJobKey("callB", Option(0), 1)),
None
)
val workflowAEvents = workflowACalls map {
makeEvent(workflowA, _)
}

val expectedRes =
s"""{
|"${workflowA}": {
| "calls": {
| "callB": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 0
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 2,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 3,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }],
| "callA": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }]
| },
| "NOT_CHECKED": "NOT_CHECKED"
| }
|}""".stripMargin

val mdQuery = MetadataQuery(workflowA, None, None, None, None, expandSubWorkflows = false)
val queryAction = GetMetadataAction(mdQuery)
assertMetadataResponse(
action = queryAction,
queryReply = mdQuery,
events = workflowAEvents,
expectedRes = expectedRes,
failedTasks = true,
metadataBuilderActorName = "mba-failed-tasks",
)
}

it should "assume the event list is ordered and keep last event if 2 events have same key" in {
val eventBuilderList = List(
("a", "aLater", OffsetDateTime.parse("2000-01-02T12:00:00Z")),