c++多线程编程
一、线程创建
语法:
std::thread t1(function_name, arg1, arg2,...)
// 或
std::thread t2 = std::thread(function_name, arg1, arg2,...)
使用说明:创建线程后,使用t.join()函数等待线程完成,或者使用t.detach()分离线程,让它在后台运行。
方法 | 含义 | 特点 |
---|---|---|
join() | 等待线程执行完毕,主线程阻塞等待 | 安全、推荐 |
detach() | 分离线程后台执行,主线程不等待 | 一旦分离,无法再控制该线程;易出错但适合后台任务 |
示例:
void print() {std::cout << "hello world!" << std::endl;
}
int main() {std::thread t1(print); // 创建线程1std::thread t2 = std::thread(print); // 创建线程2if (t1.joinable()) t1.join();if (t2.joinable()) t2.join();
return 0;
}
二、线程参数传递方式
1、值传递(默认行为); 2、引用传递:需 std::ref()
包装; 3、右值引用:传递移动语义对象。
注:当使用引用来传递参数时,需要用std::ref()来包装引用,否则编译报错
void print(string str){cout << str << endl;
}
int x = 0; //全局变量
void increment(int& x){++x;
}
void print_r(std::string&& str) {std::cout << str << std::endl;
}int main() {thread t1(print,"helloword!"); // 值传递thread t2 = thread(increment, ref(x)); // 引用传递std::string s = "hello";std::thread t(print_r, std::move(s)); // 使用std::move将左值s转换为右值传递if (t1.joinable()) t1.join();if (t2.joinable()) t2.join();if (t.joinable()) t.join();cout << x << endl;return 0;
}
三、线程同步机制
如多个线程同时访问同一个变量,并且其中至少有一个线程对该变量进行写操作,那么就会出现数据竞争问题。可能导致程序崩溃、产生未定义、得到错误的结果。 为了避免数据竞争问题,需要使用同步机制来确保多个线程间的数据访问安全。常见的同步机制有:互斥量、条件变量、原子操作等。
1、互斥量
std::mutex mtx;
mtx.lock();
// 临界区
mtx.unlock();
示例:
int x = 0;
mutex mtx;
void increment(){for(int i = 0; i < 1000000; ++i){mtx.lock(); // 加锁++x;mtx.unlock(); //解锁}
}
int main() {thread t1(increment);thread t2 = thread(increment);
if (t1.joinable()) t1.join();if (t2.joinable()) t2.join();
cout << x << endl;
return 0;
}
2、死锁
死锁场景:
假设有两个线程T1和T2,他们需要对两个互斥量mtx1和mtx2进行访问,而且需要按照一下顺序获取互斥量的所有权: T1先获取mtx1的所有权,在获取mtx2的所有权; T1先获取mtx2的所有权,在获取mtx1的所有权; 如果两个线程同时执行,就会出现死锁问题,因为T1获取了mtx1的所有权,但是无法获取mtx2的所有权,而T2获取了mtx2的所有权,无法获取mtx1的所有权,两个线程互相等待对象释放互斥量,导致死锁。
int x = 0;
mutex mtx1;
mutex mtx2;
void increment_1(){for(int i = 0; i < 1000000; ++i){mtx1.lock(); // 加锁mtx2.lock();++x;mtx1.unlock(); //解锁mtx2.unlock();}
}
void increment_2(){for(int i = 0; i < 1000000; ++i){mtx2.lock(); // 加锁mtx1.lock();++x;mtx2.unlock(); //解锁mtx1.unlock();}
}
int main() {thread t1(increment_1);thread t2 = thread(increment_2);
if (t1.joinable()) t1.join();if (t2.joinable()) t2.join();
cout << x << endl;
return 0;
}
解决办法:可使用 std::lock(mtx1, mtx2)
+ std::lock_guard
避免死锁:
std::lock(mtx1, mtx2);
std::lock_guard<std::mutex> l1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> l2(mtx2, std::adopt_lock);
++x;
3、lock_guard
定义:是一种互斥量封装类,用于保护共享数据,防止多个线程同时访问同一资源而导致数据竞争问题。
特点:a、当构造函数被调用时,互斥量会被自动锁定 b、当析构函数被调用时,互斥量会被自动解锁 c、对象不能被复制或移动,只能在局部作用域中使用
优点:异常安全,进入作用域即加锁,退出自动解锁; 缺点:无法控制加锁时机,不支持条件变量。
示例:
int x = 0;
mutex mtx;
void increment(){for(int i = 0; i < 1000000; ++i){{lock_guard<mutex> l(mtx);++x;}}
}
int main() {thread t1(increment);thread t2(increment);
if (t1.joinable()) t1.join();if (t2.joinable()) t2.join();cout << x << endl;
return 0;
}
4、unique_lock
定义:unique_lock是一种互斥量封装类,用于多线程程序中对互斥量的加锁和解锁操作。
特点:可以对互斥量进行更灵活的管理,包括延迟加锁、条件变量、超时等。
成员函数: a、lock():对互斥量进行加锁,若当前互斥量被其他线程占有,则当前线程被阻塞,直到互斥量被加锁成果; b、try_lock():尝试对互斥量进行加锁,若当前互斥量被其他线程占有,则函数立即返回false,否则返回true; c、try_lock_for(const chrono::duration<Rep, _Period>& rtime):对互斥量进行加锁,若当前互斥量被其他线程占有,则当前线程被阻塞,直到互斥量被加锁成果,或者超过指定时间; d、try_lock_until(const chrono::time_point<Clock, _Duration>& atime):对互斥量进行加锁,若当前互斥量被其他线程占有,则当前线程被阻塞,直到互斥量被加锁成果,或者超过指定时间点; e、unlock():对互斥量进行解锁操作。
加、解锁操作:
// 自动加锁、解锁
for(int i = 0; i < 100000; ++i){unique_lock<mutex> lg(mtx);++x;
}
// 手动加锁、自动解锁
for(int i = 0; i < 100000; ++i){unique_lock<mutex> lg(mtx, defer_lock);lg.lock();++x;
}
// 手动加锁、 手动解锁
for(int i = 0; i < 100000; ++i){unique_lock<mutex> lg(mtx, defer_lock);lg.lock();++x;lg.unlock();
}
加锁(延迟等待一段时间):必须要用timed_mutex(mutex不支持时间操作)
for(int i = 0; i < 100000; ++i){unique_lock<timed_mutex> lg(mtx, defer_lock);lg.try_lock_for(chrono::seconds(5));++x;
}
加锁(延迟等待到时间点):
for (int i = 0; i < 100000; ++i) {unique_lock<timed_mutex> lg(mtx, defer_lock);if (lg.try_lock_until(chrono::steady_clock::now() + chrono::milliseconds(100))) {++x;}
}
5、call_once+once_flag
作用:确保某个初始化函数在多线程中只执行一次(比如单例的创建中)
mutex mtx;
static once_flag once;
class LOG{
public:LOG() = default;LOG(const LOG& Log) = delete;LOG& operator=(const LOG& Log) = delete;
static LOG* getIntance(){call_once(once, init);return log;}
static void init(){log = new LOG(); // 懒汉单例}
void print(string msg){cout << __TIMESTAMP__ << " : " << msg << endl;}
private:static LOG * log;
};
LOG * LOG::log = nullptr;
void print(){for (int i = 0; i < 5; ++i) {LOG::getIntance()->print(to_string(i));}
}
int main() {thread t1(print);thread t2(print);
if (t1.joinable()) t1.join();if (t2.joinable()) t2.join();
return 0;
}
6、condition_variable
优点:能够高效地实现线程之间的同步与通信,使线程在等待某个条件满足时自动挂起,避免忙等浪费 CPU 资源,当条件满足时再由其他线程唤醒,既保证了线程安全,又提高了系统性能,非常适合用在生产者-消费者等典型的并发场景中。
创建condition_variable对象的步骤: a、创建condition_variable对象g_cv; b、创建互斥锁对象mtx用于保护共享资源; c、在需要等待条件变量的线程使用unique_lock对象lg锁定互斥锁并调用g_cv.wait()、g_cv.wait_for()或g_cv.wait_until()等待条件变量; d、在需要通知等待的线程中,使用g_cv.notify_one()或g_cv.notify_all()通知等待线程
简单的生产者-消费者模型示例:
queue<int> g_queue; // 任务队列
condition_variable g_cv; // 条件变量
mutex mtx; // 互斥锁
void product(){for(int i = 0; i < 5; ++i){{unique_lock<mutex> lg(mtx);cout << "product: " << i << endl;g_queue.push(i);// 通知g_cv.notify_one();}this_thread::sleep_for(chrono::seconds(2));}
}
void consume(){while (true){unique_lock<mutex> lg(mtx);// 等待g_cv.wait(lg, [](){return !g_queue.empty();});cout << "consume: " << g_queue.front() << endl;g_queue.pop();}
}
int main() {thread t1(product);thread t2(consume);
if(t1.joinable()) t1.join();if(t2.joinable()) t2.join();return 0;
}
四、线程池
通俗解释:
线程池就是一个“养了一群线程工人的工厂”,你只需要把任务扔进去,它会自动安排工人执行,不用你每次都招人(创建线程)、解雇人(销毁线程)了。
// ----------- ThreadPool 类定义(不支持future返回值) -----------//
class threadPool1{
public:threadPool1(int numThreads) : threads_(numThreads) ,stop_(false){for(int i = 0; i < numThreads; ++i){threads_.emplace_back([this](){function<void()> task;{unique_lock<mutex> lock(mtx_);cond_.wait(lock, [this] {return stop_.load() || !task_.empty();});
if (task_.empty() && stop_.load()) return;task = std::move(task_.front());task_.pop();}task(); // 执行});}}
~threadPool1(){stop_.store(true);cond_.notify_all();for(auto & t : threads_){if(t.joinable()){t.join();}}}
template<typename T1, typename T2>void enqueue(T1&& task, T2&& Args...){{unique_lock<mutex> lock(mtx_);task_.push(bind(forward<T1>(task), forward<T2>(Args)));}cond_.notify_one();}
private:atomic<bool> stop_;
vector<thread> threads_;queue<function<void()>> task_;mutex mtx_;condition_variable cond_;
};
// ----------- ThreadPool 类定义(支持 future 返回值) -----------
class ThreadPool2 {
public:explicit ThreadPool2(int numThreads) : stop_(false) {for (int i = 0; i < numThreads; ++i) {threads_.emplace_back([this] {while (true) {function<void()> task;
{unique_lock<mutex> lock(mtx_);cond_.wait(lock, [this] {return stop_.load() || !tasks_.empty();});if (stop_.load() && tasks_.empty()) return;task = std::move(tasks_.front());tasks_.pop();}
task();}});}}
~ThreadPool2() {stop_.store(true);cond_.notify_all();for (auto& t : threads_) {if (t.joinable()) t.join();}}
// 支持返回 future 的 enqueuetemplate <class F, class... Args>auto enqueue(F&& f, Args&&... args)-> future<typename result_of<F(Args...)>::type>{using RetType = typename result_of<F(Args...)>::type;
auto taskPtr = make_shared<packaged_task<RetType()>>(bind(forward<F>(f), forward<Args>(args)...));future<RetType> res = taskPtr->get_future();
{unique_lock<mutex> lock(mtx_);if (stop_.load())throw runtime_error("enqueue on stopped ThreadPool");tasks_.emplace([taskPtr](){ (*taskPtr)(); });}
cond_.notify_one();return res;}
private:vector<thread> threads_;queue<function<void()>> tasks_;mutex mtx_;condition_variable cond_;atomic<bool> stop_;
};
void print(int nums){for(int i = 0; i < nums; ++i){cout << i << endl;}
}
int main() {// 线程池1threadPool1 threadpool1(10);threadpool1.enqueue(print, 200);threadpool1.enqueue(print, 200);// 线程池2ThreadPool2 threadpool2(10);vector<future<int>> futures;futures.reserve(10);// 提交 10 个任务,返回值都是 int,但类型由 decltype 自动推导for (int i = 0; i < 10; ++i) {futures.emplace_back(threadpool2.enqueue([i]() {// 模拟耗时this_thread::sleep_for(chrono::milliseconds(100 * i));return i * i;}));}// 获取并打印所有任务结果for (auto &fut : futures) {cout << fut.get() << "\n";}
return 0;
}
五、异步编程方式
1、async
定义:c++11引入的一个函数模板(自动管理线程),用于异步执行一个函数,并返回一个std::future对象,表示异步操作的结果。使用std::async可以方便地进行异步编程,避免了手动创建线程和管理线程的麻烦。
std::launch::async :立即启动新线程;
std::launch::deferred :懒执行,调用 .get() 时才执行;
可使用 future_status
检查状态。
函数 | 功能 |
---|---|
wait() | 阻塞,直到任务完成 |
wait_for(duration) | 等待一段时间,看任务是否完成 |
wait_until(time_point) | 等到某个时间点,看任务是否完成 |
示例:
int work() {this_thread::sleep_for(chrono::seconds(3)); // 模拟耗时任务return 42;
}
int main() {future<int> fut = async(launch::async, work);
cout << "Waiting up to 1s..." << endl;future_status status = fut.wait_for(chrono::seconds(1));
if (status == future_status::ready) {cout << "Result: " << fut.get() << endl;} else {cout << "Not ready yet, do something else...\n";// 等下再 get()fut.wait(); // 或 fut.get() 阻塞等待cout << "Result (later): " << fut.get() << endl; // ❌错误!future 只能 get 一次}
return 0;
}
2、packeged_task
定义:packeged_task是一个类模板,用于将一个可调用对象(如函数、函数对象或Lambda表达式)封装成一个可异步调用的任务,并返回一个std::future对象,表示异步调用的结果。packeged_task可以方便地将一个函数或可调用对象转换成一个异步调用任务,供其他线程使用。
示例:
int func(){int i = 0;for(; i < 10; i++){++i;}return i;
}
int main() {packaged_task<int()> task(func);auto future_rs= task.get_future();thread t(std::move(task));t.join();cout << future_rs.get() << endl;
return 0;
}
3、promise
定义:promise是一个类模板,用于在一个线程中产生一个值,并在另一个线程中获取这个值,适合用于“线程间通信”或“主动控制异步结果”的场景。常与async和future一起使用,实现异步编程。
示例:
void func_(promise<int>& f){f.set_value(1000);
}
int main() {promise<int> f;auto future_rs3 = f.get_future();// 线程t1中设置值thread t1(func_, std::ref(f));t1.join();// 主线程中访问值cout << future_rs3.get() << endl;
return 0;
}
六、原子操作
优势:性能优于 mutex,在多线程环境下对同一个atomic变量进行操作,不会出现数据竞争的问题;支持所有整数类型的原子加减。
成员函数:
a、load():将atomic变量的值加载到当前线程的本地缓存中,并返回这个值;
b、store(val):将val的值存储到atomic变量中,并保证这个操作的原子性,不会出现数据竞争; c、exchange(val):将val的值存储到atomic变量中,并返回原先的值;
d、compare_exchange_strong(expected, val)和compare_exchange_weak(expected, val),比较atomic变量中的值是否和expected的值相同,若相同,将val的值存储到atomic变量中,并返回true;否者将atomic变量的值存储到expected中,并返回false。
// 原子变量定义共享资源
atomic<int> shared_data;
void func(){for(int i = 0; i < 100000; ++i){++shared_data;}
}
int main(){thread t1(func);thread t2(func);
if (t1.joinable()) t1.join();if (t2.joinable()) t2.join();
cout << shared_data << endl;
return 0;
}
七、线程相关辅助工具
1、std::this_thread
sleep_for(duration):睡眠指定时间;
sleep_until(time_point):睡到某时刻;
get_id():获取当前线程ID;
yield():让出当前时间片。
2、 thread::hardware_concurrency()
返回CPU核心数(可用于线程池初始化)。