Model selection (ai.py): - get_optimal_model() now scores Gemini 2.5 > 2.0 > 1.5 when ranking candidates - get_default_models() fallbacks updated to gemini-2.0-pro-exp (logic) and gemini-2.0-flash (writer/artist) - AI selection prompt rewritten: includes Gemini 2.x pricing context, guidance to avoid 'thinking' models for writer/artist roles, and instructions to prefer 2.x over 1.5 - Added image_model_name and image_model_source globals for UI visibility - init_models() now reads MODEL_IMAGE_HINT; tries imagen-3.0-generate-001 then imagen-3.0-fast-generate-001 on both Gemini API and Vertex AI paths Cover generation (marketing.py): - Fixed display bug: "Attempt X/5" now correctly reads "Attempt X/3" - Added imagen-3.0-fast-generate-001 as intermediate fallback before legacy Imagen 2 - Quality threshold: images with score < 5 are only kept if nothing better exists - Smarter prompt refinement on retry: deformity, blur, and watermark critique keywords each append targeted corrections to the art prompt - Fixed missing sys import (sys.platform check for macOS was silently broken) Config / Docker: - config.py: added MODEL_IMAGE_HINT env var, bumped version to 1.2.0 - docker-compose.yml: added MODEL_IMAGE environment variable - Dockerfile: added libpng-dev and libfreetype6-dev for better font/PNG rendering; added HEALTHCHECK so Portainer detects unhealthy containers System status UI: - system_status.html: added Image row showing active Imagen model and provider (Gemini API / Vertex AI) - Added cache expiry countdown with colour-coded badges Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
372 lines
17 KiB
Python
372 lines
17 KiB
Python
import os
|
|
import json
|
|
import time
|
|
import warnings
|
|
import google.generativeai as genai
|
|
import config
|
|
from . import utils
|
|
|
|
# Suppress Vertex AI warnings
|
|
warnings.filterwarnings("ignore", category=UserWarning, module="vertexai")
|
|
|
|
try:
|
|
import vertexai
|
|
from vertexai.preview.vision_models import ImageGenerationModel as VertexImageModel
|
|
HAS_VERTEX = True
|
|
except ImportError:
|
|
HAS_VERTEX = False
|
|
|
|
try:
|
|
from google.auth.transport.requests import Request
|
|
from google.oauth2.credentials import Credentials
|
|
from google_auth_oauthlib.flow import InstalledAppFlow
|
|
HAS_OAUTH = True
|
|
except ImportError:
|
|
HAS_OAUTH = False
|
|
|
|
model_logic = None
|
|
model_writer = None
|
|
model_artist = None
|
|
model_image = None
|
|
logic_model_name = "models/gemini-1.5-pro"
|
|
writer_model_name = "models/gemini-1.5-flash"
|
|
artist_model_name = "models/gemini-1.5-flash"
|
|
image_model_name = None
|
|
image_model_source = "None"
|
|
|
|
class ResilientModel:
|
|
def __init__(self, name, safety_settings, role):
|
|
self.name = name
|
|
self.safety_settings = safety_settings
|
|
self.role = role
|
|
self.model = genai.GenerativeModel(name, safety_settings=safety_settings)
|
|
|
|
def update(self, name):
|
|
self.name = name
|
|
self.model = genai.GenerativeModel(name, safety_settings=self.safety_settings)
|
|
|
|
def generate_content(self, *args, **kwargs):
|
|
retries = 0
|
|
max_retries = 3
|
|
base_delay = 5
|
|
|
|
while True:
|
|
try:
|
|
return self.model.generate_content(*args, **kwargs)
|
|
except Exception as e:
|
|
err_str = str(e).lower()
|
|
is_retryable = "429" in err_str or "quota" in err_str or "500" in err_str or "503" in err_str or "504" in err_str or "deadline" in err_str or "internal error" in err_str
|
|
if is_retryable and retries < max_retries:
|
|
delay = base_delay * (2 ** retries)
|
|
utils.log("SYSTEM", f"⚠️ Quota error on {self.role} ({self.name}). Retrying in {delay}s...")
|
|
time.sleep(delay)
|
|
|
|
# On first retry, attempt to re-optimize/rotate models
|
|
if retries == 0:
|
|
utils.log("SYSTEM", "Attempting to re-optimize models to find alternative...")
|
|
init_models(force=True)
|
|
# Note: init_models calls .update() on this instance
|
|
|
|
retries += 1
|
|
continue
|
|
raise e
|
|
|
|
def get_optimal_model(base_type="pro"):
|
|
try:
|
|
models = [m for m in genai.list_models() if 'generateContent' in m.supported_generation_methods]
|
|
candidates = [m.name for m in models if base_type in m.name]
|
|
if not candidates: return f"models/gemini-1.5-{base_type}"
|
|
def score(n):
|
|
# Prefer newer generations: 2.5 > 2.0 > 1.5
|
|
gen_bonus = 0
|
|
if "2.5" in n: gen_bonus = 300
|
|
elif "2.0" in n: gen_bonus = 200
|
|
elif "2." in n: gen_bonus = 150
|
|
# Within a generation, prefer stable over experimental
|
|
if "exp" in n or "beta" in n or "preview" in n: return gen_bonus + 0
|
|
if "latest" in n: return gen_bonus + 50
|
|
return gen_bonus + 100
|
|
return sorted(candidates, key=score, reverse=True)[0]
|
|
except Exception as e:
|
|
utils.log("SYSTEM", f"⚠️ Error finding optimal model: {e}")
|
|
return f"models/gemini-1.5-{base_type}"
|
|
|
|
def get_default_models():
|
|
return {
|
|
"logic": {"model": "models/gemini-2.0-pro-exp", "reason": "Fallback: Gemini 2.0 Pro for complex reasoning and JSON adherence.", "estimated_cost": "$0.00/1M (Experimental)"},
|
|
"writer": {"model": "models/gemini-2.0-flash", "reason": "Fallback: Gemini 2.0 Flash for fast, high-quality creative writing.", "estimated_cost": "$0.10/1M"},
|
|
"artist": {"model": "models/gemini-2.0-flash", "reason": "Fallback: Gemini 2.0 Flash for visual prompt design.", "estimated_cost": "$0.10/1M"},
|
|
"ranking": []
|
|
}
|
|
|
|
def select_best_models(force_refresh=False):
|
|
"""
|
|
Uses a safe bootstrapper model to analyze available models and pick the best ones.
|
|
Caches the result for 24 hours.
|
|
"""
|
|
cache_path = os.path.join(config.DATA_DIR, "model_cache.json")
|
|
cached_models = None
|
|
|
|
# 1. Check Cache
|
|
if os.path.exists(cache_path):
|
|
try:
|
|
with open(cache_path, 'r') as f:
|
|
cached = json.load(f)
|
|
cached_models = cached.get('models', {})
|
|
# Check if within 24 hours (86400 seconds)
|
|
if not force_refresh and time.time() - cached.get('timestamp', 0) < 86400:
|
|
models = cached_models
|
|
# Validate format (must be dicts with reasons, not just strings)
|
|
if isinstance(models.get('logic'), dict) and 'reason' in models['logic']:
|
|
utils.log("SYSTEM", "Using cached AI model selection (valid for 24h).")
|
|
return models
|
|
except Exception as e:
|
|
utils.log("SYSTEM", f"Cache read failed: {e}. Refreshing models.")
|
|
|
|
try:
|
|
utils.log("SYSTEM", "Refreshing AI model list from API...")
|
|
all_models = list(genai.list_models())
|
|
raw_model_names = [m.name for m in all_models]
|
|
utils.log("SYSTEM", f"Found {len(all_models)} raw models from Google API.")
|
|
|
|
models = [m.name for m in all_models if 'generateContent' in m.supported_generation_methods and 'gemini' in m.name.lower()]
|
|
utils.log("SYSTEM", f"Identified {len(models)} compatible Gemini models: {models}")
|
|
|
|
bootstrapper = get_optimal_model("flash")
|
|
utils.log("SYSTEM", f"Bootstrapping model selection with: {bootstrapper}")
|
|
|
|
model = genai.GenerativeModel(bootstrapper)
|
|
prompt = f"""
|
|
ROLE: AI Model Architect
|
|
TASK: Select the optimal Gemini models for a book-writing application. Prefer newer Gemini 2.x models when available.
|
|
|
|
AVAILABLE_MODELS:
|
|
{json.dumps(models)}
|
|
|
|
PRICING_CONTEXT (USD per 1M tokens, approximate):
|
|
- Gemini 2.5 Pro/Flash: Best quality/speed; check current pricing.
|
|
- Gemini 2.0 Flash: ~$0.10 Input / $0.40 Output. (Fast, cost-effective, excellent quality).
|
|
- Gemini 2.0 Pro Exp: Free experimental tier with strong reasoning.
|
|
- Gemini 1.5 Flash: ~$0.075 Input / $0.30 Output. (Legacy, still reliable).
|
|
- Gemini 1.5 Pro: ~$1.25 Input / $5.00 Output. (Legacy, expensive).
|
|
|
|
CRITERIA:
|
|
- LOGIC: Needs complex reasoning, strict JSON adherence, plot consistency, and instruction following.
|
|
-> Prefer: Gemini 2.5 Pro > 2.0 Pro > 2.0 Flash > 1.5 Pro
|
|
- WRITER: Needs creativity, prose quality, long-form text generation, and speed.
|
|
-> Prefer: Gemini 2.5 Flash/Pro > 2.0 Flash > 1.5 Flash (balance quality/cost)
|
|
- ARTIST: Needs rich visual description, prompt understanding for cover art design.
|
|
-> Prefer: Gemini 2.0 Flash > 1.5 Flash (speed and visual understanding)
|
|
|
|
CONSTRAINTS:
|
|
- Strongly prefer Gemini 2.x over 1.5 where available.
|
|
- Avoid 'experimental' or 'preview' only if a stable 2.x version exists; otherwise experimental 2.x is fine.
|
|
- 'thinking' models are too slow/expensive for Writer/Artist roles.
|
|
- Provide a ranking of ALL available models from best to worst overall.
|
|
|
|
OUTPUT_FORMAT (JSON only, no markdown):
|
|
{{
|
|
"logic": {{ "model": "string", "reason": "string", "estimated_cost": "$X.XX/1M" }},
|
|
"writer": {{ "model": "string", "reason": "string", "estimated_cost": "$X.XX/1M" }},
|
|
"artist": {{ "model": "string", "reason": "string", "estimated_cost": "$X.XX/1M" }},
|
|
"ranking": [ {{ "model": "string", "reason": "string", "estimated_cost": "string" }} ]
|
|
}}
|
|
"""
|
|
|
|
try:
|
|
response = model.generate_content(prompt)
|
|
selection = json.loads(utils.clean_json(response.text))
|
|
except Exception as e:
|
|
utils.log("SYSTEM", f"Model selection generation failed (Safety/Format): {e}")
|
|
raise e
|
|
|
|
if not os.path.exists(config.DATA_DIR): os.makedirs(config.DATA_DIR)
|
|
with open(cache_path, 'w') as f:
|
|
json.dump({
|
|
"timestamp": int(time.time()),
|
|
"models": selection,
|
|
"available_at_time": models,
|
|
"raw_models": raw_model_names
|
|
}, f, indent=2)
|
|
return selection
|
|
except Exception as e:
|
|
utils.log("SYSTEM", f"AI Model Selection failed: {e}.")
|
|
|
|
# 3. Fallback to Stale Cache if available (Better than heuristics)
|
|
# Relaxed check: If we successfully loaded ANY JSON from the cache, use it.
|
|
if cached_models:
|
|
utils.log("SYSTEM", "⚠️ Using stale cached models due to API failure.")
|
|
return cached_models
|
|
|
|
utils.log("SYSTEM", "Falling back to heuristics.")
|
|
fallback = get_default_models()
|
|
|
|
# Save fallback to cache if file doesn't exist OR if we couldn't load it (corrupt/None)
|
|
# This ensures we have a valid file on disk for the web UI to read.
|
|
try:
|
|
with open(cache_path, 'w') as f:
|
|
json.dump({"timestamp": int(time.time()), "models": fallback, "error": str(e)}, f, indent=2)
|
|
except: pass
|
|
return fallback
|
|
|
|
def init_models(force=False):
|
|
global model_logic, model_writer, model_artist, model_image, logic_model_name, writer_model_name, artist_model_name, image_model_name, image_model_source
|
|
if model_logic and not force: return
|
|
genai.configure(api_key=config.API_KEY)
|
|
|
|
# Check cache to skip frequent validation
|
|
cache_path = os.path.join(config.DATA_DIR, "model_cache.json")
|
|
skip_validation = False
|
|
if not force and os.path.exists(cache_path):
|
|
try:
|
|
with open(cache_path, 'r') as f: cached = json.load(f)
|
|
if time.time() - cached.get('timestamp', 0) < 86400: skip_validation = True
|
|
except: pass
|
|
|
|
if not skip_validation:
|
|
# Validate Gemini API Key
|
|
utils.log("SYSTEM", "Validating credentials...")
|
|
try:
|
|
list(genai.list_models(page_size=1))
|
|
utils.log("SYSTEM", "✅ Gemini API Key is valid.")
|
|
except Exception as e:
|
|
# Check if we have a cache file we can rely on before exiting
|
|
if os.path.exists(cache_path):
|
|
utils.log("SYSTEM", f"⚠️ API check failed ({e}), but cache exists. Attempting to use cached models.")
|
|
else:
|
|
utils.log("SYSTEM", f"⚠️ API check failed ({e}). No cache found. Attempting to initialize with defaults.")
|
|
|
|
utils.log("SYSTEM", "Selecting optimal models via AI...")
|
|
selected_models = select_best_models(force_refresh=force)
|
|
|
|
# Check for missing costs and force refresh if needed
|
|
if not force:
|
|
missing_costs = False
|
|
for role in ['logic', 'writer', 'artist']:
|
|
if 'estimated_cost' not in selected_models.get(role, {}) or selected_models[role].get('estimated_cost') == 'N/A':
|
|
missing_costs = True
|
|
if missing_costs:
|
|
utils.log("SYSTEM", "⚠️ Missing cost info in cached models. Forcing refresh.")
|
|
return init_models(force=True)
|
|
|
|
def get_model_details(role_data):
|
|
if isinstance(role_data, dict): return role_data.get('model'), role_data.get('estimated_cost', 'N/A')
|
|
return role_data, 'N/A'
|
|
|
|
logic_name, logic_cost = get_model_details(selected_models['logic'])
|
|
writer_name, writer_cost = get_model_details(selected_models['writer'])
|
|
artist_name, artist_cost = get_model_details(selected_models['artist'])
|
|
|
|
logic_name = logic_model_name = logic_name if config.MODEL_LOGIC_HINT == "AUTO" else config.MODEL_LOGIC_HINT
|
|
writer_name = writer_model_name = writer_name if config.MODEL_WRITER_HINT == "AUTO" else config.MODEL_WRITER_HINT
|
|
artist_name = artist_model_name = artist_name if config.MODEL_ARTIST_HINT == "AUTO" else config.MODEL_ARTIST_HINT
|
|
|
|
utils.log("SYSTEM", f"Models: Logic={logic_name} ({logic_cost}) | Writer={writer_name} ({writer_cost}) | Artist={artist_name}")
|
|
|
|
# Update pricing in utils
|
|
utils.update_pricing(logic_name, logic_cost)
|
|
utils.update_pricing(writer_name, writer_cost)
|
|
utils.update_pricing(artist_name, artist_cost)
|
|
|
|
# Initialize or Update Resilient Models
|
|
if model_logic is None:
|
|
model_logic = ResilientModel(logic_name, utils.SAFETY_SETTINGS, "Logic")
|
|
model_writer = ResilientModel(writer_name, utils.SAFETY_SETTINGS, "Writer")
|
|
model_artist = ResilientModel(artist_name, utils.SAFETY_SETTINGS, "Artist")
|
|
else:
|
|
# If models already exist (re-init), update them in place
|
|
model_logic.update(logic_name)
|
|
model_writer.update(writer_name)
|
|
model_artist.update(artist_name)
|
|
|
|
# Initialize Image Model
|
|
model_image = None
|
|
image_model_name = None
|
|
image_model_source = "None"
|
|
|
|
hint = config.MODEL_IMAGE_HINT if hasattr(config, 'MODEL_IMAGE_HINT') else "AUTO"
|
|
|
|
if hasattr(genai, 'ImageGenerationModel'):
|
|
# Candidate image models in preference order
|
|
if hint and hint != "AUTO":
|
|
candidates = [hint]
|
|
else:
|
|
candidates = ["imagen-3.0-generate-001", "imagen-3.0-fast-generate-001"]
|
|
for candidate in candidates:
|
|
try:
|
|
model_image = genai.ImageGenerationModel(candidate)
|
|
image_model_name = candidate
|
|
image_model_source = "Gemini API"
|
|
utils.log("SYSTEM", f"✅ Image model: {candidate} (Gemini API)")
|
|
break
|
|
except Exception:
|
|
continue
|
|
|
|
# Auto-detect GCP Project from credentials if not set (Fix for Image Model)
|
|
if HAS_VERTEX and not config.GCP_PROJECT and config.GOOGLE_CREDS and os.path.exists(config.GOOGLE_CREDS):
|
|
try:
|
|
with open(config.GOOGLE_CREDS, 'r') as f:
|
|
cdata = json.load(f)
|
|
# Check common OAuth structures
|
|
for k in ['installed', 'web']:
|
|
if k in cdata and 'project_id' in cdata[k]:
|
|
config.GCP_PROJECT = cdata[k]['project_id']
|
|
utils.log("SYSTEM", f"Auto-detected GCP Project ID: {config.GCP_PROJECT}")
|
|
break
|
|
except: pass
|
|
|
|
if HAS_VERTEX and config.GCP_PROJECT:
|
|
creds = None
|
|
# Handle OAuth Client ID (credentials.json) if provided instead of Service Account
|
|
if HAS_OAUTH:
|
|
gac = config.GOOGLE_CREDS # Use persistent config, not volatile env var
|
|
if gac and os.path.exists(gac):
|
|
try:
|
|
with open(gac, 'r') as f: data = json.load(f)
|
|
if 'installed' in data or 'web' in data:
|
|
# It's an OAuth Client ID. Unset env var to avoid library crash.
|
|
if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ:
|
|
del os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
|
|
|
|
token_path = os.path.join(os.path.dirname(os.path.abspath(gac)), 'token.json')
|
|
SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
|
|
|
|
if os.path.exists(token_path):
|
|
creds = Credentials.from_authorized_user_file(token_path, SCOPES)
|
|
|
|
if not creds or not creds.valid:
|
|
if creds and creds.expired and creds.refresh_token:
|
|
try:
|
|
creds.refresh(Request())
|
|
except Exception:
|
|
utils.log("SYSTEM", "Token refresh failed. Re-authenticating...")
|
|
flow = InstalledAppFlow.from_client_secrets_file(gac, SCOPES)
|
|
creds = flow.run_local_server(port=0)
|
|
else:
|
|
utils.log("SYSTEM", "OAuth Client ID detected. Launching browser to authenticate...")
|
|
flow = InstalledAppFlow.from_client_secrets_file(gac, SCOPES)
|
|
creds = flow.run_local_server(port=0)
|
|
with open(token_path, 'w') as token: token.write(creds.to_json())
|
|
|
|
utils.log("SYSTEM", "✅ Authenticated via OAuth Client ID.")
|
|
except Exception as e:
|
|
utils.log("SYSTEM", f"⚠️ OAuth check failed: {e}")
|
|
|
|
vertexai.init(project=config.GCP_PROJECT, location=config.GCP_LOCATION, credentials=creds)
|
|
utils.log("SYSTEM", f"✅ Vertex AI initialized (Project: {config.GCP_PROJECT})")
|
|
|
|
# Override with Vertex Image Model if available
|
|
vertex_candidates = ["imagen-3.0-generate-001", "imagen-3.0-fast-generate-001"]
|
|
if hint and hint != "AUTO":
|
|
vertex_candidates = [hint]
|
|
for candidate in vertex_candidates:
|
|
try:
|
|
model_image = VertexImageModel.from_pretrained(candidate)
|
|
image_model_name = candidate
|
|
image_model_source = "Vertex AI"
|
|
utils.log("SYSTEM", f"✅ Image model: {candidate} (Vertex AI)")
|
|
break
|
|
except Exception:
|
|
continue
|
|
|
|
utils.log("SYSTEM", f"Image Generation Provider: {image_model_source} ({image_model_name or 'unavailable'})") |