# Source code for autogen_ext.agents.web_surfer.playwright_controller
import asyncio
import base64
import html
import io
import os
import random
import warnings
from typing import Any, Callable, Dict, Optional, Tuple, Union, cast

# TODO: Fix unfollowed import
try:
    # Suppress warnings from markitdown -- which is pretty chatty
    warnings.filterwarnings(action="ignore", module="markitdown")
    from markitdown import MarkItDown  # type: ignore
except ImportError:
    MarkItDown = None
from playwright._impl._errors import Error as PlaywrightError
from playwright._impl._errors import TimeoutError
from playwright.async_api import Download, Page

from ._types import (
    InteractiveRegion,
    VisualViewport,
    interactiveregion_from_dict,
    visualviewport_from_dict,
)


class PlaywrightController:
    """
    A helper class to allow Playwright to interact with web pages to perform actions such as
    clicking, filling, and scrolling.

    Args:
        downloads_folder (str | None): The folder to save downloads to. If None, downloads are not saved.
        animate_actions (bool): Whether to animate the actions (create fake cursor to click).
        viewport_width (int): The width of the viewport.
        viewport_height (int): The height of the viewport.
        _download_handler (Optional[Callable[[Download], None]]): A function to handle downloads.
        to_resize_viewport (bool): Whether to resize the viewport
    """

    def __init__(
        self,
        downloads_folder: str | None = None,
        animate_actions: bool = False,
        viewport_width: int = 1440,
        viewport_height: int = 900,
        _download_handler: Optional[Callable[[Download], None]] = None,
        to_resize_viewport: bool = True,
    ) -> None:
        """
        Initialize the PlaywrightController.

        Reads ``page_script.js`` (located next to this module) into memory so it can be
        injected into pages later.
        """
        assert isinstance(animate_actions, bool)
        assert isinstance(viewport_width, int)
        assert isinstance(viewport_height, int)
        assert viewport_height > 0
        assert viewport_width > 0

        self.animate_actions = animate_actions
        self.downloads_folder = downloads_folder
        self.viewport_width = viewport_width
        self.viewport_height = viewport_height
        self._download_handler = _download_handler
        self.to_resize_viewport = to_resize_viewport
        self._page_script: str = ""
        self.last_cursor_position: Tuple[float, float] = (0.0, 0.0)
        # Created lazily by get_page_markdown(), and only if markitdown is installed.
        self._markdown_converter: Any | None = None

        # Read page_script. The encoding is explicit so the result does not depend on the
        # platform's default locale.
        with open(self._page_script_path(), "rt", encoding="utf-8") as fh:
            self._page_script = fh.read()

    @staticmethod
    def _page_script_path() -> str:
        """Return the absolute path of the ``page_script.js`` file shipped next to this module."""
        return os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js")

    async def _inject_page_script(self, page: Page) -> None:
        """Evaluate the page script in ``page``, ignoring any failure (best effort)."""
        try:
            await page.evaluate(self._page_script)
        except Exception:
            # Deliberately best effort: callers immediately invoke a MultimodalWebSurfer.*
            # function afterwards, and that call raises if the helpers are really unavailable.
            pass

    async def _locate(
        self, page: Page, identifier: str, settle_seconds: float = 0.0
    ) -> Tuple[Any, Optional[Dict[str, Union[int, float]]]]:
        """
        Wait for the element tagged with ``identifier``, scroll it into view and measure it.

        Returns:
            The Playwright locator and its bounding box (None when Playwright reports no box).

        Raises:
            ValueError: If the element does not show up within 5 seconds.
        """
        target = page.locator(f"[__elementId='{identifier}']")

        # See if it exists
        try:
            await target.wait_for(timeout=5000)
        except TimeoutError:
            raise ValueError("No such element.") from None

        await target.scroll_into_view_if_needed()
        if settle_seconds:
            # Give the page a moment to finish scrolling before measuring.
            await asyncio.sleep(settle_seconds)
        box = cast(Optional[Dict[str, Union[int, float]]], await target.bounding_box())
        return target, box

    @staticmethod
    def _box_center(box: Optional[Dict[str, Union[int, float]]]) -> Tuple[float, float]:
        """Return the centre point of ``box``; raise ValueError when there is no box."""
        if box is None:
            raise ValueError("Element has no bounding box; it may not be visible.")
        return box["x"] + box["width"] / 2, box["y"] + box["height"] / 2

    async def _animate_cursor_to(self, page: Page, identifier: str, end_x: float, end_y: float) -> None:
        """Highlight the element and glide the fake cursor from its last position to the target."""
        await self.add_cursor_box(page, identifier)
        # Move cursor to the box slowly
        start_x, start_y = self.last_cursor_position
        await self.gradual_cursor_animation(page, start_x, start_y, end_x, end_y)
        await asyncio.sleep(0.1)

    async def sleep(self, page: Page, duration: Union[int, float]) -> None:
        """
        Pause the execution for a specified duration.

        Args:
            page (Page): The Playwright page object.
            duration (Union[int, float]): The duration to sleep in seconds (it is converted to
                milliseconds before being handed to ``page.wait_for_timeout``).
        """
        assert page is not None
        await page.wait_for_timeout(duration * 1000)

    async def get_interactive_rects(self, page: Page) -> Dict[str, InteractiveRegion]:
        """
        Retrieve interactive regions from the web page.

        Args:
            page (Page): The Playwright page object.

        Returns:
            Dict[str, InteractiveRegion]: A dictionary of interactive regions.
        """
        assert page is not None

        # Read the regions from the DOM
        await self._inject_page_script(page)
        result = cast(
            Dict[str, Dict[str, Any]], await page.evaluate("MultimodalWebSurfer.getInteractiveRects();")
        )

        # Convert the results into appropriate types
        assert isinstance(result, dict)
        typed_results: Dict[str, InteractiveRegion] = {}
        for k in result:
            assert isinstance(k, str)
            typed_results[k] = interactiveregion_from_dict(result[k])

        return typed_results

    async def get_visual_viewport(self, page: Page) -> VisualViewport:
        """
        Retrieve the visual viewport of the web page.

        Args:
            page (Page): The Playwright page object.

        Returns:
            VisualViewport: The visual viewport of the page.
        """
        assert page is not None
        await self._inject_page_script(page)
        return visualviewport_from_dict(await page.evaluate("MultimodalWebSurfer.getVisualViewport();"))

    async def get_focused_rect_id(self, page: Page) -> str | None:
        """
        Retrieve the ID of the currently focused element.

        Args:
            page (Page): The Playwright page object.

        Returns:
            str | None: The ID of the focused element or None if no control has focus.
        """
        assert page is not None
        await self._inject_page_script(page)
        result = await page.evaluate("MultimodalWebSurfer.getFocusedElementId();")
        return None if result is None else str(result)

    async def get_page_metadata(self, page: Page) -> Dict[str, Any]:
        """
        Retrieve metadata from the web page.

        Args:
            page (Page): The Playwright page object.

        Returns:
            Dict[str, Any]: A dictionary of page metadata.
        """
        assert page is not None
        await self._inject_page_script(page)
        result = await page.evaluate("MultimodalWebSurfer.getPageMetadata();")
        assert isinstance(result, dict)
        return cast(Dict[str, Any], result)

    async def on_new_page(self, page: Page) -> None:
        """
        Handle actions to perform on a new page.

        Registers the download handler (when one was supplied), optionally resizes the viewport,
        installs the page script as an init script and waits for the page to load.

        Args:
            page (Page): The Playwright page object.
        """
        assert page is not None
        # Only subscribe when there is an actual callback; None is not a valid listener.
        if self._download_handler is not None:
            page.on("download", self._download_handler)  # type: ignore
        if self.to_resize_viewport and self.viewport_width and self.viewport_height:
            await page.set_viewport_size({"width": self.viewport_width, "height": self.viewport_height})
        await self.sleep(page, 0.2)
        await page.add_init_script(path=self._page_script_path())
        await page.wait_for_load_state()

    async def back(self, page: Page) -> None:
        """
        Navigate back to the previous page.

        Args:
            page (Page): The Playwright page object.
        """
        assert page is not None
        await page.go_back()

    async def visit_page(self, page: Page, url: str) -> Tuple[bool, bool]:
        """
        Visit a specified URL.

        If navigation is aborted (``net::ERR_ABORTED``) and a downloads folder is configured, the
        URL is treated as a file download: the file is saved and a small confirmation page is
        shown instead.

        Args:
            page (Page): The Playwright page object.
            url (str): The URL to visit.

        Returns:
            Tuple[bool, bool]: A tuple indicating whether to reset prior metadata hash and last download.

        Raises:
            Exception: Any navigation error that is not handled as a download is re-raised.
        """
        assert page is not None
        reset_prior_metadata_hash = False
        reset_last_download = False
        try:
            # Regular webpage
            await page.goto(url)
            await page.wait_for_load_state()
            reset_prior_metadata_hash = True
        except Exception as e_outer:
            # Anything other than an aborted navigation with downloads enabled is a real error.
            if not (self.downloads_folder and "net::ERR_ABORTED" in str(e_outer)):
                raise

            # Downloaded file
            async with page.expect_download() as download_info:
                try:
                    await page.goto(url)
                except Exception as e_inner:
                    # The abort is expected here (the navigation became a download).
                    if "net::ERR_ABORTED" not in str(e_inner):
                        raise
                download = await download_info.value
                fname = os.path.join(self.downloads_folder, download.suggested_filename)
                await download.save_as(fname)
                # The file name is supplied by the remote server: escape it (and the path)
                # before embedding them in HTML.
                safe_name = html.escape(download.suggested_filename)
                safe_path = html.escape(fname)
                message = f"<body style=\"margin: 20px;\"><h1>Successfully downloaded '{safe_name}' to local path:<br><br>{safe_path}</h1></body>"
                await page.goto(
                    "data:text/html;base64," + base64.b64encode(message.encode("utf-8")).decode("utf-8")
                )
                reset_last_download = True
        return reset_prior_metadata_hash, reset_last_download

    async def page_down(self, page: Page) -> None:
        """
        Scroll the page down by one viewport height minus 50 pixels.

        Args:
            page (Page): The Playwright page object.
        """
        assert page is not None
        await page.evaluate(f"window.scrollBy(0, {self.viewport_height-50});")

    async def page_up(self, page: Page) -> None:
        """
        Scroll the page up by one viewport height minus 50 pixels.

        Args:
            page (Page): The Playwright page object.
        """
        assert page is not None
        await page.evaluate(f"window.scrollBy(0, -{self.viewport_height-50});")

    async def gradual_cursor_animation(
        self, page: Page, start_x: float, start_y: float, end_x: float, end_y: float
    ) -> None:
        """
        Animate the cursor movement gradually from start to end coordinates.

        The ``red-cursor`` element is moved along a straight line, one frame every 50 ms, and
        ``last_cursor_position`` is updated to the end point afterwards.

        Args:
            page (Page): The Playwright page object.
            start_x (float): The starting x-coordinate.
            start_y (float): The starting y-coordinate.
            end_x (float): The ending x-coordinate.
            end_y (float): The ending y-coordinate.
        """
        # animation helper
        steps = 20
        # steps + 1 frames so that both the start point and the end point are drawn.
        for step in range(steps + 1):
            fraction = step / steps
            x = start_x + (end_x - start_x) * fraction
            y = start_y + (end_y - start_y) * fraction
            await page.evaluate(
                f"""
                (function() {{
                    let cursor = document.getElementById('red-cursor');
                    if (cursor) {{
                        cursor.style.left = '{x}px';
                        cursor.style.top = '{y}px';
                    }}
                }})();
                """
            )
            await asyncio.sleep(0.05)

        self.last_cursor_position = (end_x, end_y)

    async def add_cursor_box(self, page: Page, identifier: str) -> None:
        """
        Add a red cursor box around the element with the given identifier.

        Also appends a small red ``red-cursor`` dot to the document body.

        Args:
            page (Page): The Playwright page object.
            identifier (str): The element identifier.
        """
        # animation helper
        # The identifier is passed as an argument rather than spliced into the JavaScript source.
        await page.evaluate(
            """(identifier) => {
                let elm = document.querySelector(`[__elementId='${identifier}']`);
                if (elm) {
                    elm.style.transition = 'border 0.3s ease-in-out';
                    elm.style.border = '2px solid red';
                }
            }""",
            identifier,
        )
        await asyncio.sleep(0.3)

        # Create a red cursor
        await page.evaluate(
            """
            (function() {
                let cursor = document.createElement('div');
                cursor.id = 'red-cursor';
                cursor.style.width = '10px';
                cursor.style.height = '10px';
                cursor.style.backgroundColor = 'red';
                cursor.style.position = 'absolute';
                cursor.style.borderRadius = '50%';
                cursor.style.zIndex = '10000';
                document.body.appendChild(cursor);
            })();
            """
        )

    async def remove_cursor_box(self, page: Page, identifier: str) -> None:
        """
        Remove the red cursor box around the element with the given identifier.

        Args:
            page (Page): The Playwright page object.
            identifier (str): The element identifier.
        """
        # Remove the highlight and cursor
        await page.evaluate(
            """(identifier) => {
                let elm = document.querySelector(`[__elementId='${identifier}']`);
                if (elm) {
                    elm.style.border = '';
                }
                let cursor = document.getElementById('red-cursor');
                if (cursor) {
                    cursor.remove();
                }
            }""",
            identifier,
        )

    async def click_id(self, page: Page, identifier: str) -> Page | None:
        """
        Click the element with the given identifier.

        Args:
            page (Page): The Playwright page object.
            identifier (str): The element identifier.

        Returns:
            Page | None: The new page if a new page is opened, otherwise None.

        Raises:
            ValueError: If the element cannot be found or has no bounding box.
        """
        new_page: Page | None = None
        assert page is not None

        _target, box = await self._locate(page, identifier, settle_seconds=0.3)
        click_x, click_y = self._box_center(box)

        if self.animate_actions:
            await self._animate_cursor_to(page, identifier, click_x, click_y)

        # Click it
        try:
            # Give it a chance to open a new page
            async with page.expect_event("popup", timeout=1000) as page_info:  # type: ignore
                await page.mouse.click(click_x, click_y, delay=10)
                new_page = await page_info.value  # type: ignore
                assert isinstance(new_page, Page)
                await self.on_new_page(new_page)
        except TimeoutError:
            # No popup appeared within the grace period: the click stayed on this page.
            pass

        if self.animate_actions:
            await self.remove_cursor_box(page, identifier)

        return new_page  # type: ignore

    async def hover_id(self, page: Page, identifier: str) -> None:
        """
        Hover the mouse over the element with the given identifier.

        Args:
            page (Page): The Playwright page object.
            identifier (str): The element identifier.

        Raises:
            ValueError: If the element cannot be found or has no bounding box.
        """
        assert page is not None

        _target, box = await self._locate(page, identifier, settle_seconds=0.3)
        hover_x, hover_y = self._box_center(box)

        if self.animate_actions:
            await self._animate_cursor_to(page, identifier, hover_x, hover_y)

        # Hover over it
        await page.mouse.move(hover_x, hover_y)

        if self.animate_actions:
            await self.remove_cursor_box(page, identifier)

    async def fill_id(self, page: Page, identifier: str, value: str, press_enter: bool = True) -> None:
        """
        Fill the element with the given identifier with the specified value.

        Args:
            page (Page): The Playwright page object.
            identifier (str): The element identifier.
            value (str): The value to fill.
            press_enter (bool): Whether to press Enter after the value has been entered.

        Raises:
            ValueError: If the element cannot be found, or (when animating) has no bounding box.
        """
        assert page is not None

        target, box = await self._locate(page, identifier)

        if self.animate_actions:
            # The box is only needed to aim the fake cursor.
            end_x, end_y = self._box_center(box)
            await self._animate_cursor_to(page, identifier, end_x, end_y)

        # Focus on the element
        await target.focus()
        if self.animate_actions:
            # fill char by char to mimic human speed for short text and type fast for long text
            if len(value) < 100:
                delay_typing_speed = 50 + 100 * random.random()
            else:
                delay_typing_speed = 10
            await target.press_sequentially(value, delay=delay_typing_speed)
        else:
            try:
                await target.fill(value)
            except PlaywrightError:
                # fill() is rejected by some elements; fall back to typing the value.
                await target.press_sequentially(value)
        if press_enter:
            await target.press("Enter")

        if self.animate_actions:
            await self.remove_cursor_box(page, identifier)

    async def scroll_id(self, page: Page, identifier: str, direction: str) -> None:
        """
        Scroll the element with the given identifier in the specified direction.

        The element is scrolled by one ``clientHeight``; any direction other than "up" scrolls down.

        Args:
            page (Page): The Playwright page object.
            identifier (str): The element identifier.
            direction (str): The direction to scroll ("up" or "down").
        """
        assert page is not None
        # Both values are passed as arguments rather than spliced into the JavaScript source.
        await page.evaluate(
            """([identifier, direction]) => {
                let elm = document.querySelector(`[__elementId='${identifier}']`);
                if (elm) {
                    if (direction == "up") {
                        elm.scrollTop = Math.max(0, elm.scrollTop - elm.clientHeight);
                    } else {
                        elm.scrollTop = Math.min(elm.scrollHeight - elm.clientHeight, elm.scrollTop + elm.clientHeight);
                    }
                }
            }""",
            [identifier, direction],
        )

    async def get_webpage_text(self, page: Page, n_lines: int = 50) -> str:
        """
        Retrieve the text content of the web page.

        The first ``n_lines`` lines of ``document.body.innerText`` are taken and blank lines are
        then dropped, so fewer than ``n_lines`` lines may be returned.

        Args:
            page (Page): The Playwright page object.
            n_lines (int): The number of lines to return from the page inner text.

        Returns:
            str: The text content of the page, or an empty string if it could not be read.
        """
        assert page is not None
        try:
            text_in_viewport = await page.evaluate("""() => { return document.body.innerText; }""")
            text_in_viewport = "\n".join(text_in_viewport.split("\n")[:n_lines])
            # remove empty lines
            text_in_viewport = "\n".join([line for line in text_in_viewport.split("\n") if line.strip()])
            assert isinstance(text_in_viewport, str)
            return text_in_viewport
        except Exception:
            # Best effort: any failure to read the page text yields an empty result.
            return ""

    async def get_visible_text(self, page: Page) -> str:
        """
        Retrieve the text content of the browser viewport (approximately).

        Args:
            page (Page): The Playwright page object.

        Returns:
            str: The text content of the page.
        """
        assert page is not None
        await self._inject_page_script(page)
        result = await page.evaluate("MultimodalWebSurfer.getVisibleText();")
        assert isinstance(result, str)
        return result

    async def get_page_markdown(self, page: Page) -> str:
        """
        Retrieve the markdown content of the web page.

        Uses MarkItDown when it is installed. Otherwise falls back to the first 200 lines of
        the page's plain text.

        Args:
            page (Page): The Playwright page object.

        Returns:
            str: The markdown content of the page.
        """
        assert page is not None
        # Create the converter once and reuse it on every later call.
        if self._markdown_converter is None and MarkItDown is not None:
            self._markdown_converter = MarkItDown()

        if self._markdown_converter is None:
            return await self.get_webpage_text(page, n_lines=200)

        page_html = await page.evaluate("document.documentElement.outerHTML;")
        res = self._markdown_converter.convert_stream(io.StringIO(page_html), file_extension=".html", url=page.url)  # type: ignore
        assert hasattr(res, "text_content") and isinstance(res.text_content, str)
        return res.text_content