Node.js微服务架构实战:Express + Docker + Redis构建高可用服务网格

NiceWind
NiceWind 2026-01-28T11:07:00+08:00
0 0 1

引言

在现代云原生应用开发中,微服务架构已成为构建可扩展、高可用系统的主流模式。Node.js凭借其非阻塞I/O特性和丰富的生态系统,在微服务领域展现出强大的竞争力。本文将深入探讨如何使用Express框架构建微服务,结合Docker容器化技术实现服务部署,并集成Redis缓存提升系统性能,最终构建一个完整的高可用服务网格架构。

微服务架构概述

什么是微服务架构

微服务架构是一种将单一应用程序拆分为多个小型、独立服务的软件设计方法。每个服务都围绕特定的业务功能构建,通过轻量级通信机制(通常是HTTP API)进行交互。这种架构模式具有以下优势:

  • 可扩展性:可以独立扩展单个服务
  • 技术多样性:不同服务可以使用不同的技术栈
  • 容错性:单个服务的故障可以被隔离,配合降级与重试机制,不至于拖垮整个系统
  • 开发效率:团队可以并行开发不同服务

微服务核心组件

在微服务架构中,关键组件包括:

  • 服务注册与发现
  • 负载均衡
  • API网关
  • 配置管理
  • 监控与日志
  • 缓存系统

Express框架基础构建

项目初始化

首先,我们创建一个基础的Express应用来作为微服务的起点:

mkdir microservice-demo
cd microservice-demo
npm init -y
# redis and winston are required by services/redisClient.js and logger/index.js later in this article
npm install express cors helmet morgan dotenv redis winston
npm install --save-dev nodemon

基础服务器结构

// app.js
const express = require('express');
const cors = require('cors');
const helmet = require('helmet');
const morgan = require('morgan');
require('dotenv').config();

const app = express();
// NOTE(review): PORT is not used in this module; presumably the entry point
// that calls listen() reads it — TODO confirm.
const PORT = process.env.PORT || 3000;

// Global middleware: security headers, CORS, access logging and body parsing
app.use(helmet());
app.use(cors());
app.use(morgan('combined'));
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

// Health-check endpoint: always reports OK with the current timestamp
app.get('/health', (req, res) => {
  res.status(200).json({
    status: 'OK',
    timestamp: new Date().toISOString(),
    service: 'user-service'
  });
});

// Root route
app.get('/', (req, res) => {
  res.json({ message: 'Welcome to User Service API' });
});

// 404 handler: mounted without a path so it matches every request that no
// earlier route answered. A bare '*' path string is rejected by the route
// matcher used in Express 5, so it is avoided here.
app.use((req, res) => {
  res.status(404).json({
    error: 'Not Found',
    message: 'Endpoint not found'
  });
});

// Error-handling middleware (identified by its four-argument signature).
// Registered last so it receives errors from everything above.
app.use((err, req, res, next) => {
  console.error(err.stack);

  // Once the response has started, a JSON body can no longer be sent;
  // delegate to the framework's default handler instead.
  if (res.headersSent) {
    return next(err);
  }

  res.status(500).json({
    error: 'Something went wrong!',
    // Only expose the raw error message while developing
    message: process.env.NODE_ENV === 'development' ? err.message : undefined
  });
});

module.exports = app;

服务路由设计

// routes/userRoutes.js
const express = require('express');
const router = express.Router();

// In-memory user store for demonstration (a real project should use a database)
const users = [
  { id: 1, name: 'Alice', email: 'alice@example.com' },
  { id: 2, name: 'Bob', email: 'bob@example.com' }
];

// Next free id: one more than the largest id currently stored, so ids stay
// unique even if entries are ever removed from the list.
const nextUserId = () =>
  users.reduce((max, user) => Math.max(max, user.id), 0) + 1;

// List all users
router.get('/', (req, res) => {
  res.json({
    data: users,
    count: users.length
  });
});

// Fetch a single user by numeric id; responds 404 when no user matches
router.get('/:id', (req, res) => {
  // Explicit radix so the id is always parsed as decimal
  const id = Number.parseInt(req.params.id, 10);
  const user = users.find(u => u.id === id);

  if (!user) {
    return res.status(404).json({
      error: 'User not found'
    });
  }

  res.json({ data: user });
});

// Create a user; responds 400 unless both name and email are provided
router.post('/', (req, res) => {
  // req.body can be undefined when no body parser handled the request
  const { name, email } = req.body || {};

  if (!name || !email) {
    return res.status(400).json({
      error: 'Name and email are required'
    });
  }

  const newUser = {
    id: nextUserId(),
    name,
    email
  };

  users.push(newUser);

  res.status(201).json({
    data: newUser,
    message: 'User created successfully'
  });
});

