# s3-bsync Copyright (c) 2022 Joshua Stockin
# <https://joshstock.in>
# <https://git.joshstock.in/s3-bsync>
#
# This software is licensed and distributed under the terms of the MIT License.
# See the MIT License in the LICENSE file of this project's root folder.
#
# This comment block and its contents, including this disclaimer, MUST be
# preserved in all copies or distributions of this software's source.

import os
import time
import re
import logging

from .classes import *

logger = logging.getLogger(__name__)

__all__ = ["syncfile", "dirmap_stringify"]


CONTROL_BYTES = {
    "SIGNATURE": b"\x9D\x9F\x53\x33",
    "BUCKET_BEGIN": b"\x90",
    "BUCKET_END": b"\x91",
    "DIRECTORY_BEGIN": b"\x92",
    "DIRECTORY_END": b"\x93",
    "OBJECT_BEGIN": b"\x94",
    "OBJECT_END": b"\x95",
    "ETAG_MD5": b"\x96",
    "ETAG_OTHER": b"\x97",
    "METADATA_BEGIN": b"\x9A",
    "METADATA_END": b"\x9B",
}
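
# Rough on-disk layout of a syncfile, as implied by serialize()/deserialize()
# below (integers are little-endian; strings are UTF-8, NUL-terminated):
#
#   SIGNATURE (4 bytes), version (1 byte)
#   METADATA_BEGIN, last_synced_time_ms (8 bytes), METADATA_END
#   for each bucket:
#     BUCKET_BEGIN, bucket_name
#     for each directory map:
#       DIRECTORY_BEGIN, local_path, s3_prefix, gz_compress (1),
#       recursive (1), gpg_enabled (1), [gpg_email if gpg_enabled],
#       DIRECTORY_END
#     for each fileobject:
#       OBJECT_BEGIN, key, modified_ms (8),
#       ETAG_MD5 + 16 raw bytes OR ETAG_OTHER + string,
#       size (8), OBJECT_END
#     BUCKET_END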

CURRENT_VERSION = 1
ENDIANNESS = "little"


def dirmap_stringify(local_path, bucket_name, s3_prefix):
    return f'"{local_path}" <=> "s3://{bucket_name}/{s3_prefix}"'
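# e.g. dirmap_stringify("/home/backups", "my-bucket", "backups/home")
#   -> '"/home/backups" <=> "s3://my-bucket/backups/home"'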


class syncfile:
    def __init__(self, state_file: str):
        self.file_path = state_file
        self.file_version = 0
        self.file_size = 0
        self.last_synced_time = 0
        # Per-instance list; a class-level list would be shared by every
        # syncfile instance
        self.managed_buckets = []

    def map_directory(self, local_path, s3_path):
        # Verify local path validity
        if not os.path.isdir(local_path):
            logger.error(
                f'User supplied local directory ("{local_path}") is not a directory'
            )
            exit(1)

        # Check that the supplied S3 path is valid
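        # (bucket names: 3-63 chars of lowercase alphanumerics and hyphens,
        # beginning and ending alphanumeric; the key prefix must be non-empty
        # and must not end with "/")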
        s3match = re.match(r"^s3://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])/(.*)$", s3_path)
        if not s3match or len(s3match.groups()) != 2:
            logger.error(f'User supplied invalid S3 path ("{s3_path}")')
            exit(1)
        bucket_name = s3match.group(1)
        s3_prefix = s3match.group(2)
        if s3_prefix.endswith("/") or len(s3_prefix) == 0:
            logger.error(f'User supplied invalid S3 path prefix ("{s3_prefix}")')
            exit(1)

        logger.debug(
            f'Local directory "{local_path}" mapped to bucket "{bucket_name}" at path prefix "{s3_prefix}"'
        )

        bucket = next(
            (b for b in self.managed_buckets if b.bucket_name == bucket_name), False
        )

        if not bucket:
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

        dirmap_exists = next(
            (
                True
                for d in bucket.directory_maps
                if d.local_path == local_path and d.s3_prefix == s3_prefix
            ),
            False,
        )

        if dirmap_exists:
            logger.error(
                f"Directory map {dirmap_stringify(local_path, bucket.bucket_name, s3_prefix)} already exists"
            )
            exit(1)

        logger.debug(
            f"Creating directory map {dirmap_stringify(local_path, bucket.bucket_name, s3_prefix)}"
        )
        bucket.create_dirmap(local_path, s3_prefix)

    def remove_dirmap(self, local_path, s3_path):
        # Check that the supplied S3 path is valid
        s3match = re.match(r"^s3://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])/(.*)$", s3_path)
        if not s3match or len(s3match.groups()) != 2:
            logger.error(f'User supplied invalid S3 path ("{s3_path}")')
            exit(1)
        bucket_name = s3match.group(1)
        s3_prefix = s3match.group(2)
        if s3_prefix.endswith("/") or len(s3_prefix) == 0:
            logger.error(f'User supplied invalid S3 path prefix ("{s3_prefix}")')
            exit(1)

        bucket = next(
            (
                bucket
                for bucket in self.managed_buckets
                if bucket.bucket_name == bucket_name
            ),
            None,
        )
        if not bucket:
            logger.error(f"Bucket s3://{bucket_name} is not tracked by the sync file")
            exit(1)

        dirmaps = [
            x
            for x in bucket.directory_maps
            if x.local_path == local_path and x.s3_prefix == s3_prefix
        ]
        if dirmaps:
            for dirmap in dirmaps:
                logger.debug(
                    f"Deleting directory map {dirmap_stringify(local_path, bucket.bucket_name, s3_prefix)}"
                )
                bucket.directory_maps.remove(dirmap)
        else:
            logger.error(
                f"Directory map {dirmap_stringify(local_path, bucket.bucket_name, s3_prefix)} does not exist"
            )
            exit(1)

    def file_exists(self):
        return os.path.exists(self.file_path) and not os.path.isdir(self.file_path)

    def purge(self):
        if self.file_exists():
            if self.verify_file():
                os.remove(self.file_path)
            else:
                logger.error("Attempt to purge (delete) a non-s3sync file")
                exit(1)
        else:
            logger.error("State file nonexistent")
            exit(1)

    def verify_file(self):
        if not self.file_exists():
            return False
        with open(self.file_path, "rb") as f:
            b = f.read(4)
        return b == CONTROL_BYTES["SIGNATURE"]

    def serialize(self):
        logger.debug("Compiling bytearray")

        b = bytearray()

        b += CONTROL_BYTES["SIGNATURE"]
        b += CURRENT_VERSION.to_bytes(1, byteorder=ENDIANNESS)

        b += CONTROL_BYTES["METADATA_BEGIN"]
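        # Unix timestamp in milliseconds, packed into 8 little-endian bytes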
        current_time = time.time_ns() // 1000000
        b += current_time.to_bytes(8, byteorder=ENDIANNESS)
        b += CONTROL_BYTES["METADATA_END"]

        for bucket in self.managed_buckets:
            # Don't serialize buckets with no directory maps
            if len(bucket.directory_maps) == 0:
                continue

            b += CONTROL_BYTES["BUCKET_BEGIN"]
            b += bucket.bucket_name.encode() + b"\x00"

            logger.debug(f"Bucket {bucket.bucket_name}")

            for dirmap in bucket.directory_maps:
                b += CONTROL_BYTES["DIRECTORY_BEGIN"]
                b += dirmap.local_path.encode() + b"\x00"
                b += dirmap.s3_prefix.encode() + b"\x00"
                b += dirmap.gz_compress.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.recursive.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.gpg_enabled.to_bytes(1, byteorder=ENDIANNESS)
                if dirmap.gpg_enabled:
                    b += dirmap.gpg_email.encode() + b"\x00"
                b += CONTROL_BYTES["DIRECTORY_END"]
                logger.debug(
                    f"Serialized directory map {dirmap_stringify(dirmap.local_path, bucket.bucket_name, dirmap.s3_prefix)}"
                )

            for fileobject in bucket.fileobjects:
                b += CONTROL_BYTES["OBJECT_BEGIN"]
                b += fileobject.key.encode() + b"\x00"
                b += fileobject.modified.to_bytes(8, byteorder=ENDIANNESS)
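                # A single-part S3 upload's ETag is the object's MD5 digest:
                # 32 hex characters, i.e. 16 raw bytes once decoded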
                if len(fileobject.etag) == 32:
                    b += CONTROL_BYTES["ETAG_MD5"]
                    b += bytes.fromhex(fileobject.etag)
                else:
                    b += CONTROL_BYTES["ETAG_OTHER"]
                    b += fileobject.etag.encode() + b"\x00"
                b += fileobject.size.to_bytes(8, byteorder=ENDIANNESS)
                b += CONTROL_BYTES["OBJECT_END"]
                logger.debug(
                    f"Serialized fileobject s3://{bucket.bucket_name}/{fileobject.key} ({fileobject.etag})"
                )

            b += CONTROL_BYTES["BUCKET_END"]

        logger.debug("Writing serialized state information to syncfile")
        with open(self.file_path, "wb") as f:
            f.write(b)
        logger.debug(f"Finished writing to file (length {len(b)})")

    def deserialize(self):
        if not self.file_exists():
            logger.error("Attempt to deserialize file that doesn't exist")
            exit(1)

        self.file_size = os.path.getsize(self.file_path)
        f = open(self.file_path, "rb")
        logger.debug(f"Deserializing file {self.file_path}")

        def get_string():
            # Reads a NUL-terminated UTF-8 string, e.g. b"my-bucket\x00" -> "my-bucket"
            return b"".join(iter(lambda: f.read(1), b"\x00")).decode()

        b = f.read(4)
        if b != CONTROL_BYTES["SIGNATURE"]:
            logger.error(
                "File signature does not match expected s3state file signature (not an s3sync file format or file corrupted)"
            )
            exit(1)

        self.file_version = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
        if self.file_version == 0 or self.file_version > CURRENT_VERSION:
            logger.error(
                f"File version outside expected range (1..{CURRENT_VERSION}) (corrupt file)"
            )
            exit(1)
        logger.debug(f"File is version {self.file_version}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_BEGIN"]:
            logger.error("Expected metadata block begin byte not found (corrupt file)")
            exit(1)
        self.last_synced_time = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
        logger.debug(f"Last synced time reported as {self.last_synced_time}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_END"]:
            logger.error("Expected metadata block end byte not found (corrupt file)")
            exit(1)

        while b := f.read(1):
            if b != CONTROL_BYTES["BUCKET_BEGIN"]:
                logger.error("Unexpected control byte detected (corrupt file)")
                exit(1)
            bucket_name = get_string()
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

            logger.debug(f"Bucket {bucket_name}")

            while b2 := f.read(1):
                if b2 == CONTROL_BYTES["DIRECTORY_BEGIN"]:
                    local_path = get_string()
                    s3_prefix = get_string()
                    gz_compress = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
                    recursive = bool(int.from_bytes(f.read(1), byteorder=ENDIANNESS))
                    gpg_enabled = bool(int.from_bytes(f.read(1), byteorder=ENDIANNESS))
                    gpg_email = ""
                    if gpg_enabled:
                        gpg_email = get_string()
                    if f.read(1) != CONTROL_BYTES["DIRECTORY_END"]:
                        logger.error(
                            "Expected directory block end byte not found (corrupt file)"
                        )
                        exit(1)
                    bucket.create_dirmap(
                        local_path,
                        s3_prefix,
                        gz_compress,
                        recursive,
                        gpg_enabled,
                        gpg_email,
                    )
                    logger.debug(
                        f"Deserialized directory map {dirmap_stringify(local_path, bucket.bucket_name, s3_prefix)}"
                    )

                elif b2 == CONTROL_BYTES["OBJECT_BEGIN"]:
                    key = get_string()
                    modified = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    etag_type = f.read(1)
                    etag = ""
                    if etag_type == CONTROL_BYTES["ETAG_MD5"]:
                        # Stored as 16 raw bytes; recover the 32-char hex string
                        etag = f.read(16).hex()
                    elif etag_type == CONTROL_BYTES["ETAG_OTHER"]:
                        etag = get_string()
                    file_size = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    if f.read(1) != CONTROL_BYTES["OBJECT_END"]:
                        logger.error(
                            "Expected fileobject block end byte not found (corrupt file)"
                        )
                        exit(1)
                    bucket.create_fileobject(key, modified, etag, file_size)
                    logger.debug(
                        f"Deserialized fileobject s3://{bucket.bucket_name}/{key} ({etag})"
                    )

                elif b2 == CONTROL_BYTES["BUCKET_END"]:
                    break

                else:
                    logger.error("Unexpected control byte detected (corrupt file)")
                    exit(1)

        f.close()
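
# Illustrative usage, assuming "/home/user/documents" exists locally and
# "my-bucket" is a writable S3 bucket:
#
#     sf = syncfile("state.s3sync")
#     sf.map_directory("/home/user/documents", "s3://my-bucket/backups/documents")
#     sf.serialize()
#
#     sf2 = syncfile("state.s3sync")
#     if sf2.verify_file():
#         sf2.deserialize()  # restores bucket and directory map state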
343 |
|