GraphRAG

GraphRAG Indexing 🤖

The GraphRAG indexing package is a data pipeline and transformation suite that is designed to extract meaningful, structured data from unstructured text using LLMs.

Indexing Pipelines are configurable. They are composed of workflows, standard and custom steps, prompt templates, and input/output adapters. Our standard pipeline is designed to:

The outputs of the pipeline can be stored in a variety of formats, including JSON and Parquet - or they can be handled manually via the Python API.

Getting Started

Requirements

See the requirements section in Get Started for details on setting up a development environment.

The Indexing Engine can be used in either a default configuration mode or with a custom pipeline. To configure GraphRAG, see the configuration documentation. After you have a config file you can run the pipeline using the CLI or the Python API.

Usage

CLI

# Via Poetry
poetry run poe cli --root <data_root> # default config mode
poetry run poe cli --config your_pipeline.yml # custom config mode

# Via Node
yarn run:index --root <data_root> # default config mode
yarn run:index --config your_pipeline.yml # custom config mode

Python API

from graphrag.index import run_pipeline
from graphrag.index.config import PipelineWorkflowReference

workflows: list[PipelineWorkflowReference] = [
    PipelineWorkflowReference(
        steps=[
            {
                # built-in verb
                "verb": "derive",  # https://github.com/microsoft/datashaper/blob/main/python/datashaper/datashaper/engine/verbs/derive.py
                "args": {
                    "column1": "col1",  # from above
                    "column2": "col2",  # from above
                    "to": "col_multiplied",  # new column name
                    "operator": "*",  # multiply the two columns
                },
                # Since we're trying to act on the default input, we don't need explicitly to specify an input
            }
        ]
    ),
]

dataset = pd.DataFrame([{"col1": 2, "col2": 4}, {"col1": 5, "col2": 10}])
outputs = []
async for output in await run_pipeline(dataset=dataset, workflows=workflows):
    outputs.append(output)
pipeline_result = outputs[-1]
print(pipeline_result)

Further Reading