Redis项目学习笔记

学习来源:黑马程序员

学习时间:2023年6月26日

0 Linux环境Redis的安装

暂略

1 Redis的Java客户端

1.1 对比

在Redis官网中提供了各种语言的客户端。Java客户端主要如下:

  • Jedis

以Redis命令作为方法名称,学习成本低,简单实用。但是Jedis实例是线程不安全的,多线程环境下需要基于连接池来使用。

  • lettuce

Lettuce是基于Netty实现的,支持同步、异步和响应式编程方式,并且是线程安全的。支持Redis的哨兵模式、集群模式和管道模式。

  • Redisson

Redisson是一个基于Redis实现的分布式、可伸缩的Java数据结构集合。包含了诸如Map、Queue、Lock、 Semaphore、AtomicLong等强大功能。

1.2 Jedis

1.2.1 代码示例

引入依赖:

1
2
3
4
5
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>

测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class JedisTest {
private Jedis jedis;

@Before
public void setUp() {
// 建立连接
jedis = new Jedis("xxx.xxx.xxx.xxx", 6379);
jedis.auth("123456");
// 选择数据库
jedis.select(0);
}

@Test
public void setString() {
// 存入数据
String result = jedis.set("name", "Hongyi");
System.out.println("Result = " + result);
// 获取数据
String name = jedis.get("name");
System.out.println("name = " + name);
}

@After
public void tearDown() {
if (jedis != null) {
jedis.close();
}
}
}

执行结果:

1
2
Result = OK
name = Hongyi

1.2.2 Jedis连接池

Jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此推荐使用Jedis连接池代替Jedis的直连方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class JedisConnectionFactory {
private static final JedisPool jedisPool;

static {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// 最大连接
jedisPoolConfig.setMaxTotal(8);
// 最大空闲连接
jedisPoolConfig.setMaxIdle(8);
// 最小空闲连接
jedisPoolConfig.setMinIdle(0);
// 设置最长等待时间, ms
jedisPoolConfig.setMaxWaitMillis(200);
jedisPool = new JedisPool(jedisPoolConfig, "192.168.150.101", 6379, 1000, "123456");
}

// 获取jedis对象
public static Jedis getJedis() {
return jedisPool.getResource();
}
}

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class JedisTest {
private Jedis jedis;

@Before
public void setUp() {
// 通过连接池获取连接
jedis = JedisConnectionFactory.getJedis();
jedis.auth("123456");
// 选择数据库
jedis.select(0);
}

@Test
public void setString() {
// 存入数据
String result = jedis.set("name", "Hongyi");
System.out.println("Result: " + result);
// 获取数据
String name = jedis.get("name");
System.out.println("name: " + name);
}

@After
public void tearDown() {
if (jedis != null) {
jedis.close();
}
}
}

1.3 Spring Data Redis

1.3.1 简介

SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis。

特点:

  • 提供了对不同Redis客户端的整合(Lettuce和Jedis)
  • 提供了RedisTemplate统一API来操作Redis
  • 支持Redis的发布订阅模型
  • 支持Redis哨兵和Redis集群
  • 支持基于Lettuce的响应式编程
  • 支持基于JDK、JSON、字符串、Spring对象的数据序列化及反序列化
  • 支持基于Redis的JDKCollection实现

1.3.2 代码示例

SpringDataRedis中提供了RedisTemplate工具类,其中封装了各种对Redis的操作。并且将不同数据类型的操作API封装到了不同的类型中:

API 返回值类型 说明
redisTemplate.opsForValue() ValueOperations 操作String类型数据
redisTemplate.opsForHash() HashOperations 操作Hash类型数据
redisTemplate.opsForList() ListOperations 操作List类型数据
redisTemplate.opsForSet() SetOperations 操作Set类型数据
redisTemplate.opsForZSet() ZSetOperations 操作SortedSet类型数据
redisTemplate 通用的命令

SpringBoot已经提供了对SpringDataRedis的支持,使用非常简单:

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 连接池依赖 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

配置:

1
2
3
4
5
6
7
8
9
10
spring:
redis:
host: xxx.xxx.xxx.xxx
port: 6379
password: 123456
lettuce:
pool:
max-active: 8
min-idle: 0
max-wait: 100

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootTest
class SpringDataRedisDemoApplicationTests {
// 注入RedisTemplate
@Autowired
private RedisTemplate redisTemplate;

@Test
void testString() {
redisTemplate.opsForValue().set("name", "Takao");
// 获取数据
Object name = redisTemplate.opsForValue().get("name");
System.out.println("name = " + name);
}

}

执行结果:

1
name = Takao

1.3.3 序列化

问题:上面测试后,此时redis里的键值对理应是name=Takao,但是:

1
2
127.0.0.1:6379> get name
"Hongyi"

原因:RedisTemplate可以接收任意Object作为值写入Redis,只不过写入前会把Object序列化为字节形式,默认是采用JDK序列化。

解决:自定义RedisTemplate的序列化方式

1.3.4 StringRedisTemplate

Spring默认提供了一个StringRedisTemplate类,它的key和value的序列化方式默认就是String方式。省去了我们自定义RedisTemplate的过程。

步骤:

  • 写入Redis时,手动把对象序列化为JSON
    • 例如使用JSONUtil.toJsonStr
  • 读取Redis时,手动把读取到的JSON反序列化为对象
    • 例如使用JSONUtil.toBean

2 项目介绍

2.1 项目架构

名称:黑马点评项目

项目结构:

image-20230626182657712

2.2 后端项目部署

后端代码结构:

image-20230627141026335

