Dify Plugin Daemon 旨在管理 Dify 生态系统中插件的完整生命周期。它作为 Dify API 服务器与各种插件运行时环境之间的中央协调器,支持在不同部署场景中无缝执行插件。
Runtime 类型
Local Runtime (本地运行时)
通信方式:STDIN/STDOUT/STDERR
Debug Runtime (调试运行时)
通信方式:TCP Socket
Serverless Runtime (无服务器运行时)
通信方式:HTTP/SSE
类继承关系与接口
PluginRuntime 接口
1 2 3 4 5 6
| plugin_entities.PluginRuntime (接口) ├── Listen(sessionId) *Broadcast[SessionMessage] ├── Write(sessionId, action, data) ├── Type() PluginRuntimeType ├── Configuration() *PluginDeclaration └── RuntimeState() PluginRuntimeState
|
实现类
LocalPluginRuntime
1 2 3 4
| ├── LocalPluginRuntime (local_runtime/run.go) │ ├── stdioHolder (stdio.go) - 管理 STDIN/STDOUT/STDERR │ ├── pythonInterpreterPath - Python 解释器路径 │ └── process *exec.Cmd - 子进程对象
|
RemotePluginRuntime
1 2 3 4
| ├── RemotePluginRuntime (debugging_runtime/run.go) │ ├── tcpListener net.Listener - TCP 监听器 │ ├── tcpConn net.Conn - TCP 连接 │ └── debuggingKey string - 调试密钥
|
ServerlessPluginRuntime
1 2 3 4
| └── ServerlessPluginRuntime (serverless_runtime/run.go) │ ├── LambdaURL string - Lambda 函数 URL │ ├── client *http.Client - HTTP 客户端 │ ├── listeners sync.Map - 会话监听器映射表
|
调用流程
Local Runtime
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| [Dify API] ↓ HTTP Request [HTTP Server] (http_server.go) ↓ 路由分发 [Controller] (controllers/) ↓ 调用 [Plugin Daemon] (plugin_daemon/generic.go) ↓ GenericInvokePlugin[Req, Rsp]() ├─→ session.Runtime().Listen(sessionId) │ └─→ 创建 Broadcast 对象 │ └─→ 注册 stdioHolder 监听器 │ └─→ session.Write(event, action, data) └─→ stdioHolder.write(data + '\n') ↓ 写入 STDIN [Plugin Process] (Python 子进程) ↓ 读取 STDIN,处理请求 ↓ 写入 STDOUT (JSON 格式,按行) [stdioHolder.StartStdout()] (stdio.go) ↓ bufio.Scanner 按行读取 ↓ ParsePluginUniversalEvent() ├─→ SESSION_MESSAGE_TYPE_STREAM → listener.Send() ├─→ SESSION_MESSAGE_TYPE_INVOKE → backwards_invocation ├─→ SESSION_MESSAGE_TYPE_END → response.Close() └─→ SESSION_MESSAGE_TYPE_ERROR → response.WriteError() ↓ [Response Stream] (stream.Stream[Rsp]) ↓ 流式返回 [Controller] → [HTTP Response] (SSE) ↓ [Dify API]
|
Serverless Runtime
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| [Dify API] ↓ HTTP Request [HTTP Server] ↓ 路由分发 [Controller] ↓ 调用 [Plugin Daemon] (plugin_daemon/generic.go) ↓ GenericInvokePlugin[Req, Rsp]() ├─→ session.Runtime().Listen(sessionId) │ └─→ 创建 Broadcast 对象 │ └─→ 存储到 listeners.Store(sessionId, broadcast) │ └─→ session.Write(event, action, data) └─→ ServerlessPluginRuntime.Write() ↓ routine.Submit() 提交后台任务 ↓ 构建 URL: LambdaURL + "/invoke?action=" + action ↓ HTTP POST Request ├─→ Header: Content-Type: application/json ├─→ Header: Accept: text/event-stream ├─→ Header: Dify-Plugin-Session-ID: sessionId └─→ Body: JSON data ↓ [Lambda Function / Cloud Function] ↓ 处理请求 ↓ 返回 SSE 流式响应 [ServerlessPluginRuntime.Write()] (io.go) ↓ bufio.Scanner 按行读取 SSE ↓ ParsePluginUniversalEvent() ├─→ 解析 SessionMessage └─→ broadcast.Send(sessionMessage) ↓ [Response Stream] ↓ 流式返回 [Controller] → [HTTP Response] (SSE) ↓ [Dify API]
|
Debug Runtime
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| [Dify API] ↓ HTTP Request [HTTP Server] ↓ 路由分发 [Controller] ↓ 调用 [Plugin Daemon] ↓ GenericInvokePlugin[Req, Rsp]() ├─→ session.Runtime().Listen(sessionId) │ └─→ 创建 Broadcast 对象 │ └─→ 注册 TCP 连接监听器 │ └─→ session.Write(event, action, data) └─→ RemotePluginRuntime.Write() ↓ 写入 TCP 连接 (JSON + '\n') ↓ [开发者本地插件进程] ↓ 通过 TCP 连接读取请求 ↓ 处理请求 ↓ 通过 TCP 连接写入响应 (JSON + '\n') [RemotePluginRuntime.startReading()] (io.go) ↓ bufio.Scanner 按行读取 TCP ↓ ParsePluginUniversalEvent() └─→ broadcast.Send(sessionMessage) ↓ [Response Stream] ↓ 流式返回 [Controller] → [HTTP Response] (SSE) ↓ [Dify API]
|
核心组件调用关系图

