Files
bookapp/web/tasks.py
Mike Wichers d77ceb376d feat: Save bible snapshot alongside each run on start
Copies bible.json as bible_snapshot.json into the run folder before
generation begins, preserving the exact blueprint used for that run.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-22 13:29:55 -05:00

574 lines
25 KiB
Python

import os
import json
import time
import sqlite3
import shutil
from datetime import datetime
from huey import SqliteHuey
from web.db import db, Run, User, Project
from core import utils, config
from ai import models as ai_models
from ai import setup as ai_setup
from story import bible_tracker
from marketing import cover as marketing_cover
from export import exporter
# Configure Huey (Task Queue) — SQLite-backed queue file in DATA_DIR, shared by
# the web app (enqueue) and the worker process (consume).
huey = SqliteHuey('bookapp_queue', filename=os.path.join(config.DATA_DIR, 'queue.db'))
def _robust_update_run_status(db_path, run_id, status, retries=5, **extra_cols):
"""Update run status with exponential-backoff retry. Raises RuntimeError if all retries fail."""
import sys as _sys
cols = {"status": status}
cols.update(extra_cols)
set_clause = ", ".join(f"{k} = ?" for k in cols)
values = list(cols.values()) + [run_id]
for attempt in range(retries):
try:
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
conn.execute(f"UPDATE run SET {set_clause} WHERE id = ?", values)
return
except sqlite3.OperationalError as e:
wait = attempt + 1
print(f"[DB WARN run={run_id}] Status update locked (attempt {attempt+1}/{retries}), retry in {wait}s: {e}", flush=True, file=_sys.stdout)
time.sleep(wait)
except Exception as e:
print(f"[DB ERROR run={run_id}] Unexpected error on status update: {type(e).__name__}: {e}", flush=True, file=_sys.stdout)
raise
msg = f"[DB CRITICAL run={run_id}] Failed to update status='{status}' after {retries} attempts."
print(msg, flush=True, file=_sys.stdout)
raise RuntimeError(msg)
def db_heartbeat_callback(db_path, run_id):
    """Best-effort: stamp run.last_heartbeat with the current UTC time.

    Retries a few times on a locked database; any other error is logged to
    stdout and the update is abandoned (heartbeats are non-critical).
    """
    import sys as _sys
    attempt = 0
    while attempt < 3:
        try:
            conn = sqlite3.connect(db_path, timeout=10, check_same_thread=False)
            with conn:
                conn.execute(
                    "UPDATE run SET last_heartbeat = ? WHERE id = ?",
                    (datetime.utcnow().isoformat(), run_id),
                )
            return
        except sqlite3.OperationalError:
            # Momentary lock: brief pause, then another attempt.
            attempt += 1
            time.sleep(0.2)
        except Exception as err:
            print(f"[db_heartbeat ERROR run={run_id}] {type(err).__name__}: {err}",
                  flush=True, file=_sys.stdout)
            return
def db_log_callback(db_path, run_id, phase, msg):
    """Insert one log row for a run straight into SQLite.

    Used from worker threads where a Flask app context is unavailable.
    Retries on a locked database; after the retries are exhausted the entry is
    silently dropped (logging must never crash a run). Any other error is
    echoed to stdout and, as a last resort, appended to DATA_DIR/app.log.
    """
    import sys as _sys
    attempts_left = 5
    while attempts_left > 0:
        attempts_left -= 1
        try:
            with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
                conn.execute(
                    "INSERT INTO log_entry (run_id, timestamp, phase, message) VALUES (?, ?, ?, ?)",
                    (run_id, datetime.utcnow().isoformat(), phase, str(msg)),
                )
            return
        except sqlite3.OperationalError:
            # Locked database: short pause before the next attempt.
            time.sleep(0.1)
        except Exception as _e:
            print(f"[db_log_callback ERROR run={run_id}] {type(_e).__name__}: {_e}", flush=True, file=_sys.stdout)
            # Last-resort breadcrumb in a plain text file so the failure
            # is still discoverable after the fact.
            try:
                import os as _os
                from core import config as _cfg
                _app_log = _os.path.join(_cfg.DATA_DIR, "app.log")
                with open(_app_log, 'a', encoding='utf-8') as _f:
                    _f.write(f"[db_log_callback ERROR run={run_id}] {type(_e).__name__}: {_e}\n")
            except Exception:
                pass
            return
