diff --git a/arangodb-foxx-api/pom.xml b/arangodb-foxx-api/pom.xml index 62c8fd31e..c3e5ebca5 100644 --- a/arangodb-foxx-api/pom.xml +++ b/arangodb-foxx-api/pom.xml @@ -72,6 +72,7 @@ za.co.absa.spline.consumer.service.model.DataSourceActionType za.co.absa.spline.consumer.service.model.OperationDetails za.co.absa.spline.consumer.service.model.ExpressionGraph + za.co.absa.spline.consumer.service.model.ExecutionPlanDetailed diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/LineageDetailed.scala b/arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/ExecutionPlanDetailed.scala similarity index 70% rename from consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/LineageDetailed.scala rename to arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/ExecutionPlanDetailed.scala index ef5fdf510..dea188158 100644 --- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/LineageDetailed.scala +++ b/arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/ExecutionPlanDetailed.scala @@ -1,5 +1,5 @@ /* - * Copyright 2019 ABSA Group Limited + * Copyright 2024 ABSA Group Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,18 @@ package za.co.absa.spline.consumer.service.model +import com.fasterxml.jackson.core.`type`.TypeReference import io.swagger.annotations.{ApiModel, ApiModelProperty} -@ApiModel(description = "Execution Plan and Lineage Graph") -case class LineageDetailed( +@ApiModel(description = "Execution Plan with Operation Graph") +case class ExecutionPlanDetailed( @ApiModelProperty(value = "Information related to the execution plan") executionPlan: ExecutionPlanInfo, @ApiModelProperty(value = "Execution plan level lineage") - graph: LineageDetailedGraph + graph: OperationGraph ) + +object ExecutionPlanDetailed { + implicit val typeRef: TypeReference[ExecutionPlanDetailed] = new TypeReference[ExecutionPlanDetailed] {} +} diff --git a/arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/ExecutionPlanInfo.scala b/arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/ExecutionPlanInfo.scala index 8681edd5d..a7b0f3e26 100644 --- a/arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/ExecutionPlanInfo.scala +++ b/arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/ExecutionPlanInfo.scala @@ -16,6 +16,7 @@ package za.co.absa.spline.consumer.service.model +import com.fasterxml.jackson.core.`type`.TypeReference import io.swagger.annotations.{ApiModel, ApiModelProperty} import za.co.absa.spline.consumer.service.model.ExecutionPlanInfo.Id @@ -42,4 +43,6 @@ case class ExecutionPlanInfo object ExecutionPlanInfo { type Id = UUID + + implicit val typeRefFrameOfExecPlanInfo: TypeReference[Frame[ExecutionPlanInfo]] = new TypeReference[Frame[ExecutionPlanInfo]] {} } diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/LineageDetailedGraph.scala b/arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/OperationGraph.scala similarity index 78% rename from consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/LineageDetailedGraph.scala rename to arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/OperationGraph.scala index 9bc3382c8..771ce37b4 100644 --- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/LineageDetailedGraph.scala +++ b/arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/OperationGraph.scala @@ -1,5 +1,5 @@ /* - * Copyright 2019 ABSA Group Limited + * Copyright 2024 ABSA Group Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,12 +18,12 @@ package za.co.absa.spline.consumer.service.model import io.swagger.annotations.{ApiModel, ApiModelProperty} -@ApiModel(description = "Lineage Graph") -case class LineageDetailedGraph +@ApiModel(description = "Operation Graph") +case class OperationGraph ( - @ApiModelProperty(value = "List of graph nodes representing the different operations of the lineage") + @ApiModelProperty(value = "List of graph nodes representing the operations of the execution plan") nodes: Array[Operation], - @ApiModelProperty(value = "List of graph edges showing the triggered operations order") + @ApiModelProperty(value = "List of graph edges representing the transitions between the operations") edges: Array[Transition] ) extends Graph { diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/Transition.scala b/arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/Transition.scala similarity index 100% rename from consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/Transition.scala rename to arangodb-foxx-api/src/main/scala/za/co/absa/spline/consumer/service/model/Transition.scala diff --git a/arangodb-foxx-services/src/main/routes/consumer/events-router.ts b/arangodb-foxx-services/src/main/routes/consumer/events-router.ts index 0a907f71d..9af45f70a 100644 --- a/arangodb-foxx-services/src/main/routes/consumer/events-router.ts +++ b/arangodb-foxx-services/src/main/routes/consumer/events-router.ts @@ -31,7 +31,7 @@ export const eventsRouter: Foxx.Router = createRouter() eventsRouter .get('/', (req: Foxx.Request, res: Foxx.Response) => { - const events = listExecutionEvents( + const events: Frame> = listExecutionEvents( req.queryParams.asAtTime, req.queryParams.timestampStart, req.queryParams.timestampEnd, diff --git a/arangodb-foxx-services/src/main/routes/consumer/plans-router.ts b/arangodb-foxx-services/src/main/routes/consumer/plans-router.ts index ba9fa512b..05dfb44e7 100644 --- a/arangodb-foxx-services/src/main/routes/consumer/plans-router.ts +++ b/arangodb-foxx-services/src/main/routes/consumer/plans-router.ts @@ -15,13 +15,41 @@ */ import { createRouter } from '@arangodb/foxx' -import { DataSourceActionType } from '../../../external/consumer-api.model' -import { getDataSourceURIsByActionType } from '../../services/execution-plan-store' +import { DataSourceActionType, ExecutionPlanDetailed, ExecutionPlanInfo, Frame } from '../../../external/consumer-api.model' +import { findExecutionPlanInfos, getDataSourceURIsByActionType, getExecutionPlanDetailedById } from '../../services/execution-plan-store' import Joi from 'joi' export const plansRouter: Foxx.Router = createRouter() +plansRouter + .get('/', (req: Foxx.Request, res: Foxx.Response) => { + const planInfos: Frame = findExecutionPlanInfos( + req.queryParams.asAtTime, + req.queryParams.pageOffset, + req.queryParams.pageSize, + req.queryParams.sortField, + req.queryParams.sortOrder, + ) + res.send(planInfos) + }) + .queryParam('asAtTime', Joi.string().required(), 'As at time') + .queryParam('pageOffset', Joi.number().required(), 'Page offset') + .queryParam('pageSize', Joi.number().required(), 'Page size') + .queryParam('sortField', Joi.string().required(), 'Sort field') + .queryParam('sortOrder', Joi.string().required().valid('asc', 'desc'), 'Sort order') + .response(200, ['application/json'], 'Array of execution plan infos') + .summary('Find execution plan infos') + +plansRouter + .get('/:planId/_detailed', + (req: Foxx.Request, res: Foxx.Response) => { + const plan: ExecutionPlanDetailed = getExecutionPlanDetailedById(req.pathParams.planId) + res.send(plan) + }) + .pathParam('planId', Joi.string().min(1).required(), 'Execution Plan ID') + .response(200, ['application/json'], 'Detailed Execution Plan') + .summary('Get detailed execution plan by ID') plansRouter .get('/:planId/data-sources', @@ -35,3 +63,4 @@ plansRouter .pathParam('planId', Joi.string().min(1).required(), 'Execution Plan ID') .queryParam('access', Joi.string().optional().valid(DataSourceActionType.values).default(null), 'Access type (read/write) to filter by') .response(200, ['application/json'], 'Array of data source URIs') + .summary('Get data source URIs by action type') diff --git a/arangodb-foxx-services/src/main/services/execution-plan-store.ts b/arangodb-foxx-services/src/main/services/execution-plan-store.ts index fbc4c507f..af3c5d83a 100644 --- a/arangodb-foxx-services/src/main/services/execution-plan-store.ts +++ b/arangodb-foxx-services/src/main/services/execution-plan-store.ts @@ -16,7 +16,7 @@ import { ExecutionPlanPersistentModel } from '../../external/persistence-api.model' -import { DataSourceActionType } from '../../external/consumer-api.model' +import { DataSourceActionType, ExecutionPlanDetailed, ExecutionPlanInfo, Frame } from '../../external/consumer-api.model' import { CollectionName, EdgeCollectionName, NodeCollectionName, WriteTxInfo } from '../persistence/model' import { checkKeyExistence, store } from '../persistence/store' import { withTimeTracking } from '../utils/common' @@ -26,6 +26,7 @@ import { DataSourceActionTypeValue } from './model' import { aql, db } from '@arangodb' import { AQLCodeGenHelper } from '../utils/aql-gen-helper' import { TxManager } from '../persistence/txm' +import Cursor = ArangoDB.Cursor export function checkExecutionPlanExists(planKey: DocumentKey, discriminator: string): boolean { @@ -79,6 +80,103 @@ export function storeExecutionPlan(eppm: ExecutionPlanPersistentModel): void { }) } +export function findExecutionPlanInfos(asAtTime: string, pageOffset: number, pageSize: number, sortField: string, sortOrder: string): Frame { + const rtxInfo = TxManager.startRead() + const aqlGen = new AQLCodeGenHelper(rtxInfo) + + const cursor: Cursor = db._query( + aql` + WITH ${NodeCollectionName.ExecutionPlan}, + ${NodeCollectionName.Progress}, + ${NodeCollectionName.Operation}, + ${EdgeCollectionName.Follows}, + ${EdgeCollectionName.Emits}, + ${NodeCollectionName.Schema}, + ${EdgeCollectionName.ConsistsOf}, + ${NodeCollectionName.Attribute} + + FOR execPlan IN executionPlan + ${aqlGen.genTxIsolationCodeForLoop('execPlan')} + LET progress = ( + FOR prog IN progress + ${aqlGen.genTxIsolationCodeForLoop('prog')} + FILTER prog.execPlanDetails.executionPlanKey == execPlan._key + FILTER prog.timestamp <= ${asAtTime} + LIMIT 1 + RETURN prog + ) + FILTER LENGTH(progress) + SORT execPlan.${sortField} ${sortOrder} + LIMIT ${pageOffset * pageSize}, ${pageSize} + + LET ops = ( + FOR op IN operation + FILTER op._belongsTo == execPlan._id + RETURN op + ) + LET edges = ( + FOR f IN follows + FILTER f._belongsTo == execPlan._id + RETURN f + ) + LET schemaIds = ( + FOR op IN ops + FOR schema IN 1 + OUTBOUND op emits + RETURN DISTINCT schema._id + ) + LET attributes = ( + FOR sid IN schemaIds + FOR a IN 1 + OUTBOUND sid consistsOf + RETURN DISTINCT { + "id" : a._key, + "name" : a.name, + "dataTypeId" : a.dataType + } + ) + LET inputs = FLATTEN( + FOR op IN ops + FILTER op.type == "Read" + RETURN op.inputSources[* RETURN { + "source" : CURRENT, + "sourceType": op.extra.sourceType + }] + ) + LET output = FIRST( + ops[* + FILTER CURRENT.type == "Write" + RETURN { + "source" : CURRENT.outputSource, + "sourceType": CURRENT.extra.destinationType + }] + ) + return { + "_id" : execPlan._key, + "systemInfo": execPlan.systemInfo, + "agentInfo" : execPlan.agentInfo, + "name" : execPlan.name || execPlan._key, + "extra" : MERGE( + execPlan.extra, + { attributes }, + { "appName" : execPlan.name || execPlan._key } + ), + "inputs" : inputs, + "output" : output + } + `, + { + fullCount: true + } + ) + + return { + offset: 0, + totalCount: cursor.getExtra().stats.fullCount, + items: cursor.toArray() + } +} + export function getDataSourceURIsByActionType(planKey: DocumentKey, access: DataSourceActionTypeValue): string[] { const rtxInfo = TxManager.startRead() const aqlGen = new AQLCodeGenHelper(rtxInfo) @@ -102,3 +200,95 @@ export function getDataSourceURIsByActionType(planKey: DocumentKey, access: Data RETURN ds.uri `).toArray() } + +export function getExecutionPlanDetailedById(planKey: DocumentKey): ExecutionPlanDetailed { + const rtxInfo = TxManager.startRead() + const aqlGen = new AQLCodeGenHelper(rtxInfo) + + return db._query(aql` + WITH ${NodeCollectionName.ExecutionPlan}, + ${EdgeCollectionName.Executes}, + ${NodeCollectionName.Operation}, + ${EdgeCollectionName.Follows}, + ${EdgeCollectionName.Emits}, + ${NodeCollectionName.Schema}, + ${EdgeCollectionName.ConsistsOf}, + ${NodeCollectionName.Attribute} + + LET execPlan = FIRST( + FOR ep IN executionPlan + ${aqlGen.genTxIsolationCodeForLoop('ep')} + FILTER ep._key == ${planKey} + RETURN ep + ) + LET ops = ( + FOR op IN operation + FILTER op._belongsTo == execPlan._id + RETURN op + ) + LET edges = ( + FOR f IN follows + FILTER f._belongsTo == execPlan._id + RETURN f + ) + LET schemaIds = ( + FOR op IN ops + FOR schema IN 1 + OUTBOUND op emits + RETURN DISTINCT schema._id + ) + LET attributes = ( + FOR sid IN schemaIds + FOR a IN 1 + OUTBOUND sid consistsOf + RETURN DISTINCT { + "id" : a._key, + "name" : a.name, + "dataTypeId" : a.dataType + } + ) + LET inputs = FLATTEN( + FOR op IN ops + FILTER op.type == "Read" + RETURN op.inputSources[* RETURN { + "source" : CURRENT, + "sourceType": op.extra.sourceType + }] + ) + LET output = FIRST( + ops[* + FILTER CURRENT.type == "Write" + RETURN { + "source" : CURRENT.outputSource, + "sourceType": CURRENT.extra.destinationType + }] + ) + RETURN execPlan && { + "graph": { + "nodes": ops[* RETURN { + "_id" : CURRENT._key, + "_type": CURRENT.type, + "name" : CURRENT.name || CURRENT.type, + "properties": {} + }], + "edges": edges[* RETURN { + "source": PARSE_IDENTIFIER(CURRENT._to).key, + "target": PARSE_IDENTIFIER(CURRENT._from).key + }] + }, + "executionPlan": { + "_id" : execPlan._key, + "systemInfo": execPlan.systemInfo, + "agentInfo" : execPlan.agentInfo, + "name" : execPlan.name || execPlan._key, + "extra" : MERGE( + execPlan.extra, + { attributes }, + { "appName" : execPlan.name || execPlan._key } + ), + "inputs" : inputs, + "output" : output + } + } + `).next() +} diff --git a/consumer-rest-core/src/main/scala/za/co/absa/spline/consumer/rest/controller/ExecutionPlansController.scala b/consumer-rest-core/src/main/scala/za/co/absa/spline/consumer/rest/controller/ExecutionPlansController.scala index 47f2d5e93..fc322bd5f 100644 --- a/consumer-rest-core/src/main/scala/za/co/absa/spline/consumer/rest/controller/ExecutionPlansController.scala +++ b/consumer-rest-core/src/main/scala/za/co/absa/spline/consumer/rest/controller/ExecutionPlansController.scala @@ -72,7 +72,7 @@ class ExecutionPlansController @Autowired() def execPlan( @ApiParam(value = "Id of the execution plan") @PathVariable("planId") planId: ExecutionPlanInfo.Id - ): Future[LineageDetailed] = { + ): Future[ExecutionPlanDetailed] = { epRepo.findById(planId) } diff --git a/consumer-rest-core/src/main/scala/za/co/absa/spline/consumer/rest/controller/LineageDetailedController.scala b/consumer-rest-core/src/main/scala/za/co/absa/spline/consumer/rest/controller/LineageDetailedController.scala index 01f484a33..cdc82b59c 100644 --- a/consumer-rest-core/src/main/scala/za/co/absa/spline/consumer/rest/controller/LineageDetailedController.scala +++ b/consumer-rest-core/src/main/scala/za/co/absa/spline/consumer/rest/controller/LineageDetailedController.scala @@ -20,7 +20,7 @@ import io.swagger.annotations._ import org.springframework.beans.factory.annotation.Autowired import org.springframework.web.bind.annotation._ import za.co.absa.spline.consumer.rest.controller.LineageDetailedController.AttributeLineageAndImpact -import za.co.absa.spline.consumer.service.model.{AttributeGraph, ExecutionPlanInfo, LineageDetailed} +import za.co.absa.spline.consumer.service.model.{AttributeGraph, ExecutionPlanInfo, ExecutionPlanDetailed} import za.co.absa.spline.consumer.service.repo.ExecutionPlanRepository import scala.concurrent.{ExecutionContext, Future} @@ -44,7 +44,7 @@ class LineageDetailedController @Autowired()(val epRepo: ExecutionPlanRepository def lineageDetailed( @ApiParam(value = "Execution plan ID", required = true) @RequestParam("execId") execId: ExecutionPlanInfo.Id - ): Future[LineageDetailed] = { + ): Future[ExecutionPlanDetailed] = { epRepo.findById(execId) } diff --git a/consumer-rest-core/src/test/scala/za/co/absa/spline/consumer/rest/controller/ExecutionPlansControllerTest.scala b/consumer-rest-core/src/test/scala/za/co/absa/spline/consumer/rest/controller/ExecutionPlansControllerTest.scala index 1f6016081..fd0ad40d2 100644 --- a/consumer-rest-core/src/test/scala/za/co/absa/spline/consumer/rest/controller/ExecutionPlansControllerTest.scala +++ b/consumer-rest-core/src/test/scala/za/co/absa/spline/consumer/rest/controller/ExecutionPlansControllerTest.scala @@ -32,7 +32,7 @@ class ExecutionPlansControllerTest extends org.scalatest.flatspec.AnyFlatSpec wi it should "return an execPlan in a form of a LineageDetailed object from execPlan(id)" in { val ep1id = UUID.randomUUID() - val lineageDetailed1 = mock[LineageDetailed] + val lineageDetailed1 = mock[ExecutionPlanDetailed] val mockedEpRepo = mock[ExecutionPlanRepository] when(mockedEpRepo.findById(eqTo(ep1id))(any())).thenReturn(Future.successful(lineageDetailed1)) diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepository.scala b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepository.scala index 9309fbd6a..fb2df8c68 100644 --- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepository.scala +++ b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepository.scala @@ -26,7 +26,7 @@ trait ExecutionPlanRepository { def execPlanAttributeImpact(attrId: Attribute.Id)(implicit ec: ExecutionContext): Future[AttributeGraph] - def findById(execId: ExecutionPlanInfo.Id)(implicit ec: ExecutionContext): Future[LineageDetailed] + def findById(execId: ExecutionPlanInfo.Id)(implicit ec: ExecutionContext): Future[ExecutionPlanDetailed] def find( asAtTime: Long, diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepositoryImpl.scala b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepositoryImpl.scala index 0c39eed84..7645ef708 100644 --- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepositoryImpl.scala +++ b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepositoryImpl.scala @@ -16,98 +16,23 @@ package za.co.absa.spline.consumer.service.repo -import com.arangodb.async.{ArangoCursorAsync, ArangoDatabaseAsync} -import com.arangodb.model.AqlQueryOptions +import com.arangodb.async.ArangoDatabaseAsync import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Repository import za.co.absa.spline.consumer.service.model._ +import za.co.absa.spline.persistence.ArangoImplicits.ArangoDatabaseAsyncScalaWrapper +import za.co.absa.spline.persistence.FoxxRouter import za.co.absa.spline.persistence.model.Operation.OperationTypes import scala.concurrent.{ExecutionContext, Future} -import scala.jdk.StreamConverters._ @Repository -class ExecutionPlanRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends ExecutionPlanRepository { +class ExecutionPlanRepositoryImpl @Autowired()(db: ArangoDatabaseAsync, foxxRouter: FoxxRouter) extends ExecutionPlanRepository { - import za.co.absa.spline.persistence.ArangoImplicits._ - - override def findById(execId: ExecutionPlanInfo.Id)(implicit ec: ExecutionContext): Future[LineageDetailed] = { - db.queryOne[LineageDetailed]( - s""" - |WITH executionPlan, executes, operation, follows, emits, schema, consistsOf, attribute - |LET execPlan = DOCUMENT("executionPlan", @execPlanId) - |LET ops = ( - | FOR op IN operation - | FILTER op._belongsTo == execPlan._id - | RETURN op - | ) - |LET edges = ( - | FOR f IN follows - | FILTER f._belongsTo == execPlan._id - | RETURN f - | ) - |LET schemaIds = ( - | FOR op IN ops - | FOR schema IN 1 - | OUTBOUND op emits - | RETURN DISTINCT schema._id - | ) - |LET attributes = ( - | FOR sid IN schemaIds - | FOR a IN 1 - | OUTBOUND sid consistsOf - | RETURN DISTINCT { - | "id" : a._key, - | "name" : a.name, - | "dataTypeId" : a.dataType - | } - | ) - |LET inputs = FLATTEN( - | FOR op IN ops - | FILTER op.type == "Read" - | RETURN op.inputSources[* RETURN { - | "source" : CURRENT, - | "sourceType": op.extra.sourceType - | }] - | ) - |LET output = FIRST( - | ops[* - | FILTER CURRENT.type == "Write" - | RETURN { - | "source" : CURRENT.outputSource, - | "sourceType": CURRENT.extra.destinationType - | }] - | ) - |RETURN execPlan && { - | "graph": { - | "nodes": ops[* RETURN { - | "_id" : CURRENT._key, - | "_type": CURRENT.type, - | "name" : CURRENT.name || CURRENT.type, - | "properties": {} - | }], - | "edges": edges[* RETURN { - | "source": PARSE_IDENTIFIER(CURRENT._to).key, - | "target": PARSE_IDENTIFIER(CURRENT._from).key - | }] - | }, - | "executionPlan": { - | "_id" : execPlan._key, - | "systemInfo": execPlan.systemInfo, - | "agentInfo" : execPlan.agentInfo, - | "name" : execPlan.name || execPlan._key, - | "extra" : MERGE( - | execPlan.extra, - | { attributes }, - | { "appName" : execPlan.name || execPlan._key } - | ), - | "inputs" : inputs, - | "output" : output - | } - |} - |""".stripMargin, - Map("execPlanId" -> execId) - ).filter(null.!=) + override def findById(planId: ExecutionPlanInfo.Id)(implicit ec: ExecutionContext): Future[ExecutionPlanDetailed] = { + foxxRouter + .get[ExecutionPlanDetailed](s"/spline/consumer/execution-plans/$planId/_detailed") + .filter(null.!=) } override def find( @@ -115,91 +40,18 @@ class ExecutionPlanRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends pageRequest: PageRequest, sortRequest: SortRequest )(implicit ec: ExecutionContext): Future[(Seq[ExecutionPlanInfo], Long)] = { - val queryResult: Future[ArangoCursorAsync[ExecutionPlanInfo]] = db.queryAs[ExecutionPlanInfo]( - s""" - |WITH executionPlan, progress, operation, follows, emits, schema, consistsOf, attribute - |FOR execPlan IN executionPlan - | LET progress = ( - | FOR prog IN progress - | FILTER prog.execPlanDetails.executionPlanKey == execPlan._key - | FILTER prog.timestamp <= @asAtTime - | LIMIT 1 - | RETURN prog - | ) - | FILTER LENGTH(progress) - | SORT execPlan.@sortField @sortOrder - | LIMIT @pageOffset*@pageSize, @pageSize - | - | LET ops = ( - | FOR op IN operation - | FILTER op._belongsTo == execPlan._id - | RETURN op - | ) - | LET edges = ( - | FOR f IN follows - | FILTER f._belongsTo == execPlan._id - | RETURN f - | ) - | LET schemaIds = ( - | FOR op IN ops - | FOR schema IN 1 - | OUTBOUND op emits - | RETURN DISTINCT schema._id - | ) - | LET attributes = ( - | FOR sid IN schemaIds - | FOR a IN 1 - | OUTBOUND sid consistsOf - | RETURN DISTINCT { - | "id" : a._key, - | "name" : a.name, - | "dataTypeId" : a.dataType - | } - | ) - | LET inputs = FLATTEN( - | FOR op IN ops - | FILTER op.type == "Read" - | RETURN op.inputSources[* RETURN { - | "source" : CURRENT, - | "sourceType": op.extra.sourceType - | }] - | ) - | LET output = FIRST( - | ops[* - | FILTER CURRENT.type == "Write" - | RETURN { - | "source" : CURRENT.outputSource, - | "sourceType": CURRENT.extra.destinationType - | }] - | ) - | return { - | "_id" : execPlan._key, - | "systemInfo": execPlan.systemInfo, - | "agentInfo" : execPlan.agentInfo, - | "name" : execPlan.name || execPlan._key, - | "extra" : MERGE( - | execPlan.extra, - | { attributes }, - | { "appName" : execPlan.name || execPlan._key } - | ), - | "inputs" : inputs, - | "output" : output - | } - |""".stripMargin, - Map[String, AnyRef]( - "asAtTime" -> Long.box(asAtTime), - "pageOffset" -> Int.box(pageRequest.page - 1), - "pageSize" -> Int.box(pageRequest.size), - "sortField" -> sortRequest.field, - "sortOrder" -> sortRequest.order - ), - new AqlQueryOptions().fullCount(true) - ) + val execPlanInfoFrame: Future[Frame[ExecutionPlanInfo]] = foxxRouter.get[Frame[ExecutionPlanInfo]](s"/spline/consumer/execution-plans/", Map[String, AnyRef]( + "asAtTime" -> Long.box(asAtTime), + "pageOffset" -> Int.box(pageRequest.page - 1), + "pageSize" -> Int.box(pageRequest.size), + "sortField" -> sortRequest.field, + "sortOrder" -> sortRequest.order + )) - val findResult: Future[(Seq[ExecutionPlanInfo], Long)] = queryResult.map { - arangoCursorAsync => - val items = arangoCursorAsync.streamRemaining().toScala(LazyList) - val totalCount = arangoCursorAsync.getStats.getFullCount + val findResult: Future[(Seq[ExecutionPlanInfo], Long)] = execPlanInfoFrame.map { + frame => + val items = frame.items + val totalCount = frame.totalCount (items, totalCount) }