Source code for autogen_ext.agents.video_surfer.tools
import base64
from typing import Any, Dict, List, Tuple
import cv2
import numpy as np
import whisper
from autogen_core import Image as AGImage
from autogen_core.models import (
    ChatCompletionClient,
    UserMessage,
)
def transcribe_audio_with_timestamps(audio_path: str) -> str:
    """
    Transcribes the audio file with timestamps using the Whisper model.
    :param audio_path: Path to the audio file.
    :return: Transcription with timestamps.
    """
    model = whisper.load_model("base")  # type: ignore
    result: Dict[str, Any] = model.transcribe(audio_path, task="transcribe", language="en", verbose=False)  # type: ignore
    segments: List[Dict[str, Any]] = result["segments"]
    transcription_with_timestamps = ""
    for segment in segments:
        start: float = segment["start"]
        end: float = segment["end"]
        text: str = segment["text"]
        transcription_with_timestamps += f"[{start:.2f} - {end:.2f}] {text}\n"
    return transcription_with_timestamps 
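
# Usage sketch (illustrative; "talk.wav" is a hypothetical path, and the first
# call downloads the Whisper "base" checkpoint if it is not already cached):
#
#     transcript = transcribe_audio_with_timestamps("talk.wav")
#     print(transcript)
#     # [0.00 - 3.52]  Hello and welcome.
#     # [3.52 - 7.10]  Today we will look at ...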
def get_video_length(video_path: str) -> str:
    """
    Returns the length of the video in seconds.
    :param video_path: Path to the video file.
    :return: Duration of the video in seconds.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video file {video_path}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    cap.release()
    if fps <= 0:
        raise IOError(f"Cannot determine FPS for video file {video_path}")
    duration = frame_count / fps
    return f"The video is {duration:.2f} seconds long." 
def save_screenshot(video_path: str, timestamp: float, output_path: str) -> None:
    """
    Captures a screenshot at the specified timestamp and saves it to the output path.
    :param video_path: Path to the video file.
    :param timestamp: Timestamp in seconds.
    :param output_path: Path to save the screenshot. The file format is determined by the extension in the path.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video file {video_path}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_number = int(timestamp * fps)
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
    ret, frame = cap.read()
    # Release the capture before any raise so the handle is never leaked.
    cap.release()
    if ret:
        cv2.imwrite(output_path, frame)
    else:
        raise IOError(f"Failed to capture frame at {timestamp:.2f}s")
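
# Usage sketch (illustrative paths; cv2.imwrite picks the format from the
# extension, so ".png" below produces a PNG):
#
#     save_screenshot("clip.mp4", timestamp=5.0, output_path="frame_5s.png")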
async def transcribe_video_screenshot(video_path: str, timestamp: float, model_client: ChatCompletionClient) -> str:
    """
    Transcribes the content of a video screenshot captured at the specified timestamp using the provided chat completion client.
    :param video_path: Path to the video file.
    :param timestamp: Timestamp in seconds.
    :param model_client: ChatCompletionClient instance.
    :return: Description of the screenshot content.
    """
    screenshots = get_screenshot_at(video_path, [timestamp])
    if not screenshots:
        return "Failed to capture screenshot."
    _, frame = screenshots[0]
    # Convert the frame to bytes and then to base64 encoding
    _, buffer = cv2.imencode(".jpg", frame)
    frame_bytes = buffer.tobytes()
    frame_base64 = base64.b64encode(frame_bytes).decode("utf-8")
    screenshot_uri = f"data:image/jpeg;base64,{frame_base64}"
    messages = [
        UserMessage(
            content=[
                "Following is a screenshot from the video at {} seconds. Describe what you see here.",
                AGImage.from_uri(screenshot_uri),
            ],
            source="tool",
        )
    ]
    result = await model_client.create(messages=messages)
    return str(result.content) 
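
# Usage sketch (illustrative; assumes an OpenAI-backed client from
# autogen_ext.models.openai as one possible ChatCompletionClient, and a
# hypothetical "clip.mp4"):
#
#     import asyncio
#     from autogen_ext.models.openai import OpenAIChatCompletionClient
#
#     client = OpenAIChatCompletionClient(model="gpt-4o")
#     print(asyncio.run(transcribe_video_screenshot("clip.mp4", 5.0, client)))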
def get_screenshot_at(video_path: str, timestamps: List[float]) -> List[Tuple[float, np.ndarray[Any, Any]]]:
    """
    Captures screenshots at the specified timestamps and returns them as Python objects.
    :param video_path: Path to the video file.
    :param timestamps: List of timestamps in seconds.
    :return: List of tuples containing timestamp and the corresponding frame (image).
             Each frame is a NumPy array (height x width x channels).
    """
    screenshots: List[Tuple[float, np.ndarray[Any, Any]]] = []
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video file {video_path}")
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    try:
        if fps <= 0:
            raise IOError(f"Cannot determine FPS for video file {video_path}")
        duration = total_frames / fps
        for timestamp in timestamps:
            if not 0 <= timestamp <= duration:
                raise ValueError(f"Timestamp {timestamp:.2f}s is out of range [0s, {duration:.2f}s]")
            frame_number = int(timestamp * fps)
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            ret, frame = cap.read()
            if not ret:
                raise IOError(f"Failed to capture frame at {timestamp:.2f}s")
            # Store the timestamp together with its captured frame.
            screenshots.append((timestamp, frame))
    finally:
        # Release the capture even if a read fails or a timestamp is out of range.
        cap.release()
    return screenshots
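
# Usage sketch (illustrative; "clip.mp4" is a hypothetical path):
#
#     frames = get_screenshot_at("clip.mp4", [1.0, 2.5, 10.0])
#     for ts, frame in frames:
#         print(ts, frame.shape)  # e.g. 1.0 (720, 1280, 3)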