from __future__ import annotations
import importlib.util
import logging
import os
import re
import string
import sys
from functools import lru_cache, reduce, wraps
from pathlib import Path, PurePosixPath
from re import IGNORECASE, compile as regexp
from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Sequence, TypeVar
from urllib.parse import urlsplit
if TYPE_CHECKING: # pragma: no cover
from re import Pattern
from typing import Iterable
log = logging.getLogger(__name__)
# Matches the first run of decimal digits (with optional "," group separators) that
# ends on a word boundary; any non-whitespace text before it is captured as "prefix".
number_pattern = regexp(r"(?P<prefix>\S*?)(?P<number>\d[\d,]*)\b")

# Same shape for hexadecimal digits with an optional "0x" marker (case-insensitive).
hex_number_pattern = regexp(
    r"(?P<prefix>\S*?)(?:0x)?(?P<number>[0-9a-f]+)\b", IGNORECASE
)


def get_number_from_str(
    string: str, default: int = -1, interpret_hex: bool = False
) -> int:
    """
    Extract the first number embedded in a string.

    :param string: Text to search.
    :param default: Value returned when no number can be found.
    :param interpret_hex: When True, try to read a hexadecimal number (optionally
        prefixed with ``0x``) first, falling back to a decimal number.
    :return: The parsed number, or ``default`` when nothing matched.
    """
    if interpret_hex and (match := hex_number_pattern.search(string)):
        # The pattern never captures a sign, so the value is already non-negative
        return int(match.group("number"), 16)

    if match := number_pattern.search(string):
        # The pattern accepts "," as a digit separator, which int() would reject
        return int(match.group("number").replace(",", ""))

    return default
[docs]
def sort_numerically(
    iterable: Iterable[str], reverse: bool = False, allow_hex: bool = False
) -> list[str]:
    """
    Sort strings by their text prefix first and by their embedded number second.

    Items are grouped by the non-numeric prefix that precedes their first number.
    Groups are ordered alphabetically by prefix, and the members of each group are
    ordered by number. Items without any number are kept as single entries and are
    placed alphabetically among the prefixes.

    :param iterable: Strings to sort.
    :param reverse: Reverse the numeric order inside each prefix group. The
        alphabetical order of the prefixes themselves is not reversed.
    :param allow_hex: Also recognise hexadecimal numbers when grouping and sorting.
    :return: A new, sorted list.
    """
    unmatched_items: list[str] = []
    prefixes: dict[str, list[str]] = {}

    # Walk the items alphabetically so every group starts out in a stable order
    for item in sorted(iterable):
        pattern_match = (
            hex_number_pattern.search(item) if allow_hex else None
        ) or number_pattern.search(item)

        if not pattern_match:
            unmatched_items.append(item)
            continue

        prefixes.setdefault(pattern_match.group("prefix") or "", []).append(item)

    # Tag each entry so that an unmatched item whose text equals a prefix is not
    # mistaken for that prefix (which would emit the group twice and lose the item).
    # False sorts before True, so the bare item precedes the numbered group.
    entries = [(prefix, True) for prefix in prefixes]
    entries.extend((item, False) for item in unmatched_items)
    entries.sort()

    result: list[str] = []
    for text, is_prefix in entries:
        if not is_prefix:
            result.append(text)
            continue

        result.extend(
            sorted(
                prefixes[text],
                key=lambda x: get_number_from_str(
                    x, default=-1, interpret_hex=allow_hex
                ),
                reverse=reverse,
            )
        )

    return result
[docs]
def text_reducer(text: str, filter_pair: tuple[Pattern[str], str]) -> str:
    """
    Reducer step that applies one (pattern, replacement) filter to a string.

    Intended for use with ``functools.reduce`` to run multiple filters in turn.
    Empty text is handed back untouched without looking at the filter.
    """
    if text:
        pattern, substitute = filter_pair
        return pattern.sub(substitute, text)

    # Nothing to filter
    return text