module.exports = router;

Docker容器化部署

Dockerfile配置

# Dockerfile
FROM node:18-alpine

# Create the non-root runtime user and its group up front
RUN addgroup -g 1001 -S nodejs \
  && adduser -S nextjs -u 1001 -G nodejs

# Working directory
WORKDIR /app

# Copy the package manifests first so the dependency layer can be cached
COPY package*.json ./

# Install production dependencies only
# (--omit=dev is the current spelling of the deprecated --only=production)
RUN npm ci --omit=dev

# Copy the application code
COPY . .

# logger/index.js creates ../logs next to the code at startup; pre-create it
# and hand it to the runtime user, which cannot write to the root-owned /app
RUN mkdir -p /app/logs && chown nextjs:nodejs /app/logs

# Exposed port
EXPOSE 3000

# Drop privileges for everything that runs in the container
USER nextjs

# Health check: the alpine base image does not ship curl, so use BusyBox wget
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD wget -q --spider http://localhost:3000/health || exit 1

# Start command
# NOTE(review): only app.js (which exports the app without listening) is shown
# in this article; server.js is assumed to import it and call listen() — confirm.
CMD ["node", "server.js"]

docker-compose配置

# docker-compose.yml
# Three services on one bridge network: the user service, Redis, and an nginx gateway.
version: '3.8'

services:
  # User service: built from the Dockerfile in this directory, published on host port 3000
  user-service:
    build: .
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
      # "redis" resolves to the redis service below on the shared network
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    networks:
      - microservice-network
    restart: unless-stopped

  # Redis cache service; /data is kept in the redis-data named volume
  redis:
    image: redis:7-alpine
    ports:
      # NOTE(review): this also publishes Redis on the host — confirm that is intended
      - "6379:6379"
    volumes:
      - redis-data:/data
    networks:
      - microservice-network
    restart: unless-stopped

  # API gateway (optional): nginx configured from the local nginx.conf
  api-gateway:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - user-service
    networks:
      - microservice-network
    restart: unless-stopped

volumes:
  redis-data:

networks:
  microservice-network:
    driver: bridge

环境变量管理

// config/index.js
require('dotenv').config();

/**
 * Parses an environment value as a base-10 integer.
 * Environment variables are always strings, so numeric settings need an
 * explicit conversion to keep their type stable.
 * @param {string|undefined} value - Raw value, usually from process.env.
 * @param {number} fallback - Returned when value is missing or not an integer.
 * @returns {number} The parsed integer or the fallback.
 */
const toInt = (value, fallback) => {
  const parsed = Number.parseInt(value, 10);
  return Number.isNaN(parsed) ? fallback : parsed;
};

// Central runtime configuration, resolved once from the environment
const config = {
  port: toInt(process.env.PORT, 3000),
  environment: process.env.NODE_ENV || 'development',
  redis: {
    url: process.env.REDIS_URL || 'redis://localhost:6379',
    password: process.env.REDIS_PASSWORD || null,
    db: toInt(process.env.REDIS_DB, 0)
  },
  jwt: {
    // NOTE(review): the fallback is a placeholder, not a real secret;
    // JWT_SECRET should be set explicitly outside local development.
    secret: process.env.JWT_SECRET || 'your-secret-key',
    expiresIn: process.env.JWT_EXPIRES_IN || '24h'
  }
};

module.exports = config;

Redis缓存集成

缓存客户端初始化

// services/redisClient.js
const redis = require('redis');
const config = require('../config');

/**
 * Thin wrapper around a node-redis client that JSON-serializes values and
 * converts command failures into boolean/null results instead of throwing.
 *
 * The client API used here (connect(), setEx()) is the promise-based one, so
 * the connection options use the matching names: `database` and
 * `socket.reconnectStrategy` rather than the legacy `db` / `retry_strategy`.
 */
class RedisClient {
  /**
   * Decides how long to wait before the next reconnect attempt.
   * @param {number} retries - Number of reconnect attempts made so far.
   * @param {Error} [cause] - Error that triggered the reconnect, if any.
   * @returns {number|Error} Delay in milliseconds, or an Error to stop retrying.
   */
  static reconnectStrategy(retries, cause) {
    if (cause && cause.code === 'ECONNREFUSED') {
      return new Error('Redis server connection refused');
    }
    if (retries > 10) {
      return new Error('Retry time exhausted');
    }
    // Linear back-off, capped at 3 seconds
    return Math.min(retries * 100, 3000);
  }

