微服务架构如今非常的流行,这个架构下可能经常会遇到“双写”的场景。双写是指您的应用程序需要在两个不同的系统中更改数据的情况,比如它需要将数据存储在数据库中并向消息队列发送事件。您需要保证这两个操作都会成功。如果两个操作之一失败,您的系统可能会变得不一致。那针对这样的情况有什么好的方法或者设计保证呢?本文就和大家分享一个“发件箱模式”, 可以很好的避免此类问题。
假设我们有一个OrderService类,它在创建新订单时被调用,此时它应该将订单实体保存在数据库中并向交付微服务发送一个事件,以便交付部门可以开始计划交付。
你的代码可能是下面这样子的:
@Service
public record OrderService(
IDeliveryMessageQueueService deliveryMessageQueueService,
IOrderRepository orderRepository,
TransactionTemplate transactionTemplate) implements IOrderService {
@Override
public void create(int id, String description) {
String message = buildMessage(id, description);
transactionTemplate.executeWithoutResult(transactionStatus -> {
// 保存订单
orderRepository.save(id, description);
});
// 发送消息
deliveryMessageQueueService.send(message);
}
private String buildMessage(int id, String description) {
// ...
}
}
可以看到我们在事务中将订单保存在数据库中,然后我们使用消息队列将事件发送到交付服务。这是双写的一个场景。
这么写,会遇到什么问题呢?
首先,如果我们保存了订单但是发送消息失败了怎么办?送货服务永远不会收到消息。
那你可能想到把保存订单和发消息放到同一个事务中不就可以了吗,就是是将deliveryMessageQueueService#send移动到与orderRepository#save相同的事务中,如下图:
transactionTemplate.executeWithoutResult(transactionStatus -> {
// 保存订单
orderRepository.save(id, description);
// 发送消息
deliveryMessageQueueService.send(message);
});
实际上,在数据库事务内部建立 TCP 连接是一种糟糕的做法,我们不应该这样做。
有没有更好的方法呢?
我们可以订单表所在的同一数据库中有一个表“发件箱”(在最简单的情况下,它可以有一个列“消息”和当前时间戳)。保存订单时,在同一个事务中,我们在“发件箱”表中保存了一条消息。消息一发送,我们就可以将其从发件箱表中删除,代码如下:
@Service
public record OrderService(
IDeliveryMessageQueueService deliveryMessageQueueService,
IOrderRepository orderRepository,
IOutboxRepository outboxRepository,
TransactionTemplate transactionTemplate) implements IOrderService {
@Override
public void create(int id, String description) {
UUID outboxId = UUID.randomUUID();
String message = buildMessage(id, description);
transactionTemplate.executeWithoutResult(transactionStatus -> {
// 保存订单
orderRepository.save(id, description);
// 保存到发件箱
outboxRepository.save(new OutboxEntity(outboxId, message));
});
deliveryMessageQueueService.send(message);
// 删除
outboxRepository.delete(outboxId);
}
private String buildMessage(int id, String description) {
// ...
}
}
可以看到,我们在一次事务中将订单和发件箱实体保存在我们的数据库中。然后我们发送一条消息,如果成功,我们删除这条消息。
如果deliveryMessageQueueService#send失败会怎样?(例如,您的应用程序被终止或消息队列或数据库不可用)。在这种情况下,outboxRepository#delete将不会运行,我们必须重试发送消息。
它可以使用将在后台运行的计划任务来完成,该任务将尝试发送在表发件箱中显示超过 X 秒(例如 10 秒)的消息,如下面的代码。
@Service
public record OutboxRetryTask(IOutboxRepository outboxRepository,
IDeliveryMessageQueueService deliveryMessageQueueService) {
@Scheduled(fixedDelayString = "10000")
public void retry() {
List<OutboxEntity> outboxEntities = outboxRepository.findAllBefore(Instant.now().minusSeconds(60));
for (OutboxEntity outbox : outboxEntities) {
deliveryMessageQueueService.send(outbox.message());
outboxRepository.delete(outbox.id());
}
}
}
在这里你可以看到,我们每 10 秒运行一个任务,并发送之前没有发送过的消息。如果消息成功发送到消息队列,但发件箱实体没有从数据库中删除(例如因为数据库问题),那么下次该后台任务将尝试再次将此消息发送到消息队列。但这也意味着我们消息的消费者必须做好幂等处理,因为可能会多次接收相同的消息。
通过上面的例子,我们可以抽象出“发件箱模式”。
发件箱模式虽然听上去可能很简单,但是在平时开发中可能会忽略掉。如果还不能理解,我们可以将它类比到生活的场景,寄信人只需要写好信件,放入收件箱,之后就不用管了。送信的人会来收件箱取走信件,根据信件里需要送到的地址,将信件送至目的地。这样做的好处就是,寄信人写好信之后,就不需要等待收信人有空的时候才能寄信,只需要往发件箱里丢就好了。