【On Nacos】聊聊 Nacos
Nacos 是什么Nacos 在阿里巴巴起源于 2008 年五彩石项目,该项目完成了微服务拆分和业务中台建设,随着云计算和开源环境的兴起,2018 年,将 Nacos 开源,输出阿里十年关于服务发现和配管管理的沉淀,推动微服务行业发展,加速企业数字化转型。Nacos 是集配置中心和服务注册与发现于一身的微服务基础设施。致力于解决微服务架构中的配置中心、服务注册与发现等问题,它提供了一套简单易用的特性集,帮助开发者快速实现动态服务发现、服务配置、服务元数据、流量管理及统一配置。现在服务注册发现的技术已经非常成熟,配置中心和服务注册与发现的一站式解决方案貌似也成为了行业的主流,包括比较成熟的 etcd、consul 等也都是即可以当做配置心中使用,又可以当做服务注册与发现使用。特性易于使用动态配置管理、服务注册与发现的一站式解决方案20多种开箱即用的以服务为中心的架构特性基本符合生产要求的轻量级易用控制台更适应云架构无缝支持Kubernetes和Spring Cloud在主流公共云上更容易部署和运行(例如阿里云和AWS)多租户和多环境支持生产等级脱胎于历经阿里巴巴10年生产验证的内部产品支持具有数百万服务的大规模场景具备企业级SLA的开源产品丰富的应用场景支持限流、大促销预案和异地多活直接支持或稍作扩展即可支持大量有用的互联网应用场景流量调度和服务治理配置中心Nacos 配置中心可以通过 client SDK 或 控制台来发布配置,将配置的信息交由 Nacos 来保存。可以为用户提供外部化配置文件、动态修改、即时生效。解决因配置变更要重启服务带来的风险与困扰。Nacos 的配置中心包含以下特性:提供对配置信息的生命周期管理,增删改查。支持多种格式的配置信息配置,例如:text、json、xml、yaml、html、properties。支持配置版本管理,可以进行版本恢复。支持配置灰度功能,可以针对指定 ip 进行灰度测试。支持配置监听,变更实时推送。服务注册与发现Nacos 服务注册与发现基于内存进行服务实例信息及元数据的声明周期管理,服务在启动的时候将服务实例注册到 Nacos 中,并在关闭的时候注销服务实例。Nacos 的服务注册与发现包含一下特性:支持对服务信息及元数据的生命周期管理,增删改查。支持负载均衡策略。支持对服务实例的健康状态检查、服务权重管理。Nacos 架构下面的架构图是 Nacos 2.x 的架构图,在 1.x 的基础上增加了 gRPC、Rsocket 实现了长链接和 RPC 调用和推送能力。解决 1.x 下的 http 短连接模型的性能问题。接入层:Nacos 提供了 client SDK 和 OpenAPI 来提供和 Nacos 交互的能力。协议层:目前支持 gRPC、Rsocket、HTTP、UDP 协议。链接层:主要负责处理请求、流量控制、负载均衡等操作。功能层:面有服务注册与发现、配置管理。这块就是的核心业务层。数据一致性层:Nacos 提供了两种协议。分别是 AP 模式的 Distro 和 CP 模式的 Raft。Distro:非持久化服务的同步模式。Raft:持久化服务的同步模式、以及使用 Derby 作为配置的存储时同步配置操作。持久化层:Nacos 使用 MySQL 、Derby 和 本地文件来做持久化。配置信息、用户信息、权限等存储在 MySQL 和 Derby 中。服务实例以及服务元数据等信息存储在内存和本地文件中。Nacos 解决了什么问题从分布式系统的角度来看,Nacos主要解决了两个问题,统一配置管理,变更即时生效的问题、分布式服务实例列表管理问题。如果只有一台计算机,那么程序运行要么是成功的,要么是失败的。服务采用的是单体架构,所有的服务、配置都是在一台计算机上。这种架构的缺点就是:复杂性高容易积累技术债务开发部署效率低扩展能力受限阻断技术创新如果有台计算机,可能是成百上千台。这个时候每个服务分别部署在不同的计算机上,这个时候就要考虑计算机与计算机之间怎么通信?采用什么通信协议。要考虑多台计算机访问一台计算机的并发问题。计算机 A 调用计算机 B,计算机 B 出错了,怎么保证计算机 A 不受影响,或者降低影响。这里就要考虑计算机A的容错性问题。例如单体架构向微服务架构的转变。构建一个分布式系统是很难的,需要考虑非常多的问题。而为了解决这些问题产生了很多技术,有传输层面的、存储层面的、计算层面的等等。不过分布式也带来了很多优点:降低系统之间的耦合度增加了可扩展性开发部署更灵活、更方便提高代码的复用性统一配置管理问题在单体架构的时候我们可以将配置写在配置文件中,但有一个缺点就是每次修改配置都需要重启服 务才能生效。当应用程序实例比较少的时候还可以维护。如果转向微服务架构有成百上千个实例,每修改一次配 置要将全部实例重启,不仅增加了系统的不稳定性,也提高了维护的成本。那么如何能够做到服务不重启就可以修改配置?所有就产生了四个基础诉求:需要支持动态修改配置需要变更实时推送变更后的风险控制,如版本管理、灰度、权限控制等敏感信息的安全配置Nacos 配置中心分为服务端、客户端、控制台三部分,用户可以通过客户端和控制台进行配置的发布,将配置放到 Nacos服务端进行统一的管理。也可以通过客户端进行配置的监听,当在控制台修改配置时,客户端会实时监听到并做出响应。这样就解决了微服务架构中的配置多、配置难、配置变更等问题。分布式服务列表管理问题当我们谈论起服务的实例管理,总要提到注册中心。那么什么是注册中心?注册中心总共有三种角色:服务提供者服务消费者注册中心注册中心提供服务注册、注销能力。可以让服务提供者将自身或其他服务注册上来,由注册中心进行服务信息的维护,包括服务探活能力、路由功能等。服务提供者可以通过注册中心获取到自身或其他服务的服务信息。比如有两个服务,服务 A、服务 B,当服务B的服务实例比较少的时候,那么在服务 A 调用服务 B 的时候,我们可以直接在服务 A 中写死 服务 B 的 ip+port。这种方式在服务实例比较少的情况下还能够管理,当服务实例数多了以后,这种方式就没有办法管理了。还要考虑众多 ip 之间的负载、路由问题,当其中的 ip 不可用了,怎么做生命周期管理等。Nacos 通过自身的注册中心能力解决了服务实例列表管理的问题,提供了如高可用、高性能、水平扩展能力、服务探活能力、路由功能等。提供了 Springboot、springcloud 项目的接入方式,在服务启动的时候自动将服务实例注册到 Nacos 注册中心,由 Nacos 来管理服务实例,进行服务探活。Nacos 生态多语言Nacos 支持 Java、go、nodejs、c#、c++、python 等多种语言的接入。Spring 生态Nacos 无缝支持 Spring 全栈,包括 Spring Cloud、Spring Boot、Spring Framework。Docker & Kubernetes 生态nacos-docker 和 nacos-k8s 是 Nacos 开发团队为支持用户容器化衍生的项目。其本质是为了帮助用户方便快捷的通过官方镜像在 Docker 或者 Kubernetes 进行部署。Service Mesh 生态Nacos 目前支持 MCP、XDS 两种协议,Istio 可以通过 MCP 协议从 Naocs 获取全量的服务信息列表,在转化成 XDS 协议下发到 envoy。这样就支持了 mesh 化应用内的服务发现。MCP 协议是 Istio 社区提出的组件之间配置同步协议,这个协议在 1.8 之后就废弃了,替代方案是 MCP over XDS 协议,Nacos 两个协议都兼容。Nacos 工具nacos-syncnacos-sync 是一个支持多种注册中心的同步组件,基于 Spring boot 开发框架,数据层采用 Spring Data JPA ,遵循了标准的 JPA 访问规范,支持多种数据源存储,默认使用 Hibernate 实现。nacos-ctlnacos-ctl 是用 Java 开发的命令行工具,包含对命名空间、配置命令、服务命令、实例命令等。nacosctlnacosctl 这个项目是我使用 go 语言进行开发的一个命令行工具,纯属造轮子。基于?Nacos OpenApi?封装的命令行工具。提供一些对配置、服务注册与发现、命名空间等命令操作 。借助?go?语言的跨平台交叉编译,将编译好的二进制文件直接放到指定系统下就可以直接运行, 无需环境部署。如果大家想要了解更多的 Nacos 教程,欢迎 star 《On Nacos》开源项目。基于 Nacos 2.x 的入门、原理、源码、实战介绍,帮助开发者快速上手 Nacos。
参照有赞TMC框架原理简单实现多级缓存
项目场景:有位同事因为缓存被后台删除,导致一堆高并发请求直接怼到DB上,导致数据库cpu 100%解决方案:处理缓存击穿问题:像布隆过滤器,或者说提前设置热点key就是热点key检测,这里谈到了有赞TMC框架多级缓存以及它的热点key的发现个人简单实现相关原理本地变量像热点key储存,本地缓存以及相关参数设置设置。 在这里插入图片描述获取本地缓存的数据在这里插入图片描述 解释: 1.由于是分布式环境,所以先查询下这个key有没有被删除过 2.直接走本地缓存 3.如果是后台数据被修改,redis这个标识被修改到了,我们需要重新加载数据库的数据更新到本地缓存中,以及set到redis中数据一致性问题就是redis缓存跟本地缓存一致性问题,我的想法是惰性就行更新,如果有人去读取,先返回本地缓存的旧数据,后面再进行更新,也就是实现最终一致性问题。存在问题就是这里的flag在更新之后会变成0,我这里的的优化方案是:采用nacos的版本控制,redis有一份版本,本地也有一份版本,如果说redis上的版本跟本地缓存的版本有所不一样,那么就进行修改本地缓存,以及将最新的版本更新到本地缓存中。这样的话就不会导致说一台机器把redis设置为0,另一台本地缓存就不会变了。优化方案使用nacos版本修改的原理来控制不同机器的本地缓存更新更新的时候可以加个分布式锁,获得锁才能去查数据库,防止高并发查崩数据库。其次在把这个数据塞到redis还有本地缓存中。设置缓存的值加粗样式删除缓存在这里插入图片描述统一获取缓存的方法/** * 统一获取缓存数据
*
* @param key
* @return
*/
public String getRedisByKey(String key) {
//计数
stringRedisTemplate.opsForValue().increment(key + ":incr", 1);
//5秒过期
stringRedisTemplate.expire(key + ":incr", 10, TimeUnit.SECONDS);
String count = stringRedisTemplate.opsForValue().get(key + ":incr");
if (count != null && Integer.valueOf(count) > 2) {
if (map.get(key) != null) {
System.out.println("命中热点key....");
return getCacheValue(key);
}
//2写死,表示5秒内get超过2次,定义为热点key
map.put(key, "true");
if (stringRedisTemplate.getExpire(key, TimeUnit.SECONDS) < 10) {
//自动延期
System.out.println("自动延期");
stringRedisTemplate.expire(key, 20, TimeUnit.SECONDS);
}
} else {
map.remove(key);
String result = stringRedisTemplate.opsForValue().get(key);
if (result == null) {
String value = a(key);
setRedisByKey(key, value, 20L);
return value;
}
System.out.println("直接走redis");
return result;
}
return getCacheValue(key);
}前面是进行简单的计数法来保存这个热点key,如果命中热点key直接读本地缓存,否则读redis,没有的话再去读DB。重点如果是热点key的话,那么就会去判断它过期时间,如果不够的话会自动给它进行续期。优化比如说热点key的统计方式,这里只是简单的redis+1,如果高级一点就是时间滑窗统计热点key这里是封装redistemplate查询的方案,比较好的是有一个特有的分布式集群来收集这些redis查询,redis key过期、设置、删除操作等等,会更好。在删除热点key map那里也是需要再优化的,就是如果说重新这个key在接下来的时间内不那么火热,那么剔除map对应的key。所有代码import com.google.common.cache.CacheBuilder;import com.google.common.cache.CacheLoader;import com.google.common.cache.LoadingCache;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.stereotype.Component;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;@Componentpublic class RedisManagement
[email protected]
private StringRedisTemplate stringRedisTemplate;
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
private LoadingCache<String, String> graphs = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(1, TimeUnit.HOURS)
.refreshAfterWrite(1, TimeUnit.HOURS)
.build(
new CacheLoader<String, String>() {
@Override
public String load(String key) {
return a(key);
}
});
private String getCacheValue(String key) {
String result;
String flag = stringRedisTemplate.opsForValue().get(key + ":flag");
try {
System.out.println("走本地缓存");
result = graphs.get(key);
} catch (ExecutionException e) {
System.out.println("出现报错:" + e);
return null;
}
//不为空还有已经删除状态
if (flag != null && "1".equals(flag)) {
//更新本地缓存的
graphs.refresh(key);
//设置删除标识为未删除
stringRedisTemplate.opsForValue().set(key + ":flag", "0");
}
return result;
}
/**
* 统一设置缓存
*
* @param key
* @param value
* @return
*/
public void setRedisByKey(String key, String value, long time) {
//设置删除标识为未删除
stringRedisTemplate.opsForValue().set(key + ":flag", "0");
stringRedisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
}
/**
* 统一删除缓存
*
* @param key
* @return
*/
public Boolean delRedisByKey(String key) {
//设置删除标识为删除
stringRedisTemplate.opsForValue().set(key + ":flag", "1");
return stringRedisTemplate.delete(key);
}
/**
* 统一获取缓存数据
*
* @param key
* @return
*/
public String getRedisByKey(String key) {
//计数
stringRedisTemplate.opsForValue().increment(key + ":incr", 1);
//5秒过期
stringRedisTemplate.expire(key + ":incr", 10, TimeUnit.SECONDS);
String count = stringRedisTemplate.opsForValue().get(key + ":incr");
if (count != null && Integer.valueOf(count) > 2) {
if (map.get(key) != null) {
System.out.println("命中热点key....");
return getCacheValue(key);
}
//2写死,表示5秒内get超过2次,定义为热点key
map.put(key, "true");
if (stringRedisTemplate.getExpire(key, TimeUnit.SECONDS) < 10) {
//自动延期
System.out.println("自动延期");
stringRedisTemplate.expire(key, 20, TimeUnit.SECONDS);
}
} else {
map.remove(key);
String result = stringRedisTemplate.opsForValue().get(key);
if (result == null) {
String value = a(key);
setRedisByKey(key, value, 20L);
return value;
}
System.out.println("直接走redis");
return result;
}
return getCacheValue(key);
}
/**
* 初始化本地缓存数据
*
* @param key
* @return
*/
private String a(String key) {
System.out.println("查db");
//执行不同逻辑
if (key.startsWith("activity")) {
//查数据库
return "activity";
} else if (key.startsWith("content")) {
//查数据库
return "content";
} else {
return "haha";
}
}
}
参照有赞TMC框架原理简单实现多级缓存
项目场景:有位同事因为缓存被后台删除,导致一堆高并发请求直接怼到DB上,导致数据库cpu 100%
# 解决方案:
1. 处理缓存击穿问题:像布隆过滤器,或者说提前设置热点key
2. 就是热点key检测,这里谈到了有赞TMC框架多级缓存以及它的热点key的发现
# 个人简单实现相关原理
## 本地变量
像热点key储存,本地缓存以及相关参数设置设置。