def db_progress_callback(db_path, run_id, percent):
    """Persist a run's progress percentage to SQLite, best-effort.

    Retries on a locked database; other errors are printed and the update is
    skipped — progress reporting must never take down the worker.
    """
    import sys as _sys
    for _attempt in range(5):
        try:
            with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
                conn.execute("UPDATE run SET progress = ? WHERE id = ?", (percent, run_id))
        except sqlite3.OperationalError:
            # Locked: pause briefly and retry.
            time.sleep(0.1)
            continue
        except Exception as exc:
            print(f"[db_progress_callback ERROR run={run_id}] {type(exc).__name__}: {exc}", flush=True, file=_sys.stdout)
        return
@huey.task()
def generate_book_task(run_id, project_path, bible_path, allow_copy=True, feedback=None, source_run_id=None, keep_cover=False, exclude_folders=None):
    """
    Background task to run the book generation.

    BUGFIX: `bible_data` is now initialized up-front. Previously it was only
    assigned inside the feedback branch, so `keep_cover=True` without feedback
    raised a NameError in the cover-preservation block and failed the run.

    Args:
        run_id: DB id of the `run` row this job belongs to.
        project_path: Root folder of the project on disk.
        bible_path: Path to the project's bible.json blueprint.
        allow_copy: If True, completed books from the latest previous series
            run are copied forward instead of regenerated.
        feedback: Optional user feedback used to refine the bible first.
        source_run_id: Prior run that feedback / cover preservation refers to.
        keep_cover: If True, copy cover art from the source run into this run.
        exclude_folders: Book_* folder names that must NOT be copied forward
            (targets for revision).

    Returns:
        dict with run_id, final status, total cost, and final log path — or
        None when the job aborts early (orphaned run / missing resources).
    """
    import sys as _sys
    def _task_log(msg):
        """Print directly to stdout (docker logs) regardless of utils state."""
        print(f"[TASK run={run_id}] {msg}", flush=True, file=_sys.stdout)
    _task_log(f"Task picked up by Huey worker. project_path={project_path}")
    # 0. Orphaned Job Guard — verify that all required resources exist before
    # doing any work. If a run, project folder, or bible is missing, terminate
    # silently and mark the run as failed to prevent data being written to the
    # wrong book or project.
    db_path_early = os.path.join(config.DATA_DIR, "bookapp.db")
    try:
        with sqlite3.connect(db_path_early, timeout=10) as _conn:
            _row = _conn.execute("SELECT id FROM run WHERE id = ?", (run_id,)).fetchone()
            if not _row:
                _task_log(f"ABORT: Run #{run_id} no longer exists in DB. Terminating silently.")
                return
    except Exception as _e:
        _task_log(f"WARNING: Could not verify run #{run_id} existence: {_e}")
    if not os.path.isdir(project_path):
        _task_log(f"ABORT: Project folder missing ({project_path}). Marking run #{run_id} as failed.")
        try:
            _robust_update_run_status(db_path_early, run_id, 'failed',
                                      end_time=datetime.utcnow().isoformat())
        except Exception: pass
        return
    if not os.path.isfile(bible_path):
        _task_log(f"ABORT: Bible file missing ({bible_path}). Marking run #{run_id} as failed.")
        try:
            _robust_update_run_status(db_path_early, run_id, 'failed',
                                      end_time=datetime.utcnow().isoformat())
        except Exception: pass
        return
    # Validate that the bible has at least one book entry
    try:
        with open(bible_path, 'r', encoding='utf-8') as _bf:
            _bible_check = json.load(_bf)
        if not _bible_check.get('books'):
            _task_log(f"ABORT: Bible has no books defined. Marking run #{run_id} as failed.")
            try:
                _robust_update_run_status(db_path_early, run_id, 'failed',
                                          end_time=datetime.utcnow().isoformat())
            except Exception: pass
            return
    except Exception as _e:
        _task_log(f"ABORT: Could not parse bible ({bible_path}): {_e}. Marking run #{run_id} as failed.")
        try:
            _robust_update_run_status(db_path_early, run_id, 'failed',
                                      end_time=datetime.utcnow().isoformat())
        except Exception: pass
        return
    # 1. Setup Logging
    log_filename = f"system_log_{run_id}.txt"
    # Log to project root initially until run folder is created by engine
    initial_log = os.path.join(project_path, log_filename)
    # Touch the file immediately so the UI has something to poll even if the
    # worker crashes before the first utils.log() call.
    try:
        with open(initial_log, 'a', encoding='utf-8') as _f:
            pass
        _task_log(f"Log file created: {initial_log}")
    except Exception as _e:
        _task_log(f"WARNING: Could not touch log file {initial_log}: {_e}")
    utils.set_log_file(initial_log)
    # Hook up Database Logging
    db_path = os.path.join(config.DATA_DIR, "bookapp.db")
    utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
    utils.set_progress_callback(lambda p: db_progress_callback(db_path, run_id, p))
    utils.set_heartbeat_callback(lambda: db_heartbeat_callback(db_path, run_id))
    # Set Status to Running (with start_time and initial heartbeat)
    try:
        _robust_update_run_status(db_path, run_id, 'running',
                                  start_time=datetime.utcnow().isoformat(),
                                  last_heartbeat=datetime.utcnow().isoformat())
        _task_log("Run status set to 'running' in DB.")
    except Exception as e:
        _task_log(f"WARNING: Could not set run status to 'running': {e}")
        utils.log("SYSTEM", f"WARNING: run status update failed (run {run_id}): {e}")
    utils.log("SYSTEM", f"Starting Job #{run_id}")
    status = "failed"  # Default to failed; overwritten to "completed" only on clean success
    total_cost = 0.0
    final_log_path = initial_log
    # BUGFIX: define bible_data unconditionally so the keep_cover branch below
    # never reads an unbound local when the feedback branch was skipped.
    bible_data = None
    try:
        # 1.1 Handle Feedback / Modification (Re-run logic)
        if feedback and source_run_id:
            utils.log("SYSTEM", f"Applying feedback to Run #{source_run_id}: '{feedback}'")
            bible_data = utils.load_json(bible_path)
            if bible_data:
                try:
                    ai_setup.init_models()
                    new_bible = bible_tracker.refine_bible(bible_data, feedback, project_path)
                    if new_bible:
                        bible_data = new_bible
                        with open(bible_path, 'w') as f: json.dump(bible_data, f, indent=2)
                        utils.log("SYSTEM", "Bible updated with feedback.")
                except Exception as e:
                    utils.log("ERROR", f"Failed to refine bible: {e}")
        # 1.2 Keep Cover Art Logic
        if keep_cover:
            source_run_dir = os.path.join(project_path, "runs", f"run_{source_run_id}")
            if os.path.exists(source_run_dir):
                utils.log("SYSTEM", "Attempting to preserve cover art...")
                current_run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
                if not os.path.exists(current_run_dir): os.makedirs(current_run_dir)
                # Map book number -> source Book_* folder from the prior run.
                source_books = {}
                for d in os.listdir(source_run_dir):
                    if d.startswith("Book_") and os.path.isdir(os.path.join(source_run_dir, d)):
                        parts = d.split('_')
                        if len(parts) > 1 and parts[1].isdigit():
                            source_books[int(parts[1])] = os.path.join(source_run_dir, d)
                # BUGFIX: load the bible on demand when feedback did not run.
                if bible_data is None:
                    bible_data = utils.load_json(bible_path)
                if bible_data and 'books' in bible_data:
                    for i, book in enumerate(bible_data['books']):
                        b_num = book.get('book_number', i+1)
                        if b_num in source_books:
                            src_folder = source_books[b_num]
                            safe_title = utils.sanitize_filename(book.get('title', f"Book_{b_num}"))
                            target_folder = os.path.join(current_run_dir, f"Book_{b_num}_{safe_title}")
                            os.makedirs(target_folder, exist_ok=True)
                            src_cover = os.path.join(src_folder, "cover.png")
                            if os.path.exists(src_cover):
                                shutil.copy2(src_cover, os.path.join(target_folder, "cover.png"))
                            if os.path.exists(os.path.join(src_folder, "cover_art.png")):
                                shutil.copy2(os.path.join(src_folder, "cover_art.png"), os.path.join(target_folder, "cover_art.png"))
                            utils.log("SYSTEM", f" -> Copied cover for Book {b_num}")
        # 1.5 Copy Forward Logic (Series Optimization)
        is_series = False
        if os.path.exists(bible_path):
            bible_data = utils.load_json(bible_path)
            if bible_data:
                is_series = bible_data.get('project_metadata', {}).get('is_series', False)
        runs_dir = os.path.join(project_path, "runs")
        if allow_copy and is_series and os.path.exists(runs_dir):
            all_runs = [d for d in os.listdir(runs_dir) if d.startswith("run_") and d != f"run_{run_id}"]
            all_runs.sort(key=lambda x: int(x.split('_')[1]) if x.split('_')[1].isdigit() else 0)
            if all_runs:
                latest_run_dir = os.path.join(runs_dir, all_runs[-1])
                current_run_dir = os.path.join(runs_dir, f"run_{run_id}")
                os.makedirs(current_run_dir, exist_ok=True)
                utils.log("SYSTEM", f"Checking previous run ({all_runs[-1]}) for completed books...")
                for item in os.listdir(latest_run_dir):
                    if item.startswith("Book_") and os.path.isdir(os.path.join(latest_run_dir, item)):
                        if exclude_folders and item in exclude_folders:
                            utils.log("SYSTEM", f" -> Skipping copy of {item} (Target for revision).")
                            continue
                        # Only copy books that finished (manuscript present).
                        if os.path.exists(os.path.join(latest_run_dir, item, "manuscript.json")):
                            src = os.path.join(latest_run_dir, item)
                            dst = os.path.join(current_run_dir, item)
                            try:
                                shutil.copytree(src, dst, dirs_exist_ok=True)
                                utils.log("SYSTEM", f" -> Copied {item} (Skipping generation).")
                            except Exception as e:
                                utils.log("SYSTEM", f" -> Failed to copy {item}: {e}")
        # 2. Save Bible Snapshot alongside this run
        run_dir_early = os.path.join(project_path, "runs", f"run_{run_id}")
        os.makedirs(run_dir_early, exist_ok=True)
        if os.path.exists(bible_path):
            snapshot_path = os.path.join(run_dir_early, "bible_snapshot.json")
            try:
                shutil.copy2(bible_path, snapshot_path)
                utils.log("SYSTEM", f"Bible snapshot saved to run folder.")
            except Exception as _e:
                utils.log("SYSTEM", f"WARNING: Could not save bible snapshot: {_e}")
        # 3. Run Generation
        from cli.engine import run_generation
        run_generation(bible_path, specific_run_id=run_id)
        utils.log("SYSTEM", "Job Complete.")
        utils.update_progress(100)
        status = "completed"
    except Exception as e:
        import traceback as _tb
        _task_log(f"ERROR: Job failed — {type(e).__name__}: {e}")
        _task_log(_tb.format_exc())
        utils.log("ERROR", f"Job Failed: {e}")
        # status remains "failed" (set before try block)
    finally:
        # 3. Calculate Cost & Cleanup — guaranteed to run even if worker crashes
        run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
        if os.path.exists(run_dir):
            final_log_path = os.path.join(run_dir, "web_console.log")
            if os.path.exists(initial_log):
                try:
                    os.rename(initial_log, final_log_path)
                except OSError:
                    # rename fails across filesystems; fall back to copy+delete.
                    shutil.copy2(initial_log, final_log_path)
                    os.remove(initial_log)
            for item in os.listdir(run_dir):
                item_path = os.path.join(run_dir, item)
                if os.path.isdir(item_path) and item.startswith("Book_"):
                    usage_path = os.path.join(item_path, "usage_log.json")
                    if os.path.exists(usage_path):
                        data = utils.load_json(usage_path)
                        total_cost += data.get('totals', {}).get('est_cost_usd', 0.0)
        # 4. Update Database with Final Status — run is never left in 'running' state
        try:
            _robust_update_run_status(db_path, run_id, status,
                                      cost=total_cost,
                                      end_time=datetime.utcnow().isoformat(),
                                      log_file=final_log_path,
                                      progress=100)
        except Exception as e:
            print(f"[CRITICAL run={run_id}] Final status update failed after all retries: {e}", flush=True)
    _task_log(f"Task finished. status={status} cost=${total_cost:.4f}")
    return {"run_id": run_id, "status": status, "cost": total_cost, "final_log": final_log_path}