配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
server:
port: 8081
spring:
application:
name: hmdp
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/hmdp?useSSL=false&serverTimezone=UTC
username: root
password: 123
redis:
host: 192.168.150.101
port: 6379
password: 123321
lettuce:
pool:
max-active: 10
max-idle: 10
min-idle: 1
time-between-eviction-runs: 10s
jackson:
default-property-inclusion: non_null # JSON处理时忽略非空字段
mybatis-plus:
type-aliases-package: com.hongyi.entity # 别名扫描包
logging:
level:
com.hmdp: debug

需要根据服务器实际情况,修改其中mysql和redis的配置。

2.2 前端项目部署

略,使用postman来测试接口

3 短信登录

3.1 基于Session实现登录

流程:

image-20230626192654706

3.1.1 发送验证码

说明
请求方式 POST
请求路径 /user/code
请求体 phone
返回值

代码

  • 控制层UserController:省略其他代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
@RequestMapping("/user")
public class UserController {

@Resource
private IUserService userService;

@PostMapping("code")
public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
// TODO 发送短信验证码并保存验证码
return userService.sendCode(phone, session);
}
// ...

}
  • 业务层UserServiceImpl,接口略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {

@Override
public Result sendCode(String phone, HttpSession session) {
// 1.校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 2.不符合则返回错误信息
return Result.fail("手机号格式错误");
}
// 3.符合,则生成一个验证码
String code = RandomUtil.randomNumbers(6);
// 4.保存验证码到session
session.setAttribute("code", code);
// 5.发送验证码(模拟)
log.info("发送验证码成功, 验证码为: {}", code);
// 返回ok
return Result.ok();
}

}

其中,dto.Result封装返回信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Result {
private Boolean success;
private String errorMsg; // 错误信息
private Object data;
private Long total;

public static Result ok(){
return new Result(true, null, null, null);
}
public static Result ok(Object data){
return new Result(true, null, data, null);
}
public static Result ok(List<?> data, Long total){
return new Result(true, null, data, total);
}
public static Result fail(String errorMsg){
return new Result(false, errorMsg, null, null);
}
}

接口测试:这里采用Postman进行接口测试:

image-20230627150109585

1
2023-06-27 14:57:42.916  INFO 3256 --- [nio-8081-exec-1] com.hongyi.service.impl.UserServiceImpl  : 发送验证码成功, 验证码为: 348237

3.1.2 登录和注册

  • 控制器UserController
1
2
3
4
5
@PostMapping("/login")
public Result login(@RequestBody loginForm loginForm, HttpSession session){
// TODO 实现登录功能
return userService.login(loginForm, session);
}

这里将请求体(前端应该发送json格式)中的数据封装到了loginForm类中:

1
2
3
4
5
6
@Data
public class LoginFormDTO {
private String phone; // 手机
private String code; // 验证码
private String password; // 密码
}
  • 业务层UserServiceImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
String phone = loginForm.getPhone();
// 校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 不符合则返回错误信息
return Result.fail("手机号格式错误");
}

// 校验验证码
String cacheCode = (String) session.getAttribute("code");
String code = loginForm.getCode();
if (cacheCode == null || !cacheCode.equals(code)) {
return Result.fail("验证码错误");
}

// 根据手机号查询数据库,MyBatisPlus
User user = query().eq("phone", phone).one();
if (user == null) {
// 不存在则创建新用户
user = createUserWithPhone(phone);
}

// 保存user到session
session.setAttribute("user", user);
return Result.ok();
}

private User createUserWithPhone(String phone) {
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
// 保存用户到数据库
save(user);
return user;
}

接口测试:

image-20230627153251378

再查看数据库即可。

3.1.3 登录拦截器

使用拦截器:

image-20230627153651752

代码实现

  • 拦截器interceptor.LoginInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class LoginInterceptor implements HandlerInterceptor {

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 获取session
HttpSession session = request.getSession();
// 获取session里的用户
User user = (User) session.getAttribute("user");
// 判断用户是否存在
if (user == null) {
// 不存在则拦截
response.setStatus(401);
return false;
}
// 将用户将用户信息保存到ThreadLocal
UserDTO userDTO = new UserDTO();
userDTO.setId(user.getId());
userDTO.setNickName(user.getNickName());
userDTO.setIcon(user.getIcon());
UserHolder.saveUser(userDTO);
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户
UserHolder.removeUser();
}

}

UserDTO包含了User类的部分成员变量,用于隐藏敏感信息。

1
2
3
4
5
6
@Data
public class UserDTO {
private Long id;
private String nickName;
private String icon;
}
  • 注册拦截器,新建config.MvcConfig配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/blog/hot",
"/user/code",
"/user/login",
"/shop/**"
);
}
}

excludePathPatterns用于排除拦截的路径,即对这些路径予以放行。

  • 控制器UserController
1
2
3
4
5
6
@GetMapping("/me")
public Result me(){
// TODO 获取当前登录的用户并返回
UserDTO user = UserHolder.getUser();
return Result.ok(user);
}

接口测试:

image-20230627160413599

3.1.4 集群的session共享问题

session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。

image-20230627161153975

session的替代方案应该满足:

  • 数据共享
  • 内存存储
  • key、value结构

3.2 基于Redis实现的登录

3.2.1 流程分析

image-20230627170722937

image-20230627170821682

3.2.2 发送验证码

发送验证码后,将phone=code保存在redis中,而不是session中。

另外设置键值对的有效期。

1
2
3
4
5
6
7
8
9
10
@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result sendCode(String phone, HttpSession session) {
// ...
// 保存验证码到redis,2min后过期
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, 2, TimeUnit.MINUTES);
// ...
}

常量LOGIN_CODE_KEY是一个识别的前缀,定义在RedisConstants

