SpringBoot读写分离配置与事务
引入依赖<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>跟mybatis-plus属于同一个开源组织 苞米豆配置文件spring:
datasource:
dynamic:
primary: master
# 严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
strict: true
datasource:
master:
url: jdbc:mysql://192.168.101.128:3307/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: root
password: 123456
slave:
url: jdbc:mysql://192.168.101.128:3308/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: root
password: 123456
# 按需开启日志
logging:
level:
com.baomidou.dynamic: debug方法或类上加上@DS注解即可切换数据源该框架获取数据库连接的核心逻辑是以下这段在被DS注解标记的方法上, 会被此拦截器拦截, 获取到注解上定义的值, 并存入栈结构中com.baomidou.dynamic.datasource.aop.DynamicDataSourceAnnotationInterceptorpublic Object invoke(MethodInvocation invocation) throws Throwable {
String dsKey = determineDatasourceKey(invocation);
//获取注解值入栈
DynamicDataSourceContextHolder.push(dsKey);
try {
return invocation.proceed();
} finally {
//方法结束后出栈
DynamicDataSourceContextHolder.poll();
}
}然后com.baomidou.dynamic.datasource.ds.AbstractRoutingDataSource#getConnection()public Connection getConnection() throws SQLException {
String xid = TransactionContext.getXID();
if (StringUtils.isEmpty(xid)) {
return determineDataSource().getConnection();
} else {
//获取栈顶的一个值
String ds = DynamicDataSourceContextHolder.peek();
ds = StringUtils.isEmpty(ds) ? "default" : ds;
//根据值获取对应数据库的连接
ConnectionProxy connection = ConnectionFactory.getConnection(ds);
return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
}
}可以发现框架是通过维护一个栈结构进行对应数据源的切换, 类似方法的栈, 因为方法间可能嵌套调用, 所以使用此结构便于管理但Spring的@Transactional会影响@DS例如@Autowired
@Lazy
CurrentService currentService;
@DS("master")
@Transactional
public void updateUser() {
baseMapper.updateById(user);
System.out.println(currentService.get());
}
@DS("slave")
public User get() {
return baseMapper.selectById(1);
}在这里, master和slave是使用binlog搭建的读写分离架构但实际get方法却能读取到updateUser所做的修改, 通过Debug也能看到真正的数据库连接属性, get方法还是使用的master库因为在Spring管理下, 获取到数据库连接后, 会和当前线程进行绑定, 如果后面的方法被判断为不需要新建连接, 则复用之前与线程绑定的连接, 那么即使有DS注解, 也切换不了库如何判断需不需要新建连接? 看被调用方法是否定义了事务传播属性.在org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction方法中if (isExistingTransaction(transaction)) {
//找到现有事务 -> 检查传播行为以了解行为方式
//当前已经存在一个事务
return handleExistingTransaction(def, transaction, debugEnabled);
}继续调用org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction//判断当前方法的隔离属性是否为PROPAGATION_REQUIRES_NEW
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}继续调用org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransactionprivate TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}继续调用org.springframework.jdbc.datasource.DataSourceTransactionManager#doBeginif (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//获取当前的数据源, 此处才能让@DS注解生效
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
//将数据库连接绑定到事务
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}所以如果想让get方法读取从库, 则需要定义传播属性以便让Spring建立新连接第二种方法就是使用该框架的@DSTransactional该方法也会进行事务管理, 但功能比较简陋这个注解还提供了一个本地事务的功能: 解决多数据源的事务问题.但这个功能也有问题, 不建议使用看这个方法 com.baomidou.dynamic.datasource.tx.ConnectionFactory#notifypublic static void notify(Boolean state) {
try {
Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
//获取当前线程所有的数据库连接, 通知其进行回滚/提交, 可能存在某一个事务提交成功, 某一事务提交失败.
//并不能保证最终一致性
for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {
connectionProxy.notify(state);
}
} finally {
CONNECTION_HOLDER.remove();
}
}对于这种多库事务, 建议使用Seata或消息队列贴心的是, 框架还与Seata进行了整合引入依赖<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>spring:
datasource:
dynamic:
#seata1.0之后支持自动代理 这里直接配置true
seata: true
【面经分享】-一年工作经验阿里三面
最近原来实习时候的Boss联系我,说他跳槽到了阿里,问我有没有兴趣面一个Java后台开发岗位。考虑到我只工作了一年,现在去阿里肯定要降薪,因此也没有太强烈的意愿。但出于提升自我的角度考虑,参加了面试。一面(电话面试一小时)首先做一个简单的自我介绍,主要包括学校经历和工作经历。我工作经历只有一年,大部分时间都是在做产品设计和UI/UX Design,因此隔着电话都能感受到面试官的shock。● Java基础。自动拆装箱如何实现,String,StringBuffer,StringBuilder的异同以及各自的实现。● JVM基础。JVM的内存模型,常见的垃圾回收算法。● 事务ACID,编程时如何保证事务,分布式情况下如何保证事务。● 由于分布式相关场景我没有接触过,因此面试官一直诱导我去设计实现一个分布式事务。● 数据库乐观锁和悲观锁。如何实现一个乐观锁。● 消息队列使用场景,Kafka的架构以及原理。● 什么是restful api,和rpc调用有什么区别。● 单例的几种写法。volatile关键字有什么作用。以上就是电话面试的大体问题,面试完之后,又发给我三道算法题目,要求我一小时内完成,下面是三道算法题:● 翻转一个long类型数字。例如输入123456L,输出654321L。- Leetcode翻转integer的变种。考察能否正确处理溢出的情况。● 输入一个double,要求返回与它最接近的.49或.99的数字。例如12.77返回12.99,11.02返回10.99,12.61返回12.49。● 有三个线程ABC分别向一个数组中写入a,l,i,要求最终的写入结果形如alialiali...写入次数由A线程决定。这三道题目做的还比较顺利,第二天面试官又联系我阐述一下第一题和第三题的思路,然后通知我可以参加下一轮了。二面(电话面试一小时)二面主要考察了一些开放式的问题。● 首先还是自我介绍。主要是工作后的经历。介绍一下工作一年所在team的产品,我承担了什么职责。● 开放式问题。如何设计一个rpc框架。● 开放式问题。如何设计一个服务注册中心。● 集合类源码。HashMap是如何实现的,扩容的过程,为什么要扩容为2倍。HashMap中的链表替换为数组可以吗?时间复杂度相同吗?● 集合类源码。线程安全的HashMap是什么?(HashTable和ConcurrentHashMap)ConcurrentHashMap是如何实现的?(Java7分段锁和Java8的CAS+Lock)和HashTable相比有什么优势?● 红黑树的结构,时间复杂度是多少,如何计算的● 什么是CAS操作,如何实现一个自定义锁● 数据库设计。有一张很大的order表,如何设计能够提升查询效率(同时满足根据买家id和卖家id查询)?二面也同样是一小时左右,面试过程还算顺利。只是当时我在厦门鼓浪屿的一家小餐馆吃晚饭,周围的嘈杂和闷热使我很烦躁,感觉面试官态度有些傲慢……ps.一面二面结束后面试官都各种暗示我要疯狂加班能不能接受blabla……三面(电话面试一个半小时)二面结束后的第三天,就收到了现场三面的通知。然而我还在厦门旅行,因此改为了电话面试。三面是一个大Boss,因此面试的问题都更考察一些分析问题的能力。● 介绍一下你工作一年学习到什么?所在项目的架构是什么样的?UI/UX设计有哪些规范(由于我说我学到了一些UI/UX设计方法,因此面试官就问了)?● 数据隔离级别,脏读幻读。● 线程池原理。● Synchronized的实现,锁的升级过程。● K8s的作用,K8s的底层架构。● 对我业余时间做的一些项目做了介绍。● 你觉得加入阿里你能给阿里带来什么?● 进入阿里你需要忍受很多困难,需要迎难而上,如果绩效考评拿到差评,你会怎么办?三面总的来说也还算顺利,面试官也算和蔼。总结整个流程从一面到三面结束大约持续了10天左右。总的来说,问题都是预期范围内的,虽然面试过程中问到了一些分布式相关问题,我都没有任何经验,这时候不要放弃,主动说出你的思路,然后在面试官的诱导下,相信你能说出属于的答案。最后,是我总结的一些面试Java后台工程师必须要掌握的知识点。集合类源码● ArrayList:内部数据结构,数组扩容机制 ● LinkedList:内部数据结构,为什么使用双向链表 ● HashMap:内部数据结构,put方法的完整流程,扩容机制 ● LikedHashMap:内部数据结构,如何实现一个Cache ● TreeMap:内部数据结构,时间复杂度 ● CurrentHashMap:内部数据结构,Java7分段锁,Java8 CAS+SynchronizedJava基础● 自动拆装箱原理● String,StringBuffer和StringBuilder● Throwable● reader和stream● NIOJVM基础● JVM内存模型● 常见垃圾回收算法并发编程基础● Synchronized关键字原理● wait,notify,sleep● 安全的终止线程以及线程的状态转换● 自定义Lock● 线程池原理数据库基础● 数据库三范式,事务ACID,隔离级别,视图,索引● JPA实体状态● EntityManger网络基础● TCP/IP常见设计模式● 装饰者,模板方法,策略,工厂,状态
【面试题看源码】-HashMap 初始容量 计算方法
HashMap 初始容量 计算方法如果在new HashMap的时候,没有指定初始initialCapacity,则初始initialCapacity为16,负载因子为0.75,下次扩容阈值为 16*0.75=12这个初始容量 不一定等于初始化完成后底层数组实际的容量,因为存在阈值的计算,方法如下;也不是初始容量是多少开始就能存多少个元素,因为存在负载因子,在底层数组还没满的时候就会进行扩容。阈值计算方法为:static final int tableSizeFor(int cap) {
int n = cap - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}该方法计算大于等于输入参数并最接近参数的2的整数次幂的数,如10,返回16cap -1 ,-1是为了在计算的时候能得到大于等于输入参数的值在HashMap 进行put方法的时候,如果判断已有的元素大于 阈值就会触发扩容计算,扩容步骤如代码所示:final Node<K,V>[] resize() {
//原table数组赋值
Node<K,V>[] oldTab = table;
//如果原数组为null,那么原数组长度为0
int oldCap = (oldTab == null) ? 0 : oldTab.length;
//赋值阈值
int oldThr = threshold;
//newCap 新数组长度
//newThr 下次扩容的阈值
int newCap, newThr = 0;
// 1. 如果原数组长度大于0
if (oldCap > 0) {
//如果大于最大长度1 << 30 = 1073741824,那么阈值赋值为Integer.MAX_VALUE后直接返回
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
// 2. 如果原数组长度的2倍小于最大长度,并且原数组长度大于默认长度16,那么新阈值为原阈值的2倍
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
// 3. 如果原数组长度等于0,但原阈值大于0,那么新的数组长度赋值为原阈值大小
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
// 4. 如果原数组长度为0,阈值为0,那么新数组长度,新阈值都初始化为默认值
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
// 5.如果新的阈值等于0
if (newThr == 0) {
//计算临时阈值
float ft = (float)newCap * loadFactor;
//新数组长度小于最大长度,临时阈值也小于最大长度,新阈值为临时阈值,否则是Integer.MAX_VALUE
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
//计算出来的新阈值赋值给对象的阈值
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
//用新计算的数组长度新建一个Node数组,并赋值给对象的table
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
//后面是copy数组和链表数据逻辑
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}总结:如下3种情况,例子需要自己调式,主要关注数组长度(OldCap,newCap)新老变化,扩容阈值(oldThr,newThr)新老变化//①
Map<String, String> map = new HashMap<>();
map.put("1", "1");
//②
Map<String, String> map1 = new HashMap<>(2);
map1.put("2", "2");
//③
Map<String, String> map2 = new HashMap<>(2, 0.5f);
map2.put("3", "3");① 没有设置initialCapacity,也没有设置负载因子,第一次put的时候会触发扩容第一次的时候,数组长度为默认值16,阈值为160.75=12,走的代码4逻辑,等到数组长度超过阈值12后,触发第二次扩容,此时table数组,和threshold都不为0,即oldTab、oldCap、oldThr都不为0,先走代码1,如果oldCap长度的2倍没有超过最大容量,并且oldCap 长度大于等于 默认容量16,那么下次扩容的阈值 变为oldThr大小的两倍即 12 2 = 24,newThr = 24,newCap=32② 设置了initialCapacity,没有设置负载因子,此时hashMap使用默认负载因子0.75,本实例设置的初始容量为2,通过计算阈值为2,第一次put的时候由于还没初始化table数组,因此触发第一次扩容。此时oldCap为0,oldThr为2,走代码3,确定这次扩容的新数组大小为2,此时还没有确定newThr 下次扩容的大小,于是进入代码5 确定newThr为 2 0.75 = 1.5 取整 1 ,及下次扩容阈值为1。当数组已有元素大于阈值及1时,触发第二次扩容,此时oldCap为1,oldThr为1,走代码1newCap = oldCap << 1 结果为 4 小于最大容量, 但oldCap 小于hashMap默认大小16,结果为false,跳出判断,此时由于newThr等于0,进入代码5,确定newThr为 4 0.75 = 3,下次扩容阈值为3③ 设置了initialCapacity=2,负载因子为0.5,通过tableSizeFor计算阈值为2,第一次put的时候,进行扩容,此时oldCap为2,oldThr为2,进入代码1,同实例②,newCap = oldCap << 1 结果为 4 小于最大容量, 但oldCap 小于hashMap默认大小16,结果为false,跳出判断,进入代码5,确定newThr为 4 * 0.5 = 2,下次扩容阈值为2面试回答要素回答什么情况下会第一扩容,举例说明,新数组大小,阈值大小以后什么情况下会再次扩容,这次是怎么计算新数组大小,及阈值大小的作者热门文章推荐:Java面试题专栏:《从Java面试题看源码》-LongAdder、LongAccumulator是个什么东西?《从Java面试题来看源码》-LinkedBlockingQueue 源码分析《从Java面试题看源码》-有哪些并发队列?及ConcurrentLinkedQueue 源码分析《从Java面试题看源码》-看完Kafka性能优化-让你吊打面试官《从Java面试题看源码》-默认线程池阻塞队列为什么用LinkedBlockingQueue
用seata有没有最佳实践,比方说需不需在前面放一个消息队列?
用seata有没有最佳实践,比方说需不需在前面放一个消息队列? 我们想在一个小金额,高并发单量的项目里使用
redis数据类型及常用数据操作
1 官方文档(大全)数据类型相关操作指令参考文档: http://redis.cn/commands.html命令速查:http://doc.redisfans.com2 String—字符串String数据结构是简单的 key-value 类型,value不仅可以是String,也可以是数字。String类型是二进制安全的,意思是redis的String可以包含任何数据,比如jpg图片或者序列化的对象。从内部实现来看其实String可以看作byte数组,最大上限是1G字节。下面是String类型的定义:struct sdshdr{long len;long free;char buf[];};len是buf数组的长度free是数组中剩余可用字节数buf是个char数组用于存贮实际的字符串内容常用操作:set key value 设置key对应的值为string类型的value
get key 获取key对应的value值,如果key不存在返回nil
KEYS pattern 查找所有符合给定模式pattern(正则表达式)的 key
EXISTS key 返回key是否存在,如果存在返回1,不存在返回0
DEL key 删除一个key值
TYPE key 返回key所存储的value的数据结构类型,它可以返回string, list, set, zset 和 hash等不同的类型。
SETNX key value 将key设置值为value,如果key不存在,这种情况下等同SET命令。 当key存在时,什么也不做。nx是not exist的意思。
SETEX key seconds value 设置key对应字符串value,并且设置key在给定的seconds时间之后超时过期。
SETRANGE key offset value 覆盖key对应的string的一部分,从指定的offset处开始,覆盖value的长度。如果offset比当前key对应string还要长,那这个string后面就补0以达到offset。
INCR key 对存储在指定key的数值执行原子的加1操作。
DECR key 对key对应的数字做减1操作。
MSET key value 对存对应给定的keys到他们相应的values上。
MGET key[key ...] 返回所有指定的key的value。
3 Hash—字典redis哈希是键值对的集合,它是是字符串字段和字符串值之间的映射,所以它们用来表示对象。常用操作:HSET key field value 设置 key 指定的哈希集中指定字段的值。
HGET key field 返回 key 指定的哈希集中该字段所关联的值。
HKEYS key 返回 key 指定的哈希集中所有字段的名字。
HDEL key field [field ...] 从 key 指定的哈希集中移除指定的域。
HLEN key 返回 key 指定的哈希集包含的字段的数量。
HMSET key field value [field value ...] 设置 key 指定的哈希集中指定字段的值。
HMGET key field [field ...] 返回 key 指定的哈希集中指定字段的值。
HGETALL key 返回 key 指定的哈希集中所有的字段和值。
4 List—列表List说白了就是链表(双端链表)。使用List结构,我们可以轻松地实现最新消息排行等功能(比如新浪微博的TimeLine )。List 的另一个应用就是消息队列,可以利用List的 PUSH操作,将任务存在List中,然后工作线程再用POP操作将任务取出进行执行。常用操作:LPUSH key value [value ...] 将所有指定的值插入到存于 key 的列表的头部。
LPOP key 移除并且返回 key 对应的 list 的第一个元素。
LRANGE key start stop 返回存储在 key 的列表里指定范围内的元素。
LTRIM key start stop 修剪(trim)一个已存在的 list,这样 list 就会只包含指定范围的指定元素。
5 Set—无序集合Set是字符串的无序集合,集合指一堆不重复值的组合。利用 Redis提供的Set数据结构,可以存储一些集合性的数据。比如在微博应用中,可以将一个用户所有的关注人存在一个集合中,将其所有粉丝存在一个集合。因为 Redis 非常人性化的为集合提供了求交集、并集、差集等操作,那么就可以非常方便的实现如共同关注、共同喜好、二度好友等功能,对上面的所有集合操作,你还可以使用不同的命令选择将结果返回给客户端还是存集到一个新的集合中。常用操作:SADD key member [member ...] 添加一个或多个指定的member元素到集合的 key中。
SMEMBERS key 返回key集合所有的元素。
SREM key member [member ...] 在key集合中移除指定的元素。
SINTER key [key ...] 返回指定所有的集合的成员的交集。
SUNIONSTORE destination key [key ...] 返回给定的多个集合的并集中的所有成员,而是将结果存储在destination集合中。
例如:key1 = {a,b,c,d}key2 = {c}key3 = {a,c,e}SINTER key1 key2 key3 = {c}6 Sorted Set—有序集合和Sets相比,Sorted Sets是将 Set 中的元素增加了一个权重参数 score,使得集合中的元素能够按 score 进行有序排列,比如一个存储全班同学成绩的 Sorted Sets,其集合 value 可以是同学的学号,而 score 就可以是其考试得分,这样在数据插入集合的时候,就已经进行了天然的排序。另外还可以用 Sorted Sets 来做带权重的队列,比如普通消息的 score 为1,重要消息的score 为2,然后工作线程可以选择按 score 的倒序来获取工作任务,让重要的任务优先执行。常用操作:ZADD key score member [score member ...] 将所有指定成员添加到键为key有序集合(sorted set)里面。 添加时可以指定多个分数/成员(score/member)对。
ZRANGE key start stop [WITHSCORES] 返回有序集key中,指定区间内的成员。其中成员的位置按score值递减(从小到大)来排列。
ZREVRANGE key start stop [WITHSCORES] 返回有序集key中,指定区间内的成员。其中成员的位置按score值递减(从大到小)来排列。
7 Pub/Sub—订阅/发布Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe)。发件人(在 Redis 中的术语称为发布者)发送邮件,而接收器(订户)接收它们。信息传输的链路称为通道。Redis 一个客户端可以订阅任意数量的通道。在 Redis 中,你可以设定对某一个key 值进行消息发布及消息订阅,当一个 key 值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天、群聊等功能。常用操作:SUBSCRIBE channel [channel ...] 订阅给指定频道的信息。一旦客户端进入订阅状态,客户端就只可接受订阅相关的命令。
PUBLISH channel message 将信息 message 发送到指定的频道 channel。
启动一个客户端发布一个主题:再启动一个客户端,只要有发布以“mike”主题的消息,上面客户端都能收到:8 Transactions—事务redis事务允许一组命令在单一步骤中执行。事务有两个属性:在一个事务中的所有命令作为单个独立的操作顺序执行。在Redis事务中的执行过程中而另一客户机发出的请求,这是不可以的。redis事务是原子的。原子意味着要么所有的命令都执行,要么都不执行。redis 事务由指令 MULTI 发起的,之后传递需要在事务中和整个事务中,最后由 EXEC 命令执行所有命令的列表。
MULTI 标记一个事务块的开始。 随后的指令将在执行EXEC时作为一个原子执行。
EXEC 执行事务中所有在排队等待的指令并将链接状态恢复到正常。
汇总图
请教一个问题,kafka流表与pg维表关联,目前采用lookupjoin的方式关联速度非常慢,有什么
请教一个问题,kafka流表与pg维表关联,目前采用lookupjoin的方式关联速度非常慢,有什么解决方案吗?cdc可以解决这个问题不?
有没有知道的flink 分流到kafka 报事物性错误
如题
我问下我这边测试kafka 数据关联mysql , Kafka left join mysql为什么
我问下我这边测试kafka 数据关联mysql , Kafka left join mysql为什么 Kafka数据变了,最后落地mysql 里面没数据呢?
分层架构
最近连续做了两个新项目,借着新项目的机会,重新审视一下之前一些实践方法,进而寻求一下背后的理论支撑新项目开始,首先一个就是会新建一个project,那么这个project怎么分层,怎么创建module,怎么分包呢?经典分层以传统方式,经典的MVC分层,就controller,service,model找来一张servlet时代的经典处理流程,虽然技术手段日益更新,但处理流程是一样的抽象一下,经典的分层就是:现在大多数系统都是这种分层结构。JavaBean里面只有简单的get和set方法,被很多人鄙视的“贫血模型”;业务逻辑变得复杂时,service会变得很臃肿,出现很多的“上帝类”回想一下,我们的所有的代码都写在ServiceImpl里面,前几行代码是做validation的事,接下来几行是做convert的事,然后是几行业务处理逻辑的代码,穿插着,我们需要通过RPC或者DAO获取更多的数据,拿到数据后,又是几行convert的代码,在接上一段业务逻辑代码,然后还要落库,发消息…等等这也是事务脚本开发方式,横行于世。数据与行为被分离。简单业务这样开发是合适的,但业务一复杂起来,需求频繁变更后,就没人能理解任何一段代码,业务逻辑和技术细节糅杂在一起,业务领域模型没法清晰表达出来,开发人员只知道怎么处理了数据,但背后的业务语义完全不知道其实呢?还有很多的包,如config,common,core等等, 如果使用了一些中间件,如rabbitmq,还会相应创建上对应的包,简单点可能就被放在了service包下从有了maven之,module概念更加显现化service
common
core
test我们的那么多包有了更加明确的地方放置,不再是直接放置在工程目录下由于上面的这些问题 ,我们似乎可以指出经典的三层架构的弱点:架构被过分简化,如果解决方案中包含发送邮件通知,代码应该放置在哪些层?它虽然提出了业务逻辑隔离,但没有明确的架构元素指导我们如何隔离DDD虽然技术日新月异,但大多仅仅是技术,带了实现的便利性,但对于业务层次,更多的还是经验。随着业务的复杂性提升,系统的腐化速度和复杂性呈指数上升。DDD带了很多的认知的改变,最大的好处是将业务语义显现化,不再是分离数据与行为,而是通过领域对象将领域概念清晰的显性化表达出来当然这世间并没有银弹,但至少能给我们带来一种改进经典分层的理论支撑DDD中带来了Repository概念,以及基础设施层,再结合【DIP原则】,可以把三层结构变成再细看一下Controller,这一层,做些什么呢?轻业务逻辑,参数校验,异常兜底。通常这种接口可以轻易更换接口类型,所以业务逻辑必须要轻,甚至不做具体逻辑但在现实中,有些更极端,在servlet时代,还做下HttpRequest转换成DTO,传入service,现在有了springmvc,struts2框架的转换,不需要转换了,那么controller成了一个透传层,直接调用service这儿有两个问题,既然controller是透传,那有必要存在吗?controller调用的service,这个service指的服务是什么呢?第一controller显然有必要存在,不在于业务,而在于技术实现。不管是http,rpc都得有个请求入口。像thrift可能会比其他的一些rpc框架例如dubbo会多出一层,作用和controller层类似Tservice与controller的作用是一样的第二service服务指的是什么?领域服务吗?如果一个复杂的业务,那么会跨越多个领域,自然需要多个领域服务。如果放在controller里面,也就是在controller里面去编排领域服务,如果切换到thrift,那Tservice就得重复因此,此时需要另一个service,在DDD中就是应用服务应用服务算是领域服务的facade,应用层是高层抽象的概念,但表达的是业务的含义,领域层是底层实现的概念,表达的是业务的细节controller
application
domain
infrastructure
startcontroller模块:restfull风格的少不了这层application模块:类似以前的service包domain模块:如果是事务脚本方式,domain模块就没有了infrastructure模块:基础模块,类似之前的dao包,但这里面都是实现类,而像repository接口则在domain模块,还需要对应的convertor模块里面各个包,可能需要按实践情况而定了,后面再从项目中抽取个archetype,使用maven直接生成
NoSQL场景|学习笔记
开发者学堂课程【Java面试疑难点串讲2:NoSQL场景】学习笔记,与课程紧密联系,让用户快速学习知识。课程地址:/learning/course/25NoSQL场景 首先需要确认一个问题,NoSQL能做什么? 在现在的开发领域中NoSQL可以实现文档存储(BSON、JSON)、缓存存储、图像缓存(图像搜索),但是对于NoSQL的具体应用场景完全要根据实际的业务来讲:- 1、在传统的开发之中由于经常要使用到多表查询,性能根差,所以可以将一些经常显示的数据整理到文档型的NoSQL数据库(MongoDB),但是现在这个文档型的NoSQL使用越来越少,可以忽略了: 2、缓存型:例如在进行分布式开发的时候session存储、做一些临时的数据,例如:购物车、短信验证码等,现在使用最多的缓存数据库就是Redis(可以保存在磁盘,断电后数据可以被保留下来):-使用Redis实现消息队列(有病),可以使用更加高级的RabbitMQ、Kafka实现更方便。 优点:可以实现每秒近乎10W次的读写处理。