# s3-bsync Copyright (c) 2022 Joshua Stockin
# <https://joshstock.in>
# <https://git.joshstock.in/s3-bsync>
#
# This software is licensed and distributed under the terms of the MIT License.
# See the MIT License in the LICENSE file of this project's root folder.
#
# This comment block and its contents, including this disclaimer, MUST be
# preserved in all copies or distributions of this software's source.

import os
import time
import re
import logging

from .classes import *

logger = logging.getLogger(__name__)

__all__ = ["syncfile"]


CONTROL_BYTES = {
    "SIGNATURE": b"\x9D\x9F\x53\x33",
    "BUCKET_BEGIN": b"\x90",
    "BUCKET_END": b"\x91",
    "DIRECTORY_BEGIN": b"\x92",
    "DIRECTORY_END": b"\x93",
    "OBJECT_BEGIN": b"\x94",
    "OBJECT_END": b"\x95",
    "ETAG_MD5": b"\x96",
    "ETAG_OTHER": b"\x97",
    "METADATA_BEGIN": b"\x9A",
    "METADATA_END": b"\x9B",
}

CURRENT_VERSION = 1
ENDIANNESS = "little"

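# On-disk syncfile layout (descriptive summary derived from serialize() and
# deserialize() below; all multi-byte integers are little-endian):
#
#   SIGNATURE          4 bytes   9D 9F 53 33
#   version            1 byte    1..CURRENT_VERSION
#   METADATA_BEGIN     1 byte
#   last synced time   8 bytes   Unix time in milliseconds
#   METADATA_END       1 byte
#   then, per managed bucket that has at least one directory map:
#     BUCKET_BEGIN, bucket name (NUL-terminated)
#     per directory map: DIRECTORY_BEGIN, local path\0, s3 prefix\0,
#       gz_compress (1), recursive (1), gpg_enabled (1),
#       gpg email\0 (only if gpg_enabled), DIRECTORY_END
#     per file object: OBJECT_BEGIN, key\0, modified (8),
#       ETAG_MD5 + 16 raw bytes or ETAG_OTHER + etag\0, size (8), OBJECT_END
#     BUCKET_END
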
class syncfile:
    file_path = None
    file_version = 0
    file_size = 0
    last_synced_time = 0
    managed_buckets = []

    def __init__(self, state_file: str):
        self.file_path = state_file
        # Give this instance its own list rather than relying on the shared,
        # mutable class-level default above.
        self.managed_buckets = []

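    # Illustrative usage sketch (added commentary; the real call sites live in
    # other modules of this package, and the paths below are hypothetical):
    #
    #   sf = syncfile("/home/user/.s3state")
    #   sf.map_directory("/home/user/data", "s3://example-bucket/backups/data")
    #   sf.serialize()      # write the tracked state to disk
    #   sf.deserialize()    # later, load the tracked state back
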
    def map_directory(self, local_path, s3_path):
        # Verify local path validity
        if not os.path.isdir(local_path):
            logger.error(
                f'User supplied local directory ("{local_path}") is not a directory'
            )
            exit(1)

        # Check that the supplied S3 path is valid
        s3match = re.match(r"^s3://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])/(.*)$", s3_path)
        if not s3match or len(s3match.groups()) != 2:
            logger.error(f'User supplied invalid S3 path ("{s3_path}")')
            exit(1)
        bucket_name = s3match.group(1)
        s3_prefix = s3match.group(2)
        if s3_prefix.endswith("/") or len(s3_prefix) == 0:
            logger.error(f'User supplied invalid S3 path prefix ("{s3_prefix}")')
            exit(1)

        logger.debug(
            f'Local directory "{local_path}" mapped to bucket "{bucket_name}" at path prefix "{s3_prefix}"'
        )

        bucket = next(
            (b for b in self.managed_buckets if b.bucket_name == bucket_name), False
        )

        if not bucket:
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

        dirmap_exists = next(
            (
                True
                for d in bucket.directory_maps
                if d.local_path == local_path and d.s3_prefix == s3_prefix
            ),
            False,
        )

        if dirmap_exists:
            logger.error(f"Directory map {local_path} <=> {s3_path} already exists")
            exit(1)

        logger.debug(f"Creating directory map {local_path} <=> {s3_path}")
        bucket.create_dirmap(local_path, s3_prefix)

    def remove_dirmap(self, local_path, s3_path):
        # Check that the supplied S3 path is valid
        s3match = re.match(r"^s3://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])/(.*)$", s3_path)
        if not s3match or len(s3match.groups()) != 2:
            logger.error(f'User supplied invalid S3 path ("{s3_path}")')
            exit(1)
        bucket_name = s3match.group(1)
        s3_prefix = s3match.group(2)
        if s3_prefix.endswith("/") or len(s3_prefix) == 0:
            logger.error(f'User supplied invalid S3 path prefix ("{s3_prefix}")')
            exit(1)

        bucket = next(
            (
                bucket
                for bucket in self.managed_buckets
                if bucket.bucket_name == bucket_name
            ),
            None,
        )
        if not bucket:
            logger.error(f"Bucket s3://{bucket_name} is not tracked by the sync file")
            exit(1)

        dirmaps = [
            x
            for x in bucket.directory_maps
            if x.local_path == local_path and x.s3_prefix == s3_prefix
        ]
        if len(dirmaps) > 0:
            for dirmap in dirmaps:
                logger.debug(f"Deleting directory map {local_path} <=> {s3_path}")
                bucket.directory_maps.remove(dirmap)
        else:
            logger.error(f"Directory map {local_path} <=> {s3_path} does not exist")
            exit(1)

    def file_exists(self):
        return os.path.exists(self.file_path) and not os.path.isdir(self.file_path)

    def purge(self):
        if self.file_exists():
            if self.verify_file():
                os.remove(self.file_path)
            else:
                logger.error("Attempt to purge (delete) a non-s3sync file")
                exit(1)
        else:
            logger.error("State file nonexistent")
            exit(1)

    def verify_file(self):
        if not self.file_exists():
            return False
        f = open(self.file_path, "rb")
        f.seek(0)
        b = f.read(4)
        f.close()
        return b == CONTROL_BYTES["SIGNATURE"]

    def serialize(self):
        logger.debug("Compiling bytearray")

        b = bytearray()

        b += CONTROL_BYTES["SIGNATURE"]
        b += CURRENT_VERSION.to_bytes(1, byteorder=ENDIANNESS)

        b += CONTROL_BYTES["METADATA_BEGIN"]
        current_time = time.time_ns() // 1000000
        b += current_time.to_bytes(8, byteorder=ENDIANNESS)
        b += CONTROL_BYTES["METADATA_END"]

        for bucket in self.managed_buckets:
            # Don't serialize any buckets with no dirmaps
            if len(bucket.directory_maps) == 0:
                continue

            b += CONTROL_BYTES["BUCKET_BEGIN"]
            b += bucket.bucket_name.encode() + b"\x00"

            logger.debug(f"Bucket {bucket.bucket_name}")

            for dirmap in bucket.directory_maps:
                b += CONTROL_BYTES["DIRECTORY_BEGIN"]
                b += dirmap.local_path.encode() + b"\x00"
                b += dirmap.s3_prefix.encode() + b"\x00"
                b += dirmap.gz_compress.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.recursive.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.gpg_enabled.to_bytes(1, byteorder=ENDIANNESS)
                if dirmap.gpg_enabled:
                    b += dirmap.gpg_email.encode() + b"\x00"
                b += CONTROL_BYTES["DIRECTORY_END"]
                logger.debug(
                    f"Serialized directory map {dirmap.local_path} <=> s3://{bucket.bucket_name}/{dirmap.s3_prefix}, recursive={dirmap.recursive}, gzip={dirmap.gz_compress}, gpg_enabled={dirmap.gpg_enabled}, gpg_email={dirmap.gpg_email}"
                )

            for fileobject in bucket.fileobjects:
                b += CONTROL_BYTES["OBJECT_BEGIN"]
                b += fileobject.key.encode() + b"\x00"
                b += fileobject.modified.to_bytes(8, byteorder=ENDIANNESS)
                # A 32-character hex string is an MD5 ETag; it is stored as 16 raw
                # bytes, matching the 16-byte read in deserialize() below.
                if len(fileobject.etag) == 32:
                    b += CONTROL_BYTES["ETAG_MD5"]
                    b += bytes.fromhex(fileobject.etag)
                else:
                    b += CONTROL_BYTES["ETAG_OTHER"]
                    b += fileobject.etag.encode() + b"\x00"
                b += fileobject.size.to_bytes(8, byteorder=ENDIANNESS)
                b += CONTROL_BYTES["OBJECT_END"]

            b += CONTROL_BYTES["BUCKET_END"]

        logger.debug("Writing serialized state information to syncfile")
        f = open(self.file_path, "wb")
        f.seek(0)
        f.write(b)
        f.truncate()
        f.close()
        logger.debug(f"Finished writing to file (length {len(b)})")

    def deserialize(self):
        if not self.file_exists():
            logger.error("Attempt to deserialize file that doesn't exist")
            exit(1)

        self.file_size = os.path.getsize(self.file_path)
        f = open(self.file_path, "rb")
        logger.debug(f"Deserializing file {f}")
        f.seek(0)

        def get_string():
            return b"".join(iter(lambda: f.read(1), b"\x00")).decode()

        b = f.read(4)
        if b != CONTROL_BYTES["SIGNATURE"]:
            logger.error(
                "File signature does not match expected s3state file signature (not an s3sync file format or file corrupted)"
            )
            exit(1)

        self.file_version = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
        if self.file_version == 0 or self.file_version > CURRENT_VERSION:
            logger.error(
                f"File version outside expected range (1..{CURRENT_VERSION}) (corrupt file)"
            )
            exit(1)
        logger.debug(f"File is version {self.file_version}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_BEGIN"]:
            logger.error("Expected metadata block begin byte not found (corrupt file)")
            exit(1)
        self.last_synced_time = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
        logger.debug(f"Last synced time reported as {self.last_synced_time}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_END"]:
            logger.error("Expected metadata block end byte not found (corrupt file)")
            exit(1)

        while b := f.read(1):
            if b != CONTROL_BYTES["BUCKET_BEGIN"]:
                logger.error("Unexpected control byte detected (corrupt file)")
                exit(1)
            bucket_name = get_string()
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets.append(bucket)

            logger.debug(f"Bucket {bucket_name}")

            while b2 := f.read(1):
                if b2 == CONTROL_BYTES["DIRECTORY_BEGIN"]:
                    local_path = get_string()
                    s3_prefix = get_string()
                    gz_compress = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
                    recursive = bool.from_bytes(f.read(1), byteorder=ENDIANNESS)
                    gpg_enabled = bool.from_bytes(f.read(1), byteorder=ENDIANNESS)
                    gpg_email = ""
                    if gpg_enabled:
                        gpg_email = get_string()
                    if f.read(1) != CONTROL_BYTES["DIRECTORY_END"]:
                        logger.error(
                            "Expected directory block end byte not found (corrupt file)"
                        )
                        exit(1)
                    bucket.create_dirmap(
                        local_path,
                        s3_prefix,
                        gz_compress,
                        recursive,
                        gpg_enabled,
                        gpg_email,
                    )
                    logger.debug(
                        f"Deserialized directory map {local_path} <=> s3://{bucket.bucket_name}/{s3_prefix}, recursive={recursive}, gzip={gz_compress}, gpg_enabled={gpg_enabled}, gpg_email={gpg_email}"
                    )

                elif b2 == CONTROL_BYTES["OBJECT_BEGIN"]:
                    key = get_string()
                    modified = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    etag_type = f.read(1)
                    etag = ""
                    if etag_type == CONTROL_BYTES["ETAG_MD5"]:
                        # MD5 ETags are stored as 16 raw bytes; convert back to the
                        # 32-character hex string used during serialization.
                        etag = f.read(16).hex()
                    elif etag_type == CONTROL_BYTES["ETAG_OTHER"]:
                        etag = get_string()
                    file_size = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
                    if f.read(1) != CONTROL_BYTES["OBJECT_END"]:
                        logger.error(
                            "Expected fileobject block end byte not found (corrupt file)"
                        )
                        exit(1)
                    bucket.create_fileobject(key, modified, etag, file_size)
                    logger.debug(
                        f"({bucket_name}) Created fileobject {key} with ETAG {etag}, file size {file_size}, last modified {modified}"
                    )

                elif b2 == CONTROL_BYTES["BUCKET_END"]:
                    break

                else:
                    logger.error("Unexpected control byte detected (corrupt file)")
                    exit(1)

        f.close()