From 7cf9fdf0b3ea7562a90db6b5827776872724b8cb Mon Sep 17 00:00:00 2001 From: codeking Date: Tue, 24 Sep 2024 05:12:06 +0200 Subject: [PATCH] Assign system WireGuard configurations to a secure location --- core/Constants.py | 6 +++ core/Errors.py | 8 ++++ core/models/BaseProfile.py | 26 ++++-------- core/models/session/SessionProfile.py | 15 +++++++ core/models/system/SystemProfile.py | 58 +++++++++++++++++++++++++++ 5 files changed, 94 insertions(+), 19 deletions(-) diff --git a/core/Constants.py b/core/Constants.py index b0cf4d1..201c1ce 100644 --- a/core/Constants.py +++ b/core/Constants.py @@ -10,14 +10,20 @@ class Constants: HOME: Final[str] = os.path.expanduser('~') + SYSTEM_CONFIG_PATH: Final[str] = '/etc' + CONFIG_HOME: Final[str] = os.environ.get('XDG_CONFIG_HOME') or os.path.join(HOME, '.config') DATA_HOME: Final[str] = os.environ.get('XDG_DATA_HOME') or os.path.join(HOME, '.local/share') STATE_HOME: Final[str] = os.environ.get('XDG_STATE_HOME') or os.path.join(HOME, '.local/state') + SP_SYSTEM_CONFIG_PATH: Final[str] = f'{SYSTEM_CONFIG_PATH}/simplified-privacy' + SP_CONFIG_HOME: Final[str] = f'{CONFIG_HOME}/simplified-privacy' SP_DATA_HOME: Final[str] = f'{DATA_HOME}/simplified-privacy' SP_STATE_HOME: Final[str] = f'{STATE_HOME}/simplified-privacy' + SP_SYSTEM_PROFILE_CONFIG_PATH: Final[str] = f'{SP_SYSTEM_CONFIG_PATH}/profiles' + SP_PROFILE_CONFIG_HOME: Final[str] = f'{SP_CONFIG_HOME}/profiles' SP_PROFILE_DATA_HOME: Final[str] = f'{SP_DATA_HOME}/profiles' diff --git a/core/Errors.py b/core/Errors.py index 3c5b3c4..eb14ae6 100644 --- a/core/Errors.py +++ b/core/Errors.py @@ -6,6 +6,14 @@ class ConnectionTerminationError(Exception): pass +class ProfileDeletionError(Exception): + pass + + +class ProfileModificationError(Exception): + pass + + class ProfileStateConflictError(Exception): pass diff --git a/core/models/BaseProfile.py b/core/models/BaseProfile.py index 16a8d07..0150e33 100644 --- a/core/models/BaseProfile.py +++ b/core/models/BaseProfile.py @@ -31,21 +31,6 @@ class BaseProfile: def has_subscription(self): return self.subscription is not None - def attach_wireguard_configuration(self, wireguard_configuration): - - wireguard_configuration_file_path = self.get_wireguard_configuration_path() - - with open(wireguard_configuration_file_path, 'w') as wireguard_configuration_file: - - wireguard_configuration_file.write(wireguard_configuration) - wireguard_configuration_file.close() - - def get_wireguard_configuration_path(self): - return f'{self.get_config_path()}/wg.conf' - - def has_wireguard_configuration(self): - return os.path.exists(f'{self.get_config_path()}/wg.conf') - def is_session_profile(self): return type(self).__name__ == 'SessionProfile' @@ -53,12 +38,15 @@ class BaseProfile: return type(self).__name__ == 'SystemProfile' def delete_data(self): - shutil.rmtree(BaseProfile.__get_data_path(self.id)) + shutil.rmtree(BaseProfile.__get_data_path(self.id), ignore_errors=True) def delete(self): + self._delete() - shutil.rmtree(BaseProfile.__get_config_path(self.id)) - shutil.rmtree(BaseProfile.__get_data_path(self.id)) + def _delete(self): + + shutil.rmtree(BaseProfile.__get_config_path(self.id), ignore_errors=True) + self.delete_data() @staticmethod def find_by_id(id: int): @@ -136,7 +124,7 @@ class BaseProfile: persistent_state_path = f'{BaseProfile.__get_data_path(profile.id)}/persistent-state' if os.path.isdir(persistent_state_path): - shutil.rmtree(persistent_state_path) + shutil.rmtree(persistent_state_path, ignore_errors=True) config_file_contents = f'{profile.to_json(indent=4)}\n' diff --git a/core/models/session/SessionProfile.py b/core/models/session/SessionProfile.py index 26d03bc..7a1b665 100644 --- a/core/models/session/SessionProfile.py +++ b/core/models/session/SessionProfile.py @@ -28,9 +28,21 @@ class SessionProfile(BaseProfile): proxy_configuration_file.write(proxy_configuration_file_contents) proxy_configuration_file.close() + def attach_wireguard_configuration(self, wireguard_configuration): + + wireguard_configuration_file_path = self.get_wireguard_configuration_path() + + with open(wireguard_configuration_file_path, 'w') as wireguard_configuration_file: + + wireguard_configuration_file.write(wireguard_configuration) + wireguard_configuration_file.close() + def get_proxy_configuration_path(self): return f'{self.get_config_path()}/proxy.json' + def get_wireguard_configuration_path(self): + return f'{self.get_config_path()}/wg.conf' + def get_proxy_configuration(self): try: @@ -49,3 +61,6 @@ class SessionProfile(BaseProfile): def has_proxy_configuration(self): return os.path.exists(f'{self.get_config_path()}/proxy.json') + + def has_wireguard_configuration(self): + return os.path.exists(f'{self.get_config_path()}/wg.conf') diff --git a/core/models/system/SystemProfile.py b/core/models/system/SystemProfile.py index 7d9e2e3..979a5db 100644 --- a/core/models/system/SystemProfile.py +++ b/core/models/system/SystemProfile.py @@ -1,9 +1,67 @@ +from core.Constants import Constants +from core.Errors import ProfileDeletionError, ProfileModificationError from core.models.BaseProfile import BaseProfile from core.models.system.SystemConnection import SystemConnection from dataclasses import dataclass from typing import Optional +import os +import subprocess @dataclass class SystemProfile(BaseProfile): connection: Optional[SystemConnection] + + def get_system_config_path(self): + return self.__get_system_config_path(self.id) + + def attach_wireguard_configuration(self, wireguard_configuration): + + if subprocess.getstatusoutput('pkexec --help')[0] == 127: + raise OSError('The polkit toolkit does not appear to be installed.') + + wireguard_configuration_file_backup_path = f'{self.get_config_path()}/wg.conf.bak' + + with open(wireguard_configuration_file_backup_path, 'w') as wireguard_configuration_file: + + wireguard_configuration_file.write(wireguard_configuration) + wireguard_configuration_file.close() + + wireguard_configuration_is_attached = False + failed_attempt_count = 0 + + while not wireguard_configuration_is_attached and failed_attempt_count < 3: + + process = subprocess.Popen(('pkexec', 'install', '-D', wireguard_configuration_file_backup_path, self.get_wireguard_configuration_path(), '-o', 'root', '-m', '210')) + wireguard_configuration_is_attached = not bool(os.waitpid(process.pid, 0)[1] >> 8) + + if not wireguard_configuration_is_attached: + failed_attempt_count += 1 + + if not wireguard_configuration_is_attached: + raise ProfileModificationError('WireGuard configuration could not be attached.') + + def get_wireguard_configuration_path(self): + return f'{self.get_system_config_path()}/wg.conf' + + def has_wireguard_configuration(self): + return os.path.exists(f'{self.get_system_config_path()}/wg.conf') + + def delete(self): + + if self.has_wireguard_configuration(): + + if subprocess.getstatusoutput('pkexec --help')[0] == 127: + raise OSError('The polkit toolkit does not appear to be installed.') + + process = subprocess.Popen(('pkexec', 'rm', '-d', self.get_wireguard_configuration_path(), self.get_system_config_path())) + completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8) + + if not completed_successfully: + raise ProfileDeletionError('The profile could not be deleted.') + + self._delete() + + @staticmethod + def __get_system_config_path(id: int): + return f'{Constants.SP_SYSTEM_PROFILE_CONFIG_PATH}/{str(id)}'