v2.0.0: Modularize project into single-responsibility packages

Replaced monolithic modules/ package with a clean architecture:

- core/       config.py, utils.py
- ai/         models.py (ResilientModel), setup.py (init_models)
- story/      planner.py, writer.py, editor.py, style_persona.py, bible_tracker.py
- marketing/  cover.py, blurb.py, fonts.py, assets.py
- export/     exporter.py
- web/        app.py (Flask factory), db.py, helpers.py, tasks.py, routes/{auth,project,run,persona,admin}.py
- cli/        engine.py (run_generation), wizard.py (BookWizard)

Flask routes split into 5 Blueprints; all templates updated with blueprint-
prefixed url_for() calls. Dockerfile and docker-compose updated to use
web.app entry point and new package paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-20 22:20:53 -05:00
parent edabc4d4fa
commit f7099cc3e4
52 changed files with 3984 additions and 3798 deletions

0
web/__init__.py Normal file
View File

106
web/app.py Normal file
View File

@@ -0,0 +1,106 @@
import os
from datetime import datetime
from sqlalchemy import text
from flask import Flask
from flask_login import LoginManager
from werkzeug.security import generate_password_hash
from web.db import db, User, Run
from web.tasks import huey
from core import config
# Calculate paths relative to this file (web/app.py -> project root is two levels up)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
TEMPLATE_DIR = os.path.join(BASE_DIR, 'templates')
app = Flask(__name__, template_folder=TEMPLATE_DIR)
app.url_map.strict_slashes = False
app.config['SECRET_KEY'] = config.FLASK_SECRET
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{os.path.join(config.DATA_DIR, "bookapp.db")}'
db.init_app(app)
login_manager = LoginManager()
login_manager.login_view = 'auth.login'
login_manager.init_app(app)
@login_manager.user_loader
def load_user(user_id):
return db.session.get(User, int(user_id))
@app.context_processor
def inject_globals():
return dict(app_version=config.VERSION)
# Register Blueprints
from web.routes.auth import auth_bp
from web.routes.project import project_bp
from web.routes.run import run_bp
from web.routes.persona import persona_bp
from web.routes.admin import admin_bp
app.register_blueprint(auth_bp)
app.register_blueprint(project_bp)
app.register_blueprint(run_bp)
app.register_blueprint(persona_bp)
app.register_blueprint(admin_bp)
# --- SETUP ---
with app.app_context():
db.create_all()
# Auto-create Admin from Environment Variables (Docker/Portainer Setup)
if config.ADMIN_USER and config.ADMIN_PASSWORD:
admin = User.query.filter_by(username=config.ADMIN_USER).first()
if not admin:
print(f"🔐 System: Creating Admin User '{config.ADMIN_USER}' from environment variables.")
admin = User(username=config.ADMIN_USER, password=generate_password_hash(config.ADMIN_PASSWORD, method='pbkdf2:sha256'), is_admin=True)
db.session.add(admin)
db.session.commit()
else:
print(f"🔐 System: Syncing Admin User '{config.ADMIN_USER}' settings from environment.")
if not admin.is_admin: admin.is_admin = True
admin.password = generate_password_hash(config.ADMIN_PASSWORD, method='pbkdf2:sha256')
db.session.add(admin)
db.session.commit()
elif not User.query.filter_by(is_admin=True).first():
print(" System: No Admin credentials found in environment variables. Admin account not created.")
# Migration: Add 'progress' column if missing
try:
with db.engine.connect() as conn:
conn.execute(text("ALTER TABLE run ADD COLUMN progress INTEGER DEFAULT 0"))
conn.commit()
print("✅ System: Added 'progress' column to Run table.")
except: pass
# Reset stuck runs on startup
try:
stuck_runs = Run.query.filter_by(status='running').all()
if stuck_runs:
print(f"⚠️ System: Found {len(stuck_runs)} stuck runs. Resetting to 'failed'.")
for r in stuck_runs:
r.status = 'failed'
r.end_time = datetime.utcnow()
db.session.commit()
except Exception as e:
print(f"⚠️ System: Failed to clean up stuck runs: {e}")
if __name__ == "__main__":
import threading
from huey.contrib.mini import MiniHuey
# Start Huey consumer in background thread
def run_huey():
from huey.consumer import Consumer
consumer = Consumer(huey, workers=1, worker_type='thread', loglevel=20)
consumer.run()
t = threading.Thread(target=run_huey, daemon=True)
t.start()
app.run(host='0.0.0.0', port=7070, debug=False)

50
web/db.py Normal file
View File

