Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/ui/schemas.py: 48%

29 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from typing import TYPE_CHECKING, Any 1a

2 

3from fastapi import Body, Depends, HTTPException 1a

4 

5from prefect._internal.compatibility.starlette import status 1a

6from prefect.logging import get_logger 1a

7from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

8from prefect.server.schemas.responses import SchemaValuesValidationResponse 1a

9from prefect.server.utilities.server import APIRouter 1a

10from prefect.utilities.schema_tools.hydration import HydrationContext, hydrate 1a

11from prefect.utilities.schema_tools.validation import ( 1a

12 CircularSchemaRefError, 

13 build_error_obj, 

14 is_valid_schema, 

15 preprocess_schema, 

16 validate, 

17) 

18 

19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true1a

20 import logging 

21 

22router: APIRouter = APIRouter(prefix="/ui/schemas", tags=["UI", "Schemas"]) 1a

23 

24logger: "logging.Logger" = get_logger("server.api.ui.schemas") 1a

25 

26 

27@router.post("/validate") 1a

28async def validate_obj( 1a

29 json_schema: dict[str, Any] = Body( 

30 ..., 

31 embed=True, 

32 alias="schema", 

33 validation_alias="schema", 

34 json_schema_extra={"additionalProperties": True}, 

35 ), 

36 values: dict[str, Any] = Body( 

37 ..., embed=True, json_schema_extra={"additionalProperties": True} 

38 ), 

39 db: PrefectDBInterface = Depends(provide_database_interface), 

40) -> SchemaValuesValidationResponse: 

41 schema = preprocess_schema(json_schema) 

42 

43 try: 

44 is_valid_schema(schema, preprocess=False) 

45 except ValueError as exc: 

46 raise HTTPException( 

47 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(exc) 

48 ) 

49 

50 async with db.session_context() as session: 

51 ctx = await HydrationContext.build( 

52 session=session, render_jinja=False, render_workspace_variables=True 

53 ) 

54 

55 hydrated_values = hydrate(values, ctx) 

56 try: 

57 errors = validate(hydrated_values, schema, preprocess=False) 

58 except CircularSchemaRefError: 

59 raise HTTPException( 

60 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, 

61 detail="Invalid schema: Unable to validate schema with circular references.", 

62 ) 

63 error_obj = build_error_obj(errors) 

64 

65 return error_obj