Coverage for /usr/local/lib/python3.12/site-packages/prefect/workers/utilities.py: 18%
34 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from copy import deepcopy 1a
2from logging import getLogger 1a
3from typing import Any, Dict, List, Optional 1a
5from prefect.client.collections import get_collections_metadata_client 1a
6from prefect.logging.loggers import get_logger 1a
7from prefect.settings import PREFECT_DEBUG_MODE 1a
8from prefect.workers.base import BaseWorker 1a
11async def get_available_work_pool_types() -> List[str]: 1a
12 work_pool_types = set(BaseWorker.get_all_available_worker_types())
14 async with get_collections_metadata_client() as collections_client:
15 try:
16 worker_metadata = await collections_client.read_worker_metadata()
17 for collection in worker_metadata.values():
18 for worker in collection.values():
19 work_pool_types.add(worker.get("type"))
20 except Exception:
21 # Return only work pool types from the local type registry if
22 # the request to the collections registry fails.
23 if PREFECT_DEBUG_MODE:
24 getLogger().warning(
25 "Unable to get worker metadata from the collections registry",
26 exc_info=True,
27 )
29 return sorted(filter(None, work_pool_types))
32async def get_default_base_job_template_for_infrastructure_type( 1a
33 infra_type: str,
34) -> Optional[Dict[str, Any]]:
35 # Attempt to get the default base job template for the worker type
36 # from the local type registry first.
37 worker_cls = BaseWorker.get_worker_class_from_type(infra_type)
38 if worker_cls is not None:
39 return deepcopy(worker_cls.get_default_base_job_template())
41 # If the worker type is not found in the local type registry, attempt to
42 # get the default base job template from the collections registry.
43 async with get_collections_metadata_client() as collections_client:
44 try:
45 worker_metadata = await collections_client.read_worker_metadata()
46 for collection in worker_metadata.values():
47 for worker in collection.values():
48 if worker.get("type") == infra_type:
49 return worker.get("default_base_job_configuration")
50 except Exception:
51 if PREFECT_DEBUG_MODE:
52 get_logger().warning(
53 (
54 "Unable to get default base job template for"
55 f" {infra_type!r} worker type"
56 ),
57 exc_info=True,
58 )
59 return None