Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_schedules.py: 19%
49 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import json 1a
4from datetime import timedelta 1a
5from typing import Any 1a
7import prefect.cli.root as root 1a
8from prefect.cli._prompts import prompt_schedules 1a
9from prefect.cli.root import app 1a
10from prefect.client.schemas.schedules import ( 1a
11 CronSchedule,
12 IntervalSchedule,
13 RRuleSchedule,
14)
15from prefect.types._datetime import parse_datetime 1a
16from prefect.utilities.annotations import NotSet 1a
17from prefect.utilities.templating import apply_values 1a
20def _construct_schedules( 1a
21 deploy_config: dict[str, Any],
22 step_outputs: dict[str, Any],
23) -> list[dict[str, Any]]:
24 schedules: list[dict[str, Any]] = []
25 schedule_configs = deploy_config.get("schedules", NotSet) or []
27 if schedule_configs is not NotSet:
28 schedules = [
29 _schedule_config_to_deployment_schedule(schedule_config)
30 for schedule_config in apply_values(schedule_configs, step_outputs)
31 ]
32 elif schedule_configs is NotSet:
33 if root.is_interactive():
34 schedules = prompt_schedules(app.console)
36 return schedules
39def _schedule_config_to_deployment_schedule( 1a
40 schedule_config: dict[str, Any],
41) -> dict[str, Any]:
42 anchor_date = schedule_config.get("anchor_date")
43 timezone = schedule_config.get("timezone")
44 schedule_active = schedule_config.get("active")
45 parameters = schedule_config.get("parameters", {})
46 slug = schedule_config.get("slug")
48 if cron := schedule_config.get("cron"):
49 day_or = schedule_config.get("day_or")
50 cron_kwargs = {"cron": cron, "timezone": timezone, "day_or": day_or}
51 schedule = CronSchedule(
52 **{k: v for k, v in cron_kwargs.items() if v is not None}
53 )
54 elif interval := schedule_config.get("interval"):
55 interval_kwargs = {
56 "interval": timedelta(seconds=interval),
57 "anchor_date": parse_datetime(anchor_date) if anchor_date else None,
58 "timezone": timezone,
59 }
60 schedule = IntervalSchedule(
61 **{k: v for k, v in interval_kwargs.items() if v is not None}
62 )
63 elif rrule := schedule_config.get("rrule"):
64 try:
65 schedule = RRuleSchedule(**json.loads(rrule))
66 if timezone:
67 schedule.timezone = timezone
68 except json.JSONDecodeError:
69 schedule = RRuleSchedule(rrule=rrule, timezone=timezone)
70 else:
71 raise ValueError(
72 f"Unknown schedule type. Please provide a valid schedule. schedule={schedule_config}"
73 )
75 schedule_obj: dict[str, Any] = {"schedule": schedule}
76 if schedule_active is not None:
77 schedule_obj["active"] = schedule_active
78 if parameters:
79 schedule_obj["parameters"] = parameters
80 if slug:
81 schedule_obj["slug"] = slug
83 return schedule_obj