Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deployment concurrency limits/configurable collision strategy #2745

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/components/DeploymentForm.vue
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
<p-number-input v-model="concurrencyLimit" :state="concurrencyLimitState" placeholder="Unlimited" />
</p-label>

<p-label label="Concurrency Limit Collision Strategy (Optional)" description="Configure behavior for runs once the concurrency limit is reached.">
<p-select v-model="concurrencyLimitCollisionStrategy" :options="deploymentCollisionStrategies" empty-message="ENQUEUE" />
</p-label>

<p-divider />

<template v-if="schemaHasParameters">
Expand Down Expand Up @@ -84,7 +88,7 @@
import { JobVariableOverridesInput, WorkPoolCombobox, WorkPoolQueueCombobox } from '@/components'
import ToastParameterValidationError from '@/components/ToastParameterValidationError.vue'
import { localization } from '@/localization'
import { Deployment, DeploymentUpdateV2 } from '@/models'
import { Deployment, deploymentCollisionStrategies, DeploymentUpdateV2 } from '@/models'
import { DeploymentCreate } from '@/models/DeploymentCreate'
import { SchemaInputV2 } from '@/schemas'
import { useSchemaValidation } from '@/schemas/compositions/useSchemaValidation'
Expand All @@ -104,6 +108,7 @@
const workPoolName = ref(props.deployment.workPoolName)
const workQueueName = ref(props.deployment.workQueueName)
const concurrencyLimit = ref(props.deployment.concurrencyLimit)
const concurrencyLimitCollisionStrategy = ref(props.deployment.concurrencyOptions?.collisionStrategy)
const parameters = ref(props.deployment.parameters)
const tags = ref(props.deployment.tags)
const jobVariables = ref(stringify(props.deployment.jobVariables))
Expand Down Expand Up @@ -186,6 +191,7 @@
infrastructureDocumentId: props.deployment.infrastructureDocumentId,
pullSteps: props.deployment.pullSteps,
concurrencyLimit: concurrencyLimit.value,
concurrencyOptions: concurrencyLimitCollisionStrategy.value ? { collisionStrategy: concurrencyLimitCollisionStrategy.value } : null,
}
emit('submit', deploymentCreate)
} else {
Expand All @@ -198,11 +204,10 @@
enforceParameterSchema: enforceParameterSchema.value,
jobVariables: JSON.parse(jobVariables.value),
concurrencyLimit: concurrencyLimit.value,
concurrencyOptions: concurrencyLimitCollisionStrategy.value ? { collisionStrategy: concurrencyLimitCollisionStrategy.value } : null,
}
emit('submit', deploymentUpdate)
}


}

const cancel = (): void => {
Expand Down
20 changes: 18 additions & 2 deletions src/maps/deployment.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { sortStringArray } from '@prefecthq/prefect-design'
import { DeploymentFlowRunCreate, DeploymentFlowRunRequest, DeploymentUpdateRequest, DeploymentUpdateV2 } from '@/models'
import { DeploymentApiConcurrencyOptions } from '@/models/api/DeploymentApiConcurrencyOptions'
import { DeploymentCreateRequest } from '@/models/api/DeploymentCreateRequest'
import { DeploymentResponse } from '@/models/api/DeploymentResponse'
import { Deployment } from '@/models/Deployment'
import { Deployment, DeploymentConcurrencyOptions } from '@/models/Deployment'
import { DeploymentCreate } from '@/models/DeploymentCreate'
import { createObjectLevelCan } from '@/models/ObjectLevelCan'
import { schemaV2Mapper } from '@/schemas'
Expand Down Expand Up @@ -37,10 +38,17 @@ export const mapDeploymentResponseToDeployment: MapFunction<DeploymentResponse,
can: createObjectLevelCan(),
status: this.map('ServerDeploymentStatus', source.status, 'DeploymentStatus'),
disabled: source.disabled ?? false,
concurrencyLimit: source.concurrency_limit ?? source.global_concurrency_limit?.limit ?? null,
globalConcurrencyLimit: this.map('ConcurrencyV2LimitResponse', source.global_concurrency_limit, 'ConcurrencyV2Limit'),
concurrencyOptions: source.concurrency_options == null ? null : mapDeploymentApiConcurrencyOptionsToDeploymentConcurrencyOptions(source.concurrency_options),
})
}