[docs]
def validate_types_in_sequence(
    sequence: Sequence, types: type | tuple[type, ...]
) -> bool:
    """
    Check that every element of a sequence is an instance of the given type(s).

    An empty sequence is considered valid.
    """
    for element in sequence:
        if not isinstance(element, types):
            return False
    return True
# Return type of a decorated callable, used to carry it through logged_function
_R = TypeVar("_R")
# Any callable returning _R; its argument list is left unconstrained
_FuncType = Callable[..., _R]
[docs]
def logged_function(logger: logging.Logger) -> Callable[[_FuncType[_R]], _FuncType[_R]]:
    """
    Decorator which adds debug logging of a function's input arguments and return
    value.

    The input arguments are logged before the function is called, and the
    return value is logged once it has completed. Nothing is formatted unless
    the logger is enabled for DEBUG.

    :param logger: Logger to send output to.
    :return: A decorator that wraps a function with the logging behaviour.
    """

    def _logged_function(func: _FuncType[_R]) -> _FuncType[_R]:
        @wraps(func)
        def _wrapper(*args: Any, **kwargs: Any) -> _R:
            # Formatting every argument is wasted work when DEBUG output is
            # disabled, so only build the message if it will be emitted
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(
                    "%s(%s, %s)",
                    func.__name__,
                    ", ".join([format_arg(x) for x in args]),
                    ", ".join([f"{k}={format_arg(v)}" for k, v in kwargs.items()]),
                )

            # Call function
            result = func(*args, **kwargs)

            # Log result
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("%s -> %s", func.__qualname__, str(result))

            return result

        return _wrapper

    return _logged_function
[docs]
@logged_function(log)
def dynamic_import(import_path: str) -> Any:
    """
    Dynamically import an object from a conventionally formatted "module:attribute"
    string

    The module part may be a dotted module name or a path to a source file. When it
    resolves to an existing file, the file is loaded directly; otherwise the
    module is imported by name.

    :param import_path: "module:attribute" or "path/to/module:attribute".
    :return: The attribute looked up on the imported module.
    :raises ValueError: if ``import_path`` has no ":" separator.
    :raises ImportError: if the file cannot be loaded as a module, or the module
        import fails with a TypeError.
    """
    # Split on the LAST colon so a Windows drive letter ("C:/pkg/mod.py:attr")
    # stays part of the module path
    module_name, separator, attr = import_path.rpartition(":")
    if not separator:
        raise ValueError(
            f"Invalid import path {import_path!r}, expected 'module:attribute' "
            "or 'path/to/module:attribute'"
        )

    # Check if the module is a file path, if it resolves to a file on disk then
    # import as a file. A directory must not take this branch: it cannot be
    # loaded from a file location and would shadow a regular package import.
    module_filepath = Path(module_name).resolve()
    if module_filepath.is_file():
        module_path = (
            module_filepath.stem
            if Path(module_name).is_absolute()
            else str(Path(module_name).with_suffix("")).replace(os.sep, ".").lstrip(".")
        )

        if module_path not in sys.modules:
            log.debug("Loading '%s' from file '%s'", module_path, module_filepath)
            spec = importlib.util.spec_from_file_location(
                module_path, str(module_filepath)
            )
            if spec is None or spec.loader is None:
                raise ImportError(f"Could not import {module_filepath}")

            module = importlib.util.module_from_spec(spec)
            # Register before executing so the module can be found while it runs
            sys.modules[spec.name] = module
            try:
                spec.loader.exec_module(module)
            except BaseException:
                # Do not leave a half-initialised module behind for later calls
                sys.modules.pop(spec.name, None)
                raise

        return getattr(sys.modules[module_path], attr)

    # Otherwise, import as a module
    try:
        log.debug("Importing module '%s'", module_name)
        module = importlib.import_module(module_name)
        log.debug("Loading '%s' from module '%s'", attr, module_name)
        return getattr(module, attr)
    except TypeError as err:
        raise ImportError(
            str.join(
                "\n",
                [
                    str(err.args[0]),
                    "Verify the import format matches 'module:attribute' or 'path/to/module:attribute'",
                ],
            )
        ) from err
