吴恩达MCP课程(5):mcp_chatbot_prompt_resource.py

article/2025/7/5 9:51:58

前提条件:
1、吴恩达MCP课程(5):research_server_prompt_resource.py
2、server_config_prompt_resource.json文件

{"mcpServers": {"filesystem": {"command": "npx","args": ["--y","@modelcontextprotocol/server-filesystem","."]},"research": {"command": "uv","args": ["run", "research_server_prompt_resource.py"]},"fetch": {"command": "uvx","args": ["mcp-server-fetch"]}}
}

代码

原课程用的anthropic的,下面改成openai,并用千问模型做测试

from dotenv import load_dotenv
import openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack
import json
import asyncio
import osload_dotenv()class MCP_ChatBot:def __init__(self):self.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))# Tools list required for OpenAI APIself.available_tools = []# Prompts list for quick display self.available_prompts = []# Sessions dict maps tool/prompt names or resource URIs to MCP client sessionsself.sessions = {}async def connect_to_server(self, server_name, server_config):try:server_params = StdioServerParameters(**server_config)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))read, write = stdio_transportsession = await self.exit_stack.enter_async_context(ClientSession(read, write))await session.initialize()try:# List available toolsresponse = await session.list_tools()for tool in response.tools:self.sessions[tool.name] = sessionself.available_tools.append({"type": "function","function": {"name": tool.name,"description": tool.description,"parameters": tool.inputSchema}})# List available promptsprompts_response = await session.list_prompts()if prompts_response and prompts_response.prompts:for prompt in prompts_response.prompts:self.sessions[prompt.name] = sessionself.available_prompts.append({"name": prompt.name,"description": prompt.description,"arguments": prompt.arguments})# List available resourcesresources_response = await session.list_resources()if resources_response and resources_response.resources:for resource in resources_response.resources:resource_uri = str(resource.uri)self.sessions[resource_uri] = sessionexcept Exception as e:print(f"Error {e}")except Exception as e:print(f"Error connecting to {server_name}: {e}")async def connect_to_servers(self):try:with open("server_config_prompt_resource.json", "r") as file:data = json.load(file)servers = data.get("mcpServers", {})for server_name, server_config in servers.items():await self.connect_to_server(server_name, server_config)except Exception as e:print(f"Error loading server config: {e}")raiseasync def process_query(self, query):messages = [{"role":"user", "content":query}]while True:response = self.client.chat.completions.create(model="qwen-turbo",tools=self.available_tools,messages=messages)message = response.choices[0].message# 检查是否有普通文本内容if message.content:print(message.content)messages.append({"role": "assistant", "content": message.content})break# 检查是否有工具调用elif message.tool_calls:# 添加助手消息到历史messages.append({"role": "assistant", "content": None,"tool_calls": message.tool_calls})# 处理每个工具调用for tool_call in message.tool_calls:tool_id = tool_call.idtool_name = tool_call.function.nametool_args = json.loads(tool_call.function.arguments)print(f"Calling tool {tool_name} with args {tool_args}")# 获取session并调用工具session = self.sessions.get(tool_name)if not session:print(f"Tool '{tool_name}' not found.")breakresult = await session.call_tool(tool_name, arguments=tool_args)# 添加工具结果到消息历史messages.append({"role": "tool","tool_call_id": tool_id,"content": result.content})else:breakasync def get_resource(self, resource_uri):session = self.sessions.get(resource_uri)# Fallback for papers URIs - try any papers resource sessionif not session and resource_uri.startswith("papers://"):for uri, sess in self.sessions.items():if uri.startswith("papers://"):session = sessbreakif not session:print(f"Resource '{resource_uri}' not found.")returntry:result = await session.read_resource(uri=resource_uri)if result and result.contents:print(f"\nResource: {resource_uri}")print("Content:")print(result.contents[0].text)else:print("No content available.")except Exception as e:print(f"Error: {e}")async def list_prompts(self):"""List all available prompts."""if not self.available_prompts:print("No prompts available.")returnprint("\nAvailable prompts:")for prompt in self.available_prompts:print(f"- {prompt['name']}: {prompt['description']}")if prompt['arguments']:print(f"  Arguments:")for arg in prompt['arguments']:arg_name = arg.name if hasattr(arg, 'name') else arg.get('name', '')print(f"    - {arg_name}")async def execute_prompt(self, prompt_name, args):"""Execute a prompt with the given arguments."""session = self.sessions.get(prompt_name)if not session:print(f"Prompt '{prompt_name}' not found.")returntry:result = await session.get_prompt(prompt_name, arguments=args)if result and result.messages:prompt_content = result.messages[0].content# Extract text from content (handles different formats)if isinstance(prompt_content, str):text = prompt_contentelif hasattr(prompt_content, 'text'):text = prompt_content.textelse:# Handle list of content itemstext = " ".join(item.text if hasattr(item, 'text') else str(item) for item in prompt_content)print(f"\nExecuting prompt '{prompt_name}'...")await self.process_query(text)except Exception as e:print(f"Error: {e}")async def chat_loop(self):print("\nMCP Chatbot Started!")print("Type your queries or 'quit' to exit.")print("Use @folders to see available topics")print("Use @<topic> to search papers in that topic")print("Use /prompts to list available prompts")print("Use /prompt <name> <arg1=value1> to execute a prompt")while True:try:query = input("\nQuery: ").strip()if not query:continueif query.lower() == 'quit':break# Check for @resource syntax firstif query.startswith('@'):# Remove @ sign  topic = query[1:]if topic == "folders":resource_uri = "papers://folders"else:resource_uri = f"papers://{topic}"await self.get_resource(resource_uri)continue# Check for /command syntaxif query.startswith('/'):parts = query.split()command = parts[0].lower()if command == '/prompts':await self.list_prompts()elif command == '/prompt':if len(parts) < 2:print("Usage: /prompt <name> <arg1=value1> <arg2=value2>")continueprompt_name = parts[1]args = {}# Parse argumentsfor arg in parts[2:]:if '=' in arg:key, value = arg.split('=', 1)args[key] = valueawait self.execute_prompt(prompt_name, args)else:print(f"Unknown command: {command}")continueawait self.process_query(query)except Exception as e:print(f"\nError: {str(e)}")async def cleanup(self):await self.exit_stack.aclose()async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.cleanup()if __name__ == "__main__":asyncio.run(main())

