1 """Adds block schema references and block document references
2
3 Revision ID: e73c6f1fe752
4 Revises: 33439667aeea
5 Create Date: 2022-05-28 08:16:50.141505
6
7 """
8
9 import sqlalchemy as sa 1 ctx 1a
10 from alembic import op 1 ctx 1a
11
12 import prefect 1 ctx 1a
13 from prefect . blocks . core import Block 1 ctx 1a
14
15 # revision identifiers, used by Alembic.
16 revision = "e73c6f1fe752" 1 ctx 1a
17 down_revision = "33439667aeea" 1 ctx 1a
18 branch_labels = None 1 ctx 1a
19 depends_on = None 1 ctx 1a
20
21 # Used to update titles of existing storage block schemas. Need to update titles to match
22 # what the client generates to ensure the same checksum is generated on client and server.
23 BLOCK_SCHEMA_TITLE_MAP = { 1 ctx 1a
24 "S3 Storage" : "S3StorageBlock" ,
25 "Temporary Local Storage" : "TempStorageBlock" ,
26 "Local Storage" : "LocalStorageBlock" ,
27 "Google Cloud Storage" : "GoogleCloudStorageBlock" ,
28 "Azure Blob Storage" : "AzureBlobStorageBlock" ,
29 "KV Server Storage" : "KVServerStorageBlock" ,
30 "File Storage" : "FileStorageBlock" ,
31 }
32
33
34 def upgrade ( ) : 1 ctx 1a
35 # ### commands auto generated by Alembic - please adjust! ###
36 op . create_table ( 1 ctx 1a
37 "block_schema_reference" ,
38 sa . Column (
39 "id" ,
40 prefect . server . utilities . database . UUID ( ) ,
41 server_default = sa . text (
42 "(\n (\n lower(hex(randomblob(4))) \n || '-' \n "
43 " || lower(hex(randomblob(2))) \n || '-4' \n ||"
44 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||"
45 " substr('89ab',abs(random()) % 4 + 1, 1) \n ||"
46 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||"
47 " lower(hex(randomblob(6)))\n )\n )"
48 ) ,
49 nullable = False ,
50 ) ,
51 sa . Column (
52 "created" ,
53 prefect . server . utilities . database . Timestamp ( timezone = True ) ,
54 server_default = sa . text ( "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))" ) ,
55 nullable = False ,
56 ) ,
57 sa . Column (
58 "updated" ,
59 prefect . server . utilities . database . Timestamp ( timezone = True ) ,
60 server_default = sa . text ( "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))" ) ,
61 nullable = False ,
62 ) ,
63 sa . Column ( "name" , sa . String ( ) , nullable = False ) ,
64 sa . Column (
65 "parent_block_schema_id" ,
66 prefect . server . utilities . database . UUID ( ) ,
67 nullable = False ,
68 ) ,
69 sa . Column (
70 "reference_block_schema_id" ,
71 prefect . server . utilities . database . UUID ( ) ,
72 nullable = False ,
73 ) ,
74 sa . ForeignKeyConstraint (
75 [ "parent_block_schema_id" ] ,
76 [ "block_schema.id" ] ,
77 name = op . f (
78 "fk_block_schema_reference__parent_block_schema_id__block_schema"
79 ) ,
80 ondelete = "cascade" ,
81 ) ,
82 sa . ForeignKeyConstraint (
83 [ "reference_block_schema_id" ] ,
84 [ "block_schema.id" ] ,
85 name = op . f (
86 "fk_block_schema_reference__reference_block_schema_id__block_schema"
87 ) ,
88 ondelete = "cascade" ,
89 ) ,
90 sa . PrimaryKeyConstraint ( "id" , name = op . f ( "pk_block_schema_reference" ) ) ,
91 )
92 op . create_index ( 1 ctx 1a
93 op . f ( "ix_block_schema_reference__updated" ) ,
94 "block_schema_reference" ,
95 [ "updated" ] ,
96 unique = False ,
97 )
98 op . create_table ( 1 ctx 1a
99 "block_document_reference" ,
100 sa . Column (
101 "id" ,
102 prefect . server . utilities . database . UUID ( ) ,
103 server_default = sa . text (
104 "(\n (\n lower(hex(randomblob(4))) \n || '-' \n "
105 " || lower(hex(randomblob(2))) \n || '-4' \n ||"
106 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||"
107 " substr('89ab',abs(random()) % 4 + 1, 1) \n ||"
108 " substr(lower(hex(randomblob(2))),2) \n || '-' \n ||"
109 " lower(hex(randomblob(6)))\n )\n )"
110 ) ,
111 nullable = False ,
112 ) ,
113 sa . Column (
114 "created" ,
115 prefect . server . utilities . database . Timestamp ( timezone = True ) ,
116 server_default = sa . text ( "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))" ) ,
117 nullable = False ,
118 ) ,
119 sa . Column (
120 "updated" ,
121 prefect . server . utilities . database . Timestamp ( timezone = True ) ,
122 server_default = sa . text ( "(strftime('%Y-%m-%d %H:%M:%f000', 'now'))" ) ,
123 nullable = False ,
124 ) ,
125 sa . Column ( "name" , sa . String ( ) , nullable = False ) ,
126 sa . Column (
127 "parent_block_document_id" ,
128 prefect . server . utilities . database . UUID ( ) ,
129 nullable = False ,
130 ) ,
131 sa . Column (
132 "reference_block_document_id" ,
133 prefect . server . utilities . database . UUID ( ) ,
134 nullable = False ,
135 ) ,
136 sa . ForeignKeyConstraint (
137 [ "parent_block_document_id" ] ,
138 [ "block_document.id" ] ,
139 name = op . f (
140 "fk_block_document_reference__parent_block_document_id__block_document"
141 ) ,
142 ondelete = "cascade" ,
143 ) ,
144 sa . ForeignKeyConstraint (
145 [ "reference_block_document_id" ] ,
146 [ "block_document.id" ] ,
147 name = op . f (
148 "fk_block_document_reference__reference_block_document_id__block_document"
149 ) ,
150 ondelete = "cascade" ,
151 ) ,
152 sa . PrimaryKeyConstraint ( "id" , name = op . f ( "pk_block_document_reference" ) ) ,
153 )
154 op . create_index ( 1 ctx 1a
155 op . f ( "ix_block_document_reference__updated" ) ,
156 "block_document_reference" ,
157 [ "updated" ] ,
158 unique = False ,
159 )
160
161 # Update existing schemas to account for block schema references
162 connection = op . get_bind ( ) 1 ctx 1a
163 meta_data = sa . MetaData ( ) 1 ctx 1a
164 meta_data . reflect ( connection ) 1 ctx 1a
165 BLOCK_SCHEMA = meta_data . tables [ "block_schema" ] 1 ctx 1a
166 BLOCK_TYPE = meta_data . tables [ "block_type" ] 1 ctx 1a
167
168 block_schemas = connection . execute ( 1 ctx 1a
169 sa . select (
170 BLOCK_SCHEMA . c . id , BLOCK_SCHEMA . c . fields , BLOCK_SCHEMA . c . block_type_id
171 )
172 )
173
174 for id , fields , block_type_id in block_schemas : 174 ↛ 175 line 174 didn't jump to line 175 because the loop on line 174 never started 1 ctx 1a
175 block_type_result = connection . execute (
176 sa . select ( BLOCK_TYPE . c . name ) . where ( BLOCK_TYPE . c . id == block_type_id )
177 ) . first ( )
178 block_type_name = block_type_result [ 0 ]
179 updated_fields = {
180 ** fields ,
181 "block_type_name" : block_type_name ,
182 "block_schema_references" : { } ,
183 }
184 updated_title = BLOCK_SCHEMA_TITLE_MAP . get ( block_type_name )
185 if updated_title is not None :
186 updated_fields [ "title" ] = updated_title
187 updated_checksum = Block . _calculate_schema_checksum ( updated_fields )
188 connection . execute (
189 sa . update ( BLOCK_SCHEMA )
190 . where ( BLOCK_SCHEMA . c . id == id )
191 . values ( fields = updated_fields , checksum = updated_checksum )
192 )
193 # ### end Alembic commands ###
194
195
196 def downgrade ( ) : 1 ctx 1a
197 # ### commands auto generated by Alembic - please adjust! ###
198 op . drop_index (
199 op . f ( "ix_block_document_reference__updated" ) ,
200 table_name = "block_document_reference" ,
201 )
202 op . drop_table ( "block_document_reference" )
203 op . drop_index (
204 op . f ( "ix_block_schema_reference__updated" ) , table_name = "block_schema_reference"
205 )
206 op . drop_table ( "block_schema_reference" )
207 # ### end Alembic commands ###