数据库分库分表异常处理机制设计:从连接池管理到事务回滚的完整解决方案

技术趋势洞察
技术趋势洞察 2025-12-15T02:28:01+08:00
0 0 10

引言

随着业务规模的快速增长,传统单体数据库已无法满足高并发、大数据量的访问需求。数据库分库分表作为解决这一问题的核心技术方案,通过将数据分散存储到多个数据库实例中,有效提升了系统的扩展性和性能。然而,分库分表架构在带来性能提升的同时,也引入了复杂的异常处理挑战。

在分布式环境中,网络抖动、连接超时、节点故障等问题频发,如何确保系统在异常情况下的稳定运行成为关键难题。本文将深入探讨数据库分库分表架构下的异常处理机制设计,从连接池管理到分布式事务处理,提供一套完整的解决方案。

数据库分库分表架构概述

架构特点与挑战

数据库分库分表架构的核心在于将原本统一的数据库拆分为多个逻辑上独立的数据库实例,通过特定的路由规则将数据分布到不同的物理节点。这种架构带来了显著的优势:

  • 水平扩展性:能够轻松应对数据量增长和并发访问增加的需求
  • 性能提升:减少单点压力,提高查询效率
  • 资源隔离:不同业务模块可以独立部署和管理

然而,这种架构也带来了诸多挑战:

  1. 分布式事务处理复杂:跨库操作需要保证事务一致性
  2. 连接管理困难:多个数据库实例的连接池管理更加复杂
  3. 数据一致性保障:如何在异常情况下维持数据完整性
  4. 监控与故障恢复:复杂的分布式环境需要完善的监控体系

异常场景分析

在分库分表环境中,常见的异常场景包括:

  • 数据库连接中断或超时
  • 网络抖动导致的请求失败
  • 某个数据库节点故障
  • 分布式事务提交失败
  • 资源耗尽导致的系统不稳定

连接池管理与异常处理

连接池设计原则

连接池是数据库访问的基础组件,在分库分表架构中,需要为每个数据库实例维护独立的连接池。合理的连接池配置对于系统的稳定性和性能至关重要。

@Configuration
public class DataSourceConfig {
    
    @Bean("primaryDataSource")
    @Primary
    public DataSource primaryDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/db1");
        config.setUsername("user");
        config.setPassword("password");
        config.setMaximumPoolSize(20);
        config.setMinimumIdle(5);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(600000);
        config.setMaxLifetime(1800000);
        config.setLeakDetectionThreshold(60000);
        
        return new HikariDataSource(config);
    }
    
    @Bean("secondaryDataSource")
    public DataSource secondaryDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/db2");
        config.setUsername("user");
        config.setPassword("password");
        config.setMaximumPoolSize(15);
        config.setMinimumIdle(3);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(300000);
        config.setMaxLifetime(1200000);
        config.setLeakDetectionThreshold(60000);
        
        return new HikariDataSource(config);
    }
}

连接异常检测与恢复

连接池需要具备完善的异常检测机制,当检测到连接异常时能够及时处理:

@Component
public class ConnectionMonitor {
    
    private static final Logger logger = LoggerFactory.getLogger(ConnectionMonitor.class);
    
    @Autowired
    private HikariDataSource primaryDataSource;
    
    @Autowired
    private HikariDataSource secondaryDataSource;
    
    @Scheduled(fixedDelay = 60000)
    public void monitorConnectionPool() {
        try {
            // 检查主数据源连接池状态
            HikariPoolMXBean primaryPoolBean = primaryDataSource.getHikariPoolMXBean();
            int activeConnections = primaryPoolBean.getActiveConnections();
            int idleConnections = primaryPoolBean.getIdleConnections();
            int totalConnections = primaryPoolBean.getTotalConnections();
            
            logger.info("Primary DataSource - Active: {}, Idle: {}, Total: {}", 
                       activeConnections, idleConnections, totalConnections);
            
            // 检查备数据源连接池状态
            HikariPoolMXBean secondaryPoolBean = secondaryDataSource.getHikariPoolMXBean();
            int secondaryActive = secondaryPoolBean.getActiveConnections();
            int secondaryIdle = secondaryPoolBean.getIdleConnections();
            int secondaryTotal = secondaryPoolBean.getTotalConnections();
            
            logger.info("Secondary DataSource - Active: {}, Idle: {}, Total: {}", 
                       secondaryActive, secondaryIdle, secondaryTotal);
            
            // 异常处理逻辑
            if (activeConnections > totalConnections * 0.9) {
                logger.warn("Connection pool usage is too high: {}%", 
                           (activeConnections * 100.0 / totalConnections));
                // 触发告警或自动扩容
                handleHighUsage();
            }
            
        } catch (Exception e) {
            logger.error("Failed to monitor connection pools", e);
            // 处理监控异常,可能需要重启连接池
            handleConnectionPoolError();
        }
    }
    
