1485 lines
56 KiB
Python
1485 lines
56 KiB
Python
import os
|
|
import json
|
|
import shutil
|
|
import markdown
|
|
from functools import wraps
|
|
from datetime import datetime, timedelta
|
|
from urllib.parse import urlparse, urljoin
|
|
from sqlalchemy import func, text
|
|
from sqlalchemy.exc import IntegrityError
|
|
from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, session
|
|
from flask_login import LoginManager, login_user, login_required, logout_user, current_user
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from .web_db import db, User, Project, Run, LogEntry
|
|
from .web_tasks import huey, generate_book_task, regenerate_artifacts_task, rewrite_chapter_task
|
|
import config
|
|
from . import utils
|
|
from . import ai
|
|
from . import story
|
|
from . import export
|
|
|
|
# Paths are resolved relative to this file (modules/web_app.py): BASE_DIR is
# the parent of the directory that contains this module.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
TEMPLATE_DIR = os.path.join(BASE_DIR, 'templates')

app = Flask(__name__, template_folder=TEMPLATE_DIR)
app.url_map.strict_slashes = False
app.config.update(
    SECRET_KEY=config.FLASK_SECRET,
    # The database file lives under config.DATA_DIR; the path is built from
    # that directory to avoid Docker path resolution issues.
    SQLALCHEMY_DATABASE_URI='sqlite:///' + os.path.join(config.DATA_DIR, "bookapp.db"),
)

db.init_app(app)

# Anonymous visitors hitting a @login_required view are sent to the 'login'
# endpoint.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: resolve a stored session ID to a User.

    Args:
        user_id: The identifier Flask-Login stored in the session.

    Returns:
        The matching ``User`` row, or ``None`` when the ID is not a valid
        integer or no such user exists.
    """
    # A tampered or stale session may carry a non-numeric ID. Flask-Login
    # expects None in that case rather than an exception.
    try:
        pk = int(user_id)
    except (TypeError, ValueError):
        return None
    return db.session.get(User, pk)
|
|
|
|
# --- SETUP ---
|
|
with app.app_context():
    # Create any tables that are declared on the models but missing in the DB.
    db.create_all()

    # Auto-create Admin from Environment Variables (Docker/Portainer Setup)
    if config.ADMIN_USER and config.ADMIN_PASSWORD:
        admin = User.query.filter_by(username=config.ADMIN_USER).first()
        if not admin:
            print(f"🔐 System: Creating Admin User '{config.ADMIN_USER}' from environment variables.")
            admin = User(
                username=config.ADMIN_USER,
                password=generate_password_hash(config.ADMIN_PASSWORD, method='pbkdf2:sha256'),
                is_admin=True,
            )
            db.session.add(admin)
        else:
            # Ensure admin privileges and sync password with Env Var
            # (allows password reset via Docker).
            if not admin.is_admin:
                admin.is_admin = True
            admin.password = generate_password_hash(config.ADMIN_PASSWORD, method='pbkdf2:sha256')
        db.session.commit()

    # Migration: Add 'progress' column if missing
    try:
        with db.engine.connect() as conn:
            conn.execute(text("ALTER TABLE run ADD COLUMN progress INTEGER DEFAULT 0"))
            conn.commit()
            print("✅ System: Added 'progress' column to Run table.")
    except Exception:
        # Best-effort migration: the statement is expected to fail once the
        # column exists. Catch Exception (not a bare except) so that
        # SystemExit / KeyboardInterrupt still propagate during startup.
        pass

    # Reset stuck runs on startup: anything still marked 'running' cannot have
    # a live worker behind it after a restart of this process.
    try:
        stuck_runs = Run.query.filter_by(status='running').all()
        if stuck_runs:
            print(f"⚠️ System: Found {len(stuck_runs)} stuck runs. Resetting to 'failed'.")
            for r in stuck_runs:
                r.status = 'failed'
                r.end_time = datetime.utcnow()
            db.session.commit()
    except Exception as e:
        # Leave the session usable for later code if the cleanup failed midway.
        db.session.rollback()
        print(f"⚠️ System: Failed to clean up stuck runs: {e}")
|
|
|
|
# --- DECORATORS ---
|
|
def admin_required(f):
    """Decorator that only lets authenticated admin users reach the view.

    Anyone else gets a flash message and is redirected to the 'index'
    endpoint instead of the wrapped view being called.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        allowed = current_user.is_authenticated and current_user.is_admin
        if allowed:
            return f(*args, **kwargs)
        flash("Admin access required.")
        return redirect(url_for('index'))
    return wrapper
|
|
|
|
def is_project_locked(project_id):
    """Return True when the project has at least one run with status 'completed'."""
    completed_runs = Run.query.filter_by(project_id=project_id, status='completed')
    return completed_runs.count() > 0
|
|
|
|
def is_safe_url(target):
    """Return True if *target* resolves to an http(s) URL on this host.

    *target* is joined onto the current request's host URL, so relative
    paths are accepted while redirects to another netloc are rejected.
    """
    host = urlparse(request.host_url)
    candidate = urlparse(urljoin(request.host_url, target))
    if candidate.scheme not in ('http', 'https'):
        return False
    return host.netloc == candidate.netloc
