| | from typing import Any, List, Dict |
| | import uuid |
| | import time |
| | from datetime import datetime |
| | from fastapi import APIRouter, HTTPException, status, Request |
| |
|
| | import asyncio |
| | from motor.motor_asyncio import AsyncIOMotorClient |
| | from bson import ObjectId |
| |
|
| | import pandas as pd |
| | from trendspy import Trends |
| |
|
| | from app.core.config import settings |
| | from app.core.clients import qstash_client |
| | from app.core.exceptions import QuotaExceededError |
| | from app.schemas.analysis_schema import ( |
| | WeeklyTrendListResponse, |
| | TrendDetailResponseSchema, |
| | OnDemandRequestSchema, |
| | OnDemandResponseSchema, |
| | JobStatusResponseSchema, |
| | ) |
| | from app.services.sentiment_service import SentimentService |
| | from app.services.youtube_service import YouTubeService |
| |
|
| | |
| | |
# Router that carries every endpoint defined in this module; the URL prefix
# is read from settings.
router = APIRouter(prefix=settings.API_PREFIX_TRENDS)

# Shared async MongoDB client and database handle used by the handlers in
# this module.
client = AsyncIOMotorClient(settings.MONGODB_CONNECTION_STRING)
db = client[settings.DB_NAME]

# Service objects are created once, at import time, and reused across requests.
print("Initializing services...")
# NOTE(review): request_delay is presumably in seconds — confirm against trendspy.
tr = Trends(request_delay=2.0)
yt_service = YouTubeService(api_key=settings.YT_API_KEY)
sentiment_service = SentimentService()
| |
|
| |
|
async def fetch_repr_comments(entity_id):
    """Return the newest stored comments for an entity, grouped by sentiment.

    Looks up every ``sources_youtube`` document attached to ``entity_id`` and
    then, for each sentiment label, fetches up to
    ``settings.REPRESENTATIVE_COMMENTS_LIMIT`` comments ordered by
    ``publish_date`` descending. The three queries are awaited together.

    Returns:
        A dict with the keys ``"positive"``, ``"neutral"`` and ``"negative"``,
        each mapping to a list of ``{"text", "author", "publish_date"}``
        dicts. ``publish_date`` values exposing ``isoformat`` are converted
        to ISO strings. All three lists are empty when the entity has no
        sources.
    """
    labels = ("positive", "neutral", "negative")

    linked_sources = await db.sources_youtube.find(
        {"entity_id": entity_id}
    ).to_list(length=None)
    linked_ids = [source["_id"] for source in linked_sources]
    if not linked_ids:
        return {label: [] for label in labels}

    per_label_cap = settings.REPRESENTATIVE_COMMENTS_LIMIT

    def newest_for(label):
        """Build the awaitable that loads the latest comments for one label."""
        cursor = db.comments_youtube.find(
            {"source_id": {"$in": linked_ids}, "sentiment": label},
            {"text": 1, "author": 1, "publish_date": 1, "_id": 0},
        )
        return (
            cursor.sort("publish_date", -1)
            .limit(per_label_cap)
            .to_list(length=per_label_cap)
        )

    grouped = await asyncio.gather(*(newest_for(label) for label in labels))

    # Make the timestamps JSON-friendly in place.
    for bucket in grouped:
        for entry in bucket:
            stamp = entry.get("publish_date")
            if hasattr(stamp, "isoformat"):
                entry["publish_date"] = stamp.isoformat()

    return dict(zip(labels, grouped))