    private void handleHighUsage() {
        // 实现高使用率处理逻辑
        // 可以触发告警、自动扩容等操作
    }
    
    private void handleConnectionPoolError() {
        // 连接池异常处理
        try {
            primaryDataSource.close();
            secondaryDataSource.close();
            
            // 重新初始化连接池
            initializeConnectionPools();
            
        } catch (Exception e) {
            logger.error("Failed to recover connection pools", e);
            // 记录严重错误,可能需要人工干预
        }
    }
    
    private void initializeConnectionPools() {
        // 重新初始化连接池逻辑
        logger.info("Reinitializing connection pools...");
    }
}

连接池动态调整策略

为了应对不同负载情况下的性能需求,连接池应该具备动态调整能力:

@Component
public class DynamicConnectionPoolManager {
    
    private static final Logger logger = LoggerFactory.getLogger(DynamicConnectionPoolManager.class);
    
    @Autowired
    private HikariDataSource primaryDataSource;
    
    @Autowired
    private HikariDataSource secondaryDataSource;
    
    // 根据负载动态调整连接池大小
    public void adjustPoolSizeBasedOnLoad() {
        try {
            HikariPoolMXBean primaryPoolBean = primaryDataSource.getHikariPoolMXBean();
            HikariPoolMXBean secondaryPoolBean = secondaryDataSource.getHikariPoolMXBean();
            
            // 获取当前负载指标
            double currentLoad = getCurrentSystemLoad();
            int currentActiveConnections = primaryPoolBean.getActiveConnections();
            int currentTotalConnections = primaryPoolBean.getTotalConnections();
            
            if (currentLoad > 0.8 && currentActiveConnections > currentTotalConnections * 0.7) {
                // 高负载情况下增加连接池大小
                increasePoolSize();
            } else if (currentLoad < 0.3 && currentActiveConnections < 5) {
                // 低负载情况下减少连接池大小
                decreasePoolSize();
            }
            
        } catch (Exception e) {
            logger.error("Failed to adjust pool size", e);
        }
    }
    
    private double getCurrentSystemLoad() {
        // 实现系统负载计算逻辑
        return 0.5; // 示例值
    }
    
    private void increasePoolSize() {
        try {
            // 动态调整连接池大小
            HikariConfig config = primaryDataSource.getHikariConfigMXBean();
            int currentMaxPoolSize = config.getMaximumPoolSize();
            int newMaxPoolSize = Math.min(currentMaxPoolSize * 2, 100);
            
            if (newMaxPoolSize > currentMaxPoolSize) {
                config.setMaximumPoolSize(newMaxPoolSize);
                logger.info("Increased primary pool size to: {}", newMaxPoolSize);
            }
        } catch (Exception e) {
            logger.error("Failed to increase pool size", e);
        }
    }
    
    private void decreasePoolSize() {
        try {
            HikariConfig config = primaryDataSource.getHikariConfigMXBean();
            int currentMaxPoolSize = config.getMaximumPoolSize();
            int newMaxPoolSize = Math.max(currentMaxPoolSize / 2, 5);
            
            if (newMaxPoolSize < currentMaxPoolSize) {
                config.setMaximumPoolSize(newMaxPoolSize);
                logger.info("Decreased primary pool size to: {}", newMaxPoolSize);
            }
        } catch (Exception e) {
            logger.error("Failed to decrease pool size", e);
        }
    }
}

分布式事务处理机制

两阶段提交协议实现

在分库分表环境中,分布式事务的处理是核心难点。两阶段提交协议(2PC)是常用的解决方案:

@Component
public class DistributedTransactionManager {
    