  constructor() {
    this.client = redis.createClient({
      url: config.redis.url,
      password: config.redis.password || undefined,
      // Environment-sourced values may arrive as strings
      database: Number(config.redis.db) || 0,
      socket: {
        reconnectStrategy: RedisClient.reconnectStrategy
      }
    });

    this.client.on('error', (err) => {
      console.error('Redis Client Error:', err);
    });

    this.client.on('connect', () => {
      console.log('Redis client connected');
    });

    this.client.on('ready', () => {
      console.log('Redis client ready');
    });
  }

  /**
   * Opens the connection.
   * @returns {Promise<object>} The underlying client once connected.
   * @throws Rethrows the connection error after logging it.
   */
  async connect() {
    try {
      await this.client.connect();
      return this.client;
    } catch (error) {
      console.error('Failed to connect to Redis:', error);
      throw error;
    }
  }

  /** Closes the connection via QUIT. */
  async disconnect() {
    await this.client.quit();
  }

  /**
   * Stores a JSON-serialized value with an expiry.
   * @param {string} key
   * @param {*} value - Any JSON-serializable value.
   * @param {number} [expireSeconds=3600] - Time to live in seconds.
   * @returns {Promise<boolean>} true on success, false if the command failed.
   */
  async set(key, value, expireSeconds = 3600) {
    try {
      const serializedValue = JSON.stringify(value);
      await this.client.setEx(key, expireSeconds, serializedValue);
      return true;
    } catch (error) {
      console.error('Redis SET error:', error);
      return false;
    }
  }

  /**
   * Reads and JSON-parses a value.
   * @param {string} key
   * @returns {Promise<*>} The parsed value, or null when the key is missing
   *   or the command/parse failed.
   */
  async get(key) {
    try {
      const value = await this.client.get(key);
      return value ? JSON.parse(value) : null;
    } catch (error) {
      console.error('Redis GET error:', error);
      return null;
    }
  }

  /**
   * Deletes a key.
   * @param {string} key
   * @returns {Promise<boolean>} true on success, false if the command failed.
   */
  async del(key) {
    try {
      await this.client.del(key);
      return true;
    } catch (error) {
      console.error('Redis DEL error:', error);
      return false;
    }
  }

  /**
   * Returns the raw output of the INFO command (not a computed hit rate).
   * @returns {Promise<string|null>} INFO text, or null if the command failed.
   */
  async getStats() {
    try {
      const info = await this.client.info();
      return info;
    } catch (error) {
      console.error('Redis stats error:', error);
      return null;
    }
  }
}

module.exports = new RedisClient();

缓存策略实现

// middleware/cacheMiddleware.js
const redisClient = require('../services/redisClient');

/**
 * Builds an Express middleware that caches JSON responses in Redis.
 *
 * Only GET requests are looked up or stored, and only 2xx responses are
 * written to the cache, so writes (POST/PUT/DELETE) always reach their
 * handler and error bodies are never replayed as successes.
 *
 * @param {number} [duration=3600] - Cache time to live in seconds.
 * @returns {Function} Express middleware `(req, res, next)`.
 */
const cacheMiddleware = (duration = 3600) => {
  return async (req, res, next) => {
    // The cache key is derived from the URL alone, so other methods on the
    // same URL must bypass the cache entirely.
    if (req.method !== 'GET') {
      return next();
    }

    const key = `cache:${req.originalUrl || req.url}`;

    try {
      // Try the cache first
      const cachedData = await redisClient.get(key);

      if (cachedData) {
        console.log(`Cache hit for: ${key}`);

        // Only plain objects can be annotated; arrays and primitives are
        // returned unchanged so their shape is preserved.
        const isPlainObject =
          typeof cachedData === 'object' && !Array.isArray(cachedData);

        return res.json(
          isPlainObject
            ? {
                ...cachedData,
                cached: true,
                timestamp: new Date().toISOString()
              }
            : cachedData
        );
      }

      // Cache miss: wrap res.json so the handler's payload is stored on the way out
      const originalJson = res.json;
      res.json = function(data) {
        if (res.statusCode >= 200 && res.statusCode < 300) {
          // Fire-and-forget: a cache write must never delay or fail the response
          redisClient.set(key, data, duration)
            .catch(err => console.error('Cache set error:', err));
        }

        return originalJson.call(this, data);
      };

      next();
    } catch (error) {
      console.error('Cache middleware error:', error);
      next();
    }
  };
};

