Redis缓存穿透、击穿、雪崩解决方案:分布式锁、布隆过滤器、多级缓存架构设计与实现

晨曦之光
晨曦之光 2025-12-17T15:20:00+08:00
0 0 0

引言

在现代分布式系统中,Redis作为高性能的内存数据库,已经成为缓存系统的核心组件。然而,在实际应用过程中,我们经常会遇到缓存穿透、击穿、雪崩这三大经典问题,这些问题不仅影响系统的性能,还可能导致服务不可用。本文将深入分析这三个问题的产生原因,并详细介绍相应的解决方案,包括分布式锁实现、布隆过滤器应用以及多级缓存架构设计等核心技术。

Redis缓存三大问题概述

缓存穿透

缓存穿透是指查询一个不存在的数据,由于缓存中没有该数据,需要从数据库中查询,但数据库中也没有该数据。每次请求都会直接打到数据库,导致数据库压力增大,严重时可能造成数据库宕机。

缓存击穿

缓存击穿是指某个热点数据在缓存中过期的瞬间,大量并发请求同时访问该数据,这些请求会直接打到数据库,造成数据库瞬时压力过大。

缓存雪崩

缓存雪崩是指缓存中大量的数据在同一时间大面积失效,导致大量请求直接打到数据库,造成数据库压力过大,甚至导致服务不可用。

缓存穿透解决方案

1. 布隆过滤器(Bloom Filter)

布隆过滤器是一种概率型数据结构,用于快速判断一个元素是否存在于集合中。它通过多个哈希函数将元素映射到位数组中,虽然可能存在误判率,但可以有效防止缓存穿透。

// 使用Redis实现布隆过滤器
@Component
public class BloomFilterService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String BLOOM_FILTER_KEY = "bloom_filter";
    private static final int BIT_SIZE = 1000000;
    private static final int HASH_COUNT = 3;
    
    /**
     * 添加元素到布隆过滤器
     */
    public void addElement(String key) {
        for (int i = 0; i < HASH_COUNT; i++) {
            long hash = hash(key, i);
            redisTemplate.opsForValue().setBit(BLOOM_FILTER_KEY + ":" + i, hash % BIT_SIZE, true);
        }
    }
    
    /**
     * 判断元素是否存在
     */
    public boolean contains(String key) {
        for (int i = 0; i < HASH_COUNT; i++) {
            long hash = hash(key, i);
            if (!redisTemplate.opsForValue().getBit(BLOOM_FILTER_KEY + ":" + i, hash % BIT_SIZE)) {
                return false;
            }
        }
        return true;
    }
    
    /**
     * 哈希函数
     */
    private long hash(String key, int seed) {
        long hash = 0;
        for (int i = 0; i < key.length(); i++) {
            hash = hash * seed + key.charAt(i);
        }
        return Math.abs(hash);
    }
}

2. 空值缓存

对于查询结果为空的数据,同样将其缓存到Redis中,设置较短的过期时间,避免大量无效请求打到数据库。

@Service
public class UserService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private UserRepository userRepository;
    
    private static final String USER_CACHE_KEY = "user:";
    private static final long CACHE_TIMEOUT = 300; // 5分钟
    
    public User getUserById(Long id) {
        String key = USER_CACHE_KEY + id;
        
        // 先从缓存中获取
        Object cachedUser = redisTemplate.opsForValue().get(key);
        if (cachedUser != null) {
            if (cachedUser instanceof String && "null".equals(cachedUser)) {
                return null; // 缓存空值
            }
            return (User) cachedUser;
        }
        
        // 缓存未命中,查询数据库
        User user = userRepository.findById(id);
        
        // 将结果缓存到Redis中
        if (user == null) {
            // 缓存空值
            redisTemplate.opsForValue().set(key, "null", CACHE_TIMEOUT, TimeUnit.SECONDS);
        } else {
            redisTemplate.opsForValue().set(key, user, CACHE_TIMEOUT, TimeUnit.SECONDS);
        }
        
        return user;
    }
}

3. 互斥锁机制

使用分布式锁确保同一时间只有一个线程去数据库查询数据,其他线程等待结果。

