|
import io |
|
import logging |
|
import os |
|
import pathlib |
|
import shutil |
|
import sys |
|
import tempfile |
|
from collections import OrderedDict |
|
from contextlib import contextmanager |
|
from typing import (IO, Dict, Iterable, Iterator, Mapping, Optional, Tuple, |
|
Union) |
|
|
|
from .parser import Binding, parse_stream |
|
from .variables import parse_variables |
|
|
|
|
|
|
|
|
|
|
|
StrPath = Union[str, 'os.PathLike[str]'] |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def with_warn_for_invalid_lines(mappings: Iterator[Binding]) -> Iterator[Binding]: |
|
for mapping in mappings: |
|
if mapping.error: |
|
logger.warning( |
|
"Python-dotenv could not parse statement starting at line %s", |
|
mapping.original.line, |
|
) |
|
yield mapping |
|
|
|
|
|
class DotEnv: |
|
def __init__( |
|
self, |
|
dotenv_path: Optional[StrPath], |
|
stream: Optional[IO[str]] = None, |
|
verbose: bool = False, |
|
encoding: Optional[str] = None, |
|
interpolate: bool = True, |
|
override: bool = True, |
|
) -> None: |
|
self.dotenv_path: Optional[StrPath] = dotenv_path |
|
self.stream: Optional[IO[str]] = stream |
|
self._dict: Optional[Dict[str, Optional[str]]] = None |
|
self.verbose: bool = verbose |
|
self.encoding: Optional[str] = encoding |
|
self.interpolate: bool = interpolate |
|
self.override: bool = override |
|
|
|
@contextmanager |
|
def _get_stream(self) -> Iterator[IO[str]]: |
|
if self.dotenv_path and os.path.isfile(self.dotenv_path): |
|
with open(self.dotenv_path, encoding=self.encoding) as stream: |
|
yield stream |
|
elif self.stream is not None: |
|
yield self.stream |
|
else: |
|
if self.verbose: |
|
logger.info( |
|
"Python-dotenv could not find configuration file %s.", |
|
self.dotenv_path or '.env', |
|
) |
|
yield io.StringIO('') |
|
|
|
def dict(self) -> Dict[str, Optional[str]]: |
|
"""Return dotenv as dict""" |
|
if self._dict: |
|
return self._dict |
|
|
|
raw_values = self.parse() |
|
|
|
if self.interpolate: |
|
self._dict = OrderedDict(resolve_variables(raw_values, override=self.override)) |
|
else: |
|
self._dict = OrderedDict(raw_values) |
|
|
|
return self._dict |
|
|
|
def parse(self) -> Iterator[Tuple[str, Optional[str]]]: |
|
with self._get_stream() as stream: |
|
for mapping in with_warn_for_invalid_lines(parse_stream(stream)): |
|
if mapping.key is not None: |
|
yield mapping.key, mapping.value |
|
|
|
def set_as_environment_variables(self) -> bool: |
|
""" |
|
Load the current dotenv as system environment variable. |
|
""" |
|
if not self.dict(): |
|
return False |
|
|
|
for k, v in self.dict().items(): |
|
if k in os.environ and not self.override: |
|
continue |
|
if v is not None: |
|
os.environ[k] = v |
|
|
|
return True |
|
|
|
def get(self, key: str) -> Optional[str]: |
|
""" |
|
""" |
|
data = self.dict() |
|
|
|
if key in data: |
|
return data[key] |
|
|
|
if self.verbose: |
|
logger.warning("Key %s not found in %s.", key, self.dotenv_path) |
|
|
|
return None |
|
|
|
|
|
def get_key( |
|
dotenv_path: StrPath, |
|
key_to_get: str, |
|
encoding: Optional[str] = "utf-8", |
|
) -> Optional[str]: |
|
""" |
|
Get the value of a given key from the given .env. |
|
|
|
Returns `None` if the key isn't found or doesn't have a value. |
|
""" |
|
return DotEnv(dotenv_path, verbose=True, encoding=encoding).get(key_to_get) |
|
|
|
|
|
@contextmanager |
|
def rewrite( |
|
path: StrPath, |
|
encoding: Optional[str], |
|
) -> Iterator[Tuple[IO[str], IO[str]]]: |
|
pathlib.Path(path).touch() |
|
|
|
with tempfile.NamedTemporaryFile(mode="w", encoding=encoding, delete=False) as dest: |
|
error = None |
|
try: |
|
with open(path, encoding=encoding) as source: |
|
yield (source, dest) |
|
except BaseException as err: |
|
error = err |
|
|
|
if error is None: |
|
shutil.move(dest.name, path) |
|
else: |
|
os.unlink(dest.name) |
|
raise error from None |
|
|
|
|
|
def set_key( |
|
dotenv_path: StrPath, |
|
key_to_set: str, |
|
value_to_set: str, |
|
quote_mode: str = "always", |
|
export: bool = False, |
|
encoding: Optional[str] = "utf-8", |
|
) -> Tuple[Optional[bool], str, str]: |
|
""" |
|
Adds or Updates a key/value to the given .env |
|
|
|
If the .env path given doesn't exist, fails instead of risking creating |
|
an orphan .env somewhere in the filesystem |
|
""" |
|
if quote_mode not in ("always", "auto", "never"): |
|
raise ValueError(f"Unknown quote_mode: {quote_mode}") |
|
|
|
quote = ( |
|
quote_mode == "always" |
|
or (quote_mode == "auto" and not value_to_set.isalnum()) |
|
) |
|
|
|
if quote: |
|
value_out = "'{}'".format(value_to_set.replace("'", "\\'")) |
|
else: |
|
value_out = value_to_set |
|
if export: |
|
line_out = f'export {key_to_set}={value_out}\n' |
|
else: |
|
line_out = f"{key_to_set}={value_out}\n" |
|
|
|
with rewrite(dotenv_path, encoding=encoding) as (source, dest): |
|
replaced = False |
|
missing_newline = False |
|
for mapping in with_warn_for_invalid_lines(parse_stream(source)): |
|
if mapping.key == key_to_set: |
|
dest.write(line_out) |
|
replaced = True |
|
else: |
|
dest.write(mapping.original.string) |
|
missing_newline = not mapping.original.string.endswith("\n") |
|
if not replaced: |
|
if missing_newline: |
|
dest.write("\n") |
|
dest.write(line_out) |
|
|
|
return True, key_to_set, value_to_set |
|
|
|
|
|
def unset_key( |
|
dotenv_path: StrPath, |
|
key_to_unset: str, |
|
quote_mode: str = "always", |
|
encoding: Optional[str] = "utf-8", |
|
) -> Tuple[Optional[bool], str]: |
|
""" |
|
Removes a given key from the given `.env` file. |
|
|
|
If the .env path given doesn't exist, fails. |
|
If the given key doesn't exist in the .env, fails. |
|
""" |
|
if not os.path.exists(dotenv_path): |
|
logger.warning("Can't delete from %s - it doesn't exist.", dotenv_path) |
|
return None, key_to_unset |
|
|
|
removed = False |
|
with rewrite(dotenv_path, encoding=encoding) as (source, dest): |
|
for mapping in with_warn_for_invalid_lines(parse_stream(source)): |
|
if mapping.key == key_to_unset: |
|
removed = True |
|
else: |
|
dest.write(mapping.original.string) |
|
|
|
if not removed: |
|
logger.warning("Key %s not removed from %s - key doesn't exist.", key_to_unset, dotenv_path) |
|
return None, key_to_unset |
|
|
|
return removed, key_to_unset |
|
|
|
|
|
def resolve_variables( |
|
values: Iterable[Tuple[str, Optional[str]]], |
|
override: bool, |
|
) -> Mapping[str, Optional[str]]: |
|
new_values: Dict[str, Optional[str]] = {} |
|
|
|
for (name, value) in values: |
|
if value is None: |
|
result = None |
|
else: |
|
atoms = parse_variables(value) |
|
env: Dict[str, Optional[str]] = {} |
|
if override: |
|
env.update(os.environ) |
|
env.update(new_values) |
|
else: |
|
env.update(new_values) |
|
env.update(os.environ) |
|
result = "".join(atom.resolve(env) for atom in atoms) |
|
|
|
new_values[name] = result |
|
|
|
return new_values |
|
|
|
|
|
def _walk_to_root(path: str) -> Iterator[str]: |
|
""" |
|
Yield directories starting from the given directory up to the root |
|
""" |
|
if not os.path.exists(path): |
|
raise IOError('Starting path not found') |
|
|
|
if os.path.isfile(path): |
|
path = os.path.dirname(path) |
|
|
|
last_dir = None |
|
current_dir = os.path.abspath(path) |
|
while last_dir != current_dir: |
|
yield current_dir |
|
parent_dir = os.path.abspath(os.path.join(current_dir, os.path.pardir)) |
|
last_dir, current_dir = current_dir, parent_dir |
|
|
|
|
|
def find_dotenv( |
|
filename: str = '.env', |
|
raise_error_if_not_found: bool = False, |
|
usecwd: bool = False, |
|
) -> str: |
|
""" |
|
Search in increasingly higher folders for the given file |
|
|
|
Returns path to the file if found, or an empty string otherwise |
|
""" |
|
|
|
def _is_interactive(): |
|
""" Decide whether this is running in a REPL or IPython notebook """ |
|
try: |
|
main = __import__('__main__', None, None, fromlist=['__file__']) |
|
except ModuleNotFoundError: |
|
return False |
|
return not hasattr(main, '__file__') |
|
|
|
if usecwd or _is_interactive() or getattr(sys, 'frozen', False): |
|
|
|
path = os.getcwd() |
|
else: |
|
|
|
frame = sys._getframe() |
|
current_file = __file__ |
|
|
|
while frame.f_code.co_filename == current_file or not os.path.exists( |
|
frame.f_code.co_filename |
|
): |
|
assert frame.f_back is not None |
|
frame = frame.f_back |
|
frame_filename = frame.f_code.co_filename |
|
path = os.path.dirname(os.path.abspath(frame_filename)) |
|
|
|
for dirname in _walk_to_root(path): |
|
check_path = os.path.join(dirname, filename) |
|
if os.path.isfile(check_path): |
|
return check_path |
|
|
|
if raise_error_if_not_found: |
|
raise IOError('File not found') |
|
|
|
return '' |
|
|
|
|
|
def load_dotenv( |
|
dotenv_path: Optional[StrPath] = None, |
|
stream: Optional[IO[str]] = None, |
|
verbose: bool = False, |
|
override: bool = False, |
|
interpolate: bool = True, |
|
encoding: Optional[str] = "utf-8", |
|
) -> bool: |
|
"""Parse a .env file and then load all the variables found as environment variables. |
|
|
|
Parameters: |
|
dotenv_path: Absolute or relative path to .env file. |
|
stream: Text stream (such as `io.StringIO`) with .env content, used if |
|
`dotenv_path` is `None`. |
|
verbose: Whether to output a warning the .env file is missing. |
|
override: Whether to override the system environment variables with the variables |
|
from the `.env` file. |
|
encoding: Encoding to be used to read the file. |
|
Returns: |
|
Bool: True if at least one environment variable is set else False |
|
|
|
If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to find the |
|
.env file. |
|
""" |
|
if dotenv_path is None and stream is None: |
|
dotenv_path = find_dotenv() |
|
|
|
dotenv = DotEnv( |
|
dotenv_path=dotenv_path, |
|
stream=stream, |
|
verbose=verbose, |
|
interpolate=interpolate, |
|
override=override, |
|
encoding=encoding, |
|
) |
|
return dotenv.set_as_environment_variables() |
|
|
|
|
|
def dotenv_values( |
|
dotenv_path: Optional[StrPath] = None, |
|
stream: Optional[IO[str]] = None, |
|
verbose: bool = False, |
|
interpolate: bool = True, |
|
encoding: Optional[str] = "utf-8", |
|
) -> Dict[str, Optional[str]]: |
|
""" |
|
Parse a .env file and return its content as a dict. |
|
|
|
The returned dict will have `None` values for keys without values in the .env file. |
|
For example, `foo=bar` results in `{"foo": "bar"}` whereas `foo` alone results in |
|
`{"foo": None}` |
|
|
|
Parameters: |
|
dotenv_path: Absolute or relative path to the .env file. |
|
stream: `StringIO` object with .env content, used if `dotenv_path` is `None`. |
|
verbose: Whether to output a warning if the .env file is missing. |
|
encoding: Encoding to be used to read the file. |
|
|
|
If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to find the |
|
.env file. |
|
""" |
|
if dotenv_path is None and stream is None: |
|
dotenv_path = find_dotenv() |
|
|
|
return DotEnv( |
|
dotenv_path=dotenv_path, |
|
stream=stream, |
|
verbose=verbose, |
|
interpolate=interpolate, |
|
override=True, |
|
encoding=encoding, |
|
).dict() |
|
|