Spring Cloud OpenFeign 开启 circuitbreaker 后 Seata 全局事务回滚失败
背景
Spring Cloud 版本为2021.0.9,断路器是Resilience4J,Seata 在该版本下为1.6.1,Seata采用默认的AT模式,未启动断路器之前全局事务回滚正常(启动方式为feign.circuitbreaker.enabled
,配置在FeignClientsConfiguration中CircuitBreakerPresentFeignBuilderConfiguration
方法上,导入了CircuitBreakerFactory
后就会向IOC容器中注入支持Circuit Breaker的builder),启动断路器后出现问题。
排查
熟悉 Seata 的朋友都知道,Seata 是通过一个全局事务ID(XID)来确认不同服务之间的操作属于同一个全局事务,如果没有这 XID 那自然没法进行事务的回滚,在全局事务发起方调用的被调方打开debug日志就能看到 Seata 在收到 TM 发起回滚后打出的日志,可以明显看到XID是null的,这里比较简单就不截图了。
既然能确认是缺少 XID 那现在开始排查,假如在不是很懂 Seata 原理的前提下一个比较简单的排查思路就是:正常 Seata 想要获得当前全局事务的 XID 是通过 RootContext 来获取的(这个看看官方文档就知道了,如果这都不看就别玩了),那么就在getXid()
方法打上断点,同时可以看到该获取方法非常简单,就是在ThreadLocal
中拿一下,既然有get那么必要对ThreadLocal
进行put,就可以在RootContext.bind
中找到相关逻辑,所以这里也打上断点,然后调用下全局事务标记的方法,此时断点肯定会断在bind逻辑中。
查看栈信息就可以找到 XID 就是在DefaultGlobalTransaction
中的public void begin(int timeout, String name) throws TransactionException
方法中得到的,而实际又是通过构造一个GlobalBeginRequest的请求发送给 事务管理器 TM(也就是Seata的服务端)来生成的。
其实到这里有经验的同学就已经大概能猜出问题的答案了,我明明都给当前线程的ThreadLocal
中设置了XID的呀,为啥到参与方就没了。其实就一点,没传递 XID 到全局事务的参与方,不卖关子了,就是引入断路器后,默认采用线程池隔离模式,每次 Feign 调用都会通过 Resilience4j 的线程进行的,而非我们bind xid的Tomcat 的线程,所以导致 XID 就丢了没传到参与方去。
resilience4j的源码就不跟了,注意一下它这个bulkhead的原理就行了,底层是通过JUC中的信号量和线程池两种方式来实现线程隔离的。反正可以明显看到Thread不是一个。
解决方案
无非就是把XID设置到resilience4j的线程中,可以通过ContextPropagator
来做到
1、全局事务发起方pom中导入依赖
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
</dependency>
不导入这个,上面Resilience4JCircuitBreaker中run方法中对bulkheadProvider的非空判断就不成立,具体这个bulkheadProvider怎么创建的就不说了,所以就会通过下面的线程池来执行请求的发起,这里可能也能做一些手脚来进行XID的拷贝,没做研究。
2、实现ContextPropagator
import io.github.resilience4j.core.ContextPropagator;
import io.seata.core.context.RootContext;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* @Description:
* @Author: xuhao
* @Date: 2024/1/20 16:08
*/
public class Resilience4jSeataXidContextPropagator implements ContextPropagator<String> {
@Override
public Supplier<Optional<String>> retrieve() {
return () -> Optional.ofNullable(RootContext.getXID());
}
@Override
public Consumer<Optional<String>> copy() {
return s -> s.ifPresent(RootContext::bind);
}
@Override
public Consumer<Optional<String>> clear() {
return s -> RootContext.unbind();
}
}
3、添加配置
resilience4j:
thread-pool-bulkhead:
configs:
default:
context-propagators:
- cc.xuhao.order.config.Resilience4jSeataXidContextPropagator
XID 传递的原理追踪
Seata 实现RequestInterceptor可以对每个请求进行修改,在这里它把XID加到了请求头。
Seata 又实现了HandlerInterceptor 会在请求到达参与方,参与方的方法执行之前把Header中的XID拿出来绑定到当前线程的ThreadLocal,方法执行后解绑。
结语
有些地方也是大概看了下原理,可能会有理解错误的,但问题解决大致应该没问题。