Node.js高并发应用异常监控体系建设:基于Prometheus和Grafana的实时异常检测与告警机制

编程语言译者
编程语言译者 2025-12-19T01:06:00+08:00
0 0 1

引言

在现代Web应用开发中,Node.js凭借其事件驱动、非阻塞I/O模型,在处理高并发场景时表现出色。然而,高并发也带来了复杂的异常问题,如内存泄漏、连接超时、请求堆积等。构建一个完善的异常监控体系对于保障系统稳定性和快速定位问题至关重要。

本文将详细介绍如何为Node.js高并发应用构建完整的异常监控体系,涵盖异常捕获、日志收集、指标监控、告警通知等核心组件,并通过Prometheus和Grafana实现实时异常检测和可视化监控。

现代Node.js应用的异常挑战

高并发环境下的异常特点

在高并发场景下,Node.js应用面临的异常问题具有以下特点:

  1. 瞬时性:异常往往在高负载瞬间发生,难以复现
  2. 复杂性:多个请求同时处理可能导致复杂的异常链
  3. 隐蔽性:部分异常可能不会立即导致应用崩溃,但会影响性能
  4. 资源竞争:并发访问可能引发内存、CPU、连接等资源争用

典型的高并发异常场景

// Simulated high-concurrency failure modes (intentionally bad code, for illustration only).
const express = require('express');
const app = express();

// Bad practice: state accumulating in a module-level variable.
let globalData = [];

app.get('/leak', (req, res) => {
    // Every request appends a 1,000,000-element array to the module-level array and
    // nothing ever removes it, so memory grows with each request (a leak).
    globalData.push(new Array(1000000).fill('data'));
    res.json({ status: 'ok' });
});

// Bad practice: unhandled asynchronous error.
app.get('/async-error', async (req, res) => {
    // The promise below always rejects after 1 s and nothing catches it, so the
    // `await` throws out of this async handler and the handler's promise rejects.
    // NOTE(review): Express 4 does not forward rejected handler promises to error
    // middleware, so this surfaces as an unhandled rejection (fatal by default on
    // Node 15+) — confirm for the Express version in use.
    await new Promise((resolve, reject) => {
        setTimeout(() => {
            reject(new Error('Async error occurred'));
        }, 1000);
    });
    // Never reached: the awaited promise always rejects.
    res.json({ status: 'ok' });
});

异常捕获体系建设

全局错误处理机制

Node.js应用需要建立多层次的异常捕获机制:

const cluster = require('cluster');
const numCPUs = require('os').cpus().length;

// `cluster.isMaster` is deprecated since Node 16 in favour of `cluster.isPrimary`;
// fall back to the old name on older runtimes.
const isPrimary = cluster.isPrimary ?? cluster.isMaster;

// Turn a thrown value / rejection reason into a readable message. The value is not
// guaranteed to be an Error (e.g. `Promise.reject('x')` or `reject()`), so reading
// `.message` directly could yield undefined or throw on undefined/null.
function describeFailure(value) {
    return value instanceof Error ? value.message : String(value);
}

// Install process-level safety nets. Every worker is a separate process with its own
// `process` object, so these must run in the primary AND in each worker — the workers
// are where request handling, and therefore most failures, happens.
function installProcessErrorHandlers() {
    // Uncaught synchronous exceptions.
    process.on('uncaughtException', (err) => {
        console.error('未捕获的异常:', err);
        // NOTE(review): `sendAlert` is not defined in this snippet; it is assumed to be
        // provided elsewhere. If it is asynchronous, process.exit below may run before it finishes.
        sendAlert('Uncaught Exception', describeFailure(err));
        // Process state is undefined after an uncaught exception: exit so the primary
        // can fork a fresh replacement worker.
        process.exit(1);
    });

    // Promise rejections nobody handled.
    process.on('unhandledRejection', (reason) => {
        console.error('未处理的Promise拒绝:', reason);
        sendAlert('Unhandled Promise Rejection', describeFailure(reason));
    });
}

installProcessErrorHandlers();

