跳至主要內容

Redis 分布式锁

程序员李某某大约 15 分钟

Redis 分布式锁

集群的并发问题

有关锁失效原因分析

  • 多个tomcat
  • 对应多个jvm
  • 两个线程不在同一个tomcat
  • 在对应的多个jvm中拿不到同一个锁对象 --- 锁不住

基本原理

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁

核心思想:保证同一把锁

可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思

互斥:互斥是分布式锁的最基本的条件,使得程序串行执行

高可用:程序不易崩溃,时时刻刻都保证较高的可用性

高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能

安全性:安全也是程序中必不可少的一环

实现方式对比

常见的分布式锁有三种

Mysql:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见

Redis:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁

Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述

image-20221114011515675
image-20221114011515675

核心思路

利用互斥锁解决缓存击穿问题

核心思路:相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询

如果获取到了锁的线程,再去进行查询,查询后将数据写入redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿

image-20221031205819352
image-20221031205819352

核心思路就是利用redis的setnx方法来表示获取锁,该方法含义是redis中如果没有这个key,则插入成功,返回1,在stringRedisTemplate中返回true, 如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false,我们可以通过true,或者是false,来表示是否有线程成功插入key,成功插入的key的线程我们认为他就是获得到锁的线程

#[NX表示not exit]
set lock lock1 NX
#EX后跟过期时间
set lock lock1 EX 300 NX

问题:加锁后不释放,造成死锁

//1、占分布式锁。去redis占坑
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
if (lock){
    //设置过期时间,避免不释放造成死锁
    redisTemplate.expire("lock",30, TimeUnit.SECONDS);
    //加锁成功  执行业务
    Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
    redisTemplate.delete("lock");
    return dataFromDb;
}

问题:占锁和设置时间必须原子性:占锁后时间没设置上,出现死锁

//1、占分布式锁。去redis占坑,并设置过期时间
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",300,TimeUnit.SECONDS);
if (lock){
    //加锁成功  执行业务
    Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
    redisTemplate.delete("lock");
    return dataFromDb;
} 

问题:自动过期时间为10s,业务时间为20s,到期删除,其他业务抢到锁,锁失效

在占锁时加上或者设为UUID值。删除时进行判断

//1、占分布式锁。去redis占坑
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
if (lock){
    //加锁成功  执行业务
    Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
    if (uuid.equals(redisTemplate.opsForValue().get("lock"))){
        //存在网络时延问题,比如在redis获取到lock返回时,lock过期被自动删除,
        // 此时其他线程抢占了锁,创建了lock,但是会被这个线程删掉的情况
        redisTemplate.delete("lock");
    }
    return dataFromDb;
} 

问题:删除时,获取lock、对比、删除必须原子性,否则可能会删除其他业务lock

官网对此也与解释,使用lua脚本解决这个问题

    public Map<String, List<Catalog2Vo>> getCatalogJsonFromDbWithRedisLock() {

        String token = UUID.randomUUID().toString();

        //1、占分布式锁。去redis占坑,
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", token, 100, TimeUnit.SECONDS);


        if (lock) {
            System.out.println("获取分布式锁成功");

            //为了让锁自动续期,不至于执行途中因为时间过短而失效,可以设置时间长一些,然后finally保证业务操作完成之后,就执行删除锁的操作
            //不管怎样,哪怕崩溃也直接解锁,不关心业务异常
            Map<String, List<Catalog2Vo>> dataFromDB;
            try {
                //加锁成功
                dataFromDB = getDataFromDB();
            } finally {
      			//lua脚本保证原子性
                String lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
                        "then\n" +
                        "    return redis.call(\"del\",KEYS[1])\n" +
                        "else\n" +
                        "    return 0\n" +
                        "end";
                RedisScript<Long> luaScript = RedisScript.of(lua, Long.class);
                //删除锁
                Long lock1 = stringRedisTemplate.execute(luaScript, Arrays.asList("lock"), token);
            }

            return dataFromDB;
        } else {
            System.out.println("获取分布式锁失败,等待重试");
            //加锁失败,自旋的方式,休眠300ms重试
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonFromDbWithRedisLock();
        }

    }

问题:锁的自动续期

Redisson

基于setnx实现的分布式锁存在下面的问题:

重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的

不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁

**超时释放:**我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患

主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现