代码解释

这段代码实现了一个基于MCP(Model Context Protocol)的聊天机器人,能够连接到多个MCP服务器,使用它们提供的工具、提示和资源。下面是对代码的详细解释:

类结构与初始化

class MCP_ChatBot:def __init__(self):self.exit_stack = AsyncExitStack()self.client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"),base_url=os.getenv("OPENAI_API_BASE"))# Tools list required for OpenAI APIself.available_tools = []# Prompts list for quick display self.available_prompts = []# Sessions dict maps tool/prompt names or resource URIs to MCP client sessionsself.sessions = {}
  • AsyncExitStack:用于管理多个异步上下文管理器,确保资源正确释放
  • openai.OpenAI:创建OpenAI客户端,使用环境变量中的API密钥和基础URL
  • available_tools:存储可用工具列表,用于OpenAI API的工具调用功能
  • available_prompts:存储可用提示列表,用于快速显示
  • sessions:字典,将工具名称、提示名称或资源URI映射到对应的MCP客户端会话

服务器连接

async def connect_to_server(self, server_name, server_config):# 创建服务器参数、建立连接并初始化会话# 获取可用工具、提示和资源

这个方法负责:

  1. 使用提供的配置创建StdioServerParameters
  2. 通过stdio_client建立与服务器的连接
  3. 创建并初始化ClientSession
  4. 获取服务器提供的工具、提示和资源
  5. 将它们添加到相应的列表和字典中
async def connect_to_servers(self):# 从配置文件加载服务器信息并连接到每个服务器

这个方法从server_config_prompt_resource.json文件加载服务器配置,并为每个服务器调用connect_to_server方法。

查询处理

async def process_query(self, query):# 处理用户查询,使用OpenAI API和可用工具

这个方法是聊天机器人的核心,它:

  1. 创建包含用户查询的消息列表
  2. 调用OpenAI API,传递消息和可用工具
  3. 处理API的响应:
    • 如果有普通文本内容,打印并添加到消息历史
    • 如果有工具调用,执行每个工具调用并将结果添加到消息历史

工具调用的处理流程:

  1. 从响应中提取工具ID、名称和参数
  2. sessions字典中获取对应的会话
  3. 调用工具并获取结果
  4. 将结果添加到消息历史中

资源和提示管理

async def get_resource(self, resource_uri):# 获取并显示指定URI的资源内容

这个方法用于获取和显示资源内容,特别是论文资源。它会:

  1. sessions字典中获取对应的会话
  2. 对于以papers://开头的URI,如果找不到对应会话,会尝试使用任何处理papers的会话
  3. 调用read_resource方法获取资源内容
  4. 打印资源内容
async def list_prompts(self):# 列出所有可用的提示

这个方法列出所有可用的提示及其描述和参数。

async def execute_prompt(self, prompt_name, args):# 执行指定的提示,并处理结果

这个方法执行指定的提示,并将结果传递给process_query方法进行处理。它处理不同格式的提示内容,确保能够正确提取文本。

