引言
在现代Web应用开发中,微服务架构已成为构建可扩展、可维护系统的重要模式。Node.js凭借其非阻塞I/O特性和事件驱动模型,成为构建高性能微服务的理想选择。本文将深入探讨如何使用Express和Fastify构建高性能的Node.js微服务架构,涵盖从基础概念到实际实现的完整指南。
微服务架构概述
什么是微服务架构
微服务架构是一种将单一应用程序拆分为多个小型、独立服务的软件设计方法。每个服务:
- 运行在自己的进程中
- 通过轻量级通信机制(通常是HTTP API)进行交互
- 专注于特定的业务功能
- 可以独立部署和扩展
微服务的核心优势
- 技术多样性:不同服务可以使用不同的技术栈
- 可扩展性:可以根据需求单独扩展特定服务
- 容错性:配合超时、重试、熔断与降级等机制,可以把故障隔离在单个服务内,避免拖垮整个系统
- 团队自治:不同团队可以独立开发和维护不同服务
- 部署灵活性:支持持续集成/持续部署
Node.js微服务架构设计模式
服务拆分策略
业务领域驱动拆分
// 示例:电商系统的服务拆分
/**
 * Example split of an e-commerce system into services by business domain.
 * Each entry carries a display name, a short description of the service's
 * responsibility, and the technology stack chosen for it.
 */
const serviceStructure = {
  // Accounts: registration, login and permission management.
  user: {
    name: '用户服务',
    description: '处理用户注册、登录、权限管理',
    techStack: ['Express', 'MongoDB'],
  },

  // Catalogue: product information, stock and pricing.
  product: {
    name: '商品服务',
    description: '管理商品信息、库存、价格',
    techStack: ['Fastify', 'PostgreSQL'],
  },

  // Orders: creation, payment hand-off and status tracking.
  order: {
    name: '订单服务',
    description: '处理订单创建、支付、状态管理',
    techStack: ['Express', 'Redis'],
  },

  // Payments: payment logic, refunds and reconciliation.
  payment: {
    name: '支付服务',
    description: '处理支付逻辑、退款、对账',
    techStack: ['Fastify', 'Kafka'],
  },
};
按功能模块拆分
// 服务边界定义示例
/**
 * Example service-boundary definition: for every service, the HTTP endpoints
 * it exposes and the data collections it owns.
 */
const serviceBoundaries = {
  // Authentication owns the login flow and the identity data.
  authenticationService: {
    endpoints: ['/login', '/register', '/logout'],
    data: ['users', 'roles', 'permissions'],
  },

  // The product service owns the catalogue and its stock records.
  productService: {
    endpoints: ['/products', '/categories', '/inventory'],
    data: ['products', 'categories', 'stocks'],
  },
};
微服务设计原则
- 单一职责原则:每个服务应该只负责一个业务领域
- 去中心化治理:每个服务可以独立决策
- 容错性设计:服务应具备优雅降级能力
- 数据隔离:每个服务拥有自己的数据库
- 异步通信:优先使用异步消息传递
Express微服务实现
基础服务架构搭建
// app.js - Express服务基础结构
const express = require('express');
const cors = require('cors');
const helmet = require('helmet');
const morgan = require('morgan');
const rateLimit = require('express-rate-limit');
/**
 * Base class for an Express-backed microservice.
 *
 * Construction wires up, in order: shared middleware, routes, and the
 * error / 404 handlers. Subclasses customise behaviour by overriding
 * `initializeRoutes()`. Be aware that the override is invoked from inside
 * this constructor, i.e. before the subclass constructor body has run.
 */
class Microservice {
  /**
   * @param {string} name - Service name reported by `GET /` and used in logs.
   * @param {number|string} port - Port passed to `app.listen()` by `start()`.
   */
  constructor(name, port) {
    this.app = express();
    this.name = name;
    this.port = port;
    this.initializeMiddleware();
    this.initializeRoutes();
    this.initializeErrorHandling();
  }