module.exports = cacheMiddleware;

高级缓存模式

// services/cacheService.js
const redisClient = require('./redisClient');

/**
 * Static helpers that apply key conventions and default TTLs on top of the
 * shared Redis client wrapper.
 */
class CacheService {
  /**
   * Caches one user's data under `user:<id>`.
   * @param {string|number} userId
   * @param {*} userData - JSON-serializable user data.
   * @param {number} [expireSeconds=3600] - Time to live in seconds (default 1 hour).
   * @returns {Promise<boolean>} Result of the underlying set call.
   */
  static async cacheUser(userId, userData, expireSeconds = 3600) {
    const key = `user:${userId}`;
    return await redisClient.set(key, userData, expireSeconds);
  }

  /**
   * Reads one user's cached data.
   * @param {string|number} userId
   * @returns {Promise<*>} Cached data, or null when absent.
   */
  static async getCachedUser(userId) {
    const key = `user:${userId}`;
    return await redisClient.get(key);
  }

  /**
   * Caches list data under a caller-supplied key.
   * @param {string} key
   * @param {*} data
   * @param {number} [expireSeconds=1800] - Time to live in seconds.
   * @returns {Promise<boolean>} Result of the underlying set call.
   */
  static async cacheList(key, data, expireSeconds = 1800) {
    return await redisClient.set(key, data, expireSeconds);
  }

  /**
   * Reads cached list data.
   * @param {string} key
   * @returns {Promise<*>} Cached data, or null when absent.
   */
  static async getCachedList(key) {
    return await redisClient.get(key);
  }

  /**
   * Removes every cache entry associated with a user.
   * @param {string|number} userId
   * @returns {Promise<void>}
   */
  static async clearUserCache(userId) {
    const relatedKeys = [
      `user:${userId}`,
      `user:profile:${userId}`,
      `user:settings:${userId}`
    ];

    // The deletions are independent, so issue them together
    await Promise.all(relatedKeys.map((key) => redisClient.del(key)));
  }

  /**
   * Walks the existing `user:*` entries and logs each one that holds an object.
   * Acts as a hook where real warm-up logic can be added.
   * NOTE(review): KEYS scans the whole keyspace; consider an incremental
   * SCAN-based iteration for large datasets.
   * @returns {Promise<void>}
   */
  static async warmupCache() {
    try {
      const keys = await redisClient.client.keys('user:*');
      console.log(`Found ${keys.length} cached users`);

      for (const key of keys) {
        const data = await redisClient.get(key);
        if (data && typeof data === 'object') {
          // Cache warm-up logic can be added here
          console.log(`Warmup cache for: ${key}`);
        }
      }
    } catch (error) {
      console.error('Cache warmup error:', error);
    }
  }

  /**
   * Fetches server statistics from the Redis client wrapper.
   * @returns {Promise<{status: string, data?: *, message?: string}>}
   *   `status: 'success'` with the data, or `status: 'error'` with a message.
   */
  static async getCacheStats() {
    try {
      const info = await redisClient.getStats();

      // The wrapper signals failure by returning null rather than throwing
      if (info === null || info === undefined) {
        return {
          status: 'error',
          message: 'Redis stats unavailable'
        };
      }

      return {
        status: 'success',
        data: info
      };
    } catch (error) {
      return {
        status: 'error',
        message: error.message
      };
    }
  }
}

module.exports = CacheService;

服务注册与发现

服务注册实现

// services/serviceRegistry.js
const redisClient = require('./redisClient');
const config = require('../config');

/**
 * Redis-backed service registry. Each instance is stored under
 * `service:<name>:<id>` with a short TTL and kept alive by periodic heartbeats.
 *
 * The Redis wrapper reports failures through its boolean return value rather
 * than by throwing, so every write result is checked explicitly.
 */
class ServiceRegistry {
  constructor() {
    this.serviceKeyPrefix = 'service:';
    this.heartbeatInterval = 30000; // heartbeat every 30 seconds
    this.serviceTtlSeconds = 60; // entries expire after 1 minute without a heartbeat
    // Registrations made by this process, so an expired entry can be restored
    this.localServices = new Map();
  }