@huey.task()
def regenerate_artifacts_task(run_id, project_path, feedback=None):
    """Regenerate cover art and export artifacts for an existing run.

    FIX: the log-file write guard used a bare `except:`, which also swallowed
    SystemExit/KeyboardInterrupt; narrowed to `except Exception`.

    Args:
        run_id: DB id of the run whose artifacts are regenerated.
        project_path: Root folder of the project on disk.
        feedback: Optional instruction forwarded to the cover generator.
    """
    db_path = os.path.join(config.DATA_DIR, "bookapp.db")
    run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
    log_file = os.path.join(run_dir, "web_console.log")
    if not os.path.exists(run_dir):
        # Run folder missing: fall back to a project-root log file.
        log_file = os.path.join(project_path, f"system_log_{run_id}.txt")
    try:
        with open(log_file, 'w', encoding='utf-8') as f:
            f.write(f"[{datetime.utcnow().strftime('%H:%M:%S')}] --- REGENERATION STARTED ---\n")
    except Exception:
        # Best-effort: a missing log file must not block regeneration.
        pass
    utils.set_log_file(log_file)
    utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
    # Clear old log rows so the UI shows only this regeneration's output.
    try:
        with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
            conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
    except Exception as _e:
        print(f"[WARN run={run_id}] Could not clear log_entry for regen: {_e}", flush=True)
    try:
        _robust_update_run_status(db_path, run_id, 'running',
                                  start_time=datetime.utcnow().isoformat(),
                                  last_heartbeat=datetime.utcnow().isoformat())
    except Exception as _e:
        print(f"[WARN run={run_id}] Could not set status to 'running' for regen: {_e}", flush=True)
    utils.log("SYSTEM", "Starting Artifact Regeneration...")
    # Regenerate artifacts for the first book folder in the run.
    book_dir = run_dir
    if os.path.exists(run_dir):
        subdirs = utils.get_sorted_book_folders(run_dir)
        if subdirs: book_dir = os.path.join(run_dir, subdirs[0])
    bible_path = os.path.join(project_path, "bible.json")
    if not os.path.exists(run_dir) or not os.path.exists(bible_path):
        utils.log("ERROR", "Run directory or Bible not found.")
        try:
            _robust_update_run_status(db_path, run_id, 'failed')
        except Exception as _e:
            print(f"[WARN run={run_id}] Could not set status to 'failed': {_e}", flush=True)
        return
    bible = utils.load_json(bible_path)
    final_bp_path = os.path.join(book_dir, "final_blueprint.json")
    ms_path = os.path.join(book_dir, "manuscript.json")
    if not os.path.exists(final_bp_path) or not os.path.exists(ms_path):
        utils.log("ERROR", f"Blueprint or Manuscript not found in {book_dir}")
        try:
            _robust_update_run_status(db_path, run_id, 'failed')
        except Exception as _e:
            print(f"[WARN run={run_id}] Could not set status to 'failed': {_e}", flush=True)
        return
    bp = utils.load_json(final_bp_path)
    ms = utils.load_json(ms_path)
    # Sync current project metadata into the blueprint before regenerating.
    meta = bible.get('project_metadata', {})
    if 'book_metadata' in bp:
        for k in ['author', 'genre', 'target_audience', 'style']:
            if k in meta:
                bp['book_metadata'][k] = meta[k]
        if bp.get('series_metadata', {}).get('is_series'):
            bp['series_metadata']['series_title'] = meta.get('title', bp['series_metadata'].get('series_title'))
            b_num = bp['series_metadata'].get('book_number')
            for b in bible.get('books', []):
                if b.get('book_number') == b_num:
                    bp['book_metadata']['title'] = b.get('title', bp['book_metadata'].get('title'))
                    break
        else:
            bp['book_metadata']['title'] = meta.get('title', bp['book_metadata'].get('title'))
    with open(final_bp_path, 'w') as f: json.dump(bp, f, indent=2)
    try:
        ai_setup.init_models()
        tracking = None
        events_path = os.path.join(book_dir, "tracking_events.json")
        if os.path.exists(events_path):
            tracking = {"events": utils.load_json(events_path), "characters": utils.load_json(os.path.join(book_dir, "tracking_characters.json"))}
        marketing_cover.generate_cover(bp, book_dir, tracking, feedback=feedback)
        exporter.compile_files(bp, ms, book_dir)
        utils.log("SYSTEM", "Regeneration Complete.")
        final_status = 'completed'
    except Exception as e:
        utils.log("ERROR", f"Regeneration Failed: {e}")
        final_status = 'failed'
    try:
        _robust_update_run_status(db_path, run_id, final_status)
    except Exception as _e:
        print(f"[CRITICAL run={run_id}] Final regen status update failed: {_e}", flush=True)
