毕昇核心流程梳理

应用分类

毕昇的应用分为三类,分别是,工作流,助手,技能

17436496803341743649679856.png

助手的功能和普通的知识库应用类似,设置 Prompt,配置大模型,外挂知识库,添加工具等等,只不过毕昇这里有一个特殊的实现,可以外挂技能。

17436503133371743650313052.png

工作流方面,毕昇应该也是接触了市面上常见的需求了,所以大多数的业务场景都有对应的或者和其他产品类似的解决方案,例如表单收集。在其他地方毕昇借用了 langgraph 的强大,实现了工作流的轻松构建,以及人机交互等有特色的节点。

17436505803321743650579422.png

节点名称 节点类型 节点功能
输入 基础节点 工作流运行的起始节点
输出 基础节点 可向用户发送消息,并且支持进行更丰富的交互,例如请求用户批准进行某项敏感操作、允许用户在模型输出内容的基础上直接修改并提交。
大模型 基础节点 调用大模型回答用户问题或者处理任务
助手 基础节点 AI 自主进行任务规划,选择合适的知识库、数据库或工具进行调用。
QA知识库检索 基础节点 从 QA 知识库中检索问题以及对应的答案。
文档知识库问答 基础节点 根据用户问题从知识库中检索相关内容,结合检索结果调用大模型生成最终结果,支持多个问题并行执行。
报告 基础节点 按照预设的word模板生成报告
代码 基础节点 自定义需要执行的代码
条件分支 基础节点 根据条件表达式执行不同的分支
结束 基础节点 工作流运行到此结束
论文获取 工具节点 从 Arxiv 网站检索论文的工具,输入为检索关键词。
Dalle3画图 工具节点 OpenAI 文生图模型
Bing Web 搜索 工具节点 BIng 搜索引擎,可联网检索互联网信息,例如天气、汇率、时事等
天眼查 工具节点 企业信息查询
经济经融数据 工具节点 包含股票、基金、期货等行情数据和宏观经济、公司财务等基本面数据的金融大数据平台
钉钉 工具节点 钉钉群机器人发送消息工具
Firecrawl 工具节点 指定 URL 爬取网页内容,并将其转换为 Markdown 格式
Jina AI 工具节点 将目标网址(支持 PDF)内容转换为大模型可处理的 Markdown 格式
飞书消息 工具节点 支持获取飞书单聊或群聊历史消息,向指定用户或者群聊发送消息
发送邮件 工具节点 通过smtp协议发送电子邮件
SiliconFlow 工具节点 基于文本提示生成高质量图像,支持 Flux 和 Stable Diffusion 模型
企业微信 工具节点 企业微信机器人发送群消息工具

技能是工作流的前身,也是基于 Langchain 的组件,但是使用起来比较复杂

17436521464101743652146370.png

langgraph

langchain 官方有一个项目叫做 opengpts),使用 langgraph 分别实现了助手,RAG,Chatbot 三种场景的代码。毕昇中的 bisheng_langchain 包应该就是借鉴该项目的。(在这基础上加上了自己的一些私有的东西,例如工具,然后重新封装为了 bisheng_langchain 这个包)

二者都是用了 langgraph 来构建工作流,例如使用下面代码就可以轻松构建一个 chatbot 的工作流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def route_tools(state: State):
if isinstance(state, list):
ai_message = state[-1]
elif messages := state.get("messages", []):
ai_message = messages[-1]
else:
raise ValueError(f"No messages found in input state to tool_edge: {state}")
if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
return "tools"
return END

graph_builder.add_conditional_edges(
"chatbot",
route_tools,
{"tools": "tools", END: END},
)
graph_builder.add_edge("tools", "chatbot")
graph_builder.add_edge(START, "chatbot")
graph = graph_builder.compile()

上述代码构建出的工作流结构如下:

17436616373761743661637155.png

助手

助手的工作逻辑和上述 langgraph 构建出来的工作是一致的,也是以 agent 为入口,然后构建一个循环的工作流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# FunctionCall 方式判断是否继续执行
def should_continue(messages):
last_message = messages[-1]
# If there is no function call, then we finish
if 'tool_calls' not in last_message.additional_kwargs:
if '|<instruct>|' in system_message:
# cohere model
pattern = r'Answer:(.+)\nGrounded answer'
match = re.search(pattern, last_message.content)
if match:
last_message.content = match.group(1)
return 'end'
# Otherwise if there is, we continue
else:
return 'continue'

