利用springEvent,进行服务内部领域事件处理
SpringEvent的最佳实践
文章目录
- SpringEvent的最佳实践
- 前言
- 一、领域事件是什么
- 二、使用步骤
- 1.事务提交之后才发送domainEvent
- 2.异步处理事件
- 总结
前言
相信java boy对于Spring Event,多多少少都有一些熟悉了。如果不了解的,可以参考我的这篇文章:Spring的Event编程以及实现原理
一、领域事件是什么
简单说领域事件,其实就是对PO的封装。我这边要说明的并不是DDD对系统落地的改造。而是借鉴DDD的思想,对用户操作的实体-entity,进行封装成一个domainEvent结构。有了这个domainEvent,有两个好处:
- 可以对entity进行防腐。该domainEvent作为后续操作的dto,不关注如何通过entity构建的domainEvent,只需要基于该domainEvent进行操作即可
- 可以提高并行开发的效率。对entity操作的代码和后续操作的代码,可以让不同的研发同时进行
二、使用步骤
针对B端,流量并没有这么大的服务,没有必要考虑分布式事务。但是接入一个自产自销的MQ,感觉又有点过重。还不如SpringEvent来自己处理
1.事务提交之后才发送domainEvent
我们需要考虑的问题是,对于entity进行的操作,一定是写库操作,必然涉及到事务。针对业务流程来说,必然需要保证事务提交成功之后,再进行后续操作
此时可以使用Spring事务的钩子函数,让事务提交成功之后才可以进行操作。
@Transactional(rollbackFor = Exception.class)
public void doProcess(User user) {
// 1.根据entity构建domainEvent的dto,作为防腐层
UserMsgBO userMsgBO = buildMsgBO(user);
// 2. 注册事务钩子函数,让事务提交之后,进行Event的publish
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {@Overridepublic void afterCommit() {log.info("publish event, userMsgBO:{}", JSON.toJSONString(userMsgBO));applicationEventPublisher.publishEvent(userMsgBO);}});
}
//3. 数据库操作
2.异步处理事件
需要考虑日志打印问题
根据Spring的Event编程以及实现原理文章里写的,如果要进行事件处理的异步进行,需要进行实例化一个SimpleApplicationEventMulticaster,并设置其taskExecutor属性。实际上呢,还可以使用@Async注解来实现:@Async的使用及原理
@Async("springEventExecutor")
@EventListener(UserMsgBO.class)
public void doSendMsg(UserMsgBO userMsgBO) {log.info("doSendMsg, msg:[{}]", JSON.toJSONString(userMsgBO));try {// 具体的处理逻辑} catch (Exception e) {log.error("doSendMsg error, msg:[{}]", JSON.toJSONString(userMsgBO), e);}
}
可以看到,此时给@Async注解传入了一个线程池:springEventExecutor。还差最后一步,怎么第二节提出的问题,使用了线程池实现了异步,那么ThreadLocal里的traceId是不是丢了呢?
相信大家应该都知道阿里的TransmittableThreadLocal,也知道具体的场景
@Bean("springEventExecutor")
public Executor springEventExecutor() {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000),new ThreadFactoryBuilder().setNameFormat("spring-event-thread-pool-%d").build(),new ThreadPoolExecutor.CallerRunsPolicy());// 利用TtlExecutors包装一下线程池,可以在日志中打印traceIdreturn TtlExecutors.getTtlExecutorService(threadPoolExecutor);
}
当然保险起见,还可以针对消费domainEvent的doSendMsg方法增加一个后门,防止服务重启时打断异步流程。
当然,如果对于数据一致性要求比较高,那么就借鉴B端系统的最终一致性方案了:可以将domainEvent进行持久化,然后通过定时任务来保证最终一致性。
总结
B端系统开发,需要考虑:
- 事务
- 兼顾性能
没有银子弹,本文只是给出了一种常见场景的常见解决方案。