734 lines
31 KiB
Python
734 lines
31 KiB
Python
# Módulos estándar
|
|
import glob
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import time
|
|
|
|
# Módulos de terceros
|
|
from tqdm import tqdm
|
|
|
|
# Módulos de PyQt6
|
|
from PyQt6.QtCore import QThread, pyqtSignal
|
|
|
|
# Otros
|
|
import importlib.util
|
|
|
|
class WorkerThread(QThread):
|
|
result = pyqtSignal(bool)
|
|
|
|
def __init__(self, key, profile_selected, hugo_memory=None, template_selected=None, project_selected=None, parent=None):
|
|
super().__init__(parent)
|
|
self.key = key
|
|
self.profile_selected = profile_selected
|
|
self.hugo_memory = hugo_memory or {}
|
|
self.template_selected = template_selected
|
|
self.project_selected = project_selected
|
|
|
|
|
|
|
|
def run(self):
|
|
success = False
|
|
if self.key == 'verify_wallet':
|
|
success = self.verify_wallet()
|
|
elif self.key == 'create_wallet':
|
|
success = self.create_wallet()
|
|
elif self.key == 'create_website':
|
|
success = self.create_website()
|
|
elif self.key == 'edit_website':
|
|
success = self.edit_website()
|
|
elif self.key == 'public_arweave':
|
|
success = self.public_arweave()
|
|
elif self.key == 'refresh_arweave':
|
|
success = self.refresh_arweave()
|
|
else:
|
|
print("Invalid key provided.")
|
|
self.result.emit(success)
|
|
|
|
def create_wallet(self):
|
|
# Ruta del perfil
|
|
profile_path = os.path.join(".config_app", "profiles", self.profile_selected)
|
|
wallet_path = os.path.join(profile_path, f"wallet_{self.profile_selected}.json")
|
|
|
|
# Verificar si el directorio del perfil existe. Si no, crearlo
|
|
if not os.path.exists(profile_path):
|
|
try:
|
|
os.makedirs(profile_path) # Crear el directorio para el perfil si no existe
|
|
print(f"El directorio del perfil {self.profile_selected} ha sido creado.")
|
|
except Exception as e:
|
|
print(f"Error al crear el directorio del perfil {self.profile_selected}: {e}")
|
|
|
|
# Generación de la seedphrase
|
|
seed_proc = subprocess.run(["ardrive", "generate-seedphrase"], text=True, stdout=subprocess.PIPE)
|
|
seed = seed_proc.stdout.strip().replace('"', '')
|
|
|
|
# Generar la wallet
|
|
result = subprocess.run(
|
|
["ardrive", "generate-wallet", "-s", seed],
|
|
cwd=profile_path,
|
|
text=True,
|
|
stdout=subprocess.PIPE
|
|
)
|
|
wallet_data = result.stdout.strip()
|
|
|
|
# Guardar la wallet en un archivo JSON
|
|
try:
|
|
with open(wallet_path, 'w') as f:
|
|
json.dump(json.loads(wallet_data), f, indent=4)
|
|
except Exception as e:
|
|
print(f"Error al guardar la wallet en {wallet_path}: {e}")
|
|
|
|
# Obtener el balance de la wallet
|
|
try:
|
|
subprocess.run(
|
|
["ardrive", "get-balance", "-w", wallet_path],
|
|
capture_output=True, text=True, check=True
|
|
)
|
|
return True
|
|
except subprocess.CalledProcessError:
|
|
return False
|
|
def verify_wallet(self):
|
|
wallet_path = os.path.join(
|
|
".config_app", "profiles", self.profile_selected, f"wallet_{self.profile_selected}.json"
|
|
)
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
["ardrive", "get-balance", "-w", wallet_path],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True
|
|
)
|
|
print("Resultado del balance:")
|
|
print(result.stdout)
|
|
return True
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
print("Error al obtener el balance:")
|
|
print(e.stderr)
|
|
return False
|
|
|
|
def create_website(self):
|
|
print(self.hugo_memory)
|
|
def create_hugo_structure():
|
|
#creaar estructura hugo
|
|
time.sleep(2)
|
|
print(f"BasePage Variables:\nprofile_selected: {self.profile_selected}\nproject_selected: {self.project_selected}\ntemplate_selected: {self.template_selected}")
|
|
|
|
general_path = os.path.join(".config_app", "profiles", self.profile_selected)
|
|
create_hugo = subprocess.run(
|
|
["hugo", "new", "site", self.project_selected],
|
|
cwd=general_path, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
|
)
|
|
|
|
if create_hugo.returncode != 0:
|
|
print(f"Error: {create_hugo.stderr}")
|
|
return False
|
|
|
|
print(f"Website '{self.project_selected}' created successfully!")
|
|
|
|
project_path = os.path.join(general_path, self.project_selected)
|
|
os.makedirs(os.path.join(project_path, "config", "_default"), exist_ok=True)
|
|
os.makedirs(os.path.join(project_path, "kontent"), exist_ok=True)
|
|
for fname in ["config.yaml", "taxonomies.yaml", "outputs.yaml"]:
|
|
open(os.path.join(project_path, "config", "_default", fname), "a").close()
|
|
|
|
for unwanted in ["content", "layouts", "static"]:
|
|
path = os.path.join(project_path, unwanted)
|
|
if os.path.isdir(path):
|
|
shutil.rmtree(path, ignore_errors=True)
|
|
hugo_toml = os.path.join(project_path, "hugo.toml")
|
|
if os.path.isfile(hugo_toml):
|
|
os.remove(hugo_toml)
|
|
|
|
# Copiar la carpeta del tema (ya implementado)
|
|
src_theme_path = os.path.join(
|
|
".config_app", "templates", self.template_selected, self.template_selected
|
|
)
|
|
dst_theme_dir = os.path.join(project_path, "themes")
|
|
dst_theme_path = os.path.join(dst_theme_dir, self.template_selected)
|
|
|
|
if os.path.isdir(src_theme_path):
|
|
os.makedirs(dst_theme_dir, exist_ok=True)
|
|
shutil.copytree(src_theme_path, dst_theme_path, dirs_exist_ok=True)
|
|
print(f"Template '{self.template_selected}' copiado a 'themes/'.")
|
|
else:
|
|
print(f"Error: template no encontrado en {src_theme_path}")
|
|
return False
|
|
|
|
# Copiar la carpeta 'kontent' del template a profile/
|
|
copy_kontent_path = os.path.join(
|
|
".config_app", "templates", self.template_selected, "kontent"
|
|
)
|
|
paste_kontent_path = os.path.join(general_path, self.project_selected, "kontent")
|
|
|
|
if os.path.isdir(copy_kontent_path):
|
|
shutil.copytree(copy_kontent_path, paste_kontent_path, dirs_exist_ok=True)
|
|
print(f"Karpeta 'kontent' copiada a {paste_kontent_path}.")
|
|
else:
|
|
print(f"Error: carpeta 'kontent' no encontrada en {copy_kontent_path}")
|
|
|
|
return True
|
|
|
|
|
|
time.sleep(1)
|
|
def call_create_markdown():
|
|
template_dir = os.path.join(".config_app","templates",self.template_selected)
|
|
for file_path in glob.glob(f"{template_dir}/*.py"):
|
|
try:
|
|
name = os.path.splitext(os.path.basename(file_path))[0]
|
|
spec = importlib.util.spec_from_file_location(name, file_path)
|
|
module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(module)
|
|
#Ir al main.py de layout (plantilla) y esperar envio de sesultado, se le envia
|
|
#hugo memory
|
|
|
|
if hasattr(module, 'create_markdown'):
|
|
print(f"Execute create_markdown from: {file_path}")
|
|
result = module.create_markdown(
|
|
self.profile_selected,
|
|
self.hugo_memory,
|
|
self.template_selected,
|
|
self.project_selected
|
|
)
|
|
|
|
if result is True:
|
|
print("Markdown creado correctamente. Añadiendo contenido y compilando...")
|
|
|
|
base_path = os.path.join(".config_app","profiles",self.profile_selected, self.project_selected)
|
|
config_dir = os.path.join(base_path, "config", "_default")
|
|
os.makedirs(config_dir, exist_ok=True)
|
|
|
|
config_path = os.path.join(config_dir, "config.yaml")
|
|
config_text = (
|
|
f'title: "{self.hugo_memory.get("line_website_name", "Generic name")}"\n'
|
|
f'contentDir: "kontent"\n'
|
|
f'theme: "{self.template_selected}"\n'
|
|
f'relativeURLs: true\n'
|
|
f'params:\n'
|
|
f' author: "{self.profile_selected}"\n'
|
|
f' description: "{self.hugo_memory.get("plain_website_description", "No description")}"\n'
|
|
f' seoKeys: "{self.hugo_memory.get("plain_website_seo", "No keywords")}"\n'
|
|
f'markup:\n'
|
|
f' goldmark:\n'
|
|
f' renderer:\n'
|
|
f' unsafe: true\n'
|
|
)
|
|
with open(config_path, "w", encoding="utf-8") as f:
|
|
f.write(config_text)
|
|
print(f"Config created in {config_path}")
|
|
|
|
outputs_path = os.path.join(config_dir, "outputs.yaml")
|
|
outputs_text = (
|
|
f'home:\n'
|
|
f' - HTML\n'
|
|
f' - RSS\n'
|
|
f'section:\n'
|
|
f' - HTML\n'
|
|
f'page:\n'
|
|
f' - HTML\n'
|
|
f'taxonomy:\n'
|
|
f' - HTML\n'
|
|
f'term:\n'
|
|
f' - HTML\n'
|
|
)
|
|
with open(outputs_path, "w", encoding="utf-8") as f:
|
|
f.write(outputs_text)
|
|
print(f"Config created in {outputs_path}")
|
|
|
|
taxonomies_path = os.path.join(config_dir, "taxonomies.yaml")
|
|
|
|
# Obtener la lista desde hugo_memory, o usar lista vacía si no existe
|
|
taxonomies_list = self.hugo_memory.get("taxonomies", [])
|
|
|
|
# Generar el texto YAML dinámicamente
|
|
taxonomies_text = ""
|
|
for taxonomy in taxonomies_list:
|
|
taxonomies_text += f'{taxonomy}: "{taxonomy}"\n'
|
|
|
|
# Guardar el archivo YAML
|
|
with open(taxonomies_path, "w", encoding="utf-8") as f:
|
|
f.write(taxonomies_text)
|
|
|
|
print(f"Config created in {taxonomies_path}")
|
|
print(f"Ejecutando Hugo en: {base_path}")
|
|
print("Comando: hugo")
|
|
|
|
build = subprocess.run(
|
|
["hugo"],
|
|
cwd=base_path, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
|
)
|
|
|
|
print("Salida estándar:")
|
|
print(build.stdout)
|
|
print("Salida de error:")
|
|
print(build.stderr)
|
|
|
|
if build.returncode != 0:
|
|
print(f"Error hugo:\n{build.stderr}")
|
|
return False
|
|
|
|
|
|
# 🔐 Crear y guardar estructura con hashes
|
|
print("New Structure!!")
|
|
structure = {}
|
|
public_path = os.path.join(base_path, "public")
|
|
for root, dirs, files in os.walk(public_path, topdown=False):
|
|
rel_path = os.path.relpath(root, public_path)
|
|
pointer = structure
|
|
if rel_path != ".":
|
|
for part in rel_path.split(os.sep):
|
|
pointer = pointer.setdefault(part, {})
|
|
file_data = {}
|
|
for file in sorted(files):
|
|
full_path = os.path.join(root, file)
|
|
with open(full_path, "rb") as f:
|
|
file_hash = hashlib.md5(f.read()).hexdigest()
|
|
pointer[file] = file_hash
|
|
file_data[file] = file_hash
|
|
for d in sorted(dirs):
|
|
sub_pointer = structure
|
|
for part in os.path.join(rel_path, d).split(os.sep):
|
|
sub_pointer = sub_pointer.get(part, {})
|
|
if "__hash__" in sub_pointer:
|
|
file_data[d] = {"__hash__": sub_pointer["__hash__"]}
|
|
pointer["__hash__"] = hashlib.md5(json.dumps(file_data, sort_keys=True).encode()).hexdigest()
|
|
|
|
structure_path = os.path.join(base_path, "structure.json")
|
|
with open(structure_path, "w", encoding="utf-8") as f:
|
|
json.dump(structure, f, indent=4, sort_keys=True)
|
|
print(f"Save structure in {structure_path}")
|
|
|
|
return True
|
|
|
|
return False
|
|
except Exception as e:
|
|
print(f"Error{file_path} → {e}")
|
|
return False
|
|
print("No found create_markdown")
|
|
return False
|
|
|
|
|
|
# Flujo principal
|
|
if create_hugo_structure():
|
|
return call_create_markdown()
|
|
return False
|
|
|
|
def edit_website(self):
|
|
print(self.template_selected)
|
|
#obtener self.template_selected
|
|
template_dir = os.path.join(".config_app","templates",self.template_selected)
|
|
base_path = os.path.join(".config_app", "profiles", self.profile_selected, self.project_selected)
|
|
|
|
for file_path in glob.glob(f"{template_dir}/*.py"):
|
|
try:
|
|
name = os.path.splitext(os.path.basename(file_path))[0]
|
|
spec = importlib.util.spec_from_file_location(name, file_path)
|
|
module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(module)
|
|
|
|
if hasattr(module, 'create_markdown'):
|
|
result = module.create_markdown(
|
|
self.profile_selected,
|
|
self.hugo_memory,
|
|
self.template_selected,
|
|
self.project_selected
|
|
)
|
|
|
|
if result is True:
|
|
build = subprocess.run(
|
|
["hugo"],
|
|
cwd=base_path, text=True,
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE
|
|
)
|
|
print(build.stdout)
|
|
print(build.stderr)
|
|
return result
|
|
except Exception as e:
|
|
print(f"Error {file_path} → {e}")
|
|
return False
|
|
|
|
return False
|
|
|
|
def public_arweave(self):
|
|
print("DEBUG2, public first:", self.profile_selected, self.project_selected)
|
|
base_path = os.path.join(".config_app","profiles",self.profile_selected)
|
|
wallet_path = os.path.join(base_path, f"wallet_{self.profile_selected}.json")
|
|
wallet_path2 = os.path.abspath(wallet_path)
|
|
project_path = os.path.join(base_path, self.project_selected)
|
|
public_path = os.path.join(project_path, "public")
|
|
manifest_path = os.path.join(project_path, "manifest.json")
|
|
drive_file_path = os.path.join(project_path, f"drive_{self.project_selected}.json")
|
|
time.sleep(5)
|
|
print("Wait ensambling")
|
|
|
|
#si no existe ejecutar este codigo normalmente
|
|
create_drive = [
|
|
"ardrive", "create-drive",
|
|
"--wallet-file", wallet_path,
|
|
"--drive-name", self.project_selected
|
|
]
|
|
result = subprocess.run(create_drive, capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
print("Error creating drive:", result.stderr)
|
|
return False
|
|
|
|
project_data = json.loads(result.stdout)
|
|
with open(drive_file_path, "w", encoding="utf-8") as f:
|
|
json.dump(project_data, f, indent=4)
|
|
|
|
drive_id = next(
|
|
(d["entityId"] for d in project_data.get("created", []) if d.get("type") == "folder"),
|
|
None
|
|
)
|
|
if not drive_id:
|
|
print("No drive_id found")
|
|
return False
|
|
|
|
print("Drive ID:", drive_id)
|
|
for _ in tqdm(range(5), desc="Stop", ncols=80):
|
|
time.sleep(2)
|
|
|
|
upload_website = (
|
|
f"for i in *; do ardrive upload-file --local-path \"$i\" "
|
|
f"--parent-folder-id {drive_id} -w '{wallet_path2}'; done"
|
|
)
|
|
print(upload_website)
|
|
|
|
subprocess.run(upload_website, shell=True, cwd=public_path, text=True)
|
|
|
|
for _ in tqdm(range(5), desc="Make Manifest", ncols=80):
|
|
time.sleep(1)
|
|
|
|
upload_manifest = (
|
|
f"ardrive create-manifest -f '{drive_id}' -w '{wallet_path2}' > ../manifest.json"
|
|
)
|
|
subprocess.run(upload_manifest, shell=True, cwd=public_path, text=True)
|
|
if os.path.isfile(manifest_path):
|
|
print("Manifest created successfully")
|
|
for _ in tqdm(range(10), desc="Wait, distrubution on arweave system", ncols=80):
|
|
time.sleep(1)
|
|
return True
|
|
else:
|
|
print("Manifest file not created")
|
|
return False
|
|
|
|
def refresh_arweave(self):
|
|
print("DEBUG-Refresh-Arweave, public first:", self.profile_selected, self.project_selected)
|
|
|
|
print(self.template_selected)
|
|
profile_path = os.path.join(".config_app","profiles",self.profile_selected)
|
|
def change_manifest(profile_path):#firts step
|
|
print(profile_path)
|
|
try:
|
|
manifest_path = os.path.join(profile_path, self.project_selected, "manifest.json")
|
|
with open(manifest_path, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
paths_data = data["manifest"]["paths"]
|
|
|
|
new_path = os.path.join(os.path.dirname(manifest_path), "ma.json")
|
|
with open(new_path, 'w', encoding='utf-8') as f:
|
|
json.dump({"paths": paths_data}, f, separators=(',', ':'))
|
|
os.remove(manifest_path)
|
|
print("manifest.json ---> ma.json.")
|
|
except Exception as e:
|
|
print(f"Error processing manifest: {e}")
|
|
|
|
def generate_structure(profile_path):
|
|
public_path = os.path.join(profile_path)
|
|
print(f"Buscando en: {public_path}")
|
|
print(self.project_selected)
|
|
|
|
if not os.path.exists(public_path):
|
|
print("El directorio 'public' no existe.")
|
|
return {}
|
|
|
|
structure = {}
|
|
|
|
for root, dirs, files in os.walk(public_path, topdown=False):
|
|
print(f"Explorando: {root}")
|
|
rel_path = os.path.relpath(root, public_path)
|
|
pointer = structure
|
|
if rel_path != ".":
|
|
for part in rel_path.split(os.sep):
|
|
pointer = pointer.setdefault(part, {})
|
|
|
|
file_data = {}
|
|
for file in sorted(files):
|
|
print(f" - Archivo encontrado: {file}")
|
|
full_path = os.path.join(root, file)
|
|
with open(full_path, "rb") as f:
|
|
data = f.read()
|
|
file_hash = hashlib.md5(data).hexdigest()
|
|
pointer[file] = file_hash
|
|
file_data[file] = file_hash
|
|
|
|
for d in sorted(dirs):
|
|
sub_path = os.path.join(rel_path, d)
|
|
sub_pointer = structure
|
|
for part in sub_path.split(os.sep):
|
|
sub_pointer = sub_pointer.get(part, {})
|
|
if "__hash__" in sub_pointer:
|
|
file_data[d] = {"__hash__": sub_pointer["__hash__"]}
|
|
|
|
folder_hash = hashlib.md5(json.dumps(file_data, sort_keys=True).encode()).hexdigest()
|
|
pointer["__hash__"] = folder_hash
|
|
|
|
print("Nueva radiografía generada:")
|
|
print(json.dumps(structure, indent=2, ensure_ascii=False))
|
|
return structure
|
|
|
|
def compare_hash(profile_path):
|
|
print("Verify structure")
|
|
public_path = os.path.join(profile_path,self.project_selected, "public")
|
|
swarp_path = os.path.join(profile_path,self.project_selected, "swarp")
|
|
radiografia_path = os.path.join(profile_path,self.project_selected, "structure.json")
|
|
|
|
if os.path.exists(radiografia_path):
|
|
with open(radiografia_path, "r") as f:
|
|
old_structure = json.load(f)
|
|
else:
|
|
print("Empty,fail old structure")
|
|
return
|
|
|
|
new_structure = generate_structure(public_path)
|
|
|
|
os.makedirs(swarp_path, exist_ok=True)
|
|
|
|
for root, dirs, files in os.walk(public_path, topdown=False):
|
|
rel_path = os.path.relpath(root, public_path)
|
|
old_pointer = old_structure
|
|
new_pointer = new_structure
|
|
|
|
if rel_path != ".":
|
|
for part in rel_path.split(os.sep):
|
|
old_pointer = old_pointer.get(part, {})
|
|
new_pointer = new_pointer.get(part, {})
|
|
|
|
for file in files:
|
|
if file == "__hash__":
|
|
continue
|
|
new_file_hash = new_pointer.get(file)
|
|
old_file_hash = old_pointer.get(file)
|
|
|
|
if new_file_hash and old_file_hash == new_file_hash:
|
|
src = os.path.join(root, file)
|
|
dest_dir = os.path.join(swarp_path, rel_path)
|
|
os.makedirs(dest_dir, exist_ok=True)
|
|
shutil.move(src, os.path.join(dest_dir, file))
|
|
print(f"No changes, No upload: {rel_path}/{file}")
|
|
|
|
if not os.listdir(root):
|
|
os.rmdir(root)
|
|
|
|
os.remove(radiografia_path)
|
|
def new_radiography(profile_path):
|
|
print("Generate new structure.json")
|
|
public_path = os.path.join(profile_path, self.project_selected, "public")
|
|
radiografia_path = os.path.join(profile_path, self.project_selected, "structure.json")
|
|
|
|
structure = generate_structure(public_path)
|
|
|
|
with open(radiografia_path, "w", encoding="utf-8") as f:
|
|
json.dump(structure, f, indent=4, sort_keys=True)
|
|
#bien, listo para lo que sigue
|
|
def upload_public(profile_path):
|
|
print("Upload new files")
|
|
|
|
project_file = os.path.join(profile_path,self.project_selected, f"drive_{self.project_selected}.json")
|
|
wallet_path = os.path.join(profile_path, f"wallet_{self.profile_selected}.json")
|
|
wallet_path2 = os.path.abspath(wallet_path)
|
|
|
|
public_path = os.path.join(profile_path, self.project_selected, "public")
|
|
project_path = os.path.join(profile_path, self.project_selected)
|
|
|
|
|
|
print('en esta direccion buscarmeos el json drive')
|
|
print(project_file)
|
|
time.sleep(30)
|
|
with open(project_file, "r") as f:
|
|
project_data = json.load(f)
|
|
|
|
drive_id = next(
|
|
(d["entityId"] for d in project_data.get("created", []) if d.get("type") == "folder"),
|
|
None
|
|
)
|
|
|
|
if not drive_id:
|
|
print("No valid drive_id")
|
|
return
|
|
|
|
for _ in tqdm(range(10), desc="Wait...", ncols=80):
|
|
time.sleep(1)
|
|
|
|
upload_cmd = (
|
|
f"for i in *; do ardrive upload-file --local-path \"$i\" "
|
|
f"--parent-folder-id '{drive_id}' -w '{wallet_path2}'; done > ../ni.json"
|
|
)
|
|
|
|
subprocess.run(upload_cmd, shell=True, cwd=public_path, text=True)
|
|
print("New files upload ---> ni.json")
|
|
|
|
for _ in tqdm(range(10), desc="Formating ni.json", ncols=80):
|
|
time.sleep(1)
|
|
ni_path = os.path.join(project_path, "ni.json")
|
|
|
|
with open(ni_path, 'r', encoding='utf-8') as f:
|
|
contenido = f.read()
|
|
|
|
patron = r'"created"\s*:\s*(\[[^\]]*\](?:\s*,\s*\[[^\]]*\])*)'
|
|
coincidencias = re.findall(patron, contenido, re.DOTALL)
|
|
|
|
resultado_paths = {}
|
|
|
|
for coincidencia in coincidencias:
|
|
try:
|
|
lista = json.loads(coincidencia)
|
|
for item in lista:
|
|
if item.get("type") == "file" and "sourceUri" in item and "dataTxId" in item:
|
|
# Limpiar el sourceUri: quitar todo antes de "public/"
|
|
uri = item["sourceUri"]
|
|
uri = re.sub(r'^.*?public/', '', uri) # Elimina todo antes de "public/"
|
|
resultado_paths[uri] = {"id": item["dataTxId"]} # Reemplazar dataTxId por id
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
with open(ni_path, 'w', encoding='utf-8') as out:
|
|
json.dump({"paths": resultado_paths}, out, indent=4, ensure_ascii=False)
|
|
|
|
print(f"ni.json correcty formatted")
|
|
##voy aca
|
|
def combine_folders(profile_path):
|
|
public_path = os.path.join(profile_path,self.project_selected, "public")
|
|
swarp_path = os.path.join(profile_path,self.project_selected, "swarp")
|
|
|
|
if not os.path.exists(swarp_path):
|
|
print("No swarp")
|
|
return
|
|
|
|
for item in os.listdir(swarp_path):
|
|
s_item = os.path.join(swarp_path, item)
|
|
p_item = os.path.join(public_path, item)
|
|
|
|
if os.path.isdir(s_item):
|
|
shutil.copytree(s_item, p_item, dirs_exist_ok=True)
|
|
else:
|
|
shutil.copy2(s_item, p_item)
|
|
|
|
shutil.rmtree(swarp_path)
|
|
def new_manifest(profile_path):
|
|
print("ma.json + ni.json = manifest.json...")
|
|
|
|
ma_path = os.path.join(profile_path,self.project_selected, "ma.json")
|
|
ni_path = os.path.join(profile_path,self.project_selected, "ni.json")
|
|
mani_path = os.path.join(profile_path,self.project_selected, "manifest.json")
|
|
|
|
try:
|
|
# Leer los archivos 'ma.json' (old) y 'ni.json' (new)
|
|
with open(ma_path, 'r', encoding='utf-8') as f:
|
|
ma_data = json.load(f)
|
|
|
|
with open(ni_path, 'r', encoding='utf-8') as f:
|
|
ni_data = json.load(f)
|
|
|
|
ma_paths = ma_data.get("paths", {})
|
|
ni_paths = ni_data.get("paths", {})
|
|
|
|
for key in ni_paths:
|
|
if key in ma_paths:
|
|
del ma_paths[key]
|
|
|
|
ma_paths.update(ni_paths)
|
|
|
|
new_manifest_data = {
|
|
"manifest": "arweave/paths",
|
|
"version": "0.1.0",
|
|
"index": {
|
|
"path": "index.html"
|
|
},
|
|
"paths": ma_paths
|
|
}
|
|
|
|
# Guardar el nuevo manifiesto combinado en 'mani.json'
|
|
with open(mani_path, 'w', encoding='utf-8') as f:
|
|
json.dump(new_manifest_data, f, indent=4)
|
|
|
|
print("Manifest ready for upload.")
|
|
|
|
except Exception as e:
|
|
print(f"Error 404 {e}")
|
|
|
|
def upload_manifest(profile_path):
|
|
print("Upload manifest")
|
|
|
|
project_file = os.path.join(profile_path, self.project_selected, f"drive_{self.project_selected}.json")
|
|
wallet_path = os.path.join(profile_path, f"wallet_{self.profile_selected}.json")
|
|
wallet_path2 = os.path.abspath(wallet_path)
|
|
project_path = os.path.join(profile_path, self.project_selected)
|
|
|
|
with open(project_file, "r") as f:
|
|
project_data = json.load(f)
|
|
|
|
drive_id = next(
|
|
(d["entityId"] for d in project_data.get("created", []) if d.get("type") == "folder"),
|
|
None
|
|
)
|
|
|
|
if not drive_id:
|
|
print("No found drive_id")
|
|
return
|
|
|
|
upload_cmd = (
|
|
f"ardrive upload-file --content-type 'application/x.arweave-manifest+json' "
|
|
f"--local-path manifest.json --parent-folder-id '{drive_id}' -w '{wallet_path2}' > fest.json"
|
|
)
|
|
|
|
subprocess.run(upload_cmd, shell=True, cwd=project_path, text=True)
|
|
print("Manifest correctly uploaded")
|
|
|
|
mani_path = os.path.join(profile_path,self.project_selected, "manifest.json")
|
|
fest_path = os.path.join(profile_path,self.project_selected, "fest.json")
|
|
final_manifest_path = os.path.join(profile_path,self.project_selected, "manifest.json")
|
|
|
|
with open(mani_path, "r", encoding="utf-8") as f:
|
|
mani_data = json.load(f)
|
|
|
|
with open(fest_path, "r", encoding="utf-8") as f:
|
|
fest_data = json.load(f)
|
|
|
|
combined_data = {
|
|
"manifest": mani_data,
|
|
**fest_data # Esto "expande" created, tips, fees directamente
|
|
}
|
|
|
|
with open(final_manifest_path, "w", encoding="utf-8") as f:
|
|
json.dump(combined_data, f, indent=4)
|
|
|
|
def delete(profile_path):
|
|
project_path = os.path.join(profile_path, self.project_selected)
|
|
|
|
archivos = ["ma.json", "ni.json", "fest.json"]
|
|
for archivo in archivos:
|
|
ruta = os.path.join(project_path, archivo)
|
|
if os.path.exists(ruta):
|
|
os.remove(ruta)
|
|
else:
|
|
print("Error")
|
|
try:
|
|
# Call the change_manifest function
|
|
change_manifest(profile_path)
|
|
compare_hash(profile_path)
|
|
upload_public(profile_path)
|
|
combine_folders(profile_path)
|
|
new_manifest(profile_path)
|
|
new_radiography(profile_path)
|
|
upload_manifest(profile_path)
|
|
delete(profile_path)
|
|
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
return False
|