Introduction

This repository contains the implementation of the Data Collection process for training the Large Action Models (LAMs) described in the paper Large Action Models: From Inception to Implementation. The Data Collection process is designed to streamline task processing, ensuring that all necessary steps are seamlessly integrated from initialization to execution. This module is part of the UFO project.

Dataflow

Dataflow uses UFO to implement instantiation, execution, and dataflow for a given task, with options for batch processing and single processing.

  1. Instantiation: Instantiation is the process of setting up and preparing a task for execution. This step typically involves choosing a template, prefilling the task, and filtering the result.
  2. Execution: Execution is the actual process of running the task. This step carries out the actions or operations specified by the instantiation. After execution, an evaluation agent evaluates the quality of the whole execution process.
  3. Dataflow: Dataflow is the overarching process that combines instantiation and execution into a single pipeline. It provides an end-to-end solution for processing tasks, ensuring that all necessary steps (from initialization to execution) are seamlessly integrated.

You can use instantiation and execution independently if you only need to perform one specific part of the process. When both steps are required for a task, the dataflow process streamlines them, allowing you to execute tasks from start to finish in a single pipeline.

The overall dataflow process works as follows: given task-plan data, the LLM instantiates the task-action data by choosing a template, prefilling the task, and filtering the result.

How To Use

1. Install Packages

You should install the necessary packages in the UFO root folder:

pip install -r requirements.txt

2. Configure the LLMs

Before running dataflow, you need to provide your LLM configurations individually for PrefillAgent and FilterAgent. You can create your own config file dataflow/config/config.yaml, by copying the dataflow/config/config.yaml.template and editing config for PREFILL_AGENT and FILTER_AGENT as follows:

OpenAI

VISUAL_MODE: True, # Whether to use the visual mode
API_TYPE: "openai" , # The API type, "openai" for the OpenAI API.  
API_BASE: "https://api.openai.com/v1/chat/completions", # The OpenAI API endpoint.
API_KEY: "sk-",  # The OpenAI API key, begin with sk-
API_VERSION: "2024-02-15-preview", # "2024-02-15-preview" by default
API_MODEL: "gpt-4-vision-preview",  # The only OpenAI model

Azure OpenAI (AOAI)

VISUAL_MODE: True, # Whether to use the visual mode
API_TYPE: "aoai" , # The API type, "aoai" for the Azure OpenAI.  
API_BASE: "YOUR_ENDPOINT", #  The AOAI API address. Format: https://{your-resource-name}.openai.azure.com
API_KEY: "YOUR_KEY",  # The aoai API key
API_VERSION: "2024-02-15-preview", # "2024-02-15-preview" by default
API_MODEL: "gpt-4-vision-preview",  # The only OpenAI model
API_DEPLOYMENT_ID: "YOUR_AOAI_DEPLOYMENT", # The deployment id for the AOAI API

You can also use a non-visual model (e.g., GPT-4) for each agent by setting VISUAL_MODE: False and a proper API_MODEL (openai) and API_DEPLOYMENT_ID (aoai).

Non-Visual Model Configuration

You can utilize non-visual models (e.g., GPT-4) for each agent by configuring the following settings in the config.yaml file:

  • VISUAL_MODE: False # To enable non-visual mode.
  • Specify the appropriate API_MODEL (OpenAI) and API_DEPLOYMENT_ID (AOAI) for each agent.

Ensure you configure these settings accurately to leverage non-visual models effectively.
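As a rough illustration of the settings above, the following sketch checks one agent's configuration before a run. The helper name check_agent_config and its rules are assumptions made for this example and are not part of UFO; it operates on a plain dict such as the PREFILL_AGENT or FILTER_AGENT entry loaded from config.yaml.

```python
from typing import Any, Dict, List

# Keys taken from the configuration examples above.
COMMON_KEYS = [
    "VISUAL_MODE",
    "API_TYPE",
    "API_BASE",
    "API_KEY",
    "API_VERSION",
    "API_MODEL",
]


def check_agent_config(agent_config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means none were found."""
    problems = [f"missing key: {key}" for key in COMMON_KEYS if key not in agent_config]

    # Azure OpenAI additionally needs a deployment id.
    if agent_config.get("API_TYPE") == "aoai" and not agent_config.get("API_DEPLOYMENT_ID"):
        problems.append("missing key: API_DEPLOYMENT_ID")

    # Placeholder keys copied from the template should be replaced before running.
    if agent_config.get("API_KEY") in ("sk-", "YOUR_KEY"):
        problems.append("API_KEY still looks like a placeholder")
    return problems


# Example: a non-visual AOAI agent that is not fully configured yet.
prefill_agent = {
    "VISUAL_MODE": False,
    "API_TYPE": "aoai",
    "API_BASE": "https://example-resource.openai.azure.com",  # hypothetical endpoint
    "API_KEY": "YOUR_KEY",
    "API_VERSION": "2024-02-15-preview",
    "API_MODEL": "gpt-4",
}
for problem in check_agent_config(prefill_agent):
    print(problem)
```

Running such a check for both agents before starting a batch can surface a missing deployment id early instead of partway through a task.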

Other Configurations

config_dev.yaml specifies the paths of relevant files and contains default settings. The match strategy for the window match and control filter supports three options: 'contains', 'fuzzy', and 'regex', giving users flexible matching. MAX_STEPS is the maximum number of steps for execute_flow and can be set by users.

Note

For the specific implementation and invocation of the matching strategy, refer to windows_app_env.
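To make the three strategy names concrete, here is a minimal sketch of how such a matcher could behave. It is not the implementation in windows_app_env: the function name is_match, the use of difflib for fuzzy matching, and the 0.8 threshold are all assumptions chosen for illustration.

```python
import re
from difflib import SequenceMatcher


def is_match(text: str, pattern: str, strategy: str, fuzzy_threshold: float = 0.8) -> bool:
    """Illustrative matcher for the three strategy names (not UFO's code)."""
    if strategy == "contains":
        # Plain substring test.
        return pattern in text
    if strategy == "fuzzy":
        # Similarity ratio in [0, 1]; the threshold is an assumed value.
        ratio = SequenceMatcher(None, text.lower(), pattern.lower()).ratio()
        return ratio >= fuzzy_threshold
    if strategy == "regex":
        # The pattern is interpreted as a regular expression.
        return re.search(pattern, text) is not None
    raise ValueError(f"Unknown match strategy: {strategy}")


title = "template1.docx - Word"
print(is_match(title, "template1", "contains"))
print(is_match(title, r"template\d+\.docx", "regex"))
print(is_match("template1.docx", "template1.doc", "fuzzy"))
```

In this sketch, 'contains' is the strictest and cheapest choice, 'regex' suits window titles with variable parts, and 'fuzzy' tolerates small differences at the cost of possible false positives.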

Note

BE CAREFUL! If you are using GitHub or other open-source tools, do not expose your config.yaml online, as it contains your private keys.

3. Prepare Files

Certain files need to be prepared before running the task.

3.1. Tasks as JSON

The tasks that need to be instantiated should be organized in a folder of JSON files, with the default folder path set to dataflow/tasks. This path can be changed in the dataflow/config/config.yaml file, or you can specify it in the terminal, as mentioned in 4. Start Running. For example, a task stored in dataflow/tasks/prefill/ may look like this:

{
    // The app you want to use
    "app": "word",
    // A unique ID to distinguish different tasks 
    "unique_id": "1",
    // The task and steps to be instantiated
    "task": "Type 'hello' and set the font type to Arial",
    "refined_steps": [
        "Type 'hello'",
        "Set the font to Arial"
    ]
}
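Before launching a batch, it can help to check that each task file has the fields shown above. The sketch below is an assumption-laden example, not part of UFO: the helper validate_task and the type rules are hypothetical. Python's json module does not accept // comments, so the example uses the task without them.

```python
import json
from typing import Any, Dict

# Field names come from the example task above; the expected types are assumptions.
REQUIRED_FIELDS = {"app": str, "unique_id": str, "task": str, "refined_steps": list}


def validate_task(task: Dict[str, Any]) -> None:
    """Raise ValueError if a required field is missing or has an unexpected type."""
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in task:
            raise ValueError(f"Task is missing required field '{field}'")
        if not isinstance(task[field], expected_type):
            raise ValueError(f"Field '{field}' should be of type {expected_type.__name__}")
    if not all(isinstance(step, str) for step in task["refined_steps"]):
        raise ValueError("Every entry in 'refined_steps' should be a string")


raw_task = """
{
    "app": "word",
    "unique_id": "1",
    "task": "Type 'hello' and set the font type to Arial",
    "refined_steps": ["Type 'hello'", "Set the font to Arial"]
}
"""
task = json.loads(raw_task)
validate_task(task)
print(f"Task {task['unique_id']} for {task['app']} has {len(task['refined_steps'])} steps")
```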

3.2. Templates and Descriptions

You should place an app file as a reference for instantiation in a folder named after the app.

For example, if you have template1.docx for Word, it should be located at dataflow/templates/word/template1.docx.

Additionally, for each app folder, there should be a description.json file located at dataflow/templates/word/description.json, which describes each template file in detail. It may look like this:

{
    "template1.docx": "A document with a rectangle shape",
    "template2.docx": "A document with a line of text"
}

If a description.json file is not present, one template file will be selected at random.
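The fallback described above can be sketched as follows. This is only an illustration: the function names are hypothetical, and the word-overlap scoring is a deliberately simple stand-in for whatever selection logic the project actually uses when descriptions are available.

```python
import json
import random
import tempfile
from pathlib import Path
from typing import Dict, Optional


def load_descriptions(template_dir: Path) -> Optional[Dict[str, str]]:
    """Return the template descriptions, or None if description.json is absent."""
    description_file = template_dir / "description.json"
    if not description_file.is_file():
        return None
    return json.loads(description_file.read_text(encoding="utf-8"))


def pick_template(template_dir: Path, task: str) -> Path:
    """Pick a template for the task; fall back to a random file without descriptions."""
    descriptions = load_descriptions(template_dir)
    if descriptions is None:
        candidates = sorted(p for p in template_dir.iterdir() if p.is_file())
        if not candidates:
            raise FileNotFoundError(f"No template files in {template_dir}")
        return random.choice(candidates)

    # Stand-in scoring (an assumption): count words shared by task and description.
    task_words = set(task.lower().split())

    def overlap(name: str) -> int:
        return len(task_words & set(descriptions[name].lower().split()))

    return template_dir / max(descriptions, key=overlap)


with tempfile.TemporaryDirectory() as tmp:
    word_dir = Path(tmp) / "word"
    word_dir.mkdir()
    for name in ("template1.docx", "template2.docx"):
        (word_dir / name).touch()
    (word_dir / "description.json").write_text(
        json.dumps(
            {
                "template1.docx": "A document with a rectangle shape",
                "template2.docx": "A document with a line of text",
            }
        ),
        encoding="utf-8",
    )
    chosen = pick_template(word_dir, "Rotate the rectangle shape")
    print(chosen.name)
```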

3.3. Final Structure

Ensure the following files are in place:

  • JSON files to be instantiated
  • Templates as references for instantiation
  • Description file in JSON format

The structure of the files can be:

dataflow/
|
├── tasks
│   └── prefill
│       ├── bulleted.json
│       ├── delete.json
│       ├── draw.json
│       ├── macro.json
│       └── rotate.json
├── templates
│   └── word
│       ├── description.json
│       ├── template1.docx
│       ├── template2.docx
│       ├── template3.docx
│       ├── template4.docx
│       ├── template5.docx
│       ├── template6.docx
│       └── template7.docx
└── ...

4. Start Running

After finishing the previous steps, you can use the following commands in the command line. Both single and batch processing are supported: provide a single file path for one task or a folder path for a batch. The program determines the type of path provided and automatically decides whether to process a single task or a batch of tasks.

You can also run the instantiation and execution stages individually, or run them together as one pipeline, which is called dataflow.

The default task hub is set to be "TASKS_HUB" in dataflow/config_dev.yaml.

  • Dataflow Task:
python -m dataflow -dataflow --task_path path_to_task_file
  • Instantiation Task:
python -m dataflow -instantiation --task_path path_to_task_file
  • Execution Task:
python -m dataflow -execution --task_path path_to_task_file
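The single-versus-batch decision can be pictured with a small helper like the one below. It is a sketch under assumptions: the name collect_task_files and the recursive search for .json files are illustrative choices, not the project's actual code.

```python
import tempfile
from pathlib import Path
from typing import List


def collect_task_files(task_path: str) -> List[Path]:
    """Return one file for a single task, or every JSON file under a folder."""
    path = Path(task_path)
    if path.is_file():
        return [path]
    if path.is_dir():
        # Recursive search is an assumption made for this example.
        return sorted(path.rglob("*.json"))
    raise FileNotFoundError(f"Task path does not exist: {task_path}")


with tempfile.TemporaryDirectory() as tmp:
    prefill = Path(tmp) / "tasks" / "prefill"
    prefill.mkdir(parents=True)
    for name in ("bulleted.json", "rotate.json"):
        (prefill / name).write_text("{}", encoding="utf-8")

    single = collect_task_files(str(prefill / "rotate.json"))
    batch = collect_task_files(str(Path(tmp) / "tasks"))
    print([p.name for p in single])
    print([p.name for p in batch])
```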

Note

  1. Users should be careful to save the original files while using this project; otherwise, the files will be closed when the app is shut down.
  2. After starting the project, users should not close the app window while the program is taking screenshots.

Workflow

Instantiation

There are three key steps in the instantiation process:

  1. Choose a template file according to the specified app and instruction.
  2. Prefill the task using the current screenshot.
  3. Filter the established task.

Given the initial task, the dataflow first chooses a template (Phase 1), then prefills the initial task based on the Word environment to obtain task-action data (Phase 2). Finally, it filters the established task to evaluate the quality of the task-action data (Phase 3).

Note

The more detailed code design documentation for instantiation can be found in instantiation.

Execution

The instantiated plans are executed by an execute task. After execution, an evaluation agent evaluates the quality of the entire execution process.

Note

The more detailed code design documentation for execution can be found in execution.

Result

The results will be saved in the results\ directory under instantiation, execution, and dataflow, and will be further stored in subdirectories based on the execution outcomes.

Note

More detailed information about the results can be found in result.

Quick Start

We provide two cases to demonstrate the dataflow, which can be found in dataflow\tasks\prefill. After installing the required packages, you can run the following command in the command line:

python -m dataflow -dataflow

You will then see hints printed in the terminal, which means the dataflow is working.

After the two tasks are finished, the task and output files would appear as follows:

UFO/
├── dataflow/
│   └── results/
│       ├── saved_document/         # Directory for saved documents
│       │   ├── bulleted.docx       # Result of the "bulleted" task
│       │   └── rotate.docx         # Result of the "rotate" task
│       ├── dataflow/                    # Dataflow results directory
│       │   ├── execution_pass/     # Successfully executed tasks
│       │   │   ├── bulleted.json   # Execution result for the "bulleted" task
│       │   │   ├── rotate.json      # Execution result for the "rotate" task
│       │   │   └── ...
└── ...

For the specific results in JSON format, along with example data, see result.

Log files

The corresponding logs can be found in the directories logs/bulleted and logs/rotate. Detailed logs for each workflow are recorded, capturing every step of the execution process.

Reference

AppEnum

Bases: Enum

Enum class for applications.

Initialize the application enum.

Parameters:
  • id (int) –

    The ID of the application.

  • description (str) –

    The description of the application.

  • file_extension (str) –

    The file extension of the application.

  • win_app (str) –

    The Windows application name.

Source code in dataflow/data_flow_controller.py
def __init__(self, id: int, description: str, file_extension: str, win_app: str):
    """
    Initialize the application enum.
    :param id: The ID of the application.
    :param description: The description of the application.
    :param file_extension: The file extension of the application.
    :param win_app: The Windows application name.
    """

    self.id = id
    self.description = description
    self.file_extension = file_extension
    self.win_app = win_app
    self.app_root_name = win_app.upper() + ".EXE"
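A short sketch of how an enum with this constructor can be declared and looked up. The member values below and the helper choose_app are assumptions for illustration only; the members actually defined in data_flow_controller.py may differ.

```python
from enum import Enum


class AppEnum(Enum):
    """Enum whose members carry id, description, file extension and app name."""

    # Member values are illustrative assumptions.
    WORD = 1, "Word", ".docx", "winword"
    EXCEL = 2, "Excel", ".xlsx", "excel"

    def __init__(self, id: int, description: str, file_extension: str, win_app: str):
        # Each member's tuple value is unpacked into these attributes.
        self.id = id
        self.description = description
        self.file_extension = file_extension
        self.win_app = win_app
        self.app_root_name = win_app.upper() + ".EXE"


def choose_app(app_name: str) -> AppEnum:
    """Hypothetical lookup by description, ignoring case."""
    for app in AppEnum:
        if app.description.lower() == app_name.lower():
            return app
    raise ValueError(f"Unsupported app: {app_name}")


app = choose_app("word")
print(app.file_extension, app.app_root_name)
```

Because the tuple is unpacked in __init__, each member exposes named attributes such as app.file_extension instead of requiring index access into app.value.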

TaskObject

Initialize the task object.

Parameters:
  • task_file_path (str) –

    The path to the task file.

  • task_type (str) –

    The task_type of the task object (dataflow, instantiation, or execution).

Source code in dataflow/data_flow_controller.py
def __init__(self, task_file_path: str, task_type: str) -> None:
    """
    Initialize the task object.
    :param task_file_path: The path to the task file.
    :param task_type: The task_type of the task object (dataflow, instantiation, or execution).
    """

    self.task_file_path = task_file_path
    self.task_file_base_name = os.path.basename(task_file_path)
    self.task_file_name = self.task_file_base_name.split(".")[0]

    task_json_file = load_json_file(task_file_path)
    self.app_object = self._choose_app_from_json(task_json_file["app"])
    # Initialize the task attributes based on the task_type
    self._init_attr(task_type, task_json_file)
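The file-name handling in the constructor can be tried in isolation. The paths below are examples only; the second half points out one consequence of using split(".")[0], namely that a file name containing an extra dot is cut at the first dot.

```python
import os

# Same derivation as in the constructor, applied to an example path.
task_file_path = os.path.join("dataflow", "tasks", "prefill", "bulleted.json")
base_name = os.path.basename(task_file_path)
file_name = base_name.split(".")[0]
print(base_name, file_name)

# A name with an extra dot is cut at the first dot by split(".")[0],
# whereas os.path.splitext only strips the final extension.
dotted = "rotate.v2.json"
print(dotted.split(".")[0])
print(os.path.splitext(dotted)[0])
```

Keeping task file names free of extra dots avoids two different tasks mapping to the same task_file_name.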

DataFlowController

Flow controller class to manage the instantiation and execution process.

Initialize the flow controller.

Parameters:
  • task_path (str) –

    The path to the task file.

  • task_type (str) –

    The task_type of the flow controller (instantiation, execution, or dataflow).

Source code in dataflow/data_flow_controller.py
def __init__(self, task_path: str, task_type: str) -> None:
    """
    Initialize the flow controller.
    :param task_path: The path to the task file.
    :param task_type: The task_type of the flow controller (instantiation, execution, or dataflow).
    """

    self.task_object = TaskObject(task_path, task_type)
    self.app_env = None
    self.app_name = self.task_object.app_object.description.lower()
    self.task_file_name = self.task_object.task_file_name

    self.schema = self._load_schema(task_type)

    self.task_type = task_type
    self.task_info = self.init_task_info()
    self.result_hub = _configs["RESULT_HUB"].format(task_type=task_type)

instantiated_plan property writable

Get the instantiated plan from the task information.

Returns:
  • List[Dict[str, Any]]

    The instantiated plan.

template_copied_path property

Get the copied template path from the task information.

Returns:
  • str

    The copied template path.

execute_execution(request, plan)

Execute the execution process.

Parameters:
  • request (str) –

    The task request to be executed.

  • plan (Dict[str, Any]) –

    The execution plan containing detailed steps.

Source code in dataflow/data_flow_controller.py
def execute_execution(self, request: str, plan: Dict[str, Any]) -> None:
    """
    Execute the execution process.
    :param request: The task request to be executed.
    :param plan: The execution plan containing detailed steps.
    """

    print_with_color("Executing the execution process...", "blue")
    execute_flow = None

    try:
        self.app_env.start(self.template_copied_path)
        # Initialize the execution context and flow
        context = Context()
        execute_flow = ExecuteFlow(self.task_file_name, context, self.app_env)

        # Execute the plan
        executed_plan, execute_result = execute_flow.execute(request, plan)

        # Update the instantiated plan
        self.instantiated_plan = executed_plan
        # Record execution results and time metrics
        self.task_info["execution_result"]["result"] = execute_result
        self.task_info["time_cost"]["execute"] = execute_flow.execution_time
        self.task_info["time_cost"]["execute_eval"] = execute_flow.eval_time

    except Exception as e:
        # Handle and log any exceptions that occur during execution
        self.task_info["execution_result"]["error"] = {
            "type": str(type(e).__name__),
            "message": str(e),
            "traceback": traceback.format_exc(),
        }
        print_with_color(f"Error in Execution: {e}", "red")
        raise e
    finally:
        # Record the total time cost of the execution process
        if execute_flow and hasattr(execute_flow, "execution_time"):
            self.task_info["time_cost"]["execute"] = execute_flow.execution_time
        else:
            self.task_info["time_cost"]["execute"] = None
        if execute_flow and hasattr(execute_flow, "eval_time"):
            self.task_info["time_cost"]["execute_eval"] = execute_flow.eval_time
        else:
            self.task_info["time_cost"]["execute_eval"] = None

execute_instantiation()

Execute the instantiation process.

Returns:
  • Optional[List[Dict[str, Any]]]

    The instantiation plan if successful.

Source code in dataflow/data_flow_controller.py
def execute_instantiation(self) -> Optional[List[Dict[str, Any]]]:
    """
    Execute the instantiation process.
    :return: The instantiation plan if successful.
    """

    print_with_color(
        f"Instantiating task {self.task_object.task_file_name}...", "blue"
    )

    template_copied_path = self.instantiation_single_flow(
        ChooseTemplateFlow,
        "choose_template",
        init_params=[self.task_object.app_object.file_extension],
        execute_params=[],
    )

    if template_copied_path:
        self.app_env.start(template_copied_path)

        prefill_result = self.instantiation_single_flow(
            PrefillFlow,
            "prefill",
            init_params=[self.app_env],
            execute_params=[
                template_copied_path,
                self.task_object.task,
                self.task_object.refined_steps,
            ],
        )
        self.app_env.close()

        if prefill_result:
            self.instantiation_single_flow(
                FilterFlow,
                "instantiation_evaluation",
                init_params=[],
                execute_params=[prefill_result["instantiated_request"]],
            )
            return prefill_result["instantiated_plan"]

init_task_info()

Initialize the task information.

Returns:
  • Dict[str, Any]

    The initialized task information.

Source code in dataflow/data_flow_controller.py
def init_task_info(self) -> Dict[str, Any]:
    """
    Initialize the task information.
    :return: The initialized task information.
    """
    init_task_info = None
    if self.task_type == "execution":
        # read from the instantiated task file
        init_task_info = load_json_file(self.task_object.task_file_path)
    else:
        init_task_info = {
            "unique_id": self.task_object.unique_id,
            "app": self.app_name,
            "original": {
                "original_task": self.task_object.task,
                "original_steps": self.task_object.refined_steps,
            },
            "execution_result": {"result": None, "error": None},
            "instantiation_result": {
                "choose_template": {"result": None, "error": None},
                "prefill": {"result": None, "error": None},
                "instantiation_evaluation": {"result": None, "error": None},
            },
            "time_cost": {},
        }
    return init_task_info

instantiation_single_flow(flow_class, flow_type, init_params=None, execute_params=None)

Execute a single flow process in the instantiation phase.

Parameters:
  • flow_class (AppAgentProcessor) –

    The flow class to instantiate.

  • flow_type (str) –

    The type of the flow.

  • init_params

    The initialization parameters for the flow.

  • execute_params

    The execution parameters for the flow.

Returns:
  • Optional[Dict[str, Any]]

    The result of the flow process.

Source code in dataflow/data_flow_controller.py
def instantiation_single_flow(
    self,
    flow_class: AppAgentProcessor,
    flow_type: str,
    init_params=None,
    execute_params=None,
) -> Optional[Dict[str, Any]]:
    """
    Execute a single flow process in the instantiation phase.
    :param flow_class: The flow class to instantiate.
    :param flow_type: The type of the flow.
    :param init_params: The initialization parameters for the flow.
    :param execute_params: The execution parameters for the flow.
    :return: The result of the flow process.
    """

    flow_instance = None
    try:
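        # Treat a missing parameter list as empty so the None defaults are safe to unpack
        flow_instance = flow_class(
            self.app_name, self.task_file_name, *(init_params or [])
        )
        result = flow_instance.execute(*(execute_params or []))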
        self.task_info["instantiation_result"][flow_type]["result"] = result
        return result
    except Exception as e:
        self.task_info["instantiation_result"][flow_type]["error"] = {
            "type": str(e.__class__),
            "error_message": str(e),
            "traceback": traceback.format_exc(),
        }
        print_with_color(f"Error in {flow_type}: {e} {traceback.format_exc()}")
    finally:
        if flow_instance and hasattr(flow_instance, "execution_time"):
            self.task_info["time_cost"][flow_type] = flow_instance.execution_time
        else:
            self.task_info["time_cost"][flow_type] = None
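The record-result, record-error, record-time pattern used by this method can be exercised without UFO. In the sketch below, EchoFlow, FailingFlow, and run_single_flow are hypothetical stand-ins written for this example; only the shape of task_info follows the structure shown in init_task_info.

```python
import time
import traceback
from typing import Any, Dict, Optional, Sequence, Type


class EchoFlow:
    """Hypothetical flow used only for this example."""

    def __init__(self, app_name: str, task_file_name: str) -> None:
        self.app_name = app_name
        self.task_file_name = task_file_name
        self.execution_time: Optional[float] = None

    def execute(self, text: str) -> Dict[str, Any]:
        start = time.time()
        result = {"echo": text}
        self.execution_time = round(time.time() - start, 3)
        return result


class FailingFlow(EchoFlow):
    """Hypothetical flow that always fails, to show the error path."""

    def execute(self, text: str) -> Dict[str, Any]:
        raise RuntimeError("prefill failed")


def run_single_flow(
    task_info: Dict[str, Any],
    flow_class: Type[EchoFlow],
    flow_type: str,
    init_params: Optional[Sequence[Any]] = None,
    execute_params: Optional[Sequence[Any]] = None,
) -> Optional[Dict[str, Any]]:
    """Run one flow; store its result, error and time under flow_type."""
    flow_instance = None
    try:
        flow_instance = flow_class("word", "bulleted", *(init_params or []))
        result = flow_instance.execute(*(execute_params or []))
        task_info["instantiation_result"][flow_type]["result"] = result
        return result
    except Exception as e:
        # The error is recorded instead of propagated, so the caller sees None.
        task_info["instantiation_result"][flow_type]["error"] = {
            "type": type(e).__name__,
            "error_message": str(e),
            "traceback": traceback.format_exc(),
        }
        return None
    finally:
        # getattr on None falls back to the default, covering a failed constructor.
        task_info["time_cost"][flow_type] = getattr(flow_instance, "execution_time", None)


task_info = {
    "instantiation_result": {
        "choose_template": {"result": None, "error": None},
        "prefill": {"result": None, "error": None},
    },
    "time_cost": {},
}
ok = run_single_flow(task_info, EchoFlow, "choose_template", execute_params=["hello"])
failed = run_single_flow(task_info, FailingFlow, "prefill", execute_params=["hello"])
print(ok)
print(failed, task_info["instantiation_result"]["prefill"]["error"]["type"])
```

Swallowing the exception here is what lets the calling code test the return value (for example, if prefill_result:) to decide whether the next phase should run.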

reformat_to_batch(path)

Transfer the result to the result hub.

Source code in dataflow/data_flow_controller.py
def reformat_to_batch(self, path) -> None:
    """
    Transfer the result to the result hub.
    """
    os.makedirs(path, exist_ok=True)
    source_files_path = os.path.join(
        self.result_hub,
        self.task_type + "_pass",
    )
    source_template_path = os.path.join(
        os.path.dirname(self.result_hub),
        "saved_document",
    )
    target_file_path = os.path.join(
        path,
        "tasks",
    )
    target_template_path = os.path.join(
        path,
        "files",
    )
    os.makedirs((target_file_path), exist_ok=True)
    os.makedirs((target_template_path), exist_ok=True)

    for file in os.listdir(source_files_path):
        if file.endswith(".json"):
            source_file = os.path.join(source_files_path, file)
            target_file = os.path.join(target_file_path, file)
            target_object = os.path.join(
                target_template_path, file.replace(".json", ".docx")
            )
            is_successed = reformat_json_file(
                target_file,
                target_object,
                load_json_file(source_file),
            )
            if is_successed:
                shutil.copy(
                    os.path.join(
                        source_template_path, file.replace(".json", ".docx")
                    ),
                    target_template_path,
                )

run()

Run the instantiation and execution process.

Source code in dataflow/data_flow_controller.py
def run(self) -> None:
    """
    Run the instantiation and execution process.
    """

    start_time = time.time()

    try:
        self.app_env = WindowsAppEnv(self.task_object.app_object)

        if self.task_type == "dataflow":
            plan = self.execute_instantiation()
            self.execute_execution(self.task_object.task, plan)
        elif self.task_type == "instantiation":
            self.execute_instantiation()
        elif self.task_type == "execution":
            plan = self.instantiated_plan
            self.execute_execution(self.task_object.task, plan)
        else:
            raise ValueError(f"Unsupported task_type: {self.task_type}")
    except Exception as e:
        raise e

    finally:
        # Update or record the total time cost of the process
        total_time = round(time.time() - start_time, 3)
        new_total_time = (
            self.task_info.get("time_cost", {}).get("total", 0) + total_time
        )
        self.task_info["time_cost"]["total"] = round(new_total_time, 3)

        self.save_result()

    if _configs["REFORMAT_TO_BATCH"]:
        self.reformat_to_batch(_configs["REFORMAT_TO_BATCH_HUB"])

save_result()

Validate and save the instantiated task result.

Source code in dataflow/data_flow_controller.py
def save_result(self) -> None:
    """
    Validate and save the instantiated task result.
    """

    validation_error = None

    # Validate the result against the schema
    try:
        validate(instance=self.task_info, schema=self.schema)
    except ValidationError as e:
        # Record the validation error but allow the process to continue
        validation_error = str(e.message)
        print_with_color(f"Validation Error: {e.message}", "yellow")

    # Determine the target directory based on task_type and quality/completeness
    target_file = None

    if self.task_type == "instantiation":
        # Determine the quality of the instantiation
        if not self.task_info["instantiation_result"]["instantiation_evaluation"][
            "result"
        ]:
            target_file = INSTANTIATION_RESULT_MAP[False]
        else:
            is_quality_good = self.task_info["instantiation_result"][
                "instantiation_evaluation"
            ]["result"]["judge"]
            target_file = INSTANTIATION_RESULT_MAP.get(
                is_quality_good, INSTANTIATION_RESULT_MAP[False]
            )

    else:
        # Determine the completion status of the execution
        if not self.task_info["execution_result"]["result"]:
            target_file = EXECUTION_RESULT_MAP["no"]
        else:
            is_completed = self.task_info["execution_result"]["result"]["complete"]
            target_file = EXECUTION_RESULT_MAP.get(
                is_completed, EXECUTION_RESULT_MAP["no"]
            )

    # Construct the full path to save the result
    new_task_path = os.path.join(
        self.result_hub, target_file, self.task_object.task_file_base_name
    )
    os.makedirs(os.path.dirname(new_task_path), exist_ok=True)
    save_json_file(new_task_path, self.task_info)

    print(f"Task saved to {new_task_path}")

    # If validation failed, indicate that the saved result may need further inspection
    if validation_error:
        print(
            "The saved task result does not conform to the expected schema and may require review."
        )
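The routing logic in save_result can be isolated into a small function for experimentation. In this sketch the contents of the two maps are assumptions (only execution_pass appears in the Quick Start output above), and result_subdirectory is a hypothetical helper name.

```python
from typing import Any, Dict

# Assumed directory names; the real maps are defined in the project.
INSTANTIATION_RESULT_MAP = {True: "instantiation_pass", False: "instantiation_fail"}
EXECUTION_RESULT_MAP = {"yes": "execution_pass", "no": "execution_fail"}


def result_subdirectory(task_type: str, task_info: Dict[str, Any]) -> str:
    """Choose the results subdirectory the way save_result does."""
    if task_type == "instantiation":
        evaluation = task_info["instantiation_result"]["instantiation_evaluation"]["result"]
        if not evaluation:
            # No evaluation result at all counts as a failure.
            return INSTANTIATION_RESULT_MAP[False]
        return INSTANTIATION_RESULT_MAP.get(evaluation["judge"], INSTANTIATION_RESULT_MAP[False])

    execution = task_info["execution_result"]["result"]
    if not execution:
        return EXECUTION_RESULT_MAP["no"]
    # Unknown completion values also fall back to the failure directory.
    return EXECUTION_RESULT_MAP.get(execution["complete"], EXECUTION_RESULT_MAP["no"])


passed = {"execution_result": {"result": {"complete": "yes"}, "error": None}}
crashed = {"execution_result": {"result": None, "error": {"type": "RuntimeError"}}}
print(result_subdirectory("dataflow", passed))
print(result_subdirectory("dataflow", crashed))
```

Defaulting to the failure directory for missing or unrecognized values keeps incomplete runs out of the set of tasks treated as successful.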