# geo-platform/server/api.py
# NOTE: Hugging Face Space page residue cleaned up (uploader line and commit
# hash removed). Last commit: "fix: Simplify Dockerfile and add startup
# logging for HF Space debugging" (fc8d4fd).
import datetime
import importlib
import json
import os
import sqlite3
import sys
from pathlib import Path
from typing import Optional, List

from dotenv import load_dotenv
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
# Load .env before anything reads environment variables (e.g. OUTPUT_DIR).
load_dotenv()
# Startup breadcrumbs for Hugging Face Space debugging.
print(" GEO Platform API starting...")
print(f" Working directory: {os.getcwd()}")
# Use the real `sys` module rather than the undocumented `os.sys` re-export.
print(f" Python version: {sys.version}")
# lazily import heavy pipeline to avoid startup failures when optional deps
# (like spaCy) are not installed. import inside handlers that need it.
run_pipeline = None


def _load_server_module(name):
    """Best-effort import of ``server.<name>``; return the module or None.

    Keeps the app bootable on minimal deployments where optional modules
    (or their dependencies) are absent — handlers must tolerate None.
    """
    try:
        module = importlib.import_module(f'server.{name}')
        print(f" {name} loaded")
        return module
    except Exception as exc:
        print(f" {name} failed: {exc}")
        return None


ai_visibility = _load_server_module('ai_visibility')
ai_analysis = _load_server_module('ai_analysis')
job_queue = _load_server_module('job_queue')
keyword_engine = _load_server_module('keyword_engine')
# 'users' is bound as user_mgmt throughout this module.
user_mgmt = _load_server_module('users')
search_intel = _load_server_module('search_intel')
from fastapi import WebSocket
import asyncio
# Output artifacts (audits, snapshots, history) live here; overridable via
# the OUTPUT_DIR environment variable for containerized deployments.
OUTPUT_DIR = Path(os.environ.get('OUTPUT_DIR', str(Path(__file__).resolve().parent.parent / 'output')))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
print(f" Output directory: {OUTPUT_DIR}")
app = FastAPI(title='GEO Platform API')
print(" FastAPI app created")
# Serve frontend static files (repo-root /frontend, sibling of server/).
frontend_dir = Path(__file__).resolve().parent.parent / 'frontend'
if frontend_dir.exists():
    app.mount('/static', StaticFiles(directory=str(frontend_dir)), name='static')
    print(f" Frontend mounted: {frontend_dir}")
else:
    print(f" Frontend directory not found: {frontend_dir}")
class CrawlRequest(BaseModel):
    """Payload for POST /api/crawl."""
    # Seed URL to crawl.
    url: str
    # Organization name; the literal 'Company' is treated as a generic
    # placeholder and triggers brand-name inference downstream.
    org_name: str
    org_url: str
    # Max pages fetched per run.
    max_pages: int = 3
    # Number of crawl repetitions; scores are averaged across runs.
    runs: int = 1
    # Optional per-request provider API keys (e.g. 'perplexity', 'openai').
    api_keys: Optional[dict] = None
class RecommendationRequest(BaseModel):
    """Payload for POST /api/recommendations."""
    # Optional provider API keys ('perplexity', 'openai', ...).
    api_keys: Optional[dict] = None
    # When set, read artifacts from this job's result_path instead of output/.
    job_id: Optional[int] = None
    # Free-form hints forwarded to the recommendation generator.
    extra_context: Optional[dict] = None
class AnalysisRequest(BaseModel):
    """Payload for POST /api/analyze."""
    # Optional provider API keys; when present, AI visibility is re-run.
    api_keys: Optional[dict] = None
    # When set, analyze this job's audit instead of the global output/audit.json.
    job_id: Optional[int] = None
@app.post('/api/crawl')
async def api_crawl(req: CrawlRequest):
    """Crawl a site one or more times, score each run, and persist results.

    Per run: crawl via `run_pipeline`, infer a brand name when the provided
    one is generic, attach an AI-visibility probe to the audit, and compute
    a GEO score. The first run is snapshotted and promoted to the global
    output/audit.json + analysis.json that the UI reads, and an aggregate
    entry (mean/median/pstdev) is appended to history.json.
    """
    global run_pipeline
    # Lazy import: src.main pulls in heavy optional deps (see module header).
    if run_pipeline is None:
        try:
            from src.main import run_pipeline as _rp
            run_pipeline = _rp
        except Exception as e:
            return JSONResponse({'ok': False, 'error': 'pipeline not available: ' + str(e)}, status_code=500)
    # runs: perform multiple runs and average scores, snapshot first run
    runs = max(1, req.runs or 1)
    run_results = []
    timestamps = []
    scores = []
    breakdowns = []  # NOTE(review): appended to but never read afterwards
    audit_objs = []  # NOTE(review): appended to but never read afterwards
    try:
        for i in range(runs):
            # NOTE(review): utcnow() is deprecated since Python 3.12;
            # datetime.datetime.now(datetime.timezone.utc) yields the same string here.
            ts = datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
            subdir = OUTPUT_DIR / f"run-{ts}-{i+1}"
            res = run_pipeline(req.url, req.org_name, req.org_url, max_pages=req.max_pages, output_dir=subdir)
            # load created audit
            audit_path = Path(res['audit_path'])
            with open(audit_path, 'r', encoding='utf-8') as f:
                audit_obj = json.load(f)
            audit_objs.append(audit_obj)
            # Infer brand name if generic for visibility check
            org_name_for_visibility = req.org_name
            if not org_name_for_visibility or org_name_for_visibility == 'Company':
                pages_for_inference = audit_obj.get('pages', [])
                inferred = ai_analysis.infer_brand_name(pages_for_inference)
                if inferred and inferred != 'Company':
                    org_name_for_visibility = inferred
                    # Update audit_obj with inferred name for consistency
                    audit_obj['org_name'] = inferred
            # run AI visibility (Perplexity) for this run and attach; fallback to OpenAI visibility
            queries = [f"What is {org_name_for_visibility}?", f"Best services for {org_name_for_visibility}"]
            perf = ai_visibility.check_perplexity(org_name_for_visibility, queries)
            if not perf.get('enabled') and perf.get('reason') == 'PERPLEXITY_KEY not set':
                # try OpenAI fallback
                try:
                    perf = ai_visibility.check_openai_visibility(org_name_for_visibility, queries)
                    perf['fallback'] = 'openai'
                except Exception:
                    pass
            audit_obj['ai_visibility'] = perf
            # overwrite audit with ai visibility included
            with open(audit_path, 'w', encoding='utf-8') as f:
                json.dump(audit_obj, f, ensure_ascii=False, indent=2)
            # compute geo score for this run
            score = ai_analysis.compute_geo_score(audit_obj.get('pages', []), audit=audit_obj, ai_visibility=perf)
            scores.append(score['score'])
            breakdowns.append(score['breakdown'])
            run_results.append({ 'ts': ts, 'audit_path': str(audit_path), 'geo_score': score })
            timestamps.append(ts)
        # snapshot first run pages for deterministic reference
        snap_ts = timestamps[0]
        snap_src = Path(run_results[0]['audit_path'])
        snap_dst = OUTPUT_DIR / f"snapshot-{snap_ts}.json"
        with open(snap_src, 'r', encoding='utf-8') as fsrc, open(snap_dst, 'w', encoding='utf-8') as fdst:
            fdst.write(fsrc.read())
        # Update the main audit/analysis files so UI endpoints (`/api/results`) read latest data
        try:
            # copy snapshot to output/audit.json
            audit_main = OUTPUT_DIR / 'audit.json'
            with open(snap_dst, 'r', encoding='utf-8') as s, open(audit_main, 'w', encoding='utf-8') as m:
                m.write(s.read())
            # generate and save aggregated analysis (OpenAI/Groq) and geo_score based on snapshot
            try:
                with open(audit_main, 'r', encoding='utf-8') as f:
                    audit_obj = json.load(f)
                pages = audit_obj.get('pages', [])
                analysis = ai_analysis.analyze_pages(pages)
                geo_score = ai_analysis.compute_geo_score(pages, audit=audit_obj, ai_visibility=audit_obj.get('ai_visibility'))
                analysis_out = { 'analysis': analysis, 'geo_score': geo_score }
                with open(OUTPUT_DIR / 'analysis.json', 'w', encoding='utf-8') as fa:
                    json.dump(analysis_out, fa, ensure_ascii=False, indent=2)
                # also save a top-level analysis.json for backwards compatibility
            except Exception:
                # best-effort: analysis failure must not fail the crawl response
                pass
        except Exception:
            # best-effort: snapshot promotion is non-critical
            pass
        # compute aggregated stats
        import statistics
        mean_score = int(round(statistics.mean(scores)))
        median_score = int(round(statistics.median(scores)))
        # NOTE(review): named 'variance' but pstdev is the population std-dev
        variance = float(statistics.pstdev(scores))
        history_path = OUTPUT_DIR / 'history.json'
        history = []
        if history_path.exists():
            try:
                history = json.loads(history_path.read_text(encoding='utf-8'))
            except Exception:
                # corrupt history file: start fresh rather than fail the request
                history = []
        entry = {
            'timestamp': timestamps[0],
            'url': req.url,
            'org_name': req.org_name,
            'runs': runs,
            'scores': scores,
            'mean': mean_score,
            'median': median_score,
            'variance': variance,
            'runs_info': run_results
        }
        history.append(entry)
        history_path.write_text(json.dumps(history, ensure_ascii=False, indent=2), encoding='utf-8')
        return { 'ok': True, 'message': 'crawl completed', 'mean_score': mean_score, 'median_score': median_score, 'variance': variance, 'history_entry': entry }
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
class JobRequest(BaseModel):
    """Payload for POST /api/jobs (enqueue a crawl job)."""
    # Seed URL to crawl.
    url: str
    org_name: str
    org_url: str
    # Pages fetched per run.
    max_pages: int = 3
    # Crawl repetitions requested for the job.
    runs: int = 1
