Redis缓存穿透、击穿、雪崩解决方案:高并发场景下的缓存优化策略

Nora253
Nora253 2026-01-26T08:08:20+08:00
0 0 1

引言

在现代分布式系统中,Redis作为高性能的内存数据库,广泛应用于缓存层以提升系统性能和响应速度。然而,在高并发场景下,缓存的使用也带来了诸多挑战,其中最典型的三大问题就是缓存穿透、缓存击穿和缓存雪崩。这些问题如果处理不当,可能导致系统性能急剧下降,甚至服务不可用。

本文将深入分析这三种常见缓存问题的本质原因,并提供切实可行的解决方案,帮助开发者构建更加稳定可靠的高并发系统。

缓存穿透:空值缓存的防御机制

什么是缓存穿透

缓存穿透是指查询一个根本不存在的数据,由于缓存中没有该数据,需要去数据库查询。如果数据库中也没有这个数据,就直接返回空结果。在高并发场景下,大量的请求会直接打到数据库,造成数据库压力过大,严重时可能导致数据库宕机。

缓存穿透的危害

// 模拟缓存穿透的典型场景
@Service
public class UserService {
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private UserMapper userMapper;
    
    public User getUserById(Long id) {
        // 1. 先从Redis中获取用户信息
        String key = "user:" + id;
        User user = (User) redisTemplate.opsForValue().get(key);
        
        // 2. 如果Redis中没有,直接查询数据库
        if (user == null) {
            user = userMapper.selectById(id); // 这里会直接访问数据库
            
            // 3. 将结果写入Redis(这里可能存在问题)
            if (user != null) {
                redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
            }
        }
        
        return user;
    }
}

如上代码所示,当查询一个不存在的用户ID时,会直接穿透到数据库,造成不必要的压力。

缓存穿透解决方案

方案一:缓存空值

@Service
public class UserService {
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private UserMapper userMapper;
    
    private static final Long NULL_USER_TTL = 300L; // 空值缓存过期时间
    
    public User getUserById(Long id) {
        String key = "user:" + id;
        Object cacheValue = redisTemplate.opsForValue().get(key);
        
        // 如果缓存中存在,直接返回
        if (cacheValue != null) {
            if (cacheValue instanceof String && "NULL".equals(cacheValue)) {
                return null; // 返回空值
            }
            return (User) cacheValue;
        }
        
        // 缓存未命中,查询数据库
        User user = userMapper.selectById(id);
        
        // 将查询结果写入缓存
        if (user != null) {
            redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
        } else {
            // 关键:将空值也缓存起来,避免重复查询数据库
            redisTemplate.opsForValue().set(key, "NULL", NULL_USER_TTL, TimeUnit.SECONDS);
        }
        
        return user;
    }
}

方案二:布隆过滤器

@Component
public class BloomFilterCache {
    private static final int CAPACITY = 1000000; // 布隆过滤器容量
    private static final double ERROR_RATE = 0.01; // 误判率
    
    private final BloomFilter<String> bloomFilter;
    
    public BloomFilterCache() {
        this.bloomFilter = BloomFilter.create(
            Funnels.stringFunnel(Charset.defaultCharset()),
            CAPACITY,
            ERROR_RATE
        );
    }
    
    /**
     * 将已存在的key加入布隆过滤器
     */
    public void addKey(String key) {
        bloomFilter.put(key);
    }
    
    /**
     * 检查key是否存在
     */
    public boolean containsKey(String key) {
        return bloomFilter.mightContain(key);
    }
}

@Service
public class UserService {
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private BloomFilterCache bloomFilterCache;
    
    @Autowired
    private UserMapper userMapper;
    
    public User getUserById(Long id) {
        String key = "user:" + id;
        
        // 1. 先通过布隆过滤器检查key是否存在
        if (!bloomFilterCache.containsKey(key)) {
            return null; // 布隆过滤器判断不存在,直接返回
        }
        
        // 2. 如果可能存在,再查询缓存
        Object cacheValue = redisTemplate.opsForValue().get(key);
        if (cacheValue != null) {
            if (cacheValue instanceof String && "NULL".equals(cacheValue)) {
                return null;
            }
            return (User) cacheValue;
        }
        
        // 3. 缓存未命中,查询数据库
        User user = userMapper.selectById(id);
        
        if (user != null) {
            redisTemplate.opsForValue().set(key, user, 300, TimeUnit.SECONDS);
            bloomFilterCache.addKey(key); // 将存在的key加入布隆过滤器
        } else {
            redisTemplate.opsForValue().set(key, "NULL", 300, TimeUnit.SECONDS);
        }
        
        return user;
    }
}

