s3-bsync / master

Bidirectional tool for syncing local filesystem directories with S3 buckets. (Incomplete)

Latest Commit

#    Time               Hash     Subject                                                Author        GPG?
16   14 Jul 2022 17:12  11b250e  Update debug messages, begin filescan implementation   Josh Stockin  G

Blob @ s3-bsync / src / syncfile.py

application/x-python · 12416 bytes
# s3-bsync Copyright (c) 2022 Joshua Stockin
# <https://joshstock.in>
# <https://git.joshstock.in/s3-bsync>
#
# This software is licensed and distributed under the terms of the MIT License.
# See the MIT License in the LICENSE file of this project's root folder.
#
# This comment block and its contents, including this disclaimer, MUST be
# preserved in all copies or distributions of this software's source.

import os
import time
import re
import logging

from .classes import *

logger = logging.getLogger(__name__)

__all__ = ["syncfile", "dirmap_stringify"]

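# Layout of the serialized state file (see serialize()/deserialize() below):
#   SIGNATURE (4 bytes), file version (1 byte)
#   METADATA_BEGIN, last-synced time in ms (8 bytes), METADATA_END
#   per bucket: BUCKET_BEGIN, bucket name (NUL-terminated)
#     per directory map: DIRECTORY_BEGIN, local path, S3 prefix, flags, DIRECTORY_END
#     per file object: OBJECT_BEGIN, key, mtime, ETag marker + ETag, size, OBJECT_END
#   then BUCKET_END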
CONTROL_BYTES = {
    "SIGNATURE": b"\x9D\x9F\x53\x33",
    "BUCKET_BEGIN": b"\x90",
    "BUCKET_END": b"\x91",
    "DIRECTORY_BEGIN": b"\x92",
    "DIRECTORY_END": b"\x93",
    "OBJECT_BEGIN": b"\x94",
    "OBJECT_END": b"\x95",
    "ETAG_MD5": b"\x96",
    "ETAG_OTHER": b"\x97",
    "METADATA_BEGIN": b"\x9A",
    "METADATA_END": b"\x9B",
}

CURRENT_VERSION = 1
ENDIANNESS = "little"


def dirmap_stringify(local_path, bucket_name, s3_prefix):
    return f'"{local_path}" <=> "s3://{bucket_name}/{s3_prefix}"'


class syncfile:
    file_path = None
    file_version = 0
    file_size = 0
    last_synced_time = 0
    managed_buckets = []

    def __init__(self, state_file: str):
        self.file_path = state_file

    def map_directory(self, local_path, s3_path):
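        """Register a mapping between a local directory and an S3 bucket/prefix,
        adding the bucket to the tracked set if it is not already managed."""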
        # Verify local path validity
        if not os.path.isdir(local_path):
            logger.error(
                f'User supplied local directory ("{local_path}") is not a directory'
            )
            exit(1)

        # Check S3 path supplied is valid
        s3match = re.match(r"^s3://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])/(.*)$", s3_path)
        if not s3match or len(s3match.groups()) != 2:
            logger.error(f'User supplied invalid S3 path ("{s3_path}")')
            exit(1)
        bucket_name = s3match.group(1)
        s3_prefix = s3match.group(2)
        if s3_prefix.endswith("/") or len(s3_prefix) == 0:
            logger.error(f'User supplied invalid S3 path prefix ("{s3_prefix}")')
            exit(1)

        logger.debug(
            f'Local directory "{local_path}" mapped to bucket "{bucket_name}" at path prefix "{s3_prefix}"'
        )

        bucket = next(
            (b for b in self.managed_buckets if b.bucket_name == bucket_name), False
        )

        if not bucket:
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

        dirmap_exists = next(
            (
                True
                for d in bucket.directory_maps
                if d.local_path == local_path and d.s3_prefix == s3_prefix
            ),
            False,
        )

        if dirmap_exists:
            logger.error(
                f"Directory map {dirmap_stringify(local_path, bucket.bucket_name, s3_prefix)} already exists"
            )
            exit(1)

        logger.debug(
            f"Creating directory map {dirmap_stringify(local_path, bucket.bucket_name, s3_prefix)}"
        )
        bucket.create_dirmap(local_path, s3_prefix)

    def remove_dirmap(self, local_path, s3_path):
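        """Remove an existing directory map from a tracked bucket. Exits if the
        bucket is not tracked or the mapping does not exist."""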
        # Check S3 path supplied is valid
        s3match = re.match(r"^s3://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])/(.*)$", s3_path)
        if not s3match or len(s3match.groups()) != 2:
            logger.error(f'User supplied invalid S3 path ("{s3_path}")')
            exit(1)
        bucket_name = s3match.group(1)
        s3_prefix = s3match.group(2)
        if s3_prefix.endswith("/") or len(s3_prefix) == 0:
            logger.error(f'User supplied invalid S3 path prefix ("{s3_prefix}")')
            exit(1)

        bucket = next(
            (
                bucket
                for bucket in self.managed_buckets
                if bucket.bucket_name == bucket_name
            ),
            None,
        )
        if not bucket:
            logger.error(f"Bucket s3://{bucket_name} is not tracked by the sync file")
            exit(1)

        dirmaps = [
            x
            for x in bucket.directory_maps
            if x.local_path == local_path and x.s3_prefix == s3_prefix
        ]
        if len(dirmaps) > 0:
            for dirmap in dirmaps:
                logger.debug(
                    f"Deleting directory map {dirmap_stringify(local_path, bucket.bucket_name, s3_prefix)}"
                )
                bucket.directory_maps.remove(dirmap)
                del dirmap
        else:
            logger.error(
                f"Directory map {dirmap_stringify(local_path, bucket.bucket_name, s3_prefix)} does not exist"
            )
            exit(1)

    def file_exists(self):
        if os.path.exists(self.file_path) and not os.path.isdir(self.file_path):
            return True
        return False

    def purge(self):
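        """Delete the state file from disk, refusing to remove anything that
        does not carry the s3sync signature."""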
        if self.file_exists():
            if self.verify_file():
                os.remove(self.file_path)
            else:
                logger.error("Attempt to purge (delete) a non-s3sync file")
                exit(1)
        else:
            logger.error("State file nonexistent")
            exit(1)

    def verify_file(self):
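        """Return True if the state file exists and begins with the expected
        4-byte signature."""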
        if not self.file_exists():
            return False
        f = open(self.file_path, "rb")
        f.seek(0)
        b = f.read(4)
        f.close()
        if b != CONTROL_BYTES["SIGNATURE"]:
            return False
        return True

    def serialize(self):
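        """Serialize the tracked buckets, directory maps, and file objects into
        the binary state file format and write it to self.file_path."""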
        logger.debug("Compiling bytearray")

        b = bytearray()

        b += CONTROL_BYTES["SIGNATURE"]
        b += CURRENT_VERSION.to_bytes(1, byteorder=ENDIANNESS)

        b += CONTROL_BYTES["METADATA_BEGIN"]
        current_time = time.time_ns() // 1000000
        b += current_time.to_bytes(8, byteorder=ENDIANNESS)
        b += CONTROL_BYTES["METADATA_END"]

        for bucket in self.managed_buckets:
            if (
                len(bucket.directory_maps) == 0
            ):  # Don't serialize any buckets with no dirmaps
                continue

            b += CONTROL_BYTES["BUCKET_BEGIN"]
            b += bucket.bucket_name.encode() + b"\x00"

            logger.debug(f"Bucket {bucket.bucket_name}")

            for dirmap in bucket.directory_maps:
                b += CONTROL_BYTES["DIRECTORY_BEGIN"]
                b += dirmap.local_path.encode() + b"\x00"
                b += dirmap.s3_prefix.encode() + b"\x00"
                b += dirmap.gz_compress.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.recursive.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.gpg_enabled.to_bytes(1, byteorder=ENDIANNESS)
                if dirmap.gpg_enabled:
                    b += dirmap.gpg_email.encode() + b"\x00"
                b += CONTROL_BYTES["DIRECTORY_END"]
                logger.debug(
                    f"Serialized directory map {dirmap_stringify(dirmap.local_path, bucket.bucket_name, dirmap.s3_prefix)}"
                )

            for fileobject in bucket.fileobjects:
                b += CONTROL_BYTES["OBJECT_BEGIN"]
                b += fileobject.key.encode() + b"\x00"
                b += fileobject.modified.to_bytes(8, byteorder=ENDIANNESS)
                # An MD5 ETag is a 32-character hex digest (16 raw bytes)
                if fileobject and len(fileobject.etag) == 32:
                    b += CONTROL_BYTES["ETAG_MD5"]
                    b += bytes.fromhex(fileobject.etag)
                else:
                    b += CONTROL_BYTES["ETAG_OTHER"]
                    b += fileobject.etag.encode() + b"\x00"
                b += fileobject.size.to_bytes(8, byteorder=ENDIANNESS)
                b += CONTROL_BYTES["OBJECT_END"]
                logger.debug(
                    f"Serialized fileobject s3://{bucket.bucket_name}/{fileobject.key} ({fileobject.etag})"
                )

            b += CONTROL_BYTES["BUCKET_END"]

        logger.debug("Writing serialized state information to syncfile")
        f = open(self.file_path, "wb")
        f.seek(0)
        f.write(b)
        f.truncate()
        f.close()
        logger.debug(f"Finished writing to file (length {len(b)})")

    def deserialize(self):
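        """Parse the binary state file at self.file_path and rebuild the bucket,
        directory map, and file object records in memory."""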
        if not self.file_exists():
            logger.error("Attempt to deserialize file that doesn't exist")
            exit(1)

        self.file_size = os.path.getsize(self.file_path)
        f = open(self.file_path, "rb")
        logger.debug(f"Deserializing file {f}")
        f.seek(0)

        def get_string():
            # Read single bytes until a NUL terminator is hit, then decode
            return b"".join(iter(lambda: f.read(1), b"\x00")).decode()

        b = f.read(4)
        if b != CONTROL_BYTES["SIGNATURE"]:
            logger.error(
                "File signature does not match expected s3state file signature (not an s3sync file format or file corrupted)"
            )
            exit(1)

        self.file_version = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
        if self.file_version == 0 or self.file_version > CURRENT_VERSION:
            logger.error(
                f"File version outside expected range (1..{CURRENT_VERSION}) (corrupt file)"
            )
            exit(1)
        logger.debug(f"File is version {self.file_version}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_BEGIN"]:
            logger.error("Expected metadata block begin byte not found (corrupt file)")
            exit(1)
        self.last_synced_time = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
        logger.debug(f"Last synced time reported as {self.last_synced_time}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_END"]:
            logger.error("Expected metadata block end byte not found (corrupt file)")
            exit(1)

        while b := f.read(1):
            if b != CONTROL_BYTES["BUCKET_BEGIN"]:
                logger.error("Unexpected control byte detected (corrupt file)")
                exit(1)
            bucket_name = get_string()
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

            logger.debug(f"Bucket {bucket_name}")

            while b2 := f.read(1):
                if b2 == CONTROL_BYTES["DIRECTORY_BEGIN"]:
                    local_path = get_string()
                    s3_prefix = get_string()
                    gz_compress = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
                    recursive = bool.from_bytes(f.read(1), byteorder=ENDIANNESS)
                    gpg_enabled = bool.from_bytes(f.read(1), byteorder=ENDIANNESS)
                    gpg_email = ""
                    if gpg_enabled:
                        gpg_email = get_string()
                    if f.read(1) != CONTROL_BYTES["DIRECTORY_END"]:
                        logger.error(
                            "Expected directory block end byte not found (corrupt file)"
                        )
                        exit(1)
                    bucket.create_dirmap(
                        local_path,
                        s3_prefix,
                        gz_compress,
                        recursive,
                        gpg_enabled,
                        gpg_email,
                    )
                    logger.debug(
                        f"Deserialized directory map {dirmap_stringify(local_path, bucket.bucket_name, s3_prefix)}"
                    )

                elif b2 == CONTROL_BYTES["OBJECT_BEGIN"]:
                    key = get_string()
                    modified = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    etag_type = f.read(1)
                    etag = ""
                    if etag_type == CONTROL_BYTES["ETAG_MD5"]:
                        # MD5 ETags are stored as 16 raw bytes; restore the hex digest
                        etag = f.read(16).hex()
                    elif etag_type == CONTROL_BYTES["ETAG_OTHER"]:
                        etag = get_string()
                    file_size = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    if f.read(1) != CONTROL_BYTES["OBJECT_END"]:
                        logger.error(
                            "Expected fileobject block end byte not found (corrupt file)"
                        )
                        exit(1)
                    bucket.create_fileobject(key, modified, etag, file_size)
                    logger.debug(
                        f"Deserialized fileobject s3://{bucket.bucket_name}/{key} ({etag})"
                    )

                elif b2 == CONTROL_BYTES["BUCKET_END"]:
                    break

                else:
                    logger.error("Unexpected control byte detected (corrupt file)")
                    exit(1)

        f.close()
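
For context, a minimal sketch of how the syncfile class above might be driven. The import path and state-file location are illustrative assumptions; the actual command-line entry point lives elsewhere in the package.

from s3_bsync.syncfile import syncfile  # import path is an assumption

state = syncfile("/home/user/.s3sync.state")  # hypothetical state-file location
if state.file_exists() and state.verify_file():
    state.deserialize()  # load previously tracked buckets and directory maps
state.map_directory("/home/user/photos", "s3://example-bucket/photos")
state.serialize()  # persist the updated state back to disk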
343