Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/utilities/schemas/bases.py: 64%

68 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import datetime 1a

2from abc import ABC, abstractmethod 1a

3from functools import partial 1a

4from typing import TYPE_CHECKING, Any, ClassVar, Optional, TypeVar 1a

5from uuid import UUID, uuid4 1a

6 

7from pydantic import BaseModel, ConfigDict, Field 1a

8from pydantic.config import JsonDict 1a

9from typing_extensions import Self 1a

10 

11from prefect._internal.uuid7 import uuid7 1a

12from prefect.types._datetime import DateTime, human_friendly_diff 1a

13 

14if TYPE_CHECKING: 14 ↛ 15line 14 didn't jump to line 15 because the condition on line 14 was never true1a

15 from pydantic.main import IncEx 

16 from rich.repr import RichReprResult 

17 

18T = TypeVar("T") 1a

19B = TypeVar("B", bound=BaseModel) 1a

20 

21 

22def get_class_fields_only(model: type[BaseModel]) -> set[str]: 1a

23 """ 

24 Gets all the field names defined on the model class but not any parent classes. 

25 Any fields that are on the parent but redefined on the subclass are included. 

26 """ 

27 # the annotations keys fit all of these criteria without further processing 

28 return set(model.__annotations__) 

29 

30 

31class PrefectDescriptorBase(ABC): 1a

32 """A base class for descriptor objects used with PrefectBaseModel 

33 

34 Pydantic needs to be told about any kind of non-standard descriptor 

35 objects used on a model, in order for these not to be treated as a field 

36 type instead. 

37 

38 This base class is registered as an ignored type with PrefectBaseModel 

39 and any classes that inherit from it will also be ignored. This allows 

40 such descriptors to be used as properties, methods or other bound 

41 descriptor use cases. 

42 

43 """ 

44 

45 @abstractmethod 1a

46 def __get__( 1a

47 self, __instance: Optional[Any], __owner: Optional[type[Any]] = None 

48 ) -> Any: 

49 """Base descriptor access. 

50 

51 The default implementation returns itself when the instance is None, 

52 and raises an attribute error when the instance is not not None. 

53 

54 """ 

55 if __instance is not None: 

56 raise AttributeError 

57 return self 

58 

59 

60class PrefectBaseModel(BaseModel): 1a

61 """A base pydantic.BaseModel for all Prefect schemas and pydantic models. 

62 

63 As the basis for most Prefect schemas, this base model ignores extra 

64 fields that are passed to it at instantiation. Because adding new fields to 

65 API payloads is not considered a breaking change, this ensures that any 

66 Prefect client loading data from a server running a possibly-newer version 

67 of Prefect will be able to process those new fields gracefully. 

68 """ 

69 

70 _reset_fields: ClassVar[set[str]] = set() 1a

71 

72 model_config: ClassVar[ConfigDict] = ConfigDict( 1a

73 ser_json_timedelta="float", 

74 extra="ignore", 

75 ignored_types=(PrefectDescriptorBase,), 

76 ) 

77 

78 def __eq__(self, other: Any) -> bool: 1a

79 """Equality operator that ignores the resettable fields of the PrefectBaseModel. 

80 

81 NOTE: this equality operator will only be applied if the PrefectBaseModel is 

82 the left-hand operand. This is a limitation of Python. 

83 """ 

84 copy_dict = self.model_dump(exclude=self._reset_fields) 

85 if isinstance(other, PrefectBaseModel): 

86 return copy_dict == other.model_dump(exclude=other._reset_fields) 

87 if isinstance(other, BaseModel): 

88 return copy_dict == other.model_dump() 

89 else: 

90 return copy_dict == other 

91 

92 def __rich_repr__(self) -> "RichReprResult": 1a

93 # Display all of the fields in the model if they differ from the default value 

94 for name, field in type(self).model_fields.items(): 

95 value = getattr(self, name) 

96 

97 # Simplify the display of some common fields 

98 if isinstance(value, UUID): 

99 value = str(value) 

100 elif isinstance(value, datetime.datetime): 

101 value = ( 

102 value.isoformat() 

103 if name == "timestamp" 

104 else human_friendly_diff(value) 

105 ) 

106 

107 yield name, value, field.get_default() 

108 

109 def reset_fields(self: Self) -> Self: 1a

110 """ 

111 Reset the fields of the model that are in the `_reset_fields` set. 

112 

113 Returns: 

114 PrefectBaseModel: A new instance of the model with the reset fields. 

115 """ 

116 return self.model_copy( 

117 update={ 

118 field: type(self) 

119 .model_fields[field] 

120 .get_default(call_default_factory=True) 

121 for field in self._reset_fields 

122 } 

123 ) 

