# sp-hydra-veil-core/core/controllers/ApplicationController.py
#
# 170 lines
# 6.6 KiB
# Python

from core.Constants import Constants
from core.Errors import CommandNotFoundError
from core.controllers.SessionStateController import SessionStateController
from core.models.session.Application import Application
from core.models.session.ApplicationVersion import ApplicationVersion
from core.models.session.SessionProfile import SessionProfile
from core.models.session.SessionState import SessionState
from core.observers.ProfileObserver import ProfileObserver
from core.services.WebServiceApiService import WebServiceApiService
from pathlib import Path
from typing import Optional
import os
import random
import re
import shutil
import stat
import subprocess
import sys
import time
class ApplicationController:
    """Looks up applications and launches application versions inside nested Xephyr displays."""

    # Directory that is scanned for `X<number>` entries to detect displays that are in use.
    _X11_SOCKET_DIRECTORY = '/tmp/.X11-unix'

    # Inclusive range of display numbers handed out to virtual displays.
    _FIRST_DISPLAY_NUMBER = 170
    _LAST_DISPLAY_NUMBER = 199

    # Seconds to wait for a freshly started virtual display to create its socket entry.
    _DISPLAY_STARTUP_TIMEOUT = 10.0

    @staticmethod
    def get(code: str):
        """Return the result of `Application.find` for the given application code."""
        return Application.find(code)

    @staticmethod
    def get_all():
        """Return the result of `Application.all`."""
        return Application.all()

    @staticmethod
    def launch(version: ApplicationVersion, profile: SessionProfile, port_number: Optional[int] = None, asynchronous: bool = False, profile_observer: Optional[ProfileObserver] = None):
        """
        Launch an application version for a session profile inside a new virtual display.

        The profile's persistent state and fonts are seeded if they are missing, an
        executable `.init` script is rendered from the runtime and application
        `.init.ptpl` templates into the session state path, and that script is then
        run against a freshly started Xephyr display. Once the script has finished,
        the profile is passed to `ProfileController.disable`.

        :param version: The application version to launch.
        :param profile: The session profile to launch the application for.
        :param port_number: Port number substituted into the application template; `-1` is
            substituted when it is `None` (or any other falsy value).
        :param asynchronous: When true, the application is run from a forked child process
            and this method returns in the parent without waiting for it.
        :param profile_observer: Forwarded to `ProfileController.disable`.
        :raises CommandNotFoundError: If `bwrap`, `ratpoison` or `Xephyr` is not installed.
        :raises RuntimeError: If every display in the managed range is already in use.
        """
        # Imported locally rather than at module level; presumably to avoid a circular import (confirm before moving).
        from core.controllers.ProfileController import ProfileController

        ApplicationController.__prepare_profile_data(version, profile)

        display = ApplicationController.__find_unused_display()
        time_zone = profile.determine_timezone()

        with open(f'/{Constants.HV_RUNTIME_DATA_HOME}/.init.ptpl', 'r') as runtime_template_file:
            runtime_initialization_file_template = runtime_template_file.read()

        runtime_initialization_file_contents = runtime_initialization_file_template.format(
            display=display,
            time_zone=time_zone,
            hv_runtime_data_home=Constants.HV_RUNTIME_DATA_HOME
        )

        with open(f'/{version.get_installation_path()}/.init.ptpl', 'r') as application_template_file:
            application_initialization_file_template = application_template_file.read()

        application_initialization_file_contents = application_initialization_file_template.format(
            application_version_home=version.get_installation_path(),
            port_number=port_number or -1,
            home=Constants.HOME,
            profile_data_path=profile.get_data_path(),
            config_home=Constants.CONFIG_HOME,
            application_version_number=version.version_number
        )

        session_state = SessionStateController.get_or_new(profile.id)
        SessionStateController.update_or_create(session_state)

        initialization_file_contents = runtime_initialization_file_contents + application_initialization_file_contents
        initialization_file_path = f'{session_state.get_state_path()}/.init'

        # The file is created owner-only and executable, because it is executed directly later on.
        # The mode only takes effect when the file is created by this call.
        Path(initialization_file_path).touch(exist_ok=True, mode=0o600 | stat.S_IEXEC)

        with open(initialization_file_path, 'w') as initialization_file:
            initialization_file.write(initialization_file_contents)

        if asynchronous:

            fork_process_id = os.fork()

            if not fork_process_id:

                # Child process: run the application, clean up and terminate instead of returning to the caller.
                ApplicationController.__run_process(initialization_file_path, profile, display, session_state)
                ProfileController.disable(profile, False, profile_observer=profile_observer)

                # NOTE(review): The reason for this delay is not evident from this file.
                time.sleep(1.0)
                sys.exit()

        else:

            ApplicationController.__run_process(initialization_file_path, profile, display, session_state)
            ProfileController.disable(profile, False, profile_observer=profile_observer)

    @staticmethod
    def _sync(proxies: Optional[dict] = None):
        """
        Replace all stored applications with those returned by the web service.

        The applications are fetched before the existing ones are truncated, so a failed
        request leaves the stored applications untouched.

        :param proxies: Forwarded to `WebServiceApiService.get_applications`.
        """
        applications = WebServiceApiService.get_applications(proxies)

        Application.truncate()
        Application.save_many(applications)

    @staticmethod
    def __prepare_profile_data(version, profile):
        """Seed the profile's persistent state and font directories if they are missing or empty."""
        persistent_state_path = f'{profile.get_data_path()}/persistent-state'
        font_path = f'{profile.get_data_path()}/fonts'
        system_font_path = '/usr/share/fonts/truetype'

        if os.path.isdir(persistent_state_path):

            persistent_state_is_missing = len(os.listdir(persistent_state_path)) == 0

            if persistent_state_is_missing:
                # An existing destination makes `shutil.copytree` raise, so the empty directory is removed first.
                os.rmdir(persistent_state_path)

        else:
            persistent_state_is_missing = True

        if persistent_state_is_missing:
            shutil.copytree(f'{version.get_installation_path()}/resources/initial-state', persistent_state_path)

        fonts_are_missing = not os.path.isdir(font_path) or len(os.listdir(font_path)) == 0

        if fonts_are_missing and os.path.isdir(system_font_path):

            font_families = [file.name for file in Path(system_font_path).iterdir() if file.is_dir()]
            preferred_font_families = ['dejavu', 'droid', 'liberation', 'libreoffice', 'noto', 'ubuntu']
            font_family_subset = ApplicationController.__select_random_subset(font_families, preferred_font_families, 12)

            for font_family in font_family_subset:
                shutil.copytree(f'{system_font_path}/{font_family}', f'{font_path}/{font_family}')

    @staticmethod
    def __find_unused_display(socket_directory: str = _X11_SOCKET_DIRECTORY) -> str:
        """
        Determine a display (e.g. `:170`) within the managed range that is not in use.

        The display following the highest one in use is preferred. Once the end of the
        range has been reached, the lowest unused display in the range is returned instead.

        :param socket_directory: Directory that is scanned for `X<number>` entries.
        :raises RuntimeError: If every display in the managed range is in use.
        """
        first_display = ApplicationController._FIRST_DISPLAY_NUMBER
        last_display = ApplicationController._LAST_DISPLAY_NUMBER

        try:
            file_names = os.listdir(socket_directory)
        except FileNotFoundError:
            # Without the directory there are no entries, hence no displays in use.
            file_names = []

        active_displays = set()

        for file_name in file_names:

            match_object = re.search(r'X(\d+)', file_name)

            if match_object:

                detected_display = int(match_object.group(1))

                if first_display <= detected_display <= last_display:
                    active_displays.add(detected_display)

        if not active_displays:
            return f':{first_display}'

        next_display = max(active_displays) + 1

        if next_display <= last_display:
            return f':{next_display}'

        # The end of the range is occupied, so displays that were released in the meantime are reused.
        for candidate_display in range(first_display, last_display + 1):
            if candidate_display not in active_displays:
                return f':{candidate_display}'

        raise RuntimeError(f'No unused display is available in the range :{first_display}-:{last_display}.')

    @staticmethod
    def __select_random_subset(items: list, preferred_items: list, limit: int):
        """
        Select a random number of non-preferred items, followed by every available preferred item.

        :param items: The items to select from.
        :param preferred_items: Items that are always included when they occur in `items`.
        :param limit: Maximum number of non-preferred items; preferred items do not count towards it.
        :return: The randomly selected items, followed by the available preferred items.
        """
        available_preferred_items = [item for item in preferred_items if item in items]
        selectable_items = [item for item in items if item not in preferred_items]

        selection_size = random.randint(0, min(limit, len(selectable_items)))
        selected_items = random.sample(selectable_items, selection_size)

        return selected_items + available_preferred_items

    @staticmethod
    def __run_process(initialization_file_path, profile, display, session_state):
        """
        Start a Xephyr display, run the initialization script against it and wait for it to finish.

        Returns without running the script if the display's socket entry does not appear
        within the startup timeout, or if Xephyr exits before creating it.

        :param initialization_file_path: Path of the executable initialization script.
        :param profile: The profile; its name and resolution are passed to Xephyr.
        :param display: The display to start, e.g. `:170`.
        :param session_state: The session state whose process IDs are replaced.
        :raises CommandNotFoundError: If `bwrap`, `ratpoison` or `Xephyr` is not installed.
        """
        # Only Xephyr is invoked here; the others are presumably used by the initialization script (verify).
        for command in ('bwrap', 'ratpoison', 'Xephyr'):
            if shutil.which(command) is None:
                raise CommandNotFoundError(command)

        virtual_display_process = subprocess.Popen(
            ('Xephyr', '-ac', '-br', '-title', f'HydraVeil - {profile.name or "Unnamed Profile"}', '-screen', profile.resolution, '-no-host-grab', display),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT
        )

        display_socket_path = f'{ApplicationController._X11_SOCKET_DIRECTORY}/X{display[1:]}'

        # A monotonic clock keeps the timeout unaffected by system clock adjustments.
        deadline = time.monotonic() + ApplicationController._DISPLAY_STARTUP_TIMEOUT

        while not os.path.exists(display_socket_path):

            if virtual_display_process.poll() is not None:
                # Xephyr has already exited, so the socket entry will never appear.
                return

            if time.monotonic() >= deadline:

                virtual_display_process.kill()
                # Reap the killed process so that it does not linger as a zombie.
                virtual_display_process.wait()

                return

            time.sleep(0.1)

        environment = os.environ.copy()
        environment.update({'DISPLAY': display})

        try:
            process = subprocess.Popen(initialization_file_path, env=environment)

        except OSError:

            # The display is not recorded in the session state yet, so it has to be stopped here.
            virtual_display_process.kill()
            virtual_display_process.wait()

            raise

        session_state = SessionState(session_state.id, session_state.network_port_numbers, [virtual_display_process.pid, process.pid])
        SessionStateController.update_or_create(session_state)

        # NOTE(review): Xephyr is not stopped here; presumably the recorded process IDs are cleaned up when the profile is disabled (verify).
        process.wait()