Redis实战项目源码地址

短信登录

基于 Session 实现登录流程

发送短信验证码:

  1. 用户在提交手机号后,会校验手机号是否合法
  2. 如果不合法,则要求用户重新输入手机号
  3. 如果手机号合法,后台此时生成对应的验证码,同时将验证码进行保存到 session,然后再通过短信的方式将验证码发送给用户

短信验证码登录、注册:

  1. 用户将验证码和手机号进行输入,后台从 session 中拿到当前验证码,然后和用户输入的验证码进行校验
  2. 如果不一致,则无法通过校验
  3. 如果一致,则后台根据手机号查询用户
  4. 如果用户不存在,则为用户创建账号信息,保存到数据库
  5. 无论是否存在,都会将用户信息保存到 session 中,方便后续获得当前登录信息

校验登录状态:

  1. 用户在请求时候,会从 cookie 中携带者 JsessionId 到后台
  2. 后台通过 JsessionId 从 session 中拿到用户信息
  3. 如果没有 session 信息,则进行拦截
  4. 如果有 session 信息,则将用户信息保存到 threadLocal 中,并且放行

实现发送短信验证码功能

发送短信验证码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Result sendCode(String phone, HttpSession session) {
// 1. 校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 2. 如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
// 3. 如果符合,生成验证码
String code = RandomUtil.randomNumbers(6);
// 4. 保存验证码到session
session.setAttribute("code", code);
// 5. 发送验证码
log.info("发送短信验证码成功,验证码:{}", code);
// 6. 返回ok
return Result.ok();
}

短信验证码登录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 0. 提交手机号和验证码

// 1. 校验手机号
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
// 如果不合法,返回错误信息
Result.fail("手机号格式错误");
}

// 2. 校验验证码
Object cacheCode = session.getAttribute("code"); // 发送的code
String code = loginForm.getCode(); // 用户填的code

// 3. 如果不一致,返回错误信息
if (cacheCode == null || !cacheCode.toString().equals(code)) {
Result.fail("验证码错误");
}

// 4. 如果一致,根据手机号查询用户
// SELECT * FROM tb_user WHERE phone = #{phone};
User user = query().eq("phone", phone).one();

// 5. 判断用户是否存在
if (user == null) {
// 6. 如果不存在,创建新用户并保存
user = createUserWithPhone(phone);
}

// 7. 保存用户信息到session中(如果存在直接执行该操作)
session.setAttribute("user", user);

return Result.ok();
}

实现登录校验拦截器

  1. 当监听线程知道用户想要和 tomcat 连接时,那会由监听线程创建 socket 连接,socket 都是成对出现的,用户通过 socket 互相传递数据
  2. 当 tomcat 端的 socket 接收到数据后,此时监听线程会从 tomcat 的线程池中取出一个线程执行用户请求
  3. 在我们的服务部署到 tomcat 后,线程会找到用户想要访问的工程,然后用这个线程转发到工程中的 controller,service,dao 中,并且访问对应的 DB,在用户执行完请求后,再统一返回,再找到 tomcat 端的 socket,再将数据写回到用户端的 socket,完成请求和响应

可以得知每个用户其实对应都是去找 tomcat 线程池中的一个线程来完成工作的, 使用完成后再进行回收,既然每个请求都是独立的,所以在每个用户去访问我们的工程时,我们可以使用 threadlocal 来做到线程隔离,每个线程操作自己的一份数据

拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class LoginInterceptor implements HandlerInterceptor {


@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
// 1. 获取session
HttpSession session = request.getSession();

// 2. session中的用户
Object user = session.getAttribute("user");

// 3. 判断用户是否存在
if (user == null) {
// 4. 如果用户不存在,则进行拦截
response.setStatus(401); // 未授权
return false;
}

// 5. 如果用户存在,保存用户信息到ThreadLocal
UserHolder.saveUser((User) user);

// 6. 放行
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
// 移除用户
UserHolder.removeUser();
}
}

配置拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public void addInterceptors(InterceptorRegistry registry) {
// 登录拦截器
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/user/code",
"/user/login"
);
}
}

隐藏用户敏感信息

在登录方法处修改

1
2
// 7.保存用户信息到session中
session.setAttribute("user", BeanUtils.copyProperties(user,UserDTO.class));

在拦截器处:

1
2
// 5.存在,保存用户信息到Threadlocal
UserHolder.saveUser((UserDTO) user);

在UserHolder处:将user对象换成UserDTO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class UserHolder {
private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();

public static void saveUser(UserDTO user){
tl.set(user);
}

public static UserDTO getUser(){
return tl.get();
}

public static void removeUser(){
tl.remove();
}
}

Session 共享问题

核心思路分析:

  1. 每个 tomcat 中都有一份属于自己的 session
  2. 假设用户第一次访问第一台 tomcat,并且把自己的信息存放到第一台服务器的 session 中,但是第二次这个用户访问到了第二台 tomcat,那么在第二台服务器上,肯定没有第一台服务器存放的 session,所以此时整个登录拦截功能就会出现问题
  3. 早期的方案是 session 拷贝,就是说虽然每个 tomcat 上都有不同的 session,但是每当任意一台服务器的 session 修改时,都会同步给其他的 Tomcat 服务器的 session,这样的话,就可以实现 session 的共享了
  4. 但是这种方案具有两个大问题
    • 每台服务器中都有完整的一份 session 数据,服务器压力过大。
    • session 拷贝数据时,可能会出现延迟
  5. 采用的方案都是 基于 redis 来完成,把 session 换成 redis,redis 数据本身就是共享的,就可以避免 session 共享的问题

