Changelog:
- web/tasks.py: db_log_callback now writes non-OperationalError exceptions to data/app.log for visibility.
- web/tasks.py: generate_book_task restructured with try...finally to guarantee the final status update — a run can never be left in the 'running' state if the worker crashes.
- templates/project.html: added .catch() to fetchLog() with console.error and polling resume on failure; added a manual Refresh button to the status bar.
- templates/run_details.html: improved .catch() in updateLog() with a descriptive message and a 5-second retry; added a manual Refresh button to the status bar.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
453 lines · 19 KiB · Python
import os
|
|
import json
|
|
import time
|
|
import sqlite3
|
|
import shutil
|
|
from datetime import datetime
|
|
from huey import SqliteHuey
|
|
from web.db import db, Run, User, Project
|
|
from core import utils, config
|
|
from ai import models as ai_models
|
|
from ai import setup as ai_setup
|
|
from story import bible_tracker
|
|
from marketing import cover as marketing_cover
|
|
from export import exporter
|
|
|
|
# Configure Huey (Task Queue)
# SQLite-backed queue; the queue database lives alongside the app's other
# data files in config.DATA_DIR. Workers import this `huey` instance.
huey = SqliteHuey('bookapp_queue', filename=os.path.join(config.DATA_DIR, 'queue.db'))
|
|
|
|
def db_log_callback(db_path, run_id, phase, msg):
    """Write a log entry directly to SQLite to avoid Flask context issues in threads.

    Huey tasks run outside the Flask application context, so this bypasses
    the ORM and talks to the database file directly.

    Args:
        db_path: Path to the app's SQLite database file.
        run_id: Run the entry belongs to.
        phase: Short phase tag (e.g. "SYSTEM", "ERROR").
        msg: Message body; coerced to str before insertion.

    Retries up to 5 times on transient lock contention
    (sqlite3.OperationalError). Any other exception is reported to stdout
    (docker logs) and appended to data/app.log, best-effort.
    """
    import sys as _sys
    for _attempt in range(5):
        try:
            with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
                conn.execute(
                    "INSERT INTO log_entry (run_id, timestamp, phase, message) VALUES (?, ?, ?, ?)",
                    (run_id, datetime.utcnow().isoformat(), phase, str(msg)))
            break
        except sqlite3.OperationalError:
            # Database locked by another writer — back off briefly and retry.
            time.sleep(0.1)
        except Exception as _e:
            # Non-transient failure: surface on stdout and mirror to app.log.
            # (Module-level `os`/`config` are used; the previous local
            # re-imports were redundant shadows.)
            print(f"[db_log_callback ERROR run={run_id}] {type(_e).__name__}: {_e}", flush=True, file=_sys.stdout)
            try:
                _app_log = os.path.join(config.DATA_DIR, "app.log")
                with open(_app_log, 'a', encoding='utf-8') as _f:
                    _f.write(f"[db_log_callback ERROR run={run_id}] {type(_e).__name__}: {_e}\n")
            except Exception:
                pass  # best-effort only; never let logging kill the task
            break
    else:
        # Fix: all 5 lock retries failed — previously the entry vanished
        # silently. Leave at least a trace on stdout.
        print(f"[db_log_callback DROPPED run={run_id}] database locked after 5 retries", flush=True, file=_sys.stdout)
|
|
|
|
def db_progress_callback(db_path, run_id, percent):
    """Update a run's progress percentage directly in SQLite.

    Companion to db_log_callback: runs outside the Flask context, so it
    writes to the database file directly.

    Args:
        db_path: Path to the app's SQLite database file.
        run_id: Run whose `progress` column is updated.
        percent: New progress value.

    Retries up to 5 times on transient lock contention
    (sqlite3.OperationalError); other exceptions are reported to stdout.
    """
    import sys as _sys
    for _attempt in range(5):
        try:
            with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
                conn.execute("UPDATE run SET progress = ? WHERE id = ?", (percent, run_id))
            break
        except sqlite3.OperationalError:
            # Database locked — back off briefly and retry.
            time.sleep(0.1)
        except Exception as _e:
            print(f"[db_progress_callback ERROR run={run_id}] {type(_e).__name__}: {_e}", flush=True, file=_sys.stdout)
            break
    else:
        # Fix: all 5 lock retries failed — previously the update was dropped
        # silently. Leave a trace on stdout.
        print(f"[db_progress_callback DROPPED run={run_id}] database locked after 5 retries", flush=True, file=_sys.stdout)
