Redis学习秘籍:一招击穿系统,附赠N个解决方案 | 原力计划

发表时间: 2020-04-17 18:20

作者 | Mark_MMXI

来源 | CSDN博客,责编 | 夕颜

出品 | CSDN(ID:CSDNnews)

缓存的存在是为了在高并发情形下,缓解DB压力,提高业务系统体验。业务系统访问数据,先去缓存中进行查询,假如缓存存在数据直接返回缓存数据,否则就去查询数据库再返回值。

Redis是一种缓存工具,是一种缓存解决方案,但是引入Redis又有可能出现缓存穿透、缓存击穿、缓存雪崩等问题。本文就对缓存雪崩问题进行较深入剖析,并通过场景模型加深理解,基于场景使用对应的解决方案尝试解决。

缓存原理及Redis解决方案

首先,我们来看一下缓存的工作原理图:

Redis 本质上是一个 Key-Value 类型的内存数据库。因为是纯内存操作,Redis 的性能非常出色,每秒可以处理超过 10、万次读写操作。Redis 还有一个优势就是是支持保存多种数据结构,例如 String、List、Set、Sorted Set、hash等。

缓存雪崩

2.1 缓存雪崩解释

缓存雪崩的情况是说,当某一时刻发生大规模的缓存失效的情况,比如你的缓存服务宕机了,DB直接负载大量请求压力导致挂掉。

2.2 模拟缓存雪崩

按照缓存雪崩的解释,其实我们要模拟,只需要达到以下几个点:

  1. 同一时刻大规模缓存失效。

  2. 失效的时刻有大量的查询请求冲击DB

 @Test public void testQuery{ ExecutorService es = Executors.newFixedThreadPool(10); int loop= 1000; int init=2000; //查询1k个key放进缓存 for (int i = init; i < loop+init; i++) { userService.queryById(i); } //缓存过期时间为1s,等待1s同时过期 try { Thread.sleep(1000); }catch (Exception e){ e.printStackTrace; } //开始了使用多线程疯狂查询 for (int i = 0; i < 100; i++) { es.execute( -> { for (int k = init; k < loop+init; k++) { userService.queryById(k); } }); } }

为了加快崩坏的速度,把数据库的最大连接数调整成5,同时增大数据库表的数据量达到百万级别。

然后执行测试程序,很快程序就报错并停止,详细错误如下:

Exception in thread "pool-1-thread-12" org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection is closedat org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:74)at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41)at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44)at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42)at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:270)at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.convertLettuceAccessException(LettuceStringCommands.java:799)at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.get(LettuceStringCommands.java:68)at org.springframework.data.redis.connection.DefaultedRedisConnection.get(DefaultedRedisConnection.java:260)at org.springframework.data.redis.cache.DefaultRedisCacheWriter.lambda$get$1(DefaultRedisCacheWriter.java:109)at org.springframework.data.redis.cache.DefaultRedisCacheWriter.execute(DefaultRedisCacheWriter.java:242)at org.springframework.data.redis.cache.DefaultRedisCacheWriter.get(DefaultRedisCacheWriter.java:109)at org.springframework.data.redis.cache.RedisCache.lookup(RedisCache.java:88)at org.springframework.cache.support.AbstractValueAdaptingCache.get(AbstractValueAdaptingCache.java:58)at org.springframework.cache.interceptor.AbstractCacheInvoker.doGet(AbstractCacheInvoker.java:73)at org.springframework.cache.interceptor.CacheAspectSupport.findInCaches(CacheAspectSupport.java:554)at org.springframework.cache.interceptor.CacheAspectSupport.findCachedItem(CacheAspectSupport.java:519)at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:401)at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:345)at org.springframework.cache.interceptor.CacheInterceptor.invoke(CacheInterceptor.java:61)at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747)at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)at com.example.demo.user.service.impl.UserServiceImpl$$EnhancerBySpringCGLIB$$ba6638d2.queryById(<generated>)at com.example.demo.DemoApplicationTests$1.run(DemoApplicationTests.java:55)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)Caused by: io.lettuce.core.RedisException: Connection is closedat io.lettuce.core.protocol.DefaultEndpoint.validateWrite(DefaultEndpoint.java:195)at io.lettuce.core.protocol.DefaultEndpoint.write(DefaultEndpoint.java:137)at io.lettuce.core.protocol.CommandExpiryWriter.write(CommandExpiryWriter.java:112)2020-03-08 22:31:14.432 ERROR 37892 --- [eate-1895102622] com.alibaba.druid.pool.DruidDataSource : create connection SQLException, url: jdbc:mysql://localhost:3306/redis_demo?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC, errorCode 1040, state 08004
java.sql.SQLNonTransientConnectionException: Data source rejected establishment of connection, message from server: "Too many connections"

