引言
在现代分布式系统中,Redis作为高性能的内存数据库,已经成为缓存系统的核心组件。然而,在实际应用过程中,我们经常会遇到缓存穿透、击穿、雪崩这三大经典问题,这些问题不仅影响系统的性能,还可能导致服务不可用。本文将深入分析这三个问题的产生原因,并详细介绍相应的解决方案,包括分布式锁实现、布隆过滤器应用以及多级缓存架构设计等核心技术。
Redis缓存三大问题概述
缓存穿透
缓存穿透是指查询一个不存在的数据,由于缓存中没有该数据,需要从数据库中查询,但数据库中也没有该数据。每次请求都会直接打到数据库,导致数据库压力增大,严重时可能造成数据库宕机。
缓存击穿
缓存击穿是指某个热点数据在缓存中过期的瞬间,大量并发请求同时访问该数据,这些请求会直接打到数据库,造成数据库瞬时压力过大。
缓存雪崩
缓存雪崩是指缓存中大量的数据在同一时间大面积失效,导致大量请求直接打到数据库,造成数据库压力过大,甚至导致服务不可用。
缓存穿透解决方案
1. 布隆过滤器(Bloom Filter)
布隆过滤器是一种概率型数据结构,用于快速判断一个元素是否存在于集合中。它通过多个哈希函数将元素映射到位数组中,虽然可能存在误判率,但可以有效防止缓存穿透。
// 使用Redis实现布隆过滤器
@Component
public class BloomFilterService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String BLOOM_FILTER_KEY = "bloom_filter";
private static final int BIT_SIZE = 1000000;
private static final int HASH_COUNT = 3;
/**
* 添加元素到布隆过滤器
*/
public void addElement(String key) {
for (int i = 0; i < HASH_COUNT; i++) {
long hash = hash(key, i);
redisTemplate.opsForValue().setBit(BLOOM_FILTER_KEY + ":" + i, hash % BIT_SIZE, true);
}
}
/**
* 判断元素是否存在
*/
public boolean contains(String key) {
for (int i = 0; i < HASH_COUNT; i++) {
long hash = hash(key, i);
if (!redisTemplate.opsForValue().getBit(BLOOM_FILTER_KEY + ":" + i, hash % BIT_SIZE)) {
return false;
}
}
return true;
}
/**
* 哈希函数
*/
private long hash(String key, int seed) {
long hash = 0;
for (int i = 0; i < key.length(); i++) {
hash = hash * seed + key.charAt(i);
}
return Math.abs(hash);
}
}
2. 空值缓存
对于查询结果为空的数据,同样将其缓存到Redis中,设置较短的过期时间,避免大量无效请求打到数据库。
@Service
public class UserService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserRepository userRepository;
private static final String USER_CACHE_KEY = "user:";
private static final long CACHE_TIMEOUT = 300; // 5分钟
public User getUserById(Long id) {
String key = USER_CACHE_KEY + id;
// 先从缓存中获取
Object cachedUser = redisTemplate.opsForValue().get(key);
if (cachedUser != null) {
if (cachedUser instanceof String && "null".equals(cachedUser)) {
return null; // 缓存空值
}
return (User) cachedUser;
}
// 缓存未命中,查询数据库
User user = userRepository.findById(id);
// 将结果缓存到Redis中
if (user == null) {
// 缓存空值
redisTemplate.opsForValue().set(key, "null", CACHE_TIMEOUT, TimeUnit.SECONDS);
} else {
redisTemplate.opsForValue().set(key, user, CACHE_TIMEOUT, TimeUnit.SECONDS);
}
return user;
}
}
3. 互斥锁机制
使用分布式锁确保同一时间只有一个线程去数据库查询数据,其他线程等待结果。
@Service
public class UserService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserRepository userRepository;
private static final String USER_CACHE_KEY = "user:";
private static final String LOCK_KEY = "lock:user:";
private static final long LOCK_TIMEOUT = 5000; // 5秒
public User getUserById(Long id) {
String key = USER_CACHE_KEY + id;
String lockKey = LOCK_KEY + id;
// 先从缓存中获取
Object cachedUser = redisTemplate.opsForValue().get(key);
if (cachedUser != null) {
if (cachedUser instanceof String && "null".equals(cachedUser)) {
return null;
}
return (User) cachedUser;
}
// 获取分布式锁
String lockValue = UUID.randomUUID().toString();
if (redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, LOCK_TIMEOUT, TimeUnit.MILLISECONDS)) {
try {
// 重新从缓存获取(可能其他线程已经缓存了数据)
cachedUser = redisTemplate.opsForValue().get(key);
if (cachedUser != null) {
if (cachedUser instanceof String && "null".equals(cachedUser)) {
return null;
}
return (User) cachedUser;
}
// 查询数据库
User user = userRepository.findById(id);
// 缓存结果
if (user == null) {
redisTemplate.opsForValue().set(key, "null", 60, TimeUnit.SECONDS);
} else {
redisTemplate.opsForValue().set(key, user, 3600, TimeUnit.SECONDS);
}
return user;
} finally {
// 释放锁
releaseLock(lockKey, lockValue);
}
} else {
// 获取锁失败,等待一段时间后重试
try {
Thread.sleep(100);
return getUserById(id);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
}
private void releaseLock(String lockKey, String lockValue) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockKey), lockValue);
}
}
缓存击穿解决方案
1. 热点数据永不过期
对于热点数据,可以设置为永不过期,通过后台任务定期更新缓存数据。
@Service
public class HotDataCacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserRepository userRepository;
private static final String HOT_USER_KEY = "hot_user:";
private static final long CACHE_TIMEOUT = 3600; // 1小时
/**
* 获取热点用户数据,永不过期
*/
public User getHotUser(Long userId) {
String key = HOT_USER_KEY + userId;
// 先从缓存获取
Object cachedUser = redisTemplate.opsForValue().get(key);
if (cachedUser != null) {
return (User) cachedUser;
}
// 缓存未命中,查询数据库
User user = userRepository.findById(userId);
if (user != null) {
// 永不过期缓存
redisTemplate.opsForValue().set(key, user);
}
return user;
}
/**
* 定期更新热点数据
*/
@Scheduled(fixedRate = 300000) // 5分钟执行一次
public void refreshHotData() {
// 执行热点数据的刷新逻辑
List<Long> hotUserIds = getHotUserIds();
for (Long userId : hotUserIds) {
String key = HOT_USER_KEY + userId;
User user = userRepository.findById(userId);
if (user != null) {
redisTemplate.opsForValue().set(key, user);
}
}
}
private List<Long> getHotUserIds() {
// 获取热点用户ID列表的逻辑
return Arrays.asList(1L, 2L, 3L);
}
}
2. 互斥锁防穿透
使用分布式锁防止缓存击穿,确保同一时间只有一个线程去数据库查询数据。
@Service
public class UserService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserRepository userRepository;
private static final String USER_CACHE_KEY = "user:";
private static final String LOCK_KEY = "lock:user:";
private static final long CACHE_TIMEOUT = 3600; // 1小时
public User getUserById(Long id) {
String key = USER_CACHE_KEY + id;
String lockKey = LOCK_KEY + id;
// 先从缓存获取
Object cachedUser = redisTemplate.opsForValue().get(key);
if (cachedUser != null) {
return (User) cachedUser;
}
// 获取分布式锁
String lockValue = UUID.randomUUID().toString();
if (redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 5000, TimeUnit.MILLISECONDS)) {
try {
// 重新检查缓存
cachedUser = redisTemplate.opsForValue().get(key);
if (cachedUser != null) {
return (User) cachedUser;
}
// 查询数据库
User user = userRepository.findById(id);
// 缓存结果
if (user != null) {
redisTemplate.opsForValue().set(key, user, CACHE_TIMEOUT, TimeUnit.SECONDS);
} else {
// 缓存空值,设置较短过期时间
redisTemplate.opsForValue().set(key, "null", 60, TimeUnit.SECONDS);
}
return user;
} finally {
// 释放锁
releaseLock(lockKey, lockValue);
}
} else {
// 等待一段时间后重试
try {
Thread.sleep(100);
return getUserById(id);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
}
private void releaseLock(String lockKey, String lockValue) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockKey), lockValue);
}
}
缓存雪崩解决方案
1. 设置随机过期时间
为缓存设置随机的过期时间,避免大量数据同时失效。
@Service
public class CacheService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String CACHE_KEY_PREFIX = "cache:";
public void setCacheWithRandomTimeout(String key, Object value, int baseTimeout) {
// 设置随机过期时间,避免集中失效
Random random = new Random();
int randomTimeout = baseTimeout + random.nextInt(300); // 5分钟到5分半钟
String fullKey = CACHE_KEY_PREFIX + key;
redisTemplate.opsForValue().set(fullKey, value, randomTimeout, TimeUnit.SECONDS);
}
public Object getCache(String key) {
String fullKey = CACHE_KEY_PREFIX + key;
return redisTemplate.opsForValue().get(fullKey);
}
}
2. 多级缓存架构
构建多级缓存架构,包括本地缓存和分布式缓存,提高系统的容错能力。
@Component
public class MultiLevelCache {
// 本地缓存(Caffeine)
private final Cache<String, Object> localCache;
// Redis缓存
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String REDIS_CACHE_KEY = "cache:";
public MultiLevelCache() {
this.localCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(Duration.ofMinutes(5))
.build();
}
/**
* 多级缓存获取
*/
public Object get(String key) {
// 1. 先从本地缓存获取
Object value = localCache.getIfPresent(key);
if (value != null) {
return value;
}
// 2. 从Redis缓存获取
String redisKey = REDIS_CACHE_KEY + key;
value = redisTemplate.opsForValue().get(redisKey);
if (value != null) {
// 3. 更新本地缓存
localCache.put(key, value);
return value;
}
return null;
}
/**
* 多级缓存设置
*/
public void set(String key, Object value, long timeout) {
// 1. 设置Redis缓存
String redisKey = REDIS_CACHE_KEY + key;
redisTemplate.opsForValue().set(redisKey, value, timeout, TimeUnit.SECONDS);
// 2. 设置本地缓存
localCache.put(key, value);
}
/**
* 缓存失效时的降级处理
*/
public Object getWithFallback(String key, Supplier<Object> fallback) {
try {
Object value = get(key);
if (value != null) {
return value;
}
// 本地缓存和Redis都未命中,执行降级逻辑
return fallback.get();
} catch (Exception e) {
// 发生异常时的降级处理
return fallback.get();
}
}
}
3. 熔断器模式
结合熔断器模式,在缓存失效导致数据库压力过大时,自动熔断并降级。
@Component
public class CacheServiceWithCircuitBreaker {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 使用Resilience4j熔断器
private final CircuitBreaker circuitBreaker;
public CacheServiceWithCircuitBreaker() {
this.circuitBreaker = CircuitBreaker.ofDefaults("cacheCircuitBreaker");
}
public User getUserById(Long id) {
// 使用熔断器包装方法
Supplier<User> supplier = () -> {
String key = "user:" + id;
// 先从缓存获取
Object cachedUser = redisTemplate.opsForValue().get(key);
if (cachedUser != null) {
return (User) cachedUser;
}
// 缓存未命中,查询数据库
User user = queryDatabase(id);
// 缓存结果
if (user != null) {
redisTemplate.opsForValue().set(key, user, 3600, TimeUnit.SECONDS);
} else {
redisTemplate.opsForValue().set(key, "null", 60, TimeUnit.SECONDS);
}
return user;
};
return CircuitBreaker.decorateSupplier(circuitBreaker, supplier).get();
}
private User queryDatabase(Long id) {
// 数据库查询逻辑
return new User(id, "User" + id);
}
}
分布式锁实现
1. 基于Redis的分布式锁实现
@Component
public class RedisDistributedLock {
private static final String LOCK_PREFIX = "lock:";
private static final long DEFAULT_TIMEOUT = 3000; // 3秒
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* 获取分布式锁
*/
public boolean acquireLock(String lockKey, String lockValue, long timeout) {
String key = LOCK_PREFIX + lockKey;
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, lockValue, timeout, TimeUnit.MILLISECONDS);
return result != null && result;
}
/**
* 释放分布式锁
*/
public void releaseLock(String lockKey, String lockValue) {
String key = LOCK_PREFIX + lockKey;
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(key), lockValue);
}
/**
* 带重试机制的锁获取
*/
public boolean acquireLockWithRetry(String lockKey, String lockValue, int maxRetries, long retryInterval) {
for (int i = 0; i < maxRetries; i++) {
if (acquireLock(lockKey, lockValue, DEFAULT_TIMEOUT)) {
return true;
}
try {
Thread.sleep(retryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}
}
2. 锁的超时机制和自动续期
@Component
public class AutoRenewLockService {
private static final String LOCK_PREFIX = "lock:";
private static final long DEFAULT_TIMEOUT = 3000;
private static final long RENEW_INTERVAL = 1000; // 1秒
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
/**
* 获取分布式锁并启动自动续期
*/
public boolean acquireLockWithAutoRenew(String lockKey, String lockValue, long timeout) {
String key = LOCK_PREFIX + lockKey;
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, lockValue, timeout, TimeUnit.MILLISECONDS);
if (result != null && result) {
// 启动自动续期任务
startAutoRenewTask(lockKey, lockValue, timeout);
return true;
}
return false;
}
/**
* 启动自动续期任务
*/
private void startAutoRenewTask(String lockKey, String lockValue, long timeout) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
try {
renewLock(lockKey, lockValue, timeout);
} catch (Exception e) {
// 记录日志,但不中断续期任务
log.error("Lock auto-renew failed", e);
}
}, RENEW_INTERVAL, RENEW_INTERVAL, TimeUnit.MILLISECONDS);
scheduledTasks.put(lockKey, future);
}
/**
* 续期锁
*/
private void renewLock(String lockKey, String lockValue, long timeout) {
String key = LOCK_PREFIX + lockKey;
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(key), lockValue, String.valueOf(timeout));
}
/**
* 释放锁并停止续期任务
*/
public void releaseLock(String lockKey, String lockValue) {
String key = LOCK_PREFIX + lockKey;
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(key), lockValue);
// 停止续期任务
ScheduledFuture<?> future = scheduledTasks.remove(lockKey);
if (future != null) {
future.cancel(true);
}
}
}
多级缓存架构设计
1. 缓存层次结构设计
@Component
public class MultiLevelCacheManager {
// 本地缓存(JVM级别)
private final Cache<String, Object> localCache;
// 二级缓存(Redis)
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 缓存配置
private static final String LOCAL_CACHE_KEY = "local_cache:";
private static final String REDIS_CACHE_KEY = "redis_cache:";
public MultiLevelCacheManager() {
this.localCache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(Duration.ofMinutes(5))
.recordStats()
.build();
}
/**
* 获取数据 - 多级缓存查找
*/
public Object get(String key) {
// 1. 本地缓存查找
Object value = localCache.getIfPresent(key);
if (value != null) {
return value;
}
// 2. Redis缓存查找
String redisKey = REDIS_CACHE_KEY + key;
value = redisTemplate.opsForValue().get(redisKey);
if (value != null) {
// 3. 更新本地缓存
localCache.put(key, value);
return value;
}
return null;
}
/**
* 设置数据 - 多级缓存同步
*/
public void set(String key, Object value, long timeout) {
// 1. 设置Redis缓存
String redisKey = REDIS_CACHE_KEY + key;
redisTemplate.opsForValue().set(redisKey, value, timeout, TimeUnit.SECONDS);
// 2. 设置本地缓存
localCache.put(key, value);
}
/**
* 删除数据 - 多级缓存同步删除
*/
public void delete(String key) {
// 1. 删除Redis缓存
String redisKey = REDIS_CACHE_KEY + key;
redisTemplate.delete(redisKey);
// 2. 删除本地缓存
localCache.invalidate(key);
}
/**
* 获取缓存统计信息
*/
public CacheStats getCacheStats() {
return localCache.stats();
}
}
2. 缓存预热机制
@Component
public class CacheWarmupService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserRepository userRepository;
private static final String USER_CACHE_KEY = "user:";
/**
* 系统启动时预热缓存
*/
@EventListener
public void handleApplicationStarted(ApplicationReadyEvent event) {
log.info("Starting cache warmup...");
warmUpCache();
}
/**
* 缓存预热
*/
private void warmUpCache() {
// 获取热门用户ID列表
List<Long> hotUserIds = getHotUserIds();
for (Long userId : hotUserIds) {
try {
String key = USER_CACHE_KEY + userId;
// 查询数据库
User user = userRepository.findById(userId);
if (user != null) {
// 缓存到Redis
redisTemplate.opsForValue().set(key, user, 3600, TimeUnit.SECONDS);
}
} catch (Exception e) {
log.error("Cache warmup failed for user: {}", userId, e);
}
}
log.info("Cache warmup completed");
}
private List<Long> getHotUserIds() {
// 实际应用中可以从数据库或配置中心获取热门用户列表
return Arrays.asList(1L, 2L, 3L, 4L, 5L);
}
}
3. 缓存监控和告警
@Component
public class CacheMonitor {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private static final String METRICS_KEY = "cache_metrics:";
/**
* 记录缓存访问统计
*/
public void recordCacheAccess(String cacheKey, boolean hit) {
String key = METRICS_KEY + cacheKey;
Map<String, Object> metrics = new HashMap<>();
metrics.put("access_time", System.currentTimeMillis());
metrics.put("hit", hit);
redisTemplate.opsForHash().put(key, "metrics", metrics);
}
/**
* 获取缓存命中率
*/
public double getHitRate(String cacheKey) {
// 实现缓存命中率计算逻辑
return 0.95; // 示例值
}
/**
* 缓存异常监控
*/
@EventListener
public void handleCacheException(CacheExceptionEvent event) {
log.warn("Cache exception occurred: {}", event.getMessage());
// 发送告警通知
sendAlert(event);
}
private void sendAlert(CacheExceptionEvent event) {
// 实现告警逻辑,如发送邮件、短信等
log.info("Sending alert for cache exception: {}", event.getMessage());
}
}
最佳实践和注意事项
1. 缓存策略选择
public class CacheStrategy {
/**
* LRU缓存策略
*/
public static final String LRU = "LRU";
/**
* LFU缓存策略
*/
public static final String LFU = "LFU";
/**
* FIFO缓存策略
*/
public static final String FIFO = "FIFO";
/**
* 根据业务场景选择合适的缓存策略
*/
public static void configureCacheStrategy(String strategy) {
switch (strategy) {
case LRU:
// 配置LRU策略
break;
case LFU:
// 配置LFU策略
break;
case FIFO:
// 配置FIFO策略
break;
default:
throw new IllegalArgumentException("Unknown cache strategy: " + strategy);
}
}
}
2. 性能优化建议
@Component
public class CachePerformanceOptimizer {
/**
* 批量操作优化
*/
public void batchSetCache(List<String> keys, List<Object> values) {
// 使用Pipeline批量操作
redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
for (int i = 0; i < keys.size(); i++) {
connection.set(keys.get(i).getBytes(),
SerializationUtils.serialize(values.get(i)));
}
return null;
}
});
}
/**
* 异步缓存更新
*/
@Async
public void asyncUpdateCache(String key, Object value) {
redisTemplate.ops
评论 (0)