引言
在现代软件开发领域,微服务架构已成为构建大规模分布式系统的重要模式。Node.js凭借其非阻塞I/O和事件驱动的特性,成为实现微服务架构的理想选择。本文将详细介绍如何使用Express框架和TypeScript构建完整的微服务架构,涵盖从基础搭建到高级功能的各个方面。
微服务架构概述
什么是微服务架构
微服务架构是一种将单一应用程序拆分为多个小型、独立服务的架构模式。每个服务:
- 运行在自己的进程中
- 通过轻量级通信机制(通常是HTTP API)进行交互
- 专注于特定的业务功能
- 可以独立部署和扩展
微服务的优势与挑战
优势:
- 技术栈灵活性:不同服务可以使用不同的技术栈
- 独立部署:服务可以独立开发、测试和部署
- 可扩展性:可以根据需要单独扩展特定服务
- 团队协作:小团队可以负责特定服务
挑战:
- 分布式复杂性:网络通信、数据一致性等问题
- 服务治理:服务发现、负载均衡、熔断等
- 监控和调试:分布式系统的问题追踪更加困难
- 数据管理:跨服务的数据一致性
技术选型分析
Node.js的优势
Node.js在微服务架构中具有显著优势:
- 高性能:基于V8引擎,事件驱动的非阻塞I/O模型
- 生态丰富:npm生态系统提供了大量成熟的包
- 开发效率:JavaScript/TypeScript统一开发语言
- 社区活跃:持续更新和改进
Express框架选择
Express是Node.js生态中最流行的Web应用框架之一,其主要特点包括:
- 轻量级:核心简单,功能可扩展
- 中间件支持:丰富的中间件生态系统
- 易学易用:API设计简洁直观
- 社区支持:文档完善,学习资源丰富
TypeScript的价值
TypeScript为Node.js微服务带来以下价值:
- 类型安全:编译时类型检查,减少运行时错误
- 开发体验:智能提示、重构支持
- 代码维护:清晰的接口定义,便于团队协作
- 可预测性:明确的数据结构和函数签名
项目初始化与基础架构搭建
项目结构设计
microservice-project/
├── packages/
│ ├── user-service/
│ │ ├── src/
│ │ │ ├── controllers/
│ │ │ ├── models/
│ │ │ ├── routes/
│ │ │ ├── services/
│ │ │ ├── middleware/
│ │ │ └── utils/
│ │ ├── package.json
│ │ └── tsconfig.json
│ ├── order-service/
│ │ ├── src/
│ │ │ ├── controllers/
│ │ │ ├── models/
│ │ │ ├── routes/
│ │ │ ├── services/
│ │ │ ├── middleware/
│ │ │ └── utils/
│ │ ├── package.json
│ │ └── tsconfig.json
├── shared/
│ ├── types/
│ ├── utils/
│ └── package.json
├── docker-compose.yml
├── package.json
└── tsconfig.base.json
TypeScript配置文件
// tsconfig.base.json
// Compiler settings at the repository root; presumably extended by each
// package's own tsconfig.json (TODO confirm against the package configs).
{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "lib": ["ES2020"],
    "types": ["node"],
    "typeRoots": ["./node_modules/@types"],
    "allowJs": true,
    "skipLibCheck": true,
    "esModuleInterop": true,
    "allowSyntheticDefaultImports": true,
    // Enables the whole strict family of checks, including treating
    // catch-clause variables as `unknown`.
    "strict": true,
    "forceConsistentCasingInFileNames": true,
    "noFallthroughCasesInSwitch": true,
    "moduleResolution": "node",
    "resolveJsonModule": true,
    "isolatedModules": true,
    // Type-check only: with noEmit enabled, tsc writes no JavaScript output.
    // NOTE(review): a config that must produce build output has to override this.
    "noEmit": true
  },
  // NOTE(review): only *.spec.ts is excluded; files named *.test.ts are still
  // part of the compilation.
  "exclude": ["node_modules", "**/*.spec.ts"]
}
基础服务模板
// packages/user-service/src/app.ts
import express, { Application } from 'express';
import cors from 'cors';
import helmet from 'helmet';
import morgan from 'morgan';
import { errorHandler } from './middleware/errorHandler';
/**
 * Wraps a single Express application and installs the common middleware
 * stack (security headers, CORS, access logging, body parsing) when it is
 * constructed.
 *
 * NOTE(review): the `errorHandler` imported by this file is not registered
 * anywhere in this class, and no routes are mounted here.
 */