主要问题出在数据库连接已经满了,无法获取数据库连接进行查询,这个现象是就是缓存雪崩的效果。

2.3 解决缓存雪崩

2.3.1 分析雪崩场景

用图来说,实际上就是没有了redis这层担着上层流量压力

其实从这张图来看,对于我们一般的应用,客户端去访问应用到数据库的整个链路过程,其实在面临大流量的时候,我们一般是以"倒三角"模型进行流量缓冲,什么是“倒三角”模型

通过"倒三角"模型,按照并发需要优化系统,在面临雪崩这种情形,可以按照“倒三角”模型进行优化,注意雪崩是理论上没办法彻底解决的,可能到最终得提高硬件配置。

2.3.1 雪崩优化方案

经过分析得解决雪崩方案:

1.随机缓存过期时间,能一定程度缓解雪崩2.使用锁或队列、设置过期标志更新缓存3.添加本地缓存实现多级缓存4.添加熔断降级限流,缓冲压力

2.3.1.1 随机缓存时间

随机缓存时间意在避免大量热点key同时失效。

接下来,我们基于Redis+SpringBoot+SpringCache基础项目搭建这个项目继续进行实践。

由于是使用了SpringCache,我们最优的方案就是直接在@Cacheable等注解上面加参数,比如像表达式之类的,让数据放进缓存的时候按照表达式/参数值定义过期时间。

因此我们先查看原有的RedisCache是怎么样的put逻辑

RedisCacheManager创建Cache

protected RedisCache createRedisCache(String name, @able RedisCacheConfiguration cacheConfig) { return new RedisCache(name, this.cacheWriter, cacheConfig !=  ? cacheConfig : this.defaultCacheConfig); }

打开RedisCache.class,查看put 方法如下:

 public void put(Object key, @able Object value) { Object cacheValue = this.preProcessCacheValue(value); if (!this.isAllowValues && cacheValue == ) { throw new IllegalArgumentException(String.format("Cache '%s' does not allow '' values. Avoid storing via '@Cacheable(unless=\"#result == \")' or configure RedisCache to allow '' via RedisCacheConfiguration.", this.name)); } else { this.cacheWriter.put(this.name, this.createAndConvertCacheKey(key), this.serializeCacheValue(cacheValue), this.cacheConfig.getTtl); } }

这里this.cacheConfig.getTtl 就是缓存的过期时间,可以看到数据的缓存过期时间是从全局缓存配置里面获取的过期时间配置的,而我需要实现的是让某个cache下每个key随机时间过期,因此我们需要改动这里 this.cacheConfig.getTtl,我们在createRedisCache的时候改变这个值就行了。

1. 基于java动态执行字符串代码,返回过期时间。

实现基于Spring.expression的ExpressService

