[docs]classRedisStoreConfig(BaseModel):"""Configuration for RedisStore"""host:str="localhost"port:int=6379db:int=0# Add other relevant redis connection parametersusername:Optional[str]=Nonepassword:Optional[str]=Nonessl:bool=Falsesocket_timeout:Optional[float]=None
[docs]classRedisStore(CacheStore[T],Component[RedisStoreConfig]):""" A typed CacheStore implementation that uses redis as the underlying storage. See :class:`~autogen_ext.models.cache.ChatCompletionCache` for an example of usage. Args: cache_instance: An instance of `redis.Redis`. The user is responsible for managing the Redis instance's lifetime. """component_config_schema=RedisStoreConfigcomponent_provider_override="autogen_ext.cache_store.redis.RedisStore"def__init__(self,redis_instance:redis.Redis):self.cache=redis_instance
[docs]def_to_config(self)->RedisStoreConfig:# Extract connection info from redis instanceconnection_pool=self.cache.connection_poolconnection_kwargs:Dict[str,Any]=connection_pool.connection_kwargs# type: ignore[reportUnknownMemberType]username=connection_kwargs.get("username")password=connection_kwargs.get("password")socket_timeout=connection_kwargs.get("socket_timeout")returnRedisStoreConfig(host=str(connection_kwargs.get("host","localhost")),port=int(connection_kwargs.get("port",6379)),db=int(connection_kwargs.get("db",0)),username=str(username)ifusernameisnotNoneelseNone,password=str(password)ifpasswordisnotNoneelseNone,ssl=bool(connection_kwargs.get("ssl",False)),socket_timeout=float(socket_timeout)ifsocket_timeoutisnotNoneelseNone,)
[docs]@classmethoddef_from_config(cls,config:RedisStoreConfig)->Self:# Create new redis instance from configredis_instance=redis.Redis(host=config.host,port=config.port,db=config.db,username=config.username,password=config.password,ssl=config.ssl,socket_timeout=config.socket_timeout,)returncls(redis_instance=redis_instance)