Java并发编程深度解析:JUC包源码分析与线程池优化实战

Nora253
Nora253 2026-01-30T20:01:23+08:00
0 0 1

引言

在现代Java开发中,多线程编程已成为构建高性能、高可用应用的核心技术之一。随着业务复杂度的增加和硬件资源的丰富,如何有效地管理和控制并发执行成为了开发者面临的重要挑战。Java并发包(java.util.concurrent,简称JUC)为解决这一问题提供了丰富的工具类和框架。

本文将深入剖析JUC包中的核心组件,包括AQS、CountDownLatch、CyclicBarrier等关键类的源码实现,并结合实际业务场景,讲解线程池配置优化和并发安全实现方案。通过理论与实践相结合的方式,帮助开发者掌握Java并发编程的核心技术和最佳实践。

Java并发编程基础

什么是并发编程

并发编程是指程序能够同时处理多个任务的技术。在Java中,线程是实现并发的基本单位。每个线程都有自己的执行路径和栈空间,可以独立执行代码片段。

JUC包概述

java.util.concurrent包是Java并发编程的核心库,它提供了丰富的并发工具类,包括:

  • 线程池相关类(ExecutorService、ThreadPoolExecutor等)
  • 同步工具类(CountDownLatch、CyclicBarrier、Semaphore等)
  • 并发集合类(ConcurrentHashMap、CopyOnWriteArrayList等)
  • 原子类(AtomicInteger、AtomicReference等)

AQS源码深度解析

AQS简介

AbstractQueuedSynchronizer(AQS)是JUC包中所有同步组件的基类,它提供了一种框架来实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器。AQS的核心思想是将共享资源的状态管理抽象出来,让具体的同步组件实现其特定的获取和释放逻辑。

核心数据结构

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer {
    // 同步队列头节点
    private transient volatile Node head;
    // 同步队列尾节点
    private transient volatile Node tail;
    // 共享资源状态
    private volatile int state;
    
    static final class Node {
        // 共享模式
        static final Node SHARED = new Node();
        // 独占模式
        static final Node EXCLUSIVE = null;
        
        // 节点状态
        volatile int waitStatus;
        // 前驱节点
        volatile Node prev;
        // 后继节点
        volatile Node next;
        // 持有该节点的线程
        volatile Thread thread;
        // 下一个等待共享锁的节点
        Node nextWaiter;
    }
}

核心实现机制

AQS的核心是通过CAS操作来管理同步状态,其主要方法包括:

  1. 获取资源acquire(int arg) - 通过tryAcquire尝试获取资源
  2. 释放资源release(int arg) - 通过tryRelease释放资源
  3. 共享模式获取acquireShared(int arg) - 共享模式下的获取
  4. 共享模式释放releaseShared(int arg) - 共享模式下的释放
// acquire方法核心实现
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 快速尝试将节点加入队尾
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果快速插入失败,则通过自旋方式入队
    enq(node);
    return node;
}

private boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果前驱节点是头节点,则尝试获取资源
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果获取失败,判断是否需要挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

实际应用示例

以ReentrantLock为例,展示AQS的应用:

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 独占获取
        final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 无锁状态,尝试获取锁
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                // 已经持有锁的线程,增加计数
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

核心同步工具类分析

CountDownLatch源码解析

CountDownLatch是一个同步辅助类,允许一个或多个线程等待其他线程完成操作。

public class CountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
        
        Sync(int count) {
            setState(count);
        }
        
        int getCount() {
            return getState();
        }
        
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    
    private final Sync sync;
    
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public boolean await(long timeout, TimeUnit unit) 
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
    public void countDown() {
        sync.releaseShared(1);
    }
}

CyclicBarrier源码分析

CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达一个屏障点。

public class CyclicBarrier {
    // 内部类,用于管理屏障状态
    private static class Generation {
        boolean broken = false;
    }
    
