引言
在现代Java开发中,多线程编程已成为构建高性能应用的核心技能。随着硬件架构的发展和业务需求的增长,如何有效地处理并发问题,避免竞态条件、死锁等常见问题,成为了每个Java开发者必须掌握的技能。Java并发包(java.util.concurrent,简称JUC)为开发者提供了丰富的并发工具类和框架,使得编写高效、安全的多线程程序变得更加简单。
本文将深入分析JUC包的核心组件,包括AQS框架、原子类、并发集合、线程池等,并通过大量实际代码示例演示如何正确使用这些组件构建高并发、高性能的Java应用。我们将重点关注常见的并发安全问题和性能瓶颈,并提供相应的最佳实践指导。
Java并发编程基础
并发与并行的区别
在深入JUC包之前,我们首先需要明确并发(Concurrency)与并行(Parallelism)的区别:
- 并发:多个任务在同一时间段内交替执行,通过时间片轮转实现
- 并行:多个任务真正同时执行,需要多核CPU支持
Java中的并发编程主要解决的是如何在单个或多个线程中安全地处理共享资源的问题。
线程安全的核心概念
线程安全是指多个线程访问同一共享资源时,程序能够正确执行而不会产生错误结果。关键要素包括:
- 原子性:操作不可分割
- 可见性:一个线程对共享变量的修改对其他线程可见
- 有序性:指令重排序不影响程序正确性
AQS框架深度解析
AQS简介
AbstractQueuedSynchronizer(AQS)是Java并发包中的核心框架,它提供了一个用于实现依赖于先进先出(FIFO)等待队列的阻塞同步器的基础框架。许多并发组件如ReentrantLock、Semaphore、CountDownLatch等都是基于AQS构建的。
AQS的核心机制
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
// 节点状态
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 同步队列头节点
private transient volatile Node head;
// 同步队列尾节点
private transient volatile Node tail;
// 独占模式下同步状态
private volatile int state;
// 节点类
static final class Node {
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
Thread thread; // 等待线程
Node nextWaiter; // 等待队列中的下一个节点
int waitStatus; // 节点状态
}
}
AQS的工作原理
AQS通过以下机制实现同步控制:
- 状态管理:使用
state变量管理同步状态 - 队列机制:当获取锁失败时,线程会被加入等待队列
- CAS操作:通过原子操作修改状态和节点信息
- 阻塞唤醒:使用
LockSupport.park()和unpark()进行线程阻塞
自定义AQS示例
public class MyLock extends AbstractQueuedSynchronizer {
private static final int LOCKED = 1;
private static final int UNLOCKED = 0;
public void lock() {
// 尝试获取锁,如果失败则加入等待队列
if (compareAndSetState(UNLOCKED, LOCKED)) {
setExclusiveOwnerThread(Thread.currentThread());
} else {
acquire(1);
}
}
public void unlock() {
// 释放锁
setState(UNLOCKED);
setExclusiveOwnerThread(null);
}
@Override
protected boolean tryAcquire(int arg) {
return compareAndSetState(UNLOCKED, LOCKED);
}
@Override
protected boolean tryRelease(int arg) {
setState(UNLOCKED);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == LOCKED;
}
}
原子类详解
原子类的核心价值
原子类是JUC包中处理并发安全的轻量级解决方案。它们通过底层的CAS(Compare-And-Swap)操作保证操作的原子性,避免了传统synchronized关键字带来的性能开销。
常用原子类介绍
AtomicInteger
public class AtomicIntegerExample {
private static final AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
// 原子递增操作
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
counter.incrementAndGet(); // 等价于 counter++
}).start();
}
Thread.sleep(1000);
System.out.println("最终计数器值: " + counter.get());
}
}
AtomicLong
public class AtomicLongExample {
private static final AtomicLong counter = new AtomicLong(0L);
public static void main(String[] args) {
// 多线程环境下的安全计数
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10000; i++) {
executor.submit(() -> {
counter.incrementAndGet();
});
}
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("最终计数: " + counter.get());
}
}
AtomicReference
public class AtomicReferenceExample {
static class Person {
private String name;
private int age;
public Person(String name, int age) {
this.name = name;
this.age = age;
}
// getter和setter方法
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public int getAge() { return age; }
public void setAge(int age) { this.age = age; }
}
public static void main(String[] args) {
AtomicReference<Person> personRef = new AtomicReference<>(new Person("张三", 25));
// 原子更新对象
personRef.updateAndGet(person -> new Person(person.getName(), person.getAge() + 1));
System.out.println("姓名: " + personRef.get().getName() +
", 年龄: " + personRef.get().getAge());
}
}
原子类的性能优势
相比传统的synchronized关键字,原子类具有以下优势:
public class AtomicVsSynchronized {
private static int syncCounter = 0;
private static final AtomicInteger atomicCounter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis();
// 测试synchronized性能
testSync();
long syncTime = System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();
// 测试原子类性能
testAtomic();
long atomicTime = System.currentTimeMillis() - startTime;
System.out.println("Synchronized耗时: " + syncTime + "ms");
System.out.println("Atomic耗时: " + atomicTime + "ms");
}
private static void testSync() {
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
synchronized (AtomicVsSynchronized.class) {
syncCounter++;
}
latch.countDown();
});
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
}
private static void testAtomic() {
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
atomicCounter.incrementAndGet();
latch.countDown();
});
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
}
}
并发集合详解
ConcurrentHashMap深入分析
ConcurrentHashMap是JUC包中最核心的并发集合类之一,它在保证线程安全的同时提供了极高的并发性能。
public class ConcurrentHashMapExample {
private static final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
public static void main(String[] args) {
// 并发写入测试
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
final int index = i;
executor.submit(() -> {
map.put("key" + index, index);
latch.countDown();
});
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Map大小: " + map.size());
executor.shutdown();
}
}
其他重要并发集合
CopyOnWriteArrayList
public class CopyOnWriteArrayListExample {
private static final CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
public static void main(String[] args) {
// 读写分离,适合读多写少的场景
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(100);
// 多线程添加元素
for (int i = 0; i < 50; i++) {
final int index = i;
executor.submit(() -> {
list.add("element" + index);
latch.countDown();
});
}
// 多线程遍历(不会抛出ConcurrentModificationException)
for (int i = 0; i < 50; i++) {
executor.submit(() -> {
for (String element : list) {
System.out.println(element);
}
latch.countDown();
});
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
}
}
BlockingQueue
public class BlockingQueueExample {
private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) throws InterruptedException {
ExecutorService producerExecutor = Executors.newFixedThreadPool(3);
ExecutorService consumerExecutor = Executors.newFixedThreadPool(2);
// 生产者
for (int i = 0; i < 5; i++) {
final int index = i;
producerExecutor.submit(() -> {
try {
for (int j = 0; j < 10; j++) {
queue.put(index * 10 + j);
System.out.println("生产: " + (index * 10 + j));
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 消费者
for (int i = 0; i < 2; i++) {
consumerExecutor.submit(() -> {
try {
while (true) {
Integer value = queue.take();
System.out.println("消费: " + value);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
Thread.sleep(5000);
producerExecutor.shutdown();
consumerExecutor.shutdown();
}
}
线程池详解与最佳实践
线程池核心概念
线程池是管理线程生命周期的工具,它避免了频繁创建和销毁线程带来的开销。
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建固定大小的线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// 创建缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 创建单线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 创建定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(3);
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedPool.submit(() -> {
System.out.println("任务 " + taskId + " 执行线程: " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
fixedPool.shutdown();
}
}
线程池参数详解
public class ThreadPoolConfigExample {
public static void main(String[] args) {
// 自定义线程池配置
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 工作队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 提交任务
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("执行任务 " + taskId +
",当前线程: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
线程池最佳实践
1. 合理设置线程池参数
public class ThreadPoolBestPractices {
/**
* 根据任务类型计算最优线程数
*/
public static int calculateOptimalThreads() {
// CPU密集型任务
int cpuCores = Runtime.getRuntime().availableProcessors();
int cpuIntensiveThreads = cpuCores;
// IO密集型任务
int ioIntensiveThreads = cpuCores * 2;
return cpuIntensiveThreads; // 示例返回CPU密集型线程数
}
/**
* 线程池配置示例
*/
public static ExecutorService createOptimalThreadPool() {
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = corePoolSize * 2;
long keepAliveTime = 60L;
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
public static void main(String[] args) {
ExecutorService executor = createOptimalThreadPool();
// 任务执行
for (int i = 0; i < 100; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("处理任务 " + taskId +
",线程: " + Thread.currentThread().getName());
// 模拟业务处理
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
2. 合理选择拒绝策略
public class RejectionPolicyExample {
public static void main(String[] args) throws InterruptedException {
// 1. AbortPolicy - 直接抛出异常
testRejectionPolicy(new ThreadPoolExecutor.AbortPolicy());
// 2. CallerRunsPolicy - 调用者线程执行
testRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy());
// 3. DiscardPolicy - 直接丢弃
testRejectionPolicy(new ThreadPoolExecutor.DiscardPolicy());
// 4. DiscardOldestPolicy - 丢弃最老的任务
testRejectionPolicy(new ThreadPoolExecutor.DiscardOldestPolicy());
}
private static void testRejectionPolicy(ThreadPoolExecutor.RejectedExecutionHandler handler) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 2, 1L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),
handler
);
// 提交足够多的任务触发拒绝策略
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("执行任务 " + taskId);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
}
3. 监控线程池状态
public class ThreadPoolMonitor {
public static void monitorThreadPool(ThreadPoolExecutor executor) {
ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
monitor.scheduleAtFixedRate(() -> {
System.out.println("=== 线程池监控 ===");
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("当前活跃线程数: " + executor.getActiveCount());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
System.out.println("总任务数: " + executor.getTaskCount());
System.out.println("队列大小: " + executor.getQueue().size());
System.out.println("================");
}, 0, 2, TimeUnit.SECONDS);
}
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
monitorThreadPool(executor);
// 提交任务
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("处理任务 " + taskId);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 保持程序运行
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
}
}
并发安全问题与解决方案
常见并发问题分析
1. 竞态条件
public class RaceConditionExample {
// 不安全的计数器
private static int unsafeCounter = 0;
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
unsafeCounter++; // 竞态条件
latch.countDown();
});
}
latch.await();
System.out.println("不安全计数器结果: " + unsafeCounter); // 结果通常小于1000
executor.shutdown();
}
}
2. 死锁问题
public class DeadlockExample {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("线程1获取锁1");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("线程1等待锁2");
synchronized (lock2) {
System.out.println("线程1获取锁2");
}
}
});
Thread thread2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("线程2获取锁2");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("线程2等待锁1");
synchronized (lock1) {
System.out.println("线程2获取锁1");
}
}
});
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
解决方案
1. 使用synchronized关键字
public class SynchronizedSolution {
private static int safeCounter = 0;
public static synchronized void increment() {
safeCounter++;
}
public static synchronized int getCounter() {
return safeCounter;
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
increment();
latch.countDown();
});
}
latch.await();
System.out.println("安全计数器结果: " + getCounter()); // 结果为1000
executor.shutdown();
}
}
2. 使用Lock接口
public class LockSolution {
private static final ReentrantLock lock = new ReentrantLock();
private static int counter = 0;
public static void increment() {
lock.lock();
try {
counter++;
} finally {
lock.unlock();
}
}
public static int getCounter() {
lock.lock();
try {
return counter;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
increment();
latch.countDown();
});
}
latch.await();
System.out.println("Lock安全计数器结果: " + getCounter());
executor.shutdown();
}
}
性能优化与最佳实践
1. 线程池调优策略
public class ThreadPoolTuning {
/**
* 根据任务类型选择合适的线程池
*/
public static ExecutorService createTaskSpecificPool(int taskType) {
switch (taskType) {
case 1: // CPU密集型任务
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
case 2: // IO密集型任务
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
Runtime.getRuntime().availableProcessors() * 2,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100)
);
default:
return Executors.newFixedThreadPool(5);
}
}
public static void main(String[] args) {
ExecutorService executor = createTaskSpecificPool(2); // IO密集型
// 模拟IO密集型任务
for (int i = 0; i < 100; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("开始处理任务 " + taskId);
try {
// 模拟IO等待
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("完成任务 " + taskId);
});
}
executor.shutdown();
}
}
2. 减少锁竞争
public class LockContentionReduction {
// 使用读写锁减少写操作对读操作的影响
private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static final Lock readLock = readWriteLock.readLock();
private static final Lock writeLock = readWriteLock.writeLock();
private static volatile Map<String, String> dataMap = new ConcurrentHashMap<>();
public static String getData(String key) {
readLock.lock();
try {
return dataMap.get(key);
} finally {
readLock.unlock();
}
}
public static void putData(String key, String value) {
writeLock.lock();
try {
dataMap.put(key, value);
} finally {
writeLock.unlock();
}
}
// 使用原子类减少锁竞争
private static final AtomicLong atomicCounter = new AtomicLong(0);
public static long incrementCounter() {
return atomicCounter.incrementAndGet();
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(1000);
// 测试原子类性能
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
incrementCounter();
latch.countDown();
});
}
latch.await();
System.out.println("原子计数器结果: " + atomicCounter.get());
executor.shutdown();
}
}
3. 合理使用并发工具类
public class ConcurrentUtilsExample {
/**
* 使用CountDownLatch等待所有任务完成
*/
public static void useCountDownLatch() throws InterruptedException {
int taskCount = 10;
CountDownLatch latch = new CountDownLatch(taskCount);
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("任务 " + taskId + " 开始执行");
Thread.sleep(1000);
System.out.println("任务 " + taskId + " 执行完成");

评论 (0)