CodexFlow_TM / public_data_ingest.py
LordXido's picture
Create public_data_ingest.py
c165904 verified
raw
history blame contribute delete
870 Bytes
import requests
from public_data_sources import WORLD_BANK_BASE, INDICATORS
def fetch_world_bank_indicator(
    country="WLD",
    indicator="NY.GDP.MKTP.CD",
    year="2022"
):
    """Fetch a single indicator value from the World Bank endpoint.

    Builds ``{WORLD_BANK_BASE}/country/{country}/indicator/{indicator}``
    with ``format=json&per_page=1&date={year}``, issues a GET with a
    10-second timeout and returns the ``"value"`` field of the first
    record found in the second element of the decoded JSON payload.

    This is a best-effort lookup: any expected failure is logged and
    reported to the caller as ``None`` instead of raising.

    Args:
        country: Country/aggregate code inserted into the URL path.
        indicator: Indicator code inserted into the URL path.
        year: Value passed as the ``date`` query parameter.

    Returns:
        The ``"value"`` entry of the first record (may itself be ``None``
        if the service reports no value), or ``None`` when the request
        fails, the response is not valid JSON, or the payload does not
        have the expected ``[metadata, [record, ...]]`` shape.
    """
    # Function-scope import keeps this block self-contained without
    # touching the module's import section.
    import logging

    logger = logging.getLogger(__name__)

    url = (
        f"{WORLD_BANK_BASE}/country/{country}/indicator/"
        f"{indicator}?format=json&per_page=1&date={year}"
    )

    # Step 1: transport + decoding. Only network/HTTP/JSON problems are
    # treated as "data unavailable"; genuine programming errors propagate.
    try:
        response = requests.get(url, timeout=10)
        # Treat 4xx/5xx as failures instead of trying to parse an error page.
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding errors on requests versions whose
        # decode error is not a RequestException subclass.
        logger.warning(
            "World Bank request failed for %s/%s (%s): %s",
            country, indicator, year, exc,
        )
        return None

    # Step 2: payload shape. Error responses and empty result sets do not
    # carry a populated record list, so indexing can fail in several ways
    # (missing element, ``None`` in place of the list, missing key).
    try:
        return data[1][0]["value"]
    except (IndexError, KeyError, TypeError) as exc:
        logger.warning(
            "Unexpected World Bank payload for %s/%s (%s): %r",
            country, indicator, year, exc,
        )
        return None
def fetch_macro_anchor():
    """Collect the macro anchor values into one dictionary.

    Each output key is filled by calling ``fetch_world_bank_indicator``
    with the matching entry of ``INDICATORS`` and the fetcher's default
    country and year.

    Returns:
        A dict with the keys ``"global_gdp"``, ``"global_inflation"`` and
        ``"population"``; each value is whatever the fetcher returned for
        that indicator.
    """
    # Output key -> key into INDICATORS, in the order the lookups are made.
    anchor_sources = (
        ("global_gdp", "GDP"),
        ("global_inflation", "INFLATION"),
        ("population", "POPULATION"),
    )
    return {
        anchor_key: fetch_world_bank_indicator(
            indicator=INDICATORS[indicator_key]
        )
        for anchor_key, indicator_key in anchor_sources
    }