Node.js · #cluster#nodejs#pm2

Node.js集群与PM2进程管理

2025.06.25 8 min 3.0k
// 目录 · contents

前言

Node.js是单线程运行的,无法充分利用多核CPU。Cluster模块允许创建多个工作进程,共享同一个端口,从而实现多核利用和高可用。PM2是最流行的Node.js进程管理器,提供了集群管理、监控、日志和零停机部署等功能。本文将深入分析这两者的原理和使用方法。

Cluster模块原理

graph TB
    subgraph "Cluster Architecture"
        MASTER[Master Process<br>pid: 1234] --> |fork| W1[Worker 1<br>pid: 1235]
        MASTER --> |fork| W2[Worker 2<br>pid: 1236]
        MASTER --> |fork| W3[Worker 3<br>pid: 1237]
        MASTER --> |fork| W4[Worker 4<br>pid: 1238]

        CLIENT[Client Requests] --> |port 3000| MASTER
        MASTER --> |Round-Robin| W1
        MASTER --> |Round-Robin| W2
        MASTER --> |Round-Robin| W3
        MASTER --> |Round-Robin| W4
    end

Master进程(Node.js 16起官方称为Primary,对应代码中的cluster.isPrimary)通过child_process.fork()创建Worker进程。除Windows外的所有平台默认使用Round-Robin负载均衡:由Master监听端口并接受连接,再轮询分发给各个Worker。

基础使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const cluster = require('cluster');
const http = require('http');
const os = require('os');

if (cluster.isPrimary) {
  // Primary process: spawns the workers and supervises them.
  const numCPUs = os.cpus().length;
  console.log(`Master ${process.pid} is running`);
  console.log(`Forking ${numCPUs} workers...`);

  // Start one worker per CPU core.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // Replace workers that die unexpectedly. A worker that was stopped on
  // purpose through worker.disconnect() or worker.kill() has
  // exitedAfterDisconnect set and must NOT be respawned; otherwise a rolling
  // restart or a graceful shutdown would keep bringing workers back.
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died (${signal || code})`);
    if (worker.exitedAfterDisconnect) {
      return;
    }
    console.log('Starting a new worker...');
    cluster.fork();
  });

  // Log every worker as soon as its process is up.
  cluster.on('online', (worker) => {
    console.log(`Worker ${worker.process.pid} is online`);
  });
} else {
  // Worker process: every worker runs its own HTTP server; all of them
  // listen on the same port 3000.
  const server = http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Hello from Worker ${process.pid}\n`);
  });

  server.listen(3000, () => {
    console.log(`Worker ${process.pid} listening on port 3000`);
  });
}

Master-Worker通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// The primary and its workers talk to each other over the IPC channel.
if (!cluster.isPrimary) {
  // Worker side: react to messages pushed by the primary.
  process.on('message', (msg) => {
    if (msg.type !== 'config') {
      return;
    }
    console.log('Received config:', msg.data);
  });

  // Worker side: report statistics back to the primary.
  const statsMessage = { type: 'stats', data: { requestCount: 100 } };
  process.send(statsMessage);
} else {
  const worker = cluster.fork();

  // Primary side: log whatever this worker sends.
  worker.on('message', (msg) => {
    console.log(`Message from worker ${worker.id}:`, msg);
  });

  // Primary side: push configuration to the worker.
  const configMessage = { type: 'config', data: { timeout: 5000 } };
  worker.send(configMessage);

  // Send the same message to every worker currently known to the cluster.
  function broadcast(message) {
    Object.values(cluster.workers).forEach((target) => {
      target.send(message);
    });
  }
}

优雅关闭(Graceful Shutdown)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
const cluster = require('cluster');
const http = require('http');

