import os
import json
import html
import shutil
from functools import wraps
from types import SimpleNamespace
from datetime import datetime, timedelta
from sqlalchemy import func
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, session
from flask_login import LoginManager, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from .web_db import db, User, Project, Run, LogEntry
from .web_tasks import huey, generate_book_task, regenerate_artifacts_task
import config
from . import utils
from . import ai
from . import story

# Calculate paths relative to this file (modules/web_app.py)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
TEMPLATE_DIR = os.path.join(BASE_DIR, 'templates')

app = Flask(__name__, template_folder=TEMPLATE_DIR)
app.url_map.strict_slashes = False
app.config['SECRET_KEY'] = config.FLASK_SECRET
# Use absolute path for database to avoid Docker path resolution issues
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{os.path.join(config.DATA_DIR, "bookapp.db")}'

db.init_app(app)

login_manager = LoginManager()
login_manager.login_view = 'login'
login_manager.init_app(app)


@login_manager.user_loader
def load_user(user_id):
    """Flask-Login callback: resolve the session's user id to a User row."""
    return db.session.get(User, int(user_id))


# --- PRIVATE HELPERS ---

def _load_personas():
    """Return the contents of config.PERSONAS_FILE, or {} if it is missing or unreadable."""
    if not os.path.exists(config.PERSONAS_FILE):
        return {}
    try:
        with open(config.PERSONAS_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError):
        # Best-effort: a corrupt or unreadable persona file is treated as empty.
        return {}


def _write_json(path, data):
    """Serialize `data` to `path` as indented JSON."""
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2)


def _read_text(path):
    """Read a text file, ignoring undecodable bytes."""
    with open(path, 'r', encoding='utf-8', errors='ignore') as f:
        return f.read()


def _bible_path(project):
    """Return the path of the project's bible.json."""
    return os.path.join(project.folder_path, "bible.json")


def _run_dir(project_folder, run_id):
    """Return the output directory used for a given run of a project."""
    return os.path.join(project_folder, "runs", "bible", f"run_{run_id}")


def _try_init_ai():
    """Initialise the AI models (best-effort) and report whether a logic model is available."""
    try:
        ai.init_models()
    except Exception as exc:
        # Initialisation failure is not fatal here; callers check the return value.
        app.logger.warning("AI model initialisation failed: %s", exc)
    return bool(ai.model_logic)


def _render_db_logs(run_id):
    """Format the LogEntry rows of a run as console-style text ('' when there are none)."""
    logs = LogEntry.query.filter_by(run_id=run_id).order_by(LogEntry.timestamp).all()
    return "\n".join(
        f"[{l.timestamp.strftime('%H:%M:%S')}] {l.phase:<15} | {l.message}" for l in logs
    )


def _book_subdirs(run_dir):
    """Return the sorted names of the 'Book_*' sub-directories of a run directory."""
    return sorted(
        d for d in os.listdir(run_dir)
        if os.path.isdir(os.path.join(run_dir, d)) and d.startswith("Book_")
    )


def _locate_run_log(run):
    """Return the path of an existing legacy log file for `run`, or None."""
    log_path = run.log_file
    if log_path and os.path.exists(log_path):
        return log_path
    if run.project is None:
        # Orphaned run: there is no project folder to search.
        return None
    # Try common fallback locations
    candidates = [
        os.path.join(run.project.folder_path, f"system_log_{run.id}.txt"),
        os.path.join(run.project.folder_path, "runs", "bible", f"run_{run.id}", "web_console.log"),
    ]
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return None


def _parse_log_entries(lines, run):
    """Convert '[HH:MM:SS] PHASE | Message' lines into LogEntry objects for `run`."""
    # Log lines carry only a time of day; the date is taken from the run start.
    base_date = run.start_time.date() if run.start_time else datetime.utcnow().date()
    entries = []
    for line in lines:
        if '|' not in line or not line.strip().startswith('['):
            continue
        parts = line.split('|', 1)
        meta = parts[0].strip()
        msg = parts[1].strip()
        if ']' not in meta:
            continue
        close = meta.find(']')
        try:
            t_time = datetime.strptime(meta[1:close], "%H:%M:%S").time()
        except ValueError:
            # Not a timestamped line after all; skip it.
            continue
        entries.append(LogEntry(
            run_id=run.id,
            timestamp=datetime.combine(base_date, t_time),
            phase=meta[close + 1:].strip(),
            message=msg,
        ))
    return entries


def migrate_logs():
    """Parses old log files and inserts them into the database.

    Runs that already have database logs only have their log file removed.
    Files are deleted after the migrated entries have been committed.
    """
    migrated = 0
    files_to_clean = []

    for run in Run.query.all():
        log_path = _locate_run_log(run)
        if not log_path:
            continue

        has_db_logs = LogEntry.query.filter_by(run_id=run.id).first() is not None
        if has_db_logs:
            # Logs are already in DB (New Run or previous migration). Mark file for cleanup.
            files_to_clean.append(log_path)
            continue

        try:
            with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
                entries = _parse_log_entries(f.readlines(), run)
            if entries:
                db.session.add_all(entries)
                migrated += 1
                files_to_clean.append(log_path)
        except Exception as e:
            print(f"Migration failed for Run {run.id}: {e}")

    if migrated > 0:
        db.session.commit()
        print(f"✅ Migrated logs for {migrated} runs to Database.")

    # Cleanup files (even if no new migrations happened)
    if files_to_clean:
        count = 0
        for fpath in files_to_clean:
            try:
                os.remove(fpath)
                count += 1
            except OSError:
                pass
        if count > 0:
            print(f"🧹 Cleaned up {count} redundant log files.")


