1 """Remove flow run notifications
2
3 Revision ID: 7655f31c5157
4 Revises: bbca16f6f218
5 Create Date: 2025-04-28 13:37:22.112876
6
7 """
8
9 from __future__ import annotations 1 ctx 1a
10
11 import json 1 ctx 1a
12 import textwrap 1 ctx 1a
13 from uuid import UUID , uuid4 1 ctx 1a
14
15 import sqlalchemy as sa 1 ctx 1a
16 from alembic import op 1 ctx 1a
17
18 import prefect 1 ctx 1a
19 from prefect . logging . loggers import get_logger 1 ctx 1a
20
# revision identifiers, used by Alembic.
# `revision` and `down_revision` repeat the "Revision ID" and "Revises" values
# given in the module docstring; no branch labels or cross-revision
# dependencies are declared.
revision = "7655f31c5157"
down_revision = "bbca16f6f218"
branch_labels = None
depends_on = None
26
# Notification body used by `upgrade()` for policies whose `message_template`
# is empty or NULL. It is stored verbatim (it is never passed through
# `str.format`), so the `{{ ... }}` expressions stay in the saved body
# unchanged. NOTE(review): these look like Jinja-style expressions rendered
# later by the automation's notification action -- confirm.
DEFAULT_BODY = textwrap.dedent("""
Flow run {{ flow.name }}/{{ flow_run.name }} observed in state `{{ flow_run.state.name }}` at {{ flow_run.state.timestamp }}.
Flow ID: {{ flow_run.flow_id }}
Flow run ID: {{ flow_run.id }}
Flow run URL: {{ flow_run|ui_url }}
State message: {{ flow_run.state.message }}
""")

# Keyword arguments for `message_template.format(**PLACEHOLDER_MAP)` in
# `upgrade()`: each key is a `{field}` name that a policy's message template
# may contain, and each value is the `{{ ... }}` expression text substituted
# for it. `str.format` does not re-parse substituted values, so the double
# braces in the values are emitted literally.
PLACEHOLDER_MAP = {
    "flow_run_notification_policy_id": "Event ID {{ event.id }}",
    "flow_id": "{{ flow.id }}",
    "flow_name": "{{ flow.name }}",
    "flow_run_url": "{{ flow_run|ui_url }}",
    "flow_run_id": "{{ flow_run.id }}",
    "flow_run_name": "{{ flow_run.name }}",
    "flow_run_parameters": "{{ flow_run.parameters }}",
    "flow_run_state_type": "{{ flow_run.state.type }}",
    "flow_run_state_name": "{{ flow_run.state.name }}",
    "flow_run_state_timestamp": "{{ flow_run.state.timestamp }}",
    "flow_run_state_message": "{{ flow_run.state.message }}",
}
48
49
def _build_trigger(state_names: list[str], tags: list[str]) -> dict[str, object]:
    """Build the event-trigger payload that replaces one notification policy.

    Args:
        state_names: State names the policy listed. Each becomes a
            ``prefect.flow-run.<name>`` entry in ``expect``; an empty list
            falls back to the wildcard ``prefect.flow-run.*``.
        tags: Tags the policy listed. When non-empty they are turned into a
            ``match_related`` filter on ``prefect.tag.<tag>`` resources with
            role ``tag``; otherwise ``match_related`` is an empty mapping.

    Returns:
        A JSON-serialisable dict with a freshly generated ``id``.
    """
    if state_names:
        # The set removes duplicate state names; sorting keeps the stored JSON
        # reproducible, because set iteration order varies between runs.
        expected_events = sorted(
            {f"prefect.flow-run.{state_name}" for state_name in state_names}
        )
    else:
        expected_events = ["prefect.flow-run.*"]

    if tags:
        related_filter: dict[str, object] = {
            "prefect.resource.id": [f"prefect.tag.{tag}" for tag in tags],
            "prefect.resource.role": "tag",
        }
    else:
        related_filter = {}

    return {
        "id": str(uuid4()),
        "type": "event",
        "after": [],
        "match": {"prefect.resource.id": "prefect.flow-run.*"},
        "expect": expected_events,
        "within": 10,
        "posture": "Reactive",
        "for_each": ["prefect.resource.id"],
        "threshold": 1,
        "match_related": related_filter,
    }


def _build_actions(
    message_template: str | None, block_document_id: UUID | str
) -> list[dict[str, str]]:
    """Build the single ``send-notification`` action for a migrated policy.

    Args:
        message_template: The policy's template, or ``None``/empty. A
            non-empty template has its ``{field}`` placeholders replaced via
            ``str.format(**PLACEHOLDER_MAP)``; otherwise ``DEFAULT_BODY`` is
            used unchanged.
        block_document_id: Identifier of the policy's block document; it is
            stored as ``str(block_document_id)``.

    Returns:
        A one-element, JSON-serialisable list of action dicts.

    Raises:
        KeyError, IndexError, ValueError: Propagated from ``str.format`` if
            the template uses a field that is not in ``PLACEHOLDER_MAP`` or
            is not a valid format string.
    """
    if message_template:
        body = message_template.format(**PLACEHOLDER_MAP)
    else:
        body = DEFAULT_BODY

    return [
        {
            "body": body,
            "type": "send-notification",
            "subject": "Prefect flow run notification",
            "block_document_id": str(block_document_id),
        }
    ]


