promptflow.executor.flow_executor module#

class promptflow.executor.flow_executor.FlowExecutor(flow: Flow, connections: ConnectionProvider, run_tracker: RunTracker, cache_manager: AbstractCacheManager, loaded_tools: Mapping[str, Callable], *, raise_ex: bool = False, working_dir=None, line_timeout_sec=None, flow_file=None)#

Bases: object

This class is used to execute a single flow for different inputs.

Parameters:
  • flow (Flow) – The flow to be executed.

  • connections (ConnectionProvider) – The connections to be used for the flow.

  • run_tracker (RunTracker) – The run tracker to be used for the flow.

  • cache_manager (AbstractCacheManager) – The cache manager to be used for the flow.

  • loaded_tools (Mapping[str, Callable]) – The loaded tools to be used for the flow.

  • worker_count (Optional[int]) – The number of workers to be used for the flow. Default is 16.

  • raise_ex (Optional[bool]) – Whether to raise exceptions or not. Default is False.

  • working_dir (Optional[str]) – The working directory to be used for the flow. Default is None.

  • line_timeout_sec (Optional[int]) – The line timeout in seconds to be used for the flow. Default is LINE_TIMEOUT_SEC.

  • flow_file (Optional[Path]) – The flow file to be used for the flow. Default is None.

property aggregation_nodes#

Get the aggregation nodes of the flow executor.

Returns:

A list of aggregation nodes.

Return type:

list

static apply_inputs_mapping(inputs: Mapping[str, Mapping[str, Any]], inputs_mapping: Mapping[str, str]) → Dict[str, Any]#
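
apply_inputs_mapping carries no further description here. As a minimal, hedged sketch, assuming the "${source.field}" mapping convention used elsewhere in promptflow (the grouped-inputs structure, mapping strings, and resulting dictionary below are illustrative, not a specification of the exact resolution rules):

    from promptflow.executor.flow_executor import FlowExecutor

    # Line inputs grouped by source name (illustrative structure).
    inputs = {
        "data": {"question": "What is the capital of France?"},
        "run.outputs": {"answer": "Paris"},
    }

    # Flow input name -> "${source.field}" reference (assumed convention).
    inputs_mapping = {
        "question": "${data.question}",
        "groundtruth": "${run.outputs.answer}",
    }

    resolved = FlowExecutor.apply_inputs_mapping(inputs, inputs_mapping)
    # Under the assumed convention, `resolved` would be:
    # {"question": "What is the capital of France?", "groundtruth": "Paris"}
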
convert_flow_input_types(inputs: dict) → Mapping[str, Any]#

Convert the input types of the given inputs dictionary to match the expected types of the flow.

Parameters:

inputs (dict) – A dictionary containing the inputs to the flow.

Returns:

A dictionary containing the converted inputs.

Return type:

Mapping[str, Any]
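
As a hedged sketch, assuming a hypothetical flow that declares an int-typed input named top_k (the flow path and input names are placeholders):

    from pathlib import Path
    from promptflow.executor.flow_executor import FlowExecutor

    executor = FlowExecutor.create(Path("my_flow/flow.dag.yaml"), connections={})

    raw_inputs = {"question": "hi", "top_k": "3"}   # values arriving as strings, e.g. from CLI or JSON
    typed_inputs = executor.convert_flow_input_types(raw_inputs)
    # If the flow declares "top_k" as int, typed_inputs would be {"question": "hi", "top_k": 3}.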

classmethod create(flow_file: Path, connections: Union[dict, ConnectionProvider], working_dir: Optional[Path] = None, *, entry: Optional[str] = None, storage: Optional[AbstractRunStorage] = None, raise_ex: bool = True, node_override: Optional[Dict[str, Dict[str, Any]]] = None, line_timeout_sec: Optional[int] = None, init_kwargs: Optional[Dict[str, Any]] = None, **kwargs) → FlowExecutor#

Create a new instance of FlowExecutor.

Parameters:
  • flow_file (Path) – The path to the flow file.

  • connections (Union[dict, ConnectionProvider]) – The connections to be used for the flow.

  • working_dir (Optional[str]) – The working directory to be used for the flow. Default is None.

  • entry (Optional[str]) – The function to be used for the flow when a .py file is provided. Default is None.

  • storage (Optional[AbstractRunStorage]) – The storage to be used for the flow. Default is None.

  • raise_ex (Optional[bool]) – Whether to raise exceptions or not. Default is True.

  • node_override (Optional[Dict[str, Dict[str, Any]]]) – The node overrides to be used for the flow. Default is None.

  • line_timeout_sec (Optional[int]) – The line timeout in seconds to be used for the flow. Default is LINE_TIMEOUT_SEC.

  • init_kwargs (Optional[Dict[str, Any]]) – Class init arguments for callable class, only supported for flex flow.

Returns:

A new instance of FlowExecutor.

Return type:

FlowExecutor
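
A minimal construction sketch; the flow file path is a placeholder and the empty connections dict assumes the flow's tools need no connections:

    from pathlib import Path
    from promptflow.executor.flow_executor import FlowExecutor

    flow_file = Path("my_flow/flow.dag.yaml")   # hypothetical flow definition
    executor = FlowExecutor.create(
        flow_file,
        connections={},                 # resolved connections required by the flow's tools
        working_dir=flow_file.parent,
        raise_ex=True,                  # surface execution errors instead of recording them
        line_timeout_sec=600,           # optional per-line timeout in seconds
    )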

enable_streaming_for_llm_flow(stream_required: Callable[[], bool])#

Enable the LLM node that is connected to a flow output to return streaming results, controlled by stream_required.

If the stream_required callback returns True, the LLM node will return a generator of strings. Otherwise, the LLM node will return a string.

Parameters:

stream_required (Callable[[], bool]) – A callback that takes no arguments and returns a boolean value indicating whether streaming results should be enabled for the LLM node.

Returns:

None
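
For illustration, a deployment wrapper might decide per request whether to stream; a hedged sketch in which the flow path and the body of stream_required are hypothetical:

    from pathlib import Path
    from promptflow.executor.flow_executor import FlowExecutor

    executor = FlowExecutor.create(Path("my_flow/flow.dag.yaml"), connections={})

    def stream_required() -> bool:
        # Hypothetical check, e.g. inspect the current request's Accept header.
        return True

    executor.enable_streaming_for_llm_flow(stream_required)
    # Later exec_line calls may then yield generator output from the LLM node that feeds
    # the flow output (pass allow_generator_output=True to keep it as a generator).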

ensure_flow_is_serializable()#

Ensure that the flow is serializable.

Some of the nodes may return a generator of strings to create streaming outputs. This is useful when the flow is deployed as a web service. However, in the interactive mode, the executor assumes that the node result is JSON serializable.

This method adds a wrapper to each node in the flow to consume the streaming outputs and merge them into a string for executor usage.

Returns:

None

exec(inputs: dict, node_concurrency=16) → dict#

Executes the flow with the given inputs and returns the output.

Parameters:
  • inputs (dict) – A dictionary containing the input values for the flow.

  • node_concurrency (int) – The maximum number of nodes that can be executed concurrently.

Returns:

A dictionary containing the output values of the flow.

Return type:

dict
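
A short synchronous execution sketch; the flow path and input name are placeholders:

    from pathlib import Path
    from promptflow.executor.flow_executor import FlowExecutor

    executor = FlowExecutor.create(Path("my_flow/flow.dag.yaml"), connections={})
    outputs = executor.exec({"question": "What is 2 + 2?"}, node_concurrency=4)
    print(outputs)   # dict of flow output names to values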

exec_aggregation(inputs: Mapping[str, Any], aggregation_inputs: Mapping[str, Any], run_id=None, node_concurrency=16) → AggregationResult#

Execute the aggregation node of the flow.

Parameters:
  • inputs (Mapping[str, Any]) – A mapping of input names to their values.

  • aggregation_inputs (Mapping[str, Any]) – A mapping of aggregation input names to their values.

  • run_id (Optional[str]) – The ID of the current run, if any.

  • node_concurrency (int) – The maximum number of nodes that can be executed concurrently.

Returns:

The result of the aggregation node.

Return type:

AggregationResult

Raises:

FlowError – If the inputs or aggregation_inputs are invalid.
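
A hedged sketch of running aggregation after several line runs; the flow path, input names, and the exact shape of aggregation_inputs (per-line values collected for the aggregation node) are illustrative and depend on the flow definition:

    from pathlib import Path
    from promptflow.executor.flow_executor import FlowExecutor

    executor = FlowExecutor.create(Path("my_eval_flow/flow.dag.yaml"), connections={})

    if executor.has_aggregation_node:
        agg_result = executor.exec_aggregation(
            inputs={"groundtruth": ["4", "Paris"]},             # flow inputs collected across lines
            aggregation_inputs={"line_output": ["4", "Rome"]},  # per-line node outputs (illustrative)
            run_id="my-run-id",
        )
        # AggregationResult is assumed here to expose the computed metrics.
        print(agg_result.metrics)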

exec_line(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None) → LineResult#

Execute a single line of the flow.

Parameters:
  • inputs (Mapping[str, Any]) – The input values for the line.

  • index (Optional[int]) – The index of the line to execute.

  • run_id (Optional[str]) – The ID of the flow run.

  • validate_inputs (bool) – Whether to validate the input values.

  • node_concurrency (int) – The maximum number of nodes that can be executed concurrently.

  • allow_generator_output (bool) – Whether to allow generator output.

  • line_timeout_sec (Optional[int]) – The maximum time in seconds to wait for the line to finish executing.

Returns:

The result of executing the line.

Return type:

LineResult
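
A minimal sketch of executing one line; the flow path and input name are placeholders, and the LineResult attributes read at the end (output, run_info) are the commonly exposed ones and may differ by version:

    from pathlib import Path
    from promptflow.executor.flow_executor import FlowExecutor

    executor = FlowExecutor.create(Path("my_flow/flow.dag.yaml"), connections={})

    line_result = executor.exec_line(
        {"question": "What is the capital of France?"},
        index=0,
        run_id="my-run-id",
    )
    print(line_result.output)            # flow outputs for this line
    print(line_result.run_info.status)   # overall status of the line run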

async exec_line_async(inputs: Mapping[str, Any], index: Optional[int] = None, run_id: Optional[str] = None, validate_inputs: bool = True, node_concurrency=16, allow_generator_output: bool = False, line_timeout_sec: Optional[int] = None, sync_iterator_to_async: bool = True) → LineResult#

Execute a single line of the flow.

Parameters:
  • inputs (Mapping[str, Any]) – The input values for the line.

  • index (Optional[int]) – The index of the line to execute.

  • run_id (Optional[str]) – The ID of the flow run.

  • validate_inputs (bool) – Whether to validate the input values.

  • node_concurrency (int) – The maximum number of nodes that can be executed concurrently.

  • allow_generator_output (bool) – Whether to allow generator output.

  • line_timeout_sec (Optional[int]) – The maximum time in seconds to wait for the line to finish executing.

  • sync_iterator_to_async (bool) – Whether to convert sync iterator output to async iterator.

Returns:

The result of executing the line.

Return type:

LineResult
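
The async variant can be driven with asyncio; a hedged sketch mirroring the synchronous example above, with placeholder paths and inputs:

    import asyncio
    from pathlib import Path
    from promptflow.executor.flow_executor import FlowExecutor

    async def main():
        executor = FlowExecutor.create(Path("my_flow/flow.dag.yaml"), connections={})
        return await executor.exec_line_async(
            {"question": "What is the capital of France?"},
            index=0,
        )

    line_result = asyncio.run(main())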

get_inputs_definition()#
get_status_summary(run_id: str)#

Get a summary of the status of a given run.

Parameters:

run_id (str) – The ID of the run to get the status summary for.

Returns:

A summary of the status of the given run.

Return type:

str

property has_aggregation_node: bool#

Check if the flow executor has any aggregation nodes.

Returns:

True if the flow executor has at least one aggregation node, False otherwise.

Return type:

bool

classmethod load_and_exec_node(flow_file: Path, node_name: str, *, storage: Optional[AbstractRunStorage] = None, output_sub_dir: Optional[str] = None, flow_inputs: Optional[Mapping[str, Any]] = None, dependency_nodes_outputs: Optional[Mapping[str, Any]] = None, connections: Optional[dict] = None, working_dir: Optional[Path] = None, raise_ex: bool = False)#

Load and execute a single node from the flow.

Parameters:
  • flow_file (Path) – The path to the flow file.

  • node_name (str) – The name of the node to be executed.

  • storage (Optional[AbstractRunStorage]) – The storage to be used for the flow.

  • output_sub_dir (Optional[str]) – The directory used to persist images for the flow. Kept only for backward compatibility.

  • flow_inputs (Optional[Mapping[str, Any]]) – The inputs to be used for the flow. Default is None.

  • dependency_nodes_outputs (Optional[Mapping[str, Any]]) – The outputs of the dependency nodes. Default is None.

  • connections (Optional[dict]) – The connections to be used for the flow. Default is None.

  • working_dir (Optional[str]) – The working directory to be used for the flow. Default is None.

  • raise_ex (Optional[bool]) – Whether to raise exceptions or not. Default is False.
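
A hedged sketch of executing a single node while supplying its upstream values by hand; the node name, inputs, and dependency outputs are hypothetical:

    from pathlib import Path
    from promptflow.executor.flow_executor import FlowExecutor

    FlowExecutor.load_and_exec_node(
        Path("my_flow/flow.dag.yaml"),
        "summarize",                                    # hypothetical node name
        flow_inputs={"question": "What is 2 + 2?"},
        dependency_nodes_outputs={"search": ["..."]},   # outputs of upstream nodes (illustrative)
        connections={},                                 # resolved connections, if the node needs any
        raise_ex=True,
    )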

static update_environment_variables_with_connections(connections: dict)#

Update environment variables with connections.

Parameters:

connections (dict) – A dictionary containing connection information.

Returns:

A dictionary containing updated environment variables.

Return type:

dict
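
A minimal call sketch; the connection dictionary structure shown is only illustrative and depends on how connections are serialized in your setup:

    from promptflow.executor.flow_executor import FlowExecutor

    connections = {
        "my_open_ai_connection": {           # hypothetical connection name
            "type": "OpenAIConnection",
            "value": {"api_key": "..."},     # secrets elided
        }
    }
    env_vars = FlowExecutor.update_environment_variables_with_connections(connections)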

promptflow.executor.flow_executor.enable_streaming_for_llm_tool(f)#

Enable the stream mode for LLM tools that support it.

Parameters:

f (function) – The function to wrap.

Returns:

The wrapped function.

Return type:

function

AzureOpenAI.completion and AzureOpenAI.chat tools support both stream and non-stream mode. The stream mode is turned off by default. Use this wrapper to turn it on.
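
A hedged sketch of wrapping a tool function; the tool below is hypothetical, and whether streaming actually turns on depends on the wrapped tool supporting it:

    from promptflow.executor.flow_executor import enable_streaming_for_llm_tool

    def my_chat_tool(prompt: str):
        # Hypothetical LLM tool; AzureOpenAI.chat and AzureOpenAI.completion are the
        # built-in tools this wrapper is intended for.
        ...

    streaming_chat_tool = enable_streaming_for_llm_tool(my_chat_tool)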

promptflow.executor.flow_executor.execute_flow(flow_file: Path, working_dir: Path, output_dir: Path, connections: dict, inputs: Mapping[str, Any], *, run_id: Optional[str] = None, run_aggregation: bool = True, enable_stream_output: bool = False, allow_generator_output: bool = False, init_kwargs: Optional[dict] = None, **kwargs) → LineResult#

Execute the flow, including aggregation nodes.

Parameters:
  • flow_file (Path) – The path to the flow file.

  • working_dir (Path) – The working directory of the flow.

  • output_dir (Path) – The output directory, given as a path relative to working_dir.

  • connections (dict) – A dictionary containing connection information.

  • inputs (Mapping[str, Any]) – A dictionary containing the input values for the flow.

  • run_id (Optional[str]) – The run ID; it is set in the operation context and used for the session.

  • run_aggregation (Optional[bool]) – Whether to run the aggregation nodes after the line run. Default is True.

  • enable_stream_output (Optional[bool]) – Whether to allow stream (generator) output for the flow output. Default is False.

  • allow_generator_output (Optional[bool]) – Whether to allow generator output for the executed line. Default is False.

  • init_kwargs (Optional[dict]) – Initialization parameters for a flex flow; only supported when the flow is a callable class.

  • kwargs (Any) – Other keyword arguments to create flow executor.

Returns:

The line result of executing the flow.

Return type:

LineResult
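
A minimal sketch of the module-level helper, which executes one line and, by default, the aggregation nodes; paths and inputs are placeholders:

    from pathlib import Path
    from promptflow.executor.flow_executor import execute_flow

    working_dir = Path("my_flow")                  # hypothetical flow folder
    line_result = execute_flow(
        flow_file=working_dir / "flow.dag.yaml",
        working_dir=working_dir,
        output_dir=Path("outputs"),                # relative to working_dir
        connections={},
        inputs={"question": "What is the capital of France?"},
        run_id="my-run-id",
    )
    print(line_result.output)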

promptflow.executor.flow_executor.signal_handler(sig, frame)#

Handle the terminate signal received by the process.

Currently, only the single-node run uses this handler. We print a log message and raise a KeyboardInterrupt so that external code can catch the exception and cancel the running node.