Batch Mode
Batch mode is a feature of UFO that allows the agent to automate tasks in batch.
Quick Start
Step 1: Create a Plan file
Before starting the Batch mode, you need to create a plan file that contains the list of steps for the agent to follow. The plan file is a JSON file that contains the following fields:
| Field | Description | Type |
| --- | --- | --- |
| task | The task description. | String |
| object | The application or file to interact with. | String |
| close | Determines whether to close the corresponding application or file after completing the task. | Boolean |
Below is an example of a plan file:
{
"task": "Type in a text of 'Test For Fun' with heading 1 level",
"object": "draft.docx",
"close": false
}
Note
The object
field is the application or file that the agent will interact with. The object must be active (can be minimized) when starting the Batch mode.
The structure of your files should be as follows, where tasks
is the directory for your tasks and files
is where your object files are stored:
Step 2: Start the Batch Mode
To start the Batch mode, run the following command:
# assume you are in the cloned UFO folder
python ufo.py --task_name {task_name} --mode batch_normal --plan {plan_file}
Tip
Replace {task_name}
with the name of the task and {plan_file}
with the Path_to_Parent/Plan_file
.
Evaluation
You may want to evaluate whether the task
was completed successfully by following the plan. UFO will call the EvaluationAgent
to evaluate the task if EVA_SESSION
is set to True
in the config_dev.yaml
file.
You can check the evaluation log in the logs/{task_name}/evaluation.log
file.
References
The batch mode employs a PlanReader
to parse the plan file and create a FromFileSession
to follow the plan.
PlanReader
The PlanReader
is located in the ufo/module/sessions/plan_reader.py
file.
The reader for a plan file.
Initialize a plan reader.
Parameters: |
-
plan_file
(str )
–
The path of the plan file.
|
Source code in module/sessions/plan_reader.py
18
19
20
21
22
23
24
25
26
27
def __init__(self, plan_file: str):
    """
    Initialize a plan reader by loading the plan file from disk.
    :param plan_file: The path of the plan file (a JSON document).
    """
    self.plan_file = plan_file
    # JSON text is UTF-8 by specification; do not depend on the locale's
    # default encoding, which would garble non-ASCII task descriptions.
    with open(plan_file, "r", encoding="utf-8") as f:
        self.plan = json.load(f)
    # Steps are consumed one at a time by next_step().
    self.remaining_steps = self.get_steps()
    self.support_apps = ["word", "excel", "powerpoint"]
|
get_close()
Check whether the application or file should be closed after completing the task.
Returns: |
-
bool
–
True if the application or file needs to be closed, False otherwise.
|
Source code in module/sessions/plan_reader.py
def get_close(self) -> bool:
    """
    Read the "close" flag from the loaded plan.
    :return: The value stored under "close", or False when the plan has no such key.
    """
    plan = self.plan
    if "close" in plan:
        return plan["close"]
    return False
|
get_host_agent_request()
Get the request for the host agent.
Returns: |
-
str
–
The request for the host agent.
|
Source code in module/sessions/plan_reader.py
75
76
77
78
79
80
81
82
83
84
85
86
87
def get_host_agent_request(self) -> str:
    """
    Build the instruction that asks the host agent to open and select the plan's object.
    :return: The request text for the host agent.
    """
    target = self.get_operation_object()
    opening = f"Open and select the application of {target}, and output the FINISH status immediately. "
    reminder = "You must output the selected application with their control text and label even if it is already open."
    return opening + reminder
|
get_host_request()
Get the request for the host agent.
Returns: |
-
str
–
The request for the host agent.
|
Source code in module/sessions/plan_reader.py
111
112
113
114
115
116
117
118
119
120
121
122
123
def get_host_request(self) -> str:
    """
    Build the request handed to the host agent for this plan.
    :return: The raw task text when the plan's object is listed in support_apps;
        otherwise an "Open the application of ..." instruction wrapping the task text.
    """
    task_text = self.get_task()
    target = self.get_operation_object()
    # NOTE(review): support_apps holds short names ("word", ...) while the object is
    # typically a file name such as "draft.docx" -- confirm this membership test is intended.
    if target not in self.support_apps:
        return f"Open the application of {task_text}. You must output the selected application with their control text and label even if it is already open."
    return task_text
