Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/actions.py: 74%
376 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Reduced schemas for accepting API actions.
3"""
5from __future__ import annotations 1a
7import json 1a
8from copy import deepcopy 1a
9from typing import Annotated, Any, ClassVar, Dict, List, Optional, Union 1a
10from uuid import UUID, uuid4 1a
12from pydantic import ( 1a
13 AfterValidator,
14 ConfigDict,
15 Field,
16 field_validator,
17 model_validator,
18)
20import prefect.server.schemas as schemas 1a
21from prefect._internal.schemas.validators import ( 1a
22 get_or_create_run_name,
23 remove_old_deployment_fields,
24 validate_cache_key_length,
25 validate_max_metadata_length,
26 validate_name_present_on_nonanonymous_blocks,
27 validate_parameter_openapi_schema,
28 validate_parameters_conform_to_schema,
29 validate_parent_and_ref_diff,
30 validate_schedule_max_scheduled_runs,
31)
32from prefect.server.utilities.schemas import get_class_fields_only 1a
33from prefect.server.utilities.schemas.bases import PrefectBaseModel 1a
34from prefect.settings import PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS 1a
35from prefect.types import ( 1a
36 DateTime,
37 KeyValueLabels,
38 Name,
39 NonEmptyishName,
40 NonNegativeFloat,
41 NonNegativeInteger,
42 PositiveInteger,
43 StrictVariableValue,
44)
45from prefect.types._datetime import now 1a
46from prefect.types._schema import ParameterSchema 1a
47from prefect.types.names import ( 1a
48 ArtifactKey,
49 BlockDocumentName,
50 BlockTypeSlug,
51 VariableName,
52)
53from prefect.utilities.names import generate_slug 1a
54from prefect.utilities.templating import find_placeholders 1a
57class ActionBaseModel(PrefectBaseModel): 1a
58 model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") 1a
61class FlowCreate(ActionBaseModel): 1a
62 """Data used by the Prefect REST API to create a flow."""
64 name: Name = Field( 1a
65 default=..., description="The name of the flow", examples=["my-flow"]
66 )
67 tags: List[str] = Field( 1a
68 default_factory=list,
69 description="A list of flow tags",
70 examples=[["tag-1", "tag-2"]],
71 )
72 labels: Union[KeyValueLabels, None] = Field( 1a
73 default_factory=dict,
74 description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
75 examples=[{"key": "value1", "key2": 42}],
76 )
79class FlowUpdate(ActionBaseModel): 1a
80 """Data used by the Prefect REST API to update a flow."""
82 tags: List[str] = Field( 1a
83 default_factory=list,
84 description="A list of flow tags",
85 examples=[["tag-1", "tag-2"]],
86 )
89class DeploymentScheduleCreate(ActionBaseModel): 1a
90 active: bool = Field( 1a
91 default=True, description="Whether or not the schedule is active."
92 )
93 schedule: schemas.schedules.SCHEDULE_TYPES = Field( 1a
94 default=..., description="The schedule for the deployment."
95 )
96 max_scheduled_runs: Optional[PositiveInteger] = Field( 1a
97 default=None,
98 description="The maximum number of scheduled runs for the schedule.",
99 )
100 parameters: dict[str, Any] = Field( 1a
101 default_factory=dict, description="A dictionary of parameter value overrides."
102 )
103 slug: Optional[str] = Field( 1a
104 default=None,
105 description="A unique identifier for the schedule.",
106 )
108 @field_validator("max_scheduled_runs") 1a
109 @classmethod 1a
110 def validate_max_scheduled_runs( 1a
111 cls, v: PositiveInteger | None
112 ) -> PositiveInteger | None:
113 return validate_schedule_max_scheduled_runs(
114 v, PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS.value()
115 )
118class DeploymentScheduleUpdate(ActionBaseModel): 1a
119 active: Optional[bool] = Field( 1a
120 default=None, description="Whether or not the schedule is active."
121 )
122 schedule: Optional[schemas.schedules.SCHEDULE_TYPES] = Field( 1a
123 default=None, description="The schedule for the deployment."
124 )
126 max_scheduled_runs: Optional[PositiveInteger] = Field( 1a
127 default=None,
128 description="The maximum number of scheduled runs for the schedule.",
129 )
130 parameters: dict[str, Any] = Field( 1a
131 default_factory=dict, description="A dictionary of parameter value overrides."
132 )
133 slug: Optional[str] = Field( 1a
134 default=None,
135 description="A unique identifier for the schedule.",
136 )
138 @field_validator("max_scheduled_runs") 1a
139 @classmethod 1a
140 def validate_max_scheduled_runs( 1a
141 cls, v: PositiveInteger | None
142 ) -> PositiveInteger | None:
143 return validate_schedule_max_scheduled_runs(
144 v, PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS.value()
145 )
148class DeploymentCreate(ActionBaseModel): 1a
149 """Data used by the Prefect REST API to create a deployment."""
151 name: str = Field( 1a
152 default=...,
153 description="The name of the deployment.",
154 examples=["my-deployment"],
155 )
156 flow_id: UUID = Field( 1a
157 default=..., description="The ID of the flow associated with the deployment."
158 )
159 paused: bool = Field( 1a
160 default=False, description="Whether or not the deployment is paused."
161 )
162 schedules: list[DeploymentScheduleCreate] = Field( 1a
163 default_factory=lambda: [],
164 description="A list of schedules for the deployment.",
165 )
166 concurrency_limit: Optional[PositiveInteger] = Field( 1a
167 default=None, description="The deployment's concurrency limit."
168 )
169 concurrency_options: Optional[schemas.core.ConcurrencyOptions] = Field( 1a
170 default=None, description="The deployment's concurrency options."
171 )
172 global_concurrency_limit_id: Optional[UUID] = Field( 1a
173 default=None,
174 description="The ID of the global concurrency limit to apply to the deployment.",
175 )
176 enforce_parameter_schema: bool = Field( 1a
177 default=True,
178 description=(
179 "Whether or not the deployment should enforce the parameter schema."
180 ),
181 )
182 parameter_openapi_schema: Optional[ParameterSchema] = Field( 1a
183 default_factory=lambda: {"type": "object", "properties": {}},
184 description="The parameter schema of the flow, including defaults.",
185 json_schema_extra={"additionalProperties": True},
186 )
187 parameters: Dict[str, Any] = Field( 1a
188 default_factory=dict,
189 description="Parameters for flow runs scheduled by the deployment.",
190 json_schema_extra={"additionalProperties": True},
191 )
192 tags: List[str] = Field( 1a
193 default_factory=list,
194 description="A list of deployment tags.",
195 examples=[["tag-1", "tag-2"]],
196 )
197 labels: Union[KeyValueLabels, None] = Field( 1a
198 default_factory=dict,
199 description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
200 examples=[{"key": "value1", "key2": 42}],
201 )
202 pull_steps: Optional[List[dict[str, Any]]] = Field(None) 1a
204 work_queue_name: Optional[str] = Field(None) 1a
205 work_pool_name: Optional[str] = Field( 1a
206 default=None,
207 description="The name of the deployment's work pool.",
208 examples=["my-work-pool"],
209 )
210 storage_document_id: Optional[UUID] = Field(None) 1a
211 infrastructure_document_id: Optional[UUID] = Field(None) 1a
212 description: Optional[str] = Field(None) 1a
213 path: Optional[str] = Field(None) 1a
214 version: Optional[str] = Field(None) 1a
215 entrypoint: Optional[str] = Field(None) 1a
216 job_variables: Dict[str, Any] = Field( 1a
217 default_factory=dict,
218 description="Overrides for the flow's infrastructure configuration.",
219 json_schema_extra={"additionalProperties": True},
220 )
222 version_info: Optional[schemas.core.VersionInfo] = Field( 1a
223 default=None, description="A description of this version of the deployment."
224 )
226 def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None: 1a
227 """
228 Check that the combination of base_job_template defaults and job_variables
229 conforms to the specified schema.
231 NOTE: This method does not hydrate block references in default values within the
232 base job template to validate them. Failing to do this can cause user-facing
233 errors. Instead of this method, use `validate_job_variables_for_deployment`
234 function from `prefect_cloud.orion.api.validation`.
235 """
236 # This import is here to avoid a circular import
237 from prefect.utilities.schema_tools import validate
239 variables_schema = deepcopy(base_job_template.get("variables"))
241 if variables_schema is not None:
242 validate(
243 self.job_variables,
244 variables_schema,
245 raise_on_error=True,
246 preprocess=True,
247 ignore_required=True,
248 )
250 @model_validator(mode="before") 1a
251 @classmethod 1a
252 def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]: 1a
253 return remove_old_deployment_fields(values)
255 @model_validator(mode="before") 1a
256 def _validate_parameters_conform_to_schema( 1a
257 cls, values: dict[str, Any]
258 ) -> dict[str, Any]:
259 values["parameters"] = validate_parameters_conform_to_schema(
260 values.get("parameters", {}), values
261 )
262 schema = validate_parameter_openapi_schema(
263 values.get("parameter_openapi_schema"), values
264 )
265 if schema is not None:
266 values["parameter_openapi_schema"] = schema
267 return values
269 @model_validator(mode="before") 1a
270 def _validate_concurrency_limits(cls, values: dict[str, Any]) -> dict[str, Any]: 1a
271 """Validate that a deployment does not have both a concurrency limit and global concurrency limit."""
272 if values.get("concurrency_limit") and values.get(
273 "global_concurrency_limit_id"
274 ):
275 raise ValueError(
276 "A deployment cannot have both a concurrency limit and a global concurrency limit."
277 )
278 return values
281class DeploymentUpdate(ActionBaseModel): 1a
282 """Data used by the Prefect REST API to update a deployment."""
284 @model_validator(mode="before") 1a
285 @classmethod 1a
286 def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]: 1a
287 return remove_old_deployment_fields(values)
289 version: Optional[str] = Field(default=None) 1a
290 description: Optional[str] = Field(default=None) 1a
291 paused: bool = Field( 1a
292 default=False, description="Whether or not the deployment is paused."
293 )
294 schedules: list[DeploymentScheduleUpdate] = Field( 1a
295 default_factory=lambda: [],
296 description="A list of schedules for the deployment.",
297 )
298 concurrency_limit: Optional[PositiveInteger] = Field( 1a
299 default=None, description="The deployment's concurrency limit."
300 )
301 concurrency_options: Optional[schemas.core.ConcurrencyOptions] = Field( 1a
302 default=None, description="The deployment's concurrency options."
303 )
304 global_concurrency_limit_id: Optional[UUID] = Field( 1a
305 default=None,
306 description="The ID of the global concurrency limit to apply to the deployment.",
307 )
308 parameters: Optional[Dict[str, Any]] = Field( 1a
309 default=None,
310 description="Parameters for flow runs scheduled by the deployment.",
311 )
312 parameter_openapi_schema: Optional[ParameterSchema] = Field( 1a
313 default=None,
314 description="The parameter schema of the flow, including defaults.",
315 )
316 tags: List[str] = Field( 1a
317 default_factory=list,
318 description="A list of deployment tags.",
319 examples=[["tag-1", "tag-2"]],
320 )
321 work_queue_name: Optional[str] = Field(default=None) 1a
322 work_pool_name: Optional[str] = Field( 1a
323 default=None,
324 description="The name of the deployment's work pool.",
325 examples=["my-work-pool"],
326 )
327 path: Optional[str] = Field(default=None) 1a
328 job_variables: Optional[Dict[str, Any]] = Field( 1a
329 default=None,
330 description="Overrides for the flow's infrastructure configuration.",
331 )
332 pull_steps: Optional[List[dict[str, Any]]] = Field(default=None) 1a
333 entrypoint: Optional[str] = Field(default=None) 1a
334 storage_document_id: Optional[UUID] = Field(default=None) 1a
335 infrastructure_document_id: Optional[UUID] = Field(default=None) 1a
336 enforce_parameter_schema: Optional[bool] = Field( 1a
337 default=None,
338 description=(
339 "Whether or not the deployment should enforce the parameter schema."
340 ),
341 )
342 model_config: ClassVar[ConfigDict] = ConfigDict(populate_by_name=True) 1a
344 version_info: Optional[schemas.core.VersionInfo] = Field( 1a
345 default=None, description="A description of this version of the deployment."
346 )
348 def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None: 1a
349 """
350 Check that the combination of base_job_template defaults and job_variables
351 conforms to the schema specified in the base_job_template.
353 NOTE: This method does not hydrate block references in default values within the
354 base job template to validate them. Failing to do this can cause user-facing
355 errors. Instead of this method, use `validate_job_variables_for_deployment`
356 function from `prefect_cloud.orion.api.validation`.
357 """
358 # This import is here to avoid a circular import
359 from prefect.utilities.schema_tools import validate
361 variables_schema = deepcopy(base_job_template.get("variables"))
363 if variables_schema is not None and self.job_variables is not None:
364 errors = validate(
365 self.job_variables,
366 variables_schema,
367 raise_on_error=False,
368 preprocess=True,
369 ignore_required=True,
370 )
371 if errors:
372 for error in errors:
373 raise error
375 @model_validator(mode="before") 1a
376 def _validate_concurrency_limits(cls, values: dict[str, Any]) -> dict[str, Any]: 1a
377 """Validate that a deployment does not have both a concurrency limit and global concurrency limit."""
378 if values.get("concurrency_limit") and values.get(
379 "global_concurrency_limit_id"
380 ):
381 raise ValueError(
382 "A deployment cannot have both a concurrency limit and a global concurrency limit."
383 )
384 return values
387class FlowRunUpdate(ActionBaseModel): 1a
388 """Data used by the Prefect REST API to update a flow run."""
390 name: Optional[str] = Field(None) 1a
391 flow_version: Optional[str] = Field(None) 1a
392 parameters: Dict[str, Any] = Field(default_factory=dict) 1a
393 empirical_policy: schemas.core.FlowRunPolicy = Field( 1a
394 default_factory=schemas.core.FlowRunPolicy
395 )
396 tags: List[str] = Field(default_factory=list) 1a
397 infrastructure_pid: Optional[str] = Field(None) 1a
398 job_variables: Optional[Dict[str, Any]] = Field(None) 1a
400 @field_validator("name", mode="before") 1a
401 @classmethod 1a
402 def set_name(cls, name: str) -> str: 1a
403 return get_or_create_run_name(name)
406class StateCreate(ActionBaseModel): 1a
407 """Data used by the Prefect REST API to create a new state."""
409 type: schemas.states.StateType = Field( 1a
410 default=..., description="The type of the state to create"
411 )
412 name: Optional[str] = Field( 1a
413 default=None, description="The name of the state to create"
414 )
415 message: Optional[str] = Field( 1a
416 default=None, description="The message of the state to create"
417 )
418 data: Optional[Any] = Field( 1a
419 default=None, description="The data of the state to create"
420 )
421 state_details: schemas.states.StateDetails = Field( 1a
422 default_factory=schemas.states.StateDetails,
423 description="The details of the state to create",
424 )
426 @model_validator(mode="after") 1a
427 def default_name_from_type(self): 1a
428 """If a name is not provided, use the type"""
429 # if `type` is not in `values` it means the `type` didn't pass its own
430 # validation check and an error will be raised after this function is called
431 name = self.name
432 if name is None and self.type:
433 self.name = " ".join([v.capitalize() for v in self.type.value.split("_")])
434 return self
436 @model_validator(mode="after") 1a
437 def default_scheduled_start_time(self): 1a
438 from prefect.server.schemas.states import StateType
440 if self.type == StateType.SCHEDULED:
441 if not self.state_details.scheduled_time:
442 self.state_details.scheduled_time = now("UTC")
444 return self
447class TaskRunCreate(ActionBaseModel): 1a
448 """Data used by the Prefect REST API to create a task run"""
450 id: Optional[UUID] = Field( 1a
451 default=None,
452 description="The ID to assign to the task run. If not provided, a random UUID will be generated.",
453 )
454 # TaskRunCreate states must be provided as StateCreate objects
455 state: Optional[StateCreate] = Field( 1a
456 default=None, description="The state of the task run to create"
457 )
459 name: str = Field( 1a
460 default_factory=lambda: generate_slug(2), examples=["my-task-run"]
461 )
462 flow_run_id: Optional[UUID] = Field( 1a
463 default=None, description="The flow run id of the task run."
464 )
465 task_key: str = Field( 1a
466 default=..., description="A unique identifier for the task being run."
467 )
468 dynamic_key: str = Field( 1a
469 default=...,
470 description=(
471 "A dynamic key used to differentiate between multiple runs of the same task"
472 " within the same flow run."
473 ),
474 )
475 cache_key: Optional[str] = Field( 1a
476 default=None,
477 description=(
478 "An optional cache key. If a COMPLETED state associated with this cache key"
479 " is found, the cached COMPLETED state will be used instead of executing"
480 " the task run."
481 ),
482 )
483 cache_expiration: Optional[DateTime] = Field( 1a
484 default=None, description="Specifies when the cached state should expire."
485 )
486 task_version: Optional[str] = Field( 1a
487 default=None, description="The version of the task being run."
488 )
489 empirical_policy: schemas.core.TaskRunPolicy = Field( 1a
490 default_factory=schemas.core.TaskRunPolicy,
491 )
492 tags: List[str] = Field( 1a
493 default_factory=list,
494 description="A list of tags for the task run.",
495 examples=[["tag-1", "tag-2"]],
496 )
497 labels: Union[KeyValueLabels, None] = Field( 1a
498 default_factory=dict,
499 description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
500 examples=[{"key": "value1", "key2": 42}],
501 )
502 task_inputs: Dict[ 1a
503 str,
504 List[
505 Union[
506 schemas.core.TaskRunResult,
507 schemas.core.FlowRunResult,
508 schemas.core.Parameter,
509 schemas.core.Constant,
510 ]
511 ],
512 ] = Field(
513 default_factory=dict,
514 description="The inputs to the task run.",
515 )
517 @field_validator("name", mode="before") 1a
518 @classmethod 1a
519 def set_name(cls, name: str) -> str: 1a
520 return get_or_create_run_name(name)
522 @field_validator("cache_key") 1a
523 @classmethod 1a
524 def validate_cache_key(cls, cache_key: str | None) -> str | None: 1a
525 return validate_cache_key_length(cache_key)
528class TaskRunUpdate(ActionBaseModel): 1a
529 """Data used by the Prefect REST API to update a task run"""
531 name: str = Field( 1a
532 default_factory=lambda: generate_slug(2), examples=["my-task-run"]
533 )
535 @field_validator("name", mode="before") 1a
536 @classmethod 1a
537 def set_name(cls, name: str) -> str: 1a
538 return get_or_create_run_name(name)
541class FlowRunCreate(ActionBaseModel): 1a
542 """Data used by the Prefect REST API to create a flow run."""
544 # FlowRunCreate states must be provided as StateCreate objects
545 state: Optional[StateCreate] = Field( 1a
546 default=None, description="The state of the flow run to create"
547 )
549 name: str = Field( 1a
550 default_factory=lambda: generate_slug(2),
551 description=(
552 "The name of the flow run. Defaults to a random slug if not specified."
553 ),
554 examples=["my-flow-run"],
555 )
556 flow_id: UUID = Field(default=..., description="The id of the flow being run.") 1a
557 flow_version: Optional[str] = Field( 1a
558 default=None, description="The version of the flow being run."
559 )
560 parameters: Dict[str, Any] = Field( 1a
561 default_factory=dict,
562 )
563 context: Dict[str, Any] = Field( 1a
564 default_factory=dict,
565 description="The context of the flow run.",
566 )
567 parent_task_run_id: Optional[UUID] = Field(None) 1a
568 infrastructure_document_id: Optional[UUID] = Field(None) 1a
569 empirical_policy: schemas.core.FlowRunPolicy = Field( 1a
570 default_factory=schemas.core.FlowRunPolicy,
571 description="The empirical policy for the flow run.",
572 )
573 tags: List[str] = Field( 1a
574 default_factory=list,
575 description="A list of tags for the flow run.",
576 examples=[["tag-1", "tag-2"]],
577 )
578 labels: Union[KeyValueLabels, None] = Field( 1a
579 default_factory=dict,
580 description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
581 examples=[{"key": "value1", "key2": 42}],
582 )
583 idempotency_key: Optional[str] = Field( 1a
584 None,
585 description=(
586 "An optional idempotency key. If a flow run with the same idempotency key"
587 " has already been created, the existing flow run will be returned."
588 ),
589 )
590 work_pool_name: Optional[str] = Field( 1a
591 default=None,
592 description="The name of the work pool to run the flow run in.",
593 )
594 work_queue_name: Optional[str] = Field( 1a
595 default=None,
596 description="The name of the work queue to place the flow run in.",
597 )
598 job_variables: Optional[Dict[str, Any]] = Field( 1a
599 default=None,
600 description="The job variables to use when setting up flow run infrastructure.",
601 )
603 # DEPRECATED
605 deployment_id: Optional[UUID] = Field( 1a
606 None,
607 description=(
608 "DEPRECATED: The id of the deployment associated with this flow run, if"
609 " available."
610 ),
611 deprecated=True,
612 )
614 @field_validator("name", mode="before") 1a
615 @classmethod 1a
616 def set_name(cls, name: str) -> str: 1a
617 return get_or_create_run_name(name)
620class DeploymentFlowRunCreate(ActionBaseModel): 1a
621 """Data used by the Prefect REST API to create a flow run from a deployment."""
623 # FlowRunCreate states must be provided as StateCreate objects
624 state: Optional[StateCreate] = Field( 1a
625 default=None, description="The state of the flow run to create"
626 )
628 name: str = Field( 1a
629 default_factory=lambda: generate_slug(2),
630 description=(
631 "The name of the flow run. Defaults to a random slug if not specified."
632 ),
633 examples=["my-flow-run"],
634 )
635 parameters: Dict[str, Any] = Field( 1a
636 default_factory=dict,
637 json_schema_extra={"additionalProperties": True},
638 )
639 enforce_parameter_schema: Optional[bool] = Field( 1a
640 default=None,
641 description="Whether or not to enforce the parameter schema on this run.",
642 )
643 context: Dict[str, Any] = Field(default_factory=dict) 1a
644 infrastructure_document_id: Optional[UUID] = Field(None) 1a
645 empirical_policy: schemas.core.FlowRunPolicy = Field( 1a
646 default_factory=schemas.core.FlowRunPolicy,
647 description="The empirical policy for the flow run.",
648 )
649 tags: List[str] = Field( 1a
650 default_factory=list,
651 description="A list of tags for the flow run.",
652 examples=[["tag-1", "tag-2"]],
653 )
654 idempotency_key: Optional[str] = Field( 1a
655 None,
656 description=(
657 "An optional idempotency key. If a flow run with the same idempotency key"
658 " has already been created, the existing flow run will be returned."
659 ),
660 )
661 labels: Union[KeyValueLabels, None] = Field( 1a
662 None,
663 description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
664 examples=[{"key": "value1", "key2": 42}],
665 )
666 parent_task_run_id: Optional[UUID] = Field(None) 1a
667 work_queue_name: Optional[str] = Field(None) 1a
668 job_variables: Optional[Dict[str, Any]] = Field( 1a
669 default_factory=dict,
670 json_schema_extra={"additionalProperties": True},
671 )
673 @field_validator("name", mode="before") 1a
674 @classmethod 1a
675 def set_name(cls, name: str) -> str: 1a
676 return get_or_create_run_name(name)
679class SavedSearchCreate(ActionBaseModel): 1a
680 """Data used by the Prefect REST API to create a saved search."""
682 name: str = Field(default=..., description="The name of the saved search.") 1a
683 filters: list[schemas.core.SavedSearchFilter] = Field( 1a
684 default_factory=lambda: [], description="The filter set for the saved search."
685 )
688class ConcurrencyLimitCreate(ActionBaseModel): 1a
689 """Data used by the Prefect REST API to create a concurrency limit."""
691 tag: str = Field( 1a
692 default=..., description="A tag the concurrency limit is applied to."
693 )
694 concurrency_limit: int = Field(default=..., description="The concurrency limit.") 1a
697class ConcurrencyLimitV2Create(ActionBaseModel): 1a
698 """Data used by the Prefect REST API to create a v2 concurrency limit."""
700 active: bool = Field( 1a
701 default=True, description="Whether the concurrency limit is active."
702 )
703 name: Name = Field(default=..., description="The name of the concurrency limit.") 1a
704 limit: NonNegativeInteger = Field(default=..., description="The concurrency limit.") 1a
705 active_slots: NonNegativeInteger = Field( 1a
706 default=0, description="The number of active slots."
707 )
708 denied_slots: NonNegativeInteger = Field( 1a
709 default=0, description="The number of denied slots."
710 )
711 slot_decay_per_second: NonNegativeFloat = Field( 1a
712 default=0,
713 description="The decay rate for active slots when used as a rate limit.",
714 )
717class ConcurrencyLimitV2Update(ActionBaseModel): 1a
718 """Data used by the Prefect REST API to update a v2 concurrency limit."""
720 active: Optional[bool] = Field(None) 1a
721 name: Optional[Name] = Field(None) 1a
722 limit: Optional[NonNegativeInteger] = Field(None) 1a
723 active_slots: Optional[NonNegativeInteger] = Field(None) 1a
724 denied_slots: Optional[NonNegativeInteger] = Field(None) 1a
725 slot_decay_per_second: Optional[NonNegativeFloat] = Field(None) 1a
728class BlockTypeCreate(ActionBaseModel): 1a
729 """Data used by the Prefect REST API to create a block type."""
731 name: Name = Field(default=..., description="A block type's name") 1a
732 slug: BlockTypeSlug = Field(default=..., description="A block type's slug") 1a
733 logo_url: Optional[str] = Field( # TODO: HttpUrl 1a
734 default=None, description="Web URL for the block type's logo"
735 )
736 documentation_url: Optional[str] = Field( # TODO: HttpUrl 1a
737 default=None, description="Web URL for the block type's documentation"
738 )
739 description: Optional[str] = Field( 1a
740 default=None,
741 description="A short blurb about the corresponding block's intended use",
742 )
743 code_example: Optional[str] = Field( 1a
744 default=None,
745 description="A code snippet demonstrating use of the corresponding block",
746 )
749class BlockTypeUpdate(ActionBaseModel): 1a
750 """Data used by the Prefect REST API to update a block type."""
752 logo_url: Optional[str] = Field(None) # TODO: HttpUrl 1a
753 documentation_url: Optional[str] = Field(None) # TODO: HttpUrl 1a
754 description: Optional[str] = Field(None) 1a
755 code_example: Optional[str] = Field(None) 1a
757 @classmethod 1a
758 def updatable_fields(cls) -> set[str]: 1a
759 return get_class_fields_only(cls)
762class BlockSchemaCreate(ActionBaseModel): 1a
763 """Data used by the Prefect REST API to create a block schema."""
765 fields: dict[str, Any] = Field( 1a
766 default_factory=dict, description="The block schema's field schema"
767 )
768 block_type_id: UUID = Field(default=..., description="A block type ID") 1a
770 capabilities: List[str] = Field( 1a
771 default_factory=list,
772 description="A list of Block capabilities",
773 )
774 version: str = Field( 1a
775 default=schemas.core.DEFAULT_BLOCK_SCHEMA_VERSION,
776 description="Human readable identifier for the block schema",
777 )
780class BlockDocumentCreate(ActionBaseModel): 1a
781 """Data used by the Prefect REST API to create a block document."""
783 name: Optional[BlockDocumentName] = Field( 1a
784 default=None,
785 description=(
786 "The block document's name. Not required for anonymous block documents."
787 ),
788 )
789 data: Dict[str, Any] = Field( 1a
790 default_factory=dict, description="The block document's data"
791 )
792 block_schema_id: UUID = Field(default=..., description="A block schema ID") 1a
794 block_type_id: UUID = Field(default=..., description="A block type ID") 1a
796 is_anonymous: bool = Field( 1a
797 default=False,
798 description=(
799 "Whether the block is anonymous (anonymous blocks are usually created by"
800 " Prefect automatically)"
801 ),
802 )
804 @model_validator(mode="before") 1a
805 def validate_name_is_present_if_not_anonymous( 1a
806 cls, values: dict[str, Any]
807 ) -> dict[str, Any]:
808 return validate_name_present_on_nonanonymous_blocks(values)
811class BlockDocumentUpdate(ActionBaseModel): 1a
812 """Data used by the Prefect REST API to update a block document."""
814 block_schema_id: Optional[UUID] = Field( 1a
815 default=None, description="A block schema ID"
816 )
817 data: Dict[str, Any] = Field( 1a
818 default_factory=dict, description="The block document's data"
819 )
820 merge_existing_data: bool = True 1a
823class BlockDocumentReferenceCreate(ActionBaseModel): 1a
824 """Data used to create block document reference."""
826 id: UUID = Field( 1a
827 default_factory=uuid4, description="The block document reference ID"
828 )
829 parent_block_document_id: UUID = Field( 1a
830 default=..., description="ID of the parent block document"
831 )
832 reference_block_document_id: UUID = Field( 1a
833 default=..., description="ID of the nested block document"
834 )
835 name: str = Field( 1a
836 default=..., description="The name that the reference is nested under"
837 )
839 @model_validator(mode="before") 1a
840 def validate_parent_and_ref_are_different(cls, values): 1a
841 return validate_parent_and_ref_diff(values)
844class LogCreate(ActionBaseModel): 1a
845 """Data used by the Prefect REST API to create a log."""
847 name: str = Field(default=..., description="The logger name.") 1a
848 level: int = Field(default=..., description="The log level.") 1a
849 message: str = Field(default=..., description="The log message.") 1a
850 timestamp: DateTime = Field(default=..., description="The log timestamp.") 1a
851 flow_run_id: Optional[UUID] = Field(None) 1a
852 task_run_id: Optional[UUID] = Field(None) 1a
855def validate_base_job_template(v: dict[str, Any]) -> dict[str, Any]: 1a
856 if v == dict():
857 return v
859 job_config = v.get("job_configuration")
860 variables_schema = v.get("variables")
861 if not (job_config and variables_schema):
862 raise ValueError(
863 "The `base_job_template` must contain both a `job_configuration` key"
864 " and a `variables` key."
865 )
866 template_variables: set[str] = set()
867 for template in job_config.values():
868 # find any variables inside of double curly braces, minus any whitespace
869 # e.g. "{{ var1 }}.{{var2}}" -> ["var1", "var2"]
870 # convert to json string to handle nested objects and lists
871 found_variables = find_placeholders(json.dumps(template))
872 template_variables.update({placeholder.name for placeholder in found_variables})
874 provided_variables = set(variables_schema.get("properties", {}).keys())
875 if not template_variables.issubset(provided_variables):
876 missing_variables = template_variables - provided_variables
877 raise ValueError(
878 "The variables specified in the job configuration template must be "
879 "present as properties in the variables schema. "
880 "Your job configuration uses the following undeclared "
881 f"variable(s): {' ,'.join(missing_variables)}."
882 )
883 return v
886class WorkPoolCreate(ActionBaseModel): 1a
887 """Data used by the Prefect REST API to create a work pool."""
889 name: NonEmptyishName = Field(..., description="The name of the work pool.") 1a
890 description: Optional[str] = Field( 1a
891 default=None, description="The work pool description."
892 )
893 type: str = Field(description="The work pool type.", default="prefect-agent") 1a
894 base_job_template: Dict[str, Any] = Field( 1a
895 default_factory=dict, description="The work pool's base job template."
896 )
897 is_paused: bool = Field( 1a
898 default=False,
899 description="Pausing the work pool stops the delivery of all work.",
900 )
901 concurrency_limit: Optional[NonNegativeInteger] = Field( 1a
902 default=None, description="A concurrency limit for the work pool."
903 )
905 storage_configuration: schemas.core.WorkPoolStorageConfiguration = Field( 1a
906 default_factory=schemas.core.WorkPoolStorageConfiguration,
907 description="The storage configuration for the work pool.",
908 )
910 _validate_base_job_template = field_validator("base_job_template")( 1a
911 validate_base_job_template
912 )
915class WorkPoolUpdate(ActionBaseModel): 1a
916 """Data used by the Prefect REST API to update a work pool."""
918 description: Optional[str] = Field(default=None) 1a
919 is_paused: Optional[bool] = Field(default=None) 1a
920 base_job_template: Optional[Dict[str, Any]] = Field(default=None) 1a
921 concurrency_limit: Optional[NonNegativeInteger] = Field(default=None) 1a
922 storage_configuration: Optional[schemas.core.WorkPoolStorageConfiguration] = Field( 1a
923 default=None,
924 description="The storage configuration for the work pool.",
925 )
926 _validate_base_job_template = field_validator("base_job_template")( 1a
927 validate_base_job_template
928 )
931class WorkQueueCreate(ActionBaseModel): 1a
932 """Data used by the Prefect REST API to create a work queue."""
934 name: Name = Field(default=..., description="The name of the work queue.") 1a
935 description: Optional[str] = Field( 1a
936 default="", description="An optional description for the work queue."
937 )
938 is_paused: bool = Field( 1a
939 default=False, description="Whether or not the work queue is paused."
940 )
941 concurrency_limit: Optional[NonNegativeInteger] = Field( 1a
942 None, description="The work queue's concurrency limit."
943 )
944 priority: Optional[PositiveInteger] = Field( 1a
945 None,
946 description=(
947 "The queue's priority. Lower values are higher priority (1 is the highest)."
948 ),
949 )
951 # DEPRECATED
953 filter: Optional[schemas.core.QueueFilter] = Field( 1a
954 None,
955 description="DEPRECATED: Filter criteria for the work queue.",
956 deprecated=True,
957 )
960class WorkQueueUpdate(ActionBaseModel): 1a
961 """Data used by the Prefect REST API to update a work queue."""
963 name: Optional[str] = Field(None) 1a
964 description: Optional[str] = Field(None) 1a
965 is_paused: bool = Field( 1a
966 default=False, description="Whether or not the work queue is paused."
967 )
968 concurrency_limit: Optional[NonNegativeInteger] = Field(None) 1a
969 priority: Optional[PositiveInteger] = Field(None) 1a
970 last_polled: Optional[DateTime] = Field(None) 1a
972 # DEPRECATED
974 filter: Optional[schemas.core.QueueFilter] = Field( 1a
975 None,
976 description="DEPRECATED: Filter criteria for the work queue.",
977 deprecated=True,
978 )
981class ArtifactCreate(ActionBaseModel): 1a
982 """Data used by the Prefect REST API to create an artifact."""
984 key: Optional[ArtifactKey] = Field( 1a
985 default=None, description="An optional unique reference key for this artifact."
986 )
987 type: Optional[str] = Field( 1a
988 default=None,
989 description=(
990 "An identifier that describes the shape of the data field. e.g. 'result',"
991 " 'table', 'markdown'"
992 ),
993 )
994 description: Optional[str] = Field( 1a
995 default=None, description="A markdown-enabled description of the artifact."
996 )
997 data: Optional[Union[Dict[str, Any], Any]] = Field( 1a
998 default=None,
999 description=(
1000 "Data associated with the artifact, e.g. a result.; structure depends on"
1001 " the artifact type."
1002 ),
1003 )
1004 metadata_: Optional[ 1a
1005 Annotated[dict[str, str], AfterValidator(validate_max_metadata_length)]
1006 ] = Field(
1007 default=None,
1008 description=(
1009 "User-defined artifact metadata. Content must be string key and value"
1010 " pairs."
1011 ),
1012 )
1013 flow_run_id: Optional[UUID] = Field( 1a
1014 default=None, description="The flow run associated with the artifact."
1015 )
1016 task_run_id: Optional[UUID] = Field( 1a
1017 default=None, description="The task run associated with the artifact."
1018 )
1020 @classmethod 1a
1021 def from_result(cls, data: Any | dict[str, Any]) -> "ArtifactCreate": 1a
1022 artifact_info: dict[str, Any] = dict()
1023 if isinstance(data, dict):
1024 artifact_key = data.pop("artifact_key", None)
1025 if artifact_key:
1026 artifact_info["key"] = artifact_key
1028 artifact_type = data.pop("artifact_type", None)
1029 if artifact_type:
1030 artifact_info["type"] = artifact_type
1032 description = data.pop("artifact_description", None)
1033 if description:
1034 artifact_info["description"] = description
1036 return cls(data=data, **artifact_info)
1039class ArtifactUpdate(ActionBaseModel): 1a
1040 """Data used by the Prefect REST API to update an artifact."""
1042 data: Optional[Union[Dict[str, Any], Any]] = Field(None) 1a
1043 description: Optional[str] = Field(None) 1a
1044 metadata_: Optional[ 1a
1045 Annotated[dict[str, str], AfterValidator(validate_max_metadata_length)]
1046 ] = Field(None)
1049class VariableCreate(ActionBaseModel): 1a
1050 """Data used by the Prefect REST API to create a Variable."""
1052 name: VariableName = Field(default=...) 1a
1053 value: StrictVariableValue = Field( 1a
1054 default=...,
1055 description="The value of the variable",
1056 examples=["my-value"],
1057 )
1058 tags: list[str] = Field( 1a
1059 default_factory=list,
1060 description="A list of variable tags",
1061 examples=[["tag-1", "tag-2"]],
1062 )
1065class VariableUpdate(ActionBaseModel): 1a
1066 """Data used by the Prefect REST API to update a Variable."""
1068 name: Optional[VariableName] = Field(default=None) 1a
1069 value: StrictVariableValue = Field( 1a
1070 default=None,
1071 description="The value of the variable",
1072 examples=["my-value"],
1073 )
1074 tags: Optional[list[str]] = Field( 1a
1075 default=None,
1076 description="A list of variable tags",
1077 examples=[["tag-1", "tag-2"]],
1078 )