反向调用流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| 插件需要回调 Dify API 时:
[Plugin Process] ↓ 发送 SESSION_MESSAGE_TYPE_INVOKE [Plugin Daemon] ↓ 检测到 INVOKE 消息 [backwards_invocation.InvokeDify()] (backwards_invocation/) ↓ 解析反向调用请求 ├─→ 获取 InvokeFrom (Dify API URL) ├─→ 创建 HTTP Request └─→ 调用 Dify API ↓ HTTP POST to Dify API [Dify API Server] ↓ 处理请求(查询数据库、调用服务等) ↓ 返回响应 [backwards_invocation] ↓ 接收响应 ├─→ Local/Debug: 通过 EventWriter 写回插件 └─→ Serverless: 通过 Transaction Handler 返回 ↓ [Plugin Process] ↓ 接收反向调用响应 ↓ 继续处理
|
集群模式
集群模式下的插件调度

跨节点调用流程
- Dify API → Node 1 HTTP Server
- Node 1 检查插件是否在本地
- 如果不在,查询 Redis 找到插件所在节点
- 重定向请求到 Node 2
- Node 2 执行插件调用
- 返回响应给 Dify API
1 2 3 4 5 6 7 8 9 10 11 12
|
if ok, originalError := app.cluster.IsPluginOnCurrentNode(identity); !ok { app.redirectPluginInvokeByPluginIdentifier(ctx, identity, originalError) ctx.Abort() } else { ctx.Next() }
|
1 2 3 4 5 6 7 8 9 10
|
nodes, err := app.cluster.FetchPluginAvailableNodesById(plugin_unique_identifier.String()) ...
nodeId := nodes[0]
statusCode, header, body, err := app.cluster.RedirectRequest(nodeId, ctx.Request) ...
|
关键文件索引
核心接口和实体
pkg/entities/plugin_entities/runtime.go - PluginRuntime 接口定义
pkg/entities/plugin_entities/event.go - SessionMessage 消息类型
Local Runtime
internal/core/plugin_manager/local_runtime/run.go - 本地运行时主逻辑
internal/core/plugin_manager/local_runtime/io.go - Listen/Write 接口实现
internal/core/plugin_manager/local_runtime/stdio.go - STDIN/STDOUT 管理
internal/core/plugin_manager/local_runtime/environment_python.go - Python 环境初始化
Debug Runtime
internal/core/plugin_manager/debugging_runtime/run.go - 调试运行时主逻辑
internal/core/plugin_manager/debugging_runtime/io.go - TCP 通信实现
Serverless Runtime
internal/core/plugin_manager/serverless_runtime/run.go - 无服务器运行时主逻辑
internal/core/plugin_manager/serverless_runtime/io.go - HTTP/SSE 通信实现
插件调用核心
internal/core/plugin_daemon/generic.go - GenericInvokePlugin 泛型调用函数
internal/core/plugin_daemon/backwards_invocation/ - 反向调用实现
internal/core/session_manager/ - 会话管理
生命周期管理
internal/core/plugin_manager/lifecycle/full_duplex.go - 全双工生命周期
internal/core/plugin_manager/manager.go - 插件管理器
internal/core/plugin_manager/launcher.go - 插件启动器
集群管理
internal/cluster/cluster.go - 集群管理器
internal/cluster/plugin.go - 插件集群调度
internal/cluster/node.go - 节点管理
HTTP 服务器
internal/server/http_server.go - HTTP 路由配置
internal/server/server.go - 服务器启动
internal/server/controllers/ - API 控制器
关键差异对比
| 特性 |
Local Runtime |
Debug Runtime |
Serverless Runtime |
| 通信方式 |
STDIN/STDOUT |
TCP Socket |
HTTP/SSE |
| 进程模型 |
子进程 |
独立进程 |
无状态函数 |
| 启动方式 |
exec.Command() |
TCP 连接 |
HTTP 调用 |
| 环境初始化 |
Python venv |
开发者自行管理 |
预构建镜像 |
| 反向调用 |
全双工支持 |
全双工支持 |
Transaction 机制 |
| 心跳检测 |
120秒超时 |
TCP 连接保活 |
无需心跳 |
| 资源清理 |
Kill 进程 |
关闭 TCP 连接 |
无需清理 |
| 适用场景 |
生产环境 |
开发调试 |
云原生部署 |
| 扩展性 |
单机 |
单机 |
高扩展性 |
| 冷启动 |
首次较慢 |
无冷启动 |
有冷启动 |