8000 GitHub - sagoo-cloud/flowagent
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

sagoo-cloud/flowagent

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

FlowAgent

FlowAgent Logo

智能代理工作流引擎 | Intelligent Workflow Engine for AI Agents

Go Version License GitHub Stars GitHub Issues Go Report Card

English | 中文文档

🚀 概述

FlowAgent 是一个专为 AI 智能代理应用设计的强大企业级工作流引擎。它结合函数式编程的优雅性与生产环境所需的稳健性,提供传统和增强两种工作流执行模型,支持复杂的多代理协作、智能路由和并行处理。

✨ 核心特性

  • 🔗 流畅API设计 - 优雅的链式节点构建方式,支持函数式编程风格
  • ⚡ 动态路由 - 基于执行结果的智能流程控制和条件分支
  • 🚀 并行执行 - 内置并发节点处理支持,提升处理效率
  • 🔄 自动重试 - 智能重试机制,支持指数退避策略
  • 📊 全面指标 - 详细的性能监控、执行统计和错误跟踪
  • 🛡️ 错误处理 - 强大的错误恢复机制和优雅降级
  • ⏱️ 超时控制 - 节点级别的超时管理和资源保护
  • 🤖 LLM集成 - 原生支持大语言模型,包括国内主流LLM服务
  • 📋 配置管理 - 灵活的YAML配置文件和环境变量支持
  • 🔒 内存安全 - 线程安全的共享存储和资源管理

🏗️ 系统架构

核心组件

FlowAgent 采用模块化设计,包含以下核心组件:

1. SharedStore (共享存储)

线程安全的数据存储组件,用于节点间数据传递:

  • 支持多种数据类型 (string, int, slice等)
  • 上下文管理和自动资源清理
  • 读写锁机制保证并发安全

2. Node (节点系统)

支持两种节点模型:

传统节点 (BaseNode)

  • 三阶段执行模型:Prep → Exec → Post
  • 适用于简单线性工作流
  • 最小化开销设计

增强节点 (EnhancedNode)

  • 企业级特性:超时控制、重试机制、指标收集
  • 流畅API构建方式
  • 支持复杂路由和并行执行

3. Flow (流程引擎)

工作流执行引擎:

  • 传统流程 (Flow) - 轻量级执行引擎
  • 增强流程 (EnhancedFlow) - 企业级执行引擎,包含详细监控

4. LLM客户端

统一的大语言模型接口:

  • 支持OpenAI API标准
  • 兼容国内主流LLM服务 (通义千问、智谱AI、月之暗面等)
  • 自动降级和模拟响应

执行模型

传统流程执行

Node1[Prep] → Node1[Exec] → Node1[Post] → Node2[Prep] → ...

增强流程执行

Node1[Prep+重试+超时] → Node1[Exec+指标+错误处理] → Node1[Post+路由] → 并行节点处理

📦 安装

go get github.com/sagoo-cloud/flowagent

依赖要求

  • Go 1.19 或更高版本
  • gopkg.in/yaml.v3 (用于配置文件解析)

🎯 快速开始

1. 基础使用 - 传统节点

package main

import (
    "context"
    "fmt"
    "github.com/sagoo-cloud/flowagent"
)

func main() {
    // 创建共享存储
    ctx := context.Background()
    shared := agent.NewSharedStore(ctx)
    defer shared.Close()

    // 创建简单问答节点
    answerNode := agent.NewAnswerNode("问答节点", nil) // nil使用模拟LLM
    
    // 设置问题
    shared.Set("question", "什么是人工智能?")
    
    // 创建并运行流程
    flow := agent.NewFlow("简单问答", answerNode)
    if err := flow.Run(shared); err != nil {
        fmt.Printf("执行失败: %v\n", err)
        return
    }
    
    // 获取结果
    fmt.Printf("答案: %s\n", shared.GetString("answer"))
}

2. 增强版本 - 企业级功能

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/sagoo-cloud/flowagent"
)

