diff --git a/arangodb-foxx-api/pom.xml b/arangodb-foxx-api/pom.xml
index 34eb4442a..0fb6953c1 100644
--- a/arangodb-foxx-api/pom.xml
+++ b/arangodb-foxx-api/pom.xml
@@ -62,6 +62,7 @@
za.co.absa.spline.consumer.service.model.Frame
za.co.absa.spline.consumer.service.model.ExecutionPlanInfo
za.co.absa.spline.consumer.service.model.ExecutionEventInfo
+ za.co.absa.spline.consumer.service.model.Label
diff --git a/arangodb-foxx-services/src/main/routes/events-router.ts b/arangodb-foxx-services/src/main/routes/events-router.ts
index b72df9610..72dc2e64c 100644
--- a/arangodb-foxx-services/src/main/routes/events-router.ts
+++ b/arangodb-foxx-services/src/main/routes/events-router.ts
@@ -24,6 +24,7 @@ import { listExecutionEvents, storeExecutionEvent } from '../services/execution-
import { TxManager } from '../services/txm'
import { checkKeyExistence } from '../services/store'
import { NodeCollectionName } from '../persistence/model'
+import * as Logger from '../utils/logger'
export const eventsRouter: Foxx.Router = createRouter()
@@ -32,6 +33,10 @@ function toBoolean(str: string): boolean {
return str == null ? null : str.toLowerCase() === 'true'
}
+function toNumber(str: string): number {
+ return str == null ? null : +str
+}
+
// Store execution event
eventsRouter
.post('/',
@@ -48,37 +53,35 @@ eventsRouter
eventsRouter
.get('/',
(req: Foxx.Request, res: Foxx.Response) => {
+ Logger.debug(`Foxx: GET ${req.url}`)
const events = listExecutionEvents(
- +req.queryParams.asAtTime,
- +req.queryParams.timestampStart,
- +req.queryParams.timestampEnd,
- +req.queryParams.pageOffset,
- +req.queryParams.pageSize,
- req.queryParams.sortField,
- req.queryParams.sortOrder,
+ toNumber(req.queryParams.asAtTime),
+ toNumber(req.queryParams.timestampStart),
+ toNumber(req.queryParams.timestampEnd),
req.queryParams.searchTerm || null,
toBoolean(req.queryParams.writeAppends),
req.queryParams.applicationId || null,
req.queryParams.dataSourceUri || null,
- req.queryParams.lblNames || [],
- req.queryParams.lblValues || []
+ JSON.parse(req.queryParams.labels || '[]'),
+ req.queryParams.sortField,
+ req.queryParams.sortOrder,
+ toNumber(req.queryParams.offset),
+ toNumber(req.queryParams.limit),
)
res.send(events)
})
- //todo: add query params validation
- // .queryParam('asAtTime', Joi.number().required())
- // .queryParam('timestampStart', Joi.number().optional())
- // .queryParam('timestampEnd', Joi.number().optional())
- // .queryParam('pageOffset', Joi.number().required())
- // .queryParam('pageSize', Joi.number().required())
- // .queryParam('sortField', Joi.string().required())
- // .queryParam('sortOrder', Joi.string().required())
- // .queryParam('searchTerm', Joi.string().optional())
- // .queryParam('writeAppends', Joi.boolean().optional())
- // .queryParam('applicationId', Joi.string().optional())
- // .queryParam('dataSourceUri', Joi.string().optional())
- // .queryParam('lblNames', Joi.array().items(Joi.string()).required())
- // .queryParam('lblValues', Joi.array().items(Joi.string()).required())
+ .queryParam('asAtTime', Joi.number().required())
+ .queryParam('timestampStart', Joi.number().optional())
+ .queryParam('timestampEnd', Joi.number().optional())
+ .queryParam('searchTerm', Joi.string().optional())
+ .queryParam('writeAppends', Joi.boolean().optional())
+ .queryParam('applicationId', Joi.string().optional())
+ .queryParam('dataSourceUri', Joi.string().optional())
+ .queryParam('labels', Joi.string().optional()) // serialized JSON array
+ .queryParam('sortField', Joi.string().required())
+ .queryParam('sortOrder', Joi.string().required())
+ .queryParam('offset', Joi.number().required())
+ .queryParam('limit', Joi.number().required())
.response(200, ['application/json'])
.summary('List execution events')
diff --git a/arangodb-foxx-services/src/main/services/data-source-store.ts b/arangodb-foxx-services/src/main/services/data-source-store.ts
index 0f93f6d56..a1f4222a3 100644
--- a/arangodb-foxx-services/src/main/services/data-source-store.ts
+++ b/arangodb-foxx-services/src/main/services/data-source-store.ts
@@ -24,8 +24,8 @@ import * as persistence from '../model'
export function storeDataSources(ds: api.DataSource): persistence.DataSource {
return withTimeTracking(`STORE DATA SOURCE ${ds}`, () => {
return db._query(aql`
- WITH ${NodeCollectionName.DataSource}
- UPSERT { uri: ${ds.uri} }
+ WITH ${aql.literal(NodeCollectionName.DataSource)}
+ UPSERT { uri: ${ds}.uri }
INSERT KEEP(${ds}, ['_created', 'uri', 'name'])
UPDATE {} IN ${aql.literal(NodeCollectionName.DataSource)}
RETURN KEEP(NEW, ['_key', 'uri'])
diff --git a/arangodb-foxx-services/src/main/services/execution-event-store.ts b/arangodb-foxx-services/src/main/services/execution-event-store.ts
index 34d32b566..c74512c4d 100644
--- a/arangodb-foxx-services/src/main/services/execution-event-store.ts
+++ b/arangodb-foxx-services/src/main/services/execution-event-store.ts
@@ -15,13 +15,14 @@
*/
-import { ExecPlanDetails, ExecutionEventInfo, Frame, Progress } from '../../external/api.model'
-import { CollectionName, edge, WriteTxInfo } from '../persistence/model'
+import { ExecPlanDetails, ExecutionEventInfo, Frame, Label, Progress } from '../../external/api.model'
+import { CollectionName, edge, ViewName, WriteTxInfo } from '../persistence/model'
import { store } from './store'
import { aql, db } from '@arangodb'
import { withTimeTracking } from '../utils/common'
import { TxManager } from './txm'
import { TxTemplate } from './txm/tx-template'
+import * as Logger from '../utils/logger'
const SEARCH_FIELDS = [
@@ -32,35 +33,46 @@ const SEARCH_FIELDS = [
'execPlanDetails.dataSourceType',
]
-function escapeJavaScript(str: string): string {
- return str.replace(/[\-\[\]\/{}()*+?.\\^$|'"\n]/g, '\\$&')
-}
-
export function listExecutionEvents(
asAtTime: number,
timestampStart: number | null,
timestampEnd: number | null,
- pageOffset: number,
- pageSize: number,
- sortField: string,
- sortOrder: string,
searchTerm: string | null,
writeAppends: boolean | null,
applicationId: string | null,
dataSourceUri: string | null,
- lblNames: string[],
- lblValues: string[]
+ labels: Label[],
+ sortField: string,
+ sortOrder: string,
+ offset: number,
+ limit: number,
): Frame> {
+ const lblNames = labels.map(lbl => lbl.name)
+ const lblValues = labels.map(lbl => lbl.values)
- const q = aql`
- WITH progress_view
- FOR ee IN progress_view
+ const q: ArangoDB.Query = aql`
+ WITH ${aql.literal(ViewName.ProgressSearchView)}
+ FOR ee IN ${aql.literal(ViewName.ProgressSearchView)}
SEARCH ee._created <= ${asAtTime}
AND (${timestampStart} == null OR IN_RANGE(ee.timestamp, ${timestampStart}, ${timestampEnd}, true, true))
AND (${applicationId} == null OR ${applicationId} == ee.extra.appId)
AND (${dataSourceUri} == null OR ${dataSourceUri} == ee.execPlanDetails.dataSourceUri)
AND (${writeAppends} == null OR ee.execPlanDetails.append IN ${writeAppends})
+ ${aql.join(lblNames.map((lblName, i) => aql`
+ AND (
+ ${lblValues[i]} ANY == ee.labels[${lblName}]
+ OR ${lblValues[i]} ANY == ee.execPlanDetails.labels[${lblName}]
+ )
+ `))}
+
+ AND (
+ ${searchTerm} == null
+ ${aql.join(SEARCH_FIELDS.map(fld => aql`
+ OR ANALYZER(LIKE(ee.${aql.literal(fld)}, CONCAT("%", TOKENS(${searchTerm}, "norm_en")[0], "%")), "norm_en")
+ `))}
+ )
+
LET resItem = {
"executionEventId" : ee._key,
"executionPlanId" : ee.execPlanDetails.executionPlanKey,
@@ -79,12 +91,12 @@ export function listExecutionEvents(
}
SORT resItem.${sortField} ${sortOrder}
- LIMIT ${pageOffset * pageSize}, ${pageSize}
+ LIMIT ${offset}, ${limit}
RETURN resItem
`
- console.log('AQL query: ', q)
+ Logger.debug('AQL query: ', q)
const cursor = db._query(q, { fullCount: true })
const items: ExecutionEventInfo[] = cursor.toArray()
@@ -93,7 +105,7 @@ export function listExecutionEvents(
return {
items,
totalCount,
- offset: pageOffset * pageSize,
+ offset,
}
}
diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/PageRequest.scala b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/PageRequest.scala
index b5905af5e..2ab389b26 100644
--- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/PageRequest.scala
+++ b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/PageRequest.scala
@@ -15,4 +15,9 @@
*/
package za.co.absa.spline.consumer.service.model
-case class PageRequest(page: Int, size: Int)
+case class PageRequest(page: Int, size: Int) {
+
+ def offset: Int = (page - 1) * size
+
+ def limit: Int = size
+}
diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/SortRequest.scala b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/SortRequest.scala
index 0c8cba34d..249ec6495 100644
--- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/SortRequest.scala
+++ b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/model/SortRequest.scala
@@ -15,4 +15,4 @@
*/
package za.co.absa.spline.consumer.service.model
-case class SortRequest(sortField: String, sortOrder: String)
+case class SortRequest(field: String, order: String)
diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/DataSourceRepositoryImpl.scala b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/DataSourceRepositoryImpl.scala
index 7138e9b78..90ddd2627 100644
--- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/DataSourceRepositoryImpl.scala
+++ b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/DataSourceRepositoryImpl.scala
@@ -112,8 +112,8 @@ class DataSourceRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends Dat
"timestampEnd" -> maybeWriteTimestampEnd.map(Long.box).orNull,
"pageOffset" -> Int.box(pageRequest.page - 1),
"pageSize" -> Int.box(pageRequest.size),
- "sortField" -> sortRequest.sortField,
- "sortOrder" -> sortRequest.sortOrder,
+ "sortField" -> sortRequest.field,
+ "sortOrder" -> sortRequest.order,
"searchTerm" -> maybeSearchTerm.map(escapeAQLSearch).orNull,
"writeAppends" -> (if (writeAppendOptions.isEmpty) null else writeAppendOptions.flatten.map(Boolean.box)),
"includeNoWrite" -> Boolean.box(writeAppendOptions.contains(None)),
diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionEventRepositoryImpl.scala b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionEventRepositoryImpl.scala
index 8ebd72d3d..774cea13e 100644
--- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionEventRepositoryImpl.scala
+++ b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionEventRepositoryImpl.scala
@@ -21,6 +21,7 @@ import org.springframework.stereotype.Repository
import za.co.absa.spline.common.StringEscapeUtils.escapeAQLSearch
import za.co.absa.spline.consumer.service.model._
import za.co.absa.spline.consumer.service.repo.ExecutionEventRepositoryImpl._
+import za.co.absa.spline.persistence.DefaultJsonSerDe._
import za.co.absa.spline.persistence.FoxxRouter
import scala.concurrent.{ExecutionContext, Future}
@@ -42,23 +43,19 @@ class ExecutionEventRepositoryImpl @Autowired()(
maybeApplicationId: Option[String],
maybeDataSourceUri: Option[String]
)(implicit ec: ExecutionContext): Future[Frame[ExecutionEventInfo]] = {
- val lblNames = labels.map(_.name)
- val lblValues = labels.map(_.values)
-
foxxRouter.get[Frame[ExecutionEventInfo]]("/spline/execution-events", Map(
"asAtTime" -> Long.box(asAtTime),
"timestampStart" -> maybeTimestampStart.map(Long.box).orNull,
"timestampEnd" -> maybeTimestampEnd.map(Long.box).orNull,
- "pageOffset" -> Int.box(pageRequest.page - 1),
- "pageSize" -> Int.box(pageRequest.size),
- "sortField" -> sortRequest.sortField,
- "sortOrder" -> sortRequest.sortOrder,
"searchTerm" -> maybeSearchTerm.map(escapeAQLSearch).orNull,
- "writeAppends" -> (if (writeAppendOptions.isEmpty) null else writeAppendOptions.flatten.map(Boolean.box)),
"applicationId" -> maybeApplicationId.orNull,
"dataSourceUri" -> maybeDataSourceUri.orNull,
- "lblNames" -> lblNames.toSeq,
- "lblValues" -> lblValues.toSeq,
+ "writeAppends" -> (if (writeAppendOptions.isEmpty) null else writeAppendOptions.flatten.map(Boolean.box)),
+ "labels" -> labels.toJson,
+ "sortField" -> sortRequest.field,
+ "sortOrder" -> sortRequest.order,
+ "offset" -> pageRequest.offset,
+ "limit" -> pageRequest.limit,
))
}
}
diff --git a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepositoryImpl.scala b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepositoryImpl.scala
index 8baa52f14..0c39eed84 100644
--- a/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepositoryImpl.scala
+++ b/consumer-services/src/main/scala/za/co/absa/spline/consumer/service/repo/ExecutionPlanRepositoryImpl.scala
@@ -190,8 +190,8 @@ class ExecutionPlanRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends
"asAtTime" -> Long.box(asAtTime),
"pageOffset" -> Int.box(pageRequest.page - 1),
"pageSize" -> Int.box(pageRequest.size),
- "sortField" -> sortRequest.sortField,
- "sortOrder" -> sortRequest.sortOrder
+ "sortField" -> sortRequest.field,
+ "sortOrder" -> sortRequest.order
),
new AqlQueryOptions().fullCount(true)
)