|
|
|
|
@huey.task()
def generate_book_task(run_id, project_path, bible_path, allow_copy=True, feedback=None, source_run_id=None, keep_cover=False, exclude_folders=None):
    """
    Background task to run the book generation.

    Args:
        run_id: Primary key of the Run row; also names the on-disk run
            folder ("run_<id>").
        project_path: Absolute path of the project directory.
        bible_path: Path to the project's bible.json.
        allow_copy: When True and the bible marks a series, completed books
            from the latest previous run are copied forward instead of
            being regenerated.
        feedback: Optional free-text feedback used to refine the bible
            before generation (paired with source_run_id).
        source_run_id: Previous run the feedback / cover preservation
            refers to.
        keep_cover: When True, copy cover art from the source run's book
            folders into this run's folders.
        exclude_folders: Book folder names that must NOT be copied forward
            (they are the target of a revision).

    Returns:
        dict with run_id, final status, total estimated cost, and the
        final log path.
    """
    import sys as _sys

    def _task_log(msg):
        """Print directly to stdout (docker logs) regardless of utils state."""
        print(f"[TASK run={run_id}] {msg}", flush=True, file=_sys.stdout)

    _task_log(f"Task picked up by Huey worker. project_path={project_path}")

    # 1. Setup Logging
    log_filename = f"system_log_{run_id}.txt"

    # Log to project root initially until run folder is created by engine
    initial_log = os.path.join(project_path, log_filename)

    # Touch the file immediately so the UI has something to poll even if the
    # worker crashes before the first utils.log() call.
    try:
        with open(initial_log, 'a', encoding='utf-8') as _f:
            pass
        _task_log(f"Log file created: {initial_log}")
    except Exception as _e:
        _task_log(f"WARNING: Could not touch log file {initial_log}: {_e}")

    utils.set_log_file(initial_log)

    # Hook up Database Logging
    db_path = os.path.join(config.DATA_DIR, "bookapp.db")
    utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
    utils.set_progress_callback(lambda p: db_progress_callback(db_path, run_id, p))

    # Set Status to Running
    try:
        with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
            conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
        _task_log("Run status set to 'running' in DB.")
    except sqlite3.OperationalError as e:
        _task_log(f"WARNING: DB locked when setting run status: {e}")
        utils.log("SYSTEM", f"Database locked when setting run status (run {run_id}): {e}")
    except Exception as e:
        _task_log(f"WARNING: Could not set run status: {e}")

    utils.log("SYSTEM", f"Starting Job #{run_id}")

    status = "failed"  # Default to failed; overwritten to "completed" only on clean success
    total_cost = 0.0
    final_log_path = initial_log

    try:
        # 1.1 Handle Feedback / Modification (Re-run logic)
        if feedback and source_run_id:
            utils.log("SYSTEM", f"Applying feedback to Run #{source_run_id}: '{feedback}'")

            bible_data = utils.load_json(bible_path)

            if bible_data:
                try:
                    ai_setup.init_models()
                    new_bible = bible_tracker.refine_bible(bible_data, feedback, project_path)
                    if new_bible:
                        bible_data = new_bible
                        with open(bible_path, 'w') as f: json.dump(bible_data, f, indent=2)
                        utils.log("SYSTEM", "Bible updated with feedback.")
                except Exception as e:
                    utils.log("ERROR", f"Failed to refine bible: {e}")

        # 1.2 Keep Cover Art Logic
        # NOTE(review): this branch reads `bible_data` and `source_run_id`,
        # but `bible_data` is only assigned in the feedback branch above.
        # keep_cover=True without feedback would raise NameError here
        # (caught by the outer except, marking the run failed) — confirm
        # callers always pair keep_cover with feedback + source_run_id.
        if keep_cover:
            source_run_dir = os.path.join(project_path, "runs", f"run_{source_run_id}")
            if os.path.exists(source_run_dir):
                utils.log("SYSTEM", "Attempting to preserve cover art...")

                current_run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
                if not os.path.exists(current_run_dir): os.makedirs(current_run_dir)

                # Map book number -> source folder ("Book_<n>_<title>").
                source_books = {}
                for d in os.listdir(source_run_dir):
                    if d.startswith("Book_") and os.path.isdir(os.path.join(source_run_dir, d)):
                        parts = d.split('_')
                        if len(parts) > 1 and parts[1].isdigit():
                            source_books[int(parts[1])] = os.path.join(source_run_dir, d)

                if bible_data and 'books' in bible_data:
                    for i, book in enumerate(bible_data['books']):
                        b_num = book.get('book_number', i+1)
                        if b_num in source_books:
                            src_folder = source_books[b_num]
                            safe_title = utils.sanitize_filename(book.get('title', f"Book_{b_num}"))
                            target_folder = os.path.join(current_run_dir, f"Book_{b_num}_{safe_title}")

                            os.makedirs(target_folder, exist_ok=True)

                            src_cover = os.path.join(src_folder, "cover.png")
                            if os.path.exists(src_cover):
                                shutil.copy2(src_cover, os.path.join(target_folder, "cover.png"))
                                # Also carry the raw art file if present.
                                if os.path.exists(os.path.join(src_folder, "cover_art.png")):
                                    shutil.copy2(os.path.join(src_folder, "cover_art.png"), os.path.join(target_folder, "cover_art.png"))
                                utils.log("SYSTEM", f" -> Copied cover for Book {b_num}")

        # 1.5 Copy Forward Logic (Series Optimization)
        is_series = False
        if os.path.exists(bible_path):
            bible_data = utils.load_json(bible_path)
            if bible_data:
                is_series = bible_data.get('project_metadata', {}).get('is_series', False)

        runs_dir = os.path.join(project_path, "runs")

        if allow_copy and is_series and os.path.exists(runs_dir):
            all_runs = [d for d in os.listdir(runs_dir) if d.startswith("run_") and d != f"run_{run_id}"]
            # Sort numerically by run id so [-1] is the most recent run.
            all_runs.sort(key=lambda x: int(x.split('_')[1]) if x.split('_')[1].isdigit() else 0)

            if all_runs:
                latest_run_dir = os.path.join(runs_dir, all_runs[-1])
                current_run_dir = os.path.join(runs_dir, f"run_{run_id}")
                os.makedirs(current_run_dir, exist_ok=True)

                utils.log("SYSTEM", f"Checking previous run ({all_runs[-1]}) for completed books...")
                for item in os.listdir(latest_run_dir):
                    if item.startswith("Book_") and os.path.isdir(os.path.join(latest_run_dir, item)):
                        if exclude_folders and item in exclude_folders:
                            utils.log("SYSTEM", f" -> Skipping copy of {item} (Target for revision).")
                            continue

                        # A manuscript.json marks the book as fully generated.
                        if os.path.exists(os.path.join(latest_run_dir, item, "manuscript.json")):
                            src = os.path.join(latest_run_dir, item)
                            dst = os.path.join(current_run_dir, item)
                            try:
                                shutil.copytree(src, dst, dirs_exist_ok=True)
                                utils.log("SYSTEM", f" -> Copied {item} (Skipping generation).")
                            except Exception as e:
                                utils.log("SYSTEM", f" -> Failed to copy {item}: {e}")

        # 2. Run Generation
        from cli.engine import run_generation
        run_generation(bible_path, specific_run_id=run_id)

        utils.log("SYSTEM", "Job Complete.")
        utils.update_progress(100)
        status = "completed"

    except Exception as e:
        import traceback as _tb
        _task_log(f"ERROR: Job failed — {type(e).__name__}: {e}")
        _task_log(_tb.format_exc())
        utils.log("ERROR", f"Job Failed: {e}")
        # status remains "failed" (set before try block)

    finally:
        # 3. Calculate Cost & Cleanup — guaranteed to run even if worker crashes
        run_dir = os.path.join(project_path, "runs", f"run_{run_id}")

        if os.path.exists(run_dir):
            # Move the bootstrap log into the run folder now that it exists.
            final_log_path = os.path.join(run_dir, "web_console.log")
            if os.path.exists(initial_log):
                try:
                    os.rename(initial_log, final_log_path)
                except OSError:
                    # rename fails across filesystems; copy + delete instead.
                    shutil.copy2(initial_log, final_log_path)
                    os.remove(initial_log)

            # Sum the estimated cost across all generated book folders.
            for item in os.listdir(run_dir):
                item_path = os.path.join(run_dir, item)
                if os.path.isdir(item_path) and item.startswith("Book_"):
                    usage_path = os.path.join(item_path, "usage_log.json")
                    if os.path.exists(usage_path):
                        data = utils.load_json(usage_path)
                        total_cost += data.get('totals', {}).get('est_cost_usd', 0.0)

        # 4. Update Database with Final Status — run is never left in 'running' state
        try:
            with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
                conn.execute("UPDATE run SET status = ?, cost = ?, end_time = ?, log_file = ?, progress = 100 WHERE id = ?",
                             (status, total_cost, datetime.utcnow(), final_log_path, run_id))
        except Exception as e:
            print(f"Failed to update run status in DB: {e}")

    _task_log(f"Task finished. status={status} cost=${total_cost:.4f}")
    return {"run_id": run_id, "status": status, "cost": total_cost, "final_log": final_log_path}
