强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

Java 完全指南 / 18 - 并发:Thread、ExecutorService、CompletableFuture、锁

18 - 并发:Thread、ExecutorService、CompletableFuture、锁

创建线程

public class ThreadDemo {
    // 方式1:继承 Thread
    static class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("Thread 子类: " + Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 方式2:实现 Runnable(推荐)
        Thread t1 = new Thread(() -> {
            System.out.println("Runnable: " + Thread.currentThread().getName());
        }, "worker-1");

        // 方式3:实现 Callable(有返回值)
        var executor = java.util.concurrent.Executors.newFixedThreadPool(2);
        var future = executor.submit(() -> {
            Thread.sleep(1000);
            return 42;
        });
        System.out.println("Callable 结果: " + future.get());  // 阻塞等待

        new MyThread().start();
        t1.start();
        t1.join();  // 等待线程结束

        executor.shutdown();
    }
}

线程状态

NEW → RUNNABLE → BLOCKED / WAITING / TIMED_WAITING → TERMINATED
方法说明
start()启动线程
join()等待线程结束
sleep(ms)当前线程休眠
interrupt()中断线程
yield()让出 CPU 时间片
setDaemon(true)设为守护线程

线程同步

public class SyncDemo {
    private int count = 0;
    private final Object lock = new Object();

    // synchronized 方法
    public synchronized void increment() {
        count++;
    }

    // synchronized 块
    public void decrement() {
        synchronized (lock) {
            count--;
        }
    }

    // ReentrantLock
    private final java.util.concurrent.locks.ReentrantLock reLock =
        new java.util.concurrent.locks.ReentrantLock();

    public void safeIncrement() {
        reLock.lock();
        try {
            count++;
        } finally {
            reLock.unlock();
        }
    }

    // volatile —— 保证可见性(不保证原子性)
    private volatile boolean running = true;

    public void stop() { running = false; }

    public static void main(String[] args) throws InterruptedException {
        SyncDemo demo = new SyncDemo();
        var pool = java.util.concurrent.Executors.newFixedThreadPool(10);

        for (int i = 0; i < 1000; i++) {
            pool.submit(demo::increment);
        }
        pool.shutdown();
        pool.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
        System.out.println("Count: " + demo.count);  // 1000
    }
}

原子类

import java.util.concurrent.atomic.*;

public class AtomicDemo {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final AtomicReference<String> ref = new AtomicReference<>("initial");

    public void incrementAndGet() {
        int newVal = counter.incrementAndGet();  // CAS 操作
    }

    public int getAndUpdate() {
        return counter.getAndUpdate(n -> n + 5);  // 函数式更新
    }

    // LongAdder —— 高并发计数(比 AtomicLong 更快)
    private final java.util.concurrent.atomic.LongAdder adder = new java.util.concurrent.atomic.LongAdder();

    public void addCount() {
        adder.increment();
        adder.add(10);
    }

    public long getCount() { return adder.sum(); }
}

ExecutorService 线程池

import java.util.concurrent.*;

public class ExecutorDemo {
    public static void main(String[] args) throws Exception {
        // 固定大小线程池
        ExecutorService fixed = Executors.newFixedThreadPool(4);

        // 缓存线程池
        ExecutorService cached = Executors.newCachedThreadPool();

        // 单线程池
        ExecutorService single = Executors.newSingleThreadExecutor();

        // 定时线程池
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);

        // 推荐:自定义 ThreadPoolExecutor
        ThreadPoolExecutor custom = new ThreadPoolExecutor(
            4,                         // 核心线程数
            8,                         // 最大线程数
            60, TimeUnit.SECONDS,      // 空闲线程存活时间
            new LinkedBlockingQueue<>(100),  // 任务队列
            new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
        );

        // 提交任务
        Future<String> future = fixed.submit(() -> {
            Thread.sleep(1000);
            return "完成";
        });
        System.out.println(future.get(5, TimeUnit.SECONDS));

        // 定时任务
        scheduled.scheduleAtFixedRate(() -> {
            System.out.println("定时任务: " + System.currentTimeMillis());
        }, 0, 2, TimeUnit.SECONDS);

        // invokeAll —— 批量执行
        var tasks = List.of(
            () -> { Thread.sleep(500); return "任务1"; },
            () -> { Thread.sleep(300); return "任务2"; },
            () -> { Thread.sleep(400); return "任务3"; }
        );
        var results = fixed.invokeAll(tasks);
        for (var r : results) {
            System.out.println(r.get());
        }

        // 优雅关闭
        fixed.shutdown();
        if (!fixed.awaitTermination(10, TimeUnit.SECONDS)) {
            fixed.shutdownNow();
        }
    }
}

拒绝策略

策略行为
AbortPolicy(默认)抛 RejectedExecutionException
CallerRunsPolicy提交者线程执行任务
DiscardPolicy静默丢弃
DiscardOldestPolicy丢弃队列中最早的任务

CompletableFuture

import java.util.concurrent.CompletableFuture;

