Files
bookapp/web/tasks.py
Mike Wichers 87f24d2bd8 Auto-commit: v2.11 — Fix live UI log feed (db_log_callback + run_status)
- web/tasks.py: db_log_callback bare `except: break` replaced with
  explicit `except Exception as _e: print(...)` so insertion failures
  are visible in Docker logs. Also fixed datetime.utcnow() → .isoformat()
  for clean string storage in SQLite.
  Same fix applied to db_progress_callback.

- web/routes/run.py (run_status): added db.session.expire_all() to
  force fresh reads; raw sqlite3 bypass query when ORM returns no rows;
  file fallback wrapped in try/except with stdout error reporting;
  secondary check for web_console.log inside the run directory;
  utf-8 encoding on all file opens.

- ai_blueprint.md: bumped to v2.11, documented root causes and fixes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 15:28:27 -05:00

443 lines
18 KiB
Python

import os
import json
import time
import sqlite3
import shutil
from datetime import datetime
from huey import SqliteHuey
from web.db import db, Run, User, Project
from core import utils, config
from ai import models as ai_models
from ai import setup as ai_setup
from story import bible_tracker
from marketing import cover as marketing_cover
from export import exporter
# Configure Huey (Task Queue): a SQLite-backed queue named 'bookapp_queue'
# whose storage file is queue.db inside config.DATA_DIR. The @huey.task()
# decorators below register their functions on this instance.
huey = SqliteHuey('bookapp_queue', filename=os.path.join(config.DATA_DIR, 'queue.db'))
def db_log_callback(db_path, run_id, phase, msg):
    """Write one log entry directly to SQLite, bypassing the Flask app context.

    A raw ``sqlite3`` connection is used (instead of the ORM session) so the
    callback can be invoked from worker threads that have no Flask context.

    Args:
        db_path: Filesystem path of the SQLite database file.
        run_id: Value stored in ``log_entry.run_id``.
        phase: Value stored in ``log_entry.phase``.
        msg: Message to store; coerced with ``str()``.

    Returns:
        None. The function never raises for ``Exception`` subclasses:
        ``sqlite3.OperationalError`` is retried up to five times, any other
        error is reported on stdout immediately, and exhausted retries are
        reported on stdout as well.
    """
    import sys as _sys
    from contextlib import closing
    from datetime import timezone as _timezone

    max_attempts = 5
    last_error = None
    for attempt in range(max_attempts):
        try:
            # The sqlite3 connection context manager only commits / rolls back;
            # closing() is what actually releases the connection handle.
            with closing(sqlite3.connect(db_path, timeout=30, check_same_thread=False)) as conn:
                with conn:
                    # Naive-UTC ISO-8601 text, identical in format to the
                    # previous utcnow().isoformat() value.
                    stamp = datetime.now(_timezone.utc).replace(tzinfo=None).isoformat()
                    conn.execute(
                        "INSERT INTO log_entry (run_id, timestamp, phase, message) VALUES (?, ?, ?, ?)",
                        (run_id, stamp, phase, str(msg)),
                    )
            return
        except sqlite3.OperationalError as _e:
            # Typically a locked database: remember the error and retry.
            last_error = _e
            if attempt < max_attempts - 1:
                time.sleep(0.1)
        except Exception as _e:
            print(f"[db_log_callback ERROR run={run_id}] {type(_e).__name__}: {_e}", flush=True, file=_sys.stdout)
            return

    # Every attempt hit OperationalError; make the dropped entry visible.
    print(
        f"[db_log_callback ERROR run={run_id}] giving up after {max_attempts} attempts: "
        f"{type(last_error).__name__}: {last_error}",
        flush=True,
        file=_sys.stdout,
    )