  /** Registers security, logging, rate-limiting and body-parsing middleware. */
  initializeMiddleware() {
    // Security middleware.
    // NOTE(review): `origin: '*'` accepts requests from any origin — confirm
    // this is intended before exposing the service publicly.
    this.app.use(helmet());
    this.app.use(cors({
      origin: '*',
      methods: ['GET', 'POST', 'PUT', 'DELETE'],
      allowedHeaders: ['Content-Type', 'Authorization']
    }));

    // Request logging.
    this.app.use(morgan('combined'));

    // Rate limiting.
    const limiter = rateLimit({
      windowMs: 15 * 60 * 1000, // 15 minutes
      max: 100 // at most 100 requests per client per window
    });
    this.app.use(limiter);

    // Body parsing.
    this.app.use(express.json({ limit: '10mb' }));
    this.app.use(express.urlencoded({ extended: true }));
  }

  /** Registers the service banner (`GET /`) and the health probe (`GET /health`). */
  initializeRoutes() {
    // Basic route.
    this.app.get('/', (req, res) => {
      res.json({
        service: this.name,
        status: 'healthy',
        timestamp: new Date().toISOString()
      });
    });

    // Health-check endpoint.
    this.app.get('/health', (req, res) => {
      res.status(200).json({ status: 'healthy' });
    });
  }

  /** Registers the global error handler and the catch-all 404 handler. */
  initializeErrorHandling() {
    // Global error handler (Express recognises it by its four parameters).
    this.app.use((err, req, res, next) => {
      console.error(err?.stack ?? err);

      // Once the response has started we can no longer send a JSON error;
      // hand over to Express' default handler, which closes the connection.
      if (res.headersSent) {
        return next(err);
      }

      // Errors raised by the body parsers carry a 4xx status (for example
      // 400 for malformed JSON). Report those as client errors, not as 500.
      const status = err?.status ?? err?.statusCode;
      const isClientError = Number.isInteger(status) && status >= 400 && status < 500;
      const exposeMessage =
        process.env.NODE_ENV === 'development' || (isClientError && err.expose === true);

      res.status(isClientError ? status : 500).json({
        error: isClientError ? 'Request Error' : 'Internal Server Error',
        message: exposeMessage ? err.message : undefined
      });
    });

    // 404 handler for requests no route answered.
    this.app.use((req, res) => {
      res.status(404).json({
        error: 'Not Found',
        message: 'Route not found'
      });
    });
  }

  /**
   * Starts listening on the configured port.
   *
   * @returns {*} Whatever `app.listen()` returns (the HTTP server), so callers
   *   can close it, e.g. for graceful shutdown or in tests.
   */
  start() {
    return this.app.listen(this.port, () => {
      console.log(`${this.name} service listening on port ${this.port}`);
    });
  }
}
module.exports = Microservice;
用户服务实现示例
// services/user-service.js
const express = require('express');
const bcrypt = require('bcrypt');
const jwt = require('jsonwebtoken');
const { v4: uuidv4 } = require('uuid');
/**
 * User microservice: registration, login and profile lookup.
 *
 * Users are kept in an in-memory Map keyed by user id; a real deployment
 * should use a database instead. Expects `Microservice`, `bcrypt`, `jwt` and
 * `uuidv4` to be in scope.
 */
class UserService extends Microservice {
  constructor() {
    // The base constructor already invokes initializeRoutes() (this class'
    // override), so it must not be called a second time here: a second call
    // would register duplicate routes behind the 404 handler.
    super('user-service', process.env.USER_SERVICE_PORT || 3001);

    // Fail fast if production is missing its signing secret.
    UserService.getJwtSecret();

    // In-memory store; use a database in a real project. Handlers only read
    // it per request, so assigning it after super() is safe.
    this.users = new Map();
  }

  /**
   * Resolves the JWT signing secret.
   *
   * @returns {string} `JWT_SECRET`, or a well-known development fallback.
   * @throws {Error} When `JWT_SECRET` is unset and `NODE_ENV` is `production`,
   *   because the fallback value is public and would let anyone forge tokens.
   */
  static getJwtSecret() {
    const secret = process.env.JWT_SECRET;
    if (secret) {
      return secret;
    }
    if (process.env.NODE_ENV === 'production') {
      throw new Error('JWT_SECRET must be set in production');
    }
    return 'secret-key';
  }

  /** Signs a 24-hour token carrying the user's id and username. */
  static issueToken(user) {
    return jwt.sign(
      { userId: user.id, username: user.username },
      UserService.getJwtSecret(),
      { expiresIn: '24h' }
    );
  }

  /** Returns the fields of a user record that are safe to send to clients. */
  static toPublicUser(user) {
    return {
      id: user.id,
      username: user.username,
      email: user.email
    };
  }