|
|
|
|
|
|
@huey.task()
def regenerate_artifacts_task(run_id, project_path, feedback=None):
    """
    Background task to regenerate run artifacts (cover art + exported files)
    for an already-generated run, without re-running text generation.

    Args:
        run_id: Run whose artifacts are regenerated (folder "run_<id>").
        project_path: Absolute path of the project directory.
        feedback: Optional feedback forwarded to cover generation.

    Fix vs. original: all bare `except:` clauses narrowed to
    `except Exception:` so SystemExit/KeyboardInterrupt are not swallowed.
    """
    db_path = os.path.join(config.DATA_DIR, "bookapp.db")

    run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
    log_file = os.path.join(run_dir, "web_console.log")

    if not os.path.exists(run_dir):
        log_file = os.path.join(project_path, f"system_log_{run_id}.txt")

    # Truncate the log so the UI shows only this regeneration's output.
    try:
        with open(log_file, 'w', encoding='utf-8') as f:
            f.write(f"[{datetime.utcnow().strftime('%H:%M:%S')}] --- REGENERATION STARTED ---\n")
    except Exception:
        pass  # best-effort; logging must not abort the task

    utils.set_log_file(log_file)
    utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
    # Reset run state: clear old log entries, mark as running.
    try:
        with sqlite3.connect(db_path) as conn:
            conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
            conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
    except Exception:
        pass

    utils.log("SYSTEM", "Starting Artifact Regeneration...")

    # Artifacts live in the first book folder when one exists.
    book_dir = run_dir
    if os.path.exists(run_dir):
        subdirs = utils.get_sorted_book_folders(run_dir)
        if subdirs: book_dir = os.path.join(run_dir, subdirs[0])

    bible_path = os.path.join(project_path, "bible.json")

    if not os.path.exists(run_dir) or not os.path.exists(bible_path):
        utils.log("ERROR", "Run directory or Bible not found.")
        try:
            with sqlite3.connect(db_path) as conn:
                conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
        except Exception:
            pass
        return

    bible = utils.load_json(bible_path)
    final_bp_path = os.path.join(book_dir, "final_blueprint.json")
    ms_path = os.path.join(book_dir, "manuscript.json")

    if not os.path.exists(final_bp_path) or not os.path.exists(ms_path):
        utils.log("ERROR", f"Blueprint or Manuscript not found in {book_dir}")
        try:
            with sqlite3.connect(db_path) as conn:
                conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
        except Exception:
            pass
        return

    bp = utils.load_json(final_bp_path)
    ms = utils.load_json(ms_path)

    # Sync blueprint metadata from the (possibly edited) bible before export.
    meta = bible.get('project_metadata', {})
    if 'book_metadata' in bp:
        for k in ['author', 'genre', 'target_audience', 'style']:
            if k in meta:
                bp['book_metadata'][k] = meta[k]

    if bp.get('series_metadata', {}).get('is_series'):
        # Series: the project title is the series title; the book title
        # comes from the matching entry in bible['books'].
        bp['series_metadata']['series_title'] = meta.get('title', bp['series_metadata'].get('series_title'))
        b_num = bp['series_metadata'].get('book_number')
        for b in bible.get('books', []):
            if b.get('book_number') == b_num:
                bp['book_metadata']['title'] = b.get('title', bp['book_metadata'].get('title'))
                break
    else:
        bp['book_metadata']['title'] = meta.get('title', bp['book_metadata'].get('title'))

    with open(final_bp_path, 'w') as f: json.dump(bp, f, indent=2)

    try:
        ai_setup.init_models()

        # Optional tracking data improves cover prompts when present.
        tracking = None
        events_path = os.path.join(book_dir, "tracking_events.json")
        if os.path.exists(events_path):
            tracking = {"events": utils.load_json(events_path), "characters": utils.load_json(os.path.join(book_dir, "tracking_characters.json"))}

        marketing_cover.generate_cover(bp, book_dir, tracking, feedback=feedback)
        exporter.compile_files(bp, ms, book_dir)

        utils.log("SYSTEM", "Regeneration Complete.")
        final_status = 'completed'
    except Exception as e:
        utils.log("ERROR", f"Regeneration Failed: {e}")
        final_status = 'failed'

    try:
        with sqlite3.connect(db_path) as conn:
            conn.execute("UPDATE run SET status = ? WHERE id = ?", (final_status, run_id))
    except Exception:
        pass
