FlowAgent 是一个专为 AI 智能代理应用设计的强大企业级工作流引擎。它结合函数式编程的优雅性与生产环境所需的稳健性,提供传统和增强两种工作流执行模型,支持复杂的多代理协作、智能路由和并行处理。
- 🔗 流畅API设计 - 优雅的链式节点构建方式,支持函数式编程风格
- ⚡ 动态路由 - 基于执行结果的智能流程控制和条件分支
- 🚀 并行执行 - 内置并发节点处理支持,提升处理效率
- 🔄 自动重试 - 智能重试机制,支持指数退避策略
- 📊 全面指标 - 详细的性能监控、执行统计和错误跟踪
- 🛡️ 错误处理 - 强大的错误恢复机制和优雅降级
- ⏱️ 超时控制 - 节点级别的超时管理和资源保护
- 🤖 LLM集成 - 原生支持大语言模型,包括国内主流LLM服务
- 📋 配置管理 - 灵活的YAML配置文件和环境变量支持
- 🔒 内存安全 - 线程安全的共享存储和资源管理
FlowAgent 采用模块化设计,包含以下核心组件:
线程安全的数据存储组件,用于节点间数据传递:
- 支持多种数据类型 (string, int, slice等)
- 上下文管理和自动资源清理
- 读写锁机制保证并发安全
支持两种节点模型:
传统节点 (BaseNode)
- 三阶段执行模型:Prep → Exec → Post
- 适用于简单线性工作流
- 最小化开销设计
增强节点 (EnhancedNode)
- 企业级特性:超时控制、重试机制、指标收集
- 流畅API构建方式
- 支持复杂路由和并行执行
工作流执行引擎:
- 传统流程 (Flow) - 轻量级执行引擎
- 增强流程 (EnhancedFlow) - 企业级执行引擎,包含详细监控
统一的大语言模型接口:
- 支持OpenAI API标准
- 兼容国内主流LLM服务 (通义千问、智谱AI、月之暗面等)
- 自动降级和模拟响应
Node1[Prep] → Node1[Exec] → Node1[Post] → Node2[Prep] → ...
Node1[Prep+重试+超时] → Node1[Exec+指标+错误处理] → Node1[Post+路由] → 并行节点处理
go get github.com/sagoo-cloud/flowagent
- Go 1.19 或更高版本
- gopkg.in/yaml.v3 (用于配置文件解析)
package main
import (
"context"
"fmt"
"github.com/sagoo-cloud/flowagent"
)
func main() {
// 创建共享存储
ctx := context.Background()
shared := agent.NewSharedStore(ctx)
defer shared.Close()
// 创建简单问答节点
answerNode := agent.NewAnswerNode("问答节点", nil) // nil使用模拟LLM
// 设置问题
shared.Set("question", "什么是人工智能?")
// 创建并运行流程
flow := agent.NewFlow("简单问答", answerNode)
if err := flow.Run(shared); err != nil {
fmt.Printf("执行失败: %v\n", err)
return
}
// 获取结果
fmt.Printf("答案: %s\n", shared.GetString("answer"))
}
package main
import (
"context"
"fmt"
"time"
"github.com/sagoo-cloud/flowagent"
)
func main() {
// 创建LLM客户端
llmClient := agent.NewDefaultLLMClient(
"your-api-key",
"https://api.openai.com/v1",
"gpt-3.5-turbo",
)
ctx := context.Background()
shared := agent.NewSharedStore(ctx)
defer shared.Close()
// 构建增强版节点 - 流畅API
analysisNode := agent.NewEnhancedNode().
SetName("内容分析").
SetTimeout(30 * time.Second).
SetRetries(3).
SetPrep(func(ctx context.Context, shared *agent.SharedStore) (interface{}, error) {
content := shared.GetString("content")
return content, nil
}).
SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
content := prepResult.(string)
messages := []agent.LLMMessage{
{Role: "user", Content: fmt.Sprintf("请分析以下内容的情感倾向: %s", content)},
}
return llmClient.Chat(ctx, messages)
}).
SetPost(func(ctx context.Context, prepResult interface{}, execResult interface{}, shared *agent.SharedStore) (string, error) {
result := execResult.(string)
shared.Set("analysis_result", result)
// 根据结果决定下一步
if strings.Contains(result, "积极") {
return "positive", nil
} else if strings.Contains(result, "消极") {
return "negative", nil
}
return "neutral", nil
})
// 创建后续处理节点
positiveNode := agent.NewEnhancedNode().
SetName("积极处理").
SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
shared.Set("action", "发送祝贺消息")
return "处理完成", nil
})
negativeNode := agent.NewEnhancedNode().
SetName("消极处理").
SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
shared.Set("action", "转交人工客服")
return "处理完成", nil
})
// 连接节点路由
analysisNode.Next("positive", positiveNode).
Next("negative", negativeNode)
// 创建并运行增强流程
flow := agent.NewEnhancedFlow("情感分析工作流", analysisNode)
shared.Set("content", "今天天气真好,心情很愉快!")
if err := flow.Run(ctx, shared); err != nil {
fmt.Printf("工作流执行失败: %v\n", err)
return
}
// 打印详细指标
flow.PrintMetrics()
}
// 创建多个并行处理节点
emailNode := agent.NewEnhancedNode().
SetName("发送邮件").
SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
time.Sleep(1 * time.Second) // 模拟发送邮件
return "邮件已发送", nil
})
smsNode := agent.NewEnhancedNode().
SetName("发送短信").
SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
time.Sleep(800 * time.Millisecond) // 模拟发送短信
return "短信已发送", nil
})
pushNode := agent.NewEnhancedNode().
SetName("推送通知").
SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
time.Sleep(500 * time.Millisecond) // 模拟推送
return "推送已发送", nil
})
// 创建并行执行节点
parallelNode := agent.NewParallelNode("并行通知", emailNode, smsNode, pushNode)
// 智能决策节点
decisionNode := agent.NewConditionalNode("风险评估",
func(shared *agent.SharedStore) (string, error) {
score := shared.GetInt("risk_score")
amount := shared.GetInt("amount")
if score > 80 && amount > 10000 {
return "high_risk", nil
} else if score > 50 || amount > 5000 {
return "medium_risk", nil
}
return "low_risk", nil
})
// 连接不同处理路径
decisionNode.Next("high_risk", manualReviewNode).
Next("medium_risk", additionalVerifyNode).
Next("low_risk", autoApproveNode)
# 大语言模型配置
llm:
# 通义千问 (阿里云DashScope)
qwen:
api_key: "your-qwen-api-key"
base_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
model: "qwen-turbo"
# 智谱AI (清华)
zhipu:
api_key: "your-zhipu-api-key"
base_url: "https://open.bigmodel.cn/api/paas/v4"
model: "glm-4"
# 月之暗面 (Kimi)
moonshot:
api_key: "your-moonshot-api-key"
base_url: "https://api.moonshot.cn/v1"
model: "moonshot-v1-8k"
# 工作流配置
workflow:
default_timeout: "30s" # 默认超时时间
max_retries: 3 # 最大重试次数
enable_metrics: true # 启用指标收集
max_concurrent: 10 # 最大并发数
# OpenAI / 兼容接口
export OPENAI_API_KEY="your-api-key"
export OPENAI_BASE_URL="https://api.openai.com/v1"
export OPENAI_MODEL="gpt-3.5-turbo"
# 国内服务商
export QWEN_API_KEY="your-qwen-api-key"
export ZHIPU_API_KEY="your-zhipu-api-key"
export MOONSHOT_API_KEY="your-moonshot-api-key"
项目包含丰富的示例代码:
- 传统vs增强 - 两种执行模式的性能对比
- 智能审批工作流 - 多级审批,AI辅助决策,自动路由
- 内容审核系统 - 自动化内容分类、情感分析、合规检查
- 智能客服 - 多轮对话、意图识别、工单路由
- 文档处理管道 - OCR识别、内容提取、智能分类、存储归档
- 多代理协作 - 专业化代理分工合作,复杂任务分解
- 智能决策树 - 基于规则和AI的复合决策系统
- 数据处理管道 - ETL过程智能化,异常检测和处理
- 监控告警系统 - 智能告警分析、自动处理、升级机制
接收消息 → 意图识别 → 知识库查询 → 生成回复 → 满意度评估 → 人工转接/结束
内容接收 → 格式检查 → AI安全审核 → 人工复审 → 发布/拒绝 → 记录审核日志
用户输入 → 风险评估 → 策略匹配 → 产品推荐 → 合规检查 → 方案生成 → 跟踪优化
- 并发执行 - 原生支持Go协程,自动并行处理
- 内存优化 - 智能共享存储管理,避免数据复制
- 低延迟 - 简单流程零开销,复杂流程最小开销
- 高吞吐 - 支持批处理和流式处理
- 水平扩展 - 支持分布式部署和负载均衡
- 资源控制 - 细粒度的超时和并发控制
- 弹性处理 - 自动重试和降级机制
- 监控友好 - 丰富的指标和日志输出
- 简单节点执行:< 1ms
- 复杂工作流 (10节点):< 50ms
- 并发处理能力:> 1000 QPS
- 内存占用:< 10MB (基础运行时)
# 安装Go 1.19+
go version
# 克隆项目
git clone https://github.com/sagoo-cloud/flowagent.git
cd flowagent
# 安装依赖
go mod tidy
# 运行测试
go test ./...
type CustomNode struct {
*agent.BaseNode
// 自定义字段
}
func (c *CustomNode) Prep(shared *agent.SharedStore) (interface{}, error) {
// 准备逻辑
return data, nil
}
func (c *CustomNode) Exec(input interface{}) (interface{}, error) {
// 执行逻辑
return result, nil
}
func (c *CustomNode) Post(shared *agent.SharedStore, prepRes, execRes interface{}) (string, error) {
// 后处理逻辑
return "next_action", nil
}
customNode := agent.NewEnhancedNode().
SetName("自定义节点").
SetTimeout(10 * time.Second).
SetRetries(2).
SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
// 自定义执行逻辑
return result, nil
})
- 使用上下文控制 - 传递context进行超时和取消控制
- 合理设置重试 - 根据操作类型设置重试次数和延迟
- 错误包装 - 提供详细的错误信息用于调试
- 资源清理 - 确保SharedStore正确关闭
This project is licensed under the MIT License