Assign system WireGuard configurations to a secure location

This commit is contained in:
codeking 2024-09-24 05:12:06 +02:00
parent c744d9329d
commit 7cf9fdf0b3
5 changed files with 94 additions and 19 deletions

View file

@ -10,14 +10,20 @@ class Constants:
HOME: Final[str] = os.path.expanduser('~') HOME: Final[str] = os.path.expanduser('~')
SYSTEM_CONFIG_PATH: Final[str] = '/etc'
CONFIG_HOME: Final[str] = os.environ.get('XDG_CONFIG_HOME') or os.path.join(HOME, '.config') CONFIG_HOME: Final[str] = os.environ.get('XDG_CONFIG_HOME') or os.path.join(HOME, '.config')
DATA_HOME: Final[str] = os.environ.get('XDG_DATA_HOME') or os.path.join(HOME, '.local/share') DATA_HOME: Final[str] = os.environ.get('XDG_DATA_HOME') or os.path.join(HOME, '.local/share')
STATE_HOME: Final[str] = os.environ.get('XDG_STATE_HOME') or os.path.join(HOME, '.local/state') STATE_HOME: Final[str] = os.environ.get('XDG_STATE_HOME') or os.path.join(HOME, '.local/state')
SP_SYSTEM_CONFIG_PATH: Final[str] = f'{SYSTEM_CONFIG_PATH}/simplified-privacy'
SP_CONFIG_HOME: Final[str] = f'{CONFIG_HOME}/simplified-privacy' SP_CONFIG_HOME: Final[str] = f'{CONFIG_HOME}/simplified-privacy'
SP_DATA_HOME: Final[str] = f'{DATA_HOME}/simplified-privacy' SP_DATA_HOME: Final[str] = f'{DATA_HOME}/simplified-privacy'
SP_STATE_HOME: Final[str] = f'{STATE_HOME}/simplified-privacy' SP_STATE_HOME: Final[str] = f'{STATE_HOME}/simplified-privacy'
SP_SYSTEM_PROFILE_CONFIG_PATH: Final[str] = f'{SP_SYSTEM_CONFIG_PATH}/profiles'
SP_PROFILE_CONFIG_HOME: Final[str] = f'{SP_CONFIG_HOME}/profiles' SP_PROFILE_CONFIG_HOME: Final[str] = f'{SP_CONFIG_HOME}/profiles'
SP_PROFILE_DATA_HOME: Final[str] = f'{SP_DATA_HOME}/profiles' SP_PROFILE_DATA_HOME: Final[str] = f'{SP_DATA_HOME}/profiles'

View file

@ -6,6 +6,14 @@ class ConnectionTerminationError(Exception):
pass pass
class ProfileDeletionError(Exception):
pass
class ProfileModificationError(Exception):
pass
class ProfileStateConflictError(Exception): class ProfileStateConflictError(Exception):
pass pass

View file

@ -31,21 +31,6 @@ class BaseProfile:
def has_subscription(self): def has_subscription(self):
return self.subscription is not None return self.subscription is not None
def attach_wireguard_configuration(self, wireguard_configuration):
wireguard_configuration_file_path = self.get_wireguard_configuration_path()
with open(wireguard_configuration_file_path, 'w') as wireguard_configuration_file:
wireguard_configuration_file.write(wireguard_configuration)
wireguard_configuration_file.close()
def get_wireguard_configuration_path(self):
return f'{self.get_config_path()}/wg.conf'
def has_wireguard_configuration(self):
return os.path.exists(f'{self.get_config_path()}/wg.conf')
def is_session_profile(self): def is_session_profile(self):
return type(self).__name__ == 'SessionProfile' return type(self).__name__ == 'SessionProfile'
@ -53,12 +38,15 @@ class BaseProfile:
return type(self).__name__ == 'SystemProfile' return type(self).__name__ == 'SystemProfile'
def delete_data(self): def delete_data(self):
shutil.rmtree(BaseProfile.__get_data_path(self.id)) shutil.rmtree(BaseProfile.__get_data_path(self.id), ignore_errors=True)
def delete(self): def delete(self):
self._delete()
shutil.rmtree(BaseProfile.__get_config_path(self.id)) def _delete(self):
shutil.rmtree(BaseProfile.__get_data_path(self.id))
shutil.rmtree(BaseProfile.__get_config_path(self.id), ignore_errors=True)
self.delete_data()
@staticmethod @staticmethod
def find_by_id(id: int): def find_by_id(id: int):
@ -136,7 +124,7 @@ class BaseProfile:
persistent_state_path = f'{BaseProfile.__get_data_path(profile.id)}/persistent-state' persistent_state_path = f'{BaseProfile.__get_data_path(profile.id)}/persistent-state'
if os.path.isdir(persistent_state_path): if os.path.isdir(persistent_state_path):
shutil.rmtree(persistent_state_path) shutil.rmtree(persistent_state_path, ignore_errors=True)
config_file_contents = f'{profile.to_json(indent=4)}\n' config_file_contents = f'{profile.to_json(indent=4)}\n'

