Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/concurrency_limits.py: 0%

42 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 

2 

3import uuid 

4 

5from typing_extensions import Self 

6 

7from prefect.cli.transfer._exceptions import TransferSkipped 

8from prefect.cli.transfer._migratable_resources.base import ( 

9 MigratableProtocol, 

10 MigratableResource, 

11) 

12from prefect.client.orchestration import get_client 

13from prefect.client.schemas.actions import ( 

14 GlobalConcurrencyLimitCreate, 

15) 

16from prefect.client.schemas.responses import ( 

17 GlobalConcurrencyLimitResponse, 

18) 

19from prefect.exceptions import ( 

20 ObjectAlreadyExists, 

21) 

22 

23 

class MigratableGlobalConcurrencyLimit(
    MigratableResource[GlobalConcurrencyLimitResponse]
):
    """Migratable wrapper around a single global concurrency limit.

    Holds the limit it was built from (the "source") and, once ``migrate``
    has run, the matching limit read back through the client (the
    "destination").
    """

    # Class-level registry of wrappers keyed by source limit id; it lets
    # ``construct`` hand back the same wrapper for a given id.
    _instances: dict[uuid.UUID, Self] = {}

    def __init__(self, global_concurrency_limit: GlobalConcurrencyLimitResponse):
        """Store the source limit; no destination is recorded yet."""
        self.source_global_concurrency_limit = global_concurrency_limit
        self.destination_global_concurrency_limit: (
            GlobalConcurrencyLimitResponse | None
        ) = None

    @property
    def source_id(self) -> uuid.UUID:
        """Id of the source limit this wrapper was built from."""
        source = self.source_global_concurrency_limit
        return source.id

    @property
    def destination_id(self) -> uuid.UUID | None:
        """Id of the destination limit, or ``None`` if none is recorded."""
        destination = self.destination_global_concurrency_limit
        if not destination:
            return None
        return destination.id

    @classmethod
    async def construct(cls, obj: GlobalConcurrencyLimitResponse) -> Self:
        """Return the wrapper registered for ``obj.id``, creating it if absent."""
        registry = cls._instances
        key = obj.id
        if key not in registry:
            registry[key] = cls(obj)
        return registry[key]

    @classmethod
    async def get_instance(
        cls, id: uuid.UUID
    ) -> "MigratableResource[GlobalConcurrencyLimitResponse] | None":
        """Return the wrapper registered under ``id``, or ``None`` if there is none."""
        return cls._instances.get(id)

    async def get_dependencies(self) -> "list[MigratableProtocol]":
        """Return an empty list: this resource declares no dependencies."""
        dependencies: list[MigratableProtocol] = []
        return dependencies

    async def migrate(self) -> None:
        """Create this limit through the client and record the destination copy.

        The limit is created from the source's ``name``, ``limit``,
        ``active`` and ``active_slots``, then read back by name and stored
        in ``destination_global_concurrency_limit``.

        Raises:
            TransferSkipped: if ``ObjectAlreadyExists`` is raised while
                creating (or reading back) the limit. The existing limit is
                read by name and stored as the destination before this is
                raised.
        """
        source = self.source_global_concurrency_limit
        async with get_client() as client:
            try:
                # NOTE(review): only these four fields are copied; confirm
                # that no other source fields need to be carried over.
                payload = GlobalConcurrencyLimitCreate(
                    name=source.name,
                    limit=source.limit,
                    active=source.active,
                    active_slots=source.active_slots,
                )
                await client.create_global_concurrency_limit(
                    concurrency_limit=payload,
                )
                created = await client.read_global_concurrency_limit_by_name(
                    source.name
                )
                self.destination_global_concurrency_limit = created
            except ObjectAlreadyExists:
                # Still record the pre-existing limit so ``destination_id``
                # resolves, then report the transfer as skipped.
                existing = await client.read_global_concurrency_limit_by_name(
                    source.name
                )
                self.destination_global_concurrency_limit = existing
                raise TransferSkipped("Already exists")