@huey.task()
def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instruction):
    """
    Background task to rewrite a single chapter and propagate changes.

    FIXES:
    - The missing-files early return previously left the run stuck in
      'running'; it now marks the run 'failed'.
    - Bare `except:` on the log-file truncation narrowed to `except Exception`.
    - The duplicated 'completed' status update was deduplicated.

    Returns:
        True if the chapter was rewritten, False otherwise.
    """
    db_path = os.path.join(config.DATA_DIR, "bookapp.db")
    try:
        run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
        log_file = os.path.join(run_dir, "web_console.log")
        if not os.path.exists(log_file):
            log_file = os.path.join(project_path, f"system_log_{run_id}.txt")
        # Truncate the log so the UI only shows this rewrite's output.
        try:
            with open(log_file, 'w', encoding='utf-8') as f: f.write("")
        except Exception: pass
        utils.set_log_file(log_file)
        utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
        try:
            with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
                conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
        except Exception as _e:
            print(f"[WARN run={run_id}] Could not clear log_entry for rewrite: {_e}", flush=True)
        try:
            _robust_update_run_status(db_path, run_id, 'running',
                                      start_time=datetime.utcnow().isoformat(),
                                      last_heartbeat=datetime.utcnow().isoformat())
        except Exception as _e:
            print(f"[WARN run={run_id}] Could not set status to 'running' for rewrite: {_e}", flush=True)
        book_path = os.path.join(run_dir, book_folder)
        ms_path = os.path.join(book_path, "manuscript.json")
        bp_path = os.path.join(book_path, "final_blueprint.json")
        if not (os.path.exists(ms_path) and os.path.exists(bp_path)):
            utils.log("ERROR", f"Rewrite failed: files not found for run {run_id}/{book_folder}")
            # FIX: mark the run failed instead of leaving it in 'running'.
            try:
                _robust_update_run_status(db_path, run_id, 'failed',
                                          end_time=datetime.utcnow().isoformat())
            except Exception as _e:
                print(f"[WARN run={run_id}] Could not set status to 'failed': {_e}", flush=True)
            return False
        ms = utils.load_json(ms_path)
        bp = utils.load_json(bp_path)
        ai_setup.init_models()
        from story import editor as story_editor
        result = story_editor.rewrite_chapter_content(bp, ms, chap_num, instruction, book_path)
        rewritten = bool(result and result[0])
        if rewritten:
            new_text, summary = result
            # Splice the rewritten text into the matching chapter.
            for ch in ms:
                if str(ch.get('num')) == str(chap_num):
                    ch['content'] = new_text
                    break
            with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)
            # Propagate continuity changes to later chapters, then re-export.
            updated_ms = story_editor.check_and_propagate(bp, ms, chap_num, book_path, change_summary=summary)
            if updated_ms:
                ms = updated_ms
                with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)
            exporter.compile_files(bp, ms, book_path)
        # Single completion update for both outcomes (previously duplicated).
        try:
            _robust_update_run_status(db_path, run_id, 'completed',
                                      end_time=datetime.utcnow().isoformat())
        except Exception as _e:
            print(f"[WARN run={run_id}] Could not set status to 'completed': {_e}", flush=True)
        return rewritten
    except Exception as e:
        utils.log("ERROR", f"Rewrite task exception for run {run_id}/{book_folder}: {e}")
        try:
            _robust_update_run_status(db_path, run_id, 'failed',
                                      end_time=datetime.utcnow().isoformat())
        except Exception as _e:
            print(f"[CRITICAL run={run_id}] Could not set status to 'failed' after rewrite error: {_e}", flush=True)
        return False
