引言
在现代Java开发中,多线程编程已成为构建高性能、高可用应用的核心技术之一。随着业务复杂度的增加和硬件资源的丰富,如何有效地管理和控制并发执行成为了开发者面临的重要挑战。Java并发包(java.util.concurrent,简称JUC)为解决这一问题提供了丰富的工具类和框架。
本文将深入剖析JUC包中的核心组件,包括AQS、CountDownLatch、CyclicBarrier等关键类的源码实现,并结合实际业务场景,讲解线程池配置优化和并发安全实现方案。通过理论与实践相结合的方式,帮助开发者掌握Java并发编程的核心技术和最佳实践。
Java并发编程基础
什么是并发编程
并发编程是指程序能够同时处理多个任务的技术。在Java中,线程是实现并发的基本单位。每个线程都有自己的执行路径和栈空间,可以独立执行代码片段。
JUC包概述
java.util.concurrent包是Java并发编程的核心库,它提供了丰富的并发工具类,包括:
- 线程池相关类(ExecutorService、ThreadPoolExecutor等)
- 同步工具类(CountDownLatch、CyclicBarrier、Semaphore等)
- 并发集合类(ConcurrentHashMap、CopyOnWriteArrayList等)
- 原子类(AtomicInteger、AtomicReference等)
AQS源码深度解析
AQS简介
AbstractQueuedSynchronizer(AQS)是JUC包中所有同步组件的基类,它提供了一种框架来实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器。AQS的核心思想是将共享资源的状态管理抽象出来,让具体的同步组件实现其特定的获取和释放逻辑。
核心数据结构
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
// 同步队列头节点
private transient volatile Node head;
// 同步队列尾节点
private transient volatile Node tail;
// 共享资源状态
private volatile int state;
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 节点状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 持有该节点的线程
volatile Thread thread;
// 下一个等待共享锁的节点
Node nextWaiter;
}
}
核心实现机制
AQS的核心是通过CAS操作来管理同步状态,其主要方法包括:
- 获取资源:
acquire(int arg)- 通过tryAcquire尝试获取资源 - 释放资源:
release(int arg)- 通过tryRelease释放资源 - 共享模式获取:
acquireShared(int arg)- 共享模式下的获取 - 共享模式释放:
releaseShared(int arg)- 共享模式下的释放
// acquire方法核心实现
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速尝试将节点加入队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果快速插入失败,则通过自旋方式入队
enq(node);
return node;
}
private boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果前驱节点是头节点,则尝试获取资源
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果获取失败,判断是否需要挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
实际应用示例
以ReentrantLock为例,展示AQS的应用:
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
// 独占获取
final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 无锁状态,尝试获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 已经持有锁的线程,增加计数
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
核心同步工具类分析
CountDownLatch源码解析
CountDownLatch是一个同步辅助类,允许一个或多个线程等待其他线程完成操作。
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
}
CyclicBarrier源码分析
CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达一个屏障点。
public class CyclicBarrier {
// 内部类,用于管理屏障状态
private static class Generation {
boolean broken = false;
}
private final int parties; // 线程数量
private final Runnable barrierAction; // 屏障动作
private volatile int count; // 当前等待线程数
private final Generation generation = new Generation();
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierAction = barrierAction;
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException, TimeoutException {
final Generation g = generation;
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int index = --count;
if (index == 0) { // 线程数量达到预期
boolean ranAction = false;
try {
final Runnable command = barrierAction;
if (command != null)
command.run();
ranAction = true;
nextGeneration(); // 重置屏障
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 等待其他线程到达
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.await(nanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException ie) {
if (g == generation && !broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
}
Semaphore源码解析
Semaphore是一个计数信号量,用于控制同时访问特定资源的线程数量。
public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
final int tryAcquireShared(int acquires) {
for (;;) {
int current = getState();
int next = current - acquires;
if (next < 0 || compareAndSetState(current, next))
return next;
}
}
final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (compareAndSetState(current, next))
return true;
}
}
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryAcquire() {
return sync.tryAcquireShared(1) >= 0;
}
public void release() {
sync.releaseShared(1);
}
}
线程池深度解析与优化
线程池核心组件
线程池是并发编程中最重要的概念之一,它通过复用线程来减少创建和销毁线程的开销。
public class ThreadPoolExecutor extends AbstractExecutorService {
// 线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 线程池状态和线程数量的组合
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 工作队列
private final BlockingQueue<Runnable> workQueue;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲时间
private volatile int keepAliveTime;
// 最大线程数
private volatile int maximumPoolSize;
// 核心线程数
private volatile int corePoolSize;
// 工作线程数量
private volatile int poolSize;
// 任务计数器
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
}
线程池工作原理
线程池的工作流程如下:
- 提交任务:当提交一个任务时,首先检查核心线程数是否已满
- 队列排队:如果核心线程都在工作,则将任务放入阻塞队列
- 创建新线程:如果队列满了且线程数小于最大线程数,则创建新线程
- 拒绝策略:如果所有资源都被占用,执行拒绝策略
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果当前工作线程小于核心线程数,直接创建新线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 检查线程池是否处于运行状态
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池状态改变,移除任务并执行拒绝策略
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果无法入队,则尝试创建新线程
else if (!addWorker(command, false))
reject(command);
}
线程池配置优化实战
核心参数详解
// 自定义线程池配置示例
public class ThreadPoolConfig {
public static ExecutorService createOptimizedThreadPool() {
// 根据CPU核心数计算最优线程数
int processors = Runtime.getRuntime().availableProcessors();
// 对于CPU密集型任务,线程数=CPU核心数
// 对于IO密集型任务,线程数=CPU核心数 * 2
int corePoolSize = processors;
int maximumPoolSize = processors * 2;
int keepAliveTime = 60; // 秒
return new ThreadPoolExecutor(
corePoolSize, // 核心线程数
maximumPoolSize, // 最大线程数
keepAliveTime, // 空闲时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(1000), // 工作队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}
// 针对不同场景的线程池配置
public static ExecutorService createCpuIntensivePool() {
int processors = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
processors, // 核心线程数等于CPU核心数
processors, // 最大线程数
0L, // 空闲时间
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
}
public static ExecutorService createIoIntensivePool() {
int processors = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
processors * 2, // IO密集型任务需要更多线程
processors * 4,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
性能监控与调优
public class ThreadPoolMonitor {
public static void monitorThreadPool(ThreadPoolExecutor executor) {
// 获取线程池状态信息
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("当前活跃线程数: " + executor.getActiveCount());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
System.out.println("总任务数: " + executor.getTaskCount());
System.out.println("队列大小: " + executor.getQueue().size());
// 监控线程池性能
if (executor.getQueue().size() > 100) {
System.err.println("警告:任务队列过长,可能存在性能问题");
}
if (executor.getActiveCount() == executor.getMaximumPoolSize()) {
System.err.println("警告:线程池已达到最大容量");
}
}
// 自定义拒绝策略
public static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录被拒绝的任务
System.err.println("任务被拒绝: " + r.toString());
// 可以选择重试、记录日志或抛出异常
try {
// 等待一段时间后重试
Thread.sleep(100);
executor.execute(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
并发安全实现方案
原子类详解
原子类是Java并发编程中重要的基础工具,它们提供了无锁的线程安全操作。
public class AtomicExample {
// 原子整数
private final AtomicInteger atomicInteger = new AtomicInteger(0);
// 原子引用
private final AtomicReference<String> atomicReference = new AtomicReference<>("initial");
// 原子数组
private final AtomicIntegerArray atomicArray = new AtomicIntegerArray(new int[]{1, 2, 3, 4, 5});
public void demonstrateAtomicOperations() {
// 基本操作
int currentValue = atomicInteger.get();
int newValue = atomicInteger.incrementAndGet(); // 自增并返回新值
// CAS操作
boolean success = atomicInteger.compareAndSet(currentValue, newValue);
// 更新操作
atomicReference.set("new value");
String oldValue = atomicReference.getAndSet("another value");
// 数组操作
int arrayValue = atomicArray.get(0);
atomicArray.incrementAndGet(0);
}
// 原子更新字段
private static class Person {
volatile int age;
final AtomicIntegerFieldUpdater<Person> updater =
AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
public void updateAge(int newAge) {
updater.set(this, newAge);
}
}
}
并发集合类应用
public class ConcurrentCollectionExample {
// 线程安全的HashMap
private final ConcurrentHashMap<String, Integer> concurrentHashMap =
new ConcurrentHashMap<>();
// 线程安全的列表
private final CopyOnWriteArrayList<String> copyOnWriteList =
new CopyOnWriteArrayList<>();
// 线程安全的Set
private final CopyOnWriteArraySet<String> copyOnWriteSet =
new CopyOnWriteArraySet<>();
public void demonstrateConcurrentCollections() {
// ConcurrentHashMap操作
concurrentHashMap.put("key1", 1);
Integer value = concurrentHashMap.get("key1");
Integer oldValue = concurrentHashMap.putIfAbsent("key2", 2);
// CopyOnWriteArrayList操作
copyOnWriteList.add("item1");
String item = copyOnWriteList.get(0);
// CopyOnWriteArraySet操作
copyOnWriteSet.add("setItem1");
boolean contains = copyOnWriteSet.contains("setItem1");
}
// 实际应用场景:缓存实现
public class Cache<K, V> {
private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
public V get(K key) {
return cache.get(key);
}
public V put(K key, V value) {
return cache.put(key, value);
}
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
return cache.computeIfAbsent(key, mappingFunction);
}
}
}
实际业务场景应用
高并发数据处理场景
public class HighConcurrencyProcessor {
// 使用线程池处理大量数据
private final ExecutorService executor =
new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 使用CountDownLatch协调任务完成
public void processBatchData(List<String> data) throws InterruptedException {
int batchSize = 100;
CountDownLatch latch = new CountDownLatch(data.size());
for (int i = 0; i < data.size(); i += batchSize) {
int end = Math.min(i + batchSize, data.size());
List<String> batch = data.subList(i, end);
executor.submit(() -> {
try {
processBatch(batch);
} finally {
latch.countDown();
}
});
}
latch.await(); // 等待所有批次处理完成
}
private void processBatch(List<String> batch) {
for (String item : batch) {
// 处理单个数据项
processItem(item);
}
}
private void processItem(String item) {
// 模拟数据处理逻辑
try {
Thread.sleep(10); // 模拟IO操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
分布式任务调度场景
public class DistributedTaskScheduler {
private final ExecutorService executor =
new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500)
);
private final CyclicBarrier barrier = new CyclicBarrier(10);
public void scheduleDistributedTasks(List<Runnable> tasks) {
List<Future<?>> futures = new ArrayList<>();
for (Runnable task : tasks) {
Future<?> future = executor.submit(() -> {
try {
// 执行任务前等待其他节点准备就绪
barrier.await();
// 执行具体任务
task.run();
// 任务完成后通知其他节点
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
futures.add(future);
}
// 等待所有任务完成
for (Future<?> future : futures) {
try {
future.get(30, TimeUnit.SECONDS);
} catch (Exception e) {
System.err.println("任务执行失败: " + e.getMessage());
}
}
}
}
性能调优最佳实践
线程池调优策略
public class ThreadPoolOptimization {
// 根据任务类型选择合适的线程池
public static ExecutorService createTaskSpecificPool(TaskType type) {
switch (type) {
case CPU_INTENSIVE:
return createCpuIntensivePool();
case IO_INTENSIVE:
return createIoIntensivePool();
case MIXED:
return createMixedPool();
default:
return Executors.newFixedThreadPool(10);
}
}
private static ExecutorService createCpuIntensivePool() {
int processors = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
processors,
processors,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
}
private static ExecutorService createIoIntensivePool() {
int processors = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
processors * 2,
processors * 4,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
private static ExecutorService createMixedPool() {
return new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
enum TaskType {
CPU_INTENSIVE, IO_INTENSIVE, MIXED
}
// 监控线程池性能的工具类
public static class ThreadPoolMetrics {
private final ThreadPoolExecutor executor;
private final ScheduledExecutorService monitor =
Executors.newScheduledThreadPool(1);
public ThreadPoolMetrics(ThreadPoolExecutor executor) {
this.executor = executor;
}
public void startMonitoring() {
monitor.scheduleAtFixedRate(() -> {
System.out.println("线程池监控信息:");
System.out.println(" 核心线程数: " + executor.getCorePoolSize());
System.out.println(" 活跃线程数: " + executor.getActiveCount());
System.out.println(" 最大线程数: " + executor.getMaximumPoolSize());
System.out.println(" 队列大小: " + executor.getQueue().size());
System.out.println(" 完成任务数: " + executor.getCompletedTaskCount());
}, 0, 5, TimeUnit.SECONDS);
}
}
}
内存优化策略
public class MemoryOptimization {
// 使用对象池减少GC压力
private static final ObjectPool<StringBuilder> stringBuilderPool =
new GenericObjectPool<>(StringBuilder::new);
public void efficientStringBuilding() {
StringBuilder sb = null;
try {
sb = stringBuilderPool.borrowObject();
sb.setLength(0); // 清空缓冲区
// 执行字符串构建操作
for (int i = 0; i < 1000; i++) {
sb.append("item").append(i).append(",");
}
String result = sb.toString();
// 处理结果...
} finally {
if (sb != null) {
stringBuilderPool.returnObject(sb);
}
}
}
// 使用ThreadLocal减少对象创建
private static final ThreadLocal<SimpleDateFormat> dateFormat =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
public String formatTime(long time) {
return dateFormat.get().format(new Date(time));
}
// 缓存优化
private final Map<String, Future<String>> cache =
new ConcurrentHashMap<>();
public String getCachedResult(String key, Callable<String> task) {
Future<String> future = cache.get(key);
if (future == null) {
future = executor.submit(task);
Future<String> existing = cache.putIfAbsent(key, future);
if (existing != null) {
future =
评论 (0)