一、引言
继《万字长文,带你读懂Anthropic MCP》和《MCP框架解读之mark3labs/mcp-go》之后,我们继续来解读golang的另一个MCP框架metoro-io/mcp-golang。老规矩,继续通过实践和源码的方式先解读metoro-io/mcp-golang。
二、metoro-io/mcp-golang示例
就与框架中的一个示例做引子,深入了解metoro-io/mcp-golang: https://github.com/metoro-io/mcp-golang/blob/main/examples/readme_server/readme_server.go
package main
import (
"fmt"
"github.com/metoro-io/mcp-golang"
"github.com/metoro-io/mcp-golang/transport/stdio"
)
// Tool arguments are just structs, annotated with jsonschema tags
// More at https://mcpgolang.com/tools#schema-generation
type Content struct {
Title string `json:"title" jsonschema:"required,description=The title to submit"`
Description *string `json:"description" jsonschema:"description=The description to submit"`
}
type MyFunctionsArguments struct {
Submitter string `json:"submitter" jsonschema:"required,description=The name of the thing calling this tool (openai, google, claude, etc)"`
Content Content `json:"content" jsonschema:"required,description=The content of the message"`
}
func main() {
done := make(chan struct{})
server := mcp_golang.NewServer(stdio.NewStdioServerTransport())
err := server.RegisterTool("hello", "Say hello to a person", func(arguments MyFunctionsArguments) (*mcp_golang.ToolResponse, error) {
return mcp_golang.NewToolResponse(mcp_golang.NewTextContent(fmt.Sprintf("Hello, %server!", arguments.Submitter))), nil
})
if err != nil {
panic(err)
}
err = server.RegisterPrompt("promt_test", "This is a test prompt", func(arguments Content) (*mcp_golang.PromptResponse, error) {
return mcp_golang.NewPromptResponse("description", mcp_golang.NewPromptMessage(mcp_golang.NewTextContent(fmt.Sprintf("Hello, %server!", arguments.Title)), mcp_golang.RoleUser)), nil
})
if err != nil {
panic(err)
}
err = server.RegisterResource("test://resource", "resource_test", "This is a test resource", "application/json", func() (*mcp_golang.ResourceResponse, error) {
return mcp_golang.NewResourceResponse(mcp_golang.NewTextEmbeddedResource("test://resource", "This is a test resource", "application/json")), nil
})
err = server.Serve()
if err != nil {
panic(err)
}
<-done
}
搭建联调环境:
$ go build -v
$ npx -y @modelcontextprotocol/inspector ./readme_server
三、 框架流程和源码解读
metoro-io/mcp-golang框架的源码主要是围绕着Server、Protocol和Transport实现之间的关系展开。
- Server 利用 Transport 和 Protocol 提供服务,监听客户端请求,执行业务逻辑,并发送响应。
- Protocol 位于 Transport 之上,负责消息的解析、处理以及响应的生成,定义通信的规则和消息的格式。
- Transport 负责底层的通信机制,提供消息的发送和接收能力。
3.1 Server
Server 是对外提供服务的实体,会监听来自客户端的请求,通过 Transport 接收这些请求,然后利用 Protocol 来解析请求、执行相应的业务逻辑,并通过 Transport 发送响应。 在典型的流程中:
- Server 会创建一个或多个 Transport 实例来监听不同的端口或端点。
- 当 Transport 接收到消息时,它会调用 Protocol 来处理这些消息。
- Protocol 解析消息后,可能会调用 Server 注册的处理器(handler)来执行具体的业务逻辑。
- 业务逻辑执行完成后,Protocol 会生成响应,并通过 Transport 发送回客户端。
3.1.1 结构体
Server 结构体是 框架中的一个核心组件,它定义了一个服务器实例,用于处理来自客户端的请求。通过该结构体,服务器能够管理多种资源(如工具、提示和资源),并提供相应的接口供客户端调用。
type Server struct {
// 表示服务器当前是否正在运行
isRunning bool
// 表示服务器使用的传输层,负责实际的消息发送和接收
// 例如,stdio传输将从标准输入读取数据并写入标准输出
// SSE传输将通过SSE连接发送消息并通过HTTP POST请求接收消息
transport transport.Transport
// 表示服务器使用的协议,负责处理消息的格式和通信逻辑
protocol *protocol.Protocol
// 分页限制,用于控制返回结果集的大小
paginationLimit *int
// 存储服务器注册的工具,工具是服务器提供的一种功能
// 客户端可以通过调用工具来执行特定任务
tools *datastructures.SyncMap[string, *tool]
// 存储服务器注册的提示,提示是服务器提供的一种交互方式
// 客户端可以通过提示向用户请求输入
prompts *datastructures.SyncMap[string, *prompt]
// 存储服务器注册的资源,资源是服务器提供的一种静态或动态内容
// 客户端可以通过请求资源来获取所需数据
resources *datastructures.SyncMap[string, *resource]
// 存储服务器的指令信息,指令信息可以包含有关如何使用服务器的指导或说明
serverInstructions *string
// 存储服务器的名称,用于标识服务器
serverName string
// 存储服务器的版本号,用于标识服务器的当前版本
serverVersion string
}
3.1.2 创建Server实例
进入server.go文件继续查看NewServer()函数:
func NewServer(transport transport.Transport, options ...ServerOptions) *Server {
// 创建一个新的Server实例,并初始化其协议、传输方式、工具、提示和资源存储
server := &Server{
protocol: protocol.NewProtocol(nil), // 初始化协议
transport: transport, // 设置传输方式
tools: new(datastructures.SyncMap[string, *tool]), // 初始化工具存储
prompts: new(datastructures.SyncMap[string, *prompt]), // 初始化提示存储
resources: new(datastructures.SyncMap[string, *resource]), // 初始化资源存储
}
// 遍历传入的选项,并对Server实例进行配置
for _, option := range options {
option(server)
}
// 返回配置好的Server实例
return server
}
又是经典Option模式,ServerOptions有哪些功能,就需要我们好好看看了: | | | | | ------------ | ------------ | ------------ | | WithProtocol | 设置服务器的协议 |WithProtocol(protocol *protocol.Protocol) | | WithPaginationLimit |设置分页限制(目前不支持)| WithPaginationLimit(limit int)| | WithName |设置服务器名称 | WithName(name string)| | WithVersion |设置服务器版本 | WithVersion(version string)|
MCP协议在2024-11-24的版本还只是支持stdio和SSE,框架0.8版本也是只有这两个传输方式。
3.1.3 RegisterTool
框架提供RegisterTool()方法进行Tool的添加:
func (s *Server) RegisterTool(name string, description string, handler any) error {
// 首先验证提供的处理器函数是否有效
err := validateToolHandler(handler)
if err != nil {
// 如果验证失败,则返回验证错误
return err
}
// 从处理器函数创建 JSON Schema,用于描述工具的输入参数
inputSchema := createJsonSchemaFromHandler(handler)
// 将新工具存储到服务器的工具映射中
// 创建一个新的 tool 实例,并设置其名称、描述、处理器和输入 Schema
s.tools.Store(name, &tool{
Name: name, // 工具名称
Description: description, // 工具描述
Handler: createWrappedToolHandler(handler), // 包装后的处理器函数
ToolInputSchema: inputSchema, // 工具的输入 Schema
})
// 发送工具列表已更改的通知
// 如果服务器正在运行,则通过协议发送通知
return s.sendToolListChangedNotification()
}
其中,handler是处理工具请求的方法,需要注意类型是any,说明任何类型都可以入进来,那么框架如何处理呢?继续看到validateToolHandler函数:
// validateToolHandler 函数用于验证工具处理器函数是否符合要求
func validateToolHandler(handler any) error {
// 通过反射获取处理器函数的值和类型
handlerValue := reflect.ValueOf(handler)
handlerType := handlerValue.Type()
// 检查处理器函数的输入参数数量
// 我们允许处理器函数可选地接受一个 context.Context 作为第一个参数
if handlerType.NumIn() != 1 && handlerType.NumIn() != 2 {
// 如果输入参数数量不是 1 个或 2 个,则返回错误
return fmt.Errorf("handler must take exactly one or two arguments, got %d", handlerType.NumIn())
}
// 检查处理器函数的返回值数量
if handlerType.NumOut() != 2 {
// 如果返回值数量不是 2 个,则返回错误
return fmt.Errorf("handler must return exactly two values, got %d", handlerType.NumOut())
}
// 如果处理器函数接受两个参数
if handlerType.NumIn() == 2 {
// 检查第一个参数是否为 context.Context 类型
if handlerType.In(0) != reflect.TypeOf((*context.Context)(nil)).Elem() {
// 如果第一个参数不是 context.Context 类型,则返回错误
return fmt.Errorf("when a handler has 2 arguments, handler must take context.Context as the first argument, got %s", handlerType.In(0).Name())
}
}
// 检查第一个返回值是否为 *tools.ToolResponse 类型
if handlerType.Out(0) != reflect.PointerTo(reflect.TypeOf(ToolResponse{})) {
// 如果第一个返回值不是 *tools.ToolResponse 类型,则返回错误
return fmt.Errorf("handler must return *tools.ToolResponse, got %s", handlerType.Out(0).Name())
}
// 检查第二个返回值是否为 error 类型
if handlerType.Out(1) != reflect.TypeOf((*error)(nil)).Elem() {
// 如果第二个返回值不是 error 类型,则返回错误
return fmt.Errorf("handler must return error, got %s", handlerType.Out(1).Name())
}
// 如果所有检查都通过,则返回 nil 表示没有错误
return nil
}
handler虽然给的是any类型,但是在validateToolHandler函数要求handle需要满足:
- 必须是函数
- 该函数入参1-2个参数:
- 一个入参:参数必须是结构体
- 两个入参:第一个必须是ctx,第二个参数也必须是结构体。
- 该函数返回值必须有两个:
- 第一个为:mcp_golang.ToolResponse
- 第二个为:error。 根据校验要求,handle函数格式如下:
func([ctx context.Context,] args HelloArgs) (*mcp.ToolResponse, error) {
message := fmt.Sprintf("Hello, %s!", args.Name)
return mcp.NewToolResponse(mcp.NewTextContent(message)), nil
}
其中error的返回,如果不为空,MCP的响应字段isError=true。 createWrappedToolHandler函数的功能是将handle的返回结果进行JSON-RPC 2.0协议的响应序列化。
3.1.4 RegisterResource
RegisterResource 在服务器上注册一个新的资源处理器。
func (s *Server) RegisterResource(uri string, name string, description string, mimeType string, handler any) error {
// 验证资源处理器函数是否符合要求
err := validateResourceHandler(handler)
if err != nil {
// 如果验证失败,则触发 panic(这里也可以选择返回错误,但原代码使用了 panic)
panic(err)
}
// 创建一个新的 resource 实例,并设置其名称、描述、URI、MIME 类型和处理器函数
// 处理器函数被包装为 createWrappedResourceHandler 返回的函数,以便处理请求时调用
s.resources.Store(uri, &resource{
Name: name,
Description: description,
Uri: uri,
mimeType: mimeType,
Handler: createWrappedResourceHandler(handler),
})
// 发送资源列表已更改的通知
// 如果服务器正在运行,则通过协议发送通知
return s.sendResourceListChangedNotification()
}
handler也是any类型,继续通过代码中的validateResourceHandler函数来看看对handle的要求:
// validateResourceHandler 函数用于验证资源处理器函数是否符合要求
func validateResourceHandler(handler any) error {
// 通过反射获取处理器函数的值和类型
handlerValue := reflect.ValueOf(handler)
handlerType := handlerValue.Type()
// 检查处理器函数的输入参数数量
// 处理器函数可以接受零个或一个参数
if handlerType.NumIn() != 0 && handlerType.NumIn() != 1 {
// 如果参数数量不是 0 个或 1 个,则返回错误
return fmt.Errorf("handler must take no or one arguments, got %d", handlerType.NumIn())
}
// 如果处理器函数接受一个参数
if handlerType.NumIn() == 1 {
// 检查该参数是否为 context.Context 类型
if handlerType.In(0) != reflect.TypeOf((*context.Context)(nil)).Elem() {
// 如果参数不是 context.Context 类型,则返回错误
return fmt.Errorf("when a handler has 1 argument, it must be context.Context, got %s", handlerType.In(0).Name())
}
}
// 检查处理器函数的返回值数量
// 处理器函数必须返回两个值
if handlerType.NumOut() != 2 {
// 如果返回值数量不是 2 个,则返回错误
return fmt.Errorf("handler must return exactly two values, got %d", handlerType.NumOut())
}
// 以下两行代码被注释掉了,可能是因为在某些情况下不需要严格检查返回值的类型
// 或者这些检查在其他地方进行
/*
if handlerType.Out(0) != reflect.TypeOf((*ResourceResponse)(nil)).Elem() {
return fmt.Errorf("handler must return ResourceResponse, got %s", handlerType.Out(0).Name())
}
if handlerType.Out(1) != reflect.TypeOf((*error)(nil)).Elem() {
return fmt.Errorf("handler must return error, got %s", handlerType.Out(1).Name())
}
*/
// 如果所有检查都通过,则返回 nil 表示没有错误
return nil
}
对handle的要求为:
- 必须为函数;
- 该函数可以无入参也可以一个入参:
- 无入参
- 一个入参,那该参数只能是context.Context
- 该函数必须有两个返回值(原本的返回类型校验被注释) 根据校验要求,handle函数格式如下:
func([ctx context.Context]) (*mcp_golang.ResourceResponse, error) {
return mcp_golang.NewResourceResponse(mcp_golang.NewTextEmbeddedResource("test://resource", "This is a test resource", "application/json")), nil
}
createWrappedResourceHandler也是将Resource的handle函数进行序列化的响应。
3.1.5 RegisterPrompt
RegisterPrompt 在服务器上注册一个新的提示处理器。
// RegisterPrompt 在服务器上注册一个新的提示处理器
func (s *Server) RegisterPrompt(name string, description string, handler any) error {
// 验证提示处理器函数是否符合要求
err := validatePromptHandler(handler)
if err != nil {
// 如果验证失败,则返回错误
return err
}
// 从处理器函数创建提示输入参数的 JSON Schema
promptSchema := createPromptSchemaFromHandler(handler)
// 将新提示处理器存储到服务器的提示映射中
// 创建一个新的 prompt 实例,并设置其名称、描述、处理器和输入 Schema
s.prompts.Store(name, &prompt{
Name: name, // 提示处理器名称
Description: description, // 提示处理器描述
Handler: createWrappedPromptHandler(handler), // 包装后的处理器函数
PromptInputSchema: promptSchema, // 提示输入参数的 Schema
})
// 发送提示列表已更改的通知
// 如果服务器正在运行,则通过协议发送通知
return s.sendPromptListChangedNotification()
}
一样的方式继续查看validatePromptHandler函数来了解handle:
// 提示处理器只能接受字段类型为 string 或 *string 的结构体作为参数
func validatePromptHandler(handler any) error {
// 通过反射获取处理器函数的值和类型
handlerValue := reflect.ValueOf(handler)
handlerType := handlerValue.Type()
// 定义参数类型变量
var argumentType reflect.Type
// 检查处理器函数的输入参数数量
if handlerType.NumIn() == 2 {
// 如果处理器函数有两个参数
// 第一个参数必须是 context.Context 类型
if handlerType.In(0) != reflect.TypeOf((*context.Context)(nil)).Elem() {
return fmt.Errorf("when a handler has 2 arguments, the first argument must be context.Context, got %s", handlerType.In(0).Name())
}
// 第二个参数作为实际的参数类型
argumentType = handlerType.In(1)
} else if handlerType.NumIn() == 1 {
// 如果处理器函数有一个参数
// 该参数作为实际的参数类型
argumentType = handlerType.In(0)
} else {
// 如果参数数量不是 1 个或 2 个,返回错误
return fmt.Errorf("handler must take one or two arguments, got %d", handlerType.NumIn())
}
// 检查参数类型是否为结构体
if argumentType.Kind() != reflect.Struct {
return fmt.Errorf("argument must be a struct")
}
// 遍历结构体的所有字段
for i := 0; i < argumentType.NumField(); i++ {
field := argumentType.Field(i)
isValid := false
// 检查字段类型是否为 string
if field.Type.Kind() == reflect.String {
isValid = true
}
// 检查字段类型是否为 *string
if field.Type.Kind() == reflect.Ptr && field.Type.Elem().Kind() == reflect.String {
isValid = true
}
// 如果字段类型既不是 string 也不是 *string,返回错误
if !isValid {
return fmt.Errorf("all fields of the struct must be of type string or *string, found %s", field.Type.Kind())
}
}
// 如果所有检查都通过,返回 nil 表示没有错误
return nil
}
handle的要求如下:
- 必须是函数
- 该函数入参1-2个参数:
- 一个入参:参数必须是结构体
- 两个入参:第一个必须是ctx,第二个参数也必须是结构体。
- 该函数的入参结构体的字段只能是string或*string类型。
- 该函数的返回值没有要求,不过建议返回(*mcp_golang.PromptResponse, error)
type Content struct {
Title string `json:"title" jsonschema:"required,description=The title to submit"`
Description *string `json:"description" jsonschema:"description=The description to submit"`
}
func([ctx context.Context,]arguments Content) (*PromptResponse, error) {
return NewPromptResponse("description", NewPromptMessage(NewTextContent(fmt.Sprintf("Hello, %server!", "arguments.Title")), RoleUser)), nil
}
除此之外Tool、Resource和Prompt都有提供注销的能力:
- DeregisterTool
- DeregisterResource
- DeregisterPrompt 类似的还有判断是否将Tool、Resource和Prompt注册的能力:
- CheckToolRegistered
- CheckResourceRegistered
- CheckPromptRegistered
3.1.6 Serve
Serve 方法是 Server 类型的一个方法,用于启动服务器并处理客户端请求。
func (s *Server) Serve() error {
// 如果服务器已经在运行,则返回错误
if s.isRunning {
return fmt.Errorf("server is already running")
}
// 获取服务器的协议对象
pr := s.protocol
// 设置协议处理器,将特定的请求路径映射到相应的处理函数
pr.SetRequestHandler("ping", s.handlePing)
pr.SetRequestHandler("initialize", s.handleInitialize)
pr.SetRequestHandler("tools/list", s.handleListTools)
pr.SetRequestHandler("tools/call", s.handleToolCalls)
pr.SetRequestHandler("prompts/list", s.handleListPrompts)
pr.SetRequestHandler("prompts/get", s.handlePromptCalls)
pr.SetRequestHandler("resources/list", s.handleListResources)
pr.SetRequestHandler("resources/read", s.handleResourceCalls)
// 使用传输层连接协议
err := pr.Connect(s.transport)
if err != nil {
// 如果连接失败,返回错误
return err
}
// 更新服务器的协议对象(虽然在这里看起来是多余的,因为 pr 本身就是 s.protocol)
s.protocol = pr
// 设置服务器运行状态为正在运行
s.isRunning = true
// 返回 nil 表示服务器启动成功
return nil
}
Serve 方法中,我们就可以看到Server将Protocol和Transport组合合并起来,提供不同的能力,可灵活搭配使用。 Protocol.SetRequestHandler是有着路由映射功能,method与各个handler绑定起来,下面可以看看框架中提供的handle方法。
3.1.7 Handler
我们就以几个handle函数进行分析:
handleToolCalls
func (s *Server) handleToolCalls(ctx context.Context, req *transport.BaseJSONRPCRequest, _ protocol.RequestHandlerExtra) (transport.JsonRpcBody, error) {
// 定义一个 baseCallToolRequestParams 类型的变量 params,用于存储解析后的请求参数
params := baseCallToolRequestParams{}
// 将请求参数(JSON 格式)解析到 params 变量中
// Instantiate a struct of the type of the arguments
err := json.Unmarshal(req.Params, ¶ms)
if err != nil {
// 如果解析失败,返回错误,提示无法解析参数
return nil, errors.Wrap(err, "failed to unmarshal arguments")
}
// 定义一个指向 tool 类型的指针变量 toolToUse,用于存储要调用的工具
var toolToUse *tool
// 遍历服务器中注册的所有工具
s.tools.Range(func(k string, t *tool) bool {
// 如果工具名称与请求参数中的名称不匹配,继续遍历
if k != params.Name {
return true
}
// 如果找到匹配的工具,将其赋值给 toolToUse 变量,并终止遍历
toolToUse = t
return false
})
// 检查是否找到了匹配的工具
if toolToUse == nil {
// 如果没有找到,返回错误,提示未知工具
return nil, errors.Wrapf(err, "unknown tool: %s", req.Method)
}
// 调用找到的工具的处理函数,并返回结果
return toolToUse.Handler(ctx, params), nil
}
这个函数使用的是s.tools.Range,而不是更加高效的s.tools.Load。读者们可能会好奇为什么会这样?这是一个MCP的Tool的集合一般不会太大,此时range的性能会比load的性能好。但是map集合大了,那就是反过来了。
handleListTools
handleListTools用于处理列出工具列表的请求。
func (s *Server) handleListTools(ctx context.Context, request *transport.BaseJSONRPCRequest, _ protocol.RequestHandlerExtra) (transport.JsonRpcBody, error) {
// 定义一个结构体 toolRequestParams,用于存储请求参数中的游标
type toolRequestParams struct {
Cursor *string `json:"cursor"`
}
// 声明一个 toolRequestParams 类型的变量 params,用于存储解析后的请求参数
var params toolRequestParams
// 检查请求参数是否为空
if request.Params == nil {
// 如果为空,则初始化一个空的 toolRequestParams 结构体
params = toolRequestParams{}
} else {
// 否则,将请求参数解析到 params 变量中
err := json.Unmarshal(request.Params, ¶ms)
if err != nil {
// 如果解析失败,返回错误,提示无法解析参数
return nil, errors.Wrap(err, "failed to unmarshal arguments")
}
}
// 声明一个指向 tool 类型的指针切片 orderedTools,用于存储排序后的工具列表
var orderedTools []*tool
// 遍历服务器中注册的所有工具
s.tools.Range(func(k string, t *tool) bool {
// 将每个工具添加到 orderedTools 切片中
orderedTools = append(orderedTools, t)
// 继续遍历
return true
})
// 对工具列表按名称进行排序,以便实现分页
sort.Slice(orderedTools, func(i, j int) bool {
return orderedTools[i].Name < orderedTools[j].Name
})
// 声明一个变量 startPosition,用于记录分页的起始位置
startPosition := 0
// 检查请求参数中是否包含游标
if params.Cursor != nil {
// 如果包含游标,则对游标进行 Base64 解码
c, err := base64.StdEncoding.DecodeString(*params.Cursor)
if err != nil {
// 如果解码失败,返回错误,提示无法解码游标
return nil, errors.Wrap(err, "failed to decode cursor")
}
// 将解码后的游标转换为字符串
cString := string(c)
// 在排序后的工具列表中查找大于游标的工具
found := false
for i := 0; i < len(orderedTools); i++ {
if orderedTools[i].Name > cString {
// 如果找到,则更新 startPosition 为该工具的索引
startPosition = i
found = true
// 退出循环
break
}
}
// 如果没有找到大于游标的工具,则将 startPosition 设置为工具列表的长度
if !found {
startPosition = len(orderedTools)
}
}
// 声明一个变量 endPosition,用于记录分页的结束位置
endPosition := len(orderedTools)
// 检查服务器是否设置了分页限制
if s.paginationLimit != nil {
// 如果设置了分页限制,则计算结束位置
// 确保结束位置不超过工具列表的长度
if len(orderedTools) > startPosition+*s.paginationLimit {
endPosition = startPosition + *s.paginationLimit
}
}
// 声明一个 ToolRetType 类型的切片 toolsToReturn,用于存储要返回的工具列表
toolsToReturn := make([]ToolRetType, 0)
// 根据起始位置和结束位置,将工具添加到 toolsToReturn 切片中
for i := startPosition; i < endPosition; i++ {
toolsToReturn = append(toolsToReturn, ToolRetType{
Name: orderedTools[i].Name,
Description: &orderedTools[i].Description,
InputSchema: orderedTools[i].ToolInputSchema,
})
}
// 返回 ToolsResponse 结构体,包含工具列表和下一个游标(如果有的话)
return ToolsResponse{
Tools: toolsToReturn,
NextCursor: func() *string {
// 如果设置了分页限制,并且返回的工具列表长度达到了分页限制
// 则计算下一个游标(即最后一个工具的名称的 Base64 编码)
if s.paginationLimit != nil && len(toolsToReturn) >= *s.paginationLimit {
toString := base64.StdEncoding.EncodeToString([]byte(toolsToReturn[len(toolsToReturn)-1].Name))
return &toString
}
// 否则,返回 nil
return nil
}(),
}, nil
}
需要关注的是方法的分页能力,构造的ToolsResponse 结构体,包含工具列表 toolsToReturn 和下一个游标:
- 如果设置了分页限制,并且返回的工具列表长度达到了分页限制,则计算下一个游标(即最后一个工具的名称的 Base64 编码)。
- 否则,下一个游标为 nil。 其他的handle方法各位读者可以自行参看。
3.2 Protocol
Protocol 通常负责定义和解释消息的格式以及通信的规则。在 MCP(消息通信协议)的上下文中,Protocol 可能会处理 JSON-RPC 消息的序列化和反序列化、方法的调度、响应的生成等。它位于 Transport 之上,利用 Transport 提供的通信能力,实现更高层次的消息交换逻辑。 Protocol 会与 Transport 交互,通过 Transport 发送和接收原始消息,然后对这些消息进行解析和处理,最终生成响应或触发相应的业务逻辑。
3.2.1 结构体
type Protocol struct {
transport transport.Transport // 传输层
options *ProtocolOptions // 协议选项
requestMessageID transport.RequestId // 请求消息ID
mu sync.RWMutex // 读写锁,保护共享状态
// 方法名到请求处理程序的映射
requestHandlers map[string]func(context.Context, *transport.BaseJSONRPCRequest, RequestHandlerExtra) (transport.JsonRpcBody, error)
// 请求ID到取消函数的映射
requestCancellers map[transport.RequestId]context.CancelFunc
// 方法名到通知处理程序的映射
notificationHandlers map[string]func(notification *transport.BaseJSONRPCNotification) error
// 消息ID到响应处理程序的映射
responseHandlers map[transport.RequestId]chan *responseEnvelope
// 消息ID到进度处理程序的映射
progressHandlers map[transport.RequestId]ProgressCallback
// 连接因任何原因关闭时的回调
OnClose func()
// 发生错误时的回调
OnError func(error)
// 未安装自己的处理程序时调用的请求处理程序
FallbackRequestHandler func(ctx context.Context, request *transport.BaseJSONRPCRequest) (transport.JsonRpcBody, error)
// 未安装自己的处理程序时调用的通知处理程序
FallbackNotificationHandler func(notification *transport.BaseJSONRPCNotification) error
}
3.2.2 创建Protocol实例
NewProtocol 函数用于创建一个新的 Protocol 实例,并初始化其内部字段和默认的通知处理程序.
func NewProtocol(options *ProtocolOptions) *Protocol {
// 初始化 Protocol 结构体,并分配内存
p := &Protocol{
options: options, // 设置协议选项,Tool、Resource和Prompt列表变化通知
requestHandlers: make(map[string]func(context.Context, *transport.BaseJSONRPCRequest, RequestHandlerExtra) (transport.JsonRpcBody, error)), // 初始化请求处理程序映射
requestCancellers: make(map[transport.RequestId]context.CancelFunc), // 初始化请求取消函数映射
notificationHandlers: make(map[string]func(*transport.BaseJSONRPCNotification) error), // 初始化通知处理程序映射
responseHandlers: make(map[transport.RequestId]chan *responseEnvelope), // 初始化响应处理程序映射
progressHandlers: make(map[transport.RequestId]ProgressCallback), // 初始化进度处理程序映射
}
// 设置默认的通知处理程序
p.SetNotificationHandler("notifications/cancelled", p.handleCancelledNotification) // 处理取消通知
p.SetNotificationHandler("$/progress", p.handleProgressNotification) // 处理进度通知
return p // 返回初始化后的 Protocol 实例
}
3.2.3 Connect
Connect 函数用于将 Protocol 实例连接到给定的传输层(transport.Transport),并启动传输层以开始监听消息。
func (p *Protocol) Connect(tr transport.Transport) error {
p.transport = tr // 设置协议的传输层
// 设置连接关闭时的处理函数
tr.SetCloseHandler(func() {
p.handleClose()
})
// 设置错误处理函数
tr.SetErrorHandler(func(err error) {
p.handleError(err)
})
// 设置消息处理函数
tr.SetMessageHandler(func(ctx context.Context, message *transport.BaseJsonRpcMessage) {
// 根据消息类型进行分发处理
switch m := message.Type; {
case m == transport.BaseMessageTypeJSONRPCRequestType:
// 处理 JSON-RPC 请求
p.handleRequest(ctx, message.JsonRpcRequest)
case m == transport.BaseMessageTypeJSONRPCNotificationType:
// 处理 JSON-RPC 通知
p.handleNotification(message.JsonRpcNotification)
case m == transport.BaseMessageTypeJSONRPCResponseType:
// 处理 JSON-RPC 响应
p.handleResponse(message.JsonRpcResponse, nil)
case m == transport.BaseMessageTypeJSONRPCErrorType:
// 处理 JSON-RPC 错误
p.handleResponse(nil, message.JsonRpcError)
}
})
// 启动传输层,使其开始监听和处理消息
return tr.Start(context.Background())
}
从之前的Server.Serve调用了Protocol.Connect 函数,而在Protocol.Connect 函数中调用了Transport.Start。
- 在Server.Serve中设置了JSON-RPC中的method对应的handle函数,initialize、tools/list、tools/call和prompts/list等。
- Protocol.Connect 函数设置了request、response、notification和error的handle函数。Transport就根据输入的JSON数据映射到不同的handle进行处理。 从而将Server、Protocol和Transport这个基础组件组合起来。
3.2.4 handle
Protocol中的其他handle函数基本上都是从Protocol的各种map中获取到对应的handle函数进行调用,或者对map中的handle进行增删操作。我们以handleRequest为例:
func (p *Protocol) handleRequest(ctx context.Context, request *transport.BaseJSONRPCRequest) {
// 加读锁保护共享资源,获取对应方法名的请求处理函数
p.mu.RLock()
handler := p.requestHandlers[request.Method]
if handler == nil {
// 如果没有找到对应的处理函数,则使用回退处理函数
handler = func(ctx context.Context, req *transport.BaseJSONRPCRequest, extra RequestHandlerExtra) (transport.JsonRpcBody, error) {
if p.FallbackRequestHandler != nil {
return p.FallbackRequestHandler(ctx, req)
}
// 如果没有回退处理函数,则打印错误信息并返回方法未找到的错误
println("no handler for method and no default handler:", req.Method)
return nil, fmt.Errorf("method not found: %s", req.Method)
}
}
p.mu.RUnlock()
// 创建一个可取消的上下文,用于请求取消
ctx, cancel := context.WithCancel(ctx)
// 加写锁保护共享资源,将取消函数与请求ID关联
p.mu.Lock()
p.requestCancellers[request.Id] = cancel
p.mu.Unlock()
// 在新的goroutine中处理请求
go func() {
// 确保在处理结束后删除取消函数并调用取消操作
defer func() {
p.mu.Lock()
delete(p.requestCancellers, request.Id)
p.mu.Unlock()
cancel()
}()
// 调用请求处理函数,获取处理结果或错误
result, err := handler(ctx, request, RequestHandlerExtra{Context: ctx})
if err != nil {
// 如果处理过程中发生错误,则打印错误信息并发送错误响应
println("error:", err.Error())
p.sendErrorResponse(request.Id, err)
return
}
// 将处理结果序列化为JSON
jsonResult, err := json.Marshal(result)
if err != nil {
// 如果序列化过程中发生错误,则打印错误信息并发送错误响应
println("error:", err.Error())
p.sendErrorResponse(request.Id, fmt.Errorf("failed to marshal result: %w", err))
return
}
// 创建响应消息
response := &transport.BaseJSONRPCResponse{
Jsonrpc: "2.0",
Id: request.Id,
Result: jsonResult,
}
// 发送响应消息
if err := p.transport.Send(ctx, transport.NewBaseMessageResponse(response)); err != nil {
// 如果发送过程中发生错误,则打印错误信息并处理错误
println("error:", err.Error())
p.handleError(fmt.Errorf("failed to send response: %w", err))
}
}()
}
还记得之前说到的err != nil的时候,MCP的响应字段isError=true。就是在这个这些handle函数中实现的。err != nil调用的是Protocol.sendErrorResponse函数。err == nil,调用的是Transport.Send函数。其他的handle函数都比较简单,有兴趣的同学自行查看即可。
3.3 Transport
Transport 接口描述了一个 MCP 传输层所需的最小契约,客户端或服务器可以通过它进行通信。Transport 的具体实现(如 HTTP 传输、STDIO 传输等)将负责底层的通信细节,使得上层应用(如 Server 和 Protocol)可以专注于业务逻辑而无需关心底层的传输机制。
type Transport interface {
// Start 开始在传输层上处理消息,包括可能需要执行的任何连接步骤。
//
// 此方法应在安装回调之后调用,否则可能会丢失消息。
//
// 注意:当使用 Client、Server 或 Protocol 类时,不需要显式调用此方法,
// 因为它们会隐式调用 start()。
Start(ctx context.Context) error
// Send 发送一个 JSON-RPC 消息(请求、通知或响应)。
//
// 参数 ctx 提供了操作的上下文,message 是要发送的 JSON-RPC 消息。
Send(ctx context.Context, message *BaseJsonRpcMessage) error
// Close 关闭连接。
//
// 调用此方法后,传输层将不再处理任何消息。
Close() error
// SetCloseHandler 设置连接因任何原因关闭时的回调。
//
// 当调用 Close() 时,也应触发此回调。
SetCloseHandler(handler func())
// SetErrorHandler 设置发生错误时的回调。
//
// 注意,错误不一定是致命的;它们用于报告任何类型的异常状况。
SetErrorHandler(handler func(error))
// SetMessageHandler 设置在连接上接收到消息(请求、通知或响应)时的回调。
//
// 此回调接收部分反序列化后的 BaseJsonRpcMessage 对象。
SetMessageHandler(handler func(ctx context.Context, message *BaseJsonRpcMessage))
}
metoro-io/mcp-golang框架中Transport 接口有5个实现:
本文主要介绍StdioServerTransport和HTTPTransport。
3.3.1 StdioServerTransport
StdioServerTransport应该是比较简单的,我们就从StdioServerTransport开始。把流程梳理清楚了,剩下的就是看实现即可。
结构体
StdioServerTransport 结构体实现了基于标准输入输出(stdio)的服务器端传输层,用于处理 JSON-RPC 消息。
type StdioServerTransport struct {
mu sync.Mutex // 用于保护共享资源的互斥锁
started bool // 表示传输服务是否已启动的标志
reader *bufio.Reader // 用于从标准输入读取数据的缓冲读取器
writer io.Writer // 用于向标准输出写入数据的写入器
readBuf *stdio.ReadBuffer // 用于存储读取数据的缓冲区
onClose func() // 连接关闭时的回调函数
onError func(error) // 发生错误时的回调函数
onMessage func(ctx context.Context, message *transport.BaseJsonRpcMessage) // 收到消息时的回调函数
}
创建StdioServerTransport实例
func NewStdioServerTransport() *StdioServerTransport {
return NewStdioServerTransportWithIO(os.Stdin, os.Stdout)
}
func NewStdioServerTransportWithIO(in io.Reader, out io.Writer) *StdioServerTransport {
return &StdioServerTransport{
reader: bufio.NewReader(in), // 使用 bufio.Reader 包装输入流,提高读取效率
writer: out, // 输出流
readBuf: stdio.NewReadBuffer(), // 初始化读取缓冲区
}
}
std主要就传入os.Stdin和os.StdOut这两个标准输入输出实例。
Start
在Protocol.Connect 函数的最后调用的就是Transport.Start函数。现在看看StdioServerTransport.Start的具体实现:
func (t *StdioServerTransport) Start(ctx context.Context) error {
// 加锁保护共享资源
t.mu.Lock()
// 检查传输层是否已经启动
if t.started {
// 如果已经启动,则解锁并返回错误
t.mu.Unlock()
return fmt.Errorf("StdioServerTransport already started")
}
// 设置启动标志为 true
t.started = true
// 解锁
t.mu.Unlock()
// 在新的 goroutine 中启动读取循环
go t.readLoop(ctx)
// 返回 nil 表示成功启动
return nil
}
Start 方法用于启动 StdioServerTransport 实例,使其开始监听标准输入(stdin)上的消息。该方法首先通过互斥锁 mu 保护共享资源,并检查传输层是否已经启动。如果已经启动,则返回错误;否则,设置启动标志为 true,并在新的 goroutine 中启动读取循环 readLoop。
消息处理
readLoop负责在一个新的 goroutine 中持续监听标准输入(stdin),并处理接收到的数据,交给processReadBuffer函数处理。
func (t *StdioServerTransport) readLoop(ctx context.Context) {
// 创建一个大小为4096字节的缓冲区,用于临时存储从标准输入读取的数据
buffer := make([]byte, 4096)
// 无限循环,持续监听标准输入
for {
select {
// 如果上下文被取消或超时,则关闭传输层并返回
case <-ctx.Done():
t.Close()
return
default:
// 加锁保护共享资源
t.mu.Lock()
// 检查传输层是否已启动
if !t.started {
// 如果未启动,则解锁并返回
t.mu.Unlock()
return
}
// 解锁
t.mu.Unlock()
// 从标准输入读取数据到缓冲区
n, err := t.reader.Read(buffer)
// 如果发生错误
if err != nil {
// 如果错误不是EOF(文件结束标志),则处理错误
if err != io.EOF {
t.handleError(fmt.Errorf("read error: %w", err))
}
// 返回,结束读取循环
return
}
// 将读取到的数据追加到读取缓冲区
t.readBuf.Append(buffer[:n])
// 处理读取缓冲区中的数据
t.processReadBuffer()
}
}
}
继续查看到processReadBuffer函数,它负责处理读取缓冲区 readBuf 中的数据,解析出完整的 JSON-RPC 消息,并调用用户设置的消息处理回调函数 onMessage 进行处理。
func (t *StdioServerTransport) processReadBuffer() {
// 无限循环,持续处理读取缓冲区中的数据
for {
// 从读取缓冲区中尝试读取一条消息
msg, err := t.readBuf.ReadMessage()
// 如果读取过程中发生错误
if err != nil {
// 调用错误处理函数
t.handleError(err)
// 返回,结束处理循环
return
}
// 如果读取到的消息为空
if msg == nil {
// 返回,结束处理循环
return
}
// 调用消息处理函数,处理接收到的消息
t.handleMessage(msg)
}
}
如果对MCP协议有了解的同学应该想到,在外部输入的json内容一般为:
{"jsonrpc":"2.0","id":12,"method":"tools/call","params":{"name":"hello3","arguments":{"name":"alias"}}}
根本就没有Protocol之前设置的handle类型如request、response、notification和error,那它又是如何实现转化的?答案就在t.readBuf.ReadMessage(),我们就去代码看看:
func (rb *ReadBuffer) ReadMessage() (*transport.BaseJsonRpcMessage, error) {
// 加锁保护共享资源
rb.mu.Lock()
defer rb.mu.Unlock()
// 如果缓冲区为空,则返回 nil 和 nil
if rb.buffer == nil {
return nil, nil
}
// 查找换行符,以分割完整的 JSON-RPC 消息
for i := 0; i < len(rb.buffer); i++ {
if rb.buffer[i] == '\n' {
// 提取换行符之前的部分作为一条消息
line := string(rb.buffer[:i])
// 调试输出(实际代码中可能不需要)
// println("read line: ", line)
// 移除已读取的消息部分,保留剩余数据
rb.buffer = rb.buffer[i+1:]
// 调用 deserializeMessage 函数将消息字符串反序列化为 JSON-RPC 消息对象
return deserializeMessage(line)
}
}
// 如果未找到换行符,则返回 nil 和 nil,表示当前没有完整的消息可读
return nil, nil
}
ReadMessage函数用于将连续的标准输入输出流缓冲为离散的 JSON-RPC 消息。但是还是没有看到将外部的格式转为Protocol需要的request、response、notification和error类型。大家耐心的看看下面的deserializeMessage方法:
// deserializeMessage deserializes a JSON-RPC message from a string.
func deserializeMessage(line string) (*transport.BaseJsonRpcMessage, error) {
// 尝试将字符串反序列化为 JSON-RPC 请求类型
var request transport.BaseJSONRPCRequest
if err := json.Unmarshal([]byte(line), &request); err == nil {
// println("unmarshaled request:", spew.Sdump(request)) // 调试输出,实际代码中可能不需要
return transport.NewBaseMessageRequest(&request), nil
} else {
// println("unmarshaled request error:", err.Error())
}
// 尝试将字符串反序列化为 JSON-RPC 通知类型
var notification transport.BaseJSONRPCNotification
if err := json.Unmarshal([]byte(line), ¬ification); err == nil {
// 如果反序列化成功,则创建并返回一个新的 BaseMessageNotification 对象
return transport.NewBaseMessageNotification(¬ification), nil
} else {
// println("unmarshaled notification error:", err.Error())
}
// 尝试将字符串反序列化为 JSON-RPC 响应类型
var response transport.BaseJSONRPCResponse
if err := json.Unmarshal([]byte(line), &response); err == nil {
// 如果反序列化成功,则创建并返回一个新的 BaseMessageResponse 对象
return transport.NewBaseMessageResponse(&response), nil
} else {
// println("unmarshaled response error:", err.Error())
}
// 尝试将字符串反序列化为 JSON-RPC 错误响应类型
var errorResponse transport.BaseJSONRPCError
if err := json.Unmarshal([]byte(line), &errorResponse); err == nil {
// 如果反序列化成功,则创建并返回一个新的 BaseMessageError 对象
return transport.NewBaseMessageError(&errorResponse), nil
} else {
// println("unmarshaled error response error:", err.Error())
}
// 如果以上所有类型都匹配失败,则返回错误,表示无法识别的消息类型
return nil, errors.New("failed to unmarshal JSON-RPC message, unrecognized type")
}
说实在的,这个函数的实现不够优雅和健壮。哪怕使用MCP中的字段来做映射到request、response、notification和error的健壮性都比现在的实现好。 从ReadMessage函数中获得到的msg *transport.BaseJsonRpcMessage给到在Protocol.Connect函数中的:
tr.SetMessageHandler(func(ctx context.Context, message *transport.BaseJsonRpcMessage) {
switch m := message.Type; {
case m == transport.BaseMessageTypeJSONRPCRequestType:
p.handleRequest(ctx, message.JsonRpcRequest)
case m == transport.BaseMessageTypeJSONRPCNotificationType:
p.handleNotification(message.JsonRpcNotification)
case m == transport.BaseMessageTypeJSONRPCResponseType:
p.handleResponse(message.JsonRpcResponse, nil)
case m == transport.BaseMessageTypeJSONRPCErrorType:
p.handleResponse(nil, message.JsonRpcError)
}
})
transport.BaseJsonRpcMessage字段已经包含了MCP协议中定义的各个消息类型的字段内容。
type BaseJsonRpcMessage struct {
Type BaseMessageType
JsonRpcRequest *BaseJSONRPCRequest
JsonRpcNotification *BaseJSONRPCNotification
JsonRpcResponse *BaseJSONRPCResponse
JsonRpcError *BaseJSONRPCError
}
3.3.2 HTTPTransport
HTTPTransport 实现了 MCP(消息通信协议)的无状态 HTTP 传输层。它提供了启动 HTTP 服务器、发送和接收 JSON-RPC 消息、处理错误和关闭连接等功能。
结构体
HTTPTransport 结构体包含了多个字段,用于存储和管理 HTTP 服务器、端点、消息处理函数、错误处理函数、关闭处理函数、互斥锁、监听地址以及响应映射等。
// HTTPTransport 实现了 MCP 的无状态 HTTP 传输
type HTTPTransport struct {
*baseTransport // 嵌入基础传输层,可能包含一些共享的逻辑或字段
server *http.Server // HTTP 服务器实例,用于处理传入的 HTTP 请求
endpoint string // HTTP 端点,用于处理特定的路径请求
messageHandler func(ctx context.Context, message *transport.BaseJsonRpcMessage) // 消息处理函数,当接收到消息时被调用
errorHandler func(error) // 错误处理函数,当发生错误时被调用
closeHandler func() // 关闭处理函数,当连接关闭时被调用
mu sync.RWMutex // 读写锁,用于保护对共享资源的并发访问
addr string // 服务器监听的地址
responseMap map[int64]chan *transport.BaseJsonRpcMessage // 响应映射,用于存储响应通道,通过消息 ID 索引
}
注意看一下,HTTPTransport中的baseTransport,说明HTTPTransport集成了baseTransport的函数能力。但是HTTPTransport中的一些实现,比如Send、SetCloseHandler、SetErrorHandler和SetMessageHandler与baseTransport的函数实现都是一样的,可以直接删除。
创建HTTPTransport实例
HTTPTransport提供了类似option模式的格式,但是没有在NewHTTPTransport()函数中使用,主要是需要设置的字段就addr一个。
func NewHTTPTransport(endpoint string) *HTTPTransport {
return &HTTPTransport{
baseTransport: newBaseTransport(),
endpoint: endpoint,
addr: ":8080", // Default port
responseMap: make(map[int64]chan *transport.BaseJsonRpcMessage),
}
}
Start
HTTPTransport.Start 方法用于启动 HTTPTransport 实例,创建一个http Server,使其开始监听HTTP POST上的消息。
func (t *HTTPTransport) Start(ctx context.Context) error {
// 创建一个新的 HTTP 请求多路复用器(ServeMux)
mux := http.NewServeMux()
// 将指定的端点(endpoint)与 handleRequest 方法关联起来,用于处理该端点的 HTTP 请求
mux.HandleFunc(t.endpoint, t.handleRequest)
// 初始化 HTTP 服务器,设置监听地址和处理器(即上面创建的 ServeMux)
t.server = &http.Server{
Addr: t.addr, // 监听地址
Handler: mux, // 处理器
}
// 启动 HTTP 服务器,监听并处理传入的 HTTP 请求
return t.server.ListenAndServe()
}
Handle
handleRequest 方法是 HTTP 请求的处理函数,它负责解析请求、处理请求体、生成响应,并将响应写回客户端。
func (t *HTTPTransport) handleRequest(w http.ResponseWriter, r *http.Request) {
// 检查请求方法是否为 POST,如果不是则返回 405 Method Not Allowed 错误
if r.Method != http.MethodPost {
http.Error(w, "Only POST method is supported", http.StatusMethodNotAllowed)
return
}
// 从请求中获取上下文信息
ctx := r.Context()
// 读取请求体内容
body, err := t.readBody(r.Body)
if err != nil {
// 如果读取请求体出错,返回 400 Bad Request 错误
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
// 处理请求体内容,调用 handleMessage 方法
response, err := t.handleMessage(ctx, body)
if err != nil {
// 如果处理请求出错,返回 500 Internal Server Error 错误
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 将响应内容序列化为 JSON 格式
jsonData, err := json.Marshal(response)
if err != nil {
// 如果序列化出错,且设置了错误处理函数,则调用错误处理函数
if t.errorHandler != nil {
t.errorHandler(fmt.Errorf("failed to marshal response: %w", err))
}
// 返回 500 Internal Server Error 错误
http.Error(w, "Failed to marshal response", http.StatusInternalServerError)
return
}
// 设置响应头 Content-Type 为 application/json
w.Header().Set("Content-Type", "application/json")
// 写入响应体内容
w.Write(jsonData)
}
可以看出只支持POST请求,框架中对SSE的支持也是被注释了。与StdioServerTransport一样对外部传入进来的MCP协议的JSON格式,需要到baseTransport.handleMessage函数进行转化:
func (t *baseTransport) handleMessage(ctx context.Context, body []byte) (*transport.BaseJsonRpcMessage, error) {
// 加锁以保护共享资源
t.mu.Lock()
// 查找一个未使用的响应通道键
var key int64 = 0
for key < 1000000 {
if _, ok := t.responseMap[key]; !ok {
break
}
key++
}
// 创建一个新的响应通道,并存储在 responseMap 中
t.responseMap[key] = make(chan *transport.BaseJsonRpcMessage)
t.mu.Unlock()
// 用于存储原始请求 ID 的指针
var prevId *transport.RequestId = nil
// 标记消息是否已成功反序列化
deserialized := false
// 尝试将消息体反序列化为请求
var request transport.BaseJSONRPCRequest
if err := json.Unmarshal(body, &request); err == nil {
deserialized = true
// 保存原始请求 ID
id := request.Id
prevId = &id
// 用新生成的键替换请求 ID
request.Id = transport.RequestId(key)
// 获取消息处理函数(加读锁)
t.mu.RLock()
handler := t.messageHandler
t.mu.RUnlock()
// 如果消息处理函数存在,则调用它处理请求
if handler != nil {
handler(ctx, transport.NewBaseMessageRequest(&request))
}
}
// 如果消息不是请求,则尝试反序列化为通知
var notification transport.BaseJSONRPCNotification
if !deserialized {
if err := json.Unmarshal(body, ¬ification); err == nil {
deserialized = true
t.mu.RLock()
handler := t.messageHandler
t.mu.RUnlock()
if handler != nil {
handler(ctx, transport.NewBaseMessageNotification(¬ification))
}
}
}
// 如果消息既不是请求也不是通知,则尝试反序列化为响应
var response transport.BaseJSONRPCResponse
if !deserialized {
if err := json.Unmarshal(body, &response); err == nil {
deserialized = true
t.mu.RLock()
handler := t.messageHandler
t.mu.RUnlock()
if handler != nil {
handler(ctx, transport.NewBaseMessageResponse(&response))
}
}
}
// 如果消息仍然未反序列化成功,则尝试反序列化为错误响应
var errorResponse transport.BaseJSONRPCError
if !deserialized {
if err := json.Unmarshal(body, &errorResponse); err == nil {
deserialized = true
t.mu.RLock()
handler := t.messageHandler
t.mu.RUnlock()
if handler != nil {
handler(ctx, transport.NewBaseMessageError(&errorResponse))
}
}
}
// 阻塞等待响应通道中的响应消息
responseToUse := <-t.responseMap[key]
// 从 responseMap 中删除该响应通道
delete(t.responseMap, key)
// 如果原始请求 ID 存在,则恢复它
if prevId != nil {
responseToUse.JsonRpcResponse.Id = *prevId
}
// 返回处理后的响应消息
return responseToUse, nil
}
baseTransport.handleMessage函数的消息转化实现一样不优雅和不健壮。
四、总结
使用metoro-io/mcp-golang框架实现MCP Server实现业务功能,还是很流畅的,本人基于metoro-io/mcp-golang框架尝试写了一个MCP Server:https://github.com/qiangmzsx/mcp-filesystem-server 。 metoro-io/mcp-golang框架实现使用了比较多的反射机制和json解码判断类型,性能和健壮性上不如mark3labs/mcp-go。 最近MCP协议提交了"Streamable HTTP" 传输代替「HTTP+SSE」的 PR,我们一起观察一下这两个类库谁更快的支持上去。