import uuid
import copy
import itertools
import pkg_resources
from enum import Enum
from dataclasses import dataclass, field, asdict, replace
from typing import (
NamedTuple,
Union,
List,
Dict,
Optional,
Any,
Iterator,
Callable,
Tuple,
)
from ..base import BaseConfig
from ..util.data import export_dict, type_lookup
from ..util.entrypoint import Entrypoint, base_entry_point
class DefinitionMissing(Exception):
    """
    Definition missing from linked DataFlow
    """
class PrimitiveDoesNotMatchValue(Exception):
    """
    Primitive does not match the value type
    """
class _NO_DEFAULT:
    """
    Sentinel type used to mark a Definition as having no default value.
    A dedicated object is required because ``None`` is itself a valid default.
    """


# Singleton sentinel instance used as the "no default given" marker
NO_DEFAULT = _NO_DEFAULT()


class Definition(NamedTuple):
    """
    Definition of a type of value used as an Operation input or output.

    Examples
    --------

    >>> from dffml import Definition, Input
    >>> from typing import NamedTuple
    >>>
    >>> class Person(NamedTuple):
    ...     name: str
    ...     age: int
    >>>
    >>> Friend = Definition(name="Friend", primitive="map", spec=Person)
    >>> Input(value={"name": "Bob", "age": 42}, definition=Friend)
    Input(value=Person(name='Bob', age=42), definition=Friend)
    >>>
    >>> SignUpQueue = Definition(name="SignUpQueue", primitive="array", spec=Person, subspec=True)
    >>> Input(value=[{"name": "Bob", "age": 42}], definition=SignUpQueue)
    Input(value=[Person(name='Bob', age=42)], definition=SignUpQueue)
    >>>
    >>> AddressBook = Definition(name="AddressBook", primitive="map", spec=Person, subspec=True)
    >>> Input(value={"bob": {"name": "Bob", "age": 42}}, definition=AddressBook)
    Input(value={'bob': Person(name='Bob', age=42)}, definition=AddressBook)
    """

    name: str
    primitive: str
    default: Any = NO_DEFAULT
    lock: bool = False
    # spec is a NamedTuple which could be populated via a dict
    spec: NamedTuple = None
    # subspec is when your input is a list or dict of values which conform to
    # the spec
    subspec: bool = False
    # validate property will be a callable (function or lambda) which returns
    # the sanitized version of the value
    validate: Callable[[Any], Any] = None

    def __repr__(self):
        return self.name

    def __str__(self):
        return repr(self)

    def __eq__(self, other):
        # Equality is by exported (serialized) content rather than tuple
        # identity. Guard against operands which have no export() so that
        # comparison with arbitrary objects returns False instead of raising
        # AttributeError.
        export = getattr(other, "export", None)
        if export is None:
            return NotImplemented
        return bool(self.export() == export())

    def export(self):
        """
        Convert to a dict of primitive types, omitting properties left at
        their defaults so the exported form stays minimal.
        """
        exported = dict(self._asdict())
        # isinstance check replaces the previous repr() substring match
        # ("dffml.df.types._NO_DEFAULT" in repr(...)), which silently broke
        # whenever this module was imported under a different path
        if isinstance(self.default, _NO_DEFAULT):
            del exported["default"]
        if not self.lock:
            del exported["lock"]
        if not self.validate:
            del exported["validate"]
        if not self.spec:
            del exported["spec"]
            del exported["subspec"]
        else:
            exported["spec"] = export_dict(
                name=self.spec.__qualname__,
                types=self.spec._field_types,
                defaults=self.spec._field_defaults,
            )
        return exported

    @classmethod
    def _fromdict(cls, **kwargs):
        """
        Recreate a Definition from its exported dict form, rebuilding the
        ``spec`` NamedTuple class if one was exported.
        """
        # TODO(p5) We should avoid copying here, i think
        kwargs = copy.deepcopy(kwargs)
        if "spec" in kwargs:
            # Alright this is horrible. But bear with me here. The
            # typing.NamedTuple API as of 3.7 does not provide a clean way to
            # create a new NamedTuple class where you specify the type hinting
            # and the default values. The following is based on looking at the
            # source code
            # https://github.com/python/cpython/blob/3.7/Lib/typing.py#L1360
            # and seeing that we can hijack the __annotations__ property to
            # allow us to set default values
            def_tuple = kwargs["spec"]["defaults"]
            # All dict are ordered in 3.7+
            annotations = {}
            annotations_with_defaults = {}
            for key, typename in kwargs["spec"]["types"].items():
                # Properties without defaults must come before those with
                # defaults
                if key not in def_tuple:
                    annotations[key] = cls.type_lookup(typename)
                else:
                    annotations_with_defaults[key] = cls.type_lookup(typename)
            # Ensure annotations with defaults are after those without defaults
            # in dict
            for key, dtype in annotations_with_defaults.items():
                annotations[key] = dtype
            def_tuple["__annotations__"] = annotations
            kwargs["spec"] = type(
                kwargs["spec"]["name"], (NamedTuple,), def_tuple
            )
        return cls(**kwargs)

    @classmethod
    def type_lookup(cls, typename):
        """
        Resolve an exported type name string back to a Python type.
        """
        # Allowlist of non-python builtin types
        if typename in ["Definition"]:
            # TODO More types
            return cls
        return type_lookup(typename)