    private static final Logger logger = LoggerFactory.getLogger(DistributedTransactionManager.class);
    
    @Autowired
    private TransactionTemplate transactionTemplate;
    
    @Autowired
    private DataSource primaryDataSource;
    
    @Autowired
    private DataSource secondaryDataSource;
    
    public boolean executeDistributedTransaction(List<TransactionalOperation> operations) {
        try {
            // 第一阶段:准备阶段
            List<Boolean> prepareResults = preparePhase(operations);
            
            // 检查所有节点是否都准备好
            if (!prepareResults.stream().allMatch(Boolean.TRUE::equals)) {
                logger.warn("Not all nodes prepared for transaction");
                rollbackPhase(operations);
                return false;
            }
            
            // 第二阶段:提交阶段
            boolean commitResult = commitPhase(operations);
            if (commitResult) {
                logger.info("Distributed transaction committed successfully");
                return true;
            } else {
                logger.warn("Distributed transaction commit failed, rolling back");
                rollbackPhase(operations);
                return false;
            }
            
        } catch (Exception e) {
            logger.error("Distributed transaction failed", e);
            try {
                rollbackPhase(operations);
            } catch (Exception rollbackEx) {
                logger.error("Failed to rollback distributed transaction", rollbackEx);
            }
            return false;
        }
    }
    
    private List<Boolean> preparePhase(List<TransactionalOperation> operations) {
        List<Boolean> results = new ArrayList<>();
        
        for (TransactionalOperation operation : operations) {
            try {
                boolean prepared = prepareOperation(operation);
                results.add(prepared);
            } catch (Exception e) {
                logger.error("Prepare phase failed for operation: {}", operation, e);
                results.add(false);
            }
        }
        
        return results;
    }
    