|
get_initial_request()
Get the initial request in the plan.
Source code in module/sessions/plan_reader.py
62
63
64
65
66
67
68
69
70
71
72
def get_initial_request(self) -> str:
    """
    Compose the initial request from the plan's task and object.
    :return: The text "<task> in <object>".
    """
    task_text = self.get_task()
    target = self.get_operation_object()
    return "{} in {}".format(task_text, target)
|
get_operation_object()
Get the operation object in the step.
Source code in module/sessions/plan_reader.py
def get_operation_object(self) -> str:
    """
    Get the operation object (application or file) named in the plan.
    :return: The lower-cased "object" value, or an empty string when the plan
        has no "object" key or its value is empty/null.
    """
    target = self.plan.get("object")
    # A missing or null "object" used to raise AttributeError on .lower();
    # an empty string keeps the declared return type and stays falsy for callers.
    if not target:
        return ""
    return target.lower()
|
get_root_path()
Get the root path of the plan.
Returns: |
-
str
–
The root path of the plan.
|
Source code in module/sessions/plan_reader.py
146
147
148
149
150
151
def get_root_path(self) -> str:
    """
    Locate the directory that contains the plan file.
    :return: The absolute path of the plan file's parent directory.
    """
    absolute_plan = os.path.abspath(self.plan_file)
    parent_dir, _ = os.path.split(absolute_plan)
    return parent_dir
|
get_steps()
Get the steps in the plan.
Source code in module/sessions/plan_reader.py
def get_steps(self) -> List[str]:
    """
    Fetch the list of steps declared in the plan.
    :return: The value stored under "steps", or an empty list when the key is absent.
    """
    plan = self.plan
    if "steps" in plan:
        return plan["steps"]
    return []
|
get_support_apps()
Get the support apps in the plan.
Returns: |
-
List[str]
–
The support apps in the plan.
|
Source code in module/sessions/plan_reader.py
103
104
105
106
107
108
def get_support_apps(self) -> List[str]:
    """
    Expose the reader's list of supported applications.
    :return: The support_apps list held by this reader (the same list object, not a copy).
    """
    apps = self.support_apps
    return apps
|
get_task()
Get the task name.
Source code in module/sessions/plan_reader.py
def get_task(self) -> str:
    """
    Fetch the task description from the plan.
    :return: The value stored under "task", or an empty string when the key is absent.
    """
    plan = self.plan
    if "task" in plan:
        return plan["task"]
    return ""
|
next_step()
Get the next step in the plan.
Source code in module/sessions/plan_reader.py
126
127
128
129
130
131
132
133
134
135
def next_step(self) -> Optional[str]:
    """
    Consume and return the first remaining step of the plan.
    :return: The next step, or None when no steps remain.
    """
    if not self.remaining_steps:
        return None
    # Steps are taken from the front so they run in plan order.
    return self.remaining_steps.pop(0)
|
task_finished()
Check if the task is finished.
Returns: |
-
bool
–
True if the task is finished, False otherwise.
|
Source code in module/sessions/plan_reader.py
138
139
140
141
142
143
def task_finished(self) -> bool:
    """
    Tell whether every step of the plan has been consumed.
    :return: True when no steps remain, False otherwise.
    """
    return False if self.remaining_steps else True
