# geo-platform/server/keyword_engine.py
# 3v324v23's picture
# initial: geo-platform full stack
# 5c429d4
import re
from pathlib import Path
from html import unescape
# Optional dependency: when the DataForSEO client module cannot be imported,
# install a stand-in that returns placeholder records with no metrics.
try:
    from .dataforseo_client import enrich_keywords
except ImportError:
    def enrich_keywords(kws, **kwargs):
        """Return one record per keyword with ``volume`` and ``cpc`` unset.

        Extra keyword arguments are accepted and ignored.
        """
        placeholders = []
        for keyword in kws:
            placeholders.append({'kw': keyword, 'volume': None, 'cpc': None})
        return placeholders
# Optional dependency: when the analytics module cannot be imported, install
# minimal stand-ins so keyword extraction keeps working.
try:
    from .keyword_analytics import analyze_keywords, format_analytics_report
except ImportError:
    def analyze_keywords(kws, **kwargs):
        """Return a minimal analytics report built from ``kws`` unchanged.

        The report carries every key that ``extract_keywords_from_audit``
        reads in this file (``all_keywords``, ``top_keywords``,
        ``classification`` with its three categories, and ``clusters``), so
        the analytics + enrichment path no longer raises ``KeyError`` when
        the analytics module is unavailable. Extra keyword arguments are
        accepted and ignored.
        """
        return {
            'summary': {},
            'top_keywords': kws,
            'all_keywords': kws,
            'classification': {'primary': [], 'secondary': [], 'long_tail': []},
            'clusters': {},
        }

    def format_analytics_report(analytics):
        """Return the plain ``str()`` rendering of the analytics report."""
        return str(analytics)
# Optional spaCy pipeline. ``_nlp`` stays None when spaCy cannot be imported
# or when neither candidate model can be loaded; the models are tried in order.
_nlp = None
try:
    import spacy
    for _model_name in ('en_core_web_sm', 'xx_ent_wiki_sm'):
        try:
            _nlp = spacy.load(_model_name)
        except Exception:
            _nlp = None
        else:
            break
except Exception:
    _nlp = None
# Lower-case tokens that are never reported as keywords: common English
# function words, URL fragments, and site-navigation/commerce boilerplate.
_STOPWORDS = {
    'the', 'and', 'for', 'with', 'from', 'this', 'that', 'are', 'you', 'your',
    'www', 'http', 'https', 'com', 'org', 'net', 'page', 'pages',
    'about', 'more', 'shop', 'home', 'contact', 'search', 'menu', 'cart',
    'login', 'sign', 'account', 'view', 'add', 'buy', 'price',
}
def _clean_html(text):
    """Return ``text`` with markup removed and HTML entities decoded.

    Whole ``<script>`` and ``<style>`` elements (including their contents)
    are replaced by a space first, then every remaining tag is replaced by a
    space, and finally entities such as ``&amp;`` are decoded.
    """
    element_patterns = (
        r'<script[^>]*>.*?</script>',
        r'<style[^>]*>.*?</style>',
    )
    # DOTALL lets an element body span several lines; IGNORECASE covers
    # upper-case tag names.
    element_flags = re.DOTALL | re.IGNORECASE
    for pattern in element_patterns:
        text = re.sub(pattern, ' ', text, flags=element_flags)
    without_tags = re.sub(r'<[^>]+>', ' ', text)
    return unescape(without_tags)
def extract_keywords_from_audit(audit_obj, top_n=20, enrich=True, analytics=False, expected_keywords=None):
    """Extract candidate keywords from an audit object.

    Uses spaCy noun-chunk and entity extraction when a pipeline is loaded
    (``_nlp``); if that is unavailable or fails, falls back to regex
    tokenization with frequency counts.

    Args:
        audit_obj: Audit dictionary with a ``pages`` list. Anything that is
            not a dict is treated as having no pages; page entries that are
            not dicts are skipped.
        top_n: Number of keywords to return in the non-analytics mode.
        enrich: Whether to merge the records returned by ``enrich_keywords``
            (DataForSEO volume/CPC data) into the keyword dicts.
        analytics: Whether to return the full ``analyze_keywords`` report
            instead of the plain keyword list.
        expected_keywords: List of expected keywords, forwarded to
            ``analyze_keywords`` for coverage analysis.

    Returns:
        ``[]`` when the audit contains no text. Otherwise, with
        ``analytics=False``, a list of at most ``top_n`` dicts
        ``{'kw': ..., 'count': ...}`` ordered by count descending (plus any
        enrichment fields); with ``analytics=True``, the report produced by
        ``analyze_keywords``, enriched in place when ``enrich`` is true.
    """
    pages = audit_obj.get('pages') if isinstance(audit_obj, dict) else None
    if not isinstance(pages, (list, tuple)):
        pages = []
    combined = '\n'.join(_page_text(p) for p in pages if isinstance(p, dict))
    if not combined.strip():
        return []

    results = None
    total_words = 0
    if _nlp is not None:
        # Only the spaCy extraction itself is best-effort. Errors raised
        # later by analytics or enrichment are no longer swallowed here and
        # silently retried through the regex path.
        try:
            results = _spacy_keyword_results(combined)
        except Exception:
            import logging
            logging.getLogger(__name__).debug(
                "spaCy keyword extraction failed; using regex fallback", exc_info=True
            )
            results = None
        else:
            total_words = len(combined.split())

    if results is None:
        # Keep twice top_n candidates so the analytics mode has more to work with.
        results, total_words = _regex_keyword_results(combined, top_n * 2)

    if analytics:
        report = analyze_keywords(results, total_words=total_words, expected_keywords=expected_keywords)
        if enrich:
            _enrich_analytics_report(report)
        return report

    results = results[:top_n]
    # Skip the enrichment call entirely when there is nothing to enrich.
    if enrich and results:
        _merge_enrichment(results, _enrichment_map(results))
    return results


