Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2022_05_28_081650_e73c6f1fe752_adds_block_schema_referecnes_and_block_.py: 61%

34 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1"""Adds block schema references and block document references 

2 

3Revision ID: e73c6f1fe752 

4Revises: 33439667aeea 

5Create Date: 2022-05-28 08:16:50.141505 

6 

7""" 

8 

9import sqlalchemy as sa 1a

10from alembic import op 1a

11 

12import prefect 1a

13from prefect.blocks.core import Block 1a

14 

15# revision identifiers, used by Alembic. 

16revision = "e73c6f1fe752" 1a

17down_revision = "33439667aeea" 1a

18branch_labels = None 1a

19depends_on = None 1a

20 

21# Used to update titles of existing storage block schemas. Need to update titles to match 

22# what the client generates to ensure the same checksum is generated on client and server. 

23BLOCK_SCHEMA_TITLE_MAP = { 1a

24 "S3 Storage": "S3StorageBlock", 

25 "Temporary Local Storage": "TempStorageBlock", 

26 "Local Storage": "LocalStorageBlock", 

27 "Google Cloud Storage": "GoogleCloudStorageBlock", 

28 "Azure Blob Storage": "AzureBlobStorageBlock", 

29 "KV Server Storage": "KVServerStorageBlock", 

30 "File Storage": "FileStorageBlock", 

31} 

32 

33 

34def upgrade(): 1a

35 # ### commands auto generated by Alembic - please adjust! ### 

36 op.create_table( 1a

37 "block_schema_reference", 

38 sa.Column( 

39 "id", 

40 prefect.server.utilities.database.UUID(), 

41 server_default=sa.text( 

42 "(\n (\n lower(hex(randomblob(4))) \n || '-' \n " 

43 " || lower(hex(randomblob(2))) \n || '-4' \n ||" 

44 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||" 

45 " substr('89ab',abs(random()) % 4 + 1, 1) \n ||" 

46 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||" 

47 " lower(hex(randomblob(6)))\n )\n )" 

48 ), 

49 nullable=False, 

50 ), 

51 sa.Column( 

52 "created", 

53 prefect.server.utilities.database.Timestamp(timezone=True), 

54 server_default=sa.text("(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"), 

55 nullable=False, 

56 ), 

57 sa.Column( 

58 "updated", 

59 prefect.server.utilities.database.Timestamp(timezone=True), 

60 server_default=sa.text("(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"), 

61 nullable=False, 

62 ), 

63 sa.Column("name", sa.String(), nullable=False), 

64 sa.Column( 

65 "parent_block_schema_id", 

66 prefect.server.utilities.database.UUID(), 

67 nullable=False, 

68 ), 

69 sa.Column( 

70 "reference_block_schema_id", 

71 prefect.server.utilities.database.UUID(), 

72 nullable=False, 

73 ), 

74 sa.ForeignKeyConstraint( 

75 ["parent_block_schema_id"], 

76 ["block_schema.id"], 

77 name=op.f( 

78 "fk_block_schema_reference__parent_block_schema_id__block_schema" 

79 ), 

80 ondelete="cascade", 

81 ), 

82 sa.ForeignKeyConstraint( 

83 ["reference_block_schema_id"], 

84 ["block_schema.id"], 

85 name=op.f( 

86 "fk_block_schema_reference__reference_block_schema_id__block_schema" 

87 ), 

88 ondelete="cascade", 

89 ), 

90 sa.PrimaryKeyConstraint("id", name=op.f("pk_block_schema_reference")), 

91 ) 

92 op.create_index( 1a

93 op.f("ix_block_schema_reference__updated"), 

94 "block_schema_reference", 

95 ["updated"], 

96 unique=False, 

97 ) 

