Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1110[risk=low] Added endpoint to fetch failed tasks by workflow id #7165

Merged
merged 18 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
b0efcf5
wx-1110 saving current progress, need to translate entries to events …
JVThomas May 25, 2023
978304d
WX-1110 end to end path building successfully, need to test if it wor…
JVThomas Jun 9, 2023
bec897b
WX-1110 added new metadata endpoint, partially implemented swagger do…
JVThomas Jun 9, 2023
c3b5f0e
WX-1110 corrections to SQL query and endpoint flow (need to test if a…
JVThomas Jun 22, 2023
53bd3f8
WX-1110 added test for new MetadataSlickDatabase function
JVThomas Jun 26, 2023
e6ba45e
WX-1110 recombined metadta entry transformation logic
JVThomas Jun 27, 2023
3f648a3
WX-1110 added content checks on emtries for db test
JVThomas Jun 27, 2023
ff6b590
WX-1110 created method to handle identifier parsing for postgres
JVThomas Jun 28, 2023
5e71659
WX-1110 corrections to seq insertions, test corrections
JVThomas Jun 28, 2023
55559ef
WX-1110 updated query to resolve for PG and non-PG dbs, updated test
JVThomas Jul 7, 2023
4b62b90
WX-1110 added test for failed task event for MetadataBuilderActor
JVThomas Jul 10, 2023
f8fa61e
WX-1110 added endpoint test for failed jobs
JVThomas Jul 10, 2023
f5bcb3e
Merge branch 'develop' into WX-1110
JVThomas Jul 10, 2023
6f4137b
WX-1110 minor update to swagger endpoint page
JVThomas Jul 10, 2023
27d9461
WX-1110 typo corrections, comment cleanup
JVThomas Jul 10, 2023
c6105cc
WX-1110 doc updates, mock response and test corrections, added row co…
JVThomas Jul 12, 2023
4235421
Merge branch 'develop' into WX-1110
JVThomas Jul 13, 2023
ccc601b
WX-1110 corrected query to correctly handle failed fetches against ta…
JVThomas Jul 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -515,4 +515,9 @@ class MetadataSlickDatabase(originalDatabaseConfig: Config)
override def getMetadataTableSizeInformation()(implicit ec: ExecutionContext): Future[Option[InformationSchemaEntry]] = {
runAction(dataAccess.metadataTableSizeInformation())
}

override def getFailedJobsMetadataWithWorkflowId(rootWorkflowId: String)(implicit ec: ExecutionContext): Future[Vector[MetadataEntry]] = {
val isPostgres = databaseConfig.getValue("db.driver").toString.toLowerCase().contains("postgres")
runLobAction(dataAccess.failedJobsMetadataWithWorkflowId(rootWorkflowId, isPostgres))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,86 @@ trait MetadataEntryComponent {
}).headOption
}