Redis 代替 Session 的业务流程

设计 key 的结构

设计 key 的具体细节

在设计 key,需要满足两点

1、key 要具有唯一性

2、key 要方便携带

如果采用 phone:手机号这个的数据来存储当然是可以的,但是如果把这样的敏感数据存储到 redis 中并且从页面中带过来毕竟不太合适,所以我们 在后台生成一个随机串token,然后让前端带来这个 token 就能完成我们的整体逻辑

整体访问流程

  1. 当注册完成后,用户去登录会去校验用户提交的手机号和验证码,是否一致

  2. 如果一致,则根据手机号查询用户信息,不存在则新建,最后将用户数据保存到 redis,并且生成 token 作为 redis 的 key

  3. 当我们校验用户是否登录时,会去携带着 token 进行访问,从 redis 中取出 token 对应的 value,判断是否存在这个数据,如果没有则拦截,如果存在则将其保存到 threadLocal 中,并且放行。

基于Redis实现短信登录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
// 0. 提交手机号和验证码

// 1. 校验手机号
String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)) {
// 如果不合法,返回错误信息
Result.fail("手机号格式错误");
}

// 2. 校验验证码
String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
String code = loginForm.getCode(); // 用户填的code

// 3. 如果不一致,返回错误信息
if (cacheCode == null || !cacheCode.toString().equals(code)) {
Result.fail("验证码错误");
}

// 4. 如果一致,根据手机号查询用户
// SELECT * FROM tb_user WHERE phone = #{phone};
User user = query().eq("phone", phone).one();

// 5. 判断用户是否存在
if (user == null) {
// 6. 如果不存在,创建新用户并保存
user = createUserWithPhone(phone);
}

// // 7. 保存用户信息到session中(如果存在直接执行该操作)
// session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));
// 7-1 随机生成token,作为登录令牌
String token = UUID.randomUUID().toString(true);
// 7-2 将User对象转HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> map = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldValue)->fieldValue.toString())
);
// 7-3 存储,并设有效期
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, map);
// 设置有效期
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);

return Result.ok(token);
}

解决状态登录刷新问题

原始方案

可以使用对应路径的拦截,同时刷新登录 token 令牌的存活时间,但是现在这个拦截器他只是拦截需要被拦截的路径,假设当前用户访问了一些不需要拦截的路径,那么这个拦截器就不会生效,所以此时令牌刷新的动作实际上就不会执行

优化方案

既然之前的拦截器无法对不需要拦截的路径生效,那么我们可以添加一个拦截器,在第一个拦截器中拦截所有的路径,把第二个拦截器做的事情放入到第一个拦截器中,同时刷新令牌,因为第一个拦截器有了 threadLocal 的数据,所以此时第二个拦截器只需要判断拦截器中的 user 对象是否存在 即可完成整体刷新功能。

商铺查询缓存

缓存的概念

缓存 是数据交换的 缓冲区,俗称的缓存就是 缓冲区内的数据。缓存数据存储于代码中,而代码运行在内存中,内存的读写性能远高于磁盘。缓存可以大大降低用户访问并发量带来的服务器读写压力。实际开发过程中,企业的数据量,少则几十万,多则几千万,如果没有缓存来作为”避震器”,系统是几乎撑不住的,所以企业会大量运用到缓存技术;

浏览器缓存:主要是存在于浏览器端的缓存

应用层缓存:可以分为 tomcat 本地缓存,比如之前提到的 map,或者是使用 redis 作为缓存

数据库缓存: 在数据库中有一片空间是 buffer pool,增改查数据都会先加载到 mysql 的缓存中

CPU缓存: 当代计算机最大的问题是 cpu 性能提升了,但内存读写速度没有跟上,所以为了适应当下的情况,增加了 cpu 的L1,L2,L3级的缓存

添加商户缓存

缓存模型和思路

标准的操作方式就是查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入 redis。

如果缓存有,则直接返回,如果缓存不存在,则查询数据库,然后存入redis。

1
2
3
4
5
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
//这里是直接查询数据库
return shopService.queryById(id);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public Result queryById(Long id) {

String key = CACHE_SHOP_KEY + id;
// 1. 从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2. 判断缓存 是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3. 如果缓存存在,直接返回信息
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 4. 如果缓存不存在,则根据id查询数据库
Shop shop = getById(id);
// 5. 如果数据库中不存在,则返回错误
if (shop == null) {
return Result.fail("店铺不存在!");
}
// 6. 如果数据库中存在。则写入redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
// 7. 返回
return Result.ok(shop);
}

添加店铺类型缓存

修改ShopTypeController中的queryTypeList方法,添加查询缓存