function mapDeploymentApiConcurrencyOptionsToDeploymentConcurrencyOptions(source: DeploymentApiConcurrencyOptions): DeploymentConcurrencyOptions {
return {
collisionStrategy: source.collision_strategy,
}
}

export const mapDeploymentUpdateV2ToDeploymentUpdateRequest: MapFunction<DeploymentUpdateV2, DeploymentUpdateRequest> = function(source) {
return {
description: source.description,
Expand All @@ -52,6 +60,7 @@ export const mapDeploymentUpdateV2ToDeploymentUpdateRequest: MapFunction<Deploym
job_variables: source.jobVariables,
enforce_parameter_schema: source.enforceParameterSchema,
concurrency_limit: source.concurrencyLimit,
concurrency_options: source.concurrencyOptions == null ? null : mapDeploymentConcurrencyOptionsToDeploymentApiConcurrencyOptions(source.concurrencyOptions),
}
}

Expand Down Expand Up @@ -93,5 +102,12 @@ export const mapDeploymentCreateToDeploymentCreateRequest: MapFunction<Deploymen
version: source.version,
paused: source.paused,
concurrency_limit: source.concurrencyLimit,
concurrency_options: source.concurrencyOptions == null ? null : mapDeploymentConcurrencyOptionsToDeploymentApiConcurrencyOptions(source.concurrencyOptions),
}
}

function mapDeploymentConcurrencyOptionsToDeploymentApiConcurrencyOptions(source: DeploymentConcurrencyOptions): DeploymentApiConcurrencyOptions {
return {
collision_strategy: source.collisionStrategy,
}
}
2 changes: 2 additions & 0 deletions src/mocks/deployment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export const randomDeployment: MockFunction<Deployment, [Partial<Deployment>?]>
status: this.create('deploymentStatus'),
disabled: disabled,
concurrencyLimit: null,
globalConcurrencyLimit: null,
concurrencyOptions: null,
...overrides,
}
}
22 changes: 19 additions & 3 deletions src/models/Deployment.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import { ConcurrencyV2Limit } from '@/models/ConcurrencyV2Limit'
import { CreatedOrUpdatedBy } from '@/models/CreatedOrUpdatedBy'
import { DeploymentSchedule } from '@/models/DeploymentSchedule'
import { DeploymentStatus } from '@/models/DeploymentStatus'
import { ObjectLevelCan } from '@/models/ObjectLevelCan'
import { SchemaV2, SchemaValuesV2 } from '@/schemas'
import { createTuple } from '@/utilities'

export const { values: deploymentCollisionStrategies, isValue: isDeploymentCollisionStrategy } = createTuple(['ENQUEUE', 'CANCEL_NEW'])
export type DeploymentCollisionStrategy = typeof deploymentCollisionStrategies[number]

export type DeploymentConcurrencyOptions = {
collisionStrategy: DeploymentCollisionStrategy,
}