整合

  • 依赖

    <dependency>
         <groupId>org.redisson</groupId>
         <artifactId>redisson</artifactId>
         <version>3.13.6</version>
     </dependency>
    
  • 配置类

    @Configuration
    public class MyRedissonConfig {
    
        /**
         * 所有对Redisson的使用都是对RedissionClient对象的操作
         * @return
         * @throws IOException
         */
        @Bean(destroyMethod="shutdown")
        public RedissonClient redisson() throws IOException {
            //1、创建配置
            Config config = new Config();
          	config.useSingleServer().setAddress("redis://192.168.218.128:6379");
            //2、根据Config创建出RedisClient实例
            RedissonClient redissonClient = Redisson.create(config);
            return redissonClient;
        }
    }
    
  • 测试

    @Autowired
    private RedissonClient redissonClient;
    
    
    @Test
    public void redisson(){
        //获取锁(可重入),指定锁的名称
        RLock lock = redissonClient.getLock("anyLock");
        //尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
        boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
        //判断获取锁成功
        if(isLock){
            try{
                System.out.println("执行业务");          
            }finally{
                //释放锁
                lock.unlock();
            }
        }
    }
    
  • 应用

    @ResponseBody
    @GetMapping("/hello")
    public String hello() {
        //1、获取同一把锁,只要锁的名字一样,就是同一把锁,
        RLock lock = redisson.getLock("my-lock");
        //2、加锁
        //阻塞式等待
        lock.lock();
        try{
            System.out.println("加锁成功,执行业务"+Thread.currentThread().getId());
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //3、解锁
            System.out.println(Thread.currentThread().getId()+"释放锁");
            lock.unlock();
        }
        return "hello";
    }
    
  • 手动没有解锁,也会为我解锁

    不断获取锁,只要能获取锁就继续执行我们的业务

  • 看门狗原理-redisson如何解决死锁

最佳实践

 @ResponseBody
    @GetMapping("/hello")
    public String hello() {
        //1、获取同一把锁,只要锁的名字一样,就是同一把锁,
        RLock lock = redisson.getLock("my-lock");
        //2、加锁
        //阻塞式等待,默认加的锁都是30秒
        //lock.lock();
        //1)、锁的自动续期,如果业务超长,运行期间自动给锁续上30s,不用担心业务时间长,锁自动过期被删掉
        //2)、加锁的业务只要运行完成,就不会给当前续期,即使不手动删除解锁,锁默认在30s以后自动删除。

        //10秒自动解锁,自动解锁时间一定要大于业务的执行的时间
        lock.lock(10, TimeUnit.SECONDS);
        // 问题:在锁时间到了以后,不会自动续期
        //1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
        //2、如果我们未指定锁的超时时间,就使用 30 * 1000 【看门狗lockWatchdogTimeout的默认时间】
        // 只要占锁成功,就会启动一个定时任务【重新给锁设定过期时间,新的过期时间就是看门狗的默认时间】,每隔10s自动续期,续成30s
        // internalLockLeaseTime / 3【看门狗时间】/3,10s

        //最佳实战
        //1) lock.lock(10, TimeUnit.SECONDS);省掉了整个续期操作,手动解锁  
        try {
            System.out.println("加锁成功,执行业务" + Thread.currentThread().getId());
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //3、解锁 假设解锁代码没有运行,redis会不会出现死锁
            System.out.println(Thread.currentThread().getId() + "释放锁");
            lock.unlock();
        }
        return "hello";
    }
}

读写锁

/**
 * 保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁,共享锁)。读锁是一个共享锁
 * 写锁没释放,读就必须等待
 *
 * 读 + 读 :相当于无锁,并发读,只会在redis中记录好,所有当前的读锁,他们都会同时加锁成功
 * 写 + 读 :等待写锁释放
 * 写 + 写:阻塞方式
 * 读 + 写:有读锁,写也需要等待
 * //只要有写的存在,都必须等待
 * @return
 */
@GetMapping("/read")
@ResponseBody
public String readValue() {

    RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
    String s = "";
    RLock rLock = readWriteLock.readLock();
    rLock.lock();
    try {
        System.out.println("读锁加锁成功。。。。"+Thread.currentThread().getId());
        s = stringRedisTemplate.opsForValue().get("writeValue");
        Thread.sleep(30000);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
        System.out.println("读锁释放"+Thread.currentThread().getId());
    }
    return s;
}