# --- SETUP ---
with app.app_context():
    db.create_all()
    migrate_logs()

    # Auto-create Admin from Environment Variables (Docker/Portainer Setup)
    if config.ADMIN_USER and config.ADMIN_PASSWORD:
        admin = User.query.filter_by(username=config.ADMIN_USER).first()
        if not admin:
            print(f"🔐 System: Creating Admin User '{config.ADMIN_USER}' from environment variables.")
            admin = User(
                username=config.ADMIN_USER,
                password=generate_password_hash(config.ADMIN_PASSWORD, method='pbkdf2:sha256'),
                is_admin=True,
            )
            db.session.add(admin)
            db.session.commit()
        elif not admin.is_admin:
            admin.is_admin = True
            db.session.commit()


# --- DECORATORS ---
def admin_required(f):
    """Redirect non-admin (or anonymous) users to the dashboard with a flash message."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated or not current_user.is_admin:
            flash("Admin access required.")
            return redirect(url_for('index'))
        return f(*args, **kwargs)
    return decorated_function


# --- ROUTES ---

@app.route('/')
@login_required
def index():
    """Dashboard listing the current user's projects."""
    projects = Project.query.filter_by(user_id=current_user.id).all()
    return render_template('dashboard.html', projects=projects, user=current_user)


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate on POST."""
    if request.method == 'POST':
        username = request.form.get('username')
        # A missing password field must fail authentication, not crash the hash check.
        password = request.form.get('password') or ''
        user = User.query.filter_by(username=username).first()
        if user and check_password_hash(user.password, password):
            login_user(user)
            return redirect(url_for('index'))
        flash('Invalid credentials')
    return render_template('login.html')


@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form; on POST create the account and log it in."""
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        if not username or not password:
            flash('Username and password are required')
            return redirect(url_for('register'))
        if User.query.filter_by(username=username).first():
            flash('Username exists')
            return redirect(url_for('register'))
        new_user = User(username=username, password=generate_password_hash(password, method='pbkdf2:sha256'))
        db.session.add(new_user)
        db.session.commit()
        login_user(new_user)
        return redirect(url_for('index'))
    return render_template('register.html')


@app.route('/project/setup', methods=['POST'])
@login_required
def project_setup_wizard():
    """Ask the AI for suggested project metadata for a concept and show the setup form."""
    concept = request.form.get('concept')

    # Initialize AI if needed
    if not _try_init_ai():
        flash("AI models not initialized.")
        return redirect(url_for('index'))

    prompt = f"""
    Analyze this story concept and suggest metadata for a book or series.

    CONCEPT: {concept}

    RETURN JSON with these keys:
    - title: Suggested book title
    - genre: Genre
    - target_audience: e.g. Adult, YA
    - tone: e.g. Dark, Whimsical
    - length_category: One of ["01", "1", "2", "2b", "3", "4", "5"] based on likely depth.
    - estimated_chapters: int (suggested chapter count)
    - estimated_word_count: string (e.g. "75,000")
    - include_prologue: boolean
    - include_epilogue: boolean
    - tropes: list of strings
    - pov_style: e.g. First Person
    - time_period: e.g. Modern
    - spice: e.g. Standard, Explicit
    - violence: e.g. None, Graphic
    - is_series: boolean
    - series_title: string (if series)
    - narrative_tense: e.g. Past, Present
    - language_style: e.g. Standard, Flowery
    - dialogue_style: e.g. Witty, Formal
    - page_orientation: Portrait, Landscape, or Square
    - formatting_rules: list of strings
    - author_bio: string (suggested persona bio)
    """

    suggestions = {}
    try:
        response = ai.model_logic.generate_content(prompt)
        suggestions = json.loads(utils.clean_json(response.text))
    except Exception as e:
        flash(f"AI Analysis failed: {e}")
        suggestions = {"title": "New Project", "genre": "Fiction"}

    # Load Personas for dropdown
    personas = _load_personas()

    return render_template('project_setup.html', s=suggestions, concept=concept,
                           personas=personas, lengths=config.LENGTH_DEFINITIONS)


@app.route('/project/setup/refine', methods=['POST'])
@login_required
def project_setup_refine():
    """Re-run the setup wizard with an extra user instruction."""
    concept = request.form.get('concept')
    instruction = request.form.get('refine_instruction')

    # Reconstruct current state from form to pass back to AI
    current_state = {
        "title": request.form.get('title'),
        "genre": request.form.get('genre'),
        "target_audience": request.form.get('audience'),
        "tone": request.form.get('tone'),
        # ... (capture other fields if critical, or just rely on AI re-generating from concept + instruction)
    }

    # Same availability check as the wizard, instead of failing on a None model.
    if not _try_init_ai():
        flash("AI models not initialized.")
        return redirect(url_for('index'))

    prompt = f"""
    Update these project suggestions based on the user instruction.

    ORIGINAL CONCEPT: {concept}
    CURRENT TITLE: {current_state['title']}
    INSTRUCTION: {instruction}

    RETURN JSON with the same keys as a full analysis (title, genre, length_category, etc).
    """

    suggestions = {}
    try:
        response = ai.model_logic.generate_content(prompt)
        suggestions = json.loads(utils.clean_json(response.text))
    except Exception as e:
        flash(f"Refinement failed: {e}")
        return redirect(url_for('index'))  # Fallback

    personas = _load_personas()

    return render_template('project_setup.html', s=suggestions, concept=concept,
                           personas=personas, lengths=config.LENGTH_DEFINITIONS)


