Session Factory & Pool

The SessionFactory and SessionPool classes provide platform-aware session creation and batch execution management, supporting 7 different session modes across Windows and Linux platforms.

Quick Reference:


Overview

The session factory and pool system provides:

  1. Platform Abstraction: Automatically creates the correct session type for Windows or Linux
  2. Mode Support: Handles 7 different execution modes with appropriate session classes
  3. Batch Management: Executes multiple sessions sequentially with status tracking
  4. Service Integration: Creates WebSocket-controlled sessions with AIP protocol

Architecture

graph TB subgraph "Client Code" REQ[User Request] MODE[Execution Mode] PLATFORM[Platform Detection] end subgraph "SessionFactory" FACTORY[SessionFactory] DETECT[Platform Detection] WINDOWS[_create_windows_session] LINUX[_create_linux_session] SERVICE[create_service_session] end subgraph "Session Types" S1[Session<br/>Windows Normal] S2[ServiceSession<br/>Windows Service] S3[FollowerSession<br/>Windows Follower] S4[FromFileSession<br/>Windows Batch] S5[OpenAIOperatorSession<br/>Windows Operator] S6[LinuxSession<br/>Linux Normal] S7[LinuxServiceSession<br/>Linux Service] end subgraph "SessionPool" POOL[SessionPool] LIST[session_list] RUN[run_all] NEXT[next_session] end REQ --> FACTORY MODE --> FACTORY PLATFORM --> DETECT FACTORY --> DETECT DETECT -->|Windows| WINDOWS DETECT -->|Linux| LINUX DETECT -->|Service| SERVICE WINDOWS --> S1 WINDOWS --> S2 WINDOWS --> S3 WINDOWS --> S4 WINDOWS --> S5 LINUX --> S6 LINUX --> S7 SERVICE -->|Windows| S2 SERVICE -->|Linux| S7 S1 --> POOL S2 --> POOL S3 --> POOL S4 --> POOL S5 --> POOL S6 --> POOL S7 --> POOL POOL --> LIST POOL --> RUN POOL --> NEXT style FACTORY fill:#e1f5ff style POOL fill:#f0ffe1 style DETECT fill:#fff4e1 style SERVICE fill:#ffe1f5

SessionFactory

SessionFactory is the central factory for creating all session types with automatic platform detection.

Class Overview

from ufo.module.session_pool import SessionFactory

factory = SessionFactory()

# Automatically detects platform and creates appropriate session
sessions = factory.create_session(
    task="email_task",
    mode="normal",
    plan="",
    request="Send an email to John"
)

Supported Modes

Mode Platform Session Type Use Case
normal Windows Session Interactive with HostAgent
normal Linux LinuxSession Interactive without HostAgent
normal_operator Windows Session Normal with operator mode
normal_operator Linux LinuxSession Normal with operator mode
service Windows ServiceSession WebSocket-controlled
service Linux LinuxServiceSession WebSocket-controlled
follower Windows FollowerSession Replay saved plans
batch_normal Windows FromFileSession Batch execution from files
operator Windows OpenAIOperatorSession Pure operator mode

Linux Mode Limitations

Currently, Linux only supports normal, normal_operator, and service modes. Follower and batch modes are planned for future releases.


create_session()

Creates one or more sessions based on platform, mode, and plan configuration.

Signature

def create_session(
    self,
    task: str,
    mode: str,
    plan: str,
    request: str = "",
    platform_override: Optional[str] = None,
    **kwargs,
) -> List[BaseSession]

Parameters

Parameter Type Default Description
task str Required Task name for logging/identification
mode str Required Execution mode (see table above)
plan str Required Plan file/folder path (for follower/batch modes)
request str "" User's natural language request
platform_override Optional[str] None Force platform: "windows" or "linux"
**kwargs Various - Additional parameters (see below)

Additional kwargs:

