Skip to content

Commit

Permalink
spline #1155 Move "getExecPlanById()" and "findExecPlans()" to Foxx
Browse files Browse the repository at this point in the history
  • Loading branch information
wajda committed Oct 7, 2024
1 parent c149c3d commit e0cfa29
Show file tree
Hide file tree
Showing 13 changed files with 265 additions and 185 deletions.
1 change: 1 addition & 0 deletions arangodb-foxx-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
<argument>za.co.absa.spline.consumer.service.model.DataSourceActionType</argument>
<argument>za.co.absa.spline.consumer.service.model.OperationDetails</argument>
<argument>za.co.absa.spline.consumer.service.model.ExpressionGraph</argument>
<argument>za.co.absa.spline.consumer.service.model.ExecutionPlanDetailed</argument>
</arguments>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 ABSA Group Limited
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,13 +16,18 @@

package za.co.absa.spline.consumer.service.model

import com.fasterxml.jackson.core.`type`.TypeReference
import io.swagger.annotations.{ApiModel, ApiModelProperty}


@ApiModel(description = "Execution Plan and Lineage Graph")
case class LineageDetailed(
@ApiModel(description = "Execution Plan with Operation Graph")
case class ExecutionPlanDetailed(
@ApiModelProperty(value = "Information related to the execution plan")
executionPlan: ExecutionPlanInfo,
@ApiModelProperty(value = "Execution plan level lineage")
graph: LineageDetailedGraph
graph: OperationGraph
)

/**
 * Companion of [[ExecutionPlanDetailed]] that publishes an implicit Jackson type token for it.
 */
object ExecutionPlanDetailed {
// Jackson `TypeReference` describing `ExecutionPlanDetailed`, declared implicit so it can be
// resolved from this companion's implicit scope.
// NOTE(review): presumably consumed by the code that deserializes Foxx service responses - confirm against callers.
implicit val typeRef: TypeReference[ExecutionPlanDetailed] = new TypeReference[ExecutionPlanDetailed] {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package za.co.absa.spline.consumer.service.model

import com.fasterxml.jackson.core.`type`.TypeReference
import io.swagger.annotations.{ApiModel, ApiModelProperty}
import za.co.absa.spline.consumer.service.model.ExecutionPlanInfo.Id

Expand All @@ -42,4 +43,6 @@ case class ExecutionPlanInfo

/**
 * Companion of `ExecutionPlanInfo`: declares the identifier type alias and an implicit
 * Jackson type token for a `Frame` of execution plan infos.
 */
object ExecutionPlanInfo {
// Execution plan identifiers are UUIDs.
type Id = UUID

// Jackson `TypeReference` for `Frame[ExecutionPlanInfo]`, declared implicit so it can be
// resolved from this companion's implicit scope.
// NOTE(review): presumably used when deserializing the paged "find execution plans" response - confirm against callers.
implicit val typeRefFrameOfExecPlanInfo: TypeReference[Frame[ExecutionPlanInfo]] = new TypeReference[Frame[ExecutionPlanInfo]] {}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 ABSA Group Limited
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,12 +18,12 @@ package za.co.absa.spline.consumer.service.model

import io.swagger.annotations.{ApiModel, ApiModelProperty}

@ApiModel(description = "Lineage Graph")
case class LineageDetailedGraph
@ApiModel(description = "Operation Graph")
case class OperationGraph
(
@ApiModelProperty(value = "List of graph nodes representing the different operations of the lineage")
@ApiModelProperty(value = "List of graph nodes representing the operations of the execution plan")
nodes: Array[Operation],
@ApiModelProperty(value = "List of graph edges showing the triggered operations order")
@ApiModelProperty(value = "List of graph edges representing the transitions between the operations")
edges: Array[Transition]

) extends Graph {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const eventsRouter: Foxx.Router = createRouter()
eventsRouter
.get('/',
(req: Foxx.Request, res: Foxx.Response) => {
const events = listExecutionEvents(
const events: Frame<Partial<ExecutionEventInfo>> = listExecutionEvents(
req.queryParams.asAtTime,
req.queryParams.timestampStart,
req.queryParams.timestampEnd,
Expand Down
33 changes: 31 additions & 2 deletions arangodb-foxx-services/src/main/routes/consumer/plans-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,41 @@
*/

import { createRouter } from '@arangodb/foxx'
import { DataSourceActionType } from '../../../external/consumer-api.model'
import { getDataSourceURIsByActionType } from '../../services/execution-plan-store'
import { DataSourceActionType, ExecutionPlanDetailed, ExecutionPlanInfo, Frame } from '../../../external/consumer-api.model'
import { findExecutionPlanInfos, getDataSourceURIsByActionType, getExecutionPlanDetailedById } from '../../services/execution-plan-store'
import Joi from 'joi'


export const plansRouter: Foxx.Router = createRouter()

// GET / - finds execution plan infos, one page at a time.
// All five query parameters are mandatory and are forwarded unchanged to `findExecutionPlanInfos`.
// The response body is a frame (`offset`, `totalCount`, `items`), not a bare array,
// so the 200-response description says so.
plansRouter
    .get('/', (req: Foxx.Request, res: Foxx.Response) => {
        const { asAtTime, pageOffset, pageSize, sortField, sortOrder } = req.queryParams
        const planInfos: Frame<ExecutionPlanInfo> = findExecutionPlanInfos(
            asAtTime,
            pageOffset,
            pageSize,
            sortField,
            sortOrder,
        )
        res.send(planInfos)
    })
    .queryParam('asAtTime', Joi.string().required(), 'As at time')
    .queryParam('pageOffset', Joi.number().required(), 'Page offset')
    .queryParam('pageSize', Joi.number().required(), 'Page size')
    .queryParam('sortField', Joi.string().required(), 'Sort field')
    .queryParam('sortOrder', Joi.string().required().valid('asc', 'desc'), 'Sort order')
    .response(200, ['application/json'], 'Frame (page) of execution plan infos')
    .summary('Find execution plan infos')

// GET /:planId/_detailed - looks up a single execution plan by its ID and sends back
// whatever `getExecutionPlanDetailedById` returns for it.
plansRouter
    .get('/:planId/_detailed',
        (req: Foxx.Request, res: Foxx.Response) => {
            const { planId } = req.pathParams
            const detailedPlan: ExecutionPlanDetailed = getExecutionPlanDetailedById(planId)
            res.send(detailedPlan)
        })
    .pathParam('planId', Joi.string().min(1).required(), 'Execution Plan ID')
    .response(200, ['application/json'], 'Detailed Execution Plan')
    .summary('Get detailed execution plan by ID')

plansRouter
.get('/:planId/data-sources',
Expand All @@ -35,3 +63,4 @@ plansRouter
.pathParam('planId', Joi.string().min(1).required(), 'Execution Plan ID')
.queryParam('access', Joi.string().optional().valid(DataSourceActionType.values).default(null), 'Access type (read/write) to filter by')
.response(200, ['application/json'], 'Array of data source URIs')
.summary('Get data source URIs by action type')
192 changes: 191 additions & 1 deletion arangodb-foxx-services/src/main/services/execution-plan-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@


import { ExecutionPlanPersistentModel } from '../../external/persistence-api.model'
import { DataSourceActionType } from '../../external/consumer-api.model'
import { DataSourceActionType, ExecutionPlanDetailed, ExecutionPlanInfo, Frame } from '../../external/consumer-api.model'
import { CollectionName, EdgeCollectionName, NodeCollectionName, WriteTxInfo } from '../persistence/model'
import { checkKeyExistence, store } from '../persistence/store'
import { withTimeTracking } from '../utils/common'
Expand All @@ -26,6 +26,7 @@ import { DataSourceActionTypeValue } from './model'
import { aql, db } from '@arangodb'
import { AQLCodeGenHelper } from '../utils/aql-gen-helper'
import { TxManager } from '../persistence/txm'
import Cursor = ArangoDB.Cursor


export function checkExecutionPlanExists(planKey: DocumentKey, discriminator: string): boolean {
Expand Down Expand Up @@ -79,6 +80,103 @@ export function storeExecutionPlan(eppm: ExecutionPlanPersistentModel): void {
})
}

/**
 * Finds execution plans that have at least one progress event with a timestamp at or before
 * `asAtTime`, and returns one page of them together with the total number of matching plans.
 *
 * @param asAtTime   upper bound for the progress event timestamp; arrives as a string and is
 *                   converted to a number before being bound into the query
 *                   (presumably epoch milliseconds - confirm against the caller)
 * @param pageOffset zero-based page index; the query skips `pageOffset * pageSize` plans
 * @param pageSize   maximum number of plans in the returned page
 * @param sortField  name of the execution plan attribute to sort by
 * @param sortOrder  sort direction, bound into the AQL SORT clause
 * @returns a frame with the page items and the `fullCount` statistic of the query
 */
export function findExecutionPlanInfos(asAtTime: string, pageOffset: number, pageSize: number, sortField: string, sortOrder: string): Frame<ExecutionPlanInfo> {
    const rtxInfo = TxManager.startRead()
    const aqlGen = new AQLCodeGenHelper(rtxInfo)

    // AQL orders values by type first (null < bool < number < string < ...), so comparing a numeric
    // timestamp with a string bind value is always true and the "as at" filter would silently match
    // every progress event. Bind a number instead.
    // NOTE(review): assumes `progress.timestamp` is stored as a number - confirm against the schema.
    const asAtTimestamp = Number(asAtTime)

    const cursor: Cursor<ExecutionPlanInfo> = db._query(
        aql`
            WITH ${NodeCollectionName.ExecutionPlan},
                 ${NodeCollectionName.Progress},
                 ${NodeCollectionName.Operation},
                 ${EdgeCollectionName.Emits},
                 ${NodeCollectionName.Schema},
                 ${EdgeCollectionName.ConsistsOf},
                 ${NodeCollectionName.Attribute}
            FOR execPlan IN executionPlan
                ${aqlGen.genTxIsolationCodeForLoop('execPlan')}
                LET progress = (
                    FOR prog IN progress
                        ${aqlGen.genTxIsolationCodeForLoop('prog')}
                        FILTER prog.execPlanDetails.executionPlanKey == execPlan._key
                        FILTER prog.timestamp <= ${asAtTimestamp}
                        LIMIT 1
                        RETURN prog
                )
                FILTER LENGTH(progress)
                SORT execPlan.${sortField} ${sortOrder}
                LIMIT ${pageOffset * pageSize}, ${pageSize}
                LET ops = (
                    FOR op IN operation
                        FILTER op._belongsTo == execPlan._id
                        RETURN op
                )
                LET schemaIds = (
                    FOR op IN ops
                        FOR schema IN 1
                            OUTBOUND op emits
                            RETURN DISTINCT schema._id
                )
                LET attributes = (
                    FOR sid IN schemaIds
                        FOR a IN 1
                            OUTBOUND sid consistsOf
                            RETURN DISTINCT {
                                "id"   : a._key,
                                "name" : a.name,
                                "dataTypeId" : a.dataType
                            }
                )
                LET inputs = FLATTEN(
                    FOR op IN ops
                        FILTER op.type == "Read"
                        RETURN op.inputSources[* RETURN {
                            "source"    : CURRENT,
                            "sourceType": op.extra.sourceType
                        }]
                )
                LET output = FIRST(
                    ops[*
                        FILTER CURRENT.type == "Write"
                        RETURN {
                            "source"    : CURRENT.outputSource,
                            "sourceType": CURRENT.extra.destinationType
                        }]
                )
                return {
                    "_id"       : execPlan._key,
                    "systemInfo": execPlan.systemInfo,
                    "agentInfo" : execPlan.agentInfo,
                    "name"      : execPlan.name || execPlan._key,
                    "extra"     : MERGE(
                        execPlan.extra,
                        { attributes },
                        { "appName"  : execPlan.name || execPlan._key }
                    ),
                    "inputs"    : inputs,
                    "output"    : output
                }
        `,
        {
            // ask the server to also report the number of matches before the top-level LIMIT
            fullCount: true
        }
    )

    return {
        // NOTE(review): the offset is reported as 0 regardless of the requested page;
        // confirm whether consumers expect `pageOffset * pageSize` here.
        offset: 0,
        totalCount: cursor.getExtra().stats.fullCount,
        items: cursor.toArray()
    }
}

export function getDataSourceURIsByActionType(planKey: DocumentKey, access: DataSourceActionTypeValue): string[] {
const rtxInfo = TxManager.startRead()
const aqlGen = new AQLCodeGenHelper(rtxInfo)
Expand All @@ -102,3 +200,95 @@ export function getDataSourceURIsByActionType(planKey: DocumentKey, access: Data
RETURN ds.uri
`).toArray()
}

/**
 * Loads a single execution plan by its document key and assembles its detailed view:
 * the operation graph (nodes and edges) plus the plan info (system/agent info, name,
 * extras enriched with attributes and app name, inputs and output).
 *
 * The query returns `execPlan && {...}`, so when no plan matches the key the first
 * (and only) cursor value is the falsy `execPlan` rather than an object.
 *
 * @param planKey document key of the execution plan
 * @returns the first value produced by the query
 */
export function getExecutionPlanDetailedById(planKey: DocumentKey): ExecutionPlanDetailed {
    const rtxInfo = TxManager.startRead()
    const aqlGen = new AQLCodeGenHelper(rtxInfo)

    const detailedPlanQuery = aql`
        WITH ${NodeCollectionName.ExecutionPlan},
             ${EdgeCollectionName.Executes},
             ${NodeCollectionName.Operation},
             ${EdgeCollectionName.Follows},
             ${EdgeCollectionName.Emits},
             ${NodeCollectionName.Schema},
             ${EdgeCollectionName.ConsistsOf},
             ${NodeCollectionName.Attribute}
        LET execPlan = FIRST(
            FOR ep IN executionPlan
                ${aqlGen.genTxIsolationCodeForLoop('ep')}
                FILTER ep._key == ${planKey}
                RETURN ep
        )
        LET ops = (
            FOR op IN operation
                FILTER op._belongsTo == execPlan._id
                RETURN op
        )
        LET edges = (
            FOR f IN follows
                FILTER f._belongsTo == execPlan._id
                RETURN f
        )
        LET schemaIds = (
            FOR op IN ops
                FOR schema IN 1
                    OUTBOUND op emits
                    RETURN DISTINCT schema._id
        )
        LET attributes = (
            FOR sid IN schemaIds
                FOR a IN 1
                    OUTBOUND sid consistsOf
                    RETURN DISTINCT {
                        "id"   : a._key,
                        "name" : a.name,
                        "dataTypeId" : a.dataType
                    }
        )
        LET inputs = FLATTEN(
            FOR op IN ops
                FILTER op.type == "Read"
                RETURN op.inputSources[* RETURN {
                    "source"    : CURRENT,
                    "sourceType": op.extra.sourceType
                }]
        )
        LET output = FIRST(
            ops[*
                FILTER CURRENT.type == "Write"
                RETURN {
                    "source"    : CURRENT.outputSource,
                    "sourceType": CURRENT.extra.destinationType
                }]
        )
        RETURN execPlan && {
            "graph": {
                "nodes": ops[* RETURN {
                    "_id"  : CURRENT._key,
                    "_type": CURRENT.type,
                    "name" : CURRENT.name || CURRENT.type,
                    "properties": {}
                }],
                "edges": edges[* RETURN {
                    "source": PARSE_IDENTIFIER(CURRENT._to).key,
                    "target": PARSE_IDENTIFIER(CURRENT._from).key
                }]
            },
            "executionPlan": {
                "_id"       : execPlan._key,
                "systemInfo": execPlan.systemInfo,
                "agentInfo" : execPlan.agentInfo,
                "name"      : execPlan.name || execPlan._key,
                "extra"     : MERGE(
                    execPlan.extra,
                    { attributes },
                    { "appName"  : execPlan.name || execPlan._key }
                ),
                "inputs"    : inputs,
                "output"    : output
            }
        }
    `

    // The query yields exactly one value, so only the first cursor entry is read.
    const resultCursor = db._query(detailedPlanQuery)
    return resultCursor.next()
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class ExecutionPlansController @Autowired()
def execPlan(
@ApiParam(value = "Id of the execution plan")
@PathVariable("planId") planId: ExecutionPlanInfo.Id
): Future[LineageDetailed] = {
): Future[ExecutionPlanDetailed] = {
epRepo.findById(planId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.swagger.annotations._
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation._
import za.co.absa.spline.consumer.rest.controller.LineageDetailedController.AttributeLineageAndImpact
import za.co.absa.spline.consumer.service.model.{AttributeGraph, ExecutionPlanInfo, LineageDetailed}
import za.co.absa.spline.consumer.service.model.{AttributeGraph, ExecutionPlanInfo, ExecutionPlanDetailed}
import za.co.absa.spline.consumer.service.repo.ExecutionPlanRepository

import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -44,7 +44,7 @@ class LineageDetailedController @Autowired()(val epRepo: ExecutionPlanRepository
def lineageDetailed(
@ApiParam(value = "Execution plan ID", required = true)
@RequestParam("execId") execId: ExecutionPlanInfo.Id
): Future[LineageDetailed] = {
): Future[ExecutionPlanDetailed] = {
epRepo.findById(execId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ExecutionPlansControllerTest extends org.scalatest.flatspec.AnyFlatSpec wi

it should "return an execPlan in a form of a LineageDetailed object from execPlan(id)" in {
val ep1id = UUID.randomUUID()
val lineageDetailed1 = mock[LineageDetailed]
val lineageDetailed1 = mock[ExecutionPlanDetailed]
val mockedEpRepo = mock[ExecutionPlanRepository]
when(mockedEpRepo.findById(eqTo(ep1id))(any())).thenReturn(Future.successful(lineageDetailed1))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ trait ExecutionPlanRepository {

def execPlanAttributeImpact(attrId: Attribute.Id)(implicit ec: ExecutionContext): Future[AttributeGraph]

def findById(execId: ExecutionPlanInfo.Id)(implicit ec: ExecutionContext): Future[LineageDetailed]
def findById(execId: ExecutionPlanInfo.Id)(implicit ec: ExecutionContext): Future[ExecutionPlanDetailed]

def find(
asAtTime: Long,
Expand Down
Loading

0 comments on commit e0cfa29

Please sign in to comment.