func main() {
    // 创建LLM客户端
    llmClient := agent.NewDefaultLLMClient(
        "your-api-key",
        "https://api.openai.com/v1", 
        "gpt-3.5-turbo",
    )

    ctx := context.Background()
    shared := agent.NewSharedStore(ctx)
    defer shared.Close()

    // 构建增强版节点 - 流畅API
    analysisNode := agent.NewEnhancedNode().
        SetName("内容分析").
        SetTimeout(30 * time.Second).
        SetRetries(3).
        SetPrep(func(ctx context.Context, shared *agent.SharedStore) (interface{}, error) {
            content := shared.GetString("content")
            return content, nil
        }).
        SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
            content := prepResult.(string)
            messages := []agent.LLMMessage{
                {Role: "user", Content: fmt.Sprintf("请分析以下内容的情感倾向: %s", content)},
            }
            return llmClient.Chat(ctx, messages)
        }).
        SetPost(func(ctx context.Context, prepResult interface{}, execResult interface{}, shared *agent.SharedStore) (string, error) {
            result := execResult.(string)
            shared.Set("analysis_result", result)
            
            // 根据结果决定下一步
            if strings.Contains(result, "积极") {
                return "positive", nil
            } else if strings.Contains(result, "消极") {
                return "negative", nil
            }
            return "neutral", nil
        })

    // 创建后续处理节点
    positiveNode := agent.NewEnhancedNode().
        SetName("积极处理").
        SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
            shared.Set("action", "发送祝贺消息")
            return "处理完成", nil
        })

    negativeNode := agent.NewEnhancedNode().
        SetName("消极处理").
        SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
            shared.Set("action", "转交人工客服")
            return "处理完成", nil
        })

    // 连接节点路由
    analysisNode.Next("positive", positiveNode).
                Next("negative", negativeNode)

    // 创建并运行增强流程
    flow := agent.NewEnhancedFlow("情感分析工作流", analysisNode)
    
    shared.Set("content", "今天天气真好,心情很愉快!")
    
    if err := flow.Run(ctx, shared); err != nil {
        fmt.Printf("工作流执行失败: %v\n", err)
        return
    }

    // 打印详细指标
    flow.PrintMetrics()
}

3. 并行处理

// 创建多个并行处理节点
emailNode := agent.NewEnhancedNode().
    SetName("发送邮件").
    SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
        time.Sleep(1 * time.Second) // 模拟发送邮件
        return "邮件已发送", nil
    })

smsNode := agent.NewEnhancedNode().
    SetName("发送短信").
    SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
        time.Sleep(800 * time.Millisecond) // 模拟发送短信
        return "短信已发送", nil
    })

pushNode := agent.NewEnhancedNode().
    SetName("推送通知").
    SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
        time.Sleep(500 * time.Millisecond) // 模拟推送
        return "推送已发送", nil
    })

// 创建并行执行节点
parallelNode := agent.NewParallelNode("并行通知", emailNode, smsNode, pushNode)

4. 条件路由

// 智能决策节点
decisionNode := agent.NewConditionalNode("风险评估", 
    func(shared *agent.SharedStore) (string, error) {
        score := shared.GetInt("risk_score")
        amount := shared.GetInt("amount")
        
        if score > 80 && amount > 10000 {
            return "high_risk", nil
        } else if score > 50 || amount > 5000 {
            return "medium_risk", nil
        }
        return "low_risk", nil
    })

// 连接不同处理路径
decisionNode.Next("high_risk", manualReviewNode).
            Next("medium_risk", additionalVerifyNode).
            Next("low_risk", autoApproveNode)

🔧 配置管理

配置文件 (config.yaml)

# 大语言模型配置
llm:
  # 通义千问 (阿里云DashScope)
  qwen:
    api_key: "your-qwen-api-key"
    base_url: "https://dashscope.aliyuncs.com/compatible-mode/v1"
    model: "qwen-turbo"
  
  # 智谱AI (清华)
  zhipu:
    api_key: "your-zhipu-api-key"
    base_url: "https://open.bigmodel.cn/api/paas/v4"
    model: "glm-4"
  
  # 月之暗面 (Kimi)
  moonshot:
    api_key: "your-moonshot-api-key"
    base_url: "https://api.moonshot.cn/v1"
    model: "moonshot-v1-8k"

