Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/deployments.py: 39%
281 statements

"""
Routes for interacting with Deployment objects.
"""

import datetime
from typing import List, Optional
from uuid import UUID

import jsonschema.exceptions
import sqlalchemy as sa
from fastapi import Body, Depends, HTTPException, Path, Response

import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect._internal.compatibility.starlette import status
from prefect.server.api.validation import (
    validate_job_variables_for_deployment,
    validate_job_variables_for_deployment_flow_run,
)
from prefect.server.api.workers import WorkerLookups
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.exceptions import MissingVariableError, ObjectNotFoundError
from prefect.server.models.deployments import mark_deployments_ready
from prefect.server.models.workers import DEFAULT_AGENT_WORK_POOL_NAME
from prefect.server.schemas.responses import DeploymentPaginationResponse
from prefect.server.utilities.server import PrefectRouter
from prefect.types import DateTime
from prefect.types._datetime import now
from prefect.utilities.schema_tools.hydration import (
    HydrationContext,
    HydrationError,
    hydrate,
)
from prefect.utilities.schema_tools.validation import (
    CircularSchemaRefError,
    ValidationError,
    validate,
)

router: PrefectRouter = PrefectRouter(prefix="/deployments", tags=["Deployments"])


def _multiple_schedules_error(deployment_id) -> HTTPException:
    return HTTPException(
        status.HTTP_422_UNPROCESSABLE_ENTITY,
        detail=(
            "Error updating deployment: "
            f"Deployment {deployment_id!r} has multiple schedules. "
            "Please use the UI or update your client to adjust this "
            "deployment's schedules.",
        ),
    )


@router.post("/")
async def create_deployment(
    deployment: schemas.actions.DeploymentCreate,
    response: Response,
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    created_by: Optional[schemas.core.CreatedBy] = Depends(dependencies.get_created_by),
    updated_by: Optional[schemas.core.UpdatedBy] = Depends(dependencies.get_updated_by),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.DeploymentResponse:
    """
    Creates a new deployment from the provided schema. If a deployment with
    the same name and flow_id already exists, the deployment is updated.

    If the deployment has an active schedule, flow runs will be scheduled.
    When upserting, any scheduled runs from the existing deployment will be deleted.

    For more information, see https://docs.prefect.io/v3/concepts/deployments.
    """

    data = deployment.model_dump(exclude_unset=True)
    data["created_by"] = created_by.model_dump() if created_by else None
    data["updated_by"] = updated_by.model_dump() if updated_by else None

    async with db.session_context(begin_transaction=True) as session:
        if (
            deployment.work_pool_name
            and deployment.work_pool_name != DEFAULT_AGENT_WORK_POOL_NAME
        ):
            # Make sure that deployment is valid before beginning creation process
            work_pool = await models.workers.read_work_pool_by_name(
                session=session, work_pool_name=deployment.work_pool_name
            )
            if work_pool is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f'Work pool "{deployment.work_pool_name}" not found.',
                )

            await validate_job_variables_for_deployment(
                session,
                work_pool,
                deployment,
            )

        # hydrate the input model into a full model
        deployment_dict: dict = deployment.model_dump(
            exclude={"work_pool_name"},
            exclude_unset=True,
        )

        requested_concurrency_limit = deployment_dict.pop(
            "global_concurrency_limit_id", "unset"
        )
        if requested_concurrency_limit != "unset":
            if requested_concurrency_limit:
                concurrency_limit = (
                    await models.concurrency_limits_v2.read_concurrency_limit(
                        session=session,
                        concurrency_limit_id=requested_concurrency_limit,
                    )
                )

                if not concurrency_limit:
                    raise HTTPException(
                        status_code=status.HTTP_404_NOT_FOUND,
                        detail="Concurrency limit not found",
                    )

            deployment_dict["concurrency_limit_id"] = requested_concurrency_limit

        if deployment.work_pool_name and deployment.work_queue_name:
            # If a specific pool name/queue name combination was provided, get the
            # ID for that work pool queue.
            deployment_dict[
                "work_queue_id"
            ] = await worker_lookups._get_work_queue_id_from_name(
                session=session,
                work_pool_name=deployment.work_pool_name,
                work_queue_name=deployment.work_queue_name,
                create_queue_if_not_found=True,
            )
        elif deployment.work_pool_name:
            # If just a pool name was provided, get the ID for its default
            # work pool queue.
            deployment_dict[
                "work_queue_id"
            ] = await worker_lookups._get_default_work_queue_id_from_work_pool_name(
                session=session,
                work_pool_name=deployment.work_pool_name,
            )
        elif deployment.work_queue_name:
            # If just a queue name was provided, ensure that the queue exists and
            # get its ID.
            work_queue = await models.work_queues.ensure_work_queue_exists(
                session=session, name=deployment.work_queue_name
            )
            deployment_dict["work_queue_id"] = work_queue.id

        deployment = schemas.core.Deployment(**deployment_dict)
        # check to see if relevant blocks exist, allowing us to throw a useful error
        # message for debugging
        if deployment.infrastructure_document_id is not None:
            infrastructure_block = (
                await models.block_documents.read_block_document_by_id(
                    session=session,
                    block_document_id=deployment.infrastructure_document_id,
                )
            )
            if not infrastructure_block:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=(
                        "Error creating deployment. Could not find infrastructure"
                        f" block with id: {deployment.infrastructure_document_id}. This"
                        " usually occurs when applying a deployment specification that"
                        " was built against a different Prefect database / workspace."
                    ),
                )

        if deployment.storage_document_id is not None:
            storage_block = await models.block_documents.read_block_document_by_id(
                session=session,
                block_document_id=deployment.storage_document_id,
            )
            if not storage_block:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=(
                        "Error creating deployment. Could not find storage block with"
                        f" id: {deployment.storage_document_id}. This usually occurs"
                        " when applying a deployment specification that was built"
                        " against a different Prefect database / workspace."
                    ),
                )

        right_now = now("UTC")
        model = await models.deployments.create_deployment(
            session=session, deployment=deployment
        )

        if model.created >= right_now:
            response.status_code = status.HTTP_201_CREATED

        return schemas.responses.DeploymentResponse.model_validate(
            model, from_attributes=True
        )
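

# Illustrative sketch: calling the create route above over HTTP with httpx. The base
# URL assumes a local server at Prefect's default API address, and the flow id is a
# placeholder; real clients typically go through prefect.client instead.
async def _example_create_deployment_request() -> None:  # pragma: no cover
    import httpx

    async with httpx.AsyncClient(base_url="http://127.0.0.1:4200/api") as client:
        response = await client.post(
            "/deployments/",
            json={
                "name": "example-deployment",
                "flow_id": "00000000-0000-0000-0000-000000000000",  # placeholder
                "work_queue_name": "default",
            },
        )
        response.raise_for_status()  # 201 if newly created, 200 if upserted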


@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def update_deployment(
    deployment: schemas.actions.DeploymentUpdate,
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    async with db.session_context(begin_transaction=True) as session:
        existing_deployment = await models.deployments.read_deployment(
            session=session, deployment_id=deployment_id
        )
        if not existing_deployment:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, detail="Deployment not found."
            )

        # Decide how to handle schedule updates:
        # - If not all existing schedules have slugs, fall back to the existing logic
        #   where all schedules are recreated to match the request.
        # - If the existing schedules have slugs but not all provided schedules do,
        #   return a 422 to avoid accidentally blowing away schedules.
        # - Otherwise, use the existing and provided slugs to make targeted updates
        #   to the deployment's schedules.
        schedules_to_patch: list[schemas.actions.DeploymentScheduleUpdate] = []
        schedules_to_create: list[schemas.actions.DeploymentScheduleUpdate] = []
        all_provided_have_slugs = all(
            schedule.slug is not None for schedule in deployment.schedules or []
        )
        all_existing_have_slugs = existing_deployment.schedules and all(
            schedule.slug is not None for schedule in existing_deployment.schedules
        )
        if all_provided_have_slugs and all_existing_have_slugs:
            current_slugs = [
                schedule.slug for schedule in existing_deployment.schedules
            ]

            for schedule in deployment.schedules:
                if schedule.slug in current_slugs:
                    schedules_to_patch.append(schedule)
                elif schedule.schedule:
                    schedules_to_create.append(schedule)
                else:
                    raise HTTPException(
                        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                        detail="Unable to create new deployment schedules without a schedule configuration.",
                    )
            # Clear schedules to handle their update/creation separately
            deployment.schedules = None
        elif not all_provided_have_slugs and all_existing_have_slugs:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Please provide a slug for each schedule in your request to ensure schedules are updated correctly.",
            )

        if deployment.work_pool_name:
            # Make sure that deployment is valid before beginning creation process
            work_pool = await models.workers.read_work_pool_by_name(
                session=session, work_pool_name=deployment.work_pool_name
            )
            try:
                deployment.check_valid_configuration(work_pool.base_job_template)
            except (MissingVariableError, jsonschema.exceptions.ValidationError) as exc:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=f"Error creating deployment: {exc!r}",
                )

        if deployment.parameters is not None:
            try:
                dehydrated_params = deployment.parameters
                ctx = await HydrationContext.build(
                    session=session,
                    raise_on_error=True,
                    render_jinja=True,
                    render_workspace_variables=True,
                )
                parameters = hydrate(dehydrated_params, ctx)
                deployment.parameters = parameters
            except HydrationError as exc:
                raise HTTPException(
                    status.HTTP_400_BAD_REQUEST,
                    detail=f"Error hydrating deployment parameters: {exc}",
                )
        else:
            parameters = existing_deployment.parameters

        enforce_parameter_schema = (
            deployment.enforce_parameter_schema
            if deployment.enforce_parameter_schema is not None
            else existing_deployment.enforce_parameter_schema
        )
        if enforce_parameter_schema:
            # ensure that the new parameters conform to the proposed schema
            if deployment.parameter_openapi_schema:
                openapi_schema = deployment.parameter_openapi_schema
            else:
                openapi_schema = existing_deployment.parameter_openapi_schema

            if not isinstance(openapi_schema, dict):
                raise HTTPException(
                    status.HTTP_409_CONFLICT,
                    detail=(
                        "Error updating deployment: Cannot update parameters because"
                        " parameter schema enforcement is enabled and the deployment"
                        " does not have a valid parameter schema."
                    ),
                )
            try:
                validate(
                    parameters,
                    openapi_schema,
                    raise_on_error=True,
                    ignore_required=True,
                )
            except ValidationError as exc:
                raise HTTPException(
                    status.HTTP_409_CONFLICT,
                    detail=f"Error updating deployment: {exc}",
                )
            except CircularSchemaRefError:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail="Invalid schema: Unable to validate schema with circular references.",
                )

        if deployment.global_concurrency_limit_id:
            concurrency_limit = (
                await models.concurrency_limits_v2.read_concurrency_limit(
                    session=session,
                    concurrency_limit_id=deployment.global_concurrency_limit_id,
                )
            )

            if not concurrency_limit:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Concurrency limit not found",
                )

        result = await models.deployments.update_deployment(
            session=session,
            deployment_id=deployment_id,
            deployment=deployment,
        )

        for schedule in schedules_to_patch:
            await models.deployments.update_deployment_schedule(
                session=session,
                deployment_id=deployment_id,
                schedule=schedule,
                deployment_schedule_slug=schedule.slug,
            )
        if schedules_to_create:
            await models.deployments.create_deployment_schedules(
                session=session,
                deployment_id=deployment_id,
                schedules=[
                    schemas.actions.DeploymentScheduleCreate(
                        schedule=schedule.schedule,  # type: ignore We will raise above if schedule is not provided
                        active=schedule.active if schedule.active is not None else True,
                        slug=schedule.slug,
                        parameters=schedule.parameters,
                    )
                    for schedule in schedules_to_create
                ],
            )
    if not result:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Deployment not found.")
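

# Illustrative sketch: a targeted schedule update against the patch route above. The
# deployment id and slug are placeholders; because every schedule in the request
# carries a slug, the handler patches matching schedules instead of recreating them.
async def _example_update_deployment_request() -> None:  # pragma: no cover
    import httpx

    deployment_id = "00000000-0000-0000-0000-000000000000"  # placeholder
    async with httpx.AsyncClient(base_url="http://127.0.0.1:4200/api") as client:
        response = await client.patch(
            f"/deployments/{deployment_id}",
            json={
                "tags": ["example"],
                "schedules": [
                    {"slug": "daily", "active": True},
                ],
            },
        )
        response.raise_for_status()  # 204 on success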


@router.get("/name/{flow_name}/{deployment_name}")
async def read_deployment_by_name(
    flow_name: str = Path(..., description="The name of the flow"),
    deployment_name: str = Path(..., description="The name of the deployment"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.DeploymentResponse:
    """
    Get a deployment using the name of the flow and the deployment.
    """
    async with db.session_context() as session:
        deployment = await models.deployments.read_deployment_by_name(
            session=session, name=deployment_name, flow_name=flow_name
        )
        if not deployment:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, detail="Deployment not found"
            )
        return schemas.responses.DeploymentResponse.model_validate(
            deployment, from_attributes=True
        )


@router.get("/{id:uuid}")
async def read_deployment(
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.DeploymentResponse:
    """
    Get a deployment by id.
    """
    async with db.session_context() as session:
        deployment = await models.deployments.read_deployment(
            session=session, deployment_id=deployment_id
        )
        if not deployment:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found"
            )
        return schemas.responses.DeploymentResponse.model_validate(
            deployment, from_attributes=True
        )


@router.post("/filter")
async def read_deployments(
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    sort: schemas.sorting.DeploymentSort = Body(
        schemas.sorting.DeploymentSort.NAME_ASC
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.DeploymentResponse]:
    """
    Query for deployments.
    """
    async with db.session_context() as session:
        response = await models.deployments.read_deployments(
            session=session,
            offset=offset,
            sort=sort,
            limit=limit,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            work_queue_filter=work_pool_queues,
        )
        return [
            schemas.responses.DeploymentResponse.model_validate(
                deployment, from_attributes=True
            )
            for deployment in response
        ]
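

# Illustrative sketch: querying the /filter route above for deployments whose names
# match a prefix. The filter shape follows schemas.filters.DeploymentFilter; the base
# URL and prefix value are assumptions.
async def _example_filter_deployments_request() -> None:  # pragma: no cover
    import httpx

    async with httpx.AsyncClient(base_url="http://127.0.0.1:4200/api") as client:
        response = await client.post(
            "/deployments/filter",
            json={
                "limit": 10,
                "offset": 0,
                "deployments": {"name": {"like_": "example-"}},
                "sort": "NAME_ASC",
            },
        )
        response.raise_for_status()
        # response.json() is a list of DeploymentResponse payloads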


@router.post("/paginate")
async def paginate_deployments(
    limit: int = dependencies.LimitBody(),
    page: int = Body(1, ge=1),
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    sort: schemas.sorting.DeploymentSort = Body(
        schemas.sorting.DeploymentSort.NAME_ASC
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> DeploymentPaginationResponse:
    """
    Pagination query for deployments.
    """
    offset = (page - 1) * limit

    async with db.session_context() as session:
        response = await models.deployments.read_deployments(
            session=session,
            offset=offset,
            sort=sort,
            limit=limit,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            work_queue_filter=work_pool_queues,
        )

        count = await models.deployments.count_deployments(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            work_queue_filter=work_pool_queues,
        )

        results = [
            schemas.responses.DeploymentResponse.model_validate(
                deployment, from_attributes=True
            )
            for deployment in response
        ]

        return DeploymentPaginationResponse(
            results=results,
            count=count,
            limit=limit,
            pages=(count + limit - 1) // limit,
            page=page,
        )
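

# Illustrative note: the handler above turns a 1-based page into an offset and reports
# the page count with ceiling division, e.g. count=101 and limit=20 give offset 40 for
# page 3 and (101 + 20 - 1) // 20 == 6 pages.
def _example_pagination_math(
    page: int = 3, limit: int = 20, count: int = 101
) -> tuple[int, int]:  # pragma: no cover
    offset = (page - 1) * limit  # same formula as paginate_deployments
    pages = (count + limit - 1) // limit  # ceiling division without math.ceil
    return offset, pages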


@router.post("/get_scheduled_flow_runs")
async def get_scheduled_flow_runs_for_deployments(
    docket: dependencies.Docket,
    deployment_ids: list[UUID] = Body(
        default=..., description="The deployment IDs to get scheduled runs for"
    ),
    scheduled_before: DateTime = Body(
        None, description="The maximum time to look for scheduled flow runs"
    ),
    limit: int = dependencies.LimitBody(),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> list[schemas.responses.FlowRunResponse]:
    """
    Get scheduled runs for a set of deployments. Used by a runner to poll for work.
    """
    async with db.session_context() as session:
        orm_flow_runs = await models.flow_runs.read_flow_runs(
            session=session,
            limit=limit,
            deployment_filter=schemas.filters.DeploymentFilter(
                id=schemas.filters.DeploymentFilterId(any_=deployment_ids),
            ),
            flow_run_filter=schemas.filters.FlowRunFilter(
                next_scheduled_start_time=schemas.filters.FlowRunFilterNextScheduledStartTime(
                    before_=scheduled_before
                ),
                state=schemas.filters.FlowRunFilterState(
                    type=schemas.filters.FlowRunFilterStateType(
                        any_=[schemas.states.StateType.SCHEDULED]
                    )
                ),
            ),
            sort=schemas.sorting.FlowRunSort.NEXT_SCHEDULED_START_TIME_ASC,
        )

        flow_run_responses = [
            schemas.responses.FlowRunResponse.model_validate(
                orm_flow_run, from_attributes=True
            )
            for orm_flow_run in orm_flow_runs
        ]

    await docket.add(mark_deployments_ready)(
        deployment_ids=deployment_ids,
    )

    return flow_run_responses
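

# Illustrative sketch: how a runner-style client might poll the route above for
# upcoming work. The base URL, deployment id, and horizon are placeholder assumptions.
async def _example_poll_scheduled_runs_request() -> None:  # pragma: no cover
    import httpx

    async with httpx.AsyncClient(base_url="http://127.0.0.1:4200/api") as client:
        response = await client.post(
            "/deployments/get_scheduled_flow_runs",
            json={
                "deployment_ids": ["00000000-0000-0000-0000-000000000000"],  # placeholder
                "scheduled_before": "2025-01-01T00:00:00Z",
                "limit": 10,
            },
        )
        response.raise_for_status()
        # each item in response.json() is a FlowRunResponse in a SCHEDULED state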


@router.post("/count")
async def count_deployments(
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    """
    Count deployments.
    """
    async with db.session_context() as session:
        return await models.deployments.count_deployments(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            work_queue_filter=work_pool_queues,
        )


@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_deployment(
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a deployment by id.
    """
    async with db.session_context(begin_transaction=True) as session:
        result = await models.deployments.delete_deployment(
            session=session, deployment_id=deployment_id
        )
        if not result:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found"
            )


@router.post("/{id:uuid}/schedule")
async def schedule_deployment(
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    start_time: datetime.datetime = Body(
        None, description="The earliest date to schedule"
    ),
    end_time: datetime.datetime = Body(None, description="The latest date to schedule"),
    # Workaround for the fact that FastAPI does not let us configure ser_json_timedelta
    # to represent timedeltas as floats in JSON.
    min_time: float = Body(
        None,
        description=(
            "Runs will be scheduled until at least this long after the `start_time`"
        ),
        json_schema_extra={"format": "time-delta"},
    ),
    min_runs: int = Body(None, description="The minimum number of runs to schedule"),
    max_runs: int = Body(None, description="The maximum number of runs to schedule"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Schedule runs for a deployment. For backfills, provide start/end times in the past.

    This function will generate the minimum number of runs that satisfy the min
    and max times, and the min and max counts. Specifically, the following order
    will be respected.

    - Runs will be generated starting on or after the `start_time`
    - No more than `max_runs` runs will be generated
    - No runs will be generated after `end_time` is reached
    - At least `min_runs` runs will be generated
    - Runs will be generated until at least `start_time + min_time` is reached
    """
    if isinstance(min_time, float):
        min_time = datetime.timedelta(seconds=min_time)

    async with db.session_context(begin_transaction=True) as session:
        await models.deployments.schedule_runs(
            session=session,
            deployment_id=deployment_id,
            start_time=start_time,
            min_time=min_time,
            end_time=end_time,
            min_runs=min_runs,
            max_runs=max_runs,
        )
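

# Illustrative sketch: requesting a backfill from the schedule route above by passing a
# window in the past. The id and dates are placeholders; min_time is sent in seconds,
# matching the float workaround documented above.
async def _example_backfill_request() -> None:  # pragma: no cover
    import httpx

    deployment_id = "00000000-0000-0000-0000-000000000000"  # placeholder
    async with httpx.AsyncClient(base_url="http://127.0.0.1:4200/api") as client:
        response = await client.post(
            f"/deployments/{deployment_id}/schedule",
            json={
                "start_time": "2024-01-01T00:00:00Z",
                "end_time": "2024-01-02T00:00:00Z",
                "max_runs": 24,
                "min_time": 3600,  # at least one hour past start_time, as seconds
            },
        )
        response.raise_for_status()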


@router.post("/{id:uuid}/resume_deployment")
async def resume_deployment(
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Set a deployment schedule to active. Runs will be scheduled immediately.
    """
    async with db.session_context(begin_transaction=True) as session:
        deployment = await models.deployments.read_deployment(
            session=session, deployment_id=deployment_id
        )
        if not deployment:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found"
            )
        deployment.paused = False


@router.post("/{id:uuid}/pause_deployment")
async def pause_deployment(
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Set a deployment schedule to inactive. Any auto-scheduled runs still in a Scheduled
    state will be deleted.
    """
    async with db.session_context(begin_transaction=False) as session:
        deployment = await models.deployments.read_deployment(
            session=session, deployment_id=deployment_id
        )
        if not deployment:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found"
            )
        deployment.paused = True

        # commit here to make the inactive schedule "visible" to the scheduler service
        await session.commit()

        # delete any auto scheduled runs
        await models.deployments._delete_scheduled_runs(
            session=session,
            deployment_id=deployment_id,
            auto_scheduled_only=True,
        )

        await session.commit()
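

# Illustrative sketch: pausing and later resuming a deployment via the routes above.
# The base URL and deployment id are placeholders; pausing also deletes any
# auto-scheduled runs, as the pause_deployment docstring notes.
async def _example_pause_and_resume_request() -> None:  # pragma: no cover
    import httpx

    deployment_id = "00000000-0000-0000-0000-000000000000"  # placeholder
    async with httpx.AsyncClient(base_url="http://127.0.0.1:4200/api") as client:
        pause = await client.post(f"/deployments/{deployment_id}/pause_deployment")
        pause.raise_for_status()
        resume = await client.post(f"/deployments/{deployment_id}/resume_deployment")
        resume.raise_for_status()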


@router.post("/{id:uuid}/create_flow_run")
async def create_flow_run_from_deployment(
    flow_run: schemas.actions.DeploymentFlowRunCreate,
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    created_by: Optional[schemas.core.CreatedBy] = Depends(dependencies.get_created_by),
    db: PrefectDBInterface = Depends(provide_database_interface),
    worker_lookups: WorkerLookups = Depends(WorkerLookups),
    response: Response = None,
) -> schemas.responses.FlowRunResponse:
    """
    Create a flow run from a deployment.

    Any parameters not provided will be inferred from the deployment's parameters.
    If tags are not provided, the deployment's tags will be used.

    If no state is provided, the flow run will be created in a SCHEDULED state.
    """
    async with db.session_context(begin_transaction=True) as session:
        # get relevant info from the deployment
        deployment = await models.deployments.read_deployment(
            session=session, deployment_id=deployment_id
        )

        if not deployment:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found"
            )

        try:
            dehydrated_params = deployment.parameters
            dehydrated_params.update(flow_run.parameters or {})
            ctx = await HydrationContext.build(
                session=session,
                raise_on_error=True,
                render_jinja=True,
                render_workspace_variables=True,
            )
            parameters = hydrate(dehydrated_params, ctx)
        except HydrationError as exc:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                detail=f"Error hydrating flow run parameters: {exc}",
            )

        # default
        enforce_parameter_schema = deployment.enforce_parameter_schema

        # run override
        if flow_run.enforce_parameter_schema is not None:
            enforce_parameter_schema = flow_run.enforce_parameter_schema

        if enforce_parameter_schema:
            if not isinstance(deployment.parameter_openapi_schema, dict):
                raise HTTPException(
                    status.HTTP_409_CONFLICT,
                    detail=(
                        "Error updating deployment: Cannot update parameters because"
                        " parameter schema enforcement is enabled and the deployment"
                        " does not have a valid parameter schema."
                    ),
                )
            try:
                validate(
                    parameters, deployment.parameter_openapi_schema, raise_on_error=True
                )
            except ValidationError as exc:
                raise HTTPException(
                    status.HTTP_409_CONFLICT,
                    detail=f"Error creating flow run: {exc}",
                )
            except CircularSchemaRefError:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail="Invalid schema: Unable to validate schema with circular references.",
                )

        await validate_job_variables_for_deployment_flow_run(
            session, deployment, flow_run
        )

        work_queue_name = deployment.work_queue_name
        work_queue_id = deployment.work_queue_id

        if flow_run.work_queue_name:
            # can't mutate the ORM model or else it will commit the changes back
            if deployment.work_queue is None or deployment.work_queue.work_pool is None:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Cannot create flow run in work queue {flow_run.work_queue_name} because deployment {deployment_id} is not associated with a work pool. Please remove work_pool_name and try again.",
                )

            work_queue_id = await worker_lookups._get_work_queue_id_from_name(
                session=session,
                work_pool_name=deployment.work_queue.work_pool.name,
                work_queue_name=flow_run.work_queue_name,
                create_queue_if_not_found=True,
            )
            work_queue_name = flow_run.work_queue_name

        # hydrate the input model into a full flow run / state model
        flow_run = schemas.core.FlowRun(
            **flow_run.model_dump(
                exclude={
                    "parameters",
                    "tags",
                    "infrastructure_document_id",
                    "work_queue_name",
                    "enforce_parameter_schema",
                }
            ),
            flow_id=deployment.flow_id,
            deployment_id=deployment.id,
            deployment_version=deployment.version,
            parameters=parameters,
            tags=set(deployment.tags).union(flow_run.tags),
            infrastructure_document_id=(
                flow_run.infrastructure_document_id
                or deployment.infrastructure_document_id
            ),
            work_queue_name=work_queue_name,
            work_queue_id=work_queue_id,
            created_by=created_by,
        )

        if not flow_run.state:
            flow_run.state = schemas.states.Scheduled()

        right_now = now("UTC")
        model = await models.flow_runs.create_flow_run(
            session=session, flow_run=flow_run
        )
        if model.created >= right_now:
            response.status_code = status.HTTP_201_CREATED
        return schemas.responses.FlowRunResponse.model_validate(
            model, from_attributes=True
        )
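

# Illustrative sketch: triggering an ad-hoc run from a deployment via the route above.
# The deployment id, parameter, and tag are placeholders; omitted fields fall back to
# the deployment's own values.
async def _example_create_flow_run_request() -> None:  # pragma: no cover
    import httpx

    deployment_id = "00000000-0000-0000-0000-000000000000"  # placeholder
    async with httpx.AsyncClient(base_url="http://127.0.0.1:4200/api") as client:
        response = await client.post(
            f"/deployments/{deployment_id}/create_flow_run",
            json={
                "parameters": {"name": "world"},  # merged over deployment parameters
                "tags": ["ad-hoc"],  # unioned with the deployment's tags
            },
        )
        response.raise_for_status()
        # response.json() is a FlowRunResponse; it is SCHEDULED unless a state was sent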


# DEPRECATED
@router.get("/{id:uuid}/work_queue_check", deprecated=True)
async def work_queue_check_for_deployment(
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.core.WorkQueue]:
    """
    Get list of work-queues that are able to pick up the specified deployment.

    This endpoint is intended to be used by the UI to provide users warnings
    about deployments that are unable to be executed because there are no work
    queues that will pick up their runs, based on existing filter criteria. It
    may be deprecated in the future because there is not a strict relationship
    between work queues and deployments.
    """
    try:
        async with db.session_context() as session:
            work_queues = await models.deployments.check_work_queues_for_deployment(
                session=session, deployment_id=deployment_id
            )
    except ObjectNotFoundError:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found"
        )
    return work_queues


@router.get("/{id:uuid}/schedules")
async def read_deployment_schedules(
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.core.DeploymentSchedule]:
    async with db.session_context() as session:
        deployment = await models.deployments.read_deployment(
            session=session, deployment_id=deployment_id
        )

        if not deployment:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, detail="Deployment not found."
            )

        return await models.deployments.read_deployment_schedules(
            session=session,
            deployment_id=deployment.id,
        )


@router.post("/{id:uuid}/schedules", status_code=status.HTTP_201_CREATED)
async def create_deployment_schedules(
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    schedules: List[schemas.actions.DeploymentScheduleCreate] = Body(
        default=..., description="The schedules to create"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.core.DeploymentSchedule]:
    async with db.session_context(begin_transaction=True) as session:
        deployment = await models.deployments.read_deployment(
            session=session, deployment_id=deployment_id
        )

        if not deployment:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, detail="Deployment not found."
            )

        try:
            created = await models.deployments.create_deployment_schedules(
                session=session,
                deployment_id=deployment.id,
                schedules=schedules,
            )
        except sa.exc.IntegrityError as e:
            if "duplicate key value violates unique constraint" in str(e):
                raise HTTPException(
                    status.HTTP_409_CONFLICT,
                    detail="Schedule slugs must be unique within a deployment.",
                )
            raise
        return created
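

# Illustrative sketch: adding a cron schedule via the POST /schedules route above. The
# deployment id, cron string, and slug are placeholders; reusing a slug within the
# same deployment would trigger the 409 handled above.
async def _example_create_schedule_request() -> None:  # pragma: no cover
    import httpx

    deployment_id = "00000000-0000-0000-0000-000000000000"  # placeholder
    async with httpx.AsyncClient(base_url="http://127.0.0.1:4200/api") as client:
        response = await client.post(
            f"/deployments/{deployment_id}/schedules",
            json=[
                {
                    "schedule": {"cron": "0 9 * * *", "timezone": "UTC"},
                    "active": True,
                    "slug": "daily-9am",
                }
            ],
        )
        response.raise_for_status()  # 201 with the created schedules in the body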


@router.patch(
    "/{id:uuid}/schedules/{schedule_id:uuid}", status_code=status.HTTP_204_NO_CONTENT
)
async def update_deployment_schedule(
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    schedule_id: UUID = Path(..., description="The schedule id", alias="schedule_id"),
    schedule: schemas.actions.DeploymentScheduleUpdate = Body(
        default=..., description="The updated schedule"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    async with db.session_context(begin_transaction=True) as session:
        deployment = await models.deployments.read_deployment(
            session=session, deployment_id=deployment_id
        )

        if not deployment:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, detail="Deployment not found."
            )

        updated = await models.deployments.update_deployment_schedule(
            session=session,
            deployment_id=deployment_id,
            deployment_schedule_id=schedule_id,
            schedule=schedule,
        )

        if not updated:
            raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Schedule not found.")

        await models.deployments._delete_scheduled_runs(
            session=session,
            deployment_id=deployment_id,
            auto_scheduled_only=True,
            future_only=True,
        )


@router.delete(
    "/{id:uuid}/schedules/{schedule_id:uuid}", status_code=status.HTTP_204_NO_CONTENT
)
async def delete_deployment_schedule(
    deployment_id: UUID = Path(..., description="The deployment id", alias="id"),
    schedule_id: UUID = Path(..., description="The schedule id", alias="schedule_id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    async with db.session_context(begin_transaction=True) as session:
        deployment = await models.deployments.read_deployment(
            session=session, deployment_id=deployment_id
        )

        if not deployment:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, detail="Deployment not found."
            )

        deleted = await models.deployments.delete_deployment_schedule(
            session=session,
            deployment_id=deployment_id,
            deployment_schedule_id=schedule_id,
        )

        if not deleted:
            raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Schedule not found.")

        await models.deployments._delete_scheduled_runs(
            session=session,
            deployment_id=deployment_id,
            auto_scheduled_only=True,
        )