/**
* @title: ExpressUtil * @projectName redisdemo * @description: 动态执行字符串代码 * @author lps * @date 2020/3/912:01 */@Slf4jpublic class ExpressService {



private ExpressionParser spelExpressionParser;

private ParserContext parserContext; // 表达式解析上下文 private StandardEvaluationContext evaluationContext;

public static enum ExpressType { /** * ${}表达式格式 */ TYPE_FIRST, /** * #{}表达式格式 */ TYPE_SECOND } private static final String PRE_TYPE_1 = "${"; private static final String PRE_TYPE_2 = "#{"; private static final String SUF_STR = "}";

private ExpressService(String pre, String suf) { spelExpressionParser = new SpelExpressionParser; log.debug("表达式前缀={},表达式后缀={}", pre, suf); evaluationContext = new StandardEvaluationContext; // 增加map解析方案 evaluationContext.addPropertyAccessor(new MapAccessor); parserContext = new TemplateParserContext(pre, suf); }

/** * * <p> * 创建表达式处理服务对象 默认为创建#{}格式表达式 通过ExpressType指定表达式格式,现有两种${}和#{} * </p> * * * @param type * 表达式格式类型 * @return 表达式解析对象 */ public static ExpressService createExpressService(ExpressType type) { if (type == ExpressType.TYPE_FIRST) { log.debug("生成表达式,表达式前缀={}", PRE_TYPE_1); return new ExpressService(PRE_TYPE_1, SUF_STR); } else { return new ExpressService(PRE_TYPE_2, SUF_STR); }

}

public Object expressParse(String express, Object data) throws Exception { log.debug("解析表达式信息={}", express); Expression expression = spelExpressionParser.parseExpression(express, this.parserContext); return expression.getValue(evaluationContext, data); }
}

测试调用:

 @Test
public void testExpress{ ExpressService express = ExpressService.createExpressService; try { //固定超时时间 System.out.println("ttl="+express.expressParse("#{60}", )); //调用方法生成随机过期时间 System.out.println("ttl="+express.expressParse("#{T(org.apache.commons.lang3.RandomUtils).nextInt(60,200)}", )); } catch (Exception e) { e.printStackTrace; } }

2. 设计name拼接ttl规则

由于createRedisCache只有两个参数name以及cacheConfig,而只有name是对于单个cache来说的,cacheConfig是对于全局cache来说,因此我们需要设计name参数中指定cache的name以及过期时间的规则。

name赋值规则:name|ttlFun

eg:  @Cacheable(cacheName="test|#{T(org.apache.commons.lang3.RandomUtils).nextInt(60,200)}") @Cacheable(cacheName="test|#{60}")

3. 编写解析name代码

 /** * 分隔符| */ private static final String SEPERATE_LINE = "|"; public MyRedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration) { super(cacheWriter, defaultCacheConfiguration); }

protected RedisCache createRedisCache(String name, @able RedisCacheConfiguration cacheConfig) {// ``name赋值规则:name|ttlFun `` if(name.contains(SEPERATE_LINE)){ String cacheName = name.substring(0,name.indexOf(SEPERATE_LINE)); String expression = name.substring(name.indexOf(SEPERATE_LINE)+1); try{ ExpressService express = ExpressService.createExpressService; long ttl = Long.parseLong(express.expressParse(expression, ).toString); cacheConfig = cacheConfig.entryTtl(Duration.ofSeconds(ttl)); return super.createRedisCache(cacheName, cacheConfig);

}catch (Exception e){ e.printStackTrace; return super.createRedisCache(name, cacheConfig); }

} return super.createRedisCache(name, cacheConfig);

}

4. 修改CacheConfig

