Linux 内核学习(12) --- Linux workqueue
目录
- workqueue 简介
- Default workqueue
- Init workqueue
- schedule workqueue
- flush workqueue
- cacel workqueue
- Custom workqueue
- create workqueue
- queue workqueue
- tasklet 区别和联系
workqueue 简介
work queue
即工作队列,也是中断下半部的一种
work queue
将下半部工作推迟给一个内核线程去执行 ==> work struct
总是会在进程的上下文执行,
重要的是 workqueue
允许重新调度甚至睡眠
两个关键的点:
- 如果推迟的工作需要睡眠,则使用
workqueue
,否则使用softirq
或tasklets
workqueue
适用于需要分配大量的内存,获得一个信号量,或者执行阻塞的I/O
的情况
工作队列创建的内核线程称为工作者线程(work thread
),工作队列子系统创建了一个缺省的工作者线程来处理这些推后的工作,一般都是使用缺省的工作线程
workqueue
允许在两个主要类型的线程中执行工作:
-
普通的内核线程:这些线程可以执行任何类型的工作,并且可以在多个CPU上并行执行,它们适用于通用的、非CPU亲和性的工作
-
绑定到特定CPU的内核线程:这些线程与特定的CPU核心绑定,并专门用于在该核心上执行工作,这适用于需要与特定硬件交互或者需要保持数据局部性的任务。
内核中存在两种类型的工作者线程:
默认工作队列(default workqueue
):这是最常见的工作队列,它由内核自动创建和管理工作线程,默认工作队列中的工作可以在系统的任何一个CPU上执行
自定义工作队列(custom workqueue
):用户可以创建自己的工作队列,并指定它们的工作线程是否绑定到特定的CPU
核心上,如果绑定到特定的 CPU
,那么工作只会在这个 CPU
上执行,这有助于提高缓存亲和性
对于默认工作队列,内核会为每个 CPU
核心创建一个工作者线程,这些线程的名字通常以 kworker/
开头,后面跟着 CPU
编号
如果一个系统有4个 CPU
核心,那么可能会有名为 kworker/0:0
、kworker/1:0
、kworker/2:0
和 kworker/3:0
的线程
比如 ubuntu
上的默认工作队列:
// 这里仅仅列出了 CPU 1 上的 kworker 线程
root 15617 0.0 0.0 0 0 ? I 10:41 0:00 [kworker/1:2-events]
root 16215 0.0 0.0 0 0 ? I 11:55 0:00 [kworker/1:0-events]
root 19089 0.0 0.0 0 0 ? I< 12:03 0:00 [kworker/1:1H]
root 19111 0.0 0.0 0 0 ? I< 12:04 0:00 [kworker/1:4H-kblockd]
root 19330 0.0 0.0 0 0 ? I< 12:51 0:00 [kworker/1:0H-kblockd
Linux 内核会动态创建多个 kworker/<CPU_ID>:<FLAGS>
线程来处理默认工作队列,主要原因涉及 并发性能、CPU 亲和性、任务隔离和优先级管理
内核会根据任务负载动态增加会减少 kworker
线程数量,空闲时仅保留少量线程,负载高时又会创建新的线程
对于自定义工作队列,如果它们是绑定到 CPU
的,那么线程的名字会反映这一点,例如 kworker/u2:0
表示这是一个绑定到 CPU
核心 0
的用户创建的工作队列的工作者线程
比如 ubuntu
上的自定义工作队列:
146 0.0 0.0 0 0 ? I< 07:25 0:00 [kworker/u257:0-hci0]
root 617 0.0 0.0 0 0 ? I< 07:26 0:00 [kworker/u257:2-hci0]
root 15265 0.0 0.0 0 0 ? I 10:20 0:03 [kworker/u256:1-events_freezable_power_]
root 19215 0.0 0.0 0 0 ? I 12:30 0:00 [kworker/u256:0-events_power_efficient]
root 19221 0.0 0.0 0 0 ? I 12:37 0:00 [kworker/u256:3-events_unbound]
其中 kworker/u
表示 unbound,不绑定到特定的 CPU
,由内核全局进行管理
Default workqueue
Init workqueue
DECLARE_WORK(name, void (*func)(void* ));// include/linux/workqueue.h n for name f for function
#define DECLARE_WORK(n, f) struct work_struct n = __WORK_INITIALIZER(n, f)
DECLARE_WORK
会静态的创建一个名称为 name
,处理函数为 func
的 work_struct
结构,这个 work_struct
函数每个工作队列的成员都会带一个
注意 DECLARE_WORK
包含了定义 workstruct
结构的功能,定义了 workstruct
并且指定了处理函数
也可以在运行时通过指针创建一个 work
,传入的 work_struct
的指针
函数原型: INIT_WORK(struct work_struct *work, void(*func)(void*))
#define INIT_WORK(_work, _func) __INIT_WORK((_work), (_func), 0)
这样会动态的初始化一个 work
指向的工作,处理函数为 func
工作队列的处理函数原型:
void work_handler(void *data)
这样的函数会有一个工作者线程执行,默认的情况下,允许响应中断,并且不持有任何的锁,如何需要,函数可以睡眠
schedule workqueue
对工作队列进行调度:
static inline bool schedule_work(struct work_struct *work);
static inline bool schedule_delayed_work(struct delayed_work *dwork,unsigned long delay);
schedule_work
把给定的处理函数提交给缺省events
的工作线程,work
马上会进行调度,一旦其所在处理器上的工作者线程被唤醒,它就会执行schedule_delayed_work
经过一段时间后延时执行
int schedule_work_on( int cpu, struct work_struct *work );
int scheduled_delayed_work_on(int cpu, struct delayed_work *dwork, unsigned long delay );
schedule_work_on
将work
调度到特定的cpu
上执行scheduled_delayed_work_on
等待一段时间后将work
加入workqueue
中,并且调度到特定的处理器(cpu
)上执行
flush workqueue
排入工作队列的工作会在 workthread
下一次被唤醒时执行,有时,在下一步工作之前,必须保证一些操作已经执行完毕了
卸载之前,有可能需要调用下面的函数,在内核的其他部分,为了防止竞争条件的出现,也有可能需要确保不再有待处理的工作:
static inline void flush_scheduled_work(void);
函数会一直等待,直到队列中所有的对象都被执行以后才会返回,在等待所有待处理的工作执行的时候,该函数会进入休眠状态,所以只能在进程上下文使用
cacel workqueue
int cancel_work_sync( struct work_struct *work );
int cancel_work_sync( struct work_struct *work );
cancel_work_sync
:取消workqueue
中尚未调度的work
, 如果该work
在执行过程中,那么当前调用将阻塞直到handler
执行完成cancel_delayed_work_sync
: 作用类似于cancel_work_sync
,但是它针对的是延迟入队的工作(delayed work
)
使用默认工作队列 demo
:
static void static_wq_fn(struct work_struct *work) {printk(KERN_INFO "Static workqueue function called on CPU[%d]\n", smp_processor_id());
}static void dynamic_wq_fn(struct work_struct *work) {printk(KERN_INFO "Dynamic workqueue function called on CPU[%d]\n", smp_processor_id());
}// struct work_struct static_work =
static DECLARE_WORK(static_work, static_wq_fn);
static struct work_struct dynamic_work;static int __init demo_mdrv_init(void) {int i;INIT_WORK(&dynamic_work, dynamic_wq_fn);for (i = 0; i < 3; i++) {//schedule_work(&static_work);// schedule work on cpu 2schedule_work_on(2, &static_work);schedule_work_on(3, &dynamic_work);flush_scheduled_work();printk(KERN_INFO "static_work: scheduling work index:%d\n", i);}printk(KERN_DEBUG "module has been initialized! \n");return 0;
}static void __exit demo_mdrv_exit(void) {flush_scheduled_work();cancel_work_sync(&static_work);cancel_work_sync(&dynamic_work);printk(KERN_DEBUG "mdrv workqueue module has been exited!\n");
}
Custom workqueue
struct workqueue_struct
是 Linux 内核中用于表示工作队列的结构体,
工作队列允许将需要延迟执行的工作从中断上下文移到进程上下文中执行,这样可以避免在中断上下文中进行复杂和耗时的操作
struct workqueue_struct
的定义在 include/linux/workqueue.h
下面是其简化的结构体定义:
struct workqueue_struct {struct list_head list;char name[WQ_NAME_LEN];struct pool_workqueue __percpu* cpu_pwqs;
}
list
: 用于将多个工作队列链接到一起name
: 工作对列的名称cpu_wq
: 指向每个CPU的工作队列结构的指针,工作队列通常是按CPU分布的,以便于在多处理器系统上并行执行
struct work_struct
只是表示工作/任务本身,它需要被添加到 struct workqueue_struct
中才会被调用和执行
create workqueue
create_workqueue
和 destroy_workqueue
用来创建和销毁工作队列
struct workqueue_struct *create_workqueue( name );
void destroy_workqueue( struct workqueue_struct * );#define create_workqueue(name) alloc_workqueue("%s", WQ_MEM_RECLAIM, 1, (name))
#define create_singlethread_workqueue(name) alloc_workqueue("%s", WQ_UNBOUND | WQ_MEM_RECLAIM, 1, (name))struct workqueue_struct *alloc_workqueue(const char *fmt,unsigned int flags,int max_active, ...
本质上两个宏都是调用 alloc_workqueue
这个函数实现的
fmt
: 创建工作队列的 printf 格式名称flags
: WQ_* 参数,用于指定工作队列的行为和属性max_active
: 同时最大处理的工作项数量
下面是一些常用的 WQ_*
参数:
-
WQ_UNBOUND
: 工作队列中的工作项由不绑定到任何特定CPU的特殊工作池(unbound worker-pool
)处理,该工作队列就像一个简单的执行上下文提供者,没有并发管理,它会尽可能快地启动对工作项的处理 -
WQ_FREEZABLE
: 在系统挂起(比如待机或休眠)时,这种工作队列会停止接收新任务,并等待当前的任务完成,在系统恢复时,再继续处理新任务 -
WQ_MEM_RECLAIM
: 这种工作队列在系统内存非常紧张的时候仍然能够保证有线程执行任务,适合于在内存不足时也要执行的任务,例如内存回收操作 -
WQ_HIGHPRI
:高优先级工作队列中的工作项被排入目标CPU的高优先级工作池,并且由具有较高nice级别的工作线程处理。正常和高优先级工作池不会互相影响,因为每个工作池维护其独立的工作线程池,并在其工作线程之间实现并发管理 -
WQ_CPU_INTENSIVE
:CPU密集型任务不会妨碍其他任务的执行,确保所有任务都能被调度执行
queue workqueue
int queue_work( struct workqueue_struct *wq, struct work_struct *work );
int queue_work_on( int cpu, struct workqueue_struct *wq, struct work_struct *work );
int queue_delayed_work( struct workqueue_struct *wq, struct delayed_work *dwork, unsigned long delay );
int queue_delayed_work_on( int cpu, struct workqueue_struct *wq, struct delayed_work *dwork, unsigned long delay );
-
queue_work
保证工作队列任务尽量会在提交任务的那个CPU
上运行,以保证任务的局部性(locallity
),但是如果那个CPU
不可用,任务会由其他的CPU
处理,这种机制可以确保任务不会因为某个CPU
的不可用而无法执行 -
queue_work_on
提交到特定的CPU
上运行 -
queue_delayed_work
在任务提交到工作队列之前延时一段时间,用delay
参数指定 -
queue_delayed_work_on
任务提交到特定的CPU
上执行之前等待一段时间,用delay
参数指定
struct custom_work {struct work_struct work;int id;int execution_count;char message[64];
};static struct workqueue_struct *custom_wq = NULL;static void custom_work_handler(struct work_struct *work) {struct custom_work *my_work = container_of(work, struct custom_work, work);my_work->execution_count++;printk(KERN_INFO "my_work:CPU[%d] execution #%d for id=%d, message='%s'\n", smp_processor_id(), my_work->execution_count, my_work->id, my_work->message);
}static int __init demo_mdrv_init(void) {struct custom_work *work;int i;custom_wq = alloc_workqueue("my_workqueue", WQ_UNBOUND | WQ_MEM_RECLAIM, 1);// create 3 workstruct and queue for every onefor (i = 0; i < 3; i++) {work = kzalloc(sizeof(struct custom_work), GFP_KERNEL);if (!work) {printk(KERN_ERR "Failed to allocate work for id=%d\n", i);}INIT_WORK(&work->work, custom_work_handler);work->id = i;work->execution_count = 0;snprintf(work->message, sizeof(work->message), "call from work item %d", i);printk(KERN_INFO "my_work: scheduling work %d\n", i);queue_work(custom_wq, &work->work);//queue_work_on(4, custom_wq, &work->work);flush_workqueue(custom_wq);}printk(KERN_DEBUG "module has been initialized! \n");return 0;
}static void __exit demo_mdrv_exit(void) {flush_workqueue(custom_wq);destroy_workqueue(custom_wq);printk(KERN_DEBUG "mdrv workqueue module has been exited!\n");
}
tasklet 区别和联系
任务队列和工作队列的区别:
任务队列(tasklet):
- 任务队列是基于软中断的机制,它们是轻量级的,用于处理短小的、不需要睡眠的底半部任务。
- 任务队列保证在任意时刻在同一个
CPU
核心上只能执行一个特定的任务队列,因此它们不需要处理并发执行的问题,简化了同步。 - 任务队列的执行上下文是中断上下文,这意味着任务队列中的代码不能睡眠
注意 tasklet
虽然是在中断上下文执行,但是不是在中断中执行的
工作队列(workqueue
):
工作队列是用于处理需要较长时间或者可以睡眠的底半部任务的机制。
- 工作队列中的任务会在内核线程中执行,这意味着它们可以睡眠,可以调用可能导致阻塞的函数,如kmalloc、msleep等。
- 工作队列提供了更多的灵活性,可以创建自定义的队列,并且可以控制任务的并发性和执行顺序。
- 工作队列通常用于处理那些对延迟不太敏感的底半部任务,或者需要较长时间处理的任务