Java 完全指南 / 18 - 并发:Thread、ExecutorService、CompletableFuture、锁
18 - 并发:Thread、ExecutorService、CompletableFuture、锁
创建线程
public class ThreadDemo {
// 方式1:继承 Thread
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread 子类: " + Thread.currentThread().getName());
}
}
public static void main(String[] args) throws InterruptedException {
// 方式2:实现 Runnable(推荐)
Thread t1 = new Thread(() -> {
System.out.println("Runnable: " + Thread.currentThread().getName());
}, "worker-1");
// 方式3:实现 Callable(有返回值)
var executor = java.util.concurrent.Executors.newFixedThreadPool(2);
var future = executor.submit(() -> {
Thread.sleep(1000);
return 42;
});
System.out.println("Callable 结果: " + future.get()); // 阻塞等待
new MyThread().start();
t1.start();
t1.join(); // 等待线程结束
executor.shutdown();
}
}
线程状态
NEW → RUNNABLE → BLOCKED / WAITING / TIMED_WAITING → TERMINATED
| 方法 | 说明 |
|---|---|
start() | 启动线程 |
join() | 等待线程结束 |
sleep(ms) | 当前线程休眠 |
interrupt() | 中断线程 |
yield() | 让出 CPU 时间片 |
setDaemon(true) | 设为守护线程 |
线程同步
public class SyncDemo {
private int count = 0;
private final Object lock = new Object();
// synchronized 方法
public synchronized void increment() {
count++;
}
// synchronized 块
public void decrement() {
synchronized (lock) {
count--;
}
}
// ReentrantLock
private final java.util.concurrent.locks.ReentrantLock reLock =
new java.util.concurrent.locks.ReentrantLock();
public void safeIncrement() {
reLock.lock();
try {
count++;
} finally {
reLock.unlock();
}
}
// volatile —— 保证可见性(不保证原子性)
private volatile boolean running = true;
public void stop() { running = false; }
public static void main(String[] args) throws InterruptedException {
SyncDemo demo = new SyncDemo();
var pool = java.util.concurrent.Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
pool.submit(demo::increment);
}
pool.shutdown();
pool.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS);
System.out.println("Count: " + demo.count); // 1000
}
}
原子类
import java.util.concurrent.atomic.*;
public class AtomicDemo {
private final AtomicInteger counter = new AtomicInteger(0);
private final AtomicReference<String> ref = new AtomicReference<>("initial");
public void incrementAndGet() {
int newVal = counter.incrementAndGet(); // CAS 操作
}
public int getAndUpdate() {
return counter.getAndUpdate(n -> n + 5); // 函数式更新
}
// LongAdder —— 高并发计数(比 AtomicLong 更快)
private final java.util.concurrent.atomic.LongAdder adder = new java.util.concurrent.atomic.LongAdder();
public void addCount() {
adder.increment();
adder.add(10);
}
public long getCount() { return adder.sum(); }
}
ExecutorService 线程池
import java.util.concurrent.*;
public class ExecutorDemo {
public static void main(String[] args) throws Exception {
// 固定大小线程池
ExecutorService fixed = Executors.newFixedThreadPool(4);
// 缓存线程池
ExecutorService cached = Executors.newCachedThreadPool();
// 单线程池
ExecutorService single = Executors.newSingleThreadExecutor();
// 定时线程池
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(2);
// 推荐:自定义 ThreadPoolExecutor
ThreadPoolExecutor custom = new ThreadPoolExecutor(
4, // 核心线程数
8, // 最大线程数
60, TimeUnit.SECONDS, // 空闲线程存活时间
new LinkedBlockingQueue<>(100), // 任务队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 提交任务
Future<String> future = fixed.submit(() -> {
Thread.sleep(1000);
return "完成";
});
System.out.println(future.get(5, TimeUnit.SECONDS));
// 定时任务
scheduled.scheduleAtFixedRate(() -> {
System.out.println("定时任务: " + System.currentTimeMillis());
}, 0, 2, TimeUnit.SECONDS);
// invokeAll —— 批量执行
var tasks = List.of(
() -> { Thread.sleep(500); return "任务1"; },
() -> { Thread.sleep(300); return "任务2"; },
() -> { Thread.sleep(400); return "任务3"; }
);
var results = fixed.invokeAll(tasks);
for (var r : results) {
System.out.println(r.get());
}
// 优雅关闭
fixed.shutdown();
if (!fixed.awaitTermination(10, TimeUnit.SECONDS)) {
fixed.shutdownNow();
}
}
}
拒绝策略
| 策略 | 行为 |
|---|---|
AbortPolicy(默认) | 抛 RejectedExecutionException |
CallerRunsPolicy | 提交者线程执行任务 |
DiscardPolicy | 静默丢弃 |
DiscardOldestPolicy | 丢弃队列中最早的任务 |
CompletableFuture
import java.util.concurrent.CompletableFuture;
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
// 基本创建
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
sleep(500);
return "Hello";
});
// 链式操作
String result = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World") // 转换
.thenApply(String::toUpperCase) // 再转换
.thenApply(s -> s + "!") // 再转换
.join(); // 阻塞获取
System.out.println(result); // HELLO WORLD!
// 消费结果(无返回值)
CompletableFuture.runAsync(() -> System.out.println("异步执行"));
// 组合多个异步任务
var userFuture = CompletableFuture.supplyAsync(() -> "用户数据");
var orderFuture = CompletableFuture.supplyAsync(() -> "订单数据");
// 等待所有完成
CompletableFuture.allOf(userFuture, orderFuture).join();
System.out.println(userFuture.join() + " + " + orderFuture.join());
// 任意一个完成
CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> { sleep(1000); return "慢"; }),
CompletableFuture.supplyAsync(() -> { sleep(100); return "快"; })
).thenAccept(r -> System.out.println("最快: " + r));
// 异常处理
String fallback = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("失败");
return "ok";
}).exceptionally(ex -> "降级数据") // 异常时返回默认值
.join();
System.out.println(fallback); // 降级数据
// handle —— 无论成功失败都处理
CompletableFuture.supplyAsync(() -> "data")
.handle((data, ex) -> {
if (ex != null) return "错误: " + ex.getMessage();
return "成功: " + data;
})
.thenAccept(System.out::println);
}
static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
CompletableFuture 方法速查
| 方法 | 说明 | 返回 |
|---|---|---|
supplyAsync(fn) | 异步执行有返回值 | CompletableFuture<T> |
runAsync(rn) | 异步执行无返回值 | CompletableFuture<Void> |
thenApply(fn) | 转换结果 | CompletableFuture<R> |
thenAccept(consumer) | 消费结果 | CompletableFuture<Void> |
thenCompose(fn) | 扁平化组合 | CompletableFuture<R> |
thenCombine(other, fn) | 两个结果合并 | CompletableFuture<R> |
allOf(cfs...) | 等待全部完成 | CompletableFuture<Void> |
anyOf(cfs...) | 任意一个完成 | CompletableFuture<Object> |
exceptionally(fn) | 异常处理 | CompletableFuture<T> |
handle(fn) | 统一处理 | CompletableFuture<R> |
并发工具类
import java.util.concurrent.*;
public class ConcurrentUtils {
// CountDownLatch —— 倒计数门闩
static void countDownLatchDemo() throws InterruptedException {
int taskCount = 3;
CountDownLatch latch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
Thread.sleep((long)(Math.random() * 1000));
System.out.println("任务 " + taskId + " 完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}).start();
}
latch.await(); // 等待所有任务完成
System.out.println("所有任务已完成");
}
// Semaphore —— 信号量(限流)
static void semaphoreDemo() {
Semaphore sem = new Semaphore(3); // 最多3个并发
for (int i = 0; i < 10; i++) {
final int id = i;
new Thread(() -> {
try {
sem.acquire(); // 获取许可
System.out.println(id + " 进入(并发: " + (3 - sem.availablePermits()) + ")");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
sem.release();
}
}).start();
}
}
// ConcurrentHashMap
static void concurrentMapDemo() {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("count", 0);
// 原子操作
map.compute("count", (k, v) -> v == null ? 1 : v + 1);
map.merge("count", 1, Integer::sum);
map.putIfAbsent("new", 1);
// 批量操作(JDK 8+)
map.forEach(1, (k, v) -> System.out.println(k + " = " + v));
long count = map.reduceValues(1, v -> v > 0 ? 1L : 0L, Long::sum);
}
}
⚠️ 注意事项
- 不要使用
Executors.newFixedThreadPool无界队列 — 可能导致 OOM。 - CompletableFuture 的异常不会被吞掉 — 调用
join()会抛CompletionException。 - 优先使用
CompletableFuture而非Future— 支持组合、异常处理。 - volatile 只保证可见性 —
count++不是原子操作,需要 synchronized 或 AtomicInteger。
💡 技巧
虚拟线程(JDK 21+):
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { IntStream.range(0, 10_000).forEach(i -> executor.submit(() -> { Thread.sleep(1000); return i; })); }超时控制:
CompletableFuture.supplyAsync(() -> slowTask()) .orTimeout(5, TimeUnit.SECONDS) // JDK 9+ .exceptionally(ex -> "超时降级");并行流自定义线程池:
ForkJoinPool pool = new ForkJoinPool(4); pool.submit(() -> list.parallelStream().map(...).collect(...)).get();
🏢 业务场景
- 异步 API 调用: CompletableFuture 并行调用多个微服务。
- 限流: Semaphore 限制并发请求数。
- 批量处理: ExecutorService 分批处理数据。
- 定时任务: ScheduledExecutorService 替代 Timer。