Adding files.

This commit is contained in:
2026-02-03 10:13:33 -05:00
parent fc44a7834a
commit 9dec4a472f
34 changed files with 5984 additions and 0 deletions

215
modules/ai.py Normal file
View File

@@ -0,0 +1,215 @@
import os
import sys
import json
import time
import warnings
import google.generativeai as genai
import config
from . import utils
# Suppress Vertex AI warnings
warnings.filterwarnings("ignore", category=UserWarning, module="vertexai")
try:
import vertexai
from vertexai.preview.vision_models import ImageGenerationModel as VertexImageModel
HAS_VERTEX = True
except ImportError:
HAS_VERTEX = False
try:
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
HAS_OAUTH = True
except ImportError:
HAS_OAUTH = False
# Module-level model handles, populated lazily by init_models() and shared by
# the rest of the app. All are None until init_models() has run.
model_logic = None  # reasoning/JSON-structuring model (Pro-class)
model_writer = None  # fast prose-generation model (Flash-class)
model_artist = None  # art/design prompt model
model_image = None  # image generation model (Gemini or Vertex); may remain None
def get_optimal_model(base_type="pro"):
    """Return the best available Gemini model name containing *base_type*.

    Queries the live model list and prefers stable releases over
    "latest" and experimental/beta variants (stable models tend to have
    higher quotas). Falls back to the well-known
    ``models/gemini-1.5-{base_type}`` name when the API is unreachable
    or no candidate matches.

    Args:
        base_type: Substring identifying the family ("pro" or "flash").

    Returns:
        A fully-qualified model name string (e.g. "models/gemini-1.5-pro").
    """
    try:
        available = [m for m in genai.list_models()
                     if 'generateContent' in m.supported_generation_methods]
        candidates = [m.name for m in available if base_type in m.name]
        if not candidates:
            return f"models/gemini-1.5-{base_type}"

        def score(name):
            # Prioritize stable models (higher quotas) over experimental/beta ones
            if "exp" in name or "beta" in name:
                return 0
            if "latest" in name:
                return 50
            return 100

        # max() keeps the first highest-scoring candidate, matching the
        # previous stable-sort-then-take-first behavior without a full sort.
        return max(candidates, key=score)
    except Exception:
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt
        # propagate; any API/auth failure still yields the safe default.
        return f"models/gemini-1.5-{base_type}"
def get_default_models():
    """Return the hard-coded heuristic model selection.

    Used as the last-resort fallback when AI-driven model selection fails;
    mirrors the cache schema: role -> {'model', 'reason'} plus a 'ranking' list.
    """
    pro_reason = "Fallback: Default Pro model selected."
    flash_reason = "Fallback: Default Flash model selected."
    defaults = {"logic": {"model": "models/gemini-1.5-pro", "reason": pro_reason}}
    for role in ("writer", "artist"):
        defaults[role] = {"model": "models/gemini-1.5-flash", "reason": flash_reason}
    defaults["ranking"] = []
    return defaults
def select_best_models(force_refresh=False):
    """
    Uses a safe bootstrapper model to analyze available models and pick the best ones.
    Caches the result for 24 hours.

    Args:
        force_refresh: When True, skip the fresh-cache fast path and re-query the API.

    Returns:
        dict with 'logic'/'writer'/'artist' entries (each {'model', 'reason'})
        plus a 'ranking' list. On API failure falls back to a stale cache,
        then to get_default_models().
    """
    cache_path = os.path.join(config.DATA_DIR, "model_cache.json")
    cached_models = None
    # 1. Check Cache
    if os.path.exists(cache_path):
        try:
            with open(cache_path, 'r') as f:
                cached = json.load(f)
            cached_models = cached.get('models', {})
            # Check if within 24 hours (86400 seconds)
            if not force_refresh and time.time() - cached.get('timestamp', 0) < 86400:
                models = cached_models
                # Validate format (must be dicts with reasons, not just strings)
                if isinstance(models.get('logic'), dict) and 'reason' in models['logic']:
                    utils.log("SYSTEM", "Using cached AI model selection (valid for 24h).")
                    return models
        except Exception as e:
            utils.log("SYSTEM", f"Cache read failed: {e}. Refreshing models.")
    # 2. Ask a cheap "bootstrapper" model to rank the full model list.
    try:
        utils.log("SYSTEM", "Refreshing AI model list from API...")
        models = [m.name for m in genai.list_models() if 'generateContent' in m.supported_generation_methods]
        bootstrapper = "models/gemini-1.5-flash"
        if bootstrapper not in models:
            candidates = [m for m in models if 'flash' in m]
            bootstrapper = candidates[0] if candidates else "models/gemini-pro"
        utils.log("SYSTEM", f"Bootstrapping model selection with: {bootstrapper}")
        model = genai.GenerativeModel(bootstrapper)
        prompt = f"Analyze this list of available Google Gemini models:\n{json.dumps(models)}\n\nSelect the best model for each of these three roles based on these criteria:\n- Most recent version with best features and ability.\n- Beta versions are okay, but avoid 'experimental' if a stable beta/prod version exists.\n- Consider quota efficiency (Flash is cheaper/faster, Pro is smarter).\n\nROLES:\n1. LOGIC: For complex reasoning, JSON structuring, and plot planning.\n2. WRITER: For creative fiction writing, prose generation, and speed.\n3. ARTIST: For generating visual art prompts and design instructions.\n\nAlso provide a 'ranking' list of ALL models analyzed, ordered from best/most useful to worst/least useful, with a short reason.\n\nReturn JSON: {{ 'logic': {{ 'model': 'model_name', 'reason': 'reasoning' }}, 'writer': {{ 'model': 'model_name', 'reason': 'reasoning' }}, 'artist': {{ 'model': 'model_name', 'reason': 'reasoning' }}, 'ranking': [ {{ 'model': 'model_name', 'reason': 'reasoning' }} ] }}"
        response = model.generate_content(prompt)
        selection = json.loads(utils.clean_json(response.text))
        if not os.path.exists(config.DATA_DIR):
            os.makedirs(config.DATA_DIR)
        with open(cache_path, 'w') as f:
            json.dump({"timestamp": int(time.time()), "models": selection, "available_at_time": models}, f, indent=2)
        return selection
    except Exception as e:
        utils.log("SYSTEM", f"AI Model Selection failed: {e}.")
        # 3. Fallback to Stale Cache if available (Better than heuristics)
        # Relaxed check: If we successfully loaded ANY JSON from the cache, use it.
        if cached_models:
            utils.log("SYSTEM", "⚠️ Using stale cached models due to API failure.")
            return cached_models
        utils.log("SYSTEM", "Falling back to heuristics.")
        fallback = get_default_models()
        # Persist the fallback so the web UI always has a readable cache file.
        try:
            with open(cache_path, 'w') as f:
                json.dump({"timestamp": int(time.time()), "models": fallback, "error": str(e)}, f, indent=2)
        except OSError:
            # Narrowed from a bare `except: pass`: only disk/permission errors
            # are best-effort-ignored; programming errors now surface.
            pass
        return fallback
def init_models(force=False):
    """Initialize the module-level Gemini (and optionally Vertex AI) handles.

    Populates model_logic / model_writer / model_artist / model_image.
    Idempotent: returns immediately when already initialized unless force=True.
    """
    global model_logic, model_writer, model_artist, model_image
    # Already initialized and not forced -> nothing to do.
    if model_logic and not force: return
    genai.configure(api_key=config.API_KEY)
    # Check cache to skip frequent validation
    cache_path = os.path.join(config.DATA_DIR, "model_cache.json")
    skip_validation = False
    if not force and os.path.exists(cache_path):
        try:
            with open(cache_path, 'r') as f: cached = json.load(f)
            # A cache younger than 24h implies the key worked recently.
            if time.time() - cached.get('timestamp', 0) < 86400: skip_validation = True
        except: pass
    if not skip_validation:
        # Validate Gemini API Key
        utils.log("SYSTEM", "Validating credentials...")
        try:
            # Cheapest possible authenticated call: list a single model.
            list(genai.list_models(page_size=1))
            utils.log("SYSTEM", "✅ Gemini API Key is valid.")
        except Exception as e:
            # Check if we have a cache file we can rely on before exiting
            if os.path.exists(cache_path):
                utils.log("SYSTEM", f"⚠️ API check failed ({e}), but cache exists. Attempting to use cached models.")
            else:
                utils.log("SYSTEM", f"⚠️ API check failed ({e}). No cache found. Attempting to initialize with defaults.")
    utils.log("SYSTEM", "Selecting optimal models via AI...")
    selected_models = select_best_models(force_refresh=force)
    def get_model_name(role_data):
        # Cache entries may be rich dicts ({'model', 'reason'}) or plain strings.
        if isinstance(role_data, dict): return role_data.get('model')
        return role_data
    # Config hints override the AI selection unless set to "AUTO".
    logic_name = get_model_name(selected_models['logic']) if config.MODEL_LOGIC_HINT == "AUTO" else config.MODEL_LOGIC_HINT
    writer_name = get_model_name(selected_models['writer']) if config.MODEL_WRITER_HINT == "AUTO" else config.MODEL_WRITER_HINT
    artist_name = get_model_name(selected_models['artist']) if config.MODEL_ARTIST_HINT == "AUTO" else config.MODEL_ARTIST_HINT
    utils.log("SYSTEM", f"Models: Logic={logic_name} | Writer={writer_name} | Artist={artist_name}")
    model_logic = genai.GenerativeModel(logic_name, safety_settings=utils.SAFETY_SETTINGS)
    model_writer = genai.GenerativeModel(writer_name, safety_settings=utils.SAFETY_SETTINGS)
    model_artist = genai.GenerativeModel(artist_name, safety_settings=utils.SAFETY_SETTINGS)
    # Initialize Image Model (Default to None)
    model_image = None
    if hasattr(genai, 'ImageGenerationModel'):
        try: model_image = genai.ImageGenerationModel("imagen-3.0-generate-001")
        except: pass
    img_source = "Gemini API" if model_image else "None"
    if HAS_VERTEX and config.GCP_PROJECT:
        creds = None
        # Handle OAuth Client ID (credentials.json) if provided instead of Service Account
        if HAS_OAUTH:
            gac = config.GOOGLE_CREDS # Use persistent config, not volatile env var
            if gac and os.path.exists(gac):
                try:
                    with open(gac, 'r') as f: data = json.load(f)
                    # 'installed'/'web' keys identify an OAuth client-id file,
                    # as opposed to a service-account key.
                    if 'installed' in data or 'web' in data:
                        # It's an OAuth Client ID. Unset env var to avoid library crash.
                        if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ:
                            del os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
                        # token.json is cached next to the credentials file.
                        token_path = os.path.join(os.path.dirname(os.path.abspath(gac)), 'token.json')
                        SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
                        if os.path.exists(token_path):
                            creds = Credentials.from_authorized_user_file(token_path, SCOPES)
                        if not creds or not creds.valid:
                            if creds and creds.expired and creds.refresh_token:
                                try:
                                    creds.refresh(Request())
                                except Exception:
                                    # Refresh token revoked/expired: restart full browser flow.
                                    utils.log("SYSTEM", "Token refresh failed. Re-authenticating...")
                                    flow = InstalledAppFlow.from_client_secrets_file(gac, SCOPES)
                                    creds = flow.run_local_server(port=0)
                            else:
                                utils.log("SYSTEM", "OAuth Client ID detected. Launching browser to authenticate...")
                                flow = InstalledAppFlow.from_client_secrets_file(gac, SCOPES)
                                creds = flow.run_local_server(port=0)
                            # Persist the (new or refreshed) token for next run.
                            # NOTE(review): placement inside the not-valid branch follows the
                            # standard Google quickstart pattern — confirm against original intent.
                            with open(token_path, 'w') as token: token.write(creds.to_json())
                        utils.log("SYSTEM", "✅ Authenticated via OAuth Client ID.")
                except Exception as e:
                    utils.log("SYSTEM", f"⚠️ OAuth check failed: {e}")
        vertexai.init(project=config.GCP_PROJECT, location=config.GCP_LOCATION, credentials=creds)
        utils.log("SYSTEM", f"✅ Vertex AI initialized (Project: {config.GCP_PROJECT})")
        # Override with Vertex Image Model if available
        try:
            model_image = VertexImageModel.from_pretrained("imagen-3.0-generate-001")
            img_source = "Vertex AI"
        except: pass
    utils.log("SYSTEM", f"Image Generation Provider: {img_source}")

