Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/automations.py: 0%
100 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations
3import uuid
5from typing_extensions import Self
7from prefect.cli.transfer._exceptions import TransferSkipped
8from prefect.cli.transfer._migratable_resources import construct_migratable_resource
9from prefect.cli.transfer._migratable_resources.base import (
10 MigratableProtocol,
11 MigratableResource,
12)
13from prefect.cli.transfer._migratable_resources.blocks import MigratableBlockDocument
14from prefect.cli.transfer._migratable_resources.deployments import MigratableDeployment
15from prefect.cli.transfer._migratable_resources.work_pools import MigratableWorkPool
16from prefect.cli.transfer._migratable_resources.work_queues import MigratableWorkQueue
17from prefect.client.orchestration import get_client
18from prefect.client.schemas.filters import (
19 WorkPoolFilter,
20 WorkPoolFilterId,
21)
22from prefect.events.actions import (
23 AutomationAction,
24 CallWebhook,
25 DeploymentAction,
26 SendNotification,
27 WorkPoolAction,
28 WorkQueueAction,
29)
30from prefect.events.schemas.automations import Automation, AutomationCore
class MigratableAutomation(MigratableResource[Automation]):
    """Migratable wrapper around a source ``Automation``.

    The wrapper records the source automation, discovers the resources its
    actions reference (deployments, work pools, work queues, other
    automations and block documents), and creates a copy of the automation
    through the client with those references rewritten to the ids of the
    already-migrated dependencies.
    """

    # Class-wide registry keyed by source automation id, so that a given
    # source automation is wrapped at most once (see ``construct``).
    _instances: dict[uuid.UUID, Self] = {}

    def __init__(self, automation: Automation):
        """Wrap ``automation``; nothing is read or written until asked."""
        self.source_automation = automation
        self.destination_automation: Automation | None = None
        # Source resource id -> migratable wrapper of that resource.
        self._dependencies: dict[uuid.UUID, MigratableProtocol] = {}
        # An empty ``_dependencies`` is ambiguous ("not scanned yet" versus
        # "scanned, nothing referenced"); this flag records the latter.
        self._dependencies_resolved: bool = False

    @property
    def source_id(self) -> uuid.UUID:
        """Id of the wrapped source automation."""
        return self.source_automation.id

    @property
    def destination_id(self) -> uuid.UUID | None:
        """Id of the destination automation, or ``None`` if not yet known."""
        return self.destination_automation.id if self.destination_automation else None

    @classmethod
    async def construct(cls, obj: Automation) -> Self:
        """Return the registered wrapper for ``obj``, creating it if needed."""
        if obj.id in cls._instances:
            return cls._instances[obj.id]
        instance = cls(obj)
        cls._instances[obj.id] = instance
        return instance

    @classmethod
    async def get_instance(
        cls, id: uuid.UUID
    ) -> "MigratableResource[Automation] | None":
        """Return the registered wrapper for source id ``id``, else ``None``."""
        return cls._instances.get(id)

    async def _track_dependency(
        self,
        source_id: uuid.UUID,
        resource_cls: "type[MigratableResource]",
        fetch,
    ) -> None:
        """Record the migratable wrapper for the resource ``source_id``.

        An instance already registered on ``resource_cls`` is reused.
        Otherwise ``fetch(source_id)`` is awaited to load the source object,
        which is then wrapped with ``construct_migratable_resource``. When
        ``fetch`` yields ``None`` nothing is recorded.

        Args:
            source_id: Id of the referenced resource as stored on the action.
            resource_cls: Migratable class whose registry is consulted first.
            fetch: Async callable taking the id and returning the source
                object, or ``None`` when it cannot be found.
        """
        if dependency := await resource_cls.get_instance(id=source_id):
            self._dependencies[source_id] = dependency
            return

        source_object = await fetch(source_id)
        if source_object is not None:
            # Keyed by the id the action carries, because that is the key
            # ``migrate`` looks up when rewriting the copied actions.
            self._dependencies[source_id] = await construct_migratable_resource(
                source_object
            )

    def _destination_id_or_source(self, source_id: uuid.UUID) -> uuid.UUID:
        """Translate ``source_id`` to the matching destination id.

        Falls back to ``source_id`` itself when no dependency was recorded
        for it or the dependency has no destination id yet, so a reference
        is never replaced by ``None`` and a missing entry does not raise.
        """
        destination_id = getattr(
            self._dependencies.get(source_id), "destination_id", None
        )
        return destination_id or source_id

    async def get_dependencies(self) -> "list[MigratableProtocol]":
        """Return wrappers for every resource this automation's actions use.

        The result is cached on the instance: once a scan has completed (or
        ``_dependencies`` is already populated) later calls return the
        recorded wrappers without opening a client again.
        """
        if self._dependencies or self._dependencies_resolved:
            return list(self._dependencies.values())

        async with get_client() as client:

            async def read_work_pool(work_pool_id: uuid.UUID):
                # TODO: Find a better way to get a work pool by id
                matches = await client.read_work_pools(
                    work_pool_filter=WorkPoolFilter(
                        id=WorkPoolFilterId(any_=[work_pool_id])
                    )
                )
                return matches[0] if matches else None

            for action in self.source_automation.actions:
                if (
                    isinstance(action, DeploymentAction)
                    and action.deployment_id is not None
                ):
                    await self._track_dependency(
                        action.deployment_id,
                        MigratableDeployment,
                        client.read_deployment,
                    )
                elif (
                    isinstance(action, WorkPoolAction)
                    and action.work_pool_id is not None
                ):
                    await self._track_dependency(
                        action.work_pool_id, MigratableWorkPool, read_work_pool
                    )
                elif (
                    isinstance(action, WorkQueueAction)
                    and action.work_queue_id is not None
                ):
                    await self._track_dependency(
                        action.work_queue_id,
                        MigratableWorkQueue,
                        client.read_work_queue,
                    )
                elif (
                    isinstance(action, AutomationAction)
                    and action.automation_id is not None
                ):
                    await self._track_dependency(
                        action.automation_id,
                        MigratableAutomation,
                        client.find_automation,
                    )
                elif isinstance(action, (CallWebhook, SendNotification)):
                    await self._track_dependency(
                        action.block_document_id,
                        MigratableBlockDocument,
                        client.read_block_document,
                    )

        # Only flagged after a complete scan, so a scan that raised part-way
        # is not mistaken for a finished one with no dependencies.
        self._dependencies_resolved = True
        return list(self._dependencies.values())

    async def migrate(self) -> None:
        """Create this automation through the client unless one already exists.

        Looks up automations by the source automation's name. If any exist,
        the first is recorded as the destination and ``TransferSkipped`` is
        raised. Otherwise a copy of the source automation is created with the
        ids in its actions translated via the recorded dependencies, and the
        created automation is read back and stored as the destination.

        Ids are only translated for dependencies recorded by
        ``get_dependencies``; any other reference is left as it was.

        Raises:
            TransferSkipped: An automation with the same name was found.
        """
        async with get_client() as client:
            existing = await client.read_automations_by_name(
                name=self.source_automation.name
            )
            if existing:
                self.destination_automation = existing[0]
                raise TransferSkipped("Already exists")

            # Round-trip through JSON to get an independent ``AutomationCore``
            # whose actions can be edited without touching the source object.
            automation_copy = AutomationCore.model_validate(
                self.source_automation.model_dump(mode="json")
            )
            for action in automation_copy.actions:
                if (
                    isinstance(action, DeploymentAction)
                    and action.deployment_id is not None
                ):
                    action.deployment_id = self._destination_id_or_source(
                        action.deployment_id
                    )
                elif (
                    isinstance(action, WorkPoolAction)
                    and action.work_pool_id is not None
                ):
                    action.work_pool_id = self._destination_id_or_source(
                        action.work_pool_id
                    )
                elif (
                    isinstance(action, WorkQueueAction)
                    and action.work_queue_id is not None
                ):
                    action.work_queue_id = self._destination_id_or_source(
                        action.work_queue_id
                    )
                elif (
                    isinstance(action, AutomationAction)
                    and action.automation_id is not None
                ):
                    action.automation_id = self._destination_id_or_source(
                        action.automation_id
                    )
                elif isinstance(action, (CallWebhook, SendNotification)):
                    action.block_document_id = self._destination_id_or_source(
                        action.block_document_id
                    )

            automation_id = await client.create_automation(
                automation=automation_copy
            )
            self.destination_automation = await client.read_automation(
                automation_id=automation_id
            )
216 automation_id=automation_id
217 )