Add Ollama embedding support and improve prayer system with public/private visibility

- Add Ollama as an embedding fallback in vector search, with Azure OpenAI as the primary provider (see the first sketch below)
- Enhance the prayer system with public/private visibility options and language filtering (see the second sketch below)
- Update OG image to use new biblical-guide-og-image.png
- Improve prayer request management with better categorization
- Remove deprecated ingest_json_pgvector.py script
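A minimal sketch of the Azure-primary / Ollama-fallback embedding call named in the first bullet. The vector-search change itself is not part of the diff shown below, so the function name, environment variables, and the Ollama endpoint and model are assumptions for illustration only, not code from this commit:

import os
import httpx

AZ_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "").rstrip("/")
AZ_API_KEY = os.getenv("AZURE_OPENAI_KEY")
AZ_API_VER = os.getenv("AZURE_OPENAI_API_VERSION", "2024-05-01-preview")
AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBED_DEPLOYMENT", "embed-3")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")      # assumed default
OLLAMA_MODEL = os.getenv("OLLAMA_EMBED_MODEL", "nomic-embed-text")  # assumed model

async def embed_query(client: httpx.AsyncClient, text: str) -> list[float]:
    """Try Azure OpenAI first; fall back to a local Ollama server on any failure."""
    try:
        url = (f"{AZ_ENDPOINT}/openai/deployments/{AZ_DEPLOYMENT}"
               f"/embeddings?api-version={AZ_API_VER}")
        r = await client.post(url, headers={"api-key": AZ_API_KEY},
                              json={"input": [text]}, timeout=30)
        r.raise_for_status()
        return r.json()["data"][0]["embedding"]
    except Exception:
        # Ollama's embeddings endpoint returns {"embedding": [...]}
        r = await client.post(f"{OLLAMA_URL}/api/embeddings",
                              json={"model": OLLAMA_MODEL, "prompt": text},
                              timeout=30)
        r.raise_for_status()
        return r.json()["embedding"]

A real fallback also has to keep the two providers dimension-compatible (EMBED_DIMS defaults to 1536 in the ingestion below) or store their embeddings separately.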
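Similarly, a hypothetical illustration of the public/private visibility and language filtering described in the second bullet; the prayer tables are not in the diff below, so every table and column name here is assumed:

import psycopg

def list_visible_prayers(db_url: str, viewer_id: str, language: str | None = None):
    """Return prayer requests the viewer may see, optionally filtered by language."""
    sql = """
        SELECT id, title, category, created_at
        FROM prayer_requests
        WHERE (is_public = TRUE OR author_id = %(viewer)s)    -- private requests visible only to their author
          AND (%(lang)s::text IS NULL OR language = %(lang)s) -- optional language filter
        ORDER BY created_at DESC;
    """
    with psycopg.connect(db_url) as conn:
        with conn.cursor() as cur:
            cur.execute(sql, {"viewer": viewer_id, "lang": language})
            return cur.fetchall()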

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-28 19:25:49 +00:00
parent 2d27eae756
commit e4b815cb40
8 changed files with 457 additions and 320 deletions


@@ -1,4 +1,4 @@
import os, re, json, math, time, asyncio
import os, re, json, math, time, asyncio, glob
from typing import List, Dict, Tuple, Iterable
from dataclasses import dataclass
from pathlib import Path
@@ -13,30 +13,28 @@ AZ_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "").rstrip("/")
AZ_API_KEY = os.getenv("AZURE_OPENAI_KEY")
AZ_API_VER = os.getenv("AZURE_OPENAI_API_VERSION", "2024-05-01-preview")
AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBED_DEPLOYMENT", "embed-3")
EMBED_DIMS = int(os.getenv("EMBED_DIMS", "3072"))
EMBED_DIMS = int(os.getenv("EMBED_DIMS", "1536"))
DB_URL = os.getenv("DATABASE_URL")
BIBLE_MD_PATH = os.getenv("BIBLE_MD_PATH")
LANG_CODE = os.getenv("LANG_CODE", "ro")
TRANSLATION = os.getenv("TRANSLATION_CODE", "FIDELA")
BIBLE_JSON_DIR = os.getenv("BIBLE_JSON_DIR", "/root/biblical-guide/bibles/json")
VECTOR_SCHEMA = os.getenv("VECTOR_SCHEMA", "ai_bible")
MIN_FILE_SIZE = int(os.getenv("MIN_FILE_SIZE", "512000")) # 500KB in bytes
assert AZ_ENDPOINT and AZ_API_KEY and DB_URL and BIBLE_MD_PATH, "Missing required env vars"
assert AZ_ENDPOINT and AZ_API_KEY and DB_URL and BIBLE_JSON_DIR, "Missing required env vars"
EMBED_URL = f"{AZ_ENDPOINT}/openai/deployments/{AZ_DEPLOYMENT}/embeddings?api-version={AZ_API_VER}"
BOOKS_OT = [
"Geneza","Exodul","Leviticul","Numeri","Deuteronom","Iosua","Judecători","Rut",
"1 Samuel","2 Samuel","1 Imparati","2 Imparati","1 Cronici","2 Cronici","Ezra","Neemia","Estera",
"Iov","Psalmii","Proverbe","Eclesiastul","Cântarea Cântărilor","Isaia","Ieremia","Plângerile",
"Ezechiel","Daniel","Osea","Ioel","Amos","Obadia","Iona","Mica","Naum","Habacuc","Țefania","Hagai","Zaharia","Maleahi"
]
BOOKS_NT = [
"Matei","Marcu","Luca","Ioan","Faptele Apostolilor","Romani","1 Corinteni","2 Corinteni",
"Galateni","Efeseni","Filipeni","Coloseni","1 Tesaloniceni","2 Tesaloniceni","1 Timotei","2 Timotei",
"Titus","Filimon","Evrei","Iacov","1 Petru","2 Petru","1 Ioan","2 Ioan","3 Ioan","Iuda","Revelaţia"
]
def get_large_bible_files():
"""Get all bible JSON files larger than MIN_FILE_SIZE"""
bible_files = []
pattern = os.path.join(BIBLE_JSON_DIR, "*_bible.json")
BOOK_CANON = {b:("OT" if b in BOOKS_OT else "NT") for b in BOOKS_OT + BOOKS_NT}
for filepath in glob.glob(pattern):
file_size = os.path.getsize(filepath)
if file_size >= MIN_FILE_SIZE:
bible_files.append(filepath)
bible_files.sort()
return bible_files
@dataclass
class Verse:
@@ -52,58 +50,51 @@ def normalize_text(s: str) -> str:
s = s.replace(" ", " ")
return s
BOOK_RE = re.compile(r"^(?P<book>[A-ZĂÂÎȘȚ][^\n]+?)\s*$")
CH_RE = re.compile(r"^(?i:Capitolul|CApitoLuL)\s+(?P<ch>\d+)\b")
VERSE_RE = re.compile(r"^(?P<v>\d+)\s+(?P<body>.+)$")
def parse_bible_json(json_file_path: str):
"""Parse a Bible JSON file and yield verse data"""
try:
with open(json_file_path, 'r', encoding='utf-8') as f:
bible_data = json.load(f)
def parse_bible_md(md_text: str):
cur_book, cur_ch = None, None
testament = None
is_in_bible_content = False
bible_name = bible_data.get('name', 'Unknown Bible')
abbreviation = bible_data.get('abbreviation', 'UNKNOWN')
language = bible_data.get('language', 'unknown')
for line in md_text.splitlines():
line = line.rstrip()
print(f"Processing: {bible_name} ({abbreviation}, {language})")
# Start processing after "VECHIUL TESTAMENT" or when we find book markers
if line == 'VECHIUL TESTAMENT' or line == 'TESTAMENT' or '' in line:
is_in_bible_content = True
for book in bible_data.get('books', []):
book_name = book.get('name', 'Unknown Book')
testament = book.get('testament', 'Unknown')
if not is_in_bible_content:
continue
# Convert testament to short form for consistency
if 'Old' in testament:
testament = 'OT'
elif 'New' in testament:
testament = 'NT'
# Book detection: … BookName …
book_match = re.match(r'^…\s*(.+?)\s*…$', line)
if book_match:
bname = book_match.group(1).strip()
if bname in BOOK_CANON:
cur_book = bname
testament = BOOK_CANON[bname]
cur_ch = None
print(f"Found book: {bname}")
continue
for chapter in book.get('chapters', []):
chapter_num = chapter.get('chapterNum', 1)
# Chapter detection: Capitolul X or CApitoLuL X
m_ch = CH_RE.match(line)
if m_ch and cur_book:
cur_ch = int(m_ch.group("ch"))
print(f" Chapter {cur_ch}")
continue
for verse in chapter.get('verses', []):
verse_num = verse.get('verseNum', 1)
text_raw = verse.get('text', '')
# Verse detection: starts with number
m_v = VERSE_RE.match(line)
if m_v and cur_book and cur_ch:
vnum = int(m_v.group("v"))
body = m_v.group("body").strip()
if text_raw: # Only process non-empty verses
text_norm = normalize_text(text_raw)
yield {
"testament": testament,
"book": book_name,
"chapter": chapter_num,
"verse": verse_num,
"text_raw": text_raw,
"text_norm": text_norm,
"language": language,
"translation": abbreviation
}
# Remove paragraph markers
body = re.sub(r'\s*', '', body)
raw = body
norm = normalize_text(body)
yield {
"testament": testament, "book": cur_book, "chapter": cur_ch, "verse": vnum,
"text_raw": raw, "text_norm": norm
}
except Exception as e:
print(f"Error processing {json_file_path}: {e}")
return
async def embed_batch(client, inputs):
payload = {"input": inputs}
@@ -130,18 +121,23 @@ async def embed_batch(client, inputs):
def safe_ident(s: str) -> str:
return re.sub(r"[^a-z0-9_]+", "_", s.lower()).strip("_")
TABLE_BASENAME = f"bv_{safe_ident(LANG_CODE)}_{safe_ident(TRANSLATION)}"
TABLE_FQN = f'"{VECTOR_SCHEMA}"."{TABLE_BASENAME}"'
def get_table_info(language: str, translation: str):
"""Get table name and fully qualified name for a specific bible version"""
table_basename = f"bv_{safe_ident(language)}_{safe_ident(translation)}"
table_fqn = f'"{VECTOR_SCHEMA}"."{table_basename}"'
return table_basename, table_fqn
def create_table_sql() -> str:
def create_table_sql(table_fqn: str) -> str:
return f"""
CREATE SCHEMA IF NOT EXISTS "{VECTOR_SCHEMA}";
CREATE TABLE IF NOT EXISTS {TABLE_FQN} (
CREATE TABLE IF NOT EXISTS {table_fqn} (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
testament TEXT NOT NULL,
book TEXT NOT NULL,
chapter INT NOT NULL,
verse INT NOT NULL,
language TEXT NOT NULL,
translation TEXT NOT NULL,
ref TEXT GENERATED ALWAYS AS (book || ' ' || chapter || ':' || verse) STORED,
text_raw TEXT NOT NULL,
text_norm TEXT NOT NULL,
@@ -152,20 +148,21 @@ def create_table_sql() -> str:
);
"""
def create_indexes_sql() -> str:
def create_indexes_sql(table_fqn: str, table_basename: str) -> str:
return f"""
CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_{TABLE_BASENAME} ON {TABLE_FQN} (book, chapter, verse);
CREATE INDEX IF NOT EXISTS idx_tsv_{TABLE_BASENAME} ON {TABLE_FQN} USING GIN (tsv);
CREATE INDEX IF NOT EXISTS idx_book_ch_{TABLE_BASENAME} ON {TABLE_FQN} (book, chapter);
CREATE INDEX IF NOT EXISTS idx_testament_{TABLE_BASENAME} ON {TABLE_FQN} (testament);
CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_{table_basename} ON {table_fqn} (translation, language, book, chapter, verse);
CREATE INDEX IF NOT EXISTS idx_tsv_{table_basename} ON {table_fqn} USING GIN (tsv);
CREATE INDEX IF NOT EXISTS idx_book_ch_{table_basename} ON {table_fqn} (book, chapter);
CREATE INDEX IF NOT EXISTS idx_testament_{table_basename} ON {table_fqn} (testament);
CREATE INDEX IF NOT EXISTS idx_lang_trans_{table_basename} ON {table_fqn} (language, translation);
"""
def upsert_sql() -> str:
def upsert_sql(table_fqn: str) -> str:
return f"""
INSERT INTO {TABLE_FQN} (testament, book, chapter, verse, text_raw, text_norm, tsv, embedding)
VALUES (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(text_raw)s, %(text_norm)s,
INSERT INTO {table_fqn} (testament, book, chapter, verse, language, translation, text_raw, text_norm, tsv, embedding)
VALUES (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(language)s, %(translation)s, %(text_raw)s, %(text_norm)s,
to_tsvector(COALESCE(%(ts_lang)s,'simple')::regconfig, %(text_norm)s), %(embedding)s)
ON CONFLICT (book, chapter, verse) DO UPDATE
ON CONFLICT (translation, language, book, chapter, verse) DO UPDATE
SET text_raw=EXCLUDED.text_raw,
text_norm=EXCLUDED.text_norm,
tsv=EXCLUDED.tsv,
@@ -173,66 +170,188 @@ def upsert_sql() -> str:
updated_at=now();
"""
async def main():
print("Starting Bible embedding ingestion...")
async def process_bible_file(bible_file_path: str, client):
"""Process a single Bible JSON file"""
print(f"\n=== Processing {os.path.basename(bible_file_path)} ===")
md_text = Path(BIBLE_MD_PATH).read_text(encoding="utf-8", errors="ignore")
verses = list(parse_bible_md(md_text))
print(f"Parsed verses: {len(verses)}")
verses = list(parse_bible_json(bible_file_path))
if not verses:
print(f"No verses found in {bible_file_path}, skipping...")
return
batch_size = 128
print(f"Parsed {len(verses):,} verses")
# First create the schema + table structure for this language/version
# Get language and translation from first verse
first_verse = verses[0]
language = first_verse["language"]
translation = first_verse["translation"]
table_basename, table_fqn = get_table_info(language, translation)
# Create schema + table structure for this bible version
with psycopg.connect(DB_URL) as conn:
with conn.cursor() as cur:
print(f"Creating schema '{VECTOR_SCHEMA}' and table {TABLE_FQN} ...")
print(f"Creating table {table_fqn} ...")
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
cur.execute(create_table_sql())
cur.execute(create_indexes_sql())
cur.execute(create_table_sql(table_fqn))
cur.execute(create_indexes_sql(table_fqn, table_basename))
conn.commit()
print("Schema/table ready")
# Now process embeddings
async with httpx.AsyncClient() as client:
with psycopg.connect(DB_URL, autocommit=False) as conn:
with conn.cursor() as cur:
for i in range(0, len(verses), batch_size):
batch = verses[i:i+batch_size]
inputs = [v["text_norm"] for v in batch]
# Process embeddings in batches
batch_size = 128
with psycopg.connect(DB_URL, autocommit=False) as conn:
with conn.cursor() as cur:
for i in range(0, len(verses), batch_size):
batch = verses[i:i+batch_size]
inputs = [v["text_norm"] for v in batch]
print(f"Generating embeddings for batch {i//batch_size + 1}/{(len(verses) + batch_size - 1)//batch_size}")
embs = await embed_batch(client, inputs)
print(f"Generating embeddings for batch {i//batch_size + 1}/{(len(verses) + batch_size - 1)//batch_size}")
embs = await embed_batch(client, inputs)
rows = []
for v, e in zip(batch, embs):
rows.append({
**v,
"ts_lang": "romanian" if LANG_CODE.lower().startswith("ro") else ("english" if LANG_CODE.lower().startswith("en") else "simple"),
"embedding": e
})
rows = []
for v, e in zip(batch, embs):
# Determine text search language based on language code
ts_lang = "simple" # default
if v["language"].lower().startswith("ro"):
ts_lang = "romanian"
elif v["language"].lower().startswith("en"):
ts_lang = "english"
elif v["language"].lower().startswith("es"):
ts_lang = "spanish"
elif v["language"].lower().startswith("fr"):
ts_lang = "french"
elif v["language"].lower().startswith("de"):
ts_lang = "german"
elif v["language"].lower().startswith("it"):
ts_lang = "italian"
cur.executemany(upsert_sql(), rows)
conn.commit()
print(f"Upserted {len(rows)} verses... {i+len(rows)}/{len(verses)}")
rows.append({
**v,
"ts_lang": ts_lang,
"embedding": e
})
cur.executemany(upsert_sql(table_fqn), rows)
conn.commit()
print(f"Upserted {len(rows)} verses... {i+len(rows)}/{len(verses)}")
# Create IVFFLAT index after data is loaded
print("Creating IVFFLAT index...")
with psycopg.connect(DB_URL, autocommit=True) as conn:
with conn.cursor() as cur:
cur.execute(f"VACUUM ANALYZE {TABLE_FQN};")
cur.execute(f"VACUUM ANALYZE {table_fqn};")
cur.execute(f"""
CREATE INDEX IF NOT EXISTS idx_vec_ivfflat_{TABLE_BASENAME}
ON {TABLE_FQN} USING ivfflat (embedding vector_cosine_ops)
CREATE INDEX IF NOT EXISTS idx_vec_ivfflat_{table_basename}
ON {table_fqn} USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 200);
""")
print("Bible embedding ingestion completed successfully!")
print(f"{translation} ({language}) completed successfully! Total verses: {len(verses):,}")
# Helpful pgAdmin queries:
print("\nRun these sample queries in pgAdmin:")
print(f"SELECT count(*) FROM {TABLE_FQN};")
print(f"SELECT book, chapter, verse, left(text_raw, 80) AS preview FROM {TABLE_FQN} ORDER BY book, chapter, verse LIMIT 10;")
print(f"SELECT * FROM {TABLE_FQN} WHERE book='Geneza' AND chapter=1 AND verse=1;")
def update_status(status_data):
"""Update the status file for monitoring progress"""
status_file = "/root/biblical-guide/scripts/ingest_status.json"
try:
import json
from datetime import datetime
status_data["last_update"] = datetime.now().isoformat()
with open(status_file, 'w') as f:
json.dump(status_data, f, indent=2)
except Exception as e:
print(f"Warning: Could not update status file: {e}")
async def main():
start_time = time.time()
print("Starting Bible embedding ingestion for all large Bible files...")
print(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# Get all Bible files larger than minimum size
bible_files = get_large_bible_files()
if not bible_files:
print(f"No Bible files found larger than {MIN_FILE_SIZE/1024:.0f}KB in {BIBLE_JSON_DIR}")
return
print(f"Found {len(bible_files)} Bible files to process (>{MIN_FILE_SIZE/1024:.0f}KB each)")
# Initialize status tracking
status = {
"status": "running",
"start_time": time.strftime('%Y-%m-%d %H:%M:%S'),
"total_files": len(bible_files),
"processed": 0,
"successful": 0,
"failed": 0,
"current_file": "",
"errors": []
}
update_status(status)
# Process files one by one to avoid memory issues
async with httpx.AsyncClient(timeout=120.0) as client:
successful = 0
failed = 0
failed_files = []
for i, bible_file in enumerate(bible_files, 1):
try:
file_size_mb = os.path.getsize(bible_file) / (1024 * 1024)
filename = os.path.basename(bible_file)
print(f"\n[{i}/{len(bible_files)}] Processing {filename} ({file_size_mb:.1f}MB)")
print(f"Progress: {(i-1)/len(bible_files)*100:.1f}% complete")
# Update status
status["current_file"] = filename
status["processed"] = i - 1
status["successful"] = successful
status["failed"] = failed
update_status(status)
await process_bible_file(bible_file, client)
successful += 1
print(f"✅ Completed {filename}")
except Exception as e:
error_msg = f"Failed to process {os.path.basename(bible_file)}: {str(e)}"
print(f"{error_msg}")
failed += 1
failed_files.append(os.path.basename(bible_file))
status["errors"].append({"file": os.path.basename(bible_file), "error": str(e), "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')})
update_status(status)
continue
# Final summary
elapsed_time = time.time() - start_time
elapsed_hours = elapsed_time / 3600
print(f"\n=== Final Summary ===")
print(f"✅ Successfully processed: {successful} files")
print(f"❌ Failed to process: {failed} files")
print(f"📊 Total files: {len(bible_files)}")
print(f"⏱️ Total time: {elapsed_hours:.2f} hours ({elapsed_time:.0f} seconds)")
print(f"📈 Average: {elapsed_time/len(bible_files):.1f} seconds per file")
if failed_files:
print(f"\n❌ Failed files:")
for filename in failed_files:
print(f" - {filename}")
# Final status update
status.update({
"status": "completed",
"end_time": time.strftime('%Y-%m-%d %H:%M:%S'),
"processed": len(bible_files),
"successful": successful,
"failed": failed,
"duration_seconds": elapsed_time,
"current_file": ""
})
update_status(status)
print("\n🎉 All large Bible files have been processed!")
print(f"📋 Status file: /root/biblical-guide/scripts/ingest_status.json")
if __name__ == "__main__":
asyncio.run(main())
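For context, a minimal sketch (not part of this commit) of how a consumer could query one of the per-version tables this script populates, using pgvector's cosine-distance operator. The table name assumes the ro/FIDELA naming convention shown above, and the query vector is expected to come from the same embedding deployment used during ingestion:

import psycopg

def search_verses(db_url: str, query_embedding: list[float], limit: int = 5):
    """Nearest-verse lookup by cosine distance against an ingested table."""
    sql = """
        SELECT ref, left(text_raw, 120) AS preview,
               embedding <=> %s::vector AS cosine_distance
        FROM "ai_bible"."bv_ro_fidela"
        ORDER BY embedding <=> %s::vector
        LIMIT %s;
    """
    vec = "[" + ",".join(str(x) for x in query_embedding) + "]"
    with psycopg.connect(db_url) as conn:
        with conn.cursor() as cur:
            cur.execute(sql, (vec, vec, limit))
            return cur.fetchall()

Because the ivfflat index created above is approximate, SET ivfflat.probes can be raised per session to trade speed for recall.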


@@ -1,169 +0,0 @@
import os, json, re, asyncio
from pathlib import Path
from typing import List, Dict
from dotenv import load_dotenv
import httpx
import psycopg
load_dotenv()
AZ_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "").rstrip("/")
AZ_API_KEY = os.getenv("AZURE_OPENAI_KEY")
AZ_API_VER = os.getenv("AZURE_OPENAI_API_VERSION", "2024-05-01-preview")
AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBED_DEPLOYMENT", "embed-3")
EMBED_DIMS = int(os.getenv("EMBED_DIMS", "3072"))
DB_URL = os.getenv("DATABASE_URL")
VECTOR_SCHEMA = os.getenv("VECTOR_SCHEMA", "ai_bible")
LANG_CODE = os.getenv("LANG_CODE", "en")
TRANSLATION = os.getenv("TRANSLATION_CODE", "WEB")
JSON_DIR = os.getenv("JSON_DIR", f"data/en_bible/{TRANSLATION}")
assert AZ_ENDPOINT and AZ_API_KEY and DB_URL and JSON_DIR, "Missing required env vars"
EMBED_URL = f"{AZ_ENDPOINT}/openai/deployments/{AZ_DEPLOYMENT}/embeddings?api-version={AZ_API_VER}"
def safe_ident(s: str) -> str:
return re.sub(r"[^a-z0-9_]+", "_", s.lower()).strip("_")
TABLE_BASENAME = f"bv_{safe_ident(LANG_CODE)}_{safe_ident(TRANSLATION)}"
TABLE_FQN = f'"{VECTOR_SCHEMA}"."{TABLE_BASENAME}"'
def create_table_sql() -> str:
return f"""
CREATE SCHEMA IF NOT EXISTS "{VECTOR_SCHEMA}";
CREATE TABLE IF NOT EXISTS {TABLE_FQN} (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
testament TEXT NOT NULL,
book TEXT NOT NULL,
chapter INT NOT NULL,
verse INT NOT NULL,
ref TEXT GENERATED ALWAYS AS (book || ' ' || chapter || ':' || verse) STORED,
text_raw TEXT NOT NULL,
text_norm TEXT NOT NULL,
tsv tsvector,
embedding vector({EMBED_DIMS}),
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
"""
def create_indexes_sql() -> str:
return f"""
CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_{TABLE_BASENAME} ON {TABLE_FQN} (book, chapter, verse);
CREATE INDEX IF NOT EXISTS idx_tsv_{TABLE_BASENAME} ON {TABLE_FQN} USING GIN (tsv);
CREATE INDEX IF NOT EXISTS idx_book_ch_{TABLE_BASENAME} ON {TABLE_FQN} (book, chapter);
CREATE INDEX IF NOT EXISTS idx_testament_{TABLE_BASENAME} ON {TABLE_FQN} (testament);
"""
def upsert_sql() -> str:
return f"""
INSERT INTO {TABLE_FQN} (testament, book, chapter, verse, text_raw, text_norm, tsv, embedding)
VALUES (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(text_raw)s, %(text_norm)s,
to_tsvector(COALESCE(%(ts_lang)s,'simple')::regconfig, %(text_norm)s), %(embedding)s)
ON CONFLICT (book, chapter, verse) DO UPDATE
SET text_raw=EXCLUDED.text_raw,
text_norm=EXCLUDED.text_norm,
tsv=EXCLUDED.tsv,
embedding=EXCLUDED.embedding,
updated_at=now();
"""
def normalize(s: str) -> str:
s = re.sub(r"\s+", " ", s.strip())
return s
async def embed_batch(client: httpx.AsyncClient, inputs: List[str]) -> List[List[float]]:
payload = {"input": inputs}
headers = {"api-key": AZ_API_KEY, "Content-Type": "application/json"}
for attempt in range(6):
try:
r = await client.post(EMBED_URL, headers=headers, json=payload, timeout=60)
if r.status_code == 200:
data = r.json()
ordered = sorted(data["data"], key=lambda x: x["index"])
return [d["embedding"] for d in ordered]
elif r.status_code in (429, 500, 502, 503):
backoff = 2 ** attempt + (0.25 * attempt)
print(f"Rate/Server limited ({r.status_code}), waiting {backoff:.1f}s...")
await asyncio.sleep(backoff)
else:
raise RuntimeError(f"Embedding error {r.status_code}: {r.text}")
except Exception as e:
backoff = 2 ** attempt + (0.25 * attempt)
print(f"Error on attempt {attempt + 1}: {e}, waiting {backoff:.1f}s...")
await asyncio.sleep(backoff)
raise RuntimeError("Failed to embed after retries")
def load_json() -> List[Dict]:
ot = json.loads(Path(Path(JSON_DIR)/'old_testament.json').read_text('utf-8'))
nt = json.loads(Path(Path(JSON_DIR)/'new_testament.json').read_text('utf-8'))
verses = []
for test in (ot, nt):
testament = test.get('testament')
for book in test.get('books', []):
bname = book.get('name')
for ch in book.get('chapters', []):
cnum = int(ch.get('chapterNum'))
for v in ch.get('verses', []):
vnum = int(v.get('verseNum'))
text = str(v.get('text') or '').strip()
if text:
verses.append({
'testament': testament,
'book': bname,
'chapter': cnum,
'verse': vnum,
'text_raw': text,
'text_norm': normalize(text),
})
return verses
async def main():
print("Starting JSON embedding ingestion...", JSON_DIR)
verses = load_json()
print("Verses loaded:", len(verses))
batch_size = int(os.getenv('BATCH_SIZE', '128'))
# Prepare schema/table
with psycopg.connect(DB_URL) as conn:
with conn.cursor() as cur:
print(f"Ensuring schema/table {TABLE_FQN} ...")
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
cur.execute(create_table_sql())
cur.execute(create_indexes_sql())
conn.commit()
async with httpx.AsyncClient() as client:
with psycopg.connect(DB_URL, autocommit=False) as conn:
with conn.cursor() as cur:
for i in range(0, len(verses), batch_size):
batch = verses[i:i+batch_size]
inputs = [v['text_norm'] for v in batch]
embs = await embed_batch(client, inputs)
rows = []
ts_lang = 'english' if LANG_CODE.lower().startswith('en') else 'simple'
for v, e in zip(batch, embs):
rows.append({ **v, 'ts_lang': ts_lang, 'embedding': e })
cur.executemany(upsert_sql(), rows)
conn.commit()
print(f"Upserted {len(rows)} verses... {i+len(rows)}/{len(verses)}")
print("Creating IVFFLAT index...")
with psycopg.connect(DB_URL, autocommit=True) as conn:
with conn.cursor() as cur:
cur.execute(f"VACUUM ANALYZE {TABLE_FQN};")
try:
cur.execute(f"""
CREATE INDEX IF NOT EXISTS idx_vec_ivfflat_{TABLE_BASENAME}
ON {TABLE_FQN} USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 200);
""")
except Exception as e:
print('IVFFLAT creation skipped (tune maintenance_work_mem):', e)
print("✅ JSON embedding ingestion completed successfully!")
if __name__ == '__main__':
asyncio.run(main())