def failedJobsMetadataWithWorkflowId(rootWorkflowId: String, isPostgres: Boolean) = {
val getMetadataEntryResult = GetResult(r => {
MetadataEntry(r.<<, r.<<, r.<<, r.<<, r.<<, r.nextClobOption().map(clob => new SerialClob(clob)), r.<<, r.<<, r.<<)
})

def dbIdentifierWrapper(identifier: String, isPostgres: Boolean) = {
if(isPostgres) s"${'"'}$identifier${'"'}" else identifier
}

def dbMetadataValueColCheckName(isPostgres: Boolean): String = {
if(isPostgres) "obj.data" else "METADATA_VALUE"
}

def dbTargetCallsGroupByStatement(isPostgres: Boolean, callFqn: String): String = {
s"GROUP BY ${callFqn}" + (if(isPostgres) ", obj.data" else "")
}

def targetCallsSelectStatement(callFqn: String, scatterIndex: String, retryAttempt: String): String = {
s"SELECT ${callFqn}, MAX(COALESCE(${scatterIndex}, 0)) as maxScatter, MAX(COALESCE(${retryAttempt}, 0)) AS maxRetry"
}

def pgObjectInnerJoinStatement(isPostgres: Boolean, metadataValColName: String): String = {
if(isPostgres) s"INNER JOIN pg_largeobject obj ON me.${metadataValColName} = cast(obj.loid as text)" else ""
}

val workflowUuid = dbIdentifierWrapper("WORKFLOW_EXECUTION_UUID", isPostgres)
val callFqn = dbIdentifierWrapper("CALL_FQN", isPostgres)
val scatterIndex = dbIdentifierWrapper("JOB_SCATTER_INDEX", isPostgres)
val retryAttempt = dbIdentifierWrapper("JOB_RETRY_ATTEMPT", isPostgres)
val metadataKey = dbIdentifierWrapper("METADATA_KEY", isPostgres)
val metadataValueType = dbIdentifierWrapper("METADATA_VALUE_TYPE", isPostgres)
val metadataTimestamp = dbIdentifierWrapper("METADATA_TIMESTAMP", isPostgres)
val metadataJournalId = dbIdentifierWrapper("METADATA_JOURNAL_ID", isPostgres)
val rootUuid = dbIdentifierWrapper("ROOT_WORKFLOW_EXECUTION_UUID", isPostgres)
val metadataValue = dbIdentifierWrapper("METADATA_VALUE", isPostgres)
val metadataEntry = dbIdentifierWrapper("METADATA_ENTRY", isPostgres)
val wmse = dbIdentifierWrapper("WORKFLOW_METADATA_SUMMARY_ENTRY", isPostgres)
val resultSetColumnNames = s"me.${workflowUuid}, me.${callFqn}, me.${scatterIndex}, me.${retryAttempt}, me.${metadataKey}, me.${metadataValue}, me.${metadataValueType}, me.${metadataTimestamp}, me.${metadataJournalId}"

val query = sql"""
SELECT #${resultSetColumnNames}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's up with these hash signs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's Slick's way of handling string interpolation. You would think "Why not use ${}? It's because Slick is using that notation specifically to bind values in a query.

So something like tableName.colName = ${val} would work, but `SELECT ${tableName}.${colName} would throw an error since you're declaring an identifier rather than providing a value.

This also caught me off guard and is only briefly covered in the Slick docs.

FROM #${metadataEntry} me
INNER JOIN (
#${targetCallsSelectStatement(callFqn, scatterIndex, retryAttempt)}
FROM #${metadataEntry} me
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
#${pgObjectInnerJoinStatement(isPostgres, metadataValue)}
WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
AND (me.#${metadataKey} in ('executionStatus', 'backendStatus') AND #${dbMetadataValueColCheckName(isPostgres)} = 'Failed')
AND #${callFqn} IS NOT NULL
#${dbTargetCallsGroupByStatement(isPostgres, callFqn)}
) AS targetCalls
ON me.#${callFqn} = targetCalls.#${callFqn}
LEFT JOIN (
SELECT DISTINCT #${callFqn}
FROM #${metadataEntry} me
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
WHERE (wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId)
AND me.#${metadataKey} = 'subWorkflowId'
GROUP BY #${callFqn}
) AS avoidedCalls
ON me.#${callFqn} = avoidedCalls.#${callFqn}
INNER JOIN #${wmse} wmse
ON wmse.#${workflowUuid} = me.#${workflowUuid}
WHERE avoidedCalls.#${callFqn} IS NULL
AND COALESCE(me.#${scatterIndex}, 0) = targetCalls.maxScatter
AND COALESCE(me.#${retryAttempt}, 0) = targetCalls.maxRetry
GROUP BY #${resultSetColumnNames}
HAVING me.#${workflowUuid} IN (
SELECT DISTINCT wmse.#${workflowUuid}
FROM #${wmse} wmse
WHERE wmse.#${rootUuid} = $rootWorkflowId OR wmse.#${workflowUuid} = $rootWorkflowId
)
"""

query.as(getMetadataEntryResult)
}

private[this] def metadataEntryHasMetadataKeysLike(metadataEntry: MetadataEntries,
metadataKeysToFilterFor: List[String],
metadataKeysToFilterOut: List[String]): Rep[Boolean] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,6 @@ trait MetadataSqlDatabase extends SqlDatabase {
def countWorkflowsLeftToDeleteThatEndedOnOrBeforeThresholdTimestamp(workflowEndTimestampThreshold: Timestamp)(implicit ec: ExecutionContext): Future[Int]

def getMetadataTableSizeInformation()(implicit ec: ExecutionContext): Future[Option[InformationSchemaEntry]]

def getFailedJobsMetadataWithWorkflowId(rootWorkflowId: String)(implicit ec: ExecutionContext): Future[Vector[MetadataEntry]]
}
32 changes: 31 additions & 1 deletion docs/api/RESTAPI.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions engine/src/main/resources/swagger/cromwell.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,26 @@ paths:
$ref: '#/responses/NotFound'
'500':
$ref: '#/responses/ServerError'
'/api/workflows/{version}/{id}/metadata/failed-jobs':
get:
operationId: failed-jobs
summary: Get call-level metadata of failed tasks for a specified workflow
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work for subworkflows as well? If not this should probably say "root workflow."

Copy link
Contributor Author

@JVThomas JVThomas Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point (only works for root workflows). I'll update the description

