From aab65afbc13205ea695c503fbbdf2326fe3cafd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Rodriguez?= Date: Tue, 22 Aug 2023 16:33:38 +0200 Subject: [PATCH] feat: digitaltwin exports (#315) * refactor(MeasureExporter): use AbstractExporter * feat(export): implement assets export * feat(export): implement devices export --- .gitignore | 2 +- lib/modules/asset/AssetsController.ts | 121 ++++++-- lib/modules/asset/types/AssetApi.ts | 12 + lib/modules/device/DevicesController.ts | 120 ++++++-- lib/modules/device/types/DeviceApi.ts | 12 + lib/modules/measure/MeasureExporter.ts | 281 ++++++------------ lib/modules/shared/index.ts | 1 + .../shared/services/AbstractExporter.ts | 159 ++++++++++ .../shared/services/DigitalTwinExporter.ts | 140 +++++++++ lib/modules/shared/services/index.ts | 2 + package-lock.json | 7 + package.json | 1 + tests/fixtures/fixtures.ts | 34 +++ .../assets/action-export-measures.test.ts | 7 +- .../modules/assets/action-export.test.ts | 110 +++++++ .../devices/action-export-measures.test.ts | 6 +- .../modules/devices/action-export.test.ts | 124 ++++++++ 17 files changed, 882 insertions(+), 257 deletions(-) create mode 100644 lib/modules/shared/services/AbstractExporter.ts create mode 100644 lib/modules/shared/services/DigitalTwinExporter.ts create mode 100644 lib/modules/shared/services/index.ts create mode 100644 tests/scenario/modules/assets/action-export.test.ts create mode 100644 tests/scenario/modules/devices/action-export.test.ts diff --git a/.gitignore b/.gitignore index 5ef75b46..6f3a7a73 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,4 @@ dist/ types/dist/ # Test files -asset.csv +/*.csv diff --git a/lib/modules/asset/AssetsController.ts b/lib/modules/asset/AssetsController.ts index b0049d00..fceea69f 100644 --- a/lib/modules/asset/AssetsController.ts +++ b/lib/modules/asset/AssetsController.ts @@ -1,8 +1,8 @@ import { ControllerDefinition, HttpStream, KuzzleRequest } from "kuzzle"; -import { PassThrough } from "stream"; import { MeasureExporter } from "../measure/"; -import { DeviceManagerPlugin } from "../plugin"; +import { DeviceManagerPlugin, InternalCollection } from "../plugin"; +import { DigitalTwinExporter } from "../shared"; import { AssetService } from "./AssetService"; import { AssetSerializer } from "./model/AssetSerializer"; @@ -17,6 +17,8 @@ import { export class AssetsController { public definition: ControllerDefinition; + private exporter: DigitalTwinExporter; + private measureExporter: MeasureExporter; constructor( private plugin: DeviceManagerPlugin, @@ -76,9 +78,31 @@ export class AssetsController { }, ], }, + export: { + handler: this.export.bind(this), + http: [ + { + path: "device-manager/:engineId/assets/_export/:exportId", + verb: "get", + }, + { + path: "device-manager/:engineId/assets/_export", + verb: "post", + }, + ], + }, }, }; /* eslint-enable sort-keys */ + + this.exporter = new DigitalTwinExporter( + this.plugin, + InternalCollection.ASSETS + ); + this.measureExporter = new MeasureExporter( + this.plugin, + InternalCollection.ASSETS + ); } async get(request: KuzzleRequest): Promise { @@ -177,19 +201,21 @@ export class AssetsController { const sort = request.input.body?.sort; const type = request.input.args.type; - const exporter = new MeasureExporter(this.plugin, engineId, { - target: "asset", - }); - - const { measures, total } = await exporter.search(id, { - endAt, - from, - query, - size, - sort, - startAt, - type, - }); + const { measures, total } = await this.measureExporter.search( + engineId, + { + endAt, + id, + query, + sort, + startAt, + type, + }, + { + from, + size, + } + ); return { measures, total }; } @@ -203,11 +229,7 @@ export class AssetsController { ) { const exportId = request.getString("exportId"); - const stream = new PassThrough(); - - const exporter = new MeasureExporter(this.plugin, engineId); - - const { id } = await exporter.getExport(exportId); + const { id } = await this.measureExporter.getExport(engineId, exportId); request.response.configure({ headers: { @@ -216,7 +238,7 @@ export class AssetsController { }, }); - exporter.sendExport(stream, exportId); + const stream = await this.measureExporter.sendExport(engineId, exportId); return new HttpStream(stream); } @@ -230,17 +252,54 @@ export class AssetsController { const sort = request.input.body?.sort; const type = request.input.args.type; - const exporter = new MeasureExporter(this.plugin, engineId, { - target: "asset", - }); + const link = await this.measureExporter.prepareExport( + engineId, + request.getUser(), + { + endAt, + id, + query, + sort, + startAt, + type, + } + ); - const { link } = await exporter.prepareExport(request.getUser(), id, { - endAt, - query, - sort, - startAt, - type, - }); + return { link }; + } + + async export(request: KuzzleRequest) { + const engineId = request.getString("engineId"); + + if ( + request.context.connection.protocol === "http" && + request.context.connection.misc.verb === "GET" + ) { + const exportId = request.getString("exportId"); + + request.response.configure({ + headers: { + "Content-Disposition": `attachment; filename="${InternalCollection.ASSETS}.csv"`, + "Content-Type": "text/csv", + }, + }); + + const stream = await this.exporter.sendExport(engineId, exportId); + + return new HttpStream(stream); + } + + const query = request.input.body?.query; + const sort = request.input.body?.sort; + + const link = await this.exporter.prepareExport( + engineId, + request.getUser(), + { + query, + sort, + } + ); return { link }; } diff --git a/lib/modules/asset/types/AssetApi.ts b/lib/modules/asset/types/AssetApi.ts index 670e0f80..b2e6a171 100644 --- a/lib/modules/asset/types/AssetApi.ts +++ b/lib/modules/asset/types/AssetApi.ts @@ -122,3 +122,15 @@ export interface ApiAssetExportMeasuresRequest extends AssetsControllerRequest { export type ApiAssetExportMeasuresResult = { link: string; }; + +export interface ApiAssetExportRequest extends AssetsControllerRequest { + action: "export"; + + body?: { + query?: JSONObject; + sort?: JSONObject; + }; +} +export type ApiAssetExportResult = { + link: string; +}; diff --git a/lib/modules/device/DevicesController.ts b/lib/modules/device/DevicesController.ts index 9b607468..e39c2573 100644 --- a/lib/modules/device/DevicesController.ts +++ b/lib/modules/device/DevicesController.ts @@ -5,11 +5,11 @@ import { KuzzleRequest, } from "kuzzle"; import _ from "lodash"; -import { PassThrough } from "node:stream"; import { AssetSerializer } from "../asset/model/AssetSerializer"; import { MeasureExporter, DecodedMeasurement } from "../measure"; -import { DeviceManagerPlugin } from "../plugin"; +import { DeviceManagerPlugin, InternalCollection } from "../plugin"; +import { DigitalTwinExporter } from "../shared"; import { DeviceService } from "./DeviceService"; import { DeviceSerializer } from "./model/DeviceSerializer"; @@ -29,6 +29,8 @@ import { export class DevicesController { public definition: ControllerDefinition; + private exporter: DigitalTwinExporter; + private measureExporter: MeasureExporter; constructor( private plugin: DeviceManagerPlugin, @@ -136,9 +138,31 @@ export class DevicesController { }, ], }, + export: { + handler: this.export.bind(this), + http: [ + { + path: "device-manager/:engineId/devices/_export/:exportId", + verb: "get", + }, + { + path: "device-manager/:engineId/devices/_export", + verb: "post", + }, + ], + }, }, }; /* eslint-enable sort-keys */ + + this.exporter = new DigitalTwinExporter( + this.plugin, + InternalCollection.DEVICES + ); + this.measureExporter = new MeasureExporter( + this.plugin, + InternalCollection.DEVICES + ); } async get(request: KuzzleRequest): Promise { @@ -332,19 +356,18 @@ export class DevicesController { const sort = request.input.body?.sort; const type = request.input.args.type; - const exporter = new MeasureExporter(this.plugin, engineId, { - target: "device", - }); - - const { measures, total } = await exporter.search(id, { - endAt, - from, - query, - size, - sort, - startAt, - type, - }); + const { measures, total } = await this.measureExporter.search( + engineId, + { + endAt, + id, + query, + sort, + startAt, + type, + }, + { from, size } + ); return { measures, total }; } @@ -358,11 +381,7 @@ export class DevicesController { ) { const exportId = request.getString("exportId"); - const stream = new PassThrough(); - - const exporter = new MeasureExporter(this.plugin, engineId); - - const { id } = await exporter.getExport(exportId); + const { id } = await this.measureExporter.getExport(engineId, exportId); request.response.configure({ headers: { @@ -371,7 +390,7 @@ export class DevicesController { }, }); - exporter.sendExport(stream, exportId); + const stream = await this.measureExporter.sendExport(engineId, exportId); return new HttpStream(stream); } @@ -385,17 +404,18 @@ export class DevicesController { const sort = request.input.body?.sort; const type = request.input.args.type; - const exporter = new MeasureExporter(this.plugin, engineId, { - target: "device", - }); - - const { link } = await exporter.prepareExport(request.getUser(), id, { - endAt, - query, - sort, - startAt, - type, - }); + const link = await this.measureExporter.prepareExport( + engineId, + request.getUser(), + { + endAt, + id, + query, + sort, + startAt, + type, + } + ); return { link }; } @@ -443,4 +463,40 @@ export class DevicesController { payloadUuids ); } + + async export(request: KuzzleRequest) { + const engineId = request.getString("engineId"); + + if ( + request.context.connection.protocol === "http" && + request.context.connection.misc.verb === "GET" + ) { + const exportId = request.getString("exportId"); + + request.response.configure({ + headers: { + "Content-Disposition": `attachment; filename="${InternalCollection.DEVICES}.csv"`, + "Content-Type": "text/csv", + }, + }); + + const stream = await this.exporter.sendExport(engineId, exportId); + + return new HttpStream(stream); + } + + const query = request.input.body?.query; + const sort = request.input.body?.sort; + + const link = await this.exporter.prepareExport( + engineId, + request.getUser(), + { + query, + sort, + } + ); + + return { link }; + } } diff --git a/lib/modules/device/types/DeviceApi.ts b/lib/modules/device/types/DeviceApi.ts index 1549abab..90a4975e 100644 --- a/lib/modules/device/types/DeviceApi.ts +++ b/lib/modules/device/types/DeviceApi.ts @@ -222,3 +222,15 @@ export interface ApiDeviceReceiveMeasuresRequest< }; } export type ApiDeviceReceiveMeasuresResult = void; + +export interface ApiDeviceExportRequest extends DevicesControllerRequest { + action: "export"; + + body?: { + query?: JSONObject; + sort?: JSONObject; + }; +} +export type ApiDeviceExportResult = { + link: string; +}; diff --git a/lib/modules/measure/MeasureExporter.ts b/lib/modules/measure/MeasureExporter.ts index 96aa283a..43dd5d5f 100644 --- a/lib/modules/measure/MeasureExporter.ts +++ b/lib/modules/measure/MeasureExporter.ts @@ -1,14 +1,5 @@ -import { randomUUID } from "node:crypto"; -import { PassThrough } from "node:stream"; - -import { stringify } from "csv-stringify/sync"; -import { - InternalError, - JSONObject, - KDocument, - NotFoundError, - User, -} from "kuzzle"; +import { UUID } from "node:crypto"; +import { InternalError, JSONObject, KHit, User } from "kuzzle"; import _ from "lodash"; import { @@ -16,81 +7,71 @@ import { AskModelDeviceGet, AskModelMeasureGet, } from "../model"; -import { DeviceManagerPlugin, InternalCollection } from "../plugin"; -import { DigitalTwinContent, ask, flattenObject } from "../shared"; +import { InternalCollection } from "../plugin"; +import { + AbstractExporter, + Column, + DigitalTwinContent, + ExportParams, + ask, + flattenObject, +} from "../shared"; import { NamedMeasures } from "../decoder"; import { MeasureContent } from "./exports"; -type ExportOptions = { - sort?: JSONObject; - query?: JSONObject; - from?: number; - size?: number; +interface MeasureSearchParams extends ExportParams { + id: string; + type?: string; startAt?: string; endAt?: string; - type?: string; -}; +} -const TWO_MINUTES = 2 * 60; +interface MeasuresSearchOptions { + from?: number; + size?: number; +} -type ExportParams = { - query: JSONObject; - target: "asset" | "device"; - model: string; - sort: JSONObject; +interface MeasureExportParams extends ExportParams { id: string; -}; - -type Column = { - header: string; - path: string; - isMeasure?: boolean; -}; - -export class MeasureExporter { - private target?: "asset" | "device"; + model: string; +} - private get sdk() { - return this.plugin.context.accessors.sdk; +export class MeasureExporter extends AbstractExporter { + protected exportRedisKey(engineId: string, exportId: string) { + return `exports:measures:${engineId}:${exportId}`; } - constructor( - private plugin: DeviceManagerPlugin, - private engineId: string, - { target }: { target?: "asset" | "device" } = {} + protected getLink( + engineId: string, + exportId: UUID, + params: MeasureExportParams ) { - this.target = target; + return `/_/device-manager/${engineId}/${this.target}/${params.id}/measures/_export/${exportId}`; } /** * Searches for measures and return them in a standard JSON */ async search( - digitalTwinId: string, - { - size = 25, - from = 0, - endAt, - startAt, - query, - sort = { measuredAt: "desc" }, - type, - }: ExportOptions + engineId: string, + params: MeasureSearchParams, + options?: MeasuresSearchOptions ) { - const { searchQuery } = await this.prepareMeasureSearch( - digitalTwinId, - startAt, - endAt, - query, - type - ); + const searchQuery = this.prepareMeasureSearch(params); const result = await this.sdk.document.search( - this.engineId, + engineId, InternalCollection.MEASURES, - { query: searchQuery, sort }, - { from, lang: "koncorde", size: size } + { + query: searchQuery, + sort: params.sort ?? { measuredAt: "desc" }, + }, + { + from: options?.from ?? 0, + lang: "koncorde", + size: options?.size ?? 25, + } ); return { measures: result.hits, total: result.total }; @@ -106,54 +87,40 @@ export class MeasureExporter { * Never return a rejected promise and write potential error on the stream */ async prepareExport( + engineId: string, user: User, - digitalTwinId: string, - { - endAt, - startAt, - query, - sort = { measuredAt: "desc" }, - type, - }: ExportOptions + params: MeasureSearchParams ) { - const { digitalTwin, searchQuery } = await this.prepareMeasureSearch( - digitalTwinId, - startAt, - endAt, - query, - type + const digitalTwin = await this.sdk.document.get( + engineId, + this.target, + params.id ); - const exportParams: ExportParams = { - id: digitalTwinId, + const searchQuery = this.prepareMeasureSearch(params); + + const exportParams: MeasureExportParams = { + id: params.id, model: digitalTwin._source.model, query: searchQuery, - sort, - target: this.target, + sort: params.sort ?? { measuredAt: "desc" }, }; - const exportId = randomUUID(); - - await this.sdk.ms.setex( - this.exportRedisKey(exportId), - JSON.stringify(exportParams), - TWO_MINUTES - ); - - let link = `/_/device-manager/${this.engineId}/${this.target}s/${digitalTwinId}/measures/_export/${exportId}`; - - if (user._id !== "-1") { - const { result } = await this.sdk.as(user).query({ - action: "createToken", - controller: "auth", - expiresIn: TWO_MINUTES * 1000, - singleUse: true, - }); + return super.prepareExport(engineId, user, exportParams); + } - link += `?jwt=${result.token}`; - } + protected formatHit(columns: Column[], hit: KHit) { + return columns.map(({ header: measureName, isMeasure, path }) => { + if ( + isMeasure && + this.target === InternalCollection.ASSETS && + hit._source.asset?.measureName !== measureName + ) { + return null; + } - return { link }; + return _.get(hit, path, null); + }); } /** @@ -162,20 +129,22 @@ export class MeasureExporter { * This method never returns a rejected promise, but write potential error in * the stream. */ - async sendExport(stream: PassThrough, exportId: string) { + async sendExport(engineId: string, exportId: string) { try { - const { query, sort, model, target } = await this.getExport(exportId); + const { query, sort, model } = await this.getExport(engineId, exportId); - let result = await this.sdk.document.search( - this.engineId, + const result = await this.sdk.document.search( + engineId, InternalCollection.MEASURES, { query, sort }, { lang: "koncorde", size: 200 } ); - const engine = await this.getEngine(); + const targetModel = + this.target === InternalCollection.ASSETS ? "asset" : "device"; + const engine = await this.getEngine(engineId); const modelDocument = await ask( - `ask:device-manager:model:${target}:get`, + `ask:device-manager:model:${targetModel}:get`, { engineGroup: engine.group, model, @@ -183,7 +152,7 @@ export class MeasureExporter { ); const measureColumns = await this.generateMeasureColumns( - modelDocument[target].measures + modelDocument[targetModel].measures ); const columns: Column[] = [ @@ -197,35 +166,12 @@ export class MeasureExporter { ...measureColumns, ]; - stream.write(stringify([columns.map((column) => column.header)])); - - while (result) { - for (const hit of result.hits) { - stream.write( - stringify([ - columns.map(({ header: measureName, isMeasure, path }) => { - if ( - isMeasure && - target === "asset" && - hit._source.asset?.measureName !== measureName - ) { - return null; - } - - return _.get(hit, path, null); - }), - ]) - ); - } - - result = await result.next(); - } + const stream = this.getExportStream(result, columns); + await this.sdk.ms.del(this.exportRedisKey(engineId, exportId)); - await this.sdk.ms.del(this.exportRedisKey(exportId)); + return stream; } catch (error) { - stream.write(error.message); - } finally { - stream.end(); + this.log.error(error); } } @@ -287,40 +233,11 @@ export class MeasureExporter { return measures; } - async getExport(exportId: string) { - const exportParams = await this.sdk.ms.get(this.exportRedisKey(exportId)); - - if (!exportParams) { - throw new NotFoundError(`Export "${exportId}" not found or expired.`); - } - - return JSON.parse(exportParams) as ExportParams; - } - - private exportRedisKey(exportId: string) { - return `exports:measures:${this.engineId}:${exportId}`; - } - - private async prepareMeasureSearch( - digitalTwinId: string, - startAt?: string, - endAt?: string, - query?: JSONObject, - type?: string - ) { + private prepareMeasureSearch(params: MeasureSearchParams) { if (!this.target) { throw new InternalError('Missing "target" parameter'); } - const digitalTwin: KDocument = - await this.sdk.document.get( - this.engineId, - this.target === "asset" - ? InternalCollection.ASSETS - : InternalCollection.DEVICES, - digitalTwinId - ); - const measuredAtRange = { range: { measuredAt: { @@ -330,46 +247,36 @@ export class MeasureExporter { }, }; - if (startAt) { - measuredAtRange.range.measuredAt.gte = new Date(startAt).getTime(); + if (params.startAt) { + measuredAtRange.range.measuredAt.gte = new Date(params.startAt).getTime(); } - if (endAt) { - measuredAtRange.range.measuredAt.lte = new Date(endAt).getTime(); + if (params.endAt) { + measuredAtRange.range.measuredAt.lte = new Date(params.endAt).getTime(); } const searchQuery: JSONObject = { and: [measuredAtRange], }; - if (this.target === "asset") { + if (this.target === InternalCollection.ASSETS) { searchQuery.and.push({ - equals: { "asset._id": digitalTwinId }, + equals: { "asset._id": params.id }, }); } else { searchQuery.and.push({ - equals: { "origin._id": digitalTwinId }, + equals: { "origin._id": params.id }, }); } - if (type) { - searchQuery.and.push({ equals: { type } }); + if (params.type) { + searchQuery.and.push({ equals: { type: params.type } }); } - if (query) { - searchQuery.and.push(query); + if (params.query) { + searchQuery.and.push(params.query); } - return { digitalTwin, searchQuery }; - } - - private async getEngine(): Promise { - const engine = await this.sdk.document.get( - this.plugin.config.adminIndex, - InternalCollection.CONFIG, - `engine-device-manager--${this.engineId}` - ); - - return engine._source.engine; + return searchQuery; } } diff --git a/lib/modules/shared/index.ts b/lib/modules/shared/index.ts index 30299641..26403909 100644 --- a/lib/modules/shared/index.ts +++ b/lib/modules/shared/index.ts @@ -1,3 +1,4 @@ +export * from "./services"; export * from "./Module"; export * from "./types/DigitalTwinContent"; export * from "./types/Metadata"; diff --git a/lib/modules/shared/services/AbstractExporter.ts b/lib/modules/shared/services/AbstractExporter.ts new file mode 100644 index 00000000..b2bdfb60 --- /dev/null +++ b/lib/modules/shared/services/AbstractExporter.ts @@ -0,0 +1,159 @@ +import { UUID, randomUUID } from "node:crypto"; +import { PassThrough, Readable } from "node:stream"; +import { stringify } from "csv-stringify/sync"; +import { + JSONObject, + KDocumentContentGeneric, + KHit, + NotFoundError, + SearchResult, + User, +} from "kuzzle"; +import { EngineContent } from "kuzzle-plugin-commons"; +import _ from "lodash"; + +import { DeviceManagerPlugin, InternalCollection } from "../../plugin"; + +export interface ExporterOption { + /** + * Expiration time of export in seconds before being invalid + */ + expireTime: number; +} + +export interface ExportParams { + query: JSONObject; + sort?: JSONObject; +} + +export interface Column { + header: string; + path: string; + isMeasure?: boolean; +} + +export abstract class AbstractExporter

{ + protected config: ExporterOption = { + expireTime: 2 * 60, + }; + + constructor( + protected plugin: DeviceManagerPlugin, + protected target: InternalCollection, + config: Partial = {} + ) { + if (Object.keys(config).length > 0) { + this.config = { + ...this.config, + ...config, + }; + } + } + + protected get sdk() { + return this.plugin.context.accessors.sdk; + } + + protected get ms() { + return this.sdk.ms; + } + + protected get log() { + return this.plugin.context.log; + } + + protected async getEngine(engineId: string): Promise { + const engine = await this.sdk.document.get<{ engine: EngineContent }>( + this.plugin.config.adminIndex, + InternalCollection.CONFIG, + `engine-device-manager--${engineId}` + ); + + return engine._source.engine; + } + + protected abstract exportRedisKey(engineId: string, exportId: string): string; + + protected abstract getLink( + engineId: string, + exportId: UUID, + params: P + ): string; + + /** + * Retrieve a prepared export and write each document as a CSV in the stream + * + * This method never returns a rejected promise, but write potential error in + * the stream. + */ + abstract sendExport(engineId: string, exportId: string): Promise; + + async prepareExport(engineId: string, user: User, params: P) { + const exportId = randomUUID(); + + await this.ms.setex( + this.exportRedisKey(engineId, exportId), + JSON.stringify(params), + this.config.expireTime + ); + + let link = this.getLink(engineId, exportId, params); + if (user._id !== "-1") { + const { result } = await this.sdk.as(user).query({ + action: "createToken", + controller: "auth", + expiresIn: this.config.expireTime * 1000, + singleUse: true, + }); + + link += `?jwt=${result.token}`; + } + + return link; + } + + protected formatHit( + columns: Column[], + hit: KHit + ): string[] { + return columns.map(({ path }) => _.get(hit, path, null)); + } + + async getExport(engineId: string, exportId: string): Promise

{ + const exportParams = await this.sdk.ms.get( + this.exportRedisKey(engineId, exportId) + ); + + if (!exportParams) { + throw new NotFoundError(`Export "${exportId}" not found or expired.`); + } + + return JSON.parse(exportParams); + } + + async getExportStream( + request: SearchResult>, + columns: Column[] + ) { + const stream = new PassThrough(); + + let result = request; + try { + stream.write(stringify([columns.map((column) => column.header)])); + + while (result) { + for (const hit of result.hits) { + stream.write(stringify([this.formatHit(columns, hit)])); + } + + result = await result.next(); + } + } catch (error) { + stream.write(error.message); + } finally { + stream.end(); + } + + return stream; + } +} diff --git a/lib/modules/shared/services/DigitalTwinExporter.ts b/lib/modules/shared/services/DigitalTwinExporter.ts new file mode 100644 index 00000000..30483fc8 --- /dev/null +++ b/lib/modules/shared/services/DigitalTwinExporter.ts @@ -0,0 +1,140 @@ +import { UUID } from "node:crypto"; +import { JSONObject } from "kuzzle"; + +import { NamedMeasures } from "../../decoder"; +import { InternalCollection } from "../../plugin"; +import { + AskModelMeasureGet, + AssetModelContent, + DeviceModelContent, +} from "../../model"; +import { DigitalTwinContent, ask, flattenObject } from "../"; +import { AbstractExporter, Column } from "./AbstractExporter"; + +interface MeasureColumn extends Column { + isMeasure: boolean; +} + +export class DigitalTwinExporter extends AbstractExporter { + protected exportRedisKey(engineId: string, exportId: string) { + return `exports:${engineId}:${this.target}:${exportId}`; + } + + protected getLink(engineId: string, exportId: UUID) { + return `/_/device-manager/${engineId}/${this.target}/_export/${exportId}`; + } + + async sendExport(engineId: string, exportId: string) { + try { + const { query, sort } = await this.getExport(engineId, exportId); + + const digitalTwins = await this.sdk.document.search( + engineId, + this.target, + { query, sort }, + { lang: "koncorde", size: 200 } + ); + + const namedMeasures = await this.getNamedMeasures(engineId); + const measureColumns = await this.generateMeasureColumns(namedMeasures); + + const columns: Column[] = [ + { header: "Model", path: "_source.model" }, + { header: "Reference", path: "_source.reference" }, + ...measureColumns, + { header: "lastMeasuredAt", path: "_source.lastMeasuredAt" }, + ]; + + const stream = this.getExportStream(digitalTwins, columns); + await this.sdk.ms.del(this.exportRedisKey(engineId, exportId)); + + return stream; + } catch (error) { + this.log.error(error); + } + } + + /** + * Get the deduplicated Named Measures get from models + */ + private async getNamedMeasures(engineId: string): Promise { + const type = this.target === InternalCollection.ASSETS ? "asset" : "device"; + const query: JSONObject = { + and: [{ equals: { type } }], + }; + + if (this.target === InternalCollection.ASSETS) { + const engine = await this.getEngine(engineId); + query.and.push({ equals: { engineGroup: engine.group } }); + } + + let result = await this.sdk.document.search< + AssetModelContent | DeviceModelContent + >( + this.plugin.config.adminIndex, + InternalCollection.MODELS, + { + query: { equals: { type } }, + }, + { lang: "koncorde" } + ); + + // ? Use a map to dedup the NamedMeasures get from models + const namedMeasures = new Map(); + while (result) { + for (const { _source } of result.hits) { + for (const namedMeasure of _source[type].measures as NamedMeasures) { + if (!namedMeasures.has(namedMeasure.name)) { + namedMeasures.set(namedMeasure.name, namedMeasure); + } + } + } + result = await result.next(); + } + + // ? Ensure stable measures order + return [...namedMeasures.values()].sort((a, b) => { + if (a.name < b.name) { + return -1; + } + if (a.name > b.name) { + return 1; + } + return 0; + }); + } + + private async generateMeasureColumns( + namedMeasures: NamedMeasures + ): Promise { + const columns: MeasureColumn[] = []; + + const measuresPath = new Map(); + for (const { name, type } of namedMeasures) { + if (!measuresPath.has(name)) { + const { measure: measureDefinition } = await ask( + "ask:device-manager:model:measure:get", + { type } + ); + + const flattenMeasuresPath = Object.keys( + flattenObject(measureDefinition.valuesMappings) + ).map((path) => path.replace(".type", "").replace(".properties", "")); + + measuresPath.set(name, flattenMeasuresPath); + } + + for (const path of measuresPath.get(name)) { + const header = `${name}.${path}`.replace(`${name}.${type}`, name); + + columns.push({ + header, + isMeasure: true, + path: `_source.measures.${name}.values.${path}`, + }); + } + } + + return columns; + } +} diff --git a/lib/modules/shared/services/index.ts b/lib/modules/shared/services/index.ts new file mode 100644 index 00000000..2fb46bbf --- /dev/null +++ b/lib/modules/shared/services/index.ts @@ -0,0 +1,2 @@ +export * from "./AbstractExporter"; +export * from "./DigitalTwinExporter"; diff --git a/package-lock.json b/package-lock.json index d9e55f69..3ddad9e0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -25,6 +25,7 @@ "@types/lodash": "^4.14.195", "@types/node": "^18.15.13", "axios": "^1.3.6", + "csv-parse": "^5.4.0", "cz-conventional-changelog": "^3.3.0", "ergol": "^1.0.2", "eslint-plugin-jest": "^27.2.1", @@ -4601,6 +4602,12 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/csv-parse": { + "version": "5.4.0", + "resolved": "https://registry.npmjs.org/csv-parse/-/csv-parse-5.4.0.tgz", + "integrity": "sha512-JiQosUWiOFgp4hQn0an+SBoV9IKdqzhROM0iiN4LB7UpfJBlsSJlWl9nq4zGgxgMAzHJ6V4t29VAVD+3+2NJAg==", + "dev": true + }, "node_modules/csv-stringify": { "version": "6.3.2", "resolved": "https://registry.npmjs.org/csv-stringify/-/csv-stringify-6.3.2.tgz", diff --git a/package.json b/package.json index cba692c5..84de52b5 100644 --- a/package.json +++ b/package.json @@ -40,6 +40,7 @@ "@types/lodash": "^4.14.195", "@types/node": "^18.15.13", "axios": "^1.3.6", + "csv-parse": "^5.4.0", "cz-conventional-changelog": "^3.3.0", "ergol": "^1.0.2", "eslint-plugin-jest": "^27.2.1", diff --git a/tests/fixtures/fixtures.ts b/tests/fixtures/fixtures.ts index 1ffa831c..251de19a 100644 --- a/tests/fixtures/fixtures.ts +++ b/tests/fixtures/fixtures.ts @@ -58,6 +58,31 @@ const deviceAyseUnlinked3 = { }; const deviceAyseUnlinked3Id = `${deviceAyseUnlinked3.model}-${deviceAyseUnlinked3.reference}`; +const deviceAyseWarehouse = { + model: "DummyTempPosition", + reference: "warehouse", + metadata: {}, + measures: {}, + engineId: "engine-ayse", + assetId: "Warehouse-linked", +}; +const deviceAyseWarehouseId = `${deviceAyseWarehouse.model}-${deviceAyseWarehouse.reference}`; + +const assetAyseWarehouseLinked = { + model: "Warehouse", + reference: "linked", + metadata: { + surface: 512, + }, + linkedDevices: [ + { + measureNames: [{ asset: "position", device: "position" }], + _id: "DummyTempPosition-warehouse", + }, + ], +}; +const assetAyseWarehouseLinkedId = `${assetAyseWarehouseLinked.model}-${assetAyseWarehouseLinked.reference}`; + const assetAyseLinked1 = { model: "Container", reference: "linked1", @@ -170,6 +195,9 @@ export default { { index: { _id: deviceAyseUnlinked3Id } }, deviceAyseUnlinked3, + + { index: { _id: deviceAyseWarehouseId } }, + deviceAyseWarehouse, ], }, @@ -190,6 +218,9 @@ export default { { index: { _id: deviceAyseUnlinked3Id } }, deviceAyseUnlinked3, + + { index: { _id: deviceAyseWarehouseId } }, + deviceAyseWarehouse, ], assets: [ { index: { _id: assetAyseLinked1Id } }, @@ -206,6 +237,9 @@ export default { { index: { _id: assetAyseGroupedId2 } }, assetAyseGrouped2, + + { index: { _id: assetAyseWarehouseLinkedId } }, + assetAyseWarehouseLinked, ], ...assetGroupFixtures, }, diff --git a/tests/scenario/modules/assets/action-export-measures.test.ts b/tests/scenario/modules/assets/action-export-measures.test.ts index ae6de7ea..c45482c2 100644 --- a/tests/scenario/modules/assets/action-export-measures.test.ts +++ b/tests/scenario/modules/assets/action-export-measures.test.ts @@ -1,5 +1,5 @@ import axios from "axios"; -import { writeFileSync } from "fs"; +import { parse as csvParse } from "csv-parse/sync"; import { ApiAssetExportMeasuresRequest } from "../../../../index"; @@ -63,8 +63,6 @@ describe("AssetsController:exportMeasures", () => { response.data.on("end", resolve); }); - writeFileSync("./asset.csv", csv.join("")); - expect(csv).toHaveLength(5); expect(csv[0]).toBe( "Payload Id,Measured At,Measure Type,Device Id,Device Model,Asset Id,Asset Model,temperatureExt,temperatureInt,position,position.accuracy,position.altitude,temperatureWeather\n" @@ -81,7 +79,8 @@ describe("AssetsController:exportMeasures", () => { temperatureInt, position, temperatureWeather, - ] = csv[1].replace("\n", "").split(","); + ] = csvParse(csv[1])[0]; + expect(typeof payloadId).toBe("string"); expect(typeof parseFloat(measuredAt)).toBe("number"); expect(measureType).toBe("temperature"); diff --git a/tests/scenario/modules/assets/action-export.test.ts b/tests/scenario/modules/assets/action-export.test.ts new file mode 100644 index 00000000..ec15ab8a --- /dev/null +++ b/tests/scenario/modules/assets/action-export.test.ts @@ -0,0 +1,110 @@ +import { writeFileSync } from "node:fs"; +import axios from "axios"; +import { parse as csvParse } from "csv-parse/sync"; + +import { ApiAssetExportRequest, ApiAssetExportResult } from "../../../../index"; + +import { sendDummyTempPositionPayloads, setupHooks } from "../../../helpers"; +import fixtures from "../../../fixtures/fixtures"; + +const assetCount = fixtures["engine-ayse"].assets.length / 2; +jest.setTimeout(10000); + +function getExportedColums(row) { + const parsedRow = csvParse(row)[0]; + + return { + model: parsedRow[0], + reference: parsedRow[1], + position: parsedRow[2], + positionAccuracy: parsedRow[3], + positionAltitude: parsedRow[4], + temperatureExt: parsedRow[5], + temperatureInt: parsedRow[6], + temperatureWeather: parsedRow[7], + lastMeasuredAt: parsedRow[8], + }; +} + +describe("AssetsController:exportMeasures", () => { + const sdk = setupHooks(); + + it("should prepare export of different assets types and return a CSV as stream", async () => { + await sendDummyTempPositionPayloads(sdk, [ + { + deviceEUI: "warehouse", + temperature: 23.3, + location: { lat: 42.2, lon: 2.42, accuracy: 2100 }, + battery: 0.8, + // ? Use date now - 1s to ensure this asset are second in export + measuredAt: Date.now() - 2000, + }, + { + deviceEUI: "linked2", + temperature: 23.3, + location: { lat: 42.2, lon: 2.42, accuracy: 2100 }, + battery: 0.8, + // ? Use date now to ensure this asset is first in export + measuredAt: Date.now(), + }, + ]); + await sdk.collection.refresh("engine-ayse", "assets"); + const { result } = await sdk.query< + ApiAssetExportRequest, + ApiAssetExportResult + >({ + controller: "device-manager/assets", + action: "export", + engineId: "engine-ayse", + body: { + sort: { lastMeasuredAt: "desc" }, + }, + }); + + expect(typeof result.link).toBe("string"); + + const response = await axios.get("http://localhost:7512" + result.link, { + responseType: "stream", + }); + + const csv = []; + response.data.on("data", (chunk) => { + csv.push(chunk.toString()); + }); + await new Promise((resolve) => { + response.data.on("end", resolve); + }); + + writeFileSync("./assets.csv", csv.join("")); + + expect(csv[0]).toBe( + "Model,Reference,position,position.accuracy,position.altitude,temperatureExt,temperatureInt,temperatureWeather,lastMeasuredAt\n" + ); + + expect(csv).toHaveLength(assetCount + 1); + + const row1 = getExportedColums(csv[1]); + + expect(row1.model).toBe("Container"); + expect(typeof row1.reference).toBe("string"); + expect(typeof row1.position).toBe("string"); + expect(typeof parseFloat(row1.positionAccuracy)).toBe("number"); + expect(typeof parseFloat(row1.positionAltitude)).toBe("number"); + expect(typeof parseFloat(row1.temperatureExt)).toBe("number"); + expect(typeof parseFloat(row1.temperatureInt)).toBe("number"); + expect(typeof parseFloat(row1.temperatureWeather)).toBe("number"); + expect(typeof parseFloat(row1.lastMeasuredAt)).toBe("number"); + + const row2 = getExportedColums(csv[2]); + + expect(row2.model).toBe("Warehouse"); + expect(typeof row2.reference).toBe("string"); + expect(typeof row2.position).toBe("string"); + expect(typeof parseFloat(row2.positionAccuracy)).toBe("number"); + expect(typeof parseFloat(row2.positionAltitude)).toBe("number"); + expect(typeof parseFloat(row2.temperatureExt)).toBe("number"); + expect(typeof parseFloat(row2.temperatureInt)).toBe("number"); + expect(typeof parseFloat(row2.temperatureWeather)).toBe("number"); + expect(typeof parseFloat(row2.lastMeasuredAt)).toBe("number"); + }); +}); diff --git a/tests/scenario/modules/devices/action-export-measures.test.ts b/tests/scenario/modules/devices/action-export-measures.test.ts index cb3482ba..1013d9e1 100644 --- a/tests/scenario/modules/devices/action-export-measures.test.ts +++ b/tests/scenario/modules/devices/action-export-measures.test.ts @@ -1,4 +1,5 @@ import axios from "axios"; +import { parse as csvParse } from "csv-parse/sync"; import { ApiDeviceExportMeasuresRequest } from "../../../../index"; @@ -72,6 +73,7 @@ describe("DevicesController:exportMeasures", () => { expect(csv[0]).toBe( "Payload Id,Measured At,Measure Type,Device Id,Device Model,Asset Id,Asset Model,temperature,accelerationSensor.x,accelerationSensor.y,accelerationSensor.z,accelerationSensor.accuracy,battery\n" ); + const [ payloadId, measuredAt, @@ -81,7 +83,7 @@ describe("DevicesController:exportMeasures", () => { assetId, assetModel, temperature, - ] = csv[1].split(","); + ] = csvParse(csv[1])[0]; const [ , , @@ -95,7 +97,7 @@ describe("DevicesController:exportMeasures", () => { accelerationY, accelerationZ, accelerationAccuracy, - ] = csv[2].split(","); + ] = csvParse(csv[2])[0]; expect(typeof payloadId).toBe("string"); expect(typeof parseFloat(measuredAt)).toBe("number"); diff --git a/tests/scenario/modules/devices/action-export.test.ts b/tests/scenario/modules/devices/action-export.test.ts new file mode 100644 index 00000000..7684f3e9 --- /dev/null +++ b/tests/scenario/modules/devices/action-export.test.ts @@ -0,0 +1,124 @@ +import axios from "axios"; +import { parse as csvParse } from "csv-parse/sync"; + +import { + ApiDeviceExportRequest, + ApiDeviceExportResult, +} from "../../../../index"; + +import { + sendDummyTempPositionPayloads, + sendDummyTempPayloads, + setupHooks, +} from "../../../helpers"; +import fixtures from "../../../fixtures/fixtures"; + +const deviceCount = fixtures["engine-ayse"].devices.length / 2; +jest.setTimeout(10000); + +function getExportedColums(row) { + const parsedRow = csvParse(row)[0]; + + return { + model: parsedRow[0], + reference: parsedRow[1], + accelerationSensorX: parsedRow[2], + accelerationSensorY: parsedRow[3], + accelerationSensorZ: parsedRow[4], + accelerationSensorAccuracy: parsedRow[5], + battery: parsedRow[6], + position: parsedRow[7], + positionAccuracy: parsedRow[8], + positionAltitude: parsedRow[9], + temperature: parsedRow[10], + lastMeasuredAt: parsedRow[11], + }; +} + +describe("AssetsController:exportMeasures", () => { + const sdk = setupHooks(); + + it("should prepare export of different devices types and return a CSV as stream", async () => { + await sendDummyTempPayloads(sdk, [ + { + deviceEUI: "linked1", + temperature: 23.3, + battery: 0.8, + // ? Use date now - 1s to ensure this asset are second in export + measuredAt: Date.now() - 1000, + }, + ]); + await sendDummyTempPositionPayloads(sdk, [ + { + deviceEUI: "linked2", + temperature: 23.3, + location: { lat: 42.2, lon: 2.42, accuracy: 2100 }, + battery: 0.8, + // ? Use date now to ensure this asset is first in export + measuredAt: Date.now(), + }, + ]); + await sdk.collection.refresh("engine-ayse", "devices"); + const { result } = await sdk.query< + ApiDeviceExportRequest, + ApiDeviceExportResult + >({ + controller: "device-manager/devices", + action: "export", + engineId: "engine-ayse", + body: { + sort: { lastMeasuredAt: "desc" }, + }, + }); + + expect(typeof result.link).toBe("string"); + + const response = await axios.get("http://localhost:7512" + result.link, { + responseType: "stream", + }); + + const csv = []; + response.data.on("data", (chunk) => { + csv.push(chunk.toString()); + }); + await new Promise((resolve) => { + response.data.on("end", resolve); + }); + + expect(csv[0]).toBe( + "Model,Reference,accelerationSensor.x,accelerationSensor.y,accelerationSensor.z,accelerationSensor.accuracy,battery,position,position.accuracy,position.altitude,temperature,lastMeasuredAt\n" + ); + + expect(csv).toHaveLength(deviceCount + 1); + + const row1 = getExportedColums(csv[1]); + + expect(row1.model).toBe("DummyTempPosition"); + expect(typeof row1.reference).toBe("string"); + expect(typeof parseFloat(row1.accelerationSensorX)).toBe("number"); + expect(typeof parseFloat(row1.accelerationSensorY)).toBe("number"); + expect(typeof parseFloat(row1.accelerationSensorZ)).toBe("number"); + expect(typeof parseFloat(row1.accelerationSensorAccuracy)).toBe("number"); + expect(typeof parseFloat(row1.battery)).toBe("number"); + expect(typeof row1.position).toBe("string"); + expect(typeof parseFloat(row1.positionAccuracy)).toBe("number"); + expect(typeof parseFloat(row1.positionAltitude)).toBe("number"); + expect(typeof parseFloat(row1.temperature)).toBe("number"); + expect(typeof parseFloat(row1.lastMeasuredAt)).toBe("number"); + + const row2 = getExportedColums(csv[1]); + + expect(row2.model).toBe("DummyTempPosition"); + expect(typeof row2.reference).toBe("string"); + expect(typeof parseFloat(row2.accelerationSensorX)).toBe("number"); + expect(typeof parseFloat(row2.accelerationSensorY)).toBe("number"); + expect(typeof parseFloat(row2.accelerationSensorZ)).toBe("number"); + expect(typeof parseFloat(row2.accelerationSensorAccuracy)).toBe("number"); + expect(typeof parseFloat(row2.battery)).toBe("number"); + expect(typeof row2.position).toBe("string"); + expect(typeof parseFloat(row2.positionAccuracy)).toBe("number"); + expect(typeof parseFloat(row2.positionAltitude)).toBe("number"); + expect(typeof parseFloat(row2.temperature)).toBe("number"); + expect(typeof parseFloat(row2.lastMeasuredAt)).toBe("number"); + }); +});