@GetMapping("/write")
@ResponseBody
public String writeValue() {

    RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
    String s = "";
    RLock rLock = readWriteLock.writeLock();
    try {
        //1、改数据加写锁,读数据加读锁
        rLock.lock();
        System.out.println("写锁加锁成功。。。。"+Thread.currentThread().getId());
        s = UUID.randomUUID().toString();
        Thread.sleep(10000);
        stringRedisTemplate.opsForValue().set("writeValue", s);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
        System.out.println("写锁释放"+Thread.currentThread().getId());
    }
    return s;
}

信号量

/**
 * 信号量可以做分布式限流
 */
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {

    RSemaphore park = redisson.getSemaphore("park");
    boolean b = park.tryAcquire();
    if (b) {
        //执行业务
    } else {
        return "error";
    }
    return "ok" + b;

}

@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
    RSemaphore park = redisson.getSemaphore("park");
    park.release();
    return "走了";

}

闭锁

闭锁 --- 都执行完了,上锁

@GetMapping("/lockdoor")
@ResponseBody
public String lockDoor() throws InterruptedException {

    RCountDownLatch door = redisson.getCountDownLatch("door");
    door.trySetCount(5);
    door.await();
    return "放假了";
}

@GetMapping("/gogogo/{id}")
@ResponseBody
public String gogogo(@PathVariable int id){

    RCountDownLatch door = redisson.getCountDownLatch("door");
    door.countDown();
    return id+"号走了";
}

可重入原理

在Lock锁中,他是借助于底层

  • voaltile的一个state变量来记录重入的状态

  • 没有持有这把锁,那么state=0

  • 有持有这把锁,那么state=1

  • 再次持有这把锁,那么state就会+1

    如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有

  • 在redission中,采用hash结构用来存储锁,其中大key表示表示这把锁是否存在,用小key表示当前这把锁被哪个线程持有

-- KEYS[1] : 锁名称,锁的大key
-- ARGV[1]:  锁失效时间
-- ARGV[2]:  锁id,锁的小key(id:threadId)

-- 上锁 
-- Lock{
--   id + ":" + threadId :  1
--}

-- 锁大key == 0,无锁
if (redis.call('exists', KEYS[1]) == 0) then
    redis.call('hset', KEYS[1], ARGV[2], 1);	-- 上锁,计数+1
    redis.call('pexpire', KEYS[1], ARGV[1]); 	-- 设置过期时间
    return nil;
end;
-- 锁小key == 1,是自己的锁
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('hincrby', KEYS[1], ARGV[2], 1);	-- 计数+1
    redis.call('pexpire', KEYS[1], ARGV[1]);	-- 设置过期时间
    return nil;
end;
-- 都不满足,抢锁失败,返回pttl,即为当前这把锁的失效时间
return redis.call('pttl', KEYS[1]);

锁重试原理

-- 抢锁过程
	-- 判断当前这个方法的返回值是否为null
	-- 如果是null,则对应则前两个if对应的条件,退出抢锁逻辑
	-- 如果返回的不是null,即走了第三个分支,会进行while(true)的自旋抢锁
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
    return;
}

leaseTime默认值为-1,如果传参,此时leaseTime != -1 则会进去抢锁

if (leaseTime != -1) {
    return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}

如果是没有传入时间,则此时也会进行抢锁, 而且抢锁时间是默认看门狗时间

看门狗原理

如果是没有传入时间,则此时也会进行抢锁, 而且抢锁时间是默认看门狗时间

//抢锁
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
            //没有传入时间,则此时也会进行抢锁, 而且抢锁时间是默认看门狗时间 
        	commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
            TimeUnit.MILLISECONDS, 
            threadId, 
            RedisCommands.EVAL_LONG);

//监听抢锁结果
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    if (e != null) {
        return;
    }

    // 抢到锁后,后台开启一个线程,进行续约逻辑,也就是看门狗线程
    if (ttlRemaining == null) {
        scheduleExpirationRenewal(threadId);
    }
});
return ttlRemainingFuture;
//续约逻辑
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    //通过参数2,参数3 去描述什么时候去做参数1的事情,即到过期时间的1/3续约
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            //续约
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            //监听续约结果
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                //续约成功,递归调用自己,再重新设置一个timeTask(),完成不停的续约
				//假设我们的线程出现了宕机,不能再调用renewExpiration方法,时间到后自然释放
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

MutiLock原理

为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例

问题:主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了

解决:redission提出来了MutiLock锁,使用这把锁后每个节点的地位都一样, 这把锁加锁的逻辑需要写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性