98 op.create_table( 1a

99 "block_document_reference", 

100 sa.Column( 

101 "id", 

102 prefect.server.utilities.database.UUID(), 

103 server_default=sa.text( 

104 "(\n (\n lower(hex(randomblob(4))) \n || '-' \n " 

105 " || lower(hex(randomblob(2))) \n || '-4' \n ||" 

106 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||" 

107 " substr('89ab',abs(random()) % 4 + 1, 1) \n ||" 

108 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||" 

109 " lower(hex(randomblob(6)))\n )\n )" 

110 ), 

111 nullable=False, 

112 ), 

113 sa.Column( 

114 "created", 

115 prefect.server.utilities.database.Timestamp(timezone=True), 

116 server_default=sa.text("(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"), 

117 nullable=False, 

118 ), 

119 sa.Column( 

120 "updated", 

121 prefect.server.utilities.database.Timestamp(timezone=True), 

122 server_default=sa.text("(strftime('%Y-%m-%d %H:%M:%f000', 'now'))"), 

123 nullable=False, 

124 ), 

125 sa.Column("name", sa.String(), nullable=False), 

126 sa.Column( 

127 "parent_block_document_id", 

128 prefect.server.utilities.database.UUID(), 

129 nullable=False, 

130 ), 

131 sa.Column( 

132 "reference_block_document_id", 

133 prefect.server.utilities.database.UUID(), 

134 nullable=False, 

135 ), 

136 sa.ForeignKeyConstraint( 

137 ["parent_block_document_id"], 

138 ["block_document.id"], 

139 name=op.f( 

140 "fk_block_document_reference__parent_block_document_id__block_document" 

141 ), 

142 ondelete="cascade", 

143 ), 

144 sa.ForeignKeyConstraint( 

145 ["reference_block_document_id"], 

146 ["block_document.id"], 

147 name=op.f( 

148 "fk_block_document_reference__reference_block_document_id__block_document" 

149 ), 

150 ondelete="cascade", 

151 ), 

152 sa.PrimaryKeyConstraint("id", name=op.f("pk_block_document_reference")), 

153 ) 

154 op.create_index( 1a

155 op.f("ix_block_document_reference__updated"), 

156 "block_document_reference", 

157 ["updated"], 

158 unique=False, 

159 ) 

160 

161 # Update existing schemas to account for block schema references 

162 connection = op.get_bind() 1a

163 meta_data = sa.MetaData() 1a

164 meta_data.reflect(connection) 1a

165 BLOCK_SCHEMA = meta_data.tables["block_schema"] 1a

166 BLOCK_TYPE = meta_data.tables["block_type"] 1a

167 

168 block_schemas = connection.execute( 1a

169 sa.select( 

170 BLOCK_SCHEMA.c.id, BLOCK_SCHEMA.c.fields, BLOCK_SCHEMA.c.block_type_id 

171 ) 

172 ) 

173 

174 for id, fields, block_type_id in block_schemas: 174 ↛ 175line 174 didn't jump to line 175 because the loop on line 174 never started1a

175 block_type_result = connection.execute( 

176 sa.select(BLOCK_TYPE.c.name).where(BLOCK_TYPE.c.id == block_type_id) 

177 ).first() 

178 block_type_name = block_type_result[0] 

179 updated_fields = { 

180 **fields, 

181 "block_type_name": block_type_name, 

182 "block_schema_references": {}, 

183 } 

184 updated_title = BLOCK_SCHEMA_TITLE_MAP.get(block_type_name) 

185 if updated_title is not None: 

186 updated_fields["title"] = updated_title 

187 updated_checksum = Block._calculate_schema_checksum(updated_fields) 

188 connection.execute( 

189 sa.update(BLOCK_SCHEMA) 

190 .where(BLOCK_SCHEMA.c.id == id) 

191 .values(fields=updated_fields, checksum=updated_checksum) 

192 ) 

193 # ### end Alembic commands ### 

194 

195 

196def downgrade(): 1a

197 # ### commands auto generated by Alembic - please adjust! ### 

198 op.drop_index( 

199 op.f("ix_block_document_reference__updated"), 

200 table_name="block_document_reference", 

201 ) 

202 op.drop_table("block_document_reference") 

203 op.drop_index( 

204 op.f("ix_block_schema_reference__updated"), table_name="block_schema_reference" 

205 ) 

206 op.drop_table("block_schema_reference") 

207 # ### end Alembic commands ###