# Copyright 2023 The Qwen team, Alibaba Group. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Function-calling prompt that serializes tool calls as tagged plaintext.

Tool calls are wrapped in ``<tool_call>...</tool_call>``, tool results in
``<tool_response>...</tool_response>`` and the tool signatures are listed
inside ``<tools>...</tools>`` in the system prompt.
"""

import copy
import json
import os
from typing import List, Literal, Union

import json5

from qwen_agent.llm.fncall_prompts.base_fncall_prompt import BaseFnCallPrompt
from qwen_agent.llm.schema import ASSISTANT, FUNCTION, SYSTEM, USER, ContentItem, FunctionCall, Message
from qwen_agent.log import logger


class NousFnCallPrompt(BaseFnCallPrompt):
    """Convert between structured function-call messages and tagged plaintext."""

    def preprocess_fncall_messages(self,
                                   messages: List[Message],
                                   functions: List[dict],
                                   lang: Literal['en', 'zh'],
                                   parallel_function_calls: bool = True,
                                   function_choice: Union[Literal['auto'], str] = 'auto',
                                   **kwargs) -> List[Message]:
        """Render function calls and function results as plaintext messages.

        Args:
            messages: Conversation to convert. It is deep-copied, so the
                caller's messages are not modified.
            functions: Tool descriptions; each one is embedded as JSON in the
                system prompt.
            lang: Ignored.
            parallel_function_calls: Ignored.
            function_choice: Only ``'auto'`` is supported.
            **kwargs: Accepted for interface compatibility and unused here.

        Returns:
            A new message list in which assistant function calls are
            ``<tool_call>`` text, function results are ``<tool_response>``
            text inside user messages, and the first message is a system
            message carrying the tool instructions.

        Raises:
            NotImplementedError: If ``function_choice`` is not ``'auto'``.
            TypeError: If a message has an unknown role.
        """
        del lang  # ignored
        del parallel_function_calls  # ignored
        if function_choice != 'auto':
            raise NotImplementedError

        ori_messages = messages

        # Change function_call responses to plaintext responses:
        messages = []
        for msg in copy.deepcopy(ori_messages):
            role, content, reasoning_content = msg.role, msg.content, msg.reasoning_content
            if role in (SYSTEM, USER):
                messages.append(msg)
            elif role == ASSISTANT:
                content = (content or [])
                fn_call = msg.function_call
                if fn_call:
                    if (not SPECIAL_CODE_MODE) or (CODE_TOOL_PATTERN not in fn_call.name):
                        arguments = fn_call.arguments
                        try:
                            arguments = json5.loads(arguments)
                        except Exception:
                            # Best effort: keep the raw argument string if it is not valid JSON.
                            logger.warning('Invalid json tool-calling arguments')
                        fc = {'name': fn_call.name, 'arguments': arguments}
                        fc = json.dumps(fc, ensure_ascii=False)
                        fc = f'<tool_call>\n{fc}\n</tool_call>'
                    else:
                        # Code tools: leave a placeholder in the JSON and put the code in <code> tags.
                        para = json5.loads(fn_call.arguments)
                        code = para['code']
                        para['code'] = ''
                        fc = {'name': fn_call.name, 'arguments': para}
                        fc = json.dumps(fc, ensure_ascii=False)
                        fc = f'<tool_call>\n{fc}\n<code>\n{code}\n</code>\n</tool_call>'

                    content.append(ContentItem(text=fc))

                if messages and messages[-1].role == ASSISTANT:
                    # Merge consecutive assistant messages, separated by a newline.
                    if messages[-1].content and messages[-1].content[-1].text and (
                            not messages[-1].content[-1].text.endswith('\n')):
                        messages[-1].content.append(ContentItem(text='\n'))
                    messages[-1].content.extend(content)
                else:
                    # TODO: Assuming there will only be one continuous reasoning_content here
                    messages.append(Message(role=role, content=content, reasoning_content=reasoning_content))
            elif role == FUNCTION:
                assert isinstance(content, list)
                content = [ContentItem(text='<tool_response>\n')] + content + [ContentItem(text='\n</tool_response>')]
                # Function results are delivered as user turns; merge into a preceding user message.
                # The emptiness guard avoids an IndexError when a function message comes first.
                if messages and messages[-1].role == USER:
                    messages[-1].content.append(ContentItem(text='\n'))
                    messages[-1].content.extend(content)
                else:
                    messages.append(Message(role=USER, content=content))
            else:
                raise TypeError

        tool_descs = [{'type': 'function', 'function': f} for f in functions]
        tool_names = [function.get('name_for_model', function.get('name', '')) for function in functions]
        tool_descs = '\n'.join([json.dumps(f, ensure_ascii=False) for f in tool_descs])
        if SPECIAL_CODE_MODE and any(CODE_TOOL_PATTERN in x for x in tool_names):
            tool_system = FN_CALL_TEMPLATE_WITH_CI.format(tool_descs=tool_descs)
        else:
            tool_system = FN_CALL_TEMPLATE.format(tool_descs=tool_descs)
        if messages and messages[0].role == SYSTEM:
            messages[0].content.append(ContentItem(text='\n\n' + tool_system))
        else:
            messages = [Message(role=SYSTEM, content=[ContentItem(text=tool_system)])] + messages
        return messages

    def postprocess_fncall_messages(
        self,
        messages: List[Message],
        parallel_function_calls: bool = True,
        function_choice: Union[Literal['auto'], str] = 'auto',
        thought_in_content: bool = False,
    ) -> List[Message]:
        """Parse ``<tool_call>`` text back into structured function-call messages.

        Args:
            messages: Messages whose ``content`` is a list of content items.
            parallel_function_calls: Not used by this implementation.
            function_choice: Only ``'auto'`` is supported.
            thought_in_content: Whether the text may start inside a
                ``<think>`` section. It is also switched on, for the rest of
                the call, as soon as a ``<think>`` tag is seen.

        Returns:
            A new message list in which every tool call is its own assistant
            message with ``function_call`` set and ``extra['function_id']``
            numbered from 1, and the surrounding text is kept as separate
            messages.

        Raises:
            NotImplementedError: If ``function_choice`` is not ``'auto'``.
        """
        if function_choice != 'auto':
            raise NotImplementedError

        # Convert plaintext responses to function_call responses:
        new_messages = []
        tool_id = 1
        for msg in messages:
            role, content, reasoning_content, extra = msg.role, msg.content, msg.reasoning_content, msg.extra
            extra = extra or {}
            assert isinstance(content, list)

            if role in (SYSTEM, USER):
                new_messages.append(
                    Message(role=role, content=content, reasoning_content=reasoning_content, extra=extra))
                continue

            # Reasoning content is placed in a separate message
            if reasoning_content:
                new_messages.append(Message(role=role, content='', reasoning_content=reasoning_content, extra=extra))

            new_content = []
            for item in content:
                item_type, item_text = item.get_type_and_value()

                if item_type != 'text':  # multimodal
                    new_content.append(item)
                    continue

                # Do not parse <tool_call> in thought!!!
                if '<think>' in item_text:
                    thought_in_content = True
                if thought_in_content:
                    if '</think>' not in item_text:
                        # The thought is still open: keep the text verbatim.
                        new_content.append(ContentItem(text=item_text))
                        continue
                    # Keep everything up to the last </think> as-is; parse only what follows.
                    thought, closing_tag, item_text = item_text.rpartition('</think>')
                    new_content.append(ContentItem(text=thought + closing_tag))

                # If no function call:
                if '<tool_call>' not in item_text:
                    show_text = item_text
                    if show_text:
                        new_content.append(ContentItem(text=show_text))
                    continue

                # split tool-call to separate assistant msg
                tool_call_list = item_text.split('<tool_call>')
                pre_thought = tool_call_list[0]
                if pre_thought.strip():
                    new_content.append(ContentItem(text=pre_thought))
                for txt in tool_call_list[1:]:
                    if not txt.strip():
                        continue

                    if '</tool_call>' not in txt:
                        # incomplete </tool_call>: This is to better represent incomplete tool calls
                        # in streaming output
                        fn_name, fn_args = extract_fn(txt)
                        if fn_name:  # need to call function
                            if new_content:
                                # split thought and function call
                                new_messages.append(Message(role=role, content=new_content, extra=extra))
                                new_content = []
                            # TODO: process incomplete tool-call messages
                            new_messages.append(_new_fncall_message(fn_name, fn_args, extra, tool_id))
                            tool_id += 1
                        continue

                    one_tool_call_txt = txt.split('</tool_call>')

                    # The complete tool-call response
                    if new_content:
                        # split thought and function call
                        new_messages.append(Message(role=role, content=new_content, extra=extra))
                        new_content = []

                    fn = None
                    if SPECIAL_CODE_MODE and '<code>' in one_tool_call_txt[0] and '</code>' in one_tool_call_txt[0]:
                        _snips = one_tool_call_txt[0].split('<code>')
                        for idx, _s in enumerate(_snips):
                            if idx == 0:
                                fn = json5.loads(_s)
                            else:
                                # TODO: support more flexible params
                                code = _s.replace('</code>', '')
                                fn['arguments']['code'] = code
                    else:
                        try:
                            fn = json5.loads(one_tool_call_txt[0].strip())
                        except Exception:
                            # Fall back to string scanning so a malformed call is still surfaced.
                            logger.warning('Invalid json tool-calling arguments')
                            fn_name, fn_args = extract_fn(one_tool_call_txt[0].strip())
                            new_messages.append(_new_fncall_message(fn_name, fn_args, extra, tool_id))
                            tool_id += 1

                    if fn and 'name' in fn and 'arguments' in fn:
                        fn_args = json.dumps(fn['arguments'], ensure_ascii=False)
                        new_messages.append(_new_fncall_message(fn['name'], fn_args, extra, tool_id))
                        tool_id += 1
                    # Text after </tool_call> (one_tool_call_txt[1]) is dropped on purpose:
                    # the model is expected not to output extra tails.

            if new_content:
                new_messages.append(Message(role=role, content=new_content, extra=extra))
        return new_messages


def _new_fncall_message(fn_name: str, fn_args: str, extra: dict, tool_id: int) -> Message:
    """Build an assistant function-call message tagged with ``function_id``."""
    _extra = copy.deepcopy(extra) if extra else {}
    _extra['function_id'] = str(tool_id)
    return Message(
        role=ASSISTANT,
        content=[],
        function_call=FunctionCall(
            name=fn_name,
            arguments=fn_args,
        ),
        extra=_extra,
    )


# System-prompt section that lists the tools and describes the <tool_call> format.
FN_CALL_TEMPLATE = """# Tools