# Some common types
GENERIC = Definition(name="generic", primitive="generic")
class Stage(Enum):
    """
    The stage an Operation runs in. PROCESSING operations run first,
    CLEANUP and OUTPUT operations run once processing has completed.
    """

    PROCESSING = "processing"
    CLEANUP = "cleanup"
    OUTPUT = "output"
class FailedToLoadOperation(Exception):
    """
    Raised when an Operation wasn't found to be registered with the
    dffml.operation entrypoint.
    """
@dataclass(frozen=True)
@base_entry_point("dffml.operation", "operation")
class Operation(Entrypoint):
    """
    A named description of a unit of work within a DataFlow, declared in
    terms of the Definitions of its inputs, outputs, and conditions.
    """

    name: str
    # Maps operation argument name -> Definition of the expected value
    inputs: Dict[str, Definition] = field(default_factory=lambda: {})
    # Maps result name -> Definition of the produced value
    outputs: Dict[str, Definition] = field(default_factory=lambda: {})
    stage: Stage = Stage.PROCESSING
    # Definitions whose values gate whether this operation runs
    conditions: Optional[List[Definition]] = field(default_factory=lambda: [])
    # Names of outputs whose results should be expanded into individual values
    expand: Optional[List[str]] = field(default_factory=lambda: [])
    instance_name: Optional[str] = None
    validator: bool = False
    retry: int = 0

    def _replace(self, **kwargs) -> "Operation":
        # dataclass analog of NamedTuple._replace; returns a new frozen
        # instance with the given fields swapped out
        return replace(self, **kwargs)

    def export(self):
        """
        Convert to a dict of primitive types. ``inputs`` and ``outputs``
        have their Definitions exported; empty ``conditions`` / ``expand``
        are omitted entirely.
        """
        exported = {
            "name": self.name,
            "inputs": self.inputs.copy(),
            "outputs": self.outputs.copy(),
            "conditions": self.conditions.copy(),
            "stage": self.stage.value,
            "expand": self.expand.copy(),
            "retry": self.retry,
        }
        for to_string in ["inputs", "outputs"]:
            exported[to_string] = dict(
                map(
                    lambda key_def: (key_def[0], key_def[1].export()),
                    exported[to_string].items(),
                )
            )
        if not exported["conditions"]:
            del exported["conditions"]
        if not exported["expand"]:
            del exported["expand"]
        return exported

    @classmethod
    def definitions(cls, *args: "Operation"):
        """
        Create key value mapping of definition names to definitions for all
        given operations.
        """
        definitions = {}
        for op in args:
            for has_definition in ["inputs", "outputs"]:
                for definition in getattr(op, has_definition, {}).values():
                    definitions[definition.name] = definition
            for has_definition in ["conditions"]:
                for definition in getattr(op, has_definition, []):
                    definitions[definition.name] = definition
        return definitions

    @classmethod
    def _op(cls, loaded):
        """
        Returns the operation from a loaded entrypoint object, or None if its
        not an operation or doesn't have the op parameter which is an operation.
        """
        for obj in [loaded, getattr(loaded, "op", None)]:
            if isinstance(obj, cls):
                return obj
        return None

    # NOTE A second, dead copy of load() previously appeared above this one;
    # it was shadowed by this definition and its fallback loop referenced a
    # possibly-unbound local. Only this _op()-based implementation is kept.
    @classmethod
    def load(cls, loading=None):
        """
        Load operations registered under the class's entrypoint.

        If ``loading`` is given, return the single matching operation, raising
        FailedToLoadOperation when it cannot be found. Otherwise return a list
        of every loadable operation.
        """
        loading_classes = []
        # Load operations
        for i in pkg_resources.iter_entry_points(cls.ENTRYPOINT):
            if loading is not None and i.name == loading:
                loaded = cls._op(i.load())
                if loaded is not None:
                    return loaded
            elif loading is None:
                loaded = cls._op(i.load())
                if loaded is not None:
                    loading_classes.append(loaded)
        if loading is not None:
            raise FailedToLoadOperation(
                "%s was not found in (%s)"
                % (
                    repr(loading),
                    ", ".join(list(map(lambda op: op.name, loading_classes))),
                )
            )
        return loading_classes

    @classmethod
    def _fromdict(cls, **kwargs):
        """
        Recreate an Operation from its exported dict form, rebuilding nested
        Definitions and the Stage enum member.
        """
        for prop in ["inputs", "outputs"]:
            if prop not in kwargs:
                continue
            kwargs[prop] = {
                argument_name: Definition._fromdict(**definition)
                for argument_name, definition in kwargs[prop].items()
            }
        for prop in ["conditions"]:
            if prop not in kwargs:
                continue
            kwargs[prop] = [
                Definition._fromdict(**definition)
                for definition in kwargs[prop]
            ]
        if "stage" in kwargs:
            kwargs["stage"] = Stage[kwargs["stage"].upper()]
        return cls(**kwargs)
