Dify Plugin Daemon 源码解析

Dify Plugin Daemon 旨在管理 Dify 生态系统中插件的完整生命周期。它作为 Dify API 服务器与各种插件运行时环境之间的中央协调器,支持在不同部署场景中无缝执行插件。

Runtime 类型

Local Runtime (本地运行时)

通信方式:STDIN/STDOUT/STDERR

Debug Runtime (调试运行时)

通信方式:TCP Socket

Serverless Runtime (无服务器运行时)

通信方式:HTTP/SSE

类继承关系与接口

PluginRuntime 接口

1
2
3
4
5
6
plugin_entities.PluginRuntime (接口)
├── Listen(sessionId) *Broadcast[SessionMessage]
├── Write(sessionId, action, data)
├── Type() PluginRuntimeType
├── Configuration() *PluginDeclaration
└── RuntimeState() PluginRuntimeState

实现类

LocalPluginRuntime

1
2
3
4
├── LocalPluginRuntime (local_runtime/run.go)
│ ├── stdioHolder (stdio.go) - 管理 STDIN/STDOUT/STDERR
│ ├── pythonInterpreterPath - Python 解释器路径
│ └── process *exec.Cmd - 子进程对象

RemotePluginRuntime

1
2
3
4
├── RemotePluginRuntime (debugging_runtime/run.go)
│ ├── tcpListener net.Listener - TCP 监听器
│ ├── tcpConn net.Conn - TCP 连接
│ └── debuggingKey string - 调试密钥

ServerlessPluginRuntime

1
2
3
4
└── ServerlessPluginRuntime (serverless_runtime/run.go)
│ ├── LambdaURL string - Lambda 函数 URL
│ ├── client *http.Client - HTTP 客户端
│ ├── listeners sync.Map - 会话监听器映射表

调用流程

Local Runtime

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
[Dify API]
↓ HTTP Request
[HTTP Server] (http_server.go)
↓ 路由分发
[Controller] (controllers/)
↓ 调用
[Plugin Daemon] (plugin_daemon/generic.go)
↓ GenericInvokePlugin[Req, Rsp]()
├─→ session.Runtime().Listen(sessionId)
│ └─→ 创建 Broadcast 对象
│ └─→ 注册 stdioHolder 监听器

└─→ session.Write(event, action, data)
└─→ stdioHolder.write(data + '\n')
↓ 写入 STDIN
[Plugin Process] (Python 子进程)
↓ 读取 STDIN,处理请求
↓ 写入 STDOUT (JSON 格式,按行)
[stdioHolder.StartStdout()] (stdio.go)
↓ bufio.Scanner 按行读取
↓ ParsePluginUniversalEvent()
├─→ SESSION_MESSAGE_TYPE_STREAM → listener.Send()
├─→ SESSION_MESSAGE_TYPE_INVOKE → backwards_invocation
├─→ SESSION_MESSAGE_TYPE_END → response.Close()
└─→ SESSION_MESSAGE_TYPE_ERROR → response.WriteError()

[Response Stream] (stream.Stream[Rsp])
↓ 流式返回
[Controller] → [HTTP Response] (SSE)

[Dify API]

Serverless Runtime

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
[Dify API]
↓ HTTP Request
[HTTP Server]
↓ 路由分发
[Controller]
↓ 调用
[Plugin Daemon] (plugin_daemon/generic.go)
↓ GenericInvokePlugin[Req, Rsp]()
├─→ session.Runtime().Listen(sessionId)
│ └─→ 创建 Broadcast 对象
│ └─→ 存储到 listeners.Store(sessionId, broadcast)

└─→ session.Write(event, action, data)
└─→ ServerlessPluginRuntime.Write()
↓ routine.Submit() 提交后台任务
↓ 构建 URL: LambdaURL + "/invoke?action=" + action
↓ HTTP POST Request
├─→ Header: Content-Type: application/json
├─→ Header: Accept: text/event-stream
├─→ Header: Dify-Plugin-Session-ID: sessionId
└─→ Body: JSON data

[Lambda Function / Cloud Function]
↓ 处理请求
↓ 返回 SSE 流式响应
[ServerlessPluginRuntime.Write()] (io.go)
↓ bufio.Scanner 按行读取 SSE
↓ ParsePluginUniversalEvent()
├─→ 解析 SessionMessage
└─→ broadcast.Send(sessionMessage)

[Response Stream]
↓ 流式返回
[Controller] → [HTTP Response] (SSE)

[Dify API]

Debug Runtime

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
[Dify API]
↓ HTTP Request
[HTTP Server]
↓ 路由分发
[Controller]
↓ 调用
[Plugin Daemon]
↓ GenericInvokePlugin[Req, Rsp]()
├─→ session.Runtime().Listen(sessionId)
│ └─→ 创建 Broadcast 对象
│ └─→ 注册 TCP 连接监听器

└─→ session.Write(event, action, data)
└─→ RemotePluginRuntime.Write()
↓ 写入 TCP 连接 (JSON + '\n')

[开发者本地插件进程]
↓ 通过 TCP 连接读取请求
↓ 处理请求
↓ 通过 TCP 连接写入响应 (JSON + '\n')
[RemotePluginRuntime.startReading()] (io.go)
↓ bufio.Scanner 按行读取 TCP
↓ ParsePluginUniversalEvent()
└─→ broadcast.Send(sessionMessage)