def upgrade():
    """Convert flow run notification policies to automations, then drop their tables.

    Inside Alembic's autocommit block, every row of
    ``flow_run_notification_policy`` is inserted into ``automation`` as an
    event trigger plus a ``send-notification`` action, and the source row is
    deleted right after its automation is inserted. If any rows were
    converted, one info message reporting the count is logged. Finally the
    ``flow_run_notification_queue`` and ``flow_run_notification_policy``
    tables are dropped.
    """
    conn = op.get_bind()
    with op.get_context().autocommit_block():
        rows = conn.execute(
            sa.text(
                "SELECT id, is_active, state_names, tags, message_template, block_document_id FROM flow_run_notification_policy"
            ),
        ).fetchall()

        for (
            policy_id,
            is_active,
            raw_state_names,
            raw_tags,
            message_template,
            block_document_id,
        ) in rows:
            # state_names and tags are decoded from JSON text before use.
            trigger = _build_trigger(
                json.loads(raw_state_names), json.loads(raw_tags)
            )
            actions = _build_actions(message_template, block_document_id)

            conn.execute(
                sa.text(
                    "INSERT INTO automation (name, description, enabled, trigger, actions) VALUES (:name, :description, :enabled, :trigger, :actions)",
                ),
                {
                    "name": "Flow Run State Change Notification",
                    "description": "Migrated from a flow run notification policy",
                    "enabled": is_active,
                    "trigger": json.dumps(trigger),
                    "actions": json.dumps(actions),
                },
            )

            # Remove the policy only after its automation has been inserted.
            conn.execute(
                sa.text("DELETE FROM flow_run_notification_policy WHERE id = :id"),
                {"id": policy_id},
            )

        if rows:
            get_logger().info(
                f"Your {len(rows)} flow run notification policies have been migrated to automations. You can view the created automations in the Prefect UI."
            )

    op.drop_table("flow_run_notification_queue")
    op.drop_table("flow_run_notification_policy")
125
126
def downgrade():
    """Recreate the two tables removed by ``upgrade``.

    Creates ``flow_run_notification_queue`` and
    ``flow_run_notification_policy`` (in that order), each followed by a
    non-unique index on its ``updated`` column. Only the schema is rebuilt:
    no rows are inserted into either table, and nothing is removed from the
    ``automation`` table.
    """
    db_types = prefect.server.utilities.database

    # Server-side default that assembles a UUID4-shaped string
    # (8-4-4-4-12 hex groups, with a literal '4' and one of '8','9','a','b'
    # in the usual positions) from randomblob()/random().
    uuid_default_sql = (
        "(\n (\n lower(hex(randomblob(4))) \n || '-' \n "
        " || lower(hex(randomblob(2))) \n || '-4' \n ||"
        " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||"
        " substr('89ab',abs(random()) % 4 + 1, 1) \n ||"
        " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||"
        " lower(hex(randomblob(6)))\n )\n )"
    )
    # Server-side default yielding the current time via strftime(..., 'now').
    now_default_sql = "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"

    def timestamp_column(column_name):
        """Return a new non-null timestamp column defaulting to 'now'."""
        return sa.Column(
            column_name,
            db_types.Timestamp(timezone=True),
            server_default=sa.text(now_default_sql),
            nullable=False,
        )

    def common_columns():
        """Return new ``id``/``created``/``updated`` columns shared by both tables."""
        return [
            sa.Column(
                "id",
                db_types.UUID(),
                server_default=sa.text(uuid_default_sql),
                nullable=False,
            ),
            timestamp_column("created"),
            timestamp_column("updated"),
        ]

    def json_list_column(column_name):
        """Return a new non-null JSON column whose default is an empty list."""
        return sa.Column(
            column_name,
            db_types.JSON(astext_type=sa.Text()),
            server_default="[]",
            nullable=False,
        )

    # --- flow_run_notification_queue -------------------------------------
    op.create_table(
        "flow_run_notification_queue",
        *common_columns(),
        sa.Column(
            "flow_run_notification_policy_id",
            db_types.UUID(),
            nullable=False,
        ),
        sa.Column(
            "flow_run_state_id",
            db_types.UUID(),
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_flow_run_notification_queue")),
    )
    op.create_index(
        op.f("ix_flow_run_notification_queue__updated"),
        "flow_run_notification_queue",
        ["updated"],
        unique=False,
    )

    # --- flow_run_notification_policy ------------------------------------
    op.create_table(
        "flow_run_notification_policy",
        *common_columns(),
        sa.Column("is_active", sa.Boolean(), server_default="1", nullable=False),
        json_list_column("state_names"),
        json_list_column("tags"),
        sa.Column("message_template", sa.String(), nullable=True),
        sa.Column(
            "block_document_id",
            db_types.UUID(),
            nullable=False,
        ),
        # Deleting a block document cascades to the policies that reference it.
        sa.ForeignKeyConstraint(
            ["block_document_id"],
            ["block_document.id"],
            name=op.f(
                "fk_flow_run_notification_policy__block_document_id__block_document"
            ),
            ondelete="cascade",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_flow_run_notification_policy")),
    )
    op.create_index(
        op.f("ix_flow_run_notification_policy__updated"),
        "flow_run_notification_policy",
        ["updated"],
        unique=False,
    )