当前位置: 首页 > news >正文

gopool 源码分析

gopool

gopool是字节跳动开源节流的gopkg包中协程池的一个实现。

关键结构

协程池:

type pool struct {// The name of the poolname string// capacity of the pool, the maximum number of goroutines that are actually working// 协程池的最大容量cap int32// Configuration informationconfig *Config// linked list of tasks// 任务链表taskHead  *tasktaskTail  *tasktaskLock  sync.MutextaskCount int32// Record the number of running workers// 运行中的协程数workerCount int32// This method will be called when the worker panic// 出现 panic 时调用、 panicHandler func(context.Context, interface{})
}

任务:

type task struct {ctx context.Contextf   func()next *task
}

worker:

type worker struct {pool *pool
}

源码分析

先说一下 gopool 的工作流程:

  1. 通过 Go 或者 CtxGo 方法调用
  2. 从 taskPool 中取出一个 t
  3. 如果当前的积压task达到阈值且worker(工作协程)的数量未达到上限,则新建一个worker。

pool.cap 最大工作协程与实际运行的最大协程可能会存在误差。因为新建worker这块不是原子操作:

if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {// 工作协程加1p.incWorkerCount()w := workerPool.Get().(*worker)w.pool = pw.run()}

worker 的最大数量不会超过pool.cap 。worker run 流程比较简单:

  1. 循环的从 pool 中取出 task 执行

为了方便查看源码,我把相关代码都粘到了下面的,详细流程如下:

var workerPool sync.Poolvar taskPool sync.Pool// 初始化 taskPool
func init() {taskPool.New = newTask
}func (p *pool) Go(f func()) {p.CtxGo(context.Background(), f)
}func (p *pool) CtxGo(ctx context.Context, f func()) {// 从 taskPool 中取 task,避免频繁创建销毁t := taskPool.Get().(*task)t.ctx = ctx// 赋值执行函数t.f = f// 将 t 添加到任务链表里,加锁保证并发安全p.taskLock.Lock()if p.taskHead == nil {p.taskHead = tp.taskTail = t} else {p.taskTail.next = tp.taskTail = t}p.taskLock.Unlock()// 任务链表数量原子加 1atomic.AddInt32(&p.taskCount, 1)// The following two conditions are met:// 1. the number of tasks is greater than the threshold.// 2. The current number of workers is less than the upper limit p.cap.// or there are currently no workers.// 满足以下两个条件:// 1.任务数大于等于设置的阈值(默认为1)// 2.当前的协程数低于上限,或者目前没有工人if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {// 工作协程加1p.incWorkerCount()w := workerPool.Get().(*worker)w.pool = pw.run()}
}func (w *worker) run() {go func() {for {var t *taskw.pool.taskLock.Lock()if w.pool.taskHead != nil {// 取出任务t = w.pool.taskHeadw.pool.taskHead = w.pool.taskHead.nextatomic.AddInt32(&w.pool.taskCount, -1)}// 没有任务则结束if t == nil {// if there's no task to do, exitw.close()w.pool.taskLock.Unlock()w.Recycle()return}w.pool.taskLock.Unlock()func() {defer func() {if r := recover(); r != nil {if w.pool.panicHandler != nil {w.pool.panicHandler(t.ctx, r)} else {msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())logger.CtxErrorf(t.ctx, msg)}}}()// 执行t.f()}()t.Recycle()}}()
}func (t *task) Recycle() {t.zero()taskPool.Put(t)
}
http://www.lqws.cn/news/215767.html

相关文章:

  • 跨平台资源下载工具:res-downloader 的使用体验
  • electron-vite串口通信
  • 第9篇:数据库中间件的容错机制与高可用架构设计
  • 阿里云 RDS mysql 5.7 怎么 添加白名单 并链接数据库
  • 34 C 语言字符串转数值函数详解:strtol、strtoll、strtoul、strtoull(含 errno 处理、ERANGE 错误)
  • 坚持每日Codeforces三题挑战:Day 4 - 题目详解(2025-06-07,难度:1000, 1100, 1400)
  • 6个月Python学习计划 Day 16 - 面向对象编程(OOP)基础
  • 利用qcustomplot绘制曲线图
  • 智慧零售管理中的客流统计与属性分析
  • 前端开发者常用网站
  • 基于51单片机的光强调节LED亮度
  • 关于脏读,幻读,可重复读的学习
  • python训练营打卡第47天
  • CAD2025安装教程与资源下载
  • 湖北理元理律师事务所:债务咨询中的心理支持技术应用
  • cv2.stereoRectify中R1, R2, P1, P2, Q中每一个分量的物理意义
  • SMC自修改代码一
  • MCP详解及协议的使用(python版本和Node版本)
  • FreeRTOS事件组-笔记
  • AI大神吴恩达-提示词课程笔记
  • 【Go语言基础【14】】defer与异常处理(panic、recover)
  • 深入剖析MySQL存储架构,索引结构,日志机制,事务提交流程
  • 【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
  • KTO: Model Alignment as Prospect Theoretic Optimization
  • [总结篇]个人网站
  • XGBoost时间序列预测之-未来销量的预测
  • 【氧化镓】HTFB应力对β - Ga2O3 SBD的影响
  • 【JavaSE】泛型学习笔记
  • GIC700组件
  • 什么是预训练?深入解读大模型AI的“高考集训”