@app.post('/api/jobs')
async def api_enqueue(job: JobRequest, background_tasks: BackgroundTasks):
    """Enqueue a crawl job and dispatch background processing immediately.

    Jobs are enqueued without a user association. (The previous dead code
    that tried to sniff an Authorization header from the body model never
    did anything — `auth = Request` bound the class, the token was never
    used, and `user_id`/`company_id` were never read. To attach a user,
    accept a `Request` parameter and verify the token like /api/users/me.)
    """
    try:
        jid = job_queue.enqueue_job(job.url, job.org_name, job.org_url, job.max_pages, job.runs)
        # Run the worker as a FastAPI background task so the HTTP response
        # returns right away.
        from server.worker import process_job
        job_data = job_queue.get_job(jid)
        if job_data:
            background_tasks.add_task(process_job, job_data)
        return {'ok': True, 'job_id': jid}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/jobs')
async def api_list_jobs():
    """List every job known to the job queue."""
    try:
        jobs = job_queue.list_jobs()
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
    return {'ok': True, 'jobs': jobs}
@app.get('/api/jobs/{job_id}')
async def api_get_job(job_id: int):
    """Return a single job record; its 'progress' field is decoded from JSON."""
    try:
        record = job_queue.get_job(job_id)
        if not record:
            return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404)
        raw_progress = record.get('progress') or '{}'
        try:
            decoded = json.loads(raw_progress)
        except Exception:
            decoded = {}
        record['progress'] = decoded
        return {'ok': True, 'job': record}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.websocket('/ws/jobs/{job_id}')
async def ws_job_progress(ws: WebSocket, job_id: int):
    """Stream job progress over a websocket, polling the queue once per second.

    Sends a payload only when `updated_at` changes; keeps polling even if
    the job does not exist yet (sends a not-found notice each tick).
    """
    await ws.accept()
    last_updated = None  # last `updated_at` value pushed to the client
    try:
        while True:
            job = job_queue.get_job(job_id)
            if not job:
                # send a not-found once then keep waiting in case job appears
                try:
                    await ws.send_json({'ok': False, 'error': 'job not found'})
                except Exception:
                    pass
                await asyncio.sleep(1)
                continue
            # normalize progress and datetime fields for JSON
            try:
                prog = job.get('progress')
                if isinstance(prog, str):
                    try:
                        job['progress'] = json.loads(prog or '{}')
                    except Exception:
                        job['progress'] = {}
                else:
                    job['progress'] = prog or {}
            except Exception:
                job['progress'] = {}
            # convert timestamps to ISO strings (send_json can't serialize datetime)
            for dt_key in ('created_at', 'updated_at'):
                try:
                    val = job.get(dt_key)
                    if hasattr(val, 'isoformat'):
                        job[dt_key] = val.isoformat()
                    else:
                        job[dt_key] = str(val) if val is not None else None
                except Exception:
                    job[dt_key] = str(job.get(dt_key))
            updated = job.get('updated_at')
            if updated != last_updated:
                last_updated = updated
                try:
                    await ws.send_json({'ok': True, 'job': job})
                except Exception:
                    # if sending fails (e.g., client gone), break loop
                    break
            await asyncio.sleep(1)
    finally:
        try:
            await ws.close()
        except Exception:
            pass
@app.get('/api/jobs/{job_id}/results')
async def api_job_results(job_id: int):
    """Collect the audit/analysis/schema artifacts of a finished job."""
    record = job_queue.get_job(job_id)
    if not record:
        return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404)
    base = record.get('result_path')
    if not base:
        return JSONResponse({'ok': False, 'error': 'no results yet'}, status_code=400)
    payload = {'ok': True, 'job_id': job_id}
    root = Path(base)
    audit_file = root / 'audit.json'
    if audit_file.exists():
        payload['audit'] = json.loads(audit_file.read_text(encoding='utf-8'))
    analysis_file = root / 'analysis.json'
    if analysis_file.exists():
        payload['analysis'] = json.loads(analysis_file.read_text(encoding='utf-8'))
    schema_file = root / 'schema.jsonld'
    if schema_file.exists():
        payload['schema'] = schema_file.read_text(encoding='utf-8')
    return payload
@app.get('/api/results')
async def api_results(ts: str | None = None):
    """Return the active audit/schema, or a historical snapshot when `ts` is given."""
    if ts:
        snap = OUTPUT_DIR / f'snapshot-{ts}.json'
        if snap.exists():
            try:
                return {'audit': json.loads(snap.read_text(encoding='utf-8'))}
            except Exception as e:
                return JSONResponse({'ok': False, 'error': f'Failed to load snapshot: {str(e)}'}, status_code=500)
    payload = {}
    audit_file = OUTPUT_DIR / 'audit.json'
    if audit_file.exists():
        payload['audit'] = json.loads(audit_file.read_text(encoding='utf-8'))
    schema_file = OUTPUT_DIR / 'schema.jsonld'
    if schema_file.exists():
        payload['schema'] = schema_file.read_text(encoding='utf-8')
    return payload
@app.on_event("startup")
async def startup_event():
    """Log readiness once the ASGI server has started."""
    for line in (
        " GEO Platform API is ready!",
        " Access at: http://0.0.0.0:7860",
        " Health check: http://0.0.0.0:7860/health",
    ):
        print(line)
@app.get('/health')
async def health_check():
    """Liveness probe used by the hosting platform."""
    probe_payload = {'status': 'healthy', 'service': 'GEO Platform'}
    return probe_payload
