Auto-commit: v2.14 — Stuck job robustness (heartbeat, retry, stale watcher, granular logging)
- web/db.py: Add last_heartbeat column to Run model
- core/utils.py: Add set_heartbeat_callback() and send_heartbeat()
- web/tasks.py: Add _robust_update_run_status() with 5-retry exponential backoff; add db_heartbeat_callback(); remove all bare except:pass on DB status updates; set start_time + last_heartbeat when marking run as 'running'
- web/app.py: Add last_heartbeat column migration; add _stale_job_watcher() background thread (checks every 5 min, 15-min heartbeat threshold, 2-hr start_time threshold)
- cli/engine.py: Add phase-level logging banners and try/except wrappers in process_book(); add utils.send_heartbeat() after each chapter save; add start/finish logging in run_generation()

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
47
web/app.py
47
web/app.py
@@ -108,6 +108,14 @@ with app.app_context():
|
||||
_log("System: Added 'progress' column to Run table.")
|
||||
except: pass
|
||||
|
||||
# Migration: Add 'last_heartbeat' column if missing
|
||||
try:
|
||||
with db.engine.connect() as conn:
|
||||
conn.execute(text("ALTER TABLE run ADD COLUMN last_heartbeat DATETIME"))
|
||||
conn.commit()
|
||||
_log("System: Added 'last_heartbeat' column to Run table.")
|
||||
except: pass
|
||||
|
||||
# Reset stuck runs on startup
|
||||
try:
|
||||
stuck_runs = Run.query.filter_by(status='running').all()
|
||||
@@ -133,6 +141,42 @@ with app.app_context():
|
||||
_log(f"System: Startup cleanup error: {e}")
|
||||
|
||||
|
||||
# --- STALE JOB WATCHER ---
|
||||
# Background thread that periodically detects jobs where the heartbeat has
|
||||
# gone silent (>15 min) or, for runs that never recorded a heartbeat, whose start_time is more than 2 hours old.
|
||||
|
||||
def _stale_job_watcher():
    """Loop forever, periodically marking silent 'running' runs as 'failed'.

    After each sleep interval, every Run with status 'running' is inspected
    inside a fresh app context:

    * if it has a last_heartbeat older than the heartbeat limit, it is failed;
    * otherwise, if it has no last_heartbeat but a start_time older than the
      maximum run limit, it is failed.

    All changes from one pass are committed together. Any exception raised
    during a pass is logged and the loop continues with the next pass.
    """
    import time as _time
    from datetime import datetime as _dt, timedelta as _td

    heartbeat_limit = _td(minutes=15)
    max_run_limit = _td(hours=2)
    pause_seconds = 5 * 60

    while True:
        _time.sleep(pause_seconds)
        try:
            with app.app_context():
                now = _dt.utcnow()
                for run in Run.query.filter_by(status='running').all():
                    if run.last_heartbeat:
                        # Heartbeat exists: only the (shorter) heartbeat limit applies.
                        if not (now - run.last_heartbeat) > heartbeat_limit:
                            continue
                        _log(f"System: [StaleWatcher] Run #{run.id} heartbeat is {now - run.last_heartbeat} old — marking failed.")
                    elif run.start_time and (now - run.start_time) > max_run_limit:
                        # No heartbeat was ever recorded: fall back to the start time.
                        _log(f"System: [StaleWatcher] Run #{run.id} running {now - run.start_time} with no heartbeat — marking failed.")
                    else:
                        continue

                    run.status = 'failed'
                    run.end_time = now
                    db.session.add(run)
                db.session.commit()
        except Exception as _e:
            _log(f"System: [StaleWatcher] Error during stale-job check: {_e}")