  /**
   * Registers a service instance.
   * @param {{name: string, id: string|number}} serviceInfo - Instance
   *   description; all of its fields are stored alongside status metadata.
   * @returns {Promise<boolean>} true when the entry was stored.
   */
  async registerService(serviceInfo) {
    const serviceKey = `${this.serviceKeyPrefix}${serviceInfo.name}:${serviceInfo.id}`;

    try {
      const now = new Date().toISOString();
      const stored = await redisClient.set(serviceKey, {
        ...serviceInfo,
        registeredAt: now,
        lastHeartbeat: now,
        status: 'healthy'
      }, this.serviceTtlSeconds);

      if (!stored) {
        console.error(`Service registration failed: ${serviceInfo.name}`);
        return false;
      }

      this.localServices.set(serviceKey, serviceInfo);
      console.log(`Service registered: ${serviceInfo.name}`);
      return true;
    } catch (error) {
      console.error('Service registration error:', error);
      return false;
    }
  }

  /**
   * Refreshes an instance's heartbeat timestamp and TTL.
   * @param {string} serviceName
   * @param {string|number} serviceId
   * @returns {Promise<boolean>} true when the entry existed and was rewritten.
   */
  async updateHeartbeat(serviceName, serviceId) {
    const serviceKey = `${this.serviceKeyPrefix}${serviceName}:${serviceId}`;

    try {
      const serviceInfo = await redisClient.get(serviceKey);
      if (!serviceInfo) {
        return false;
      }

      serviceInfo.lastHeartbeat = new Date().toISOString();
      const stored = await redisClient.set(serviceKey, serviceInfo, this.serviceTtlSeconds);
      return Boolean(stored);
    } catch (error) {
      console.error('Heartbeat update error:', error);
      return false;
    }
  }

  /**
   * Lists every registered instance of a service.
   * NOTE(review): KEYS scans the whole keyspace; consider an incremental
   * SCAN-based iteration if the registry grows large.
   * @param {string} serviceName
   * @returns {Promise<object[]>} Stored instance records; empty on error.
   */
  async getServices(serviceName) {
    try {
      const keys = await redisClient.client.keys(`${this.serviceKeyPrefix}${serviceName}:*`);

      // The lookups are independent, so fetch them together
      const records = await Promise.all(keys.map((key) => redisClient.get(key)));

      // An entry may expire between KEYS and GET; drop those
      return records.filter(Boolean);
    } catch (error) {
      console.error('Get services error:', error);
      return [];
    }
  }

  /**
   * Lists the instances of a service whose stored status is 'healthy'.
   * @param {string} serviceName
   * @returns {Promise<object[]>}
   */
  async getHealthyServices(serviceName) {
    const allServices = await this.getServices(serviceName);
    return allServices.filter(service => service.status === 'healthy');
  }

  /**
   * Removes an instance from the registry.
   * @param {string} serviceName
   * @param {string|number} serviceId
   * @returns {Promise<boolean>} true when the entry was deleted.
   */
  async unregisterService(serviceName, serviceId) {
    const serviceKey = `${this.serviceKeyPrefix}${serviceName}:${serviceId}`;

    try {
      // Forget it locally first so a running heartbeat does not restore it
      this.localServices.delete(serviceKey);

      const removed = await redisClient.del(serviceKey);
      if (!removed) {
        console.error(`Service unregistration failed: ${serviceName}`);
        return false;
      }

      console.log(`Service unregistered: ${serviceName}`);
      return true;
    } catch (error) {
      console.error('Service unregistration error:', error);
      return false;
    }
  }

  /**
   * Starts the periodic heartbeat for an instance. If the entry has expired
   * and this process registered it, the registration is restored.
   * @param {string} serviceName
   * @param {string|number} serviceId
   * @returns {object} The interval handle; pass it to clearInterval() to stop.
   */
  startHeartbeat(serviceName, serviceId) {
    const serviceKey = `${this.serviceKeyPrefix}${serviceName}:${serviceId}`;

    return setInterval(async () => {
      try {
        const alive = await this.updateHeartbeat(serviceName, serviceId);
        const known = this.localServices.get(serviceKey);

        if (!alive && known) {
          await this.registerService(known);
        }
      } catch (error) {
        // Never let a heartbeat failure surface as an unhandled rejection
        console.error('Heartbeat error:', error);
      }
    }, this.heartbeatInterval);
  }
}

module.exports = new ServiceRegistry();

服务发现客户端

// clients/serviceDiscoveryClient.js
const serviceRegistry = require('../services/serviceRegistry');

class ServiceDiscoveryClient {
  constructor() {
    this.discoveryCache = new Map();
    this.cacheTimeout = 5000; // 5秒缓存
  }

