230 lines
6.8 KiB
Python
230 lines
6.8 KiB
Python
from abc import ABC, abstractmethod
|
|
from core.Constants import Constants
|
|
from core.models.Location import Location
|
|
from core.models.Subscription import Subscription
|
|
from core.models.session.ApplicationVersion import ApplicationVersion
|
|
from dataclasses import dataclass, field, asdict
|
|
from dataclasses_json import config, Exclude, dataclass_json
|
|
from json import JSONDecodeError
|
|
from pathlib import Path
|
|
from typing import Optional, Self
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import tempfile
|
|
|
|
|
|
@dataclass_json
|
|
@dataclass
|
|
class BaseProfile(ABC):
|
|
id: int = field(
|
|
metadata=config(exclude=Exclude.ALWAYS)
|
|
)
|
|
name: str
|
|
subscription: Optional[Subscription]
|
|
location: Optional[Location]
|
|
|
|
@abstractmethod
|
|
def get_wireguard_configuration_path(self):
|
|
pass
|
|
|
|
@abstractmethod
|
|
def has_wireguard_configuration(self):
|
|
pass
|
|
|
|
def get_config_path(self):
|
|
return BaseProfile.__get_config_path(self.id)
|
|
|
|
def get_data_path(self):
|
|
return BaseProfile.__get_data_path(self.id)
|
|
|
|
def has_subscription(self):
|
|
return self.subscription is not None
|
|
|
|
def has_location(self):
|
|
return self.location is not None
|
|
|
|
def is_session_profile(self):
|
|
return type(self).__name__ == 'SessionProfile'
|
|
|
|
def is_system_profile(self):
|
|
return type(self).__name__ == 'SystemProfile'
|
|
|
|
def save(self: Self):
|
|
|
|
config_file_contents = f'{self.to_json(indent=4)}\n'
|
|
|
|
os.makedirs(self.get_config_path(), exist_ok=True)
|
|
os.makedirs(self.get_data_path(), exist_ok=True)
|
|
|
|
config_file_path = f'{self.get_config_path()}/config.json'
|
|
|
|
with open(config_file_path, 'w') as config_file:
|
|
|
|
config_file.write(config_file_contents)
|
|
config_file.close()
|
|
|
|
def delete_data(self):
|
|
shutil.rmtree(self.get_data_path(), ignore_errors=True)
|
|
|
|
def delete(self):
|
|
shutil.rmtree(self.get_config_path(), ignore_errors=True)
|
|
self.delete_data()
|
|
|
|
def get_wireguard_configuration_metadata(self, key):
|
|
|
|
configuration = self.get_wireguard_configuration()
|
|
|
|
if configuration is not None:
|
|
|
|
for line in configuration.splitlines():
|
|
match = re.match(r'^# {} = (.*)$'.format(re.escape(key)), line)
|
|
|
|
if match:
|
|
return re.sub(r'[^a-zA-Z0-9+=\-_ /]', '', match.group(1).strip())
|
|
|
|
return None
|
|
|
|
def get_wireguard_public_keys(self):
|
|
|
|
import configparser
|
|
|
|
wireguard_public_keys = set()
|
|
|
|
configuration = self.get_wireguard_configuration()
|
|
parsed_configuration = configparser.ConfigParser()
|
|
|
|
if configuration is not None:
|
|
|
|
parsed_configuration.read_string(configuration)
|
|
|
|
for section in parsed_configuration.sections():
|
|
|
|
if parsed_configuration.has_option(section, 'PublicKey'):
|
|
wireguard_public_keys.add(parsed_configuration.get(section, 'PublicKey'))
|
|
|
|
return tuple(wireguard_public_keys)
|
|
|
|
def get_wireguard_configuration(self):
|
|
|
|
try:
|
|
with open(self.get_wireguard_configuration_path(), 'r') as file:
|
|
return file.read()
|
|
|
|
except (FileNotFoundError, PermissionError):
|
|
return None
|
|
|
|
def address_security_incident(self):
|
|
|
|
if self.has_wireguard_configuration():
|
|
|
|
wireguard_configuration_path = Path(self.get_wireguard_configuration_path())
|
|
|
|
incident_data_path = Path(Constants.HV_INCIDENT_DATA_HOME)
|
|
incident_data_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
incident_path = Path(tempfile.mkdtemp(dir=incident_data_path, prefix=''))
|
|
incident_wireguard_configuration_path = f'{incident_path}/{wireguard_configuration_path.name}'
|
|
|
|
try:
|
|
|
|
shutil.copy2(wireguard_configuration_path, incident_wireguard_configuration_path)
|
|
os.chmod(incident_wireguard_configuration_path, 0o644)
|
|
|
|
except (FileNotFoundError, PermissionError):
|
|
|
|
if incident_path.is_dir():
|
|
|
|
incident_path_contents = incident_path.iterdir()
|
|
|
|
if not any(incident_path_contents):
|
|
incident_path.rmdir()
|
|
|
|
def _get_dirty_keys(self: Self):
|
|
|
|
reference = BaseProfile.find_by_id(self.id)
|
|
|
|
if type(reference) != type(self):
|
|
return list(self.__dataclass_fields__.keys())
|
|
|
|
return list([key for key, value in asdict(self).items() if value != asdict(reference).get(key)])
|
|
|
|
@staticmethod
|
|
def find_by_id(id: int):
|
|
|
|
try:
|
|
config_file_contents = open(f'{BaseProfile.__get_config_path(id)}/config.json', 'r').read()
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
try:
|
|
profile = json.loads(config_file_contents)
|
|
except JSONDecodeError:
|
|
return None
|
|
|
|
profile['id'] = id
|
|
|
|
if profile['location'] is not None:
|
|
|
|
location = Location.find(profile['location']['country_code'] or None, profile['location']['code'] or None)
|
|
|
|
if location is not None:
|
|
|
|
if profile['location'].get('time_zone') is not None:
|
|
location.time_zone = profile['location']['time_zone']
|
|
|
|
profile['location'] = location
|
|
|
|
if 'application_version' in profile:
|
|
|
|
if profile['application_version'] is not None:
|
|
application_version = ApplicationVersion.find(profile['application_version']['application_code'] or None, profile['application_version']['version_number'] or None)
|
|
|
|
if application_version is not None:
|
|
profile['application_version'] = application_version
|
|
|
|
from core.models.session.SessionProfile import SessionProfile
|
|
# noinspection PyUnresolvedReferences
|
|
profile = SessionProfile.from_dict(profile)
|
|
|
|
else:
|
|
|
|
from core.models.system.SystemProfile import SystemProfile
|
|
# noinspection PyUnresolvedReferences
|
|
profile = SystemProfile.from_dict(profile)
|
|
|
|
return profile
|
|
|
|
@staticmethod
|
|
def exists(id: int):
|
|
return re.match(r'^\d{1,2}$', str(id)) and os.path.isfile(f'{BaseProfile.__get_config_path(id)}/config.json')
|
|
|
|
@staticmethod
|
|
def all():
|
|
|
|
profiles = {}
|
|
|
|
for directory_entry in os.listdir(Constants.HV_PROFILE_CONFIG_HOME):
|
|
|
|
try:
|
|
id = int(directory_entry)
|
|
except ValueError:
|
|
continue
|
|
|
|
if BaseProfile.exists(id):
|
|
|
|
profile = BaseProfile.find_by_id(id)
|
|
|
|
if profile is not None:
|
|
profiles[id] = profile
|
|
|
|
return dict(sorted(profiles.items()))
|
|
|
|
@staticmethod
|
|
def __get_config_path(id: int):
|
|
return f'{Constants.HV_PROFILE_CONFIG_HOME}/{str(id)}'
|
|
|
|
@staticmethod
|
|
def __get_data_path(id: int):
|
|
return f'{Constants.HV_PROFILE_DATA_HOME}/{str(id)}'
|