1
2
3
4
public class RedisConstants {
public static final String LOGIN_CODE_KEY = "login:code:";
// ...
}

测试结果:

image-20230627175328689

3.2.3 登录和注册

  • 根据手机号,从redis获取验证码并校验。
  • 将用户信息保存在redis中,可以采用(前缀+随机token)=hash的方式存储

例如:

image-20230627171824368

代码实现

  • UserServiceImpl/login
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public Result login(LoginFormDTO loginForm, HttpSession session) {
String phone = loginForm.getPhone();
// 校验手机号
if (RegexUtils.isPhoneInvalid(phone)) {
// 不符合则返回错误信息
return Result.fail("手机号格式错误");
}

// 从redis获取验证码并校验
String cacheCode = stringRedisTemplate.opsForValue().get(RedisConstants.LOGIN_CODE_KEY + phone);
String code = loginForm.getCode();
if (cacheCode == null || !cacheCode.equals(code)) {
return Result.fail("验证码错误");
}

// 根据手机号查询数据库
User user = query().eq("phone", phone).one();
if (user == null) {
// 不存在则创建新用户
user = createUserWithPhone(phone);
}
// 保存user到redis
// 生成随机token
String token = UUID.randomUUID().toString(true);
// 将user对象转换为Hash存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,
new HashMap<>(),
CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor(
(fieldName, fieldValue) -> fieldValue.toString()
));
// 将用户信息存储在redis
String tokenKey = RedisConstants.LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
// 为该键值对设置有效期
stringRedisTemplate.expire(tokenKey, 30, TimeUnit.MINUTES);
// 返回token
return Result.ok(token);
}

在session中,默认超时时间为30min,如果连接有活动,则从新计算。

上面在设置键值对的超时时长为30min,即使连接有活动,也会过期,因此需要在拦截器中重新刷新过期时长。

  • 拦截器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class LoginInterceptor implements HandlerInterceptor {

private StringRedisTemplate stringRedisTemplate;

public LoginInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 获取请求头中的token
String token = request.getHeader("authorization");
if (token == null) {
response.setStatus(401);
return false;
}
// 完整的token
String keyToken = RedisConstants.LOGIN_USER_KEY + token;
// 基于token获取redis中的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(keyToken);
// 判断用户是否存在
if (userMap.isEmpty()) {
// 不存在则拦截
response.setStatus(401);
return false;
}
// 将hash数据转为UserDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 将用户将用户信息保存到ThreadLocal
UserHolder.saveUser(userDTO);
// 刷新token的有效期
stringRedisTemplate.expire(keyToken, 30, TimeUnit.MINUTES);
return true;
}

// ...

}

注意,这里不能注入StringRedisTemplate,因为拦截器类不是容器管理的,因此需要在配置内中将StringRedisTemplate注入,再构造传参:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration // 受容器管理
public class MvcConfig implements WebMvcConfigurer {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor(stringRedisTemplate)) // 传入注入的模板类
.excludePathPatterns(
"/blog/hot",
"/user/code",
"/user/login",
"/shop/**"
);
}
}

接口测试:

注册后传回:

1
2
3
4
{
"success": true,
"data": "6ef6bf998625439187f5ba932b22486e"
}

redis数据库状态:

image-20230627180053564

使用token访问/user/me接口:

image-20230627180243153

3.2.4 拦截器优化

image-20230627192245904

代码示例

  • 刷新拦截器RefreshTokenInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class RefreshTokenInterceptor implements HandlerInterceptor {

private StringRedisTemplate stringRedisTemplate;

public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 获取请求头中的token
String token = request.getHeader("authorization");
if (token == null) {
return true;
}
// 完整的token
String keyToken = RedisConstants.LOGIN_USER_KEY + token;
// 基于token获取redis中的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(keyToken);
// 判断用户是否存在
if (userMap.isEmpty()) {
return true;
}
// 将hash数据转为UserDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 将用户将用户信息保存到ThreadLocal
UserHolder.saveUser(userDTO);
// 刷新token的有效期
stringRedisTemplate.expire(keyToken, 30, TimeUnit.MINUTES);
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户
UserHolder.removeUser();
}

}
  • 登录拦截器LoginInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class LoginInterceptor implements HandlerInterceptor {

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 判断是否需要拦截
if(UserHolder.getUser() == null) {
response.setStatus(401);
return false;
}
// 有用户则放行
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户
UserHolder.removeUser();
}

}
  • 拦截器配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/blog/hot",
"/user/code",
"/user/login",
"/shop/**"
).order(1);

// 拦截所有请求
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
}
}

4 查询缓存

4.1 什么是缓存

缓存就是数据交换的缓冲区,是存贮数据的临时地方,一般读写性能较高。

image-20230627192954029

缓存的作用:

  • 降低后端负载
  • 提高读写效率,降低响应时间

缓存的成本:

  • 数据一致性成本
  • 代码维护成本
  • 运维成本

4.2 添加Redis缓存

以通过商铺ID查询商铺为例。

4.2.1 流程分析

没有缓存时的模型:

image-20230627193246031

有缓存的模型:

image-20230627193258463

根据id查询商铺缓存的流程:

image-20230627193347492

4.2.2 代码实现1

业务:通过id查询商户信息。

  • 控制器ShopController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RestController
@RequestMapping("/shop")
public class ShopController {

@Resource
public IShopService shopService;

/**
* 根据id查询商铺信息
* @param id 商铺id
* @return 商铺详情数据
*/
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
return shopService.queryById(id);
}
// ...
}
  • 业务层ShopServiceImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

@Resource
private StringRedisTemplate stringRedisTemplate;

