Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FD-518 bulk operations #275

Merged
merged 1 commit into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions src/external-api/conductor-network-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,6 @@ export function decodeWorkflowDetailOutput(value: unknown): WorkflowDetailOutput
return extractResult(WorkflowMetadata.decode(value));
}

const WorkflowEditValidator = t.type({
bulkErrorResults: t.record(t.string, t.string),
bulkSuccessfulResults: t.array(t.string),
});

export type WorkflowEditOutput = t.TypeOf<typeof WorkflowEditValidator>;

export function decodeWorkflowEditOutput(value: unknown): WorkflowEditOutput {
return extractResult(WorkflowEditValidator.decode(value));
}
export type BulkOperationOutput = t.TypeOf<typeof BulkOperation>;

export function decodeBulkOperationOutput(value: unknown): BulkOperationOutput {
Expand All @@ -261,3 +251,15 @@ export type TaskDefinitionsOutput = t.TypeOf<typeof TaskDefinitionsValidator>;
export function decodeTaskDefinitionsOutput(value: unknown): TaskDefinitionsOutput {
return extractResult(TaskDefinitionsValidator.decode(value));
}

export function decodeBulkTerminateOutput(value: unknown): BulkOperationOutput {
return extractResult(BulkOperation.decode(value));
}

export function decodeBulkRetryOutput(value: unknown): BulkOperationOutput {
return extractResult(BulkOperation.decode(value));
}

export function decodeBulkRestartOutput(value: unknown): BulkOperationOutput {
return extractResult(BulkOperation.decode(value));
}
28 changes: 24 additions & 4 deletions src/external-api/conductor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import {
decodeExecutedWorkflowDetailOutput,
decodeExecutedWorkflowsOutput,
decodeWorkflowDetailOutput,
decodeWorkflowEditOutput,
decodeWorkflowMetadataOutput,
WorkflowMetadataOutput,
WorkflowDetailInput,
ExecutedWorkflowsOutput,
WorkflowDetailOutput,
WorkflowEditOutput,
TaskDefinitionsOutput,
decodeTaskDefinitionsOutput,
decodeBulkTerminateOutput,
} from './conductor-network-types';
import { sendDeleteRequest, sendGetRequest, sendPostRequest, sendPutRequest } from './helpers';

Expand All @@ -35,9 +34,9 @@ async function createWorkflow(baseURL: string, workflow: WorkflowDetailInput): P
await sendPostRequest([baseURL, 'metadata/workflow'], workflow);
}

async function editWorkflow(baseURL: string, workflow: WorkflowDetailInput): Promise<WorkflowEditOutput> {
async function editWorkflow(baseURL: string, workflow: WorkflowDetailInput): Promise<BulkOperationOutput> {
const json = await sendPutRequest([baseURL, 'metadata/workflow'], [workflow]);
const data = decodeWorkflowEditOutput(json);
const data = decodeBulkOperationOutput(json);
return data;
}

Expand Down Expand Up @@ -67,6 +66,24 @@ async function bulkPauseWorkflow(baseURL: string, workflowIds: string[]): Promis
return data;
}

async function bulkTerminateWorkflow(baseURL: string, workflowIds: string[]): Promise<BulkOperationOutput> {
const json = await sendPostRequest([baseURL, `workflow/bulk/terminate`], workflowIds);
const data = decodeBulkTerminateOutput(json);
return data;
}

async function bulkRetryWorkflow(baseURL: string, workflowIds: string[]): Promise<BulkOperationOutput> {
const json = await sendPostRequest([baseURL, `workflow/bulk/retry`], workflowIds);
const data = decodeBulkTerminateOutput(json);
return data;
}

async function bulkRestartWorkflow(baseURL: string, workflowIds: string[]): Promise<BulkOperationOutput> {
const json = await sendPostRequest([baseURL, `workflow/bulk/restart`], workflowIds);
const data = decodeBulkTerminateOutput(json);
return data;
}

