/download')
@login_required
def download_artifact(run_id):
filename = request.args.get('file')
run = db.session.get(Run, run_id) or Run.query.get_or_404(run_id)
if run.project.user_id != current_user.id: return "Unauthorized", 403
run_dir = os.path.join(run.project.folder_path, "runs", "bible", f"run_{run.id}")
# If file not found in root, check subfolders (Series Support)
if not os.path.exists(os.path.join(run_dir, filename)) and os.path.exists(run_dir):
subdirs = sorted([d for d in os.listdir(run_dir) if os.path.isdir(os.path.join(run_dir, d)) and d.startswith("Book_")])
if subdirs:
# Try the first book folder
possible_path = os.path.join(subdirs[0], filename)
if os.path.exists(os.path.join(run_dir, possible_path)):
filename = possible_path
return send_from_directory(run_dir, filename, as_attachment=True)
@app.route('/logout')
def logout():
logout_user()
return redirect(url_for('login'))
@app.route('/debug/routes')
def debug_routes():
output = []
for rule in app.url_map.iter_rules():
methods = ','.join(rule.methods)
# Use brackets so they are visible in browser text
rule_str = str(rule).replace('<', '[').replace('>', ']')
line = "{:50s} {:20s} {}".format(rule.endpoint, methods, rule_str)
output.append(line)
return "" + "\n".join(output) + "
"
@app.route('/system/optimize_models', methods=['POST'])
@login_required
def optimize_models():
# Force refresh via AI module (safely handles failures)
try:
ai.init_models(force=True) # Force re-initialization and API scan
# Refresh Style Guidelines
if ai.model_logic:
story.refresh_style_guidelines(ai.model_logic)
flash("AI Models refreshed and Style Guidelines updated.")
except Exception as e:
flash(f"Error refreshing models: {e}")
return redirect(request.referrer or url_for('index'))
# --- COMPATIBILITY ROUTES (Fix 404s) ---
@app.route('/project//run/')
@login_required
def legacy_run_redirect(project_id, run_id):
return redirect(url_for('view_run', id=run_id))
@app.route('/system/status')
@login_required
def system_status():
# System Status View: Show AI Models and Quotas
models_info = {}
cache_data = {}
cache_path = os.path.join(config.DATA_DIR, "model_cache.json")
if os.path.exists(cache_path):
try:
with open(cache_path, 'r') as f:
cache_data = json.load(f)
models_info = cache_data.get('models', {})
except: pass
# Create a placeholder run object so the template doesn't crash
dummy_project = SimpleNamespace(user_id=current_user.id, name="System", folder_path="")
dummy_run = SimpleNamespace(id=0, status="System Status", cost=0.0, log_file=None, start_time=datetime.utcnow(), project=dummy_project, duration=lambda: "N/A")
return render_template('system_status.html', run=dummy_run, models=models_info, cache=cache_data, datetime=datetime)
@app.route('/personas')
@login_required
def list_personas():
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
return render_template('personas.html', personas=personas)
@app.route('/persona/new')
@login_required
def new_persona():
return render_template('persona_edit.html', persona={}, name="")
@app.route('/persona/')
@login_required
def edit_persona(name):
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
persona = personas.get(name)
if not persona:
flash(f"Persona '{name}' not found.")
return redirect(url_for('list_personas'))
return render_template('persona_edit.html', persona=persona, name=name)
@app.route('/persona/save', methods=['POST'])
@login_required
def save_persona():
old_name = request.form.get('old_name')
name = request.form.get('name')
if not name:
flash("Persona name is required.")
return redirect(url_for('list_personas'))
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
if old_name and old_name != name and old_name in personas:
del personas[old_name]
persona = {
"name": name,
"bio": request.form.get('bio'),
"age": request.form.get('age'),
"gender": request.form.get('gender'),
"race": request.form.get('race'),
"nationality": request.form.get('nationality'),
"language": request.form.get('language'),
"sample_text": request.form.get('sample_text'),
"voice_keywords": request.form.get('voice_keywords'),
"style_inspirations": request.form.get('style_inspirations')
}
personas[name] = persona
with open(config.PERSONAS_FILE, 'w') as f: json.dump(personas, f, indent=2)
flash(f"Persona '{name}' saved.")
return redirect(url_for('list_personas'))
@app.route('/persona/delete/', methods=['POST'])
@login_required
def delete_persona(name):
personas = {}
if os.path.exists(config.PERSONAS_FILE):
try:
with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
except: pass
if name in personas:
del personas[name]
with open(config.PERSONAS_FILE, 'w') as f: json.dump(personas, f, indent=2)
flash(f"Persona '{name}' deleted.")
return redirect(url_for('list_personas'))
@app.route('/persona/analyze', methods=['POST'])
@login_required
def analyze_persona():
try: ai.init_models()
except: pass
if not ai.model_logic:
return {"error": "AI models not initialized."}, 500
data = request.json
sample = data.get('sample_text', '')
prompt = f"""
Act as a Literary Analyst. Create or analyze an Author Persona profile.
INPUT DATA:
Name: {data.get('name')}
Age: {data.get('age')} | Gender: {data.get('gender')} | Nationality: {data.get('nationality')}
Sample Text: {sample[:3000]}
TASK:
1. 'bio': Write a 2-3 sentence description of the writing style. If sample is provided, analyze it. If not, invent a style that fits the demographics/name.
2. 'voice_keywords': Comma-separated list of 3-5 adjectives describing the voice (e.g. Gritty, Whimsical, Sarcastic).
3. 'style_inspirations': Comma-separated list of 1-3 famous authors or genres that this style resembles.
RETURN JSON: {{ "bio": "...", "voice_keywords": "...", "style_inspirations": "..." }}
"""
try:
response = ai.model_logic.generate_content(prompt)
return json.loads(utils.clean_json(response.text))
except Exception as e:
return {"error": str(e)}, 500
# --- ADMIN ROUTES ---
@app.route('/admin')
@login_required
@admin_required
def admin_dashboard():
users = User.query.all()
projects = Project.query.all()
return render_template('admin_dashboard.html', users=users, projects=projects)
@app.route('/admin/user//delete', methods=['POST'])
@login_required
@admin_required
def admin_delete_user(user_id):
if user_id == current_user.id:
flash("Cannot delete yourself.")
return redirect(url_for('admin_dashboard'))
user = db.session.get(User, user_id)
if user:
# Delete user data folder
user_path = os.path.join(config.DATA_DIR, "users", str(user.id))
if os.path.exists(user_path):
try: shutil.rmtree(user_path)
except: pass
# Delete projects (DB cascade handles rows, we handle files)
projects = Project.query.filter_by(user_id=user.id).all()
for p in projects:
db.session.delete(p)
db.session.delete(user)
db.session.commit()
flash(f"User {user.username} deleted.")
return redirect(url_for('admin_dashboard'))
@app.route('/admin/project//delete', methods=['POST'])
@login_required
@admin_required
def admin_delete_project(project_id):
proj = db.session.get(Project, project_id)
if proj:
if os.path.exists(proj.folder_path):
try: shutil.rmtree(proj.folder_path)
except: pass
db.session.delete(proj)
db.session.commit()
flash(f"Project {proj.name} deleted.")
return redirect(url_for('admin_dashboard'))
@app.route('/admin/reset', methods=['POST'])
@login_required
@admin_required
def admin_factory_reset():
# 1. Delete ALL Projects (Files & DB)
projects = Project.query.all()
for p in projects:
if os.path.exists(p.folder_path):
try: shutil.rmtree(p.folder_path)
except: pass
db.session.delete(p)
# 2. Delete ALL Users except Current Admin
users = User.query.filter(User.id != current_user.id).all()
for u in users:
user_path = os.path.join(config.DATA_DIR, "users", str(u.id))
if os.path.exists(user_path):
try: shutil.rmtree(user_path)
except: pass
db.session.delete(u)
# 3. Reset Personas to Default
if os.path.exists(config.PERSONAS_FILE):
try: os.remove(config.PERSONAS_FILE)
except: pass
utils.create_default_personas()
db.session.commit()
flash("Factory Reset Complete. All other users and projects have been wiped.")
return redirect(url_for('admin_dashboard'))
@app.route('/admin/spend')
@login_required
@admin_required
def admin_spend_report():
days = request.args.get('days', 30, type=int)
if days > 0:
start_date = datetime.utcnow() - timedelta(days=days)
else:
start_date = datetime.min
# Aggregate spend per user
results = db.session.query(
User.username,
func.count(Run.id),
func.sum(Run.cost)
).join(Project, Project.user_id == User.id)\
.join(Run, Run.project_id == Project.id)\
.filter(Run.start_time >= start_date)\
.group_by(User.id).all()
report = []
total_period_spend = 0.0
for r in results:
cost = r[2] if r[2] else 0.0
report.append({"username": r[0], "runs": r[1], "cost": cost})
total_period_spend += cost
return render_template('admin_spend.html', report=report, days=days, total=total_period_spend)
@app.route('/admin/style', methods=['GET', 'POST'])
@login_required
@admin_required
def admin_style_guidelines():
path = os.path.join(config.DATA_DIR, "style_guidelines.json")
if request.method == 'POST':
ai_isms_raw = request.form.get('ai_isms', '')
filter_words_raw = request.form.get('filter_words', '')
data = {
"ai_isms": [x.strip() for x in ai_isms_raw.split('\n') if x.strip()],
"filter_words": [x.strip() for x in filter_words_raw.split('\n') if x.strip()]
}
with open(path, 'w') as f: json.dump(data, f, indent=2)
flash("Style Guidelines updated successfully.")
return redirect(url_for('admin_style_guidelines'))
# Load current (creates defaults if missing)
data = story.get_style_guidelines()
return render_template('admin_style.html', data=data)
@app.route('/admin/impersonate/')
@login_required
@admin_required
def impersonate_user(user_id):
if user_id == current_user.id:
flash("Cannot impersonate yourself.")
return redirect(url_for('admin_dashboard'))
user = db.session.get(User, user_id)
if user:
session['original_admin_id'] = current_user.id
login_user(user)
flash(f"Now viewing as {user.username}")
return redirect(url_for('index'))
return redirect(url_for('admin_dashboard'))
@app.route('/admin/stop_impersonate')
@login_required
def stop_impersonate():
admin_id = session.get('original_admin_id')
if admin_id:
admin = db.session.get(User, admin_id)
if admin:
login_user(admin)
session.pop('original_admin_id', None)
flash("Restored admin session.")
return redirect(url_for('admin_dashboard'))
return redirect(url_for('index'))
if __name__ == '__main__':
# Run the Huey consumer in a separate thread for testing on Pi
# For production, run `huey_consumer.py web_tasks.huey` in terminal
import threading
def run_huey():
from huey.consumer import Consumer
# Subclass to disable signal handling (which fails in threads)
class ThreadedConsumer(Consumer):
def _set_signal_handlers(self):
pass
c = ThreadedConsumer(huey, workers=1, worker_type='thread')
c.run()
# Configuration
debug_mode = True
# Run worker if: 1. In reloader child process OR 2. Reloader is disabled (debug=False)
if os.environ.get("WERKZEUG_RUN_MAIN") == "true" or not debug_mode:
threading.Thread(target=run_huey, daemon=True).start()
app.run(host='0.0.0.0', port=5000, debug=debug_mode)