
s3-bsync / 0fae871

Bidirectional syncing tool for local filesystem directories and S3 buckets. (Incomplete)

Latest Commit

#    Time               Hash     Subject                          Author        Files  (+)  (-)  GPG?
13   16 Jun 2022 21:05  0fae871  Update serialized data handlers  Josh Stockin  1      91   20   G

Blob @ s3-bsync / src / syncfile.py

application/x-python, 5659 bytes
# s3-bsync Copyright (c) 2022 Joshua Stockin
# <https://joshstock.in>
# <https://git.joshstock.in/s3-bsync>
#
# This software is licensed and distributed under the terms of the MIT License.
# See the MIT License in the LICENSE file of this project's root folder.
#
# This comment block and its contents, including this disclaimer, MUST be
# preserved in all copies or distributions of this software's source.

import os
import time
import logging

from .classes import *

logger = logging.getLogger(__name__)

__all__ = ["syncfile"]

CONTROL_BYTES = {
    "SIGNATURE": b"\x9D\x9F\x53\x33",
    "BUCKET_BEGIN": b"\x90",
    "BUCKET_END": b"\x91",
    "DIRECTORY_BEGIN": b"\x92",
    "DIRECTORY_END": b"\x93",
    "OBJECT_BEGIN": b"\x94",
    "OBJECT_END": b"\x95",
    "ETAG_MD5": b"\x96",
    "ETAG_OTHER": b"\x97",
    "METADATA_BEGIN": b"\x9A",
    "METADATA_END": b"\x9B",
}

CURRENT_VERSION = 1
ENDIANNESS = "little"

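# Rough on-disk layout produced by syncfile.serialize below (reconstructed
# from the serializer itself; all integers are little-endian):
#
#   SIGNATURE (4 bytes) | file version (1 byte)
#   METADATA_BEGIN | last synced time, ms since epoch (8 bytes) | METADATA_END
#   for each bucket:
#     BUCKET_BEGIN | bucket name (NUL-terminated)
#     for each directory map:
#       DIRECTORY_BEGIN | local path | S3 prefix (both NUL-terminated)
#         | gz_compress (1) | recursive (1) | gpg_enabled (1)
#         | [gpg email, NUL-terminated, only if gpg_enabled] | DIRECTORY_END
#     for each file object:
#       OBJECT_BEGIN | key (NUL-terminated) | mtime (8 bytes)
#         | ETAG_MD5 + 16 raw bytes, or ETAG_OTHER + NUL-terminated string
#         | size (8 bytes) | OBJECT_END
#     BUCKET_END
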
class syncfile:
    file_path = None
    file_version = 0
    last_synced_time = 0

    def __init__(self, state_file: str):
        self.file_path = state_file
        # Give each instance its own dict; a class-level {} would be shared
        # across all syncfile objects.
        self.managed_buckets = {}

    def file_exists(self):
        return os.path.exists(self.file_path) and not os.path.isdir(self.file_path)

    def purge(self):
        if self.file_exists():
            if self.verify_file():
                os.remove(self.file_path)
            else:
                logger.error("Attempt to purge (delete) a non-s3sync file")
                exit(1)
        else:
            logger.debug("File already nonexistent")

    def verify_file(self):
        if not self.file_exists():
            return False
        with open(self.file_path, "rb") as f:
            b = f.read(4)
        return b == CONTROL_BYTES["SIGNATURE"]

    def serialize(self):
        logger.debug("Compiling bytearray")

        b = bytearray()

        b += CONTROL_BYTES["SIGNATURE"]
        b += CURRENT_VERSION.to_bytes(1, byteorder=ENDIANNESS)

        b += CONTROL_BYTES["METADATA_BEGIN"]
        current_time = time.time_ns() // 1000000  # milliseconds since epoch
        b += current_time.to_bytes(8, byteorder=ENDIANNESS)
        b += CONTROL_BYTES["METADATA_END"]

        for bucket in self.managed_buckets.values():
            b += CONTROL_BYTES["BUCKET_BEGIN"]
            # NUL-terminate the bucket name so get_string() in deserialize
            # can recover it.
            b += bucket.bucket_name.encode() + b"\x00"

            for dirmap in bucket.directory_maps:
                b += CONTROL_BYTES["DIRECTORY_BEGIN"]
                b += dirmap.local_path.encode() + b"\x00"
                b += dirmap.s3_prefix.encode() + b"\x00"
                b += dirmap.gz_compress.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.recursive.to_bytes(1, byteorder=ENDIANNESS)
                b += dirmap.gpg_enabled.to_bytes(1, byteorder=ENDIANNESS)
                if dirmap.gpg_enabled:
                    b += dirmap.gpg_email.encode() + b"\x00"
                b += CONTROL_BYTES["DIRECTORY_END"]

            for fileobject in bucket.fileobjects:
                b += CONTROL_BYTES["OBJECT_BEGIN"]
                b += fileobject.key.encode() + b"\x00"
                b += fileobject.modified.to_bytes(8, byteorder=ENDIANNESS)
                if len(fileobject.etag) == 32:
                    # A plain (single-part) S3 ETag is an MD5 digest: 32 hex
                    # characters, packed here into 16 raw bytes.
                    b += CONTROL_BYTES["ETAG_MD5"]
                    b += bytes.fromhex(fileobject.etag)
                else:
                    b += CONTROL_BYTES["ETAG_OTHER"]
                    b += fileobject.etag.encode() + b"\x00"
                b += fileobject.size.to_bytes(8, byteorder=ENDIANNESS)
                b += CONTROL_BYTES["OBJECT_END"]

            b += CONTROL_BYTES["BUCKET_END"]

        logger.debug("Writing serialized state information to syncfile")
        # Mode "wb" truncates on open, so no explicit seek/truncate is needed.
        with open(self.file_path, "wb") as f:
            f.write(b)

    def deserialize(self):
        if not self.file_exists():
            logger.error("Attempt to deserialize file that doesn't exist")
            exit(1)

        f = open(self.file_path, "rb")
        logger.debug(f"Deserializing file {self.file_path}")

        def get_string():
            # Read single bytes up to (and consuming) the NUL terminator.
            return b"".join(iter(lambda: f.read(1), b"\x00")).decode()

        b = f.read(4)
        if b != CONTROL_BYTES["SIGNATURE"]:
            logger.error(
                "File signature does not match expected s3state file signature (not an s3sync file format or file corrupted)"
            )
            exit(1)

        self.file_version = int.from_bytes(f.read(1), byteorder=ENDIANNESS)
        if self.file_version == 0 or self.file_version > CURRENT_VERSION:
            logger.error(
                f"File version outside expected range (1..{CURRENT_VERSION}) (corrupt file)"
            )
            exit(1)
        logger.debug(f"File is version {self.file_version}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_BEGIN"]:
            logger.error("Expected metadata block begin byte not found (corrupt file)")
            exit(1)
        self.last_synced_time = int.from_bytes(f.read(8), byteorder=ENDIANNESS)
        logger.debug(f"Last synced time reported as {self.last_synced_time}")

        b = f.read(1)
        if b != CONTROL_BYTES["METADATA_END"]:
            logger.error("Expected metadata block end byte not found (corrupt file)")
            exit(1)

        while b := f.read(1):
            if b != CONTROL_BYTES["BUCKET_BEGIN"]:
                logger.error("Unexpected control byte detected (corrupt file)")
                exit(1)
            bucket_name = get_string()
            # sync_managed_bucket is expected to come from the star import
            # of .classes above.
            bucket = sync_managed_bucket(bucket_name)
            self.managed_buckets[bucket_name] = bucket
            # Parsing of the bucket body (directory maps and file objects)
            # is not yet implemented in this revision.

        f.close()
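
A minimal usage sketch of the syncfile class above, for illustration only. It assumes the module is importable as s3_bsync.syncfile (the actual package layout may differ), and the state-file path is arbitrary:

import os

from s3_bsync.syncfile import syncfile

# Path of the binary state file; arbitrary for this example.
state = syncfile(os.path.expanduser("~/.s3-bsync.state"))

if state.verify_file():
    # Existing state file with a valid signature: load it.
    state.deserialize()
    print(f"Last synced at {state.last_synced_time} ms since epoch")
else:
    # No valid state yet: write a fresh, empty state file.
    state.serialize()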