# sp-hydra-veil-core/core/controllers/ApplicationVersionController.py
#
# 85 lines
# 3.5 KiB
# Python

from core.Errors import FileIntegrityError, UnsupportedApplicationVersionError
from core.controllers.ApplicationController import ApplicationController
from core.models.session.ApplicationVersion import ApplicationVersion
from core.observers.ApplicationVersionObserver import ApplicationVersionObserver
from core.observers.ConnectionObserver import ConnectionObserver
from core.services.WebServiceApiService import WebServiceApiService
from io import BytesIO
from typing import Optional
import hashlib
import requests
import shutil
import tarfile
class ApplicationVersionController:
    """Static operations for looking up, installing, removing and syncing application versions."""

    @staticmethod
    def get(application_code: str, version_number: str):
        """Return the result of ``ApplicationVersion.find`` for the given code and version number."""
        return ApplicationVersion.find(application_code, version_number)

    @staticmethod
    def install(
        application_version: ApplicationVersion,
        application_version_observer: Optional[ApplicationVersionObserver] = None,
        connection_observer: Optional[ConnectionObserver] = None,
    ):
        """Install an application version through ``ConnectionController.with_preferred_connection``.

        The actual download/extract work is done by the private ``__install``
        task, which is handed to the connection controller.

        Raises:
            UnsupportedApplicationVersionError: If ``application_version.is_supported()``
                is falsy.
        """
        if not application_version.is_supported():
            raise UnsupportedApplicationVersionError('The application version in question is not supported.')

        # Imported at call time rather than at module level.
        # NOTE(review): presumably this avoids a circular import — confirm.
        from core.controllers.ConnectionController import ConnectionController

        ConnectionController.with_preferred_connection(
            application_version,
            task=ApplicationVersionController.__install,
            application_version_observer=application_version_observer,
            connection_observer=connection_observer,
        )

    @staticmethod
    def uninstall(application_version: ApplicationVersion):
        """Delete the installation directory of the given application version.

        Errors during removal (including a missing directory) are ignored.
        """
        shutil.rmtree(application_version.installation_path, ignore_errors=True)

    @staticmethod
    def _sync(proxies: Optional[dict] = None):
        """Replace the stored application versions with those reported by the web service.

        Versions are fetched for every known application first; the stored
        set is truncated and rewritten only after all fetches have returned.

        Args:
            proxies: Passed through to ``WebServiceApiService.get_application_versions``.
        """
        application_versions = []

        for application in ApplicationController.get_all():
            application_versions.extend(
                WebServiceApiService.get_application_versions(application.code, proxies)
            )

        ApplicationVersion.truncate()
        ApplicationVersion.save_many(application_versions)

    @staticmethod
    def __install(
        application_version: ApplicationVersion,
        application_version_observer: Optional[ApplicationVersionObserver] = None,
        proxies: Optional[dict] = None,
    ):
        """Download, verify and extract the archive of an application version.

        Args:
            application_version: Supplies ``download_path``, ``file_hash`` and
                ``installation_path``.
            application_version_observer: If given, notified with the
                ``'downloading'`` event before the request is made.
            proxies: Optional proxy mapping forwarded to ``requests.get``.

        Raises:
            ConnectionError: If the response status code is not 200.
            FileIntegrityError: If the SHA3-512 digest of the download does not
                equal ``application_version.file_hash``.
        """
        if application_version_observer is not None:
            application_version_observer.notify('downloading', application_version)

        # ``proxies=None`` is the default of ``requests.get``, so a single call
        # covers both the proxied and the direct case. The context manager
        # releases the streamed connection even when the status check fails.
        with requests.get(application_version.download_path, stream=True, proxies=proxies) as response:

            if response.status_code != 200:
                raise ConnectionError('The application version could not be downloaded.')

            archive = response.content

        file_hash = ApplicationVersionController.__calculate_file_hash(BytesIO(archive))

        if file_hash != application_version.file_hash:
            raise FileIntegrityError('Application version file integrity could not be verified.')

        # NOTE(review): members are extracted without an explicit tarfile
        # extraction filter; the archive is only trusted because its digest was
        # verified above. Consider ``filter='data'`` where the runtime supports it.
        with tarfile.open(fileobj=BytesIO(archive), mode='r:gz') as file:
            file.extractall(application_version.installation_path)

    @staticmethod
    def __calculate_file_hash(file):
        """Return the hex SHA3-512 digest of a binary file-like object, read in 64 KiB chunks."""
        hasher = hashlib.sha3_512()

        while True:
            buffer = file.read(65536)

            if len(buffer) == 0:
                break

            hasher.update(buffer)

        return hasher.hexdigest()