Redis-2-实战篇
短信登录
基于 Session 实现登录流程
发送短信验证码:
- 用户在提交手机号后,会校验手机号是否合法
- 如果不合法,则要求用户重新输入手机号
- 如果手机号合法,后台此时生成对应的验证码,同时将验证码进行保存到 session,然后再通过短信的方式将验证码发送给用户
短信验证码登录、注册:
- 用户将验证码和手机号进行输入,后台从 session 中拿到当前验证码,然后和用户输入的验证码进行校验
- 如果不一致,则无法通过校验
- 如果一致,则后台根据手机号查询用户
- 如果用户不存在,则为用户创建账号信息,保存到数据库
- 无论是否存在,都会将用户信息保存到 session 中,方便后续获得当前登录信息
校验登录状态:
- 用户在请求时候,会从 cookie 中携带者 JsessionId 到后台
- 后台通过 JsessionId 从 session 中拿到用户信息
- 如果没有 session 信息,则进行拦截
- 如果有 session 信息,则将用户信息保存到 threadLocal 中,并且放行
实现发送短信验证码功能
发送短信验证码
1 | public Result sendCode(String phone, HttpSession session) { |
短信验证码登录
1 | public Result login(LoginFormDTO loginForm, HttpSession session) { |
实现登录校验拦截器
- 当监听线程知道用户想要和 tomcat 连接时,那会由监听线程创建 socket 连接,socket 都是成对出现的,用户通过 socket 互相传递数据
- 当 tomcat 端的 socket 接收到数据后,此时监听线程会从 tomcat 的线程池中取出一个线程执行用户请求
- 在我们的服务部署到 tomcat 后,线程会找到用户想要访问的工程,然后用这个线程转发到工程中的 controller,service,dao 中,并且访问对应的 DB,在用户执行完请求后,再统一返回,再找到 tomcat 端的 socket,再将数据写回到用户端的 socket,完成请求和响应
可以得知每个用户其实对应都是去找 tomcat 线程池中的一个线程来完成工作的, 使用完成后再进行回收,既然每个请求都是独立的,所以在每个用户去访问我们的工程时,我们可以使用 threadlocal 来做到线程隔离,每个线程操作自己的一份数据
拦截器
1 | public class LoginInterceptor implements HandlerInterceptor { |
配置拦截器
1 |
|
隐藏用户敏感信息
在登录方法处修改
1 | // 7.保存用户信息到session中 |
在拦截器处:
1 | // 5.存在,保存用户信息到Threadlocal |
在UserHolder处:将user对象换成UserDTO
1 | public class UserHolder { |
Session 共享问题
核心思路分析:
- 每个 tomcat 中都有一份属于自己的 session
- 假设用户第一次访问第一台 tomcat,并且把自己的信息存放到第一台服务器的 session 中,但是第二次这个用户访问到了第二台 tomcat,那么在第二台服务器上,肯定没有第一台服务器存放的 session,所以此时整个登录拦截功能就会出现问题
- 早期的方案是 session 拷贝,就是说虽然每个 tomcat 上都有不同的 session,但是每当任意一台服务器的 session 修改时,都会同步给其他的 Tomcat 服务器的 session,这样的话,就可以实现 session 的共享了
- 但是这种方案具有两个大问题
- 每台服务器中都有完整的一份 session 数据,服务器压力过大。
- session 拷贝数据时,可能会出现延迟
- 采用的方案都是 基于 redis 来完成,把 session 换成 redis,redis 数据本身就是共享的,就可以避免 session 共享的问题
Redis 代替 Session 的业务流程
设计 key 的结构
设计 key 的具体细节
在设计 key,需要满足两点
1、key 要具有唯一性
2、key 要方便携带
如果采用 phone:手机号这个的数据来存储当然是可以的,但是如果把这样的敏感数据存储到 redis 中并且从页面中带过来毕竟不太合适,所以我们 在后台生成一个随机串token,然后让前端带来这个 token 就能完成我们的整体逻辑
整体访问流程
当注册完成后,用户去登录会去校验用户提交的手机号和验证码,是否一致
如果一致,则根据手机号查询用户信息,不存在则新建,最后将用户数据保存到 redis,并且生成 token 作为 redis 的 key
当我们校验用户是否登录时,会去携带着 token 进行访问,从 redis 中取出 token 对应的 value,判断是否存在这个数据,如果没有则拦截,如果存在则将其保存到 threadLocal 中,并且放行。
基于Redis实现短信登录
1 |
|
解决状态登录刷新问题
原始方案
可以使用对应路径的拦截,同时刷新登录 token 令牌的存活时间,但是现在这个拦截器他只是拦截需要被拦截的路径,假设当前用户访问了一些不需要拦截的路径,那么这个拦截器就不会生效,所以此时令牌刷新的动作实际上就不会执行
优化方案
既然之前的拦截器无法对不需要拦截的路径生效,那么我们可以添加一个拦截器,在第一个拦截器中拦截所有的路径,把第二个拦截器做的事情放入到第一个拦截器中,同时刷新令牌,因为第一个拦截器有了 threadLocal 的数据,所以此时第二个拦截器只需要判断拦截器中的 user 对象是否存在 即可完成整体刷新功能。
商铺查询缓存
缓存的概念
缓存 是数据交换的 缓冲区,俗称的缓存就是 缓冲区内的数据。缓存数据存储于代码中,而代码运行在内存中,内存的读写性能远高于磁盘。缓存可以大大降低用户访问并发量带来的服务器读写压力。实际开发过程中,企业的数据量,少则几十万,多则几千万,如果没有缓存来作为”避震器”,系统是几乎撑不住的,所以企业会大量运用到缓存技术;
浏览器缓存:主要是存在于浏览器端的缓存
应用层缓存:可以分为 tomcat 本地缓存,比如之前提到的 map,或者是使用 redis 作为缓存
数据库缓存: 在数据库中有一片空间是 buffer pool,增改查数据都会先加载到 mysql 的缓存中
CPU缓存: 当代计算机最大的问题是 cpu 性能提升了,但内存读写速度没有跟上,所以为了适应当下的情况,增加了 cpu 的L1,L2,L3级的缓存
添加商户缓存
缓存模型和思路
标准的操作方式就是查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入 redis。
如果缓存有,则直接返回,如果缓存不存在,则查询数据库,然后存入redis。
1 |
|
1 |
|
添加店铺类型缓存
修改ShopTypeController中的queryTypeList方法,添加查询缓存
1 | public class ShopTypeController { |
1 |
|
缓存更新策略
缓存更新是 redis 为节约内存 而提出的,当我们向 redis 插入太多数据,此时就可能会导致缓存中的数据过多,所以 redis 会对部分数据进行更新,或者进行淘汰。
内存淘汰: redis 自动进行,当 redis 内存达到 max-memery 时自动触发淘汰机制,淘汰不重要的数据
超时剔除: 当给 redis 设置过期时间 ttl 后,redis 会将超时的数据进行删除,方便继续使用缓存
主动更新: 可以手动调用方法把缓存删掉,通常用于解决缓存和数据库不一致问题
数据库缓存不一致解决方案
由于 缓存的数据源来自于数据库,而数据库的 数据会发生变化,如果当数据库中 数据发生变化,而缓存却没有同步,此时会出现 数据库缓存不一致问题
解决方案 | 说明 |
---|---|
Cache Aside Pattern |
缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案 |
Read/Write Through Pattern |
缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关系缓存一致性问题 |
Write Behind Caching Pattern |
调用者只操作缓存,其他线程将缓存数据异步持久化到数据库,实现最终一致 |
数据库和缓存不一致解决方案
1.删除缓存还是更新缓存?
- 更新缓存:每次更新数据库都更新缓存,无效写操作较多
- 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
可以把缓存删除,等待再次查询时,将缓存中的数据加载出来
2.如何保证缓存与数据库的操作的同时成功或失败?
- 单体系统:将缓存与数据库操作放在一个事务
- 分布式系统:利用 TCC 等分布式事务方案
3.先操作缓存还是先操作数据库?
- 应当是先操作数据库,再删除缓存
如果两个线程并发来访问时,假设线程1先来,他先把缓存删了。此时线程2过来,他查询缓存数据并不存在,此时他写入缓存,当他写入缓存后,线程 1 再执行更新动作时,实际上写入的就是旧的数据,新的数据就被旧数据覆盖了。
实现商铺和缓存与数据库双写一致
根据 id 查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
根据 id 修改店铺时,先修改数据库,再删除缓存
设置redis缓存时添加过期时间
1 | public Result queryById(Long id) { |
采用删除策略,来解决双写问题。当我们修改了数据之后,然后把缓存中的数据进行删除,查询时发现缓存中没有数据,则会从 mysql 中加载最新的数据,从而避免数据库和缓存不一致的问题
1 |
|
缓存穿透
缓存穿透是指查询一个一定不存在的数据,由于缓存是未命中时需要从数据库查询,查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到数据库去查询,进而给数据库带来压力。
解决方案
缓存空对象
当我们客户端访问不存在的数据时,先请求 redis,但是此时 redis 中没有数据,此时会访问到数据库,但是数据库中也没有数据,这个数据穿透了缓存,直击数据库,我们都知道数据库能够承载的并发不如 redis 这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库,哪怕这个数据在数据库中也不存在,也把这个数据存入到 redis 中去,这样,下次用户过来访问这个不存在的数据,那么在 redis 中也能找到这个数据就不会进入到缓存了
- 优点:实现简单,维护方便
- 缺点:额外的内存消耗;可能造成短期的不一致
布隆过滤
布隆过滤器其实采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,去判断当前这个要查询的这个数据是否存在,如果布隆过滤器判断存在,则放行,这个请求会去访问 redis,哪怕此时 redis 中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到 redis 中,假设布隆过滤器判断这个数据不存在,则直接返回
这种方式优点在于节约内存空间,存在误判,误判原因在于:布隆过滤器是哈希思想,只要哈希思想,就可能存在哈希冲突
- 优点:内存占用较少,没有多余 key
- 缺点:实现复杂,存在误判可能
编码解决商品查询的缓存穿透问题:
核心思路
在原来的逻辑中,我们如果发现这个数据在 mysql 中不存在,直接就返回 404 了,这样是会存在缓存穿透问题的
修改逻辑
如果这个数据不存在,不会返回 404 ,还是会把这个数据写入到 Redis 中,并且将 value 设置为空,当再次发起查询时,我们如果发现命中之后,判断这个 value 是否是 null,如果是 null,则是之前写入的数据,证明是缓存穿透数据,如果不是,则直接返回数据。
1 |
|
缓存穿透产生的原因是什么?
用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
- 缓存 null 值
- 布隆过滤
- 增强 id 的复杂度,避免被猜测 id 规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
缓存雪崩
缓存雪崩是指在 同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
- 给不同的 Key 的 TTL 添加随机值
- 利用 Redis 集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
缓存击穿
缓存击穿问题也叫 热点 Key 问题,就是一个被 高并发访问 并且 缓存重建业务较复杂 的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
假设线程 1 在查询缓存后本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程 1 走完这个逻辑,其他线程就都能从缓存中加载这些数据,但是 假设在线程 1 没有走完时,后续的线程 2,线程 3,线程 4 同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大
常见的解决方案有两种:
- 互斥锁
- 逻辑过期
使用锁来解决:
因为锁能实现互斥性。假设线程过来,只能一个人一个人的来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询的性能从并行变成了串行,我们可以采用tryLock方法 + double check来解决这样的问题。
假设现在线程 1 过来访问,他查询缓存没有命中,但是此时他获得到了锁的资源,那么线程 1 就会一个人去执行逻辑,假设现在线程2过来,线程 2 在执行过程中,并没有获得到锁,那么线程 2 就可以进行到休眠,直到线程 1 把锁释放后,线程 2 获得到锁,然后再来执行逻辑,此时就能够从缓存中拿到数据了。
使用逻辑过期方案解决:
之所以会出现缓存击穿问题,是因为对 key 设置了过期时间,如果不设置过期时间就不会有缓存击穿的问题,但是不设置过期时间会导致数据就一直占用内存。把过期时间设置在 redis 的 value 中,但这个过期时间并不会直接作用于 redis,而是后续通过逻辑去处理。
假设线程 1 去查询缓存,然后从 value 中判断出来当前的数据已经过期,此时线程 1 去获得互斥锁,那么其他线程会进行阻塞。获得锁的线程 1 会开启一个线程 2 去进行重建缓存的逻辑,直到新开的线程 2 完成这个逻辑后才释放锁, 而线程 1 直接进行返回过期数据
假设现在线程 3 过来访问,由于线程线程 2 持有着锁,所以线程 3 无法获得锁,线程 3 也直接返回过期数据,只有等到新开的线程 2 重建缓存后,其他线程才能返回正确的数据。
该方案巧妙在于异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。
进行对比
互斥锁方案:由于保证了互斥性,所以数据一致,且实现简单,因为仅仅只需要加一把锁而已,也没其他的事情需要操心,所以没有额外的内存消耗,缺点在于有锁就有死锁问题的发生,且只能串行执行性能肯定受到影响
逻辑过期方案: 线程读取过程中不需要等待,性能好,有一个额外的线程持有锁去进行重构数据,但是在重构数据完成前,其他的线程只能返回之前的数据,且实现起来麻烦
利用互斥锁解决缓存击穿问题
相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 :
进行查询之后,如果 从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询
如果获取到了锁的线程,再去进行查询,查询后将数据写入redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿
操作锁的代码:
核心思路就是利用redis的setnx
方法来表示获取锁,该方法含义是 redis 中如果没有这个 key,则插入成功,返回 1,在stringRedisTemplate 中返回 true, 如果有这个 key 则插入失败,则返回 0,在 stringRedisTemplate 返回 false,我们可以通过 true,或者是 false,来表示是否有线程成功插入 key,成功插入的 key 的线程我们认为他就是获得到锁的线程。
1 | private boolean tryLock(String key) { |
1 | public Shop queryWithMutex(Long id) { |
利用逻辑过期解决缓存击穿问题
- 当用户开始查询 redis 时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库
- 而一旦命中后,将 value 取出,判断 value 中的过期时间是否满足
- 如果没有过期,则直接返回 redis 中的数据
- 如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。
1 |
|
1 | public void saveShopToRedis(Long id, Long expireSeconds) throws InterruptedException { |
1 | private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10); |
缓存工具封装
基于 StringRedisTemplate 封装一个缓存工具类,满足下列需求:
- 方法 1:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置 TTL 过期时间
方法 2:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置逻辑过期时间,用于 处理缓存击穿问题
方法 3:根据指定的 key 查询缓存,并反序列化为指定类型,利用缓存空值的方式 解决缓存穿透问题
- 方法 4:根据指定的 key 查询缓存,并反序列化为指定类型,需要利用逻辑过期 解决缓存击穿问题
优惠券秒杀
全局唯一ID
当用户抢购时,就会生成订单并保存到 tb_voucher_order
这张表中,而订单表如果使用数据库自增 ID 就存在一些问题:
id
的规律性太明显- 受单表数据量的限制
全局 ID 生成器,是一种在分布式系统下用来生成全局唯一 ID 的工具,一般要满足下列特性:
- 唯一性
- 高可用
- 高性能
- 递增性
- 安全性
为了增加 ID 的安全性,我们可以不直接使用 Redis 自增的数值,而是拼接一些其它信息:
ID 的组成部分:符号位:1 bit,永远为 0
时间戳:31 bit,以秒为单位,可以使用 69 年
序列号:32 bit,秒内的计数器,支持每秒产生 $2^{32}$ 个不同 ID
全局唯一ID生成策略
- UUID
- Redis 自增:每日一 Key,方便统计订单量;ID 构造:时间戳 + 计数器
- 雪花算法
- 数据库自增
添加优惠券
每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:
tb_voucher
:优惠券的基本信息,优惠金额、使用规则等tb_seckill_voucher
:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息
特价卷除了具有优惠卷的基本信息以外,还具有库存,抢购时间,结束时间等等字段
优惠券秒杀下单
秒杀下单应该思考的内容
下单时需要判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
下单核心逻辑分析:
当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件
比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。
参考:VoucherOrderController
库存超卖
1 | if (voucher.getStock() < 1) { |
假设线程 1 过来查询库存,判断出来库存大于 1,正准备去扣减库存,但是还没有来得及去扣减,此时线程 2 过来,线程 2 也去查询库存,发现这个数量一定也大于 1,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。
悲观锁:
悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等
乐观锁:
乐观锁会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,
这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过
CAS 和 版本号法
利用cas进行无锁化机制加锁,var5 是操作前读取的内存值,while中的var1+var2 是预估值,如果预估值 == 内存值,则代表中间没有被人修改过,此时就将新值去替换 内存值。其中do while 是为了在操作失败时,再次进行自旋操作,即把之前的逻辑再操作一次。
1 | int var5; |
解决方案
修改代码方案一、
VoucherOrderServiceImpl 在扣减库存时,改为:
1 | boolean success = seckillVoucherService.update() |
以上逻辑的核心含义是:只要我扣减库存时的库存和之前我查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,但是以上这种方式通过测试发现会有很多失败的情况,
失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败
修改代码方案二、
之前的方式要修改前后都保持一致,但是这样我们分析过,成功的概率太低,所以我们的乐观锁需要变一下,改成stock大于0 即可
1 | boolean success = seckillVoucherService.update() |
限制下单
需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单
比如时间是否充足,如果时间充足,则进一步判断库存是否足够,然后再根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单
参考:VoucherOrderServiceImpl
1 |
|
存在问题: 现在的问题还是和之前一样,并发过来,查询数据库,都不存在订单,所以我们还是需要加锁,但是乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作
注意:在这里提到了非常多的问题,我们需要慢慢的来思考,首先我们的初始方案是封装了一个 createVoucherOrder 方法,同时为了确保他线程安全,在方法上添加了一把 synchronized 锁
1 |
|
但是以上代码还是存在问题,问题的原因在于 当前方法被 spring 的事务控制,如果你在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题:如下:
在 seckillVoucher 方法中,添加以下逻辑,这样就能保证事务的特性,同时也控制了锁的粒度
1 | Long userId = UserHolder.getUser().getId(); |
但是以上做法依然有问题,因为你调用的方法,其实是 this.
的方式调用的,事务想要生效,还需要使用代理防止事务失效,所以这个地方,我们需要获得原始的事务对象来操作事务
1 | // 获取userId准备上锁 |
分布式锁
分布式锁
满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心思想
让大家都 使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行
分布式锁满足条件
可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化
互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
高可用:程序不易崩溃,时时刻刻都保证较高的可用性
高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能
安全性:安全也是程序中必不可少的一环
常见的分布式锁有三种
Mysql:mysql 本身就带有锁机制,但是由于 mysql 性能本身一般,所以采用分布式锁的情况下,其实使用 mysql 作为分布式锁 比较少见
Redis:redis 作为分布式锁是 非常常见 的一种使用方式,现在企业级开发中基本都使用 redis 或者 zookeeper 作为分布式锁,利用 setnx 这个方法,如果插入 key 成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁
Zookeeper:zookeeper 也是企业级开发中较好的一个实现分布式锁的方案
Redis 分布式锁
实现分布式锁时需要实现的两个基本方法:
获取锁:
互斥:确保只能有一个线程获取锁
1
2添加锁,利用setnx的互斥锁
SETNX lock thread1- 非阻塞:尝试一次,成功返回 true,失败返回 false
释放锁:
手动释放
1
2释放锁,删除即可
DEL key超时释放:获取锁时添加一个超时时间
Redis 分布式锁误删情况说明
逻辑说明:
持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程 2 来尝试获得锁,就拿到了这把锁,然后线程 2 在持有锁执行过程中,线程 1 反应过来,继续执行,而线程 1 执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程 2 的锁进行删除,这就是误删别人锁的情况说明
解决方案:
解决方案就是 在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则不进行锁的删除。
假设还是上边的情况,线程 1 卡顿,锁自动释放,线程 2 进入到锁的内部执行逻辑,此时线程 1 反应过来,然后删除锁,但是线程 1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程 2 走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。
解决 Redis 分布式锁误删问题
需求:
修改之前的分布式锁实现,满足:在获取锁时存入 线程标示(可以用 UUID 表示)
在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
- 如果一致则释放锁
- 如果不一致则不释放锁
核心逻辑:
- 在存入锁时,放入自己线程的标识
- 在删除锁时,判断当前这把锁的标识是不是自己存入的。如果是,则进行删除;如果不是,则不进行删除。
分布式锁的原子性问题
线程 1 现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是 此时他的锁到期 了
那么此时线程 2 进来,但是线程 1 他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程 1 的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生,
Lua 脚本解决多条命令原子性问题
Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。
Redis提供的调用函数,语法如下:
1 | redis.call('命令名称', 'key', '其它参数', ...) |
要执行set name jack,则脚本是这样:
1 | # 执行 set name jack |
要先执行set name Rose,再执行get name,则脚本如下:
1 | # 先执行 set name jack |
Java 调用 Lua 脚本改造分布式锁
在 RedisTemplate 中,可以利用 execute 方法去执行 lua 脚本,参数对应关系就如下
基于 Redis 的分布式锁实现思路:
- 利用
set nx/ex
获取锁,并设置过期时间,保存线程标示 - 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用
set nx
满足互斥性 - 利用
set ex
保证故障时锁依然能释放,避免死锁,提高安全性 - 利用 Redis 集群保证高可用和高并发特性
Redisson 实现分布式锁
基于 setnx 实现的分布式锁存在的问题:
不可重入:同一个线程无法多次获取同一把锁
- 可重入是指 获得锁的线程可以再次进入到相同的锁的代码块 中
- 可重入锁的意义在于防止死锁,常见的 synchronized 和 Lock 锁都是可重入的
不可重试:
- 获取锁只尝试一次就返回 false,没有重试机制
- 合理情况:当线程在获得锁失败后应该能再次尝试获得锁
超时释放: 锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
- 在加锁时增加了过期时间,可以防止死锁
- 但是如果卡顿的时间超长,虽然采用 lua 表达式防止删锁时误删别人的锁,但是毕竟没有锁住,有安全隐患
主从一致性:
- 如果 Redis 提供主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前主机宕机就会出现死锁问题。
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务,其中就包含了 各种分布式锁的实现。
引入依赖
1 | <dependency> |
配置 Redisson
1 |
|
1 | RLock lock = redissonClient.getLock(lockName); |
实现可重入锁
目的:保证同一个线程可以多次同一把锁
获取锁
1 | local key = KEYS[1]; -- 锁的key |
释放锁
1 | local key = KEYS[1]; -- 锁的key |
重试获取锁
基于 Redis Pub/Sub 发布订阅机制。 如果获取锁失败,则阻塞订阅释放锁的消息;当锁被释放时,会触发推送(告诉其他线程我释放锁啦),然后其他线程再重试获取;如此往复,直到超时。
防止锁提前超时释放
基于看门狗机制。 如果不手动设置锁释放时间(leaseTime),默认设置 30 秒过期,并且给当前锁注册一个定时任务,该定时任务每隔 1 / 3 的锁释放时间(一般是 10 秒)会重置锁的过期时间(递归调用,一次续期完了再)。
思考:
- 如何保证同一个锁只注册一个定时任务
如何防止无限续期
要解决这些问题,使用全局 ConcurrentHashMap 来管理锁 => 任务信息,key 为锁的 id,从而保证唯一。当某个锁释放时,从全局 ConcurrentHashMap 中取出定时任务并取消掉,然后把锁的信息从 Map 中删掉即可。
完整的分布式锁流程:
解决主从一致性问题
如果使用主从复制的 Redis 集群,可能出现主从节点设置的锁状态不一致的问题。
此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个 slave 变成 master,而此时新的 master 中实际上并没有锁信息,此时锁信息就已经丢掉了。
为了解决这个问题,redission 提出 MutiLock 锁,使用这把锁就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。
实现 MultiLock 的几个关键:
- 遍历所有节点,依次设置锁,并使用列表来记录所有主节点的锁是否设置成功。
- 只要有一个节点设置不成功,就要释放所有的锁,从头来过。
- 因为不同节点设置锁成功的时间不同,所以在所有锁设置成功后,要统一设置过期时间(但如果 leaseTime = -1 就不用了,因为开启了看门狗机制会自动续期)
- 锁释放时间(leaseTime)必须要大于抢锁最大等待时间(waitTime),否则可能出现第一个节点抢到锁,最后一个节点还没抢到锁,之前的锁就已经超时释放了。所以如果指定了 waitTime 和 leaseTime,默认 leaseTime = waitTime * 2。
秒杀优化
优化思路
- 串行改并行:原本由 1 个线程的操作改为由 2 个或多个线程同时操作,比如 1 个线程负责判断秒杀资格,1 个线程负责减库存 + 创建订单(写)
- 同步改异步:判断完秒杀资格后,就可以返回订单 id 给前端;其余的写库操作可以异步执行。
- 提高判断秒杀资格的性能:读 DB 改为读 Redis
异步秒杀
下单流程
当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤
查询优惠卷
判断秒杀库存是否足够
查询订单
校验是否是一人一单
扣减库存
创建订单
在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致程序执行的很慢,所以需要异步程序执行,那么如何加速呢?
优化方案:
将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池
两个难点
第一个难点 怎么在redis中去快速校验一人一单,还有库存判断
第二个难点 由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。
整体思路:
当用户下单之后,判断库存是否充足只需要到redis中去根据key找对应的value是否大于0即可
- 如果不充足,则直接结束
- 如果充足,继续在redis中判断用户是否可以下单
- 如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0,整个过程需要保证是原子性的,我们可以使用lua来操作
当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。
基于Redis完成秒杀资格判断
需求:
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
小总结:
秒杀业务的优化思路
- 先利用Redis完成库存余量、一人一单判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
- 基于阻塞队列的异步秒杀存在哪些问题?
- 内存限制问题
- 数据安全问题(如果宕机了,内存中的数据就没了)
Redis 消息队列
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
秒杀场景:下单之后,利用redis去进行校验下单条件,再通过队列把消息发送出去,然后再启动一个线程去消费这个消息,完成解耦,同时也加快响应速度。
基于List实现消息队列
使用Redis List的结构作为消息队列,使用LPush模拟生产者发送消息入队,使用BRPOP(阻塞弹出)模拟消费者取出消息。没有消息时会保持阻塞状态,从而实现类似JVM阻塞队列的效果
优点
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的消息队列
使用Redis的订阅发布模型,生产者可以讲消息推送给某个Channel,多个消费者可以订阅该Channel,从而同时得到消息
命令 | 说明 |
---|---|
SUBSCRIBE channel [channel] | 订阅一个或多个频道 |
PUBLISH channel msg | 向一个频道发送消息 |
PSUBSCRIBE pattern[pattern] | 订阅与pattern格式匹配的所有频道 |
优点
采用发布订阅模型,支持多生产、多消费
缺点
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
Redis 5.0推出的数据结构,可以实现单向的消息队列
命令 | 说明 |
---|---|
XAdd | 添加消息/创建队列,消息会自动持久化、不会丢失,每个消息都有唯一id |
XRead | 读取消息,支持多消费者读、可以从指定消息id开始读、支持阻塞读最新消息 |
发送消息
读取消息
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下
1 | while (true) { |
注意:当指定起始ID为$时,代表读取最新的消息,如果处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
消费者组
只用这两个命令还是不够的,因为目前只支持阻塞读最新消息,假设处理消息过程中又来了几条消息,可能出现漏读消息的情况 。为解决上述问题,可以用 Stream 的以下特性:
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。
- 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
- 消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息。确保每一个消息都会被消费
- 消息确认:消费者获取消息后,消息处于一个pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,才会从pending-list移除。这样如果消费业务处理异常,可以从 pending list 的开头依次读取未确认消息,重试处理。(也要避免无限重试,实在处理不成功就强制 ACK + 业务记日志)
创建消费者组
1 | XGROUP CREATE key groupName ID [MKSTREAM] |
参数 | 说明 |
---|---|
key | 队列名称 |
groupName | 消费者组名称 |
ID | 起始ID标识,$代表队列中最后一个消息,0则代表队列中第一个消息 |
MKSTRAEM | 队列不存在时自动创建队列 |
:自动记录消费的进度,支持从上次未消费的地方开始接着消费,保证每条消息按顺序消费
删除指定的消费者组
1 | XGROUP DESTORY key groupName |
给指定的消费者组添加消费者
1 | XGROUP CREATECONSUMER key groupname consumername |
删除消费者组中的指定消费者
1 | XGROUP DELCONSUMER key groupname consumername |
从消费者组读取消息:
1 | XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] |
参数 | 说明 |
---|---|
group | 消费组名称 |
consumer | 消费者名称,如果消费者不存在,会自动创建一个消费者 |
count | 本次查询的最大数量 |
BLOCK milliseconds | 当没有消息时最长等待时间 |
NOACK | 无需手动ACK,获取到消息后自动确认 |
STREAMS key | 指定队列名称 |
ID | 获取消息的起始ID: |
注意
>
:从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
整个消费流程伪代码
1 | while (true) { |
Java中操作Redis Stream方法
- 调用Lua
- 使用 Redis Tempalte 的 opsForStream()