@app.route('/project/create', methods=['POST'])
@login_required
def create_project_final():
    """Create the project folder, build bible.json from the form and register the project."""
    # A missing/blank title falls back to the same default add_book uses.
    title = (request.form.get('title') or '').strip() or 'Untitled'
    # Spaces must become underscores *before* filtering, otherwise they are dropped.
    safe_title = "".join(c for c in title.replace(" ", "_") if c.isalnum() or c == '_')

    user_dir = os.path.join(config.DATA_DIR, "users", str(current_user.id))
    os.makedirs(user_dir, exist_ok=True)

    proj_path = os.path.join(user_dir, safe_title)
    if os.path.exists(proj_path):
        # Name clash (or empty safe_title): disambiguate with a timestamp.
        safe_title += f"_{int(datetime.utcnow().timestamp())}"
        proj_path = os.path.join(user_dir, safe_title)
    os.makedirs(proj_path)

    # Construct Bible from Form Data
    length_cat = request.form.get('length_category')
    len_def = config.LENGTH_DEFINITIONS.get(length_cat, config.LENGTH_DEFINITIONS['4']).copy()

    # Overrides
    try:
        len_def['chapters'] = int(request.form.get('chapters'))
    except (TypeError, ValueError):
        pass  # keep the category default
    len_def['words'] = request.form.get('words')
    len_def['include_prologue'] = 'include_prologue' in request.form
    len_def['include_epilogue'] = 'include_epilogue' in request.form

    is_series = 'is_series' in request.form

    style = {
        "tone": request.form.get('tone'),
        "pov_style": request.form.get('pov_style'),
        "time_period": request.form.get('time_period'),
        "spice": request.form.get('spice'),
        "violence": request.form.get('violence'),
        "narrative_tense": request.form.get('narrative_tense'),
        "language_style": request.form.get('language_style'),
        "dialogue_style": request.form.get('dialogue_style'),
        "page_orientation": request.form.get('page_orientation'),
        "tropes": [x.strip() for x in request.form.get('tropes', '').split(',') if x.strip()],
        "formatting_rules": [x.strip() for x in request.form.get('formatting_rules', '').split(',') if x.strip()],
    }

    bible = {
        "project_metadata": {
            "title": title,
            "author": request.form.get('author'),
            "author_bio": request.form.get('author_bio'),
            "genre": request.form.get('genre'),
            "target_audience": request.form.get('audience'),
            "is_series": is_series,
            "length_settings": len_def,
            "style": style,
        },
        "books": [],
        "characters": [],
    }

    # Create Books
    count = 1
    if is_series:
        try:
            count = int(request.form.get('series_count', 1))
        except (TypeError, ValueError):
            count = 3

    concept = request.form.get('concept', '')

    for i in range(count):
        bible['books'].append({
            "book_number": i + 1,
            "title": f"{title} - Book {i+1}" if is_series else title,
            "manual_instruction": concept if i == 0 else "",
            "plot_beats": [],
        })

    # Enrich via AI immediately if concept exists
    if concept:
        try:
            ai.init_models()
            enriched = story.enrich(bible, proj_path)
            if enriched:
                bible = enriched
        except Exception as exc:
            # Enrichment is optional; the un-enriched bible is still saved.
            app.logger.warning("Bible enrichment failed: %s", exc)

    _write_json(os.path.join(proj_path, "bible.json"), bible)

    new_proj = Project(user_id=current_user.id, name=title, folder_path=proj_path)
    db.session.add(new_proj)
    db.session.commit()

    return redirect(url_for('view_project', id=new_proj.id))


def _scan_generated_books(project_folder, runs):
    """Map book number -> info about its newest generated copy.

    `runs` must be ordered newest first; the first completed run that contains
    a 'Book_<n>' folder with a manuscript.json wins for book <n>.
    """
    generated = {}
    for r in runs:
        if r.status != 'completed':
            continue
        run_dir = _run_dir(project_folder, r.id)
        if not os.path.exists(run_dir):
            continue
        for d in os.listdir(run_dir):
            book_path = os.path.join(run_dir, d)
            if not (d.startswith("Book_") and os.path.isdir(book_path)):
                continue
            # Check for manuscript to confirm generation
            if not os.path.exists(os.path.join(book_path, "manuscript.json")):
                continue
            parts = d.split('_')
            if len(parts) < 2 or not parts[1].isdigit():
                continue
            try:
                b_num = int(parts[1])
                # Only add if we haven't found a newer version (runs are ordered desc)
                if b_num in generated:
                    continue
                # Find artifacts for direct download link
                files = os.listdir(book_path)
            except (ValueError, OSError):
                continue
            epub_file = next((f for f in files if f.endswith('.epub')), None)
            docx_file = next((f for f in files if f.endswith('.docx')), None)
            generated[b_num] = {
                'status': 'generated',
                'run_id': r.id,
                'folder': d,
                'epub': os.path.join(d, epub_file).replace("\\", "/") if epub_file else None,
                'docx': os.path.join(d, docx_file).replace("\\", "/") if docx_file else None,
            }
    return generated


