微服务-熔断机制
背景由于微服务间通过RPC来进行数据交换,所以我们可以做一个假设:在IO型服务中,假设服务A依赖服务B和服务C,而B服务和C服务有可能继续依赖其他的服务,继续下去会使得调用链路过长,技术上称1->N扇出问题如果在A的链路上某个或几个被调用的子服务不可用或延迟较高,则会导致调用A服务的请求被堵住,堵住的请求会消耗占用掉系统的线程、io等资源,当该类请求越来越多,占用的计算机资源越来越多的时候,会导致系统瓶颈出现,造成其他的请求同样不可用,最终导致业务系统崩溃服务器失败影响服务质量超负荷导致整个服务失败服务失败造成的雪崩效应熔断熔断模式:这种模式主要是参考电路熔断,如果一条线路电压过高,保险丝会熔断,防止火灾。放到我们的系统中,如果某个目标服务调用慢或者有大量超时,此时,熔断该服务的调用,对于后续调用请求,不在继续调用目标服务,直接返回,快速释放资源。如果目标服务情况好转则恢复调用。定义里面有几个量化的地方目标服务调用慢或者超时:开启熔断的阀值量化可以通过两个维度:时间与请求数时间 多长时间内的超时请求达到多少,触发熔断请求数 从服务启动,超时请求数达到多少,触发这两个维度都需要记录超时请求数和统计总请求数情况好转,恢复调用如何量化情况好转:多长时间之后超时请求数低于多少关闭熔断熔断状态三种状态的切换开 -- 半开 -- 关开:使用快速失败返回,调用链结束半开:当熔断开启一段时间后,尝试阶段关:调用正常实现机制可以使用一段伪代码表示://正常request
if( request is open) {
//fastfail
} else if( request is halfopen) {
if ( request success count > recoverySampleVolume) {
//state --> close
}
}
//失败request
if( request is failcount > requestVolumeThreshold && errorPercentage > threshold) {
//close --> open
}请求熔断开启时,直接快速失败是halfopen状态,如果成功处理次数是否大于恢复配置,就关闭熔断如果失败次数超过阀值,开启熔断而对于open-->halfopen的转换,可以通过定时器主动触发具体实现现在有很多开源的failsafe:https://github.com/jhalterman/failsafeHystrix个案实现在没有熔断时,请求链路:client --> request --> balance -- > handler一个请求过来,通过负载均衡找到具体的server,再执行加入熔断后:client --> request --> circuitBreakerfilter --> balance -- > handlerCircuitBreakerFilter过滤掉被熔断的server,在负载均衡时,不再被选中getAllServers() 获取所有服务器列表根据requestService,requestMethod获取熔断的servers从allserverList中剔除这些server熔断服务列表怎么维护呢?正常状态 --> 熔断状态1. 收到失败请求(e.g.超时,系统异常)
2. 判断此service是否配置了熔断策略 map<serviceName,circuitBreakerpolicy>
- 根据serviceName,method,serverInfo获取CircuitBreakerCounter
- counter对失败次数+1
- 此server是否在half open状态 HalfOpenServersMap<serverName+method,serverList>
- 在:如果失败次数超过RecoverySampleVolume,openserversmap<servername+method,serverlist>进行put操作、并从HalfOpenServersMap中remove
- 不在:请求数大于等于10笔(requestVolumeThreshold),且错误率达到60%(errorPercentage),openserversmap<servername+method,serverlist>进行put操作熔断状态 --> 正常状态1. 收到请求
2. 判断此service是否配置了熔断策略 map<serviceName,circuitBreakerpolicy>
- 根据serviceName,method,serverInfo获取CircuitBreakerCounter
- counter调用次数+1
- 若half-open 状态下的服务instance被调用次数超过取样的sample数,从HalfOpenServersMap中remove疑问错误率怎么计算?counter的实现上面是close与open的转换,怎么转换到halfopen?错误率= 错误次数/请求次数halfopen状态在上面的提到,被熔断的服务,如果情况好转就会关闭熔断!“情况好转”:什么时候去判断情况好转,怎么判断情况好转两方面在加入到openserversmap时,同时开启延迟时间窗口后的定时任务从openserversmap中移除,加入到halfOpenServersMapcounter实现简单点:AtomicLong,如当是halfopen时,使用这种简单的计数器叠加滑动时间窗口实现VS 降级提到熔断,不得不起一下降级。两者的区别有时语言真是乏力,不容易表达清楚,罗列一下熔断是框架提供,不管业务什么样,防止系统雪崩,都需要提供一下基本功能;而降级与业务有关,自动或手动。比如支付,有很多种支付方式,储蓄卡,信用卡,支付宝,微信。若发现某一支付通道不稳定,或压力过大,手动先关闭,这就是一种降级由此可看出:触发原因不太一样,服务熔断一般是某个服务(下游服务)故障引起,而服务降级一般是从整体负荷考虑;管理目标的层次不太一样,熔断其实是一个框架级的处理,每个微服务都需要(无层级之分),而降级一般需要对业务有层级之分(比如降级一般是从最外围服务开始)实现方式不一样参考微服务熔断与隔离CircuitBreaker
ECS使用体验
??大家好,我是一名研二的学生,来自于东北大学,就读于控制工程专业,同过师兄的介绍了解到阿里云的“飞天加速计划·高校学生在家实践”活动,非常有幸参与了这次活动。??大家可以同过阿里云的服务器写博客,既可以分享技术,又可以巩固自己得知识。我们可以不光是从开发者的角度去使用ESC,也可以去考虑如何让你的应用部署变得弹性可扩展、高可用、安全、应用如何和阿里云上的其它服务集成。比如说:可以用阿里云的负载均衡实现企业级应用部署,可以思考如何部署一个弹性架构,如果在应用层做扩展,如何实现两层架构,如何将结构化数据和非结构化数据分别处理。可以通过云盾+ECS,学习云上的安全知识。通过云监控,学习如何管理阿里云服务器ECS。可以通过SLS简单日志服务,学习如何分析你的应用程序(比如博客)产生的日志,进而了解访客的各种行为。你可以实现最简单的All-IN-ONE的架构,把你的博客应用(也可以是你觉得有价值的其它WEB应用、Java应用等),数据库,内容都部署在一个ECS实例上。??使用阿里云服务器建网站,推荐一个网站,宝塔面板(WordPress以及其它建站程序)对于爱折腾,有大量闲暇时间的,可以不用宝塔面板,而自己手工安装各种网站程序需要的运行环境。使用阿里云服务器还有一个注意事项,小伙伴要切记在ESC服务器后台管理系统中打开对应端口号的安全组。??通过阿里云的“高校学生在家实践”,让我体验到了如何使用linux操作系统的服务器,曾经看似多么遥不可及的东西,可以变成现实,希望自己将来可以玩转服务器,做一名优秀的程序员,也经常分享自己得技术心得,在这个过程中收获到了无比的开了,另外也祝愿阿里云越来越好!
PolarisMesh系列文章——概念系列(一)
# 北极星是什么?北极星是腾讯开源的服务发现和治理中心,致力于解决分布式或者微服务架构中的服务可见、故障容错、流量控制和安全问题。虽然,业界已经有些组件可以解决其中一部分问题,但是缺少一个标准的、多语言的、框架无关的实现。腾讯具有海量的分布式服务,加上业务线和技术栈的多样性,沉淀了大大小小数十个相关组件。从 2019 年开始,我们通过北极星对这些组件进行抽象和整合,打造公司统一的服务发现和治理方案,帮助业务提升研发效率和运营质量。目前,北极星在腾讯内部的服务注册数量超过百万,日接口调用量超过十万亿,通用性和稳定性都得到了大规模的验证。因此,我们将其贡献到开源社区,希望对其他企业有所帮助,也希望吸引更多开发者参与共建。下面从三个方面介绍北极星。## 北极星诞生的背景让我们回顾一下应用架构发现的历程:### 单体架构:单体架构的所有代码都在一个应用中,单体架构具备系统复杂度低,部署简单,易运维等优点,适合小规模或者初创期的业务。但随着应用模块和开发人员增加,单体应用面临众多问题,例如:- 变更成本高:任何修改需要重新部署整个系统;- 扩展性差:无法针对某些热点模块进行水平扩展;- 故障扩散:某个模块出现故障也会影响其他模块;### 微服务架构对单体架构中的每一层进行细粒度的拆分,可以拆分成功整体更松散,模块更内聚的微服务架构。微服务架构具备职责单一、平台无关的通信、独立性、进程隔离的特点。微服务架构具备以下优点:- 服务作为组件,可独立部署,独立变更,独立扩缩容。- 团队管理变简单,可围绕业务来划分团队。- 技术选择可多样性- 业务进程及数据独立,故障不会相互影响。### 微服务架构的实施中的核心问题北极星主要解决的是“服务发现及治理”的问题。## 北极星解决哪些问题在分布式架构及微服务架构实施过程中,业务可能面临以下四类问题。北极星以服务为中心,提供一站式解决方案。| 问题类型 | 问题示例 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? | 解决方案 || -------- | ------------------------------------------------------------ | -------- || 服务可见 | 主调方如何知道被调方的服务地址 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? | 注册发现 || 配置可见 | 如何实现服务配置的版本管理、动态下发、按需变更。 ? ? ? ? ? ? | 配置管理 || 故障容错 | 当被调方的部分实例异常时,如何屏蔽异常实例,屏蔽之后如何恢复 | 熔断降级 || ? ? ? ? ?| 当某些主调方的请求量过多时,如何限制这些主调方的请求,避免影响其他主调方的请求 | 访问限流 || 流量控制 | 被调方包含多个实例,主调方如何确定请求发送到哪个实例,如何保证请求均衡 | 负载均衡 || ? ? ? ? ?| 如何实现按地域就近、单元化隔离、金丝雀发布等各种请求调度策略 | 动态路由 || ? ? ? ? ?| 外网到内网的调用链路如何进行加密,消息防窃听 ? ? ? ? ? ? ? ? | 链路加密 || ? ? ? ? ?| 服务资源如何按用户及角色进行授权 ? ? ? ? ? ? ? ? ? ? ? ? ? ? | 服务鉴权 |## 北极星具备哪些功能北极星主要提供以下核心功能,各部分功能都是基于插件化设计,可单独使用,从功能大类来看,分为注册中心、配置中心以及服务网格三类功能:### 注册中心- 服务注册发现及服务健康检查 ?以服务为中心的分布式应用架构,通过服务和注册发现的方式维护不断变化的请求地址,提高应用的扩展能力,降低应用的迁移成本。北极星提供对注册上来的服务实例进行健康检查,阻止主调方对不健康的服务实例发送请求,同时也提供了保护能力,实例剔除超过一定数量则停止剔除,防止因不健康实例过多导致雪崩效应。### 配置中心- 动态配置 ?提供配置管理的功能,支持应用配置、公共配置的订阅发布、版本管理、变更通知,实现应用配置动态生效。### 服务网格- 路由和负载均衡? ?根据请求标签、实例标签和标签匹配规则,对线上流量进行动态调度,可以应用于按地域就近、单元化隔离和金丝雀发布等多种场景。- 熔断降级和限流? ?提供熔断功能,根据实时采集的错误率等指标,及时熔断异常的服务、接口、实例或者实例分组,降低请求失败率。 ?当负载已经超过了系统的最大处理能力时,北极星提供服务限流功能,可针对不同的请求来源和系统资源进行访问限流,避免服务被压垮。?- 可观测性? ?提供服务治理可视化监控视图,支持请求量、请求延时和请求成功率的指标查询,支持服务调用关系和多维度的流量曲线查询,实现服务治理功能和流量观测一体化。?- Proxyless与Proxy接入 ?提供多语言SDK、以及无侵入的JavaAgent,适配用户高性能低长尾时延以Porxyless模式的接入场景;同时提供独立的Sidecar,支持用户的低侵入性、基于流量接管的Proxy模式接入场景。## 对比其他产品虽然业界没有组件的形态和北极星完全相同,但是一些组件的功能和北极星有所重叠,下面介绍北极星和这些组件的区别。## Eureka从定位上来说,Eureka是服务注册中心,只提供服务发现、服务注册和健康检查功能。北极星是服务发现和治理中心,除服务发现、服务注册和健康检查之外,还提供流量控制、故障容错和安全能力。从架构上来说,Eureka集群采用异步复制的方式同步数据,每个Server将收到的写请求异步复制给集群内的其他Server。当Client越来越多时,需要扩容Server。但是,增加Server也会增加Server之间的复制请求,导致扩容效果不明显。北极星服务端计算存储分离,计算层节点可以随着客户端节点的增加平行扩展,轻松支持百万级节点接入。## IstioIstio通过流量劫持的方式实现服务发现和治理,这种方式有些问题。首先,增加了资源消耗和请求延时,特别是CPU消耗,请求量大的业务难以接受。第二,每个企业使用的RPC协议各不相同,目前Istio主要支持HTTP和gRPC,难以快速支持其他RPC协议。第三,流量劫持直接影响业务的每个请求,稳定性和运维工作要求极高。最后,和主流微服务框架无法共存,业务改造的成本大、风险高。北极星不仅提供无侵入Sidecar,还提供高性能SDK,实现语义相同的服务发现和治理功能,用户可以根据业务场景自行选择。对于请求量大和资源消耗敏感用户,可以在业务应用或者开发框架中集成北极星SDK,快速补齐服务发现和治理功能。
ECS使用体验
??大家好,我是一名研二的学生,来自于东北大学,就读于控制工程专业,同过师兄的介绍了解到阿里云的“飞天加速计划·高校学生在家实践”活动,非常有幸参与了这次活动。??大家可以同过阿里云的服务器写博客,既可以分享技术,又可以巩固自己得知识。我们可以不光是从开发者的角度去使用ESC,也可以去考虑如何让你的应用部署变得弹性可扩展、高可用、安全、应用如何和阿里云上的其它服务集成。比如说:可以用阿里云的负载均衡实现企业级应用部署,可以思考如何部署一个弹性架构,如果在应用层做扩展,如何实现两层架构,如何将结构化数据和非结构化数据分别处理。可以通过云盾+ECS,学习云上的安全知识。通过云监控,学习如何管理阿里云服务器ECS。可以通过SLS简单日志服务,学习如何分析你的应用程序(比如博客)产生的日志,进而了解访客的各种行为。你可以实现最简单的All-IN-ONE的架构,把你的博客应用(也可以是你觉得有价值的其它WEB应用、Java应用等),数据库,内容都部署在一个ECS实例上。??使用阿里云服务器建网站,推荐一个网站,宝塔面板(WordPress以及其它建站程序)对于爱折腾,有大量闲暇时间的,可以不用宝塔面板,而自己手工安装各种网站程序需要的运行环境。使用阿里云服务器还有一个注意事项,小伙伴要切记在ESC服务器后台管理系统中打开对应端口号的安全组。??通过阿里云的“高校学生在家实践”,让我体验到了如何使用linux操作系统的服务器,曾经看似多么遥不可及的东西,可以变成现实,希望自己将来可以玩转服务器,做一名优秀的程序员,也经常分享自己得技术心得,在这个过程中收获到了无比的开了,另外也祝愿阿里云越来越好!
硬核 - Java 随机数相关 API 的演进与思考(上2)
SEED 的来源由于 JDK 中所有的随机算法都是基于上一次输入的,如果我们使用固定 SEED 那么生成的随机序列也一定是一样的。这样在安全敏感的场景,不够合适,官方对于 cryptographically secure 的定义是,要求 SEED 必须是不可预知的,产生非确定性输出。在 Linux 中,会采集用户输入,系统中断等系统运行数据,生成随机种子放入池中,程序可以读取这个池子获取一个随机数。但是这个池子是采集一定数据后才会生成,大小有限,并且它的随机分布肯定不够好,所以我们不能直接用它来做随机数,而是用它来做我们的随机数生成器的种子。这个池子在 Linux 中被抽象为两个文件,这两个文件他们分别是:/dev/random 和 /dev/urandom。一个是必须采集一定熵的数据才放开从池子里面取否则阻塞,另一个则是不管是否采集够直接返回现有的。在 Linux 4.8 之前:在 Linux 4.8 之后:在熵池不够用的时候,file:/dev/random会阻塞,file:/dev/urandom不会。对于我们来说,/dev/urandom 一般就够用,所以一般通过-Djava.security.egd=file:/dev/./urandom设置 JVM 启动参数,使用 urandom 来减少阻塞。我们也可以通过业务中的一些特性,来定时重新设置所有 Random 的 SEED 来进一步增加被破解的难度,例如,每小时用过去一小时的活跃用户数量 * 下单数量作为新的 SEED。测试随机算法随机性以上算法实现的都是伪随机,即当前随机数结果与上一次是强相关的关系。事实上目前基本所有快速的随机算法,都是这样的。并且就算我们让 SEED 足够隐秘,但是如果我们知道算法,还是可以通过当前的随机输出,推测出下一个随机输出。或者算法未知,但是能从几次随机结果反推出算法从而推出之后的结果。针对这种伪随机算法,需要验证算法生成的随机数满足一些特性,例如:period 尽可能长:a full cycle 或者 period 指的是随机序列将所有可能的随机结果都遍历过一遍,同时结果回到初始 seed 需要的结果个数。这个 period 要尽可能的长一些。平均分布(equidistribution),生成的随机数的每个可能结果,在一个 Period 内要尽可能保证每种结果的出现次数是相同的。否则,会影响在某些业务的使用,例如抽奖这种业务,我们需要保证概率要准。复杂度测试:生成的随机序列是否够复杂,不会有那种有规律的数字序列,例如等比数列,等差数列等等。安全性测试:很难通过比较少的结果反推出这个随机算法。目前,已经有很多框架工具用来针对某个算法生成的随机序列进行测试,评价随机序列结果,验证算法的随机性,常用的包括:testU01 随机性测试:https://github.com/umontreal-simul/TestU01-2009/NIST 随机性测试:https://nvlpubs.nist.gov/nistpubs/legacy/sp/nistspecialpublication800-22r1a.pdfDieHarder Suite 随机性测试Java 中内置的随机算法,基本都通过了 testU01 的大部分测试。目前,上面提到过的优化算法都或多或少的暴露出一些随机性问题。目前, Java 17 中的 LXM 算法是随机性测试中表现最好的。注意是随机性表现,而不是性能。Java 中涉及到的所有随机算法(不包括 SecureRandom)Linear Congruential generator: https://doi.org/10.1093%2Fcomjnl%2F1.2.83Linear-feedback shift register: https://www.ams.org/journals/mcom/1965-19-090/S0025-5718-1965-0184406-1/S0025-5718-1965-0184406-1.pdfXORShift: https://doi.org/10.18637%2Fjss.v008.i14Xoroshiro128+: https://arxiv.org/abs/1805.01407LXM: https://dl.packetstormsecurity.net/papers/general/Google_Chrome_3.0_Beta_Math.random_vulnerability.pdfSplitMix: http://gee.cs.oswego.edu/dl/papers/oopsla14.pdf为什么我们在实际业务应用中很少考虑随机安全性问题主要因为,我们一般做了负载均衡多实例部署,还有多线程。一般每个线程使用不同初始 SEED 的 Random 实例(例如 ThreadLocalRandom)。并且一个随机敏感业务,例如抽奖,单个用户一般都会限制次数,所以很难采集够足够的结果反推出算法以及下一个结果,而且你还需要和其他用户一起抽。然后,我们一般会限制随机数范围,而不是使用原始的随机数,这就更大大增加了反解的难度。最后,我们也可以定时使用业务的一些实时指标定时设置我们的 SEED,例如:,每小时用过去一小时的(活跃用户数量 * 下单数量)作为新的 SEED。所以,一般现实业务中,我们很少会用 SecureRandom。如果我们想初始 SEED 让编写程序的人也不能猜出来(时间戳也能猜出来),可以指定随机类的初始 SEED 源,通过 JVM 参数 -Djava.util.secureRandomSeed=true。这个对于所有 Java 中的随机数生成器都有效(例如,Random,SplittableRandom,ThreadLocalRandom 等等)对应源码:static {
String sec = VM.getSavedProperty("java.util.secureRandomSeed");
if (Boolean.parseBoolean(sec)) {
//初始 SEED 从 SecureRandom 中取
// SecureRandom 的 SEED 源,在 Linux 中即我们前面提到的环境变量 java.security.egd 指定的 /dev/random 或者 /dev/urandom
byte[] seedBytes = java.security.SecureRandom.getSeed(8);
long s = (long)seedBytes[0] & 0xffL;
for (int i = 1; i < 8; ++i)
s = (s << 8) | ((long)seedBytes[i] & 0xffL);
seeder.set(s);
}
}所以,针对我们的业务,我们一般只关心算法的性能以及随机性中的平均性,而通过测试的算法,一般随机性都没啥大问题,所以我们只主要关心性能即可。针对安全性敏感的业务,像是 SSL 加密,生成加密随机散列这种,则需要考虑更高的安全随机性。这时候才考虑使用 SecureRandom。SecureRandom 的实现中,随机算法更加复杂且涉及了一些加密思想,我们这里就不关注这些 Secure 的 Random 的算法了。
SpringCloud升级之路2020.0.x版-40. spock 单元测试封装的 WebClient(上)
本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent我们来测试下前面封装好的 WebClient,这里开始,我们使用 spock 编写 groovy 单元测试,这种编写出来的单元测试,代码更加简洁,同时更加灵活,我们在接下来的单元测试代码中就能看出来。编写基于 spock 的 spring-boot context 测试我们加入前面设计的配置,编写测试类:@SpringBootTest(
properties = [
"webclient.configs.testServiceWithCannotConnect.baseUrl=http://testServiceWithCannotConnect",
"webclient.configs.testServiceWithCannotConnect.serviceName=testServiceWithCannotConnect",
"webclient.configs.testService.baseUrl=http://testService",
"webclient.configs.testService.serviceName=testService",
"webclient.configs.testService.responseTimeout=1s",
"webclient.configs.testService.retryablePaths[0]=/delay/3",
"webclient.configs.testService.retryablePaths[1]=/status/4*",
"spring.cloud.loadbalancer.zone=zone1",
"resilience4j.retry.configs.default.maxAttempts=3",
"resilience4j.circuitbreaker.configs.default.failureRateThreshold=50",
"resilience4j.circuitbreaker.configs.default.slidingWindowType=TIME_BASED",
"resilience4j.circuitbreaker.configs.default.slidingWindowSize=5",
//因为重试是 3 次,为了防止断路器打开影响测试,设置为正好比重试多一次的次数,防止触发
//同时我们在测试的时候也需要手动清空断路器统计
"resilience4j.circuitbreaker.configs.default.minimumNumberOfCalls=4",
"resilience4j.circuitbreaker.configs.default.recordExceptions=java.lang.Exception"
],
classes = MockConfig
)
class WebClientUnitTest extends Specification {
@SpringBootApplication
static class MockConfig {
}
}我们加入三个服务实例供单元测试调用:class WebClientUnitTest extends Specification {
def zone1Instance1 = new DefaultServiceInstance(instanceId: "instance1", host: "www.httpbin.org", port: 80, metadata: Map.ofEntries(Map.entry("zone", "zone1")))
def zone1Instance2 = new DefaultServiceInstance(instanceId: "instance2", host: "www.httpbin.org", port: 8081, metadata: Map.ofEntries(Map.entry("zone", "zone1")))
def zone1Instance3 = new DefaultServiceInstance(instanceId: "instance3", host: "httpbin.org", port: 80, metadata: Map.ofEntries(Map.entry("zone", "zone1")))
}我们要动态的指定负载均衡获取服务实例列表的响应,即去 Mock 负载均衡器的 ServiceInstanceListSupplier 并覆盖:class WebClientUnitTest extends Specification {
@Autowired
private Tracer tracer
@Autowired
private ServiceInstanceMetrics serviceInstanceMetrics
RoundRobinWithRequestSeparatedPositionLoadBalancer loadBalancerClientFactoryInstance = Spy();
ServiceInstanceListSupplier serviceInstanceListSupplier = Spy();
//所有测试的方法执行前会调用的方法
def setup() {
//初始化 loadBalancerClientFactoryInstance 负载均衡器
loadBalancerClientFactoryInstance.setTracer(tracer)
loadBalancerClientFactoryInstance.setServiceInstanceMetrics(serviceInstanceMetrics)
loadBalancerClientFactoryInstance.setServiceInstanceListSupplier(serviceInstanceListSupplier)
}
}之后,我们可以通过下面的 groovy 代码,动态指定微服务返回实例://指定 testService 微服务的 LoadBalancer 为 loadBalancerClientFactoryInstance
loadBalancerClientFactory.getInstance("testService") >> loadBalancerClientFactoryInstance
//指定 testService 微服务实例列表为 zone1Instance1, zone1Instance3
serviceInstanceListSupplier.get() >> Flux.just(Lists.newArrayList(zone1Instance1, zone1Instance3))测试断路器异常重试以及断路器级别我们需要验证:对于断路器打开的异常,由于没有请求发出去,所以需要直接重试其他的实例。我们可以设立一个微服务,包含两个实例,将其中一个实例的某个路径断路器打开,之后多次调用这个微服务的这个路径接口,看是否都调用成功(由于有重试,所以每次调用都会成功)。同时验证,对于负载均衡器获取服务实例的调用,多于调用次数(每次重试都会调用负载均衡器获取一个新的实例用于调用)某个路径断路器打开的时候,其他路径断路器不会打开。在上面打开一个微服务某个实例的一个路径的断路器之后,我们调用其他的路径,无论多少次,都成功并且调用负载均衡器获取服务实例的次数等于调用次数,代表没有重试,也就是没有断路器异常。编写代码:@SpringBootTest(
properties = [
"webclient.configs.testServiceWithCannotConnect.baseUrl=http://testServiceWithCannotConnect",
"webclient.configs.testServiceWithCannotConnect.serviceName=testServiceWithCannotConnect",
"webclient.configs.testService.baseUrl=http://testService",
"webclient.configs.testService.serviceName=testService",
"webclient.configs.testService.responseTimeout=1s",
"webclient.configs.testService.retryablePaths[0]=/delay/3",
"webclient.configs.testService.retryablePaths[1]=/status/4*",
"spring.cloud.loadbalancer.zone=zone1",
"resilience4j.retry.configs.default.maxAttempts=3",
"resilience4j.circuitbreaker.configs.default.failureRateThreshold=50",
"resilience4j.circuitbreaker.configs.default.slidingWindowType=TIME_BASED",
"resilience4j.circuitbreaker.configs.default.slidingWindowSize=5",
//因为重试是 3 次,为了防止断路器打开影响测试,设置为正好比重试多一次的次数,防止触发
//同时我们在测试的时候也需要手动清空断路器统计
"resilience4j.circuitbreaker.configs.default.minimumNumberOfCalls=4",
"resilience4j.circuitbreaker.configs.default.recordExceptions=java.lang.Exception"
],
classes = MockConfig
)
class WebClientUnitTest extends Specification {
@SpringBootApplication
static class MockConfig {
}
@SpringBean
private LoadBalancerClientFactory loadBalancerClientFactory = Mock()
@Autowired
private CircuitBreakerRegistry circuitBreakerRegistry
@Autowired
private Tracer tracer
@Autowired
private ServiceInstanceMetrics serviceInstanceMetrics
@Autowired
private WebClientNamedContextFactory webClientNamedContextFactory
//不同的测试方法的类对象不是同一个对象,会重新生成,保证互相没有影响
def zone1Instance1 = new DefaultServiceInstance(instanceId: "instance1", host: "www.httpbin.org", port: 80, metadata: Map.ofEntries(Map.entry("zone", "zone1")))
def zone1Instance2 = new DefaultServiceInstance(instanceId: "instance2", host: "www.httpbin.org", port: 8081, metadata: Map.ofEntries(Map.entry("zone", "zone1")))
def zone1Instance3 = new DefaultServiceInstance(instanceId: "instance3", host: "httpbin.org", port: 80, metadata: Map.ofEntries(Map.entry("zone", "zone1")))
RoundRobinWithRequestSeparatedPositionLoadBalancer loadBalancerClientFactoryInstance = Spy();
ServiceInstanceListSupplier serviceInstanceListSupplier = Spy();
//所有测试的方法执行前会调用的方法
def setup() {
//初始化 loadBalancerClientFactoryInstance 负载均衡器
loadBalancerClientFactoryInstance.setTracer(tracer)
loadBalancerClientFactoryInstance.setServiceInstanceMetrics(serviceInstanceMetrics)
loadBalancerClientFactoryInstance.setServiceInstanceListSupplier(serviceInstanceListSupplier)
}
def "测试断路器异常重试以及断路器级别"() {
given: "设置 testService 的实例都是正常实例"
loadBalancerClientFactory.getInstance("testService") >> loadBalancerClientFactoryInstance
serviceInstanceListSupplier.get() >> Flux.just(Lists.newArrayList(zone1Instance1, zone1Instance3))
when: "断路器打开"
//清除断路器影响
circuitBreakerRegistry.getAllCircuitBreakers().forEach({ c -> c.reset() })
loadBalancerClientFactoryInstance = (RoundRobinWithRequestSeparatedPositionLoadBalancer) loadBalancerClientFactory.getInstance("testService")
def breaker
try {
breaker = circuitBreakerRegistry.circuitBreaker("httpbin.org:80/anything", "testService")
} catch (ConfigurationNotFoundException e) {
breaker = circuitBreakerRegistry.circuitBreaker("httpbin.org:80/anything")
}
//打开实例 3 的断路器
breaker.transitionToOpenState()
//调用 10 次
for (i in 0..<10) {
Mono<String> stringMono = webClientNamedContextFactory.getWebClient("testService")
.get().uri("/anything").retrieve()
.bodyToMono(String.class)
println(stringMono.block())
}
then:"调用至少 10 次负载均衡器且没有异常即成功"
(10.._) * loadBalancerClientFactoryInstance.getInstanceResponseByRoundRobin(*_)
when: "调用不同的路径,验证断路器在这个路径上都是关闭"
//调用 10 次
for (i in 0..<10) {
Mono<String> stringMono = webClientNamedContextFactory.getWebClient("testService")
.get().uri("/status/200").retrieve()
.bodyToMono(String.class)
println(stringMono.block())
}
then: "调用必须为正好 10 次代表没有重试,一次成功,断路器之间相互隔离"
10 * loadBalancerClientFactoryInstance.getInstanceResponseByRoundRobin(*_)
}
}测试针对 connectTimeout 重试对于连接超时,我们需要验证:无论是否可以重试的方法或者路径,都必须重试,因为请求并没有真的发出去。可以这样验证:设置微服务 testServiceWithCannotConnect 一个实例正常,另一个实例会连接超时,我们配置了重试 3 次,所以每次请求应该都能成功,并且随着程序运行,后面的调用不可用的实例还会被断路,照样可以成功调用。@SpringBootTest(
properties = [
"webclient.configs.testServiceWithCannotConnect.baseUrl=http://testServiceWithCannotConnect",
"webclient.configs.testServiceWithCannotConnect.serviceName=testServiceWithCannotConnect",
"webclient.configs.testService.baseUrl=http://testService",
"webclient.configs.testService.serviceName=testService",
"webclient.configs.testService.responseTimeout=1s",
"webclient.configs.testService.retryablePaths[0]=/delay/3",
"webclient.configs.testService.retryablePaths[1]=/status/4*",
"spring.cloud.loadbalancer.zone=zone1",
"resilience4j.retry.configs.default.maxAttempts=3",
"resilience4j.circuitbreaker.configs.default.failureRateThreshold=50",
"resilience4j.circuitbreaker.configs.default.slidingWindowType=TIME_BASED",
"resilience4j.circuitbreaker.configs.default.slidingWindowSize=5",
//因为重试是 3 次,为了防止断路器打开影响测试,设置为正好比重试多一次的次数,防止触发
//同时我们在测试的时候也需要手动清空断路器统计
"resilience4j.circuitbreaker.configs.default.minimumNumberOfCalls=4",
"resilience4j.circuitbreaker.configs.default.recordExceptions=java.lang.Exception"
],
classes = MockConfig
)
class WebClientUnitTest extends Specification {
@SpringBootApplication
static class MockConfig {
}
@SpringBean
private LoadBalancerClientFactory loadBalancerClientFactory = Mock()
@Autowired
private CircuitBreakerRegistry circuitBreakerRegistry
@Autowired
private Tracer tracer
@Autowired
private ServiceInstanceMetrics serviceInstanceMetrics
@Autowired
private WebClientNamedContextFactory webClientNamedContextFactory
//不同的测试方法的类对象不是同一个对象,会重新生成,保证互相没有影响
def zone1Instance1 = new DefaultServiceInstance(instanceId: "instance1", host: "www.httpbin.org", port: 80, metadata: Map.ofEntries(Map.entry("zone", "zone1")))
def zone1Instance2 = new DefaultServiceInstance(instanceId: "instance2", host: "www.httpbin.org", port: 8081, metadata: Map.ofEntries(Map.entry("zone", "zone1")))
def zone1Instance3 = new DefaultServiceInstance(instanceId: "instance3", host: "httpbin.org", port: 80, metadata: Map.ofEntries(Map.entry("zone", "zone1")))
RoundRobinWithRequestSeparatedPositionLoadBalancer loadBalancerClientFactoryInstance = Spy();
ServiceInstanceListSupplier serviceInstanceListSupplier = Spy();
//所有测试的方法执行前会调用的方法
def setup() {
//初始化 loadBalancerClientFactoryInstance 负载均衡器
loadBalancerClientFactoryInstance.setTracer(tracer)
loadBalancerClientFactoryInstance.setServiceInstanceMetrics(serviceInstanceMetrics)
loadBalancerClientFactoryInstance.setServiceInstanceListSupplier(serviceInstanceListSupplier)
}
def "测试针对 connectTimeout 重试"() {
given: "设置微服务 testServiceWithCannotConnect 一个实例正常,另一个实例会连接超时"
loadBalancerClientFactory.getInstance("testServiceWithCannotConnect") >> loadBalancerClientFactoryInstance
serviceInstanceListSupplier.get() >> Flux.just(Lists.newArrayList(zone1Instance1, zone1Instance2))
when:
//由于我们针对 testService 返回了两个实例,一个可以正常连接,一个不可以,但是我们配置了重试 3 次,所以每次请求应该都能成功,并且随着程序运行,后面的调用不可用的实例还会被断路
//这里主要测试针对 connect time out 还有 断路器打开的情况都会重试,并且无论是 GET 方法还是其他的
Span span = tracer.nextSpan()
for (i in 0..<10) {
Tracer.SpanInScope cleared = tracer.withSpanInScope(span)
try {
//测试 get 方法(默认 get 方法会重试)
Mono<String> stringMono = webClientNamedContextFactory.getWebClient("testServiceWithCannotConnect")
.get().uri("/anything").retrieve()
.bodyToMono(String.class)
println(stringMono.block())
//测试 post 方法(默认 post 方法针对请求已经发出的不会重试,这里没有发出请求所以还是会重试的)
stringMono = webClientNamedContextFactory.getWebClient("testServiceWithCannotConnect")
.post().uri("/anything").retrieve()
.bodyToMono(String.class)
println(stringMono.block())
}
finally {
cleared.close()
}
}
then:"调用至少 20 次负载均衡器且没有异常即成功"
(20.._) * loadBalancerClientFactoryInstance.getInstanceResponseByRoundRobin(*_)
}
}
SpringCloud升级之路2020.0.x版-39. 改造 resilience4j 粘合 WebClient
本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent要想实现我们上一节中提到的:需要在重试以及断路中加一些日志,便于日后的优化需要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异常需要在断路器相关的 Operator 中增加类似于 FeignClient 中的负载均衡的数据更新,使得负载均衡更加智能我们需要将 resilience4j 本身提供的粘合库做一些改造,其实主要就是对 resilience4j 实现的 project reactor 的 Operator 进行改造。关于断路器的改造首先,WebClient 的返回对象只可能是 ClientResponse 类型,所以我们这里改造出来的 Operator 不必带上形参,只需要针对 ClientResponse 即可,即:public class ClientResponseCircuitBreakerOperator implements UnaryOperator<Publisher<ClientResponse>> {
...
}在原有的断路器逻辑中,我们需要加入针对 GET 方法以及之前定义的可以重试的路径匹配配置可以重试的逻辑,这需要我们拿到原有请求的 URL 信息。但是 ClientResponse 中并没有暴露这些信息的接口,其默认实现 DefaultClientResponse(我们只要没有自己给 WebClient 加入特殊的改造逻辑,实现都是 DefaultClientResponse) 中的 request() 方法可以获取请求 HttpRequest,其中包含 url 信息。但是这个类还有方法都是 package-private 的,我们需要反射出来:ClientResponseCircuitBreakerSubscriberprivate static final Class<?> aClass;
private static final Method request;
static {
try {
aClass = Class.forName("org.springframework.web.reactive.function.client.DefaultClientResponse");
request = ReflectionUtils.findMethod(aClass, "request");
request.setAccessible(true);
} catch (Exception e) {
throw new RuntimeException(e);
}
}之后,在获取到 ClientResponse 之后记录断路器的逻辑中,需要加入上面提到的关于重试的改造,以及负载均衡器的记录:ClientResponseCircuitBreakerSubscriberprotected void hookOnNext(ClientResponse clientResponse) {
if (!isDisposed()) {
if (singleProducer && successSignaled.compareAndSet(false, true)) {
int rawStatusCode = clientResponse.rawStatusCode();
HttpStatus httpStatus = HttpStatus.resolve(rawStatusCode);
try {
HttpRequest httpRequest = (HttpRequest) request.invoke(clientResponse);
//判断方法是否为 GET,以及是否在可重试路径配置中,从而得出是否可以重试
if (httpRequest.getMethod() != HttpMethod.GET && !webClientProperties.retryablePathsMatch(httpRequest.getURI().getPath())) {
//如果不能重试,则直接返回结果
circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
} else {
if (httpStatus != null && httpStatus.is2xxSuccessful()) {
//如果成功,则直接返回结果
circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
} else {
/**
* 如果异常,参考 DefaultClientResponse 的代码进行异常封装
* @see org.springframework.web.reactive.function.client.DefaultClientResponse#createException
*/
Exception exception;
if (httpStatus != null) {
exception = WebClientResponseException.create(rawStatusCode, httpStatus.getReasonPhrase(), clientResponse.headers().asHttpHeaders(), EMPTY, null, null);
} else {
exception = new UnknownHttpStatusCodeException(rawStatusCode, clientResponse.headers().asHttpHeaders(), EMPTY, null, null);
}
circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), exception);
downstreamSubscriber.onError(exception);
return;
}
}
} catch (Exception e) {
log.fatal("judge request method in circuit breaker error! the resilience4j feature would not be enabled: {}", e.getMessage(), e);
circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
}
}
eventWasEmitted.set(true);
downstreamSubscriber.onNext(clientResponse);
}
}同样的,在原有的完成,取消还有失败的记录逻辑中,也加上记录负载均衡数据:ClientResponseCircuitBreakerSubscriber@Override
protected void hookOnComplete() {
if (successSignaled.compareAndSet(false, true)) {
serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);
circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
}
downstreamSubscriber.onComplete();
}
@Override
public void hookOnCancel() {
if (!successSignaled.get()) {
serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);
if (eventWasEmitted.get()) {
circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
} else {
circuitBreaker.releasePermission();
}
}
}
@Override
protected void hookOnError(Throwable e) {
serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);
circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e);
downstreamSubscriber.onError(e);
}粘合 WebClient 与 resilience4j 的同时覆盖重试逻辑由于前面的断路器中,我们针对可以重试的非 2XX 响应封装成为 WebClientResponseException。所以在重试器中,我们需要加上针对这个异常的重试。同时,需要将重试器放在负载均衡器之前,因为每次重试,都要从负载均衡器中获取一个新的实例。同时,断路器需要放在负载均衡器之后,因为只有在这个之后,才能获取到本次调用的实例,我们的的断路器是针对实例方法级别的:WebClientDefaultConfiguration.java@Bean
public WebClient getWebClient(
ReactorLoadBalancerExchangeFilterFunction lbFunction,
WebClientConfigurationProperties webClientConfigurationProperties,
Environment environment,
RetryRegistry retryRegistry,
CircuitBreakerRegistry circuitBreakerRegistry,
ServiceInstanceMetrics serviceInstanceMetrics
) {
String name = environment.getProperty(WebClientNamedContextFactory.PROPERTY_NAME);
Map<String, WebClientConfigurationProperties.WebClientProperties> configs = webClientConfigurationProperties.getConfigs();
if (configs == null || configs.size() == 0) {
throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs");
}
WebClientConfigurationProperties.WebClientProperties webClientProperties = configs.get(name);
if (webClientProperties == null) {
throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs." + name);
}
String serviceName = webClientProperties.getServiceName();
//如果没填写微服务名称,就使用配置 key 作为微服务名称
if (StringUtils.isBlank(serviceName)) {
serviceName = name;
}
String baseUrl = webClientProperties.getBaseUrl();
//如果没填写 baseUrl,就使用微服务名称填充
if (StringUtils.isBlank(baseUrl)) {
baseUrl = "http://" + serviceName;
}
Retry retry = null;
try {
retry = retryRegistry.retry(serviceName, serviceName);
} catch (ConfigurationNotFoundException e) {
retry = retryRegistry.retry(serviceName);
}
//覆盖其中的异常判断
retry = Retry.of(serviceName, RetryConfig.from(retry.getRetryConfig()).retryOnException(throwable -> {
//WebClientResponseException 会重试,因为在这里能 catch 的 WebClientResponseException 只对可以重试的请求封装了 WebClientResponseException
//参考 ClientResponseCircuitBreakerSubscriber 的代码
if (throwable instanceof WebClientResponseException) {
log.info("should retry on {}", throwable.toString());
return true;
}
//断路器异常重试,因为请求没有发出去
if (throwable instanceof CallNotPermittedException) {
log.info("should retry on {}", throwable.toString());
return true;
}
if (throwable instanceof WebClientRequestException) {
WebClientRequestException webClientRequestException = (WebClientRequestException) throwable;
HttpMethod method = webClientRequestException.getMethod();
URI uri = webClientRequestException.getUri();
//判断是否为响应超时,响应超时代表请求已经发出去了,对于非 GET 并且没有标注可以重试的请求则不能重试
boolean isResponseTimeout = false;
Throwable cause = throwable.getCause();
//netty 的读取超时一般是 ReadTimeoutException
if (cause instanceof ReadTimeoutException) {
log.info("Cause is a ReadTimeoutException which indicates it is a response time out");
isResponseTimeout = true;
} else {
//对于其他一些框架,使用了 java 底层 nio 的一般是 SocketTimeoutException,message 为 read time out
//还有一些其他异常,但是 message 都会有 read time out 字段,所以通过 message 判断
String message = throwable.getMessage();
if (StringUtils.isNotBlank(message) && StringUtils.containsIgnoreCase(message.replace(" ", ""), "readtimeout")) {
log.info("Throwable message contains readtimeout which indicates it is a response time out");
isResponseTimeout = true;
}
}
//如果请求是 GET 或者标注了重试,则直接判断可以重试
if (method == HttpMethod.GET || webClientProperties.retryablePathsMatch(uri.getPath())) {
log.info("should retry on {}-{}, {}", method, uri, throwable.toString());
return true;
} else {
//否则,只针对请求还没有发出去的异常进行重试
if (isResponseTimeout) {
log.info("should not retry on {}-{}, {}", method, uri, throwable.toString());
} else {
log.info("should retry on {}-{}, {}", method, uri, throwable.toString());
return true;
}
}
}
return false;
}).build());
HttpClient httpClient = HttpClient
.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) webClientProperties.getConnectTimeout().toMillis())
.doOnConnected(connection ->
connection
.addHandlerLast(new ReadTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds()))
.addHandlerLast(new WriteTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds()))
);
Retry finalRetry = retry;
String finalServiceName = serviceName;
return WebClient.builder()
.exchangeStrategies(ExchangeStrategies.builder()
.codecs(configurer -> configurer
.defaultCodecs()
//最大 body 占用 16m 内存
.maxInMemorySize(16 * 1024 * 1024))
.build())
.clientConnector(new ReactorClientHttpConnector(httpClient))
//Retry在负载均衡前
.filter((clientRequest, exchangeFunction) -> {
return exchangeFunction
.exchange(clientRequest)
.transform(ClientResponseRetryOperator.of(finalRetry));
})
//负载均衡器,改写url
.filter(lbFunction)
//实例级别的断路器需要在负载均衡获取真正地址之后
.filter((clientRequest, exchangeFunction) -> {
ServiceInstance serviceInstance = getServiceInstance(clientRequest);
serviceInstanceMetrics.recordServiceInstanceCall(serviceInstance);
CircuitBreaker circuitBreaker;
//这时候的url是经过负载均衡器的,是实例的url
//需要注意的一点是,使用异步 client 的时候,最好不要带路径参数,否则这里的断路器效果不好
//断路器是每个实例每个路径一个断路器
String instancId = clientRequest.url().getHost() + ":" + clientRequest.url().getPort() + clientRequest.url().getPath();
try {
//使用实例id新建或者获取现有的CircuitBreaker,使用serviceName获取配置
circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, finalServiceName);
} catch (ConfigurationNotFoundException e) {
circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);
}
log.info("webclient circuit breaker [{}-{}] status: {}, data: {}", finalServiceName, instancId, circuitBreaker.getState(), JSON.toJSONString(circuitBreaker.getMetrics()));
return exchangeFunction.exchange(clientRequest).transform(ClientResponseCircuitBreakerOperator.of(circuitBreaker, serviceInstance, serviceInstanceMetrics, webClientProperties));
}).baseUrl(baseUrl)
.build();
}
private ServiceInstance getServiceInstance(ClientRequest clientRequest) {
URI url = clientRequest.url();
DefaultServiceInstance defaultServiceInstance = new DefaultServiceInstance();
defaultServiceInstance.setHost(url.getHost());
defaultServiceInstance.setPort(url.getPort());
return defaultServiceInstance;
}这样,我们就实现了我们封装的基于配置的 WebClient
SpringCloud升级之路2020.0.x版-38. 实现自定义 WebClient 的 NamedContextFactory
本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent实现 WeClient 的 NamedContextFactory我们要实现的是不同微服务自动配置装载不同的 WebClient Bean,这样就可以通过 NamedContextFactory 实现。我们先来编写下实现这个 NamedContextFactory 整个的加载流程的代码,其结构图如下所示:spring.factories# AutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.jojotech.spring.cloud.webflux.auto.WebClientAutoConfiguration在 spring.factories 定义了自动装载的自动配置类 WebClientAutoConfigurationWebClientAutoConfiguration@Import(WebClientConfiguration.class)
@Configuration(proxyBeanMethods = false)
public class WebClientAutoConfiguration {
}WebClientAutoConfiguration 这个自动配置类 Import 了 WebClientConfigurationWebClientConfiguration@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(WebClientConfigurationProperties.class)
public class WebClientConfiguration {
@Bean
public WebClientNamedContextFactory getWebClientNamedContextFactory() {
return new WebClientNamedContextFactory();
}
}WebClientConfiguration 中创建了 WebClientNamedContextFactory 这个 NamedContextFactory 的 Bean。在这个 NamedContextFactory 中,定义了默认配置 WebClientDefaultConfiguration。在这个默认配置中,主要是给每个微服务都定义了一个 WebClient定义 WebClient 的配置类我们编写下上一节定义的配置,包括:微服务名称微服务地址,服务地址,不填写则为 http://微服务名称连接超时,使用 Duration,这样我们可以用更直观的配置了,例如 5ms,6s,7m 等等响应超时,使用 Duration,这样我们可以用更直观的配置了,例如 5ms,6s,7m 等等可以重试的路径,默认只对 GET 方法重试,通过这个配置增加针对某些非 GET 方法的路径的重试;同时,这些路径可以使用 * 等路径匹配符,即 Spring 中的 AntPathMatcher 进行路径匹配多个路径。例如 /query/order/**WebClientConfigurationProperties@Data
@NoArgsConstructor
@ConfigurationProperties(prefix = "webclient")
public class WebClientConfigurationProperties {
private Map<String, WebClientProperties> configs;
@Data
@NoArgsConstructor
public static class WebClientProperties {
private static AntPathMatcher antPathMatcher = new AntPathMatcher();
private Cache<String, Boolean> retryablePathsMatchResult = Caffeine.newBuilder().build();
/**
* 服务地址,不填写则为 http://serviceName
*/
private String baseUrl;
/**
* 微服务名称,不填写就是 configs 这个 map 的 key
*/
private String serviceName;
/**
* 可以重试的路径,默认只对 GET 方法重试,通过这个配置增加针对某些非 GET 方法的路径的重试
*/
private List<String> retryablePaths;
/**
* 连接超时
*/
private Duration connectTimeout = Duration.ofMillis(500);
/**
* 响应超时
*/
private Duration responseTimeout = Duration.ofSeconds(8);
/**
* 是否匹配
* @param path
* @return
*/
public boolean retryablePathsMatch(String path) {
if (CollectionUtils.isEmpty(retryablePaths)) {
return false;
}
return retryablePathsMatchResult.get(path, k -> {
return retryablePaths.stream().anyMatch(pattern -> antPathMatcher.match(pattern, path));
});
}
}
}粘合 WebClient 与 resilience4j接下来粘合 WebClient 与 resilience4j 实现断路器以及重试逻辑,WebClient 基于 project-reactor 实现,resilience4j 官方提供了与 project-reactor 的粘合库:<!--粘合 project-reactor 与 resilience4j,这个在异步场景经常会用到-->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-reactor</artifactId>
</dependency>参考官方文档,我们可以像下面这样给普通的 WebClient 增加相关组件:增加重试器://由于还是在前面弄好的 spring-cloud 环境下,所以还是可以这样获取配置对应的 retry
Retry retry;
try {
retry = retryRegistry.retry(name, name);
} catch (ConfigurationNotFoundException e) {
retry = retryRegistry.retry(name);
}
Retry finalRetry = retry;
WebClient.builder().filter((clientRequest, exchangeFunction) -> {
return exchangeFunction.exchange(clientRequest)
//核心就是加入 RetryOperator
.transform(RetryOperator.of(finalRetry));
})这个 RetryOperator 其实就是使用了 project-reactor 中的 retryWhen 方法实现了 resilience4j 的 retry 机制:RetryOperator@Override
public Publisher<T> apply(Publisher<T> publisher) {
//对于 mono 的处理
if (publisher instanceof Mono) {
Context<T> context = new Context<>(retry.asyncContext());
Mono<T> upstream = (Mono<T>) publisher;
return upstream.doOnNext(context::handleResult)
.retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors)))
.doOnSuccess(t -> context.onComplete());
} else if (publisher instanceof Flux) {
//对于 flux 的处理
Context<T> context = new Context<>(retry.asyncContext());
Flux<T> upstream = (Flux<T>) publisher;
return upstream.doOnNext(context::handleResult)
.retryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors)))
.doOnComplete(context::onComplete);
} else {
//不可能是 mono 或者 flux 以外的其他的
throw new IllegalPublisherException(publisher);
}
}可以看出,其实主要填充了:doOnNext(context::handleResult): 在有响应之后调用,将响应结果传入 retry 的 Context,判断是否需要重试以及重试间隔是多久,并且抛出异常 RetryDueToResultExceptionretryWhen(reactor.util.retry.Retry.withThrowable(errors -> errors.flatMap(context::handleErrors))):捕捉异常 RetryDueToResultException,根据其中的间隔时间,返回 reactor 的重试间隔:Mono.delay(Duration.ofMillis(waitDurationMillis))doOnComplete(context::onComplete):请求完成,没有异常之后,调用 retry 的 complete 进行清理增加断路器://由于还是在前面弄好的 spring-cloud 环境下,所以还是可以这样获取配置对应的 circuitBreaker
CircuitBreaker circuitBreaker;
try {
circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, finalServiceName);
} catch (ConfigurationNotFoundException e) {
circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);
}
CircuitBreaker finalCircuitBreaker = circuitBreaker;
WebClient.builder().filter((clientRequest, exchangeFunction) -> {
return exchangeFunction.exchange(clientRequest)
//核心就是加入 CircuitBreakerOperator
.transform(CircuitBreakerOperator.of(finalCircuitBreaker));
})类似的,CircuitBreakerOperator 其实也是粘合断路器与 reactor 的 publisher 中的一些 stage 方法,将结果的成功或者失败记录入断路器,这里需要注意,可能有的链路能走到 onNext,可能有的链路能走到 onComplete,也有可能都走到,所以这两个方法都要记录成功,并且保证只记录一次:CircuitBreakerSubscriberclass CircuitBreakerSubscriber<T> extends AbstractSubscriber<T> {
private final CircuitBreaker circuitBreaker;
private final long start;
private final boolean singleProducer;
private final AtomicBoolean successSignaled = new AtomicBoolean(false);
private final AtomicBoolean eventWasEmitted = new AtomicBoolean(false);
protected CircuitBreakerSubscriber(CircuitBreaker circuitBreaker,
CoreSubscriber<? super T> downstreamSubscriber,
boolean singleProducer) {
super(downstreamSubscriber);
this.circuitBreaker = requireNonNull(circuitBreaker);
this.singleProducer = singleProducer;
this.start = circuitBreaker.getCurrentTimestamp();
}
@Override
protected void hookOnNext(T value) {
if (!isDisposed()) {
//正常完成时,断路器也标记成功,因为可能会触发多次(因为 onComplete 也会记录),所以需要 successSignaled 标记只记录一次
if (singleProducer && successSignaled.compareAndSet(false, true)) {
circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), value);
}
//标记事件已经发出,就是已经执行完 WebClient 的请求,后面判断取消的时候会用到
eventWasEmitted.set(true);
downstreamSubscriber.onNext(value);
}
}
@Override
protected void hookOnComplete() {
//正常完成时,断路器也标记成功,因为可能会触发多次(因为 onNext 也会记录),所以需要 successSignaled 标记只记录一次
if (successSignaled.compareAndSet(false, true)) {
circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
}
downstreamSubscriber.onComplete();
}
@Override
public void hookOnCancel() {
if (!successSignaled.get()) {
//如果事件已经发出,那么也记录成功
if (eventWasEmitted.get()) {
circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
} else {
//否则取消
circuitBreaker.releasePermission();
}
}
}
@Override
protected void hookOnError(Throwable e) {
//记录失败
circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e);
downstreamSubscriber.onError(e);
}
}我们会使用这个库进行粘合,但是不会直接使用上面的代码,因为考虑到:需要在重试以及断路中加一些日志,便于日后的优化需要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异常需要在断路器相关的 Operator 中增加类似于 FeignClient 中的负载均衡的数据更新,使得负载均衡更加智能在下面一节我们会详细说明我们是如何实现的有断路器以及重试逻辑和负载均衡数据更新的 WebClient。
SpringCloud升级之路2020.0.x版-37. 实现异步的客户端封装配置管理的意义与设计
本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent为何需要封装异步 HTTP 客户端 WebClient对于同步的请求,我们使用 spring-cloud-openfeign 封装的 FeignClient,并做了额外的定制。对于异步的请求,使用的是异步 Http 客户端即 WebClient。WebClient 使用也比较简单,举一个简单的例子即://使用 WebClient 的 Builder 创建 WebClient
WebClient client = WebClient.builder()
//指定基址
.baseUrl("http://httpbin.org")
//可以指定一些默认的参数,例如默认 Cookie,默认 HttpHeader 等等
.defaultCookie("cookieKey", "cookieValue")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();创建好 WebClient 后即可以使用这个 WebClient 进行调用:// GET 请求 /anything 并将 body 转化为 String
Mono<String> stringMono = client.get().uri("/anything").retrieve().bodyToMono(String.class);
//这里为了测试,采用阻塞获取
String block = stringMono.block();返回的结果如下所示(请求 http://httporg.bin/anything 会将请求中的所有内容原封不动返回,从这里我们可以看出上面测试的 Header 还有 cokkie 都被返回了):{
"args": {},
"data": "",
"files": {},
"form": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip",
"Cookie": "TestCookie=TestCookieValue,getAnythingCookie=getAnythingCookieValue",
"Getanythingheader": "getAnythingHeaderValue",
"Host": "httpbin.org",
"Testheader": "TestHeaderValue",
"User-Agent": "ReactorNetty/1.0.7"
},
"json": null,
"method": "GET",
"origin": "12.12.12.12",
"url": "http://httpbin.org/anything"
}我们也可以加入负载均衡的功能,让 WebClient 利用我们内部的 LoadBalancer,负载均衡调用其他微服务,首先注入负载均衡 Filter:@Autowired
ReactorLoadBalancerExchangeFilterFunction lbFunction;创建 WebClient 的时候,将这个 Filter 加入://使用 WebClient 的 Builder 创建 WebClient
WebClient client = WebClient.builder()
//指定基址微服务
.baseUrl("http://微服务名称")
//可以指定一些默认的参数,例如默认 Cookie,默认 HttpHeader 等等
.defaultCookie("cookieKey", "cookieValue")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
//负载均衡器,改写url
.filter(lbFunction)
.build();这样,这个 WebClient 就能调用微服务了。但是,这样还不能满足我们的需求:需要在 WebClient 加入像 Feignclient 里面加的类似的重试与断路机制,线程隔离就不需要了,因为都是异步请求不会阻塞任务线程。需要针对不同的微服务配置不同的连接超时以及响应超时来适应不同微服务。这些配置都增加了代码的复杂度,我们需要减少这些代码对于业务的侵入性,最好能通过纯配置实现这些 WebClient 的初始化。要实现的配置设计以及使用举例首先,我们要实现的 WebClient,其 Filter 包含三个:重试 Filter:重试的 Filter 要在负载均衡 Filter 之前,因为重试的时候,我们会从负载均衡器获取另一个实例进行重试,而不是在同一个实例上重试多次。负载均衡 Filter,其实就是内置的 ReactorLoadBalancerExchangeFilterFunction断路器 Filter:断路器需要在负载均衡之后,因为只有负载均衡之后才能拿到具体本地调用的服务实例,这样我们才能实现基于微服务实例方法级别的断路器。需要重试的场景:非 2xx 的响应码返回,并且方法是可以重试的方法。如何定义方法是可以重试的,首先 GET 方法是可以重试的,对于其他方法,根据配置中的是否配置了这个 URL 可以重试决定。异常重试:连接异常:例如连接超时,连接中断等等,所有请求的连接异常都可以重试,因为请求并没有发出去。断路器异常:该服务实例方法级别的断路器打开了,需要直接重试其他实例,因为请求并没有发出去。响应超时异常:这个重试逻辑和非 2xx 的响应码返回一样。我们需要实现的配置方式是,通过这样配置 application.yml:webclient:
configs:
//微服务名称
testService1:
//请求基址,第一级域名作为微服务名称
baseUrl: http://testService1
//最多的 http 连接数量
maxConnection: 50
//连接超时
connectTimeout: 500ms
//响应超时
responseTimeout: 60s
//除了 GET 方法外,哪些路径还能重试
retryablePaths:
- /retryable/**
- /user/orders加入这些配置,我们就能获取载对应微服务的 WebClient 的 Bean,例如://自动装载 我们自定义的 WebClient 的 NamedContextFactory,这个是我们后面要实现的
@Autowired
private WebClientNamedContextFactory webClientNamedContextFactory;
//通过微服务名称,获取对应的微服务调用的 WebClient
webClientNamedContextFactory.getWebClient("testService1");接下来,我们会实现这些。
SpringCloud升级之路2020.0.x版-34.验证重试配置正确性(3)
本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent我们继续上一节针对我们的重试进行测试验证针对可重试的方法响应超时异常重试正确我们可以通过 httpbin.org 的 /delay/响应时间秒 来实现请求响应超时。例如 /delay/3 就会延迟三秒后返回。这个接口也是可以接受任何类型的 HTTP 请求方法。我们先来指定关于 Feign 超时的配置 Options://SpringExtension也包含了 Mockito 相关的 Extension,所以 @Mock 等注解也生效了
@ExtendWith(SpringExtension.class)
@SpringBootTest(properties = {
//关闭 eureka client
"eureka.client.enabled=false",
//默认请求重试次数为 3
"resilience4j.retry.configs.default.maxAttempts=3",
//指定默认响应超时为 2s
"feign.client.config.default.readTimeout=2000",
})
@Log4j2
public class OpenFeignClientTest {
@SpringBootApplication
@Configuration
public static class App {
@Bean
public DiscoveryClient discoveryClient() {
//模拟两个服务实例
ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class);
ServiceInstance service1Instance3 = Mockito.spy(ServiceInstance.class);
Map<String, String> zone1 = Map.ofEntries(
Map.entry("zone", "zone1")
);
when(service1Instance1.getMetadata()).thenReturn(zone1);
when(service1Instance1.getInstanceId()).thenReturn("service1Instance1");
when(service1Instance1.getHost()).thenReturn("httpbin.org");
when(service1Instance1.getPort()).thenReturn(80);
DiscoveryClient spy = Mockito.spy(DiscoveryClient.class);
//微服务 testService1 有一个实例即 service1Instance1
Mockito.when(spy.getInstances("testService1"))
.thenReturn(List.of(service1Instance1));
return spy;
}
}
}我们分别定义会超时和不会超时的接口:@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {
@GetMapping("/delay/1")
String testGetDelayOneSecond();
@GetMapping("/delay/3")
String testGetDelayThreeSeconds();
}编写测试,还是通过获取调用负载均衡获取实例的次数确定请求调用了多少次。@Test
public void testTimeOutAndRetry() throws InterruptedException {
Span span = tracer.nextSpan();
try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
//防止断路器影响
circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
long l = span.context().traceId();
RoundRobinWithRequestSeparatedPositionLoadBalancer loadBalancerClientFactoryInstance
= (RoundRobinWithRequestSeparatedPositionLoadBalancer) loadBalancerClientFactory.getInstance("testService1");
AtomicInteger atomicInteger = loadBalancerClientFactoryInstance.getPositionCache().get(l);
int start = atomicInteger.get();
//不超时,则不会有重试,也不会有异常导致 fallback
String s = testService1Client.testGetDelayOneSecond();
//没有重试,只会请求一次
Assertions.assertEquals(1, atomicInteger.get() - start);
//防止断路器影响
circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
start = atomicInteger.get();
//超时,并且方法可以重试,所以会请求 3 次
try {
s = testService1Client.testGetDelayThreeSeconds();
} catch(Exception e) {}
Assertions.assertEquals(3, atomicInteger.get() - start);
}
}验证针对不可重试的方法响应超时异常不能重试对于 GET 方法,我们默认是可以重试的。但是一般扣款这种涉及修改请求的接口,我们会使用其他方法例如 POST。这一类方法一般请求超时我们不会直接重试的。我们还是通过 httporg.bin 的延迟接口进行测试:@FeignClient(name = "testService1", contextId = "testService1Client")
public interface TestService1Client {
@PostMapping("/delay/3")
String testPostDelayThreeSeconds();
}编写测试,还是通过获取调用负载均衡获取实例的次数确定请求调用了多少次。@Test
public void testTimeOutAndRetry() throws InterruptedException {
Span span = tracer.nextSpan();
try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
//防止断路器影响
circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
long l = span.context().traceId();
RoundRobinWithRequestSeparatedPositionLoadBalancer loadBalancerClientFactoryInstance
= (RoundRobinWithRequestSeparatedPositionLoadBalancer) loadBalancerClientFactory.getInstance("testService1");
AtomicInteger atomicInteger = loadBalancerClientFactoryInstance.getPositionCache().get(l);
int start = atomicInteger.get();
//不超时,则不会有重试,也不会有异常导致 fallback
String s = testService1Client.testPostDelayThreeSeconds();
//没有重试,只会请求一次
Assertions.assertEquals(1, atomicInteger.get() - start);
}
}