@Override
public Result queryById(Long id) {
String key = RedisConstants.CACHE_SHOP_KEY + id;
// 查询redis缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 判断商户是否在缓存中
if (shopJson != null) {
// 在则直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}

// 不存在则查询数据库
Shop shop = getById(id);
if (shop == null) {
// 数据库不存在则返回错误
return Result.fail("店铺不存在");
}

// 数据库存在,则需要写入缓存
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
return Result.ok(shop);
}

}

接口测试:

第一次查询:用时277ms,因为走的是数据库

image-20230627195636107

第二次查询:用时141ms,走的是redis缓存

image-20230627195652453

此时redis缓存:

image-20230627195752433

4.2.3 代码实现2

给店铺类型查询业务添加缓存。略。

4.3 缓存更新策略

缓存更新是指在数据发生变化时,保持缓存和数据库的数据一致性的问题。如果缓存和数据库的数据不一致,会导致用户看到过期或者错误的数据,影响业务逻辑和用户体验。

一致性的含义:

  • 如果缓存有数据,则要求缓存的数据和数据库的一致
  • 若缓存无数据,则数据库值必须是最新值

4.3.1 简介

内存淘汰 超时剔除 主动更新
说明 不用自己维护,利用Redis的内存淘汰机制,当内存不足时自动淘汰部分数据。下次查询时更新缓存。 给缓存数据添加TTL时间,到期后自动删除缓存。下次查询时更新缓存。 编写业务逻辑,在修改数据库的同时,更新缓存。
一致性 一般
维护成本

业务场景:

  • 低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
  • 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存

4.3.2 主动更新策略

  • **Cache Aside Pattern**:程序与缓存和数据库两者交互

    • 查询策略:先查询缓存,如果缓存中没有,则查询数据库,并将结果写入缓存
    • 更新策略:先更新数据库,然后删除缓存或者更新缓存
  • Read/Write Through Pattern:程序只和缓存交互,缓存与数据库的一致性由缓存来维护

    • 查询策略:先查询缓存,如果缓存中没有,则缓存从数据库中加载数据,并写入缓存
    • 更新策略:先更新缓存,再由缓存同步更新数据库
  • Write Behind Caching Pattern:调用者只操作缓存,由其它线程异步的将缓存数据持久化到数据库,保证最终一致。

① Cache Aside Pattern
  • 查询策略

通过代码查询缓存,缓存命中则返回,如果没有命中则查询数据库并设置值。

image-20231102151827640

  • 更新策略:通过代码更新缓存,先更新数据库,后更新缓存

image-20231102151843858

  • 延迟双删

在更新数据库后,先删除缓存,然后等待一段时间,再次删除缓存。这样做的目的是为了防止在数据库和缓存主从同步的过程中,有其他请求查询到旧的缓存数据,并写回到缓存中,具体的流程如下:

② Read/Write Through
  • 查询策略

先查询缓存,如果缓存没有,由缓存去数据库查询,而不是应用层,查询后更新缓存。

image-20231102152837613

  • 更新策略

先更新缓存,再由缓存同步更新数据库

image-20231102152914076

③ Write Behind

Write Behind 策略是指在写入数据时,只更新缓存中的数据,然后建立一个异步任务或者定时任务来批量更新数据库中的数据。这样,应用程序无需等待数据库的响应,也无需自己去同步更新数据库和缓存,而是交由缓存服务来完成这些操作。

image-20231102152942323

4.3.3 代码实现

需求:给查询商铺的缓存添加超时剔除和主动更新的策略

修改ShopController中的业务逻辑,满足下面的需求:

  • 根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间(超时剔除)
  • 根据id修改店铺时,先修改数据库,再删除缓存(主动更新)

代码示例

1
2
// 数据库存在,则需要写入缓存,并添加过期时间
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), 30, TimeUnit.MINUTES);
  • 控制层
1
2
3
4
5
6
7
8
9
/**
* 更新商铺信息
* @param shop 商铺数据
* @return
*/
@PutMapping
public Result updateShop(@RequestBody Shop shop) {
return shopService.update(shop);
}
  • 业务层
1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
@Transactional // 事务,如果更新出错可以回滚
public Result update(Shop shop) {
Long id = shop.getId();
if (id == null) {
return Result.fail("店铺id不能为空");
}
// 先更新数据库
updateById(shop);
// 后删除缓存
stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + id);
return Result.ok();
}

4.4 缓存穿透

4.4.1 简介

缓存穿透是指客户端请求的数据(恶意数据)在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。

常见的解决方案有两种:

  • 缓存空对象(“”
    • 优点:实现简单,维护方便
    • 缺点:额外的内存消耗;可能造成短期的不一致
  • 布隆过滤
    • 优点:内存占用较少,没有多余的key
    • 缺陷:实现复杂;存在误判的可能
  • 增强id复杂度,避免被猜测id规律(可以采用雪花算法)

image-20230628171304094

4.4.2 代码实现

通过添加空值(意思是value为“”)来解决:

image-20230628171618911

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public Result queryById(Long id) {
String key = RedisConstants.CACHE_SHOP_KEY + id;
// 查询redis缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 判断商户是否在缓存中
// 或者: shopJson != null && shopJson != ""
if (StrUtil.isNotBlank(shopJson)) {
// 在则直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}

// 命中的是空值(shopJson=="")
if (shopJson == "") {
return Result.fail("店铺不存在");
}

// 不存在则查询数据库
Shop shop = getById(id);
if (shop == null) {
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", 2, TimeUnit.MINUTES);
return Result.fail("店铺不存在");
}

// 数据库存在,则需要写入缓存,并添加过期时间
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), 30, TimeUnit.MINUTES);
return Result.ok(shop);
}

