Redis实践基础篇学习笔记 学习来源:黑马程序员
学习时间:2023年6月26日
0 Linux环境Redis的安装 暂略
1 Redis的Java客户端 1.1 对比 在Redis官网中提供了各种语言的客户端。Java客户端主要如下:
以Redis命令作为方法名称,学习成本低,简单实用。但是Jedis实例是线程不安全的,多线程环境下需要基于连接池来使用。
Lettuce是基于Netty实现的,支持同步、异步和响应式编程方式,并且是线程安全的。支持Redis的哨兵模式、集群模式和管道模式。
Redisson是一个基于Redis实现的分布式、可伸缩的Java数据结构集合。包含了诸如Map、Queue、Lock、 Semaphore、AtomicLong等强大功能。
1.2 Jedis 1.2.1 代码示例 引入依赖:
1 2 3 4 5 <dependency > <groupId > redis.clients</groupId > <artifactId > jedis</artifactId > <version > 3.7.0</version > </dependency >
测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class JedisTest { private Jedis jedis; @Before public void setUp () { jedis = new Jedis("xxx.xxx.xxx.xxx" , 6379 ); jedis.auth("123456" ); jedis.select(0 ); } @Test public void setString () { String result = jedis.set("name" , "Hongyi" ); System.out.println("Result = " + result); String name = jedis.get("name" ); System.out.println("name = " + name); } @After public void tearDown () { if (jedis != null ) { jedis.close(); } } }
执行结果:
1 2 Result = OK name = Hongyi
1.2.2 Jedis连接池 Jedis本身是线程不安全的,并且频繁的创建和销毁连接会有性能损耗,因此推荐使用Jedis连接池代替Jedis的直连方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class JedisConnectionFactory { private static final JedisPool jedisPool; static { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(8 ); jedisPoolConfig.setMaxIdle(8 ); jedisPoolConfig.setMinIdle(0 ); jedisPoolConfig.setMaxWaitMillis(200 ); jedisPool = new JedisPool(jedisPoolConfig, "192.168.150.101" , 6379 , 1000 , "123456" ); } public static Jedis getJedis () { return jedisPool.getResource(); } }
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class JedisTest { private Jedis jedis; @Before public void setUp () { jedis = JedisConnectionFactory.getJedis(); jedis.auth("123456" ); jedis.select(0 ); } @Test public void setString () { String result = jedis.set("name" , "Hongyi" ); System.out.println("Result: " + result); String name = jedis.get("name" ); System.out.println("name: " + name); } @After public void tearDown () { if (jedis != null ) { jedis.close(); } } }
1.3 Spring Data Redis 1.3.1 简介 SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis。
特点:
提供了对不同Redis客户端的整合(Lettuce和Jedis)
提供了RedisTemplate统一API来操作Redis
支持Redis的发布订阅模型
支持Redis哨兵和Redis集群
支持基于Lettuce的响应式编程
支持基于JDK、JSON、字符串、Spring对象的数据序列化及反序列化
支持基于Redis的JDKCollection实现
1.3.2 代码示例 SpringDataRedis中提供了RedisTemplate
工具类,其中封装了各种对Redis的操作。并且将不同数据类型的操作API封装到了不同的类型中:
API
返回值类型
说明
redisTemplate .opsForValue()
ValueOperations
操作String类型数据
redisTemplate .opsForHash()
HashOperations
操作Hash类型数据
redisTemplate .opsForList()
ListOperations
操作List类型数据
redisTemplate .opsForSet()
SetOperations
操作Set类型数据
redisTemplate .opsForZSet()
ZSetOperations
操作SortedSet类型数据
redisTemplate
通用的命令
SpringBoot已经提供了对SpringDataRedis的支持,使用非常简单:
1 2 3 4 5 6 7 8 9 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > </dependency > <dependency > <groupId > org.apache.commons</groupId > <artifactId > commons-pool2</artifactId > </dependency >
配置:
1 2 3 4 5 6 7 8 9 10 spring: redis: host: xxx.xxx.xxx.xxx port: 6379 password: 123456 lettuce: pool: max-active: 8 min-idle: 0 max-wait: 100
测试类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @SpringBootTest class SpringDataRedisDemoApplicationTests { @Autowired private RedisTemplate redisTemplate; @Test void testString () { redisTemplate.opsForValue().set("name" , "Takao" ); Object name = redisTemplate.opsForValue().get("name" ); System.out.println("name = " + name); } }
执行结果:
1.3.3 序列化 问题:上面测试后,此时redis里的键值对理应是name=Takao
,但是:
1 2 127.0.0.1:6379> get name "Hongyi"
原因:RedisTemplate可以接收任意Object作为值写入Redis,只不过写入前会把Object序列化为字节形式,默认是采用JDK序列化。
解决:自定义RedisTemplate的序列化方式
1.3.4 StringRedisTemplate Spring默认提供了一个StringRedisTemplate类,它的key和value的序列化方式默认就是String
方式。省去了我们自定义RedisTemplate的过程。
步骤:
写入Redis时,手动把对象序列化为JSON
读取Redis时,手动把读取到的JSON反序列化为对象
2 项目介绍 2.1 项目架构 名称:黑马点评项目
项目结构:
2.2 后端项目部署 后端代码结构:
配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 server: port: 8081 spring: application: name: hmdp datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/hmdp?useSSL=false&serverTimezone=UTC username: root password: 123 redis: host: 192.168 .150 .101 port: 6379 password: 123321 lettuce: pool: max-active: 10 max-idle: 10 min-idle: 1 time-between-eviction-runs: 10s jackson: default-property-inclusion: non_null mybatis-plus: type-aliases-package: com.hongyi.entity logging: level: com.hmdp: debug
需要根据服务器实际情况,修改其中mysql和redis的配置。
2.2 前端项目部署 略,使用postman来测试接口
3 短信登录 3.1 基于Session实现登录 流程:
3.1.1 发送验证码
说明
请求方式
POST
请求路径
/user/code
请求体
phone
返回值
无
代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @RestController @RequestMapping("/user") public class UserController { @Resource private IUserService userService; @PostMapping("code") public Result sendCode (@RequestParam("phone") String phone, HttpSession session) { return userService.sendCode(phone, session); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Slf4j @Service public class UserServiceImpl extends ServiceImpl <UserMapper , User > implements IUserService { @Override public Result sendCode (String phone, HttpSession session) { if (RegexUtils.isPhoneInvalid(phone)) { return Result.fail("手机号格式错误" ); } String code = RandomUtil.randomNumbers(6 ); session.setAttribute("code" , code); log.info("发送验证码成功, 验证码为: {}" , code); return Result.ok(); } }
其中,dto.Result
封装返回信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Data @NoArgsConstructor @AllArgsConstructor public class Result { private Boolean success; private String errorMsg; private Object data; private Long total; public static Result ok () { return new Result(true , null , null , null ); } public static Result ok (Object data) { return new Result(true , null , data, null ); } public static Result ok (List<?> data, Long total) { return new Result(true , null , data, total); } public static Result fail (String errorMsg) { return new Result(false , errorMsg, null , null ); } }
接口测试:这里采用Postman
进行接口测试:
1 2023-06-27 14:57:42.916 INFO 3256 --- [nio-8081-exec-1] com.hongyi.service.impl.UserServiceImpl : 发送验证码成功, 验证码为: 348237
3.1.2 登录和注册
1 2 3 4 5 @PostMapping("/login") public Result login (@RequestBody loginForm loginForm, HttpSession session) { return userService.login(loginForm, session); }
这里将请求体(前端应该发送json
格式)中的数据封装到了loginForm
类中:
1 2 3 4 5 6 @Data public class LoginFormDTO { private String phone; private String code; private String password; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Override public Result login (LoginFormDTO loginForm, HttpSession session) { String phone = loginForm.getPhone(); if (RegexUtils.isPhoneInvalid(phone)) { return Result.fail("手机号格式错误" ); } String cacheCode = (String) session.getAttribute("code" ); String code = loginForm.getCode(); if (cacheCode == null || !cacheCode.equals(code)) { return Result.fail("验证码错误" ); } User user = query().eq("phone" , phone).one(); if (user == null ) { user = createUserWithPhone(phone); } session.setAttribute("user" , user); return Result.ok(); } private User createUserWithPhone (String phone) { User user = new User(); user.setPhone(phone); user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10 )); save(user); return user; }
接口测试:
再查看数据库即可。
3.1.3 登录拦截器 使用拦截器:
代码实现
拦截器interceptor.LoginInterceptor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { HttpSession session = request.getSession(); User user = (User) session.getAttribute("user" ); if (user == null ) { response.setStatus(401 ); return false ; } UserDTO userDTO = new UserDTO(); userDTO.setId(user.getId()); userDTO.setNickName(user.getNickName()); userDTO.setIcon(user.getIcon()); UserHolder.saveUser(userDTO); return true ; } @Override public void afterCompletion (HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
UserDTO
包含了User
类的部分成员变量,用于隐藏敏感信息。
1 2 3 4 5 6 @Data public class UserDTO { private Long id; private String nickName; private String icon; }
注册拦截器,新建config.MvcConfig
配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 @Configuration public class MvcConfig implements WebMvcConfigurer { @Override public void addInterceptors (InterceptorRegistry registry) { registry.addInterceptor(new LoginInterceptor()) .excludePathPatterns( "/blog/hot" , "/user/code" , "/user/login" , "/shop/**" ); } }
excludePathPatterns
用于排除拦截的路径,即对这些路径予以放行。
1 2 3 4 5 6 @GetMapping("/me") public Result me () { UserDTO user = UserHolder.getUser(); return Result.ok(user); }
接口测试:
3.1.4 集群的session共享问题 session共享问题:多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。
session的替代方案应该满足:
3.2 基于Redis实现的登录 3.2.1 流程分析
3.2.2 发送验证码 发送验证码后,将phone=code
保存在redis中,而不是session中。
另外设置键值对的有效期。
1 2 3 4 5 6 7 8 9 10 @Resource private StringRedisTemplate stringRedisTemplate;@Override public Result sendCode (String phone, HttpSession session) { stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, 2 , TimeUnit.MINUTES); }
常量LOGIN_CODE_KEY
是一个识别的前缀,定义在RedisConstants
:
1 2 3 4 public class RedisConstants { public static final String LOGIN_CODE_KEY = "login:code:" ; }
测试结果:
3.2.3 登录和注册
根据手机号,从redis获取验证码并校验。
将用户信息保存在redis中,可以采用(前缀+随机token)=hash
的方式存储
例如:
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Override public Result login (LoginFormDTO loginForm, HttpSession session) { String phone = loginForm.getPhone(); if (RegexUtils.isPhoneInvalid(phone)) { return Result.fail("手机号格式错误" ); } String cacheCode = stringRedisTemplate.opsForValue().get(RedisConstants.LOGIN_CODE_KEY + phone); String code = loginForm.getCode(); if (cacheCode == null || !cacheCode.equals(code)) { return Result.fail("验证码错误" ); } User user = query().eq("phone" , phone).one(); if (user == null ) { user = createUserWithPhone(phone); } String token = UUID.randomUUID().toString(true ); UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(), CopyOptions.create().setIgnoreNullValue(true ).setFieldValueEditor( (fieldName, fieldValue) -> fieldValue.toString() )); String tokenKey = RedisConstants.LOGIN_USER_KEY + token; stringRedisTemplate.opsForHash().putAll(tokenKey, userMap); stringRedisTemplate.expire(tokenKey, 30 , TimeUnit.MINUTES); return Result.ok(token); }
在session中,默认超时时间为30min,如果连接有活动,则从新计算。
上面在设置键值对的超时时长为30min,即使连接有活动,也会过期,因此需要在拦截器中重新刷新过期时长。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class LoginInterceptor implements HandlerInterceptor { private StringRedisTemplate stringRedisTemplate; public LoginInterceptor (StringRedisTemplate stringRedisTemplate) { this .stringRedisTemplate = stringRedisTemplate; } @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String token = request.getHeader("authorization" ); if (token == null ) { response.setStatus(401 ); return false ; } String keyToken = RedisConstants.LOGIN_USER_KEY + token; Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(keyToken); if (userMap.isEmpty()) { response.setStatus(401 ); return false ; } UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false ); UserHolder.saveUser(userDTO); stringRedisTemplate.expire(keyToken, 30 , TimeUnit.MINUTES); return true ; } }
注意,这里不能注入StringRedisTemplate
,因为拦截器类不是容器管理的,因此需要在配置内中将StringRedisTemplate
注入,再构造传参:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Configuration public class MvcConfig implements WebMvcConfigurer { @Resource private StringRedisTemplate stringRedisTemplate; @Override public void addInterceptors (InterceptorRegistry registry) { registry.addInterceptor(new LoginInterceptor(stringRedisTemplate)) .excludePathPatterns( "/blog/hot" , "/user/code" , "/user/login" , "/shop/**" ); } }
接口测试:
注册后传回:
1 2 3 4 { "success" : true , "data" : "6ef6bf998625439187f5ba932b22486e" }
redis数据库状态:
使用token
访问/user/me
接口:
3.2.4 拦截器优化
代码示例
刷新拦截器RefreshTokenInterceptor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class RefreshTokenInterceptor implements HandlerInterceptor { private StringRedisTemplate stringRedisTemplate; public RefreshTokenInterceptor (StringRedisTemplate stringRedisTemplate) { this .stringRedisTemplate = stringRedisTemplate; } @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String token = request.getHeader("authorization" ); if (token == null ) { return true ; } String keyToken = RedisConstants.LOGIN_USER_KEY + token; Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(keyToken); if (userMap.isEmpty()) { return true ; } UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false ); UserHolder.saveUser(userDTO); stringRedisTemplate.expire(keyToken, 30 , TimeUnit.MINUTES); return true ; } @Override public void afterCompletion (HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class LoginInterceptor implements HandlerInterceptor { @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (UserHolder.getUser() == null ) { response.setStatus(401 ); return false ; } return true ; } @Override public void afterCompletion (HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Configuration public class MvcConfig implements WebMvcConfigurer { @Resource private StringRedisTemplate stringRedisTemplate; @Override public void addInterceptors (InterceptorRegistry registry) { registry.addInterceptor(new LoginInterceptor()) .excludePathPatterns( "/blog/hot" , "/user/code" , "/user/login" , "/shop/**" ).order(1 ); registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**" ).order(0 ); } }
4 查询缓存 4.1 什么是缓存 缓存就是数据交换的缓冲区,是存贮数据的临时地方,一般读写性能较高。
缓存的作用:
缓存的成本:
4.2 添加Redis缓存 4.2.1 流程分析 没有缓存时的模型:
有缓存的模型:
根据id查询商铺缓存的流程:
4.2.2 代码实现1 业务:通过id查询商户信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @RestController @RequestMapping("/shop") public class ShopController { @Resource public IShopService shopService; @GetMapping("/{id}") public Result queryShopById (@PathVariable("id") Long id) { return shopService.queryById(id); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Service public class ShopServiceImpl extends ServiceImpl <ShopMapper , Shop > implements IShopService { @Resource private StringRedisTemplate stringRedisTemplate; @Override public Result queryById (Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (shopJson != null ) { Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } Shop shop = getById(id); if (shop == null ) { return Result.fail("店铺不存在" ); } stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop)); return Result.ok(shop); } }
接口测试:
第一次查询:用时277ms
,因为走的是数据库
第二次查询:用时141ms
,走的是redis缓存
此时redis缓存:
4.2.3 代码实现2 给店铺类型查询业务添加缓存。略。
4.3 缓存更新策略 4.3.1 简介
内存淘汰
超时剔除
主动更新
说明
不用自己维护,利用Redis的内存淘汰机制,当内存不足时自动淘汰部分数据。下次查询时更新缓存。
给缓存数据添加TTL时间,到期后自动删除缓存。下次查询时更新缓存。
编写业务逻辑,在修改数据库的同时,更新缓存。
一致性
差
一般
好
维护成本
无
低
高
业务场景:
低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存
4.3.2 主动更新策略
Cache Aside Pattern
:由缓存的调用者,在更新数据库的同时更新缓存
Read/Write Through Pattern
:缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存一致性问题。
Write Behind Caching Pattern
:调用者只操作缓存,由其它线程异步的将缓存数据持久化到数据库,保证最终一致。
Cache Aside Pattern
操作缓存和数据库时有三个问题需要考虑:
删除缓存还是更新缓存?
如何保证缓存与数据库的操作的同时成功或失败?
单体系统,将缓存与数据库操作放在一个事务
分布式系统,利用TCC等分布式事务方案
先操作缓存还是先操作数据库
先删除缓存,再操作数据库
先操作数据库,再删除缓存(推荐,因为数据库操作相比缓存要慢)
4.3.3 代码实现 需求:给查询商铺的缓存添加超时剔除和主动更新的策略
修改ShopController中的业务逻辑,满足下面的需求:
根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间(超时剔除)
根据id修改店铺时,先修改数据库,再删除缓存(主动更新)
代码示例
1 2 stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), 30 , TimeUnit.MINUTES);
1 2 3 4 5 6 7 8 9 @PutMapping public Result updateShop (@RequestBody Shop shop) { return shopService.update(shop); }
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override @Transactional public Result update (Shop shop) { Long id = shop.getId(); if (id == null ) { return Result.fail("店铺id不能为空" ); } updateById(shop); stringRedisTemplate.delete(RedisConstants.CACHE_SHOP_KEY + id); return Result.ok(); }
4.4 缓存穿透 4.4.1 简介 缓存穿透是指客户端请求的数据(恶意数据)在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
常见的解决方案有两种:
缓存空对象(“”
)
优点:实现简单,维护方便
缺点:额外的内存消耗;可能造成短期的不一致
布隆过滤
优点:内存占用较少,没有多余的key
缺陷:实现复杂;存在误判的可能
4.4.2 代码实现 通过添加空值(意思是value为“”
)来解决:
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Override public Result queryById (Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } if (shopJson != null ) { return Result.fail("店铺不存在" ); } Shop shop = getById(id); if (shop == null ) { stringRedisTemplate.opsForValue().set(key, "" , 2 , TimeUnit.MINUTES); return Result.fail("店铺不存在" ); } stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), 30 , TimeUnit.MINUTES); return Result.ok(shop); }
StrUtil.isNotBlank
判断字符串是否不为空
,不为空是指:
4.5 缓存雪崩 缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
给不同的Key的TTL添加随机值
利用Redis集群提高服务的可用性
给缓存业务添加降级限流策略
给业务添加多级缓存
4.6 缓存击穿 4.6.1 简介 缓存击穿问题也叫热点Key问题 ,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案:
解决方案
优点
缺点
互斥锁
没有额外的内存消耗,保证一致性实现简单
线程需要等待,性能受影响。可能有死锁风险
逻辑过期
线程无需等待,性能较好
不保证一致性,有额外内存消耗,实现复杂
4.6.2 代码实现——互斥锁 需求:修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题。
代码实现
设置成功,返回 1 。 设置失败,返回 0 。
互斥锁代码:
1 2 3 4 5 6 7 8 9 private boolean tryLock (String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1" , 10 , TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock (String key) { stringRedisTemplate.delete(key); }
修改后的业务代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 @Override public Result queryById (Long id) { Shop shop = queryWithMutex(id); if (shop == null ) { return Result.fail("店铺不存在" ); } return Result.ok(shop); } public Shop queryWithMutex (Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { return JSONUtil.toBean(shopJson, Shop.class); } if (shopJson != null ) { return null ; } String lockKey = null ; Shop shop = null ; try { lockKey = RedisConstants.LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); if (!isLock) { Thread.sleep(50 ); return queryWithMutex(id); } shop = getById(id); if (shop == null ) { stringRedisTemplate.opsForValue().set(key, "" , 2 , TimeUnit.MINUTES); return null ; } stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), 30 , TimeUnit.MINUTES); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { unlock(lockKey); } return shop; }
4.6.3 代码实现——逻辑过期 需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题。
代码实现
1 2 3 4 5 @Data public class RedisData { private LocalDateTime expireTime; private Object data; }
1 2 3 4 5 6 7 8 9 10 11 private void saveShop2Redis (Long id, Long expireSeconds) { Shop shop = getById(id); RedisData redisData = new RedisData(); redisData.setData(shop); redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds)); stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData)); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10 );public Shop queryWithLogicalExpire (Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isBlank(shopJson)) { return null ; } RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class); JSONObject data = (JSONObject) redisData.getData(); Shop shop = JSONUtil.toBean(data, Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); if (expireTime.isAfter(LocalDateTime.now())) { return shop; } String lockKey = RedisConstants.LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); if (isLock) { CACHE_REBUILD_EXECUTOR.submit(() -> { try { this .saveShop2Redis(id, 20L ); } catch (Exception e) { throw new RuntimeException(e); } finally { unlock(lockKey); } }); } return shop; }
4.7 缓存工具类 基于StringRedisTemplate封装一个缓存工具类,满足下列需求:
方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间 ,用于处理缓存击穿问题
方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值 的方式解决缓存穿透问题
方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期 解决缓存击穿问题
暂略
5 优惠券秒杀 5.1 全局唯一ID 5.1.1 简介 每个店铺都可以发布优惠券:当用户抢购时,就会生成订单并保存到tb_voucher_order
这张表中,而订单表如果使用数据库自增ID就存在一些问题:
全局ID生成器:是一种在分布式系统下用来生成全局唯一ID的工具。
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:
符号位:1bit
,永远为0
时间戳:31bit
,以秒为单位,可以使用69年
序列号:32bit
,秒内的计数器,支持每秒产生2^32
个不同ID
其他的生成策略
全局唯一ID生成策略:
UUID
Redis自增
每天一个key,方便统计订单量
ID构造是 时间戳 + 计数器
snowflake算法(雪花算法)
数据库自增
5.1.2 Redis实现全局唯一ID 在工具类下新建RedisIdWorker
工具类,用于生成全局唯一ID:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Component public class RedisIdWorker { private static final long BEGIN_TIMESTAMP = 1640995200L ; private static final int COUNT_BITS = 32 ; private StringRedisTemplate stringRedisTemplate; public RedisIdWorker (StringRedisTemplate stringRedisTemplate) { this .stringRedisTemplate = stringRedisTemplate; } public long nextId (String keyPrefix) { LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd" )); long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); return timestam << COUNT_BITS | count; } }
单元测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @SpringBootTest class HmDianPingApplicationTests { @Resource private RedisIdWorker redisIdWorker; private ExecutorService es = Executors.newFixedThreadPool(500 ); @Test void testIdWorker () throws InterruptedException { CountDownLatch latch = new CountDownLatch(300 ); Runnable task = () -> { for (int i = 0 ; i < 100 ; i++) { long id = redisIdWorker.nextId("order" ); System.out.println("id = " + id); } latch.countDown(); }; long begin = System.currentTimeMillis(); for (int i = 0 ; i < 300 ; i++) { es.submit(task); } latch.await(); long end = System.currentTimeMillis(); System.out.println("time = " + (end - begin)); } }
300个线程异步执行,每个线程获取100个id,则会总共获取30000个id。
执行结果:(部分)
1 2 3 4 5 6 7 8 9 10 11 // ... id = 205460090000536872 id = 205460090000536871 id = 205460090000536875 id = 205460090000536876 id = 205460090000536878 id = 205460090000536877 id = 205460090000536874 id = 205460090000536879 id = 205460090000536880 time = 4190
redis情况:
5.2 实现优惠券秒杀下单 5.2.1 添加数据 每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:
表关系如下:
tb_voucher
(普通券):店铺id,优惠券的基本信息,优惠金额、使用规则等
tb_seckill_voucher
(特价券):优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息。
添加数据略
5.2.2 代码实现 用户可以在店铺页面中抢购这些优惠券:
下单时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
代码实现
业务层代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class VoucherOrderServiceImpl extends ServiceImpl <VoucherOrderMapper , VoucherOrder > implements IVoucherOrderService { @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Transactional @Override public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId).update(); if (!success) { return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisWorker.nextId("order" ); voucherOrder.setId(orderId); Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); } }
5.3 超卖问题 5.3.1 乐观锁 超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:
悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。
例如Synchronized、Lock都属于悲观锁
乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。
如果没有修改则认为是安全的,自己才更新数据。
如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。
乐观锁的实现有两种常见方法:版本号法和CAS法。
版本号法:修改时需要修改版本。
CAS法(Compare And Set):如果将上面对version的判断修改为对stock数据本身的判断,即为CAS法。
5.3.2 代码实现 基于乐观锁的CAS法解决超卖问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 @Transactional @Override public Result seckillVoucher (Long voucherId) { boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId).eq("stock" , voucher.getStock()) .update(); }
缺点:存在成功率低的问题。
分析:当1个线程执行update语句,拿到mysql行锁 ,判断当前库存voucher.getStock()
和数据库中的库存一致(例如100),于是修改数据库中的库存为99,其他99个线程阻塞在update上,唤醒后,依次随机判断当前库存voucher.getStock()
与数据库中的库存不一致,于是放弃出卖,即失败率为99%。
修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Transactional @Override public Result seckillVoucher (Long voucherId) { boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId).gt("stock" , voucher.getStock()) .update(); }
5.4 一人一单 需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单。
5.4.1 代码实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Transactional @Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); int count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ) { return Result.fail("用户已经购买过一次" ); } }
5.4.2 单机模式下的并发问题 同样存在并发问题。由于mysql查询语句不存在排他锁,因此可以并发执行,count可能被多个线程查询出为0,导致一人多单。
可以使用悲观锁解决:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 @Override public Result seckillVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("秒杀尚未开始!" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("秒杀已经结束!" ); } if (voucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } } @Transactional public Result createVoucherOrder (Long voucherId) { int count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ) { return Result.fail("用户已经购买过一次!" ); } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId).gt("stock" , 0 ) .update(); if (!success) { return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
5.4.3 集群模式下的并发问题 通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
sync锁只能保证单个JVM中的各个线程之间互斥,而不能保证多个JVM中的各个线程之间的互斥。
5.5 分布式锁 5.5.1 简介
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:
特点
MySQL
Redis
Zookeeper
互斥
利用mysql本身的互斥锁机制
利用setnx这样的互斥命令
利用节点的唯一性和有序性实现互斥
高可用
好
好
好
高性能
一般
好
一般
安全性
断开连接,自动释放锁
利用锁超时时间,到期释放
临时节点,断开连接自动释放
5.5.2 基于Redis的分布式锁
实现分布式锁时需要实现的两个基本方法:
5.5.3 代码实现——初级版 需求:定义一个类,实现下面接口,利用Redis实现分布式锁功能。
1 2 3 4 5 6 7 8 9 10 11 12 public interface ILock { boolean tryLock (long timeoutSec) ; void unlock () ; }
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class SimpleRedisLock implements ILock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock (String name, StringRedisTemplate stringRedisTemplate) { this .name = name; this .stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:" ; @Override public boolean tryLock (long timeoutSec) { long threadId = Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId + "" , timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { stringRedisTemplate.delete(KEY_PREFIX + name); } }
业务层代码改造:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 Long userId = UserHolder.getUser().getId(); synchronized (userId.toString().intern()) { IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } Long userId = UserHolder.getUser().getId(); SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate); boolean isLock = lock.tryLock(1200 );if (!isLock) { return Result.fail("不允许重复下单" ); } try { IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); } finally { lock.unlock(); }
5.5.4 锁的误删
解决:线程1在释放锁时,判断锁是否是自己的,即key是否为自己的线程值。
流程变化:
5.5.5 代码实现——改进版1 需求:修改之前的分布式锁实现,满足:
在获取锁时存入线程标示(可以用UUID
表示,防止标识冲突)
在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class SimpleRedisLock implements ILock { private String name; private StringRedisTemplate stringRedisTemplate; public SimpleRedisLock (String name, StringRedisTemplate stringRedisTemplate) { this .name = name; this .stringRedisTemplate = stringRedisTemplate; } private static final String KEY_PREFIX = "lock:" ; private static final String ID_PREFIX = UUID.randomUUID().toString(true ) + "-" ; @Override public boolean tryLock (long timeoutSec) { String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { String threadId = ID_PREFIX + Thread.currentThread().getId(); String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if (threadId.equals(id)) { stringRedisTemplate.delete(KEY_PREFIX + name); } } }
5.5.6 锁的原子性问题
本质:判断锁和释放锁的操作不是原子性操作
5.5.7 Lua脚本 Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令 ,确保多条命令执行时的原子性。
基本语法:
1 2 redis.call('命令名称' , 'key' , '其它参数' , ...)
例如:
1 2 3 4 5 6 redis.call('set' , 'name' , 'jack' ) local name = redis.call('get' , 'name' )return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
1 EVAL script numkeys key [key...] arg [arg...]
例如,我们要执行 redis.call('set', 'name', 'jack')
这个脚本,语法如下:
1 2 EVAL "return redis.call('set', 'name', 'jack')" 0
又如:key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数,注意,lua中下标从1开始:
1 2 EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose
释放锁的业务流程是这样的:
获取锁中的线程标示
判断是否与指定的标示(当前线程标示)一致
如果用Lua脚本来表示则是这样的:
1 2 3 4 5 6 7 8 if (redis.call('GET' , KEYS[1 ]) == ARGV[1 ]) then return redis.call('DEL' , KEYS[1 ]) end return 0
5.5.8 代码实现——改进版2 需求:基于Lua脚本实现分布式锁的释放锁逻辑
提示:RedisTemplate调用Lua脚本的API如下:
代码实现
在resource
中新增lua脚本:unlock.lua
,代码见上。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class SimpleRedisLock implements ILock { private static final String KEY_PREFIX = "lock:" ; private static final String ID_PREFIX = UUID.randomUUID().toString(true ) + "-" ; private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua" )); UNLOCK_SCRIPT.setResultType(Long.class); } @Override public boolean tryLock (long timeoutSec) { String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { stringRedisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); } }
5.6 Redisson 5.6.1 基于SETNX的分布式锁问题
不可重入:同一个线程无法多次获取同一把锁
不可重试:获取锁只尝试一次就返回false,没有重试机制
超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并同步主中的锁数据,则会出现锁失效
5.6.2 Redisson入门 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
配置:
1 2 3 4 5 6 7 8 9 10 11 12 @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient () { Config config = new Config(); config.useSingleServer().setAddress("redis://192.168.150.101:6379" ).setPassword("123321" ); return Redisson.create(config); } }
使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Resource private RedissonClient redissonClient;@Test void testRedisson () throws InterruptedException { RLock lock = redissonClient.getLock("anyLock" ); boolean isLock = lock.tryLock(1 , 10 , TimeUnit.SECONDS); if (isLock){ try { System.out.println("执行业务" ); } finally { lock.unlock(); } } }
5.6.3 可重入锁原理 Redssion实现了重入锁,下面的代码能够正常执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Slf4j @SpringBootTest class RedissonTest { @Resource private RedissonClient redissonClient; private RLock lock; @BeforeEach void setUp () { lock = redissonClient.getLock("order" ); } @Test void method1 () throws InterruptedException { boolean isLock = lock.tryLock(1L , TimeUnit.SECONDS); if (!isLock) { log.error("获取锁失败 .... 1" ); return ; } try { log.info("获取锁成功 .... 1" ); method2(); log.info("开始执行业务 ... 1" ); } finally { log.warn("准备释放锁 .... 1" ); lock.unlock(); } } void method2 () { boolean isLock = lock.tryLock(); if (!isLock) { log.error("获取锁失败 .... 2" ); return ; } try { log.info("获取锁成功 .... 2" ); log.info("开始执行业务 ... 2" ); } finally { log.warn("准备释放锁 .... 2" ); lock.unlock(); } } }
手动实现可重入锁
在获取锁的时候,判断这个锁是属于哪个线程的,如果属于自己,则能够获取。
可以在VALUE
上使用hash,记录重入的次数:
流程:
由于redis中没有对值为hash结构的SETNX
命令,因此采用lua脚本来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 local key = KEYS[1 ]; local threadId = ARGV[1 ]; local releaseTime = ARGV[2 ]; if (redis.call('exists' , key) == 0 ) then redis.call('hset' , key, threadId, '1' ); redis.call('expire' , key, releaseTime); return 1 ; end ;if (redis.call('hexists' , key, threadId) == 1 ) then redis.call('hincrby' , key, threadId, '1' ); redis.call('expire' , key, releaseTime); return 1 ; end ;return 0 ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 local key = KEYS[1 ]; local threadId = ARGV[1 ]; local releaseTime = ARGV[2 ]; if (redis.call('HEXISTS' , key, threadId) == 0 ) then return nil ; end ;local count = redis.call('HINCRBY' , key, threadId, -1 );if (count > 0 ) then redis.call('EXPIRE' , key, releaseTime); return nil ; else redis.call('DEL' , key); return nil ; end ;
事实上,Redssion实现的可重入锁底层也是调用了lua脚本,和上面的类似。
5.6.4 Redisson分布式锁原理
暂略
总结:
可重入:利用hash结构记录线程id和重入次数
可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间
5.6.5 主从一致性问题
一个线程从Redis master上获取锁,获取到锁后,主节点宕机
解决:取消主从模式,而是采用分片集群:线程需要向三个redis服务器上获取锁成功时,才能成功:
这称之为Redssion的multiLock
:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功。
缺点:运维成本高、实现复杂。
5.7 秒杀优化 5.7.1 分析 原始流程:在之前的实现中,redis和数据库的操作是同步执行的,整个流程中后者耗时较长,因此导致并发量小。
优化:将耗时的数据库操作分离出来,在Tomcat中执行,并发量大的操作在redis中执行,redis中的结果异步通知到tomcat,进行数据库的读写操作。
redis中需要保存的有:
key:id为XXX的秒杀优惠券,value:该秒杀优惠券的库存
key:购买id为XXX的用户,value:用户id(使用set存储)
5.7.2 代码实现 需求:
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override @Transactional public void addSeckillVoucher (Voucher voucher) { save(voucher); SeckillVoucher seckillVoucher = new SeckillVoucher(); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 local voucherId = ARGV[1 ]local userId = ARGV[2 ]local orderId = ARGV[3 ]local stockKey = 'seckill:stock:' .. voucherIdlocal orderKey = 'seckill:order:' .. voucherIdif (tonumber (redis.call('get' , stockKey)) <= 0 ) then return 1 end if (redis.call('sismember' , orderKey, userId) == 1 ) then return 2 end redis.call('incrby' , stockKey, -1 ) redis.call('sadd' , orderKey, userId) return 0
异步下单:VoucherOrderServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024 );@PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler()); } private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ){ try { VoucherOrder voucherOrder = orderTasks.take(); createVoucherOrder(voucherOrder); } catch (Exception e) { log.error("处理订单异常" , e); } } } } @Override public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); int r = result.intValue(); if (r != 0 ) { return Result.fail(r == 1 ? "库存不足" : "不能重复下单" ); } VoucherOrder voucherOrder = new VoucherOrder(); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); orderTasks.add(voucherOrder); return Result.ok(orderId); } private void createVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long voucherId = voucherOrder.getVoucherId(); RLock redisLock = redissonClient.getLock("lock:order:" + userId); boolean isLock = redisLock.tryLock(); if (!isLock) { log.error("不允许重复下单!" ); return ; } try { int count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ) { log.error("不允许重复下单!" ); return ; } boolean success = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId).gt("stock" , 0 ) .update(); if (!success) { log.error("库存不足!" ); return ; } save(voucherOrder); } finally { redisLock.unlock(); } }
小结:秒杀业务的优化思路是什么
先利用Redis完成库存余量、一人一单判断,完成抢单业务
再将下单业务放入阻塞队列,利用独立线程异步下单
问题:
内存限制问题:阻塞队列的长度受JVM限制
数据安全问题:宕机
5.8 消息队列实现异步秒杀 5.8.1 消息队列简介 消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker),是独立于JVM的服务
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
5.8.2 Redis中的消息队列 Redis提供了三种不同的方式来实现消息队列:
list结构:基于List结构模拟消息队列
PubSub:基本的点对点消息模型
Stream:比较完善的消息队列模型
① list结构 队列是入口和出口不在一边,因此我们可以利用:LPUSH
结合 RPOP
、或者 RPUSH
结合 LPOP
来实现。
不过要注意的是,当队列中没有消息时RPOP或LPOP操作会立即返回null,并不像JVM的阻塞队列那样会阻塞并等待 消息。因此这里应该使用BRPOP
或者BLPOP
来实现阻塞效果。
优点:
利用Redis存储,不受限于JVM内存上限
基于Redis的持久化机制,数据安全性有保证
可以满足消息有序性
缺点:
② PubSub PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel]
:订阅一个或多个频道
PUBLISH channel msg
:向一个频道发送消息
PSUBSCRIBE pattern[pattern]
:订阅与pattern格式匹配的所有频道
优点:
缺点:
不支持数据持久化
无法避免消息丢失
消息堆积有上限,超出时数据丢失
③ Stream Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
例如:
例如:使用XREAD读取第一个消息:
例如:阻塞方式,读取最新的消息:
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:
当我们指定起始ID为$
时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息 的问题。
特点:
消息可回溯
一个消息可以被多个消费者读取
可以阻塞读取
有消息漏读的风险
5.8.3 消费者组 消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费。
消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。