if (cluster.isPrimary) {
  // ... fork workers ...

  // Upper bound on how long the primary waits for its workers to drain.
  const PRIMARY_FORCE_EXIT_MS = 30000;
  let shutdownStarted = false;

  // Handle the termination signal.
  process.on('SIGTERM', () => {
    // A repeated SIGTERM must not start a second shutdown sequence.
    if (shutdownStarted) return;
    shutdownStarted = true;
    console.log('Master received SIGTERM, shutting down gracefully...');

    // Workers whose exit is still awaited.
    const pending = new Set(Object.values(cluster.workers));

    // Safety net: stop waiting once the deadline has passed.
    const forceTimer = setTimeout(() => {
      console.log('Force shutdown');
      process.exit(0);
    }, PRIMARY_FORCE_EXIT_MS);

    // Exit right away once every worker is gone instead of always sitting
    // out the full timeout.
    const exitIfDrained = () => {
      if (pending.size > 0) return;
      clearTimeout(forceTimer);
      console.log('All workers exited, master shutting down');
      process.exit(0);
    };

    for (const worker of pending) {
      worker.once('exit', () => {
        pending.delete(worker);
        exitIfDrained();
      });

      // send() throws when the IPC channel is already closed, so only
      // message workers that are still connected; for the others just wait
      // for their exit.
      if (worker.isConnected()) {
        worker.send({ type: 'shutdown' });
      }
    }

    // Covers the case where there were no workers to begin with.
    exitIfDrained();
  });
} else {
  // Upper bound on how long a worker waits for in-flight requests.
  const WORKER_FORCE_EXIT_MS = 10000;
  let shuttingDown = false;

  const server = http.createServer((req, res) => {
    // Simulate request processing.
    setTimeout(() => {
      res.writeHead(200);
      res.end('OK');
    }, 100);
  });

  server.listen(3000);

  // Shutdown request coming from the primary.
  process.on('message', (msg) => {
    // Ignore unrelated messages and repeated shutdown requests.
    if (msg.type !== 'shutdown' || shuttingDown) return;
    shuttingDown = true;
    console.log(`Worker ${process.pid} shutting down...`);

    // Stop accepting new connections; the callback runs once the existing
    // ones have ended.
    server.close(() => {
      console.log(`Worker ${process.pid} closed`);
      process.exit(0);
    });

    // Idle keep-alive sockets would otherwise hold close() open until the
    // force-exit timer fires. Feature-detected because the method only
    // exists on newer Node.js versions.
    if (typeof server.closeIdleConnections === 'function') {
      server.closeIdleConnections();
    }

    // Force exit when draining takes too long.
    setTimeout(() => {
      console.log(`Worker ${process.pid} force exit`);
      process.exit(1);
    }, WORKER_FORCE_EXIT_MS);
  });
}

零停机重启

sequenceDiagram
    participant M as Master
    participant W1 as Worker 1 (旧)
    participant W2 as Worker 2 (旧)
    participant W3 as Worker 1 (新)
    participant W4 as Worker 2 (新)

    Note over M: 收到重启信号
    M->>W1: disconnect()
    Note over W1: 停止接受新连接<br>完成现有请求

    M->>W3: fork() 新Worker
    Note over W3: 启动并开始接受请求

    W1->>M: exit
    Note over W1: 旧Worker退出

    M->>W2: disconnect()
    M->>W4: fork() 新Worker
    Note over W4: 启动并开始接受请求

    W2->>M: exit
    Note over M: 所有旧Worker已替换<br>零停机完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Zero-downtime (rolling) restart driven by the primary process.
