本文最后更新于195 天前,其中的信息可能已经过时,如有错误请发送邮件到big_fw@foxmail.com
📖 前言
Model Context Protocol (MCP) 是一个强大的框架,允许 AI 模型通过标准化的协议调用外部工具和服务。本文将通过一个实际案例——智能新闻分析与邮件发送系统,深入理解如何开发一个完整的 MCP 服务。
🎯 项目概述
我们将构建一个系统,它能够:
- 🔍 搜索 Google 新闻
- 🧠 使用 AI 进行情感分析
- 📧 自动发送邮件报告
技术栈
- MCP 框架:
mcp和fastmcp - AI 模型: 通义千问 (Qwen)
- 新闻 API: Serper API
- 邮件服务: SMTP (QQ 邮箱)
🏗️ 架构设计
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ 用户输入 │ ──▶ │ MCP Client │ ──▶ │ MCP Server │
│ 自然语言 │ │ (client.py) │ │ (server.py) │
└─────────────┘ └──────────────┘ └─────────────┘
│ │
│ ├─▶ 🔍 搜索新闻
│ ├─▶ 🧠 情感分析
▼ └─▶ 📧 发送邮件
┌──────────┐
│ AI 模型 │
│ (Qwen) │
└──────────┘
📝 第一步:创建 MCP Server
MCP Server 负责定义和实现具体的工具功能。
1.1 初始化服务器
from mcp.server.fastmcp import FastMCP
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 初始化 MCP 服务器
mcp = FastMCP("NewsServer")
1.2 定义工具:搜索新闻
使用 @mcp.tool() 装饰器定义工具:
import httpx
import json
import os
from datetime import datetime
@mcp.tool()
async def search_google_news(keyword: str) -> str:
"""
使用 Serper API 搜索新闻内容,返回前5条标题、描述和链接。
参数:
keyword (str): 搜索关键词,如 "小米汽车"
返回:
str: JSON 字符串,包含新闻标题、描述、链接
"""
api_key = os.getenv("SERPER_API_KEY")
if not api_key:
return "❌ 未配置 SERPER_API_KEY"
url = "https://google.serper.dev/news"
headers = {
"X-API-KEY": api_key,
"Content-Type": "application/json"
}
payload = {"q": keyword}
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=payload)
data = response.json()
if "news" not in data:
return "❌ 未获取到搜索结果"
# 提取前5条新闻
articles = [
{
"title": item.get("title"),
"desc": item.get("snippet"),
"url": item.get("link")
} for item in data["news"][:5]
]
# 保存到本地
output_dir = "./google_news"
os.makedirs(output_dir, exist_ok=True)
filename = f"google_news_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
file_path = os.path.join(output_dir, filename)
with open(file_path, "w", encoding="utf-8") as f:
json.dump(articles, f, ensure_ascii=False, indent=2)
return (
f"✅ 已获取与 [{keyword}] 相关的前5条 Google 新闻:\n"
f"{json.dumps(articles, ensure_ascii=False, indent=2)}\n"
f"📄 已保存到:{file_path}"
)
1.3 定义工具:情感分析
from openai import OpenAI
@mcp.tool()
async def analyze_sentiment(text: str, filename: str) -> str:
"""
对文本进行情感分析,并保存为 Markdown 文件。
参数:
text (str): 新闻描述或文本内容
filename (str): 保存的 Markdown 文件名(不含路径)
返回:
str: 仅返回文件名(用于邮件发送)
"""
# 初始化 AI 客户端
openai_key = os.getenv("DASHSCOPE_API_KEY")
model = os.getenv("MODEL")
client = OpenAI(
api_key=openai_key,
base_url=os.getenv("BASE_URL")
)
# 构造提示词
prompt = f"请对以下新闻内容进行情绪倾向分析,并说明原因:\n\n{text}"
# 调用 AI 模型
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}]
)
result = response.choices[0].message.content.strip()
# 生成 Markdown 报告
markdown = f"""# 舆情分析报告
**分析时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
---
## 📥 原始文本
{text}
---
## 📊 分析结果
{result}
"""
# 保存文件
output_dir = "./sentiment_reports"
os.makedirs(output_dir, exist_ok=True)
if not filename:
filename = f"sentiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
file_path = os.path.join(output_dir, filename)
with open(file_path, "w", encoding="utf-8") as f:
f.write(markdown)
# 只返回文件名,方便后续邮件发送
return filename
1.4 定义工具:发送邮件
import smtplib
from email.message import EmailMessage
@mcp.tool()
async def send_email_with_attachment(
to: str,
subject: str,
body: str,
filename: str
) -> str:
"""
发送带附件的邮件。
参数:
to: 收件人邮箱地址
subject: 邮件标题
body: 邮件正文
filename: 文件名(不含路径)
返回:
邮件发送状态说明
"""
# 获取 SMTP 配置
smtp_server = os.getenv("SMTP_SERVER") # smtp.qq.com
smtp_port = int(os.getenv("SMTP_PORT", 465))
sender_email = os.getenv("EMAIL_USER")
sender_pass = os.getenv("EMAIL_PASS") # QQ 邮箱授权码
# 获取附件文件路径
full_path = os.path.abspath(
os.path.join("./sentiment_reports", filename)
)
if not os.path.exists(full_path):
return f"❌ 附件路径无效,未找到文件: {full_path}"
# 创建邮件
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = sender_email
msg["To"] = to
msg.set_content(body)
# 添加附件
try:
with open(full_path, "rb") as f:
file_data = f.read()
file_name = os.path.basename(full_path)
msg.add_attachment(
file_data,
maintype="application",
subtype="octet-stream",
filename=file_name
)
except Exception as e:
return f"❌ 附件读取失败: {str(e)}"
# 发送邮件
try:
with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
server.login(sender_email, sender_pass)
server.send_message(msg)
return f"✅ 邮件已成功发送给 {to},附件: {full_path}"
except Exception as e:
return f"❌ 邮件发送失败: {str(e)}"
1.5 启动服务器
if __name__ == "__main__":
mcp.run(transport='stdio')
🤖 第二步:创建 MCP Client
MCP Client 负责与 AI 模型交互,规划工具调用链。
2.1 初始化客户端
import asyncio
import os
import json
from typing import Optional, List
from contextlib import AsyncExitStack
from datetime import datetime
import re
from openai import OpenAI
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
load_dotenv()
class MCPClient:
def __init__(self):
self.exit_stack = AsyncExitStack()
self.openai_api_key = os.getenv("DASHSCOPE_API_KEY")
self.base_url = os.getenv("BASE_URL")
self.model = os.getenv("MODEL")
if not self.openai_api_key:
raise ValueError("❌ 未找到 API Key")
self.client = OpenAI(
api_key=self.openai_api_key,
base_url=self.base_url
)
self.session: Optional[ClientSession] = None
2.2 连接到 MCP Server
async def connect_to_server(self, server_script_path: str):
"""连接到 MCP 服务器"""
# 判断服务器脚本类型
is_python = server_script_path.endswith('.py')
is_js = server_script_path.endswith('.js')
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")
# 确定启动命令
command = "python" if is_python else "node"
# 构造服务器参数
server_params = StdioServerParameters(
command=command,
args=[server_script_path],
env=None
)
# 启动服务进程
stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
self.stdio, self.write = stdio_transport
# 创建会话
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.stdio, self.write)
)
# 初始化会话
await self.session.initialize()
# 获取工具列表
response = await self.session.list_tools()
tools = response.tools
print("\n已连接到服务器,支持以下工具:",
[tool.name for tool in tools])
2.3 工具调用规划(核心逻辑)
这是 MCP Client 的核心功能——让 AI 模型规划工具调用链:
async def plan_tool_usage(
self,
query: str,
tools: List[dict]
) -> List[dict]:
"""使用 AI 规划工具调用链"""
print("\n📤 提交给大模型的工具定义:")
print(json.dumps(tools, ensure_ascii=False, indent=2))
# 构造工具列表文本
tool_list_text = "\n".join([
f"- {tool['function']['name']}: "
f"{tool['function']['description']}"
for tool in tools
])
# 构造系统提示词
system_prompt = {
"role": "system",
"content": (
"你是一个智能任务规划助手,用户会给出自然语言请求。\n"
"你只能从以下工具中选择(严格使用工具名称):\n"
f"{tool_list_text}\n\n"
"**重要规则**:\n"
"1. 如果用户要求'搜索新闻并分析并发送邮件',"
"你必须规划3个步骤:\n"
" 步骤1: search_google_news (搜索新闻)\n"
" 步骤2: analyze_sentiment (分析并生成文件)\n"
" 步骤3: send_email_with_attachment (发送邮件)\n"
"2. 后续步骤可以使用 {{工具名}} 来引用上一步的输出\n"
"3. analyze_sentiment 返回的是文件名(不含路径),"
"直接用于 send_email_with_attachment 的 filename 参数\n"
"4. 返回格式必须是纯 JSON 数组,不要添加任何解释\n\n"
"示例输出格式:\n"
'[\n'
' {"name": "search_google_news", '
'"arguments": {"keyword": "小米SU7"}},\n'
' {"name": "analyze_sentiment", '
'"arguments": {"text": "{{search_google_news}}", '
'"filename": "report.md"}},\n'
' {"name": "send_email_with_attachment", '
'"arguments": {"to": "user@qq.com", '
'"subject": "报告", "body": "请查收", '
'"filename": "{{analyze_sentiment}}"}}\n'
']'
)
}
# 调用 AI 模型规划
planning_messages = [
system_prompt,
{"role": "user", "content": query}
]
response = self.client.chat.completions.create(
model=self.model,
messages=planning_messages,
tools=tools,
tool_choice="none"
)
# 提取 JSON 内容
content = response.choices[0].message.content.strip()
match = re.search(r"```(?:json)?\s*([\s\S]+?)\s*```", content)
if match:
json_text = match.group(1)
else:
json_text = content
# 解析并返回计划
try:
plan = json.loads(json_text)
print(f"\n🔧 AI 生成的工具调用计划:")
print(json.dumps(plan, ensure_ascii=False, indent=2))
return plan if isinstance(plan, list) else []
except Exception as e:
print(f"❌ 工具调用链规划失败: {e}\n原始返回: {content}")
return []
2.4 执行工具调用链
async def process_query(self, query: str) -> str:
"""处理用户查询"""
# 获取可用工具
response = await self.session.list_tools()
available_tools = [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
}
} for tool in response.tools
]
# 生成文件名(用于分析报告)
keyword_match = re.search(
r'(关于|分析|查询|搜索|查看)([^的\s,。、?\n]+)',
query
)
keyword = keyword_match.group(2) if keyword_match else "分析对象"
safe_keyword = re.sub(r'[\\/:*?"<>|]', '', keyword)[:20]
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
md_filename = f"sentiment_{safe_keyword}_{timestamp}.md"
md_path = os.path.join("./sentiment_reports", md_filename)
# 将文件名信息注入查询
query_with_filename = (
f"{query.strip()} "
f"[md_filename={md_filename}] [md_path={md_path}]"
)
# 规划工具调用链
tool_plan = await self.plan_tool_usage(
query_with_filename,
available_tools
)
# 执行工具调用
tool_outputs = {}
messages = [{"role": "user", "content": query_with_filename}]
for step in tool_plan:
tool_name = step["name"]
tool_args = step["arguments"]
# 解析占位符(如 {{search_google_news}})
for key, val in tool_args.items():
if isinstance(val, str) and \
val.startswith("{{") and val.endswith("}}"):
ref_key = val.strip("{} ")
resolved_val = tool_outputs.get(ref_key, val)
tool_args[key] = resolved_val
# 注入文件名
if tool_name == "analyze_sentiment" and \
"filename" not in tool_args:
tool_args["filename"] = md_filename
if tool_name == "send_email_with_attachment" and \
"attachment_path" not in tool_args:
tool_args["attachment_path"] = md_path
# 调用工具
result = await self.session.call_tool(tool_name, tool_args)
# 保存输出
tool_outputs[tool_name] = result.content[0].text
# 将工具输出添加到消息流
# 注意:使用 assistant 角色而非 tool,避免 API 限制
messages.append({
"role": "assistant",
"content": f"[tool:{tool_name} 输出]\n{result.content[0].text}"
})
# 生成最终回复
final_response = self.client.chat.completions.create(
model=self.model,
messages=messages
)
final_output = final_response.choices[0].message.content
# 保存对话记录
def clean_filename(text: str) -> str:
text = text.strip()
text = re.sub(r'[\\/:*?"<>|]', '', text)
return text[:50]
safe_filename = clean_filename(query)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"{safe_filename}_{timestamp}.txt"
output_dir = "./llm_outputs"
os.makedirs(output_dir, exist_ok=True)
file_path = os.path.join(output_dir, filename)
with open(file_path, "w", encoding="utf-8") as f:
f.write(f"🗣 用户提问:{query}\n\n")
f.write(f"🤖 模型回复:\n{final_output}\n")
print(f"📄 对话记录已保存为:{file_path}")
return final_output
2.5 交互循环
async def chat_loop(self):
"""主交互循环"""
print("\n🤖 MCP 客户端已启动!输入 'quit' 退出")
while True:
try:
query = input("\n你: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print(f"\n🤖 AI: {response}")
except Exception as e:
print(f"\n⚠️ 发生错误: {str(e)}")
async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose()
2.6 主入口
async def main():
server_script_path = os.path.join(
os.path.dirname(__file__),
"server.py"
)
client = MCPClient()
try:
await client.connect_to_server(server_script_path)
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())
⚙️ 第三步:环境配置
3.1 创建 .env 文件
# AI 模型配置(通义千问)
BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1
MODEL=qwen-plus
DASHSCOPE_API_KEY=sk-your-api-key-here
# 新闻搜索 API
SERPER_API_KEY=your-serper-api-key
# QQ 邮箱 SMTP 配置
SMTP_SERVER=smtp.qq.com
SMTP_PORT=465
EMAIL_USER=your-email@qq.com
EMAIL_PASS=your-authorization-code
3.2 获取 QQ 邮箱授权码
- 登录 QQ 邮箱网页版
- 设置 → 账户 → 开启 SMTP 服务
- 生成授权码(16位字符)
- 将授权码填入
EMAIL_PASS
3.3 安装依赖
pip install mcp openai python-dotenv httpx
🚀 第四步:运行系统
4.1 启动客户端
python client.py
4.2 使用示例
你: 分析一下小米SU7近期的热点新闻,并发送邮件给 user@qq.com
4.3 执行流程
📤 提交给大模型的工具定义
🔧 AI 生成的工具调用计划:
[
{
"name": "search_google_news",
"arguments": {"keyword": "小米SU7"}
},
{
"name": "analyze_sentiment",
"arguments": {
"text": "{{search_google_news}}",
"filename": "sentiment_小米su7_20251113_213000.md"
}
},
{
"name": "send_email_with_attachment",
"arguments": {
"to": "user@qq.com",
"subject": "小米SU7近期热点新闻情感分析报告",
"body": "您好,这是分析报告,请查收。",
"filename": "{{analyze_sentiment}}"
}
}
]
✅ 已获取新闻...
✅ 已生成分析报告...
✅ 邮件已成功发送...
🎓 核心知识点总结
1. MCP Server 设计要点
- ✅ 使用
@mcp.tool()装饰器定义工具 - ✅ 提供清晰的 docstring(会自动作为工具描述)
- ✅ 参数类型注解必须准确
- ✅ 返回值应该是字符串(便于 AI 理解)
- ✅ 使用
mcp.run(transport='stdio')启动
2. MCP Client 设计要点
- ✅ 通过
StdioServerParameters连接 Server - ✅ 使用
ClientSession管理会话 - ✅ 调用
list_tools()获取可用工具 - ✅ 使用 AI 模型规划工具调用链
- ✅ 处理工具间的数据传递(如
{{tool_name}})
3. 工具调用链规划技巧
关键点:通过精心设计的 system prompt,让 AI 理解:
- 可用工具及其功能
- 工具调用的顺序逻辑
- 数据如何在工具间传递
- 返回格式(JSON 数组)
4. 常见问题与解决方案
| 问题 | 原因 | 解决方案 |
|---|---|---|
role='tool' 错误 | API 要求 tool 消息必须有对应 tool_calls | 改用 role='assistant' 并标注工具名 |
| SMTP 连接超时 | 服务器地址错误 | 确保使用 smtp.qq.com 而非 qq.com |
| 工具未被调用 | AI 没有理解任务 | 强化 system prompt,提供示例 |
| 文件路径问题 | 工具间路径格式不一致 | 统一使用文件名(不含路径) |
🔥 进阶扩展
1. 添加更多工具
@mcp.tool()
async def generate_chart(data: str) -> str:
"""根据数据生成图表"""
# 使用 matplotlib 生成图表
pass
@mcp.tool()
async def translate_text(text: str, target_lang: str) -> str:
"""翻译文本"""
# 调用翻译 API
pass
2. 支持流式响应
async def stream_process_query(self, query: str):
"""流式处理查询"""
async for chunk in self.client.chat.completions.create(
model=self.model,
messages=messages,
stream=True
):
print(chunk.choices[0].delta.content, end="")
3. 添加错误重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def call_tool_with_retry(self, tool_name, tool_args):
"""带重试的工具调用"""
return await self.session.call_tool(tool_name, tool_args)
4. 工具权限控制
class ToolPermissionManager:
def __init__(self):
self.allowed_tools = {
"user1": ["search_google_news", "analyze_sentiment"],
"admin": ["*"] # 所有工具
}
def check_permission(self, user: str, tool_name: str) -> bool:
if "*" in self.allowed_tools.get(user, []):
return True
return tool_name in self.allowed_tools.get(user, [])
📊 性能优化建议
1. 并行执行独立工具
import asyncio
# 如果工具间无依赖,可并行执行
results = await asyncio.gather(
self.session.call_tool("tool1", args1),
self.session.call_tool("tool2", args2)
)
2. 缓存工具结果
from functools import lru_cache
@lru_cache(maxsize=100)
def get_cached_news(keyword: str):
# 缓存新闻搜索结果
pass
3. 限流控制
from aiolimiter import AsyncLimiter
limiter = AsyncLimiter(10, 60) # 每分钟最多10次
async def rate_limited_call(self, tool_name, args):
async with limiter:
return await self.session.call_tool(tool_name, args)
🎯 最佳实践
✅ DO(推荐做法)
- 清晰的工具命名:
search_google_news而非tool1 - 详细的 docstring:包含参数说明、返回值说明
- 统一的错误处理:返回带 ❌ 的错误信息
- 日志记录:记录所有工具调用和结果
- 参数验证:在工具内部验证参数合法性
- 幂等性设计:工具可被重复调用而不产生副作用
❌ DON’T(避免做法)
- 工具功能过于复杂:一个工具只做一件事
- 返回复杂对象:始终返回字符串或简单 JSON
- 硬编码路径:使用环境变量或相对路径
- 忽略异常处理:所有异常都应被捕获并返回友好信息
- 工具间强耦合:工具应独立可测试
🧪 测试建议
单元测试示例
import pytest
from server import search_google_news
@pytest.mark.asyncio
async def test_search_google_news():
result = await search_google_news("测试关键词")
assert "✅" in result or "❌" in result
@pytest.mark.asyncio
async def test_analyze_sentiment():
text = "这是一条正面新闻"
filename = "test_report.md"
result = await analyze_sentiment(text, filename)
assert result == filename
集成测试
@pytest.mark.asyncio
async def test_full_workflow():
client = MCPClient()
await client.connect_to_server("server.py")
query = "搜索并分析测试新闻"
response = await client.process_query(query)
assert "✅" in response
await client.cleanup()
📚 参考资源
- MCP 官方文档: https://modelcontextprotocol.io/
- FastMCP GitHub: https://github.com/jlowin/fastmcp
- 通义千问文档: https://help.aliyun.com/zh/model-studio/
- Serper API: https://serper.dev/
💡 总结
通过本文,我们完整实现了一个 MCP 智能新闻分析系统,涵盖了:
- ✅ MCP Server 的工具定义与实现
- ✅ MCP Client 的工具调用规划
- ✅ AI 模型与工具的协同工作
- ✅ 完整的错误处理与日志记录
- ✅ 实用的邮件发送功能
MCP 框架的核心价值在于:
- 🔌 标准化:统一的工具调用协议
- 🧩 可组合:工具可灵活组合成工作流
- 🤖 AI 驱动:由 AI 自主规划执行路径
- 🚀 可扩展:轻松添加新工具
希望这篇文章能帮助你快速上手 MCP 开发!如有问题,欢迎交流讨论。
作者:Yumo_Neeko 日期:2025年11月13日 标签:#MCP #AI #Python #自动化
📝 附录:完整项目结构
mcp-project/
├── .env # 环境变量配置
├── .gitignore # Git 忽略文件
├── pyproject.toml # 项目依赖
├── README.md # 项目说明
├── client.py # MCP 客户端
├── server.py # MCP 服务器
├── google_news/ # 新闻数据目录
├── sentiment_reports/ # 分析报告目录
└── llm_outputs/ # 对话记录目录
Happy Coding! 🎉









