| import io |
| import logging |
| import os |
| import pathlib |
| import shutil |
| import sys |
| import tempfile |
| from collections import OrderedDict |
| from contextlib import contextmanager |
| from typing import (IO, Dict, Iterable, Iterator, Mapping, Optional, Tuple, |
| Union) |
|
|
| from .parser import Binding, parse_stream |
| from .variables import parse_variables |
|
|
| |
| |
| |
| |
# Type alias for the path arguments accepted throughout this module: either a
# plain string or an `os.PathLike` object that yields a string.
StrPath = Union[str, 'os.PathLike[str]']


# Module-level logger, named after this module; used below for parse warnings
# and for the "file not found" / "key not found" notices.
logger = logging.getLogger(__name__)
|
|
|
|
def with_warn_for_invalid_lines(mappings: Iterator[Binding]) -> Iterator[Binding]:
    """Pass every binding through unchanged, logging a warning for invalid ones.

    Each item of *mappings* is re-yielded in order.  When an item's ``error``
    attribute is truthy, a warning naming the line on which the offending
    statement starts (``original.line``) is logged before the item is yielded.
    """
    for binding in mappings:
        failed_to_parse = binding.error
        if failed_to_parse:
            start_line = binding.original.line
            logger.warning(
                "Python-dotenv could not parse statement starting at line %s",
                start_line,
            )
        # Invalid bindings are still forwarded; the caller decides what to do.
        yield binding
|
|
|
|
class DotEnv:
    """Key/value bindings read from a dotenv file or from a text stream."""

    def __init__(
        self,
        dotenv_path: Optional[StrPath],
        stream: Optional[IO[str]] = None,
        verbose: bool = False,
        encoding: Optional[str] = None,
        interpolate: bool = True,
        override: bool = True,
    ) -> None:
        """Store the configuration; nothing is read until `dict()`/`parse()`.

        Parameters:
            dotenv_path: Path of the dotenv file, or `None`.
            stream: Text stream used when `dotenv_path` is empty or is not an
                existing file.
            verbose: Whether to log when the file or a requested key is missing.
            encoding: Encoding passed to `open()` when reading `dotenv_path`.
            interpolate: Whether values are passed through `resolve_variables`.
            override: Forwarded to `resolve_variables`, and decides whether
                `set_as_environment_variables` replaces variables that already
                exist in `os.environ`.
        """
        self.dotenv_path: Optional[StrPath] = dotenv_path
        self.stream: Optional[IO[str]] = stream
        # Cache for `dict()`; `None` means "not computed yet".
        self._dict: Optional[Dict[str, Optional[str]]] = None
        self.verbose: bool = verbose
        self.encoding: Optional[str] = encoding
        self.interpolate: bool = interpolate
        self.override: bool = override

    @contextmanager
    def _get_stream(self) -> Iterator[IO[str]]:
        """Yield the text stream that should be parsed.

        Order of preference: the file at `dotenv_path` if it is an existing
        regular file (opened here and closed on exit), then `self.stream`
        (yielded as-is and left open), then an empty in-memory stream.
        """
        if self.dotenv_path and os.path.isfile(self.dotenv_path):
            with open(self.dotenv_path, encoding=self.encoding) as stream:
                yield stream
        elif self.stream is not None:
            yield self.stream
        else:
            if self.verbose:
                logger.info(
                    "Python-dotenv could not find configuration file %s.",
                    self.dotenv_path or '.env',
                )
            yield io.StringIO('')

    def dict(self) -> Dict[str, Optional[str]]:
        """Return dotenv as dict

        The mapping is built on the first call and cached afterwards.  Values
        are expanded with `resolve_variables` when `interpolate` is true.
        """
        # Compare against None rather than relying on truthiness: an empty
        # result is a valid cached value and must not trigger a re-parse (and
        # a repeated "could not find configuration file" log) on every call.
        if self._dict is not None:
            return self._dict

        raw_values = self.parse()

        if self.interpolate:
            self._dict = OrderedDict(resolve_variables(raw_values, override=self.override))
        else:
            self._dict = OrderedDict(raw_values)

        return self._dict

    def parse(self) -> Iterator[Tuple[str, Optional[str]]]:
        """Lazily yield `(key, value)` for every parsed binding that has a key.

        Bindings whose key is `None` are skipped; bindings flagged with an
        error are reported through `with_warn_for_invalid_lines`.
        """
        with self._get_stream() as stream:
            for mapping in with_warn_for_invalid_lines(parse_stream(stream)):
                if mapping.key is not None:
                    yield mapping.key, mapping.value

    def set_as_environment_variables(self) -> bool:
        """
        Load the current dotenv as system environment variable.

        Returns `False` when there are no bindings at all, `True` otherwise.
        Keys already present in `os.environ` are left untouched unless
        `override` is true, and keys without a value (`None`) are never set.
        """
        values = self.dict()
        if not values:
            return False

        for k, v in values.items():
            if k in os.environ and not self.override:
                continue
            if v is not None:
                os.environ[k] = v

        return True

    def get(self, key: str) -> Optional[str]:
        """
        Return the value bound to `key`, or `None` if the key is absent.

        A key that is present without a value also yields `None`.  When the
        key is absent and `verbose` is true, a warning is logged.
        """
        data = self.dict()

        # Membership test (not `.get`) so that a present key whose value is
        # None does not produce the "not found" warning.
        if key in data:
            return data[key]

        if self.verbose:
            logger.warning("Key %s not found in %s.", key, self.dotenv_path)

        return None
