sp-hydra-veil-core/core/models/system/SystemProfile.py

68 lines
2.5 KiB
Python

from core.Constants import Constants
from core.Errors import ProfileDeletionError, ProfileModificationError, CommandNotFoundError
from core.models.BaseProfile import BaseProfile
from core.models.system.SystemConnection import SystemConnection
from dataclasses import dataclass
from typing import Optional
import os
import shutil
import subprocess
@dataclass
class SystemProfile(BaseProfile):
    """Profile whose WireGuard configuration is installed in a system directory.

    The system directory is derived from ``Constants.HV_SYSTEM_PROFILE_CONFIG_PATH``
    and the profile's ``id``. Writing to and removing from that directory is
    delegated to ``pkexec`` so the commands run with elevated privileges.
    """

    # Connection settings for this profile; may be None.
    connection: Optional[SystemConnection]

    def get_system_config_path(self):
        """Return the system-level configuration directory of this profile."""
        return self.__get_system_config_path(self.id)

    def attach_wireguard_configuration(self, wireguard_configuration):
        """Install a WireGuard configuration into the system directory.

        The configuration text is first written to ``wg.conf.bak`` inside the
        directory returned by ``get_config_path()`` (not defined in this
        class; presumably inherited from ``BaseProfile``), then copied to its
        system location with ``pkexec install``. The installation is attempted
        up to three times. The ``wg.conf.bak`` copy is not removed afterwards.

        Args:
            wireguard_configuration: Text content of the WireGuard
                configuration file.

        Raises:
            CommandNotFoundError: If ``pkexec`` is not available on ``PATH``.
            ProfileModificationError: If every installation attempt fails.
        """
        if shutil.which('pkexec') is None:
            raise CommandNotFoundError('pkexec')

        wireguard_configuration_file_backup_path = f'{self.get_config_path()}/wg.conf.bak'

        # The context manager closes the file; no explicit close() is needed.
        with open(wireguard_configuration_file_backup_path, 'w') as wireguard_configuration_file:
            wireguard_configuration_file.write(wireguard_configuration)

        # NOTE(review): mode '210' (owner write-only, group execute-only) is an
        # unusual permission set for a configuration file - confirm that it is
        # intentional.
        install_command = (
            'pkexec', 'install', '-D',
            wireguard_configuration_file_backup_path,
            self.get_wireguard_configuration_path(),
            '-o', 'root', '-m', '210',
        )

        max_attempts = 3

        for _ in range(max_attempts):
            # returncode is non-zero for a failing exit status and negative
            # when the child was killed by a signal, so both count as failures.
            if subprocess.run(install_command).returncode == 0:
                return

        raise ProfileModificationError('WireGuard configuration could not be attached.')

    def get_wireguard_configuration_path(self):
        """Return the path of ``wg.conf`` inside the system directory."""
        return f'{self.get_system_config_path()}/wg.conf'

    def has_wireguard_configuration(self):
        """Return True if the system-level ``wg.conf`` exists as a regular file."""
        return os.path.isfile(self.get_wireguard_configuration_path())

    def delete(self):
        """Delete the profile, removing its system-level files first.

        When a WireGuard configuration is installed, ``pkexec rm -d`` is used
        to remove the configuration file and then its (by then empty) system
        directory. Afterwards the deletion is delegated to the parent class.

        Raises:
            CommandNotFoundError: If a configuration is installed but
                ``pkexec`` is not available on ``PATH``.
            ProfileDeletionError: If the removal command does not succeed.
        """
        if self.has_wireguard_configuration():

            if shutil.which('pkexec') is None:
                raise CommandNotFoundError('pkexec')

            removal_command = (
                'pkexec', 'rm', '-d',
                self.get_wireguard_configuration_path(),
                self.get_system_config_path(),
            )

            # A non-zero exit status or a death by signal (negative
            # returncode) both mean the files may still be present.
            if subprocess.run(removal_command).returncode != 0:
                raise ProfileDeletionError('The profile could not be deleted.')

        super().delete()

    @staticmethod
    def __get_system_config_path(id: int):
        """Build the system configuration directory path for a profile id."""
        return f'{Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/{str(id)}'