View file

@ -28,9 +28,21 @@ class SessionProfile(BaseProfile):
proxy_configuration_file.write(proxy_configuration_file_contents) proxy_configuration_file.write(proxy_configuration_file_contents)
proxy_configuration_file.close() proxy_configuration_file.close()
def attach_wireguard_configuration(self, wireguard_configuration):
wireguard_configuration_file_path = self.get_wireguard_configuration_path()
with open(wireguard_configuration_file_path, 'w') as wireguard_configuration_file:
wireguard_configuration_file.write(wireguard_configuration)
wireguard_configuration_file.close()
def get_proxy_configuration_path(self): def get_proxy_configuration_path(self):
return f'{self.get_config_path()}/proxy.json' return f'{self.get_config_path()}/proxy.json'
def get_wireguard_configuration_path(self):
return f'{self.get_config_path()}/wg.conf'
def get_proxy_configuration(self): def get_proxy_configuration(self):
try: try:
@ -49,3 +61,6 @@ class SessionProfile(BaseProfile):
def has_proxy_configuration(self): def has_proxy_configuration(self):
return os.path.exists(f'{self.get_config_path()}/proxy.json') return os.path.exists(f'{self.get_config_path()}/proxy.json')
def has_wireguard_configuration(self):
return os.path.exists(f'{self.get_config_path()}/wg.conf')

View file

@ -1,9 +1,67 @@
from core.Constants import Constants
from core.Errors import ProfileDeletionError, ProfileModificationError
from core.models.BaseProfile import BaseProfile from core.models.BaseProfile import BaseProfile
from core.models.system.SystemConnection import SystemConnection from core.models.system.SystemConnection import SystemConnection
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional from typing import Optional
import os
import subprocess
@dataclass @dataclass
class SystemProfile(BaseProfile): class SystemProfile(BaseProfile):
connection: Optional[SystemConnection] connection: Optional[SystemConnection]
def get_system_config_path(self):
return self.__get_system_config_path(self.id)
def attach_wireguard_configuration(self, wireguard_configuration):
if subprocess.getstatusoutput('pkexec --help')[0] == 127:
raise OSError('The polkit toolkit does not appear to be installed.')
wireguard_configuration_file_backup_path = f'{self.get_config_path()}/wg.conf.bak'
with open(wireguard_configuration_file_backup_path, 'w') as wireguard_configuration_file:
wireguard_configuration_file.write(wireguard_configuration)
wireguard_configuration_file.close()
wireguard_configuration_is_attached = False
failed_attempt_count = 0
while not wireguard_configuration_is_attached and failed_attempt_count < 3:
process = subprocess.Popen(('pkexec', 'install', '-D', wireguard_configuration_file_backup_path, self.get_wireguard_configuration_path(), '-o', 'root', '-m', '210'))
wireguard_configuration_is_attached = not bool(os.waitpid(process.pid, 0)[1] >> 8)
if not wireguard_configuration_is_attached:
failed_attempt_count += 1
if not wireguard_configuration_is_attached:
raise ProfileModificationError('WireGuard configuration could not be attached.')
def get_wireguard_configuration_path(self):
return f'{self.get_system_config_path()}/wg.conf'
def has_wireguard_configuration(self):
return os.path.exists(f'{self.get_system_config_path()}/wg.conf')
def delete(self):
if self.has_wireguard_configuration():
if subprocess.getstatusoutput('pkexec --help')[0] == 127:
raise OSError('The polkit toolkit does not appear to be installed.')
process = subprocess.Popen(('pkexec', 'rm', '-d', self.get_wireguard_configuration_path(), self.get_system_config_path()))
completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8)
if not completed_successfully:
raise ProfileDeletionError('The profile could not be deleted.')
self._delete()
@staticmethod
def __get_system_config_path(id: int):
return f'{Constants.SP_SYSTEM_PROFILE_CONFIG_PATH}/{str(id)}'