# s3-bsync Copyright (c) 2022 Joshua Stockin
# <https://joshstock.in>
# <https://git.joshstock.in/s3-bsync>
#
# This software is licensed and distributed under the terms of the MIT License.
# See the MIT License in the LICENSE file of this project's root folder.
#
# This comment block and its contents, including this disclaimer, MUST be
# preserved in all copies or distributions of this software's source.

import os
import sys
import time
import re
import logging

from .classes import *

logger = logging.getLogger(__name__)

__all__ = ["syncfile"]


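# Control bytes delimiting records in the binary syncfile format. A serialized
# file consists of the 4-byte SIGNATURE, a 1-byte format version, a metadata
# block holding the last-synced timestamp (8 bytes, milliseconds), then one
# block per bucket: its NUL-terminated name followed by directory map and
# fileobject records. All multi-byte integers are little-endian (ENDIANNESS).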
CONTROL_BYTES = {
    "SIGNATURE": b"\x9D\x9F\x53\x33",
    "BUCKET_BEGIN": b"\x90",
    "BUCKET_END": b"\x91",
    "DIRECTORY_BEGIN": b"\x92",
    "DIRECTORY_END": b"\x93",
    "OBJECT_BEGIN": b"\x94",
    "OBJECT_END": b"\x95",
    "ETAG_MD5": b"\x96",
    "ETAG_OTHER": b"\x97",
    "METADATA_BEGIN": b"\x9A",
    "METADATA_END": b"\x9B",
}

CURRENT_VERSION = 1
ENDIANNESS = "little"


class syncfile:
    file_path = None
    file_version = 0
    file_size = 0
    last_synced_time = 0

    def __init__(self, state_file: str):
        self.file_path = state_file
        # Per-instance list; a mutable class attribute would be shared across
        # all syncfile instances
        self.managed_buckets = []

    def map_directory(self, local_path, s3_path):
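        """Validate local_path and an s3://bucket-name/prefix path, then
        register the mapping on the matching managed bucket, creating the
        bucket entry if it is not yet tracked."""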
        # Verify local path validity
        if not os.path.isdir(local_path):
            logger.error(
                f'User supplied local directory ("{local_path}") is not a directory'
            )
            sys.exit(1)

        # Check supplied S3 path is valid; bucket names are 3-63 characters of
        # lowercase alphanumerics and hyphens, beginning and ending with an
        # alphanumeric character
        s3match = re.match(r"^s3://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])/(.*)$", s3_path)
        if not s3match or len(s3match.groups()) != 2:
            logger.error(f'User supplied invalid S3 path ("{s3_path}")')
            sys.exit(1)
        bucket_name = s3match.group(1)
        s3_prefix = s3match.group(2)
        if s3_prefix.endswith("/") or len(s3_prefix) == 0:
            logger.error(f'User supplied invalid S3 path prefix ("{s3_prefix}")')
            sys.exit(1)

        logger.debug(
            f'Local directory "{local_path}" mapped to bucket "{bucket_name}" at path prefix "{s3_prefix}"'
        )

        bucket = next(
            (
                bucket
                for bucket in self.managed_buckets
                if bucket.bucket_name == bucket_name
            ),
            None,
        )
        if not bucket:
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

        dirmap = f"{local_path}:{s3_prefix}"
        dirmap_exists = any(
            f"{x.local_path}:{x.s3_prefix}" == dirmap for x in bucket.directory_maps
        )
        if dirmap_exists:
            logger.error(f"Directory mapping {local_path}:{s3_prefix} already exists")
            sys.exit(1)
        bucket.create_dirmap(local_path, s3_prefix)

    def file_exists(self):
        return os.path.exists(self.file_path) and not os.path.isdir(self.file_path)

    def purge(self):
        if self.file_exists():
            if self.verify_file():
                os.remove(self.file_path)
            else:
                logger.error("Attempt to purge (delete) a non-s3sync file")
                sys.exit(1)
        else:
            logger.error("State file nonexistent")
            sys.exit(1)

    def verify_file(self):
        if not self.file_exists():
            return False
        with open(self.file_path, "rb") as f:
            b = f.read(4)
        return b == CONTROL_BYTES["SIGNATURE"]

    def serialize(self):
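        """Serialize tracked buckets, directory maps, and fileobjects into the
        binary syncfile format and write the result to self.file_path."""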
        logger.debug("Compiling bytearray")

        b = bytearray()

        b += CONTROL_BYTES["SIGNATURE"]
        b += CURRENT_VERSION.to_bytes(1, byteorder=ENDIANNESS)

        b += CONTROL_BYTES["METADATA_BEGIN"]
        current_time = time.time_ns() // 1000000  # milliseconds since epoch
        b += current_time.to_bytes(8, byteorder=ENDIANNESS)
        b += CONTROL_BYTES["METADATA_END"]

        for bucket in self.managed_buckets:
            b += CONTROL_BYTES["BUCKET_BEGIN"]
            b += bucket.bucket_name.encode() + b"\x00"

            logger.debug(f"Bucket {bucket.bucket_name}")

            for dirmap in bucket.directory_maps:
                b += CONTROL_BYTES["DIRECTORY_BEGIN"]
                b += dirmap.local_path.encode() + b"\x00"
                b += dirmap.s3_prefix.encode() + b"\x00"
                b += dirmap.gz_compress.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.recursive.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.gpg_enabled.to_bytes(1, byteorder=ENDIANNESS)
                if dirmap.gpg_enabled:
                    b += dirmap.gpg_email.encode() + b"\x00"
                b += CONTROL_BYTES["DIRECTORY_END"]

            for fileobject in bucket.fileobjects:
                b += CONTROL_BYTES["OBJECT_BEGIN"]
                b += fileobject.key.encode() + b"\x00"
                b += fileobject.modified.to_bytes(8, byteorder=ENDIANNESS)
                # An MD5 ETag is 32 hex characters; pack it into 16 raw bytes
                if len(fileobject.etag) == 32:
                    b += CONTROL_BYTES["ETAG_MD5"]
                    b += bytes.fromhex(fileobject.etag)
                else:
                    b += CONTROL_BYTES["ETAG_OTHER"]
                    b += fileobject.etag.encode() + b"\x00"
                b += fileobject.size.to_bytes(8, byteorder=ENDIANNESS)
                b += CONTROL_BYTES["OBJECT_END"]

            b += CONTROL_BYTES["BUCKET_END"]

        logger.debug("Writing serialized state information to syncfile")
        with open(self.file_path, "wb") as f:
            f.write(b)
        logger.debug(f"Finished writing to file (length {len(b)})")

    def deserialize(self):
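        """Parse the binary syncfile at self.file_path and rebuild bucket,
        directory map, and fileobject state in memory."""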
        if not self.file_exists():
            logger.error("Attempt to deserialize file that doesn't exist")
            sys.exit(1)

        self.file_size = os.path.getsize(self.file_path)
        f = open(self.file_path, "rb")
        logger.debug(f"Deserializing file {self.file_path}")

        def get_string():
            # Read single bytes until a NUL terminator, then decode as UTF-8
            return b"".join(iter(lambda: f.read(1), b"\x00")).decode()

        b = f.read(4)
        if b != CONTROL_BYTES["SIGNATURE"]:
            logger.error(
                "File signature does not match expected s3state file signature (not an s3sync file format or file corrupted)"
            )
            sys.exit(1)

        self.file_version = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
        if self.file_version == 0 or self.file_version > CURRENT_VERSION:
            logger.error(
                f"File version outside expected range (1..{CURRENT_VERSION}) (corrupt file)"
            )
            sys.exit(1)
        logger.debug(f"File is version {self.file_version}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_BEGIN"]:
            logger.error("Expected metadata block begin byte not found (corrupt file)")
            sys.exit(1)
        self.last_synced_time = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
        logger.debug(f"Last synced time reported as {self.last_synced_time}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_END"]:
            logger.error("Expected metadata block end byte not found (corrupt file)")
            sys.exit(1)

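        # The remainder of the file is a sequence of bucket blocks, each
        # holding directory map and fileobject records delimited by the
        # control bytes defined at module level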
        while b := f.read(1):
            if b != CONTROL_BYTES["BUCKET_BEGIN"]:
                logger.error("Unexpected control byte detected (corrupt file)")
                sys.exit(1)
            bucket_name = get_string()
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

            logger.debug(f"Bucket {bucket_name}")

            while b2 := f.read(1):
                if b2 == CONTROL_BYTES["DIRECTORY_BEGIN"]:
                    local_path = get_string()
                    s3_prefix = get_string()
                    gz_compress = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
                    recursive = bool(int.from_bytes(f.read(1), byteorder=ENDIANNESS))
                    gpg_enabled = bool(int.from_bytes(f.read(1), byteorder=ENDIANNESS))
                    gpg_email = ""
                    if gpg_enabled:
                        gpg_email = get_string()
                    if f.read(1) != CONTROL_BYTES["DIRECTORY_END"]:
                        logger.error(
                            "Expected directory block end byte not found (corrupt file)"
                        )
                        sys.exit(1)
                    bucket.create_dirmap(
                        local_path,
                        s3_prefix,
                        gz_compress,
                        recursive,
                        gpg_enabled,
                        gpg_email,
                    )
                    logger.debug(
                        f"Created directory map {local_path}:{s3_prefix}, recursive={recursive}, gzip={gz_compress}, gpg_enabled={gpg_enabled}, gpg_email={gpg_email}"
                    )

                elif b2 == CONTROL_BYTES["OBJECT_BEGIN"]:
                    key = get_string()
                    modified = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    etag_type = f.read(1)
                    etag = ""
                    if etag_type == CONTROL_BYTES["ETAG_MD5"]:
                        # Unpack the 16-byte MD5 digest back to its hex string form
                        etag = f.read(16).hex()
                    elif etag_type == CONTROL_BYTES["ETAG_OTHER"]:
                        etag = get_string()
                    file_size = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    if f.read(1) != CONTROL_BYTES["OBJECT_END"]:
                        logger.error(
                            "Expected fileobject block end byte not found (corrupt file)"
                        )
                        sys.exit(1)
                    bucket.create_fileobject(key, modified, etag, file_size)
                    logger.debug(
                        f"Created fileobject {key} with ETAG {etag}, file size {file_size}, last modified {modified}"
                    )

                elif b2 == CONTROL_BYTES["BUCKET_END"]:
                    break

                else:
                    logger.error("Unexpected control byte detected (corrupt file)")
                    sys.exit(1)

        f.close()
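
# Example usage (a minimal sketch, not part of the original module; the state
# file name and S3 path below are hypothetical):
#
#     sf = syncfile("backup.s3sync")
#     sf.map_directory("./data", "s3://my-bucket/backups/data")
#     sf.serialize()
#
#     restored = syncfile("backup.s3sync")
#     restored.deserialize()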