More improvements.

This commit is contained in:
2026-02-06 11:05:46 -05:00
parent 7e5dbe6f00
commit 848d187f4b
7 changed files with 250 additions and 83 deletions

View File

@@ -108,7 +108,7 @@ def process_book(bp, folder, context="", resume=False, interactive=False):
INSTRUCTIONS: Use dense, factual bullet points. Focus on character meetings, relationships, and known information.
OUTPUT: Summary text.
""")
utils.log_usage(folder, "writer-flash", resp_sum.usage_metadata)
utils.log_usage(folder, ai.model_writer.name, resp_sum.usage_metadata)
summary = resp_sum.text
except: summary = "The story continues."
@@ -187,12 +187,12 @@ def process_book(bp, folder, context="", resume=False, interactive=False):
OUTPUT: Updated summary text.
"""
resp_sum = ai.model_writer.generate_content(update_prompt)
utils.log_usage(folder, "writer-flash", resp_sum.usage_metadata)
utils.log_usage(folder, ai.model_writer.name, resp_sum.usage_metadata)
summary = resp_sum.text
except:
try:
resp_fallback = ai.model_writer.generate_content(f"ROLE: Summarizer\nTASK: Summarize plot points.\nTEXT: {txt}\nOUTPUT: Bullet points.")
utils.log_usage(folder, "writer-flash", resp_fallback.usage_metadata)
utils.log_usage(folder, ai.model_writer.name, resp_fallback.usage_metadata)
summary += f"\n\nChapter {ch['chapter_number']}: " + resp_fallback.text
except: summary += f"\n\nChapter {ch['chapter_number']}: [Content processed]"

View File