@Service
public class UserService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private UserRepository userRepository;
    
    private static final String USER_CACHE_KEY = "user:";
    private static final String LOCK_KEY = "lock:user:";
    private static final long LOCK_TIMEOUT = 5000; // 5秒
    
    public User getUserById(Long id) {
        String key = USER_CACHE_KEY + id;
        String lockKey = LOCK_KEY + id;
        
        // 先从缓存中获取
        Object cachedUser = redisTemplate.opsForValue().get(key);
        if (cachedUser != null) {
            if (cachedUser instanceof String && "null".equals(cachedUser)) {
                return null;
            }
            return (User) cachedUser;
        }
        
        // 获取分布式锁
        String lockValue = UUID.randomUUID().toString();
        if (redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, LOCK_TIMEOUT, TimeUnit.MILLISECONDS)) {
            try {
                // 重新从缓存获取(可能其他线程已经缓存了数据)
                cachedUser = redisTemplate.opsForValue().get(key);
                if (cachedUser != null) {
                    if (cachedUser instanceof String && "null".equals(cachedUser)) {
                        return null;
                    }
                    return (User) cachedUser;
                }
                
                // 查询数据库
                User user = userRepository.findById(id);
                
                // 缓存结果
                if (user == null) {
                    redisTemplate.opsForValue().set(key, "null", 60, TimeUnit.SECONDS);
                } else {
                    redisTemplate.opsForValue().set(key, user, 3600, TimeUnit.SECONDS);
                }
                
                return user;
            } finally {
                // 释放锁
                releaseLock(lockKey, lockValue);
            }
        } else {
            // 获取锁失败,等待一段时间后重试
            try {
                Thread.sleep(100);
                return getUserById(id);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
    
    private void releaseLock(String lockKey, String lockValue) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockKey), lockValue);
    }
}

缓存击穿解决方案

1. 热点数据永不过期

对于热点数据,可以设置为永不过期,通过后台任务定期更新缓存数据。

@Service
public class HotDataCacheService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private UserRepository userRepository;
    
    private static final String HOT_USER_KEY = "hot_user:";
    private static final long CACHE_TIMEOUT = 3600; // 1小时
    
    /**
     * 获取热点用户数据,永不过期
     */
    public User getHotUser(Long userId) {
        String key = HOT_USER_KEY + userId;
        
        // 先从缓存获取
        Object cachedUser = redisTemplate.opsForValue().get(key);
        if (cachedUser != null) {
            return (User) cachedUser;
        }
        
        // 缓存未命中,查询数据库
        User user = userRepository.findById(userId);
        if (user != null) {
            // 永不过期缓存
            redisTemplate.opsForValue().set(key, user);
        }
        
        return user;
    }
    
    /**
     * 定期更新热点数据
     */
    @Scheduled(fixedRate = 300000) // 5分钟执行一次
    public void refreshHotData() {
        // 执行热点数据的刷新逻辑
        List<Long> hotUserIds = getHotUserIds();
        for (Long userId : hotUserIds) {
            String key = HOT_USER_KEY + userId;
            User user = userRepository.findById(userId);
            if (user != null) {
                redisTemplate.opsForValue().set(key, user);
            }
        }
    }
    
    private List<Long> getHotUserIds() {
        // 获取热点用户ID列表的逻辑
        return Arrays.asList(1L, 2L, 3L);
    }
}

2. 互斥锁防穿透

使用分布式锁防止缓存击穿,确保同一时间只有一个线程去数据库查询数据。

@Service
public class UserService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private UserRepository userRepository;
    
    private static final String USER_CACHE_KEY = "user:";
    private static final String LOCK_KEY = "lock:user:";
    private static final long CACHE_TIMEOUT = 3600; // 1小时
    
    public User getUserById(Long id) {
        String key = USER_CACHE_KEY + id;
        String lockKey = LOCK_KEY + id;
        
        // 先从缓存获取
        Object cachedUser = redisTemplate.opsForValue().get(key);
        if (cachedUser != null) {
            return (User) cachedUser;
        }
        
        // 获取分布式锁
        String lockValue = UUID.randomUUID().toString();
        if (redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 5000, TimeUnit.MILLISECONDS)) {
            try {
                // 重新检查缓存
                cachedUser = redisTemplate.opsForValue().get(key);
                if (cachedUser != null) {
                    return (User) cachedUser;
                }
                
                // 查询数据库
                User user = userRepository.findById(id);
                
                // 缓存结果
                if (user != null) {
                    redisTemplate.opsForValue().set(key, user, CACHE_TIMEOUT, TimeUnit.SECONDS);
                } else {
                    // 缓存空值,设置较短过期时间
                    redisTemplate.opsForValue().set(key, "null", 60, TimeUnit.SECONDS);
                }
                
                return user;
            } finally {
                // 释放锁
                releaseLock(lockKey, lockValue);
            }
        } else {
            // 等待一段时间后重试
            try {
                Thread.sleep(100);
                return getUserById(id);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
    
    private void releaseLock(String lockKey, String lockValue) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockKey), lockValue);
    }
}

