Skip to content

Commit

Permalink
spline #1155 Move "findEvents()" to Foxx
Browse files Browse the repository at this point in the history
  • Loading branch information
wajda committed Sep 26, 2024
1 parent 8be794e commit bce12fc
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 59 deletions.
1 change: 1 addition & 0 deletions arangodb-foxx-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
<argument>za.co.absa.spline.consumer.service.model.Frame</argument>
<argument>za.co.absa.spline.consumer.service.model.ExecutionPlanInfo</argument>
<argument>za.co.absa.spline.consumer.service.model.ExecutionEventInfo</argument>
<argument>za.co.absa.spline.consumer.service.model.Label</argument>
</arguments>
</configuration>
<dependencies>
Expand Down
49 changes: 26 additions & 23 deletions arangodb-foxx-services/src/main/routes/events-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { listExecutionEvents, storeExecutionEvent } from '../services/execution-
import { TxManager } from '../services/txm'
import { checkKeyExistence } from '../services/store'
import { NodeCollectionName } from '../persistence/model'
import * as Logger from '../utils/logger'


export const eventsRouter: Foxx.Router = createRouter()
Expand All @@ -32,6 +33,10 @@ function toBoolean(str: string): boolean {
return str == null ? null : str.toLowerCase() === 'true'
}

function toNumber(str: string): number {
return str == null ? null : +str
}