    private final int parties;  // 线程数量
    private final Runnable barrierAction;  // 屏障动作
    private volatile int count;  // 当前等待线程数
    private final Generation generation = new Generation();
    
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierAction = barrierAction;
    }
    
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
        final Generation g = generation;
        
        // 获取锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int index = --count;
            if (index == 0) {  // 线程数量达到预期
                boolean ranAction = false;
                try {
                    final Runnable command = barrierAction;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration(); // 重置屏障
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            
            // 等待其他线程到达
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.await(nanos, TimeUnit.NANOSECONDS);
                } catch (InterruptedException ie) {
                    if (g == generation && !broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
}

Semaphore源码解析

Semaphore是一个计数信号量,用于控制同时访问特定资源的线程数量。

public class Semaphore implements java.io.Serializable {
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        
        Sync(int permits) {
            setState(permits);
        }
        
        final int getPermits() {
            return getState();
        }
        
        final int tryAcquireShared(int acquires) {
            for (;;) {
                int current = getState();
                int next = current - acquires;
                if (next < 0 || compareAndSetState(current, next))
                    return next;
            }
        }
        
        final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (compareAndSetState(current, next))
                    return true;
            }
        }
    }
    
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    public boolean tryAcquire() {
        return sync.tryAcquireShared(1) >= 0;
    }
    
    public void release() {
        sync.releaseShared(1);
    }
}

线程池深度解析与优化

线程池核心组件

线程池是并发编程中最重要的概念之一,它通过复用线程来减少创建和销毁线程的开销。

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 线程池状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    // 线程池状态和线程数量的组合
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // 工作线程集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    
    // 工作队列
    private final BlockingQueue<Runnable> workQueue;
    
    // 线程工厂
    private volatile ThreadFactory threadFactory;
    
    // 拒绝策略
    private volatile RejectedExecutionHandler handler;
    
    // 空闲时间
    private volatile int keepAliveTime;
    
    // 最大线程数
    private volatile int maximumPoolSize;
    
    // 核心线程数
    private volatile int corePoolSize;
    
    // 工作线程数量
    private volatile int poolSize;
    
    // 任务计数器
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
}

线程池工作原理

线程池的工作流程如下:

  1. 提交任务:当提交一个任务时,首先检查核心线程数是否已满
  2. 队列排队:如果核心线程都在工作,则将任务放入阻塞队列
  3. 创建新线程:如果队列满了且线程数小于最大线程数,则创建新线程
  4. 拒绝策略:如果所有资源都被占用,执行拒绝策略
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    // 如果当前工作线程小于核心线程数,直接创建新线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
    // 检查线程池是否处于运行状态
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程池状态改变,移除任务并执行拒绝策略
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果无法入队,则尝试创建新线程
    else if (!addWorker(command, false))
        reject(command);
}

线程池配置优化实战

核心参数详解

// 自定义线程池配置示例
public class ThreadPoolConfig {
    
    public static ExecutorService createOptimizedThreadPool() {
        // 根据CPU核心数计算最优线程数
        int processors = Runtime.getRuntime().availableProcessors();
        
        // 对于CPU密集型任务,线程数=CPU核心数
        // 对于IO密集型任务,线程数=CPU核心数 * 2
        int corePoolSize = processors;
        int maximumPoolSize = processors * 2;
        int keepAliveTime = 60; // 秒
        
        return new ThreadPoolExecutor(
            corePoolSize,           // 核心线程数
            maximumPoolSize,        // 最大线程数
            keepAliveTime,          // 空闲时间
            TimeUnit.SECONDS,       // 时间单位
            new LinkedBlockingQueue<>(1000),  // 工作队列
            Executors.defaultThreadFactory(), // 线程工厂
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
    }
    
    // 针对不同场景的线程池配置
    public static ExecutorService createCpuIntensivePool() {
        int processors = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            processors,             // 核心线程数等于CPU核心数
            processors,             // 最大线程数
            0L,                     // 空闲时间
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(100),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
        );
    }
    
