from core.Constants import Constants
from core.Errors import CommandNotFoundError
from core.controllers.SessionStateController import SessionStateController
from core.models.session.Application import Application
from core.models.session.ApplicationVersion import ApplicationVersion
from core.models.session.SessionProfile import SessionProfile
from core.models.session.SessionState import SessionState
from core.observers.ProfileObserver import ProfileObserver
from core.services.WebServiceApiService import WebServiceApiService
from pathlib import Path
from typing import Optional
import os
import random
import re
import shutil
import stat
import subprocess
import sys
import time


class ApplicationController:
    """Static helpers for looking up applications and launching them inside a nested X display."""

    @staticmethod
    def get(code: str):
        """Return the result of ``Application.find`` for the given application code."""
        return Application.find(code)

    @staticmethod
    def get_all():
        """Return the result of ``Application.all``."""
        return Application.all()

    @staticmethod
    def launch(version: ApplicationVersion, profile: SessionProfile, port_number: Optional[int] = None, asynchronous: bool = False, profile_observer: Optional[ProfileObserver] = None):
        """Prepare the profile's data directories, write the session's ``.init`` script and run it.

        :param version: Application version whose installation path provides the
            initial state and the application ``.init.ptpl`` template.
        :param profile: Session profile supplying the data path, time zone, name
            and resolution.
        :param port_number: Value substituted into the application template;
            ``-1`` is substituted when it is ``None`` (or otherwise falsy).
        :param asynchronous: When true, the process is run in a forked child and
            the caller returns immediately; otherwise this call blocks until the
            launched process exits.
        :param profile_observer: Forwarded to ``ProfileController.disable`` once
            the launched process has finished.
        :raises CommandNotFoundError: If ``bwrap``, ``ratpoison`` or ``Xephyr`` is
            not on ``PATH`` (raised in the forked child when ``asynchronous``).
        """
        # Imported here rather than at module level, as in the original
        # (presumably to avoid a circular import - not verifiable from this file).
        from core.controllers.ProfileController import ProfileController

        persistent_state_path = f'{profile.get_data_path()}/persistent-state'
        font_path = f'{profile.get_data_path()}/fonts'

        if ApplicationController.__is_missing_or_empty(persistent_state_path):
            # dirs_exist_ok: the destination may already exist as an empty
            # directory, which would otherwise make copytree raise FileExistsError.
            shutil.copytree(f'{version.get_installation_path()}/resources/initial-state', persistent_state_path, dirs_exist_ok=True)

        if ApplicationController.__is_missing_or_empty(font_path) and os.path.isdir('/usr/share/fonts/truetype'):

            font_families = [file.name for file in Path('/usr/share/fonts/truetype').iterdir() if file.is_dir()]
            preferred_font_families = ['dejavu', 'droid', 'liberation', 'libreoffice', 'noto', 'ubuntu']
            font_family_subset = ApplicationController.__select_random_subset(font_families, preferred_font_families, 12)

            for font_family in font_family_subset:
                shutil.copytree(f'/usr/share/fonts/truetype/{font_family}', f'{font_path}/{font_family}')

        display = ApplicationController.__find_unused_display()
        time_zone = profile.determine_timezone()

        # Read both templates through context managers so the file handles are
        # closed deterministically.
        with open(f'/{Constants.HV_RUNTIME_DATA_HOME}/.init.ptpl', 'r') as runtime_template_file:
            runtime_initialization_file_template = runtime_template_file.read()

        runtime_initialization_file_contents = runtime_initialization_file_template.format(
            display=display,
            time_zone=time_zone,
            hv_runtime_data_home=Constants.HV_RUNTIME_DATA_HOME,
        )

        with open(f'/{version.get_installation_path()}/.init.ptpl', 'r') as application_template_file:
            application_initialization_file_template = application_template_file.read()

        application_initialization_file_contents = application_initialization_file_template.format(
            application_version_home=version.get_installation_path(),
            port_number=port_number or -1,
            home=Constants.HOME,
            profile_data_path=profile.get_data_path(),
            config_home=Constants.CONFIG_HOME,
            application_version_number=version.version_number,
        )

        session_state = SessionStateController.get_or_new(profile.id)
        SessionStateController.update_or_create(session_state)

        initialization_file_contents = runtime_initialization_file_contents + application_initialization_file_contents
        initialization_file_path = f'{session_state.get_state_path()}/.init'

        # Create the script as owner-only and executable before writing to it;
        # the mode only takes effect when the file does not exist yet.
        Path(initialization_file_path).touch(exist_ok=True, mode=0o600 | stat.S_IEXEC)

        with open(initialization_file_path, 'w') as initialization_file:
            initialization_file.write(initialization_file_contents)

        if asynchronous:

            fork_process_id = os.fork()

            if not fork_process_id:

                # Child process: run the application, disable the profile once
                # it has finished, then exit instead of returning to the caller.
                ApplicationController.__run_process(initialization_file_path, profile, display, session_state)
                ProfileController.disable(profile, False, profile_observer=profile_observer)
                time.sleep(1.0)
                sys.exit()

        else:

            ApplicationController.__run_process(initialization_file_path, profile, display, session_state)
            ProfileController.disable(profile, False, profile_observer=profile_observer)

    @staticmethod
    def _sync(proxies: Optional[dict] = None):
        """Replace all stored applications with those returned by the web service API."""
        applications = WebServiceApiService.get_applications(proxies)

        Application.truncate()
        Application.save_many(applications)

    @staticmethod
    def __is_missing_or_empty(directory_path: str) -> bool:
        """Return True when ``directory_path`` is not a directory or has no entries."""
        return not os.path.isdir(directory_path) or not os.listdir(directory_path)

    @staticmethod
    def __find_unused_display():
        """Return an X display string (e.g. ``':170'``) above every detected display in 170-198.

        Sockets in ``/tmp/.X11-unix`` are inspected; when none fall within
        170-198 (or the directory does not exist) ``':170'`` is returned.
        """
        try:
            file_names = os.listdir('/tmp/.X11-unix')
        except FileNotFoundError:
            # No socket directory means no X display is currently in use.
            file_names = []

        active_displays = []

        for file_name in file_names:

            match_object = re.search(r'X(\d+)', file_name)

            if match_object:

                detected_display = int(match_object.group(1))

                if 170 <= detected_display <= 198:
                    active_displays.append(detected_display)

        if not active_displays:
            return ':170'

        return f':{max(active_displays) + 1}'

    @staticmethod
    def __select_random_subset(items: list, preferred_items: list, limit: int):
        """Return a random sample of non-preferred items followed by the available preferred ones.

        ``limit`` only caps the size of the random sample (between 0 and
        ``min(limit, number of non-preferred items)``); every preferred item that
        is present in ``items`` is always appended, so the result may be longer
        than ``limit``.
        """
        available_preferred_items = [item for item in preferred_items if item in items]
        selectable_items = [item for item in items if item not in preferred_items]
        selected_items = random.sample(selectable_items, random.randint(0, min(limit, len(selectable_items))))

        return selected_items + available_preferred_items

    @staticmethod
    def __run_process(initialization_file_path, profile, display, session_state):
        """Start Xephyr on ``display``, run the initialization script in it and wait for it to exit.

        Returns without running the script if the display socket does not appear
        within 10 seconds. The PIDs of Xephyr and the script process are stored
        in the session state before waiting.

        :raises CommandNotFoundError: If ``bwrap``, ``ratpoison`` or ``Xephyr`` is
            not found on ``PATH``.
        """
        for command in ('bwrap', 'ratpoison', 'Xephyr'):
            if shutil.which(command) is None:
                raise CommandNotFoundError(command)

        virtual_display_process = subprocess.Popen(('Xephyr', '-ac', '-br', '-title', f'HydraVeil - {profile.name or "Unnamed Profile"}', '-screen', profile.resolution, '-no-host-grab', display), stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

        # Use a monotonic clock so system clock adjustments cannot shorten or
        # extend the timeout.
        start_time = time.monotonic()
        timeout = float(10)

        while not os.path.exists(f'/tmp/.X11-unix/X{display[1:]}'):

            if time.monotonic() - start_time < timeout:
                time.sleep(0.1)

            else:

                virtual_display_process.kill()
                # Reap the killed process so it does not linger as a zombie.
                virtual_display_process.wait()
                return

        environment = os.environ.copy()
        environment.update({'DISPLAY': display})

        process = subprocess.Popen(initialization_file_path, env=environment)

        session_state = SessionState(session_state.id, session_state.network_port_numbers, [virtual_display_process.pid, process.pid])
        SessionStateController.update_or_create(session_state)

        process.wait()