Coverage for /usr/local/lib/python3.12/site-packages/prefect/_result_records.py: 34%

96 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3import inspect 1a

4import uuid 1a

5from typing import ( 1a

6 TYPE_CHECKING, 

7 Any, 

8 Generic, 

9 Optional, 

10 TypeVar, 

11 Union, 

12 cast, 

13) 

14from uuid import UUID 1a

15 

16from pydantic import ( 1a

17 BaseModel, 

18 Field, 

19 ValidationError, 

20 model_validator, 

21) 

22 

23import prefect 1a

24from prefect.exceptions import ( 1a

25 SerializationError, 

26) 

27from prefect.serializers import PickleSerializer, Serializer 1a

28from prefect.types import DateTime 1a

29 

# A value accepted where a result serializer is expected: either a
# ``Serializer`` instance or a ``str`` (presumably a serializer type name that
# is resolved elsewhere — confirm against callers).
ResultSerializer = Union[Serializer, str]

# NOTE(review): not referenced in this module; presumably the set of result
# types treated as literals by other result-handling code — confirm.
LITERAL_TYPES: set[type] = {type(None), bool, UUID}

# Type of the value stored in ``ResultRecord.result``.
R = TypeVar("R")

37 

38 

class ResultRecordMetadata(BaseModel):
    """
    Metadata describing a stored result.

    Holds the storage key and storage block id, an optional expiration time,
    the serializer used for the result payload, and the Prefect version that
    produced the record.
    """

    # Optional for backwards compatibility.
    storage_key: Optional[str] = Field(default=None)
    expiration: Optional[DateTime] = Field(default=None)
    serializer: Serializer = Field(default_factory=PickleSerializer)
    prefect_version: str = Field(default=prefect.__version__)
    storage_block_id: Optional[uuid.UUID] = Field(default=None)

    def dump_bytes(self) -> bytes:
        """
        Encode this metadata as JSON bytes.

        ``serialize_as_any=True`` makes pydantic dump the ``serializer`` field
        using its runtime (sub)class rather than only the declared
        ``Serializer`` schema.

        Returns:
            bytes: the JSON-encoded metadata
        """
        as_json = self.model_dump_json(serialize_as_any=True)
        return as_json.encode()

    @classmethod
    def load_bytes(cls, data: bytes) -> "ResultRecordMetadata":
        """
        Build a metadata instance from bytes produced by ``dump_bytes``.

        Args:
            data: JSON-encoded metadata

        Returns:
            ResultRecordMetadata: the validated metadata
        """
        loaded = cls.model_validate_json(data)
        return loaded

    def __eq__(self, other: Any) -> bool:
        # Compare field by field; anything that is not metadata is unequal.
        if isinstance(other, ResultRecordMetadata):
            compared_fields = (
                "storage_key",
                "expiration",
                "serializer",
                "prefect_version",
                "storage_block_id",
            )
            return all(
                getattr(self, field_name) == getattr(other, field_name)
                for field_name in compared_fields
            )
        return False

84 

85 

