promptflow.executor.flow_executor module
- class promptflow.executor.flow_executor.FlowExecutor(flow: Flow, connections: ConnectionProvider, run_tracker: RunTracker, cache_manager: AbstractCacheManager, loaded_tools: Mapping[str, Callable], *, raise_ex: bool = False, working_dir=None, line_timeout_sec=None, flow_file=None)
Bases: object
This class is used to execute a single flow for different inputs.
- Parameters:
flow (Flow) – The flow to be executed.
connections (ConnectionProvider) – The connections to be used for the flow.
run_tracker (RunTracker) – The run tracker to be used for the flow.
cache_manager (AbstractCacheManager) – The cache manager to be used for the flow.
loaded_tools (Mapping[str, Callable]) – The loaded tools to be used for the flow.
worker_count (Optional[int]) – The number of workers to be used for the flow. Default is 16.
raise_ex (Optional[bool]) – Whether to raise exceptions or not. Default is False.
working_dir (Optional[str]) – The working directory to be used for the flow. Default is None.
line_timeout_sec (Optional[int]) – The line timeout in seconds to be used for the flow. Default is LINE_TIMEOUT_SEC.
flow_file (Optional[Path]) – The flow file to be used for the flow. Default is None.
- property aggregation_nodes
Get the aggregation nodes of the flow executor.
- Returns:
A list of aggregation nodes.
- Return type:
list
- static apply_inputs_mapping(inputs: Mapping[str, Mapping[str, Any]], inputs_mapping: Mapping[str, str]) → Dict[str, Any]
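The signature above carries no description, so the following is only a sketch of one plausible reading. It assumes that mapping values use a `${source.key}` reference syntax and that any other value is passed through as a literal; neither assumption is confirmed by this page. `resolve_inputs_mapping` is a hypothetical stand-in, not the library's implementation.

```python
import re
from typing import Any, Dict, Mapping

# Assumed reference syntax: "${<source>.<key>}", e.g. "${data.question}".
_REFERENCE = re.compile(r"^\$\{(?P<source>[^.}]+)\.(?P<key>[^}]+)\}$")


def resolve_inputs_mapping(
    inputs: Mapping[str, Mapping[str, Any]],
    inputs_mapping: Mapping[str, str],
) -> Dict[str, Any]:
    """Hypothetical stand-in for apply_inputs_mapping, for illustration only."""
    resolved: Dict[str, Any] = {}
    for name, expression in inputs_mapping.items():
        match = _REFERENCE.match(expression)
        if match is None:
            # Not a reference: treat the value as a literal (an assumption).
            resolved[name] = expression
            continue
        source, key = match.group("source"), match.group("key")
        if source not in inputs or key not in inputs[source]:
            raise KeyError(f"Cannot resolve {expression!r} for input {name!r}")
        resolved[name] = inputs[source][key]
    return resolved
```

The two-level `inputs` argument mirrors the `Mapping[str, Mapping[str, Any]]` type in the signature: the outer key names a source and the inner mapping holds that source's values.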
- convert_flow_input_types(inputs: dict) → Mapping[str, Any]
Convert the input types of the given inputs dictionary to match the expected types of the flow.
- Parameters:
inputs (dict) – A dictionary containing the inputs to the flow.
- Returns:
A dictionary containing the converted inputs.
- Return type:
Mapping[str, Any]
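To illustrate what converting inputs to the flow's expected types can look like, here is a minimal standalone sketch. The type names, the `CONVERTERS` table, and the `convert_inputs` helper are all hypothetical; the real method reads the expected types from the flow definition, and its conversion rules are not documented on this page.

```python
from typing import Any, Callable, Dict, Mapping


def _to_bool(value: Any) -> bool:
    # bool("false") is True in Python, so strings need explicit parsing.
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in ("true", "1"):
            return True
        if lowered in ("false", "0"):
            return False
        raise ValueError(f"Cannot interpret {value!r} as a boolean")
    return bool(value)


# Hypothetical converters keyed by a declared type name.
CONVERTERS: Dict[str, Callable[[Any], Any]] = {
    "int": int,
    "double": float,
    "bool": _to_bool,
    "string": str,
}


def convert_inputs(
    inputs: Mapping[str, Any], declared_types: Mapping[str, str]
) -> Dict[str, Any]:
    """Return a copy of inputs with each declared input coerced to its type."""
    converted = dict(inputs)
    for name, type_name in declared_types.items():
        if name in converted:
            converted[name] = CONVERTERS[type_name](converted[name])
    return converted
```

Inputs without a declared type are left untouched in this sketch, which keeps the conversion step from silently dropping data.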
- classmethod create(flow_file: Path, connections: Union[dict, ConnectionProvider], working_dir: Optional[Path] = None, *, entry: Optional[str] = None, storage: Optional[AbstractRunStorage] = None, raise_ex: bool = True, node_override: Optional[Dict[str, Dict[str, Any]]] = None, line_timeout_sec: Optional[int] = None, init_kwargs: Optional[Dict[str, Any]] = None, **kwargs) → FlowExecutor
Create a new instance of FlowExecutor.
- Parameters:
flow_file (Path) – The path to the flow file.
connections (Union[dict, ConnectionProvider]) – The connections to be used for the flow.
working_dir (Optional[Path]) – The working directory to be used for the flow. Default is None.
entry (Optional[str]) – The entry function to be used for the flow if a .py file is provided. Default is None.
storage (Optional[AbstractRunStorage]) – The storage to be used for the flow. Default is None.
raise_ex (Optional[bool]) – Whether to raise exceptions or not. Default is True.
node_override (Optional[Dict[str, Dict[str, Any]]]) – The node overrides to be used for the flow. Default is None.
line_timeout_sec (Optional[int]) – The line timeout in seconds to be used for the flow. Default is LINE_TIMEOUT_SEC.
init_kwargs (Optional[Dict[str, Any]]) – Class init arguments for callable class, only supported for flex flow.
- Returns:
A new instance of FlowExecutor.
- Return type:
FlowExecutor
- enable_streaming_for_llm_flow(stream_required: Callable[[], bool])
Enable the LLM node that is connected to the flow output to return streaming results, as controlled by stream_required.
If the stream_required callback returns True, the LLM node will return a generator of strings. Otherwise, the LLM node will return a string.
- Parameters:
stream_required (Callable[[], bool]) – A callback that takes no arguments and returns a boolean value indicating whether streaming results should be enabled for the LLM node.
- Returns:
None
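The callback contract described above can be sketched without the library. In this illustration `make_llm_node` and its fake token source are hypothetical; only the behaviour (a generator of strings when `stream_required()` returns True, a plain string otherwise) comes from the description.

```python
from typing import Callable, Iterator, Union


def make_llm_node(
    stream_required: Callable[[], bool],
) -> Callable[[str], Union[str, Iterator[str]]]:
    """Build a fake LLM node whose output shape depends on stream_required."""

    def _chunks(prompt: str) -> Iterator[str]:
        # Stand-in for tokens arriving from a model.
        for word in prompt.split():
            yield word + " "

    def node(prompt: str) -> Union[str, Iterator[str]]:
        # The callback is consulted on every call, so the decision can
        # differ from one request to the next.
        if stream_required():
            return _chunks(prompt)
        return "".join(_chunks(prompt))

    return node
```

Passing a callback rather than a fixed boolean lets a server decide per request, for example from a request header, whether the caller can consume a stream.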
- ensure_flow_is_serializable()
Ensure that the flow is serializable.
Some of the nodes may return a generator of strings to create streaming outputs. This is useful when the flow is deployed as a web service. However, in the interactive mode, the executor assumes that the node result is JSON serializable.
This method adds a wrapper to each node in the flow to consume the streaming outputs and merge them into a string for executor usage.
- Returns:
None
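The wrapping step described above might look roughly like the following. This is a sketch under the assumption that streaming nodes return ordinary Python generators; `consume_streaming_output` and `streaming_node` are hypothetical names, not the executor's internals.

```python
import functools
import json
from types import GeneratorType
from typing import Any, Callable


def consume_streaming_output(node: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a node so a generator-of-strings result is merged into one string."""

    @functools.wraps(node)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        result = node(*args, **kwargs)
        if isinstance(result, GeneratorType):
            # Drain the generator so the caller receives a plain string.
            return "".join(str(chunk) for chunk in result)
        return result

    return wrapper


@consume_streaming_output
def streaming_node(text: str):
    """Hypothetical node that streams its output one word at a time."""
    for word in text.split():
        yield word.upper()


# The merged result is an ordinary string, so it can be serialized to JSON.
serialized = json.dumps(streaming_node("hello world"))
```

Results that are not generators pass through unchanged, so the wrapper can be applied to every node without checking which ones stream.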
- exec(inputs: dict, node_concurrency=16) → dict
Executes the flow with the given inputs and returns the output.
- Parameters:
inputs (dict) – A dictionary containing the input values for the flow.
node_concurrency (int) – The maximum number of nodes that can be executed concurrently.
- Returns:
A dictionary containing the output values of the flow.
- Return type:
dict
- exec_aggregation(inputs: Mapping[str, Any], aggregation_inputs: Mapping[str, Any], run_id=None, node_concurrency=16) → AggregationResult
Execute the aggregation node of the flow.
- Parameters:
inputs (Mapping[str, Any]) – A mapping of input names to their values.
aggregation_inputs (Mapping[str, Any]) – A mapping of aggregation input names to their values.
run_id (Optional[str]) – The ID of the current run, if any.
node_concurrency (int) – The maximum number of nodes that can be executed concurrently.
- Returns:
The result of the aggregation node.
- Return type:
AggregationResult
- Raises:
FlowError if the inputs or aggregation_inputs are invalid.
- exec_line(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None) → LineResult
Execute a single line of the flow.
- Parameters:
inputs (Mapping[str, Any]) – The input values for the line.
index (Optional[int]) – The index of the line to execute.
run_id (Optional[str]) – The ID of the flow run.
validate_inputs (bool) – Whether to validate the input values.
node_concurrency (int) – The maximum number of nodes that can be executed concurrently.
allow_generator_output (bool) – Whether to allow generator output.
line_timeout_sec (Optional[int]) – The maximum time to wait for a line of output.
- Returns:
The result of executing the line.
- Return type:
LineResult
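A typical call sequence, using only the `create` and `exec_line` signatures listed on this page, might look like this. The helper name `run_single_line` and the 60-second timeout are illustrative choices, and the structure of the returned `LineResult` is not described here, so the sketch returns it unmodified.

```python
from pathlib import Path
from typing import Any, Mapping


def run_single_line(flow_file: Path, inputs: Mapping[str, Any], connections: dict):
    """Create an executor for flow_file and execute one line with inputs."""
    # Imported lazily so this helper can be defined without promptflow installed.
    from promptflow.executor.flow_executor import FlowExecutor

    executor = FlowExecutor.create(flow_file, connections, raise_ex=True)
    # index and line_timeout_sec are optional; the values here are placeholders.
    return executor.exec_line(inputs, index=0, line_timeout_sec=60)
```

Note that `raise_ex` defaults to True in `create` but to False in the constructor, so passing it explicitly makes the intended error handling visible at the call site.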
- async exec_line_async(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None, sync_iterator_to_async: bool = True) → LineResult
Execute a single line of the flow asynchronously.
- Parameters:
inputs (Mapping[str, Any]) – The input values for the line.
index (Optional[int]) – The index of the line to execute.
run_id (Optional[str]) – The ID of the flow run.
validate_inputs (bool) – Whether to validate the input values.
node_concurrency (int) – The maximum number of nodes that can be executed concurrently.
allow_generator_output (bool) – Whether to allow generator output.
line_timeout_sec (Optional[int]) – The maximum time to wait for a line of output.
sync_iterator_to_async (bool) – Whether to convert sync iterator output to async iterator.
- Returns:
The result of executing the line.
- Return type:
LineResult
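Because `exec_line_async` is a coroutine, it has to be awaited or driven by an event loop. The sketch below shows both forms; `run_line` and `run_line_blocking` are hypothetical helpers, and `executor` is any object exposing the `exec_line_async` signature listed above.

```python
import asyncio
from typing import Any, Mapping, Optional


async def run_line(
    executor: Any, inputs: Mapping[str, Any], index: Optional[int] = None
) -> Any:
    """Await one line from inside code that is already asynchronous."""
    return await executor.exec_line_async(inputs, index=index)


def run_line_blocking(
    executor: Any, inputs: Mapping[str, Any], index: Optional[int] = None
) -> Any:
    """Drive the coroutine from synchronous code.

    asyncio.run starts a new event loop, so this must not be called
    from a thread that already has a running loop.
    """
    return asyncio.run(run_line(executor, inputs, index))
```

From synchronous code, calling `exec_line` directly is the simpler choice; the blocking helper is mainly useful when only the async entry point is available.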
- get_inputs_definition()
- get_status_summary(run_id: str)
Get a summary of the status of a given run.
- Parameters:
run_id (str) – The ID of the run to get the status summary for.
- Returns:
A summary of the status of the given run.
- Return type:
str
- property has_aggregation_node: bool
Check if the flow executor has any aggregation nodes.
- Returns:
True if the flow executor has at least one aggregation node, False otherwise.
- Return type:
bool
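One use of this property is to guard a call to `exec_aggregation`. The helper below is a hypothetical sketch that relies only on the property and the method signature listed on this page; how `aggregation_inputs` is collected from earlier line runs is not covered here.

```python
from typing import Any, Mapping, Optional


def run_aggregation_if_present(
    executor: Any,
    inputs: Mapping[str, Any],
    aggregation_inputs: Mapping[str, Any],
    run_id: Optional[str] = None,
) -> Optional[Any]:
    """Call exec_aggregation only when the flow defines aggregation nodes."""
    if not executor.has_aggregation_node:
        # Nothing to aggregate, so skip the call entirely.
        return None
    return executor.exec_aggregation(inputs, aggregation_inputs, run_id=run_id)
```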
- classmethod load_and_exec_node(flow_file: Path, node_name: str, *, storage: Optional[AbstractRunStorage] = None, output_sub_dir: Optional[str] = None, flow_inputs: Optional[Mapping[str, Any]] = None, dependency_nodes_outputs: Optional[Mapping[str, Any]] = None, connections: Optional[dict] = None, working_dir: Optional[Path] = None, raise_ex: bool = False)
Load and execute a single node from the flow.
- Parameters:
flow_file (Path) – The path to the flow file.
node_name (str) – The name of the node to be executed.
storage (Optional[AbstractRunStorage]) – The storage to be used for the flow.
output_sub_dir (Optional[str]) – The directory in which to persist images for the flow. Kept only for backward compatibility.
flow_inputs (Optional[Mapping[str, Any]]) – The inputs to be used for the flow. Default is None.
dependency_nodes_outputs (Optional[Mapping[str, Any]]) – The outputs of the dependency nodes. Default is None.
connections (Optional[dict]) – The connections to be used for the flow. Default is None.
working_dir (Optional[Path]) – The working directory to be used for the flow. Default is None.
raise_ex (Optional[bool]) – Whether to raise exceptions or not. Default is False.
- static update_environment_variables_with_connections(connections: dict)
Update environment variables with connections.
- Parameters:
connections (dict) – A dictionary containing connection information.
- Returns:
A dictionary containing updated environment variables.
- Return type:
dict
- promptflow.executor.flow_executor.enable_streaming_for_llm_tool(f)
Enable the stream mode for LLM tools that support it.
- Parameters:
f (function) – The function to wrap.
- Returns:
The wrapped function.
- Return type:
function
AzureOpenAI.completion and AzureOpenAI.chat tools support both stream and non-stream mode. The stream mode is turned off by default. Use this wrapper to turn it on.
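A wrapper of this kind could be sketched as follows. The assumption, not confirmed by this page, is that a tool advertises stream support through a keyword parameter named `stream`; `turn_on_stream` and `fake_chat` are hypothetical names used only for illustration.

```python
import functools
import inspect
from typing import Any, Callable


def turn_on_stream(f: Callable[..., Any]) -> Callable[..., Any]:
    """Return f wrapped so that stream=True is passed, if f accepts it."""
    if "stream" not in inspect.signature(f).parameters:
        # Nothing to switch on: hand the function back untouched.
        return f

    @functools.wraps(f)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Overrides the tool's non-stream default. This sketch assumes
        # callers pass stream by keyword, never positionally.
        kwargs["stream"] = True
        return f(*args, **kwargs)

    return wrapper


def fake_chat(prompt: str, stream: bool = False):
    """Hypothetical tool with a stream flag that defaults to off."""
    words = prompt.split()
    return iter(words) if stream else " ".join(words)
```

Calling `turn_on_stream(fake_chat)("a b")` yields an iterator of words, while the unwrapped `fake_chat("a b")` returns a single string.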
- promptflow.executor.flow_executor.execute_flow(flow_file: Path, working_dir: Path, output_dir: Path, connections: dict, inputs: Mapping[str, Any], *, run_id: Optional[str] = None, run_aggregation: bool = True, enable_stream_output: bool = False, allow_generator_output: bool = False, init_kwargs: Optional[dict] = None, **kwargs) → LineResult
Execute the flow, including aggregation nodes.
- Parameters:
flow_file (Path) – The path to the flow file.
working_dir (Path) – The working directory of the flow.
output_dir (Path) – The output directory, given as a path relative to working_dir.
connections (dict) – A dictionary containing connection information.
inputs (Mapping[str, Any]) – A dictionary containing the input values for the flow.
enable_stream_output (Optional[bool]) – Whether to allow stream (generator) output for flow output. Default is False.
run_id (Optional[str]) – Run id will be set in operation context and used for session.
init_kwargs (dict) – Initialization parameters for flex flow, only supported when flow is callable class.
kwargs (Any) – Other keyword arguments to create flow executor.
- Returns:
The line result of executing the flow.
- Return type:
LineResult
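As a usage sketch, the function can be called with the parameters listed above. The helper name `run_flow_once` and the `outputs` directory name are illustrative assumptions; whether `flow_file` is resolved against `working_dir` is not stated on this page, so the sketch passes both through unchanged.

```python
from pathlib import Path
from typing import Any, Mapping


def run_flow_once(
    flow_file: Path,
    working_dir: Path,
    inputs: Mapping[str, Any],
    connections: dict,
):
    """Execute the flow once, including its aggregation nodes."""
    # Imported lazily so this helper can be defined without promptflow installed.
    from promptflow.executor.flow_executor import execute_flow

    return execute_flow(
        flow_file=flow_file,
        working_dir=working_dir,
        output_dir=Path("outputs"),  # relative to working_dir; name is illustrative
        connections=connections,
        inputs=inputs,
        run_aggregation=True,
    )
```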
- promptflow.executor.flow_executor.signal_handler(sig, frame)
Handle the terminate signal received by the process.
Currently, only the single-node run uses this handler. It prints a log message and raises a KeyboardInterrupt so that external code can catch the exception and cancel the running node.
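The pattern described above, a handler that logs and raises KeyboardInterrupt for the caller to catch, can be sketched with the standard `signal` module. `terminate_handler` and `run_node_with_cancellation` are hypothetical names, and registering the handler for SIGINT is an assumption; the signals the library actually registers are not listed on this page.

```python
import signal


def terminate_handler(sig, frame):
    """Log the signal and raise KeyboardInterrupt, as the description outlines."""
    print(f"Received signal {sig}, cancelling the running node.")
    raise KeyboardInterrupt


def run_node_with_cancellation(node):
    """Run node(); return None if it is interrupted by the handler."""
    # signal.signal may only be called from the main thread.
    previous = signal.signal(signal.SIGINT, terminate_handler)
    try:
        return node()
    except KeyboardInterrupt:
        # External code catches the exception and treats the node as cancelled.
        return None
    finally:
        # Restore whatever handler was installed before.
        signal.signal(signal.SIGINT, previous)
```

Raising an exception from the handler, rather than exiting the process, gives the caller a chance to clean up and record the cancellation.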