// Store execution event
eventsRouter
.post('/',
Expand All @@ -48,37 +53,35 @@ eventsRouter
eventsRouter
.get('/',
(req: Foxx.Request, res: Foxx.Response) => {
Logger.debug(`Foxx: GET ${req.url}`)
const events = listExecutionEvents(
+req.queryParams.asAtTime,
+req.queryParams.timestampStart,
+req.queryParams.timestampEnd,
+req.queryParams.pageOffset,
+req.queryParams.pageSize,
req.queryParams.sortField,
req.queryParams.sortOrder,
toNumber(req.queryParams.asAtTime),
toNumber(req.queryParams.timestampStart),
toNumber(req.queryParams.timestampEnd),
req.queryParams.searchTerm || null,
toBoolean(req.queryParams.writeAppends),
req.queryParams.applicationId || null,
req.queryParams.dataSourceUri || null,
req.queryParams.lblNames || [],
req.queryParams.lblValues || []
JSON.parse(req.queryParams.labels || '[]'),
req.queryParams.sortField,
req.queryParams.sortOrder,
toNumber(req.queryParams.offset),
toNumber(req.queryParams.limit),
)
res.send(events)
})
//todo: add query params validation
// .queryParam('asAtTime', Joi.number().required())
// .queryParam('timestampStart', Joi.number().optional())
// .queryParam('timestampEnd', Joi.number().optional())
// .queryParam('pageOffset', Joi.number().required())
// .queryParam('pageSize', Joi.number().required())
// .queryParam('sortField', Joi.string().required())
// .queryParam('sortOrder', Joi.string().required())
// .queryParam('searchTerm', Joi.string().optional())
// .queryParam('writeAppends', Joi.boolean().optional())
// .queryParam('applicationId', Joi.string().optional())
// .queryParam('dataSourceUri', Joi.string().optional())
// .queryParam('lblNames', Joi.array().items(Joi.string()).required())
// .queryParam('lblValues', Joi.array().items(Joi.string()).required())
.queryParam('asAtTime', Joi.number().required())
.queryParam('timestampStart', Joi.number().optional())
.queryParam('timestampEnd', Joi.number().optional())
.queryParam('searchTerm', Joi.string().optional())
.queryParam('writeAppends', Joi.boolean().optional())
.queryParam('applicationId', Joi.string().optional())
.queryParam('dataSourceUri', Joi.string().optional())
.queryParam('labels', Joi.string().optional()) // serialized JSON array
.queryParam('sortField', Joi.string().required())
.queryParam('sortOrder', Joi.string().required())
.queryParam('offset', Joi.number().required())
.queryParam('limit', Joi.number().required())
.response(200, ['application/json'])
.summary('List execution events')

Expand Down
4 changes: 2 additions & 2 deletions arangodb-foxx-services/src/main/services/data-source-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import * as persistence from '../model'
export function storeDataSources(ds: api.DataSource): persistence.DataSource {
return withTimeTracking(`STORE DATA SOURCE ${ds}`, () => {
return db._query(aql`
WITH ${NodeCollectionName.DataSource}
UPSERT { uri: ${ds.uri} }
WITH ${aql.literal(NodeCollectionName.DataSource)}
UPSERT { uri: ${ds}.uri }
INSERT KEEP(${ds}, ['_created', 'uri', 'name'])
UPDATE {} IN ${aql.literal(NodeCollectionName.DataSource)}
RETURN KEEP(NEW, ['_key', 'uri'])
Expand Down
48 changes: 30 additions & 18 deletions arangodb-foxx-services/src/main/services/execution-event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
*/


import { ExecPlanDetails, ExecutionEventInfo, Frame, Progress } from '../../external/api.model'
import { CollectionName, edge, WriteTxInfo } from '../persistence/model'
import { ExecPlanDetails, ExecutionEventInfo, Frame, Label, Progress } from '../../external/api.model'
import { CollectionName, edge, ViewName, WriteTxInfo } from '../persistence/model'
import { store } from './store'
import { aql, db } from '@arangodb'
import { withTimeTracking } from '../utils/common'
import { TxManager } from './txm'
import { TxTemplate } from './txm/tx-template'
import * as Logger from '../utils/logger'


const SEARCH_FIELDS = [
Expand All @@ -32,35 +33,46 @@ const SEARCH_FIELDS = [
'execPlanDetails.dataSourceType',
]

function escapeJavaScript(str: string): string {
return str.replace(/[\-\[\]\/{}()*+?.\\^$|'"\n]/g, '\\$&')
}

export function listExecutionEvents(
asAtTime: number,
timestampStart: number | null,
timestampEnd: number | null,
pageOffset: number,
pageSize: number,
sortField: string,
sortOrder: string,
searchTerm: string | null,
writeAppends: boolean | null,
applicationId: string | null,
dataSourceUri: string | null,
lblNames: string[],
lblValues: string[]
labels: Label[],
sortField: string,
sortOrder: string,
offset: number,
limit: number,
): Frame<Partial<ExecutionEventInfo>> {
const lblNames = labels.map(lbl => lbl.name)
const lblValues = labels.map(lbl => lbl.values)

const q = aql`
WITH progress_view
FOR ee IN progress_view
const q: ArangoDB.Query = aql`
WITH ${aql.literal(ViewName.ProgressSearchView)}
FOR ee IN ${aql.literal(ViewName.ProgressSearchView)}
SEARCH ee._created <= ${asAtTime}
AND (${timestampStart} == null OR IN_RANGE(ee.timestamp, ${timestampStart}, ${timestampEnd}, true, true))
AND (${applicationId} == null OR ${applicationId} == ee.extra.appId)
AND (${dataSourceUri} == null OR ${dataSourceUri} == ee.execPlanDetails.dataSourceUri)
AND (${writeAppends} == null OR ee.execPlanDetails.append IN ${writeAppends})
${aql.join(lblNames.map((lblName, i) => aql`
AND (
${lblValues[i]} ANY == ee.labels[${lblName}]
OR ${lblValues[i]} ANY == ee.execPlanDetails.labels[${lblName}]
)
`))}
AND (
${searchTerm} == null
${aql.join(SEARCH_FIELDS.map(fld => aql`
OR ANALYZER(LIKE(ee.${aql.literal(fld)}, CONCAT("%", TOKENS(${searchTerm}, "norm_en")[0], "%")), "norm_en")
`))}
)
LET resItem = {
"executionEventId" : ee._key,
"executionPlanId" : ee.execPlanDetails.executionPlanKey,
Expand All @@ -79,12 +91,12 @@ export function listExecutionEvents(
}
SORT resItem.${sortField} ${sortOrder}
LIMIT ${pageOffset * pageSize}, ${pageSize}
LIMIT ${offset}, ${limit}
RETURN resItem
`

console.log('AQL query: ', q)
Logger.debug('AQL query: ', q)

const cursor = db._query(q, { fullCount: true })
const items: ExecutionEventInfo[] = cursor.toArray()
Expand All @@ -93,7 +105,7 @@ export function listExecutionEvents(
return {
items,
totalCount,
offset: pageOffset * pageSize,
offset,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@
*/
package za.co.absa.spline.consumer.service.model

case class PageRequest(page: Int, size: Int)
case class PageRequest(page: Int, size: Int) {

def offset: Int = (page - 1) * size

def limit: Int = size
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
*/
package za.co.absa.spline.consumer.service.model

case class SortRequest(sortField: String, sortOrder: String)
case class SortRequest(field: String, order: String)
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ class DataSourceRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends Dat
"timestampEnd" -> maybeWriteTimestampEnd.map(Long.box).orNull,
"pageOffset" -> Int.box(pageRequest.page - 1),
"pageSize" -> Int.box(pageRequest.size),
"sortField" -> sortRequest.sortField,
"sortOrder" -> sortRequest.sortOrder,
"sortField" -> sortRequest.field,
"sortOrder" -> sortRequest.order,
"searchTerm" -> maybeSearchTerm.map(escapeAQLSearch).orNull,
"writeAppends" -> (if (writeAppendOptions.isEmpty) null else writeAppendOptions.flatten.map(Boolean.box)),
"includeNoWrite" -> Boolean.box(writeAppendOptions.contains(None)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.springframework.stereotype.Repository
import za.co.absa.spline.common.StringEscapeUtils.escapeAQLSearch
import za.co.absa.spline.consumer.service.model._
import za.co.absa.spline.consumer.service.repo.ExecutionEventRepositoryImpl._
import za.co.absa.spline.persistence.DefaultJsonSerDe._
import za.co.absa.spline.persistence.FoxxRouter

import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -42,23 +43,19 @@ class ExecutionEventRepositoryImpl @Autowired()(
maybeApplicationId: Option[String],
maybeDataSourceUri: Option[String]
)(implicit ec: ExecutionContext): Future[Frame[ExecutionEventInfo]] = {
val lblNames = labels.map(_.name)
val lblValues = labels.map(_.values)

foxxRouter.get[Frame[ExecutionEventInfo]]("/spline/execution-events", Map(
"asAtTime" -> Long.box(asAtTime),
"timestampStart" -> maybeTimestampStart.map(Long.box).orNull,
"timestampEnd" -> maybeTimestampEnd.map(Long.box).orNull,
"pageOffset" -> Int.box(pageRequest.page - 1),
"pageSize" -> Int.box(pageRequest.size),
"sortField" -> sortRequest.sortField,
"sortOrder" -> sortRequest.sortOrder,
"searchTerm" -> maybeSearchTerm.map(escapeAQLSearch).orNull,
"writeAppends" -> (if (writeAppendOptions.isEmpty) null else writeAppendOptions.flatten.map(Boolean.box)),
"applicationId" -> maybeApplicationId.orNull,
"dataSourceUri" -> maybeDataSourceUri.orNull,
"lblNames" -> lblNames.toSeq,
"lblValues" -> lblValues.toSeq,
"writeAppends" -> (if (writeAppendOptions.isEmpty) null else writeAppendOptions.flatten.map(Boolean.box)),
"labels" -> labels.toJson,
"sortField" -> sortRequest.field,
"sortOrder" -> sortRequest.order,
"offset" -> pageRequest.offset,
"limit" -> pageRequest.limit,
))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ class ExecutionPlanRepositoryImpl @Autowired()(db: ArangoDatabaseAsync) extends
"asAtTime" -> Long.box(asAtTime),
"pageOffset" -> Int.box(pageRequest.page - 1),
"pageSize" -> Int.box(pageRequest.size),
"sortField" -> sortRequest.sortField,
"sortOrder" -> sortRequest.sortOrder
"sortField" -> sortRequest.field,
"sortOrder" -> sortRequest.order
),
new AqlQueryOptions().fullCount(true)
)
Expand Down

0 comments on commit bce12fc

Please sign in to comment.