|
|
|
|
def get_key(
    dotenv_path: StrPath,
    key_to_get: str,
    encoding: Optional[str] = "utf-8",
) -> Optional[str]:
    """
    Look up a single key in the given .env file.

    The file is read with `encoding` through a verbose `DotEnv`, so a missing
    file or key is logged.

    Returns `None` if the key isn't found or doesn't have a value.
    """
    env_file = DotEnv(dotenv_path, verbose=True, encoding=encoding)
    return env_file.get(key_to_get)
|
|
|
|
@contextmanager
def rewrite(
    path: StrPath,
    encoding: Optional[str],
) -> Iterator[Tuple[IO[str], IO[str]]]:
    """Context manager for rewriting the file at `path` through a temp file.

    `path` is touched first, so it is created when it does not exist.  The
    managed block receives `(source, dest)`: `source` is `path` opened for
    reading and `dest` is a named temporary file opened for writing, both
    using `encoding`.  If the block completes, the temporary file is moved
    over `path`; if it raises, the temporary file is deleted and the same
    exception is re-raised.
    """
    pathlib.Path(path).touch()

    failure = None
    with tempfile.NamedTemporaryFile(mode="w", encoding=encoding, delete=False) as dest:
        try:
            with open(path, encoding=encoding) as source:
                yield (source, dest)
        except BaseException as err:
            # Remember the failure; cleanup has to wait until `dest` is closed.
            failure = err

    # From here on both files are closed, so the temp file can be moved/removed.
    if failure is not None:
        os.unlink(dest.name)
        raise failure from None

    shutil.move(dest.name, path)
