引言
在现代分布式系统中,Redis作为高性能的内存数据库,广泛应用于缓存层以提升系统性能和响应速度。然而,在高并发场景下,缓存的使用也带来了诸多挑战,其中最典型的三大问题就是缓存穿透、缓存击穿和缓存雪崩。这些问题如果处理不当,可能导致系统性能急剧下降,甚至服务不可用。
本文将深入分析这三种常见缓存问题的本质原因,并提供切实可行的解决方案,帮助开发者构建更加稳定可靠的高并发系统。
缓存穿透:空值缓存的防御机制
什么是缓存穿透
缓存穿透是指查询一个根本不存在的数据,由于缓存中没有该数据,需要去数据库查询。如果数据库中也没有这个数据,就直接返回空结果。在高并发场景下,大量的请求会直接打到数据库,造成数据库压力过大,严重时可能导致数据库宕机。
缓存穿透的危害
// 模拟缓存穿透的典型场景
@Service
public class UserService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private UserMapper userMapper;
public User getUserById(Long id) {
// 1. 先从Redis中获取用户信息
String key = "user:" + id;
User user = (User) redisTemplate.opsForValue().get(key);
// 2. 如果Redis中没有,直接查询数据库
if (user == null) {
user = userMapper.selectById(id); // 这里会直接访问数据库
// 3. 将结果写入Redis(这里可能存在问题)
if (user != null) {
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
}
}
return user;
}
}
如上代码所示,当查询一个不存在的用户ID时,会直接穿透到数据库,造成不必要的压力。
缓存穿透解决方案
方案一:缓存空值
@Service
public class UserService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private UserMapper userMapper;
private static final Long NULL_USER_TTL = 300L; // 空值缓存过期时间
public User getUserById(Long id) {
String key = "user:" + id;
Object cacheValue = redisTemplate.opsForValue().get(key);
// 如果缓存中存在,直接返回
if (cacheValue != null) {
if (cacheValue instanceof String && "NULL".equals(cacheValue)) {
return null; // 返回空值
}
return (User) cacheValue;
}
// 缓存未命中,查询数据库
User user = userMapper.selectById(id);
// 将查询结果写入缓存
if (user != null) {
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
} else {
// 关键:将空值也缓存起来,避免重复查询数据库
redisTemplate.opsForValue().set(key, "NULL", NULL_USER_TTL, TimeUnit.SECONDS);
}
return user;
}
}
方案二:布隆过滤器
@Component
public class BloomFilterCache {
private static final int CAPACITY = 1000000; // 布隆过滤器容量
private static final double ERROR_RATE = 0.01; // 误判率
private final BloomFilter<String> bloomFilter;
public BloomFilterCache() {
this.bloomFilter = BloomFilter.create(
Funnels.stringFunnel(Charset.defaultCharset()),
CAPACITY,
ERROR_RATE
);
}
/**
* 将已存在的key加入布隆过滤器
*/
public void addKey(String key) {
bloomFilter.put(key);
}
/**
* 检查key是否存在
*/
public boolean containsKey(String key) {
return bloomFilter.mightContain(key);
}
}
@Service
public class UserService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private BloomFilterCache bloomFilterCache;
@Autowired
private UserMapper userMapper;
public User getUserById(Long id) {
String key = "user:" + id;
// 1. 先通过布隆过滤器检查key是否存在
if (!bloomFilterCache.containsKey(key)) {
return null; // 布隆过滤器判断不存在,直接返回
}
// 2. 如果可能存在,再查询缓存
Object cacheValue = redisTemplate.opsForValue().get(key);
if (cacheValue != null) {
if (cacheValue instanceof String && "NULL".equals(cacheValue)) {
return null;
}
return (User) cacheValue;
}
// 3. 缓存未命中,查询数据库
User user = userMapper.selectById(id);
if (user != null) {
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
bloomFilterCache.addKey(key); // 将存在的key加入布隆过滤器
} else {
redisTemplate.opsForValue().set(key, "NULL", 300, TimeUnit.SECONDS);
}
return user;
}
}
缓存击穿:热点数据的保护策略
什么是缓存击穿
缓存击穿是指某个热点数据在缓存中过期失效,此时大量并发请求同时访问该数据,导致所有请求都直接打到数据库上,形成缓存雪崩的前兆。
缓存击穿的危害
// 缓存击穿示例
@Service
public class ProductService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private ProductMapper productMapper;
public Product getProductById(Long id) {
String key = "product:" + id;
// 1. 先从缓存中获取
Product product = (Product) redisTemplate.opsForValue().get(key);
if (product == null) {
// 2. 缓存未命中,查询数据库
product = productMapper.selectById(id);
// 3. 写入缓存(如果存在)
if (product != null) {
redisTemplate.opsForValue().set(key, product, 300, TimeUnit.SECONDS);
}
}
return product;
}
}
当一个热点商品的缓存过期后,大量用户同时访问会导致数据库瞬间压力剧增。
缓存击穿解决方案
方案一:互斥锁机制
@Service
public class ProductService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private ProductMapper productMapper;
// 缓存过期时间
private static final long CACHE_EXPIRE_TIME = 300L;
public Product getProductById(Long id) {
String key = "product:" + id;
String lockKey = "lock:" + id;
// 1. 先从缓存中获取
Product product = (Product) redisTemplate.opsForValue().get(key);
if (product == null) {
// 2. 缓存未命中,尝试获取分布式锁
String lockValue = UUID.randomUUID().toString();
try {
// 使用SET命令的NX和EX参数实现分布式锁
Boolean lockResult = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
if (lockResult) {
// 3. 获取到锁,查询数据库
product = productMapper.selectById(id);
if (product != null) {
// 4. 将数据写入缓存
redisTemplate.opsForValue().set(key, product, CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
} else {
// 5. 数据库中也没有,缓存空值
redisTemplate.opsForValue().set(key, "NULL", 300, TimeUnit.SECONDS);
}
} else {
// 6. 获取锁失败,等待一段时间后重试
Thread.sleep(100);
return getProductById(id); // 递归调用
}
} finally {
// 7. 释放锁
releaseLock(lockKey, lockValue);
}
} else if ("NULL".equals(product)) {
return null; // 返回空值
}
return product;
}
private void releaseLock(String lockKey, String lockValue) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(lockKey), lockValue);
}
}
方案二:永不过期 + 异步更新
@Service
public class ProductService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private ProductMapper productMapper;
@Autowired
private ExecutorService executorService;
private static final long CACHE_EXPIRE_TIME = 300L;
private static final long REFRESH_THRESHOLD = 60L; // 提前刷新阈值
public Product getProductById(Long id) {
String key = "product:" + id;
// 1. 先从缓存中获取
Product product = (Product) redisTemplate.opsForValue().get(key);
if (product == null) {
// 2. 缓存未命中,查询数据库
product = productMapper.selectById(id);
if (product != null) {
// 3. 写入缓存(永不过期)
redisTemplate.opsForValue().set(key, product);
// 4. 异步刷新缓存(在后台线程中执行)
refreshCacheAsync(id, product);
} else {
// 5. 数据库中也没有,缓存空值
redisTemplate.opsForValue().set(key, "NULL", 300, TimeUnit.SECONDS);
}
} else {
// 6. 检查是否需要刷新缓存
checkAndRefreshCache(key, product, id);
}
return product;
}
private void refreshCacheAsync(Long id, Product product) {
executorService.submit(() -> {
try {
Thread.sleep(REFRESH_THRESHOLD * 1000); // 等待刷新阈值
// 重新查询数据库
Product freshProduct = productMapper.selectById(id);
if (freshProduct != null) {
String key = "product:" + id;
redisTemplate.opsForValue().set(key, freshProduct);
}
} catch (Exception e) {
log.error("缓存刷新失败", e);
}
});
}
private void checkAndRefreshCache(String key, Product product, Long id) {
// 可以通过设置一个标志位来标记是否需要刷新
String refreshKey = "refresh:" + id;
Boolean needRefresh = (Boolean) redisTemplate.opsForValue().get(refreshKey);
if (needRefresh == null || needRefresh) {
// 异步刷新缓存
refreshCacheAsync(id, product);
// 设置刷新标志位
redisTemplate.opsForValue().set(refreshKey, false, CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
}
}
}
缓存雪崩:系统整体的防护策略
什么是缓存雪崩
缓存雪崩是指在某一时刻,大量的缓存同时过期失效,导致所有请求都直接打到数据库上,造成数据库压力过大,甚至服务宕机。这通常发生在缓存集中失效的场景下。
缓存雪崩的危害
// 模拟缓存雪崩场景
@Service
public class NewsService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private NewsMapper newsMapper;
public List<News> getNewsList() {
String key = "news:list";
// 1. 从缓存中获取新闻列表
List<News> newsList = (List<News>) redisTemplate.opsForValue().get(key);
if (newsList == null) {
// 2. 缓存未命中,查询数据库
newsList = newsMapper.selectAll();
// 3. 写入缓存(大量数据同时过期)
redisTemplate.opsForValue().set(key, newsList, 600, TimeUnit.SECONDS);
}
return newsList;
}
}
如果所有缓存都在同一时间失效,就会导致雪崩效应。
缓存雪崩解决方案
方案一:设置随机过期时间
@Service
public class NewsService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private NewsMapper newsMapper;
private static final long BASE_EXPIRE_TIME = 600L; // 基础过期时间
private static final long RANDOM_RANGE = 300L; // 随机范围
public List<News> getNewsList() {
String key = "news:list";
// 1. 从缓存中获取新闻列表
List<News> newsList = (List<News>) redisTemplate.opsForValue().get(key);
if (newsList == null) {
// 2. 缓存未命中,查询数据库
newsList = newsMapper.selectAll();
// 3. 写入缓存(设置随机过期时间)
long randomExpireTime = BASE_EXPIRE_TIME +
new Random().nextInt((int) RANDOM_RANGE);
redisTemplate.opsForValue().set(key, newsList, randomExpireTime, TimeUnit.SECONDS);
}
return newsList;
}
}
方案二:多级缓存架构
@Component
public class MultiLevelCache {
@Autowired
private RedisTemplate redisTemplate;
// 本地缓存(如Caffeine)
private final Cache<String, Object> localCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(300, TimeUnit.SECONDS)
.build();
public Object get(String key) {
// 1. 先查本地缓存
Object value = localCache.getIfPresent(key);
if (value != null) {
return value;
}
// 2. 再查Redis缓存
value = redisTemplate.opsForValue().get(key);
if (value != null) {
// 3. 将数据同步到本地缓存
localCache.put(key, value);
return value;
}
return null;
}
public void set(String key, Object value, long timeout, TimeUnit unit) {
// 1. 写入Redis
redisTemplate.opsForValue().set(key, value, timeout, unit);
// 2. 同步写入本地缓存
localCache.put(key, value);
}
}
@Service
public class NewsService {
@Autowired
private MultiLevelCache multiLevelCache;
@Autowired
private NewsMapper newsMapper;
public List<News> getNewsList() {
String key = "news:list";
// 1. 多级缓存获取数据
List<News> newsList = (List<News>) multiLevelCache.get(key);
if (newsList == null) {
// 2. 缓存未命中,查询数据库
newsList = newsMapper.selectAll();
// 3. 写入多级缓存
multiLevelCache.set(key, newsList, 600, TimeUnit.SECONDS);
}
return newsList;
}
}
方案三:限流和降级机制
@Component
public class RateLimiter {
private final RedisTemplate redisTemplate;
public boolean tryAcquire(String key, int permits, long timeout) {
String script = "local key = KEYS[1] " +
"local permits = tonumber(ARGV[1]) " +
"local timeout = tonumber(ARGV[2]) " +
"local current = redis.call('GET', key) " +
"if current == false then " +
" redis.call('SET', key, permits, 'EX', timeout) " +
" return 1 " +
"else " +
" local count = tonumber(current) - 1 " +
" if count >= 0 then " +
" redis.call('SET', key, count, 'EX', timeout) " +
" return 1 " +
" else " +
" return 0 " +
" end " +
"end";
try {
Long result = (Long) redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(key),
String.valueOf(permits),
String.valueOf(timeout)
);
return result != null && result == 1;
} catch (Exception e) {
log.error("限流失败", e);
return true; // 发生异常时允许通过
}
}
}
@Service
public class NewsService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private NewsMapper newsMapper;
@Autowired
private RateLimiter rateLimiter;
private static final String RATE_LIMIT_KEY = "news_rate_limit";
private static final int MAX_REQUESTS = 100; // 最大请求数
private static final long TIME_WINDOW = 60; // 时间窗口(秒)
public List<News> getNewsList() {
// 1. 限流检查
if (!rateLimiter.tryAcquire(RATE_LIMIT_KEY, MAX_REQUESTS, TIME_WINDOW)) {
// 2. 超过限流,降级处理
return getFallbackNewsList();
}
String key = "news:list";
// 3. 从缓存中获取新闻列表
List<News> newsList = (List<News>) redisTemplate.opsForValue().get(key);
if (newsList == null) {
// 4. 缓存未命中,查询数据库
newsList = newsMapper.selectAll();
// 5. 写入缓存
redisTemplate.opsForValue().set(key, newsList, 600, TimeUnit.SECONDS);
}
return newsList;
}
private List<News> getFallbackNewsList() {
// 降级策略:返回默认数据或最近的数据
log.warn("触发限流降级,返回缓存数据");
String key = "news:list";
return (List<News>) redisTemplate.opsForValue().get(key);
}
}
性能优化最佳实践
缓存预热策略
@Component
public class CacheWarmupService {
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private ProductMapper productMapper;
@PostConstruct
public void warmUpCache() {
// 系统启动时预热热点数据
List<Product> hotProducts = productMapper.selectHotProducts();
for (Product product : hotProducts) {
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, product, 3600, TimeUnit.SECONDS);
}
log.info("缓存预热完成,预热了{}个热点商品", hotProducts.size());
}
}
缓存监控和告警
@Component
public class CacheMonitor {
@Autowired
private RedisTemplate redisTemplate;
private static final Logger logger = LoggerFactory.getLogger(CacheMonitor.class);
public void monitorCachePerformance() {
// 监控缓存命中率
String info = (String) redisTemplate.execute(
new DefaultRedisScript<>("return redis.call('INFO')", String.class)
);
// 解析INFO信息中的缓存命中率
double hitRate = calculateHitRate(info);
if (hitRate < 0.8) {
logger.warn("缓存命中率过低: {}%", hitRate * 100);
// 发送告警通知
sendAlert("缓存命中率异常", "当前命中率为" + hitRate);
}
}
private double calculateHitRate(String info) {
// 解析Redis INFO输出,提取命中率数据
// 这里简化处理,实际应该解析完整的INFO输出
return 0.95; // 示例值
}
private void sendAlert(String title, String message) {
// 实现告警通知逻辑
log.info("发送告警: {} - {}", title, message);
}
}
总结
Redis缓存穿透、击穿、雪崩是高并发场景下常见的性能问题,需要我们从多个维度进行防护:
- 缓存穿透:通过缓存空值和布隆过滤器来防止恶意查询
- 缓存击穿:使用互斥锁或异步更新机制保护热点数据
- 缓存雪崩:设置随机过期时间、多级缓存架构和限流降级策略
在实际应用中,建议综合运用多种方案,并根据业务特点进行调整优化。同时,建立完善的监控体系,及时发现和处理潜在问题,确保系统在高并发场景下的稳定运行。
通过本文介绍的解决方案和最佳实践,开发者可以有效应对Redis缓存相关的性能挑战,构建更加健壮的分布式系统架构。

评论 (0)