@huey.task()
def refine_bible_task(project_path, instruction, source_type, selected_keys=None):
    """
    Background task to refine the Bible.
    Handles partial merging of selected keys into a temp base before refinement.

    FIX: paths are now computed before the try block — previously `lock_path`
    was assigned inside `try`, so any failure before that assignment raised a
    NameError in the `finally` clause, masking the original exception.

    Returns:
        True if a refined draft was written, False otherwise.
    """
    bible_path = os.path.join(project_path, "bible.json")
    draft_path = os.path.join(project_path, "bible_draft.json")
    lock_path = os.path.join(project_path, ".refining")
    try:
        # Lock file signals the UI that a refinement is in flight.
        with open(lock_path, 'w') as f: f.write("running")
        base_bible = utils.load_json(bible_path)
        if not base_bible: return False
        if source_type == 'draft' and os.path.exists(draft_path):
            draft_bible = utils.load_json(draft_path)
            if selected_keys is not None and draft_bible:
                # Merge only the user-selected keys from the draft into the base.
                base_bible = bible_tracker.merge_selected_changes(base_bible, draft_bible, selected_keys)
            elif draft_bible:
                base_bible = draft_bible
        ai_setup.init_models()
        new_bible = bible_tracker.refine_bible(base_bible, instruction, project_path)
        if new_bible:
            with open(draft_path, 'w') as f: json.dump(new_bible, f, indent=2)
            return True
        return False
    except Exception as e:
        utils.log("ERROR", f"Bible refinement task failed: {e}")
        return False
    finally:
        # Always release the lock, even on failure.
        if os.path.exists(lock_path): os.remove(lock_path)