Spaces:
Running
Running
| """ | |
| GEO Platform — Advanced Features Module | |
| Covers: keyword tracking, scheduled crawls, smart alerts, email reports, | |
| competitor gap analysis, bulk URL analysis. | |
| """ | |
| import os | |
| import json | |
| import sqlite3 | |
| import smtplib | |
| import threading | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| from typing import List, Dict, Optional | |
# Directory for generated artefacts. The OUTPUT_DIR environment variable wins;
# otherwise fall back to "<two directories above this file>/output".
_OUTPUT = Path(os.getenv('OUTPUT_DIR', Path(__file__).resolve().parent.parent / 'output'))
# Created eagerly at import time so the database file below always has a home.
_OUTPUT.mkdir(parents=True, exist_ok=True)
# SQLite database file used by every helper in this module.
DB_PATH = _OUTPUT.joinpath('jobs.db')
| # ── DB helpers ──────────────────────────────────────────────────────────────── | |
def _conn():
    """Open a new SQLite connection to ``DB_PATH``.

    Declared-type detection (``PARSE_DECLTYPES``) is enabled and rows are
    returned as ``sqlite3.Row`` so callers can convert them with ``dict(row)``.
    The caller owns the returned connection and is responsible for closing it.
    """
    connection = sqlite3.connect(
        str(DB_PATH),
        detect_types=sqlite3.PARSE_DECLTYPES,
    )
    connection.row_factory = sqlite3.Row
    return connection
def init_advanced_tables():
    """Create the tables used by this module if they do not exist yet.

    Creates ``keyword_history``, ``geo_score_history``, ``alerts`` and
    ``scheduled_crawls``. Every statement is ``CREATE TABLE IF NOT EXISTS``,
    so the function can be called before every DB operation.
    """
    ddl_statements = (
        # Keyword history
        '''CREATE TABLE IF NOT EXISTS keyword_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            job_id INTEGER,
            url TEXT,
            keyword TEXT,
            count INTEGER,
            volume INTEGER,
            cpc REAL,
            competition TEXT,
            opportunity_score INTEGER,
            tracked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )''',
        # GEO score history
        '''CREATE TABLE IF NOT EXISTS geo_score_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            job_id INTEGER,
            url TEXT,
            score INTEGER,
            status TEXT,
            breakdown TEXT,
            tracked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )''',
        # Smart alerts
        '''CREATE TABLE IF NOT EXISTS alerts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT,
            alert_type TEXT,
            message TEXT,
            severity TEXT,
            seen INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )''',
        # Scheduled crawls
        '''CREATE TABLE IF NOT EXISTS scheduled_crawls (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT,
            org_name TEXT,
            org_url TEXT,
            max_pages INTEGER DEFAULT 3,
            frequency TEXT DEFAULT 'weekly',
            next_run TIMESTAMP,
            last_run TIMESTAMP,
            active INTEGER DEFAULT 1,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )''',
    )
    c = _conn()
    try:
        cur = c.cursor()
        for ddl in ddl_statements:
            cur.execute(ddl)
        c.commit()
    finally:
        # Always release the connection, even when a statement fails.
        c.close()
| # ── Keyword Tracking ────────────────────────────────────────────────────────── | |
def save_keyword_snapshot(job_id: int, url: str, keywords: List[Dict]):
    """Save a keyword data snapshot for trend tracking.

    Args:
        job_id: Identifier of the job that produced the keywords.
        url: URL the keywords belong to.
        keywords: Keyword dicts; the keys read are ``kw``, ``count``,
            ``volume``, ``cpc``, ``competition`` and ``opportunity_score``.
            Only the first 50 entries are stored. ``None`` or an empty list
            stores nothing.
    """
    init_advanced_tables()
    # One timestamp for the whole batch so every row of a snapshot lines up.
    # Naive UTC is kept on purpose: it matches the CURRENT_TIMESTAMP column
    # default, which SQLite also records in UTC.
    now = datetime.utcnow()
    rows = [
        (
            job_id, url,
            kw.get('kw', ''),
            kw.get('count', 0),
            kw.get('volume'),
            kw.get('cpc'),
            kw.get('competition'),
            kw.get('opportunity_score', 0),
            now,
        )
        for kw in (keywords or [])[:50]
    ]
    if not rows:
        return
    c = _conn()
    try:
        c.executemany('''INSERT INTO keyword_history
            (job_id, url, keyword, count, volume, cpc, competition, opportunity_score, tracked_at)
            VALUES (?,?,?,?,?,?,?,?,?)''', rows)
        c.commit()
    finally:
        # Close the connection even if the insert fails.
        c.close()