You may call one or more functions to assist with the user query.

You are provided with function signatures within <tools></tools> XML tags:
<tools>
{tool_descs}
</tools>

For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:
<tool_call>
{{"name": <function-name>, "arguments": <args-json-object>}}
</tool_call>"""

# When enabled via the environment, tools whose name contains CODE_TOOL_PATTERN
# pass their code in a separate <code> section instead of inside the JSON.
SPECIAL_CODE_MODE = os.getenv('SPECIAL_CODE_MODE', 'false').lower() == 'true'
CODE_TOOL_PATTERN = 'code_interpreter'
FN_CALL_TEMPLATE_WITH_CI = """# Tools

You may call one or more functions to assist with the user query.

You are provided with function signatures within <tools></tools> XML tags:
<tools>
{tool_descs}
</tools>

For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:
<tool_call>
{{"name": <function-name>, "arguments": <args-json-object>}}
</tool_call>
For code parameters, use placeholders first, and then put the code within <code></code> XML tags, such as:
<tool_call>
{{"name": <function-name>, "arguments": {{"code": ""}}}}
<code>
Here is the code.
</code>
</tool_call>"""


# Mainly for removing incomplete special tokens when streaming the output
# This assumes that '<tool_call>\n{"name": "' is the special token for the NousFnCallPrompt
def remove_incomplete_special_tokens(text: str) -> str:
    """Return ``''`` if ``text`` is a substring of the tool-call opener, else ``text``.

    The check is a substring test, so any fragment of the opener (not only a
    prefix) is removed.
    """
    if text in '<tool_call>\n{"name": "':
        text = ''
    return text


def extract_fn(text: str):
    """Extract the function name and raw arguments from tool-call text.

    This is a string scan rather than a JSON parse, so it also works on
    incomplete or malformed tool calls.

    Args:
        text: Text expected to contain ``"name": "..."`` and ``"arguments": ``.

    Returns:
        A ``(fn_name, fn_args)`` tuple of strings; a part that cannot be
        found is returned as ``''``.
    """
    fn_name, fn_args = '', ''
    fn_name_s = '"name": "'
    fn_name_e = '", "'
    fn_args_s = '"arguments": '
    i = text.find(fn_name_s)
    k = text.find(fn_args_s)
    # str.find returns -1 when absent; index 0 is a valid match.
    if i >= 0:
        _text = text[i + len(fn_name_s):]
        j = _text.find(fn_name_e)
        if j > -1:
            fn_name = _text[:j]
    if k >= 0:
        fn_args = text[k + len(fn_args_s):]

    fn_args = fn_args.strip()
    if len(fn_args) > 2:
        # Drop the last character (the closing brace of the enclosing JSON object).
        fn_args = fn_args[:-1]
    else:
        fn_args = ''
    return fn_name, fn_args