Source code for dffml.high_level.dataflow

import asyncio
from typing import Optional, Tuple, List, Union, Dict, Any, AsyncIterator

from ..df.types import DataFlow, Input
from ..df.memory import MemoryOrchestrator
from ..df.base import BaseInputSetContext, BaseOrchestrator, BaseInputSet


async def run(
    dataflow: DataFlow,
    *input_sets: Union[List[Input], BaseInputSet],
    orchestrator: Optional[BaseOrchestrator] = None,
    strict: bool = True,
    ctx: Optional[BaseInputSetContext] = None,
    halt: Optional[asyncio.Event] = None,
) -> AsyncIterator[Tuple[BaseInputSetContext, Dict[str, Any]]]:
    """
    Run a DataFlow

    Run a DataFlow using the default orchestrator
    (:py:class:`MemoryOrchestrator <dffml.df.memory.MemoryOrchestrator>`),
    or the specified one.

    Parameters
    ----------
    dataflow : DataFlow
        :py:class:`DataFlow <dffml.df.types.DataFlow>` to run.
    input_sets : InputSet, list, dict, optional
        :py:class:`Inputs <dffml.df.types.Input>` to give to the
        :py:class:`DataFlow <dffml.df.types.DataFlow>` when it starts. Can be
        in multiple formats. If each element is a ``list``, then each element
        of that list must be an :py:class:`Input <dffml.df.types.Input>`; in
        this case an :py:class:`InputSet <dffml.df.base.BaseInputSet>` will be
        created with a random string used as the
        :py:class:`StringInputSetContext <dffml.df.base.StringInputSetContext>`.
        If a ``dict`` is given, then each key will become a
        :py:class:`StringInputSetContext <dffml.df.base.StringInputSetContext>`.
        The value for each key should be a ``list`` of
        :py:class:`Input <dffml.df.types.Input>` objects. If each element is
        an :py:class:`InputSet <dffml.df.base.BaseInputSet>`, then each
        context :py:class:`InputSetContext <dffml.df.base.BaseInputSetContext>`
        will have its respective :py:class:`Inputs <dffml.df.types.Input>`
        added to it.
    orchestrator : BaseOrchestrator, optional
        Orchestrator to use, defaults to
        :py:class:`MemoryOrchestrator <dffml.df.memory.MemoryOrchestrator>`
        if ``None``.
    strict : bool, optional
        If true (default), raise exceptions when they occur in operations. If
        false, log exceptions without raising.
    ctx : BaseInputSetContext, optional
        If given and ``input_sets`` is a ``list``, then add inputs under the
        given context. Otherwise they are added under randomly generated
        contexts.
    halt : Event, optional
        If given, keep the dataflow running until this
        :py:class:`asyncio.Event` is set.

    Returns
    -------
    asynciterator
        ``tuple`` of
        :py:class:`InputSetContext <dffml.df.base.BaseInputSetContext>` and
        ``dict`` where contents are determined by output operations. If
        multiple output operations are used, then the top level keys will be
        the names of the output operations. If only one is used, then the
        ``dict`` will be whatever the return value of that output operation
        was.

    Examples
    --------

    The following creates a TCP echo server. We write an operation that opens
    a connection and sends a message to the server, then run it using a
    DataFlow.

    For the inputs to the DataFlow, we create two Input objects whose values
    are the message to be sent to the TCP server. We also create Input
    objects for the host and port. When running a DataFlow, operations will
    be run with each possible permutation of their inputs.

    .. TODO Autogenerate this image during docs build

        graph LR
            send_to_server
            1[First echo message]
            port[Port]
            host[Host]
            2[Second echo message]
            1_c[Host, Port, First echo]
            2_c[Host, Port, Second echo]
            host --> 1_c
            port --> 1_c
            2 --> 2_c
            port --> 2_c
            host --> 2_c
            1 --> 1_c
            1_c --> send_to_server
            2_c --> send_to_server

    .. image:: /images/high_level_run_echo_server_input_combination.svg
        :alt: Flow chart showing how both echo messages create a parameter set including that echo message and the host and port

    Because there is a different Input object for each of the two "echo"
    messages, one will get combined with the host and port to make an
    argument list for the ``send_to_server`` operation. The other also
    combines with the host and port to make another argument list. Both
    argument lists are used to call the ``send_to_server`` operation.

    >>> # Socket server derived from
    >>> # https://docs.python.org/3/library/socketserver.html#asynchronous-mixins
    >>> import asyncio
    >>> import threading
    >>> import socketserver
    >>> from dffml import *
    >>>
    >>> class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    ...     def handle(self):
    ...         self.request.sendall(self.request.recv(1024))
    >>>
    >>> class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    ...     pass
    >>>
    >>> @op
    ... async def send_to_server(host: str, port: int, message: str):
    ...     reader, writer = await asyncio.open_connection(host, port)
    ...
    ...     writer.write(message.encode())
    ...     await writer.drain()
    ...
    ...     data = await reader.read(100)
    ...     print(f"Client sent {message!r}, got: {data.decode()!r}")
    ...
    ...     writer.close()
    ...     await writer.wait_closed()
    >>>
    >>> # List of messages to send to the server, 2 long, each value is "echo"
    >>> messages = [Input(value="echo", definition=send_to_server.op.inputs["message"])
    ...             for _ in range(0, 2)]
    >>>
    >>> # DataFlow consisting of the single operation
    >>> dataflow = DataFlow.auto(send_to_server)
    >>>
    >>> async def main():
    ...     # Create a server, passing 0 so that a random port gets assigned
    ...     server = ThreadedTCPServer(("127.0.0.1", 0), ThreadedTCPRequestHandler)
    ...     with server:
    ...         host, port = server.server_address
    ...
    ...         # Start a thread to run the server in the background
    ...         server_thread = threading.Thread(target=server.serve_forever)
    ...         # Exit the server thread when the main thread terminates
    ...         server_thread.daemon = True
    ...         server_thread.start()
    ...
    ...         # Add the host and port to the list of Inputs for the DataFlow
    ...         inputs = messages + [
    ...             Input(value=host, definition=send_to_server.op.inputs["host"]),
    ...             Input(value=port, definition=send_to_server.op.inputs["port"])
    ...         ]
    ...
    ...         try:
    ...             async for ctx, results in run(dataflow, inputs):
    ...                 pass
    ...         finally:
    ...             server.shutdown()
    >>>
    >>> asyncio.run(main())
    Client sent 'echo', got: 'echo'
    Client sent 'echo', got: 'echo'
    """
    if orchestrator is None:
        orchestrator = MemoryOrchestrator.withconfig({})
    async with orchestrator:
        # Use a distinct name for the orchestrator context so it does not
        # shadow the ctx parameter or the loop variable below
        async with orchestrator(dataflow) as octx:
            async for ctx, results in octx.run(*input_sets):
                yield ctx, results
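A short sketch of the ``dict`` input format described in the docstring above, where each key becomes a :py:class:`StringInputSetContext <dffml.df.base.StringInputSetContext>` and results can be matched back to the context they were produced under. The ``greet`` operation and the ``"alice"``/``"bob"`` context names are hypothetical, made up for illustration; the imports follow the doctest above, and the order in which contexts complete is not guaranteed.

>>> import asyncio
>>> from dffml import DataFlow, Input, op, run
>>>
>>> @op
... async def greet(name: str):
...     # Hypothetical operation: just print a greeting for its input
...     print(f"Hello {name}")
>>>
>>> dataflow = DataFlow.auto(greet)
>>>
>>> async def main():
...     # Each dict key becomes a StringInputSetContext; each value is the
...     # list of Inputs added under that context
...     async for ctx, results in run(dataflow, {
...         "alice": [Input(value="alice", definition=greet.op.inputs["name"])],
...         "bob": [Input(value="bob", definition=greet.op.inputs["name"])],
...     }):
...         pass
>>>
>>> asyncio.run(main())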