class ResultRecord(BaseModel, Generic[R]):
    """
    A result value together with the metadata needed to serialize and reload it.
    """

    metadata: ResultRecordMetadata
    result: R

    @property
    def expiration(self) -> DateTime | None:
        """The expiration stored in this record's metadata, if any."""
        return self.metadata.expiration

    @property
    def serializer(self) -> Serializer:
        """The serializer stored in this record's metadata."""
        return self.metadata.serializer

    def serialize_result(self) -> bytes:
        """
        Serialize ``result`` with this record's serializer.

        Returns:
            bytes: the serialized result

        Raises:
            SerializationError: if the serializer raises. The message names the
                result type and serializer and includes a recovery hint, which is
                specialized for the known cloudpickle/Pydantic issue when running
                under IPython.
        """
        try:
            data = self.serializer.dumps(self.result)
        except Exception as exc:
            extra_info = (
                'You can try a different serializer (e.g. result_serializer="json") '
                "or disabling persistence (persist_result=False) for this flow or task."
            )
            # check if this is a known issue with cloudpickle and pydantic
            # and add extra information to help the user recover

            if (
                isinstance(exc, TypeError)
                and isinstance(self.result, BaseModel)
                and str(exc).startswith("cannot pickle")
            ):
                try:
                    from IPython.core.getipython import get_ipython

                    if get_ipython() is not None:
                        extra_info = inspect.cleandoc(
                            """
                            This is a known issue in Pydantic that prevents
                            locally-defined (non-imported) models from being
                            serialized by cloudpickle in IPython/Jupyter
                            environments. Please see
                            https://github.com/pydantic/pydantic/issues/8232 for
                            more information. To fix the issue, either: (1) move
                            your Pydantic class definition to an importable
                            location, (2) use the JSON serializer for your flow
                            or task (`result_serializer="json"`), or (3)
                            disable result persistence for your flow or task
                            (`persist_result=False`).
                            """
                        ).replace("\n", " ")
                except ImportError:
                    # IPython is not importable; keep the generic hint.
                    pass
            raise SerializationError(
                f"Failed to serialize object of type {type(self.result).__name__!r} with "
                f"serializer {self.serializer.type!r}. {extra_info}"
            ) from exc

        return data

    @model_validator(mode="before")
    @classmethod
    def coerce_old_format(cls, value: dict[str, Any] | Any) -> Any:
        """
        Normalize the old flat record layout into the nested layout.

        Non-dict input is returned unchanged. For dict input:

        - ``data`` is renamed to ``result``.
        - A missing ``metadata`` entry is filled with ``{}`` so the
          ``ResultRecordMetadata`` defaults apply.
        - Top-level ``expiration``, ``serializer`` and ``prefect_version``
          entries are moved into ``metadata``.

        The input dict, and a nested ``metadata`` dict, are shallow-copied
        before being rewritten. This ensures that ``model_validate`` does not
        mutate the caller's data.
        """
        if not isinstance(value, dict):
            return value
        # isinstance cannot narrow to a parameterized generic, so cast explicitly
        # (a no-op at runtime); the dict() call makes the copy we rewrite.
        record = dict(cast(dict[str, Any], value))
        if "data" in record:
            record["result"] = record.pop("data")
        if "metadata" not in record:
            record["metadata"] = {}
        elif isinstance(record["metadata"], dict):
            # Copy so keys moved below do not leak into the caller's dict.
            record["metadata"] = dict(record["metadata"])
        for key in ("expiration", "serializer", "prefect_version"):
            if key in record:
                record["metadata"][key] = record.pop(key)
        return record

    def serialize_metadata(self) -> bytes:
        """Serialize only this record's metadata (see ``ResultRecordMetadata.dump_bytes``)."""
        return self.metadata.dump_bytes()

    def serialize(
        self,
    ) -> bytes:
        """
        Serialize the record to bytes.

        The result is first serialized with the record's serializer. A copy of
        the record holding those serialized bytes is then dumped to JSON.

        Returns:
            bytes: the serialized record
        """
        return (
            self.model_copy(update={"result": self.serialize_result()})
            .model_dump_json(serialize_as_any=True)
            .encode()
        )

    @classmethod
    def deserialize(
        cls, data: bytes, backup_serializer: Serializer | None = None
    ) -> "ResultRecord[R]":
        """
        Deserialize a record from bytes.

        Args:
            data: the serialized record
            backup_serializer: The serializer to use to deserialize the result record. Only
                necessary if the provided data does not specify a serializer.

        Returns:
            ResultRecord: the deserialized record

        Raises:
            ValidationError: if ``data`` is not a valid serialized record and no
                ``backup_serializer`` was given.
        """
        try:
            instance = cls.model_validate_json(data)
        except ValidationError:
            if backup_serializer is None:
                raise
            else:
                # Treat the whole payload as a bare serialized result.
                result = backup_serializer.loads(data)
                return cls(
                    metadata=ResultRecordMetadata(serializer=backup_serializer),
                    result=result,
                )
        # After JSON validation the serialized result arrives as bytes or str;
        # decode it with the serializer recorded in the metadata.
        if isinstance(instance.result, bytes):
            instance.result = instance.serializer.loads(instance.result)
        elif isinstance(instance.result, str):
            instance.result = instance.serializer.loads(instance.result.encode())
        return instance

    @classmethod
    def deserialize_from_result_and_metadata(
        cls, result: bytes, metadata: bytes
    ) -> "ResultRecord[R]":
        """
        Deserialize a record from separate result and metadata bytes.

        Args:
            result: the serialized result
            metadata: the serialized metadata

        Returns:
            ResultRecord: the deserialized record
        """
        result_record_metadata = ResultRecordMetadata.load_bytes(metadata)
        return cls(
            metadata=result_record_metadata,
            result=result_record_metadata.serializer.loads(result),
        )

    def __eq__(self, other: Any) -> bool:
        # Records are equal when their dumped metadata and result are equal.
        if not isinstance(other, ResultRecord):
            return False
        return self.model_dump(include={"metadata", "result"}) == other.model_dump(
            include={"metadata", "result"}
        )