Key Type Used By Description
id int All modes Session ID for tracking
task_protocol TaskExecutionProtocol Service modes WebSocket protocol instance
application_name str Linux modes Target application

Return Value

List[BaseSession] - List of created sessions

  • Single session modes (normal, service, operator): Returns 1-element list
  • Batch modes (follower, batch_normal with folder): Returns list of sessions for each plan file

Platform Detection

graph TB START[create_session called] CHECK{platform_override?} AUTO[platform.system.lower] OVERRIDE[Use override value] WINDOWS{Platform == 'windows'?} LINUX{Platform == 'linux'?} ERROR[NotImplementedError] WIN_METHOD[_create_windows_session] LINUX_METHOD[_create_linux_session] RETURN[Return session list] START --> CHECK CHECK -->|None| AUTO CHECK -->|Set| OVERRIDE AUTO --> WINDOWS OVERRIDE --> WINDOWS WINDOWS -->|Yes| WIN_METHOD WINDOWS -->|No| LINUX LINUX -->|Yes| LINUX_METHOD LINUX -->|No| ERROR WIN_METHOD --> RETURN LINUX_METHOD --> RETURN style START fill:#e1f5ff style WIN_METHOD fill:#f0ffe1 style LINUX_METHOD fill:#fff4e1 style ERROR fill:#ffe1e1

Examples

Example 1: Normal Windows Session

factory = SessionFactory()

sessions = factory.create_session(
    task="browse_web",
    mode="normal",
    plan="",
    request="Open Chrome and navigate to google.com"
)

# Returns: [Session(task="browse_web", ...)]
session = sessions[0]
await session.run()

Example 2: Service Session (Auto-detected Platform)

from aip.protocol.task_execution import TaskExecutionProtocol

protocol = TaskExecutionProtocol(websocket_connection)

sessions = factory.create_session(
    task="remote_control",
    mode="service",
    plan="",
    request="Click the Start button",
    task_protocol=protocol
)

# On Windows: Returns [ServiceSession(...)]
# On Linux: Returns [LinuxServiceSession(...)]

Example 3: Batch Follower Sessions

sessions = factory.create_session(
    task="batch_email",
    mode="follower",
    plan="/path/to/plan_folder",  # Folder with multiple .json plan files
    request=""
)

# Returns: [
#   FollowerSession(task="batch_email/plan1", ...),
#   FollowerSession(task="batch_email/plan2", ...),
#   FollowerSession(task="batch_email/plan3", ...)
# ]

# Execute with SessionPool
pool = SessionPool(sessions)
await pool.run_all()

Example 4: Linux Session with Application

sessions = factory.create_session(
    task="edit_document",
    mode="normal",
    plan="",
    request="Type 'Hello World'",
    platform_override="linux",
    application_name="gedit"
)

# Returns: [LinuxSession(task="edit_document", application_name="gedit")]

Example 5: Operator Mode

sessions = factory.create_session(
    task="complex_workflow",
    mode="operator",
    plan="",
    request="Organize my desktop files by date"
)

# Returns: [OpenAIOperatorSession(task="complex_workflow", ...)]

create_service_session()

Simplified method specifically for creating service sessions on any platform.

Signature

def create_service_session(
    self,
    task: str,
    should_evaluate: bool,
    id: str,
    request: str,
    task_protocol: Optional["TaskExecutionProtocol"] = None,
    platform_override: Optional[str] = None,
) -> BaseSession

Parameters

Parameter Type Default Description
task str Required Task name
should_evaluate bool Required Enable evaluation
id str Required Session ID
request str Required User request
task_protocol TaskExecutionProtocol None AIP protocol instance
platform_override Optional[str] None Force platform

Return Value

BaseSession - Single service session instance

  • Windows: Returns ServiceSession
  • Linux: Returns LinuxServiceSession

Example

factory = SessionFactory()
protocol = TaskExecutionProtocol(websocket)

