142 lines
5.3 KiB
Python
142 lines
5.3 KiB
Python
from core.Constants import Constants
|
|
from core.Errors import CommandNotFoundError
|
|
from core.controllers.SessionStateController import SessionStateController
|
|
from core.models.session.Application import Application
|
|
from core.models.session.ApplicationVersion import ApplicationVersion
|
|
from core.models.session.SessionProfile import SessionProfile
|
|
from core.models.session.SessionState import SessionState
|
|
from core.observers.ProfileObserver import ProfileObserver
|
|
from core.services.WebServiceApiService import WebServiceApiService
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
import os
|
|
import re
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
|
|
class ApplicationController:
    """Static helpers for looking up, synchronizing and launching applications.

    Launching renders an executable initialization script for the profile's
    session and runs it against a dedicated Xephyr display.
    """

    # Directory scanned for ``X<number>`` entries to detect displays in use.
    _X11_SOCKET_DIRECTORY = '/tmp/.X11-unix'

    @staticmethod
    def get(code: str):
        """Return the result of ``Application.find`` for ``code``."""
        return Application.find(code)

    @staticmethod
    def get_all():
        """Return the result of ``Application.all``."""
        return Application.all()

    @staticmethod
    def launch(version: 'ApplicationVersion', profile: 'SessionProfile', port_number: Optional[int] = None, asynchronous: bool = False, profile_observer: Optional['ProfileObserver'] = None):
        """Prepare the session for ``profile`` and run ``version`` in a nested display.

        Steps, in order:

        1. Seed ``<profile data path>/persistent-state`` from the version's
           ``resources/initial-state`` directory when it is missing or empty.
        2. Render the base and the application ``.init.ptpl`` templates and
           write their concatenation to ``<session state path>/.init``.
        3. Run that script through ``__run_process`` and afterwards call
           ``ProfileController.disable`` for the profile.

        Args:
            version: Application version to launch; supplies the installation
                path and the version number.
            profile: Profile whose data path, time zone, name and resolution
                are used.
            port_number: Value for the ``port_number`` template field; ``-1``
                is substituted when it is ``None`` (or ``0``).
            asynchronous: When true, the work in step 3 happens in a forked
                child process and this call returns right after forking.
            profile_observer: Passed through to ``ProfileController.disable``.

        Raises:
            CommandNotFoundError: If the ``Xephyr`` executable is not on PATH.
        """
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from core.controllers.ProfileController import ProfileController

        persistent_state_path = f'{profile.get_data_path()}/persistent-state'

        if not os.path.isdir(persistent_state_path) or not os.listdir(persistent_state_path):
            # dirs_exist_ok (Python 3.8+) is required for the "exists but is
            # empty" case; without it copytree raises FileExistsError.
            shutil.copytree(f'{version.get_installation_path()}/resources/initial-state', persistent_state_path, dirs_exist_ok=True)

        display = ApplicationController.__find_unused_display()
        time_zone = profile.determine_timezone()

        with open(f'/{Constants.HV_RUNTIME_DATA_HOME}/.init.ptpl', 'r') as base_template_file:
            base_initialization_file_template = base_template_file.read()

        base_initialization_file_contents = base_initialization_file_template.format(
            display=display,
            time_zone=time_zone,
            hv_data_home=Constants.HV_DATA_HOME,
        )

        with open(f'/{version.get_installation_path()}/.init.ptpl', 'r') as application_template_file:
            application_initialization_file_template = application_template_file.read()

        application_initialization_file_contents = application_initialization_file_template.format(
            application_version_home=version.get_installation_path(),
            port_number=port_number or -1,
            home=Constants.HOME,
            profile_data_path=profile.get_data_path(),
            config_home=Constants.CONFIG_HOME,
            application_version_number=version.version_number,
        )

        session_state = SessionStateController.get_or_new(profile.id)
        SessionStateController.update_or_create(session_state)

        initialization_file_contents = base_initialization_file_contents + application_initialization_file_contents
        initialization_file_path = f'{session_state.get_state_path()}/.init'

        # Create the script owner-only and executable before any content is
        # written; the mode only takes effect when the file is newly created.
        Path(initialization_file_path).touch(exist_ok=True, mode=0o600 | stat.S_IEXEC)

        with open(initialization_file_path, 'w') as initialization_file:
            initialization_file.write(initialization_file_contents)

        if asynchronous and os.fork() != 0:
            # Parent process: the forked child carries on with the session.
            return

        ApplicationController.__run_process(initialization_file_path, profile, display, session_state)
        ProfileController.disable(profile, False, profile_observer=profile_observer)

        if asynchronous:
            # Forked child: terminate here instead of returning into the
            # caller's code, after the same one-second pause as before.
            time.sleep(1.0)
            sys.exit()

    @staticmethod
    def _sync(proxies: Optional[dict] = None):
        """Replace all stored applications with those returned by the web service.

        Args:
            proxies: Passed through to ``WebServiceApiService.get_applications``.
        """
        # Fetch first: if the request raises, the stored records are not truncated.
        applications = WebServiceApiService.get_applications(proxies)

        Application.truncate()
        Application.save_many(applications)

    @staticmethod
    def __find_unused_display(socket_directory: str = _X11_SOCKET_DIRECTORY) -> str:
        """Return a display string (e.g. ``':170'``) for a new nested display.

        Entries of ``socket_directory`` matching ``X<number>`` with a number in
        170..198 count as displays in use. The result is one above the highest
        such number, or ``':170'`` when there is none.

        Args:
            socket_directory: Directory to scan; a missing directory is
                treated as containing no displays.
        """
        try:
            file_names = os.listdir(socket_directory)
        except FileNotFoundError:
            # No socket directory at all means no display is in use yet.
            file_names = []

        matches = (re.search(r'X(\d+)', file_name) for file_name in file_names)
        display_numbers = (int(match_object.group(1)) for match_object in matches if match_object)
        active_displays = [number for number in display_numbers if 170 <= number <= 198]

        if not active_displays:
            return ':170'

        # NOTE(review): with 198 in use this yields ':199', which lies outside
        # the scanned range and would be handed out again -- confirm intent.
        return f':{max(active_displays) + 1}'

    @staticmethod
    def __run_process(initialization_file_path, profile, display, session_state):
        """Start Xephyr on ``display``, run the initialization script in it and wait.

        Once the display's entry appears in the socket directory, the script
        is started with ``DISPLAY`` set accordingly, both process ids are
        stored in the session state, and the call blocks until the script
        exits. If the entry does not appear within 10 seconds, Xephyr is
        killed and the method returns without running the script.

        Args:
            initialization_file_path: Path of the executable script to run.
            profile: Supplies the window title (``name``) and ``resolution``.
            display: Display string with a leading colon, e.g. ``':170'``.
            session_state: State whose id and network port numbers are carried
                over into the stored record.

        Raises:
            CommandNotFoundError: If the ``Xephyr`` executable is not on PATH.
        """
        if shutil.which('Xephyr') is None:
            raise CommandNotFoundError('Xephyr')

        virtual_display_process = subprocess.Popen(
            ('Xephyr', '-ac', '-br', '-title', f'Hydra Veil - {profile.name or "Unnamed Profile"}', '-screen', profile.resolution, '-no-host-grab', display),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )

        display_socket_path = f'{ApplicationController._X11_SOCKET_DIRECTORY}/X{display[1:]}'
        timeout = float(10)
        # Monotonic clock so a wall-clock adjustment cannot distort the timeout.
        start_time = time.monotonic()

        while not os.path.exists(display_socket_path):

            if time.monotonic() - start_time < timeout:
                time.sleep(0.1)
                continue

            virtual_display_process.kill()
            # Reap the killed process so it does not linger as a zombie.
            virtual_display_process.wait()
            return

        environment = os.environ.copy()
        environment['DISPLAY'] = display

        process = subprocess.Popen(initialization_file_path, env=environment)

        # Store both process ids in the session state before blocking on the script.
        session_state = SessionState(session_state.id, session_state.network_port_numbers, [virtual_display_process.pid, process.pid])
        SessionStateController.update_or_create(session_state)

        process.wait()
|