|
FromFileSession
The FromFileSession
is located in the ufo/module/sessions/session.py
file.
Bases: BaseSession
A session for UFO from files.
Initialize a session.
Parameters: |
-
task
(str )
–
The name of current task.
-
plan_file
(str )
–
The path of the plan file to follow.
-
should_evaluate
(bool )
–
Whether to evaluate the session.
-
id
(int )
–
The id of the session.
|
Source code in module/sessions/session.py
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def __init__(
    self, task: str, plan_file: str, should_evaluate: bool, id: int
) -> None:
    """
    Initialize a session that follows a plan file.
    :param task: The name of current task. When it contains "/", the second
        "/"-separated segment is used as the task name.
    :param plan_file: The path of the plan file to follow.
    :param should_evaluate: Whether to evaluate the session.
    :param id: The id of the session.
    """
    super().__init__(task, should_evaluate, id)
    self.plan_file = plan_file
    self.plan_reader = PlanReader(plan_file)
    self.support_apps = self.plan_reader.get_support_apps()
    self.close = self.plan_reader.get_close()
    # A task without "/" used to raise IndexError; fall back to the whole
    # task string in that case.
    segments = task.split("/")
    self.task_name = segments[1] if len(segments) > 1 else task
    self.object_name = ""
|
create_new_round()
Create a new round.
Source code in module/sessions/session.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
def create_new_round(self) -> None:
    """
    Create and register the next round of the session.
    Returns None when the session is finished; otherwise the newly created round.
    """
    # Fetching the request comes first: next_request() may mark the session
    # as finished, which the check below relies on.
    request = self.next_request()

    if self.is_finished():
        return None

    self._host_agent.set_state(ContinueHostAgentState())

    evaluate_round = configs.get("EVA_ROUND", False)
    new_round = BaseRound(
        request=request,
        agent=self._host_agent,
        context=self.context,
        should_evaluate=evaluate_round,
        id=self.total_rounds,
    )
    self.add_round(new_round.id, new_round)

    return new_round
|
get_app_com(object_name)
Get the COM object name based on the object name.
Source code in module/sessions/session.py
426
427
428
429
430
431
432
433
434
435
436
437
def get_app_com(self, object_name: str) -> str:
    """
    Get the COM object name (ProgID) for a file suffix.
    :param object_name: The file suffix to look up, e.g. ".docx".
    :return: The COM object name, or None when the suffix is not mapped.
    """
    application_mapping = {
        ".docx": "Word.Application",
        ".xlsx": "Excel.Application",
        ".pptx": "PowerPoint.Application",
    }
    # Deliberately does NOT assign self.app_name: that attribute holds the
    # executable name (e.g. "WINWORD.EXE") that process termination matches
    # against, and overwriting it with a ProgID means no process ever matches.
    return application_mapping.get(object_name)
|
get_app_name(object_name)
Get the application name based on the object name.
Source code in module/sessions/session.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def get_app_name(self, object_name: str) -> str:
    """
    Resolve the executable name for a file suffix and remember it on the session.
    :param object_name: The file suffix to look up, e.g. ".docx".
    :return: The executable name, or None when the suffix is not mapped.
    """
    # Only Office document suffixes are mapped.
    executables = dict(
        [
            (".docx", "WINWORD.EXE"),
            (".xlsx", "EXCEL.EXE"),
            (".pptx", "POWERPNT.EXE"),
        ]
    )
    resolved = executables.get(object_name)
    self.app_name = resolved
    return resolved
|
next_request()
Get the request for the host agent.
Returns: |
-
str
–
The request for the host agent.
|
Source code in module/sessions/session.py
398
399
400
401
402
403
404
405
406
407
def next_request(self) -> str:
    """
    Produce the request for the upcoming round.
    :return: The host request from the plan reader when no round has run yet;
        otherwise None, after marking the session as finished.
    """
    if self.total_rounds != 0:
        # Only the first round carries a request; any later call ends the session.
        self._finish = True
        return None
    return self.plan_reader.get_host_request()
|
record_task_done()
Record the task done.
Source code in module/sessions/session.py
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
def record_task_done(self) -> None:
    """
    Record the task done.
    When the "TASK_STATUS" config is truthy (the default), load the status JSON
    file, set this task's entry to True, and write the file back.
    """
    is_record = configs.get("TASK_STATUS", True)
    if not is_record:
        return
    file_path = configs.get(
        "TASK_STATUS_FILE",
        os.path.join(self.plan_file, "../..", "tasks_status.json"),
    )
    # Context managers ensure both handles are closed (and the write is
    # flushed) even if JSON parsing or serialization fails.
    with open(file_path, "r") as status_file:
        task_done = json.load(status_file)
    task_done[self.task_name] = True
    with open(file_path, "w") as status_file:
        json.dump(task_done, status_file, indent=4)