|
|
|
|
def set_key(
    dotenv_path: StrPath,
    key_to_set: str,
    value_to_set: str,
    quote_mode: str = "always",
    export: bool = False,
    encoding: Optional[str] = "utf-8",
) -> Tuple[Optional[bool], str, str]:
    """
    Add a key/value pair to the given .env file, or update it if present.

    The file is rewritten through `rewrite()`, which touches `dotenv_path`
    first, so a missing file is created.  Every existing binding for
    `key_to_set` is replaced; when there is none, the new line is appended
    (preceded by a newline if the last line lacked one).

    Parameters:
        quote_mode: "always" single-quotes the value, "auto" quotes it only
            when it is not purely alphanumeric, "never" writes it verbatim.
        export: Whether to prefix the written line with `export `.

    Returns `(True, key_to_set, value_to_set)`.

    Raises `ValueError` for an unknown `quote_mode`.
    """
    if quote_mode not in ("always", "auto", "never"):
        raise ValueError(f"Unknown quote_mode: {quote_mode}")

    if quote_mode == "always":
        needs_quotes = True
    elif quote_mode == "auto":
        needs_quotes = not value_to_set.isalnum()
    else:
        needs_quotes = False

    if needs_quotes:
        # Escape embedded single quotes so the quoted value stays well-formed.
        escaped = value_to_set.replace("'", "\\'")
        value_out = f"'{escaped}'"
    else:
        value_out = value_to_set

    prefix = "export " if export else ""
    line_out = f"{prefix}{key_to_set}={value_out}\n"

    with rewrite(dotenv_path, encoding=encoding) as (source, dest):
        key_found = False
        needs_newline = False
        for binding in with_warn_for_invalid_lines(parse_stream(source)):
            if binding.key != key_to_set:
                # Unrelated content is copied verbatim.
                original_text = binding.original.string
                dest.write(original_text)
                needs_newline = not original_text.endswith("\n")
                continue
            dest.write(line_out)
            key_found = True
        if not key_found:
            if needs_newline:
                dest.write("\n")
            dest.write(line_out)

    return True, key_to_set, value_to_set
|
|
|
|
def unset_key(
    dotenv_path: StrPath,
    key_to_unset: str,
    quote_mode: str = "always",
    encoding: Optional[str] = "utf-8",
) -> Tuple[Optional[bool], str]:
    """
    Remove every binding for the given key from the given `.env` file.

    Returns `(True, key_to_unset)` when at least one binding was removed.
    If the path doesn't exist, or the key isn't present in the file, a
    warning is logged and `(None, key_to_unset)` is returned instead.

    `quote_mode` is accepted but not used by this function.
    """
    if not os.path.exists(dotenv_path):
        logger.warning("Can't delete from %s - it doesn't exist.", dotenv_path)
        return None, key_to_unset

    found = False
    with rewrite(dotenv_path, encoding=encoding) as (source, dest):
        for binding in with_warn_for_invalid_lines(parse_stream(source)):
            if binding.key != key_to_unset:
                # Keep everything that is not the key being removed.
                dest.write(binding.original.string)
                continue
            found = True

    if found:
        return True, key_to_unset

    logger.warning("Key %s not removed from %s - key doesn't exist.", key_to_unset, dotenv_path)
    return None, key_to_unset
|
|
|
|
def resolve_variables(
    values: Iterable[Tuple[str, Optional[str]]],
    override: bool,
) -> Mapping[str, Optional[str]]:
    """Expand variable references in each value and return the results by name.

    Pairs are processed in order.  A `None` value is stored unchanged.  Any
    other value is split with `parse_variables` and each atom is resolved
    against a lookup table built from `os.environ` and the values resolved so
    far: when `override` is true the already-resolved values take precedence
    over `os.environ`, otherwise `os.environ` takes precedence.
    """
    resolved: Dict[str, Optional[str]] = {}

    for name, value in values:
        if value is None:
            resolved[name] = None
            continue

        atoms = parse_variables(value)

        # Later layers overwrite earlier ones, so the last layer has priority.
        layers = (os.environ, resolved) if override else (resolved, os.environ)
        env: Dict[str, Optional[str]] = {}
        for layer in layers:
            env.update(layer)

        resolved[name] = "".join(atom.resolve(env) for atom in atoms)

    return resolved
|
|
|
|
def _walk_to_root(path: str) -> Iterator[str]:
    """
    Yield the absolute path of each directory from `path` up to the root.

    If `path` is a file, the walk starts at its containing directory.
    Raises `IOError` if `path` does not exist.
    """
    if not os.path.exists(path):
        raise IOError('Starting path not found')

    start = os.path.dirname(path) if os.path.isfile(path) else path

    current = os.path.abspath(start)
    while True:
        yield current
        parent = os.path.abspath(os.path.join(current, os.path.pardir))
        # The root is its own parent; once reached there is nothing above it.
        if parent == current:
            return
        current = parent