export interface IDeployment {
id: string,
Expand Down Expand Up @@ -32,7 +41,8 @@ export interface IDeployment {
can: ObjectLevelCan<'deployment'>,
status: DeploymentStatus,
disabled: boolean,
concurrencyLimit: number | null,
globalConcurrencyLimit: ConcurrencyV2Limit | null,
concurrencyOptions: DeploymentConcurrencyOptions | null,
}

export class Deployment implements IDeployment {
Expand Down Expand Up @@ -64,7 +74,8 @@ export class Deployment implements IDeployment {
public can: ObjectLevelCan<'deployment'>
public status: DeploymentStatus
public disabled: boolean
public concurrencyLimit: number | null
public globalConcurrencyLimit: ConcurrencyV2Limit | null
public concurrencyOptions: DeploymentConcurrencyOptions | null

public constructor(deployment: IDeployment) {
this.id = deployment.id
Expand Down Expand Up @@ -94,7 +105,12 @@ export class Deployment implements IDeployment {
this.can = deployment.can
this.status = deployment.status
this.disabled = deployment.disabled
this.concurrencyLimit = deployment.concurrencyLimit
this.globalConcurrencyLimit = deployment.globalConcurrencyLimit
this.concurrencyOptions = deployment.concurrencyOptions
}

public get concurrencyLimit(): number | null {
return this.globalConcurrencyLimit?.limit ?? null
}

public get deprecated(): boolean {
Expand Down
2 changes: 2 additions & 0 deletions src/models/DeploymentCreate.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { DeploymentConcurrencyOptions } from '@/models/Deployment'
import { DeploymentSchedule } from '@/models/DeploymentSchedule'
import { SchemaValuesV2 } from '@/schemas'

Expand All @@ -22,4 +23,5 @@ export type DeploymentCreate = {
enforceParameterSchema: boolean,
pullSteps: unknown,
concurrencyLimit: number | null,
concurrencyOptions: DeploymentConcurrencyOptions | null,
}
2 changes: 2 additions & 0 deletions src/models/DeploymentUpdate.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { DeploymentConcurrencyOptions } from '@/models/Deployment'
import { SchemaValuesV2 } from '@/schemas'

type Base = {
Expand All @@ -9,6 +10,7 @@ type Base = {
jobVariables?: Record<string, unknown> | null,
enforceParameterSchema?: boolean,
concurrencyLimit?: number | null,
concurrencyOptions?: DeploymentConcurrencyOptions | null,
}

type WithoutParameters = Base & {
Expand Down
3 changes: 3 additions & 0 deletions src/models/api/DeploymentApiConcurrencyOptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export type DeploymentApiConcurrencyOptions = {
collision_strategy: 'ENQUEUE' | 'CANCEL_NEW',
}
2 changes: 2 additions & 0 deletions src/models/api/DeploymentCreateRequest.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { DeploymentApiConcurrencyOptions } from '@/models/api/DeploymentApiConcurrencyOptions'
import { SchemaValues } from '@/schemas/types/schemaValues'

export type DeploymentCreateRequest = {
Expand All @@ -21,4 +22,5 @@ export type DeploymentCreateRequest = {
version: string | null,
paused: boolean,
concurrency_limit: number | null,
concurrency_options: DeploymentApiConcurrencyOptions | null,
}
3 changes: 3 additions & 0 deletions src/models/api/DeploymentResponse.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ConcurrencyV2Response } from '@/models/api/ConcurrencyV2Response'
import { CreatedOrUpdatedByResponse } from '@/models/api/CreatedOrUpdatedByResponse'
import { DeploymentApiConcurrencyOptions } from '@/models/api/DeploymentApiConcurrencyOptions'
import { DeploymentScheduleResponse } from '@/models/api/DeploymentScheduleResponse'
import { ScheduleResponse } from '@/models/api/ScheduleResponse'
import { ServerDeploymentStatus } from '@/models/DeploymentStatus'
Expand Down Expand Up @@ -36,6 +37,8 @@ export type DeploymentResponse = {
pull_steps: unknown,
status: ServerDeploymentStatus,
disabled?: boolean,
/** @deprecated Prefer `global_concurrency_limit */
concurrency_limit: number | null,
global_concurrency_limit: ConcurrencyV2Response | null,
concurrency_options: DeploymentApiConcurrencyOptions | null,
}
2 changes: 2 additions & 0 deletions src/models/api/DeploymentUpdateRequest.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ScheduleResponse } from '@/models'
import { DeploymentApiConcurrencyOptions } from '@/models/api/DeploymentApiConcurrencyOptions'
import { SchemaValues } from '@/types/schemas'

export type DeploymentUpdateRequest = Partial<{
Expand All @@ -16,4 +17,5 @@ export type DeploymentUpdateRequest = Partial<{
job_variables: Record<string, unknown> | null,
enforce_parameter_schema: boolean,
concurrency_limit: number | null,
concurrency_options: DeploymentApiConcurrencyOptions | null,
}>
Loading