def save_geo_score_snapshot(job_id: int, url: str, geo_score: Dict):
    """Save one GEO score row for trend tracking.

    Args:
        job_id: Identifier of the job that produced the score.
        url: URL the score belongs to.
        geo_score: Dict with optional ``score`` (default 0), ``status``
            (default ``''``) and ``breakdown`` (default ``{}``) keys. The
            breakdown is stored as a JSON string.
    """
    init_advanced_tables()
    row = (
        job_id, url,
        geo_score.get('score', 0),
        geo_score.get('status', ''),
        # A missing or None breakdown is stored as "{}" rather than "null".
        json.dumps(geo_score.get('breakdown') or {}),
        datetime.utcnow(),
    )
    c = _conn()
    try:
        c.execute('''INSERT INTO geo_score_history
            (job_id, url, score, status, breakdown, tracked_at)
            VALUES (?,?,?,?,?,?)''', row)
        c.commit()
    finally:
        # Close the connection even if the insert fails.
        c.close()
def get_keyword_trends(url: str, keyword: Optional[str] = None, days: int = 30) -> List[Dict]:
    """Return keyword history for a URL over the last ``days`` days.

    Args:
        url: URL whose history is queried.
        keyword: When given (non-empty), return every stored row for that
            keyword, oldest first. Otherwise return one aggregated row per
            keyword (``keyword``, ``volume``, ``opp``, ``snapshots``,
            ``last_seen``), ordered by ``opp`` descending, at most 30 rows.
        days: Size of the look-back window.

    Returns:
        A list of plain dicts, one per result row.
    """
    init_advanced_tables()
    since = datetime.utcnow() - timedelta(days=days)
    c = _conn()
    try:
        cur = c.cursor()
        if keyword:
            cur.execute('''SELECT * FROM keyword_history
                WHERE url=? AND keyword=? AND tracked_at > ?
                ORDER BY tracked_at ASC''', (url, keyword, since))
        else:
            cur.execute('''SELECT keyword, MAX(volume) as volume, MAX(opportunity_score) as opp,
                COUNT(*) as snapshots, MAX(tracked_at) as last_seen
                FROM keyword_history WHERE url=? AND tracked_at > ?
                GROUP BY keyword ORDER BY opp DESC LIMIT 30''', (url, since))
        return [dict(r) for r in cur.fetchall()]
    finally:
        # Close the connection even if the query fails.
        c.close()
def get_geo_score_trends(url: str, days: int = 90) -> List[Dict]:
    """Return GEO score history for a URL, oldest first, for a trend chart.

    Args:
        url: URL whose history is queried.
        days: Size of the look-back window.

    Returns:
        A list of dicts with ``score``, ``status``, ``breakdown`` (the JSON
        string as stored) and ``tracked_at``.
    """
    init_advanced_tables()
    since = datetime.utcnow() - timedelta(days=days)
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute('''SELECT score, status, breakdown, tracked_at FROM geo_score_history
            WHERE url=? AND tracked_at > ? ORDER BY tracked_at ASC''', (url, since))
        return [dict(r) for r in cur.fetchall()]
    finally:
        # Close the connection even if the query fails.
        c.close()