聊天循环

async def chat_loop(self):# 主聊天循环,处理用户输入

这个方法是聊天机器人的主循环,它:

  1. 打印欢迎信息和使用说明
  2. 循环接收用户输入
  3. 根据输入的不同格式执行不同操作:
    • 如果输入是quit,退出循环
    • 如果输入以@开头,调用get_resource方法获取资源
    • 如果输入以/开头,执行命令(如列出提示或执行提示)
    • 否则,调用process_query方法处理普通查询

资源清理

async def cleanup(self):# 清理资源

这个方法调用exit_stack.aclose()来清理所有资源。

主函数

async def main():chatbot = MCP_ChatBot()try:await chatbot.connect_to_servers()await chatbot.chat_loop()finally:await chatbot.cleanup()

主函数创建MCP_ChatBot实例,连接到服务器,运行聊天循环,并确保在结束时清理资源。

代码运行结果

uv run mcp_chatbot_prompt_resource.py

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


http://www.hkcw.cn/article/QWXuxuCuNE.shtml

相关文章

性能测试的概念和场景设计

一、什么是性能 性能的定义&#xff1a;性能是指系统或设备在执行特定任务时的时间效率和资源利用情况。可以从两个主要维度来理解&#xff1a; 时间维度&#xff1a; a、响应时间&#xff1a;指系统从接收用户请求到返回结果所需的时间。 b、吞吐量&#xff1a;指单位时间内…

【Java学习笔记】异常

异常&#xff08;Exception&#xff09; 一、基本介绍 在 Java 程序中&#xff0c;将运行中发生的不正常情况称为 “异常”&#xff0c;开发过程中的语法错误和运行时发生的异常情况是不一样的。 二、异常的分类 1. Error&#xff08;错误&#xff09;&#xff1a;Java 虚拟…

50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | Form Wave(表单label波动效果)

&#x1f4c5; 我们继续 50 个小项目挑战&#xff01;—— FormWave组件 仓库地址&#xff1a;https://github.com/SunACong/50-vue-projects 项目预览地址&#xff1a;https://50-vue-projects.vercel.app/ &#x1f3af; 组件目标 构建一个美观、动态的登录表单&#xff0…

数据库系统概论(十六)数据库安全性(安全标准,控制,视图机制,审计与数据加密)

数据库系统概论&#xff08;十六&#xff09;数据库安全性 前言一、数据库安全性1. 什么是数据库安全性&#xff1f;2. 为何会存在安全问题&#xff1f; 二、安全标准的发展1. 早期的“开拓者”&#xff1a;TCSEC标准2. 走向国际统一&#xff1a;CC标准3. TCSEC和CC标准有什么不…

显示即战略:铁电液晶如何成为 “数字中国” 的 “像素基石”?

一、显示技术&#xff1a;数字时代的核心战略支点 &#xff08;一&#xff09;从 “视觉窗口” 到 “战略基础设施” 在数字经济蓬勃发展的当下&#xff0c;显示技术早已超越了单纯的 “视觉呈现” 范畴&#xff0c;成为连接人与数字世界的关键接口。从智能手机、平板电脑到车…

⚡️ Linux grep 命令参数详解

⚡️ Linux grep 用法及参数详解 &#x1f4d8; 1. grep 简介 grep 是 Linux/Unix 系统中用于文本搜索的命令&#xff0c;其全称为 Global Regular Expression Print&#xff0c;意为全局正则表达式打印器。 它根据给定的 模式&#xff08;pattern&#xff09; 对文件或标准…

Rust 变量与可变性

文章目录 变量与可变性常量遮蔽&#xff08;Shadowing&#xff09; 变量与可变性 Rust中变量默认是不可变的&#xff0c;这是 Rust 鼓励你编写更安全、易于并发代码的众多方式之一。不过&#xff0c;你仍然可以选择让变量可变。让我们来探讨 Rust 为什么鼓励你优先使用不可变性…

YOLOV7改进之融合深浅下采样模块(DSD Module)和轻量特征融合模块(LFI Module)

目录 一、研究背景​ 二. 核心创新点​ ​2.1 避免高MAC操作​ ​2.2 DSDM-LFIM主干网络​ 2.3 P2小目标检测分支​ ​3. 代码复现指南​ 环境配置 关键修改点 ​4. 实验结果对比​ 4.1 VisDrone数据集性能 4.2 边缘设备部署 4.3 检测效果可视化 ​5. 应用场景​ …

(Python网络爬虫);抓取B站404页面小漫画