parameters:
- $ref: '#/parameters/versionParam'
- $ref: '#/parameters/singleId'
tags:
- Workflows
responses:
'200':
description: Successful request
schema:
$ref: '#/definitions/WorkflowMetadataResponse'
'400':
$ref: '#/responses/BadRequest'
'404':
$ref: '#/responses/NotFound'
'500':
$ref: '#/responses/ServerError'
'/api/workflows/{version}/{id}/metadata':
get:
operationId: metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ trait MetadataRouteSupport extends HttpInstrumentation {
}
}
},
encodeResponse {
path("workflows" / Segment / Segment / "metadata" / "failed-jobs") { (_, possibleWorkflowId) =>
instrumentRequest {
metadataLookup(
possibleWorkflowId,
(w: WorkflowId) => FetchFailedJobsMetadataWithWorkflowId(w),
serviceRegistryActor
)
}
}
},
encodeResponse {
path("workflows" / Segment / Segment / "metadata") { (_, possibleWorkflowId) =>
instrumentRequest {
Expand Down Expand Up @@ -212,8 +223,7 @@ object MetadataRouteSupport {
}

def completeMetadataBuilderResponse(response: Future[MetadataJsonResponse]): Route = {
onComplete(response) {
case Success(r: SuccessfulMetadataJsonResponse) => complete(r.responseJson)
onComplete(response) { case Success(r: SuccessfulMetadataJsonResponse) => complete(r.responseJson)
case Success(r: FailedMetadataJsonResponse) => r.reason.errorRequest(StatusCodes.InternalServerError)
case Failure(_: AskTimeoutException) if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse
case Failure(e: UnrecognizedWorkflowException) => e.failRequest(StatusCodes.NotFound)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
events: Seq[MetadataEvent],
expectedRes: String,
metadataBuilderActorName: String,
failedTasks: Boolean = false
): Future[Assertion] = {
val mockReadMetadataWorkerActor = TestProbe("mockReadMetadataWorkerActor")
def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props
Expand All @@ -45,13 +46,17 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
props = MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000),
name = metadataBuilderActorName,
)

val response = mba.ask(action).mapTo[MetadataJsonResponse]
mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action)
mockReadMetadataWorkerActor.reply(MetadataLookupResponse(queryReply, events))
mockReadMetadataWorkerActor.reply(
if(failedTasks) FetchFailedJobsMetadataLookupResponse(events) else MetadataLookupResponse(queryReply, events)
)
response map { r => r shouldBe a [SuccessfulMetadataJsonResponse] }
response.mapTo[SuccessfulMetadataJsonResponse] map { b => b.responseJson shouldBe expectedRes.parseJson}
}


def assertMetadataFailureResponse(action: MetadataServiceAction,
metadataServiceResponse: MetadataServiceResponse,
expectedException: Exception,
Expand Down Expand Up @@ -95,6 +100,7 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
// We'll use a Query instead of a SingleWorkflowMetadataGet, so we expect the WorkflowID this time:
val expectedRes =
s"""{
|"${workflowA}": {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change represent a real change in behavior, or just a change in the structure of the test?

Copy link
Contributor Author

@JVThomas JVThomas Jul 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See this comment. TLDR: I updated the wrong test.

| "calls": {
| "callB": [{
| "attempt": 1,
Expand Down Expand Up @@ -123,8 +129,8 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
| "shardIndex": -1
| }]
| },
| "NOT_CHECKED": "NOT_CHECKED",
| "id": "$workflowA"
| "NOT_CHECKED": "NOT_CHECKED"
| }
|}""".stripMargin

val mdQuery = MetadataQuery(workflowA, None, None, None, None, expandSubWorkflows = false)
Expand All @@ -134,7 +140,8 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
queryReply = mdQuery,
events = workflowAEvents,
expectedRes = expectedRes,
metadataBuilderActorName = "mba-scope-tree",
metadataBuilderActorName = "mba-failed-tasks-tree",
true
)
}

Expand Down Expand Up @@ -162,6 +169,7 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
eventMaker: WorkflowId => (String, MetadataValue, OffsetDateTime) => MetadataEvent =
makeEvent,
metadataBuilderActorName: String,
isFailedTaskFetch: Boolean = false
): Future[Assertion] = {

val events = eventList map { e => (e._1, MetadataValue(e._2), e._3) } map Function.tupled(eventMaker(workflow))
Expand All @@ -172,6 +180,72 @@ class MetadataBuilderActorSpec extends TestKitSuite with AsyncFlatSpecLike with
assertMetadataResponse(queryAction, mdQuery, events, expectedRes, metadataBuilderActorName)
}