74
modules/export.py Normal file
View File

@@ -0,0 +1,74 @@
import os
import markdown
from docx import Document
from ebooklib import epub
from . import utils
def create_readme(folder, bp):
    """Write a README.md into *folder* summarizing the book's generation settings."""
    meta = bp['book_metadata']
    ls = bp['length_settings']
    readme_lines = [
        f"# {meta['title']}",
        "**Generated by BookApp**",
        "",
        "## Stats Used",
        f"- **Type:** {ls.get('label', 'Custom')}",
        f"- **Planned Chapters:** {ls['chapters']}",
        f"- **Logic Depth:** {ls['depth']}",
        f"- **Target Words:** {ls.get('words', 'Unknown')}",
    ]
    with open(os.path.join(folder, "README.md"), "w") as f:
        f.write("\n".join(readme_lines))
def compile_files(bp, ms, folder):
    """Compile the finished manuscript into DOCX and EPUB files in *folder*.

    Args:
        bp: Book blueprint dict (metadata, length settings).
        ms: List of chapter dicts with 'num', 'title', 'content'.
        folder: Output directory; also searched for cover.png.
    """
    utils.log("SYSTEM", "Compiling EPUB and DOCX...")
    meta = bp.get('book_metadata', {})
    title = meta.get('title', 'Untitled')
    # Output base name: explicit filename wins; otherwise sanitize the title.
    if meta.get('filename'):
        safe = meta['filename']
    else:
        safe = "".join([c for c in title if c.isalnum() or c=='_']).replace(" ", "_")
    doc = Document(); doc.add_heading(title, 0)
    book = epub.EpubBook(); book.set_title(title); spine = ['nav']
    # Add Cover if exists
    cover_path = os.path.join(folder, "cover.png")
    if os.path.exists(cover_path):
        with open(cover_path, 'rb') as f:
            book.set_cover("cover.png", f.read())
    for c in ms:
        # Determine filename/type
        num_str = str(c['num']).lower()
        if num_str == '0' or 'prologue' in num_str:
            filename = "prologue.xhtml"
            default_header = f"Prologue: {c['title']}"
        elif 'epilogue' in num_str:
            filename = "epilogue.xhtml"
            default_header = f"Epilogue: {c['title']}"
        else:
            filename = f"ch_{c['num']}.xhtml"
            default_header = f"Ch {c['num']}: {c['title']}"
        # Check for AI-generated header in content
        content = c['content'].strip()
        # Strip markdown code fences the model sometimes wraps chapters in.
        clean_content = content.replace("```markdown", "").replace("```", "").strip()
        lines = clean_content.split('\n')
        ai_header = None
        body_content = clean_content
        # A leading "# ..." line is treated as the chapter's own header.
        if lines and lines[0].strip().startswith('# '):
            ai_header = lines[0].strip().replace('#', '').strip()
            header = ai_header
            body_content = "\n".join(lines[1:]).strip()
        else:
            header = default_header
        doc.add_heading(header, 1)
        doc.add_paragraph(body_content)
        ch = epub.EpubHtml(title=header, file_name=filename)
        # Strip accidental local folder paths leaked into the text.
        clean_content = clean_content.replace(f"{folder}\\", "").replace(f"{folder}/", "")
        html_content = markdown.markdown(clean_content)
        # If the AI header is present, markdown already rendered it as <h1>;
        # otherwise prepend our default header explicitly.
        ch.content = html_content if ai_header else f"<h1>{header}</h1>{html_content}"
        book.add_item(ch); spine.append(ch)
    doc.save(os.path.join(folder, f"{safe}.docx"))
    book.spine = spine; book.add_item(epub.EpubNcx()); book.add_item(epub.EpubNav())
    epub.write_epub(os.path.join(folder, f"{safe}.epub"), book, {})
    create_readme(folder, bp)

350
modules/marketing.py Normal file
View File

@@ -0,0 +1,350 @@
import os
import json
import shutil
import textwrap
import requests
import google.generativeai as genai
from . import utils
import config
from modules import ai
try:
from PIL import Image, ImageDraw, ImageFont, ImageStat
HAS_PIL = True
except ImportError:
HAS_PIL = False
def download_font(font_name):
    """Attempts to download a Google Font from GitHub.

    Normalizes *font_name* (CSS lists, file extensions, style suffixes),
    serves a cached copy when available, otherwise tries known file layouts
    of the google/fonts repository. Falls back to Roboto, then None.
    """
    if not font_name: font_name = "Roboto"
    if not os.path.exists(config.FONTS_DIR): os.makedirs(config.FONTS_DIR)
    # Handle CSS-style lists (e.g. "Roboto, sans-serif")
    if "," in font_name: font_name = font_name.split(",")[0].strip()
    # Handle filenames provided by AI
    if font_name.lower().endswith(('.ttf', '.otf')):
        font_name = os.path.splitext(font_name)[0]
    font_name = font_name.strip().strip("'").strip('"')
    # Strip style suffixes so "Roboto Bold" resolves to the Roboto family.
    for suffix in ["-Regular", " Regular", " regular", "Regular", " Bold", " Italic"]:
        if font_name.endswith(suffix):
            font_name = font_name[:-len(suffix)]
    font_name = font_name.strip()
    clean_name = font_name.replace(" ", "").lower()
    font_filename = f"{clean_name}.ttf"
    font_path = os.path.join(config.FONTS_DIR, font_filename)
    # Cache hit; the >1000-byte check guards against truncated/error downloads.
    if os.path.exists(font_path) and os.path.getsize(font_path) > 1000:
        utils.log("ASSETS", f"Using cached font: {font_path}")
        return font_path
    utils.log("ASSETS", f"Downloading font: {font_name}...")
    compact_name = font_name.replace(" ", "")
    title_compact = "".join(x.title() for x in font_name.split())
    # Known file layouts in the google/fonts repo (static vs variable fonts).
    patterns = [
        f"static/{title_compact}-Regular.ttf", f"{title_compact}-Regular.ttf",
        f"{title_compact}[wght].ttf", f"{title_compact}[wdth,wght].ttf",
        f"static/{compact_name}-Regular.ttf", f"{compact_name}-Regular.ttf",
        f"{title_compact}-Regular.otf",
    ]
    headers = {"User-Agent": "Mozilla/5.0 (BookApp/1.0)"}
    # The repo groups fonts by license directory; try each in turn.
    for license_type in ["ofl", "apache", "ufl"]:
        base_url = f"https://github.com/google/fonts/raw/main/{license_type}/{clean_name}"
        for pattern in patterns:
            try:
                r = requests.get(f"{base_url}/{pattern}", headers=headers, timeout=5)
                if r.status_code == 200 and len(r.content) > 1000:
                    with open(font_path, 'wb') as f: f.write(r.content)
                    utils.log("ASSETS", f"✅ Downloaded {font_name} to {font_path}")
                    return font_path
            except Exception: continue
    # Recurse once onto Roboto; the guard prevents infinite recursion if
    # Roboto itself cannot be fetched.
    if clean_name != "roboto":
        utils.log("ASSETS", f"⚠️ Font '{font_name}' not found. Falling back to Roboto.")
        return download_font("Roboto")
    return None
def evaluate_image_quality(image_path, prompt, model, folder=None):
    """Score a generated image (1-10) against *prompt* using a vision model.

    Returns a (score, reason) tuple; (None, <error text>) when PIL is missing
    or the evaluation call fails.
    """
    if not HAS_PIL:
        return None, "PIL not installed"
    try:
        picture = Image.open(image_path)
        critique_request = (
            f"Analyze this generated image against the description: '{prompt}'.\n"
            "Rate accuracy/relevance on a scale of 1-10.\n"
            "Provide a 1-sentence critique.\n"
            "Return JSON: {'score': int, 'reason': 'string'}"
        )
        reply = model.generate_content([critique_request, picture])
        if folder:
            utils.log_usage(folder, "logic-pro", reply.usage_metadata)
        verdict = json.loads(utils.clean_json(reply.text))
        return verdict.get('score'), verdict.get('reason')
    except Exception as err:
        return None, str(err)
def generate_blurb(bp, folder):
    """Generate a ~150-200 word back-cover blurb and save it to disk.

    Writes the same text to both blurb.txt and back_cover.txt in *folder*.
    Failures are logged (with the cause) rather than raised.
    """
    utils.log("MARKETING", "Generating blurb...")
    meta = bp.get('book_metadata', {})
    prompt = f"""
Write a compelling back-cover blurb (approx 150-200 words) for this book.
TITLE: {meta.get('title')}
GENRE: {meta.get('genre')}
LOGLINE: {bp.get('manual_instruction')}
PLOT: {json.dumps(bp.get('plot_beats', []))}
CHARACTERS: {json.dumps(bp.get('characters', []))}
"""
    try:
        response = ai.model_writer.generate_content(prompt)
        utils.log_usage(folder, "writer-flash", response.usage_metadata)
        blurb = response.text
        # Same text under both names: blurb.txt for the UI, back_cover.txt for export.
        with open(os.path.join(folder, "blurb.txt"), "w") as f:
            f.write(blurb)
        with open(os.path.join(folder, "back_cover.txt"), "w") as f:
            f.write(blurb)
    except Exception as e:
        # Narrowed from a bare `except:` and surfaced the failure cause,
        # which was previously swallowed silently.
        utils.log("MARKETING", f"Failed to generate blurb: {e}")
