Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/flow.py: 39%

45 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Command line interface for working with flows. 

3""" 

4 

5from typing import List, Optional 1a

6 

7import typer 1a

8from rich.table import Table 1a

9 

10from prefect.cli._types import PrefectTyper 1a

11from prefect.cli._utilities import exit_with_error 1a

12from prefect.cli.root import app 1a

13from prefect.client.orchestration import get_client 1a

14from prefect.client.schemas.actions import DeploymentScheduleCreate 1a

15from prefect.client.schemas.schedules import construct_schedule 1a

16from prefect.client.schemas.sorting import FlowSort 1a

17from prefect.deployments.runner import RunnerDeployment 1a

18from prefect.exceptions import MissingFlowError 1a

19from prefect.runner import Runner 1a

20from prefect.utilities import urls 1a

21 

22flow_app: PrefectTyper = PrefectTyper(name="flow", help="View and serve flows.") 1a

23app.add_typer(flow_app, aliases=["flows"]) 1a

24 

25 

26@flow_app.command() 1a

27async def ls( 1a

28 limit: int = 15, 

29): 

30 """ 

31 View flows. 

32 """ 

33 async with get_client() as client: 

34 flows = await client.read_flows( 

35 limit=limit, 

36 sort=FlowSort.CREATED_DESC, 

37 ) 

38 

39 table = Table(title="Flows") 

40 table.add_column("ID", justify="right", style="cyan", no_wrap=True) 

41 table.add_column("Name", style="green", no_wrap=True) 

42 table.add_column("Created", no_wrap=True) 

43 

44 for flow in flows: 

45 table.add_row( 

46 str(flow.id), 

47 str(flow.name), 

48 str(flow.created), 

49 ) 

50 

51 app.console.print(table) 

52 

53 

54@flow_app.command() 1a

55async def serve( 1a

56 entrypoint: str = typer.Argument( 

57 ..., 

58 help=( 

59 "The path to a file containing a flow and the name of the flow function in" 

60 " the format `./path/to/file.py:flow_func_name`." 

61 ), 

62 ), 

63 name: str = typer.Option( 

64 ..., 

65 "--name", 

66 "-n", 

67 help="The name to give the deployment created for the flow.", 

68 ), 

69 description: Optional[str] = typer.Option( 

70 None, 

71 "--description", 

72 "-d", 

73 help=( 

74 "The description to give the created deployment. If not provided, the" 

75 " description will be populated from the flow's description." 

76 ), 

77 ), 

78 version: Optional[str] = typer.Option( 

79 None, "-v", "--version", help="A version to give the created deployment." 

80 ), 

81 tags: Optional[List[str]] = typer.Option( 

82 None, 

83 "-t", 

84 "--tag", 

85 help="One or more optional tags to apply to the created deployment.", 

86 ), 

87 cron: Optional[str] = typer.Option( 

88 None, 

89 "--cron", 

90 help=( 

91 "A cron string that will be used to set a schedule for the created" 

92 " deployment." 

93 ), 

94 ), 

95 interval: Optional[int] = typer.Option( 

96 None, 

97 "--interval", 

98 help=( 

99 "An integer specifying an interval (in seconds) between scheduled runs of" 

100 " the flow." 

101 ), 

102 ), 

103 interval_anchor: Optional[str] = typer.Option( 

104 None, "--anchor-date", help="The start date for an interval schedule." 

105 ), 

106 rrule: Optional[str] = typer.Option( 

107 None, 

108 "--rrule", 

109 help="An RRule that will be used to set a schedule for the created deployment.", 

110 ), 

111 timezone: Optional[str] = typer.Option( 

112 None, 

113 "--timezone", 

114 help="Timezone to used scheduling flow runs e.g. 'America/New_York'", 

115 ), 

116 pause_on_shutdown: bool = typer.Option( 

117 True, 

118 help=( 

119 "If set, provided schedule will be paused when the serve command is" 

120 " stopped. If not set, the schedules will continue running." 

121 ), 

122 ), 

123 limit: Optional[int] = typer.Option( 

124 None, 

125 help=( 

126 "The maximum number of runs that can be executed concurrently by the" 

127 " created runner; only applies to this served flow." 

128 " To apply a limit across multiple served flows, use global_limit." 

129 ), 

130 ), 

131 global_limit: Optional[int] = typer.Option( 

132 None, 

133 help=( 

134 "The maximum number of concurrent runs allowed across all served" 

135 " flow instances associated with the same deployment." 

136 ), 

137 ), 

138): 

139 """ 

140 Serve a flow via an entrypoint. 

141 """ 

142 runner = Runner( 

143 name=name, 

144 pause_on_shutdown=pause_on_shutdown, 

145 limit=limit, 

146 ) 

147 try: 

148 schedules = [] 

149 if interval or cron or rrule: 

150 schedule = construct_schedule( 

151 interval=interval, 

152 cron=cron, 

153 rrule=rrule, 

154 timezone=timezone, 

155 anchor_date=interval_anchor, 

156 ) 

157 schedules = [DeploymentScheduleCreate(schedule=schedule, active=True)] 

158 

159 runner_deployment = RunnerDeployment.from_entrypoint( 

160 entrypoint=entrypoint, 

161 name=name, 

162 schedules=schedules, 

163 description=description, 

164 tags=tags or [], 

165 version=version, 

166 concurrency_limit=global_limit, 

167 ) 

168 except (MissingFlowError, ValueError) as exc: 

169 exit_with_error(str(exc)) 

170 deployment_id = await runner.add_deployment(runner_deployment) 

171 

172 help_message = ( 

173 f"[green]Your flow {runner_deployment.flow_name!r} is being served and polling" 

174 " for scheduled runs!\n[/]\nTo trigger a run for this flow, use the following" 

175 " command:\n[blue]\n\t$ prefect deployment run" 

176 f" '{runner_deployment.flow_name}/{name}'\n[/]" 

177 ) 

178 

179 deployment_url = urls.url_for("deployment", obj_id=deployment_id) 

180 if deployment_url: 

181 help_message += ( 

182 "\nYou can also run your flow via the Prefect UI:" 

183 f" [blue]{deployment_url}[/]\n" 

184 ) 

185 

186 app.console.print(help_message, soft_wrap=True) 

187 await runner.start()