## 获取本地缓存的数据

解释:
1.由于是分布式环境,所以先查询下这个key有没有被删除过
2.直接走本地缓存
3.如果是后台数据被修改,redis这个标识被修改到了,我们需要重新加载数据库的数据更新到本地缓存中,以及set到redis中
### 数据一致性问题
就是redis缓存跟本地缓存一致性问题,我的想法是惰性就行更新,如果有人去读取,先返回本地缓存的旧数据,后面再进行更新,也就是实现最终一致性问题。
**存在问题**
就是这里的flag在更新之后会变成0,我这里的的**优化方案**是:采用nacos的版本控制,redis有一份版本,本地也有一份版本,如果说redis上的版本跟本地缓存的版本有所不一样,那么就进行修改本地缓存,以及将最新的版本更新到本地缓存中。
这样的话就不会导致说一台机器把redis设置为0,另一台本地缓存就不会变了。
**优化方案**
- 使用nacos版本修改的原理来控制不同机器的本地缓存更新
- 更新的时候可以加个分布式锁,获得锁才能去查数据库,防止高并发查崩数据库。其次在把这个数据塞到redis还有本地缓存中。
## 设置缓存的值

## 删除缓存

## 统一获取缓存的方法
```java
/**
* 统一获取缓存数据
*
* @param key
* @return
*/
public String getRedisByKey(String key) {
//计数
stringRedisTemplate.opsForValue().increment(key + ":incr", 1);
//5秒过期
stringRedisTemplate.expire(key + ":incr", 10, TimeUnit.SECONDS);
String count = stringRedisTemplate.opsForValue().get(key + ":incr");
if (count != null && Integer.valueOf(count) > 2) {
if (map.get(key) != null) {
System.out.println("命中热点key....");
return getCacheValue(key);
}
//2写死,表示5秒内get超过2次,定义为热点key
map.put(key, "true");
if (stringRedisTemplate.getExpire(key, TimeUnit.SECONDS) < 10) {
//自动延期
System.out.println("自动延期");
stringRedisTemplate.expire(key, 20, TimeUnit.SECONDS);
}
} else {
map.remove(key);
String result = stringRedisTemplate.opsForValue().get(key);
if (result == null) {
String value = a(key);
setRedisByKey(key, value, 20L);
return value;
}
System.out.println("直接走redis");
return result;
}
return getCacheValue(key);
}
```
前面是进行简单的计数法来保存这个热点key,如果命中热点key直接读本地缓存,否则读redis,没有的话再去读DB。
## 重点
如果是热点key的话,那么就会去判断它过期时间,如果不够的话会自动给它进行续期。
### 优化
- 比如说热点key的统计方式,这里只是简单的redis+1,如果高级一点就是时间滑窗统计热点key
- 这里是封装redistemplate查询的方案,比较好的是有一个特有的分布式集群来收集这些redis查询,redis key过期、设置、删除操作等等,会更好。
- 在删除热点key map那里也是需要再优化的,就是如果说重新这个key在接下来的时间内不那么火热,那么剔除map对应的key。
# 所有代码
```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Component
public class RedisManagement {
@Autowired
private StringRedisTemplate stringRedisTemplate;
ConcurrentHashMap map = new ConcurrentHashMap<>();
private LoadingCache graphs = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(1, TimeUnit.HOURS)
.refreshAfterWrite(1, TimeUnit.HOURS)
.build(
new CacheLoader() {
@Override
public String load(String key) {
return a(key);
}
});
private String getCacheValue(String key) {
String result;
String flag = stringRedisTemplate.opsForValue().get(key + ":flag");
try {
System.out.println("走本地缓存");
result = graphs.get(key);
} catch (ExecutionException e) {
System.out.println("出现报错:" + e);
return null;
}
//不为空还有已经删除状态
if (flag != null && "1".equals(flag)) {
//更新本地缓存的
graphs.refresh(key);
//设置删除标识为未删除
stringRedisTemplate.opsForValue().set(key + ":flag", "0");
}
return result;
}
/**
* 统一设置缓存
*
* @param key
* @param value
* @return
*/
public void setRedisByKey(String key, String value, long time) {
//设置删除标识为未删除
stringRedisTemplate.opsForValue().set(key + ":flag", "0");
stringRedisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
}
/**
* 统一删除缓存
*
* @param key
* @return
*/
public Boolean delRedisByKey(String key) {
//设置删除标识为删除
stringRedisTemplate.opsForValue().set(key + ":flag", "1");
return stringRedisTemplate.delete(key);
}
/**
* 统一获取缓存数据
*
* @param key
* @return
*/
public String getRedisByKey(String key) {
//计数
stringRedisTemplate.opsForValue().increment(key + ":incr", 1);
//5秒过期
stringRedisTemplate.expire(key + ":incr", 10, TimeUnit.SECONDS);
String count = stringRedisTemplate.opsForValue().get(key + ":incr");
if (count != null && Integer.valueOf(count) > 2) {
if (map.get(key) != null) {
System.out.println("命中热点key....");
return getCacheValue(key);
}
//2写死,表示5秒内get超过2次,定义为热点key
map.put(key, "true");
if (stringRedisTemplate.getExpire(key, TimeUnit.SECONDS) < 10) {
//自动延期
System.out.println("自动延期");
stringRedisTemplate.expire(key, 20, TimeUnit.SECONDS);
}
} else {
map.remove(key);
String result = stringRedisTemplate.opsForValue().get(key);
if (result == null) {
String value = a(key);
setRedisByKey(key, value, 20L);
return value;
}
System.out.println("直接走redis");
return result;
}
return getCacheValue(key);
}
/**
* 初始化本地缓存数据
*
* @param key
* @return
*/
private String a(String key) {
System.out.println("查db");
//执行不同逻辑
if (key.startsWith("activity")) {
//查数据库
return "activity";
} else if (key.startsWith("content")) {
//查数据库
return "content";
} else {
return "haha";
}
}
}
```
Seata分布式事务环境搭建
下载seata服务端https://github.com/seata/seata/releases修改registry.conf这里使用nacos做注册中心和配置中心, 也就不需要服务端的file.conf了但是使用nacos时, nacos的密码不能有特殊符号, 否则seata可能连接不上(1.5.0已修复)registry {
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = "e794b575-4231-4935-8271-145c5840d392"
cluster = "default"
username = "nacos"
password = "nacos"
}
}
config {
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = "e794b575-4231-4935-8271-145c5840d392"
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
}seata服务端需要的几个表: https://github.com/seata/seata/blob/develop/script/server/db/mysql.sql其他的一些相关的脚本https://github.com/seata/seata/tree/develop/scriptnacos建立命名空间新增配置文件Data ID: seataServer.propertiesGroup: SEATA_GROUPseataServer.properties配置内容### seata
store.mode=db
store.publicKey=
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
store.db.datasource=druid
## mysql/oracle/postgresql/h2/oceanbase etc.
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
store.db.url=jdbc:mysql://192.168.101.128:3309/seata?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=100
store.db.globalTable = global_table
store.db.branchTable = branch_table
store.db.lockTable =lock_table
store.db.queryLimit = 100
store.db.maxWait = 5000
## transport
# tcp udt unix-domain-socket
transport.type=TCP
#NIO NATIVE
transport.server=NIO
#enable heartbeat
transport.heartbeat=true
transport.serialization=seata
transport.compressor=none
transport.threadFactory.bossThreadPrefix = NettyBoss
transport.threadFactory.workerThreadPrefix = NettyServerNIOWorker
transport.threadFactory.serverExecutorThread-prefix = NettyServerBizHandler
transport.threadFactory.shareBossWorker = false
transport.threadFactory.clientSelectorThreadPrefix = NettyClientSelector
transport.threadFactory.clientSelectorThreadSize = 1
transport.threadFactory.clientWorkerThreadPrefix = NettyClientWorkerThread
transport.threadFactory.bossThreadSize = 1
transport.threadFactory.workerThreadSize = default
# 销毁服务器时, 等待几秒钟
transport.shutdown.wait=3
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000单体服务多库事务SpringBoot项目引入依赖<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>项目配置文件seata:
application-id: test #这里填你应用的id
service:
grouplist:
# seata-server地址
default: 127.0.0.1:8091
# 分组事务
vgroup-mapping:
global_tx_group: default
enable-degrade: false
disable-global-transaction: false
# 是否开启spring-boot自动装配
enabled: true
# 是否启用数据源 bean 的自动代理
enable-auto-data-source-proxy: true
tx-service-group: global_tx_group
client:
tm:
# 一阶段全局提交结果上报TC重试次数 默认1次,建议大于1
commit-retry-count: 3
# 一阶段全局回滚结果上报TC重试次数 默认1次,建议大于1
rollback-retry-count: 3
rm:
# 是否上报一阶段成功 true、false,从1.1.0版本开始,默认false.true用于保持分支事务生命周期记录完整,false可提高不少性能
report-success-enable: true
# 自动刷新缓存中的表结构 默认false
table-meta-check-enable: true
# 一阶段结果上报TC重试次数
report-retry-count: 5
# 异步提交缓存队列长度 默认10000。 二阶段提交成功,RM异步清理undo队列
async-commit-buffer-limit: 1000
lock:
# 校验或占用全局锁重试间隔 默认10,单位毫秒
retry-interval: 10
# 分支事务与其它全局回滚事务冲突时锁策略 默认true,优先释放本地锁让回滚成功
retry-policy-branch-rollback-on-conflict: true
# 校验或占用全局锁重试次数
retry-times: 30
undo:
# 自定义undo表名 默认undo_log
log-table: seata_undo_log
# 二阶段回滚镜像校验
data-validation: true
# undo log序列化方式
log-serialization: jackson
transport:
type: TCP
server: NIO
heartbeat: true
# client和server通信编解码方式 seata(ByteBuf)、protobuf、kryo、hession、fst,默认seata
serialization: seata
# client和server通信数据压缩方式 none、gzip,默认none
compressor: none
thread-factory:
boss-thread-prefix: NettyBoss
client-worker-thread-prefix: NettyServerNIOWorker
server-executor-thread-prefix: NettyServerBizHandler
client-selector-thread-size: 1
client-selector-thread-prefix: NettyClientWorkerThread简单使用, 配合dynamic-datasource-spring-boot-starter使用@Autowired
StaffMapper staffMapper;
@Override
@GlobalTransactional(rollbackFor = Exception.class)
public void globalTx() {
userService.updateMaster();
userService.updateIndependent();
//模拟异常回滚
int i = 1 / 0;
}
@DS("master")
@Transactional(rollbackFor = Exception.class)
public void updateMaster() {
User user1 = baseDao.selectById(1);
user1.setAge(999);
baseDao.updateById(user1);
User user2 = baseDao.selectById(2);
user2.setAge(999);
baseDao.updateById(user2);
}
@DS("independent")
@Transactional(rollbackFor = Exception.class)
public void updateIndependent() {
Staff staff1 = staffMapper.selectById(1);
staff1.setAge(999);
staffMapper.updateById(staff1);
Staff staff2 = staffMapper.selectById(2);
staff2.setAge(999);
staffMapper.updateById(staff2);
}可以观察到seata_undo_log中的undo记录SELECT CAST(rollback_info AS char) FROM seata_undo_log{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "192.168.101.1:8091:6593516322371825665",
"branchId": 6593516322371825668,
"sqlUndoLogs": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "UPDATE",
"tableName": "staff",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "staff",
"rows": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": [
"java.lang.Long",
1
]
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "name",
"keyType": "NULL",
"type": 12,
"value": "1"
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "age",
"keyType": "NULL",
"type": 4,
"value": 2
}
]
]
}
]
]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "staff",
"rows": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": [
"java.lang.Long",
1
]
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "name",
"keyType": "NULL",
"type": 12,
"value": "1"
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "age",
"keyType": "NULL",
"type": 4,
"value": 999
}
]
]
}
]
]
}
},
{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "UPDATE",
"tableName": "staff",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "staff",
"rows": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": [
"java.lang.Long",
2
]
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "name",
"keyType": "NULL",
"type": 12,
"value": "2"
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "age",
"keyType": "NULL",
"type": 4,
"value": 3
}
]
]
}
]
]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "staff",
"rows": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": 4,
"value": [
"java.lang.Long",
2
]
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "name",
"keyType": "NULL",
"type": 12,
"value": "2"
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "age",
"keyType": "NULL",
"type": 4,
"value": 999
}
]
]
}
]
]
}
}
]
]
}可以看到该表内存储了数据操作前和操作后的记录微服务项目分布式事务如果是微服务项目, 需要分布式事务支持, 配置如下引入依赖<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>配置和单体服务多库事务是一样的, seata.application-id可以不填, 默认取当前应用id每个模块都需要配置, 因为seata需要代理数据源但实际1.4.2版本使用jackson/fastjson序列化Date字段时会失败(https://github.com/seata/seata/issues/3883), 可以替换序列化方式为kryo需要额外引入依赖<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-serializer-kryo</artifactId>
<version>1.4.2</version>
</dependency>简易demo@Autowired
RoleFeignClient roleFeignClient;
@Autowired
StaffFeignClient staffFeignClient;
@GlobalTransactional(rollbackFor = Exception.class)
public RestResult<Boolean> globalTxTest() {
log.info("xid: {}", RootContext.getXID());
roleFeignClient.updateRole();
staffFeignClient.updateUser();
int i = 1 / 0;
return RestResult.ok();
}@Override
@Transactional(rollbackFor = Exception.class)
public RestResult<Boolean> updateRole() {
log.info("xid: {}", RootContext.getXID());
RolePO role1 = roleDAO.selectById(1);
role1.setName("1111111111");
roleDAO.updateById(role1);
RolePO role2 = roleDAO.selectById(2);
role2.setName("2222222");
roleDAO.updateById(role2);
return RestResult.ok();
}@Override
@Transactional(rollbackFor = Exception.class)
public RestResult<Boolean> updateUser() {
log.info("xid: {}", RootContext.getXID());
StaffPO staff1 = baseMapper.selectById(1);
staff1.setAge(999);
baseMapper.updateById(staff1);
StaffPO staff2 = baseMapper.selectById(2);
staff2.setAge(999);
baseMapper.updateById(staff2);
return RestResult.ok();
}
策略模式,从防腐层改造聊到Nacos插件的应用
theme: smartbluehighlight: atom-one-dark前言总所周知,策略模式是个好东西,他不仅是一种技术,在我看来更是一种思想。what is 策略模式策略模式就像一个工具箱,当我们遇到不同的场景,拿出不同工具。它的好处是符合开闭原则还有单一原则,当我们需要对另外一种场景进行处理的时候,只需要去打造另一款工具,而不是在之前的工具去修改。
防腐层改造what is 防腐层我们在学习DDD的时候,会学习到防腐层。它主要功能是将第三方api进行隔离,这样不会跟内部系统进行强耦合,提高了可扩展性。how to do it场景刚好在项目里头,有依赖第三方api,之前由于时间很赶,直接跟业务代码耦合在一起了,在最近时间比较充裕的时候,我们将它优化一下。思路:采用防腐层,将外部api进行隔离开,采用api的形式进行实现,这样为后面的扩展提供良好的基础。防腐层改造首先是定义api,为了实现类通过该接口进行扩展。public interface Facade {
/**
* 处理逻辑
*
*/
xx<?> dealWith();
/**
* 类型
*
* @return
*/
String getType();
}
然后我们来实现扩展类
@Service
public class xxFacade implements Facade {
@Override
xx<?> dealWith(){
//todo 做特定逻辑处理
}
}
编写设配器
@Service
public class Adapter {
@Resource
private List<Facade> facadeList;
private Facade getFacade(String xx) {
return facadeList.stream().filter(it -> it.getType().equals(xx)).findFirst().orElseThrow(() -> new BizException("没有找到实现类"));
}
public xx<?> dealWith(xx) {
return getFacade(xx).dealWith();
}
}
到这里我们看到,借助spring注入实例的方法,然后通过接口里头的type方法,来判断我们具体要拿出什么工具来解决问题。到此反腐层就改造完成?Nacos 插件我们看下官网的文档,里面也有介绍Nacos插件这一块spi,如下图鉴权这一块的spi。接下来,我们来看下策略模式中里面的体现吧~
首先看到的是接口类,我们可以通过实现这个接口来扩展鉴权功能。
这里就是接口的一种实现类。
这里通过classload来将所有实现该接口的类塞到set里面,然后根据特定的标识来获取。总结策略模式很常见的设计模式,我们可以借助它来提高代码质量,提高系统的扩展性。
SpringCloud升级之路2020.0.x版-16.Eureka架构和核心概念
Eureka 目前 1.x 版本还在更新,但是应该不会更新新的功能了,只是对现有功能进行维护,升级并兼容所需的依赖。 Eureka 2.x 已经胎死腹中了。但是,这也不代表 Eureka 就是不能用了。如果你需要一个简便易于部署的注册中心,Eureka 还是一个很好的选择。云服务环境中,基本上所有实例地址和微服务名称都在不断变化,也并不太需要 Eureka 所缺少的持久化特性。当你的集群属于中小规模的时候(节点小于 1000 个), Eureka 依然是一个不错的选择。当你的集群很大的时候,Eureka 的同步机制可能就限制了他的表现。Eureka 的设计比较小巧,没有复杂的同步机制(例如 Nacos 基于 Raft,Zookeeper 基于 Zab),也没有复杂的持久化机制,集群关系只是简单的将收到的客户端请求转发到集群内的其他 Eureka 实例。Eureka 本身也只有注册中心的功能,不像其他种类的注册中心那样,将注册中心和配置中心合在一起,例如 Consul 和 nacos。这里我们忽略所有的 AWS 相关的术语以及配置还有相关逻辑处理。Eureka 中的术语:Eureka 实例:每个注册到 Eureka 上面的实例就是 Eureka 实例。Eureka 实例状态:包括 UP(可以处理请求),DOWN(健康检查失败,不能正常处理请求),STARTING(启动中,不能处理请求),OUT_OF_SERVICE(人为下线,暂时不处理请求),UNKNOWN(未知状态)。Eureka 服务器:作为注册中心运行,主要提供实例管理功能(处理实例注册(register)请求、处理实例注销(cancel)请求、处理实例心跳(renew)请求、内部处理实例过期(evict))、实例查询功能(各种查询实例信息的接口,例如通过 AppName 获取实例列表,通过实例 id 获取实例信息等等)Eureka 服务器集群:Eureka 服务器的集群,每个 Eureka 服务器都配置了区域以及可用区,Eureka 服务器收到的客户端请求会转发到同一区域内的其他 Eureka 服务器,可以配置优先发到同一可用区的 Eureka 服务器。非同一区域内 Eureka 服务器,通过定时拉取的方式进行同步。Eureka 客户端:请求 Eureka 服务器的客户端。封装发送实例注册(register)请求、实例注销(cancel)请求和实例心跳(renew)请求。VIP(或者是 Virtual Hostname): Eureka 中可以通过两种方式获取实例,一个是通过服务名称,另一种是通过 VIP。每个实例都有服务名称,以及 VIP。Eureka 服务器中的索引方式是以服务名称为 key 的索引,我们也可以通过遍历所有实例信息的方式通过 VIP 字符串匹配获取相关的实例。在 Spring Cloud 体系中,一个实例的 VIP、SVIP(其实就是 Secure VIP,即 https 的地址)以及服务名称都是 spring.application.name 指定的服务名称。首先,Service A 通过 Eureka Client 发送注册请求(Register)到同一可用区的 Eureka Server 1。之后通过发送心跳请求(Renew)到这个 Eureka Server 1. Eureka Server 1 收到这些请求的时候,会处理这些请求并将这些请求转发到其他的集群内的 Eureka Server 2 和 Eureka Server 3. Eureka Server 2 和 Eureka Server 3 不会再转发收到的 Eureka Server 1 转发过来的请求。然后,Service B 还有 Service C 通过 Eureka 获取到了 Service A 的位置,最后调用了 Service A。对于本地没有查询到的微服务,Eureka Server 还会从远程 Region 的 Eureka Server 去获取,例如这里对于 Service D,本地没有查到,Eureka Server 会返回远程 Region 的 Service D 的实例。由于本地有 Service A,所以肯定不会返回远程 Region 的 Service A 的实例。并且,本地是定时拉取的远程 Region 的 Service 列表,并不是每次查询的时候现查询的。一般的,微服务之间的互相调用,并不经过 Eureka,也不会涉及到 Eureka 客户端了,而是通过负载均衡器调用,这个我们后面就会提到。我们这一节详细分析了 Eureka 的架构,以及其中的核心概念。下一节,我们将开始介绍我们微服务的注册中心 Eureka 的实例配置。
Spring Cloud 升级之路 - 2020.0.x - 4. 使用 Eureka 作为注册中心(上)
Eureka 目前的状态:Eureka 目前 1.x 版本还在更新,但是应该不会更新新的功能了,只是对现有功能进行维护,升级并兼容所需的依赖。 Eureka 2.x 已经胎死腹中了。但是,这也不代表 Eureka 就是不能用了。如果你需要一个简便易于部署的注册中心,Eureka 还是一个很好的选择。云服务环境中,基本上所有实例地址和微服务名称都在不断变化,也并不太需要 Eureka 所缺少的持久化特性。当你的集群属于中小规模的时候(节点小于 1000 个), Eureka 依然是一个不错的选择。当你的集群很大的时候,Eureka 的同步机制可能就限制了他的表现。Eureka 的设计Eureka 的设计比较小巧,没有复杂的同步机制,也没有复杂的持久化机制,集群关系只是简单的将收到的客户端请求转发到集群内的其他 Eureka 实例。Eureka 本身也只有注册中心的功能,不像其他种类的注册中心那样,将注册中心和配置中心合在一起,例如 Consul 和 nacos。Eureka 的交互流程如下:首先,Service A 通过 Eureka Client 发送注册请求(Register)到同一可用区的 Eureka Server 1。之后通过发送心跳请求(Renew)到这个 Eureka Server 1. Eureka Server 1 收到这些请求的时候,会处理这些请求并将这些请求转发到其他的集群内的 Eureka Server 2 和 Eureka Server 3. Eureka Server 2 和 Eureka Server 3 不会再转发收到的 Eureka Server 1 转发过来的请求。然后,Service B 还有 Service C 通过 Eureka 获取到了 Service A 的位置,最后调用了 Service A。对于本地没有查询到的微服务,Eureka Server 还会从远程 Region 的 Eureka Server 去获取,例如这里对于 Service D,本地没有查到,Eureka Server 会返回远程 Region 的 Service D 的实例。由于本地有 Service A,所以肯定不会返回远程 Region 的 Service A 的实例。并且,本地是定时拉取的远程 Region 的 Service 列表,并不是每次查询的时候现查询的。一般的,微服务之间的互相调用,并不经过 Eureka,也不会涉及到 Eureka 客户端了,而是通过负载均衡器调用,这个我们后面就会提到。Eureka 相关概念这里我们忽略所有的 AWS 相关的术语以及配置还有相关逻辑处理。Eureka 中的术语:Eureka 实例:每个注册到 Eureka 上面的实例就是 Eureka 实例。Eureka 实例状态:包括 UP(可以处理请求),DOWN(健康检查失败,不能正常处理请求),STARTING(启动中,不能处理请求),OUT_OF_SERVICE(人为下线,暂时不处理请求),UNKNOWN(未知状态)。Eureka 服务器:作为注册中心运行,主要提供实例管理功能(处理实例注册(register)请求、处理实例注销(cancel)请求、处理实例心跳(renew)请求、内部处理实例过期(evict))、实例查询功能(各种查询实例信息的接口,例如通过 AppName 获取实例列表,通过实例 id 获取实例信息等等)Eureka 服务器集群:Eureka 服务器的集群,每个 Eureka 服务器都配置了区域以及可用区,Eureka 服务器收到的客户端请求会转发到同一区域内的其他 Eureka 服务器,可以配置优先发到同一可用区的 Eureka 服务器。非同一区域内 Eureka 服务器,通过定时拉取的方式进行同步。Eureka 客户端:请求 Eureka 服务器的客户端。封装发送实例注册(register)请求、实例注销(cancel)请求和实例心跳(renew)请求。VIP(或者是 Virtual Hostname): Eureka 中可以通过两种方式获取实例,一个是通过服务名称,另一种是通过 VIP。每个实例都有服务名称,以及 VIP。Eureka 服务器中的索引方式是以服务名称为 key 的索引,我们也可以通过遍历所有实例信息的方式通过 VIP 字符串匹配获取相关的实例。在 Spring Cloud 体系中,一个实例的 VIP、SVIP(其实就是 Secure VIP,即 https 的地址)以及服务名称都是 spring.application.name 指定的服务名称。Eureka 相关配置Eureka 实例配置:Eureka 实例,每个注册到 Eureka 上面的实例就是 Eureka 实例。Eureka 实例包含以下元素,以及相关配置:基本信息:包括 IP,端口等访问这个 Eureka 实例所需的信息:eureka:
instance:
#一般不用我们自己设置,EurekaInstanceConfigBean 的构造器会通过 InetUtils 获取 ip 地址
#ip-address:
#一般不用我们自己设置,EurekaInstanceConfigBean 的构造器会通过 InetUtils 获取 hostname
#hostname:
#注册到 eureka 上面供其他实例访问的地址使用 ip 进行注册,其他实例会通过 ip 进行访问
prefer-ip-address: true
#不用设置 non-secure-port,自动使用 server.port 作为 non-secure-port
#non-secure-port:
#如果 secure-port-enabled 是 true,则会自动使用 server.port 作为 secure-port;我们一般内部调用不用 ssl,所以不需要配置 secure-port
#secure-port:
#默认是启用 non-secure-port 的
non-secure-port-enabled: true
#默认是不启用 secure-port 的,我们一般内部调用不用 ssl
secure-port-enabled: false
#个性化的实例id,包括 ip:微服务名称:端口
instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}
# app名称,不填写在 Spring-cloud-netflix 体系下默认就是 spring.application.name
appname: ${spring.application.name}
#app组名称归类用的,目前也没什么用
app-group-name: common
#实例命名空间,目前也没什么用
namespace: public基本链接信息:包括首页路径地址以及健康检查路径地址:eureka:
instance:
# 健康检查地址,默认是 /actuator/health
health-check-url-path: /actuator/health
# 实例状态地址,默认是 /actuator/info
status-page-url-path: /actuator/info
# 首页地址,默认是 /
home-page-url-path: /实例注册行为,即实例注册后的行为,以及心跳间隔等配置:eureka:
instance:
# 服务过期时间配置,超过这个时间没有接收到心跳EurekaServer就会将这个实例剔除
# 注意,EurekaServer一定要设置eureka.server.eviction-interval-timer-in-ms否则这个配置无效
# 这个配置一般为服务刷新时间配置的三倍
# 默认90s
lease-expiration-duration-in-seconds: 15
#服务刷新时间配置,每隔这个时间会主动心跳一次
#默认30s
lease-renewal-interval-in-seconds: 5
registry:
#请参考 wait-time-in-ms-when-sync-empty 配置说明
default-open-for-traffic-count: 1
#初始期望发送心跳请求的实例个数,默认为1,在有新实例注册的时候,会 +1,有注销的时候会 -1,初始默认为 1 一般因为自己也注册到 eureka 上
expected-number-of-clients-sending-renews: 1
#实例注册后是否立刻开始服务,默认为 false,一般注册后还需要做一些操作,所以注册实例的状态是 STARTING。后面改变状态后会更新为 UP
instance-enabled-onit: false实例元数据:eureka:
instance:
#元数据map,我们可以自己使用,放一些个性化的元数据,目前只有 configPath 和 zone 比较有用。 configPath 是使用 spring-cloud-config 的时候会设置
metadata-map:
# spring cloud 体系中,可用区的配置放入元数据中,key 为 zone
zone: zone1Eureka 客户端配置:Eureka 服务器地址配置,可以直接指定链接,也可以通过 region 和 zone 进行配置,也可以通过 DNS 配置:eureka:
instance:
# 可用区列表,key 为 region,value 为 zone
availability-zones:
region1: zone1, zone2
region2: zone3
# 所在区域,通过这个读取 availability-zones 获取 zone,然后通过 zone 读取 service-url 获取对应的 eureka url
# 这里的逻辑对应的类是 ConfigClusterResolver 和 ZoneAffinityClusterResolver
region: region1
# key 为 zone,value 为 eureka 链接,以逗号分隔
service-url:
# 默认eureka集群,这里必须是defaultZone,不能用-替换大写,与其他的配置不一样,因为实在EurekaClientConfigBean里面写死的
defaultZone: http://127.0.0.1:8211/eureka/
zone1: http://127.0.0.1:8212/eureka/
zone2: http://127.0.0.1:8213/eureka/
zone3: http://127.0.0.1:8214/eureka/
# 如果上面 eureka server 地址相关配置更新了,多久之后会重新读取感知到
eureka-service-url-poll-interval-seconds: 300
# 是否使用 dns 获取,如果指定了则通过下面的 dns 配置获取,而不是上面的 service-url
use-dns-for-fetching-service-urls: false
# dns 配置
# eureka-server-d-n-s-name:
# dns 配置的 eureka server 的 port
# eureka-server-port:
# dns 配置的 eureka server 的 port 后面的 uri 前缀 context
# eureka-server-u-r-l-context:
# 如果设置为 true,则同一个 zone 下的 eureka 会跑到前面优先访问。默认为 true
prefer-same-zone-eureka: true