Module tinytroupe.agent.grounding

Expand source code
from tinytroupe.utils import JsonSerializableRegistry
import tinytroupe.utils as utils

from tinytroupe.agent import logger
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Document, StorageContext, load_index_from_storage
from llama_index.core.vector_stores import SimpleVectorStore
from llama_index.readers.web import SimpleWebPageReader
import json
import tempfile
import os
import shutil


#######################################################################################################################
# Grounding connectors
#######################################################################################################################

class GroundingConnector(JsonSerializableRegistry):
    """
    An abstract class representing a grounding connector. A grounding connector is a component that allows an agent to ground
    its knowledge in external sources, such as files, web pages, databases, etc.
    """

    serializable_attributes = ["name"]

    def __init__(self, name:str) -> None:
        self.name = name
    
    def retrieve_relevant(self, relevance_target:str, source:str, top_k=20) -> list:
        raise NotImplementedError("Subclasses must implement this method.")
    
    def retrieve_by_name(self, name:str) -> str:
        raise NotImplementedError("Subclasses must implement this method.")
    
    def list_sources(self) -> list:
        raise NotImplementedError("Subclasses must implement this method.")


@utils.post_init
class BaseSemanticGroundingConnector(GroundingConnector):
    """
    A base class for semantic grounding connectors. A semantic grounding connector is a component that indexes and retrieves
    documents based on so-called "semantic search" (i.e., embeddings-based search). This specific implementation
    is based on the VectorStoreIndex class from the LlamaIndex library. Here, "documents" refers to LlamaIndex's
    data structure that stores a unit of content, not necessarily a file.
    """

    serializable_attributes = ["documents", "index"]
    
    # needs custom deserialization to handle Pydantic models (Document is a Pydantic model)
    custom_deserializers = {"documents": lambda docs_json: [Document.from_json(doc_json) for doc_json in docs_json],
                            "index": lambda index_json: BaseSemanticGroundingConnector._deserialize_index(index_json)}

    custom_serializers = {"documents": lambda docs: [doc.to_json() for doc in docs] if docs is not None else None,
                          "index": lambda index: BaseSemanticGroundingConnector._serialize_index(index)}

    def __init__(self, name:str="Semantic Grounding") -> None:
        super().__init__(name)

        self.documents = None 
        self.name_to_document = None
        self.index = None

        # @post_init ensures that _post_init is called after the __init__ method
    
    def _post_init(self):
        """
        This will run after __init__, since the class has the @post_init decorator.
        It is convenient to separate some of the initialization processes here to make deserialization easier.
        """
        self.index = None

        if not hasattr(self, 'documents') or self.documents is None:
            self.documents = []
        
        if not hasattr(self, 'name_to_document') or self.name_to_document is None:
            self.name_to_document = {}

            if hasattr(self, 'documents') and self.documents is not None:
                for document in self.documents:
                    # if the document has a semantic memory ID, we use it as the identifier
                    name = document.metadata.get("semantic_memory_id", document.id_)
                    
                    # self.name_to_document[name] contains a list, since each source file could be split into multiple pages
                    if name in self.name_to_document:
                        self.name_to_document[name].append(document)
                    else:
                        self.name_to_document[name] = [document]
        
        # Rebuild index from documents if it's None or invalid
        if self.index is None and self.documents:
            logger.warning("No index found. Rebuilding index from documents.")
            vector_store = SimpleVectorStore()
            self.index = VectorStoreIndex.from_documents(
                self.documents,
                vector_store=vector_store,
                store_nodes_override=True
            )

        # TODO remove?
        #self.add_documents(self.documents)        

    @staticmethod
    def _serialize_index(index):
        """Helper function to serialize index with proper storage context"""
        if index is None:
            return None
        
        try:
            # Create a temporary directory to store the index
            with tempfile.TemporaryDirectory() as temp_dir:
                # Persist the index to the temporary directory
                index.storage_context.persist(persist_dir=temp_dir)
                
                # Read all the persisted files and store them in a dictionary
                persisted_data = {}
                for filename in os.listdir(temp_dir):
                    filepath = os.path.join(temp_dir, filename)
                    if os.path.isfile(filepath):
                        with open(filepath, 'r') as f:
                            persisted_data[filename] = f.read()
                
                return persisted_data
        except Exception as e:
            logger.warning(f"Failed to serialize index: {e}")
            return None

    @staticmethod
    def _deserialize_index(index_data):
        """Helper function to deserialize index with proper error handling"""
        if not index_data:
            return None
        
        try:
            # Create a temporary directory to restore the index
            with tempfile.TemporaryDirectory() as temp_dir:
                # Write all the persisted files to the temporary directory
                for filename, content in index_data.items():
                    filepath = os.path.join(temp_dir, filename)
                    with open(filepath, 'w') as f:
                        f.write(content)
                
                # Load the index from the temporary directory
                storage_context = StorageContext.from_defaults(persist_dir=temp_dir)
                index = load_index_from_storage(storage_context)
                
                return index
        except Exception as e:
            # If deserialization fails, return None
            # The index will be rebuilt from documents in _post_init
            logger.warning(f"Failed to deserialize index: {e}. Index will be rebuilt.")
            return None
    
    def retrieve_relevant(self, relevance_target:str, top_k=20) -> list:
        """
        Retrieves the indexed content most relevant to a given target, up to top_k entries.
        """
        # Handle empty or None query
        if not relevance_target or not relevance_target.strip():
            return []
            
        if self.index is not None:
            retriever = self.index.as_retriever(similarity_top_k=top_k)
            nodes = retriever.retrieve(relevance_target)
        else:
            nodes = []

        retrieved = []
        for node in nodes:
            content = "SOURCE: " + node.metadata.get('file_name', '(unknown)')
            content += "\n" + "SIMILARITY SCORE:" + str(node.score)
            content += "\n" + "RELEVANT CONTENT:" + node.text
            retrieved.append(content)

            logger.debug(f"Content retrieved: {content[:200]}")

        return retrieved
    
    def retrieve_by_name(self, name:str) -> list:
        """
        Retrieves a content source by its name.
        """
        # TODO also optionally provide a relevance target?
        results = []
        if self.name_to_document is not None and name in self.name_to_document:
            docs = self.name_to_document[name]
            for i, doc in enumerate(docs):
                if doc is not None:
                    content = f"SOURCE: {name}\n"
                    content += f"PAGE: {i}\n"
                    content += "CONTENT: \n" + doc.text[:10000] # TODO a more intelligent way to limit the content
                    results.append(content)
                    
        return results
        
        
    def list_sources(self) -> list:
        """
        Lists the names of the available content sources.
        """
        if self.name_to_document is not None:
            return list(self.name_to_document.keys())
        else:
            return []
    
    def add_document(self, document) -> None:
        """
        Indexes a document for semantic retrieval.

        If the document has a metadata field called "semantic_memory_id", it is used to identify the document within Semantic Memory (e.g., for retrieval by name).
        """
        self.add_documents([document])

    def add_documents(self, new_documents) -> None:
        """
        Indexes documents for semantic retrieval.
        """
        # index documents by name
        if len(new_documents) > 0:
            
            # process documents individually too
            for document in new_documents:
                logger.debug(f"Adding document {document} to index, text is: {document.text}")

                # out of an abundance of caution, we sanitize the text
                document.text = utils.sanitize_raw_string(document.text)

                logger.debug(f"Document text after sanitization: {document.text}")

                # add the new document to the list of documents after all sanitization and checks
                self.documents.append(document)

                if document.metadata.get("semantic_memory_id") is not None:
                    # if the document has a semantic memory ID, we use it as the identifier
                    name = document.metadata["semantic_memory_id"]
                    
                    # Ensure name_to_document is initialized
                    if not hasattr(self, 'name_to_document') or self.name_to_document is None:
                        self.name_to_document = {}
                    
                    # self.name_to_document[name] contains a list, since each source file could be split into multiple pages
                    if name in self.name_to_document:
                        self.name_to_document[name].append(document)
                    else:
                        self.name_to_document[name] = [document]


            # index documents for semantic retrieval
            if self.index is None:
                # Create storage context with vector store
                vector_store = SimpleVectorStore()
                storage_context = StorageContext.from_defaults(vector_store=vector_store)
                
                self.index = VectorStoreIndex.from_documents(
                    self.documents, 
                    storage_context=storage_context,
                    store_nodes_override=True  # This ensures nodes (with text) are stored
                )
            else:
                self.index.refresh(self.documents)
    
    @staticmethod
    def _set_internal_id_to_documents(documents:list, external_attribute_name:str ="file_name") -> list:
        """
        Sets the internal ID ("semantic_memory_id") for each document in the list and returns the same list.
        This ensures that each document has a stable, unique identifier.
        """
        for doc in documents:
            if not hasattr(doc, 'metadata'):
                doc.metadata = {}
            doc.metadata["semantic_memory_id"] = doc.metadata.get(external_attribute_name, doc.id_)

        return documents
    

@utils.post_init
class LocalFilesGroundingConnector(BaseSemanticGroundingConnector):

    serializable_attributes = ["folders_paths"]

    def __init__(self, name:str="Local Files", folders_paths: list=None) -> None:
        super().__init__(name)

        self.folders_paths = folders_paths

        # @post_init ensures that _post_init is called after the __init__ method
    
    def _post_init(self):
        """
        This will run after __init__, since the class has the @post_init decorator.
        It is convenient to separate some of the initialization processes here to make deserialization easier.
        """
        self.loaded_folders_paths = []

        if not hasattr(self, 'folders_paths') or self.folders_paths is None:
            self.folders_paths = []

        self.add_folders(self.folders_paths)

    def add_folders(self, folders_paths:list) -> None:
        """
        Adds paths to folders with files used for grounding.
        """

        if folders_paths is not None:
            for folder_path in folders_paths:
                try:
                    logger.debug(f"Adding the following folder to grounding index: {folder_path}")
                    self.add_folder(folder_path)
                except (FileNotFoundError, ValueError) as e:
                    print(f"Error: {e}")
                    print(f"Current working directory: {os.getcwd()}")
                    print(f"Provided path: {folder_path}")
                    print("Please check if the path exists and is accessible.")

    def add_folder(self, folder_path:str) -> None:
        """
        Adds a path to a folder with files used for grounding.
        """

        if folder_path not in self.loaded_folders_paths:
            self._mark_folder_as_loaded(folder_path)

            # for PDF files, please note that the document will be split into pages: https://github.com/run-llama/llama_index/issues/15903
            new_files = SimpleDirectoryReader(folder_path).load_data()
            BaseSemanticGroundingConnector._set_internal_id_to_documents(new_files, "file_name")

            self.add_documents(new_files)
    
    def add_file_path(self, file_path:str) -> None:
        """
        Adds a path to a file used for grounding.
        """
        # a trick to make SimpleDirectoryReader work with a single file
        new_files = SimpleDirectoryReader(input_files=[file_path]).load_data()

        logger.debug(f"Adding the following file to grounding index: {new_files}")
        BaseSemanticGroundingConnector._set_internal_id_to_documents(new_files, "file_name")

        # index the loaded documents so they actually become retrievable
        self.add_documents(new_files)
    
    def _mark_folder_as_loaded(self, folder_path:str) -> None:
        if folder_path not in self.loaded_folders_paths:
            self.loaded_folders_paths.append(folder_path)
        
        if folder_path not in self.folders_paths:
            self.folders_paths.append(folder_path)
    
    
    

@utils.post_init
class WebPagesGroundingConnector(BaseSemanticGroundingConnector):

    serializable_attributes = ["web_urls"]

    def __init__(self, name:str="Web Pages", web_urls: list=None) -> None:
        super().__init__(name)

        self.web_urls = web_urls

        # @post_init ensures that _post_init is called after the __init__ method
    
    def _post_init(self):
        self.loaded_web_urls = []

        if not hasattr(self, 'web_urls') or self.web_urls is None:
            self.web_urls = []

        # load web urls
        self.add_web_urls(self.web_urls)
    
    def add_web_urls(self, web_urls:list) -> None:
        """ 
        Adds the data retrieved from the specified URLs to grounding.
        """
        filtered_web_urls = [url for url in web_urls if url not in self.loaded_web_urls]
        for url in filtered_web_urls:
            self._mark_web_url_as_loaded(url)

        if len(filtered_web_urls) > 0:
            new_documents = SimpleWebPageReader(html_to_text=True).load_data(filtered_web_urls)
            BaseSemanticGroundingConnector._set_internal_id_to_documents(new_documents, "url")
            self.add_documents(new_documents)
    
    def add_web_url(self, web_url:str) -> None:
        """
        Adds the data retrieved from the specified URL to grounding.
        """
        # we do it like this because the add_web_urls could run scrapes in parallel, so it is better
        # to implement this one in terms of the other
        self.add_web_urls([web_url])
    
    def _mark_web_url_as_loaded(self, web_url:str) -> None:
        if web_url not in self.loaded_web_urls:
            self.loaded_web_urls.append(web_url)
        
        if web_url not in self.web_urls:
            self.web_urls.append(web_url)
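
The connectors above share a single workflow: construct, point at sources, retrieve. A minimal end-to-end sketch (the folder path and query are made up, and retrieval needs an embedding model configured, since LlamaIndex defaults to OpenAI embeddings):

from tinytroupe.agent.grounding import LocalFilesGroundingConnector

# index every readable file found under a (hypothetical) local folder
grounding = LocalFilesGroundingConnector(folders_paths=["./knowledge_base"])

print(grounding.list_sources())                     # e.g. ['report.pdf', 'notes.txt']

# embeddings-based retrieval over the indexed content
for chunk in grounding.retrieve_relevant("last year's sales figures", top_k=5):
    print(chunk[:200])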

Classes

class BaseSemanticGroundingConnector (*args, **kwargs)

A base class for semantic grounding connectors. A semantic grounding connector is a component that indexes and retrieves documents based on so-called "semantic search" (i.e., embeddings-based search). This specific implementation is based on the VectorStoreIndex class from the LlamaIndex library. Here, "documents" refers to LlamaIndex's data structure that stores a unit of content, not necessarily a file.
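
Beyond the file and web subclasses, this base class can index LlamaIndex Document objects directly. A minimal sketch (the text and identifier are invented, and building the index requires an embedding model to be configured):

from llama_index.core import Document
from tinytroupe.agent.grounding import BaseSemanticGroundingConnector

connector = BaseSemanticGroundingConnector(name="Project notes")
connector.add_document(Document(text="The kickoff meeting is scheduled for Tuesday at 10am.",
                                metadata={"semantic_memory_id": "meeting_notes.txt"}))

connector.retrieve_relevant("When is the kickoff meeting?", top_k=3)
connector.retrieve_by_name("meeting_notes.txt")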

Expand source code
@utils.post_init
class BaseSemanticGroundingConnector(GroundingConnector):
    """
    A base class for semantic grounding connectors. A semantic grounding connector is a component that indexes and retrieves
    documents based on so-called "semantic search" (i.e., embeddings-based search). This specific implementation
    is based on the VectorStoreIndex class from the LlamaIndex library. Here, "documents" refers to LlamaIndex's
    data structure that stores a unit of content, not necessarily a file.
    """

    serializable_attributes = ["documents", "index"]
    
    # needs custom deserialization to handle Pydantic models (Document is a Pydantic model)
    custom_deserializers = {"documents": lambda docs_json: [Document.from_json(doc_json) for doc_json in docs_json],
                            "index": lambda index_json: BaseSemanticGroundingConnector._deserialize_index(index_json)}

    custom_serializers = {"documents": lambda docs: [doc.to_json() for doc in docs] if docs is not None else None,
                          "index": lambda index: BaseSemanticGroundingConnector._serialize_index(index)}

    def __init__(self, name:str="Semantic Grounding") -> None:
        super().__init__(name)

        self.documents = None 
        self.name_to_document = None
        self.index = None

        # @post_init ensures that _post_init is called after the __init__ method
    
    def _post_init(self):
        """
        This will run after __init__, since the class has the @post_init decorator.
        It is convenient to separate some of the initialization processes here to make deserialization easier.
        """
        self.index = None

        if not hasattr(self, 'documents') or self.documents is None:
            self.documents = []
        
        if not hasattr(self, 'name_to_document') or self.name_to_document is None:
            self.name_to_document = {}

            if hasattr(self, 'documents') and self.documents is not None:
                for document in self.documents:
                    # if the document has a semantic memory ID, we use it as the identifier
                    name = document.metadata.get("semantic_memory_id", document.id_)
                    
                    # self.name_to_document[name] contains a list, since each source file could be split into multiple pages
                    if name in self.name_to_document:
                        self.name_to_document[name].append(document)
                    else:
                        self.name_to_document[name] = [document]
        
        # Rebuild index from documents if it's None or invalid
        if self.index is None and self.documents:
            logger.warning("No index found. Rebuilding index from documents.")
            vector_store = SimpleVectorStore()
            self.index = VectorStoreIndex.from_documents(
                self.documents,
                vector_store=vector_store,
                store_nodes_override=True
            )

        # TODO remove?
        #self.add_documents(self.documents)        

    @staticmethod
    def _serialize_index(index):
        """Helper function to serialize index with proper storage context"""
        if index is None:
            return None
        
        try:
            # Create a temporary directory to store the index
            with tempfile.TemporaryDirectory() as temp_dir:
                # Persist the index to the temporary directory
                index.storage_context.persist(persist_dir=temp_dir)
                
                # Read all the persisted files and store them in a dictionary
                persisted_data = {}
                for filename in os.listdir(temp_dir):
                    filepath = os.path.join(temp_dir, filename)
                    if os.path.isfile(filepath):
                        with open(filepath, 'r') as f:
                            persisted_data[filename] = f.read()
                
                return persisted_data
        except Exception as e:
            logger.warning(f"Failed to serialize index: {e}")
            return None

    @staticmethod
    def _deserialize_index(index_data):
        """Helper function to deserialize index with proper error handling"""
        if not index_data:
            return None
        
        try:
            # Create a temporary directory to restore the index
            with tempfile.TemporaryDirectory() as temp_dir:
                # Write all the persisted files to the temporary directory
                for filename, content in index_data.items():
                    filepath = os.path.join(temp_dir, filename)
                    with open(filepath, 'w') as f:
                        f.write(content)
                
                # Load the index from the temporary directory
                storage_context = StorageContext.from_defaults(persist_dir=temp_dir)
                index = load_index_from_storage(storage_context)
                
                return index
        except Exception as e:
            # If deserialization fails, return None
            # The index will be rebuilt from documents in _post_init
            logger.warning(f"Failed to deserialize index: {e}. Index will be rebuilt.")
            return None
    
    def retrieve_relevant(self, relevance_target:str, top_k=20) -> list:
        """
        Retrieves the indexed content most relevant to a given target, up to top_k entries.
        """
        # Handle empty or None query
        if not relevance_target or not relevance_target.strip():
            return []
            
        if self.index is not None:
            retriever = self.index.as_retriever(similarity_top_k=top_k)
            nodes = retriever.retrieve(relevance_target)
        else:
            nodes = []

        retrieved = []
        for node in nodes:
            content = "SOURCE: " + node.metadata.get('file_name', '(unknown)')
            content += "\n" + "SIMILARITY SCORE:" + str(node.score)
            content += "\n" + "RELEVANT CONTENT:" + node.text
            retrieved.append(content)

            logger.debug(f"Content retrieved: {content[:200]}")

        return retrieved
    
    def retrieve_by_name(self, name:str) -> list:
        """
        Retrieves a content source by its name.
        """
        # TODO also optionally provide a relevance target?
        results = []
        if self.name_to_document is not None and name in self.name_to_document:
            docs = self.name_to_document[name]
            for i, doc in enumerate(docs):
                if doc is not None:
                    content = f"SOURCE: {name}\n"
                    content += f"PAGE: {i}\n"
                    content += "CONTENT: \n" + doc.text[:10000] # TODO a more intelligent way to limit the content
                    results.append(content)
                    
        return results
        
        
    def list_sources(self) -> list:
        """
        Lists the names of the available content sources.
        """
        if self.name_to_document is not None:
            return list(self.name_to_document.keys())
        else:
            return []
    
    def add_document(self, document) -> None:
        """
        Indexes a document for semantic retrieval.

        If the document has a metadata field called "semantic_memory_id", it is used to identify the document within Semantic Memory (e.g., for retrieval by name).
        """
        self.add_documents([document])

    def add_documents(self, new_documents) -> None:
        """
        Indexes documents for semantic retrieval.
        """
        # index documents by name
        if len(new_documents) > 0:
            
            # process documents individually too
            for document in new_documents:
                logger.debug(f"Adding document {document} to index, text is: {document.text}")

                # out of an abundance of caution, we sanitize the text
                document.text = utils.sanitize_raw_string(document.text)

                logger.debug(f"Document text after sanitization: {document.text}")

                # add the new document to the list of documents after all sanitization and checks
                self.documents.append(document)

                if document.metadata.get("semantic_memory_id") is not None:
                    # if the document has a semantic memory ID, we use it as the identifier
                    name = document.metadata["semantic_memory_id"]
                    
                    # Ensure name_to_document is initialized
                    if not hasattr(self, 'name_to_document') or self.name_to_document is None:
                        self.name_to_document = {}
                    
                    # self.name_to_document[name] contains a list, since each source file could be split into multiple pages
                    if name in self.name_to_document:
                        self.name_to_document[name].append(document)
                    else:
                        self.name_to_document[name] = [document]


            # index documents for semantic retrieval
            if self.index is None:
                # Create storage context with vector store
                vector_store = SimpleVectorStore()
                storage_context = StorageContext.from_defaults(vector_store=vector_store)
                
                self.index = VectorStoreIndex.from_documents(
                    self.documents, 
                    storage_context=storage_context,
                    store_nodes_override=True  # This ensures nodes (with text) are stored
                )
            else:
                self.index.refresh(self.documents)
    
    @staticmethod
    def _set_internal_id_to_documents(documents:list, external_attribute_name:str ="file_name") -> list:
        """
        Sets the internal ID ("semantic_memory_id") for each document in the list and returns the same list.
        This ensures that each document has a stable, unique identifier.
        """
        for doc in documents:
            if not hasattr(doc, 'metadata'):
                doc.metadata = {}
            doc.metadata["semantic_memory_id"] = doc.metadata.get(external_attribute_name, doc.id_)

        return documents
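
The private helper above is what the file and web connectors use to stamp each loaded Document with a "semantic_memory_id" taken from an existing metadata field (file_name for local files, url for web pages); that identifier is what list_sources() and retrieve_by_name() key on. A small sketch (the folder is hypothetical):

from llama_index.core import SimpleDirectoryReader
from tinytroupe.agent.grounding import BaseSemanticGroundingConnector

docs = SimpleDirectoryReader("./knowledge_base").load_data()
BaseSemanticGroundingConnector._set_internal_id_to_documents(docs, "file_name")
print({d.metadata["semantic_memory_id"] for d in docs})   # e.g. {'report.pdf', 'notes.txt'}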

Ancestors

GroundingConnector
JsonSerializableRegistry

Subclasses

LocalFilesGroundingConnector
WebPagesGroundingConnector

Class variables

var custom_deserializers
var custom_serializers
var serializable_attributes
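
Because the connector is a JsonSerializableRegistry, these custom (de)serializers let its state, including the vector index, be round-tripped through JSON. A sketch, assuming the to_json()/from_json() helpers that JsonSerializableRegistry provides in tinytroupe.utils (not shown in this module) and a configured embedding model:

from llama_index.core import Document
from tinytroupe.agent.grounding import BaseSemanticGroundingConnector

connector = BaseSemanticGroundingConnector(name="Notes")
connector.add_document(Document(text="The kickoff meeting is on Tuesday.",
                                metadata={"semantic_memory_id": "notes.txt"}))

state = connector.to_json()                                  # the index is persisted via _serialize_index
restored = BaseSemanticGroundingConnector.from_json(state)   # if the index payload is unusable, _post_init rebuilds it
restored.retrieve_relevant("kickoff meeting", top_k=3)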

Methods

def add_document(self, document) ‑> None

Indexes a document for semantic retrieval.

If the document has a metadata field called "semantic_memory_id", it is used to identify the document within Semantic Memory (e.g., for retrieval by name).

Expand source code
def add_document(self, document) -> None:
    """
    Indexes a document for semantic retrieval.

    If the document has a metadata field called "semantic_memory_id", it is used to identify the document within Semantic Memory (e.g., for retrieval by name).
    """
    self.add_documents([document])
def add_documents(self, new_documents) ‑> None

Indexes documents for semantic retrieval.

Expand source code
def add_documents(self, new_documents) -> None:
    """
    Indexes documents for semantic retrieval.
    """
    # index documents by name
    if len(new_documents) > 0:
        
        # process documents individually too
        for document in new_documents:
            logger.debug(f"Adding document {document} to index, text is: {document.text}")

            # out of an abundance of caution, we sanitize the text
            document.text = utils.sanitize_raw_string(document.text)

            logger.debug(f"Document text after sanitization: {document.text}")

            # add the new document to the list of documents after all sanitization and checks
            self.documents.append(document)

            if document.metadata.get("semantic_memory_id") is not None:
                # if the document has a semantic memory ID, we use it as the identifier
                name = document.metadata["semantic_memory_id"]
                
                # Ensure name_to_document is initialized
                if not hasattr(self, 'name_to_document') or self.name_to_document is None:
                    self.name_to_document = {}
                
                # self.name_to_document[name] contains a list, since each source file could be split into multiple pages
                if name in self.name_to_document:
                    self.name_to_document[name].append(document)
                else:
                    self.name_to_document[name] = [document]


        # index documents for semantic retrieval
        if self.index is None:
            # Create storage context with vector store
            vector_store = SimpleVectorStore()
            storage_context = StorageContext.from_defaults(vector_store=vector_store)
            
            self.index = VectorStoreIndex.from_documents(
                self.documents, 
                storage_context=storage_context,
                store_nodes_override=True  # This ensures nodes (with text) are stored
            )
        else:
            self.index.refresh(self.documents)
def list_sources(self) ‑> list

Lists the names of the available content sources.

Expand source code
def list_sources(self) -> list:
    """
    Lists the names of the available content sources.
    """
    if self.name_to_document is not None:
        return list(self.name_to_document.keys())
    else:
        return []
def retrieve_by_name(self, name: str) ‑> list

Retrieves a content source by its name.

Expand source code
def retrieve_by_name(self, name:str) -> list:
    """
    Retrieves a content source by its name.
    """
    # TODO also optionally provide a relevance target?
    results = []
    if self.name_to_document is not None and name in self.name_to_document:
        docs = self.name_to_document[name]
        for i, doc in enumerate(docs):
            if doc is not None:
                content = f"SOURCE: {name}\n"
                content += f"PAGE: {i}\n"
                content += "CONTENT: \n" + doc.text[:10000] # TODO a more intelligent way to limit the content
                results.append(content)
                
    return results
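
For multi-page sources (e.g. PDFs loaded by the local-files connector), each page is stored as a separate document under the same name, so one entry is returned per page. A sketch of the expected shape (folder and file names are hypothetical):

connector = LocalFilesGroundingConnector(folders_paths=["./reports"])   # folder containing annual_report.pdf
pages = connector.retrieve_by_name("annual_report.pdf")
# each entry looks like: "SOURCE: annual_report.pdf\nPAGE: 0\nCONTENT: \n..."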
def retrieve_relevant(self, relevance_target: str, top_k=20) ‑> list

Retrieves the indexed content most relevant to a given target, up to top_k entries.

Expand source code
def retrieve_relevant(self, relevance_target:str, top_k=20) -> list:
    """
    Retrieves the indexed content most relevant to a given target, up to top_k entries.
    """
    # Handle empty or None query
    if not relevance_target or not relevance_target.strip():
        return []
        
    if self.index is not None:
        retriever = self.index.as_retriever(similarity_top_k=top_k)
        nodes = retriever.retrieve(relevance_target)
    else:
        nodes = []

    retrieved = []
    for node in nodes:
        content = "SOURCE: " + node.metadata.get('file_name', '(unknown)')
        content += "\n" + "SIMILARITY SCORE:" + str(node.score)
        content += "\n" + "RELEVANT CONTENT:" + node.text
        retrieved.append(content)

        logger.debug(f"Content retrieved: {content[:200]}")

    return retrieved
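
Each returned entry is a plain string bundling the source name, the similarity score and the matched text. For instance (the folder, query and output values are made up):

connector = LocalFilesGroundingConnector(folders_paths=["./policies"])
for entry in connector.retrieve_relevant("refund policy", top_k=3):
    print(entry)
# SOURCE: policies.txt
# SIMILARITY SCORE:0.83
# RELEVANT CONTENT:Customers may request a refund within 30 days...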

Inherited members

class GroundingConnector (name: str)

An abstract class representing a grounding connector. A grounding connector is a component that allows an agent to ground its knowledge in external sources, such as files, web pages, databases, etc.
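
Concrete connectors implement the three methods below. As a purely illustrative sketch (not part of the library), an in-memory, keyword-matching connector could look like this:

from tinytroupe.agent.grounding import GroundingConnector

class InMemoryGroundingConnector(GroundingConnector):
    """A toy connector that keeps named text snippets in memory."""

    def __init__(self, name="In-Memory", sources=None):
        super().__init__(name)
        self.sources = sources or {}   # maps source name -> text

    def retrieve_relevant(self, relevance_target, source=None, top_k=20):
        # naive keyword match instead of real semantic search
        hits = [text for text in self.sources.values() if relevance_target.lower() in text.lower()]
        return hits[:top_k]

    def retrieve_by_name(self, name):
        return self.sources.get(name, "")

    def list_sources(self):
        return list(self.sources.keys())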

Expand source code
class GroundingConnector(JsonSerializableRegistry):
    """
    An abstract class representing a grounding connector. A grounding connector is a component that allows an agent to ground
    its knowledge in external sources, such as files, web pages, databases, etc.
    """

    serializable_attributes = ["name"]

    def __init__(self, name:str) -> None:
        self.name = name
    
    def retrieve_relevant(self, relevance_target:str, source:str, top_k=20) -> list:
        raise NotImplementedError("Subclasses must implement this method.")
    
    def retrieve_by_name(self, name:str) -> str:
        raise NotImplementedError("Subclasses must implement this method.")
    
    def list_sources(self) -> list:
        raise NotImplementedError("Subclasses must implement this method.")

Ancestors

JsonSerializableRegistry

Subclasses

BaseSemanticGroundingConnector

Class variables

var serializable_attributes

Methods

def list_sources(self) ‑> list
Expand source code
def list_sources(self) -> list:
    raise NotImplementedError("Subclasses must implement this method.")
def retrieve_by_name(self, name: str) ‑> str
Expand source code
def retrieve_by_name(self, name:str) -> str:
    raise NotImplementedError("Subclasses must implement this method.")
def retrieve_relevant(self, relevance_target: str, source: str, top_k=20) ‑> list
Expand source code
def retrieve_relevant(self, relevance_target:str, source:str, top_k=20) -> list:
    raise NotImplementedError("Subclasses must implement this method.")

Inherited members

class LocalFilesGroundingConnector (*args, **kwargs)

A base class for semantic grounding connectors. A semantic grounding connector is a component that indexes and retrieves documents based on so-called "semantic search" (i.e., embeddings-based search). This specific implementation is based on the VectorStoreIndex class from the LlamaIndex library. Here, "documents" refers to LlamaIndex's data structure that stores a unit of content, not necessarily a file.
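
This subclass indexes files found under local folders (loaded via SimpleDirectoryReader). A usage sketch (all paths are hypothetical, and indexing requires an embedding model to be configured):

from tinytroupe.agent.grounding import LocalFilesGroundingConnector

grounding = LocalFilesGroundingConnector(folders_paths=["./policies", "./manuals"])

# folders and single files can also be added incrementally after construction
grounding.add_folder("./reports")
grounding.add_file_path("./faq.txt")

print(grounding.list_sources())        # file names become the source names
grounding.retrieve_by_name("faq.txt")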

Expand source code
@utils.post_init
class LocalFilesGroundingConnector(BaseSemanticGroundingConnector):

    serializable_attributes = ["folders_paths"]

    def __init__(self, name:str="Local Files", folders_paths: list=None) -> None:
        super().__init__(name)

        self.folders_paths = folders_paths

        # @post_init ensures that _post_init is called after the __init__ method
    
    def _post_init(self):
        """
        This will run after __init__, since the class has the @post_init decorator.
        It is convenient to separate some of the initialization processes here to make deserialization easier.
        """
        self.loaded_folders_paths = []

        if not hasattr(self, 'folders_paths') or self.folders_paths is None:
            self.folders_paths = []

        self.add_folders(self.folders_paths)

    def add_folders(self, folders_paths:list) -> None:
        """
        Adds paths to folders with files used for grounding.
        """

        if folders_paths is not None:
            for folder_path in folders_paths:
                try:
                    logger.debug(f"Adding the following folder to grounding index: {folder_path}")
                    self.add_folder(folder_path)
                except (FileNotFoundError, ValueError) as e:
                    print(f"Error: {e}")
                    print(f"Current working directory: {os.getcwd()}")
                    print(f"Provided path: {folder_path}")
                    print("Please check if the path exists and is accessible.")

    def add_folder(self, folder_path:str) -> None:
        """
        Adds a path to a folder with files used for grounding.
        """

        if folder_path not in self.loaded_folders_paths:
            self._mark_folder_as_loaded(folder_path)

            # for PDF files, please note that the document will be split into pages: https://github.com/run-llama/llama_index/issues/15903
            new_files = SimpleDirectoryReader(folder_path).load_data()
            BaseSemanticGroundingConnector._set_internal_id_to_documents(new_files, "file_name")

            self.add_documents(new_files)
    
    def add_file_path(self, file_path:str) -> None:
        """
        Adds a path to a file used for grounding.
        """
        # a trick to make SimpleDirectoryReader work with a single file
        new_files = SimpleDirectoryReader(input_files=[file_path]).load_data()

        logger.debug(f"Adding the following file to grounding index: {new_files}")
        BaseSemanticGroundingConnector._set_internal_id_to_documents(new_files, "file_name")

        # index the loaded documents so they actually become retrievable
        self.add_documents(new_files)
    
    def _mark_folder_as_loaded(self, folder_path:str) -> None:
        if folder_path not in self.loaded_folders_paths:
            self.loaded_folders_paths.append(folder_path)
        
        if folder_path not in self.folders_paths:
            self.folders_paths.append(folder_path)

Ancestors

BaseSemanticGroundingConnector
GroundingConnector
JsonSerializableRegistry

Class variables

var custom_deserializers
var custom_serializers
var serializable_attributes

Methods

def add_file_path(self, file_path: str) ‑> None

Adds a path to a file used for grounding.

Expand source code
def add_file_path(self, file_path:str) -> None:
    """
    Adds a path to a file used for grounding.
    """
    # a trick to make SimpleDirectoryReader work with a single file
    new_files = SimpleDirectoryReader(input_files=[file_path]).load_data()

    logger.debug(f"Adding the following file to grounding index: {new_files}")
    BaseSemanticGroundingConnector._set_internal_id_to_documents(new_files, "file_name")

    # index the loaded documents so they actually become retrievable
    self.add_documents(new_files)
def add_folder(self, folder_path: str) ‑> None

Adds a path to a folder with files used for grounding.

Expand source code
def add_folder(self, folder_path:str) -> None:
    """
    Adds a path to a folder with files used for grounding.
    """

    if folder_path not in self.loaded_folders_paths:
        self._mark_folder_as_loaded(folder_path)

        # for PDF files, please note that the document will be split into pages: https://github.com/run-llama/llama_index/issues/15903
        new_files = SimpleDirectoryReader(folder_path).load_data()
        BaseSemanticGroundingConnector._set_internal_id_to_documents(new_files, "file_name")

        self.add_documents(new_files)
def add_folders(self, folders_paths: list) ‑> None

Adds paths to folders with files used for grounding.

Expand source code
def add_folders(self, folders_paths:list) -> None:
    """
    Adds paths to folders with files used for grounding.
    """

    if folders_paths is not None:
        for folder_path in folders_paths:
            try:
                logger.debug(f"Adding the following folder to grounding index: {folder_path}")
                self.add_folder(folder_path)
            except (FileNotFoundError, ValueError) as e:
                print(f"Error: {e}")
                print(f"Current working directory: {os.getcwd()}")
                print(f"Provided path: {folder_path}")
                print("Please check if the path exists and is accessible.")

Inherited members

class WebPagesGroundingConnector (*args, **kwargs)

A base class for semantic grounding connectors. A semantic grounding connector is a component that indexes and retrieves documents based on so-called "semantic search" (i.e., embeddings-based search). This specific implementation is based on the VectorStoreIndex class from the LlamaIndex library. Here, "documents" refers to LlamaIndex's data structure that stores a unit of content, not necessarily a file.
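
This subclass indexes the content fetched from a list of web page URLs (via SimpleWebPageReader). A usage sketch (the URLs are placeholders; fetching them needs network access and indexing requires an embedding model):

from tinytroupe.agent.grounding import WebPagesGroundingConnector

web_grounding = WebPagesGroundingConnector(web_urls=["https://example.com/handbook"])

# pages can also be added later; URLs already loaded are skipped
web_grounding.add_web_url("https://example.com/faq")

print(web_grounding.list_sources())     # URLs become the source names
web_grounding.retrieve_relevant("vacation policy", top_k=5)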

Expand source code
@utils.post_init
class WebPagesGroundingConnector(BaseSemanticGroundingConnector):

    serializable_attributes = ["web_urls"]

    def __init__(self, name:str="Web Pages", web_urls: list=None) -> None:
        super().__init__(name)

        self.web_urls = web_urls

        # @post_init ensures that _post_init is called after the __init__ method
    
    def _post_init(self):
        self.loaded_web_urls = []

        if not hasattr(self, 'web_urls') or self.web_urls is None:
            self.web_urls = []

        # load web urls
        self.add_web_urls(self.web_urls)
    
    def add_web_urls(self, web_urls:list) -> None:
        """ 
        Adds the data retrieved from the specified URLs to grounding.
        """
        filtered_web_urls = [url for url in web_urls if url not in self.loaded_web_urls]
        for url in filtered_web_urls:
            self._mark_web_url_as_loaded(url)

        if len(filtered_web_urls) > 0:
            new_documents = SimpleWebPageReader(html_to_text=True).load_data(filtered_web_urls)
            BaseSemanticGroundingConnector._set_internal_id_to_documents(new_documents, "url")
            self.add_documents(new_documents)
    
    def add_web_url(self, web_url:str) -> None:
        """
        Adds the data retrieved from the specified URL to grounding.
        """
        # we do it like this because the add_web_urls could run scrapes in parallel, so it is better
        # to implement this one in terms of the other
        self.add_web_urls([web_url])
    
    def _mark_web_url_as_loaded(self, web_url:str) -> None:
        if web_url not in self.loaded_web_urls:
            self.loaded_web_urls.append(web_url)
        
        if web_url not in self.web_urls:
            self.web_urls.append(web_url)

Ancestors

BaseSemanticGroundingConnector
GroundingConnector
JsonSerializableRegistry

Class variables

var custom_deserializers
var custom_serializers
var serializable_attributes

Methods

def add_web_url(self, web_url: str) ‑> None

Adds the data retrieved from the specified URL to grounding.

Expand source code
def add_web_url(self, web_url:str) -> None:
    """
    Adds the data retrieved from the specified URL to grounding.
    """
    # we do it like this because the add_web_urls could run scrapes in parallel, so it is better
    # to implement this one in terms of the other
    self.add_web_urls([web_url])
def add_web_urls(self, web_urls: list) ‑> None

Adds the data retrieved from the specified URLs to grounding.

Expand source code
def add_web_urls(self, web_urls:list) -> None:
    """ 
    Adds the data retrieved from the specified URLs to grounding.
    """
    filtered_web_urls = [url for url in web_urls if url not in self.loaded_web_urls]
    for url in filtered_web_urls:
        self._mark_web_url_as_loaded(url)

    if len(filtered_web_urls) > 0:
        new_documents = SimpleWebPageReader(html_to_text=True).load_data(filtered_web_urls)
        BaseSemanticGroundingConnector._set_internal_id_to_documents(new_documents, "url")
        self.add_documents(new_documents)
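
URLs are marked as loaded before they are fetched, so repeated calls with the same URL are filtered out and do not trigger another scrape. For example (placeholder URL):

connector = WebPagesGroundingConnector()
connector.add_web_urls(["https://example.com/handbook"])   # fetched and indexed
connector.add_web_urls(["https://example.com/handbook"])   # already loaded, skipped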

Inherited members