#!/usr/bin/python3
"""Pack the regular files of a directory tree into an encrypted archive.

Two files are written into the output directory:

* ``archive``  -- per-file metadata followed by the encrypted, block-shuffled
  file contents.
* ``keystore`` -- for every file: its UUID, the (byte-reversed) IV and key,
  and the block permutation needed to undo the shuffle.

All integers and floats are serialized with ``struct`` format strings that
carry no byte-order prefix, i.e. in the native byte order and size of the
machine running this script.
"""
import os
import uuid
import zlib
import struct
import random
import pathlib
import argparse

import cryptography.hazmat.backends as backends
import cryptography.hazmat.primitives.ciphers as ciphers
# Imported explicitly so that using these submodules does not depend on some
# other module having loaded them as a side effect.
from cryptography.hazmat.primitives.ciphers import algorithms, modes


class Encryptor():
    """AES-CBC encryptor using a freshly generated random 16-byte key and IV."""

    def __init__(self):
        default_backend = backends.default_backend()
        self.iv = os.urandom(16)
        self.key = os.urandom(16)
        self.cipher = ciphers.Cipher(
            algorithms.AES(self.key),
            modes.CBC(self.iv),
            backend=default_backend,
        )

    def __str__(self):
        # NOTE(review): this renders the raw IV and key as hex.
        return f'[{self.iv.hex()}|{self.key.hex()}]'

    def encrypt(self, text: bytes) -> bytes:
        """Encrypt *text* and return the ciphertext.

        No padding is applied here, so *text* is expected to already be a
        multiple of the cipher block size (callers use :func:`pad`).
        """
        encryptor = self.cipher.encryptor()
        return encryptor.update(text) + encryptor.finalize()

    def get_encoded_cipherinfo(self) -> bytes:
        """Return ``iv + key`` with the byte order reversed (32 bytes)."""
        return (self.iv + self.key)[::-1]


def pad(b: bytes, i: int) -> bytes:
    """Pad *b* to a multiple of *i* bytes.

    ``n`` bytes of value ``n`` are appended, where ``n`` is the distance to
    the next multiple of *i*. Input that is already aligned receives a full
    extra block of *i* bytes, so at least one byte is always added.

    *i* must not exceed 255, because the pad length is stored in a single
    byte; larger values make ``bytes()`` raise ``ValueError``.
    """
    n = i - (len(b) % i)
    return b + bytes([n]) * n


class EncryptedArchiveEntry():
    """An :class:`ArchiveEntry` together with its cipher and block mapping."""

    # Alignment required by the cipher, and size of the chunks that get
    # shuffled after encryption.
    cipher_block_size = 16
    shuffle_block_size = 128

    def __init__(self, archive_entry: 'ArchiveEntry'):
        self.archive_entry = archive_entry
        self.encryptor = Encryptor()
        # Output block index -> index of the encrypted block stored there.
        # Filled in by encrypt().
        self.mapping = {}

    def __str__(self):
        return f'[{self.archive_entry.get_uuid}|{self.encryptor}]'

    def serialize_key(self) -> bytes:
        """Serialize everything needed to decrypt this entry.

        Layout: 16-byte UUID, 32 bytes of encoded cipher info, the number of
        mapping entries (``I``), then one ``2I`` pair per mapping entry.

        Raises:
            RuntimeError: if :meth:`encrypt` has not populated the mapping.
        """
        if not self.mapping:
            raise RuntimeError('Serializing key before data...')
        parts = [
            self.archive_entry.get_uuid.bytes,
            self.encryptor.get_encoded_cipherinfo(),
            struct.pack('I', len(self.mapping)),
        ]
        parts.extend(
            struct.pack('2I', leftside, rightside)
            for (leftside, rightside) in self.mapping.items()
        )
        return b''.join(parts)

    def encrypt(self) -> bytes:
        """Compress, encrypt and block-shuffle the entry's file contents.

        The file is zlib-compressed, padded to the cipher block size,
        encrypted, padded again to the shuffle block size, and finally its
        blocks are emitted in a random order. The permutation is recorded in
        ``self.mapping`` so that :meth:`serialize_key` can store it.
        """
        contents = self.archive_entry.f.read_bytes()
        compressed = pad(zlib.compress(contents), self.cipher_block_size)
        size = self.shuffle_block_size
        encrypted = pad(self.encryptor.encrypt(compressed), size)
        blocks = len(encrypted) // size

        new_block_positions = list(range(blocks))
        # The permutation is secret key material (it lives in the keystore),
        # so draw it from the OS entropy source instead of the predictable
        # default Mersenne Twister generator.
        random.SystemRandom().shuffle(new_block_positions)

        # Rebuild the mapping from scratch so a repeated call never leaves
        # stale entries from a previous, longer run behind.
        self.mapping = dict(enumerate(new_block_positions))

        # Output block r holds encrypted block mapping[r].
        return b''.join(
            encrypted[source * size:(source + 1) * size]
            for source in new_block_positions
        )