def _collect_run_artifacts(run_dir):
    """Return (cover_image, artifacts) for a run directory, with paths relative to it."""
    cover_image = None
    artifacts = []
    if not os.path.exists(run_dir):
        return cover_image, artifacts

    # Find Cover Image (Root or First Book)
    if os.path.exists(os.path.join(run_dir, "cover.png")):
        cover_image = "cover.png"
    else:
        for d in _book_subdirs(run_dir):
            if os.path.exists(os.path.join(run_dir, d, "cover.png")):
                cover_image = os.path.join(d, "cover.png").replace("\\", "/")
                break

    for root, _dirs, files in os.walk(run_dir):
        for f in files:
            if f.lower().endswith(('.epub', '.docx')):
                rel_path = os.path.relpath(os.path.join(root, f), run_dir)
                artifacts.append({
                    'name': f,
                    'path': rel_path.replace("\\", "/"),
                    'type': f.split('.')[-1].upper(),
                })
    return cover_image, artifacts


@app.route('/project/<int:id>')
@login_required
def view_project(id):
    """Project page: bible, runs, generated books and artifacts of the latest run."""
    proj = db.session.get(Project, id)
    if not proj:
        return "Project not found", 404
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    # Load Bible
    bible_data = utils.load_json(_bible_path(proj))

    # Load Personas for dropdown
    personas = _load_personas()

    runs = Run.query.filter_by(project_id=id).order_by(Run.id.desc()).all()
    latest_run = runs[0] if runs else None

    # Fetch other projects for "Related Series" import
    other_projects = Project.query.filter(Project.user_id == current_user.id, Project.id != id).all()

    # Scan ALL completed runs to find the latest status of each book
    # Map book_number -> {status: 'generated', run_id: int, folder: str}
    generated_books = _scan_generated_books(proj.folder_path, runs)

    # Collect Artifacts from Latest Run
    artifacts = []
    cover_image = None
    if latest_run:
        cover_image, artifacts = _collect_run_artifacts(_run_dir(proj.folder_path, latest_run.id))

    return render_template('project.html', project=proj, bible=bible_data, runs=runs,
                           active_run=latest_run, artifacts=artifacts, cover_image=cover_image,
                           personas=personas, generated_books=generated_books,
                           other_projects=other_projects)


@app.route('/project/<int:id>/run', methods=['POST'])
@login_required
def run_project(id):
    """Queue a new generation run for the project."""
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    # Same ownership check as the other project routes.
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    # Create Run Entry
    new_run = Run(project_id=id, status="queued")
    db.session.add(new_run)
    db.session.commit()

    # Trigger Background Task
    generate_book_task(new_run.id, proj.folder_path, _bible_path(proj), allow_copy=True)

    return redirect(url_for('view_project', id=id))


@app.route('/project/<int:id>/review')
@login_required
def review_project(id):
    """Show the bible review page for a project."""
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    bible = utils.load_json(_bible_path(proj))
    return render_template('project_review.html', project=proj, bible=bible)


@app.route('/project/<int:id>/update', methods=['POST'])
@login_required
def update_project_metadata(id):
    """Update the project's title/author in both bible.json and the database."""
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    new_title = request.form.get('title')
    new_author = request.form.get('author')

    bible_path = _bible_path(proj)
    bible = utils.load_json(bible_path)

    if bible:
        metadata = bible.setdefault('project_metadata', {})
        if new_title:
            metadata['title'] = new_title
            proj.name = new_title
        if new_author:
            metadata['author'] = new_author

        _write_json(bible_path, bible)
        db.session.commit()

    return redirect(url_for('view_project', id=id))


@app.route('/project/<int:id>/refine_bible', methods=['POST'])
@login_required
def refine_bible_route(id):
    """Let the AI rewrite the project's bible according to a user instruction."""
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    instruction = request.form.get('instruction')
    if not instruction:
        flash("Instruction required.")
        return redirect(url_for('view_project', id=id))

    bible_path = _bible_path(proj)
    bible = utils.load_json(bible_path)

    if bible:
        if not _try_init_ai():
            flash("AI models not initialized.")
            return redirect(url_for('view_project', id=id))

        new_bible = story.refine_bible(bible, instruction, proj.folder_path)
        if new_bible:
            _write_json(bible_path, new_bible)
            flash("Bible updated successfully.")
        else:
            flash("AI failed to update Bible. Check logs.")

    return redirect(url_for('view_project', id=id))


@app.route('/project/<int:id>/add_book', methods=['POST'])
@login_required
def add_book(id):
    """Append a new book to the project's plan and mark the project as a series."""
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    title = request.form.get('title', 'Untitled')
    instruction = request.form.get('instruction', '')

    bible_path = _bible_path(proj)
    bible = utils.load_json(bible_path)

    if bible:
        books = bible.setdefault('books', [])
        next_num = len(books) + 1

        books.append({
            "book_number": next_num,
            "title": title,
            "manual_instruction": instruction,
            "plot_beats": [],  # AI will fill this if empty during run
        })

        # If series metadata isn't set, set it
        if 'project_metadata' in bible:
            bible['project_metadata']['is_series'] = True

        _write_json(bible_path, bible)
        flash(f"Added Book {next_num}: {title}")

    return redirect(url_for('view_project', id=id))


