-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #28 from lil-skelly/refactor-server
[IMP] Refactored server
- Loading branch information
Showing
7 changed files
with
160 additions
and
175 deletions.
There are no files selected for viewing
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,191 +1,126 @@ | ||
import os | ||
import io | ||
from typing import Optional | ||
import logging | ||
import secrets | ||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | ||
import utils | ||
import os | ||
from typing import Optional | ||
|
||
from cryptography.hazmat.primitives.ciphers import algorithms | ||
|
||
from fraction import Fraction | ||
import utils | ||
|
||
class Fractionator: | ||
|
||
class Fractionator(utils.AES_CFB_HELPER): | ||
MAGIC: int = 0xDEADBEEF | ||
CHUNK_SIZE: int = 8192 | ||
FRACTION_PATH_LEN = 16 | ||
|
||
def __init__( | ||
self, path: str, out_path: str, key: bytes | ||
) -> None: | ||
"""Class to handle loading/preparation of a Fractionator object file to feed to the loader""" | ||
self._path: str = path # Path to LKM object file | ||
self._out_path: str = out_path # Path to store generated fractions | ||
self._fractions: list[Fraction] = [] # Keep track of the fraction objects | ||
self.fraction_paths: list[str] = [] # Book-keeping of fraction filenames for cleanup | ||
# I/O | ||
FRACTION_PATH_LEN: int = 16 | ||
algorithm = algorithms.AES256 | ||
|
||
def __init__(self, file_path: str, out_path: str, key: bytes) -> None: | ||
"""Prepare a Fractionator object for reading and generating fractions.""" | ||
self.file_path: str = file_path | ||
self.file_size: int = 0 | ||
self.out_path: str = out_path | ||
|
||
self._fractions: list[Fraction] = [] | ||
self.fraction_paths: list[str] = [] | ||
|
||
self._buf_reader: Optional[io.BufferedReader] = None | ||
# AES-256 related instance attributes | ||
self._iv: Optional[bytes] = None # AES-256 initialization vector | ||
self._key: Optional[bytes] = Fractionator.validate_aes_key( | ||
key | ||
) # AES-256 cryptographic key | ||
self._mode = modes.CFB | ||
self._algorithm = algorithms.AES256 | ||
self._cipher = None | ||
|
||
@property | ||
def cipher(self) -> Cipher: | ||
if not self._cipher: | ||
if not self._key or not self._iv: | ||
raise ValueError(f"Missing key or IV (_key:{self._key}, _iv:{self._iv})") | ||
|
||
self._cipher = Cipher(self._algorithm(self._key), self._mode(self._iv)) | ||
|
||
return self._cipher | ||
|
||
super().__init__(key, self.algorithm) | ||
|
||
def open_reading_stream(self) -> None: | ||
""" | ||
Opens a reading stream to the file specified in self._path. | ||
If a stream is already open, this function has no effect | ||
""" | ||
try: | ||
if self._buf_reader is None or self._buf_reader.closed: | ||
self._buf_reader = open(self._path, "rb") | ||
logging.debug(f"Opened reading stream to {self._path}.") | ||
return | ||
except FileNotFoundError as err: | ||
logging.error(f"File not found: {self._path}") | ||
raise err | ||
"""Open a stream for reading the object file.""" | ||
if not self._buf_reader or self._buf_reader.closed: | ||
try: | ||
self._buf_reader = open(self.file_path, "rb") | ||
logging.debug(f"Opened stream to {self.file_path}.") | ||
except FileNotFoundError as err: | ||
logging.error(f"File not found: {self.file_path}") | ||
raise err | ||
|
||
def _make_fraction(self, index: int) -> None: | ||
"""Read from the object-file and generate a fraction""" | ||
if not isinstance(index, int): | ||
raise ValueError(f"index must be an integer (got `{type(index)}`)") | ||
# Open a stream to the file and read a chunk | ||
"""Generate and encrypt a fraction from the object file.""" | ||
self.open_reading_stream() | ||
data = self._buf_reader.read( | ||
Fractionator.CHUNK_SIZE | ||
) # don't use peek, as it does not advance the position in the file | ||
|
||
# generate iv and encrypt the chunk | ||
encrypted_data = self._encrypt_chunk(data) | ||
|
||
# Create a fraction instance and add it to self._fractions | ||
|
||
data = self._buf_reader.read(self.CHUNK_SIZE) | ||
encrypted_data = self.encrypt(data) | ||
|
||
fraction = Fraction( | ||
magic=Fractionator.MAGIC, index=index, iv=self._iv, data=encrypted_data | ||
magic=self.MAGIC, index=index, iv=self.get_iv(), data=encrypted_data | ||
) | ||
self._fractions.append(fraction) | ||
|
||
logging.debug(f"Created fraction #{fraction.index}") | ||
|
||
def _encrypt_chunk(self, data: bytes) -> bytes: | ||
self._iv = self._generate_iv() | ||
return self.do_aes_operation(data, True) | ||
|
||
def _generate_iv(self) -> bytes: | ||
return secrets.token_bytes(16) | ||
|
||
def make_fractions(self) -> None: | ||
"""Iterate through the Fractionator object file specified in self._path and generate Fraction objects""" | ||
size = os.path.getsize(self._path) | ||
num_chunks = (size + Fractionator.CHUNK_SIZE - 1) // Fractionator.CHUNK_SIZE | ||
"""Generate all fractions from the object file.""" | ||
self.file_size = ( | ||
os.path.getsize(self.file_path) if not self.file_size else self.file_size | ||
) | ||
|
||
logging.info(f"[info: make_fractions] Creating {num_chunks} fractions.") | ||
num_chunks = (self.file_size + self.CHUNK_SIZE - 1) // self.CHUNK_SIZE | ||
logging.info(f"Creating {num_chunks} fractions.") | ||
for i in range(num_chunks): | ||
self._make_fraction(i) | ||
|
||
def _write_fraction(self, fraction: Fraction): | ||
"""Write a fraction to a file""" | ||
def _write_fraction(self, fraction: Fraction) -> None: | ||
"""Write a fraction to a file.""" | ||
path = os.path.join( | ||
self._out_path, utils.random_string(Fractionator.FRACTION_PATH_LEN) | ||
self.out_path, utils.random_string(Fractionator.FRACTION_PATH_LEN) | ||
) | ||
|
||
with open(path, "wb") as f: | ||
header_data = fraction.header_to_bytes() | ||
data = fraction.data | ||
|
||
f.write(header_data) | ||
f.write(data) | ||
|
||
f.write(fraction.header_to_bytes()) | ||
f.write(fraction.data) | ||
self.fraction_paths.append(path) | ||
logging.debug(f"Wrote fraction #{fraction.index} to {path}") | ||
|
||
def write_fractions(self) -> None: | ||
"""Convert the fraction objects to pure bytes and write them in the appropriate directory (self._out)""" | ||
os.makedirs(self._out_path, exist_ok=True) # enusre backup directory exists | ||
"""Write all fractions to disk.""" | ||
os.makedirs(self.out_path, exist_ok=True) | ||
for fraction in self._fractions: | ||
self._write_fraction(fraction) | ||
|
||
def save_backup(self, backup_path: str): | ||
def save_backup(self, backup_path: str) -> None: | ||
"""Save fraction paths to a backup file.""" | ||
try: | ||
with open(backup_path, "a") as f: | ||
for path in self.fraction_paths: | ||
f.write(path + "\n") # Ensure each path is written on a new line | ||
f.writelines(f"{path}\n" for path in self.fraction_paths) | ||
logging.debug(f"Backup saved at {backup_path}.") | ||
except OSError as e: | ||
logging.error(f"Failed to save backup: {e}") | ||
|
||
def load_backup(self, backup_path: str): | ||
"""Load fraction paths from the backup file and initialize self.fraction_paths.""" | ||
def load_backup(self, backup_path: str) -> None: | ||
"""Load fraction paths from a backup file.""" | ||
try: | ||
with open(backup_path, "r") as f: | ||
self.fraction_paths = [line.strip() for line in f] | ||
logging.debug( | ||
f"Loaded {len(self.fraction_paths)} paths from backup." | ||
) | ||
|
||
logging.debug(f"Loaded {len(self.fraction_paths)} paths from backup.") | ||
except OSError as e: | ||
logging.error(f"Failed to load backup: {e}") | ||
return [] | ||
return | ||
|
||
def _clean_fraction(self, path: str): | ||
"""Delete a fraction file""" | ||
def _clean_fraction(self, path: str) -> None: | ||
"""Delete a fraction file.""" | ||
try: | ||
os.remove(path) | ||
logging.debug(f"Removed {path}.") | ||
except FileNotFoundError: | ||
logging.debug(f"File not found: {path}") | ||
|
||
def clean_fractions(self) -> None: | ||
logging.info("Cleaning fractions . . .") | ||
if not self.fraction_paths: | ||
logging.error("No fraction paths detected.") | ||
return | ||
|
||
"""Delete all written fractions.""" | ||
logging.info("Cleaning fractions...") | ||
for path in self.fraction_paths: | ||
self._clean_fraction(path) | ||
|
||
self.fraction_paths = [] | ||
logging.info("Done.") | ||
|
||
|
||
def do_aes_operation(self, data: bytes, op: bool) -> bytes: | ||
"""Perform an AES-256 operation on given data (encryption [op=True]/decryption [op=False])""" | ||
if not self._key or not self._iv: | ||
raise ValueError(f"Missing key or IV (_key:{self._key}, _iv:{self._iv})") | ||
|
||
cipher = self.cipher | ||
operator = cipher.encryptor() if op else cipher.decryptor() | ||
|
||
return operator.update(data) + operator.finalize() | ||
self.fraction_paths.clear() | ||
logging.info("Cleaning complete.") | ||
|
||
def close_stream(self) -> None: | ||
"""Closes the open stream to self._path and resets self._buf_rw_stream""" | ||
if isinstance(self._buf_reader, io.BufferedReader): | ||
"""Close the file stream if open.""" | ||
if self._buf_reader: | ||
self._buf_reader.close() | ||
self._buf_reader = None | ||
logging.debug(f"Closed stream to {self._path}.") | ||
return | ||
|
||
logging.debug(f"No stream was open.") | ||
|
||
@staticmethod | ||
def validate_aes_key(key: bytes) -> bytes: | ||
"""Check if key is a valid AES-256 key (32 bytes)""" | ||
if not isinstance(key, bytes) or len(key) != 32: | ||
raise ValueError( | ||
f"Invalid AES-256 key. (expected 32 bytes of `{bytes}`, got {len(key)} of `{type(key)}`)" | ||
) | ||
return key | ||
logging.debug(f"Closed stream to {self.file_path}.") | ||
|
||
def __del__(self): | ||
def __del__(self) -> None: | ||
self.close_stream() |
Oops, something went wrong.