diff --git a/src/external-api/conductor-network-types.ts b/src/external-api/conductor-network-types.ts index 44e17d25..d6cc487e 100644 --- a/src/external-api/conductor-network-types.ts +++ b/src/external-api/conductor-network-types.ts @@ -118,6 +118,7 @@ const ExecutedWorkflowTask = t.type({ taskType: optional(t.string), status: optional(ExecutedWorkflowTaskStatus), inputData: optional(t.record(t.string, t.unknown)), + outputData: optional(t.record(t.string, t.unknown)), referenceTaskName: optional(t.string), retryCount: optional(t.number), seq: optional(t.number), @@ -163,7 +164,7 @@ const ExecutedWorkflow = t.type({ updatedBy: optional(t.string), status: ExecutedWorkflowStatus, endTime: optional(t.number), - workflowId: optional(t.string), + workflowId: t.string, parentWorkflowId: optional(t.string), parentWorkflowTaskId: optional(t.string), tasks: optional(t.array(ExecutedWorkflowTask)), @@ -264,3 +265,7 @@ export function decodeBulkRetryOutput(value: unknown): BulkOperationOutput { export function decodeBulkRestartOutput(value: unknown): BulkOperationOutput { return extractResult(BulkOperation.decode(value)); } + +export function decodeExecutedWorkflowTaskDetailOutput(value: unknown): ApiExecutedWorkflowTask { + return extractResult(ExecutedWorkflowTask.decode(value)); +} diff --git a/src/external-api/conductor.ts b/src/external-api/conductor.ts index a7d29b21..37107143 100644 --- a/src/external-api/conductor.ts +++ b/src/external-api/conductor.ts @@ -15,6 +15,8 @@ import { TaskDefinitionsOutput, decodeTaskDefinitionsOutput, decodeBulkTerminateOutput, + decodeExecutedWorkflowTaskDetailOutput, + ApiExecutedWorkflowTask, } from './conductor-network-types'; import { sendDeleteRequest, sendGetRequest, sendPostRequest, sendPutRequest } from './helpers'; @@ -180,7 +182,12 @@ async function executeWorkflowByName( { name, inputParameters, correlationId, version, priority }: ExecuteWorkflowByNameInput, ): Promise { const executedWorkflowId = await sendPostRequest( - [baseURL, `workflow/${name}?version=${version}&correlationId=${correlationId}&priority=${priority}`], + [ + baseURL, + `workflow/${name}?${version == null ? '' : `version=${version}`}${ + correlationId == null ? '' : `&correlationId=${correlationId}` + }&priority=${priority == null ? 0 : priority}`, + ], inputParameters, ); @@ -197,6 +204,12 @@ async function getTaskDefinitions(baseURL: string): Promise { + const json = await sendGetRequest([baseURL, `/tasks/${taskId}`]); + const data = decodeExecutedWorkflowTaskDetailOutput(json); + return data; +} + const conductorAPI = { getWorkflowMetadata, getWorkflowDetail, @@ -219,6 +232,7 @@ const conductorAPI = { executeNewWorkflow, executeWorkflowByName, getTaskDefinitions, + getExecutedWorkflowTaskDetail, }; export type ConductorAPI = typeof conductorAPI; diff --git a/src/external-api/helpers.ts b/src/external-api/helpers.ts index 7e0c53a9..0af2cef9 100644 --- a/src/external-api/helpers.ts +++ b/src/external-api/helpers.ts @@ -89,10 +89,13 @@ async function apiFetch(path: APIPath, options: RequestInit): Promise { return response; } - const json = JSON.parse(text); - logResponse(requestId, json); - - return json; + try { + const json = JSON.parse(text); + return json; + } catch (e) { + logResponse(requestId, text); + return text; + } } export async function sendGetRequest(path: APIPath, cookie?: string): Promise { diff --git a/src/helpers/workflow.helpers.ts b/src/helpers/workflow.helpers.ts index d010effe..30347ba7 100644 --- a/src/helpers/workflow.helpers.ts +++ b/src/helpers/workflow.helpers.ts @@ -112,7 +112,7 @@ function extractSubworkflowsFromTasks(task: ExecutedWorkflowTask): SubWorkflow | } type SubworkflowDetail = { - taskReferenceName: string; + referenceTaskName: string; workflowDetail: Workflow; executedWorkflowDetail: ExecutedWorkflow; }; @@ -137,7 +137,7 @@ async function getSubworklowsDetail(subWorkflow: SubWorkflow): Promise toGraphId('ExecutedWorkflowTask', executedWorkflowTask.taskId ?? uuid()), }); t.string('taskType'); - t.string('taskReferenceName'); + t.string('referenceTaskName'); t.field('status', { type: ExecutedWorkflowTaskStatus }); t.int('retryCount'); t.string('startTime', { @@ -76,6 +76,13 @@ export const ExecutedWorkflowTask = objectType({ t.string('reasonForIncompletion'); t.string('taskDefinition', { resolve: (task) => JSON.stringify(task.taskDefinition) }); t.string('subWorkflowId'); + t.string('inputData', { resolve: (task) => JSON.stringify(task.inputData) }); + t.string('outputData', { resolve: (task) => JSON.stringify(task.outputData) }); + t.string('externalOutputPayloadStoragePath'); + t.string('externalInputPayloadStoragePath'); + t.int('callbackAfterSeconds'); + t.int('seq'); + t.int('pollCount'); }, }); diff --git a/src/schema/workflow.ts b/src/schema/workflow.ts index bece9947..f1acb5a3 100644 --- a/src/schema/workflow.ts +++ b/src/schema/workflow.ts @@ -213,7 +213,7 @@ export const ExecutedWorkflow = objectType({ definition(t) { t.implements(Node); t.nonNull.id('id', { - resolve: () => toGraphId('ExecutedWorkflow', uuid()), + resolve: (root) => toGraphId('ExecutedWorkflow', root.workflowId), }); t.string('createdBy', { resolve: (executedWorkflow) => executedWorkflow.createdBy ?? null }); t.string('updatedBy', { resolve: (workflow) => workflow.updatedBy ?? null }); @@ -243,10 +243,11 @@ export const ExecutedWorkflow = objectType({ }); t.int('workflowVersion'); t.string('workflowName'); - t.string('workflowId'); + t.nonNull.string('workflowId'); t.list.nonNull.field('tasks', { type: ExecutedWorkflowTask, }); + t.string('correlationId'); }, }); @@ -323,7 +324,7 @@ export const ExecutedWorkflowsQuery = queryField('executedWorkflows', { const executedWorkflowsWithId = executedWorkflows .map((w) => ({ ...w, - id: toGraphId('ExecutedWorkflow', uuid()), + id: toGraphId('ExecutedWorkflow', w.workflowId), })) .slice(0, args.pagination?.size ?? 0 - 1); @@ -346,7 +347,7 @@ export const ExecutedWorkflowsQuery = queryField('executedWorkflows', { const SubWorkflow = objectType({ name: 'SubWorkflow', definition: (t) => { - t.nonNull.string('taskReferenceName'); + t.nonNull.string('referenceTaskName'); t.nonNull.field('workflowDetail', { type: Workflow }); t.nonNull.field('executedWorkflowDetail', { type: ExecutedWorkflow }); }, @@ -372,7 +373,7 @@ export const WorkflowInstanceQuery = queryField('workflowInstanceDetail', { const result = await conductorAPI.getExecutedWorkflowDetail( config.conductorApiURL, - workflowId, + fromGraphId('ExecutedWorkflow', workflowId), shouldIncludeTasks ?? false, ); @@ -390,7 +391,7 @@ export const WorkflowInstanceQuery = queryField('workflowInstanceDetail', { const subworkflows = await getSubworkflows({ ...result, - id: toGraphId('ExecutedWorkflow', uuid()), + id: toGraphId('ExecutedWorkflow', result.workflowId), }); return { @@ -646,7 +647,7 @@ export const PauseWorkflowMutation = mutationField('pauseWorkflow', { id: nonNull(stringArg()), }, resolve: async (_, { id }, { conductorAPI }) => { - await conductorAPI.pauseWorkflow(config.conductorApiURL, id); + await conductorAPI.pauseWorkflow(config.conductorApiURL, fromGraphId('ExecutedWorkflow', id)); return { isOk: true }; }, @@ -658,7 +659,7 @@ export const ResumeWorkflowMutation = mutationField('resumeWorkflow', { id: nonNull(stringArg()), }, resolve: async (_, { id }, { conductorAPI }) => { - await conductorAPI.resumeWorkflow(config.conductorApiURL, id); + await conductorAPI.resumeWorkflow(config.conductorApiURL, fromGraphId('ExecutedWorkflow', id)); return { isOk: true }; }, @@ -685,11 +686,12 @@ export const BulkResumeWorkflowMutation = mutationField('bulkResumeWorkflow', { input: nonNull(arg({ type: BulkOperationInput })), }, resolve: async (_, { input }, { conductorAPI }) => { - const data = await conductorAPI.bulkResumeWorkflow(config.conductorApiURL, input.executedWorkflowIds); + const executedWorkflowIds = input.executedWorkflowIds.map((id) => fromGraphId('ExecutedWorkflow', id)); + const data = await conductorAPI.bulkResumeWorkflow(config.conductorApiURL, executedWorkflowIds); return { bulkErrorResults: JSON.stringify(data.bulkErrorResults), - bulkSuccessfulResults: data.bulkSuccessfulResults, + bulkSuccessfulResults: data.bulkSuccessfulResults.map((id) => toGraphId('ExecutedWorkflow', id)), }; }, }); @@ -715,15 +717,21 @@ export const ExecuteWorkflowByName = mutationField('executeWorkflowByName', { { input: { inputParameters, workflowName, workflowVersion, correlationId, priority } }, { conductorAPI }, ) => { + const json = parseJson>(inputParameters); + + if (json == null) { + throw new Error('inputParameters must be a valid JSON string'); + } + const workflowId = await conductorAPI.executeWorkflowByName(config.conductorApiURL, { - inputParameters: parseJson(inputParameters), + inputParameters: json, name: workflowName, version: workflowVersion, correlationId, priority, }); - return workflowId; + return toGraphId('ExecutedWorkflow', workflowId); }, }); @@ -733,11 +741,12 @@ export const BulkPauseWorkflowMutation = mutationField('bulkPauseWorkflow', { input: nonNull(arg({ type: BulkOperationInput })), }, resolve: async (_, { input }, { conductorAPI }) => { - const data = await conductorAPI.bulkPauseWorkflow(config.conductorApiURL, input.executedWorkflowIds); + const executedWorkflowIds = input.executedWorkflowIds.map((id) => fromGraphId('ExecutedWorkflow', id)); + const data = await conductorAPI.bulkPauseWorkflow(config.conductorApiURL, executedWorkflowIds); return { bulkErrorResults: JSON.stringify(data.bulkErrorResults), - bulkSuccessfulResults: data.bulkSuccessfulResults, + bulkSuccessfulResults: data.bulkSuccessfulResults.map((id) => toGraphId('ExecutedWorkflow', id)), }; }, }); @@ -748,11 +757,12 @@ export const BulkTerminateWorkflow = mutationField('bulkTerminateWorkflow', { input: nonNull(arg({ type: BulkOperationInput })), }, resolve: async (_, { input }, { conductorAPI }) => { - const data = await conductorAPI.bulkTerminateWorkflow(config.conductorApiURL, input.executedWorkflowIds); + const executedWorkflowIds = input.executedWorkflowIds.map((id) => fromGraphId('ExecutedWorkflow', id)); + const data = await conductorAPI.bulkTerminateWorkflow(config.conductorApiURL, executedWorkflowIds); return { bulkErrorResults: JSON.stringify(data.bulkErrorResults), - bulkSuccessfulResults: data.bulkSuccessfulResults, + bulkSuccessfulResults: data.bulkSuccessfulResults.map((id) => toGraphId('ExecutedWorkflow', id)), }; }, }); @@ -763,11 +773,12 @@ export const BulkRetryWorkflow = mutationField('bulkRetryWorkflow', { input: nonNull(arg({ type: BulkOperationInput })), }, resolve: async (_, { input }, { conductorAPI }) => { - const data = await conductorAPI.bulkRetryWorkflow(config.conductorApiURL, input.executedWorkflowIds); + const executedWorkflowIds = input.executedWorkflowIds.map((id) => fromGraphId('ExecutedWorkflow', id)); + const data = await conductorAPI.bulkRetryWorkflow(config.conductorApiURL, executedWorkflowIds); return { bulkErrorResults: JSON.stringify(data.bulkErrorResults), - bulkSuccessfulResults: data.bulkSuccessfulResults, + bulkSuccessfulResults: data.bulkSuccessfulResults.map((id) => toGraphId('ExecutedWorkflow', id)), }; }, }); @@ -778,11 +789,12 @@ export const BulkRestartWorkflow = mutationField('bulkRestartWorkflow', { input: nonNull(arg({ type: BulkOperationInput })), }, resolve: async (_, { input }, { conductorAPI }) => { - const data = await conductorAPI.bulkRestartWorkflow(config.conductorApiURL, input.executedWorkflowIds); + const executedWorkflowIds = input.executedWorkflowIds.map((id) => fromGraphId('ExecutedWorkflow', id)); + const data = await conductorAPI.bulkRestartWorkflow(config.conductorApiURL, executedWorkflowIds); return { bulkErrorResults: JSON.stringify(data.bulkErrorResults), - bulkSuccessfulResults: data.bulkSuccessfulResults, + bulkSuccessfulResults: data.bulkSuccessfulResults.map((id) => toGraphId('ExecutedWorkflow', id)), }; }, }); @@ -801,7 +813,11 @@ export const RetryWorkflowMutation = mutationField('retryWorkflow', { input: arg({ type: RetryWorkflowInput }), }, resolve: async (_, { id, input }, { conductorAPI }) => { - await conductorAPI.retryWorkflow(config.conductorApiURL, id, input?.shouldResumeSubworkflowTasks); + await conductorAPI.retryWorkflow( + config.conductorApiURL, + fromGraphId('ExecutedWorkflow', id), + input?.shouldResumeSubworkflowTasks, + ); return { isOk: true }; }, @@ -821,7 +837,11 @@ export const RestartWorkflowMutation = mutationField('restartWorkflow', { input: arg({ type: RestartWorkflowInput }), }, resolve: async (_, { id, input }, { conductorAPI }) => { - await conductorAPI.restartWorkflow(config.conductorApiURL, id, input?.shouldUseLatestDefinitions); + await conductorAPI.restartWorkflow( + config.conductorApiURL, + fromGraphId('ExecutedWorkflow', id), + input?.shouldUseLatestDefinitions, + ); return { isOk: true }; }, @@ -841,7 +861,7 @@ export const TerminateWorkflowMutation = mutationField('terminateWorkflow', { input: arg({ type: TerminateWorkflowInput }), }, resolve: async (_, { id, input }, { conductorAPI }) => { - await conductorAPI.terminateWorkflow(config.conductorApiURL, id, input?.reason); + await conductorAPI.terminateWorkflow(config.conductorApiURL, fromGraphId('ExecutedWorkflow', id), input?.reason); return { isOk: true }; }, @@ -861,7 +881,11 @@ export const RemoveWorkflowMutation = mutationField('removeWorkflow', { input: arg({ type: RemoveWorkflowInput }), }, resolve: async (_, { id, input }, { conductorAPI }) => { - await conductorAPI.removeWorkflow(config.conductorApiURL, id, input?.shouldArchiveWorkflow); + await conductorAPI.removeWorkflow( + config.conductorApiURL, + fromGraphId('ExecutedWorkflow', id), + input?.shouldArchiveWorkflow, + ); return { isOk: true }; }, @@ -1059,10 +1083,11 @@ export const ExecutedWorkflowSubscription = subscriptionField('controlExecutedWo args: { id: nonNull(stringArg()), }, - subscribe: async (_, { id }, { conductorAPI }) => + subscribe: (_, { id }, { conductorAPI }) => asyncGenerator({ repeatTill: (workflow) => workflow?.status === 'RUNNING' || workflow?.status === 'PAUSED', - fn: () => conductorAPI.getExecutedWorkflowDetail(config.conductorApiURL, id, false), + fn: () => + conductorAPI.getExecutedWorkflowDetail(config.conductorApiURL, fromGraphId('ExecutedWorkflow', id), true), }), resolve: (workflow) => { if (workflow == null) { @@ -1071,10 +1096,7 @@ export const ExecutedWorkflowSubscription = subscriptionField('controlExecutedWo return { ...workflow, - id: toGraphId('ExecutedWorkflow', uuid()), + id: toGraphId('ExecutedWorkflow', workflow.workflowId), }; }, }); - -// id generujem podla nasej hodnoty nejakej -// ked budem chciet query alebo presmerovat niekam pouzijem workflowId a ak bude null vratim error ze neexistuje workflow s tymto id