import os import json import time import sqlite3 import shutil from datetime import datetime from huey import SqliteHuey from web.db import db, Run, User, Project from core import utils, config from ai import models as ai_models from ai import setup as ai_setup from story import bible_tracker from marketing import cover as marketing_cover from export import exporter # Configure Huey (Task Queue) huey = SqliteHuey('bookapp_queue', filename=os.path.join(config.DATA_DIR, 'queue.db')) def _robust_update_run_status(db_path, run_id, status, retries=5, **extra_cols): """Update run status with exponential-backoff retry. Raises RuntimeError if all retries fail.""" import sys as _sys cols = {"status": status} cols.update(extra_cols) set_clause = ", ".join(f"{k} = ?" for k in cols) values = list(cols.values()) + [run_id] for attempt in range(retries): try: with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn: conn.execute(f"UPDATE run SET {set_clause} WHERE id = ?", values) return except sqlite3.OperationalError as e: wait = attempt + 1 print(f"[DB WARN run={run_id}] Status update locked (attempt {attempt+1}/{retries}), retry in {wait}s: {e}", flush=True, file=_sys.stdout) time.sleep(wait) except Exception as e: print(f"[DB ERROR run={run_id}] Unexpected error on status update: {type(e).__name__}: {e}", flush=True, file=_sys.stdout) raise msg = f"[DB CRITICAL run={run_id}] Failed to update status='{status}' after {retries} attempts." print(msg, flush=True, file=_sys.stdout) raise RuntimeError(msg) def db_heartbeat_callback(db_path, run_id): """Updates last_heartbeat timestamp for the run in SQLite.""" import sys as _sys for _ in range(3): try: with sqlite3.connect(db_path, timeout=10, check_same_thread=False) as conn: conn.execute("UPDATE run SET last_heartbeat = ? WHERE id = ?", (datetime.utcnow().isoformat(), run_id)) return except sqlite3.OperationalError: time.sleep(0.2) except Exception as _e: print(f"[db_heartbeat ERROR run={run_id}] {type(_e).__name__}: {_e}", flush=True, file=_sys.stdout) return def db_log_callback(db_path, run_id, phase, msg): """Writes log entry directly to SQLite to avoid Flask Context issues in threads.""" import sys as _sys for _ in range(5): try: with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn: conn.execute("INSERT INTO log_entry (run_id, timestamp, phase, message) VALUES (?, ?, ?, ?)", (run_id, datetime.utcnow().isoformat(), phase, str(msg))) break except sqlite3.OperationalError: time.sleep(0.1) except Exception as _e: print(f"[db_log_callback ERROR run={run_id}] {type(_e).__name__}: {_e}", flush=True, file=_sys.stdout) try: import os as _os from core import config as _cfg _app_log = _os.path.join(_cfg.DATA_DIR, "app.log") with open(_app_log, 'a', encoding='utf-8') as _f: _f.write(f"[db_log_callback ERROR run={run_id}] {type(_e).__name__}: {_e}\n") except Exception: pass break def db_progress_callback(db_path, run_id, percent): """Updates run progress in SQLite.""" import sys as _sys for _ in range(5): try: with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn: conn.execute("UPDATE run SET progress = ? WHERE id = ?", (percent, run_id)) break except sqlite3.OperationalError: time.sleep(0.1) except Exception as _e: print(f"[db_progress_callback ERROR run={run_id}] {type(_e).__name__}: {_e}", flush=True, file=_sys.stdout) break @huey.task() def generate_book_task(run_id, project_path, bible_path, allow_copy=True, feedback=None, source_run_id=None, keep_cover=False, exclude_folders=None): """ Background task to run the book generation. """ import sys as _sys def _task_log(msg): """Print directly to stdout (docker logs) regardless of utils state.""" print(f"[TASK run={run_id}] {msg}", flush=True, file=_sys.stdout) _task_log(f"Task picked up by Huey worker. project_path={project_path}") # 0. Orphaned Job Guard — verify that all required resources exist before # doing any work. If a run, project folder, or bible is missing, terminate # silently and mark the run as failed to prevent data being written to the # wrong book or project. db_path_early = os.path.join(config.DATA_DIR, "bookapp.db") try: with sqlite3.connect(db_path_early, timeout=10) as _conn: _row = _conn.execute("SELECT id FROM run WHERE id = ?", (run_id,)).fetchone() if not _row: _task_log(f"ABORT: Run #{run_id} no longer exists in DB. Terminating silently.") return except Exception as _e: _task_log(f"WARNING: Could not verify run #{run_id} existence: {_e}") if not os.path.isdir(project_path): _task_log(f"ABORT: Project folder missing ({project_path}). Marking run #{run_id} as failed.") try: _robust_update_run_status(db_path_early, run_id, 'failed', end_time=datetime.utcnow().isoformat()) except Exception: pass return if not os.path.isfile(bible_path): _task_log(f"ABORT: Bible file missing ({bible_path}). Marking run #{run_id} as failed.") try: _robust_update_run_status(db_path_early, run_id, 'failed', end_time=datetime.utcnow().isoformat()) except Exception: pass return # Validate that the bible has at least one book entry try: with open(bible_path, 'r', encoding='utf-8') as _bf: _bible_check = json.load(_bf) if not _bible_check.get('books'): _task_log(f"ABORT: Bible has no books defined. Marking run #{run_id} as failed.") try: _robust_update_run_status(db_path_early, run_id, 'failed', end_time=datetime.utcnow().isoformat()) except Exception: pass return except Exception as _e: _task_log(f"ABORT: Could not parse bible ({bible_path}): {_e}. Marking run #{run_id} as failed.") try: _robust_update_run_status(db_path_early, run_id, 'failed', end_time=datetime.utcnow().isoformat()) except Exception: pass return # 1. Setup Logging log_filename = f"system_log_{run_id}.txt" # Log to project root initially until run folder is created by engine initial_log = os.path.join(project_path, log_filename) # Touch the file immediately so the UI has something to poll even if the # worker crashes before the first utils.log() call. try: with open(initial_log, 'a', encoding='utf-8') as _f: pass _task_log(f"Log file created: {initial_log}") except Exception as _e: _task_log(f"WARNING: Could not touch log file {initial_log}: {_e}") utils.set_log_file(initial_log) # Hook up Database Logging db_path = os.path.join(config.DATA_DIR, "bookapp.db") utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m)) utils.set_progress_callback(lambda p: db_progress_callback(db_path, run_id, p)) utils.set_heartbeat_callback(lambda: db_heartbeat_callback(db_path, run_id)) # Set Status to Running (with start_time and initial heartbeat) try: _robust_update_run_status(db_path, run_id, 'running', start_time=datetime.utcnow().isoformat(), last_heartbeat=datetime.utcnow().isoformat()) _task_log("Run status set to 'running' in DB.") except Exception as e: _task_log(f"WARNING: Could not set run status to 'running': {e}") utils.log("SYSTEM", f"WARNING: run status update failed (run {run_id}): {e}") utils.log("SYSTEM", f"Starting Job #{run_id}") status = "failed" # Default to failed; overwritten to "completed" only on clean success total_cost = 0.0 final_log_path = initial_log try: # 1.1 Handle Feedback / Modification (Re-run logic) if feedback and source_run_id: utils.log("SYSTEM", f"Applying feedback to Run #{source_run_id}: '{feedback}'") bible_data = utils.load_json(bible_path) if bible_data: try: ai_setup.init_models() new_bible = bible_tracker.refine_bible(bible_data, feedback, project_path) if new_bible: bible_data = new_bible with open(bible_path, 'w') as f: json.dump(bible_data, f, indent=2) utils.log("SYSTEM", "Bible updated with feedback.") except Exception as e: utils.log("ERROR", f"Failed to refine bible: {e}") # 1.2 Keep Cover Art Logic if keep_cover: source_run_dir = os.path.join(project_path, "runs", f"run_{source_run_id}") if os.path.exists(source_run_dir): utils.log("SYSTEM", "Attempting to preserve cover art...") current_run_dir = os.path.join(project_path, "runs", f"run_{run_id}") if not os.path.exists(current_run_dir): os.makedirs(current_run_dir) source_books = {} for d in os.listdir(source_run_dir): if d.startswith("Book_") and os.path.isdir(os.path.join(source_run_dir, d)): parts = d.split('_') if len(parts) > 1 and parts[1].isdigit(): source_books[int(parts[1])] = os.path.join(source_run_dir, d) if bible_data and 'books' in bible_data: for i, book in enumerate(bible_data['books']): b_num = book.get('book_number', i+1) if b_num in source_books: src_folder = source_books[b_num] safe_title = utils.sanitize_filename(book.get('title', f"Book_{b_num}")) target_folder = os.path.join(current_run_dir, f"Book_{b_num}_{safe_title}") os.makedirs(target_folder, exist_ok=True) src_cover = os.path.join(src_folder, "cover.png") if os.path.exists(src_cover): shutil.copy2(src_cover, os.path.join(target_folder, "cover.png")) if os.path.exists(os.path.join(src_folder, "cover_art.png")): shutil.copy2(os.path.join(src_folder, "cover_art.png"), os.path.join(target_folder, "cover_art.png")) utils.log("SYSTEM", f" -> Copied cover for Book {b_num}") # 1.5 Copy Forward Logic (Series Optimization) is_series = False if os.path.exists(bible_path): bible_data = utils.load_json(bible_path) if bible_data: is_series = bible_data.get('project_metadata', {}).get('is_series', False) runs_dir = os.path.join(project_path, "runs") if allow_copy and is_series and os.path.exists(runs_dir): all_runs = [d for d in os.listdir(runs_dir) if d.startswith("run_") and d != f"run_{run_id}"] all_runs.sort(key=lambda x: int(x.split('_')[1]) if x.split('_')[1].isdigit() else 0) if all_runs: latest_run_dir = os.path.join(runs_dir, all_runs[-1]) current_run_dir = os.path.join(runs_dir, f"run_{run_id}") os.makedirs(current_run_dir, exist_ok=True) utils.log("SYSTEM", f"Checking previous run ({all_runs[-1]}) for completed books...") for item in os.listdir(latest_run_dir): if item.startswith("Book_") and os.path.isdir(os.path.join(latest_run_dir, item)): if exclude_folders and item in exclude_folders: utils.log("SYSTEM", f" -> Skipping copy of {item} (Target for revision).") continue if os.path.exists(os.path.join(latest_run_dir, item, "manuscript.json")): src = os.path.join(latest_run_dir, item) dst = os.path.join(current_run_dir, item) try: shutil.copytree(src, dst, dirs_exist_ok=True) utils.log("SYSTEM", f" -> Copied {item} (Skipping generation).") except Exception as e: utils.log("SYSTEM", f" -> Failed to copy {item}: {e}") # 2. Run Generation from cli.engine import run_generation run_generation(bible_path, specific_run_id=run_id) utils.log("SYSTEM", "Job Complete.") utils.update_progress(100) status = "completed" except Exception as e: import traceback as _tb _task_log(f"ERROR: Job failed — {type(e).__name__}: {e}") _task_log(_tb.format_exc()) utils.log("ERROR", f"Job Failed: {e}") # status remains "failed" (set before try block) finally: # 3. Calculate Cost & Cleanup — guaranteed to run even if worker crashes run_dir = os.path.join(project_path, "runs", f"run_{run_id}") if os.path.exists(run_dir): final_log_path = os.path.join(run_dir, "web_console.log") if os.path.exists(initial_log): try: os.rename(initial_log, final_log_path) except OSError: shutil.copy2(initial_log, final_log_path) os.remove(initial_log) for item in os.listdir(run_dir): item_path = os.path.join(run_dir, item) if os.path.isdir(item_path) and item.startswith("Book_"): usage_path = os.path.join(item_path, "usage_log.json") if os.path.exists(usage_path): data = utils.load_json(usage_path) total_cost += data.get('totals', {}).get('est_cost_usd', 0.0) # 4. Update Database with Final Status — run is never left in 'running' state try: _robust_update_run_status(db_path, run_id, status, cost=total_cost, end_time=datetime.utcnow().isoformat(), log_file=final_log_path, progress=100) except Exception as e: print(f"[CRITICAL run={run_id}] Final status update failed after all retries: {e}", flush=True) _task_log(f"Task finished. status={status} cost=${total_cost:.4f}") return {"run_id": run_id, "status": status, "cost": total_cost, "final_log": final_log_path} @huey.task() def regenerate_artifacts_task(run_id, project_path, feedback=None): db_path = os.path.join(config.DATA_DIR, "bookapp.db") run_dir = os.path.join(project_path, "runs", f"run_{run_id}") log_file = os.path.join(run_dir, "web_console.log") if not os.path.exists(run_dir): log_file = os.path.join(project_path, f"system_log_{run_id}.txt") try: with open(log_file, 'w', encoding='utf-8') as f: f.write(f"[{datetime.utcnow().strftime('%H:%M:%S')}] --- REGENERATION STARTED ---\n") except: pass utils.set_log_file(log_file) utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m)) try: with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn: conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,)) except Exception as _e: print(f"[WARN run={run_id}] Could not clear log_entry for regen: {_e}", flush=True) try: _robust_update_run_status(db_path, run_id, 'running', start_time=datetime.utcnow().isoformat(), last_heartbeat=datetime.utcnow().isoformat()) except Exception as _e: print(f"[WARN run={run_id}] Could not set status to 'running' for regen: {_e}", flush=True) utils.log("SYSTEM", "Starting Artifact Regeneration...") book_dir = run_dir if os.path.exists(run_dir): subdirs = utils.get_sorted_book_folders(run_dir) if subdirs: book_dir = os.path.join(run_dir, subdirs[0]) bible_path = os.path.join(project_path, "bible.json") if not os.path.exists(run_dir) or not os.path.exists(bible_path): utils.log("ERROR", "Run directory or Bible not found.") try: _robust_update_run_status(db_path, run_id, 'failed') except Exception as _e: print(f"[WARN run={run_id}] Could not set status to 'failed': {_e}", flush=True) return bible = utils.load_json(bible_path) final_bp_path = os.path.join(book_dir, "final_blueprint.json") ms_path = os.path.join(book_dir, "manuscript.json") if not os.path.exists(final_bp_path) or not os.path.exists(ms_path): utils.log("ERROR", f"Blueprint or Manuscript not found in {book_dir}") try: _robust_update_run_status(db_path, run_id, 'failed') except Exception as _e: print(f"[WARN run={run_id}] Could not set status to 'failed': {_e}", flush=True) return bp = utils.load_json(final_bp_path) ms = utils.load_json(ms_path) meta = bible.get('project_metadata', {}) if 'book_metadata' in bp: for k in ['author', 'genre', 'target_audience', 'style']: if k in meta: bp['book_metadata'][k] = meta[k] if bp.get('series_metadata', {}).get('is_series'): bp['series_metadata']['series_title'] = meta.get('title', bp['series_metadata'].get('series_title')) b_num = bp['series_metadata'].get('book_number') for b in bible.get('books', []): if b.get('book_number') == b_num: bp['book_metadata']['title'] = b.get('title', bp['book_metadata'].get('title')) break else: bp['book_metadata']['title'] = meta.get('title', bp['book_metadata'].get('title')) with open(final_bp_path, 'w') as f: json.dump(bp, f, indent=2) try: ai_setup.init_models() tracking = None events_path = os.path.join(book_dir, "tracking_events.json") if os.path.exists(events_path): tracking = {"events": utils.load_json(events_path), "characters": utils.load_json(os.path.join(book_dir, "tracking_characters.json"))} marketing_cover.generate_cover(bp, book_dir, tracking, feedback=feedback) exporter.compile_files(bp, ms, book_dir) utils.log("SYSTEM", "Regeneration Complete.") final_status = 'completed' except Exception as e: utils.log("ERROR", f"Regeneration Failed: {e}") final_status = 'failed' try: _robust_update_run_status(db_path, run_id, final_status) except Exception as _e: print(f"[CRITICAL run={run_id}] Final regen status update failed: {_e}", flush=True) @huey.task() def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instruction): """ Background task to rewrite a single chapter and propagate changes. """ db_path = os.path.join(config.DATA_DIR, "bookapp.db") try: run_dir = os.path.join(project_path, "runs", f"run_{run_id}") log_file = os.path.join(run_dir, "web_console.log") if not os.path.exists(log_file): log_file = os.path.join(project_path, f"system_log_{run_id}.txt") try: with open(log_file, 'w', encoding='utf-8') as f: f.write("") except: pass utils.set_log_file(log_file) utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m)) try: with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn: conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,)) except Exception as _e: print(f"[WARN run={run_id}] Could not clear log_entry for rewrite: {_e}", flush=True) try: _robust_update_run_status(db_path, run_id, 'running', start_time=datetime.utcnow().isoformat(), last_heartbeat=datetime.utcnow().isoformat()) except Exception as _e: print(f"[WARN run={run_id}] Could not set status to 'running' for rewrite: {_e}", flush=True) book_path = os.path.join(run_dir, book_folder) ms_path = os.path.join(book_path, "manuscript.json") bp_path = os.path.join(book_path, "final_blueprint.json") if not (os.path.exists(ms_path) and os.path.exists(bp_path)): utils.log("ERROR", f"Rewrite failed: files not found for run {run_id}/{book_folder}") return False ms = utils.load_json(ms_path) bp = utils.load_json(bp_path) ai_setup.init_models() from story import editor as story_editor result = story_editor.rewrite_chapter_content(bp, ms, chap_num, instruction, book_path) if result and result[0]: new_text, summary = result for ch in ms: if str(ch.get('num')) == str(chap_num): ch['content'] = new_text break with open(ms_path, 'w') as f: json.dump(ms, f, indent=2) updated_ms = story_editor.check_and_propagate(bp, ms, chap_num, book_path, change_summary=summary) if updated_ms: ms = updated_ms with open(ms_path, 'w') as f: json.dump(ms, f, indent=2) exporter.compile_files(bp, ms, book_path) try: _robust_update_run_status(db_path, run_id, 'completed', end_time=datetime.utcnow().isoformat()) except Exception as _e: print(f"[WARN run={run_id}] Could not set status to 'completed': {_e}", flush=True) return True try: _robust_update_run_status(db_path, run_id, 'completed', end_time=datetime.utcnow().isoformat()) except Exception as _e: print(f"[WARN run={run_id}] Could not set status to 'completed': {_e}", flush=True) return False except Exception as e: utils.log("ERROR", f"Rewrite task exception for run {run_id}/{book_folder}: {e}") try: _robust_update_run_status(db_path, run_id, 'failed', end_time=datetime.utcnow().isoformat()) except Exception as _e: print(f"[CRITICAL run={run_id}] Could not set status to 'failed' after rewrite error: {_e}", flush=True) return False @huey.task() def refine_bible_task(project_path, instruction, source_type, selected_keys=None): """ Background task to refine the Bible. Handles partial merging of selected keys into a temp base before refinement. """ try: bible_path = os.path.join(project_path, "bible.json") draft_path = os.path.join(project_path, "bible_draft.json") lock_path = os.path.join(project_path, ".refining") with open(lock_path, 'w') as f: f.write("running") base_bible = utils.load_json(bible_path) if not base_bible: return False if source_type == 'draft' and os.path.exists(draft_path): draft_bible = utils.load_json(draft_path) if selected_keys is not None and draft_bible: base_bible = bible_tracker.merge_selected_changes(base_bible, draft_bible, selected_keys) elif draft_bible: base_bible = draft_bible ai_setup.init_models() new_bible = bible_tracker.refine_bible(base_bible, instruction, project_path) if new_bible: with open(draft_path, 'w') as f: json.dump(new_bible, f, indent=2) return True return False except Exception as e: utils.log("ERROR", f"Bible refinement task failed: {e}") return False finally: if os.path.exists(lock_path): os.remove(lock_path)