Coverage for polar/integrations/aws/s3/schemas.py: 62%
109 statements
coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
import base64
import hashlib
from datetime import datetime
from typing import Any, Self

from pydantic import UUID4, computed_field

from polar.kit.schemas import IDSchema, Schema
from polar.kit.utils import human_readable_size
from polar.organization.schemas import OrganizationID


def get_downloadable_content_disposition(filename: str) -> str:
    return f'attachment; filename="{filename}"'


class S3FileCreatePart(Schema):
    number: int
    chunk_start: int
    chunk_end: int

    checksum_sha256_base64: str | None = None

    def get_boto3_arguments(self) -> dict[str, Any]:
        if not self.checksum_sha256_base64:
            return dict(PartNumber=self.number)

        return dict(
            PartNumber=self.number,
            ChecksumAlgorithm="SHA256",
            ChecksumSHA256=self.checksum_sha256_base64,
        )
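

# Illustrative sketch, not part of this module: the arguments above are shaped
# for boto3's `upload_part` operation, so a presigned URL for one part could be
# built like this (the client, bucket, and key here are assumptions):
def _example_presign_part(
    s3_client: Any, bucket: str, key: str, upload_id: str, part: S3FileCreatePart
) -> str:
    return s3_client.generate_presigned_url(
        "upload_part",
        Params={
            "Bucket": bucket,
            "Key": key,
            "UploadId": upload_id,
            **part.get_boto3_arguments(),
        },
    )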


class S3FileCreateMultipart(Schema):
    parts: list[S3FileCreatePart]


class S3FileCreate(Schema):
    organization_id: OrganizationID | None = None
    name: str
    mime_type: str
    size: int

    checksum_sha256_base64: str | None = None

    upload: S3FileCreateMultipart


class S3File(IDSchema, validate_assignment=True):
    organization_id: UUID4

    name: str
    path: str
    mime_type: str
    size: int

    # Provided by AWS S3
    storage_version: str | None
    checksum_etag: str | None

    # Provided by us
    checksum_sha256_base64: str | None
    checksum_sha256_hex: str | None

    last_modified_at: datetime | None

    @computed_field  # type: ignore[prop-decorator]
    @property
    def size_readable(self) -> str:
        return human_readable_size(self.size)

    def to_metadata(self) -> dict[str, str]:
        metadata = {
            "polar-id": str(self.id),
            "polar-organization-id": str(self.organization_id),
            "polar-name": self.name.encode("ascii", "ignore").decode("ascii"),
            "polar-size": str(self.size),
        }
        if self.checksum_sha256_base64:
            metadata["polar-sha256-base64"] = self.checksum_sha256_base64
        if self.checksum_sha256_hex:
            metadata["polar-sha256-hex"] = self.checksum_sha256_hex
        return metadata
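
    # Illustrative usage (the client and bucket name are assumptions): this
    # metadata is shaped to be attached to the object at upload creation, e.g.
    #   s3.create_multipart_upload(
    #       Bucket="my-bucket",
    #       Key=file.path,
    #       ContentType=file.mime_type,
    #       Metadata=file.to_metadata(),
    #   )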

    @classmethod
    def from_head(cls, path: str, head: dict[str, Any]) -> Self:
        metadata = head.get("Metadata", {})

        return cls(
            id=metadata.get("polar-id"),
            organization_id=metadata.get("polar-organization-id"),
            name=metadata.get("polar-name"),
            path=path,
            mime_type=head["ContentType"],
            size=metadata.get("polar-size"),
            storage_version=head.get("VersionId", None),
            checksum_etag=head.get("ETag", None),
            checksum_sha256_base64=metadata.get("polar-sha256-base64"),
            checksum_sha256_hex=metadata.get("polar-sha256-hex"),
            last_modified_at=head.get("LastModified", None),
        )
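

# Illustrative sketch, not part of this module: `from_head` consumes the raw
# response of a boto3 `head_object` call, which carries the `Metadata`,
# `ContentType`, `ETag`, `VersionId`, and `LastModified` fields read above.
# The client and bucket here are assumptions.
def _example_from_head(s3_client: Any, bucket: str, path: str) -> S3File:
    head = s3_client.head_object(Bucket=bucket, Key=path)
    return S3File.from_head(path, head)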


class S3FileUploadPart(S3FileCreatePart):
    url: str
    expires_at: datetime

    headers: dict[str, str] = {}

    @classmethod
    def generate_headers(cls, sha256_base64: str | None) -> dict[str, str]:
        if not sha256_base64:
            return {}

        return {
            "x-amz-checksum-sha256": sha256_base64,
            "x-amz-sdk-checksum-algorithm": "SHA256",
        }
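

# Illustrative sketch, not part of this module: a client uploads one chunk by
# PUTting its bytes to the presigned URL with the headers above, so S3 can
# verify the SHA-256 server side. Using `httpx` as the HTTP client is an
# assumption.
def _example_upload_part(part: S3FileUploadPart, chunk: bytes) -> str:
    import httpx

    response = httpx.put(part.url, content=chunk, headers=part.headers)
    response.raise_for_status()
    # The returned ETag is needed later to complete the multipart upload
    return response.headers["ETag"]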


class S3FileUploadMultipart(Schema):
    id: str
    path: str
    parts: list[S3FileUploadPart]


class S3FileUpload(S3File):
    upload: S3FileUploadMultipart


class S3FileUploadCompletedPart(Schema):
    number: int
    checksum_etag: str
    checksum_sha256_base64: str | None


class S3FileUploadCompleted(Schema):
    id: str
    path: str
    parts: list[S3FileUploadCompletedPart]

    @staticmethod
    def generate_base64_multipart_checksum(checksum_digests: list[bytes]) -> str:
        # S3's SHA-256 base64 validation for multipart uploads is special:
        # it is not the SHA-256 base64 of the entire file contents.
        #
        # 1. Concatenate the raw SHA-256 digests (not base64 encoded) of the chunks
        # 2. Compute a new SHA-256 digest of the concatenation
        # 3. Base64 encode the new digest
        #
        # See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
        # See: https://youtu.be/Te6s1VZPGfk?si=mnq2NizKJy_bM5-D&t=510
        #
        # We only use this for S3 validation. The SHA-256 base64 and hex
        # digests in our database cover the entire file contents to support
        # regular client-side validation post download.
        concatenated = b"".join(checksum_digests)
        digest = hashlib.sha256(concatenated).digest()
        return base64.b64encode(digest).decode("utf-8")
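
    # Worked example for the composite checksum above (illustrative):
    #   d1 = hashlib.sha256(b"first part bytes").digest()
    #   d2 = hashlib.sha256(b"second part bytes").digest()
    #   S3FileUploadCompleted.generate_base64_multipart_checksum([d1, d2])
    #   == base64.b64encode(hashlib.sha256(d1 + d2).digest()).decode("utf-8")
    # S3 reports this composite value suffixed with the part count, e.g. `-2`.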

    def get_boto3_arguments(self) -> dict[str, Any]:
        parts = []
        checksum_digests = []
        for part in self.parts:
            data = dict(
                ETag=part.checksum_etag,
                PartNumber=part.number,
            )
            if part.checksum_sha256_base64:
                data["ChecksumSHA256"] = part.checksum_sha256_base64
                digest = base64.b64decode(part.checksum_sha256_base64)
                checksum_digests.append(digest)

            parts.append(data)

        ret = dict(
            UploadId=self.id,
            MultipartUpload=dict(
                Parts=parts,
            ),
        )
        if not checksum_digests:
            return ret

        ret["ChecksumSHA256"] = self.generate_base64_multipart_checksum(
            checksum_digests
        )
        return ret
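

# Illustrative sketch, not part of this module: the completion arguments feed
# straight into boto3's `complete_multipart_upload` (the client and bucket are
# assumptions):
def _example_complete_upload(
    s3_client: Any, bucket: str, completed: S3FileUploadCompleted
) -> dict[str, Any]:
    return s3_client.complete_multipart_upload(
        Bucket=bucket, Key=completed.path, **completed.get_boto3_arguments()
    )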


class S3FileUploadCompleteResponse(Schema):
    id: str
    path: str
    success: bool
    checksum_etag: str
    storage_version: str


class S3DownloadURL(Schema):
    url: str
    headers: dict[str, str] = {}
    expires_at: datetime


class S3FileDownload(S3File):
    download: S3DownloadURL