C/C++ Linux 开发教程(GCC + CMake) / 多线程编程(std::thread/Mutex/Atomic)
多线程编程(std::thread/Mutex/Atomic)
1. std::thread 创建与管理
#include <iostream>
#include <thread>
#include <string>
void worker(const std::string& name, int count) {
for (int i = 0; i < count; ++i) {
std::cout << name << " 工作中 " << i << "\n";
}
}
int main() {
// 创建线程
std::thread t1(worker, "线程A", 3);
std::thread t2(worker, "线程B", 3);
// 必须等待线程结束(join)或分离(detach)
t1.join();
t2.join();
std::cout << "所有线程完成\n";
return 0;
}
编译运行(需要链接 pthread):
g++ -std=c++17 -pthread -o demo demo.cpp && ./demo
Lambda 创建线程
#include <iostream>
#include <thread>
#include <vector>
int main() {
const int num_threads = 4;
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back([i]() {
std::cout << "线程 " << i << " (id="
<< std::this_thread::get_id() << ")\n";
});
}
for (auto& t : threads) {
t.join();
}
return 0;
}
线程管理注意事项
#include <iostream>
#include <thread>
class ScopedThread {
std::thread t_;
public:
explicit ScopedThread(std::thread t) : t_(std::move(t)) {
if (!t_.joinable()) {
throw std::logic_error("No thread");
}
}
~ScopedThread() {
if (t_.joinable()) {
t_.join();
}
}
ScopedThread(const ScopedThread&) = delete;
ScopedThread& operator=(const ScopedThread&) = delete;
};
int main() {
// ✅ 使用 RAII 管理线程
{
ScopedThread st(std::thread([]() {
std::cout << "RAII 线程\n";
}));
} // st 析构时自动 join
// 硬件并发数
std::cout << "硬件并发线程数: " << std::thread::hardware_concurrency() << "\n";
return 0;
}
⚠️ 注意:如果
std::thread对象析构时仍 joinable(未 join 也未 detach),会调用std::terminate()。
2. std::mutex / lock_guard / unique_lock
基本互斥锁
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
int counter = 0;
std::mutex mtx;
void increment(int n) {
for (int i = 0; i < n; ++i) {
std::lock_guard<std::mutex> lock(mtx);
++counter;
// lock_guard 离开作用域自动解锁
}
}
int main() {
std::vector<std::thread> threads;
const int num_threads = 10;
const int increments = 10000;
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment, increments);
}
for (auto& t : threads) {
t.join();
}
std::cout << "期望值: " << num_threads * increments << "\n";
std::cout << "实际值: " << counter << "\n";
return 0;
}
std::lock_guard vs std::unique_lock
| 特性 | lock_guard | unique_lock |
|---|---|---|
| 构造时加锁 | ✅ | ✅(可配置) |
| 析构时解锁 | ✅ | ✅ |
| 手动 unlock | ❌ | ✅ |
| 延迟加锁 | ❌ | ✅ (defer_lock) |
| 尝试加锁 | ❌ | ✅ (try_to_lock) |
| 可移动 | ❌ | ✅ |
| 性能 | 略快 | 略慢(更灵活) |
#include <iostream>
#include <thread>
#include <mutex>
#include <string>
std::mutex mtx;
// lock_guard — 简单场景
void safe_print(const std::string& msg) {
std::lock_guard<std::mutex> lock(mtx);
std::cout << msg << "\n";
} // 自动解锁
// unique_lock — 灵活场景
void complex_operation() {
std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 延迟加锁
// ... 做一些不需要锁的工作 ...
lock.lock(); // 手动加锁
std::cout << "临界区操作\n";
lock.unlock(); // 提前解锁
// ... 做一些不需要锁的工作 ...
// 再次加锁
lock.lock();
std::cout << "再次进入临界区\n";
} // 自动解锁(如果仍持有锁)
// try_lock — 非阻塞尝试
void try_access() {
std::unique_lock<std::mutex> lock(mtx, std::try_to_lock);
if (lock.owns_lock()) {
std::cout << "获取锁成功\n";
} else {
std::cout << "锁被占用,跳过\n";
}
}
int main() {
std::thread t1(safe_print, "你好");
std::thread t2(safe_print, "世界");
t1.join();
t2.join();
return 0;
}
std::scoped_lock (C++17)
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx1, mtx2;
// C++17 scoped_lock — 同时锁定多个互斥量(避免死锁)
void transfer(int& from, int& to, int amount) {
std::scoped_lock lock(mtx1, mtx2); // 原子性地锁定所有互斥量
from -= amount;
to += amount;
}
int main() {
int account_a = 1000, account_b = 1000;
std::thread t1(transfer, std::ref(account_a), std::ref(account_b), 100);
std::thread t2(transfer, std::ref(account_b), std::ref(account_a), 50);
t1.join();
t2.join();
std::cout << "A=" << account_a << " B=" << account_b << "\n";
return 0;
}
3. 条件变量 std::condition_variable
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> task_queue;
bool done = false;
void producer() {
for (int i = 0; i < 10; ++i) {
{
std::lock_guard<std::mutex> lock(mtx);
task_queue.push(i);
std::cout << "生产: " << i << "\n";
}
cv.notify_one(); // 通知一个等待线程
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
{
std::lock_guard<std::mutex> lock(mtx);
done = true;
}
cv.notify_all(); // 通知所有等待线程
}
void consumer(const std::string& name) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return !task_queue.empty() || done; }); // 等待条件
while (!task_queue.empty()) {
int task = task_queue.front();
task_queue.pop();
lock.unlock();
std::cout << name << " 消费: " << task << "\n";
lock.lock();
}
if (done && task_queue.empty()) break;
}
}
int main() {
std::thread p(producer);
std::thread c1(consumer, "消费者1");
std::thread c2(consumer, "消费者2");
p.join();
c1.join();
c2.join();
std::cout << "全部完成\n";
return 0;
}
| 方法 | 作用 |
|---|---|
cv.wait(lock) | 等待通知,自动解锁/加锁 |
cv.wait(lock, pred) | 等待直到 pred() 为 true |
cv.notify_one() | 唤醒一个等待线程 |
cv.notify_all() | 唤醒所有等待线程 |
cv.wait_for(lock, duration) | 超时等待 |
cv.wait_until(lock, timepoint) | 等待到指定时间点 |
4. 原子操作 std::atomic
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
std::atomic<int> atomic_counter{0};
int non_atomic_counter = 0;
void increment(int n) {
for (int i = 0; i < n; ++i) {
atomic_counter++; // 原子递增
non_atomic_counter++; // 数据竞争!
}
}
int main() {
const int N = 100000;
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i) {
threads.emplace_back(increment, N);
}
for (auto& t : threads) {
t.join();
}
std::cout << "原子计数器: " << atomic_counter << " (期望: " << 10 * N << ")\n";
std::cout << "普通计数器: " << non_atomic_counter << " (期望: " << 10 * N << ")\n";
return 0;
}
atomic 操作
#include <iostream>
#include <atomic>
#include <thread>
struct AtomicFlag {
std::atomic<bool> flag{false};
std::atomic<int> value{0};
void set(int v) {
value.store(v, std::memory_order_relaxed);
flag.store(true, std::memory_order_release); // release 语义
}
int get() {
while (!flag.load(std::memory_order_acquire)) { // acquire 语义
std::this_thread::yield();
}
return value.load(std::memory_order_relaxed);
}
};
int main() {
AtomicFlag af;
std::thread writer([&af]() {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
af.set(42);
std::cout << "写入完成: 42\n";
});
std::thread reader([&af]() {
int val = af.get();
std::cout << "读取到: " << val << "\n";
});
writer.join();
reader.join();
// CAS 操作
std::atomic<int> counter{0};
int expected = 0;
bool success = counter.compare_exchange_strong(expected, 1);
std::cout << "CAS: " << std::boolalpha << success
<< " counter=" << counter << " expected=" << expected << "\n";
return 0;
}
| 内存序 | 说明 |
|---|---|
memory_order_relaxed | 无顺序约束,仅保证原子性 |
memory_order_acquire | 读操作,后续读写不会重排到此之前 |
memory_order_release | 写操作,之前读写不会重排到此之后 |
memory_order_acq_rel | 同时具有 acquire 和 release |
memory_order_seq_cst | 默认,全局顺序一致性(最强) |
5. std::future / std::promise / std::async
std::async
#include <iostream>
#include <future>
#include <numeric>
#include <vector>
long long sum_range(int start, int end) {
long long sum = 0;
for (int i = start; i < end; ++i) {
sum += i;
}
return sum;
}
int main() {
// 异步执行
auto future = std::async(std::launch::async, sum_range, 0, 1000000);
// 主线程继续做其他工作
std::cout << "主线程继续执行...\n";
// 获取结果(会阻塞直到完成)
long long result = future.get();
std::cout << "异步结果: " << result << "\n";
// 并行计算
auto f1 = std::async(std::launch::async, sum_range, 0, 250000);
auto f2 = std::async(std::launch::async, sum_range, 250000, 500000);
auto f3 = std::async(std::launch::async, sum_range, 500000, 750000);
auto f4 = std::async(std::launch::async, sum_range, 750000, 1000000);
long long total = f1.get() + f2.get() + f3.get() + f4.get();
std::cout << "并行结果: " << total << "\n";
return 0;
}
std::promise / std::future
#include <iostream>
#include <thread>
#include <future>
#include <string>
void compute(std::promise<int> prom) {
try {
int result = 42;
// 模拟复杂计算
std::this_thread::sleep_for(std::chrono::milliseconds(100));
prom.set_value(result); // 设置结果
} catch (...) {
prom.set_exception(std::current_exception()); // 传递异常
}
}
void failing_task(std::promise<std::string> prom) {
try {
throw std::runtime_error("计算失败");
} catch (...) {
prom.set_exception(std::current_exception());
}
}
int main() {
// promise → future 管道
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t(compute, std::move(prom));
std::cout << "等待异步结果...\n";
int result = fut.get(); // 阻塞等待
std::cout << "结果: " << result << "\n";
t.join();
// 异常传播
std::promise<std::string> prom2;
std::future<std::string> fut2 = prom2.get_future();
std::thread t2(failing_task, std::move(prom2));
try {
fut2.get();
} catch (const std::exception& e) {
std::cout << "捕获异步异常: " << e.what() << "\n";
}
t2.join();
return 0;
}
| 工具 | 方向 | 用途 |
|---|---|---|
std::async | 调用者 → 任务 → 结果 | 最简单的异步方式 |
std::promise | 生产者设置值 | 单次值传递 |
std::future | 消费者获取值 | 获取异步结果 |
std::packaged_task | 包装可调用对象 | 任务队列 |
6. 线程池实现
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <vector>
#include <future>
class ThreadPool {
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex mtx_;
std::condition_variable cv_;
bool stop_ = false;
public:
explicit ThreadPool(size_t numThreads) {
for (size_t i = 0; i < numThreads; ++i) {
workers_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
}
~ThreadPool() {
{
std::lock_guard<std::mutex> lock(mtx_);
stop_ = true;
}
cv_.notify_all();
for (auto& w : workers_) {
w.join();
}
}
// 提交任务并返回 future
template <typename F, typename... Args>
auto submit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using ReturnType = decltype(f(args...));
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<ReturnType> result = task->get_future();
{
std::lock_guard<std::mutex> lock(mtx_);
if (stop_) throw std::runtime_error("ThreadPool 已停止");
tasks_.emplace([task]() { (*task)(); });
}
cv_.notify_one();
return result;
}
};
int main() {
ThreadPool pool(4);
std::vector<std::future<int>> results;
for (int i = 0; i < 8; ++i) {
results.emplace_back(pool.submit([i]() -> int {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return i * i;
}));
}
for (auto& f : results) {
std::cout << f.get() << " ";
}
std::cout << "\n";
return 0;
}
7. 数据竞争检测(ThreadSanitizer)
# 编译时启用 ThreadSanitizer
g++ -std=c++17 -fsanitize=thread -g -o demo demo.cpp && ./demo
#include <iostream>
#include <thread>
// ❌ 数据竞争示例(TSan 可以检测)
int shared_data = 0;
void data_race() {
// 两个线程同时写同一变量,没有同步
std::thread t1([]() {
for (int i = 0; i < 100000; ++i) shared_data++;
});
std::thread t2([]() {
for (int i = 0; i < 100000; ++i) shared_data++;
});
t1.join();
t2.join();
// 结果不确定,TSan 会报告 data race
std::cout << "data_race 结果: " << shared_data << " (可能不正确)\n";
}
// ✅ 修复:使用 mutex
#include <mutex>
int safe_data = 0;
std::mutex mtx;
void no_data_race() {
safe_data = 0;
std::thread t1([]() {
for (int i = 0; i < 100000; ++i) {
std::lock_guard<std::mutex> lock(mtx);
safe_data++;
}
});
std::thread t2([]() {
for (int i = 0; i < 100000; ++i) {
std::lock_guard<std::mutex> lock(mtx);
safe_data++;
}
});
t1.join();
t2.join();
std::cout << "no_data_race 结果: " << safe_data << "\n";
}
int main() {
data_race();
no_data_race();
return 0;
}
💡 提示:ThreadSanitizer 会显著降低程序速度(2-10 倍),仅在调试时使用。
8. C++20 std::jthread
#include <iostream>
#include <thread>
#include <chrono>
int main() {
// jthread 自动 join(析构时自动请求停止并 join)
{
std::jthread worker([](std::stop_token token) {
int count = 0;
while (!token.stop_requested()) {
std::cout << "工作中 " << count++ << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
std::cout << "收到停止请求\n";
});
std::this_thread::sleep_for(std::chrono::milliseconds(200));
worker.request_stop(); // 请求停止
} // worker 析构时自动 join
std::cout << "jthread 已结束\n";
// stop_source / stop_token
std::stop_source source;
std::stop_token token = source.get_token();
std::jthread t([&token]() {
while (!token.stop_requested()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
source.request_stop();
t.join();
return 0;
}
💡 提示:
std::jthread是std::thread的改进版,避免了忘记 join/detach 导致的崩溃。
9. 并发设计模式
生产者-消费者
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <string>
template <typename T>
class BlockingQueue {
std::queue<T> queue_;
std::mutex mtx_;
std::condition_variable not_empty_;
std::condition_variable not_full_;
size_t max_size_;
bool stopped_ = false;
public:
explicit BlockingQueue(size_t maxSize) : max_size_(maxSize) {}
void push(T item) {
std::unique_lock<std::mutex> lock(mtx_);
not_full_.wait(lock, [this] { return queue_.size() < max_size_ || stopped_; });
if (stopped_) return;
queue_.push(std::move(item));
not_empty_.notify_one();
}
bool pop(T& item) {
std::unique_lock<std::mutex> lock(mtx_);
not_empty_.wait(lock, [this] { return !queue_.empty() || stopped_; });
if (stopped_ && queue_.empty()) return false;
item = std::move(queue_.front());
queue_.pop();
not_full_.notify_one();
return true;
}
void stop() {
std::lock_guard<std::mutex> lock(mtx_);
stopped_ = true;
not_empty_.notify_all();
not_full_.notify_all();
}
};
int main() {
BlockingQueue<int> queue(5);
// 生产者
std::thread producer([&queue]() {
for (int i = 0; i < 20; ++i) {
queue.push(i);
std::cout << "生产: " << i << "\n";
}
queue.stop();
});
// 消费者
std::thread consumer([&queue]() {
int item;
while (queue.pop(item)) {
std::cout << "消费: " << item << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
});
producer.join();
consumer.join();
std::cout << "生产者-消费者模式完成\n";
return 0;
}
扩展阅读
- cppreference — Thread support library
- C++ Core Guidelines — Concurrency
- C++ Concurrency in Action (Anthony Williams)
- ThreadSanitizer 文档
- C++20 jthread proposal