class Output(NamedTuple):
    """
    Specification consumed by output operations describing which values to
    gather and how to shape the result.

    NOTE(review): field semantics inferred from names; confirm against the
    output operations that consume this spec.
    """

    name: str
    # Definitions whose values should be selected for this output
    select: List[Definition]
    fill: Any
    # presumably: produce a single value rather than a list — confirm
    single: bool = False
    # presumably: produce a mapping rather than a list — confirm
    ismap: bool = False
class Parameter(Input):
    """
    An Input value bound to a specific key (argument name) of an operation,
    retaining a reference to the Input it originated from.
    """

    def __init__(
        self, key: str, value: Any, origin: Input, definition: Definition,
    ):
        super().__init__(
            value=value, definition=definition,
        )
        # The key within the operation's inputs this value is bound to
        self.key = key
        # The Input this parameter's value came from
        self.origin = origin
@dataclass
class Forward:
    """
    Keeps a map of operation instance_names to list of definitions
    of inputs which should be forwarded to the subflow running in that
    operation.
    """

    # Maps operation instance_name -> Definitions to forward to its subflow.
    # String annotation kept for lazy evaluation; was "List[Definitions]",
    # a typo for Definition.
    book: "Dict[str, List[Definition]]" = None

    def __post_init__(self):
        # Avoid the shared-mutable-default problem by creating the dict here
        if self.book is None:
            self.book = {}
        # Flat list of every forwarded Definition, kept alongside book for
        # fast membership checks (not a dataclass field, so asdict skips it)
        self._internal_book = []

    def add(self, instance_name: str, definition_list: "List[Definition]"):
        """
        Record that the given definitions should be forwarded to the subflow
        running in the operation named ``instance_name``.
        """
        self.book[instance_name] = definition_list
        self._internal_book.extend(definition_list)

    def get_instances_to_forward(self, definition: "Definition") -> List[str]:
        """
        Returns a list of all instances of operation to which `definition` should
        be forwarded to.
        """
        if definition not in self._internal_book:
            return []
        return [
            instance_name
            for instance_name, definitions in self.book.items()
            if definition in definitions
        ]

    def export(self):
        return export_dict(**asdict(self))

    @classmethod
    def _fromdict(cls, **kwargs):
        return cls(**kwargs)
