
s3-bsync / 8506c93

Bidirectional syncing tool to sync local filesystem directories with S3 buckets. (Incomplete)

Latest Commit

#15 · 28 Jun 2022 22:40 · 8506c93 · Update dataclass structures · Josh Stockin

Blob @ s3-bsync / src / syncfile.py

application/x-python · 11912 bytes
# s3-bsync Copyright (c) 2022 Joshua Stockin
# <https://joshstock.in>
# <https://git.joshstock.in/s3-bsync>
#
# This software is licensed and distributed under the terms of the MIT License.
# See the MIT License in the LICENSE file of this project's root folder.
#
# This comment block and its contents, including this disclaimer, MUST be
# preserved in all copies or distributions of this software's source.

import os
import time
import re
import logging

from .classes import *

logger = logging.getLogger(__name__)

__all__ = ["syncfile"]

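# Control bytes: a 4-byte file signature plus single-byte markers that delimit
# records (buckets, directory maps, file objects, metadata) in the binary
# sync-state file format.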
CONTROL_BYTES = {
    "SIGNATURE": b"\x9D\x9F\x53\x33",
    "BUCKET_BEGIN": b"\x90",
    "BUCKET_END": b"\x91",
    "DIRECTORY_BEGIN": b"\x92",
    "DIRECTORY_END": b"\x93",
    "OBJECT_BEGIN": b"\x94",
    "OBJECT_END": b"\x95",
    "ETAG_MD5": b"\x96",
    "ETAG_OTHER": b"\x97",
    "METADATA_BEGIN": b"\x9A",
    "METADATA_END": b"\x9B",
}

CURRENT_VERSION = 1
ENDIANNESS = "little"

class syncfile:
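    """Manages the binary s3sync state file: tracked buckets, their directory
    maps and file objects, and (de)serialization of that state to disk."""
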
    file_path = None
    file_version = 0
    file_size = 0
    last_synced_time = 0
    managed_buckets = []

    def __init__(self, state_file: str):
        self.file_path = state_file
        # Bind a fresh list per instance so buckets aren't shared across
        # syncfile objects through the class-level default.
        self.managed_buckets = []

    def map_directory(self, local_path, s3_path):
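        """Validate a local directory and an s3://bucket/prefix path, then
        record the mapping in the tracked state, creating the managed bucket
        entry if it is not already tracked."""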
        # Verify local path validity
        if not os.path.isdir(local_path):
            logger.error(
                f'User supplied local directory ("{local_path}") is not a directory'
            )
            exit(1)

        # Check S3 path supplied is valid
        s3match = re.match(r"^s3://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])/(.*)$", s3_path)
        if not s3match or len(s3match.groups()) != 2:
            logger.error(f'User supplied invalid S3 path ("{s3_path}")')
            exit(1)
        bucket_name = s3match.group(1)
        s3_prefix = s3match.group(2)
        if s3_prefix.endswith("/") or len(s3_prefix) == 0:
            logger.error(f'User supplied invalid S3 path prefix ("{s3_prefix}")')
            exit(1)

        logger.debug(
            f'Local directory "{local_path}" mapped to bucket "{bucket_name}" at path prefix "{s3_prefix}"'
        )

        bucket = next(
            (b for b in self.managed_buckets if b.bucket_name == bucket_name),
            None,
        )

        if not bucket:
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

        dirmap_exists = any(
            d.local_path == local_path and d.s3_prefix == s3_prefix
            for d in bucket.directory_maps
        )

        if dirmap_exists:
            logger.error(f"Directory map {local_path} <=> {s3_path} already exists")
            exit(1)

        logger.debug(f"Creating directory map {local_path} <=> {s3_path}")
        bucket.create_dirmap(local_path, s3_prefix)

    def remove_dirmap(self, local_path, s3_path):
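        """Remove an existing local directory <=> S3 prefix mapping from the
        tracked state."""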
        # Check S3 path supplied is valid
        s3match = re.match(r"^s3://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])/(.*)$", s3_path)
        if not s3match or len(s3match.groups()) != 2:
            logger.error(f'User supplied invalid S3 path ("{s3_path}")')
            exit(1)
        bucket_name = s3match.group(1)
        s3_prefix = s3match.group(2)
        if s3_prefix.endswith("/") or len(s3_prefix) == 0:
            logger.error(f'User supplied invalid S3 path prefix ("{s3_prefix}")')
            exit(1)

        bucket = next(
            (
                bucket
                for bucket in self.managed_buckets
                if bucket.bucket_name == bucket_name
            ),
            None,
        )
        if not bucket:
            logger.error(f"Bucket s3://{bucket_name} is not tracked by the sync file")
            exit(1)

        dirmaps = [
            x
            for x in bucket.directory_maps
            if x.local_path == local_path and x.s3_prefix == s3_prefix
        ]
        if len(dirmaps) > 0:
            for dirmap in dirmaps:
                logger.debug(f"Deleting directory map {local_path} <=> {s3_path}")
                bucket.directory_maps.remove(dirmap)
        else:
            logger.error(f"Directory map {local_path} <=> {s3_path} does not exist")
            exit(1)

    def file_exists(self):
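        """Return True if the state file exists and is not a directory."""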
        if os.path.exists(self.file_path) and not os.path.isdir(self.file_path):
            return True
        return False

    def purge(self):
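        """Delete the state file from disk, refusing to remove non-s3sync files."""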
        if self.file_exists():
            if self.verify_file():
                os.remove(self.file_path)
            else:
                logger.error("Attempt to purge (delete) a non-s3sync file")
                exit(1)
        else:
            logger.error("State file nonexistent")
            exit(1)

    def verify_file(self):
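        """Return True if the state file begins with the expected signature bytes."""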
        if not self.file_exists():
            return False
        f = open(self.file_path, "rb")
        f.seek(0)
        b = f.read(4)
        f.close()
        if b != CONTROL_BYTES["SIGNATURE"]:
            return False
        return True

    def serialize(self):
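        """Serialize the tracked buckets, directory maps, and file objects into
        the binary state format and write the result to the state file."""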
        logger.debug("Compiling bytearray")

        b = bytearray()

        b += CONTROL_BYTES["SIGNATURE"]
        b += CURRENT_VERSION.to_bytes(1, byteorder=ENDIANNESS)

        b += CONTROL_BYTES["METADATA_BEGIN"]
        current_time = time.time_ns() // 1000000
        b += current_time.to_bytes(8, byteorder=ENDIANNESS)
        b += CONTROL_BYTES["METADATA_END"]

        for bucket in self.managed_buckets:
            # Don't serialize any buckets with no dirmaps
            if len(bucket.directory_maps) == 0:
                continue

            b += CONTROL_BYTES["BUCKET_BEGIN"]
            b += bucket.bucket_name.encode() + b"\x00"

            logger.debug(f"Bucket {bucket.bucket_name}")

            for dirmap in bucket.directory_maps:
                b += CONTROL_BYTES["DIRECTORY_BEGIN"]
                b += dirmap.local_path.encode() + b"\x00"
                b += dirmap.s3_prefix.encode() + b"\x00"
                b += dirmap.gz_compress.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.recursive.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.gpg_enabled.to_bytes(1, byteorder=ENDIANNESS)
                if dirmap.gpg_enabled:
                    b += dirmap.gpg_email.encode() + b"\x00"
                b += CONTROL_BYTES["DIRECTORY_END"]
                logger.debug(
                    f"Serialized directory map {dirmap.local_path} <=> s3://{bucket.bucket_name}/{dirmap.s3_prefix}, recursive={dirmap.recursive}, gzip={dirmap.gz_compress}, gpg_enabled={dirmap.gpg_enabled}, gpg_email={dirmap.gpg_email}"
                )

            for fileobject in bucket.fileobjects:
                b += CONTROL_BYTES["OBJECT_BEGIN"]
                b += fileobject.key.encode() + b"\x00"
                b += fileobject.modified.to_bytes(8, byteorder=ENDIANNESS)
                if fileobject and len(fileobject.etag) == 16:
                    b += CONTROL_BYTES["ETAG_MD5"]
                    b += bytes.fromhex(fileobject.etag)
                else:
                    b += CONTROL_BYTES["ETAG_OTHER"]
                    b += fileobject.etag.encode() + b"\x00"
                b += fileobject.size.to_bytes(8, byteorder=ENDIANNESS)
                b += CONTROL_BYTES["OBJECT_END"]

            b += CONTROL_BYTES["BUCKET_END"]

        logger.debug("Writing serialized state information to syncfile")
        f = open(self.file_path, "wb")
        f.seek(0)
        f.write(b)
        f.truncate()
        f.close()
        logger.debug(f"Finished writing to file (length {len(b)})")

    def deserialize(self):
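        """Parse the binary state file and rebuild the in-memory bucket,
        directory map, and file object structures."""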
        if not self.file_exists():
            logger.error("Attempt to deserialize file that doesn't exist")
            exit(1)

        self.file_size = os.path.getsize(self.file_path)
        f = open(self.file_path, "rb")
        logger.debug(f"Deserializing file {self.file_path}")
        f.seek(0)

        def get_string():
            return b"".join(iter(lambda: f.read(1), b"\x00")).decode()

        b = f.read(4)
        if b != CONTROL_BYTES["SIGNATURE"]:
            logger.error(
                "File signature does not match expected s3state file signature (not an s3sync file format or file corrupted)"
            )
            exit(1)

        self.file_version = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
        if self.file_version == 0 or self.file_version > CURRENT_VERSION:
            logger.error(
                f"File version outside expected range (1..{CURRENT_VERSION}) (corrupt file)"
            )
            exit(1)
        logger.debug(f"File is version {self.file_version}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_BEGIN"]:
            logger.error("Expected metadata block begin byte not found (corrupt file)")
            exit(1)
        self.last_synced_time = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
        logger.debug(f"Last synced time reported as {self.last_synced_time}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_END"]:
            logger.error("Expected metadata block end byte not found (corrupt file)")
            exit(1)

        while b := f.read(1):
            if b != CONTROL_BYTES["BUCKET_BEGIN"]:
                logger.error("Unexpected control byte detected (corrupt file)")
                exit(1)
            bucket_name = get_string()
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

            logger.debug(f"Bucket {bucket_name}")

            while b2 := f.read(1):
                if b2 == CONTROL_BYTES["DIRECTORY_BEGIN"]:
                    local_path = get_string()
                    s3_prefix = get_string()
                    gz_compress = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
                    recursive = bool.from_bytes(f.read(1), byteorder=ENDIANNESS)
                    gpg_enabled = bool.from_bytes(f.read(1), byteorder=ENDIANNESS)
                    gpg_email = ""
                    if gpg_enabled:
                        gpg_email = get_string()
                    if f.read(1) != CONTROL_BYTES["DIRECTORY_END"]:
                        logger.error(
                            "Expected directory block end byte not found (corrupt file)"
                        )
                        exit(1)
                    bucket.create_dirmap(
                        local_path,
                        s3_prefix,
                        gz_compress,
                        recursive,
                        gpg_enabled,
                        gpg_email,
                    )
                    logger.debug(
                        f"Deserialized directory map {local_path} <=> s3://{bucket.bucket_name}/{s3_prefix}, recursive={recursive}, gzip={gz_compress}, gpg_enabled={gpg_enabled}, gpg_email={gpg_email}"
                    )

                elif b2 == CONTROL_BYTES["OBJECT_BEGIN"]:
                    key = get_string()
                    modified = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    etag_type = f.read(1)
                    etag = ""
                    if etag_type == CONTROL_BYTES["ETAG_MD5"]:
                        etag = f.read(16)
                    elif etag_type == CONTROL_BYTES["ETAG_OTHER"]:
                        etag = get_string()
                    file_size = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    if f.read(1) != CONTROL_BYTES["OBJECT_END"]:
                        logger.error(
                            "Expected fileobject block end byte not found (corrupt file)"
                        )
                        exit(1)
                    bucket.create_fileobject(key, modified, etag, file_size)
                    logger.debug(
                        f"({bucket_name}) Created fileobject {key} with ETAG {etag}, file size {file_size}, last modified {modified}"
                    )

                elif b2 == CONTROL_BYTES["BUCKET_END"]:
                    break

                else:
                    logger.error("Unexpected control byte detected (corrupt file)")
                    exit(1)

        f.close()