# ========================================
# 不支持 FunctionCall 的模型,使用 ReAct 来推理,判断是否继续执行
def should_continue(data):
if isinstance(data['agent_outcome'], AgentFinish):
return 'end'
else:
return 'continue'

# ========================================
# 组装工具和 Agent 节点
if tools:
llm_with_tools = llm.bind(tools=[format_tool_to_openai_tool(t) for t in tools])
else:
llm_with_tools = llm

agent = _get_messages | llm_with_tools
workflow = MessageGraph()
workflow.add_node('agent', agent)
workflow.add_node('action', RunnableCallable(call_tool, acall_tool))
workflow.set_entry_point('agent')
workflow.add_conditional_edges(
'agent',
should_continue,
{
'continue': 'action',
'end': END,
},
)

从 Langsmith 的日志来看的确是如此的,run_agent 运行调用大模型,然后解析输出结果,判断是否需要继续执行。

17436632523801743663252292.png

上述 Langsmith 是没有携带工具的,携带工具反复调用的截图如下:

17436634263761743663425910.png

上述流程中可以看到,在进行 should_continue 的判断后,进行了工具的调用,并且整个流程运行了两次(这是因为 deepseek v3 版本的 FunctionCall 功能不完善导致的循环调用,按道理应该第二个 agent 的运行里面应该返回的是 end)

Langsmith 是什么?

Langsmith 是由 LangChain 团队开发的一款工具,旨在帮助开发者更高效地构建、测试和优化基于大语言模型(LLM)的应用程序。它是一个面向开发者的平台,专注于简化 LLM 应用的开发流程,同时提供强大的调试、评估和监控功能。Langsmith 的核心目标是解决在开发 LLM 应用过程中遇到的各种挑战,例如性能调优、链路追踪、日志记录和结果验证等。

工作流

工作流的建立,也是在 Langgraph 的基础上来实现的,整个实现的过程总的来说是将前端构建的数据结构,转换为 Langgraph 数据结构的过程。

Workflow 类是工作流的入口。Workflow 在初始化的时候会构建 GraphEngine,整个流程的节点的构建,边的管理,将数据转换为 Langgraph 的结构都是在 GraphEngine 中完成的。

Workflow 的运行就是调用 GraphEngine 的运行,GraphEngine 底层又去调用 Langgraph 的运行。Workflow 的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Workflow:

def __init__(self,
workflow_id: str,
user_id: str = None,
workflow_data: Dict = None,
async_mode: bool = False,
max_steps: int = 0,
timeout: int = 0,
callback: BaseCallback = None):

# 运行的唯一标识,保存到数据库的唯一ID
self.workflow_id = workflow_id
self.user_id = user_id

# 超时时间,多久没有接收到用户输入终止workflow运行(单位:分钟)
self.timeout = timeout
self.current_time = None

# 初始化一个 GraphEngine 对象
self.graph_engine = GraphEngine(user_id=user_id,
async_mode=async_mode,
workflow_id=workflow_id,
workflow_data=workflow_data,
max_steps=max_steps,
callback=callback)

GraphEngine 的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class GraphEngine:
def __init__(self,
user_id: str = None,
workflow_id: str = None,
workflow_data: Dict = None,
async_mode: bool = False,
max_steps: int = 0,
callback: BaseCallback = None):
self.user_id = user_id
self.workflow_id = workflow_id
self.workflow_data = workflow_data
self.max_steps = max_steps
self.async_mode = async_mode
# 回调
self.callback = callback

# node_id: NodeInstance
self.nodes_map = {}
# record how many nodes fan in this node
self.nodes_fan_in = {} # node_id: [node_ids]
# record how many nodes next to this node
self.nodes_next_nodes = {} # node_id: {node_ids}

# node_id: 1; 表示从start节点到此节点的最长路径
self.node_level = {}
# 互斥节点列表,包含condition节点和output节点(选择型交互)
self.condition_nodes = []

self.edges = None
self.graph_state = GraphState()

# 初始化 langgraph stategraph
self.graph_builder = StateGraph(TempState)
self.graph = None
# langgraph 的设置,用于中断恢复和限制速率
self.graph_config = {'configurable': {'thread_id': '1'}, 'recursion_limit': 50}

self.status = WorkflowStatus.RUNNING.value
self.reason = '' # 失败原因

# 使用前端传过来的数据构建边和节点
self.build_edges()
self.build_nodes()