class App {
  public app: Application;

  constructor() {
    this.app = express();
    this.initializeMiddlewares();
  }

  /** Registers the global middleware in the order requests should pass through it. */
  private initializeMiddlewares(): void {
    const server = this.app;

    server.use(helmet());
    server.use(cors());
    server.use(morgan('combined'));

    // Body parsers: JSON payloads first, then URL-encoded forms.
    server.use(express.json());
    server.use(express.urlencoded({ extended: true }));
  }

  /**
   * Starts accepting HTTP connections.
   *
   * @param port - TCP port to bind.
   * @param callback - Optional function forwarded to Express' `listen`.
   */
  public listen(port: number, callback?: () => void): void {
    this.app.listen(port, callback);
  }

  /** @returns The underlying Express application (e.g. for supertest). */
  public getServer(): Application {
    return this.app;
  }
}

export default new App();
核心服务实现
用户服务示例
// packages/user-service/src/models/User.ts
import { Document, Schema, model } from 'mongoose';
/**
 * A user document: the two fields declared in the schema plus the timestamp
 * fields requested through the `timestamps` schema option.
 */
export interface IUser extends Document {
  name: string;
  email: string;
  createdAt: Date;
  updatedAt: Date;
}

// `timestamps: true` asks Mongoose to maintain createdAt / updatedAt.
const schemaOptions = { timestamps: true };

const userSchema: Schema = new Schema(
  {
    name: { type: String, required: true },
    email: { type: String, required: true, unique: true },
  },
  schemaOptions
);

export default model<IUser>('User', userSchema);
// packages/user-service/src/services/UserService.ts
import { IUser } from '../models/User';
import User from '../models/User';
/**
 * Persistence operations for users, implemented on top of the `User` model.
 *
 * Every method rethrows failures as a plain `Error` whose message is prefixed
 * with the operation that failed.
 */
export class UserService {
  /**
   * Creates and stores a new user.
   *
   * Goes through `User.create`, which is what the unit tests for this
   * service stub.
   *
   * @param userData - Fields for the new user.
   * @returns The stored user document.
   * @throws Error with the prefix "Failed to create user".
   */
  public async createUser(userData: Partial<IUser>): Promise<IUser> {
    try {
      return await User.create(userData);
    } catch (error: unknown) {
      throw UserService.wrap('Failed to create user', error);
    }
  }

  /**
   * Looks a user up by id.
   *
   * @returns The user, or `null` when no document matches.
   * @throws Error with the prefix "Failed to get user".
   */
  public async getUserById(id: string): Promise<IUser | null> {
    try {
      return await User.findById(id);
    } catch (error: unknown) {
      throw UserService.wrap('Failed to get user', error);
    }
  }

  /**
   * Returns every stored user (no filter, no paging).
   *
   * @throws Error with the prefix "Failed to get users".
   */
  public async getAllUsers(): Promise<IUser[]> {
    try {
      return await User.find({});
    } catch (error: unknown) {
      throw UserService.wrap('Failed to get users', error);
    }
  }

  /**
   * Applies `userData` to the user with the given id.
   *
   * @returns The updated document (`new: true`), or `null` when no user matches.
   * @throws Error with the prefix "Failed to update user".
   */
  public async updateUser(id: string, userData: Partial<IUser>): Promise<IUser | null> {
    try {
      return await User.findByIdAndUpdate(id, userData, { new: true });
    } catch (error: unknown) {
      throw UserService.wrap('Failed to update user', error);
    }
  }

  /**
   * Deletes the user with the given id.
   *
   * @returns `true` when a document was removed, `false` when none matched.
   * @throws Error with the prefix "Failed to delete user".
   */
  public async deleteUser(id: string): Promise<boolean> {
    try {
      const result = await User.findByIdAndDelete(id);
      return result !== null;
    } catch (error: unknown) {
      throw UserService.wrap('Failed to delete user', error);
    }
  }

  /**
   * Builds the rethrown error. Catch variables are `unknown` under
   * `strict`, so the value is narrowed before its message is read.
   */
  private static wrap(context: string, error: unknown): Error {
    const detail = error instanceof Error ? error.message : String(error);
    return new Error(`${context}: ${detail}`);
  }
}
// packages/user-service/src/controllers/UserController.ts
import { Request, Response } from 'express';
import { UserService } from '../services/UserService';
import { IUser } from '../models/User';
/**
 * HTTP handlers for the user resource. Each handler delegates to
 * `UserService` and translates the outcome into a JSON response.
 *
 * The handlers read `this.userService`, so they must be invoked with the
 * controller instance as `this` (for example through `.bind(controller)` or
 * an arrow-function wrapper) when registered on a router.
 */
