
s3-bsync / 2414eba

Bidirectional tool that syncs local filesystem directories with S3 buckets. (Incomplete)

Latest Commit

#    Time               Hash     Subject                                 Author        Files  (+)  (-)  GPG?
14   19 Jun 2022 18:22  2414eba  Deserialization and directory mapping   Josh Stockin  1      117  5    G

Blob @ s3-bsync / src / syncfile.py

application/x-python · 10261 bytes
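
This module defines the syncfile class, which tracks managed buckets and local-to-S3 directory mappings and serializes that state to and from s3-bsync's binary state file format.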
# s3-bsync Copyright (c) 2022 Joshua Stockin
# <https://joshstock.in>
# <https://git.joshstock.in/s3-bsync>
#
# This software is licensed and distributed under the terms of the MIT License.
# See the MIT License in the LICENSE file of this project's root folder.
#
# This comment block and its contents, including this disclaimer, MUST be
# preserved in all copies or distributions of this software's source.

import os
import time
import re
import logging

from .classes import *

logger = logging.getLogger(__name__)

__all__ = ["syncfile"]


CONTROL_BYTES = {
    "SIGNATURE": b"\x9D\x9F\x53\x33",
    "BUCKET_BEGIN": b"\x90",
    "BUCKET_END": b"\x91",
    "DIRECTORY_BEGIN": b"\x92",
    "DIRECTORY_END": b"\x93",
    "OBJECT_BEGIN": b"\x94",
    "OBJECT_END": b"\x95",
    "ETAG_MD5": b"\x96",
    "ETAG_OTHER": b"\x97",
    "METADATA_BEGIN": b"\x9A",
    "METADATA_END": b"\x9B",
}

CURRENT_VERSION = 1
ENDIANNESS = "little"


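# On-disk layout written by serialize() and read by deserialize() (version 1):
#   SIGNATURE (4 bytes), file version (1 byte)
#   METADATA_BEGIN, last-synced time in ms since epoch (8 bytes, little-endian),
#   METADATA_END
#   For each bucket: BUCKET_BEGIN, bucket name (NUL-terminated)
#     For each directory map: DIRECTORY_BEGIN, local path (NUL-terminated),
#       s3 prefix (NUL-terminated), gz_compress (1 byte), recursive (1 byte),
#       gpg_enabled (1 byte), gpg email (NUL-terminated, only if gpg_enabled),
#       DIRECTORY_END
#     For each file object: OBJECT_BEGIN, key (NUL-terminated), modified (8 bytes),
#       ETAG_MD5 + 16 raw digest bytes or ETAG_OTHER + NUL-terminated string,
#       size (8 bytes), OBJECT_END
#   BUCKET_END
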
class syncfile:
    def __init__(self, state_file: str):
        self.file_path = state_file
        self.file_version = 0
        self.file_size = 0
        self.last_synced_time = 0
        self.managed_buckets = []

    def map_directory(self, local_path, s3_path):
        # Verify local path validity
        if not os.path.isdir(local_path):
            logger.error(
                f'User supplied local directory ("{local_path}") is not a directory'
            )
            exit(1)

        # Check S3 path supplied is valid
        s3match = re.match(r"^s3://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])/(.*)$", s3_path)
        if not s3match or len(s3match.groups()) != 2:
            logger.error(f'User supplied invalid S3 path ("{s3_path}")')
            exit(1)
        bucket_name = s3match.group(1)
        s3_prefix = s3match.group(2)
        if s3_prefix.endswith("/") or len(s3_prefix) == 0:
            logger.error(f'User supplied invalid S3 path prefix ("{s3_prefix}")')
            exit(1)

        logger.debug(
            f'Local directory "{local_path}" mapped to bucket "{bucket_name}" at path prefix "{s3_prefix}"'
        )

        bucket = next(
            (
                bucket
                for bucket in self.managed_buckets
                if bucket.bucket_name == bucket_name
            ),
            None,
        )
        if not bucket:
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

        dirmap = f"{local_path}:{s3_prefix}"
        dirmap_exists = next(
            (
                True
                for x in bucket.directory_maps
                if f"{x.local_path}:{x.s3_prefix}" == dirmap
            ),
            None,
        )
        if dirmap_exists:
            logger.error(f"Directory mapping {local_path}:{s3_prefix} already exists")
            exit(1)
        bucket.create_dirmap(local_path, s3_prefix)

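    # Illustrative calls (hypothetical paths), per the validation above:
    #   map_directory("/home/user/docs", "s3://mybucket/backups/docs")  # accepted
    #   map_directory("/home/user/docs", "s3://mybucket/docs/")         # rejected (trailing slash)
    #   map_directory("/home/user/docs", "s3://mybucket/")              # rejected (empty prefix)
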
    def file_exists(self):
        return os.path.exists(self.file_path) and not os.path.isdir(self.file_path)

    def purge(self):
        if self.file_exists():
            if self.verify_file():
                os.remove(self.file_path)
            else:
                logger.error("Attempt to purge (delete) a non-s3sync file")
                exit(1)
        else:
            logger.error("State file nonexistent")
            exit(1)

    def verify_file(self):
        if not self.file_exists():
            return False
        with open(self.file_path, "rb") as f:
            b = f.read(4)
        return b == CONTROL_BYTES["SIGNATURE"]

    def serialize(self):
        logger.debug("Compiling bytearray")

        b = bytearray()

        b += CONTROL_BYTES["SIGNATURE"]
        b += CURRENT_VERSION.to_bytes(1, byteorder=ENDIANNESS)

        b += CONTROL_BYTES["METADATA_BEGIN"]
        current_time = time.time_ns() // 1000000
        b += current_time.to_bytes(8, byteorder=ENDIANNESS)
        b += CONTROL_BYTES["METADATA_END"]

        for bucket in self.managed_buckets:
            b += CONTROL_BYTES["BUCKET_BEGIN"]
            b += bucket.bucket_name.encode() + b"\x00"

            logger.debug(f"Bucket {bucket.bucket_name}")

            for dirmap in bucket.directory_maps:
                b += CONTROL_BYTES["DIRECTORY_BEGIN"]
                b += dirmap.local_path.encode() + b"\x00"
                b += dirmap.s3_prefix.encode() + b"\x00"
                b += dirmap.gz_compress.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.recursive.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.gpg_enabled.to_bytes(1, byteorder=ENDIANNESS)
                if dirmap.gpg_enabled:
                    b += dirmap.gpg_email.encode() + b"\x00"
                b += CONTROL_BYTES["DIRECTORY_END"]

            for fileobject in bucket.fileobjects:
                b += CONTROL_BYTES["OBJECT_BEGIN"]
                b += fileobject.key.encode() + b"\x00"
                b += fileobject.modified.to_bytes(8, byteorder=ENDIANNESS)
                if len(fileobject.etag) == 32:  # MD5 ETags are 32 hex characters
                    b += CONTROL_BYTES["ETAG_MD5"]
                    b += bytes.fromhex(fileobject.etag)
                else:
                    b += CONTROL_BYTES["ETAG_OTHER"]
                    b += fileobject.etag.encode() + b"\x00"
                b += fileobject.size.to_bytes(8, byteorder=ENDIANNESS)
                b += CONTROL_BYTES["OBJECT_END"]

            b += CONTROL_BYTES["BUCKET_END"]

        logger.debug("Writing serialized state information to syncfile")
        with open(self.file_path, "wb") as f:
            f.write(b)
        logger.debug(f"Finished writing to file (length {len(b)})")

    def deserialize(self):
        if not self.file_exists():
            logger.error("Attempt to deserialize file that doesn't exist")
            exit(1)

        self.file_size = os.path.getsize(self.file_path)
        f = open(self.file_path, "rb")
        logger.debug(f"Deserializing file {self.file_path}")
        f.seek(0)

        def get_string():
            return b"".join(iter(lambda: f.read(1), b"\x00")).decode()

        b = f.read(4)
        if b != CONTROL_BYTES["SIGNATURE"]:
            logger.error(
                "File signature does not match expected s3state file signature (not an s3sync file format or file corrupted)"
            )
            exit(1)

        self.file_version = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
        if self.file_version == 0 or self.file_version > CURRENT_VERSION:
            logger.error(
                f"File version outside expected range (1..{CURRENT_VERSION}) (corrupt file)"
            )
            exit(1)
        logger.debug(f"File is version {self.file_version}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_BEGIN"]:
            logger.error("Expected metadata block begin byte not found (corrupt file)")
            exit(1)
        self.last_synced_time = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
        logger.debug(f"Last synced time reported as {self.last_synced_time}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_END"]:
            logger.error("Expected metadata block end byte not found (corrupt file)")
            exit(1)

        while b := f.read(1):
            if b != CONTROL_BYTES["BUCKET_BEGIN"]:
                logger.error("Unexpected control byte detected (corrupt file)")
                exit(1)
            bucket_name = get_string()
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

            logger.debug(f"Bucket {bucket_name}")

            while b2 := f.read(1):
                if b2 == CONTROL_BYTES["DIRECTORY_BEGIN"]:
                    local_path = get_string()
                    s3_prefix = get_string()
                    gz_compress = bool(int.from_bytes(f.read(1), byteorder=ENDIANNESS))
                    recursive = bool(int.from_bytes(f.read(1), byteorder=ENDIANNESS))
                    gpg_enabled = bool(int.from_bytes(f.read(1), byteorder=ENDIANNESS))
                    gpg_email = ""
                    if gpg_enabled:
                        gpg_email = get_string()
                    if f.read(1) != CONTROL_BYTES["DIRECTORY_END"]:
                        logger.error(
                            "Expected directory block end byte not found (corrupt file)"
                        )
                        exit(1)
                    bucket.create_dirmap(
                        local_path,
                        s3_prefix,
                        gz_compress,
                        recursive,
                        gpg_enabled,
                        gpg_email,
                    )
                    logger.debug(
                        f"Created directory map {local_path}:{s3_prefix}, recursive={recursive}, gzip={gz_compress}, gpg_enabled={gpg_enabled}, gpg_email={gpg_email}"
                    )

                elif b2 == CONTROL_BYTES["OBJECT_BEGIN"]:
                    key = get_string()
                    modified = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    etag_type = f.read(1)
                    etag = ""
                    if etag_type == CONTROL_BYTES["ETAG_MD5"]:
                        etag = f.read(16).hex()  # stored as 16 raw digest bytes
                    elif etag_type == CONTROL_BYTES["ETAG_OTHER"]:
                        etag = get_string()
                    file_size = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    if f.read(1) != CONTROL_BYTES["OBJECT_END"]:
                        logger.error(
                            "Expected fileobject block end byte not found (corrupt file)"
                        )
                        exit(1)
                    bucket.create_fileobject(key, modified, etag, file_size)
                    logger.debug(
                        f"Created fileobject {key} with ETAG {etag}, file size {file_size}, last modified {modified}"
                    )

                elif b2 == CONTROL_BYTES["BUCKET_END"]:
                    break

                else:
                    logger.error("Unexpected control byte detected (corrupt file)")
                    exit(1)

        f.close()

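
A minimal round-trip sketch (not part of the file above; the s3_bsync import path, local paths, and bucket name are assumptions for illustration):

# demo.py — hypothetical round trip through serialize()/deserialize()
from s3_bsync.syncfile import syncfile  # assumed package layout

sf = syncfile("/tmp/test.s3sync")       # hypothetical state file path
sf.map_directory("/tmp/data", "s3://example-bucket/backups/data")  # /tmp/data must exist
sf.serialize()                          # writes the binary state file

sf2 = syncfile("/tmp/test.s3sync")
sf2.deserialize()                       # reads bucket and dirmap state back
assert sf2.managed_buckets[0].bucket_name == "example-bucket"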