1
2
3
4
5
6
7
8
9
10
11
public class ShopTypeController {
@Resource
private IShopTypeService typeService;

@GetMapping("list")
public Result queryTypeList() {
List<ShopType> typeList = typeService
.query().orderByAsc("sort").list();
return Result.ok(typeList);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public Result queryShopTypeList() {

// 1. 从redis中查询商铺类型缓存
List<String> shopTypeList = new ArrayList<>();
shopTypeList = stringRedisTemplate.opsForList().range(CACHE_SHOP_TYPE_KEY, 0, -1);
// 2. 判断缓存是否存在
if (!shopTypeList.isEmpty()) {
// 3. 如果缓存存在,直接返回缓存信息
List<ShopType> typeList = new ArrayList<>();
for (String item : shopTypeList) {
// 将缓存中的每一个list项转换成对应shopType类型元素
ShopType shopType = JSONUtil.toBean(item, ShopType.class);
typeList.add(shopType);
}
return Result.ok(typeList);
}

// 4. 如果缓存未命中,则直接从数据库查询商铺类型信息
List<ShopType> typeList = query().orderByAsc("sort").list();

// 5. 如果数据库不存在,则返回空
if (typeList.isEmpty()) {
return Result.fail("不存在店铺类型!");
}
// 6. 如果数据库中存在,则写入redis
for (ShopType shopType : typeList) {
String item = JSONUtil.toJsonStr(shopType);
shopTypeList.add(item);
}
stringRedisTemplate.opsForList().rightPushAll(CACHE_SHOP_TYPE_KEY, shopTypeList);
// 7. 返回结果
return Result.ok(typeList);
}

缓存更新策略

缓存更新是 redis 为节约内存 而提出的,当我们向 redis 插入太多数据,此时就可能会导致缓存中的数据过多,所以 redis 会对部分数据进行更新,或者进行淘汰。

内存淘汰: redis 自动进行,当 redis 内存达到 max-memery 时自动触发淘汰机制,淘汰不重要的数据

超时剔除: 当给 redis 设置过期时间 ttl 后,redis 会将超时的数据进行删除,方便继续使用缓存

主动更新: 可以手动调用方法把缓存删掉,通常用于解决缓存和数据库不一致问题

数据库缓存不一致解决方案

由于 缓存的数据源来自于数据库,而数据库的 数据会发生变化,如果当数据库中 数据发生变化,而缓存却没有同步,此时会出现 数据库缓存不一致问题

解决方案 说明
Cache Aside Pattern 缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案
Read/Write Through Pattern 缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关系缓存一致性问题
Write Behind Caching Pattern 调用者只操作缓存,其他线程将缓存数据异步持久化到数据库,实现最终一致

数据库和缓存不一致解决方案

1.删除缓存还是更新缓存?

  • 更新缓存:每次更新数据库都更新缓存,无效写操作较多
  • 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
    可以把缓存删除,等待再次查询时,将缓存中的数据加载出来

2.如何保证缓存与数据库的操作的同时成功或失败?

  • 单体系统:将缓存与数据库操作放在一个事务
  • 分布式系统:利用 TCC 等分布式事务方案

3.先操作缓存还是先操作数据库?

  • 应当是先操作数据库,再删除缓存

如果两个线程并发来访问时,假设线程1先来,他先把缓存删了。此时线程2过来,他查询缓存数据并不存在,此时他写入缓存,当他写入缓存后,线程 1 再执行更新动作时,实际上写入的就是旧的数据,新的数据就被旧数据覆盖了。

实现商铺和缓存与数据库双写一致

  1. 根据 id 查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间

  2. 根据 id 修改店铺时,先修改数据库,再删除缓存

设置redis缓存时添加过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Result queryById(Long id) {

String key = CACHE_SHOP_KEY + id;
// 1. 从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2. 判断缓存 是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3. 如果缓存存在,直接返回信息
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 4. 如果缓存不存在,则根据id查询数据库
Shop shop = getById(id);
// 5. 如果数据库中不存在,则返回错误
if (shop == null) {
return Result.fail("店铺不存在!");
}
// 6. 如果数据库中存在。则写入redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 7. 返回
return Result.ok(shop);
}

采用删除策略,来解决双写问题。当我们修改了数据之后,然后把缓存中的数据进行删除,查询时发现缓存中没有数据,则会从 mysql 中加载最新的数据,从而避免数据库和缓存不一致的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
@Transactional
public Result update(Shop shop) {

Long id = shop.getId();
if (id == null) {
return Result.fail("店铺Id不能为空!");
}

// 1. 更新数据库
updateById(shop);
// 2. 删除缓存
stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
return Result.ok();
}

缓存穿透

缓存穿透是指查询一个一定不存在的数据,由于缓存是未命中时需要从数据库查询,查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到数据库去查询,进而给数据库带来压力。

解决方案

缓存空对象

当我们客户端访问不存在的数据时,先请求 redis,但是此时 redis 中没有数据,此时会访问到数据库,但是数据库中也没有数据,这个数据穿透了缓存,直击数据库,我们都知道数据库能够承载的并发不如 redis 这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库,哪怕这个数据在数据库中也不存在,也把这个数据存入到 redis 中去,这样,下次用户过来访问这个不存在的数据,那么在 redis 中也能找到这个数据就不会进入到缓存了

  • 优点:实现简单,维护方便
  • 缺点:额外的内存消耗;可能造成短期的不一致

布隆过滤

布隆过滤器其实采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,去判断当前这个要查询的这个数据是否存在,如果布隆过滤器判断存在,则放行,这个请求会去访问 redis,哪怕此时 redis 中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到 redis 中,假设布隆过滤器判断这个数据不存在,则直接返回

这种方式优点在于节约内存空间,存在误判,误判原因在于:布隆过滤器是哈希思想,只要哈希思想,就可能存在哈希冲突

  • 优点:内存占用较少,没有多余 key
  • 缺点:实现复杂,存在误判可能

编码解决商品查询的缓存穿透问题

核心思路

在原来的逻辑中,我们如果发现这个数据在 mysql 中不存在,直接就返回 404 了,这样是会存在缓存穿透问题的

修改逻辑

如果这个数据不存在,不会返回 404 ,还是会把这个数据写入到 Redis 中,并且将 value 设置为空,当再次发起查询时,我们如果发现命中之后,判断这个 value 是否是 null,如果是 null,则是之前写入的数据,证明是缓存穿透数据,如果不是,则直接返回数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public Result queryById(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1. 从redis中查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2. 判断缓存是否命中
if (StrUtil.isNotBlank(shopJson)) {
// 3. 如果缓存命中,直接返回商铺信息
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
// 判断命中的是否是空值
if (shopJson != null) {
// 返回错误信息
return Result.fail("店铺信息不存在!");
}
// 4. 如果缓存未命中,则根据id查询数据库。如果数据库中存在记录,直接返回
Shop shop = getById(id);
// 5. 如果数据库中不存在记录,返回404
if (shop == null) {
// 将空值写入Redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return Result.fail("店铺不存在!");
}
// 6. 当数据库中存在记录时,将商铺数据写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 7. 结束
return Result.ok(shop);
}

缓存穿透产生的原因是什么

用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力

缓存穿透的解决方案有哪些

  • 缓存 null 值
  • 布隆过滤
  • 增强 id 的复杂度,避免被猜测 id 规律
  • 做好数据的基础格式校验
  • 加强用户权限校验
  • 做好热点参数的限流

缓存雪崩

缓存雪崩是指在 同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。

解决方案:

  • 给不同的 Key 的 TTL 添加随机值
  • 利用 Redis 集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

缓存击穿

缓存击穿问题也叫 热点 Key 问题,就是一个被 高并发访问 并且 缓存重建业务较复杂 的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

假设线程 1 在查询缓存后本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程 1 走完这个逻辑,其他线程就都能从缓存中加载这些数据,但是 假设在线程 1 没有走完时,后续的线程 2,线程 3,线程 4 同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大

常见的解决方案有两种:

  • 互斥锁
  • 逻辑过期

使用锁来解决:

因为锁能实现互斥性。假设线程过来,只能一个人一个人的来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询的性能从并行变成了串行,我们可以采用tryLock方法 + double check来解决这样的问题。

假设现在线程 1 过来访问,他查询缓存没有命中,但是此时他获得到了锁的资源,那么线程 1 就会一个人去执行逻辑,假设现在线程2过来,线程 2 在执行过程中,并没有获得到锁,那么线程 2 就可以进行到休眠,直到线程 1 把锁释放后,线程 2 获得到锁,然后再来执行逻辑,此时就能够从缓存中拿到数据了。

使用逻辑过期方案解决:

之所以会出现缓存击穿问题,是因为对 key 设置了过期时间,如果不设置过期时间就不会有缓存击穿的问题,但是不设置过期时间会导致数据就一直占用内存。把过期时间设置在 redis 的 value 中,但这个过期时间并不会直接作用于 redis,而是后续通过逻辑去处理。

假设线程 1 去查询缓存,然后从 value 中判断出来当前的数据已经过期,此时线程 1 去获得互斥锁,那么其他线程会进行阻塞。获得锁的线程 1 会开启一个线程 2 去进行重建缓存的逻辑,直到新开的线程 2 完成这个逻辑后才释放锁, 而线程 1 直接进行返回过期数据

假设现在线程 3 过来访问,由于线程线程 2 持有着锁,所以线程 3 无法获得锁,线程 3 也直接返回过期数据,只有等到新开的线程 2 重建缓存后,其他线程才能返回正确的数据。

该方案巧妙在于异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据

进行对比

互斥锁方案:由于保证了互斥性,所以数据一致,且实现简单,因为仅仅只需要加一把锁而已,也没其他的事情需要操心,所以没有额外的内存消耗,缺点在于有锁就有死锁问题的发生,且只能串行执行性能肯定受到影响

逻辑过期方案: 线程读取过程中不需要等待,性能好,有一个额外的线程持有锁去进行重构数据,但是在重构数据完成前,其他的线程只能返回之前的数据,且实现起来麻烦

利用互斥锁解决缓存击穿问题

相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 :

进行查询之后,如果 从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询

如果获取到了锁的线程,再去进行查询,查询后将数据写入redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿

操作锁的代码:

核心思路就是利用redis的setnx方法来表示获取锁,该方法含义是 redis 中如果没有这个 key,则插入成功,返回 1,在stringRedisTemplate 中返回 true, 如果有这个 key 则插入失败,则返回 0,在 stringRedisTemplate 返回 false,我们可以通过 true,或者是 false,来表示是否有线程成功插入 key,成功插入的 key 的线程我们认为他就是获得到锁的线程。

1
2
3
4
5
6
7
8
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

private void unlock(String key) {
stringRedisTemplate.delete(key);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public Shop queryWithMutex(Long id) {
String key = CACHE_SHOP_KEY + id;
// 1. 从redis中查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2. 判断缓存是否命中
if (StrUtil.isNotBlank(shopJson)) {
// 3. 如果缓存命中,直接返回商铺信息
return JSONUtil.toBean(shopJson, Shop.class);
}
// 判断命中的是否是空值
if (shopJson != null) {
// 返回错误信息
return null;
}
// 4. 如果未命中。实现缓存重建
// 4.1 获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
Shop shop = null;
try {
boolean isLock = tryLock(lockKey);
// 4.2 判断是否获取成功
if (!isLock) {
// 4.3 失败,则休眠并重试
Thread.sleep(50);
return queryWithMutex(id);
}
// 4.4 成功,根据id查询数据库
shop = getById(id);
// 模拟重建延时
Thread.sleep(200);
// 5. 如果数据库中不存在记录,返回404
if (shop == null) {
// 将空值写入Redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
// 6. 当数据库中存在记录时,将商铺数据写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 7. 释放互斥锁
unlock(lockKey);
}

// 7. 结束
return shop;
}

利用逻辑过期解决缓存击穿问题

  • 当用户开始查询 redis 时,判断是否命中,如果没有命中则直接返回空数据,不查询数据库
  • 而一旦命中后,将 value 取出,判断 value 中的过期时间是否满足
    • 如果没有过期,则直接返回 redis 中的数据
    • 如果过期,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。

1
2
3
4
5
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
1
2
3
4
5
6
7
8
9
10
11
public void saveShopToRedis(Long id, Long expireSeconds) throws InterruptedException {
// 1. 查询店铺数据
Shop shop = getById(id);
Thread.sleep(200);
// 2. 封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
// 3. 写入Redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

// 逻辑过期实现缓存击穿
public Shop queryWithLogicalExpire(Long id) {
String key = CACHE_SHOP_KEY + id;

// 1. 从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2. 判断缓存是否命中
if (StrUtil.isBlank(shopJson)) {
// 3. 如果缓存未命中,直接返回空
return null;
}

// 4. 如果命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 5. 判断缓存是否过期
if (expireTime.isAfter(LocalDateTime.now())) {
// 5-1 如果缓存未过期,直接返回店铺信息
return shop;
}

// 5-2 如果缓存已过期,尝试获取互斥锁,进行缓存重建

// 6. 尝试获取互斥锁
// 6-1 获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6-2 判断是否获取锁成功
if (isLock) {
// 6-3 如果获取锁成功,开启独立线程,实现缓存重建
/**
* 获取锁成功应该再次检测redis缓存是否过期,做DoubleCheck,如果存在则无需重建缓存
*/
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 缓存重建
this.saveShopToRedis(id, 30L);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
unlock(lockKey);
}
});
}

// 6-4 如果获取锁失败,返回过期的商铺信息
return shop;
}

缓存工具封装

基于 StringRedisTemplate 封装一个缓存工具类,满足下列需求:

  • 方法 1:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置 TTL 过期时间
  • 方法 2:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置逻辑过期时间,用于 处理缓存击穿问题

  • 方法 3:根据指定的 key 查询缓存,并反序列化为指定类型,利用缓存空值的方式 解决缓存穿透问题

  • 方法 4:根据指定的 key 查询缓存,并反序列化为指定类型,需要利用逻辑过期 解决缓存击穿问题

优惠券秒杀

全局唯一ID

当用户抢购时,就会生成订单并保存到 tb_voucher_order 这张表中,而订单表如果使用数据库自增 ID 就存在一些问题:

  • id 的规律性太明显
  • 受单表数据量的限制

全局 ID 生成器,是一种在分布式系统下用来生成全局唯一 ID 的工具,一般要满足下列特性:

  • 唯一性
  • 高可用
  • 高性能
  • 递增性
  • 安全性

为了增加 ID 的安全性,我们可以不直接使用 Redis 自增的数值,而是拼接一些其它信息:

ID 的组成部分:符号位:1 bit,永远为 0

时间戳:31 bit,以秒为单位,可以使用 69 年

序列号:32 bit,秒内的计数器,支持每秒产生 $2^{32}$ 个不同 ID

全局唯一ID生成策略

  • UUID
  • Redis 自增:每日一 Key,方便统计订单量;ID 构造:时间戳 + 计数器
  • 雪花算法
  • 数据库自增

添加优惠券

每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:

tb_voucher:优惠券的基本信息,优惠金额、使用规则等
tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息

特价卷除了具有优惠卷的基本信息以外,还具有库存,抢购时间,结束时间等等字段

优惠券秒杀下单

秒杀下单应该思考的内容

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

下单核心逻辑分析:

当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件

比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。

参考:VoucherOrderController

库存超卖

1
2
3
4
5
6
7
8
9
10
11
12
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
//5,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}

假设线程 1 过来查询库存,判断出来库存大于 1,正准备去扣减库存,但是还没有来得及去扣减,此时线程 2 过来,线程 2 也去查询库存,发现这个数量一定也大于 1,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。

悲观锁:

悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等

乐观锁:

乐观锁会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,

这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过

CAS 和 版本号法

利用cas进行无锁化机制加锁,var5 是操作前读取的内存值,while中的var1+var2 是预估值,如果预估值 == 内存值,则代表中间没有被人修改过,此时就将新值去替换 内存值。其中do while 是为了在操作失败时,再次进行自旋操作,即把之前的逻辑再操作一次。

1
2
3
4
5
6
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;

解决方案

修改代码方案一、

VoucherOrderServiceImpl 在扣减库存时,改为:

1
2
3
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1") // set stock = stock -1
.eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); // where id = ? and stock = ?

以上逻辑的核心含义是:只要我扣减库存时的库存和之前我查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,但是以上这种方式通过测试发现会有很多失败的情况

失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败

修改代码方案二、

之前的方式要修改前后都保持一致,但是这样我们分析过,成功的概率太低,所以我们的乐观锁需要变一下,改成stock大于0 即可

1
2
3
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update().gt("stock",0); //where id = ? and stock > 0

限制下单

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

比如时间是否充足,如果时间充足,则进一步判断库存是否足够,然后再根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单

参考:VoucherOrderServiceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
// 5.一人一单逻辑
// 5.1.用户id
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

//6,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);

voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

return Result.ok(orderId);

}

存在问题: 现在的问题还是和之前一样,并发过来,查询数据库,都不存在订单,所以我们还是需要加锁,但是乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作

注意:在这里提到了非常多的问题,我们需要慢慢的来思考,首先我们的初始方案是封装了一个 createVoucherOrder 方法,同时为了确保他线程安全,在方法上添加了一把 synchronized 锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()){
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}
}