@app.get('/')
async def index():
    """Serve the frontend entry point if present; otherwise a liveness stub."""
    page = frontend_dir / 'index.html'
    if not page.exists():
        return {'ok': True}
    return FileResponse(str(page), headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"})
@app.get('/recommendations.html')
async def recommendations_page():
    """Serve the recommendations page, bypassing browser caching."""
    page = frontend_dir / 'recommendations.html'
    if not page.exists():
        return JSONResponse({'ok': False, 'error': 'recommendations page not found'}, status_code=404)
    return FileResponse(str(page), headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"})
@app.get('/jobs.html')
async def jobs_page():
    """Serve the jobs dashboard page, bypassing browser caching."""
    page = frontend_dir / 'jobs.html'
    if not page.exists():
        return JSONResponse({'ok': False, 'error': 'jobs page not found'}, status_code=404)
    return FileResponse(str(page), headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"})
@app.get('/search.html')
async def search_page():
    """Serve the search page, bypassing browser caching."""
    page = frontend_dir / 'search.html'
    if not page.exists():
        return JSONResponse({'ok': False, 'error': 'search page not found'}, status_code=404)
    return FileResponse(str(page), headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"})
@app.get('/content')
@app.get('/content_v2.html')
async def content_page_v2():
    """Serve the content-engine page (reachable at /content and /content_v2.html)."""
    page = frontend_dir / 'content_v2.html'
    if not page.exists():
        return JSONResponse({'ok': False, 'error': 'content page not found'}, status_code=404)
    return FileResponse(str(page), headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"})
@app.get('/theme.css')
async def theme_css_file():
    """Serve the shared stylesheet."""
    css = frontend_dir / 'theme.css'
    if not css.exists():
        return JSONResponse({'ok': False, 'error': 'theme.css not found'}, status_code=404)
    return FileResponse(str(css))
@app.get('/api/jobs/{job_id}/report')
async def api_job_report(job_id: int):
    """Build and return the HTML report for a completed job."""
    record = job_queue.get_job(job_id)
    if not record:
        return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
    base = record.get('result_path')
    if not base:
        return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
    from server import reports
    rendered = reports.build_html_report(base)
    return JSONResponse({'ok': True, 'report_html': rendered})
@app.get('/api/jobs/{job_id}/report.pdf')
async def api_job_report_pdf(job_id: int):
    """Render a job report as PDF, falling back to HTML when no PDF backend exists."""
    record = job_queue.get_job(job_id)
    if not record:
        return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
    base = record.get('result_path')
    if not base:
        return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
    from server import reports
    html = reports.build_html_report(base)
    pdf_path = Path(base) / f'report-{job_id}.pdf'
    rendered = reports.try_render_pdf(html, pdf_path)
    if rendered and pdf_path.exists():
        return FileResponse(str(pdf_path), media_type='application/pdf')
    # fallback: return HTML as text for user to save
    return JSONResponse({'ok': True, 'report_html': html})
@app.get('/api/jobs/{job_id}/keywords')
async def api_job_keywords(job_id: int, enrich: bool = True, analytics: bool = False):
    """Extract keywords (or a full analytics report) from a finished job's audit."""
    try:
        record = job_queue.get_job(job_id)
        if not record:
            return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
        base = record.get('result_path')
        if not base:
            return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
        audit_file = Path(base) / 'audit.json'
        if not audit_file.exists():
            return JSONResponse({'ok': False, 'error': 'audit.json not found in result_path'}, status_code=404)
        with open(audit_file, 'r', encoding='utf-8') as fh:
            audit = json.load(fh)
        extracted = keyword_engine.extract_keywords_from_audit(audit, top_n=40, enrich=enrich, analytics=analytics)
        field = 'analytics' if analytics else 'keywords'
        return {'ok': True, field: extracted}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
class UserRegister(BaseModel):
    """Payload for POST /api/users/register."""
    # Must be unique (enforced by a sqlite IntegrityError on duplicates).
    email: str
    password: str
    # Optional association to an existing company record.
    company_id: int | None = None
class CompanyRegister(BaseModel):
    """Payload for POST /api/companies/register."""
    name: str
    # Optional company website domain.
    domain: str | None = None
class LoginRequest(BaseModel):
    """Payload for POST /api/users/login."""
    email: str
    password: str
@app.post('/api/companies/register')
async def api_register_company(req: CompanyRegister):
    """Create a company record and return its id."""
    try:
        new_id = user_mgmt.create_company(req.name, req.domain)
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
    return {'ok': True, 'company_id': new_id}
@app.post('/api/users/register')
async def api_register_user(req: UserRegister):
    """Register a user account; a duplicate email yields a 400."""
    try:
        new_id = user_mgmt.create_user(req.email, req.password, company_id=req.company_id)
    except sqlite3.IntegrityError:
        # UNIQUE constraint violation on the email column
        return JSONResponse({'ok': False, 'error': 'email already exists'}, status_code=400)
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
    return {'ok': True, 'user_id': new_id}
@app.post('/api/users/login')
async def api_login(req: LoginRequest):
    """Authenticate credentials and mint an API token."""
    try:
        account = user_mgmt.authenticate_user(req.email, req.password)
        if not account:
            return JSONResponse({'ok': False, 'error': 'invalid credentials'}, status_code=401)
        issued = user_mgmt.make_token(account['id'])
        return {'ok': True, 'token': issued, 'user': account}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/users/me')
async def api_me(request: Request):
    """Resolve the current user from a Bearer token in the Authorization header."""
    auth = request.headers.get('authorization') or request.headers.get('Authorization')
    if not auth or not auth.lower().startswith('bearer '):
        return JSONResponse({'ok': False, 'error': 'missing token'}, status_code=401)
    token = auth.split(' ', 1)[1].strip()
    uid = user_mgmt.verify_token(token)
    if not uid:
        return JSONResponse({'ok': False, 'error': 'invalid token'}, status_code=401)
    profile = user_mgmt.get_user(uid)
    if not profile:
        return JSONResponse({'ok': False, 'error': 'user not found'}, status_code=404)
    return {'ok': True, 'user': profile}
@app.post('/api/analyze')
async def api_analyze(req: AnalysisRequest = None):
    """Run AI analysis + GEO scoring over an existing audit.

    Reads the audit for `req.job_id` when given, else the global
    output/audit.json. When `req.api_keys` is supplied the AI-visibility
    probe is re-run with those keys and the audit is rewritten; an
    enhanced "v2" visibility score is then computed from live search
    results and merged into the GEO score before saving analysis.json.
    """
    # Body is optional: calling with no JSON behaves like an empty request.
    api_keys = req.api_keys if req else {}
    job_id = req.job_id if req else None
    # Resolve audit path: job-specific first, then global fallback
    if job_id:
        job = job_queue.get_job(job_id)
        if not job:
            return JSONResponse({'ok': False, 'error': f'job {job_id} not found'}, status_code=404)
        result_path = job.get('result_path')
        if not result_path:
            return JSONResponse({'ok': False, 'error': f'job {job_id} not completed yet'}, status_code=400)
        audit_path = Path(result_path) / 'audit.json'
        analysis_out_path = Path(result_path) / 'analysis.json'
    else:
        audit_path = OUTPUT_DIR / 'audit.json'
        analysis_out_path = OUTPUT_DIR / 'analysis.json'
    if not audit_path.exists():
        return JSONResponse({'ok': False, 'error': 'no audit found; run crawl first'}, status_code=400)
    with open(audit_path, 'r', encoding='utf-8') as f:
        audit = json.load(f)
    pages = audit.get('pages', [])
    # Pass user-provided keys to analysis
    analysis = ai_analysis.analyze_pages(pages, api_keys=api_keys)
    # Re-run AI visibility if keys provided to ensure "Visibility Check" works
    ai_vis = audit.get('ai_visibility')
    if api_keys:
        org_name = audit.get('org_name', 'Company')
        queries = [f"What is {org_name}?", f"Best services for {org_name}"]
        # Try Perplexity if key exists, otherwise OpenAI
        if api_keys.get('perplexity'):
            ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys.get('perplexity'))
        elif api_keys.get('openai'):
            ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai'))
        audit['ai_visibility'] = ai_vis
        # Save updated audit
        with open(audit_path, 'w', encoding='utf-8') as f:
            json.dump(audit, f, ensure_ascii=False, indent=2)
    from server import geo_services
    # ── Enhanced Visibility Score v2 (API Based) ────────────────────────────────
    org_name = audit.get('org_name') or ai_analysis.infer_brand_name(pages)
    # Fetch real search results
    searches = []
    serp_key = api_keys.get("SERPAPI_KEY") if api_keys else None
    zen_key = api_keys.get("ZENSERP_KEY") if api_keys else None
    # NOTE(review): second query is Arabic "download <brand>"; only the first
    # two of the three queries are actually issued below — confirm intended.
    core_queries = [f"{org_name}", f"تحميل {org_name}", f"{org_name} review"]
    for cq in core_queries[:2]:
        s_res = geo_services._serp_api_search(cq, api_key=serp_key)
        if not s_res:
            s_res = geo_services._zenserp_search(cq, api_key=zen_key)
        if s_res:
            searches.append(s_res)
    # ── Competitor Insight Enrichment ──────────────────────────────────────────
    comp_insight = {}
    try:
        comp_insight = geo_services.get_competitor_insights(org_name, audit.get('url'), api_keys=api_keys)
    except Exception:
        # enrichment is best-effort; keep the empty dict on failure
        pass
    # Hybrid Score Calculation (v2)
    ai_mentions = ai_vis.get("mentions", 0) if ai_vis else 0
    total_queries = ai_vis.get("total_queries", 1) if ai_vis else 1
    traffic_est = comp_insight.get("monthly_visits", "unknown")
    geo_v2 = geo_services.calculate_visibility_score_v2(
        org_name, searches, ai_mentions, total_queries, traffic_est
    )
    geo = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=ai_vis)
    geo["v2"] = geo_v2  # Combined 40/40/20 score
    analysis_out = { 'analysis': analysis, 'geo_score': geo, 'competitor_insight': comp_insight }
    with open(analysis_out_path, 'w', encoding='utf-8') as fa:
        json.dump(analysis_out, fa, ensure_ascii=False, indent=2)
    return { 'ok': True, 'analysis': analysis, 'geo_score': geo, 'competitor_insight': comp_insight }
@app.get('/api/history')
async def api_history():
    """Return the stored crawl history (an empty list when none exists)."""
    path = OUTPUT_DIR / 'history.json'
    if not path.exists():
        return { 'ok': True, 'history': [] }
    try:
        entries = json.loads(path.read_text(encoding='utf-8'))
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
    return { 'ok': True, 'history': entries }
class ActivateRequest(BaseModel):
    """Payload for POST /api/history/activate."""
    # Snapshot timestamp; must match a snapshot-<ts>.json file in OUTPUT_DIR.
    ts: str
    # Optional provider API keys used for re-analysis.
    api_keys: Optional[dict] = None
@app.post('/api/history/activate')
async def api_activate_history(req: ActivateRequest):
    """
    Switch the 'active' research to a historical snapshot.
    This overwrites audit.json and triggers a re-analysis.

    Steps: copy the snapshot over output/audit.json, re-run analysis
    (inferring a brand name when the stored one is generic and re-running
    AI visibility if keys are provided), save analysis.json, and finally
    regenerate the strategic intelligence report (best-effort).
    """
    try:
        ts = req.ts
        snapshot_path = OUTPUT_DIR / f'snapshot-{ts}.json'
        if not snapshot_path.exists():
            return JSONResponse({'ok': False, 'error': f'Snapshot {ts} not found'}, status_code=404)
        # 1. Update audit.json
        audit_main = OUTPUT_DIR / 'audit.json'
        with open(snapshot_path, 'r', encoding='utf-8') as s, open(audit_main, 'w', encoding='utf-8') as m:
            audit_obj = json.load(s)
            json.dump(audit_obj, m, ensure_ascii=False, indent=2)
        # 2. Trigger analysis and compute score
        pages = audit_obj.get('pages', [])
        api_keys = req.api_keys or {}
        # Pass user-provided keys to analysis
        analysis_data = ai_analysis.analyze_pages(pages, api_keys=api_keys)
        # If brand name is generic, infer it
        org_name = audit_obj.get('org_name', 'Company')
        if not org_name or org_name == 'Company':
            inferred = ai_analysis.infer_brand_name(pages)
            if inferred and inferred != 'Company':
                org_name = inferred
                audit_obj['org_name'] = org_name
                # Re-save with inferred name
                with open(audit_main, 'w', encoding='utf-8') as f:
                    json.dump(audit_obj, f, ensure_ascii=False, indent=2)
        ai_vis = audit_obj.get('ai_visibility')
        # Re-run visibility if keys provided
        if api_keys and (api_keys.get('perplexity') or api_keys.get('openai')):
            queries = [f"What is {org_name}?", f"Best services for {org_name}"]
            if api_keys.get('perplexity'):
                ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys['perplexity'])
            elif api_keys.get('openai'):
                ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai'))
            audit_obj['ai_visibility'] = ai_vis
            with open(audit_main, 'w', encoding='utf-8') as f:
                json.dump(audit_obj, f, ensure_ascii=False, indent=2)
        geo_score = ai_analysis.compute_geo_score(pages, audit=audit_obj, ai_visibility=ai_vis)
        # 3. Save analysis.json
        full_report = { 'analysis': analysis_data, 'geo_score': geo_score }
        analysis_path = OUTPUT_DIR / 'analysis.json'
        with open(analysis_path, 'w', encoding='utf-8') as fa:
            json.dump(full_report, fa, ensure_ascii=False, indent=2)
        # 4. Generate Strategic Intelligence Report for the newly activated data
        # This ensures the Competitive Intelligence Matrix updates immediately
        try:
            from server import search_intelligence
            report = search_intelligence.run_complete_analysis(pages, source_url=audit_obj.get('url', 'http://example.com'), api_keys=api_keys)
        except Exception as e:
            print(f"Intelligence sync error: {e}")
            report = None
        return {
            'ok': True,
            'message': f'Research {ts} activated successfully',
            'org_name': org_name,
            'pages_count': len(pages),
            'report': report
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/recommendations')
async def api_recommendations(req: RecommendationRequest = None):
    """Produce GEO recommendations from the latest (or job-specific) audit.

    When API keys are provided, first refreshes the brand name, the
    AI-visibility probe (if stale or missing), the analysis, and the GEO
    score, persisting them back to audit.json/analysis.json; otherwise
    reuses the stored analysis, computing a basic one only if missing.
    """
    # load analysis and audit results then produce recommendations
    api_keys = req.api_keys if req else {}
    job_id = req.job_id if req else None
    extra_context = req.extra_context if (req and req.extra_context) else {}
    # Resolve paths: prefer job-specific result_path, fall back to global output/
    if job_id:
        job = job_queue.get_job(job_id)
        if not job:
            return JSONResponse({'ok': False, 'error': f'job {job_id} not found'}, status_code=404)
        result_path = job.get('result_path')
        if not result_path:
            return JSONResponse({'ok': False, 'error': f'job {job_id} has no results yet — wait for it to complete'}, status_code=400)
        audit_path = Path(result_path) / 'audit.json'
        analysis_path = Path(result_path) / 'analysis.json'
    else:
        audit_path = OUTPUT_DIR / 'audit.json'
        analysis_path = OUTPUT_DIR / 'analysis.json'
    if not audit_path.exists():
        return JSONResponse({'ok': False, 'error': 'no audit found — run a crawl first'}, status_code=400)
    with open(audit_path, 'r', encoding='utf-8') as f:
        audit = json.load(f)
    pages = audit.get('pages', [])
    # If keys are provided, we should probably re-run analysis to fill in the "Visibility" and "AI Analysis" gaps
    if api_keys:
        # Re-run visibility if brand changed or results are missing
        old_org = audit.get('org_name', 'Company')
        org_name = old_org
        if not org_name or org_name == 'Company':
            inferred = ai_analysis.infer_brand_name(pages)
            if inferred and inferred != 'Company':
                org_name = inferred
                audit['org_name'] = org_name
        queries = [f"What is {org_name}?", f"Best services for {org_name}"]
        ai_vis = audit.get('ai_visibility')
        # If brand name improved OR visibility is missing/demo, re-run it
        needs_visibility = (org_name != old_org) or not ai_vis or not ai_vis.get('enabled')
        if needs_visibility and (api_keys.get('perplexity') or api_keys.get('openai')):
            if api_keys.get('perplexity'):
                ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys['perplexity'])
            elif api_keys.get('openai'):
                ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai'))
        audit['ai_visibility'] = ai_vis
        # Re-run analysis
        analysis_data = ai_analysis.analyze_pages(pages, api_keys=api_keys)
        geo_score = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=ai_vis)
        # Save updated results
        with open(audit_path, 'w', encoding='utf-8') as f:
            json.dump(audit, f, ensure_ascii=False, indent=2)
        with open(analysis_path, 'w', encoding='utf-8') as f:
            json.dump({ 'analysis': analysis_data, 'geo_score': geo_score }, f, ensure_ascii=False, indent=2)
    else:
        # Just load existing
        if not analysis_path.exists():
            # Trigger basic analysis if missing
            analysis_data = ai_analysis.analyze_pages(pages)
            geo_score = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=audit.get('ai_visibility'))
        else:
            with open(analysis_path, 'r', encoding='utf-8') as f:
                ana_obj = json.load(f)
            analysis_data = ana_obj.get('analysis')
            geo_score = ana_obj.get('geo_score')
    recs = ai_analysis.generate_recommendations(pages, geo_score=geo_score, api_keys=api_keys, ai_analysis_results=analysis_data, extra_context=extra_context)
    return {
        'ok': True,
        'recommendations': recs,
        'audit': audit,
        'ai_visibility': audit.get('ai_visibility'),
        'analysis': analysis_data,
        'geo_score': geo_score
    }
