Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/validation.py: 17%

131 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2This module contains functions for validating job variables for deployments, work pools, 

3flow runs, and RunDeployment actions. These functions are used to validate that job 

4variables provided by users conform to the JSON schema defined in the work pool's base job 

5template. 

6 

7Note some important details: 

8 

91. The order of applying job variables is: work pool's base job template, deployment, flow 

10 run. This means that flow run job variables override deployment job variables, which 

11 override work pool job variables. 

12 

132. The validation of job variables for work pools and deployments ignores required keys 

14 because we don't know if the full set of overrides will include values for any required 

15 fields. 

16 

173. Work pools can include default values for job variables. These can be normal types or 

18 references to blocks. We have not been validating these values or whether default blocks 

19 satisfy job variable JSON schemas. To avoid failing validation for existing (otherwise 

20 working) data, we ignore invalid defaults when validating deployment and flow run 

21 variables, but not when validating the work pool's base template, e.g. during work pool 

22 creation or updates. If we find defaults that are invalid, we have to ignore required 

23 fields when we run the full validation. 

24 

254. A flow run is the terminal point for job variables, so it is the only place where 

26 we validate required variables and default values. Thus, 

27 `validate_job_variables_for_deployment_flow_run` and 

28 `validate_job_variables_for_run_deployment_action` check for required fields. 

29 

305. We have been using Pydantic v1 to generate work pool base job templates, and it produces 

31 invalid JSON schemas for some fields, e.g. tuples and optional fields. We try to fix these 

32 schemas on the fly while validating job variables, but there is a case we can't resolve, 

33 which is whether or not an optional field supports a None value. In this case, we allow 

34 None values to be passed in, which means that if an optional field does not actually 

35 allow None values, the Pydantic model will fail to validate at runtime. 