缓存击穿:热点数据的保护策略

什么是缓存击穿

缓存击穿是指某个热点数据在缓存中过期失效,此时大量并发请求同时访问该数据,导致所有请求都直接打到数据库上,形成缓存雪崩的前兆。

缓存击穿的危害

// 缓存击穿示例
@Service
public class ProductService {
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private ProductMapper productMapper;
    
    public Product getProductById(Long id) {
        String key = "product:" + id;
        
        // 1. 先从缓存中获取
        Product product = (Product) redisTemplate.opsForValue().get(key);
        
        if (product == null) {
            // 2. 缓存未命中,查询数据库
            product = productMapper.selectById(id);
            
            // 3. 写入缓存(如果存在)
            if (product != null) {
                redisTemplate.opsForValue().set(key, product, 300, TimeUnit.SECONDS);
            }
        }
        
        return product;
    }
}

当一个热点商品的缓存过期后,大量用户同时访问会导致数据库瞬间压力剧增。

缓存击穿解决方案

方案一:互斥锁机制

@Service
public class ProductService {
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private ProductMapper productMapper;
    
    // 缓存过期时间
    private static final long CACHE_EXPIRE_TIME = 300L;
    
    public Product getProductById(Long id) {
        String key = "product:" + id;
        String lockKey = "lock:" + id;
        
        // 1. 先从缓存中获取
        Product product = (Product) redisTemplate.opsForValue().get(key);
        
        if (product == null) {
            // 2. 缓存未命中,尝试获取分布式锁
            String lockValue = UUID.randomUUID().toString();
            
            try {
                // 使用SET命令的NX和EX参数实现分布式锁
                Boolean lockResult = redisTemplate.opsForValue()
                    .setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
                
                if (lockResult) {
                    // 3. 获取到锁,查询数据库
                    product = productMapper.selectById(id);
                    
                    if (product != null) {
                        // 4. 将数据写入缓存
                        redisTemplate.opsForValue().set(key, product, CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
                    } else {
                        // 5. 数据库中也没有,缓存空值
                        redisTemplate.opsForValue().set(key, "NULL", 300, TimeUnit.SECONDS);
                    }
                } else {
                    // 6. 获取锁失败,等待一段时间后重试
                    Thread.sleep(100);
                    return getProductById(id); // 递归调用
                }
            } finally {
                // 7. 释放锁
                releaseLock(lockKey, lockValue);
            }
        } else if ("NULL".equals(product)) {
            return null; // 返回空值
        }
        
        return product;
    }
    
    private void releaseLock(String lockKey, String lockValue) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), 
                             Collections.singletonList(lockKey), lockValue);
    }
}

方案二:永不过期 + 异步更新

@Service
public class ProductService {
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private ProductMapper productMapper;
    
    @Autowired
    private ExecutorService executorService;
    
    private static final long CACHE_EXPIRE_TIME = 300L;
    private static final long REFRESH_THRESHOLD = 60L; // 提前刷新阈值
    
    public Product getProductById(Long id) {
        String key = "product:" + id;
        
        // 1. 先从缓存中获取
        Product product = (Product) redisTemplate.opsForValue().get(key);
        
        if (product == null) {
            // 2. 缓存未命中,查询数据库
            product = productMapper.selectById(id);
            
            if (product != null) {
                // 3. 写入缓存(永不过期)
                redisTemplate.opsForValue().set(key, product);
                
                // 4. 异步刷新缓存(在后台线程中执行)
                refreshCacheAsync(id, product);
            } else {
                // 5. 数据库中也没有,缓存空值
                redisTemplate.opsForValue().set(key, "NULL", 300, TimeUnit.SECONDS);
            }
        } else {
            // 6. 检查是否需要刷新缓存
            checkAndRefreshCache(key, product, id);
        }
        
        return product;
    }
    
    private void refreshCacheAsync(Long id, Product product) {
        executorService.submit(() -> {
            try {
                Thread.sleep(REFRESH_THRESHOLD * 1000); // 等待刷新阈值
                
                // 重新查询数据库
                Product freshProduct = productMapper.selectById(id);
                if (freshProduct != null) {
                    String key = "product:" + id;
                    redisTemplate.opsForValue().set(key, freshProduct);
                }
            } catch (Exception e) {
                log.error("缓存刷新失败", e);
            }
        });
    }
    
