Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2025_04_28_133722_7655f31c5157_remove_flow_run_notifications.py: 56%

37 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1"""Remove flow run notifications 

2 

3Revision ID: 7655f31c5157 

4Revises: bbca16f6f218 

5Create Date: 2025-04-28 13:37:22.112876 

6 

7""" 

8 

9from __future__ import annotations 1a

10 

11import json 1a

12import textwrap 1a

13from uuid import UUID, uuid4 1a

14 

15import sqlalchemy as sa 1a

16from alembic import op 1a

17 

18import prefect 1a

19from prefect.logging.loggers import get_logger 1a

20 

21# revision identifiers, used by Alembic. 

22revision = "7655f31c5157" 1a

23down_revision = "bbca16f6f218" 1a

24branch_labels = None 1a

25depends_on = None 1a

26 

27DEFAULT_BODY = textwrap.dedent(""" 1a

28Flow run {{ flow.name }}/{{ flow_run.name }} observed in state `{{ flow_run.state.name }}` at {{ flow_run.state.timestamp }}. 

29Flow ID: {{ flow_run.flow_id }} 

30Flow run ID: {{ flow_run.id }} 

31Flow run URL: {{ flow_run|ui_url }} 

32State message: {{ flow_run.state.message }} 

33""") 

34 

35PLACEHOLDER_MAP = { 1a

36 "flow_run_notification_policy_id": "Event ID {{ event.id }}", 

37 "flow_id": "{{ flow.id }}", 

38 "flow_name": "{{ flow.name }}", 

39 "flow_run_url": "{{ flow_run|ui_url }}", 

40 "flow_run_id": "{{ flow_run.id }}", 

41 "flow_run_name": "{{ flow_run.name }}", 

42 "flow_run_parameters": "{{ flow_run.parameters }}", 

43 "flow_run_state_type": "{{ flow_run.state.type }}", 

44 "flow_run_state_name": "{{ flow_run.state.name }}", 

45 "flow_run_state_timestamp": "{{ flow_run.state.timestamp }}", 

46 "flow_run_state_message": "{{ flow_run.state.message }}", 

47} 

48 

49 

50def upgrade(): 1a

51 conn = op.get_bind() 1a

52 with op.get_context().autocommit_block(): 1a

53 rows = conn.execute( 1a

54 sa.text( 

55 "SELECT id, is_active, state_names, tags, message_template, block_document_id FROM flow_run_notification_policy" 

56 ), 

57 ).fetchall() 

58 if len(rows) > 0: 58 ↛ 59line 58 didn't jump to line 59 because the condition on line 58 was never true1a

59 for row in rows: 

60 row: sa.Row[tuple[UUID, bool, list[str], list[str], str | None, UUID]] 

61 

62 is_active = row[1] 

63 state_names = json.loads(row[2]) 

64 tags = json.loads(row[3]) 

65 message_template = row[4] 

66 block_document_id = row[5] 

67 

68 trigger = { 

69 "id": str(uuid4()), 

70 "type": "event", 

71 "after": [], 

72 "match": {"prefect.resource.id": "prefect.flow-run.*"}, 

73 "expect": list( 

74 {f"prefect.flow-run.{state_name}" for state_name in state_names} 

75 ) 

76 if state_names 

77 else ["prefect.flow-run.*"], 

78 "within": 10, 

79 "posture": "Reactive", 

80 "for_each": ["prefect.resource.id"], 

81 "threshold": 1, 

82 "match_related": { 

83 "prefect.resource.id": [f"prefect.tag.{tag}" for tag in tags], 

84 "prefect.resource.role": "tag", 

85 } 

86 if tags 

87 else {}, 

88 } 

89 

90 actions = [ 

91 { 

92 "body": message_template.format(**PLACEHOLDER_MAP) 

93 if message_template 

94 else DEFAULT_BODY, 

95 "type": "send-notification", 

96 "subject": "Prefect flow run notification", 

97 "block_document_id": str(block_document_id), 

98 } 

99 ] 

100 

101 conn.execute( 

102 sa.text( 

103 "INSERT INTO automation (name, description, enabled, trigger, actions) VALUES (:name, :description, :enabled, :trigger, :actions)", 

104 ), 

105 { 

106 "name": "Flow Run State Change Notification", 

107 "description": "Migrated from a flow run notification policy", 

108 "enabled": is_active, 

109 "trigger": json.dumps(trigger), 

110 "actions": json.dumps(actions), 

111 }, 

112 ) 