if (cluster.isPrimary) {
  const numCPUs = os.cpus().length;

  // True while a rolling restart is running; overlapping restarts would
  // fork duplicate replacements for the same old workers.
  let isRestarting = false;

  // Fork one worker. cluster.workers already tracks live workers, so no
  // separate (ever-growing) list is kept.
  const forkWorker = () => cluster.fork();

  // Resolve once `worker` is listening; reject if it exits before that, so a
  // broken release aborts the restart instead of hanging forever.
  const waitForListening = (worker) =>
    new Promise((resolve, reject) => {
      const onListening = () => {
        worker.off('exit', onExit);
        resolve();
      };
      const onExit = (code, signal) => {
        worker.off('listening', onListening);
        reject(
          new Error(
            `Worker ${worker.process.pid} exited before listening (${signal || code})`,
          ),
        );
      };
      worker.once('listening', onListening);
      worker.once('exit', onExit);
    });

  // Resolve once `worker` has exited.
  const waitForExit = (worker) =>
    new Promise((resolve) => {
      worker.once('exit', resolve);
    });

  // Initial pool: one worker per CPU core.
  for (let i = 0; i < numCPUs; i++) {
    forkWorker();
  }

  /**
   * Replace every current worker, one at a time: start the replacement,
   * wait until it is listening, then disconnect the old worker and wait for
   * it to exit.
   *
   * @returns {Promise<void>} Resolves when all old workers are replaced (or
   *   immediately when a restart is already running); rejects when a
   *   replacement worker dies before it starts listening.
   */
  async function rollingRestart() {
    if (isRestarting) {
      console.log('Rolling restart already in progress, ignoring');
      return;
    }
    isRestarting = true;

    try {
      // Snapshot, so workers forked during the restart are not restarted again.
      const oldWorkers = Object.values(cluster.workers);

      for (const worker of oldWorkers) {
        // Already gone: there is nothing to replace and no exit to wait for.
        if (worker.isDead()) continue;

        // Subscribe before anything else so an exit that happens while the
        // replacement is starting is not missed.
        const oldWorkerExited = waitForExit(worker);

        // Start the replacement and wait until it accepts connections.
        const newWorker = forkWorker();
        await waitForListening(newWorker);

        // Retire the old worker unless it died in the meantime.
        if (!worker.isDead()) {
          worker.disconnect();
        }
        await oldWorkerExited;

        console.log(`Replaced worker ${worker.process.pid} with ${newWorker.process.pid}`);
      }

      console.log('Rolling restart complete');
    } finally {
      isRestarting = false;
    }
  }

  // SIGUSR2 triggers the rolling restart.
  process.on('SIGUSR2', () => {
    console.log('Received SIGUSR2, starting rolling restart...');
    rollingRestart().catch((err) => {
      console.error('Rolling restart failed:', err);
    });
  });
}

PM2进程管理器

安装和基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Install PM2 globally
npm install -g pm2

# Start the app as a single process
pm2 start app.js --name my-app

# Start in cluster mode (uses every CPU core automatically)
pm2 start app.js -i max --name my-app
# -i 0 or -i max: use every CPU core
# -i 4: run 4 processes

# Show process status
pm2 list
pm2 status

# Show detailed information about one app
pm2 show my-app

# View logs
pm2 logs my-app
pm2 logs --lines 100

# Monitoring dashboard
pm2 monit

生态系统配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// ecosystem.config.js
module.exports = {
apps: [{
name: 'api-server',
script: './src/server.js',
instances: 'max', // 集群模式,使用所有CPU
exec_mode: 'cluster', // 集群模式
autorestart: true, // 自动重启
watch: false, // 生产环境不要开watch
max_memory_restart: '1G', // 内存超过1G自动重启
max_restarts: 10, // 最大重启次数
min_uptime: '10s', // 最小运行时间(防止启动循环)

// 环境变量
env: {
NODE_ENV: 'development',
PORT: 3000,
},
env_production: {
NODE_ENV: 'production',
PORT: 8080,
},

// 日志配置
log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
error_file: './logs/error.log',
out_file: './logs/output.log',
merge_logs: true, // 集群模式合并日志

// 优雅关闭
kill_timeout: 10000, // 给进程10秒完成请求
listen_timeout: 5000, // 等待5秒确认进程在监听
shutdown_with_message: true,

// 指数退避重启
exp_backoff_restart_delay: 100,
}, {
name: 'worker',
script: './src/worker.js',
instances: 2,
exec_mode: 'cluster',
cron_restart: '0 */6 * * *', // 每6小时重启一次
env_production: {
NODE_ENV: 'production',
},
}],
};
1
2
3
4
5
# Start using the config file
pm2 start ecosystem.config.js --env production

# Reload (zero downtime)
pm2 reload ecosystem.config.js --env production

PM2零停机部署

flowchart TB
    TRIGGER[触发部署] --> PULL[拉取最新代码]
    PULL --> INSTALL[安装依赖]
    INSTALL --> BUILD[构建项目]
    BUILD --> RELOAD[PM2 reload]

    subgraph "PM2 Reload过程"
        RELOAD --> NEW1[启动新Worker 1]
        NEW1 --> |就绪| KILL1[关闭旧Worker 1]
        KILL1 --> NEW2[启动新Worker 2]
        NEW2 --> |就绪| KILL2[关闭旧Worker 2]
        KILL2 --> DONE[完成]
    end

    DONE -.- NOTE[整个过程不中断服务]