  /** True when `value` is a non-empty string. */
  static isNonEmptyString(value) {
    return typeof value === 'string' && value !== '';
  }

  /** Registers the base routes plus `/register`, `/login` and `/profile`. */
  initializeRoutes() {
    // Keep the inherited `/` and `/health` endpoints.
    super.initializeRoutes();

    this.app.post('/register', (req, res) => this.handleRegister(req, res));
    this.app.post('/login', (req, res) => this.handleLogin(req, res));
    this.app.get('/profile', this.authenticateToken, (req, res) => this.handleProfile(req, res));
  }

  /** POST /register — creates a user and returns it together with a token. */
  async handleRegister(req, res) {
    try {
      const { username, email, password } = req.body ?? {};

      // Validate input; non-string values are rejected here instead of
      // failing later inside the password hasher with a 500.
      const fields = [username, email, password];
      if (!fields.every(UserService.isNonEmptyString)) {
        return res.status(400).json({
          error: 'Missing required fields'
        });
      }

      // Reject duplicates by e-mail or username.
      const existingUser = Array.from(this.users.values())
        .find((user) => user.email === email || user.username === username);
      if (existingUser) {
        return res.status(409).json({
          error: 'User already exists'
        });
      }

      // Hash the password (cost factor 12).
      const hashedPassword = await bcrypt.hash(password, 12);

      const user = {
        id: uuidv4(),
        username,
        email,
        password: hashedPassword,
        createdAt: new Date().toISOString()
      };
      this.users.set(user.id, user);

      res.status(201).json({
        user: UserService.toPublicUser(user),
        token: UserService.issueToken(user)
      });
    } catch (error) {
      console.error('Registration error:', error);
      res.status(500).json({
        error: 'Registration failed'
      });
    }
  }

  /** POST /login — verifies credentials and returns the user with a token. */
  async handleLogin(req, res) {
    try {
      const { email, password } = req.body ?? {};
      if (!UserService.isNonEmptyString(email) || !UserService.isNonEmptyString(password)) {
        return res.status(400).json({
          error: 'Email and password required'
        });
      }

      const user = Array.from(this.users.values())
        .find((candidate) => candidate.email === email);
      if (!user) {
        return res.status(401).json({
          error: 'Invalid credentials'
        });
      }

      const isValidPassword = await bcrypt.compare(password, user.password);
      if (!isValidPassword) {
        return res.status(401).json({
          error: 'Invalid credentials'
        });
      }

      res.json({
        user: UserService.toPublicUser(user),
        token: UserService.issueToken(user)
      });
    } catch (error) {
      console.error('Login error:', error);
      res.status(500).json({
        error: 'Login failed'
      });
    }
  }

  /** GET /profile — returns the profile of the authenticated user. */
  async handleProfile(req, res) {
    try {
      const user = this.users.get(req.user.userId);
      if (!user) {
        return res.status(404).json({
          error: 'User not found'
        });
      }
      res.json({
        id: user.id,
        username: user.username,
        email: user.email,
        createdAt: user.createdAt
      });
    } catch (error) {
      console.error('Profile fetch error:', error);
      res.status(500).json({
        error: 'Failed to fetch profile'
      });
    }
  }

  /**
   * Express middleware that verifies the token from the `Authorization`
   * header (second space-separated part, e.g. `Bearer <token>`) and stores
   * the decoded payload on `req.user`.
   *
   * Does not use `this`, so it can be passed to Express unbound.
   * Responds 401 when no token is present and 403 when verification fails.
   */
  authenticateToken(req, res, next) {
    const authHeader = req.headers['authorization'];
    const token = authHeader && authHeader.split(' ')[1];
    if (!token) {
      return res.status(401).json({
        error: 'Access token required'
      });
    }
    jwt.verify(token, UserService.getJwtSecret(), (err, user) => {
      if (err) {
        return res.status(403).json({
          error: 'Invalid token'
        });
      }
      req.user = user;
      next();
    });
  }
}
module.exports = UserService;
Fastify微服务实现
Fastify服务优势
Fastify相比Express具有以下优势:
- 更快的性能(在简单路由的基准测试中吞吐量约为Express的2倍,实际提升取决于业务负载)
- 更好的TypeScript支持
- 内置Schema验证
- 更少的内存占用
// services/product-service.js
const fastify = require('fastify')({ logger: true });
const { v4: uuidv4 } = require('uuid');
/**
 * Product microservice built on the module-level `fastify` instance.
 *
 * Products live in an in-memory Map keyed by product id. Constructing the
 * service registers all routes and immediately starts the HTTP server.
 * Expects `fastify` and `uuidv4` to be in scope.
 */
