110 lines
3.6 KiB
Python
110 lines
3.6 KiB
Python
from core.Constants import Constants
|
|
from core.Errors import CommandNotFoundError, PolicyAssignmentError, PolicyInstatementError, PolicyRevocationError
|
|
from packaging import version
|
|
from packaging.version import InvalidVersion
|
|
from subprocess import CalledProcessError
|
|
import os
|
|
import pwd
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
|
|
|
|
class PrivilegePolicyController:
    """Generate, install, and remove the privilege policy for the current user.

    The policy is a sudoers-style rule that lets the current (non-root) user
    run ``/usr/bin/wg-quick up`` on profile configuration files as root without
    a password. Installation and removal are delegated to ``pkexec``.
    """

    # Number of times `instate` invokes pkexec before giving up.
    _MAX_INSTATEMENT_ATTEMPTS = 3

    @staticmethod
    def preview():
        """Return the policy text that `instate` would install.

        Raises:
            PolicyAssignmentError: If the current user cannot be resolved or
                is root.
        """
        username = PrivilegePolicyController.__determine_username()
        return PrivilegePolicyController.__generate(username)

    @staticmethod
    def instate():
        """Install the privilege policy at ``Constants.HV_PRIVILEGE_POLICY_PATH``.

        The policy text is piped to ``pkexec install`` through ``/dev/stdin``
        and written with owner ``root`` and mode ``440``. The command is tried
        up to three times before the failure is reported.

        Raises:
            CommandNotFoundError: If ``pkexec`` is not available on ``PATH``.
            PolicyInstatementError: If the compatibility check fails or every
                installation attempt exits with a non-zero status.
            PolicyAssignmentError: If the current user cannot be resolved or
                is root.
        """
        if shutil.which('pkexec') is None:
            raise CommandNotFoundError('pkexec')

        if not PrivilegePolicyController.__is_compatible():
            raise PolicyInstatementError('The privilege policy is not compatible.')

        username = PrivilegePolicyController.__determine_username()
        privilege_policy = PrivilegePolicyController.__generate(username)

        command = [
            'pkexec', 'install', '/dev/stdin', Constants.HV_PRIVILEGE_POLICY_PATH, '-o', 'root', '-m', '440'
        ]

        # NOTE(review): the retries presumably give the user several chances
        # to pass the pkexec authentication prompt -- confirm.
        for _ in range(PrivilegePolicyController._MAX_INSTATEMENT_ATTEMPTS):
            completed_process = subprocess.run(
                command,
                input=f'{privilege_policy}\n',
                capture_output=True,
                text=True,
            )

            if completed_process.returncode == 0:
                return

        raise PolicyInstatementError('The privilege policy could not be instated.')

    @staticmethod
    def revoke():
        """Remove the installed privilege policy file via ``pkexec rm``.

        Raises:
            CommandNotFoundError: If ``pkexec`` is not available on ``PATH``.
            PolicyRevocationError: If the removal command does not exit with
                status zero (including termination by a signal).
        """
        if shutil.which('pkexec') is None:
            raise CommandNotFoundError('pkexec')

        # `returncode` is non-zero for both error exits and signal
        # terminations (negative value), so a killed child is never mistaken
        # for a successful removal.
        completed_process = subprocess.run(['pkexec', 'rm', Constants.HV_PRIVILEGE_POLICY_PATH])

        if completed_process.returncode != 0:
            raise PolicyRevocationError('The privilege policy could not be revoked.')

    @staticmethod
    def is_instated():
        """Return whether a file exists at ``Constants.HV_PRIVILEGE_POLICY_PATH``."""
        return os.path.exists(Constants.HV_PRIVILEGE_POLICY_PATH)

    @staticmethod
    def __determine_username():
        """Return the login name of the effective user.

        Raises:
            PolicyAssignmentError: If the effective UID has no password
                database entry, or if it is root (UID 0).
        """
        try:
            password_database_entry = pwd.getpwuid(os.geteuid())
        except (OSError, KeyError) as error:
            raise PolicyAssignmentError(
                'The privilege policy could not be assigned to the current user.'
            ) from error

        # The policy is never assigned to root.
        if password_database_entry.pw_uid == 0:
            raise PolicyAssignmentError('The privilege policy could not be assigned to the current user.')

        return password_database_entry.pw_name

    @staticmethod
    def __generate(username: str):
        """Return the policy text granting `username` password-less ``wg-quick up``.

        The rule restricts the command arguments with a regular expression so
        that only ``<profile config path>/<digits>/wg.conf`` can be brought up.
        """
        return '\n'.join([
            f'{username} ALL=(root) NOPASSWD: /usr/bin/wg-quick ^up {Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/[0-9]+/wg.conf$'
        ])

    @staticmethod
    def __extract_version_number(version_details: str):
        """Return the dotted version number found in `version_details`, or None.

        A patch suffix such as the ``p5`` in ``1.9.15p5`` is excluded.
        """
        match = re.search(r'(\d[0-9.]+?)(?=p|$)', version_details)
        return match.group(1) if match else None

    @staticmethod
    def __is_compatible():
        """Return whether this system can use the generated policy.

        Requires a ``sudo`` that reports version 1.9.10 or newer and an
        existing ``/usr/bin/wg-quick`` file. Any failure to run ``sudo -V`` or
        to parse its output yields ``False``.
        """
        try:
            process_output = subprocess.check_output(['sudo', '-V'], text=True)
        except (CalledProcessError, OSError):
            # OSError covers a missing or non-executable sudo binary, which
            # check_output reports as FileNotFoundError / PermissionError.
            return False

        output_lines = process_output.splitlines()

        if not output_lines:
            return False

        sudo_version_number = PrivilegePolicyController.__extract_version_number(output_lines[0].strip())

        if not sudo_version_number:
            return False

        try:
            sudo_version = version.parse(sudo_version_number)
        except InvalidVersion:
            return False

        # NOTE(review): 1.9.10 is presumably required because the generated
        # rule relies on regular-expression arguments -- confirm against the
        # sudo release notes.
        return sudo_version >= version.parse('1.9.10') and os.path.isfile('/usr/bin/wg-quick')
|