1
2
3
4
# PM2 reload vs restart
pm2 restart my-app # stops the processes and starts them again (causes downtime)
pm2 reload my-app # restarts the processes one by one (zero downtime)
# `pm2 gracefulReload` was dropped in PM2 3.x. `pm2 reload` is the graceful
# path: the old process receives the shutdown signal (or the 'shutdown'
# message when shutdown_with_message is enabled) and gets kill_timeout ms to
# finish before it is killed.

在应用中配合PM2实现优雅关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
const http = require('http');

const server = http.createServer((req, res) => {
  res.writeHead(200);
  res.end('Hello World');
});

server.listen(process.env.PORT || 3000, () => {
  console.log(`Worker ${process.pid} listening`);

  // Tell PM2 the process is ready (used together with wait_ready).
  // process.send only exists when the process has an IPC channel.
  if (process.send) {
    process.send('ready');
  }
});

// Graceful shutdown on termination signals.
process.on('SIGINT', gracefulShutdown);
process.on('SIGTERM', gracefulShutdown);

// PM2 shutdown_with_message mode: the request arrives as an IPC message.
process.on('message', (msg) => {
  if (msg === 'shutdown') {
    gracefulShutdown();
  }
});

// Must stay below PM2's kill_timeout so the process exits on its own terms.
const FORCE_EXIT_TIMEOUT_MS = 8000;

// Guards against running the shutdown sequence twice (signal + message).
let isShuttingDown = false;

/**
 * Stop accepting connections, let in-flight requests finish, then exit.
 * Exits with code 0 once the server has closed, or with code 1 when that
 * takes longer than FORCE_EXIT_TIMEOUT_MS.
 */
function gracefulShutdown() {
  if (isShuttingDown) return;
  isShuttingDown = true;

  console.log(`Worker ${process.pid}: Graceful shutdown initiated`);

  // 1. Stop accepting new connections.
  server.close((err) => {
    if (err) {
      // Reported when the server was not listening; nothing left to drain.
      console.error(`Worker ${process.pid}: Error while closing server`, err);
    }
    console.log(`Worker ${process.pid}: All connections closed`);

    // 2. Release other resources here (database, Redis, ...). Make this
    //    callback async before awaiting anything.

    // 3. Exit the process.
    process.exit(0);
  });

  // Idle keep-alive sockets would otherwise keep close() pending until the
  // force-exit timer fires. Feature-detected because the method only exists
  // on newer Node.js versions.
  if (typeof server.closeIdleConnections === 'function') {
    server.closeIdleConnections();
  }

  // 4. Force exit when draining takes too long.
  setTimeout(() => {
    console.error(`Worker ${process.pid}: Force shutdown after timeout`);
    process.exit(1);
  }, FORCE_EXIT_TIMEOUT_MS);
}

PM2监控与日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Real-time monitoring
pm2 monit

# Show runtime metrics
pm2 show my-app
# The output includes:
# - CPU usage
# - memory usage
# - restart count
# - uptime
# - log paths

# Log management
pm2 logs # show all logs
pm2 logs my-app --lines 200 # show the last 200 lines
pm2 flush # empty the logs

# Rotate logs with pm2-logrotate
pm2 install pm2-logrotate
pm2 set pm2-logrotate:max_size 10M
pm2 set pm2-logrotate:retain 7
pm2 set pm2-logrotate:compress true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 自定义监控指标
const io = require('@pm2/io');

// Custom metrics
const requestCounter = io.counter({ name: 'Request Count' });
const responseTime = io.histogram({ name: 'Response Time', measurement: 'mean' });

// Middleware: count each request and record how long it took to finish.
function recordRequestMetrics(req, res, next) {
  requestCounter.inc();
  const startedAt = Date.now();

  res.on('finish', () => {
    const elapsedMs = Date.now() - startedAt;
    responseTime.update(elapsedMs);
  });

  next();
}

