Coverage for /usr/local/lib/python3.12/site-packages/prefect/workers/utilities.py: 18%

34 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from copy import deepcopy 1a

2from logging import getLogger 1a

3from typing import Any, Dict, List, Optional 1a

4 

5from prefect.client.collections import get_collections_metadata_client 1a

6from prefect.logging.loggers import get_logger 1a

7from prefect.settings import PREFECT_DEBUG_MODE 1a

8from prefect.workers.base import BaseWorker 1a

9 

10 

11async def get_available_work_pool_types() -> List[str]: 1a

12 work_pool_types = set(BaseWorker.get_all_available_worker_types()) 

13 

14 async with get_collections_metadata_client() as collections_client: 

15 try: 

16 worker_metadata = await collections_client.read_worker_metadata() 

17 for collection in worker_metadata.values(): 

18 for worker in collection.values(): 

19 work_pool_types.add(worker.get("type")) 

20 except Exception: 

21 # Return only work pool types from the local type registry if 

22 # the request to the collections registry fails. 

23 if PREFECT_DEBUG_MODE: 

24 getLogger().warning( 

25 "Unable to get worker metadata from the collections registry", 

26 exc_info=True, 

27 ) 

28 

29 return sorted(filter(None, work_pool_types)) 

30 

31 

32async def get_default_base_job_template_for_infrastructure_type( 1a

33 infra_type: str, 

34) -> Optional[Dict[str, Any]]: 

35 # Attempt to get the default base job template for the worker type 

36 # from the local type registry first. 

37 worker_cls = BaseWorker.get_worker_class_from_type(infra_type) 

38 if worker_cls is not None: 

39 return deepcopy(worker_cls.get_default_base_job_template()) 

40 

41 # If the worker type is not found in the local type registry, attempt to 

42 # get the default base job template from the collections registry. 

43 async with get_collections_metadata_client() as collections_client: 

44 try: 

45 worker_metadata = await collections_client.read_worker_metadata() 

46 for collection in worker_metadata.values(): 

47 for worker in collection.values(): 

48 if worker.get("type") == infra_type: 

49 return worker.get("default_base_job_configuration") 

50 except Exception: 

51 if PREFECT_DEBUG_MODE: 

52 get_logger().warning( 

53 ( 

54 "Unable to get default base job template for" 

55 f" {infra_type!r} worker type" 

56 ), 

57 exc_info=True, 

58 ) 

59 return None