class ProductService {
  constructor() {
    this.products = new Map();
    this.setupRoutes();
    this.startServer();
  }

  /** Registers the CRUD routes under `/products` and the `/health` probe. */
  setupRoutes() {
    // Schema factories return a fresh object per route so no schema object
    // is shared between route definitions.
    const idParams = () => ({
      type: 'object',
      properties: {
        id: { type: 'string' }
      }
    });
    const productProperties = () => ({
      name: { type: 'string' },
      price: { type: 'number' },
      description: { type: 'string' },
      category: { type: 'string' }
    });

    // List products with offset/limit pagination.
    fastify.get('/products', {
      schema: {
        querystring: {
          type: 'object',
          properties: {
            // Non-negative integers only: negative values would make
            // Array.prototype.slice count from the end of the list.
            limit: { type: 'integer', minimum: 0 },
            offset: { type: 'integer', minimum: 0 }
          }
        }
      }
    }, async (request, reply) => {
      try {
        const query = request.query ?? {};
        const limit = Number(query.limit ?? 10);
        const offset = Number(query.offset ?? 0);
        const allProducts = Array.from(this.products.values());
        return reply.send({
          products: allProducts.slice(offset, offset + limit),
          total: allProducts.length,
          limit,
          offset
        });
      } catch (error) {
        return reply.status(500).send({
          error: 'Failed to fetch products'
        });
      }
    });

    // Fetch a single product.
    fastify.get('/products/:id', {
      schema: { params: idParams() }
    }, async (request, reply) => {
      try {
        const product = this.products.get(request.params.id);
        if (!product) {
          return reply.status(404).send({
            error: 'Product not found'
          });
        }
        return reply.send(product);
      } catch (error) {
        return reply.status(500).send({
          error: 'Failed to fetch product'
        });
      }
    });

    // Create a product.
    fastify.post('/products', {
      schema: {
        body: {
          type: 'object',
          required: ['name', 'price'],
          properties: productProperties()
        }
      }
    }, async (request, reply) => {
      try {
        // Server-generated fields are spread last so a client-supplied `id`
        // or `createdAt` in the body cannot override them.
        const newProduct = {
          ...request.body,
          id: uuidv4(),
          createdAt: new Date().toISOString()
        };
        this.products.set(newProduct.id, newProduct);
        return reply.status(201).send(newProduct);
      } catch (error) {
        return reply.status(500).send({
          error: 'Failed to create product'
        });
      }
    });

    // Update a product (partial update: only supplied fields change).
    fastify.put('/products/:id', {
      schema: {
        params: idParams(),
        body: {
          type: 'object',
          properties: productProperties()
        }
      }
    }, async (request, reply) => {
      try {
        const { id } = request.params;
        const existingProduct = this.products.get(id);
        if (!existingProduct) {
          return reply.status(404).send({
            error: 'Product not found'
          });
        }
        // Identity and creation time are immutable; re-assert them after
        // merging so the stored record always matches its map key.
        const updatedProduct = {
          ...existingProduct,
          ...request.body,
          id: existingProduct.id,
          createdAt: existingProduct.createdAt,
          updatedAt: new Date().toISOString()
        };
        this.products.set(id, updatedProduct);
        return reply.send(updatedProduct);
      } catch (error) {
        return reply.status(500).send({
          error: 'Failed to update product'
        });
      }
    });

    // Delete a product.
    fastify.delete('/products/:id', {
      schema: { params: idParams() }
    }, async (request, reply) => {
      try {
        const { id } = request.params;
        if (!this.products.delete(id)) {
          return reply.status(404).send({
            error: 'Product not found'
          });
        }
        return reply.send({ message: 'Product deleted successfully' });
      } catch (error) {
        return reply.status(500).send({
          error: 'Failed to delete product'
        });
      }
    });

    // Health check.
    fastify.get('/health', async (request, reply) => {
      return reply.send({ status: 'healthy' });
    });
  }