@@ -28,6 +28,45 @@ model_logic = None
model_writer = None
model_artist = None
model_image = None
logic_model_name = "models/gemini-1.5-pro"
writer_model_name = "models/gemini-1.5-flash"
artist_model_name = "models/gemini-1.5-flash"
class ResilientModel:
def __init__(self, name, safety_settings, role):
self.name = name
self.safety_settings = safety_settings
self.role = role
self.model = genai.GenerativeModel(name, safety_settings=safety_settings)
def update(self, name):
self.name = name
self.model = genai.GenerativeModel(name, safety_settings=self.safety_settings)
def generate_content(self, *args, **kwargs):
retries = 0
max_retries = 3
base_delay = 5
while True:
try:
return self.model.generate_content(*args, **kwargs)
except Exception as e:
is_quota = "429" in str(e) or "quota" in str(e).lower()
if is_quota and retries < max_retries:
delay = base_delay * (2 ** retries)
utils.log("SYSTEM", f"⚠️ Quota error on {self.role} ({self.name}). Retrying in {delay}s...")
time.sleep(delay)
# On first retry, attempt to re-optimize/rotate models
if retries == 0:
utils.log("SYSTEM", "Attempting to re-optimize models to find alternative...")
init_models(force=True)
# Note: init_models calls .update() on this instance
retries += 1
continue
raise e
def get_optimal_model(base_type="pro"):
try:
@@ -44,9 +83,9 @@ def get_optimal_model(base_type="pro"):
def get_default_models():
return {
"logic": {"model": "models/gemini-1.5-pro", "reason": "Fallback: Default Pro model selected."},
"writer": {"model": "models/gemini-1.5-flash", "reason": "Fallback: Default Flash model selected."},
"artist": {"model": "models/gemini-1.5-flash", "reason": "Fallback: Default Flash model selected."},
"logic": {"model": "models/gemini-1.5-pro", "reason": "Fallback: Default Pro model selected.", "estimated_cost": "$3.50/1M"},
"writer": {"model": "models/gemini-1.5-flash", "reason": "Fallback: Default Flash model selected.", "estimated_cost": "$0.075/1M"},
"artist": {"model": "models/gemini-1.5-flash", "reason": "Fallback: Default Flash model selected.", "estimated_cost": "$0.075/1M"},
"ranking": []
}
@@ -78,11 +117,8 @@ def select_best_models(force_refresh=False):
utils.log("SYSTEM", "Refreshing AI model list from API...")
models = [m.name for m in genai.list_models() if 'generateContent' in m.supported_generation_methods and 'gemini' in m.name.lower()]
bootstrapper = "models/gemini-1.5-flash"
if bootstrapper not in models:
candidates = [m for m in models if 'flash' in m]
bootstrapper = candidates[0] if candidates else "models/gemini-pro"
utils.log("SYSTEM", f"Bootstrapping model selection with: {bootstrapper}")
bootstrapper = get_optimal_model("flash")
utils.log("SYSTEM", f"Bootstrapping model selection with: {bootstrapper}")
model = genai.GenerativeModel(bootstrapper)
prompt = f"""
@@ -92,6 +128,10 @@ def select_best_models(force_refresh=False):
AVAILABLE_MODELS:
{json.dumps(models)}
PRICING_CONTEXT (USD per 1M tokens):
- Flash Models (e.g. gemini-1.5-flash): ~$0.075 Input / $0.30 Output. (Very Cheap)
- Pro Models (e.g. gemini-1.5-pro): ~$3.50 Input / $10.50 Output. (Expensive)
CRITERIA:
- LOGIC: Needs complex reasoning, JSON adherence, and instruction following. (Prefer Pro/1.5).
- WRITER: Needs creativity, prose quality, and speed. (Prefer Flash/1.5 for speed, or Pro for quality).
@@ -103,10 +143,10 @@ def select_best_models(force_refresh=False):
OUTPUT_FORMAT (JSON):
{{
"logic": {{ "model": "string", "reason": "string" }},
"writer": {{ "model": "string", "reason": "string" }},
"artist": {{ "model": "string", "reason": "string" }},
"ranking": [ {{ "model": "string", "reason": "string" }} ]
"logic": {{ "model": "string", "reason": "string", "estimated_cost": "$X.XX Input / $X.XX Output" }},
"writer": {{ "model": "string", "reason": "string", "estimated_cost": "$X.XX Input / $X.XX Output" }},
"artist": {{ "model": "string", "reason": "string", "estimated_cost": "$X.XX Input / $X.XX Output" }},
"ranking": [ {{ "model": "string", "reason": "string", "estimated_cost": "string" }} ]
}}
"""
@@ -142,7 +182,7 @@ def select_best_models(force_refresh=False):
return fallback
def init_models(force=False):
global model_logic, model_writer, model_artist, model_image
global model_logic, model_writer, model_artist, model_image, logic_model_name, writer_model_name, artist_model_name
if model_logic and not force: return
genai.configure(api_key=config.API_KEY)
@@ -171,18 +211,45 @@ def init_models(force=False):
utils.log("SYSTEM", "Selecting optimal models via AI...")
selected_models = select_best_models(force_refresh=force)
def get_model_name(role_data):
if isinstance(role_data, dict): return role_data.get('model')
return role_data
# Check for missing costs and force refresh if needed
if not force:
missing_costs = False
for role in ['logic', 'writer', 'artist']:
if 'estimated_cost' not in selected_models.get(role, {}) or selected_models[role].get('estimated_cost') == 'N/A':
missing_costs = True
if missing_costs:
utils.log("SYSTEM", "⚠️ Missing cost info in cached models. Forcing refresh.")
return init_models(force=True)
logic_name = get_model_name(selected_models['logic']) if config.MODEL_LOGIC_HINT == "AUTO" else config.MODEL_LOGIC_HINT
writer_name = get_model_name(selected_models['writer']) if config.MODEL_WRITER_HINT == "AUTO" else config.MODEL_WRITER_HINT
artist_name = get_model_name(selected_models['artist']) if config.MODEL_ARTIST_HINT == "AUTO" else config.MODEL_ARTIST_HINT
utils.log("SYSTEM", f"Models: Logic={logic_name} | Writer={writer_name} | Artist={artist_name}")
def get_model_details(role_data):
if isinstance(role_data, dict): return role_data.get('model'), role_data.get('estimated_cost', 'N/A')
return role_data, 'N/A'
logic_name, logic_cost = get_model_details(selected_models['logic'])
writer_name, writer_cost = get_model_details(selected_models['writer'])
artist_name, artist_cost = get_model_details(selected_models['artist'])
logic_name = logic_model_name = logic_name if config.MODEL_LOGIC_HINT == "AUTO" else config.MODEL_LOGIC_HINT
writer_name = writer_model_name = writer_name if config.MODEL_WRITER_HINT == "AUTO" else config.MODEL_WRITER_HINT
artist_name = artist_model_name = artist_name if config.MODEL_ARTIST_HINT == "AUTO" else config.MODEL_ARTIST_HINT
model_logic = genai.GenerativeModel(logic_name, safety_settings=utils.SAFETY_SETTINGS)
model_writer = genai.GenerativeModel(writer_name, safety_settings=utils.SAFETY_SETTINGS)
model_artist = genai.GenerativeModel(artist_name, safety_settings=utils.SAFETY_SETTINGS)
utils.log("SYSTEM", f"Models: Logic={logic_name} ({logic_cost}) | Writer={writer_name} ({writer_cost}) | Artist={artist_name}")
# Update pricing in utils
utils.update_pricing(logic_name, logic_cost)
utils.update_pricing(writer_name, writer_cost)
utils.update_pricing(artist_name, artist_cost)
# Initialize or Update Resilient Models
if model_logic is None:
model_logic = ResilientModel(logic_name, utils.SAFETY_SETTINGS, "Logic")
model_writer = ResilientModel(writer_name, utils.SAFETY_SETTINGS, "Writer")
model_artist = ResilientModel(artist_name, utils.SAFETY_SETTINGS, "Artist")
else:
# If models already exist (re-init), update them in place
model_logic.update(logic_name)
model_writer.update(writer_name)
model_artist.update(artist_name)
# Initialize Image Model (Default to None)
model_image = None

