Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/validation.py: 14%
131 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2This module contains functions for validating job variables for deployments, work pools,
3flow runs, and RunDeployment actions. These functions are used to validate that job
4variables provided by users conform to the JSON schema defined in the work pool's base job
5template.
7Note some important details:
91. The order of applying job variables is: work pool's base job template, deployment, flow
10 run. This means that flow run job variables override deployment job variables, which
11 override work pool job variables.
132. The validation of job variables for work pools and deployments ignores required keys in
14 the schema because we don't know if the full set of overrides will include values for any required
15 fields.
173. Work pools can include default values for job variables. These can be normal types or
18 references to blocks. We have not been validating these values or whether default blocks
19 satisfy job variable JSON schemas. To avoid failing validation for existing (otherwise
20 working) data, we ignore invalid defaults when validating deployment and flow run
21 variables, but not when validating the work pool's base template, e.g. during work pool
22 creation or updates. If we find defaults that are invalid, we have to ignore required
23 fields when we run the full validation.
254. A flow run is the terminal point for job variables, so it is the only place where
26 we validate required variables and default values. Thus,
27 `validate_job_variables_for_deployment_flow_run` and
28 `validate_job_variables_for_run_deployment_action` check for required fields.
305. We have been using Pydantic v1 to generate work pool base job templates, and it produces
31 invalid JSON schemas for some fields, e.g. tuples and optional fields. We try to fix these
32 schemas on the fly while validating job variables, but there is a case we can't resolve,
33 which is whether or not an optional field supports a None value. In this case, we allow
34 None values to be passed in, which means that if an optional field does not actually
35 allow None values, the Pydantic model will fail to validate at runtime.
36"""
38from typing import TYPE_CHECKING, Any, Dict, Optional, Union 1a
39from uuid import UUID 1a
41import pydantic 1a
42from fastapi import HTTPException 1a
43from sqlalchemy.exc import DBAPIError, NoInspectionAvailable 1a
44from sqlalchemy.ext.asyncio import AsyncSession 1a
46from prefect._internal.compatibility.starlette import status 1a
47from prefect.logging import get_logger 1a
48from prefect.server import models, schemas 1a
49from prefect.server.database.orm_models import Deployment as BaseDeployment 1a
50from prefect.server.events.actions import RunDeployment 1a
51from prefect.server.schemas.core import WorkPool 1a
52from prefect.utilities.schema_tools import ValidationError, is_valid_schema, validate 1a
54if TYPE_CHECKING: 54 ↛ 55line 54 didn't jump to line 55 because the condition on line 54 was never true1a
55 import logging
# Logger used by the validation helpers in this module.
logger: "logging.Logger" = get_logger("server.api.validation")

# Deployment create/update payloads; the validators below read `job_variables`
# from whichever of the two they receive.
DeploymentAction = Union[
    schemas.actions.DeploymentCreate, schemas.actions.DeploymentUpdate
]
# Flow run create/update payloads; the validators below read `job_variables`
# from whichever of the two they receive.
FlowRunAction = Union[
    schemas.actions.DeploymentFlowRunCreate, schemas.actions.FlowRunUpdate
]
async def _get_base_config_defaults(
    session: AsyncSession,
    base_config: dict[str, Any],
    ignore_invalid_defaults: bool = True,
) -> tuple[dict[str, Any], bool]:
    """
    Collect the default job variable values declared in a base job template.

    Every property under `base_config["variables"]["properties"]` that declares a
    `default` contributes one entry. A default shaped like a block reference (a dict
    containing `$ref`) is replaced by the value returned from
    `_resolve_default_reference`; references that resolve to `None` are skipped.

    Args:
        session: Database session forwarded to `_resolve_default_reference`.
        base_config: The base job template holding the `variables` schema.
        ignore_invalid_defaults: When true, each default is validated on its own
            against the variables schema and left out of the result if it fails.
            When false, defaults are returned without being validated here.

    Returns:
        A tuple of the collected defaults keyed by variable name, and a flag that
        is true when at least one default was left out for failing validation.
    """
    # `or {}` also covers templates that store an explicit None for these keys.
    variables_schema = base_config.get("variables") or {}
    fields_schema: dict[str, Any] = variables_schema.get("properties") or {}
    defaults: dict[str, Any] = {}
    has_invalid_defaults = False

    for variable_name, attrs in fields_schema.items():
        # A property definition that is not a mapping cannot declare a default.
        if not isinstance(attrs, dict) or "default" not in attrs:
            continue

        default = attrs["default"]
        if isinstance(default, dict) and "$ref" in default:
            default = await _resolve_default_reference(default, session)
            if default is None:
                continue

        if ignore_invalid_defaults:
            errors = validate(
                {variable_name: default},
                variables_schema,
                raise_on_error=False,
                preprocess=False,
                ignore_required=True,
                allow_none_with_default=False,
            )
            if errors:
                # Validate before storing so an invalid default never enters the
                # result and nothing has to be deleted afterwards.
                has_invalid_defaults = True
                continue

        defaults[variable_name] = default

    return defaults, has_invalid_defaults
async def _resolve_default_reference(
    variable: dict[str, Any], session: AsyncSession
) -> Optional[Any]:
    """
    Resolve a reference to a block. The input variable should have a format of:

        {
            "$ref": {
                "block_document_id": "block_document_id"
            },
        }

    Args:
        variable: The candidate block reference taken from a default value.
        session: Database session used to read the block document.

    Returns:
        The `data` of the referenced block document, or `None` when `variable`
        is not a usable block reference: it is not a dict, its `$ref` entry is
        missing or is not a dict, or the reference has no `block_document_id`.

    Raises:
        HTTPException: 404 when the block document ID cannot be parsed as a UUID
            or when no block document is found for it.
    """
    if not isinstance(variable, dict):
        return None

    # Only a mapping can carry a block document ID. Any other `$ref` value (for
    # example a string pointer or None) is not a block reference.
    reference_data = variable.get("$ref")
    if not isinstance(reference_data, dict):
        return None

    provided_block_document_id = reference_data.get("block_document_id")
    if provided_block_document_id is None:
        return None

    if isinstance(provided_block_document_id, UUID):
        block_document_id = provided_block_document_id
    else:
        try:
            block_document_id = UUID(provided_block_document_id)
        except (AttributeError, TypeError, ValueError) as exc:
            # UUID() raises ValueError for malformed strings and AttributeError
            # or TypeError for non-string input; all mean the ID is unusable.
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, detail="Block not found."
            ) from exc

    try:
        block_document = await models.block_documents.read_block_document_by_id(
            session, block_document_id
        )
    except pydantic.ValidationError:
        # It's possible to get an invalid UUID here because the block document ID is
        # not validated by our schemas.
        logger.info("Could not find block document with ID %s", block_document_id)
        block_document = None

    if not block_document:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Block not found.")

    return block_document.data
async def _validate_work_pool_job_variables(
    session: AsyncSession,
    work_pool_name: str,
    base_job_template: Dict[str, Any],
    *job_vars: Dict[str, Any],
    ignore_required: bool = True,
    ignore_invalid_defaults: bool = True,
    raise_on_error=True,
) -> None:
    """
    Validate layered job variables against a work pool's variables schema.

    The template's defaults form the base layer. Each dict in `job_vars` is then
    applied on top in the order given, so later overlays replace earlier values.
    Overlays that are not dicts are ignored. Nothing is validated when the
    template is empty or has no `variables` schema; that is logged instead.

    Args:
        session: Database session forwarded to `_get_base_config_defaults`.
        work_pool_name: Name used in log messages only.
        base_job_template: The work pool's base job template.
        *job_vars: Job variable overlays, lowest precedence first.
        ignore_required: Skip the schema's required-key checks.
        ignore_invalid_defaults: Forwarded to `_get_base_config_defaults`; when
            invalid defaults were found, required-key checks are skipped too.
        raise_on_error: Forwarded to `validate`.

    Raises:
        HTTPException: 422 when `is_valid_schema` rejects the variables schema
            with a `ValueError`.

    NOTE(review): presumably `validate` raises `ValidationError` on failure when
    `raise_on_error` is true; the callers in this module catch it. Confirm
    against `prefect.utilities.schema_tools`.
    """
    if not base_job_template:
        logger.info(
            "Cannot validate job variables for work pool %s because it does not have a base job template",
            work_pool_name,
        )
        return

    schema = base_job_template.get("variables")
    if not schema:
        logger.info(
            "Cannot validate job variables for work pool %s "
            "because it does not specify a variables schema",
            work_pool_name,
        )
        return

    try:
        is_valid_schema(schema, preprocess=False)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)
        )

    template_defaults, found_invalid_defaults = await _get_base_config_defaults(
        session, base_job_template, ignore_invalid_defaults
    )

    # Start from a copy of the defaults and apply each overlay in order.
    merged_vars = dict(template_defaults)
    for overlay in job_vars:
        if isinstance(overlay, dict):
            merged_vars.update(overlay)

    # If we are ignoring validation for default values and there were invalid defaults,
    # then we can't check for required fields because we won't have the default values
    # to satisfy missing required fields.
    skip_required = ignore_required or (
        ignore_invalid_defaults and found_invalid_defaults
    )

    # None is allowed for optional fields that have a default because some blocks
    # contain default None values that would fail validation otherwise. The cost is
    # that an optional field which does not really accept None will only fail when
    # the Pydantic model is validated at runtime.
    validate(
        merged_vars,
        schema,
        raise_on_error=raise_on_error,
        preprocess=True,
        ignore_required=skip_required,
        allow_none_with_default=True,
    )
async def validate_job_variables_for_deployment_flow_run(
    session: AsyncSession,
    deployment: BaseDeployment,
    flow_run: FlowRunAction,
) -> None:
    """
    Validate the job variables of a flow run that belongs to a deployment.

    A flow run is the last layer of job variable overrides, so required job
    variables are enforced here: the deployment's variables and then the flow
    run's variables are applied over the work pool defaults before validating.

    Raises:
        HTTPException: 422 when validation raises `ValidationError`. The message
            says "creating" for a `DeploymentFlowRunCreate` and "updating"
            otherwise.
    """
    # Without a reachable work pool there is no base job template to validate
    # against. This is not a validation failure because some deployments may not
    # have a work pool, such as those created by flow.serve().
    work_queue = deployment.work_queue
    work_pool = work_queue.work_pool if work_queue else None
    if not work_pool:
        logger.info(
            "Cannot validate job variables for deployment %s "
            "because it does not have a work pool",
            deployment.id,
        )
        return

    try:
        await _validate_work_pool_job_variables(
            session,
            work_pool.name,
            work_pool.base_job_template,
            deployment.job_variables or {},
            flow_run.job_variables or {},
            ignore_required=False,
            ignore_invalid_defaults=True,
        )
    except ValidationError as exc:
        error_msg = (
            f"Error creating flow run: {exc}"
            if isinstance(flow_run, schemas.actions.DeploymentFlowRunCreate)
            else f"Error updating flow run: {exc}"
        )
        raise HTTPException(status.HTTP_422_UNPROCESSABLE_ENTITY, detail=error_msg)
async def validate_job_variables_for_deployment(
    session: AsyncSession,
    work_pool: WorkPool,
    deployment: DeploymentAction,
) -> None:
    """
    Validate the job variables supplied when creating or updating a deployment.

    Nothing is validated when the deployment action carries no job variables.

    Required keys in the schema are ignored here, unlike the flow run
    validations, because a later override may still supply them. A required key
    that is still missing once the deployment's and flow run's overrides are
    combined is an error at that later point.

    Raises:
        HTTPException: 422 when validation raises `ValidationError`. The message
            says "creating" for a `DeploymentCreate` and "updating" otherwise.
    """
    job_variables = deployment.job_variables
    if not job_variables:
        return

    try:
        await _validate_work_pool_job_variables(
            session,
            work_pool.name,
            work_pool.base_job_template,
            job_variables,
            ignore_required=True,
            ignore_invalid_defaults=True,
        )
    except ValidationError as exc:
        error_msg = (
            f"Error creating deployment: {exc}"
            if isinstance(deployment, schemas.actions.DeploymentCreate)
            else f"Error updating deployment: {exc}"
        )
        raise HTTPException(status.HTTP_422_UNPROCESSABLE_ENTITY, detail=error_msg)
async def validate_job_variable_defaults_for_work_pool(
    session: AsyncSession,
    work_pool_name: str,
    base_job_template: Dict[str, Any],
) -> None:
    """
    Validate the default job variable values of a work pool's base job template.

    No overrides are passed, so only the defaults declared in the template are
    checked against its variables schema. Defaults that reference block
    documents are resolved first so the referenced data is what gets validated.
    Invalid defaults are not ignored here.

    Required keys are ignored because only defaults are of interest: a required
    field without a default is fine at this stage, and only becomes an error if
    the deployment's and flow run's overrides also fail to supply it.

    NOTE: This will raise an HTTP 404 error if a referenced block document does not exist.

    Raises:
        HTTPException: 422 when validation raises `ValidationError`.
    """
    try:
        await _validate_work_pool_job_variables(
            session,
            work_pool_name,
            base_job_template,
            ignore_required=True,
            ignore_invalid_defaults=False,
        )
    except ValidationError as exc:
        raise HTTPException(
            status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Validation failed for work pool's job variable defaults: {exc}",
        )
async def validate_job_variables_for_run_deployment_action(
    session: AsyncSession,
    run_action: RunDeployment,
) -> None:
    """
    Validate the job variables for a RunDeployment action.

    This action is equivalent to creating a flow run for a deployment, so we
    validate required job variables because all variables should now be present.
    The deployment's job variables are applied first and the action's job
    variables on top of them, the same layering used by
    `validate_job_variables_for_deployment_flow_run`.

    Nothing is validated when the action has no deployment ID, when the
    deployment has no work pool, or when neither the deployment nor the action
    defines any job variables.

    Unlike the other public validators in this module, the final validation call
    is not wrapped in a `try`/`except`, so whatever it raises propagates to the
    caller unchanged.

    Raises:
        HTTPException: 404 when the deployment cannot be read or does not exist.
    """
    if not run_action.deployment_id:
        logger.error(
            "Cannot validate job variables for RunDeployment action because it does not have a deployment ID"
        )
        return

    try:
        deployment = await models.deployments.read_deployment(
            session, run_action.deployment_id
        )
    except (DBAPIError, NoInspectionAvailable):
        # It's possible to get an invalid UUID here because the deployment ID is
        # not validated by our schemas.
        logger.info("Could not find deployment with ID %s", run_action.deployment_id)
        deployment = None
    if not deployment:
        raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Deployment not found.")

    if not (deployment.work_queue and deployment.work_queue.work_pool):
        logger.info(
            "Cannot validate job variables for deployment %s "
            "because it does not have a work pool",
            run_action.deployment_id,
        )
        return

    if not (deployment.job_variables or run_action.job_variables):
        return

    work_pool = deployment.work_queue.work_pool

    # Required keys are enforced below, so the deployment's own job variables
    # must be part of the overlay; otherwise a required value supplied by the
    # deployment would be reported as missing.
    await _validate_work_pool_job_variables(
        session,
        work_pool.name,
        work_pool.base_job_template,
        deployment.job_variables or {},
        run_action.job_variables or {},
        ignore_required=False,
        ignore_invalid_defaults=True,
    )