[docs]
class ParsedGitUrl(NamedTuple):
    """Container for the elements parsed from a git URL"""

    # URL scheme as reported by urlsplit after normalization (e.g. "https", "ssh", "file")
    scheme: str
    # Network location part of the URL; may be empty for file:// URLs
    netloc: str
    # Path portion before the final "/" (leading "/" removed)
    namespace: str
    # Final path component with a trailing ".git" removed
    repo_name: str
[docs]
@lru_cache(maxsize=512)
def parse_git_url(url: str) -> ParsedGitUrl:
    """
    Parse a git remote location into a ParsedGitUrl.

    Accepts http[s]://, git://, file:// and ssh style urls, as well as the
    implicit ssh (scp-like) and bare file path forms, which are first rewritten
    into explicit urls.

    supported examples:
        http://git.mycompany.com/username/myproject.git
        https://github.com/username/myproject.git
        https://gitlab.com/group/subgroup/myproject.git
        https://git.mycompany.com:4443/username/myproject.git
        git://host.xz/path/to/repo.git/
        git://host.xz:9418/path/to/repo.git/
        git@github.com:username/myproject.git         <-- assumes ssh://
        ssh://git@github.com:3759/myproject.git       <-- non-standard, but assume user 3759
        ssh://git@github.com:username/myproject.git
        ssh://git@bitbucket.org:7999/username/myproject.git
        git+ssh://git@github.com:username/myproject.git
        /Users/username/dev/remote/myproject.git      <-- Posix File paths
        file:///Users/username/dev/remote/myproject.git
        C:/Users/username/dev/remote/myproject.git    <-- Windows File paths
        file:///C:/Users/username/dev/remote/myproject.git

    REFERENCE: https://stackoverflow.com/questions/31801271/what-are-the-supported-git-url-formats

    Raises ValueError if the url can't be parsed.
    """
    log.debug("Parsing git url %r", url)

    # Ordered (pattern, replacement) rewrites; each one sees the previous result
    rewrite_rules = (
        # implicit ssh (user@host...) becomes explicit ssh://
        (r"^([\w._-]+@)", r"ssh://\1"),
        # git+ssh:// is treated as plain ssh://
        (r"^git\+ssh://", "ssh://"),
        # scp-like "host:path" becomes "host/path", leaving port definitions
        # (:#####) alone while still allowing numeric usernames
        (r"(ssh://(?:[\w._-]+@)?[\w.-]+):(?!\d{1,5}/\w+/)(.*)$", r"\1/\2"),
        # bare windows or posix file paths become explicit file:// urls
        (r"^([C-Z]:/)|^/(\w)", r"file:///\1\2"),
    )
    for expression, substitute in rewrite_rules:
        url = re.sub(expression, substitute, url)

    # Let urlsplit separate out the parts; without a scheme it is not a url.
    # The scheme itself is deliberately not validated here.
    parts = urlsplit(url)
    if not parts.scheme:
        raise ValueError(f"Cannot parse {url!r}")

    # Normalize the path as a posix path, then split off the last component
    normalized_path = str(PurePosixPath(parts.path)).lstrip("/")
    namespace, _, repo_name = normalized_path.rpartition("/")

    # Drop a trailing .git from the repository name
    if repo_name.endswith(".git"):
        repo_name = repo_name[:-4]

    # file:// urls are allowed to have an empty net location
    has_location = parts.scheme == "file" or bool(parts.netloc)
    if not (has_location and namespace and repo_name):
        raise ValueError(f"Bad url: {url!r}")

    return ParsedGitUrl(
        scheme=parts.scheme,
        netloc=parts.netloc,
        namespace=namespace,
        repo_name=repo_name,
    )