@app.route('/project/<int:id>/book/<int:book_num>/update', methods=['POST'])
@login_required
def update_book_details(id, book_num):
    """Update the title and/or instruction of one planned book."""
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    new_title = request.form.get('title')
    new_instruction = request.form.get('instruction')

    bible_path = _bible_path(proj)
    bible = utils.load_json(bible_path)

    if bible and 'books' in bible:
        for b in bible['books']:
            if b.get('book_number') == book_num:
                if new_title:
                    b['title'] = new_title
                if new_instruction:
                    b['manual_instruction'] = new_instruction
                break

        _write_json(bible_path, bible)
        flash(f"Book {book_num} updated.")

    return redirect(url_for('view_project', id=id))


@app.route('/project/<int:id>/delete_book/<int:book_num>', methods=['POST'])
@login_required
def delete_book(id, book_num):
    """Remove a planned book, renumber the rest and refresh the series flag."""
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    bible_path = _bible_path(proj)
    bible = utils.load_json(bible_path)

    if bible and 'books' in bible:
        bible['books'] = [b for b in bible['books'] if b.get('book_number') != book_num]
        # Renumber
        for i, b in enumerate(bible['books']):
            b['book_number'] = i + 1

        # Update Series Status (Revert to standalone if only 1 book remains)
        if 'project_metadata' in bible:
            bible['project_metadata']['is_series'] = (len(bible['books']) > 1)

        _write_json(bible_path, bible)
        flash("Book deleted from plan.")

    return redirect(url_for('view_project', id=id))


@app.route('/project/<int:id>/import_characters', methods=['POST'])
@login_required
def import_characters(id):
    """Copy characters (by case-insensitive name) from another of the user's projects."""
    target_proj = db.session.get(Project, id)
    # Parse the form value as an int so the primary-key lookup gets the right type.
    source_id = request.form.get('source_project_id', type=int)
    source_proj = db.session.get(Project, source_id) if source_id is not None else None

    if not target_proj or not source_proj:
        return "Project not found", 404
    if target_proj.user_id != current_user.id or source_proj.user_id != current_user.id:
        return "Unauthorized", 403

    target_path = _bible_path(target_proj)
    target_bible = utils.load_json(target_path)
    source_bible = utils.load_json(_bible_path(source_proj))

    if target_bible and source_bible:
        # setdefault: a bible without a 'characters' list must not raise KeyError.
        target_chars = target_bible.setdefault('characters', [])
        existing_names = {str(c.get('name', '')).lower() for c in target_chars}
        added_count = 0

        for char in source_bible.get('characters', []):
            key = str(char.get('name', '')).lower()
            if key not in existing_names:
                target_chars.append(char)
                existing_names.add(key)  # avoid importing the same name twice
                added_count += 1

        if added_count > 0:
            _write_json(target_path, target_bible)
            flash(f"Imported {added_count} characters from {source_proj.name}.")
        else:
            flash("No new characters found to import.")

    return redirect(url_for('view_project', id=id))


@app.route('/project/<int:id>/set_persona', methods=['POST'])
@login_required
def set_project_persona(id):
    """Store the selected persona as the project's author_details."""
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    persona_name = request.form.get('persona_name')

    bible_path = _bible_path(proj)
    bible = utils.load_json(bible_path)

    if bible:
        personas = _load_personas()

        if persona_name in personas:
            bible.setdefault('project_metadata', {})['author_details'] = personas[persona_name]
            _write_json(bible_path, bible)
            flash(f"Project voice updated to persona: {persona_name}")
        else:
            flash("Persona not found.")

    return redirect(url_for('view_project', id=id))


@app.route('/project/<int:run_id>/regenerate_artifacts', methods=['POST'])
@login_required
def regenerate_artifacts(run_id):
    """Re-queue a run so its cover and output files are regenerated."""
    run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    feedback = request.form.get('feedback')

    # Reset state immediately so UI polls correctly
    run.status = 'queued'
    LogEntry.query.filter_by(run_id=run_id).delete()
    db.session.commit()

    regenerate_artifacts_task(run_id, run.project.folder_path, feedback=feedback)
    flash("Regenerating cover and files with updated metadata...")
    return redirect(url_for('view_run', id=run_id))


@app.route('/run/<int:id>/stop', methods=['POST'])
@login_required
def stop_run(id):
    """Mark a queued or running run as cancelled."""
    run = db.session.get(Run, id) or Run.query.get_or_404(id)
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    if run.status in ['queued', 'running']:
        run.status = 'cancelled'
        run.end_time = datetime.utcnow()
        db.session.commit()
        flash(f"Run {id} marked as cancelled.")

    return redirect(url_for('view_project', id=run.project_id))


@app.route('/run/<int:id>/restart', methods=['POST'])
@login_required
def restart_run(id):
    """Start a new run for the same project, resuming or restarting from scratch."""
    run = db.session.get(Run, id) or Run.query.get_or_404(id)
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    # Create a new run
    new_run = Run(project_id=run.project_id, status="queued")
    db.session.add(new_run)
    db.session.commit()

    # Check mode: 'resume' (default) vs 'restart'
    mode = request.form.get('mode', 'resume')
    allow_copy = (mode == 'resume')

    generate_book_task(new_run.id, run.project.folder_path,
                       os.path.join(run.project.folder_path, "bible.json"),
                       allow_copy=allow_copy)
    flash(f"Started new Run #{new_run.id}.")
    return redirect(url_for('view_project', id=run.project_id))