@@ -0,0 +1,50 @@
from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from datetime import datetime
db = SQLAlchemy()
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(150), unique=True, nullable=False)
password = db.Column(db.String(150), nullable=False)
api_key = db.Column(db.String(200), nullable=True)
total_spend = db.Column(db.Float, default=0.0)
is_admin = db.Column(db.Boolean, default=False)
class Project(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
name = db.Column(db.String(150), nullable=False)
folder_path = db.Column(db.String(300), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
runs = db.relationship('Run', backref='project', lazy=True, cascade="all, delete-orphan")
class Run(db.Model):
id = db.Column(db.Integer, primary_key=True)
project_id = db.Column(db.Integer, db.ForeignKey('project.id'), nullable=False)
status = db.Column(db.String(50), default="queued")
start_time = db.Column(db.DateTime, default=datetime.utcnow)
end_time = db.Column(db.DateTime, nullable=True)
log_file = db.Column(db.String(300), nullable=True)
cost = db.Column(db.Float, default=0.0)
progress = db.Column(db.Integer, default=0)
logs = db.relationship('LogEntry', backref='run', lazy=True, cascade="all, delete-orphan")
def duration(self):
if self.end_time and self.start_time:
return str(self.end_time - self.start_time).split('.')[0]
return "Running..."
class LogEntry(db.Model):
id = db.Column(db.Integer, primary_key=True)
run_id = db.Column(db.Integer, db.ForeignKey('run.id'), nullable=False)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
phase = db.Column(db.String(50))
message = db.Column(db.Text)

25
web/helpers.py Normal file
View File

@@ -0,0 +1,25 @@
from functools import wraps
from urllib.parse import urlparse, urljoin
from flask import redirect, url_for, flash, request
from flask_login import current_user
from web.db import Run
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or not current_user.is_admin:
flash("Admin access required.")
return redirect(url_for('project.index'))
return f(*args, **kwargs)
return decorated_function
def is_project_locked(project_id):
return Run.query.filter_by(project_id=project_id, status='completed').count() > 0
def is_safe_url(target):
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
return test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc

15
web/requirements_web.txt Normal file
View File

@@ -0,0 +1,15 @@
flask
flask-login
flask-sqlalchemy
huey
werkzeug
google-generativeai
python-dotenv
rich
markdown
python-docx
EbookLib
requests
Pillow
google-cloud-aiplatform
google-auth-oauthlib

0
web/routes/__init__.py Normal file
View File

226
web/routes/admin.py Normal file
View File

@@ -0,0 +1,226 @@
import os
import json
import shutil
from datetime import datetime, timedelta
from flask import Blueprint, render_template, request, redirect, url_for, flash, session
from flask_login import login_required, login_user, current_user
from sqlalchemy import func
from web.db import db, User, Project, Run
from web.helpers import admin_required
from core import config, utils
from ai import models as ai_models
from ai import setup as ai_setup
from story import style_persona, bible_tracker
admin_bp = Blueprint('admin', __name__)
@admin_bp.route('/admin')
@login_required
@admin_required
def admin_dashboard():
users = User.query.all()
projects = Project.query.all()
return render_template('admin_dashboard.html', users=users, projects=projects)
@admin_bp.route('/admin/user/<int:user_id>/delete', methods=['POST'])
@login_required
@admin_required
def admin_delete_user(user_id):
if user_id == current_user.id:
flash("Cannot delete yourself.")
return redirect(url_for('admin.admin_dashboard'))
user = db.session.get(User, user_id)
if user:
user_path = os.path.join(config.DATA_DIR, "users", str(user.id))
if os.path.exists(user_path):
try: shutil.rmtree(user_path)
except: pass
projects = Project.query.filter_by(user_id=user.id).all()
for p in projects:
db.session.delete(p)
db.session.delete(user)
db.session.commit()
flash(f"User {user.username} deleted.")
return redirect(url_for('admin.admin_dashboard'))
@admin_bp.route('/admin/project/<int:project_id>/delete', methods=['POST'])
@login_required
@admin_required
def admin_delete_project(project_id):
proj = db.session.get(Project, project_id)
if proj:
if os.path.exists(proj.folder_path):
try: shutil.rmtree(proj.folder_path)
except: pass
db.session.delete(proj)
db.session.commit()
flash(f"Project {proj.name} deleted.")
return redirect(url_for('admin.admin_dashboard'))
@admin_bp.route('/admin/reset', methods=['POST'])
@login_required
@admin_required
def admin_factory_reset():
projects = Project.query.all()
for p in projects:
if os.path.exists(p.folder_path):
try: shutil.rmtree(p.folder_path)
except: pass
db.session.delete(p)
users = User.query.filter(User.id != current_user.id).all()
for u in users:
user_path = os.path.join(config.DATA_DIR, "users", str(u.id))
if os.path.exists(user_path):
try: shutil.rmtree(user_path)
except: pass
db.session.delete(u)
if os.path.exists(config.PERSONAS_FILE):
try: os.remove(config.PERSONAS_FILE)
except: pass
utils.create_default_personas()
db.session.commit()
flash("Factory Reset Complete. All other users and projects have been wiped.")
return redirect(url_for('admin.admin_dashboard'))
@admin_bp.route('/admin/spend')
@login_required
@admin_required
def admin_spend_report():
days = request.args.get('days', 30, type=int)
if days > 0:
start_date = datetime.utcnow() - timedelta(days=days)
else:
start_date = datetime.min
results = db.session.query(
User.username,
func.count(Run.id),
func.sum(Run.cost)
).join(Project, Project.user_id == User.id)\
.join(Run, Run.project_id == Project.id)\
.filter(Run.start_time >= start_date)\
.group_by(User.id, User.username).all()
report = []
total_period_spend = 0.0
for r in results:
cost = r[2] if r[2] else 0.0
report.append({"username": r[0], "runs": r[1], "cost": cost})
total_period_spend += cost
return render_template('admin_spend.html', report=report, days=days, total=total_period_spend)
@admin_bp.route('/admin/style', methods=['GET', 'POST'])
@login_required
@admin_required
def admin_style_guidelines():
path = os.path.join(config.DATA_DIR, "style_guidelines.json")
if request.method == 'POST':
ai_isms_raw = request.form.get('ai_isms', '')
filter_words_raw = request.form.get('filter_words', '')
data = {
"ai_isms": [x.strip() for x in ai_isms_raw.split('\n') if x.strip()],
"filter_words": [x.strip() for x in filter_words_raw.split('\n') if x.strip()]
}
with open(path, 'w') as f: json.dump(data, f, indent=2)
flash("Style Guidelines updated successfully.")
return redirect(url_for('admin.admin_style_guidelines'))
data = style_persona.get_style_guidelines()
return render_template('admin_style.html', data=data)
@admin_bp.route('/admin/impersonate/<int:user_id>')
@login_required
@admin_required
def impersonate_user(user_id):
if user_id == current_user.id:
flash("Cannot impersonate yourself.")
return redirect(url_for('admin.admin_dashboard'))
user = db.session.get(User, user_id)
if user:
session['original_admin_id'] = current_user.id
login_user(user)
flash(f"Now viewing as {user.username}")
return redirect(url_for('project.index'))
return redirect(url_for('admin.admin_dashboard'))
@admin_bp.route('/admin/stop_impersonate')
@login_required
def stop_impersonate():
admin_id = session.get('original_admin_id')
if admin_id:
admin = db.session.get(User, admin_id)
if admin:
login_user(admin)
session.pop('original_admin_id', None)
flash("Restored admin session.")
return redirect(url_for('admin.admin_dashboard'))
return redirect(url_for('project.index'))
@admin_bp.route('/debug/routes')
@login_required
@admin_required
def debug_routes():
from flask import current_app
output = []
for rule in current_app.url_map.iter_rules():
methods = ','.join(rule.methods)
rule_str = str(rule).replace('<', '[').replace('>', ']')
line = "{:50s} {:20s} {}".format(rule.endpoint, methods, rule_str)
output.append(line)
return "<pre>" + "\n".join(output) + "</pre>"
@admin_bp.route('/system/optimize_models', methods=['POST'])
@login_required
@admin_required
def optimize_models():
try:
ai_setup.init_models(force=True)
if ai_models.model_logic:
style_persona.refresh_style_guidelines(ai_models.model_logic)
flash("AI Models refreshed and Style Guidelines updated.")
except Exception as e:
flash(f"Error refreshing models: {e}")
return redirect(request.referrer or url_for('project.index'))
@admin_bp.route('/system/status')
@login_required
def system_status():
models_info = {}
cache_data = {}
cache_path = os.path.join(config.DATA_DIR, "model_cache.json")
if os.path.exists(cache_path):
try:
with open(cache_path, 'r') as f:
cache_data = json.load(f)
models_info = cache_data.get('models', {})
except: pass
return render_template('system_status.html', models=models_info, cache=cache_data, datetime=datetime,
image_model=ai_models.image_model_name, image_source=ai_models.image_model_source)

57
web/routes/auth.py Normal file
View File

@@ -0,0 +1,57 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash, session
from flask_login import login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy.exc import IntegrityError
from web.db import db, User
from web.helpers import is_safe_url
from core import config
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user and check_password_hash(user.password, password):
login_user(user)
next_page = request.args.get('next')
if not next_page or not is_safe_url(next_page):
next_page = url_for('project.index')
return redirect(next_page)
if user and user.is_admin:
print(f"⚠️ System: Admin login failed for '{username}'. Password hash mismatch.")
flash('Invalid credentials')
return render_template('login.html')
@auth_bp.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
if User.query.filter_by(username=username).first():
flash('Username exists')
return redirect(url_for('auth.register'))
new_user = User(username=username, password=generate_password_hash(password, method='pbkdf2:sha256'))
if config.ADMIN_USER and username == config.ADMIN_USER:
new_user.is_admin = True
try:
db.session.add(new_user)
db.session.commit()
login_user(new_user)
return redirect(url_for('project.index'))
except IntegrityError:
db.session.rollback()
flash('Username exists')
return redirect(url_for('auth.register'))
return render_template('register.html')
@auth_bp.route('/logout')
def logout():
logout_user()
return redirect(url_for('auth.login'))

135
web/routes/persona.py Normal file
View File

@@ -0,0 +1,135 @@
import os
import json
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import login_required
from core import config, utils
from ai import models as ai_models
from ai import setup as ai_setup
persona_bp = Blueprint('persona', __name__)
@persona_bp.route('/personas')
@login_required
def list_personas():
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
return render_template('personas.html', personas=personas)
@persona_bp.route('/persona/new')
@login_required
def new_persona():
return render_template('persona_edit.html', persona={}, name="")
@persona_bp.route('/persona/<string:name>')
@login_required
def edit_persona(name):
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
persona = personas.get(name)
if not persona:
flash(f"Persona '{name}' not found.")
return redirect(url_for('persona.list_personas'))
return render_template('persona_edit.html', persona=persona, name=name)
@persona_bp.route('/persona/save', methods=['POST'])
@login_required
def save_persona():
old_name = request.form.get('old_name')
name = request.form.get('name')
if not name:
flash("Persona name is required.")
return redirect(url_for('persona.list_personas'))
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
if old_name and old_name != name and old_name in personas:
del personas[old_name]
persona = {
"name": name,
"bio": request.form.get('bio'),
"age": request.form.get('age'),
"gender": request.form.get('gender'),
"race": request.form.get('race'),
"nationality": request.form.get('nationality'),
"language": request.form.get('language'),
"sample_text": request.form.get('sample_text'),
"voice_keywords": request.form.get('voice_keywords'),
"style_inspirations": request.form.get('style_inspirations')
}
personas[name] = persona
with open(config.PERSONAS_FILE, 'w') as f: json.dump(personas, f, indent=2)
flash(f"Persona '{name}' saved.")
return redirect(url_for('persona.list_personas'))
@persona_bp.route('/persona/delete/<string:name>', methods=['POST'])
@login_required
def delete_persona(name):
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
if name in personas:
del personas[name]
with open(config.PERSONAS_FILE, 'w') as f: json.dump(personas, f, indent=2)
flash(f"Persona '{name}' deleted.")
return redirect(url_for('persona.list_personas'))
@persona_bp.route('/persona/analyze', methods=['POST'])
@login_required
def analyze_persona():
try: ai_setup.init_models()
except: pass
if not ai_models.model_logic:
return {"error": "AI models not initialized."}, 500
data = request.json
sample = data.get('sample_text', '')
prompt = f"""
ROLE: Literary Analyst
TASK: Create or analyze an Author Persona profile.
INPUT_DATA:
- NAME: {data.get('name')}
- DEMOGRAPHICS: Age: {data.get('age')} | Gender: {data.get('gender')} | Nationality: {data.get('nationality')}
- SAMPLE_TEXT: {sample[:3000]}
INSTRUCTIONS:
1. BIO: Write a 2-3 sentence description of the writing style. If sample is provided, analyze it. If not, invent a style that fits the demographics/name.
2. KEYWORDS: Comma-separated list of 3-5 adjectives describing the voice (e.g. Gritty, Whimsical, Sarcastic).
3. INSPIRATIONS: Comma-separated list of 1-3 famous authors or genres that this style resembles.
OUTPUT_FORMAT (JSON): {{ "bio": "String", "voice_keywords": "String", "style_inspirations": "String" }}
"""
try:
response = ai_models.model_logic.generate_content(prompt)
return json.loads(utils.clean_json(response.text))
except Exception as e:
return {"error": str(e)}, 500

760
web/routes/project.py Normal file
View File

@@ -0,0 +1,760 @@
import os
import json
import shutil
from datetime import datetime
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import login_required, current_user
from web.db import db, Project, Run
from web.helpers import is_project_locked
from core import config, utils
from ai import models as ai_models
from ai import setup as ai_setup
from story import planner, bible_tracker
from web.tasks import generate_book_task, refine_bible_task
project_bp = Blueprint('project', __name__)
@project_bp.route('/')
@login_required
def index():
projects = Project.query.filter_by(user_id=current_user.id).all()
return render_template('dashboard.html', projects=projects, user=current_user)
@project_bp.route('/project/setup', methods=['POST'])
@login_required
def project_setup_wizard():
concept = request.form.get('concept')
try: ai_setup.init_models()
except: pass
if not ai_models.model_logic:
flash("AI models not initialized.")
return redirect(url_for('project.index'))
prompt = f"""
ROLE: Publishing Analyst
TASK: Suggest metadata for a story concept.
CONCEPT: {concept}
OUTPUT_FORMAT (JSON):
{{
"title": "String",
"genre": "String",
"target_audience": "String",
"tone": "String",
"length_category": "String (Select code: '01'=Chapter Book, '1'=Flash Fiction, '2'=Short Story, '2b'=Young Adult, '3'=Novella, '4'=Novel, '5'=Epic)",
"estimated_chapters": Int,
"estimated_word_count": "String (e.g. '75,000')",
"include_prologue": Bool,
"include_epilogue": Bool,
"tropes": ["String"],
"pov_style": "String",
"time_period": "String",
"spice": "String",
"violence": "String",
"is_series": Bool,
"series_title": "String",
"narrative_tense": "String",
"language_style": "String",
"dialogue_style": "String",
"page_orientation": "Portrait|Landscape|Square",
"formatting_rules": ["String (e.g. 'Chapter Headers: Number + Title')"],
"author_bio": "String"
}}
"""
suggestions = {}
try:
response = ai_models.model_logic.generate_content(prompt)
suggestions = json.loads(utils.clean_json(response.text))
except Exception as e:
flash(f"AI Analysis failed: {e}")
suggestions = {"title": "New Project", "genre": "Fiction"}
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
return render_template('project_setup.html', s=suggestions, concept=concept, personas=personas, lengths=config.LENGTH_DEFINITIONS)
@project_bp.route('/project/setup/refine', methods=['POST'])
@login_required
def project_setup_refine():
concept = request.form.get('concept')
instruction = request.form.get('refine_instruction')
current_state = {
"title": request.form.get('title'),
"genre": request.form.get('genre'),
"target_audience": request.form.get('audience'),
"tone": request.form.get('tone'),
}
try: ai_setup.init_models()
except: pass
prompt = f"""
ROLE: Publishing Analyst
TASK: Refine project metadata based on user instruction.
INPUT_DATA:
- ORIGINAL_CONCEPT: {concept}
- CURRENT_TITLE: {current_state['title']}
- INSTRUCTION: {instruction}
OUTPUT_FORMAT (JSON): Same structure as the initial analysis (title, genre, length_category, etc). Ensure length_category matches the word count.
"""
suggestions = {}
try:
response = ai_models.model_logic.generate_content(prompt)
suggestions = json.loads(utils.clean_json(response.text))
except Exception as e:
flash(f"Refinement failed: {e}")
return redirect(url_for('project.index'))
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
return render_template('project_setup.html', s=suggestions, concept=concept, personas=personas, lengths=config.LENGTH_DEFINITIONS)
@project_bp.route('/project/create', methods=['POST'])
@login_required
def create_project_final():
title = request.form.get('title')
safe_title = utils.sanitize_filename(title)
user_dir = os.path.join(config.DATA_DIR, "users", str(current_user.id))
os.makedirs(user_dir, exist_ok=True)
proj_path = os.path.join(user_dir, safe_title)
if os.path.exists(proj_path):
safe_title += f"_{int(datetime.utcnow().timestamp())}"
proj_path = os.path.join(user_dir, safe_title)
os.makedirs(proj_path, exist_ok=True)
length_cat = request.form.get('length_category')
len_def = config.LENGTH_DEFINITIONS.get(length_cat, config.LENGTH_DEFINITIONS['4']).copy()
try: len_def['chapters'] = int(request.form.get('chapters'))
except: pass
len_def['words'] = request.form.get('words')
len_def['include_prologue'] = 'include_prologue' in request.form
len_def['include_epilogue'] = 'include_epilogue' in request.form
is_series = 'is_series' in request.form
style = {
"tone": request.form.get('tone'),
"pov_style": request.form.get('pov_style'),
"time_period": request.form.get('time_period'),
"spice": request.form.get('spice'),
"violence": request.form.get('violence'),
"narrative_tense": request.form.get('narrative_tense'),
"language_style": request.form.get('language_style'),
"dialogue_style": request.form.get('dialogue_style'),
"page_orientation": request.form.get('page_orientation'),
"tropes": [x.strip() for x in request.form.get('tropes', '').split(',') if x.strip()],
"formatting_rules": [x.strip() for x in request.form.get('formatting_rules', '').split(',') if x.strip()]
}
bible = {
"project_metadata": {
"title": title,
"author": request.form.get('author'),
"author_bio": request.form.get('author_bio'),
"genre": request.form.get('genre'),
"target_audience": request.form.get('audience'),
"is_series": is_series,
"length_settings": len_def,
"style": style
},
"books": [],
"characters": []
}
count = 1
if is_series:
try: count = int(request.form.get('series_count', 1))
except: count = 3
concept = request.form.get('concept', '')
for i in range(count):
bible['books'].append({
"book_number": i+1,
"title": f"{title} - Book {i+1}" if is_series else title,
"manual_instruction": concept if i==0 else "",
"plot_beats": []
})
try:
ai_setup.init_models()
bible = planner.enrich(bible, proj_path)
except: pass
with open(os.path.join(proj_path, "bible.json"), 'w') as f:
json.dump(bible, f, indent=2)
new_proj = Project(user_id=current_user.id, name=title, folder_path=proj_path)
db.session.add(new_proj)
db.session.commit()
return redirect(url_for('project.view_project', id=new_proj.id))
@project_bp.route('/project/import', methods=['POST'])
@login_required
def import_project():
if 'bible_file' not in request.files:
flash('No file part')
return redirect(url_for('project.index'))
file = request.files['bible_file']
if file.filename == '':
flash('No selected file')
return redirect(url_for('project.index'))
if file:
try:
bible = json.load(file)
if 'project_metadata' not in bible or 'title' not in bible['project_metadata']:
flash("Invalid Bible format: Missing project_metadata or title.")
return redirect(url_for('project.index'))
title = bible['project_metadata']['title']
safe_title = utils.sanitize_filename(title)
user_dir = os.path.join(config.DATA_DIR, "users", str(current_user.id))
os.makedirs(user_dir, exist_ok=True)
proj_path = os.path.join(user_dir, safe_title)
if os.path.exists(proj_path):
safe_title += f"_{int(datetime.utcnow().timestamp())}"
proj_path = os.path.join(user_dir, safe_title)
os.makedirs(proj_path)
with open(os.path.join(proj_path, "bible.json"), 'w') as f:
json.dump(bible, f, indent=2)
new_proj = Project(user_id=current_user.id, name=title, folder_path=proj_path)
db.session.add(new_proj)
db.session.commit()
flash(f"Project '{title}' imported successfully.")
return redirect(url_for('project.view_project', id=new_proj.id))
except Exception as e:
flash(f"Import failed: {str(e)}")
return redirect(url_for('project.index'))
@project_bp.route('/project/<int:id>')
@login_required
def view_project(id):
proj = db.session.get(Project, id)
if not proj: return "Project not found", 404
if proj.user_id != current_user.id: return "Unauthorized", 403
bible_path = os.path.join(proj.folder_path, "bible.json")
bible_data = utils.load_json(bible_path)
draft_path = os.path.join(proj.folder_path, "bible_draft.json")
has_draft = os.path.exists(draft_path)
is_refining = os.path.exists(os.path.join(proj.folder_path, ".refining"))
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
runs = Run.query.filter_by(project_id=id).order_by(Run.id.desc()).all()
latest_run = runs[0] if runs else None
other_projects = Project.query.filter(Project.user_id == current_user.id, Project.id != id).all()
artifacts = []
cover_image = None
generated_books = {}
locked = is_project_locked(id)
for r in runs:
if r.status == 'completed':
run_dir = os.path.join(proj.folder_path, "runs", f"run_{r.id}")
if os.path.exists(run_dir):
for d in os.listdir(run_dir):
if d.startswith("Book_") and os.path.isdir(os.path.join(run_dir, d)):
if os.path.exists(os.path.join(run_dir, d, "manuscript.json")):
try:
parts = d.split('_')
if len(parts) > 1 and parts[1].isdigit():
b_num = int(parts[1])
if b_num not in generated_books:
book_path = os.path.join(run_dir, d)
epub_file = next((f for f in os.listdir(book_path) if f.endswith('.epub')), None)
docx_file = next((f for f in os.listdir(book_path) if f.endswith('.docx')), None)
generated_books[b_num] = {'status': 'generated', 'run_id': r.id, 'folder': d, 'epub': os.path.join(d, epub_file).replace("\\", "/") if epub_file else None, 'docx': os.path.join(d, docx_file).replace("\\", "/") if docx_file else None}
except: pass
if latest_run:
run_dir = os.path.join(proj.folder_path, "runs", f"run_{latest_run.id}")
if os.path.exists(run_dir):
if os.path.exists(os.path.join(run_dir, "cover.png")):
cover_image = "cover.png"
else:
subdirs = utils.get_sorted_book_folders(run_dir)
for d in subdirs:
if os.path.exists(os.path.join(run_dir, d, "cover.png")):
cover_image = os.path.join(d, "cover.png").replace("\\", "/")
break
for root, dirs, files in os.walk(run_dir):
for f in files:
if f.lower().endswith(('.epub', '.docx')):
rel_path = os.path.relpath(os.path.join(root, f), run_dir)
artifacts.append({
'name': f,
'path': rel_path.replace("\\", "/"),
'type': f.split('.')[-1].upper()
})
return render_template('project.html', project=proj, bible=bible_data, runs=runs, active_run=latest_run, artifacts=artifacts, cover_image=cover_image, personas=personas, generated_books=generated_books, other_projects=other_projects, locked=locked, has_draft=has_draft, is_refining=is_refining)
@project_bp.route('/project/<int:id>/run', methods=['POST'])
@login_required
def run_project(id):
proj = db.session.get(Project, id) or Project.query.get_or_404(id)
new_run = Run(project_id=id, status="queued")
db.session.add(new_run)
db.session.commit()
bible_path = os.path.join(proj.folder_path, "bible.json")
generate_book_task(new_run.id, proj.folder_path, bible_path, allow_copy=True)
return redirect(url_for('project.view_project', id=id))
@project_bp.route('/project/<int:id>/review')
@login_required
def review_project(id):
proj = db.session.get(Project, id) or Project.query.get_or_404(id)
if proj.user_id != current_user.id: return "Unauthorized", 403
bible_path = os.path.join(proj.folder_path, "bible.json")
bible = utils.load_json(bible_path)
return render_template('project_review.html', project=proj, bible=bible)
@project_bp.route('/project/<int:id>/update', methods=['POST'])
@login_required
def update_project_metadata(id):
proj = db.session.get(Project, id) or Project.query.get_or_404(id)
if proj.user_id != current_user.id: return "Unauthorized", 403
if is_project_locked(id):
flash("Project is locked. Clone it to make changes.")
return redirect(url_for('project.view_project', id=id))
new_title = request.form.get('title')
new_author = request.form.get('author')
bible_path = os.path.join(proj.folder_path, "bible.json")
bible = utils.load_json(bible_path)
if bible:
if new_title:
bible['project_metadata']['title'] = new_title
proj.name = new_title
if new_author:
bible['project_metadata']['author'] = new_author
with open(bible_path, 'w') as f: json.dump(bible, f, indent=2)
db.session.commit()
return redirect(url_for('project.view_project', id=id))
@project_bp.route('/project/<int:id>/clone', methods=['POST'])
@login_required
def clone_project(id):
source_proj = db.session.get(Project, id) or Project.query.get_or_404(id)
if source_proj.user_id != current_user.id: return "Unauthorized", 403
new_name = request.form.get('new_name')
instruction = request.form.get('instruction')
safe_title = utils.sanitize_filename(new_name)
user_dir = os.path.join(config.DATA_DIR, "users", str(current_user.id))
new_path = os.path.join(user_dir, safe_title)
if os.path.exists(new_path):
safe_title += f"_{int(datetime.utcnow().timestamp())}"
new_path = os.path.join(user_dir, safe_title)
os.makedirs(new_path)
source_bible_path = os.path.join(source_proj.folder_path, "bible.json")
if os.path.exists(source_bible_path):
bible = utils.load_json(source_bible_path)
bible['project_metadata']['title'] = new_name
if instruction:
try:
ai_setup.init_models()
bible = bible_tracker.refine_bible(bible, instruction, new_path) or bible
except: pass
with open(os.path.join(new_path, "bible.json"), 'w') as f: json.dump(bible, f, indent=2)
new_proj = Project(user_id=current_user.id, name=new_name, folder_path=new_path)
db.session.add(new_proj)
db.session.commit()
flash(f"Project cloned as '{new_name}'.")
return redirect(url_for('project.view_project', id=new_proj.id))
@project_bp.route('/project/<int:id>/bible_comparison')
@login_required
def bible_comparison(id):
proj = db.session.get(Project, id) or Project.query.get_or_404(id)
if proj.user_id != current_user.id: return "Unauthorized", 403
bible_path = os.path.join(proj.folder_path, "bible.json")
draft_path = os.path.join(proj.folder_path, "bible_draft.json")
if not os.path.exists(draft_path):
flash("No draft found. Please refine the bible first.")
return redirect(url_for('project.review_project', id=id))
original = utils.load_json(bible_path)
new_draft = utils.load_json(draft_path)
if not original or not new_draft:
flash("Error loading bible data. Draft may be corrupt.")
return redirect(url_for('project.review_project', id=id))
return render_template('bible_comparison.html', project=proj, original=original, new=new_draft)
@project_bp.route('/project/<int:id>/refine_bible', methods=['POST'])
@login_required
def refine_bible_route(id):
proj = db.session.get(Project, id) or Project.query.get_or_404(id)
if proj.user_id != current_user.id: return "Unauthorized", 403
if is_project_locked(id):
flash("Project is locked. Clone it to make changes.")
return redirect(url_for('project.view_project', id=id))
data = request.json if request.is_json else request.form
instruction = data.get('instruction')
if not instruction:
return {"error": "Instruction required"}, 400
source_type = data.get('source', 'original')
selected_keys = data.get('selected_keys')
if isinstance(selected_keys, str):
try: selected_keys = json.loads(selected_keys) if selected_keys.strip() else []
except: selected_keys = []
task = refine_bible_task(proj.folder_path, instruction, source_type, selected_keys)
return {"status": "queued", "task_id": task.id}
@project_bp.route('/project/<int:id>/is_refining')
@login_required
def check_refinement_status(id):
proj = db.session.get(Project, id) or Project.query.get_or_404(id)
if proj.user_id != current_user.id: return "Unauthorized", 403
is_refining = os.path.exists(os.path.join(proj.folder_path, ".refining"))
return {"is_refining": is_refining}
@project_bp.route('/project/<int:id>/refine_bible/confirm', methods=['POST'])
@login_required
def confirm_bible_refinement(id):
proj = db.session.get(Project, id) or Project.query.get_or_404(id)
if proj.user_id != current_user.id: return "Unauthorized", 403
action = request.form.get('action')
draft_path = os.path.join(proj.folder_path, "bible_draft.json")
bible_path = os.path.join(proj.folder_path, "bible.json")
if action == 'accept' or action == 'accept_all':
if os.path.exists(draft_path):
shutil.move(draft_path, bible_path)
flash("Bible updated successfully.")
else:
flash("Draft expired or missing.")
elif action == 'accept_selected':
if os.path.exists(draft_path) and os.path.exists(bible_path):
selected_keys_json = request.form.get('selected_keys', '[]')
try:
selected_keys = json.loads(selected_keys_json)
draft = utils.load_json(draft_path)
original = utils.load_json(bible_path)
original = bible_tracker.merge_selected_changes(original, draft, selected_keys)
with open(bible_path, 'w') as f: json.dump(original, f, indent=2)
os.remove(draft_path)
flash(f"Merged {len(selected_keys)} changes into Bible.")
except Exception as e:
flash(f"Merge failed: {e}")
else:
flash("Files missing.")
elif action == 'decline':
if os.path.exists(draft_path):
os.remove(draft_path)
flash("Changes discarded.")
return redirect(url_for('project.view_project', id=id))
@project_bp.route('/project/<int:id>/add_book', methods=['POST'])
@login_required
def add_book(id):
proj = db.session.get(Project, id) or Project.query.get_or_404(id)
if proj.user_id != current_user.id: return "Unauthorized", 403
if is_project_locked(id):
flash("Project is locked. Clone it to make changes.")
return redirect(url_for('project.view_project', id=id))
title = request.form.get('title', 'Untitled')
instruction = request.form.get('instruction', '')
bible_path = os.path.join(proj.folder_path, "bible.json")
bible = utils.load_json(bible_path)
if bible:
if 'books' not in bible: bible['books'] = []
next_num = len(bible['books']) + 1
new_book = {
"book_number": next_num,
"title": title,
"manual_instruction": instruction,
"plot_beats": []
}
bible['books'].append(new_book)
if 'project_metadata' in bible:
bible['project_metadata']['is_series'] = True
with open(bible_path, 'w') as f: json.dump(bible, f, indent=2)
flash(f"Added Book {next_num}: {title}")
return redirect(url_for('project.view_project', id=id))
@project_bp.route('/project/<int:id>/book/<int:book_num>/update', methods=['POST'])
@login_required
def update_book_details(id, book_num):
proj = db.session.get(Project, id) or Project.query.get_or_404(id)
if proj.user_id != current_user.id: return "Unauthorized", 403
if is_project_locked(id):
flash("Project is locked. Clone it to make changes.")
return redirect(url_for('project.view_project', id=id))
new_title = request.form.get('title')
new_instruction = request.form.get('instruction')
bible_path = os.path.join(proj.folder_path, "bible.json")
bible = utils.load_json(bible_path)
if bible and 'books' in bible:
for b in bible['books']:
if b.get('book_number') == book_num:
if new_title: b['title'] = new_title
if new_instruction is not None: b['manual_instruction'] = new_instruction
break
with open(bible_path, 'w') as f: json.dump(bible, f, indent=2)
flash(f"Book {book_num} updated.")
return redirect(url_for('project.view_project', id=id))
@project_bp.route('/project/<int:id>/delete_book/<int:book_num>', methods=['POST'])
@login_required
def delete_book(id, book_num):
proj = db.session.get(Project, id) or Project.query.get_or_404(id)
if proj.user_id != current_user.id: return "Unauthorized", 403
if is_project_locked(id):
flash("Project is locked. Clone it to make changes.")
return redirect(url_for('project.view_project', id=id))
bible_path = os.path.join(proj.folder_path, "bible.json")
bible = utils.load_json(bible_path)
if bible and 'books' in bible:
bible['books'] = [b for b in bible['books'] if b.get('book_number') != book_num]
for i, b in enumerate(bible['books']):
b['book_number'] = i + 1
if 'project_metadata' in bible:
bible['project_metadata']['is_series'] = (len(bible['books']) > 1)
with open(bible_path, 'w') as f: json.dump(bible, f, indent=2)
flash("Book deleted from plan.")
return redirect(url_for('project.view_project', id=id))
@project_bp.route('/project/<int:id>/import_characters', methods=['POST'])
@login_required
def import_characters(id):
target_proj = db.session.get(Project, id)
source_id = request.form.get('source_project_id')
source_proj = db.session.get(Project, source_id)
if not target_proj or not source_proj: return "Project not found", 404
if target_proj.user_id != current_user.id or source_proj.user_id != current_user.id: return "Unauthorized", 403
if is_project_locked(id):
flash("Project is locked. Clone it to make changes.")
return redirect(url_for('project.view_project', id=id))
target_bible = utils.load_json(os.path.join(target_proj.folder_path, "bible.json"))
source_bible = utils.load_json(os.path.join(source_proj.folder_path, "bible.json"))
if target_bible and source_bible:
existing_names = {c['name'].lower() for c in target_bible.get('characters', [])}
added_count = 0
for char in source_bible.get('characters', []):
if char['name'].lower() not in existing_names:
target_bible['characters'].append(char)
added_count += 1
if added_count > 0:
with open(os.path.join(target_proj.folder_path, "bible.json"), 'w') as f:
json.dump(target_bible, f, indent=2)
flash(f"Imported {added_count} characters from {source_proj.name}.")
else:
flash("No new characters found to import.")
return redirect(url_for('project.view_project', id=id))
@project_bp.route('/project/<int:id>/set_persona', methods=['POST'])
@login_required
def set_project_persona(id):
proj = db.session.get(Project, id) or Project.query.get_or_404(id)
if proj.user_id != current_user.id: return "Unauthorized", 403
if is_project_locked(id):
flash("Project is locked. Clone it to make changes.")
return redirect(url_for('project.view_project', id=id))
persona_name = request.form.get('persona_name')
bible_path = os.path.join(proj.folder_path, "bible.json")
bible = utils.load_json(bible_path)
if bible:
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
if persona_name in personas:
bible['project_metadata']['author_details'] = personas[persona_name]
with open(bible_path, 'w') as f: json.dump(bible, f, indent=2)
flash(f"Project voice updated to persona: {persona_name}")
else:
flash("Persona not found.")
return redirect(url_for('project.view_project', id=id))
@project_bp.route('/run/<int:id>/stop', methods=['POST'])
@login_required
def stop_run(id):
run = db.session.get(Run, id) or Run.query.get_or_404(id)
if run.project.user_id != current_user.id: return "Unauthorized", 403
if run.status in ['queued', 'running']:
run.status = 'cancelled'
run.end_time = datetime.utcnow()
db.session.commit()
run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
if os.path.exists(run_dir):
with open(os.path.join(run_dir, ".stop"), 'w') as f: f.write("stop")
flash(f"Run {id} marked as cancelled.")
return redirect(url_for('project.view_project', id=run.project_id))
@project_bp.route('/run/<int:id>/restart', methods=['POST'])
@login_required
def restart_run(id):
run = db.session.get(Run, id) or Run.query.get_or_404(id)
if run.project.user_id != current_user.id: return "Unauthorized", 403
new_run = Run(project_id=run.project_id, status="queued")
db.session.add(new_run)
db.session.commit()
mode = request.form.get('mode', 'resume')
feedback = request.form.get('feedback')
keep_cover = 'keep_cover' in request.form
force_regen = 'force_regenerate' in request.form
allow_copy = (mode == 'resume' and not force_regen)
if feedback: allow_copy = False
generate_book_task(new_run.id, run.project.folder_path, os.path.join(run.project.folder_path, "bible.json"), allow_copy=allow_copy, feedback=feedback, source_run_id=id if feedback else None, keep_cover=keep_cover)
flash(f"Started new Run #{new_run.id}" + (" with modifications." if feedback else "."))
return redirect(url_for('project.view_project', id=run.project_id))
@project_bp.route('/project/<int:run_id>/revise_book/<string:book_folder>', methods=['POST'])
@login_required
def revise_book(run_id, book_folder):
run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
if run.project.user_id != current_user.id: return "Unauthorized", 403
instruction = request.form.get('instruction')
new_run = Run(project_id=run.project_id, status="queued")
db.session.add(new_run)
db.session.commit()
generate_book_task(
new_run.id,
run.project.folder_path,
os.path.join(run.project.folder_path, "bible.json"),
allow_copy=True,
feedback=instruction,
source_run_id=run.id,
keep_cover=True,
exclude_folders=[book_folder]
)
flash(f"Started Revision Run #{new_run.id}. Book '{book_folder}' will be regenerated.")
return redirect(url_for('project.view_project', id=run.project_id))

335
web/routes/run.py Normal file
View File

@@ -0,0 +1,335 @@
import os
import json
import markdown
from flask import Blueprint, render_template, request, redirect, url_for, flash, session, send_from_directory
from flask_login import login_required, current_user
from web.db import db, Run, LogEntry
from core import utils
from ai import models as ai_models
from ai import setup as ai_setup
from story import editor as story_editor
from story import bible_tracker, style_persona
from export import exporter
from web.tasks import huey, regenerate_artifacts_task, rewrite_chapter_task
run_bp = Blueprint('run', __name__)
@run_bp.route('/run/<int:id>')
@login_required
def view_run(id):
run = db.session.get(Run, id)
if not run: return "Run not found", 404
if run.project.user_id != current_user.id: return "Unauthorized", 403
log_content = ""
logs = LogEntry.query.filter_by(run_id=id).order_by(LogEntry.timestamp).all()
if logs:
log_content = "\n".join([f"[{l.timestamp.strftime('%H:%M:%S')}] {l.phase:<15} | {l.message}" for l in logs])
elif run.log_file and os.path.exists(run.log_file):
with open(run.log_file, 'r') as f: log_content = f.read()
run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
books_data = []
if os.path.exists(run_dir):
subdirs = utils.get_sorted_book_folders(run_dir)
for d in subdirs:
b_path = os.path.join(run_dir, d)
b_info = {'folder': d, 'artifacts': [], 'cover': None, 'blurb': ''}
for f in os.listdir(b_path):
if f.lower().endswith(('.epub', '.docx')):
b_info['artifacts'].append({'name': f, 'path': os.path.join(d, f).replace("\\", "/")})
if os.path.exists(os.path.join(b_path, "cover.png")):
b_info['cover'] = os.path.join(d, "cover.png").replace("\\", "/")
blurb_p = os.path.join(b_path, "blurb.txt")
if os.path.exists(blurb_p):
with open(blurb_p, 'r', encoding='utf-8', errors='ignore') as f: b_info['blurb'] = f.read()
books_data.append(b_info)
bible_path = os.path.join(run.project.folder_path, "bible.json")
bible_data = utils.load_json(bible_path)
tracking = {"events": [], "characters": {}, "content_warnings": []}
book_dir = os.path.join(run_dir, books_data[-1]['folder']) if books_data else run_dir
if os.path.exists(book_dir):
t_ev = os.path.join(book_dir, "tracking_events.json")
t_ch = os.path.join(book_dir, "tracking_characters.json")
t_wn = os.path.join(book_dir, "tracking_warnings.json")
if os.path.exists(t_ev): tracking['events'] = utils.load_json(t_ev) or []
if os.path.exists(t_ch): tracking['characters'] = utils.load_json(t_ch) or {}
if os.path.exists(t_wn): tracking['content_warnings'] = utils.load_json(t_wn) or []
return render_template('run_details.html', run=run, log_content=log_content, books=books_data, bible=bible_data, tracking=tracking)
@run_bp.route('/run/<int:id>/status')
@login_required
def run_status(id):
run = db.session.get(Run, id) or Run.query.get_or_404(id)
log_content = ""
last_log = None
logs = LogEntry.query.filter_by(run_id=id).order_by(LogEntry.timestamp).all()
if logs:
log_content = "\n".join([f"[{l.timestamp.strftime('%H:%M:%S')}] {l.phase:<15} | {l.message}" for l in logs])
last_log = logs[-1]
if not log_content:
if run.log_file and os.path.exists(run.log_file):
with open(run.log_file, 'r') as f: log_content = f.read()
elif run.status in ['queued', 'running']:
temp_log = os.path.join(run.project.folder_path, f"system_log_{run.id}.txt")
if os.path.exists(temp_log):
with open(temp_log, 'r') as f: log_content = f.read()
response = {
"status": run.status,
"log": log_content,
"cost": run.cost,
"percent": run.progress,
"start_time": run.start_time.timestamp() if run.start_time else None
}
if last_log:
response["progress"] = {
"phase": last_log.phase,
"message": last_log.message,
"timestamp": last_log.timestamp.timestamp()
}
return response
@run_bp.route('/project/<int:run_id>/download')
@login_required
def download_artifact(run_id):
filename = request.args.get('file')
run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
if run.project.user_id != current_user.id: return "Unauthorized", 403
if not filename: return "Missing filename", 400
if os.path.isabs(filename) or ".." in os.path.normpath(filename) or ":" in filename:
return "Invalid filename", 400
run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
if not os.path.exists(os.path.join(run_dir, filename)) and os.path.exists(run_dir):
subdirs = utils.get_sorted_book_folders(run_dir)
for d in subdirs:
possible_path = os.path.join(d, filename)
if os.path.exists(os.path.join(run_dir, possible_path)):
filename = possible_path
break
return send_from_directory(run_dir, filename, as_attachment=True)
@run_bp.route('/project/<int:run_id>/read/<string:book_folder>')
@login_required
def read_book(run_id, book_folder):
run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
if run.project.user_id != current_user.id: return "Unauthorized", 403
if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder: return "Invalid book folder", 400
run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
book_path = os.path.join(run_dir, book_folder)
ms_path = os.path.join(book_path, "manuscript.json")
if not os.path.exists(ms_path):
flash("Manuscript not found.")
return redirect(url_for('run.view_run', id=run_id))
manuscript = utils.load_json(ms_path)
manuscript.sort(key=utils.chapter_sort_key)
for ch in manuscript:
ch['html_content'] = markdown.markdown(ch.get('content', ''))
return render_template('read_book.html', run=run, book_folder=book_folder, manuscript=manuscript)
@run_bp.route('/project/<int:run_id>/save_chapter', methods=['POST'])
@login_required
def save_chapter(run_id):
run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
if run.project.user_id != current_user.id: return "Unauthorized", 403
if run.status == 'running':
return "Cannot edit chapter while run is active.", 409
book_folder = request.form.get('book_folder')
chap_num_raw = request.form.get('chapter_num')
try: chap_num = int(chap_num_raw)
except: chap_num = chap_num_raw
new_content = request.form.get('content')
if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder: return "Invalid book folder", 400
run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
ms_path = os.path.join(run_dir, book_folder, "manuscript.json")
if os.path.exists(ms_path):
ms = utils.load_json(ms_path)
for ch in ms:
if str(ch.get('num')) == str(chap_num):
ch['content'] = new_content
break
with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)
book_path = os.path.join(run_dir, book_folder)
bp_path = os.path.join(book_path, "final_blueprint.json")
if os.path.exists(bp_path):
bp = utils.load_json(bp_path)
exporter.compile_files(bp, ms, book_path)
return "Saved", 200
return "Error", 500
@run_bp.route('/project/<int:run_id>/check_consistency/<string:book_folder>')
@login_required
def check_consistency(run_id, book_folder):
run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder: return "Invalid book folder", 400
run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
book_path = os.path.join(run_dir, book_folder)
bp = utils.load_json(os.path.join(book_path, "final_blueprint.json"))
ms = utils.load_json(os.path.join(book_path, "manuscript.json"))
if not bp or not ms:
return "Data files missing or corrupt.", 404
try: ai_setup.init_models()
except: pass
report = story_editor.analyze_consistency(bp, ms, book_path)
return render_template('consistency_report.html', report=report, run=run, book_folder=book_folder)
@run_bp.route('/project/<int:run_id>/sync_book/<string:book_folder>', methods=['POST'])
@login_required
def sync_book_metadata(run_id, book_folder):
run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
if run.project.user_id != current_user.id: return "Unauthorized", 403
if run.status == 'running':
flash("Cannot sync metadata while run is active.")
return redirect(url_for('run.read_book', run_id=run_id, book_folder=book_folder))
if not book_folder or "/" in book_folder or "\\" in book_folder or ".." in book_folder: return "Invalid book folder", 400
run_dir = os.path.join(run.project.folder_path, "runs", f"run_{run.id}")
book_path = os.path.join(run_dir, book_folder)
ms_path = os.path.join(book_path, "manuscript.json")
bp_path = os.path.join(book_path, "final_blueprint.json")
if os.path.exists(ms_path) and os.path.exists(bp_path):
ms = utils.load_json(ms_path)
bp = utils.load_json(bp_path)
if not ms or not bp:
flash("Data files corrupt.")
return redirect(url_for('run.read_book', run_id=run_id, book_folder=book_folder))
try: ai_setup.init_models()
except: pass
bp = bible_tracker.harvest_metadata(bp, book_path, ms)
tracking_path = os.path.join(book_path, "tracking_characters.json")
if os.path.exists(tracking_path):
tracking_chars = utils.load_json(tracking_path) or {}
updated_tracking = False
for c in bp.get('characters', []):
if c.get('name') and c['name'] not in tracking_chars:
tracking_chars[c['name']] = {"descriptors": [c.get('description', '')], "likes_dislikes": [], "last_worn": "Unknown"}
updated_tracking = True
if updated_tracking:
with open(tracking_path, 'w') as f: json.dump(tracking_chars, f, indent=2)
style_persona.update_persona_sample(bp, book_path)
with open(bp_path, 'w') as f: json.dump(bp, f, indent=2)
flash("Metadata synced. Future generations will respect your edits.")
else:
flash("Files not found.")
return redirect(url_for('run.read_book', run_id=run_id, book_folder=book_folder))
@run_bp.route('/project/<int:run_id>/rewrite_chapter', methods=['POST'])
@login_required
def rewrite_chapter(run_id):
run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
if run.project.user_id != current_user.id:
return {"error": "Unauthorized"}, 403
if run.status == 'running':
return {"error": "Cannot rewrite while run is active."}, 409
data = request.json
book_folder = data.get('book_folder')
chap_num = data.get('chapter_num')
instruction = data.get('instruction')
if not book_folder or chap_num is None or not instruction:
return {"error": "Missing parameters"}, 400
if "/" in book_folder or "\\" in book_folder or ".." in book_folder: return {"error": "Invalid book folder"}, 400
try: chap_num = int(chap_num)
except: pass
task = rewrite_chapter_task(run.id, run.project.folder_path, book_folder, chap_num, instruction)
session['rewrite_task_id'] = task.id
return {"status": "queued", "task_id": task.id}, 202
@run_bp.route('/task_status/<string:task_id>')
@login_required
def get_task_status(task_id):
try:
task_result = huey.result(task_id, preserve=True)
except Exception as e:
return {"status": "completed", "success": False, "error": str(e)}
if task_result is None:
return {"status": "running"}
else:
return {"status": "completed", "success": task_result}
@run_bp.route('/project/<int:run_id>/regenerate_artifacts', methods=['POST'])
@login_required
def regenerate_artifacts(run_id):
run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
if run.project.user_id != current_user.id: return "Unauthorized", 403
if run.status == 'running':
flash("Run is already active. Please wait for it to finish.")
return redirect(url_for('run.view_run', id=run_id))
feedback = request.form.get('feedback')
run.status = 'queued'
db.session.commit()
regenerate_artifacts_task(run_id, run.project.folder_path, feedback=feedback)
flash("Regenerating cover and files with updated metadata...")
return redirect(url_for('run.view_run', id=run_id))

409
web/tasks.py Normal file
View File

@@ -0,0 +1,409 @@
import os
import json
import time
import sqlite3
import shutil
from datetime import datetime
from huey import SqliteHuey
from web.db import db, Run, User, Project
from core import utils, config
from ai import models as ai_models
from ai import setup as ai_setup
from story import bible_tracker
from marketing import cover as marketing_cover
from export import exporter
# Configure Huey (Task Queue)
huey = SqliteHuey('bookapp_queue', filename=os.path.join(config.DATA_DIR, 'queue.db'))
def db_log_callback(db_path, run_id, phase, msg):
"""Writes log entry directly to SQLite to avoid Flask Context issues in threads."""
for _ in range(5):
try:
with sqlite3.connect(db_path, timeout=5) as conn:
conn.execute("INSERT INTO log_entry (run_id, timestamp, phase, message) VALUES (?, ?, ?, ?)",
(run_id, datetime.utcnow(), phase, str(msg)))
break
except sqlite3.OperationalError:
time.sleep(0.1)
except: break
def db_progress_callback(db_path, run_id, percent):
"""Updates run progress in SQLite."""
for _ in range(5):
try:
with sqlite3.connect(db_path, timeout=5) as conn:
conn.execute("UPDATE run SET progress = ? WHERE id = ?", (percent, run_id))
break
except sqlite3.OperationalError: time.sleep(0.1)
except: break
@huey.task()
def generate_book_task(run_id, project_path, bible_path, allow_copy=True, feedback=None, source_run_id=None, keep_cover=False, exclude_folders=None):
"""
Background task to run the book generation.
"""
# 1. Setup Logging
log_filename = f"system_log_{run_id}.txt"
# Log to project root initially until run folder is created by engine
initial_log = os.path.join(project_path, log_filename)
utils.set_log_file(initial_log)
# Hook up Database Logging
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
utils.set_progress_callback(lambda p: db_progress_callback(db_path, run_id, p))
# Set Status to Running
try:
with sqlite3.connect(db_path, timeout=10) as conn:
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
except: pass
utils.log("SYSTEM", f"Starting Job #{run_id}")
try:
# 1.1 Handle Feedback / Modification (Re-run logic)
if feedback and source_run_id:
utils.log("SYSTEM", f"Applying feedback to Run #{source_run_id}: '{feedback}'")
bible_data = utils.load_json(bible_path)
if bible_data:
try:
ai_setup.init_models()
new_bible = bible_tracker.refine_bible(bible_data, feedback, project_path)
if new_bible:
bible_data = new_bible
with open(bible_path, 'w') as f: json.dump(bible_data, f, indent=2)
utils.log("SYSTEM", "Bible updated with feedback.")
except Exception as e:
utils.log("ERROR", f"Failed to refine bible: {e}")
# 1.2 Keep Cover Art Logic
if keep_cover:
source_run_dir = os.path.join(project_path, "runs", f"run_{source_run_id}")
if os.path.exists(source_run_dir):
utils.log("SYSTEM", "Attempting to preserve cover art...")
current_run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
if not os.path.exists(current_run_dir): os.makedirs(current_run_dir)
source_books = {}
for d in os.listdir(source_run_dir):
if d.startswith("Book_") and os.path.isdir(os.path.join(source_run_dir, d)):
parts = d.split('_')
if len(parts) > 1 and parts[1].isdigit():
source_books[int(parts[1])] = os.path.join(source_run_dir, d)
if bible_data and 'books' in bible_data:
for i, book in enumerate(bible_data['books']):
b_num = book.get('book_number', i+1)
if b_num in source_books:
src_folder = source_books[b_num]
safe_title = utils.sanitize_filename(book.get('title', f"Book_{b_num}"))
target_folder = os.path.join(current_run_dir, f"Book_{b_num}_{safe_title}")
os.makedirs(target_folder, exist_ok=True)
src_cover = os.path.join(src_folder, "cover.png")
if os.path.exists(src_cover):
shutil.copy2(src_cover, os.path.join(target_folder, "cover.png"))
if os.path.exists(os.path.join(src_folder, "cover_art.png")):
shutil.copy2(os.path.join(src_folder, "cover_art.png"), os.path.join(target_folder, "cover_art.png"))
utils.log("SYSTEM", f" -> Copied cover for Book {b_num}")
# 1.5 Copy Forward Logic (Series Optimization)
is_series = False
if os.path.exists(bible_path):
bible_data = utils.load_json(bible_path)
if bible_data:
is_series = bible_data.get('project_metadata', {}).get('is_series', False)
runs_dir = os.path.join(project_path, "runs")
if allow_copy and is_series and os.path.exists(runs_dir):
all_runs = [d for d in os.listdir(runs_dir) if d.startswith("run_") and d != f"run_{run_id}"]
all_runs.sort(key=lambda x: int(x.split('_')[1]) if x.split('_')[1].isdigit() else 0)
if all_runs:
latest_run_dir = os.path.join(runs_dir, all_runs[-1])
current_run_dir = os.path.join(runs_dir, f"run_{run_id}")
os.makedirs(current_run_dir, exist_ok=True)
utils.log("SYSTEM", f"Checking previous run ({all_runs[-1]}) for completed books...")
for item in os.listdir(latest_run_dir):
if item.startswith("Book_") and os.path.isdir(os.path.join(latest_run_dir, item)):
if exclude_folders and item in exclude_folders:
utils.log("SYSTEM", f" -> Skipping copy of {item} (Target for revision).")
continue
if os.path.exists(os.path.join(latest_run_dir, item, "manuscript.json")):
src = os.path.join(latest_run_dir, item)
dst = os.path.join(current_run_dir, item)
try:
shutil.copytree(src, dst, dirs_exist_ok=True)
utils.log("SYSTEM", f" -> Copied {item} (Skipping generation).")
except Exception as e:
utils.log("SYSTEM", f" -> Failed to copy {item}: {e}")
# 2. Run Generation
from cli.engine import run_generation
run_generation(bible_path, specific_run_id=run_id)
utils.log("SYSTEM", "Job Complete.")
utils.update_progress(100)
status = "completed"
except Exception as e:
utils.log("ERROR", f"Job Failed: {e}")
status = "failed"
# 3. Calculate Cost & Cleanup
run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
total_cost = 0.0
final_log_path = initial_log
if os.path.exists(run_dir):
final_log_path = os.path.join(run_dir, "web_console.log")
if os.path.exists(initial_log):
try:
os.rename(initial_log, final_log_path)
except OSError:
shutil.copy2(initial_log, final_log_path)
os.remove(initial_log)
for item in os.listdir(run_dir):
item_path = os.path.join(run_dir, item)
if os.path.isdir(item_path) and item.startswith("Book_"):
usage_path = os.path.join(item_path, "usage_log.json")
if os.path.exists(usage_path):
data = utils.load_json(usage_path)
total_cost += data.get('totals', {}).get('est_cost_usd', 0.0)
# 4. Update Database with Final Status
try:
with sqlite3.connect(db_path, timeout=10) as conn:
conn.execute("UPDATE run SET status = ?, cost = ?, end_time = ?, log_file = ?, progress = 100 WHERE id = ?",
(status, total_cost, datetime.utcnow(), final_log_path, run_id))
except Exception as e:
print(f"Failed to update run status in DB: {e}")
return {"run_id": run_id, "status": status, "cost": total_cost, "final_log": final_log_path}
@huey.task()
def regenerate_artifacts_task(run_id, project_path, feedback=None):
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
log_file = os.path.join(run_dir, "web_console.log")
if not os.path.exists(run_dir):
log_file = os.path.join(project_path, f"system_log_{run_id}.txt")
try:
with open(log_file, 'w', encoding='utf-8') as f:
f.write(f"[{datetime.utcnow().strftime('%H:%M:%S')}] --- REGENERATION STARTED ---\n")
except: pass
utils.set_log_file(log_file)
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
try:
with sqlite3.connect(db_path) as conn:
conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
except: pass
utils.log("SYSTEM", "Starting Artifact Regeneration...")
book_dir = run_dir
if os.path.exists(run_dir):
subdirs = utils.get_sorted_book_folders(run_dir)
if subdirs: book_dir = os.path.join(run_dir, subdirs[0])
bible_path = os.path.join(project_path, "bible.json")
if not os.path.exists(run_dir) or not os.path.exists(bible_path):
utils.log("ERROR", "Run directory or Bible not found.")
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
except: pass
return
bible = utils.load_json(bible_path)
final_bp_path = os.path.join(book_dir, "final_blueprint.json")
ms_path = os.path.join(book_dir, "manuscript.json")
if not os.path.exists(final_bp_path) or not os.path.exists(ms_path):
utils.log("ERROR", f"Blueprint or Manuscript not found in {book_dir}")
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
except: pass
return
bp = utils.load_json(final_bp_path)
ms = utils.load_json(ms_path)
meta = bible.get('project_metadata', {})
if 'book_metadata' in bp:
for k in ['author', 'genre', 'target_audience', 'style']:
if k in meta:
bp['book_metadata'][k] = meta[k]
if bp.get('series_metadata', {}).get('is_series'):
bp['series_metadata']['series_title'] = meta.get('title', bp['series_metadata'].get('series_title'))
b_num = bp['series_metadata'].get('book_number')
for b in bible.get('books', []):
if b.get('book_number') == b_num:
bp['book_metadata']['title'] = b.get('title', bp['book_metadata'].get('title'))
break
else:
bp['book_metadata']['title'] = meta.get('title', bp['book_metadata'].get('title'))
with open(final_bp_path, 'w') as f: json.dump(bp, f, indent=2)
try:
ai_setup.init_models()
tracking = None
events_path = os.path.join(book_dir, "tracking_events.json")
if os.path.exists(events_path):
tracking = {"events": utils.load_json(events_path), "characters": utils.load_json(os.path.join(book_dir, "tracking_characters.json"))}
marketing_cover.generate_cover(bp, book_dir, tracking, feedback=feedback)
exporter.compile_files(bp, ms, book_dir)
utils.log("SYSTEM", "Regeneration Complete.")
final_status = 'completed'
except Exception as e:
utils.log("ERROR", f"Regeneration Failed: {e}")
final_status = 'failed'
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = ? WHERE id = ?", (final_status, run_id))
except: pass
@huey.task()
def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instruction):
"""
Background task to rewrite a single chapter and propagate changes.
"""
try:
run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
log_file = os.path.join(run_dir, "web_console.log")
if not os.path.exists(log_file):
log_file = os.path.join(project_path, f"system_log_{run_id}.txt")
try:
with open(log_file, 'w', encoding='utf-8') as f: f.write("")
except: pass
utils.set_log_file(log_file)
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
try:
with sqlite3.connect(db_path) as conn:
conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
except: pass
book_path = os.path.join(run_dir, book_folder)
ms_path = os.path.join(book_path, "manuscript.json")
bp_path = os.path.join(book_path, "final_blueprint.json")
if not (os.path.exists(ms_path) and os.path.exists(bp_path)):
utils.log("ERROR", f"Rewrite failed: files not found for run {run_id}/{book_folder}")
return False
ms = utils.load_json(ms_path)
bp = utils.load_json(bp_path)
ai_setup.init_models()
from story import editor as story_editor
result = story_editor.rewrite_chapter_content(bp, ms, chap_num, instruction, book_path)
if result and result[0]:
new_text, summary = result
for ch in ms:
if str(ch.get('num')) == str(chap_num):
ch['content'] = new_text
break
with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)
updated_ms = story_editor.check_and_propagate(bp, ms, chap_num, book_path, change_summary=summary)
if updated_ms:
ms = updated_ms
with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)
exporter.compile_files(bp, ms, book_path)
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
except: pass
return True
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
except: pass
return False
except Exception as e:
utils.log("ERROR", f"Rewrite task exception for run {run_id}/{book_folder}: {e}")
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
except: pass
return False
@huey.task()
def refine_bible_task(project_path, instruction, source_type, selected_keys=None):
"""
Background task to refine the Bible.
Handles partial merging of selected keys into a temp base before refinement.
"""
try:
bible_path = os.path.join(project_path, "bible.json")
draft_path = os.path.join(project_path, "bible_draft.json")
lock_path = os.path.join(project_path, ".refining")
with open(lock_path, 'w') as f: f.write("running")
base_bible = utils.load_json(bible_path)
if not base_bible: return False
if source_type == 'draft' and os.path.exists(draft_path):
draft_bible = utils.load_json(draft_path)
if selected_keys is not None and draft_bible:
base_bible = bible_tracker.merge_selected_changes(base_bible, draft_bible, selected_keys)
elif draft_bible:
base_bible = draft_bible
ai_setup.init_models()
new_bible = bible_tracker.refine_bible(base_bible, instruction, project_path)
if new_bible:
with open(draft_path, 'w') as f: json.dump(new_bible, f, indent=2)
return True
return False
except Exception as e:
utils.log("ERROR", f"Bible refinement task failed: {e}")
return False
finally:
if os.path.exists(lock_path): os.remove(lock_path)