| from celery_worker import celery |
| from core.database import SessionLocal |
| from models.analysis_job import AnalysisJob |
| from tools.data_tools import get_stock_data |
| from tools.news_tools import get_combined_news_and_sentiment |
| from tools.analyst_tools import get_llm_analysis |
| from uuid import UUID |
| import json |
|
|
| @celery.task |
| def run_full_analysis(job_id: str, ticker: str): |
| """ |
| The single, main task that orchestrates the entire analysis pipeline. |
| """ |
| print(f"\n--- [START] Full Analysis for Job ID: {job_id} ---") |
|
|
| db = SessionLocal() |
| job = db.query(AnalysisJob).filter(AnalysisJob.id == UUID(job_id)).first() |
|
|
| if not job: |
| print(f"Job {job_id} not found. Aborting.") |
| db.close() |
| return |
|
|
| try: |
| |
| print(f"Stage 1: DATA_FETCHING for job {job_id}") |
| job.status = "DATA_FETCHING" |
| db.commit() |
| |
| data_result = get_stock_data(ticker) |
| if "error" in data_result: |
| raise ValueError(f"Data fetching failed: {data_result['error']}") |
| |
| company_name = data_result.get("company_name", ticker) |
| |
| job.result = data_result |
| db.commit() |
| print("-> Data fetching stage complete.") |
|
|
| |
| print(f"Stage 2: INTELLIGENCE_GATHERING for job {job_id}") |
| job.status = "INTELLIGENCE_GATHERING" |
| db.commit() |
| |
| intelligence_result = get_combined_news_and_sentiment(ticker, company_name) |
| |
| current_result = dict(job.result) |
| current_result['intelligence_briefing'] = intelligence_result |
| job.result = current_result |
| db.commit() |
| print("-> Intelligence gathering stage complete.") |
| |
| |
| print(f"Stage 3: ANALYZING for job {job_id}") |
| job.status = "ANALYZING" |
| db.commit() |
|
|
| |
| db.refresh(job) |
| data_for_llm = job.result |
| |
| llm_result = get_llm_analysis(ticker, company_name, data_for_llm.get("intelligence_briefing", {})) |
| if "error" in llm_result: |
| raise ValueError(f"LLM analysis failed: {llm_result['error']}") |
| |
| |
| print("Finalizing results for job {job_id}") |
| final_result_data = dict(job.result) |
| final_result_data['llm_analysis'] = llm_result |
| |
| job.result = final_result_data |
| job.status = "SUCCESS" |
| db.commit() |
| |
| print(f"--- [SUCCESS] Full analysis for {job_id} complete. ---") |
|
|
| except Exception as e: |
| error_message = str(e) |
| print(f"!!! [FAILURE] Full analysis for {job_id} FAILED: {error_message}") |
| if job: |
| job.status = "FAILED" |
| |
| user_friendly_error = f"Analysis failed for ticker '{ticker}'. This stock may not be listed or there was a problem fetching its data. Please check the ticker symbol and try again. (Details: {error_message})" |
| |
| error_data = job.result if job.result else {} |
| error_data['error'] = user_friendly_error |
| job.result = error_data |
| db.commit() |
| finally: |
| db.close() |