Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/flow.py: 39%
45 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Command line interface for working with flows.
3"""
5from typing import List, Optional 1a
7import typer 1a
8from rich.table import Table 1a
10from prefect.cli._types import PrefectTyper 1a
11from prefect.cli._utilities import exit_with_error 1a
12from prefect.cli.root import app 1a
13from prefect.client.orchestration import get_client 1a
14from prefect.client.schemas.actions import DeploymentScheduleCreate 1a
15from prefect.client.schemas.schedules import construct_schedule 1a
16from prefect.client.schemas.sorting import FlowSort 1a
17from prefect.deployments.runner import RunnerDeployment 1a
18from prefect.exceptions import MissingFlowError 1a
19from prefect.runner import Runner 1a
20from prefect.utilities import urls 1a
# Sub-application that groups the flow commands; it is attached to the root
# CLI app under the name "flow", with "flows" registered as an alias.
flow_app: PrefectTyper = PrefectTyper(
    name="flow",
    help="View and serve flows.",
)
app.add_typer(flow_app, aliases=["flows"])
# `ls` reads up to `limit` flows through the API client, ordered by
# FlowSort.CREATED_DESC, and prints their ID, name and creation time as a
# table on the app console.
# NOTE: the docstring below is deliberately left terse -- Typer surfaces a
# command's docstring as its `--help` text, so rewording it would change CLI
# output.
@flow_app.command()
async def ls(
    limit: int = 15,
):
    """
    View flows.
    """
    # The client is only needed for the read; the table is rendered after the
    # client context has been exited.
    async with get_client() as client:
        fetched_flows = await client.read_flows(
            sort=FlowSort.CREATED_DESC,
            limit=limit,
        )

    # (header, rendering options) for each column, in display order.
    column_specs = (
        ("ID", {"justify": "right", "style": "cyan", "no_wrap": True}),
        ("Name", {"style": "green", "no_wrap": True}),
        ("Created", {"no_wrap": True}),
    )

    table = Table(title="Flows")
    for header, options in column_specs:
        table.add_column(header, **options)

    for flow in fetched_flows:
        # Every cell is stringified before being handed to the table.
        table.add_row(*map(str, (flow.id, flow.name, flow.created)))

    app.console.print(table)
# `serve` builds a RunnerDeployment from `entrypoint` (optionally with a single
# schedule derived from --interval / --cron / --rrule), registers it with a
# Runner, prints instructions for triggering a run, and then starts the runner.
# NOTE: the docstring below is deliberately left terse -- Typer surfaces a
# command's docstring as its `--help` text, so rewording it would change CLI
# output. Per-option documentation lives in the `help=` strings.
@flow_app.command()
async def serve(
    entrypoint: str = typer.Argument(
        ...,
        help=(
            "The path to a file containing a flow and the name of the flow function in"
            " the format `./path/to/file.py:flow_func_name`."
        ),
    ),
    name: str = typer.Option(
        ...,
        "--name",
        "-n",
        help="The name to give the deployment created for the flow.",
    ),
    description: Optional[str] = typer.Option(
        None,
        "--description",
        "-d",
        help=(
            "The description to give the created deployment. If not provided, the"
            " description will be populated from the flow's description."
        ),
    ),
    version: Optional[str] = typer.Option(
        None, "-v", "--version", help="A version to give the created deployment."
    ),
    tags: Optional[List[str]] = typer.Option(
        None,
        "-t",
        "--tag",
        help="One or more optional tags to apply to the created deployment.",
    ),
    cron: Optional[str] = typer.Option(
        None,
        "--cron",
        help=(
            "A cron string that will be used to set a schedule for the created"
            " deployment."
        ),
    ),
    interval: Optional[int] = typer.Option(
        None,
        "--interval",
        help=(
            "An integer specifying an interval (in seconds) between scheduled runs of"
            " the flow."
        ),
    ),
    interval_anchor: Optional[str] = typer.Option(
        None, "--anchor-date", help="The start date for an interval schedule."
    ),
    rrule: Optional[str] = typer.Option(
        None,
        "--rrule",
        help="An RRule that will be used to set a schedule for the created deployment.",
    ),
    timezone: Optional[str] = typer.Option(
        None,
        "--timezone",
        help="Timezone to used scheduling flow runs e.g. 'America/New_York'",
    ),
    pause_on_shutdown: bool = typer.Option(
        True,
        help=(
            "If set, provided schedule will be paused when the serve command is"
            " stopped. If not set, the schedules will continue running."
        ),
    ),
    limit: Optional[int] = typer.Option(
        None,
        help=(
            "The maximum number of runs that can be executed concurrently by the"
            " created runner; only applies to this served flow."
            " To apply a limit across multiple served flows, use global_limit."
        ),
    ),
    global_limit: Optional[int] = typer.Option(
        None,
        help=(
            "The maximum number of concurrent runs allowed across all served"
            " flow instances associated with the same deployment."
        ),
    ),
):
    """
    Serve a flow via an entrypoint.
    """
    # The runner is created first, before any schedule or entrypoint handling.
    runner = Runner(name=name, pause_on_shutdown=pause_on_shutdown, limit=limit)

    try:
        # At most one schedule is attached, and only when at least one of the
        # schedule options is truthy; otherwise the list stays empty.
        deployment_schedules = []
        if interval or cron or rrule:
            built_schedule = construct_schedule(
                interval=interval,
                cron=cron,
                rrule=rrule,
                timezone=timezone,
                anchor_date=interval_anchor,
            )
            deployment_schedules.append(
                DeploymentScheduleCreate(schedule=built_schedule, active=True)
            )

        runner_deployment = RunnerDeployment.from_entrypoint(
            entrypoint=entrypoint,
            name=name,
            schedules=deployment_schedules,
            description=description,
            # `--tag` defaults to None; the deployment is given a list instead.
            tags=tags or [],
            version=version,
            concurrency_limit=global_limit,
        )
    except (MissingFlowError, ValueError) as exc:
        # NOTE(review): exit_with_error is expected to terminate the command;
        # if it returned, `runner_deployment` would be unbound below -- confirm.
        exit_with_error(str(exc))

    deployment_id = await runner.add_deployment(runner_deployment)

    # The console message is assembled from parts: the CLI instructions always,
    # the UI link only when a deployment URL is available.
    message_parts = [
        f"[green]Your flow {runner_deployment.flow_name!r} is being served and polling"
        " for scheduled runs!\n[/]\nTo trigger a run for this flow, use the following"
        " command:\n[blue]\n\t$ prefect deployment run"
        f" '{runner_deployment.flow_name}/{name}'\n[/]"
    ]

    deployment_url = urls.url_for("deployment", obj_id=deployment_id)
    if deployment_url:
        message_parts.append(
            "\nYou can also run your flow via the Prefect UI:"
            f" [blue]{deployment_url}[/]\n"
        )

    app.console.print("".join(message_parts), soft_wrap=True)
    await runner.start()