Skip to content

Commit

Permalink
[Job Launcher] Cancel job (#805)
Browse files Browse the repository at this point in the history
* Added cancel job functionality

* Updated job controller

* Updated type to patch and validation params approach

* Updated job cancelation logic

* Added job cancel functionality

* Added enums webhook

* Resolved comments

* [Job Launcher] Added job id relation to the payment (#835)

* Updated payment entity and migration

* Updated job service

* Updated unit tests

* Updated unit tests
  • Loading branch information
eugenvoronov authored Aug 29, 2023
1 parent 3039dfc commit 115e88f
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 34 deletions.
12 changes: 8 additions & 4 deletions packages/apps/job-launcher/server/src/common/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ export const ConfigNames = {
JOB_LAUNCHER_FEE: 'JOB_LAUNCHER_FEE',
RECORDING_ORACLE_FEE: 'RECORDING_ORACLE_FEE',
REPUTATION_ORACLE_FEE: 'REPUTATION_ORACLE_FEE',
EXCHANGE_ORACLE_ADDRESS: 'EXCHANGE_ORACLE_ADDRESS',
EXCHANGE_ORACLE_WEBHOOK_URL: 'EXCHANGE_ORACLE_WEBHOOK_URL',
FORTUNE_EXCHANGE_ORACLE_ADDRESS: 'FORTUNE_EXCHANGE_ORACLE_ADDRESS',
CVAT_EXCHANGE_ORACLE_ADDRESS: 'CVAT_EXCHANGE_ORACLE_ADDRESS',
FORTUNE_EXCHANGE_ORACLE_WEBHOOK_URL: 'FORTUNE_EXCHANGE_ORACLE_WEBHOOK_URL',
CVAT_EXCHANGE_ORACLE_WEBHOOK_URL: 'CVAT_EXCHANGE_ORACLE_WEBHOOK_URL',
RECORDING_ORACLE_ADDRESS: 'RECORDING_ORACLE_ADDRESS',
REPUTATION_ORACLE_ADDRESS: 'REPUTATION_ORACLE_ADDRESS',
S3_ENDPOINT: 'S3_ENDPOINT',
Expand Down Expand Up @@ -72,8 +74,10 @@ export const envValidator = Joi.object({
JOB_LAUNCHER_FEE: Joi.string().default(10),
RECORDING_ORACLE_FEE: Joi.string().default(10),
REPUTATION_ORACLE_FEE: Joi.string().default(10),
EXCHANGE_ORACLE_ADDRESS: Joi.string().required(),
EXCHANGE_ORACLE_WEBHOOK_URL: Joi.string().default('http://localhost:3005'),
FORTUNE_EXCHANGE_ORACLE_ADDRESS: Joi.string().required(),
CVAT_EXCHANGE_ORACLE_ADDRESS: Joi.string().required(),
FORTUNE_EXCHANGE_ORACLE_WEBHOOK_URL: Joi.string().default('http://localhost:3004'),
CVAT_EXCHANGE_ORACLE_WEBHOOK_URL: Joi.string().default('http://localhost:3005'),
RECORDING_ORACLE_ADDRESS: Joi.string().required(),
REPUTATION_ORACLE_ADDRESS: Joi.string().required(),
// S3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export enum ErrorEscrow {
NotFound = 'Escrow not found',
NotCreated = 'Escrow has not been created',
NotLaunched = 'Escrow has not been launched',
InvalidStatusCancellation = 'Escrow has an invalid status for cancellation',
InvalidBalanceCancellation = 'Escrow has an invalid balance for cancellation'
}

/**
Expand Down
8 changes: 5 additions & 3 deletions packages/apps/job-launcher/server/src/common/enums/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ export enum JobStatus {
PENDING = 'PENDING',
PAID = 'PAID',
LAUNCHED = 'LAUNCHED',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED',
TO_CANCEL = 'TO_CANCEL',
CANCELED = 'CANCELED',
}

export enum JobStatusFilter {
PENDING = 'PENDING',
PAID = 'PAID',
LAUNCHED = 'LAUNCHED',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED',
CANCELLED = 'CANCELLED',
TO_CANCEL = 'TO_CANCEL',
CANCELED = 'CANCELED',
}

export enum JobRequestType {
Expand Down
4 changes: 4 additions & 0 deletions packages/apps/job-launcher/server/src/common/enums/webhook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export enum EventType {
ESCROW_CREATED = 'escrow_created',
TASK_CREATION_FAILED = 'task_creation_failed',
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import { ConfigNames } from '../common/config';
keepConnectionAlive:
configService.get<string>(ConfigNames.NODE_ENV) === 'test',
migrationsRun: false,
ssl: configService.get<boolean>(ConfigNames.POSTGRES_SSL, false),
ssl: configService.get<string>(ConfigNames.POSTGRES_SSL)!.toLowerCase() === 'true',
};
},
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export class InitialMigration1691485394906 implements MigrationInterface {
"source" "hmt"."payments_source_enum" NOT NULL,
"status" "hmt"."payments_status_enum" NOT NULL,
"user_id" integer NOT NULL,
"job_id" integer,
CONSTRAINT "REL_f83af8ea8055b85bde0e095e40" UNIQUE ("job_id"),
CONSTRAINT "PK_197ab7af18c93fbb0c9b28b4a59" PRIMARY KEY ("id")
)
`);
Expand All @@ -51,8 +53,9 @@ export class InitialMigration1691485394906 implements MigrationInterface {
'PENDING',
'PAID',
'LAUNCHED',
'COMPLETED',
'FAILED'
'FAILED',
'TO_CANCEL',
'CANCELED'
)
`);
await queryRunner.query(`
Expand Down Expand Up @@ -131,6 +134,10 @@ export class InitialMigration1691485394906 implements MigrationInterface {
ALTER TABLE "hmt"."jobs"
ADD CONSTRAINT "FK_9027c8f0ba75fbc1ac46647d043" FOREIGN KEY ("user_id") REFERENCES "hmt"."users"("id") ON DELETE NO ACTION ON UPDATE NO ACTION
`);
await queryRunner.query(`
ALTER TABLE "hmt"."payments"
ADD CONSTRAINT "FK_f83af8ea8055b85bde0e095e400" FOREIGN KEY ("job_id") REFERENCES "hmt"."jobs"("id") ON DELETE NO ACTION ON UPDATE NO ACTION
`);
await queryRunner.query(`
ALTER TABLE "hmt"."tokens"
ADD CONSTRAINT "FK_8769073e38c365f315426554ca5" FOREIGN KEY ("user_id") REFERENCES "hmt"."users"("id") ON DELETE NO ACTION ON UPDATE NO ACTION
Expand All @@ -151,6 +158,9 @@ export class InitialMigration1691485394906 implements MigrationInterface {
await queryRunner.query(`
ALTER TABLE "hmt"."jobs" DROP CONSTRAINT "FK_9027c8f0ba75fbc1ac46647d043"
`);
await queryRunner.query(`
ALTER TABLE "hmt"."payments" DROP CONSTRAINT "FK_f83af8ea8055b85bde0e095e400"
`);
await queryRunner.query(`
ALTER TABLE "hmt"."payments" DROP CONSTRAINT "FK_427785468fb7d2733f59e7d7d39"
`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import {
Controller,
DefaultValuePipe,
Get,
Param,
Patch,
Post,
Query,
Request,
Expand All @@ -11,7 +13,7 @@ import {
import { ApiBearerAuth, ApiQuery, ApiTags } from '@nestjs/swagger';
import { JwtAuthGuard } from '../../common/guards';
import { RequestWithUser } from '../../common/types';
import { JobFortuneDto, JobCvatDto, JobListDto } from './job.dto';
import { JobFortuneDto, JobCvatDto, JobListDto, JobCancelDto } from './job.dto';
import { JobService } from './job.service';
import { JobRequestType, JobStatusFilter } from '../../common/enums/job';
import { Public } from '../../common/decorators';
Expand Down Expand Up @@ -65,4 +67,12 @@ export class JobController {
public async launchCronJob(): Promise<any> {
return this.jobService.launchCronJob();
}

@Patch('/cancel/:id')
public async cancelJob(
@Request() req: RequestWithUser,
@Param() params: JobCancelDto,
): Promise<boolean> {
return this.jobService.requestToCancelJob(req.user.id, params.id);
}
}
67 changes: 67 additions & 0 deletions packages/apps/job-launcher/server/src/modules/job/job.cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { LessThanOrEqual, Repository } from 'typeorm';
import { SortDirection } from '../../common/enums/collection';
import { JOB_RETRIES_COUNT_THRESHOLD } from '../../common/constants';
import { JobStatus } from '../../common/enums/job';
import { JobEntity } from './job.entity';
import { JobService } from './job.service';

@Injectable()
export class JobCron {
private readonly logger = new Logger(JobCron.name);

constructor(
private readonly jobService: JobService,
@InjectRepository(JobEntity)
private readonly jobEntityRepository: Repository<JobEntity>,
) {}

@Cron(CronExpression.EVERY_10_SECONDS)
public async launchJob() {
try {
// TODO: Add retry policy and process failure requests https://github.com/humanprotocol/human-protocol/issues/334
const jobEntity = await this.jobEntityRepository.findOne({
where: {
status: JobStatus.PAID,
retriesCount: LessThanOrEqual(JOB_RETRIES_COUNT_THRESHOLD),
waitUntil: LessThanOrEqual(new Date()),
},
order: {
waitUntil: SortDirection.ASC,
},
});

if (!jobEntity) return;

await this.jobService.launchJob(jobEntity);
} catch (e) {
this.logger.error(e);
return;
}
}

@Cron(CronExpression.EVERY_10_SECONDS)
public async cancelJob() {
try {
const jobEntity = await this.jobEntityRepository.findOne({
where: {
status: JobStatus.TO_CANCEL,
retriesCount: LessThanOrEqual(JOB_RETRIES_COUNT_THRESHOLD),
waitUntil: LessThanOrEqual(new Date()),
},
order: {
waitUntil: SortDirection.ASC,
},
});

if (!jobEntity) return;

await this.jobService.cancelJob(jobEntity);
} catch (e) {
this.logger.error(e);
return;
}
}
}
9 changes: 9 additions & 0 deletions packages/apps/job-launcher/server/src/modules/job/job.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import {
IsDate,
IsOptional,
IsObject,
IsNumberString
} from 'class-validator';
import { ChainId } from '@human-protocol/sdk';
import {
JobRequestType,
JobStatus,
JobStatusFilter,
} from '../../common/enums/job';
import { EventType } from '../../common/enums/webhook';

export class JobCreateDto {
public chainId: ChainId;
Expand Down Expand Up @@ -82,6 +84,12 @@ export class JobCvatDto extends JobDto {
public jobBounty: string;
}

export class JobCancelDto {
@ApiProperty()
@IsNumberString()
public id: number;
}

export class JobUpdateDto {
@ApiPropertyOptional({
enum: JobStatus,
Expand All @@ -106,6 +114,7 @@ export class SaveManifestDto {
export class SendWebhookDto {
public escrowAddress: string;
public chainId: number;
public eventType: EventType;
}

export class FortuneManifestDto {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Column, Entity, Index, ManyToOne } from 'typeorm';
import { Column, Entity, Index, ManyToOne, OneToOne } from 'typeorm';

import { NS } from '../../common/constants';
import { IJob } from '../../common/interfaces';
import { JobStatus } from '../../common/enums/job';
import { BaseEntity } from '../../database/base.entity';
import { UserEntity } from '../user/user.entity';
import { PaymentEntity } from '../payment/payment.entity';

@Entity({ schema: NS, name: 'jobs' })
@Index(['chainId', 'escrowAddress'], { unique: true })
Expand Down Expand Up @@ -39,6 +40,9 @@ export class JobEntity extends BaseEntity implements IJob {
@Column({ type: 'int' })
public userId: number;

@OneToOne(() => PaymentEntity, (payment) => payment.job)
public payment: PaymentEntity;

@Column({ type: 'int', default: 0 })
public retriesCount: number;

Expand Down
Loading

1 comment on commit 115e88f

@vercel
Copy link

@vercel vercel bot commented on 115e88f Aug 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

job-launcher-server – ./packages/apps/job-launcher/server

job-launcher-server-nine.vercel.app
job-launcher-server-humanprotocol.vercel.app
job-launcher-server-git-develop-humanprotocol.vercel.app

Please sign in to comment.