|
|
|
|
def find_dotenv(
    filename: str = '.env',
    raise_error_if_not_found: bool = False,
    usecwd: bool = False,
) -> str:
    """
    Look for `filename` in a starting directory and then in each of its parents.

    The starting directory is the current working directory when `usecwd` is
    true, when running interactively, or when `sys.frozen` is set; otherwise
    it is the directory of the nearest calling frame whose source file is not
    this module and exists on disk.

    Returns path to the file if found, or an empty string otherwise
    (or raises `IOError` instead when `raise_error_if_not_found` is true).
    """

    def _is_interactive():
        """ Decide whether this is running in a REPL or IPython notebook """
        try:
            main = __import__('__main__', None, None, fromlist=['__file__'])
        except ModuleNotFoundError:
            return False
        return not hasattr(main, '__file__')

    if usecwd or _is_interactive() or getattr(sys, 'frozen', False):
        # No usable caller file in these situations: fall back to the cwd.
        path = os.getcwd()
    else:
        # Walk up the call stack to the first frame outside this module whose
        # file actually exists.
        this_module = __file__
        caller = sys._getframe()
        while True:
            caller_file = caller.f_code.co_filename
            if caller_file != this_module and os.path.exists(caller_file):
                break
            assert caller.f_back is not None
            caller = caller.f_back
        path = os.path.dirname(os.path.abspath(caller_file))

    candidates = (os.path.join(folder, filename) for folder in _walk_to_root(path))
    match = next((candidate for candidate in candidates if os.path.isfile(candidate)), None)
    if match is not None:
        return match

    if raise_error_if_not_found:
        raise IOError('File not found')

    return ''
|
|
|
|
def load_dotenv(
    dotenv_path: Optional[StrPath] = None,
    stream: Optional[IO[str]] = None,
    verbose: bool = False,
    override: bool = False,
    interpolate: bool = True,
    encoding: Optional[str] = "utf-8",
) -> bool:
    """Parse a .env file and then load all the variables found as environment variables.

    Parameters:
        dotenv_path: Absolute or relative path to .env file.
        stream: Text stream (such as `io.StringIO`) with .env content, used if
            `dotenv_path` is `None`.
        verbose: Whether to output a warning if the .env file is missing.
        override: Whether to override the system environment variables with the variables
            from the `.env` file.
        interpolate: Whether to expand variable references inside values.
        encoding: Encoding to be used to read the file.
    Returns:
        Bool: the result of `DotEnv.set_as_environment_variables()`, i.e.
        `False` when no bindings were parsed and `True` otherwise.

    If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to find the
    .env file.
    """
    nothing_given = dotenv_path is None and stream is None
    if nothing_given:
        dotenv_path = find_dotenv()

    env_file = DotEnv(
        dotenv_path=dotenv_path,
        stream=stream,
        verbose=verbose,
        interpolate=interpolate,
        override=override,
        encoding=encoding,
    )
    loaded = env_file.set_as_environment_variables()
    return loaded
|
|
|
|
def dotenv_values(
    dotenv_path: Optional[StrPath] = None,
    stream: Optional[IO[str]] = None,
    verbose: bool = False,
    interpolate: bool = True,
    encoding: Optional[str] = "utf-8",
) -> Dict[str, Optional[str]]:
    """
    Parse a .env file and return its content as a dict, without touching `os.environ`.

    The returned dict will have `None` values for keys without values in the .env file.
    For example, `foo=bar` results in `{"foo": "bar"}` whereas `foo` alone results in
    `{"foo": None}`

    Parameters:
        dotenv_path: Absolute or relative path to the .env file.
        stream: `StringIO` object with .env content, used if `dotenv_path` is `None`.
        verbose: Whether to output a warning if the .env file is missing.
        interpolate: Whether to expand variable references inside values.
        encoding: Encoding to be used to read the file.

    If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to find the
    .env file.
    """
    nothing_given = dotenv_path is None and stream is None
    if nothing_given:
        dotenv_path = find_dotenv()

    # `override=True` is always passed here, regardless of the caller's inputs.
    env_file = DotEnv(
        dotenv_path=dotenv_path,
        stream=stream,
        verbose=verbose,
        interpolate=interpolate,
        override=True,
        encoding=encoding,
    )
    return env_file.dict()
|
|