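Exception Handling in Java Concurrent Programming: Thread Pool Exception Capture, CompletableFuture Error Handling, and Monitoring and Alerting
Introduction
In modern Java development, concurrent programming is a core technique for building high-performance, high-throughput systems. Exception handling in a concurrent environment, however, is complex and easy to overlook. Exceptions thrown by thread pool tasks, error propagation through CompletableFuture chains, and monitoring and alerting for asynchronous tasks are all challenges that developers regularly meet in real projects.
This article examines the exception handling mechanisms of Java concurrency, from capturing exceptions in thread pools, through error handling in CompletableFuture, to monitoring and alerting for asynchronous tasks, and offers a set of solutions and best practices.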
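Thread Pool Exception Handling in Detail
Why Thread Pool Exception Handling Matters
When a task started with execute() throws an uncaught exception, the worker thread terminates and the pool creates a new thread to replace it. When a task is started with submit(), the exception is instead captured in the returned Future and the worker thread survives. In both cases a failure that nobody observes means lost work and, if the task held resources, a possible leak. Handling exceptions in thread pools correctly is therefore essential.
Default Exception Handling Behavior
First, let's look at how a thread pool handles exceptions by default: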
import java.util.concurrent.*;
public class ThreadPoolExceptionDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
// Submit a task that throws an exception
executor.submit(() -> {
System.out.println("Task started");
throw new RuntimeException("Task failed!");
});
// Submit a normal task
executor.submit(() -> {
System.out.println("Normal task executed");
});
executor.shutdown();
}
}
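Running this code prints no stack trace: the exception is silently swallowed, while the normal task still runs. This happens because submit() wraps the task in a FutureTask, which catches the exception and stores it in the returned Future. The exception only surfaces when Future.get() is called.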
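The difference between execute() and submit() can be made visible with a small experiment. The following is a minimal sketch, not part of the original article: the class name ExecuteVsSubmitDemo and its two helper methods are hypothetical, and only standard java.util.concurrent calls are used. With execute() the exception reaches the thread's UncaughtExceptionHandler; with submit() it is only available through the Future.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: contrasts where a task's exception ends up.
class ExecuteVsSubmitDemo {

    // execute(): the exception escapes the task and reaches the thread's handler.
    static Throwable failureSeenByHandler() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        ExecutorService pool = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((t, ex) -> {
                seen.set(ex);
                handled.countDown();
            });
            return thread;
        });
        try {
            pool.execute(() -> { throw new IllegalStateException("via execute"); });
            handled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return seen.get();
    }

    // submit(): the exception is stored in the Future and only surfaces on get().
    static Throwable failureStoredInFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> future = pool.submit(() -> { throw new IllegalStateException("via submit"); });
        try {
            future.get();
            return null; // not reached: the task always fails
        } catch (ExecutionException e) {
            return e.getCause(); // the original exception thrown by the task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Handler saw: " + failureSeenByHandler());
        System.out.println("Future held: " + failureStoredInFuture());
    }
}
```

If the Future returned by submit() is discarded, nothing in this setup ever reports the failure, which is why fire-and-forget tasks are usually better started with execute().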
Retrieving the Exception Through Future
When a task is submitted with submit(), its exception can be retrieved by calling Future.get():
import java.util.concurrent.*;
public class FutureExceptionHandling {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<?> future = executor.submit(() -> {
System.out.println("Task started");
throw new RuntimeException("Task failed!");
});
try {
future.get(); // throws ExecutionException wrapping the task's exception
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Task was interrupted");
} catch (ExecutionException e) {
System.err.println("Task failed with exception: " + e.getCause());
}
executor.shutdown();
}
}
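Handling Exceptions with a Custom ThreadFactory
A custom ThreadFactory can install an UncaughtExceptionHandler on every thread the pool creates. The handler only fires for tasks started with execute(); a task started with submit() stores its exception in the Future, so it never reaches the handler: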
import java.util.concurrent.*;
public class CustomThreadFactory implements ThreadFactory {
private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
private final Thread.UncaughtExceptionHandler exceptionHandler;
public CustomThreadFactory(Thread.UncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = defaultThreadFactory.newThread(r);
thread.setUncaughtExceptionHandler(exceptionHandler);
return thread;
}
public static void main(String[] args) {
Thread.UncaughtExceptionHandler handler = (thread, throwable) -> {
System.err.println("Thread " + thread.getName() +
" threw exception: " + throwable.getMessage());
throwable.printStackTrace();
};
ExecutorService executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new CustomThreadFactory(handler)
);
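// Use execute(), not submit(): submit() would capture the exception in a Future
// and the UncaughtExceptionHandler would never be called
executor.execute(() -> {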
System.out.println("Task started");
throw new RuntimeException("Task failed!");
});
// Wait briefly so the exception handler can finish
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
}
}
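Because an UncaughtExceptionHandler misses failures from submit(), a common complement is to override ThreadPoolExecutor.afterExecute, which runs after every task. The sketch below follows the pattern described in the ThreadPoolExecutor documentation; the class name ObservingThreadPool, its failures list, and the demo() helper are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical pool that records every task failure, from execute() and submit() alike.
class ObservingThreadPool extends ThreadPoolExecutor {
    final List<Throwable> failures = new CopyOnWriteArrayList<>();

    ObservingThreadPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // For submit(), t is null and the failure is hidden inside the FutureTask.
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            failures.add(t); // a real system would log or alert here
        }
    }

    // Runs one failing submit() task, one successful task and one failing execute() task.
    static List<String> demo() {
        ObservingThreadPool pool = new ObservingThreadPool(1);
        pool.submit(() -> { throw new IllegalStateException("from submit"); });
        pool.submit(() -> "fine");
        // The default handler also prints this one's stack trace to stderr.
        pool.execute(() -> { throw new IllegalStateException("from execute"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> messages = new ArrayList<>();
        for (Throwable failure : pool.failures) {
            messages.add(failure.getMessage());
        }
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The hook gives one central place for logging or alerting without requiring every caller to inspect its Future.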
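Handling Rejected Tasks with a RejectedExecutionHandler
When the pool cannot accept a new task (the queue is full and the maximum number of threads is busy, or the pool has been shut down), a custom RejectedExecutionHandler decides what happens to the task: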
import java.util.concurrent.*;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("Task rejected: " + r.toString());
// Options include logging the task, re-queuing it, or running other fallback logic
if (!executor.isShutdown()) {
try {
// Block the submitting thread until the queue has room (a simple form of backpressure)
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Interrupted while trying to queue rejected task");
}
}
}
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new CustomRejectedExecutionHandler()
);
// Submit several tasks to trigger the rejection policy
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
try {
Thread.sleep(2000); // simulate a slow task
System.out.println("Task " + taskId + " completed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
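Before writing a custom handler it is worth knowing the built-in policies. The sketch below uses ThreadPoolExecutor.CallerRunsPolicy, which runs a rejected task on the submitting thread and so slows the producer down naturally. The class name CallerRunsSketch and its helper method are hypothetical; the pool sizes are chosen only to force a rejection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: shows which thread runs a task the pool has no room for.
class CallerRunsSketch {

    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();

        pool.execute(blocker); // occupies the only worker thread
        pool.execute(blocker); // fills the single queue slot
        // No worker and no queue space left: the policy runs this task on the calling thread.
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("Overflow task ran on: " + threadThatRanOverflowTask());
    }
}
```

The other built-in policies are AbortPolicy (the default, which throws RejectedExecutionException), DiscardPolicy, and DiscardOldestPolicy.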
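CompletableFuture Error Handling
How Exceptions Propagate in CompletableFuture
CompletableFuture propagates exceptions along a chain of stages. When a stage fails, later stages such as thenApply are skipped, and the exception travels down the chain until it reaches a stage that handles it. In the example below, steps 2 and 3 never run.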
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExceptionPropagation {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
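// The explicit <String> is required: a lambda that only throws gives the compiler nothing to infer the type from
.<String>supplyAsync(() -> {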
System.out.println("Step 1: Starting computation");
throw new RuntimeException("Computation failed in step 1");
})
.thenApply(result -> {
System.out.println("Step 2: Processing result");
return result.toUpperCase();
})
.thenApply(result -> {
System.out.println("Step 3: Final processing");
return "Processed: " + result;
});
try {
String result = future.get();
System.out.println("Final result: " + result);
} catch (InterruptedException | ExecutionException e) {
System.err.println("Operation failed: " + e.getCause().getMessage());
}
}
}
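One detail of propagation is easy to miss: when a stage created by supplyAsync fails, dependent stages receive the failure wrapped in a CompletionException, so getMessage() on it includes the class name of the original exception. The following sketch shows skipped stages and a small unwrap helper. PropagationSketch, unwrap, and skippedStageRuns are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of exception propagation and unwrapping.
class PropagationSketch {
    static final AtomicInteger skippedStageRuns = new AtomicInteger();

    // Strips CompletionException wrappers to reach the exception the task actually threw.
    static Throwable unwrap(Throwable error) {
        while (error instanceof CompletionException && error.getCause() != null) {
            error = error.getCause();
        }
        return error;
    }

    static String run() {
        return CompletableFuture
                .<String>supplyAsync(() -> { throw new IllegalStateException("step 1 failed"); })
                // Skipped: thenApply only runs when the previous stage succeeded.
                .thenApply(value -> {
                    skippedStageRuns.incrementAndGet();
                    return value.toUpperCase();
                })
                // handle() sees the failure and turns it into a normal result.
                .handle((value, error) ->
                        error == null ? value : "recovered from " + unwrap(error).getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println("thenApply ran " + skippedStageRuns.get() + " times");
    }
}
```

Unwrapping before inspecting the type or message keeps error handling independent of how many stages sit between the failure and the handler.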
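Exception Handling Methods in Detail
The handle() Method
handle() is invoked on both success and failure. It receives the result and the exception (the exception is null on success) and can return a replacement value: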
import java.util.concurrent.CompletableFuture;
public class HandleExceptionDemo {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
return "Success";
} else {
throw new RuntimeException("Random failure");
}
})
.handle((result, throwable) -> {
if (throwable != null) {
System.err.println("Caught exception: " + throwable.getMessage());
return "Default value due to error";
}
return result + " processed";
});
future.thenAccept(System.out::println).join();
}
}
The exceptionally() Method
exceptionally() is invoked only when the previous stage failed, and supplies a fallback value:
import java.util.concurrent.CompletableFuture;
public class ExceptionallyDemo {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture
// The explicit <String> is required because the lambda only throws
.<String>supplyAsync(() -> {
throw new RuntimeException("Original failure");
})
.exceptionally(throwable -> {
System.err.println("Recovering from: " + throwable.getMessage());
return "Recovered value";
});
System.out.println("Result: " + future.join());
}
}
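The whenComplete() Method
whenComplete() runs a callback when the stage finishes, whether it succeeded or failed. Unlike handle(), it cannot change the outcome: the original result or exception is passed on to the next stage: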
import java.util.concurrent.CompletableFuture;
public class WhenCompleteDemo {
public static void main(String[] args) {
CompletableFuture<String> successFuture = CompletableFuture
.supplyAsync(() -> "Success result")
.whenComplete((result, throwable) -> {
if (throwable != null) {
System.err.println("Task failed: " + throwable.getMessage());
} else {
System.out.println("Task succeeded with result: " + result);
}
});
CompletableFuture<String> failureFuture = CompletableFuture
// The explicit <String> is required because the lambda only throws
.<String>supplyAsync(() -> {
throw new RuntimeException("Task failure");
})
.whenComplete((result, throwable) -> {
if (throwable != null) {
System.err.println("Task failed: " + throwable.getMessage());
} else {
System.out.println("Task succeeded with result: " + result);
}
});
System.out.println("Success result: " + successFuture.join());
try {
System.out.println("Failure result: " + failureFuture.join());
} catch (Exception e) {
System.err.println("Caught exception in main: " + e.getMessage());
}
}
}
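The practical difference between whenComplete() and handle() is whether the failure survives the stage. The sketch below puts them side by side; the class name WhenCompleteVsHandle and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical comparison: whenComplete() observes a failure, handle() replaces it.
class WhenCompleteVsHandle {

    static CompletableFuture<String> failingStage() {
        return CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // whenComplete(): the callback runs, but the returned future is still failed.
    static String afterWhenComplete() {
        CompletableFuture<String> future = failingStage()
                .whenComplete((value, error) -> {
                    // Observation only, e.g. logging or metrics; the outcome is unchanged.
                });
        try {
            return future.join();
        } catch (CompletionException e) {
            return "still failed: " + e.getCause().getMessage();
        }
    }

    // handle(): the callback's return value becomes the new, successful result.
    static String afterHandle() {
        return failingStage()
                .handle((value, error) -> error == null ? value : "fallback")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(afterWhenComplete());
        System.out.println(afterHandle());
    }
}
```

As a rule of thumb, whenComplete() suits side effects such as logging, while handle() or exceptionally() suits recovery.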
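Exception Handling in Combined Operations
When combining several CompletableFutures, pay particular attention to failures. allOf completes exceptionally as soon as any of its inputs has failed and all of them have finished: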
import java.util.concurrent.CompletableFuture;
public class CompletableFutureCombinationException {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Result 1";
});
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> {
throw new RuntimeException("Failure in future 2");
});
// Combine the futures with allOf
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
CompletableFuture<String> combinedFuture = allFutures
.handle((voidResult, throwable) -> {
if (throwable != null) {
System.err.println("One or more futures failed: " + throwable.getMessage());
return "Default combined result";
}
try {
String result1 = future1.get();
String result2 = future2.get();
return result1 + " + " + result2;
} catch (Exception e) {
return "Error getting results: " + e.getMessage();
}
});
System.out.println("Combined result: " + combinedFuture.join());
}
}
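In the example above a single failure discards the result of future1 as well. If partial results are acceptable, one option is to attach a fallback to each future before combining them, so that allOf itself never fails. This is a sketch under that assumption; AllOfPartialResults, collect, demo, and the "FAILED" marker value are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper: keeps successful results even when some futures fail.
class AllOfPartialResults {

    static List<String> collect(List<CompletableFuture<String>> futures) {
        // Turn each failure into a marker value first, so allOf always completes normally.
        List<CompletableFuture<String>> safe = futures.stream()
                .map(future -> future.exceptionally(error -> "FAILED"))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(safe.toArray(new CompletableFuture[0]))
                // Every future is done at this point, so join() does not block.
                .thenApply(ignored -> safe.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    static List<String> demo() {
        CompletableFuture<String> ok = CompletableFuture.supplyAsync(() -> "Result 1");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("Failure in future 2"));
        return collect(Arrays.asList(ok, failed));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A marker string is only suitable for a demo. In real code a small result type holding either the value or the error carries the same idea without overloading the value space.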
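Monitoring and Alerting for Asynchronous Tasks
Context Tracing with MDC
Preserving context information, such as a trace ID, across asynchronous tasks is essential for troubleshooting. MDC is thread-local, so it must be set inside the task and cleared afterwards, because pool threads are reused: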
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ContextAwareAsyncTask {
private static final Logger logger = LoggerFactory.getLogger(ContextAwareAsyncTask.class);
private static final ExecutorService executor = Executors.newFixedThreadPool(4);
public static CompletableFuture<String> processWithTraceId(String input) {
String traceId = UUID.randomUUID().toString();
return CompletableFuture.supplyAsync(() -> {
try {
MDC.put("traceId", traceId);
logger.info("Starting async processing for input: {}", input);
// Simulate business logic
if (input == null || input.isEmpty()) {
throw new IllegalArgumentException("Input cannot be null or empty");
}
Thread.sleep(100); // simulate a slow operation
String result = input.toUpperCase();
logger.info("Processing completed with result: {}", result);
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
MDC.clear();
}
}, executor);
}
public static void main(String[] args) {
processWithTraceId("hello")
.thenAccept(result -> logger.info("Final result: {}", result))
.exceptionally(throwable -> {
logger.error("Processing failed", throwable);
return null;
});
// Wait for the task to finish
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
}
}
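Monitoring Task Execution Time
Monitoring task execution time helps to find performance bottlenecks. Note that the start time below is taken when the task is submitted, so the measured duration includes any time spent waiting in the executor's queue: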
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
public class TaskExecutionMonitor {
private static final AtomicLong totalTasks = new AtomicLong(0);
private static final AtomicLong failedTasks = new AtomicLong(0);
private static final AtomicLong totalTime = new AtomicLong(0);
public static <T> CompletableFuture<T> monitoredSupplyAsync(Supplier<T> supplier) {
long startTime = System.currentTimeMillis();
totalTasks.incrementAndGet();
return CompletableFuture.supplyAsync(() -> {
try {
T result = supplier.get();
return result;
} catch (Exception e) {
failedTasks.incrementAndGet();
throw e;
} finally {
long executionTime = System.currentTimeMillis() - startTime;
totalTime.addAndGet(executionTime);
System.out.printf("Task completed in %d ms. Total tasks: %d, Failed: %d, Avg time: %.2f ms%n",
executionTime, totalTasks.get(), failedTasks.get(),
(double) totalTime.get() / totalTasks.get());
}
});
}
public static void main(String[] args) {
// A task that succeeds
monitoredSupplyAsync(() -> {
try {
Thread.sleep(100);
return "Success";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}).thenAccept(System.out::println);
// A task that fails
monitoredSupplyAsync(() -> {
throw new RuntimeException("Simulated failure");
}).exceptionally(throwable -> {
System.err.println("Task failed: " + throwable.getMessage());
return "Default";
});
// Wait for the tasks to finish
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
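A Custom Asynchronous Task Wrapper
The following general-purpose wrapper combines exception handling and monitoring. The example uses CompletableFuture.delayedExecutor, which requires Java 9 or later: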
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;
public class AsyncTaskWrapper {
public static <T> CompletableFuture<T> wrap(Supplier<T> task, Executor executor) {
return CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
String taskName = Thread.currentThread().getName();
try {
T result = task.get();
long executionTime = System.currentTimeMillis() - startTime;
System.out.printf("Task %s completed successfully in %d ms%n", taskName, executionTime);
return result;
} catch (Exception e) {
long executionTime = System.currentTimeMillis() - startTime;
System.err.printf("Task %s failed after %d ms: %s%n", taskName, executionTime, e.getMessage());
throw e;
}
}, executor);
}
public static <T, R> CompletableFuture<R> wrapWithRecovery(
Supplier<T> task,
Function<T, R> successHandler,
Function<Throwable, R> errorHandler,
Executor executor) {
return CompletableFuture.supplyAsync(() -> {
try {
T result = task.get();
return successHandler.apply(result);
} catch (Exception e) {
return errorHandler.apply(e);
}
}, executor);
}
public static void main(String[] args) {
// Run a task through the wrapper
wrap(() -> {
if (Math.random() > 0.7) {
throw new RuntimeException("Random failure");
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Success result";
}, CompletableFuture.delayedExecutor(1, java.util.concurrent.TimeUnit.SECONDS))
.thenAccept(System.out::println)
.exceptionally(throwable -> {
System.err.println("Final exception handler: " + throwable.getMessage());
return null;
});
// Run a task through the wrapper with recovery
wrapWithRecovery(
() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Task failure");
}
return "Primary result";
},
result -> "Processed: " + result,
throwable -> "Recovered from: " + throwable.getMessage(),
CompletableFuture.delayedExecutor(2, java.util.concurrent.TimeUnit.SECONDS)
).thenAccept(System.out::println);
// Wait for the tasks to finish
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Advanced Exception Handling Patterns
Implementing Retries
Retrying is an effective way to handle transient failures in asynchronous tasks:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class RetryableAsyncTask {
private static final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2);
public static <T> CompletableFuture<T> retryableSupplyAsync(
Supplier<T> task,
int maxRetries,
long delayMillis) {
return retryableSupplyAsync(task, maxRetries, delayMillis, 0);
}
private static <T> CompletableFuture<T> retryableSupplyAsync(
Supplier<T> task,
int maxRetries,
long delayMillis,
int currentAttempt) {
// The returned future completes with the first success or, once retries are exhausted, with the last failure
CompletableFuture<T> result = new CompletableFuture<>();
CompletableFuture.supplyAsync(task).whenComplete((value, error) -> {
if (error == null) {
result.complete(value);
} else if (currentAttempt >= maxRetries) {
result.completeExceptionally(error);
} else {
System.err.printf("Attempt %d failed: %s. Retrying in %d ms...%n",
currentAttempt + 1, error.getMessage(), delayMillis);
// Schedule the next attempt and forward its outcome to the returned future
scheduler.schedule(() -> {
retryableSupplyAsync(task, maxRetries, delayMillis, currentAttempt + 1)
.whenComplete((retryValue, retryError) -> {
if (retryError != null) {
result.completeExceptionally(retryError);
} else {
result.complete(retryValue);
}
});
}, delayMillis, TimeUnit.MILLISECONDS);
}
});
return result;
}
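// A simpler retry implementation; it sleeps on a pool thread between attempts, so use it only for light workloads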
public static <T> CompletableFuture<T> retryableSupplyAsyncSimple(
Supplier<T> task,
int maxRetries,
long delayMillis) {
CompletableFuture<T> future = CompletableFuture.supplyAsync(task);
for (int i = 0; i < maxRetries; i++) {
future = future.exceptionally(throwable -> {
try {
Thread.sleep(delayMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
return task.get();
});
}
return future;
}
public static void main(String[] args) {
retryableSupplyAsyncSimple(() -> {
if (Math.random() > 0.7) {
throw new RuntimeException("Temporary failure");
}
return "Success after retry";
}, 3, 1000).thenAccept(System.out::println)
.exceptionally(throwable -> {
System.err.println("All retries failed: " + throwable.getMessage());
return null;
});
// Wait for the task to finish
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scheduler.shutdown();
}
}
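Timeout Control
Setting a timeout prevents callers from waiting indefinitely on an asynchronous task. Note that the timeout below only completes the returned future; the underlying task keeps running: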
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TimeoutControl {
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
public static <T> CompletableFuture<T> withTimeout(
CompletableFuture<T> future,
long timeout,
TimeUnit unit) {
CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
scheduler.schedule(() -> {
if (!future.isDone()) {
timeoutFuture.completeExceptionally(
new RuntimeException("Task timed out after " + timeout + " " + unit));
}
}, timeout, unit);
// anyOf completes with the value or exception of whichever future finishes first.
// timeoutFuture only ever fails, so any value must come from future and the unchecked cast is safe.
return CompletableFuture.anyOf(future, timeoutFuture)
.thenApply(result -> (T) result);
}
public static void main(String[] args) {
CompletableFuture<String> longRunningTask = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000); // simulate a long-running task
return "Task completed";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
withTimeout(longRunningTask, 2, TimeUnit.SECONDS)
.thenAccept(System.out::println)
.exceptionally(throwable -> {
System.err.println("Task failed or timed out: " + throwable.getMessage());
return null;
});
// Wait for the task to finish
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
scheduler.shutdown();
}
}
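On Java 9 and later, CompletableFuture has timeout support built in, which removes the need for a hand-written scheduler in many cases. The sketch below assumes Java 9+ and uses orTimeout (fail with a TimeoutException) and completeOnTimeout (fall back to a default value). The class name BuiltInTimeoutSketch and its helper methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo of the Java 9+ timeout methods on CompletableFuture.
class BuiltInTimeoutSketch {

    static CompletableFuture<String> slowTask(long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "done";
        });
    }

    // orTimeout: the future fails with TimeoutException if the task is too slow.
    static String failOnTimeout(long taskMillis, long timeoutMillis) {
        return slowTask(taskMillis)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .handle((value, error) -> {
                    if (error == null) {
                        return value;
                    }
                    // The exception may or may not be wrapped, depending on the stage.
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause() : error;
                    return cause instanceof TimeoutException ? "timed out" : "failed";
                })
                .join();
    }

    // completeOnTimeout: the future completes with a default value instead of failing.
    static String fallbackOnTimeout(long taskMillis, long timeoutMillis) {
        return slowTask(taskMillis)
                .completeOnTimeout("fallback", timeoutMillis, TimeUnit.MILLISECONDS)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(failOnTimeout(1000, 100));
        System.out.println(fallbackOnTimeout(1000, 100));
        System.out.println(failOnTimeout(10, 2000));
    }
}
```

As with the hand-written version, neither method interrupts the underlying task; they only decide what the caller sees.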
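Best Practices and Recommendations
1. A Unified Exception Handling Strategy
Establish a unified exception handling strategy so that every asynchronous task has appropriate exception handling:
import java.util.concurrent.CompletableFuture;
public class UnifiedExceptionHandler {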
public static <T> CompletableFuture<T> withUnifiedHandling(CompletableFuture<T> future) {
return future.exceptionally(throwable -> {
// Log the failure
System.err.println("Unhandled exception in async task: " + throwable.getMessage());
throwable.printStackTrace();
// Send an alert
sendAlert(throwable);
// Rethrow so that downstream stages still see the failure; wrap checked exceptions
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
} else {
throw new RuntimeException(throwable);
}
});
}
private static void sendAlert(Throwable throwable) {
// Implement the alerting logic here, e.g. send an email or call a monitoring system's API
System.out.println("Alert sent for exception: " + throwable.getMessage());
}
}
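2. Resource Management
Make sure that resources used by asynchronous tasks are released correctly:
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;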
public class ResourceManagedAsyncTask {
public static <T extends Closeable, R> CompletableFuture<R> withResource(
Supplier<T> resourceSupplier,
Function<T, R> task) {
return CompletableFuture.supplyAsync(() -> {
T resource = resourceSupplier.get();
try {
return task.apply(resource);
} finally {
try {
resource.close();
} catch (Exception e) {
System.err.println("Error closing resource: " + e.getMessage());
}
}
});
}
// Usage example
public static void main(String[] args) {
withResource(
() -> {
// Acquire a resource such as a database connection or file handle
System.out.println("Resource acquired");
return new MockResource();
},
resource -> {
// Use the resource to perform the task
System.out.println("Using resource to perform task");
return "Task result";
}
).thenAccept(System.out::println)
.exceptionally(throwable -> {
System.err.println("Task failed: " + throwable.getMessage());
return null;
});
// Wait for the task to finish
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
static class MockResource implements Closeable {
@Override
public void close() {
System.out.println("Resource closed");
}
}
}
3. Collecting Metrics
Set up a thorough mechanism for collecting monitoring metrics:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
public class AsyncTaskMetrics {
private static final AtomicLong totalTasks = new AtomicLong(0);
private static final AtomicLong failedTasks = new AtomicLong(0);
private static final AtomicLong successfulTasks = new AtomicLong(0);
private static final Map<String, AtomicLong> taskTypeCounters = new ConcurrentHashMap<>();
public static <T> CompletableFuture<T> withMetrics(
CompletableFuture<T> future,
String taskType) {
totalTasks.incrementAndGet();
taskTypeCounters.computeIfAbsent(taskType, k -> new AtomicLong(0))
.incrementAndGet();
long startTime = System.currentTimeMillis();
return future.whenComplete((result, throwable) -> {
long executionTime = System.currentTimeMillis() - startTime;
if (throwable != null) {
failedTasks.incrementAndGet();
System.err.printf("Task %s failed after %d ms: %s%n",
taskType, executionTime, throwable.getMessage());
} else {
successfulTasks.incrementAndGet();
System.out.printf("Task %s completed successfully in %d ms%n",
taskType, executionTime);
}
// The metrics can be forwarded to a monitoring system
reportMetrics(taskType, executionTime, throwable != null);
});
}
private static void reportMetrics(String taskType, long executionTime, boolean failed) {
// Implement the metrics reporting logic here
System.out.printf("Reporting metrics - Task: %s, Time: %d ms, Failed: %b%n",
taskType, executionTime, failed);
}
public static void printSummary() {
System.out.printf("Total tasks: %d, Successful: %d, Failed: %d%n",
totalTasks.get(), successfulTasks.get(), failedTasks.get());
}
public static void main(String[] args) {
withMetrics(
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
return "Result 1";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}),
"database-query"
).thenAccept(System.out::println);
withMetrics(
CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Simulated error");
}),
"api-call"
).exceptionally(throwable -> {
System.err.println("API call failed: " + throwable.getMessage());
return null;
});
// Wait for the tasks to finish, then print the summary
try {
Thread.sleep(2000);
printSummary();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Summary
Exception handling in Java concurrent programming is a complex...