import os import json import shutil import markdown from functools import wraps from datetime import datetime, timedelta from urllib.parse import urlparse, urljoin from sqlalchemy import func, text from sqlalchemy.exc import IntegrityError from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, session from flask_login import LoginManager, login_user, login_required, logout_user, current_user from werkzeug.security import generate_password_hash, check_password_hash from .web_db import db, User, Project, Run, LogEntry from .web_tasks import huey, generate_book_task, regenerate_artifacts_task, rewrite_chapter_task, refine_bible_task import config from . import utils from . import ai from . import story from . import export # Calculate paths relative to this file (modules/web_app.py) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) TEMPLATE_DIR = os.path.join(BASE_DIR, 'templates') app = Flask(__name__, template_folder=TEMPLATE_DIR) app.url_map.strict_slashes = False app.config['SECRET_KEY'] = config.FLASK_SECRET # Use absolute path for database to avoid Docker path resolution issues app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{os.path.join(config.DATA_DIR, "bookapp.db")}' db.init_app(app) login_manager = LoginManager() login_manager.login_view = 'login' login_manager.init_app(app) @login_manager.user_loader def load_user(user_id): return db.session.get(User, int(user_id)) # --- SETUP --- with app.app_context(): db.create_all() # Auto-create Admin from Environment Variables (Docker/Portainer Setup) if config.ADMIN_USER and config.ADMIN_PASSWORD: admin = User.query.filter_by(username=config.ADMIN_USER).first() if not admin: print(f"🔐 System: Creating Admin User '{config.ADMIN_USER}' from environment variables.") admin = User(username=config.ADMIN_USER, password=generate_password_hash(config.ADMIN_PASSWORD, method='pbkdf2:sha256'), is_admin=True) db.session.add(admin) db.session.commit() else: # Ensure admin privileges and sync password with Env Var (allows password reset via Docker) print(f"🔐 System: Syncing Admin User '{config.ADMIN_USER}' settings from environment.") if not admin.is_admin: admin.is_admin = True admin.password = generate_password_hash(config.ADMIN_PASSWORD, method='pbkdf2:sha256') db.session.add(admin) db.session.commit() elif not User.query.filter_by(is_admin=True).first(): print("â„šī¸ System: No Admin credentials found in environment variables. Admin account not created.") # Migration: Add 'progress' column if missing try: with db.engine.connect() as conn: conn.execute(text("ALTER TABLE run ADD COLUMN progress INTEGER DEFAULT 0")) conn.commit() print("✅ System: Added 'progress' column to Run table.") except: pass # Reset stuck runs on startup try: stuck_runs = Run.query.filter_by(status='running').all() if stuck_runs: print(f"âš ī¸ System: Found {len(stuck_runs)} stuck runs. Resetting to 'failed'.") for r in stuck_runs: r.status = 'failed' r.end_time = datetime.utcnow() db.session.commit() except Exception as e: print(f"âš ī¸ System: Failed to clean up stuck runs: {e}") # --- DECORATORS --- def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated or not current_user.is_admin: flash("Admin access required.") return redirect(url_for('index')) return f(*args, **kwargs) return decorated_function def is_project_locked(project_id): """Returns True if the project has any completed runs (Book 1 written).""" return Run.query.filter_by(project_id=project_id, status='completed').count() > 0 def is_safe_url(target): ref_url = urlparse(request.host_url) test_url = urlparse(urljoin(request.host_url, target)) return test_url.scheme in ('http', 'https') and \ ref_url.netloc == test_url.netloc # --- ROUTES --- @app.route('/') @login_required def index(): projects = Project.query.filter_by(user_id=current_user.id).all() return render_template('dashboard.html', projects=projects, user=current_user) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') user = User.query.filter_by(username=username).first() if user and check_password_hash(user.password, password): login_user(user) next_page = request.args.get('next') if not next_page or not is_safe_url(next_page): next_page = url_for('index') return redirect(next_page) if user and user.is_admin: print(f"âš ī¸ System: Admin login failed for '{username}'. Password hash mismatch.") flash('Invalid credentials') return render_template('login.html') @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') if User.query.filter_by(username=username).first(): flash('Username exists') return redirect(url_for('register')) new_user = User(username=username, password=generate_password_hash(password, method='pbkdf2:sha256')) # Auto-promote if matches env var if config.ADMIN_USER and username == config.ADMIN_USER: new_user.is_admin = True try: db.session.add(new_user) db.session.commit() login_user(new_user) return redirect(url_for('index')) except IntegrityError: db.session.rollback() flash('Username exists') return redirect(url_for('register')) return render_template('register.html') @app.route('/project/setup', methods=['POST']) @login_required def project_setup_wizard(): concept = request.form.get('concept') # Initialize AI if needed try: ai.init_models() except: pass if not ai.model_logic: flash("AI models not initialized.") return redirect(url_for('index')) prompt = f""" Analyze this story concept and suggest metadata for a book or series. CONCEPT: {concept} RETURN JSON with these keys: - title: Suggested book title - genre: Genre - target_audience: e.g. Adult, YA - tone: e.g. Dark, Whimsical - length_category: One of ["01", "1", "2", "2b", "3", "4", "5"] based on likely depth. - estimated_chapters: int (suggested chapter count) - estimated_word_count: string (e.g. "75,000") - include_prologue: boolean - include_epilogue: boolean - tropes: list of strings - pov_style: e.g. First Person - time_period: e.g. Modern - spice: e.g. Standard, Explicit - violence: e.g. None, Graphic - is_series: boolean - series_title: string (if series) - narrative_tense: e.g. Past, Present - language_style: e.g. Standard, Flowery - dialogue_style: e.g. Witty, Formal - page_orientation: Portrait, Landscape, or Square - formatting_rules: list of strings - author_bio: string (suggested persona bio) """ suggestions = {} try: response = ai.model_logic.generate_content(prompt) suggestions = json.loads(utils.clean_json(response.text)) except Exception as e: flash(f"AI Analysis failed: {e}") suggestions = {"title": "New Project", "genre": "Fiction"} # Load Personas for dropdown personas = {} if os.path.exists(config.PERSONAS_FILE): try: with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f) except: pass return render_template('project_setup.html', s=suggestions, concept=concept, personas=personas, lengths=config.LENGTH_DEFINITIONS) @app.route('/project/setup/refine', methods=['POST']) @login_required def project_setup_refine(): # Re-run the wizard logic with an instruction concept = request.form.get('concept') instruction = request.form.get('refine_instruction') # Reconstruct current state from form to pass back to AI current_state = { "title": request.form.get('title'), "genre": request.form.get('genre'), "target_audience": request.form.get('audience'), "tone": request.form.get('tone'), # ... (capture other fields if critical, or just rely on AI re-generating from concept + instruction) } try: ai.init_models() except: pass prompt = f""" Update these project suggestions based on the user instruction. ORIGINAL CONCEPT: {concept} CURRENT TITLE: {current_state['title']} INSTRUCTION: {instruction} RETURN JSON with the same keys as a full analysis (title, genre, length_category, etc). """ suggestions = {} try: response = ai.model_logic.generate_content(prompt) suggestions = json.loads(utils.clean_json(response.text)) except Exception as e: flash(f"Refinement failed: {e}") return redirect(url_for('index')) # Fallback personas = {} if os.path.exists(config.PERSONAS_FILE): try: with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f) except: pass return render_template('project_setup.html', s=suggestions, concept=concept, personas=personas, lengths=config.LENGTH_DEFINITIONS) @app.route('/project/create', methods=['POST']) @login_required def create_project_final(): title = request.form.get('title') safe_title = utils.sanitize_filename(title) user_dir = os.path.join(config.DATA_DIR, "users", str(current_user.id)) os.makedirs(user_dir, exist_ok=True) proj_path = os.path.join(user_dir, safe_title) if os.path.exists(proj_path): safe_title += f"_{int(datetime.utcnow().timestamp())}" proj_path = os.path.join(user_dir, safe_title) os.makedirs(proj_path, exist_ok=True) # Construct Bible from Form Data length_cat = request.form.get('length_category') len_def = config.LENGTH_DEFINITIONS.get(length_cat, config.LENGTH_DEFINITIONS['4']).copy() # Overrides try: len_def['chapters'] = int(request.form.get('chapters')) except: pass len_def['words'] = request.form.get('words') len_def['include_prologue'] = 'include_prologue' in request.form len_def['include_epilogue'] = 'include_epilogue' in request.form is_series = 'is_series' in request.form style = { "tone": request.form.get('tone'), "pov_style": request.form.get('pov_style'), "time_period": request.form.get('time_period'), "spice": request.form.get('spice'), "violence": request.form.get('violence'), "narrative_tense": request.form.get('narrative_tense'), "language_style": request.form.get('language_style'), "dialogue_style": request.form.get('dialogue_style'), "page_orientation": request.form.get('page_orientation'), "tropes": [x.strip() for x in request.form.get('tropes', '').split(',') if x.strip()], "formatting_rules": [x.strip() for x in request.form.get('formatting_rules', '').split(',') if x.strip()] } bible = { "project_metadata": { "title": title, "author": request.form.get('author'), "author_bio": request.form.get('author_bio'), "genre": request.form.get('genre'), "target_audience": request.form.get('audience'), "is_series": is_series, "length_settings": len_def, "style": style }, "books": [], "characters": [] } # Create Books count = 1 if is_series: try: count = int(request.form.get('series_count', 1)) except: count = 3 concept = request.form.get('concept', '') for i in range(count): bible['books'].append({ "book_number": i+1, "title": f"{title} - Book {i+1}" if is_series else title, "manual_instruction": concept if i==0 else "", "plot_beats": [] }) # Enrich via AI immediately (Always, to ensure Bible is full) try: ai.init_models() bible = story.enrich(bible, proj_path) except: pass with open(os.path.join(proj_path, "bible.json"), 'w') as f: json.dump(bible, f, indent=2) new_proj = Project(user_id=current_user.id, name=title, folder_path=proj_path) db.session.add(new_proj) db.session.commit() return redirect(url_for('view_project', id=new_proj.id)) @app.route('/project/import', methods=['POST']) @login_required def import_project(): if 'bible_file' not in request.files: flash('No file part') return redirect(url_for('index')) file = request.files['bible_file'] if file.filename == '': flash('No selected file') return redirect(url_for('index')) if file: try: bible = json.load(file) # Basic validation if 'project_metadata' not in bible or 'title' not in bible['project_metadata']: flash("Invalid Bible format: Missing project_metadata or title.") return redirect(url_for('index')) title = bible['project_metadata']['title'] safe_title = utils.sanitize_filename(title) user_dir = os.path.join(config.DATA_DIR, "users", str(current_user.id)) os.makedirs(user_dir, exist_ok=True) proj_path = os.path.join(user_dir, safe_title) if os.path.exists(proj_path): safe_title += f"_{int(datetime.utcnow().timestamp())}" proj_path = os.path.join(user_dir, safe_title) os.makedirs(proj_path) with open(os.path.join(proj_path, "bible.json"), 'w') as f: json.dump(bible, f, indent=2) new_proj = Project(user_id=current_user.id, name=title, folder_path=proj_path) db.session.add(new_proj) db.session.commit() flash(f"Project '{title}' imported successfully.") return redirect(url_for('view_project', id=new_proj.id)) except Exception as e: flash(f"Import failed: {str(e)}") return redirect(url_for('index')) @app.route('/project/') @login_required def view_project(id): proj = db.session.get(Project, id) if not proj: return "Project not found", 404 if proj.user_id != current_user.id: return "Unauthorized", 403 # Load Bible bible_path = os.path.join(proj.folder_path, "bible.json") bible_data = utils.load_json(bible_path) # Load Personas for dropdown personas = {} if os.path.exists(config.PERSONAS_FILE): try: with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f) except: pass runs = Run.query.filter_by(project_id=id).order_by(Run.id.desc()).all() latest_run = runs[0] if runs else None # Fetch other projects for "Related Series" import other_projects = Project.query.filter(Project.user_id == current_user.id, Project.id != id).all() artifacts = [] cover_image = None generated_books = {} # Map book_number -> {status: 'generated', run_id: int, folder: str} locked = is_project_locked(id) # Scan ALL completed runs to find the latest status of each book for r in runs: if r.status == 'completed': run_dir = os.path.join(proj.folder_path, "runs", f"run_{r.id}") if os.path.exists(run_dir): # 1. Scan for Generated Books for d in os.listdir(run_dir): if d.startswith("Book_") and os.path.isdir(os.path.join(run_dir, d)): # Check for manuscript to confirm generation if os.path.exists(os.path.join(run_dir, d, "manuscript.json")): try: parts = d.split('_') if len(parts) > 1 and parts[1].isdigit(): b_num = int(parts[1]) # Only add if we haven't found a newer version (runs are ordered desc) if b_num not in generated_books: # Find artifacts for direct download link book_path = os.path.join(run_dir, d) epub_file = next((f for f in os.listdir(book_path) if f.endswith('.epub')), None) docx_file = next((f for f in os.listdir(book_path) if f.endswith('.docx')), None) generated_books[b_num] = {'status': 'generated', 'run_id': r.id, 'folder': d, 'epub': os.path.join(d, epub_file).replace("\\", "/") if epub_file else None, 'docx': os.path.join(d, docx_file).replace("\\", "/") if docx_file else None} except: pass # Collect Artifacts from Latest Run if latest_run: run_dir = os.path.join(proj.folder_path, "runs", f"run_{latest_run.id}") if os.path.exists(run_dir): # Find Cover Image (Root or First Book) if os.path.exists(os.path.join(run_dir, "cover.png")): cover_image = "cover.png" else: subdirs = utils.get_sorted_book_folders(run_dir) for d in subdirs: if os.path.exists(os.path.join(run_dir, d, "cover.png")): cover_image = os.path.join(d, "cover.png").replace("\\", "/") break for root, dirs, files in os.walk(run_dir): for f in files: if f.lower().endswith(('.epub', '.docx')): rel_path = os.path.relpath(os.path.join(root, f), run_dir) artifacts.append({ 'name': f, 'path': rel_path.replace("\\", "/"), 'type': f.split('.')[-1].upper() }) return render_template('project.html', project=proj, bible=bible_data, runs=runs, active_run=latest_run, artifacts=artifacts, cover_image=cover_image, personas=personas, generated_books=generated_books, other_projects=other_projects, locked=locked) @app.route('/project//run', methods=['POST']) @login_required def run_project(id): proj = db.session.get(Project, id) or Project.query.get_or_404(id) # Create Run Entry new_run = Run(project_id=id, status="queued") db.session.add(new_run) db.session.commit() # Trigger Background Task bible_path = os.path.join(proj.folder_path, "bible.json") task = generate_book_task(new_run.id, proj.folder_path, bible_path, allow_copy=True) return redirect(url_for('view_project', id=id)) @app.route('/project//review') @login_required def review_project(id): proj = db.session.get(Project, id) or Project.query.get_or_404(id) if proj.user_id != current_user.id: return "Unauthorized", 403 bible_path = os.path.join(proj.folder_path, "bible.json") bible = utils.load_json(bible_path) return render_template('project_review.html', project=proj, bible=bible) @app.route('/project//update', methods=['POST']) @login_required def update_project_metadata(id): proj = db.session.get(Project, id) or Project.query.get_or_404(id) if proj.user_id != current_user.id: return "Unauthorized", 403 if is_project_locked(id): flash("Project is locked. Clone it to make changes.") return redirect(url_for('view_project', id=id)) new_title = request.form.get('title') new_author = request.form.get('author') bible_path = os.path.join(proj.folder_path, "bible.json") bible = utils.load_json(bible_path) if bible: if new_title: bible['project_metadata']['title'] = new_title proj.name = new_title if new_author: bible['project_metadata']['author'] = new_author with open(bible_path, 'w') as f: json.dump(bible, f, indent=2) db.session.commit() return redirect(url_for('view_project', id=id)) @app.route('/project//clone', methods=['POST']) @login_required def clone_project(id): source_proj = db.session.get(Project, id) or Project.query.get_or_404(id) if source_proj.user_id != current_user.id: return "Unauthorized", 403 new_name = request.form.get('new_name') instruction = request.form.get('instruction') # Create New Project safe_title = utils.sanitize_filename(new_name) user_dir = os.path.join(config.DATA_DIR, "users", str(current_user.id)) new_path = os.path.join(user_dir, safe_title) if os.path.exists(new_path): safe_title += f"_{int(datetime.utcnow().timestamp())}" new_path = os.path.join(user_dir, safe_title) os.makedirs(new_path) # Copy Bible source_bible_path = os.path.join(source_proj.folder_path, "bible.json") if os.path.exists(source_bible_path): bible = utils.load_json(source_bible_path) bible['project_metadata']['title'] = new_name # Apply AI Instruction if provided if instruction: try: ai.init_models() bible = story.refine_bible(bible, instruction, new_path) or bible except: pass with open(os.path.join(new_path, "bible.json"), 'w') as f: json.dump(bible, f, indent=2) new_proj = Project(user_id=current_user.id, name=new_name, folder_path=new_path) db.session.add(new_proj) db.session.commit() flash(f"Project cloned as '{new_name}'.") return redirect(url_for('view_project', id=new_proj.id)) @app.route('/project//bible_comparison') @login_required def bible_comparison(id): proj = db.session.get(Project, id) or Project.query.get_or_404(id) if proj.user_id != current_user.id: return "Unauthorized", 403 bible_path = os.path.join(proj.folder_path, "bible.json") draft_path = os.path.join(proj.folder_path, "bible_draft.json") if not os.path.exists(draft_path): flash("No draft found. Please refine the bible first.") return redirect(url_for('review_project', id=id)) original = utils.load_json(bible_path) new_draft = utils.load_json(draft_path) if not original or not new_draft: flash("Error loading bible data. Draft may be corrupt.") return redirect(url_for('review_project', id=id)) return render_template('bible_comparison.html', project=proj, original=original, new=new_draft) @app.route('/project//refine_bible', methods=['POST']) @login_required def refine_bible_route(id): proj = db.session.get(Project, id) or Project.query.get_or_404(id) if proj.user_id != current_user.id: return "Unauthorized", 403 if is_project_locked(id): flash("Project is locked. Clone it to make changes.") return redirect(url_for('view_project', id=id)) # Handle JSON request (AJAX) or Form request data = request.json if request.is_json else request.form instruction = data.get('instruction') if not instruction: return {"error": "Instruction required"}, 400 source_type = data.get('source', 'original') selected_keys = data.get('selected_keys') if isinstance(selected_keys, str): try: selected_keys = json.loads(selected_keys) if selected_keys.strip() else [] except: selected_keys = [] # Start Background Task task = refine_bible_task(proj.folder_path, instruction, source_type, selected_keys) return {"status": "queued", "task_id": task.id} @app.route('/project//refine_bible/confirm', methods=['POST']) @login_required def confirm_bible_refinement(id): proj = db.session.get(Project, id) or Project.query.get_or_404(id) if proj.user_id != current_user.id: return "Unauthorized", 403 action = request.form.get('action') draft_path = os.path.join(proj.folder_path, "bible_draft.json") bible_path = os.path.join(proj.folder_path, "bible.json") if action == 'accept' or action == 'accept_all': if os.path.exists(draft_path): shutil.move(draft_path, bible_path) flash("Bible updated successfully.") else: flash("Draft expired or missing.") elif action == 'accept_selected': if os.path.exists(draft_path) and os.path.exists(bible_path): selected_keys_json = request.form.get('selected_keys', '[]') try: selected_keys = json.loads(selected_keys_json) draft = utils.load_json(draft_path) original = utils.load_json(bible_path) original = story.merge_selected_changes(original, draft, selected_keys) with open(bible_path, 'w') as f: json.dump(original, f, indent=2) os.remove(draft_path) # Cleanup draft after merge flash(f"Merged {len(selected_keys)} changes into Bible.") except Exception as e: flash(f"Merge failed: {e}") else: flash("Files missing.") elif action == 'decline': if os.path.exists(draft_path): os.remove(draft_path) flash("Changes discarded.") return redirect(url_for('view_project', id=id)) @app.route('/project//add_book', methods=['POST']) @login_required def add_book(id): proj = db.session.get(Project, id) or Project.query.get_or_404(id) if proj.user_id != current_user.id: return "Unauthorized", 403 if is_project_locked(id): flash("Project is locked. Clone it to make changes.") return redirect(url_for('view_project', id=id)) title = request.form.get('title', 'Untitled') instruction = request.form.get('instruction', '') bible_path = os.path.join(proj.folder_path, "bible.json") bible = utils.load_json(bible_path) if bible: if 'books' not in bible: bible['books'] = [] next_num = len(bible['books']) + 1 new_book = { "book_number": next_num, "title": title, "manual_instruction": instruction, "plot_beats": [] # AI will fill this if empty during run } bible['books'].append(new_book) # If series metadata isn't set, set it if 'project_metadata' in bible: bible['project_metadata']['is_series'] = True with open(bible_path, 'w') as f: json.dump(bible, f, indent=2) flash(f"Added Book {next_num}: {title}") return redirect(url_for('view_project', id=id)) @app.route('/project//book//update', methods=['POST']) @login_required def update_book_details(id, book_num): proj = db.session.get(Project, id) or Project.query.get_or_404(id) if proj.user_id != current_user.id: return "Unauthorized", 403 if is_project_locked(id): flash("Project is locked. Clone it to make changes.") return redirect(url_for('view_project', id=id)) new_title = request.form.get('title') new_instruction = request.form.get('instruction') bible_path = os.path.join(proj.folder_path, "bible.json") bible = utils.load_json(bible_path) if bible and 'books' in bible: for b in bible['books']: if b.get('book_number') == book_num: if new_title: b['title'] = new_title if new_instruction is not None: b['manual_instruction'] = new_instruction break with open(bible_path, 'w') as f: json.dump(bible, f, indent=2) flash(f"Book {book_num} updated.") return redirect(url_for('view_project', id=id)) @app.route('/project//delete_book/', methods=['POST']) @login_required def delete_book(id, book_num): proj = db.session.get(Project, id) or Project.query.get_or_404(id) if proj.user_id != current_user.id: return "Unauthorized", 403 if is_project_locked(id): flash("Project is locked. Clone it to make changes.") return redirect(url_for('view_project', id=id)) bible_path = os.path.join(proj.folder_path, "bible.json") bible = utils.load_json(bible_path) if bible and 'books' in bible: bible['books'] = [b for b in bible['books'] if b.get('book_number') != book_num] # Renumber for i, b in enumerate(bible['books']): b['book_number'] = i + 1 # Update Series Status (Revert to standalone if only 1 book remains) if 'project_metadata' in bible: bible['project_metadata']['is_series'] = (len(bible['books']) > 1) with open(bible_path, 'w') as f: json.dump(bible, f, indent=2) flash("Book deleted from plan.") return redirect(url_for('view_project', id=id)) @app.route('/project//import_characters', methods=['POST']) @login_required def import_characters(id): target_proj = db.session.get(Project, id) source_id = request.form.get('source_project_id') source_proj = db.session.get(Project, source_id) if not target_proj or not source_proj: return "Project not found", 404 if target_proj.user_id != current_user.id or source_proj.user_id != current_user.id: return "Unauthorized", 403 if is_project_locked(id): flash("Project is locked. Clone it to make changes.") return redirect(url_for('view_project', id=id)) target_bible = utils.load_json(os.path.join(target_proj.folder_path, "bible.json")) source_bible = utils.load_json(os.path.join(source_proj.folder_path, "bible.json")) if target_bible and source_bible: existing_names = {c['name'].lower() for c in target_bible.get('characters', [])} added_count = 0 for char in source_bible.get('characters', []): if char['name'].lower() not in existing_names: target_bible['characters'].append(char) added_count += 1 if added_count > 0: with open(os.path.join(target_proj.folder_path, "bible.json"), 'w') as f: json.dump(target_bible, f, indent=2) flash(f"Imported {added_count} characters from {source_proj.name}.") else: flash("No new characters found to import.") return redirect(url_for('view_project', id=id)) @app.route('/project//set_persona', methods=['POST']) @login_required def set_project_persona(id): proj = db.session.get(Project, id) or Project.query.get_or_404(id) if proj.user_id != current_user.id: return "Unauthorized", 403 if is_project_locked(id): flash("Project is locked. Clone it to make changes.") return redirect(url_for('view_project', id=id)) persona_name = request.form.get('persona_name') bible_path = os.path.join(proj.folder_path, "bible.json") bible = utils.load_json(bible_path) if bible: personas = {} if os.path.exists(config.PERSONAS_FILE): try: with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f) except: pass if persona_name in personas: bible['project_metadata']['author_details'] = personas[persona_name] with open(bible_path, 'w') as f: json.dump(bible, f, indent=2) flash(f"Project voice updated to persona: {persona_name}") else: flash("Persona not found.") return redirect(url_for('view_project', id=id)) @app.route('/project//regenerate_artifacts', methods=['POST']) @login_required def regenerate_artifacts(run_id): run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id) if run.project.user_id != current_user.id: return "Unauthorized", 403 if run.status == 'running': flash("Run is already active. Please wait for it to finish.") return redirect(url_for('view_run', id=run_id)) feedback = request.form.get('feedback') # Reset state immediately so UI polls correctly run.status = 'queued' db.session.commit() regenerate_artifacts_task(run_id, run.project.folder_path, feedback=feedback) flash("Regenerating cover and files with updated metadata...") return redirect(url_for('view_run', id=run_id)) @app.route('/run//stop', methods=['POST']) @login_required def stop_run(id): run = db.session.get(Run, id) or Run.query.get_or_404(id) if run.project.user_id != current_user.id: return "Unauthorized", 403 if run.status in ['queued', 'running']: run.status = 'cancelled' run.end_time = datetime.utcnow() db.session.commit() # Signal the backend process to stop by creating a .stop file run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}") if os.path.exists(run_dir): with open(os.path.join(run_dir, ".stop"), 'w') as f: f.write("stop") flash(f"Run {id} marked as cancelled.") return redirect(url_for('view_project', id=run.project_id)) @app.route('/run//restart', methods=['POST']) @login_required def restart_run(id): run = db.session.get(Run, id) or Run.query.get_or_404(id) if run.project.user_id != current_user.id: return "Unauthorized", 403 # Create a new run new_run = Run(project_id=run.project_id, status="queued") db.session.add(new_run) db.session.commit() # Check mode: 'resume' (default) vs 'restart' mode = request.form.get('mode', 'resume') feedback = request.form.get('feedback') keep_cover = 'keep_cover' in request.form force_regen = 'force_regenerate' in request.form allow_copy = (mode == 'resume' and not force_regen) if feedback: allow_copy = False # Force regeneration if feedback provided to ensure changes are applied task = generate_book_task(new_run.id, run.project.folder_path, os.path.join(run.project.folder_path, "bible.json"), allow_copy=allow_copy, feedback=feedback, source_run_id=id if feedback else None, keep_cover=keep_cover) flash(f"Started new Run #{new_run.id}" + (" with modifications." if feedback else ".")) return redirect(url_for('view_project', id=run.project_id)) @app.route('/run/') @login_required def view_run(id): run = db.session.get(Run, id) if not run: return "Run not found", 404 if run.project.user_id != current_user.id: return "Unauthorized", 403 # Fetch logs for initial render log_content = "" logs = LogEntry.query.filter_by(run_id=id).order_by(LogEntry.timestamp).all() if logs: log_content = "\n".join([f"[{l.timestamp.strftime('%H:%M:%S')}] {l.phase:<15} | {l.message}" for l in logs]) elif run.log_file and os.path.exists(run.log_file): with open(run.log_file, 'r') as f: log_content = f.read() # Fetch Artifacts for Display run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}") # Detect Books in Run (Series Support) books_data = [] if os.path.exists(run_dir): subdirs = utils.get_sorted_book_folders(run_dir) for d in subdirs: b_path = os.path.join(run_dir, d) b_info = {'folder': d, 'artifacts': [], 'cover': None, 'blurb': ''} # Artifacts for f in os.listdir(b_path): if f.lower().endswith(('.epub', '.docx')): b_info['artifacts'].append({'name': f, 'path': os.path.join(d, f).replace("\\", "/")}) # Cover if os.path.exists(os.path.join(b_path, "cover.png")): b_info['cover'] = os.path.join(d, "cover.png").replace("\\", "/") # Blurb blurb_p = os.path.join(b_path, "blurb.txt") if os.path.exists(blurb_p): with open(blurb_p, 'r', encoding='utf-8', errors='ignore') as f: b_info['blurb'] = f.read() books_data.append(b_info) # Load Bible Data for Dropdown bible_path = os.path.join(run.project.folder_path, "bible.json") bible_data = utils.load_json(bible_path) # Load Tracking Data for Run Details tracking = {"events": [], "characters": {}, "content_warnings": []} # We load tracking from the LAST book found to populate the general stats (most up-to-date) book_dir = os.path.join(run_dir, books_data[-1]['folder']) if books_data else run_dir if os.path.exists(book_dir): t_ev = os.path.join(book_dir, "tracking_events.json") t_ch = os.path.join(book_dir, "tracking_characters.json") t_wn = os.path.join(book_dir, "tracking_warnings.json") # Load safely, defaulting to empty structures if load_json returns None if os.path.exists(t_ev): tracking['events'] = utils.load_json(t_ev) or [] if os.path.exists(t_ch): tracking['characters'] = utils.load_json(t_ch) or {} if os.path.exists(t_wn): tracking['content_warnings'] = utils.load_json(t_wn) or [] # Use dedicated run details template return render_template('run_details.html', run=run, log_content=log_content, books=books_data, bible=bible_data, tracking=tracking) @app.route('/run//status') @login_required def run_status(id): run = db.session.get(Run, id) or Run.query.get_or_404(id) # Check status from DB or fallback to log file log_content = "" last_log = None # 1. Try Database Logs (Fastest & Best) logs = LogEntry.query.filter_by(run_id=id).order_by(LogEntry.timestamp).all() if logs: log_content = "\n".join([f"[{l.timestamp.strftime('%H:%M:%S')}] {l.phase:<15} | {l.message}" for l in logs]) last_log = logs[-1] # 2. Fallback to File (For old runs or if DB logging fails) if not log_content: if run.log_file and os.path.exists(run.log_file): with open(run.log_file, 'r') as f: log_content = f.read() elif run.status in ['queued', 'running']: temp_log = os.path.join(run.project.folder_path, f"system_log_{run.id}.txt") if os.path.exists(temp_log): with open(temp_log, 'r') as f: log_content = f.read() response = {"status": run.status, "log": log_content, "cost": run.cost, "percent": run.progress} if last_log: response["progress"] = { "phase": last_log.phase, "message": last_log.message, "timestamp": last_log.timestamp.timestamp() } return response @app.route('/project//download') @login_required def download_artifact(run_id): filename = request.args.get('file') run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id) if run.project.user_id != current_user.id: return "Unauthorized", 403 if not filename: return "Missing filename", 400 # Security Check: Prevent path traversal # Combined check using normpath to ensure it stays within root and catches basic traversal chars if os.path.isabs(filename) or ".." in os.path.normpath(filename) or ":" in filename: return "Invalid filename", 400 run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}") # If file not found in root, check subfolders (Series Support) if not os.path.exists(os.path.join(run_dir, filename)) and os.path.exists(run_dir): subdirs = utils.get_sorted_book_folders(run_dir) # Scan all book folders for d in subdirs: possible_path = os.path.join(d, filename) if os.path.exists(os.path.join(run_dir, possible_path)): filename = possible_path break return send_from_directory(run_dir, filename, as_attachment=True) @app.route('/project//read/') @login_required def read_book(run_id, book_folder): run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id) if run.project.user_id != current_user.id: return "Unauthorized", 403 # Security Check: Prevent path traversal in book_folder if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder: return "Invalid book folder", 400 run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}") book_path = os.path.join(run_dir, book_folder) ms_path = os.path.join(book_path, "manuscript.json") if not os.path.exists(ms_path): flash("Manuscript not found.") return redirect(url_for('view_run', id=run_id)) manuscript = utils.load_json(ms_path) # Sort by chapter number (Handle Prologue/Epilogue) manuscript.sort(key=utils.chapter_sort_key) # Render Markdown for display for ch in manuscript: ch['html_content'] = markdown.markdown(ch.get('content', '')) return render_template('read_book.html', run=run, book_folder=book_folder, manuscript=manuscript) @app.route('/project//save_chapter', methods=['POST']) @login_required def save_chapter(run_id): run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id) if run.project.user_id != current_user.id: return "Unauthorized", 403 if run.status == 'running': return "Cannot edit chapter while run is active.", 409 book_folder = request.form.get('book_folder') chap_num_raw = request.form.get('chapter_num') try: chap_num = int(chap_num_raw) except: chap_num = chap_num_raw new_content = request.form.get('content') # Security Check if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder: return "Invalid book folder", 400 run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}") ms_path = os.path.join(run_dir, book_folder, "manuscript.json") if os.path.exists(ms_path): ms = utils.load_json(ms_path) for ch in ms: if ch.get('num') == chap_num: ch['content'] = new_content break with open(ms_path, 'w') as f: json.dump(ms, f, indent=2) # Regenerate Artifacts (EPUB/DOCX) to reflect manual edits book_path = os.path.join(run_dir, book_folder) bp_path = os.path.join(book_path, "final_blueprint.json") if os.path.exists(bp_path): bp = utils.load_json(bp_path) export.compile_files(bp, ms, book_path) return "Saved", 200 return "Error", 500 @app.route('/project//check_consistency/') @login_required def check_consistency(run_id, book_folder): run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id) # Security Check if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder: return "Invalid book folder", 400 run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}") book_path = os.path.join(run_dir, book_folder) bp = utils.load_json(os.path.join(book_path, "final_blueprint.json")) ms = utils.load_json(os.path.join(book_path, "manuscript.json")) if not bp or not ms: return "Data files missing or corrupt.", 404 try: ai.init_models() except: pass report = story.analyze_consistency(bp, ms, book_path) return render_template('consistency_report.html', report=report, run=run, book_folder=book_folder) @app.route('/project//sync_book/', methods=['POST']) @login_required def sync_book_metadata(run_id, book_folder): run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id) if run.project.user_id != current_user.id: return "Unauthorized", 403 if run.status == 'running': flash("Cannot sync metadata while run is active.") return redirect(url_for('read_book', run_id=run_id, book_folder=book_folder)) # Security Check if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder: return "Invalid book folder", 400 run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}") book_path = os.path.join(run_dir, book_folder) ms_path = os.path.join(book_path, "manuscript.json") bp_path = os.path.join(book_path, "final_blueprint.json") if os.path.exists(ms_path) and os.path.exists(bp_path): ms = utils.load_json(ms_path) bp = utils.load_json(bp_path) if not ms or not bp: flash("Data files corrupt.") return redirect(url_for('read_book', run_id=run_id, book_folder=book_folder)) try: ai.init_models() except: pass # 1. Harvest new characters/info from the EDITED text (Updates BP) bp = story.harvest_metadata(bp, book_path, ms) # 2. Sync Tracking (Ensure new characters exist in tracking file) tracking_path = os.path.join(book_path, "tracking_characters.json") if os.path.exists(tracking_path): tracking_chars = utils.load_json(tracking_path) or {} updated_tracking = False for c in bp.get('characters', []): if c.get('name') and c['name'] not in tracking_chars: tracking_chars[c['name']] = {"descriptors": [c.get('description', '')], "likes_dislikes": [], "last_worn": "Unknown"} updated_tracking = True if updated_tracking: with open(tracking_path, 'w') as f: json.dump(tracking_chars, f, indent=2) # 3. Update Persona (Style might have changed during manual edits) story.update_persona_sample(bp, book_path) with open(bp_path, 'w') as f: json.dump(bp, f, indent=2) flash("Metadata synced. Future generations will respect your edits.") else: flash("Files not found.") return redirect(url_for('read_book', run_id=run_id, book_folder=book_folder)) @app.route('/project//rewrite_chapter', methods=['POST']) @login_required def rewrite_chapter(run_id): run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id) if run.project.user_id != current_user.id: return {"error": "Unauthorized"}, 403 if run.status == 'running': return {"error": "Cannot rewrite while run is active."}, 409 data = request.json book_folder = data.get('book_folder') chap_num = data.get('chapter_num') instruction = data.get('instruction') if not book_folder or chap_num is None or not instruction: return {"error": "Missing parameters"}, 400 # Security Check if "/" in book_folder or "\\" in book_folder or ".." in book_folder: return {"error": "Invalid book folder"}, 400 # Try to convert to int, but allow strings (e.g. "Epilogue") try: chap_num = int(chap_num) except: pass # Start background task task = rewrite_chapter_task(run.id, run.project.folder_path, book_folder, chap_num, instruction) # Store task ID in session to poll for status session['rewrite_task_id'] = task.id return {"status": "queued", "task_id": task.id}, 202 @app.route('/task_status/') @login_required def get_task_status(task_id): task_result = huey.result(task_id, peek=True) if task_result is None: return {"status": "running"} else: return {"status": "completed", "success": task_result} @app.route('/logout') def logout(): logout_user() return redirect(url_for('login')) @app.route('/debug/routes') @login_required @admin_required def debug_routes(): output = [] for rule in app.url_map.iter_rules(): methods = ','.join(rule.methods) # Use brackets so they are visible in browser text rule_str = str(rule).replace('<', '[').replace('>', ']') line = "{:50s} {:20s} {}".format(rule.endpoint, methods, rule_str) output.append(line) return "
" + "\n".join(output) + "
" @app.route('/system/optimize_models', methods=['POST']) @login_required @admin_required def optimize_models(): # Force refresh via AI module (safely handles failures) try: ai.init_models(force=True) # Force re-initialization and API scan # Refresh Style Guidelines if ai.model_logic: story.refresh_style_guidelines(ai.model_logic) flash("AI Models refreshed and Style Guidelines updated.") except Exception as e: flash(f"Error refreshing models: {e}") return redirect(request.referrer or url_for('index')) @app.route('/system/status') @login_required def system_status(): # System Status View: Show AI Models and Quotas models_info = {} cache_data = {} cache_path = os.path.join(config.DATA_DIR, "model_cache.json") if os.path.exists(cache_path): try: with open(cache_path, 'r') as f: cache_data = json.load(f) models_info = cache_data.get('models', {}) except: pass return render_template('system_status.html', models=models_info, cache=cache_data, datetime=datetime) @app.route('/personas') @login_required def list_personas(): personas = {} if os.path.exists(config.PERSONAS_FILE): try: with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f) except: pass return render_template('personas.html', personas=personas) @app.route('/persona/new') @login_required def new_persona(): return render_template('persona_edit.html', persona={}, name="") @app.route('/persona/') @login_required def edit_persona(name): personas = {} if os.path.exists(config.PERSONAS_FILE): try: with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f) except: pass persona = personas.get(name) if not persona: flash(f"Persona '{name}' not found.") return redirect(url_for('list_personas')) return render_template('persona_edit.html', persona=persona, name=name) @app.route('/persona/save', methods=['POST']) @login_required def save_persona(): old_name = request.form.get('old_name') name = request.form.get('name') if not name: flash("Persona name is required.") return redirect(url_for('list_personas')) personas = {} if os.path.exists(config.PERSONAS_FILE): try: with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f) except: pass if old_name and old_name != name and old_name in personas: del personas[old_name] persona = { "name": name, "bio": request.form.get('bio'), "age": request.form.get('age'), "gender": request.form.get('gender'), "race": request.form.get('race'), "nationality": request.form.get('nationality'), "language": request.form.get('language'), "sample_text": request.form.get('sample_text'), "voice_keywords": request.form.get('voice_keywords'), "style_inspirations": request.form.get('style_inspirations') } personas[name] = persona with open(config.PERSONAS_FILE, 'w') as f: json.dump(personas, f, indent=2) flash(f"Persona '{name}' saved.") return redirect(url_for('list_personas')) @app.route('/persona/delete/', methods=['POST']) @login_required def delete_persona(name): personas = {} if os.path.exists(config.PERSONAS_FILE): try: with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f) except: pass if name in personas: del personas[name] with open(config.PERSONAS_FILE, 'w') as f: json.dump(personas, f, indent=2) flash(f"Persona '{name}' deleted.") return redirect(url_for('list_personas')) @app.route('/persona/analyze', methods=['POST']) @login_required def analyze_persona(): try: ai.init_models() except: pass if not ai.model_logic: return {"error": "AI models not initialized."}, 500 data = request.json sample = data.get('sample_text', '') prompt = f""" Act as a Literary Analyst. Create or analyze an Author Persona profile. INPUT DATA: Name: {data.get('name')} Age: {data.get('age')} | Gender: {data.get('gender')} | Nationality: {data.get('nationality')} Sample Text: {sample[:3000]} TASK: 1. 'bio': Write a 2-3 sentence description of the writing style. If sample is provided, analyze it. If not, invent a style that fits the demographics/name. 2. 'voice_keywords': Comma-separated list of 3-5 adjectives describing the voice (e.g. Gritty, Whimsical, Sarcastic). 3. 'style_inspirations': Comma-separated list of 1-3 famous authors or genres that this style resembles. RETURN JSON: {{ "bio": "...", "voice_keywords": "...", "style_inspirations": "..." }} """ try: response = ai.model_logic.generate_content(prompt) return json.loads(utils.clean_json(response.text)) except Exception as e: return {"error": str(e)}, 500 # --- ADMIN ROUTES --- @app.route('/admin') @login_required @admin_required def admin_dashboard(): users = User.query.all() projects = Project.query.all() return render_template('admin_dashboard.html', users=users, projects=projects) @app.route('/admin/user//delete', methods=['POST']) @login_required @admin_required def admin_delete_user(user_id): if user_id == current_user.id: flash("Cannot delete yourself.") return redirect(url_for('admin_dashboard')) user = db.session.get(User, user_id) if user: # Delete user data folder user_path = os.path.join(config.DATA_DIR, "users", str(user.id)) if os.path.exists(user_path): try: shutil.rmtree(user_path) except: pass # Delete projects (DB cascade handles rows, we handle files) projects = Project.query.filter_by(user_id=user.id).all() for p in projects: db.session.delete(p) db.session.delete(user) db.session.commit() flash(f"User {user.username} deleted.") return redirect(url_for('admin_dashboard')) @app.route('/admin/project//delete', methods=['POST']) @login_required @admin_required def admin_delete_project(project_id): proj = db.session.get(Project, project_id) if proj: if os.path.exists(proj.folder_path): try: shutil.rmtree(proj.folder_path) except: pass db.session.delete(proj) db.session.commit() flash(f"Project {proj.name} deleted.") return redirect(url_for('admin_dashboard')) @app.route('/admin/reset', methods=['POST']) @login_required @admin_required def admin_factory_reset(): # 1. Delete ALL Projects (Files & DB) projects = Project.query.all() for p in projects: if os.path.exists(p.folder_path): try: shutil.rmtree(p.folder_path) except: pass db.session.delete(p) # 2. Delete ALL Users except Current Admin users = User.query.filter(User.id != current_user.id).all() for u in users: user_path = os.path.join(config.DATA_DIR, "users", str(u.id)) if os.path.exists(user_path): try: shutil.rmtree(user_path) except: pass db.session.delete(u) # 3. Reset Personas to Default if os.path.exists(config.PERSONAS_FILE): try: os.remove(config.PERSONAS_FILE) except: pass utils.create_default_personas() db.session.commit() flash("Factory Reset Complete. All other users and projects have been wiped.") return redirect(url_for('admin_dashboard')) @app.route('/admin/spend') @login_required @admin_required def admin_spend_report(): days = request.args.get('days', 30, type=int) if days > 0: start_date = datetime.utcnow() - timedelta(days=days) else: start_date = datetime.min # Aggregate spend per user results = db.session.query( User.username, func.count(Run.id), func.sum(Run.cost) ).join(Project, Project.user_id == User.id)\ .join(Run, Run.project_id == Project.id)\ .filter(Run.start_time >= start_date)\ .group_by(User.id, User.username).all() report = [] total_period_spend = 0.0 for r in results: cost = r[2] if r[2] else 0.0 report.append({"username": r[0], "runs": r[1], "cost": cost}) total_period_spend += cost return render_template('admin_spend.html', report=report, days=days, total=total_period_spend) @app.route('/admin/style', methods=['GET', 'POST']) @login_required @admin_required def admin_style_guidelines(): path = os.path.join(config.DATA_DIR, "style_guidelines.json") if request.method == 'POST': ai_isms_raw = request.form.get('ai_isms', '') filter_words_raw = request.form.get('filter_words', '') data = { "ai_isms": [x.strip() for x in ai_isms_raw.split('\n') if x.strip()], "filter_words": [x.strip() for x in filter_words_raw.split('\n') if x.strip()] } with open(path, 'w') as f: json.dump(data, f, indent=2) flash("Style Guidelines updated successfully.") return redirect(url_for('admin_style_guidelines')) # Load current (creates defaults if missing) data = story.get_style_guidelines() return render_template('admin_style.html', data=data) @app.route('/admin/impersonate/') @login_required @admin_required def impersonate_user(user_id): if user_id == current_user.id: flash("Cannot impersonate yourself.") return redirect(url_for('admin_dashboard')) user = db.session.get(User, user_id) if user: session['original_admin_id'] = current_user.id login_user(user) flash(f"Now viewing as {user.username}") return redirect(url_for('index')) return redirect(url_for('admin_dashboard')) @app.route('/admin/stop_impersonate') @login_required def stop_impersonate(): admin_id = session.get('original_admin_id') if admin_id: admin = db.session.get(User, admin_id) if admin: login_user(admin) session.pop('original_admin_id', None) flash("Restored admin session.") return redirect(url_for('admin_dashboard')) return redirect(url_for('index')) if __name__ == '__main__': # Run the Huey consumer in a separate thread for testing on Pi # For production, run `huey_consumer.py web_tasks.huey` in terminal import threading def run_huey(): from huey.consumer import Consumer # Subclass to disable signal handling (which fails in threads) class ThreadedConsumer(Consumer): def _set_signal_handlers(self): pass c = ThreadedConsumer(huey, workers=1, worker_type='thread') c.run() # Configuration debug_mode = os.environ.get("FLASK_DEBUG", "False").lower() == "true" # Run worker if: 1. In reloader child process OR 2. Reloader is disabled (debug=False) if os.environ.get("WERKZEUG_RUN_MAIN") == "true" or not debug_mode: threading.Thread(target=run_huey, daemon=True).start() app.run(host='0.0.0.0', port=5000, debug=debug_mode)