class KeywordsRequest(BaseModel):
    """Payload for the ad-hoc crawl endpoints (/api/keywords, /api/search/*)."""
    # URL to crawl on demand.
    url: str
    # Pages fetched for the ad-hoc crawl.
    max_pages: int = 1
    # Optional provider API keys for enrichment.
    api_keys: Optional[dict] = None
@app.post('/api/keywords')
async def api_keywords(req: KeywordsRequest, enrich: bool = True, analytics: bool = False):
    """Crawl a URL on demand and extract keywords (optionally full analytics)."""
    try:
        # try to fetch pages via crawler (will fallback to requests)
        from src import crawler
        fetched = crawler.crawl_seed(req.url, max_pages=req.max_pages)
        # build a minimal audit-like object
        pseudo_audit = {'pages': fetched}
        from server import keyword_engine
        extracted = keyword_engine.extract_keywords_from_audit(pseudo_audit, top_n=40, enrich=enrich, analytics=analytics)
        page_refs = [{'url': p.get('url'), 'title': p.get('title')} for p in fetched]
        if analytics:
            # Return full analytics report
            return {'ok': True, 'analytics': extracted, 'pages': page_refs}
        # Return simple keyword list
        return {'ok': True, 'keywords': extracted, 'pages': page_refs}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/test/keywords')