但是以上代码还是存在问题,问题的原因在于 当前方法被 spring 的事务控制,如果你在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题:如下:

在 seckillVoucher 方法中,添加以下逻辑,这样就能保证事务的特性,同时也控制了锁的粒度

1
2
3
4
5
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
return this.createVoucherOrder(voucherId);
}

但是以上做法依然有问题,因为你调用的方法,其实是 this. 的方式调用的,事务想要生效,还需要使用代理防止事务失效,所以这个地方,我们需要获得原始的事务对象来操作事务

1
2
3
4
5
6
7
8
// 获取userId准备上锁
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}

分布式锁

分布式锁

满足分布式系统或集群模式下多进程可见并且互斥的锁。

分布式锁的核心思想

让大家都 使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行

分布式锁满足条件

可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化

互斥:互斥是分布式锁的最基本的条件,使得程序串行执行

高可用:程序不易崩溃,时时刻刻都保证较高的可用性

高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能

安全性:安全也是程序中必不可少的一环

常见的分布式锁有三种

Mysql:mysql 本身就带有锁机制,但是由于 mysql 性能本身一般,所以采用分布式锁的情况下,其实使用 mysql 作为分布式锁 比较少见

Redis:redis 作为分布式锁是 非常常见 的一种使用方式,现在企业级开发中基本都使用 redis 或者 zookeeper 作为分布式锁,利用 setnx 这个方法,如果插入 key 成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁

Zookeeper:zookeeper 也是企业级开发中较好的一个实现分布式锁的方案

Redis 分布式锁

实现分布式锁时需要实现的两个基本方法:

获取锁

  • 互斥:确保只能有一个线程获取锁

    1
    2
    # 添加锁,利用setnx的互斥锁
    SETNX lock thread1
  • 非阻塞:尝试一次,成功返回 true,失败返回 false

释放锁

  • 手动释放

    1
    2
    # 释放锁,删除即可
    DEL key
  • 超时释放:获取锁时添加一个超时时间

Redis 分布式锁误删情况说明

逻辑说明:

持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程 2 来尝试获得锁,就拿到了这把锁,然后线程 2 在持有锁执行过程中,线程 1 反应过来,继续执行,而线程 1 执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程 2 的锁进行删除,这就是误删别人锁的情况说明

解决方案:

解决方案就是 在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则不进行锁的删除。
假设还是上边的情况,线程 1 卡顿,锁自动释放,线程 2 进入到锁的内部执行逻辑,此时线程 1 反应过来,然后删除锁,但是线程 1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程 2 走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。

解决 Redis 分布式锁误删问题

需求:

修改之前的分布式锁实现,满足:在获取锁时存入 线程标示(可以用 UUID 表示)
在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致

  • 如果一致则释放锁
  • 如果不一致则不释放锁