public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        // 基本创建
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            sleep(500);
            return "Hello";
        });

        // 链式操作
        String result = CompletableFuture.supplyAsync(() -> "Hello")
            .thenApply(s -> s + " World")         // 转换
            .thenApply(String::toUpperCase)       // 再转换
            .thenApply(s -> s + "!")              // 再转换
            .join();                               // 阻塞获取
        System.out.println(result);  // HELLO WORLD!

        // 消费结果(无返回值)
        CompletableFuture.runAsync(() -> System.out.println("异步执行"));

        // 组合多个异步任务
        var userFuture = CompletableFuture.supplyAsync(() -> "用户数据");
        var orderFuture = CompletableFuture.supplyAsync(() -> "订单数据");

        // 等待所有完成
        CompletableFuture.allOf(userFuture, orderFuture).join();
        System.out.println(userFuture.join() + " + " + orderFuture.join());

        // 任意一个完成
        CompletableFuture.anyOf(
            CompletableFuture.supplyAsync(() -> { sleep(1000); return "慢"; }),
            CompletableFuture.supplyAsync(() -> { sleep(100); return "快"; })
        ).thenAccept(r -> System.out.println("最快: " + r));

        // 异常处理
        String fallback = CompletableFuture.supplyAsync(() -> {
            if (true) throw new RuntimeException("失败");
            return "ok";
        }).exceptionally(ex -> "降级数据")   // 异常时返回默认值
          .join();
        System.out.println(fallback);  // 降级数据

        // handle —— 无论成功失败都处理
        CompletableFuture.supplyAsync(() -> "data")
            .handle((data, ex) -> {
                if (ex != null) return "错误: " + ex.getMessage();
                return "成功: " + data;
            })
            .thenAccept(System.out::println);
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}

CompletableFuture 方法速查

方法说明返回
supplyAsync(fn)异步执行有返回值CompletableFuture<T>
runAsync(rn)异步执行无返回值CompletableFuture<Void>
thenApply(fn)转换结果CompletableFuture<R>
thenAccept(consumer)消费结果CompletableFuture<Void>
thenCompose(fn)扁平化组合CompletableFuture<R>
thenCombine(other, fn)两个结果合并CompletableFuture<R>
allOf(cfs...)等待全部完成CompletableFuture<Void>
anyOf(cfs...)任意一个完成CompletableFuture<Object>
exceptionally(fn)异常处理CompletableFuture<T>
handle(fn)统一处理CompletableFuture<R>

并发工具类

import java.util.concurrent.*;

public class ConcurrentUtils {
    // CountDownLatch —— 倒计数门闩
    static void countDownLatchDemo() throws InterruptedException {
        int taskCount = 3;
        CountDownLatch latch = new CountDownLatch(taskCount);

        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    Thread.sleep((long)(Math.random() * 1000));
                    System.out.println("任务 " + taskId + " 完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            }).start();
        }

        latch.await();  // 等待所有任务完成
        System.out.println("所有任务已完成");
    }

    // Semaphore —— 信号量(限流)
    static void semaphoreDemo() {
        Semaphore sem = new Semaphore(3);  // 最多3个并发

        for (int i = 0; i < 10; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    sem.acquire();  // 获取许可
                    System.out.println(id + " 进入(并发: " + (3 - sem.availablePermits()) + ")");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    sem.release();
                }
            }).start();
        }
    }

    // ConcurrentHashMap
    static void concurrentMapDemo() {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("count", 0);

        // 原子操作
        map.compute("count", (k, v) -> v == null ? 1 : v + 1);
        map.merge("count", 1, Integer::sum);
        map.putIfAbsent("new", 1);

        // 批量操作(JDK 8+)
        map.forEach(1, (k, v) -> System.out.println(k + " = " + v));
        long count = map.reduceValues(1, v -> v > 0 ? 1L : 0L, Long::sum);
    }
}

⚠️ 注意事项

  1. 不要使用 Executors.newFixedThreadPool 无界队列 — 可能导致 OOM。
  2. CompletableFuture 的异常不会被吞掉 — 调用 join() 会抛 CompletionException
  3. 优先使用 CompletableFuture 而非 Future — 支持组合、异常处理。
  4. volatile 只保证可见性count++ 不是原子操作,需要 synchronized 或 AtomicInteger。

💡 技巧

  1. 虚拟线程(JDK 21+)

    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
        IntStream.range(0, 10_000).forEach(i ->
            executor.submit(() -> { Thread.sleep(1000); return i; }));
    }
    
  2. 超时控制

    CompletableFuture.supplyAsync(() -> slowTask())
        .orTimeout(5, TimeUnit.SECONDS)        // JDK 9+
        .exceptionally(ex -> "超时降级");
    
  3. 并行流自定义线程池

    ForkJoinPool pool = new ForkJoinPool(4);
    pool.submit(() -> list.parallelStream().map(...).collect(...)).get();
    

🏢 业务场景

  • 异步 API 调用: CompletableFuture 并行调用多个微服务。
  • 限流: Semaphore 限制并发请求数。
  • 批量处理: ExecutorService 分批处理数据。
  • 定时任务: ScheduledExecutorService 替代 Timer。

📖 扩展阅读