session = factory.create_service_session(
    task="remote_task",
    should_evaluate=True,
    id="session_001",
    request="Open Notepad",
    task_protocol=protocol
)

# Type varies by platform
if isinstance(session, ServiceSession):
    print("Windows service session")
elif isinstance(session, LinuxServiceSession):
    print("Linux service session")

await session.run()

_create_windows_session() (Internal)

Internal Method

Called by create_session() when platform is Windows. Not meant for direct use.

Mode Routing

graph TB START[_create_windows_session] MODE{mode value} NORMAL[normal/normal_operator] SERVICE[service] FOLLOWER[follower] BATCH[batch_normal] OPERATOR[operator] ERROR[ValueError] S1[Session] S2[ServiceSession] S3_CHECK{plan is folder?} S3_BATCH[create_follower_session_in_batch] S3_SINGLE[FollowerSession single] S4_CHECK{plan is folder?} S4_BATCH[create_sessions_in_batch] S4_SINGLE[FromFileSession single] S5[OpenAIOperatorSession] START --> MODE MODE -->|normal| NORMAL MODE -->|normal_operator| NORMAL MODE -->|service| SERVICE MODE -->|follower| FOLLOWER MODE -->|batch_normal| BATCH MODE -->|operator| OPERATOR MODE -->|other| ERROR NORMAL --> S1 SERVICE --> S2 FOLLOWER --> S3_CHECK S3_CHECK -->|Yes| S3_BATCH S3_CHECK -->|No| S3_SINGLE BATCH --> S4_CHECK S4_CHECK -->|Yes| S4_BATCH S4_CHECK -->|No| S4_SINGLE OPERATOR --> S5 style START fill:#e1f5ff style S1 fill:#f0ffe1 style S2 fill:#fff4e1 style ERROR fill:#ffe1e1

Created Session Types

Mode Condition Session Type Notes
normal - Session Standard interactive
normal_operator - Session With operator mode flag
service - ServiceSession Requires task_protocol
follower Plan is file FollowerSession Single plan replay
follower Plan is folder List[FollowerSession] Batch plan replay
batch_normal Plan is file FromFileSession Single file execution
batch_normal Plan is folder List[FromFileSession] Batch file execution
operator - OpenAIOperatorSession Pure operator mode

_create_linux_session() (Internal)

Internal Method

Called by create_session() when platform is Linux. Not meant for direct use.

Mode Routing

graph TB START[_create_linux_session] MODE{mode value} NORMAL[normal/normal_operator] SERVICE[service] ERROR[ValueError] S1[LinuxSession] S2[LinuxServiceSession] START --> MODE MODE -->|normal| NORMAL MODE -->|normal_operator| NORMAL MODE -->|service| SERVICE MODE -->|other| ERROR NORMAL --> S1 SERVICE --> S2 style START fill:#e1f5ff style S1 fill:#f0ffe1 style S2 fill:#fff4e1 style ERROR fill:#ffe1e1

Supported Modes

Mode Session Type Notes
normal LinuxSession Standard Linux interactive
normal_operator LinuxSession With operator mode flag
service LinuxServiceSession Requires task_protocol

Upcoming Features

Follower and batch_normal modes for Linux are planned for future releases.


Batch Session Creation

create_follower_session_in_batch()

Creates multiple follower sessions from a folder of plan files:

def create_follower_session_in_batch(
    self, 
    task: str, 
    plan: str
) -> List[BaseSession]

Process:

  1. Scan folder for .json files
  2. Extract file names (without extension)
  3. Create FollowerSession for each plan file
  4. Assign sequential IDs
  5. Prefix task name with file name: {task}/{filename}

Example:

# Folder structure:
# /plans/
#   ├── email_john.json
#   ├── email_jane.json
#   └── email_bob.json

sessions = factory.create_follower_session_in_batch(
    task="send_emails",
    plan="/plans/"
)