async def api_test_keywords():
    """Test endpoint to verify keyword extraction works."""
    try:
        from src import crawler
        from server.keyword_engine import extract_keywords_from_audit
        # Test with abayanoir
        fetched = crawler.crawl_seed('https://abayanoir.com', max_pages=1)
        pseudo_audit = {'pages': fetched}
        # Simple mode
        simple = extract_keywords_from_audit(pseudo_audit, top_n=5, enrich=False, analytics=False)
        # Analytics mode
        analytics = extract_keywords_from_audit(pseudo_audit, top_n=5, enrich=False, analytics=True)
        summary = analytics.get('summary') if isinstance(analytics, dict) else 'NOT A DICT'
        return {
            'ok': True,
            'simple_keywords': simple,
            'analytics_type': str(type(analytics)),
            'analytics_summary': summary,
            'pages_crawled': len(fetched)
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/search/intelligence')
async def api_search_intelligence(req: KeywordsRequest, enrich: bool = True):
    """Complete search intelligence analysis with keywords, competitors, and recommendations."""
    try:
        from src import crawler
        from server import search_intelligence
        fetched = crawler.crawl_seed(req.url, max_pages=req.max_pages)
        # Normalize: downstream expects a list of page dicts
        if not isinstance(fetched, list):
            fetched = [fetched] if fetched else []
        result = search_intelligence.run_complete_analysis(fetched, req.url, enrich_data=enrich, api_keys=req.api_keys)
        return {'ok': True, 'report': result}
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/export/keywords/csv')
async def api_export_keywords_csv(req: KeywordsRequest, enrich: bool = True):
    """Export keywords to CSV format.

    Crawls the requested URL, runs the search-intelligence pipeline, and
    streams the classified keywords (primary / secondary / long-tail) as a
    CSV attachment.
    """
    try:
        from src import crawler
        from server import search_intelligence
        import csv
        from io import StringIO
        from fastapi.responses import StreamingResponse
        # Crawl and analyze
        pages = crawler.crawl_seed(req.url, max_pages=req.max_pages)
        if not isinstance(pages, list):
            pages = [pages] if pages else []
        report = search_intelligence.run_complete_analysis(pages, req.url, enrich_data=enrich)
        # Create CSV
        output = StringIO()
        writer = csv.writer(output)
        # Header
        writer.writerow(['Keyword', 'Count', 'Density (%)', 'Volume', 'CPC ($)', 'Competition', 'Classification'])
        classification = report['keyword_results']['classification']
        # One data-driven pass per tier (replaces three copy-pasted loops).
        for tier_key, label in (('primary', 'Primary'), ('secondary', 'Secondary'), ('long_tail', 'Long-tail')):
            for kw in classification[tier_key]['keywords']:
                writer.writerow([
                    kw['kw'],
                    kw['count'],
                    kw.get('density', 'N/A'),
                    kw.get('volume', 'N/A'),
                    kw.get('cpc', 'N/A'),
                    kw.get('competition', 'N/A'),
                    label
                ])
        output.seek(0)
        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={"Content-Disposition": f"attachment; filename=keywords_{req.url.replace('https://', '').replace('/', '_')}.csv"}
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/search/competitors')
async def api_search_competitors(req: KeywordsRequest):
    """Crawl the requested site and detect competitors mentioned in its pages."""
    try:
        from src import crawler
        from server import competitor_analysis
        crawled = crawler.crawl_seed(req.url, max_pages=req.max_pages)
        found = competitor_analysis.detect_competitors(crawled, req.url, min_mentions=1)
        payload = {
            'competitors': found,
            'summary': competitor_analysis.get_competitor_summary(found),
            'pages_analyzed': len(crawled)
        }
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': payload}
@app.get('/api/search/gsc')
async def api_search_gsc(site: str, start: str, end: str):
    """Proxy a Google Search Console query for *site* between *start* and *end*.

    Returns 503 when the optional ``search_intel`` module failed to import at
    startup (it is loaded best-effort at the top of this file), 500 on any
    query error.
    """
    if search_intel is None:
        # The top-of-file import guard sets search_intel = None on failure;
        # without this check callers get an opaque AttributeError 500.
        return JSONResponse({'ok': False, 'error': 'search_intel module unavailable'}, status_code=503)
    try:
        res = search_intel.gsc_query(site, start, end)
        return {'ok': True, 'result': res}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# ── AI Content Engine ──────────────────────────────────────────────────────────
class ArticleRequest(BaseModel):
    """Request body for POST /api/content/generate (AI article generation)."""
    keyword: str                     # target keyword / article topic (required)
    lang: str = 'en'                 # output language code
    target_site: str = ""            # site the article is written for
    research_insights: list = []     # optional research snippets to ground the article
    competitors_content: list = []   # competitor content used for differentiation
    crawl_data: dict = {}            # crawl context from a prior site analysis
    prefer_backend: str = 'groq'     # preferred LLM backend
    api_keys: dict = {}              # per-request API keys for the LLM backends
class OptimizeRequest(BaseModel):
    """Request body for POST /api/content/optimize (rewrite existing content)."""
    content: str                     # existing content to optimize (required)
    keyword: str                     # keyword to optimize the content for (required)
    lang: str = 'en'                 # output language code
    target_site: str = ""            # site the content belongs to
    research_insights: list = []     # optional research snippets to ground the rewrite
    crawl_data: dict = {}            # crawl context from a prior site analysis
    prefer_backend: str = 'groq'     # preferred LLM backend
    api_keys: dict = {}              # per-request API keys for the LLM backends
class FaqRequest(BaseModel):
    """Request body for POST /api/content/faqs (FAQ generation)."""
    topic: str                       # topic to generate FAQs for (required)
    page_content: str = ''           # optional page content to ground the questions
    lang: str = 'en'                 # output language code
    target_site: str = ""            # site the FAQs are written for
    research_insights: list = []     # optional research snippets
    crawl_data: dict = {}            # crawl context from a prior site analysis
    count: int = 5                   # number of FAQ entries to generate
    prefer_backend: str = 'groq'     # preferred LLM backend
    api_keys: dict = {}              # per-request API keys for the LLM backends
class SemanticRequest(BaseModel):
    """Request body for POST /api/content/semantic (semantic optimization)."""
    content: str                     # content to optimize semantically (required)
    lang: str = 'en'                 # output language code
    prefer_backend: str = 'groq'     # preferred LLM backend
    api_keys: dict = {}              # per-request API keys for the LLM backends
@app.post('/api/content/generate')
async def api_content_generate(req: ArticleRequest):
    """Generate a full AI-written article for the requested keyword."""
    try:
        from server import content_engine
        article = content_engine.generate_article(
            req.keyword,
            lang=req.lang,
            target_site=req.target_site,
            research_insights=req.research_insights,
            competitors_content=req.competitors_content,
            crawl_data=req.crawl_data,
            prefer_backend=req.prefer_backend,
            api_keys=req.api_keys,
        )
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': article}
@app.post('/api/content/optimize')
async def api_content_optimize(req: OptimizeRequest):
    """Optimize existing content for a target keyword via the content engine."""
    try:
        from server import content_engine
        optimized = content_engine.optimize_content(
            req.content,
            req.keyword,
            lang=req.lang,
            target_site=req.target_site,
            research_insights=req.research_insights,
            crawl_data=req.crawl_data,
            prefer_backend=req.prefer_backend,
            api_keys=req.api_keys,
        )
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': optimized}
@app.post('/api/content/faqs')
async def api_content_faqs(req: FaqRequest):
    """Generate FAQ entries for a topic via the content engine."""
    try:
        from server import content_engine
        faqs = content_engine.generate_faqs(
            req.topic,
            page_content=req.page_content,
            lang=req.lang,
            count=req.count,
            target_site=req.target_site,
            research_insights=req.research_insights,
            crawl_data=req.crawl_data,
            prefer_backend=req.prefer_backend,
            api_keys=req.api_keys,
        )
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': faqs}
@app.post('/api/content/semantic')
async def api_content_semantic(req: SemanticRequest):
    """Run semantic optimization over the submitted content."""
    try:
        from server import content_engine
        optimized = content_engine.semantic_optimize(
            req.content,
            lang=req.lang,
            prefer_backend=req.prefer_backend,
            api_keys=req.api_keys,
        )
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': optimized}
# ═══════════════════════════════════════════════════════════════════════════════
# PAID ADS MANAGEMENT MODULE
# ═══════════════════════════════════════════════════════════════════════════════
from server import ads_manager, ads_ai
class AdsConnectRequest(BaseModel):
    """Request body for POST /api/ads/connect (Google Ads API credentials)."""
    developer_token: str   # Google Ads developer token
    client_id: str         # OAuth client id
    client_secret: str     # OAuth client secret
    refresh_token: str     # OAuth refresh token
    customer_id: str       # Google Ads customer (account) id
class AdsAIRequest(BaseModel):
    """Shared request body for the /api/ads/ai/* endpoints."""
    api_keys: Optional[dict] = None                        # per-request AI backend keys
    lang: Optional[str] = 'ar'                             # report/copy language
    service_name: Optional[str] = 'خدمات SEO'              # advertised service name (copy generation)
    usp: Optional[str] = 'نتائج مضمونة في 90 يوم'          # unique selling proposition (copy generation)
    target_audience: Optional[str] = 'شركات سعودية'        # audience description (copy generation)
class CampaignCreateRequest(BaseModel):
    """Request body for POST /api/ads/campaigns."""
    name: str                          # campaign name
    budget_usd: float = 5.0            # daily budget in USD
    target_cpa: Optional[float] = None # optional target cost-per-acquisition
@app.post('/api/ads/connect')
async def api_ads_connect(req: AdsConnectRequest):
    """Save Google Ads credentials and verify they work."""
    try:
        fields = ('developer_token', 'client_id', 'client_secret',
                  'refresh_token', 'customer_id')
        creds = {name: getattr(req, name) for name in fields}
        outcome = ads_manager.verify_google_connection(creds)
        # Persist only credentials that actually verified.
        if outcome.get('ok'):
            ads_manager.save_ads_config(creds)
        return outcome
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
@app.get('/api/ads/dashboard')
async def api_ads_dashboard(demo: bool = False, days: int = 30):
    """Return unified KPI dashboard data — uses saved credentials or demo."""
    try:
        # demo mode skips loading saved credentials and passes an empty dict
        creds = ads_manager.load_ads_config() if not demo else {}
        perf = ads_manager.get_campaign_performance(creds, days=days)
        return {
            'ok': True,
            'summary': ads_manager.build_ads_summary(perf, credentials=creds),
            'campaigns': perf,
        }
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
@app.get('/api/ads/keywords')
async def api_ads_keywords(demo: bool = False):
    """Return keyword performance with Quality Scores."""
    try:
        creds = ads_manager.load_ads_config() if not demo else {}
        kw_rows = ads_manager.get_keyword_performance(creds)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'keywords': kw_rows, 'count': len(kw_rows)}