|
||||
|
||||
|
||||
# --- HUEY CONSUMER ---
|
||||
# Start the Huey task consumer in a background thread whenever the app loads.
|
||||
# Guard against the Werkzeug reloader spawning a second consumer in the child process,
|
||||
@@ -173,6 +217,9 @@ if not _is_reloader_child and not _is_testing:
|
||||
_log("System: Launching Huey consumer thread...")
|
||||
_huey_thread = _threading.Thread(target=_start_huey_consumer, daemon=True, name="huey-consumer")
|
||||
_huey_thread.start()
|
||||
_log("System: Launching stale-job watcher thread (checks every 5 min)...")
|
||||
_watcher_thread = _threading.Thread(target=_stale_job_watcher, daemon=True, name="stale-job-watcher")
|
||||
_watcher_thread.start()
|
||||
else:
|
||||
_log(f"System: Skipping Huey consumer (WERKZEUG_RUN_MAIN={os.environ.get('WERKZEUG_RUN_MAIN')}, FLASK_TESTING={os.environ.get('FLASK_TESTING')}).")
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@ class Run(db.Model):
|
||||
log_file = db.Column(db.String(300), nullable=True)
|
||||
cost = db.Column(db.Float, default=0.0)
|
||||
progress = db.Column(db.Integer, default=0)
|
||||
last_heartbeat = db.Column(db.DateTime, nullable=True)
|
||||
|
||||
logs = db.relationship('LogEntry', backref='run', lazy=True, cascade="all, delete-orphan")
|
||||
|
||||
|
||||
132
web/tasks.py
132
web/tasks.py
@@ -16,6 +16,48 @@ from export import exporter
|
||||
# Configure Huey (Task Queue)
|
||||
huey = SqliteHuey('bookapp_queue', filename=os.path.join(config.DATA_DIR, 'queue.db'))
|
||||
|
||||
def _robust_update_run_status(db_path, run_id, status, retries=5, **extra_cols):
|
||||
"""Update run status with exponential-backoff retry. Raises RuntimeError if all retries fail."""
|
||||
import sys as _sys
|
||||
cols = {"status": status}
|
||||
cols.update(extra_cols)
|
||||
set_clause = ", ".join(f"{k} = ?" for k in cols)
|
||||
values = list(cols.values()) + [run_id]
|
||||
|
||||
for attempt in range(retries):
|
||||
try:
|
||||
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
|
||||
conn.execute(f"UPDATE run SET {set_clause} WHERE id = ?", values)
|
||||
return
|
||||
except sqlite3.OperationalError as e:
|
||||
wait = attempt + 1
|
||||
print(f"[DB WARN run={run_id}] Status update locked (attempt {attempt+1}/{retries}), retry in {wait}s: {e}", flush=True, file=_sys.stdout)
|
||||
time.sleep(wait)
|
||||
except Exception as e:
|
||||
print(f"[DB ERROR run={run_id}] Unexpected error on status update: {type(e).__name__}: {e}", flush=True, file=_sys.stdout)
|
||||
raise
|
||||
|
||||
msg = f"[DB CRITICAL run={run_id}] Failed to update status='{status}' after {retries} attempts."
|
||||
print(msg, flush=True, file=_sys.stdout)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
|
||||
def db_heartbeat_callback(db_path, run_id):
    """Record the current UTC time in ``run.last_heartbeat`` for one run (best effort).

    The timestamp is written as ``datetime.utcnow().isoformat()``. The update is
    attempted up to three times when SQLite raises ``OperationalError``; this
    function never raises, it only reports failures on stdout.

    Args:
        db_path: Path of the SQLite database file that holds the ``run`` table.
        run_id: Value matched against ``run.id``.
    """
    import sys as _sys
    from contextlib import closing as _closing

    attempts = 3
    for attempt in range(attempts):
        try:
            # The connection's own context manager only commits/rolls back;
            # closing() makes sure the handle is released on every heartbeat.
            with _closing(sqlite3.connect(db_path, timeout=10, check_same_thread=False)) as conn, conn:
                conn.execute("UPDATE run SET last_heartbeat = ? WHERE id = ?",
                             (datetime.utcnow().isoformat(), run_id))
            return
        except sqlite3.OperationalError as _e:
            if attempt + 1 < attempts:
                time.sleep(0.2)
            else:
                # Still best effort, but leave a trace so a missed heartbeat is diagnosable.
                print(f"[db_heartbeat WARN run={run_id}] Heartbeat not recorded after {attempts} attempts: {_e}", flush=True, file=_sys.stdout)
        except Exception as _e:
            print(f"[db_heartbeat ERROR run={run_id}] {type(_e).__name__}: {_e}", flush=True, file=_sys.stdout)
            return