StrUtil.isNotBlank判断字符串是否不为空,不为空是指:

  • 不为null
  • 长度不为0,即不为“”

4.5 缓存雪崩

缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。

解决方案:

  • 给不同的Key的TTL添加随机值
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

image-20230628172527272

4.6 缓存击穿

4.6.1 简介

缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

image-20230628172650327

常见的解决方案:

  • 互斥锁
  • 逻辑过期
解决方案 优点 缺点
互斥锁 没有额外的内存消耗,保证一致性实现简单 线程需要等待,性能受影响。可能有死锁风险
逻辑过期 线程无需等待,性能较好 不保证一致性,有额外内存消耗,实现复杂

  • 互斥锁:

image-20230628172935325

  • 逻辑过期

image-20230628173012624

其他解决方案:

  • 延迟缓存双写策略:在缓存中查询数据时,如果数据不存在,不立即去数据库中查询,而是先在缓存中写入一个空对象,然后再去数据库中查询数据并更新缓存,从而避免了缓存击穿的问题。
  • 设置热点数据永不过期

4.6.2 代码实现——互斥锁

需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题。

image-20230628173454896

代码实现

  • 获取锁
1
SETNX KEY_NAME VALUE

设置成功,返回 1 。 设置失败,返回 0 。

  • 释放锁
1
DEL KEY_NAME

  • ShopServiceImpl

互斥锁代码:

1
2
3
4
5
6
7
8
9
private boolean tryLock(String key) {
// 设置值和有效期
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}

private void unlock(String key) {
stringRedisTemplate.delete(key);
}

修改后的业务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Override
public Result queryById(Long id) {
// Shop shop = queryWithPassThrough(id);
Shop shop = queryWithMutex(id);
if (shop == null) {
return Result.fail("店铺不存在");
}
return Result.ok(shop);
}

public Shop queryWithMutex(Long id) {
String key = RedisConstants.CACHE_SHOP_KEY + id;
// 查询redis缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 判断商户是否在缓存中
if (StrUtil.isNotBlank(shopJson)) {
// 在则直接返回
return JSONUtil.toBean(shopJson, Shop.class);
}

// 命中的是空值,直接返回,防止访问数据库
if (shopJson != null) {
return null;
}
// 缓存重建
// 获取互斥锁
String lockKey = null;
Shop shop = null;
try {
// 互斥锁的key
lockKey = RedisConstants.LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
if (!isLock) {
// 上锁失败,则休眠并重试
Thread.sleep(50);
return queryWithMutex(id);
}
// 不存在则查询数据库
shop = getById(id);
if (shop == null) { // 数据库中没有
// 将空值("")写入redis——防止缓存穿透
stringRedisTemplate.opsForValue().set(key, "", 2, TimeUnit.MINUTES);
return null;
}
// 数据库存在,则需要写入缓存,并添加过期时间
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), 30, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 释放锁
unlock(lockKey);
}
return shop;
}

4.6.3 代码实现——逻辑过期

需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题。

image-20230703140308482

代码实现

  • RedisData:用于封装逻辑过期时间
1
2
3
4
5
@Data
public class RedisData {
private LocalDateTime expireTime; // 逻辑过期时间
private Object data; // 存取Redis的数据
}
  • 业务类
1
2
3
4
5
6
7
8
9
10
11
private void saveShop2Redis(Long id, Long expireSeconds) {
// 查询店铺数据
Shop shop = getById(id);

// 封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
// 写入redis
stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

// 逻辑过期时间
public Shop queryWithLogicalExpire(Long id) {
String key = RedisConstants.CACHE_SHOP_KEY + id;
// 查询redis缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 判断商户是否在缓存中
if (StrUtil.isBlank(shopJson)) {
return null;
}
// 命中,需要先把json反序列化为RedisData
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 判断是否过期
if (expireTime.isAfter(LocalDateTime.now())) {
// 未过期
return shop;
}
// 已过期,则缓存更新
// 获取互斥锁
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 判断是否获取锁成功
if (isLock) {
// 成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
this.saveShop2Redis(id, 20L);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
unlock(lockKey);
}
});
}
// 返回过期的商铺信息,牺牲一致性
return shop;
}

4.7 缓存工具类

基于StringRedisTemplate封装一个缓存工具类,满足下列需求:

  • 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
  • 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
  • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
  • 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

暂略

5 优惠券秒杀

5.1 全局唯一ID

5.1.1 简介

每个店铺都可以发布优惠券:当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:

  • id的规律性太明显
  • 受单表数据量的限制

全局ID生成器:是一种在分布式系统下用来生成全局唯一ID的工具。

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

image-20230708154401272

ID的组成部分:

  • 符号位:1bit,永远为0
  • 时间戳:31bit,以秒为单位,可以使用69年
  • 序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID

其他的生成策略

全局唯一ID生成策略:

  • UUID
  • Redis自增
    • 每天一个key,方便统计订单量
    • ID构造是 时间戳 + 计数器
  • snowflake算法(雪花算法)
  • 数据库自增

5.1.2 Redis实现全局唯一ID

在工具类下新建RedisIdWorker工具类,用于生成全局唯一ID:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Component
public class RedisIdWorker {
// 开始的时间戳(基准时间戳)
private static final long BEGIN_TIMESTAMP = 1640995200L;
// 序列号位数
private static final int COUNT_BITS = 32;

private StringRedisTemplate stringRedisTemplate;

// 构造方法注入
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

public long nextId(String keyPrefix) {
// 生成时间戳:当前时间距离某个基准时间的秒数
LocalDateTime now = LocalDateTime.now(); // 当前时间
long nowSecond = now.toEpochSecond(ZoneOffset.UTC); // 转换为时间戳(距离1970年)
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 生成序列号
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 如果不存在key,则自动创建
// 存在key,则自增1,并得到自增后的值count
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 拼接并返回下一个可用的全局唯一ID
return timestam << COUNT_BITS | count;
}

}

