Redis 应用
Redis 应用
短信登录
session实现

发送、登录、注册
- 发送验证码
- 校验手机号格式
RegexUtils.isPhoneInvalid(phone) - 生成验证码
RandomUtil.randomNumbers(6) - 保存到session
session.setAttribute("code",code) - 发送验证码
调用第三方api
- 校验手机号格式
- 登录/注册
- 校验手机号
- 获取验证码
session.getAttribute("code") - 校验验证码
cacheCode == null || !cacheCode.toString().equals(code) - 根据手机号
查库- 不存在,根据手机号创建用户
- 存在,保存到session
session.setAttribute("user",user)
登录拦截
登陆拦截
public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { //1.获取session HttpSession session = request.getSession(); //2.获取session中的用户 Object user = session.getAttribute("user"); //3.判断用户是否存在 if(user == null){ //4.不存在,拦截,返回401状态码 response.setStatus(401); return false; } //5.存在,保存用户信息到Threadlocal UserHolder.saveUser((User)user); //6.放行 return true; } }@Configuration public class MvcConfig implements WebMvcConfigurer { @Resource private StringRedisTemplate stringRedisTemplate; @Override public void addInterceptors(InterceptorRegistry registry) { // 登录拦截器 registry.addInterceptor(new LoginInterceptor()) .excludePathPatterns( "/shop/**", "/voucher/**", "/shop-type/**", "/upload/**", "/blog/hot", "/user/code", "/user/login" ).order(1); // token刷新的拦截器 registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0); } }
隐藏敏感信息
隐藏用户敏感信息
创建VO或TO,无敏感信息进行返回
登录时返回vo对象
session.setAttribute("user", BeanUtils.copyProperties(user,UserDTO.class))拦截器中vo保存到ThreadLocal中
UserHolder.saveUser((UserDTO) user)在UserHolder处:将user对象换成UserDTO
public class UserHolder { private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>(); public static void saveUser(UserDTO user){ tl.set(user); } public static UserDTO getUser(){ return tl.get(); } public static void removeUser(){ tl.remove(); } }
redis实现
- session的问题:分布式下的tomcat中的session不共享,这台机器有,换台机器就没了,登录拦截存在问题,可以拷贝session到多台服务器
- 每台都有,浪费,服务器压力大
- 拷贝存在延迟
- 数据类型的选择
- string:更直观,但是每次增删改查都是完整大对象,性能不好,数据量小时可以考虑
- hash:value是键值对,更新时只需要更新对应的属性即可,性能好

隐藏敏感信息
- 敏感信息保护
- 以手机号为key容易暴露隐私
- 随机生成以一个token令牌更合适
发送、登录、注册
发送验证码
- 校验手机号格式
RegexUtils.isPhoneInvalid(phone) - 生成验证码
RandomUtil.randomNumbers(6) - 保存到redis
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone) - 发送验证码
调用第三方api
- 校验手机号格式
登录/注册
校验手机号
从redis获取验证码
stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone)校验验证码
根据手机号
查库不存在,根据手机号创建用户
存在,保存到redis
// 7.1.随机生成token,作为登录令牌 String token = UUID.randomUUID().toString(true); // 7.2.将User对象转为HashMap存储 UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(), CopyOptions.create() .setIgnoreNullValue(true) .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString())); // 7.3.存储 String tokenKey = LOGIN_USER_KEY + token; stringRedisTemplate.opsForHash().putAll(tokenKey, userMap); // 7.4.设置token有效期 stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
拦截器
拦截器

image-20221113223545624 问题:只拦截登录路径,其他路径不拦截,导致token有效期不能刷新
解决:再加一个拦截器

