最近生产环境出现了一些NPE问题(微服务架构),上游保存了数据库之后,发送MQ消息通知下游,下游服务查询不到上游保存的数据,从而导致了NPE问题。

架构图大致如下:

怀疑一:数据库主从同步延时

可以确定的是数据库是多实例的,如果下游服务在当时读取不到数据,很有可能是数据库主从同步延时,下游服务去从库查询的时候,主库的数据还没有同步到从库,这样就出现了NPE问题。处理这样的问题也简单,让下游读方法强制去读主库就可以了。

按照这个思路调整了代码之后,再次部署到生产环境,第二天没有出问题,但是第三天同样的问题又出现了,也就是说,并不是数据库主从同步延时引起的NPE问题(或者说引起NPE问题的不仅仅是数据库主从同步延时)。为了缩小问题的范围,我去找DBA了解生产环境的数据库架构,得到的答案是:我们生产环境的确有两个实例,但是读写都是一个实例上,另一个实例仅仅作为灾备使用。

怀疑二:下游服务MQ消费过快

上游服务的伪代码如下:

1
2
3
4
5
6
@Transactional(rollbackFor = Exception.class)
public void process{
XXXDAO.save(entityA);
YYYDAO.save(entityB)
MQ.send(entityA.getId());
}

正常情况下这样的代码是没有问题的,数据库保存放在同一个事物中,如果MQ发送失败(抛出异常),数据库事物也会回滚…但是,MQ消息发送成功之后,数据库事物立马就提交了吗?MQ的消费者收到消息时,是否一定可以确定上游的数据库事物已经提交?

MQ消费者收到消息时,是不能确定上游的数据库事物已经提交了的。

  1. 如果上游数据库事物已经提交成功,那么消费者就可以通过ID在数据库中查询到相关的数据,业务流程可以正常继续;
  2. 如果上游的数据库事物还没有提交(仅仅只是当时还没来得及提交,最终会成功提交),那么消费者就无法通过ID在数据库中查询到相关的数据,程序中也就有可能出现NPE,事后在通过log去查询问题,发现数据库中居然存在该ID的数据。

解决方案一:降低下游服务的MQ消费速度

下游出现NPE的根本原因就是上游数据库事物还没有提交,所以程序上稍微等一会就可以解决问题了。

  • 上游服务
    涉及到这样的场景的时候,可以考虑使用延时队列,把要发送的消息放入延时队列,然后等待一定的时候之后再发出去。
  • 下游服务
    如果无法通过MQ查询到数据库中的数据,程序上稍微等待一下,添加一点重试逻辑。

结论:这样的修改方式比较简单,可以解决生产环境大部分的NPE问题,但是并没有从根本上解决数据库和MQ发送消息一致性的问题。

解决方案二:把数据库事物和MQ发送拆开

上游服务把数据库操作和MQ发送放在一个方法中执行,如果我们把数据库操作放在一个方法里,MQ发送放在一个方法里,在数据库操作完成之后,在执行MQ发送逻辑,这样下游消费的时候,不就不会出现NPE问题了吗?!

修改之后上游服务的伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void process{
saveAll();
sendMsg();
}

@Transactional(rollbackFor = Exception.class)
public void saveAll(){
XXXDAO.save(entityA);
YYYDAO.save(entityB);
}

public void sendMsg(){
MQ.send(entityA.getId());
}

看到这里,有经验的同学可能就会说了,saveAll() 上事物是不会生效的。的确,这样的写法会导致事物失效,但是如果调用方法改成这样呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ServiceA {

@Autowired
ServiceA serviceA;

public void process{
serviceA.saveAll();
sendMsg();
}

@Transactional(rollbackFor = Exception.class)
public void saveAll(){
XXXDAO.save(entityA);
YYYDAO.save(entityB);
}

public void sendMsg(){
MQ.send(entityA.getId());
}
}

是不是有点野路子的感觉。

解决方案三:监听数据库事物提交事件,然后发送MQ消息

项目中经常有类似的需要在知道数据库事物提交之后进行的一些操作的需求,Spring对此也提供了自己的解决方案:

  1. 事务同步管理器TransactionSynchronizationManager
  2. @TransactionalEventListener注解

下面我们看看如何通过注解的方式来监听数据库事物提交(伪代码,希望不影响阅读):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Component
public class ServiceA {

@Autowired
private ApplicationEventPublisher applicationEventPublisher;

@Transactional(rollbackFor = Exception.class)
public void process() {
XXXDAO.save(entityA);
YYYDAO.save(entityB)
applicationEventPublisher.publishEvent(new MyEvent(this ,entityA));
}

}

public class MyEvent extends ApplicationEvent {
EntityA entityA;
public MyEvent(Object source, EntityA entityA) {
super(source);
this.entityA = entityA;
}

public EntityA getEntityA() {
return entityA;
}
}

@Component
public class MyListener {

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void process(MyEvent event) {
MQ.send(event.getEntityA().getId());
}
}

重点关注@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) ,当数据库事物提交之后,就会触发这里的方法执行,把MQ消息的发送放在这里就可以了。

TransactionPhase提供了以下几个枚举值,应该很好理解。

看到这里,相信大家都能知道如何使用@TransactionalEventListener 来解决数据库事物监听的问题了,功能的使用就不再展开了,这里就提几个额外的点:

  1. ApplicationEventPublisher是Spring提供的发布订阅模型,默认是同步执行的,仅仅是实现了代码上的解耦,如果需要异步执行,需要额外添加异步注解。

  2. 使用@TransactionalEventListener 可能会产生获取到的数据库连接为空的情况,这并不是该注解的Bug,源代码注释上已经给出了解决方案:

    1
    org.springframework.transaction.support.TransactionSynchronization#afterCommit

    如果还是不清楚,可以参考:https://github.com/alibaba/druid/issues/1429

最后:虽然可以使用@TransactionalEventListener 解决数据库和MQ一致性的问题,但是项目中还是出现了一些反对的声音:如果原来一个方法中既有数据库操作,又需要按条件发送不同的MQ消息,一旦把数据库操作和MQ发送在代码上隔离开,这样就给后续的维护人员造成了困扰。

所以,封装一个MQ客户端,自动感知当前事物,然后在事物之后发送MQ消息。

=== 未完,待续===