from core.Constants import Constants from core.Errors import MissingSubscriptionError, InvalidSubscriptionError from core.controllers.ApplicationController import ApplicationController from core.controllers.ApplicationVersionController import ApplicationVersionController from core.controllers.ClientController import ClientController from core.controllers.ConfigurationController import ConfigurationController from core.controllers.InvoiceController import InvoiceController from core.controllers.LocationController import LocationController from core.controllers.ProfileController import ProfileController from core.controllers.SubscriptionController import SubscriptionController from core.controllers.SubscriptionPlanController import SubscriptionPlanController from core.models.session.Application import Application from core.models.session.SessionConnection import SessionConnection from core.models.session.SessionProfile import SessionProfile from core.models.system.SystemConnection import SystemConnection from core.models.system.SystemProfile import SystemProfile from core.observers.ApplicationVersionObserver import ApplicationVersionObserver from core.observers.ClientObserver import ClientObserver from core.observers.ConnectionObserver import ConnectionObserver from core.observers.InvoiceObserver import InvoiceObserver from core.observers.ProfileObserver import ProfileObserver from pathlib import Path from typing import Optional, Union import argparse import pprint import re if __name__ == '__main__': Path(Constants.SP_CONFIG_HOME).mkdir(parents=True, exist_ok=True) Path(Constants.SP_DATA_HOME).mkdir(parents=True, exist_ok=True) application_version_observer = ApplicationVersionObserver() client_observer = ClientObserver() connection_observer = ConnectionObserver() invoice_observer = InvoiceObserver() profile_observer = ProfileObserver() application_version_observer.subscribe('downloading', lambda event: print(f'Downloading {ApplicationController.get(event.subject.application_code).name}, version {event.subject.version_number}...')) application_version_observer.subscribe('download_progress', lambda event: print(f'Current progress: {event.meta.get('progress'):.2f}%', flush=True, end='\r')) application_version_observer.subscribe('download_finished', lambda event: print('\n')) client_observer.subscribe('synchronizing', lambda event: print('Synchronizing...\n')) connection_observer.subscribe('connecting', lambda event: print(f'[{event.subject.get("attempt_count")}/{event.subject.get("maximum_number_of_attempts")}] Performing connection attempt...\n')) invoice_observer.subscribe('retrieved', lambda event: print(f'\n{pprint.pp(event.subject)}\n')) invoice_observer.subscribe('processing', lambda event: print('A payment has been detected and is being verified...\n')) invoice_observer.subscribe('settled', lambda event: print('The payment has been successfully verified.\n')) profile_observer.subscribe('created', lambda event: pprint.pp((__sanitize_profile(event.subject), 'Created'))) profile_observer.subscribe('destroyed', lambda event: pprint.pp((__sanitize_profile(event.subject), 'Destroyed'))) profile_observer.subscribe('disabled', lambda event: pprint.pp((__sanitize_profile(event.subject), 'Disabled')) if event.meta.get('explicitly') else None) profile_observer.subscribe('enabled', lambda event: pprint.pp((__sanitize_profile(event.subject), 'Enabled'))) def __parse_application_string(application_string: Optional[str] = None): if application_string is None: return dict(application_code='', version_number='') parsed_application_string = re.match('^(?P.*?):(?P.*?)$', application_string) if parsed_application_string is None: return dict(application_code='', version_number='') else: return parsed_application_string.groupdict() def __sanitize_profile(candidate: Optional[Union[SessionProfile, SystemProfile]]): if candidate is not None and candidate.has_subscription(): sanitized_billing_code = candidate.subscription.get_sanitized_billing_code() candidate.subscription.billing_code = sanitized_billing_code return candidate pristine_parser = argparse.ArgumentParser(add_help=False) pristine_parser.add_argument('--pristine', '-p', action='store_true') safe_parser = argparse.ArgumentParser(add_help=False) safe_parser.add_argument('--force', '-f', action='store_true') main_parser = argparse.ArgumentParser(prog='simplified-privacy') main_parser.add_argument('--version', '-v', action='version', version='simplified-privacy v0.1.0') main_subparsers = main_parser.add_subparsers(title='commands', dest='command') profile_parser = main_subparsers.add_parser('profile') profile_subparsers = profile_parser.add_subparsers(title='subcommands', dest='subcommand') profile_base_parser = argparse.ArgumentParser(add_help=False) profile_base_parser.add_argument('--id', '-i', type=int, required=True) profile_subparsers.add_parser('list') profile_subparsers.add_parser('show', parents=[profile_base_parser]) profile_create_parser = profile_subparsers.add_parser('create') profile_create_subparsers = profile_create_parser.add_subparsers(title='profile_types', dest='profile_type') session_profile_create_parser = profile_create_subparsers.add_parser('session', parents=[profile_base_parser, safe_parser]) session_profile_create_parser.add_argument('--name', '-n', default='') session_profile_create_parser.add_argument('--location', '-l', dest='location_code', default=None) session_profile_create_parser.add_argument('--application', '-a', required=True) session_profile_create_parser.add_argument('--connection', '-c', dest='connection_type', choices=['system', 'tor', 'wireguard'], default='system') session_profile_create_parser.add_argument('--mask-connection', '-m', action='store_true') session_profile_create_parser.add_argument('--resolution', '-r', default='1280x720') system_profile_create_parser = profile_create_subparsers.add_parser('system', parents=[profile_base_parser, safe_parser]) system_profile_create_parser.add_argument('--name', '-n', default='') system_profile_create_parser.add_argument('--location', '-l', dest='location_code', default=None) system_profile_create_parser.add_argument('--connection', '-c', dest='connection_type', choices=['wireguard'], default='wireguard') profile_subparsers.add_parser('destroy', parents=[profile_base_parser, safe_parser]) profile_subparsers.add_parser('enable', parents=[profile_base_parser, pristine_parser, safe_parser]) profile_subparsers.add_parser('disable', parents=[profile_base_parser, safe_parser]) application_parser = main_subparsers.add_parser('application') application_subparsers = application_parser.add_subparsers(title='subcommands', dest='subcommand') application_base_parser = argparse.ArgumentParser(add_help=False) application_base_parser.add_argument('--application', '-a', required=True) application_list_parser = application_subparsers.add_parser('list') application_list_parser.add_argument('--code', '-c') application_show_parser = application_subparsers.add_parser('show', parents=[application_base_parser]) application_install_parser = application_subparsers.add_parser('install', parents=[application_base_parser]) application_install_parser.add_argument('--reinstall', '-r', action='store_true') application_uninstall_parser = application_subparsers.add_parser('uninstall', parents=[application_base_parser]) get_parser = main_subparsers.add_parser('get') get_subparsers = get_parser.add_subparsers(title='subcommands', dest='subcommand') get_connection_parser = get_subparsers.add_parser('connection') set_parser = main_subparsers.add_parser('set') set_subparsers = set_parser.add_subparsers(title='subcommands', dest='subcommand') set_connection_parser = set_subparsers.add_parser('connection') set_connection_parser.add_argument('connection_type', choices=['system', 'tor']) sync_parser = main_subparsers.add_parser('sync') arguments = main_parser.parse_args() if arguments.command is None: main_parser.print_help() elif arguments.command == 'profile': if arguments.subcommand is None: profile_parser.print_help() elif arguments.subcommand == 'list': profiles = ProfileController.get_all() for key, value in profiles.items(): profiles[key] = __sanitize_profile(value) pprint.pp(profiles) elif arguments.subcommand == 'show': pprint.pp(ProfileController.get(arguments.id)) elif arguments.subcommand == 'create': location = LocationController.get(arguments.location_code) if location is None: main_parser.error('the following argument should be a valid reference: --location/-l') if arguments.profile_type == 'session': application_details = __parse_application_string(arguments.application) application_version = ApplicationVersionController.get(application_details.get('application_code'), application_details.get('version_number')) if application_version is None: main_parser.error('the following argument should be a valid reference: --application/-a') connection = SessionConnection(arguments.connection_type, arguments.mask_connection) profile = SessionProfile(arguments.id, arguments.name, None, location, arguments.resolution, application_version, connection) ProfileController.create(profile, profile_observer=profile_observer) else: connection = SystemConnection(arguments.connection_type) profile = SystemProfile(arguments.id, arguments.name, None, location, connection) ProfileController.create(profile, profile_observer=profile_observer) elif arguments.subcommand == 'destroy': profile = ProfileController.get(arguments.id) if profile is not None: ProfileController.destroy(profile, profile_observer=profile_observer) else: main_parser.error('the following argument should be a valid reference: --id/-i') elif arguments.subcommand == 'enable': profile = ProfileController.get(arguments.id) if profile is not None: try: ProfileController.enable(profile, force=arguments.force, pristine=arguments.pristine, asynchronous=True, profile_observer=profile_observer, application_version_observer=application_version_observer, connection_observer=connection_observer) except (InvalidSubscriptionError, MissingSubscriptionError) as exception: if type(exception).__name__ == 'InvalidSubscriptionError': print('The profile\'s subscription appears to be invalid.\n') elif type(exception).__name__ == 'MissingSubscriptionError': print('The profile is not tied to a subscription.\n') manage_subscription_input = None while manage_subscription_input not in ('1', '2', '3', ''): print('Please select from the following:\n') print(' 1) Request new subscription') print(' 2) Enter billing code') print('\n 3) Exit') manage_subscription_input = input('\nEnter your choice [1]: ') if manage_subscription_input == '1' or manage_subscription_input == '': print('\nCreating subscription...\n') subscription_plan = SubscriptionPlanController.get(profile.connection, 720) if subscription_plan is None: raise RuntimeError('No compatible subscription plan was found. Please contact support.') potential_subscription = SubscriptionController.create(subscription_plan, profile, connection_observer=connection_observer) if potential_subscription is not None: ProfileController.attach_subscription(profile, potential_subscription) else: raise RuntimeError('The subscription could not be created. Please try again later.') subscription = InvoiceController.handle_payment(potential_subscription.billing_code, invoice_observer=invoice_observer, connection_observer=connection_observer) if subscription is not None: ProfileController.attach_subscription(profile, subscription) else: raise RuntimeError('The subscription could not be activated. Please try again later.') ProfileController.enable(profile, force=arguments.force, pristine=arguments.pristine, asynchronous=True, profile_observer=profile_observer, application_version_observer=application_version_observer, connection_observer=connection_observer) elif manage_subscription_input == '2': billing_code = input('\nEnter your billing code: ') print() subscription = SubscriptionController.get(billing_code, connection_observer=connection_observer) if subscription is not None: ProfileController.attach_subscription(profile, subscription) ProfileController.enable(profile, force=arguments.force, pristine=arguments.pristine, asynchronous=True, profile_observer=profile_observer, application_version_observer=application_version_observer, connection_observer=connection_observer) else: print('\nThe billing code appears to be invalid.\n') manage_subscription_input = None elif manage_subscription_input == '3': pass else: print('\nInput appears to be invalid. Please try again.\n') else: main_parser.error('the following argument should be a valid reference: --id/-i') elif arguments.subcommand == 'disable': profile = ProfileController.get(arguments.id) if profile is not None: ProfileController.disable(profile, force=arguments.force, profile_observer=profile_observer) else: main_parser.error('the following argument should be a valid reference: --id/-i') elif arguments.command == 'application': if arguments.subcommand is None: application_parser.print_help() elif arguments.subcommand == 'list': if arguments.code: application = Application.find(arguments.code) if application is not None: pprint.pp(ApplicationVersionController.get_all(application)) else: main_parser.error('the following argument should be a valid reference: --code/-c') else: pprint.pp(ApplicationVersionController.get_all()) elif arguments.subcommand == 'show': application_details = __parse_application_string(arguments.application) application_version = ApplicationVersionController.get(application_details.get('application_code'), application_details.get('version_number')) if application_version is not None: pprint.pp(application_version) else: main_parser.error('the following argument should be a valid reference: --application/-a') elif arguments.subcommand == 'install': application_details = __parse_application_string(arguments.application) application_version = ApplicationVersionController.get(application_details.get('application_code'), application_details.get('version_number')) if application_version is not None: ApplicationVersionController.install(application_version, arguments.reinstall, application_version_observer=application_version_observer, connection_observer=connection_observer) else: main_parser.error('the following argument should be a valid reference: --application/-a') elif arguments.subcommand == 'uninstall': application_details = __parse_application_string(arguments.application) application_version = ApplicationVersionController.get(application_details.get('application_code'), application_details.get('version_number')) if application_version is not None: ApplicationVersionController.uninstall(application_version) else: main_parser.error('the following argument should be a valid reference: --application/-a') elif arguments.command == 'sync': ClientController.sync(client_observer=client_observer, connection_observer=connection_observer) elif arguments.command == 'get': if arguments.subcommand is None: get_parser.print_help() elif arguments.subcommand == 'connection': print(ConfigurationController.get_connection()) elif arguments.command == 'set': if arguments.subcommand is None: set_parser.print_help() elif arguments.subcommand == 'connection': ConfigurationController.set_connection(arguments.connection_type)