Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/collections.py: 55%
45 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import json 1a
2from typing import Any, Dict 1a
4import httpx 1a
5from anyio import Path 1a
6from cachetools import TTLCache 1a
7from fastapi import HTTPException, status 1a
9from prefect.server.utilities.server import PrefectRouter 1a
# Router for every endpoint in this module; routes are served under /collections.
router: PrefectRouter = PrefectRouter(prefix="/collections", tags=["Collections"])

# Process-wide cache of view content keyed by view name.
# Holds at most 200 entries; each entry expires 10 minutes (60 * 10 s) after insertion.
GLOBAL_COLLECTIONS_VIEW_CACHE: TTLCache[str, dict[str, Any]] = TTLCache(
    maxsize=200, ttl=60 * 10
)
# Base URL of the raw JSON views published by the prefect-collection-registry repo.
REGISTRY_VIEWS = (
    "https://raw.githubusercontent.com/PrefectHQ/prefect-collection-registry/main/views"
)

# View name -> URL of the JSON document holding that view's content.
# Only names listed here can be requested through this module.
KNOWN_VIEWS = {
    "aggregate-block-metadata": f"{REGISTRY_VIEWS}/aggregate-block-metadata.json",
    "aggregate-flow-metadata": f"{REGISTRY_VIEWS}/aggregate-flow-metadata.json",
    "aggregate-worker-metadata": f"{REGISTRY_VIEWS}/aggregate-worker-metadata.json",
    "demo-flows": f"{REGISTRY_VIEWS}/demo-flows.json",
}
@router.get("/views/{view}")
async def read_view_content(view: str) -> Dict[str, Any]:
    """Read the content of a view from the prefect-collection-registry.

    Args:
        view: Name of the requested view, taken from the URL path.

    Returns:
        The view content produced by ``get_collection_view``.

    Raises:
        HTTPException: With status 404 when ``get_collection_view`` raises
            ``KeyError``, or when it raises an ``httpx.HTTPStatusError``
            whose response status is 404.
        httpx.HTTPStatusError: Re-raised unchanged for any other response
            status.
    """
    try:
        return await get_collection_view(view)
    except KeyError as exc:
        # Chain explicitly so the original lookup failure stays attached
        # to the HTTP error as its cause.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"View {view} not found in registry",
        ) from exc
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code != 404:
            # Anything other than "missing upstream" is not translated here.
            raise
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Requested content missing for view {view}",
        ) from exc
async def get_collection_view(view: str) -> dict[str, Any]:
    """Return the content of a registry view, using the cache when possible.

    Lookup order:
      1. ``GLOBAL_COLLECTIONS_VIEW_CACHE``.
      2. An HTTP GET of the URL registered for ``view`` in ``KNOWN_VIEWS``.
      3. If step 2 fails for any reason, the JSON file
         ``collections_data/views/<view>.json`` next to this module.

    Content obtained in step 2 or 3 is stored in the cache before returning.

    Args:
        view: Name of the view; must be a key of ``KNOWN_VIEWS``.

    Returns:
        The decoded JSON content of the view.

    Raises:
        KeyError: If ``view`` is neither cached nor a key of ``KNOWN_VIEWS``.
        Exception: Whatever the remote fetch raised (for example
            ``httpx.HTTPStatusError`` from ``raise_for_status``), when no
            local fallback file exists.
    """
    try:
        return GLOBAL_COLLECTIONS_VIEW_CACHE[view]
    except KeyError:
        pass  # Cache miss (or expired entry): fall through to a fresh load.

    # Resolve the URL before any I/O. Unknown names raise KeyError here, so
    # only registered view names can reach the network call or be
    # interpolated into the fallback file path below.
    url = KNOWN_VIEWS[view]

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            resp.raise_for_status()

            data = resp.json()
            if view == "aggregate-worker-metadata":
                # Drop the "prefect-agent" entry under "prefect", if present.
                data.get("prefect", {}).pop("prefect-agent", None)

            GLOBAL_COLLECTIONS_VIEW_CACHE[view] = data
            return data
    except Exception:
        # Deliberately broad: any failure of the remote fetch falls back to
        # the copy stored alongside this module, when one exists.
        # NOTE(review): the "prefect-agent" filter above is not applied to
        # the local copy - confirm whether the bundled file needs it.
        local_file = Path(__file__).parent / "collections_data" / "views" / f"{view}.json"
        if not await local_file.exists():
            # No local copy: surface the original fetch error.
            raise
        data = json.loads(await local_file.read_text())
        GLOBAL_COLLECTIONS_VIEW_CACHE[view] = data
        return data