单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@SpringBootTest
class HmDianPingApplicationTests {

@Resource
private RedisIdWorker redisIdWorker;

private ExecutorService es = Executors.newFixedThreadPool(500);

@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
// 线程任务:获取100个id
Runnable task = () -> {
for (int i = 0; i < 100; i++) {
long id = redisIdWorker.nextId("order");
System.out.println("id = " + id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
// 每个线程提交300次任务,每个任务获取100个id
for (int i = 0; i < 300; i++) {
es.submit(task);
}
// 等待计数为0
latch.await();
long end = System.currentTimeMillis();
System.out.println("time = " + (end - begin));
}
}

300个线程异步执行,每个线程获取100个id,则会总共获取30000个id。

执行结果:(部分)

1
2
3
4
5
6
7
8
9
10
11
// ...
id = 205460090000536872
id = 205460090000536871
id = 205460090000536875
id = 205460090000536876
id = 205460090000536878
id = 205460090000536877
id = 205460090000536874
id = 205460090000536879
id = 205460090000536880
time = 4190

redis情况:

image-20230708162122423

5.2 实现优惠券秒杀下单

5.2.1 添加数据

每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:

image-20230708162335077

表关系如下:

  • tb_voucher:店铺id,优惠券的基本信息,优惠金额、使用规则等,其中有一个字段表示是否是特价券。
  • tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息。

5.2.2 代码实现

用户可以在店铺页面中抢购这些优惠券:

image-20230708163251192

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
  • 库存是否充足,不足则无法下单

image-20230708163448778

代码实现

业务层代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Resource
private ISeckillVoucherService seckillVoucherService;

@Resource
private RedisIdWorker redisIdWorker;

@Transactional
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 已经结束
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).update();
if(!success) {
return Result.fail("库存不足!");
}
// 6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1订单ID
long orderId = redisWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2用户ID
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 6.3代金券ID
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
}

5.2.3 超卖

使用Jmeter进行压测,使用200个线程并发下单:

  • 200个线程

image-20231103113238541

  • 请求

image-20231103113302985

  • 请求头:添加鉴权信息

image-20231103113314720

数据库结果:

image-20231103113347599

image-20231103113358876

发现超卖了9张。

原因

1
2
3
4
5
6
7
8
9
10
// 4.判断库存是否充足
if (seckillVoucher.getStock() < 1) {
return Result.fail("优惠券已被抢光了哦,下次记得手速快点");
}

// 5.扣减库存
boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id", voucherId).update();
if (!success) {
return Result.fail("库存不足");
}

假设现在只剩下一张优惠券,线程1过来查询库存,判断库存数大于1,但还没来得及去扣减库存,此时库线程2也过来查询库存,发现库存数也大于1,那么这两个线程都会进行扣减库存操作,最终相当于是多个线程都进行了扣减库存,那么此时就会出现超卖问题

5.3 超卖问题

5.3.1 乐观锁

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:

  • 悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。
    • 例如Synchronized、Lock都属于悲观锁
  • 乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。
    • 如果没有修改则认为是安全的,自己才更新数据。
    • 如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。

乐观锁的实现有两种常见方法:版本号法和CAS法。

版本号法:修改时需要修改版本。

image-20230718195437884

CAS法(Compare And Set):如果将上面对version的判断修改为对stock数据本身的判断,即为CAS法。

image-20230718195709328

5.3.2 代码实现

基于乐观锁的CAS法解决超卖问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Transactional
@Override
public Result seckillVoucher(Long voucherId) {
// ...

// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).eq("stock", voucher.getStock()) // where id = ? and stock = ?
.update();

// ...
}

缺点:存在成功率低的问题。

分析:当1个线程执行update语句,拿到mysql行锁,判断当前库存voucher.getStock()和数据库中的库存一致(例如100),于是修改数据库中的库存为99,其他99个线程阻塞在update上,唤醒后,依次随机判断当前库存voucher.getStock()与数据库中的库存不一致,于是放弃出卖,即失败率为99%。

修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Transactional
@Override
public Result seckillVoucher(Long voucherId) {
// ...

// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", voucher.getStock()) // where id = ? and stock > ?
.update();

// ...
}

5.4 一人一单

需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单。

image-20230718202148741

5.4.1 代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Transactional
@Override
public Result seckillVoucher(Long voucherId) {
// ...
// 一人一单
Long userId = UserHolder.getUser().getId();
// 在数据库中查询优惠券订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count > 0) {
return Result.fail("用户已经购买过一次");
}
// 扣减库存...
// ...
}

5.4.2 单机模式下的并发问题

同样存在并发问题。如果用户故意开多线程抢优惠券,由于mysql查询语句不存在排他锁,因此可以并发执行,count可能被多个线程查询出为0,导致一人多单。

可以使用悲观锁解决:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 已经结束
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
// 5.一人一单
Long userId = UserHolder.getUser().getId();
// 对同一个用户加锁
synchronized (userId.toString().intern()) {
// 获取代理对象(防止Spring事务失效)
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}

@Transactional
public Result createVoucherOrder(Long voucherId) {
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存——乐观锁
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}

5.4.3 集群模式下的并发问题

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

image-20230720195717712

sync锁只能保证单个JVM中的各个线程之间互斥,而不能保证多个JVM中的各个线程之间的互斥。

5.5 分布式锁

