Source code for autogen_ext.models.openai._openai_client
import asyncio
import inspect
import json
import logging
import math
import re
import warnings
from asyncio import Task
from typing import (
    Any,
    AsyncGenerator,
    Dict,
    List,
    Mapping,
    Optional,
    Sequence,
    Set,
    Type,
    Union,
    cast,
)

import tiktoken
from autogen_core import (
    EVENT_LOGGER_NAME,
    TRACE_LOGGER_NAME,
    CancellationToken,
    Component,
    FunctionCall,
    Image,
    MessageHandlerContext,
)
from autogen_core.logging import LLMCallEvent
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    ChatCompletionTokenLogprob,
    CreateResult,
    FinishReasons,
    FunctionExecutionResultMessage,
    LLMMessage,
    ModelCapabilities,  # type: ignore
    ModelFamily,
    ModelInfo,
    RequestUsage,
    SystemMessage,
    TopLogprob,
    UserMessage,
)
from autogen_core.tools import Tool, ToolSchema
from openai import AsyncAzureOpenAI, AsyncOpenAI
from openai.types.chat import (
    ChatCompletion,
    ChatCompletionAssistantMessageParam,
    ChatCompletionContentPartImageParam,
    ChatCompletionContentPartParam,
    ChatCompletionContentPartTextParam,
    ChatCompletionMessageParam,
    ChatCompletionMessageToolCallParam,
    ChatCompletionRole,
    ChatCompletionSystemMessageParam,
    ChatCompletionToolMessageParam,
    ChatCompletionToolParam,
    ChatCompletionUserMessageParam,
    ParsedChatCompletion,
    ParsedChoice,
    completion_create_params,
)
from openai.types.chat.chat_completion import Choice
from openai.types.chat.chat_completion_chunk import Choice as ChunkChoice
from openai.types.shared_params import FunctionDefinition, FunctionParameters
from pydantic import BaseModel
from typing_extensions import Self, Unpack

from . import _model_info
from .config import (
    AzureOpenAIClientConfiguration,
    AzureOpenAIClientConfigurationConfigModel,
    OpenAIClientConfiguration,
    OpenAIClientConfigurationConfigModel,
)

logger = logging.getLogger(EVENT_LOGGER_NAME)
trace_logger = logging.getLogger(TRACE_LOGGER_NAME)

openai_init_kwargs = set(inspect.getfullargspec(AsyncOpenAI.__init__).kwonlyargs)
aopenai_init_kwargs = set(inspect.getfullargspec(AsyncAzureOpenAI.__init__).kwonlyargs)

create_kwargs = set(completion_create_params.CompletionCreateParamsBase.__annotations__.keys()) | set(
    ("timeout", "stream")
)
# Only single choice allowed
disallowed_create_args = set(["stream", "messages", "function_call", "functions", "n"])
required_create_args: Set[str] = set(["model"])


def _azure_openai_client_from_config(config: Mapping[str, Any]) -> AsyncAzureOpenAI:
    # Take a copy
    copied_config = dict(config).copy()

    # Shave down the config to just the AzureOpenAIChatCompletionClient kwargs
    azure_config = {k: v for k, v in copied_config.items() if k in aopenai_init_kwargs}
    return AsyncAzureOpenAI(**azure_config)


def _openai_client_from_config(config: Mapping[str, Any]) -> AsyncOpenAI:
    # Shave down the config to just the OpenAI kwargs
    openai_config = {k: v for k, v in config.items() if k in openai_init_kwargs}
    return AsyncOpenAI(**openai_config)


def _create_args_from_config(config: Mapping[str, Any]) -> Dict[str, Any]:
    create_args = {k: v for k, v in config.items() if k in create_kwargs}
    create_args_keys = set(create_args.keys())
    if not required_create_args.issubset(create_args_keys):
        raise ValueError(f"Required create args are missing: {required_create_args - create_args_keys}")
    if disallowed_create_args.intersection(create_args_keys):
        raise ValueError(f"Disallowed create args are present: {disallowed_create_args.intersection(create_args_keys)}")
    return create_args


# TODO check types
# oai_system_message_schema = type2schema(ChatCompletionSystemMessageParam)
# oai_user_message_schema = type2schema(ChatCompletionUserMessageParam)
# oai_assistant_message_schema = type2schema(ChatCompletionAssistantMessageParam)
# oai_tool_message_schema = type2schema(ChatCompletionToolMessageParam)
def type_to_role(message: LLMMessage) -> ChatCompletionRole:
    if isinstance(message, SystemMessage):
        return "system"
    elif isinstance(message, UserMessage):
        return "user"
    elif isinstance(message, AssistantMessage):
        return "assistant"
    else:
        return "tool"


def user_message_to_oai(message: UserMessage) -> ChatCompletionUserMessageParam:
    assert_valid_name(message.source)
    if isinstance(message.content, str):
        return ChatCompletionUserMessageParam(
            content=message.content,
            role="user",
            name=message.source,
        )
    else:
        parts: List[ChatCompletionContentPartParam] = []
        for part in message.content:
            if isinstance(part, str):
                oai_part = ChatCompletionContentPartTextParam(
                    text=part,
                    type="text",
                )
                parts.append(oai_part)
            elif isinstance(part, Image):
                # TODO: support url based images
                # TODO: support specifying details
                parts.append(cast(ChatCompletionContentPartImageParam, part.to_openai_format()))
            else:
                raise ValueError(f"Unknown content type: {part}")
        return ChatCompletionUserMessageParam(
            content=parts,
            role="user",
            name=message.source,
        )


def system_message_to_oai(message: SystemMessage) -> ChatCompletionSystemMessageParam:
    return ChatCompletionSystemMessageParam(
        content=message.content,
        role="system",
    )


def func_call_to_oai(message: FunctionCall) -> ChatCompletionMessageToolCallParam:
    return ChatCompletionMessageToolCallParam(
        id=message.id,
        function={
            "arguments": message.arguments,
            "name": message.name,
        },
        type="function",
    )


def tool_message_to_oai(
    message: FunctionExecutionResultMessage,
) -> Sequence[ChatCompletionToolMessageParam]:
    return [
        ChatCompletionToolMessageParam(content=x.content, role="tool", tool_call_id=x.call_id) for x in message.content
    ]


def assistant_message_to_oai(
    message: AssistantMessage,
) -> ChatCompletionAssistantMessageParam:
    assert_valid_name(message.source)
    if isinstance(message.content, list):
        return ChatCompletionAssistantMessageParam(
            tool_calls=[func_call_to_oai(x) for x in message.content],
            role="assistant",
            name=message.source,
        )
    else:
        return ChatCompletionAssistantMessageParam(
            content=message.content,
            role="assistant",
            name=message.source,
        )


def to_oai_type(message: LLMMessage) -> Sequence[ChatCompletionMessageParam]:
    if isinstance(message, SystemMessage):
        return [system_message_to_oai(message)]
    elif isinstance(message, UserMessage):
        return [user_message_to_oai(message)]
    elif isinstance(message, AssistantMessage):
        return [assistant_message_to_oai(message)]
    else:
        return tool_message_to_oai(message)


def calculate_vision_tokens(image: Image, detail: str = "auto") -> int:
    MAX_LONG_EDGE = 2048
    BASE_TOKEN_COUNT = 85
    TOKENS_PER_TILE = 170
    MAX_SHORT_EDGE = 768
    TILE_SIZE = 512

    if detail == "low":
        return BASE_TOKEN_COUNT

    width, height = image.image.size

    # Scale down to fit within a MAX_LONG_EDGE x MAX_LONG_EDGE square if necessary
    if width > MAX_LONG_EDGE or height > MAX_LONG_EDGE:
        aspect_ratio = width / height
        if aspect_ratio > 1:
            # Width is greater than height
            width = MAX_LONG_EDGE
            height = int(MAX_LONG_EDGE / aspect_ratio)
        else:
            # Height is greater than or equal to width
            height = MAX_LONG_EDGE
            width = int(MAX_LONG_EDGE * aspect_ratio)

    # Resize such that the shortest side is MAX_SHORT_EDGE if both dimensions exceed MAX_SHORT_EDGE
    aspect_ratio = width / height
    if width > MAX_SHORT_EDGE and height > MAX_SHORT_EDGE:
        if aspect_ratio > 1:
            # Width is greater than height
            height = MAX_SHORT_EDGE
            width = int(MAX_SHORT_EDGE * aspect_ratio)
        else:
            # Height is greater than or equal to width
            width = MAX_SHORT_EDGE
            height = int(MAX_SHORT_EDGE / aspect_ratio)

    # Calculate the number of tiles based on TILE_SIZE
    tiles_width = math.ceil(width / TILE_SIZE)
    tiles_height = math.ceil(height / TILE_SIZE)
    total_tiles = tiles_width * tiles_height
    # Calculate the total tokens based on the number of tiles and the base token count
    total_tokens = BASE_TOKEN_COUNT + TOKENS_PER_TILE * total_tiles
    return total_tokens


def _add_usage(usage1: RequestUsage, usage2: RequestUsage) -> RequestUsage:
    return RequestUsage(
        prompt_tokens=usage1.prompt_tokens + usage2.prompt_tokens,
        completion_tokens=usage1.completion_tokens + usage2.completion_tokens,
    )


def convert_tools(
    tools: Sequence[Tool | ToolSchema],
) -> List[ChatCompletionToolParam]:
    result: List[ChatCompletionToolParam] = []
    for tool in tools:
        if isinstance(tool, Tool):
            tool_schema = tool.schema
        else:
            assert isinstance(tool, dict)
            tool_schema = tool

        result.append(
            ChatCompletionToolParam(
                type="function",
                function=FunctionDefinition(
                    name=tool_schema["name"],
                    description=(tool_schema["description"] if "description" in tool_schema else ""),
                    parameters=(
                        cast(FunctionParameters, tool_schema["parameters"]) if "parameters" in tool_schema else {}
                    ),
                ),
            )
        )
    # Check if all tools have valid names.
    for tool_param in result:
        assert_valid_name(tool_param["function"]["name"])
    return result


def normalize_name(name: str) -> str:
    """
    LLMs sometimes ask for functions while ignoring their own format requirements; this function
    should be used to replace invalid characters with "_".

    Prefer _assert_valid_name for validating user configuration or input.
    """
    return re.sub(r"[^a-zA-Z0-9_-]", "_", name)[:64]


def assert_valid_name(name: str) -> str:
    """
    Ensure that configured names are valid; raises ValueError if not.

    For munging LLM responses use _normalize_name to ensure LLM-specified names don't break the API.
    """
    if not re.match(r"^[a-zA-Z0-9_-]+$", name):
        raise ValueError(f"Invalid name: {name}. Only letters, numbers, '_' and '-' are allowed.")
    if len(name) > 64:
        raise ValueError(f"Invalid name: {name}. Name must be less than 64 characters.")
    return name


def normalize_stop_reason(stop_reason: str | None) -> FinishReasons:
    if stop_reason is None:
        return "unknown"

    # Convert to lower case
    stop_reason = stop_reason.lower()

    KNOWN_STOP_MAPPINGS: Dict[str, FinishReasons] = {
        "end_turn": "stop",
        "tool_calls": "function_calls",
    }

    return KNOWN_STOP_MAPPINGS.get(stop_reason, "unknown")
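
# Worked example for calculate_vision_tokens above (a sketch; the 1024x1024 input size is a
# made-up example): at detail="auto" both sides exceed MAX_SHORT_EDGE, so the image is scaled
# to 768x768, which is covered by ceil(768 / 512) * ceil(768 / 512) = 2 * 2 = 4 tiles, giving
# an estimate of 85 + 170 * 4 = 765 tokens. With detail="low" the estimate is always
# BASE_TOKEN_COUNT (85) tokens, regardless of image size.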
class BaseOpenAIChatCompletionClient(ChatCompletionClient):
    def __init__(
        self,
        client: Union[AsyncOpenAI, AsyncAzureOpenAI],
        *,
        create_args: Dict[str, Any],
        model_capabilities: Optional[ModelCapabilities] = None,  # type: ignore
        model_info: Optional[ModelInfo] = None,
    ):
        self._client = client
        if model_capabilities is None and model_info is None:
            try:
                self._model_info = _model_info.get_info(create_args["model"])
            except KeyError as err:
                raise ValueError("model_info is required when model name is not a valid OpenAI model") from err
        elif model_capabilities is not None and model_info is not None:
            raise ValueError("model_capabilities and model_info are mutually exclusive")
        elif model_capabilities is not None and model_info is None:
            warnings.warn("model_capabilities is deprecated, use model_info instead", DeprecationWarning, stacklevel=2)
            info = cast(ModelInfo, model_capabilities)
            info["family"] = ModelFamily.UNKNOWN
            self._model_info = info
        elif model_capabilities is None and model_info is not None:
            self._model_info = model_info

        self._resolved_model: Optional[str] = None
        if "model" in create_args:
            self._resolved_model = _model_info.resolve_model(create_args["model"])

        if (
            "response_format" in create_args
            and create_args["response_format"]["type"] == "json_object"
            and not self._model_info["json_output"]
        ):
            raise ValueError("Model does not support JSON output")

        self._create_args = create_args
        self._total_usage = RequestUsage(prompt_tokens=0, completion_tokens=0)
        self._actual_usage = RequestUsage(prompt_tokens=0, completion_tokens=0)
    async def create(
        self,
        messages: Sequence[LLMMessage],
        *,
        tools: Sequence[Tool | ToolSchema] = [],
        json_output: Optional[bool] = None,
        extra_create_args: Mapping[str, Any] = {},
        cancellation_token: Optional[CancellationToken] = None,
    ) -> CreateResult:
        # Make sure all extra_create_args are valid
        extra_create_args_keys = set(extra_create_args.keys())
        if not create_kwargs.issuperset(extra_create_args_keys):
            raise ValueError(f"Extra create args are invalid: {extra_create_args_keys - create_kwargs}")

        # Copy the create args and overwrite anything in extra_create_args
        create_args = self._create_args.copy()
        create_args.update(extra_create_args)

        # Declare use_beta_client
        use_beta_client: bool = False
        response_format_value: Optional[Type[BaseModel]] = None

        if "response_format" in create_args:
            value = create_args["response_format"]
            # If value is a Pydantic model class, use the beta client
            if isinstance(value, type) and issubclass(value, BaseModel):
                response_format_value = value
                use_beta_client = True
            else:
                # response_format_value is not a Pydantic model class
                use_beta_client = False
                response_format_value = None

        # Remove 'response_format' from create_args to prevent passing it twice
        create_args_no_response_format = {k: v for k, v in create_args.items() if k != "response_format"}

        # TODO: allow custom handling.
        # For now we raise an error if images are present and vision is not supported
        if self.model_info["vision"] is False:
            for message in messages:
                if isinstance(message, UserMessage):
                    if isinstance(message.content, list) and any(isinstance(x, Image) for x in message.content):
                        raise ValueError("Model does not support vision and image was provided")

        if json_output is not None:
            if self.model_info["json_output"] is False and json_output is True:
                raise ValueError("Model does not support JSON output")

            if json_output is True:
                create_args["response_format"] = {"type": "json_object"}
            else:
                create_args["response_format"] = {"type": "text"}

        if self.model_info["json_output"] is False and json_output is True:
            raise ValueError("Model does not support JSON output")

        oai_messages_nested = [to_oai_type(m) for m in messages]
        oai_messages = [item for sublist in oai_messages_nested for item in sublist]

        if self.model_info["function_calling"] is False and len(tools) > 0:
            raise ValueError("Model does not support function calling")

        future: Union[Task[ParsedChatCompletion[BaseModel]], Task[ChatCompletion]]
        if len(tools) > 0:
            converted_tools = convert_tools(tools)
            if use_beta_client:
                # Pass response_format_value if it's not None
                if response_format_value is not None:
                    future = asyncio.ensure_future(
                        self._client.beta.chat.completions.parse(
                            messages=oai_messages,
                            tools=converted_tools,
                            response_format=response_format_value,
                            **create_args_no_response_format,
                        )
                    )
                else:
                    future = asyncio.ensure_future(
                        self._client.beta.chat.completions.parse(
                            messages=oai_messages,
                            tools=converted_tools,
                            **create_args_no_response_format,
                        )
                    )
            else:
                future = asyncio.ensure_future(
                    self._client.chat.completions.create(
                        messages=oai_messages,
                        stream=False,
                        tools=converted_tools,
                        **create_args,
                    )
                )
        else:
            if use_beta_client:
                if response_format_value is not None:
                    future = asyncio.ensure_future(
                        self._client.beta.chat.completions.parse(
                            messages=oai_messages,
                            response_format=response_format_value,
                            **create_args_no_response_format,
                        )
                    )
                else:
                    future = asyncio.ensure_future(
                        self._client.beta.chat.completions.parse(
                            messages=oai_messages,
                            **create_args_no_response_format,
                        )
                    )
            else:
                future = asyncio.ensure_future(
                    self._client.chat.completions.create(
                        messages=oai_messages,
                        stream=False,
                        **create_args,
                    )
                )

        if cancellation_token is not None:
            cancellation_token.link_future(future)
        result: Union[ParsedChatCompletion[BaseModel], ChatCompletion] = await future
        if use_beta_client:
            result = cast(ParsedChatCompletion[Any], result)

        usage = RequestUsage(
            # TODO backup token counting
            prompt_tokens=result.usage.prompt_tokens if result.usage is not None else 0,
            completion_tokens=(result.usage.completion_tokens if result.usage is not None else 0),
        )

        # If we are running in the context of a handler we can get the agent_id
        try:
            agent_id = MessageHandlerContext.agent_id()
        except RuntimeError:
            agent_id = None

        logger.info(
            LLMCallEvent(
                messages=cast(Dict[str, Any], oai_messages),
                response=result.model_dump(),
                prompt_tokens=usage.prompt_tokens,
                completion_tokens=usage.completion_tokens,
                agent_id=agent_id,
            )
        )

        if self._resolved_model is not None:
            if self._resolved_model != result.model:
                warnings.warn(
                    f"Resolved model mismatch: {self._resolved_model} != {result.model}. Model mapping may be incorrect.",
                    stacklevel=2,
                )

        # Limited to a single choice currently.
        choice: Union[ParsedChoice[Any], ParsedChoice[BaseModel], Choice] = result.choices[0]
        if choice.finish_reason == "function_call":
            raise ValueError("Function calls are not supported in this context")

        content: Union[str, List[FunctionCall]]
        if choice.finish_reason == "tool_calls":
            assert choice.message.tool_calls is not None
            assert choice.message.function_call is None

            # NOTE: If OAI response type changes, this will need to be updated
            content = [
                FunctionCall(
                    id=x.id,
                    arguments=x.function.arguments,
                    name=normalize_name(x.function.name),
                )
                for x in choice.message.tool_calls
            ]
            finish_reason = "function_calls"
        else:
            finish_reason = choice.finish_reason
            content = choice.message.content or ""

        logprobs: Optional[List[ChatCompletionTokenLogprob]] = None
        if choice.logprobs and choice.logprobs.content:
            logprobs = [
                ChatCompletionTokenLogprob(
                    token=x.token,
                    logprob=x.logprob,
                    top_logprobs=[TopLogprob(logprob=y.logprob, bytes=y.bytes) for y in x.top_logprobs],
                    bytes=x.bytes,
                )
                for x in choice.logprobs.content
            ]

        response = CreateResult(
            finish_reason=finish_reason,  # type: ignore
            content=content,
            usage=usage,
            cached=False,
            logprobs=logprobs,
        )

        self._total_usage = _add_usage(self._total_usage, usage)
        self._actual_usage = _add_usage(self._actual_usage, usage)

        # TODO - why is this cast needed?
        return response
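
    # A minimal sketch of the structured-output path above: passing a Pydantic model class as
    # `response_format` routes the call through the beta `parse()` client. `client` and
    # `AgentResponse` are made-up example names, not part of this module.
    #
    #     class AgentResponse(BaseModel):
    #         thoughts: str
    #         answer: str
    #
    #     result = await client.create(
    #         [UserMessage(content="What is 2 + 2?", source="user")],
    #         extra_create_args={"response_format": AgentResponse},
    #     )
    #     # result.content is a JSON string conforming to the AgentResponse schema.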
    async def create_stream(
        self,
        messages: Sequence[LLMMessage],
        *,
        tools: Sequence[Tool | ToolSchema] = [],
        json_output: Optional[bool] = None,
        extra_create_args: Mapping[str, Any] = {},
        cancellation_token: Optional[CancellationToken] = None,
        max_consecutive_empty_chunk_tolerance: int = 0,
    ) -> AsyncGenerator[Union[str, CreateResult], None]:
        """
        Creates an AsyncGenerator that will yield a stream of chat completions based on the provided messages and tools.

        Args:
            messages (Sequence[LLMMessage]): A sequence of messages to be processed.
            tools (Sequence[Tool | ToolSchema], optional): A sequence of tools to be used in the completion. Defaults to `[]`.
            json_output (Optional[bool], optional): If True, the output will be in JSON format. Defaults to None.
            extra_create_args (Mapping[str, Any], optional): Additional arguments for the creation process. Defaults to `{}`.
            cancellation_token (Optional[CancellationToken], optional): A token to cancel the operation. Defaults to None.
            max_consecutive_empty_chunk_tolerance (int): The maximum number of consecutive empty chunks to tolerate before raising a ValueError. This seems to only need to be set when using `AzureOpenAIChatCompletionClient`. Defaults to 0.

        Yields:
            AsyncGenerator[Union[str, CreateResult], None]: A generator yielding the completion results as they are produced.

        In streaming, the default behaviour is to not return token usage counts.
        See: [OpenAI API reference for possible args](https://platform.openai.com/docs/api-reference/chat/create).
        However, `extra_create_args={"stream_options": {"include_usage": True}}` will (if supported by the accessed API) return a final chunk with usage set to a RequestUsage object
        having prompt and completion token counts; all preceding chunks will have usage as None.
        See: [stream_options](https://platform.openai.com/docs/api-reference/chat/create#chat-create-stream_options).
        A usage sketch is shown after the list below.

        Other examples of OpenAI-supported arguments that can be included in `extra_create_args`:
            - `temperature` (float): Controls the randomness of the output. Higher values (e.g., 0.8) make the output more random, while lower values (e.g., 0.2) make it more focused and deterministic.
            - `max_tokens` (int): The maximum number of tokens to generate in the completion.
            - `top_p` (float): An alternative to sampling with temperature, called nucleus sampling, where the model considers the results of the tokens with top_p probability mass.
            - `frequency_penalty` (float): A value between -2.0 and 2.0 that penalizes new tokens based on their existing frequency in the text so far, decreasing the likelihood of repeated phrases.
            - `presence_penalty` (float): A value between -2.0 and 2.0 that penalizes new tokens based on whether they appear in the text so far, encouraging the model to talk about new topics.
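
        A minimal usage sketch, assuming an already-constructed client instance named `client`:

        .. code-block:: python

            from autogen_core.models import UserMessage

            stream = client.create_stream(
                [UserMessage(content="Tell me a joke.", source="user")],
                extra_create_args={"stream_options": {"include_usage": True}},
            )
            async for chunk in stream:
                if isinstance(chunk, str):
                    print(chunk, end="")  # partial text delta
                else:
                    print(chunk.usage)  # final CreateResult with prompt/completion token counts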
"""# Make sure all extra_create_args are validextra_create_args_keys=set(extra_create_args.keys())ifnotcreate_kwargs.issuperset(extra_create_args_keys):raiseValueError(f"Extra create args are invalid: {extra_create_args_keys-create_kwargs}")# Copy the create args and overwrite anything in extra_create_argscreate_args=self._create_args.copy()create_args.update(extra_create_args)oai_messages_nested=[to_oai_type(m)forminmessages]oai_messages=[itemforsublistinoai_messages_nestedforiteminsublist]# TODO: allow custom handling.# For now we raise an error if images are present and vision is not supportedifself.model_info["vision"]isFalse:formessageinmessages:ifisinstance(message,UserMessage):ifisinstance(message.content,list)andany(isinstance(x,Image)forxinmessage.content):raiseValueError("Model does not support vision and image was provided")ifjson_outputisnotNone:ifself.model_info["json_output"]isFalseandjson_outputisTrue:raiseValueError("Model does not support JSON output")ifjson_outputisTrue:create_args["response_format"]={"type":"json_object"}else:create_args["response_format"]={"type":"text"}iflen(tools)>0:converted_tools=convert_tools(tools)stream_future=asyncio.ensure_future(self._client.chat.completions.create(messages=oai_messages,stream=True,tools=converted_tools,**create_args,))else:stream_future=asyncio.ensure_future(self._client.chat.completions.create(messages=oai_messages,stream=True,**create_args))ifcancellation_tokenisnotNone:cancellation_token.link_future(stream_future)stream=awaitstream_futurechoice:Union[ParsedChoice[Any],ParsedChoice[BaseModel],ChunkChoice]=cast(ChunkChoice,None)chunk=Nonestop_reason=Nonemaybe_model=Nonecontent_deltas:List[str]=[]full_tool_calls:Dict[int,FunctionCall]={}completion_tokens=0logprobs:Optional[List[ChatCompletionTokenLogprob]]=Noneempty_chunk_count=0whileTrue:try:chunk_future=asyncio.ensure_future(anext(stream))ifcancellation_tokenisnotNone:cancellation_token.link_future(chunk_future)chunk=awaitchunk_future# This is to address a bug in AzureOpenAIChatCompletionClient. OpenAIChatCompletionClient works fine.# https://github.com/microsoft/autogen/issues/4213iflen(chunk.choices)==0:empty_chunk_count+=1ifmax_consecutive_empty_chunk_tolerance==0:raiseValueError("Consecutive empty chunks found. 
                    elif empty_chunk_count >= max_consecutive_empty_chunk_tolerance:
                        raise ValueError("Exceeded the threshold of receiving consecutive empty chunks")
                    continue
                else:
                    empty_chunk_count = 0

                # To process a usage chunk in streaming situations,
                # add stream_options={"include_usage": True} in the initialization of OpenAIChatCompletionClient(...).
                # However, the APIs differ:
                # The OpenAI API usage chunk produces no choices, so we need to check if there is a choice.
                # The liteLLM API usage chunk does produce choices.
                choice = (
                    chunk.choices[0]
                    if len(chunk.choices) > 0
                    else choice
                    if chunk.usage is not None and stop_reason is not None
                    else cast(ChunkChoice, None)
                )

                # For liteLLM chunk usage, do the following hack keeping the previous chunk.stop_reason (if set):
                # set the stop_reason for the usage chunk to the prior stop_reason.
                stop_reason = choice.finish_reason if chunk.usage is None and stop_reason is None else stop_reason
                maybe_model = chunk.model
                # First try to get content
                if choice.delta.content is not None:
                    content_deltas.append(choice.delta.content)
                    if len(choice.delta.content) > 0:
                        yield choice.delta.content
                    continue

                # Otherwise, get tool calls
                if choice.delta.tool_calls is not None:
                    for tool_call_chunk in choice.delta.tool_calls:
                        idx = tool_call_chunk.index
                        if idx not in full_tool_calls:
                            # We ignore the type hint here because we want to fill in type when the delta provides it
                            full_tool_calls[idx] = FunctionCall(id="", arguments="", name="")

                        if tool_call_chunk.id is not None:
                            full_tool_calls[idx].id += tool_call_chunk.id

                        if tool_call_chunk.function is not None:
                            if tool_call_chunk.function.name is not None:
                                full_tool_calls[idx].name += tool_call_chunk.function.name
                            if tool_call_chunk.function.arguments is not None:
                                full_tool_calls[idx].arguments += tool_call_chunk.function.arguments
                if choice.logprobs and choice.logprobs.content:
                    logprobs = [
                        ChatCompletionTokenLogprob(
                            token=x.token,
                            logprob=x.logprob,
                            top_logprobs=[TopLogprob(logprob=y.logprob, bytes=y.bytes) for y in x.top_logprobs],
                            bytes=x.bytes,
                        )
                        for x in choice.logprobs.content
                    ]

            except StopAsyncIteration:
                break

        model = maybe_model or create_args["model"]
        model = model.replace("gpt-35", "gpt-3.5")  # hack for Azure API

        if chunk and chunk.usage:
            prompt_tokens = chunk.usage.prompt_tokens
        else:
            prompt_tokens = 0

        if stop_reason == "function_call":
            raise ValueError("Function calls are not supported in this context")

        content: Union[str, List[FunctionCall]]
        if len(content_deltas) > 1:
            content = "".join(content_deltas)
            if chunk and chunk.usage:
                completion_tokens = chunk.usage.completion_tokens
            else:
                completion_tokens = 0
        else:
            completion_tokens = 0
            # TODO: fix assumption that dict values were added in order and actually order by int index
            # for tool_call in full_tool_calls.values():
            #     # value = json.dumps(tool_call)
            #     # completion_tokens += count_token(value, model=model)
            #     completion_tokens += 0
            content = list(full_tool_calls.values())

        usage = RequestUsage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
        )

        result = CreateResult(
            finish_reason=normalize_stop_reason(stop_reason),
            content=content,
            usage=usage,
            cached=False,
            logprobs=logprobs,
        )

        self._total_usage = _add_usage(self._total_usage, usage)
        self._actual_usage = _add_usage(self._actual_usage, usage)

        yield result
    def count_tokens(self, messages: Sequence[LLMMessage], *, tools: Sequence[Tool | ToolSchema] = []) -> int:
        model = self._create_args["model"]
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            trace_logger.warning(f"Model {model} not found. Using cl100k_base encoding.")
            encoding = tiktoken.get_encoding("cl100k_base")
        tokens_per_message = 3
        tokens_per_name = 1
        num_tokens = 0

        # Message tokens.
        for message in messages:
            num_tokens += tokens_per_message
            oai_message = to_oai_type(message)
            for oai_message_part in oai_message:
                for key, value in oai_message_part.items():
                    if value is None:
                        continue

                    if isinstance(message, UserMessage) and isinstance(value, list):
                        typed_message_value = cast(List[ChatCompletionContentPartParam], value)

                        assert len(typed_message_value) == len(
                            message.content
                        ), "Mismatch in message content and typed message value"

                        # We need image properties that are only in the original message
                        for part, content_part in zip(typed_message_value, message.content, strict=False):
                            if isinstance(content_part, Image):
                                # TODO: add detail parameter
                                num_tokens += calculate_vision_tokens(content_part)
                            elif isinstance(part, str):
                                num_tokens += len(encoding.encode(part))
                            else:
                                try:
                                    serialized_part = json.dumps(part)
                                    num_tokens += len(encoding.encode(serialized_part))
                                except TypeError:
                                    trace_logger.warning(f"Could not convert {part} to string, skipping.")
                    else:
                        if not isinstance(value, str):
                            try:
                                value = json.dumps(value)
                            except TypeError:
                                trace_logger.warning(f"Could not convert {value} to string, skipping.")
                                continue
                        num_tokens += len(encoding.encode(value))
                    if key == "name":
                        num_tokens += tokens_per_name
        num_tokens += 3  # every reply is primed with <|start|>assistant<|message|>

        # Tool tokens.
        oai_tools = convert_tools(tools)
        for tool in oai_tools:
            function = tool["function"]
            tool_tokens = len(encoding.encode(function["name"]))
            if "description" in function:
                tool_tokens += len(encoding.encode(function["description"]))
            tool_tokens -= 2
            if "parameters" in function:
                parameters = function["parameters"]
                if "properties" in parameters:
                    assert isinstance(parameters["properties"], dict)
                    for propertiesKey in parameters["properties"]:  # pyright: ignore
                        assert isinstance(propertiesKey, str)
                        tool_tokens += len(encoding.encode(propertiesKey))
                        v = parameters["properties"][propertiesKey]  # pyright: ignore
                        for field in v:  # pyright: ignore
                            if field == "type":
                                tool_tokens += 2
                                tool_tokens += len(encoding.encode(v["type"]))  # pyright: ignore
                            elif field == "description":
                                tool_tokens += 2
                                tool_tokens += len(encoding.encode(v["description"]))  # pyright: ignore
                            elif field == "enum":
                                tool_tokens -= 3
                                for o in v["enum"]:  # pyright: ignore
                                    tool_tokens += 3
                                    tool_tokens += len(encoding.encode(o))  # pyright: ignore
                            else:
                                trace_logger.warning(f"Not supported field {field}")
                    tool_tokens += 11
                    if len(parameters["properties"]) == 0:  # pyright: ignore
                        tool_tokens -= 2
            num_tokens += tool_tokens
        num_tokens += 12
        return num_tokens
    @property
    def capabilities(self) -> ModelCapabilities:  # type: ignore
        warnings.warn("capabilities is deprecated, use model_info instead", DeprecationWarning, stacklevel=2)
        return self._model_info

    @property
    def model_info(self) -> ModelInfo:
        return self._model_info
class OpenAIChatCompletionClient(BaseOpenAIChatCompletionClient, Component[OpenAIClientConfigurationConfigModel]):
    """Chat completion client for OpenAI hosted models.

    You can also use this client for OpenAI-compatible ChatCompletion endpoints.
    **Using this client for non-OpenAI models is not tested or guaranteed.**

    For non-OpenAI models, please first take a look at our `community extensions <https://microsoft.github.io/autogen/dev/user-guide/extensions-user-guide/index.html>`_
    for additional model clients.

    Args:
        model (str): Which OpenAI model to use.
        api_key (optional, str): The API key to use. **Required if 'OPENAI_API_KEY' is not found in the environment variables.**
        organization (optional, str): The organization ID to use.
        base_url (optional, str): The base URL to use. **Required if the model is not hosted on OpenAI.**
        timeout (optional, float): The timeout for the request in seconds.
        max_retries (optional, int): The maximum number of retries to attempt.
        model_info (optional, ModelInfo): The capabilities of the model. **Required if the model name is not a valid OpenAI model.**
        frequency_penalty (optional, float):
        logit_bias (optional, dict[str, int]):
        max_tokens (optional, int):
        n (optional, int):
        presence_penalty (optional, float):
        response_format (optional, literal["json_object", "text"]):
        seed (optional, int):
        stop (optional, str | List[str]):
        temperature (optional, float):
        top_p (optional, float):
        user (optional, str):

    To use this client, you must install the `openai` extension:

    .. code-block:: bash

        pip install "autogen-ext[openai]"

    The following code snippet shows how to use the client with an OpenAI model:

    .. code-block:: python

        from autogen_ext.models.openai import OpenAIChatCompletionClient
        from autogen_core.models import UserMessage

        openai_client = OpenAIChatCompletionClient(
            model="gpt-4o-2024-08-06",
            # api_key="sk-...", # Optional if you have an OPENAI_API_KEY environment variable set.
        )

        result = await openai_client.create([UserMessage(content="What is the capital of France?", source="user")])  # type: ignore
        print(result)

    To use the client with a non-OpenAI model, you need to provide the base URL of the model and the model capabilities:

    .. code-block:: python

        from autogen_ext.models.openai import OpenAIChatCompletionClient

        custom_model_client = OpenAIChatCompletionClient(
            model="custom-model-name",
            base_url="https://custom-model.com/reset/of/the/path",
            api_key="placeholder",
            model_capabilities={
                "vision": True,
                "function_calling": True,
                "json_output": True,
            },
        )

    To load the client from a configuration, you can use the `load_component` method:

    .. code-block:: python

        from autogen_core.models import ChatCompletionClient

        config = {
            "provider": "OpenAIChatCompletionClient",
            "config": {"model": "gpt-4o", "api_key": "REPLACE_WITH_YOUR_API_KEY"},
        }

        client = ChatCompletionClient.load_component(config)

    To view the full list of available configuration options, see the :py:class:`OpenAIClientConfigurationConfigModel` class.
"""component_type="model"component_config_schema=OpenAIClientConfigurationConfigModelcomponent_provider_override="autogen_ext.models.openai.OpenAIChatCompletionClient"def__init__(self,**kwargs:Unpack[OpenAIClientConfiguration]):if"model"notinkwargs:raiseValueError("model is required for OpenAIChatCompletionClient")model_capabilities:Optional[ModelCapabilities]=None# type: ignorecopied_args=dict(kwargs).copy()if"model_capabilities"inkwargs:model_capabilities=kwargs["model_capabilities"]delcopied_args["model_capabilities"]model_info:Optional[ModelInfo]=Noneif"model_info"inkwargs:model_info=kwargs["model_info"]delcopied_args["model_info"]client=_openai_client_from_config(copied_args)create_args=_create_args_from_config(copied_args)self._raw_config:Dict[str,Any]=copied_argssuper().__init__(client=client,create_args=create_args,model_capabilities=model_capabilities,model_info=model_info)def__getstate__(self)->Dict[str,Any]:state=self.__dict__.copy()state["_client"]=Nonereturnstatedef__setstate__(self,state:Dict[str,Any])->None:self.__dict__.update(state)self._client=_openai_client_from_config(state["_raw_config"])
class AzureOpenAIChatCompletionClient(
    BaseOpenAIChatCompletionClient, Component[AzureOpenAIClientConfigurationConfigModel]
):
    """Chat completion client for Azure OpenAI hosted models.

    Args:
        model (str): Which OpenAI model to use.
        azure_endpoint (str): The endpoint for the Azure model. **Required for Azure models.**
        azure_deployment (str): Deployment name for the Azure model. **Required for Azure models.**
        api_version (str): The API version to use. **Required for Azure models.**
        azure_ad_token (str): The Azure AD token to use. Provide this or `azure_ad_token_provider` for token-based authentication.
        azure_ad_token_provider (optional, Callable[[], Awaitable[str]] | AzureTokenProvider): The Azure AD token provider to use. Provide this or `azure_ad_token` for token-based authentication.
        api_key (optional, str): The API key to use; use this if you are using key-based authentication. It is optional if you are using Azure AD token-based authentication or the `AZURE_OPENAI_API_KEY` environment variable.
        timeout (optional, float): The timeout for the request in seconds.
        max_retries (optional, int): The maximum number of retries to attempt.
        model_info (optional, ModelInfo): The capabilities of the model. **Required if the model name is not a valid OpenAI model.**
        frequency_penalty (optional, float):
        logit_bias (optional, dict[str, int]):
        max_tokens (optional, int):
        n (optional, int):
        presence_penalty (optional, float):
        response_format (optional, literal["json_object", "text"]):
        seed (optional, int):
        stop (optional, str | List[str]):
        temperature (optional, float):
        top_p (optional, float):
        user (optional, str):

    To use this client, you must install the `azure` and `openai` extensions:

    .. code-block:: bash

        pip install "autogen-ext[openai,azure]"

    To use the client, you need to provide your deployment id, Azure Cognitive Services endpoint, api version, and model capabilities.
    For authentication, you can either provide an API key or an Azure Active Directory (AAD) token credential.

    The following code snippet shows how to use AAD authentication.
    The identity used must be assigned the `Cognitive Services OpenAI User <https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/role-based-access-control#cognitive-services-openai-user>`_ role.

    .. code-block:: python

        from autogen_ext.models.openai import AzureOpenAIChatCompletionClient
        from azure.identity import DefaultAzureCredential, get_bearer_token_provider

        # Create the token provider
        token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default")

        az_model_client = AzureOpenAIChatCompletionClient(
            azure_deployment="{your-azure-deployment}",
            model="{deployed-model, such as 'gpt-4o'}",
            api_version="2024-06-01",
            azure_endpoint="https://{your-custom-endpoint}.openai.azure.com/",
            azure_ad_token_provider=token_provider,  # Optional if you choose key-based authentication.
            # api_key="sk-...", # For key-based authentication. `AZURE_OPENAI_API_KEY` environment variable can also be used instead.
        )

    To load the client that uses identity-based auth from a configuration, you can use the `load_component` method:
    .. code-block:: python

        from autogen_core.models import ChatCompletionClient

        config = {
            "provider": "AzureOpenAIChatCompletionClient",
            "config": {
                "model": "gpt-4o-2024-05-13",
                "azure_endpoint": "https://{your-custom-endpoint}.openai.azure.com/",
                "azure_deployment": "{your-azure-deployment}",
                "api_version": "2024-06-01",
                "azure_ad_token_provider": {
                    "provider": "autogen_ext.auth.azure.AzureTokenProvider",
                    "config": {
                        "provider_kind": "DefaultAzureCredential",
                        "scopes": ["https://cognitiveservices.azure.com/.default"],
                    },
                },
            },
        }

        client = ChatCompletionClient.load_component(config)

    To view the full list of available configuration options, see the :py:class:`AzureOpenAIClientConfigurationConfigModel` class.

    .. note::

        Right now only `DefaultAzureCredential` is supported with no additional args passed to it.
        See `here <https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/managed-identity#chat-completions>`_ for how to use the Azure client directly or for more info.

    """

    component_type = "model"
    component_config_schema = AzureOpenAIClientConfigurationConfigModel
    component_provider_override = "autogen_ext.models.openai.AzureOpenAIChatCompletionClient"

    def __init__(self, **kwargs: Unpack[AzureOpenAIClientConfiguration]):
        model_capabilities: Optional[ModelCapabilities] = None  # type: ignore
        copied_args = dict(kwargs).copy()
        if "model_capabilities" in kwargs:
            model_capabilities = kwargs["model_capabilities"]
            del copied_args["model_capabilities"]

        model_info: Optional[ModelInfo] = None
        if "model_info" in kwargs:
            model_info = kwargs["model_info"]
            del copied_args["model_info"]

        client = _azure_openai_client_from_config(copied_args)
        create_args = _create_args_from_config(copied_args)
        self._raw_config: Dict[str, Any] = copied_args
        super().__init__(
            client=client, create_args=create_args, model_capabilities=model_capabilities, model_info=model_info
        )

    def __getstate__(self) -> Dict[str, Any]:
        state = self.__dict__.copy()
        state["_client"] = None
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._client = _azure_openai_client_from_config(state["_raw_config"])
    def _to_config(self) -> AzureOpenAIClientConfigurationConfigModel:
        from ...auth.azure import AzureTokenProvider

        copied_config = self._raw_config.copy()
        if "azure_ad_token_provider" in copied_config:
            if not isinstance(copied_config["azure_ad_token_provider"], AzureTokenProvider):
                raise ValueError("azure_ad_token_provider must be a AzureTokenProvider to be component serialized")

            copied_config["azure_ad_token_provider"] = (
                copied_config["azure_ad_token_provider"].dump_component().model_dump(exclude_none=True)
            )

        return AzureOpenAIClientConfigurationConfigModel(**copied_config)