from core.Constants import Constants from core.Errors import CommandNotFoundError, PolicyAssignmentError, PolicyInstatementError, PolicyRevocationError from packaging import version from packaging.version import InvalidVersion from subprocess import CalledProcessError import os import pwd import re import shutil import subprocess class PrivilegePolicyController: @staticmethod def preview(): username = PrivilegePolicyController.__determine_username() return PrivilegePolicyController.__generate(username) @staticmethod def instate(): if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') if not PrivilegePolicyController.__is_compatible(): raise PolicyInstatementError('The privilege policy is not compatible.') username = PrivilegePolicyController.__determine_username() privilege_policy = PrivilegePolicyController.__generate(username) completed_successfully = False failed_attempt_count = 0 while not completed_successfully and failed_attempt_count < 3: process = subprocess.Popen([ 'pkexec', 'install', '/dev/stdin', Constants.HV_PRIVILEGE_POLICY_PATH, '-o', 'root', '-m', '440' ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) process.communicate(f'{privilege_policy}\n') completed_successfully = (process.returncode == 0) if not completed_successfully: failed_attempt_count += 1 if not completed_successfully: raise PolicyInstatementError('The privilege policy could not be instated.') @staticmethod def revoke(): if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') process = subprocess.Popen(('pkexec', 'rm', Constants.HV_PRIVILEGE_POLICY_PATH)) completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8) if not completed_successfully: raise PolicyRevocationError('The privilege policy could not be revoked.') @staticmethod def is_instated(): return os.path.exists(Constants.HV_PRIVILEGE_POLICY_PATH) @staticmethod def __determine_username(): try: password_database_entry = pwd.getpwuid(os.geteuid()) except (OSError, KeyError): raise PolicyAssignmentError('The privilege policy could not be assigned to the current user.') if password_database_entry.pw_uid == 0: raise PolicyAssignmentError('The privilege policy could not be assigned to the current user.') return password_database_entry.pw_name @staticmethod def __generate(username: str): return '\n'.join([ f'{username} ALL=(root) NOPASSWD: /usr/bin/wg-quick ^up {Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/[0-9]+/wg.conf$' ]) @staticmethod def __is_compatible(): try: process_output = subprocess.check_output(['sudo', '-V'], text=True) except CalledProcessError: return False if process_output.splitlines(): sudo_version_details = process_output.splitlines()[0].strip() else: return False sudo_version_number = (m := re.search(r'(\d[0-9.]+?)(?=p|$)', sudo_version_details)) and m.group(1) if not sudo_version_number: return False try: sudo_version = version.parse(sudo_version_number) except InvalidVersion: return False return sudo_version >= version.parse('1.9.10') and os.path.isfile('/usr/bin/wg-quick')