核心逻辑:

  • 在存入锁时,放入自己线程的标识
  • 在删除锁时,判断当前这把锁的标识是不是自己存入的。如果是,则进行删除;如果不是,则不进行删除。

分布式锁的原子性问题

线程 1 现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是 此时他的锁到期

那么此时线程 2 进来,但是线程 1 他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程 1 的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生,

Lua 脚本解决多条命令原子性问题

Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。

Redis提供的调用函数,语法如下:

1
redis.call('命令名称', 'key', '其它参数', ...)

要执行set name jack,则脚本是这样:

1
2
# 执行 set name jack
redis.call('set', 'name', 'jack')

要先执行set name Rose,再执行get name,则脚本如下:

1
2
3
4
5
6
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

Java 调用 Lua 脚本改造分布式锁

在 RedisTemplate 中,可以利用 execute 方法去执行 lua 脚本,参数对应关系就如下

基于 Redis 的分布式锁实现思路:

  • 利用 set nx/ex 获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁

特性:

  • 利用 set nx 满足互斥性
  • 利用 set ex保证故障时锁依然能释放,避免死锁,提高安全性
  • 利用 Redis 集群保证高可用和高并发特性

Redisson 实现分布式锁

基于 setnx 实现的分布式锁存在的问题:

不可重入:同一个线程无法多次获取同一把锁

  • 可重入是指 获得锁的线程可以再次进入到相同的锁的代码块
  • 可重入锁的意义在于防止死锁,常见的 synchronized 和 Lock 锁都是可重入的

