# sp-hydra-veil-core/core/controllers/ApplicationVersionController.py
#
# 124 lines
# 4.9 KiB
# Python

from core.Errors import FileIntegrityError, UnsupportedApplicationVersionError, ApplicationAlreadyInstalledError
from core.controllers.ApplicationController import ApplicationController
from core.models.session.Application import Application
from core.models.session.ApplicationVersion import ApplicationVersion
from core.observers.ApplicationVersionObserver import ApplicationVersionObserver
from core.observers.ConnectionObserver import ConnectionObserver
from core.services.WebServiceApiService import WebServiceApiService
from io import BytesIO
from typing import Optional
import hashlib
import requests
import shutil
import tarfile
class ApplicationVersionController:
    """Static entry points for looking up, installing, removing and syncing application versions."""

    @staticmethod
    def get(application_code: str, version_number: str):
        """Return the result of ``ApplicationVersion.find`` for the given application code and version number."""
        return ApplicationVersion.find(application_code, version_number)

    @staticmethod
    def get_all(application: Optional[Application] = None):
        """Return the result of ``ApplicationVersion.all``; ``application`` is passed through unchanged."""
        return ApplicationVersion.all(application)

    @staticmethod
    def install(
        application_version: ApplicationVersion,
        reinstall: bool = False,
        application_version_observer: Optional[ApplicationVersionObserver] = None,
        connection_observer: Optional[ConnectionObserver] = None,
    ):
        """Download, verify and unpack an application version.

        The actual work is delegated to ``ConnectionController.with_preferred_connection``,
        which receives the private ``__install`` routine as its task.

        :param application_version: The version to install.
        :param reinstall: When ``True``, the existing installation directory is removed first.
        :param application_version_observer: Optional observer notified about download events.
        :param connection_observer: Optional observer forwarded to the connection controller.
        :raises UnsupportedApplicationVersionError: If ``is_supported()`` returns a falsy value.
        :raises ApplicationAlreadyInstalledError: If the version is (still) installed.
        """
        if not application_version.is_supported():
            raise UnsupportedApplicationVersionError('The application version in question is not supported.')

        if reinstall is True:
            ApplicationVersionController.uninstall(application_version)

        if application_version.is_installed():
            raise ApplicationAlreadyInstalledError('The application in question is already installed.')

        # Imported at call time rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from core.controllers.ConnectionController import ConnectionController

        ConnectionController.with_preferred_connection(
            application_version,
            task=ApplicationVersionController.__install,
            application_version_observer=application_version_observer,
            connection_observer=connection_observer,
        )

    @staticmethod
    def uninstall(application_version: ApplicationVersion):
        """Remove the installation directory of ``application_version``, ignoring any removal errors."""
        shutil.rmtree(application_version.get_installation_path(), ignore_errors=True)

    @staticmethod
    def _sync(proxies: Optional[dict] = None):
        """Replace the stored application versions with the ones reported by the web service.

        Versions are fetched for every known application first; the stored set is
        only truncated and rewritten once all fetches have returned.

        :param proxies: Optional proxy mapping forwarded to ``WebServiceApiService``.
        """
        application_versions = []

        for application in ApplicationController.get_all():
            application_versions.extend(
                WebServiceApiService.get_application_versions(application.code, proxies)
            )

        ApplicationVersion.truncate()
        ApplicationVersion.save_many(application_versions)

    @staticmethod
    def __install(
        application_version: ApplicationVersion,
        application_version_observer: Optional[ApplicationVersionObserver] = None,
        proxies: Optional[dict] = None,
    ):
        """Download the archive, check its SHA3-512 hash and extract it.

        The whole archive is buffered in memory before it is verified and unpacked.
        On success a ``.sha3-512`` file containing the hash is written into the
        installation path.

        :param application_version: The version whose ``download_path`` is fetched.
        :param application_version_observer: Optional observer; when given it receives
            ``downloading``, ``download_progressing`` and ``downloaded`` events.
        :param proxies: Optional proxy mapping passed to ``requests``.
        :raises ConnectionError: If the response status code is not 200.
        :raises FileIntegrityError: If the downloaded hash differs from ``file_hash``.
        """
        observer = application_version_observer

        if observer is not None:
            observer.notify('downloading', application_version)

        # ``proxies=None`` is the requests default, so a single call covers both cases.
        # The context manager releases the connection on every exit path.
        with requests.get(application_version.download_path, stream=True, proxies=proxies) as response:

            if response.status_code != 200:
                raise ConnectionError('The application version could not be downloaded.')

            response_size = int(response.headers.get('Content-Length', 0))
            response_buffer = BytesIO()
            block_size = 1024
            bytes_written = 0

            for data in response.iter_content(block_size):

                bytes_written += len(data)
                response_buffer.write(data)

                # The observer is optional; only report progress when one was supplied.
                if observer is not None:
                    # Without a Content-Length header the total is unknown, so progress stays at 0.
                    progress = (bytes_written / response_size) * 100 if response_size > 0 else 0
                    observer.notify('download_progressing', application_version, dict(
                        progress=progress
                    ))

        if observer is not None:
            observer.notify('downloaded', application_version)

        response_buffer.seek(0)
        file_hash = ApplicationVersionController.__calculate_file_hash(response_buffer)

        if file_hash != application_version.file_hash:
            raise FileIntegrityError('Application version file integrity could not be verified.')

        # NOTE(review): extractall() runs without an extraction filter. The archive has
        # passed the hash check above, but consider ``filter='data'`` where supported.
        with tarfile.open(fileobj=response_buffer, mode='r:gz') as tar_file:
            tar_file.extractall(application_version.get_installation_path())

        with open(f'{application_version.get_installation_path()}/.sha3-512', 'w') as hash_file:
            hash_file.write(f'{file_hash}\n')

    @staticmethod
    def __calculate_file_hash(file):
        """Return the SHA3-512 hex digest of ``file``, read from its current position.

        The file is consumed in 64 KiB chunks and rewound to offset 0 afterwards.

        :param file: A binary file-like object supporting ``read`` and ``seek``.
        """
        hasher = hashlib.sha3_512()

        for chunk in iter(lambda: file.read(65536), b''):
            hasher.update(chunk)

        file.seek(0)

        return hasher.hexdigest()