  /**
   * Starts the server on `PRODUCT_SERVICE_PORT` (default 3002), bound to all
   * interfaces. Exits the process with code 1 if listening fails.
   */
  startServer() {
    const configuredPort = process.env.PRODUCT_SERVICE_PORT;
    const port = configuredPort ? Number(configuredPort) : 3002;

    // Options-object form of listen(); the positional (port, host, cb)
    // signature is deprecated in newer Fastify releases.
    fastify.listen({ port, host: '0.0.0.0' }, (err) => {
      if (err) {
        fastify.log.error(err);
        process.exit(1);
      }
      // Report the port actually in use, not a hard-coded one.
      console.log(`Product service listening on port ${port}`);
    });
  }
}
module.exports = ProductService;
API网关设计
网关核心功能
API网关是微服务架构中的关键组件,负责:
- 路由请求到正确的服务
- 负载均衡
- 安全认证
- 请求/响应转换
- 限流和监控
// gateway/api-gateway.js
const express = require('express');
const axios = require('axios');
const rateLimit = require('express-rate-limit');
const helmet = require('helmet');
const cors = require('cors');
/**
 * API gateway that proxies `/api/<service>` requests to registered backend
 * services and exposes an aggregated `/health` endpoint.
 *
 * Expects `express`, `axios`, `rateLimit`, `helmet` and `cors` to be in scope.
 */
class APIGateway {
  constructor() {
    this.app = express();
    // Service name -> base URL, populated through addService().
    this.services = new Map();
    this.initializeMiddleware();
    this.setupRoutes();
  }

  /** Registers security, rate-limiting and JSON body-parsing middleware. */
  initializeMiddleware() {
    // NOTE(review): `origin: '*'` accepts requests from any origin — confirm
    // this is intended for a public-facing gateway.
    this.app.use(helmet());
    this.app.use(cors({
      origin: '*',
      methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
      allowedHeaders: ['Content-Type', 'Authorization']
    }));

    // Rate limiting: 1000 requests per 15-minute window.
    const limiter = rateLimit({
      windowMs: 15 * 60 * 1000,
      max: 1000
    });
    this.app.use(limiter);

    this.app.use(express.json({ limit: '10mb' }));
  }

  /**
   * Registers (or replaces) a backend service.
   *
   * @param {string} name - Service key, e.g. `user`, `product`, `order`.
   * @param {string} baseUrl - Base URL requests are forwarded to.
   */
  addService(name, baseUrl) {
    this.services.set(name, baseUrl);
  }

  /**
   * Returns a copy of `headers` without the entries that must not be relayed
   * upstream: hop-by-hop headers, `host` (it names the gateway, not the
   * backend) and `content-length` (the body is re-serialised, so the original
   * length may no longer match).
   *
   * @param {Object<string, string>} [headers] - Incoming request headers.
   * @returns {Object<string, string>} Headers safe to forward.
   */
  static forwardableHeaders(headers = {}) {
    const skipped = new Set([
      'host',
      'content-length',
      'connection',
      'keep-alive',
      'proxy-authenticate',
      'proxy-authorization',
      'te',
      'trailer',
      'transfer-encoding',
      'upgrade'
    ]);
    return Object.fromEntries(
      Object.entries(headers).filter(([key]) => !skipped.has(key.toLowerCase()))
    );
  }

  /**
   * Builds the Express handler that forwards a request to `serviceName`.
   *
   * The forwarded path is `req.url`, which Express makes relative to the
   * mount point (so `/api/users/login` is forwarded as `/login`).
   * NOTE(review): backends whose routes include their own resource prefix
   * must account for that — verify against each service's route table.
   *
   * @param {string} serviceName - Key previously passed to addService().
   * @returns {Function} Async `(req, res)` handler.
   */
  createProxyHandler(serviceName) {
    const label = `${serviceName.charAt(0).toUpperCase()}${serviceName.slice(1)}`;

    return async (req, res) => {
      const baseUrl = this.services.get(serviceName);
      if (!baseUrl) {
        // Avoid issuing a request to "undefined/...".
        console.error(`${label} service error:`, 'service is not registered');
        return res.status(500).json({
          error: 'Service unavailable'
        });
      }

      try {
        const response = await axios({
          method: req.method,
          url: `${baseUrl}${req.url}`,
          headers: APIGateway.forwardableHeaders(req.headers),
          data: req.body,
          // Relay every upstream status (including 4xx/5xx) with its body
          // instead of masking it as "Service unavailable".
          validateStatus: () => true
        });
        res.status(response.status).json(response.data);
      } catch (error) {
        // Only transport-level failures (DNS, refused, timeout) end up here.
        console.error(`${label} service error:`, error.message);
        res.status(error.response?.status || 500).json({
          error: 'Service unavailable'
        });
      }
    };
  }

