Skip to main content
Open In Colab

Websockets: Streaming input and output using websockets

Open on GitHub

This notebook demonstrates how to use the IOStream class to stream both input and output using websockets. The use of websockets allows you to build web clients that are more responsive than the one using web methods. The main difference is that the webosockets allows you to push data while you need to poll the server for new response using web mothods.

In this guide, we explore the capabilities of the IOStream class. It is specifically designed to enhance the development of clients such as web clients which use websockets for streaming both input and output. The IOStream stands out by enabling a more dynamic and interactive user experience for web applications.

Websockets technology is at the core of this functionality, offering a significant advancement over traditional web methods by allowing data to be “pushed” to the client in real-time. This is a departure from the conventional approach where clients must repeatedly “poll” the server to check for any new responses. By employing the underlining websockets library, the IOStream class facilitates a continuous, two-way communication channel between the server and client. This ensures that updates are received instantly, without the need for constant polling, thereby making web clients more efficient and responsive.

The real power of websockets, leveraged through the IOStream class, lies in its ability to create highly responsive web clients. This responsiveness is critical for applications requiring real-time data updates such as chat applications. By integrating the IOStream class into your web application, you not only enhance user experience through immediate data transmission but also reduce the load on your server by eliminating unnecessary polling.

In essence, the transition to using websockets through the IOStream class marks a significant enhancement in web client development. This approach not only streamlines the data exchange process between clients and servers but also opens up new possibilities for creating more interactive and engaging web applications. By following this guide, developers can harness the full potential of websockets and the IOStream class to push the boundaries of what is possible with web client responsiveness and interactivity.

Requirements

Requirements

Some extra dependencies are needed for this notebook, which can be installed via pip:

pip install pyautogen[websockets] fastapi uvicorn

For more information, please refer to the installation guide.

Set your API Endpoint

The config_list_from_json function loads a list of configurations from an environment variable or a json file.

from datetime import datetime
from tempfile import TemporaryDirectory

from websockets.sync.client import connect as ws_connect

import autogen
from autogen.io.websockets import IOWebsockets

