北京做网站哪个好/免费推广平台排行
最近项目当中有个场景问题,需要在数据更新完成之后,发送mq消息通知周边系统进行操作。但是周边系统需要用到当前事务更新完成的数据。经常出现mq消息处理过程中拿不到更新之后的数据。经过排查日志,查看sql执行情况,代码逻辑。还跟组内成员反复查看代码执行逻辑。就是没发现问题点在哪里!
然后只能跟踪debug代码执行过程,终于发现在事务执行过程中,并没看到commit的过程,mq就把消息发出去了。
然后查看源码,发现在事务提交后,提供了afterCommit的方法,需要我们自己来实现。
解决方案:
1、把mq发消息放到afterCommit里来执行
@Transactional(rollbackFor = Exception.class)
@Override
public boolean synchronousData(ReqDTO reqDTO) throws BizException {// 生成任务idfinal Long taskId = generator.gain();if(!CollectionUtils.isEmpty(ids)){// 更新taskIdList<Long> idList = ids.stream().map(Long::valueOf).collect(Collectors.toList());userMapper.updateBatchTaskId(idList,taskId,new Date());TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {// 事务完成后,进行消息发送@Overridepublic void afterCommit() {// 发送任务消息TaskCreateDTO taskCreateDTO = new TaskCreateDTO();taskCreateDTO.setType(1);taskCreateDTO.setTaskId(taskId);boolean result = produceHandler.sendTaskCreateMessage(taskCreateDTO);log.debug("sendTaskCreateMessage taskId:{},result:{}", taskId, result);}});}else {return false;}return true;
}
2、把mq发消息的逻辑移出@Transactional包含的事务代码块
@Transactional(rollbackFor = Exception.class)
@Override
public boolean synchronousData(ReqDTO reqDTO) throws BizException {// 生成任务idfinal Long taskId = generator.gain();if(!CollectionUtils.isEmpty(ids)){// 更新taskIdList<Long> idList = ids.stream().map(Long::valueOf).collect(Collectors.toList());userMapper.updateBatchTaskId(idList,taskId,new Date());}else {return false;}return true;
}// 发送消息
private void sendMessage(Long taskId){// 发送任务消息TaskCreateDTO taskCreateDTO = new TaskCreateDTO();taskCreateDTO.setType(1);taskCreateDTO.setTaskId(taskId);boolean result = produceHandler.sendTaskCreateMessage(taskCreateDTO);log.debug("sendTaskCreateMessage taskId:{},result:{}", taskId, result);
}
源码学习:
// 事务同步器
public interface TransactionSynchronization extends Flushable {int STATUS_COMMITTED = 0;int STATUS_ROLLED_BACK = 1;int STATUS_UNKNOWN = 2;default void suspend() {}default void resume() {}default void flush() {}// 事务提交前的操作default void beforeCommit(boolean readOnly) {}default void beforeCompletion() {}// 事务提交后的操作default void afterCommit() {}default void afterCompletion(int status) {}
}
-------------欢迎各位留言交流学习,如有不正确的地方,请予以指正。【Q:981233589】