Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/_migrations/versions/sqlite/2022_07_06_142824_e2dae764a603_migrates_block_schemas_with_new_secrets_.py: 61%

28 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

"""Migrates block schemas with new secrets fields

Revision ID: e2dae764a603
Revises: 3bd87ecdac38
Create Date: 2022-07-06 14:28:24.493390

"""

8 

9import sqlalchemy as sa 1a

10from alembic import op 1a

11 

# Revision identifiers, used by Alembic: this migration's own id and the id of
# the revision it builds on.
revision = "e2dae764a603"
down_revision = "3bd87ecdac38"

# This revision sets no branch labels and no dependencies on other revisions.
branch_labels = None
depends_on = None

17 

# Snapshot of the new "S3 Storage" block schema fields. The only property
# marked write-only / password-formatted is ``aws_secret_access_key``, which is
# also the only entry in ``secret_fields``.
_S3_STORAGE_NEW_FIELDS = {
    "title": "S3StorageBlock",
    "description": "Store data in an AWS S3 bucket.",
    "type": "object",
    "properties": {
        "bucket": {"title": "Bucket", "type": "string"},
        "aws_access_key_id": {"title": "Aws Access Key Id", "type": "string"},
        "aws_secret_access_key": {
            "title": "Aws Secret Access Key",
            "type": "string",
            "writeOnly": True,
            "format": "password",
        },
        "aws_session_token": {"title": "Aws Session Token", "type": "string"},
        "profile_name": {"title": "Profile Name", "type": "string"},
        "region_name": {"title": "Region Name", "type": "string"},
    },
    "required": ["bucket"],
    "block_type_name": "S3 Storage",
    "secret_fields": ["aws_secret_access_key"],
    "block_schema_references": {},
}

# Snapshot of the new "Azure Blob Storage" block schema fields. Here the
# secret (write-only, password-formatted) property is ``connection_string``.
_AZURE_BLOB_STORAGE_NEW_FIELDS = {
    "title": "AzureBlobStorageBlock",
    "description": "Store data in an Azure blob storage container.",
    "type": "object",
    "properties": {
        "container": {"title": "Container", "type": "string"},
        "connection_string": {
            "title": "Connection String",
            "type": "string",
            "writeOnly": True,
            "format": "password",
        },
    },
    "required": ["container", "connection_string"],
    "block_type_name": "Azure Blob Storage",
    "secret_fields": ["connection_string"],
    "block_schema_references": {},
}

# One entry per block type to migrate. Each entry names the block type, the
# checksum of the schema being replaced, the checksum of its replacement, and
# the capabilities and fields to store if the replacement must be created.
BLOCKS_TO_MIGRATE = [
    {
        "BLOCK_TYPE_NAME": "S3 Storage",
        "OLD_CHECKSUM": (
            "sha256:3ffda32926202a34fc30d7618e4792f71088d37fbcf92c18d070104ca6da7431"
        ),
        "NEW_CHECKSUM": (
            "sha256:68ed6efe6ab724c5f36519803012af68a22965eccfa5944c94fa809b8b9a6e04"
        ),
        "CAPABILITIES": ["readable", "storage", "writeable"],
        "NEW_FIELDS": _S3_STORAGE_NEW_FIELDS,
    },
    {
        "BLOCK_TYPE_NAME": "Azure Blob Storage",
        "OLD_CHECKSUM": (
            "sha256:4488e8f7d196f7627e3ead24ca136860f0a54d54f6c98533cf3ef2f4ba9cf51b"
        ),
        "NEW_CHECKSUM": (
            "sha256:2aef5e384a1f4a2d8dd0ff8c3b96d2c5eb5852462078b6915f7d756847341a42"
        ),
        "CAPABILITIES": ["readable", "storage", "writeable"],
        "NEW_FIELDS": _AZURE_BLOB_STORAGE_NEW_FIELDS,
    },
]

80 

81 

def _first_block_schema_id_row(connection, block_schema, checksum):
    """Return the first ``(id,)`` row of ``block_schema`` with ``checksum``, or None."""
    return connection.execute(
        sa.select(block_schema.c.id).where(block_schema.c.checksum == checksum)
    ).first()


def upgrade():
    """Move block documents from each old block schema to its replacement.

    For every entry in ``BLOCKS_TO_MIGRATE`` whose block type and old schema
    (looked up by name and checksum respectively) both exist:

    1. create the new schema row if no row with the new checksum exists yet,
    2. re-point every block document that references the old schema to the
       new schema, and
    3. delete the old schema row.

    Entries whose block type or old schema is missing are skipped.
    """
    connection = op.get_bind()
    meta_data = sa.MetaData()
    # Tables are reflected from the live database rather than imported from
    # application models.
    meta_data.reflect(connection)
    block_type = meta_data.tables["block_type"]
    block_schema = meta_data.tables["block_schema"]
    block_document = meta_data.tables["block_document"]

    for config in BLOCKS_TO_MIGRATE:
        block_type_row = connection.execute(
            sa.select(block_type.c.id).where(
                block_type.c.name == config["BLOCK_TYPE_NAME"]
            )
        ).first()
        old_schema_row = _first_block_schema_id_row(
            connection, block_schema, config["OLD_CHECKSUM"]
        )
        # Only run migration for this block if the type and old schema already exist
        if block_type_row is None or old_schema_row is None:
            continue
        old_schema_id = old_schema_row[0]

        # Create the new version of the schema unless it is already present.
        new_schema_row = _first_block_schema_id_row(
            connection, block_schema, config["NEW_CHECKSUM"]
        )
        if new_schema_row is None:
            connection.execute(
                sa.insert(block_schema).values(
                    checksum=config["NEW_CHECKSUM"],
                    fields=config["NEW_FIELDS"],
                    block_type_id=block_type_row[0],
                    capabilities=config["CAPABILITIES"],
                )
            )
            # Re-select to learn the id of the row that was just inserted.
            new_schema_row = _first_block_schema_id_row(
                connection, block_schema, config["NEW_CHECKSUM"]
            )
        new_schema_id = new_schema_row[0]

        # Update all block documents using the old block schema to use the new
        # block schema. One set-based UPDATE replaces selecting the document
        # ids and updating them one statement at a time.
        connection.execute(
            sa.update(block_document)
            .where(block_document.c.block_schema_id == old_schema_id)
            .values(block_schema_id=new_schema_id)
        )
        # Remove the old unused block schema
        connection.execute(
            sa.delete(block_schema).where(block_schema.c.id == old_schema_id)
        )

146 

147 

def downgrade():
    """Do nothing.

    Purely a data migration for 2.0b8. No downgrade necessary.
    """
    return None