if (isPrimary) {
    console.log(`主进程 ${process.pid} 正在运行`);

    // Fork one worker per CPU core.
    for (let i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    // Replace workers that die unexpectedly.
    cluster.on('exit', (worker, code, signal) => {
        console.log(`工作进程 ${worker.process.pid} 已退出`, { code, signal });
        // Workers stopped on purpose via disconnect()/kill() set exitedAfterDisconnect;
        // resurrecting them would defeat a deliberate shutdown.
        if (worker.exitedAfterDisconnect) {
            return;
        }
        cluster.fork();
    });
} else {
    // Worker process: runs the HTTP application.
    const express = require('express');
    const app = express();

    // Application routes must be registered ABOVE this point: Express only passes
    // errors to error-handling middleware declared after the routes.

    // Application-level error-handling middleware (4-argument signature).
    app.use((err, req, res, next) => {
        console.error('应用错误:', err);
        sendAlert('Application Error', describeFailure(err));
        // Once headers are sent the response cannot be rewritten; Express requires
        // delegating to its default handler, which closes the connection.
        if (res.headersSent) {
            return next(err);
        }
        res.status(500).json({ error: 'Internal Server Error' });
    });

    // All workers share port 3000 through the cluster module.
    app.listen(3000, () => {
        console.log(`工作进程 ${process.pid} 已启动`);
    });
}

自定义异常捕获工具

// exceptionHandler.js
/**
 * In-process error aggregator: groups errors by a stable fingerprint, counts
 * occurrences, logs each one and forwards it to the alert channel.
 */
class ExceptionHandler {
    constructor() {
        // Total number of captureError() calls.
        this.errorCount = 0;
        // fingerprint -> aggregated record (first occurrence plus counters).
        this.errors = new Map();
    }

    /**
     * Capture and record an error.
     *
     * @param {Error|*} error - the thrown value; non-Error values are wrapped in an Error.
     * @param {object} [context={}] - request/business context attached to the record and alert.
     * @returns {string} the fingerprint under which the error was aggregated.
     */
    captureError(error, context = {}) {
        // `throw 'x'` / `reject(undefined)` produce non-Error values; normalise so
        // `.message` and `.stack` always exist.
        const err = error instanceof Error ? error : new Error(String(error));
        const errorId = this.generateErrorId(err);
        const timestamp = Date.now();

        // Update error statistics.
        const existing = this.errors.get(errorId);
        if (existing) {
            existing.count += 1;
            existing.lastOccurrence = timestamp;
        } else {
            this.errors.set(errorId, {
                id: errorId,
                message: err.message,
                stack: err.stack,
                context,
                timestamp,
                lastOccurrence: timestamp,
                count: 1
            });
        }

        this.errorCount++;
        console.error(`[ERROR] ${err.message}`, context);

        // Send alert.
        this.sendAlert(err, context);
        return errorId;
    }

    /**
     * Build a stable fingerprint from the error name, message and throw site.
     *
     * The same error thrown from the same place must map to the same id; mixing a
     * timestamp into the hash would give every occurrence a new id, so `count`
     * would never exceed 1 and `this.errors` would grow without bound.
     *
     * @param {Error} error
     * @returns {string} hex MD5 digest (used as a grouping key, not for security).
     */
    generateErrorId(error) {
        const name = error?.name ?? 'Error';
        const message = error?.message ?? String(error);
        // First "at ..." line of the V8 stack trace = where the error was created.
        const throwSite = String(error?.stack ?? '')
            .split('\n')
            .map((line) => line.trim())
            .find((line) => line.startsWith('at ')) ?? '';

        return require('crypto')
            .createHash('md5')
            .update(`${name}\n${message}\n${throwSite}`)
            .digest('hex');
    }

    // Send an alert notification. Email, WeChat, Slack etc. can be integrated here.
    sendAlert(error, context) {
        console.log(`发送告警: ${error.message}`, context);

        // DingTalk / WeCom style notification services can be plugged in here.
        this.sendDingTalkAlert(error, context);
    }

    // DingTalk alert; a no-op unless DINGTALK_WEBHOOK is set. Failures are logged, not thrown.
    sendDingTalkAlert(error, context) {
        const webhook = process.env.DINGTALK_WEBHOOK;
        if (webhook) {
            const axios = require('axios');
            axios.post(webhook, {
                msgtype: 'text',
                text: {
                    content: `Node.js应用异常告警\n` +
                           `时间: ${new Date().toISOString()}\n` +
                           `错误: ${error.message}\n` +
                           `上下文: ${JSON.stringify(context)}`
                }
            }).catch(err => {
                console.error('发送钉钉告警失败:', err);
            });
        }
    }

    // Error statistics snapshot: total captures, distinct fingerprints, aggregated records.
    getErrorStats() {
        return {
            totalErrors: this.errorCount,
            uniqueErrors: this.errors.size,
            errorList: Array.from(this.errors.values())
        };
    }
}

module.exports = new ExceptionHandler();

日志收集与管理

结构化日志系统

// logger.js
// Shared winston logger: JSON entries written to size-limited files plus console output.
const winston = require('winston');
const { format, transports } = winston;
const path = require('path');

const logger = winston.createLogger({
    // Minimum level that is logged.
    level: 'info',
    // Every entry gets a timestamp; Error objects keep their stack; output is JSON.
    format: format.combine(
        format.timestamp(),
        format.errors({ stack: true }),
        format.json()
    ),
    // Metadata merged into every entry.
    defaultMeta: { service: 'nodejs-app' },
    transports: [
        // Error log file: only entries at level "error".
        // Path is resolved relative to this file: <dir of logger.js>/../logs.
        // NOTE(review): make sure that directory exists and is writable in deployment.
        new transports.File({ 
            filename: path.join(__dirname, '../logs/error.log'),
            level: 'error',
            // 5242880 bytes = 5 MiB per file; maxFiles caps how many files are kept.
            maxsize: 5242880,
            maxFiles: 5
        }),
        // Combined log file: every entry at the logger's level and above.
        new transports.File({ 
            filename: path.join(__dirname, '../logs/combined.log'),
            maxsize: 5242880,
            maxFiles: 5
        }),
        // Console output, colorized and in winston's simple text format.
        new transports.Console({
            format: format.combine(
                format.colorize(),
                format.simple()
            )
        })
    ]
});

// Request logging middleware.
/**
 * Express middleware that logs the start and the end of every request.
 *
 * 'finish' fires only when the response was fully handed off; when the client
 * disconnects first, only 'close' fires. Listening to both (and logging exactly
 * once) makes aborted requests visible instead of silently dropping their end log.
 *
 * @param {object} req - Express request.
 * @param {object} res - Express response.
 * @param {Function} next - continues the middleware chain.
 */
const requestLogger = (req, res, next) => {
    const start = Date.now();

    // Log the request start.
    logger.info('请求开始', {
        method: req.method,
        url: req.url,
        ip: req.ip,
        userAgent: req.get('User-Agent')
    });

    let hasLoggedEnd = false;
    const logEnd = (completed) => {
        if (hasLoggedEnd) {
            return;
        }
        hasLoggedEnd = true;

        const meta = {
            method: req.method,
            url: req.url,
            statusCode: res.statusCode,
            duration: `${Date.now() - start}ms`,
            ip: req.ip
        };

        if (completed) {
            logger.info('请求完成', meta);
        } else {
            // Connection closed before the response finished (client abort, socket error).
            logger.warn('请求中断', { ...meta, aborted: true });
        }
    };

    res.once('finish', () => logEnd(true));
    // After a normal completion 'close' follows 'finish' and is ignored by logEnd.
    res.once('close', () => logEnd(Boolean(res.writableFinished)));

    next();
};

module.exports = { logger, requestLogger };

异常日志记录

// errorLogger.js
const { logger } = require('./logger');
const exceptionHandler = require('./exceptionHandler');

// Unified error logging entry point.
/**
 * Write a structured "应用异常" entry to the logger and forward the same error to
 * the in-process exception handler.
 *
 * @param {Error} error - the error to record.
 * @param {object} [context={}] - extra request/business context.
 */
function logError(error, context = {}) {
    const { message, stack } = error;
    // `||` on purpose: an empty env value also falls back.
    const service = process.env.SERVICE_NAME || 'unknown';
    const instanceId = process.env.INSTANCE_ID || require('os').hostname();

    logger.error('应用异常', {
        timestamp: new Date().toISOString(),
        error: message,
        stack,
        context,
        service,
        instanceId
    });

    // Also hand the error to the exception handler (aggregation + alerting).
    exceptionHandler.captureError(error, context);
}

// Express error-handling middleware (the 4-argument signature marks it as one).
/**
 * Log the failing request and send a JSON error response.
 *
 * @param {Error} err - error passed to next(err) or thrown by a handler.
 * @param {object} req - Express request.
 * @param {object} res - Express response.
 * @param {Function} next - forwards to Express's default error handler.
 */
function errorMiddleware(err, req, res, next) {
    // NOTE(review): req.body is logged verbatim and may contain credentials or
    // personal data — consider redacting sensitive fields before logging.
    logError(err, {
        method: req.method,
        url: req.url,
        params: req.params,
        query: req.query,
        body: req.body
    });

    // Once headers are sent, writing them again throws ERR_HTTP_HEADERS_SENT; Express
    // requires delegating to its default handler, which closes the connection.
    if (res.headersSent) {
        return next(err);
    }

    // Respect an explicit HTTP status carried by the error (body-parser and the
    // http-errors package set `status`/`statusCode`), restricted to 4xx/5xx.
    const declaredStatus = err?.status ?? err?.statusCode;
    const statusCode = Number.isInteger(declaredStatus) && declaredStatus >= 400 && declaredStatus <= 599
        ? declaredStatus
        : 500;

    // Send the standard error response; internal details are hidden in production.
    res.status(statusCode).json({
        error: statusCode >= 500
            ? 'Internal Server Error'
            : require('http').STATUS_CODES[statusCode] || 'Error',
        message: process.env.NODE_ENV === 'production' ?
            'An error occurred' : err.message
    });
}

module.exports = {
    logError,
    errorMiddleware
};

指标监控系统

Node.js应用指标收集

// metricsCollector.js
const client = require('prom-client');
const cluster = require('cluster');

// 创建指标收集器
const collectDefaultMetrics = client.collectDefaultMetrics;
const Registry = client.Registry;
const register = new Registry();

// 收集默认指标
collectDefaultMetrics({ register });

// 自定义应用指标
const httpRequestDurationSeconds = new client.Histogram({
    name: 'http_request_duration_seconds',
    help: 'HTTP请求耗时分布',
    labelNames: ['method', 'route', 'status_code'],
    buckets: [0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10]
});

const httpRequestTotal = new client.Counter({
    name: 'http_requests_total',
    help: 'HTTP请求总数',
    labelNames: ['method', 'route', 'status_code']
});

const errorCounter = new client.Counter({
    name: 'app_errors_total',
    help: '应用错误总数',
    labelNames: ['error_type', 'service']
});

const memoryUsageGauge = new client.Gauge({
    name: 'nodejs_memory_usage_bytes',
    help: 'Node.js内存使用情况',
    labelNames: ['type']
});

const cpuUsageGauge = new client.Gauge({
    name: 'nodejs_cpu_usage_percent',
    help: 'Node.js CPU使用率'
});

// 指标更新函数
function updateMetrics() {
    // 内存指标
    const usage = process.memoryUsage();
    memoryUsageGauge.set({ type: 'rss' }, usage.rss);
    memoryUsageGauge.set({ type: 'heapTotal' }, usage.heapTotal);
    memoryUsageGauge.set({ type: 'heapUsed' }, usage.heapUsed);
    memoryUsageGauge.set({ type: 'external' }, usage.external);
    
    // CPU指标(简化版本)
    const cpu = process.cpuUsage();
    cpuUsageGauge.set(cpu.user / 1000); // 转换为百分比
    
    // 集群模式下的工作进程指标
    if (cluster.isWorker) {
        // 可以添加工作进程特定的指标
    }
}

// HTTP请求指标收集中间件
function metricsMiddleware(req, res, next) {
    const start = Date.now();
    
    res.on('finish', () => {
        const duration = (Date.now() - start) / 1000;
        httpRequestDurationSeconds.observe({
            method: req.method,
            route: req.route?.path || req.url,
            status_code: res.statusCode
        }, duration);
        
        httpRequestTotal.inc({
            method: req.method,
            route: req.route?.path || req.url,
            status_code: res.statusCode
        });
    });
    
    next();
}

// 错误指标收集
function recordError(error, type = 'unknown') {
    errorCounter.inc({
        error_type: type,
        service: process.env.SERVICE_NAME || 'unknown'
    });
}

module.exports = {
    register,
    updateMetrics,
    metricsMiddleware,
    recordError,
    httpRequestDurationSeconds,
    httpRequestTotal,
    errorCounter
};

Prometheus指标端点

// metricsServer.js
const express = require('express');
const { register, updateMetrics } = require('./metricsCollector');

const app = express();

// Prometheus scrape endpoint.
// NOTE(review): if this runs under the cluster setup, each worker process has its
// own registry and a request on the shared port reaches an arbitrary worker, so
// successive scrapes may come from different workers; prom-client's
// AggregatorRegistry exists for that case.
app.get('/metrics', async (req, res) => {
    try {
        // Refresh the memory/CPU gauges first so the scrape does not return stale values.
        updateMetrics();
        const metrics = await register.metrics();
        res.set('Content-Type', register.contentType);
        res.end(metrics);
    } catch (err) {
        console.error('获取指标失败:', err);
        res.status(500).end();
    }
});

// Health check: status, current time and process uptime (seconds).
app.get('/health', (req, res) => {
    res.json({
        status: 'healthy',
        timestamp: new Date().toISOString(),
        uptime: process.uptime()
    });
});

module.exports = app;

Prometheus集成与配置

Prometheus配置文件

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  # The Node.js app. With the cluster module all workers share port 3000, so a
  # single target covers the whole process group. Scraping the same target from a
  # second job would only duplicate every series, and host port 3001 belongs to
  # Grafana in the docker-compose setup below, not to the app.
  # `nodejs-app` is the docker-compose service name (inside a container `localhost`
  # is the Prometheus container itself); use localhost:3000 when Prometheus runs
  # directly on the app host.
  - job_name: 'nodejs-app'
    static_configs:
      - targets: ['nodejs-app:3000']
    metrics_path: '/metrics'
    scrape_interval: 5s

rule_files:
  # Resolved relative to the directory of this file.
  - "alert.rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          # NOTE: the docker-compose file below does not start an Alertmanager;
          # add an `alertmanager` service (or change this address) to deliver alerts.
          - alertmanager:9093

告警规则配置

# alert.rules.yml
# Prometheus alerting rules (loaded through `rule_files` in prometheus.yml).
# `for:` = how long the expression must stay true before the alert fires.
groups:
- name: nodejs-app-alerts
  rules:
  # Per-second rate of app_errors_total over 5 minutes. No aggregation is applied,
  # so each series (error_type/service/instance combination) is checked separately.
  - alert: HighErrorRate
    expr: rate(app_errors_total[5m]) > 10
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "应用错误率过高"
      description: "应用在最近5分钟内错误率超过每秒10次,当前值为 {{ $value }}"

  # RSS converted from bytes to MiB (/1024/1024); threshold 512 MiB.
  # NOTE(review): relies on the app's custom nodejs_memory_usage_bytes gauge actually
  # being exposed on /metrics — verify in Prometheus before relying on this alert.
  - alert: HighMemoryUsage
    expr: nodejs_memory_usage_bytes{type="rss"} / 1024 / 1024 > 512
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "内存使用率过高"
      description: "应用内存使用量超过512MB,当前值为 {{ $value }} MB"

  # p95 latency (seconds) across all instances and routes: `sum ... by (le)` merges
  # every histogram series before the quantile is computed.
  - alert: HighResponseTime
    expr: histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 2
    for: 3m
    labels:
      severity: warning
    annotations:
      summary: "响应时间过长"
      description: "95%的请求响应时间超过2秒,当前值为 {{ $value }} 秒"

  # NOTE(review): the 80 threshold only makes sense if nodejs_cpu_usage_percent is a
  # real percentage over a recent window, not a cumulative counter — confirm how the
  # gauge is computed.
  - alert: HighCPUUsage
    expr: nodejs_cpu_usage_percent > 80
    for: 3m
    labels:
      severity: critical
    annotations:
      summary: "CPU使用率过高"
      description: "应用CPU使用率超过80%,当前值为 {{ $value }}%"

  # `up` is generated by Prometheus for every scrape target: 0 when the last scrape failed.
  - alert: ServiceDown
    expr: up == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "服务不可用"
      description: "应用服务已停止响应"

Grafana可视化监控

Grafana仪表板配置

{
  "dashboard": {
    "id": null,
    "title": "Node.js 应用监控",
    "timezone": "browser",
    "schemaVersion": 36,
    "version": 0,
    "refresh": "5s",
    "time": { "from": "now-1h", "to": "now" },
    "panels": [
      {
        "id": 1,
        "type": "timeseries",
        "title": "HTTP请求响应时间",
        "gridPos": { "x": 0, "y": 0, "w": 12, "h": 8 },
        "fieldConfig": { "defaults": { "unit": "s" }, "overrides": [] },
        "targets": [
          {
            "refId": "A",
            "expr": "histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le))",
            "legendFormat": "95%分位数"
          },
          {
            "refId": "B",
            "expr": "histogram_quantile(0.50, sum(rate(http_request_duration_seconds_bucket[5m])) by (le))",
            "legendFormat": "50%分位数"
          }
        ]
      },
      {
        "id": 2,
        "type": "timeseries",
        "title": "错误率监控",
        "gridPos": { "x": 12, "y": 0, "w": 12, "h": 8 },
        "fieldConfig": { "defaults": { "unit": "ops" }, "overrides": [] },
        "targets": [
          {
            "refId": "A",
            "expr": "sum by (error_type) (rate(app_errors_total[5m]))",
            "legendFormat": "{{error_type}}"
          }
        ]
      },
      {
        "id": 3,
        "type": "timeseries",
        "title": "内存使用情况",
        "gridPos": { "x": 0, "y": 8, "w": 12, "h": 8 },
        "fieldConfig": { "defaults": { "unit": "bytes" }, "overrides": [] },
        "targets": [
          {
            "refId": "A",
            "expr": "nodejs_memory_usage_bytes{type=\"rss\"}",
            "legendFormat": "RSS {{instance}}"
          },
          {
            "refId": "B",
            "expr": "nodejs_memory_usage_bytes{type=\"heapUsed\"}",
            "legendFormat": "Heap Used {{instance}}"
          }
        ]
      },
      {
        "id": 4,
        "type": "timeseries",
        "title": "CPU使用率",
        "gridPos": { "x": 12, "y": 8, "w": 12, "h": 8 },
        "fieldConfig": { "defaults": { "unit": "percent" }, "overrides": [] },
        "targets": [
          {
            "refId": "A",
            "expr": "nodejs_cpu_usage_percent",
            "legendFormat": "CPU使用率 {{instance}}"
          }
        ]
      }
    ]
  }
}

监控面板优化建议

// dashboardOptimizer.js
/**
 * Collects a process-level performance snapshot and derives simple tuning suggestions.
 */
class DashboardOptimizer {
    constructor() {
        // alertName -> { condition, severity, enabled, createdAt }
        this.alerts = new Map();
        this.performanceMetrics = new Map();
        // Sampling interval (ms) of the event-loop delay monitor.
        this.eventLoopResolutionMs = 10;
        // Event-loop delay has to be sampled over time by a timer; two consecutive
        // hrtime readings only measure the cost of a function call and are always ~0.
        this.eventLoopHistogram = require('perf_hooks').monitorEventLoopDelay({
            resolution: this.eventLoopResolutionMs
        });
        this.eventLoopHistogram.enable();
    }

    // Register an alert rule (stored in memory only).
    addAlert(alertName, condition, severity = 'warning') {
        this.alerts.set(alertName, {
            condition,
            severity,
            enabled: true,
            createdAt: new Date()
        });
    }

    // Snapshot of memory, CPU time, uptime, load average and event-loop delay.
    collectPerformanceMetrics() {
        const metrics = {
            memory: process.memoryUsage(),
            cpu: process.cpuUsage(),
            uptime: process.uptime(),
            loadavg: require('os').loadavg(),
            eventLoopDelay: this.calculateEventLoopDelay()
        };

        return metrics;
    }

    /**
     * Mean event-loop delay in milliseconds since the previous call (or since
     * construction); each call then starts a new measurement window.
     *
     * @returns {number} mean delay in ms, or 0 when no sample has been recorded yet.
     */
    calculateEventLoopDelay() {
        const histogram = this.eventLoopHistogram;
        // Nanoseconds; NaN while the histogram holds no samples.
        const meanNs = histogram.mean;
        histogram.reset();
        if (!Number.isFinite(meanNs)) {
            return 0;
        }
        // Node records each sample as the full interval between timer ticks
        // (resolution + delay), so subtract the resolution to get the delay itself.
        return Math.max(0, meanNs / 1e6 - this.eventLoopResolutionMs);
    }

    // Rule-based optimization suggestions derived from the current snapshot.
    getOptimizationSuggestions() {
        const suggestions = [];
        const metrics = this.collectPerformanceMetrics();

        if (metrics.memory.rss > 1024 * 1024 * 1024) { // 1 GiB
            suggestions.push({
                type: 'memory',
                severity: 'high',
                suggestion: '考虑优化内存使用,检查是否存在内存泄漏'
            });
        }

        // 1-minute load average above the core count. (os.loadavg() is always [0, 0, 0] on Windows.)
        if (metrics.loadavg[0] > require('os').cpus().length) {
            suggestions.push({
                type: 'cpu',
                severity: 'medium',
                suggestion: 'CPU负载过高,考虑增加实例或优化代码'
            });
        }

        return suggestions;
    }
}

module.exports = new DashboardOptimizer();

告警通知系统

多渠道告警集成

// alertManager.js
const axios = require('axios');
const nodemailer = require('nodemailer');

/**
 * Fans an alert out to every configured notification channel (email, DingTalk,
 * WeCom). Channels are enabled from environment variables at construction time.
 */
class AlertManager {
    /**
     * @param {object} [options]
     * @param {number} [options.webhookTimeoutMs=5000] - per-request timeout for webhook
     *   channels, so a hung endpoint cannot stall alert delivery indefinitely.
     */
    constructor({ webhookTimeoutMs = 5000 } = {}) {
        this.webhookTimeoutMs = webhookTimeoutMs;
        this.alertChannels = [];
        this.initChannels();
    }

    // Register the channels enabled through environment variables.
    initChannels() {
        if (process.env.EMAIL_ALERTS === 'true') {
            this.addEmailChannel();
        }

        if (process.env.DINGTALK_WEBHOOK) {
            this.addDingTalkChannel();
        }

        if (process.env.WECHAT_WEBHOOK) {
            this.addWechatChannel();
        }
    }

    // Email channel over SMTP (nodemailer).
    addEmailChannel() {
        // nodemailer's factory is `createTransport`; `createTransporter` does not exist
        // and threw a TypeError while the exported singleton was being constructed.
        const transporter = nodemailer.createTransport({
            host: process.env.SMTP_HOST,
            port: Number.parseInt(process.env.SMTP_PORT, 10),
            secure: process.env.SMTP_SECURE === 'true',
            auth: {
                user: process.env.SMTP_USER,
                pass: process.env.SMTP_PASS
            }
        });

        this.alertChannels.push({
            type: 'email',
            send: async (alert) => {
                try {
                    await transporter.sendMail({
                        from: process.env.EMAIL_FROM,
                        to: process.env.EMAIL_TO,
                        subject: `[ALERT] ${alert.title}`,
                        text: this.generateAlertText(alert)
                    });
                    console.log('邮件告警发送成功');
                } catch (error) {
                    console.error('邮件告警发送失败:', error);
                }
            }
        });
    }

    // DingTalk robot webhook channel (markdown message).
    addDingTalkChannel() {
        this.alertChannels.push({
            type: 'dingtalk',
            send: async (alert) => {
                try {
                    await axios.post(process.env.DINGTALK_WEBHOOK, {
                        msgtype: 'markdown',
                        markdown: {
                            title: `[ALERT] ${alert.title}`,
                            text: this.generateDingTalkMarkdown(alert)
                        }
                    }, { timeout: this.webhookTimeoutMs });
                    console.log('钉钉告警发送成功');
                } catch (error) {
                    console.error('钉钉告警发送失败:', error);
                }
            }
        });
    }

    // WeCom (enterprise WeChat) robot webhook channel (text message).
    addWechatChannel() {
        this.alertChannels.push({
            type: 'wechat',
            send: async (alert) => {
                try {
                    await axios.post(process.env.WECHAT_WEBHOOK, {
                        msgtype: 'text',
                        text: {
                            content: `[ALERT] ${alert.title}\n\n${this.generateAlertText(alert)}`
                        }
                    }, { timeout: this.webhookTimeoutMs });
                    console.log('企业微信告警发送成功');
                } catch (error) {
                    console.error('企业微信告警发送失败:', error);
                }
            }
        });
    }

    /**
     * Send an alert through all channels concurrently, so one slow channel does not
     * delay the others. Never rejects; per-channel failures are logged.
     *
     * @param {{title, severity, description, service, instance}} alert
     */
    async sendAlert(alert) {
        const channels = [...this.alertChannels];
        // The async wrapper turns synchronous throws from a channel into rejections.
        const results = await Promise.allSettled(
            channels.map(async (channel) => channel.send(alert))
        );
        results.forEach((result, index) => {
            if (result.status === 'rejected') {
                console.error(`告警通道 ${channels[index].type} 发送失败:`, result.reason);
            }
        });
    }

    // Plain-text alert body (email / WeCom).
    generateAlertText(alert) {
        return `
时间: ${new Date().toISOString()}
严重程度: ${alert.severity}
标题: ${alert.title}
描述: ${alert.description}
服务: ${alert.service}
实例: ${alert.instance}
        `.trim();
    }

    // DingTalk markdown alert body.
    generateDingTalkMarkdown(alert) {
        return `
# [ALERT] ${alert.title}

**时间**: ${new Date().toISOString()}
**严重程度**: ${alert.severity}
**服务**: ${alert.service}
**实例**: ${alert.instance}

## 描述

${alert.description}

## 建议措施

- 检查相关日志文件
- 分析性能指标
- 必要时进行代码优化
        `.trim();
    }
}

module.exports = new AlertManager();

告警去重与抑制机制

// alertDeduplicator.js
/**
 * Suppresses repeats of the same alert (same title, severity and service) within
 * a cooldown window.
 */
class AlertDeduplicator {
    /**
     * @param {object} [options]
     * @param {number} [options.cooldownPeriod=300000] - suppression window in ms (5 minutes).
     */
    constructor({ cooldownPeriod = 300000 } = {}) {
        // alert key -> { timestamp of last send, alert }
        this.alertCache = new Map();
        this.cooldownPeriod = cooldownPeriod;
        // Handle of the periodic cleanup interval, or null when not running.
        this.cleanupTimer = null;
    }

    /**
     * Decide whether an alert should be sent, recording it when it is.
     *
     * @param {{title, severity, service}} alert
     * @returns {boolean} false if the same alert was sent less than cooldownPeriod ago.
     */
    shouldSendAlert(alert) {
        const key = this.generateAlertKey(alert);
        const now = Date.now();
        const cached = this.alertCache.get(key);

        // Same alert still inside its cooldown window: suppress it.
        if (cached && now - cached.timestamp < this.cooldownPeriod) {
            return false;
        }

        this.alertCache.set(key, { timestamp: now, alert });
        return true;
    }

    // Dedup key: title + severity + service.
    generateAlertKey(alert) {
        return `${alert.title}_${alert.severity}_${alert.service}`;
    }

    // Drop entries older than two cooldown periods.
    cleanup() {
        const now = Date.now();
        for (const [key, cached] of this.alertCache.entries()) {
            if (now - cached.timestamp > this.cooldownPeriod * 2) {
                this.alertCache.delete(key);
            }
        }
    }

    /**
     * Start periodic cleanup (every cooldownPeriod ms). Idempotent: repeated calls
     * reuse the running interval instead of stacking new ones. The timer is unref'd
     * so it never keeps the process alive on its own.
     *
     * @returns the interval handle.
     */
    startCleanupTimer() {
        if (this.cleanupTimer === null) {
            this.cleanupTimer = setInterval(() => {
                this.cleanup();
            }, this.cooldownPeriod);
            // unref() exists on Node.js timers; guarded for other runtimes.
            this.cleanupTimer.unref?.();
        }
        return this.cleanupTimer;
    }

    // Stop periodic cleanup, if running.
    stopCleanupTimer() {
        if (this.cleanupTimer !== null) {
            clearInterval(this.cleanupTimer);
            this.cleanupTimer = null;
        }
    }
}

module.exports = new AlertDeduplicator();

实际部署与最佳实践

Docker化部署方案

# Dockerfile
# Node 16 reached end-of-life in 2023; build on a maintained LTS line.
FROM node:20-alpine

WORKDIR /app

COPY package*.json ./
# `--only=production` is deprecated in modern npm; `--omit=dev` is the supported form.
RUN npm ci --omit=dev

COPY . .

EXPOSE 3000

# Health check. Alpine images ship BusyBox `wget` but not `curl`, so a curl-based
# check always fails and marks the container unhealthy.
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
  CMD wget -q -O /dev/null http://127.0.0.1:3000/health || exit 1

CMD ["node", "app.js"]
# docker-compose.yml
version: '3.8'

services:
  nodejs-app:
    build: .
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
      - SERVICE_NAME=my-nodejs-app
      - INSTANCE_ID=${INSTANCE_ID}
      - DINGTALK_WEBHOOK=${DINGTALK_WEBHOOK}
      - EMAIL_ALERTS=true
      - SMTP_HOST=smtp.gmail.com
      - SMTP_PORT=587
      - SMTP_SECURE=false
      # Credentials and addresses the email alert channel reads; supply them via
      # an .env file rather than committing them.
      - SMTP_USER=${SMTP_USER}
      - SMTP_PASS=${SMTP_PASS}
      - EMAIL_FROM=${EMAIL_FROM}
      - EMAIL_TO=${EMAIL_TO}
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped

  prometheus:
    image: prom/prometheus:v2.37.0
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      # prometheus.yml loads "alert.rules.yml" relative to its own directory; without
      # this mount the rules file does not exist inside the container.
      - ./alert.rules.yml:/etc/prometheus/alert.rules.yml
      - prometheus_data:/prometheus
    depends_on:
      - nodejs-app
    restart: unless-stopped

  grafana:
    image: grafana/grafana-enterprise:9.5.0
    ports:
      - "3001:3000"
    environment:
      # Set GRAFANA_ADMIN_PASSWORD; "admin" is only a local-development fallback.
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:-admin}
      - GF_USERS_ALLOW_SIGN_UP=false
    volumes:
      - grafana_data:/var/lib/grafana
      # Provisioning directories: they must contain provisioning YAML files
      # (dashboard providers / datasources), which in turn point at dashboard JSON.
      - ./grafana-dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana-datasources:/etc/grafana/provisioning/datasources
    restart: unless-stopped

volumes:
  prometheus_data:
  grafana_data:

性能

相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000