import asyncio
import base64
import io
import os
import random
from typing import Any, Callable, Dict, Optional, Tuple, Union, cast
# TODO: Fix unfollowed import
try:
    from markitdown import MarkItDown  # type: ignore
except ImportError:
    MarkItDown = None
from playwright._impl._errors import Error as PlaywrightError
from playwright._impl._errors import TimeoutError
from playwright.async_api import Download, Page
from ._types import (
    InteractiveRegion,
    VisualViewport,
    interactiveregion_from_dict,
    visualviewport_from_dict,
)
[docs]
class PlaywrightController:
    """
    A helper class to allow Playwright to interact with web pages to perform actions such as clicking, filling, and scrolling.
    Args:
        downloads_folder (str | None): The folder to save downloads to. If None, downloads are not saved.
        animate_actions (bool): Whether to animate the actions (create fake cursor to click).
        viewport_width (int): The width of the viewport.
        viewport_height (int): The height of the viewport.
        _download_handler (Optional[Callable[[Download], None]]): A function to handle downloads.
        to_resize_viewport (bool): Whether to resize the viewport
    """
    def __init__(
        self,
        downloads_folder: str | None = None,
        animate_actions: bool = False,
        viewport_width: int = 1440,
        viewport_height: int = 900,
        _download_handler: Optional[Callable[[Download], None]] = None,
        to_resize_viewport: bool = True,
    ) -> None:
        """
        Initialize the PlaywrightController.
        """
        assert isinstance(animate_actions, bool)
        assert isinstance(viewport_width, int)
        assert isinstance(viewport_height, int)
        assert viewport_height > 0
        assert viewport_width > 0
        self.animate_actions = animate_actions
        self.downloads_folder = downloads_folder
        self.viewport_width = viewport_width
        self.viewport_height = viewport_height
        self._download_handler = _download_handler
        self.to_resize_viewport = to_resize_viewport
        self._page_script: str = ""
        self.last_cursor_position: Tuple[float, float] = (0.0, 0.0)
        self._markdown_converter: Optional[Any] | None = None
        # Read page_script
        with open(os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js"), "rt") as fh:
            self._page_script = fh.read()
[docs]
    async def sleep(self, page: Page, duration: Union[int, float]) -> None:
        """
        Pause the execution for a specified duration.
        Args:
            page (Page): The Playwright page object.
            duration (Union[int, float]): The duration to sleep in milliseconds.
        """
        assert page is not None
        await page.wait_for_timeout(duration * 1000) 
[docs]
    async def get_interactive_rects(self, page: Page) -> Dict[str, InteractiveRegion]:
        """
        Retrieve interactive regions from the web page.
        Args:
            page (Page): The Playwright page object.
        Returns:
            Dict[str, InteractiveRegion]: A dictionary of interactive regions.
        """
        assert page is not None
        # Read the regions from the DOM
        try:
            await page.evaluate(self._page_script)
        except Exception:
            pass
        result = cast(Dict[str, Dict[str, Any]], await page.evaluate("MultimodalWebSurfer.getInteractiveRects();"))
        # Convert the results into appropriate types
        assert isinstance(result, dict)
        typed_results: Dict[str, InteractiveRegion] = {}
        for k in result:
            assert isinstance(k, str)
            typed_results[k] = interactiveregion_from_dict(result[k])
        return typed_results 
[docs]
    async def get_visual_viewport(self, page: Page) -> VisualViewport:
        """
        Retrieve the visual viewport of the web page.
        Args:
            page (Page): The Playwright page object.
        Returns:
            VisualViewport: The visual viewport of the page.
        """
        assert page is not None
        try:
            await page.evaluate(self._page_script)
        except Exception:
            pass
        return visualviewport_from_dict(await page.evaluate("MultimodalWebSurfer.getVisualViewport();")) 
[docs]
    async def get_focused_rect_id(self, page: Page) -> str:
        """
        Retrieve the ID of the currently focused element.
        Args:
            page (Page): The Playwright page object.
        Returns:
            str: The ID of the focused element.
        """
        assert page is not None
        try:
            await page.evaluate(self._page_script)
        except Exception:
            pass
        result = await page.evaluate("MultimodalWebSurfer.getFocusedElementId();")
        return str(result) 
[docs]
    async def get_page_metadata(self, page: Page) -> Dict[str, Any]:
        """
        Retrieve metadata from the web page.
        Args:
            page (Page): The Playwright page object.
        Returns:
            Dict[str, Any]: A dictionary of page metadata.
        """
        assert page is not None
        try:
            await page.evaluate(self._page_script)
        except Exception:
            pass
        result = await page.evaluate("MultimodalWebSurfer.getPageMetadata();")
        assert isinstance(result, dict)
        return cast(Dict[str, Any], result) 
[docs]
    async def on_new_page(self, page: Page) -> None:
        """
        Handle actions to perform on a new page.
        Args:
            page (Page): The Playwright page object.
        """
        assert page is not None
        page.on("download", self._download_handler)  # type: ignore
        if self.to_resize_viewport and self.viewport_width and self.viewport_height:
            await page.set_viewport_size({"width": self.viewport_width, "height": self.viewport_height})
        await self.sleep(page, 0.2)
        await page.add_init_script(path=os.path.join(os.path.abspath(os.path.dirname(__file__)), "page_script.js"))
        await page.wait_for_load_state() 
[docs]
    async def back(self, page: Page) -> None:
        """
        Navigate back to the previous page.
        Args:
            page (Page): The Playwright page object.
        """
        assert page is not None
        await page.go_back() 
[docs]
    async def visit_page(self, page: Page, url: str) -> Tuple[bool, bool]:
        """
        Visit a specified URL.
        Args:
            page (Page): The Playwright page object.
            url (str): The URL to visit.
        Returns:
            Tuple[bool, bool]: A tuple indicating whether to reset prior metadata hash and last download.
        """
        assert page is not None
        reset_prior_metadata_hash = False
        reset_last_download = False
        try:
            # Regular webpage
            await page.goto(url)
            await page.wait_for_load_state()
            reset_prior_metadata_hash = True
        except Exception as e_outer:
            # Downloaded file
            if self.downloads_folder and "net::ERR_ABORTED" in str(e_outer):
                async with page.expect_download() as download_info:
                    try:
                        await page.goto(url)
                    except Exception as e_inner:
                        if "net::ERR_ABORTED" in str(e_inner):
                            pass
                        else:
                            raise e_inner
                    download = await download_info.value
                    fname = os.path.join(self.downloads_folder, download.suggested_filename)
                    await download.save_as(fname)
                    message = f"<body style=\"margin: 20px;\"><h1>Successfully downloaded '{download.suggested_filename}' to local path:<br><br>{fname}</h1></body>"
                    await page.goto(
                        "data:text/html;base64," + base64.b64encode(message.encode("utf-8")).decode("utf-8")
                    )
                    reset_last_download = True
            else:
                raise e_outer
        return reset_prior_metadata_hash, reset_last_download 
[docs]
    async def page_down(self, page: Page) -> None:
        """
        Scroll the page down by one viewport height minus 50 pixels.
        Args:
            page (Page): The Playwright page object.
        """
        assert page is not None
        await page.evaluate(f"window.scrollBy(0, {self.viewport_height-50});") 
[docs]
    async def page_up(self, page: Page) -> None:
        """
        Scroll the page up by one viewport height minus 50 pixels.
        Args:
            page (Page): The Playwright page object.
        """
        assert page is not None
        await page.evaluate(f"window.scrollBy(0, -{self.viewport_height-50});") 
[docs]
    async def gradual_cursor_animation(
        self, page: Page, start_x: float, start_y: float, end_x: float, end_y: float
    ) -> None:
        """
        Animate the cursor movement gradually from start to end coordinates.
        Args:
            page (Page): The Playwright page object.
            start_x (float): The starting x-coordinate.
            start_y (float): The starting y-coordinate.
            end_x (float): The ending x-coordinate.
            end_y (float): The ending y-coordinate.
        """
        # animation helper
        steps = 20
        for step in range(steps):
            x = start_x + (end_x - start_x) * (step / steps)
            y = start_y + (end_y - start_y) * (step / steps)
            # await page.mouse.move(x, y, steps=1)
            await page.evaluate(f"""
                (function() {{
                    let cursor = document.getElementById('red-cursor');
                    cursor.style.left = '{x}px';
                    cursor.style.top = '{y}px';
                }})();
            """)
            await asyncio.sleep(0.05)
        self.last_cursor_position = (end_x, end_y) 
[docs]
    async def add_cursor_box(self, page: Page, identifier: str) -> None:
        """
        Add a red cursor box around the element with the given identifier.
        Args:
            page (Page): The Playwright page object.
            identifier (str): The element identifier.
        """
        # animation helper
        await page.evaluate(f"""
            (function() {{
                let elm = document.querySelector("[__elementId='{identifier}']");
                if (elm) {{
                    elm.style.transition = 'border 0.3s ease-in-out';
                    elm.style.border = '2px solid red';
                }}
            }})();
        """)
        await asyncio.sleep(0.3)
        # Create a red cursor
        await page.evaluate("""
            (function() {
                let cursor = document.createElement('div');
                cursor.id = 'red-cursor';
                cursor.style.width = '10px';
                cursor.style.height = '10px';
                cursor.style.backgroundColor = 'red';
                cursor.style.position = 'absolute';
                cursor.style.borderRadius = '50%';
                cursor.style.zIndex = '10000';
                document.body.appendChild(cursor);
            })();
        """) 
[docs]
    async def remove_cursor_box(self, page: Page, identifier: str) -> None:
        """
        Remove the red cursor box around the element with the given identifier.
        Args:
            page (Page): The Playwright page object.
            identifier (str): The element identifier.
        """
        # Remove the highlight and cursor
        await page.evaluate(f"""
            (function() {{
                let elm = document.querySelector("[__elementId='{identifier}']");
                if (elm) {{
                    elm.style.border = '';
                }}
                let cursor = document.getElementById('red-cursor');
                if (cursor) {{
                    cursor.remove();
                }}
            }})();
        """) 
[docs]
    async def click_id(self, page: Page, identifier: str) -> Page | None:
        """
        Click the element with the given identifier.
        Args:
            page (Page): The Playwright page object.
            identifier (str): The element identifier.
        Returns:
            Page | None: The new page if a new page is opened, otherwise None.
        """
        new_page: Page | None = None
        assert page is not None
        target = page.locator(f"[__elementId='{identifier}']")
        # See if it exists
        try:
            await target.wait_for(timeout=5000)
        except TimeoutError:
            raise ValueError("No such element.") from None
        # Click it
        await target.scroll_into_view_if_needed()
        await asyncio.sleep(0.3)
        box = cast(Dict[str, Union[int, float]], await target.bounding_box())
        if self.animate_actions:
            await self.add_cursor_box(page, identifier)
            # Move cursor to the box slowly
            start_x, start_y = self.last_cursor_position
            end_x, end_y = box["x"] + box["width"] / 2, box["y"] + box["height"] / 2
            await self.gradual_cursor_animation(page, start_x, start_y, end_x, end_y)
            await asyncio.sleep(0.1)
            try:
                # Give it a chance to open a new page
                async with page.expect_event("popup", timeout=1000) as page_info:  # type: ignore
                    await page.mouse.click(end_x, end_y, delay=10)
                    new_page = await page_info.value  # type: ignore
                    assert isinstance(new_page, Page)
                    await self.on_new_page(new_page)
            except TimeoutError:
                pass
            await self.remove_cursor_box(page, identifier)
        else:
            try:
                # Give it a chance to open a new page
                async with page.expect_event("popup", timeout=1000) as page_info:  # type: ignore
                    await page.mouse.click(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2, delay=10)
                    new_page = await page_info.value  # type: ignore
                    assert isinstance(new_page, Page)
                    await self.on_new_page(new_page)
            except TimeoutError:
                pass
        return new_page  # type: ignore 
[docs]
    async def hover_id(self, page: Page, identifier: str) -> None:
        """
        Hover the mouse over the element with the given identifier.
        Args:
            page (Page): The Playwright page object.
            identifier (str): The element identifier.
        """
        assert page is not None
        target = page.locator(f"[__elementId='{identifier}']")
        # See if it exists
        try:
            await target.wait_for(timeout=5000)
        except TimeoutError:
            raise ValueError("No such element.") from None
        # Hover over it
        await target.scroll_into_view_if_needed()
        await asyncio.sleep(0.3)
        box = cast(Dict[str, Union[int, float]], await target.bounding_box())
        if self.animate_actions:
            await self.add_cursor_box(page, identifier)
            # Move cursor to the box slowly
            start_x, start_y = self.last_cursor_position
            end_x, end_y = box["x"] + box["width"] / 2, box["y"] + box["height"] / 2
            await self.gradual_cursor_animation(page, start_x, start_y, end_x, end_y)
            await asyncio.sleep(0.1)
            await page.mouse.move(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2)
            await self.remove_cursor_box(page, identifier)
        else:
            await page.mouse.move(box["x"] + box["width"] / 2, box["y"] + box["height"] / 2) 
[docs]
    async def fill_id(self, page: Page, identifier: str, value: str, press_enter: bool = True) -> None:
        """
        Fill the element with the given identifier with the specified value.
        Args:
            page (Page): The Playwright page object.
            identifier (str): The element identifier.
            value (str): The value to fill.
        """
        assert page is not None
        target = page.locator(f"[__elementId='{identifier}']")
        # See if it exists
        try:
            await target.wait_for(timeout=5000)
        except TimeoutError:
            raise ValueError("No such element.") from None
        # Fill it
        await target.scroll_into_view_if_needed()
        box = cast(Dict[str, Union[int, float]], await target.bounding_box())
        if self.animate_actions:
            await self.add_cursor_box(page, identifier)
            # Move cursor to the box slowly
            start_x, start_y = self.last_cursor_position
            end_x, end_y = box["x"] + box["width"] / 2, box["y"] + box["height"] / 2
            await self.gradual_cursor_animation(page, start_x, start_y, end_x, end_y)
            await asyncio.sleep(0.1)
        # Focus on the element
        await target.focus()
        if self.animate_actions:
            # fill char by char to mimic human speed for short text and type fast for long text
            if len(value) < 100:
                delay_typing_speed = 50 + 100 * random.random()
            else:
                delay_typing_speed = 10
            await target.press_sequentially(value, delay=delay_typing_speed)
        else:
            try:
                await target.fill(value)
            except PlaywrightError:
                await target.press_sequentially(value)
        if press_enter:
            await target.press("Enter")
        if self.animate_actions:
            await self.remove_cursor_box(page, identifier) 
[docs]
    async def get_webpage_text(self, page: Page, n_lines: int = 50) -> str:
        """
        Retrieve the text content of the web page.
        Args:
            page (Page): The Playwright page object.
            n_lines (int): The number of lines to return from the page inner text.
        Returns:
            str: The text content of the page.
        """
        assert page is not None
        try:
            text_in_viewport = await page.evaluate("""() => {
                return document.body.innerText;
            }""")
            text_in_viewport = "\n".join(text_in_viewport.split("\n")[:n_lines])
            # remove empty lines
            text_in_viewport = "\n".join([line for line in text_in_viewport.split("\n") if line.strip()])
            assert isinstance(text_in_viewport, str)
            return text_in_viewport
        except Exception:
            return "" 
[docs]
    async def get_page_markdown(self, page: Page) -> str:
        """
        Retrieve the markdown content of the web page.
        Currently not implemented.
        Args:
            page (Page): The Playwright page object.
        Returns:
            str: The markdown content of the page.
        """
        assert page is not None
        if self._markdown_converter is None and MarkItDown is not None:
            self._markdown_converter = MarkItDown()
            html = await page.evaluate("document.documentElement.outerHTML;")
            res = self._markdown_converter.convert_stream(io.StringIO(html), file_extension=".html", url=page.url)  # type: ignore
            assert hasattr(res, "text_content") and isinstance(res.text_content, str)
            return res.text_content
        else:
            return await self.get_webpage_text(page, n_lines=200)