diff --git a/ai_blueprint.md b/ai_blueprint.md index 28dec9e..34588d2 100644 --- a/ai_blueprint.md +++ b/ai_blueprint.md @@ -1,52 +1,56 @@ -# AI Context Optimization Blueprint (v2.12 — implemented 2026-02-21) +# AI Blueprint: Addressing Stuck Book Generation Jobs -This blueprint outlines architectural improvements for how AI context is managed during the writing process. The goal is to provide the AI (Claude/Gemini) with **better, highly-targeted context upfront**, which will dramatically improve first-draft quality and reduce the reliance on expensive, time-consuming quality checks and rewrites (currently up to 5 attempts). +> **Status: IMPLEMENTED — v2.14** +> All five steps below were implemented on 2026-02-21. -## Bug: Frontend Stuck on "Initializing/Waiting for logs" ✅ FIXED in v2.12 +## 1. The Problem: Progress Stalls -**Symptom:** -A book generation task is started from the web UI. Docker logs show the process is running correctly and making progress through different phases (ARCHITECT, WRITER, etc.). However, the frontend UI remains stuck on the "Initializing..." or "Waiting for logs..." state and never shows the live log feed or progress updates. +The primary issue is that book generation jobs can get "stuck" in a "running" state, preventing users from starting new runs and causing confusion as the UI shows no progress. This is likely caused by worker processes crashing or encountering unhandled errors before they can update the job's final status to "completed" or "failed". -**Root Cause Analysis:** -This is a state-synchronization issue between the backend task worker and the frontend UI. The UI polls a backend endpoint (`/run//status`) to get the latest status and log content. When this bug occurs, one of several underlying systems has failed: -1. **Log Persistence:** The backend worker fails to write its log messages to the shared database (`LogEntry` table) or fallback file (`web_console.log`). -2. **Database Connection/Session:** The web server process that responds to the `/run/.../status` poll has a stale view of the database or cannot connect. -3. **Frontend Polling:** The javascript on the page stops polling for updates. +## 2. Investigation Findings -Based on previous fixes (v2.9, v2.11), this is a recurring problem with multiple potential failure points. +- **State Management:** The `Run` table in the database has a `status` column. Tasks in `web/tasks.py` are responsible for updating this from "queued" to "running" and finally to "completed" or "failed". +- **Point of Failure:** The most likely failure point is a catastrophic crash of the Huey worker process (e.g., out-of-memory error) or a deadlock within the core `cli.engine.run_generation` function. In these scenarios, the `finally` block that updates the status is never reached. +- **Database Contention:** The direct use of `sqlite3` in the tasks can lead to `database is locked` errors. While there are some retries, prolonged locks could cause status updates to fail. +- **Silent Errors:** Some task functions use a bare `try...except: pass` around the final status update. If updating the database fails, the error is swallowed, and the job remains in a "running" state. -### Areas to Investigate & Verify (Piece-by-Piece) +## 3. The Plan: Enhancing Robustness -This section provides a checklist for a developer to debug this specific issue. +### Step 1: Implement a "Stale Job" Cleanup Process ✅ -#### 1. Backend Worker -> Database Communication (`web/tasks.py`) +- **`last_heartbeat` column added to `Run` model** (`web/db.py`). +- **Migration** added in `web/app.py` startup to add `last_heartbeat` column to existing databases. +- **Startup reset** already present — all `status='running'` jobs are reset to `failed` at boot. +- **Periodic stale-job watcher thread** (`_stale_job_watcher`) started in `web/app.py`: + - Runs every 5 minutes. + - Marks jobs `failed` if `last_heartbeat` is > 15 minutes stale. + - Marks jobs `failed` if `start_time` is > 2 hours old and no heartbeat was ever recorded. -* **`db_log_callback` Function:** This is the primary function responsible for writing log entries to the database. - * **Action:** Verify that the `try...except` block is not catching and silencing critical errors. The `except OperationalError` is expected for handling database locks, but other exceptions should be logged loudly. - * **Action:** Ensure the database session is correctly handled within the callback. Is it possible the session becomes invalid over the long duration of the task? -* **`generate_book_task` Final Status Update:** - * **Action:** At the very end of the `generate_book_task`, there are final `run.status = 'completed'` and `db.session.commit()` calls. Verify these are wrapped in a `finally` block to guarantee they execute even if the main task logic fails. If the task fails, the status should be explicitly set to `'failed'`. +### Step 2: Fortify Database Updates ✅ -#### 2. Web Server -> Database Communication (`web/routes/run.py`) +- **`_robust_update_run_status()`** helper added to `web/tasks.py`: + - 5 retries with linear backoff (1–5 seconds per attempt). + - Handles `sqlite3.OperationalError` specifically with retry; raises `RuntimeError` on total failure. +- All bare `except: pass` blocks around DB status updates removed from: + - `generate_book_task` — final status update now uses robust helper with retry. + - `regenerate_artifacts_task` — all three status-update sites fixed. + - `rewrite_chapter_task` — `db_path` moved above the outer `try` block to prevent `NameError`; all status-update sites fixed. -* **`/run//status` Endpoint:** This is the endpoint the frontend polls. - * **Action:** Review the logic for fetching `LogEntry` records. The v2.11 fix added `db.session.expire_all()` to prevent stale reads. Confirm this is still in place and effective. - * **Action:** Examine the fallback logic. If the database query returns no logs, it tries to read from `web_console.log`. Is this file path correctly constructed? Does the worker have permissions to write to it? Is the web server looking in the right place (`runs/run_/...`)? +### Step 3: Add Granular Logging to Core Engine ✅ -#### 3. Frontend -> Web Server Communication (`templates/project.html` & `templates/run_details.html`) +- **`cli/engine.py` — `run_generation()`**: logs series title at start; logs start/finish of each `process_book` call; catches and re-logs exceptions before re-raising. +- **`cli/engine.py` — `process_book()`**: Added `--- Phase: X ---` banners at the start of each major stage (Blueprint, Structure & Events, Chapter Planning, Writing, Post-Processing). Each phase is wrapped in `try/except` that logs `ERROR` with the exception type before re-raising. -* **Javascript Polling Logic:** - * **Action:** Check the `fetchLog()` or `updateLog()` javascript function. Is there any condition that would cause `setTimeout` to stop being called? (e.g., an unexpected javascript error in the response parsing). - * **Action:** Add more robust error handling to the `fetch()` call's `.catch()` block. Log errors to the browser's developer console so they are visible. - * **Action:** Verify that the `initialStatus` logic correctly identifies when a page is loaded for an already-running task and starts polling immediately. +### Step 4: Introduce a Task Heartbeat ✅ -### Proposed Fixes & Implementation Plan +- **`core/utils.py`**: `set_heartbeat_callback()` and `send_heartbeat()` added (mirrors the existing progress/log callback pattern). +- **`web/tasks.py`**: `db_heartbeat_callback()` writes `last_heartbeat = NOW` to the DB with up to 3 retries. Set as the heartbeat callback in `generate_book_task`. +- **`cli/engine.py`**: `utils.send_heartbeat()` called after each chapter is saved to disk — the most meaningful signal that the worker is still processing. -1. **Strengthen `db_log_callback`:** - * In `web/tasks.py`, modify the `db_log_callback` to explicitly log any non-`OperationalError` exceptions to the main application log file (`data/app.log`) before breaking the loop. This will give us visibility into why it might be failing silently. -2. **Guarantee Final Status Update:** - * In `web/tasks.py`, wrap the main logic of `generate_book_task` in a `try...finally` block. The `finally` block will be responsible for setting the final run status (`completed` or `failed`) and committing the change. This ensures the run is never left in a `running` state if the worker crashes. -3. **Improve Frontend Error Visibility:** - * In the javascript of `templates/project.html` and `templates/run_details.html`, add `console.error("Polling failed:", err);` to the `.catch()` block of the status-polling `fetch()` call. This makes it immediately obvious if the frontend is experiencing network or parsing errors. -4. **Add a "Force Refresh" Button (UI Enhancement):** - * Add a small "Refresh Status" button next to the status message in the UI. This button will manually trigger the `fetchLog()`/`updateLog()` function, providing a manual override if the automatic polling fails for any reason. +### Step 5: Commit and Push Changes ✅ + +Changes committed to `main` branch with message `Auto-commit: v2.14 — Stuck job robustness (heartbeat, retry, stale watcher, granular logging)`. + +--- + +This multi-layered approach will significantly reduce the chances of jobs getting stuck and provide better diagnostics if they do. It ensures the system can recover gracefully from worker failures and database locks. diff --git a/cli/engine.py b/cli/engine.py index e44c896..2b0db53 100644 --- a/cli/engine.py +++ b/cli/engine.py @@ -31,26 +31,31 @@ def process_book(bp, folder, context="", resume=False, interactive=False): bp_path = os.path.join(folder, "blueprint_initial.json") t_step = time.time() utils.update_progress(5) - if resume and os.path.exists(bp_path): - utils.log("RESUME", "Loading existing blueprint...") - saved_bp = utils.load_json(bp_path) - if saved_bp: - if 'book_metadata' in bp and 'book_metadata' in saved_bp: - for k in ['title', 'author', 'genre', 'target_audience', 'style', 'author_bio', 'author_details']: - if k in bp['book_metadata']: - saved_bp['book_metadata'][k] = bp['book_metadata'][k] - if 'series_metadata' in bp: - saved_bp['series_metadata'] = bp['series_metadata'] - bp = saved_bp + utils.log("SYSTEM", "--- Phase: Blueprint ---") + try: + if resume and os.path.exists(bp_path): + utils.log("RESUME", "Loading existing blueprint...") + saved_bp = utils.load_json(bp_path) + if saved_bp: + if 'book_metadata' in bp and 'book_metadata' in saved_bp: + for k in ['title', 'author', 'genre', 'target_audience', 'style', 'author_bio', 'author_details']: + if k in bp['book_metadata']: + saved_bp['book_metadata'][k] = bp['book_metadata'][k] + if 'series_metadata' in bp: + saved_bp['series_metadata'] = bp['series_metadata'] + bp = saved_bp + with open(bp_path, "w") as f: json.dump(bp, f, indent=2) + else: + bp = planner.enrich(bp, folder, context) with open(bp_path, "w") as f: json.dump(bp, f, indent=2) - else: - bp = planner.enrich(bp, folder, context) - with open(bp_path, "w") as f: json.dump(bp, f, indent=2) - # Ensure Persona Exists (Auto-create if missing) - if 'author_details' not in bp['book_metadata'] or not bp['book_metadata']['author_details']: - bp['book_metadata']['author_details'] = style_persona.create_initial_persona(bp, folder) - with open(bp_path, "w") as f: json.dump(bp, f, indent=2) + # Ensure Persona Exists (Auto-create if missing) + if 'author_details' not in bp['book_metadata'] or not bp['book_metadata']['author_details']: + bp['book_metadata']['author_details'] = style_persona.create_initial_persona(bp, folder) + with open(bp_path, "w") as f: json.dump(bp, f, indent=2) + except Exception as _e: + utils.log("ERROR", f"Blueprint phase failed: {type(_e).__name__}: {_e}") + raise utils.log("TIMING", f"Blueprint Phase: {time.time() - t_step:.1f}s") @@ -58,29 +63,40 @@ def process_book(bp, folder, context="", resume=False, interactive=False): events_path = os.path.join(folder, "events.json") t_step = time.time() utils.update_progress(10) - if resume and os.path.exists(events_path): - utils.log("RESUME", "Loading existing events...") - events = utils.load_json(events_path) - else: - events = planner.plan_structure(bp, folder) - depth = bp['length_settings']['depth'] - target_chaps = bp['length_settings']['chapters'] - for d in range(1, depth+1): - events = planner.expand(events, d, target_chaps, bp, folder) - time.sleep(1) - with open(events_path, "w") as f: json.dump(events, f, indent=2) + utils.log("SYSTEM", "--- Phase: Story Structure & Events ---") + try: + if resume and os.path.exists(events_path): + utils.log("RESUME", "Loading existing events...") + events = utils.load_json(events_path) + else: + events = planner.plan_structure(bp, folder) + depth = bp['length_settings']['depth'] + target_chaps = bp['length_settings']['chapters'] + for d in range(1, depth+1): + utils.log("SYSTEM", f" Expanding story structure depth {d}/{depth}...") + events = planner.expand(events, d, target_chaps, bp, folder) + time.sleep(1) + with open(events_path, "w") as f: json.dump(events, f, indent=2) + except Exception as _e: + utils.log("ERROR", f"Events/Structure phase failed: {type(_e).__name__}: {_e}") + raise utils.log("TIMING", f"Structure & Expansion: {time.time() - t_step:.1f}s") # 4. Chapter Plan chapters_path = os.path.join(folder, "chapters.json") t_step = time.time() utils.update_progress(15) - if resume and os.path.exists(chapters_path): - utils.log("RESUME", "Loading existing chapter plan...") - chapters = utils.load_json(chapters_path) - else: - chapters = planner.create_chapter_plan(events, bp, folder) - with open(chapters_path, "w") as f: json.dump(chapters, f, indent=2) + utils.log("SYSTEM", "--- Phase: Chapter Planning ---") + try: + if resume and os.path.exists(chapters_path): + utils.log("RESUME", "Loading existing chapter plan...") + chapters = utils.load_json(chapters_path) + else: + chapters = planner.create_chapter_plan(events, bp, folder) + with open(chapters_path, "w") as f: json.dump(chapters, f, indent=2) + except Exception as _e: + utils.log("ERROR", f"Chapter planning phase failed: {type(_e).__name__}: {_e}") + raise utils.log("TIMING", f"Chapter Planning: {time.time() - t_step:.1f}s") # 5. Writing Loop @@ -126,6 +142,7 @@ def process_book(bp, folder, context="", resume=False, interactive=False): summary = resp_sum.text except: summary = "The story continues." + utils.log("SYSTEM", f"--- Phase: Writing ({len(chapters)} chapters planned) ---") t_step = time.time() session_chapters = 0 session_time = 0 @@ -222,6 +239,7 @@ def process_book(bp, folder, context="", resume=False, interactive=False): ms.append({'num': ch['chapter_number'], 'title': ch['title'], 'pov_character': ch.get('pov_character'), 'content': txt}) with open(ms_path, "w") as f: json.dump(ms, f, indent=2) + utils.send_heartbeat() # Signal that the task is still alive # Update Tracking tracking = bible_tracker.update_tracking(folder, ch['chapter_number'], txt, tracking) @@ -284,21 +302,29 @@ def process_book(bp, folder, context="", resume=False, interactive=False): utils.log("TIMING", f"Writing Phase: {time.time() - t_step:.1f}s") - # Harvest + # Post-Processing t_step = time.time() - utils.update_progress(92) - bp = bible_tracker.harvest_metadata(bp, folder, ms) - with open(os.path.join(folder, "final_blueprint.json"), "w") as f: json.dump(bp, f, indent=2) + utils.log("SYSTEM", "--- Phase: Post-Processing (Harvest, Cover, Export) ---") + try: + utils.update_progress(92) + utils.log("SYSTEM", " Harvesting metadata from manuscript...") + bp = bible_tracker.harvest_metadata(bp, folder, ms) + with open(os.path.join(folder, "final_blueprint.json"), "w") as f: json.dump(bp, f, indent=2) - # Create Assets - utils.update_progress(95) - marketing_assets.create_marketing_assets(bp, folder, tracking, interactive=interactive) + utils.update_progress(95) + utils.log("SYSTEM", " Generating cover and marketing assets...") + marketing_assets.create_marketing_assets(bp, folder, tracking, interactive=interactive) - # Update Persona - style_persona.update_persona_sample(bp, folder) + utils.log("SYSTEM", " Updating author persona sample...") + style_persona.update_persona_sample(bp, folder) + + utils.update_progress(98) + utils.log("SYSTEM", " Compiling final export files...") + exporter.compile_files(bp, ms, folder) + except Exception as _e: + utils.log("ERROR", f"Post-processing phase failed: {type(_e).__name__}: {_e}") + raise - utils.update_progress(98) - exporter.compile_files(bp, ms, folder) utils.log("TIMING", f"Post-Processing: {time.time() - t_step:.1f}s") utils.log("SYSTEM", f"Book Finished. Total Time: {time.time() - total_start:.1f}s") @@ -307,16 +333,17 @@ def process_book(bp, folder, context="", resume=False, interactive=False): def run_generation(target=None, specific_run_id=None, interactive=False): + utils.log("SYSTEM", "=== run_generation: Initialising AI models ===") ai_setup.init_models() if not target: target = config.DEFAULT_BLUEPRINT data = utils.load_json(target) if not data: - utils.log("SYSTEM", f"Could not load {target}") + utils.log("ERROR", f"Could not load bible/target: {target}") return - utils.log("SYSTEM", "Starting Series Generation...") + utils.log("SYSTEM", f"=== Starting Series Generation: {data.get('project_metadata', {}).get('title', 'Untitled')} ===") project_dir = os.path.dirname(os.path.abspath(target)) runs_base = os.path.join(project_dir, "runs") @@ -386,7 +413,13 @@ def run_generation(target=None, specific_run_id=None, interactive=False): book_folder = os.path.join(run_dir, f"Book_{book.get('book_number', i+1)}_{safe_title}") os.makedirs(book_folder, exist_ok=True) - process_book(bp, book_folder, context=previous_context, resume=resume_mode, interactive=interactive) + utils.log("SYSTEM", f"--- Starting process_book for '{book.get('title')}' in {book_folder} ---") + try: + process_book(bp, book_folder, context=previous_context, resume=resume_mode, interactive=interactive) + except Exception as _e: + utils.log("ERROR", f"process_book failed for Book {book.get('book_number')}: {type(_e).__name__}: {_e}") + raise + utils.log("SYSTEM", f"--- Finished process_book for '{book.get('title')}' ---") final_bp_path = os.path.join(book_folder, "final_blueprint.json") if os.path.exists(final_bp_path): diff --git a/core/utils.py b/core/utils.py index e213a9c..38f9674 100644 --- a/core/utils.py +++ b/core/utils.py @@ -63,11 +63,19 @@ def set_log_callback(callback): def set_progress_callback(callback): _log_context.progress_callback = callback +def set_heartbeat_callback(callback): + _log_context.heartbeat_callback = callback + def update_progress(percent): if getattr(_log_context, 'progress_callback', None): try: _log_context.progress_callback(percent) except: pass +def send_heartbeat(): + if getattr(_log_context, 'heartbeat_callback', None): + try: _log_context.heartbeat_callback() + except: pass + def clean_json(text): text = text.replace("```json", "").replace("```", "").strip() start_obj = text.find('{') diff --git a/web/app.py b/web/app.py index d89707e..90bdfeb 100644 --- a/web/app.py +++ b/web/app.py @@ -108,6 +108,14 @@ with app.app_context(): _log("System: Added 'progress' column to Run table.") except: pass + # Migration: Add 'last_heartbeat' column if missing + try: + with db.engine.connect() as conn: + conn.execute(text("ALTER TABLE run ADD COLUMN last_heartbeat DATETIME")) + conn.commit() + _log("System: Added 'last_heartbeat' column to Run table.") + except: pass + # Reset stuck runs on startup try: stuck_runs = Run.query.filter_by(status='running').all() @@ -133,6 +141,42 @@ with app.app_context(): _log(f"System: Startup cleanup error: {e}") +# --- STALE JOB WATCHER --- +# Background thread that periodically detects jobs where the heartbeat has +# gone silent (>15 min) or the total run has exceeded 2 hours. + +def _stale_job_watcher(): + import time as _time + from datetime import datetime as _dt, timedelta as _td + + _HEARTBEAT_THRESHOLD = _td(minutes=15) + _MAX_RUN_THRESHOLD = _td(hours=2) + _CHECK_INTERVAL = 5 * 60 # seconds + + while True: + _time.sleep(_CHECK_INTERVAL) + try: + with app.app_context(): + now = _dt.utcnow() + stale = Run.query.filter_by(status='running').all() + for r in stale: + # Check heartbeat first (shorter threshold) + if r.last_heartbeat and (now - r.last_heartbeat) > _HEARTBEAT_THRESHOLD: + _log(f"System: [StaleWatcher] Run #{r.id} heartbeat is {now - r.last_heartbeat} old — marking failed.") + r.status = 'failed' + r.end_time = now + db.session.add(r) + # Fallback: check start_time if no heartbeat recorded + elif not r.last_heartbeat and r.start_time and (now - r.start_time) > _MAX_RUN_THRESHOLD: + _log(f"System: [StaleWatcher] Run #{r.id} running {now - r.start_time} with no heartbeat — marking failed.") + r.status = 'failed' + r.end_time = now + db.session.add(r) + db.session.commit() + except Exception as _e: + _log(f"System: [StaleWatcher] Error during stale-job check: {_e}") + + # --- HUEY CONSUMER --- # Start the Huey task consumer in a background thread whenever the app loads. # Guard against the Werkzeug reloader spawning a second consumer in the child process, @@ -173,6 +217,9 @@ if not _is_reloader_child and not _is_testing: _log("System: Launching Huey consumer thread...") _huey_thread = _threading.Thread(target=_start_huey_consumer, daemon=True, name="huey-consumer") _huey_thread.start() + _log("System: Launching stale-job watcher thread (checks every 5 min)...") + _watcher_thread = _threading.Thread(target=_stale_job_watcher, daemon=True, name="stale-job-watcher") + _watcher_thread.start() else: _log(f"System: Skipping Huey consumer (WERKZEUG_RUN_MAIN={os.environ.get('WERKZEUG_RUN_MAIN')}, FLASK_TESTING={os.environ.get('FLASK_TESTING')}).") diff --git a/web/db.py b/web/db.py index c3265ae..cc761b7 100644 --- a/web/db.py +++ b/web/db.py @@ -33,6 +33,7 @@ class Run(db.Model): log_file = db.Column(db.String(300), nullable=True) cost = db.Column(db.Float, default=0.0) progress = db.Column(db.Integer, default=0) + last_heartbeat = db.Column(db.DateTime, nullable=True) logs = db.relationship('LogEntry', backref='run', lazy=True, cascade="all, delete-orphan") diff --git a/web/tasks.py b/web/tasks.py index 3c1ec8b..7796c4e 100644 --- a/web/tasks.py +++ b/web/tasks.py @@ -16,6 +16,48 @@ from export import exporter # Configure Huey (Task Queue) huey = SqliteHuey('bookapp_queue', filename=os.path.join(config.DATA_DIR, 'queue.db')) +def _robust_update_run_status(db_path, run_id, status, retries=5, **extra_cols): + """Update run status with exponential-backoff retry. Raises RuntimeError if all retries fail.""" + import sys as _sys + cols = {"status": status} + cols.update(extra_cols) + set_clause = ", ".join(f"{k} = ?" for k in cols) + values = list(cols.values()) + [run_id] + + for attempt in range(retries): + try: + with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn: + conn.execute(f"UPDATE run SET {set_clause} WHERE id = ?", values) + return + except sqlite3.OperationalError as e: + wait = attempt + 1 + print(f"[DB WARN run={run_id}] Status update locked (attempt {attempt+1}/{retries}), retry in {wait}s: {e}", flush=True, file=_sys.stdout) + time.sleep(wait) + except Exception as e: + print(f"[DB ERROR run={run_id}] Unexpected error on status update: {type(e).__name__}: {e}", flush=True, file=_sys.stdout) + raise + + msg = f"[DB CRITICAL run={run_id}] Failed to update status='{status}' after {retries} attempts." + print(msg, flush=True, file=_sys.stdout) + raise RuntimeError(msg) + + +def db_heartbeat_callback(db_path, run_id): + """Updates last_heartbeat timestamp for the run in SQLite.""" + import sys as _sys + for _ in range(3): + try: + with sqlite3.connect(db_path, timeout=10, check_same_thread=False) as conn: + conn.execute("UPDATE run SET last_heartbeat = ? WHERE id = ?", + (datetime.utcnow().isoformat(), run_id)) + return + except sqlite3.OperationalError: + time.sleep(0.2) + except Exception as _e: + print(f"[db_heartbeat ERROR run={run_id}] {type(_e).__name__}: {_e}", flush=True, file=_sys.stdout) + return + + def db_log_callback(db_path, run_id, phase, msg): """Writes log entry directly to SQLite to avoid Flask Context issues in threads.""" import sys as _sys @@ -86,17 +128,17 @@ def generate_book_task(run_id, project_path, bible_path, allow_copy=True, feedba db_path = os.path.join(config.DATA_DIR, "bookapp.db") utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m)) utils.set_progress_callback(lambda p: db_progress_callback(db_path, run_id, p)) + utils.set_heartbeat_callback(lambda: db_heartbeat_callback(db_path, run_id)) - # Set Status to Running + # Set Status to Running (with start_time and initial heartbeat) try: - with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn: - conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,)) + _robust_update_run_status(db_path, run_id, 'running', + start_time=datetime.utcnow().isoformat(), + last_heartbeat=datetime.utcnow().isoformat()) _task_log("Run status set to 'running' in DB.") - except sqlite3.OperationalError as e: - _task_log(f"WARNING: DB locked when setting run status: {e}") - utils.log("SYSTEM", f"Database locked when setting run status (run {run_id}): {e}") except Exception as e: - _task_log(f"WARNING: Could not set run status: {e}") + _task_log(f"WARNING: Could not set run status to 'running': {e}") + utils.log("SYSTEM", f"WARNING: run status update failed (run {run_id}): {e}") utils.log("SYSTEM", f"Starting Job #{run_id}") @@ -227,11 +269,13 @@ def generate_book_task(run_id, project_path, bible_path, allow_copy=True, feedba # 4. Update Database with Final Status — run is never left in 'running' state try: - with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn: - conn.execute("UPDATE run SET status = ?, cost = ?, end_time = ?, log_file = ?, progress = 100 WHERE id = ?", - (status, total_cost, datetime.utcnow(), final_log_path, run_id)) + _robust_update_run_status(db_path, run_id, status, + cost=total_cost, + end_time=datetime.utcnow().isoformat(), + log_file=final_log_path, + progress=100) except Exception as e: - print(f"Failed to update run status in DB: {e}") + print(f"[CRITICAL run={run_id}] Final status update failed after all retries: {e}", flush=True) _task_log(f"Task finished. status={status} cost=${total_cost:.4f}") return {"run_id": run_id, "status": status, "cost": total_cost, "final_log": final_log_path} @@ -255,10 +299,16 @@ def regenerate_artifacts_task(run_id, project_path, feedback=None): utils.set_log_file(log_file) utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m)) try: - with sqlite3.connect(db_path) as conn: + with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn: conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,)) - conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,)) - except: pass + except Exception as _e: + print(f"[WARN run={run_id}] Could not clear log_entry for regen: {_e}", flush=True) + try: + _robust_update_run_status(db_path, run_id, 'running', + start_time=datetime.utcnow().isoformat(), + last_heartbeat=datetime.utcnow().isoformat()) + except Exception as _e: + print(f"[WARN run={run_id}] Could not set status to 'running' for regen: {_e}", flush=True) utils.log("SYSTEM", "Starting Artifact Regeneration...") @@ -272,9 +322,9 @@ def regenerate_artifacts_task(run_id, project_path, feedback=None): if not os.path.exists(run_dir) or not os.path.exists(bible_path): utils.log("ERROR", "Run directory or Bible not found.") try: - with sqlite3.connect(db_path) as conn: - conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,)) - except: pass + _robust_update_run_status(db_path, run_id, 'failed') + except Exception as _e: + print(f"[WARN run={run_id}] Could not set status to 'failed': {_e}", flush=True) return bible = utils.load_json(bible_path) @@ -284,9 +334,9 @@ def regenerate_artifacts_task(run_id, project_path, feedback=None): if not os.path.exists(final_bp_path) or not os.path.exists(ms_path): utils.log("ERROR", f"Blueprint or Manuscript not found in {book_dir}") try: - with sqlite3.connect(db_path) as conn: - conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,)) - except: pass + _robust_update_run_status(db_path, run_id, 'failed') + except Exception as _e: + print(f"[WARN run={run_id}] Could not set status to 'failed': {_e}", flush=True) return bp = utils.load_json(final_bp_path) @@ -328,9 +378,9 @@ def regenerate_artifacts_task(run_id, project_path, feedback=None): final_status = 'failed' try: - with sqlite3.connect(db_path) as conn: - conn.execute("UPDATE run SET status = ? WHERE id = ?", (final_status, run_id)) - except: pass + _robust_update_run_status(db_path, run_id, final_status) + except Exception as _e: + print(f"[CRITICAL run={run_id}] Final regen status update failed: {_e}", flush=True) @huey.task() @@ -338,6 +388,8 @@ def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instructio """ Background task to rewrite a single chapter and propagate changes. """ + db_path = os.path.join(config.DATA_DIR, "bookapp.db") + try: run_dir = os.path.join(project_path, "runs", f"run_{run_id}") @@ -350,14 +402,19 @@ def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instructio except: pass utils.set_log_file(log_file) - db_path = os.path.join(config.DATA_DIR, "bookapp.db") utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m)) try: - with sqlite3.connect(db_path) as conn: + with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn: conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,)) - conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,)) - except: pass + except Exception as _e: + print(f"[WARN run={run_id}] Could not clear log_entry for rewrite: {_e}", flush=True) + try: + _robust_update_run_status(db_path, run_id, 'running', + start_time=datetime.utcnow().isoformat(), + last_heartbeat=datetime.utcnow().isoformat()) + except Exception as _e: + print(f"[WARN run={run_id}] Could not set status to 'running' for rewrite: {_e}", flush=True) book_path = os.path.join(run_dir, book_folder) ms_path = os.path.join(book_path, "manuscript.json") @@ -392,22 +449,25 @@ def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instructio exporter.compile_files(bp, ms, book_path) try: - with sqlite3.connect(db_path) as conn: - conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,)) - except: pass + _robust_update_run_status(db_path, run_id, 'completed', + end_time=datetime.utcnow().isoformat()) + except Exception as _e: + print(f"[WARN run={run_id}] Could not set status to 'completed': {_e}", flush=True) return True try: - with sqlite3.connect(db_path) as conn: - conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,)) - except: pass + _robust_update_run_status(db_path, run_id, 'completed', + end_time=datetime.utcnow().isoformat()) + except Exception as _e: + print(f"[WARN run={run_id}] Could not set status to 'completed': {_e}", flush=True) return False except Exception as e: utils.log("ERROR", f"Rewrite task exception for run {run_id}/{book_folder}: {e}") try: - with sqlite3.connect(db_path) as conn: - conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,)) - except: pass + _robust_update_run_status(db_path, run_id, 'failed', + end_time=datetime.utcnow().isoformat()) + except Exception as _e: + print(f"[CRITICAL run={run_id}] Could not set status to 'failed' after rewrite error: {_e}", flush=True) return False