  // 发现服务实例
  async discoverService(serviceName) {
    const cacheKey = `discovery:${serviceName}`;
    
    // 检查缓存
    if (this.discoveryCache.has(cacheKey)) {
      const cached = this.discoveryCache.get(cacheKey);
      if (Date.now() - cached.timestamp < this.cacheTimeout) {
        return cached.data;
      }
    }

    try {
      const services = await serviceRegistry.getHealthyServices(serviceName);
      
      // 缓存结果
      this.discoveryCache.set(cacheKey, {
        data: services,
        timestamp: Date.now()
      });
      
      return services;
    } catch (error) {
      console.error('Service discovery error:', error);
      return [];
    }
  }

  // 负载均衡选择服务实例
  async selectServiceInstance(serviceName) {
    const services = await this.discoverService(serviceName);
    
    if (services.length === 0) {
      throw new Error(`No healthy instances found for service: ${serviceName}`);
    }

    // 简单的轮询负载均衡
    const randomIndex = Math.floor(Math.random() * services.length);
    return services[randomIndex];
  }

  // 获取服务健康状态
  async getServiceHealth(serviceName, serviceId) {
    try {
      const key = `service:${serviceName}:${serviceId}`;
      const serviceInfo = await redisClient.get(key);
      
      if (serviceInfo) {
        return {
          status: serviceInfo.status,
          lastHeartbeat: serviceInfo.lastHeartbeat
        };
      }
      
      return { status: 'unknown' };
    } catch (error) {
      console.error('Get service health error:', error);
      return { status: 'error' };
    }
  }
}

module.exports = new ServiceDiscoveryClient();

高可用性设计

服务健康检查

// middleware/healthCheck.js
const redisClient = require('../services/redisClient');

/**
 * Express handler that reports aggregate service health.
 * Responds 200 with a per-dependency summary when every check resolves, or
 * 503 with the first failure's message when any check rejects.
 */
const healthCheckMiddleware = async (req, res, next) => {
  // Kick off both probes before awaiting: database first, then Redis
  const probes = [checkDatabase(), checkRedis()];

  try {
    await Promise.all(probes);

    const report = {
      status: 'healthy',
      timestamp: new Date().toISOString(),
      checks: {
        redis: 'healthy',
        database: 'healthy'
      }
    };
    res.status(200).json(report);
  } catch (error) {
    console.error('Health check failed:', error);

    const failure = {
      status: 'unhealthy',
      error: error.message,
      timestamp: new Date().toISOString()
    };
    res.status(503).json(failure);
  }
};

/**
 * Verifies Redis connectivity with a PING.
 * @returns {Promise<boolean>} Resolves to true when Redis answers 'PONG'.
 * @throws {Error} Prefixed with 'Redis health check failed: ' when the ping
 *   fails or returns anything else.
 */
async function checkRedis() {
  // Every failure is reported with the same prefix
  const asHealthError = (cause) =>
    new Error(`Redis health check failed: ${cause.message}`);

  let reply;
  try {
    reply = await redisClient.client.ping();
  } catch (cause) {
    throw asHealthError(cause);
  }

  if (reply !== 'PONG') {
    throw asHealthError(new Error('Redis ping failed'));
  }

  return true;
}

/**
 * Placeholder database health check.
 * Currently performs no I/O and always resolves to true.
 * @returns {Promise<boolean>} Always true.
 */
async function checkDatabase() {
  // Database connectivity check logic can be added here
  // Example: run a simple query
  return true;
}

module.exports = healthCheckMiddleware;

容错机制实现

// middleware/fallbackMiddleware.js
const redisClient = require('../services/redisClient');

/**
 * Fetches data through a caller-supplied function and keeps two copies of
 * the last good result (in process memory and in Redis) to fall back on
 * when the function fails.
 */
class FallbackService {
  constructor() {
    this.fallbackCache = new Map();
    this.cacheTimeout = 60000; // in-memory entries count as fresh for 1 minute
  }