目录 一. 分析网页 二. 准备工作 三. 实现爬虫 1. 抓取工作 2. 分析工作 3. 拼接主函数&运行结果 四. 完整代码清单 1.多线程版本spider.py&#xff1a; 2.异步版本async_spider.py&#xff1a; 经常逛B站的同志们可能知道&#xff0c;B站的404页面做得别具匠心&…

Qt OpenGL 3D 编程入门

Qt 提供了强大的 OpenGL 集成功能&#xff0c;使得在 Qt 应用中实现 3D 图形变得更加简单。以下是使用 Qt 进行 OpenGL 3D 编程的基础知识。 1. 环境配置 创建 Qt 项目 新建 Qt Widgets Application 项目 在 .pro 文件中添加 OpenGL 模块&#xff1a; qmake QT co…

山东警方:10岁失联男童在河道溺亡 搜救行动告终

6月2日,山东滕州市公安局发布警情通报。5月31日22时35分许,警方接到孔某某报警,称其10岁的外孙赵某某于当天17时许离家后失联。接警后,公安机关迅速调阅监控、走访群众,并联合当地政府和社会救援力量,使用搜救警犬和无人机持续开展搜寻工作。6月2日15时许,在邻村一河道内…

韩国大选目前选情如何 李在明领跑选战

韩国第21届总统选举定于6月3日举行,主要候选人仍在抓紧最后机会展开竞选活动,争取更多选票。此次大选在前总统尹锡悦被弹劾后举行,共有7名候选人登记参选,但其中两人已宣布退选,最终有5名候选人参加角逐。这5名候选人分别是共同民主党候选人李在明、国民力量党候选人金文洙…

【深度学习新浪潮】以Dify为例的大模型平台的对比分析

我们从核心功能、适用群体、易用性、可扩展性和安全性五个维度展开对比分析: 一、核心功能对比 平台核心功能多模型支持插件与工具链Dify低代码开发、RAG增强、Agent自律执行、企业级安全支持GPT-4/5、Claude、Llama3、Gemini及开源模型(如Qwen-VL-72B),支持混合模型组合可…

【JAVA】注解+元注解+自定义注解(万字详解)

&#x1f4da;博客主页&#xff1a;代码探秘者 ✨专栏&#xff1a;《JavaSe》 其他更新ing… ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更新的动力❤️ &#x1f64f;作者水平有限&#xff0c;欢迎各位大佬指点&…

unity随机生成未知符号教程

目录 前言方法1方法2脚本后言示例代码 前言 在某些游戏中&#xff0c;有一些让人感到意味不明的未知符号&#xff0c;例如在游戏《巴别塔圣歌》中&#xff0c;就有这样一些能让人在初次就看不懂的未知符号。 或者在其他时候&#xff0c;这些未知符号如果跟粒子系统结合在一起的…

OpenCV——Mac系统搭建OpenCV的Java环境

这里写目录标题 一、源码编译安装1.1、下载源码包1.2、cmake安装1.3、java配置1.4、测试 二、Maven引入2.1、添加Maven依赖2.2、加载本地库 一、源码编译安装 1.1、下载源码包 官网下载opencv包&#xff1a;https://opencv.org/releases/ 以4.6.0为例&#xff0c;下载解压后&…

从Docker拉取镜像一直失败超时解决办法

项目场景&#xff1a; 在ubuntu中&#xff0c;使用docker拉去镜像时&#xff0c;一直超时&#xff0c;拉去失败。 问题描述 原因分析&#xff1a; 国外服务器网络不好导致。 解决方案&#xff1a; 解决方案1 设置国内源 我这边测试&#xff0c;更改以后仍然失败 阿里云提供…

告别printf!嵌入式系统高效日志记录方案

目录 1、分级控制与动态过滤机制 2、异步处理与零拷贝架构 3、跨平台适配层设计 在嵌入式系统开发领域&#xff0c;日志记录系统如同数字世界的黑匣子&#xff0c;承载着系统运行状态的关键信息。传统的printf调试方式虽简单易用&#xff0c;但在处理复杂系统时暴露出效率低…

复变函数 $w = z^2$ 的映射图像演示

复变函数 w z 2 w z^2 wz2 的映射图像演示 复变函数 w z 2 w z^2 wz2 是一个基本的二次函数&#xff0c;在复平面上具有有趣的映射性质。下面我将介绍这个函数的映射特性&#xff0c;并使用MATLAB进行可视化演示。 映射特性 极坐标表示&#xff1a;若 z r e i θ z …

【Redis】Zset 有序集合

文章目录 常用命令zaddzcardzcountzrange && zrevrangezrangebyscorezpopmax && bzpopmaxzpopmin && zpopmaxzrank && zrevrankzscorezremzremrangebyrankzremrangebyscorezincrby 集合间操作交集 zinterstore并集 zunionstore 内部编码应用场…