    private boolean prepareOperation(TransactionalOperation operation) throws Exception {
        // 在每个数据库实例上执行prepare操作
        Connection conn = null;
        try {
            conn = operation.getDataSource().getConnection();
            conn.setAutoCommit(false);
            
            // 执行预处理逻辑
            operation.prepare(conn);
            
            // 标记为已准备
            return true;
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
    
    private boolean commitPhase(List<TransactionalOperation> operations) {
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        
        for (TransactionalOperation operation : operations) {
            CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
                try {
                    return commitOperation(operation);
                } catch (Exception e) {
                    logger.error("Commit phase failed for operation: {}", operation, e);
                    return false;
                }
            });
            futures.add(future);
        }
        
        // 等待所有提交完成
        try {
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0]));
            
            allFutures.get(30, TimeUnit.SECONDS); // 30秒超时
            
            return futures.stream().allMatch(future -> {
                try {
                    return future.get(1, TimeUnit.SECONDS);
                } catch (Exception e) {
                    logger.error("Failed to get commit result", e);
                    return false;
                }
            });
            
        } catch (Exception e) {
            logger.error("Commit phase timeout or failed", e);
            return false;
        }
    }
    
    private boolean commitOperation(TransactionalOperation operation) throws Exception {
        Connection conn = null;
        try {
            conn = operation.getDataSource().getConnection();
            conn.setAutoCommit(false);
            
            // 执行提交操作
            operation.commit(conn);
            
            conn.commit();
            return true;
        } catch (Exception e) {
            if (conn != null) {
                conn.rollback();
            }
            throw e;
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
    
    private void rollbackPhase(List<TransactionalOperation> operations) {
        for (TransactionalOperation operation : operations) {
            try {
                rollbackOperation(operation);
            } catch (Exception e) {
                logger.error("Rollback failed for operation: {}", operation, e);
                // 记录错误但继续回滚其他操作
            }
        }
    }
    
    private void rollbackOperation(TransactionalOperation operation) throws Exception {
        Connection conn = null;
        try {
            conn = operation.getDataSource().getConnection();
            conn.setAutoCommit(false);
            
            operation.rollback(conn);
            conn.rollback();
            
        } catch (Exception e) {
            logger.error("Rollback operation failed: {}", operation, e);
            throw e;
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
}

// 事务操作接口
public interface TransactionalOperation {
    void prepare(Connection connection) throws Exception;
    void commit(Connection connection) throws Exception;
    void rollback(Connection connection) throws Exception;
    DataSource getDataSource();
}

TCC(Try-Confirm-Cancel)模式实现

TCC模式是另一种分布式事务处理方案,具有更好的性能和灵活性:

@Component
public class TccTransactionManager {
    
    private static final Logger logger = LoggerFactory.getLogger(TccTransactionManager.class);
    
    @Autowired
    private TccTransactionRepository transactionRepository;
    
    public <T> T executeTccTransaction(String transactionId, 
                                     Supplier<T> tryAction,
                                     Consumer<T> confirmAction,
                                     Consumer<T> cancelAction) {
        T result = null;
        
        try {
            // 1. Try阶段
            logger.info("Starting TCC transaction: {}", transactionId);
            
            // 记录try操作
            TransactionRecord record = new TransactionRecord();
            record.setTransactionId(transactionId);
            record.setStatus(TransactionStatus.TRYING);
            record.setCreateTime(new Date());
            transactionRepository.save(record);
            
            result = tryAction.get();
            
            // 2. Confirm阶段
            logger.info("Confirming TCC transaction: {}", transactionId);
            
            confirmAction.accept(result);
            
            // 更新事务状态为完成
            record.setStatus(TransactionStatus.CONFIRMED);
            transactionRepository.update(record);
            
            logger.info("TCC transaction completed successfully: {}", transactionId);
            return result;
            
        } catch (Exception e) {
            logger.error("TCC transaction failed, starting rollback: {}", transactionId, e);
            
            try {
                // 3. Cancel阶段
                if (result != null) {
                    cancelAction.accept(result);
                }
                
                // 更新事务状态为取消
                TransactionRecord record = transactionRepository.findById(transactionId);
                if (record != null) {
                    record.setStatus(TransactionStatus.CANCELLED);
                    transactionRepository.update(record);
                }
                
            } catch (Exception rollbackEx) {
                logger.error("Failed to rollback TCC transaction: {}", transactionId, rollbackEx);
                // 记录失败但不抛出异常,避免影响主流程
            }
            
            throw new RuntimeException("TCC transaction failed", e);
        }
    }
    
    public void handlePendingTransactions() {
        // 处理未完成的事务(定时任务)
        List<TransactionRecord> pendingRecords = transactionRepository.findPending();
        
        for (TransactionRecord record : pendingRecords) {
            try {
                if (record.getCreateTime().getTime() < System.currentTimeMillis() - 3600000) {
                    // 超时事务处理
                    handleTimeoutTransaction(record);
                }
            } catch (Exception e) {
                logger.error("Failed to handle pending transaction: {}", record.getTransactionId(), e);
            }
        }
    }
    
    private void handleTimeoutTransaction(TransactionRecord record) {
        // 处理超时事务的逻辑
        logger.warn("Handling timeout transaction: {}", record.getTransactionId());
        
        // 可以发送告警、手动处理或自动回滚
        record.setStatus(TransactionStatus.FAILED);
        transactionRepository.update(record);
    }
}

// 事务记录实体
public class TransactionRecord {
    private String transactionId;
    private TransactionStatus status;
    private Date createTime;
    private Date updateTime;
    
    // getters and setters
    public String getTransactionId() { return transactionId; }
    public void setTransactionId(String transactionId) { this.transactionId = transactionId; }
    
    public TransactionStatus getStatus() { return status; }
    public void setStatus(TransactionStatus status) { this.status = status; }
    
    public Date getCreateTime() { return createTime; }
    public void setCreateTime(Date createTime) { this.createTime = createTime; }
    
    public Date getUpdateTime() { return updateTime; }
    public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; }
}

// 事务状态枚举
public enum TransactionStatus {
    TRYING, CONFIRMED, CANCELLED, FAILED
}

数据一致性保障机制

读写分离与数据同步

在分库分表架构中,确保数据一致性是关键挑战。通过合理的读写分离策略和数据同步机制来保障:

@Component
public class DataConsistencyManager {
    
    private static final Logger logger = LoggerFactory.getLogger(DataConsistencyManager.class);
    
    @Autowired
    private DataSource primaryDataSource;
    
    @Autowired
    private DataSource secondaryDataSource;
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 读写分离策略
    public <T> T executeReadOperation(Function<Connection, T> operation) {
        Connection conn = null;
        try {
            // 优先使用从库
            conn = secondaryDataSource.getConnection();
            return operation.apply(conn);
        } catch (Exception e) {
            logger.warn("Failed to read from secondary database, falling back to primary", e);
            try {
                // 备用方案:使用主库
                conn = primaryDataSource.getConnection();
                return operation.apply(conn);
            } catch (Exception fallbackEx) {
                logger.error("Failed to read from primary database", fallbackEx);
                throw new RuntimeException("Database read failed", fallbackEx);
            }
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    logger.warn("Failed to close connection", e);
                }
            }
        }
    }
    
    public <T> T executeWriteOperation(Function<Connection, T> operation) {
        Connection conn = null;
        try {
            conn = primaryDataSource.getConnection();
            T result = operation.apply(conn);
            
            // 数据同步到缓存
            syncToCache(result);
            
            return result;
        } catch (Exception e) {
            logger.error("Write operation failed", e);
            throw new RuntimeException("Database write failed", e);
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    logger.warn("Failed to close connection", e);
                }
            }
        }
    }
    
    private void syncToCache(Object data) {
        try {
            // 同步数据到Redis缓存
            String cacheKey = generateCacheKey(data);
            redisTemplate.opsForValue().set(cacheKey, data, 30, TimeUnit.MINUTES);
            
            logger.debug("Data synced to cache: {}", cacheKey);
        } catch (Exception e) {
            logger.warn("Failed to sync data to cache", e);
            // 缓存同步失败不影响主流程,但需要记录日志
        }
    }
    
    private String generateCacheKey(Object data) {
        // 根据数据生成缓存key的逻辑
        return "data:" + data.hashCode();
    }
    
    // 数据一致性检查
    public boolean checkDataConsistency(String tableName, String condition) {
        try {
            // 检查主从库数据一致性
            Connection primaryConn = null;
            Connection secondaryConn = null;
            
            try {
                primaryConn = primaryDataSource.getConnection();
                secondaryConn = secondaryDataSource.getConnection();
                
                long primaryCount = countRecords(primaryConn, tableName, condition);
                long secondaryCount = countRecords(secondaryConn, tableName, condition);
                
                boolean consistent = primaryCount == secondaryCount;
                logger.info("Data consistency check - Primary: {}, Secondary: {}, Consistent: {}", 
                           primaryCount, secondaryCount, consistent);
                
                return consistent;
                
            } finally {
                if (primaryConn != null) primaryConn.close();
                if (secondaryConn != null) secondaryConn.close();
            }
            
        } catch (Exception e) {
            logger.error("Data consistency check failed", e);
            return false;
        }
    }
    
    private long countRecords(Connection conn, String tableName, String condition) throws SQLException {
        String sql = "SELECT COUNT(*) FROM " + tableName + " WHERE " + condition;
        try (PreparedStatement stmt = conn.prepareStatement(sql);
             ResultSet rs = stmt.executeQuery()) {
            if (rs.next()) {
                return rs.getLong(1);
            }
            return 0;
        }
    }
}

