MCP 协议驱动数据库查询:原理、选型与实践路线
**一、为什么用 MCP 调数据库?**MCP 把“数据库→LLM”这条链路标准化:只需一次对接,即可让任何支持 MCP 的模型调用现有数据。借助流式传输,超长结果集可以分片回写,前端边收边渲染,大幅降低等待时间。细粒度 OAuth 作用域又能把查询权限限定在单张表或单条语句,满足企业合规要求。
二、三层架构:Server / Tool / Client
- MCP Server ― 持久保活数据库连接,暴露查询类 Tool。
- Tool 元数据 ― 用 JSON Schema 声明参数与 SQL 模板,让模型自行填参。
- MCP Client ― 模型或代理,通过
listTools→callTool
完成 SQL 执行并接收流式结果。
三、实现选型
方案 | 特色 & 适用场景 | 维护成本 |
---|---|---|
MCP Toolbox for Databases | 开箱即用,已内置 Postgres / MySQL / BigQuery 等 Source,K8s/Cloud Run 部署模板齐全 | 低 |
FastMCP + SQLAlchemy | 灵活自定义,适合需要业务逻辑拼装或混合 API 调用的后台 | 中 |
mcp-proxy | 零改造把现有 REST/GraphQL 查询接口“转译”为 MCP Tool | 低 |
四、查询模式与最佳实践
- 参数化查询:使用
$1,$2…
占位符可防注入;Tool 层自动绑定参数。 - 模板查询:需要动态插入表名/列名时,可用
{{.tableName}}
模板,但务必限制白名单。 - 流式结果:在 Server 端
yield
每 100 行结果,Client 侧立即渲染,避免一次性加载。
五、端到端开发流程
- 梳理可暴露的 SQL 语句,拆分为独立 Tool。
- 写
tools.yaml
描述kind: postgres-sql / mysql-sql
。 - 选择传输层:本地脚本用 Stdio,远程生产用 Streamable HTTP。
- 配置 OAuth + Scope,把每条 Tool 映射到最小权限。
- 在 LLM 提示词中列出可用 Tool,启用自动调用。
```yaml
# tools.yaml —— 参数化查询示例(PostgreSQL)
tools:
list_recent_orders:
kind: postgres-sql
source: orders-db
statement: |
SELECT * FROM orders
WHERE customer_id = $1
ORDER BY created_at DESC
LIMIT 20
description: >
查询指定客户最近 20 条订单。
parameters:
- name: customer_id
type: string
description: 客户唯一 ID
# server.py —— FastMCP + async SQLAlchemy
from fastmcp import FastMCP
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import text
import os
DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_async_engine(DATABASE_URL, echo=False)
mcp = FastMCP("DB-Tools", transport="streamable_http", listen="0.0.0.0:8080")
@mcp.tool(name="run_sql", description="执行安全参数化 SQL", parameters={"sql": "string", "args": "array"})
async def run_sql(sql: str, args: list[str]) -> list[dict]:
async with AsyncSession(engine) as session:
result = await session.execute(text(sql), args)
# 按行分块推送回客户端
for row in result.fetchall():
yield dict(row)
if __name__ == "__main__":
mcp.run()
# client.py —— Streamable HTTP 调用
import asyncio
from mcp.client.streamable_http import streamablehttp_client
from mcp import ClientSession
async def main():
async with streamablehttp_client("https://api.example.com/mcp") as (r, w, _):
async with ClientSession(r, w) as s:
await s.initialize()
res = await s.call_tool("list_recent_orders", {"customer_id": "C123"})
async for chunk in res:
print(chunk)
asyncio.run(main())