250 lines
8.2 KiB
Python
250 lines
8.2 KiB
Python
import os
|
|
import json
|
|
import shutil
|
|
from datetime import datetime, timedelta
|
|
from flask import Blueprint, render_template, request, redirect, url_for, flash, session, jsonify
|
|
from flask_login import login_required, login_user, current_user
|
|
from sqlalchemy import func
|
|
from web.db import db, User, Project, Run, Persona
|
|
from web.helpers import admin_required
|
|
from core import config, utils
|
|
from ai import models as ai_models
|
|
from ai import setup as ai_setup
|
|
from story import style_persona, bible_tracker
|
|
|
|
admin_bp = Blueprint('admin', __name__)
|
|
|
|
|
|
@admin_bp.route('/admin')
|
|
@login_required
|
|
@admin_required
|
|
def admin_dashboard():
|
|
users = User.query.all()
|
|
projects = Project.query.all()
|
|
return render_template('admin_dashboard.html', users=users, projects=projects)
|
|
|
|
|
|
@admin_bp.route('/admin/user/<int:user_id>/delete', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def admin_delete_user(user_id):
|
|
if user_id == current_user.id:
|
|
flash("Cannot delete yourself.")
|
|
return redirect(url_for('admin.admin_dashboard'))
|
|
|
|
user = db.session.get(User, user_id)
|
|
if user:
|
|
user_path = os.path.join(config.DATA_DIR, "users", str(user.id))
|
|
if os.path.exists(user_path):
|
|
try: shutil.rmtree(user_path)
|
|
except: pass
|
|
|
|
projects = Project.query.filter_by(user_id=user.id).all()
|
|
for p in projects:
|
|
db.session.delete(p)
|
|
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
flash(f"User {user.username} deleted.")
|
|
return redirect(url_for('admin.admin_dashboard'))
|
|
|
|
|
|
@admin_bp.route('/admin/project/<int:project_id>/delete', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def admin_delete_project(project_id):
|
|
proj = db.session.get(Project, project_id)
|
|
if proj:
|
|
if os.path.exists(proj.folder_path):
|
|
try: shutil.rmtree(proj.folder_path)
|
|
except: pass
|
|
db.session.delete(proj)
|
|
db.session.commit()
|
|
flash(f"Project {proj.name} deleted.")
|
|
return redirect(url_for('admin.admin_dashboard'))
|
|
|
|
|
|
@admin_bp.route('/admin/reset', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def admin_factory_reset():
|
|
projects = Project.query.all()
|
|
for p in projects:
|
|
if os.path.exists(p.folder_path):
|
|
try: shutil.rmtree(p.folder_path)
|
|
except: pass
|
|
db.session.delete(p)
|
|
|
|
users = User.query.filter(User.id != current_user.id).all()
|
|
for u in users:
|
|
user_path = os.path.join(config.DATA_DIR, "users", str(u.id))
|
|
if os.path.exists(user_path):
|
|
try: shutil.rmtree(user_path)
|
|
except: pass
|
|
db.session.delete(u)
|
|
|
|
Persona.query.delete()
|
|
|
|
db.session.commit()
|
|
flash("Factory Reset Complete. All other users and projects have been wiped.")
|
|
return redirect(url_for('admin.admin_dashboard'))
|
|
|
|
|
|
@admin_bp.route('/admin/spend')
|
|
@login_required
|
|
@admin_required
|
|
def admin_spend_report():
|
|
days = request.args.get('days', 30, type=int)
|
|
|
|
if days > 0:
|
|
start_date = datetime.utcnow() - timedelta(days=days)
|
|
else:
|
|
start_date = datetime.min
|
|
|
|
results = db.session.query(
|
|
User.username,
|
|
func.count(Run.id),
|
|
func.sum(Run.cost)
|
|
).join(Project, Project.user_id == User.id)\
|
|
.join(Run, Run.project_id == Project.id)\
|
|
.filter(Run.start_time >= start_date)\
|
|
.group_by(User.id, User.username).all()
|
|
|
|
report = []
|
|
total_period_spend = 0.0
|
|
for r in results:
|
|
cost = r[2] if r[2] else 0.0
|
|
report.append({"username": r[0], "runs": r[1], "cost": cost})
|
|
total_period_spend += cost
|
|
|
|
return render_template('admin_spend.html', report=report, days=days, total=total_period_spend)
|
|
|
|
|
|
@admin_bp.route('/admin/style', methods=['GET', 'POST'])
|
|
@login_required
|
|
@admin_required
|
|
def admin_style_guidelines():
|
|
path = os.path.join(config.DATA_DIR, "style_guidelines.json")
|
|
|
|
if request.method == 'POST':
|
|
ai_isms_raw = request.form.get('ai_isms', '')
|
|
filter_words_raw = request.form.get('filter_words', '')
|
|
|
|
data = {
|
|
"ai_isms": [x.strip() for x in ai_isms_raw.split('\n') if x.strip()],
|
|
"filter_words": [x.strip() for x in filter_words_raw.split('\n') if x.strip()]
|
|
}
|
|
|
|
with open(path, 'w') as f: json.dump(data, f, indent=2)
|
|
flash("Style Guidelines updated successfully.")
|
|
return redirect(url_for('admin.admin_style_guidelines'))
|
|
|
|
data = style_persona.get_style_guidelines()
|
|
return render_template('admin_style.html', data=data)
|
|
|
|
|
|
@admin_bp.route('/admin/impersonate/<int:user_id>')
|
|
@login_required
|
|
@admin_required
|
|
def impersonate_user(user_id):
|
|
if user_id == current_user.id:
|
|
flash("Cannot impersonate yourself.")
|
|
return redirect(url_for('admin.admin_dashboard'))
|
|
|
|
user = db.session.get(User, user_id)
|
|
if user:
|
|
session['original_admin_id'] = current_user.id
|
|
login_user(user)
|
|
flash(f"Now viewing as {user.username}")
|
|
return redirect(url_for('project.index'))
|
|
return redirect(url_for('admin.admin_dashboard'))
|
|
|
|
|
|
@admin_bp.route('/admin/stop_impersonate')
|
|
@login_required
|
|
def stop_impersonate():
|
|
admin_id = session.get('original_admin_id')
|
|
if admin_id:
|
|
admin = db.session.get(User, admin_id)
|
|
if admin:
|
|
login_user(admin)
|
|
session.pop('original_admin_id', None)
|
|
flash("Restored admin session.")
|
|
return redirect(url_for('admin.admin_dashboard'))
|
|
return redirect(url_for('project.index'))
|
|
|
|
|
|
@admin_bp.route('/debug/routes')
|
|
@login_required
|
|
@admin_required
|
|
def debug_routes():
|
|
from flask import current_app
|
|
output = []
|
|
for rule in current_app.url_map.iter_rules():
|
|
methods = ','.join(rule.methods)
|
|
rule_str = str(rule).replace('<', '[').replace('>', ']')
|
|
line = "{:50s} {:20s} {}".format(rule.endpoint, methods, rule_str)
|
|
output.append(line)
|
|
return "<pre>" + "\n".join(output) + "</pre>"
|
|
|
|
|
|
@admin_bp.route('/system/optimize_models', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def optimize_models():
|
|
is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
|
|
try:
|
|
ai_setup.init_models(force=True)
|
|
|
|
if ai_models.model_logic:
|
|
style_persona.refresh_style_guidelines(ai_models.model_logic)
|
|
|
|
if is_ajax:
|
|
return jsonify({'status': 'ok', 'message': 'AI Models refreshed and Style Guidelines updated.'})
|
|
flash("AI Models refreshed and Style Guidelines updated.")
|
|
except Exception as e:
|
|
if is_ajax:
|
|
return jsonify({'status': 'error', 'message': f'Error refreshing models: {e}'}), 500
|
|
flash(f"Error refreshing models: {e}")
|
|
|
|
return redirect(request.referrer or url_for('project.index'))
|
|
|
|
|
|
@admin_bp.route('/admin/refresh-style-guidelines', methods=['POST'])
|
|
@login_required
|
|
@admin_required
|
|
def refresh_style_guidelines_route():
|
|
is_ajax = request.headers.get('X-Requested-With') == 'XMLHttpRequest'
|
|
try:
|
|
if not ai_models.model_logic:
|
|
raise Exception("No AI model available. Run 'Refresh & Optimize' first.")
|
|
new_data = style_persona.refresh_style_guidelines(ai_models.model_logic)
|
|
msg = f"Style Guidelines updated — {len(new_data.get('ai_isms', []))} AI-isms, {len(new_data.get('filter_words', []))} filter words."
|
|
utils.log("SYSTEM", msg)
|
|
if is_ajax:
|
|
return jsonify({'status': 'ok', 'message': msg})
|
|
flash(msg)
|
|
except Exception as e:
|
|
if is_ajax:
|
|
return jsonify({'status': 'error', 'message': str(e)}), 500
|
|
flash(f"Error refreshing style guidelines: {e}")
|
|
return redirect(request.referrer or url_for('admin.system_status'))
|
|
|
|
|
|
@admin_bp.route('/system/status')
|
|
@login_required
|
|
def system_status():
|
|
models_info = {}
|
|
cache_data = {}
|
|
|
|
cache_path = os.path.join(config.DATA_DIR, "model_cache.json")
|
|
if os.path.exists(cache_path):
|
|
try:
|
|
with open(cache_path, 'r') as f:
|
|
cache_data = json.load(f)
|
|
models_info = cache_data.get('models', {})
|
|
except: pass
|
|
|
|
return render_template('system_status.html', models=models_info, cache=cache_data, datetime=datetime,
|
|
image_model=ai_models.image_model_name, image_source=ai_models.image_model_source)
|