    private void checkAndRefreshCache(String key, Product product, Long id) {
        // 可以通过设置一个标志位来标记是否需要刷新
        String refreshKey = "refresh:" + id;
        Boolean needRefresh = (Boolean) redisTemplate.opsForValue().get(refreshKey);
        
        if (needRefresh == null || needRefresh) {
            // 异步刷新缓存
            refreshCacheAsync(id, product);
            
            // 设置刷新标志位
            redisTemplate.opsForValue().set(refreshKey, false, CACHE_EXPIRE_TIME, TimeUnit.SECONDS);
        }
    }
}

缓存雪崩:系统整体的防护策略

什么是缓存雪崩

缓存雪崩是指在某一时刻,大量的缓存同时过期失效,导致所有请求都直接打到数据库上,造成数据库压力过大,甚至服务宕机。这通常发生在缓存集中失效的场景下。

缓存雪崩的危害

// 模拟缓存雪崩场景
@Service
public class NewsService {
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private NewsMapper newsMapper;
    
    public List<News> getNewsList() {
        String key = "news:list";
        
        // 1. 从缓存中获取新闻列表
        List<News> newsList = (List<News>) redisTemplate.opsForValue().get(key);
        
        if (newsList == null) {
            // 2. 缓存未命中,查询数据库
            newsList = newsMapper.selectAll();
            
            // 3. 写入缓存(大量数据同时过期)
            redisTemplate.opsForValue().set(key, newsList, 600, TimeUnit.SECONDS);
        }
        
        return newsList;
    }
}

如果所有缓存都在同一时间失效,就会导致雪崩效应。

缓存雪崩解决方案

方案一:设置随机过期时间

@Service
public class NewsService {
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private NewsMapper newsMapper;
    
    private static final long BASE_EXPIRE_TIME = 600L; // 基础过期时间
    private static final long RANDOM_RANGE = 300L; // 随机范围
    
    public List<News> getNewsList() {
        String key = "news:list";
        
        // 1. 从缓存中获取新闻列表
        List<News> newsList = (List<News>) redisTemplate.opsForValue().get(key);
        
        if (newsList == null) {
            // 2. 缓存未命中,查询数据库
            newsList = newsMapper.selectAll();
            
            // 3. 写入缓存(设置随机过期时间)
            long randomExpireTime = BASE_EXPIRE_TIME + 
                                  new Random().nextInt((int) RANDOM_RANGE);
            redisTemplate.opsForValue().set(key, newsList, randomExpireTime, TimeUnit.SECONDS);
        }
        
        return newsList;
    }
}

方案二:多级缓存架构

@Component
public class MultiLevelCache {
    @Autowired
    private RedisTemplate redisTemplate;
    
    // 本地缓存(如Caffeine)
    private final Cache<String, Object> localCache = Caffeine.newBuilder()
        .maximumSize(1000)
        .expireAfterWrite(300, TimeUnit.SECONDS)
        .build();
    
    public Object get(String key) {
        // 1. 先查本地缓存
        Object value = localCache.getIfPresent(key);
        if (value != null) {
            return value;
        }
        
        // 2. 再查Redis缓存
        value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            // 3. 将数据同步到本地缓存
            localCache.put(key, value);
            return value;
        }
        
        return null;
    }
    
    public void set(String key, Object value, long timeout, TimeUnit unit) {
        // 1. 写入Redis
        redisTemplate.opsForValue().set(key, value, timeout, unit);
        
        // 2. 同步写入本地缓存
        localCache.put(key, value);
    }
}

@Service
public class NewsService {
    @Autowired
    private MultiLevelCache multiLevelCache;
    
    @Autowired
    private NewsMapper newsMapper;
    
    public List<News> getNewsList() {
        String key = "news:list";
        
        // 1. 多级缓存获取数据
        List<News> newsList = (List<News>) multiLevelCache.get(key);
        
        if (newsList == null) {
            // 2. 缓存未命中,查询数据库
            newsList = newsMapper.selectAll();
            
            // 3. 写入多级缓存
            multiLevelCache.set(key, newsList, 600, TimeUnit.SECONDS);
        }
        
        return newsList;
    }
}

方案三:限流和降级机制

@Component
public class RateLimiter {
    private final RedisTemplate redisTemplate;
    