def _join_text_items(items):
    """Join a headings/paragraphs field into one string, ignoring missing values."""
    if items is None:
        return ''
    if isinstance(items, (list, tuple)):
        return ' '.join(
            str(item.get('text') or '') if isinstance(item, dict) else str(item)
            for item in items
            if item is not None
        )
    return str(items)


def _page_text(page):
    """Build the text blob for one audit page from its title, headings, paragraphs and body."""
    # ``or ''`` instead of a .get() default: a key present with value None
    # would otherwise be formatted as the literal word "None".
    title = str(page.get('title') or '')
    headings = _join_text_items(page.get('headings'))
    paras = _join_text_items(page.get('paragraphs'))
    raw = page.get('text') or page.get('content') or page.get('html') or ''
    cleaned = _clean_html(raw if isinstance(raw, str) else str(raw))
    # The title is included twice, so each of its terms is counted twice.
    return f"{title} {title} {headings} {paras} {cleaned}"


def _ranked_results(counts):
    """Convert a {keyword: count} mapping into result dicts sorted by count descending."""
    ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    return [{'kw': kw, 'count': count} for kw, count in ranked]


def _spacy_keyword_results(text):
    """Count noun chunks (stop tokens removed) and named entities found by spaCy."""
    doc = _nlp(text[:100000])  # limit to avoid memory issues
    candidates = [
        ' '.join(tok.text for tok in chunk if not tok.is_stop).strip().lower()
        for chunk in doc.noun_chunks
    ]
    candidates.extend(ent.text.strip().lower() for ent in doc.ents)
    counts = {}
    for candidate in candidates:
        if len(candidate) < 3 or candidate in _STOPWORDS:
            continue
        counts[candidate] = counts.get(candidate, 0) + 1
    return _ranked_results(counts)


def _regex_keyword_results(text, limit):
    """Count words and 2-3 word phrases with regexes; return (results[:limit], word total)."""
    lowered = text.lower()
    # The explicit Arabic range is kept from the original pattern.
    words = re.findall(r"[\w\u0600-\u06FF]{3,}", lowered)
    phrases = re.findall(r"\b([\w\u0600-\u06FF]+(?:\s+[\w\u0600-\u06FF]+){1,2})\b", lowered)
    counts = {}
    for word in words:
        if word not in _STOPWORDS:
            counts[word] = counts.get(word, 0) + 1
    for phrase in phrases:
        tokens = phrase.split()
        # Collapse newlines / repeated whitespace so the same phrase is
        # counted under a single key.
        phrase = ' '.join(tokens)
        # Compare whole tokens against the stopword list: a substring test
        # would reject e.g. "android phone" because it contains "and".
        if len(phrase) > 5 and not any(tok in _STOPWORDS for tok in tokens):
            counts[phrase] = counts.get(phrase, 0) + 2  # boost phrases
    return _ranked_results(counts)[:limit], len(words)


def _enrichment_map(rows):
    """Call ``enrich_keywords`` for ``rows`` and index the returned records by keyword."""
    enriched = enrich_keywords([row['kw'] for row in rows]) or []
    return {entry['kw']: entry for entry in enriched}


def _merge_enrichment(rows, enriched_map):
    """Update each keyword dict in ``rows`` in place with its enrichment record, if any."""
    for row in rows:
        extra = enriched_map.get(row['kw'])
        if extra is not None:
            row.update(extra)


def _enrich_analytics_report(report):
    """Enrich the first 50 ``all_keywords`` and copy the data into every keyword list of the report."""
    all_kws = report.get('all_keywords') or []
    if not all_kws:
        return
    enriched_map = _enrichment_map(all_kws[:50])
    _merge_enrichment(all_kws, enriched_map)
    _merge_enrichment(report.get('top_keywords') or [], enriched_map)
    classification = report.get('classification') or {}
    for category in ('primary', 'secondary', 'long_tail'):
        _merge_enrichment(classification.get(category) or [], enriched_map)
    for cluster_kws in (report.get('clusters') or {}).values():
        _merge_enrichment(cluster_kws, enriched_map)