网站建设图片素材,义乌网站建设方式,动画专业学什么,wordpress换成中文一、背景
与分布式锁相对应的是「单机锁」#xff0c;我们在写多线程程序时#xff0c;避免同时操作一个共享变量产生数据问题#xff0c;通常会使用一把锁来「互斥」#xff0c;以保证共享变量的正确性#xff0c;其使用范围是在「同一个进程」中。单机环境下#xff0…一、背景
与分布式锁相对应的是「单机锁」我们在写多线程程序时避免同时操作一个共享变量产生数据问题通常会使用一把锁来「互斥」以保证共享变量的正确性其使用范围是在「同一个进程」中。单机环境下我们常用 synchronized 或者 Lock 锁解决多线程并发访问产生的数据安全问题但是如果是在集群环境本地锁就会失效。为解决分布式场景下的并发问题 就需要用到分布式锁。下面介绍下Redis分布式锁。
二、实现思路
1. 如何实现互斥
为达到互斥可以使用SETNX 命令这个命令表示Set If Not Exists即如果 key 不存在才会设置它的值否则什么也不做。 如 SETNX lock 1 //加锁DEL lock //释放锁但是这样有个问题当客户端 1 拿到锁后如果发生下面的场景就会造成「死锁」 ①程序处理业务逻辑异常未释放锁 ②拿到锁后进程挂了没机会释放锁 这时这个客户端就会一直占用这个锁而其它客户端就「永远」拿不到这把锁了。
2. 如何避免死锁
锁无法释放产生「死锁」。那么我们给这个锁加个「租期」让它在一定时间内如果一直没释放就过期问题不就解决了。Redis支持这种语法示例
SETNX lock 1 // 加锁
EXPIRE lock 10 // 10s后自动过期但这样真的没问题了吗 No
现在的操作加锁、设置过期是 2 条命令有没有可能只执行了第一条第二条却「来不及」执行的情况发生呢例如 ①SETNX 执行成功执行 EXPIRE 时由于网络问题执行失败 ②SETNX 执行成功Redis 异常宕机EXPIRE 没有机会执行 ③SETNX 执行成功客户端异常崩溃EXPIRE 也没有机会执行 总之这两条命令不能保证是原子操作一起成功就有潜在的风险导致过期时间设置失败依旧发生「死锁」问题。
如何解决 Redis 2.6.12 之后Redis 扩展了 SET 命令的参数用这一条命令就可以执行上述两步操作 // 一条命令保证原子性执行SET lock 1 EX 10 NX我们再来看分析下它还有什么问题 试想这样一种场景 ①客户端 1 加锁成功开始操作共享资源 ②客户端 1 操作共享资源的时间「超过」了锁的过期时间锁被「自动释放」 ③客户端 2 加锁成功开始操作共享资源 ④客户端 1 操作共享资源完成释放锁但释放的是客户端 2 的锁
这里存在两个严重的问题 ①锁过期客户端 1 操作共享资源耗时太久导致锁被自动释放之后被客户端 2 持有 ②释放别人的锁客户端 1 操作共享资源完成后却又释放了客户端 2 的锁
3. 锁被别人释放怎么办?
解决办法是客户端在加锁时设置一个只有自己知道的「唯一标识」进去。释放时先判断这把锁是否是自己所有是的话再进行释放。 这样涉及到两步操作 ①判断这把锁是否是自己所有 ②释放锁。 非原子性也会出现并发问题如何解决呢Lua脚本 我们可以写好Lua脚本后交给Redis执行脚本如下
if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end4. 锁过期怎么办
方案
可能是我们评估操作共享资源的时间不准确导致的。可以「冗余」适量过期时间降低锁提前过期的概率。加锁时先设置一个过期时间然后我们开启一个「守护线程」定时去检测这个锁的失效时间如果锁快要过期了操作共享资源还未完成那么就自动对锁进行「续期」重新设置过期时间。
备注个人根据项目的业务情况考虑「锁过期」对业务的影响看是否需要续期。
5. Redisson
Redisson 是一个 Java 语言实现的 Redis SDK 客户端在使用分布式锁时它就采用了「自动续期」的方案来避免锁过期这个守护线程我们一般也把它叫做「看门狗」线程。 除此之外这个 SDK 还封装了很多易用的功能
可重入锁乐观锁公平锁读写锁Redlock
6. Redlock
之前分析的场景都是锁在「单个」Redis 实例中可能产生的问题并没有涉及到 Redis 的部署架构细节。 而我们在使用 Redis 时一般会采用主从集群 哨兵的模式部署这样做的好处在于当主库异常宕机时哨兵可以实现「故障自动切换」把从库提升为主库继续提供服务以此保证可用性。 那当「主从发生切换」时这个分布锁会依旧安全吗
试想这样的场景
客户端 1 在主库上执行 SET 命令加锁成功此时主库异常宕机SET 命令还未同步到从库上主从复制是异步的从库被哨兵提升为新主库这个锁在新的主库上丢失了 为此Redis 的作者提出一种解决方案就是我们经常听到的 Redlock红锁。
Redlock 的方案基于 2 个前提
不再需要部署从库和哨兵实例只部署主库 但主库要部署多个官方推荐至少 5 个实例 也就是说想用使用 Redlock你至少要部署 5 个 Redis 实例而且都是主库它们之间没有任何关系都是一个个孤立的实例。 注意不是部署 Redis Cluster就是部署 5 个简单的 Redis 实例。
Redlock流程是这样的一共分为 5 步
客户端先获取「当前时间戳T1」客户端依次向这 5 个 Redis 实例发起加锁请求用前面讲到的 SET 命令且每个请求会设置超时时间毫秒级要远小于锁的有效时间如果某一个实例加锁失败包括网络超时、锁被其它人持有等各种异常情况就立即向下一个 Redis 实例申请加锁如果客户端从 3 个大多数以上 Redis 实例加锁成功则再次获取「当前时间戳T2」如果 T2 - T1 锁的过期时间此时认为客户端加锁成功否则认为加锁失败加锁成功去操作共享资源例如修改 MySQL 某一行或发起一个 API 请求加锁失败向「全部节点」发起释放锁请求前面讲到的 Lua 脚本释放锁
上述过程有4个重点
客户端在多个 Redis 实例上申请加锁必须保证大多数节点加锁成功大多数节点加锁的总耗时要小于锁设置的过期时间释放锁要向全部节点发起释放锁请求
实际生产中Redlock很少使用所以就简单介绍到这里。
三、springboot项目实现Redis分布式锁
1. 依赖 !--Springboot依赖 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactIdversion2.3.12.RELEASE/version/dependency!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test --!--Springboot 测试依赖 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdversion2.3.12.RELEASE/versionscopetest/scope/dependency!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis --!--Springboot Redis依赖 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactIdversion2.3.12.RELEASE/version!--排除掉默认的 lettuce lettuce 在使用中存在偶尔连接超时问题--exclusionsexclusionartifactIdlettuce-core/artifactIdgroupIdio.lettuce/groupId/exclusion/exclusions/dependencydependencygroupIdredis.clients/groupIdartifactIdjedis/artifactIdversion3.2.0/version/dependency!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-commons --!-- Redis 需要引入这个依赖否则报错 --dependencygroupIdorg.springframework.data/groupIdartifactIdspring-data-commons/artifactIdversion2.3.9.RELEASE/version/dependency2. Redis配置类
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.JedisPoolConfig;import java.util.HashSet;
import java.util.Set;Configuration
public class RedisConfig {Value(${spring.redis.cluster.nodes:IP:Port})private String clusterNodes;Value(${spring.redis.cluster.max-redirects:3})private int maxRedirects;Value(${spring.redis.password:***})private String password;Value(${spring.redis.timeout:3000})private int timeout;/*** 最大空闲数*/Value(${spring.redis.maxIdle:100})private int maxIdle;/*** 控制一个pool可分配多少个jedis实例*/Value(${spring.redis.maxTotal:100})private int maxTotal;/*** 最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制*/Value(${spring.redis.maxWaitMillis:5000})private int maxWaitMillis;/*** 最小空闲数*/Value(${spring.redis.minIdle:5})private int minIdle;/*** 连接的最小空闲时间 默认1800000毫秒(30分钟)*/Value(${spring.redis.minEvictableIdleTimeMillis:300000})private int minEvictableIdleTimeMillis;/*** 每次释放连接的最大数目,默认3*/Value(${spring.redis.numTestsPerEvictionRun:3})private int numTestsPerEvictionRun;/*** 逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1*/Value(${spring.redis.timeBetweenEvictionRunsMillis:30000})private int timeBetweenEvictionRunsMillis;/*** 是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个*/Value(${spring.redis.testOnBorrow:true})private boolean testOnBorrow;/*** 在空闲时检查有效性, 默认false*/Value(${spring.redis.testWhileIdle:true})private boolean testWhileIdle;Beanpublic JedisPoolConfig getJedisPoolConfig() {JedisPoolConfig jedisPoolConfig new JedisPoolConfig();// 最大空闲数jedisPoolConfig.setMaxIdle(maxIdle);// 最小空闲数jedisPoolConfig.setMinIdle(minIdle);// 连接池的最大数据库连接数jedisPoolConfig.setMaxTotal(maxTotal);// 最大建立连接等待时间jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);// 逐出连接的最小空闲时间 默认1800000毫秒(30分钟)jedisPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);// 每次逐出检查时 逐出的最大数目 如果为负数就是 : 1/abs(n), 默认3jedisPoolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);// 逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1jedisPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);// 是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个jedisPoolConfig.setTestOnBorrow(testOnBorrow);// 在空闲时检查有效性, 默认falsejedisPoolConfig.setTestWhileIdle(testWhileIdle);return jedisPoolConfig;}/*** Redis集群的配置** return RedisClusterConfiguration*/Beanpublic RedisClusterConfiguration redisClusterConfiguration() {RedisClusterConfiguration redisClusterConfiguration new RedisClusterConfiguration();// SetRedisNode clusterNodesString[] serverArray clusterNodes.split(,);SetRedisNode nodes new HashSet();for (String ipPort : serverArray) {String[] ipAndPort ipPort.split(:);nodes.add(new RedisNode(ipAndPort[0].trim(), Integer.parseInt(ipAndPort[1])));}redisClusterConfiguration.setClusterNodes(nodes);redisClusterConfiguration.setMaxRedirects(maxRedirects);redisClusterConfiguration.setPassword(RedisPassword.of(password));return redisClusterConfiguration;}/*** redis连接工厂类** return JedisConnectionFactory*/Beanpublic JedisConnectionFactory jedisConnectionFactory() {// 集群模式return new JedisConnectionFactory(redisClusterConfiguration(), getJedisPoolConfig());}Beanpublic RedisTemplateString, String poolRedisTemplate() {RedisTemplateString, String template new RedisTemplate();template.setConnectionFactory(jedisConnectionFactory());// 如果不配置Serializer那么存储的时候缺省使用String如果用User类型存储那么会提示错误User cant cast to StringStringRedisSerializer stringRedisSerializer new StringRedisSerializer();template.setKeySerializer(stringRedisSerializer);// hash的key也采用String的序列化方式template.setHashKeySerializer(stringRedisSerializer);Jackson2JsonRedisSerializerObject jackson2JsonRedisSerializer new Jackson2JsonRedisSerializer(Object.class);ObjectMapper mapper new ObjectMapper();mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(mapper);template.setKeySerializer(stringRedisSerializer);template.setHashKeySerializer(stringRedisSerializer);template.setValueSerializer(jackson2JsonRedisSerializer);template.setHashValueSerializer(new StringRedisSerializer());template.setDefaultSerializer(jackson2JsonRedisSerializer);template.setEnableDefaultSerializer(true);template.afterPropertiesSet();return template;}
}
3. Redis 分布式锁
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;Service
public class RedisService {AutowiredQualifier(poolRedisTemplate)private RedisTemplateString, String stringRedisTemplate;public String getStr(String key) {return stringRedisTemplate.opsForValue().get(key);}/*** key 不存在 则进行设置* p* 原子性操作** param k 键* param v 值* param timeout 超时时间* param unit 超时时间的单位* return key存在返回false设置失败*/public Boolean setIfAbsent(String k, String v, long timeout, TimeUnit unit) {return stringRedisTemplate.opsForValue().setIfAbsent(k, v, timeout, unit);}/*** 加分布式锁** param key 锁* param timeout 超时时间单位秒* return 空串 表示加锁失败 uuid 表示加锁成功后续uuid要作为解锁的标识*/public String tryLock(String key, long timeout) {//释放锁时要根据uuid判断是否是自己的锁防止释放其他人的锁
// String uuid System.currentTimeMillis() ;String uuid UUID.randomUUID().toString();Boolean tryLock setIfAbsent(key, uuid, timeout, TimeUnit.SECONDS);if (tryLock) {return uuid;}//加锁失败返回 空串return ;}/*** “判断值与旧值是否相等相等则删除键” 的 Lua 脚本保证原子性操作*/private static final String SCRIPT if redis.call(get, KEYS[1]) ARGV[1] then return redis.call(del, KEYS[1]) else return 0 end;/*** 释放分布式锁** param key 锁* param oldValue 加锁时放入的标识* return 返回释放锁成功失败*/public Boolean unLock(String key, String oldValue) {ListString keys new ArrayList();keys.add(key);ListString args new ArrayList();//这里需要加下引号原因stringRedisTemplate获取的值带引号即redis.call(get, KEYS[1]) 的结果带引号而ARGV[1]不带引号比较时会出现不等的问题args.add(\ oldValue \);Long result stringRedisTemplate.execute((RedisCallbackLong) connection - {Object nativeConnection connection.getNativeConnection();// 集群模式和单机模式虽然执行脚本的方法一样但是没有共同的接口所以只能分开执行// 集群模式if (nativeConnection instanceof JedisCluster) {return (Long) ((JedisCluster) nativeConnection).eval(SCRIPT, keys, args);}// 单机模式else if (nativeConnection instanceof Jedis) {return (Long) ((Jedis) nativeConnection).eval(SCRIPT, keys, args);}return 0L;});return result 1L;}}
4. 测试类
import com.example.demo.redis.RedisService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.StringUtils;SpringBootTest
public class RedisTest {Autowiredprivate RedisService redisService;Testpublic void test() {String key test-lock-1235637;String uid redisService.tryLock(key, 10000);if(StringUtils.isEmpty(uid)){return;}System.out.println(uid);String uid2 redisService.tryLock(test-lock-123, 3000);System.out.println(重新获得锁是否成功 !StringUtils.isEmpty(uid2));try {//执行业务代码System.out.println(111111111);} finally {Boolean unlockSuccess redisService.unLock(key, uid);System.out.println(解锁结果 unlockSuccess);System.out.println(redisService.getStr(key));}}
}
四、springboot项目中Redission的简单使用
1. 依赖
!--redission相关依赖--dependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.16.0/version/dependency2. 配置类
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RedissionConfig {Beanpublic RedissonClient getRedisson() {
// //单机
// Config config new Config();
// //单机模式 依次设置redis地址和密码
// config.useSingleServer().
// setAddress(redis:// host : port);
// RedissonClient redisson Redisson.create(config);
//
// //主从
// Config config new Config();
// config.useMasterSlaveServers()
// .setMasterAddress(redis://127.0.0.1:6379)
// .addSlaveAddress(redis://127.0.0.1:6389, 127.0.0.1:6332, 127.0.0.1:6419)
// .addSlaveAddress(redis://127.0.0.1:6399);
// RedissonClient redisson Redisson.create(config);
//
//
// //哨兵
// Config config new Config();
// config.useSentinelServers()
// .setMasterName(mymaster)
// .addSentinelAddress(redis://127.0.0.1:26389, 127.0.0.1:26379)
// .addSentinelAddress(redis://127.0.0.1:26319);
// RedissonClient redisson Redisson.create(config);//集群 ,,,,,Config config new Config();config.useClusterServers().setScanInterval(2000) // cluster state scan interval in milliseconds.addNodeAddress(redis://ip:port, redis://ip:port).setPassword(***);RedissonClient redisson Redisson.create(config);return redisson;}
}
3. 测试类
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;SpringBootTest
public class RedissionTest {Resourceprivate RedissonClient redisson;Testpublic void testRedission() {System.out.println(开始执行);String lockKey 123456;RLock lock redisson.getLock(lockKey);System.out.println(获取锁);try {//尝试获取锁参数分别是获取锁的最大等待时间(期间会重试)锁自动释放时间时间单位boolean b lock.tryLock(1, 10, TimeUnit.SECONDS);System.out.println(是否获取锁 b);//执行业务逻辑System.out.println(执行业务逻辑....);Thread.sleep(3000);} catch (Exception e) {System.out.println(系统异常稍后重试....);} finally {//删除锁lock.unlock();System.out.println(解锁成功);}}
}
五、参考文献
https://mp.weixin.qq.com/s/2et43aJT6qjBsJ8Z9pcZcQ