@app.get('/api/ads/search-terms')
async def api_ads_search_terms(demo: bool = False, min_clicks: int = 5):
    """Return search term analysis — converting vs. wasted spend."""
    try:
        creds = ads_manager.load_ads_config() if not demo else {}
        terms = ads_manager.get_search_terms(creds, min_clicks=min_clicks)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'data': terms}
@app.post('/api/ads/campaigns')
async def api_ads_create_campaign(req: CampaignCreateRequest):
    """Create a new Google Ads campaign (starts PAUSED for safety)."""
    try:
        creds = ads_manager.load_ads_config()
        # create_campaign returns its own result payload (incl. ok/error fields)
        return ads_manager.create_campaign(creds, req.name, req.budget_usd, req.target_cpa)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
@app.post('/api/ads/ai/bid-suggestions')
async def api_ads_bid_suggestions(req: AdsAIRequest):
    """AI-powered bid adjustment recommendations for each keyword."""
    try:
        creds = ads_manager.load_ads_config()
        kw_rows = ads_manager.get_keyword_performance(creds)
        advice = ads_ai.ai_bid_suggestion(kw_rows, api_keys=req.api_keys or {})
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'suggestions': advice, 'count': len(advice)}
@app.post('/api/ads/ai/copy')
async def api_ads_generate_copy(req: AdsAIRequest):
    """Generate Arabic + English RSA ad copy using AI."""
    try:
        # Fall back to the same defaults the request model declares when the
        # client sends explicit nulls.
        copy_result = ads_ai.generate_ad_copy(
            service_name=req.service_name or 'خدمات SEO',
            usp=req.usp or 'نتائج مضمونة في 90 يوم',
            target_audience=req.target_audience or 'شركات سعودية',
            lang=req.lang or 'ar',
            api_keys=req.api_keys or {},
        )
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'copy': copy_result}
@app.post('/api/ads/ai/negatives')
async def api_ads_negative_keywords(req: AdsAIRequest):
    """Detect irrelevant search terms and suggest negative keywords."""
    try:
        creds = ads_manager.load_ads_config()
        terms = ads_manager.get_search_terms(creds)
        analysis = ads_ai.detect_negative_keywords(terms, api_keys=req.api_keys or {})
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': analysis}
@app.post('/api/ads/ai/weekly-report')
async def api_ads_weekly_report(req: AdsAIRequest):
    """Generate AI-written weekly performance report in Arabic or English."""
    try:
        creds = ads_manager.load_ads_config()
        # Gather the three data sets the report is built from.
        perf = ads_manager.get_campaign_performance(creds)
        kw_rows = ads_manager.get_keyword_performance(creds)
        terms = ads_manager.get_search_terms(creds)
        weekly = ads_ai.generate_weekly_report(
            perf,
            kw_rows,
            terms,
            api_keys=req.api_keys or {},
            lang=req.lang or 'ar',
        )
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'report': weekly}
# ══════════════════════════════════════════════════════════════════════════════
# GEO SERVICES — 6 AI Visibility Services
# ══════════════════════════════════════════════════════════════════════════════
from server import geo_services
class GeoServiceRequest(BaseModel):
    """Shared request body for the /api/geo/* service endpoints."""
    brand: str                                   # brand name to analyze (required)
    url: Optional[str] = None                    # brand site URL (required by /api/geo/fix)
    queries: Optional[List[str]] = None          # custom test queries; defaults to geo_services.DEFAULT_QUERIES
    competitors: Optional[List[str]] = None      # competitor brand names for ranking comparison
    brand_variants: Optional[List[str]] = None   # alternate spellings/names of the brand
    api_keys: Optional[dict] = None              # per-request AI backend keys
