跳至主要內容

Redis 应用

程序员李某某大约 36 分钟

Redis 应用

短信登录

session实现

image-20221113215740406
image-20221113215740406

发送、登录、注册

  • 发送验证码
    • 校验手机号格式RegexUtils.isPhoneInvalid(phone)
    • 生成验证码RandomUtil.randomNumbers(6)
    • 保存到sessionsession.setAttribute("code",code)
    • 发送验证码调用第三方api
  • 登录/注册
    • 校验手机号
    • 获取验证码session.getAttribute("code")
    • 校验验证码cacheCode == null || !cacheCode.toString().equals(code)
    • 根据手机号查库
      • 不存在,根据手机号创建用户
      • 存在,保存到sessionsession.setAttribute("user",user)

登录拦截

  • 登陆拦截

    public class LoginInterceptor implements HandlerInterceptor {
    
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
           //1.获取session
            HttpSession session = request.getSession();
            //2.获取session中的用户
            Object user = session.getAttribute("user");
            //3.判断用户是否存在
            if(user == null){
                  //4.不存在,拦截,返回401状态码
                  response.setStatus(401);
                  return false;
            }
            //5.存在,保存用户信息到Threadlocal
            UserHolder.saveUser((User)user);
            //6.放行
            return true;
        }
    }
    
    @Configuration
    public class MvcConfig implements WebMvcConfigurer {
    
        @Resource
        private StringRedisTemplate stringRedisTemplate;
    
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            // 登录拦截器
            registry.addInterceptor(new LoginInterceptor())
                    .excludePathPatterns(
                            "/shop/**",
                            "/voucher/**",
                            "/shop-type/**",
                            "/upload/**",
                            "/blog/hot",
                            "/user/code",
                            "/user/login"
                    ).order(1);
            // token刷新的拦截器
            registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
        }
    }
    

隐藏敏感信息

  • 隐藏用户敏感信息

    • 创建VO或TO,无敏感信息进行返回

    • 登录时返回vo对象session.setAttribute("user", BeanUtils.copyProperties(user,UserDTO.class))

    • 拦截器中vo保存到ThreadLocal中UserHolder.saveUser((UserDTO) user)

    • 在UserHolder处:将user对象换成UserDTO

      public class UserHolder {
          private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
      
          public static void saveUser(UserDTO user){
              tl.set(user);
          }
      
          public static UserDTO getUser(){
              return tl.get();
          }
      
          public static void removeUser(){
              tl.remove();
          }
      }
      

redis实现

  • session的问题:分布式下的tomcat中的session不共享,这台机器有,换台机器就没了,登录拦截存在问题,可以拷贝session到多台服务器
    • 每台都有,浪费,服务器压力大
    • 拷贝存在延迟
  • 数据类型的选择
    • string:更直观,但是每次增删改查都是完整大对象,性能不好,数据量小时可以考虑
    • hash:value是键值对,更新时只需要更新对应的属性即可,性能好
image-20221113222022687
image-20221113222022687

隐藏敏感信息

  • 敏感信息保护
    • 以手机号为key容易暴露隐私
    • 随机生成以一个token令牌更合适

发送、登录、注册

  • 发送验证码

    • 校验手机号格式RegexUtils.isPhoneInvalid(phone)
    • 生成验证码RandomUtil.randomNumbers(6)
    • 保存到redisstringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone)
    • 发送验证码调用第三方api
  • 登录/注册

    • 校验手机号

    • 从redis获取验证码stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone)

    • 校验验证码

    • 根据手机号查库

      • 不存在,根据手机号创建用户

      • 存在,保存到redis

        // 7.1.随机生成token,作为登录令牌
        String token = UUID.randomUUID().toString(true);
        // 7.2.将User对象转为HashMap存储
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
                CopyOptions.create()
                        .setIgnoreNullValue(true)
                        .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
        // 7.3.存储
        String tokenKey = LOGIN_USER_KEY + token;
        stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
        // 7.4.设置token有效期
        stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
        

拦截器

  • 拦截器

    image-20221113223545624
    image-20221113223545624
    • 问题:只拦截登录路径,其他路径不拦截,导致token有效期不能刷新

    • 解决:再加一个拦截器

      image-20221113223729851
      image-20221113223729851
      public class RefreshTokenInterceptor implements HandlerInterceptor {
      
          private StringRedisTemplate stringRedisTemplate;
      
          public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
              this.stringRedisTemplate = stringRedisTemplate;
          }
      
          @Override
          public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
              // 1.获取请求头中的token
              String token = request.getHeader("authorization");
              if (StrUtil.isBlank(token)) {
                  return true;
              }
              // 2.基于TOKEN获取redis中的用户
              String key  = LOGIN_USER_KEY + token;
              Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
              // 3.判断用户是否存在
              if (userMap.isEmpty()) {
                  return true;
              }
              // 5.将查询到的hash数据转为UserDTO
              UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
              // 6.存在,保存用户信息到 ThreadLocal
              UserHolder.saveUser(userDTO);
              // 7.刷新token有效期
              stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
              // 8.放行
              return true;
          }
      
          @Override
          public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
              // 移除用户
              UserHolder.removeUser();
          }
      }
      
      public class LoginInterceptor implements HandlerInterceptor {
      
          @Override
          public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
              // 1.判断是否需要拦截(ThreadLocal中是否有用户)
              if (UserHolder.getUser() == null) {
                  // 没有,需要拦截,设置状态码
                  response.setStatus(401);
                  // 拦截
                  return false;
              }
              // 有用户,则放行
              return true;
          }
      }
      