  /** Mounts one proxy per backend service and the aggregated health check. */
  setupRoutes() {
    // Service key -> gateway mount path.
    const serviceRoutes = {
      user: '/api/users',
      product: '/api/products',
      order: '/api/orders'
    };
    for (const [serviceName, mountPath] of Object.entries(serviceRoutes)) {
      this.app.use(mountPath, this.createProxyHandler(serviceName));
    }

    // Aggregated health check: probes all registered services in parallel.
    this.app.get('/health', async (req, res) => {
      const probeTimeoutMs = 2000; // keep one hung backend from stalling the probe
      try {
        const results = await Promise.all(
          Array.from(this.services.entries(), async ([name, baseUrl]) => {
            try {
              const response = await axios.get(`${baseUrl}/health`, { timeout: probeTimeoutMs });
              return [name, response.data.status];
            } catch (error) {
              return [name, 'unhealthy'];
            }
          })
        );
        res.json({
          status: 'healthy',
          services: Object.fromEntries(results),
          timestamp: new Date().toISOString()
        });
      } catch (error) {
        res.status(500).json({
          error: 'Health check failed'
        });
      }
    });
  }

  /**
   * Starts the gateway.
   *
   * @param {number} [port=8080] - Port to listen on.
   * @returns {*} Whatever `app.listen()` returns (the HTTP server).
   */
  start(port = 8080) {
    return this.app.listen(port, () => {
      console.log(`API Gateway listening on port ${port}`);
      console.log('Registered services:', Array.from(this.services.keys()));
    });
  }
}
module.exports = APIGateway;
服务间通信机制
同步通信(HTTP)
// services/communication/http-communication.js
const axios = require('axios');
/**
 * Thin wrapper around axios for synchronous service-to-service HTTP calls.
 * Keeps one pre-configured client per service name. Expects `axios` to be
 * in scope.
 */
class HTTPCommunication {
  constructor() {
    // Service name -> axios instance created by createClient().
    this.clients = new Map();
  }

  /**
   * Creates (or replaces) the client for a service. The client uses a
   * 5-second timeout, sends JSON by default and logs every request/response.
   *
   * @param {string} serviceName - Key used later with callService().
   * @param {string} baseUrl - Base URL of the remote service.
   * @returns {*} The axios instance that was registered.
   */
  createClient(serviceName, baseUrl) {
    const client = axios.create({
      baseURL: baseUrl,
      timeout: 5000,
      headers: {
        'Content-Type': 'application/json'
      }
    });

    // Request interceptor: log outgoing calls.
    client.interceptors.request.use(
      (config) => {
        console.log(`Sending request to ${serviceName}: ${config.url}`);
        return config;
      },
      (error) => Promise.reject(error)
    );

    // Response interceptor: log results and failures.
    client.interceptors.response.use(
      (response) => {
        console.log(`Received response from ${serviceName}: ${response.status}`);
        return response;
      },
      (error) => {
        console.error(`Error from ${serviceName}:`, error.message);
        return Promise.reject(error);
      }
    );

    this.clients.set(serviceName, client);
    return client;
  }

  /**
   * Calls an endpoint of a registered service.
   *
   * @param {string} serviceName - Name passed to createClient().
   * @param {string} endpoint - Path relative to the service's base URL.
   * @param {Object} [options] - Optional `method` (default `GET`), `data`,
   *   `params` and `headers`, passed through to axios.
   * @returns {Promise<*>} The response body (`response.data`).
   * @throws {Error} When no client exists for `serviceName`, or when the
   *   request fails (the axios error is rethrown unchanged).
   */
  async callService(serviceName, endpoint, options = {}) {
    const client = this.clients.get(serviceName);
    if (!client) {
      throw new Error(`No client found for service: ${serviceName}`);
    }
    try {
      const response = await client({
        method: options.method || 'GET',
        url: endpoint,
        data: options.data,
        params: options.params,
        headers: options.headers
      });
      return response.data;
    } catch (error) {
      console.error(`Service call failed for ${serviceName}:`, error.message);
      throw error;
    }
  }

