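Introduction
In a modern microservice architecture, the API gateway is the single entry point to the system and handles routing, authentication, traffic control, and circuit breaking with fallback. Spring Cloud Gateway, a core component of the Spring Cloud ecosystem, provides the building blocks for such a unified gateway. This article walks through building a full-featured, secure microservice gateway on Spring Cloud Gateway, focusing on rate limiting, circuit breaking, and authentication.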
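Spring Cloud Gateway Overview
What Is Spring Cloud Gateway
Spring Cloud Gateway is the API gateway component of the Spring Cloud ecosystem, built on Spring 5, Project Reactor, and Spring Boot 2. It gives a microservice architecture a simple, effective single entry point and handles routing, filtering, rate limiting, and circuit breaking.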
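Core Features
Spring Cloud Gateway offers the following core features:
- Routing: routes are matched on conditions such as path, HTTP method, and request headers
- Filtering: filters can run custom logic before and after a request is forwarded
- Rate limiting: the built-in RequestRateLimiter filter ships with a Redis-backed token bucket implementation; other algorithms such as leaky bucket require a custom implementation
- Circuit breaking and fallback: integrates with Spring Cloud CircuitBreaker (Resilience4j in this article); the Hystrix integration was removed as of the Spring Cloud 2020.0 release train
- Authentication: works with Spring Security to support JWT, OAuth2, and other authentication schemes
- Load balancing: integrates with Spring Cloud LoadBalancer, which replaced Ribbon, through lb:// URIs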
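The route, predicate, and filter concepts listed above can be pictured with a small framework-free model. This is only a sketch of the idea, not Spring Cloud Gateway's implementation; `MiniRoute` and its members are hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/** Toy model of a gateway route: a predicate selects the route, filters rewrite the path. */
final class MiniRoute {
    final String id;
    final Predicate<String> pathPredicate;
    final List<UnaryOperator<String>> filters;
    final String targetUri;

    MiniRoute(String id, Predicate<String> pathPredicate,
              List<UnaryOperator<String>> filters, String targetUri) {
        this.id = id;
        this.pathPredicate = pathPredicate;
        this.filters = filters;
        this.targetUri = targetUri;
    }

    /** Applies every filter in order and returns the URI the request would be forwarded to. */
    String forward(String path) {
        String current = path;
        for (UnaryOperator<String> filter : filters) {
            current = filter.apply(current);
        }
        return targetUri + current;
    }

    /** Returns the first route whose predicate accepts the path, if any. */
    static Optional<MiniRoute> match(List<MiniRoute> routes, String path) {
        return routes.stream().filter(r -> r.pathPredicate.test(path)).findFirst();
    }
}
```

A route built with the predicate `p -> p.startsWith("/api/users/")` and a filter that removes the `/api/users` prefix would forward `/api/users/42` to its target URI followed by `/42`. As in the real gateway, the first matching route wins, so route order matters when predicates overlap.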
Environment Setup and Dependencies
Project Structure
gateway-service/
├── src/main/java/com/example/gateway/
│   ├── GatewayApplication.java
│   ├── config/
│   │   ├── GatewayConfig.java
│   │   └── SecurityConfig.java
│   ├── filter/
│   │   ├── RateLimitFilter.java
│   │   └── JwtAuthenticationFilter.java
│   └── handler/
│       └── GlobalExceptionHandler.java
└── src/main/resources/
    ├── application.yml
    └── bootstrap.yml
Maven Dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>gateway-service</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>gateway-service</name>
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>2021.0.3</spring-cloud.version>
    </properties>
    <dependencies>
        <!-- Spring Cloud Gateway -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>
        <!-- Spring Cloud LoadBalancer -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
        </dependency>
        <!-- Spring Cloud Circuit Breaker -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
        </dependency>
        <!-- Spring Security -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
        <!-- OAuth2 resource server: needed by the oauth2ResourceServer() and JWT decoder configuration below -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-oauth2-resource-server</artifactId>
        </dependency>
        <!-- Reactive Redis: needed by the Redis-based rate limit and cache filters below -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
        <!-- JWT -->
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt</artifactId>
            <version>0.9.1</version>
        </dependency>
        <!-- Spring Cloud Config Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <!-- Spring Cloud Bus -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
        <!-- Actuator -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
Route Configuration and Service Discovery
Basic Route Configuration
server:
  port: 8080
spring:
  application:
    name: gateway-service
  cloud:
    gateway:
      routes:
        # User service route
        - id: user-service
          uri: lb://user-service
          predicates:
            - Path=/api/users/**
          filters:
            - StripPrefix=2
            - name: Retry
              args:
                retries: 3
                statuses: BAD_GATEWAY
                backoff:
                  firstBackoff: 10ms
                  maxBackoff: 100ms
                  factor: 2
                  basedOnPreviousValue: false
        # Order service route
        - id: order-service
          uri: lb://order-service
          predicates:
            - Path=/api/orders/**
          filters:
            - StripPrefix=2
            - name: Retry
              args:
                retries: 3
                statuses: BAD_GATEWAY
        # Product service route
        - id: product-service
          uri: lb://product-service
          predicates:
            - Path=/api/products/**
          filters:
            - StripPrefix=2
            - name: Retry
              args:
                retries: 2
                statuses: BAD_GATEWAY
                backoff:
                  firstBackoff: 50ms
                  maxBackoff: 500ms
                  factor: 2
                  basedOnPreviousValue: false
    # Service discovery: static instance list for the simple discovery client
    discovery:
      client:
        simple:
          instances:
            user-service:
              - uri: http://localhost:8081
            order-service:
              - uri: http://localhost:8082
            product-service:
              - uri: http://localhost:8083
  config:
    import: "configserver:http://localhost:8888"
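To make the effect of the StripPrefix and Retry settings above concrete, the following framework-free sketch reproduces the arithmetic. It is an illustration, not the gateway's code: `RouteFilterMath` is a hypothetical class, and the backoff formula (first backoff multiplied by the factor on each attempt, capped at the maximum, with the first retry using the first backoff unchanged) is an assumption based on the documented behavior when basedOnPreviousValue is false.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RouteFilterMath {

    /** Drops the first `parts` path segments, e.g. ("/api/users/42", 2) becomes "/42". */
    static String stripPrefix(String path, int parts) {
        String[] segments = Arrays.stream(path.split("/"))
                .filter(s -> !s.isEmpty())
                .toArray(String[]::new);
        StringBuilder out = new StringBuilder();
        for (int i = Math.min(parts, segments.length); i < segments.length; i++) {
            out.append('/').append(segments[i]);
        }
        // An empty remainder is forwarded as the root path
        return out.length() == 0 ? "/" : out.toString();
    }

    /** Delay before each retry: first * factor^n, capped at max (n assumed to start at 0). */
    static List<Duration> backoffDelays(Duration first, Duration max, int factor, int retries) {
        List<Duration> delays = new ArrayList<>();
        long millis = first.toMillis();
        for (int n = 0; n < retries; n++) {
            delays.add(Duration.ofMillis(Math.min(millis, max.toMillis())));
            millis *= factor;
        }
        return delays;
    }
}
```

Under these assumptions, the user-service route (10 ms first backoff, factor 2, 3 retries) waits 10 ms, 20 ms, and 40 ms, and the product-service route waits 50 ms and 100 ms. With StripPrefix=2, a request to `/api/users/42` reaches the user service as `/42`, so the downstream service must not expect the `/api/users` prefix.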
Dynamic Route Configuration
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DynamicRouteConfig {

    // Routes can be declared in code through the RouteLocatorBuilder that Spring injects.
    // This is the Java DSL equivalent of the order-service route in application.yml;
    // declare a route in one place only, otherwise the same path is matched by two routes.
    // Routes that must change at runtime are usually saved through RouteDefinitionWriter
    // and activated by publishing a RefreshRoutesEvent.
    @Bean
    public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        return builder.routes()
                .route("order-service", r -> r
                        .path("/api/orders/**")
                        .filters(f -> f
                                .stripPrefix(2)
                                .retry(3))
                        .uri("lb://order-service"))
                .build();
    }
}
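Rate Limiting
Redis Counter-Based Rate Limiting (Fixed Window)
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Component
public class RateLimitFilter implements GlobalFilter, Ordered {

    // Rate limit rules keyed by Ant-style path pattern
    private static final Map<String, RateLimitConfig> RULES = new LinkedHashMap<>();
    static {
        RULES.put("/api/users/**", new RateLimitConfig(100, 60));    // 100 requests per minute
        RULES.put("/api/orders/**", new RateLimitConfig(50, 60));    // 50 requests per minute
        RULES.put("/api/products/**", new RateLimitConfig(200, 60)); // 200 requests per minute
    }

    private final AntPathMatcher pathMatcher = new AntPathMatcher();
    // The reactive template avoids blocking the gateway's event loop on Redis calls
    private final ReactiveStringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper;

    public RateLimitFilter(ReactiveStringRedisTemplate redisTemplate, ObjectMapper objectMapper) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String path = exchange.getRequest().getPath().value();
        Map.Entry<String, RateLimitConfig> rule = findRule(path);
        if (rule == null) {
            return chain.filter(exchange);
        }
        RateLimitConfig config = rule.getValue();
        // One counter per client and rule; the key expires when the window ends
        String key = "rate_limit:" + getClientId(exchange) + ":" + rule.getKey();
        return redisTemplate.opsForValue().increment(key)
                .flatMap(count -> count == 1
                        // First request of the window starts the expiry timer. INCR and EXPIRE
                        // are separate commands, so this is not atomic; a Lua script would be.
                        ? redisTemplate.expire(key, Duration.ofSeconds(config.getInterval())).thenReturn(count)
                        : Mono.just(count))
                .flatMap(count -> count > config.getLimit() ? reject(exchange) : chain.filter(exchange));
    }

    // Writes a 429 response with a JSON error body
    private Mono<Void> reject(ServerWebExchange exchange) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
        Map<String, Object> error = new HashMap<>();
        error.put("timestamp", System.currentTimeMillis());
        error.put("status", 429);
        error.put("error", "Too Many Requests");
        error.put("message", "Too many requests, please try again later");
        try {
            byte[] body = objectMapper.writeValueAsBytes(error);
            return response.writeWith(Mono.just(response.bufferFactory().wrap(body)));
        } catch (JsonProcessingException e) {
            return Mono.error(e);
        }
    }

    // Reads the client identifier from the X-Client-Id header
    private String getClientId(ServerWebExchange exchange) {
        String clientId = exchange.getRequest().getHeaders().getFirst("X-Client-Id");
        return clientId != null ? clientId : "default_client";
    }

    // Returns the first rule whose pattern matches the request path, or null if none applies
    private Map.Entry<String, RateLimitConfig> findRule(String path) {
        for (Map.Entry<String, RateLimitConfig> rule : RULES.entrySet()) {
            if (pathMatcher.match(rule.getKey(), path)) {
                return rule;
            }
        }
        return null;
    }

    @Override
    public int getOrder() {
        return -100;
    }

    public static class RateLimitConfig {
        private final int limit;    // maximum number of requests per window
        private final int interval; // window length in seconds

        public RateLimitConfig(int limit, int interval) {
            this.limit = limit;
            this.interval = interval;
        }

        public int getLimit() { return limit; }
        public int getInterval() { return interval; }
    }
}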
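The filter above counts requests in a fixed time window, which allows a burst at each window boundary. For comparison, the token bucket algorithm (the approach behind the gateway's built-in Redis rate limiter) refills permits continuously. The sketch below is a minimal single-process version, not the gateway's implementation; `TokenBucket` and its parameters are hypothetical names, and the clock is passed in explicitly so the behavior is easy to test.

```java
/** Minimal in-memory token bucket; time is supplied by the caller in nanoseconds. */
final class TokenBucket {
    private final long capacity;          // maximum burst size
    private final double refillPerSecond; // steady-state rate
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(long capacity, double refillPerSecond, long nowNanos) {
        this.capacity = capacity;
        this.refillPerSecond = refillPerSecond;
        this.tokens = capacity; // start full so an initial burst is allowed
        this.lastRefillNanos = nowNanos;
    }

    /** Takes one token if available; returns false when the caller should be rejected. */
    synchronized boolean tryAcquire(long nowNanos) {
        long elapsedNanos = Math.max(0, nowNanos - lastRefillNanos);
        // Add the tokens earned since the last call, never exceeding the capacity
        tokens = Math.min(capacity, tokens + (elapsedNanos / 1_000_000_000.0) * refillPerSecond);
        lastRefillNanos = nowNanos;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

In production code the clock would typically be `System.nanoTime()`. A bucket with capacity 2 and a refill rate of 1 token per second accepts two immediate requests, rejects the third, and accepts another one 1.5 seconds later. Unlike this sketch, a gateway running on several instances needs the bucket state in a shared store such as Redis.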
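Rate Limiting with Resilience4j
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import java.time.Duration;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;

// Named Resilience4jConfig so that it does not clash with Resilience4j's own CircuitBreakerConfig class
@Configuration
public class Resilience4jConfig {

    @Bean
    public CircuitBreaker circuitBreaker() {
        return CircuitBreaker.ofDefaults("api-circuit-breaker");
    }

    @Bean
    public RateLimiter rateLimiter() {
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitForPeriod(10)                        // 10 permits ...
                .limitRefreshPeriod(Duration.ofSeconds(1)) // ... per second
                .timeoutDuration(Duration.ZERO)            // do not block the event loop waiting for a permit
                .build();
        return RateLimiter.of("api-rate-limiter", config);
    }

    // The limiter is shared by all clients, so this is a global limit for the gateway instance.
    // The bean name differs from the RateLimitFilter component to avoid a bean name conflict.
    @Bean
    public GlobalFilter resilience4jRateLimitFilter(RateLimiter rateLimiter) {
        return (exchange, chain) -> {
            // acquirePermission() returns false when no permit is available; it does not throw
            if (rateLimiter.acquirePermission()) {
                return chain.filter(exchange);
            }
            ServerHttpResponse response = exchange.getResponse();
            response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
            return response.setComplete();
        };
    }
}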
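Circuit Breaking and Fallback
Circuit Breaker Configuration (Resilience4j)
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.timelimiter.TimeLimiterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.circuitbreaker.resilience4j.ReactiveResilience4JCircuitBreakerFactory;
import org.springframework.cloud.circuitbreaker.resilience4j.Resilience4JConfigBuilder;
import org.springframework.cloud.client.circuitbreaker.Customizer;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hystrix is not available with the Spring Cloud version used here, so the circuit breaker
// is provided by Resilience4j. Spring Cloud LoadBalancer already uses round-robin by default,
// so no load balancer bean is declared.
@Configuration
public class GatewayCircuitBreakerConfig {

    private static final Logger log = LoggerFactory.getLogger(GatewayCircuitBreakerConfig.class);

    // Default settings for circuit breakers created by the gateway's CircuitBreaker route filter.
    // The filter itself is attached per route, for example with f.circuitBreaker(...) in the Java DSL.
    @Bean
    public Customizer<ReactiveResilience4JCircuitBreakerFactory> defaultCircuitBreakerCustomizer() {
        return factory -> factory.configureDefault(id -> new Resilience4JConfigBuilder(id)
                .circuitBreakerConfig(CircuitBreakerConfig.ofDefaults())
                .timeLimiterConfig(TimeLimiterConfig.ofDefaults())
                .build());
    }

    // Logs downstream errors for the user service; this filter only observes, it does not open a circuit
    @Bean
    public GlobalFilter circuitBreakerFilter() {
        return (exchange, chain) -> {
            String path = exchange.getRequest().getPath().value();
            if (path.contains("/api/users")) {
                return chain.filter(exchange)
                        .doOnError(throwable -> log.error("User service call failed", throwable));
            }
            return chain.filter(exchange);
        };
    }
}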
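To make the mechanism behind a circuit breaker concrete, here is a minimal framework-free sketch of the CLOSED, OPEN, and HALF_OPEN state machine. It is not how Resilience4j is implemented (Resilience4j evaluates failure rates over a sliding window); this simplified version counts consecutive failures, and `SimpleCircuitBreaker` with its parameters is a hypothetical name introduced for illustration.

```java
/** Simplified circuit breaker that opens after a number of consecutive failures. */
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold; // consecutive failures that open the circuit
    private final long openMillis;      // how long the circuit stays open before a trial
    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long openMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
    }

    /** Returns true if a call may proceed at the given time. */
    synchronized boolean allowRequest(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAt >= openMillis) {
            state = State.HALF_OPEN; // wait time elapsed: allow trial calls
        }
        return state != State.OPEN;
    }

    /** A successful call closes the circuit and resets the failure count. */
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    /** A failed trial call, or too many consecutive failures, opens the circuit. */
    synchronized void recordFailure(long nowMillis) {
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAt = nowMillis;
        }
    }

    synchronized State state() {
        return state;
    }
}
```

A caller checks `allowRequest` before each downstream call, returns the fallback response when it is false, and reports the outcome with `recordSuccess` or `recordFailure`. While the circuit is open, the failing service receives no traffic at all, which gives it time to recover.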
Custom Fallback Handling
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Component
public class CircuitBreakerFallback {

    private static final Logger log = LoggerFactory.getLogger(CircuitBreakerFallback.class);

    // Builds the 503 response returned while a downstream service is unavailable
    public Mono<ServerResponse> fallbackHandler(ServerWebExchange exchange, Throwable throwable) {
        String path = exchange.getRequest().getPath().value();
        log.warn("Falling back for {}", path, throwable);

        Map<String, Object> error = new HashMap<>();
        error.put("timestamp", System.currentTimeMillis());
        error.put("status", 503);
        error.put("error", "Service Unavailable");
        error.put("message", "The service is temporarily unavailable, please try again later");
        error.put("path", path);

        // ServerResponse serializes the map to JSON, so no manual ObjectMapper or buffer handling is needed
        return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE)
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(error);
    }
}
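Service Fallback Strategy
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.client.circuitbreaker.ReactiveCircuitBreakerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

// Uses the Spring Cloud CircuitBreaker API (backed by Resilience4j) instead of @HystrixCommand,
// because Hystrix is not part of the dependencies declared for this project.
@Service
public class UserServiceFallback {

    private static final Logger log = LoggerFactory.getLogger(UserServiceFallback.class);

    private final ReactiveCircuitBreakerFactory<?, ?> circuitBreakerFactory;

    public UserServiceFallback(ReactiveCircuitBreakerFactory<?, ?> circuitBreakerFactory) {
        this.circuitBreakerFactory = circuitBreakerFactory;
    }

    public Mono<User> getUserById(Long id) {
        // Simulated remote call that always fails
        Mono<User> call = Mono.error(new RuntimeException("User service is unavailable"));
        return circuitBreakerFactory.create("getUserById")
                .run(call, throwable -> getUserByIdFallback(id));
    }

    private Mono<User> getUserByIdFallback(Long id) {
        log.warn("Using fallback for getUserById, id: {}", id);
        User fallbackUser = new User();
        fallbackUser.setId(id);
        fallbackUser.setName("Fallback User");
        fallbackUser.setEmail("fallback@example.com");
        return Mono.just(fallbackUser);
    }

    public Mono<List<User>> getUsers() {
        // Simulated remote call that always fails. Timeouts (for example 3 seconds for this call)
        // are configured through the TimeLimiterConfig of the circuit breaker factory.
        Mono<List<User>> call = Mono.error(new RuntimeException("Users service is unavailable"));
        return circuitBreakerFactory.create("getUsers")
                .run(call, throwable -> getUsersFallback());
    }

    private Mono<List<User>> getUsersFallback() {
        log.warn("Using fallback for getUsers");
        return Mono.just(Collections.emptyList());
    }
}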
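Authentication
JWT Authentication Configuration
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.convert.converter.Converter;
import org.springframework.security.authentication.AbstractAuthenticationToken;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.security.oauth2.jwt.NimbusReactiveJwtDecoder;
import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter;
import org.springframework.security.oauth2.server.resource.authentication.JwtGrantedAuthoritiesConverter;
import org.springframework.security.oauth2.server.resource.authentication.ReactiveJwtAuthenticationConverterAdapter;
import org.springframework.security.web.server.SecurityWebFilterChain;
import reactor.core.publisher.Mono;

@Configuration
@EnableWebFluxSecurity
public class SecurityConfig {

    @Bean
    public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
        http
                .authorizeExchange(exchanges -> exchanges
                        .pathMatchers("/api/auth/**").permitAll()
                        .pathMatchers("/api/public/**").permitAll()
                        .anyExchange().authenticated())
                .oauth2ResourceServer(oauth2 -> oauth2
                        .jwt(jwt -> jwt.jwtAuthenticationConverter(jwtAuthenticationConverter())))
                .csrf(ServerHttpSecurity.CsrfSpec::disable)
                .cors(ServerHttpSecurity.CorsSpec::disable);
        return http.build();
    }

    // Maps the claims of a validated JWT to granted authorities
    private Converter<Jwt, Mono<AbstractAuthenticationToken>> jwtAuthenticationConverter() {
        JwtAuthenticationConverter jwtAuthenticationConverter = new JwtAuthenticationConverter();
        jwtAuthenticationConverter.setJwtGrantedAuthoritiesConverter(new JwtGrantedAuthoritiesConverter());
        return new ReactiveJwtAuthenticationConverterAdapter(jwtAuthenticationConverter);
    }

    // The gateway runs on WebFlux, so the reactive decoder is required rather than JwtDecoder
    @Bean
    public ReactiveJwtDecoder jwtDecoder() {
        return NimbusReactiveJwtDecoder.withJwkSetUri(jwkSetUri()).build();
    }

    // Example JWK Set endpoint; it must point to the identity provider, not to the gateway itself
    private String jwkSetUri() {
        return "http://localhost:8080/auth/realms/test/protocol/openid-connect/certs";
    }
}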
JWT Authentication Filter
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.security.oauth2.jwt.JwtException;
import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

// When oauth2ResourceServer().jwt() is configured as in SecurityConfig, Spring Security already
// performs these steps; a hand-written filter like this one is only needed without that setup.
@Component
public class JwtAuthenticationFilter implements WebFilter {

    private static final Logger log = LoggerFactory.getLogger(JwtAuthenticationFilter.class);
    private static final String BEARER_PREFIX = "Bearer ";

    private final ReactiveJwtDecoder jwtDecoder;
    private final JwtAuthenticationConverter authenticationConverter = new JwtAuthenticationConverter();

    public JwtAuthenticationFilter(ReactiveJwtDecoder jwtDecoder) {
        this.jwtDecoder = jwtDecoder;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String header = exchange.getRequest().getHeaders().getFirst(HttpHeaders.AUTHORIZATION);
        if (header == null || !header.startsWith(BEARER_PREFIX)) {
            // No bearer token: let the security filter chain decide whether the request is allowed
            return chain.filter(exchange);
        }
        String token = header.substring(BEARER_PREFIX.length());
        // decode() verifies the signature and expiry; the converter then builds an authenticated token
        return jwtDecoder.decode(token)
                .map(authenticationConverter::convert)
                .flatMap(authentication -> chain.filter(exchange)
                        .contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication)))
                .onErrorResume(JwtException.class, e -> {
                    log.error("JWT authentication failed", e);
                    exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
                    return exchange.getResponse().setComplete();
                });
    }
}
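The first two steps the filter relies on, pulling the token out of the Authorization header and splitting a JWT into its three dot-separated parts, can be shown without any framework. The sketch below is for inspection and debugging only: it decodes the payload without verifying the signature, so its output must never be trusted for authorization. `BearerTokens` is a hypothetical helper, and unlike the filter it matches the `Bearer` scheme case-insensitively.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

final class BearerTokens {
    private static final String PREFIX = "Bearer ";

    /** Extracts the token from a header value such as "Bearer abc.def.ghi". */
    static Optional<String> extract(String authorizationHeader) {
        if (authorizationHeader == null
                || !authorizationHeader.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
            return Optional.empty();
        }
        String token = authorizationHeader.substring(PREFIX.length()).trim();
        return token.isEmpty() ? Optional.empty() : Optional.of(token);
    }

    /** Decodes the payload (second segment) of a JWT WITHOUT verifying its signature. */
    static Optional<String> unverifiedPayload(String jwt) {
        String[] parts = jwt.split("\\.");
        if (parts.length != 3) {
            return Optional.empty(); // a signed JWT has header, payload, and signature
        }
        try {
            byte[] json = Base64.getUrlDecoder().decode(parts[1]);
            return Optional.of(new String(json, StandardCharsets.UTF_8));
        } catch (IllegalArgumentException e) {
            return Optional.empty(); // payload is not valid Base64URL
        }
    }
}
```

Signature and expiry checks are exactly what the JWT decoder in the gateway adds on top of this parsing, which is why the filter only trusts claims after decoding succeeds.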
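Custom Authentication Manager
import org.springframework.security.authentication.BadCredentialsException;
import org.springframework.security.authentication.ReactiveAuthenticationManager;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.userdetails.ReactiveUserDetailsService;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

// Username and password authentication, for example for a login endpoint that issues tokens
@Component
public class CustomReactiveAuthenticationManager implements ReactiveAuthenticationManager {

    // The reactive variant is required: findByUsername returns a Mono<UserDetails>
    private final ReactiveUserDetailsService userDetailsService;
    private final PasswordEncoder passwordEncoder;

    public CustomReactiveAuthenticationManager(ReactiveUserDetailsService userDetailsService,
                                               PasswordEncoder passwordEncoder) {
        this.userDetailsService = userDetailsService;
        this.passwordEncoder = passwordEncoder;
    }

    @Override
    public Mono<Authentication> authenticate(Authentication authentication) {
        String username = authentication.getName();
        String password = (String) authentication.getCredentials();
        return userDetailsService.findByUsername(username)
                .switchIfEmpty(Mono.error(new BadCredentialsException("User not found")))
                .flatMap(userDetails -> {
                    if (!passwordEncoder.matches(password, userDetails.getPassword())) {
                        return Mono.error(new BadCredentialsException("Invalid password"));
                    }
                    Authentication result = new UsernamePasswordAuthenticationToken(
                            userDetails.getUsername(),
                            userDetails.getPassword(),
                            userDetails.getAuthorities());
                    return Mono.just(result);
                });
    }
}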
Advanced Filters
Request and Response Logging Filter
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Component
public class RequestResponseLoggingFilter implements GlobalFilter, Ordered {

    private static final Logger log = LoggerFactory.getLogger(RequestResponseLoggingFilter.class);

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();
        // Record the start time of the request
        long startTime = System.currentTimeMillis();
        // Log the request line and the caller's address
        log.info("Request: {} {} from {}",
                request.getMethod(),
                request.getURI(),
                request.getRemoteAddress());
        // Log request headers at debug level (these can include credentials such as Authorization)
        request.getHeaders().forEach((name, values) ->
                log.debug("Request Header: {} = {}", name, values));
        // The response status and headers are available once the rest of the chain has completed
        return chain.filter(exchange)
                .doOnSuccess(aVoid -> {
                    long duration = System.currentTimeMillis() - startTime;
                    log.info("Response: {} {} - Duration: {}ms",
                            response.getStatusCode(),
                            response.getHeaders().getFirst("Content-Type"),
                            duration);
                })
                .doOnError(throwable -> {
                    long duration = System.currentTimeMillis() - startTime;
                    log.error("Error processing request: {} - Duration: {}ms",
                            throwable.getMessage(),
                            duration);
                });
    }

    @Override
    public int getOrder() {
        // Run first so that the measured duration covers the whole filter chain
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
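Request Body Filter
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class RequestBodyFilter implements GlobalFilter, Ordered {

    private static final Logger log = LoggerFactory.getLogger(RequestBodyFilter.class);

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        HttpMethod method = request.getMethod();
        // Only POST and PUT requests carry a body that needs processing
        if (method != HttpMethod.POST && method != HttpMethod.PUT) {
            return chain.filter(exchange);
        }
        // The body can be read only once, so it is aggregated here without blocking
        // and a replayable copy is handed to the rest of the chain.
        return DataBufferUtils.join(request.getBody())
                .map(RequestBodyFilter::toBytes)
                .defaultIfEmpty(new byte[0])
                .flatMap(bytes -> {
                    log.debug("Request body: {}", new String(bytes, StandardCharsets.UTF_8));
                    // Validation, auditing, and similar logic can be added here
                    ServerHttpRequest replayable = new ServerHttpRequestDecorator(request) {
                        @Override
                        public Flux<DataBuffer> getBody() {
                            return bytes.length == 0
                                    ? Flux.empty()
                                    : Flux.defer(() -> Flux.just(
                                            exchange.getResponse().bufferFactory().wrap(bytes)));
                        }
                    };
                    return chain.filter(exchange.mutate().request(replayable).build());
                });
    }

    // Copies the buffer content and releases the buffer to avoid a memory leak
    private static byte[] toBytes(DataBuffer buffer) {
        try {
            byte[] bytes = new byte[buffer.readableByteCount()];
            buffer.read(bytes);
            return bytes;
        } finally {
            DataBufferUtils.release(buffer);
        }
    }

    @Override
    public int getOrder() {
        return -50;
    }
}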
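Performance Optimization and Monitoring
Caching Strategy
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Optional;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Component
public class CacheFilter implements GlobalFilter, Ordered {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ObjectMapper objectMapper;

    public CacheFilter(RedisTemplate<String, Object> redisTemplate, ObjectMapper objectMapper) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String path = request.getPath().value();
        // Only GET requests on cacheable paths are served from the cache
        if (request.getMethod() != HttpMethod.GET || !isCacheablePath(path)) {
            return chain.filter(exchange);
        }
        String cacheKey = "cache:" + path + ":" + generateCacheKey(request);
        // RedisTemplate is blocking, so the lookup runs on a worker thread, not on the event loop.
        // Wrapping the result in Optional gives exactly one branch per request: hit or miss.
        return Mono.fromCallable(() -> Optional.ofNullable(redisTemplate.opsForValue().get(cacheKey)))
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(cached -> cached.isPresent()
                        ? writeCachedResponse(exchange, cached.get())
                        : forwardAndCache(exchange, chain));
    }

    // Cache hit: write the cached value as the response
    private Mono<Void> writeCachedResponse(ServerWebExchange exchange, Object cachedResponse) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.OK);
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
        response.getHeaders().add("X-Cache", "HIT");
        try {
            byte[] body = objectMapper.writeValueAsBytes(cachedResponse);
            return response.writeWith(Mono.just(response.bufferFactory().wrap(body)));
        } catch (JsonProcessingException e) {
            return Mono.error(e);
        }
    }

    // Cache miss: forward the request downstream
    private Mono<Void> forwardAndCache(ServerWebExchange exchange, GatewayFilterChain chain) {
        return chain.filter(exchange).doOnSuccess(aVoid -> {
            if (exchange.getResponse().getStatusCode() == HttpStatus.OK) {
                // Storing the response body is not implemented here; a real implementation
                // has to capture the body and needs a more elaborate caching strategy.
            }
        });
    }

    private boolean isCacheablePath(String path) {
        return path.startsWith("/api/products/") ||
                path.startsWith("/api/users/");
    }

    // Builds the query-dependent part of the cache key
    private String generateCacheKey(ServerHttpRequest request) {
        StringBuilder keyBuilder = new StringBuilder();
        request.getQueryParams().forEach((key, values) ->
                keyBuilder.append(key).append("=").append(String.join(",", values)).append("&"));
        return keyBuilder.toString();
    }

    @Override
    public int getOrder() {
        return -10;
    }
}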
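One detail of the cache key deserves attention: the key above depends on the order in which query parameters arrive, so `?a=1&b=2` and `?b=2&a=1` are cached separately even though they usually mean the same request. A possible refinement, sketched here as a framework-free helper with the hypothetical name `CacheKeys`, is to sort the parameter names before building the key. The order of repeated values for one parameter is kept, on the assumption that it may be significant.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class CacheKeys {

    /** Builds a cache key that does not depend on the order of the query parameters. */
    static String cacheKey(String path, Map<String, List<String>> queryParams) {
        // TreeMap iterates in sorted key order, which makes the key canonical
        String query = new TreeMap<>(queryParams).entrySet().stream()
                .map(e -> e.getKey() + "=" + String.join(",", e.getValue()))
                .collect(Collectors.joining("&"));
        return "cache:" + path + (query.isEmpty() ? "" : "?" + query);
    }
}
```

With this helper, both parameter orders of the example produce the key `cache:/api/products/1?a=1&b=2` for the path `/api/products/1`, which raises the hit rate without changing what is cached.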
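Metrics Configuration
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.boot.actuate.autoconfigure.metrics.MeterRegistryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MonitoringConfig {

    // Tags every metric with the application name, then pre-registers the custom meters.
    // Doing both in one customizer guarantees that the common tag is set before the meters exist.
    @Bean
    public MeterRegistryCustomizer<MeterRegistry> gatewayMetrics() {
        return registry -> {
            registry.config().commonTags("application", "gateway-service");
            registry.counter("gateway.requests.total");
            registry.timer("gateway.requests.duration");
        };
    }

    // No GatewayMetricsFilter bean is declared: Spring Cloud Gateway auto-configures it
    // when the actuator starter is on the classpath.
}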
Configuration Management and Deployment
Config Server Integration
spring:
  cloud:
    config:
      uri: http://localhost:8888
      name: gateway-service
      profile: dev
      label: main
    bus:
      enabled: true
      trace:
        enabled: true
    stream:
      bindings:
        input:
          destination: gateway-events
          content-type: application/json
        output:
          destination: gateway-events
          content-type: application/json
Docker Deployment
FROM openjdk:11-jre-slim
WORKDIR /app
COPY target/gateway-service-0.0.1-SNAPSHOT.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]
version: '3.8'
services:
  gateway:
    build: .
    ports:
      - "8080:8080"