def build_edges(self):
# 构建边
...
1. 初始化一个 EdgeManage 对象,主要创建两个数据结构
2. 一个数据结构是哪个节点连接我,另一个数据结构是我连接到哪些节点

def build_nodes(self):
# 构建节点
...
1. 初始化节点,获取开始节点,结束节点和需要中断的节点
2. 处理开始节点和结束节点(可能有多个),然后在连接其他节点

def run(self):
# 运行图
...

def continue_run(self, data: Any = None):
# 接收用户的输入
...
1. 主要是接收用户输入,然后继续执行

def judge_status(self):
# 状态判断
...

例如,我在页面上构建一个应用,得到的 Langgraph 实际结构如图所示:

17436754094031743675409036.png

上图中,我构建了一个工作流,流程为: 接收用户的输入,并且去 bing 进行搜索,然后进行判断,没有搜索结果那么就继续搜索,否则就使用助手进行总结,最后输出总结的内容。

对应生成 Langgraph 流程如下:

技能

技能是毕昇中对于初学者,或者使用者比较难以上手的一部分,因为节点的学习也需要一些能力,但官方对其的描述,还是基于 langchain 的 component 来实现的。毕昇目前也不推荐使用了,而是推荐采用工作流的方式来搭建。

17436737443991743673743478.png

FAQ

Q: 节点有哪些状态?

A: 节点有 WAITING,RUNNING,SUCCESS,FAILED,INPUT,INPUT_OVER

Q: 节点如何实现中断的?

A: 是借助 Langgraph 来实现的,在获取到开始,结束,中断节点后,就会设置在这些中断节点运行之前进行中断,具体可参考 Langgraph 官方示例: 如何设置断点 — How to add breakpoints

毕昇的实现如下所示:

1
2
# compile langgraph
self.graph = self.graph_builder.compile(checkpointer=MemorySaver(), interrupt_before=interrupt_nodes)

Q: 节点之间的数据如何保存?

A: 节点的数据(输入输出),聊天历史记录是通过一个叫 GraphState 的对象来进行管理的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class GraphState(BaseModel):
# 存储聊天历史
history_memory: Optional[ConversationBufferWindowMemory]

# 全局变量池
variables_pool: Dict[str, Dict[str, Any]] = Field(default={}, description='全局变量池: {node_id: {key: value}}')

def get_history_memory(self, count: int) -> str:
""" 获取聊天历史记录 因为不是1对1,所以重写 buffer_as_str"""
...

def get_history_list(self, count: int) -> List[BaseMessage]:
return self.history_memory.buffer_as_messages[-count:]


def save_context(self, content: str, msg_sender: str) -> None:
""" 保存聊天记录
workflow 特殊情况,过程会有多轮交互,所以不是一条对一条,重制消息结构"""
if msg_sender == 'human':
self.history_memory.chat_memory.add_messages([HumanMessage(content=content)])
elif msg_sender == 'AI':
self.history_memory.chat_memory.add_messages([AIMessage(content=content)])

def set_variable(self, node_id: str, key: str, value: Any):
""" 将节点产生的数据放到全局变量里 """
if node_id not in self.variables_pool:
self.variables_pool[node_id] = {}
self.variables_pool[node_id][key] = value

def get_variable(self, node_id: str, key: str, count: Optional[int] = None) -> Any:
""" 从全局变量中获取数据 """
if node_id not in self.variables_pool:
return None

if key == 'chat_history':
return self.get_history_memory(count=count)
return self.variables_pool[node_id].get(key)

def get_variable_by_str(self, contact_key: str, history_count: Optional[int] = None) -> Any:
"""
从全局变量中获取数据
contact_key: node_id.key#index #index不一定需要
"""
...

def set_variable_by_str(self, contact_key: str, value: Any):
...

def get_all_variables(self) -> Dict[str, Any]:
""" 获取所有的变量,key为node_id.key的格式 """
ret = {}
for node_id, node_variables in self.variables_pool.items():
for key, value in node_variables.items():
ret[f'{node_id}.{key}'] = self.get_variable(node_id, key)
# 特殊处理下 preset_question key
if key == 'preset_question':
for k, v in value.items():
ret[f'{node_id}.{key}#{k}'] = v
return ret

Q: 如何传递用户输入的数据?

A: 当接收到 init_data 的事件后,Redis 中会缓存 WorkFlow 的 data(前端传过来的结构信息),除此之外,Redis 还会缓存 WorkFlow 的 status,event,input,stop 信息, 处理用户输入也是如此,先保存到 Redis 中,使用时再从 Redis 中获取

