Java Concurrency Exception Handling in Depth: Thread Pool Exception Capture, CompletableFuture Error Handling, and Monitoring and Alerting

dashen44 2025-09-09T14:15:21+08:00

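Introduction

In modern Java application development, concurrent programming has become a core technique for building high-performance, high-throughput systems. Exception handling in concurrent code, however, is complex and easy to overlook. Exceptions thrown by thread pool tasks, error propagation through CompletableFuture chains, and monitoring and alerting for asynchronous tasks are all challenges developers regularly face in real projects.

This article examines the exception handling mechanisms of Java concurrent programming, from capturing exceptions in thread pools, through error handling with CompletableFuture, to monitoring and alerting for asynchronous tasks, and offers developers a complete set of solutions and best practices.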

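Thread Pool Exception Handling in Detail

Why Thread Pool Exception Handling Matters

When a task passed to execute() throws an uncaught exception, the worker thread running it terminates and the pool creates a new thread to replace it. A task passed to submit() behaves differently: the exception is captured in the returned Future and the worker thread survives. Either way, a failure that nobody observes can leave resources unreleased or work silently lost, so handling exceptions in pool tasks correctly is essential.

Default Exception Handling Behavior

First, let's look at how a thread pool handles exceptions by default: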

import java.util.concurrent.*;

public class ThreadPoolExceptionDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        // Submit a task that throws an exception
        executor.submit(() -> {
            System.out.println("Task started");
            throw new RuntimeException("Task failed!");
        });
        
        // Submit a task that completes normally
        executor.submit(() -> {
            System.out.println("Normal task executed");
        });
        
        executor.shutdown();
    }
}

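Running this code shows that the exception is silently swallowed while the normal task still runs. This is because submit() catches the exception and stores it in the returned Future, where it stays unnoticed until someone calls get().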
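
The difference between execute() and submit() can be observed directly. The following is a minimal sketch rather than a definitive implementation; the class name SubmitVsExecuteDemo and its two helper methods are hypothetical names introduced for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SubmitVsExecuteDemo {

    /** Fails a task started with execute(); returns what the uncaught exception handler saw. */
    public static Throwable failViaExecute() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((t, ex) -> {
                seen.set(ex);
                handled.countDown();
            });
            return thread;
        });
        try {
            executor.execute(() -> { throw new IllegalStateException("execute failed"); });
            handled.await(5, TimeUnit.SECONDS); // wait until the handler has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return seen.get();
    }

    /** Fails a task started with submit(); returns the cause stored in the Future. */
    public static Throwable failViaSubmit() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<Void> failing = () -> { throw new IllegalStateException("submit failed"); };
        try {
            Future<Void> future = executor.submit(failing);
            future.get(); // the stored exception only surfaces here
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("execute(): handler saw " + failViaExecute());
        System.out.println("submit(): Future holds " + failViaSubmit());
    }
}
```

With execute() the exception escapes the task and reaches the thread's handler; with submit() it only becomes visible when get() is called on the Future.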

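Retrieving Exceptions Through Future

When a task is submitted with submit(), the exception can be retrieved by calling Future.get(), which rethrows it wrapped in an ExecutionException: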

import java.util.concurrent.*;

public class FutureExceptionHandling {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        Future<?> future = executor.submit(() -> {
            System.out.println("Task started");
            throw new RuntimeException("Task failed!");
        });
        
        try {
            future.get(); // Throws ExecutionException here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Task was interrupted");
        } catch (ExecutionException e) {
            System.err.println("Task failed with exception: " + e.getCause());
        }
        
        executor.shutdown();
    }
}
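
Calling get() on every Future is easy to forget. One alternative is to override ThreadPoolExecutor.afterExecute(), which runs after each task and can inspect failures from both execute() and submit(). The sketch below follows the pattern described in the ThreadPoolExecutor documentation; the class name LoggingThreadPoolExecutor and the helpers lastFailure() and failureOf() are hypothetical additions for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LoggingThreadPoolExecutor extends ThreadPoolExecutor {
    private final AtomicReference<Throwable> lastFailure = new AtomicReference<>();

    public LoggingThreadPoolExecutor(int poolSize) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // For submit(), t is null and the failure is stored inside the Future
        if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            lastFailure.set(t);
            System.err.println("Task failed: " + t);
        }
    }

    /** Most recent failure seen by the hook, or null if none. */
    public Throwable lastFailure() {
        return lastFailure.get();
    }

    /** Submits one task, waits for the pool to drain, and returns what the hook recorded. */
    public static Throwable failureOf(Callable<?> task) {
        LoggingThreadPoolExecutor pool = new LoggingThreadPoolExecutor(1);
        pool.submit(task);
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.lastFailure();
    }
}
```

In a real system the println would be replaced by a logger or a metrics call; the point of the sketch is only that no caller has to remember to call get().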

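Handling Exceptions with a Custom ThreadFactory

A custom ThreadFactory can install an uncaught exception handler on every thread the pool creates. The handler only fires for tasks started with execute(); exceptions from submit() are captured in the Future and never reach it: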

import java.util.concurrent.*;

public class CustomThreadFactory implements ThreadFactory {
    private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
    private final Thread.UncaughtExceptionHandler exceptionHandler;
    
    public CustomThreadFactory(Thread.UncaughtExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }
    
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = defaultThreadFactory.newThread(r);
        thread.setUncaughtExceptionHandler(exceptionHandler);
        return thread;
    }
    
    public static void main(String[] args) {
        Thread.UncaughtExceptionHandler handler = (thread, throwable) -> {
            System.err.println("Thread " + thread.getName() + 
                " threw exception: " + throwable.getMessage());
            throwable.printStackTrace();
        };
        
        ExecutorService executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            new CustomThreadFactory(handler)
        );
        
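        // Use execute(): submit() would capture the exception in a Future,
        // and the uncaught exception handler would never be invoked
        executor.execute(() -> {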
            System.out.println("Task started");
            throw new RuntimeException("Task failed!");
        });
        
        // Give the exception handler time to run
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        executor.shutdown();
    }
}

Handling Rejected Tasks with a RejectedExecutionHandler

When a thread pool cannot accept a new task, a custom RejectedExecutionHandler decides what happens to it:

import java.util.concurrent.*;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.err.println("Task rejected: " + r.toString());
        
        // Options here include logging the task, re-queuing it, or other custom handling
        if (!executor.isShutdown()) {
            try {
                // Block the submitting thread until the queue has room
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Interrupted while trying to queue rejected task");
            }
        }
    }
    
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new CustomRejectedExecutionHandler()
        );
        
        // Submit several tasks to trigger the rejection policy
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    Thread.sleep(2000); // Simulate a slow task
                    System.out.println("Task " + taskId + " completed");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        executor.shutdown();
    }
}
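
Before writing a custom handler it is worth checking whether a built-in policy already fits. The sketch below saturates a pool with one thread and one queue slot and compares ThreadPoolExecutor.AbortPolicy (the default, which throws RejectedExecutionException) with ThreadPoolExecutor.CallerRunsPolicy (which runs the task on the submitting thread). The class name RejectionPolicyDemo and the method thirdTaskRejected are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionPolicyDemo {

    /** Saturates a 1-thread, 1-slot pool and reports whether the third task was rejected. */
    public static boolean thirdTaskRejected(RejectedExecutionHandler policy) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), policy);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);   // occupies the only worker thread
            pool.execute(blocker);   // fills the only queue slot
            pool.execute(() -> { }); // no capacity left: the policy decides
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();     // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy rejected: "
            + thirdTaskRejected(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRunsPolicy rejected: "
            + thirdTaskRejected(new ThreadPoolExecutor.CallerRunsPolicy()));
    }
}
```

CallerRunsPolicy slows the producer down instead of dropping work, which gives a simple form of back-pressure without blocking on the queue.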

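CompletableFuture Error Handling

How CompletableFuture Propagates Exceptions

CompletableFuture has its own exception propagation rules for chained calls. When a stage fails, the dependent stages that transform results (such as thenApply) are skipped, and the exception travels down the chain until it reaches a stage that handles it.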

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExceptionPropagation {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture
            // Explicit <String>: a lambda that only throws gives the compiler no result type to infer
            .<String>supplyAsync(() -> {
                System.out.println("Step 1: Starting computation");
                throw new RuntimeException("Computation failed in step 1");
            })
            .thenApply(result -> {
                System.out.println("Step 2: Processing result");
                return result.toUpperCase();
            })
            .thenApply(result -> {
                System.out.println("Step 3: Final processing");
                return "Processed: " + result;
            });
        
        try {
            String result = future.get();
            System.out.println("Final result: " + result);
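        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            System.err.println("Interrupted while waiting for the result");
        } catch (ExecutionException e) {
            System.err.println("Operation failed: " + e.getCause().getMessage());
        }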
    }
}
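
One detail of this propagation is easy to miss: a callback attached downstream of a failed stage usually receives the original exception wrapped in a CompletionException, so getMessage() and instanceof checks may not behave as expected. A minimal sketch of an unwrapping helper follows; the class name CompletionExceptionUnwrap and the methods unwrap and describeFailure are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class CompletionExceptionUnwrap {

    /** Strips CompletionException/ExecutionException wrappers and returns the underlying failure. */
    public static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Shows which exception type a downstream handler sees, and the unwrapped message. */
    public static String describeFailure() {
        return CompletableFuture
            .<String>supplyAsync(() -> { throw new IllegalStateException("step 1 failed"); })
            .thenApply(String::toUpperCase) // skipped because the previous stage failed
            .handle((value, error) -> error == null
                ? value
                : error.getClass().getSimpleName() + " -> " + unwrap(error).getMessage())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());
    }
}
```

Unwrapping before logging or before an instanceof check keeps handlers working the same way whether they sit directly on the failing stage or several stages downstream.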

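Exception Handling Methods in Detail

The handle() Method

handle() receives both the normal result and the exception, so it can process either outcome: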

import java.util.concurrent.CompletableFuture;

public class HandleExceptionDemo {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) {
                    return "Success";
                } else {
                    throw new RuntimeException("Random failure");
                }
            })
            .handle((result, throwable) -> {
                if (throwable != null) {
                    System.err.println("Caught exception: " + throwable.getMessage());
                    return "Default value due to error";
                }
                return result + " processed";
            });
        
        future.thenAccept(System.out::println).join();
    }
}

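The exceptionally() Method

exceptionally() is dedicated to the failure case; it runs only when the previous stage failed and supplies a replacement value: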

import java.util.concurrent.CompletableFuture;

public class ExceptionallyDemo {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture
            // Explicit <String> so the chain is typed as CompletableFuture<String>
            .<String>supplyAsync(() -> {
                throw new RuntimeException("Original failure");
            })
            .exceptionally(throwable -> {
                System.err.println("Recovering from: " + throwable.getMessage());
                return "Recovered value";
            });
        
        System.out.println("Result: " + future.join());
    }
}

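The whenComplete() Method

whenComplete() runs after the task finishes, whether it succeeded or failed. It observes the outcome without changing it, so a failure still propagates: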

import java.util.concurrent.CompletableFuture;

public class WhenCompleteDemo {
    public static void main(String[] args) {
        CompletableFuture<String> successFuture = CompletableFuture
            .supplyAsync(() -> "Success result")
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    System.err.println("Task failed: " + throwable.getMessage());
                } else {
                    System.out.println("Task succeeded with result: " + result);
                }
            });
        
        CompletableFuture<String> failureFuture = CompletableFuture
            // Explicit <String> so the chain is typed as CompletableFuture<String>
            .<String>supplyAsync(() -> {
                throw new RuntimeException("Task failure");
            })
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    System.err.println("Task failed: " + throwable.getMessage());
                } else {
                    System.out.println("Task succeeded with result: " + result);
                }
            });
        
        System.out.println("Success result: " + successFuture.join());
        try {
            System.out.println("Failure result: " + failureFuture.join());
        } catch (Exception e) {
            System.err.println("Caught exception in main: " + e.getMessage());
        }
    }
}
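
The practical difference between whenComplete() and handle() is what happens to the failure afterwards. The following sketch contrasts the two; the class name WhenCompleteVsHandle and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class WhenCompleteVsHandle {

    private static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    /** whenComplete() only observes: the returned future still fails. */
    public static boolean stillFailsAfterWhenComplete() {
        CompletableFuture<String> observed = failing()
            .whenComplete((value, error) -> { /* log or record metrics here */ });
        try {
            observed.join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    /** handle() transforms: the failure is replaced by a normal value. */
    public static String recoveredByHandle() {
        return failing()
            .handle((value, error) -> error == null ? value : "fallback")
            .join();
    }

    public static void main(String[] args) {
        System.out.println("still fails after whenComplete: " + stillFailsAfterWhenComplete());
        System.out.println("handle result: " + recoveredByHandle());
    }
}
```

A reasonable rule of thumb is to use whenComplete() for side effects such as logging and metrics, and handle() or exceptionally() when the chain should recover.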

Exception Handling in Combined Operations

Combining several CompletableFutures calls for extra care with exceptions:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureCombinationException {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Result 1";
            });
        
        CompletableFuture<String> future2 = CompletableFuture
            .supplyAsync(() -> {
                throw new RuntimeException("Failure in future 2");
            });
        
        // Combine with allOf
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
        
        CompletableFuture<String> combinedFuture = allFutures
            .handle((voidResult, throwable) -> {
                if (throwable != null) {
                    System.err.println("One or more futures failed: " + throwable.getMessage());
                    return "Default combined result";
                }
                
                try {
                    String result1 = future1.get();
                    String result2 = future2.get();
                    return result1 + " + " + result2;
                } catch (Exception e) {
                    return "Error getting results: " + e.getMessage();
                }
            });
        
        System.out.println("Combined result: " + combinedFuture.join());
    }
}
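
In the example above, one failed future turns the whole allOf() result into a failure, and the successful value of future1 is discarded. If partial results are acceptable, each future can be given its own fallback before combining. This is a sketch under that assumption; the class name AllOfWithFallback and the method collect are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfWithFallback {

    /** Waits for all futures and returns their values, substituting the fallback for failures. */
    public static List<String> collect(List<CompletableFuture<String>> futures, String fallback) {
        // Guard each future individually so one failure cannot fail the combination
        List<CompletableFuture<String>> guarded = futures.stream()
            .map(f -> f.exceptionally(error -> fallback))
            .collect(Collectors.toList());

        return CompletableFuture
            .allOf(guarded.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> guarded.stream()
                .map(CompletableFuture::join) // does not block: every guarded future is done
                .collect(Collectors.toList()))
            .join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.supplyAsync(() -> "Result 1");
        CompletableFuture<String> bad = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Failure in future 2");
        });
        System.out.println(collect(Arrays.asList(ok, bad), "n/a"));
    }
}
```

Whether a fallback value is appropriate depends on the use case; when every result is required, letting allOf() fail as a whole is the safer behavior.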

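Monitoring and Alerting for Asynchronous Tasks

Context Tracing with MDC

Preserving context information in asynchronous tasks is essential for troubleshooting: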

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextAwareAsyncTask {
    private static final Logger logger = LoggerFactory.getLogger(ContextAwareAsyncTask.class);
    private static final ExecutorService executor = Executors.newFixedThreadPool(4);
    
    public static CompletableFuture<String> processWithTraceId(String input) {
        String traceId = UUID.randomUUID().toString();
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                MDC.put("traceId", traceId);
                logger.info("Starting async processing for input: {}", input);
                
                // Simulate business logic
                if (input == null || input.isEmpty()) {
                    throw new IllegalArgumentException("Input cannot be null or empty");
                }
                
                Thread.sleep(100); // Simulate a slow operation
                String result = input.toUpperCase();
                logger.info("Processing completed with result: {}", result);
                return result;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                MDC.clear();
            }
        }, executor);
    }
    
    public static void main(String[] args) {
        processWithTraceId("hello")
            .thenAccept(result -> logger.info("Final result: {}", result))
            .exceptionally(throwable -> {
                logger.error("Processing failed", throwable);
                return null;
            });
        
        // Wait for the task to finish
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        executor.shutdown();
    }
}
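
The example above generates the trace ID inside the method, so the task already knows the value. When the ID instead lives in the submitting thread's context, it does not follow the task onto a pool thread, because MDC data is typically kept per thread. The sketch below shows the capture-and-restore pattern; to stay self-contained it uses a plain ThreadLocal map as a stand-in for MDC, and the names ContextPropagation, withCallerContext and traceIdSeenByWorker are hypothetical. With SLF4J, the capture and restore steps would presumably map to MDC.getCopyOfContextMap() and MDC.setContextMap().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class ContextPropagation {
    // Stand-in for MDC (an assumption for illustration): a per-thread map of diagnostic values
    private static final ThreadLocal<Map<String, String>> CONTEXT =
        ThreadLocal.withInitial(HashMap::new);

    public static void put(String key, String value) {
        CONTEXT.get().put(key, value);
    }

    public static String get(String key) {
        return CONTEXT.get().get(key);
    }

    /** Captures the caller's context now and installs it around the task on the worker thread. */
    public static <T> Supplier<T> withCallerContext(Supplier<T> task) {
        Map<String, String> captured = new HashMap<>(CONTEXT.get());
        return () -> {
            Map<String, String> previous = CONTEXT.get();
            CONTEXT.set(new HashMap<>(captured));
            try {
                return task.get();
            } finally {
                CONTEXT.set(previous); // do not leak context into the next task on this thread
            }
        };
    }

    /** Sets a trace ID on the calling thread and returns the ID observed inside the async task. */
    public static String traceIdSeenByWorker(String traceId) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            put("traceId", traceId);
            return CompletableFuture
                .supplyAsync(withCallerContext(() -> get("traceId")), executor)
                .join();
        } finally {
            CONTEXT.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Worker saw traceId: " + traceIdSeenByWorker("abc-123"));
    }
}
```

Restoring the previous context in the finally block matters in a pool, where the same thread goes on to run unrelated tasks.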

Monitoring Task Execution Time

Monitoring task execution time helps uncover performance bottlenecks:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

public class TaskExecutionMonitor {
    private static final AtomicLong totalTasks = new AtomicLong(0);
    private static final AtomicLong failedTasks = new AtomicLong(0);
    private static final AtomicLong totalTime = new AtomicLong(0);
    
    public static <T> CompletableFuture<T> monitoredSupplyAsync(Supplier<T> supplier) {
        long startTime = System.currentTimeMillis();
        totalTasks.incrementAndGet();
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                T result = supplier.get();
                return result;
            } catch (Exception e) {
                failedTasks.incrementAndGet();
                throw e;
            } finally {
                long executionTime = System.currentTimeMillis() - startTime;
                totalTime.addAndGet(executionTime);
                System.out.printf("Task completed in %d ms. Total tasks: %d, Failed: %d, Avg time: %.2f ms%n",
                    executionTime, totalTasks.get(), failedTasks.get(),
                    (double) totalTime.get() / totalTasks.get());
            }
        });
    }
    
    public static void main(String[] args) {
        // A task that succeeds
        monitoredSupplyAsync(() -> {
            try {
                Thread.sleep(100);
                return "Success";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }).thenAccept(System.out::println);
        
        // A task that fails
        monitoredSupplyAsync(() -> {
            throw new RuntimeException("Simulated failure");
        }).exceptionally(throwable -> {
            System.err.println("Task failed: " + throwable.getMessage());
            return "Default";
        });
        
        // Wait for the tasks to finish
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

A Custom Asynchronous Task Wrapper

A general-purpose wrapper for asynchronous tasks can combine exception handling and monitoring:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

public class AsyncTaskWrapper {
    
    public static <T> CompletableFuture<T> wrap(Supplier<T> task, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            long startTime = System.currentTimeMillis();
            String taskName = Thread.currentThread().getName();
            
            try {
                T result = task.get();
                long executionTime = System.currentTimeMillis() - startTime;
                System.out.printf("Task %s completed successfully in %d ms%n", taskName, executionTime);
                return result;
            } catch (Exception e) {
                long executionTime = System.currentTimeMillis() - startTime;
                System.err.printf("Task %s failed after %d ms: %s%n", taskName, executionTime, e.getMessage());
                throw e;
            }
        }, executor);
    }
    
    public static <T, R> CompletableFuture<R> wrapWithRecovery(
            Supplier<T> task, 
            Function<T, R> successHandler,
            Function<Throwable, R> errorHandler,
            Executor executor) {
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                T result = task.get();
                return successHandler.apply(result);
            } catch (Exception e) {
                return errorHandler.apply(e);
            }
        }, executor);
    }
    
    public static void main(String[] args) {
        // Run a task through the wrapper
        wrap(() -> {
            if (Math.random() > 0.7) {
                throw new RuntimeException("Random failure");
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Success result";
        }, CompletableFuture.delayedExecutor(1, java.util.concurrent.TimeUnit.SECONDS))
        .thenAccept(System.out::println)
        .exceptionally(throwable -> {
            System.err.println("Final exception handler: " + throwable.getMessage());
            return null;
        });
        
        // Use the wrapper with built-in recovery
        wrapWithRecovery(
            () -> {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("Task failure");
                }
                return "Primary result";
            },
            result -> "Processed: " + result,
            throwable -> "Recovered from: " + throwable.getMessage(),
            CompletableFuture.delayedExecutor(2, java.util.concurrent.TimeUnit.SECONDS)
        ).thenAccept(System.out::println);
        
        // Wait for the tasks to finish
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Advanced Exception Handling Patterns

Implementing Retries

Retrying an asynchronous task is an effective way to deal with transient failures:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class RetryableAsyncTask {
    private static final ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2);
    
    public static <T> CompletableFuture<T> retryableSupplyAsync(
            Supplier<T> task, 
            int maxRetries, 
            long delayMillis) {
        
        return retryableSupplyAsync(task, maxRetries, delayMillis, 0);
    }
    
    private static <T> CompletableFuture<T> retryableSupplyAsync(
            Supplier<T> task, 
            int maxRetries, 
            long delayMillis,
            int currentAttempt) {
        
        return CompletableFuture.supplyAsync(task)
            // handle() maps every outcome to a future; thenCompose() below flattens it
            .<CompletableFuture<T>>handle((result, throwable) -> {
                if (throwable == null) {
                    return CompletableFuture.completedFuture(result);
                }
                
                if (currentAttempt >= maxRetries) {
                    // Retries exhausted: propagate the last failure
                    CompletableFuture<T> failed = new CompletableFuture<>();
                    failed.completeExceptionally(throwable);
                    return failed;
                }
                
                System.err.printf("Attempt %d failed: %s. Retrying in %d ms...%n", 
                    currentAttempt + 1, throwable.getMessage(), delayMillis);
                
                // Completed by the next attempt once the delay has elapsed
                CompletableFuture<T> delayedFuture = new CompletableFuture<>();
                scheduler.schedule(() -> {
                    retryableSupplyAsync(task, maxRetries, delayMillis, currentAttempt + 1)
                        .whenComplete((retryResult, error) -> {
                            if (error != null) {
                                delayedFuture.completeExceptionally(error);
                            } else {
                                delayedFuture.complete(retryResult);
                            }
                        });
                }, delayMillis, TimeUnit.MILLISECONDS);
                
                return delayedFuture;
            })
            .thenCompose(next -> next);
    }
    
    // A simpler retry implementation; note that Thread.sleep blocks the thread running the callback
    public static <T> CompletableFuture<T> retryableSupplyAsyncSimple(
            Supplier<T> task, 
            int maxRetries, 
            long delayMillis) {
        
        CompletableFuture<T> future = CompletableFuture.supplyAsync(task);
        
        for (int i = 0; i < maxRetries; i++) {
            future = future.exceptionally(throwable -> {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                return task.get();
            });
        }
        
        return future;
    }
    
    public static void main(String[] args) {
        retryableSupplyAsyncSimple(() -> {
            if (Math.random() > 0.7) {
                throw new RuntimeException("Temporary failure");
            }
            return "Success after retry";
        }, 3, 1000).thenAccept(System.out::println)
          .exceptionally(throwable -> {
              System.err.println("All retries failed: " + throwable.getMessage());
              return null;
          });
        
        // Wait for the task to finish
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        scheduler.shutdown();
    }
}

Timeout Control

Setting a timeout on an asynchronous task is an important safeguard against tasks that block indefinitely:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimeoutControl {
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    
    public static <T> CompletableFuture<T> withTimeout(
            CompletableFuture<T> future, 
            long timeout, 
            TimeUnit unit) {
        
        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
        
        scheduler.schedule(() -> {
            if (!future.isDone()) {
                timeoutFuture.completeExceptionally(
                    new RuntimeException("Task timed out after " + timeout + " " + unit));
            }
        }, timeout, unit);
        
        // anyOf completes with the value (or failure) of whichever future finishes first
        return CompletableFuture.anyOf(future, timeoutFuture)
            .thenApply(result -> {
                @SuppressWarnings("unchecked")
                T value = (T) result; // anyOf is typed as Object, so a cast is unavoidable
                return value;
            });
    }
    
    public static void main(String[] args) {
        CompletableFuture<String> longRunningTask = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000); // Simulate a long-running task
                return "Task completed";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        });
        
        withTimeout(longRunningTask, 2, TimeUnit.SECONDS)
            .thenAccept(System.out::println)
            .exceptionally(throwable -> {
                System.err.println("Task failed or timed out: " + throwable.getMessage());
                return null;
            });
        
        // Wait for the task to finish
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        scheduler.shutdown();
    }
}
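
On Java 9 and later, CompletableFuture provides orTimeout() and completeOnTimeout(), which cover the common cases without a hand-written scheduler. The sketch below assumes Java 9+; the class name BuiltInTimeouts and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BuiltInTimeouts {

    /** Returns true if orTimeout() failed the future with a TimeoutException. */
    public static boolean failsOnTimeout(CompletableFuture<String> future) {
        try {
            future.orTimeout(200, TimeUnit.MILLISECONDS).join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    /** Returns the future's value, or "fallback" if it is not done within 200 ms. */
    public static String fallbackOnTimeout(CompletableFuture<String> future) {
        return future.completeOnTimeout("fallback", 200, TimeUnit.MILLISECONDS).join();
    }

    public static void main(String[] args) {
        // A future that nobody completes stands in for a task that hangs
        System.out.println("timed out: " + failsOnTimeout(new CompletableFuture<>()));
        System.out.println("value: " + fallbackOnTimeout(new CompletableFuture<>()));
    }
}
```

Both methods only complete the future; like the scheduler-based version, they do not interrupt the thread that is still running the underlying task.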

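Best Practices and Recommendations

1. A Unified Exception Handling Strategy

Establish a unified exception handling strategy so that every asynchronous task has appropriate exception handling: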

import java.util.concurrent.CompletableFuture;

public class UnifiedExceptionHandler {
    public static <T> CompletableFuture<T> withUnifiedHandling(CompletableFuture<T> future) {
        return future.exceptionally(throwable -> {
            // Log the failure
            System.err.println("Unhandled exception in async task: " + throwable.getMessage());
            throwable.printStackTrace();
            
            // Send an alert
            sendAlert(throwable);
            
            // Rethrow (wrapping checked exceptions) so downstream stages still see the failure
            if (throwable instanceof RuntimeException) {
                throw (RuntimeException) throwable;
            } else {
                throw new RuntimeException(throwable);
            }
        });
    }
    
    private static void sendAlert(Throwable throwable) {
        // Implement alerting here, e.g. send an email or call a monitoring system API
        System.out.println("Alert sent for exception: " + throwable.getMessage());
    }
}

2. Resource Management

Make sure resources used by asynchronous tasks are released correctly:

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

public class ResourceManagedAsyncTask {
    public static <T extends Closeable, R> CompletableFuture<R> withResource(
            Supplier<T> resourceSupplier,
            Function<T, R> task) {
        
        return CompletableFuture.supplyAsync(() -> {
            T resource = resourceSupplier.get();
            try {
                return task.apply(resource);
            } finally {
                try {
                    resource.close();
                } catch (Exception e) {
                    System.err.println("Error closing resource: " + e.getMessage());
                }
            }
        });
    }
    
    // Usage example
    public static void main(String[] args) {
        withResource(
            () -> {
                // Create the resource, e.g. a database connection or file handle
                System.out.println("Resource acquired");
                return new MockResource();
            },
            resource -> {
                // Use the resource to perform the task
                System.out.println("Using resource to perform task");
                return "Task result";
            }
        ).thenAccept(System.out::println)
         .exceptionally(throwable -> {
             System.err.println("Task failed: " + throwable.getMessage());
             return null;
         });
        
        // Wait for the task to finish
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    static class MockResource implements Closeable {
        @Override
        public void close() {
            System.out.println("Resource closed");
        }
    }
}
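
The same guarantee can be expressed with try-with-resources, which closes the resource automatically and keeps the task's own exception as the primary one if close() also fails. This is a sketch of that variant, not a replacement for the version above; the class name TryWithResourcesAsync and the helper closedAfterFailure are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

public class TryWithResourcesAsync {

    public static <T extends AutoCloseable, R> CompletableFuture<R> withResource(
            Supplier<T> resourceSupplier,
            Function<T, R> task) {

        return CompletableFuture.supplyAsync(() -> {
            try (T resource = resourceSupplier.get()) {
                return task.apply(resource);
            } catch (RuntimeException e) {
                throw e; // task failure: propagate unchanged
            } catch (Exception e) {
                // close() failed with a checked exception
                throw new CompletionException(e);
            }
        });
    }

    /** Returns true if the resource was closed even though the task threw. */
    public static boolean closedAfterFailure() {
        AtomicBoolean closed = new AtomicBoolean(false);
        AutoCloseable resource = () -> closed.set(true);
        withResource(() -> resource, r -> { throw new IllegalStateException("task failed"); })
            .exceptionally(error -> null)
            .join();
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println("closed after failure: " + closedAfterFailure());
    }
}
```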

3. Collecting Monitoring Metrics

Build a thorough mechanism for collecting monitoring metrics:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;

public class AsyncTaskMetrics {
    private static final AtomicLong totalTasks = new AtomicLong(0);
    private static final AtomicLong failedTasks = new AtomicLong(0);
    private static final AtomicLong successfulTasks = new AtomicLong(0);
    private static final Map<String, AtomicLong> taskTypeCounters = new ConcurrentHashMap<>();
    
    public static <T> CompletableFuture<T> withMetrics(
            CompletableFuture<T> future, 
            String taskType) {
        
        totalTasks.incrementAndGet();
        taskTypeCounters.computeIfAbsent(taskType, k -> new AtomicLong(0))
                       .incrementAndGet();
        
        long startTime = System.currentTimeMillis();
        
        return future.whenComplete((result, throwable) -> {
            long executionTime = System.currentTimeMillis() - startTime;
            
            if (throwable != null) {
                failedTasks.incrementAndGet();
                System.err.printf("Task %s failed after %d ms: %s%n", 
                    taskType, executionTime, throwable.getMessage());
            } else {
                successfulTasks.incrementAndGet();
                System.out.printf("Task %s completed successfully in %d ms%n", 
                    taskType, executionTime);
            }
            
            // The metrics could be sent to a monitoring system here
            reportMetrics(taskType, executionTime, throwable != null);
        });
    }
    
    private static void reportMetrics(String taskType, long executionTime, boolean failed) {
        // Implement metric reporting here
        System.out.printf("Reporting metrics - Task: %s, Time: %d ms, Failed: %b%n", 
            taskType, executionTime, failed);
    }
    
    public static void printSummary() {
        System.out.printf("Total tasks: %d, Successful: %d, Failed: %d%n",
            totalTasks.get(), successfulTasks.get(), failedTasks.get());
    }
    
    public static void main(String[] args) {
        withMetrics(
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(100);
                    return "Result 1";
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }),
            "database-query"
        ).thenAccept(System.out::println);
        
        withMetrics(
            CompletableFuture.supplyAsync(() -> {
                throw new RuntimeException("Simulated error");
            }),
            "api-call"
        ).exceptionally(throwable -> {
            System.err.println("API call failed: " + throwable.getMessage());
            return null;
        });
        
        // Wait for the tasks to finish, then print the summary
        try {
            Thread.sleep(2000);
            printSummary();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
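
Counters become useful for alerting once they are compared against a threshold. The following is a minimal sketch of a failure-rate check; the class name FailureRateAlert, the minSamples and threshold parameters, and the track method are all assumptions made for illustration, and a production system would more likely evaluate the rate over a time window in its monitoring backend.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;

public class FailureRateAlert {
    private final LongAdder total = new LongAdder();
    private final LongAdder failed = new LongAdder();
    private final long minSamples;   // avoid alerting on a handful of tasks
    private final double threshold;  // e.g. 0.5 means "more than 50% failed"

    public FailureRateAlert(long minSamples, double threshold) {
        this.minSamples = minSamples;
        this.threshold = threshold;
    }

    public void record(boolean success) {
        total.increment();
        if (!success) {
            failed.increment();
        }
    }

    public double failureRate() {
        long count = total.sum();
        return count == 0 ? 0.0 : (double) failed.sum() / count;
    }

    public boolean shouldAlert() {
        return total.sum() >= minSamples && failureRate() > threshold;
    }

    /** Hooks the counters onto a future without changing its outcome. */
    public <T> CompletableFuture<T> track(CompletableFuture<T> future) {
        return future.whenComplete((value, error) -> record(error == null));
    }

    public static void main(String[] args) {
        FailureRateAlert alert = new FailureRateAlert(4, 0.5);
        alert.record(true);
        alert.record(false);
        alert.record(false);
        alert.record(false);
        System.out.printf("rate=%.2f alert=%b%n", alert.failureRate(), alert.shouldAlert());
    }
}
```

Using whenComplete() in track keeps the monitoring passive: the caller's own exceptionally() or handle() stages still decide how the failure is dealt with.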

Summary

Exception handling in Java concurrent programming is a complex
