在微服务架构日益普及的今天,分布式事务成为了一个不可回避的技术难题。传统的单体应用中,我们可以依赖数据库的ACID特性来保证事务的一致性,但在微服务架构下,业务逻辑被拆分到不同的服务中,每个服务都有自己的数据库,这就导致了跨服务的事务处理变得异常复杂。
本文将深入探讨微服务架构下的分布式事务解决方案,重点分析Saga模式和TCC模式的实现原理,并提供详细的代码示例和最佳实践。
分布式事务的挑战
在深入具体解决方案之前,我们首先需要理解分布式事务面临的核心挑战:
1. 数据一致性问题
在微服务架构中,一个业务操作可能涉及多个服务的数据库操作。如果其中任何一个操作失败,就需要保证所有已经成功的操作都能回滚,这在分布式环境下实现起来非常困难。
2. 网络不可靠性
微服务之间通过网络通信,网络延迟、超时、中断等问题都可能导致事务状态不一致。
3. 服务可用性
参与事务的服务可能因为各种原因不可用,这会影响整个事务的执行。
4. 性能问题
分布式事务通常需要跨多个服务协调,这会带来额外的性能开销。
分布式事务解决方案概述
目前主流的分布式事务解决方案主要包括:
- 两阶段提交(2PC):传统但存在性能和可用性问题
- Saga模式:适合长事务,通过补偿机制保证最终一致性
- TCC模式:Try-Confirm-Cancel,通过预留资源的方式实现一致性
- 本地消息表:通过消息队列保证最终一致性
- 最大努力通知:适用于对一致性要求不严格的场景
本文将重点探讨Saga模式和TCC模式这两种在微服务架构中应用最为广泛的解决方案。
Saga模式详解
Saga模式基本概念
Saga模式是一种分布式事务解决方案,它将一个长事务拆分成多个短的本地事务,每个本地事务都有对应的补偿事务。当某个步骤失败时,通过执行之前步骤的补偿事务来保证事务的最终一致性。
Saga模式有两种实现方式:
- 事件驱动Saga:通过事件来协调各个服务的执行
- 命令协调Saga:通过专门的协调器来管理整个事务流程
事件驱动Saga实现
让我们通过一个电商订单处理的例子来演示事件驱动Saga的实现:
核心组件设计
// Saga事务状态枚举
public enum SagaStatus {
STARTED, // 已开始
SUCCESS, // 成功
FAILED, // 失败
COMPENSATING // 补偿中
}
// Saga步骤状态
public enum StepStatus {
PENDING, // 待执行
EXECUTING, // 执行中
SUCCESS, // 成功
FAILED, // 失败
COMPENSATING, // 补偿中
COMPENSATED // 已补偿
}
// Saga事务实体
@Entity
@Table(name = "saga_transaction")
public class SagaTransaction {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String transactionId;
@Enumerated(EnumType.STRING)
private SagaStatus status;
private String businessType;
private Date createTime;
private Date updateTime;
@OneToMany(mappedBy = "sagaTransaction", cascade = CascadeType.ALL)
private List<SagaStep> steps = new ArrayList<>();
// getters and setters
}
// Saga步骤实体
@Entity
@Table(name = "saga_step")
public class SagaStep {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@ManyToOne
@JoinColumn(name = "transaction_id")
private SagaTransaction sagaTransaction;
private String stepName;
private String serviceUrl;
private String compensationUrl;
private String requestData;
private String responseData;
@Enumerated(EnumType.STRING)
private StepStatus status;
private Integer stepOrder;
private Date createTime;
private Date updateTime;
// getters and setters
}
Saga协调器实现
@Service
@Transactional
public class SagaOrchestrator {
@Autowired
private SagaTransactionRepository transactionRepository;
@Autowired
private SagaStepRepository stepRepository;
@Autowired
private RestTemplate restTemplate;
/**
* 开始Saga事务
*/
public String startSaga(String businessType, List<SagaStepDefinition> stepDefinitions) {
// 创建Saga事务
SagaTransaction transaction = new SagaTransaction();
transaction.setTransactionId(UUID.randomUUID().toString());
transaction.setBusinessType(businessType);
transaction.setStatus(SagaStatus.STARTED);
transaction.setCreateTime(new Date());
transaction.setUpdateTime(new Date());
transaction = transactionRepository.save(transaction);
// 创建Saga步骤
List<SagaStep> steps = new ArrayList<>();
for (int i = 0; i < stepDefinitions.size(); i++) {
SagaStepDefinition def = stepDefinitions.get(i);
SagaStep step = new SagaStep();
step.setSagaTransaction(transaction);
step.setStepName(def.getStepName());
step.setServiceUrl(def.getServiceUrl());
step.setCompensationUrl(def.getCompensationUrl());
step.setStepOrder(i);
step.setStatus(StepStatus.PENDING);
step.setCreateTime(new Date());
step.setUpdateTime(new Date());
steps.add(step);
}
stepRepository.saveAll(steps);
// 执行Saga步骤
executeSagaSteps(transaction, steps);
return transaction.getTransactionId();
}
/**
* 执行Saga步骤
*/
private void executeSagaSteps(SagaTransaction transaction, List<SagaStep> steps) {
for (SagaStep step : steps) {
try {
// 更新步骤状态为执行中
step.setStatus(StepStatus.EXECUTING);
step.setUpdateTime(new Date());
stepRepository.save(step);
// 调用服务执行步骤
ResponseEntity<String> response = restTemplate.postForEntity(
step.getServiceUrl(),
step.getRequestData(),
String.class
);
// 更新步骤状态为成功
step.setStatus(StepStatus.SUCCESS);
step.setResponseData(response.getBody());
step.setUpdateTime(new Date());
stepRepository.save(step);
} catch (Exception e) {
// 步骤执行失败,开始补偿
step.setStatus(StepStatus.FAILED);
step.setUpdateTime(new Date());
stepRepository.save(step);
// 执行补偿
compensate(transaction, steps, step.getStepOrder());
return;
}
}
// 所有步骤执行成功
transaction.setStatus(SagaStatus.SUCCESS);
transaction.setUpdateTime(new Date());
transactionRepository.save(transaction);
}
/**
* 执行补偿
*/
private void compensate(SagaTransaction transaction, List<SagaStep> steps, int failedStepIndex) {
transaction.setStatus(SagaStatus.COMPENSATING);
transaction.setUpdateTime(new Date());
transactionRepository.save(transaction);
// 从失败的步骤开始向前补偿
for (int i = failedStepIndex - 1; i >= 0; i--) {
SagaStep step = steps.get(i);
if (step.getStatus() == StepStatus.SUCCESS) {
try {
step.setStatus(StepStatus.COMPENSATING);
step.setUpdateTime(new Date());
stepRepository.save(step);
// 调用补偿服务
restTemplate.postForEntity(
step.getCompensationUrl(),
step.getResponseData(),
String.class
);
step.setStatus(StepStatus.COMPENSATED);
step.setUpdateTime(new Date());
stepRepository.save(step);
} catch (Exception e) {
// 补偿失败,记录日志,需要人工干预
log.error("Compensation failed for step: " + step.getStepName(), e);
}
}
}
transaction.setStatus(SagaStatus.FAILED);
transaction.setUpdateTime(new Date());
transactionRepository.save(transaction);
}
}
服务实现示例
@RestController
@RequestMapping("/order")
public class OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private SagaStepRepository sagaStepRepository;
/**
* 创建订单
*/
@PostMapping("/create")
public ResponseEntity<String> createOrder(@RequestBody OrderCreateRequest request) {
try {
// 创建订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
order.setCreateTime(new Date());
order = orderRepository.save(order);
return ResponseEntity.ok(order.getId().toString());
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("Order creation failed: " + e.getMessage());
}
}
/**
* 补偿订单创建
*/
@PostMapping("/compensate")
public ResponseEntity<String> compensateOrder(@RequestBody String orderId) {
try {
Order order = orderRepository.findById(Long.valueOf(orderId)).orElse(null);
if (order != null) {
order.setStatus(OrderStatus.CANCELLED);
order.setUpdateTime(new Date());
orderRepository.save(order);
}
return ResponseEntity.ok("Order compensated successfully");
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("Order compensation failed: " + e.getMessage());
}
}
}
@RestController
@RequestMapping("/inventory")
public class InventoryService {
@Autowired
private InventoryRepository inventoryRepository;
/**
* 扣减库存
*/
@PostMapping("/deduct")
public ResponseEntity<String> deductInventory(@RequestBody InventoryDeductRequest request) {
try {
Inventory inventory = inventoryRepository.findByProductId(request.getProductId());
if (inventory == null || inventory.getQuantity() < request.getQuantity()) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST)
.body("Insufficient inventory");
}
inventory.setQuantity(inventory.getQuantity() - request.getQuantity());
inventory.setUpdateTime(new Date());
inventoryRepository.save(inventory);
return ResponseEntity.ok("Inventory deducted successfully");
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("Inventory deduction failed: " + e.getMessage());
}
}
/**
* 补偿库存扣减
*/
@PostMapping("/compensate")
public ResponseEntity<String> compensateInventory(@RequestBody InventoryDeductRequest request) {
try {
Inventory inventory = inventoryRepository.findByProductId(request.getProductId());
if (inventory != null) {
inventory.setQuantity(inventory.getQuantity() + request.getQuantity());
inventory.setUpdateTime(new Date());
inventoryRepository.save(inventory);
}
return ResponseEntity.ok("Inventory compensated successfully");
} catch (Exception e) {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("Inventory compensation failed: " + e.getMessage());
}
}
}
基于消息队列的Saga实现
另一种常见的Saga实现方式是基于消息队列,这样可以更好地解耦服务之间的依赖:
@Component
public class SagaMessageHandler {
@Autowired
private SagaOrchestrator sagaOrchestrator;
/**
* 处理Saga开始消息
*/
@RabbitListener(queues = "saga.start.queue")
public void handleSagaStart(SagaStartMessage message) {
try {
String transactionId = sagaOrchestrator.startSaga(
message.getBusinessType(),
message.getStepDefinitions()
);
// 发送Saga开始成功消息
SagaStartResultMessage result = new SagaStartResultMessage();
result.setTransactionId(transactionId);
result.setSuccess(true);
// 发送到结果队列
rabbitTemplate.convertAndSend("saga.result.queue", result);
} catch (Exception e) {
log.error("Saga start failed", e);
}
}
/**
* 处理步骤执行结果
*/
@RabbitListener(queues = "saga.step.result.queue")
public void handleStepResult(SagaStepResultMessage message) {
// 根据步骤执行结果决定是否继续执行下一个步骤或开始补偿
// 实现逻辑省略...
}
}
TCC模式详解
TCC模式基本概念
TCC(Try-Confirm-Cancel)模式是一种分布式事务解决方案,它要求业务逻辑实现三个操作:
- Try:预留资源,检查业务规则
- Confirm:确认执行,真正提交业务操作
- Cancel:取消执行,释放预留的资源
TCC模式的核心思想是通过预留资源的方式来避免分布式事务中的资源竞争问题。
TCC模式实现
TCC参与者接口定义
public interface TccParticipant {
/**
* Try阶段:预留资源
*/
boolean tryExecute(TccContext context);
/**
* Confirm阶段:确认执行
*/
boolean confirm(TccContext context);
/**
* Cancel阶段:取消执行
*/
boolean cancel(TccContext context);
}
// TCC上下文
public class TccContext {
private String transactionId;
private String businessType;
private Map<String, Object> parameters;
private Map<String, Object> reservedResources;
// constructors, getters and setters
}
TCC事务管理器
@Service
@Transactional
public class TccTransactionManager {
@Autowired
private TccTransactionRepository transactionRepository;
@Autowired
private TccParticipantRepository participantRepository;
private ThreadLocal<TccTransaction> currentTransaction = new ThreadLocal<>();
/**
* 开始TCC事务
*/
public String beginTransaction(String businessType) {
TccTransaction transaction = new TccTransaction();
transaction.setTransactionId(UUID.randomUUID().toString());
transaction.setBusinessType(businessType);
transaction.setStatus(TccStatus.TRYING);
transaction.setCreateTime(new Date());
transaction.setUpdateTime(new Date());
transaction = transactionRepository.save(transaction);
currentTransaction.set(transaction);
return transaction.getTransactionId();
}
/**
* 注册TCC参与者
*/
public void registerParticipant(String participantId, TccParticipant participant) {
TccTransaction transaction = currentTransaction.get();
if (transaction == null) {
throw new RuntimeException("No active TCC transaction");
}
TccParticipantRecord record = new TccParticipantRecord();
record.setTransactionId(transaction.getTransactionId());
record.setParticipantId(participantId);
record.setStatus(TccParticipantStatus.REGISTERED);
record.setCreateTime(new Date());
record.setUpdateTime(new Date());
participantRepository.save(record);
}
/**
* 执行Try阶段
*/
public boolean tryExecute(TccContext context) {
TccTransaction transaction = currentTransaction.get();
if (transaction == null) {
throw new RuntimeException("No active TCC transaction");
}
List<TccParticipantRecord> participants = participantRepository
.findByTransactionId(transaction.getTransactionId());
// 执行所有参与者的Try操作
for (TccParticipantRecord record : participants) {
try {
TccParticipant participant = getParticipant(record.getParticipantId());
boolean result = participant.tryExecute(context);
if (!result) {
// Try失败,标记事务为失败
transaction.setStatus(TccStatus.FAILED);
transaction.setUpdateTime(new Date());
transactionRepository.save(transaction);
return false;
}
record.setStatus(TccParticipantStatus.TRIED);
record.setUpdateTime(new Date());
participantRepository.save(record);
} catch (Exception e) {
log.error("Try phase failed for participant: " + record.getParticipantId(), e);
transaction.setStatus(TccStatus.FAILED);
transaction.setUpdateTime(new Date());
transactionRepository.save(transaction);
return false;
}
}
return true;
}
/**
* 执行Confirm阶段
*/
public void confirm(TccContext context) {
TccTransaction transaction = currentTransaction.get();
if (transaction == null) {
throw new RuntimeException("No active TCC transaction");
}
List<TccParticipantRecord> participants = participantRepository
.findByTransactionId(transaction.getTransactionId());
// 按照逆序执行Confirm操作
Collections.reverse(participants);
for (TccParticipantRecord record : participants) {
if (record.getStatus() == TccParticipantStatus.TRIED) {
try {
TccParticipant participant = getParticipant(record.getParticipantId());
participant.confirm(context);
record.setStatus(TccParticipantStatus.CONFIRMED);
record.setUpdateTime(new Date());
participantRepository.save(record);
} catch (Exception e) {
log.error("Confirm phase failed for participant: " + record.getParticipantId(), e);
// Confirm失败需要人工干预
throw new RuntimeException("Confirm failed", e);
}
}
}
transaction.setStatus(TccStatus.CONFIRMED);
transaction.setUpdateTime(new Date());
transactionRepository.save(transaction);
}
/**
* 执行Cancel阶段
*/
public void cancel(TccContext context) {
TccTransaction transaction = currentTransaction.get();
if (transaction == null) {
throw new RuntimeException("No active TCC transaction");
}
List<TccParticipantRecord> participants = participantRepository
.findByTransactionId(transaction.getTransactionId());
// 按照逆序执行Cancel操作
Collections.reverse(participants);
for (TccParticipantRecord record : participants) {
if (record.getStatus() == TccParticipantStatus.TRIED) {
try {
TccParticipant participant = getParticipant(record.getParticipantId());
participant.cancel(context);
record.setStatus(TccParticipantStatus.CANCELLED);
record.setUpdateTime(new Date());
participantRepository.save(record);
} catch (Exception e) {
log.error("Cancel phase failed for participant: " + record.getParticipantId(), e);
// Cancel失败需要人工干预
throw new RuntimeException("Cancel failed", e);
}
}
}
transaction.setStatus(TccStatus.CANCELLED);
transaction.setUpdateTime(new Date());
transactionRepository.save(transaction);
}
private TccParticipant getParticipant(String participantId) {
// 根据参与者ID获取对应的参与者实例
// 实现细节省略...
return null;
}
}
TCC参与者实现示例
@Component("orderTccParticipant")
public class OrderTccParticipant implements TccParticipant {
@Autowired
private OrderRepository orderRepository;
@Override
public boolean tryExecute(TccContext context) {
try {
OrderCreateRequest request = (OrderCreateRequest)
context.getParameters().get("orderRequest");
// 检查用户是否存在
if (!userService.exists(request.getUserId())) {
return false;
}
// 检查商品是否存在
if (!productService.exists(request.getProductId())) {
return false;
}
// 预留订单资源
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
order.setQuantity(request.getQuantity());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.RESERVED);
order.setCreateTime(new Date());
order = orderRepository.save(order);
// 保存预留的订单ID到上下文中
context.getReservedResources().put("orderId", order.getId());
return true;
} catch (Exception e) {
log.error("Order try phase failed", e);
return false;
}
}
@Override
public boolean confirm(TccContext context) {
try {
Long orderId = (Long) context.getReservedResources().get("orderId");
Order order = orderRepository.findById(orderId).orElse(null);
if (order != null && order.getStatus() == OrderStatus.RESERVED) {
order.setStatus(OrderStatus.CONFIRMED);
order.setUpdateTime(new Date());
orderRepository.save(order);
return true;
}
return false;
} catch (Exception e) {
log.error("Order confirm phase failed", e);
return false;
}
}
@Override
public boolean cancel(TccContext context) {
try {
Long orderId = (Long) context.getReservedResources().get("orderId");
Order order = orderRepository.findById(orderId).orElse(null);
if (order != null && order.getStatus() == OrderStatus.RESERVED) {
order.setStatus(OrderStatus.CANCELLED);
order.setUpdateTime(new Date());
orderRepository.save(order);
return true;
}
return false;
} catch (Exception e) {
log.error("Order cancel phase failed", e);
return false;
}
}
}
@Component("inventoryTccParticipant")
public class InventoryTccParticipant implements TccParticipant {
@Autowired
private InventoryRepository inventoryRepository;
@Override
public boolean tryExecute(TccContext context) {
try {
InventoryDeductRequest request = (InventoryDeductRequest)
context.getParameters().get("inventoryRequest");
Inventory inventory = inventoryRepository.findByProductId(request.getProductId());
if (inventory == null || inventory.getQuantity() < request.getQuantity()) {
return false;
}
// 预留库存
inventory.setReservedQuantity(inventory.getReservedQuantity() + request.getQuantity());
inventory.setUpdateTime(new Date());
inventoryRepository.save(inventory);
// 保存预留信息到上下文
context.getReservedResources().put("productId", request.getProductId());
context.getReservedResources().put("reservedQuantity", request.getQuantity());
return true;
} catch (Exception e) {
log.error("Inventory try phase failed", e);
return false;
}
}
@Override
public boolean confirm(TccContext context) {
try {
Long productId = (Long) context.getReservedResources().get("productId");
Integer reservedQuantity = (Integer) context.getReservedResources().get("reservedQuantity");
Inventory inventory = inventoryRepository.findByProductId(productId);
if (inventory != null) {
inventory.setQuantity(inventory.getQuantity() - reservedQuantity);
inventory.setReservedQuantity(inventory.getReservedQuantity() - reservedQuantity);
inventory.setUpdateTime(new Date());
inventoryRepository.save(inventory);
return true;
}
return false;
} catch (Exception e) {
log.error("Inventory confirm phase failed", e);
return false;
}
}
@Override
public boolean cancel(TccContext context) {
try {
Long productId = (Long) context.getReservedResources().get("productId");
Integer reservedQuantity = (Integer) context.getReservedResources().get("reservedQuantity");
Inventory inventory = inventoryRepository.findByProductId(productId);
if (inventory != null) {
inventory.setReservedQuantity(inventory.getReservedQuantity() - reservedQuantity);
inventory.setUpdateTime(new Date());
inventoryRepository.save(inventory);
return true;
}
return false;
} catch (Exception e) {
log.error("Inventory cancel phase failed", e);
return false;
}
}
}
业务服务使用TCC
@Service
public class OrderBusinessService {
@Autowired
private TccTransactionManager transactionManager;
@Autowired
@Qualifier("orderTccParticipant")
private TccParticipant orderParticipant;
@Autowired
@Qualifier("inventoryTccParticipant")
private TccParticipant inventoryParticipant;
@Autowired
@Qualifier("paymentTccParticipant")
private TccParticipant paymentParticipant;
/**
* 创建订单业务方法
*/
public String createOrder(OrderCreateRequest request) {
// 开始TCC事务
String transactionId = transactionManager.beginTransaction("CREATE_ORDER");
try {
TccContext context = new TccContext();
context.setTransactionId(transactionId);
context.setBusinessType("CREATE_ORDER");
Map<String, Object> parameters = new HashMap<>();
parameters.put("orderRequest", request);
parameters.put("inventoryRequest", new InventoryDeductRequest(
request.getProductId(), request.getQuantity()));
parameters.put("paymentRequest", new PaymentRequest(
request.getUserId(), request.getAmount()));
context.setParameters(parameters);
// 注册参与者
transactionManager.registerParticipant("order", orderParticipant);
transactionManager.registerParticipant("inventory", inventoryParticipant);
transactionManager.registerParticipant("payment", paymentParticipant);
// 执行Try阶段
boolean tryResult = transactionManager.tryExecute(context);
if (!tryResult) {
// Try阶段失败,执行Cancel
transactionManager.cancel(context);
throw new RuntimeException("Order creation failed in try phase");
}
// Try阶段成功,执行Confirm
transactionManager.confirm(context);
return transactionId;
} catch (Exception e) {
log.error("Order creation failed", e);
throw new RuntimeException("Order creation failed", e);
}
}
}
本地消息表模式
本地消息表是另一种常用的分布式事务解决方案,它通过在本地数据库中维护消息表来保证消息的可靠投递。
本地消息表实现
@Entity
@Table(name = "message")
public class Message {
@Id
private String messageId;
private String messageType;
private String content;
@Enumerated(EnumType.STRING)
private MessageStatus status;
private String targetService;
private Integer retryCount;
private Date createTime;
private Date updateTime;
// getters and setters
}
public enum MessageStatus {
PENDING, // 待发送
SENT, // 已发送
FAILED, // 发送失败
PROCESSED // 已处理
}
@Service
public class MessageService {
@Autowired
private MessageRepository messageRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@Transactional
public void sendMessage(String messageType, String content, String targetService) {
Message message = new Message();
message.setMessageId(UUID.randomUUID().toString());
message.setMessageType(messageType);
message.setContent(content);
message.setStatus(MessageStatus.PENDING);
message.setTargetService(targetService);
message.setRetryCount(0);
message.setCreateTime(new Date());
message.setUpdateTime(new Date());
messageRepository.save(message);
// 异步发送消息
asyncSendMessage(message);
}
/**
* 异步发送消息
*/
@Async
public void asyncSendMessage(Message message) {
try {
rabbitTemplate.convertAndSend(message.getTargetService() + ".queue", message);
message.setStatus(MessageStatus.SENT);
message.setUpdateTime(new Date());
messageRepository.save(message);
} catch (Exception e) {
log.error("Message send failed: " + message.getMessageId(), e);
message.setRetryCount(message.getRetryCount() + 1);
message.setUpdateTime(new Date());
messageRepository.save(message);
// 重试机制
if (message.getRetryCount() < 3) {
// 延迟重试
retryMessage(message);
} else {
message.setStatus(MessageStatus.FAILED);
message.setUpdateTime(new Date());
messageRepository.save(message);
}
}
}
/**
* 处理接收到的消息
*/
@RabbitListener(queues = "order.service.queue")
public void handleMessage(Message message) {
try {
// 处理业务逻辑
process
评论 (0)