Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/sorting.py: 79%

95 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Schemas for sorting Prefect REST API objects. 

3""" 

4 

5from collections.abc import Iterable 1a

6from typing import TYPE_CHECKING, Any 1a

7 

8import sqlalchemy as sa 1a

9 

10from prefect.server.utilities.database import db_injector 1a

11from prefect.utilities.collections import AutoEnum 1a

12 

13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true1a

14 from prefect.server.database import PrefectDBInterface 

15 

16# TODO: Consider moving the `as_sql_sort` functions out of here since they are a 

17# database model level function and do not properly separate concerns when 

18# present in the schemas module 

19 

20 

21class FlowRunSort(AutoEnum): 1a

22 """Defines flow run sorting options.""" 

23 

24 ID_DESC = AutoEnum.auto() 1a

25 START_TIME_ASC = AutoEnum.auto() 1a

26 START_TIME_DESC = AutoEnum.auto() 1a

27 EXPECTED_START_TIME_ASC = AutoEnum.auto() 1a

28 EXPECTED_START_TIME_DESC = AutoEnum.auto() 1a

29 NAME_ASC = AutoEnum.auto() 1a

30 NAME_DESC = AutoEnum.auto() 1a

31 NEXT_SCHEDULED_START_TIME_ASC = AutoEnum.auto() 1a

32 END_TIME_DESC = AutoEnum.auto() 1a

33 

34 @db_injector 1a

35 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a

36 """Return an expression used to sort flow runs""" 

37 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 

38 "ID_DESC": [db.FlowRun.id.desc()], 

39 "START_TIME_ASC": [ 

40 sa.func.coalesce( 

41 db.FlowRun.start_time, db.FlowRun.expected_start_time 

42 ).asc() 

43 ], 

44 "START_TIME_DESC": [ 

45 sa.func.coalesce( 

46 db.FlowRun.start_time, db.FlowRun.expected_start_time 

47 ).desc() 

48 ], 

49 "EXPECTED_START_TIME_ASC": [db.FlowRun.expected_start_time.asc()], 

50 "EXPECTED_START_TIME_DESC": [db.FlowRun.expected_start_time.desc()], 

51 "NAME_ASC": [db.FlowRun.name.asc()], 

52 "NAME_DESC": [db.FlowRun.name.desc()], 

53 "NEXT_SCHEDULED_START_TIME_ASC": [ 

54 db.FlowRun.next_scheduled_start_time.asc() 

55 ], 

56 "END_TIME_DESC": [db.FlowRun.end_time.desc()], 

57 } 

58 return sort_mapping[self.value] 

59 

60 

61class TaskRunSort(AutoEnum): 1a

62 """Defines task run sorting options.""" 

63 

64 ID_DESC = AutoEnum.auto() 1a

65 EXPECTED_START_TIME_ASC = AutoEnum.auto() 1a

66 EXPECTED_START_TIME_DESC = AutoEnum.auto() 1a

67 NAME_ASC = AutoEnum.auto() 1a

68 NAME_DESC = AutoEnum.auto() 1a

69 NEXT_SCHEDULED_START_TIME_ASC = AutoEnum.auto() 1a

70 END_TIME_DESC = AutoEnum.auto() 1a

71 

72 @db_injector 1a

73 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a

74 """Return an expression used to sort task runs""" 

75 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 

76 "ID_DESC": [db.TaskRun.id.desc()], 

77 "EXPECTED_START_TIME_ASC": [db.TaskRun.expected_start_time.asc()], 

78 "EXPECTED_START_TIME_DESC": [db.TaskRun.expected_start_time.desc()], 

79 "NAME_ASC": [db.TaskRun.name.asc()], 

80 "NAME_DESC": [db.TaskRun.name.desc()], 

81 "NEXT_SCHEDULED_START_TIME_ASC": [ 

82 db.TaskRun.next_scheduled_start_time.asc() 

83 ], 

84 "END_TIME_DESC": [db.TaskRun.end_time.desc()], 

85 } 

86 return sort_mapping[self.value] 

87 

88 

89class LogSort(AutoEnum): 1a

90 """Defines log sorting options.""" 

91 

92 TIMESTAMP_ASC = AutoEnum.auto() 1a

93 TIMESTAMP_DESC = AutoEnum.auto() 1a

94 

95 @db_injector 1a

96 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a

97 """Return an expression used to sort task runs""" 

98 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 

99 "TIMESTAMP_ASC": [db.Log.timestamp.asc()], 

100 "TIMESTAMP_DESC": [db.Log.timestamp.desc()], 

101 } 

102 return sort_mapping[self.value] 

103 

104 

105class FlowSort(AutoEnum): 1a

106 """Defines flow sorting options.""" 

107 

108 CREATED_DESC = AutoEnum.auto() 1a

109 UPDATED_DESC = AutoEnum.auto() 1a

110 NAME_ASC = AutoEnum.auto() 1a

111 NAME_DESC = AutoEnum.auto() 1a

112 

113 @db_injector 1a

114 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a

115 """Return an expression used to sort task runs""" 

116 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 

117 "CREATED_DESC": [db.Flow.created.desc()], 

118 "UPDATED_DESC": [db.Flow.updated.desc()], 

119 "NAME_ASC": [db.Flow.name.asc()], 

120 "NAME_DESC": [db.Flow.name.desc()], 

121 } 

122 return sort_mapping[self.value] 

123 

124 

125class DeploymentSort(AutoEnum): 1a

126 """Defines deployment sorting options.""" 

127 

128 CREATED_DESC = AutoEnum.auto() 1a

129 UPDATED_DESC = AutoEnum.auto() 1a

130 NAME_ASC = AutoEnum.auto() 1a

131 NAME_DESC = AutoEnum.auto() 1a

132 

133 @db_injector 1a

134 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a

135 """Return an expression used to sort task runs""" 

136 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 

137 "CREATED_DESC": [db.Deployment.created.desc()], 

138 "UPDATED_DESC": [db.Deployment.updated.desc()], 

139 "NAME_ASC": [db.Deployment.name.asc()], 

140 "NAME_DESC": [db.Deployment.name.desc()], 

141 } 

142 return sort_mapping[self.value] 

143 

144 

145class ArtifactSort(AutoEnum): 1a

146 """Defines artifact sorting options.""" 

147 

148 CREATED_DESC = AutoEnum.auto() 1a

149 UPDATED_DESC = AutoEnum.auto() 1a

150 ID_DESC = AutoEnum.auto() 1a

151 KEY_DESC = AutoEnum.auto() 1a

152 KEY_ASC = AutoEnum.auto() 1a

153 

154 @db_injector 1a

155 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a

156 """Return an expression used to sort task runs""" 

157 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 

158 "CREATED_DESC": [db.Artifact.created.desc()], 

159 "UPDATED_DESC": [db.Artifact.updated.desc()], 

160 "ID_DESC": [db.Artifact.id.desc()], 

161 "KEY_DESC": [db.Artifact.key.desc()], 

162 "KEY_ASC": [db.Artifact.key.asc()], 

163 } 

164 return sort_mapping[self.value] 

165 

166 

167class ArtifactCollectionSort(AutoEnum): 1a

168 """Defines artifact collection sorting options.""" 

169 

170 CREATED_DESC = AutoEnum.auto() 1a

171 UPDATED_DESC = AutoEnum.auto() 1a

172 ID_DESC = AutoEnum.auto() 1a

173 KEY_DESC = AutoEnum.auto() 1a

174 KEY_ASC = AutoEnum.auto() 1a

175 

176 @db_injector 1a

177 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a

178 """Return an expression used to sort task runs""" 

179 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 

180 "CREATED_DESC": [db.ArtifactCollection.created.desc()], 

181 "UPDATED_DESC": [db.ArtifactCollection.updated.desc()], 

182 "ID_DESC": [db.ArtifactCollection.id.desc()], 

183 "KEY_DESC": [db.ArtifactCollection.key.desc()], 

184 "KEY_ASC": [db.ArtifactCollection.key.asc()], 

185 } 

186 return sort_mapping[self.value] 

187 

188 

189class VariableSort(AutoEnum): 1a

190 """Defines variables sorting options.""" 

191 

192 CREATED_DESC = "CREATED_DESC" 1a

193 UPDATED_DESC = "UPDATED_DESC" 1a

194 NAME_DESC = "NAME_DESC" 1a

195 NAME_ASC = "NAME_ASC" 1a

196 

197 @db_injector 1a

198 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a

199 """Return an expression used to sort task runs""" 

200 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 

201 "CREATED_DESC": [db.Variable.created.desc()], 

202 "UPDATED_DESC": [db.Variable.updated.desc()], 

203 "NAME_DESC": [db.Variable.name.desc()], 

204 "NAME_ASC": [db.Variable.name.asc()], 

205 } 

206 return sort_mapping[self.value] 

207 

208 

209class BlockDocumentSort(AutoEnum): 1a

210 """Defines block document sorting options.""" 

211 

212 NAME_DESC = "NAME_DESC" 1a

213 NAME_ASC = "NAME_ASC" 1a

214 BLOCK_TYPE_AND_NAME_ASC = "BLOCK_TYPE_AND_NAME_ASC" 1a

215 

216 @db_injector 1a

217 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a

218 """Return an expression used to sort task runs""" 

219 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 

220 "NAME_DESC": [db.BlockDocument.name.desc()], 

221 "NAME_ASC": [db.BlockDocument.name.asc()], 

222 "BLOCK_TYPE_AND_NAME_ASC": [ 

223 db.BlockDocument.block_type_name.asc(), 

224 db.BlockDocument.name.asc(), 

225 ], 

226 } 

227 return sort_mapping[self.value]