Update string formating and improve file storage-related logic

This commit is contained in:
codeking 2024-09-20 16:57:27 +02:00
parent 2174123e6c
commit fe92bbf351
13 changed files with 102 additions and 66 deletions

View file

@ -14,17 +14,17 @@ class Constants:
DATA_HOME: Final[str] = os.environ.get('XDG_DATA_HOME') or os.path.join(HOME, '.local/share')
STATE_HOME: Final[str] = os.environ.get('XDG_STATE_HOME') or os.path.join(HOME, '.local/state')
SP_CONFIG_HOME: Final[str] = CONFIG_HOME + '/simplified-privacy'
SP_DATA_HOME: Final[str] = DATA_HOME + '/simplified-privacy'
SP_STATE_HOME: Final[str] = STATE_HOME + '/simplified-privacy'
SP_CONFIG_HOME: Final[str] = f'{CONFIG_HOME}/simplified-privacy'
SP_DATA_HOME: Final[str] = f'{DATA_HOME}/simplified-privacy'
SP_STATE_HOME: Final[str] = f'{STATE_HOME}/simplified-privacy'
SP_PROFILE_CONFIG_HOME: Final[str] = SP_CONFIG_HOME + '/profiles'
SP_PROFILE_DATA_HOME: Final[str] = SP_DATA_HOME + '/profiles'
SP_PROFILE_CONFIG_HOME: Final[str] = f'{SP_CONFIG_HOME}/profiles'
SP_PROFILE_DATA_HOME: Final[str] = f'{SP_DATA_HOME}/profiles'
SP_APPLICATION_DATA_HOME: Final[str] = SP_DATA_HOME + '/applications'
SP_APPLICATION_DATA_HOME: Final[str] = f'{SP_DATA_HOME}/applications'
SP_SESSION_STATE_HOME: Final[str] = SP_STATE_HOME + '/sessions'
SP_SESSION_STATE_HOME: Final[str] = f'{SP_STATE_HOME}/sessions'
SP_STORAGE_DATABASE_PATH: Final[str] = SP_DATA_HOME + '/storage.db'
SP_STORAGE_DATABASE_PATH: Final[str] = f'{SP_DATA_HOME}/storage.db'
SP_API_BASE_URL: Final[str] = 'https://api.simplifiedprivacy.is/api/v1'

View file

@ -48,7 +48,11 @@ class ApplicationController:
initialization_file_path = f'{session_state.get_state_path()}/.init'
Path(initialization_file_path).touch(exist_ok=True, mode=0o600 | stat.S_IEXEC)
open(initialization_file_path, 'w').write(initialization_file_contents)
with open(initialization_file_path, 'w') as initialization_file:
initialization_file.write(initialization_file_contents)
initialization_file.close()
fork_process_id = os.fork()
@ -92,7 +96,7 @@ class ApplicationController:
if len(active_displays) > 0:
unused_display = sorted(active_displays)[-1] + 1
return ':' + str(unused_display)
return f':{str(unused_display)}'
else:
return ':170'

View file

@ -65,8 +65,15 @@ class ApplicationVersionController:
if file_hash != application_version.file_hash:
raise FileIntegrityError('Application version file integrity could not be verified.')
file = tarfile.open(fileobj=BytesIO(response.content), mode = 'r:gz')
file.extractall(application_version.installation_path)
with tarfile.open(fileobj=BytesIO(response.content), mode = 'r:gz') as tar_file:
tar_file.extractall(application_version.installation_path)
tar_file.close()
with open(f'{application_version.installation_path}/.sha3-512', 'w') as hash_file:
hash_file.write(f'{file_hash}\n')
hash_file.close()
else:
raise ConnectionError('The application version could not be downloaded.')

View file