export class UserController {
  private userService: UserService;

  constructor() {
    this.userService = new UserService();
  }

  /** Creates a user from the request body; 201 on success, 400 on failure. */
  public async createUser(req: Request, res: Response): Promise<Response> {
    try {
      const user = await this.userService.createUser(req.body);
      return res.status(201).json(user);
    } catch (error: unknown) {
      return res.status(400).json({ error: UserController.messageOf(error) });
    }
  }

  /** Returns the user named by `req.params.id`; 404 when absent, 500 on failure. */
  public async getUserById(req: Request, res: Response): Promise<Response> {
    try {
      const user = await this.userService.getUserById(req.params.id);
      if (!user) {
        return res.status(404).json({ error: 'User not found' });
      }
      return res.json(user);
    } catch (error: unknown) {
      return res.status(500).json({ error: UserController.messageOf(error) });
    }
  }

  /** Returns all users as a JSON array; 500 on failure. */
  public async getAllUsers(req: Request, res: Response): Promise<Response> {
    try {
      const users = await this.userService.getAllUsers();
      return res.json(users);
    } catch (error: unknown) {
      return res.status(500).json({ error: UserController.messageOf(error) });
    }
  }

  /** Updates the user named by `req.params.id` with the request body; 404 when absent. */
  public async updateUser(req: Request, res: Response): Promise<Response> {
    try {
      const user = await this.userService.updateUser(req.params.id, req.body);
      if (!user) {
        return res.status(404).json({ error: 'User not found' });
      }
      return res.json(user);
    } catch (error: unknown) {
      return res.status(500).json({ error: UserController.messageOf(error) });
    }
  }

  /** Deletes the user named by `req.params.id`; 404 when absent. */
  public async deleteUser(req: Request, res: Response): Promise<Response> {
    try {
      const deleted = await this.userService.deleteUser(req.params.id);
      if (!deleted) {
        return res.status(404).json({ error: 'User not found' });
      }
      return res.json({ message: 'User deleted successfully' });
    } catch (error: unknown) {
      return res.status(500).json({ error: UserController.messageOf(error) });
    }
  }