保存输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
async def handle_user_input(self, data: dict):
status_info = self.workflow.get_workflow_status(user_cache=False)
if status_info['status'] != WorkflowStatus.INPUT.value:
logger.warning(f'workflow is not input status: {status_info}')
else:
user_input = {}
message_id = None
new_message = None
# 目前支持一个输入节点
for node_id, node_info in data.items():
user_input[node_id] = node_info['data']
message_id = node_info.get('message_id')
new_message = node_info.get('message')
break
self.workflow.set_user_input(user_input, message_id=message_id, message_content=new_message)
self.workflow.set_workflow_status(WorkflowStatus.INPUT_OVER.value)

def set_user_input(self, data: dict, message_id: int = None, message_content: str = None):
if self.chat_id and message_id:
message_db = ChatMessageDao.get_message_by_id(message_id)
if message_db:
self.update_old_message(data, message_db, message_content)
# 通知异步任务用户输入
self.redis_client.set(self.workflow_input_key, data, expiration=self.workflow_expire_time)
return

使用输入:

1
2
3
4
5
6
7
8
9
10
11
def get_user_input(self) -> dict | None:
ret = self.redis_client.get(self.workflow_input_key)
if ret:
self.redis_client.delete(self.workflow_input_key)
return ret

user_input = redis_callback.get_user_input()
if not user_input:
continue
redis_callback.set_workflow_status(WorkflowStatus.RUNNING.value)
status, reason = workflow.run(user_input)

Q: 那就是为什么这里不使用 GraphState,而是使用 Redis 做为中间媒介?

A: 这是因为 WorkFlow 的执行是一个 Celery 的 Worker 有关,即没法将用户的中断输入内容传递到一个 Worker 中去,只能通过外部 Redis,通过在里面循环的来读取才能获取得到相关的输入内容。

Q: 工作流最大支持运行多少时间?

A: 工作流运行默认最大时间为 720 分钟,节点默认最大步数为 50 步

1
2
3
class WorkflowConf(BaseModel):
max_steps: int = Field(default=50, description="节点运行最大步数")
timeout: int = Field(default=720, description="节点超时时间(min)")

Q: 工作流的运行代码是什么样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def _execute_workflow(unique_id: str, workflow_id: str, chat_id: str, user_id: str):
redis_callback = RedisCallback(unique_id, workflow_id, chat_id, user_id)
try:
# update workflow status
redis_callback.set_workflow_status(WorkflowStatus.RUNNING.value)
# get workflow data
workflow_data = redis_callback.get_workflow_data()
if not workflow_data:
raise Exception('workflow data not found maybe data is expired')

# init workflow
workflow_conf = settings.get_workflow_conf()
workflow = Workflow(workflow_id, user_id, workflow_data, False,
workflow_conf.max_steps,
workflow_conf.timeout,
redis_callback)
redis_callback.workflow = workflow
start_time = time.time()
status, reason = workflow.run()
first_input = True
# run workflow
while True:
logger.debug(f'workflow execute status: {workflow.status()}')
if workflow.status() in [WorkflowStatus.FAILED.value, WorkflowStatus.SUCCESS.value]:
redis_callback.set_workflow_status(status, reason)
break
elif workflow.status() == WorkflowStatus.INPUT.value:
if first_input:
start_time = time.time()
first_input = False
redis_callback.set_workflow_status(status, reason)
time.sleep(1)
if time.time() - start_time > workflow.timeout * 60:
raise IgnoreException('workflow wait user input timeout')
if redis_callback.get_workflow_stop():
raise IgnoreException('workflow stop by user')
user_input = redis_callback.get_user_input()
if not user_input:
continue
redis_callback.set_workflow_status(WorkflowStatus.RUNNING.value)
status, reason = workflow.run(user_input)
first_input = True
else:
raise Exception(f'unexpected workflow status error: {status}')
except IgnoreException as e:
logger.warning(f'execute_workflow ignore error: {e}')
redis_callback.set_workflow_status(WorkflowStatus.FAILED.value, str(e))
except Exception as e:
logger.exception('execute_workflow error')
redis_callback.set_workflow_status(WorkflowStatus.FAILED.value, str(e)[:100])

可以看出 WorkFlow 的运行本质上是一个 while 循环,直到状态变为 FAILED 或者 SUCCESS 为止