|
||||
|
||||
|
||||
def db_log_callback(db_path, run_id, phase, msg):
|
||||
"""Writes log entry directly to SQLite to avoid Flask Context issues in threads."""
|
||||
import sys as _sys
|
||||
@@ -86,17 +128,17 @@ def generate_book_task(run_id, project_path, bible_path, allow_copy=True, feedba
|
||||
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
|
||||
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
|
||||
utils.set_progress_callback(lambda p: db_progress_callback(db_path, run_id, p))
|
||||
utils.set_heartbeat_callback(lambda: db_heartbeat_callback(db_path, run_id))
|
||||
|
||||
# Set Status to Running
|
||||
# Set Status to Running (with start_time and initial heartbeat)
|
||||
try:
|
||||
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
|
||||
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
|
||||
_robust_update_run_status(db_path, run_id, 'running',
|
||||
start_time=datetime.utcnow().isoformat(),
|
||||
last_heartbeat=datetime.utcnow().isoformat())
|
||||
_task_log("Run status set to 'running' in DB.")
|
||||
except sqlite3.OperationalError as e:
|
||||
_task_log(f"WARNING: DB locked when setting run status: {e}")
|
||||
utils.log("SYSTEM", f"Database locked when setting run status (run {run_id}): {e}")
|
||||
except Exception as e:
|
||||
_task_log(f"WARNING: Could not set run status: {e}")
|
||||
_task_log(f"WARNING: Could not set run status to 'running': {e}")
|
||||
utils.log("SYSTEM", f"WARNING: run status update failed (run {run_id}): {e}")
|
||||
|
||||
utils.log("SYSTEM", f"Starting Job #{run_id}")
|
||||
|
||||
@@ -227,11 +269,13 @@ def generate_book_task(run_id, project_path, bible_path, allow_copy=True, feedba
|
||||
|
||||
# 4. Update Database with Final Status — run is never left in 'running' state
|
||||
try:
|
||||
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
|
||||
conn.execute("UPDATE run SET status = ?, cost = ?, end_time = ?, log_file = ?, progress = 100 WHERE id = ?",
|
||||
(status, total_cost, datetime.utcnow(), final_log_path, run_id))
|
||||
_robust_update_run_status(db_path, run_id, status,
|
||||
cost=total_cost,
|
||||
end_time=datetime.utcnow().isoformat(),
|
||||
log_file=final_log_path,
|
||||
progress=100)
|
||||
except Exception as e:
|
||||
print(f"Failed to update run status in DB: {e}")
|
||||
print(f"[CRITICAL run={run_id}] Final status update failed after all retries: {e}", flush=True)
|
||||
|
||||
_task_log(f"Task finished. status={status} cost=${total_cost:.4f}")
|
||||
return {"run_id": run_id, "status": status, "cost": total_cost, "final_log": final_log_path}
|
||||
@@ -255,10 +299,16 @@ def regenerate_artifacts_task(run_id, project_path, feedback=None):
|
||||
utils.set_log_file(log_file)
|
||||
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
|
||||
try:
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
|
||||
conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
|
||||
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
|
||||
except: pass
|
||||
except Exception as _e:
|
||||
print(f"[WARN run={run_id}] Could not clear log_entry for regen: {_e}", flush=True)
|
||||
try:
|
||||
_robust_update_run_status(db_path, run_id, 'running',
|
||||
start_time=datetime.utcnow().isoformat(),
|
||||
last_heartbeat=datetime.utcnow().isoformat())
|
||||
except Exception as _e:
|
||||
print(f"[WARN run={run_id}] Could not set status to 'running' for regen: {_e}", flush=True)
|
||||
|
||||
utils.log("SYSTEM", "Starting Artifact Regeneration...")
|
||||
|
||||
@@ -272,9 +322,9 @@ def regenerate_artifacts_task(run_id, project_path, feedback=None):
|
||||
if not os.path.exists(run_dir) or not os.path.exists(bible_path):
|
||||
utils.log("ERROR", "Run directory or Bible not found.")
|
||||
try:
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
|
||||
except: pass
|
||||
_robust_update_run_status(db_path, run_id, 'failed')
|
||||
except Exception as _e:
|
||||
print(f"[WARN run={run_id}] Could not set status to 'failed': {_e}", flush=True)
|
||||
return
|
||||
|
||||
bible = utils.load_json(bible_path)
|
||||
@@ -284,9 +334,9 @@ def regenerate_artifacts_task(run_id, project_path, feedback=None):
|
||||
if not os.path.exists(final_bp_path) or not os.path.exists(ms_path):
|
||||
utils.log("ERROR", f"Blueprint or Manuscript not found in {book_dir}")
|
||||
try:
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
|
||||
except: pass
|
||||
_robust_update_run_status(db_path, run_id, 'failed')
|
||||
except Exception as _e:
|
||||
print(f"[WARN run={run_id}] Could not set status to 'failed': {_e}", flush=True)
|
||||
return
|
||||
|
||||
bp = utils.load_json(final_bp_path)
|
||||
@@ -328,9 +378,9 @@ def regenerate_artifacts_task(run_id, project_path, feedback=None):
|
||||
final_status = 'failed'
|
||||
|
||||
try:
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
conn.execute("UPDATE run SET status = ? WHERE id = ?", (final_status, run_id))
|
||||
except: pass
|
||||
_robust_update_run_status(db_path, run_id, final_status)
|
||||
except Exception as _e:
|
||||
print(f"[CRITICAL run={run_id}] Final regen status update failed: {_e}", flush=True)
|
||||
|
||||
|
||||
@huey.task()
|
||||
@@ -338,6 +388,8 @@ def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instructio
|
||||
"""
|
||||
Background task to rewrite a single chapter and propagate changes.
|
||||
"""
|
||||
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
|
||||
|
||||
try:
|
||||
run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
|
||||
|
||||
@@ -350,14 +402,19 @@ def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instructio
|
||||
except: pass
|
||||
|
||||
utils.set_log_file(log_file)
|
||||
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
|
||||
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
|
||||
|
||||
try:
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
with sqlite3.connect(db_path, timeout=30, check_same_thread=False) as conn:
|
||||
conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
|
||||
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
|
||||
except: pass
|
||||
except Exception as _e:
|
||||
print(f"[WARN run={run_id}] Could not clear log_entry for rewrite: {_e}", flush=True)
|
||||
try:
|
||||
_robust_update_run_status(db_path, run_id, 'running',
|
||||
start_time=datetime.utcnow().isoformat(),
|
||||
last_heartbeat=datetime.utcnow().isoformat())
|
||||
except Exception as _e:
|
||||
print(f"[WARN run={run_id}] Could not set status to 'running' for rewrite: {_e}", flush=True)
|
||||
|
||||
book_path = os.path.join(run_dir, book_folder)
|
||||
ms_path = os.path.join(book_path, "manuscript.json")
|
||||
@@ -392,22 +449,25 @@ def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instructio
|
||||
exporter.compile_files(bp, ms, book_path)
|
||||
|
||||
try:
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
|
||||
except: pass
|
||||
_robust_update_run_status(db_path, run_id, 'completed',
|
||||
end_time=datetime.utcnow().isoformat())
|
||||
except Exception as _e:
|
||||
print(f"[WARN run={run_id}] Could not set status to 'completed': {_e}", flush=True)
|
||||
return True
|
||||
|
||||
try:
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
|
||||
except: pass
|
||||
_robust_update_run_status(db_path, run_id, 'completed',
|
||||
end_time=datetime.utcnow().isoformat())
|
||||
except Exception as _e:
|
||||
print(f"[WARN run={run_id}] Could not set status to 'completed': {_e}", flush=True)
|
||||
return False
|
||||
except Exception as e:
|
||||
utils.log("ERROR", f"Rewrite task exception for run {run_id}/{book_folder}: {e}")
|
||||
try:
|
||||
with sqlite3.connect(db_path) as conn:
|
||||
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
|
||||
except: pass
|
||||
_robust_update_run_status(db_path, run_id, 'failed',
|
||||
end_time=datetime.utcnow().isoformat())
|
||||
except Exception as _e:
|
||||
print(f"[CRITICAL run={run_id}] Could not set status to 'failed' after rewrite error: {_e}", flush=True)
|
||||
return False
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user