@app.route('/run/<int:id>')
@login_required
def view_run(id):
    """Run details page: logs, per-book artifacts and tracking data."""
    run = db.session.get(Run, id)
    if not run:
        return "Run not found", 404
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    # Fetch logs for initial render
    log_content = _render_db_logs(id)
    if not log_content and run.log_file and os.path.exists(run.log_file):
        log_content = _read_text(run.log_file)

    # Fetch Artifacts for Display
    run_dir = _run_dir(run.project.folder_path, run.id)

    # Detect Books in Run (Series Support)
    books_data = []
    if os.path.exists(run_dir):
        subdirs = _book_subdirs(run_dir)
        if not subdirs:
            subdirs = ["."]  # Handle legacy/flat runs

        for d in subdirs:
            b_path = os.path.join(run_dir, d)
            b_info = {'folder': d, 'artifacts': [], 'cover': None, 'blurb': ''}

            # Artifacts
            for f in os.listdir(b_path):
                if f.lower().endswith(('.epub', '.docx')):
                    b_info['artifacts'].append({'name': f, 'path': os.path.join(d, f).replace("\\", "/")})

            # Cover
            if os.path.exists(os.path.join(b_path, "cover.png")):
                b_info['cover'] = os.path.join(d, "cover.png").replace("\\", "/")

            # Blurb
            blurb_p = os.path.join(b_path, "blurb.txt")
            if os.path.exists(blurb_p):
                b_info['blurb'] = _read_text(blurb_p)

            books_data.append(b_info)

    # Load Bible Data for Dropdown
    bible_data = utils.load_json(os.path.join(run.project.folder_path, "bible.json"))

    # Load Tracking Data for Run Details
    tracking = {"events": [], "characters": {}, "content_warnings": []}
    # We load tracking from the first book found to populate the general stats
    book_dir = os.path.join(run_dir, books_data[0]['folder']) if books_data else run_dir
    if os.path.exists(book_dir):
        t_ev = os.path.join(book_dir, "tracking_events.json")
        t_ch = os.path.join(book_dir, "tracking_characters.json")
        t_wn = os.path.join(book_dir, "tracking_warnings.json")

        # Load safely, defaulting to empty structures if load_json returns None
        if os.path.exists(t_ev):
            tracking['events'] = utils.load_json(t_ev) or []
        if os.path.exists(t_ch):
            tracking['characters'] = utils.load_json(t_ch) or {}
        if os.path.exists(t_wn):
            tracking['content_warnings'] = utils.load_json(t_wn) or []

    # Use dedicated run details template
    return render_template('run_details.html', run=run, log_content=log_content,
                           books=books_data, bible=bible_data, tracking=tracking)


@app.route('/run/<int:id>/status')
@login_required
def run_status(id):
    """Polling endpoint returning the run's status, log text and cost."""
    run = db.session.get(Run, id) or Run.query.get_or_404(id)
    # Logs and cost belong to the owning user, like every other run route.
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    # 1. Try Database Logs (Fastest & Best)
    log_content = _render_db_logs(id)

    # 2. Fallback to File (For old runs or if DB logging fails)
    if not log_content:
        if run.log_file and os.path.exists(run.log_file):
            log_content = _read_text(run.log_file)
        elif run.status in ['queued', 'running']:
            temp_log = os.path.join(run.project.folder_path, f"system_log_{run.id}.txt")
            if os.path.exists(temp_log):
                log_content = _read_text(temp_log)

    return {"status": run.status, "log": log_content, "cost": run.cost}


@app.route('/project/<int:run_id>/download')
@login_required
def download_artifact(run_id):
    """Send a file from the run's output directory as an attachment."""
    filename = request.args.get('file')
    if not filename:
        return "Missing 'file' parameter", 400

    run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    run_dir = _run_dir(run.project.folder_path, run.id)

    # If file not found in root, check subfolders (Series Support)
    if not os.path.exists(os.path.join(run_dir, filename)) and os.path.exists(run_dir):
        subdirs = _book_subdirs(run_dir)
        if subdirs:
            # Try the first book folder
            possible_path = os.path.join(subdirs[0], filename)
            if os.path.exists(os.path.join(run_dir, possible_path)):
                filename = possible_path

    return send_from_directory(run_dir, filename, as_attachment=True)


@app.route('/logout')
def logout():
    """Log the current user out and return to the login page."""
    logout_user()
    return redirect(url_for('login'))


@app.route('/debug/routes')
def debug_routes():
    """List every registered URL rule as preformatted text."""
    # NOTE(review): this endpoint is not behind @login_required; confirm that is intended.
    output = []
    for rule in app.url_map.iter_rules():
        methods = ','.join(rule.methods)
        # Use brackets so they are visible in browser text
        rule_str = str(rule).replace('<', '[').replace('>', ']')
        line = "{:50s} {:20s} {}".format(rule.endpoint, methods, rule_str)
        output.append(line)
    return "<pre>" + "\n".join(output) + "</pre>"


@app.route('/system/optimize_models', methods=['POST'])
@login_required
def optimize_models():
    """Force a refresh of the AI models and the style guidelines."""
    # Force refresh via AI module (safely handles failures)
    try:
        ai.init_models(force=True)  # Force re-initialization and API scan

        # Refresh Style Guidelines
        if ai.model_logic:
            story.refresh_style_guidelines(ai.model_logic)

        flash("AI Models refreshed and Style Guidelines updated.")
    except Exception as e:
        flash(f"Error refreshing models: {e}")

    return redirect(request.referrer or url_for('index'))


# --- COMPATIBILITY ROUTES (Fix 404s) ---
@app.route('/project/<int:project_id>/run/<int:run_id>')
@login_required
def legacy_run_redirect(project_id, run_id):
    """Redirect the old per-project run URL to the run details page."""
    return redirect(url_for('view_run', id=run_id))


