前言
在现代高并发的互联网应用中,缓存技术已成为提升系统性能的关键手段。Redis作为最受欢迎的内存数据库之一,广泛应用于各种缓存场景。然而,在高并发环境下,如果缓存策略设计不当,很容易出现缓存穿透、击穿、雪崩等问题,严重影响系统的稳定性和用户体验。
本文将深入剖析这些常见的缓存问题,提供详细的解决方案和最佳实践,帮助开发者构建高性能、高可用的后端服务系统。
Redis缓存核心概念回顾
在深入讨论具体问题之前,我们先来回顾一下Redis缓存的基本工作原理。
缓存的工作模式
典型的缓存架构遵循"先查缓存,后查数据库"的模式:
// 缓存查询伪代码
public Object getData(String key) {
// 1. 先从缓存中获取数据
Object data = redisTemplate.get(key);
if (data != null) {
return data; // 缓存命中
}
// 2. 缓存未命中,查询数据库
data = database.query(key);
// 3. 将数据写入缓存
redisTemplate.set(key, data, expireTime);
return data;
}
缓存的常见问题
在高并发场景下,上述简单的缓存逻辑会暴露出诸多问题。这些问题不仅影响系统性能,还可能导致服务雪崩,因此需要我们深入理解和有效防护。
缓存穿透问题详解与解决方案
什么是缓存穿透
缓存穿透是指查询一个根本不存在的数据,缓存层和存储层都不会命中,导致请求直接打到数据库上。这种场景在高并发下尤为严重,因为大量无效请求会直接冲击数据库,可能导致数据库宕机。
缓存穿透的典型场景
// 模拟缓存穿透场景
public class CachePenetrationDemo {
private RedisTemplate<String, Object> redisTemplate;
private UserService userService;
public User getUserById(Long id) {
// 1. 先查Redis缓存
String key = "user:" + id;
User user = (User) redisTemplate.opsForValue().get(key);
if (user == null) {
// 2. Redis中没有,查询数据库
user = userService.findById(id);
if (user == null) {
// 3. 数据库中也没有,此时应该:
// - 将空值写入缓存(防止持续查询数据库)
// - 或者设置较短的过期时间
// 这里直接返回null,会导致后续大量请求都打到数据库
return null;
}
// 4. 查询到数据后,写入缓存
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
}
return user;
}
}
缓存穿透的解决方案
1. 布隆过滤器(Bloom Filter)
布隆过滤器是一种概率型数据结构,可以高效地判断一个元素是否存在于集合中。使用布隆过滤器可以在查询前进行预判,避免无效查询。
@Component
public class BloomFilterCache {
private final BloomFilter<String> bloomFilter;
public BloomFilterCache() {
// 初始化布隆过滤器,预计存储100万条数据,误判率0.1%
this.bloomFilter = BloomFilter.create(
Funnels.stringFunnel(Charset.defaultCharset()),
1000000,
0.001
);
}
// 将真实存在的key加入布隆过滤器
public void addKey(String key) {
bloomFilter.put(key);
}
// 检查key是否存在
public boolean contains(String key) {
return bloomFilter.mightContain(key);
}
}
// 使用布隆过滤器的优化版本
public User getUserByIdWithBloom(Long id) {
String key = "user:" + id;
// 1. 先通过布隆过滤器检查key是否存在
if (!bloomFilter.contains(key)) {
return null; // 布隆过滤器判断不存在,直接返回
}
// 2. 再查询Redis缓存
User user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
return user;
}
// 3. Redis中没有,查询数据库
user = userService.findById(id);
if (user != null) {
// 4. 查询到数据后,写入缓存并更新布隆过滤器
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
bloomFilter.addKey(key);
} else {
// 5. 数据库中也没有,将空值写入缓存(设置较短过期时间)
redisTemplate.opsForValue().set(key, "", 10, TimeUnit.SECONDS);
}
return user;
}
2. 缓存空值
对于查询不到的数据,可以将空值也缓存起来,但设置较短的过期时间。
public User getUserByIdWithNullCache(Long id) {
String key = "user:" + id;
// 1. 先查Redis缓存
Object cached = redisTemplate.opsForValue().get(key);
if (cached == null) {
// 2. 缓存中没有,查询数据库
User user = userService.findById(id);
if (user != null) {
// 3. 查询到数据,写入缓存
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
} else {
// 4. 数据库中也没有,将空值写入缓存(设置较短过期时间)
redisTemplate.opsForValue().set(key, "", 10, TimeUnit.SECONDS);
}
return user;
}
// 5. 缓存中有数据,直接返回
return cached instanceof String ? null : (User) cached;
}
3. 互斥锁机制
使用分布式锁确保同一时间只有一个线程去查询数据库。
public User getUserByIdWithMutex(Long id) {
String key = "user:" + id;
String lockKey = "lock:user:" + id;
try {
// 1. 尝试获取分布式锁
if (redisTemplate.opsForValue().setIfAbsent(lockKey, "locked",
10, TimeUnit.SECONDS)) {
// 2. 获取锁成功,查询数据库
User user = userService.findById(id);
if (user != null) {
// 3. 查询到数据,写入缓存
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
} else {
// 4. 数据库中也没有,将空值写入缓存
redisTemplate.opsForValue().set(key, "", 10, TimeUnit.SECONDS);
}
return user;
} else {
// 5. 获取锁失败,等待一段时间后重试
Thread.sleep(100);
return getUserByIdWithMutex(id); // 递归调用
}
} catch (Exception e) {
throw new RuntimeException("获取用户信息失败", e);
} finally {
// 6. 释放锁
redisTemplate.delete(lockKey);
}
}
缓存击穿问题详解与解决方案
什么是缓存击穿
缓存击穿是指某个热点数据在缓存中过期失效的瞬间,大量并发请求同时访问该数据,导致数据库瞬间压力剧增。与缓存穿透不同的是,缓存击穿中的数据是真实存在的,只是缓存失效了。
缓存击穿的典型场景
// 模拟缓存击穿场景
public class CacheBreakdownDemo {
private RedisTemplate<String, Object> redisTemplate;
private UserService userService;
public User getUserById(Long id) {
String key = "user:" + id;
// 1. 先查Redis缓存
User user = (User) redisTemplate.opsForValue().get(key);
if (user == null) {
// 2. 缓存失效,查询数据库
user = userService.findById(id);
if (user != null) {
// 3. 查询到数据后,重新写入缓存
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
}
}
return user;
}
}
缓存击穿的解决方案
1. 热点数据永不过期
对于特别热点的数据,可以设置为永不过期,通过业务逻辑来更新数据。
@Component
public class HotDataCache {
private RedisTemplate<String, Object> redisTemplate;
// 设置热点数据永不过期
public void setHotData(String key, Object data) {
redisTemplate.opsForValue().set(key, data); // 不设置过期时间
}
// 通过业务更新热点数据
public void updateHotData(String key, Object newData) {
redisTemplate.opsForValue().set(key, newData);
}
// 定时任务刷新热点数据
@Scheduled(fixedRate = 300000) // 每5分钟刷新一次
public void refreshHotData() {
// 批量更新热点数据
List<String> hotKeys = getHotKeys();
for (String key : hotKeys) {
Object data = loadFromDatabase(key);
redisTemplate.opsForValue().set(key, data);
}
}
}
2. 互斥锁保护
使用分布式锁确保同一时间只有一个线程去查询数据库。
public User getUserByIdWithLock(Long id) {
String key = "user:" + id;
String lockKey = "lock:user:" + id;
try {
// 1. 先从缓存中获取数据
User user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
return user; // 缓存命中
}
// 2. 尝试获取分布式锁
Boolean lockAcquired = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "locked", 10, TimeUnit.SECONDS);
if (lockAcquired != null && lockAcquired) {
// 3. 获取锁成功,查询数据库
user = userService.findById(id);
if (user != null) {
// 4. 查询到数据,写入缓存
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
} else {
// 5. 数据库中也没有,将空值写入缓存
redisTemplate.opsForValue().set(key, "", 10, TimeUnit.SECONDS);
}
return user;
} else {
// 6. 获取锁失败,等待后重试
Thread.sleep(50);
return getUserByIdWithLock(id);
}
} catch (Exception e) {
throw new RuntimeException("获取用户信息失败", e);
} finally {
// 7. 释放锁
if (redisTemplate.hasKey(lockKey)) {
redisTemplate.delete(lockKey);
}
}
}
3. 分布式锁优化
使用更完善的分布式锁实现,避免死锁问题。
@Component
public class DistributedLock {
private RedisTemplate<String, String> redisTemplate;
/**
* 获取分布式锁
*/
public boolean acquireLock(String key, String value, long expireTime) {
String script = "if redis.call('exists', KEYS[1]) == 0 then " +
"redis.call('set', KEYS[1], ARGV[1]) " +
"redis.call('expire', KEYS[1], ARGV[2]) " +
"return 1 " +
"else return 0 end";
Object result = redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(key),
value,
String.valueOf(expireTime)
);
return result != null && (Long) result == 1L;
}
/**
* 释放分布式锁
*/
public boolean releaseLock(String key, String value) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) " +
"else return 0 end";
Object result = redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(key),
value
);
return result != null && (Long) result == 1L;
}
}
// 使用分布式锁的缓存击穿解决方案
public class CacheBreakdownSolution {
private RedisTemplate<String, Object> redisTemplate;
private DistributedLock distributedLock;
private UserService userService;
public User getUserById(Long id) {
String key = "user:" + id;
String lockKey = "lock:user:" + id;
String requestId = UUID.randomUUID().toString();
try {
// 1. 先从缓存获取
User user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
return user;
}
// 2. 获取分布式锁
if (distributedLock.acquireLock(lockKey, requestId, 10)) {
// 3. 再次检查缓存(双重检查)
user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
return user;
}
// 4. 查询数据库
user = userService.findById(id);
if (user != null) {
// 5. 写入缓存
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
} else {
// 6. 数据库无数据,写入空值缓存
redisTemplate.opsForValue().set(key, "", 10, TimeUnit.SECONDS);
}
return user;
} else {
// 7. 获取锁失败,稍后重试
Thread.sleep(50);
return getUserById(id);
}
} catch (Exception e) {
throw new RuntimeException("获取用户信息失败", e);
} finally {
// 8. 释放锁(使用Lua脚本确保原子性)
distributedLock.releaseLock(lockKey, requestId);
}
}
}
缓存雪崩问题详解与解决方案
什么是缓存雪崩
缓存雪崩是指缓存中大量数据同时过期失效,导致大量请求直接打到数据库上,造成数据库压力剧增,甚至引发服务宕机。这通常发生在缓存系统整体性故障或大规模数据同时过期时。
缓存雪崩的典型场景
// 模拟缓存雪崩场景
public class CacheAvalancheDemo {
private RedisTemplate<String, Object> redisTemplate;
public void batchExpire() {
// 1. 大批量数据同时过期(模拟雪崩场景)
Set<String> keys = redisTemplate.keys("user:*");
for (String key : keys) {
// 这里可能同时设置大量key的过期时间
redisTemplate.expire(key, 300, TimeUnit.SECONDS);
}
}
public User getUserById(Long id) {
String key = "user:" + id;
// 2. 如果这些key同时过期,大量请求会同时查询数据库
User user = (User) redisTemplate.opsForValue().get(key);
if (user == null) {
// 3. 直接打到数据库
user = userService.findById(id);
if (user != null) {
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
}
}
return user;
}
}
缓存雪崩的解决方案
1. 过期时间随机化
为缓存设置随机的过期时间,避免大量数据同时失效。
@Component
public class RandomExpireCache {
private RedisTemplate<String, Object> redisTemplate;
/**
* 设置带随机过期时间的缓存
*/
public void setWithRandomExpire(String key, Object value, long baseExpireTime) {
// 1. 基于基础时间加上随机偏移量
Random random = new Random();
long randomOffset = random.nextInt(300); // 随机0-300秒
long actualExpireTime = baseExpireTime + randomOffset;
redisTemplate.opsForValue().set(key, value, actualExpireTime, TimeUnit.SECONDS);
}
/**
* 批量设置缓存并随机化过期时间
*/
public void batchSetWithRandomExpire(List<String> keys, List<Object> values) {
for (int i = 0; i < keys.size(); i++) {
String key = keys.get(i);
Object value = values.get(i);
// 设置随机过期时间(300-600秒)
Random random = new Random();
long expireTime = 300 + random.nextInt(300);
redisTemplate.opsForValue().set(key, value, expireTime, TimeUnit.SECONDS);
}
}
}
2. 缓存预热机制
在系统启动或业务高峰期前,提前将热点数据加载到缓存中。
@Component
public class CacheWarmupService {
private RedisTemplate<String, Object> redisTemplate;
private UserService userService;
/**
* 系统启动时进行缓存预热
*/
@PostConstruct
public void warmUpCache() {
// 1. 获取热点数据ID列表
List<Long> hotUserIds = getHotUserIds();
// 2. 批量加载到缓存中
for (Long userId : hotUserIds) {
String key = "user:" + userId;
User user = userService.findById(userId);
if (user != null) {
// 3. 设置随机过期时间避免雪崩
Random random = new Random();
long expireTime = 300 + random.nextInt(300);
redisTemplate.opsForValue().set(key, user, expireTime, TimeUnit.SECONDS);
}
}
}
/**
* 定时预热缓存
*/
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
public void scheduledWarmup() {
// 1. 获取需要预热的数据
List<Long> userIds = getDailyHotUsers();
// 2. 批量预热
batchLoadCache(userIds);
}
private void batchLoadCache(List<Long> userIds) {
for (Long userId : userIds) {
String key = "user:" + userId;
User user = userService.findById(userId);
if (user != null) {
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
}
}
}
}
3. 多级缓存架构
构建多级缓存体系,降低单层缓存失效的影响。
@Component
public class MultiLevelCache {
private RedisTemplate<String, Object> redisTemplate;
private Cache localCache = new ConcurrentHashMap<>(); // 本地缓存
/**
* 多级缓存查询
*/
public User getUserById(Long id) {
String key = "user:" + id;
// 1. 先查本地缓存
User user = (User) localCache.get(key);
if (user != null) {
return user;
}
// 2. 再查Redis缓存
user = (User) redisTemplate.opsForValue().get(key);
if (user != null) {
// 3. Redis命中,同时写入本地缓存
localCache.put(key, user);
return user;
}
// 4. 最后查数据库
user = userService.findById(id);
if (user != null) {
// 5. 查询到数据,写入所有层级缓存
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
localCache.put(key, user);
}
return user;
}
/**
* 缓存刷新策略
*/
public void refreshCache(String key) {
// 1. 先从数据库获取最新数据
User user = userService.findById(parseUserId(key));
if (user != null) {
// 2. 更新所有层级缓存
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
localCache.put(key, user);
} else {
// 3. 数据库无数据,清理缓存
redisTemplate.delete(key);
localCache.remove(key);
}
}
private Long parseUserId(String key) {
return Long.valueOf(key.split(":")[1]);
}
}
4. 限流降级机制
在缓存雪崩发生时,通过限流和降级保护系统。
@Component
public class CacheProtectionService {
private RedisTemplate<String, Object> redisTemplate;
/**
* 限流检查
*/
public boolean isAllowed(String key) {
String rateKey = "rate_limit:" + key;
// 1. 使用Redis计数器实现限流
Long currentCount = redisTemplate.opsForValue().increment(rateKey, 1);
if (currentCount == 1) {
// 2. 第一次访问,设置过期时间
redisTemplate.expire(rateKey, 1, TimeUnit.MINUTES);
}
// 3. 限制每分钟最多100次请求
return currentCount <= 100;
}
/**
* 降级处理
*/
public User fallbackGetUser(Long id) {
// 1. 返回默认值或缓存旧数据
String key = "user:" + id;
Object cached = redisTemplate.opsForValue().get(key);
if (cached != null && !(cached instanceof String)) {
return (User) cached;
}
// 2. 返回空值或默认用户信息
return new User(id, "default_user", "default@example.com");
}
/**
* 缓存雪崩保护策略
*/
public User getUserWithProtection(Long id) {
String key = "user:" + id;
try {
// 1. 限流检查
if (!isAllowed(key)) {
// 2. 超过限流,降级处理
return fallbackGetUser(id);
}
// 3. 正常缓存查询逻辑
User user = (User) redisTemplate.opsForValue().get(key);
if (user == null) {
// 4. 缓存未命中,查询数据库
user = userService.findById(id);
if (user != null) {
redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
} else {
// 5. 数据库也无数据,缓存空值
redisTemplate.opsForValue().set(key, "", 10, TimeUnit.SECONDS);
}
}
return user;
} catch (Exception e) {
// 6. 异常情况下降级处理
return fallbackGetUser(id);
}
}
}
综合优化实践案例
完整的缓存解决方案
@Service
public class UserServiceImpl implements UserService {
private final RedisTemplate<String, Object> redisTemplate;
private final DistributedLock distributedLock;
private final BloomFilterCache bloomFilterCache;
@Override
public User findById(Long id) {
String key = "user:" + id;
// 1. 布隆过滤器预检查
if (!bloomFilterCache.contains(key)) {
return null;
}
// 2. 查询Redis缓存
Object cached = redisTemplate.opsForValue().get(key);
if (cached != null) {
if (cached instanceof String && ((String) cached).isEmpty()) {
// 空值缓存,直接返回null
return null;
}
return (User) cached;
}
// 3. Redis未命中,使用分布式锁保护数据库查询
String lockKey = "lock:" + key;
String requestId = UUID.randomUUID().toString();
try {
if (distributedLock.acquireLock(lockKey, requestId, 10)) {
// 双重检查缓存
cached = redisTemplate.opsForValue().get(key);
if (cached != null) {
if (cached instanceof String && ((String) cached).isEmpty()) {
return null;
}
return (User) cached;
}
// 4. 查询数据库
User user = queryFromDatabase(id);
if (user != null) {
// 5. 写入缓存(随机过期时间)
Random random = new Random();
long expireTime = 300 + random.nextInt(300);
redisTemplate.opsForValue().set(key, user, expireTime, TimeUnit.SECONDS);
// 6. 更新布隆过滤器
bloomFilterCache.addKey(key);
} else {
// 7. 数据库无数据,缓存空值
redisTemplate.opsForValue().set(key, "", 10, TimeUnit.SECONDS);
}
return user;
} else {
// 获取锁失败,等待后重试
Thread.sleep(50);
return findById(id);
}
} catch (Exception e) {
throw new RuntimeException("查询用户信息失败", e);
} finally {
distributedLock.releaseLock(lockKey, requestId);
}
}
private User queryFromDatabase(Long id) {
// 实际的数据库查询逻辑
return userMapper.selectById(id);
}
}
性能监控与告警
@Component
public class CacheMonitor {
private final RedisTemplate<String, Object> redisTemplate;
private final MeterRegistry meterRegistry;
@Scheduled(fixedRate = 60000) // 每分钟统计一次
public void monitorCachePerformance() {
// 1. 统计缓存命中率
long totalRequests = getCounterValue("cache_requests");
long hitRequests = getCounterValue("cache_hits");
double hitRate = totalRequests > 0 ? (double) hitRequests / totalRequests : 0;
// 2. 记录指标到监控系统
Gauge
评论 (0)