| import importlib |
| import sys |
| from dataclasses import dataclass |
| from logging import getLogger |
| from pathlib import Path |
| from typing import Union |
|
|
| from rich import print |
| from rich.padding import Padding |
| from rich.panel import Panel |
| from rich.syntax import Syntax |
| from rich.tree import Tree |
|
|
| from fastapi_cli.exceptions import FastAPICLIException |
|
|
| logger = getLogger(__name__) |
|
|
| try: |
| from fastapi import FastAPI |
| except ImportError: |
| FastAPI = None |
|
|
|
|
| def get_default_path() -> Path: |
| path = Path("main.py") |
| if path.is_file(): |
| return path |
| path = Path("app.py") |
| if path.is_file(): |
| return path |
| path = Path("api.py") |
| if path.is_file(): |
| return path |
| path = Path("app/main.py") |
| if path.is_file(): |
| return path |
| path = Path("app/app.py") |
| if path.is_file(): |
| return path |
| path = Path("app/api.py") |
| if path.is_file(): |
| return path |
| raise FastAPICLIException( |
| "Could not find a default file to run, please provide an explicit path" |
| ) |
|
|
|
|
| @dataclass |
| class ModuleData: |
| module_import_str: str |
| extra_sys_path: Path |
|
|
|
|
| def get_module_data_from_path(path: Path) -> ModuleData: |
| logger.info( |
| "Searching for package file structure from directories with [blue]__init__.py[/blue] files" |
| ) |
| use_path = path.resolve() |
| module_path = use_path |
| if use_path.is_file() and use_path.stem == "__init__": |
| module_path = use_path.parent |
| module_paths = [module_path] |
| extra_sys_path = module_path.parent |
| for parent in module_path.parents: |
| init_path = parent / "__init__.py" |
| if init_path.is_file(): |
| module_paths.insert(0, parent) |
| extra_sys_path = parent.parent |
| else: |
| break |
| logger.info(f"Importing from {extra_sys_path.resolve()}") |
| root = module_paths[0] |
| name = f"π {root.name}" if root.is_file() else f"π {root.name}" |
| root_tree = Tree(name) |
| if root.is_dir(): |
| root_tree.add("[dim]π __init__.py[/dim]") |
| tree = root_tree |
| for sub_path in module_paths[1:]: |
| sub_name = ( |
| f"π {sub_path.name}" if sub_path.is_file() else f"π {sub_path.name}" |
| ) |
| tree = tree.add(sub_name) |
| if sub_path.is_dir(): |
| tree.add("[dim]π __init__.py[/dim]") |
| title = "[b green]Python module file[/b green]" |
| if len(module_paths) > 1 or module_path.is_dir(): |
| title = "[b green]Python package file structure[/b green]" |
| panel = Padding( |
| Panel( |
| root_tree, |
| title=title, |
| expand=False, |
| padding=(1, 2), |
| ), |
| 1, |
| ) |
| print(panel) |
| module_str = ".".join(p.stem for p in module_paths) |
| logger.info(f"Importing module [green]{module_str}[/green]") |
| return ModuleData( |
| module_import_str=module_str, extra_sys_path=extra_sys_path.resolve() |
| ) |
|
|
|
|
| def get_app_name(*, mod_data: ModuleData, app_name: Union[str, None] = None) -> str: |
| try: |
| mod = importlib.import_module(mod_data.module_import_str) |
| except (ImportError, ValueError) as e: |
| logger.error(f"Import error: {e}") |
| logger.warning( |
| "Ensure all the package directories have an [blue]__init__.py[/blue] file" |
| ) |
| raise |
| if not FastAPI: |
| raise FastAPICLIException( |
| "Could not import FastAPI, try running 'pip install fastapi'" |
| ) from None |
| object_names = dir(mod) |
| object_names_set = set(object_names) |
| if app_name: |
| if app_name not in object_names_set: |
| raise FastAPICLIException( |
| f"Could not find app name {app_name} in {mod_data.module_import_str}" |
| ) |
| app = getattr(mod, app_name) |
| if not isinstance(app, FastAPI): |
| raise FastAPICLIException( |
| f"The app name {app_name} in {mod_data.module_import_str} doesn't seem to be a FastAPI app" |
| ) |
| return app_name |
| for preferred_name in ["app", "api"]: |
| if preferred_name in object_names_set: |
| obj = getattr(mod, preferred_name) |
| if isinstance(obj, FastAPI): |
| return preferred_name |
| for name in object_names: |
| obj = getattr(mod, name) |
| if isinstance(obj, FastAPI): |
| return name |
| raise FastAPICLIException("Could not find FastAPI app in module, try using --app") |
|
|
|
|
| def get_import_string( |
| *, path: Union[Path, None] = None, app_name: Union[str, None] = None |
| ) -> str: |
| if not path: |
| path = get_default_path() |
| logger.info(f"Using path [blue]{path}[/blue]") |
| logger.info(f"Resolved absolute path {path.resolve()}") |
| if not path.exists(): |
| raise FastAPICLIException(f"Path does not exist {path}") |
| mod_data = get_module_data_from_path(path) |
| sys.path.insert(0, str(mod_data.extra_sys_path)) |
| use_app_name = get_app_name(mod_data=mod_data, app_name=app_name) |
| import_example = Syntax( |
| f"from {mod_data.module_import_str} import {use_app_name}", "python" |
| ) |
| import_panel = Padding( |
| Panel( |
| import_example, |
| title="[b green]Importable FastAPI app[/b green]", |
| expand=False, |
| padding=(1, 2), |
| ), |
| 1, |
| ) |
| logger.info("Found importable FastAPI app") |
| print(import_panel) |
| import_string = f"{mod_data.module_import_str}:{use_app_name}" |
| logger.info(f"Using import string [b green]{import_string}[/b green]") |
| return import_string |
|
|