it should "build the call list for failed tasks when prompted" in {

def makeEvent(workflow: WorkflowId, key: Option[MetadataJobKey]) = {
MetadataEvent(MetadataKey(workflow, key, "NOT_CHECKED"), MetadataValue("NOT_CHECKED"))
}

val workflowA = WorkflowId.randomId()

val workflowACalls = List(
Option(MetadataJobKey("callB", Option(1), 3)),
Option(MetadataJobKey("callB", None, 1)),
Option(MetadataJobKey("callB", Option(1), 2)),
Option(MetadataJobKey("callA", None, 1)),
Option(MetadataJobKey("callB", Option(1), 1)),
Option(MetadataJobKey("callB", Option(0), 1)),
None
)
val workflowAEvents = workflowACalls map {
makeEvent(workflowA, _)
}

val expectedRes =
s"""{
| "calls": {
| "callB": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 0
| }, {
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 2,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }, {
| "attempt": 3,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": 1
| }],
| "callA": [{
| "attempt": 1,
| "NOT_CHECKED": "NOT_CHECKED",
| "shardIndex": -1
| }]
| },
| "NOT_CHECKED": "NOT_CHECKED",
| "id": "$workflowA"
|}""".stripMargin

val mdQuery = MetadataQuery(workflowA, None, None, None, None, expandSubWorkflows = false)
val queryAction = GetMetadataAction(mdQuery)
assertMetadataResponse(
action = queryAction,
queryReply = mdQuery,
events = workflowAEvents,
expectedRes = expectedRes,
metadataBuilderActorName = "mba-failed-tasks",
JVThomas marked this conversation as resolved.
Show resolved Hide resolved
)
}

it should "assume the event list is ordered and keep last event if 2 events have same key" in {
val eventBuilderList = List(
("a", "aLater", OffsetDateTime.parse("2000-01-02T12:00:00Z")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ object CromwellApiServiceSpec {
MetadataEvent(MetadataKey(workflowId, None, "testKey2a"), MetadataValue("myValue2a", MetadataString)),
)
}

private def wesFullMetadataResponse(workflowId: WorkflowId) = {
List(
MetadataEvent(MetadataKey(workflowId, None, "status"), MetadataValue("Running", MetadataString)),
Expand All @@ -576,6 +577,7 @@ object CromwellApiServiceSpec {
)
}


def responseMetadataValues(workflowId: WorkflowId, withKeys: List[String], withoutKeys: List[String]): JsObject = {
def keyFilter(keys: List[String])(m: MetadataEvent) = keys.exists(k => m.key.key.startsWith(k))
val metadataEvents = if (workflowId == wesWorkflowId) wesFullMetadataResponse(workflowId) else fullMetadataResponse(workflowId)
Expand Down Expand Up @@ -645,6 +647,8 @@ object CromwellApiServiceSpec {
sender() ! SuccessfulMetadataJsonResponse(request, MetadataBuilderActor.processOutputsResponse(id, event))
case request @ GetLogs(id) =>
sender() ! SuccessfulMetadataJsonResponse(request, MetadataBuilderActor.workflowMetadataResponse(id, logsEvents(id), includeCallsIfEmpty = false, Map.empty))
case request @ FetchFailedJobsMetadataWithWorkflowId(id) =>
sender() ! SuccessfulMetadataJsonResponse(request, responseMetadataValues(id, List.empty, List.empty))
case request @ GetMetadataAction(MetadataQuery(id, _, _, withKeys, withoutKeys, _), _) =>
val withKeysList = withKeys.map(_.toList).getOrElse(List.empty)
val withoutKeysList = withoutKeys.map(_.toList).getOrElse(List.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,20 @@ class MetadataRouteSupportSpec extends AsyncFlatSpec with ScalatestRouteTest wit
}
}

it should "return 200 with metadata for failed tasks on a workflow" in {
Get(s"/workflows/$version/${CromwellApiServiceSpec.ExistingWorkflowId}/metadata/failed-jobs") ~>
akkaHttpService.metadataRoutes ~>
check {
status should be(StatusCodes.OK)
val result = responseAs[JsObject]
result.fields.keys should contain allOf("testKey1a", "testKey1b", "testKey2a")
result.fields.keys shouldNot contain("testKey3")
result.fields("testKey1a") should be(JsString("myValue1a"))
result.fields("testKey1b") should be(JsString("myValue1b"))
result.fields("testKey2a") should be(JsString("myValue2a"))
}
}

behavior of "REST API /query GET endpoint"
it should "return good results for a good query" in {
Get(s"/workflows/$version/query?status=Succeeded&id=${CromwellApiServiceSpec.ExistingWorkflowId}") ~>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ object MetadataKey {

object MetadataEvent {
def apply(key: MetadataKey, value: MetadataValue) = new MetadataEvent(key, Option(value), OffsetDateTime.now)

def apply(key: MetadataKey, optionalValue: Option[MetadataValue]) = new MetadataEvent(key, optionalValue, OffsetDateTime.now)

def empty(key: MetadataKey) = new MetadataEvent(key, None, OffsetDateTime.now)

def labelsToMetadataEvents(labels: Labels, workflowId: WorkflowId): Iterable[MetadataEvent] = {
Expand Down
Loading