Concurrency with Pandas¶
pd.DataFrame.aio is an accessor that provides an async interface to OpenAI's API. It is designed to be used with Python's asyncio library, allowing you to make non-blocking concurrent requests to the OpenAI API.
In [2]:
Copied!
import os
from enum import Enum
import pandas as pd
from openai import AsyncOpenAI, OpenAI
from pydantic import BaseModel, Field
from openaivec import pandas_ext
pandas_ext.set_client(OpenAI())
pandas_ext.set_async_client(AsyncOpenAI())
pandas_ext.set_responses_model("gpt-4.1-mini")
pandas_ext.set_embeddings_model("text-embedding-3-small")
import os
from enum import Enum
import pandas as pd
from openai import AsyncOpenAI, OpenAI
from pydantic import BaseModel, Field
from openaivec import pandas_ext
pandas_ext.set_client(OpenAI())
pandas_ext.set_async_client(AsyncOpenAI())
pandas_ext.set_responses_model("gpt-4.1-mini")
pandas_ext.set_embeddings_model("text-embedding-3-small")
In [3]:
Copied!
files_dict = {"path": []}
for root, dirs, files in os.walk('../../src/openaivec'):
for file in files:
path = os.path.join(root, file)
if "__" not in path:
files_dict["path"].append(os.path.join(root, file))
implements_df = pd.DataFrame(files_dict).assign(
module=lambda df: df["path"].str.split("/")
.map(lambda x: x[3:])
.map(lambda x: ".".join(x))
.map(lambda x: x.replace(".py", "")),
)
files_dict = {"path": []}
for root, dirs, files in os.walk('../../src/openaivec'):
for file in files:
path = os.path.join(root, file)
if "__" not in path:
files_dict["path"].append(os.path.join(root, file))
implements_df = pd.DataFrame(files_dict).assign(
module=lambda df: df["path"].str.split("/")
.map(lambda x: x[3:])
.map(lambda x: ".".join(x))
.map(lambda x: x.replace(".py", "")),
)
In [4]:
Copied!
implements_df
implements_df
Out[4]:
| path | module | |
|---|---|---|
| 0 | ../../src/openaivec/_di.py | openaivec._di |
| 1 | ../../src/openaivec/_provider.py | openaivec._provider |
| 2 | ../../src/openaivec/_prompt.py | openaivec._prompt |
| 3 | ../../src/openaivec/_responses.py | openaivec._responses |
| 4 | ../../src/openaivec/_serialize.py | openaivec._serialize |
| 5 | ../../src/openaivec/_util.py | openaivec._util |
| 6 | ../../src/openaivec/pandas_ext.py | openaivec.pandas_ext |
| 7 | ../../src/openaivec/spark.py | openaivec.spark |
| 8 | ../../src/openaivec/_model.py | openaivec._model |
| 9 | ../../src/openaivec/_log.py | openaivec._log |
| 10 | ../../src/openaivec/_embeddings.py | openaivec._embeddings |
| 11 | ../../src/openaivec/_schema/spec.py | openaivec._schema.spec |
| 12 | ../../src/openaivec/_schema/infer.py | openaivec._schema.infer |
| 13 | ../../src/openaivec/_cache/optimize.py | openaivec._cache.optimize |
| 14 | ../../src/openaivec/_cache/proxy.py | openaivec._cache.proxy |
| 15 | ../../src/openaivec/task/customer_support/inqu... | openaivec.task.customer_support.inquiry_summary |
| 16 | ../../src/openaivec/task/customer_support/urge... | openaivec.task.customer_support.urgency_analysis |
| 17 | ../../src/openaivec/task/customer_support/inqu... | openaivec.task.customer_support.inquiry_classi... |
| 18 | ../../src/openaivec/task/customer_support/resp... | openaivec.task.customer_support.response_sugge... |
| 19 | ../../src/openaivec/task/customer_support/cust... | openaivec.task.customer_support.customer_senti... |
| 20 | ../../src/openaivec/task/customer_support/inte... | openaivec.task.customer_support.intent_analysis |
| 21 | ../../src/openaivec/task/table/fillna.py | openaivec.task.table.fillna |
| 22 | ../../src/openaivec/task/nlp/named_entity_reco... | openaivec.task.nlp.named_entity_recognition |
| 23 | ../../src/openaivec/task/nlp/keyword_extractio... | openaivec.task.nlp.keyword_extraction |
| 24 | ../../src/openaivec/task/nlp/sentiment_analysi... | openaivec.task.nlp.sentiment_analysis |
| 25 | ../../src/openaivec/task/nlp/morphological_ana... | openaivec.task.nlp.morphological_analysis |
| 26 | ../../src/openaivec/task/nlp/translation.py | openaivec.task.nlp.translation |
| 27 | ../../src/openaivec/task/nlp/dependency_parsin... | openaivec.task.nlp.dependency_parsing |
In [5]:
Copied!
class OpjectType(str, Enum):
FUNCTION = "function"
CLASS = "class"
VARIABLE = "variable"
class Question(BaseModel):
question: str = Field(description="The specific question related to the code section.")
answer: str = Field(description="The corresponding answer explaining the code aspect.")
class Section(BaseModel):
name: str = Field(description="The name of the function or class being documented.")
type: OpjectType = Field(description="The type of the code section, either a function or a class.")
description: str = Field(description="A concise summary of the function or class's purpose and functionality.")
questions: list[Question] = Field(description="A list of Q&A pairs clarifying aspects of this code section.")
class Document(BaseModel):
sections: list[Section] = Field(description="A list of sections, each documenting a specific function or class.")
class OpjectType(str, Enum):
FUNCTION = "function"
CLASS = "class"
VARIABLE = "variable"
class Question(BaseModel):
question: str = Field(description="The specific question related to the code section.")
answer: str = Field(description="The corresponding answer explaining the code aspect.")
class Section(BaseModel):
name: str = Field(description="The name of the function or class being documented.")
type: OpjectType = Field(description="The type of the code section, either a function or a class.")
description: str = Field(description="A concise summary of the function or class's purpose and functionality.")
questions: list[Question] = Field(description="A list of Q&A pairs clarifying aspects of this code section.")
class Document(BaseModel):
sections: list[Section] = Field(description="A list of sections, each documenting a specific function or class.")
Note that async methods are not available in lambda functions. We can use aio.assign instead of assign to use an async function in a lambda function.
And top-level await is allowed in notebook cells.
In [6]:
Copied!
docs_df = await implements_df.aio.assign(
code=lambda df: df["path"].map(lambda x: open(x).read()),
doc=lambda df: df["code"].aio.responses(
instructions="Document the code in detail, including a summary, Q&A pairs, and explanations.",
response_format=Document,
batch_size=1,
),
)
docs_df = await implements_df.aio.assign(
code=lambda df: df["path"].map(lambda x: open(x).read()),
doc=lambda df: df["code"].aio.responses(
instructions="Document the code in detail, including a summary, Q&A pairs, and explanations.",
response_format=Document,
batch_size=1,
),
)
Processing batches: 0%| | 0/28 [00:00<?, ?item/s]
In [7]:
Copied!
docs_df
docs_df
Out[7]:
| path | module | code | doc | |
|---|---|---|---|---|
| 0 | ../../src/openaivec/_di.py | openaivec._di | from collections.abc import Callable\nfrom dat... | sections=[Section(name='CircularDependencyErro... |
| 1 | ../../src/openaivec/_provider.py | openaivec._provider | import os\nimport warnings\n\nimport tiktoken\... | sections=[Section(name='_check_azure_v1_api_ur... |
| 2 | ../../src/openaivec/_prompt.py | openaivec._prompt | """\nThis module provides a builder for creati... | sections=[Section(name='Example', type=<Opject... |
| 3 | ../../src/openaivec/_responses.py | openaivec._responses | import warnings\nfrom dataclasses import datac... | sections=[Section(name='_handle_temperature_er... |
| 4 | ../../src/openaivec/_serialize.py | openaivec._serialize | """Refactored serialization utilities for Pyda... | sections=[Section(name='serialize_base_model',... |
| 5 | ../../src/openaivec/_util.py | openaivec._util | import asyncio\nimport functools\nimport re\ni... | sections=[Section(name='get_exponential_with_c... |
| 6 | ../../src/openaivec/pandas_ext.py | openaivec.pandas_ext | """Pandas Series / DataFrame extension for Ope... | sections=[Section(name='_df_rows_to_json_serie... |
| 7 | ../../src/openaivec/spark.py | openaivec.spark | """Asynchronous Spark UDFs for the OpenAI and ... | sections=[Section(name='setup', type=<OpjectTy... |
| 8 | ../../src/openaivec/_model.py | openaivec._model | from dataclasses import dataclass, field\nfrom... | sections=[Section(name='PreparedTask', type=<O... |
| 9 | ../../src/openaivec/_log.py | openaivec._log | import functools\nimport json\nimport time\nim... | sections=[Section(name='observe', type=<Opject... |
| 10 | ../../src/openaivec/_embeddings.py | openaivec._embeddings | from dataclasses import dataclass, field\nfrom... | sections=[Section(name='BatchEmbeddings', type... |
| 11 | ../../src/openaivec/_schema/spec.py | openaivec._schema.spec | from __future__ import annotations\n\nimport r... | sections=[Section(name='FieldSpec', type=<Opje... |
| 12 | ../../src/openaivec/_schema/infer.py | openaivec._schema.infer | """Internal schema inference & dynamic model m... | sections=[Section(name='SchemaInferenceOutput'... |
| 13 | ../../src/openaivec/_cache/optimize.py | openaivec._cache.optimize | import threading\nimport time\nfrom contextlib... | sections=[Section(name='PerformanceMetric', ty... |
| 14 | ../../src/openaivec/_cache/proxy.py | openaivec._cache.proxy | import asyncio\nimport threading\nfrom collect... | sections=[Section(name='ProxyBase', type=<Opje... |
| 15 | ../../src/openaivec/task/customer_support/inqu... | openaivec.task.customer_support.inquiry_summary | """Inquiry summary task for customer support i... | sections=[Section(name='InquirySummary', type=... |
| 16 | ../../src/openaivec/task/customer_support/urge... | openaivec.task.customer_support.urgency_analysis | """Urgency analysis task for customer support.... | sections=[Section(name='UrgencyAnalysis', type... |
| 17 | ../../src/openaivec/task/customer_support/inqu... | openaivec.task.customer_support.inquiry_classi... | """Inquiry classification task for customer su... | sections=[Section(name='InquiryClassification'... |
| 18 | ../../src/openaivec/task/customer_support/resp... | openaivec.task.customer_support.response_sugge... | """Response suggestion task for customer suppo... | sections=[Section(name='ResponseSuggestion', t... |
| 19 | ../../src/openaivec/task/customer_support/cust... | openaivec.task.customer_support.customer_senti... | """Customer sentiment analysis task for suppor... | sections=[Section(name='CustomerSentiment', ty... |
| 20 | ../../src/openaivec/task/customer_support/inte... | openaivec.task.customer_support.intent_analysis | """Intent analysis task for customer support i... | sections=[Section(name='IntentAnalysis', type=... |
| 21 | ../../src/openaivec/task/table/fillna.py | openaivec.task.table.fillna | """Missing value imputation task for DataFrame... | sections=[Section(name='get_examples', type=<O... |
| 22 | ../../src/openaivec/task/nlp/named_entity_reco... | openaivec.task.nlp.named_entity_recognition | """Named entity recognition task for OpenAI AP... | sections=[Section(name='NamedEntity', type=<Op... |
| 23 | ../../src/openaivec/task/nlp/keyword_extractio... | openaivec.task.nlp.keyword_extraction | """Keyword extraction task for OpenAI API.\n\n... | sections=[Section(name='Keyword', type=<Opject... |
| 24 | ../../src/openaivec/task/nlp/sentiment_analysi... | openaivec.task.nlp.sentiment_analysis | """Sentiment analysis task for OpenAI API.\n\n... | sections=[Section(name='SentimentAnalysis', ty... |
| 25 | ../../src/openaivec/task/nlp/morphological_ana... | openaivec.task.nlp.morphological_analysis | """Morphological analysis task for OpenAI API.... | sections=[Section(name='MorphologicalAnalysis'... |
| 26 | ../../src/openaivec/task/nlp/translation.py | openaivec.task.nlp.translation | """Multilingual translation task for OpenAI AP... | sections=[Section(name='MULTILINGUAL_TRANSLATI... |
| 27 | ../../src/openaivec/task/nlp/dependency_parsin... | openaivec.task.nlp.dependency_parsing | """Dependency parsing task for OpenAI API.\n\n... | sections=[Section(name='DependencyRelation', t... |
In [8]:
Copied!
questions_df = await docs_df.aio.pipe(
lambda df: df
.drop(columns=["code"])
.ai.extract("doc")
.explode("doc_sections")
.ai.extract("doc_sections")
.explode("doc_sections_questions")
.ai.extract("doc_sections_questions")
.reset_index(drop=True)
.aio.assign(
doc_sections_type=lambda df: df.doc_sections_type.map(lambda x: x.value),
embedding=lambda df: df["doc_sections_questions_question"].aio.embeddings()
)
)
questions_df = await docs_df.aio.pipe(
lambda df: df
.drop(columns=["code"])
.ai.extract("doc")
.explode("doc_sections")
.ai.extract("doc_sections")
.explode("doc_sections_questions")
.ai.extract("doc_sections_questions")
.reset_index(drop=True)
.aio.assign(
doc_sections_type=lambda df: df.doc_sections_type.map(lambda x: x.value),
embedding=lambda df: df["doc_sections_questions_question"].aio.embeddings()
)
)
Processing batches: 0%| | 0/326 [00:00<?, ?item/s]
In [9]:
Copied!
questions_df
questions_df
Out[9]:
| path | module | doc_sections_name | doc_sections_type | doc_sections_description | doc_sections_questions_question | doc_sections_questions_answer | embedding | |
|---|---|---|---|---|---|---|---|---|
| 0 | ../../src/openaivec/_di.py | openaivec._di | CircularDependencyError | class | Exception raised when the container detects a ... | What causes a CircularDependencyError to be ra... | It is raised when during resolution the contai... | [-0.027163064, 0.033928107, 0.07552155, 0.0463... |
| 1 | ../../src/openaivec/_di.py | openaivec._di | CircularDependencyError | class | Exception raised when the container detects a ... | How can I handle a circular dependency in my s... | You should refactor your services to break the... | [-0.04108237, -0.0004586271, 0.055678822, -0.0... |
| 2 | ../../src/openaivec/_di.py | openaivec._di | ProviderError | class | Exception raised when a provider function fail... | When is ProviderError raised? | It is raised if the provider callable throws a... | [-0.06448649, -0.027017353, 0.0700168, 0.04274... |
| 3 | ../../src/openaivec/_di.py | openaivec._di | ProviderError | class | Exception raised when a provider function fail... | How can I debug a ProviderError? | Check the inner exception and the error messag... | [-0.066516794, -0.025948327, 0.03345204, 0.011... |
| 4 | ../../src/openaivec/_di.py | openaivec._di | Provider | variable | Type alias for a callable that takes no argume... | What is a Provider in this context? | A callable that, when invoked, creates and ret... | [-0.04972546, -0.061839428, 0.03576001, 0.0449... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 325 | ../../src/openaivec/task/nlp/dependency_parsin... | openaivec.task.nlp.dependency_parsing | DependencyParsing | class | Models the full dependency parsing output for ... | What information does 'dependencies' hold? | It holds a list of DependencyRelation instance... | [-0.023963861, 0.019676488, 0.0939815, -0.0201... |
| 326 | ../../src/openaivec/task/nlp/dependency_parsin... | openaivec.task.nlp.dependency_parsing | DependencyParsing | class | Models the full dependency parsing output for ... | How is 'root_word' defined? | It identifies the root word of the sentence's ... | [-0.009704481, 0.011894519, 0.08572626, 0.0383... |
| 327 | ../../src/openaivec/task/nlp/dependency_parsin... | openaivec.task.nlp.dependency_parsing | DependencyParsing | class | Models the full dependency parsing output for ... | What type of data is 'syntactic_structure'? | A string representing a tree-like structure th... | [0.015374741, 0.029959805, 0.019000068, -0.040... |
| 328 | ../../src/openaivec/task/nlp/dependency_parsin... | openaivec.task.nlp.dependency_parsing | DEPENDENCY_PARSING | variable | A pre-configured PreparedTask instance for per... | What is PreparedTask? | It is a utility class (imported) designed to e... | [-0.012449127, -0.009066647, 0.009980651, 0.00... |
| 329 | ../../src/openaivec/task/nlp/dependency_parsin... | openaivec.task.nlp.dependency_parsing | DEPENDENCY_PARSING | variable | A pre-configured PreparedTask instance for per... | Why are temperature and top_p set to 0.0 and 1.0? | These settings make the output deterministic t... | [0.0037462874, -0.017248916, 0.008083123, -0.0... |
330 rows × 8 columns
In [ ]:
Copied!