引言
在现代云原生应用开发中,微服务架构已成为构建可扩展、高可用系统的主流模式。Node.js凭借其非阻塞I/O特性和丰富的生态系统,在微服务领域展现出强大的竞争力。本文将深入探讨如何使用Express框架构建微服务,结合Docker容器化技术实现服务部署,并集成Redis缓存提升系统性能,最终构建一个完整的高可用服务网格架构。
微服务架构概述
什么是微服务架构
微服务架构是一种将单一应用程序拆分为多个小型、独立服务的软件设计方法。每个服务都围绕特定的业务功能构建,通过轻量级通信机制(通常是HTTP API)进行交互。这种架构模式具有以下优势:
- 可扩展性:可以独立扩展单个服务
- 技术多样性:不同服务可以使用不同的技术栈
- 容错性:单个服务故障不会影响整个系统
- 开发效率:团队可以并行开发不同服务
微服务核心组件
在微服务架构中,关键组件包括:
- 服务注册与发现
- 负载均衡
- API网关
- 配置管理
- 监控与日志
- 缓存系统
Express框架基础构建
项目初始化
首先,我们创建一个基础的Express应用来作为微服务的起点:
mkdir microservice-demo
cd microservice-demo
npm init -y
npm install express cors helmet morgan dotenv
npm install --save-dev nodemon
基础服务器结构
// app.js
const express = require('express');
const cors = require('cors');
const helmet = require('helmet');
const morgan = require('morgan');
require('dotenv').config();
const app = express();
const PORT = process.env.PORT || 3000;
// 中间件配置
app.use(helmet());
app.use(cors());
app.use(morgan('combined'));
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
// 健康检查端点
app.get('/health', (req, res) => {
res.status(200).json({
status: 'OK',
timestamp: new Date().toISOString(),
service: 'user-service'
});
});
// 根路由
app.get('/', (req, res) => {
res.json({ message: 'Welcome to User Service API' });
});
// 错误处理中间件
app.use((err, req, res, next) => {
console.error(err.stack);
res.status(500).json({
error: 'Something went wrong!',
message: process.env.NODE_ENV === 'development' ? err.message : undefined
});
});
// 404处理
app.use('*', (req, res) => {
res.status(404).json({
error: 'Not Found',
message: 'Endpoint not found'
});
});
module.exports = app;
服务路由设计
// routes/userRoutes.js
const express = require('express');
const router = express.Router();
// 模拟用户数据存储(实际项目中应使用数据库)
const users = [
{ id: 1, name: 'Alice', email: 'alice@example.com' },
{ id: 2, name: 'Bob', email: 'bob@example.com' }
];
// 获取所有用户
router.get('/', (req, res) => {
res.json({
data: users,
count: users.length
});
});
// 根据ID获取用户
router.get('/:id', (req, res) => {
const id = parseInt(req.params.id);
const user = users.find(u => u.id === id);
if (!user) {
return res.status(404).json({
error: 'User not found'
});
}
res.json({ data: user });
});
// 创建新用户
router.post('/', (req, res) => {
const { name, email } = req.body;
if (!name || !email) {
return res.status(400).json({
error: 'Name and email are required'
});
}
const newUser = {
id: users.length + 1,
name,
email
};
users.push(newUser);
res.status(201).json({
data: newUser,
message: 'User created successfully'
});
});
module.exports = router;
Docker容器化部署
Dockerfile配置
# Dockerfile
FROM node:18-alpine
# 设置工作目录
WORKDIR /app
# 复制package文件
COPY package*.json ./
# 安装依赖
RUN npm ci --only=production
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 3000
# 创建非root用户
RUN addgroup -g 1001 -S nodejs
RUN adduser -S nextjs -u 1001
USER nextjs
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:3000/health || exit 1
# 启动命令
CMD ["node", "server.js"]
docker-compose配置
# docker-compose.yml
version: '3.8'
services:
# 用户服务
user-service:
build: .
ports:
- "3000:3000"
environment:
- NODE_ENV=production
- REDIS_URL=redis://redis:6379
depends_on:
- redis
networks:
- microservice-network
restart: unless-stopped
# Redis缓存服务
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
networks:
- microservice-network
restart: unless-stopped
# API网关(可选)
api-gateway:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
depends_on:
- user-service
networks:
- microservice-network
restart: unless-stopped
volumes:
redis-data:
networks:
microservice-network:
driver: bridge
环境变量管理
// config/index.js
require('dotenv').config();
const config = {
port: process.env.PORT || 3000,
environment: process.env.NODE_ENV || 'development',
redis: {
url: process.env.REDIS_URL || 'redis://localhost:6379',
password: process.env.REDIS_PASSWORD || null,
db: process.env.REDIS_DB || 0
},
jwt: {
secret: process.env.JWT_SECRET || 'your-secret-key',
expiresIn: process.env.JWT_EXPIRES_IN || '24h'
}
};
module.exports = config;
Redis缓存集成
缓存客户端初始化
// services/redisClient.js
const redis = require('redis');
const config = require('../config');
class RedisClient {
constructor() {
this.client = redis.createClient({
url: config.redis.url,
password: config.redis.password,
db: config.redis.db,
retry_strategy: (options) => {
if (options.error && options.error.code === 'ECONNREFUSED') {
return new Error('Redis server connection refused');
}
if (options.total_retry_time > 1000 * 60 * 60) {
return new Error('Retry time exhausted');
}
if (options.attempt > 10) {
return undefined;
}
return Math.min(options.attempt * 100, 3000);
}
});
this.client.on('error', (err) => {
console.error('Redis Client Error:', err);
});
this.client.on('connect', () => {
console.log('Redis client connected');
});
this.client.on('ready', () => {
console.log('Redis client ready');
});
}
async connect() {
try {
await this.client.connect();
return this.client;
} catch (error) {
console.error('Failed to connect to Redis:', error);
throw error;
}
}
async disconnect() {
await this.client.quit();
}
// 缓存设置
async set(key, value, expireSeconds = 3600) {
try {
const serializedValue = JSON.stringify(value);
await this.client.setEx(key, expireSeconds, serializedValue);
return true;
} catch (error) {
console.error('Redis SET error:', error);
return false;
}
}
// 缓存获取
async get(key) {
try {
const value = await this.client.get(key);
return value ? JSON.parse(value) : null;
} catch (error) {
console.error('Redis GET error:', error);
return null;
}
}
// 删除缓存
async del(key) {
try {
await this.client.del(key);
return true;
} catch (error) {
console.error('Redis DEL error:', error);
return false;
}
}
// 缓存命中率统计
async getStats() {
try {
const info = await this.client.info();
return info;
} catch (error) {
console.error('Redis stats error:', error);
return null;
}
}
}
module.exports = new RedisClient();
缓存策略实现
// middleware/cacheMiddleware.js
const redisClient = require('../services/redisClient');
const cacheMiddleware = (duration = 3600) => {
return async (req, res, next) => {
const key = `cache:${req.originalUrl || req.url}`;
try {
// 尝试从缓存获取数据
const cachedData = await redisClient.get(key);
if (cachedData) {
console.log(`Cache hit for: ${key}`);
return res.json({
...cachedData,
cached: true,
timestamp: new Date().toISOString()
});
}
// 缓存未命中,记录原始响应函数
const originalJson = res.json;
res.json = function(data) {
// 将数据存储到缓存中
redisClient.set(key, data, duration)
.catch(err => console.error('Cache set error:', err));
return originalJson.call(this, data);
};
next();
} catch (error) {
console.error('Cache middleware error:', error);
next();
}
};
};
module.exports = cacheMiddleware;
高级缓存模式
// services/cacheService.js
const redisClient = require('./redisClient');
class CacheService {
// 缓存用户数据
static async cacheUser(userId, userData) {
const key = `user:${userId}`;
return await redisClient.set(key, userData, 3600); // 1小时过期
}
// 获取缓存的用户数据
static async getCachedUser(userId) {
const key = `user:${userId}`;
return await redisClient.get(key);
}
// 缓存列表数据
static async cacheList(key, data, expireSeconds = 1800) {
return await redisClient.set(key, data, expireSeconds);
}
// 获取缓存的列表数据
static async getCachedList(key) {
return await redisClient.get(key);
}
// 清除用户相关缓存
static async clearUserCache(userId) {
const userKey = `user:${userId}`;
const relatedKeys = [
userKey,
`user:profile:${userId}`,
`user:settings:${userId}`
];
for (const key of relatedKeys) {
await redisClient.del(key);
}
}
// 缓存预热
static async warmupCache() {
try {
const keys = await redisClient.client.keys('user:*');
console.log(`Found ${keys.length} cached users`);
for (const key of keys) {
const data = await redisClient.get(key);
if (data && typeof data === 'object') {
// 可以在这里添加缓存预热逻辑
console.log(`Warmup cache for: ${key}`);
}
}
} catch (error) {
console.error('Cache warmup error:', error);
}
}
// 缓存统计
static async getCacheStats() {
try {
const info = await redisClient.getStats();
return {
status: 'success',
data: info
};
} catch (error) {
return {
status: 'error',
message: error.message
};
}
}
}
module.exports = CacheService;
服务注册与发现
服务注册实现
// services/serviceRegistry.js
const redisClient = require('./redisClient');
const config = require('../config');
class ServiceRegistry {
constructor() {
this.serviceKeyPrefix = 'service:';
this.heartbeatInterval = 30000; // 30秒心跳
}
// 注册服务
async registerService(serviceInfo) {
const serviceKey = `${this.serviceKeyPrefix}${serviceInfo.name}:${serviceInfo.id}`;
try {
// 设置服务信息,包含健康状态和时间戳
await redisClient.set(serviceKey, {
...serviceInfo,
registeredAt: new Date().toISOString(),
lastHeartbeat: new Date().toISOString(),
status: 'healthy'
}, 60); // 1分钟过期时间
console.log(`Service registered: ${serviceInfo.name}`);
return true;
} catch (error) {
console.error('Service registration error:', error);
return false;
}
}
// 更新服务心跳
async updateHeartbeat(serviceName, serviceId) {
const serviceKey = `${this.serviceKeyPrefix}${serviceName}:${serviceId}`;
try {
const serviceInfo = await redisClient.get(serviceKey);
if (serviceInfo) {
serviceInfo.lastHeartbeat = new Date().toISOString();
await redisClient.set(serviceKey, serviceInfo, 60);
return true;
}
return false;
} catch (error) {
console.error('Heartbeat update error:', error);
return false;
}
}
// 获取服务列表
async getServices(serviceName) {
try {
const keys = await redisClient.client.keys(`${this.serviceKeyPrefix}${serviceName}:*`);
const services = [];
for (const key of keys) {
const serviceInfo = await redisClient.get(key);
if (serviceInfo) {
services.push(serviceInfo);
}
}
return services;
} catch (error) {
console.error('Get services error:', error);
return [];
}
}
// 获取健康服务
async getHealthyServices(serviceName) {
const allServices = await this.getServices(serviceName);
return allServices.filter(service => service.status === 'healthy');
}
// 取消服务注册
async unregisterService(serviceName, serviceId) {
const serviceKey = `${this.serviceKeyPrefix}${serviceName}:${serviceId}`;
try {
await redisClient.del(serviceKey);
console.log(`Service unregistered: ${serviceName}`);
return true;
} catch (error) {
console.error('Service unregistration error:', error);
return false;
}
}
// 启动心跳检测
startHeartbeat(serviceName, serviceId) {
setInterval(async () => {
await this.updateHeartbeat(serviceName, serviceId);
}, this.heartbeatInterval);
}
}
module.exports = new ServiceRegistry();
服务发现客户端
// clients/serviceDiscoveryClient.js
const serviceRegistry = require('../services/serviceRegistry');
class ServiceDiscoveryClient {
constructor() {
this.discoveryCache = new Map();
this.cacheTimeout = 5000; // 5秒缓存
}
// 发现服务实例
async discoverService(serviceName) {
const cacheKey = `discovery:${serviceName}`;
// 检查缓存
if (this.discoveryCache.has(cacheKey)) {
const cached = this.discoveryCache.get(cacheKey);
if (Date.now() - cached.timestamp < this.cacheTimeout) {
return cached.data;
}
}
try {
const services = await serviceRegistry.getHealthyServices(serviceName);
// 缓存结果
this.discoveryCache.set(cacheKey, {
data: services,
timestamp: Date.now()
});
return services;
} catch (error) {
console.error('Service discovery error:', error);
return [];
}
}
// 负载均衡选择服务实例
async selectServiceInstance(serviceName) {
const services = await this.discoverService(serviceName);
if (services.length === 0) {
throw new Error(`No healthy instances found for service: ${serviceName}`);
}
// 简单的轮询负载均衡
const randomIndex = Math.floor(Math.random() * services.length);
return services[randomIndex];
}
// 获取服务健康状态
async getServiceHealth(serviceName, serviceId) {
try {
const key = `service:${serviceName}:${serviceId}`;
const serviceInfo = await redisClient.get(key);
if (serviceInfo) {
return {
status: serviceInfo.status,
lastHeartbeat: serviceInfo.lastHeartbeat
};
}
return { status: 'unknown' };
} catch (error) {
console.error('Get service health error:', error);
return { status: 'error' };
}
}
}
module.exports = new ServiceDiscoveryClient();
高可用性设计
服务健康检查
// middleware/healthCheck.js
const redisClient = require('../services/redisClient');
const healthCheckMiddleware = async (req, res, next) => {
const checks = [];
// 检查数据库连接(如果使用)
checks.push(checkDatabase());
// 检查Redis连接
checks.push(checkRedis());
try {
await Promise.all(checks);
res.status(200).json({
status: 'healthy',
timestamp: new Date().toISOString(),
checks: {
redis: 'healthy',
database: 'healthy'
}
});
} catch (error) {
console.error('Health check failed:', error);
res.status(503).json({
status: 'unhealthy',
error: error.message,
timestamp: new Date().toISOString()
});
}
};
async function checkRedis() {
try {
const ping = await redisClient.client.ping();
if (ping !== 'PONG') {
throw new Error('Redis ping failed');
}
return true;
} catch (error) {
throw new Error(`Redis health check failed: ${error.message}`);
}
}
async function checkDatabase() {
// 这里可以添加数据库连接检查逻辑
// 示例:执行一个简单的查询
return true;
}
module.exports = healthCheckMiddleware;
容错机制实现
// middleware/fallbackMiddleware.js
const redisClient = require('../services/redisClient');
class FallbackService {
constructor() {
this.fallbackCache = new Map();
this.cacheTimeout = 60000; // 1分钟缓存
}
async getFallbackData(key, fallbackFn, cacheDuration = 300) {
const cacheKey = `fallback:${key}`;
// 检查缓存
if (this.fallbackCache.has(cacheKey)) {
const cached = this.fallbackCache.get(cacheKey);
if (Date.now() - cached.timestamp < this.cacheTimeout) {
return cached.data;
}
}
try {
const data = await fallbackFn();
// 缓存结果
this.fallbackCache.set(cacheKey, {
data,
timestamp: Date.now()
});
// 同时缓存到Redis(可选)
await redisClient.set(cacheKey, data, cacheDuration);
return data;
} catch (error) {
console.error('Fallback service error:', error);
// 返回之前缓存的数据
const cachedData = await redisClient.get(cacheKey);
if (cachedData) {
return cachedData;
}
throw error;
}
}
async clearCache(key) {
const cacheKey = `fallback:${key}`;
this.fallbackCache.delete(cacheKey);
await redisClient.del(cacheKey);
}
}
const fallbackService = new FallbackService();
const fallbackMiddleware = (fallbackKey, fallbackFn, cacheDuration = 300) => {
return async (req, res, next) => {
try {
const data = await fallbackService.getFallbackData(
fallbackKey,
fallbackFn,
cacheDuration
);
req.fallbackData = data;
next();
} catch (error) {
console.error('Fallback middleware error:', error);
res.status(500).json({
error: 'Service temporarily unavailable',
message: 'Please try again later'
});
}
};
};
module.exports = { fallbackMiddleware, fallbackService };
监控与日志
日志系统集成
// logger/index.js
const winston = require('winston');
const fs = require('fs');
const path = require('path');
// 确保日志目录存在
const logDir = path.join(__dirname, '../logs');
if (!fs.existsSync(logDir)) {
fs.mkdirSync(logDir, { recursive: true });
}
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.errors({ stack: true }),
winston.format.json()
),
defaultMeta: { service: 'user-service' },
transports: [
// 错误日志文件
new winston.transports.File({
filename: path.join(logDir, 'error.log'),
level: 'error',
maxsize: 5242880, // 5MB
maxFiles: 5
}),
// 所有日志文件
new winston.transports.File({
filename: path.join(logDir, 'combined.log'),
maxsize: 5242880,
maxFiles: 5
}),
// 控制台输出
new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
winston.format.simple()
)
})
]
});
// 添加请求日志中间件
const requestLogger = (req, res, next) => {
const start = Date.now();
// 记录响应结束事件
res.on('finish', () => {
const duration = Date.now() - start;
logger.info('HTTP Request', {
method: req.method,
url: req.url,
statusCode: res.statusCode,
duration: `${duration}ms`,
ip: req.ip,
userAgent: req.get('User-Agent')
});
});
next();
};
module.exports = { logger, requestLogger };
性能监控
// middleware/performanceMonitor.js
const express = require('express');
const router = express.Router();
class PerformanceMonitor {
constructor() {
this.metrics = {
requests: 0,
errors: 0,
averageResponseTime: 0,
startTime: Date.now()
};
this.requestCounts = new Map();
}
// 记录请求
recordRequest(req, res, next) {
const start = process.hrtime.bigint();
res.on('finish', () => {
const end = process.hrtime.bigint();
const duration = Number(end - start) / 1000000; // 转换为毫秒
this.metrics.requests++;
this.metrics.averageResponseTime =
(this.metrics.averageResponseTime * (this.metrics.requests - 1) + duration) /
this.metrics.requests;
// 记录特定端点的请求次数
const endpoint = `${req.method} ${req.path}`;
this.requestCounts.set(endpoint,
(this.requestCounts.get(endpoint) || 0) + 1);
if (res.statusCode >= 500) {
this.metrics.errors++;
}
});
next();
}
// 获取监控指标
getMetrics() {
return {
...this.metrics,
uptime: Math.floor((Date.now() - this.metrics.startTime) / 1000),
requestCounts: Object.fromEntries(this.requestCounts)
};
}
// 重置指标
resetMetrics() {
this.metrics = {
requests: 0,
errors: 0,
averageResponseTime: 0,
startTime: Date.now()
};
this.requestCounts.clear();
}
}
const monitor = new PerformanceMonitor();
// 指标端点
router.get('/metrics', (req, res) => {
res.json(monitor.getMetrics());
});
// 性能监控中间件
const performanceMiddleware = (req, res, next) => {
monitor.recordRequest(req, res, next);
};
module.exports = { performanceMiddleware, router: router };
完整的服务实现
用户服务完整实现
// services/userService.js
const redisClient = require('../services/redisClient');
const CacheService = require('../services/cacheService');
class UserService {
constructor() {
this.cachePrefix = 'user:';
this.cacheTTL = 3600; // 1小时
}
// 获取用户列表(带缓存)
async getUsers() {
const cacheKey = `${this.cachePrefix}list`;
try {
// 先尝试从缓存获取
let users = await CacheService.getCachedList(cacheKey);
if (!users) {
// 缓存未命中,从数据库或其他数据源获取
users = await this.fetchUsersFromSource();
// 存储到缓存
await CacheService.cacheList(cacheKey, users, this.cacheTTL);
}
return users;
} catch (error) {
console.error('Get users error:', error);
throw error;
}
}
// 获取单个用户(带缓存)
async getUserById(id) {
const cacheKey = `${this.cachePrefix}${id}`;
try {
// 先尝试从缓存获取
let user = await CacheService.getCachedUser(id);
if (!user) {
// 缓存未命中,从数据库或其他数据源获取
user = await this.fetchUserFromSource(id);
if (user) {
// 存储到缓存
await CacheService.cacheUser(id, user, this.cacheTTL);
}
}
return user;
} catch (error) {
console.error('Get user error:',
评论 (0)