// Usage
app.use(recordRequestMetrics);

// Custom action
// Trigger with: pm2 trigger my-app clear-cache
io.action('clear-cache', (reply) => {
  cache.clear();
  reply({ success: true });
});

PM2开机自启

1
2
3
4
5
6
7
8
9
# Generate the boot-time startup script
pm2 startup
# Then run the command it prints

# Save the current process list
pm2 save

# Restored automatically after a reboot:
# PM2 runs `pm2 resurrect` on system startup

负载均衡策略

graph TB
    subgraph "Round-Robin (Linux默认)"
        RR_M[Master] -->|请求1| RR_W1[Worker 1]
        RR_M -->|请求2| RR_W2[Worker 2]
        RR_M -->|请求3| RR_W3[Worker 3]
        RR_M -->|请求4| RR_W1
    end

    subgraph "Nginx反向代理 (推荐)"
        NGINX[Nginx] -->|upstream| N1[Node :3001]
        NGINX -->|upstream| N2[Node :3002]
        NGINX -->|upstream| N3[Node :3003]
        NGINX -->|upstream| N4[Node :3004]
    end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Nginx load-balancing configuration
upstream nodejs_cluster {
least_conn; # least-connections strategy

server 127.0.0.1:3001;
server 127.0.0.1:3002;
server 127.0.0.1:3003;
server 127.0.0.1:3004;

keepalive 64; # keep a pool of upstream connections
}

server {
listen 80;

location / {
proxy_pass http://nodejs_cluster;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}

Cluster模式下的共享状态

graph TB
    subgraph "问题:每个Worker有自己的内存"
        W1["Worker 1<br>session: {a:1}"]
        W2["Worker 2<br>session: {}"]
        W3["Worker 3<br>session: {}"]
        NOTE[用户登录在Worker 1<br>下次请求到Worker 2时Session丢失]
    end

    subgraph "解决方案:外部存储"
        WA[Worker 1] --> REDIS[(Redis)]
        WB[Worker 2] --> REDIS
        WC[Worker 3] --> REDIS
        NOTE2[所有Worker共享同一个Redis中的Session]
    end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 使用Redis共享Session
const session = require('express-session');
const RedisStore = require('connect-redis').default;
const { createClient } = require('redis');

const redisClient = createClient({ url: 'redis://localhost:6379' });

// Without an 'error' listener an emitted client error becomes an uncaught
// exception and takes the worker down.
redisClient.on('error', (err) => {
  console.error('Redis client error:', err);
});

// connect() returns a promise; handle its rejection instead of leaving it
// floating.
redisClient.connect().catch((err) => {
  console.error('Failed to connect to Redis:', err);
});

// Sessions live in Redis so that every worker sees the same session data.
app.use(session({
  store: new RedisStore({ client: redisClient }),
  secret: process.env.SESSION_SECRET,
  resave: false,
  saveUninitialized: false,
  cookie: {
    // NOTE(review): secure cookies are only set over HTTPS. Behind a reverse
    // proxy that terminates TLS the app also needs Express "trust proxy"
    // enabled - confirm against the deployment.
    secure: true,
    httpOnly: true,
    maxAge: 24 * 60 * 60 * 1000, // 24 hours
  },
}));

总结

Node.js集群和PM2的核心要点:

  1. Cluster模块:通过fork创建多个Worker进程,共享端口,利用多核CPU
  2. 负载均衡:除Windows外默认Round-Robin,生产环境建议Nginx反向代理
  3. PM2集群模式:`pm2 start app.js -i max` 一键启动集群
  4. 零停机部署:`pm2 reload` 逐个重启Worker,不中断服务
  5. 优雅关闭:监听SIGTERM信号,先停止接受新连接,等待现有请求完成后退出
  6. 共享状态:使用Redis等外部存储解决多进程间状态共享问题
  7. 监控:PM2 monit实时监控,自定义指标和动作

在生产环境中,PM2 + Nginx的组合是Node.js部署的常见方案,能够提供高可用性和良好的性能。

作者 · authorzt
发布 · date2025-06-25
篇幅 · length3.0k 字 · 8 min
许可 · licenseCC BY-SA 4.0
$ echo "comments" · 评论