View File

@@ -80,7 +80,8 @@ def evaluate_image_quality(image_path, prompt, model, folder=None):
PROMPT: '{prompt}'
OUTPUT_FORMAT (JSON): {{ "score": int (1-10), "reason": "string" }}
""", img])
if folder: utils.log_usage(folder, "logic-pro", response.usage_metadata)
model_name = getattr(model, 'name', "logic-pro")
if folder: utils.log_usage(folder, model_name, response.usage_metadata)
data = json.loads(utils.clean_json(response.text))
return data.get('score'), data.get('reason')
except Exception as e: return None, str(e)
@@ -104,7 +105,7 @@ def generate_blurb(bp, folder):
"""
try:
response = ai.model_writer.generate_content(prompt)
utils.log_usage(folder, "writer-flash", response.usage_metadata)
utils.log_usage(folder, ai.model_writer.name, response.usage_metadata)
blurb = response.text
with open(os.path.join(folder, "blurb.txt"), "w") as f: f.write(blurb)
with open(os.path.join(folder, "back_cover.txt"), "w") as f: f.write(blurb)
@@ -157,6 +158,7 @@ def generate_cover(bp, folder, tracking=None, feedback=None, interactive=False):
"""
try:
resp = ai.model_logic.generate_content(analysis_prompt)
utils.log_usage(folder, ai.model_logic.name, resp.usage_metadata)
decision = json.loads(utils.clean_json(resp.text))
if decision.get('action') == 'REGENERATE_LAYOUT':
regenerate_image = False
@@ -193,7 +195,7 @@ def generate_cover(bp, folder, tracking=None, feedback=None, interactive=False):
"""
try:
response = ai.model_artist.generate_content(design_prompt)
utils.log_usage(folder, "artist-flash", response.usage_metadata)
utils.log_usage(folder, ai.model_artist.name, response.usage_metadata)
design = json.loads(utils.clean_json(response.text))
bg_color = design.get('primary_color', '#252570')
@@ -211,7 +213,7 @@ def generate_cover(bp, folder, tracking=None, feedback=None, interactive=False):
best_img_path = None
if regenerate_image:
for i in range(1, 6):
for i in range(1, 4):
utils.log("MARKETING", f"Generating cover art (Attempt {i}/5)...")
try:
if not ai.model_image: raise ImportError("No Image Generation Model available.")
@@ -231,7 +233,7 @@ def generate_cover(bp, folder, tracking=None, feedback=None, interactive=False):
result.images[0].save(attempt_path)
utils.log_usage(folder, "imagen", image_count=1)
score, critique = evaluate_image_quality(attempt_path, art_prompt, ai.model_logic, folder)
score, critique = evaluate_image_quality(attempt_path, art_prompt, ai.model_writer, folder)
if score is None: score = 0
utils.log("MARKETING", f" -> Image Score: {score}/10. Critique: {critique}")
@@ -319,8 +321,8 @@ def generate_cover(bp, folder, tracking=None, feedback=None, interactive=False):
for attempt in range(1, 6):
utils.log("MARKETING", f"Designing text layout (Attempt {attempt}/5)...")
try:
response = ai.model_logic.generate_content([layout_prompt, img])
utils.log_usage(folder, "logic-pro", response.usage_metadata)
response = ai.model_writer.generate_content([layout_prompt, img])
utils.log_usage(folder, ai.model_writer.name, response.usage_metadata)
layout = json.loads(utils.clean_json(response.text))
if isinstance(layout, list): layout = layout[0] if layout else {}
except Exception as e:
@@ -379,7 +381,7 @@ def generate_cover(bp, folder, tracking=None, feedback=None, interactive=False):
2. Is the contrast sufficient?
3. Does it look professional?
"""
score, critique = evaluate_image_quality(attempt_path, eval_prompt, ai.model_logic, folder)
score, critique = evaluate_image_quality(attempt_path, eval_prompt, ai.model_writer, folder)
if score is None: score = 0
utils.log("MARKETING", f" -> Layout Score: {score}/10. Critique: {critique}")

View File

@@ -52,7 +52,8 @@ def refresh_style_guidelines(model, folder=None):
"""
try:
response = model.generate_content(prompt)
if folder: utils.log_usage(folder, "logic-pro", response.usage_metadata)
model_name = getattr(model, 'name', ai.logic_model_name)
if folder: utils.log_usage(folder, model_name, response.usage_metadata)
new_data = json.loads(utils.clean_json(response.text))
# Validate
@@ -157,7 +158,7 @@ def enrich(bp, folder, context=""):
try:
# Merge AI response with existing data (don't overwrite if user provided specific keys)
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
response_text = response.text
cleaned_json = utils.clean_json(response_text)
ai_data = json.loads(cleaned_json)
@@ -234,7 +235,7 @@ def plan_structure(bp, folder):
"""
try:
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
return json.loads(utils.clean_json(response.text))['events']
except:
return []
@@ -268,7 +269,7 @@ def expand(events, pass_num, target_chapters, bp, folder):
"""
try:
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
new_events = json.loads(utils.clean_json(response.text))['events']
if len(new_events) > len(events):
@@ -318,7 +319,7 @@ def create_chapter_plan(events, bp, folder):
"""
try:
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
plan = json.loads(utils.clean_json(response.text))
target_str = str(words).lower().replace(',', '').replace('k', '000').replace('+', '').replace(' ', '')
@@ -376,7 +377,7 @@ def update_tracking(folder, chapter_num, chapter_text, current_tracking):
"""
try:
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
new_data = json.loads(utils.clean_json(response.text))
return new_data
except Exception as e:
@@ -439,7 +440,8 @@ def evaluate_chapter_quality(text, chapter_title, genre, model, folder):
"""
try:
response = model.generate_content([prompt, text[:30000]])
utils.log_usage(folder, "logic-pro", response.usage_metadata)
model_name = getattr(model, 'name', ai.logic_model_name)
utils.log_usage(folder, model_name, response.usage_metadata)
data = json.loads(utils.clean_json(response.text))
critique_text = data.get('critique', 'No critique provided.')
@@ -483,7 +485,7 @@ def check_pacing(bp, summary, last_chapter_text, last_chapter_data, remaining_ch
"""
try:
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
return json.loads(utils.clean_json(response.text))
except Exception as e:
utils.log("ARCHITECT", f"Pacing check failed: {e}")
@@ -508,7 +510,7 @@ def create_initial_persona(bp, folder):
"""
try:
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
return json.loads(utils.clean_json(response.text))
except Exception as e:
utils.log("SYSTEM", f"Persona generation failed: {e}")
@@ -533,7 +535,7 @@ def refine_persona(bp, text, folder):
"""
try:
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
new_bio = json.loads(utils.clean_json(response.text)).get('bio')
if new_bio:
ad['bio'] = new_bio
@@ -678,7 +680,7 @@ def write_chapter(chap, bp, folder, prev_sum, tracking=None, prev_content=None):
current_text = ""
try:
resp_draft = ai.model_writer.generate_content(prompt)
utils.log_usage(folder, "writer-flash", resp_draft.usage_metadata)
utils.log_usage(folder, ai.model_writer.name, resp_draft.usage_metadata)
current_text = resp_draft.text
except Exception as e:
utils.log("WRITER", f"⚠️ Failed Ch {chap['chapter_number']}: {e}")
@@ -696,7 +698,7 @@ def write_chapter(chap, bp, folder, prev_sum, tracking=None, prev_content=None):
for attempt in range(1, max_attempts + 1):
utils.log("WRITER", f" -> Evaluating Ch {chap['chapter_number']} (Attempt {attempt}/{max_attempts})...")
score, critique = evaluate_chapter_quality(current_text, chap['title'], meta.get('genre', 'Fiction'), ai.model_logic, folder)
score, critique = evaluate_chapter_quality(current_text, chap['title'], meta.get('genre', 'Fiction'), ai.model_writer, folder)
past_critiques.append(f"Attempt {attempt}: {critique}")
@@ -736,8 +738,8 @@ def write_chapter(chap, bp, folder, prev_sum, tracking=None, prev_content=None):
"""
try:
resp_rewrite = ai.model_writer.generate_content(full_rewrite_prompt)
utils.log_usage(folder, "writer-flash", resp_rewrite.usage_metadata)
resp_rewrite = ai.model_logic.generate_content(full_rewrite_prompt)
utils.log_usage(folder, ai.model_logic.name, resp_rewrite.usage_metadata)
current_text = resp_rewrite.text
continue
except Exception as e:
@@ -787,9 +789,9 @@ def write_chapter(chap, bp, folder, prev_sum, tracking=None, prev_content=None):
OUTPUT: Polished Markdown.
"""
try:
# Use Logic model (Pro) for refinement to ensure higher quality prose
resp_refine = ai.model_logic.generate_content(refine_prompt)
utils.log_usage(folder, "logic-pro", resp_refine.usage_metadata)
# Use Writer model (Flash) for refinement to save costs (Flash 1.5 is sufficient for editing)
resp_refine = ai.model_writer.generate_content(refine_prompt)
utils.log_usage(folder, ai.model_writer.name, resp_refine.usage_metadata)
current_text = resp_refine.text
except Exception as e:
utils.log("WRITER", f"Refinement failed: {e}")
@@ -814,7 +816,7 @@ def harvest_metadata(bp, folder, full_manuscript):
"""
try:
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
new_chars = json.loads(utils.clean_json(response.text)).get('new_characters', [])
if new_chars:
valid_chars = filter_characters(new_chars)
@@ -867,7 +869,7 @@ def update_persona_sample(bp, folder):
"""
try:
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
bio = response.text.strip()
except: bio = "Style analysis unavailable."
@@ -903,7 +905,7 @@ def refine_bible(bible, instruction, folder):
"""
try:
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
new_data = json.loads(utils.clean_json(response.text))
return new_data
except Exception as e:
@@ -939,7 +941,7 @@ def analyze_consistency(bp, manuscript, folder):
"""
try:
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, "logic-pro", response.usage_metadata)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
return json.loads(utils.clean_json(response.text))
except Exception as e:
return {"issues": [f"Analysis failed: {e}"], "score": 0, "summary": "Error during analysis."}
@@ -1034,8 +1036,8 @@ def rewrite_chapter_content(bp, manuscript, chapter_num, instruction, folder):
"""
try:
response = ai.model_writer.generate_content(prompt)
utils.log_usage(folder, "writer-flash", response.usage_metadata)
response = ai.model_logic.generate_content(prompt)
utils.log_usage(folder, ai.model_logic.name, response.usage_metadata)
try:
data = json.loads(utils.clean_json(response.text))
return data.get('content'), data.get('summary')
@@ -1073,6 +1075,7 @@ def check_and_propagate(bp, manuscript, changed_chap_num, folder, change_summary
"""
try:
resp = ai.model_writer.generate_content(change_summary_prompt)
utils.log_usage(folder, ai.model_writer.name, resp.usage_metadata)
current_context = resp.text
except:
current_context = changed_chap.get('content', '')[-2000:] # Fallback
@@ -1133,6 +1136,7 @@ def check_and_propagate(bp, manuscript, changed_chap_num, folder, change_summary
try:
resp = ai.model_logic.generate_content(scan_prompt)
utils.log_usage(folder, ai.model_logic.name, resp.usage_metadata)
potential_impact_chapters = json.loads(utils.clean_json(resp.text))
if not isinstance(potential_impact_chapters, list): potential_impact_chapters = []
# Ensure integers
@@ -1179,6 +1183,7 @@ def check_and_propagate(bp, manuscript, changed_chap_num, folder, change_summary
try:
response = ai.model_writer.generate_content(prompt)
utils.log_usage(folder, ai.model_writer.name, response.usage_metadata)
data = json.loads(utils.clean_json(response.text))
if data.get('status') == 'NO_CHANGE':

View File

@@ -4,6 +4,7 @@ import datetime
import time
import config
import threading
import re
SAFETY_SETTINGS = [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
@@ -15,6 +16,9 @@ SAFETY_SETTINGS = [
# Thread-local storage for logging context
_log_context = threading.local()
# Cache for dynamic pricing from AI model selection
PRICING_CACHE = {}
def set_log_file(filepath):
_log_context.log_file = filepath
@@ -136,32 +140,85 @@ def get_latest_run_folder(base_name):
runs.sort(key=lambda x: int(x.split('_')[1]) if x.split('_')[1].isdigit() else 0)
return os.path.join(base_name, runs[-1])
def update_pricing(model_name, cost_str):
"""Parses cost string from AI selection and updates cache."""
if not model_name or not cost_str or cost_str == 'N/A': return
try:
# Look for patterns like "$0.075 Input" or "$3.50/1M"
# Default to 0.0
in_cost = 0.0
out_cost = 0.0
# Extract all float-like numbers following a $ sign
prices = re.findall(r'(?:\$|USD)\s*([0-9]+\.?[0-9]*)', cost_str, re.IGNORECASE)
if len(prices) >= 2:
in_cost = float(prices[0])
out_cost = float(prices[1])
elif len(prices) == 1:
in_cost = float(prices[0])
out_cost = in_cost * 3 # Rough heuristic if only one price provided
if in_cost > 0:
PRICING_CACHE[model_name] = {"input": in_cost, "output": out_cost}
# log("SYSTEM", f"Updated pricing for {model_name}: In=${in_cost} | Out=${out_cost}")
except:
pass
def calculate_cost(model_label, input_tokens, output_tokens, image_count=0):
cost = 0.0
m = model_label.lower()
# Check dynamic cache first
if model_label in PRICING_CACHE:
rates = PRICING_CACHE[model_label]
cost = (input_tokens / 1_000_000 * rates['input']) + (output_tokens / 1_000_000 * rates['output'])
elif 'imagen' in m or image_count > 0:
cost = (image_count * 0.04)
else:
# Fallbacks
if 'flash' in m:
cost = (input_tokens / 1_000_000 * 0.075) + (output_tokens / 1_000_000 * 0.30)
elif 'pro' in m or 'logic' in m:
cost = (input_tokens / 1_000_000 * 3.50) + (output_tokens / 1_000_000 * 10.50)
return round(cost, 6)
def log_usage(folder, model_label, usage_metadata=None, image_count=0):
if not folder or not os.path.exists(folder): return
log_path = os.path.join(folder, "usage_log.json")
entry = {
"timestamp": int(time.time()),
"model": model_label,
"input_tokens": 0,
"output_tokens": 0,
"images": image_count
}
input_tokens = 0
output_tokens = 0
if usage_metadata:
try:
entry["input_tokens"] = usage_metadata.prompt_token_count
entry["output_tokens"] = usage_metadata.candidates_token_count
input_tokens = usage_metadata.prompt_token_count
output_tokens = usage_metadata.candidates_token_count
except: pass
# Calculate Cost
cost = calculate_cost(model_label, input_tokens, output_tokens, image_count)
entry = {
"timestamp": int(time.time()),
"date": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"model": model_label,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"images": image_count,
"cost": round(cost, 6)
}
data = {"log": [], "totals": {"input_tokens": 0, "output_tokens": 0, "images": 0, "est_cost_usd": 0.0}}
if os.path.exists(log_path):
try:
loaded = json.load(open(log_path, 'r'))
if isinstance(loaded, list): data["log"] = loaded
else: data = loaded
elif isinstance(loaded, dict): data = loaded
except: pass
data["log"].append(entry)
@@ -171,25 +228,28 @@ def log_usage(folder, model_label, usage_metadata=None, image_count=0):
t_out = sum(x.get('output_tokens', 0) for x in data["log"])
t_img = sum(x.get('images', 0) for x in data["log"])
cost = 0.0
total_cost = 0.0
for x in data["log"]:
m = x.get('model', '').lower()
i = x.get('input_tokens', 0)
o = x.get('output_tokens', 0)
imgs = x.get('images', 0)
if 'flash' in m:
cost += (i / 1_000_000 * 0.075) + (o / 1_000_000 * 0.30)
elif 'pro' in m or 'logic' in m:
cost += (i / 1_000_000 * 3.50) + (o / 1_000_000 * 10.50)
elif 'imagen' in m or imgs > 0:
cost += (imgs * 0.04)
if 'cost' in x:
total_cost += x['cost']
else:
# Fallback calculation for old logs without explicit cost field
c = 0.0
mx = x.get('model', '').lower()
ix = x.get('input_tokens', 0)
ox = x.get('output_tokens', 0)
imgx = x.get('images', 0)
if 'flash' in mx: c = (ix / 1_000_000 * 0.075) + (ox / 1_000_000 * 0.30)
elif 'pro' in mx or 'logic' in mx: c = (ix / 1_000_000 * 3.50) + (ox / 1_000_000 * 10.50)
elif 'imagen' in mx or imgx > 0: c = (imgx * 0.04)
total_cost += c
data["totals"] = {
"input_tokens": t_in,
"output_tokens": t_out,
"images": t_img,
"est_cost_usd": round(cost, 4)
"est_cost_usd": round(total_cost, 4)
}
with open(log_path, 'w') as f: json.dump(data, f, indent=2)

View File

@@ -224,9 +224,10 @@ def regenerate_artifacts_task(run_id, project_path, feedback=None):
if not os.path.exists(run_dir):
log_file = os.path.join(project_path, f"system_log_{run_id}.txt")
# Clear previous logs (File) to refresh the log window
try:
with open(log_file, 'a', encoding='utf-8') as f:
f.write(f"\n\n[{datetime.utcnow().strftime('%H:%M:%S')}] --- REGENERATION STARTED ---\n")
with open(log_file, 'w', encoding='utf-8') as f:
f.write(f"[{datetime.utcnow().strftime('%H:%M:%S')}] --- REGENERATION STARTED ---\n")
except: pass
utils.set_log_file(log_file)
@@ -234,6 +235,7 @@ def regenerate_artifacts_task(run_id, project_path, feedback=None):
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
try:
with sqlite3.connect(db_path) as conn:
conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,)) # Clear DB logs
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
except: pass
@@ -326,14 +328,24 @@ def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instructio
run_dir = os.path.join(project_path, "runs", f"run_{run_id}")
# --- Setup Logging for Rewrite ---
# Append to the existing run log so it appears in the UI
log_file = os.path.join(run_dir, "web_console.log")
if not os.path.exists(log_file):
log_file = os.path.join(project_path, f"system_log_{run_id}.txt")
# Clear previous logs to refresh the log window
try:
with open(log_file, 'w', encoding='utf-8') as f: f.write("")
except: pass
utils.set_log_file(log_file)
db_path = os.path.join(config.DATA_DIR, "bookapp.db")
utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
try:
with sqlite3.connect(db_path) as conn:
conn.execute("DELETE FROM log_entry WHERE run_id = ?", (run_id,))
conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
except: pass
# ---------------------------------
book_path = os.path.join(run_dir, book_folder)
@@ -368,10 +380,25 @@ def rewrite_chapter_task(run_id, project_path, book_folder, chap_num, instructio
with open(ms_path, 'w') as f: json.dump(ms, f, indent=2)
export.compile_files(bp, ms, book_path)
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
except: pass
return True
# If result is False/None
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'completed' WHERE id = ?", (run_id,))
except: pass
return False
except Exception as e:
utils.log("ERROR", f"Rewrite task exception for run {run_id}/{book_folder}: {e}")
try:
with sqlite3.connect(db_path) as conn:
conn.execute("UPDATE run SET status = 'failed' WHERE id = ?", (run_id,))
except: pass
return False
@huey.task()

View File

@@ -27,6 +27,7 @@
<tr>
<th style="width: 15%">Role</th>
<th style="width: 25%">Selected Model</th>
<th style="width: 15%">Est. Cost</th>
<th>Selection Reasoning</th>
</tr>
</thead>
@@ -39,6 +40,9 @@
<td>
<span class="badge bg-info text-dark">{{ info.model }}</span>
</td>
<td>
<span class="badge bg-light text-dark border">{{ info.estimated_cost }}</span>
</td>
<td class="small text-muted">
{{ info.reason }}
</td>
@@ -71,6 +75,7 @@
<tr>
<th style="width: 10%">Rank</th>
<th style="width: 30%">Model Name</th>
<th style="width: 15%">Est. Cost</th>
<th>Reasoning</th>
</tr>
</thead>
@@ -80,6 +85,7 @@
<tr>
<td class="fw-bold">{{ loop.index }}</td>
<td><span class="badge bg-secondary">{{ item.model }}</span></td>
<td><small>{{ item.estimated_cost }}</small></td>
<td class="small text-muted">{{ item.reason }}</td>
</tr>
{% endfor %}