| import requests |
| import os |
| from dotenv import load_dotenv |
| from typing import Dict, List, Optional, Any |
| import logging |
|
|
| load_dotenv() |
|
|
| logging.basicConfig( |
| level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| FMP_API_KEY = os.getenv("FMP_API_KEY") |
| ALPHAVANTAGE_API_KEY = os.getenv("ALPHAVANTAGE_API_KEY") |
|
|
| FMP_BASE_URL = "https://financialmodelingprep.com/api/v3" |
| ALPHAVANTAGE_BASE_URL = "https://www.alphavantage.co/query" |
|
|
|
|
| class DataIngestionError(Exception): |
| """Custom exception for data ingestion API errors.""" |
|
|
| pass |
|
|
|
|
| class FMPFetchError(DataIngestionError): |
| """Specific error for FMP fetching issues.""" |
|
|
| pass |
|
|
|
|
| class AVFetchError(DataIngestionError): |
| """Specific error for AlphaVantage fetching issues.""" |
|
|
| pass |
|
|
|
|
| def _fetch_from_fmp(ticker: str, api_key: str) -> Dict[str, Dict[str, Any]]: |
| """Internal function to fetch data from FMP. Uses /historical-price-full/ as recommended.""" |
|
|
| endpoint = f"{FMP_BASE_URL}/historical-price-full/{ticker}" |
| params = {"apikey": api_key} |
| logger.info( |
| f"Fetching historical daily data for {ticker} from FMP (using /historical-price-full/)." |
| ) |
| try: |
| response = requests.get(endpoint, params=params, timeout=30) |
| response.raise_for_status() |
| data = response.json() |
|
|
| if isinstance(data, dict): |
|
|
| if "Error Message" in data: |
| raise FMPFetchError( |
| f"FMP API returned error for {ticker}: {data['Error Message']}" |
| ) |
| if data.get("symbol") and "historical" in data: |
| historical_data_list = data.get("historical") |
|
|
| if isinstance(historical_data_list, list): |
| if not historical_data_list: |
| logger.warning( |
| f"FMP API returned empty historical data list for {ticker} (from /historical-price-full/)." |
| ) |
| return {} |
|
|
| prices_dict: Dict[str, Dict[str, Any]] = {} |
| for record in historical_data_list: |
| if isinstance(record, dict) and "date" in record: |
| prices_dict[record["date"]] = record |
| else: |
| logger.warning( |
| f"Skipping invalid FMP record format for {ticker}: {record}" |
| ) |
| logger.info( |
| f"Successfully fetched and formatted {len(prices_dict)} historical records for {ticker} from FMP." |
| ) |
| return prices_dict |
| else: |
| raise FMPFetchError( |
| f"FMP API historical data for {ticker} has unexpected 'historical' type: {type(historical_data_list)}" |
| ) |
| else: |
| raise FMPFetchError( |
| f"FMP API response for {ticker} (from /historical-price-full/) missing expected structure (symbol/historical keys). Response: {str(data)[:200]}" |
| ) |
|
|
| elif isinstance(data, list): |
| if not data: |
| logger.warning( |
| f"FMP API returned empty list for {ticker} (from /historical-price-full/)." |
| ) |
| return {} |
| if isinstance(data[0], dict) and ( |
| "Error Message" in data[0] or "error" in data[0] |
| ): |
| error_msg = data[0].get( |
| "Error Message", data[0].get("error", "Unknown error in list") |
| ) |
| raise FMPFetchError( |
| f"FMP API returned error list for {ticker}: {error_msg}" |
| ) |
| else: |
| raise FMPFetchError( |
| f"FMP API returned unexpected top-level list structure for {ticker} (from /historical-price-full/). Response: {str(data)[:200]}" |
| ) |
| else: |
| raise FMPFetchError( |
| f"FMP API returned unexpected response type for {ticker} (from /historical-price-full/): {type(data)}. Response: {str(data)[:200]}" |
| ) |
|
|
| except requests.exceptions.RequestException as e: |
| raise FMPFetchError(f"FMP data fetch (network) failed for {ticker}: {e}") |
| except Exception as e: |
| raise FMPFetchError( |
| f"FMP data fetch (processing) failed for {ticker}: {e}. Response: {str(locals().get('data', 'N/A'))[:200]}" |
| ) |
|
|
|
|
| def _fetch_from_alphavantage(ticker: str, api_key: str) -> Dict[str, Dict[str, Any]]: |
| """Internal function to fetch data from AlphaVantage.""" |
| endpoint = f"{ALPHAVANTAGE_BASE_URL}/query" |
| params = { |
| "function": "TIME_SERIES_DAILY_ADJUSTED", |
| "symbol": ticker, |
| "apikey": api_key, |
| "outputsize": "compact", |
| } |
| logger.info(f"Fetching historical daily data for {ticker} from AlphaVantage.") |
| try: |
| response = requests.get(endpoint, params=params, timeout=30) |
| response.raise_for_status() |
| data = response.json() |
|
|
| if not isinstance(data, dict): |
| raise AVFetchError( |
| f"AlphaVantage API returned unexpected response type for {ticker}: {type(data)}. Expected dict. Response: {str(data)[:200]}" |
| ) |
|
|
| if "Error Message" in data: |
| raise AVFetchError( |
| f"AlphaVantage API returned error for {ticker}: {data['Error Message']}" |
| ) |
| if "Note" in data: |
| logger.warning( |
| f"AlphaVantage API returned note for {ticker}: {data['Note']} - treating as no data." |
| ) |
|
|
| return {} |
|
|
| time_series_data = data.get("Time Series (Daily)") |
|
|
| if time_series_data is None: |
|
|
| if not data: |
| logger.warning( |
| f"AlphaVantage API returned an empty dictionary for {ticker}." |
| ) |
| return {} |
| else: |
| raise AVFetchError( |
| f"AlphaVantage API response for {ticker} missing 'Time Series (Daily)' key. Response: {str(data)[:200]}" |
| ) |
|
|
| if not isinstance(time_series_data, dict): |
| raise AVFetchError( |
| f"AlphaVantage API 'Time Series (Daily)' for {ticker} is not a dictionary. Type: {type(time_series_data)}. Response: {str(data)[:200]}" |
| ) |
|
|
| if not time_series_data: |
| logger.warning( |
| f"AlphaVantage API returned empty time series data for {ticker}." |
| ) |
| return {} |
|
|
| prices_dict: Dict[str, Dict[str, Any]] = {} |
| for date_str, values_dict in time_series_data.items(): |
| if isinstance(values_dict, dict): |
| cleaned_values: Dict[str, Any] = {} |
| if "1. open" in values_dict: |
| cleaned_values["open"] = values_dict["1. open"] |
| if "2. high" in values_dict: |
| cleaned_values["high"] = values_dict["2. high"] |
| if "3. low" in values_dict: |
| cleaned_values["low"] = values_dict["3. low"] |
| if "4. close" in values_dict: |
| cleaned_values["close"] = values_dict["4. close"] |
| if "5. adjusted close" in values_dict: |
| cleaned_values["adjClose"] = values_dict["5. adjusted close"] |
| if "6. volume" in values_dict: |
| cleaned_values["volume"] = values_dict["6. volume"] |
|
|
| if cleaned_values: |
| prices_dict[date_str] = cleaned_values |
| else: |
| logger.warning( |
| f"AlphaVantage data for {ticker} on {date_str} missing expected price keys within daily record." |
| ) |
| else: |
| logger.warning( |
| f"Skipping invalid AlphaVantage daily record (not a dict) for {ticker} on {date_str}: {values_dict}" |
| ) |
| logger.info( |
| f"Successfully fetched and formatted {len(prices_dict)} historical records for {ticker} from AlphaVantage." |
| ) |
| return prices_dict |
|
|
| except requests.exceptions.RequestException as e: |
| raise AVFetchError( |
| f"AlphaVantage data fetch (network) failed for {ticker}: {e}" |
| ) |
| except Exception as e: |
| raise AVFetchError( |
| f"AlphaVantage data fetch (processing) failed for {ticker}: {e}. Response: {str(locals().get('data', 'N/A'))[:200]}" |
| ) |
|
|
|
|
| def get_daily_adjusted_prices(ticker: str) -> Dict[str, Dict[str, Any]]: |
| """ |
| Fetches historical daily adjusted prices for a single ticker. |
| Tries FMP first if key is available. If FMP fails, tries AlphaVantage if key is available. |
| Returns a dictionary mapping date strings to price dictionaries. |
| Raises DataIngestionError if no keys are configured or if both APIs fail. |
| """ |
| fmp_key_available = bool(FMP_API_KEY) |
| av_key_available = bool(ALPHAVANTAGE_API_KEY) |
|
|
| if not fmp_key_available and not av_key_available: |
| raise DataIngestionError( |
| "No API keys configured for historical price data (FMP, AlphaVantage)." |
| ) |
|
|
| fmp_error_detail = None |
| av_error_detail = None |
| data_from_fmp = {} |
| data_from_av = {} |
|
|
| if fmp_key_available: |
| try: |
| data_from_fmp = _fetch_from_fmp(ticker, FMP_API_KEY) |
| if data_from_fmp: |
| return data_from_fmp |
| else: |
|
|
| fmp_error_detail = f"FMP API returned no data for {ticker}." |
| logger.warning(fmp_error_detail) |
| except FMPFetchError as e: |
| fmp_error_detail = str(e) |
| logger.error(f"FMPFetchError for {ticker}: {fmp_error_detail}") |
| except Exception as e: |
| fmp_error_detail = ( |
| f"An unexpected error occurred during FMP fetch for {ticker}: {e}" |
| ) |
| logger.error(fmp_error_detail) |
|
|
| if av_key_available: |
| try: |
| data_from_av = _fetch_from_alphavantage(ticker, ALPHAVANTAGE_API_KEY) |
| if data_from_av: |
| return data_from_av |
| else: |
|
|
| av_error_detail = f"AlphaVantage API returned no data for {ticker}." |
| logger.warning(av_error_detail) |
| except AVFetchError as e: |
| av_error_detail = str(e) |
| logger.error(f"AVFetchError for {ticker}: {av_error_detail}") |
| except Exception as e: |
| av_error_detail = f"An unexpected error occurred during AlphaVantage fetch for {ticker}: {e}" |
| logger.error(av_error_detail) |
|
|
| error_messages = [] |
| if fmp_key_available: |
| if fmp_error_detail: |
| error_messages.append(f"FMP: {fmp_error_detail}") |
| elif not data_from_fmp: |
| error_messages.append(f"FMP: Returned no data for {ticker}.") |
|
|
| if av_key_available: |
| if av_error_detail: |
| error_messages.append(f"AlphaVantage: {av_error_detail}") |
| elif not data_from_av: |
| error_messages.append(f"AlphaVantage: Returned no data for {ticker}.") |
|
|
| providers_tried = [] |
| if fmp_key_available: |
| providers_tried.append("FMP") |
| if av_key_available: |
| providers_tried.append("AlphaVantage") |
|
|
| final_message = f"Failed to fetch historical data for {ticker} after trying {', '.join(providers_tried) if providers_tried else 'available providers'}." |
| if error_messages: |
| final_message += " Details: " + "; ".join(error_messages) |
| else: |
| final_message += " No data was returned from any attempted source." |
|
|
| raise DataIngestionError(final_message) |
|
|