不可重试

  • 获取锁只尝试一次就返回 false,没有重试机制
  • 合理情况:当线程在获得锁失败后应该能再次尝试获得锁

超时释放: 锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患

  • 在加锁时增加了过期时间,可以防止死锁
  • 但是如果卡顿的时间超长,虽然采用 lua 表达式防止删锁时误删别人的锁,但是毕竟没有锁住,有安全隐患

主从一致性:

  • 如果 Redis 提供主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前主机宕机就会出现死锁问题。

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务,其中就包含了 各种分布式锁的实现

引入依赖

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

配置 Redisson

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
RLock lock = redissonClient.getLock(lockName);
Object result = null;
// 获取锁成功,执行方法
if (lock.tryLock(0, leaseTime, TimeUnit.MILLISECONDS)) {
log.info("{} getLock", lockName);
try {
doSomething();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
} else {
// 获取锁失败
log.info("{} not getLock", lockName);
}

实现可重入锁

目的:保证同一个线程可以多次同一把锁

获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if (redis.call('exists'), key == 0) then
-- 不存在,获取锁
redis.call('hset', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadIId) == 1) then
-- 不存在,获取锁,重入次数+1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
return 0; -- 走到此处,说明获取锁的不是自己,获取锁失败

释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否为自己持有
if (redis.call('HEXISTS', key, threadId) == 0) then
return nil; -- 如果不是自己,直接返回
end;
-- 如果是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断锁计数是否为0
if (count > 0) then
-- 大于0说明不能释放锁,重置有效期然后返回
redis.call('EXPIRE', key, releaseTime);
return nil;
else -- 等于0说明可以释放锁,直接删除
redis.call('DEL', key);
return nil;
end;

重试获取锁

基于 Redis Pub/Sub 发布订阅机制。 如果获取锁失败,则阻塞订阅释放锁的消息;当锁被释放时,会触发推送(告诉其他线程我释放锁啦),然后其他线程再重试获取;如此往复,直到超时。

防止锁提前超时释放

基于看门狗机制。 如果不手动设置锁释放时间(leaseTime),默认设置 30 秒过期,并且给当前锁注册一个定时任务,该定时任务每隔 1 / 3 的锁释放时间(一般是 10 秒)会重置锁的过期时间(递归调用,一次续期完了再)。

思考:

  1. 如何保证同一个锁只注册一个定时任务
  2. 如何防止无限续期

    要解决这些问题,使用全局 ConcurrentHashMap 来管理锁 => 任务信息,key 为锁的 id,从而保证唯一。当某个锁释放时,从全局 ConcurrentHashMap 中取出定时任务并取消掉,然后把锁的信息从 Map 中删掉即可。

完整的分布式锁流程:

解决主从一致性问题

如果使用主从复制的 Redis 集群,可能出现主从节点设置的锁状态不一致的问题。

此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个 slave 变成 master,而此时新的 master 中实际上并没有锁信息,此时锁信息就已经丢掉了。

为了解决这个问题,redission 提出 MutiLock 锁,使用这把锁就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

实现 MultiLock 的几个关键:

  1. 遍历所有节点,依次设置锁,并使用列表来记录所有主节点的锁是否设置成功。
  2. 只要有一个节点设置不成功,就要释放所有的锁,从头来过。
  3. 因为不同节点设置锁成功的时间不同,所以在所有锁设置成功后,要统一设置过期时间(但如果 leaseTime = -1 就不用了,因为开启了看门狗机制会自动续期)
  4. 锁释放时间(leaseTime)必须要大于抢锁最大等待时间(waitTime),否则可能出现第一个节点抢到锁,最后一个节点还没抢到锁,之前的锁就已经超时释放了。所以如果指定了 waitTime 和 leaseTime,默认 leaseTime = waitTime * 2。

秒杀优化

优化思路

  1. 串行改并行:原本由 1 个线程的操作改为由 2 个或多个线程同时操作,比如 1 个线程负责判断秒杀资格,1 个线程负责减库存 + 创建订单(写)
  2. 同步改异步:判断完秒杀资格后,就可以返回订单 id 给前端;其余的写库操作可以异步执行。
  3. 提高判断秒杀资格的性能:读 DB 改为读 Redis