36""" 

37 

38from typing import TYPE_CHECKING, Any, Dict, Optional, Union 1a

39from uuid import UUID 1a

40 

41import pydantic 1a

42from fastapi import HTTPException 1a

43from sqlalchemy.exc import DBAPIError, NoInspectionAvailable 1a

44from sqlalchemy.ext.asyncio import AsyncSession 1a

45 

46from prefect._internal.compatibility.starlette import status 1a

47from prefect.logging import get_logger 1a

48from prefect.server import models, schemas 1a

49from prefect.server.database.orm_models import Deployment as BaseDeployment 1a

50from prefect.server.events.actions import RunDeployment 1a

51from prefect.server.schemas.core import WorkPool 1a

52from prefect.utilities.schema_tools import ValidationError, is_valid_schema, validate 1a

53 

54if TYPE_CHECKING: 54 ↛ 55line 54 didn't jump to line 55 because the condition on line 54 was never true1a

55 import logging 

56 

# Logger shared by the validation helpers in this module.
logger: "logging.Logger" = get_logger("server.api.validation")

# Deployment payloads (create or update) whose `job_variables` get validated.
DeploymentAction = Union[
    schemas.actions.DeploymentCreate,
    schemas.actions.DeploymentUpdate,
]
# Flow run payloads (create-from-deployment or update) whose `job_variables`
# get validated.
FlowRunAction = Union[
    schemas.actions.DeploymentFlowRunCreate,
    schemas.actions.FlowRunUpdate,
]

65 

66 

async def _get_base_config_defaults(
    session: AsyncSession,
    base_config: dict[str, Any],
    ignore_invalid_defaults: bool = True,
) -> tuple[dict[str, Any], bool]:
    """
    Collect the default job variable values declared in a base job template.

    Every entry of `base_config["variables"]["properties"]` that declares a
    `default` is considered. A default that is a dict containing a `$ref` key is
    resolved through `_resolve_default_reference`; when that resolution yields
    `None` the variable is skipped entirely.

    Args:
        session: Database session forwarded to `_resolve_default_reference`.
        base_config: The base job template to read defaults from.
        ignore_invalid_defaults: When true, each default is validated on its own
            against the variables schema and left out of the result if it fails.

    Returns:
        A tuple of the collected defaults keyed by variable name, and a flag
        telling whether at least one default was left out because it failed
        validation.
    """
    schema = base_config.get("variables", {})
    properties: dict[str, Any] = schema.get("properties", {})
    collected: dict[str, Any] = {}
    found_invalid = False

    if not properties:
        return collected, found_invalid

    for name, spec in properties.items():
        if "default" not in spec:
            continue

        value = spec["default"]
        if isinstance(value, dict) and "$ref" in value:
            # Block reference: swap the reference for the resolved block data.
            value = await _resolve_default_reference(value, session)
            if value is None:
                continue

        if ignore_invalid_defaults:
            # Check this single default in isolation so that one bad default
            # does not affect the others.
            problems = validate(
                {name: value},
                schema,
                raise_on_error=False,
                preprocess=False,
                ignore_required=True,
                allow_none_with_default=False,
            )
            if problems:
                found_invalid = True
                continue

        collected[name] = value

    return collected, found_invalid

111 

112 

async def _resolve_default_reference(
    variable: dict[str, Any], session: AsyncSession
) -> Optional[Any]:
    """
    Resolve a reference to a block. The input variable should have a format of:

    {
        "$ref": {
            "block_document_id": "block_document_id"
        },
    }

    Args:
        variable: The default value that may hold a block reference.
        session: Database session used to read the block document.

    Returns:
        The `data` of the referenced block document, or `None` when `variable`
        is not a block reference (not a dict, no `$ref` mapping, or no
        `block_document_id` inside the `$ref`).

    Raises:
        HTTPException: 404 when the block document ID cannot be turned into a
            UUID or no block document is found for it.
    """
    if not isinstance(variable, dict):
        return None

    reference_data = variable.get("$ref")
    # A `$ref` that is missing or is not a mapping (for example a string
    # pointer) cannot name a block document. Treat it as "nothing to resolve"
    # rather than failing with an AttributeError on `.get` below.
    if not isinstance(reference_data, dict):
        return None

    if (provided_block_document_id := reference_data.get("block_document_id")) is None:
        return None

    if isinstance(provided_block_document_id, UUID):
        block_document_id = provided_block_document_id
    else:
        try:
            block_document_id = UUID(provided_block_document_id)
        except (AttributeError, TypeError, ValueError):
            # ValueError: malformed UUID string. AttributeError / TypeError:
            # the ID is not a string at all (e.g. an int or bytes). Either way
            # the reference cannot point at an existing block.
            raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Block not found.")

    try:
        block_document = await models.block_documents.read_block_document_by_id(
            session, block_document_id
        )
    except pydantic.ValidationError:
        # It's possible to get an invalid UUID here because the block document ID is
        # not validated by our schemas.
        logger.info("Could not find block document with ID %s", block_document_id)
        block_document = None

    if not block_document:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Block not found.")

    return block_document.data

157 

158 

async def _validate_work_pool_job_variables(
    session: AsyncSession,
    work_pool_name: str,
    base_job_template: Dict[str, Any],
    *job_vars: Dict[str, Any],
    ignore_required: bool = True,
    ignore_invalid_defaults: bool = True,
    raise_on_error=True,
) -> None:
    """
    Validate layered job variables against a work pool's variables schema.

    The defaults from the base job template form the bottom layer; each dict in
    `job_vars` is then applied on top in the order given, so later layers
    override earlier ones. Validation is skipped (with an info log) when the
    template is empty or has no `variables` schema.

    Args:
        session: Database session used to resolve block references in defaults.
        work_pool_name: Name of the work pool, used only in log messages.
        base_job_template: The work pool's base job template.
        *job_vars: Override layers to apply on top of the template defaults.
            Entries that are not dicts are ignored.
        ignore_required: Whether to skip the check for required variables.
        ignore_invalid_defaults: Whether template defaults that fail validation
            are dropped instead of being validated with everything else.
        raise_on_error: Forwarded to `validate`. The value returned by
            `validate` is not used by this function.

    Raises:
        HTTPException: 422 when the variables schema itself is rejected by
            `is_valid_schema`.
    """
    if not base_job_template:
        logger.info(
            "Cannot validate job variables for work pool %s because it does not have a base job template",
            work_pool_name,
        )
        return

    schema = base_job_template.get("variables")
    if not schema:
        logger.info(
            "Cannot validate job variables for work pool %s "
            "because it does not specify a variables schema",
            work_pool_name,
        )
        return

    try:
        is_valid_schema(schema, preprocess=False)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)
        )

    template_defaults, had_invalid_defaults = await _get_base_config_defaults(
        session, base_job_template, ignore_invalid_defaults
    )

    # Start from the template defaults and apply each override layer in order.
    merged_vars = dict(template_defaults)
    for layer in job_vars:
        if isinstance(layer, dict):
            merged_vars.update(layer)

    # If we are ignoring validation for default values and there were invalid defaults,
    # then we can't check for required fields because we won't have the default values
    # to satisfy missing required fields.
    skip_required = ignore_required or (
        ignore_invalid_defaults and had_invalid_defaults
    )

    validate(
        merged_vars,
        schema,
        raise_on_error=raise_on_error,
        preprocess=True,
        ignore_required=skip_required,
        # We allow None values to be passed in for optional fields if there is a default
        # value for the field. This is because we have blocks that contain default None
        # values that will fail to validate otherwise. However, this means that if an
        # optional field does not actually allow None values, the Pydantic model will fail
        # to validate at runtime. Unfortunately, there is not a good solution to this
        # problem at this time.
        allow_none_with_default=True,
    )

221 

222 

async def validate_job_variables_for_deployment_flow_run(
    session: AsyncSession,
    deployment: BaseDeployment,
    flow_run: FlowRunAction,
) -> None:
    """
    Validate job variables for a flow run created for a deployment.

    Flow runs are the terminal point for job variable overlays, so we validate required
    job variables because all variables should now be present. The deployment's
    job variables are applied first and the flow run's on top of them.

    Raises:
        HTTPException: 422 when the combined job variables fail validation.
    """
    work_queue = deployment.work_queue
    work_pool = work_queue.work_pool if work_queue else None

    # If we aren't able to access a deployment's work pool, we don't have a base job
    # template to validate job variables against. This is not a validation failure because
    # some deployments may not have a work pool, such as those created by flow.serve().
    if not work_pool:
        logger.info(
            "Cannot validate job variables for deployment %s "
            "because it does not have a work pool",
            deployment.id,
        )
        return

    try:
        await _validate_work_pool_job_variables(
            session,
            work_pool.name,
            work_pool.base_job_template,
            deployment.job_variables or {},
            flow_run.job_variables or {},
            ignore_required=False,
            ignore_invalid_defaults=True,
        )
    except ValidationError as exc:
        # The same validator serves both creating and updating a flow run, so
        # the message names the operation that was attempted.
        is_create = isinstance(flow_run, schemas.actions.DeploymentFlowRunCreate)
        error_msg = (
            f"Error creating flow run: {exc}"
            if is_create
            else f"Error updating flow run: {exc}"
        )
        raise HTTPException(status.HTTP_422_UNPROCESSABLE_ENTITY, detail=error_msg)

263 

264 

async def validate_job_variables_for_deployment(
    session: AsyncSession,
    work_pool: WorkPool,
    deployment: DeploymentAction,
) -> None:
    """
    Validate job variables for deployment creation and updates.

    The deployment's job variables are checked against the base job template of
    the given work pool. Nothing is validated when the deployment carries no job
    variables.

    Unlike validations for flow runs, validation here ignores required keys in the schema
    because we don't know if the full set of overrides will include values for any
    required fields. If the full set of job variables when a flow is running, including
    the deployment's and flow run's overrides, fails to specify a value for the required
    key, that's an error.

    Raises:
        HTTPException: 422 when the deployment's job variables fail validation.
    """
    job_variables = deployment.job_variables
    if not job_variables:
        return

    try:
        await _validate_work_pool_job_variables(
            session,
            work_pool.name,
            work_pool.base_job_template,
            job_variables,
            ignore_required=True,
            ignore_invalid_defaults=True,
        )
    except ValidationError as exc:
        # The message names whether a create or an update was attempted.
        is_create = isinstance(deployment, schemas.actions.DeploymentCreate)
        error_msg = (
            f"Error creating deployment: {exc}"
            if is_create
            else f"Error updating deployment: {exc}"
        )
        raise HTTPException(status.HTTP_422_UNPROCESSABLE_ENTITY, detail=error_msg)

300 

301 

async def validate_job_variable_defaults_for_work_pool(
    session: AsyncSession,
    work_pool_name: str,
    base_job_template: Dict[str, Any],
) -> None:
    """
    Validate the default job variables for a work pool.

    The defaults declared in the base job template are checked against the
    template's own JSON schema. No override layers are supplied, and invalid
    defaults are not ignored here, so a bad default fails the validation.
    References to block documents in the defaults are resolved and hydrated
    before validating.

    Required keys in the schema are ignored because only default values are of
    interest at this point: a required field without a default is not an error
    here. It only becomes one if the full set of job variables for a running
    flow, including deployment and flow run overrides, still lacks a value.

    NOTE: This will raise an HTTP 404 error if a referenced block document does not exist.

    Raises:
        HTTPException: 422 when a default value fails validation.
    """
    try:
        await _validate_work_pool_job_variables(
            session,
            work_pool_name,
            base_job_template,
            ignore_required=True,
            ignore_invalid_defaults=False,
        )
    except ValidationError as exc:
        raise HTTPException(
            status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Validation failed for work pool's job variable defaults: {exc}",
        )

333 

334 

async def validate_job_variables_for_run_deployment_action(
    session: AsyncSession,
    run_action: RunDeployment,
) -> None:
    """
    Validate the job variables for a RunDeployment action.

    This action is equivalent to creating a flow run for a deployment, so we validate
    required job variables because all variables should now be present. The
    deployment's job variables are applied first and the action's job variables
    on top of them, so a required variable supplied by the deployment is not
    reported as missing.

    Validation is skipped when the action has no deployment ID, when the
    deployment has no work pool, or when neither the deployment nor the action
    defines any job variables.

    Raises:
        HTTPException: 404 when the deployment cannot be found.

    A validation error from `_validate_work_pool_job_variables` is not caught
    here and propagates to the caller.
    """
    if not run_action.deployment_id:
        logger.error(
            "Cannot validate job variables for RunDeployment action because it does not have a deployment ID"
        )
        return

    try:
        deployment = await models.deployments.read_deployment(
            session, run_action.deployment_id
        )
    except (DBAPIError, NoInspectionAvailable):
        # It's possible to get an invalid UUID here because the deployment ID is
        # not validated by our schemas.
        logger.info("Could not find deployment with ID %s", run_action.deployment_id)
        deployment = None
    if not deployment:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Deployment not found.")

    if not (deployment.work_queue and deployment.work_queue.work_pool):
        logger.info(
            "Cannot validate job variables for deployment %s "
            "because it does not have a work pool",
            run_action.deployment_id,
        )
        return

    if not (deployment.job_variables or run_action.job_variables):
        return

    work_pool = deployment.work_queue.work_pool

    await _validate_work_pool_job_variables(
        session,
        work_pool.name,
        work_pool.base_job_template,
        # Required fields are enforced below, so the deployment's own job
        # variables must be part of the overlay; the action's variables are
        # applied last and therefore take precedence.
        deployment.job_variables or {},
        run_action.job_variables or {},
        ignore_required=False,
        ignore_invalid_defaults=True,
    )

383 )