| # ── Smart Alerts ────────────────────────────────────────────────────────────── | |
def _build_geo_alerts(url: str, geo_score: Dict, prev_score: Optional[int]) -> List[Dict]:
    """Evaluate the alert rules for one GEO score; performs no database access."""
    found = []
    score = geo_score.get('score', 0)
    # Score drop alert: only when the score fell by more than 10 points.
    if prev_score is not None and score < prev_score - 10:
        found.append({
            'url': url, 'alert_type': 'score_drop',
            'message': f'انخفضت درجة GEO من {prev_score}% إلى {score}% — تراجع {prev_score - score} نقطة',
            'severity': 'high'
        })
    # Critical score alert
    if score < 30:
        found.append({
            'url': url, 'alert_type': 'critical_score',
            'message': f'درجة GEO حرجة: {score}% — الموقع غير مرئي لمحركات الذكاء الاصطناعي',
            'severity': 'critical'
        })
    # A missing or None breakdown is treated as empty. The per-key default of
    # 20 below means an absent sub-score never triggers its alert.
    breakdown = geo_score.get('breakdown') or {}
    # Missing H1 alert
    if breakdown.get('headings', 20) < 5:
        found.append({
            'url': url, 'alert_type': 'missing_h1',
            'message': 'عناوين H1 مفقودة أو ضعيفة — يؤثر على الفهرسة',
            'severity': 'medium'
        })
    # No AI visibility
    if breakdown.get('ai_visibility', 20) < 5:
        found.append({
            'url': url, 'alert_type': 'no_ai_visibility',
            'message': 'لا يوجد ظهور في محركات الذكاء الاصطناعي — العلامة التجارية غير معروفة',
            'severity': 'high'
        })
    return found


def check_and_create_alerts(job_id: int, url: str, geo_score: Dict, prev_score: Optional[int] = None):
    """Create and persist alerts derived from a GEO score.

    Args:
        job_id: Accepted for interface compatibility; not used by the rules.
        url: URL the score belongs to; copied into every alert.
        geo_score: Dict with optional ``score`` and ``breakdown`` keys; the
            breakdown keys read are ``headings`` and ``ai_visibility``.
        prev_score: Previous score, if known; enables the score-drop rule.

    Returns:
        The list of alert dicts (``url``, ``alert_type``, ``message``,
        ``severity``) that were inserted into the ``alerts`` table. The list
        is empty when no rule fired.
    """
    init_advanced_tables()
    alerts = _build_geo_alerts(url, geo_score, prev_score)
    if alerts:
        c = _conn()
        try:
            c.executemany(
                '''INSERT INTO alerts (url, alert_type, message, severity)
                VALUES (?,?,?,?)''',
                [(a['url'], a['alert_type'], a['message'], a['severity']) for a in alerts],
            )
            c.commit()
        finally:
            # Close the connection even if the insert fails.
            c.close()
    return alerts
def get_alerts(url: Optional[str] = None, unseen_only: bool = False) -> List[Dict]:
    """Return up to 50 alerts, newest first.

    Args:
        url: When given (non-empty), only alerts for this URL are returned.
        unseen_only: When true, only alerts with ``seen=0`` are returned.

    Returns:
        A list of plain dicts, one per alert row.
    """
    init_advanced_tables()
    # "WHERE 1=1" lets every optional filter be appended as " AND ...".
    query = 'SELECT * FROM alerts WHERE 1=1'
    params = []
    if url:
        query += ' AND url=?'
        params.append(url)
    if unseen_only:
        query += ' AND seen=0'
    query += ' ORDER BY created_at DESC LIMIT 50'
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute(query, params)
        return [dict(r) for r in cur.fetchall()]
    finally:
        # Close the connection even if the query fails.
        c.close()