@app.route('/system/status')
@login_required
def system_status():
    """System Status View: Show AI Models and Quotas."""
    models_info = {}
    cache_data = {}

    cache_path = os.path.join(config.DATA_DIR, "model_cache.json")
    if os.path.exists(cache_path):
        try:
            with open(cache_path, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)
            models_info = cache_data.get('models', {})
        except (OSError, ValueError, AttributeError):
            # Unreadable or unexpected cache content: show an empty status page.
            pass

    # Create a placeholder run object so the template doesn't crash
    dummy_project = SimpleNamespace(user_id=current_user.id, name="System", folder_path="")
    dummy_run = SimpleNamespace(id=0, status="System Status", cost=0.0, log_file=None,
                                start_time=datetime.utcnow(), project=dummy_project,
                                duration=lambda: "N/A")

    return render_template('system_status.html', run=dummy_run, models=models_info,
                           cache=cache_data, datetime=datetime)


@app.route('/personas')
@login_required
def list_personas():
    """List all stored author personas."""
    return render_template('personas.html', personas=_load_personas())


@app.route('/persona/new')
@login_required
def new_persona():
    """Show an empty persona form."""
    return render_template('persona_edit.html', persona={}, name="")


@app.route('/persona/<string:name>')
@login_required
def edit_persona(name):
    """Show the edit form for an existing persona."""
    persona = _load_personas().get(name)
    if not persona:
        flash(f"Persona '{name}' not found.")
        return redirect(url_for('list_personas'))

    return render_template('persona_edit.html', persona=persona, name=name)


@app.route('/persona/save', methods=['POST'])
@login_required
def save_persona():
    """Create or update a persona; a changed name replaces the old entry."""
    old_name = request.form.get('old_name')
    name = request.form.get('name')

    if not name:
        flash("Persona name is required.")
        return redirect(url_for('list_personas'))

    personas = _load_personas()

    if old_name and old_name != name and old_name in personas:
        del personas[old_name]

    personas[name] = {
        "name": name,
        "bio": request.form.get('bio'),
        "age": request.form.get('age'),
        "gender": request.form.get('gender'),
        "race": request.form.get('race'),
        "nationality": request.form.get('nationality'),
        "language": request.form.get('language'),
        "sample_text": request.form.get('sample_text'),
        "voice_keywords": request.form.get('voice_keywords'),
        "style_inspirations": request.form.get('style_inspirations'),
    }

    _write_json(config.PERSONAS_FILE, personas)

    flash(f"Persona '{name}' saved.")
    return redirect(url_for('list_personas'))


@app.route('/persona/delete/<string:name>', methods=['POST'])
@login_required
def delete_persona(name):
    """Delete a persona by name if it exists."""
    personas = _load_personas()

    if name in personas:
        del personas[name]
        _write_json(config.PERSONAS_FILE, personas)
        flash(f"Persona '{name}' deleted.")

    return redirect(url_for('list_personas'))


@app.route('/persona/analyze', methods=['POST'])
@login_required
def analyze_persona():
    """Ask the AI to derive bio/voice keywords/inspirations for a persona (JSON in, JSON out)."""
    if not _try_init_ai():
        return {"error": "AI models not initialized."}, 500

    # Tolerate a missing/non-JSON body and a null sample instead of raising.
    data = request.get_json(silent=True) or {}
    sample = data.get('sample_text') or ''

    prompt = f"""
    Act as a Literary Analyst. Create or analyze an Author Persona profile.

    INPUT DATA:
    Name: {data.get('name')}
    Age: {data.get('age')} | Gender: {data.get('gender')} | Nationality: {data.get('nationality')}
    Sample Text: {sample[:3000]}

    TASK:
    1. 'bio': Write a 2-3 sentence description of the writing style. If sample is provided, analyze it. If not, invent a style that fits the demographics/name.
    2. 'voice_keywords': Comma-separated list of 3-5 adjectives describing the voice (e.g. Gritty, Whimsical, Sarcastic).
    3. 'style_inspirations': Comma-separated list of 1-3 famous authors or genres that this style resembles.

    RETURN JSON: {{ "bio": "...", "voice_keywords": "...", "style_inspirations": "..." }}
    """
    try:
        response = ai.model_logic.generate_content(prompt)
        return json.loads(utils.clean_json(response.text))
    except Exception as e:
        return {"error": str(e)}, 500


# --- ADMIN ROUTES ---

@app.route('/admin')
@login_required
@admin_required
def admin_dashboard():
    """Admin overview of all users and projects."""
    users = User.query.all()
    projects = Project.query.all()
    return render_template('admin_dashboard.html', users=users, projects=projects)


@app.route('/admin/user/<int:user_id>/delete', methods=['POST'])
@login_required
@admin_required
def admin_delete_user(user_id):
    """Delete a user, their data folder and their projects (never the caller)."""
    if user_id == current_user.id:
        flash("Cannot delete yourself.")
        return redirect(url_for('admin_dashboard'))

    user = db.session.get(User, user_id)
    if user:
        username = user.username  # read before the row is deleted

        # Delete user data folder
        user_path = os.path.join(config.DATA_DIR, "users", str(user.id))
        if os.path.exists(user_path):
            try:
                shutil.rmtree(user_path)
            except OSError:
                pass

        # Delete projects (DB cascade handles rows, we handle files)
        for p in Project.query.filter_by(user_id=user.id).all():
            db.session.delete(p)

        db.session.delete(user)
        db.session.commit()
        flash(f"User {username} deleted.")
    return redirect(url_for('admin_dashboard'))