5.5.1 简介

image-20230720200041732

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:

特点 MySQL Redis Zookeeper
互斥 利用mysql本身的互斥锁机制 利用setnx这样的互斥命令 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放

5.5.2 基于Redis的分布式锁

image-20230720201118895

实现分布式锁时需要实现的两个基本方法:

  • 获取锁

    • 互斥:确保只能一个线程获取锁
    1
    2
    3
    4
    5
    6
    7
    # 利用SETNX命令的互斥性
    SETNX lock thread1
    # 添加锁的过期时间,避免服务器宕机引起死锁
    EXPIRE lock 10

    # 上面的两条语句可以合并为一个原子操作
    SET lock thread1 EX 10 NX
    • 非阻塞:尝试1次,成功返回true,失败返回false
  • 释放锁

    • 手动释放
    1
    DEL lock
    • 超时释放

5.5.3 代码实现——初级版

需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。

1
2
3
4
5
6
7
8
9
10
11
12
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功; false代表获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
}

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class SimpleRedisLock implements ILock {

// key:包含业务和用户id
private String name;
private StringRedisTemplate stringRedisTemplate;

// 构造方法注入
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

// 锁的前缀
private static final String KEY_PREFIX = "lock:";

@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
long threadId = Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

业务层代码改造:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 5.一人一单
Long userId = UserHolder.getUser().getId();
// 对同一个用户加锁
synchronized (userId.toString().intern()) {
// 获取代理对象(防止Spring事务失效)
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}

// 修改为:
Long userId = UserHolder.getUser().getId();
// 创建锁对象
// 需要锁的是什么:
// (1)业务:下单,即order
// (2)用户:锁住同一个用户
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 获取锁
boolean isLock = lock.tryLock(1200);
// 判断
if(!isLock) {
// 获取锁失败
return Result.fail("不允许重复下单");
}
try {
// 执行业务
// 获取代理对象(防止Spring事务失效)
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} finally {
// 释放锁
lock.unlock();
}

5.5.4 锁的误删

image-20230720203531867

解决:线程1在释放锁时,判断锁是否是自己的,即key是否为自己的线程值。

image-20230720203704071

流程变化:

image-20230720203814821

5.5.5 代码实现——改进版1

需求:修改之前的分布式锁实现,满足:

  • 在获取锁时存入线程标示(可以用UUID表示,防止标识冲突)
  • 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
    • 如果一致则释放锁
    • 如果不一致则不释放锁

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class SimpleRedisLock implements ILock {

// 业务的名称
private String name;
private StringRedisTemplate stringRedisTemplate;

// 构造方法注入
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

// 锁的前缀
private static final String KEY_PREFIX = "lock:";
// 线程标识
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 是自己的锁,才释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}

5.5.6 锁的原子性问题

image-20230720204447707

本质:判断锁和释放锁的操作不是原子性操作

5.5.7 Lua脚本

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。

基本语法:

1
2
-- 执行redis命令
redis.call('命令名称', 'key', '其它参数', ...)

例如:

1
2
3
4
5
6
-- 先执行 set name jack
redis.call('set', 'name', 'jack')
-- 再执行 get name
local name = redis.call('get', 'name')
-- 返回
return name

写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

1
EVAL script numkeys key [key...] arg [arg...]

例如,我们要执行 redis.call('set', 'name', 'jack') 这个脚本,语法如下:

1
2
# 调用脚本
EVAL "return redis.call('set', 'name', 'jack')" 0

image-20230720205157425

又如:key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数,注意,lua中下标从1开始:

1
2
# 调用脚本 
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose

释放锁的业务流程是这样的:

  • 获取锁中的线程标示
  • 判断是否与指定的标示(当前线程标示)一致
    • 如果一致则释放锁(删除)
    • 如果不一致则什么都不做

如果用Lua脚本来表示则是这样的:

1
2
3
4
5
6
7
8
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1]就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

5.5.8 代码实现——改进版2

需求:基于Lua脚本实现分布式锁的释放锁逻辑

提示:RedisTemplate调用Lua脚本的API如下:

image-20230720210151929

代码实现

resource中新增lua脚本:unlock.lua,代码见上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SimpleRedisLock implements ILock {
// ...

// 锁的前缀
private static final String KEY_PREFIX = "lock:";
// 线程标识
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
// 静态初始化
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
}

5.6 Redisson

5.6.1 基于SETNX的分布式锁问题

  • 不可重入:同一个线程无法多次获取同一把锁
  • 不可重试:获取锁只尝试一次就返回false,没有重试机制
  • 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
  • 主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁失效

5.6.2 Redisson入门

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

image-20230720211219451

配置:

1
2
3
4
5
6
7
8
9
10
11
12
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.150.101:6379").setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Resource
private RedissonClient redissonClient;

@Test
void testRedisson() throws InterruptedException {
// 获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
// 尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
// 判断释放获取成功
if(isLock){
try {
System.out.println("执行业务");
} finally {
// 释放锁
lock.unlock();
}
}
}

5.6.3 可重入锁原理

Redssion实现了重入锁,下面的代码能够正常执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Slf4j
@SpringBootTest
class RedissonTest {

@Resource
private RedissonClient redissonClient;

private RLock lock;

@BeforeEach
void setUp() {
lock = redissonClient.getLock("order");
}

@Test
void method1() throws InterruptedException {
// 尝试获取锁
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
if (!isLock) {
log.error("获取锁失败 .... 1");
return;
}
try {
log.info("获取锁成功 .... 1");
// 再次获取锁
method2();
log.info("开始执行业务 ... 1");
} finally {
log.warn("准备释放锁 .... 1");
lock.unlock();
}
}
void method2() {
// 尝试获取锁
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 .... 2");
return;
}
try {
log.info("获取锁成功 .... 2");
log.info("开始执行业务 ... 2");
} finally {
log.warn("准备释放锁 .... 2");
lock.unlock();
}
}
}

