diff --git a/docs/3.0/api-ref/rest-api/server/schema.json b/docs/3.0/api-ref/rest-api/server/schema.json index 807d0283b011..5203926be3be 100644 --- a/docs/3.0/api-ref/rest-api/server/schema.json +++ b/docs/3.0/api-ref/rest-api/server/schema.json @@ -15449,7 +15449,8 @@ "type": "null" } ], - "description": "Filter criteria for `Deployment.concurrency`" + "description": "DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`. If provided, will be ignored for backwards-compatibility. Will be removed after December 2024.", + "deprecated": true } }, "additionalProperties": false, @@ -15499,7 +15500,7 @@ "additionalProperties": false, "type": "object", "title": "DeploymentFilterConcurrencyLimit", - "description": "Filter by `Deployment.concurrency_limit`." + "description": "DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`." }, "DeploymentFilterId": { "properties": { @@ -15933,7 +15934,19 @@ } ], "title": "Concurrency Limit", - "description": "The maximum number of flow runs that can be active at once." + "description": "DEPRECATED: Prefer `global_concurrency_limit`. Will always be None for backwards compatibility. Will be removed after December 2024.", + "deprecated": true + }, + "global_concurrency_limit": { + "anyOf": [ + { + "$ref": "#/components/schemas/GlobalConcurrencyLimitResponse" + }, + { + "type": "null" + } + ], + "description": "The global concurrency limit object for enforcing the maximum number of flow runs that can be active at once." }, "concurrency_options": { "anyOf": [ diff --git a/src/prefect/client/schemas/filters.py b/src/prefect/client/schemas/filters.py index b6cbaac85d1e..444b402ad0fc 100644 --- a/src/prefect/client/schemas/filters.py +++ b/src/prefect/client/schemas/filters.py @@ -506,7 +506,7 @@ class DeploymentFilterTags(PrefectBaseModel, OperatorMixin): class DeploymentFilterConcurrencyLimit(PrefectBaseModel): - """Filter by `Deployment.concurrency_limit`.""" + """DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`.""" ge_: Optional[int] = Field( default=None, @@ -538,7 +538,9 @@ class DeploymentFilter(PrefectBaseModel, OperatorMixin): default=None, description="Filter criteria for `Deployment.work_queue_name`" ) concurrency_limit: Optional[DeploymentFilterConcurrencyLimit] = Field( - default=None, description="Filter criteria for `Deployment.concurrency_limit`" + default=None, + description="DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`. If provided, will be ignored for backwards-compatibility. Will be removed after December 2024.", + deprecated=True, ) diff --git a/src/prefect/client/schemas/responses.py b/src/prefect/client/schemas/responses.py index 91eeaa973e1f..08e7b4f82115 100644 --- a/src/prefect/client/schemas/responses.py +++ b/src/prefect/client/schemas/responses.py @@ -314,7 +314,13 @@ class DeploymentResponse(ObjectBaseModel): default=..., description="The flow id associated with the deployment." ) concurrency_limit: Optional[int] = Field( - default=None, description="The concurrency limit for the deployment." + default=None, + description="DEPRECATED: Prefer `global_concurrency_limit`. Will always be None for backwards compatibility. Will be removed after December 2024.", + deprecated=True, + ) + global_concurrency_limit: Optional["GlobalConcurrencyLimitResponse"] = Field( + default=None, + description="The global concurrency limit object for enforcing the maximum number of flow runs that can be active at once.", ) concurrency_options: Optional[objects.ConcurrencyOptions] = Field( default=None, diff --git a/src/prefect/runner/runner.py b/src/prefect/runner/runner.py index cac85d54aa16..f1b28d0d9830 100644 --- a/src/prefect/runner/runner.py +++ b/src/prefect/runner/runner.py @@ -1049,15 +1049,9 @@ async def _submit_run_and_capture_errors( if flow_run.deployment_id: deployment = await self._client.read_deployment(flow_run.deployment_id) - if deployment and deployment.concurrency_limit: - limit_name = f"deployment:{deployment.id}" + if deployment and deployment.global_concurrency_limit: + limit_name = deployment.global_concurrency_limit.name concurrency_ctx = concurrency - - # ensure that the global concurrency limit is available - # and up-to-date before attempting to acquire a slot - await self._client.upsert_global_concurrency_limit_by_name( - limit_name, deployment.concurrency_limit - ) else: limit_name = "" concurrency_ctx = asyncnullcontext diff --git a/src/prefect/server/database/migrations/MIGRATION-NOTES.md b/src/prefect/server/database/migrations/MIGRATION-NOTES.md index 7c4b55473211..d6d04831ed31 100644 --- a/src/prefect/server/database/migrations/MIGRATION-NOTES.md +++ b/src/prefect/server/database/migrations/MIGRATION-NOTES.md @@ -8,8 +8,11 @@ Each time a database migration is written, an entry is included here with: This gives us a history of changes and will create merge conflicts if two migrations are made at once, flagging situations where a branch needs to be updated before merging. -# Adds `concurrency_options` to `Deployments` +# Migrate `Deployment.concurrency_limit` to a foreign key `Deployment.concurrency_limit_id` +SQLite: `4ad4658cbefe` +Postgres: `eaec5004771f` +# Adds `concurrency_options` to `Deployments` SQLite: `7d6350aea855` Postgres: `555ed31b284d` diff --git a/src/prefect/server/database/migrations/versions/postgresql/2024_09_16_152051_eaec5004771f_add_deployment_to_global_concurrency_.py b/src/prefect/server/database/migrations/versions/postgresql/2024_09_16_152051_eaec5004771f_add_deployment_to_global_concurrency_.py new file mode 100644 index 000000000000..6f23b07842ce --- /dev/null +++ b/src/prefect/server/database/migrations/versions/postgresql/2024_09_16_152051_eaec5004771f_add_deployment_to_global_concurrency_.py @@ -0,0 +1,62 @@ +"""Add deployment to global concurrency limit FK + +Revision ID: eaec5004771f +Revises: 555ed31b284d +Create Date: 2024-09-16 15:20:51.582204 + +""" +import sqlalchemy as sa +from alembic import op + +import prefect + +# revision identifiers, used by Alembic. +revision = "eaec5004771f" +down_revision = "555ed31b284d" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "deployment", + sa.Column( + "concurrency_limit_id", + prefect.server.utilities.database.UUID(), + nullable=True, + ), + ) + + op.create_foreign_key( + op.f("fk_deployment__concurrency_limit_id__concurrency_limit_v2"), + "deployment", + "concurrency_limit_v2", + ["concurrency_limit_id"], + ["id"], + ondelete="SET NULL", + ) + + # migrate existing data + sql = sa.text( + """ + WITH deployment_limit_mapping AS ( + SELECT d.id AS deployment_id, l.id AS limit_id + FROM deployment d + JOIN concurrency_limit_v2 l ON l.name = 'deployment:' || d.id::text + ) + UPDATE deployment + SET concurrency_limit_id = dlm.limit_id + FROM deployment_limit_mapping dlm + WHERE deployment.id = dlm.deployment_id; + """ + ) + op.execute(sql) + + +def downgrade(): + op.drop_constraint( + op.f("fk_deployment__concurrency_limit_id__concurrency_limit_v2"), + "deployment", + type_="foreignkey", + ) + op.drop_column("deployment", "concurrency_limit_id") diff --git a/src/prefect/server/database/migrations/versions/sqlite/2024_09_16_162719_4ad4658cbefe_add_deployment_to_global_concurrency_.py b/src/prefect/server/database/migrations/versions/sqlite/2024_09_16_162719_4ad4658cbefe_add_deployment_to_global_concurrency_.py new file mode 100644 index 000000000000..bd3a6c197ebb --- /dev/null +++ b/src/prefect/server/database/migrations/versions/sqlite/2024_09_16_162719_4ad4658cbefe_add_deployment_to_global_concurrency_.py @@ -0,0 +1,60 @@ +"""Add deployment to global concurrency limit FK + +Revision ID: 4ad4658cbefe +Revises: 7d6350aea855 +Create Date: 2024-09-16 16:27:19.451150 + +""" +import sqlalchemy as sa +from alembic import op + +import prefect + +# revision identifiers, used by Alembic. +revision = "4ad4658cbefe" +down_revision = "7d6350aea855" +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table("deployment", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "concurrency_limit_id", + prefect.server.utilities.database.UUID(), + nullable=True, + ) + ) + batch_op.create_foreign_key( + batch_op.f("fk_deployment__concurrency_limit_id__concurrency_limit_v2"), + "concurrency_limit_v2", + ["concurrency_limit_id"], + ["id"], + ondelete="SET NULL", + ) + + # migrate existing data + sql = sa.text( + """ + WITH deployment_limit_mapping AS ( + SELECT d.id AS deployment_id, l.id AS limit_id + FROM deployment d + JOIN concurrency_limit_v2 l ON l.name = 'deployment:' || d.id + ) + UPDATE deployment + SET concurrency_limit_id = dlm.limit_id + FROM deployment_limit_mapping dlm + WHERE deployment.id = dlm.deployment_id; + """ + ) + op.execute(sql) + + +def downgrade(): + with op.batch_alter_table("deployment", schema=None) as batch_op: + batch_op.drop_constraint( + batch_op.f("fk_deployment__concurrency_limit_id__concurrency_limit_v2"), + type_="foreignkey", + ) + batch_op.drop_column("concurrency_limit_id") diff --git a/src/prefect/server/database/orm_models.py b/src/prefect/server/database/orm_models.py index 3f4777477623..818bb52e3d38 100644 --- a/src/prefect/server/database/orm_models.py +++ b/src/prefect/server/database/orm_models.py @@ -877,11 +877,19 @@ def job_variables(self): order_by=sa.desc(sa.text("updated")), ) - concurrency_limit: Mapped[Union[int, None]] = mapped_column( - sa.Integer, - server_default=None, + # deprecated in favor of `concurrency_limit_id` FK + _concurrency_limit: Mapped[Union[int, None]] = mapped_column( + sa.Integer, default=None, nullable=True, name="concurrency_limit" + ) + concurrency_limit_id: Mapped[Union[uuid.UUID, None]] = mapped_column( + UUID, + sa.ForeignKey("concurrency_limit_v2.id", ondelete="SET NULL"), nullable=True, - default=None, + ) + global_concurrency_limit: Mapped[ + Union["ConcurrencyLimitV2", None] + ] = sa.orm.relationship( + lazy="selectin", ) concurrency_options: Mapped[ Union[schemas.core.ConcurrencyOptions, None] @@ -891,6 +899,7 @@ def job_variables(self): nullable=True, default=None, ) + tags: Mapped[List[str]] = mapped_column( JSON, server_default="[]", default=list, nullable=False ) @@ -984,7 +993,7 @@ class ConcurrencyLimitV2(Base): active = sa.Column(sa.Boolean, nullable=False, default=True) name = sa.Column(sa.String, nullable=False) limit = sa.Column(sa.Integer, nullable=False) - active_slots = sa.Column(sa.Integer, nullable=False) + active_slots = sa.Column(sa.Integer, nullable=False, default=0) denied_slots = sa.Column(sa.Integer, nullable=False, default=0) slot_decay_per_second = sa.Column(sa.Float, default=0.0, nullable=False) diff --git a/src/prefect/server/models/deployments.py b/src/prefect/server/models/deployments.py index eb88d0d942bb..a1425c483e2c 100644 --- a/src/prefect/server/models/deployments.py +++ b/src/prefect/server/models/deployments.py @@ -83,7 +83,7 @@ async def create_deployment( schedules = deployment.schedules insert_values = deployment.model_dump_for_orm( - exclude_unset=True, exclude={"schedules"} + exclude_unset=True, exclude={"schedules", "concurrency_limit"} ) # The job_variables field in client and server schemas is named @@ -155,6 +155,10 @@ async def create_deployment( ], ) + await _create_or_update_deployment_concurrency_limit( + session, deployment_id, deployment.concurrency_limit + ) + query = ( sa.select(orm_models.Deployment) .where( @@ -194,7 +198,7 @@ async def update_deployment( # the user, ignoring any defaults on the model update_data = deployment.model_dump_for_orm( exclude_unset=True, - exclude={"work_pool_name"}, + exclude={"work_pool_name", "concurrency_limit"}, ) # The job_variables field in client and server schemas is named @@ -263,9 +267,41 @@ async def update_deployment( ], ) + await _create_or_update_deployment_concurrency_limit( + session, deployment_id, deployment.concurrency_limit + ) + return result.rowcount > 0 +async def _create_or_update_deployment_concurrency_limit( + session: AsyncSession, deployment_id: UUID, limit: Optional[int] +): + deployment = await session.get(orm_models.Deployment, deployment_id) + assert deployment is not None + + if ( + deployment.global_concurrency_limit + and deployment.global_concurrency_limit.limit == limit + ) or (deployment.global_concurrency_limit is None and limit is None): + return + + deployment._concurrency_limit = limit + if limit is None: + await _delete_related_concurrency_limit( + session=session, deployment_id=deployment_id + ) + await session.refresh(deployment) + elif deployment.global_concurrency_limit: + deployment.global_concurrency_limit.limit = limit + else: + limit_name = f"deployment:{deployment_id}" + new_limit = orm_models.ConcurrencyLimitV2(name=limit_name, limit=limit) + deployment.global_concurrency_limit = new_limit + + session.add(deployment) + + async def read_deployment( session: AsyncSession, deployment_id: UUID ) -> Optional[orm_models.Deployment]: @@ -482,12 +518,27 @@ async def delete_deployment(session: AsyncSession, deployment_id: UUID) -> bool: session=session, deployment_id=deployment_id, auto_scheduled_only=False ) + await _delete_related_concurrency_limit( + session=session, deployment_id=deployment_id + ) + result = await session.execute( delete(orm_models.Deployment).where(orm_models.Deployment.id == deployment_id) ) return result.rowcount > 0 +async def _delete_related_concurrency_limit(session: AsyncSession, deployment_id: UUID): + return await session.execute( + delete(orm_models.ConcurrencyLimitV2).where( + orm_models.ConcurrencyLimitV2.id + == sa.select(orm_models.Deployment.concurrency_limit_id) + .where(orm_models.Deployment.id == deployment_id) + .scalar_subquery() + ) + ) + + async def schedule_runs( session: AsyncSession, deployment_id: UUID, diff --git a/src/prefect/server/schemas/filters.py b/src/prefect/server/schemas/filters.py index 7dd5d3c87ea9..0f3161d4a65e 100644 --- a/src/prefect/server/schemas/filters.py +++ b/src/prefect/server/schemas/filters.py @@ -997,7 +997,7 @@ def _get_filter_list(self) -> List: class DeploymentFilterConcurrencyLimit(PrefectFilterBaseModel): - """Filter by `Deployment.concurrency_limit`.""" + """DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`.""" ge_: Optional[int] = Field( default=None, @@ -1014,18 +1014,9 @@ class DeploymentFilterConcurrencyLimit(PrefectFilterBaseModel): ) def _get_filter_list(self) -> List: - filters = [] - if self.ge_ is not None: - filters.append(orm_models.Deployment.concurrency_limit >= self.ge_) - if self.le_ is not None: - filters.append(orm_models.Deployment.concurrency_limit <= self.le_) - if self.is_null_ is not None: - filters.append( - orm_models.Deployment.concurrency_limit.is_(None) - if self.is_null_ - else orm_models.Deployment.concurrency_limit.is_not(None) - ) - return filters + # This used to filter on an `int` column that was moved to a `ForeignKey` relationship + # This filter is now deprecated rather than support filtering on the new relationship + return [] class DeploymentFilterTags(PrefectOperatorFilterBaseModel): @@ -1080,7 +1071,9 @@ class DeploymentFilter(PrefectOperatorFilterBaseModel): default=None, description="Filter criteria for `Deployment.work_queue_name`" ) concurrency_limit: Optional[DeploymentFilterConcurrencyLimit] = Field( - default=None, description="Filter criteria for `Deployment.concurrency`" + default=None, + description="DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`. If provided, will be ignored for backwards-compatibility. Will be removed after December 2024.", + deprecated=True, ) def _get_filter_list(self) -> List: @@ -1097,8 +1090,7 @@ def _get_filter_list(self) -> List: filters.append(self.tags.as_sql_filter()) if self.work_queue_name is not None: filters.append(self.work_queue_name.as_sql_filter()) - if self.concurrency_limit is not None: - filters.append(self.concurrency_limit.as_sql_filter()) + return filters diff --git a/src/prefect/server/schemas/responses.py b/src/prefect/server/schemas/responses.py index 47d95168546d..fd8d7cab0b5f 100644 --- a/src/prefect/server/schemas/responses.py +++ b/src/prefect/server/schemas/responses.py @@ -362,7 +362,12 @@ class DeploymentResponse(ORMBaseModel): ) concurrency_limit: Optional[int] = Field( default=None, - description="The maximum number of flow runs that can be active at once.", + description="DEPRECATED: Prefer `global_concurrency_limit`. Will always be None for backwards compatibility. Will be removed after December 2024.", + deprecated=True, + ) + global_concurrency_limit: Optional["GlobalConcurrencyLimitResponse"] = Field( + default=None, + description="The global concurrency limit object for enforcing the maximum number of flow runs that can be active at once.", ) concurrency_options: Optional[schemas.core.ConcurrencyOptions] = Field( default=None, diff --git a/src/prefect/workers/base.py b/src/prefect/workers/base.py index abb6831c03a6..55067e11ed8a 100644 --- a/src/prefect/workers/base.py +++ b/src/prefect/workers/base.py @@ -869,15 +869,9 @@ async def _submit_run_and_capture_errors( if flow_run.deployment_id: deployment = await self._client.read_deployment(flow_run.deployment_id) - if deployment and deployment.concurrency_limit: - limit_name = f"deployment:{deployment.id}" + if deployment and deployment.global_concurrency_limit: + limit_name = deployment.global_concurrency_limit.name concurrency_ctx = concurrency - - # ensure that the global concurrency limit is available - # and up-to-date before attempting to acquire a slot - await self._client.upsert_global_concurrency_limit_by_name( - limit_name, deployment.concurrency_limit - ) else: limit_name = "" concurrency_ctx = asyncnullcontext diff --git a/tests/cli/test_deploy.py b/tests/cli/test_deploy.py index 482a903c68ff..661494a4ce4a 100644 --- a/tests/cli/test_deploy.py +++ b/tests/cli/test_deploy.py @@ -269,7 +269,7 @@ def uninitialized_project_dir_with_git_with_remote( ) return uninitialized_project_dir_with_git_no_remote - async def test_project_deploy(self, project_dir, prefect_client): + async def test_project_deploy(self, project_dir, prefect_client: PrefectClient): await prefect_client.create_work_pool( WorkPoolCreate(name="test-pool", type="test") ) @@ -293,7 +293,7 @@ async def test_project_deploy(self, project_dir, prefect_client): assert deployment.work_pool_name == "test-pool" assert deployment.version == "1.0.0" assert deployment.tags == ["foo-bar"] - assert deployment.concurrency_limit == 42 + assert deployment.global_concurrency_limit.limit == 42 assert deployment.job_variables == {"env": "prod"} assert deployment.enforce_parameter_schema @@ -4138,7 +4138,7 @@ async def test_deploy_warns_with_single_deployment_and_multiple_names( @pytest.mark.usefixtures("project_dir") async def test_concurrency_limit_config_deployment_yaml( - self, work_pool, prefect_client + self, work_pool, prefect_client: PrefectClient ): concurrency_limit_config = {"limit": 42, "collision_strategy": "CANCEL_NEW"} @@ -4162,7 +4162,12 @@ async def test_concurrency_limit_config_deployment_yaml( "An important name/test-name" ) - assert deployment.concurrency_limit == concurrency_limit_config["limit"] + assert deployment.global_concurrency_limit is not None + assert ( + deployment.global_concurrency_limit.limit + == concurrency_limit_config["limit"] + ) + assert deployment.concurrency_options is not None assert ( deployment.concurrency_options.collision_strategy == concurrency_limit_config["collision_strategy"] diff --git a/tests/client/test_prefect_client.py b/tests/client/test_prefect_client.py index 531484e6b178..457c3238396d 100644 --- a/tests/client/test_prefect_client.py +++ b/tests/client/test_prefect_client.py @@ -654,7 +654,7 @@ def foo(): assert lookup.schedules[0].schedule == schedule.schedule assert lookup.schedules[0].active == schedule.active assert lookup.schedules[0].deployment_id == deployment_id - assert lookup.concurrency_limit == 42 + assert lookup.global_concurrency_limit.limit == 42 assert lookup.parameters == {"foo": "bar"} assert lookup.tags == ["foo", "bar"] assert lookup.storage_document_id == storage_document_id @@ -698,7 +698,7 @@ def foo(): updated_deployment = await prefect_client.read_deployment(deployment_id) # tags and concurrency should be updated assert updated_deployment.tags == ["new", "tags"] - assert updated_deployment.concurrency_limit == 42 + assert updated_deployment.global_concurrency_limit.limit == 42 # everything else should be the same assert updated_deployment.id == deployment.id assert updated_deployment.name == deployment.name diff --git a/tests/runner/test_runner.py b/tests/runner/test_runner.py index 363a86f4dffe..858c612b69a8 100644 --- a/tests/runner/test_runner.py +++ b/tests/runner/test_runner.py @@ -1443,7 +1443,7 @@ async def test_apply(self, prefect_client: PrefectClient): assert deployment.enforce_parameter_schema assert deployment.job_variables == {} assert deployment.paused is False - assert deployment.concurrency_limit is None + assert deployment.global_concurrency_limit is None async def test_apply_with_work_pool(self, prefect_client: PrefectClient, work_pool): deployment = RunnerDeployment.from_flow( diff --git a/tests/server/models/test_concurrency_limits_v2.py b/tests/server/models/test_concurrency_limits_v2.py index 3e50153b733f..05379c046d40 100644 --- a/tests/server/models/test_concurrency_limits_v2.py +++ b/tests/server/models/test_concurrency_limits_v2.py @@ -6,6 +6,7 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession +from prefect.server import models, schemas from prefect.server.database.interface import PrefectDBInterface from prefect.server.models.concurrency_limits_v2 import ( MINIMUM_OCCUPANCY_SECONDS_PER_SLOT, @@ -271,6 +272,26 @@ async def test_delete_concurrency_limit_by_id( ) +async def test_delete_concurrency_limit_used_for_deployment_concurrency_limiting( + session: AsyncSession, deployment +): + await models.deployments.update_deployment( + session, deployment.id, schemas.actions.DeploymentUpdate(concurrency_limit=6) + ) + await session.commit() + await session.refresh(deployment) + assert deployment.global_concurrency_limit is not None + + assert await delete_concurrency_limit( + session, concurrency_limit_id=deployment.concurrency_limit_id + ) + await session.commit() + + await session.refresh(deployment) + assert deployment.global_concurrency_limit is None + assert deployment.concurrency_limit_id is None + + async def test_update_concurrency_limit_with_invalid_name_raises( concurrency_limit: ConcurrencyLimitV2, session: AsyncSession ): diff --git a/tests/server/models/test_deployments.py b/tests/server/models/test_deployments.py index fd7928870486..27e4de9d0731 100644 --- a/tests/server/models/test_deployments.py +++ b/tests/server/models/test_deployments.py @@ -30,7 +30,7 @@ async def test_create_deployment_succeeds(self, session, flow): assert deployment.flow_id == flow.id assert deployment.parameters == {"foo": "bar"} assert deployment.tags == ["foo", "bar"] - assert deployment.concurrency_limit is None + assert deployment.global_concurrency_limit is None async def test_creating_a_deployment_with_existing_work_queue_is_ok( self, session, flow, work_queue @@ -245,6 +245,53 @@ async def test_create_deployment_with_updated_by(self, session, flow): assert updated_deployment.updated_by.type == new_updated_by.type async def test_create_deployment_with_concurrency_limit(self, session, flow): + deployment = await models.deployments.create_deployment( + session=session, + deployment=schemas.core.Deployment( + name="My Deployment", + flow_id=flow.id, + concurrency_limit=2, + ), + ) + assert deployment is not None + assert deployment._concurrency_limit == 2 + + assert deployment.global_concurrency_limit is not None + assert deployment.global_concurrency_limit.limit == 2 + + async def test_create_deployment_can_remove_concurrency_limit_on_upsert( + self, session, deployment + ): + await models.deployments.update_deployment( + session, + deployment.id, + schemas.actions.DeploymentUpdate(concurrency_limit=5), + ) + await session.commit() + assert deployment.global_concurrency_limit is not None + assert deployment.global_concurrency_limit.limit == 5 + gcl_id = deployment.concurrency_limit_id + + updated_deployment = await models.deployments.create_deployment( + session=session, + deployment=schemas.core.Deployment( + id=deployment.id, + name=deployment.name, + flow_id=deployment.flow_id, + concurrency_limit=None, + ), + ) + + assert updated_deployment.global_concurrency_limit is None + assert updated_deployment.concurrency_limit_id is None + assert updated_deployment._concurrency_limit is None + + assert ( + await models.concurrency_limits_v2.read_concurrency_limit(session, gcl_id) + is None + ), "Expected the concurrency limit to be deleted, but it was not" + + async def test_create_deployment_with_concurrency_options(self, session, flow): concurrency_options = schemas.core.ConcurrencyOptions( collision_strategy="ENQUEUE", ) @@ -257,7 +304,8 @@ async def test_create_deployment_with_concurrency_limit(self, session, flow): concurrency_options=concurrency_options, ), ) - assert deployment.concurrency_limit == 42 + assert deployment._concurrency_limit == 42 + assert deployment.global_concurrency_limit.limit == 42 assert ( deployment.concurrency_options.collision_strategy == concurrency_options.collision_strategy @@ -579,6 +627,38 @@ async def test_delete_deployment_returns_false_if_does_not_exist(self, session): ) assert result is False + async def test_delete_deployment_with_concurrency_limit(self, session, flow): + deployment = await models.deployments.create_deployment( + session=session, + deployment=schemas.core.Deployment( + name="My Deployment", + flow_id=flow.id, + concurrency_limit=2, + ), + ) + assert deployment is not None + assert deployment._concurrency_limit == 2 + + assert deployment.global_concurrency_limit is not None + assert deployment.global_concurrency_limit.limit == 2 + + assert await models.deployments.delete_deployment( + session=session, deployment_id=deployment.id + ) + await session.commit() + + # make sure the deployment is deleted + result = await models.deployments.read_deployment( + session=session, deployment_id=deployment.id + ) + assert result is None + + # make sure the concurrency limit is deleted + result = await models.concurrency_limits_v2.read_concurrency_limit( + session, deployment.concurrency_limit_id + ) + assert result is None + class TestScheduledRuns: async def test_schedule_runs_inserts_in_db(self, deployment, session): @@ -1091,6 +1171,70 @@ async def test_update_deployment_with_concurrency_limit( self, session, deployment, + ): + assert deployment.global_concurrency_limit is None + + await models.deployments.update_deployment( + session=session, + deployment_id=deployment.id, + deployment=schemas.actions.DeploymentUpdate( + concurrency_limit=5, + ), + ) + await session.commit() + + updated_deployment = await models.deployments.read_deployment( + session=session, deployment_id=deployment.id + ) + assert updated_deployment + assert updated_deployment._concurrency_limit == 5 + assert updated_deployment.global_concurrency_limit.limit == 5 + + async def test_update_deployment_can_remove_concurrency_limit( + self, + session, + deployment, + ): + # Given a deployment with a concurrency limit + await models.deployments.update_deployment( + session=session, + deployment_id=deployment.id, + deployment=schemas.actions.DeploymentUpdate( + concurrency_limit=5, + ), + ) + await session.commit() + await session.refresh(deployment) + assert deployment.global_concurrency_limit is not None + gcl_id = deployment.concurrency_limit_id + + # update it to remove the concurrency limit + await models.deployments.update_deployment( + session=session, + deployment_id=deployment.id, + deployment=schemas.actions.DeploymentUpdate( + concurrency_limit=None, + ), + ) + await session.commit() + + updated_deployment = await models.deployments.read_deployment( + session=session, deployment_id=deployment.id + ) + assert updated_deployment + assert updated_deployment._concurrency_limit is None + assert updated_deployment.concurrency_limit_id is None + assert updated_deployment.global_concurrency_limit is None + + assert ( + await models.concurrency_limits_v2.read_concurrency_limit(session, gcl_id) + is None + ), "Expected the concurrency limit to be deleted, but it was not" + + async def test_update_deployment_with_concurrency_options( + self, + session, + deployment, ): await models.deployments.update_deployment( session=session, @@ -1105,7 +1249,8 @@ async def test_update_deployment_with_concurrency_limit( updated_deployment = await models.deployments.read_deployment( session=session, deployment_id=deployment.id ) - assert updated_deployment.concurrency_limit == 42 + assert updated_deployment._concurrency_limit == 42 + assert updated_deployment.global_concurrency_limit.limit == 42 assert updated_deployment.concurrency_options.collision_strategy == "CANCEL_NEW" diff --git a/tests/server/orchestration/api/test_deployments.py b/tests/server/orchestration/api/test_deployments.py index 868764eee88e..75a1ffa9d7dd 100644 --- a/tests/server/orchestration/api/test_deployments.py +++ b/tests/server/orchestration/api/test_deployments.py @@ -1014,6 +1014,34 @@ async def test_can_pause_deployment_by_upserting_paused( assert response.status_code == 200 assert response.json()["paused"] is True + async def test_create_deployment_with_concurrency_limit( + self, + client, + flow, + ): + response = await client.post( + "/deployments/", + json=dict( + name="My Deployment", + flow_id=str(flow.id), + concurrency_limit=3, + ), + ) + assert response.status_code == status.HTTP_201_CREATED + + json_response = response.json() + assert ( + json_response["concurrency_limit"] is None + ), "Deprecated int-only field should be None for backwards-compatibility" + + global_concurrency_limit = json_response.get("global_concurrency_limit") + assert global_concurrency_limit is not None + assert global_concurrency_limit.get("limit") == 3 + assert global_concurrency_limit.get("active") is True + assert ( + global_concurrency_limit.get("name") == f"deployment:{json_response['id']}" + ) + class TestReadDeployment: async def test_read_deployment( @@ -1031,6 +1059,29 @@ async def test_read_deployment_returns_404_if_does_not_exist(self, client): response = await client.get(f"/deployments/{uuid4()}") assert response.status_code == status.HTTP_404_NOT_FOUND + async def test_read_deployment_with_concurrency_limit( + self, session, client, deployment + ): + update = DeploymentUpdate(concurrency_limit=4) + await models.deployments.update_deployment(session, deployment.id, update) + await session.commit() + + response = await client.get(f"/deployments/{deployment.id}") + assert response.status_code == status.HTTP_200_OK + + json_response = response.json() + assert ( + json_response["concurrency_limit"] is None + ), "Deprecated int-only field should be None for backwards-compatibility" + + global_concurrency_limit = json_response.get("global_concurrency_limit") + assert global_concurrency_limit is not None + assert global_concurrency_limit.get("limit") == update.concurrency_limit + assert global_concurrency_limit.get("active") is True + assert ( + global_concurrency_limit.get("name") == f"deployment:{json_response['id']}" + ) + class TestReadDeploymentByName: async def test_read_deployment_by_name(self, client, flow, deployment): @@ -1795,6 +1846,24 @@ async def test_updating_paused_does_not_change_schedule( assert schedules[0].schedule.interval == new_schedule.interval assert schedules[0].active is True + async def test_updating_deployment_with_concurrency_limit( + self, + client, + deployment, + session, + ): + assert deployment.global_concurrency_limit is None + + response = await client.patch( + f"/deployments/{deployment.id}", json={"concurrency_limit": 1} + ) + assert response.status_code == 204 + + await session.refresh(deployment) + assert deployment + assert deployment._concurrency_limit == 1 + assert deployment.global_concurrency_limit.limit == 1 + class TestGetScheduledFlowRuns: @pytest.fixture diff --git a/tests/test_flows.py b/tests/test_flows.py index 653a59d8ead6..d7d6723b18fd 100644 --- a/tests/test_flows.py +++ b/tests/test_flows.py @@ -4052,7 +4052,7 @@ def test_serve_creates_deployment(self, sync_prefect_client: SyncPrefectClient): assert deployment.version == "alpha" assert deployment.enforce_parameter_schema assert deployment.paused - assert deployment.concurrency_limit == 42 + assert deployment.global_concurrency_limit.limit == 42 def test_serve_can_user_a_module_path_entrypoint(self, sync_prefect_client): deployment = self.flow.serve(