Redis 分布式锁
Redis 分布式锁
集群的并发问题
有关锁失效原因分析
- 多个tomcat
- 对应多个jvm
- 两个线程不在同一个tomcat
- 在对应的多个jvm中拿不到同一个锁对象 --- 锁不住
基本原理
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁
核心思想:保证同一把锁
可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
高可用:程序不易崩溃,时时刻刻都保证较高的可用性
高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能
安全性:安全也是程序中必不可少的一环
实现方式对比
常见的分布式锁有三种
Mysql:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见
Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁
Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述

核心思路
利用互斥锁解决缓存击穿问题
核心思路:相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询
如果获取到了锁的线程,再去进行查询,查询后将数据写入redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿

核心思路就是利用redis的setnx方法来表示获取锁,该方法含义是redis中如果没有这个key,则插入成功,返回1,在stringRedisTemplate中返回true, 如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false,我们可以通过true,或者是false,来表示是否有线程成功插入key,成功插入的key的线程我们认为他就是获得到锁的线程
#[NX表示not exit]
set lock lock1 NX
#EX后跟过期时间
set lock lock1 EX 300 NX
问题:加锁后不释放,造成死锁
//1、占分布式锁。去redis占坑
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
if (lock){
//设置过期时间,避免不释放造成死锁
redisTemplate.expire("lock",30, TimeUnit.SECONDS);
//加锁成功 执行业务
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
redisTemplate.delete("lock");
return dataFromDb;
}
问题:占锁和设置时间必须原子性:占锁后时间没设置上,出现死锁
//1、占分布式锁。去redis占坑,并设置过期时间
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",300,TimeUnit.SECONDS);
if (lock){
//加锁成功 执行业务
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
redisTemplate.delete("lock");
return dataFromDb;
}
问题:自动过期时间为10s,业务时间为20s,到期删除,其他业务抢到锁,锁失效
在占锁时加上或者设为UUID值。删除时进行判断
//1、占分布式锁。去redis占坑
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
if (lock){
//加锁成功 执行业务
Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
if (uuid.equals(redisTemplate.opsForValue().get("lock"))){
//存在网络时延问题,比如在redis获取到lock返回时,lock过期被自动删除,
// 此时其他线程抢占了锁,创建了lock,但是会被这个线程删掉的情况
redisTemplate.delete("lock");
}
return dataFromDb;
}
问题:删除时,获取lock、对比、删除必须原子性,否则可能会删除其他业务lock
官网对此也与解释,使用lua脚本解决这个问题
public Map<String, List<Catalog2Vo>> getCatalogJsonFromDbWithRedisLock() {
String token = UUID.randomUUID().toString();
//1、占分布式锁。去redis占坑,
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", token, 100, TimeUnit.SECONDS);
if (lock) {
System.out.println("获取分布式锁成功");
//为了让锁自动续期,不至于执行途中因为时间过短而失效,可以设置时间长一些,然后finally保证业务操作完成之后,就执行删除锁的操作
//不管怎样,哪怕崩溃也直接解锁,不关心业务异常
Map<String, List<Catalog2Vo>> dataFromDB;
try {
//加锁成功
dataFromDB = getDataFromDB();
} finally {
//lua脚本保证原子性
String lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
RedisScript<Long> luaScript = RedisScript.of(lua, Long.class);
//删除锁
Long lock1 = stringRedisTemplate.execute(luaScript, Arrays.asList("lock"), token);
}
return dataFromDB;
} else {
System.out.println("获取分布式锁失败,等待重试");
//加锁失败,自旋的方式,休眠300ms重试
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getCatalogJsonFromDbWithRedisLock();
}
}
问题:锁的自动续期
Redisson
基于setnx实现的分布式锁存在下面的问题:
重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的
不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁
**超时释放:**我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患
主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现
整合
依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency>配置类
@Configuration public class MyRedissonConfig { /** * 所有对Redisson的使用都是对RedissionClient对象的操作 * @return * @throws IOException */ @Bean(destroyMethod="shutdown") public RedissonClient redisson() throws IOException { //1、创建配置 Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.218.128:6379"); //2、根据Config创建出RedisClient实例 RedissonClient redissonClient = Redisson.create(config); return redissonClient; } }测试
@Autowired private RedissonClient redissonClient; @Test public void redisson(){ //获取锁(可重入),指定锁的名称 RLock lock = redissonClient.getLock("anyLock"); //尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位 boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS); //判断获取锁成功 if(isLock){ try{ System.out.println("执行业务"); }finally{ //释放锁 lock.unlock(); } } }应用
@ResponseBody @GetMapping("/hello") public String hello() { //1、获取同一把锁,只要锁的名字一样,就是同一把锁, RLock lock = redisson.getLock("my-lock"); //2、加锁 //阻塞式等待 lock.lock(); try{ System.out.println("加锁成功,执行业务"+Thread.currentThread().getId()); Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } finally { //3、解锁 System.out.println(Thread.currentThread().getId()+"释放锁"); lock.unlock(); } return "hello"; }手动没有解锁,也会为我解锁
不断获取锁,只要能获取锁就继续执行我们的业务
看门狗原理-redisson如何解决死锁
最佳实践
@ResponseBody
@GetMapping("/hello")
public String hello() {
//1、获取同一把锁,只要锁的名字一样,就是同一把锁,
RLock lock = redisson.getLock("my-lock");
//2、加锁
//阻塞式等待,默认加的锁都是30秒
//lock.lock();
//1)、锁的自动续期,如果业务超长,运行期间自动给锁续上30s,不用担心业务时间长,锁自动过期被删掉
//2)、加锁的业务只要运行完成,就不会给当前续期,即使不手动删除解锁,锁默认在30s以后自动删除。
//10秒自动解锁,自动解锁时间一定要大于业务的执行的时间
lock.lock(10, TimeUnit.SECONDS);
// 问题:在锁时间到了以后,不会自动续期
//1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
//2、如果我们未指定锁的超时时间,就使用 30 * 1000 【看门狗lockWatchdogTimeout的默认时间】
// 只要占锁成功,就会启动一个定时任务【重新给锁设定过期时间,新的过期时间就是看门狗的默认时间】,每隔10s自动续期,续成30s
// internalLockLeaseTime / 3【看门狗时间】/3,10s
//最佳实战
//1) lock.lock(10, TimeUnit.SECONDS);省掉了整个续期操作,手动解锁
try {
System.out.println("加锁成功,执行业务" + Thread.currentThread().getId());
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//3、解锁 假设解锁代码没有运行,redis会不会出现死锁
System.out.println(Thread.currentThread().getId() + "释放锁");
lock.unlock();
}
return "hello";
}
}
读写锁
/**
* 保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁,共享锁)。读锁是一个共享锁
* 写锁没释放,读就必须等待
*
* 读 + 读 :相当于无锁,并发读,只会在redis中记录好,所有当前的读锁,他们都会同时加锁成功
* 写 + 读 :等待写锁释放
* 写 + 写:阻塞方式
* 读 + 写:有读锁,写也需要等待
* //只要有写的存在,都必须等待
* @return
*/
@GetMapping("/read")
@ResponseBody
public String readValue() {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
String s = "";
RLock rLock = readWriteLock.readLock();
rLock.lock();
try {
System.out.println("读锁加锁成功。。。。"+Thread.currentThread().getId());
s = stringRedisTemplate.opsForValue().get("writeValue");
Thread.sleep(30000);
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
System.out.println("读锁释放"+Thread.currentThread().getId());
}
return s;
}
@GetMapping("/write")
@ResponseBody
public String writeValue() {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
String s = "";
RLock rLock = readWriteLock.writeLock();
try {
//1、改数据加写锁,读数据加读锁
rLock.lock();
System.out.println("写锁加锁成功。。。。"+Thread.currentThread().getId());
s = UUID.randomUUID().toString();
Thread.sleep(10000);
stringRedisTemplate.opsForValue().set("writeValue", s);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
System.out.println("写锁释放"+Thread.currentThread().getId());
}
return s;
}
信号量
/**
* 信号量可以做分布式限流
*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
boolean b = park.tryAcquire();
if (b) {
//执行业务
} else {
return "error";
}
return "ok" + b;
}
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.release();
return "走了";
}
闭锁
闭锁 --- 都执行完了,上锁
@GetMapping("/lockdoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await();
return "放假了";
}
@GetMapping("/gogogo/{id}")
@ResponseBody
public String gogogo(@PathVariable int id){
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown();
return id+"号走了";
}
可重入原理
在Lock锁中,他是借助于底层
voaltile的一个state变量来记录重入的状态
没有持有这把锁,那么state=0
有持有这把锁,那么state=1
再次持有这把锁,那么state就会+1
如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有
在redission中,采用hash结构用来存储锁,其中大key表示表示这把锁是否存在,用小key表示当前这把锁被哪个线程持有
-- KEYS[1] : 锁名称,锁的大key
-- ARGV[1]: 锁失效时间
-- ARGV[2]: 锁id,锁的小key(id:threadId)
-- 上锁
-- Lock{
-- id + ":" + threadId : 1
--}
-- 锁大key == 0,无锁
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1); -- 上锁,计数+1
redis.call('pexpire', KEYS[1], ARGV[1]); -- 设置过期时间
return nil;
end;
-- 锁小key == 1,是自己的锁
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 计数+1
redis.call('pexpire', KEYS[1], ARGV[1]); -- 设置过期时间
return nil;
end;
-- 都不满足,抢锁失败,返回pttl,即为当前这把锁的失效时间
return redis.call('pttl', KEYS[1]);
锁重试原理
-- 抢锁过程
-- 判断当前这个方法的返回值是否为null
-- 如果是null,则对应则前两个if对应的条件,退出抢锁逻辑
-- 如果返回的不是null,即走了第三个分支,会进行while(true)的自旋抢锁
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
leaseTime默认值为-1,如果传参,此时leaseTime != -1 则会进去抢锁
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
如果是没有传入时间,则此时也会进行抢锁, 而且抢锁时间是默认看门狗时间
看门狗原理
如果是没有传入时间,则此时也会进行抢锁, 而且抢锁时间是默认看门狗时间
//抢锁
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
//没有传入时间,则此时也会进行抢锁, 而且抢锁时间是默认看门狗时间
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS,
threadId,
RedisCommands.EVAL_LONG);
//监听抢锁结果
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// 抢到锁后,后台开启一个线程,进行续约逻辑,也就是看门狗线程
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
//续约逻辑
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//通过参数2,参数3 去描述什么时候去做参数1的事情,即到过期时间的1/3续约
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//续约
RFuture<Boolean> future = renewExpirationAsync(threadId);
//监听续约结果
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
//续约成功,递归调用自己,再重新设置一个timeTask(),完成不停的续约
//假设我们的线程出现了宕机,不能再调用renewExpiration方法,时间到后自然释放
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
MutiLock原理
为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
问题:主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了
解决:redission提出来了MutiLock锁,使用这把锁后每个节点的地位都一样, 这把锁加锁的逻辑需要写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性
原理:redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试

Spring Cache
整合
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-cache</artifactId> </dependency>配置
spring.cache.type=redis #spring.cache.cache-names= #一小时,这里单位是毫秒 spring.cache.redis.time-to-live=3600000 #如果使用前缀,就用我们指定的前缀,如果没有就默认使用缓存的名字作为前缀 spring.cache.redis.key-prefix=CACHE_ spring.cache.redis.use-key-prefix=true # 是否缓存空值。防止缓存穿透 spring.cache.redis.cache-null-values=true开启
@EnableCaching // 开启缓存使用
/** * 'catagory'放到哪个名字的缓存【缓存的分区(按照业务类型分)】 * * 代表当前方法的结果需要缓存,如果缓存中有,方法不用调用 * 如果缓存中没有,会调用方法,最后将方法的结果放入缓存 * * 默认行为 * 1)、如果缓存中有,方法不用调用 * 2)、key默认自动生成,缓存名字::SimpleKey [](自动生成的key) * 3)、缓存的value值,默认使用的是jdk的序列化机制,将序列化后的值存在redis中 * 4)、默认时间ttl=-1 * * 自定义属性: * 1)、指定生成的缓存使用的key:key属性指定,使用spel表达式 * SPEL表达式:https://docs.spring.io/spring/docs/5.2.7.RELEASE/spring-framework-reference/integration.html#cache-spel-context * 2)、指定缓存的数据的存活时间:配置文件中修改ttl,spring.cache.redis.time-to-live=3600000 * 3)、将数据保存为json格式(异构系统比如php可能不兼容) */ //因为spel动态取值,所有需要额外加''表示字符串 @Cacheable(value = {"catagory"},key = "'Level1Categorys'") @Override public List<CategoryEntity> getLevel1Categorys() { System.out.println("getLevel1Categorys......"); long l = System.currentTimeMillis(); return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0)); }配置类
//开启属性配置绑定 @EnableConfigurationProperties({CacheProperties.class}) @EnableCaching @Configuration public class MyCacheConfig { /** * 配置文件中的东西没有用上 *1、原来和配置文件绑定的配置类,是这样子的 * @ConfigurationProperties(prefix = "spring.cache") * public class CacheProperties *2、要让他生效,要用这个注解,@EnableConfigurationProperties */ @Bean public RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) { CacheProperties.Redis redisProperties = cacheProperties.getRedis(); RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig(); //指定缓存序列化方式为json config = config.serializeKeysWith(RedisSerializationContext .SerializationPair .fromSerializer(new StringRedisSerializer())); config = config.serializeValuesWith(RedisSerializationContext .SerializationPair .fromSerializer(new GenericJackson2JsonRedisSerializer())); CacheProperties.Redis redisProperties = cacheProperties.getRedis(); //设置配置文件中的各项配置,如过期时间 if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { config = config.prefixKeysWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; } }缓存失效
//在执行被注解的方法时,会将缓存中的该数据删除 @CacheEvict(value = {"category"},key = "'levelCategorys'") //删除多个 @Caching(evict = { @CacheEvict(value = "category",key= "'xxx'"), @CacheEvict(value = "category",key = "'yyy'") }) //删除所有 @CacheEvict(value="category",allEntries=true)问题:读模式下缓存击穿的锁默认没有开启,开启后是本地锁,不是分布式锁,写模式无锁
// sync = true @Cacheable(value = {"catagory"}, key = "#root.method.name", sync = true)