def mark_alerts_seen(alert_ids: List[int]):
    """Mark the given alerts as seen (``seen=1``).

    Args:
        alert_ids: Primary keys of the alerts to update. ``None`` or an empty
            list is a no-op that does not touch the database.
    """
    ids = list(alert_ids or [])
    if not ids:
        return
    init_advanced_tables()
    # Stay well below SQLite's bound-parameter limit (999 in older builds).
    batch_size = 500
    c = _conn()
    try:
        for start in range(0, len(ids), batch_size):
            batch = ids[start:start + batch_size]
            placeholders = ','.join('?' * len(batch))
            c.execute(f'UPDATE alerts SET seen=1 WHERE id IN ({placeholders})', batch)
        c.commit()
    finally:
        # Close the connection even if the update fails.
        c.close()
| # ── Scheduled Crawls ────────────────────────────────────────────────────────── | |
def add_scheduled_crawl(url: str, org_name: str, org_url: str,
                        max_pages: int = 3, frequency: str = 'weekly') -> int:
    """Schedule a recurring crawl and return the new schedule's row id.

    Args:
        url: URL to crawl.
        org_name: Organisation name stored with the schedule.
        org_url: Organisation URL stored with the schedule.
        max_pages: Page limit stored with the schedule.
        frequency: ``daily`` | ``weekly`` | ``monthly``. Any other value is
            stored as given and scheduled with a 7-day interval.

    Returns:
        The ``id`` of the inserted ``scheduled_crawls`` row.
    """
    init_advanced_tables()
    freq_map = {'daily': 1, 'weekly': 7, 'monthly': 30}
    days = freq_map.get(frequency, 7)
    # The first run happens one full interval from now, not immediately.
    next_run = datetime.utcnow() + timedelta(days=days)
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute('''INSERT INTO scheduled_crawls
            (url, org_name, org_url, max_pages, frequency, next_run, active)
            VALUES (?,?,?,?,?,?,1)''', (url, org_name, org_url, max_pages, frequency, next_run))
        sid = cur.lastrowid
        c.commit()
        return sid
    finally:
        # Close the connection even if the insert fails.
        c.close()
def list_scheduled_crawls() -> List[Dict]:
    """Return every scheduled crawl as a dict, ordered by ``next_run`` ascending."""
    init_advanced_tables()
    c = _conn()
    try:
        cur = c.cursor()
        cur.execute('SELECT * FROM scheduled_crawls ORDER BY next_run ASC')
        return [dict(r) for r in cur.fetchall()]
    finally:
        # Close the connection even if the query fails.
        c.close()
def delete_scheduled_crawl(schedule_id: int):
    """Delete the scheduled crawl with the given id; unknown ids are a no-op."""
    # Ensure the table exists so a delete on a fresh database does not fail.
    init_advanced_tables()
    c = _conn()
    try:
        c.execute('DELETE FROM scheduled_crawls WHERE id=?', (schedule_id,))
        c.commit()
    finally:
        # Close the connection even if the delete fails.
        c.close()
def run_due_scheduled_crawls():
    """Enqueue a job for every active scheduled crawl whose ``next_run`` is due.

    Called by the scheduler loop. For each due schedule a job is enqueued via
    ``server.job_queue.enqueue_job`` and the schedule's ``last_run`` and
    ``next_run`` are updated. Each schedule is committed on its own, so a
    failure on one schedule neither blocks the others nor discards the
    bookkeeping of schedules already enqueued.

    Errors are printed, never raised, so the scheduler thread keeps running.
    """
    freq_map = {'daily': 1, 'weekly': 7, 'monthly': 30}
    c = None
    try:
        from server import job_queue
        init_advanced_tables()
        c = _conn()
        cur = c.cursor()
        now = datetime.utcnow()
        cur.execute('SELECT * FROM scheduled_crawls WHERE active=1 AND next_run <= ?', (now,))
        due = [dict(r) for r in cur.fetchall()]
        for s in due:
            try:
                # Enqueue the job
                job_queue.enqueue_job(s['url'], s['org_name'], s['org_url'], s['max_pages'])
                # Update next_run; unknown frequencies fall back to 7 days.
                next_run = now + timedelta(days=freq_map.get(s['frequency'], 7))
                cur.execute('UPDATE scheduled_crawls SET last_run=?, next_run=? WHERE id=?',
                            (now, next_run, s['id']))
                # Commit per schedule so no write transaction stays open while
                # the next job is enqueued.
                # NOTE(review): job_queue presumably writes to the same SQLite
                # file, in which case an open transaction here would block it
                # — confirm.
                c.commit()
            except Exception as e:
                c.rollback()
                print(f'Scheduler error: {e}')
    except Exception as e:
        print(f'Scheduler error: {e}')
    finally:
        if c is not None:
            c.close()
