sp-hydra-veil-core/main.py
2024-09-11 19:39:33 +02:00

259 lines
13 KiB
Python

from core.Constants import Constants
from core.Errors import MissingSubscriptionError, InvalidSubscriptionError
from core.controllers.ApplicationController import ApplicationController
from core.controllers.ApplicationVersionController import ApplicationVersionController
from core.controllers.ClientController import ClientController
from core.controllers.ConfigurationController import ConfigurationController
from core.controllers.InvoiceController import InvoiceController
from core.controllers.LocationController import LocationController
from core.controllers.ProfileController import ProfileController
from core.controllers.SubscriptionController import SubscriptionController
from core.controllers.SubscriptionPlanController import SubscriptionPlanController
from core.models.session.SessionConnection import SessionConnection
from core.models.session.SessionProfile import SessionProfile
from core.models.system.SystemConnection import SystemConnection
from core.models.system.SystemProfile import SystemProfile
from core.observers.ApplicationVersionObserver import ApplicationVersionObserver
from core.observers.ClientObserver import ClientObserver
from core.observers.ConnectionObserver import ConnectionObserver
from core.observers.InvoiceObserver import InvoiceObserver
from core.observers.ProfileObserver import ProfileObserver
from pathlib import Path
import argparse
import pprint
if __name__ == '__main__':
Path(Constants.SP_CONFIG_HOME).mkdir(parents=True, exist_ok=True)
Path(Constants.SP_DATA_HOME).mkdir(parents=True, exist_ok=True)
application_version_observer = ApplicationVersionObserver()
client_observer = ClientObserver()
connection_observer = ConnectionObserver()
invoice_observer = InvoiceObserver()
profile_observer = ProfileObserver()
application_version_observer.subscribe('downloading', lambda event: print(f'Downloading {ApplicationController.get(event.subject.application_code).name}, version {event.subject.version_number}...\n'))
client_observer.subscribe('synchronizing', lambda event: print('Synchronizing...\n'))
connection_observer.subscribe('connecting', lambda event: print(f'[{event.subject.get("attempt_count")}/{event.subject.get("maximum_amount_of_retries")}] Performing connection attempt...\n'))
invoice_observer.subscribe('retrieved', lambda event: print(f'\n{pprint.pp(event.subject)}\n'))
invoice_observer.subscribe('processing', lambda event: print('A payment has been detected and is being verified...\n'))
invoice_observer.subscribe('settled', lambda event: print('The payment has been successfully verified.\n'))
profile_observer.subscribe('created', lambda event: pprint.pp((event.subject, 'Created')))
profile_observer.subscribe('destroyed', lambda event: pprint.pp((event.subject, 'Destroyed')))
profile_observer.subscribe('disabled', lambda event: pprint.pp((event.subject, 'Disabled')) if event.meta.get('explicitly') else None)
profile_observer.subscribe('enabled', lambda event: pprint.pp((event.subject, 'Enabled')))
safe_parser = argparse.ArgumentParser(add_help=False)
safe_parser.add_argument('--force', '-f', action='store_true')
main_parser = argparse.ArgumentParser(prog='simplified-privacy')
main_parser.add_argument('--version', '-v', action='version', version='simplified-privacy v0.1.0')
main_subparsers = main_parser.add_subparsers(title='commands', dest='command')
profile_parser = main_subparsers.add_parser('profile')
profile_subparsers = profile_parser.add_subparsers(title='subcommands', dest='subcommand')
profile_base_parser = argparse.ArgumentParser(add_help=False)
profile_base_parser.add_argument('--id', '-i', type=int, required=True)
profile_subparsers.add_parser('list')
profile_subparsers.add_parser('show', parents=[profile_base_parser])
profile_create_parser = profile_subparsers.add_parser('create')
profile_create_subparsers = profile_create_parser.add_subparsers(title='profile_types', dest='profile_type')
session_profile_create_parser = profile_create_subparsers.add_parser('session', parents=[profile_base_parser, safe_parser])
session_profile_create_parser.add_argument('--name', '-n', default='')
session_profile_create_parser.add_argument('--location', '-l', dest='location_code', default=None)
session_profile_create_parser.add_argument('--application', '-a', required=True)
session_profile_create_parser.add_argument('--connection', '-c', dest='connection_type', choices=['system', 'tor', 'wireguard'], default='system')
session_profile_create_parser.add_argument('--mask-connection', '-m', action='store_true')
session_profile_create_parser.add_argument('--resolution', '-r', default='1280x720')
system_profile_create_parser = profile_create_subparsers.add_parser('system', parents=[profile_base_parser, safe_parser])
system_profile_create_parser.add_argument('--name', '-n', default='')
system_profile_create_parser.add_argument('--location', '-l', dest='location_code', default=None)
system_profile_create_parser.add_argument('--connection', '-c', dest='connection_type', choices=['wireguard'], default='wireguard')
profile_subparsers.add_parser('destroy', parents=[profile_base_parser, safe_parser])
profile_subparsers.add_parser('enable', parents=[profile_base_parser, safe_parser])
profile_subparsers.add_parser('disable', parents=[profile_base_parser, safe_parser])
get_parser = main_subparsers.add_parser('get')
get_subparsers = get_parser.add_subparsers(title='subcommands', dest='subcommand')
get_connection_parser = get_subparsers.add_parser('connection')
set_parser = main_subparsers.add_parser('set')
set_subparsers = set_parser.add_subparsers(title='subcommands', dest='subcommand')
set_connection_parser = set_subparsers.add_parser('connection')
set_connection_parser.add_argument('connection_type', choices=['system', 'tor'])
sync_parser = main_subparsers.add_parser('sync')
arguments = main_parser.parse_args()
if arguments.command is None:
main_parser.print_help()
elif arguments.command == 'profile':
if arguments.subcommand is None:
profile_parser.print_help()
elif arguments.subcommand == 'list':
pprint.pp(ProfileController.get_all())
elif arguments.subcommand == 'show':
pprint.pp(ProfileController.get(arguments.id))
elif arguments.subcommand == 'create':
location = LocationController.get(arguments.location_code)
if location is None:
main_parser.error('the following argument should be a valid reference: --location/-l')
if arguments.profile_type == 'session':
if not arguments.application:
arguments.application = ':'
application_details = arguments.application.split(':', 1)
application_version = ApplicationVersionController.get(application_details[0], application_details[1])
if application_version is None:
main_parser.error('the following argument should be a valid reference: --application/-a')
connection = SessionConnection(arguments.connection_type, arguments.mask_connection)
profile = SessionProfile(arguments.id, arguments.name, None, location, arguments.resolution, application_version, connection)
ProfileController.create(profile, profile_observer=profile_observer)
else:
connection = SystemConnection(arguments.connection_type)
profile = SystemProfile(arguments.id, arguments.name, None, location, connection)
ProfileController.create(profile, profile_observer=profile_observer)
elif arguments.subcommand == 'destroy':
profile = ProfileController.get(arguments.id)
if profile is not None:
ProfileController.destroy(profile, profile_observer=profile_observer)
else:
main_parser.error('the following argument should be a valid reference: --id/-i')
elif arguments.subcommand == 'enable':
profile = ProfileController.get(arguments.id)
if profile is not None:
try:
ProfileController.enable(profile, arguments.force, profile_observer=profile_observer, application_version_observer=application_version_observer, connection_observer=connection_observer)
except (InvalidSubscriptionError, MissingSubscriptionError) as exception:
if type(exception).__name__ == 'InvalidSubscriptionError':
print('The profile\'s subscription appears to be invalid.\n')
if type(exception).__name__ == 'MissingSubscriptionError':
print('The profile is not tied to a subscription.\n')
manage_subscription_input = None
while manage_subscription_input not in ['1', '2', '3', '']:
print('Please select from the following:\n')
print(' 1) Request new subscription')
print(' 2) Enter billing code')
print('\n 3) Exit')
manage_subscription_input = input('\nEnter your choice [1]: ')
if manage_subscription_input == '1' or manage_subscription_input == '':
print('\nCreating subscription...\n')
subscription_plan = SubscriptionPlanController.get(profile.connection, 720)
if subscription_plan is None:
raise RuntimeError('No compatible subscription plan was found. Please contact support.')
potential_subscription = SubscriptionController.create(subscription_plan, profile, connection_observer=connection_observer)
if potential_subscription is not None:
ProfileController.attach_subscription(profile, potential_subscription)
else:
raise RuntimeError('The subscription could not be created. Please try again later.')
subscription = InvoiceController.handle_payment(potential_subscription.billing_code, invoice_observer=invoice_observer, connection_observer=connection_observer)
if subscription is not None:
ProfileController.attach_subscription(profile, subscription)
else:
raise RuntimeError('The subscription could not be activated. Please try again later.')
ProfileController.enable(profile, force=arguments.force, profile_observer=profile_observer, application_version_observer=application_version_observer, connection_observer=connection_observer)
elif manage_subscription_input == '2':
billing_code = input('\nEnter your billing code: ')
print()
subscription = SubscriptionController.get(billing_code, connection_observer=connection_observer)
if subscription is not None:
ProfileController.attach_subscription(profile, subscription)
ProfileController.enable(profile, arguments.force, profile_observer=profile_observer, application_version_observer=application_version_observer, connection_observer=connection_observer)
else:
print('\nThe billing code appears to be invalid.\n')
manage_subscription_input = None
elif manage_subscription_input == '3':
pass
else:
print('\nInput appears to be invalid. Please try again.\n')
else:
main_parser.error('the following argument should be a valid reference: --id/-i')
elif arguments.subcommand == 'disable':
profile = ProfileController.get(arguments.id)
if profile is not None:
ProfileController.disable(profile, force=arguments.force, profile_observer=profile_observer)
else:
main_parser.error('the following argument should be a valid reference: --id/-i')
elif arguments.command == 'sync':
ClientController.sync(client_observer=client_observer, connection_observer=connection_observer)
elif arguments.command == 'get':
if arguments.subcommand == 'connection':
print(ConfigurationController.get_connection())
elif arguments.command == 'set':
if arguments.subcommand == 'connection':
ConfigurationController.set_connection(arguments.connection_type)