Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_schedules.py: 19%

49 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import json 1a

4from datetime import timedelta 1a

5from typing import Any 1a

6 

7import prefect.cli.root as root 1a

8from prefect.cli._prompts import prompt_schedules 1a

9from prefect.cli.root import app 1a

10from prefect.client.schemas.schedules import ( 1a

11 CronSchedule, 

12 IntervalSchedule, 

13 RRuleSchedule, 

14) 

15from prefect.types._datetime import parse_datetime 1a

16from prefect.utilities.annotations import NotSet 1a

17from prefect.utilities.templating import apply_values 1a

18 

19 

20def _construct_schedules( 1a

21 deploy_config: dict[str, Any], 

22 step_outputs: dict[str, Any], 

23) -> list[dict[str, Any]]: 

24 schedules: list[dict[str, Any]] = [] 

25 schedule_configs = deploy_config.get("schedules", NotSet) or [] 

26 

27 if schedule_configs is not NotSet: 

28 schedules = [ 

29 _schedule_config_to_deployment_schedule(schedule_config) 

30 for schedule_config in apply_values(schedule_configs, step_outputs) 

31 ] 

32 elif schedule_configs is NotSet: 

33 if root.is_interactive(): 

34 schedules = prompt_schedules(app.console) 

35 

36 return schedules 

37 

38 

39def _schedule_config_to_deployment_schedule( 1a

40 schedule_config: dict[str, Any], 

41) -> dict[str, Any]: 

42 anchor_date = schedule_config.get("anchor_date") 

43 timezone = schedule_config.get("timezone") 

44 schedule_active = schedule_config.get("active") 

45 parameters = schedule_config.get("parameters", {}) 

46 slug = schedule_config.get("slug") 

47 

48 if cron := schedule_config.get("cron"): 

49 day_or = schedule_config.get("day_or") 

50 cron_kwargs = {"cron": cron, "timezone": timezone, "day_or": day_or} 

51 schedule = CronSchedule( 

52 **{k: v for k, v in cron_kwargs.items() if v is not None} 

53 ) 

54 elif interval := schedule_config.get("interval"): 

55 interval_kwargs = { 

56 "interval": timedelta(seconds=interval), 

57 "anchor_date": parse_datetime(anchor_date) if anchor_date else None, 

58 "timezone": timezone, 

59 } 

60 schedule = IntervalSchedule( 

61 **{k: v for k, v in interval_kwargs.items() if v is not None} 

62 ) 

63 elif rrule := schedule_config.get("rrule"): 

64 try: 

65 schedule = RRuleSchedule(**json.loads(rrule)) 

66 if timezone: 

67 schedule.timezone = timezone 

68 except json.JSONDecodeError: 

69 schedule = RRuleSchedule(rrule=rrule, timezone=timezone) 

70 else: 

71 raise ValueError( 

72 f"Unknown schedule type. Please provide a valid schedule. schedule={schedule_config}" 

73 ) 

74 

75 schedule_obj: dict[str, Any] = {"schedule": schedule} 

76 if schedule_active is not None: 

77 schedule_obj["active"] = schedule_active 

78 if parameters: 

79 schedule_obj["parameters"] = parameters 

80 if slug: 

81 schedule_obj["slug"] = slug 

82 

83 return schedule_obj