引言
在现代Java开发中,并发编程已成为构建高性能应用的核心技能。随着多核处理器的普及和业务需求的复杂化,如何高效地管理多线程、避免竞态条件、提升系统吞吐量成为了每个开发者必须面对的挑战。Java并发工具包(JUC, java.util.concurrent)为开发者提供了丰富的并发编程工具,包括线程池、同步器、锁机制等核心组件。
本文将深入解析JUC包中的核心类,探讨CountDownLatch、CyclicBarrier、Semaphore等同步工具的使用场景,并结合实际案例分享线程池配置优化的最佳实践,帮助开发者构建更加健壮和高效的并发应用。
JUC包核心组件概述
并发编程的核心概念
Java并发编程的核心在于处理多个线程同时访问共享资源时可能出现的问题。主要问题包括:
- 竞态条件(Race Condition):多个线程同时修改共享变量导致数据不一致
- 死锁(Deadlock):线程相互等待对方释放资源导致程序停滞
- 内存可见性问题:一个线程对变量的修改对其他线程不可见
- 原子性问题:看似简单的操作实际上不是原子性的
JUC包通过提供高级并发工具来解决这些问题,让开发者能够更专注于业务逻辑而非底层同步细节。
JUC包主要组件分类
JUC包主要包含以下几类核心组件:
- 线程池相关:Executor、ExecutorService、ScheduledExecutorService等
- 同步器:CountDownLatch、CyclicBarrier、Semaphore、ReentrantLock等
- 并发集合:ConcurrentHashMap、BlockingQueue等
- 原子操作类:AtomicInteger、AtomicReference等
- 工具类:Future、Callable、CompletableFuture等
CountDownLatch深度解析
基本概念与工作原理
CountDownLatch是一个同步辅助类,它允许一个或多个线程等待其他线程完成操作。它的核心思想是通过一个计数器来控制线程的等待行为。
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 3;
CountDownLatch latch = new CountDownLatch(threadCount);
// 创建多个工作线程
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("Thread " + taskId + " is working...");
Thread.sleep(1000); // 模拟工作耗时
System.out.println("Thread " + taskId + " finished");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 计数器减1
}
}).start();
}
System.out.println("Waiting for all threads to complete...");
latch.await(); // 等待所有线程完成
System.out.println("All threads completed, continuing main thread...");
}
}
实际应用场景
1. 并发测试场景
public class ConcurrentTestExample {
private static final int THREAD_COUNT = 10;
private static final int ITERATIONS = 1000;
public static void main(String[] args) throws InterruptedException {
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
long startTime = System.currentTimeMillis();
// 启动多个并发线程
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
try {
startLatch.await(); // 等待开始信号
// 执行测试任务
for (int j = 0; j < ITERATIONS; j++) {
performTask();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
endLatch.countDown();
}
}).start();
}
// 同时启动所有线程
startLatch.countDown();
// 等待所有线程完成
endLatch.await();
long endTime = System.currentTimeMillis();
System.out.println("Total time: " + (endTime - startTime) + "ms");
}
private static void performTask() {
// 模拟业务逻辑
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
2. 资源初始化场景
public class ResourceInitializationExample {
private static final CountDownLatch latch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException {
// 启动资源初始化线程
new Thread(() -> {
initializeDatabase();
latch.countDown();
}).start();
new Thread(() -> {
initializeCache();
latch.countDown();
}).start();
new Thread(() -> {
initializeConfig();
latch.countDown();
}).start();
// 等待所有资源初始化完成
latch.await();
System.out.println("All resources initialized successfully");
}
private static void initializeDatabase() {
try {
Thread.sleep(1000);
System.out.println("Database initialized");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void initializeCache() {
try {
Thread.sleep(500);
System.out.println("Cache initialized");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void initializeConfig() {
try {
Thread.sleep(300);
System.out.println("Configuration initialized");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
最佳实践与注意事项
- 计数器不可重置:CountDownLatch的计数器一旦减到0就不能再重置,需要创建新的实例
- 避免死锁:确保所有线程都能正确调用countDown()方法
- 异常处理:在finally块中调用countDown()以确保计数器正确减少
CyclicBarrier深度解析
基本概念与工作原理
CyclicBarrier是一个同步辅助类,它允许一组线程互相等待直到所有线程都到达某个屏障点。与CountDownLatch不同的是,CyclicBarrier可以重复使用。
public class CyclicBarrierExample {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("All threads reached the barrier, continuing...");
});
for (int i = 0; i < threadCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("Thread " + taskId + " is working...");
Thread.sleep(1000);
System.out.println("Thread " + taskId + " reached barrier");
// 等待其他线程到达屏障点
barrier.await();
System.out.println("Thread " + taskId + " continues after barrier");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
实际应用场景
1. 分布式计算场景
public class DistributedCalculationExample {
private static final int WORKER_COUNT = 4;
private static final int DATA_CHUNK_SIZE = 1000;
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(WORKER_COUNT, () -> {
System.out.println("All workers finished their chunk processing");
});
ExecutorService executor = Executors.newFixedThreadPool(WORKER_COUNT);
for (int i = 0; i < WORKER_COUNT; i++) {
final int workerId = i;
executor.submit(() -> {
try {
// 模拟处理数据块
processChunk(workerId * DATA_CHUNK_SIZE,
(workerId + 1) * DATA_CHUNK_SIZE);
System.out.println("Worker " + workerId + " completed chunk");
// 等待其他工作线程完成
barrier.await();
// 执行汇总操作
if (workerId == 0) {
System.out.println("Performing final aggregation...");
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
private static void processChunk(int start, int end) throws InterruptedException {
// 模拟数据处理
Thread.sleep(500);
System.out.println("Processing chunk from " + start + " to " + end);
}
}
2. 性能测试场景
public class PerformanceTestExample {
private static final int THREAD_COUNT = 10;
private static final int TEST_ITERATIONS = 10000;
public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT);
CountDownLatch finishLatch = new CountDownLatch(THREAD_COUNT);
long[] times = new long[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
final int threadId = i;
new Thread(() -> {
try {
// 等待所有线程准备就绪
barrier.await();
long startTime = System.currentTimeMillis();
// 执行测试操作
for (int j = 0; j < TEST_ITERATIONS; j++) {
performOperation();
}
long endTime = System.currentTimeMillis();
times[threadId] = endTime - startTime;
System.out.println("Thread " + threadId + " completed in " +
times[threadId] + "ms");
} catch (Exception e) {
e.printStackTrace();
} finally {
finishLatch.countDown();
}
}).start();
}
// 等待所有测试完成
finishLatch.await();
// 计算平均时间
long totalTime = 0;
for (long time : times) {
totalTime += time;
}
System.out.println("Average time: " + (totalTime / THREAD_COUNT) + "ms");
}
private static void performOperation() {
// 模拟业务操作
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
最佳实践与注意事项
- 重置机制:CyclicBarrier支持重复使用,但需要谨慎处理回调函数的执行时机
- 异常处理:await()方法可能抛出BrokenBarrierException,需要适当处理
- 性能考虑:避免在回调函数中执行耗时操作,影响整体性能
Semaphore深度解析
基本概念与工作原理
Semaphore是一个计数信号量,用于控制同时访问特定资源的线程数量。它通过维护一个许可集合来控制并发访问。
public class SemaphoreExample {
public static void main(String[] args) {
// 创建一个最多允许3个线程同时访问的信号量
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 5; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("Thread " + taskId + " is requesting access");
// 获取许可
semaphore.acquire();
System.out.println("Thread " + taskId + " got access");
// 模拟处理时间
Thread.sleep(2000);
System.out.println("Thread " + taskId + " releasing access");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放许可
semaphore.release();
}
}).start();
}
}
}
实际应用场景
1. 连接池管理
public class ConnectionPoolExample {
private static final int MAX_CONNECTIONS = 5;
private static final Semaphore semaphore = new Semaphore(MAX_CONNECTIONS);
private static final List<Connection> pool = new ArrayList<>();
static {
// 初始化连接池
for (int i = 0; i < MAX_CONNECTIONS; i++) {
pool.add(new Connection("Connection-" + i));
}
}
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 15; i++) {
final int taskId = i;
executor.submit(() -> {
try {
// 获取数据库连接
Connection connection = getConnection();
System.out.println("Thread " + taskId + " got connection");
// 模拟数据库操作
Thread.sleep(1000);
// 释放连接
releaseConnection(connection);
} catch (Exception e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
private static Connection getConnection() throws InterruptedException {
semaphore.acquire();
return pool.remove(0); // 简化实现,实际应使用更复杂的机制
}
private static void releaseConnection(Connection connection) {
pool.add(connection);
semaphore.release();
}
static class Connection {
private final String name;
public Connection(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
}
}
2. 限流控制
public class RateLimiterExample {
private static final Semaphore semaphore = new Semaphore(10); // 每秒最多10个请求
private static final AtomicInteger requestCount = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(20);
// 模拟每秒100个请求,但限制为每秒10个
for (int i = 0; i < 100; i++) {
final int requestId = i;
executor.submit(() -> {
try {
// 尝试获取许可(限流)
if (semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)) {
System.out.println("Request " + requestId + " processed at " +
System.currentTimeMillis());
requestCount.incrementAndGet();
// 模拟处理时间
Thread.sleep(50);
semaphore.release();
} else {
System.out.println("Request " + requestId + " rejected due to rate limiting");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("Total processed requests: " + requestCount.get());
}
}
最佳实践与注意事项
- 合理设置许可数量:根据系统资源和业务需求设置合适的许可数
- 避免死锁:确保获取和释放许可的配对使用
- 性能优化:对于频繁的许可获取/释放操作,考虑使用tryAcquire()方法避免阻塞
线程池配置优化策略
线程池核心参数详解
线程池的核心配置参数包括:
- corePoolSize:核心线程数,即使空闲也会保持
- maximumPoolSize:最大线程数,超过核心线程数的线程会在空闲时被回收
- keepAliveTime:非核心线程的存活时间
- workQueue:工作队列,用于存储等待执行的任务
- threadFactory:创建新线程时使用的工厂类
- handler:拒绝策略,当任务无法被处理时的处理方式
线程池类型选择
public class ThreadPoolConfigurationExample {
// 1. 固定大小线程池
public static ExecutorService fixedThreadPool() {
return Executors.newFixedThreadPool(10);
}
// 2. 缓存线程池
public static ExecutorService cachedThreadPool() {
return Executors.newCachedThreadPool();
}
// 3. 单线程池
public static ExecutorService singleThreadExecutor() {
return Executors.newSingleThreadExecutor();
}
// 4. 定时任务线程池
public static ScheduledExecutorService scheduledThreadPool() {
return Executors.newScheduledThreadPool(5);
}
// 5. 自定义线程池(推荐)
public static ExecutorService customThreadPool() {
return new ThreadPoolExecutor(
10, // corePoolSize
20, // maximumPoolSize
60L, // keepAliveTime
TimeUnit.SECONDS, // time unit
new LinkedBlockingQueue<>(100), // work queue
Executors.defaultThreadFactory(), // thread factory
new ThreadPoolExecutor.CallerRunsPolicy() // rejection handler
);
}
}
性能调优最佳实践
1. 基于任务特性的配置
public class TaskBasedThreadPoolConfig {
/**
* CPU密集型任务优化配置
*/
public static ExecutorService cpuIntensivePool() {
int processors = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
processors, // 核心线程数等于CPU核心数
processors * 2, // 最大线程数
60L, // 空闲时间
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 队列大小
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
/**
* IO密集型任务优化配置
*/
public static ExecutorService ioIntensivePool() {
int processors = Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(
processors, // 核心线程数
processors * 4, // 最大线程数(IO密集型需要更多线程)
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000), // 使用有界队列
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}
2. 监控与调优
public class ThreadPoolMonitor {
private final ThreadPoolExecutor executor;
private final ScheduledExecutorService monitor;
public ThreadPoolMonitor(ThreadPoolExecutor executor) {
this.executor = executor;
this.monitor = Executors.newSingleThreadScheduledExecutor();
startMonitoring();
}
private void startMonitoring() {
monitor.scheduleAtFixedRate(() -> {
System.out.println("=== Thread Pool Status ===");
System.out.println("Core Pool Size: " + executor.getCorePoolSize());
System.out.println("Maximum Pool Size: " + executor.getMaximumPoolSize());
System.out.println("Current Pool Size: " + executor.getPoolSize());
System.out.println("Active Threads: " + executor.getActiveCount());
System.out.println("Completed Tasks: " + executor.getCompletedTaskCount());
System.out.println("Total Tasks: " + executor.getTaskCount());
System.out.println("Queue Size: " + executor.getQueue().size());
System.out.println("==========================");
}, 0, 5, TimeUnit.SECONDS);
}
public void shutdown() {
monitor.shutdown();
executor.shutdown();
}
// 使用示例
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
ThreadPoolMonitor monitor = new ThreadPoolMonitor(executor);
// 提交任务
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("Task " + taskId + " started");
Thread.sleep(1000);
System.out.println("Task " + taskId + " completed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 5秒后关闭监控
try {
Thread.sleep(10000);
monitor.shutdown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
常见配置优化方案
1. 内存敏感型应用
public class MemorySensitiveConfig {
public static ExecutorService memoryOptimizedPool() {
return new ThreadPoolExecutor(
2, // 少量核心线程
4, // 适度的最大线程数
30L, // 较短的空闲时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(50), // 有界队列避免内存溢出
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "MemoryOptimizedPool-" + threadNumber.getAndIncrement());
t.setDaemon(false); // 非守护线程
return t;
}
},
new ThreadPoolExecutor.AbortPolicy() // 任务拒绝时直接抛出异常
);
}
}
2. 响应时间敏感型应用
public class ResponseTimeSensitiveConfig {
public static ExecutorService responseTimeOptimizedPool() {
return new ThreadPoolExecutor(
10, // 核心线程数
20, // 最大线程数
30L, // 空闲时间
TimeUnit.SECONDS,
new SynchronousQueue<>(), // 同步队列,立即执行任务
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy() // 调用者运行策略
);
}
}
实际应用案例
大型电商系统并发处理
public class ECommerceSystem {
private final ExecutorService orderProcessingPool;
private final Semaphore inventorySemaphore;
private final CountDownLatch completionLatch;
public ECommerceSystem() {
// 订单处理线程池
this.orderProcessingPool = new ThreadPoolExecutor(
10, 50, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "OrderProcessor-" + threadNumber.getAndIncrement());
t.setDaemon(false);
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 库存访问信号量(限制同时访问库存的线程数)
this.inventorySemaphore = new Semaphore(5);
// 完成计数器
this.completionLatch = new CountDownLatch(100);
}
public void processOrder(Order order) {
orderProcessingPool.submit(() -> {
try {
System.out.println("Processing order: " + order.getId());
// 检查库存
if (inventorySemaphore.tryAcquire(100, TimeUnit.MILLISECONDS)) {
try {
// 模拟库存检查和更新
checkAndUpdateInventory(order);
// 处理订单逻辑
processOrderLogic(order);
System.out.println("Order " + order.getId() + " processed successfully");
} finally {
inventorySemaphore.release();
}
} else {
System.err.println("Failed to acquire inventory lock for order: " + order.getId());
}
} catch (Exception e) {
System.err.println("Error processing order: " + order.getId() + ", error: " + e.getMessage());
} finally {
completionLatch.countDown();
}
});
}
private void checkAndUpdateInventory(Order order) throws InterruptedException {
// 模拟库存检查和更新
Thread.sleep(50);
}
private void processOrderLogic(Order order) throws InterruptedException {
// 模拟订单处理逻辑
Thread.sleep(100);
}
public void waitForCompletion() throws InterruptedException {
completionLatch.await();
}
public void shutdown() {
orderProcessingPool.shutdown();
try {
if (!orderProcessingPool.awaitTermination(60, TimeUnit.SECONDS)) {
orderProcessingPool.shutdownNow();
}
} catch (InterruptedException e) {
orderProcessingPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static class Order {
private final String id;
private final int quantity;
public Order(String id, int quantity) {
this.id = id;
this.quantity = quantity;
}
public String getId() {
return id;
}
public int getQuantity() {
return quantity;
}
}
public static void main(String[] args) throws InterruptedException {
ECommerceSystem system = new ECommerceSystem();
// 模拟100个订单处理
for (int i = 0; i < 100; i++) {
Order order = new Order("ORDER-" + i, 1);
system.processOrder(order);
}
system.waitForCompletion();
system.shutdown();
System.out.println("All orders processed");
}
}
高并发数据处理场景
public class DataProcessingSystem {
private final ExecutorService processorPool;
private final CyclicBarrier barrier;
private final CountDownLatch completionLatch;
public DataProcessingSystem(int threadCount) {
this.processorPool = new ThreadPoolExecutor(
threadCount, threadCount * 2, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
this.barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("All processors reached barrier, performing aggregation");
});
this.completionLatch = new CountDownLatch(threadCount);
}
public void processData(List<String> dataChunks) {
int chunkSize = dataChunks.size() / 10; // 假设分成10个块
for (int i = 0; i < 10; i++) {
final int startIndex = i * chunkSize;
final int endIndex = (i == 9) ? dataChunks.size() : (i + 1) * chunkSize;
processorPool.submit(() -> {
try {
// 处理数据块
List<String> chunk = dataChunks.subList(startIndex, endIndex);
processChunk(chunk);
System.out.println("Processor completed chunk " + i);
// 等待所有处理器完成当前阶段
barrier.await();
// 执行汇总操作
if (i == 0) {
System.out.println("Performing final data aggregation...");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
completionLatch.countDown();
}
});
}
}
private void processChunk(List<String> chunk) throws InterruptedException {
// 模拟数据处理
Thread.sleep(100);
System.out.println("Processing " + chunk.size() + " items
评论 (0)