# Returns:
# [
#   FollowerSession(task="send_emails/email_john", plan="/plans/email_john.json", id=0),
#   FollowerSession(task="send_emails/email_jane", plan="/plans/email_jane.json", id=1),
#   FollowerSession(task="send_emails/email_bob", plan="/plans/email_bob.json", id=2)
# ]

create_sessions_in_batch()

Creates multiple FromFileSession instances with task status tracking:

def create_sessions_in_batch(
    self, 
    task: str, 
    plan: str
) -> List[BaseSession]

Features:

  • Tracks completed tasks in tasks_status.json
  • Skips already-completed tasks
  • Resumes from last incomplete task

Task Status File:

{
  "email_john": true,
  "email_jane": false,
  "email_bob": false
}

Example:

# First run
sessions = factory.create_sessions_in_batch(
    task="batch_emails",
    plan="/requests/"
)
# Returns 3 sessions: email_john, email_jane, email_bob

# email_john completes successfully
# tasks_status.json updated: {"email_john": true, "email_jane": false, "email_bob": false}

# Second run (after restart)
sessions = factory.create_sessions_in_batch(
    task="batch_emails",
    plan="/requests/"
)
# Returns 2 sessions: email_jane, email_bob (skips completed email_john)

Configuration:

# Enable task status tracking
ufo_config.system.task_status = True

# Custom status file location
ufo_config.system.task_status_file = "/path/to/status.json"

SessionPool

SessionPool manages multiple sessions and executes them sequentially.

Class Overview

from ufo.module.session_pool import SessionPool

# Create sessions
sessions = factory.create_session(
    task="batch_task",
    mode="follower",
    plan="/plans_folder/"
)

# Create pool
pool = SessionPool(session_list=sessions)

# Execute all
await pool.run_all()

Constructor

def __init__(self, session_list: List[BaseSession]) -> None

Parameters:

Parameter Type Description
session_list List[BaseSession] Initial list of sessions

Methods

run_all()

Execute all sessions in the pool sequentially:

async def run_all(self) -> None

Execution Flow:

sequenceDiagram participant Pool as SessionPool participant S1 as Session 1 participant S2 as Session 2 participant S3 as Session 3 Pool->>S1: await session.run() S1->>S1: Execute task S1-->>Pool: Complete Pool->>S2: await session.run() S2->>S2: Execute task S2-->>Pool: Complete Pool->>S3: await session.run() S3->>S3: Execute task S3-->>Pool: Complete Pool-->>Pool: All sessions complete

Example:

pool = SessionPool(sessions)

# Execute all sequentially
await pool.run_all()

# All sessions have completed
print("Batch execution complete")

add_session()

Add a session to the pool:

def add_session(self, session: BaseSession) -> None

Example:

pool = SessionPool([session1, session2])

# Add another session
pool.add_session(session3)

# Now pool has 3 sessions

next_session()

Get and remove the next session from the pool:

def next_session(self) -> BaseSession

Example:

pool = SessionPool([session1, session2, session3])

# Get next session (FIFO)
next_sess = pool.next_session()
# next_sess == session1
# Pool now has [session2, session3]

await next_sess.run()

session_list (Property)

Get the current session list:

@property
def session_list(self) -> List[BaseSession]

Example:

pool = SessionPool(sessions)

print(f"Pool has {len(pool.session_list)} sessions")

for session in pool.session_list:
    print(f"Task: {session.task}")

Usage Patterns

Pattern 1: Single Interactive Session

factory = SessionFactory()

sessions = factory.create_session(
    task="user_task",
    mode="normal",
    plan="",
    request="Open Word and create a document"
)

session = sessions[0]
await session.run()

Pattern 2: Service Session with WebSocket

from aip.protocol.task_execution import TaskExecutionProtocol

# WebSocket connection established
protocol = TaskExecutionProtocol(websocket)

factory = SessionFactory()

session = factory.create_service_session(
    task="remote_automation",
    should_evaluate=True,
    id="session_123",
    request="Click the Submit button",
    task_protocol=protocol
)