|
request_to_evaluate()
Check if the session should be evaluated.
Returns: |
-
bool
–
True if the session should be evaluated, False otherwise.
|
Source code in module/sessions/session.py
500
501
502
503
504
505
def request_to_evaluate(self) -> bool:
    """
    Get the request content that is handed to evaluation.
    :return: The result of to_json() on the requests stored in the host agent's blackboard.
    """
    # NOTE(review): the annotation says bool, but the value returned is whatever
    # requests.to_json() produces -- confirm the intended return type.
    request_memory = self._host_agent.blackboard.requests
    return request_memory.to_json()
|
run()
Run the session.
Source code in module/sessions/session.py
440
441
442
443
444
445
446
447
448
449
450
451
452
453
def run(self) -> None:
    """
    Run the session.
    Sets up the application environment, runs the base session, records the
    task as done on success, and finally closes the application if requested.
    """
    self.setup_application_environment()
    try:
        super().run()
        self.record_task_done()
    except Exception as e:
        import traceback

        traceback.print_exc()
        print(f"An error occurred: {e}")
    finally:
        # Close the APP if the user ask so. Done in `finally` so the cleanup
        # also happens on interrupts that are not Exception subclasses.
        self.terminate_application_processes()
|
setup_application_environment()
Sets up the application environment by determining the application name and
command based on the operation object, and then launching the application.
Raises:
Exception: If an error occurs during the execution of the command or
while interacting with the application via COM.
Source code in module/sessions/session.py
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
def setup_application_environment(self):
    """
    Sets up the application environment by determining the application name and
    command based on the operation object, and then launching the application.
    Errors raised while launching the application or talking to it via COM are
    caught and printed rather than propagated.
    """
    self.object_name = self.plan_reader.get_operation_object()
    if not self.object_name:
        return
    suffix = os.path.splitext(self.object_name)[1]
    self.app_name = self.get_app_name(suffix)
    # NOTE(review): app_name is an executable name such as "WINWORD.EXE"; confirm
    # that support_apps holds values in the same form, otherwise this always returns.
    if self.app_name not in self.support_apps:
        return  # The app is not supported, so we don't need to setup the environment.
    file = self.plan_reader.get_file_path()
    executable = self.app_name
    try:
        # Call os.system directly instead of exec()-ing a generated snippet:
        # same shell command, but no backslash re-escaping and no breakage
        # when the path contains a single quote.
        os.system(f'start {executable} "{file}"')
        app_com = self.get_app_com(suffix)
        # get_app_com may overwrite self.app_name; keep the executable name,
        # which terminate_application_processes matches processes against.
        self.app_name = executable
        time.sleep(2)  # wait for the app to boot
        app = win32com.client.Dispatch(app_com)
        # 1 is wdWindowStateMaximize in Word.
        # NOTE(review): other Office apps may use different constants -- confirm.
        app.WindowState = 1
    except Exception as e:
        print(f"An error occurred: {e}")
|
terminate_application_processes()
Terminates specific application processes based on the provided conditions.
Source code in module/sessions/session.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
def terminate_application_processes(self):
    """
    Terminates specific application processes based on the provided conditions.
    Nothing happens unless self.close is truthy. With an operation object set,
    only self.app_name is targeted; otherwise Word, Excel and PowerPoint are.
    """
    if not self.close:
        return
    if self.object_name:
        targets = {self.app_name}
    else:
        targets = {"WINWORD.EXE", "EXCEL.EXE", "POWERPNT.EXE"}
    # No executable may have been resolved for the object.
    targets.discard(None)
    running = {process.info["name"] for process in psutil.process_iter(["name"])}
    # `taskkill /im` terminates every instance of an image name, so issue one
    # call (and one pause) per name rather than one per running process.
    for image_name in sorted(targets & running):
        os.system(f"taskkill /f /im {image_name}")
        time.sleep(1)
|