Skip to content

Commit

Permalink
fix(api-headless-cms-import-export): check for real status of child t…
Browse files Browse the repository at this point in the history
…asks [skip ci] (#4294)
  • Loading branch information
brunozoric authored Sep 26, 2024
1 parent 8f43ff3 commit af4195f
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ export const makeSureModelsAreIdentical = (params: IMakeSureModelsAreIdenticalPa
message: `Field "${value.field.fieldId}" not found in the model provided via the JSON data.`,
code: "MODEL_FIELD_NOT_FOUND",
data: {
...value
field: value,
targetValues,
modelValues
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import type { ITaskResponseResult, ITaskRunParams } from "@webiny/tasks";
import type {
import {
IImportFromUrlController,
IImportFromUrlControllerInput,
IImportFromUrlControllerInputStepsStep,
IImportFromUrlControllerOutput
} from "~/tasks/domain/abstractions/ImportFromUrlController";
import { IImportFromUrlControllerInputStep } from "~/tasks/domain/abstractions/ImportFromUrlController";
Expand All @@ -10,7 +11,7 @@ import { ImportFromUrlControllerDownloadStep } from "~/tasks/domain/importFromUr
import { ImportFromUrlControllerProcessEntriesStep } from "./importFromUrlControllerSteps/ImportFromUrlControllerProcessEntriesStep";
import { ImportFromUrlControllerProcessAssetsStep } from "./importFromUrlControllerSteps/ImportFromUrlControllerProcessAssetsStep";

const getDefaultStepValues = () => {
const getDefaultStepValues = (): IImportFromUrlControllerInputStepsStep => {
return {
files: [],
triggered: false,
Expand Down Expand Up @@ -56,7 +57,7 @@ export class ImportFromUrlController<

const downloadStep =
steps[IImportFromUrlControllerInputStep.DOWNLOAD] || getDefaultStepValues();
if (!downloadStep.done) {
if (!downloadStep.finished) {
const step = new ImportFromUrlControllerDownloadStep<C, I, O>();
return await step.execute(params);
} else if (downloadStep.failed.length) {
Expand All @@ -69,7 +70,7 @@ export class ImportFromUrlController<

const processEntriesStep =
steps[IImportFromUrlControllerInputStep.PROCESS_ENTRIES] || getDefaultStepValues();
if (!processEntriesStep.done) {
if (!processEntriesStep.finished) {
const step = new ImportFromUrlControllerProcessEntriesStep<C, I, O>();
return await step.execute(params);
} else if (processEntriesStep.failed.length) {
Expand All @@ -82,7 +83,7 @@ export class ImportFromUrlController<

const processAssetsStep =
steps[IImportFromUrlControllerInputStep.PROCESS_ASSETS] || getDefaultStepValues();
if (!processAssetsStep.done) {
if (!processAssetsStep.finished) {
const step = new ImportFromUrlControllerProcessAssetsStep<C, I, O>();
return await step.execute(params);
} else if (processAssetsStep.failed.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,20 @@ export enum IImportFromUrlControllerInputStep {
PROCESS_ASSETS = "processAssets"
}

export interface IImportFromUrlControllerInputStepsStep {
files: string[];
triggered: boolean;
finished: boolean;
done: string[];
failed: string[];
invalid: string[];
aborted: string[];
}

export interface IImportFromUrlControllerInputSteps {
[IImportFromUrlControllerInputStep.DOWNLOAD]?: {
files: string[];
triggered: boolean;
finished: boolean;
done: string[];
failed: string[];
invalid: string[];
aborted: string[];
};
[IImportFromUrlControllerInputStep.PROCESS_ENTRIES]?: {
files: string[];
triggered: boolean;
finished: boolean;
done: string[];
failed: string[];
invalid: string[];
aborted: string[];
};
[IImportFromUrlControllerInputStep.PROCESS_ASSETS]?: {
files: string[];
triggered: boolean;
finished: boolean;
done: string[];
failed: string[];
invalid: string[];
aborted: string[];
};
[IImportFromUrlControllerInputStep.DOWNLOAD]?: IImportFromUrlControllerInputStepsStep;
[IImportFromUrlControllerInputStep.PROCESS_ENTRIES]?: IImportFromUrlControllerInputStepsStep;
[IImportFromUrlControllerInputStep.PROCESS_ASSETS]?: IImportFromUrlControllerInputStepsStep;
}

export interface IImportFromUrlControllerInput {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,39 @@
import type { ITask, ITaskResponseDoneResultOutput } from "@webiny/tasks";
import { TaskDataStatus } from "@webiny/tasks";
import type { Context } from "~/types";
import { IStepFunctionServiceFetchResult } from "@webiny/tasks/service/StepFunctionServicePlugin";

export interface IGetChildTasksParams {
context: Context;
task: ITask;
definition: string;
}

const mapServiceStatusToTaskStatus = (
task: ITask<any, any>,
serviceInfo: IStepFunctionServiceFetchResult | null
) => {
if (!serviceInfo) {
console.log(`Service info is missing for task ${task.id} (${task.definitionId}).`);
return null;
}
if (serviceInfo.status === "RUNNING") {
return TaskDataStatus.RUNNING;
} else if (serviceInfo.status === "SUCCEEDED") {
return TaskDataStatus.SUCCESS;
} else if (serviceInfo.status === "FAILED") {
return TaskDataStatus.FAILED;
} else if (serviceInfo.status === "ABORTED") {
return TaskDataStatus.ABORTED;
} else if (serviceInfo.status === "TIMED_OUT" || serviceInfo.status === "PENDING_REDRIVE") {
console.log(
`Service status is ${serviceInfo.status} for task ${task.id} (${task.definitionId}).`
);
return null;
}
return TaskDataStatus.PENDING;
};

export const getChildTasks = async <I, O extends ITaskResponseDoneResultOutput>({
context,
task,
Expand All @@ -24,14 +50,33 @@ export const getChildTasks = async <I, O extends ITaskResponseDoneResultOutput>(
where: {
parentId: task.id,
definitionId: definition
}
},
limit: 100000
});
for (const task of items) {
collection.push(task);

if (
task.taskStatus === TaskDataStatus.RUNNING ||
task.taskStatus === TaskDataStatus.PENDING
) {
/**
* We also need to check the actual status of the service.
* It can happen that the task is marked as running, but the service is not running.
*/
const serviceInfo = await context.tasks.fetchServiceInfo(task);
const status = mapServiceStatusToTaskStatus(task, serviceInfo);

if (status === null || !serviceInfo) {
invalid.push(task.id);
continue;
} else if (status !== task.taskStatus) {
console.error(
`Status of the task is not same as the status of the service (task: ${task.taskStatus}, service: ${status} / ${serviceInfo.status}).`
);
invalid.push(task.id);
continue;
}
running.push(task.id);
continue;
} else if (task.taskStatus === TaskDataStatus.SUCCESS) {
Expand Down
4 changes: 2 additions & 2 deletions packages/tasks/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { createDefinitionCrud } from "./crud/definition.tasks";
import { createServiceCrud } from "~/crud/service.tasks";
import { createTaskCrud } from "./crud/crud.tasks";
import { createTestingRunTask } from "~/tasks/testingRunTask";
import { createTransportPlugins } from "~/crud/transport";
import { createServicePlugins } from "~/service";

const createTasksCrud = () => {
const plugin = new ContextPlugin<Context>(async context => {
Expand All @@ -23,7 +23,7 @@ const createTasksCrud = () => {
};

const createTasksContext = (): Plugin[] => {
return [...createTransportPlugins(), ...createTaskModel(), createTasksCrud()];
return [...createServicePlugins(), ...createTaskModel(), createTasksCrud()];
};

export const createBackgroundTaskContext = (): Plugin[] => {
Expand Down
9 changes: 6 additions & 3 deletions packages/tasks/src/crud/service.tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import type {
} from "~/types";
import { TaskDataStatus, TaskLogItemType } from "~/types";
import { NotFoundError } from "@webiny/handler-graphql";
import { createService } from "~/service/createService";
import { createService } from "~/service";
import { IStepFunctionServiceFetchResult } from "~/service/StepFunctionServicePlugin";

const MAX_DELAY_DAYS = 355;
const MAX_DELAY_SECONDS = MAX_DELAY_DAYS * 24 * 60 * 60;
Expand Down Expand Up @@ -101,7 +102,9 @@ export const createServiceCrud = (context: Context): ITasksContextServiceObject
eventResponse: result
});
},
fetchServiceInfo: async (input: ITask | string) => {
fetchServiceInfo: async (
input: ITask | string
): Promise<IStepFunctionServiceFetchResult | null> => {
const task = typeof input === "object" ? input : await context.tasks.getTask(input);
if (!task && typeof input === "string") {
throw new NotFoundError(`Task "${input}" was not found!`);
Expand All @@ -112,7 +115,7 @@ export const createServiceCrud = (context: Context): ITasksContextServiceObject
}

try {
return await service.fetch(task);
return (await service.fetch(task)) as IStepFunctionServiceFetchResult | null;
} catch (ex) {
console.log("Service fetch error.");
console.error(ex);
Expand Down
11 changes: 5 additions & 6 deletions packages/tasks/src/plugins/TaskServicePlugin.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Plugin } from "@webiny/plugins";
import { Context, ITask } from "~/types";
import { GenericRecord } from "@webiny/api/types";

export interface ITaskServiceCreatePluginParams {
context: Context;
Expand All @@ -10,16 +9,16 @@ export interface ITaskServiceCreatePluginParams {

export type ITaskServiceTask = Pick<ITask, "id" | "definitionId">;

export interface ITaskService<T = GenericRecord> {
send(task: ITaskServiceTask, delay: number): Promise<T>;
fetch<R>(task: ITask): Promise<R | null>;
export interface ITaskService {
send(task: ITaskServiceTask, delay: number): Promise<unknown | null>;
fetch(task: ITask): Promise<unknown | null>;
}

export interface ITaskServicePluginParams {
default?: boolean;
}

export abstract class TaskServicePlugin<T = GenericRecord> extends Plugin {
export abstract class TaskServicePlugin extends Plugin {
public static override readonly type: string = "tasks.taskService";
public readonly default: boolean;

Expand All @@ -28,5 +27,5 @@ export abstract class TaskServicePlugin<T = GenericRecord> extends Plugin {
this.default = !!params?.default;
}

public abstract createService(params: ITaskServiceCreatePluginParams): ITaskService<T>;
public abstract createService(params: ITaskServiceCreatePluginParams): ITaskService;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { EventBridgeClient, PutEventsCommand } from "@webiny/aws-sdk/client-even
import { WebinyError } from "@webiny/error";
import { GenericRecord } from "@webiny/api/types";

class EventBridgeService implements ITaskService<PutEventsCommandOutput> {
class EventBridgeService implements ITaskService {
protected readonly context: Context;
protected readonly getTenant: () => string;
protected readonly getLocale: () => string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import {
ITaskServiceTask,
TaskServicePlugin
} from "~/plugins";
import { GenericRecord } from "@webiny/api/types";
import {
createStepFunctionClient,
DescribeExecutionCommandOutput,
describeExecutionFactory,
triggerStepFunctionFactory
} from "@webiny/aws-sdk/client-sfn";
Expand All @@ -15,11 +15,13 @@ import { generateAlphaNumericId } from "@webiny/utils";
import { ServiceDiscovery } from "@webiny/api";
import { ITask } from "~/types";

export type IStepFunctionServiceFetchResult = DescribeExecutionCommandOutput;

export interface IDetailWrapper<T> {
detail: T;
}

class StepFunctionService implements ITaskService<GenericRecord | null> {
class StepFunctionService implements ITaskService {
private readonly getTenant: () => string;
private readonly getLocale: () => string;
private readonly trigger: ReturnType<typeof triggerStepFunctionFactory>;
Expand Down Expand Up @@ -71,7 +73,7 @@ class StepFunctionService implements ITaskService<GenericRecord | null> {
}
}

public async fetch<R>(task: ITask): Promise<R | null> {
public async fetch(task: ITask): Promise<IStepFunctionServiceFetchResult | null> {
const executionArn = task.eventResponse?.executionArn;
if (!executionArn) {
console.error(`Execution ARN not found in task "${task.id}".`);
Expand All @@ -81,7 +83,7 @@ class StepFunctionService implements ITaskService<GenericRecord | null> {
const result = await this.get({
executionArn
});
return (result || null) as R;
return result || null;
} catch (ex) {
console.log("Could not get the execution details.");
console.error(ex);
Expand All @@ -90,7 +92,7 @@ class StepFunctionService implements ITaskService<GenericRecord | null> {
}
}

export class StepFunctionServicePlugin extends TaskServicePlugin<GenericRecord | null> {
export class StepFunctionServicePlugin extends TaskServicePlugin {
public override name = "task.stepFunctionTriggerTransport";

public createService(params: ITaskServiceCreatePluginParams) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { EventBridgeEventTransportPlugin } from "./EventBridgeEventTransportPlugin";
import { StepFunctionServicePlugin } from "./StepFunctionServicePlugin";

export const createTransportPlugins = () => {
export const createServicePlugins = () => {
return [
new StepFunctionServicePlugin({ default: true }),
new EventBridgeEventTransportPlugin()
];
};

export * from "./createService";
5 changes: 4 additions & 1 deletion packages/tasks/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
import { IIsCloseToTimeoutCallable, ITaskManagerStore } from "./runner/abstractions";
import { SecurityPermission } from "@webiny/api-security/types";
import { GenericRecord } from "@webiny/api/types";
import { IStepFunctionServiceFetchResult } from "~/service/StepFunctionServicePlugin";

export * from "./handler/types";
export * from "./response/abstractions";
Expand Down Expand Up @@ -303,7 +304,9 @@ export interface ITasksContextServiceObject {
>(
params: ITaskAbortParams
) => Promise<ITask<T, O>>;
fetchServiceInfo: (input: ITask<any, any> | string) => Promise<GenericRecord | null>;
fetchServiceInfo: (
input: ITask<any, any> | string
) => Promise<IStepFunctionServiceFetchResult | null>;
}

export interface ITasksContextObject
Expand Down

0 comments on commit af4195f

Please sign in to comment.