image-20221113223729851 public class RefreshTokenInterceptor implements HandlerInterceptor { private StringRedisTemplate stringRedisTemplate; public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 1.获取请求头中的token String token = request.getHeader("authorization"); if (StrUtil.isBlank(token)) { return true; } // 2.基于TOKEN获取redis中的用户 String key = LOGIN_USER_KEY + token; Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key); // 3.判断用户是否存在 if (userMap.isEmpty()) { return true; } // 5.将查询到的hash数据转为UserDTO UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false); // 6.存在,保存用户信息到 ThreadLocal UserHolder.saveUser(userDTO); // 7.刷新token有效期 stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES); // 8.放行 return true; } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { // 移除用户 UserHolder.removeUser(); } }public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { // 1.判断是否需要拦截(ThreadLocal中是否有用户) if (UserHolder.getUser() == null) { // 没有,需要拦截,设置状态码 response.setStatus(401); // 拦截 return false; } // 有用户,则放行 return true; } }
秒杀
分布式ID
优惠券的id
需要全局唯一
若使用订单的自增id存在问题:
- 规律明显,容易猜出敏感信息,如订单量
- 受单表数据量限制,数据量大需要拆表,id可能一样,但逻辑上还是一张表,必须保证id唯一
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具
常见:UUID、雪花算法、redis自增ID等
redis自增ID一般要满足下列特性
- 唯一性
- 递增
- 安全
- 高可用
- 高性能
redis自增ID组成
- 符号位:1bit,永远为0
- 时间戳:31bit,以秒为单位,可以使用69年
- 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
@Component public class RedisIdWorker { /** * 开始时间戳 */ private static final long BEGIN_TIMESTAMP = 1640995200L; /** * 序列号的位数 */ private static final int COUNT_BITS = 32; private StringRedisTemplate stringRedisTemplate; public RedisIdWorker(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } public long nextId(String keyPrefix) { // 1.生成时间戳 LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; // 2.生成序列号 // 2.1.获取当前日期,精确到天 String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); // 2.2.自增长 long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); // 3.拼接并返回 return timestamp << COUNT_BITS | count; } }redis自增ID测试
@Test void testIdWorker() throws InterruptedException { CountDownLatch latch = new CountDownLatch(300); Runnable task = () -> { for (int i = 0; i < 100; i++) { long id = redisIdWorker.nextId("order"); System.out.println("id = " + id); } latch.countDown(); }; long begin = System.currentTimeMillis(); for (int i = 0; i < 300; i++) { es.submit(task); } latch.await(); long end = System.currentTimeMillis(); System.out.println("time = " + (end - begin)); }CountDownLatch名为信号枪:主要的作用是同步协调在多线程的等待于唤醒问题
当异步程序没有执行完时,主线程已经执行完,我们期望的是分线程全部走完,主线程再走
CountDownLatch 中有两个最重要的方法
- countDown
- await
await 方法 是阻塞方法,我们担心分线程没有执行完时,main线程就先执行,所以使用await可以让main线程阻塞
当CountDownLatch 内部维护的 变量变为0时,就不再阻塞,直接放行
- 调用一次countDown ,内部变量就减少1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,直到解除阻塞,统计出来的时间也就是所有分线程执行完后的时间
秒杀逻辑
- 查询优惠券
- 判断是否具备秒杀条件
- 秒杀是否开始
- 秒杀是否结束
- 库存是否充足
- 一人一券一单:查库:是否用券下过单(存在并发问题)
- 扣减库存(存在并发问题)
- 创建订单(订单id,用户id,优惠券id)(存在并发问题)
并发问题
乐观锁比较适合更新数据,而插入数据适合使用悲观锁
扣减库存 --- 乐观锁
每次扣减的sql加上库存的判断条件
- stock = xxx.getStock()
- 但是同时进来的其他线程就永远无法满足上述条件,因为库存已经比getStock少一个
- 故,只要 stock > 0 即可
cas自旋压力过大
AtomicLong中cas自旋压力过大,销毁cpu
java8的改进,LongAdder
大量线程并发更新一个原子性的时候,天然的问题就是自旋,会导致并发性问题,当然这也比我们直接使用syn来的好
LongAdder来进行优化,如果获取某个值,则会对cell和base的值进行递增,最后返回一个完整的值

image-20221113232703291
一人一券一单(恶意刷单) --- 悲观锁
锁
降低锁粒度,不能使用同步方法或this锁,只需要锁住当前用户
userId为保证是同一把锁,每个线程进来的userId不是同一个对象,所以
userId.toString()但包装类toString()是new String(),仍不是同一个对象,故
userId.toString().intern()@Transactional public Result createVoucherOrder(Long voucherId) { Long userId = UserHolder.getUser().getId(); synchronized(userId.toString().intern()){ // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count(); // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 return Result.fail("用户已经购买过一次!"); } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0 .update(); if (!success) { // 扣减失败 return Result.fail("库存不足!"); } // 7.创建订单 VoucherOrder voucherOrder = new VoucherOrder(); // 7.1.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 7.2.用户id voucherOrder.setUserId(userId); // 7.3.代金券id voucherOrder.setVoucherId(voucherId); save(voucherOrder); // 7.返回订单id return Result.ok(orderId); } }
事务与锁
@Transactional方法被spring的事务控制,
同步代码块执行完,锁释放
但当前方法事务未提交
新的线程拿到锁,造成并发
Long userId = UserHolder.getUser().getId(); synchronized(userId.toString().intern()){ return this.createVoucherOrder(voucherId); }
事务
spring的事务是AOP实现的,也就是代理对象处理的事务,非代理对象执行事务方法不生效
this.的方式调用的事务不生效,需要利用代理对象调用来生效
Long userId = UserHolder.getUser().getId(); synchronized(userId.toString().intern()){ // 获取代理对象 IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); }
集群下的并发问题 --- 分布式锁
public interface ILock{ boolean tryLock(long timeoutSec); void unlock(); }public class SimpleRedisLock implements ILock{ private static final String KEY_PREFIX="lock:" @Override public boolean tryLock(long timeoutSec) { // 获取线程标示 String threadId = Thread.currentThread().getId() // 获取锁 Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } public void unlock() { //通过del删除锁 stringRedisTemplate.delete(KEY_PREFIX + name); } }//创建锁对象(新增代码) SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); //获取锁对象 boolean isLock = lock.tryLock(1200); //加锁失败 if (!isLock) { return Result.fail("不允许重复下单"); } try { //获取代理对象(事务) IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { //释放锁 lock.unlock(); }加锁的原子性问题(死锁)(上述代码不存在)
锁误删
- 好多线程都在等锁释放
- 但是
线程1没有手动释放,到期后自动释放 线程2拿到锁线程1开始手动释放删除锁,误把线程2的删除了
解决方案:在占锁时加上或者设为UUID值。删除时进行判断
删除的原子性问题
自动续期问题 --- Redisson
//获取锁对象 RLock lock = redissonClient.getLock("lock:order:" + userId); boolean isLock = lock.tryLock(); //看门狗时间30s //加锁失败 if (!isLock) { return Result.fail("不允许重复下单"); } try { //获取代理对象(事务) IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { //释放锁 lock.unlock(); }
优化
- 将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单
- 这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的
- 我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功,
- 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池
- 快速校验一人一单,还有库存判断
- 库存放到redis查
- 一人一单放到redis的set集合(用户id、优惠券id)
- redis没有订单,把订单信息放到queue中,异步完成订单
- 下单成功,获取订单id,返回给前端
Redis完成秒杀资格判断
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
-- 1.参数列表 -- 1.1.优惠券id local voucherId = ARGV[1] -- 1.2.用户id local userId = ARGV[2] -- 1.3.订单id local orderId = ARGV[3] -- 2.数据key -- 2.1.库存key local stockKey = 'seckill:stock:' .. voucherId -- 2.2.订单key local orderKey = 'seckill:order:' .. voucherId -- 3.脚本业务 -- 3.1.判断库存是否充足 get stockKey if(tonumber(redis.call('get', stockKey)) <= 0) then -- 3.2.库存不足,返回1 return 1 end -- 3.2.判断用户是否下单 SISMEMBER orderKey userId if(redis.call('sismember', orderKey, userId) == 1) then -- 3.3.存在,说明是重复下单,返回2 return 2 end -- 3.4.扣库存 incrby stockKey -1 redis.call('incrby', stockKey, -1) -- 3.5.下单(保存用户)sadd orderKey userId redis.call('sadd', orderKey, userId) -- 3.6.发送消息到队列中, XADD stream.orders * k1 v1 k2 v2 ... redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
@Override public Result seckillVoucher(Long voucherId) { //获取用户 Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); // 2.判断结果是否为0 if (r != 0) { // 2.1.不为0 ,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } //TODO 保存阻塞队列 // 3.返回订单id return Result.ok(orderId); }
基于阻塞队列实现秒杀优化
开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
实现类完整代码
//异步处理线程池 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); // 消息队列 private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024); // 代理对象 private IVoucherOrderService proxy; //在类初始化之后执行,因为当这个类初始化好了之后,随时都是有可能要执行的 @PostConstruct private void init() { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } // 业务逻辑主体 @Override public Result seckillVoucher(Long voucherId) { Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); // 1.执行lua脚本 Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); int r = result.intValue(); // 2.判断结果是否为0 if (r != 0) { // 2.1.不为0 ,代表没有购买资格 return Result.fail(r == 1 ? "库存不足" : "不能重复下单"); } VoucherOrder voucherOrder = new VoucherOrder(); // 2.3.订单id long orderId = redisIdWorker.nextId("order"); voucherOrder.setId(orderId); // 2.4.用户id voucherOrder.setUserId(userId); // 2.5.代金券id voucherOrder.setVoucherId(voucherId); // 2.6.放入阻塞队列 orderTasks.add(voucherOrder); //3.获取代理对象 proxy = (IVoucherOrderService) AopContext.currentProxy(); //4.返回订单id return Result.ok(orderId); } @Transactional public void createVoucherOrder(VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); // 5.1.查询订单 int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count( // 5.2.判断是否存在 if (count > 0) { // 用户已经购买过了 log.error("用户已经购买过了"); return; } // 6.扣减库存 boolean success = seckillVoucherService.update() .setSql("stock = stock - 1") // set stock = stock - 1 .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stoc .update(); if (!success) { // 扣减失败 log.error("库存不足"); return; } save(voucherOrder); } // 用于线程池处理的任务 // 当初始化完毕后,就会去从对列中去拿信息 private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { // 1.获取队列中的订单信息 VoucherOrder voucherOrder = orderTasks.take(); // 2.创建订单 handleVoucherOrder(voucherOrder); } catch (Exception e) { log.error("处理订单异常", e); } } } // 订单放入redis private void handleVoucherOrder(VoucherOrder voucherOrder) { //1.获取用户 Long userId = voucherOrder.getUserId(); // 2.创建锁对象 RLock redisLock = redissonClient.getLock("lock:order:" + userId); // 3.尝试获取锁 boolean isLock = redisLock.lock(); // 4.判断是否获得锁成功 if (!isLock) { // 获取锁失败,直接返回失败或者重试 log.error("不允许重复下单!"); return; } try { //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效 proxy.createVoucherOrder(voucherOrder); } finally { // 释放锁 redisLock.unlock(); } } }问题
- 内存限制
- 数据安全
基于Redis的Stream结构作为消息队列,实现异步秒杀下单
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
修改lua表达式,新增3.6

image-20221114150749099 实现类
private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 > List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create("stream.orders", ReadOffset.lastConsumed()) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有消息,继续下一次循环 continue; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 createVoucherOrder(voucherOrder); // 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId()); } catch (Exception e) { log.error("处理订单异常", e); //处理异常消息 handlePendingList(); } } } private void handlePendingList() { while (true) { try { // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create("stream.orders", ReadOffset.from("0")) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有异常消息,结束循环 break; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 createVoucherOrder(voucherOrder); // 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId()); } catch (Exception e) { log.error("处理pendding订单异常", e); try{ Thread.sleep(20); }catch(Exception e){ e.printStackTrace(); } } } } }
消息队列
基于List实现消息队列
Redis的list数据结构是一个双向链表,很容易模拟出队列效果
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息,因此这里应该使用BRPOP或者BLPOP来实现阻塞效果
blpop key timeout #取一个,设置超时时间,临时阻塞
brpop key timeout
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel] :订阅一个或多个频道 PUBLISH channel msg :向一个频道发送消息 PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
SUBSCRIBE channel [channel] #订阅一个或多个频道
PUBLISH channel msg #向一个频道发送消息
PSUBSCRIBE pattern[pattern] #订阅与pattern格式匹配的所有频道
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
发送消息的命令:


读取消息的方式之一:XREAD


XREAD阻塞方式,读取最新的消息:

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
基于Stream的消息队列-消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

创建消费者组:

key:队列名称 groupName:消费者组名称 ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息 MKSTREAM:队列不存在时自动创建队列 其它常见命令:
删除指定的消费者组
XGROUP DESTORY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动ACK,获取到消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始ID:
">":从下一个未消费的消息开始 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
消费者监听消息的基本思路:

STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
最后我们来个小对比

排行榜
点赞
实现点赞,用到set的不可重复
需求:
- 同一个用户只能点赞一次,再次点击则取消点赞
- 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
修改查询业务实现:
给Blog类中添加一个isLike字段,标示是否被当前用户点赞
@TableField(exist = false) private Boolean isLike;查询blog时设置isLike
- 获取登录用户
- 判断是否点过赞
stringRedisTemplate.opsForSet().isMember(key, userId.toString())- 未点赞,setIsLike(false)
- 已点赞,setIsLike(true)
点赞业务实现:
- 获取登录用户
- 判断是否点过赞
stringRedisTemplate.opsForSet().isMember(key, userId.toString())- 未点赞,则可以点赞(更新数据库,赞数+1,用户id放入redis的set中)
- 已点赞,则取消点赞(更新数据库,赞数-1,用户id在redis中移除)
点赞排行
实现排序,用到zset即sortedSet
修改点赞业务实现:
- 获取登录用户
- 判断是否点过赞
stringRedisTemplate.opsForZSet().score(key, userId.toString())- 未点赞,则可以点赞(更新数据库,赞数+1,用户id放入redis的set中)
- 已点赞,则取消点赞(更新数据库,赞数-1,用户id在redis中移除)
查询top5
- 查询top5的点赞用户 zrange key 0 4
stringRedisTemplate.opsForZSet().range(key, 0, 4)- 没有,返回空集合
Collections.emptyList() - 有,拿到id,查库
- 没有,返回空集合
关注
关注 - 取关
关注业务
- 获取登录用户
- 判断到底是关注还是取关
- 关注 --- 更新数据库
- 取关 --- 更新数据库
查询业务
- 获取登录用户
- 查询是否关注count > 0
共同关注
使用set中交并补的api
关注业务
- 获取登录用户
- 判断到底是关注还是取关
- 关注 --- 更新数据库,放入redis的set
stringRedisTemplate.opsForSet().add(key, followUserId.toString()) - 取关 --- 更新数据库,删除redis
stringRedisTemplate.opsForSet().remove(key, followUserId.toString())
- 关注 --- 更新数据库,放入redis的set
共同关注
- 获取当前用户
- 求交集
stringRedisTemplate.opsForSet().intersect(key, key2)- 无交集,返回空集合
Collections.emptyList() - 有交集,拿到id集合查库
- 无交集,返回空集合
推送
Feed流推送
传统:通过查询被动呈现
Feed:主动推送(投喂)
常见模式
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
- 优点:信息全面,不会有缺失。并且实现也相对简单
- 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
- 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
- 缺点:如果算法不精准,可能起到反作用
Timeline实现方案:
拉模式:也叫做读扩散
博主将消息存放在自己发件箱,等粉丝打开收件箱读取时,向每个博主发件箱拉取消息,进行排序
优点:比较节约空间,因为粉丝读信息时,并没有重复读取,读取完之后可以清除收件箱
缺点:比较延迟,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大
推模式:也叫做写扩散。
博主直接推送到粉丝收件箱
优点:时效快,不用临时拉取
缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去
推拉结合模式:也叫做读写混合,兼具推和拉两种模式的优点。
推拉模式是一个折中的方案,站在博主一端
- 如果是个普通的人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力
- 如果是大V,那么他是直接将数据先写入到一份到发件箱里边去
- 对于活跃粉丝,直接发送
- 不活跃粉丝,等待拉取
站在收件人这端
- 如果是活跃粉丝,大V和普通博主发的都会直接写入到自己收件箱里
- 如果是普通粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息
需求:
- 修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
- 收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
- 查询收件箱数据时,可以实现分页查询
传统分页失效:Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。

Feed流的滚动分页:需要记录每次操作的最后一条,然后从这个位置开始去读取数据

修改新增blog业务
- 获取登录用户
- 保存blog
- 查询所有粉丝
- 推送笔记id给所有粉丝
stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis())
分页查询业务
每次查询完成后,需要得到数据的最小时间戳,这个值会作为下一次查询的条件
下次查询需要找到与上一次查询相同的查询个数,作为偏移量,跳过这些查询过的数据
综上:我们的请求参数中就需要携带上一次查询的最小时间戳( lastId)、偏移量(offset)这两个参数
这两个参数第一次会由前端来指定,以后的查询就根据后台结果作为条件,再次传递到后台
获取当前用户
查询收件箱
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, lastId, offset, 2)非空判断 --- null直接返回
解析blogId、minTime(时间戳)、offset
List<Long> ids = new ArrayList<>(typedTuples.size()); long minTime = 0; // 最小时间戳. int os = 1; // 相同时间的偏移量 for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { // 4.1.获取id,放入集合,用于查询blog内容 ids.add(Long.valueOf(tuple.getValue())); // 4.2.获取分数(时间戳) long time = tuple.getScore().longValue(); if(time == minTime){ os++; // 相同时间戳,os++ }else{ // 时间戳不同,表示当前时间戳的blog全部浏览完,重置最小时间戳和相同时间戳偏移量 minTime = time; os = 1; } }根据id查询blog
- 查询blog有关的用户 --- blog内容包含用户信息(需要加排序)
- 查询blog是否被点赞
封装并返回
ScrollResult r = new ScrollResult(); r.setList(blogs); // blog集合 r.setOffset(os); // 偏移量 r.setMinTime(minTime); // 最小时间戳 return Result.ok(r);
互相关注
MVP技术方案V0
mvp:最小可用版本
建模:互关功能比较简单,一般来说,除了用户主表,只需要一张中间表来存储用户之间的关系即可
- 中间表:follower(id,from_user_id(粉丝),to_user_id(博主))
接口逻辑
关注某人:在follower中添加一条from_user_id为当前登录用户,to_user_id为被关注者id
取关某人:将follower中from_user_id为当前登录用户,并且to_user_id为某人Id的数据删除,或逻辑删除
粉丝列表:查询to_user_id为某用户Id的所有用户Id,去user表查询用户缩略信息,比如头像,昵称等
展示是否互关,根据博主id查询粉丝id,再将粉丝id作为to_user_id查from_user_id,然后进行一一匹配,可以匹配上的,可以认为是已经互关,在Vo层展示的时候可以加一个是否互关字段,用于展示是否互关
我的关注:查询from_user_id为当前登录用户的所有用户id,然后去user表查询用户缩略信息
同理,展示是否互关,还需要再查询to_user_id为当前登录用户的所有用户id,然后进行一一匹配
关注数量、粉丝数量:查询from_user_id等于当前登录用户的数量、from_user_id等于当前登录用户的数量
部署方案:MVP部署方案,旨在首先将功能跑通,然后再在此基础上进行部署架构的优化
依赖组件:
nginx:代理页面静态资源,和api
tomcat:部署应用
mysql:持久化数据
不足
- 主键使用的数据库自带的id,这个无法应对分库分表的场景
- 查询互关的时候,需要查两遍库,增加IO次数,并且需要匹配,也会降低接口性能
- 获取用户粉丝数和关注数时,使用count的方法,在数据量级上来之后,还是会导致性能问题
- follower表的数据是基本成指数级增长,比如,如果系统有1W的用户量,最糟糕的情况下,可能会产生小2亿的数据(不过一般达不到),但随着用户量的增加,这个表数据量会非常惊人
- MVP版本部署,首先系统的吞吐量会比较低,其次,服务是处于一个无法容灾的状态,完全没有高可用
改进V1
- 对应MVP的解决方案
- 增加uid(分布式ID),作为业务主键,方便后续分库分表
- 在follower表中增加is_mutual_follwer标识是否互关,在关注接口获取,取关接口对此值进行修改,可以提升查询时的速度。因为正常情况下都是读多写少
- 增加user_follower_info用来保存用户的关注数和粉丝数,避免使用count(*)导致性能问题
- 对follower表进行拆分,拆成follwer_from表和follower_to表,分别表示用户关注和用户粉丝,方便后续对表水平拆分
- 冷热分离,为了进一步对表进行拆分,减少对大表的检索,将follwer_from表和follower_to表,分别拆分为follwer_from_hot表、follwer_from_cold表、follower_to_hot表和follwer_to_cold表,用来存储热点数据,按关注的先后顺序区分热点数据,在follwer_from_hot表,只保存每个人用户前100的数据(或者业务自己定),当新关注其他人时,先检查数量是否大于100,然后如果大于则将最旧挪到follwer_from_cold表,然后再将新数据插入此表。follwer_to_hot同理。此优化主要针对,大部分情况下,用户都是只看前面关注数,不会深度查询,这样可以增加较快的查询效率,尤其针对一些大V账号,动不动都是百万甚至千万级别的粉丝,效果尤其显著
- 部署
- nginx提供负载均衡
- tomcat集群
- 数据库有MHA
- 不足
- 用户的基本信息、follwer_from_hot和follwer_to_hot、关注数和粉丝数缓存至redis
- 缓存一致性问题:加MQ
改进V2

不足
此部署架构,基本就是一个可以应对较高并发的架构了,但是像微博这样的并发量,此架构显然无法承载,因为,仅考虑国内用户的话,基本就分为了三大区域,
华南、华中、华北,而这三大区域的网络运营商是不一样的,所以,如果服务器部署在华北,很有可能造成华中和华南用户的体验不如华北,所以就需要多机房部署,应对不同地区的用户,当然,上面的nginx还是一台,这个显然也是无法满足的,所以,上SLB也是势在必行。因此,我们进一步优化
改进V3

至此,基本已经可以应对高并发了,如果数据量不断增加的话,可以通过使用Mycat或者sharding JDBC进行分库分表。增加分布式任务调度是为了保证缓存和数据库的一致性,因为不能单纯靠rocketmq去保证一致性,增加日志服务为了后续审计数据,增加监控服务,快速感知应用服务和中间件等服务的状态。当然,代码层面依然有优化的空间,比如,可以使用WebFlux和complatablefuture进行编程,将有所有方法改成异步,这样可以进一步提升系统的吞吐量。并且可以增加一级缓存,继续提升系统性能,采用mq进行一级缓存和redis直接数据同步
GEO数据类型
GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:
它只是一个zset结构
- GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
- GEODIST:计算指定的两个点之间的距离并返回
- GEOHASH:将指定member的坐标转为hash字符串形式并返回
- GEOPOS:返回指定member的坐标
- GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃
- GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
- GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能
附近
api
- 请求方式 GET
- 请求路径 /shop/of/type
- 请求参数
- typeId:商户类型
- current:滚动查询页码
- x:经度
- y:纬度
- 返回值 List
<Shop>
导入Redis
将数据库表中的数据导入到redis中
- redis中的GEO,GEO在redis中就一个menber和一个经纬度
- 把x和y轴传入到redis做的经纬度位置去,menber只存id(内存有限)
根据type筛选
redis中并没有存储type,所以我们无法根据type来对数据进行筛选
按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可
如:
# typeId x y shopId geoadd huoguo 116.25 34.44 haidilao geoadd huoguo 116.25 34.44 xiapuxiapu geoadd xiaochi 116.25 34.44 mixian geoadd xiaochi 116.25 34.44 shaokao
查询店铺信息
把店铺分组,按照typeId分组,typeId一致的放到一个集合
分批完成写入Redis
- 获取类型typeId
- 获取同类型的店铺的集合
- 写入redis GEOADD key 经度 纬度 member
@Test void loadShopData() { // 1.查询店铺信息 List<Shop> list = shopService.list(); // 2.把店铺分组,按照typeId分组,typeId一致的放到一个集合 Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId)); // 3.分批完成写入Redis for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) { // 3.1.获取类型id Long typeId = entry.getKey(); String key = SHOP_GEO_KEY + typeId; // 3.2.获取同类型的店铺的集合 List<Shop> value = entry.getValue(); List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size()); // 3.3.写入redis GEOADD key 经度 纬度 member for (Shop shop : value) { // stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString()); locations.add(new RedisGeoCommands.GeoLocation<>( shop.getId().toString(), new Point(shop.getX(), shop.getY()) )); } stringRedisTemplate.opsForGeo().add(key, locations); } }
查询附近业务
判断是否需要根据坐标查询
@Override public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) { // 1.判断是否需要根据坐标查询 if (x == null || y == null) { // 不需要坐标查询,按数据库查询 Page<Shop> page = query() .eq("type_id", typeId) .page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE)); // 返回数据 return Result.ok(page.getRecords()); } // 2.计算分页参数 int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE; int end = current * SystemConstants.DEFAULT_PAGE_SIZE; // 3.查询redis、按照距离排序、分页。结果:shopId、distance String key = SHOP_GEO_KEY + typeId; GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDISTANCE .search( key, GeoReference.fromCoordinate(x, y), new Distance(5000), RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end) ); // 4.解析出id if (results == null) { return Result.ok(Collections.emptyList()); } List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent(); if (list.size() <= from) { // 没有下一页了,结束 return Result.ok(Collections.emptyList()); } // 4.1.截取 from ~ end的部分 List<Long> ids = new ArrayList<>(list.size()); Map<String, Distance> distanceMap = new HashMap<>(list.size()); list.stream().skip(from).forEach(result -> { // 4.2.获取店铺id String shopIdStr = result.getContent().getName(); ids.add(Long.valueOf(shopIdStr)); // 4.3.获取距离 Distance distance = result.getDistance(); distanceMap.put(shopIdStr, distance); }); // 5.根据id查询Shop String idStr = StrUtil.join(",", ids); List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list(); for (Shop shop : shops) { shop.setDistance(distanceMap.get(shop.getId().toString()).getValue()); } // 6.返回 return Result.ok(shops); }
签到
数据库太浪费,而且并发下,数据库压力大
位图:把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态
Redis中是利用string类型数据结构实现BitMap,因此最大上限是512M,转换为bit则是 2^32个bit位
BitMap的操作命令有:
- SETBIT:向指定位置(offset)存入一个0或1
- GETBIT :获取指定位置(offset)的bit值
- BITCOUNT :统计BitMap中值为1的bit位的数量
- BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
- BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
- BITOP :将多个BitMap的结果做位运算(与 、或、异或)
- BITPOS :查找bit数组中指定范围内第一个0或1出现的位置
用户签到
获取当前登录用户
获取日期
拼接key
获取今天是本月的第几天
写入Redis SETBIT key offset 1
@Override public Result sign() { // 1.获取当前登录用户 Long userId = UserHolder.getUser().getId(); // 2.获取日期 LocalDateTime now = LocalDateTime.now(); // 3.拼接key String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM")); String key = USER_SIGN_KEY + userId + keySuffix; // 4.获取今天是本月的第几天 int dayOfMonth = now.getDayOfMonth(); // 5.写入Redis SETBIT key offset 1 stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true); return Result.ok(); }
签到统计
**问题1:**什么叫做连续签到天数?
从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数
Java逻辑代码:获得当前这个月的最后一次签到数据,定义一个计数器,然后不停的向前统计,直到获得第一个非0的数字即可,每得到一个非0的数字计数器+1,直到遍历完所有的数据,就可以获得当前月的签到总天数了
**问题2:**如何得到本月到今天为止的所有签到数据?
BITFIELD key GET u[dayOfMonth] 0
假设今天是10号,那么我们就可以从当前月的第一天开始,获得到当前这一天的位数,是10号,那么就是10位,去拿这段时间的数据,就能拿到所有的数据了,那么这10天里边签到了多少次呢?统计有多少个1即可
问题3:如何从后向前遍历每个bit位?
注意:bitMap返回的数据是10进制,假如说返回一个数字8,那么我哪儿知道到底哪些是0,哪些是1呢?我们只需要让得到的10进制数字和1做与运算就可以了,因为1只有遇见1 才是1,其他数字都是0 ,我们把签到结果和1进行与操作,每与一次,就把签到结果向右移动一位,依次内推,我们就能完成逐个遍历的效果了。
需求:统计当前用户截止当前时间在本月的连续签到天数
有用户有时间我们就可以组织出对应的key,此时就能找到这个用户截止这天的所有签到记录,再根据这套算法,就能统计出来他连续签到的次数了
@Override
public Result signCount() {
// 1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 2.获取日期
LocalDateTime now = LocalDateTime.now();
// 3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
// 4.获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
// 5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0
// bitField可以多个子命令同时查询,故需要集合封装返回结果,这里其实集合中就一个元素
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
if (result == null || result.isEmpty()) {
// 没有任何签到结果
return Result.ok(0);
}
Long num = result.get(0);
if (num == null || num == 0) {
return Result.ok(0);
}
// 6.循环遍历
int count = 0;
while (true) {
// 6.1.让这个数字与1做与运算,得到数字的最后一个bit位 // 判断这个bit位是否为0
if ((num & 1) == 0) {
// 如果为0,说明未签到,结束
break;
}else {
// 如果不为0,说明已签到,计数器+1
count++;
}
// 把数字右移一位,抛弃最后一个bit位,继续下一个bit位
num >>>= 1;
}
return Result.ok(count);
}
UV统计
通常来说PV会比UV大很多,所以衡量同一个网站的访问量,我们需要综合考虑很多因素,所以我们只是单纯的把这两个值作为一个参考值
- UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
- PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存
但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖,那怎么处理呢?
HyperLogLog
天生唯一性,重复数据只记录一次
Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。
相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0
Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略
指令

UV统计测试
测试思路:我们直接利用单元测试,向HyperLogLog中添加100万条数据,看看内存占用和统计效果如何

经过测试:我们会发生他的误差是在允许范围内,并且内存占用极小
