Source code for dffml.util.data

"""
Various helper functions for manipulating python data structures and values

Run doctests with

python -m doctest -v dffml/util/data.py
"""
import ast
import uuid
import types
import pydoc
import inspect
import dataclasses
import collections
from functools import wraps
import pathlib
from typing import Callable

from .log import LOGGER

try:
    from typing import get_origin, get_args
except ImportError:
    # Added in Python 3.8
    def get_origin(t):
        return getattr(t, "__origin__", None)

    def get_args(t):
        return getattr(t, "__args__", None)


def merge(one, two, list_append: bool = True):
    for key, value in two.items():
        if key in one:
            if isinstance(value, dict):
                merge(one[key], two[key], list_append=list_append)
            elif list_append and isinstance(value, list):
                one[key] += two[key]
        else:
            one[key] = two[key]


[docs]def traverse_config_set(target, *args): """ Examples -------- >>> from dffml import traverse_config_set >>> >>> traverse_config_set({ ... "level": { ... "plugin": None, ... "config": { ... "one": { ... "plugin": 1, ... "config": {}, ... }, ... }, ... }, ... }, "level", "one", 42) {'level': {'plugin': None, 'config': {'one': {'plugin': 42, 'config': {}}}}} """ # Seperate the path down from the value to set path, value = args[:-1], args[-1] current = target last = target for level in path: if not level in current: current[level] = {"plugin": None, "config": {}} last = current[level] current = last["config"] last["plugin"] = value return target
[docs]def traverse_config_get(target, *args): """ Examples -------- >>> from dffml import traverse_config_get >>> >>> traverse_config_get({ ... "level": { ... "plugin": None, ... "config": { ... "one": { ... "plugin": 1, ... "config": {}, ... }, ... }, ... }, ... }, "level", "one") 1 """ current = target last = target for level in args: last = current[level] current = last["config"] return last["plugin"]
def split_dot_seperated(val: str) -> "List[str]": raw_split = val.split(".") vals = [] i = 0 tl = [] trig = False for x in raw_split: if "'" in x or '"' in x: trig = not trig if not trig: tl.append(x) k = ".".join(tl).replace("'", "").replace('"', "") vals.append(k) tl = [] continue if trig: tl.append(x) continue vals.append(x) return vals
[docs]def traverse_get(target, *args): """ Travel down through a dict Examples -------- >>> from dffml import traverse_get >>> >>> traverse_get({"one": {"two": 3}}, "one", "two") 3 >>> traverse_get({"one": {"two": 3}}, "one.two") 3 >>> traverse_get({"one.two": {"three": 4}}, "'one.two'.three") 4 """ if len(args) == 1: args = split_dot_seperated(args[0]) current = target for level in args: try: current = current[level] except TypeError: LOGGER.getChild("traverse_get").error( "args %r, target: %r", args, target ) raise return current
[docs]def traverse_set(target, *args, value): """ Examples -------- >>> from dffml import traverse_set >>> >>> d = {"one": {"two": 3}} >>> traverse_set(d,"one.two", value = "Three") >>> d["one"]["two"] 'Three' """ if len(args) == 1: args = split_dot_seperated(args[0]) if len(args) == 1: target[args[0]] = value return current = target for level in args[:-1]: if level not in current: current[level] = {} current = current[level] current[args[-1]] = value return
[docs]def ignore_args(func): """ Decorator to call the decorated function without any arguments passed to it. """ @wraps(func) def wrapper(*_args, **_kwargs): return func() return wrapper
# STANDARD_TYPES Will be the type names which are applicable across languages # used to transform types from one language into anothers STANDARD_TYPES = {"Dict": "map", "List": "array", "Any": "generic"} STANDARD_TYPES_REVERSED = dict( zip(STANDARD_TYPES.values(), STANDARD_TYPES.keys()) ) def type_lookup(typename): if typename in STANDARD_TYPES_REVERSED: typename = f"typing.{STANDARD_TYPES_REVERSED[typename]}" # TODO(security) Make sure pydoc.locate won't blow up in our face ever typeof = pydoc.locate(typename) if typeof is None: raise TypeError(typename) return typeof def export_value(obj, key, value): # export and _asdict are not classmethods typename_lower = str(type(value)).lower() if hasattr(value, "ENTRY_POINT_ORIG_LABEL") and hasattr(value, "config"): obj[key] = {"plugin": value.ENTRY_POINT_ORIG_LABEL} export_value(obj[key], "config", value.config) elif inspect.isclass(value): obj[key] = value.__qualname__ elif isinstance(value, (pathlib.Path, uuid.UUID)): obj[key] = str(value) elif hasattr(value, "export"): obj[key] = value.export() elif hasattr(value, "_asdict"): obj[key] = value._asdict() elif "numpy" in typename_lower: if isinstance(value, collections.abc.Iterable) and isinstance( getattr(value, "tolist", None), collections.abc.Callable ): obj[key] = value.tolist() elif ".int" in typename_lower or ".uint" in typename_lower: obj[key] = int(value) elif "float" in typename_lower: obj[key] = float(value) else: raise ValueError(f"Unknown numpy type: {typename_lower}") elif dataclasses.is_dataclass(value): obj[key] = export_dict(**dataclasses.asdict(value)) elif getattr(value, "__module__", None) == "typing": obj[key] = STANDARD_TYPES.get( str(value).replace("typing.", ""), "generic" ) def export_list(iterable): for i, value in enumerate(iterable): export_value(iterable, i, value) if isinstance(value, (dict, types.MappingProxyType)): iterable[i] = export_dict(**iterable[i]) elif dataclasses.is_dataclass(value): iterable[i] = export_dict(**dataclasses.asdict(value)) elif isinstance(value, list): iterable[i] = export_list(iterable[i]) return iterable
[docs]def export_dict(**kwargs): """ Return the dict given as kwargs but first recurse into each element and call its export or _asdict function if it is not a serializable type. """ for key, value in kwargs.items(): export_value(kwargs, key, value) if isinstance(kwargs[key], (dict, types.MappingProxyType)): kwargs[key] = export_dict(**kwargs[key]) elif isinstance(kwargs[key], list): kwargs[key] = export_list(kwargs[key]) return kwargs
[docs]def export(obj): """ Convert a value from a Python object into something that is just primitives, so things that could be printed withough formatting issues or sent to :py:func:`json.dumps`. It works on :py:func:`typing.NamedTuple`, :py:func:`dataclasses.dataclass`, and anything that implements an ``export`` method that returns a dict. Examples -------- >>> from dffml import export >>> from typing import NamedTuple >>> >>> class MyType(NamedTuple): ... data: int >>> >>> export(MyType(data=42)) {'data': 42} >>> >>> class MyBiggerType: ... def export(self): ... return {"feed": "face", "sub": MyType(data=42)} >>> >>> export(MyBiggerType()) {'feed': 'face', 'sub': {'data': 42}} """ return export_dict(value=obj)["value"]
[docs]def explore_directories(path_dict: dict): """ Recursively explores any path binded to a key in `path_dict` Examples -------- >>> import pathlib >>> import tempfile >>> from dffml import explore_directories >>> >>> with tempfile.TemporaryDirectory() as root: ... # Setup directories for example ... STRUCTURE = ''' ... root ... | ... +--- deadbeef ... | | ... | +--- file1.txt ... | +--- colosseum ... | | ... | +--- battle.rst ... +--- face ... | ... +--- file2.jpg ... ''' ... # The writes will produce the 0's in the output (for doctest) ... pathlib.Path(root, "deadbeef").mkdir() ... pathlib.Path(root, "deadbeef", "file1.txt").write_text("") ... pathlib.Path(root, "deadbeef", "colosseum").mkdir() ... pathlib.Path(root, "deadbeef", "colosseum", "battle.rst").write_text("") ... pathlib.Path(root, "face").mkdir() ... pathlib.Path(root, "face", "file2.jpg").write_text("") ... # Explore directories ... root = explore_directories(path_dict={ ... "root": root ... })["root"] ... # Check that everything was found ... bool("deadbeef" in root) ... bool("colosseum" in root["deadbeef"]) ... bool("battle" in root["deadbeef"]["colosseum"]) ... bool("face" in root) ... bool("file2" in root["face"]) 0 0 0 True True True True True """ for key, val in path_dict.items(): t_path = pathlib.Path(val) if t_path.is_dir(): temp_path_dict = {} for _path in pathlib.Path(val).glob("*"): t_path = pathlib.Path(_path) temp_path_dict[t_path.stem] = _path explore_directories(temp_path_dict) path_dict[key] = temp_path_dict return path_dict
[docs]async def nested_apply(target: dict, func: Callable): """ Applies `func` recursively to all non dict types in `target` """ for key, val in target.items(): if isinstance(val, dict): target[key] = await nested_apply(val, func) else: if inspect.iscoroutinefunction(func): target[key] = await func(val) else: target[key] = func(val) return target
[docs]def parser_helper(value): """ Calls checks if value is string and if it is it converts it to a bool if the string is a string representation of common boolean value (on, off, true, false, yes, no). Otherwise it tries to call :py:func:`ast.literal_eval`, if that doesn't succeed the string value is returned. Examples -------- >>> from dffml import parser_helper >>> >>> parser_helper("on") True >>> parser_helper("[1, 2, 3]") [1, 2, 3] >>> parser_helper("1,2,3") (1, 2, 3) >>> parser_helper("hello") 'hello' >>> parser_helper("'on'") 'on' >>> parser_helper("on,off,feed,face,42") [True, False, 'feed', 'face', 42] >>> parser_helper("list,") ['list'] """ if not isinstance(value, str): return value if value.lower() in ["null", "nil", "none"]: return None elif value.lower() in ["yes", "true", "on"]: return True elif value.lower() in ["no", "false", "off"]: return False try: return ast.literal_eval(value) except: if "," in value: return list(map(parser_helper, filter(bool, value.split(",")))) return value