1#!/usr/bin/env python3
  2
  3"""Function calling example using pydantic models."""
  4
  5from __future__ import annotations
  6
  7import argparse
  8import datetime
  9import json
 10import logging
 11import textwrap
 12import sys
 13from enum import Enum
 14from typing import Optional, Union
 15
 16import requests
 17from pydantic import BaseModel, Field
 18from pydantic_models_to_grammar import (add_run_method_to_dynamic_model, convert_dictionary_to_pydantic_model,
 19                                        create_dynamic_model_from_function, generate_gbnf_grammar_and_documentation)
 20
 21
 22def create_completion(host, prompt, gbnf_grammar):
 23    """Calls the /completion API on llama-server.
 24
 25    See
 26    https://github.com/ggml-org/llama.cpp/tree/HEAD/tools/server#api-endpoints
 27    """
 28    print(f"  Request:\n    Grammar:\n{textwrap.indent(gbnf_grammar, '      ')}\n    Prompt:\n{textwrap.indent(prompt.rstrip(), '      ')}")
 29    headers = {"Content-Type": "application/json"}
 30    data = {"prompt": prompt, "grammar": gbnf_grammar}
 31    result = requests.post(f"http://{host}/completion", headers=headers, json=data).json()
 32    assert data.get("error") is None, data
 33    logging.info("Result: %s", result)
 34    content = result["content"]
 35    print(f"  Model: {result['model']}")
 36    print(f"  Result:\n{textwrap.indent(json.dumps(json.loads(content), indent=2), '    ')}")
 37    return content
 38
 39
 40# A function for the agent to send a message to the user.
 41class SendMessageToUser(BaseModel):
 42    """Send a message to the User."""
 43    chain_of_thought: str = Field(..., description="Your chain of thought while sending the message.")
 44    message: str = Field(..., description="Message you want to send to the user.")
 45
 46    def run(self):
 47        print(f"SendMessageToUser: {self.message}")
 48
 49
 50def example_rce(host):
 51    """Minimal test case where the LLM call an arbitrary python function."""
 52    print("- example_rce")
 53    tools = [SendMessageToUser]
 54    gbnf_grammar, documentation = generate_gbnf_grammar_and_documentation(
 55        pydantic_model_list=tools, outer_object_name="function",
 56        outer_object_content="function_parameters", model_prefix="Function", fields_prefix="Parameters")
 57    system_message = "You are an advanced AI, tasked to assist the user by calling functions in JSON format. The following are the available functions and their parameters and types:\n\n" + documentation
 58    user_message = "What is 42 * 42?"
 59    prompt = f"<|im_start|>system\n{system_message}<|im_end|>\n<|im_start|>user\n{user_message}<|im_end|>\n<|im_start|>assistant"
 60    text = create_completion(host, prompt, gbnf_grammar)
 61    json_data = json.loads(text)
 62    tools_map = {tool.__name__:tool for tool in tools}
 63    # This finds "SendMessageToUser":
 64    tool = tools_map.get(json_data["function"])
 65    if not tool:
 66        print(f"Error: unknown tool {json_data['function']}")
 67        return 1
 68    tool(**json_data["function_parameters"]).run()
 69    return 0
 70
 71
 72# Enum for the calculator tool.
 73class MathOperation(Enum):
 74    ADD = "add"
 75    SUBTRACT = "subtract"
 76    MULTIPLY = "multiply"
 77    DIVIDE = "divide"
 78
 79
 80# Simple pydantic calculator tool for the agent that can add, subtract,
 81# multiply, and divide. Docstring and description of fields will be used in
 82# system prompt.
 83class Calculator(BaseModel):
 84    """Perform a math operation on two numbers."""
 85    number_one: Union[int, float] = Field(..., description="First number.")
 86    operation: MathOperation = Field(..., description="Math operation to perform.")
 87    number_two: Union[int, float] = Field(..., description="Second number.")
 88
 89    def run(self):
 90        if self.operation == MathOperation.ADD:
 91            return self.number_one + self.number_two
 92        elif self.operation == MathOperation.SUBTRACT:
 93            return self.number_one - self.number_two
 94        elif self.operation == MathOperation.MULTIPLY:
 95            return self.number_one * self.number_two
 96        elif self.operation == MathOperation.DIVIDE:
 97            return self.number_one / self.number_two
 98        else:
 99            raise ValueError("Unknown operation.")