def generate_cover(bp, folder, tracking=None, feedback=None):
    """Design and render the book cover into folder/cover.png.

    Pipeline: (1) interpret optional user feedback to decide whether to keep
    the existing art; (2) ask the artist model for a design spec (font,
    colors, art prompt); (3) generate cover art with up to 5 scored attempts;
    (4) overlay title/author text with up to 5 scored layout attempts.

    Args:
        bp: Book blueprint (metadata, style).
        folder: Output directory for art, attempts, and the final cover.png.
        tracking: Optional story-tracking dict ('events', 'characters') used
            as visual context for the art prompt.
        feedback: Optional free-text user feedback on a previous cover.
    """
    if not HAS_PIL:
        utils.log("MARKETING", "Pillow not installed. Skipping image cover.")
        return
    utils.log("MARKETING", "Generating cover...")
    meta = bp.get('book_metadata', {})
    series = bp.get('series_metadata', {})  # NOTE(review): currently unused
    # Map page orientation to the image generator's aspect-ratio string.
    orientation = meta.get('style', {}).get('page_orientation', 'Portrait')
    ar = "3:4"
    if orientation == "Landscape": ar = "4:3"
    elif orientation == "Square": ar = "1:1"
    # Build visual context from tracked story state (last 5 events only).
    visual_context = ""
    if tracking:
        visual_context = "IMPORTANT VISUAL CONTEXT:\n"
        if 'events' in tracking:
            visual_context += f"Key Events/Themes: {json.dumps(tracking['events'][-5:])}\n"
        if 'characters' in tracking:
            visual_context += f"Character Appearances: {json.dumps(tracking['characters'])}\n"
    # Feedback Analysis
    regenerate_image = True
    design_instruction = ""
    if feedback and feedback.strip():
        utils.log("MARKETING", f"Analyzing feedback: '{feedback}'...")
        analysis_prompt = f"""
User Feedback on Book Cover: "{feedback}"
Determine if the user wants to:
1. Keep the current background image but change text/layout/color (REGENERATE_LAYOUT).
2. Create a completely new background image (REGENERATE_IMAGE).
NOTE: If the feedback is generic (e.g. "regenerate", "try again") or does not explicitly mention keeping the image/changing text only, default to REGENERATE_IMAGE.
Return JSON: {{ "action": "REGENERATE_LAYOUT" or "REGENERATE_IMAGE", "instruction": "Specific instruction for the Art Director" }}
"""
        try:
            resp = ai.model_logic.generate_content(analysis_prompt)
            decision = json.loads(utils.clean_json(resp.text))
            if decision.get('action') == 'REGENERATE_LAYOUT':
                regenerate_image = False
                utils.log("MARKETING", "Feedback indicates keeping image. Regenerating layout only.")
            design_instruction = decision.get('instruction', feedback)
        except:
            # Any failure defaults to the safer full regeneration path.
            utils.log("MARKETING", "Feedback analysis failed. Defaulting to full regeneration.")
    design_prompt = f"""
Act as an Art Director. Design the cover for this book.
TITLE: {meta.get('title')}
GENRE: {meta.get('genre')}
TONE: {meta.get('style', {}).get('tone')}
CRITICAL INSTRUCTIONS:
1. CHARACTER APPEARANCE: Strictly adhere to the provided character descriptions (hair, eyes, race, age, clothing) in the Visual Context.
2. GENRE EXPRESSIONS: Ensure character facial expressions and body language heavily reflect the GENRE (e.g. Horror = terrified/menacing, Romance = longing/soft, Thriller = intense/alert).
{visual_context}
{f"USER FEEDBACK: {feedback}" if feedback else ""}
{f"INSTRUCTION: {design_instruction}" if design_instruction else ""}
Provide JSON output:
{{
"font_name": "Name of a popular Google Font (e.g. Roboto, Cinzel, Oswald, Playfair Display)",
"primary_color": "#HexCode (Background)",
"text_color": "#HexCode (Contrast)",
"art_prompt": "A detailed description of the cover art for an image generator. Explicitly describe characters based on the visual context."
}}
"""
    try:
        response = ai.model_artist.generate_content(design_prompt)
        utils.log_usage(folder, "artist-flash", response.usage_metadata)
        design = json.loads(utils.clean_json(response.text))
        bg_color = design.get('primary_color', '#252570')
        text_color = design.get('text_color', '#FFFFFF')
        art_prompt = design.get('art_prompt', f"Cover art for {meta.get('title')}")
        # Persist the art prompt for later inspection / regeneration.
        with open(os.path.join(folder, "cover_art_prompt.txt"), "w") as f:
            f.write(art_prompt)
        img = None
        image_generated = False
        width, height = 600, 900
        best_img_score = 0
        best_img_path = None
        if regenerate_image:
            # Up to 5 generation attempts, keeping the best-scored image.
            for i in range(1, 6):
                utils.log("MARKETING", f"Generating cover art (Attempt {i}/5)...")
                try:
                    if not ai.model_image: raise ImportError("No Image Generation Model available.")
                    status = "success"
                    try:
                        result = ai.model_image.generate_images(prompt=art_prompt, number_of_images=1, aspect_ratio=ar)
                    except Exception as e:
                        # On resource errors, retry once on the older Vertex Imagen 2 model.
                        if "resource" in str(e).lower() and ai.HAS_VERTEX:
                            utils.log("MARKETING", "⚠️ Imagen 3 failed. Trying Imagen 2...")
                            fb_model = ai.VertexImageModel.from_pretrained("imagegeneration@006")
                            result = fb_model.generate_images(prompt=art_prompt, number_of_images=1, aspect_ratio=ar)
                            status = "success_fallback"
                        else: raise e
                    attempt_path = os.path.join(folder, f"cover_art_attempt_{i}.png")
                    result.images[0].save(attempt_path)
                    utils.log_usage(folder, "imagen", image_count=1)
                    # Have the logic model score the attempt against the prompt.
                    score, critique = evaluate_image_quality(attempt_path, art_prompt, ai.model_logic, folder)
                    if score is None: score = 0
                    utils.log("MARKETING", f" -> Image Score: {score}/10. Critique: {critique}")
                    # NOTE(review): logged filename "cover_art_{i}.png" differs from the
                    # saved "cover_art_attempt_{i}.png" — likely a logging mismatch; verify.
                    utils.log_image_attempt(folder, "cover", art_prompt, f"cover_art_{i}.png", status, score=score, critique=critique)
                    if score > best_img_score:
                        best_img_score = score
                        best_img_path = attempt_path
                    if score == 10:
                        utils.log("MARKETING", " -> Perfect image accepted.")
                        break
                    # Nudge the prompt when the critique mentions common artifacts.
                    if "scar" in critique.lower() or "deform" in critique.lower() or "blur" in critique.lower():
                        art_prompt += " (Ensure high quality, clear skin, no scars, sharp focus)."
                except Exception as e:
                    utils.log("MARKETING", f"Image generation failed: {e}")
                    # Quota exhaustion: no point retrying.
                    if "quota" in str(e).lower(): break
            if best_img_path and os.path.exists(best_img_path):
                final_art_path = os.path.join(folder, "cover_art.png")
                if best_img_path != final_art_path:
                    shutil.copy(best_img_path, final_art_path)
                img = Image.open(final_art_path).resize((width, height)).convert("RGB")
                image_generated = True
            else:
                utils.log("MARKETING", "Falling back to solid color cover.")
                img = Image.new('RGB', (width, height), color=bg_color)
                utils.log_image_attempt(folder, "cover", art_prompt, "cover.png", "fallback_solid")
        else:
            # Load existing art
            final_art_path = os.path.join(folder, "cover_art.png")
            if os.path.exists(final_art_path):
                utils.log("MARKETING", "Using existing cover art (Layout update only).")
                img = Image.open(final_art_path).resize((width, height)).convert("RGB")
            else:
                utils.log("MARKETING", "Existing art not found. Forcing regeneration.")
                # Fallback to solid color if we were supposed to reuse but couldn't find it
                img = Image.new('RGB', (width, height), color=bg_color)
        font_path = download_font(design.get('font_name') or 'Arial')
        best_layout_score = 0
        best_layout_path = None
        base_layout_prompt = f"""
Act as a Senior Book Cover Designer. Analyze this 600x900 cover art.
BOOK DETAILS: Title: {meta.get('title')}, Author: {meta.get('author')}, Genre: {meta.get('genre')}
TASK: Determine best (x, y) coordinates for Title and Author. Do NOT place text over faces.
RETURN JSON: {{ "title": {{ "x": int, "y": int, "font_size": int, "font_name": "String", "color": "#Hex" }}, "author": {{ "x": int, "y": int, "font_size": int, "font_name": "String", "color": "#Hex" }} }}
"""
        if feedback:
            base_layout_prompt += f"\nUSER FEEDBACK: {feedback}\nAdjust layout/colors accordingly."
        layout_prompt = base_layout_prompt
        # Up to 5 layout attempts, each critiqued and scored; keep the best.
        for attempt in range(1, 6):
            utils.log("MARKETING", f"Designing text layout (Attempt {attempt}/5)...")
            try:
                response = ai.model_logic.generate_content([layout_prompt, img])
                utils.log_usage(folder, "logic-pro", response.usage_metadata)
                layout = json.loads(utils.clean_json(response.text))
                # The model occasionally wraps the object in a list.
                if isinstance(layout, list): layout = layout[0] if layout else {}
            except Exception as e:
                utils.log("MARKETING", f"Layout generation failed: {e}")
                continue
            img_copy = img.copy()
            draw = ImageDraw.Draw(img_copy)
            def draw_element(key, text_override=None):
                # Render one text element (title/author) centered on (x, y),
                # wrapping to ~550px width and stacking wrapped lines.
                elem = layout.get(key)
                if not elem: return
                if isinstance(elem, list): elem = elem[0] if elem else {}
                text = text_override if text_override else elem.get('text')
                if not text: return
                f_name = elem.get('font_name') or 'Arial'
                f_path = download_font(f_name)
                try:
                    if f_path: font = ImageFont.truetype(f_path, elem.get('font_size', 40))
                    else: raise IOError("Font not found")
                except: font = ImageFont.load_default()
                x, y = elem.get('x', 300), elem.get('y', 450)
                color = elem.get('color') or '#FFFFFF'
                # Estimate wrap width in characters from the average glyph width.
                avg_char_w = font.getlength("A")
                wrap_w = int(550 / avg_char_w) if avg_char_w > 0 else 20
                lines = textwrap.wrap(text, width=wrap_w)
                line_heights = []
                for l in lines:
                    bbox = draw.textbbox((0, 0), l, font=font)
                    line_heights.append(bbox[3] - bbox[1] + 10)
                total_h = sum(line_heights)
                # Vertically center the whole block on y.
                current_y = y - (total_h // 2)
                for i, line in enumerate(lines):
                    bbox = draw.textbbox((0, 0), line, font=font)
                    lx = x - ((bbox[2] - bbox[0]) / 2)
                    draw.text((lx, current_y), line, font=font, fill=color)
                    current_y += line_heights[i]
            draw_element('title', meta.get('title'))
            draw_element('author', meta.get('author'))
            attempt_path = os.path.join(folder, f"cover_layout_attempt_{attempt}.png")
            img_copy.save(attempt_path)
            # Evaluate Layout
            eval_prompt = f"Analyze this book cover layout. Is the text legible? Is the contrast good? Does it look professional? Title: {meta.get('title')}"
            score, critique = evaluate_image_quality(attempt_path, eval_prompt, ai.model_logic, folder)
            if score is None: score = 0
            utils.log("MARKETING", f" -> Layout Score: {score}/10. Critique: {critique}")
            if score > best_layout_score:
                best_layout_score = score
                best_layout_path = attempt_path
            if score == 10:
                utils.log("MARKETING", " -> Perfect layout accepted.")
                break
            # Feed the critique back into the next attempt.
            layout_prompt = base_layout_prompt + f"\nCRITIQUE OF PREVIOUS ATTEMPT: {critique}\nAdjust position/color to fix this."
        if best_layout_path:
            shutil.copy(best_layout_path, os.path.join(folder, "cover.png"))
    except Exception as e:
        utils.log("MARKETING", f"Cover generation failed: {e}")
def create_marketing_assets(bp, folder, tracking=None):
    """Produce the full marketing bundle for a book: blurb then cover.

    Thin orchestrator over generate_blurb() and generate_cover(); *tracking*
    is forwarded so the cover art can use story context.
    """
    generate_blurb(bp, folder)
    generate_cover(bp, folder, tracking=tracking)

View File

@@ -0,0 +1,15 @@
flask
flask-login
flask-sqlalchemy
huey
werkzeug
google-generativeai
python-dotenv
rich
markdown
python-docx
EbookLib
requests
Pillow
google-cloud-aiplatform
google-auth-oauthlib

626
modules/story.py Normal file
View File

@@ -0,0 +1,626 @@
import json
import os
import random
import time
import config
from modules import ai
from . import utils
def enrich(bp, folder, context=""):
    """Flesh out a minimal book blueprint into a fuller "Book Bible".

    Asks the logic model to propose metadata, characters, and plot beats,
    then merges the response into *bp* without overwriting user-provided
    fields. Returns *bp* (also mutated in place); on failure returns it
    unchanged apart from the empty-container defaults.

    Args:
        bp: Blueprint dict; may be nearly empty.
        folder: Book folder used for usage logging.
        context: Optional previous-book context for sequels.
    """
    utils.log("ENRICHER", "Fleshing out details from description...")
    # If book_metadata is missing, create empty dict so AI can fill it
    if 'book_metadata' not in bp: bp['book_metadata'] = {}
    if 'characters' not in bp: bp['characters'] = []
    if 'plot_beats' not in bp: bp['plot_beats'] = []
    prompt = f"""
You are a Creative Director.
The user has provided a minimal description. You must build a full Book Bible.
USER DESCRIPTION: "{bp.get('manual_instruction', 'A generic story')}"
CONTEXT (Sequel): {context}
TASK:
1. Generate a catchy Title.
2. Define the Genre and Tone.
3. Determine the Time Period (e.g. "Modern", "1920s", "Sci-Fi Future").
4. Define Formatting Rules for text messages, thoughts, and chapter headers.
5. Create Protagonist and Antagonist/Love Interest.
- IF SEQUEL: Decide if we continue with previous protagonists or shift to side characters based on USER DESCRIPTION.
- IF NEW CHARACTERS: Create them.
- IF RETURNING: Reuse details from CONTEXT.
6. Outline 5-7 core Plot Beats.
7. Define a 'structure_prompt' describing the narrative arc (e.g. "Hero's Journey", "3-Act Structure", "Detective Procedural").
RETURN JSON in this EXACT format:
{{
"book_metadata": {{ "title": "Book Title", "genre": "Genre", "content_warnings": ["Violence", "Major Character Death"], "structure_prompt": "...", "style": {{ "tone": "Tone", "time_period": "Modern", "formatting_rules": ["Chapter Headers: Number + Title", "Text Messages: Italic", "Thoughts: Italic"] }} }},
"characters": [ {{ "name": "Name", "role": "Role", "description": "Description", "key_events": ["Planned injury in Act 2"] }} ],
"plot_beats": [ "Beat 1", "Beat 2", "..." ]
}}
"""
    try:
        # Merge AI response with existing data (don't overwrite if user provided specific keys)
        response = ai.model_logic.generate_content(prompt)
        utils.log_usage(folder, "logic-pro", response.usage_metadata)
        response_text = response.text
        cleaned_json = utils.clean_json(response_text)
        ai_data = json.loads(cleaned_json)
        # Smart Merge: Only fill missing fields
        if 'book_metadata' not in bp:
            bp['book_metadata'] = {}
        if 'title' not in bp['book_metadata']:
            bp['book_metadata']['title'] = ai_data.get('book_metadata', {}).get('title')
        if 'structure_prompt' not in bp['book_metadata']:
            bp['book_metadata']['structure_prompt'] = ai_data.get('book_metadata', {}).get('structure_prompt')
        if 'content_warnings' not in bp['book_metadata']:
            bp['book_metadata']['content_warnings'] = ai_data.get('book_metadata', {}).get('content_warnings', [])
        # Merge Style (Flexible)
        if 'style' not in bp['book_metadata']:
            bp['book_metadata']['style'] = {}
        # Handle AI returning legacy keys or new style key
        source_style = ai_data.get('book_metadata', {}).get('style', {})
        for k, v in source_style.items():
            if k not in bp['book_metadata']['style']:
                bp['book_metadata']['style'][k] = v
        if 'characters' not in bp or not bp['characters']:
            bp['characters'] = ai_data.get('characters', [])
        if 'plot_beats' not in bp or not bp['plot_beats']:
            bp['plot_beats'] = ai_data.get('plot_beats', [])
        return bp
    except Exception as e:
        # Best-effort enrichment: the pipeline continues with whatever bp has.
        utils.log("ENRICHER", f"Enrichment failed: {e}")
        return bp
def plan_structure(bp, folder):
    """Build the high-level list of story events ("beats") for the book.

    Prefers an explicit user-provided 'plot_outline' in *bp*; otherwise
    chooses a structure template by book length label (or the blueprint's
    'structure_prompt') and asks the logic model to generate events.

    Args:
        bp: Book blueprint dict.
        folder: Book folder used for usage logging.

    Returns:
        List of {'description', 'purpose'} event dicts; [] on failure.
    """
    utils.log("ARCHITECT", "Creating structure...")
    # Fast path: a structured plot outline supplied by the user wins outright.
    if 'plot_outline' in bp and isinstance(bp['plot_outline'], dict):
        po = bp['plot_outline']
        if 'beats' in po and isinstance(po['beats'], list):
            events = []
            for act in po['beats']:
                if 'plot_points' in act and isinstance(act['plot_points'], list):
                    for pp in act['plot_points']:
                        desc = pp.get('description')
                        point = pp.get('point', 'Event')
                        if desc:
                            events.append({"description": desc, "purpose": point})
            if events:
                utils.log("ARCHITECT", f"Using {len(events)} events from Plot Outline as base structure.")
                return events
    structure_type = bp.get('book_metadata', {}).get('structure_prompt')
    if not structure_type:
        # No explicit structure prompt: pick a template by length label.
        label = bp.get('length_settings', {}).get('label', 'Novel')
        structures = {
            "Chapter Book": "Create a simple episodic structure with clear chapter hooks.",
            "Young Adult": "Create a character-driven arc with high emotional stakes and a clear 'Coming of Age' theme.",
            "Flash Fiction": "Create a single, impactful scene structure with a twist.",
            "Short Story": "Create a concise narrative arc (Inciting Incident -> Rising Action -> Climax -> Resolution).",
            "Novella": "Create a standard 3-Act Structure.",
            "Novel": "Create a detailed 3-Act Structure with A and B plots.",
            "Epic": "Create a complex, multi-arc structure (Hero's Journey) with extensive world-building events."
        }
        structure_type = structures.get(label, "Create a 3-Act Structure.")
    # Flatten any partial plot outline into a textual context for the model.
    beats_context = []
    if 'plot_outline' in bp and isinstance(bp['plot_outline'], dict):
        po = bp['plot_outline']
        if 'beats' in po:
            for act in po['beats']:
                beats_context.append(f"ACT {act.get('act', '?')}: {act.get('title', '')} - {act.get('summary', '')}")
                for pp in act.get('plot_points', []):
                    beats_context.append(f" * {pp.get('point', 'Beat')}: {pp.get('description', '')}")
    if not beats_context:
        beats_context = bp.get('plot_beats', [])
    prompt = f"{structure_type}\nTITLE: {bp['book_metadata']['title']}\nBEATS: {json.dumps(beats_context)}\nReturn JSON: {{'events': [{{'description':'...', 'purpose':'...'}}]}}"
    try:
        response = ai.model_logic.generate_content(prompt)
        utils.log_usage(folder, "logic-pro", response.usage_metadata)
        return json.loads(utils.clean_json(response.text))['events']
    except Exception as e:
        # Narrowed from a bare `except:` that silently swallowed all failures;
        # log the cause so planning problems are diagnosable.
        utils.log("ARCHITECT", f"Structure planning failed: {e}")
        return []
def expand(events, pass_num, target_chapters, bp, folder):
    """One outline-expansion pass: insert intermediate beats to smooth pacing.

    Re-sends the original outline plus the current event list to the logic
    model, asking it to expand around (never remove) the original points.

    Args:
        events: Current list of beat dicts.
        pass_num: 1-based pass index, used only for logging.
        target_chapters: Desired chapter count driving the expansion depth.
        bp: Book blueprint (source of the original outline).
        folder: Book folder used for usage logging.

    Returns:
        The updated event list, or *events* unchanged on error.
    """
    utils.log("ARCHITECT", f"Expansion pass {pass_num} | Current Beats: {len(events)} | Target Chaps: {target_chapters}")
    # Rebuild the original outline context so expansion stays anchored to it.
    beats_context = []
    if 'plot_outline' in bp and isinstance(bp['plot_outline'], dict):
        po = bp['plot_outline']
        if 'beats' in po:
            for act in po['beats']:
                beats_context.append(f"ACT {act.get('act', '?')}: {act.get('title', '')} - {act.get('summary', '')}")
                for pp in act.get('plot_points', []):
                    beats_context.append(f" * {pp.get('point', 'Beat')}: {pp.get('description', '')}")
    if not beats_context:
        beats_context = bp.get('plot_beats', [])
    prompt = f"""
You are a Story Architect.
Goal: Flesh out this outline for a {target_chapters}-chapter book.
Current Status: {len(events)} beats.
ORIGINAL OUTLINE:
{json.dumps(beats_context)}
INSTRUCTIONS:
1. Look for jumps in time or logic.
2. Insert new intermediate events to smooth the pacing.
3. Deepen subplots while staying true to the ORIGINAL OUTLINE.
4. Do NOT remove or drastically alter the original outline points; expand AROUND them.
CURRENT EVENTS:
{json.dumps(events)}
Return JSON: {{'events': [ ...updated full list... ]}}
"""
    try:
        response = ai.model_logic.generate_content(prompt)
        utils.log_usage(folder, "logic-pro", response.usage_metadata)
        new_events = json.loads(utils.clean_json(response.text))['events']
        # Progress heuristics: report beat-count growth, else textual growth
        # (>20 chars), else no significant change.
        if len(new_events) > len(events):
            utils.log("ARCHITECT", f" -> Added {len(new_events) - len(events)} new beats.")
        elif len(str(new_events)) > len(str(events)) + 20:
            utils.log("ARCHITECT", f" -> Fleshed out descriptions (Text grew by {len(str(new_events)) - len(str(events))} chars).")
        else:
            utils.log("ARCHITECT", " -> No significant changes.")
        return new_events
    except Exception as e:
        # A failed pass is non-fatal: keep the current events and move on.
        utils.log("ARCHITECT", f" -> Pass skipped due to error: {e}")
        return events
def create_chapter_plan(events, bp, folder):
    """Group the expanded beat list into a concrete chapter plan.

    Asks the logic model to bucket `events` into chapters (title, pacing,
    POV, estimated word count), then rescales the per-chapter estimates so
    they sum to the blueprint's word target with a +/-10% random variance.

    Returns:
        A list of chapter dicts, or [] on any failure.
    """
    utils.log("ARCHITECT", "Finalizing Chapters...")
    target = bp['length_settings']['chapters']
    words = bp['length_settings'].get('words', 'Flexible')
    include_prologue = bp.get('length_settings', {}).get('include_prologue', False)
    include_epilogue = bp.get('length_settings', {}).get('include_epilogue', False)
    structure_instructions = ""
    if include_prologue: structure_instructions += "- Include a 'Prologue' (chapter_number: 0) to set the scene.\n"
    if include_epilogue: structure_instructions += "- Include an 'Epilogue' (chapter_number: 'Epilogue') to wrap up.\n"
    meta = bp.get('book_metadata', {})
    style = meta.get('style', {})
    pov_chars = style.get('pov_characters', [])
    pov_instruction = ""
    if pov_chars:
        pov_instruction = f"- Assign a 'pov_character' for each chapter from this list: {json.dumps(pov_chars)}."
    prompt = f"""
Group events into Chapters.
TARGET CHAPTERS: {target} (Approximate. Feel free to adjust +/- 20% for better pacing).
TARGET WORDS: {words} (Total for the book).
INSTRUCTIONS:
- Vary chapter pacing. Options: 'Very Fast', 'Fast', 'Standard', 'Slow', 'Very Slow'.
- Assign an estimated word count to each chapter based on its pacing and content.
{structure_instructions}
{pov_instruction}
EVENTS: {json.dumps(events)}
Return JSON: [{{'chapter_number':1, 'title':'...', 'pov_character': 'Name', 'pacing': 'Standard', 'estimated_words': 2000, 'beats':[...]}}]
"""
    try:
        response = ai.model_logic.generate_content(prompt)
        utils.log_usage(folder, "logic-pro", response.usage_metadata)
        plan = json.loads(utils.clean_json(response.text))
        # Normalize a human-friendly word target ("80k", "70,000-90,000",
        # "50000+") into a single integer; 0 means "no usable target".
        target_str = str(words).lower().replace(',', '').replace('k', '000').replace('+', '').replace(' ', '')
        target_val = 0
        if '-' in target_str:
            try:
                # Range form: use the midpoint.
                parts = target_str.split('-')
                target_val = int((int(parts[0]) + int(parts[1])) / 2)
            except: pass
        else:
            try: target_val = int(target_str)
            except: pass
        if target_val > 0:
            # NOTE(review): relies on a module-level `import random` — the top
            # of this file is outside this view; confirm it is imported.
            variance = random.uniform(0.90, 1.10)
            target_val = int(target_val * variance)
            utils.log("ARCHITECT", f"Target adjusted with variance ({variance:.2f}x): {target_val} words.")
            # Scale every chapter's estimate uniformly so totals hit target.
            current_sum = sum(int(c.get('estimated_words', 0)) for c in plan)
            if current_sum > 0:
                factor = target_val / current_sum
                utils.log("ARCHITECT", f"Adjusting chapter lengths by {factor:.2f}x to match target.")
                for c in plan:
                    c['estimated_words'] = int(c.get('estimated_words', 0) * factor)
        return plan
    except Exception as e:
        utils.log("ARCHITECT", f"Failed to create chapter plan: {e}")
        return []
def update_tracking(folder, chapter_num, chapter_text, current_tracking):
    """Update the running Story Bible (events, character visuals/preferences,
    content warnings) from a newly written chapter.

    Returns the model's updated tracking dict, or `current_tracking`
    unchanged when the call or JSON parsing fails.
    """
    utils.log("TRACKER", f"Updating world state & character visuals for Ch {chapter_num}...")
    prompt = f"""
Analyze this chapter text to update the Story Bible.
CURRENT TRACKING DATA:
{json.dumps(current_tracking)}
NEW CHAPTER TEXT:
{chapter_text[:500000]}
TASK:
1. EVENTS: Append 1-3 concise bullet points summarizing key plot events in this chapter to the 'events' list.
2. CHARACTERS: Update entries for any characters appearing in the scene.
- "descriptors": List of strings. Add PERMANENT physical traits (height, hair, eyes), specific items (jewelry, weapons). Avoid duplicates.
- "likes_dislikes": List of strings. Add specific preferences, likes, or dislikes mentioned (e.g., "Hates coffee", "Loves jazz").
- "last_worn": String. Update if specific clothing is described. IMPORTANT: If a significant time jump occurred (e.g. next day) and no new clothing is described, reset this to "Unknown".
- "major_events": List of strings. Log significant life-altering events occurring in THIS chapter (e.g. "Lost an arm", "Married", "Betrayed by X").
3. CONTENT_WARNINGS: List of strings. Identify specific triggers present in this chapter (e.g. "Graphic Violence", "Sexual Assault", "Torture", "Self-Harm"). Append to existing list.
RETURN JSON with the SAME structure as CURRENT TRACKING DATA (events list, characters dict, content_warnings list).
"""
    try:
        response = ai.model_logic.generate_content(prompt)
        utils.log_usage(folder, "logic-pro", response.usage_metadata)
        new_data = json.loads(utils.clean_json(response.text))
        return new_data
    except Exception as e:
        # Keep the last-known-good tracking data rather than losing state.
        utils.log("TRACKER", f"Failed to update tracking: {e}")
        return current_tracking
def evaluate_chapter_quality(text, chapter_title, model, folder):
    """Score a chapter draft with the given model.

    Args:
        text: Chapter draft (only the first 30k chars are sent).
        chapter_title: Title included in the prompt for context.
        model: A generative model exposing generate_content().
        folder: Run folder used for token-usage accounting.

    Returns:
        (score, critique) tuple; (0, "Evaluation error: ...") on failure —
        callers detect the failure by checking for that prefix string.
    """
    prompt = f"""
Analyze this book chapter text.
CHAPTER TITLE: {chapter_title}
CRITERIA:
1. ORGANIC FEEL: Does it sound like a human wrote it? Are "AI-isms" (e.g. 'testament to', 'tapestry', 'shiver down spine', 'unspoken agreement') absent?
2. ENGAGEMENT: Is it interesting? Does it hook the reader?
3. REPETITION: Is sentence structure varied? Are words repeated unnecessarily?
4. PROGRESSION: Does the story move forward, or is it spinning its wheels?
Rate on a scale of 1-10.
Provide a concise critique focusing on the biggest flaw.
Return JSON: {{'score': int, 'critique': 'string'}}
"""
    try:
        response = model.generate_content([prompt, text[:30000]])
        utils.log_usage(folder, "logic-pro", response.usage_metadata)
        data = json.loads(utils.clean_json(response.text))
        return data.get('score', 0), data.get('critique', 'No critique provided.')
    except Exception as e:
        return 0, f"Evaluation error: {str(e)}"
def create_initial_persona(bp, folder):
    """Ask the logic model to invent an author persona suited to the book's
    genre/tone/audience.

    Returns a dict with at least 'name' and 'bio' keys; falls back to a
    generic 'AI Author' profile when the call or parsing fails.
    """
    utils.log("SYSTEM", "Generating initial Author Persona based on genre/tone...")
    meta = bp.get('book_metadata', {})
    style = meta.get('style', {})
    prompt = f"""
Create a fictional 'Author Persona' best suited to write this book.
BOOK DETAILS:
Title: {meta.get('title')}
Genre: {meta.get('genre')}
Tone: {style.get('tone')}
Target Audience: {meta.get('target_audience')}
TASK:
Create a profile for the ideal writer of this book.
Return JSON: {{ "name": "Pen Name", "bio": "Description of writing style (voice, sentence structure, vocabulary)...", "age": "...", "gender": "..." }}
"""
    try:
        response = ai.model_logic.generate_content(prompt)
        utils.log_usage(folder, "logic-pro", response.usage_metadata)
        return json.loads(utils.clean_json(response.text))
    except Exception as e:
        utils.log("SYSTEM", f"Persona generation failed: {e}")
        return {"name": "AI Author", "bio": "Standard, balanced writing style."}
def refine_persona(bp, text, folder):
    """Refine the stored author bio so it matches the style of recently
    generated prose.

    Mutates bp's book_metadata.author_details dict in place when the model
    returns a new bio; always returns the (possibly updated) author_details
    dict, even on failure.
    """
    utils.log("SYSTEM", "Refining Author Persona based on recent chapters...")
    ad = bp.get('book_metadata', {}).get('author_details', {})
    current_bio = ad.get('bio', 'Standard style.')
    prompt = f"""
Analyze this text sample from the book.
TEXT:
{text[:3000]}
CURRENT AUTHOR BIO:
{current_bio}
TASK:
Refine the Author Bio to better match the actual text produced.
Highlight specific stylistic quirks, sentence patterns, or vocabulary choices found in the text.
The goal is to ensure future chapters sound exactly like this one.
Return JSON: {{ "bio": "Updated bio..." }}
"""
    try:
        response = ai.model_logic.generate_content(prompt)
        utils.log_usage(folder, "logic-pro", response.usage_metadata)
        new_bio = json.loads(utils.clean_json(response.text)).get('bio')
        if new_bio:
            # In-place update: callers holding bp see the new bio immediately.
            ad['bio'] = new_bio
            utils.log("SYSTEM", " -> Persona bio updated.")
        return ad
    except: pass
    return ad
def write_chapter(chap, bp, folder, prev_sum, tracking=None, prev_content=None):
    """Draft one chapter with the writer model, then score/refine it in a loop.

    Args:
        chap: Chapter plan dict ('chapter_number', 'title', 'beats', optional
            'pacing', 'estimated_words', 'pov_character').
        bp: Full blueprint (length settings, metadata, characters).
        folder: Run folder for token-usage accounting.
        prev_sum: Running "story so far" summary fed to the prompt.
        tracking: Optional Story Bible dict with per-character visuals.
        prev_content: Optional full text of the previous chapter for tone.

    Returns:
        The chapter text in Markdown. Returns a "## Chapter N Failed" stub
        if the initial draft call fails; otherwise returns the first draft
        scoring >= 8/10, or the best-scoring draft after 3 attempts.
    """
    pacing = chap.get('pacing', 'Standard')
    est_words = chap.get('estimated_words', 'Flexible')
    utils.log("WRITER", f"Drafting Ch {chap['chapter_number']} ({pacing} | ~{est_words} words): {chap['title']}")
    ls = bp['length_settings']
    meta = bp.get('book_metadata', {})
    style = meta.get('style', {})
    pov_char = chap.get('pov_character', '')
    ad = meta.get('author_details', {})
    # Persona block: legacy blueprints carry a plain 'author_bio' string;
    # newer ones have a structured 'author_details' dict.
    if not ad and 'author_bio' in meta:
        persona_info = meta['author_bio']
    else:
        persona_info = f"Name: {ad.get('name', meta.get('author', 'Unknown'))}\n"
        if ad.get('age'): persona_info += f"Age: {ad['age']}\n"
        if ad.get('gender'): persona_info += f"Gender: {ad['gender']}\n"
        if ad.get('race'): persona_info += f"Race: {ad['race']}\n"
        if ad.get('nationality'): persona_info += f"Nationality: {ad['nationality']}\n"
        if ad.get('language'): persona_info += f"Language: {ad['language']}\n"
        if ad.get('bio'): persona_info += f"Style/Bio: {ad['bio']}\n"
    # Style samples: inline sample text plus up to 3000 chars from each
    # referenced sample file in the personas directory.
    samples = []
    if ad.get('sample_text'):
        samples.append(f"--- SAMPLE PARAGRAPH ---\n{ad['sample_text']}")
    if ad.get('sample_files'):
        for fname in ad['sample_files']:
            fpath = os.path.join(config.PERSONAS_DIR, fname)
            if os.path.exists(fpath):
                try:
                    with open(fpath, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read(3000)
                    samples.append(f"--- SAMPLE FROM {fname} ---\n{content}...")
                except: pass
    if samples:
        persona_info += "\nWRITING STYLE SAMPLES:\n" + "\n".join(samples)
    # Continuity block: tracked descriptors, preferences, major events and
    # last-worn clothing for each character in the Story Bible.
    char_visuals = ""
    if tracking and 'characters' in tracking:
        char_visuals = "\nCHARACTER TRACKING (Visuals & Preferences):\n"
        for name, data in tracking['characters'].items():
            desc = ", ".join(data.get('descriptors', []))
            likes = ", ".join(data.get('likes_dislikes', []))
            worn = data.get('last_worn', 'Unknown')
            char_visuals += f"- {name}: {desc}\n * Likes/Dislikes: {likes}\n"
            major = data.get('major_events', [])
            if major: char_visuals += f" * Major Events: {'; '.join(major)}\n"
            if worn and worn != 'Unknown':
                char_visuals += f" * Last Worn: {worn} (NOTE: Only relevant if scene is continuous from previous chapter)\n"
    # Flatten scalar style settings (plus tropes / formatting rule lists)
    # into a bullet block for the prompt.
    style_block = "\n".join([f"- {k.replace('_', ' ').title()}: {v}" for k, v in style.items() if isinstance(v, (str, int, float))])
    if 'tropes' in style and isinstance(style['tropes'], list):
        style_block += f"\n- Tropes: {', '.join(style['tropes'])}"
    if 'formatting_rules' in style and isinstance(style['formatting_rules'], list):
        style_block += "\n- Formatting Rules:\n * " + "\n * ".join(style['formatting_rules'])
    prev_context_block = ""
    if prev_content:
        prev_context_block = f"\nPREVIOUS CHAPTER TEXT (For Tone & Continuity):\n{prev_content}\n"
    prompt = f"""
Write Chapter {chap['chapter_number']}: {chap['title']}
PACING GUIDE:
- Format: {ls.get('label', 'Story')}
- Chapter Pacing: {pacing}
- Target Word Count: ~{est_words} (Use this as a guide, but prioritize story flow. Allow flexibility.)
- POV Character: {pov_char if pov_char else 'Protagonist'}
STYLE & FORMATTING:
{style_block}
AUTHOR VOICE (CRITICAL):
{persona_info}
INSTRUCTION:
Write the scene.
- Start with the Chapter Header formatted as Markdown H1 (e.g. '# Chapter X: Title'). Follow the 'Formatting Rules' for the header style.
- DEEP POV: Immerse the reader in the POV character's immediate experience. Filter descriptions through their specific worldview and emotional state.
- SHOW, DON'T TELL: Focus on immediate action and internal reaction. Don't summarize feelings; show the physical manifestation of them.
- SENSORY DETAILS: Use specific, grounding sensory details (smell, touch, sound) rather than generic descriptions.
- AVOID CLICHÉS: Avoid common AI tropes (e.g., 'shiver down spine', 'palpable tension', 'unspoken agreement', 'testament to').
- MAINTAIN CONTINUITY: Pay close attention to the PREVIOUS CONTEXT. Characters must NOT know things that haven't happened yet or haven't been revealed to them.
- CHARACTER INTERACTIONS: If characters are meeting for the first time in the summary, treat them as strangers.
- SENTENCE VARIETY: Avoid repetitive sentence structures (e.g. starting multiple sentences with "He" or "She"). Vary sentence length to create rhythm.
- 'Very Fast': Rapid fire, pure action/dialogue, minimal description.
- 'Fast': Punchy, keep it moving.
- 'Standard': Balanced dialogue and description.
- 'Slow': Detailed, atmospheric, immersive.
- 'Very Slow': Deep introspection, heavy sensory detail, slow burn.
PREVIOUS CONTEXT (Story So Far): {prev_sum}
{prev_context_block}
CHARACTERS: {json.dumps(bp['characters'])}
{char_visuals}
SCENE BEATS: {json.dumps(chap['beats'])}
Output Markdown.
"""
    current_text = ""
    try:
        resp_draft = ai.model_writer.generate_content(prompt)
        utils.log_usage(folder, "writer-flash", resp_draft.usage_metadata)
        current_text = resp_draft.text
    except Exception as e:
        # Initial draft failed: return a failure stub instead of raising so
        # the rest of the book can continue.
        utils.log("WRITER", f"⚠️ Failed Ch {chap['chapter_number']}: {e}")
        return f"## Chapter {chap['chapter_number']} Failed\n\nError: {e}"
    # Refinement Loop
    max_attempts = 3
    best_score = 0
    best_text = current_text
    for attempt in range(1, max_attempts + 1):
        utils.log("WRITER", f" -> Evaluating Ch {chap['chapter_number']} (Attempt {attempt}/{max_attempts})...")
        score, critique = evaluate_chapter_quality(current_text, chap['title'], ai.model_logic, folder)
        # evaluate_chapter_quality signals its own failure via this prefix;
        # in that case keep what we have rather than looping blindly.
        if "Evaluation error" in critique:
            utils.log("WRITER", f" ⚠️ {critique}. Keeping current draft.")
            if best_score == 0: best_text = current_text
            break
        utils.log("WRITER", f" Score: {score}/10. Critique: {critique}")
        if score >= 8:
            utils.log("WRITER", " Quality threshold met.")
            return current_text
        # Track the best-scoring draft seen so far as the fallback result.
        if score > best_score:
            best_score = score
            best_text = current_text
        if attempt == max_attempts:
            utils.log("WRITER", " Max attempts reached. Using best version.")
            return best_text
        utils.log("WRITER", f" -> Refining Ch {chap['chapter_number']} based on feedback...")
        refine_prompt = f"""
Act as a Senior Editor. Rewrite this chapter to fix the issues identified below.
CRITIQUE TO ADDRESS:
{critique}
ADDITIONAL OBJECTIVES:
1. NATURAL FLOW: Fix stilted phrasing. Ensure the prose flows naturally for the genre ({meta.get('genre', 'Fiction')}) and tone ({style.get('tone', 'Standard')}).
2. HUMANIZATION: Remove robotic phrasing. Ensure dialogue has subtext, interruptions, and distinct voices. Remove "AI-isms" (e.g. 'testament to', 'tapestry of', 'symphony of').
3. SENTENCE VARIETY: Check for and fix repetitive sentence starts or uniform sentence lengths. The prose should have a dynamic rhythm.
4. CONTINUITY: Ensure consistency with the Story So Far.
STORY SO FAR:
{prev_sum}
{prev_context_block}
CURRENT DRAFT:
{current_text}
Return the polished, final version of the chapter in Markdown.
"""
        try:
            resp_refine = ai.model_writer.generate_content(refine_prompt)
            utils.log_usage(folder, "writer-flash", resp_refine.usage_metadata)
            current_text = resp_refine.text
        except Exception as e:
            utils.log("WRITER", f"Refinement failed: {e}")
            return best_text
    return best_text
def harvest_metadata(bp, folder, full_manuscript):
    """Scan the assembled manuscript for significant characters missing from
    the blueprint and append any the model finds to bp['characters'].

    Mutates and returns `bp`; a failed call leaves it untouched.
    """
    utils.log("HARVESTER", "Scanning for new characters...")
    combined = "\n".join(chapter['content'] for chapter in full_manuscript)
    full_text = combined[:50000]
    known_characters = json.dumps(bp['characters'])
    prompt = (
        f"Identify new significant characters NOT in:\n{known_characters}\nTEXT:\n{full_text}\n"
        "Return JSON: {'new_characters': [{'name':'...', 'role':'...', 'description':'...'}]}"
    )
    try:
        response = ai.model_logic.generate_content(prompt)
        utils.log_usage(folder, "logic-pro", response.usage_metadata)
        parsed = json.loads(utils.clean_json(response.text))
        found = parsed.get('new_characters', [])
        if found:
            utils.log("HARVESTER", f"Found {len(found)} new chars.")
            bp['characters'].extend(found)
    except:
        pass
    return bp
def update_persona_sample(bp, folder):
    """Extract a prose sample from a finished manuscript and fold it into the
    shared personas file.

    Steps: read <folder>/manuscript.json, save the first 3000 chars as a new
    sample file in the personas directory, then create (with a model-written
    one-line bio) or update the persona entry keyed by the book's author name.
    Returns None; silently does nothing if there is no usable manuscript.
    """
    utils.log("SYSTEM", "Extracting author persona from manuscript...")
    ms_path = os.path.join(folder, "manuscript.json")
    if not os.path.exists(ms_path): return
    ms = utils.load_json(ms_path)
    if not ms: return
    # 1. Extract Text Sample
    full_text = "\n".join([c.get('content', '') for c in ms])
    # Too little text to characterize a style — skip.
    if len(full_text) < 500: return
    # 2. Save Sample File
    if not os.path.exists(config.PERSONAS_DIR): os.makedirs(config.PERSONAS_DIR)
    meta = bp.get('book_metadata', {})
    # NOTE(review): the filter already drops spaces (not alnum/underscore),
    # so the .replace(" ", "_") afterwards is a no-op — harmless but dead.
    safe_title = "".join([c for c in meta.get('title', 'book') if c.isalnum() or c=='_']).replace(" ", "_")[:20]
    timestamp = int(time.time())
    filename = f"sample_{safe_title}_{timestamp}.txt"
    filepath = os.path.join(config.PERSONAS_DIR, filename)
    sample_text = full_text[:3000]
    with open(filepath, 'w', encoding='utf-8') as f: f.write(sample_text)
    # 3. Update or Create Persona
    author_name = meta.get('author', 'Unknown Author')
    personas = {}
    if os.path.exists(config.PERSONAS_FILE):
        try:
            with open(config.PERSONAS_FILE, 'r') as f: personas = json.load(f)
        except: pass
    if author_name not in personas:
        utils.log("SYSTEM", f"Generating new persona profile for '{author_name}'...")
        prompt = f"Analyze this writing style (Tone, Voice, Vocabulary). Write a 1-sentence author bio describing it.\nTEXT: {sample_text[:1000]}"
        try:
            response = ai.model_logic.generate_content(prompt)
            utils.log_usage(folder, "logic-pro", response.usage_metadata)
            bio = response.text.strip()
        except: bio = "Style analysis unavailable."
        personas[author_name] = {
            "name": author_name,
            "bio": bio,
            "sample_files": [filename],
            "sample_text": sample_text[:500]
        }
    else:
        # Existing persona: just register the new sample file (no duplicates).
        utils.log("SYSTEM", f"Updating persona '{author_name}' with new sample.")
        if 'sample_files' not in personas[author_name]: personas[author_name]['sample_files'] = []
        if filename not in personas[author_name]['sample_files']:
            personas[author_name]['sample_files'].append(filename)
    with open(config.PERSONAS_FILE, 'w') as f: json.dump(personas, f, indent=2)
def refine_bible(bible, instruction, folder):
    """Apply a free-form user instruction to the bible JSON via the logic
    model. Returns the updated dict, or None when the call or parse fails."""
    utils.log("SYSTEM", f"Refining Bible with instruction: {instruction}")
    prompt = f"""
Act as a Book Editor.
CURRENT JSON: {json.dumps(bible)}
USER INSTRUCTION: {instruction}
TASK: Update the JSON based on the instruction. Maintain valid JSON structure.
RETURN ONLY THE JSON.
"""
    try:
        response = ai.model_logic.generate_content(prompt)
        utils.log_usage(folder, "logic-pro", response.usage_metadata)
        return json.loads(utils.clean_json(response.text))
    except Exception as e:
        utils.log("SYSTEM", f"Refinement failed: {e}")
        return None

202
modules/utils.py Normal file
View File

@@ -0,0 +1,202 @@
import os
import json
import datetime
import time
import config
import threading
# Permissive Gemini safety settings: all four harm-category filters are set
# to BLOCK_NONE so fiction content is not cut off mid-generation.
SAFETY_SETTINGS = [
    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
]
# Thread-local storage for logging context
# Each worker thread gets its own log-file path and callback, so concurrent
# generation runs do not interleave their log sinks.
_log_context = threading.local()
def set_log_file(filepath):
    """Route this thread's log() output to *filepath* (appended, one line per entry)."""
    _log_context.log_file = filepath
def set_log_callback(callback):
    """Register a per-thread callback(phase, msg) invoked on every log() call
    (e.g. to mirror console output into the database)."""
    _log_context.callback = callback
def clean_json(text):
    """Strip Markdown code fences and surrounding prose from a model response
    and return the outermost JSON object or array as a string.

    Picks whichever of '{' or '[' appears first and slices up to the matching
    last closer. If the payload is truncated (opener present but no closer),
    returns everything from the opener onward instead of an empty string
    (the original returned '' in that case, losing the payload). Text with no
    JSON delimiters at all is returned unchanged.
    """
    text = text.replace("```json", "").replace("```", "").strip()
    # Robust extraction: find first { or [ and last } or ]
    start_obj = text.find('{')
    start_arr = text.find('[')
    if start_obj == -1 and start_arr == -1:
        return text
    if start_obj != -1 and (start_arr == -1 or start_obj < start_arr):
        start, closer = start_obj, '}'
    else:
        start, closer = start_arr, ']'
    end = text.rfind(closer)
    if end == -1 or end < start:
        # Truncated/malformed payload: keep the remainder rather than
        # returning an empty slice.
        return text[start:]
    return text[start:end + 1]
# --- SHARED UTILS ---
def log(phase, msg):
    """Print a timestamped log line and mirror it to this thread's log sinks
    (file and/or callback), when configured."""
    stamp = datetime.datetime.now().strftime('%H:%M:%S')
    line = f"[{stamp}] {phase:<15} | {msg}"
    print(line)
    # Write to thread-specific log file if set
    log_file = getattr(_log_context, 'log_file', None)
    if log_file:
        with open(log_file, "a", encoding="utf-8") as sink:
            sink.write(line + "\n")
    # Trigger callback if set (e.g. for Database logging)
    callback = getattr(_log_context, 'callback', None)
    if callback:
        try:
            callback(phase, msg)
        except:
            pass
def load_json(path):
    """Read a JSON file and return its parsed contents, or None when the file
    does not exist.

    Rewritten with a context manager — the original `json.load(open(path))`
    leaked the file handle until garbage collection.
    """
    if not os.path.exists(path):
        return None
    with open(path, 'r') as f:
        return json.load(f)
def create_default_personas():
    """Make sure the personas directory exists and seed an empty personas
    file on first run; existing files are left alone."""
    if not os.path.exists(config.PERSONAS_DIR):
        os.makedirs(config.PERSONAS_DIR)
    if os.path.exists(config.PERSONAS_FILE):
        return
    try:
        with open(config.PERSONAS_FILE, 'w') as fh:
            json.dump({}, fh, indent=2)
    except:
        pass
def get_length_presets():
    """Returns a dict mapping Label -> Settings for use in main.py"""
    return {spec['label']: spec for spec in config.LENGTH_DEFINITIONS.values()}
def log_image_attempt(folder, img_type, prompt, filename, status, error=None, score=None, critique=None):
    """Append one image-generation attempt to <folder>/image_log.json.

    A corrupt or missing log file is treated as empty; the error object (if
    any) is stored as its string form.
    """
    log_path = os.path.join(folder, "image_log.json")
    history = []
    if os.path.exists(log_path):
        try:
            with open(log_path, 'r') as fh:
                history = json.load(fh)
        except:
            pass
    history.append({
        "timestamp": int(time.time()),
        "type": img_type,
        "prompt": prompt,
        "filename": filename,
        "status": status,
        "error": str(error) if error else None,
        "score": score,
        "critique": critique,
    })
    with open(log_path, 'w') as fh:
        json.dump(history, fh, indent=2)
def get_run_folder(base_name):
    """Create and return the next sequential run_<N> directory under
    *base_name*, creating the base directory itself if needed."""
    if not os.path.exists(base_name):
        os.makedirs(base_name)
    numbers = [0]
    for entry in os.listdir(base_name):
        if entry.startswith("run_"):
            suffix = entry.split("_")[1]
            if suffix.isdigit():
                numbers.append(int(suffix))
    folder = os.path.join(base_name, f"run_{max(numbers) + 1}")
    os.makedirs(folder)
    return folder
def get_latest_run_folder(base_name):
    """Return the path of the highest-numbered run_<N> directory under
    *base_name*, or None when the base or any run folder is missing."""
    if not os.path.exists(base_name):
        return None
    candidates = [entry for entry in os.listdir(base_name) if entry.startswith("run_")]
    if not candidates:
        return None

    def run_number(name):
        tail = name.split('_')[1]
        return int(tail) if tail.isdigit() else 0

    candidates.sort(key=run_number)
    return os.path.join(base_name, candidates[-1])
def log_usage(folder, model_label, usage_metadata=None, image_count=0):
    """Append a token/image usage entry to <folder>/usage_log.json and
    recompute the running totals and estimated cost.

    Args:
        folder: Run/book folder owning the log; silently skipped if missing.
        model_label: Label matched against 'flash', 'pro'/'logic', 'imagen'
            substrings to pick a price tier.
        usage_metadata: Optional Gemini response.usage_metadata exposing
            prompt_token_count / candidates_token_count.
        image_count: Number of images produced by this call.

    Fix vs original: the existing log was read via `json.load(open(...))`,
    leaking the file handle; now uses a context manager. Behavior otherwise
    unchanged.
    """
    if not folder or not os.path.exists(folder): return
    log_path = os.path.join(folder, "usage_log.json")
    entry = {
        "timestamp": int(time.time()),
        "model": model_label,
        "input_tokens": 0,
        "output_tokens": 0,
        "images": image_count
    }
    if usage_metadata:
        try:
            entry["input_tokens"] = usage_metadata.prompt_token_count
            entry["output_tokens"] = usage_metadata.candidates_token_count
        except: pass
    data = {"log": [], "totals": {"input_tokens": 0, "output_tokens": 0, "images": 0, "est_cost_usd": 0.0}}
    if os.path.exists(log_path):
        try:
            with open(log_path, 'r') as f:
                loaded = json.load(f)
            # Legacy format was a flat list of entries; wrap it.
            if isinstance(loaded, list): data["log"] = loaded
            else: data = loaded
        except: pass
    data["log"].append(entry)
    # Recalculate totals from scratch each call so edits/merges stay consistent.
    t_in = sum(x.get('input_tokens', 0) for x in data["log"])
    t_out = sum(x.get('output_tokens', 0) for x in data["log"])
    t_img = sum(x.get('images', 0) for x in data["log"])
    cost = 0.0
    for x in data["log"]:
        m = x.get('model', '').lower()
        i = x.get('input_tokens', 0)
        o = x.get('output_tokens', 0)
        imgs = x.get('images', 0)
        # Per-million-token prices by model family.
        if 'flash' in m:
            cost += (i / 1_000_000 * 0.075) + (o / 1_000_000 * 0.30)
        elif 'pro' in m or 'logic' in m:
            cost += (i / 1_000_000 * 3.50) + (o / 1_000_000 * 10.50)
        elif 'imagen' in m or imgs > 0:
            # NOTE(review): images attached to a 'flash'/'pro' entry are never
            # billed because the elif chain stops at the text match — confirm
            # this is the intended pricing behavior.
            cost += (imgs * 0.04)
    data["totals"] = {
        "input_tokens": t_in,
        "output_tokens": t_out,
        "images": t_img,
        "est_cost_usd": round(cost, 4)
    }
    with open(log_path, 'w') as f: json.dump(data, f, indent=2)
def normalize_settings(bp):
    """
    CRITICAL: Enforces defaults.
    1. If series_metadata is missing, force it to SINGLE mode.
    2. If length_settings is missing, force explicit numbers.
    """
    # Force Series Default (1 Book)
    bp.setdefault('series_metadata', {
        "is_series": False,
        "mode": "single",
        "series_title": "Standalone",
        "total_books_to_generate": 1
    })
    # Check for empty series count just in case
    if bp['series_metadata'].get('total_books_to_generate') is None:
        bp['series_metadata']['total_books_to_generate'] = 1
    # Force Length Defaults
    settings = bp.get('length_settings', {})
    presets = get_length_presets()
    # Fall back to the Novella preset when the label is missing or unknown.
    defaults = presets.get(settings.get('label', 'Novella'), presets['Novella'])
    settings.setdefault('chapters', defaults['chapters'])
    settings.setdefault('words', defaults['words'])
    # Smart Depth Calculation (if not manually set)
    if 'depth' not in settings:
        count = int(settings['chapters'])
        for limit, depth in ((5, 1), (20, 2), (40, 3)):
            if count <= limit:
                settings['depth'] = depth
                break
        else:
            settings['depth'] = 4
    bp['length_settings'] = settings
    return bp

1132
modules/web_app.py Normal file

File diff suppressed because it is too large Load Diff

47
modules/web_db.py Normal file
View File

@@ -0,0 +1,47 @@
from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from datetime import datetime
db = SQLAlchemy()
class User(UserMixin, db.Model):
    """Application account. UserMixin supplies the Flask-Login session hooks
    (is_authenticated, get_id, ...)."""
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    # NOTE(review): stored as set by the app layer — confirm it is a hash,
    # not plaintext, before this ships.
    password = db.Column(db.String(150), nullable=False)
    api_key = db.Column(db.String(200), nullable=True) # Optional: User-specific Gemini Key
    # Running spend total attributed to this user — presumably accumulated
    # from run costs by the app layer; verify against callers.
    total_spend = db.Column(db.Float, default=0.0)
    is_admin = db.Column(db.Boolean, default=False)
class Project(db.Model):
    """A user's book project; owns its generation runs and maps to a folder on disk."""
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    name = db.Column(db.String(150), nullable=False)
    # On-disk location of the project's bible/runs directory tree.
    folder_path = db.Column(db.String(300), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Relationships
    # Deleting a project cascades to its runs (and, via Run, their log entries).
    runs = db.relationship('Run', backref='project', lazy=True, cascade="all, delete-orphan")
class Run(db.Model):
    """One background generation job belonging to a project."""
    id = db.Column(db.Integer, primary_key=True)
    project_id = db.Column(db.Integer, db.ForeignKey('project.id'), nullable=False)
    status = db.Column(db.String(50), default="queued") # queued, running, completed, failed
    start_time = db.Column(db.DateTime, default=datetime.utcnow)
    end_time = db.Column(db.DateTime, nullable=True)  # NULL while the run is still in progress
    log_file = db.Column(db.String(300), nullable=True)  # path to the run's console log on disk
    cost = db.Column(db.Float, default=0.0)  # estimated cost recorded when the run finishes
    # Relationships
    logs = db.relationship('LogEntry', backref='run', lazy=True, cascade="all, delete-orphan")

    def duration(self):
        """Elapsed wall-clock time as 'H:MM:SS' (sub-second part stripped),
        or 'Running...' while end_time is unset."""
        if self.end_time and self.start_time:
            return str(self.end_time - self.start_time).split('.')[0]
        return "Running..."
class LogEntry(db.Model):
    """Single timestamped log line emitted by a run (console output mirrored
    into the database by the worker's log callback)."""
    id = db.Column(db.Integer, primary_key=True)
    run_id = db.Column(db.Integer, db.ForeignKey('run.id'), nullable=False)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    phase = db.Column(db.String(50))  # e.g. 'SYSTEM', 'WRITER', 'ARCHITECT'
    message = db.Column(db.Text)

218
modules/web_tasks.py Normal file
View File

@@ -0,0 +1,218 @@
import os
import json
import time
import sqlite3
import shutil
from datetime import datetime
from huey import SqliteHuey
from .web_db import db, Run, User, Project
from . import utils
import main
import config
# Configure Huey (Task Queue)
# SQLite-backed queue; the queue file lives alongside the app's other data.
huey = SqliteHuey('bookapp_queue', filename=os.path.join(config.DATA_DIR, 'queue.db'))
def db_log_callback(db_path, run_id, phase, msg):
    """Writes log entry directly to SQLite to avoid Flask Context issues in threads.

    Retries up to 5 times on lock contention (SQLite allows a single writer);
    any other database error is swallowed so logging can never crash the
    worker. The bare `except:` of the original is narrowed to `Exception` so
    SystemExit/KeyboardInterrupt still propagate.
    """
    for _ in range(5):
        try:
            with sqlite3.connect(db_path, timeout=5) as conn:
                conn.execute(
                    "INSERT INTO log_entry (run_id, timestamp, phase, message) VALUES (?, ?, ?, ?)",
                    (run_id, datetime.utcnow(), phase, str(msg)))
            break
        except sqlite3.OperationalError:
            # Database locked by another writer — back off briefly and retry.
            time.sleep(0.1)
        except Exception:
            # Best-effort logging: give up silently on any other failure.
            break
@huey.task()
def generate_book_task(run_id, project_path, bible_path, allow_copy=True):
    """
    Background task to run the book generation.

    Wires this worker thread's logging to a file and to the runs database,
    optionally copies completed books forward from the previous run (series
    optimization), invokes main.run_generation, then records final status,
    aggregated cost and log location on the Run row.

    Args:
        run_id: Database id of the Run row; also names the run_<id> folder.
        project_path: Root folder of the project on disk.
        bible_path: Path to the bible JSON handed to main.run_generation.
        allow_copy: When True, completed Book_* folders from the latest
            previous run are copied into this run and skipped.
    """
    # 1. Setup Logging
    log_filename = f"system_log_{run_id}.txt"
    # NOTE(review): log_path is assigned but never used below (initial_log
    # is what gets registered and later moved) — candidate for removal.
    log_path = os.path.join(project_path, "runs", "bible", f"run_{run_id}", log_filename)
    # Log to project root initially until run folder is created by main
    initial_log = os.path.join(project_path, log_filename)
    utils.set_log_file(initial_log)
    # Hook up Database Logging
    db_path = os.path.join(config.DATA_DIR, "bookapp.db")
    utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))
    # Set Status to Running
    try:
        with sqlite3.connect(db_path, timeout=10) as conn:
            conn.execute("UPDATE run SET status = 'running' WHERE id = ?", (run_id,))
    except: pass
    utils.log("SYSTEM", f"Starting Job #{run_id}")
    try:
        # 1.5 Copy Forward Logic (Series Optimization)
        # Check for previous runs and copy completed books to skip re-generation
        runs_dir = os.path.join(project_path, "runs", "bible")
        if allow_copy and os.path.exists(runs_dir):
            # Get all run folders except current
            all_runs = [d for d in os.listdir(runs_dir) if d.startswith("run_") and d != f"run_{run_id}"]
            # Sort by ID (ascending)
            all_runs.sort(key=lambda x: int(x.split('_')[1]) if x.split('_')[1].isdigit() else 0)
            if all_runs:
                latest_run_dir = os.path.join(runs_dir, all_runs[-1])
                current_run_dir = os.path.join(runs_dir, f"run_{run_id}")
                if not os.path.exists(current_run_dir): os.makedirs(current_run_dir)
                utils.log("SYSTEM", f"Checking previous run ({all_runs[-1]}) for completed books...")
                for item in os.listdir(latest_run_dir):
                    # Copy only folders that look like books and have a manuscript
                    if item.startswith("Book_") and os.path.isdir(os.path.join(latest_run_dir, item)):
                        if os.path.exists(os.path.join(latest_run_dir, item, "manuscript.json")):
                            src = os.path.join(latest_run_dir, item)
                            dst = os.path.join(current_run_dir, item)
                            try:
                                shutil.copytree(src, dst)
                                utils.log("SYSTEM", f" -> Copied {item} (Skipping generation).")
                            except Exception as e:
                                utils.log("SYSTEM", f" -> Failed to copy {item}: {e}")
        # 2. Run Generation
        # We call the existing entry point
        main.run_generation(bible_path, specific_run_id=run_id)
        utils.log("SYSTEM", "Job Complete.")
        status = "completed"
    except Exception as e:
        utils.log("ERROR", f"Job Failed: {e}")
        status = "failed"
    # 3. Calculate Cost & Cleanup
    # Use the specific run folder we know main.py used
    run_dir = os.path.join(project_path, "runs", "bible", f"run_{run_id}")
    total_cost = 0.0
    final_log_path = initial_log
    if os.path.exists(run_dir):
        # Move our log file there
        final_log_path = os.path.join(run_dir, "web_console.log")
        if os.path.exists(initial_log):
            try:
                os.rename(initial_log, final_log_path)
            except OSError:
                # If rename fails (e.g. across filesystems), copy and delete
                shutil.copy2(initial_log, final_log_path)
                os.remove(initial_log)
        # Calculate Total Cost from all Book subfolders
        # usage_log.json is inside each Book folder
        for item in os.listdir(run_dir):
            item_path = os.path.join(run_dir, item)
            if os.path.isdir(item_path) and item.startswith("Book_"):
                usage_path = os.path.join(item_path, "usage_log.json")
                if os.path.exists(usage_path):
                    data = utils.load_json(usage_path)
                    total_cost += data.get('totals', {}).get('est_cost_usd', 0.0)
    # 4. Update Database with Final Status
    try:
        with sqlite3.connect(db_path, timeout=10) as conn:
            conn.execute("UPDATE run SET status = ?, cost = ?, end_time = ?, log_file = ? WHERE id = ?",
                         (status, total_cost, datetime.utcnow(), final_log_path, run_id))
    except Exception as e:
        print(f"Failed to update run status in DB: {e}")
    return {"run_id": run_id, "status": status, "cost": total_cost, "final_log": final_log_path}
def _set_run_status(db_path, run_id, status):
    # Best-effort status write. DB problems are logged but never fatal to the
    # task itself (the artifacts on disk are the source of truth).
    try:
        with sqlite3.connect(db_path, timeout=10) as conn:
            conn.execute("UPDATE run SET status = ? WHERE id = ?", (status, run_id))
    except sqlite3.Error as e:
        print(f"Failed to update run status in DB: {e}")


@huey.task()
def regenerate_artifacts_task(run_id, project_path, feedback=None):
    """Regenerate the cover and export files for an existing run.

    Re-reads the project's ``bible.json``, syncs its metadata into the run's
    ``final_blueprint.json``, then re-runs cover generation and file export
    against the already-written manuscript.

    Args:
        run_id: DB id of the run; also embedded in the run folder name.
        project_path: Absolute path to the project directory.
        feedback: Optional free-text feedback forwarded to cover generation.
    """
    # Hook up Database Logging & Status
    db_path = os.path.join(config.DATA_DIR, "bookapp.db")

    # Truncate log file to ensure clean slate for this regeneration pass.
    log_filename = f"system_log_{run_id}.txt"
    initial_log = os.path.join(project_path, log_filename)
    with open(initial_log, 'w', encoding='utf-8') as f:
        f.write("")
    utils.set_log_file(initial_log)
    utils.set_log_callback(lambda p, m: db_log_callback(db_path, run_id, p, m))

    _set_run_status(db_path, run_id, 'running')
    utils.log("SYSTEM", "Starting Artifact Regeneration...")

    # 1. Setup Paths. Runs may nest their output in "Book_*" subfolders; if
    # so, operate on the first (lexicographically lowest) book folder.
    run_dir = os.path.join(project_path, "runs", "bible", f"run_{run_id}")
    book_dir = run_dir
    if os.path.exists(run_dir):
        subdirs = sorted([d for d in os.listdir(run_dir)
                          if os.path.isdir(os.path.join(run_dir, d)) and d.startswith("Book_")])
        if subdirs:
            book_dir = os.path.join(run_dir, subdirs[0])

    bible_path = os.path.join(project_path, "bible.json")
    if not os.path.exists(run_dir) or not os.path.exists(bible_path):
        utils.log("ERROR", "Run directory or Bible not found.")
        # Fix: previously returned while the DB row still said 'running',
        # leaving the run stuck in that state forever.
        _set_run_status(db_path, run_id, 'failed')
        return

    # 2. Load Data
    bible = utils.load_json(bible_path)
    final_bp_path = os.path.join(book_dir, "final_blueprint.json")
    ms_path = os.path.join(book_dir, "manuscript.json")
    if not os.path.exists(final_bp_path) or not os.path.exists(ms_path):
        utils.log("ERROR", f"Blueprint or Manuscript not found in {book_dir}")
        _set_run_status(db_path, run_id, 'failed')
        return
    bp = utils.load_json(final_bp_path)
    ms = utils.load_json(ms_path)

    # 3. Update Blueprint with new Metadata from Bible
    meta = bible.get('project_metadata', {})
    if 'book_metadata' in bp:
        # Sync all core metadata the user may have edited in the Bible.
        for k in ['author', 'genre', 'target_audience', 'style']:
            if k in meta:
                bp['book_metadata'][k] = meta[k]
        if bp.get('series_metadata', {}).get('is_series'):
            bp['series_metadata']['series_title'] = meta.get('title', bp['series_metadata'].get('series_title'))
            # For a series, the per-book title lives in the Bible's books list.
            b_num = bp['series_metadata'].get('book_number')
            for b in bible.get('books', []):
                if b.get('book_number') == b_num:
                    bp['book_metadata']['title'] = b.get('title', bp['book_metadata'].get('title'))
                    break
        else:
            bp['book_metadata']['title'] = meta.get('title', bp['book_metadata'].get('title'))
    with open(final_bp_path, 'w', encoding='utf-8') as f:
        json.dump(bp, f, indent=2)

    # 4. Regenerate cover and export files
    try:
        main.ai.init_models()
        tracking = None
        events_path = os.path.join(book_dir, "tracking_events.json")
        if os.path.exists(events_path):
            tracking = {"events": utils.load_json(events_path),
                        "characters": utils.load_json(os.path.join(book_dir, "tracking_characters.json"))}
        main.marketing.generate_cover(bp, book_dir, tracking, feedback=feedback)
        main.export.compile_files(bp, ms, book_dir)
        utils.log("SYSTEM", "Regeneration Complete.")
        final_status = 'completed'
    except Exception as e:
        utils.log("ERROR", f"Regeneration Failed: {e}")
        final_status = 'failed'
    _set_run_status(db_path, run_id, final_status)