Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/ui/schemas.py: 48%
29 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from typing import TYPE_CHECKING, Any 1a
3from fastapi import Body, Depends, HTTPException 1a
5from prefect._internal.compatibility.starlette import status 1a
6from prefect.logging import get_logger 1a
7from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
8from prefect.server.schemas.responses import SchemaValuesValidationResponse 1a
9from prefect.server.utilities.server import APIRouter 1a
10from prefect.utilities.schema_tools.hydration import HydrationContext, hydrate 1a
11from prefect.utilities.schema_tools.validation import ( 1a
12 CircularSchemaRefError,
13 build_error_obj,
14 is_valid_schema,
15 preprocess_schema,
16 validate,
17)
19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true1a
20 import logging
# Router for the UI schema endpoints: created with the "/ui/schemas" prefix
# and tagged "UI" and "Schemas".
22router: APIRouter = APIRouter(prefix="/ui/schemas", tags=["UI", "Schemas"]) 1a
# Module logger obtained through get_logger. The annotation is a string because
# `logging` is imported only under TYPE_CHECKING in this module.
24logger: "logging.Logger" = get_logger("server.api.ui.schemas") 1a
# Validate a set of values against a JSON schema sent in the request body.
#
# Request body (both fields embedded):
#   schema: the JSON schema, bound to the `json_schema` parameter via its alias.
#   values: the values to check against that schema.
#
# Returns the object produced by build_error_obj() for the validation errors.
#
# Raises HTTPException (422) when is_valid_schema() rejects the schema with a
# ValueError, or when validate() raises CircularSchemaRefError.
#
# NOTE: documented with comments rather than a docstring on purpose; FastAPI
# publishes a route's docstring as its OpenAPI description, so adding one
# would change the generated API schema.
@router.post("/validate")
async def validate_obj(
    json_schema: dict[str, Any] = Body(
        ...,
        embed=True,
        alias="schema",
        validation_alias="schema",
        json_schema_extra={"additionalProperties": True},
    ),
    values: dict[str, Any] = Body(
        ..., embed=True, json_schema_extra={"additionalProperties": True}
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> SchemaValuesValidationResponse:
    # Preprocess once up front; the later calls are told preprocess=False so
    # they operate on this already-preprocessed schema.
    schema = preprocess_schema(json_schema)

    try:
        is_valid_schema(schema, preprocess=False)
    except ValueError as exc:
        # Chain the cause so tracebacks show the schema error as the reason
        # for the 422 instead of an unrelated failure inside the handler.
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc)
        ) from exc

    # The session is only needed while the hydration context is being built.
    async with db.session_context() as session:
        ctx = await HydrationContext.build(
            session=session, render_jinja=False, render_workspace_variables=True
        )

    hydrated_values = hydrate(values, ctx)
    try:
        errors = validate(hydrated_values, schema, preprocess=False)
    except CircularSchemaRefError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Invalid schema: Unable to validate schema with circular references.",
        ) from exc

    return build_error_obj(errors)