Source code for autogen_ext.cache_store.redis

from typing import Any, Dict, Optional, TypeVar, cast

import redis
from autogen_core import CacheStore, Component
from pydantic import BaseModel
from typing_extensions import Self

T = TypeVar("T")


class RedisStoreConfig(BaseModel):
    """Configuration for RedisStore"""

    # Location of the Redis server and the index of the logical database to use.
    host: str = "localhost"
    port: int = 6379
    db: int = 0

    # Optional credentials; None means the value is not supplied.
    username: Optional[str] = None
    password: Optional[str] = None

    # Whether the connection should be made over SSL/TLS.
    ssl: bool = False

    # Optional socket timeout for the connection; None means the value is not supplied.
    socket_timeout: Optional[float] = None
class RedisStore(CacheStore[T], Component[RedisStoreConfig]):
    """
    A typed CacheStore implementation that uses redis as the underlying storage.
    See :class:`~autogen_ext.models.cache.ChatCompletionCache` for an example of usage.

    Args:
        redis_instance: An instance of `redis.Redis`.
                        The user is responsible for managing the Redis instance's lifetime.
    """

    component_config_schema = RedisStoreConfig
    component_provider_override = "autogen_ext.cache_store.redis.RedisStore"

    def __init__(self, redis_instance: redis.Redis):
        self.cache = redis_instance

    def get(self, key: str, default: Optional[T] = None) -> Optional[T]:
        """Return the value stored under ``key``, or ``default`` if Redis has no value for it.

        The value is handed back exactly as the Redis client returned it; no
        deserialization is performed here.
        """
        value = cast(Optional[T], self.cache.get(key))
        if value is None:
            return default
        return value

    def set(self, key: str, value: T) -> None:
        """Store ``value`` under ``key``, passing it to the Redis client unchanged."""
        self.cache.set(key, cast(Any, value))

    def _to_config(self) -> RedisStoreConfig:
        """Build a :class:`RedisStoreConfig` from the wrapped client's connection pool.

        NOTE: the password, when present, is copied into the config as plain text.
        """
        # Extract connection info from redis instance
        connection_pool = self.cache.connection_pool
        connection_kwargs: Dict[str, Any] = connection_pool.connection_kwargs  # type: ignore[reportUnknownMemberType]

        username = connection_kwargs.get("username")
        password = connection_kwargs.get("password")
        socket_timeout = connection_kwargs.get("socket_timeout")

        # redis-py does not keep an "ssl" entry in connection_kwargs: a client
        # created with ssl=True (or from a rediss:// URL) is marked by the pool
        # using SSLConnection as its connection class. Check both so the flag
        # survives a _to_config/_from_config round trip.
        connection_class = getattr(connection_pool, "connection_class", None)
        uses_ssl_connection = isinstance(connection_class, type) and issubclass(
            connection_class, redis.SSLConnection
        )
        ssl_enabled = bool(connection_kwargs.get("ssl", False)) or uses_ssl_connection

        return RedisStoreConfig(
            host=str(connection_kwargs.get("host", "localhost")),
            port=int(connection_kwargs.get("port", 6379)),
            db=int(connection_kwargs.get("db", 0)),
            username=str(username) if username is not None else None,
            password=str(password) if password is not None else None,
            ssl=ssl_enabled,
            socket_timeout=float(socket_timeout) if socket_timeout is not None else None,
        )

    @classmethod
    def _from_config(cls, config: RedisStoreConfig) -> Self:
        """Create a store backed by a new ``redis.Redis`` client built from ``config``."""
        # Create new redis instance from config
        redis_instance = redis.Redis(
            host=config.host,
            port=config.port,
            db=config.db,
            username=config.username,
            password=config.password,
            ssl=config.ssl,
            socket_timeout=config.socket_timeout,
        )
        return cls(redis_instance=redis_instance)