Class based flow#
Experimental feature
This is an experimental feature, and may change at any time. Learn more.
When users need to persist objects (like a connection) in memory across multiple rounds of flow runs, they can write a callable class as the flow entry and put the persisted parameters in the __init__ method.
If users need to log metrics on batch run outputs, they can add an __aggregate__ method, which will be scheduled after the batch run finishes.
The __aggregate__ method should accept only one parameter, which is the list of batch run results.
See connection support & aggregation support for more details.
Class as a flow#
Assume we have a file flow_entry.py:
from typing import List, TypedDict

from promptflow.core import AzureOpenAIModelConfiguration


class Reply(TypedDict):
    output: str


class MyFlow:
    def __init__(self, model_config: AzureOpenAIModelConfiguration, flow_config: dict):
        """Flow initialization logic goes here."""
        self.model_config = model_config
        self.flow_config = flow_config

    def __call__(self, question: str) -> Reply:
        """Flow execution logic goes here."""
        output = "..."  # replace with the real answer generation logic
        return Reply(output=output)

    def __aggregate__(self, line_results: List[str]) -> dict:
        """Aggregation logic goes here. Return key-value pairs as metrics."""
        return {"line_count": len(line_results)}  # replace with real metric calculation
Flow test#
Test with original code#
Since the flow's definition is a function or callable class, we recommend running it directly, like any other script:
from promptflow.core import AzureOpenAIModelConfiguration

class MyFlow:
    ...  # the full class definition from flow_entry.py above

if __name__ == "__main__":
    model_config = AzureOpenAIModelConfiguration(azure_deployment="my_deployment", api_key="actual_key")
    flow = MyFlow(model_config=model_config, flow_config={})
    output = flow(question="What's the capital of France?")
    metrics = flow.__aggregate__([output])
    # check metrics here
Test via function call#
You can also convert your class entry to a flow and test it with prompt flow's capabilities.
You can test with the following CLI:
# flow entry syntax: path.to.module:ClassName
pf flow test --flow flow_entry:MyFlow --inputs question="What's the capital of France?" --init init.json
Note: currently this command will generate a flow.flex.yaml in your working directory, which will become the flow's entry.
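The file passed to --init supplies values for the class's __init__ parameters. A minimal init.json for MyFlow might look like this (the endpoint, deployment, and key values are placeholders):

{
    "model_config": {
        "azure_endpoint": "my_endpoint",
        "azure_deployment": "my_deployment",
        "api_key": "actual_api_key"
    },
    "flow_config": {}
}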
Check out a full example here: basic-chat
Chat with a flow#
Chatting with a flow in the CLI is supported:
pf flow test --flow flow_entry:MyFlow --inputs inputs.json --init init.json --ui
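The --inputs file maps input names to values. A minimal inputs.json for the MyFlow class above might look like this (the question is just an example):

{
    "question": "What's the capital of France?"
}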
Check here for more information.
Batch run#
Users can also batch run a flow.
pf run create --flow "path.to.module:ClassName" --data "./data.jsonl"
from promptflow.client import PFClient

pf = PFClient()
# users can also directly use the entry in the `flow` param for a batch run
pf.run(flow="path.to.module:ClassName", init="./init.jsonl", data="./data.jsonl")
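The data file is in JSON Lines format, where each line maps to the flow's inputs. A minimal data.jsonl matching MyFlow's signature might look like this (the questions are just examples):

{"question": "What's the capital of France?"}
{"question": "What's the capital of Japan?"}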
Or directly run the imported flow class or flow instance.
from promptflow.core import AzureOpenAIModelConfiguration

class MyFlow:
    ...  # the full class definition from flow_entry.py above

config = AzureOpenAIModelConfiguration(
    azure_deployment="my_deployment",
    # connection and api_key configs are mutually exclusive; pass only one of them
    connection="my_aoai_connection",
    api_key="actual_key",
)

pf.run(flow=MyFlow, init={"model_config": config, "flow_config": {}}, data="./data.jsonl")
# or
flow_obj = MyFlow(model_config=config, flow_config={})
pf.run(flow=flow_obj, data="./data.jsonl")
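After submission, the per-line outputs of a run can be inspected with the client. A minimal sketch, assuming the run above completed successfully:

run = pf.run(flow=flow_obj, data="./data.jsonl")
details = pf.get_details(run)  # per-line inputs and outputs as a pandas DataFrame
print(details.head())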
Learn more about this topic in Run and evaluate a flow.
Define a flow yaml#
Users can write a YAML file named flow.flex.yaml manually, or save a function/callable entry to a YAML file.
This is required for advanced scenarios like deployment or running in the cloud.
A flow YAML may look like this:
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
entry: path.to.module:ClassName
Batch run with YAML#
Users can batch run a flow. The flow's init function parameters are provided via the init parameter.
Users need to write a JSON file as the init value, since it's hard to write a model config on the command line.
{
    "model_config": {
        "azure_endpoint": "my_endpoint",
        "azure_deployment": "my_deployment",
        "api_key": "actual_api_key"
    },
    "flow_config": {}
}
pf run create --flow "./flow.flex.yaml" --data "./data.jsonl" --init init.json
pf = PFClient()
config = AzureOpenAIModelConfiguration(
    azure_deployment="my_deployment",
    api_key="actual_key",
)
# if the init value is not JSON serializable, a user error is raised
pf.run(flow="./flow.flex.yaml", init={"model_config": config, "flow_config": {}}, data="./data.jsonl")
# when submitting to cloud, users can only use a connection
# at runtime the executor resolves the connection in AzureOpenAIModelConfiguration and sets the
# connection's fields on the model config, equivalent to the original ModelConfiguration.from_connection()
config = AzureOpenAIModelConfiguration(
    azure_deployment="my_embedding_deployment",
    connection="my-aoai-connection",
)
pfazure.run(flow="./flow.flex.yaml", init={"model_config": config, "flow_config": {}}, data="./data.jsonl")
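The pfazure client used above is the Azure flavor of the prompt flow client. A minimal construction sketch (the subscription, resource group, and workspace values are placeholders):

from azure.identity import DefaultAzureCredential
from promptflow.azure import PFClient

pfazure = PFClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace-name>",
)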
Deploy a flow#
Users can serve a flow. The flow's init function parameters are provided via the init parameter.
The flow should have a complete init/inputs/outputs specification in the YAML so that the serving swagger can be generated, as in the sketch below.
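For example, a flow.flex.yaml with an explicit specification might look like the sketch below; the port names and types are illustrative and should be checked against the Flow schema:

$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json
entry: flow_entry:MyFlow
init:
  model_config:
    type: AzureOpenAIModelConfiguration
  flow_config:
    type: object
inputs:
  question:
    type: string
outputs:
  output:
    type: string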
Users need to write a JSON file as the init value, since it's hard to write a model config on the command line.
{
    "model_config": {
        "azure_endpoint": "my_endpoint",
        "azure_deployment": "my_deployment",
        "api_key": "actual_api_key"
    },
    "flow_config": {}
}
# the model config can only be passed by file
pf flow serve --source "./" --port 8088 --host localhost --init path/to/init.json
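Once the server is running, you can send a scoring request to it. A sketch, assuming the default /score endpoint and the MyFlow inputs above:

curl -X POST http://localhost:8088/score \
    -H "Content-Type: application/json" \
    -d '{"question": "What is the capital of France?"}'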
Learn more: Deploy a flow.
Aggregation support#
Aggregation support is introduced to help users calculate metrics.
from typing import List

class MyFlow:
    def __call__(self, text: str) -> str:
        """Flow execution logic goes here."""
        ...

    # will only execute once after the batch run finishes.
    # processed_results will be the list of __call__'s outputs, and the returned dict is logged as metrics automatically.
    def __aggregate__(self, processed_results: List[str]) -> dict:
        metrics = {}
        for element in processed_results:
            # If __call__'s output is a primitive type, element will be that primitive type.
            # If __call__'s output is a dataclass, element will be a dictionary, but its attributes can still be accessed with `element.attribute_name`.
            # For other cases, it's recommended to access values by key: `element["attribute_name"]`.
            ...
        return metrics
Note: there are several limitations on aggregation support:
- The aggregation function will only execute in a batch run.
- Only one hard-coded __aggregate__ function is supported.
- The __aggregate__ function will only be passed one positional argument when executing; its input will be the flow run's output list.
- Each element inside processed_results passed to __aggregate__ is not the same object that each line's __call__ returns. The reconstructed element is a dictionary that supports one layer of attribute access, but it's recommended to access values by key. See the example above for usage.
- If the aggregation function accepts more than one argument, an error is raised in the submission phase.
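After a batch run finishes, the key-value pairs returned by __aggregate__ are logged as the run's metrics and can be retrieved with the client. A minimal sketch, assuming an init.json like the one defined earlier:

from promptflow.client import PFClient

pf = PFClient()
run = pf.run(flow="flow_entry:MyFlow", init="./init.json", data="./data.jsonl")
metrics = pf.get_metrics(run)  # the dict returned by __aggregate__
print(metrics)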