Module tinytroupe.clients.openai_client
Expand source code
import configparser
import logging
import os
import pickle
import threading
import time
from contextlib import contextmanager
from typing import Union
import httpx
import openai
import tiktoken
from openai import APITimeoutError, AzureOpenAI, OpenAI
from tinytroupe import config_manager, utils
from tinytroupe.control import transactional
logger = logging.getLogger("tinytroupe")
# We'll use various configuration elements below
config = utils.read_config_file()
###########################################################################
# Client class
###########################################################################
class OpenAIClient:
"""
A utility class for interacting with the OpenAI API.
"""
@config_manager.config_defaults(
cache_api_calls="cache_api_calls",
cache_file_name="cache_file_name",
max_concurrent_model_calls="max_concurrent_model_calls",
)
def __init__(
self,
cache_api_calls=None,
cache_file_name=None,
max_concurrent_model_calls=None,
) -> None:
logger.debug("Initializing OpenAIClient")
self._cache_lock = threading.RLock()
self._max_concurrent_model_calls = self._normalize_concurrency_limit(
max_concurrent_model_calls
)
self._concurrency_semaphore = (
threading.BoundedSemaphore(self._max_concurrent_model_calls)
if self._max_concurrent_model_calls is not None
else None
)
# Initialize cost tracking variables
self._cost_stats_lock = threading.RLock()
self._reset_cost_stats()
self.set_api_cache(cache_api_calls, cache_file_name)
@staticmethod
def _normalize_concurrency_limit(value):
if value is None:
return None
try:
candidate = int(value)
except (TypeError, ValueError):
logger.warning(
"Invalid concurrency limit '%s'. Concurrency protection disabled.",
value,
)
return None
if candidate <= 0:
return None
return candidate
@config_manager.config_defaults(cache_file_name="cache_file_name")
def set_api_cache(self, cache_api_calls, cache_file_name=None):
"""
Enables or disables the caching of API calls.
Args:
cache_file_name (str): The name of the file to use for caching API calls.
"""
self.cache_api_calls = cache_api_calls
self.cache_file_name = cache_file_name
if self.cache_api_calls:
# load the cache, if any
self.api_cache = self._load_cache()
def _reset_cost_stats(self):
"""
Resets the cost statistics to zero.
"""
with self._cost_stats_lock:
self._input_tokens = 0
self._output_tokens = 0
self._total_tokens = 0
self._model_calls = 0
self._cached_calls = 0
@config_manager.config_defaults(timeout="timeout")
def _setup_from_config(self, timeout=None):
"""
Sets up the OpenAI API configurations for this client.
"""
# On Sept./Oct. 2025 I noticed that the OpenAI library was randomly hanging during requests,
# and even the timeout was not being enforced. So after nearly going mad, I found out the
# strategy below to cancel problematic requests.
# Create httpx client with proper timeouts to prevent hanging
# This ensures timeouts work at ALL levels: connection, read, write, pool
httpx_client = httpx.Client(
timeout=httpx.Timeout(
timeout=timeout, # Overall timeout
connect=10.0, # Connection timeout (fixed at 10s)
read=timeout, # Read timeout (from config)
write=10.0, # Write timeout (fixed at 10s)
pool=5.0, # Pool timeout (fixed at 5s)
)
)
# we set max_retries to 0 because we do our own retrying with customized exponential backoff
self.client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"), max_retries=0, http_client=httpx_client
)
@config_manager.config_defaults(
model="model",
temperature="temperature",
max_completion_tokens="max_completion_tokens",
frequency_penalty="frequency_penalty",
presence_penalty="presence_penalty",
timeout="timeout",
max_attempts="max_attempts",
waiting_time="waiting_time",
exponential_backoff_factor="exponential_backoff_factor",
response_format=None,
echo=None,
)
def send_message(
self,
current_messages,
dedent_messages=True,
model=None,
temperature=None,
max_completion_tokens=None,
top_p=None,
frequency_penalty=None,
presence_penalty=None,
stop=None,
timeout=None,
max_attempts=None,
waiting_time=None,
exponential_backoff_factor=None,
n=1,
response_format=None,
enable_pydantic_model_return=False,
echo=False,
):
"""
Sends a message to the OpenAI API and returns the response.
Args:
current_messages (list): A list of dictionaries representing the conversation history.
dedent_messages (bool): Whether to dedent the messages before sending them to the API.
model (str): The ID of the model to use for generating the response.
temperature (float): Controls the "creativity" of the response. Higher values result in more diverse responses.
max_completion_tokens (int): The maximum number of tokens (words or punctuation marks) to generate in the response.
top_p (float): Controls the "quality" of the response. Higher values result in more coherent responses.
frequency_penalty (float): Controls the "repetition" of the response. Higher values result in less repetition.
presence_penalty (float): Controls the "diversity" of the response. Higher values result in more diverse responses.
stop (str): A string that, if encountered in the generated response, will cause the generation to stop.
max_attempts (int): The maximum number of attempts to make before giving up on generating a response.
timeout (int): The maximum number of seconds to wait for a response from the API.
waiting_time (int): The number of seconds to wait between requests.
exponential_backoff_factor (int): The factor by which to increase the waiting time between requests.
n (int): The number of completions to generate.
response_format: The format of the response, if any.
echo (bool): Whether to echo the input message in the response.
enable_pydantic_model_return (bool): Whether to enable Pydantic model return instead of dict when possible.
Returns:
A dictionary representing the generated response.
"""
from tinytroupe.clients import ( # avoid circular import
InvalidRequestError,
NonTerminalError,
)
def aux_exponential_backoff():
nonlocal waiting_time
# in case waiting time was initially set to 0
if waiting_time <= 0:
waiting_time = 2
logger.info(
f"Request failed. Waiting {waiting_time} seconds between requests..."
)
time.sleep(waiting_time)
# exponential backoff
waiting_time = waiting_time * exponential_backoff_factor
# setup the OpenAI configurations for this client.
self._setup_from_config()
# dedent the messages (field 'content' only) if needed (using textwrap)
if dedent_messages:
for message in current_messages:
if "content" in message:
message["content"] = utils.dedent(message["content"])
# We need to adapt the parameters to the API type, so we create a dictionary with them first
chat_api_params = {
"model": model,
"messages": current_messages,
"temperature": temperature,
"max_completion_tokens": max_completion_tokens,
"top_p": top_p,
"frequency_penalty": frequency_penalty,
"presence_penalty": presence_penalty,
"stop": stop,
"timeout": timeout,
"stream": False,
"n": n,
}
if response_format is not None:
chat_api_params["response_format"] = response_format
# remove any parameter that is None, so we use the API defaults
chat_api_params = {k: v for k, v in chat_api_params.items() if v is not None}
i = 0
while i < max_attempts:
try:
i += 1
try:
logger.debug(
f"Sending messages to OpenAI API. Token count={self._count_tokens(current_messages, model)}."
)
except NotImplementedError:
logger.debug(f"Token count not implemented for model {model}.")
start_time = time.monotonic()
logger.debug(
f"Calling model with client class {self.__class__.__name__}."
)
###############################################################
# call the model, either from the cache or from the API
###############################################################
cache_key = str((model, chat_api_params)) # need string to be hashable
pre_cached_response = self._get_cached_response(cache_key)
should_wait_before_call = (
waiting_time > 0 and pre_cached_response is None
)
if should_wait_before_call:
logger.info(
f"Waiting {waiting_time} seconds before next API request (to avoid throttling)..."
)
time.sleep(waiting_time)
with self._concurrency_slot():
response = None
cached_response = (
pre_cached_response
if pre_cached_response is not None
else self._get_cached_response(cache_key)
)
if cached_response is not None:
response = cached_response
else:
response = self._raw_model_call(model, chat_api_params)
if self.cache_api_calls:
with self._cache_lock:
existing = (
self.api_cache.get(cache_key)
if hasattr(self, "api_cache")
else None
)
if existing is None:
self.api_cache[cache_key] = response
self._save_cache()
else:
response = existing
raw_message = self._raw_model_response_extractor(response)
# Update cost statistics
self._update_cost_stats(response, cached_response is not None)
logger.debug(f"Got response from API: {response}")
end_time = time.monotonic()
logger.debug(
f"Got response in {end_time - start_time:.2f} seconds after {i} attempts."
)
if enable_pydantic_model_return:
return utils.to_pydantic_or_sanitized_dict(
raw_message,
model=response_format,
)
else:
return utils.sanitize_dict(raw_message)
except InvalidRequestError as e:
logger.error(f"[{i}] Invalid request error, won't retry: {e}")
# there's no point in retrying if the request is invalid
# so we return None right away
return None
except openai.BadRequestError as e:
logger.error(f"[{i}] Invalid request error, won't retry: {e}")
# there's no point in retrying if the request is invalid
# so we return None right away
return None
except openai.RateLimitError:
logger.warning(
f"[{i}] Rate limit error, waiting a bit and trying again."
)
aux_exponential_backoff()
except NonTerminalError as e:
logger.error(f"[{i}] Non-terminal error: {e}")
aux_exponential_backoff()
except APITimeoutError as e:
logger.error(f"[{i}] API Timeout error: {e}")
# no exponential timeout backoff here, just retry
except Exception as e:
logger.error(f"[{i}] {type(e).__name__} Error: {e}")
aux_exponential_backoff()
logger.error(f"Failed to get response after {max_attempts} attempts.")
return None
def _raw_model_call(self, model, chat_api_params):
"""
Calls the OpenAI API with the given parameters. Subclasses should
override this method to implement their own API calls.
"""
# adjust parameters depending on the model
if self._is_reasoning_model(model):
# Reasoning models have slightly different parameters
del chat_api_params["stream"]
del chat_api_params["temperature"]
del chat_api_params["top_p"]
del chat_api_params["frequency_penalty"]
del chat_api_params["presence_penalty"]
chat_api_params["max_completion_tokens"] = chat_api_params[
"max_completion_tokens"
]
del chat_api_params["max_completion_tokens"]
chat_api_params["reasoning_effort"] = config_manager.get("reasoning_effort")
# gpt-5 only supports temperature=1.0 (default), so remove temperature param if not default
if "gpt-5" in model and "temperature" in chat_api_params:
if chat_api_params["temperature"] != 1.0:
logger.warning(
f"gpt-5 only supports temperature=1.0, removing custom temperature={chat_api_params['temperature']}"
)
del chat_api_params["temperature"]
# To make the log cleaner, we remove the messages from the logged parameters
logged_params = {k: v for k, v in chat_api_params.items() if k != "messages"}
if "response_format" in chat_api_params:
# to enforce the response format via pydantic, we need to use a different method
if "stream" in chat_api_params:
del chat_api_params["stream"]
logger.debug(
f"Calling LLM model (using .parse too) with these parameters: {logged_params}. Not showing 'messages' parameter."
)
# complete message
logger.debug(
f" --> Complete messages sent to LLM: {chat_api_params['messages']}"
)
result_message = self.client.beta.chat.completions.parse(**chat_api_params)
return result_message
else:
logger.debug(
f"Calling LLM model with these parameters: {logged_params}. Not showing 'messages' parameter."
)
return self.client.chat.completions.create(**chat_api_params)
def _is_reasoning_model(self, model):
return "o1" in model or "o3" in model
def _raw_model_response_extractor(self, response):
"""
Extracts the response from the API response. Subclasses should
override this method to implement their own response extraction.
"""
return response.choices[0].message.to_dict()
def _get_cached_response(self, cache_key):
if not self.cache_api_calls:
return None
cache_store = getattr(self, "api_cache", None)
if cache_store is None:
return None
with self._cache_lock:
return cache_store.get(cache_key)
@contextmanager
def _concurrency_slot(self):
if self._concurrency_semaphore is None:
yield
return
self._concurrency_semaphore.acquire()
try:
yield
finally:
self._concurrency_semaphore.release()
def _count_tokens(self, messages: list, model: str):
"""
Count the number of OpenAI tokens in a list of messages using tiktoken.
Adapted from https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb
Args:
messages (list): A list of dictionaries representing the conversation history.
model (str): The name of the model to use for encoding the string.
"""
try:
try:
encoding = tiktoken.encoding_for_model(model)
except KeyError:
logger.debug(
"Token count: model not found. Using cl100k_base encoding."
)
encoding = tiktoken.get_encoding("cl100k_base")
if (
model
in {
"gpt-3.5-turbo-0613",
"gpt-3.5-turbo-16k-0613",
"gpt-4-0314",
"gpt-4-32k-0314",
"gpt-4-0613",
"gpt-4-32k-0613",
}
or "o1" in model
or "o3" in model
): # assuming o1/3 models work the same way
tokens_per_message = 3
tokens_per_name = 1
elif model == "gpt-3.5-turbo-0301":
tokens_per_message = (
4 # every message follows <|start|>{role/name}\n{content}<|end|>\n
)
tokens_per_name = -1 # if there's a name, the role is omitted
elif "gpt-3.5-turbo" in model:
logger.debug(
"Token count: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613."
)
return self._count_tokens(messages, model="gpt-3.5-turbo-0613")
elif ("gpt-4" in model) or ("ppo" in model):
logger.debug(
"Token count: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613."
)
return self._count_tokens(messages, model="gpt-4-0613")
elif "gpt-5" in model:
logger.debug(
"Token count: no info on GPT-5 tokenizer yet, so we are just reusing GPT-4's."
)
return self._count_tokens(messages, model="gpt-4-0613")
else:
raise NotImplementedError(
f"""_count_tokens() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens."""
)
num_tokens = 0
for message in messages:
num_tokens += tokens_per_message
for key, value in message.items():
num_tokens += len(encoding.encode(value))
if key == "name":
num_tokens += tokens_per_name
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
return num_tokens
except Exception as e:
logger.error(f"Error counting tokens: {e}")
return None
def _save_cache(self):
"""
Saves the API cache to disk. We use pickle to do that because some obj
are not JSON serializable.
"""
# use pickle to save the cache
with open(self.cache_file_name, "wb") as f:
pickle.dump(self.api_cache, f)
def _load_cache(self):
"""
Loads the API cache from disk.
"""
if os.path.exists(self.cache_file_name):
try:
with open(self.cache_file_name, "rb") as f:
return pickle.load(f)
except (EOFError, pickle.UnpicklingError) as e:
logger.warning(f"Cache file exists but could not be loaded: {e}. Starting with empty cache.")
return {}
return {}
@config_manager.config_defaults(model="embedding_model")
def get_embedding(self, text, model=None):
"""
Gets the embedding of the given text using the specified model.
Args:
text (str): The text to embed.
model (str): The name of the model to use for embedding the text.
Returns:
The embedding of the text.
"""
response = self._raw_embedding_model_call(text, model)
return self._raw_embedding_model_response_extractor(response)
def _raw_embedding_model_call(self, text, model):
"""
Calls the OpenAI API to get the embedding of the given text. Subclasses should
override this method to implement their own API calls.
"""
return self.client.embeddings.create(input=[text], model=model)
def _raw_embedding_model_response_extractor(self, response):
"""
Extracts the embedding from the API response. Subclasses should
override this method to implement their own response extraction.
"""
return response.data[0].embedding
def _update_cost_stats(self, response, was_cached):
"""
Updates the cost statistics based on the API response.
Args:
response: The response object from the API.
was_cached (bool): Whether this response came from cache.
"""
with self._cost_stats_lock:
if was_cached:
self._cached_calls += 1
else:
self._model_calls += 1
# Extract token usage from response if available
if hasattr(response, "usage"):
usage = response.usage
if hasattr(usage, "prompt_tokens"):
self._input_tokens += usage.prompt_tokens
if hasattr(usage, "completion_tokens"):
self._output_tokens += usage.completion_tokens
if hasattr(usage, "total_tokens"):
self._total_tokens += usage.total_tokens
# Log the latest values in debug mode
logger.debug(
f"Cost stats updated - Input tokens: {usage.prompt_tokens if hasattr(usage, 'prompt_tokens') else 0}, "
f"Output tokens: {usage.completion_tokens if hasattr(usage, 'completion_tokens') else 0}, "
f"Total tokens: {usage.total_tokens if hasattr(usage, 'total_tokens') else 0}, "
f"Cached: {was_cached}"
)
def get_cost_stats(self):
"""
Returns the current cost statistics.
Returns:
dict: A dictionary containing cost statistics with keys:
- input_tokens: Number of input/prompt tokens used
- output_tokens: Number of output/completion tokens used
- total_tokens: Total number of tokens used
- model_calls: Number of actual API calls made
- cached_calls: Number of calls served from cache
"""
with self._cost_stats_lock:
return {
"input_tokens": self._input_tokens,
"output_tokens": self._output_tokens,
"total_tokens": self._total_tokens,
"model_calls": self._model_calls,
"cached_calls": self._cached_calls,
}
def pretty_print_cost_stats(self):
"""
Pretty prints the cost statistics to the console.
"""
stats = self.get_cost_stats()
print("\n" + "=" * 60)
print("LLM API COST STATISTICS")
print("=" * 60)
print(f"Input tokens: {stats['input_tokens']:,}")
print(f"Output tokens: {stats['output_tokens']:,}")
print(f"Total tokens: {stats['total_tokens']:,}")
print(f"Model API calls: {stats['model_calls']:,}")
print(f"Cached calls: {stats['cached_calls']:,}")
print(f"Total calls: {stats['model_calls'] + stats['cached_calls']:,}")
if stats["model_calls"] > 0:
print(
f"Avg tokens per call: {stats['total_tokens'] / stats['model_calls']:.1f}"
)
print("=" * 60 + "\n")
def reset_cost_stats(self):
"""
Resets the cost statistics. This is the public method that users should call.
"""
self._reset_cost_stats()
logger.info("Cost statistics have been reset.")
Classes
class OpenAIClient (cache_api_calls=None, cache_file_name=None, max_concurrent_model_calls=None)-
A utility class for interacting with the OpenAI API.
Expand source code
class OpenAIClient: """ A utility class for interacting with the OpenAI API. """ @config_manager.config_defaults( cache_api_calls="cache_api_calls", cache_file_name="cache_file_name", max_concurrent_model_calls="max_concurrent_model_calls", ) def __init__( self, cache_api_calls=None, cache_file_name=None, max_concurrent_model_calls=None, ) -> None: logger.debug("Initializing OpenAIClient") self._cache_lock = threading.RLock() self._max_concurrent_model_calls = self._normalize_concurrency_limit( max_concurrent_model_calls ) self._concurrency_semaphore = ( threading.BoundedSemaphore(self._max_concurrent_model_calls) if self._max_concurrent_model_calls is not None else None ) # Initialize cost tracking variables self._cost_stats_lock = threading.RLock() self._reset_cost_stats() self.set_api_cache(cache_api_calls, cache_file_name) @staticmethod def _normalize_concurrency_limit(value): if value is None: return None try: candidate = int(value) except (TypeError, ValueError): logger.warning( "Invalid concurrency limit '%s'. Concurrency protection disabled.", value, ) return None if candidate <= 0: return None return candidate @config_manager.config_defaults(cache_file_name="cache_file_name") def set_api_cache(self, cache_api_calls, cache_file_name=None): """ Enables or disables the caching of API calls. Args: cache_file_name (str): The name of the file to use for caching API calls. """ self.cache_api_calls = cache_api_calls self.cache_file_name = cache_file_name if self.cache_api_calls: # load the cache, if any self.api_cache = self._load_cache() def _reset_cost_stats(self): """ Resets the cost statistics to zero. """ with self._cost_stats_lock: self._input_tokens = 0 self._output_tokens = 0 self._total_tokens = 0 self._model_calls = 0 self._cached_calls = 0 @config_manager.config_defaults(timeout="timeout") def _setup_from_config(self, timeout=None): """ Sets up the OpenAI API configurations for this client. """ # On Sept./Oct. 2025 I noticed that the OpenAI library was randomly hanging during requests, # and even the timeout was not being enforced. So after nearly going mad, I found out the # strategy below to cancel problematic requests. # Create httpx client with proper timeouts to prevent hanging # This ensures timeouts work at ALL levels: connection, read, write, pool httpx_client = httpx.Client( timeout=httpx.Timeout( timeout=timeout, # Overall timeout connect=10.0, # Connection timeout (fixed at 10s) read=timeout, # Read timeout (from config) write=10.0, # Write timeout (fixed at 10s) pool=5.0, # Pool timeout (fixed at 5s) ) ) # we set max_retries to 0 because we do our own retrying with customized exponential backoff self.client = OpenAI( api_key=os.getenv("OPENAI_API_KEY"), max_retries=0, http_client=httpx_client ) @config_manager.config_defaults( model="model", temperature="temperature", max_completion_tokens="max_completion_tokens", frequency_penalty="frequency_penalty", presence_penalty="presence_penalty", timeout="timeout", max_attempts="max_attempts", waiting_time="waiting_time", exponential_backoff_factor="exponential_backoff_factor", response_format=None, echo=None, ) def send_message( self, current_messages, dedent_messages=True, model=None, temperature=None, max_completion_tokens=None, top_p=None, frequency_penalty=None, presence_penalty=None, stop=None, timeout=None, max_attempts=None, waiting_time=None, exponential_backoff_factor=None, n=1, response_format=None, enable_pydantic_model_return=False, echo=False, ): """ Sends a message to the OpenAI API and returns the response. Args: current_messages (list): A list of dictionaries representing the conversation history. dedent_messages (bool): Whether to dedent the messages before sending them to the API. model (str): The ID of the model to use for generating the response. temperature (float): Controls the "creativity" of the response. Higher values result in more diverse responses. max_completion_tokens (int): The maximum number of tokens (words or punctuation marks) to generate in the response. top_p (float): Controls the "quality" of the response. Higher values result in more coherent responses. frequency_penalty (float): Controls the "repetition" of the response. Higher values result in less repetition. presence_penalty (float): Controls the "diversity" of the response. Higher values result in more diverse responses. stop (str): A string that, if encountered in the generated response, will cause the generation to stop. max_attempts (int): The maximum number of attempts to make before giving up on generating a response. timeout (int): The maximum number of seconds to wait for a response from the API. waiting_time (int): The number of seconds to wait between requests. exponential_backoff_factor (int): The factor by which to increase the waiting time between requests. n (int): The number of completions to generate. response_format: The format of the response, if any. echo (bool): Whether to echo the input message in the response. enable_pydantic_model_return (bool): Whether to enable Pydantic model return instead of dict when possible. Returns: A dictionary representing the generated response. """ from tinytroupe.clients import ( # avoid circular import InvalidRequestError, NonTerminalError, ) def aux_exponential_backoff(): nonlocal waiting_time # in case waiting time was initially set to 0 if waiting_time <= 0: waiting_time = 2 logger.info( f"Request failed. Waiting {waiting_time} seconds between requests..." ) time.sleep(waiting_time) # exponential backoff waiting_time = waiting_time * exponential_backoff_factor # setup the OpenAI configurations for this client. self._setup_from_config() # dedent the messages (field 'content' only) if needed (using textwrap) if dedent_messages: for message in current_messages: if "content" in message: message["content"] = utils.dedent(message["content"]) # We need to adapt the parameters to the API type, so we create a dictionary with them first chat_api_params = { "model": model, "messages": current_messages, "temperature": temperature, "max_completion_tokens": max_completion_tokens, "top_p": top_p, "frequency_penalty": frequency_penalty, "presence_penalty": presence_penalty, "stop": stop, "timeout": timeout, "stream": False, "n": n, } if response_format is not None: chat_api_params["response_format"] = response_format # remove any parameter that is None, so we use the API defaults chat_api_params = {k: v for k, v in chat_api_params.items() if v is not None} i = 0 while i < max_attempts: try: i += 1 try: logger.debug( f"Sending messages to OpenAI API. Token count={self._count_tokens(current_messages, model)}." ) except NotImplementedError: logger.debug(f"Token count not implemented for model {model}.") start_time = time.monotonic() logger.debug( f"Calling model with client class {self.__class__.__name__}." ) ############################################################### # call the model, either from the cache or from the API ############################################################### cache_key = str((model, chat_api_params)) # need string to be hashable pre_cached_response = self._get_cached_response(cache_key) should_wait_before_call = ( waiting_time > 0 and pre_cached_response is None ) if should_wait_before_call: logger.info( f"Waiting {waiting_time} seconds before next API request (to avoid throttling)..." ) time.sleep(waiting_time) with self._concurrency_slot(): response = None cached_response = ( pre_cached_response if pre_cached_response is not None else self._get_cached_response(cache_key) ) if cached_response is not None: response = cached_response else: response = self._raw_model_call(model, chat_api_params) if self.cache_api_calls: with self._cache_lock: existing = ( self.api_cache.get(cache_key) if hasattr(self, "api_cache") else None ) if existing is None: self.api_cache[cache_key] = response self._save_cache() else: response = existing raw_message = self._raw_model_response_extractor(response) # Update cost statistics self._update_cost_stats(response, cached_response is not None) logger.debug(f"Got response from API: {response}") end_time = time.monotonic() logger.debug( f"Got response in {end_time - start_time:.2f} seconds after {i} attempts." ) if enable_pydantic_model_return: return utils.to_pydantic_or_sanitized_dict( raw_message, model=response_format, ) else: return utils.sanitize_dict(raw_message) except InvalidRequestError as e: logger.error(f"[{i}] Invalid request error, won't retry: {e}") # there's no point in retrying if the request is invalid # so we return None right away return None except openai.BadRequestError as e: logger.error(f"[{i}] Invalid request error, won't retry: {e}") # there's no point in retrying if the request is invalid # so we return None right away return None except openai.RateLimitError: logger.warning( f"[{i}] Rate limit error, waiting a bit and trying again." ) aux_exponential_backoff() except NonTerminalError as e: logger.error(f"[{i}] Non-terminal error: {e}") aux_exponential_backoff() except APITimeoutError as e: logger.error(f"[{i}] API Timeout error: {e}") # no exponential timeout backoff here, just retry except Exception as e: logger.error(f"[{i}] {type(e).__name__} Error: {e}") aux_exponential_backoff() logger.error(f"Failed to get response after {max_attempts} attempts.") return None def _raw_model_call(self, model, chat_api_params): """ Calls the OpenAI API with the given parameters. Subclasses should override this method to implement their own API calls. """ # adjust parameters depending on the model if self._is_reasoning_model(model): # Reasoning models have slightly different parameters del chat_api_params["stream"] del chat_api_params["temperature"] del chat_api_params["top_p"] del chat_api_params["frequency_penalty"] del chat_api_params["presence_penalty"] chat_api_params["max_completion_tokens"] = chat_api_params[ "max_completion_tokens" ] del chat_api_params["max_completion_tokens"] chat_api_params["reasoning_effort"] = config_manager.get("reasoning_effort") # gpt-5 only supports temperature=1.0 (default), so remove temperature param if not default if "gpt-5" in model and "temperature" in chat_api_params: if chat_api_params["temperature"] != 1.0: logger.warning( f"gpt-5 only supports temperature=1.0, removing custom temperature={chat_api_params['temperature']}" ) del chat_api_params["temperature"] # To make the log cleaner, we remove the messages from the logged parameters logged_params = {k: v for k, v in chat_api_params.items() if k != "messages"} if "response_format" in chat_api_params: # to enforce the response format via pydantic, we need to use a different method if "stream" in chat_api_params: del chat_api_params["stream"] logger.debug( f"Calling LLM model (using .parse too) with these parameters: {logged_params}. Not showing 'messages' parameter." ) # complete message logger.debug( f" --> Complete messages sent to LLM: {chat_api_params['messages']}" ) result_message = self.client.beta.chat.completions.parse(**chat_api_params) return result_message else: logger.debug( f"Calling LLM model with these parameters: {logged_params}. Not showing 'messages' parameter." ) return self.client.chat.completions.create(**chat_api_params) def _is_reasoning_model(self, model): return "o1" in model or "o3" in model def _raw_model_response_extractor(self, response): """ Extracts the response from the API response. Subclasses should override this method to implement their own response extraction. """ return response.choices[0].message.to_dict() def _get_cached_response(self, cache_key): if not self.cache_api_calls: return None cache_store = getattr(self, "api_cache", None) if cache_store is None: return None with self._cache_lock: return cache_store.get(cache_key) @contextmanager def _concurrency_slot(self): if self._concurrency_semaphore is None: yield return self._concurrency_semaphore.acquire() try: yield finally: self._concurrency_semaphore.release() def _count_tokens(self, messages: list, model: str): """ Count the number of OpenAI tokens in a list of messages using tiktoken. Adapted from https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb Args: messages (list): A list of dictionaries representing the conversation history. model (str): The name of the model to use for encoding the string. """ try: try: encoding = tiktoken.encoding_for_model(model) except KeyError: logger.debug( "Token count: model not found. Using cl100k_base encoding." ) encoding = tiktoken.get_encoding("cl100k_base") if ( model in { "gpt-3.5-turbo-0613", "gpt-3.5-turbo-16k-0613", "gpt-4-0314", "gpt-4-32k-0314", "gpt-4-0613", "gpt-4-32k-0613", } or "o1" in model or "o3" in model ): # assuming o1/3 models work the same way tokens_per_message = 3 tokens_per_name = 1 elif model == "gpt-3.5-turbo-0301": tokens_per_message = ( 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n ) tokens_per_name = -1 # if there's a name, the role is omitted elif "gpt-3.5-turbo" in model: logger.debug( "Token count: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613." ) return self._count_tokens(messages, model="gpt-3.5-turbo-0613") elif ("gpt-4" in model) or ("ppo" in model): logger.debug( "Token count: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613." ) return self._count_tokens(messages, model="gpt-4-0613") elif "gpt-5" in model: logger.debug( "Token count: no info on GPT-5 tokenizer yet, so we are just reusing GPT-4's." ) return self._count_tokens(messages, model="gpt-4-0613") else: raise NotImplementedError( f"""_count_tokens() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens.""" ) num_tokens = 0 for message in messages: num_tokens += tokens_per_message for key, value in message.items(): num_tokens += len(encoding.encode(value)) if key == "name": num_tokens += tokens_per_name num_tokens += 3 # every reply is primed with <|start|>assistant<|message|> return num_tokens except Exception as e: logger.error(f"Error counting tokens: {e}") return None def _save_cache(self): """ Saves the API cache to disk. We use pickle to do that because some obj are not JSON serializable. """ # use pickle to save the cache with open(self.cache_file_name, "wb") as f: pickle.dump(self.api_cache, f) def _load_cache(self): """ Loads the API cache from disk. """ if os.path.exists(self.cache_file_name): try: with open(self.cache_file_name, "rb") as f: return pickle.load(f) except (EOFError, pickle.UnpicklingError) as e: logger.warning(f"Cache file exists but could not be loaded: {e}. Starting with empty cache.") return {} return {} @config_manager.config_defaults(model="embedding_model") def get_embedding(self, text, model=None): """ Gets the embedding of the given text using the specified model. Args: text (str): The text to embed. model (str): The name of the model to use for embedding the text. Returns: The embedding of the text. """ response = self._raw_embedding_model_call(text, model) return self._raw_embedding_model_response_extractor(response) def _raw_embedding_model_call(self, text, model): """ Calls the OpenAI API to get the embedding of the given text. Subclasses should override this method to implement their own API calls. """ return self.client.embeddings.create(input=[text], model=model) def _raw_embedding_model_response_extractor(self, response): """ Extracts the embedding from the API response. Subclasses should override this method to implement their own response extraction. """ return response.data[0].embedding def _update_cost_stats(self, response, was_cached): """ Updates the cost statistics based on the API response. Args: response: The response object from the API. was_cached (bool): Whether this response came from cache. """ with self._cost_stats_lock: if was_cached: self._cached_calls += 1 else: self._model_calls += 1 # Extract token usage from response if available if hasattr(response, "usage"): usage = response.usage if hasattr(usage, "prompt_tokens"): self._input_tokens += usage.prompt_tokens if hasattr(usage, "completion_tokens"): self._output_tokens += usage.completion_tokens if hasattr(usage, "total_tokens"): self._total_tokens += usage.total_tokens # Log the latest values in debug mode logger.debug( f"Cost stats updated - Input tokens: {usage.prompt_tokens if hasattr(usage, 'prompt_tokens') else 0}, " f"Output tokens: {usage.completion_tokens if hasattr(usage, 'completion_tokens') else 0}, " f"Total tokens: {usage.total_tokens if hasattr(usage, 'total_tokens') else 0}, " f"Cached: {was_cached}" ) def get_cost_stats(self): """ Returns the current cost statistics. Returns: dict: A dictionary containing cost statistics with keys: - input_tokens: Number of input/prompt tokens used - output_tokens: Number of output/completion tokens used - total_tokens: Total number of tokens used - model_calls: Number of actual API calls made - cached_calls: Number of calls served from cache """ with self._cost_stats_lock: return { "input_tokens": self._input_tokens, "output_tokens": self._output_tokens, "total_tokens": self._total_tokens, "model_calls": self._model_calls, "cached_calls": self._cached_calls, } def pretty_print_cost_stats(self): """ Pretty prints the cost statistics to the console. """ stats = self.get_cost_stats() print("\n" + "=" * 60) print("LLM API COST STATISTICS") print("=" * 60) print(f"Input tokens: {stats['input_tokens']:,}") print(f"Output tokens: {stats['output_tokens']:,}") print(f"Total tokens: {stats['total_tokens']:,}") print(f"Model API calls: {stats['model_calls']:,}") print(f"Cached calls: {stats['cached_calls']:,}") print(f"Total calls: {stats['model_calls'] + stats['cached_calls']:,}") if stats["model_calls"] > 0: print( f"Avg tokens per call: {stats['total_tokens'] / stats['model_calls']:.1f}" ) print("=" * 60 + "\n") def reset_cost_stats(self): """ Resets the cost statistics. This is the public method that users should call. """ self._reset_cost_stats() logger.info("Cost statistics have been reset.")Subclasses
Methods
def get_cost_stats(self)-
Returns the current cost statistics.
Returns
dict- A dictionary containing cost statistics with keys: - input_tokens: Number of input/prompt tokens used - output_tokens: Number of output/completion tokens used - total_tokens: Total number of tokens used - model_calls: Number of actual API calls made - cached_calls: Number of calls served from cache
Expand source code
def get_cost_stats(self): """ Returns the current cost statistics. Returns: dict: A dictionary containing cost statistics with keys: - input_tokens: Number of input/prompt tokens used - output_tokens: Number of output/completion tokens used - total_tokens: Total number of tokens used - model_calls: Number of actual API calls made - cached_calls: Number of calls served from cache """ with self._cost_stats_lock: return { "input_tokens": self._input_tokens, "output_tokens": self._output_tokens, "total_tokens": self._total_tokens, "model_calls": self._model_calls, "cached_calls": self._cached_calls, } def get_embedding(self, text, model=None)-
Gets the embedding of the given text using the specified model.
Args: text (str): The text to embed. model (str): The name of the model to use for embedding the text.
Returns: The embedding of the text.
Expand source code
@config_manager.config_defaults(model="embedding_model") def get_embedding(self, text, model=None): """ Gets the embedding of the given text using the specified model. Args: text (str): The text to embed. model (str): The name of the model to use for embedding the text. Returns: The embedding of the text. """ response = self._raw_embedding_model_call(text, model) return self._raw_embedding_model_response_extractor(response) def pretty_print_cost_stats(self)-
Pretty prints the cost statistics to the console.
Expand source code
def pretty_print_cost_stats(self): """ Pretty prints the cost statistics to the console. """ stats = self.get_cost_stats() print("\n" + "=" * 60) print("LLM API COST STATISTICS") print("=" * 60) print(f"Input tokens: {stats['input_tokens']:,}") print(f"Output tokens: {stats['output_tokens']:,}") print(f"Total tokens: {stats['total_tokens']:,}") print(f"Model API calls: {stats['model_calls']:,}") print(f"Cached calls: {stats['cached_calls']:,}") print(f"Total calls: {stats['model_calls'] + stats['cached_calls']:,}") if stats["model_calls"] > 0: print( f"Avg tokens per call: {stats['total_tokens'] / stats['model_calls']:.1f}" ) print("=" * 60 + "\n") def reset_cost_stats(self)-
Resets the cost statistics. This is the public method that users should call.
Expand source code
def reset_cost_stats(self): """ Resets the cost statistics. This is the public method that users should call. """ self._reset_cost_stats() logger.info("Cost statistics have been reset.") def send_message(self, current_messages, dedent_messages=True, model=None, temperature=None, max_completion_tokens=None, top_p=None, frequency_penalty=None, presence_penalty=None, stop=None, timeout=None, max_attempts=None, waiting_time=None, exponential_backoff_factor=None, n=1, response_format=None, enable_pydantic_model_return=False, echo=False)-
Sends a message to the OpenAI API and returns the response.
Args: current_messages (list): A list of dictionaries representing the conversation history. dedent_messages (bool): Whether to dedent the messages before sending them to the API. model (str): The ID of the model to use for generating the response. temperature (float): Controls the "creativity" of the response. Higher values result in more diverse responses. max_completion_tokens (int): The maximum number of tokens (words or punctuation marks) to generate in the response. top_p (float): Controls the "quality" of the response. Higher values result in more coherent responses. frequency_penalty (float): Controls the "repetition" of the response. Higher values result in less repetition. presence_penalty (float): Controls the "diversity" of the response. Higher values result in more diverse responses. stop (str): A string that, if encountered in the generated response, will cause the generation to stop. max_attempts (int): The maximum number of attempts to make before giving up on generating a response. timeout (int): The maximum number of seconds to wait for a response from the API. waiting_time (int): The number of seconds to wait between requests. exponential_backoff_factor (int): The factor by which to increase the waiting time between requests. n (int): The number of completions to generate. response_format: The format of the response, if any. echo (bool): Whether to echo the input message in the response. enable_pydantic_model_return (bool): Whether to enable Pydantic model return instead of dict when possible.
Returns: A dictionary representing the generated response.
Expand source code
@config_manager.config_defaults( model="model", temperature="temperature", max_completion_tokens="max_completion_tokens", frequency_penalty="frequency_penalty", presence_penalty="presence_penalty", timeout="timeout", max_attempts="max_attempts", waiting_time="waiting_time", exponential_backoff_factor="exponential_backoff_factor", response_format=None, echo=None, ) def send_message( self, current_messages, dedent_messages=True, model=None, temperature=None, max_completion_tokens=None, top_p=None, frequency_penalty=None, presence_penalty=None, stop=None, timeout=None, max_attempts=None, waiting_time=None, exponential_backoff_factor=None, n=1, response_format=None, enable_pydantic_model_return=False, echo=False, ): """ Sends a message to the OpenAI API and returns the response. Args: current_messages (list): A list of dictionaries representing the conversation history. dedent_messages (bool): Whether to dedent the messages before sending them to the API. model (str): The ID of the model to use for generating the response. temperature (float): Controls the "creativity" of the response. Higher values result in more diverse responses. max_completion_tokens (int): The maximum number of tokens (words or punctuation marks) to generate in the response. top_p (float): Controls the "quality" of the response. Higher values result in more coherent responses. frequency_penalty (float): Controls the "repetition" of the response. Higher values result in less repetition. presence_penalty (float): Controls the "diversity" of the response. Higher values result in more diverse responses. stop (str): A string that, if encountered in the generated response, will cause the generation to stop. max_attempts (int): The maximum number of attempts to make before giving up on generating a response. timeout (int): The maximum number of seconds to wait for a response from the API. waiting_time (int): The number of seconds to wait between requests. exponential_backoff_factor (int): The factor by which to increase the waiting time between requests. n (int): The number of completions to generate. response_format: The format of the response, if any. echo (bool): Whether to echo the input message in the response. enable_pydantic_model_return (bool): Whether to enable Pydantic model return instead of dict when possible. Returns: A dictionary representing the generated response. """ from tinytroupe.clients import ( # avoid circular import InvalidRequestError, NonTerminalError, ) def aux_exponential_backoff(): nonlocal waiting_time # in case waiting time was initially set to 0 if waiting_time <= 0: waiting_time = 2 logger.info( f"Request failed. Waiting {waiting_time} seconds between requests..." ) time.sleep(waiting_time) # exponential backoff waiting_time = waiting_time * exponential_backoff_factor # setup the OpenAI configurations for this client. self._setup_from_config() # dedent the messages (field 'content' only) if needed (using textwrap) if dedent_messages: for message in current_messages: if "content" in message: message["content"] = utils.dedent(message["content"]) # We need to adapt the parameters to the API type, so we create a dictionary with them first chat_api_params = { "model": model, "messages": current_messages, "temperature": temperature, "max_completion_tokens": max_completion_tokens, "top_p": top_p, "frequency_penalty": frequency_penalty, "presence_penalty": presence_penalty, "stop": stop, "timeout": timeout, "stream": False, "n": n, } if response_format is not None: chat_api_params["response_format"] = response_format # remove any parameter that is None, so we use the API defaults chat_api_params = {k: v for k, v in chat_api_params.items() if v is not None} i = 0 while i < max_attempts: try: i += 1 try: logger.debug( f"Sending messages to OpenAI API. Token count={self._count_tokens(current_messages, model)}." ) except NotImplementedError: logger.debug(f"Token count not implemented for model {model}.") start_time = time.monotonic() logger.debug( f"Calling model with client class {self.__class__.__name__}." ) ############################################################### # call the model, either from the cache or from the API ############################################################### cache_key = str((model, chat_api_params)) # need string to be hashable pre_cached_response = self._get_cached_response(cache_key) should_wait_before_call = ( waiting_time > 0 and pre_cached_response is None ) if should_wait_before_call: logger.info( f"Waiting {waiting_time} seconds before next API request (to avoid throttling)..." ) time.sleep(waiting_time) with self._concurrency_slot(): response = None cached_response = ( pre_cached_response if pre_cached_response is not None else self._get_cached_response(cache_key) ) if cached_response is not None: response = cached_response else: response = self._raw_model_call(model, chat_api_params) if self.cache_api_calls: with self._cache_lock: existing = ( self.api_cache.get(cache_key) if hasattr(self, "api_cache") else None ) if existing is None: self.api_cache[cache_key] = response self._save_cache() else: response = existing raw_message = self._raw_model_response_extractor(response) # Update cost statistics self._update_cost_stats(response, cached_response is not None) logger.debug(f"Got response from API: {response}") end_time = time.monotonic() logger.debug( f"Got response in {end_time - start_time:.2f} seconds after {i} attempts." ) if enable_pydantic_model_return: return utils.to_pydantic_or_sanitized_dict( raw_message, model=response_format, ) else: return utils.sanitize_dict(raw_message) except InvalidRequestError as e: logger.error(f"[{i}] Invalid request error, won't retry: {e}") # there's no point in retrying if the request is invalid # so we return None right away return None except openai.BadRequestError as e: logger.error(f"[{i}] Invalid request error, won't retry: {e}") # there's no point in retrying if the request is invalid # so we return None right away return None except openai.RateLimitError: logger.warning( f"[{i}] Rate limit error, waiting a bit and trying again." ) aux_exponential_backoff() except NonTerminalError as e: logger.error(f"[{i}] Non-terminal error: {e}") aux_exponential_backoff() except APITimeoutError as e: logger.error(f"[{i}] API Timeout error: {e}") # no exponential timeout backoff here, just retry except Exception as e: logger.error(f"[{i}] {type(e).__name__} Error: {e}") aux_exponential_backoff() logger.error(f"Failed to get response after {max_attempts} attempts.") return None def set_api_cache(self, cache_api_calls, cache_file_name=None)-
Enables or disables the caching of API calls.
Args: cache_file_name (str): The name of the file to use for caching API calls.
Expand source code
@config_manager.config_defaults(cache_file_name="cache_file_name") def set_api_cache(self, cache_api_calls, cache_file_name=None): """ Enables or disables the caching of API calls. Args: cache_file_name (str): The name of the file to use for caching API calls. """ self.cache_api_calls = cache_api_calls self.cache_file_name = cache_file_name if self.cache_api_calls: # load the cache, if any self.api_cache = self._load_cache()