await session.run()

Pattern 3: Batch Execution

# Create batch sessions
factory = SessionFactory()

sessions = factory.create_session(
    task="daily_reports",
    mode="batch_normal",
    plan="/request_files/",  # Folder with .json request files
    request=""
)

# Execute with pool
pool = SessionPool(sessions)
await pool.run_all()

print(f"Completed {len(sessions)} tasks")

Pattern 4: Cross-Platform Application

import platform

factory = SessionFactory()

# Detect current platform
current_os = platform.system().lower()

sessions = factory.create_session(
    task="cross_platform_task",
    mode="normal",
    plan="",
    request="Open text editor",
    application_name="gedit" if current_os == "linux" else None
)

# Correct session type automatically created
await sessions[0].run()

Pattern 5: Dynamic Session Pool

pool = SessionPool([])

# Add sessions dynamically
for user_request in user_requests:
    sessions = factory.create_session(
        task=f"request_{len(pool.session_list)}",
        mode="normal",
        plan="",
        request=user_request
    )
    pool.add_session(sessions[0])

# Execute all
await pool.run_all()

Pattern 6: Resumable Batch Processing

# Enable task status tracking
ufo_config.system.task_status = True
ufo_config.system.task_status_file = "progress.json"

factory = SessionFactory()

# First run
sessions = factory.create_sessions_in_batch(
    task="large_batch",
    plan="/tasks/"
)

pool = SessionPool(sessions)

try:
    await pool.run_all()
except KeyboardInterrupt:
    print("Interrupted - progress saved")

# Second run (resumes from last incomplete)
sessions = factory.create_sessions_in_batch(
    task="large_batch",
    plan="/tasks/"
)
# Only uncompleted tasks loaded

pool = SessionPool(sessions)
await pool.run_all()

Configuration Integration

UFO Config Settings

Setting Type Purpose
ufo_config.system.eva_session bool Enable session evaluation
ufo_config.system.task_status bool Enable task status tracking
ufo_config.system.task_status_file str Custom status file path

Example Configuration

# config/ufo/config.yaml
system:
  eva_session: true
  task_status: true
  task_status_file: "./logs/task_status.json"

Usage:

from config.config_loader import get_ufo_config

ufo_config = get_ufo_config()

# These settings affect SessionFactory behavior
factory = SessionFactory()

# Uses ufo_config.system.eva_session for should_evaluate
sessions = factory.create_session(
    task="configured_task",
    mode="normal",
    plan="",
    request="Do something"
)

Platform Detection

Automatic Detection

import platform

current_platform = platform.system().lower()
# Returns: "windows", "linux", "darwin" (macOS)

Supported Platforms:

  • "windows" → Windows-specific sessions
  • "linux" → Linux-specific sessions
  • Others → NotImplementedError

Manual Override

Force platform selection:

# Force Windows session on Linux machine (for testing)
sessions = factory.create_session(
    task="test",
    mode="normal",
    plan="",
    request="Test request",
    platform_override="windows"
)

# Creates Session instead of LinuxSession

Override Use Cases

  • Testing: Test Windows sessions on Linux
  • Development: Test platform-specific code
  • Cross-compilation: Generate plans for other platforms
  • Not for production: Always use auto-detection in production

Error Handling

NotImplementedError

Trigger: Unsupported platform or mode

try:
    sessions = factory.create_session(
        task="task",
        mode="follower",
        plan="",
        request="",
        platform_override="darwin"  # macOS not supported
    )
except NotImplementedError as e:
    print(f"Error: {e}")
    # Error: Platform darwin is not supported yet.

ValueError

Trigger: Invalid mode for platform

try:
    sessions = factory.create_session(
        task="task",
        mode="follower",
        plan="",
        request="",
        platform_override="linux"
    )
except ValueError as e:
    print(f"Error: {e}")
    # Error: The follower mode is not supported on Linux yet.
    #        Supported modes: normal, normal_operator, service