124 

125 def model_dump_for_orm( 1a

126 self, 

127 *, 

128 include: Optional["IncEx"] = None, 

129 exclude: Optional["IncEx"] = None, 

130 by_alias: bool = False, 

131 exclude_unset: bool = False, 

132 exclude_defaults: bool = False, 

133 exclude_none: bool = False, 

134 ) -> dict[str, Any]: 

135 """ 

136 Prefect extension to `BaseModel.model_dump`. Generate a Python dictionary 

137 representation of the model suitable for passing to SQLAlchemy model 

138 constructors, `INSERT` statements, etc. The critical difference here is that 

139 this method will return any nested BaseModel objects as `BaseModel` instances, 

140 rather than serialized Python dictionaries. 

141 

142 Accepts the standard Pydantic `model_dump` arguments, except for `mode` (which 

143 is always "python"), `round_trip`, and `warnings`. 

144 

145 Usage docs: https://docs.pydantic.dev/2.6/concepts/serialization/#modelmodel_dump 

146 

147 Args: 

148 include: A list of fields to include in the output. 

149 exclude: A list of fields to exclude from the output. 

150 by_alias: Whether to use the field's alias in the dictionary key if defined. 

151 exclude_unset: Whether to exclude fields that have not been explicitly set. 

152 exclude_defaults: Whether to exclude fields that are set to their default 

153 value. 

154 exclude_none: Whether to exclude fields that have a value of `None`. 

155 

156 Returns: 

157 A dictionary representation of the model, suitable for passing 

158 to SQLAlchemy model constructors, INSERT statements, etc. 

159 """ 

160 # TODO: this could be optimized by excluding any fields that we know we are 

161 # going to replace because they are `BaseModel` instances. This would involve 

162 # understanding which fields would be included or excluded by model_dump so we 

163 # could instruct Pydantic to exclude them up front. 

164 deep = self.model_dump( 1ecdb

165 mode="python", 

166 include=include, 

167 exclude=exclude, 

168 by_alias=by_alias, 

169 exclude_unset=exclude_unset, 

170 exclude_defaults=exclude_defaults, 

171 exclude_none=exclude_none, 

172 context={"for_orm": True}, 

173 ) 

174 for k, v in self: 1ecdb

175 if k in deep and isinstance(v, BaseModel): 1ecdb

176 deep[k] = v 1cdb

177 return deep 1ecdb

178 

179 

180def _ensure_fields_required(field_names: list[str], schema: JsonDict) -> None: 1a

181 for field_name in field_names: 

182 if "required" not in schema: 

183 schema["required"] = [] 

184 if ( 

185 (required := schema.get("required")) 

186 and isinstance(required, list) 

187 and field_name not in required 

188 ): 

189 required.append(field_name) 

190 

191 

192class IDBaseModel(PrefectBaseModel): 1a

193 """ 

194 A PrefectBaseModel with an auto-generated UUID ID value. 

195 

196 The ID is reset on copy() and not included in equality comparisons. 

197 """ 

198 

199 model_config: ClassVar[ConfigDict] = ConfigDict( 1a

200 json_schema_extra=partial(_ensure_fields_required, ["id"]) 

201 ) 

202 

203 _reset_fields: ClassVar[set[str]] = {"id"} 1a

204 id: UUID = Field(default_factory=uuid4) 1a

205 

206 

207class TimeSeriesBaseModel(IDBaseModel): 1a

208 """ 

209 A PrefectBaseModel with a time-oriented UUIDv7 ID value. Used for models that 

210 operate like timeseries, such as runs, states, and logs. 

211 """ 

212 

213 id: UUID = Field(default_factory=uuid7) 1a

214 

215 

216class ORMBaseModel(IDBaseModel): 1a

217 """ 

218 A PrefectBaseModel with an auto-generated UUID ID value and created / 

219 updated timestamps, intended for compatibility with our standard ORM models. 

220 

221 The ID, created, and updated fields are reset on copy() and not included in 

222 equality comparisons. 

223 """ 

224 

225 _reset_fields: ClassVar[set[str]] = {"id", "created", "updated"} 1a

226 

227 model_config: ClassVar[ConfigDict] = ConfigDict( 1a

228 from_attributes=True, 

229 json_schema_extra=partial( 

230 _ensure_fields_required, ["id", "created", "updated"] 

231 ), 

232 ) 

233 

234 created: Optional[DateTime] = Field(default=None, repr=False) 1a

235 updated: Optional[DateTime] = Field(default=None, repr=False) 1a

236 

237 

238class ActionBaseModel(PrefectBaseModel): 1a

239 model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") 1a