"""
Base classes for DFFML. All classes in DFFML should inherit from these so that
they follow a similar API for instantiation and usage.
"""
import abc
import copy
import inspect
import argparse
import functools
import contextlib
import collections
import dataclasses
import collections.abc
from argparse import ArgumentParser
from typing import Dict, Any, Type, Optional, Union
from .util.python import within_method
from .util.data import get_args, get_origin
from .util.cli.arg import Arg
from .util.data import (
traverse_config_set,
traverse_config_get,
type_lookup,
export_dict,
parser_helper,
)
from .util.entrypoint import Entrypoint
from .log import LOGGER
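
# Parser used only by convert_value() to resolve string action names via
# argparse internals (ArgumentParser._pop_action_class)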
ARGP = ArgumentParser()


class ParseExpandAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
if not isinstance(values, list):
values = [values]
setattr(namespace, self.dest, self.LIST_CLS(*values))


# Maps classes to their ParseClassNameAction
LIST_ACTIONS: Dict[Type, Type] = {}


def list_action(list_cls):
"""
    Create an argparse action which takes a list of values and wraps them in
    an instance of ``list_cls``, which should be a class descended from
    AsyncContextManagerList.
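
    Examples
    --------

    A minimal illustrative sketch; ``MyList`` here is a hypothetical stand-in
    for a class descended from AsyncContextManagerList.

    >>> import collections
    >>> from dffml.base import list_action
    >>>
    >>> class MyList(collections.UserList):
    ...     pass
    >>>
    >>> action_cls = list_action(MyList)
    >>> action_cls.__qualname__
    'ParseMyListAction'
    >>> action_cls.LIST_CLS is MyList
    True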
"""
LIST_ACTIONS.setdefault(
list_cls,
type(
f"Parse{list_cls.__qualname__}Action",
(ParseExpandAction,),
{"LIST_CLS": list_cls},
),
)
return LIST_ACTIONS[list_cls]


class MissingArg(Exception):
"""
Raised when a BaseConfigurable is missing an argument from the args dict it
created with args(). If this exception is raised then the config() method is
    attempting to retrieve an argument which was not set in the args() method.
"""


class MissingConfig(Exception):
"""
Raised when a BaseConfigurable is missing an argument from the config dict.
Also raised if there was no default value set and the argument is missing.
"""


class MissingRequiredProperty(Exception):
"""
Raised when a BaseDataFlowFacilitatorObject is missing some property which
should have been defined in the class.
"""


class LoggingLogger(object):
"""
Provide the logger property using Python's builtin logging module.
"""
@property
def logger(self):
prop_name = "__%s_logger" % (self.__class__.__qualname__,)
logger = getattr(self, prop_name, False)
if logger is False:
logger = LOGGER.getChild(self.__class__.__qualname__)
setattr(self, prop_name, logger)
return logger


def mkarg(field):
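    """
    Create an Arg (used when building command line parsers) from a dataclass
    field. The field's type, default value (or default factory), and metadata
    (description, action, required, labeled) determine the properties of the
    resulting Arg.
    """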
if field.type != bool:
arg = Arg(type=field.type)
else:
arg = Arg()
arg.annotation = field.type
# HACK For detecting dataclasses._MISSING_TYPE
if "dataclasses._MISSING_TYPE" not in repr(field.default):
arg["default"] = field.default
if "dataclasses._MISSING_TYPE" not in repr(field.default_factory):
arg["default"] = field.default_factory()
if field.type == bool:
arg["action"] = "store_true"
elif inspect.isclass(field.type):
if issubclass(field.type, (list, collections.UserList)):
arg["nargs"] = "+"
if not hasattr(field.type, "SINGLETON"):
raise AttributeError(
f"{field.type.__qualname__} missing attribute SINGLETON"
)
arg["action"] = list_action(field.type)
arg["type"] = field.type.SINGLETON
if hasattr(arg["type"], "load_labeled") and field.metadata.get(
"labeled", False
):
arg["type"] = arg["type"].load_labeled
if hasattr(arg["type"], "load"):
# TODO (python3.8) Use Protocol
arg["type"] = arg["type"].load
elif get_origin(field.type) in (list, tuple):
arg["type"] = get_args(field.type)[0]
arg["nargs"] = "+"
if "description" in field.metadata:
arg["help"] = field.metadata["description"]
if field.metadata.get("action"):
arg["action"] = field.metadata["action"]
if field.metadata.get("required"):
arg["required"] = field.metadata["required"]
return arg


PRIMITIVE_TYPES = (int, float, str, bool, dict, list, bytes)


def typing_type_cls(param_annotation):
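    """
    Reduce a typing annotation to a concrete class which can be used for
    value conversion.

    A short sketch of the behavior, assuming get_origin() and get_args()
    mirror their typing module counterparts:

    >>> from typing import List, Optional
    >>> from dffml.base import typing_type_cls
    >>>
    >>> typing_type_cls(Optional[int])
    <class 'int'>
    >>> typing_type_cls(List[str])
    <class 'list'>
    """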
if get_origin(param_annotation) in [
Union,
collections.abc.AsyncIterator,
]:
        # If the annotation is of the form Optional[T] (Union[T, None]) or
        # AsyncIterator[T], use the first type argument
        return list(get_args(param_annotation))[0]
elif (
get_origin(param_annotation) is list
or get_origin(param_annotation) is dict
):
        # If the annotation is of the form List[MyDataClass] or Dict[str, MyDataClass]
        # Return list or dict (probably should do more here)
return get_origin(param_annotation)
return param_annotation


def convert_value(arg, value):
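    """
    Convert a value to the type specified by an Arg, applying defaults, nargs
    handling, and argparse actions along the way.

    A minimal sketch (``ExampleConfig`` is hypothetical) showing a string
    being coerced to the declared field type, and the default being used when
    the value is missing:

    >>> import dataclasses
    >>> from dffml.base import convert_value, field, mkarg
    >>>
    >>> @dataclasses.dataclass
    ... class ExampleConfig:
    ...     count: int = field("Number of items", default=10)
    >>>
    >>> convert_value(mkarg(dataclasses.fields(ExampleConfig)[0]), "42")
    42
    >>> convert_value(mkarg(dataclasses.fields(ExampleConfig)[0]), None)
    10
    """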
if value is None:
# Return default if not found and available
if "default" in arg:
return copy.deepcopy(arg["default"])
raise MissingConfig
    if "nargs" not in arg and isinstance(value, list):
value = value[0]
if "type" in arg:
type_cls = arg["type"]
if type_cls == Type:
type_cls = type_lookup
        # TODO This is an oversimplification of argparse's nargs
if "nargs" in arg:
value = [
i if isinstance(i, type_cls) else type_cls(i) for i in value
]
elif getattr(type_cls, "CONFIGLOADABLE", False):
pass
else:
            if ".load" not in str(type_cls):
type_cls = typing_type_cls(type_cls)
if isinstance(value, str) and type_cls is not str:
value = parser_helper(value)
                # dict -> dataclass or namedtuple
if (
dataclasses.is_dataclass(type_cls)
or bool(
inspect.isclass(type_cls)
and issubclass(type_cls, tuple)
and hasattr(type_cls, "_asdict")
)
and isinstance(value, dict)
):
value = type_cls(**value)
else:
convert = True
# Try to see if the value is of the type it's supposed to be
# already
if arg.annotation is not None:
# The possible types are the type, the annotated type
# (property_name: Annotation), and the result of calling
# get_origin() on the annotation. get_origin(), for a
# typing.SomeThing like typing.Dict, or typing.Tuple, will
# return dict, or tuple respectively.
possible_types = [
arg["type"],
arg.annotation,
get_origin(arg.annotation),
]
# In the case we have an annotation that takes arguments,
# such as typing.Union. We want to check if the value is
# already any of the types listed in the acceptable types
# for that union of types. We can get those types by calling
# get_args() on the annotation.
annotation_args = get_args(arg.annotation)
# We check against the args if there are any
if annotation_args is not None:
possible_types += annotation_args
# We go through all the possible types the value should be
for possible_type in possible_types:
# We have to check that the possible type is a type
# before checking if the value is an instance of that
# type. Since it doesn't make sense to check if the
# value is an instance of something that's not a type.
if isinstance(possible_type, type) and isinstance(
value, possible_type
):
# We don't convert if the value is already of the
# correct type.
convert = False
break
if convert:
value = type_cls(value)
# list -> tuple
if arg.annotation is not None and get_origin(arg.annotation) is tuple:
value = get_origin(arg.annotation)(value)
if "action" in arg:
if isinstance(arg["action"], str):
            # HACK This accesses _pop_action_class from ArgumentParser
            # which is prefaced with an underscore indicating it is not an
            # API we can rely on
arg["action"] = ARGP._pop_action_class(arg)
namespace = ConfigurableParsingNamespace()
action = arg["action"](dest="dest", option_strings="")
action(None, namespace, value)
value = namespace.dest
return value


def is_config_dict(value):
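    """
    Check if a value is a dict of the form
    ``{"plugin": ..., "config": {...}}``. For example:

    >>> from dffml.base import is_config_dict
    >>>
    >>> is_config_dict({"plugin": "example", "config": {}})
    True
    >>> is_config_dict({"plugin": "example"})
    False
    """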
return bool(
isinstance(value, dict)
and "plugin" in value
and "config" in value
and isinstance(value["config"], dict)
)


def _fromdict(cls, **kwargs):
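    """
    Create an instance of a config class from a dict of keyword arguments,
    converting each value according to its field definition. Bound to config
    classes as the ``_fromdict`` classmethod.
    """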
for field in dataclasses.fields(cls):
if field.name in kwargs:
value = kwargs[field.name]
config = {}
if is_config_dict(value):
value, config = value["plugin"], value["config"]
value = convert_value(mkarg(field), value)
if inspect.isclass(value) and issubclass(value, BaseConfigurable):
# TODO This probably isn't 100% correct. Figure out what we need
# to do with nested configs.
value = value.withconfig(
{
value.ENTRY_POINT_NAME[-1]: {
"plugin": None,
"config": {
key: value
if is_config_dict(value)
else {"plugin": value, "config": {}}
for key, value in config.items()
},
}
}
)
kwargs[field.name] = value
return cls(**kwargs)


def field(
description: str,
*args,
action=None,
required: bool = False,
labeled: bool = False,
mutable: bool = False,
metadata: Optional[dict] = None,
**kwargs,
):
"""
    Creates an instance of :py:func:`dataclasses.field`. The first argument,
    ``description``, is the description of the field, and will be set as the
    ``"description"`` key in the metadata ``dict``.
"""
if not metadata:
metadata = {}
metadata["description"] = description
metadata["required"] = required
metadata["labeled"] = labeled
metadata["action"] = action
metadata["mutable"] = mutable
return dataclasses.field(*args, metadata=metadata, **kwargs)


def config_asdict(self, *args, **kwargs):
return export_dict(**dataclasses.asdict(self, *args, **kwargs))


def config_ensure_immutable_init(self):
"""
    Initialize the instance local variables which support config immutability.
    We can't call this in ``__init__`` so we call it whenever we are about to
    use them.
"""
if hasattr(self, "_mutable_callbacks"):
return
self._mutable_callbacks = set()
self._data = {}
# Only enforce mutable/immutable checks if _enforce_immutable == True
self._enforce_immutable = True


def config_add_mutable_callback(self, func):
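    """
    Register a callback which will be called with ``(key, value)`` whenever a
    mutable config property is set. Bound to config classes as the
    ``add_mutable_callback`` method.
    """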
config_ensure_immutable_init(self)
self._mutable_callbacks.add(func)


@contextlib.contextmanager
def config_no_enforce_immutable(self):
"""
By default, all properties of a config object are immutable. If you would
like to mutate immutable properties, you must explicitly call this method
using it as a context manager.

    Examples
    --------
>>> from dffml import config
>>>
>>> @config
... class MyConfig:
... C: int
>>>
    >>> my_config = MyConfig(C=2)
    >>> with my_config.no_enforce_immutable():
    ...     my_config.C = 1
"""
config_ensure_immutable_init(self)
self._enforce_immutable = False
try:
yield
finally:
self._enforce_immutable = True


def config_make_getter(key):
"""
Create a getter function for use with :py:func:`property` on config objects.
"""
def getter_mutable(self):
        if key not in self._data:
raise AttributeError(key)
return self._data[key]
return getter_mutable


class ImmutableConfigPropertyError(Exception):
"""
    Raised when a config property was changed but was not marked as mutable.
"""


class NoMutableCallbacksError(Exception):
"""
    Raised when a config property is mutated but there are no mutable
    callbacks registered to handle its update.
"""


def config_make_setter(key, immutable):
"""
Create a setter function for use with :py:func:`property` on config objects.
"""
def setter_immutable(self, value):
config_ensure_immutable_init(self)
# Reach into caller's stack frame to check if we are in the
# __init__ function of the dataclass. If we are in the __init__
# method we should not enforce immutability. Set max_depth to 4 in
# case of __post_init__. No point in searching farther.
if within_method(self, "__init__", max_depth=4):
# Mutate without checks if we are within the __init__ code of
# the class. Then bail out, we're done here.
self._data[key] = value
return
# Raise if the property is immutable and we're in enforcing mode
if self._enforce_immutable:
if immutable:
raise ImmutableConfigPropertyError(
f"Attempted to mutate immutable property {self.__class__.__qualname__}.{key}"
)
            # Ensure we have callbacks registered if we're mutating
            if not self._mutable_callbacks:
                raise NoMutableCallbacksError(
                    "Config instance has no mutable_callbacks registered but a mutable property was updated"
                )
# Call callbacks to notify we've mutated
for func in self._mutable_callbacks:
func(key, value)
# Mutate property
self._data[key] = value
return setter_immutable


def _config(datacls):
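    """
    Bind the DFFML config helper methods (``_fromdict``, ``_replace``,
    ``_asdict``, ``add_mutable_callback``, ``no_enforce_immutable``) to a
    dataclass and turn each of its fields into a property which enforces the
    mutability rules.
    """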
datacls._fromdict = classmethod(_fromdict)
datacls._replace = lambda self, *args, **kwargs: dataclasses.replace(
self, *args, **kwargs
)
datacls._asdict = config_asdict
datacls.add_mutable_callback = config_add_mutable_callback
datacls.no_enforce_immutable = config_no_enforce_immutable
for field in dataclasses.fields(datacls):
# Make deleter None so it raises AttributeError: can't delete attribute
setattr(
datacls,
field.name,
property(
config_make_getter(field.name),
config_make_setter(
field.name, field.metadata.get("mutable", False)
),
None,
),
)
return datacls


def config(cls):
"""
    Decorator which turns a class into a config dataclass, adding the DFFML
    config helper methods and immutable-by-default field properties.
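
    A short sketch of what the decorator adds on top of
    :py:func:`dataclasses.dataclass`:

    >>> from dffml import config
    >>>
    >>> @config
    ... class MyConfig:
    ...     num: int
    >>>
    >>> MyConfig(num=2)._asdict()
    {'num': 2}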
"""
return _config(dataclasses.dataclass(eq=True, init=True)(cls))


def make_config(cls_name: str, fields, *args, namespace=None, **kwargs):
"""
    Create a config dataclass programmatically from a list of
    ``(name, type, field)`` tuples.
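
    A short illustrative example:

    >>> from dffml.base import field, make_config
    >>>
    >>> MyConfig = make_config(
    ...     "MyConfig", [("num", int, field("Number of things", default=0))]
    ... )
    >>> MyConfig(num=2).num
    2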
"""
if namespace is None:
namespace = {}
namespace.setdefault("_fromdict", classmethod(_fromdict))
namespace.setdefault(
"_replace",
lambda self, *args, **kwargs: dataclasses.replace(
self, *args, **kwargs
),
)
namespace.setdefault("_asdict", config_asdict)
kwargs["eq"] = True
kwargs["init"] = True
# Ensure non-default arguments always come before default arguments
fields_non_default = []
fields_default = []
for name, cls, field in fields:
if (
field.default is not dataclasses.MISSING
or field.default_factory is not dataclasses.MISSING
):
fields_default.append((name, cls, field))
else:
fields_non_default.append((name, cls, field))
fields = fields_non_default + fields_default
# Create dataclass
return _config(
dataclasses.make_dataclass(
cls_name, fields, *args, namespace=namespace, **kwargs
)
)


@config
class BaseConfig:
"""
    All DFFML Base Objects should take an object (likely a typing.NamedTuple)
    as their config.
"""
def __repr__(self):
return "BaseConfig()"
def __str__(self):
return repr(self)


class ConfigurableParsingNamespace(object):
def __init__(self):
self.dest = None


class ConfigAndKWArgsMutuallyExclusive(Exception):
"""
Raised when both kwargs and config are specified.
"""


class BaseConfigurable(metaclass=BaseConfigurableMetaClass):
"""
Class which produces a config for itself by providing Args to a CMD (from
dffml.util.cli.base) and then using a CMD after it contains parsed args to
instantiate a config (deriving from BaseConfig) which will be used as the
only parameter to the __init__ of a BaseDataFlowFacilitatorObject.
"""
def __init__(self, config: Type[BaseConfig]) -> None:
"""
BaseConfigurable takes only one argument to __init__,
its config, which should inherit from BaseConfig. It shall be a object
containing any information needed to configure the class and it's child
context's.
"""
self.config = config
str_config = str(self.config)
self.logger.debug(
str_config if len(str_config) < 512 else (str_config[:512] + "...")
)
def __eq__(self, other: "BaseConfigurable") -> bool:
        if inspect.isclass(other) or not isinstance(other, self.__class__):
            return NotImplemented
return self.config == other.config
@classmethod
def add_orig_label(cls, *above):
return (
list(above) + cls.ENTRY_POINT_NAME + [cls.ENTRY_POINT_ORIG_LABEL]
)
@classmethod
def add_label(cls, *above):
return list(above) + cls.ENTRY_POINT_NAME + [cls.ENTRY_POINT_LABEL]
@classmethod
def config_set(cls, args, above, *path) -> BaseConfig:
return traverse_config_set(
args, *(cls.add_orig_label(*above) + list(path))
)
    @classmethod
def type_for(cls, param: inspect.Parameter):
"""
Guess the type based off the default value of the parameter, for when a
parameter doesn't have a type annotation.
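
        A short illustrative example:

        >>> import inspect
        >>> from dffml.base import BaseConfigurable
        >>>
        >>> def func(a, b: int, c=True):
        ...     pass
        >>>
        >>> params = inspect.signature(func).parameters
        >>> BaseConfigurable.type_for(params["b"])
        <class 'int'>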
"""
if param.annotation != inspect._empty:
return param.annotation
elif param.default is None:
return parser_helper
else:
type_of = type(param.default)
if type_of is bool:
return lambda value: bool(parser_helper(value))
return type_of
@classmethod
def config_get(cls, config, above, *path) -> BaseConfig:
# unittest.mock.patch doesn't work if we cache args() output.
args = cls.args({})
args_above = cls.add_orig_label() + list(path)
label_above = cls.add_label(*above) + list(path)
no_label_above = cls.add_label(*above)[:-1] + list(path)
arg = None
try:
arg = traverse_config_get(args, *args_above)
        except KeyError:
            pass
if arg is None:
raise MissingArg(
"Arg %r missing from %s%s%s"
% (
args_above[-1],
cls.__qualname__,
"." if args_above[:-1] else "",
".".join(args_above[:-1]),
)
)
value = None
# Try to get the value specific to this label
with contextlib.suppress(KeyError):
value = traverse_config_get(config, *label_above)
# Try to get the value specific to this plugin
if value is None:
with contextlib.suppress(KeyError):
value = traverse_config_get(config, *no_label_above)
try:
return convert_value(arg, value)
except MissingConfig as error:
error.args = (
(
"%s missing %r from %s"
% (
cls.__qualname__,
label_above[-1],
".".join(label_above[:-1]),
)
),
)
raise
    @classmethod
def args(cls, args, *above) -> Dict[str, Arg]:
"""
Return a dict containing arguments required for this class
"""
if getattr(cls, "CONFIG", None) is None:
raise AttributeError(
f"{cls.__qualname__} requires CONFIG property or implementation of args() classmethod"
)
for field in dataclasses.fields(cls.CONFIG):
cls.config_set(args, above, field.name, mkarg(field))
return args
    @classmethod
def config(cls, config, *above):
"""
Create the BaseConfig required to instantiate this class by parsing the
config dict.
"""
if getattr(cls, "CONFIG", None) is None:
raise AttributeError(
f"{cls.__qualname__} requires CONFIG property or implementation of config() classmethod"
)
# Build the arguments to the CONFIG class
kwargs: Dict[str, Any] = {}
for field in dataclasses.fields(cls.CONFIG):
kwargs[field.name] = got = cls.config_get(
config, above, field.name
)
if inspect.isclass(got) and issubclass(got, BaseConfigurable):
try:
kwargs[field.name] = got.withconfig(
config, *above, *cls.add_label()[:-1]
)
except MissingConfig:
kwargs[field.name] = got.withconfig(
config, *above, *cls.add_label()
)
return cls.CONFIG(**kwargs)
@classmethod
def withconfig(cls, config, *above):
return cls(cls.config(config, *above))


class BaseDataFlowFacilitatorObjectContext(LoggingLogger):
"""
    Base class for the contexts of all Data Flow Facilitator objects. These
    are classes which support async context management. Classes ending with
    ...Context are the innermost contexts used in DFFML.
    See :class:`BaseDataFlowFacilitatorObject` for example usage.
"""
async def __aenter__(self) -> "BaseDataFlowFacilitatorObjectContext":
return self
async def __aexit__(self, exc_type, exc_value, traceback):
pass


class BaseDataFlowFacilitatorObject(
BaseDataFlowFacilitatorObjectContext, BaseConfigurable, Entrypoint
):
"""
Base class for all Data Flow Facilitator objects conforming to the
instantiate -> enter context -> return context via __call__ -> enter
returned context's context pattern. Therefore they must contain a CONTEXT
property, set to the BaseDataFlowFacilitatorObjectContext which will be
returned from a __call__ to this class.
    DFFML is plugin based, using Python's setuptools entry point API. All
    classes inheriting from BaseDataFlowFacilitatorObject must have a property
    named ENTRYPOINT, in the form of ``dffml.load_point``, which will be used
    to load all classes registered to that entry point.
>>> import asyncio
>>> from dffml import *
>>>
    >>> # Create the base object. Then enter its context to perform any initial
    >>> # setup. Call obj to get an instance of obj.CONTEXT, which is a subclass
    >>> # of BaseDataFlowFacilitatorObjectContext. ctx, the inner context, does
    >>> # all the heavy lifting.
>>>
>>> class Context(BaseDataFlowFacilitatorObjectContext):
... async def method(self):
... return
>>>
>>> class Object(BaseDataFlowFacilitatorObject):
... CONTEXT = Context
... def __call__(self):
... return Context()
>>>
>>> async def main():
... async with Object(BaseConfig()) as obj:
... async with obj() as ctx:
... await ctx.method()
>>>
>>> asyncio.run(main())
"""
def __init__(self, config: Type[BaseConfig]) -> None:
BaseConfigurable.__init__(self, config)
# TODO figure out how to call these in __new__
self.__ensure_property("CONTEXT")
self.__ensure_property("ENTRYPOINT")
def __repr__(self):
return "%s(%r)" % (self.__class__.__qualname__, self.config)
@abc.abstractmethod
def __call__(self) -> "BaseDataFlowFacilitatorObjectContext":
pass
@classmethod
def __ensure_property(cls, property_name):
if getattr(cls, property_name, None) is None:
raise MissingRequiredProperty(
"BaseDataFlowFacilitatorObjects may not be "
"created without a `%s`. Missing %s.%s"
% (property_name, cls.__qualname__, property_name)
)