Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/sorting.py: 98%
95 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Schemas for sorting Prefect REST API objects.
3"""
5from collections.abc import Iterable 1a
6from typing import TYPE_CHECKING, Any 1a
8import sqlalchemy as sa 1a
10from prefect.server.utilities.database import db_injector 1a
11from prefect.utilities.collections import AutoEnum 1a
13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true1a
14 from prefect.server.database import PrefectDBInterface
16# TODO: Consider moving the `as_sql_sort` functions out of here since they are a
17# database model level function and do not properly separate concerns when
18# present in the schemas module
21class FlowRunSort(AutoEnum): 1a
22 """Defines flow run sorting options."""
24 ID_DESC = AutoEnum.auto() 1a
25 START_TIME_ASC = AutoEnum.auto() 1a
26 START_TIME_DESC = AutoEnum.auto() 1a
27 EXPECTED_START_TIME_ASC = AutoEnum.auto() 1a
28 EXPECTED_START_TIME_DESC = AutoEnum.auto() 1a
29 NAME_ASC = AutoEnum.auto() 1a
30 NAME_DESC = AutoEnum.auto() 1a
31 NEXT_SCHEDULED_START_TIME_ASC = AutoEnum.auto() 1a
32 END_TIME_DESC = AutoEnum.auto() 1a
34 @db_injector 1a
35 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a
36 """Return an expression used to sort flow runs"""
37 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 1bdc
38 "ID_DESC": [db.FlowRun.id.desc()],
39 "START_TIME_ASC": [
40 sa.func.coalesce(
41 db.FlowRun.start_time, db.FlowRun.expected_start_time
42 ).asc()
43 ],
44 "START_TIME_DESC": [
45 sa.func.coalesce(
46 db.FlowRun.start_time, db.FlowRun.expected_start_time
47 ).desc()
48 ],
49 "EXPECTED_START_TIME_ASC": [db.FlowRun.expected_start_time.asc()],
50 "EXPECTED_START_TIME_DESC": [db.FlowRun.expected_start_time.desc()],
51 "NAME_ASC": [db.FlowRun.name.asc()],
52 "NAME_DESC": [db.FlowRun.name.desc()],
53 "NEXT_SCHEDULED_START_TIME_ASC": [
54 db.FlowRun.next_scheduled_start_time.asc()
55 ],
56 "END_TIME_DESC": [db.FlowRun.end_time.desc()],
57 }
58 return sort_mapping[self.value] 1bdc
61class TaskRunSort(AutoEnum): 1a
62 """Defines task run sorting options."""
64 ID_DESC = AutoEnum.auto() 1a
65 EXPECTED_START_TIME_ASC = AutoEnum.auto() 1a
66 EXPECTED_START_TIME_DESC = AutoEnum.auto() 1a
67 NAME_ASC = AutoEnum.auto() 1a
68 NAME_DESC = AutoEnum.auto() 1a
69 NEXT_SCHEDULED_START_TIME_ASC = AutoEnum.auto() 1a
70 END_TIME_DESC = AutoEnum.auto() 1a
72 @db_injector 1a
73 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a
74 """Return an expression used to sort task runs"""
75 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 1bdc
76 "ID_DESC": [db.TaskRun.id.desc()],
77 "EXPECTED_START_TIME_ASC": [db.TaskRun.expected_start_time.asc()],
78 "EXPECTED_START_TIME_DESC": [db.TaskRun.expected_start_time.desc()],
79 "NAME_ASC": [db.TaskRun.name.asc()],
80 "NAME_DESC": [db.TaskRun.name.desc()],
81 "NEXT_SCHEDULED_START_TIME_ASC": [
82 db.TaskRun.next_scheduled_start_time.asc()
83 ],
84 "END_TIME_DESC": [db.TaskRun.end_time.desc()],
85 }
86 return sort_mapping[self.value] 1bdc
89class LogSort(AutoEnum): 1a
90 """Defines log sorting options."""
92 TIMESTAMP_ASC = AutoEnum.auto() 1a
93 TIMESTAMP_DESC = AutoEnum.auto() 1a
95 @db_injector 1a
96 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a
97 """Return an expression used to sort task runs"""
98 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 1bc
99 "TIMESTAMP_ASC": [db.Log.timestamp.asc()],
100 "TIMESTAMP_DESC": [db.Log.timestamp.desc()],
101 }
102 return sort_mapping[self.value] 1bc
105class FlowSort(AutoEnum): 1a
106 """Defines flow sorting options."""
108 CREATED_DESC = AutoEnum.auto() 1a
109 UPDATED_DESC = AutoEnum.auto() 1a
110 NAME_ASC = AutoEnum.auto() 1a
111 NAME_DESC = AutoEnum.auto() 1a
113 @db_injector 1a
114 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a
115 """Return an expression used to sort task runs"""
116 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 1bdec
117 "CREATED_DESC": [db.Flow.created.desc()],
118 "UPDATED_DESC": [db.Flow.updated.desc()],
119 "NAME_ASC": [db.Flow.name.asc()],
120 "NAME_DESC": [db.Flow.name.desc()],
121 }
122 return sort_mapping[self.value] 1bdec
125class DeploymentSort(AutoEnum): 1a
126 """Defines deployment sorting options."""
128 CREATED_DESC = AutoEnum.auto() 1a
129 UPDATED_DESC = AutoEnum.auto() 1a
130 NAME_ASC = AutoEnum.auto() 1a
131 NAME_DESC = AutoEnum.auto() 1a
133 @db_injector 1a
134 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a
135 """Return an expression used to sort task runs"""
136 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 1bdfec
137 "CREATED_DESC": [db.Deployment.created.desc()],
138 "UPDATED_DESC": [db.Deployment.updated.desc()],
139 "NAME_ASC": [db.Deployment.name.asc()],
140 "NAME_DESC": [db.Deployment.name.desc()],
141 }
142 return sort_mapping[self.value] 1bdfec
145class ArtifactSort(AutoEnum): 1a
146 """Defines artifact sorting options."""
148 CREATED_DESC = AutoEnum.auto() 1a
149 UPDATED_DESC = AutoEnum.auto() 1a
150 ID_DESC = AutoEnum.auto() 1a
151 KEY_DESC = AutoEnum.auto() 1a
152 KEY_ASC = AutoEnum.auto() 1a
154 @db_injector 1a
155 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a
156 """Return an expression used to sort task runs"""
157 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 1bdec
158 "CREATED_DESC": [db.Artifact.created.desc()],
159 "UPDATED_DESC": [db.Artifact.updated.desc()],
160 "ID_DESC": [db.Artifact.id.desc()],
161 "KEY_DESC": [db.Artifact.key.desc()],
162 "KEY_ASC": [db.Artifact.key.asc()],
163 }
164 return sort_mapping[self.value] 1bdec
167class ArtifactCollectionSort(AutoEnum): 1a
168 """Defines artifact collection sorting options."""
170 CREATED_DESC = AutoEnum.auto() 1a
171 UPDATED_DESC = AutoEnum.auto() 1a
172 ID_DESC = AutoEnum.auto() 1a
173 KEY_DESC = AutoEnum.auto() 1a
174 KEY_ASC = AutoEnum.auto() 1a
176 @db_injector 1a
177 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a
178 """Return an expression used to sort task runs"""
179 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 1bc
180 "CREATED_DESC": [db.ArtifactCollection.created.desc()],
181 "UPDATED_DESC": [db.ArtifactCollection.updated.desc()],
182 "ID_DESC": [db.ArtifactCollection.id.desc()],
183 "KEY_DESC": [db.ArtifactCollection.key.desc()],
184 "KEY_ASC": [db.ArtifactCollection.key.asc()],
185 }
186 return sort_mapping[self.value] 1bc
189class VariableSort(AutoEnum): 1a
190 """Defines variables sorting options."""
192 CREATED_DESC = "CREATED_DESC" 1a
193 UPDATED_DESC = "UPDATED_DESC" 1a
194 NAME_DESC = "NAME_DESC" 1a
195 NAME_ASC = "NAME_ASC" 1a
197 @db_injector 1a
198 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a
199 """Return an expression used to sort task runs"""
200 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 1bdc
201 "CREATED_DESC": [db.Variable.created.desc()],
202 "UPDATED_DESC": [db.Variable.updated.desc()],
203 "NAME_DESC": [db.Variable.name.desc()],
204 "NAME_ASC": [db.Variable.name.asc()],
205 }
206 return sort_mapping[self.value] 1bdc
209class BlockDocumentSort(AutoEnum): 1a
210 """Defines block document sorting options."""
212 NAME_DESC = "NAME_DESC" 1a
213 NAME_ASC = "NAME_ASC" 1a
214 BLOCK_TYPE_AND_NAME_ASC = "BLOCK_TYPE_AND_NAME_ASC" 1a
216 @db_injector 1a
217 def as_sql_sort(self, db: "PrefectDBInterface") -> Iterable[sa.ColumnElement[Any]]: 1a
218 """Return an expression used to sort task runs"""
219 sort_mapping: dict[str, Iterable[sa.ColumnElement[Any]]] = { 1bgdfc
220 "NAME_DESC": [db.BlockDocument.name.desc()],
221 "NAME_ASC": [db.BlockDocument.name.asc()],
222 "BLOCK_TYPE_AND_NAME_ASC": [
223 db.BlockDocument.block_type_name.asc(),
224 db.BlockDocument.name.asc(),
225 ],
226 }
227 return sort_mapping[self.value] 1bgdfc