Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/collections.py: 26%

45 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import json 1a

2from typing import Any, Dict 1a

3 

4import httpx 1a

5from anyio import Path 1a

6from cachetools import TTLCache 1a

7from fastapi import HTTPException, status 1a

8 

9from prefect.server.utilities.server import PrefectRouter 1a

10 

# Router that groups the collection-registry endpoints under `/collections`.
router: PrefectRouter = PrefectRouter(
    prefix="/collections",
    tags=["Collections"],
)

# Process-wide cache of view payloads keyed by view name. Holds at most 200
# entries, each expiring ten minutes (600 seconds) after insertion.
GLOBAL_COLLECTIONS_VIEW_CACHE: TTLCache[str, dict[str, Any]] = TTLCache(
    maxsize=200,
    ttl=10 * 60,
)

16 

# Base URL of the published JSON views in the prefect-collection-registry repo.
REGISTRY_VIEWS = (
    "https://raw.githubusercontent.com/PrefectHQ/prefect-collection-registry/main/views"
)

# Maps each supported view name to the URL of its JSON document. Every view is
# published as `<name>.json` directly under REGISTRY_VIEWS.
KNOWN_VIEWS = {
    view_name: f"{REGISTRY_VIEWS}/{view_name}.json"
    for view_name in (
        "aggregate-block-metadata",
        "aggregate-flow-metadata",
        "aggregate-worker-metadata",
        "demo-flows",
    )
}

26 

27 

@router.get("/views/{view}")
async def read_view_content(view: str) -> Dict[str, Any]:
    """Return the JSON content of a prefect-collection-registry view.

    Args:
        view: Name of the view, taken from the request path.

    Returns:
        The view's content as produced by `get_collection_view`.

    Raises:
        HTTPException: With status 404 when `get_collection_view` raises
            `KeyError`, or when it raises an `httpx.HTTPStatusError` whose
            response status is 404.
        httpx.HTTPStatusError: Re-raised unchanged for any other upstream
            HTTP error status.
    """
    try:
        return await get_collection_view(view)
    except KeyError:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"View {view} not found in registry",
        )
    except httpx.HTTPStatusError as exc:
        # Only an upstream 404 is translated; every other upstream status
        # propagates as the original httpx error.
        if exc.response.status_code != 404:
            raise
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Requested content missing for view {view}",
        )

46 

47 

def _drop_prefect_agent(view: str, data: dict[str, Any]) -> dict[str, Any]:
    """Remove the `prefect-agent` entry from worker metadata, in place."""
    if view == "aggregate-worker-metadata":
        data.get("prefect", {}).pop("prefect-agent", None)
    return data


async def get_collection_view(view: str) -> dict[str, Any]:
    """Return the content of a registry view, using the cache when possible.

    Lookup order:
      1. `GLOBAL_COLLECTIONS_VIEW_CACHE`.
      2. The remote JSON document listed in `KNOWN_VIEWS`.
      3. The JSON file `collections_data/views/<view>.json` next to this
         module, used only when the remote fetch fails.

    Whichever source answers, the result is stored in the cache, and the
    `prefect-agent` entry is removed from `aggregate-worker-metadata` so the
    remote and fallback paths return the same shape.

    Args:
        view: Name of the view; must be a key of `KNOWN_VIEWS`.

    Returns:
        The decoded JSON content of the view.

    Raises:
        KeyError: If `view` is not a key of `KNOWN_VIEWS`.
        Exception: The error from the remote fetch (for example
            `httpx.HTTPStatusError`) is re-raised when no local fallback
            file exists.
    """
    try:
        return GLOBAL_COLLECTIONS_VIEW_CACHE[view]
    except KeyError:
        pass

    # Resolve the URL before any I/O: an unknown view raises KeyError here
    # without creating an HTTP client or probing the filesystem.
    url = KNOWN_VIEWS[view]

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            resp.raise_for_status()
            data = _drop_prefect_agent(view, resp.json())
    except Exception:
        # Best-effort fallback: any failure to fetch or decode the remote
        # view falls back to the copy bundled next to this module.
        local_file = Path(__file__).parent / Path(f"collections_data/views/{view}.json")
        if not await local_file.exists():
            raise
        raw_data = await local_file.read_text()
        data = _drop_prefect_agent(view, json.loads(raw_data))

    GLOBAL_COLLECTIONS_VIEW_CACHE[view] = data
    return data