113 

114 conn.execute( 

115 sa.text("DELETE FROM flow_run_notification_policy WHERE id = :id"), 

116 {"id": row[0]}, 

117 ) 

118 

119 get_logger().info( 

120 f"Your {len(rows)} flow run notification policies have been migrated to automations. You can view the created automations in the Prefect UI." 

121 ) 

122 

123 op.drop_table("flow_run_notification_queue") 1a

124 op.drop_table("flow_run_notification_policy") 1a

125 

126 

127def downgrade(): 1a

128 op.create_table( 

129 "flow_run_notification_queue", 

130 sa.Column( 

131 "id", 

132 prefect.server.utilities.database.UUID(), 

133 server_default=sa.text( 

134 "(\n (\n lower(hex(randomblob(4))) \n || '-' \n " 

135 " || lower(hex(randomblob(2))) \n || '-4' \n ||" 

136 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||" 

137 " substr('89ab',abs(random()) % 4 + 1, 1) \n ||" 

138 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||" 

139 " lower(hex(randomblob(6)))\n )\n )" 

140 ), 

141 nullable=False, 

142 ), 

143 sa.Column( 

144 "created", 

145 prefect.server.utilities.database.Timestamp(timezone=True), 

146 server_default=sa.text("(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"), 

147 nullable=False, 

148 ), 

149 sa.Column( 

150 "updated", 

151 prefect.server.utilities.database.Timestamp(timezone=True), 

152 server_default=sa.text("(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"), 

153 nullable=False, 

154 ), 

155 sa.Column( 

156 "flow_run_notification_policy_id", 

157 prefect.server.utilities.database.UUID(), 

158 nullable=False, 

159 ), 

160 sa.Column( 

161 "flow_run_state_id", 

162 prefect.server.utilities.database.UUID(), 

163 nullable=False, 

164 ), 

165 sa.PrimaryKeyConstraint("id", name=op.f("pk_flow_run_notification_queue")), 

166 ) 

167 op.create_index( 

168 op.f("ix_flow_run_notification_queue__updated"), 

169 "flow_run_notification_queue", 

170 ["updated"], 

171 unique=False, 

172 ) 

173 op.create_table( 

174 "flow_run_notification_policy", 

175 sa.Column( 

176 "id", 

177 prefect.server.utilities.database.UUID(), 

178 server_default=sa.text( 

179 "(\n (\n lower(hex(randomblob(4))) \n || '-' \n " 

180 " || lower(hex(randomblob(2))) \n || '-4' \n ||" 

181 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||" 

182 " substr('89ab',abs(random()) % 4 + 1, 1) \n ||" 

183 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||" 

184 " lower(hex(randomblob(6)))\n )\n )" 

185 ), 

186 nullable=False, 

187 ), 

188 sa.Column( 

189 "created", 

190 prefect.server.utilities.database.Timestamp(timezone=True), 

191 server_default=sa.text("(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"), 

192 nullable=False, 

193 ), 

194 sa.Column( 

195 "updated", 

196 prefect.server.utilities.database.Timestamp(timezone=True), 

197 server_default=sa.text("(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"), 

198 nullable=False, 

199 ), 

200 sa.Column("is_active", sa.Boolean(), server_default="1", nullable=False), 

201 sa.Column( 

202 "state_names", 

203 prefect.server.utilities.database.JSON(astext_type=sa.Text()), 

204 server_default="[]", 

205 nullable=False, 

206 ), 

207 sa.Column( 

208 "tags", 

209 prefect.server.utilities.database.JSON(astext_type=sa.Text()), 

210 server_default="[]", 

211 nullable=False, 

212 ), 

213 sa.Column("message_template", sa.String(), nullable=True), 

214 sa.Column( 

215 "block_document_id", 

216 prefect.server.utilities.database.UUID(), 

217 nullable=False, 

218 ), 

219 sa.ForeignKeyConstraint( 

220 ["block_document_id"], 

221 ["block_document.id"], 

222 name=op.f( 

223 "fk_flow_run_notification_policy__block_document_id__block_document" 

224 ), 

225 ondelete="cascade", 

226 ), 

227 sa.PrimaryKeyConstraint("id", name=op.f("pk_flow_run_notification_policy")), 

228 ) 

229 op.create_index( 

230 op.f("ix_flow_run_notification_policy__updated"), 

231 "flow_run_notification_policy", 

232 ["updated"], 

233 unique=False, 

234 )