[Response Stream]
↓ 流式返回
[Controller] → [HTTP Response] (SSE)

[Dify API]

核心组件调用关系图

17630155940651763015593616.png

反向调用流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
插件需要回调 Dify API 时:

[Plugin Process]
↓ 发送 SESSION_MESSAGE_TYPE_INVOKE
[Plugin Daemon]
↓ 检测到 INVOKE 消息
[backwards_invocation.InvokeDify()] (backwards_invocation/)
↓ 解析反向调用请求
├─→ 获取 InvokeFrom (Dify API URL)
├─→ 创建 HTTP Request
└─→ 调用 Dify API
↓ HTTP POST to Dify API
[Dify API Server]
↓ 处理请求(查询数据库、调用服务等)
↓ 返回响应
[backwards_invocation]
↓ 接收响应
├─→ Local/Debug: 通过 EventWriter 写回插件
└─→ Serverless: 通过 Transaction Handler 返回

[Plugin Process]
↓ 接收反向调用响应
↓ 继续处理

集群模式

集群模式下的插件调度

17630160670651763016066749.png

跨节点调用流程

  1. Dify API → Node 1 HTTP Server
  2. Node 1 检查插件是否在本地
  3. 如果不在,查询 Redis 找到插件所在节点
  4. 重定向请求到 Node 2
  5. Node 2 执行插件调用
  6. 返回响应给 Dify API
1
2
3
4
5
6
7
8
9
10
11
12
// 检查插件是否在当前节点
// 调用集群管理器查询插件位置
if ok, originalError := app.cluster.IsPluginOnCurrentNode(identity); !ok {
// 插件不在当前节点,执行重定向逻辑
// 查询插件所在节点并转发请求
app.redirectPluginInvokeByPluginIdentifier(ctx, identity, originalError)
// 中止当前节点的请求处理
ctx.Abort()
} else {
// 插件在当前节点,继续执行后续中间件
ctx.Next()
}
1
2
3
4
5
6
7
8
9
10
// 从集群管理器查询插件的可用节点列表
// 通过 Redis 查询插件状态,获取所有运行该插件的节点 ID
nodes, err := app.cluster.FetchPluginAvailableNodesById(plugin_unique_identifier.String())
...
// 简单的负载均衡策略:使用列表中的第一个节点
nodeId := nodes[0]
// 调用集群管理器转发请求到目标节点
// 返回目标节点的响应状态码、响应头和响应体
statusCode, header, body, err := app.cluster.RedirectRequest(nodeId, ctx.Request)
...

关键文件索引

核心接口和实体

  • pkg/entities/plugin_entities/runtime.go - PluginRuntime 接口定义
  • pkg/entities/plugin_entities/event.go - SessionMessage 消息类型

Local Runtime

  • internal/core/plugin_manager/local_runtime/run.go - 本地运行时主逻辑
  • internal/core/plugin_manager/local_runtime/io.go - Listen/Write 接口实现
  • internal/core/plugin_manager/local_runtime/stdio.go - STDIN/STDOUT 管理
  • internal/core/plugin_manager/local_runtime/environment_python.go - Python 环境初始化

Debug Runtime

  • internal/core/plugin_manager/debugging_runtime/run.go - 调试运行时主逻辑
  • internal/core/plugin_manager/debugging_runtime/io.go - TCP 通信实现

Serverless Runtime

  • internal/core/plugin_manager/serverless_runtime/run.go - 无服务器运行时主逻辑
  • internal/core/plugin_manager/serverless_runtime/io.go - HTTP/SSE 通信实现

插件调用核心

  • internal/core/plugin_daemon/generic.go - GenericInvokePlugin 泛型调用函数
  • internal/core/plugin_daemon/backwards_invocation/ - 反向调用实现
  • internal/core/session_manager/ - 会话管理

生命周期管理

  • internal/core/plugin_manager/lifecycle/full_duplex.go - 全双工生命周期
  • internal/core/plugin_manager/manager.go - 插件管理器
  • internal/core/plugin_manager/launcher.go - 插件启动器

集群管理

  • internal/cluster/cluster.go - 集群管理器
  • internal/cluster/plugin.go - 插件集群调度
  • internal/cluster/node.go - 节点管理

HTTP 服务器

  • internal/server/http_server.go - HTTP 路由配置
  • internal/server/server.go - 服务器启动
  • internal/server/controllers/ - API 控制器

关键差异对比

特性 Local Runtime Debug Runtime Serverless Runtime
通信方式 STDIN/STDOUT TCP Socket HTTP/SSE
进程模型 子进程 独立进程 无状态函数
启动方式 exec.Command() TCP 连接 HTTP 调用
环境初始化 Python venv 开发者自行管理 预构建镜像
反向调用 全双工支持 全双工支持 Transaction 机制
心跳检测 120秒超时 TCP 连接保活 无需心跳
资源清理 Kill 进程 关闭 TCP 连接 无需清理
适用场景 生产环境 开发调试 云原生部署
扩展性 单机 单机 高扩展性
冷启动 首次较慢 无冷启动 有冷启动