手动实现可重入锁

在获取锁的时候,判断这个锁是属于哪个线程的,如果属于自己,则能够获取。

可以在VALUE上使用hash,记录重入的次数:

image-20230722211119117

流程:

image-20230722211508858

由于redis中没有对值为hash结构的SETNX命令,因此采用lua脚本来实现:

  • 获取锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断锁是否存在
if(redis.call('exists', key) == 0) then
-- 不存在, 获取锁
redis.call('hset', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
-- 如果锁已经存在,则判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then
-- 不存在, 获取锁,重入次数+1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败
  • 释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否还是被自己持有
if (redis.call('HEXISTS', key, threadId) == 0) then
return nil; -- 如果已经不是自己,则直接返回
end;
-- 是自己的锁,则重入次数-1
local count = redis.call('HINCRBY', key, threadId, -1);
-- 判断是否重入次数是否已经为0
if (count > 0) then
-- 大于0说明不能释放锁,重置有效期然后返回
redis.call('EXPIRE', key, releaseTime);
return nil;
else -- 等于0说明可以释放锁,直接删除
redis.call('DEL', key);
return nil;
end;

事实上,Redssion实现的可重入锁底层也是调用了lua脚本,和上面的类似。

5.6.4 Redisson分布式锁原理

image-20230722213005184

暂略

总结:

  • 可重入:利用hash结构记录线程id和重入次数
  • 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
  • 超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间

5.6.5 主从一致性问题

  • 一个线程从Redis master上获取锁,获取到锁后,主节点宕机

image-20230722213853353

  • 此时,主从同步尚未完成,主节点的锁信息丢失

image-20230722213919175

解决:取消主从模式,而是采用分片集群:线程需要向三个redis服务器上获取锁成功时,才能成功:

image-20230722214011481

这称之为Redssion的multiLock:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功。

缺点:运维成本高、实现复杂。

5.7 秒杀优化

5.7.1 分析

原始流程:在之前的实现中,redis和数据库的操作是同步执行的,整个流程中后者耗时较长,因此导致并发量小。

image-20230722214747804

优化:将耗时的数据库操作分离出来,在Tomcat中执行,并发量大的操作在redis中执行,redis中的结果异步通知到tomcat,进行数据库的读写操作。

image-20230725200025602


image-20230725200456175

redis中需要保存的有:

  • key:id为XXX的秒杀优惠券,value:该秒杀优惠券的库存
    • 作用:防止超卖
  • key:购买id为XXX的用户,value:用户id(使用set存储)
    • 作用:一人一单

5.7.2 代码实现

需求:

  • 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
  • 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

  • VoucherServiceImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 添加秒杀优惠券到redis和数据库
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存秒杀库存到Redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
  • seckill.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3.订单id
local orderId = ARGV[3]

-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
  • 异步下单:VoucherOrderServiceImpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// 线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

// 阻塞队列(高并发情况下会有内存溢出问题)
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

// 当VoucherOrderServiceImpl构造完毕(注入容器后)就执行init方法
@PostConstruct
private void init() {
// 提交任务
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

// 线程任务
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while (true){
try {
// 1.获取队列中的订单信息(orderTasks队列为空时阻塞在这里)
VoucherOrder voucherOrder = orderTasks.take();
// 2.创建订单(数据库操作)
createVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理订单异常", e);
}
}
}
}

@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
// ARGV[1] ARGV[2]
voucherId.toString(), userId.toString()
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 2.2.为0 ,有购买资格,把下单信息保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
// 2.3.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 2.4.用户id
voucherOrder.setUserId(userId);
// 2.5.代金券id
voucherOrder.setVoucherId(voucherId);
// 2.6.放入阻塞队列(耗时的数据库操作)
orderTasks.add(voucherOrder);

// 3.返回订单id
return Result.ok(orderId);
}

private void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
// 创建锁对象
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 尝试获取锁
boolean isLock = redisLock.tryLock();
// 判断
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
log.error("不允许重复下单!");
return;
}

try {
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
log.error("不允许重复下单!");
return;
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
log.error("库存不足!");
return;
}

// 7.创建订单
save(voucherOrder);
} finally {
// 释放锁
redisLock.unlock();
}
}

小结:秒杀业务的优化思路是什么

  • 先利用Redis完成库存余量、一人一单判断,完成抢单业务
  • 再将下单业务放入阻塞队列,利用独立线程异步下单

问题:

  • 内存限制问题:阻塞队列的长度受JVM限制
  • 数据安全问题:宕机

5.8 消息队列实现异步秒杀

5.8.1 消息队列简介

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker),是独立于JVM的服务
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

image-20230725204835301

5.8.2 Redis中的消息队列

Redis提供了三种不同的方式来实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型
① list结构

队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。

不过要注意的是,当队列中没有消息时RPOP或LPOP操作会立即返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

优点:

  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者
② PubSub

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  • SUBSCRIBE channel [channel] :订阅一个或多个频道
  • PUBLISH channel msg :向一个频道发送消息
  • PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失
③ Stream

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

  • 发送消息:XADD

image-20230725205812453

例如:

image-20230725205843911

  • 接收消息:XREAD

image-20230725205915175

例如:使用XREAD读取第一个消息:

image-20230725205924826

例如:阻塞方式,读取最新的消息:

image-20230725210144778

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

image-20230725210304506

当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

5.8.3 消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

  • 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
  • 消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。
  • 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。