异步秒杀

下单流程

当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤

  1. 查询优惠卷

  2. 判断秒杀库存是否足够

  3. 查询订单

  4. 校验是否是一人一单

  5. 扣减库存

  6. 创建订单

在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致程序执行的很慢,所以需要异步程序执行,那么如何加速呢?

优化方案:

耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池

两个难点

第一个难点 怎么在redis中去快速校验一人一单,还有库存判断

第二个难点 由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。

整体思路:

当用户下单之后,判断库存是否充足只需要到redis中去根据key找对应的value是否大于0即可

  • 如果不充足,则直接结束
  • 如果充足,继续在redis中判断用户是否可以下单
  • 如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0,整个过程需要保证是原子性的,我们可以使用lua来操作

当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。

基于Redis完成秒杀资格判断

需求:

  1. 新增秒杀优惠券的同时,将优惠券信息保存到Redis中

  2. 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功

  3. 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列

  4. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

小总结:

秒杀业务的优化思路

  • 先利用Redis完成库存余量、一人一单判断,完成抢单业务
  • 再将下单业务放入阻塞队列,利用独立线程异步下单
  • 基于阻塞队列的异步秒杀存在哪些问题?
    • 内存限制问题
    • 数据安全问题(如果宕机了,内存中的数据就没了)

Redis 消息队列

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

秒杀场景:下单之后,利用redis去进行校验下单条件,再通过队列把消息发送出去,然后再启动一个线程去消费这个消息,完成解耦,同时也加快响应速度。

基于List实现消息队列

使用Redis List的结构作为消息队列,使用LPush模拟生产者发送消息入队,使用BRPOP(阻塞弹出)模拟消费者取出消息。没有消息时会保持阻塞状态,从而实现类似JVM阻塞队列的效果

优点

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub的消息队列

使用Redis的订阅发布模型,生产者可以讲消息推送给某个Channel,多个消费者可以订阅该Channel,从而同时得到消息

命令 说明
SUBSCRIBE channel [channel] 订阅一个或多个频道
PUBLISH channel msg 向一个频道发送消息
PSUBSCRIBE pattern[pattern] 订阅与pattern格式匹配的所有频道

优点

采用发布订阅模型,支持多生产、多消费

缺点

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream的消息队列

Redis 5.0推出的数据结构,可以实现单向的消息队列

命令 说明
XAdd 添加消息/创建队列,消息会自动持久化、不会丢失,每个消息都有唯一id
XRead 读取消息,支持多消费者读、可以从指定消息id开始读、支持阻塞读最新消息

发送消息

读取消息

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

1
2
3
4
5
6
7
8
9
while (true) {
// 尝试读取队列中的消息,最多阻塞2秒
Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
if (msg == null) {
continue;
}
// 处理消息
handleMsg(msg);
}

注意:当指定起始ID为$时,代表读取最新的消息,如果处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

消费者组

只用这两个命令还是不够的,因为目前只支持阻塞读最新消息,假设处理消息过程中又来了几条消息,可能出现漏读消息的情况 。为解决上述问题,可以用 Stream 的以下特性:

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。

  • 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
  • 消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息。确保每一个消息都会被消费
  • 消息确认:消费者获取消息后,消息处于一个pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,才会从pending-list移除。这样如果消费业务处理异常,可以从 pending list 的开头依次读取未确认消息,重试处理。(也要避免无限重试,实在处理不成功就强制 ACK + 业务记日志)

创建消费者组

1
XGROUP CREATE key groupName ID [MKSTREAM]
参数 说明
key 队列名称
groupName 消费者组名称
ID 起始ID标识,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTRAEM 队列不存在时自动创建队列

:自动记录消费的进度,支持从上次未消费的地方开始接着消费,保证每条消息按顺序消费

删除指定的消费者组

1
XGROUP DESTORY key groupName

给指定的消费者组添加消费者

1
XGROUP CREATECONSUMER key groupname consumername

删除消费者组中的指定消费者

1
XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息

1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
参数 说明
group 消费组名称
consumer 消费者名称,如果消费者不存在,会自动创建一个消费者
count 本次查询的最大数量
BLOCK milliseconds 当没有消息时最长等待时间
NOACK 无需手动ACK,获取到消息后自动确认
STREAMS key 指定队列名称
ID 获取消息的起始ID:

注意

>:从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争抢消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

整个消费流程伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
while (true) {
// 尝试监听队列,使用阻塞模式,最长等待时间2000ms
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1");
if (msg == null) { // null 说明没有消息,继续下次循环
continue;
}
try {
// 处理消息,处理完成后进行确认处理
handleMessage(msg);
} catch (Exception) {
while (true) {
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
if (msg == null) { // null 说明没有异常消息,所有消息已经确认,可以结束循环
break;
}
try {
// 说明有异常消息,再次处理
handleMessage(msg);
} catch (Exception e) {
// 再次出现异常,记录日志,继续循环
continue;
}
}
}
}

Java中操作Redis Stream方法

  • 调用Lua
  • 使用 Redis Tempalte 的 opsForStream()