Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/concurrency_limit.py: 30%

77 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Command line interface for working with concurrency limits. 

3""" 

4 

5import textwrap 1a

6from typing import Optional 1a

7 

8import orjson 1a

9import typer 1a

10from rich.console import Group 1a

11from rich.panel import Panel 1a

12from rich.pretty import Pretty 1a

13from rich.table import Table 1a

14 

15from prefect.cli._types import PrefectTyper 1a

16from prefect.cli._utilities import exit_with_error, exit_with_success 1a

17from prefect.cli.root import app, is_interactive 1a

18from prefect.client.orchestration import get_client 1a

19from prefect.exceptions import ObjectNotFound 1a

20from prefect.types._datetime import human_friendly_diff 1a

21 

# Typer sub-application that groups the concurrency-limit commands.
concurrency_limit_app: PrefectTyper = PrefectTyper(
    help="Manage task-level concurrency limits.",
    name="concurrency-limit",
)
# Register the group on the root CLI app; the plural spelling is an alias.
app.add_typer(concurrency_limit_app, aliases=["concurrency-limits"])

27 

28 

@concurrency_limit_app.command()
async def create(tag: str, concurrency_limit: int):
    """
    Create a concurrency limit against a tag.

    This limit controls how many task runs with that tag may simultaneously be in a
    Running state.
    """
    # Create the limit, then read it back by tag. The value returned by the
    # read is discarded.
    async with get_client() as client:
        await client.create_concurrency_limit(
            concurrency_limit=concurrency_limit,
            tag=tag,
        )
        await client.read_concurrency_limit_by_tag(tag)

    # NOTE(review): the interior indentation of this message was reconstructed
    # from a whitespace-collapsed listing -- confirm against the original file.
    summary = textwrap.dedent(
        f"""
        Created concurrency limit with properties:
            tag - {tag!r}
            concurrency_limit - {concurrency_limit}

        Delete the concurrency limit:
            prefect concurrency-limit delete {tag!r}

        Inspect the concurrency limit:
            prefect concurrency-limit inspect {tag!r}
        """
    )
    app.console.print(summary)

59 

60 

@concurrency_limit_app.command()
async def inspect(
    tag: str,
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
):
    """
    View details about a concurrency limit. `active_slots` shows a list of TaskRun IDs
    which are currently using a concurrency slot.
    """
    # Lower-case the requested format once. An unset or empty value selects the
    # table rendering further down.
    requested_format = output.lower() if output else None
    if requested_format is not None and requested_format != "json":
        exit_with_error("Only 'json' output format is supported.")

    async with get_client() as client:
        try:
            result = await client.read_concurrency_limit_by_tag(tag=tag)
        except ObjectNotFound:
            exit_with_error(f"No concurrency limit found for the tag: {tag}")

    # JSON rendering: dump the model and print it with two-space indentation.
    if requested_format == "json":
        encoded = orjson.dumps(
            result.model_dump(mode="json"), option=orjson.OPT_INDENT_2
        )
        app.console.print(encoded.decode())
        return

    # Table rendering: one single-column table listing the active task run IDs...
    slots_table = Table()
    slots_table.add_column("Active Task Run IDs", style="cyan", no_wrap=True)
    for task_run_id in sorted(result.active_slots):
        slots_table.add_row(str(task_run_id))

    # ...and one summary table for the limit itself.
    summary_table = Table(title=f"Concurrency Limit ID: [red]{result.id!s}")
    for heading, colour in (
        ("Tag", "green"),
        ("Concurrency Limit", "blue"),
        ("Created", "magenta"),
        ("Updated", "magenta"),
    ):
        summary_table.add_column(heading, style=colour, no_wrap=True)

    # A missing timestamp is rendered as an empty string.
    created_cell = Pretty(
        human_friendly_diff(result.created) if result.created else ""
    )
    updated_cell = Pretty(
        human_friendly_diff(result.updated) if result.updated else ""
    )
    summary_table.add_row(
        str(result.tag),
        str(result.concurrency_limit),
        created_cell,
        updated_cell,
    )

    # Summary first, active slots second, both wrapped in one panel.
    app.console.print(Panel(Group(summary_table, slots_table), expand=False))

113 

114 

def _recency_sort_key(limit_record):
    """Return a sort key ordering records by `updated`, falling back to `created`.

    The key is a `(has_timestamp, timestamp)` tuple. Records with no timestamp
    all share the key `(False, None)`, and differ from timestamped records in
    the first element, so `None` is never ordered against a datetime.
    """
    stamp = limit_record.updated or limit_record.created
    return (stamp is not None, stamp)


@concurrency_limit_app.command()
async def ls(limit: int = 15, offset: int = 0):
    """
    View all concurrency limits.
    """
    # `limit` and `offset` are forwarded unchanged to
    # `client.read_concurrency_limits`.
    table = Table(
        title="Concurrency Limits",
        caption="inspect a concurrency limit to show active task run IDs",
    )
    table.add_column("Tag", style="green", no_wrap=True)
    table.add_column("ID", justify="right", style="cyan", no_wrap=True)
    table.add_column("Concurrency Limit", style="blue", no_wrap=True)
    table.add_column("Active Task Runs", style="magenta", no_wrap=True)

    async with get_client() as client:
        concurrency_limits = await client.read_concurrency_limits(
            limit=limit, offset=offset
        )

    # Most recently touched first. The previous key fell back to "" when both
    # timestamps were missing, which raises TypeError as soon as such a record
    # is compared with one that has a timestamp. With reverse=True, records
    # lacking timestamps now sort last.
    for cl in sorted(concurrency_limits, key=_recency_sort_key, reverse=True):
        table.add_row(
            str(cl.tag),
            str(cl.id),
            str(cl.concurrency_limit),
            str(len(cl.active_slots)),
        )

    app.console.print(table)

145 

146 

@concurrency_limit_app.command()
async def reset(tag: str):
    """
    Resets the concurrency limit slots set on the specified tag.
    """
    # Both outcome messages are built up front so the API call reads cleanly.
    not_found_message = f"No concurrency limit found for the tag: {tag}"
    success_message = f"Reset concurrency limit set on the tag: {tag}"

    async with get_client() as client:
        try:
            await client.reset_concurrency_limit_by_tag(tag=tag)
        except ObjectNotFound:
            # The API knows no limit for this tag.
            exit_with_error(not_found_message)

    exit_with_success(success_message)

160 

161 

@concurrency_limit_app.command()
async def delete(tag: str):
    """
    Delete the concurrency limit set on the specified tag.
    """
    # Ask for confirmation before opening a client. Previously the prompt ran
    # inside both the client context and the `try`, so a client was held open
    # while waiting on user input and the `try` covered code that cannot raise
    # ObjectNotFound. Non-interactive sessions skip the prompt, as before.
    # NOTE(review): as in the original, this relies on exit_with_error
    # terminating the command -- its implementation is not visible here.
    if is_interactive() and not typer.confirm(
        f"Are you sure you want to delete concurrency limit with tag {tag!r}?",
        default=False,
    ):
        exit_with_error("Deletion aborted.")

    async with get_client() as client:
        try:
            await client.delete_concurrency_limit_by_tag(tag=tag)
        except ObjectNotFound:
            # The API knows no limit for this tag.
            exit_with_error(f"No concurrency limit found for the tag: {tag}")

    exit_with_success(f"Deleted concurrency limit set on the tag: {tag}")