当前位置: 首页 > news >正文

Ragflow 源码:task_executor.py

目录

    • 介绍
      • 主要功能
      • 核心组件
    • 流程图
    • 核心代码解释
      • 1. 系统架构与核心组件
      • 2. 核心处理流程
      • 3. 高级处理能力
      • 4. 关键创新点
      • 5. 容错与监控机制
      • 6. 性能优化技巧

介绍

task_executor.py 是RAGFlow系统中的任务执行器(Task Executor)核心部分,主要负责文档的解析、分块(chunking)、向量化(embedding)和索引(indexing)处理流程。

主要功能

  1. 文档处理流水线

    • 从存储系统(如MinIO)获取文档
    • 根据文档类型选择相应的解析器(parser)
    • 将文档分块处理
    • 生成向量表示(embeddings)
    • 存储到向量数据库中
  2. 高级处理能力

    • 支持RAPTOR(递归抽象处理)算法
    • 支持知识图谱(GraphRAG)构建
    • 自动关键词提取和问题生成
    • 内容标签自动标注
  3. 任务管理

    • 从Redis队列获取任务
    • 任务状态跟踪和报告
    • 任务取消处理
    • 失败任务恢复

核心组件

  1. 文档解析器工厂(FACTORY)

    • 针对不同类型的文档(论文、书籍、演示文稿、法律文件等)有不同的解析器
    • 使用注册模式动态选择解析器
  2. 并发控制

    • 使用Trio库实现异步并发
    • 通过CapacityLimiter控制并发任务数
    • 分块构建、MinIO操作等都有独立的并发限制
  3. 错误处理和监控

    • 详细的任务状态跟踪
    • 心跳报告机制
    • 内存监控和快照功能
    • 任务取消和超时处理

流程图

请添加图片描述

核心代码解释

1. 系统架构与核心组件

  • 并发控制体系

    • 使用trio异步框架实现高效I/O操作
    • 四级并发限制器:
      task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS)  # 任务级并发(默认5)
      chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS)  # 分块处理并发(默认1)
      minio_limiter = trio.CapacityLimiter(MAX_CONCURRENT_MINIO)  # 存储操作并发(默认10)
      kg_limiter = trio.CapacityLimiter(2)  # 知识图谱处理并发
      
  • 文档处理工厂模式

    FACTORY = {"general": naive,ParserType.PAPER.value: paper,  # 学术论文处理器ParserType.BOOK.value: book,    # 书籍处理器ParserType.TABLE.value: table,  # 表格处理器# ...其他15+种文档类型处理器
    }
    

2. 核心处理流程

  • 任务处理主循环 (handle_task函数):

    async def handle_task():redis_msg, task = await collect()  # 从Redis获取任务CURRENT_TASKS[task["id"]] = task  # 登记当前任务try:await do_handle_task(task)    # 执行实际处理redis_msg.ack()              # 确认任务完成except Exception as e:FAILED_TASKS += 1set_progress(task["id"], prog=-1, msg=f"[Exception]: {str(e)}")
    
  • 文档处理三阶段 (do_handle_task函数):

    # 阶段1:文档解析与分块
    chunks = await build_chunks(task, progress_callback)# 阶段2:向量化处理
    token_count, vector_size = await embedding(chunks, embedding_model)# 阶段3:存储索引
    await settings.docStoreConn.insert(chunks, index_name, kb_id)
    

3. 高级处理能力

  • RAPTOR算法实现

    raptor = Raptor(max_cluster=64,                 # 最大聚类数chat_model=chat_mdl,            # LLM模型embd_model=embd_mdl,            # 嵌入模型prompt=config["prompt"],        # 聚类提示词max_token=config["max_token"],  # 最大token数threshold=config["threshold"]   # 相似度阈值
    )
    chunks = await raptor.process(original_chunks)
    
  • 知识图谱构建

    await run_graphrag(task, language=task_language,with_resolution=True,      # 是否解析关系with_community=True,       # 是否构建社区chat_model=chat_mdl,embd_model=embd_mdl
    )
    