def db_progress_callback(db_path, run_id, percent):
    """Update a run's progress value directly in SQLite.

    Args:
        db_path: Filesystem path of the SQLite database file.
        run_id: ``run.id`` of the row to update.
        percent: Value written to ``run.progress``.

    Returns:
        None. The function never raises for ``Exception`` subclasses:
        ``sqlite3.OperationalError`` is retried up to five times, any other
        error is reported on stdout immediately, and exhausted retries are
        reported on stdout as well.
    """
    import sys as _sys
    from contextlib import closing

    max_attempts = 5
    last_error = None
    for attempt in range(max_attempts):
        try:
            # The sqlite3 connection context manager only commits / rolls back;
            # closing() is what actually releases the connection handle.
            with closing(sqlite3.connect(db_path, timeout=30, check_same_thread=False)) as conn:
                with conn:
                    conn.execute("UPDATE run SET progress = ? WHERE id = ?", (percent, run_id))
            return
        except sqlite3.OperationalError as _e:
            # Typically a locked database: remember the error and retry.
            last_error = _e
            if attempt < max_attempts - 1:
                time.sleep(0.1)
        except Exception as _e:
            print(f"[db_progress_callback ERROR run={run_id}] {type(_e).__name__}: {_e}", flush=True, file=_sys.stdout)
            return

    # Every attempt hit OperationalError; make the lost update visible.
    print(
        f"[db_progress_callback ERROR run={run_id}] giving up after {max_attempts} attempts: "
        f"{type(last_error).__name__}: {last_error}",
        flush=True,
        file=_sys.stdout,
    )