def start_scheduler():
    """Launch the background thread that runs due scheduled crawls.

    The thread is a daemon, so it does not keep the process alive. It calls
    ``run_due_scheduled_crawls`` immediately and then once per hour.
    """
    import time

    poll_interval_seconds = 3600  # check every hour

    def _poll_forever():
        while True:
            run_due_scheduled_crawls()
            time.sleep(poll_interval_seconds)

    worker = threading.Thread(target=_poll_forever, daemon=True)
    worker.start()
| # ── Competitor Gap Analysis ─────────────────────────────────────────────────── | |
def competitor_keyword_gap(your_keywords: List[str], competitor_url: str,
                           max_pages: int = 3) -> Dict:
    """Find keywords a competitor has that you do not.

    Crawls ``competitor_url`` with ``src.crawler.crawl_seed``, extracts up to
    50 keywords with ``server.keyword_engine.extract_keywords_from_audit`` and
    compares them with ``your_keywords``. Both sides are stripped and
    lower-cased first; empty or missing keywords are ignored.

    Args:
        your_keywords: Your own keywords; ``None`` is treated as empty.
        competitor_url: Seed URL of the competitor site.
        max_pages: Page limit passed to the crawler.

    Returns:
        On success a dict with ``competitor_url``, ``competitor_keywords``,
        ``your_keywords`` (both counts), ``gaps`` (first 30, sorted),
        ``overlaps`` (first 20, sorted), ``gap_count`` and
        ``opportunity_score`` (share of competitor keywords you lack, 0-100).
        On any failure ``{'error': <message>, 'gaps': [], 'gap_count': 0}``.
    """
    def _clean(value) -> str:
        # Normalise one keyword; None and other falsy values become ''.
        return str(value).strip().lower() if value else ''

    try:
        from src.crawler import crawl_seed
        from server.keyword_engine import extract_keywords_from_audit
        pages = crawl_seed(competitor_url, max_pages=max_pages)
        audit = {'pages': pages}
        comp_kws = extract_keywords_from_audit(audit, top_n=50, enrich=False)
        # Drop '' so entries without a keyword are not counted as a gap.
        comp_set = {_clean(k.get('kw')) for k in comp_kws} - {''}
        your_set = {_clean(k) for k in (your_keywords or [])} - {''}
        gaps = comp_set - your_set
        overlaps = comp_set & your_set
        return {
            'competitor_url': competitor_url,
            'competitor_keywords': len(comp_set),
            'your_keywords': len(your_set),
            'gaps': sorted(gaps)[:30],
            'overlaps': sorted(overlaps)[:20],
            'gap_count': len(gaps),
            # max(..., 1) avoids division by zero when nothing was extracted.
            'opportunity_score': min(100, int((len(gaps) / max(len(comp_set), 1)) * 100))
        }
    except Exception as e:
        # Best-effort API: callers get an error payload instead of an exception.
        return {'error': str(e), 'gaps': [], 'gap_count': 0}
