sp-hydra-veil-core/core/models/BaseProfile.py

198 lines
5.7 KiB
Python

from abc import ABC, abstractmethod
from core.Constants import Constants
from core.models.Location import Location
from core.models.Subscription import Subscription
from core.models.session.ApplicationVersion import ApplicationVersion
from dataclasses import dataclass, field, asdict
from dataclasses_json import config, Exclude, dataclass_json
from json import JSONDecodeError
from typing import Optional, Self
import json
import os
import re
import shutil
@dataclass_json
@dataclass
class BaseProfile(ABC):
id: int = field(
metadata=config(exclude=Exclude.ALWAYS)
)
name: str
subscription: Optional[Subscription]
location: Optional[Location]
@abstractmethod
def get_wireguard_configuration_path(self):
pass
def get_config_path(self):
return BaseProfile.__get_config_path(self.id)
def get_data_path(self):
return BaseProfile.__get_data_path(self.id)
def has_subscription(self):
return self.subscription is not None
def has_location(self):
return self.location is not None
def is_session_profile(self):
return type(self).__name__ == 'SessionProfile'
def is_system_profile(self):
return type(self).__name__ == 'SystemProfile'
def save(self: Self):
config_file_contents = f'{self.to_json(indent=4)}\n'
os.makedirs(self.get_config_path(), exist_ok=True)
os.makedirs(self.get_data_path(), exist_ok=True)
config_file_path = f'{self.get_config_path()}/config.json'
with open(config_file_path, 'w') as config_file:
config_file.write(config_file_contents)
config_file.close()
def delete_data(self):
shutil.rmtree(self.get_data_path(), ignore_errors=True)
def delete(self):
shutil.rmtree(self.get_config_path(), ignore_errors=True)
self.delete_data()
def get_wireguard_configuration_metadata(self, key):
configuration = self.get_wireguard_configuration()
if configuration is not None:
for line in configuration.splitlines():
match = re.match(r'^# {} = (.*)$'.format(re.escape(key)), line)
if match:
return re.sub(r'[^a-zA-Z0-9+=\-_ /]', '', match.group(1).strip())
return None
def get_wireguard_public_keys(self):
import configparser
wireguard_public_keys = set()
configuration = self.get_wireguard_configuration()
parsed_configuration = configparser.ConfigParser()
if configuration is not None:
parsed_configuration.read_string(configuration)
for section in parsed_configuration.sections():
if parsed_configuration.has_option(section, 'PublicKey'):
wireguard_public_keys.add(parsed_configuration.get(section, 'PublicKey'))
return tuple(wireguard_public_keys)
def get_wireguard_configuration(self):
try:
with open(self.get_wireguard_configuration_path(), 'r') as file:
return file.read()
except FileNotFoundError:
return None
def _get_dirty_keys(self: Self):
reference = BaseProfile.find_by_id(self.id)
if type(reference) != type(self):
return list(self.__dataclass_fields__.keys())
return list([key for key, value in asdict(self).items() if value != asdict(reference).get(key)])
@staticmethod
def find_by_id(id: int):
try:
config_file_contents = open(f'{BaseProfile.__get_config_path(id)}/config.json', 'r').read()
except FileNotFoundError:
return None
try:
profile = json.loads(config_file_contents)
except JSONDecodeError:
return None
profile['id'] = id
if profile['location'] is not None:
location = Location.find(profile['location']['country_code'] or None, profile['location']['code'] or None)
if location is not None:
if profile['location'].get('time_zone') is not None:
location.time_zone = profile['location']['time_zone']
profile['location'] = location
if 'application_version' in profile:
if profile['application_version'] is not None:
application_version = ApplicationVersion.find(profile['application_version']['application_code'] or None, profile['application_version']['version_number'] or None)
if application_version is not None:
profile['application_version'] = application_version
from core.models.session.SessionProfile import SessionProfile
# noinspection PyUnresolvedReferences
profile = SessionProfile.from_dict(profile)
else:
from core.models.system.SystemProfile import SystemProfile
# noinspection PyUnresolvedReferences
profile = SystemProfile.from_dict(profile)
return profile
@staticmethod
def exists(id: int):
return re.match(r'^\d{1,2}$', str(id)) and os.path.isfile(f'{BaseProfile.__get_config_path(id)}/config.json')
@staticmethod
def all():
profiles = {}
for directory_entry in os.listdir(Constants.HV_PROFILE_CONFIG_HOME):
try:
id = int(directory_entry)
except ValueError:
continue
if BaseProfile.exists(id):
profile = BaseProfile.find_by_id(id)
if profile is not None:
profiles[id] = profile
return dict(sorted(profiles.items()))
@staticmethod
def __get_config_path(id: int):
return f'{Constants.HV_PROFILE_CONFIG_HOME}/{str(id)}'
@staticmethod
def __get_data_path(id: int):
return f'{Constants.HV_PROFILE_DATA_HOME}/{str(id)}'