  /**
   * Returns data for a key, preferring a fresh in-memory copy, then the
   * supplied function, then the Redis copy, then a stale in-memory copy.
   * @param {string} key - Logical name; stored under `fallback:<key>`.
   * @param {Function} fallbackFn - Async function that produces the data.
   * @param {number} [cacheDuration=300] - Redis time to live in seconds.
   * @returns {Promise<*>} The data.
   * @throws Rethrows fallbackFn's error when no cached copy exists anywhere.
   */
  async getFallbackData(key, fallbackFn, cacheDuration = 300) {
    const cacheKey = `fallback:${key}`;
    const memoryEntry = this.fallbackCache.get(cacheKey);

    // Serve from memory while the entry is still fresh
    if (memoryEntry && Date.now() - memoryEntry.timestamp < this.cacheTimeout) {
      return memoryEntry.data;
    }

    try {
      const data = await fallbackFn();

      // Remember the result locally
      this.fallbackCache.set(cacheKey, {
        data,
        timestamp: Date.now()
      });

      // Also keep a copy in Redis (optional second tier)
      await redisClient.set(cacheKey, data, cacheDuration);

      return data;
    } catch (error) {
      console.error('Fallback service error:', error);

      // Prefer the previously cached Redis copy
      const cachedData = await redisClient.get(cacheKey);
      if (cachedData) {
        return cachedData;
      }

      // Last resort: an expired in-memory copy is still better than failing
      if (memoryEntry) {
        return memoryEntry.data;
      }

      throw error;
    }
  }

  /**
   * Drops both cached copies for a key.
   * @param {string} key
   * @returns {Promise<void>}
   */
  async clearCache(key) {
    const cacheKey = `fallback:${key}`;
    this.fallbackCache.delete(cacheKey);
    await redisClient.del(cacheKey);
  }
}

const fallbackService = new FallbackService();

/**
 * Builds an Express middleware that resolves data through the fallback
 * service and exposes it as `req.fallbackData`.
 * @param {string} fallbackKey - Cache key for the data.
 * @param {Function} fallbackFn - Async function that produces the data.
 * @param {number} [cacheDuration=300] - Redis time to live in seconds.
 * @returns {Function} Express middleware `(req, res, next)`.
 */
const fallbackMiddleware = (fallbackKey, fallbackFn, cacheDuration = 300) => {
  return async (req, res, next) => {
    try {
      const data = await fallbackService.getFallbackData(
        fallbackKey,
        fallbackFn,
        cacheDuration
      );

      req.fallbackData = data;
      next();
    } catch (error) {
      console.error('Fallback middleware error:', error);
      // 503 matches the "temporarily unavailable" condition reported below
      res.status(503).json({
        error: 'Service temporarily unavailable',
        message: 'Please try again later'
      });
    }
  };
};

module.exports = { fallbackMiddleware, fallbackService };

监控与日志

日志系统集成

// logger/index.js
const winston = require('winston');
const fs = require('fs');
const path = require('path');

// Make sure the log directory exists before the file transports open it
const logDir = path.join(__dirname, '../logs');
if (!fs.existsSync(logDir)) {
  fs.mkdirSync(logDir, { recursive: true });
}

// Application logger: JSON records with timestamps and error stacks, written
// to two size-rotated files and echoed to the console.
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.errors({ stack: true }),
    winston.format.json()
  ),
  defaultMeta: { service: 'user-service' },
  transports: [
    // Error-level records only
    new winston.transports.File({
      filename: path.join(logDir, 'error.log'),
      level: 'error',
      maxsize: 5242880, // 5MB
      maxFiles: 5
    }),
    // All records at the logger's level and above
    new winston.transports.File({
      filename: path.join(logDir, 'combined.log'),
      maxsize: 5242880,
      maxFiles: 5
    }),
    // Console output, colorized and in the simple text format
    new winston.transports.Console({
      format: winston.format.combine(
        winston.format.colorize(),
        winston.format.simple()
      )
    })
  ]
});

/**
 * Express middleware that logs one 'HTTP Request' record per request once
 * the response has finished, including the elapsed time in milliseconds.
 */
const requestLogger = (req, res, next) => {
  const startedAt = Date.now();

  // Runs when the response has been fully handed off
  const logCompletedRequest = () => {
    const elapsedMs = Date.now() - startedAt;

    logger.info('HTTP Request', {
      method: req.method,
      url: req.url,
      statusCode: res.statusCode,
      duration: `${elapsedMs}ms`,
      ip: req.ip,
      userAgent: req.get('User-Agent')
    });
  };

  res.on('finish', logCompletedRequest);

  next();
};

module.exports = { logger, requestLogger };

性能监控

// middleware/performanceMonitor.js
const express = require('express');
const router = express.Router();

/**
 * In-process request metrics: totals, 5xx count, running average response
 * time and a per-endpoint request counter.
 *
 * Endpoints are labelled by route pattern (e.g. `GET /users/:id`) when the
 * request matched a route, and the number of distinct labels is capped, so
 * the counter map cannot grow without bound.
 */