async function getExecutedWorkflows(
baseURL: string,
query?: SearchQuery | null,
Expand Down Expand Up @@ -171,6 +188,9 @@ const conductorAPI = {
resumeWorkflow,
bulkResumeWorkflow,
bulkPauseWorkflow,
bulkTerminateWorkflow,
bulkRetryWorkflow,
bulkRestartWorkflow,
getExecutedWorkflows,
getExecutedWorkflowDetail,
retryWorkflow,
Expand Down
3 changes: 3 additions & 0 deletions src/schema/api.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,10 @@ type Mutation {
addZone(input: AddZoneInput!): AddZonePayload!
applySnapshot(input: ApplySnapshotInput!, transactionId: String!): ApplySnapshotPayload!
bulkPauseWorkflow(workflowIds: [String!]!): BulkOperationResponse
bulkRestartWorkflow(workflowIds: [String!]!): BulkOperationResponse
bulkResumeWorkflow(workflowIds: [String!]!): BulkOperationResponse
bulkRetryWorkflow(workflowIds: [String!]!): BulkOperationResponse
bulkTerminateWorkflow(workflowIds: [String!]!): BulkOperationResponse
closeTransaction(deviceId: String!, transactionId: String!): CloseTransactionPayload!
commitConfig(input: CommitConfigInput!, transactionId: String!): CommitConfigPayload!
createLabel(input: CreateLabelInput!): CreateLabelPayload!
Expand Down
18 changes: 18 additions & 0 deletions src/schema/nexus-typegen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,10 @@ export interface NexusGenFieldTypes {
addZone: NexusGenRootTypes['AddZonePayload']; // AddZonePayload!
applySnapshot: NexusGenRootTypes['ApplySnapshotPayload']; // ApplySnapshotPayload!
bulkPauseWorkflow: NexusGenRootTypes['BulkOperationResponse'] | null; // BulkOperationResponse
bulkRestartWorkflow: NexusGenRootTypes['BulkOperationResponse'] | null; // BulkOperationResponse
bulkResumeWorkflow: NexusGenRootTypes['BulkOperationResponse'] | null; // BulkOperationResponse
bulkRetryWorkflow: NexusGenRootTypes['BulkOperationResponse'] | null; // BulkOperationResponse
bulkTerminateWorkflow: NexusGenRootTypes['BulkOperationResponse'] | null; // BulkOperationResponse
closeTransaction: NexusGenRootTypes['CloseTransactionPayload']; // CloseTransactionPayload!
commitConfig: NexusGenRootTypes['CommitConfigPayload']; // CommitConfigPayload!
createLabel: NexusGenRootTypes['CreateLabelPayload']; // CreateLabelPayload!
Expand Down Expand Up @@ -1535,7 +1538,10 @@ export interface NexusGenFieldTypeNames {
addZone: 'AddZonePayload';
applySnapshot: 'ApplySnapshotPayload';
bulkPauseWorkflow: 'BulkOperationResponse';
bulkRestartWorkflow: 'BulkOperationResponse';
bulkResumeWorkflow: 'BulkOperationResponse';
bulkRetryWorkflow: 'BulkOperationResponse';
bulkTerminateWorkflow: 'BulkOperationResponse';
closeTransaction: 'CloseTransactionPayload';
commitConfig: 'CommitConfigPayload';
createLabel: 'CreateLabelPayload';
Expand Down Expand Up @@ -1796,10 +1802,22 @@ export interface NexusGenArgTypes {
// args
workflowIds: string[]; // [String!]!
};
bulkRestartWorkflow: {
// args
workflowIds: string[]; // [String!]!
};
bulkResumeWorkflow: {
// args
workflowIds: string[]; // [String!]!
};
bulkRetryWorkflow: {
// args
workflowIds: string[]; // [String!]!
};
bulkTerminateWorkflow: {
// args
workflowIds: string[]; // [String!]!
};
closeTransaction: {
// args
deviceId: string; // String!
Expand Down
50 changes: 49 additions & 1 deletion src/schema/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
} from 'nexus';
import config from '../config';
import { WorkflowDetailInput } from '../external-api/conductor-network-types';
import { toGraphId } from '../helpers/id-helper';
import { fromGraphId, toGraphId } from '../helpers/id-helper';
import { validateTasks } from '../helpers/workflow-helpers';
import getLogger from '../get-logger';
import { IsOkResponse, Node, PageInfo, PaginationConnectionArgs } from './global-types';
Expand Down Expand Up @@ -595,6 +595,54 @@ export const BulkPauseWorkflowMutation = mutationField('bulkPauseWorkflow', {
},
});

export const BulkTerminateWorkflow = mutationField('bulkTerminateWorkflow', {
type: BulkOperationResponse,
args: {
workflowIds: nonNull(list(nonNull(stringArg()))),
},
resolve: async (_, { workflowIds }, { conductorAPI }) => {
const nativeWorkflowIds = workflowIds.map((id) => fromGraphId('Workflow', id));
const data = await conductorAPI.bulkTerminateWorkflow(config.conductorApiURL, nativeWorkflowIds);

return {
bulkErrorResults: JSON.stringify(data.bulkErrorResults),
bulkSuccessfulResults: data.bulkSuccessfulResults,
};
},
});

export const BulkRetryWorkflow = mutationField('bulkRetryWorkflow', {
type: BulkOperationResponse,
args: {
workflowIds: nonNull(list(nonNull(stringArg()))),
},
resolve: async (_, { workflowIds }, { conductorAPI }) => {
const nativeWorkflowIds = workflowIds.map((id) => fromGraphId('Workflow', id));
const data = await conductorAPI.bulkRetryWorkflow(config.conductorApiURL, nativeWorkflowIds);

return {
bulkErrorResults: JSON.stringify(data.bulkErrorResults),
bulkSuccessfulResults: data.bulkSuccessfulResults,
};
},
});

export const BulkRestartWorkflow = mutationField('bulkRestartWorkflow', {
type: BulkOperationResponse,
args: {
workflowIds: nonNull(list(nonNull(stringArg()))),
},
resolve: async (_, { workflowIds }, { conductorAPI }) => {
const nativeWorkflowIds = workflowIds.map((id) => fromGraphId('Workflow', id));
const data = await conductorAPI.bulkRestartWorkflow(config.conductorApiURL, nativeWorkflowIds);

return {
bulkErrorResults: JSON.stringify(data.bulkErrorResults),
bulkSuccessfulResults: data.bulkSuccessfulResults,
};
},
});

export const RetryWorkflowMutation = mutationField('retryWorkflow', {
type: IsOkResponse,
args: {
Expand Down