Source code for dffml.operation.io

import asyncio
import concurrent.futures
from typing import Dict, Any

from ..df.types import Operation, Definition
from ..df.base import (
    op,
    OperationImplementationContext,
    OperationImplementation,
)


# Definitions
UserInput = Definition(name="UserInput", primitive="str")
DataToPrint = Definition(name="DataToPrint", primitive="generic")

AcceptUserInput = Operation(
    name="AcceptUserInput",
    inputs={},
    outputs={"InputData": UserInput},
    conditions=[],
)


[docs]class AcceptUserInputContext(OperationImplementationContext): @staticmethod def receive_input(): print("Enter the value: ", end="") return input()
[docs] async def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]: user_input = await self.parent.loop.run_in_executor( self.parent.pool, self.receive_input ) return {"InputData": user_input}
[docs]class AcceptUserInput(OperationImplementation): """ Accept input from stdin using python input() Returns ------- dict A dictionary containing user input. Examples -------- The following example shows how to use AcceptUserInput. (Assumes that the input from stdio is "Data flow is awesome"!) >>> import asyncio >>> from dffml import * >>> >>> dataflow = DataFlow.auto(AcceptUserInput, GetSingle) >>> dataflow.seed.append( ... Input( ... value=[AcceptUserInput.op.outputs["InputData"].name], ... definition=GetSingle.op.inputs["spec"], ... ) ... ) >>> >>> async def main(): ... async for ctx, results in MemoryOrchestrator.run(dataflow, {"input": []}): ... print(results) >>> >>> asyncio.run(main()) Enter the value: {'UserInput': 'Data flow is awesome'} """ op = AcceptUserInput CONTEXT = AcceptUserInputContext def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.loop = None self.pool = None self.__pool = None async def __aenter__(self) -> "OperationImplementationContext": self.loop = asyncio.get_event_loop() self.pool = concurrent.futures.ThreadPoolExecutor() self.__pool = self.pool.__enter__() return self async def __aexit__(self, exc_type, exc_value, traceback): self.__pool.__exit__(exc_type, exc_value, traceback) self.__pool = None self.pool = None self.loop = None