秒杀

分布式ID

  • 优惠券的id

    • 需要全局唯一

    • 若使用订单的自增id存在问题:

      • 规律明显,容易猜出敏感信息,如订单量
      • 受单表数据量限制,数据量大需要拆表,id可能一样,但逻辑上还是一张表,必须保证id唯一
    • 全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具

      • 常见:UUID、雪花算法、redis自增ID等

      • redis自增ID一般要满足下列特性

        • 唯一性
        • 递增
        • 安全
        • 高可用
        • 高性能
      • redis自增ID组成

        • 符号位:1bit,永远为0
        • 时间戳:31bit,以秒为单位,可以使用69年
        • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
      @Component
      public class RedisIdWorker {
          /**
           * 开始时间戳
           */
          private static final long BEGIN_TIMESTAMP = 1640995200L;
          /**
           * 序列号的位数
           */
          private static final int COUNT_BITS = 32;
      
          private StringRedisTemplate stringRedisTemplate;
      
          public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
              this.stringRedisTemplate = stringRedisTemplate;
          }
      
          public long nextId(String keyPrefix) {
              // 1.生成时间戳
              LocalDateTime now = LocalDateTime.now();
              long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
              long timestamp = nowSecond - BEGIN_TIMESTAMP;
      
              // 2.生成序列号
              // 2.1.获取当前日期,精确到天
              String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
              // 2.2.自增长
              long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
      
              // 3.拼接并返回
              return timestamp << COUNT_BITS | count;
          }
      }
      
    • redis自增ID测试

      @Test
      void testIdWorker() throws InterruptedException {
          CountDownLatch latch = new CountDownLatch(300);
      
          Runnable task = () -> {
              for (int i = 0; i < 100; i++) {
                  long id = redisIdWorker.nextId("order");
                  System.out.println("id = " + id);
              }
              latch.countDown();
          };
          long begin = System.currentTimeMillis();
          for (int i = 0; i < 300; i++) {
              es.submit(task);
          }
          latch.await();
          long end = System.currentTimeMillis();
          System.out.println("time = " + (end - begin));
      }
      

      CountDownLatch名为信号枪:主要的作用是同步协调在多线程的等待于唤醒问题

      当异步程序没有执行完时,主线程已经执行完,我们期望的是分线程全部走完,主线程再走

      CountDownLatch 中有两个最重要的方法

      • countDown
      • await

      await 方法 是阻塞方法,我们担心分线程没有执行完时,main线程就先执行,所以使用await可以让main线程阻塞

      当CountDownLatch 内部维护的 变量变为0时,就不再阻塞,直接放行

      • 调用一次countDown ,内部变量就减少1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,直到解除阻塞,统计出来的时间也就是所有分线程执行完后的时间