@huey.task()
def generate_book_task(run_id, project_path, bible_path, allow_copy=True, feedback=None, source_run_id=None, keep_cover=False, exclude_folders=None):
"""
Background task to run the book generation.

Visible flow: create a per-run log file and route utils logging/progress to
it and to the SQLite callbacks; mark the run 'running'; optionally refine the
bible from feedback; optionally copy cover art and finished books from
earlier runs; call cli.engine.run_generation; then sum est_cost_usd from each
Book_*/usage_log.json and write status, cost, end_time and log_file to the
run row.

Args:
    run_id: Used in the log file name, the run folder name (run_<run_id>)
        and as run.id in the SQL statements.
    project_path: Directory that holds runs/ and where the initial log
        file is created.
    bible_path: Path of the bible JSON; read with utils.load_json and
        passed to run_generation.
    allow_copy: Together with the bible's is_series flag, enables copying
        Book_* folders that contain manuscript.json from the
        highest-numbered other run.
    feedback: When truthy together with source_run_id, the bible is refined
        via bible_tracker.refine_bible and written back to bible_path.
    source_run_id: Run whose folder is the source for preserved cover art.
    keep_cover: Copy cover.png / cover_art.png from the source run.
    exclude_folders: Book_* folder names skipped by the copy-forward step.

Returns:
    dict with keys "run_id", "status" ("completed" or "failed"), "cost"
    and "final_log".
"""
import sys as _sys
def _task_log(msg):
"""Print directly to stdout (docker logs) regardless of utils state."""
print(f"[TASK run={run_id}] {msg}", flush=True, file=_sys.stdout)
_task_log(f"Task picked up by Huey worker. project_path={project_path}")
# 1. Setup Logging
log_filename = f"system_log_{run_id}.txt"
# Log to project root initially until run folder is created by engine
initial_log = os.path.join(project_path, log_filename)
# Touch the file immediately so the UI has something to poll even if the
# worker crashes before the first utils.log() call.
try:
with open(initial_log, 'a', encoding='utf-8') as _f:
pass
_task_log(f"Log file created: {initial_log}")
except Exception as _e:
_task_log(f"WARNING: Could not touch log file {initial_log}: {_e}")
utils.set_log_file(initial_log)
# Hook up Database Logging
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
# The lambdas bind db_path and run_id so utils can call them with only
# (phase, message) and (percent).
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
utils.set_progress_callback(lambda p: db_progress_callback(db_path, run_id, p))
# Set Status to Running
# Failure here is only reported; the task continues either way.
try:
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
_task_log("Run status set to 'running' in DB.")
except sqlite3.OperationalError as e:
_task_log(f"WARNING: DB locked when setting run status: {e}")
utils.log("SYSTEM", f"Database locked when setting run status (run {run_id}): {e}")
except Exception as e:
_task_log(f"WARNING: Could not set run status: {e}")
utils.log("SYSTEM", f"Starting Job #{run_id}")
try:
# 1.1 Handle Feedback / Modification (Re-run logic)
if feedback and source_run_id:
utils.log("SYSTEM", f"Applying feedback to Run #{source_run_id}: '{feedback}'")
bible_data = utils.load_json(bible_path)
if bible_data:
# A refinement failure is logged and the job carries on.
try:
ai_setup.init_models()
new_bible = bible_tracker.refine_bible(bible_data, feedback, project_path)
if new_bible:
bible_data = new_bible
with open(bible_path, 'w') as f: json.dump(bible_data, f, indent=2)
utils.log("SYSTEM", "Bible updated with feedback.")
except Exception as e:
utils.log("ERROR", f"Failed to refine bible: {e}")
# 1.2 Keep Cover Art Logic
# NOTE(review): indentation is not visible in this view, so it cannot be
# confirmed whether this step is nested under the feedback branch above.
# Before this point bible_data is only assigned at the load_json call in
# step 1.1 -- confirm it is always bound when keep_cover is set.
if keep_cover:
source_run_dir = os.path.join(project_path, "runs", f"run_{source_run_id}")
if os.path.exists(source_run_dir):
utils.log("SYSTEM", "Attempting to preserve cover art...")
current_run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
if not os.path.exists(current_run_dir): os.makedirs(current_run_dir)
# Map book number -> source folder, parsed from "Book_<n>..." folder names.
source_books = {}
for d in os.listdir(source_run_dir):
if d.startswith("Book_") and os.path.isdir(os.path.join(source_run_dir, d)):
parts = d.split('_')
if len(parts) > 1 and parts[1].isdigit():
source_books[int(parts[1])] = os.path.join(source_run_dir, d)
if bible_data and 'books' in bible_data:
for i, book in enumerate(bible_data['books']):
# Fall back to the 1-based list position when book_number is absent.
b_num = book.get('book_number', i+1)
if b_num in source_books:
src_folder = source_books[b_num]
safe_title = utils.sanitize_filename(book.get('title', f"Book_{b_num}"))
target_folder = os.path.join(current_run_dir, f"Book_{b_num}_{safe_title}")
os.makedirs(target_folder, exist_ok=True)
src_cover = os.path.join(src_folder, "cover.png")
if os.path.exists(src_cover):
shutil.copy2(src_cover, os.path.join(target_folder, "cover.png"))
if os.path.exists(os.path.join(src_folder, "cover_art.png")):
shutil.copy2(os.path.join(src_folder, "cover_art.png"), os.path.join(target_folder, "cover_art.png"))
utils.log("SYSTEM", f" -> Copied cover for Book {b_num}")
# 1.5 Copy Forward Logic (Series Optimization)
is_series = False
if os.path.exists(bible_path):
bible_data = utils.load_json(bible_path)
if bible_data:
is_series = bible_data.get('project_metadata', {}).get('is_series', False)
runs_dir = os.path.join(project_path, "runs")
if allow_copy and is_series and os.path.exists(runs_dir):
# Every other run folder, ordered by numeric suffix (non-numeric sorts as 0).
all_runs = [d for d in os.listdir(runs_dir) if d.startswith("run_") and d != f"run_{run_id}"]
all_runs.sort(key=lambda x: int(x.split('_')[1]) if x.split('_')[1].isdigit() else 0)
if all_runs:
latest_run_dir = os.path.join(runs_dir, all_runs[-1])
current_run_dir = os.path.join(runs_dir, f"run_{run_id}")
os.makedirs(current_run_dir, exist_ok=True)
utils.log("SYSTEM", f"Checking previous run ({all_runs[-1]}) for completed books...")
for item in os.listdir(latest_run_dir):
if item.startswith("Book_") and os.path.isdir(os.path.join(latest_run_dir, item)):
if exclude_folders and item in exclude_folders:
utils.log("SYSTEM", f" -> Skipping copy of {item} (Target for revision).")
continue
# The presence of manuscript.json is the test for copying a book folder.
if os.path.exists(os.path.join(latest_run_dir, item, "manuscript.json")):
src = os.path.join(latest_run_dir, item)
dst = os.path.join(current_run_dir, item)
try:
shutil.copytree(src, dst, dirs_exist_ok=True)
utils.log("SYSTEM", f" -> Copied {item} (Skipping generation).")
except Exception as e:
utils.log("SYSTEM", f" -> Failed to copy {item}: {e}")
# 2. Run Generation
# Imported here rather than at module top.
from cli.engine import run_generation
run_generation(bible_path, specific_run_id=run_id)
utils.log("SYSTEM", "Job Complete.")
utils.update_progress(100)
status = "completed"
except Exception as e:
import traceback as _tb
_task_log(f"ERROR: Job failed — {type(e).__name__}: {e}")
_task_log(_tb.format_exc())
utils.log("ERROR", f"Job Failed: {e}")
status = "failed"
# 3. Calculate Cost & Cleanup
# NOTE(review): this section appears to sit outside the try/except above,
# so an exception here would skip the final status update in step 4 -- confirm.
run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
total_cost = 0.0
final_log_path = initial_log
if os.path.exists(run_dir):
final_log_path = os.path.join(run_dir, "web_console.log")
if os.path.exists(initial_log):
# Move the log into the run folder; fall back to copy + delete when
# rename raises OSError.
try:
os.rename(initial_log, final_log_path)
except OSError:
shutil.copy2(initial_log, final_log_path)
os.remove(initial_log)
for item in os.listdir(run_dir):
item_path = os.path.join(run_dir, item)
if os.path.isdir(item_path) and item.startswith("Book_"):
usage_path = os.path.join(item_path, "usage_log.json")
if os.path.exists(usage_path):
# NOTE(review): other load_json results in this function are truth-tested
# before use; here the result is used directly -- confirm load_json cannot
# return None for an unreadable file.
data = utils.load_json(usage_path)
total_cost += data.get('totals', {}).get('est_cost_usd', 0.0)
# 4. Update Database with Final Status
# progress is forced to 100 here regardless of status.
try:
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
conn.execute("UPDATE run SET status = ?, cost = ?, end_time = ?, log_file = ?, progress = 100 WHERE id = ?",
(status, total_cost, datetime.utcnow(), final_log_path, run_id))
except Exception as e:
print(f"Failed to update run status in DB: {e}")
_task_log(f"Task finished. status={status} cost=${total_cost:.4f}")
return {"run_id": run_id, "status": status, "cost": total_cost, "final_log": final_log_path}
@huey.task()
def regenerate_artifacts_task(run_id, project_path, feedback=None):
"""
Background task to regenerate the cover and compiled export files of a run.

Works on the first folder returned by utils.get_sorted_book_folders for the
run directory (or the run directory itself when none is returned). Copies
selected metadata from bible.json into final_blueprint.json, then calls
marketing_cover.generate_cover and exporter.compile_files, and records
'completed' or 'failed' on the run row.

Args:
    run_id: Used for the run folder name (run_<run_id>) and as run.id.
    project_path: Directory containing runs/ and bible.json.
    feedback: Passed through to marketing_cover.generate_cover.

Returns:
    None on every path.
"""
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
log_file = os.path.join(run_dir, "web_console.log")
# Fall back to a project-level log when the run folder is missing.
if not os.path.exists(run_dir):
log_file = os.path.join(project_path, f"system_log_{run_id}.txt")
# Mode 'w' truncates any existing log before the header line is written.
try:
with open(log_file, 'w', encoding='utf-8') as f:
f.write(f"[{datetime.utcnow().strftime('%H:%M:%S')}] --- REGENERATION STARTED ---\n")
except: pass
utils.set_log_file(log_file)
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
# Clear this run's previous DB log rows and mark it running; errors are ignored.
try:
with sqlite3.connect(db_path) as conn:
conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
except: pass
utils.log("SYSTEM", "Starting Artifact Regeneration...")
book_dir = run_dir
if os.path.exists(run_dir):
subdirs = utils.get_sorted_book_folders(run_dir)
if subdirs: book_dir = os.path.join(run_dir, subdirs[0])
bible_path = os.path.join(project_path, "bible.json")
if not os.path.exists(run_dir) or not os.path.exists(bible_path):
utils.log("ERROR", "Run directory or Bible not found.")
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
except: pass
return
bible = utils.load_json(bible_path)
final_bp_path = os.path.join(book_dir, "final_blueprint.json")
ms_path = os.path.join(book_dir, "manuscript.json")
if not os.path.exists(final_bp_path) or not os.path.exists(ms_path):
utils.log("ERROR", f"Blueprint or Manuscript not found in {book_dir}")
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
except: pass
return
bp = utils.load_json(final_bp_path)
ms = utils.load_json(ms_path)
# NOTE(review): bible, bp and ms are used without a None/empty check --
# confirm what load_json returns for an unreadable file.
meta = bible.get('project_metadata', {})
# Copy selected project metadata from the bible into the blueprint.
if 'book_metadata' in bp:
for k in ['author', 'genre', 'target_audience', 'style']:
if k in meta:
bp['book_metadata'][k] = meta[k]
if bp.get('series_metadata', {}).get('is_series'):
bp['series_metadata']['series_title'] = meta.get('title', bp['series_metadata'].get('series_title'))
b_num = bp['series_metadata'].get('book_number')
# Take the book title from the bible entry with the same book_number.
for b in bible.get('books', []):
if b.get('book_number') == b_num:
bp['book_metadata']['title'] = b.get('title', bp['book_metadata'].get('title'))
break
# NOTE(review): with indentation lost, this else could pair with the
# is_series test or with the for loop above -- confirm against the repo.
else:
bp['book_metadata']['title'] = meta.get('title', bp['book_metadata'].get('title'))
with open(final_bp_path, 'w') as f: json.dump(bp, f, indent=2)
try:
ai_setup.init_models()
tracking = None
events_path = os.path.join(book_dir, "tracking_events.json")
# Tracking data is loaded only when tracking_events.json exists.
if os.path.exists(events_path):
tracking = {"events": utils.load_json(events_path), "characters": utils.load_json(os.path.join(book_dir, "tracking_characters.json"))}
marketing_cover.generate_cover(bp, book_dir, tracking, feedback=feedback)
exporter.compile_files(bp, ms, book_dir)
utils.log("SYSTEM", "Regeneration Complete.")
final_status = 'completed'
except Exception as e:
utils.log("ERROR", f"Regeneration Failed: {e}")
final_status = 'failed'
# Record the outcome; a DB error here is silently ignored.
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = ? WHERE id = ?", (final_status, run_id))
except: pass
@huey.task()
def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instruction):
"""
Background task to rewrite a single chapter and propagate changes.

Loads manuscript.json and final_blueprint.json from the given book folder of
the run, asks story.editor.rewrite_chapter_content for new chapter text,
stores it, lets story.editor.check_and_propagate update the manuscript, and
calls exporter.compile_files.

Args:
    run_id: Used for the run folder name (run_<run_id>) and as run.id.
    project_path: Directory containing runs/.
    book_folder: Name of the book folder inside the run directory.
    chap_num: Chapter identifier; compared to each chapter's 'num' as strings.
    instruction: Rewrite instruction passed to rewrite_chapter_content.

Returns:
    True when the rewrite returned a result whose first element is truthy;
    False when the input files are missing, the rewrite returned nothing, or
    an exception was caught.
"""
try:
run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
log_file = os.path.join(run_dir, "web_console.log")
if not os.path.exists(log_file):
log_file = os.path.join(project_path, f"system_log_{run_id}.txt")
# NOTE(review): indentation is not visible here, so it is unclear whether the
# empty write below applies only to the fallback file or truncates whichever
# log file was chosen -- confirm.
try:
with open(log_file, 'w', encoding='utf-8') as f: f.write("")
except: pass
utils.set_log_file(log_file)
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
# Clear this run's previous DB log rows and mark it running; errors are ignored.
try:
with sqlite3.connect(db_path) as conn:
conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
except: pass
book_path = os.path.join(run_dir, book_folder)
ms_path = os.path.join(book_path, "manuscript.json")
bp_path = os.path.join(book_path, "final_blueprint.json")
if not (os.path.exists(ms_path) and os.path.exists(bp_path)):
utils.log("ERROR", f"Rewrite failed: files not found for run {run_id}/{book_folder}")
# NOTE(review): the status was set to 'running' above and this early return
# does not change it, so the run row stays 'running' -- confirm intended.
return False
ms = utils.load_json(ms_path)
bp = utils.load_json(bp_path)
ai_setup.init_models()
from story import editor as story_editor
result = story_editor.rewrite_chapter_content(bp, ms, chap_num, instruction, book_path)
if result and result[0]:
new_text, summary = result
# Replace the content of the first chapter whose 'num' matches.
for ch in ms:
if str(ch.get('num')) == str(chap_num):
ch['content'] = new_text
break
# Persist the rewritten chapter before running propagation.
with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)
updated_ms = story_editor.check_and_propagate(bp, ms, chap_num, book_path, change_summary=summary)
if updated_ms:
ms = updated_ms
with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)
exporter.compile_files(bp, ms, book_path)
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
except: pass
return True
# No usable rewrite result: the status is still set to 'completed' while
# the task returns False.
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
except: pass
return False
except Exception as e:
utils.log("ERROR", f"Rewrite task exception for run {run_id}/{book_folder}: {e}")
# NOTE(review): db_path is assigned inside the try above; if the exception was
# raised before that assignment, the NameError here is swallowed by the bare
# except and the status is never set to 'failed'.
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
except: pass
return False
@huey.task()
def refine_bible_task(project_path, instruction, source_type, selected_keys=None):
    """
    Background task to refine the Bible.

    Handles partial merging of selected keys into a temp base before refinement.
    While the task runs, a ``.refining`` marker file exists in the project
    directory; it is removed again on every exit path.

    Args:
        project_path: Directory containing bible.json (and bible_draft.json).
        instruction: Refinement instruction passed to bible_tracker.refine_bible.
        source_type: When equal to 'draft' and a draft file exists, the draft
            is used as (or merged into) the base instead of bible.json alone.
        selected_keys: When not None, only these changes are merged from the
            draft via bible_tracker.merge_selected_changes; when None the
            whole draft replaces the base.

    Returns:
        True when a refined bible was written to bible_draft.json, False when
        the base bible could not be loaded, refinement produced nothing, or an
        exception was caught.
    """
    # Bound before the try so the finally clause can never hit a NameError
    # that would mask the already-handled failure.
    lock_path = None
    try:
        bible_path = os.path.join(project_path, "bible.json")
        draft_path = os.path.join(project_path, "bible_draft.json")
        lock_path = os.path.join(project_path, ".refining")

        with open(lock_path, 'w', encoding='utf-8') as f:
            f.write("running")

        base_bible = utils.load_json(bible_path)
        if not base_bible:
            return False

        if source_type == 'draft' and os.path.exists(draft_path):
            draft_bible = utils.load_json(draft_path)
            if selected_keys is not None and draft_bible:
                base_bible = bible_tracker.merge_selected_changes(base_bible, draft_bible, selected_keys)
            elif draft_bible:
                base_bible = draft_bible

        ai_setup.init_models()
        new_bible = bible_tracker.refine_bible(base_bible, instruction, project_path)
        if not new_bible:
            return False

        with open(draft_path, 'w', encoding='utf-8') as f:
            json.dump(new_bible, f, indent=2)
        return True
    except Exception as e:
        utils.log("ERROR", f"Bible refinement task failed: {e}")
        return False
    finally:
        if lock_path is not None:
            # Remove directly instead of exists()+remove() to avoid a race
            # with anything else deleting the marker.
            try:
                os.remove(lock_path)
            except FileNotFoundError:
                pass