# 工作流配置
workflow:
  default_timeout: "30s"    # 默认超时时间
  max_retries: 3           # 最大重试次数
  enable_metrics: true     # 启用指标收集
  max_concurrent: 10       # 最大并发数

环境变量

# OpenAI / 兼容接口
export OPENAI_API_KEY="your-api-key"
export OPENAI_BASE_URL="https://api.openai.com/v1"
export OPENAI_MODEL="gpt-3.5-turbo"

# 国内服务商
export QWEN_API_KEY="your-qwen-api-key"
export ZHIPU_API_KEY="your-zhipu-api-key"
export MOONSHOT_API_KEY="your-moonshot-api-key"

📚 完整示例

项目包含丰富的示例代码:

基础示例

企业级示例

性能对比

🎯 典型应用场景

企业级应用

  • 智能审批工作流 - 多级审批,AI辅助决策,自动路由
  • 内容审核系统 - 自动化内容分类、情感分析、合规检查
  • 智能客服 - 多轮对话、意图识别、工单路由
  • 文档处理管道 - OCR识别、内容提取、智能分类、存储归档

AI代理场景

  • 多代理协作 - 专业化代理分工合作,复杂任务分解
  • 智能决策树 - 基于规则和AI的复合决策系统
  • 数据处理管道 - ETL过程智能化,异常检测和处理
  • 监控告警系统 - 智能告警分析、自动处理、升级机制

具体案例

1. 智能客服工作流

接收消息 → 意图识别 → 知识库查询 → 生成回复 → 满意度评估 → 人工转接/结束

2. 内容审核管道

内容接收 → 格式检查 → AI安全审核 → 人工复审 → 发布/拒绝 → 记录审核日志

3. 智能投资顾问

用户输入 → 风险评估 → 策略匹配 → 产品推荐 → 合规检查 → 方案生成 → 跟踪优化

📊 性能特性

高性能设计

  • 并发执行 - 原生支持Go协程,自动并行处理
  • 内存优化 - 智能共享存储管理,避免数据复制
  • 低延迟 - 简单流程零开销,复杂流程最小开销
  • 高吞吐 - 支持批处理和流式处理

可扩展性

  • 水平扩展 - 支持分布式部署和负载均衡
  • 资源控制 - 细粒度的超时和并发控制
  • 弹性处理 - 自动重试和降级机制
  • 监控友好 - 丰富的指标和日志输出

基准测试

  • 简单节点执行:< 1ms
  • 复杂工作流 (10节点):< 50ms
  • 并发处理能力:> 1000 QPS
  • 内存占用:< 10MB (基础运行时)

🛠️ 开发指南

环境准备

# 安装Go 1.19+
go version

# 克隆项目
git clone https://github.com/sagoo-cloud/flowagent.git
cd flowagent

# 安装依赖
go mod tidy

# 运行测试
go test ./...

自定义节点开发

传统节点

type CustomNode struct {
    *agent.BaseNode
    // 自定义字段
}

func (c *CustomNode) Prep(shared *agent.SharedStore) (interface{}, error) {
    // 准备逻辑
    return data, nil
}

func (c *CustomNode) Exec(input interface{}) (interface{}, error) {
    // 执行逻辑
    return result, nil
}

func (c *CustomNode) Post(shared *agent.SharedStore, prepRes, execRes interface{}) (string, error) {
    // 后处理逻辑
    return "next_action", nil
}

增强节点

customNode := agent.NewEnhancedNode().
    SetName("自定义节点").
    SetTimeout(10 * time.Second).
    SetRetries(2).
    SetExec(func(ctx context.Context, prepResult interface{}, shared *agent.SharedStore) (interface{}, error) {
        // 自定义执行逻辑
        return result, nil
    })

错误处理最佳实践

  1. 使用上下文控制 - 传递context进行超时和取消控制
  2. 合理设置重试 - 根据操作类型设置重试次数和延迟
  3. 错误包装 - 提供详细的错误信息用于调试
  4. 资源清理 - 确保SharedStore正确关闭

📄 License

This project is licensed under the MIT License

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

0