缓存雪崩解决方案

1. 设置随机过期时间

为缓存设置随机的过期时间,避免大量数据同时失效。

@Service
public class CacheService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String CACHE_KEY_PREFIX = "cache:";
    
    public void setCacheWithRandomTimeout(String key, Object value, int baseTimeout) {
        // 设置随机过期时间,避免集中失效
        Random random = new Random();
        int randomTimeout = baseTimeout + random.nextInt(300); // 5分钟到5分半钟
        
        String fullKey = CACHE_KEY_PREFIX + key;
        redisTemplate.opsForValue().set(fullKey, value, randomTimeout, TimeUnit.SECONDS);
    }
    
    public Object getCache(String key) {
        String fullKey = CACHE_KEY_PREFIX + key;
        return redisTemplate.opsForValue().get(fullKey);
    }
}

2. 多级缓存架构

构建多级缓存架构,包括本地缓存和分布式缓存,提高系统的容错能力。

@Component
public class MultiLevelCache {
    
    // 本地缓存(Caffeine)
    private final Cache<String, Object> localCache;
    
    // Redis缓存
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String REDIS_CACHE_KEY = "cache:";
    
    public MultiLevelCache() {
        this.localCache = Caffeine.newBuilder()
                .maximumSize(1000)
                .expireAfterWrite(Duration.ofMinutes(5))
                .build();
    }
    
    /**
     * 多级缓存获取
     */
    public Object get(String key) {
        // 1. 先从本地缓存获取
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }
        
        // 2. 从Redis缓存获取
        String redisKey = REDIS_CACHE_KEY + key;
        value = redisTemplate.opsForValue().get(redisKey);
        if (value != null) {
            // 3. 更新本地缓存
            localCache.put(key, value);
            return value;
        }
        
        return null;
    }
    
    /**
     * 多级缓存设置
     */
    public void set(String key, Object value, long timeout) {
        // 1. 设置Redis缓存
        String redisKey = REDIS_CACHE_KEY + key;
        redisTemplate.opsForValue().set(redisKey, value, timeout, TimeUnit.SECONDS);
        
        // 2. 设置本地缓存
        localCache.put(key, value);
    }
    
    /**
     * 缓存失效时的降级处理
     */
    public Object getWithFallback(String key, Supplier<Object> fallback) {
        try {
            Object value = get(key);
            if (value != null) {
                return value;
            }
            
            // 本地缓存和Redis都未命中,执行降级逻辑
            return fallback.get();
        } catch (Exception e) {
            // 发生异常时的降级处理
            return fallback.get();
        }
    }
}

3. 熔断器模式

结合熔断器模式,在缓存失效导致数据库压力过大时,自动熔断并降级。

@Component
public class CacheServiceWithCircuitBreaker {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 使用Resilience4j熔断器
    private final CircuitBreaker circuitBreaker;
    
    public CacheServiceWithCircuitBreaker() {
        this.circuitBreaker = CircuitBreaker.ofDefaults("cacheCircuitBreaker");
    }
    
    public User getUserById(Long id) {
        // 使用熔断器包装方法
        Supplier<User> supplier = () -> {
            String key = "user:" + id;
            
            // 先从缓存获取
            Object cachedUser = redisTemplate.opsForValue().get(key);
            if (cachedUser != null) {
                return (User) cachedUser;
            }
            
            // 缓存未命中,查询数据库
            User user = queryDatabase(id);
            
            // 缓存结果
            if (user != null) {
                redisTemplate.opsForValue().set(key, user, 3600, TimeUnit.SECONDS);
            } else {
                redisTemplate.opsForValue().set(key, "null", 60, TimeUnit.SECONDS);
            }
            
            return user;
        };
        
        return CircuitBreaker.decorateSupplier(circuitBreaker, supplier).get();
    }
    
    private User queryDatabase(Long id) {
        // 数据库查询逻辑
        return new User(id, "User" + id);
    }
}

分布式锁实现

1. 基于Redis的分布式锁实现