将原本的RedisManager替换成#3编写的MyRedisManager

 /**
* 配置缓存管理器 */ @Bean public CacheManager cacheManager(RedisConnectionFactory factory) { //关键点,spring cache 的注解使用的序列化都从这来,没有这个配置的话使用的jdk自己的序列化,实际上不影响使用,只是打印出来不适合人眼识别 RedisCacheConfiguration cacheConfig = RedisCacheConfiguration.defaultCacheConfig // 将 key 序列化成字符串 .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer)) // 将 value 序列化成 json .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer)) // 设置缓存过期时间,单位秒 .entryTtl(Duration.ofSeconds(cacheExpireTime)) // 不缓存空值 .disableCachingValues;/* RedisCacheManager redisCacheManager = RedisCacheManager.builder(factory) .cacheDefaults(cacheConfig) .build;*/ //修改RedisCacheManager 为MyRedisCacheManager MyRedisCacheManager redisCacheManager = new MyRedisCacheManager(RedisCacheWriter.nonLockingRedisCacheWriter(factory), cacheConfig); return redisCacheManager; }

5. 测试

编写单元测试

 @Test
public void testQueryIdWithExpress{ Assert.assertNot(userService.queryById(3333)); }

重新定义查询的cache定义

 @Override
@Cacheable( value = "ca1|#{60}",key = "#id" ,unless="#result == ")// @Cacheable( value = "ca1|#{T(org.apache.commons.lang3.RandomUtils).nextInt(100,200)}",key = "#id" ,unless="#result == ") public User queryById(int id) { return this.userDao.queryById(id); }

当value=ca1|#{60}的时候,通过查看Redis的TTL 剩余为58s

当value=ca1|#{T(
org.apache.commons.lang3.RandomUtils).nextInt(100,200)}的时候,随机100-220范围内秒数,通过查看Redis的TTL 剩余为107s

这时候使用random的方式就可以实现随机过期时间了,随机数最好选择符合高斯(正态)分布的会比较好。

new Random.nextGaussian

2.3.1.2 互斥锁排队

业界比价普遍的一种做法,即根据key获取value值为空时,锁上,从数据库中load数据后再释放锁。若其它线程获取锁失败,则等待一段时间后重试。这里要注意,分布式环境中要使用分布式锁,单机的话用普通的锁(synchronized、Lock)就够了。

这样做思路比较清晰,也从一定程度上减轻数据库压力,但是锁机制使得逻辑的复杂度增加,吞吐量也降低了,有点治标不治本。

1.使用setnx的方式设置互斥锁

 public User queryById(int id) { try { if (redisTemplate.hasKey(id+"")) { return (User) redisTemplate.opsForValue.get(id+""); } else { //获取锁 if(lock(id+"")){ // 数据库查询 User user = userDao.queryById(id); redisTemplate.opsForValue.set(id+"",user, Duration.ofSeconds(3000)); //释放锁 redisTemplate.delete(LOCK_PREFIX + "id"); } } } catch (Exception e) { e.printStackTrace; } return (User) redisTemplate.opsForValue.get(""+id); } private static String LOCK_PREFIX = "prefix"; private static long LOCK_EXPIRE =3000; /** * 互斥锁实现 */ public boolean lock(String key) { String lock = LOCK_PREFIX + key; return (Boolean) redisTemplate.execute((RedisCallback) connection -> { long expireAt = System.currentTimeMillis + LOCK_EXPIRE + 1; //SETNX Boolean acquire = connection.setNX(lock.getBytes, String.valueOf(expireAt).getBytes); if (acquire) { return true; } else { byte value = connection.get(lock.getBytes); if (Objects.non(value) && value.length > 0) { long expireTime = Long.parseLong(new String(value));  //判断锁是否过期  if (expireTime < System.currentTimeMillis) { byte oldValue = connection.getSet(lock.getBytes, String.valueOf(System.currentTimeMillis + LOCK_EXPIRE + 1).getBytes); return Long.parseLong(new String(oldValue)) < System.currentTimeMillis; } } } return false; }); }

2.3.1.3 设置过期标志更新缓存

定时更新缓存,阻塞部分请求,达到缓冲作用,也可以设置key永不过期

2.3.1.4 多级缓存

这个方案主要在redis宕机,或者key在更新进缓存的中间,可以响应业务应用,减轻压力

2.3.1.5 熔断降级限流

这个方案是直接在业务应用之上进行请求流量控制,减轻下层压力

原文链接:
https://blog.csdn.net/qq_28540443/article/details/104746655