Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/schedules.py: 20%
31 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union 1a
3from prefect.client.schemas.actions import ( 1a
4 DeploymentScheduleCreate,
5 DeploymentScheduleUpdate,
6)
7from prefect.client.schemas.schedules import is_schedule_type 1a
8from prefect.schedules import Schedule 1a
10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a
11 from prefect.client.schemas.schedules import SCHEDULE_TYPES
13FlexibleScheduleList = Sequence[ 1a
14 Union[DeploymentScheduleCreate, dict[str, Any], "SCHEDULE_TYPES"]
15]
18def create_deployment_schedule_create( 1a
19 schedule: Union["SCHEDULE_TYPES", "Schedule"],
20 active: Optional[bool] = True,
21) -> DeploymentScheduleCreate:
22 """Create a DeploymentScheduleCreate object from common schedule parameters."""
24 if isinstance(schedule, Schedule):
25 return DeploymentScheduleCreate.from_schedule(schedule)
26 return DeploymentScheduleCreate(
27 schedule=schedule,
28 active=active if active is not None else True,
29 )
32def normalize_to_deployment_schedule( 1a
33 schedules: Optional["FlexibleScheduleList"],
34) -> List[Union[DeploymentScheduleCreate, DeploymentScheduleUpdate]]:
35 normalized: list[Union[DeploymentScheduleCreate, DeploymentScheduleUpdate]] = []
36 if schedules is not None:
37 for obj in schedules:
38 if is_schedule_type(obj):
39 normalized.append(create_deployment_schedule_create(obj))
40 elif isinstance(obj, Schedule):
41 normalized.append(create_deployment_schedule_create(obj))
42 elif isinstance(obj, dict):
43 normalized.append(create_deployment_schedule_create(**obj))
44 elif isinstance(obj, DeploymentScheduleCreate):
45 normalized.append(obj)
46 elif isinstance(obj, DeploymentScheduleUpdate):
47 normalized.append(obj)
48 elif _is_server_schema(obj):
49 raise ValueError(
50 "Server schema schedules are not supported. Please use "
51 "the schedule objects from `prefect.client.schemas.schedules`"
52 )
53 else:
54 raise ValueError(
55 "Invalid schedule provided. Must be a schedule object, a dict,"
56 "or a `DeploymentScheduleCreate` object"
57 )
59 return normalized
62def _is_server_schema(obj: Any): 1a
63 return obj.__class__.__module__.startswith("prefect.server.schemas")