Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/schedules.py: 20%

31 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union 1a

2 

3from prefect.client.schemas.actions import ( 1a

4 DeploymentScheduleCreate, 

5 DeploymentScheduleUpdate, 

6) 

7from prefect.client.schemas.schedules import is_schedule_type 1a

8from prefect.schedules import Schedule 1a

9 

10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a

11 from prefect.client.schemas.schedules import SCHEDULE_TYPES 

12 

13FlexibleScheduleList = Sequence[ 1a

14 Union[DeploymentScheduleCreate, dict[str, Any], "SCHEDULE_TYPES"] 

15] 

16 

17 

18def create_deployment_schedule_create( 1a

19 schedule: Union["SCHEDULE_TYPES", "Schedule"], 

20 active: Optional[bool] = True, 

21) -> DeploymentScheduleCreate: 

22 """Create a DeploymentScheduleCreate object from common schedule parameters.""" 

23 

24 if isinstance(schedule, Schedule): 

25 return DeploymentScheduleCreate.from_schedule(schedule) 

26 return DeploymentScheduleCreate( 

27 schedule=schedule, 

28 active=active if active is not None else True, 

29 ) 

30 

31 

32def normalize_to_deployment_schedule( 1a

33 schedules: Optional["FlexibleScheduleList"], 

34) -> List[Union[DeploymentScheduleCreate, DeploymentScheduleUpdate]]: 

35 normalized: list[Union[DeploymentScheduleCreate, DeploymentScheduleUpdate]] = [] 

36 if schedules is not None: 

37 for obj in schedules: 

38 if is_schedule_type(obj): 

39 normalized.append(create_deployment_schedule_create(obj)) 

40 elif isinstance(obj, Schedule): 

41 normalized.append(create_deployment_schedule_create(obj)) 

42 elif isinstance(obj, dict): 

43 normalized.append(create_deployment_schedule_create(**obj)) 

44 elif isinstance(obj, DeploymentScheduleCreate): 

45 normalized.append(obj) 

46 elif isinstance(obj, DeploymentScheduleUpdate): 

47 normalized.append(obj) 

48 elif _is_server_schema(obj): 

49 raise ValueError( 

50 "Server schema schedules are not supported. Please use " 

51 "the schedule objects from `prefect.client.schemas.schedules`" 

52 ) 

53 else: 

54 raise ValueError( 

55 "Invalid schedule provided. Must be a schedule object, a dict," 

56 "or a `DeploymentScheduleCreate` object" 

57 ) 

58 

59 return normalized 

60 

61 

62def _is_server_schema(obj: Any): 1a

63 return obj.__class__.__module__.startswith("prefect.server.schemas")