秒杀逻辑

  • 查询优惠券
  • 判断是否具备秒杀条件
    • 秒杀是否开始
    • 秒杀是否结束
    • 库存是否充足
    • 一人一券一单:查库:是否用券下过单(存在并发问题
  • 扣减库存(存在并发问题
  • 创建订单(订单id,用户id,优惠券id)(存在并发问题

并发问题

乐观锁比较适合更新数据,而插入数据适合使用悲观锁

  • 扣减库存 --- 乐观锁

    • 每次扣减的sql加上库存的判断条件

      • stock = xxx.getStock()
      • 但是同时进来的其他线程就永远无法满足上述条件,因为库存已经比getStock少一个
      • 故,只要 stock > 0 即可
    • cas自旋压力过大

      • AtomicLong中cas自旋压力过大,销毁cpu

      • java8的改进,LongAdder

        • 大量线程并发更新一个原子性的时候,天然的问题就是自旋,会导致并发性问题,当然这也比我们直接使用syn来的好

        • LongAdder来进行优化,如果获取某个值,则会对cell和base的值进行递增,最后返回一个完整的值

          image-20221113232703291
          image-20221113232703291
  • 一人一券一单(恶意刷单) --- 悲观锁

      • 降低锁粒度,不能使用同步方法或this锁,只需要锁住当前用户userId

      • 为保证是同一把锁,每个线程进来的userId不是同一个对象,所以userId.toString()

      • 包装类toString()是new String(),仍不是同一个对象,故userId.toString().intern()

        @Transactional
        public  Result createVoucherOrder(Long voucherId) {
        	Long userId = UserHolder.getUser().getId();
        	synchronized(userId.toString().intern()){
                 // 5.1.查询订单
                int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
                // 5.2.判断是否存在
                if (count > 0) {
                    // 用户已经购买过了
                    return Result.fail("用户已经购买过一次!");
                }
        
                // 6.扣减库存
                boolean success = seckillVoucherService.update()
                     .setSql("stock = stock - 1") // set stock = stock - 1
                     .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
                     .update();
                if (!success) {
                    // 扣减失败
                    return Result.fail("库存不足!");
                }
        
                // 7.创建订单
                VoucherOrder voucherOrder = new VoucherOrder();
                // 7.1.订单id
                long orderId = redisIdWorker.nextId("order");
                voucherOrder.setId(orderId);
                // 7.2.用户id
                voucherOrder.setUserId(userId);
                // 7.3.代金券id
                voucherOrder.setVoucherId(voucherId);
                save(voucherOrder);
        
                // 7.返回订单id
                return Result.ok(orderId);
            }
        }
        
    • 事务与锁

      • @Transactional方法被spring的事务控制,

      • 同步代码块执行完,锁释放

      • 但当前方法事务未提交

      • 新的线程拿到锁,造成并发

        Long userId = UserHolder.getUser().getId();
        synchronized(userId.toString().intern()){
            return this.createVoucherOrder(voucherId);
        }
        
    • 事务

      • spring的事务是AOP实现的,也就是代理对象处理的事务,非代理对象执行事务方法不生效

      • this.的方式调用的事务不生效,需要利用代理对象调用来生效

        Long userId = UserHolder.getUser().getId();
        synchronized(userId.toString().intern()){
            // 获取代理对象
            IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        }
        
  • 集群下的并发问题 --- 分布式锁

    public interface ILock{
        boolean tryLock(long timeoutSec);
        void unlock();
    }
    
    public class SimpleRedisLock implements ILock{
        private static final String KEY_PREFIX="lock:"
        @Override
        public boolean tryLock(long timeoutSec) {
            // 获取线程标示
            String threadId = Thread.currentThread().getId()
            // 获取锁
            Boolean success = stringRedisTemplate.opsForValue()
                    .setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
            return Boolean.TRUE.equals(success);
        }
        
        public void unlock() {
            //通过del删除锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
    	}
    }
    
    //创建锁对象(新增代码)
    SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
    //获取锁对象
    boolean isLock = lock.tryLock(1200);
    //加锁失败
    if (!isLock) {
        return Result.fail("不允许重复下单");
    }
    try {
        //获取代理对象(事务)
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        return proxy.createVoucherOrder(voucherId);
    } finally {
        //释放锁
        lock.unlock();
    }
    
    • 加锁的原子性问题(死锁)(上述代码不存在)

    • 锁误删

      • 好多线程都在等锁释放
      • 但是线程1没有手动释放,到期后自动释放
      • 线程2拿到锁
      • 线程1开始手动释放删除锁,误把线程2的删除了

      解决方案:在占锁时加上或者设为UUID值。删除时进行判断

    • 删除的原子性问题

    • 自动续期问题 --- Redisson

      //获取锁对象
      RLock lock = redissonClient.getLock("lock:order:" + userId);
      boolean isLock = lock.tryLock();	//看门狗时间30s
      
      //加锁失败
      if (!isLock) {
          return Result.fail("不允许重复下单");
      }
      try {
          //获取代理对象(事务)
          IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
          return proxy.createVoucherOrder(voucherId);
      } finally {
          //释放锁
          lock.unlock();
      }
      

优化

  • 将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单
  • 这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的
  • 我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功,
  • 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池
  • 快速校验一人一单,还有库存判断
    • 库存放到redis查
    • 一人一单放到redis的set集合(用户id、优惠券id)
    • redis没有订单,把订单信息放到queue中,异步完成订单
    • 下单成功,获取订单id,返回给前端
  • Redis完成秒杀资格判断

    • 新增秒杀优惠券的同时,将优惠券信息保存到Redis中

    • 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

      -- 1.参数列表
      -- 1.1.优惠券id
      local voucherId = ARGV[1]
      -- 1.2.用户id
      local userId = ARGV[2]
      -- 1.3.订单id
      local orderId = ARGV[3]
      
      -- 2.数据key
      -- 2.1.库存key
      local stockKey = 'seckill:stock:' .. voucherId
      -- 2.2.订单key
      local orderKey = 'seckill:order:' .. voucherId
      
      -- 3.脚本业务
      -- 3.1.判断库存是否充足 get stockKey
      if(tonumber(redis.call('get', stockKey)) <= 0) then
          -- 3.2.库存不足,返回1
          return 1
      end
      -- 3.2.判断用户是否下单 SISMEMBER orderKey userId
      if(redis.call('sismember', orderKey, userId) == 1) then
          -- 3.3.存在,说明是重复下单,返回2
          return 2
      end
      -- 3.4.扣库存 incrby stockKey -1
      redis.call('incrby', stockKey, -1)
      -- 3.5.下单(保存用户)sadd orderKey userId
      redis.call('sadd', orderKey, userId)
      -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ...
      redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
      return 0
      
    • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列

      @Override
      public Result seckillVoucher(Long voucherId) {
          //获取用户
          Long userId = UserHolder.getUser().getId();
          long orderId = redisIdWorker.nextId("order");
          // 1.执行lua脚本
          Long result = stringRedisTemplate.execute(
                  SECKILL_SCRIPT,
                  Collections.emptyList(),
                  voucherId.toString(), userId.toString(), String.valueOf(orderId)
          );
          int r = result.intValue();
          // 2.判断结果是否为0
          if (r != 0) {
              // 2.1.不为0 ,代表没有购买资格
              return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
          }
          //TODO 保存阻塞队列
          // 3.返回订单id
          return Result.ok(orderId);
      }
      
  • 基于阻塞队列实现秒杀优化

    • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

    • 实现类完整代码

      //异步处理线程池
      private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
      // 消息队列
      private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
      // 代理对象
      private IVoucherOrderService proxy;
      //在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的
      @PostConstruct
      private void init() {
          SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
      }
      // 业务逻辑主体
      @Override
      public Result seckillVoucher(Long voucherId) {
          Long userId = UserHolder.getUser().getId();
          long orderId = redisIdWorker.nextId("order");
          // 1.执行lua脚本
          Long result = stringRedisTemplate.execute(
                  SECKILL_SCRIPT,
                  Collections.emptyList(),
                  voucherId.toString(), userId.toString(), String.valueOf(orderId)
          );
          int r = result.intValue();
          // 2.判断结果是否为0
          if (r != 0) {
              // 2.1.不为0 ,代表没有购买资格
              return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
          }
          VoucherOrder voucherOrder = new VoucherOrder();
          // 2.3.订单id
          long orderId = redisIdWorker.nextId("order");
          voucherOrder.setId(orderId);
          // 2.4.用户id
          voucherOrder.setUserId(userId);
          // 2.5.代金券id
          voucherOrder.setVoucherId(voucherId);
          // 2.6.放入阻塞队列
          orderTasks.add(voucherOrder);
          //3.获取代理对象
          proxy = (IVoucherOrderService) AopContext.currentProxy();
          //4.返回订单id
          return Result.ok(orderId);
      }
      @Transactional
      public void createVoucherOrder(VoucherOrder voucherOrder) {
          Long userId = voucherOrder.getUserId();
          // 5.1.查询订单
          int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count(
          // 5.2.判断是否存在
          if (count > 0) {
              // 用户已经购买过了
              log.error("用户已经购买过了");
              return;
          }
          // 6.扣减库存
          boolean success = seckillVoucherService.update()
                  .setSql("stock = stock - 1") // set stock = stock - 1
                  .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stoc
                  .update();
          if (!success) {
              // 扣减失败
              log.error("库存不足");
              return;
          }
          save(voucherOrder);
      }
      // 用于线程池处理的任务
      // 当初始化完毕后,就会去从对列中去拿信息
      private class VoucherOrderHandler implements Runnable {
          @Override
          public void run() {
              while (true) {
                  try {
                      // 1.获取队列中的订单信息
                      VoucherOrder voucherOrder = orderTasks.take();
                      // 2.创建订单
                      handleVoucherOrder(voucherOrder);
                  } catch (Exception e) {
                      log.error("处理订单异常", e);
                  }
              }
          }
          // 订单放入redis
          private void handleVoucherOrder(VoucherOrder voucherOrder) {
              //1.获取用户
              Long userId = voucherOrder.getUserId();
              // 2.创建锁对象
              RLock redisLock = redissonClient.getLock("lock:order:" + userId);
              // 3.尝试获取锁
              boolean isLock = redisLock.lock();
              // 4.判断是否获得锁成功
              if (!isLock) {
                  // 获取锁失败,直接返回失败或者重试
                  log.error("不允许重复下单!");
                  return;
              }
              try {
                  //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
                  proxy.createVoucherOrder(voucherOrder);
              } finally {
                  // 释放锁
                  redisLock.unlock();
              }
          }
      }
      
    • 问题

      • 内存限制
      • 数据安全
  • 基于Redis的Stream结构作为消息队列,实现异步秒杀下单

    • 创建一个Stream类型的消息队列,名为stream.orders
    • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
    • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

    修改lua表达式,新增3.6

    image-20221114150749099
    image-20221114150749099

    实现类

    private class VoucherOrderHandler implements Runnable {
    
        @Override
        public void run() {
            while (true) {
                try {
                    // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                    );
                    // 2.判断订单信息是否为空
                    if (list == null || list.isEmpty()) {
                        // 如果为null,说明没有消息,继续下一次循环
                        continue;
                    }
                    // 解析数据
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 3.创建订单
                    createVoucherOrder(voucherOrder);
                    // 4.确认消息 XACK
                    stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单异常", e);
                    //处理异常消息
                    handlePendingList();
                }
            }
        }
    
        private void handlePendingList() {
            while (true) {
                try {
                    // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1),
                        StreamOffset.create("stream.orders", ReadOffset.from("0"))
                    );
                    // 2.判断订单信息是否为空
                    if (list == null || list.isEmpty()) {
                        // 如果为null,说明没有异常消息,结束循环
                        break;
                    }
                    // 解析数据
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 3.创建订单
                    createVoucherOrder(voucherOrder);
                    // 4.确认消息 XACK
                    stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理pendding订单异常", e);
                    try{
                        Thread.sleep(20);
                    }catch(Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

消息队列

基于List实现消息队列

Redis的list数据结构是一个双向链表,很容易模拟出队列效果

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现

不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息,因此这里应该使用BRPOP或者BLPOP来实现阻塞效果

blpop key timeout			#取一个,设置超时时间,临时阻塞
brpop key timeout

优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE channel [channel] :订阅一个或多个频道 PUBLISH channel msg :向一个频道发送消息 PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

SUBSCRIBE channel [channel] 		#订阅一个或多个频道
PUBLISH channel msg					#向一个频道发送消息
PSUBSCRIBE pattern[pattern] 		#订阅与pattern格式匹配的所有频道

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream的消息队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:

image-20221114145730726
image-20221114145730726

读取消息的方式之一:XREAD

image-20221114145800660
image-20221114145800660
image-20221114145806812
image-20221114145806812

XREAD阻塞方式,读取最新的消息:

image-20221114145820418
image-20221114145820418

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

image-20221114145837269
image-20221114145837269

注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

image-20221114150459861
image-20221114150459861

创建消费者组:

image-20221114150535384
image-20221114150535384

key:队列名称 groupName:消费者组名称 ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息 MKSTREAM:队列不存在时自动创建队列 其它常见命令:

删除指定的消费者组

XGROUP DESTORY key groupName

给指定的消费者组添加消费者

XGROUP CREATECONSUMER key groupname consumername

删除消费者组中的指定消费者

XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:

">":从下一个未消费的消息开始 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

消费者监听消息的基本思路:

image-20221114150552194
image-20221114150552194

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

最后我们来个小对比

image-20221114150601907
image-20221114150601907

排行榜

点赞

实现点赞,用到set的不可重复

需求:

  • 同一个用户只能点赞一次,再次点击则取消点赞
  • 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)

修改查询业务实现:

  • 给Blog类中添加一个isLike字段,标示是否被当前用户点赞

    @TableField(exist = false)
    private Boolean isLike;
    
  • 查询blog时设置isLike

    • 获取登录用户
    • 判断是否点过赞stringRedisTemplate.opsForSet().isMember(key, userId.toString())
      • 未点赞,setIsLike(false)
      • 已点赞,setIsLike(true)

点赞业务实现:

  • 获取登录用户
  • 判断是否点过赞stringRedisTemplate.opsForSet().isMember(key, userId.toString())
    • 未点赞,则可以点赞(更新数据库,赞数+1,用户id放入redis的set中)
    • 已点赞,则取消点赞(更新数据库,赞数-1,用户id在redis中移除)

点赞排行

实现排序,用到zset即sortedSet

修改点赞业务实现:

  • 获取登录用户
  • 判断是否点过赞stringRedisTemplate.opsForZSet().score(key, userId.toString())
    • 未点赞,则可以点赞(更新数据库,赞数+1,用户id放入redis的set中)
    • 已点赞,则取消点赞(更新数据库,赞数-1,用户id在redis中移除)

查询top5

  • 查询top5的点赞用户 zrange key 0 4stringRedisTemplate.opsForZSet().range(key, 0, 4)
    • 没有,返回空集合Collections.emptyList()
    • 有,拿到id,查库

关注

关注 - 取关

关注业务

  • 获取登录用户
  • 判断到底是关注还是取关
    • 关注 --- 更新数据库
    • 取关 --- 更新数据库

查询业务

  • 获取登录用户
  • 查询是否关注count > 0

共同关注

使用set交并补的api

关注业务

  • 获取登录用户
  • 判断到底是关注还是取关
    • 关注 --- 更新数据库,放入redis的setstringRedisTemplate.opsForSet().add(key, followUserId.toString())
    • 取关 --- 更新数据库,删除redis stringRedisTemplate.opsForSet().remove(key, followUserId.toString())

共同关注

  • 获取当前用户
  • 求交集stringRedisTemplate.opsForSet().intersect(key, key2)
    • 无交集,返回空集合Collections.emptyList()
    • 有交集,拿到id集合查库

推送

Feed流推送

传统:通过查询被动呈现

Feed:主动推送(投喂)

常见模式

  • Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈

    • 优点:信息全面,不会有缺失。并且实现也相对简单
    • 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
  • 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户

    • 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
    • 缺点:如果算法不精准,可能起到反作用

Timeline实现方案:

  • 拉模式:也叫做读扩散

    博主将消息存放在自己发件箱,等粉丝打开收件箱读取时,向每个博主发件箱拉取消息,进行排序

    • 优点:比较节约空间,因为粉丝读信息时,并没有重复读取,读取完之后可以清除收件箱

    • 缺点:比较延迟,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大

  • 推模式:也叫做写扩散。

    博主直接推送到粉丝收件箱

    • 优点:时效快,不用临时拉取

    • 缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去

  • 推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。

    推拉模式是一个折中的方案,站在博主一端

    • 如果是个普通的人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力
    • 如果是大V,那么他是直接将数据先写入到一份到发件箱里边去
      • 对于活跃粉丝,直接发送
      • 不活跃粉丝,等待拉取

    站在收件人这端

    • 如果是活跃粉丝,大V和普通博主发的都会直接写入到自己收件箱里
    • 如果是普通粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息

需求:

  • 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
  • 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
  • 查询收件箱数据时,可以实现分页查询

传统分页失效:Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。

image-20221114211814455
image-20221114211814455

Feed流的滚动分页:需要记录每次操作的最后一条,然后从这个位置开始去读取数据

image-20221114170959145
image-20221114170959145

修改新增blog业务

  • 获取登录用户
  • 保存blog
  • 查询所有粉丝
  • 推送笔记id给所有粉丝stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis())

分页查询业务

  • 每次查询完成后,需要得到数据的最小时间戳,这个值会作为下一次查询的条件

  • 下次查询需要找到与上一次查询相同的查询个数,作为偏移量,跳过这些查询过的数据

综上:我们的请求参数中就需要携带上一次查询的最小时间戳( lastId)、偏移量(offset)这两个参数

这两个参数第一次会由前端来指定,以后的查询就根据后台结果作为条件,再次传递到后台

  • 获取当前用户

  • 查询收件箱

    Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, lastId, offset, 2)
    
  • 非空判断 --- null直接返回

  • 解析blogId、minTime(时间戳)、offset

    List<Long> ids = new ArrayList<>(typedTuples.size());
    long minTime = 0; // 最小时间戳.
    int os = 1; // 相同时间的偏移量
    for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { 
        // 4.1.获取id,放入集合,用于查询blog内容
        ids.add(Long.valueOf(tuple.getValue()));
        // 4.2.获取分数(时间戳)
        long time = tuple.getScore().longValue();
        if(time == minTime){
            os++;	// 相同时间戳,os++
        }else{		// 时间戳不同,表示当前时间戳的blog全部浏览完,重置最小时间戳和相同时间戳偏移量
            minTime = time;
            os = 1;
        }
    }
    
  • 根据id查询blog

    • 查询blog有关的用户 --- blog内容包含用户信息(需要加排序
    • 查询blog是否被点赞
  • 封装并返回

    ScrollResult r = new ScrollResult();
    r.setList(blogs);		// blog集合
    r.setOffset(os);		// 偏移量
    r.setMinTime(minTime);	// 最小时间戳
    
    return Result.ok(r);
    

互相关注

MVP技术方案V0

mvp:最小可用版本

  • 建模:互关功能比较简单,一般来说,除了用户主表,只需要一张中间表来存储用户之间的关系即可

    • 中间表:follower(id,from_user_id(粉丝),to_user_id(博主))
  • 接口逻辑

    • 关注某人:在follower中添加一条from_user_id为当前登录用户,to_user_id为被关注者id

    • 取关某人:将follower中from_user_id为当前登录用户,并且to_user_id为某人Id的数据删除,或逻辑删除

    • 粉丝列表:查询to_user_id为某用户Id的所有用户Id,去user表查询用户缩略信息,比如头像,昵称等

      展示是否互关,根据博主id查询粉丝id,再将粉丝id作为to_user_id查from_user_id,然后进行一一匹配,可以匹配上的,可以认为是已经互关,在Vo层展示的时候可以加一个是否互关字段,用于展示是否互关

    • 我的关注:查询from_user_id为当前登录用户的所有用户id,然后去user表查询用户缩略信息

      同理,展示是否互关,还需要再查询to_user_id为当前登录用户的所有用户id,然后进行一一匹配

    • 关注数量、粉丝数量:查询from_user_id等于当前登录用户的数量、from_user_id等于当前登录用户的数量

  • 部署方案:MVP部署方案,旨在首先将功能跑通,然后再在此基础上进行部署架构的优化

    依赖组件:

    • nginx:代理页面静态资源,和api

    • tomcat:部署应用

    • mysql:持久化数据

  • 不足

    • 主键使用的数据库自带的id,这个无法应对分库分表的场景
    • 查询互关的时候,需要查两遍库,增加IO次数,并且需要匹配,也会降低接口性能
    • 获取用户粉丝数和关注数时,使用count的方法,在数据量级上来之后,还是会导致性能问题
    • follower表的数据是基本成指数级增长,比如,如果系统有1W的用户量,最糟糕的情况下,可能会产生小2亿的数据(不过一般达不到),但随着用户量的增加,这个表数据量会非常惊人
    • MVP版本部署,首先系统的吞吐量会比较低,其次,服务是处于一个无法容灾的状态,完全没有高可用

改进V1

  • 对应MVP的解决方案
    • 增加uid(分布式ID),作为业务主键,方便后续分库分表
    • 在follower表中增加is_mutual_follwer标识是否互关,在关注接口获取,取关接口对此值进行修改,可以提升查询时的速度。因为正常情况下都是读多写少
    • 增加user_follower_info用来保存用户的关注数和粉丝数,避免使用count(*)导致性能问题
    • 对follower表进行拆分,拆成follwer_from表和follower_to表,分别表示用户关注和用户粉丝,方便后续对表水平拆分
    • 冷热分离,为了进一步对表进行拆分,减少对大表的检索,将follwer_from表和follower_to表,分别拆分为follwer_from_hot表、follwer_from_cold表、follower_to_hot表和follwer_to_cold表,用来存储热点数据,按关注的先后顺序区分热点数据,在follwer_from_hot表,只保存每个人用户前100的数据(或者业务自己定),当新关注其他人时,先检查数量是否大于100,然后如果大于则将最旧挪到follwer_from_cold表,然后再将新数据插入此表。follwer_to_hot同理。此优化主要针对,大部分情况下,用户都是只看前面关注数,不会深度查询,这样可以增加较快的查询效率,尤其针对一些大V账号,动不动都是百万甚至千万级别的粉丝,效果尤其显著
  • 部署
    • nginx提供负载均衡
    • tomcat集群
    • 数据库有MHA
  • 不足
    • 用户的基本信息、follwer_from_hot和follwer_to_hot、关注数和粉丝数缓存至redis
    • 缓存一致性问题:加MQ

改进V2

image-20221114215024326
image-20221114215024326
  • 不足

    此部署架构,基本就是一个可以应对较高并发的架构了,但是像微博这样的并发量,此架构显然无法承载,因为,仅考虑国内用户的话,基本就分为了三大区域

    华南、华中、华北,而这三大区域的网络运营商是不一样的,所以,如果服务器部署在华北,很有可能造成华中和华南用户的体验不如华北,所以就需要多机房部署,应对不同地区的用户,当然,上面的nginx还是一台,这个显然也是无法满足的,所以,上SLB也是势在必行。因此,我们进一步优化

改进V3

V4_version
V4_version

至此,基本已经可以应对高并发了,如果数据量不断增加的话,可以通过使用Mycat或者sharding JDBC进行分库分表增加分布式任务调度是为了保证缓存和数据库的一致性,因为不能单纯靠rocketmq去保证一致性,增加日志服务为了后续审计数据,增加监控服务,快速感知应用服务和中间件等服务的状态。当然,代码层面依然有优化的空间,比如,可以使用WebFlux和complatablefuture进行编程,将有所有方法改成异步,这样可以进一步提升系统的吞吐量。并且可以增加一级缓存,继续提升系统性能,采用mq进行一级缓存和redis直接数据同步

GEO数据类型

GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:

它只是一个zset结构

  • GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
  • GEODIST:计算指定的两个点之间的距离并返回
  • GEOHASH:将指定member的坐标转为hash字符串形式并返回
  • GEOPOS:返回指定member的坐标
  • GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃
  • GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
  • GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能

附近

api

  • 请求方式 GET
  • 请求路径 /shop/of/type
  • 请求参数
    • typeId:商户类型
    • current:滚动查询页码
    • x:经度
    • y:纬度
  • 返回值 List<Shop>

导入Redis

将数据库表中的数据导入到redis中

  • redis中的GEO,GEO在redis中就一个menber和一个经纬度
  • 把x和y轴传入到redis做的经纬度位置去,menber只存id(内存有限)

根据type筛选

  • redis中并没有存储type,所以我们无法根据type来对数据进行筛选

  • 按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可

    如:

    #		typeId	x		y	shopId
    geoadd huoguo 116.25 34.44 haidilao
    geoadd huoguo 116.25 34.44 xiapuxiapu
    geoadd xiaochi 116.25 34.44 mixian
    geoadd xiaochi 116.25 34.44 shaokao
    
  • 查询店铺信息

  • 把店铺分组,按照typeId分组,typeId一致的放到一个集合

  • 分批完成写入Redis

    • 获取类型typeId
    • 获取同类型的店铺的集合
    • 写入redis GEOADD key 经度 纬度 member
    @Test
    void loadShopData() {
        // 1.查询店铺信息
        List<Shop> list = shopService.list();
        // 2.把店铺分组,按照typeId分组,typeId一致的放到一个集合
        Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
        // 3.分批完成写入Redis
        for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
            // 3.1.获取类型id
            Long typeId = entry.getKey();
            String key = SHOP_GEO_KEY + typeId;
            // 3.2.获取同类型的店铺的集合
            List<Shop> value = entry.getValue();
            List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
            // 3.3.写入redis GEOADD key 经度 纬度 member
            for (Shop shop : value) {
                // stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString());
                locations.add(new RedisGeoCommands.GeoLocation<>(
                        shop.getId().toString(),
                        new Point(shop.getX(), shop.getY())
                ));
            }
            stringRedisTemplate.opsForGeo().add(key, locations);
        }
    }
    

查询附近业务

  • 判断是否需要根据坐标查询

    @Override
    public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
        // 1.判断是否需要根据坐标查询
        if (x == null || y == null) {
            // 不需要坐标查询,按数据库查询
            Page<Shop> page = query()
                    .eq("type_id", typeId)
                    .page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
            // 返回数据
            return Result.ok(page.getRecords());
        }
    
        // 2.计算分页参数
        int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
        int end = current * SystemConstants.DEFAULT_PAGE_SIZE;
    
        // 3.查询redis、按照距离排序、分页。结果:shopId、distance
        String key = SHOP_GEO_KEY + typeId;
        GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDISTANCE
                .search(
                        key,
                        GeoReference.fromCoordinate(x, y),
                        new Distance(5000),
                        RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
                );
        // 4.解析出id
        if (results == null) {
            return Result.ok(Collections.emptyList());
        }
        List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
        if (list.size() <= from) {
            // 没有下一页了,结束
            return Result.ok(Collections.emptyList());
        }
        // 4.1.截取 from ~ end的部分
        List<Long> ids = new ArrayList<>(list.size());
        Map<String, Distance> distanceMap = new HashMap<>(list.size());
        list.stream().skip(from).forEach(result -> {
            // 4.2.获取店铺id
            String shopIdStr = result.getContent().getName();
            ids.add(Long.valueOf(shopIdStr));
            // 4.3.获取距离
            Distance distance = result.getDistance();
            distanceMap.put(shopIdStr, distance);
        });
        // 5.根据id查询Shop
        String idStr = StrUtil.join(",", ids);
        List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
        for (Shop shop : shops) {
            shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
        }
        // 6.返回
        return Result.ok(shops);
    }
    

签到

数据库太浪费,而且并发下,数据库压力大

位图:把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态

Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是 2^32个bit位

BitMap的操作命令有:

  • SETBIT:向指定位置(offset)存入一个0或1
  • GETBIT :获取指定位置(offset)的bit值
  • BITCOUNT :统计BitMap中值为1的bit位的数量
  • BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
  • BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
  • BITOP :将多个BitMap的结果做位运算(与 、或、异或)
  • BITPOS :查找bit数组中指定范围内第一个0或1出现的位置

用户签到

  • 获取当前登录用户

  • 获取日期

  • 拼接key

  • 获取今天是本月的第几天

  • 写入Redis SETBIT key offset 1

    @Override
    public Result sign() {
        // 1.获取当前登录用户
        Long userId = UserHolder.getUser().getId();
        // 2.获取日期
        LocalDateTime now = LocalDateTime.now();
        // 3.拼接key
        String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
        String key = USER_SIGN_KEY + userId + keySuffix;
        // 4.获取今天是本月的第几天
        int dayOfMonth = now.getDayOfMonth();
        // 5.写入Redis SETBIT key offset 1
        stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
        return Result.ok();
    }
    

签到统计

**问题1:**什么叫做连续签到天数?

从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数

Java逻辑代码:获得当前这个月的最后一次签到数据,定义一个计数器,然后不停的向前统计,直到获得第一个非0的数字即可,每得到一个非0的数字计数器+1,直到遍历完所有的数据,就可以获得当前月的签到总天数了

**问题2:**如何得到本月到今天为止的所有签到数据?

BITFIELD key GET u[dayOfMonth] 0

假设今天是10号,那么我们就可以从当前月的第一天开始,获得到当前这一天的位数,是10号,那么就是10位,去拿这段时间的数据,就能拿到所有的数据了,那么这10天里边签到了多少次呢?统计有多少个1即可

问题3:如何从后向前遍历每个bit位?

注意:bitMap返回的数据是10进制,假如说返回一个数字8,那么我哪儿知道到底哪些是0,哪些是1呢?我们只需要让得到的10进制数字和1做与运算就可以了,因为1只有遇见1 才是1,其他数字都是0 ,我们把签到结果和1进行与操作,每与一次,就把签到结果向右移动一位,依次内推,我们就能完成逐个遍历的效果了。

需求:统计当前用户截止当前时间在本月的连续签到天数

有用户有时间我们就可以组织出对应的key,此时就能找到这个用户截止这天的所有签到记录,再根据这套算法,就能统计出来他连续签到的次数了

@Override
public Result signCount() {
    // 1.获取当前登录用户
    Long userId = UserHolder.getUser().getId();
    // 2.获取日期
    LocalDateTime now = LocalDateTime.now();
    // 3.拼接key
    String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
    String key = USER_SIGN_KEY + userId + keySuffix;
    // 4.获取今天是本月的第几天
    int dayOfMonth = now.getDayOfMonth();
    // 5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0
    // bitField可以多个子命令同时查询,故需要集合封装返回结果,这里其实集合中就一个元素
    List<Long> result = stringRedisTemplate.opsForValue().bitField(
            key,
            BitFieldSubCommands.create()
                    .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
    );
    if (result == null || result.isEmpty()) {
        // 没有任何签到结果
        return Result.ok(0);
    }
    Long num = result.get(0);
    if (num == null || num == 0) {
        return Result.ok(0);
    }
    // 6.循环遍历
    int count = 0;
    while (true) {
        // 6.1.让这个数字与1做与运算,得到数字的最后一个bit位  // 判断这个bit位是否为0
        if ((num & 1) == 0) {
            // 如果为0,说明未签到,结束
            break;
        }else {
            // 如果不为0,说明已签到,计数器+1
            count++;
        }
        // 把数字右移一位,抛弃最后一个bit位,继续下一个bit位
        num >>>= 1;
    }
    return Result.ok(count);
}

UV统计

通常来说PV会比UV大很多,所以衡量同一个网站的访问量,我们需要综合考虑很多因素,所以我们只是单纯的把这两个值作为一个参考值

  • UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
  • PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存

但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖,那怎么处理呢?

HyperLogLog

天生唯一性,重复数据只记录一次

Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。

相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0

Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略

指令

image-20221114210107347
image-20221114210107347

UV统计测试

测试思路:我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何

image-20221114210920255
image-20221114210920255

经过测试:我们会发生他的误差是在允许范围内,并且内存占用极小

上次编辑于:
贡献者: 李元昊