|
|
|
|
|
|
@huey.task()
def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instruction):
    """
    Background task to rewrite a single chapter and propagate changes.

    Args:
        run_id: Run containing the book ("run_<id>" folder).
        project_path: Absolute path of the project directory.
        book_folder: Book sub-folder name inside the run directory.
        chap_num: Chapter number to rewrite (compared as string).
        instruction: User instruction describing the rewrite.

    Returns:
        True on a successful rewrite, False otherwise.
    """
    # Fix: compute db_path BEFORE the try block. It was originally assigned
    # inside `try`, so a failure before that point made the outer except
    # handler itself raise NameError on db_path.
    db_path = os.path.join(config.DATA_DIR, "bookapp.db")
    try:
        run_dir = os.path.join(project_path, "runs", f"run_{run_id}")

        log_file = os.path.join(run_dir, "web_console.log")
        if not os.path.exists(log_file):
            log_file = os.path.join(project_path, f"system_log_{run_id}.txt")

        # Truncate the log so only this task's output is shown (best-effort).
        try:
            with open(log_file, 'w', encoding='utf-8') as f: f.write("")
        except Exception: pass

        utils.set_log_file(log_file)
        utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))

        # Reset run state: clear old log entries, mark as running.
        try:
            with sqlite3.connect(db_path) as conn:
                conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
                conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
        except Exception: pass

        book_path = os.path.join(run_dir, book_folder)
        ms_path = os.path.join(book_path, "manuscript.json")
        bp_path = os.path.join(book_path, "final_blueprint.json")

        if not (os.path.exists(ms_path) and os.path.exists(bp_path)):
            utils.log("ERROR", f"Rewrite failed: files not found for run {run_id}/{book_folder}")
            # Fix: previously this early return left the run stuck in
            # 'running'; mark it failed so the UI never hangs.
            try:
                with sqlite3.connect(db_path) as conn:
                    conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
            except Exception: pass
            return False

        ms = utils.load_json(ms_path)
        bp = utils.load_json(bp_path)

        ai_setup.init_models()

        from story import editor as story_editor
        result = story_editor.rewrite_chapter_content(bp, ms, chap_num, instruction, book_path)

        if result and result[0]:
            new_text, summary = result
            # Splice the rewritten text into the matching chapter.
            for ch in ms:
                if str(ch.get('num')) == str(chap_num):
                    ch['content'] = new_text
                    break

            with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)

            # Let the editor propagate continuity changes to other chapters.
            updated_ms = story_editor.check_and_propagate(bp, ms, chap_num, book_path, change_summary=summary)
            if updated_ms:
                ms = updated_ms

            with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)
            exporter.compile_files(bp, ms, book_path)

            try:
                with sqlite3.connect(db_path) as conn:
                    conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
            except Exception: pass
            return True

        # No rewrite was produced: restore 'completed' — the run itself is
        # intact, only the requested edit did not happen.
        try:
            with sqlite3.connect(db_path) as conn:
                conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
        except Exception: pass
        return False

    except Exception as e:
        utils.log("ERROR", f"Rewrite task exception for run {run_id}/{book_folder}: {e}")
        try:
            with sqlite3.connect(db_path) as conn:
                conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
        except Exception: pass
        return False