config_list = autogen.config_list_from_json(
env_or_file="OAI_CONFIG_LIST",
filter_dict={
"model": ["gpt-4", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"],
},
)
tip

Learn more about configuring LLMs for agents here.

Defining on_connect function

An on_connect function is a crucial part of applications that utilize websockets, acting as an event handler that is called whenever a new client connection is established. This function is designed to initiate any necessary setup, communication protocols, or data exchange procedures specific to the newly connected client. Essentially, it lays the groundwork for the interactive session that follows, configuring how the server and the client will communicate and what initial actions are to be taken once a connection is made. Now, let’s delve into the details of how to define this function, especially in the context of using the AutoGen framework with websockets.

Upon a client’s connection to the websocket server, the server automatically initiates a new instance of the IOWebsockets class. This instance is crucial for managing the data flow between the server and the client. The on_connect function leverages this instance to set up the communication protocol, define interaction rules, and initiate any preliminary data exchanges or configurations required for the client-server interaction to proceed smoothly.

def on_connect(iostream: IOWebsockets) -> None:
print(f" - on_connect(): Connected to client using IOWebsockets {iostream}", flush=True)

print(" - on_connect(): Receiving message from client.", flush=True)

# 1. Receive Initial Message
initial_msg = iostream.input()

# 2. Instantiate ConversableAgent
agent = autogen.ConversableAgent(
name="chatbot",
system_message="Complete a task given to you and reply TERMINATE when the task is done. If asked about the weather, use tool 'weather_forecast(city)' to get the weather forecast for a city.",
llm_config={
"config_list": autogen.config_list_from_json(
env_or_file="OAI_CONFIG_LIST",
filter_dict={
"model": ["gpt-4", "gpt-3.5-turbo", "gpt-3.5-turbo-16k"],
},
),
"stream": True,
},
)

# 3. Define UserProxyAgent
user_proxy = autogen.UserProxyAgent(
name="user_proxy",
system_message="A proxy for the user.",
is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("TERMINATE"),
human_input_mode="NEVER",
max_consecutive_auto_reply=10,
code_execution_config=False,
)

# 4. Define Agent-specific Functions
def weather_forecast(city: str) -> str:
return f"The weather forecast for {city} at {datetime.now()} is sunny."

autogen.register_function(
weather_forecast, caller=agent, executor=user_proxy, description="Weather forecast for a city"
)

# 5. Initiate conversation
print(
f" - on_connect(): Initiating chat with agent {agent} using message '{initial_msg}'",
flush=True,
)
user_proxy.initiate_chat( # noqa: F704
agent,
message=initial_msg,
)

Here’s an explanation on how a typical on_connect function such as the one in the example above is defined:

  1. Receive Initial Message: Immediately after establishing a connection, receive an initial message from the client. This step is crucial for understanding the client’s request or initiating the conversation flow.

  2. Instantiate ConversableAgent: Create an instance of ConversableAgent with a specific system message and the LLM configuration. If you need more than one agent, make sure they don’t share the same llm_config as adding a function to one of them will also attempt to add it to another.

  3. Instantiate UserProxyAgent: Similarly, create a UserProxyAgent instance, defining its termination condition, human input mode, and other relevant parameters. There is no need to define llm_config as the UserProxyAgent does not use LLM.

  4. Define Agent-specific Functions: If your conversable agent requires executing specific tasks, such as fetching a weather forecast in the example above, define these functions within the on_connect scope. Decorate these functions accordingly to link them with your agents.

  5. Initiate Conversation: Finally, use the initiate_chat method of your UserProxyAgent to start the interaction with the conversable agent, passing the initial message and a cache mechanism for efficiency.

Testing websockets server with Python client

Testing an on_connect function with a Python client involves simulating a client-server interaction to ensure the setup, data exchange, and communication protocols function as intended. Here’s a brief explanation on how to conduct this test using a Python client:

  1. Start the Websocket Server: Use the IOWebsockets.run_server_in_thread method to start the server in a separate thread, specifying the on_connect function and the port. This method returns the URI of the running websocket server.

  2. Connect to the Server: Open a connection to the server using the returned URI. This simulates a client initiating a connection to your websocket server.

  3. Send a Message to the Server: Once connected, send a message from the client to the server. This tests the server’s ability to receive messages through the established websocket connection.

  4. Receive and Process Messages: Implement a loop to continuously receive messages from the server. Decode the messages if necessary, and process them accordingly. This step verifies the server’s ability to respond back to the client’s request.

This test scenario effectively evaluates the interaction between a client and a server using the on_connect function, by simulating a realistic message exchange. It ensures that the server can handle incoming connections, process messages, and communicate responses back to the client, all critical functionalities for a robust websocket-based application.

with IOWebsockets.run_server_in_thread(on_connect=on_connect, port=8765) as uri:
print(f" - test_setup() with websocket server running on {uri}.", flush=True)

with ws_connect(uri) as websocket:
print(f" - Connected to server on {uri}", flush=True)

print(" - Sending message to server.", flush=True)
# websocket.send("2+2=?")
websocket.send("Check out the weather in Paris and write a poem about it.")

while True:
message = websocket.recv()
message = message.decode("utf-8") if isinstance(message, bytes) else message

print(message, end="", flush=True)

if "TERMINATE" in message:
print()
print(" - Received TERMINATE message. Exiting.", flush=True)
break
 - test_setup() with websocket server running on ws://127.0.0.1:8765.
- on_connect(): Connected to client using IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fd65b3c10>
- on_connect(): Receiving message from client.
- Connected to server on ws://127.0.0.1:8765
- Sending message to server.
- on_connect(): Initiating chat with agent <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b909c086290> using message 'Check out the weather in Paris and write a poem about it.'
user_proxy (to chatbot):

Check out the weather in Paris and write a poem about it.

--------------------------------------------------------------------------------

>>>>>>>> USING AUTO REPLY...
chatbot (to user_proxy):


***** Suggested tool call (call_xFFWe52vwdpgZ8xTRV6adBdy): weather_forecast *****
Arguments:
{
"city": "Paris"
}
*********************************************************************************

--------------------------------------------------------------------------------

>>>>>>>> EXECUTING FUNCTION weather_forecast...
user_proxy (to chatbot):

user_proxy (to chatbot):

***** Response from calling tool (call_xFFWe52vwdpgZ8xTRV6adBdy) *****
The weather forecast for Paris at 2024-04-05 12:00:06.206125 is sunny.
**********************************************************************

--------------------------------------------------------------------------------

>>>>>>>> USING AUTO REPLY...
In the heart of France, beneath the sun's warm glow,
Lies the city of Paris, where the Seine waters flow.
Bathed in sunlight, every street and spire,
Illuminated each detail, just like a docile fire.

Once monochromatic cityscape, kissed by the sun's bright light,
Now a kaleidoscope of colors, from morning till the night.
This sun-swept city sparkles, under the azure dome,
Her inhabitants find comfort, for they call this city home.

One can wander in her sunshine, on this perfect weather day,
And feel the warmth it brings, to chase your blues away.
For the weather in Paris, is more than just a forecast,
It is a stage setting for dwellers and tourists amassed.

TERMINATE

chatbot (to user_proxy):

In the heart of France, beneath the sun's warm glow,
Lies the city of Paris, where the Seine waters flow.
Bathed in sunlight, every street and spire,
Illuminated each detail, just like a docile fire.

Once monochromatic cityscape, kissed by the sun's bright light,
Now a kaleidoscope of colors, from morning till the night.
This sun-swept city sparkles, under the azure dome,
Her inhabitants find comfort, for they call this city home.

One can wander in her sunshine, on this perfect weather day,
And feel the warmth it brings, to chase your blues away.
For the weather in Paris, is more than just a forecast,
It is a stage setting for dwellers and tourists amassed.

TERMINATE

- Received TERMINATE message. Exiting.

Testing websockets server running inside FastAPI server with HTML/JS client

The code snippets below outlines an approach for testing an on_connect function in a web environment using FastAPI to serve a simple interactive HTML page. This method allows users to send messages through a web interface, which are then processed by the server running the AutoGen framework via websockets. Here’s a step-by-step explanation:

  1. FastAPI Application Setup: The code initiates by importing necessary libraries and setting up a FastAPI application. FastAPI is a modern, fast web framework for building APIs with Python 3.7+ based on standard Python type hints.

  2. HTML Template for User Interaction: An HTML template is defined as a multi-line Python string, which includes a basic form for message input and a script for managing websocket communication. This template creates a user interface where messages can be sent to the server and responses are displayed dynamically.

  3. Running the Websocket Server: The run_websocket_server async context manager starts the websocket server using IOWebsockets.run_server_in_thread with the specified on_connect function and port. This server listens for incoming websocket connections.

  4. FastAPI Route for Serving HTML Page: A FastAPI route (@app.get("/")) is defined to serve the HTML page to users. When a user accesses the root URL, the HTML content for the websocket chat is returned, allowing them to interact with the websocket server.

  5. Starting the FastAPI Application: Lastly, the FastAPI application is started using Uvicorn, an ASGI server, configured with the app and additional parameters as needed. The server is then launched to serve the FastAPI application, making the interactive HTML page accessible to users.

This method of testing allows for interactive communication between the user and the server, providing a practical way to demonstrate and evaluate the behavior of the on_connect function in real-time. Users can send messages through the webpage, and the server processes these messages as per the logic defined in the on_connect function, showcasing the capabilities and responsiveness of the AutoGen framework’s websocket handling in a user-friendly manner.

from contextlib import asynccontextmanager  # noqa: E402
from pathlib import Path # noqa: E402

from fastapi import FastAPI # noqa: E402
from fastapi.responses import HTMLResponse # noqa: E402

PORT = 8000

html = """
<!DOCTYPE html>
<html>
<head>
<title>Autogen websocket test</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8080/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""


@asynccontextmanager
async def run_websocket_server(app):
with IOWebsockets.run_server_in_thread(on_connect=on_connect, port=8080) as uri:
print(f"Websocket server started at {uri}.", flush=True)

yield


app = FastAPI(lifespan=run_websocket_server)


@app.get("/")
async def get():
return HTMLResponse(html)
import uvicorn  # noqa: E402

config = uvicorn.Config(app)
server = uvicorn.Server(config)
await server.serve() # noqa: F704
INFO:     Started server process [5227]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [5227]
Websocket server started at ws://127.0.0.1:8080.
INFO: 127.0.0.1:42548 - "GET / HTTP/1.1" 200 OK
INFO: 127.0.0.1:42548 - "GET /favicon.ico HTTP/1.1" 404 Not Found
- on_connect(): Connected to client using IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fc6991420>
- on_connect(): Receiving message from client.
- on_connect(): Initiating chat with agent <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b909c0cab00> using message 'write a poem about lundon'

The testing setup described above, leveraging FastAPI and websockets, not only serves as a robust testing framework for the on_connect function but also lays the groundwork for developing real-world applications. This approach exemplifies how web-based interactions can be made dynamic and real-time, a critical aspect of modern application development.

For instance, this setup can be directly applied or adapted to build interactive chat applications, real-time data dashboards, or live support systems. The integration of websockets enables the server to push updates to clients instantly, a key feature for applications that rely on the timely delivery of information. For example, a chat application built on this framework can support instantaneous messaging between users, enhancing user engagement and satisfaction.

Moreover, the simplicity and interactivity of the HTML page used for testing reflect how user interfaces can be designed to provide seamless experiences. Developers can expand upon this foundation to incorporate more sophisticated elements such as user authentication, message encryption, and custom user interactions, further tailoring the application to meet specific use case requirements.

The flexibility of the FastAPI framework, combined with the real-time communication enabled by websockets, provides a powerful toolset for developers looking to build scalable, efficient, and highly interactive web applications. Whether it’s for creating collaborative platforms, streaming services, or interactive gaming experiences, this testing setup offers a glimpse into the potential applications that can be developed with these technologies.

Testing websockets server with HTML/JS client

The provided code snippet below is an example of how to create an interactive testing environment for an on_connect function using Python’s built-in http.server module. This setup allows for real-time interaction within a web browser, enabling developers to test the websocket functionality in a more user-friendly and practical manner. Here’s a breakdown of how this code operates and its potential applications:

  1. Serving a Simple HTML Page: The code starts by defining an HTML page that includes a form for sending messages and a list to display incoming messages. JavaScript is used to handle the form submission and websocket communication.

  2. Temporary Directory for HTML File: A temporary directory is created to store the HTML file. This approach ensures that the testing environment is clean and isolated, minimizing conflicts with existing files or configurations.

  3. Custom HTTP Request Handler: A custom subclass of SimpleHTTPRequestHandler is defined to serve the HTML file. This handler overrides the do_GET method to redirect the root path (/) to the chat.html page, ensuring that visitors to the server’s root URL are immediately presented with the chat interface.

  4. Starting the Websocket Server: Concurrently, a websocket server is started on a different port using the IOWebsockets.run_server_in_thread method, with the previously defined on_connect function as the callback for new connections.

  5. HTTP Server for the HTML Interface: An HTTP server is instantiated to serve the HTML chat interface, enabling users to interact with the websocket server through a web browser.

This setup showcases a practical application of integrating websockets with a simple HTTP server to create a dynamic and interactive web application. By using Python’s standard library modules, it demonstrates a low-barrier entry to developing real-time applications such as chat systems, live notifications, or interactive dashboards.

The key takeaway from this code example is how easily Python’s built-in libraries can be leveraged to prototype and test complex web functionalities. For developers looking to build real-world applications, this approach offers a straightforward method to validate and refine websocket communication logic before integrating it into larger frameworks or systems. The simplicity and accessibility of this testing setup make it an excellent starting point for developing a wide range of interactive web applications.

from http.server import HTTPServer, SimpleHTTPRequestHandler  # noqa: E402

PORT = 8000

html = """
<!DOCTYPE html>
<html>
<head>
<title>Autogen websocket test</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8080/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
};
function sendMessage(event) {
var input = document.getElementById("messageText")
ws.send(input.value)
input.value = ''
event.preventDefault()
}
</script>
</body>
</html>
"""

with TemporaryDirectory() as temp_dir:
# create a simple HTTP webpage
path = Path(temp_dir) / "chat.html"
with open(path, "w") as f:
f.write(html)

#
class MyRequestHandler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=temp_dir, **kwargs)

def do_GET(self):
if self.path == "/":
self.path = "/chat.html"
return SimpleHTTPRequestHandler.do_GET(self)

handler = MyRequestHandler

with IOWebsockets.run_server_in_thread(on_connect=on_connect, port=8080) as uri:
print(f"Websocket server started at {uri}.", flush=True)

with HTTPServer(("", PORT), handler) as httpd:
print("HTTP server started at http://localhost:" + str(PORT))
try:
httpd.serve_forever()
except KeyboardInterrupt:
print(" - HTTP server stopped.", flush=True)
Websocket server started at ws://127.0.0.1:8080.
HTTP server started at http://localhost:8000
- on_connect(): Connected to client using IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fc69937f0>
- on_connect(): Receiving message from client.
- on_connect(): Initiating chat with agent <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b8fc6990310> using message 'write a poem about new york'
- on_connect(): Connected to client using IOWebsockets <autogen.io.websockets.IOWebsockets object at 0x7b8fc68bdc90>
- on_connect(): Receiving message from client.
- on_connect(): Initiating chat with agent <autogen.agentchat.conversable_agent.ConversableAgent object at 0x7b8fc68be170> using message 'check the weather in london and write a poem about it'
- HTTP server stopped.
127.0.0.1 - - [05/Apr/2024 12:01:51] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [05/Apr/2024 12:01:51] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [05/Apr/2024 12:02:27] "GET / HTTP/1.1" 304 -