@app.route('/admin/project/<int:project_id>/delete', methods=['POST'])
@login_required
@admin_required
def admin_delete_project(project_id):
    """Delete a project's folder and database row."""
    proj = db.session.get(Project, project_id)
    if proj:
        proj_name = proj.name  # read before the row is deleted
        if os.path.exists(proj.folder_path):
            try:
                shutil.rmtree(proj.folder_path)
            except OSError:
                pass
        db.session.delete(proj)
        db.session.commit()
        flash(f"Project {proj_name} deleted.")
    return redirect(url_for('admin_dashboard'))


@app.route('/admin/reset', methods=['POST'])
@login_required
@admin_required
def admin_factory_reset():
    """Wipe all projects and all users except the current admin; reset personas."""
    # 1. Delete ALL Projects (Files & DB)
    for p in Project.query.all():
        if os.path.exists(p.folder_path):
            try:
                shutil.rmtree(p.folder_path)
            except OSError:
                pass
        db.session.delete(p)

    # 2. Delete ALL Users except Current Admin
    for u in User.query.filter(User.id != current_user.id).all():
        user_path = os.path.join(config.DATA_DIR, "users", str(u.id))
        if os.path.exists(user_path):
            try:
                shutil.rmtree(user_path)
            except OSError:
                pass
        db.session.delete(u)

    # 3. Reset Personas to Default
    if os.path.exists(config.PERSONAS_FILE):
        try:
            os.remove(config.PERSONAS_FILE)
        except OSError:
            pass
    utils.create_default_personas()

    db.session.commit()
    flash("Factory Reset Complete. All other users and projects have been wiped.")
    return redirect(url_for('admin_dashboard'))


@app.route('/admin/spend')
@login_required
@admin_required
def admin_spend_report():
    """Per-user run count and cost for the last `days` days (days <= 0 means all time)."""
    days = request.args.get('days', 30, type=int)

    if days > 0:
        start_date = datetime.utcnow() - timedelta(days=days)
    else:
        start_date = datetime.min

    # Aggregate spend per user
    results = (
        db.session.query(User.username, func.count(Run.id), func.sum(Run.cost))
        .join(Project, Project.user_id == User.id)
        .join(Run, Run.project_id == Project.id)
        .filter(Run.start_time >= start_date)
        .group_by(User.id)
        .all()
    )

    report = []
    total_period_spend = 0.0
    for username, run_count, cost_sum in results:
        cost = cost_sum if cost_sum else 0.0  # SUM() is NULL when every cost is NULL
        report.append({"username": username, "runs": run_count, "cost": cost})
        total_period_spend += cost

    return render_template('admin_spend.html', report=report, days=days, total=total_period_spend)


@app.route('/admin/style', methods=['GET', 'POST'])
@login_required
@admin_required
def admin_style_guidelines():
    """View or overwrite the style guidelines (one entry per line in each form field)."""
    path = os.path.join(config.DATA_DIR, "style_guidelines.json")

    if request.method == 'POST':
        ai_isms_raw = request.form.get('ai_isms', '')
        filter_words_raw = request.form.get('filter_words', '')

        data = {
            "ai_isms": [x.strip() for x in ai_isms_raw.split('\n') if x.strip()],
            "filter_words": [x.strip() for x in filter_words_raw.split('\n') if x.strip()],
        }

        _write_json(path, data)
        flash("Style Guidelines updated successfully.")
        return redirect(url_for('admin_style_guidelines'))

    # Load current (creates defaults if missing)
    data = story.get_style_guidelines()
    return render_template('admin_style.html', data=data)


@app.route('/admin/impersonate/<int:user_id>')
@login_required
@admin_required
def impersonate_user(user_id):
    if user_id == current_user.id:
        flash("Cannot impersonate yourself.")
        return redirect(url_for('admin_dashboard'))

    user = db.session.get(User, user_id)
    if user:
        session['original_admin_id'] = current_user.id
        login_user(user)
        flash(f"Now viewing as {user.username}")
        # NOTE(review): the visible source is cut off in the middle of the next
        # statement; its remainder lies beyond this chunk and is left untouched.
        return
redirect(url_for('index')) return redirect(url_for('admin_dashboard')) @app.route('/admin/stop_impersonate') @login_required def stop_impersonate(): admin_id = session.get('original_admin_id') if admin_id: admin = db.session.get(User, admin_id) if admin: login_user(admin) session.pop('original_admin_id', None) flash("Restored admin session.") return redirect(url_for('admin_dashboard')) return redirect(url_for('index')) if __name__ == '__main__': # Run the Huey consumer in a separate thread for testing on Pi # For production, run `huey_consumer.py web_tasks.huey` in terminal import threading def run_huey(): from huey.consumer import Consumer # Subclass to disable signal handling (which fails in threads) class ThreadedConsumer(Consumer): def _set_signal_handlers(self): pass c = ThreadedConsumer(huey, workers=1, worker_type='thread') c.run() # Configuration debug_mode = True # Run worker if: 1. In reloader child process OR 2. Reloader is disabled (debug=False) if os.environ.get("WERKZEUG_RUN_MAIN") == "true" or not debug_mode: threading.Thread(target=run_huey, daemon=True).start() app.run(host='0.0.0.0', port=5000, debug=debug_mode)