class ArchiveEntry():
    """A file scheduled for archiving, with a fresh UUID and its stat result."""

    def __init__(self, file: pathlib.Path):
        self.file = file
        self.uuid = uuid.uuid4()
        # Captured once at construction time; later changes to the file are
        # not reflected in the metadata.
        self.stat = file.stat()

    def __str__(self):
        return f'[{self.uuid}|{self.file}]'

    @property
    def get_uuid(self):
        return self.uuid

    @property
    def f(self):
        return self.file

    @property
    def tostr(self):
        return str(self.file)

    @property
    def size(self):
        return self.stat.st_size

    @property
    def mode(self):
        return self.stat.st_mode

    @property
    def st_atime(self):
        return self.stat.st_atime

    @property
    def st_mtime(self):
        return self.stat.st_mtime

    @property
    def st_ctime(self):
        return self.stat.st_ctime

    def get_metadata(self) -> bytes:
        """Serialize the entry's metadata record.

        Layout: length of the NUL-terminated file name (``I``), the file name
        itself, 16-byte UUID, size and mode (``2I``), then access,
        modification and change times (``3d``).
        """
        # os.fsencode() yields the file name as the file system encoding
        # represents it, so names that are not valid UTF-8 no longer abort
        # the run with UnicodeEncodeError.
        filename = os.fsencode(self.tostr) + b'\x00'
        return b''.join((
            struct.pack('I', len(filename)),
            filename,
            self.get_uuid.bytes,
            struct.pack('2I', self.size, self.mode),
            struct.pack('3d', self.st_atime, self.st_mtime, self.st_ctime),
        ))


class KeyStore():
    """Collects the per-file key material and writes the ``keystore`` file."""

    header = b'L0LKSTR\x00'

    def __init__(self):
        self.entries = []

    def get_encrypted_archive_entry(self, archive_entry: 'ArchiveEntry'):
        """Create, register and return the encrypted wrapper for an entry."""
        encrypted_archive_entry = EncryptedArchiveEntry(archive_entry)
        # NOTE(review): the string form includes the IV and key in hex, so
        # this writes key material to stdout -- confirm that is intended.
        print(encrypted_archive_entry)
        self.entries.append(encrypted_archive_entry)
        return encrypted_archive_entry

    def write(self, path: pathlib.Path):
        """Write ``<path>/keystore``: header, entry count, then every key.

        Every registered entry must have been encrypted beforehand, otherwise
        its ``serialize_key()`` raises ``RuntimeError`` and nothing is
        written.
        """
        parts = [KeyStore.header, struct.pack('I', len(self.entries))]
        parts.extend(
            encrypted_archive_entry.serialize_key()
            for encrypted_archive_entry in self.entries
        )
        path.joinpath('keystore').write_bytes(b''.join(parts))


class Archive():
    """Collects files and writes the ``archive`` and ``keystore`` files."""

    header = b'L0LARCH\x00'
    # Largest accepted file size in bytes.
    size_limit = 1048576

    def __init__(self):
        self.entries = []
        self.keystore = KeyStore()

    def add_file(self, path: pathlib.Path):
        """Register *path* for archiving.

        Raises:
            RuntimeError: if the file is larger than ``Archive.size_limit``.
        """
        archive_entry = ArchiveEntry(path)
        print(archive_entry)
        if archive_entry.size > Archive.size_limit:
            raise RuntimeError(f'{archive_entry.f} size is above the limit ({Archive.size_limit})!')
        encrypted_archive_entry = self.keystore.get_encrypted_archive_entry(archive_entry)
        self.entries.append((archive_entry, encrypted_archive_entry))

    def write(self, path: pathlib.Path):
        """Write ``<path>/archive`` and then ``<path>/keystore``.

        The archive consists of the header, the entry count, and for each
        entry its metadata record, the length of its encrypted contents
        (``I``) and the encrypted contents themselves.
        """
        parts = [Archive.header, struct.pack('I', len(self.entries))]
        for (archive_entry, encrypted_archive_entry) in self.entries:
            print(f'adding {archive_entry.f}...')
            parts.append(archive_entry.get_metadata())
            encrypted_contents = encrypted_archive_entry.encrypt()
            parts.append(struct.pack('I', len(encrypted_contents)))
            parts.append(encrypted_contents)
        path.joinpath('archive').write_bytes(b''.join(parts))
        # Must come after the loop above: the keys can only be serialized
        # once every entry has been encrypted.
        self.keystore.write(path)


class DirectoryEnumerator():
    """Lists the regular files below a directory."""

    def __init__(self, path: pathlib.Path):
        self.path = pathlib.Path(path)

    def get_file_list(self, recursive: bool):
        """Return the regular files in the directory.

        With *recursive* true the whole tree is searched, otherwise only the
        directory's direct children.
        """
        # Only run the traversal that is actually needed.
        candidates = self.path.rglob('*') if recursive else self.path.glob('*')
        return [candidate for candidate in candidates if candidate.is_file()]


def parse_args():
    """Parse the command line: input directory ``d`` and output directory ``o``."""
    argparser = argparse.ArgumentParser(description='')
    argparser.add_argument('d', help='directory whose files are archived (searched recursively)')
    argparser.add_argument('o', help='directory that receives the archive and keystore files')
    return argparser.parse_args()


def run():
    """Archive every file below the input directory into the output directory."""
    args = parse_args()
    archive = Archive()
    d = DirectoryEnumerator(args.d)
    o = pathlib.Path(args.o)
    for file in d.get_file_list(True):
        archive.add_file(file)
    o.mkdir(parents=True, exist_ok=True)
    archive.write(o)


if __name__ == '__main__':
    run()