数据补偿机制

针对异常情况下的数据不一致问题,建立完善的数据补偿机制:

@Component
public class DataCompensationManager {
    
    private static final Logger logger = LoggerFactory.getLogger(DataCompensationManager.class);
    
    @Autowired
    private DataSource primaryDataSource;
    
    @Autowired
    private DataSource secondaryDataSource;
    
    @Autowired
    private TaskScheduler taskScheduler;
    
    // 补偿任务队列
    private final BlockingQueue<CompensationTask> compensationQueue = 
        new LinkedBlockingQueue<>();
    
    @PostConstruct
    public void startCompensationProcessor() {
        // 启动补偿任务处理器
        taskScheduler.scheduleAtFixedRate(this::processCompensationTasks, 
                                        Duration.ofMinutes(5));
    }
    
    public void scheduleCompensationTask(String taskId, String operationType, 
                                       Map<String, Object> parameters) {
        CompensationTask task = new CompensationTask();
        task.setTaskId(taskId);
        task.setOperationType(operationType);
        task.setParameters(parameters);
        task.setCreateTime(new Date());
        task.setStatus(CompensationStatus.PENDING);
        
        try {
            compensationQueue.put(task);
            logger.info("Scheduled compensation task: {}", taskId);
        } catch (InterruptedException e) {
            logger.error("Failed to schedule compensation task", e);
            Thread.currentThread().interrupt();
        }
    }
    