| |
|
| |
|
async def _get_full_entity_details(
    entity_id: ObjectId, analysis_type: str
) -> Dict[str, Any] | None:
    """Assemble the detail payload for one entity and analysis type.

    The latest matching ``analysis_results`` document (joined with its
    ``entities`` row) and the representative comments are loaded
    concurrently. If the stored document has no interest-over-time data, it
    is then fetched live through trendspy in a worker thread and written
    back to the analysis document as a cache.

    Args:
        entity_id: ``_id`` of the entity to describe.
        analysis_type: Value matched against ``analysis_type`` in
            ``analysis_results``.

    Returns:
        The projected analysis fields plus ``representative_comments``, or
        ``None`` when no analysis document matches. ``interest_over_time`` is
        always a list (possibly empty) whenever the live fetch was attempted.
    """

    async def fetch_main_data_task():
        """Fetch the newest analysis document joined with its entity."""
        pipeline = [
            {"$match": {"entity_id": entity_id, "analysis_type": analysis_type}},
            {"$sort": {"created_at": -1}},
            {"$limit": 1},
            {
                "$lookup": {
                    "from": "entities",
                    "localField": "entity_id",
                    "foreignField": "_id",
                    "as": "entity_info",
                }
            },
            {"$unwind": "$entity_info"},
            {
                "$project": {
                    "analysis_result_id": "$_id",
                    "_id": {"$toString": "$entity_info._id"},
                    "keyword": "$entity_info.keyword",
                    "thumbnail_url": "$entity_info.thumbnail_url",
                    "representative_video_url": "$entity_info.video_url",
                    "analysis": "$results",
                    "interest_over_time": "$interest_over_time",
                }
            },
        ]
        results = await db.analysis_results.aggregate(pipeline).to_list(length=1)
        return results[0] if results else None

    def blocking_interest_fetch(keyword: str):
        """Synchronous wrapper for the blocking trendspy call."""
        df = tr.interest_over_time(keywords=[keyword], timeframe="now 7-d")
        if df.empty:
            return []
        # Days without any sample average to NaN; drop them so the integer
        # cast cannot fail and throw away the rest of the series.
        daily = df[keyword].resample("D").mean().dropna().round(0).astype(int)
        return [
            {"date": day.strftime("%Y-%m-%d"), "value": int(value)}
            for day, value in daily.items()
        ]

    main_data, rep_comments = await asyncio.gather(
        fetch_main_data_task(), fetch_repr_comments(entity_id)
    )

    if not main_data:
        return None

    if not main_data.get("interest_over_time"):
        print(
            f"Interest data not found in DB for '{main_data['keyword']}'. Fetching live..."
        )
        try:
            interest_data_to_cache = await asyncio.to_thread(
                blocking_interest_fetch, main_data["keyword"]
            )
            if interest_data_to_cache:
                await db.analysis_results.update_one(
                    {"_id": main_data["analysis_result_id"]},
                    {"$set": {"interest_over_time": interest_data_to_cache}},
                )
                print(
                    f"Successfully cached interest data for '{main_data['keyword']}'."
                )
            # Always leave a list behind, even when the live fetch came back
            # empty, so the payload shape matches the error path below.
            main_data["interest_over_time"] = interest_data_to_cache
        except Exception as e:
            # Best effort: interest data is optional, so degrade to an empty
            # series instead of failing the whole request.
            print(f"Could not fetch live interest data: {e}")
            main_data["interest_over_time"] = []

    # Internal bookkeeping id; not part of the response payload.
    main_data.pop("analysis_result_id", None)
    return {**main_data, "representative_comments": rep_comments}
