Spring Cloud OpenFeign 开启 circuitbreaker 后 Seata 全局事务回滚失败
背景
Spring Cloud 版本为2021.0.9,断路器是Resilience4J,Seata 在该版本下为1.6.1,Seata采用默认的AT模式,未启动断路器之前全局事务回滚正常(启动方式为feign.circuitbreaker.enabled,配置在FeignClientsConfiguration中CircuitBreakerPresentFeignBuilderConfiguration方法上,导入了CircuitBreakerFactory后就会向IOC容器中注入支持Circuit Breaker的builder),启动断路器后出现问题。
排查
熟悉 Seata 的朋友都知道,Seata 是通过一个全局事务ID(XID)来确认不同服务之间的操作属于同一个全局事务,如果没有这 XID 那自然没法进行事务的回滚,在全局事务发起方调用的被调方打开debug日志就能看到 Seata 在收到 TM 发起回滚后打出的日志,可以明显看到XID是null的,这里比较简单就不截图了。
既然能确认是缺少 XID 那现在开始排查,假如在不是很懂 Seata 原理的前提下一个比较简单的排查思路就是:正常 Seata 想要获得当前全局事务的 XID 是通过 RootContext 来获取的(这个看看官方文档就知道了,如果这都不看就别玩了),那么就在getXid()方法打上断点,同时可以看到该获取方法非常简单,就是在ThreadLocal中拿一下,既然有get那么必要对ThreadLocal进行put,就可以在RootContext.bind中找到相关逻辑,所以这里也打上断点,然后调用下全局事务标记的方法,此时断点肯定会断在bind逻辑中。

查看栈信息就可以找到 XID 就是在DefaultGlobalTransaction中的public void begin(int timeout, String name) throws TransactionException方法中得到的,而实际又是通过构造一个GlobalBeginRequest的请求发送给 事务管理器 TM(也就是Seata的服务端)来生成的。

其实到这里有经验的同学就已经大概能猜出问题的答案了,我明明都给当前线程的ThreadLocal中设置了XID的呀,为啥到参与方就没了。其实就一点,没传递 XID 到全局事务的参与方,不卖关子了,就是引入断路器后,默认采用线程池隔离模式,每次 Feign 调用都会通过 Resilience4j 的线程进行的,而非我们bind xid的Tomcat 的线程,所以导致 XID 就丢了没传到参与方去。


resilience4j的源码就不跟了,注意一下它这个bulkhead的原理就行了,底层是通过JUC中的信号量和线程池两种方式来实现线程隔离的。反正可以明显看到Thread不是一个。
解决方案
无非就是把XID设置到resilience4j的线程中,可以通过ContextPropagator来做到
1、全局事务发起方pom中导入依赖
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
</dependency>
不导入这个,上面Resilience4JCircuitBreaker中run方法中对bulkheadProvider的非空判断就不成立,具体这个bulkheadProvider怎么创建的就不说了,所以就会通过下面的线程池来执行请求的发起,这里可能也能做一些手脚来进行XID的拷贝,没做研究。
2、实现ContextPropagator
import io.github.resilience4j.core.ContextPropagator;
import io.seata.core.context.RootContext;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* @Description:
* @Author: xuhao
* @Date: 2024/1/20 16:08
*/
public class Resilience4jSeataXidContextPropagator implements ContextPropagator<String> {
@Override
public Supplier<Optional<String>> retrieve() {
return () -> Optional.ofNullable(RootContext.getXID());
}
@Override
public Consumer<Optional<String>> copy() {
return s -> s.ifPresent(RootContext::bind);
}
@Override
public Consumer<Optional<String>> clear() {
return s -> RootContext.unbind();
}
}
3、添加配置
resilience4j:
thread-pool-bulkhead:
configs:
default:
context-propagators:
- cc.xuhao.order.config.Resilience4jSeataXidContextPropagator
XID 传递的原理追踪
Seata 实现RequestInterceptor可以对每个请求进行修改,在这里它把XID加到了请求头。

Seata 又实现了HandlerInterceptor 会在请求到达参与方,参与方的方法执行之前把Header中的XID拿出来绑定到当前线程的ThreadLocal,方法执行后解绑。

结语
有些地方也是大概看了下原理,可能会有理解错误的,但问题解决大致应该没问题。