阶段一:线程基础能力
约 9577 字大约 32 分钟
2026-03-19
学习重点:学习线程本身。
重要
start():真正开启新线程,由 JVM 执行 run() 方法。
run():一个普通方法。
join():等待子线程执行完毕。
判断线程的基本状态
Thread t = new Thread(() -> {});
System.out.println(t.getState()); // NEW
t.start();
System.out.println(t.getState()); // RUNNABLE 或 TIMED_WAITING 等线程的三种创建方式
方式 1:继承 Thread
class Thread1 extends Thread {
@Override
public void run() {
System.out.println("==thread2 is running");
for (int i = 0; i < 5; i++) {
System.out.println("==thread2:" + i);
}
}
}方式 2:实现 Runnable 接口
class Thread2 implements Runnable {
@Override
public void run() {
System.out.println("--thread1 is running");
for (int i = 0; i < 5; i++) {
System.out.println("--thread1:" + i);
}
}
}方式 3: FeatureTask + Callable
class Thread3 implements Callable<String> {
@Override
public String call() throws Exception {
return "~~thread3 is running";
}
}
public class BasicThread {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new Thread3());
Thread t3 = new Thread(futureTask);
t3.start();
String result = futureTask.get();
System.out.println(result);
}
}线程同步与通信
提示
wait()、notify() 等方法由 Object 提供。
线程同步的几种常见场景
| 场景 | 建议做法 |
|---|---|
| 多线程累加变量 | synchronized、Lock、AtomicInteger |
| 多线程读写共享集合 | Collections.synchronizedList、ConcurrentHashMap |
| 需要等待某个条件 | wait/notify、CountDownLatch、Condition |
线程同步和通信包含两组工具
wait() / notify()+synchroniedReentrantLock+Condition
synchronized 关键字(内置锁)
用法 1:作用于实例方法,锁的是当前对象
public synchronized void increment() {
count++;
}
// 等价于
public void increment() {
synchronized(this) {
count++;
}
}用法 2:作用于静态方法
public static synchronized void increment() {
// 锁的是 Class 对象(Counter.class)
}用法 3:作用于代码块
public void increment() {
synchronized(someObject) {
count++;
}
}对象锁 vs 类锁
- 对象锁:锁住的是某个实例(this 或其他对象)
- 类锁:锁住的是类的 .class 对象
多个线程使用不同对象调用同步方法时,对象锁是互不干扰的
案例:多个线程累加同一个变量
public class SyncCounter {
private int count = 0;
public synchronized void increment() {
count++;
}
public int get() {
return count;
}
public static void main(String[] args) throws InterruptedException {
SyncCounter counter = new SyncCounter();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
counter.increment();
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
counter.increment();
}
});
t1.start(); t2.start();
t1.join(); t2.join();
System.out.println("最终结果:" + counter.get()); // 期望 20000
}
}wait() / notify() + synchronized
| 方法 | 含义 |
|---|---|
| wait() | 当前线程进入等待状态,释放锁,等待被唤醒 |
| notify() | 唤醒一个正在等待的线程(不会立即释放锁) |
| notifyAll() | 唤醒所有等待线程 |
提示
这些方法都是 Object 类定义的,意味着每个对象都可以作为“锁”和“通信器”。
public class WaitNotifyExample {
private static final Object lock = new Object();
private static boolean ready = false;
public static void main(String[] args) {
Thread waiter = new Thread(() -> {
synchronized (lock) {
while (!ready) {
try {
System.out.println("等待者:开始等待");
lock.wait(); // 释放锁,进入等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("等待者:收到通知,继续执行");
}
});
Thread notifier = new Thread(() -> {
synchronized (lock) {
ready = true;
lock.notify(); // 唤醒等待者
System.out.println("通知者:已通知");
}
});
waiter.start();
try {
Thread.sleep(1000); // 确保 waiter 先进入等待
} catch (InterruptedException e) {
e.printStackTrace();
}
notifier.start();
}
}
// 结果:
// 等待者:开始等待
// 通知者:已通知
// 等待者:收到通知,继续执行重要
wait()/notify()必须在同步块中使用(即synchronized(lock))。wait()会释放锁,notify()不会释放锁。- 搭配 while 循环判断条件,避免“假唤醒”。
✅ 实战用途
- 线程协作(如:生产者-消费者模型)。
- 等待条件达成(比如任务就绪、队列不空)。
- 手写线程池或锁时会用到它们。
常见疑问
Q: 为什么推荐用 while (条件不满足) wait() 而不是 if?
A:防止“虚假唤醒”(Spurious Wakeup)
错误写法synchronized (lock) { if (!ready) { lock.wait(); // 一旦被“错误地唤醒”,就跳过这一步了 } // 然后这里就执行了,尽管 ready 可能仍然是 false! }如果用 if,只判断一次,被唤醒后不再检查条件,就可能出现逻辑错误。
正确写法synchronized (lock) { while (!ready) { lock.wait(); // 被唤醒之后再次判断条件 } // 条件真的满足后再继续执行 }这叫 “条件等待 + 状态检查”,是 wait-notify 模型的标准写法。
Q:Spurious Wakeup 是什么?
A:Java 虚拟机规范允许线程在 没有被 notify() 的情况下也可能被唤醒,这是“虚假唤醒”。虽然在绝大多数情况下不会出现,但 JVM 的文档要求开发者必须写 while,因为它无法保证永远不发生。
volatile 关键字
volatile 是 Java 的一个轻量级同步机制,作用是保证 变量在多个线程间的可见性。
public class Demo {
static boolean flag = false;
public static void main(String[] args) throws Exception {
new Thread(() -> {
while (!flag) {
// 可能会死循环
}
System.out.println("flag changed");
}).start();
Thread.sleep(1000);
flag = true;
}
}这段代码理论上会输出 flag changed,但有时候不会。
原因:主线程改了 flag 为 true,但子线程可能缓存了 flag = false 的旧值 ,所以看不到主线程的改动
static volatile boolean flag = false;这表示:每次读写都直接从主内存中读取/写入,不使用线程缓存。
ReentrantLock 可重入锁 + Condition
虽然 synchronized 简单好用,但它有以下局限性:
| 功能需求 | synchronized | ReentrantLock |
|---|---|---|
| 可中断锁获取 | ❌ 不支持 | ✅ 支持 |
| 尝试加锁(带超时) | ❌ 不支持 | ✅ 支持 |
| 公平锁 | ❌ 不支持 | ✅ 支持 |
| 多条件变量 | ❌ 不支持 | ✅ 支持 |
| 更灵活的释放控制 | ❌ 自动释放 | ✅ 手动释放 |
所以,当需要更复杂的控制逻辑或高并发优化时,ReentrantLock 是首选。
import java.util.concurrent.locks.ReentrantLock;
public class LockExample {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock(); // 加锁
try {
count++;
} finally {
lock.unlock(); // 一定要释放锁(防止死锁)
}
}
}提示
ReentrantLock 和 synchronized 的简要区别
lock()不会自动释放锁,必须在finally中调用unlock()- 如果
lock()多次,必须unlock()同样多次(可重入)
⚙️ tryLock():尝试加锁
可以防止阻塞,做失败快速返回的处理。
if (lock.tryLock()) {
try {
// 获取到了锁,安全操作
} finally {
lock.unlock();
}
} else {
// 没获取到锁,做其他事
}🕰️ tryLock(timeout, unit):限时尝试加锁
适合高并发限流场景,比如控制同时访问资源的人数。
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
// 成功获取锁
} finally {
lock.unlock();
}
} else {
// 一秒内没抢到锁
}⚖️ 公平锁(Fair Lock)
- 公平锁:先来先得
- 非公平锁(默认):性能更好,但可能有“插队”
ReentrantLock lock = new ReentrantLock(true); // true 表示公平锁配套工具:Condition
如果说 synchronized + wait/notify 是配套的,那 ReentrantLock 的配套就是 Condition。
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();| 方法 | 含义 |
|---|---|
| await() | 等待,释放锁 |
| signal() | 唤醒一个线程 |
| signalAll() | 唤醒所有线程 |
示例:ReentrantLock + Condition 实现线程等待通知
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionExample {
private final ReentrantLock lock = new ReentrantLock();
private final Condition readyCondition = lock.newCondition();
private boolean ready = false;
// 等待线程调用的方法
public void awaitTask() {
lock.lock();
try {
while (!ready) {
System.out.println(Thread.currentThread().getName() + " 等待任务...");
readyCondition.await(); // 释放锁并挂起
}
System.out.println(Thread.currentThread().getName() + " 收到通知,继续执行");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// 通知线程调用的方法
public void signalTask() {
lock.lock();
try {
ready = true;
System.out.println(Thread.currentThread().getName() + " 准备就绪,发送通知");
readyCondition.signal(); // 唤醒一个等待的线程
} finally {
lock.unlock();
}
}
// 主函数
public static void main(String[] args) throws InterruptedException {
ConditionExample example = new ConditionExample();
// 等待线程
Thread waiter = new Thread(example::awaitTask, "等待者");
// 通知线程(延迟启动)
Thread notifier = new Thread(() -> {
try {
Thread.sleep(2000); // 等待者先启动并进入等待
example.signalTask();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "通知者");
waiter.start();
notifier.start();
waiter.join();
notifier.join();
System.out.println("主线程结束");
}
}
// 可能的结果:
// 等待者 等待任务...
// 通知者 准备就绪,发送通知
// 等待者 收到通知,继续执行
// 主线程结束提示
Condition 是可以创建多个的,当有三个线程顺序执行的时候
可以创建多个 condition:
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();逻辑结构如下:
Thread A:await on conditionA → print A → signal conditionB
Thread B:await on conditionB → print B → signal conditionC
Thread C:await on conditionC → print C → signal conditionA
| 特性 | synchronized | ReentrantLock |
|---|---|---|
| 是否自动释放 | ✅ | ❌ 必须手动释放 |
| 是否支持 tryLock | ❌ | ✅ |
| 条件变量 | ❌ 只能一个(object) | ✅ 多个 Condition |
| 公平性控制 | ❌ | ✅ |
| 是否可中断获取 | ❌ | ✅ |
原子类(Atomic): 无锁编程利器
在多线程环境下,如果多个线程并发更新共享变量(如一个计数器),就必须使用同步手段防止竞态。之前我们用的是:
- synchronized(重量级)
- ReentrantLock(灵活,但也需要手动加解锁)
但有时候,我们只是对一个数值做简单的加减操作,如果用锁未免太重了。
✅ 解决方案:原子类 = 轻量级线程安全操作,利用底层的 CAS(Compare-And-Swap)机制,避免加锁,提高性能。
常见原子类(位于 java.util.concurrent.atomic 包)
| 类名 | 功能 |
|---|---|
AtomicInteger | 原子整型 |
AtomicLong | 原子长整型 |
AtomicBoolean | 原子布尔值 |
AtomicReference<T> | 原子引用类型 |
AtomicInteger 使用示例
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicExample {
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // 原子加1
}
public int get() {
return count.get(); // 获取当前值
}
public static void main(String[] args) throws InterruptedException {
AtomicExample example = new AtomicExample();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
example.increment();
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
example.increment();
}
});
t1.start(); t2.start();
t1.join(); t2.join();
System.out.println("最终结果:" + example.get()); // 期望 20000
}
}常用方法一览(以 AtomicInteger 为例)
| 方法 | 含义 |
|---|---|
get() | 获取当前值 |
set(x) | 设置为指定值 |
incrementAndGet() | 加一并返回新值 |
getAndIncrement() | 返回旧值再加一 |
decrementAndGet() | 减一并返回 |
getAndSet(x) | 设置新值并返回旧值 |
compareAndSet(expect, update) | 如果当前值等于 expect,就设置为 update(CAS) |
警告
- 原子类只能保证一个变量的操作是线程安全的
- 若多个变量有关联逻辑(如账户转账),仍需要锁机制
- 和锁比:性能更高、开销更小、但功能也更有限
Atomic vs synchronized
| 特点 | AtomicInteger | synchronized |
|---|---|---|
| 是否加锁 | ❌ 无锁(CAS) | ✅ |
| 是否阻塞 | ❌ | ✅ |
| 粒度 | 单变量 | 任意代码块 |
| 性能 | 高 | 视并发量而定 |
| 适合场景 | 高并发计数器、状态标志等 | 复杂业务逻辑、多变量控制 |
总结一句话
如果你只是在并发环境下对一个数值进行 加减、设置、比较替换等简单操作,首选 AtomicXXX 类。
ThewadLocal
ThreadLocal 提供了线程本地变量,每个线程访问它时,获取的都是自己专属的副本,互不干扰。 它可以让多个线程在不加锁的情况下,各自拥有独立的变量值。
为什么需要它?
在多线程环境中,如果多个线程操作同一个变量,要么:
- 加锁(性能损耗)
- 拷贝副本(麻烦)
而 ThreadLocal 可以完美解决这个问题。
public class ThreadLocalExample {
private static final ThreadLocal<String> local = new ThreadLocal<>();
public static void main(String[] args) {
Runnable task = () -> {
String threadName = Thread.currentThread().getName();
local.set(threadName + " 的数据");
System.out.println(threadName + " 获取:" + local.get());
};
Thread t1 = new Thread(task, "线程A");
Thread t2 = new Thread(task, "线程B");
t1.start();
t2.start();
}
}常用方法
| 方法 | 说明 |
|---|---|
| set(T value) | 设置当前线程的值 |
| get() | 获取当前线程的值 |
| remove() | 清除当前线程的值(防止内存泄漏) |
| initialValue() | 可通过继承 ThreadLocal 并重写该方法设置默认值 |
使用场景举例
- 用户登录上下文信息(当前用户 ID)
- 数据库连接对象
- 日志追踪 ID(链路追踪)
- Spring 中的 RequestContextHolder(Web 请求线程隔离)
重要
内存泄漏风险(重点)
- ThreadLocal 实际是绑定在线程对象里的 Map 中(ThreadLocalMap)
- 如果线程是线程池里的(如 Tomcat、线程池复用),线程不会销毁
- 不调用 remove() 可能导致内存泄漏
try {
local.set("some data");
// 使用
} finally {
local.remove(); // 必须清理
}提示
内存泄露(Memory Leak):内存被占用,但是无法被回收。Leak 是比喻泼出去的水,无法再拿回来。
内存溢出(Memory Overflow):内存不够直接撑爆了。
总结
| 特点 | 内容 |
|---|---|
| 作用 | 线程私有变量 |
| 是否线程安全 | ✅(天生安全) |
| 是否共享数据 | ❌ 每个线程独立 |
| 使用建议 | 有设置就要有 remove |
并发容器(ConcurrentHashMap、BlockingQueue 等)
这些数据结构是支持高并发读写且无需手动加锁的神器。
在多线程环境下,常规的集合类(如 ArrayList、HashMap、HashSet)是非线程安全的:
- 并发修改时可能导致数据损坏(如死循环、元素丢失)
- 手动加锁麻烦且容易出错
Java 为了解决这个问题,提供了 高性能、线程安全的并发集合类。
常见并发容器对比表
| 容器 | 描述 | 适用场景 |
|---|---|---|
| ConcurrentHashMap | 线程安全的哈希表 | 替代 HashMap |
| CopyOnWriteArrayList | 读写分离列表 | 多读少写的列表 |
| BlockingQueue | 阻塞队列 | 生产者-消费者模型 |
| ConcurrentLinkedQueue | 非阻塞链表队列 | 高性能队列 |
| ConcurrentSkipListMap | 有序Map,基于跳表 | 替代 TreeMap |
ConcurrentHashMap(最常用)
支持高并发读写,内部采用分段锁(早期)+ CAS 机制实现。
import java.util.concurrent.ConcurrentHashMap;
public class CHMExample {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("A", 1);
map.put("B", 2);
map.compute("A", (k, v) -> v + 1); // 原子更新
System.out.println(map); // 输出:{A=2, B=2}
}
}CopyOnWriteArrayList(多读少写)
✅ 适合“读远大于写”的场景,如系统配置、白名单等。
❗ 缺点:每次写操作都会 复制整个数组,写多了性能差。
import java.util.concurrent.CopyOnWriteArrayList;
public class COWExample {
public static void main(String[] args) {
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("Java");
list.add("Python");
for (String item : list) {
list.add("Go"); // 允许边遍历边修改,不抛异常
}
System.out.println(list); // 输出所有元素
}
}BlockingQueue(支持阻塞等待)
接口:BlockingQueue<E>
常用实现:
ArrayBlockingQueue(有界)LinkedBlockingQueue(无界)DelayQueue(带延迟)PriorityBlockingQueue(优先级队列)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumer {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
Thread producer = new Thread(() -> {
try {
queue.put(1); // 阻塞插入
System.out.println("生产了1");
queue.put(2);
System.out.println("生产了2");
queue.put(3); // 阻塞等待空间
System.out.println("生产了3");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumer = new Thread(() -> {
try {
Thread.sleep(1000); // 模拟延迟消费
System.out.println("消费了:" + queue.take()); // 阻塞获取
System.out.println("消费了:" + queue.take());
System.out.println("消费了:" + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}✅ 这是多线程通信中最常用的队列形式,可以轻松实现线程解耦、异步任务处理。
总结
| 容器 | 特点 | 场景推荐 |
|---|---|---|
| ConcurrentHashMap | 高并发读写,不需要锁 | 替代 HashMap |
| CopyOnWriteArrayList | 写时复制,线程安全 | 多读少写 |
| BlockingQueue | 支持阻塞 put/take | 生产者-消费者 |
| ConcurrentLinkedQueue | 非阻塞队列 | 高性能无界队列 |
| ConcurrentSkipListMap | 有序线程安全 Map | 高并发排序结构 |
JUC 并发控制工具类
Java 提供了一批位于 java.util.concurrent 包下的 线程协调工具类,它们不是“锁”,但可以让多个线程协作完成任务。
| 工具类 | 功能简介 |
|---|---|
| CountDownLatch | 倒计时锁,等待一组线程完成 |
| CyclicBarrier | 回环屏障,等待一组线程到达统一点 |
| Semaphore | 信号量,限流控制 |
CountDownLatch - 等待多个线程完成
✅ 核心概念:
- 一开始设置一个计数值(例如 3)
- 每个线程完成后调用 countDown(),计数器减一
- 主线程调用 await() 会一直阻塞,直到计数为 0
🧠 应用场景:
- 主线程等待多个子任务初始化完成后再执行
- 多线程批处理
import java.util.concurrent.CountDownLatch;
public class LatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
Runnable worker = () -> {
System.out.println(Thread.currentThread().getName() + " 正在处理任务...");
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
System.out.println(Thread.currentThread().getName() + " 完成任务!");
latch.countDown(); // 减一
};
for (int i = 0; i < 3; i++) {
new Thread(worker, "线程" + i).start();
}
latch.await(); // 主线程阻塞,直到 count 为 0
System.out.println("所有子线程已完成,主线程继续执行!");
}
}public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
new Thread(() -> {
try {
System.out.println("查询用户信息...");
Thread.sleep(500);
System.out.println("用户信息查询完成");
} catch (InterruptedException ignored) {
} finally {
latch.countDown();
}
}).start();
new Thread(() -> {
try {
System.out.println("查询订单...");
Thread.sleep(700);
System.out.println("订单查询完成");
} catch (InterruptedException ignored) {
} finally {
latch.countDown();
}
}).start();
new Thread(() -> {
try {
System.out.println("查询库存...");
Thread.sleep(300);
System.out.println("库存查询完成");
} catch (InterruptedException ignored) {
} finally {
latch.countDown();
}
}).start();
new Thread(() -> {
try {
System.out.println("查询物流...");
Thread.sleep(800);
System.out.println("物流查询完成");
} catch (InterruptedException ignored) {
} finally {
latch.countDown();
}
}).start();
new Thread(() -> {
try {
System.out.println("查询支付...");
Thread.sleep(600);
System.out.println("支付查询完成");
} catch (InterruptedException ignored) {
} finally {
latch.countDown();
}
}).start();
// 主线程等待所有子任务完成
latch.await();
System.out.println("所有查询完成,开始汇总处理...");
}
}CyclicBarrier - 等待彼此都到齐
✅ 核心概念:
- 一组线程互相等待,直到“全部到齐”
- 到齐后统一继续执行
- 可以 循环使用
🧠 应用场景:
- 多个线程并行准备、统一执行
- 分阶段执行任务(如地图加载 + UI 渲染)
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class BarrierExample {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, () ->
System.out.println("全部到达屏障点,统一出发!"));
Runnable task = () -> {
try {
System.out.println(Thread.currentThread().getName() + " 准备就绪");
Thread.sleep((long)(Math.random() * 2000));
barrier.await(); // 等待其他线程到达
System.out.println(Thread.currentThread().getName() + " 开始执行任务");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 3; i++) {
new Thread(task, "线程" + i).start();
}
}
}import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample2 {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3,
() -> System.out.println("=== 所有子任务到达屏障点,进入下一阶段 ==="));
Runnable task1 = () -> doTask("任务A", barrier);
Runnable task2 = () -> doTask("任务B", barrier);
Runnable task3 = () -> doTask("任务C", barrier);
new Thread(task1).start();
new Thread(task2).start();
new Thread(task3).start();
}
private static void doTask(String name, CyclicBarrier barrier) {
try {
// 第一阶段
System.out.println(name + ":执行第一阶段工作");
Thread.sleep((long) (Math.random() * 1000));
System.out.println(name + ":第一阶段完成,等待其他任务");
barrier.await(); // 等待其他任务第一阶段完成
// 第二阶段
System.out.println(name + ":执行第二阶段工作");
Thread.sleep((long) (Math.random() * 1000));
System.out.println(name + ":第二阶段完成,等待其他任务");
barrier.await(); // 等待其他任务第二阶段完成
System.out.println(name + ":所有阶段完成,结束");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}任务A:执行第一阶段工作
任务B:执行第一阶段工作
任务C:执行第一阶段工作
任务B:第一阶段完成,等待其他任务
任务A:第一阶段完成,等待其他任务
任务C:第一阶段完成,等待其他任务
=== 所有子任务到达屏障点,进入下一阶段 ===
任务A:执行第二阶段工作
任务B:执行第二阶段工作
任务C:执行第二阶段工作
任务A:第二阶段完成,等待其他任务
任务B:第二阶段完成,等待其他任务
任务C:第二阶段完成,等待其他任务
=== 所有子任务到达屏障点,进入下一阶段 ===
任务A:所有阶段完成,结束
任务B:所有阶段完成,结束
任务C:所有阶段完成,结束Semaphore - 控制并发线程数
✅ 核心概念:
- 类似限流器
- 控制同时运行的线程数量
- 超过限制会阻塞,直到有线程释放许可
🧠 应用场景:
- 接口限流、并发控制
- 数据库连接池最大连接数
- 控制最多几个用户同时登录等
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2); // 最多允许2个线程同时执行
Runnable task = () -> {
try {
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " 获取许可,开始处理...");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 处理完毕,释放许可");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
}
};
for (int i = 0; i < 5; i++) {
new Thread(task, "线程" + i).start();
}
}
}import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2); // 最多允许两个线程同时运行
Runnable[] queries = new Runnable[]{
() -> query("用户查询", 500, semaphore),
() -> query("订单查询", 800, semaphore),
() -> query("库存查询", 300, semaphore),
() -> query("物流查询", 700, semaphore),
() -> query("支付查询", 600, semaphore)
};
for (Runnable query : queries) {
new Thread(query).start();
}
}
private static void query(String name, int millis, Semaphore semaphore) {
try {
semaphore.acquire(); // 获取许可
System.out.println(name + " 开始");
Thread.sleep(millis);
System.out.println(name + " 完成");
} catch (InterruptedException ignored) {
} finally {
semaphore.release(); // 释放许可
}
}
}总结:三种工具类对比
| 工具类 | 用法场景 | 是否可复用 | 核心方法 |
|---|---|---|---|
| CountDownLatch | 等待多个线程完成 | ❌ 不可重置 | countDown() / await() |
| CyclicBarrier | 一组线程集合后一起执行 | ✅ 可重置 | await() |
| Semaphore | 控制资源数量/并发限流 | ✅ 可复用 | acquire() / release() |
线程池(ExecutorService & ThreadPoolExecutor)
几乎所有实际项目中都用线程池来执行任务,尤其在:
- 接口异步处理
- 消息消费
- 定时任务调度
- 后台计算任务
等场景中,是性能、稳定性和扩展性的关键保障。
需要线程池的原因
如果每次执行任务都 new Thread():
❌ 缺点:
- 每次都创建线程,开销大
- 无法管理线程的数量和生命周期
- 无法复用线程、易OOM或线程过多
- 不利于排查问题
✅ 使用线程池好处:
- 控制最大并发数,防止系统资源耗尽
- 线程复用,避免频繁创建销毁
- 支持任务排队与调度
- 提供统一的生命周期管理、拒绝策略、异常处理机制
线程池核心接口:ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(4);
executor.submit(() -> {
// 执行任务
});
executor.shutdown();这是最常用的线程池接口,支持:
- 提交任务
- 获取执行结果(Future)
- 优雅关闭线程池(shutdown())
四种常见线程池创建方式(Executors 工具类)
Executors.newFixedThreadPool(n); // 固定大小线程池
Executors.newCachedThreadPool(); // 自动扩容线程池
Executors.newSingleThreadExecutor(); // 单线程池
Executors.newScheduledThreadPool(n); // 定时任务线程池🔥 推荐使用自定义 ThreadPoolExecutor
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 非核心线程最大空闲时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 任务队列
ThreadFactory threadFactory, // 线程工厂(可选)
RejectedExecutionHandler handler // 拒绝策略
)笔记
面试中经常会问这几个参数。
import java.util.concurrent.*;
public class CustomThreadPool {
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60, TimeUnit.SECONDS,// 非核心线程存活时间
new LinkedBlockingQueue<>(10), // 等待队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);
for (int i = 0; i < 15; i++) {
final int taskId = i;
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 执行任务 " + taskId);
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
});
}
pool.shutdown();
}
}线程池中任务的排队与调度机制
三类任务提交过程:
- 核心线程未满 ➜ 新建线程处理任务
- 核心线程满,队列未满 ➜ 任务加入队列等待
- 队列满,线程数量未达最大 ➜ 创建非核心线程处理任务
- 队列满,线程满 ➜ 启动拒绝策略
拒绝策略(RejectedExecutionHandler)
| 策略 | 描述 |
|---|---|
| AbortPolicy | 默认,抛异常(推荐) |
| CallerRunsPolicy | 任务回退给提交线程 |
| DiscardPolicy | 直接丢弃任务 |
| DiscardOldestPolicy | 丢弃最早的队列任务,尝试执行当前任务 |
线程池参数推荐配置(参考)
| 项 | 推荐值说明 |
|---|---|
| 核心线程数 | CPU 核心数,如 Runtime.getRuntime().availableProcessors() |
| 最大线程数 | 视业务复杂度而定(建议为核心线程数的 2~4 倍) |
| 队列大小 | 看任务堆积容忍度,太大会导致内存撑爆 |
| 拒绝策略 | 推荐用 AbortPolicy 配合日志或监控告警 |
总结:要牢记的线程池核心点
| 关键项 | 说明 |
|---|---|
| 线程池入口 | ExecutorService |
| 推荐方式 | 自定义 ThreadPoolExecutor |
| 拒绝策略 | 一定要了解,避免任务悄悄丢失 |
| 线程复用 | 节省资源、提升性能 |
| shutdown | 及时关闭线程池,释放资源 |
线程池中的异步任务与 Future
什么是 Future
Future 表示一个可能还未完成的异步任务的结果,你可以:
- 提交一个任务,让它在线程池中运行
- 后续通过 Future.get() 获取结果(同步阻塞)
- 或者判断是否完成 / 取消任务
基础示例:submit + Future
import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Integer> future = executor.submit(() -> {
Thread.sleep(1000);
return 42;
});
System.out.println("任务已提交,主线程可以做点别的事...");
// 阻塞等待结果
Integer result = future.get();
System.out.println("任务结果: " + result);
executor.shutdown();
}
}
// 输出
// 任务已提交,主线程可以做点别的事...
// 任务结果: 42Future.get() 是阻塞的!
future.get(); // 会一直等,直到任务完成
// 可以使用超时控制:
future.get(2, TimeUnit.SECONDS); // 最多等 2 秒局限性
Future 虽然可以获取结果,但它:
- ❌ 不支持任务完成后的自动回调
- ❌ 不支持链式任务(一个任务依赖另一个任务结果)
- ❌ 代码写起来容易阻塞、嵌套复杂
升级! CompletableFuture(JDK 8+)
CompletableFuture 是 Java8 引入的强大工具,支持:
| 特性 | 说明 |
|---|---|
| 非阻塞回调 | 支持任务完成后自动执行回调 |
| 异步执行 | 提交任务后不阻塞主线程 |
| 链式调用 | 支持多个任务组合、依赖 |
| 异常处理 | 支持 exceptionally() 捕获异常 |
示例 1:异步执行 + 回调
import java.util.concurrent.CompletableFuture;
public class CFExample1 {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
System.out.println("任务完成!");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("主线程继续执行");
future.join(); // 等待结束(非必须)
}
}有返回值 + thenApply 链式处理
import java.util.concurrent.CompletableFuture;
public class CFExample2 {
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10;
}).thenApply(value -> {
return value * 2;
});
System.out.println("结果是:" + future.join()); // 输出 20
}
}异常处理(exceptionally)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了!");
return 1;
}).exceptionally(e -> {
System.out.println("捕获异常:" + e.getMessage());
return -1;
});
System.out.println("结果是:" + future.join()); // 输出 -1CompletableFuture 核心方法一览
| 方法 | 说明 |
|---|---|
runAsync(Runnable) | 无返回值的异步任务 |
supplyAsync(Supplier<T>) | 有返回值的异步任务 |
thenApply(fn) | 对返回值做处理 |
thenAccept(fn) | 消费返回值,无返回 |
thenRun(fn) | 不关心结果,只执行 |
thenCombine(f1, f2) | 组合两个任务的结果 |
exceptionally(fn) | 异常恢复 |
whenComplete(fn) | 无论成功或失败都执行 |
总结:线程池异步编程核心对比
| 工具 | 特性 | 适合场景 |
|---|---|---|
| Future | 简单异步,阻塞获取结果 | 异步任务提交 |
| CompletableFuture | 非阻塞、回调、组合任务 | 复杂异步流程 |
💪 实战应用场景
- 并行远程调用多个服务(提升响应速度)
- 数据预处理 + UI 渲染
- 异步计算 + 回调通知
- 多任务合并处理、聚合数据
Q&A:FutureTask vs Future 的关系
一、类关系
这是 FutureTask
public class FutureTask<V> implements RunnableFuture<V>而 RunnableFuture 是一个接口,继承了 Runnable 和 Future:
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}结论:
FutureTask 是一个类,实现了 Future 接口,也实现了 Runnable 接口。
这意味着:
- 它可以被提交到线程池中执行(因为是 Runnable)
- 它也可以作为 Future 来获取异步结果
二、使用场景
你想自己控制异步任务的启动(不依赖线程池),就使用 FutureTask。
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class FutureTaskExample {
public static void main(String[] args) throws Exception {
Callable<Integer> task = () -> {
Thread.sleep(1000);
return 100;
};
FutureTask<Integer> futureTask = new FutureTask<>(task);
Thread thread = new Thread(futureTask);
thread.start();
System.out.println("主线程继续干活...");
Integer result = futureTask.get(); // 阻塞等待
System.out.println("结果是:" + result);
}
}三、和 ExecutorService.submit(Callable) 的对比
| 特性 | FutureTask | submit(Callable) |
|---|---|---|
| 是否线程池依赖 | ❌(可单独启动线程) | ✅ |
| 是否实现 Future | ✅ | ✅ |
| 是否支持取消 | ✅ | ✅ |
| 使用复杂度 | 稍复杂 | 简单 |
| 可复用性 | ✅(可以复用 task 对象) | ❌(一次性提交) |
四、总结
一句话:FutureTask = 既是一个任务,又是一个结果容器,属于 Future 的具体实现。
你可以用它直接结合线程(new Thread(futureTask)),也可以作为 Runnable 被线程池执行。
🎯 实用建议
- ✅ 简单异步任务:用 submit() 返回 Future
- ✅ 想自己掌控线程生命周期:用 FutureTask + Callable
- ✅ 更复杂的链式异步:用 CompletableFuture
线程基础总结
一、线程创建方式
| 方法 | 特点 | 场景示例 |
|---|---|---|
| 继承 Thread | 简单直观,不推荐复用 | 学习阶段、小型测试 |
| 实现 Runnable | 更灵活,适合线程池 | 批量任务处理、日志异步写入 |
| 实现 Callable + FutureTask | 有返回值,可结合线程池 | 计算型任务:分页查询结果、文件分析 |
| 线程池(ExecutorService) | 线程复用,统一管理 | 99% 实战任务处理选择 |
| CompletableFuture | 链式、异步、回调 | 并行请求、任务编排、聚合数据 |
二、线程同步与并发控制
| 工具 | 特点 | 抽象场景 | 具体案例 |
|---|---|---|---|
| synchronized | 内置锁,代码简洁 | 临界区控制 | 银行转账、票务系统 |
| ReentrantLock | 显式加锁,可中断/尝试锁 | 更灵活的资源保护 | 并发写数据库,分布式锁封装 |
| wait/notify | 线程通信 | 等待某条件成立 | 缓冲区满/空、单生产者消费者 |
| Condition | 精准通知 | 多条件同步 | 线程 A 唤醒线程 B |
| AtomicInteger | 无锁计数 | 高并发计数器 | 网站访问量、点赞数 |
三、线程间数据隔离
| 工具 | 特点 | 抽象场景 | 具体案例 |
|---|---|---|---|
| ThreadLocal | 每个线程私有数据 | 请求上下文隔离 | 登录用户信息、Trace ID 传递 |
| InheritableThreadLocal | 子线程继承父线程上下文 | 多线程传递用户上下文 | Spring Web + 异步任务共享用户信息 |
四、并发容器
| 容器 | 特点 | 抽象场景 | 具体案例 |
|---|---|---|---|
| ConcurrentHashMap | 并发读写安全 | 多线程共享缓存 | 配置表、字典表缓存 |
| CopyOnWriteArrayList | 多读少写安全 | 读远大于写 | 白名单列表、系统开关配置 |
| BlockingQueue | 阻塞式队列 | 线程间通信 | 生产者-消费者、任务分发 |
五、并发控制工具类(协调多个线程)
| 工具类 | 特点 | 抽象场景 | 具体案例 |
|---|---|---|---|
| CountDownLatch | 一组线程倒计时 | 等待多个任务完成 | 多个微服务启动后主程序执行 |
| CyclicBarrier | 所有线程同步起步 | 统一执行点 | 多人游戏准备阶段、分布式计算 |
| Semaphore | 资源配额控制 | 并发限流 | 同时只能处理 10 个请求、连接池控制 |
六、线程池(统一调度任务)
| 类型 | 特点 | 抽象场景 | 具体案例 |
|---|---|---|---|
| FixedThreadPool | 固定线程数 | 控制并发强度 | 网络爬虫、数据抓取 |
| CachedThreadPool | 动态线程数 | 短生命周期大量任务 | 日志异步写入 |
| SingleThreadExecutor | 顺序执行任务 | 保证串行逻辑 | 日志归档、定时备份 |
| ScheduledThreadPool | 定时任务 | 周期任务 | 健康检查、定时推送 |
| 自定义 ThreadPoolExecutor | 完全掌控线程行为 | 生产级系统核心 | RocketMQ 消费线程池、业务服务线程隔离 |
七、异步任务处理与编排
| 工具 | 特点 | 抽象场景 | 具体案例 |
|---|---|---|---|
| Future | 同步获取结果 | 提交任务 + 获取结果 | 图像处理、统计任务 |
| CompletableFuture | 异步回调 + 组合任务 | 并发任务聚合、链式处理 | 并行远程调用 + 聚合响应、AI 模型预测结果合并 |
八、学习建议总结:如何选择合适的线程技术?
| 你要实现什么? | 使用方案 |
|---|---|
| 执行异步任务 | ExecutorService |
| 获取异步任务结果 | Future / FutureTask / CompletableFuture |
| 控制并发数量 | Semaphore / 线程池 |
| 多线程间协调等待 | CountDownLatch / CyclicBarrier |
| 多线程共享安全集合 | ConcurrentHashMap, BlockingQueue |
| 多线程计数 | AtomicInteger |
| 实现高性能限流 | 自定义线程池 + Semaphore |
| 多线程数据隔离 | ThreadLocal |
知识图谱结构
---
markmap:
colorFreezeLevel: 2
---
# 线程基础
## 线程同步
- synchronized
- ReentrantLock
- wait/notify
- AtomicXXX
## 线程通信
- BlockingQueue
- CountDownLatch
- CyclicBarrier
- Semaphore
## 线程隔离
- ThreadLocal
## 并发容器
- ConcurrentHashMap
- CopyOnWriteArrayList
## 线程池
- Executors
- ThreadPoolExecutor
- Future / FutureTask
- CompletableFuture实战案例
| 难度 | 任务描述 |
|---|---|
| ⭐ 简单 | 三个线程按顺序打印 A-B-C |
| ⭐⭐ 中等 | 两个线程交替打印奇偶数 |
| ⭐⭐⭐ 进阶 | 使用线程池并发执行任务,收集返回结果 |
| ⭐⭐⭐⭐ 挑战 | 实现生产者-消费者模型(BlockingQueue + 多线程) |
| ⭐⭐⭐⭐ 进阶 | 使用 CompletableFuture 聚合两个异步请求结果 |
实战题目 1:线程顺序打印 —— A、B、C 顺序打印若干次
📘 真实场景背景:
假设你有三个任务:
- A:加载用户数据
- B:加载订单数据
- C:加载推荐内容
这些任务虽然可以异步执行,但你希望它们始终以 A → B → C 的顺序打印,反复执行若干次用于调试或测试。
🧩 抽象题目描述:启动三个线程,分别负责打印字母 "A"、"B"、"C",要求按顺序依次输出。
解法:解法:使用 ReentrantLock + Condition
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class PrintABC {
// 打印次数
private static final int PRINT_COUNT = 5;
public static void main(String[] args) {
Printer printer = new Printer();
new Thread(() -> printer.print("A", 0), "线程-A").start();
new Thread(() -> printer.print("B", 1), "线程-B").start();
new Thread(() -> printer.print("C", 2), "线程-C").start();
}
static class Printer {
private final Lock lock = new ReentrantLock();
private final Condition[] conditions = new Condition[3];
private int state = 0; // 当前应该打印哪个线程:0 -> A, 1 -> B, 2 -> C
public Printer() {
for (int i = 0; i < 3; i++) {
conditions[i] = lock.newCondition();
}
}
/**
* 打印函数
* @param letter 要打印的字符(A/B/C)
* @param targetState 当前线程对应的状态值
*/
public void print(String letter, int targetState) {
for (int i = 0; i < PRINT_COUNT; i++) {
lock.lock();
try {
// 等待轮到自己
while (state % 3 != targetState) {
conditions[targetState].await();
}
// 打印
System.out.println(Thread.currentThread().getName() + " 打印:" + letter);
// 更新状态
state++;
// 唤醒下一个线程
conditions[(targetState + 1) % 3].signal();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}
}
}实战题目 2:两个线程交替打印奇偶数
在日志处理、流水号生成、分布式任务切片等系统中,常常需要两个线程协作:
- 一个处理奇数任务
- 一个处理偶数任务
比如:
- 线程 A 负责打印奇数(1,3,5…)
- 线程 B 负责打印偶数(2,4,6…)
要求两者交替打印,以保证输出顺序正确,避免重复或错位。
🧩 抽象题目描述:
使用两个线程,依次打印数字 1~10,其中一个线程打印奇数,另一个打印偶数,且打印顺序必须正确交替:
Thread-A: 1
Thread-B: 2
Thread-A: 3
Thread-B: 4
...解法:使用 synchronized + wait/notify
public class PrintOddEven {
private static final int MAX = 10;
private int num = 1;
private final Object lock = new Object();
public static void main(String[] args) {
PrintOddEven printer = new PrintOddEven();
Thread oddThread = new Thread(printer::printOdd, "Thread-A");
Thread evenThread = new Thread(printer::printEven, "Thread-B");
oddThread.start();
evenThread.start();
}
// 打印奇数的方法
public void printOdd() {
while (num <= MAX) {
synchronized (lock) {
if (num % 2 == 1) {
System.out.println(Thread.currentThread().getName() + ": " + num);
num++;
lock.notify(); // 唤醒偶数线程
} else {
try {
lock.wait(); // 等待偶数线程处理完
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
// 打印偶数的方法
public void printEven() {
while (num <= MAX) {
synchronized (lock) {
if (num % 2 == 0) {
System.out.println(Thread.currentThread().getName() + ": " + num);
num++;
lock.notify(); // 唤醒奇数线程
} else {
try {
lock.wait(); // 等待奇数线程处理完
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
}实战题目 3:线程池并发执行任务并收集所有返回结果
📘 真实场景背景:
你正在开发一个聚合查询系统,用户请求一个接口,你需要:
- 查询数据库用户信息
- 调用远程服务查询订单信息
- 查询推荐系统返回推荐列表
这些任务互不依赖,但都比较耗时。你希望并发执行所有任务并收集结果返回给用户,以缩短响应时间。
🧩 抽象题目描述:
创建一个线程池,并发执行 3 个耗时任务,每个任务都返回结果。主线程等待所有任务完成后,收集它们的返回值并打印出来。
解法:使用 ExecutorService + Callable + Future
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentTaskExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(3);
// 定义三个任务
Callable<String> task1 = () -> {
Thread.sleep(1000);
return "用户信息";
};
Callable<String> task2 = () -> {
Thread.sleep(2000);
return "订单信息";
};
Callable<String> task3 = () -> {
Thread.sleep(1500);
return "推荐列表";
};
// 提交任务
List<Future<String>> futures = executor.invokeAll(Arrays.asList(task1, task2, task3));
// 收集结果
System.out.println("所有任务已提交,主线程等待结果...");
for (Future<String> future : futures) {
String result = future.get(); // 阻塞等待结果
System.out.println("收到结果:" + result);
}
executor.shutdown();
}
}实战题目 4:多线程实现生产者-消费者模型
📘 真实场景背景:
你在开发一个图像处理系统,有多个线程不断从设备读取图像(生产者),并将图像放入内存队列中。同时有多个线程从队列中取出图像进行分析(消费者)。
你需要控制好生产与消费之间的节奏,避免:
- 消费速度过慢,导致内存溢出(队列满)
- 生产速度过慢,消费者空转(队列空)
🧩 抽象题目描述:
使用多个线程,模拟生产者不断往队列中放入数据,消费者不断从中取出数据,要求使用线程安全的方式实现生产与消费协调,输出每次的操作。
解法:使用 BlockingQueue(推荐实战方式)
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ProducerConsumerExample {
// 队列容量
private static final int QUEUE_CAPACITY = 5;
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
AtomicInteger counter = new AtomicInteger(1); // 模拟产品编号
// 创建生产者线程
Runnable producer = () -> {
while (true) {
try {
String item = "产品-" + counter.getAndIncrement();
queue.put(item); // 如果满了会阻塞
System.out.println(Thread.currentThread().getName() + " 生产了:" + item);
Thread.sleep(500); // 模拟生产耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
};
// 创建消费者线程
Runnable consumer = () -> {
while (true) {
try {
String item = queue.take(); // 如果空了会阻塞
System.out.println(Thread.currentThread().getName() + " 消费了:" + item);
Thread.sleep(800); // 模拟消费耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
};
// 启动多个生产者和消费者线程
ExecutorService executor = Executors.newFixedThreadPool(4);
executor.execute(producer);
executor.execute(producer);
executor.execute(consumer);
executor.execute(consumer);
// 程序会持续运行,如需停止可添加终止逻辑
}
}实战题目 5:使用 CompletableFuture 聚合多个异步任务结果
📘 真实场景背景:
你在开发一个聚合查询接口,需要同时调用多个微服务:
- 获取当前用户信息
- 获取用户订单列表
- 获取用户推荐商品列表
这些任务彼此没有依赖关系,但都需要并发执行,最终主线程收集所有结果并返回统一响应。
目标是尽可能缩短响应时间,同时支持异常处理。
🧩 抽象题目描述:
使用 CompletableFuture 并发执行 3 个异步任务(模拟耗时 1~2 秒),每个任务返回一段数据,主线程等待所有任务完成后聚合这些数据,并输出结果。
解法:使用 CompletableFuture.supplyAsync() + allOf
import java.util.concurrent.*;
import java.util.*;
public class CompletableFutureCombine {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
// 模拟远程调用 1:用户信息
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "用户信息";
}, executor);
// 模拟远程调用 2:订单信息
CompletableFuture<String> orderFuture = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "订单列表";
}, executor);
// 模拟远程调用 3:推荐信息
CompletableFuture<String> recommendFuture = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "推荐商品";
}, executor);
// 合并所有异步任务
CompletableFuture<Void> all = CompletableFuture.allOf(userFuture, orderFuture, recommendFuture);
// 等待所有完成后,统一处理结果
all.thenRun(() -> {
try {
String user = userFuture.get();
String order = orderFuture.get();
String recommend = recommendFuture.get();
System.out.println("聚合结果:");
System.out.println("👤 " + user);
System.out.println("📦 " + order);
System.out.println("🛍️ " + recommend);
} catch (Exception e) {
System.err.println("聚合异常:" + e.getMessage());
} finally {
executor.shutdown();
}
});
// 主线程等待(也可以用 join)
all.join();
}
private static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException ignored) {}
}
}版权所有
版权归属:FelixJY
