Batch Mode

Batch mode allows automated execution of tasks on specific applications or files using predefined plan files. This mode is particularly useful for repetitive tasks on Microsoft Office applications (Word, Excel, PowerPoint).

Quick Start

Step 1: Create a Plan File

Create a JSON plan file that defines the task to be automated. The plan file should contain the following fields:

Field    Type      Description
task     String    The task description.
object   String    The application or file to interact with.
close    Boolean   Whether to close the corresponding application or file after completing the task.

Example plan file:

{
    "task": "Type in a text of 'Test For Fun' with heading 1 level",
    "object": "draft.docx",
    "close": false
}
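
One way to avoid hand-editing mistakes is to generate and check the plan from Python. This is a minimal sketch, not part of UFO: write_plan, validate_plan, and PLAN_FIELDS are hypothetical names, and the type check is stricter than the plan reader, which falls back to false when close is missing.

```python
import json
from pathlib import Path

# Field name -> expected Python type after json.loads (JSON true/false -> bool).
PLAN_FIELDS = {"task": str, "object": str, "close": bool}


def write_plan(path: Path, task: str, obj: str, close: bool = False) -> str:
    """Write a plan file and return the JSON text that was written."""
    plan = {"task": task, "object": obj, "close": close}
    # json.dumps emits Python True/False as JSON true/false.
    text = json.dumps(plan, indent=4)
    path.write_text(text, encoding="utf-8")
    return text


def validate_plan(path: Path) -> dict:
    """Load a plan file and check the documented field types."""
    # json.loads raises json.JSONDecodeError on Python-style True/False.
    plan = json.loads(path.read_text(encoding="utf-8"))
    for field, expected in PLAN_FIELDS.items():
        if not isinstance(plan.get(field), expected):
            raise ValueError(f"field {field!r} must be a {expected.__name__}")
    return plan
```

Calling write_plan(Path("C:/Parent/tasks/plan.json"), "Type in a text of 'Test For Fun' with heading 1 level", "draft.docx") produces a file equivalent to the example above.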

Important: The close field must be a JSON boolean (lowercase true or false), not a Python-style boolean (True or False), which is not valid JSON.

The file structure should be organized as follows:

Parent/
├── tasks/
│   └── plan.json
└── files/
    └── draft.docx

The object field in the plan file refers to files in the files directory. The plan reader will automatically resolve the full file path by replacing tasks with files in the directory structure.
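
The plan reader's get_file_path() implementation is not shown on this page. The following hypothetical helper sketches the described resolution, assuming the plan file sits directly inside a tasks folder; it swaps only that final directory name, so a "tasks" substring elsewhere in the path is left alone.

```python
import os


def resolve_object_path(plan_file: str, object_name: str) -> str:
    """Hypothetical: map <Parent>/tasks/<plan>.json + object to <Parent>/files/<object>."""
    tasks_dir = os.path.dirname(os.path.abspath(plan_file))
    parent_dir, dir_name = os.path.split(tasks_dir)
    if dir_name != "tasks":
        raise ValueError(f"expected the plan file to be inside a 'tasks' folder: {plan_file}")
    # Replace only the last directory component, leaving any other "tasks" text untouched.
    return os.path.join(parent_dir, "files", object_name)
```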

Step 2: Start Batch Mode

Run the following command to start batch mode:

# Assume you are in the cloned UFO folder
python -m ufo --task {task_name} --mode batch_normal --plan {plan_file}

Parameters:
  • {task_name}: Name for this task execution (used for logging).
  • {plan_file}: Full path to the plan JSON file (e.g., C:/Parent/tasks/plan.json).

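To run several plans in a row, the command can be assembled from Python. This sketch is an assumption rather than a UFO feature: build_batch_command and run_all_plans are hypothetical helpers, each plan's file stem is used as its task name, and commands are only executed when dry_run is False, from the current working directory (which should be the cloned UFO folder).

```python
import subprocess
import sys
from pathlib import Path
from typing import List


def build_batch_command(task_name: str, plan_file: str, python: str = sys.executable) -> List[str]:
    """Argument list for: python -m ufo --task <task> --mode batch_normal --plan <plan>."""
    return [python, "-m", "ufo", "--task", task_name,
            "--mode", "batch_normal", "--plan", plan_file]


def run_all_plans(tasks_dir: str, dry_run: bool = True) -> List[List[str]]:
    """Hypothetical: build (and optionally run) one batch command per *.json plan."""
    commands = []
    for plan in sorted(Path(tasks_dir).glob("*.json")):
        # Use the plan's file name without extension as the task name (an assumption).
        cmd = build_batch_command(plan.stem, str(plan.resolve()))
        commands.append(cmd)
        if not dry_run:
            # Runs in the current working directory, which should be the cloned UFO folder.
            subprocess.run(cmd, check=False)
    return commands
```

Passing the arguments as a list, rather than a single shell string, keeps paths with spaces intact without extra quoting.
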
Supported Applications

Batch mode currently supports the following Microsoft Office applications:

  • Word (.docx files) - WINWORD.EXE
  • Excel (.xlsx files) - EXCEL.EXE
  • PowerPoint (.pptx files) - POWERPNT.EXE

The application will be automatically launched when the batch mode starts, and the specified file will be opened and maximized.
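
The extension-to-executable mapping above can be checked before starting a run. This minimal sketch mirrors the mapping used by FromFileSession.get_app_name(); app_for_file and SUPPORTED_APPS are hypothetical names.

```python
import os
from typing import Optional

# File extension -> executable, matching the list of supported applications.
SUPPORTED_APPS = {
    ".docx": "WINWORD.EXE",
    ".xlsx": "EXCEL.EXE",
    ".pptx": "POWERPNT.EXE",
}


def app_for_file(file_name: str) -> Optional[str]:
    """Return the executable batch mode would launch for file_name, or None."""
    # Lowercase first, as the plan reader does, so "Budget.XLSX" still matches.
    suffix = os.path.splitext(file_name.lower())[1]
    return SUPPORTED_APPS.get(suffix)
```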

Evaluation

UFO can automatically evaluate whether the task was completed successfully. To enable evaluation, ensure EVA_SESSION is set to True in the config/ufo/system.yaml file.

Check the evaluation results in logs/{task_name}/evaluation.log.

References

The batch mode uses a PlanReader to parse the plan file and creates a FromFileSession to execute the plan.

PlanReader

The PlanReader is located at ufo/module/sessions/plan_reader.py.

The reader for a plan file.

Initialize a plan reader.

Parameters:
  • plan_file (str) –

    The path of the plan file.

Source code in module/sessions/plan_reader.py
def __init__(self, plan_file: str):
    """
    Initialize a plan reader.
    :param plan_file: The path of the plan file.
    """

    self.plan_file = plan_file
    with open(plan_file, "r") as f:
        self.plan = json.load(f)
    self.remaining_steps = self.get_steps()
    self.support_apps = ["WINWORD.EXE", "EXCEL.EXE", "POWERPNT.EXE"]
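
For a minimal plan that only sets task and object, the getters below fall back to defaults. This illustrative sketch (read_plan_defaults is a hypothetical name) reproduces those defaults without importing UFO; it also guards against a missing object field by substituting an empty string before lowercasing.

```python
import json
import os


def read_plan_defaults(plan_file: str) -> dict:
    """Illustrative: the values PlanReader's getters would return for plan_file."""
    with open(plan_file, "r", encoding="utf-8") as f:
        plan = json.load(f)
    return {
        "task": plan.get("task", ""),                          # get_task()
        "object": (plan.get("object") or "").lower(),          # get_operation_object()
        "close": plan.get("close", False),                     # get_close()
        "steps": plan.get("steps", []),                        # get_steps()
        "root": os.path.dirname(os.path.abspath(plan_file)),   # get_root_path()
    }
```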

get_close()

Check whether the application or file should be closed after the task completes.

Returns:
  • bool

    True if it should be closed, False otherwise.

Source code in module/sessions/plan_reader.py
def get_close(self) -> bool:
    """
    Check whether the application or file should be closed after the task.
    :return: True if it should be closed, False otherwise.
    """

    return self.plan.get("close", False)

get_host_agent_request()

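Get the request that asks the host agent only to open and select the application for the operation object, then output FINISH without assigning any subtask.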

Returns:
  • str

    The request for the host agent.

Source code in module/sessions/plan_reader.py
def get_host_agent_request(self) -> str:
    """
    Get the request for the host agent.
    :return: The request for the host agent.
    """

    object_name = self.get_operation_object()

    # The trailing ". " keeps the two concatenated sentences from running together.
    request = (
        f"Open and select the application of {object_name}, and output the FINISH status immediately, without assigning any subtask. "
        "You must output the selected application with their control text and label even if it is already open."
    )

    return request

get_host_request()

Get the request for the host agent, combining the task with an instruction to open the application for the operation object.

Returns:
  • str

    The request for the host agent.

Source code in module/sessions/plan_reader.py
def get_host_request(self) -> str:
    """
    Get the request for the host agent.
    :return: The request for the host agent.
    """

    task = self.get_task()
    object_name = self.get_operation_object()
    # get_operation_object() lowercases the name, while support_apps holds
    # upper-case executable names, so compare case-insensitively.
    if object_name.upper() in self.support_apps:
        request = task
    else:
        request = (
            f"Your task is '{task}'. And open the application of {object_name}. "
            "You must output the selected application with their control text and label even if it is already open."
        )
    return request

get_initial_request()

Get the initial request in the plan.

Returns:
  • str

    The initial request.

Source code in module/sessions/plan_reader.py
def get_initial_request(self) -> str:
    """
    Get the initial request in the plan.
    :return: The initial request.
    """

    task = self.get_task()
    object_name = self.get_operation_object()

    request = f"{task} in {object_name}"

    return request
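
Applied to the example plan from the Quick Start, get_initial_request() joins the task and the lowercased object with " in ". The function below is a stand-alone copy of that formatting (initial_request is a hypothetical name), shown with its printed result.

```python
def initial_request(task: str, object_name: str) -> str:
    """Stand-alone copy of the get_initial_request() format."""
    return f"{task} in {object_name}"


task = "Type in a text of 'Test For Fun' with heading 1 level"
object_name = "draft.docx".lower()  # get_operation_object() returns a lowercased name
print(initial_request(task, object_name))
# Type in a text of 'Test For Fun' with heading 1 level in draft.docx
```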

get_operation_object()

Get the operation object: the plan's object field, lowercased.

Returns:
  • str

    The operation object.

Source code in module/sessions/plan_reader.py
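def get_operation_object(self) -> str:
    """
    Get the operation object in the plan.
    :return: The operation object, lowercased, or an empty string if the plan has no object field.
    """

    # Fall back to "" so a missing "object" field does not raise AttributeError on None.lower().
    return (self.plan.get("object") or "").lower()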

get_root_path()

Get the root path of the plan.

Returns:
  • str

    The root path of the plan.

Source code in module/sessions/plan_reader.py
def get_root_path(self) -> str:
    """
    Get the root path of the plan.
    :return: The root path of the plan.
    """

    return os.path.dirname(os.path.abspath(self.plan_file))

get_steps()

Get the steps in the plan.

Returns:
  • List[str]

    The steps in the plan.

Source code in module/sessions/plan_reader.py
def get_steps(self) -> List[str]:
    """
    Get the steps in the plan.
    :return: The steps in the plan.
    """

    return self.plan.get("steps", [])

get_support_apps()

Get the list of supported applications.

Returns:
  • List[str]

    The executable names of the supported applications (WINWORD.EXE, EXCEL.EXE, POWERPNT.EXE).

Source code in module/sessions/plan_reader.py
def get_support_apps(self) -> List[str]:
    """
    Get the executable names of the supported applications.
    :return: The list of supported application executables.
    """

    return self.support_apps

get_task()

Get the task description.

Returns:
  • str

    The value of the plan's task field, or an empty string if it is missing.

Source code in module/sessions/plan_reader.py
def get_task(self) -> str:
    """
    Get the task description.
    :return: The value of the plan's task field, or an empty string if it is missing.
    """

    return self.plan.get("task", "")

next_step()

Get the next step in the plan.

Returns:
  • Optional[str]

    The next step.

Source code in module/sessions/plan_reader.py
def next_step(self) -> Optional[str]:
    """
    Get the next step in the plan.
    :return: The next step.
    """

    if self.remaining_steps:
        step = self.remaining_steps.pop(0)
        return step

    return None

task_finished()

Check if the task is finished.

Returns:
  • bool

    True if the task is finished, False otherwise.

Source code in module/sessions/plan_reader.py
def task_finished(self) -> bool:
    """
    Check if the task is finished.
    :return: True if the task is finished, False otherwise.
    """

    return not self.remaining_steps
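
next_step() and task_finished() together form a simple queue over the plan's optional steps list. The sketch below (StepQueue is a hypothetical class, and the steps are made-up examples) follows the same contract, but swaps the list for a collections.deque, whose popleft() is O(1), whereas list.pop(0) shifts every remaining element.

```python
from collections import deque
from typing import Deque, List, Optional


class StepQueue:
    """Hypothetical stand-in for PlanReader's next_step()/task_finished() pair."""

    def __init__(self, steps: List[str]) -> None:
        # deque.popleft() is O(1); list.pop(0) shifts every remaining element.
        self.remaining_steps: Deque[str] = deque(steps)

    def next_step(self) -> Optional[str]:
        """Return and remove the first remaining step, or None when none are left."""
        return self.remaining_steps.popleft() if self.remaining_steps else None

    def task_finished(self) -> bool:
        """True once every step has been consumed."""
        return not self.remaining_steps


queue = StepQueue(["Open draft.docx", "Type the heading", "Save the file"])
while not queue.task_finished():
    print(queue.next_step())
```

For plans with only a handful of steps the difference is negligible, so the list-based original is a reasonable choice as well.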

FromFileSession

The FromFileSession is located at ufo/module/sessions/session.py.

Bases: WindowsBaseSession

A session for UFO from files on Windows.

Initialize a session.

Parameters:
  • task (str) –

    The name of the current task.

  • plan_file (str) –

    The path of the plan file to follow.

  • should_evaluate (bool) –

    Whether to evaluate the session.

  • id (int) –

    The id of the session.

Source code in module/sessions/session.py
def __init__(
    self, task: str, plan_file: str, should_evaluate: bool, id: int
) -> None:
    """
    Initialize a session.
    :param task: The name of current task.
    :param plan_file: The path of the plan file to follow.
    :param should_evaluate: Whether to evaluate the session.
    :param id: The id of the session.
    """

    super().__init__(task, should_evaluate, id)
    self.plan_file = plan_file
    self.plan_reader = PlanReader(plan_file)
    self.support_apps = self.plan_reader.get_support_apps()
    self.close = self.plan_reader.get_close()
    self.task_name = task.split("/")[1]
    self.object_name = ""

create_new_round()

Create a new round.

Source code in module/sessions/session.py
def create_new_round(self) -> None:
    """
    Create a new round.
    """

    # Get a request for the new round.
    request = self.next_request()

    # Create a new round and return None if the session is finished.
    if self.is_finished():
        return None

    self._host_agent.set_state(ContinueHostAgentState())

    round = BaseRound(
        request=request,
        agent=self._host_agent,
        context=self.context,
        should_evaluate=ufo_config.system.eva_round,
        id=self.total_rounds,
    )

    self.add_round(round.id, round)

    return round

get_app_com(object_name)

Get the COM object name based on the object name.

Parameters:
  • object_name (str) –

    The file extension of the object (e.g., .docx).

Returns:
  • str

    The COM object name.

Source code in module/sessions/session.py
def get_app_com(self, object_name: str) -> str:
    """
    Get the COM object name based on the object's file extension.
    :param object_name: The file extension of the object (e.g. ".docx").
    :return: The COM object name, or None if the extension is not supported.
    """
    application_mapping = {
        ".docx": "Word.Application",
        ".xlsx": "Excel.Application",
        ".pptx": "PowerPoint.Application",
    }
    # Return the ProgID without touching self.app_name, which must keep the executable
    # name (e.g. WINWORD.EXE) that terminate_application_processes() matches against.
    return application_mapping.get(object_name)

get_app_name(object_name)

Get the application name based on the object name.

Parameters:
  • object_name (str) –

    The file extension of the object (e.g., .docx).

Returns:
  • str

    The application name.

Source code in module/sessions/session.py
def get_app_name(self, object_name: str) -> str:
    """
    Get the application name based on the object name.
    :param object_name: The name of the object.
    :return: The application name.
    """
    application_mapping = {
        ".docx": "WINWORD.EXE",
        ".xlsx": "EXCEL.EXE",
        ".pptx": "POWERPNT.EXE",
        # "outlook": "olk.exe",
        # "onenote": "ONENOTE.EXE",
    }
    self.app_name = application_mapping.get(object_name)
    return self.app_name

next_request()

Get the request for the host agent.

Returns:
  • str

    The request for the host agent.

Source code in module/sessions/session.py
def next_request(self) -> str:
    """
    Get the request for the host agent.
    :return: The request for the host agent, or None once the session is finished.
    """

    if self.total_rounds == 0:
        # Build the request once so the printed and returned text are identical.
        request = self.plan_reader.get_host_request()
        console.print(request, style="cyan")
        return request

    # A plan file runs in a single round, so any later call finishes the session.
    self._finish = True
    return None

record_task_done()

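Record in the task-status JSON file that this task is done, if task-status recording is enabled in the system configuration.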

Source code in module/sessions/session.py
def record_task_done(self) -> None:
    """
    Record the task done.
    """
    is_record = ufo_config.system.task_status
    if is_record:
        file_path = ufo_config.system.get(
            "TASK_STATUS_FILE",
            os.path.join(self.plan_file, "../..", "tasks_status.json"),
        )
        task_done = json.load(open(file_path, "r"))
        task_done[self.task_name] = True
        json.dump(
            task_done,
            open(file_path, "w"),
            indent=4,
        )
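
record_task_done() expects the status file to exist already; by default it is os.path.join(self.plan_file, "../..", "tasks_status.json"), which normalizes to Parent/tasks_status.json next to the tasks and files folders. A more defensive variant might look like the sketch below: mark_task_done is a hypothetical helper that creates the file if needed and writes through a temporary file plus os.replace, so an interrupted write cannot leave truncated JSON behind.

```python
import json
import os
import tempfile


def mark_task_done(status_file: str, task_name: str) -> dict:
    """Hypothetical: set status[task_name] = True in a JSON status file."""
    status = {}
    if os.path.exists(status_file):
        with open(status_file, "r", encoding="utf-8") as f:
            status = json.load(f)
    status[task_name] = True
    # Write to a temporary file in the same directory, then atomically swap it in.
    directory = os.path.dirname(os.path.abspath(status_file))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(status, f, indent=4)
    os.replace(tmp_path, status_file)
    return status
```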

request_to_evaluate()

Get the request to evaluate.

Returns:
  • str

    The request(s) to evaluate.

Source code in module/sessions/session.py
def request_to_evaluate(self) -> str:
    """
    Get the request to evaluate.
    :return: The request(s) to evaluate.
    """
    return self.plan_reader.get_task()

run()

Run the session.

Source code in module/sessions/session.py
def run(self) -> None:
    """
    Run the session.
    """
    self.setup_application_environment()
    try:
        super().run()
        self.record_task_done()
    except Exception as e:
        import traceback

        traceback.print_exc()
        print(f"An error occurred: {e}")
    # Close the application if the plan asks for it.
    self.terminate_application_processes()

setup_application_environment()

Sets up the application environment by determining the application name and command based on the operation object, and then launching the application.

Errors raised while launching the application or interacting with it via COM are caught and printed rather than propagated.

Source code in module/sessions/session.py
def setup_application_environment(self):
    """
    Sets up the application environment by determining the application name and
    command based on the operation object, and then launching the application.

    Errors raised while launching the application or interacting with it via
    COM are caught and printed rather than propagated.
    """
    self.object_name = self.plan_reader.get_operation_object()
    if self.object_name:
        suffix = os.path.splitext(self.object_name)[1]
        self.app_name = self.get_app_name(suffix)
        print("app_name:", self.app_name)
        if self.app_name not in self.support_apps:
            print(f"The app {self.app_name} is not supported.")
            return  # The app is not supported, so there is no environment to set up.
        file = self.plan_reader.get_file_path()
        # The "maximized" WindowState constant differs between Office object models.
        maximized_state = {
            ".docx": 1,  # Word: wdWindowStateMaximize
            ".xlsx": -4137,  # Excel: xlMaximized
            ".pptx": 3,  # PowerPoint: ppWindowMaximized
        }
        try:
            # Open the file in its application via the Windows "start" command.
            # Calling os.system directly avoids building and exec-ing a code string.
            os.system(f'start {self.app_name} "{file}"')
            app_com = self.get_app_com(suffix)
            time.sleep(2)  # wait for the app to boot
            app = win32com.client.Dispatch(app_com)
            app.WindowState = maximized_state[suffix]
        except Exception as e:
            print(f"An error occurred: {e}")
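
The launch logic can be separated from its Windows-only side effects, which makes it easy to check on any platform. In this sketch, LaunchSpec and launch_spec are hypothetical names; the WindowState values are the Office object-model constants for a maximized window (wdWindowStateMaximize = 1, xlMaximized = -4137, ppWindowMaximized = 3).

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class LaunchSpec:
    command: str      # shell command to pass to os.system on Windows
    com_prog_id: str  # ProgID for win32com.client.Dispatch
    maximized: int    # value to assign to Application.WindowState


# Per-extension settings; WindowState constants differ between Office object models.
_SPECS = {
    ".docx": ("WINWORD.EXE", "Word.Application", 1),         # wdWindowStateMaximize
    ".xlsx": ("EXCEL.EXE", "Excel.Application", -4137),      # xlMaximized
    ".pptx": ("POWERPNT.EXE", "PowerPoint.Application", 3),  # ppWindowMaximized
}


def launch_spec(file_path: str) -> Optional[LaunchSpec]:
    """Describe how to open file_path, or return None if its extension is unsupported."""
    entry = _SPECS.get(os.path.splitext(file_path.lower())[1])
    if entry is None:
        return None
    exe, prog_id, maximized = entry
    # Quote the path so spaces survive; the unquoted first argument is the program to start.
    return LaunchSpec(f'start {exe} "{file_path}"', prog_id, maximized)
```

On Windows, one would then run os.system(spec.command), wait for the application to start, and assign spec.maximized to win32com.client.Dispatch(spec.com_prog_id).WindowState.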

terminate_application_processes()

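If the plan's close field is true, force-terminates the application process for the operation object; if no operation object is set, terminates all Word, Excel, and PowerPoint processes instead.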

Source code in module/sessions/session.py
def terminate_application_processes(self):
    """
    If the plan's close field is true, force-terminate the application opened for the
    operation object, or every Word, Excel, and PowerPoint process if no object is set.
    """
    if self.close:
        if self.object_name:
            for process in psutil.process_iter(["name"]):
                if process.info["name"] == self.app_name:
                    os.system(f"taskkill /f /im {self.app_name}")
                    time.sleep(1)
        else:
            app_names = ["WINWORD.EXE", "EXCEL.EXE", "POWERPNT.EXE"]
            for process in psutil.process_iter(["name"]):
                if process.info["name"] in app_names:
                    os.system(f"taskkill /f /im {process.info['name']}")
                    time.sleep(1)
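
The decision of which processes to kill can likewise be isolated from psutil and taskkill. In this sketch, processes_to_kill is a hypothetical helper that takes a list of running process names; because the comparison is exact, app_name must be the executable name (such as WINWORD.EXE), and each name is returned once since taskkill /im ends every instance of it.

```python
from typing import Iterable, List, Optional

OFFICE_APPS = ["WINWORD.EXE", "EXCEL.EXE", "POWERPNT.EXE"]


def processes_to_kill(
    running: Iterable[str], close: bool, object_name: str, app_name: Optional[str]
) -> List[str]:
    """Hypothetical: names terminate_application_processes() would pass to taskkill."""
    if not close:
        return []
    # With an operation object, only its application is targeted; otherwise all Office apps.
    targets = [app_name] if object_name else OFFICE_APPS
    to_kill: List[str] = []
    for name in running:
        # taskkill /im ends every instance of a name, so list each name once.
        if name in targets and name not in to_kill:
            to_kill.append(name)
    return to_kill
```

With psutil installed, the running names could come from [p.info["name"] for p in psutil.process_iter(["name"])], and each returned name could then be passed to taskkill /f /im.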