4. 关键创新点

  • 智能分块增强

    # 自动关键词提取
    d["important_kwd"] = keyword_extraction(chat_mdl, content)# 自动问题生成
    d["question_kwd"] = question_proposal(chat_mdl, content)# 智能标签系统
    d[TAG_FLD] = content_tagging(chat_mdl, content, all_tags)
    
  • 混合向量生成

    # 标题向量权重调整
    title_w = parser_config.get("filename_embd_weight", 0.1)
    vects = (title_w * title_vectors + (1-title_w) * content_vectors)
    

5. 容错与监控机制

  • 分布式锁管理

    with RedisDistributedLock("clean_task_executor"):# 清理超时workerREDIS_CONN.srem("TASKEXE", expired_workers)
    
  • 内存监控系统

    def start_tracemalloc_and_snapshot():tracemalloc.start()snapshot = tracemalloc.take_snapshot()snapshot.dump(f"snapshot_{timestamp}.trace")logging.info(f"Peak memory: {peak / 10**6:.2f} MB")
    
  • 心跳监测系统

    REDIS_CONN.zadd(CONSUMER_NAME, json.dumps({"pending": PENDING_TASKS,"current": CURRENT_TASKS,# ...其他状态指标}), timestamp
    )
    

6. 性能优化技巧

  • 批量处理策略

    # 向量化批量处理
    for i in range(0, len(texts), batch_size=16):vectors = await mdl.encode(texts[i:i+16])
    
  • 缓存机制

    # LLM结果缓存
    cached = get_llm_cache(llm_name, text, "keywords")
    if not cached:cached = await keyword_extraction(llm, text)set_llm_cache(llm_name, text, cached)
    
http://www.lqws.cn/news/478225.html

相关文章:

  • Sqlserver 设置对特定数据库特定表只读访问权限
  • 1928: 日期差值 codeup
  • MySQL安装与配置【windowsMac】
  • Unity3D仿星露谷物语开发69之动作声音
  • Unity Addressable使用之服务器远程加载
  • leetcode:面试题 08.01. 三步问题
  • AWS认证系列:考点解析 - cloud trail,cloud watch,aws config
  • JavaEE-Mybatis初阶
  • ubuntu24.04+5090显卡驱动安装踩坑
  • C4.5算法深度解析:决策树进化的里程碑
  • 低空经济三大赛道深度解析:交通、安防、能源领域的革命性突破
  • 华为公布《鸿蒙编程语言白皮书》V1.0 版:解读适用场景
  • es中向量索引的增量更新
  • Linux:早期操作系统的系统调用
  • 【论文阅读笔记】TransparentGS:当高斯溅射学会“看穿”玻璃,如何攻克透明物体重建难题?
  • Day56打卡 @浙大疏锦行
  • 【threejs】一天一个小案例讲解:控制面板(GUI)
  • 扩散模型与强化学习(1):字节Seedance中的人类偏好优化实践
  • 华为云Flexus+DeepSeek征文|基于Dify构建解析网页写入Notion笔记工作流
  • sqlite3 数据库反弹shell
  • 开发语言本身只是提供了一种解决问题的工具
  • 【AI智能体】Spring AI MCP 服务常用开发模式实战详解
  • TDengine 3.3.5.0 新功能——服务端查询内存管控
  • PaddleOCR + Flask 构建 Web OCR 服务实战
  • Flink Sink函数深度解析:从原理到实践的全流程探索
  • 63-Oracle LogMiner(23ai)-实操
  • 合成生物学与人工智能的融合:从生命编程到智能设计的IT新前沿
  • 华为云Flexus+DeepSeek征文|在Dify-LLM平台中开发童话故事精灵工作流AI Agent
  • Kafka动态配置深度解析
  • 测试用例原则之 FIRST/CORRECT/5C原则