  /**
   * Fetches a user from the `user` service via `/profile/<userId>`.
   *
   * @param {string} userId - User identifier; URL-encoded before use.
   * @returns {Promise<*>} The user payload.
   * @throws {Error} `User not found` for any failure; the underlying error
   *   (timeout, network failure, HTTP status) is kept on `error.cause`.
   */
  async getUser(userId) {
    try {
      return await this.callService('user', `/profile/${encodeURIComponent(userId)}`);
    } catch (error) {
      console.error('Failed to get user:', error.message);
      throw new Error('User not found', { cause: error });
    }
  }

  /**
   * Fetches a product from the `product` service via `/products/<productId>`.
   *
   * @param {string} productId - Product identifier; URL-encoded before use.
   * @returns {Promise<*>} The product payload.
   * @throws {Error} `Product not found` for any failure; the underlying
   *   error is kept on `error.cause`.
   */
  async getProduct(productId) {
    try {
      return await this.callService('product', `/products/${encodeURIComponent(productId)}`);
    } catch (error) {
      console.error('Failed to get product:', error.message);
      throw new Error('Product not found', { cause: error });
    }
  }
}
module.exports = HTTPCommunication;
异步通信(消息队列)
// services/communication/message-queue.js
const amqp = require('amqplib');
/**
 * Minimal AMQP helper for asynchronous service-to-service messaging.
 * Wraps one connection and one channel. Expects `amqp` (amqplib) to be in
 * scope.
 */
class MessageQueue {
  /**
   * @param {string} connectionString - AMQP URL passed to `amqp.connect()`.
   */
  constructor(connectionString) {
    this.connectionString = connectionString;
    this.connection = null;
    this.channel = null;
  }

  /** Opens the connection and creates the channel used by all other methods. */
  async connect() {
    try {
      this.connection = await amqp.connect(this.connectionString);
      this.channel = await this.connection.createChannel();
      console.log('Connected to message queue');
    } catch (error) {
      console.error('Failed to connect to message queue:', error.message);
      throw error;
    }
  }

  /**
   * Returns the open channel.
   *
   * @throws {Error} When connect() has not completed yet, instead of letting
   *   callers hit an opaque "cannot read properties of null" TypeError.
   */
  requireChannel() {
    if (!this.channel) {
      throw new Error('Message queue is not connected; call connect() first');
    }
    return this.channel;
  }

  /**
   * Declares a durable exchange.
   *
   * @param {string} exchangeName
   * @param {string} [type='direct'] - Exchange type.
   */
  async setupExchange(exchangeName, type = 'direct') {
    try {
      await this.requireChannel().assertExchange(exchangeName, type, { durable: true });
      console.log(`Exchange ${exchangeName} created`);
    } catch (error) {
      console.error('Failed to create exchange:', error.message);
      throw error;
    }
  }

  /**
   * Declares a durable queue and binds it to an exchange.
   *
   * @param {string} queueName
   * @param {string} exchangeName
   * @param {string} routingKey
   */
  async setupQueue(queueName, exchangeName, routingKey) {
    try {
      const channel = this.requireChannel();
      await channel.assertQueue(queueName, { durable: true });
      await channel.bindQueue(queueName, exchangeName, routingKey);
      console.log(`Queue ${queueName} bound to exchange ${exchangeName}`);
    } catch (error) {
      console.error('Failed to setup queue:', error.message);
      throw error;
    }
  }

  /**
   * Publishes a JSON-serialised message.
   *
   * @param {string} exchangeName
   * @param {string} routingKey
   * @param {*} message - Any JSON-serialisable value.
   * @param {Object} [options] - Extra publish options; they override the
   *   default `{ persistent: true }`, which matches the durable queues
   *   declared by setupQueue().
   * @returns {Promise<boolean>} The value returned by `channel.publish()`.
   */
  async publish(exchangeName, routingKey, message, options = {}) {
    try {
      const payload = Buffer.from(JSON.stringify(message));
      const accepted = this.requireChannel().publish(
        exchangeName,
        routingKey,
        payload,
        { persistent: true, ...options }
      );
      console.log('Message published:', message);
      return accepted;
    } catch (error) {
      console.error('Failed to publish message:', error.message);
      throw error;
    }
  }