    public boolean tryAcquire(String key, int permits, long timeout) {
        String script = "local key = KEYS[1] " +
                      "local permits = tonumber(ARGV[1]) " +
                      "local timeout = tonumber(ARGV[2]) " +
                      "local current = redis.call('GET', key) " +
                      "if current == false then " +
                      "  redis.call('SET', key, permits, 'EX', timeout) " +
                      "  return 1 " +
                      "else " +
                      "  local count = tonumber(current) - 1 " +
                      "  if count >= 0 then " +
                      "    redis.call('SET', key, count, 'EX', timeout) " +
                      "    return 1 " +
                      "  else " +
                      "    return 0 " +
                      "  end " +
                      "end";
        
        try {
            Long result = (Long) redisTemplate.execute(
                new DefaultRedisScript<>(script, Long.class),
                Collections.singletonList(key),
                String.valueOf(permits),
                String.valueOf(timeout)
            );
            
            return result != null && result == 1;
        } catch (Exception e) {
            log.error("限流失败", e);
            return true; // 发生异常时允许通过
        }
    }
}

@Service
public class NewsService {
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private NewsMapper newsMapper;
    
    @Autowired
    private RateLimiter rateLimiter;
    
    private static final String RATE_LIMIT_KEY = "news_rate_limit";
    private static final int MAX_REQUESTS = 100; // 最大请求数
    private static final long TIME_WINDOW = 60; // 时间窗口(秒)
    
    public List<News> getNewsList() {
        // 1. 限流检查
        if (!rateLimiter.tryAcquire(RATE_LIMIT_KEY, MAX_REQUESTS, TIME_WINDOW)) {
            // 2. 超过限流,降级处理
            return getFallbackNewsList();
        }
        
        String key = "news:list";
        
        // 3. 从缓存中获取新闻列表
        List<News> newsList = (List<News>) redisTemplate.opsForValue().get(key);
        
        if (newsList == null) {
            // 4. 缓存未命中,查询数据库
            newsList = newsMapper.selectAll();
            
            // 5. 写入缓存
            redisTemplate.opsForValue().set(key, newsList, 600, TimeUnit.SECONDS);
        }
        
        return newsList;
    }
    
    private List<News> getFallbackNewsList() {
        // 降级策略:返回默认数据或最近的数据
        log.warn("触发限流降级,返回缓存数据");
        String key = "news:list";
        return (List<News>) redisTemplate.opsForValue().get(key);
    }
}

性能优化最佳实践

缓存预热策略

@Component
public class CacheWarmupService {
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private ProductMapper productMapper;
    
    @PostConstruct
    public void warmUpCache() {
        // 系统启动时预热热点数据
        List<Product> hotProducts = productMapper.selectHotProducts();
        
        for (Product product : hotProducts) {
            String key = "product:" + product.getId();
            redisTemplate.opsForValue().set(key, product, 3600, TimeUnit.SECONDS);
        }
        
        log.info("缓存预热完成,预热了{}个热点商品", hotProducts.size());
    }
}

缓存监控和告警

@Component
public class CacheMonitor {
    @Autowired
    private RedisTemplate redisTemplate;
    
    private static final Logger logger = LoggerFactory.getLogger(CacheMonitor.class);
    
    public void monitorCachePerformance() {
        // 监控缓存命中率
        String info = (String) redisTemplate.execute(
            new DefaultRedisScript<>("return redis.call('INFO')", String.class)
        );
        
        // 解析INFO信息中的缓存命中率
        double hitRate = calculateHitRate(info);
        
        if (hitRate < 0.8) {
            logger.warn("缓存命中率过低: {}%", hitRate * 100);
            // 发送告警通知
            sendAlert("缓存命中率异常", "当前命中率为" + hitRate);
        }
    }
    
    private double calculateHitRate(String info) {
        // 解析Redis INFO输出,提取命中率数据
        // 这里简化处理,实际应该解析完整的INFO输出
        return 0.95; // 示例值
    }
    
    private void sendAlert(String title, String message) {
        // 实现告警通知逻辑
        log.info("发送告警: {} - {}", title, message);
    }
}

总结

Redis缓存穿透、击穿、雪崩是高并发场景下常见的性能问题,需要我们从多个维度进行防护:

  1. 缓存穿透:通过缓存空值和布隆过滤器来防止恶意查询
  2. 缓存击穿:使用互斥锁或异步更新机制保护热点数据
  3. 缓存雪崩:设置随机过期时间、多级缓存架构和限流降级策略

在实际应用中,建议综合运用多种方案,并根据业务特点进行调整优化。同时,建立完善的监控体系,及时发现和处理潜在问题,确保系统在高并发场景下的稳定运行。

通过本文介绍的解决方案和最佳实践,开发者可以有效应对Redis缓存相关的性能挑战,构建更加健壮的分布式系统架构。

相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000