Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/work_pools.py: 0%

74 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 

2 

3import uuid 

4 

5from typing_extensions import Self 

6 

7from prefect.cli.transfer._exceptions import TransferSkipped 

8from prefect.cli.transfer._migratable_resources import construct_migratable_resource 

9from prefect.cli.transfer._migratable_resources.base import ( 

10 MigratableProtocol, 

11 MigratableResource, 

12) 

13from prefect.cli.transfer._migratable_resources.blocks import MigratableBlockDocument 

14from prefect.client.orchestration import get_client 

15from prefect.client.schemas.actions import ( 

16 WorkPoolCreate, 

17) 

18from prefect.client.schemas.objects import ( 

19 WorkPool, 

20 WorkQueue, 

21) 

22from prefect.exceptions import ( 

23 ObjectAlreadyExists, 

24 ObjectUnsupported, 

25) 

26 

27 

28class MigratableWorkPool(MigratableResource[WorkPool]): 

29 _instances: dict[uuid.UUID, Self] = {} 

30 

31 def __init__(self, work_pool: WorkPool, default_queue: WorkQueue): 

32 self.source_work_pool = work_pool 

33 self.source_default_queue = default_queue 

34 self.destination_work_pool: WorkPool | None = None 

35 self._dependencies: list[MigratableProtocol] = [] 

36 

37 @property 

38 def source_id(self) -> uuid.UUID: 

39 return self.source_work_pool.id 

40 

41 @property 

42 def destination_id(self) -> uuid.UUID | None: 

43 return self.destination_work_pool.id if self.destination_work_pool else None 

44 

45 @classmethod 

46 async def construct(cls, obj: WorkPool) -> Self: 

47 if obj.id in cls._instances: 

48 return cls._instances[obj.id] 

49 async with get_client() as client: 

50 default_queue = await client.read_work_queue(obj.default_queue_id) 

51 instance = cls(obj, default_queue) 

52 cls._instances[obj.id] = instance 

53 return instance 

54 

55 @classmethod 

56 async def get_instance(cls, id: uuid.UUID) -> "MigratableResource[WorkPool] | None": 

57 if id in cls._instances: 

58 return cls._instances[id] 

59 return None 

60 

61 @classmethod 

62 async def get_instance_by_name( 

63 cls, name: str 

64 ) -> "MigratableResource[WorkPool] | None": 

65 for instance in cls._instances.values(): 

66 if instance.source_work_pool.name == name: 

67 return instance 

68 return None 

69 

70 async def get_dependencies(self) -> "list[MigratableProtocol]": 

71 if self._dependencies: 

72 return self._dependencies 

73 

74 async with get_client() as client: 

75 if ( 

76 self.source_work_pool.storage_configuration.default_result_storage_block_id 

77 is not None 

78 ): 

79 if dependency := await MigratableBlockDocument.get_instance( 

80 id=self.source_work_pool.storage_configuration.default_result_storage_block_id 

81 ): 

82 self._dependencies.append(dependency) 

83 else: 

84 result_storage_block = await client.read_block_document( 

85 self.source_work_pool.storage_configuration.default_result_storage_block_id 

86 ) 

87 self._dependencies.append( 

88 await construct_migratable_resource(result_storage_block) 

89 ) 

90 if ( 

91 self.source_work_pool.storage_configuration.bundle_upload_step 

92 is not None 

93 ): 

94 # TODO: Figure out how to find block document references in bundle upload step 

95 pass 

96 if ( 

97 self.source_work_pool.storage_configuration.bundle_execution_step 

98 is not None 

99 ): 

100 # TODO: Figure out how to find block document references in bundle download step 

101 pass 

102 return self._dependencies 

103 

104 async def migrate(self) -> None: 

105 async with get_client() as client: 

106 # Skip managed pools always - they're cloud-specific infrastructure 

107 if self.source_work_pool.is_managed_pool: 

108 raise TransferSkipped("Skipped managed pool (cloud-specific)") 

109 

110 # Allow push pools only if destination is Cloud 

111 if self.source_work_pool.is_push_pool: 

112 from prefect.client.base import ServerType 

113 

114 if client.server_type != ServerType.CLOUD: 

115 raise TransferSkipped("Skipped push pool (requires Prefect Cloud)") 

116 try: 

117 self.destination_work_pool = await client.create_work_pool( 

118 work_pool=WorkPoolCreate( 

119 name=self.source_work_pool.name, 

120 type=self.source_work_pool.type, 

121 base_job_template=self.source_work_pool.base_job_template, 

122 is_paused=self.source_work_pool.is_paused, 

123 concurrency_limit=self.source_work_pool.concurrency_limit, 

124 storage_configuration=self.source_work_pool.storage_configuration, 

125 ), 

126 ) 

127 except ObjectUnsupported: 

128 raise TransferSkipped("Destination requires Standard/Pro tier") 

129 except ObjectAlreadyExists: 

130 self.destination_work_pool = await client.read_work_pool( 

131 self.source_work_pool.name 

132 ) 

133 raise TransferSkipped("Already exists") 

134 

135 # Update the default queue after successful creation 

136 await client.update_work_queue( 

137 id=self.destination_work_pool.default_queue_id, 

138 description=self.source_default_queue.description, 

139 priority=self.source_default_queue.priority, 

140 concurrency_limit=self.source_default_queue.concurrency_limit, 

141 )