class SimulatorRequest(BaseModel):
    """Request body for POST /api/geo/simulate (before/after visibility test)."""
    brand: str                       # brand name to test visibility for
    original_content: str            # current page content (baseline)
    improved_content: str            # proposed improved content (comparison)
    test_queries: List[str]          # queries to evaluate both versions against
    api_keys: Optional[dict] = None  # per-request AI backend keys
@app.post('/api/geo/visibility')
async def api_geo_visibility(req: GeoServiceRequest):
    """Score how visible a brand is across AI answers for the given queries."""
    try:
        test_queries = req.queries if req.queries else geo_services.DEFAULT_QUERIES
        score = geo_services.visibility_score(req.brand, test_queries, req.api_keys or {})
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': score}
@app.post('/api/geo/recognition')
async def api_geo_recognition(req: GeoServiceRequest):
    """Check whether AI answers recognize the brand under its name variants."""
    try:
        test_queries = req.queries if req.queries else geo_services.DEFAULT_QUERIES
        # When no variants are supplied, test the canonical brand name only.
        names = req.brand_variants if req.brand_variants else [req.brand]
        recognition = geo_services.brand_recognition(req.brand, names, test_queries, req.api_keys or {})
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': recognition}
@app.post('/api/geo/sentiment')
async def api_geo_sentiment(req: GeoServiceRequest):
    """Analyze the sentiment of AI answers that mention the brand."""
    try:
        test_queries = req.queries if req.queries else geo_services.DEFAULT_QUERIES
        sentiment = geo_services.sentiment_analysis(req.brand, test_queries, req.api_keys or {})
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': sentiment}
@app.post('/api/geo/competitors')
async def api_geo_competitors(req: GeoServiceRequest):
    """Rank the brand against competitors in AI answers."""
    try:
        test_queries = req.queries if req.queries else geo_services.DEFAULT_QUERIES
        # Default competitor set used when the client supplies none.
        rivals = req.competitors if req.competitors else ['SEMrush', 'Ahrefs', 'Moz']
        ranking = geo_services.competitor_ranking(req.brand, rivals, test_queries, req.api_keys or {})
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': ranking}
@app.post('/api/geo/regional')
async def api_geo_regional(req: GeoServiceRequest):
    """Run the regional AI-visibility analysis for a brand."""
    try:
        regional = geo_services.geo_regional_analysis(req.brand, req.api_keys or {})
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': regional}
@app.post('/api/geo/fix')
async def api_geo_fix(req: GeoServiceRequest):
    """Produce fix recommendations for a brand's page; ``url`` is mandatory here."""
    # Guard clause: this is the only geo endpoint that requires a URL.
    if not req.url:
        return JSONResponse({'ok': False, 'error': 'url required'}, status_code=400)
    try:
        recommendations = geo_services.fix_recommendations(req.url, req.brand, {}, req.api_keys or {})
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': recommendations}
@app.post('/api/geo/simulate')
async def api_geo_simulate(req: SimulatorRequest):
    """Compare visibility of original vs. improved content over the test queries."""
    try:
        simulation = geo_services.visibility_simulator(
            req.original_content,
            req.improved_content,
            req.test_queries,
            req.brand,
            req.api_keys or {},
        )
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': simulation}
@app.post('/api/geo/suite')
async def api_geo_suite(req: GeoServiceRequest, background_tasks: BackgroundTasks):
    """Run all 6 GEO services at once for a brand."""
    # NOTE(review): background_tasks is injected but never used — confirm whether
    # long-running suites were meant to be dispatched in the background.
    try:
        suite = geo_services.run_full_suite(
            req.brand, req.url, req.competitors, req.api_keys or {},
        )
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': suite}
@app.get('/geo')
@app.get('/geo_services.html')
@app.get('/geo-toolkit.html')
async def serve_geo_services():
    """Serve the GEO toolkit page under its three route aliases."""
    page = frontend_dir / 'geo-toolkit.html'
    return FileResponse(str(page))
@app.get('/regional.html')
async def serve_regional_dashboard():
    """Serve the Regional AI Visibility dashboard."""
    page = frontend_dir / 'regional.html'
    return FileResponse(str(page))
@app.get('/ads')
@app.get('/ads.html')
async def serve_ads_dashboard():
    """Serve the Paid Ads Management dashboard."""
    page = frontend_dir / 'ads.html'
    return FileResponse(str(page))
# ═══════════════════════════════════════════════════════════════════════════════
# ADVANCED FEATURES — Keyword Tracking, Alerts, Scheduler, Bulk, Gap, Email
# ═══════════════════════════════════════════════════════════════════════════════
from server.advanced_features import (
save_keyword_snapshot, save_geo_score_snapshot,
get_keyword_trends, get_geo_score_trends,
check_and_create_alerts, get_alerts, mark_alerts_seen,
add_scheduled_crawl, list_scheduled_crawls, delete_scheduled_crawl,
competitor_keyword_gap, bulk_enqueue, send_weekly_report,
start_scheduler, init_advanced_tables
)
# Initialize tracking tables and the background scheduler. Best-effort: the
# app must still boot even if the advanced-features DB or scheduler fails,
# but log the failure instead of silently swallowing it.
try:
    init_advanced_tables()
    start_scheduler()
except Exception as e:
    print(f" advanced_features init failed: {e}")
