# Source code for dffml.operation.mapping
from typing import Dict, List, Any
from ..df.types import Definition
from ..df.base import op
from ..util.data import traverse_get
# Definitions shared by the mapping operations declared in this module.

# A whole mapping ("map" primitive): input of dffml.mapping.extract and
# output of dffml.mapping.create.
MAPPING = Definition(name="mapping", primitive="map")
# List of string keys ("List[str]" primitive): the "traverse" input of
# dffml.mapping.extract.
MAPPING_TRAVERSE = Definition(name="mapping_traverse", primitive="List[str]")
# A single string key: the "key" input of dffml.mapping.create.
MAPPING_KEY = Definition(name="key", primitive="str")
# A value of any type ("generic" primitive): output of dffml.mapping.extract
# and the "value" input of dffml.mapping.create.
MAPPING_VALUE = Definition(name="value", primitive="generic")
@op(
    name="dffml.mapping.extract",
    inputs={"mapping": MAPPING, "traverse": MAPPING_TRAVERSE},
    outputs={"value": MAPPING_VALUE},
)
def mapping_extract_value(mapping: Dict[str, Any], traverse: List[str]):
    """
    Pull a single (possibly nested) value out of a mapping.

    The keys in ``traverse`` are handed to ``traverse_get`` as successive
    positional arguments, and whatever it returns is wrapped under the
    ``"value"`` output key.

    Parameters
    ----------
    mapping : dict
        The mapping the value is read from.
    traverse : list[str]
        Keys to follow, in order, through ``mapping`` to reach the value.

    Returns
    -------
    dict
        A dictionary with one entry, ``"value"``, holding the extracted
        value.

    Examples
    --------

    >>> import asyncio
    >>> from dffml import *
    >>>
    >>> dataflow = DataFlow.auto(mapping_extract_value, GetSingle)
    >>>
    >>> dataflow.seed.append(
    ...     Input(
    ...         value=[mapping_extract_value.op.outputs["value"].name],
    ...         definition=GetSingle.op.inputs["spec"],
    ...     )
    ... )
    >>> inputs = [
    ...     Input(
    ...         value={"key1": {"key2": 42}},
    ...         definition=mapping_extract_value.op.inputs["mapping"],
    ...     ),
    ...     Input(
    ...         value=["key1", "key2"],
    ...         definition=mapping_extract_value.op.inputs["traverse"],
    ...     ),
    ... ]
    >>>
    >>> async def main():
    ...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
    ...         print(result)
    >>>
    >>> asyncio.run(main())
    {'value': 42}
    """
    # Each entry of ``traverse`` becomes one positional key for traverse_get.
    extracted = traverse_get(mapping, *traverse)
    return {"value": extracted}
@op(
    name="dffml.mapping.create",
    inputs={"key": MAPPING_KEY, "value": MAPPING_VALUE},
    outputs={"mapping": MAPPING},
)
def create_mapping(key: str, value: Any):
    """
    Build a one-entry mapping from a key and a value.

    Parameters
    ----------
    key : str
        The key of the single entry in the new mapping.
    value : Any
        The value stored under ``key``.

    Returns
    -------
    dict
        A dictionary with one entry, ``"mapping"``, holding the newly
        created ``{key: value}`` dictionary.

    Examples
    --------

    >>> import asyncio
    >>> from dffml import *
    >>>
    >>> dataflow = DataFlow.auto(create_mapping, GetSingle)
    >>> dataflow.seed.append(
    ...     Input(
    ...         value=[create_mapping.op.outputs["mapping"].name],
    ...         definition=GetSingle.op.inputs["spec"],
    ...     )
    ... )
    >>> inputs = [
    ...     Input(
    ...         value="key1", definition=create_mapping.op.inputs["key"],
    ...     ),
    ...     Input(
    ...         value=42, definition=create_mapping.op.inputs["value"],
    ...     ),
    ... ]
    >>>
    >>> async def main():
    ...     async for ctx, result in MemoryOrchestrator.run(dataflow, inputs):
    ...         print(result)
    >>>
    >>> asyncio.run(main())
    {'mapping': {'key1': 42}}
    """
    # The operation's only output is the freshly built single-entry dict.
    new_mapping = {key: value}
    return {"mapping": new_mapping}