class PerformanceMonitor {
  /**
   * @param {object} [options]
   * @param {number} [options.maxTrackedEndpoints=1000] - Maximum number of
   *   distinct endpoint labels; further labels are counted under 'other'.
   */
  constructor({ maxTrackedEndpoints = 1000 } = {}) {
    this.metrics = {
      requests: 0,
      errors: 0,
      averageResponseTime: 0,
      startTime: Date.now()
    };

    this.requestCounts = new Map();
    this.maxTrackedEndpoints = maxTrackedEndpoints;
  }

  /**
   * Builds the counter label for a request.
   * @param {object} req - Express request.
   * @returns {string} `<METHOD> <route pattern>` when a string route pattern
   *   is available, otherwise `<METHOD> <concrete path>`.
   */
  static endpointLabel(req) {
    const routePath = req.route && req.route.path;
    const path = typeof routePath === 'string'
      ? `${req.baseUrl || ''}${routePath}`
      : req.path;

    return `${req.method} ${path}`;
  }

  /**
   * Middleware body: measures the request and updates the metrics when the
   * response finishes.
   * @param {object} req - Express request.
   * @param {object} res - Express response.
   * @param {Function} next - Express continuation; always called.
   */
  recordRequest(req, res, next) {
    const start = process.hrtime.bigint();

    res.on('finish', () => {
      const end = process.hrtime.bigint();
      const duration = Number(end - start) / 1000000; // nanoseconds to milliseconds

      // Incremental running mean
      this.metrics.requests++;
      this.metrics.averageResponseTime +=
        (duration - this.metrics.averageResponseTime) / this.metrics.requests;

      // Resolved at finish time, when the matched route (if any) is known
      let endpoint = PerformanceMonitor.endpointLabel(req);
      const isNewLabel = !this.requestCounts.has(endpoint);
      if (isNewLabel && this.requestCounts.size >= this.maxTrackedEndpoints) {
        endpoint = 'other';
      }
      this.requestCounts.set(endpoint,
        (this.requestCounts.get(endpoint) || 0) + 1);

      if (res.statusCode >= 500) {
        this.metrics.errors++;
      }
    });

    next();
  }

  /**
   * Returns a snapshot of the metrics.
   * @returns {object} Totals, uptime in whole seconds since the last reset,
   *   and the per-endpoint counts as a plain object.
   */
  getMetrics() {
    return {
      ...this.metrics,
      uptime: Math.floor((Date.now() - this.metrics.startTime) / 1000),
      requestCounts: Object.fromEntries(this.requestCounts)
    };
  }

  /** Clears every counter and restarts the uptime clock. */
  resetMetrics() {
    this.metrics = {
      requests: 0,
      errors: 0,
      averageResponseTime: 0,
      startTime: Date.now()
    };
    this.requestCounts.clear();
  }
}

// Single shared monitor used by both the metrics route and the middleware
const monitor = new PerformanceMonitor();

// Metrics endpoint: returns the current snapshot as JSON
router.get('/metrics', (req, res) => {
  res.json(monitor.getMetrics());
});

// Performance-monitoring middleware: records each request on the shared monitor
const performanceMiddleware = (req, res, next) => {
  monitor.recordRequest(req, res, next);
};

module.exports = { performanceMiddleware, router: router };

完整的服务实现

用户服务完整实现

// services/userService.js
const redisClient = require('../services/redisClient');
const CacheService = require('../services/cacheService');

class UserService {
  constructor() {
    this.cachePrefix = 'user:';
    this.cacheTTL = 3600; // 1小时
  }

  // 获取用户列表(带缓存)
  async getUsers() {
    const cacheKey = `${this.cachePrefix}list`;
    
    try {
      // 先尝试从缓存获取
      let users = await CacheService.getCachedList(cacheKey);
      
      if (!users) {
        // 缓存未命中,从数据库或其他数据源获取
        users = await this.fetchUsersFromSource();
        
        // 存储到缓存
        await CacheService.cacheList(cacheKey, users, this.cacheTTL);
      }
      
      return users;
    } catch (error) {
      console.error('Get users error:', error);
      throw error;
    }
  }

  // 获取单个用户(带缓存)
  async getUserById(id) {
    const cacheKey = `${this.cachePrefix}${id}`;
    
    try {
      // 先尝试从缓存获取
      let user = await CacheService.getCachedUser(id);
      
      if (!user) {
        // 缓存未命中,从数据库或其他数据源获取
        user = await this.fetchUserFromSource(id);
        
        if (user) {
          // 存储到缓存
          await CacheService.cacheUser(id, user, this.cacheTTL);
        }
      }
      
      return user;
    } catch (error) {
      console.error('Get user error:',
相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000