    public static ExecutorService createIoIntensivePool() {
        int processors = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            processors * 2,         // IO密集型任务需要更多线程
            processors * 4,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

性能监控与调优

public class ThreadPoolMonitor {
    
    public static void monitorThreadPool(ThreadPoolExecutor executor) {
        // 获取线程池状态信息
        System.out.println("核心线程数: " + executor.getCorePoolSize());
        System.out.println("最大线程数: " + executor.getMaximumPoolSize());
        System.out.println("当前活跃线程数: " + executor.getActiveCount());
        System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
        System.out.println("总任务数: " + executor.getTaskCount());
        System.out.println("队列大小: " + executor.getQueue().size());
        
        // 监控线程池性能
        if (executor.getQueue().size() > 100) {
            System.err.println("警告:任务队列过长,可能存在性能问题");
        }
        
        if (executor.getActiveCount() == executor.getMaximumPoolSize()) {
            System.err.println("警告:线程池已达到最大容量");
        }
    }
    
    // 自定义拒绝策略
    public static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // 记录被拒绝的任务
            System.err.println("任务被拒绝: " + r.toString());
            
            // 可以选择重试、记录日志或抛出异常
            try {
                // 等待一段时间后重试
                Thread.sleep(100);
                executor.execute(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

并发安全实现方案

原子类详解

原子类是Java并发编程中重要的基础工具,它们提供了无锁的线程安全操作。

public class AtomicExample {
    
    // 原子整数
    private final AtomicInteger atomicInteger = new AtomicInteger(0);
    
    // 原子引用
    private final AtomicReference<String> atomicReference = new AtomicReference<>("initial");
    
    // 原子数组
    private final AtomicIntegerArray atomicArray = new AtomicIntegerArray(new int[]{1, 2, 3, 4, 5});
    
    public void demonstrateAtomicOperations() {
        // 基本操作
        int currentValue = atomicInteger.get();
        int newValue = atomicInteger.incrementAndGet(); // 自增并返回新值
        
        // CAS操作
        boolean success = atomicInteger.compareAndSet(currentValue, newValue);
        
        // 更新操作
        atomicReference.set("new value");
        String oldValue = atomicReference.getAndSet("another value");
        
        // 数组操作
        int arrayValue = atomicArray.get(0);
        atomicArray.incrementAndGet(0);
    }
    
    // 原子更新字段
    private static class Person {
        volatile int age;
        final AtomicIntegerFieldUpdater<Person> updater = 
            AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");
        
        public void updateAge(int newAge) {
            updater.set(this, newAge);
        }
    }
}

并发集合类应用

public class ConcurrentCollectionExample {
    
    // 线程安全的HashMap
    private final ConcurrentHashMap<String, Integer> concurrentHashMap = 
        new ConcurrentHashMap<>();
    
    // 线程安全的列表
    private final CopyOnWriteArrayList<String> copyOnWriteList = 
        new CopyOnWriteArrayList<>();
    
    // 线程安全的Set
    private final CopyOnWriteArraySet<String> copyOnWriteSet = 
        new CopyOnWriteArraySet<>();
    
    public void demonstrateConcurrentCollections() {
        // ConcurrentHashMap操作
        concurrentHashMap.put("key1", 1);
        Integer value = concurrentHashMap.get("key1");
        Integer oldValue = concurrentHashMap.putIfAbsent("key2", 2);
        
        // CopyOnWriteArrayList操作
        copyOnWriteList.add("item1");
        String item = copyOnWriteList.get(0);
        
        // CopyOnWriteArraySet操作
        copyOnWriteSet.add("setItem1");
        boolean contains = copyOnWriteSet.contains("setItem1");
    }
    
    // 实际应用场景:缓存实现
    public class Cache<K, V> {
        private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
        
        public V get(K key) {
            return cache.get(key);
        }
        
        public V put(K key, V value) {
            return cache.put(key, value);
        }
        
        public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
            return cache.computeIfAbsent(key, mappingFunction);
        }
    }
}

实际业务场景应用

高并发数据处理场景

public class HighConcurrencyProcessor {
    
    // 使用线程池处理大量数据
    private final ExecutorService executor = 
        new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    
    // 使用CountDownLatch协调任务完成
    public void processBatchData(List<String> data) throws InterruptedException {
        int batchSize = 100;
        CountDownLatch latch = new CountDownLatch(data.size());
        
        for (int i = 0; i < data.size(); i += batchSize) {
            int end = Math.min(i + batchSize, data.size());
            List<String> batch = data.subList(i, end);
            
            executor.submit(() -> {
                try {
                    processBatch(batch);
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await(); // 等待所有批次处理完成
    }
    
    private void processBatch(List<String> batch) {
        for (String item : batch) {
            // 处理单个数据项
            processItem(item);
        }
    }
    
    private void processItem(String item) {
        // 模拟数据处理逻辑
        try {
            Thread.sleep(10); // 模拟IO操作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

分布式任务调度场景

public class DistributedTaskScheduler {
    
    private final ExecutorService executor = 
        new ThreadPoolExecutor(
            10, 20, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(500)
        );
    
    private final CyclicBarrier barrier = new CyclicBarrier(10);
    
    public void scheduleDistributedTasks(List<Runnable> tasks) {
        List<Future<?>> futures = new ArrayList<>();
        
        for (Runnable task : tasks) {
            Future<?> future = executor.submit(() -> {
                try {
                    // 执行任务前等待其他节点准备就绪
                    barrier.await();
                    
                    // 执行具体任务
                    task.run();
                    
                    // 任务完成后通知其他节点
                    barrier.await();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            
            futures.add(future);
        }
        
        // 等待所有任务完成
        for (Future<?> future : futures) {
            try {
                future.get(30, TimeUnit.SECONDS);
            } catch (Exception e) {
                System.err.println("任务执行失败: " + e.getMessage());
            }
        }
    }
}

性能调优最佳实践

线程池调优策略

public class ThreadPoolOptimization {
    
    // 根据任务类型选择合适的线程池
    public static ExecutorService createTaskSpecificPool(TaskType type) {
        switch (type) {
            case CPU_INTENSIVE:
                return createCpuIntensivePool();
            case IO_INTENSIVE:
                return createIoIntensivePool();
            case MIXED:
                return createMixedPool();
            default:
                return Executors.newFixedThreadPool(10);
        }
    }
    
    private static ExecutorService createCpuIntensivePool() {
        int processors = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            processors,
            processors,
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(100),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
        );
    }
    
    private static ExecutorService createIoIntensivePool() {
        int processors = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            processors * 2,
            processors * 4,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
    
    private static ExecutorService createMixedPool() {
        return new ThreadPoolExecutor(
            10, 20, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(500),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
    
    enum TaskType {
        CPU_INTENSIVE, IO_INTENSIVE, MIXED
    }
    
    // 监控线程池性能的工具类
    public static class ThreadPoolMetrics {
        private final ThreadPoolExecutor executor;
        private final ScheduledExecutorService monitor = 
            Executors.newScheduledThreadPool(1);
        
        public ThreadPoolMetrics(ThreadPoolExecutor executor) {
            this.executor = executor;
        }
        
        public void startMonitoring() {
            monitor.scheduleAtFixedRate(() -> {
                System.out.println("线程池监控信息:");
                System.out.println("  核心线程数: " + executor.getCorePoolSize());
                System.out.println("  活跃线程数: " + executor.getActiveCount());
                System.out.println("  最大线程数: " + executor.getMaximumPoolSize());
                System.out.println("  队列大小: " + executor.getQueue().size());
                System.out.println("  完成任务数: " + executor.getCompletedTaskCount());
            }, 0, 5, TimeUnit.SECONDS);
        }
    }
}

内存优化策略

public class MemoryOptimization {
    
    // 使用对象池减少GC压力
    private static final ObjectPool<StringBuilder> stringBuilderPool = 
        new GenericObjectPool<>(StringBuilder::new);
    
    public void efficientStringBuilding() {
        StringBuilder sb = null;
        try {
            sb = stringBuilderPool.borrowObject();
            sb.setLength(0); // 清空缓冲区
            
            // 执行字符串构建操作
            for (int i = 0; i < 1000; i++) {
                sb.append("item").append(i).append(",");
            }
            
            String result = sb.toString();
            // 处理结果...
            
        } finally {
            if (sb != null) {
                stringBuilderPool.returnObject(sb);
            }
        }
    }
    
    // 使用ThreadLocal减少对象创建
    private static final ThreadLocal<SimpleDateFormat> dateFormat = 
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    
    public String formatTime(long time) {
        return dateFormat.get().format(new Date(time));
    }
    
    // 缓存优化
    private final Map<String, Future<String>> cache = 
        new ConcurrentHashMap<>();
    
    public String getCachedResult(String key, Callable<String> task) {
        Future<String> future = cache.get(key);
        if (future == null) {
            future = executor.submit(task);
            Future<String> existing = cache.putIfAbsent(key, future);
            if (existing != null) {
                future =
相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000