原理:redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试

image-20221031233735846
image-20221031233735846

Spring Cache

整合

  • 依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-cache</artifactId>
    </dependency>
    
  • 配置

    spring.cache.type=redis
    #spring.cache.cache-names=
    
    #一小时,这里单位是毫秒
    spring.cache.redis.time-to-live=3600000
    #如果使用前缀,就用我们指定的前缀,如果没有就默认使用缓存的名字作为前缀
    spring.cache.redis.key-prefix=CACHE_
    spring.cache.redis.use-key-prefix=true
    # 是否缓存空值。防止缓存穿透
    spring.cache.redis.cache-null-values=true
    
  • 开启

    @EnableCaching  // 开启缓存
    
  • 使用

    /**
     * 'catagory'放到哪个名字的缓存【缓存的分区(按照业务类型分)】
     *
     * 代表当前方法的结果需要缓存,如果缓存中有,方法不用调用
     * 如果缓存中没有,会调用方法,最后将方法的结果放入缓存
     *
     *  默认行为
     *  1)、如果缓存中有,方法不用调用
     *  2)、key默认自动生成,缓存名字::SimpleKey [](自动生成的key)
     *  3)、缓存的value值,默认使用的是jdk的序列化机制,将序列化后的值存在redis中
     *  4)、默认时间ttl=-1
     *
     *  自定义属性:
     *  1)、指定生成的缓存使用的key:key属性指定,使用spel表达式
     *        SPEL表达式:https://docs.spring.io/spring/docs/5.2.7.RELEASE/spring-framework-reference/integration.html#cache-spel-context
     *   2)、指定缓存的数据的存活时间:配置文件中修改ttl,spring.cache.redis.time-to-live=3600000
     *   3)、将数据保存为json格式(异构系统比如php可能不兼容)
     */
    
    //因为spel动态取值,所有需要额外加''表示字符串
    @Cacheable(value = {"catagory"},key = "'Level1Categorys'")
    @Override
    public List<CategoryEntity> getLevel1Categorys() {
        System.out.println("getLevel1Categorys......");
        long l = System.currentTimeMillis();
        return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
    }
    
  • 配置类

    //开启属性配置绑定
    @EnableConfigurationProperties({CacheProperties.class})
    @EnableCaching
    @Configuration
    public class MyCacheConfig {
    
        /**
         * 配置文件中的东西没有用上
         *1、原来和配置文件绑定的配置类,是这样子的
         * @ConfigurationProperties(prefix = "spring.cache")
         * public class CacheProperties
         *2、要让他生效,要用这个注解,@EnableConfigurationProperties
         */
        @Bean
        public RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {
            CacheProperties.Redis redisProperties = cacheProperties.getRedis();
            
            RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
            //指定缓存序列化方式为json
            config = config.serializeKeysWith(RedisSerializationContext
                                  .SerializationPair
                                  .fromSerializer(new StringRedisSerializer()));
            config = config.serializeValuesWith(RedisSerializationContext
                     .SerializationPair
                     .fromSerializer(new GenericJackson2JsonRedisSerializer()));
            CacheProperties.Redis redisProperties = cacheProperties.getRedis();
            //设置配置文件中的各项配置,如过期时间
            if (redisProperties.getTimeToLive() != null) {
                config = config.entryTtl(redisProperties.getTimeToLive());
            }
            if (redisProperties.getKeyPrefix() != null) {
                config = config.prefixKeysWith(redisProperties.getKeyPrefix());
            }
            if (!redisProperties.isCacheNullValues()) {
                config = config.disableCachingNullValues();
            }
            if (!redisProperties.isUseKeyPrefix()) {
                config = config.disableKeyPrefix();
            }
            return config;
        }
    }
    
  • 缓存失效

    //在执行被注解的方法时,会将缓存中的该数据删除
    @CacheEvict(value = {"category"},key = "'levelCategorys'")
    //删除多个
    @Caching(evict = {
            @CacheEvict(value = "category",key= "'xxx'"),
            @CacheEvict(value = "category",key = "'yyy'")
    })
    //删除所有
    @CacheEvict(value="category",allEntries=true)
    
  • 问题:读模式下缓存击穿的锁默认没有开启,开启后是本地锁,不是分布式锁,写模式无锁

    // sync = true
    @Cacheable(value = {"catagory"}, key = "#root.method.name", sync = true)
    
上次编辑于:
贡献者: 李元昊