  /**
   * Extracts a printable message from a caught value. Catch variables are
   * `unknown` under `strict`, so `.message` cannot be read without narrowing.
   */
  private static messageOf(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
服务间通信机制
HTTP通信实现
// packages/user-service/src/utils/httpClient.ts
import axios, { AxiosInstance, AxiosRequestConfig } from 'axios';
/**
 * Small axios wrapper that fixes a base URL, timeout and JSON content type,
 * and unwraps `response.data` for the four common verbs.
 */
export class HttpClient {
  private client: AxiosInstance;

  /**
   * @param baseURL - Prefix applied to every request URL.
   * @param timeout - Request timeout in milliseconds (default 5000).
   */
  constructor(baseURL: string, timeout: number = 5000) {
    this.client = axios.create({
      baseURL,
      timeout,
      headers: {
        'Content-Type': 'application/json',
      }
    });

    // Request interceptor
    this.client.interceptors.request.use(
      (config) => {
        // Hook for adding authentication info and similar headers.
        return config;
      },
      (error) => {
        return Promise.reject(error);
      }
    );

    // Response interceptor: log failures, then let the caller handle them.
    this.client.interceptors.response.use(
      (response) => response,
      (error) => {
        console.error('HTTP Error:', error);
        return Promise.reject(error);
      }
    );
  }

  /** Issues a GET and resolves with the response body. */
  public async get<T>(url: string, config?: AxiosRequestConfig): Promise<T> {
    const response = await this.client.get<T>(url, config);
    return response.data;
  }

  /** Issues a POST with an optional body and resolves with the response body. */
  public async post<T>(url: string, data?: unknown, config?: AxiosRequestConfig): Promise<T> {
    const response = await this.client.post<T>(url, data, config);
    return response.data;
  }

  /** Issues a PUT with an optional body and resolves with the response body. */
  public async put<T>(url: string, data?: unknown, config?: AxiosRequestConfig): Promise<T> {
    const response = await this.client.put<T>(url, data, config);
    return response.data;
  }

  /** Issues a DELETE and resolves with the response body. */
  public async delete<T>(url: string, config?: AxiosRequestConfig): Promise<T> {
    const response = await this.client.delete<T>(url, config);
    return response.data;
  }
}

// Shared client instance. The target defaults to http://localhost:3001 and can
// be overridden per environment, since `localhost` does not reach a sibling
// container. NOTE(review): ORDER_SERVICE_URL is a new variable name; port 3001
// is assumed to be the order service. Confirm against the deployment config.
export const httpClient = new HttpClient(
  process.env.ORDER_SERVICE_URL || 'http://localhost:3001'
);
消息队列集成
// packages/user-service/src/services/EventService.ts
import { EventEmitter } from 'events';
import amqp, { Connection, Channel } from 'amqplib';
/**
 * Publishes and consumes events over RabbitMQ, and additionally exposes an
 * in-process `EventEmitter` through `emit` / `on`.
 */
export class EventService {
  private connection: Connection | null = null;
  private channel: Channel | null = null;
  private eventEmitter: EventEmitter;

  constructor() {
    this.eventEmitter = new EventEmitter();
  }

  /**
   * Opens the broker connection and channel and declares the durable
   * `user.events` topic exchange.
   *
   * The broker URL is read from `RABBITMQ_URL`, falling back to
   * `amqp://localhost` when the variable is unset or empty.
   *
   * @throws Whatever amqplib rejects with; the error is logged first.
   */
  public async connect(): Promise<void> {
    const url = process.env.RABBITMQ_URL || 'amqp://localhost';
    try {
      this.connection = await amqp.connect(url);
      this.channel = await this.connection.createChannel();
      // Declare the exchange
      await this.channel.assertExchange('user.events', 'topic', { durable: true });
      console.log('Connected to RabbitMQ');
    } catch (error) {
      console.error('Failed to connect to RabbitMQ:', error);
      throw error;
    }
  }

  /**
   * Serialises `message` as JSON and publishes it.
   *
   * @throws Error when `connect()` has not completed successfully.
   */
  public async publishEvent(exchange: string, routingKey: string, message: unknown): Promise<void> {
    if (!this.channel) {
      throw new Error('Not connected to RabbitMQ');
    }
    const buffer = Buffer.from(JSON.stringify(message));
    // publish() returns a boolean (write-buffer state), not a promise.
    this.channel.publish(exchange, routingKey, buffer);
    console.log(`Published event to ${exchange}.${routingKey}`);
  }

  /**
   * Binds a fresh exclusive queue to `exchange` / `routingKey` and invokes
   * `callback` with each JSON-decoded message.
   *
   * A message is acknowledged only after `callback` returns. If decoding or
   * the callback throws, the failure is logged and the message is rejected
   * without requeueing, so one malformed message cannot loop forever or
   * crash the consumer.
   *
   * @throws Error when `connect()` has not completed successfully.
   */
  public async subscribeToEvent(exchange: string, routingKey: string, callback: (message: any) => void): Promise<void> {
    const channel = this.channel;
    if (!channel) {
      throw new Error('Not connected to RabbitMQ');
    }
    const queue = await channel.assertQueue('', { exclusive: true });
    await channel.bindQueue(queue.queue, exchange, routingKey);
    await channel.consume(queue.queue, (msg) => {
      if (msg === null) {
        return;
      }
      try {
        const message = JSON.parse(msg.content.toString());
        callback(message);
      } catch (error) {
        console.error(`Failed to handle event from ${exchange}.${routingKey}:`, error);
        channel.nack(msg, false, false);
        return;
      }
      // Acknowledge on the channel the message was delivered on.
      channel.ack(msg);
    });
  }

  /** Emits an in-process event (does not touch RabbitMQ). */
  public emit(event: string, data: any): void {
    this.eventEmitter.emit(event, data);
  }

  /** Registers a listener for in-process events raised through `emit`. */
  public on(event: string, listener: (data: any) => void): void {
    this.eventEmitter.on(event, listener);
  }
}
中间件与安全控制
认证中间件
// packages/user-service/src/middleware/auth.ts
import { Request, Response, NextFunction } from 'express';
import jwt from 'jsonwebtoken';
/** Express request carrying the decoded JWT payload once authenticated. */
export interface AuthRequest extends Request {
  user?: any;
}

/**
 * Verifies the token in the `Authorization` header (second space-separated
 * part, e.g. `Bearer <token>`) and stores the decoded payload on `req.user`.
 *
 * Responses: 401 when no token is present, 403 when verification fails,
 * 500 when running in production without `JWT_SECRET`.
 *
 * The responses are sent and then the function returns nothing, because the
 * declared return type is `void`.
 */
export const authenticateToken = (req: AuthRequest, res: Response, next: NextFunction): void => {
  const authHeader = req.headers['authorization'];
  const token = authHeader && authHeader.split(' ')[1];
  if (!token) {
    res.status(401).json({ error: 'Access token required' });
    return;
  }

  // The well-known fallback secret is kept for local development only; in
  // production it would let anyone forge valid tokens, so fail closed.
  const fallbackSecret = process.env.NODE_ENV === 'production' ? undefined : 'default-secret';
  const secret = process.env.JWT_SECRET || fallbackSecret;
  if (!secret) {
    console.error('JWT_SECRET is not set; refusing to verify tokens in production');
    res.status(500).json({ error: 'Authentication is not configured' });
    return;
  }

  jwt.verify(token, secret, (err, user) => {
    if (err) {
      res.status(403).json({ error: 'Invalid token' });
      return;
    }
    req.user = user;
    next();
  });
};
/**
 * Builds a middleware that lets a request through only when `req.user.role`
 * is one of `roles`; otherwise it answers 403.
 *
 * The 403 response is sent and then the middleware returns nothing, because
 * its declared return type is `void`.
 *
 * @param roles - Role names that are allowed to proceed.
 */
export const authorizeRoles = (...roles: string[]) => {
  return (req: AuthRequest, res: Response, next: NextFunction): void => {
    if (!req.user || !roles.includes(req.user.role)) {
      res.status(403).json({ error: 'Insufficient permissions' });
      return;
    }
    next();
  };
};
请求验证中间件
// packages/user-service/src/middleware/validation.ts
import { Request, Response, NextFunction } from 'express';
import { body, validationResult, ValidationChain } from 'express-validator';
/**
 * Ends the request with 400 and the collected express-validator errors when
 * any validation failed; otherwise passes control on.
 *
 * The response is sent and then the function returns nothing, because the
 * declared return type is `void`.
 */
export const validate = (req: Request, res: Response, next: NextFunction): void => {
  const errors = validationResult(req);
  if (!errors.isEmpty()) {
    res.status(400).json({
      errors: errors.array()
    });
    return;
  }
  next();
};
/**
 * Validation chains for a user payload: `name` must be present and 2-50
 * characters long, `email` must be a valid address (and is normalised).
 */
export const userValidationRules = (): ValidationChain[] => {
  const nameRule = body('name')
    .notEmpty()
    .withMessage('Name is required')
    .isLength({ min: 2, max: 50 })
    .withMessage('Name must be between 2 and 50 characters');

  const emailRule = body('email')
    .isEmail()
    .normalizeEmail()
    .withMessage('Valid email is required');

  return [nameRule, emailRule];
};
监控与日志系统
日志配置
// packages/user-service/src/utils/logger.ts
import winston from 'winston';
import { format, transports } from 'winston';
const { combine, timestamp, printf } = format;

/**
 * Renders one log line as `<timestamp> [LEVEL]: <message> <meta-json>`.
 * The JSON suffix is left empty when there is no metadata besides level,
 * message and timestamp.
 */
const logFormat = printf((info) => {
  const { level, message, timestamp, ...meta } = info;
  const metaSuffix = Object.keys(meta).length ? JSON.stringify(meta) : '';
  return `${timestamp} [${level.toUpperCase()}]: ${message} ${metaSuffix}`;
});

// Timestamp first, then the line renderer above.
const buildLineFormat = () => combine(timestamp(), logFormat);

/**
 * Service logger: level `info`, tagged with the service name, written to the
 * console, to logs/error.log (errors only) and to logs/combined.log.
 */
export const logger = winston.createLogger({
  level: 'info',
  format: buildLineFormat(),
  defaultMeta: { service: 'user-service' },
  transports: [
    new transports.Console({ format: buildLineFormat() }),
    new transports.File({ filename: 'logs/error.log', level: 'error' }),
    new transports.File({ filename: 'logs/combined.log' })
  ]
});
性能监控
// packages/user-service/src/middleware/monitoring.ts
import { Request, Response, NextFunction } from 'express';
import { logger } from '../utils/logger';
/**
 * Logs one entry per request once the response has finished, including the
 * method, URL, status code, elapsed time, user agent and client IP.
 *
 * The level follows the status code: `error` for 5xx, `warn` for 4xx and
 * `info` otherwise.
 */
export const requestMonitoring = (req: Request, res: Response, next: NextFunction): void => {
  const start = Date.now();
  res.on('finish', () => {
    const duration = Date.now() - start;
    const logData = {
      method: req.method,
      // originalUrl keeps the full path; req.url is rewritten relative to the
      // mount point while a mounted router handles the request.
      url: req.originalUrl,
      statusCode: res.statusCode,
      duration: `${duration}ms`,
      userAgent: req.get('User-Agent'),
      ip: req.ip
    };
    if (res.statusCode >= 500) {
      logger.error('Server Error', logData);
    } else if (res.statusCode >= 400) {
      logger.warn('Client Error', logData);
    } else {
      logger.info('Request Completed', logData);
    }
  });
  next();
};
/**
 * Factory for a metrics middleware. The returned middleware currently does
 * nothing except hand the request to the next handler.
 */
export const metricsMiddleware = () => {
  const passThrough = (req: Request, res: Response, next: NextFunction): void => {
    // Integration point for Prometheus or a similar monitoring system.
    next();
  };
  return passThrough;
};
错误处理机制
统一错误处理
// packages/user-service/src/middleware/errorHandler.ts
import { Request, Response, NextFunction } from 'express';
/** An error that may carry the HTTP status it should be reported with. */
export interface CustomError extends Error {
  statusCode?: number;
}

/**
 * Express error-handling middleware that turns any error into a JSON body of
 * the form `{ message, statusCode, timestamp }`.
 *
 * Status selection:
 * - `ValidationError` -> 400, with the message repeated in `details`
 * - `CastError`       -> 400, with `details: 'Invalid ID format'`
 * - otherwise `err.statusCode` when set, else 500
 *
 * The `statusCode` in the body always equals the HTTP status actually sent.
 * When the response has already started, the error is handed to the next
 * error handler instead, since a second response cannot be written.
 */
export const errorHandler = (
  err: CustomError,
  req: Request,
  res: Response,
  next: NextFunction
): Response => {
  console.error('Error occurred:', err);

  if (res.headersSent) {
    next(err);
    return res;
  }

  const isValidationError = err.name === 'ValidationError';
  const isCastError = err.name === 'CastError';
  const statusCode = isValidationError || isCastError ? 400 : err.statusCode || 500;

  // Base error response shared by every branch
  const errorResponse = {
    message: err.message || 'Internal Server Error',
    statusCode,
    timestamp: new Date().toISOString()
  };

  if (isValidationError) {
    return res.status(statusCode).json({
      ...errorResponse,
      details: err.message
    });
  }
  if (isCastError) {
    return res.status(statusCode).json({
      ...errorResponse,
      details: 'Invalid ID format'
    });
  }
  return res.status(statusCode).json(errorResponse);
};
自定义业务异常
// packages/user-service/src/utils/exceptions.ts
/**
 * Base class for expected, domain-level failures. Carries the HTTP status
 * code the failure should be reported with (400 unless specified).
 */
export class BusinessError extends Error {
  constructor(message: string, public statusCode: number = 400) {
    super(message);
    this.name = 'BusinessError';
  }
}

/** A requested resource does not exist (HTTP 404). */
export class NotFoundError extends BusinessError {
  constructor(message: string = 'Resource not found') {
    super(message, 404);
    this.name = 'NotFoundError';
  }
}

/** Input failed validation (HTTP 400). */
export class ValidationError extends BusinessError {
  constructor(message: string = 'Validation failed') {
    super(message, 400);
    this.name = 'ValidationError';
  }
}

/** The caller is not authenticated (HTTP 401). */
export class UnauthorizedError extends BusinessError {
  constructor(message: string = 'Unauthorized access') {
    super(message, 401);
    this.name = 'UnauthorizedError';
  }
}
部署配置
Docker配置
# packages/user-service/Dockerfile
# Image for the user service: installs production dependencies, copies the
# package contents and starts it through `npm start`.
FROM node:16-alpine
WORKDIR /app
# Copy only the manifests first so the dependency layer is cached until they change.
COPY package*.json ./
# Production dependencies only (devDependencies are not installed).
# NOTE(review): no TypeScript build step is visible here - confirm that
# `npm start` can run without devDependencies.
RUN npm ci --only=production
COPY . .
# Matches the 3000:3000 port mapping used for user-service in docker-compose.yml.
EXPOSE 3000
CMD ["npm", "start"]
# docker-compose.yml
# Runs the user and order services against one shared MongoDB instance on a
# private bridge network. Nesting is significant in YAML, so the indentation
# below is part of the configuration.
version: '3.8'
services:
  user-service:
    build: ./packages/user-service
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
      - MONGODB_URI=mongodb://mongo:27017/userservice
      # Placeholder value - replace with a real secret before deploying.
      - JWT_SECRET=your-jwt-secret-key
    depends_on:
      - mongo
    networks:
      - microservice-network
  order-service:
    build: ./packages/order-service
    ports:
      - "3001:3001"
    environment:
      - NODE_ENV=production
      - MONGODB_URI=mongodb://mongo:27017/orderservice
      # Placeholder value - replace with a real secret before deploying.
      - JWT_SECRET=your-jwt-secret-key
    depends_on:
      - mongo
    networks:
      - microservice-network
  mongo:
    image: mongo:5.0
    ports:
      - "27017:27017"
    volumes:
      # Named volume so database files survive container re-creation.
      - mongodb_data:/data/db
    networks:
      - microservice-network
volumes:
  mongodb_data:
networks:
  microservice-network:
    driver: bridge
环境配置管理
// packages/user-service/src/config/env.ts
/** Port used when `PORT` is unset or not a usable port number. */
const DEFAULT_PORT = 3000;

/**
 * Converts the raw `PORT` environment value into a number.
 *
 * Environment variables are always strings, so passing the raw value on
 * would make `config.port` a `string | number`.
 *
 * @param raw - The environment value, if any.
 * @returns The parsed port, or `DEFAULT_PORT` when `raw` is missing, not a
 *   whole number, or outside 1-65535.
 */
function resolvePort(raw: string | undefined): number {
  const parsed = Number(raw);
  const usable = raw !== undefined && raw.trim() !== '' &&
    Number.isInteger(parsed) && parsed > 0 && parsed <= 65535;
  return usable ? parsed : DEFAULT_PORT;
}

/**
 * Runtime configuration read from the environment, with development
 * defaults. `||` is used deliberately so that empty strings also fall back.
 */
export const config = {
  port: resolvePort(process.env.PORT),
  nodeEnv: process.env.NODE_ENV || 'development',
  mongoUri: process.env.MONGODB_URI || 'mongodb://localhost:27017/userservice',
  // NOTE(review): development-only fallback; set JWT_SECRET in real deployments.
  jwtSecret: process.env.JWT_SECRET || 'default-secret-key',
  rabbitMqUrl: process.env.RABBITMQ_URL || 'amqp://localhost',
  serviceName: 'user-service'
};
测试策略
单元测试配置
// packages/user-service/src/__tests__/UserService.test.ts
import { UserService } from '../services/UserService';
import User, { IUser } from '../models/User';
jest.mock('../models/User');
// Unit tests for UserService with the User model replaced by jest's automock.
// NOTE(review): the createUser cases stub `User.create`, so they only observe
// the service if createUser goes through that static method.
describe('UserService', () => {
  let userService: UserService;
  let mockUser: Partial<IUser>;

  beforeEach(() => {
    mockUser = {
      name: 'John Doe',
      email: 'john@example.com'
    };
    userService = new UserService();
  });

  afterEach(() => {
    jest.clearAllMocks();
  });

  describe('createUser', () => {
    it('should create a user successfully', async () => {
      const createMock = User.create as jest.Mock;
      createMock.mockResolvedValue(mockUser);

      const created = await userService.createUser(mockUser);

      expect(created).toEqual(mockUser);
      expect(createMock).toHaveBeenCalledWith(mockUser);
    });

    it('should throw error when creation fails', async () => {
      const createMock = User.create as jest.Mock;
      createMock.mockRejectedValue(new Error('Database error'));

      const attempt = userService.createUser(mockUser);

      await expect(attempt).rejects.toThrow('Failed to create user');
    });
  });

  describe('getUserById', () => {
    it('should return user when found', async () => {
      const findByIdMock = User.findById as jest.Mock;
      findByIdMock.mockResolvedValue(mockUser);

      const found = await userService.getUserById('123');

      expect(found).toEqual(mockUser);
      expect(findByIdMock).toHaveBeenCalledWith('123');
    });

    it('should return null when user not found', async () => {
      const findByIdMock = User.findById as jest.Mock;
      findByIdMock.mockResolvedValue(null);

      const found = await userService.getUserById('123');

      expect(found).toBeNull();
    });
  });
});
集成测试示例
// packages/user-service/src/__tests__/api.test.ts
import request from 'supertest';
import app from '../app';
// HTTP-level tests that drive the Express app through supertest.
describe('User API', () => {
  // A fresh supertest agent bound to the app for every request.
  const api = () => request(app.getServer());

  describe('GET /users', () => {
    it('should return all users', async () => {
      const response = await api().get('/users').expect(200);

      expect(response.body).toBeInstanceOf(Array);
    });
  });

  describe('POST /users', () => {
    it('should create a new user', async () => {
      const userData = {
        name: 'Jane Doe',
        email: 'jane@example.com'
      };

      const response = await api().post('/users').send(userData).expect(201);

      expect(response.body.name).toBe(userData.name);
      expect(response.body.email).toBe(userData.email);
    });

    it('should return validation error for invalid data', async () => {
      // One-character name and no email: expected to be rejected.
      const invalidPayload = { name: 'J' };

      const response = await api().post('/users').send(invalidPayload).expect(400);

      expect(response.body.errors).toBeDefined();
    });
  });
});
性能优化策略
缓存实现
// packages/user-service/src/utils/cache.ts
import NodeCache from 'node-cache';
/**
 * In-memory key/value cache backed by `node-cache`, with one TTL applied to
 * every entry.
 */
export class CacheService {
  private cache: NodeCache;

  /**
   * @param ttlSeconds - Lifetime of each entry in seconds (default 300).
   */
  constructor(ttlSeconds: number = 300) {
    this.cache = new NodeCache({ stdTTL: ttlSeconds, checkperiod: 120 });
  }

  /**
   * Reads a cached value.
   *
   * @returns The value, or `null` when the key is absent. `node-cache`
   *   reports a miss as `undefined`, which is mapped to `null` here to match
   *   the declared return type.
   */
  public get<T>(key: string): T | null {
    return this.cache.get<T>(key) ?? null;
  }

  /** Stores `value` under `key` using the default TTL. */
  public set<T>(key: string, value: T): void {
    this.cache.set(key, value);
  }

  /**
   * Removes `key`.
   *
   * @returns `true` when at least one entry was deleted.
   */
  public del(key: string): boolean {
    return this.cache.del(key) > 0;
  }

  /** Removes every entry. */
  public flush(): void {
    this.cache.flushAll();
  }
}

export const cacheService = new CacheService(300); // 5-minute cache
数据库优化
// packages/user-service/src/utils/database.ts
import mongoose, { Connection } from 'mongoose';
/**
 * Process-wide holder for the MongoDB connection.
 *
 * Connects through `mongoose.connect`, i.e. the default Mongoose connection.
 * Models registered with the top-level `model()` function use that default
 * connection, so this is the one that has to be opened for them to work.
 */
export class DatabaseManager {
  private static connection: Connection | null = null;

  /**
   * Opens the connection unless one has already been stored.
   *
   * @param uri - MongoDB connection string.
   * @throws Whatever Mongoose rejects with; the error is logged first.
   */
  public static async connect(uri: string): Promise<void> {
    if (this.connection) {
      return;
    }
    try {
      // The legacy useNewUrlParser / useUnifiedTopology flags are omitted:
      // drivers that accept `maxPoolSize` no longer support them.
      const options = {
        maxPoolSize: 10,
        serverSelectionTimeoutMS: 5000,
        socketTimeoutMS: 45000,
      };
      await mongoose.connect(uri, options);
      this.connection = mongoose.connection;
      console.log('Connected to MongoDB');
    } catch (error) {
      console.error('MongoDB connection error:', error);
      throw error;
    }
  }

  /**
   * @returns The stored connection.
   * @throws Error when `connect()` has not completed successfully.
   */
  public static getConnection(): Connection {
    if (!this.connection) {
      throw new Error('Database not connected');
    }
    return this.connection;
  }
}
部署最佳实践
CI/CD流程
# .github/workflows/ci-cd.yml
name: CI/CD Pipeline
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Setup Node.js
uses: actions/setup-node@v2
with:
node-version: '16'
- name: Install dependencies
run: npm ci
- name: Run tests
run: npm test
- name: Run linting
run: npm run lint
build-and-deploy:
needs: test
runs-on: ubuntu-latest
steps:
-
评论 (0)