@ -161,7 +161,7 @@ class ConnectionController:
if subprocess.getstatusoutput('tor --help')[0] == 127:
raise OSError('Tor does not appear to be installed.')
tor_session_directory = session_directory + '/tor'
tor_session_directory = f'{session_directory}/tor'
Path(tor_session_directory).mkdir(exist_ok=True, mode=0o700)
process = subprocess.Popen(('echo', f'DataDirectory {tor_session_directory}/tor\nSocksPort {port_number}'), stdout=subprocess.PIPE)
@ -173,16 +173,18 @@ class ConnectionController:
if not profile.has_wireguard_configuration():
raise FileNotFoundError('No valid WireGuard configuration file detected.')
wireguard_session_directory = session_directory + '/wireguard'
wireguard_session_directory = f'{session_directory}/wireguard'
Path(wireguard_session_directory).mkdir(exist_ok=True, mode=0o700)
wireproxy_configuration_file_path = wireguard_session_directory + '/wireproxy.conf'
wireproxy_configuration_file_path = f'{wireguard_session_directory}/wireproxy.conf'
Path(wireproxy_configuration_file_path).touch(exist_ok=True, mode=0o600)
with open(wireproxy_configuration_file_path, 'w') as output_file:
print(f'WGConfig = {profile.get_wireguard_configuration_path()}\n\n[Socks5]\nBindAddress = 127.0.0.1:{str(port_number)}\n', file=output_file)
with open(wireproxy_configuration_file_path, 'w') as wireproxy_configuration_file:
return subprocess.Popen((Constants.SP_DATA_HOME + '/wireproxy/wireproxy', '-c', wireproxy_configuration_file_path), stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
wireproxy_configuration_file.write(f'WGConfig = {profile.get_wireguard_configuration_path()}\n\n[Socks5]\nBindAddress = 127.0.0.1:{str(port_number)}\n')
wireproxy_configuration_file.close()
return subprocess.Popen((f'{Constants.SP_DATA_HOME}/wireproxy/wireproxy', '-c', wireproxy_configuration_file_path), stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
@staticmethod
def establish_proxy_session_connection(profile: SessionProfile, session_directory: str, port_number: int, proxy_port_number: int):
@ -198,25 +200,28 @@ class ConnectionController:
else:
raise FileNotFoundError('No valid proxy configuration file detected.')
proxy_session_directory = session_directory + '/proxy'
proxy_session_directory = f'{session_directory}/proxy'
Path(proxy_session_directory).mkdir(parents=True, exist_ok=True, mode=0o700)
proxychains_proxy_list = ''
if port_number is not None:
proxychains_proxy_list = proxychains_proxy_list + f'socks5 127.0.0.1 {port_number}\n'
proxychains_proxy_list = f'socks5 127.0.0.1 {port_number}\n'
proxychains_proxy_list = proxychains_proxy_list + f'socks5 {proxy_configuration.ip_address} {proxy_configuration.port_number} {proxy_configuration.username} {proxy_configuration.password}'
proxychains_template_file_path = f'{Constants.SP_DATA_HOME}/proxychains.ptpl'
with open(Constants.SP_DATA_HOME + '/proxychains.ptpl', 'r') as proxychains_template:
with open(proxychains_template_file_path, 'r') as proxychains_template_file:
proxychains_configuration_file_path = proxy_session_directory + '/proxychains.conf'
proxychains_configuration_file_path = f'{proxy_session_directory}/proxychains.conf'
Path(proxychains_configuration_file_path).touch(exist_ok=True, mode=0o600)
with open(proxychains_configuration_file_path, 'w') as output_file:
proxychains_configuration_file_contents = proxychains_template_file.read().format(proxy_list=proxychains_proxy_list)
output_file_contents = proxychains_template.read().format(proxy_list=proxychains_proxy_list)
output_file.write(output_file_contents)
with open(proxychains_configuration_file_path, 'w') as proxychains_configuration_file:
proxychains_configuration_file.write(proxychains_configuration_file_contents)
proxychains_configuration_file.close()
return subprocess.Popen(('proxychains4', '-f', proxychains_configuration_file_path, 'microsocks', '-p', str(proxy_port_number)), stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

View file

@ -33,14 +33,18 @@ class BaseProfile:
def attach_wireguard_configuration(self, wireguard_configuration):
with open(self.get_wireguard_configuration_path(), 'w') as output_file:
output_file.write(wireguard_configuration)
wireguard_configuration_file_path = self.get_wireguard_configuration_path()
with open(wireguard_configuration_file_path, 'w') as wireguard_configuration_file:
wireguard_configuration_file.write(wireguard_configuration)
wireguard_configuration_file.close()
def get_wireguard_configuration_path(self):
return self.get_config_path() + '/wg.conf'
return f'{self.get_config_path()}/wg.conf'
def has_wireguard_configuration(self):
return os.path.exists(self.get_config_path() + '/wg.conf')
return os.path.exists(f'{self.get_config_path()}/wg.conf')
def is_session_profile(self):
return type(self).__name__ == 'SessionProfile'
@ -60,7 +64,7 @@ class BaseProfile:
def find_by_id(id: int):
try:
config_file_contents = open(BaseProfile.__get_config_path(id) + '/config.json', 'r').read()
config_file_contents = open(f'{BaseProfile.__get_config_path(id)}/config.json', 'r').read()
except FileNotFoundError:
return None
@ -115,7 +119,7 @@ class BaseProfile:
if BaseProfile.exists(id):
if os.path.exists(BaseProfile.__get_config_path(id) + '/config.json'):
if os.path.exists(f'{BaseProfile.__get_config_path(id)}/config.json'):
profile = BaseProfile.find_by_id(id)
@ -134,18 +138,22 @@ class BaseProfile:
if os.path.isdir(persistent_state_path):
shutil.rmtree(persistent_state_path)
config_file_contents = profile.to_json(indent=4) + '\n'
config_file_contents = f'{profile.to_json(indent=4)}\n'
os.makedirs(BaseProfile.__get_config_path(profile.id), exist_ok=True)
os.makedirs(BaseProfile.__get_data_path(profile.id), exist_ok=True)
text_io_wrapper = open(BaseProfile.__get_config_path(profile.id) + '/config.json', 'w')
text_io_wrapper.write(config_file_contents)
config_file_path = f'{BaseProfile.__get_config_path(profile.id)}/config.json'
with open(config_file_path, 'w') as config_file:
config_file.write(config_file_contents)
config_file.close()
@staticmethod
def __get_config_path(id: int):
return Constants.SP_PROFILE_CONFIG_HOME + '/' + str(id)
return f'{Constants.SP_PROFILE_CONFIG_HOME}/{str(id)}'
@staticmethod
def __get_data_path(id: int):
return Constants.SP_PROFILE_DATA_HOME + '/' + str(id)
return f'{Constants.SP_PROFILE_DATA_HOME}/{str(id)}'

View file

@ -43,7 +43,7 @@ class Configuration:
def get():
try:
config_file_contents = open(Constants.SP_CONFIG_HOME + '/config.json', 'r').read()
config_file_contents = open(f'{Constants.SP_CONFIG_HOME}/config.json', 'r').read()
except FileNotFoundError:
return None
@ -60,11 +60,15 @@ class Configuration:
@staticmethod
def save(configuration):
config_file_contents = configuration.to_json(indent=4) + '\n'
config_file_contents = f'{configuration.to_json(indent=4)}\n'
os.makedirs(Constants.SP_CONFIG_HOME, exist_ok=True)
text_io_wrapper = open(Constants.SP_CONFIG_HOME + '/config.json', 'w')
text_io_wrapper.write(config_file_contents)
config_file_path = f'{Constants.SP_CONFIG_HOME}/config.json'
with open(config_file_path, 'w') as config_file:
config_file.write(config_file_contents)
config_file.close()
@staticmethod
def _iso_format(datetime_instance: datetime):

View file

@ -11,9 +11,9 @@ class Model:
cursor = connection.cursor()
if drop_existing:
cursor.execute('DROP TABLE IF EXISTS ' + table_name)
cursor.execute(f'DROP TABLE IF EXISTS {table_name}')
cursor.execute('CREATE TABLE IF NOT EXISTS ' + table_name + '(' + table_definition + ')')
cursor.execute(f'CREATE TABLE IF NOT EXISTS {table_name} ({table_definition})')
connection.commit()
connection.close()
@ -40,7 +40,7 @@ class Model:
connection = sqlite3.connect(Constants.SP_STORAGE_DATABASE_PATH)
cursor = connection.cursor()
response = cursor.execute('SELECT EXISTS(' + query + ')', parameters).fetchone()
response = cursor.execute(f'SELECT EXISTS({query})', parameters).fetchone()
connection.close()
return response[0] == 1

View file

@ -57,7 +57,7 @@ class ApplicationVersion(Model):
)
def __post_init__(self):
self.installation_path = Constants.SP_APPLICATION_DATA_HOME + '/' + self.application_code + '/' + self.version_number
self.installation_path = f'{Constants.SP_APPLICATION_DATA_HOME}/{self.application_code}/{self.version_number}'
def is_installed(self):
return os.path.isdir(self.installation_path) and len(os.listdir(self.installation_path)) > 0

View file

@ -18,14 +18,18 @@ class SessionProfile(BaseProfile):
def attach_proxy_configuration(self, proxy_configuration):
config_file_contents = proxy_configuration.to_json(indent=4) + '\n'
proxy_configuration_file_contents = f'{proxy_configuration.to_json(indent=4)}\n'
os.makedirs(Constants.SP_CONFIG_HOME, exist_ok=True)
text_io_wrapper = open(self.get_proxy_configuration_path(), 'w')
text_io_wrapper.write(config_file_contents)
proxy_configuration_file_path = self.get_proxy_configuration_path()
with open(proxy_configuration_file_path, 'w') as proxy_configuration_file:
proxy_configuration_file.write(proxy_configuration_file_contents)
proxy_configuration_file.close()
def get_proxy_configuration_path(self):
return self.get_config_path() + '/proxy.json'
return f'{self.get_config_path()}/proxy.json'
def get_proxy_configuration(self):
@ -44,4 +48,4 @@ class SessionProfile(BaseProfile):
return proxy_configuration
def has_proxy_configuration(self):
return os.path.exists(self.get_config_path() + '/proxy.json')
return os.path.exists(f'{self.get_config_path()}/proxy.json')

View file

@ -26,7 +26,7 @@ class SessionState:
def find_by_id(id: int):
try:
session_state_file_contents = open(SessionState.__get_state_path(id) + '/state.json', 'r').read()
session_state_file_contents = open(f'{SessionState.__get_state_path(id)}/state.json', 'r').read()
except FileNotFoundError:
return None
@ -47,14 +47,16 @@ class SessionState:
@staticmethod
def save(session_state):
session_state_file_contents = session_state.to_json(indent=4) + '\n'
session_state_file_contents = f'{session_state.to_json(indent=4)}\n'
os.makedirs(SessionState.__get_state_path(session_state.id), exist_ok=True, mode=0o700)
session_state_file_path = SessionState.__get_state_path(session_state.id) + '/state.json'
session_state_file_path = f'{SessionState.__get_state_path(session_state.id)}/state.json'
Path(session_state_file_path).touch(exist_ok=True, mode=0o600)
text_io_wrapper = open(session_state_file_path, 'w')
text_io_wrapper.write(session_state_file_contents)
with open(session_state_file_path, 'w') as session_state_file:
session_state_file.write(session_state_file_contents)
session_state_file.close()
@staticmethod
def dissolve(id: int):
@ -99,4 +101,4 @@ class SessionState:
@staticmethod
def __get_state_path(id: int):
return Constants.SP_SESSION_STATE_HOME + '/' + str(id)
return f'{Constants.SP_SESSION_STATE_HOME}/{str(id)}'

View file

@ -16,7 +16,7 @@ class SystemState:
def get():
try:
system_state_file_contents = open(SystemState.__get_state_path() + '/system.json', 'r').read()
system_state_file_contents = open(f'{SystemState.__get_state_path()}/system.json', 'r').read()
except FileNotFoundError:
return None
@ -30,19 +30,21 @@ class SystemState:
@staticmethod
def exists():
return os.path.isfile(SystemState.__get_state_path() + '/system.json')
return os.path.isfile(f'{SystemState.__get_state_path()}/system.json')
@staticmethod
def save(system_state):
system_state_file_contents = system_state.to_json(indent=4) + '\n'
system_state_file_contents = f'{system_state.to_json(indent=4)}\n'
os.makedirs(SystemState.__get_state_path(), exist_ok=True, mode=0o700)
system_state_file_path = SystemState.__get_state_path() + '/system.json'
system_state_file_path = f'{SystemState.__get_state_path()}/system.json'
Path(system_state_file_path).touch(exist_ok=True, mode=0o600)
text_io_wrapper = open(system_state_file_path, 'w')
text_io_wrapper.write(system_state_file_contents)
with open(system_state_file_path, 'w') as system_state_file:
system_state_file.write(system_state_file_contents)
system_state_file.close()
@staticmethod
def dissolve():
@ -51,7 +53,7 @@ class SystemState:
if system_state is not None:
system_state_path = SystemState.__get_state_path() + '/system.json'
system_state_path = f'{SystemState.__get_state_path()}/system.json'
pathlib.Path.unlink(Path(system_state_path), missing_ok=True)
@staticmethod

View file

@ -5,7 +5,7 @@ class BaseObserver:
def subscribe(self, topic, callback):
callbacks = getattr(self, 'on_' + topic, None)
callbacks = getattr(self, f'on_{topic}', None)
if callbacks is None:
return
@ -14,7 +14,7 @@ class BaseObserver:
def notify(self, topic, subject = None, meta = None):
callbacks = getattr(self, 'on_' + topic, None)
callbacks = getattr(self, f'on_{topic}', None)
if callbacks is None:
return

View file

@ -30,7 +30,7 @@ class WebServiceApiService:
@staticmethod
def get_application_versions(code: str, proxies: Optional[dict] = None):
response = WebServiceApiService.__get('/platforms/linux-x86_64/applications/' + code + '/application-versions', None, proxies)
response = WebServiceApiService.__get(f'/platforms/linux-x86_64/applications/{code}/application-versions', None, proxies)
application_versions = []
if response.status_code == requests.codes.ok:
@ -139,7 +139,7 @@ class WebServiceApiService:
@staticmethod
def post_wireguard_session(location_code: str, billing_code: str, proxies: Optional[dict] = None):
response = WebServiceApiService.__post('/locations/' + location_code + '/wireguard-sessions', billing_code, proxies)
response = WebServiceApiService.__post(f'/locations/{location_code}/wireguard-sessions', billing_code, proxies)
if response.status_code == requests.codes.created:
return response.text