@Component
public class RedisDistributedLock {
    
    private static final String LOCK_PREFIX = "lock:";
    private static final long DEFAULT_TIMEOUT = 3000; // 3秒
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 获取分布式锁
     */
    public boolean acquireLock(String lockKey, String lockValue, long timeout) {
        String key = LOCK_PREFIX + lockKey;
        Boolean result = redisTemplate.opsForValue().setIfAbsent(key, lockValue, timeout, TimeUnit.MILLISECONDS);
        return result != null && result;
    }
    
    /**
     * 释放分布式锁
     */
    public void releaseLock(String lockKey, String lockValue) {
        String key = LOCK_PREFIX + lockKey;
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(key), lockValue);
    }
    
    /**
     * 带重试机制的锁获取
     */
    public boolean acquireLockWithRetry(String lockKey, String lockValue, int maxRetries, long retryInterval) {
        for (int i = 0; i < maxRetries; i++) {
            if (acquireLock(lockKey, lockValue, DEFAULT_TIMEOUT)) {
                return true;
            }
            try {
                Thread.sleep(retryInterval);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}

2. 锁的超时机制和自动续期

@Component
public class AutoRenewLockService {
    
    private static final String LOCK_PREFIX = "lock:";
    private static final long DEFAULT_TIMEOUT = 3000;
    private static final long RENEW_INTERVAL = 1000; // 1秒
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private final Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
    
    /**
     * 获取分布式锁并启动自动续期
     */
    public boolean acquireLockWithAutoRenew(String lockKey, String lockValue, long timeout) {
        String key = LOCK_PREFIX + lockKey;
        Boolean result = redisTemplate.opsForValue().setIfAbsent(key, lockValue, timeout, TimeUnit.MILLISECONDS);
        
        if (result != null && result) {
            // 启动自动续期任务
            startAutoRenewTask(lockKey, lockValue, timeout);
            return true;
        }
        
        return false;
    }
    
    /**
     * 启动自动续期任务
     */
    private void startAutoRenewTask(String lockKey, String lockValue, long timeout) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
            try {
                renewLock(lockKey, lockValue, timeout);
            } catch (Exception e) {
                // 记录日志,但不中断续期任务
                log.error("Lock auto-renew failed", e);
            }
        }, RENEW_INTERVAL, RENEW_INTERVAL, TimeUnit.MILLISECONDS);
        
        scheduledTasks.put(lockKey, future);
    }
    
    /**
     * 续期锁
     */
    private void renewLock(String lockKey, String lockValue, long timeout) {
        String key = LOCK_PREFIX + lockKey;
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('pexpire', KEYS[1], ARGV[2]) else return 0 end";
        redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(key), lockValue, String.valueOf(timeout));
    }
    
    /**
     * 释放锁并停止续期任务
     */
    public void releaseLock(String lockKey, String lockValue) {
        String key = LOCK_PREFIX + lockKey;
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(key), lockValue);
        
        // 停止续期任务
        ScheduledFuture<?> future = scheduledTasks.remove(lockKey);
        if (future != null) {
            future.cancel(true);
        }
    }
}

多级缓存架构设计

1. 缓存层次结构设计

@Component
public class MultiLevelCacheManager {
    
    // 本地缓存(JVM级别)
    private final Cache<String, Object> localCache;
    
    // 二级缓存(Redis)
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 缓存配置
    private static final String LOCAL_CACHE_KEY = "local_cache:";
    private static final String REDIS_CACHE_KEY = "redis_cache:";
    
    public MultiLevelCacheManager() {
        this.localCache = Caffeine.newBuilder()
                .maximumSize(1000)
                .expireAfterWrite(Duration.ofMinutes(5))
                .recordStats()
                .build();
    }
    
    /**
     * 获取数据 - 多级缓存查找
     */
    public Object get(String key) {
        // 1. 本地缓存查找
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }
        
        // 2. Redis缓存查找
        String redisKey = REDIS_CACHE_KEY + key;
        value = redisTemplate.opsForValue().get(redisKey);
        if (value != null) {
            // 3. 更新本地缓存
            localCache.put(key, value);
            return value;
        }
        