  /**
   * Consumes a queue. Each message is JSON-parsed and passed to `callback`;
   * it is acknowledged when the callback resolves and rejected when parsing
   * or the callback fails.
   *
   * @param {string} queueName
   * @param {function(*): (Promise<void>|void)} callback - Message handler.
   * @returns {Promise<*>} The value `channel.consume()` resolves with.
   */
  async subscribe(queueName, callback) {
    try {
      const channel = this.requireChannel();
      return await channel.consume(queueName, async (msg) => {
        if (msg === null) {
          return;
        }
        try {
          const message = JSON.parse(msg.content.toString());
          await callback(message);
          channel.ack(msg);
        } catch (error) {
          console.error('Failed to process message:', error.message);
          // Reject WITHOUT requeueing (third argument `false`): a message
          // that cannot be processed is not redelivered, which avoids an
          // endless retry loop on a poison message.
          channel.nack(msg, false, false);
        }
      });
    } catch (error) {
      console.error('Failed to subscribe to queue:', error.message);
      throw error;
    }
  }

  /**
   * Closes the channel and then the connection. The connection is closed
   * even if closing the channel fails, and both references are cleared so
   * later calls fail with a clear "not connected" error.
   */
  async close() {
    const { channel, connection } = this;
    this.channel = null;
    this.connection = null;
    try {
      if (channel) {
        await channel.close();
      }
    } finally {
      if (connection) {
        await connection.close();
      }
    }
  }
}
module.exports = MessageQueue;
负载均衡与服务发现
基于Consul的服务发现
// services/discovery/consul-discovery.js
const Consul = require('consul');
/**
 * Consul-backed service registration and discovery. Expects `Consul` (the
 * `consul` client constructor) to be in scope.
 */
class ServiceDiscovery {
  constructor() {
    // Agent location comes from CONSUL_HOST / CONSUL_PORT, defaulting to
    // localhost:8500 over plain HTTP.
    this.consul = new Consul({
      host: process.env.CONSUL_HOST || 'localhost',
      port: process.env.CONSUL_PORT || 8500,
      scheme: 'http'
    });
  }

  /**
   * Registers a service instance together with an HTTP health check that
   * polls `http://<address>:<port>/health` every 10 seconds.
   *
   * @param {{name: string, id: string, address: string, port: number}} serviceConfig
   * @throws Rethrows whatever the Consul client rejects with.
   */
  async registerService(serviceConfig) {
    try {
      const { name, id, address, port } = serviceConfig;
      await this.consul.agent.service.register({
        name,
        id,
        address,
        port,
        check: {
          http: `http://${address}:${port}/health`,
          interval: '10s'
        }
      });
      console.log(`Service ${name} registered`);
    } catch (error) {
      console.error('Failed to register service:', error.message);
      throw error;
    }
  }

  /**
   * Lists the instances of a service that currently pass their health checks.
   *
   * @param {string} serviceName
   * @returns {Promise<Array<{id: string, name: string, address: string, port: number}>>}
   * @throws Rethrows whatever the Consul client rejects with.
   */
  async discoverService(serviceName) {
    try {
      const entries = await this.consul.health.service({
        service: serviceName,
        passing: true
      });
      return entries.map((entry) => ({
        id: entry.Service.ID,
        name: entry.Service.Service,
        // A service registered without its own address reports an empty
        // Service.Address; fall back to the address of the node it runs on
        // so callers never receive a ":port" address.
        address: entry.Service.Address || entry.Node?.Address,
        port: entry.Service.Port
      }));
    } catch (error) {
      console.error('Failed to discover service:', error.message);
      throw error;
    }
  }

  /**
   * Picks one healthy instance of a service.
   *
   * @param {string} serviceName
   * @returns {Promise<string>} `address:port` of the chosen instance.
   * @throws {Error} When no healthy instance is registered.
   */
  async getServiceAddress(serviceName) {
    const instances = await this.discoverService(serviceName);
    if (instances.length === 0) {
      throw new Error(`No healthy instances found for service: ${serviceName}`);
    }
    // Simple load spreading by uniform random choice (this is NOT
    // round-robin: no position is remembered between calls).
    const chosen = instances[Math.floor(Math.random() * instances.length)];
    return `${chosen.address}:${chosen.port}`;
  }
}
module.exports = ServiceDiscovery;
负载均衡实现
// services/load-balancing/load-balancer.js
const axios = require('axios');
class LoadBalancer {
constructor() {
this.services = new Map();
this.currentPointer = 0;
}
addService(name, endpoints) {
this.services.set(name, {
endpoints: endpoints,
currentPointer: 0
});
}
// 轮询算法
getNextEndpoint(serviceName) {
const service = this.services.get(service
评论 (0)