    private void processCompensationTasks() {
        List<CompensationTask> tasksToProcess = new ArrayList<>();
        
        // 批量获取待处理任务
        while (!compensationQueue.isEmpty() && tasksToProcess.size() < 10) {
            CompensationTask task = compensationQueue.poll();
            if (task != null) {
                tasksToProcess.add(task);
            }
        }
        
        for (CompensationTask task : tasksToProcess) {
            try {
                processCompensationTask(task);
            } catch (Exception e) {
                logger.error("Failed to process compensation task: {}", task.getTaskId(), e);
                // 重新入队或标记为失败
                if (task.getRetryCount() < 3) {
                    task.setRetryCount(task.getRetryCount() + 1);
                    try {
                        compensationQueue.put(task);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    
    private void processCompensationTask(CompensationTask task) {
        logger.info("Processing compensation task: {}", task.getTaskId());
        
        try {
            switch (task.getOperationType()) {
                case "INSERT":
                    handleInsertCompensation(task);
                    break;
                case "UPDATE":
                    handleUpdateCompensation(task);
                    break;
                case "DELETE":
                    handleDeleteCompensation(task);
                    break;
                default:
                    logger.warn("Unknown operation type: {}", task.getOperationType());
            }
            
            // 标记任务完成
            task.setStatus(CompensationStatus.COMPLETED);
            logger.info("Compensation task completed: {}", task.getTaskId());
            
        } catch (Exception e) {
            logger.error("Compensation task failed: {}", task.getTaskId(), e);
            task.setStatus(CompensationStatus.FAILED);
            throw new RuntimeException("Compensation failed", e);
        }
    }
    
    private void handleInsertCompensation(CompensationTask task) {
        // 处理插入补偿逻辑
        Map<String, Object> params = task.getParameters();
        String tableName = (String) params.get("tableName");
        String primaryKey = (String) params.get("primaryKey");
        
        try (Connection conn = primaryDataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "DELETE FROM " + tableName + " WHERE " + primaryKey + " = ?")) {
            
            stmt.setObject(1, params.get("keyValue"));
            stmt.executeUpdate();
            
            logger.info("Compensated insert operation for table: {}", tableName);
        } catch (SQLException e) {
            throw new RuntimeException("Failed to compensate insert", e);
        }
    }
    
    private void handleUpdateCompensation(CompensationTask task) {
        // 处理更新补偿逻辑
        Map<String, Object> params = task.getParameters();
        String tableName = (String) params.get("tableName");
        String primaryKey = (String) params.get("primaryKey");
        String oldValue = (String) params.get("oldValue");
        
        try (Connection conn = primaryDataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "UPDATE " + tableName + " SET " + primaryKey + " = ? WHERE " + primaryKey + " = ?")) {
            
            stmt.setObject(1, oldValue);
            stmt.setObject(2, params.get("keyValue"));
            stmt.executeUpdate();
            
            logger.info("Compensated update operation for table: {}", tableName);
        } catch (SQLException e) {
            throw new RuntimeException("Failed to compensate update", e);
        }
    }
    
    private void handleDeleteCompensation(CompensationTask task) {
        // 处理删除补偿逻辑
        Map<String, Object> params = task.getParameters();
        String tableName = (String) params.get("tableName");
        String primaryKey = (String) params.get("primaryKey");
        String recordData = (String) params.get("recordData");
        
        try (Connection conn = primaryDataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "INSERT INTO " + tableName + " (" + primaryKey + ", data) VALUES (?, ?)")) {
            
            stmt.setObject(1, params.get("keyValue"));
            stmt.setString(2, recordData);
            stmt.executeUpdate();
            
            logger.info("Compensated delete operation for table: {}", tableName);
        } catch (SQLException e) {
            throw new RuntimeException("Failed to compensate delete", e);
        }
    }
}

// 补偿任务实体
public class CompensationTask {
    private String taskId;
    private String operationType;
    private Map<String, Object> parameters;
    private Date createTime;
    private int retryCount = 0;
    private CompensationStatus status;
    
    // getters and setters
    public String getTaskId() { return taskId; }
    public void setTaskId(String taskId) { this.taskId = taskId; }
    
    public String get
相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000