        return null;
    }
    
    /**
     * 设置数据 - 多级缓存同步
     */
    public void set(String key, Object value, long timeout) {
        // 1. 设置Redis缓存
        String redisKey = REDIS_CACHE_KEY + key;
        redisTemplate.opsForValue().set(redisKey, value, timeout, TimeUnit.SECONDS);
        
        // 2. 设置本地缓存
        localCache.put(key, value);
    }
    
    /**
     * 删除数据 - 多级缓存同步删除
     */
    public void delete(String key) {
        // 1. 删除Redis缓存
        String redisKey = REDIS_CACHE_KEY + key;
        redisTemplate.delete(redisKey);
        
        // 2. 删除本地缓存
        localCache.invalidate(key);
    }
    
    /**
     * 获取缓存统计信息
     */
    public CacheStats getCacheStats() {
        return localCache.stats();
    }
}

2. 缓存预热机制

@Component
public class CacheWarmupService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private UserRepository userRepository;
    
    private static final String USER_CACHE_KEY = "user:";
    
    /**
     * 系统启动时预热缓存
     */
    @EventListener
    public void handleApplicationStarted(ApplicationReadyEvent event) {
        log.info("Starting cache warmup...");
        warmUpCache();
    }
    
    /**
     * 缓存预热
     */
    private void warmUpCache() {
        // 获取热门用户ID列表
        List<Long> hotUserIds = getHotUserIds();
        
        for (Long userId : hotUserIds) {
            try {
                String key = USER_CACHE_KEY + userId;
                
                // 查询数据库
                User user = userRepository.findById(userId);
                if (user != null) {
                    // 缓存到Redis
                    redisTemplate.opsForValue().set(key, user, 3600, TimeUnit.SECONDS);
                }
            } catch (Exception e) {
                log.error("Cache warmup failed for user: {}", userId, e);
            }
        }
        
        log.info("Cache warmup completed");
    }
    
    private List<Long> getHotUserIds() {
        // 实际应用中可以从数据库或配置中心获取热门用户列表
        return Arrays.asList(1L, 2L, 3L, 4L, 5L);
    }
}

3. 缓存监控和告警

@Component
public class CacheMonitor {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String METRICS_KEY = "cache_metrics:";
    
    /**
     * 记录缓存访问统计
     */
    public void recordCacheAccess(String cacheKey, boolean hit) {
        String key = METRICS_KEY + cacheKey;
        Map<String, Object> metrics = new HashMap<>();
        metrics.put("access_time", System.currentTimeMillis());
        metrics.put("hit", hit);
        
        redisTemplate.opsForHash().put(key, "metrics", metrics);
    }
    
    /**
     * 获取缓存命中率
     */
    public double getHitRate(String cacheKey) {
        // 实现缓存命中率计算逻辑
        return 0.95; // 示例值
    }
    
    /**
     * 缓存异常监控
     */
    @EventListener
    public void handleCacheException(CacheExceptionEvent event) {
        log.warn("Cache exception occurred: {}", event.getMessage());
        
        // 发送告警通知
        sendAlert(event);
    }
    
    private void sendAlert(CacheExceptionEvent event) {
        // 实现告警逻辑,如发送邮件、短信等
        log.info("Sending alert for cache exception: {}", event.getMessage());
    }
}

最佳实践和注意事项

1. 缓存策略选择

public class CacheStrategy {
    
    /**
     * LRU缓存策略
     */
    public static final String LRU = "LRU";
    
    /**
     * LFU缓存策略
     */
    public static final String LFU = "LFU";
    
    /**
     * FIFO缓存策略
     */
    public static final String FIFO = "FIFO";
    
    /**
     * 根据业务场景选择合适的缓存策略
     */
    public static void configureCacheStrategy(String strategy) {
        switch (strategy) {
            case LRU:
                // 配置LRU策略
                break;
            case LFU:
                // 配置LFU策略
                break;
            case FIFO:
                // 配置FIFO策略
                break;
            default:
                throw new IllegalArgumentException("Unknown cache strategy: " + strategy);
        }
    }
}

2. 性能优化建议

@Component
public class CachePerformanceOptimizer {
    
    /**
     * 批量操作优化
     */
    public void batchSetCache(List<String> keys, List<Object> values) {
        // 使用Pipeline批量操作
        redisTemplate.executePipelined(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                for (int i = 0; i < keys.size(); i++) {
                    connection.set(keys.get(i).getBytes(), 
                                  SerializationUtils.serialize(values.get(i)));
                }
                return null;
            }
        });
    }
    
    /**
     * 异步缓存更新
     */
    @Async
    public void asyncUpdateCache(String key, Object value) {
        redisTemplate.ops
相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000