Files
bookapp/web/tasks.py
Mike Wichers 0d4b9b761b Auto-commit: v2.10 — Docker diagnostic logging for consumer & task execution
- web/app.py: Startup banner to docker logs (Python version, platform,
  Huey version, DB paths). All print() calls now flush=True so Docker
  captures them immediately. Emoji-free for robust stdout encoding.
  Startup now detects orphaned queued runs (queue empty but DB queued)
  and resets them to 'failed' so the UI does not stay stuck on reload.
  Huey logging configured at INFO level so task pick-up/completion
  appears in `docker logs`. Consumer skip reason logged explicitly.
- web/tasks.py: generate_book_task now emits [TASK run=N] lines to
  stdout (docker logs) at pick-up, log-file creation, DB status update,
  and on error (with full traceback) so failures are always visible.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-21 12:05:07 -05:00

436 lines
18 KiB
Python

import os
import json
import time
import sqlite3
import shutil
from datetime import datetime
from huey import SqliteHuey
from web.db import db, Run, User, Project
from core import utils, config
from ai import models as ai_models
from ai import setup as ai_setup
from story import bible_tracker
from marketing import cover as marketing_cover
from export import exporter
# Configure Huey (Task Queue)
huey = SqliteHuey('bookapp_queue', filename=os.path.join(config.DATA_DIR, 'queue.db'))
def db_log_callback(db_path, run_id, phase, msg):
"""Writes log entry directly to SQLite to avoid Flask Context issues in threads."""
for _ in range(5):
try:
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
conn.execute("INSERT INTO log_entry (run_id, timestamp, phase, message) VALUES (?, ?, ?, ?)",
(run_id, datetime.utcnow(), phase, str(msg)))
break
except sqlite3.OperationalError:
time.sleep(0.1)
except: break
def db_progress_callback(db_path, run_id, percent):
"""Updates run progress in SQLite."""
for _ in range(5):
try:
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
conn.execute("UPDATE run SET progress = ? WHERE id = ?", (percent, run_id))
break
except sqlite3.OperationalError: time.sleep(0.1)
except: break
@huey.task()
def generate_book_task(run_id, project_path, bible_path, allow_copy=True, feedback=None, source_run_id=None, keep_cover=False, exclude_folders=None):
"""
Background task to run the book generation.
"""
import sys as _sys
def _task_log(msg):
"""Print directly to stdout (docker logs) regardless of utils state."""
print(f"[TASK run={run_id}] {msg}", flush=True, file=_sys.stdout)
_task_log(f"Task picked up by Huey worker. project_path={project_path}")
# 1. Setup Logging
log_filename = f"system_log_{run_id}.txt"
# Log to project root initially until run folder is created by engine
initial_log = os.path.join(project_path, log_filename)
# Touch the file immediately so the UI has something to poll even if the
# worker crashes before the first utils.log() call.
try:
with open(initial_log, 'a', encoding='utf-8') as _f:
pass
_task_log(f"Log file created: {initial_log}")
except Exception as _e:
_task_log(f"WARNING: Could not touch log file {initial_log}: {_e}")
utils.set_log_file(initial_log)
# Hook up Database Logging
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
utils.set_progress_callback(lambda p: db_progress_callback(db_path, run_id, p))
# Set Status to Running
try:
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
_task_log("Run status set to 'running' in DB.")
except sqlite3.OperationalError as e:
_task_log(f"WARNING: DB locked when setting run status: {e}")
utils.log("SYSTEM", f"Database locked when setting run status (run {run_id}): {e}")
except Exception as e:
_task_log(f"WARNING: Could not set run status: {e}")
utils.log("SYSTEM", f"Starting Job #{run_id}")
try:
# 1.1 Handle Feedback / Modification (Re-run logic)
if feedback and source_run_id:
utils.log("SYSTEM", f"Applying feedback to Run #{source_run_id}: '{feedback}'")
bible_data = utils.load_json(bible_path)
if bible_data:
try:
ai_setup.init_models()
new_bible = bible_tracker.refine_bible(bible_data, feedback, project_path)
if new_bible:
bible_data = new_bible
with open(bible_path, 'w') as f: json.dump(bible_data, f, indent=2)
utils.log("SYSTEM", "Bible updated with feedback.")
except Exception as e:
utils.log("ERROR", f"Failed to refine bible: {e}")
# 1.2 Keep Cover Art Logic
if keep_cover:
source_run_dir = os.path.join(project_path, "runs", f"run_{source_run_id}")
if os.path.exists(source_run_dir):
utils.log("SYSTEM", "Attempting to preserve cover art...")
current_run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
if not os.path.exists(current_run_dir): os.makedirs(current_run_dir)
source_books = {}
for d in os.listdir(source_run_dir):
if d.startswith("Book_") and os.path.isdir(os.path.join(source_run_dir, d)):
parts = d.split('_')
if len(parts) > 1 and parts[1].isdigit():
source_books[int(parts[1])] = os.path.join(source_run_dir, d)
if bible_data and 'books' in bible_data:
for i, book in enumerate(bible_data['books']):
b_num = book.get('book_number', i+1)
if b_num in source_books:
src_folder = source_books[b_num]
safe_title = utils.sanitize_filename(book.get('title', f"Book_{b_num}"))
target_folder = os.path.join(current_run_dir, f"Book_{b_num}_{safe_title}")
os.makedirs(target_folder, exist_ok=True)
src_cover = os.path.join(src_folder, "cover.png")
if os.path.exists(src_cover):
shutil.copy2(src_cover, os.path.join(target_folder, "cover.png"))
if os.path.exists(os.path.join(src_folder, "cover_art.png")):
shutil.copy2(os.path.join(src_folder, "cover_art.png"), os.path.join(target_folder, "cover_art.png"))
utils.log("SYSTEM", f" -> Copied cover for Book {b_num}")
# 1.5 Copy Forward Logic (Series Optimization)
is_series = False
if os.path.exists(bible_path):
bible_data = utils.load_json(bible_path)
if bible_data:
is_series = bible_data.get('project_metadata', {}).get('is_series', False)
runs_dir = os.path.join(project_path, "runs")
if allow_copy and is_series and os.path.exists(runs_dir):
all_runs = [d for d in os.listdir(runs_dir) if d.startswith("run_") and d != f"run_{run_id}"]
all_runs.sort(key=lambda x: int(x.split('_')[1]) if x.split('_')[1].isdigit() else 0)
if all_runs:
latest_run_dir = os.path.join(runs_dir, all_runs[-1])
current_run_dir = os.path.join(runs_dir, f"run_{run_id}")
os.makedirs(current_run_dir, exist_ok=True)
utils.log("SYSTEM", f"Checking previous run ({all_runs[-1]}) for completed books...")
for item in os.listdir(latest_run_dir):
if item.startswith("Book_") and os.path.isdir(os.path.join(latest_run_dir, item)):
if exclude_folders and item in exclude_folders:
utils.log("SYSTEM", f" -> Skipping copy of {item} (Target for revision).")
continue
if os.path.exists(os.path.join(latest_run_dir, item, "manuscript.json")):
src = os.path.join(latest_run_dir, item)
dst = os.path.join(current_run_dir, item)
try:
shutil.copytree(src, dst, dirs_exist_ok=True)
utils.log("SYSTEM", f" -> Copied {item} (Skipping generation).")
except Exception as e:
utils.log("SYSTEM", f" -> Failed to copy {item}: {e}")
# 2. Run Generation
from cli.engine import run_generation
run_generation(bible_path, specific_run_id=run_id)
utils.log("SYSTEM", "Job Complete.")
utils.update_progress(100)
status = "completed"
except Exception as e:
import traceback as _tb
_task_log(f"ERROR: Job failed — {type(e).__name__}: {e}")
_task_log(_tb.format_exc())
utils.log("ERROR", f"Job Failed: {e}")
status = "failed"
# 3. Calculate Cost & Cleanup
run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
total_cost = 0.0
final_log_path = initial_log
if os.path.exists(run_dir):
final_log_path = os.path.join(run_dir, "web_console.log")
if os.path.exists(initial_log):
try:
os.rename(initial_log, final_log_path)
except OSError:
shutil.copy2(initial_log, final_log_path)
os.remove(initial_log)
for item in os.listdir(run_dir):
item_path = os.path.join(run_dir, item)
if os.path.isdir(item_path) and item.startswith("Book_"):
usage_path = os.path.join(item_path, "usage_log.json")
if os.path.exists(usage_path):
data = utils.load_json(usage_path)
total_cost += data.get('totals', {}).get('est_cost_usd', 0.0)
# 4. Update Database with Final Status
try:
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
conn.execute("UPDATE run SET status = ?, cost = ?, end_time = ?, log_file = ?, progress = 100 WHERE id = ?",
(status, total_cost, datetime.utcnow(), final_log_path, run_id))
except Exception as e:
print(f"Failed to update run status in DB: {e}")
_task_log(f"Task finished. status={status} cost=${total_cost:.4f}")
return {"run_id": run_id, "status": status, "cost": total_cost, "final_log": final_log_path}
@huey.task()
def regenerate_artifacts_task(run_id, project_path, feedback=None):
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
log_file = os.path.join(run_dir, "web_console.log")
if not os.path.exists(run_dir):
log_file = os.path.join(project_path, f"system_log_{run_id}.txt")
try:
with open(log_file, 'w', encoding='utf-8') as f:
f.write(f"[{datetime.utcnow().strftime('%H:%M:%S')}] --- REGENERATION STARTED ---\n")
except: pass
utils.set_log_file(log_file)
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
try:
with sqlite3.connect(db_path) as conn:
conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
except: pass
utils.log("SYSTEM", "Starting Artifact Regeneration...")
book_dir = run_dir
if os.path.exists(run_dir):
subdirs = utils.get_sorted_book_folders(run_dir)
if subdirs: book_dir = os.path.join(run_dir, subdirs[0])
bible_path = os.path.join(project_path, "bible.json")
if not os.path.exists(run_dir) or not os.path.exists(bible_path):
utils.log("ERROR", "Run directory or Bible not found.")
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
except: pass
return
bible = utils.load_json(bible_path)
final_bp_path = os.path.join(book_dir, "final_blueprint.json")
ms_path = os.path.join(book_dir, "manuscript.json")
if not os.path.exists(final_bp_path) or not os.path.exists(ms_path):
utils.log("ERROR", f"Blueprint or Manuscript not found in {book_dir}")
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
except: pass
return
bp = utils.load_json(final_bp_path)
ms = utils.load_json(ms_path)
meta = bible.get('project_metadata', {})
if 'book_metadata' in bp:
for k in ['author', 'genre', 'target_audience', 'style']:
if k in meta:
bp['book_metadata'][k] = meta[k]
if bp.get('series_metadata', {}).get('is_series'):
bp['series_metadata']['series_title'] = meta.get('title', bp['series_metadata'].get('series_title'))
b_num = bp['series_metadata'].get('book_number')
for b in bible.get('books', []):
if b.get('book_number') == b_num:
bp['book_metadata']['title'] = b.get('title', bp['book_metadata'].get('title'))
break
else:
bp['book_metadata']['title'] = meta.get('title', bp['book_metadata'].get('title'))
with open(final_bp_path, 'w') as f: json.dump(bp, f, indent=2)
try:
ai_setup.init_models()
tracking = None
events_path = os.path.join(book_dir, "tracking_events.json")
if os.path.exists(events_path):
tracking = {"events": utils.load_json(events_path), "characters": utils.load_json(os.path.join(book_dir, "tracking_characters.json"))}
marketing_cover.generate_cover(bp, book_dir, tracking, feedback=feedback)
exporter.compile_files(bp, ms, book_dir)
utils.log("SYSTEM", "Regeneration Complete.")
final_status = 'completed'
except Exception as e:
utils.log("ERROR", f"Regeneration Failed: {e}")
final_status = 'failed'
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = ? WHERE id = ?", (final_status, run_id))
except: pass
@huey.task()
def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instruction):
"""
Background task to rewrite a single chapter and propagate changes.
"""
try:
run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
log_file = os.path.join(run_dir, "web_console.log")
if not os.path.exists(log_file):
log_file = os.path.join(project_path, f"system_log_{run_id}.txt")
try:
with open(log_file, 'w', encoding='utf-8') as f: f.write("")
except: pass
utils.set_log_file(log_file)
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
try:
with sqlite3.connect(db_path) as conn:
conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
except: pass
book_path = os.path.join(run_dir, book_folder)
ms_path = os.path.join(book_path, "manuscript.json")
bp_path = os.path.join(book_path, "final_blueprint.json")
if not (os.path.exists(ms_path) and os.path.exists(bp_path)):
utils.log("ERROR", f"Rewrite failed: files not found for run {run_id}/{book_folder}")
return False
ms = utils.load_json(ms_path)
bp = utils.load_json(bp_path)
ai_setup.init_models()
from story import editor as story_editor
result = story_editor.rewrite_chapter_content(bp, ms, chap_num, instruction, book_path)
if result and result[0]:
new_text, summary = result
for ch in ms:
if str(ch.get('num')) == str(chap_num):
ch['content'] = new_text
break
with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)
updated_ms = story_editor.check_and_propagate(bp, ms, chap_num, book_path, change_summary=summary)
if updated_ms:
ms = updated_ms
with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)
exporter.compile_files(bp, ms, book_path)
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
except: pass
return True
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
except: pass
return False
except Exception as e:
utils.log("ERROR", f"Rewrite task exception for run {run_id}/{book_folder}: {e}")
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
except: pass
return False
@huey.task()
def refine_bible_task(project_path, instruction, source_type, selected_keys=None):
"""
Background task to refine the Bible.
Handles partial merging of selected keys into a temp base before refinement.
"""
try:
bible_path = os.path.join(project_path, "bible.json")
draft_path = os.path.join(project_path, "bible_draft.json")
lock_path = os.path.join(project_path, ".refining")
with open(lock_path, 'w') as f: f.write("running")
base_bible = utils.load_json(bible_path)
if not base_bible: return False
if source_type == 'draft' and os.path.exists(draft_path):
draft_bible = utils.load_json(draft_path)
if selected_keys is not None and draft_bible:
base_bible = bible_tracker.merge_selected_changes(base_bible, draft_bible, selected_keys)
elif draft_bible:
base_bible = draft_bible
ai_setup.init_models()
new_bible = bible_tracker.refine_bible(base_bible, instruction, project_path)
if new_bible:
with open(draft_path, 'w') as f: json.dump(new_bible, f, indent=2)
return True
return False
except Exception as e:
utils.log("ERROR", f"Bible refinement task failed: {e}")
return False
finally:
if os.path.exists(lock_path): os.remove(lock_path)