# Source: sp-hydra-veil-core/core/controllers/ApplicationController.py (137 lines, 5.1 KiB, Python)

from core.Constants import Constants
from core.controllers.SessionStateController import SessionStateController
from core.models.session.Application import Application
from core.models.session.ApplicationVersion import ApplicationVersion
from core.models.session.SessionProfile import SessionProfile
from core.models.session.SessionState import SessionState
from core.observers.ProfileObserver import ProfileObserver
from core.services.WebServiceApiService import WebServiceApiService
from pathlib import Path
from typing import Optional
import os
import re
import shutil
import stat
import subprocess
import sys
import time
class ApplicationController:
    """Static helpers for looking up, synchronising and launching applications."""

    @staticmethod
    def get(code: str):
        """Return the result of ``Application.find`` for the given application code."""
        return Application.find(code)

    @staticmethod
    def get_all():
        """Return the result of ``Application.all()``."""
        return Application.all()

    @staticmethod
    def launch(version: ApplicationVersion, profile: SessionProfile, port_number: Optional[int] = None, asynchronous: bool = False, profile_observer: Optional[ProfileObserver] = None):
        """Launch an application version for a profile inside its own Xephyr display.

        Steps performed:

        1. Seed the profile's ``persistent-state`` directory from the
           version's ``resources/initial-state`` when it is missing or empty.
        2. Render the base and application ``.init.ptpl`` templates and write
           their concatenation to ``.init`` in the session state path.
        3. Run that file on an unused display, then call
           ``ProfileController.disable`` for the profile.

        :param version: Application version whose installation is launched.
        :param profile: Session profile providing data path, location,
            name and resolution.
        :param port_number: Value substituted for ``port_number`` in the
            application template; ``-1`` is used when it is falsy.
        :param asynchronous: When true, the work in step 3 happens in a forked
            child process and this call returns in the parent right after
            the fork.
        :param profile_observer: Forwarded to ``ProfileController.disable``.
        """
        # Imported here rather than at module level (presumably to avoid a
        # circular import between the two controllers - TODO confirm).
        from core.controllers.ProfileController import ProfileController

        persistent_state_path = f'{profile.get_data_path()}/persistent-state'

        if not os.path.isdir(persistent_state_path) or len(os.listdir(persistent_state_path)) == 0:
            shutil.copytree(f'{version.get_installation_path()}/resources/initial-state', persistent_state_path)

        display = ApplicationController.__find_unused_display()

        # Path.read_text() opens and closes the template files; the original
        # bare open(...).read() calls left the handles for the GC to close.
        base_initialization_file_template = Path(f'/{Constants.HV_RUNTIME_DATA_HOME}/.init.ptpl').read_text()
        base_initialization_file_contents = base_initialization_file_template.format(
            display=display,
            time_zone=profile.location.time_zone,
            hv_data_home=Constants.HV_DATA_HOME,
        )

        application_initialization_file_template = Path(f'/{version.get_installation_path()}/.init.ptpl').read_text()
        application_initialization_file_contents = application_initialization_file_template.format(
            application_version_home=version.get_installation_path(),
            port_number=port_number or -1,
            home=Constants.HOME,
            profile_data_path=profile.get_data_path(),
            config_home=Constants.CONFIG_HOME,
            application_version_number=version.version_number,
        )

        # The session state is stored before its state path is written to.
        session_state = SessionStateController.get_or_new(profile.id)
        SessionStateController.update_or_create(session_state)

        initialization_file_contents = base_initialization_file_contents + application_initialization_file_contents
        initialization_file_path = f'{session_state.get_state_path()}/.init'

        # Create the file as owner-only and executable before writing to it;
        # the mode only applies when the file does not exist yet.
        Path(initialization_file_path).touch(exist_ok=True, mode=0o600 | stat.S_IEXEC)

        with open(initialization_file_path, 'w') as initialization_file:
            initialization_file.write(initialization_file_contents)

        if asynchronous:

            fork_process_id = os.fork()

            if not fork_process_id:
                # Child process: run the session, clean up, then exit so the
                # child never returns into the caller's code.
                # NOTE(review): the Python docs recommend os._exit() in a
                # forked child; sys.exit() is kept to preserve behaviour.
                ApplicationController.__run_process(initialization_file_path, profile, display, session_state)
                ProfileController.disable(profile, False, profile_observer=profile_observer)
                time.sleep(1.0)
                sys.exit()

        else:
            ApplicationController.__run_process(initialization_file_path, profile, display, session_state)
            ProfileController.disable(profile, False, profile_observer=profile_observer)

    @staticmethod
    def _sync(proxies: Optional[dict] = None):
        """Fetch the applications from the web service and store them.

        Calls ``Application.truncate()`` before saving the fetched list with
        ``Application.save_many``.

        :param proxies: Forwarded to ``WebServiceApiService.get_applications``.
        """
        applications = WebServiceApiService.get_applications(proxies)
        Application.truncate()
        Application.save_many(applications)

    @staticmethod
    def __find_unused_display():
        """Return an X display string (e.g. ``':170'``) for a new session.

        Scans ``/tmp/.X11-unix`` for socket names containing ``X<number>``
        and considers numbers from 170 to 198. Returns the highest such
        number plus one, or ``':170'`` when none is found or the directory
        does not exist.
        """
        try:
            file_names = os.listdir('/tmp/.X11-unix')
        except FileNotFoundError:
            # No X socket directory yet, so no display in our range is taken.
            return ':170'

        active_displays = []

        for file_name in file_names:

            match_object = re.search(r'X(\d+)', file_name)

            if match_object:

                detected_display = int(match_object.group(1))

                if 170 <= detected_display <= 198:
                    active_displays.append(detected_display)

        if not active_displays:
            return ':170'

        # NOTE(review): display 199 is never counted as active, so once 198
        # and 199 are both in use this returns ':199' again - confirm whether
        # that many concurrent sessions can occur.
        unused_display = max(active_displays) + 1
        return f':{str(unused_display)}'

    @staticmethod
    def __run_process(initialization_file_path, profile, display, session_state):
        """Start Xephyr on ``display``, run the initialization file in it and wait.

        Waits up to 10 seconds for the display's socket to appear under
        ``/tmp/.X11-unix``. If it does not appear (or Xephyr exits first),
        Xephyr is killed and reaped and the method returns without starting
        the initialization file. Otherwise the PIDs of both processes are
        stored in a new ``SessionState`` and the method blocks until the
        initialization file's process exits.

        :param initialization_file_path: Path of the executable ``.init`` file.
        :param profile: Profile providing the window title name and resolution.
        :param display: Display string such as ``':170'``.
        :param session_state: Current session state; its id and network port
            numbers are carried over into the updated state.
        """
        virtual_display_process = subprocess.Popen(('/usr/bin/Xephyr', '-ac', '-br', '-title', f'Hydra Veil - {profile.name or "Unnamed Profile"}', '-screen', profile.resolution, '-no-host-grab', display), stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

        display_socket_path = f'/tmp/.X11-unix/X{display[1:]}'
        timeout = float(10)
        # Monotonic clock: the timeout is unaffected by system clock changes.
        start_time = time.monotonic()

        while not os.path.exists(display_socket_path):

            timed_out = time.monotonic() - start_time >= timeout

            # Give up on timeout, or right away when Xephyr has already
            # exited, since the socket will not appear in that case.
            if timed_out or virtual_display_process.poll() is not None:
                virtual_display_process.kill()
                # Reap the child so it does not linger as a zombie.
                virtual_display_process.wait()
                return

            time.sleep(0.1)

        environment = os.environ.copy()
        environment.update({'DISPLAY': display})

        process = subprocess.Popen(initialization_file_path, env=environment)

        # Record both PIDs in the session state before blocking on the process.
        session_state = SessionState(session_state.id, session_state.network_port_numbers, [virtual_display_process.pid, process.pid])
        SessionStateController.update_or_create(session_state)

        process.wait()