| |
|
| |
|
@router.get("/weekly", response_model=WeeklyTrendListResponse)
async def get_weekly_trends():
    """
    Return the most recent weekly sentiment summaries.

    Reads the newest 'weekly' documents from 'analysis_results' (capped at
    settings.HOME_PAGE_ENTITIES_LIMIT), joins each with its 'entities' row and
    returns the keyword, thumbnail and sentiment counters under "data".

    Any failure, including an empty result set, is logged and reported as a
    500 response.
    """
    try:
        # Newest weekly analyses first, limited to the home-page quota.
        pick_latest_weekly = [
            {"$match": {"analysis_type": "weekly"}},
            {"$sort": {"created_at": -1}},
            {"$limit": settings.HOME_PAGE_ENTITIES_LIMIT},
        ]
        # Attach the owning entity; rows without one are dropped by $unwind.
        attach_entity = [
            {
                "$lookup": {
                    "from": "entities",
                    "localField": "entity_id",
                    "foreignField": "_id",
                    "as": "entity_info",
                }
            },
            {"$unwind": "$entity_info"},
        ]
        # Reduce every row to the fields exposed by the response.
        shape_row = {
            "$project": {
                "_id": {"$toString": "$entity_info._id"},
                "keyword": "$entity_info.keyword",
                "thumbnail_url": "$entity_info.thumbnail_url",
                "analysis": {
                    "positive_count": "$results.positive_count",
                    "negative_count": "$results.negative_count",
                    "neutral_count": "$results.neutral_count",
                    "total_comments": "$results.total_comments",
                },
            }
        }

        cursor = db.analysis_results.aggregate(
            [*pick_latest_weekly, *attach_entity, shape_row]
        )
        rows = await cursor.to_list(length=None)
        if rows:
            return {"data": rows}
        raise HTTPException(status_code=500, detail="Internal server error")

    except Exception as e:
        print(f"An error occurred: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
| |
|
| |
|
@router.get("/{analysis_type}/{entity_id}", response_model=TrendDetailResponseSchema)
async def get_trend_detail_by_type(analysis_type: str, entity_id: str):
    """
    Retrieves detailed information for a single entity, specifying
    whether to fetch the 'weekly' or 'on-demand' analysis result.

    Raises:
        HTTPException: 400 when ``analysis_type`` is not 'weekly' or
            'on-demand', or when ``entity_id`` is not a valid ObjectId;
            404 when no matching analysis exists for the entity.
    """
    if analysis_type not in ("weekly", "on-demand"):
        raise HTTPException(
            status_code=400,
            detail="Invalid analysis type. Must be 'weekly' or 'on-demand'.",
        )

    # Validate explicitly instead of wrapping the constructor in a broad
    # ``except Exception`` that could hide unrelated failures.
    if not ObjectId.is_valid(entity_id):
        raise HTTPException(status_code=400, detail="Invalid entity ID format.")

    full_details = await _get_full_entity_details(ObjectId(entity_id), analysis_type)

    if not full_details:
        raise HTTPException(
            status_code=404,
            detail=f"'{analysis_type}' analysis for this entity not found.",
        )

    return full_details
| |
|
| |
|
@router.post(
    "/analysis/on-demand",
    status_code=status.HTTP_202_ACCEPTED,
    response_model=OnDemandResponseSchema,
)
async def create_on_demand_analysis(request_data: OnDemandRequestSchema):
    """
    Handles an on-demand analysis request.

    The keyword is lower-cased and stripped. If an entity with that keyword
    already has a 'weekly' analysis (no recency filter is applied), a 'found'
    status with the entity_id is returned for immediate redirection.
    Otherwise a job document is stored in 'on_demand_jobs', a message is
    published to QStash targeting the process-job webhook, and a 'queued'
    status with the job_id is returned.

    Raises:
        HTTPException: 400 when the keyword is empty or whitespace only;
            500 when publishing to QStash fails (the job is marked 'failed').
    """
    if not request_data.keyword or not request_data.keyword.strip():
        raise HTTPException(status_code=400, detail="Keyword cannot be empty.")

    keyword = request_data.keyword.lower().strip()

    entity = await db.entities.find_one({"keyword": keyword})
    if entity:
        analysis = await db.analysis_results.find_one(
            {"entity_id": entity["_id"], "analysis_type": "weekly"}
        )
        if analysis:
            print(
                f"Found existing weekly analysis for '{keyword}'. Returning redirect info."
            )
            return {"status": "found", "entity_id": str(entity["_id"])}

    print(f"No weekly analysis found for '{keyword}'. Queuing a new job.")

    job_id = str(uuid.uuid4())

    # One timestamp for both fields so a fresh job has created_at == updated_at.
    now = datetime.now()
    job_document = {
        "_id": job_id,
        "keyword": keyword,
        "status": "pending",
        "created_at": now,
        "updated_at": now,
        "result_id": None,
    }
    await db.on_demand_jobs.insert_one(job_document)

    callback_url = f"{settings.BASE_URL}{settings.API_PREFIX}{settings.API_VERSION}{settings.API_PREFIX_TRENDS}/analysis/process-job"

    print(
        f"Queuing job {job_id} for keyword '{keyword}' with callback to {callback_url}"
    )

    try:
        # publish_json is invoked as a plain (non-awaited) call, so run it in
        # a worker thread to keep the event loop free during the HTTP request.
        await asyncio.to_thread(
            qstash_client.message.publish_json,
            url=callback_url,
            body={"keyword": keyword, "job_id": job_id},
            retries=0,
        )
    except Exception as e:
        # Keep the job record consistent with the other failure paths by
        # refreshing updated_at alongside the status change.
        await db.on_demand_jobs.update_one(
            {"_id": job_id},
            {"$set": {"status": "failed", "updated_at": datetime.now()}},
        )
        print(f"Error publishing to QStash: {e}")
        raise HTTPException(
            status_code=500, detail="Failed to queue analysis job."
        ) from e

    return {"status": "queued", "job_id": job_id}
| |
|
| |
|
@router.get("/analysis/status/{job_id}", response_model=JobStatusResponseSchema)
async def get_analysis_status(job_id: str):
    """
    Report the state of an on-demand job stored in 'on_demand_jobs'.

    The response always carries the job id, status, keyword and any stored
    error message. For a completed job whose result document still exists and
    references an entity, the full 'on-demand' entity details are attached
    under "result"; otherwise "result" stays None.

    Raises:
        HTTPException: 404 when no job with the given id exists.
    """
    job = await db.on_demand_jobs.find_one({"_id": job_id})
    if not job:
        raise HTTPException(status_code=404, detail="Job not found.")

    payload = {
        "_id": job["_id"],
        "status": job["status"],
        "keyword": job["keyword"],
        "result": None,
        "error_message": job.get("error_message"),
    }

    # Only finished jobs that point at a result need the extra lookups.
    has_result = job["status"] == "completed" and job.get("result_id")
    if not has_result:
        return payload

    analysis_doc = await db.analysis_results.find_one({"_id": job["result_id"]})
    linked_entity = analysis_doc.get("entity_id") if analysis_doc else None
    if linked_entity:
        payload["result"] = await _get_full_entity_details(
            linked_entity, "on-demand"
        )

    return payload
| |
|
| |
|
async def _mark_job_failed(job_id: str, error_msg: str | None = None) -> None:
    """Set an on-demand job to 'failed', recording ``error_msg`` when given."""
    fields: Dict[str, Any] = {"status": "failed", "updated_at": datetime.now()}
    if error_msg is not None:
        fields["error_message"] = error_msg
    await db.on_demand_jobs.update_one({"_id": job_id}, {"$set": fields})


@router.post("/analysis/process-job", include_in_schema=False)
async def process_on_demand_job(request: Request):
    """
    A webhook endpoint called by QStash to perform the full analysis for a
    single keyword. It fetches data, runs sentiment analysis, and saves all
    results to the database.

    The JSON body must carry ``job_id`` and ``keyword``. The job document in
    'on_demand_jobs' moves to 'processing', then to 'completed' (with
    ``result_id``) or 'failed' (with ``error_message``).

    Raises:
        HTTPException: 400 when ``job_id`` or ``keyword`` is missing;
            404 when no videos or no comments are found for the keyword;
            429 when the YouTube service raises ``QuotaExceededError``;
            500 for any other processing failure.
    """
    start = time.perf_counter()

    job_data = await request.json()
    keyword = job_data.get("keyword")
    job_id = job_data.get("job_id")

    if not job_id:
        raise HTTPException(status_code=400, detail="Job ID is missing.")

    if not keyword:
        await _mark_job_failed(job_id)
        raise HTTPException(status_code=400, detail="Keyword is missing, job ignored.")

    await db.on_demand_jobs.update_one(
        {"_id": job_id},
        {"$set": {"status": "processing", "updated_at": datetime.now()}},
    )
    print(f"Processing job {job_id} for keyword: {keyword}")

    try:
        # The YouTube service methods are plain (non-awaited) calls, so run
        # them in a worker thread to keep the event loop responsive.
        videos = await asyncio.to_thread(
            yt_service.search_videos, query_string=keyword
        )
        if not videos:
            error_msg: str = (
                f"No videos found for on-demand keyword '{keyword}' of job {job_id}."
            )
            print(error_msg)
            await _mark_job_failed(job_id, error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

        # Collect comments video by video until the overall quota is reached.
        comments_for_entity: List[Dict[str, Any]] = []
        for video in videos:
            video_id = video.get("id", {}).get("videoId")
            snippet = video.get("snippet", {})
            if not video_id or not snippet:
                continue

            comments = await asyncio.to_thread(
                yt_service.fetch_comments,
                video_id=video_id,
                limit=settings.ON_DEMAND_COMMENTS_PER_VIDEO,
            )

            # Tag each comment with its video so sources can be rebuilt later.
            for comment in comments:
                comment["video_id"] = video_id
                comment["video_title"] = snippet.get("title")
                comment["video_publish_date"] = snippet.get("publishedAt")
                comment["video_url"] = f"https://www.youtube.com/watch?v={video_id}"
            comments_for_entity.extend(comments)

            if len(comments_for_entity) >= settings.ON_DEMAND_TOTAL_COMMENTS:
                break

        final_comments = comments_for_entity[: settings.ON_DEMAND_TOTAL_COMMENTS]
        if not final_comments:
            error_msg = (
                f"No comments found for on-demand keyword '{keyword}' of job {job_id}."
            )
            print(error_msg)
            await _mark_job_failed(job_id, error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

        print(
            f"Analyzing {len(final_comments)} comments in batches for job {job_id} to background thread..."
        )
        texts_to_predict = [comment.get("text", "") for comment in final_comments]
        predictions = await asyncio.to_thread(
            sentiment_service.predict, texts_to_predict
        )
        print(
            f"Successfully analyzed {len(final_comments)} comments for job {job_id}!!!"
        )

        # The first search hit supplies the entity's video link and thumbnail.
        video_id = videos[0].get("id", {}).get("videoId", "")
        entity_video_url = f"https://www.youtube.com/watch?v={video_id}"
        entity_thumbnail_url = (
            videos[0]
            .get("snippet", {})
            .get("thumbnails", {})
            .get("high", {})
            .get("url")
        )

        entity_doc = await db.entities.find_one_and_update(
            {"keyword": keyword},
            {
                "$set": {
                    "thumbnail_url": entity_thumbnail_url,
                    "video_url": entity_video_url,
                },
                "$setOnInsert": {
                    "keyword": keyword,
                    "geo": settings.FETCH_TRENDS_GEO,
                    "volume": 0,
                    "start_date": datetime.now(),
                },
            },
            upsert=True,
            return_document=True,
        )
        entity_id = entity_doc["_id"]

        sentiment_counts = {"positive": 0, "negative": 0, "neutral": 0}

        # Maps a YouTube video id to its sources_youtube _id so each source
        # is upserted only once per job.
        video_id_cache: Dict[str, ObjectId] = {}
        comments_to_insert: List[Dict[str, Any]] = []

        for comment_data, prediction in zip(final_comments, predictions):
            sentiment_label = prediction["label"].lower()
            sentiment_counts[sentiment_label] += 1

            video_id = comment_data.get("video_id")
            source_id: ObjectId | None = video_id_cache.get(video_id)
            if not source_id:
                source_doc = await db.sources_youtube.find_one_and_update(
                    {"video_id": video_id},
                    {
                        "$set": {"entity_id": entity_id},
                        "$setOnInsert": {
                            "video_id": video_id,
                            "url": comment_data.get("video_url"),
                            "title": comment_data.get("video_title"),
                            "publish_date": datetime.strptime(
                                comment_data.get("video_publish_date"),
                                "%Y-%m-%dT%H:%M:%SZ",
                            ),
                        },
                    },
                    upsert=True,
                    return_document=True,
                )
                source_id = source_doc["_id"]
                video_id_cache[video_id] = source_id

            comments_to_insert.append(
                {
                    "source_id": source_id,
                    "comment_id": comment_data.get("comment_id"),
                    "text": comment_data.get("text"),
                    "author": comment_data.get("author"),
                    "publish_date": datetime.strptime(
                        comment_data.get("publish_date"), "%Y-%m-%dT%H:%M:%SZ"
                    ),
                    "sentiment": sentiment_label,
                }
            )

        if comments_to_insert:
            await db.comments_youtube.insert_many(comments_to_insert)

        # Counters are incremented on the entity's existing on-demand result
        # document; a new document is created on first use.
        analysis_result_doc = await db.analysis_results.find_one_and_update(
            {"entity_id": entity_id, "analysis_type": "on-demand"},
            {
                "$inc": {
                    "results.positive_count": sentiment_counts["positive"],
                    "results.negative_count": sentiment_counts["negative"],
                    "results.neutral_count": sentiment_counts["neutral"],
                    "results.total_comments": len(final_comments),
                },
                "$setOnInsert": {
                    "entity_id": entity_id,
                    "analysis_type": "on-demand",
                    "created_at": datetime.now(),
                    "status": "processing",
                    "interest_over_time": [],
                },
            },
            upsert=True,
            return_document=True,
        )
        result_id = analysis_result_doc["_id"]

        await db.on_demand_jobs.update_one(
            {"_id": job_id},
            {
                "$set": {
                    "status": "completed",
                    "result_id": result_id,
                    "updated_at": datetime.now(),
                }
            },
        )
    except QuotaExceededError as e:
        error_msg = str(e)
        print(f"Quota exceeded for job {job_id}: {error_msg}")
        await _mark_job_failed(job_id, error_msg)
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=error_msg
        ) from e
    except HTTPException:
        # Raised deliberately above after the job was already marked failed;
        # let it through instead of turning the 404 into a generic 500 and
        # overwriting the stored error message.
        raise
    except Exception as e:
        error_msg = str(e)
        print(f"An error occurred processing job {job_id}: {error_msg}")
        await _mark_job_failed(job_id, error_msg)
        raise HTTPException(
            status_code=500, detail="An internal processing error occurred."
        ) from e

    end = time.perf_counter()
    print(
        f"Successfully processed and saved analysis for job {job_id} in {end-start:.6f}"
    )
    return {"message": f"Job {job_id} for '{keyword}' processed successfully."}
| |
|