Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/collections.py: 26%
45 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import json 1a
2from typing import Any, Dict 1a
4import httpx 1a
5from anyio import Path 1a
6from cachetools import TTLCache 1a
7from fastapi import HTTPException, status 1a
9from prefect.server.utilities.server import PrefectRouter 1a
# Router for the collection-registry endpoints: routes registered on it are
# created with the "/collections" prefix and the "Collections" tag.
router: PrefectRouter = PrefectRouter(prefix="/collections", tags=["Collections"])

# Module-level cache of view payloads, keyed by view name. It holds at most
# 200 entries and each entry expires after a TTL of 60 * 10 (ten minutes in
# the cache's default timer units, i.e. seconds).
GLOBAL_COLLECTIONS_VIEW_CACHE: TTLCache[str, dict[str, Any]] = TTLCache(
    maxsize=200, ttl=60 * 10
)
# Base URL under which the prefect-collection-registry publishes its JSON views.
REGISTRY_VIEWS = (
    "https://raw.githubusercontent.com/PrefectHQ/prefect-collection-registry/main/views"
)

# Allow-list mapping each supported view name to the URL of its JSON document.
# Code in this module looks views up here by name, so a name missing from this
# mapping surfaces as a KeyError.
KNOWN_VIEWS = {
    "aggregate-block-metadata": f"{REGISTRY_VIEWS}/aggregate-block-metadata.json",
    "aggregate-flow-metadata": f"{REGISTRY_VIEWS}/aggregate-flow-metadata.json",
    "aggregate-worker-metadata": f"{REGISTRY_VIEWS}/aggregate-worker-metadata.json",
    "demo-flows": f"{REGISTRY_VIEWS}/demo-flows.json",
}
@router.get("/views/{view}")
async def read_view_content(view: str) -> Dict[str, Any]:
    """Reads the content of a view from the prefect-collection-registry.

    Args:
        view: Name of the view, taken from the ``{view}`` path segment.

    Returns:
        The view document as returned by ``get_collection_view``.

    Raises:
        HTTPException: With status 404 when ``get_collection_view`` raises
            ``KeyError`` (unknown view name), or when it raises an
            ``httpx.HTTPStatusError`` whose response status is 404.
        httpx.HTTPStatusError: Re-raised unchanged for any other upstream
            status code.
    """
    try:
        return await get_collection_view(view)
    except KeyError as exc:
        # Chain the original error so the cause stays visible in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"View {view} not found in registry",
        ) from exc
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code == 404:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Requested content missing for view {view}",
            ) from exc
        # Only an upstream 404 is translated; everything else propagates as-is.
        raise
def _drop_prefect_agent(view: str, data: dict[str, Any]) -> dict[str, Any]:
    """Remove the ``prefect-agent`` entry from the worker-metadata view, in place."""
    if view == "aggregate-worker-metadata":
        data.get("prefect", {}).pop("prefect-agent", None)
    return data


async def get_collection_view(view: str) -> dict[str, Any]:
    """Return the JSON document for a registry view, using the cache when possible.

    Lookup order:
      1. ``GLOBAL_COLLECTIONS_VIEW_CACHE``.
      2. The remote URL registered for the view in ``KNOWN_VIEWS``.
      3. ``collections_data/views/<view>.json`` next to this module, used when
         the remote fetch (or processing of its payload) fails.

    Whichever source produced the document, the ``prefect-agent`` entry is
    removed from the ``aggregate-worker-metadata`` view and the result is
    stored in the cache before being returned.

    Args:
        view: Name of the view; must be a key of ``KNOWN_VIEWS``.

    Returns:
        The parsed view document.

    Raises:
        KeyError: If ``view`` is neither cached nor a key of ``KNOWN_VIEWS``.
        Exception: The original fetch error is re-raised when no local
            fallback file exists (e.g. ``httpx.HTTPStatusError`` from
            ``raise_for_status``).
    """
    try:
        return GLOBAL_COLLECTIONS_VIEW_CACHE[view]
    except KeyError:
        pass

    # Resolve the URL before doing any I/O: an unknown view fails fast with
    # KeyError, and only allow-listed names can reach the filesystem path
    # built in the fallback below.
    url = KNOWN_VIEWS[view]

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url)
            resp.raise_for_status()
            data = _drop_prefect_agent(view, resp.json())
    except Exception:
        # Best-effort fallback to the copy shipped alongside this module; if
        # there is none, surface the original failure to the caller.
        local_file = Path(__file__).parent / Path(f"collections_data/views/{view}.json")
        if not await local_file.exists():
            raise
        raw_data = await local_file.read_text()
        # Apply the same filtering as the remote path so callers see the same
        # shape regardless of which source served the view.
        data = _drop_prefect_agent(view, json.loads(raw_data))

    GLOBAL_COLLECTIONS_VIEW_CACHE[view] = data
    return data