Operation Dataflow

exception dffml.operation.dataflow.InvalidCustomRunDataFlowContext[source]

Raised when the custom inputs for dffml.dataflow.run do not list an input whose primitive is string as the first input.

exception dffml.operation.dataflow.InvalidCustomRunDataFlowOutputs[source]

Raised when the outputs for a custom dffml.dataflow.run do not match those of its subflow.
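
A hedged sketch of the constraints these exceptions enforce, using hypothetical definitions: in custom mode the first input serves as the context, so its definition's primitive must be string, and the custom outputs must mirror those of the subflow. The second example under run_dataflow below shows a complete, valid custom setup.

>>> from dffml import Definition
>>>
>>> # Valid as the first custom input: its primitive is "string".
>>> CTX_URL = Definition(name="URL", primitive="string")
>>> # Hypothetical; listing this first would trigger
>>> # InvalidCustomRunDataFlowContext, since its primitive is not "string".
>>> COUNT = Definition(name="count", primitive="int")
>>> CTX_URL.primitive
'string'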

class dffml.operation.dataflow.RunDataFlowConfig(dataflow: dffml.df.types.DataFlow)[source]
no_enforce_immutable()

By default, all properties of a config object are immutable. If you would like to mutate them, you must explicitly call this method and use it as a context manager.

Examples

>>> from dffml import config
>>>
>>> @config
... class MyConfig:
...     C: int
>>>
>>> cfg = MyConfig(C=2)
>>> with cfg.no_enforce_immutable():
...     cfg.C = 1
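
A hedged follow-up sketch: once the with block exits, immutability is enforced again, so the same assignment outside the context manager is expected to fail (the exact exception type is internal to dffml, so it is caught broadly here).

>>> try:
...     cfg.C = 3
... except Exception:
...     print("immutable again")
immutable again
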
class dffml.operation.dataflow.run_dataflow(parent: dffml.df.base.OperationImplementation, ctx: dffml.df.base.BaseInputSetContext, octx: dffml.df.base.BaseOrchestratorContext)[source]

Starts the subflow self.config.dataflow and adds inputs to it.

Parameters

inputs (dict) – The inputs to add to the subflow, as a key/value mapping from each context string to the inputs to seed for that context (see the shape sketch below).

Returns

Maps each context string in inputs to its output after running through the dataflow.

Return type

dict
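
A minimal sketch of the default-mode shape for inputs, using a hypothetical greeting definition name: each context string maps to a list of plain dicts naming a value and the definition to seed it under. run_dataflow turns these into Input objects for that context, as the first example below shows.

>>> subflow_inputs = {
...     "my_context": [
...         {"value": "Hello World", "definition": "greeting"},
...     ],
... }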

Examples

The following shows how to use run_dataflow in its default mode.

>>> import asyncio
>>> from dffml import *
>>>
>>> URL = Definition(name="URL", primitive="string")
>>>
>>> subflow = DataFlow.auto(GetSingle)
>>> subflow.definitions[URL.name] = URL
>>> subflow.seed.append(
...     Input(
...         value=[URL.name],
...         definition=GetSingle.op.inputs["spec"]
...     )
... )
>>>
>>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
>>> dataflow.configs[run_dataflow.op.name] = RunDataFlowConfig(subflow)
>>> dataflow.seed.append(
...     Input(
...         value=[run_dataflow.op.outputs["results"].name],
...         definition=GetSingle.op.inputs["spec"]
...     )
... )
>>>
>>> async def main():
...     async for ctx, results in MemoryOrchestrator.run(dataflow, {
...         "run_subflow": [
...             Input(
...                 value={
...                     "dffml": [
...                         {
...                             "value": "https://github.com/intel/dffml",
...                             "definition": URL.name
...                         }
...                     ]
...                 },
...                 definition=run_dataflow.op.inputs["inputs"]
...             )
...         ]
...     }):
...         print(results)
>>>
>>> asyncio.run(main())
{'flow_results': {'dffml': {'URL': 'https://github.com/intel/dffml'}}}

In the default mode above, flow_results maps each context string (here "dffml") to the outputs of its subflow run. The following shows how to use run_dataflow with custom inputs and outputs, which lets you run a subflow as if it were a single operation.

>>> import asyncio
>>> from dffml import *
>>>
>>> URL = Definition(name="URL", primitive="string")
>>>
>>> @op(
...     inputs={"url": URL},
...     outputs={"last": Definition("last_element_in_path", primitive="string")},
... )
... def last_path(url):
...     return {"last": url.split("/")[-1]}
>>>
>>> subflow = DataFlow.auto(last_path, GetSingle)
>>> subflow.seed.append(
...     Input(
...         value=[last_path.op.outputs["last"].name],
...         definition=GetSingle.op.inputs["spec"],
...     )
... )
>>>
>>> dataflow = DataFlow.auto(run_dataflow, GetSingle)
>>> dataflow.operations[run_dataflow.op.name] = run_dataflow.op._replace(
...     inputs={"URL": URL},
...     outputs={last_path.op.outputs["last"].name: last_path.op.outputs["last"]},
...     expand=[],
... )
>>> dataflow.configs[run_dataflow.op.name] = RunDataFlowConfig(subflow)
>>> dataflow.seed.append(
...     Input(
...         value=[last_path.op.outputs["last"].name],
...         definition=GetSingle.op.inputs["spec"],
...     )
... )
>>> dataflow.update(auto_flow=True)
>>>
>>> async def main():
...     async for ctx, results in MemoryOrchestrator.run(
...         dataflow,
...         {
...             "run_subflow": [
...                 Input(value="https://github.com/intel/dffml", definition=URL)
...             ]
...         },
...     ):
...         print(results)
>>>
>>> asyncio.run(main())
{'last_element_in_path': 'dffml'}
imp

alias of dffml.base.DffmlDataflowRunImplementation

async run(inputs: Dict[str, Any]) → Dict[str, Any][source]

Implementation of the operation goes here. Should take and return a dict with keys matching the input and output parameters of the Operation object associated with this operation implementation context.

async run_default(inputs: Dict[str, Any]) → Dict[str, Any][source]

The default implementation of the dataflow.run operation is the uctx mode, which maps unique context strings to lists of inputs to be given to each respective string's context.