|
|
|
|
|
|
@huey.task()
def refine_bible_task(project_path, instruction, source_type, selected_keys=None):
    """
    Background task to refine the Bible.
    Handles partial merging of selected keys into a temp base before refinement.

    Args:
        project_path: Absolute path of the project directory.
        instruction: User instruction driving the refinement.
        source_type: 'draft' to start from bible_draft.json (when present);
            any other value starts from bible.json.
        selected_keys: When not None, only these keys are merged from the
            draft into the base before refining.

    Returns:
        True when a refined draft was written, False otherwise.
    """
    # Fix: compute all paths BEFORE the try block. lock_path was originally
    # assigned inside `try`, so a failure before that point made the
    # `finally` cleanup raise NameError, masking the real error.
    bible_path = os.path.join(project_path, "bible.json")
    draft_path = os.path.join(project_path, "bible_draft.json")
    lock_path = os.path.join(project_path, ".refining")
    try:
        # The lock file signals the UI that a refinement is in progress.
        with open(lock_path, 'w') as f: f.write("running")

        base_bible = utils.load_json(bible_path)
        if not base_bible: return False

        if source_type == 'draft' and os.path.exists(draft_path):
            draft_bible = utils.load_json(draft_path)

            if selected_keys is not None and draft_bible:
                # Merge only the user-approved keys into the current bible.
                base_bible = bible_tracker.merge_selected_changes(base_bible, draft_bible, selected_keys)
            elif draft_bible:
                base_bible = draft_bible

        ai_setup.init_models()

        new_bible = bible_tracker.refine_bible(base_bible, instruction, project_path)

        if new_bible:
            # Write to the draft file; the user approves it into bible.json.
            with open(draft_path, 'w') as f: json.dump(new_bible, f, indent=2)
            return True

        return False

    except Exception as e:
        utils.log("ERROR", f"Bible refinement task failed: {e}")
        return False
    finally:
        # Always clear the lock so the UI never stays stuck in 'refining'.
        if os.path.exists(lock_path): os.remove(lock_path)
|