100
101
102def example_calculator(host):
103    """Have the LLM ask to get a calculation done.
104
105    Here the grammar gets generated by passing the available function models to
106    generate_gbnf_grammar_and_documentation function. This also generates a
107    documentation usable by the LLM.
108
109    pydantic_model_list is the list of pydantic models outer_object_name is an
110    optional name for an outer object around the actual model object. Like a
111    "function" object with "function_parameters" which contains the actual model
112    object. If None, no outer object will be generated outer_object_content is
113    the name of outer object content.
114
115    model_prefix is the optional prefix for models in the documentation. (Default="Output Model")
116    fields_prefix is the prefix for the model fields in the documentation. (Default="Output Fields")
117    """
118    print("- example_calculator")
119    tools = [SendMessageToUser, Calculator]
120    gbnf_grammar, documentation = generate_gbnf_grammar_and_documentation(
121        pydantic_model_list=tools, outer_object_name="function",
122        outer_object_content="function_parameters", model_prefix="Function", fields_prefix="Parameters")
123    system_message = "You are an advanced AI, tasked to assist the user by calling functions in JSON format. The following are the available functions and their parameters and types:\n\n" + documentation
124    user_message1 = "What is 42 * 42?"
125    prompt = f"<|im_start|>system\n{system_message}<|im_end|>\n<|im_start|>user\n{user_message1}<|im_end|>\n<|im_start|>assistant"
126    text = create_completion(host, prompt, gbnf_grammar)
127    json_data = json.loads(text)
128    expected = {
129        "function": "Calculator",
130        "function_parameters": {
131            "number_one": 42,
132            "operation": "multiply",
133            "number_two": 42
134        }
135    }
136    if json_data != expected:
137        print("  Result is not as expected!")
138    tools_map = {tool.__name__:tool for tool in tools}
139    # This finds "Calculator":
140    tool = tools_map.get(json_data["function"])
141    if not tool:
142        print(f"Error: unknown tool {json_data['function']}")
143        return 1
144    result = tool(**json_data["function_parameters"]).run()
145    print(f"  Call {json_data['function']} gave result {result}")
146    return 0
147
148
149class Category(Enum):
150    """The category of the book."""
151    Fiction = "Fiction"
152    NonFiction = "Non-Fiction"
153
154
155class Book(BaseModel):
156    """Represents an entry about a book."""
157    title: str = Field(..., description="Title of the book.")
158    author: str = Field(..., description="Author of the book.")
159    published_year: Optional[int] = Field(..., description="Publishing year of the book.")
160    keywords: list[str] = Field(..., description="A list of keywords.")
161    category: Category = Field(..., description="Category of the book.")
162    summary: str = Field(..., description="Summary of the book.")
163
164
165def example_struct(host):
166    """A example structured output based on pydantic models.
167
168    The LLM will create an entry for a Book database out of an unstructured
169    text. We need no additional parameters other than our list of pydantic
170    models.
171    """
172    print("- example_struct")
173    tools = [Book]
174    gbnf_grammar, documentation = generate_gbnf_grammar_and_documentation(pydantic_model_list=tools)
175    system_message = "You are an advanced AI, tasked to create a dataset entry in JSON for a Book. The following is the expected output model:\n\n" + documentation
176    text = """The Feynman Lectures on Physics is a physics textbook based on some lectures by Richard Feynman, a Nobel laureate who has sometimes been called "The Great Explainer". The lectures were presented before undergraduate students at the California Institute of Technology (Caltech), during 1961–1963. The book's co-authors are Feynman, Robert B. Leighton, and Matthew Sands."""
177    prompt = f"<|im_start|>system\n{system_message}<|im_end|>\n<|im_start|>user\n{text}<|im_end|>\n<|im_start|>assistant"
178    text = create_completion(host, prompt, gbnf_grammar)
179    json_data = json.loads(text)
180    # In this case, there's no function nor function_parameters.
181    # Here the result will vary based on the LLM used.
182    keys = sorted(["title", "author", "published_year", "keywords", "category", "summary"])
183    if keys != sorted(json_data.keys()):
184        print(f"Unexpected result: {sorted(json_data.keys())}")
185        return 1
186    book = Book(**json_data)
187    print(f"  As a Book object: %s" % book)
188    return 0
189
190
191def get_current_datetime(output_format: Optional[str] = None):
192    """Get the current date and time in the given format.
193
194    Args:
195         output_format: formatting string for the date and time, defaults to '%Y-%m-%d %H:%M:%S'
196    """
197    return datetime.datetime.now().strftime(output_format or "%Y-%m-%d %H:%M:%S")
198
199
200# Example function to get the weather.
201def get_current_weather(location, unit):
202    """Get the current weather in a given location"""
203    if "London" in location:
204        return json.dumps({"location": "London", "temperature": "42", "unit": unit.value})
205    elif "New York" in location:
206        return json.dumps({"location": "New York", "temperature": "24", "unit": unit.value})
207    elif "North Pole" in location:
208        return json.dumps({"location": "North Pole", "temperature": "-42", "unit": unit.value})
209    return json.dumps({"location": location, "temperature": "unknown"})
210
211
212def example_concurrent(host):
213    """An example for parallel function calling with a Python function, a pydantic
214    function model and an OpenAI like function definition.
215    """
216    print("- example_concurrent")
217    # Function definition in OpenAI style.
218    current_weather_tool = {
219        "type": "function",
220        "function": {
221            "name": "get_current_weather",
222            "description": "Get the current weather in a given location",
223            "parameters": {
224                "type": "object",
225                "properties": {
226                    "location": {
227                        "type": "string",
228                        "description": "The city and state, e.g. San Francisco, CA",
229                    },
230                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
231                },
232                "required": ["location"],
233            },
234        },
235    }
236    # Convert OpenAI function definition into pydantic model.
237    current_weather_tool_model = convert_dictionary_to_pydantic_model(current_weather_tool)
238    # Add the actual function to a pydantic model.
239    current_weather_tool_model = add_run_method_to_dynamic_model(current_weather_tool_model, get_current_weather)
240
241    # Convert normal Python function to a pydantic model.
242    current_datetime_model = create_dynamic_model_from_function(get_current_datetime)
243
244    tools = [SendMessageToUser, Calculator, current_datetime_model, current_weather_tool_model]
245    gbnf_grammar, documentation = generate_gbnf_grammar_and_documentation(
246        pydantic_model_list=tools, outer_object_name="function",
247        outer_object_content="params", model_prefix="Function", fields_prefix="Parameters", list_of_outputs=True)
248    system_message = "You are an advanced AI assistant. You are interacting with the user and with your environment by calling functions. You call functions by writing JSON objects, which represent specific function calls.\nBelow is a list of your available function calls:\n\n" + documentation
249    text = """Get the date and time, get the current weather in celsius in London and solve the following calculation: 42 * 42"""
250    prompt = f"<|im_start|>system\n{system_message}<|im_end|>\n<|im_start|>user\n{text}<|im_end|>\n<|im_start|>assistant"
251    text = create_completion(host, prompt, gbnf_grammar)
252    json_data = json.loads(text)
253    expected = [
254      {
255        "function": "get_current_datetime",
256        "params": {
257          "output_format": "%Y-%m-%d %H:%M:%S"
258        }
259      },
260      {
261        "function": "get_current_weather",
262        "params": {
263          "location": "London",
264          "unit": "celsius"
265        }
266      },
267      {
268        "function": "Calculator",
269        "params": {
270          "number_one": 42,
271          "operation": "multiply",
272          "number_two": 42
273        }
274      }
275    ]
276    res = 0
277    if json_data != expected:
278        print("  Result is not as expected!")
279        print("  This can happen on highly quantized models")
280        res = 1
281    tools_map = {tool.__name__:tool for tool in tools}
282    for call in json_data:
283      tool = tools_map.get(call["function"])
284      if not tool:
285          print(f"Error: unknown tool {call['function']}")
286          return 1
287      result = tool(**call["params"]).run()
288      print(f"  Call {call['function']} returned {result}")
289    # Should output something like this:
290    #   Call get_current_datetime returned 2024-07-15 09:50:38
291    #   Call get_current_weather returned {"location": "London", "temperature": "42", "unit": "celsius"}
292    #   Call Calculator returned 1764
293    return res
294
295
296def main():
297    parser = argparse.ArgumentParser(description=sys.modules[__name__].__doc__)
298    parser.add_argument("--host", default="localhost:8080", help="llama.cpp server")
299    parser.add_argument("-v", "--verbose", action="store_true", help="enables logging")
300    args = parser.parse_args()
301    logging.basicConfig(level=logging.INFO if args.verbose else logging.ERROR)
302    ret = 0
303    # Comment out below to only run the example you want.
304    ret = ret or example_rce(args.host)
305    ret = ret or example_calculator(args.host)
306    ret = ret or example_struct(args.host)
307    ret = ret or example_concurrent(args.host)
308    return ret
309
310
311if __name__ == "__main__":
312    sys.exit(main())