|
|
|
|
# --- ROUTES ---
|
|
|
|
@app.route('/')
@login_required
def index():
    """Render the dashboard with the projects owned by the current user."""
    owned_projects = Project.query.filter_by(user_id=current_user.id).all()
    return render_template('dashboard.html', projects=owned_projects, user=current_user)
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the submitted credentials (POST).

    On success the user is logged in and redirected to the ``next`` query
    parameter when it points at this host, otherwise to the dashboard. On
    failure a flash message is set and the form is rendered again.
    """
    if request.method == 'POST':
        # Default to '' so a request without these fields is treated as bad
        # credentials instead of passing None into the password hash check.
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        user = User.query.filter_by(username=username).first()
        if user and check_password_hash(user.password, password):
            login_user(user)
            next_page = request.args.get('next')
            # Only follow 'next' when it stays on this host (open-redirect guard).
            if not next_page or not is_safe_url(next_page):
                next_page = url_for('index')
            return redirect(next_page)
        flash('Invalid credentials')
    return render_template('login.html')
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    A new user whose name equals ``config.ADMIN_USER`` is created as admin.
    Duplicate usernames (detected up front or via the DB integrity error)
    produce a flash message and a redirect back to the form.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')

        # Reject missing/empty fields before hashing: a missing password would
        # otherwise crash hashing and an empty username would create a blank
        # account.
        if not username or not password:
            flash('Username and password are required')
            return redirect(url_for('register'))

        if User.query.filter_by(username=username).first():
            flash('Username exists')
            return redirect(url_for('register'))

        new_user = User(username=username, password=generate_password_hash(password, method='pbkdf2:sha256'))
        # Auto-promote if matches env var
        if config.ADMIN_USER and username == config.ADMIN_USER:
            new_user.is_admin = True
        try:
            db.session.add(new_user)
            db.session.commit()
        except IntegrityError:
            # Another request inserted the same username between the lookup
            # above and this commit.
            db.session.rollback()
            flash('Username exists')
            return redirect(url_for('register'))
        login_user(new_user)
        return redirect(url_for('index'))
    return render_template('register.html')
|
|
|
|
@app.route('/project/setup', methods=['POST'])
@login_required
def project_setup_wizard():
    """Ask the AI for project metadata suggestions based on a story concept.

    Reads ``concept`` from the form, prompts ``ai.model_logic`` for a JSON
    object of suggested settings and renders ``project_setup.html`` with the
    suggestions, the concept, the available personas and the length
    definitions. If the AI call or JSON parsing fails, a minimal default
    suggestion is used and the error is flashed.
    """
    concept = request.form.get('concept', '')

    # Initialize AI if needed. Failure is tolerated here because the
    # model_logic check below reports the problem to the user.
    try:
        ai.init_models()
    except Exception:
        pass

    if not ai.model_logic:
        flash("AI models not initialized.")
        return redirect(url_for('index'))

    prompt = f"""
    Analyze this story concept and suggest metadata for a book or series.
    CONCEPT: {concept}

    RETURN JSON with these keys:
    - title: Suggested book title
    - genre: Genre
    - target_audience: e.g. Adult, YA
    - tone: e.g. Dark, Whimsical
    - length_category: One of ["01", "1", "2", "2b", "3", "4", "5"] based on likely depth.
    - estimated_chapters: int (suggested chapter count)
    - estimated_word_count: string (e.g. "75,000")
    - include_prologue: boolean
    - include_epilogue: boolean
    - tropes: list of strings
    - pov_style: e.g. First Person
    - time_period: e.g. Modern
    - spice: e.g. Standard, Explicit
    - violence: e.g. None, Graphic
    - is_series: boolean
    - series_title: string (if series)
    - narrative_tense: e.g. Past, Present
    - language_style: e.g. Standard, Flowery
    - dialogue_style: e.g. Witty, Formal
    - page_orientation: Portrait, Landscape, or Square
    - formatting_rules: list of strings
    - author_bio: string (suggested persona bio)
    """

    fallback = {"title": "New Project", "genre": "Fiction"}
    try:
        response = ai.model_logic.generate_content(prompt)
        suggestions = json.loads(utils.clean_json(response.text))
        # The template reads keyed fields, so anything but a JSON object
        # (e.g. a top-level list) is treated as a failed analysis.
        if not isinstance(suggestions, dict):
            raise ValueError("AI response was not a JSON object")
    except Exception as e:
        flash(f"AI Analysis failed: {e}")
        suggestions = fallback

    # Load Personas for dropdown; an unreadable or invalid file simply leaves
    # the dropdown empty.
    personas = {}
    if os.path.exists(config.PERSONAS_FILE):
        try:
            with open(config.PERSONAS_FILE, 'r') as f:
                personas = json.load(f)
        except (OSError, ValueError):
            pass

    return render_template('project_setup.html', s=suggestions, concept=concept, personas=personas, lengths=config.LENGTH_DEFINITIONS)
|
|
|
|
@app.route('/project/setup/refine', methods=['POST'])
@login_required
def project_setup_refine():
    """Re-run the setup suggestions with a user-supplied refinement instruction.

    Reads the concept, the instruction and the currently displayed
    title/genre/audience/tone from the form, asks ``ai.model_logic`` for an
    updated JSON suggestion set and re-renders ``project_setup.html``. On any
    failure the error is flashed and the user is redirected to the dashboard.
    """
    concept = request.form.get('concept', '')
    instruction = request.form.get('refine_instruction', '')

    # Reconstruct current state from form to pass back to AI
    current_state = {
        "title": request.form.get('title'),
        "genre": request.form.get('genre'),
        "target_audience": request.form.get('audience'),
        "tone": request.form.get('tone'),
    }

    # Failure is tolerated here because the model_logic check below reports it.
    try:
        ai.init_models()
    except Exception:
        pass

    # Same guard as the initial wizard step, so a missing model yields a clear
    # message rather than an attribute error surfaced as "Refinement failed".
    if not ai.model_logic:
        flash("AI models not initialized.")
        return redirect(url_for('index'))

    # All captured fields are sent so the model can keep what the user has
    # already edited instead of regenerating from the concept alone.
    prompt = f"""
    Update these project suggestions based on the user instruction.
    ORIGINAL CONCEPT: {concept}
    CURRENT TITLE: {current_state['title']}
    CURRENT GENRE: {current_state['genre']}
    CURRENT TARGET AUDIENCE: {current_state['target_audience']}
    CURRENT TONE: {current_state['tone']}
    INSTRUCTION: {instruction}

    RETURN JSON with the same keys as a full analysis (title, genre, length_category, etc).
    """

    try:
        response = ai.model_logic.generate_content(prompt)
        suggestions = json.loads(utils.clean_json(response.text))
        if not isinstance(suggestions, dict):
            raise ValueError("AI response was not a JSON object")
    except Exception as e:
        flash(f"Refinement failed: {e}")
        return redirect(url_for('index'))  # Fallback

    personas = {}
    if os.path.exists(config.PERSONAS_FILE):
        try:
            with open(config.PERSONAS_FILE, 'r') as f:
                personas = json.load(f)
        except (OSError, ValueError):
            pass

    return render_template('project_setup.html', s=suggestions, concept=concept, personas=personas, lengths=config.LENGTH_DEFINITIONS)
|
|
|
|
@app.route('/project/create', methods=['POST'])
@login_required
def create_project_final():
    """Create a project folder, its ``bible.json`` and the Project DB row.

    The bible is assembled from the submitted form (metadata, length
    settings, style, one entry per planned book), passed through
    ``story.enrich`` on a best-effort basis, written to disk and registered
    for the current user. Redirects to the new project's page.
    """
    title = request.form.get('title')
    if not title:
        # Without a title no folder name can be derived.
        flash("Project title is required.")
        return redirect(url_for('index'))
    safe_title = utils.sanitize_filename(title)

    user_dir = os.path.join(config.DATA_DIR, "users", str(current_user.id))
    os.makedirs(user_dir, exist_ok=True)

    proj_path = os.path.join(user_dir, safe_title)
    if os.path.exists(proj_path):
        # Disambiguate a folder name clash with a timestamp suffix.
        safe_title += f"_{int(datetime.utcnow().timestamp())}"
        proj_path = os.path.join(user_dir, safe_title)
    os.makedirs(proj_path, exist_ok=True)

    # Construct Bible from Form Data
    length_cat = request.form.get('length_category')
    len_def = config.LENGTH_DEFINITIONS.get(length_cat, config.LENGTH_DEFINITIONS['4']).copy()

    # Overrides: keep the preset chapter count when the field is absent or
    # not an integer.
    try:
        len_def['chapters'] = int(request.form.get('chapters'))
    except (TypeError, ValueError):
        pass
    len_def['words'] = request.form.get('words')
    len_def['include_prologue'] = 'include_prologue' in request.form
    len_def['include_epilogue'] = 'include_epilogue' in request.form

    is_series = 'is_series' in request.form

    def _csv_list(field):
        """Split a comma-separated form field into trimmed, non-empty items."""
        return [x.strip() for x in request.form.get(field, '').split(',') if x.strip()]

    style = {
        "tone": request.form.get('tone'),
        "pov_style": request.form.get('pov_style'),
        "time_period": request.form.get('time_period'),
        "spice": request.form.get('spice'),
        "violence": request.form.get('violence'),
        "narrative_tense": request.form.get('narrative_tense'),
        "language_style": request.form.get('language_style'),
        "dialogue_style": request.form.get('dialogue_style'),
        "page_orientation": request.form.get('page_orientation'),
        "tropes": _csv_list('tropes'),
        "formatting_rules": _csv_list('formatting_rules'),
    }

    bible = {
        "project_metadata": {
            "title": title,
            "author": request.form.get('author'),
            "author_bio": request.form.get('author_bio'),
            "genre": request.form.get('genre'),
            "target_audience": request.form.get('audience'),
            "is_series": is_series,
            "length_settings": len_def,
            "style": style,
        },
        "books": [],
        "characters": [],
    }

    # Create Books
    count = 1
    if is_series:
        try:
            count = int(request.form.get('series_count', 1))
        except (TypeError, ValueError):
            count = 3
        # A zero or negative count would produce a project with no books.
        count = max(count, 1)

    concept = request.form.get('concept', '')

    for i in range(count):
        bible['books'].append({
            "book_number": i + 1,
            "title": f"{title} - Book {i+1}" if is_series else title,
            # Only the first book carries the user's concept as its instruction.
            "manual_instruction": concept if i == 0 else "",
            "plot_beats": [],
        })

    # Enrich via AI immediately (Always, to ensure Bible is full). This is
    # best-effort: on failure the un-enriched bible is saved and the user is
    # told. A falsy result from enrich keeps the form-built bible.
    try:
        ai.init_models()
        bible = story.enrich(bible, proj_path) or bible
    except Exception as e:
        flash(f"AI enrichment skipped: {e}")

    with open(os.path.join(proj_path, "bible.json"), 'w') as f:
        json.dump(bible, f, indent=2)

    new_proj = Project(user_id=current_user.id, name=title, folder_path=proj_path)
    db.session.add(new_proj)
    db.session.commit()

    return redirect(url_for('view_project', id=new_proj.id))
|
|
|
|
@app.route('/project/import', methods=['POST'])
@login_required
def import_project():
    """Create a project from an uploaded ``bible.json`` file.

    The upload must be a JSON object containing ``project_metadata`` with a
    non-empty string ``title``. A project folder is created for the current
    user, the bible is written into it and a Project row is added. Every
    failure path flashes a message and redirects to the dashboard.
    """
    if 'bible_file' not in request.files:
        flash('No file part')
        return redirect(url_for('index'))

    file = request.files['bible_file']
    # Browsers submit an empty filename when no file was chosen.
    if not file or not file.filename:
        flash('No selected file')
        return redirect(url_for('index'))

    try:
        bible = json.load(file)

        # Basic validation: the upload is untrusted, so check types as well as
        # key presence before indexing into it.
        metadata = bible.get('project_metadata') if isinstance(bible, dict) else None
        title = metadata.get('title') if isinstance(metadata, dict) else None
        if not isinstance(title, str) or not title.strip():
            flash("Invalid Bible format: Missing project_metadata or title.")
            return redirect(url_for('index'))

        safe_title = utils.sanitize_filename(title)

        user_dir = os.path.join(config.DATA_DIR, "users", str(current_user.id))
        os.makedirs(user_dir, exist_ok=True)

        proj_path = os.path.join(user_dir, safe_title)
        if os.path.exists(proj_path):
            # Disambiguate a folder name clash with a timestamp suffix.
            safe_title += f"_{int(datetime.utcnow().timestamp())}"
            proj_path = os.path.join(user_dir, safe_title)
        os.makedirs(proj_path)

        with open(os.path.join(proj_path, "bible.json"), 'w') as f:
            json.dump(bible, f, indent=2)

        new_proj = Project(user_id=current_user.id, name=title, folder_path=proj_path)
        db.session.add(new_proj)
        db.session.commit()

        flash(f"Project '{title}' imported successfully.")
        return redirect(url_for('view_project', id=new_proj.id))

    except Exception as e:
        # Discard any half-finished DB work so the session stays usable.
        db.session.rollback()
        flash(f"Import failed: {str(e)}")
        return redirect(url_for('index'))
|
|
|
|
@app.route('/project/<int:id>')
@login_required
def view_project(id):
    """Render the project page: bible, runs, generated books and artifacts.

    Returns 404 when the project does not exist and 403 when it belongs to
    another user. Generated books are discovered by scanning the folders of
    completed runs (newest first); artifacts and the cover image are taken
    from the latest run only.
    """
    proj = db.session.get(Project, id)
    if not proj:
        return "Project not found", 404
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    # Load Bible
    bible_path = os.path.join(proj.folder_path, "bible.json")
    bible_data = utils.load_json(bible_path)

    # Load Personas for dropdown; an unreadable or invalid file leaves it empty.
    personas = {}
    if os.path.exists(config.PERSONAS_FILE):
        try:
            with open(config.PERSONAS_FILE, 'r') as f:
                personas = json.load(f)
        except (OSError, ValueError):
            pass

    runs = Run.query.filter_by(project_id=id).order_by(Run.id.desc()).all()
    latest_run = runs[0] if runs else None

    # Fetch other projects for "Related Series" import
    other_projects = Project.query.filter(Project.user_id == current_user.id, Project.id != id).all()

    artifacts = []
    cover_image = None
    generated_books = {}  # Map book_number -> {status: 'generated', run_id: int, folder: str}
    locked = is_project_locked(id)

    # Scan ALL completed runs to find the latest status of each book
    for r in runs:
        if r.status != 'completed':
            continue
        run_dir = os.path.join(proj.folder_path, "runs", f"run_{r.id}")
        if not os.path.exists(run_dir):
            continue
        try:
            entries = os.listdir(run_dir)
        except OSError:
            # Run folder vanished or is unreadable; skip it rather than fail
            # the whole page.
            continue

        for d in entries:
            book_path = os.path.join(run_dir, d)
            if not (d.startswith("Book_") and os.path.isdir(book_path)):
                continue
            # Check for manuscript to confirm generation
            if not os.path.exists(os.path.join(book_path, "manuscript.json")):
                continue
            parts = d.split('_')
            if len(parts) < 2 or not parts[1].isdigit():
                continue
            try:
                b_num = int(parts[1])
                # Only add if we haven't found a newer version (runs are
                # ordered desc)
                if b_num in generated_books:
                    continue
                # List the folder once and pick both download links from it.
                book_files = os.listdir(book_path)
            except (OSError, ValueError):
                continue

            epub_file = next((f for f in book_files if f.endswith('.epub')), None)
            docx_file = next((f for f in book_files if f.endswith('.docx')), None)
            generated_books[b_num] = {
                'status': 'generated',
                'run_id': r.id,
                'folder': d,
                'epub': os.path.join(d, epub_file).replace("\\", "/") if epub_file else None,
                'docx': os.path.join(d, docx_file).replace("\\", "/") if docx_file else None,
            }

    # Collect Artifacts from Latest Run
    if latest_run:
        run_dir = os.path.join(proj.folder_path, "runs", f"run_{latest_run.id}")
        if os.path.exists(run_dir):
            # Find Cover Image (Root or First Book)
            if os.path.exists(os.path.join(run_dir, "cover.png")):
                cover_image = "cover.png"
            else:
                for d in utils.get_sorted_book_folders(run_dir):
                    if os.path.exists(os.path.join(run_dir, d, "cover.png")):
                        cover_image = os.path.join(d, "cover.png").replace("\\", "/")
                        break

            for root, dirs, files in os.walk(run_dir):
                for f in files:
                    if f.lower().endswith(('.epub', '.docx')):
                        rel_path = os.path.relpath(os.path.join(root, f), run_dir)
                        artifacts.append({
                            'name': f,
                            # Forward slashes so the path can be used in URLs.
                            'path': rel_path.replace("\\", "/"),
                            'type': f.split('.')[-1].upper(),
                        })

    return render_template('project.html', project=proj, bible=bible_data, runs=runs, active_run=latest_run, artifacts=artifacts, cover_image=cover_image, personas=personas, generated_books=generated_books, other_projects=other_projects, locked=locked)
|
|
|
|
@app.route('/project/<int:id>/run', methods=['POST'])
@login_required
def run_project(id):
    """Queue a new generation run for a project owned by the current user.

    Creates a Run row with status "queued", hands it to the
    ``generate_book_task`` background task and redirects to the project
    page. Responds 404 for an unknown project and 403 for a project owned
    by someone else.
    """
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    # Ownership check: without it any logged-in user could start runs on
    # another user's project.
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    # Create Run Entry
    new_run = Run(project_id=id, status="queued")
    db.session.add(new_run)
    db.session.commit()

    # Trigger Background Task
    bible_path = os.path.join(proj.folder_path, "bible.json")
    generate_book_task(new_run.id, proj.folder_path, bible_path, allow_copy=True)

    return redirect(url_for('view_project', id=id))
|
|
|
|
@app.route('/project/<int:id>/review')
@login_required
def review_project(id):
    """Render the review page for a project the current user owns.

    Responds 404 for an unknown project and 403 for someone else's project.
    """
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    loaded_bible = utils.load_json(os.path.join(proj.folder_path, "bible.json"))
    return render_template('project_review.html', project=proj, bible=loaded_bible)
|
|
|
|
@app.route('/project/<int:id>/update', methods=['POST'])
@login_required
def update_project_metadata(id):
    """Update the title and/or author of an unlocked project.

    The new values are written into ``bible.json``; a new title is also
    copied to the Project row. Locked projects are left untouched and the
    user is told to clone instead.
    """
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    if is_project_locked(id):
        flash("Project is locked. Clone it to make changes.")
        return redirect(url_for('view_project', id=id))

    new_title = request.form.get('title')
    new_author = request.form.get('author')

    bible_path = os.path.join(proj.folder_path, "bible.json")
    bible = utils.load_json(bible_path)

    # Nothing to update when the bible could not be loaded.
    if not bible:
        return redirect(url_for('view_project', id=id))

    if new_title:
        bible['project_metadata']['title'] = new_title
        proj.name = new_title
    if new_author:
        bible['project_metadata']['author'] = new_author

    with open(bible_path, 'w') as out:
        json.dump(bible, out, indent=2)
    db.session.commit()

    return redirect(url_for('view_project', id=id))
|
|
|
|
@app.route('/project/<int:id>/clone', methods=['POST'])
@login_required
def clone_project(id):
    """Clone a project's bible into a new project under a new name.

    The source bible (when present and loadable) is copied with its title
    replaced by ``new_name``. If an ``instruction`` is supplied it is
    applied through ``story.refine_bible`` on a best-effort basis. Only the
    bible is copied; no run output is carried over.
    """
    source_proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if source_proj.user_id != current_user.id:
        return "Unauthorized", 403

    new_name = request.form.get('new_name')
    instruction = request.form.get('instruction')

    if not new_name:
        # Without a name no folder name can be derived.
        flash("A name is required to clone a project.")
        return redirect(url_for('view_project', id=id))

    # Create New Project
    safe_title = utils.sanitize_filename(new_name)
    user_dir = os.path.join(config.DATA_DIR, "users", str(current_user.id))
    new_path = os.path.join(user_dir, safe_title)
    if os.path.exists(new_path):
        # Disambiguate a folder name clash with a timestamp suffix.
        safe_title += f"_{int(datetime.utcnow().timestamp())}"
        new_path = os.path.join(user_dir, safe_title)
    os.makedirs(new_path)

    # Copy Bible
    source_bible_path = os.path.join(source_proj.folder_path, "bible.json")
    if os.path.exists(source_bible_path):
        bible = utils.load_json(source_bible_path)
        # Guard against an unreadable/invalid source file instead of failing
        # on the subscript below.
        if isinstance(bible, dict):
            bible.setdefault('project_metadata', {})['title'] = new_name

            # Apply AI Instruction if provided
            if instruction:
                try:
                    ai.init_models()
                    bible = story.refine_bible(bible, instruction, new_path) or bible
                except Exception as e:
                    # Best-effort: keep the plain copy and tell the user.
                    flash(f"AI refinement skipped: {e}")

            with open(os.path.join(new_path, "bible.json"), 'w') as f:
                json.dump(bible, f, indent=2)
        else:
            flash("Source bible could not be read; cloned project has no bible.")

    new_proj = Project(user_id=current_user.id, name=new_name, folder_path=new_path)
    db.session.add(new_proj)
    db.session.commit()

    flash(f"Project cloned as '{new_name}'.")
    return redirect(url_for('view_project', id=new_proj.id))
|
|
|
|
@app.route('/project/<int:id>/refine_bible', methods=['POST'])
@login_required
def refine_bible_route(id):
    """Apply a free-text AI instruction to an unlocked project's bible.

    Requires ownership, an unlocked project, a non-empty ``instruction`` and
    initialized AI models. The refined bible replaces ``bible.json`` only
    when ``story.refine_bible`` returns a truthy result.
    """
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    if is_project_locked(id):
        flash("Project is locked. Clone it to make changes.")
        return redirect(url_for('view_project', id=id))

    instruction = request.form.get('instruction')
    if not instruction:
        flash("Instruction required.")
        return redirect(url_for('view_project', id=id))

    bible_path = os.path.join(proj.folder_path, "bible.json")
    bible = utils.load_json(bible_path)

    if bible:
        # Failure is tolerated here because the model_logic check below
        # reports it; Exception (not a bare except) keeps SystemExit and
        # KeyboardInterrupt propagating.
        try:
            ai.init_models()
        except Exception:
            pass

        if not ai.model_logic:
            flash("AI models not initialized.")
            return redirect(url_for('view_project', id=id))

        try:
            new_bible = story.refine_bible(bible, instruction, proj.folder_path)
        except Exception:
            # Treat an exception like an empty result so the user gets the
            # failure message below instead of a server error.
            new_bible = None

        if new_bible:
            with open(bible_path, 'w') as f:
                json.dump(new_bible, f, indent=2)
            flash("Bible updated successfully.")
        else:
            flash("AI failed to update Bible. Check logs.")

    return redirect(url_for('view_project', id=id))
|
|
|
|
@app.route('/project/<int:id>/add_book', methods=['POST'])
@login_required
def add_book(id):
    """Append a new planned book to an unlocked project's bible.

    The book is numbered after the existing ones and the project is flagged
    as a series when the bible has a ``project_metadata`` section.
    """
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    if is_project_locked(id):
        flash("Project is locked. Clone it to make changes.")
        return redirect(url_for('view_project', id=id))

    title = request.form.get('title', 'Untitled')
    instruction = request.form.get('instruction', '')

    bible_path = os.path.join(proj.folder_path, "bible.json")
    bible = utils.load_json(bible_path)

    # Nothing to do when the bible could not be loaded.
    if not bible:
        return redirect(url_for('view_project', id=id))

    books = bible.setdefault('books', [])
    next_num = len(books) + 1
    books.append({
        "book_number": next_num,
        "title": title,
        "manual_instruction": instruction,
        "plot_beats": [],  # AI will fill this if empty during run
    })

    # Adding a book makes the project a series.
    if 'project_metadata' in bible:
        bible['project_metadata']['is_series'] = True

    with open(bible_path, 'w') as out:
        json.dump(bible, out, indent=2)
    flash(f"Added Book {next_num}: {title}")

    return redirect(url_for('view_project', id=id))
|
|
|
|
@app.route('/project/<int:id>/book/<int:book_num>/update', methods=['POST'])
@login_required
def update_book_details(id, book_num):
    """Update the title and/or instruction of one planned book.

    Only unlocked projects owned by the current user can be changed. When
    no book carries ``book_num`` nothing is written and the user is told,
    rather than a success message being shown.
    """
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    if is_project_locked(id):
        flash("Project is locked. Clone it to make changes.")
        return redirect(url_for('view_project', id=id))

    new_title = request.form.get('title')
    new_instruction = request.form.get('instruction')

    bible_path = os.path.join(proj.folder_path, "bible.json")
    bible = utils.load_json(bible_path)

    if bible and 'books' in bible:
        target = next((b for b in bible['books'] if b.get('book_number') == book_num), None)
        if target is None:
            flash(f"Book {book_num} not found.")
        else:
            if new_title:
                target['title'] = new_title
            # An empty string is a valid instruction (clears it); only a
            # missing field leaves the old value.
            if new_instruction is not None:
                target['manual_instruction'] = new_instruction
            with open(bible_path, 'w') as f:
                json.dump(bible, f, indent=2)
            flash(f"Book {book_num} updated.")

    return redirect(url_for('view_project', id=id))
|
|
|
|
@app.route('/project/<int:id>/delete_book/<int:book_num>', methods=['POST'])
@login_required
def delete_book(id, book_num):
    """Remove a planned book from an unlocked project's bible.

    Remaining books are renumbered from 1, and the series flag is
    recomputed from the number of books left.
    """
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    if is_project_locked(id):
        flash("Project is locked. Clone it to make changes.")
        return redirect(url_for('view_project', id=id))

    bible_path = os.path.join(proj.folder_path, "bible.json")
    bible = utils.load_json(bible_path)

    if not (bible and 'books' in bible):
        return redirect(url_for('view_project', id=id))

    remaining = [book for book in bible['books'] if book.get('book_number') != book_num]
    # Renumber so book numbers stay contiguous.
    for position, book in enumerate(remaining, start=1):
        book['book_number'] = position
    bible['books'] = remaining

    # Update Series Status (Revert to standalone if only 1 book remains)
    if 'project_metadata' in bible:
        bible['project_metadata']['is_series'] = (len(remaining) > 1)

    with open(bible_path, 'w') as out:
        json.dump(bible, out, indent=2)
    flash("Book deleted from plan.")

    return redirect(url_for('view_project', id=id))
|
|
|
|
@app.route('/project/<int:id>/import_characters', methods=['POST'])
@login_required
def import_characters(id):
    """Copy characters from another of the user's projects into this one.

    Characters are matched case-insensitively by ``name``; only names not
    already present in the target are appended. Source entries without a
    name are skipped because they cannot be de-duplicated. Both projects
    must belong to the current user and the target must be unlocked.
    """
    target_proj = db.session.get(Project, id)
    # type=int yields None for a missing or non-numeric field, which is then
    # reported as "not found" instead of being passed to the DB lookup.
    source_id = request.form.get('source_project_id', type=int)
    source_proj = db.session.get(Project, source_id) if source_id is not None else None

    if not target_proj or not source_proj:
        return "Project not found", 404
    if target_proj.user_id != current_user.id or source_proj.user_id != current_user.id:
        return "Unauthorized", 403

    if is_project_locked(id):
        flash("Project is locked. Clone it to make changes.")
        return redirect(url_for('view_project', id=id))

    target_bible_path = os.path.join(target_proj.folder_path, "bible.json")
    target_bible = utils.load_json(target_bible_path)
    source_bible = utils.load_json(os.path.join(source_proj.folder_path, "bible.json"))

    if target_bible and source_bible:
        # Make sure the target actually has a list to append to; the key may
        # be absent or null in older bibles.
        target_chars = target_bible.get('characters') or []
        target_bible['characters'] = target_chars

        existing_names = {
            str(c['name']).lower()
            for c in target_chars
            if isinstance(c, dict) and c.get('name')
        }
        added_count = 0

        for char in source_bible.get('characters') or []:
            if not isinstance(char, dict) or not char.get('name'):
                continue
            key = str(char['name']).lower()
            if key not in existing_names:
                target_chars.append(char)
                # Track the new name so a duplicate later in the source list
                # is not imported twice.
                existing_names.add(key)
                added_count += 1

        if added_count > 0:
            with open(target_bible_path, 'w') as f:
                json.dump(target_bible, f, indent=2)
            flash(f"Imported {added_count} characters from {source_proj.name}.")
        else:
            flash("No new characters found to import.")

    return redirect(url_for('view_project', id=id))
|
|
|
|
@app.route('/project/<int:id>/set_persona', methods=['POST'])
@login_required
def set_project_persona(id):
    """Attach a stored author persona to an unlocked project's bible.

    The persona named by the ``persona_name`` form field is looked up in
    ``config.PERSONAS_FILE`` and stored under
    ``project_metadata.author_details``. An unknown name only produces a
    flash message.
    """
    proj = db.session.get(Project, id) or Project.query.get_or_404(id)
    if proj.user_id != current_user.id:
        return "Unauthorized", 403

    if is_project_locked(id):
        flash("Project is locked. Clone it to make changes.")
        return redirect(url_for('view_project', id=id))

    persona_name = request.form.get('persona_name')

    bible_path = os.path.join(proj.folder_path, "bible.json")
    bible = utils.load_json(bible_path)

    if bible:
        personas = {}
        if os.path.exists(config.PERSONAS_FILE):
            try:
                with open(config.PERSONAS_FILE, 'r') as f:
                    personas = json.load(f)
            except (OSError, ValueError):
                # Unreadable or invalid personas file: behave as if there are
                # no personas, which leads to "Persona not found." below.
                pass

        if persona_name in personas:
            # setdefault guards against a bible without a metadata section.
            bible.setdefault('project_metadata', {})['author_details'] = personas[persona_name]
            with open(bible_path, 'w') as f:
                json.dump(bible, f, indent=2)
            flash(f"Project voice updated to persona: {persona_name}")
        else:
            flash("Persona not found.")

    return redirect(url_for('view_project', id=id))
|
|
|
|
@app.route('/project/<int:run_id>/regenerate_artifacts', methods=['POST'])
@login_required
def regenerate_artifacts(run_id):
    """Queue regeneration of a run's cover and output files.

    A run whose status is 'running' is left alone. Otherwise the run is put
    back to 'queued' and ``regenerate_artifacts_task`` is invoked with the
    optional ``feedback`` form field. Always ends on the run's detail page.
    """
    run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    if run.status != 'running':
        user_feedback = request.form.get('feedback')

        # Reset state immediately so UI polls correctly
        run.status = 'queued'
        db.session.commit()

        regenerate_artifacts_task(run_id, run.project.folder_path, feedback=user_feedback)
        flash("Regenerating cover and files with updated metadata...")
    else:
        flash("Run is already active. Please wait for it to finish.")

    return redirect(url_for('view_run', id=run_id))
|
|
|
|
@app.route('/run/<int:id>/stop', methods=['POST'])
@login_required
def stop_run(id):
    """Cancel a queued or running run owned by the current user.

    The run is marked 'cancelled' in the database and, when its run folder
    exists, a ``.stop`` file is written there as the stop signal. Runs in
    any other state are left unchanged.
    """
    run = db.session.get(Run, id) or Run.query.get_or_404(id)
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    if run.status not in ('queued', 'running'):
        return redirect(url_for('view_project', id=run.project_id))

    run.status = 'cancelled'
    run.end_time = datetime.utcnow()
    db.session.commit()

    # Signal the backend process to stop by creating a .stop file
    run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
    if os.path.exists(run_dir):
        stop_marker = os.path.join(run_dir, ".stop")
        with open(stop_marker, 'w') as marker:
            marker.write("stop")

    flash(f"Run {id} marked as cancelled.")
    return redirect(url_for('view_project', id=run.project_id))
|
|
|
|
@app.route('/run/<int:id>/restart', methods=['POST'])
@login_required
def restart_run(id):
    """Start a new run for the same project as an existing run.

    Form fields: ``mode`` ('resume' by default), ``feedback``,
    ``keep_cover`` and ``force_regenerate``. Copying from earlier output is
    allowed only in resume mode without forced regeneration and without
    feedback; when feedback is given the original run is passed along as
    the source run.
    """
    run = db.session.get(Run, id) or Run.query.get_or_404(id)
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    # Create a new run
    new_run = Run(project_id=run.project_id, status="queued")
    db.session.add(new_run)
    db.session.commit()

    # Check mode: 'resume' (default) vs 'restart'
    mode = request.form.get('mode', 'resume')
    feedback = request.form.get('feedback')
    keep_cover = 'keep_cover' in request.form
    force_regen = 'force_regenerate' in request.form

    # Feedback forces regeneration so the requested changes are applied.
    allow_copy = (mode == 'resume') and not force_regen and not feedback

    project_dir = run.project.folder_path
    generate_book_task(
        new_run.id,
        project_dir,
        os.path.join(project_dir, "bible.json"),
        allow_copy=allow_copy,
        feedback=feedback,
        source_run_id=id if feedback else None,
        keep_cover=keep_cover,
    )

    suffix = " with modifications." if feedback else "."
    flash(f"Started new Run #{new_run.id}" + suffix)
    return redirect(url_for('view_project', id=run.project_id))
|
|
|
|
@app.route('/run/<int:id>')
@login_required
def view_run(id):
    """Render the detail page for one run.

    Collects the run's log text (database entries first, log file as a
    fallback), the per-book artifacts/cover/blurb found in the run folder,
    the project bible and the tracking JSON files of the last book folder.
    """
    run = db.session.get(Run, id)
    if not run:
        return "Run not found", 404
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    # Fetch logs for initial render
    log_content = ""
    entries = LogEntry.query.filter_by(run_id=id).order_by(LogEntry.timestamp).all()
    if entries:
        log_content = "\n".join(
            f"[{entry.timestamp.strftime('%H:%M:%S')}] {entry.phase:<15} | {entry.message}"
            for entry in entries
        )
    elif run.log_file and os.path.exists(run.log_file):
        with open(run.log_file, 'r') as log_fh:
            log_content = log_fh.read()

    # Fetch Artifacts for Display
    run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")

    # Detect Books in Run (Series Support)
    books_data = []
    if os.path.exists(run_dir):
        for folder in utils.get_sorted_book_folders(run_dir):
            folder_path = os.path.join(run_dir, folder)

            # Artifacts: downloadable output files, paths relative to run_dir.
            found_artifacts = [
                {'name': fname, 'path': os.path.join(folder, fname).replace("\\", "/")}
                for fname in os.listdir(folder_path)
                if fname.lower().endswith(('.epub', '.docx'))
            ]

            # Cover
            cover_rel = None
            if os.path.exists(os.path.join(folder_path, "cover.png")):
                cover_rel = os.path.join(folder, "cover.png").replace("\\", "/")

            # Blurb
            blurb_text = ''
            blurb_file = os.path.join(folder_path, "blurb.txt")
            if os.path.exists(blurb_file):
                with open(blurb_file, 'r', encoding='utf-8', errors='ignore') as blurb_fh:
                    blurb_text = blurb_fh.read()

            books_data.append({
                'folder': folder,
                'artifacts': found_artifacts,
                'cover': cover_rel,
                'blurb': blurb_text,
            })

    # Load Bible Data for Dropdown
    bible_data = utils.load_json(os.path.join(run.project.folder_path, "bible.json"))

    # Load Tracking Data for Run Details
    tracking = {"events": [], "characters": {}, "content_warnings": []}
    # Tracking comes from the LAST book found (most up-to-date); without any
    # book folders the run folder itself is checked.
    book_dir = os.path.join(run_dir, books_data[-1]['folder']) if books_data else run_dir
    if os.path.exists(book_dir):
        tracking_sources = (
            ('events', "tracking_events.json", []),
            ('characters', "tracking_characters.json", {}),
            ('content_warnings', "tracking_warnings.json", []),
        )
        for key, filename, empty in tracking_sources:
            source_path = os.path.join(book_dir, filename)
            # Default to an empty structure if load_json yields a falsy value.
            if os.path.exists(source_path):
                tracking[key] = utils.load_json(source_path) or empty

    # Use dedicated run details template
    return render_template('run_details.html', run=run, log_content=log_content, books=books_data, bible=bible_data, tracking=tracking)
|
|
|
|
@app.route('/run/<int:id>/status')
@login_required
def run_status(id):
    """Return status, cost, progress and log text of a run as a JSON dict.

    Log text comes from the run's ``LogEntry`` rows when any exist. Otherwise
    it falls back to ``run.log_file``, or, while the run is queued/running,
    to ``system_log_<run id>.txt`` inside the project folder.

    Returns:
        A dict with ``status``, ``log``, ``cost`` and ``percent``; when
        database log rows exist, a ``progress`` entry describing the latest
        row is added. Responds 403 if the run belongs to another user and
        404 if the run does not exist.
    """
    run = db.session.get(Run, id) or Run.query.get_or_404(id)

    # Same ownership rule as the download/read/edit endpoints in this file:
    # logs and cost of a run are only visible to the project's owner.
    if run.project.user_id != current_user.id:
        return {"error": "Unauthorized"}, 403

    log_content = ""
    last_log = None

    # 1. Try Database Logs (Fastest & Best)
    logs = LogEntry.query.filter_by(run_id=id).order_by(LogEntry.timestamp).all()
    if logs:
        log_content = "\n".join(
            f"[{entry.timestamp.strftime('%H:%M:%S')}] {entry.phase:<15} | {entry.message}"
            for entry in logs
        )
        last_log = logs[-1]

    # 2. Fallback to File (For old runs or if DB logging fails)
    if not log_content:
        log_path = None
        if run.log_file and os.path.exists(run.log_file):
            log_path = run.log_file
        elif run.status in ['queued', 'running']:
            temp_log = os.path.join(run.project.folder_path, f"system_log_{run.id}.txt")
            if os.path.exists(temp_log):
                log_path = temp_log

        if log_path:
            # Explicit encoding with errors ignored, so a stray byte in a log
            # file cannot turn a status poll into a server error.
            with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
                log_content = f.read()

    response = {"status": run.status, "log": log_content, "cost": run.cost, "percent": run.progress}

    if last_log:
        response["progress"] = {
            "phase": last_log.phase,
            "message": last_log.message,
            "timestamp": last_log.timestamp.timestamp()
        }
    return response
|
|
|
|
@app.route('/project/<int:run_id>/download')
@login_required
def download_artifact(run_id):
    """Send a file from a run's output directory as an attachment.

    The file is named by the ``file`` query parameter. If it is not found
    directly under the run directory, each folder returned by
    ``utils.get_sorted_book_folders`` is tried and the first match wins.

    Responds 403 for runs owned by another user and 400 for a missing or
    unsafe filename.
    """
    requested = request.args.get('file')
    run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)

    if run.project.user_id != current_user.id:
        return "Unauthorized", 403
    if not requested:
        return "Missing filename", 400

    # Path-traversal guard: reject absolute paths, anything still containing
    # ".." after normalisation, and drive/scheme style names with ":".
    looks_unsafe = (
        os.path.isabs(requested)
        or ".." in os.path.normpath(requested)
        or ":" in requested
    )
    if looks_unsafe:
        return "Invalid filename", 400

    run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")

    # Series support: a file missing from the run root may live in a book folder.
    found_in_root = os.path.exists(os.path.join(run_dir, requested))
    if not found_in_root and os.path.exists(run_dir):
        candidates = (
            os.path.join(folder, requested)
            for folder in utils.get_sorted_book_folders(run_dir)
        )
        match = next(
            (rel for rel in candidates if os.path.exists(os.path.join(run_dir, rel))),
            None,
        )
        if match is not None:
            requested = match

    return send_from_directory(run_dir, requested, as_attachment=True)
|
|
|
|
@app.route('/project/<int:run_id>/read/<string:book_folder>')
@login_required
def read_book(run_id, book_folder):
    """Render the manuscript of one book folder for in-browser reading.

    Loads ``manuscript.json`` from the book folder of the run, sorts the
    chapters with ``utils.chapter_sort_key`` and adds an ``html_content``
    entry (Markdown rendered to HTML) to every chapter before rendering
    ``read_book.html``.

    Responds 403 for runs owned by another user and 400 for an unsafe folder
    name. Redirects to the run page with a flash message when the manuscript
    is missing or unreadable.
    """
    run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    # Security Check: Prevent path traversal in book_folder
    if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder:
        return "Invalid book folder", 400

    run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
    book_path = os.path.join(run_dir, book_folder)
    ms_path = os.path.join(book_path, "manuscript.json")

    if not os.path.exists(ms_path):
        flash("Manuscript not found.")
        return redirect(url_for('view_run', id=run_id))

    manuscript = utils.load_json(ms_path)

    # load_json can return None for an unreadable file; anything that is not
    # a chapter list cannot be sorted or rendered, so bail out gracefully.
    if not isinstance(manuscript, list):
        flash("Manuscript could not be read.")
        return redirect(url_for('view_run', id=run_id))

    # Sort by chapter number (Handle Prologue/Epilogue)
    manuscript.sort(key=utils.chapter_sort_key)

    # Render Markdown for display; a missing or null content becomes empty HTML.
    for ch in manuscript:
        ch['html_content'] = markdown.markdown(ch.get('content') or '')

    return render_template('read_book.html', run=run, book_folder=book_folder, manuscript=manuscript)
|
|
|
|
@app.route('/project/<int:run_id>/save_chapter', methods=['POST'])
@login_required
def save_chapter(run_id):
    """Persist a manual edit of one chapter and rebuild the book's artifacts.

    Form fields:
        book_folder: name of the book folder inside the run directory.
        chapter_num: chapter identifier; converted to ``int`` when possible,
            otherwise compared as the raw string (e.g. "Epilogue").
        content: the new chapter text.

    Returns:
        ``("Saved", 200)`` on success; 400 for bad input, 403 for runs owned
        by another user, 404 when no chapter matches, 409 while the run is
        running and 500 when the manuscript is missing or unreadable.
    """
    run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    if run.status == 'running':
        return "Cannot edit chapter while run is active.", 409

    book_folder = request.form.get('book_folder')
    chap_num_raw = request.form.get('chapter_num')
    new_content = request.form.get('content')

    # Security Check
    if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder:
        return "Invalid book folder", 400

    # Without these the manuscript would be rewritten with a null chapter body.
    if chap_num_raw is None or new_content is None:
        return "Missing chapter number or content", 400

    try:
        chap_num = int(chap_num_raw)
    except ValueError:
        # Non-numeric chapter ids are matched as strings.
        chap_num = chap_num_raw

    run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
    book_path = os.path.join(run_dir, book_folder)
    ms_path = os.path.join(book_path, "manuscript.json")

    ms = utils.load_json(ms_path) if os.path.exists(ms_path) else None
    if not isinstance(ms, list):
        return "Error", 500

    chapter = next(
        (ch for ch in ms if isinstance(ch, dict) and ch.get('num') == chap_num),
        None,
    )
    if chapter is None:
        # Reporting "Saved" here would hide that nothing was written.
        return "Chapter not found", 404

    chapter['content'] = new_content
    with open(ms_path, 'w', encoding='utf-8') as f:
        json.dump(ms, f, indent=2)

    # Regenerate Artifacts (EPUB/DOCX) to reflect manual edits
    bp_path = os.path.join(book_path, "final_blueprint.json")
    if os.path.exists(bp_path):
        bp = utils.load_json(bp_path)
        if bp:
            export.compile_files(bp, ms, book_path)

    return "Saved", 200
|
|
|
|
@app.route('/project/<int:run_id>/check_consistency/<string:book_folder>')
@login_required
def check_consistency(run_id, book_folder):
    """Run the story consistency analysis for one book and render its report.

    Loads ``final_blueprint.json`` and ``manuscript.json`` from the book
    folder, passes them to ``story.analyze_consistency`` and renders
    ``consistency_report.html`` with the result.

    Responds 403 for runs owned by another user, 400 for an unsafe folder
    name and 404 when either data file is missing or unreadable.
    """
    run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)

    # Same ownership rule as the other per-run endpoints in this file.
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    # Security Check
    if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder:
        return "Invalid book folder", 400

    run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
    book_path = os.path.join(run_dir, book_folder)

    bp = utils.load_json(os.path.join(book_path, "final_blueprint.json"))
    ms = utils.load_json(os.path.join(book_path, "manuscript.json"))

    if not bp or not ms:
        return "Data files missing or corrupt.", 404

    # Model initialisation stays best-effort, but the failure is now logged
    # and interpreter-exit exceptions are no longer swallowed.
    try:
        ai.init_models()
    except Exception as exc:
        app.logger.warning("AI model initialisation failed: %s", exc)

    report = story.analyze_consistency(bp, ms, book_path)
    return render_template('consistency_report.html', report=report, run=run, book_folder=book_folder)
|
|
|
|
@app.route('/project/<int:run_id>/sync_book/<string:book_folder>', methods=['POST'])
@login_required
def sync_book_metadata(run_id, book_folder):
    """Re-derive a book's metadata from its (possibly hand-edited) manuscript.

    Steps, in order:
      1. ``story.harvest_metadata`` updates the blueprint from the manuscript.
      2. Characters in the blueprint that are missing from
         ``tracking_characters.json`` are added with default fields.
      3. ``story.update_persona_sample`` is called for the book.
      4. The updated blueprint is written back to ``final_blueprint.json``.

    Always redirects back to the reader page with a flash message, except for
    the 403 (another user's run) and 400 (unsafe folder name) responses.
    """
    run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
    if run.project.user_id != current_user.id:
        return "Unauthorized", 403

    # Security Check -- done before book_folder is used for anything else.
    if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder:
        return "Invalid book folder", 400

    reader_url = url_for('read_book', run_id=run_id, book_folder=book_folder)

    if run.status == 'running':
        flash("Cannot sync metadata while run is active.")
        return redirect(reader_url)

    run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
    book_path = os.path.join(run_dir, book_folder)

    ms_path = os.path.join(book_path, "manuscript.json")
    bp_path = os.path.join(book_path, "final_blueprint.json")

    if not (os.path.exists(ms_path) and os.path.exists(bp_path)):
        flash("Files not found.")
        return redirect(reader_url)

    ms = utils.load_json(ms_path)
    bp = utils.load_json(bp_path)

    if not ms or not bp:
        flash("Data files corrupt.")
        return redirect(reader_url)

    # Best-effort initialisation: log the failure instead of hiding it.
    try:
        ai.init_models()
    except Exception as exc:
        app.logger.warning("AI model initialisation failed: %s", exc)

    # 1. Harvest new characters/info from the EDITED text (Updates BP)
    bp = story.harvest_metadata(bp, book_path, ms)

    # 2. Sync Tracking (Ensure new characters exist in tracking file)
    tracking_path = os.path.join(book_path, "tracking_characters.json")
    if os.path.exists(tracking_path):
        tracking_chars = utils.load_json(tracking_path) or {}
        updated_tracking = False
        for c in bp.get('characters', []):
            if c.get('name') and c['name'] not in tracking_chars:
                tracking_chars[c['name']] = {
                    "descriptors": [c.get('description', '')],
                    "likes_dislikes": [],
                    "last_worn": "Unknown",
                }
                updated_tracking = True
        # Only rewrite the tracking file when something was actually added.
        if updated_tracking:
            with open(tracking_path, 'w', encoding='utf-8') as f:
                json.dump(tracking_chars, f, indent=2)

    # 3. Update Persona (Style might have changed during manual edits)
    story.update_persona_sample(bp, book_path)

    with open(bp_path, 'w', encoding='utf-8') as f:
        json.dump(bp, f, indent=2)
    flash("Metadata synced. Future generations will respect your edits.")

    return redirect(reader_url)
|
|
|
|
@app.route('/project/<int:run_id>/rewrite_chapter', methods=['POST'])
@login_required
def rewrite_chapter(run_id):
    """Queue a background rewrite of one chapter.

    Expects a JSON object with ``book_folder``, ``chapter_num`` and
    ``instruction``. ``chapter_num`` is converted to ``int`` when possible and
    otherwise passed through unchanged (e.g. "Epilogue").

    Returns:
        ``({"status": "queued", "task_id": ...}, 202)`` on success, or an
        ``{"error": ...}`` dict with 400 (bad input), 403 (another user's
        run) or 409 (run is currently running).
    """
    run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
    if run.project.user_id != current_user.id:
        return {"error": "Unauthorized"}, 403

    if run.status == 'running':
        return {"error": "Cannot rewrite while run is active."}, 409

    # silent=True yields None instead of aborting on a bad body; anything
    # that is not a JSON object is treated as "no parameters".
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    book_folder = data.get('book_folder')
    chap_num = data.get('chapter_num')
    instruction = data.get('instruction')

    if not book_folder or chap_num is None or not instruction:
        return {"error": "Missing parameters"}, 400

    # Security Check (a non-string folder name can never be valid)
    if (
        not isinstance(book_folder, str)
        or "/" in book_folder
        or "\\" in book_folder
        or ".." in book_folder
    ):
        return {"error": "Invalid book folder"}, 400

    # Try to convert to int, but allow strings (e.g. "Epilogue")
    try:
        chap_num = int(chap_num)
    except (TypeError, ValueError):
        pass

    # Start background task
    task = rewrite_chapter_task(run.id, run.project.folder_path, book_folder, chap_num, instruction)

    # Store task ID in session to poll for status
    session['rewrite_task_id'] = task.id

    return {"status": "queued", "task_id": task.id}, 202
|
|
|
|
@app.route('/task_status/<string:task_id>')
@login_required
def get_task_status(task_id):
    """Report whether a background task has produced a result yet.

    Returns:
        ``{"status": "running"}`` while no result is stored for the task,
        otherwise ``{"status": "completed", "success": <task result>}``.
    """
    # Huey.result() has no ``peek`` keyword; ``preserve=True`` is the option
    # that reads a result without removing it from the result store, so
    # repeated polls keep seeing the finished task.
    task_result = huey.result(task_id, preserve=True)

    if task_result is None:
        return {"status": "running"}
    return {"status": "completed", "success": task_result}
|
|
|
|
@app.route('/logout')
def logout():
    """Log the current user out and redirect to the login page.

    Also drops the per-login markers this module stores in the session. If
    ``original_admin_id`` (set while impersonating) survived a logout, the
    next account to log in with the same session could call
    ``/admin/stop_impersonate`` and be logged in as that admin.
    """
    logout_user()
    session.pop('original_admin_id', None)
    session.pop('rewrite_task_id', None)
    return redirect(url_for('login'))
|
|
|
|
@app.route('/debug/routes')
@login_required
@admin_required
def debug_routes():
    """List every registered URL rule as preformatted text (admin only).

    Each line shows the endpoint name, the allowed methods and the rule, with
    angle brackets swapped for square brackets so a browser displays them.
    """
    def _describe(rule):
        allowed = ','.join(rule.methods)
        # Use brackets so they are visible in browser text
        visible_rule = str(rule).replace('<', '[').replace('>', ']')
        return f"{rule.endpoint:50s} {allowed:20s} {visible_rule}"

    listing = "\n".join(_describe(rule) for rule in app.url_map.iter_rules())
    return "<pre>" + listing + "</pre>"
|
|
|
|
@app.route('/system/optimize_models', methods=['POST'])
@login_required
@admin_required
def optimize_models():
    """Force a re-initialisation of the AI models and refresh style guidelines.

    Any exception is reported to the user through a flash message. The
    response redirects to the referring page, or to the index if there is
    no referrer.
    """
    try:
        # force=True asks the AI module to re-initialise even if already set up.
        ai.init_models(force=True)

        logic_model = ai.model_logic
        if logic_model:
            story.refresh_style_guidelines(logic_model)

        flash("AI Models refreshed and Style Guidelines updated.")
    except Exception as exc:
        flash(f"Error refreshing models: {exc}")

    destination = request.referrer or url_for('index')
    return redirect(destination)
|
|
|
|
@app.route('/system/status')
@login_required
def system_status():
    """Render the system status page from the cached model information.

    Reads ``model_cache.json`` from ``config.DATA_DIR``. A missing,
    unreadable or malformed cache results in empty data being passed to the
    ``system_status.html`` template.
    """
    models_info = {}
    cache_data = {}

    cache_path = os.path.join(config.DATA_DIR, "model_cache.json")
    if os.path.exists(cache_path):
        try:
            with open(cache_path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers invalid JSON and undecodable bytes.
            app.logger.warning("Could not read model cache %s: %s", cache_path, exc)
        else:
            # Only a JSON object can be queried for a 'models' key.
            if isinstance(loaded, dict):
                cache_data = loaded
                models_info = cache_data.get('models', {})

    return render_template('system_status.html', models=models_info, cache=cache_data, datetime=datetime)
|
|
|
|
@app.route('/personas')
@login_required
def list_personas():
    """Render the list of author personas stored in ``config.PERSONAS_FILE``.

    A missing or unreadable file is shown as an empty list.
    """
    personas = {}
    if os.path.exists(config.PERSONAS_FILE):
        try:
            with open(config.PERSONAS_FILE, 'r', encoding='utf-8') as f:
                personas = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers invalid JSON; keep the page usable but say why.
            app.logger.warning("Could not read personas file: %s", exc)
    return render_template('personas.html', personas=personas)
|
|
|
|
@app.route('/persona/new')
@login_required
def new_persona():
    """Show the persona editor with an empty persona and an empty name."""
    blank_form = {"persona": {}, "name": ""}
    return render_template('persona_edit.html', **blank_form)
|
|
|
|
@app.route('/persona/<string:name>')
@login_required
def edit_persona(name):
    """Show the persona editor pre-filled with the persona called ``name``.

    Redirects to the persona list with a flash message when the persona does
    not exist or the personas file cannot be read.
    """
    personas = {}
    if os.path.exists(config.PERSONAS_FILE):
        try:
            with open(config.PERSONAS_FILE, 'r', encoding='utf-8') as f:
                personas = json.load(f)
        except (OSError, ValueError) as exc:
            app.logger.warning("Could not read personas file: %s", exc)

    # A file that does not hold a JSON object has no personas to look up.
    if not isinstance(personas, dict):
        personas = {}

    persona = personas.get(name)
    if not persona:
        flash(f"Persona '{name}' not found.")
        return redirect(url_for('list_personas'))

    return render_template('persona_edit.html', persona=persona, name=name)
|
|
|
|
@app.route('/persona/save', methods=['POST'])
@login_required
def save_persona():
    """Create or update a persona from the submitted form.

    The persona is stored under the form's ``name``. When ``old_name`` is
    given and differs, the entry under the old name is removed, which
    implements a rename. Redirects to the persona list in every case.
    """
    old_name = request.form.get('old_name')
    # Surrounding whitespace is dropped so a blank-looking name is rejected.
    name = (request.form.get('name') or '').strip()

    if not name:
        flash("Persona name is required.")
        return redirect(url_for('list_personas'))

    personas = {}
    if os.path.exists(config.PERSONAS_FILE):
        try:
            with open(config.PERSONAS_FILE, 'r', encoding='utf-8') as f:
                personas = json.load(f)
        except (OSError, ValueError) as exc:
            app.logger.warning("Could not read personas file: %s", exc)

    # Anything other than a JSON object cannot hold named personas.
    if not isinstance(personas, dict):
        personas = {}

    if old_name and old_name != name and old_name in personas:
        del personas[old_name]

    text_fields = (
        "bio", "age", "gender", "race", "nationality", "language",
        "sample_text", "voice_keywords", "style_inspirations",
    )
    persona = {"name": name}
    persona.update({field: request.form.get(field) for field in text_fields})

    personas[name] = persona

    with open(config.PERSONAS_FILE, 'w', encoding='utf-8') as f:
        json.dump(personas, f, indent=2)

    flash(f"Persona '{name}' saved.")
    return redirect(url_for('list_personas'))
|
|
|
|
@app.route('/persona/delete/<string:name>', methods=['POST'])
@login_required
def delete_persona(name):
    """Remove the persona called ``name`` from ``config.PERSONAS_FILE``.

    Does nothing when the persona does not exist or the file cannot be read.
    Always redirects to the persona list.
    """
    personas = {}
    if os.path.exists(config.PERSONAS_FILE):
        try:
            with open(config.PERSONAS_FILE, 'r', encoding='utf-8') as f:
                personas = json.load(f)
        except (OSError, ValueError) as exc:
            app.logger.warning("Could not read personas file: %s", exc)

    # Only a JSON object maps persona names to entries.
    if isinstance(personas, dict) and name in personas:
        del personas[name]
        with open(config.PERSONAS_FILE, 'w', encoding='utf-8') as f:
            json.dump(personas, f, indent=2)
        flash(f"Persona '{name}' deleted.")

    return redirect(url_for('list_personas'))
|
|
|
|
@app.route('/persona/analyze', methods=['POST'])
@login_required
def analyze_persona():
    """Ask the logic model to draft a persona profile from the posted fields.

    Expects a JSON object that may contain ``name``, ``age``, ``gender``,
    ``nationality`` and ``sample_text`` (only the first 3000 characters of
    the sample are sent to the model).

    Returns:
        The JSON parsed from the model's reply, or ``({"error": ...}, 500)``
        when the models are unavailable or the call/parsing fails.
    """
    # Best-effort initialisation; the check below reports the failure to the caller.
    try:
        ai.init_models()
    except Exception as exc:
        app.logger.warning("AI model initialisation failed: %s", exc)

    if not ai.model_logic:
        return {"error": "AI models not initialized."}, 500

    # Tolerate a missing, invalid or non-object body instead of crashing.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    # A null sample_text must not break the slice below.
    sample = data.get('sample_text') or ''
    if not isinstance(sample, str):
        sample = str(sample)

    prompt = f"""
    Act as a Literary Analyst. Create or analyze an Author Persona profile.

    INPUT DATA:
    Name: {data.get('name')}
    Age: {data.get('age')} | Gender: {data.get('gender')} | Nationality: {data.get('nationality')}
    Sample Text: {sample[:3000]}

    TASK:
    1. 'bio': Write a 2-3 sentence description of the writing style. If sample is provided, analyze it. If not, invent a style that fits the demographics/name.
    2. 'voice_keywords': Comma-separated list of 3-5 adjectives describing the voice (e.g. Gritty, Whimsical, Sarcastic).
    3. 'style_inspirations': Comma-separated list of 1-3 famous authors or genres that this style resembles.

    RETURN JSON: {{ "bio": "...", "voice_keywords": "...", "style_inspirations": "..." }}
    """
    try:
        response = ai.model_logic.generate_content(prompt)
        return json.loads(utils.clean_json(response.text))
    except Exception as e:
        return {"error": str(e)}, 500
|
|
|
|
# --- ADMIN ROUTES ---
|
|
|
|
@app.route('/admin')
@login_required
@admin_required
def admin_dashboard():
    """Render the admin dashboard listing every user and every project."""
    return render_template(
        'admin_dashboard.html',
        users=User.query.all(),
        projects=Project.query.all(),
    )
|
|
|
|
@app.route('/admin/user/<int:user_id>/delete', methods=['POST'])
@login_required
@admin_required
def admin_delete_user(user_id):
    """Delete a user, their data folder and their projects (admin only).

    An admin cannot delete their own account. Removing the data folder is
    best-effort: a filesystem error is logged and the database rows are
    still deleted. Always redirects to the admin dashboard.
    """
    if user_id == current_user.id:
        flash("Cannot delete yourself.")
        return redirect(url_for('admin_dashboard'))

    user = db.session.get(User, user_id)
    if user:
        # Read the name now, so the message does not touch a deleted object.
        username = user.username

        # Delete user data folder
        user_path = os.path.join(config.DATA_DIR, "users", str(user.id))
        if os.path.exists(user_path):
            try:
                shutil.rmtree(user_path)
            except OSError as exc:
                app.logger.warning("Could not remove user folder %s: %s", user_path, exc)

        # Delete the user's project rows explicitly before the user row.
        for project in Project.query.filter_by(user_id=user.id).all():
            db.session.delete(project)

        db.session.delete(user)
        db.session.commit()
        flash(f"User {username} deleted.")
    return redirect(url_for('admin_dashboard'))
|
|
|
|
@app.route('/admin/project/<int:project_id>/delete', methods=['POST'])
@login_required
@admin_required
def admin_delete_project(project_id):
    """Delete a project's folder and database row (admin only).

    Removing the folder is best-effort: a filesystem error is logged and the
    row is still deleted. Always redirects to the admin dashboard.
    """
    proj = db.session.get(Project, project_id)
    if proj:
        # Read the name now, so the message does not touch a deleted object.
        project_name = proj.name

        if os.path.exists(proj.folder_path):
            try:
                shutil.rmtree(proj.folder_path)
            except OSError as exc:
                app.logger.warning("Could not remove project folder %s: %s", proj.folder_path, exc)

        db.session.delete(proj)
        db.session.commit()
        flash(f"Project {project_name} deleted.")
    return redirect(url_for('admin_dashboard'))
|
|
|
|
@app.route('/admin/reset', methods=['POST'])
@login_required
@admin_required
def admin_factory_reset():
    """Wipe all projects and all users except the current admin (admin only).

    Also removes the personas file and recreates the defaults through
    ``utils.create_default_personas``. Filesystem cleanup is best-effort:
    failures are logged and the reset continues. Redirects to the admin
    dashboard.
    """
    def _remove_tree(path):
        # Best-effort directory removal that records why it failed.
        if not os.path.exists(path):
            return
        try:
            shutil.rmtree(path)
        except OSError as exc:
            app.logger.warning("Factory reset could not remove %s: %s", path, exc)

    # 1. Delete ALL Projects (Files & DB)
    for project in Project.query.all():
        _remove_tree(project.folder_path)
        db.session.delete(project)

    # 2. Delete ALL Users except Current Admin
    for user in User.query.filter(User.id != current_user.id).all():
        _remove_tree(os.path.join(config.DATA_DIR, "users", str(user.id)))
        db.session.delete(user)

    # 3. Reset Personas to Default
    if os.path.exists(config.PERSONAS_FILE):
        try:
            os.remove(config.PERSONAS_FILE)
        except OSError as exc:
            app.logger.warning("Factory reset could not remove personas file: %s", exc)
    utils.create_default_personas()

    db.session.commit()
    flash("Factory Reset Complete. All other users and projects have been wiped.")
    return redirect(url_for('admin_dashboard'))
|
|
|
|
@app.route('/admin/spend')
@login_required
@admin_required
def admin_spend_report():
    """Render per-user run counts and total cost for a recent period.

    Query parameters:
        days: size of the look-back window in days (default 30). Zero or a
            negative value means "all time".
    """
    days = request.args.get('days', 30, type=int)

    # "All time" is the default; it is also the fallback when the requested
    # window is so large that the date arithmetic overflows.
    start_date = datetime.min
    if days > 0:
        try:
            start_date = datetime.utcnow() - timedelta(days=days)
        except OverflowError:
            start_date = datetime.min

    # Aggregate spend per user
    results = db.session.query(
        User.username,
        func.count(Run.id),
        func.sum(Run.cost)
    ).join(Project, Project.user_id == User.id)\
     .join(Run, Run.project_id == Project.id)\
     .filter(Run.start_time >= start_date)\
     .group_by(User.id, User.username).all()

    report = []
    total_period_spend = 0.0
    for username, run_count, cost_sum in results:
        # SUM() is NULL when every cost in the group is NULL.
        cost = cost_sum if cost_sum else 0.0
        report.append({"username": username, "runs": run_count, "cost": cost})
        total_period_spend += cost

    return render_template('admin_spend.html', report=report, days=days, total=total_period_spend)
|
|
|
|
@app.route('/admin/style', methods=['GET', 'POST'])
@login_required
@admin_required
def admin_style_guidelines():
    """View (GET) or replace (POST) the global style guidelines (admin only).

    On POST the ``ai_isms`` and ``filter_words`` form fields are split into
    one entry per non-blank line and written to ``style_guidelines.json`` in
    ``config.DATA_DIR``; the response redirects back to this page.
    """
    if request.method != 'POST':
        # Load current (creates defaults if missing)
        current = story.get_style_guidelines()
        return render_template('admin_style.html', data=current)

    def _non_blank_lines(field):
        # One trimmed entry per line of the submitted text; blanks dropped.
        raw = request.form.get(field, '')
        trimmed = (line.strip() for line in raw.split('\n'))
        return [line for line in trimmed if line]

    guidelines = {
        "ai_isms": _non_blank_lines('ai_isms'),
        "filter_words": _non_blank_lines('filter_words'),
    }

    target = os.path.join(config.DATA_DIR, "style_guidelines.json")
    with open(target, 'w') as out:
        json.dump(guidelines, out, indent=2)

    flash("Style Guidelines updated successfully.")
    return redirect(url_for('admin_style_guidelines'))
|
|
|
|
@app.route('/admin/impersonate/<int:user_id>')
@login_required
@admin_required
def impersonate_user(user_id):
    """Log the current admin in as another user (admin only).

    The admin's own id is kept in the session under ``original_admin_id`` so
    the session can be restored later. Unknown users and self-impersonation
    redirect back to the admin dashboard.
    """
    dashboard = url_for('admin_dashboard')

    if user_id == current_user.id:
        flash("Cannot impersonate yourself.")
        return redirect(dashboard)

    target = db.session.get(User, user_id)
    if not target:
        return redirect(dashboard)

    # Remember who we really are before switching the logged-in user.
    session['original_admin_id'] = current_user.id
    login_user(target)
    flash(f"Now viewing as {target.username}")
    return redirect(url_for('index'))
|
|
|
|
@app.route('/admin/stop_impersonate')
@login_required
def stop_impersonate():
    """End an impersonation and log back in as the stored admin.

    Uses the ``original_admin_id`` session entry. If it is absent, or the
    user it refers to no longer exists, the request redirects to the index.
    """
    stored_id = session.get('original_admin_id')
    if not stored_id:
        return redirect(url_for('index'))

    original_admin = db.session.get(User, stored_id)
    if not original_admin:
        return redirect(url_for('index'))

    login_user(original_admin)
    session.pop('original_admin_id', None)
    flash("Restored admin session.")
    return redirect(url_for('admin_dashboard'))
|
|
|
|
if __name__ == '__main__':
    # Script entry point: serve the Flask app and, when appropriate, run a
    # Huey consumer inside this process (meant for testing on a Pi).
    # For production, run `huey_consumer.py web_tasks.huey` in terminal
    import threading

    def run_huey():
        """Run a single-worker, thread-based Huey consumer (blocking)."""
        from huey.consumer import Consumer

        class ThreadedConsumer(Consumer):
            """Consumer that skips signal-handler setup, which fails in threads."""

            def _set_signal_handlers(self):
                return None

        ThreadedConsumer(huey, workers=1, worker_type='thread').run()

    # Configuration
    debug_mode = os.environ.get("FLASK_DEBUG", "False").lower() == "true"
    in_reloader_child = os.environ.get("WERKZEUG_RUN_MAIN") == "true"

    # Run worker if: 1. In reloader child process OR 2. Reloader is disabled (debug=False)
    if in_reloader_child or not debug_mode:
        worker_thread = threading.Thread(target=run_huey, daemon=True)
        worker_thread.start()

    app.run(host='0.0.0.0', port=5000, debug=debug_mode)