Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change executed task query #307

Merged
merged 5 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/external-api/conductor-network-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ const ExecutedWorkflowTask = t.type({
taskType: optional(t.string),
status: optional(ExecutedWorkflowTaskStatus),
inputData: optional(t.record(t.string, t.unknown)),
outputData: optional(t.record(t.string, t.unknown)),
referenceTaskName: optional(t.string),
retryCount: optional(t.number),
seq: optional(t.number),
Expand Down Expand Up @@ -163,7 +164,7 @@ const ExecutedWorkflow = t.type({
updatedBy: optional(t.string),
status: ExecutedWorkflowStatus,
endTime: optional(t.number),
workflowId: optional(t.string),
workflowId: t.string,
parentWorkflowId: optional(t.string),
parentWorkflowTaskId: optional(t.string),
tasks: optional(t.array(ExecutedWorkflowTask)),
Expand Down Expand Up @@ -264,3 +265,7 @@ export function decodeBulkRetryOutput(value: unknown): BulkOperationOutput {
export function decodeBulkRestartOutput(value: unknown): BulkOperationOutput {
return extractResult(BulkOperation.decode(value));
}

export function decodeExecutedWorkflowTaskDetailOutput(value: unknown): ApiExecutedWorkflowTask {
return extractResult(ExecutedWorkflowTask.decode(value));
}
16 changes: 15 additions & 1 deletion src/external-api/conductor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import {
TaskDefinitionsOutput,
decodeTaskDefinitionsOutput,
decodeBulkTerminateOutput,
decodeExecutedWorkflowTaskDetailOutput,
ApiExecutedWorkflowTask,
} from './conductor-network-types';
import { sendDeleteRequest, sendGetRequest, sendPostRequest, sendPutRequest } from './helpers';

Expand Down Expand Up @@ -180,7 +182,12 @@ async function executeWorkflowByName(
{ name, inputParameters, correlationId, version, priority }: ExecuteWorkflowByNameInput,
): Promise<string> {
const executedWorkflowId = await sendPostRequest(
[baseURL, `workflow/${name}?version=${version}&correlationId=${correlationId}&priority=${priority}`],
[
baseURL,
`workflow/${name}?${version == null ? '' : `version=${version}`}${
correlationId == null ? '' : `&correlationId=${correlationId}`
}&priority=${priority == null ? 0 : priority}`,
],
inputParameters,
);

Expand All @@ -197,6 +204,12 @@ async function getTaskDefinitions(baseURL: string): Promise<TaskDefinitionsOutpu
return data;
}

async function getExecutedWorkflowTaskDetail(baseURL: string, taskId: string): Promise<ApiExecutedWorkflowTask> {
const json = await sendGetRequest([baseURL, `/tasks/${taskId}`]);
const data = decodeExecutedWorkflowTaskDetailOutput(json);
return data;
}

const conductorAPI = {
getWorkflowMetadata,
getWorkflowDetail,
Expand All @@ -219,6 +232,7 @@ const conductorAPI = {
executeNewWorkflow,
executeWorkflowByName,
getTaskDefinitions,
getExecutedWorkflowTaskDetail,
};

export type ConductorAPI = typeof conductorAPI;
Expand Down
11 changes: 7 additions & 4 deletions src/external-api/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,13 @@ async function apiFetch(path: APIPath, options: RequestInit): Promise<unknown> {
return response;
}

const json = JSON.parse(text);
logResponse(requestId, json);

return json;
try {
const json = JSON.parse(text);
return json;
} catch (e) {
logResponse(requestId, text);
return text;
}
}

export async function sendGetRequest(path: APIPath, cookie?: string): Promise<unknown> {
Expand Down
4 changes: 2 additions & 2 deletions src/helpers/workflow.helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ function extractSubworkflowsFromTasks(task: ExecutedWorkflowTask): SubWorkflow |
}

type SubworkflowDetail = {
taskReferenceName: string;
referenceTaskName: string;
workflowDetail: Workflow;
executedWorkflowDetail: ExecutedWorkflow;
};
Expand All @@ -137,7 +137,7 @@ async function getSubworklowsDetail(subWorkflow: SubWorkflow): Promise<Subworkfl
};

return {
taskReferenceName: referenceTaskName,
referenceTaskName,
workflowDetail: workflowDetailWithId,
executedWorkflowDetail: executedWorkflowDetailWithId,
};
Expand Down
14 changes: 11 additions & 3 deletions src/schema/api.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ input ExecuteWorkflowByName {
}

type ExecutedWorkflow implements Node {
correlationId: String
createdAt: String
createdBy: String
endTime: String
Expand All @@ -353,7 +354,7 @@ type ExecutedWorkflow implements Node {
variables: String
version: Int
workflowDefinition: Workflow
workflowId: String
workflowId: String!
workflowName: String
workflowVersion: Int
}
Expand Down Expand Up @@ -396,20 +397,27 @@ enum ExecutedWorkflowStatus {
}

type ExecutedWorkflowTask implements Node {
callbackAfterSeconds: Int
endTime: String
executed: Boolean
externalInputPayloadStoragePath: String
externalOutputPayloadStoragePath: String
id: ID!
inputData: String
outputData: String
pollCount: Int
reasonForIncompletion: String
referenceTaskName: String
retried: Boolean
retryCount: Int
scheduledTime: String
seq: Int
startTime: String
status: ExecutedWorkflowTaskStatus
subWorkflowId: String
taskDefName: String
taskDefinition: String
taskId: String
taskReferenceName: String
taskType: String
updateTime: String
version: Int
Expand Down Expand Up @@ -768,7 +776,7 @@ input StartWorkflowRequestInput {

type SubWorkflow {
executedWorkflowDetail: ExecutedWorkflow!
taskReferenceName: String!
referenceTaskName: String!
workflowDetail: Workflow!
}

Expand Down
9 changes: 9 additions & 0 deletions src/schema/global-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ export const NodeQuery = extendType({

return { ...schedule, id: args.id, __typename: 'Schedule' };
}
case 'ExecutedWorkflowTask': {
const id = fromGraphId('ExecutedWorkflowTask', args.id);
const task = await conductorAPI.getExecutedWorkflowTaskDetail(config.conductorApiURL, id);
if (task == null) {
return null;
}

return { ...task, id: args.id, __typename: 'ExecutedWorkflowTask' };
}
/* eslint-enable */
default:
return null;
Expand Down
28 changes: 22 additions & 6 deletions src/schema/nexus-typegen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ export interface NexusGenObjects {
SubWorkflow: {
// root type
executedWorkflowDetail: NexusGenRootTypes['ExecutedWorkflow']; // ExecutedWorkflow!
taskReferenceName: string; // String!
referenceTaskName: string; // String!
workflowDetail: NexusGenRootTypes['Workflow']; // Workflow!
};
Subscription: {};
Expand Down Expand Up @@ -966,6 +966,7 @@ export interface NexusGenFieldTypes {
};
ExecutedWorkflow: {
// field return type
correlationId: string | null; // String
createdAt: string | null; // String
createdBy: string | null; // String
endTime: string | null; // String
Expand All @@ -985,7 +986,7 @@ export interface NexusGenFieldTypes {
variables: string | null; // String
version: number | null; // Int
workflowDefinition: NexusGenRootTypes['Workflow'] | null; // Workflow
workflowId: string | null; // String
workflowId: string; // String!
workflowName: string | null; // String
workflowVersion: number | null; // Int
};
Expand All @@ -1002,20 +1003,27 @@ export interface NexusGenFieldTypes {
};
ExecutedWorkflowTask: {
// field return type
callbackAfterSeconds: number | null; // Int
endTime: string | null; // String
executed: boolean | null; // Boolean
externalInputPayloadStoragePath: string | null; // String
externalOutputPayloadStoragePath: string | null; // String
id: string; // ID!
inputData: string | null; // String
outputData: string | null; // String
pollCount: number | null; // Int
reasonForIncompletion: string | null; // String
referenceTaskName: string | null; // String
retried: boolean | null; // Boolean
retryCount: number | null; // Int
scheduledTime: string | null; // String
seq: number | null; // Int
startTime: string | null; // String
status: NexusGenEnums['ExecutedWorkflowTaskStatus'] | null; // ExecutedWorkflowTaskStatus
subWorkflowId: string | null; // String
taskDefName: string | null; // String
taskDefinition: string | null; // String
taskId: string | null; // String
taskReferenceName: string | null; // String
taskType: string | null; // String
updateTime: string | null; // String
version: number | null; // Int
Expand Down Expand Up @@ -1258,7 +1266,7 @@ export interface NexusGenFieldTypes {
SubWorkflow: {
// field return type
executedWorkflowDetail: NexusGenRootTypes['ExecutedWorkflow']; // ExecutedWorkflow!
taskReferenceName: string; // String!
referenceTaskName: string; // String!
workflowDetail: NexusGenRootTypes['Workflow']; // Workflow!
};
Subscription: {
Expand Down Expand Up @@ -1605,6 +1613,7 @@ export interface NexusGenFieldTypeNames {
};
ExecutedWorkflow: {
// field return type name
correlationId: 'String';
createdAt: 'String';
createdBy: 'String';
endTime: 'String';
Expand Down Expand Up @@ -1641,20 +1650,27 @@ export interface NexusGenFieldTypeNames {
};
ExecutedWorkflowTask: {
// field return type name
callbackAfterSeconds: 'Int';
endTime: 'String';
executed: 'Boolean';
externalInputPayloadStoragePath: 'String';
externalOutputPayloadStoragePath: 'String';
id: 'ID';
inputData: 'String';
outputData: 'String';
pollCount: 'Int';
reasonForIncompletion: 'String';
referenceTaskName: 'String';
retried: 'Boolean';
retryCount: 'Int';
scheduledTime: 'String';
seq: 'Int';
startTime: 'String';
status: 'ExecutedWorkflowTaskStatus';
subWorkflowId: 'String';
taskDefName: 'String';
taskDefinition: 'String';
taskId: 'String';
taskReferenceName: 'String';
taskType: 'String';
updateTime: 'String';
version: 'Int';
Expand Down Expand Up @@ -1897,7 +1913,7 @@ export interface NexusGenFieldTypeNames {
SubWorkflow: {
// field return type name
executedWorkflowDetail: 'ExecutedWorkflow';
taskReferenceName: 'String';
referenceTaskName: 'String';
workflowDetail: 'Workflow';
};
Subscription: {
Expand Down
9 changes: 8 additions & 1 deletion src/schema/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export const ExecutedWorkflowTask = objectType({
resolve: (executedWorkflowTask) => toGraphId('ExecutedWorkflowTask', executedWorkflowTask.taskId ?? uuid()),
});
t.string('taskType');
t.string('taskReferenceName');
t.string('referenceTaskName');
t.field('status', { type: ExecutedWorkflowTaskStatus });
t.int('retryCount');
t.string('startTime', {
Expand All @@ -76,6 +76,13 @@ export const ExecutedWorkflowTask = objectType({
t.string('reasonForIncompletion');
t.string('taskDefinition', { resolve: (task) => JSON.stringify(task.taskDefinition) });
t.string('subWorkflowId');
t.string('inputData', { resolve: (task) => JSON.stringify(task.inputData) });
t.string('outputData', { resolve: (task) => JSON.stringify(task.outputData) });
t.string('externalOutputPayloadStoragePath');
t.string('externalInputPayloadStoragePath');
t.int('callbackAfterSeconds');
t.int('seq');
t.int('pollCount');
},
});

Expand Down
Loading