Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_sla.py: 24%
37 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3import json 1a
4from typing import Any, List, Union 1a
5from uuid import UUID 1a
7from pydantic import TypeAdapter 1a
9from prefect._experimental.sla.objects import SlaTypes 1a
10from prefect.client.base import ServerType 1a
11from prefect.client.orchestration import PrefectClient 1a
13SlaAdapter: TypeAdapter[SlaTypes] = TypeAdapter(SlaTypes) 1a
def _gather_deployment_sla_definitions(
    sla_flags: Union[list[str], None], existing_slas: Union[list[dict[str, Any]], None]
) -> Union[list[dict[str, Any]], None]:
    """Collect raw SLA definitions from CLI flags, falling back to existing ones.

    Each entry of ``sla_flags`` is interpreted in one of three ways:

    * a path ending in ``.yaml`` or ``.yml``: the file is parsed as YAML and the
      entries under its top-level ``"sla"`` key are added (none if the key is
      absent);
    * a path ending in ``.json``: same, but the file is parsed as JSON;
    * anything else: the string itself is parsed as inline JSON and the
      resulting value is appended as a single definition.

    Args:
        sla_flags: SLA values given on the command line, or ``None`` when no
            SLA flag was supplied at all.
        existing_slas: Definitions to use when ``sla_flags`` is ``None``.

    Returns:
        The definitions parsed from ``sla_flags`` (possibly an empty list), or
        ``existing_slas`` unchanged when ``sla_flags`` is ``None``.

    Raises:
        ValueError: If any flag cannot be read or parsed. The underlying
            error is attached as ``__cause__``.
    """
    # ``None`` means "no flag given"; an empty list is still an explicit
    # override and yields an empty result below.
    if sla_flags is None:
        return existing_slas

    sla_specs: list[dict[str, Any]] = []
    for s in sla_flags:
        try:
            if s.endswith((".yaml", ".yml")):
                # Imported lazily so YAML support is only loaded when needed.
                import yaml

                with open(s, "r", encoding="utf-8") as f:
                    sla_specs.extend(yaml.safe_load(f).get("sla", []))
            elif s.endswith(".json"):
                with open(s, "r", encoding="utf-8") as f:
                    sla_specs.extend(json.load(f).get("sla", []))
            else:
                sla_specs.append(json.loads(s))
        except Exception as e:
            # Report every failure mode (I/O, syntax, wrong document shape)
            # uniformly, while keeping the root cause in the traceback.
            raise ValueError(f"Failed to parse SLA: {s}. Error: {str(e)}") from e
    return sla_specs
def _initialize_deployment_slas(
    deployment_id: UUID, sla_specs: list[dict[str, Any]]
) -> list[SlaTypes]:
    """Validate raw SLA specs and bind each resulting SLA to a deployment.

    Args:
        deployment_id: Passed to ``set_deployment_id`` on every validated SLA.
        sla_specs: Raw SLA definitions to validate with ``SlaAdapter``.

    Returns:
        The validated SLA objects, or an empty list when ``sla_specs`` is
        ``[]`` or ``[[]]``.
    """
    # Both "no specs" shapes short-circuit before any validation happens.
    if sla_specs in ([], [[]]):
        return []

    # Validate everything first so that no SLA is touched if any spec is invalid.
    validated = []
    for raw_spec in sla_specs:
        validated.append(SlaAdapter.validate_python(raw_spec))

    for validated_sla in validated:
        validated_sla.set_deployment_id(deployment_id)
    return validated
async def _create_slas(
    client: PrefectClient,
    deployment_id: UUID,
    slas: List[SlaTypes],
):
    """Apply SLAs to a deployment through the client.

    Args:
        client: Client whose ``server_type`` is checked and which performs
            the ``apply_slas_for_deployment`` call.
        deployment_id: Deployment the SLAs are applied to.
        slas: SLA objects to apply.

    Raises:
        ValueError: If the client's server type is not ``ServerType.CLOUD``.
    """
    targets_cloud = client.server_type == ServerType.CLOUD
    if not targets_cloud:
        raise ValueError(
            "SLA configuration is currently only supported on Prefect Cloud."
        )

    await client.apply_slas_for_deployment(deployment_id, slas)