@app.get('/api/tracking/keywords')
async def api_keyword_trends(url: str, keyword: Optional[str] = None, days: int = 30):
    """Return keyword trend history for *url*.

    keyword: optionally restrict to a single keyword (None = all keywords).
    days: look-back window, in days.
    """
    try:
        return {'ok': True, 'trends': get_keyword_trends(url, keyword, days)}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/tracking/geo-score')
async def api_geo_score_trends(url: str, days: int = 90):
    """Return GEO-score trend history for *url* over the last *days* days."""
    try:
        trend_data = get_geo_score_trends(url, days)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'trends': trend_data}
@app.get('/api/alerts')
async def api_get_alerts(url: Optional[str] = None, unseen_only: bool = False):
    """List stored alerts, optionally filtered by URL and/or unseen status."""
    try:
        alerts = get_alerts(url, unseen_only)
        return {'ok': True, 'alerts': alerts, 'count': len(alerts)}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/alerts/seen')
async def api_mark_alerts_seen(req: dict):
    """Mark the alert ids in ``req['ids']`` as seen."""
    try:
        ids = req.get('ids', [])
        mark_alerts_seen(ids)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True}
class ScheduleRequest(BaseModel):
    """Request body for POST /api/schedule (recurring crawl registration)."""
    url: str                   # site URL to crawl on a schedule (required)
    org_name: str = ''         # organization name for the crawl context
    org_url: str = ''          # organization URL for the crawl context
    max_pages: int = 3         # crawl depth limit per run
    frequency: str = 'weekly'  # recurrence of the scheduled crawl
@app.post('/api/schedule')
async def api_add_schedule(req: ScheduleRequest):
    """Register a recurring crawl and return its schedule id."""
    try:
        new_id = add_scheduled_crawl(req.url, req.org_name, req.org_url,
                                     req.max_pages, req.frequency)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'schedule_id': new_id}
@app.get('/api/schedule')
async def api_list_schedules():
    """List every registered scheduled crawl."""
    try:
        schedules = list_scheduled_crawls()
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'schedules': schedules}
@app.delete('/api/schedule/{schedule_id}')
async def api_delete_schedule(schedule_id: int):
    """Remove the scheduled crawl with the given id."""
    try:
        delete_scheduled_crawl(schedule_id)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True}
class GapRequest(BaseModel):
    """Request body for POST /api/competitor/gap (keyword gap analysis)."""
    your_keywords: list = []  # keywords your site already targets
    competitor_url: str       # competitor site to crawl for comparison (required)
    max_pages: int = 3        # crawl depth limit for the competitor site
@app.post('/api/competitor/gap')
async def api_competitor_gap(req: GapRequest):
    """Compute the keyword gap between your keywords and a competitor's site."""
    try:
        gap = competitor_keyword_gap(req.your_keywords, req.competitor_url, req.max_pages)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'gap': gap}
class BulkRequest(BaseModel):
    """Request body for POST /api/bulk/crawl (batch crawl enqueue)."""
    urls: list          # list of site URLs to enqueue (required)
    org_name: str = ''  # organization name applied to every crawl
    max_pages: int = 2  # crawl depth limit per site
@app.post('/api/bulk/crawl')
async def api_bulk_crawl(req: BulkRequest):
    """Enqueue a crawl job per URL and return the created job ids."""
    try:
        queued = bulk_enqueue(req.urls, req.org_name, req.max_pages)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'job_ids': queued, 'count': len(queued)}
class EmailReportRequest(BaseModel):
    """Request body for POST /api/reports/email."""
    to_email: str  # recipient address for the weekly report
    url: str       # site URL the report covers
@app.post('/api/reports/email')
async def api_send_email_report(req: EmailReportRequest):
    """Email a weekly report for *req.url* to *req.to_email* via SMTP."""
    try:
        sent = send_weekly_report(req.to_email, req.url)
        if not sent:
            # send_weekly_report returns falsy when SMTP is not configured/fails.
            return JSONResponse({'ok': False, 'error': 'فشل الإرسال — أضف SMTP_HOST و SMTP_USER و SMTP_PASS في .env'}, status_code=400)
        return {'ok': True, 'message': f'تم الإرسال إلى {req.to_email}'}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
# Settings are persisted as JSON next to the other generated output files.
_SETTINGS_PATH = Path(os.environ.get('OUTPUT_DIR', Path(__file__).resolve().parent.parent / 'output')) / 'settings.json'

def _load_settings() -> dict:
    """Read settings.json; return {} when it is missing or unparseable."""
    if not _SETTINGS_PATH.exists():
        return {}
    try:
        return json.loads(_SETTINGS_PATH.read_text())
    except Exception:
        # Corrupt/unreadable file — fall back to empty settings.
        return {}

def _save_settings(data: dict):
    """Persist *data* to settings.json (pretty-printed, non-ASCII preserved)."""
    _SETTINGS_PATH.write_text(json.dumps(data, ensure_ascii=False, indent=2))
@app.get('/api/settings')
async def api_get_settings():
    """Return saved settings with sensitive values masked as ``'***'``.

    A value is masked when its key looks secret-bearing: ends with ``_key``
    (any case) or contains ``pass``, ``secret`` or ``token``. The original
    check missed ``secret``/``token`` keys (e.g. ``client_secret``,
    ``refresh_token``), which would have been returned in plain text.
    Masking more keys is safe: POST /api/settings skips ``'***'`` values,
    so masked settings round-trip without being overwritten.
    """
    def _is_secret(key: str) -> bool:
        lk = key.lower()
        return lk.endswith('_key') or 'pass' in lk or 'secret' in lk or 'token' in lk

    s = _load_settings()
    safe = {k: ('***' if (v and _is_secret(k)) else v) for k, v in s.items()}
    return {'ok': True, 'settings': safe}
@app.post('/api/settings')
async def api_save_settings(req: dict):
    """Merge submitted settings into settings.json and the process environment.

    Falsy values and the ``'***'`` mask emitted by GET /api/settings are
    skipped, so masked secrets are never overwritten. Non-string values are
    stringified before export because ``os.environ`` only accepts ``str``
    (JSON bodies may legally carry numbers/booleans, which previously raised
    a TypeError and returned 500).
    """
    try:
        current = _load_settings()
        for k, v in req.items():
            if v and v != '***':
                current[k] = v
                os.environ[k.upper()] = v if isinstance(v, str) else str(v)
        _save_settings(current)
        return {'ok': True}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# On startup, export previously-saved settings into the environment so modules
# that read os.environ pick them up. setdefault keeps real environment
# variables authoritative over saved settings. Values are stringified because
# os.environ only accepts str — previously a non-string value raised inside
# the loop and silently skipped all remaining settings.
try:
    for k, v in _load_settings().items():
        if v and v != '***':
            os.environ.setdefault(k.upper(), v if isinstance(v, str) else str(v))
except Exception:
    pass
# ── Competitor Intelligence Analyzer ─────────────────────────────────────────
class CompetitorIntelRequest(BaseModel):
    """Request body for POST /api/competitor/intelligence."""
    url: str                        # site to analyze competitors for (required)
    region: str = 'Saudi Arabia'    # geographic market to focus the analysis on
    industry: str = ''              # optional industry hint
    count: int = 7                  # number of competitors to analyze
    api_keys: dict = {}             # per-request AI backend keys
@app.post('/api/competitor/intelligence')
async def api_competitor_intelligence(req: CompetitorIntelRequest):
    """Run the competitor intelligence analysis for a site/region/industry."""
    try:
        from server.competitor_intel import analyze_competitors
        intel = analyze_competitors(
            req.url,
            region=req.region,
            industry=req.industry,
            count=req.count,
            api_keys=req.api_keys,
        )
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': intel}
@app.get('/competitor-intel.html')
@app.get('/competitor-intel')
async def serve_competitor_intel():
    """Serve the Competitor Intelligence page under both route aliases."""
    page = frontend_dir / 'competitor-intel.html'
    return FileResponse(str(page))