| # ── Bulk URL Analysis ───────────────────────────────────────────────────────── | |
def bulk_enqueue(urls: List[str], org_name: str = '', max_pages: int = 2) -> List[int]:
    """Enqueue multiple URLs as separate jobs.

    Blank entries are dropped first, then at most 10 URLs are enqueued, so
    blank lines in the input no longer use up the 10-job budget.

    Args:
        urls: Candidate URLs; surrounding whitespace is stripped. ``None`` is
            treated as empty.
        org_name: Organisation name for every job; falls back to the URL
            itself when empty.
        max_pages: Page limit passed to each job.

    Returns:
        The job ids returned by ``job_queue.enqueue_job``, in input order.
    """
    max_batch = 10  # max 10 at once
    cleaned = [u.strip() for u in (urls or []) if u and u.strip()][:max_batch]
    if not cleaned:
        return []
    # Imported only once there is work to do.
    from server import job_queue
    return [
        job_queue.enqueue_job(url, org_name or url, url, max_pages)
        for url in cleaned
    ]
| # ── Email Reports ───────────────────────────────────────────────────────────── | |
def send_email_report(to_email: str, subject: str, html_body: str,
                      smtp_host: str = None, smtp_port: int = 587,
                      smtp_user: str = None, smtp_pass: str = None) -> bool:
    """Send an HTML email report via SMTP with STARTTLS.

    Args:
        to_email: Recipient address; an empty value returns ``False``.
        subject: Message subject.
        html_body: HTML content, sent as UTF-8.
        smtp_host: SMTP server; falls back to the ``SMTP_HOST`` env variable.
        smtp_port: SMTP port.
        smtp_user: Login and sender address; falls back to ``SMTP_USER``.
        smtp_pass: Login password; falls back to ``SMTP_PASS``.

    Returns:
        ``True`` when the message was handed to the server. ``False`` when the
        recipient, host or user is missing, or when sending failed (the error
        is printed, not raised).
    """
    smtp_host = smtp_host or os.getenv('SMTP_HOST', '')
    smtp_user = smtp_user or os.getenv('SMTP_USER', '')
    smtp_pass = smtp_pass or os.getenv('SMTP_PASS', '')
    # Reject incomplete requests before opening any network connection.
    if not to_email or not smtp_host or not smtp_user:
        return False
    try:
        msg = MIMEMultipart('alternative')
        msg['Subject'] = subject
        msg['From'] = smtp_user
        msg['To'] = to_email
        msg.attach(MIMEText(html_body, 'html', 'utf-8'))
        # The timeout keeps an unreachable server from blocking the caller forever.
        with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
            # NOTE(review): starttls() without an SSL context does not verify
            # the server certificate; consider passing
            # ssl.create_default_context() once the target servers are known
            # to present valid certificates.
            server.starttls()
            server.login(smtp_user, smtp_pass)
            server.sendmail(smtp_user, to_email, msg.as_string())
        return True
    except Exception as e:
        # Best-effort API: report the failure through the return value.
        print(f'Email error: {e}')
        return False
def send_weekly_report(to_email: str, url: str):
    """Build and email the weekly GEO report for ``url``.

    Looks through up to 100 jobs from ``job_queue.list_jobs`` for the first
    completed job with a matching URL, renders its result with
    ``build_html_report`` and sends it with ``send_email_report``.

    Returns:
        ``False`` when no completed job with a ``result_path`` is found;
        otherwise the result of ``send_email_report``.
    """
    from server.reports import build_html_report
    from server import job_queue

    # First match in list_jobs order.
    # NOTE(review): presumably newest first, which would make this the latest
    # completed job — confirm against job_queue.
    match = None
    for candidate in job_queue.list_jobs(limit=100):
        if candidate.get('url') == url and candidate.get('status') == 'completed':
            match = candidate
            break

    if not match or not match.get('result_path'):
        return False

    report_html = build_html_report(match['result_path'])
    return send_email_report(to_email, f'تقرير GEO الأسبوعي — {url}', report_html)