from core.Constants import Constants
from core.Errors import ProfileDeletionError, ProfileModificationError
from core.models.BaseProfile import BaseProfile
from core.models.system.SystemConnection import SystemConnection
from dataclasses import dataclass
from typing import Optional
import os
import subprocess


@dataclass
class SystemProfile(BaseProfile):
    """Profile whose WireGuard configuration is installed under a system path.

    The configuration file lives in a per-profile directory below
    ``Constants.SP_SYSTEM_PROFILE_CONFIG_PATH`` and is installed and removed
    through ``pkexec``.
    """

    connection: Optional[SystemConnection]

    def get_system_config_path(self):
        """Return the system configuration directory for this profile."""
        # `self.id` is not declared in this class; presumably provided by
        # BaseProfile -- confirm against that class.
        return self.__get_system_config_path(self.id)

    def attach_wireguard_configuration(self, wireguard_configuration):
        """Install the given WireGuard configuration for this profile.

        The text is first written to ``wg.conf.bak`` in the directory returned
        by ``get_config_path()`` and then copied to the system path with
        ``pkexec install``. The privileged copy is tried up to three times.

        Args:
            wireguard_configuration: Configuration text to write.

        Raises:
            OSError: If ``pkexec`` does not appear to be installed.
            ProfileModificationError: If every installation attempt fails.
        """
        self.__require_polkit()

        # `get_config_path()` is not defined in this class; presumably
        # inherited from BaseProfile -- confirm.
        wireguard_configuration_file_backup_path = f'{self.get_config_path()}/wg.conf.bak'

        # The context manager closes the file; no explicit close() is needed.
        with open(wireguard_configuration_file_backup_path, 'w') as wireguard_configuration_file:
            wireguard_configuration_file.write(wireguard_configuration)

        # NOTE(review): octal mode 210 (owner write-only, group execute-only)
        # is unusual for a configuration file -- confirm it is intended.
        install_arguments = (
            'install', '-D',
            wireguard_configuration_file_backup_path,
            self.get_wireguard_configuration_path(),
            '-o', 'root',
            '-m', '210',
        )

        # Retry so that a failed attempt (any non-zero pkexec exit) does not
        # abort immediately; three attempts in total.
        for _ in range(3):
            if self.__run_privileged(*install_arguments):
                return

        raise ProfileModificationError('WireGuard configuration could not be attached.')

    def get_wireguard_configuration_path(self):
        """Return the path of the installed WireGuard configuration file."""
        return f'{self.get_system_config_path()}/wg.conf'

    def has_wireguard_configuration(self):
        """Return True if the installed WireGuard configuration file exists."""
        return os.path.isfile(self.get_wireguard_configuration_path())

    def delete(self):
        """Remove the installed configuration (if any), then delete the profile.

        Raises:
            OSError: If a configuration is installed but ``pkexec`` does not
                appear to be installed.
            ProfileDeletionError: If the privileged removal fails.
        """
        if self.has_wireguard_configuration():
            self.__require_polkit()

            # `rm -d` removes the file and then the (now empty) directory.
            removed = self.__run_privileged(
                'rm', '-d',
                self.get_wireguard_configuration_path(),
                self.get_system_config_path(),
            )

            if not removed:
                raise ProfileDeletionError('The profile could not be deleted.')

        # `_delete()` is not defined in this class; presumably inherited from
        # BaseProfile -- confirm.
        self._delete()

    @staticmethod
    def __require_polkit():
        """Raise OSError if the shell cannot find the ``pkexec`` command."""
        # Exit status 127 is what the shell reports for "command not found".
        if subprocess.getstatusoutput('pkexec --help')[0] == 127:
            raise OSError('The polkit toolkit does not appear to be installed.')

    @staticmethod
    def __run_privileged(*arguments):
        """Run a command through ``pkexec`` and return True only on exit code 0.

        ``returncode`` is negative when the child is terminated by a signal,
        so such a run counts as a failure instead of being mistaken for
        success.
        """
        return subprocess.run(('pkexec', *arguments)).returncode == 0

    @staticmethod
    def __get_system_config_path(id: int):
        """Return the system configuration directory for the given profile id."""
        return f'{Constants.SP_SYSTEM_PROFILE_CONFIG_PATH}/{str(id)}'