强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧。
文档目录

C/C++ Linux 开发教程(GCC + CMake) / 多线程编程(std::thread/Mutex/Atomic)

多线程编程(std::thread/Mutex/Atomic)

1. std::thread 创建与管理

#include <iostream>
#include <thread>
#include <string>

// Prints one progress line per step, `count` lines in total, each prefixed
// with the worker's `name`. Prints nothing when `count` is zero or negative.
void worker(const std::string& name, int count) {
    int step = 0;
    while (step < count) {
        std::cout << name << " 工作中 " << step << "\n";
        ++step;
    }
}

// Entry point: runs two workers concurrently and waits for both of them.
int main() {
    // Both threads start running as soon as they are constructed.
    std::thread pair[] = {
        std::thread(worker, "线程A", 3),
        std::thread(worker, "线程B", 3),
    };

    // Every std::thread must be joined (or detached) before it is destroyed.
    for (std::thread& t : pair) {
        t.join();
    }

    std::cout << "所有线程完成\n";
    return 0;
}

编译运行(需要链接 pthread):

g++ -std=c++17 -pthread -o demo demo.cpp && ./demo

Lambda 创建线程

#include <iostream>
#include <thread>
#include <vector>

// Entry point: starts four lambda-based threads, each printing its index and
// thread id, then joins them all.
int main() {
    constexpr int num_threads = 4;
    std::vector<std::thread> threads;

    for (int idx = 0; idx != num_threads; ++idx) {
        // The index is captured by value so each thread keeps its own copy.
        auto report = [idx]() {
            std::cout << "线程 " << idx << " (id="
                      << std::this_thread::get_id() << ")\n";
        };
        threads.emplace_back(report);
    }

    for (std::size_t k = 0; k < threads.size(); ++k) {
        threads[k].join();
    }

    return 0;
}

线程管理注意事项

#include <iostream>
#include <thread>

// Owns a std::thread and joins it on destruction, so the thread is never
// destroyed while still joinable. Copying is disabled.
class ScopedThread {
public:
    // Takes ownership of `t`. Throws std::logic_error when `t` is not joinable.
    explicit ScopedThread(std::thread t) : thread_{std::move(t)} {
        if (thread_.joinable()) {
            return;
        }
        throw std::logic_error("No thread");
    }

    ScopedThread(const ScopedThread&) = delete;
    ScopedThread& operator=(const ScopedThread&) = delete;

    // Blocks until the owned thread has finished.
    ~ScopedThread() {
        if (!thread_.joinable()) {
            return;
        }
        thread_.join();
    }

private:
    std::thread thread_;
};

// Entry point: shows scope-bound thread ownership, then prints the hardware
// concurrency hint.
int main() {
    // The guard's scope bounds the thread's lifetime: leaving the braces joins it.
    {
        auto hello = []() { std::cout << "RAII 线程\n"; };
        ScopedThread st{std::thread(hello)};
    }

    // Number of concurrent threads reported by the implementation.
    const unsigned hw_threads = std::thread::hardware_concurrency();
    std::cout << "硬件并发线程数: " << hw_threads << "\n";

    return 0;
}

⚠️ 注意:如果 std::thread 对象析构时仍 joinable(未 join 也未 detach),会调用 std::terminate()


2. std::mutex / lock_guard / unique_lock

基本互斥锁

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

// Shared counter and the mutex that guards every access to it.
int counter = 0;
std::mutex mtx;

// Adds 1 to `counter` n times, taking the lock separately for each increment.
void increment(int n) {
    for (int remaining = n; remaining > 0; --remaining) {
        const std::lock_guard<std::mutex> guard(mtx);  // released at the end of each iteration
        counter += 1;
    }
}

// Entry point: ten threads each perform 10000 locked increments; prints the
// expected and the observed totals.
int main() {
    constexpr int num_threads = 10;
    constexpr int increments = 10000;

    std::vector<std::thread> threads;
    for (int started = 0; started < num_threads; ++started) {
        threads.emplace_back(increment, increments);
    }

    for (std::thread& th : threads) {
        th.join();
    }

    const int expected = num_threads * increments;
    std::cout << "期望值: " << expected << "\n";
    std::cout << "实际值: " << counter << "\n";

    return 0;
}

std::lock_guard vs std::unique_lock

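| 特性 | lock_guard | unique_lock |
| --- | --- | --- |
| 构造时加锁 | ✅ | ✅(可配置) |
| 析构时解锁 | ✅ | ✅ |
| 手动 unlock | ❌ | ✅ |
| 延迟加锁 | ❌ | ✅ (defer_lock) |
| 尝试加锁 | ❌ | ✅ (try_to_lock) |
| 可移动 | ❌ | ✅ |
| 性能 | 略快 | 略慢(更灵活) |
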
#include <iostream>
#include <thread>
#include <mutex>
#include <string>

// Serializes access to std::cout for the functions in this example.
std::mutex mtx;

// Simple case: a lock_guard holds the mutex for the whole function body and
// releases it on return.
void safe_print(const std::string& msg) {
    const std::lock_guard<std::mutex> guard{mtx};
    std::cout << msg << '\n';
}

// unique_lock — flexible locking.
// Demonstrates deferred locking followed by manual lock()/unlock() calls on a
// single std::unique_lock; the critical sections only print a message.
void complex_operation() {
    std::unique_lock<std::mutex> lock(mtx, std::defer_lock);  // deferred: the mutex is not locked yet

    // ... work that does not need the lock ...

    lock.lock();  // acquire manually
    std::cout << "临界区操作\n";

    lock.unlock();  // release early
    // ... work that does not need the lock ...

    // Acquire the lock again
    lock.lock();
    std::cout << "再次进入临界区\n";
}  // the unique_lock destructor unlocks if the lock is still held

// try_lock — 非阻塞尝试
void try_access() {
    std::unique_lock<std::mutex> lock(mtx, std::try_to_lock);
    if (lock.owns_lock()) {
        std::cout << "获取锁成功\n";
    } else {
        std::cout << "锁被占用,跳过\n";
    }
}

// Entry point: two threads print one message each through safe_print.
int main() {
    std::thread printers[] = {
        std::thread(safe_print, "你好"),
        std::thread(safe_print, "世界"),
    };
    for (std::thread& p : printers) {
        p.join();
    }

    return 0;
}

std::scoped_lock (C++17)

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx1;
std::mutex mtx2;

// Moves `amount` from `from` to `to` while holding both mutexes. C++17
// std::scoped_lock acquires all of its mutexes with a deadlock-avoidance
// algorithm, so callers need not agree on a lock order.
void transfer(int& from, int& to, int amount) {
    const std::scoped_lock<std::mutex, std::mutex> both{mtx1, mtx2};
    to += amount;
    from -= amount;
}

// Entry point: two threads transfer money in opposite directions at the same
// time, then the final balances are printed.
int main() {
    int account_a = 1000;
    int account_b = 1000;

    // Each lambda refers to the two accounts by reference.
    std::thread t1([&account_a, &account_b] { transfer(account_a, account_b, 100); });
    std::thread t2([&account_a, &account_b] { transfer(account_b, account_a, 50); });

    t1.join();
    t2.join();

    std::cout << "A=" << account_a << " B=" << account_b << "\n";
    return 0;
}

3. 条件变量 std::condition_variable

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

// State shared by the producer and the consumers. The code below only touches
// task_queue and done while holding mtx; cv signals changes to either of them.
std::mutex mtx;
std::condition_variable cv;
std::queue<int> task_queue;
bool done = false;

// Pushes the values 0..9 into task_queue, sleeping 100 ms after each one,
// then sets `done` and wakes all waiting threads.
void producer() {
    for (int i = 0; i < 10; ++i) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            task_queue.push(i);
            std::cout << "生产: " << i << "\n";
        }
        cv.notify_one();  // wake one waiting thread (the lock is already released here)
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    {
        std::lock_guard<std::mutex> lock(mtx);
        done = true;
    }
    cv.notify_all();  // wake every waiting thread so each one can observe `done`
}

// Consumer loop: waits until the queue has work or `done` is set, drains the
// queue, and exits once `done` is set and the queue is empty.
// `name` is printed as a prefix on every consumed item.
void consumer(const std::string& name) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [] { return !task_queue.empty() || done; });  // wait for the condition; the predicate form re-checks it after every wake-up

        while (!task_queue.empty()) {
            int task = task_queue.front();
            task_queue.pop();
            // Release the mutex while printing so it is not held during output,
            // then re-acquire it before the queue is inspected again.
            lock.unlock();
            std::cout << name << " 消费: " << task << "\n";
            lock.lock();
        }

        if (done && task_queue.empty()) break;
    }
}

int main() {
    std::thread p(producer);
    std::thread c1(consumer, "消费者1");
    std::thread c2(consumer, "消费者2");

    p.join();
    c1.join();
    c2.join();

    std::cout << "全部完成\n";
    return 0;
}
| 方法 | 作用 |
| --- | --- |
| cv.wait(lock) | 等待通知,自动解锁/加锁 |
| cv.wait(lock, pred) | 等待直到 pred() 为 true |
| cv.notify_one() | 唤醒一个等待线程 |
| cv.notify_all() | 唤醒所有等待线程 |
| cv.wait_for(lock, duration) | 超时等待 |
| cv.wait_until(lock, timepoint) | 等待到指定时间点 |

4. 原子操作 std::atomic

#include <iostream>
#include <thread>
#include <atomic>
#include <vector>

// Two counters updated side by side to contrast atomic and plain increments.
std::atomic<int> atomic_counter{0};
int non_atomic_counter = 0;

// Bumps both counters n times. The plain int is deliberately left
// unsynchronized to show what goes wrong under concurrent calls.
void increment(int n) {
    for (int pass = 0; pass < n; ++pass) {
        atomic_counter.fetch_add(1);  // atomic read-modify-write
        ++non_atomic_counter;         // data race when called from several threads!
    }
}

// Entry point: ten threads increment both counters N times each, then both
// totals are printed next to the expected value.
int main() {
    const int N = 100000;
    constexpr int kThreads = 10;

    std::vector<std::thread> threads;
    for (int started = 0; started < kThreads; ++started) {
        threads.emplace_back(increment, N);
    }

    for (std::thread& th : threads) {
        th.join();
    }

    const int expected = 10 * N;
    std::cout << "原子计数器: " << atomic_counter << " (期望: " << expected << ")\n";
    std::cout << "普通计数器: " << non_atomic_counter << " (期望: " << expected << ")\n";

    return 0;
}

atomic 操作

#include <iostream>
#include <atomic>
#include <thread>

// Publishes one int from a writer to a reader using a release/acquire pair
// on `flag`; `value` itself is only accessed with relaxed operations.
struct AtomicFlag {
    std::atomic<bool> flag{false};
    std::atomic<int> value{0};

    // Stores `v`, then raises the flag. The release store orders the value
    // write before the flag becomes visible.
    void set(int v) {
        value.store(v, std::memory_order_relaxed);
        flag.store(true, std::memory_order_release);
    }

    // Spins, yielding between polls, until the flag is raised; then returns
    // the stored value. The acquire load pairs with the release store in set().
    int get() {
        for (;;) {
            if (flag.load(std::memory_order_acquire)) {
                break;
            }
            std::this_thread::yield();
        }
        return value.load(std::memory_order_relaxed);
    }
};

// Entry point: a writer publishes 42 after a short delay while a reader spins
// for it; afterwards a single compare-and-swap is demonstrated.
int main() {
    AtomicFlag af;

    std::thread writer([&af] {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        af.set(42);
        std::cout << "写入完成: 42\n";
    });

    std::thread reader([&af] {
        // Fetch the value first; printing only starts once it is available.
        const int observed = af.get();
        std::cout << "读取到: " << observed << "\n";
    });

    writer.join();
    reader.join();

    // CAS: swaps in 1 only if the counter still holds `expected`.
    std::atomic<int> counter{0};
    int expected = 0;
    const bool swapped = counter.compare_exchange_strong(expected, 1);
    std::cout << "CAS: " << std::boolalpha << swapped
              << " counter=" << counter << " expected=" << expected << "\n";

    return 0;
}
| 内存序 | 说明 |
| --- | --- |
| memory_order_relaxed | 无顺序约束,仅保证原子性 |
| memory_order_acquire | 读操作,后续读写不会重排到此之前 |
| memory_order_release | 写操作,之前读写不会重排到此之后 |
| memory_order_acq_rel | 同时具有 acquire 和 release |
| memory_order_seq_cst | 默认,全局顺序一致性(最强) |

5. std::future / std::promise / std::async

std::async

#include <iostream>
#include <future>
#include <numeric>
#include <vector>

// Returns the sum of all integers in the half-open range [start, end).
// An empty or reversed range yields 0.
long long sum_range(int start, int end) {
    long long total = 0;
    int value = start;
    while (value < end) {
        total += value;
        ++value;
    }
    return total;
}

// Entry point: computes the same sum once as a single async task and once
// split into four parallel quarters.
int main() {
    // Run the whole range on another thread.
    auto whole = std::async(std::launch::async, sum_range, 0, 1000000);

    // The main thread is free to do other work in the meantime.
    std::cout << "主线程继续执行...\n";

    // get() blocks until the task has finished.
    const long long result = whole.get();
    std::cout << "异步结果: " << result << "\n";

    // Parallel version: four quarter ranges, launched in order.
    std::future<long long> parts[] = {
        std::async(std::launch::async, sum_range, 0, 250000),
        std::async(std::launch::async, sum_range, 250000, 500000),
        std::async(std::launch::async, sum_range, 500000, 750000),
        std::async(std::launch::async, sum_range, 750000, 1000000),
    };

    long long total = 0;
    for (std::future<long long>& part : parts) {
        total += part.get();
    }
    std::cout << "并行结果: " << total << "\n";

    return 0;
}

std::promise / std::future

#include <iostream>
#include <thread>
#include <future>
#include <string>

// Fulfils `prom` with 42 after a simulated 100 ms computation. Any exception
// thrown along the way is forwarded through the promise instead.
void compute(std::promise<int> prom) {
    const std::chrono::milliseconds work_time{100};
    try {
        const int answer = 42;
        std::this_thread::sleep_for(work_time);  // stand-in for an expensive computation
        prom.set_value(answer);
    } catch (...) {
        prom.set_exception(std::current_exception());
    }
}

// Always fails: raises a std::runtime_error and hands it to the promise, so
// the matching future rethrows it from get().
void failing_task(std::promise<std::string> prom) {
    try {
        const std::runtime_error failure{"计算失败"};
        throw failure;
    } catch (...) {
        std::exception_ptr pending = std::current_exception();
        prom.set_exception(pending);
    }
}

// Entry point: first passes a value through a promise/future pair, then shows
// an exception crossing threads the same way.
int main() {
    // Part 1: value hand-off from the worker thread to main.
    {
        std::promise<int> prom;
        std::future<int> fut = prom.get_future();

        std::thread t(compute, std::move(prom));
        std::cout << "等待异步结果...\n";

        const int result = fut.get();  // blocks until the worker sets the value
        std::cout << "结果: " << result << "\n";
        t.join();
    }

    // Part 2: the exception stored by the worker is rethrown by get().
    {
        std::promise<std::string> prom2;
        std::future<std::string> fut2 = prom2.get_future();
        std::thread t2(failing_task, std::move(prom2));

        try {
            fut2.get();
        } catch (const std::exception& e) {
            std::cout << "捕获异步异常: " << e.what() << "\n";
        }
        t2.join();
    }

    return 0;
}
| 工具 | 方向 | 用途 |
| --- | --- | --- |
| std::async | 调用者 → 任务 → 结果 | 最简单的异步方式 |
| std::promise | 生产者设置值 | 单次值传递 |
| std::future | 消费者获取值 | 获取异步结果 |
| std::packaged_task | 包装可调用对象 | 任务队列 |

6. 线程池实现

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <vector>

// Fixed-size thread pool. Worker threads pull type-erased tasks from a FIFO
// queue guarded by mtx_ and are woken through cv_. The destructor lets the
// workers drain every queued task before joining them.
class ThreadPool {
    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mtx_;
    std::condition_variable cv_;
    bool stop_ = false;  // guarded by mtx_

    // Body of every worker thread: run tasks until a stop has been requested
    // and the queue is empty.
    void worker_loop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mtx_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });

                if (stop_ && tasks_.empty()) return;

                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run outside the lock so other workers can dequeue
        }
    }

    // Raises the stop flag, wakes every worker and joins all started threads.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_) {
            if (w.joinable()) w.join();
        }
    }

public:
    // Starts `numThreads` workers. A request for 0 threads would leave every
    // submitted task waiting forever, so it falls back to the hardware
    // concurrency hint (or 1 when that hint is unavailable).
    explicit ThreadPool(size_t numThreads) {
        if (numThreads == 0) {
            const unsigned hw = std::thread::hardware_concurrency();
            numThreads = hw != 0 ? hw : 1;
        }
        workers_.reserve(numThreads);
        try {
            for (size_t i = 0; i < numThreads; ++i) {
                workers_.emplace_back([this] { worker_loop(); });
            }
        } catch (...) {
            // The destructor does not run when the constructor throws; join the
            // workers that did start so no joinable std::thread is destroyed.
            shutdown();
            throw;
        }
    }

    ~ThreadPool() { shutdown(); }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // Queues `f(args...)` for execution and returns a future for its result.
    // Accepts anything std::bind can call, including member-function pointers.
    // An exception thrown by the task is delivered through the future.
    // Throws std::runtime_error if the pool has already been stopped.
    template <typename F, typename... Args>
    auto submit(F&& f, Args&&... args)
        -> std::future<std::invoke_result_t<F&, Args&...>> {
        using ReturnType = std::invoke_result_t<F&, Args&...>;

        // std::function requires a copyable callable, but packaged_task is
        // move-only, so it is held through a shared_ptr.
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<ReturnType> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock(mtx_);
            if (stop_) throw std::runtime_error("ThreadPool 已停止");
            tasks_.emplace([task]() { (*task)(); });
        }
        cv_.notify_one();
        return result;
    }
};

// Entry point: submits eight squaring tasks to a four-thread pool and prints
// the results in submission order.
int main() {
    ThreadPool pool(4);

    std::vector<std::future<int>> results;

    for (int i = 0; i < 8; ++i) {
        auto square_later = [i]() -> int {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            return i * i;
        };
        results.push_back(pool.submit(square_later));
    }

    // get() blocks until the corresponding task has finished.
    for (std::future<int>& pending : results) {
        std::cout << pending.get() << " ";
    }
    std::cout << "\n";

    return 0;
}

7. 数据竞争检测(ThreadSanitizer)

# 编译时启用 ThreadSanitizer
g++ -std=c++17 -fsanitize=thread -g -o demo demo.cpp && ./demo
#include <iostream>
#include <thread>

// ❌ Data race example (detectable by TSan)
int shared_data = 0;

// Deliberately incorrect demo: two threads increment `shared_data` 100000
// times each with no synchronization, then the (unreliable) total is printed.
void data_race() {
    // Both threads write the same variable concurrently, without synchronization
    std::thread t1([]() {
        for (int i = 0; i < 100000; ++i) shared_data++;
    });
    std::thread t2([]() {
        for (int i = 0; i < 100000; ++i) shared_data++;
    });

    t1.join();
    t2.join();
    // The result is indeterminate; TSan reports a data race here
    std::cout << "data_race 结果: " << shared_data << " (可能不正确)\n";
}

// Fixed version: every increment happens while holding a mutex
#include <mutex>
int safe_data = 0;
std::mutex mtx;

// Resets `safe_data`, lets two threads add 100000 each under the mutex, and
// prints the total.
void no_data_race() {
    safe_data = 0;

    // Both threads run the same locked increment loop.
    const auto bump_many = []() {
        for (int round = 0; round < 100000; ++round) {
            const std::lock_guard<std::mutex> guard(mtx);
            ++safe_data;
        }
    };

    std::thread t1(bump_many);
    std::thread t2(bump_many);

    t1.join();
    t2.join();
    std::cout << "no_data_race 结果: " << safe_data << "\n";
}

// Entry point: runs the racy demo first, then the mutex-protected version.
int main() {
    data_race();
    no_data_race();
    return 0;
}

💡 提示:ThreadSanitizer 会显著降低程序速度(2-10 倍),仅在调试时使用。


8. C++20 std::jthread

#include <iostream>
#include <thread>
#include <chrono>

// Entry point (C++20): demonstrates std::jthread's automatic stop-and-join,
// then cooperative cancellation through an external std::stop_source.
int main() {
    // jthread's destructor requests a stop and then joins automatically.
    {
        std::jthread worker([](std::stop_token token) {
            int count = 0;
            while (!token.stop_requested()) {
                std::cout << "工作中 " << count++ << "\n";
                std::this_thread::sleep_for(std::chrono::milliseconds(50));
            }
            std::cout << "收到停止请求\n";
        });

        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        worker.request_stop();  // ask the worker to stop
    }  // worker is joined here by its destructor

    std::cout << "jthread 已结束\n";

    // External stop_source / stop_token pair
    std::stop_source source;
    std::stop_token token = source.get_token();

    // Capture the token by value: a stop_token is a cheap handle to shared
    // stop state, and a copy keeps the thread independent of the lifetime of
    // main's local variable.
    std::jthread t([token]() {
        while (!token.stop_requested()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    });

    source.request_stop();
    t.join();

    return 0;
}

💡 提示:std::jthread 是 std::thread 的改进版,避免了忘记 join/detach 导致的崩溃。


9. 并发设计模式

生产者-消费者

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <string>

template <typename T>
// Bounded blocking FIFO queue for producer/consumer hand-off.
// push() blocks while the queue is full and pop() blocks while it is empty.
// stop() wakes every waiter: later pushes are rejected, while pops keep
// returning the remaining items until the queue is drained.
template <typename T>
class BlockingQueue {
    std::queue<T> queue_;
    std::mutex mtx_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;
    size_t max_size_;
    bool stopped_ = false;  // guarded by mtx_

public:
    // `maxSize` is the capacity. A capacity of 0 could never accept an item
    // (every push would block until stop()), so it is treated as 1.
    explicit BlockingQueue(size_t maxSize) : max_size_(maxSize == 0 ? 1 : maxSize) {}

    // Enqueues `item`, waiting while the queue is full.
    // Returns true if the item was enqueued, false if the queue has been
    // stopped and the item was discarded.
    bool push(T item) {
        {
            std::unique_lock<std::mutex> lock(mtx_);
            not_full_.wait(lock, [this] { return queue_.size() < max_size_ || stopped_; });
            if (stopped_) return false;
            queue_.push(std::move(item));
        }
        // Notify after releasing the mutex so the woken thread does not
        // immediately block on it.
        not_empty_.notify_one();
        return true;
    }

    // Dequeues into `item`, waiting while the queue is empty.
    // Returns false only when the queue is stopped and fully drained; `item`
    // is left untouched in that case.
    bool pop(T& item) {
        {
            std::unique_lock<std::mutex> lock(mtx_);
            not_empty_.wait(lock, [this] { return !queue_.empty() || stopped_; });
            if (stopped_ && queue_.empty()) return false;
            item = std::move(queue_.front());
            queue_.pop();
        }
        not_full_.notify_one();
        return true;
    }

    // Marks the queue as stopped and wakes every blocked producer and consumer.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            stopped_ = true;
        }
        not_empty_.notify_all();
        not_full_.notify_all();
    }
};

// Entry point: a fast producer and a slow consumer exchange 20 integers
// through a queue of capacity 5.
int main() {
    BlockingQueue<int> queue(5);

    // Producer: enqueue 0..19, then close the queue.
    std::thread producer([&queue] {
        int next = 0;
        while (next < 20) {
            queue.push(next);
            std::cout << "生产: " << next << "\n";
            ++next;
        }
        queue.stop();
    });

    // Consumer: keep popping until the queue reports stopped-and-empty.
    std::thread consumer([&queue] {
        for (int item; queue.pop(item);) {
            std::cout << "消费: " << item << "\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    });

    producer.join();
    consumer.join();

    std::cout << "生产者-消费者模式完成\n";
    return 0;
}

扩展阅读