Skip to content

Commit

Permalink
feat(orchestrator): fetch data input schema from `/management/process…
Browse files Browse the repository at this point in the history
…es` (janus-idp#45)

* Fetch data input schema from /management/processes

* Fix some code smells

* Rename WorkflowProcess -> WorkflowInfo
  • Loading branch information
caponetto authored Dec 1, 2023
1 parent c4ddb7c commit cc64e09
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 192 deletions.
2 changes: 1 addition & 1 deletion plugins/orchestrator-backend/config.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface Config {
port?: string;
/**
* Whether to start the Sonata Flow service automatically.
* If set to `false`, the plugin assumes that the SonataFlow service is already running on `baseUrl`:`port` (or just `baseUrl` if `port` is not set)`.
* If set to `false`, the plugin assumes that the SonataFlow service is already running on `baseUrl`:`port` (or just `baseUrl` if `port` is not set).
* Default: false
*/
autoStart?: boolean;
Expand Down
5 changes: 1 addition & 4 deletions plugins/orchestrator-backend/src/service/DataIndexService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ export class DataIndexService {
public static getNewGraphQLClient(
dataIndexUrl = DEFAULT_DATA_INDEX_URL,
): Client {
const diURL =
this.backendExecCtx && this.backendExecCtx.dataIndexUrl
? this.backendExecCtx.dataIndexUrl
: dataIndexUrl;
const diURL = this.backendExecCtx?.dataIndexUrl ?? dataIndexUrl;
return new Client({
url: diURL,
exchanges: [cacheExchange, fetchExchange],
Expand Down
133 changes: 1 addition & 132 deletions plugins/orchestrator-backend/src/service/DataInputSchemaService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ import { JSONSchema4 } from 'json-schema';
import { OpenAPIV3 } from 'openapi-types';
import { Logger } from 'winston';

import {
WorkflowDataInputSchema,
WorkflowDefinition,
} from '@janus-idp/backstage-plugin-orchestrator-common';
import { WorkflowDefinition } from '@janus-idp/backstage-plugin-orchestrator-common';

type OpenApiSchemaProperties = {
[k: string]: OpenAPIV3.SchemaObject | OpenAPIV3.ReferenceObject;
Expand Down Expand Up @@ -1195,132 +1192,4 @@ export class DataInputSchemaService {

return inputVariableSet;
}

public async resolveDataInputSchema(args: {
openApi: OpenAPIV3.Document;
workflowId: string;
}): Promise<WorkflowDataInputSchema | undefined> {
const requestBody =
args.openApi.paths[`/${args.workflowId}`]?.post?.requestBody;
if (!requestBody) {
return undefined;
}

const content = (requestBody as OpenAPIV3.RequestBodyObject).content;
if (!content) {
return undefined;
}

const mainSchema = content[`application/json`]?.schema;
if (!mainSchema) {
return undefined;
}

const referencedSchemas = this.findReferencedSchemas({
workflowId: args.workflowId,
openApi: args.openApi,
schema: mainSchema as OpenAPIV3.SchemaObject,
});

const compositionSchema = {
...mainSchema,
title: '',
properties: {
...referencedSchemas.reduce(
(obj, s) => {
if (obj && s.title) {
obj[s.title] = {
$ref: `#/components/schemas/${s.title}`,
};
}
return obj;
},
{} as WorkflowDataInputSchema['properties'],
),
},
};

const dataInputSchema: WorkflowDataInputSchema = {
...compositionSchema,
properties: referencedSchemas.reduce(
(obj, s) => {
if (obj) {
obj[s.title!] = {
$ref: `#/components/schemas/${s.title!}`,
};
}
return obj;
},
{} as WorkflowDataInputSchema['properties'],
),
components: {
schemas: referencedSchemas.reduce(
(obj, s) => {
obj[s.title!] = s as OpenAPIV3.NonArraySchemaObject;
return obj;
},
{} as WorkflowDataInputSchema['components']['schemas'],
),
},
};

return dataInputSchema;
}

private findReferencedSchemas(args: {
workflowId: string;
openApi: OpenAPIV3.Document;
schema: JSONSchema4;
}): JSONSchema4[] {
if (!args.schema.properties) {
return [];
}

const schemas: JSONSchema4[] = [];

for (const key of Object.keys(args.schema.properties)) {
const property = args.schema.properties[key];
if (!property.$ref) {
continue;
}
const referencedSchema = this.findReferencedSchema({
rootSchema: args.openApi,
ref: property.$ref,
});
if (referencedSchema) {
schemas.push({
...referencedSchema,
title: referencedSchema
.title!.replace(`${args.workflowId}:`, '')
.trim(),
});
}
}

if (!schemas.length) {
return [args.schema];
}

return schemas;
}

private findReferencedSchema(args: {
rootSchema: JSONSchema4;
ref: string;
}): JSONSchema4 | undefined {
const pathParts = args.ref
.split('/')
.filter(part => !['#', ''].includes(part));

let current: JSONSchema4 | undefined = args.rootSchema;

for (const part of pathParts) {
current = current?.[part];
if (current === undefined) {
return undefined;
}
}

return current;
}
}
62 changes: 42 additions & 20 deletions plugins/orchestrator-backend/src/service/SonataFlowService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
ProcessInstance,
WorkflowDefinition,
WorkflowExecutionResponse,
WorkflowInfo,
WorkflowItem,
WorkflowOverview,
} from '@janus-idp/backstage-plugin-orchestrator-common';
Expand Down Expand Up @@ -91,17 +92,17 @@ export class SonataFlowService {
workflowId: string,
): Promise<string | undefined> {
try {
const response = await executeWithRetry(() =>
fetch(`${this.url}/management/processes/${workflowId}/sources`),
);
const urlToFetch = `${this.url}/management/processes/${workflowId}/sources`;
const response = await executeWithRetry(() => fetch(urlToFetch));

if (response.ok) {
const json = (await response.json()) as SonataFlowSource[];
// Assuming only one source in the list
return json.pop()?.uri;
}
const responseStr = JSON.stringify(response);
this.logger.error(
`Response was NOT okay when fetch(${this.url}/management/processes/${workflowId}/sources). Received response: ${response}`,
`Response was NOT okay when fetch(${urlToFetch}). Received response: ${responseStr}`,
);
} catch (error) {
this.logger.error(`Error when fetching workflow uri: ${error}`);
Expand All @@ -110,19 +111,40 @@ export class SonataFlowService {
return undefined;
}

public async fetchWorkflowInfo(
workflowId: string,
): Promise<WorkflowInfo | undefined> {
try {
const urlToFetch = `${this.url}/management/processes/${workflowId}`;
const response = await executeWithRetry(() => fetch(urlToFetch));

if (response.ok) {
return await response.json();
}
const responseStr = JSON.stringify(response);
this.logger.error(
`Response was NOT okay when fetch(${urlToFetch}). Received response: ${responseStr}`,
);
} catch (error) {
this.logger.error(`Error when fetching workflow info: ${error}`);
}

return undefined;
}

public async fetchWorkflowSource(
workflowId: string,
): Promise<string | undefined> {
try {
const response = await executeWithRetry(() =>
fetch(`${this.url}/management/processes/${workflowId}/source`),
);
const urlToFetch = `${this.url}/management/processes/${workflowId}/source`;
const response = await executeWithRetry(() => fetch(urlToFetch));

if (response.ok) {
return await response.text();
}
const responseStr = JSON.stringify(response);
this.logger.error(
`Response was NOT okay when fetch(${this.url}/management/processes/${workflowId}/source). Received response: ${response}`,
`Response was NOT okay when fetch(${urlToFetch}). Received response: ${responseStr}`,
);
} catch (error) {
this.logger.error(`Error when fetching workflow source: ${error}`);
Expand All @@ -146,14 +168,14 @@ export class SonataFlowService {

public async fetchOpenApi(): Promise<OpenAPIV3.Document | undefined> {
try {
const response = await executeWithRetry(() =>
fetch(`${this.url}/q/openapi.json`),
);
const urlToFetch = `${this.url}/q/openapi.json`;
const response = await executeWithRetry(() => fetch(urlToFetch));
if (response.ok) {
return await response.json();
}
const responseStr = JSON.stringify(response);
this.logger.error(
`Response was NOT okay when fetch(${this.url}/q/openapi.json). Received response: ${response}`,
`Response was NOT okay when fetch(${urlToFetch}). Received response: ${responseStr}`,
);
} catch (error) {
this.logger.error(`Error when fetching openapi: ${error}`);
Expand All @@ -163,9 +185,8 @@ export class SonataFlowService {

public async fetchWorkflows(): Promise<WorkflowItem[] | undefined> {
try {
const response = await executeWithRetry(() =>
fetch(`${this.url}/management/processes`),
);
const urlToFetch = `${this.url}/management/processes`;
const response = await executeWithRetry(() => fetch(urlToFetch));

if (response.ok) {
const workflowIds = (await response.json()) as string[];
Expand Down Expand Up @@ -193,8 +214,9 @@ export class SonataFlowService {
);
return items.filter((item): item is WorkflowItem => !!item);
}
const responseStr = JSON.stringify(response);
this.logger.error(
`Response was NOT okay when fetch(${this.url}/management/processes). Received response: ${response}`,
`Response was NOT okay when fetch(${urlToFetch}). Received response: ${responseStr}`,
);
} catch (error) {
this.logger.error(`Error when fetching workflows: ${error}`);
Expand All @@ -206,9 +228,8 @@ export class SonataFlowService {
WorkflowOverview[] | undefined
> {
try {
const response = await executeWithRetry(() =>
fetch(`${this.url}/management/processes`),
);
const urlToFetch = `${this.url}/management/processes`;
const response = await executeWithRetry(() => fetch(urlToFetch));

if (response.ok) {
const workflowIds = (await response.json()) as string[];
Expand All @@ -222,8 +243,9 @@ export class SonataFlowService {
);
return items.filter((item): item is WorkflowOverview => !!item);
}
const responseStr = JSON.stringify(response);
this.logger.error(
`Response was NOT okay when fetch(${this.url}/management/processes). Received response: ${response}`,
`Response was NOT okay when fetch(${urlToFetch}). Received response: ${responseStr}`,
);
} catch (error) {
this.logger.error(
Expand Down
30 changes: 7 additions & 23 deletions plugins/orchestrator-backend/src/service/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import { JsonObject, JsonValue } from '@backstage/types';

import express from 'express';
import Router from 'express-promise-router';
import { JSONSchema7 } from 'json-schema';
import { Logger } from 'winston';

import {
fromWorkflowSource,
orchestrator_service_ready_topic,
WorkflowDataInputSchema,
WorkflowDataInputSchemaResponse,
WorkflowItem,
WorkflowListResult,
Expand Down Expand Up @@ -91,7 +91,6 @@ export async function createBackendRouter(
args.sonataFlowService,
workflowService,
openApiService,
dataInputSchemaService,
jiraService,
);
setupExternalRoutes(router, discovery, scaffolderService);
Expand Down Expand Up @@ -125,7 +124,6 @@ function setupInternalRoutes(
sonataFlowService: SonataFlowService,
workflowService: WorkflowService,
openApiService: OpenApiService,
dataInputSchemaService: DataInputSchemaService,
jiraService: JiraService,
) {
router.get('/workflows/definitions', async (_, response) => {
Expand Down Expand Up @@ -290,32 +288,18 @@ function setupInternalRoutes(

const workflowItem: WorkflowItem = { uri, definition };

let schema: WorkflowDataInputSchema | undefined = undefined;
let schema: JSONSchema7 | undefined = undefined;

if (definition.dataInputSchema) {
const openApi = await sonataFlowService.fetchOpenApi();

if (!openApi) {
res.status(500).send(`Couldn't fetch OpenAPI from SonataFlow service`);
return;
}

const workflowDataInputSchema =
await dataInputSchemaService.resolveDataInputSchema({
openApi,
workflowId,
});
const workflowInfo =
await sonataFlowService.fetchWorkflowInfo(workflowId);

if (!workflowDataInputSchema) {
res
.status(500)
.send(
`Couldn't resolve data input schema for workflow ${workflowId}`,
);
if (!workflowInfo) {
res.status(500).send(`Couldn't fetch workflow info ${workflowId}`);
return;
}

schema = workflowDataInputSchema;
schema = workflowInfo.inputSchema;
}

const response: WorkflowDataInputSchemaResponse = {
Expand Down
4 changes: 2 additions & 2 deletions plugins/orchestrator-backend/src/types/apiResponse.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export interface ApiResponse {
message?: String;
message?: string;
result?: any;
backEndErrCd?: String;
backEndErrCd?: string;
}

export class ApiResponseBuilder {
Expand Down
Loading

0 comments on commit cc64e09

Please sign in to comment.