| import os |
| import random |
| import time |
| from typing import List, Dict |
|
|
| from flask import Flask, jsonify, request, render_template |
| from flask_cors import CORS |
|
|
| import google.generativeai as genai |
| from transformers import pipeline |
|
|
| |
| |
| |
| app = Flask(__name__, static_folder="static", template_folder="templates") |
| CORS(app) |
|
|
| |
| |
| |
| GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY") |
| if GOOGLE_API_KEY: |
| genai.configure(api_key=GOOGLE_API_KEY) |
|
|
| |
| MAX_POSTS = 50 |
| DEFAULT_POSTS = 20 |
|
|
| |
| |
| |
| |
| SENTIMENT_MODEL = "distilbert/distilbert-base-uncased-finetuned-sst-2-english" |
| sentiment_analyzer = pipeline( |
| "sentiment-analysis", |
| model=SENTIMENT_MODEL, |
| device=-1 |
| ) |
|
|
| |
| |
| |
| def normalize_count(n: int) -> int: |
| try: |
| n = int(n) |
| except Exception: |
| n = DEFAULT_POSTS |
| n = max(1, min(MAX_POSTS, n)) |
| return n |
|
|
| def parse_sentiment(label: str, score: float) -> Dict[str, str]: |
| |
| if label.upper() == "POSITIVE": |
| sentiment = "POSITIVE" |
| elif label.upper() == "NEGATIVE": |
| sentiment = "NEGATIVE" |
| else: |
| sentiment = "NEUTRAL" |
| return {"sentiment": sentiment, "score": float(score)} |
|
|
| def compute_aggregate(rows: List[Dict]) -> Dict: |
| pos = sum(1 for r in rows if r["sentiment"] == "POSITIVE") |
| neg = sum(1 for r in rows if r["sentiment"] == "NEGATIVE") |
| neu = sum(1 for r in rows if r["sentiment"] == "NEUTRAL") |
|
|
| total = max(1, len(rows)) |
| pos_pct = round(100 * pos / total, 2) |
| neg_pct = round(100 * neg / total, 2) |
| neu_pct = round(100 * neu / total, 2) |
|
|
| |
| rolling = [] |
| score_map = {"POSITIVE": 1.0, "NEUTRAL": 0.5, "NEGATIVE": 0.0} |
| alpha = 0.2 |
| ema = 0.5 |
| for r in rows: |
| ema = alpha * score_map[r["sentiment"]] + (1 - alpha) * ema |
| rolling.append(round(ema, 3)) |
|
|
| return { |
| "counts": {"positive": pos, "negative": neg, "neutral": neu, "total": total}, |
| "percent": {"positive": pos_pct, "negative": neg_pct, "neutral": neu_pct}, |
| "rolling": rolling, |
| } |
|
|
| |
| |
| |
| FALLBACK_PATTERNS_POS = [ |
| "Absolutely loving {tag} right now! 🔥", |
| "{tag} campaign is the best thing this season 🎉", |
| "I love {tag}! It's amazing ❤️", |
| "People are talking about {tag} everywhere 🌍", |
| "Super excited about {tag} 🙌", |
| ] |
| FALLBACK_PATTERNS_NEG = [ |
| "{tag} totally failed expectations 😠", |
| "I'm disappointed with {tag} 💔", |
| "{tag} needs serious improvements…", |
| "Not impressed by {tag} this time 😕", |
| ] |
| FALLBACK_PATTERNS_NEU = [ |
| "People are discussing {tag} a lot 🤔", |
| "Not sure how I feel about {tag} yet…", |
| "{tag} is trending — thoughts?", |
| "Mixed opinions around {tag}.", |
| ] |
|
|
| def make_fallback_posts(hashtag: str, n: int) -> List[str]: |
| tag = hashtag if hashtag.startswith("#") else f"#{hashtag}" |
| posts = [] |
| for _ in range(n): |
| bucket = random.choices( |
| [FALLBACK_PATTERNS_POS, FALLBACK_PATTERNS_NEU, FALLBACK_PATTERNS_NEG], |
| weights=[0.4, 0.35, 0.25], |
| k=1 |
| )[0] |
| txt = random.choice(bucket).format(tag=tag) |
| posts.append(txt) |
| return posts |
|
|
| |
| |
| |
| def generate_with_gemini(hashtag: str, n: int) -> List[str]: |
| """ |
| Generate up to n short social posts using Gemini 2.0 Flash. |
| Returns list of strings. If API missing or error occurs, raises Exception. |
| """ |
| if not GOOGLE_API_KEY: |
| raise RuntimeError("GOOGLE_API_KEY not set") |
|
|
| model = genai.GenerativeModel("gemini-2.0-flash") |
| tag = hashtag if hashtag.startswith("#") else f"#{hashtag}" |
|
|
| prompt = f""" |
| You are generating short, natural social posts (Twitter/Instagram style) about the topic {tag}. |
| Rules: |
| - Return exactly {n} posts. |
| - One post per line. |
| - Each post under 120 characters. |
| - Use a mix of positive, neutral, and critical tones. |
| - Avoid any hate speech, harassment, or slurs. |
| - Do NOT include numbering like "1." or "-". |
| - Do NOT wrap in code blocks. |
| - Language: English. |
| |
| Output format: |
| <post 1> |
| <post 2> |
| ... |
| <post {n}> |
| """ |
|
|
| |
| tries = 2 |
| for i in range(tries): |
| try: |
| r = model.generate_content(prompt) |
| text = (r.text or "").strip() |
| if not text: |
| raise RuntimeError("Empty response from Gemini") |
|
|
| lines = [ln.strip() for ln in text.split("\n") if ln.strip()] |
| |
| if len(lines) < n: |
| |
| lines += make_fallback_posts(hashtag, n - len(lines)) |
| posts = lines[:n] |
| return posts |
| except Exception as e: |
| if i == tries - 1: |
| raise |
| time.sleep(0.8) |
|
|
| |
| |
| |
| |
| |
| @app.route("/api/analyze", methods=["POST"]) |
| def analyze(): |
| data = request.get_json(silent=True) or {} |
| hashtag = (data.get("hashtag") or "").strip() |
| count = normalize_count(data.get("count") or DEFAULT_POSTS) |
|
|
| if not hashtag: |
| return jsonify({"error": "hashtag is required"}), 400 |
|
|
| posts: List[Dict] = [] |
| gemini_count = 0 |
| fallback_count = 0 |
|
|
| |
| try: |
| gemini_posts = generate_with_gemini(hashtag, count) |
| for p in gemini_posts: |
| posts.append({"text": p, "source": "gemini"}) |
| gemini_count = len(gemini_posts) |
| except Exception: |
| fb = make_fallback_posts(hashtag, count) |
| for p in fb: |
| posts.append({"text": p, "source": "fallback"}) |
| fallback_count = len(fb) |
|
|
| |
| rows = [] |
| for p in posts: |
| res = sentiment_analyzer(p["text"])[0] |
| parsed = parse_sentiment(res["label"], res["score"]) |
| rows.append({ |
| "text": p["text"], |
| "source": p["source"], |
| "sentiment": parsed["sentiment"], |
| "score": parsed["score"], |
| }) |
|
|
| agg = compute_aggregate(rows) |
|
|
| return jsonify({ |
| "meta": { |
| "hashtag": hashtag if hashtag.startswith("#") else f"#{hashtag}", |
| "requested": count, |
| "generated_by": { |
| "gemini": gemini_count, |
| "fallback": fallback_count |
| }, |
| "model": { |
| "generation": "gemini-2.0-flash" if gemini_count > 0 else "fallback-templates", |
| "sentiment": SENTIMENT_MODEL |
| } |
| }, |
| "rows": rows, |
| "aggregate": agg |
| }), 200 |
|
|
| |
| |
| |
| @app.route("/", methods=["GET"]) |
| def home(): |
| return render_template("index.html") |
|
|
| |
| |
| |
| if __name__ == "__main__": |
| port = int(os.getenv("PORT", "7860")) |
| app.run(host="0.0.0.0", port=port, debug=False) |
|
|