class DataFlow:
    """
    A graph of Operations plus the wiring (``flow``) describing where each
    operation's inputs come from (other operations' outputs, or "seed").
    """

    # Signals to config loaders that a DataFlow may be loaded from a config
    CONFIGLOADABLE = True

    def __init__(
        self,
        *args,
        operations: Dict[str, Union[Operation, Callable]] = None,
        seed: List[Input] = None,
        configs: Dict[str, BaseConfig] = None,
        # NOTE(review): default is False (not None) — the False sentinel is
        # checked with == below; presumably to allow an explicit empty dict
        # to be passed — confirm intent
        definitions: Dict[str, Definition] = False,
        flow: Dict[str, InputFlow] = None,
        by_origin: Dict[Stage, Dict[str, Operation]] = None,
        # Implementations can be provided in case they haven't been registered
        # via the entrypoint system.
        implementations: Dict[str, "OperationImplementation"] = None,
        forward: Forward = None,
    ) -> None:
        # Prevent usage of a global dict (if we set default to {} then all the
        # instances will share the same instance of that dict, or list)
        if operations is None:
            operations = {}
        if forward is None:
            forward = Forward()
        if seed is None:
            seed = []
        if configs is None:
            configs = {}
        if by_origin is None:
            by_origin = {}
        if implementations is None:
            implementations = {}
        if definitions == False:
            definitions = {}
        validators = {}  # Maps `validator` ops instance_name to op
        # Operations given positionally are keyed by their (or their wrapped
        # op's) name; duplicates with the operations dict are an error
        for operation in args:
            name = getattr(getattr(operation, "op", operation), "name")
            if name in operations:
                raise ValueError("Operation given as positional and in dict")
            operations[name] = operation
        self.operations = operations
        self.seed = seed
        self.configs = configs
        self.definitions = definitions
        self.flow = flow
        self.by_origin = by_origin
        self.implementations = implementations
        self.forward = forward
        self.validators = validators
        # Auto-generate the flow only when none was supplied
        self.update(auto_flow=bool(self.flow is None))

    def update(self, auto_flow: bool = False):
        """
        Recompute derived state (operations, definitions, by_origin), and
        optionally regenerate the flow from the operations' definitions.
        """
        self.update_operations()
        self.update_definitions()
        if auto_flow:
            self.flow = self.auto_flow()
        self.update_by_origin()

    def update_operations(self):
        """
        Normalize ``self.operations`` to plain Operation instances and collect
        validator operations into ``self.validators``.
        """
        # Allow callers to pass in functions decorated with op. Iterate over the
        # given operations and replace any which have been decorated with their
        # operation. Add the implementation to our dict of implementations.
        for instance_name, value in self.operations.items():
            # TODO(p4) We can't do isinstance because its defined in base, maybe
            # we should move it in here. This is a hack.
            is_opimp_non_decorated = bool(
                getattr(value, "ENTRY_POINT_NAME", ["not-opimp"]) == ["opimp"]
            )
            if (
                getattr(value, "imp", None) is not None
                or is_opimp_non_decorated
            ) and getattr(value, "op", None) is not None:
                # Get the operation and implementation from the wrapped object
                operation = getattr(value, "op", None)
                opimp = (
                    value
                    if is_opimp_non_decorated
                    else getattr(value, "imp", None)
                )
                # Set the implementation if not explicitly set
                self.implementations.setdefault(operation.name, opimp)
                # Change this entry to the instance of Operation associated with
                # the wrapped object
                self.operations[instance_name] = operation
                value = operation
            # Make sure every operation has the correct instance name
            value = value._replace(instance_name=instance_name)
            self.operations[instance_name] = value
            if value.validator:
                self.validators[instance_name] = value

    def update_definitions(self):
        """
        Rebuild ``self.definitions`` as a name -> Definition mapping gathered
        from existing definitions, seed inputs, and every operation's inputs,
        outputs, and conditions.
        """
        # Grab all definitions from operations
        operations = list(self.operations.values())
        # set() dedupes; relies on Definition being hashable
        definitions = list(
            set(
                itertools.chain(
                    self.definitions.values(),
                    [item.definition for item in self.seed],
                    *[
                        itertools.chain(
                            operation.inputs.values(),
                            operation.outputs.values(),
                            operation.conditions,
                        )
                        for operation in operations
                    ],
                )
            )
        )
        definitions = {
            definition.name: definition for definition in definitions
        }
        self.definitions = definitions

    def update_by_origin(self):
        """
        Rebuild ``self.by_origin``: stage -> origin -> list of operations
        consuming values from that origin. Origins are either plain strings
        (e.g. "seed") or derived from alternate-definition dicts via
        InputFlow.get_alternate_definitions.
        """
        # Create by_origin which maps operation instance names to the sources
        self.by_origin = {}
        for instance_name, input_flow in self.flow.items():
            operation = self.operations[instance_name]
            self.by_origin.setdefault(operation.stage, {})
            # TODO(p5) Standardize this so that seed is also a dict?
            for output_source in input_flow.conditions:
                if isinstance(output_source, str):
                    self.by_origin[operation.stage].setdefault(
                        output_source, []
                    )
                    self.by_origin[operation.stage][output_source].append(
                        operation
                    )
                else:
                    # Non-string condition sources are dicts; resolve each
                    # (origin, definitions) item to its origin string
                    for origin in output_source.items():
                        _, origin = input_flow.get_alternate_definitions(
                            origin
                        )
                        self.by_origin[operation.stage].setdefault(origin, [])
                        self.by_origin[operation.stage][origin].append(
                            operation
                        )
            for output_name, output_sources in input_flow.inputs.items():
                for output_source in output_sources:
                    if isinstance(output_source, str):
                        self.by_origin[operation.stage].setdefault(
                            output_source, []
                        )
                        self.by_origin[operation.stage][output_source].append(
                            operation
                        )
                    else:
                        # In order to support selection an input based using an
                        # alternate definition along with restriction to inputs
                        # who's origins match the alternate definitions in the
                        # list. We select the first output source since that
                        # will be the immediate alternate definition
                        if (
                            isinstance(output_source, list)
                            and output_source
                            and isinstance(output_source[0], dict)
                        ):
                            output_source = output_source[0]
                        for origin in output_source.items():
                            # If we have an output_source item with an origin
                            # that has a list as it's value we know that the key
                            # (aka origin[0] since output_source is a dict) is
                            # the Input.origin (like "seed"). And the value
                            # (origin[1]) is the list of definitions which are
                            # acceptable from that origin for this input.
                            _, origin = input_flow.get_alternate_definitions(
                                origin
                            )
                            self.by_origin[operation.stage].setdefault(
                                origin, []
                            )
                            self.by_origin[operation.stage][origin].append(
                                operation
                            )

    def export(self, *, linked: bool = False):
        """
        Convert to a dict of primitive types. When ``linked`` is True,
        definitions are hoisted into a top-level "definitions" key and
        operations/seed reference them by name (see _linked).
        """
        exported = {
            "operations": {
                instance_name: operation.export()
                for instance_name, operation in self.operations.items()
            }
        }
        # Empty collections are omitted from the exported form
        if self.seed:
            exported["seed"] = self.seed.copy()
        if self.flow:
            exported["flow"] = self.flow.copy()
        if self.configs:
            exported["configs"] = self.configs.copy()
        if self.forward.book:
            exported["forward"] = self.forward.export()
        exported = export_dict(**exported)
        if linked:
            self._linked(exported)
        return exported

    @classmethod
    def _fromdict(cls, *, linked: bool = False, **kwargs):
        """
        Recreate a DataFlow from its exported dict form. ``linked`` indicates
        the definitions-by-name format produced by export(linked=True).
        """
        # Import all operations
        if linked:
            # Replace definition-name strings with definition dicts first
            kwargs.update(cls._resolve(kwargs))
            kwargs["definitions"] = {
                name: Definition._fromdict(**definition)
                for name, definition in kwargs["definitions"].items()
            }
        kwargs["operations"] = {
            instance_name: Operation._fromdict(
                instance_name=instance_name, **operation
            )
            for instance_name, operation in kwargs["operations"].items()
        }
        # Import seed inputs
        if "seed" in kwargs:
            kwargs["seed"] = [
                Input._fromdict(**input_data) for input_data in kwargs["seed"]
            ]
        # Import input flows
        if "flow" in kwargs:
            kwargs["flow"] = {
                instance_name: InputFlow._fromdict(**input_flow)
                for instance_name, input_flow in kwargs["flow"].items()
            }
        # Import forward
        if "forward" in kwargs:
            kwargs["forward"] = Forward._fromdict(**kwargs["forward"])
        return cls(**kwargs)

    @classmethod
    def auto(cls, *operations):
        """
        Create a DataFlow from operations alone, auto-generating the flow.
        """
        return cls(*operations)

    def auto_flow(self):
        """
        Generate the flow dict by matching each operation's input definitions
        against other operations' output definitions; unmatched inputs and
        conditions fall back to the "seed" origin.
        """
        # Determine the dataflow if not given
        flow_dict = {}
        # Create output_dict, which maps all of the definitions to the
        # operations that create them.
        output_dict = {}
        for operation in self.operations.values():
            for definition in operation.outputs.values():
                output_dict.setdefault(definition.name, {})
                output_dict[definition.name].update(
                    {operation.instance_name: operation}
                )
        # Go through all the operations and look at their inputs
        for operation in self.operations.values():
            # TODO Auto flow on operation conditions too
            flow_dict.setdefault(operation.instance_name, InputFlow())
            # Example operation:
            # Operation(
            #     name="pypi_package_json",
            #     # internal_name: package
            #     # definition: package = Definition(name="package", primitive="str")
            #     inputs={"package": package},
            #     # internal_name: response_json
            #     # definition: package_json = Definition(name="package_json", primitive="Dict")
            #     outputs={"response_json": package_json},
            # )
            # For each input
            for internal_name, definition in operation.inputs.items():
                # With pypi_package_json example
                # internal_name = "package"
                # definition = package
                #            = Definition(name="package", primitive="str")
                if definition.name in output_dict:
                    # Grab the dict of operations that produce this definition
                    # as an output
                    producing_operations = output_dict[definition.name]
                    # If the input could be produced by an operation in the
                    # network, then it's definition name will be in output_dict.
                    flow_dict[operation.instance_name].inputs[
                        internal_name
                    ] = []
                    # We look through the outputs and add any one that matches
                    # the definition and add it to the list in format of
                    # operation_name . internal_name (of output)
                    for producting_operation in producing_operations.values():
                        for (
                            internal_name_of_output,
                            output_definition,
                        ) in producting_operation.outputs.items():
                            if output_definition == definition:
                                flow_dict[operation.instance_name].inputs[
                                    internal_name
                                ].append(
                                    {
                                        producting_operation.instance_name: internal_name_of_output
                                    }
                                )
                else:
                    # No producer in the network; value must come from seed
                    flow_dict[operation.instance_name].inputs[
                        internal_name
                    ] = ["seed"]
            # Now do conditions
            for definition in operation.conditions:
                if definition.name in output_dict:
                    # Grab the dict of operations that produce this definition
                    # as an output
                    producing_operations = output_dict[definition.name]
                    # We look through the outputs and add any one that matches
                    # the definition and add it to the list in format of
                    # operation_name . internal_name (of output)
                    for producting_operation in producing_operations.values():
                        for (
                            internal_name_of_output,
                            output_definition,
                        ) in producting_operation.outputs.items():
                            if output_definition == definition:
                                flow_dict[
                                    operation.instance_name
                                ].conditions.append(
                                    {
                                        producting_operation.instance_name: internal_name_of_output
                                    }
                                )
                else:
                    flow_dict[operation.instance_name].conditions = ["seed"]
        return flow_dict

    @classmethod
    def _resolve(cls, source: Dict):
        """
        In a linked export, replace definition-name strings within operations
        and seed items with the full definition dicts. Mutates and returns
        ``source``.
        """
        definitions = {}
        # NOTE(review): this dict is never populated or read — dead variable?
        operations = {}
        for name, definition in source.get("definitions", {}).items():
            definition.setdefault("name", name)
            definitions[name] = definition
        for instance_name, operation in source.get("operations", {}).items():
            # Replaces strings referencing definitions with definitions
            for arg in ["conditions"]:
                if not arg in operation:
                    continue
                for i, definition_name in enumerate(operation[arg]):
                    if not definition_name in definitions:
                        raise DefinitionMissing(
                            f"While resolving {instance_name}.{arg}, missing {definition_name} not found in {definitions.keys()}"
                        )
                    operation[arg][i] = definitions[definition_name]
            for arg in ["inputs", "outputs"]:
                if not arg in operation:
                    continue
                for input_name, definition_name in operation[arg].items():
                    if not definition_name in definitions:
                        raise DefinitionMissing(
                            f"While resolving {instance_name}.{arg}, missing {definition_name} not found in {definitions.keys()}"
                        )
                    operation[arg][input_name] = definitions[definition_name]
            # NOTE(review): `name` here is the leftover loop variable from the
            # definitions loop above, not this operation's name — looks like
            # it should be derived from instance_name; confirm before changing
            operation.setdefault("name", name)
        for item in source.get("seed", []):
            # Replaces strings referencing definitions with definitions
            if not item["definition"] in definitions:
                raise DefinitionMissing(
                    f"While resolving seed {item}, definition {item['definition']} not found in {definitions.keys()}"
                )
            item["definition"] = definitions[item["definition"]]
        return source

    def _linked(self, exported):
        """
        Convert an exported dict (in place) to linked form: definitions are
        listed once at top level and referenced by name everywhere else.
        """
        # Set linked
        exported["linked"] = True
        # Include definitions
        exported["definitions"] = export_dict(**self.definitions.copy())
        # Remove definitions from operations, just use definition name
        for operation in exported["operations"].values():
            for arg in ["conditions"]:
                if not arg in operation:
                    continue
                for i, definition in enumerate(operation[arg]):
                    operation[arg][i] = definition["name"]
            for arg in ["inputs", "outputs"]:
                if not arg in operation:
                    continue
                for io_name, definition in operation[arg].items():
                    operation[arg][io_name] = definition["name"]
        # Remove definitions from seed inputs, just use definition name
        if "seed" in exported:
            for item in exported["seed"]:
                item["definition"] = item["definition"]["name"]
        return exported