Graceful Handling

def create_session_safely(task, mode, plan, request):
    """Create session with error handling."""
    factory = SessionFactory()

    try:
        sessions = factory.create_session(
            task=task,
            mode=mode,
            plan=plan,
            request=request
        )
        return sessions

    except NotImplementedError as e:
        logger.error(f"Platform not supported: {e}")
        return []

    except ValueError as e:
        logger.error(f"Invalid mode: {e}")
        # Fallback to normal mode
        return factory.create_session(
            task=task,
            mode="normal",
            plan="",
            request=request
        )

Best Practices

Session Creation

Efficient Session Management

  • ✅ Use create_service_session() for service sessions (cleaner API)
  • ✅ Let platform auto-detect unless testing
  • ✅ Use batch modes for multiple similar tasks
  • ✅ Enable task status tracking for long-running batches
  • ❌ Don't create sessions in tight loops (use batch modes)
  • ❌ Don't mix session types in same pool without reason

Batch Processing

Optimal Batch Execution

  1. Group similar tasks in same folder
  2. Enable task status tracking for resumability
  3. Use descriptive filenames for task identification
  4. Handle failures gracefully (don't stop entire batch)
  5. Monitor progress with logging

Platform Handling

Cross-Platform Considerations

  • Always check platform before platform-specific operations
  • Use application_name parameter for Linux sessions
  • Test on both platforms if deploying cross-platform
  • Document platform-specific features clearly

Troubleshooting

Issue: Wrong Session Type Created

Symptoms: - Expected LinuxSession but got Session - Mode not working as expected

Diagnosis:

session = sessions[0]
print(f"Session type: {type(session).__name__}")
print(f"Platform: {platform.system().lower()}")

Solutions: 1. Check platform detection: platform.system().lower() 2. Verify mode spelling and case 3. Use platform_override if needed for testing

Issue: Batch Sessions Not Found

Symptoms: - Empty session list from batch creation - create_sessions_in_batch() returns []

Diagnosis:

plan_files = factory.get_plan_files("/path/to/folder")
print(f"Found {len(plan_files)} plan files")
print(f"Files: {plan_files}")

Solutions: 1. Ensure folder exists: os.path.isdir(plan_folder) 2. Check files have .json extension 3. Verify file permissions 4. Check task status file hasn't marked all as done

Issue: Service Session Missing Protocol

Symptoms: - ValueError about missing protocol - Service session fails to initialize

Diagnosis:

protocol = kwargs.get("task_protocol")
print(f"Protocol: {protocol}")
print(f"Type: {type(protocol)}")

Solution: Always provide task_protocol for service sessions:

from aip.protocol.task_execution import TaskExecutionProtocol

protocol = TaskExecutionProtocol(websocket)

session = factory.create_service_session(
    task="service_task",
    should_evaluate=True,
    id="sess_001",
    request="Do something",
    task_protocol=protocol  # ← Required!
)

Reference

SessionFactory Methods

The factory class to create a session. Supports both Windows and Linux platforms with different session types.

create_follower_session_in_batch(task, plan)

Create a follower session.

Parameters:
  • task (str) –

    The name of current task.

  • plan (str) –

    The path folder of all plan files.

Returns:
  • List[BaseSession]

    The list of created follower sessions.

Source code in module/session_pool.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def create_follower_session_in_batch(
    self, task: str, plan: str
) -> List[BaseSession]:
    """
    Create a follower session.
    :param task: The name of current task.
    :param plan: The path folder of all plan files.
    :return: The list of created follower sessions.
    """
    plan_files = self.get_plan_files(plan)
    file_names = [self.get_file_name_without_extension(f) for f in plan_files]
    sessions = [
        FollowerSession(
            f"{task}/{file_name}",
            plan_file,
            ufo_config.system.eva_session,
            id=i,
        )
        for i, (file_name, plan_file) in enumerate(zip(file_names, plan_files))
    ]

    return sessions

create_service_session(task, should_evaluate, id, request, task_protocol=None, platform_override=None)

Convenient method to create a service session for any platform.

Parameters:
  • task (str) –

    Task name.

  • should_evaluate (bool) –

    Whether to evaluate.

  • id (str) –

    Session ID.

  • request (str) –

    User request.

  • task_protocol (Optional[TaskExecutionProtocol], default: None ) –

    AIP TaskExecutionProtocol instance.

  • platform_override (Optional[str], default: None ) –

    Override platform detection ('windows' or 'linux').

Returns:
  • BaseSession

    Platform-specific service session.

Source code in module/session_pool.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def create_service_session(
    self,
    task: str,
    should_evaluate: bool,
    id: str,
    request: str,
    task_protocol: Optional["TaskExecutionProtocol"] = None,
    platform_override: Optional[str] = None,
) -> BaseSession:
    """
    Convenient method to create a service session for any platform.
    :param task: Task name.
    :param should_evaluate: Whether to evaluate.
    :param id: Session ID.
    :param request: User request.
    :param task_protocol: AIP TaskExecutionProtocol instance.
    :param platform_override: Override platform detection ('windows' or 'linux').
    :return: Platform-specific service session.
    """
    current_platform = platform_override or platform.system().lower()

    if current_platform == "windows":
        self.logger.info("Creating Windows service session")
        return ServiceSession(
            task=task,
            should_evaluate=should_evaluate,
            id=id,
            request=request,
            task_protocol=task_protocol,
        )
    elif current_platform == "linux":
        self.logger.info("Creating Linux service session")
        return LinuxServiceSession(
            task=task,
            should_evaluate=should_evaluate,
            id=id,
            request=request,
            task_protocol=task_protocol,
        )
    else:
        raise NotImplementedError(
            f"Service session not supported on {current_platform}"
        )

create_session(task, mode, plan, request='', platform_override=None, **kwargs)

Create a session based on platform and mode.

Parameters:
  • task (str) –

    The name of current task.

  • mode (str) –

    The mode of the task.

  • plan (str) –

    The plan file or folder path (for follower/batch modes).

  • request (str, default: '' ) –

    The user request.

  • platform_override (Optional[str], default: None ) –

    Override platform detection ('windows' or 'linux').

  • kwargs

    Additional platform-specific parameters: - application_name: Target application (for Linux sessions) - websocket: WebSocket connection (for service sessions)

Returns:
  • List[BaseSession]

    The created session list.

Source code in module/session_pool.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def create_session(
    self,
    task: str,
    mode: str,
    plan: str,
    request: str = "",
    platform_override: Optional[str] = None,
    **kwargs,
) -> List[BaseSession]:
    """
    Create a session based on platform and mode.
    :param task: The name of current task.
    :param mode: The mode of the task.
    :param plan: The plan file or folder path (for follower/batch modes).
    :param request: The user request.
    :param platform_override: Override platform detection ('windows' or 'linux').
    :param kwargs: Additional platform-specific parameters:
        - application_name: Target application (for Linux sessions)
        - websocket: WebSocket connection (for service sessions)
    :return: The created session list.
    """
    current_platform = platform_override or platform.system().lower()

    if current_platform == "windows":
        return self._create_windows_session(task, mode, plan, request, **kwargs)
    elif current_platform == "linux":
        return self._create_linux_session(task, mode, plan, request, **kwargs)
    else:
        raise NotImplementedError(
            f"Platform {current_platform} is not supported yet."
        )

create_sessions_in_batch(task, plan)

Create a follower session.

Parameters:
  • task (str) –

    The name of current task.

  • plan (str) –

    The path folder of all plan files.

Returns:
  • List[BaseSession]

    The list of created follower sessions.

Source code in module/session_pool.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def create_sessions_in_batch(self, task: str, plan: str) -> List[BaseSession]:
    """
    Create a follower session.
    :param task: The name of current task.
    :param plan: The path folder of all plan files.
    :return: The list of created follower sessions.
    """
    is_record = ufo_config.system.task_status
    plan_files = self.get_plan_files(plan)
    file_names = [self.get_file_name_without_extension(f) for f in plan_files]
    is_done_files = []
    if is_record:
        file_path = ufo_config.system.task_status_file or os.path.join(
            os.path.dirname(plan), "tasks_status.json"
        )
        if not os.path.exists(file_path):
            self.task_done = {f: False for f in file_names}
            json.dump(
                self.task_done,
                open(file_path, "w"),
                indent=4,
            )
        else:
            self.task_done = json.load(open(file_path, "r"))
            is_done_files = [f for f in file_names if self.task_done.get(f, False)]

    sessions = [
        FromFileSession(
            f"{task}/{file_name}",
            plan_file,
            ufo_config.system.eva_session,
            id=i,
        )
        for i, (file_name, plan_file) in enumerate(zip(file_names, plan_files))
        if file_name not in is_done_files
    ]

    return sessions

get_file_name_without_extension(file_path)

Get the file name without extension.

Parameters:
  • file_path (str) –

    The path of the file.

Returns:
  • str

    The file name without extension.

Source code in module/session_pool.py
347
348
349
350
351
352
353
def get_file_name_without_extension(self, file_path: str) -> str:
    """
    Get the file name without extension.
    :param file_path: The path of the file.
    :return: The file name without extension.
    """
    return os.path.splitext(os.path.basename(file_path))[0]

get_plan_files(path) staticmethod

Get the plan files in the folder. The plan file should have the extension ".json".

Parameters:
  • path (str) –

    The path of the folder.

Returns:
  • List[str]

    The plan files in the folder.

Source code in module/session_pool.py
338
339
340
341
342
343
344
345
@staticmethod
def get_plan_files(path: str) -> List[str]:
    """
    Get the plan files in the folder. The plan file should have the extension ".json".
    :param path: The path of the folder.
    :return: The plan files in the folder.
    """
    return [os.path.join(path, f) for f in os.listdir(path) if f.endswith(".json")]

is_folder(path) staticmethod

Check if the path is a folder.

Parameters:
  • path (str) –

    The path to check.

Returns:
  • bool

    True if the path is a folder, False otherwise.

Source code in module/session_pool.py
329
330
331
332
333
334
335
336
@staticmethod
def is_folder(path: str) -> bool:
    """
    Check if the path is a folder.
    :param path: The path to check.
    :return: True if the path is a folder, False otherwise.
    """
    return os.path.isdir(path)

SessionPool Methods

The manager for the UFO clients.

Initialize a batch UFO client.

Source code in module/session_pool.py
33
34
35
36
37
38
def __init__(self, session_list: List[BaseSession]) -> None:
    """
    Initialize a batch UFO client.
    """

    self._session_list = session_list

session_list property

Get the session list.

Returns:
  • List[BaseSession]

    The session list.

add_session(session)

Add a session to the session list.

Parameters:
  • session (BaseSession) –

    The session to add.

Source code in module/session_pool.py
56
57
58
59
60
61
def add_session(self, session: BaseSession) -> None:
    """
    Add a session to the session list.
    :param session: The session to add.
    """
    self._session_list.append(session)

next_session()

Get the next session.

Returns:
  • BaseSession

    The next session.

Source code in module/session_pool.py
63
64
65
66
67
68
def next_session(self) -> BaseSession:
    """
    Get the next session.
    :return: The next session.
    """
    return self._session_list.pop(0)

run_all() async

Run the batch UFO client.

Source code in module/session_pool.py
40
41
42
43
44
45
46
async def run_all(self) -> None:
    """
    Run the batch UFO client.
    """

    for session in self.session_list:
        await session.run()

See Also