Node.js · #pipeline#stream#nodejs

Node.js Stream编程实战

2025.06.11 7 min 2.8k
// 目录 · contents

前言

Stream(流)是Node.js中处理大量数据的核心抽象。与一次性加载全部数据到内存不同,流以小块(chunk)的方式逐步处理数据,内存使用恒定。无论是文件处理、网络通信还是数据转换,流都是Node.js的核心基础设施。

为什么需要Stream?

1
2
3
4
5
6
7
8
9
10
11
12
// Without streams: the whole file is read into memory in a single call
const fs = require('fs');

// Reading a 2GB file this way needs the entire content in memory at once
const data = fs.readFileSync('huge-file.csv'); // needs about 2GB of memory
processData(data);

// With streams: the file is handled chunk by chunk, so memory use stays flat
const stream = fs.createReadStream('huge-file.csv');
stream.on('data', (chunk) => {
processChunk(chunk); // handles one 64KB chunk per event, memory stays constant
});
graph LR
    subgraph "不使用Stream"
        FILE1[2GB文件] -->|全部读入| MEM1[2GB内存]
        MEM1 --> PROC1[处理]
    end

    subgraph "使用Stream"
        FILE2[2GB文件] -->|64KB块| MEM2[64KB内存]
        MEM2 --> PROC2[处理]
        PROC2 --> |下一块| MEM2
    end

四种流类型

graph TB
    subgraph "Readable"
        R[可读流] --> |产生数据| OUT1[data events]
        R_EX["fs.createReadStream<br>http.IncomingMessage<br>process.stdin"]
    end

    subgraph "Writable"
        IN1[write calls] --> W[可写流]
        W_EX["fs.createWriteStream<br>http.ServerResponse<br>process.stdout"]
    end

    subgraph "Transform"
        IN2[input] --> T[转换流] --> OUT2[output]
        T_EX["zlib.createGzip<br>crypto.createCipheriv<br>自定义转换"]
    end

    subgraph "Duplex"
        IN3[input] --> D[双工流] --> OUT3[output]
        D_EX["net.Socket<br>tls.TLSSocket"]
    end

Readable Stream

两种模式

stateDiagram-v2
    [*] --> Paused: 默认状态
    Paused --> Flowing: stream.resume()<br>stream.on('data')<br>stream.pipe()
    Flowing --> Paused: stream.pause()<br>stream.unpipe()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
const { Readable } = require('stream');

// Approach 1: subclass Readable
/**
 * Readable stream that emits the lines "Count: 1" .. "Count: max"
 * (each terminated by a newline) and then ends.
 */
class Counter extends Readable {
  /**
   * @param {number} max - how many lines to emit before ending the stream
   * @param {object} [options] - forwarded unchanged to the Readable constructor
   */
  constructor(max, options) {
    super(options);
    this.max = max;
    this.current = 0;
  }

  /**
   * Called by the stream machinery whenever it wants more data.
   * Each call pushes exactly one chunk: the next line, or the end marker.
   */
  _read(size) {
    if (this.current < this.max) {
      this.current += 1;
      // Pushed data lands in the stream's internal buffer
      this.push(`Count: ${this.current}\n`);
    } else {
      // Pushing null signals end-of-stream
      this.push(null);
    }
  }
}

// Create a counter limited to five lines and pipe its output to stdout
const counter = new Counter(5);
counter.pipe(process.stdout);
// Count: 1
// Count: 2
// ...
// Count: 5

// Approach 2: Readable.from (build a stream from an iterable)
// NOTE: `Readable` is already required at the top of this snippet. Declaring it
// again with `const` in the same scope is a SyntaxError, so the duplicate
// require that used to sit here was removed.

/**
 * Yields 1000 newline-terminated JSON records, each with a sequential `id`
 * (0..999) and a random `value`.
 *
 * @returns {AsyncGenerator<string>} one JSON line per iteration
 */
async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    yield `${JSON.stringify({ id: i, value: Math.random() })}\n`;
  }
}

// Readable.from wraps the async generator in a Readable stream
const dataStream = Readable.from(generateData());
dataStream.pipe(process.stdout);

异步迭代器(推荐方式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const fs = require('fs');
const readline = require('readline');

/**
 * Reads a file line by line through readline's async iterator, prints every
 * line containing "ERROR" together with its 1-based line number, and finally
 * prints the total number of lines.
 *
 * @param {string} filePath - path of the file to scan
 * @returns {Promise<void>}
 */
async function processFile(filePath) {
  const lines = readline.createInterface({
    input: fs.createReadStream(filePath),
    // Infinity makes "\r\n" always count as a single line break
    crlfDelay: Infinity,
  });

  let lineCount = 0;
  for await (const text of lines) {
    lineCount += 1;
    if (!text.includes('ERROR')) {
      continue;
    }
    console.log(`Line ${lineCount}: ${text}`);
  }
  console.log(`Total lines: ${lineCount}`);
}

/**
 * Iterates a file read stream directly (a Readable is itself async iterable)
 * and prints the size of every chunk received.
 *
 * @param {string} filePath - path of the file to read
 * @returns {Promise<void>}
 */
async function readChunks(filePath) {
  for await (const piece of fs.createReadStream(filePath)) {
    const size = piece.length;
    console.log(`Received ${size} bytes`);
  }
}

Writable Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
const { Writable } = require('stream');

// Custom Writable
/**
 * Writable stream that turns every chunk into a `{ timestamp, message }`
 * entry and keeps all entries in `this.logs`.
 */
class Logger extends Writable {
  /**
   * @param {object} [options] - forwarded unchanged to the Writable constructor
   */
  constructor(options) {
    super(options);
    this.logs = [];
  }

  /**
   * Handles a single chunk: records the entry right away, then prints it as
   * JSON on the next `setImmediate` tick to imitate an asynchronous sink.
   */
  _write(chunk, encoding, callback) {
    const text = chunk.toString();
    const entry = {
      timestamp: new Date().toISOString(),
      message: text.trim(),
    };
    this.logs.push(entry);

    setImmediate(() => {
      console.log(JSON.stringify(entry));
      // The callback must be invoked to report that this write is finished
      callback();
    });
  }

  /**
   * Batch variant: records all given chunks at once and prints only how many
   * entries the batch contained.
   */
  _writev(chunks, callback) {
    const batch = [];
    for (const { chunk } of chunks) {
      batch.push({
        timestamp: new Date().toISOString(),
        message: chunk.toString().trim(),
      });
    }
    this.logs.push(...batch);
    console.log(`Batch write: ${batch.length} entries`);
    callback();
  }
}

// Write two messages, then finish the stream with a third one
const logger = new Logger();
logger.write('Hello\n');
logger.write('World\n');
logger.end('Done\n'); // end() leads to the 'finish' event
logger.on('finish', () => console.log('All writes completed'));

Transform Stream

Transform流既是Readable也是Writable,接收输入并产生输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
const { Transform } = require('stream');

/**
 * Transform stream that converts CSV text into newline-delimited JSON strings.
 *
 * The first non-empty line is taken as the header row. Every later non-empty
 * line becomes one `JSON.stringify(obj) + '\n'` chunk on the readable side,
 * keyed by the header names. Fields are separated with a plain `split(',')`,
 * so quoted fields that contain commas are not supported.
 */
class CSVToJSON extends Transform {
  // Streaming UTF-8 decoder. Unlike a per-chunk `chunk.toString()`, it holds
  // back the leading bytes of a multi-byte character that a chunk boundary
  // cut in half and completes it when the next chunk arrives. It also drops
  // a leading byte-order mark.
  #decoder = new TextDecoder('utf-8');

  /**
   * @param {object} [options] - Transform options; `readableObjectMode` is
   *   always forced to true
   */
  constructor(options) {
    super({ ...options, readableObjectMode: true });
    this.headers = null;
    this.buffer = '';
  }

  /**
   * Turns one complete CSV line into output: blank lines are skipped, the
   * first data-bearing line becomes the header row, all others are pushed
   * as JSON.
   */
  #consumeLine(line) {
    if (!line.trim()) {
      return;
    }
    const values = line.split(',').map((v) => v.trim());

    if (!this.headers) {
      this.headers = values;
      return;
    }

    const obj = {};
    this.headers.forEach((header, i) => {
      obj[header] = values[i];
    });
    this.push(`${JSON.stringify(obj)}\n`);
  }

  _transform(chunk, encoding, callback) {
    // Chunks arrive as strings only when the caller disabled decodeStrings
    this.buffer += typeof chunk === 'string'
      ? chunk
      : this.#decoder.decode(chunk, { stream: true });

    const lines = this.buffer.split('\n');
    // The last element is an incomplete line; keep it for the next chunk
    this.buffer = lines.pop();

    for (const line of lines) {
      this.#consumeLine(line);
    }
    callback();
  }

  _flush(callback) {
    // Flush whatever the decoder is still holding, then handle a final line
    // that had no trailing newline. Without a header row there is nothing
    // to map the values to, so the remainder is ignored in that case.
    this.buffer += this.#decoder.decode();
    if (this.headers) {
      this.#consumeLine(this.buffer);
    }
    this.buffer = '';
    callback();
  }
}

// Usage: read data.csv, convert it, and write the result to data.jsonl
const fs = require('fs');
fs.createReadStream('data.csv')
.pipe(new CSVToJSON())
.pipe(fs.createWriteStream('data.jsonl'));

实用Transform示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
const { Transform } = require('stream');

/**
 * Pass-through Transform that reports progress on stderr.
 * After every chunk it rewrites the current line with the percentage of
 * `totalSize` transferred so far; on flush it emits a newline.
 */
class ProgressReporter extends Transform {
  /**
   * @param {number} totalSize - expected total number of bytes
   * @param {object} [options] - forwarded unchanged to the Transform constructor
   */
  constructor(totalSize, options) {
    super(options);
    this.totalSize = totalSize;
    this.transferred = 0;
  }

  _transform(chunk, encoding, callback) {
    this.transferred += chunk.length;
    const ratio = this.transferred / this.totalSize;
    const percent = (ratio * 100).toFixed(1);
    // "\r" returns to the start of the line so the figure is overwritten
    process.stderr.write(`\rProgress: ${percent}%`);
    // Hand the chunk on untouched
    callback(null, chunk);
  }

  _flush(callback) {
    process.stderr.write('\n');
    callback();
  }
}

/**
 * Object-mode Transform that forwards only the objects for which the
 * supplied predicate returns a truthy value.
 */
class JSONFilter extends Transform {
  /**
   * @param {(obj: any) => boolean} predicate - decides whether an object is kept
   * @param {object} [options] - Transform options; `objectMode` is always forced to true
   */
  constructor(predicate, options) {
    super({ ...options, objectMode: true });
    this.predicate = predicate;
  }

  _transform(obj, encoding, callback) {
    const keep = this.predicate(obj);
    if (keep) {
      callback(null, obj);
      return;
    }
    // Rejected objects are dropped silently
    callback();
  }
}

背压(Backpressure)

当数据的产生速度(上游读取)快于消费速度(下游写入)时,来不及写出的数据会在内存中积压。背压机制用于协调生产者和消费者的速度。

sequenceDiagram
    participant R as Readable (快速)
    participant W as Writable (慢速)

    R->>W: write(chunk1) → true
    R->>W: write(chunk2) → true
    R->>W: write(chunk3) → false (缓冲区满)
    Note over R: 收到false,暂停读取

    Note over W: 处理积压的数据...

    W->>R: drain事件
    Note over R: 收到drain,恢复读取

    R->>W: write(chunk4) → true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const fs = require('fs');

/**
 * Copies `src` to `dest` with hand-written backpressure handling.
 *
 * Reading is paused whenever `writable.write()` reports a full buffer and
 * resumed on the next 'drain' event.
 *
 * @param {string} src - path of the file to read
 * @param {string} dest - path of the file to write
 * @returns {Promise<void>} resolves when the destination has emitted
 *   'finish'; rejects with the first error raised by either stream.
 *   Callers that ignore the returned promise keep working as before.
 */
function copyFile(src, dest) {
  return new Promise((resolve, reject) => {
    const readable = fs.createReadStream(src);
    const writable = fs.createWriteStream(dest);

    // Without 'error' listeners a failure would surface as an uncaught
    // exception and leave the other stream (and its file descriptor) open.
    const fail = (err) => {
      readable.destroy();
      writable.destroy();
      reject(err);
    };
    readable.on('error', fail);
    writable.on('error', fail);

    readable.on('data', (chunk) => {
      const canContinue = writable.write(chunk);
      if (!canContinue) {
        // Write buffer is full: stop reading until it has drained
        readable.pause();
        writable.once('drain', () => {
          readable.resume();
        });
      }
    });

    readable.on('end', () => {
      writable.end();
    });
    writable.on('finish', resolve);
  });
}

/**
 * Copies `src` to `dest` using pipe(), which takes care of the
 * pause / 'drain' handshake internally (the recommended form).
 *
 * @param {string} src - path of the file to read
 * @param {string} dest - path of the file to write
 */
function copyFilePipe(src, dest) {
  const source = fs.createReadStream(src);
  const target = fs.createWriteStream(dest);
  source.pipe(target);
}

pipeline(推荐方式)

stream.pipeline()是pipe的改进版,正确处理错误传播和清理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');
const { Transform } = require('stream');

/**
 * Reads a text file, upper-cases its content, gzips it and writes the result.
 * Pipeline: read file -> transform -> compress -> write.
 *
 * @param {string} inputPath - path of the UTF-8 text file to read
 * @param {string} outputPath - path of the gzip file to create
 * @returns {Promise<void>} resolves once the whole pipeline has finished
 */
async function processAndCompress(inputPath, outputPath) {
  const upperCase = new Transform({
    // Chunks already arrive as decoded strings (see the read stream below),
    // so skip the round trip back to Buffer.
    decodeStrings: false,
    transform(chunk, encoding, callback) {
      callback(null, chunk.toString().toUpperCase());
    },
  });

  await pipeline(
    // Decoding in the read stream keeps multi-byte UTF-8 characters intact
    // when they straddle a chunk boundary; calling toString() on each raw
    // chunk would turn the split halves into replacement characters.
    fs.createReadStream(inputPath, { encoding: 'utf8' }),
    upperCase,
    zlib.createGzip(),
    fs.createWriteStream(outputPath),
  );
  console.log('Pipeline complete');
}

/**
 * Error-handling example: gzips input.txt into output.gz and, if any stage
 * fails, prints the error message instead of throwing.
 *
 * @returns {Promise<void>}
 */
async function safePipeline() {
  try {
    const source = fs.createReadStream('input.txt');
    const gzip = zlib.createGzip();
    const sink = fs.createWriteStream('output.gz');
    await pipeline(source, gzip, sink);
  } catch (err) {
    // pipeline has already cleaned up every stream at this point
    console.error('Pipeline failed:', err.message);
  }
}

pipeline vs pipe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// The problem with pipe: error handling is inconvenient
readable
.pipe(transform1) // an error in transform1 is not propagated
.pipe(transform2) // an error in transform2 is not propagated
.pipe(writable); // an 'error' listener is needed on every single stream

// The advantage of pipeline: one error handler plus automatic cleanup
const { pipeline } = require('stream');

pipeline(
readable,
transform1,
transform2,
writable,
// Final callback: receives the error if any stage failed
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);

stream.compose(Node.js 16.9+,在部分版本中仍标记为实验性 API)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
const { compose } = require('stream');
const { Transform } = require('stream');

/**
 * Builds one reusable stream out of three stages:
 *   parse  - splits incoming text into lines and JSON-parses each line
 *   filter - keeps only records whose `status` is 'active'
 *   format - renders each record as "<id>: <name>\n"
 *
 * @returns {import('stream').Duplex} the composed stream; write JSONL text
 *   (Buffer or string chunks) into it and read formatted lines out of it
 */
function createProcessingPipeline() {
  // Chunk boundaries are arbitrary: a chunk can hold many lines and can end
  // in the middle of a line or of a multi-byte character. The parse stage
  // therefore decodes incrementally and only parses complete lines.
  const decoder = new TextDecoder('utf-8');
  let pending = '';

  // Parses every non-blank line; returns the records or throws on bad JSON
  const parseLines = (lines) => lines
    .filter((line) => line.trim() !== '')
    .map((line) => JSON.parse(line));

  const parse = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      pending += typeof chunk === 'string'
        ? chunk
        : decoder.decode(chunk, { stream: true });
      const lines = pending.split('\n');
      // Last element is an incomplete line; keep it for the next chunk
      pending = lines.pop();

      let records;
      try {
        records = parseLines(lines);
      } catch (e) {
        callback(e);
        return;
      }
      for (const record of records) {
        // push(null) would end the stream, so a literal `null` line is skipped
        if (record != null) {
          this.push(record);
        }
      }
      callback();
    },
    flush(callback) {
      // Handle a final line that has no trailing newline
      pending += decoder.decode();
      let records;
      try {
        records = parseLines([pending]);
      } catch (e) {
        callback(e);
        return;
      }
      pending = '';
      for (const record of records) {
        if (record != null) {
          this.push(record);
        }
      }
      callback();
    },
  });

  const filter = new Transform({
    objectMode: true,
    transform(obj, encoding, callback) {
      if (obj.status === 'active') {
        callback(null, obj);
      } else {
        callback(); // skip this record
      }
    },
  });

  const format = new Transform({
    objectMode: true,
    transform(obj, encoding, callback) {
      callback(null, `${obj.id}: ${obj.name}\n`);
    },
  });

  // Combine the stages into a single reusable stream
  return compose(parse, filter, format);
}

// Use the composed stream
// `fs` is not among the modules required at the top of this snippet, so it is
// required here; without it the next statement fails with a ReferenceError.
const fs = require('fs');

const processor = createProcessingPipeline();
fs.createReadStream('data.jsonl')
  .pipe(processor)
  .pipe(process.stdout);

实战案例

HTTP流式响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream/promises');
const zlib = require('zlib');

/**
 * Request handler.
 *
 * `/download` streams /data/large-file.csv to the client, gzip-compressed on
 * the fly. Every other URL is answered with 404 so that no request is left
 * without a response.
 *
 * @param {import('http').IncomingMessage} req
 * @param {import('http').ServerResponse} res
 * @returns {Promise<void>} never rejects; all failures are handled here
 */
async function handleRequest(req, res) {
  if (req.url !== '/download') {
    res.writeHead(404, { 'Content-Type': 'text/plain' });
    res.end('Not Found');
    return;
  }

  const filePath = '/data/large-file.csv';

  // Check the file before the 200 status line is written: once writeHead()
  // has run, `res.headersSent` is true and the status can no longer be
  // changed. The asynchronous check also keeps the event loop free and
  // cannot throw out of the handler as an unhandled rejection.
  try {
    await fs.promises.access(filePath, fs.constants.R_OK);
  } catch (err) {
    const status = err.code === 'ENOENT' ? 404 : 500;
    res.writeHead(status, { 'Content-Type': 'text/plain' });
    res.end(status === 404 ? 'Not Found' : 'Internal Server Error');
    return;
  }

  res.writeHead(200, {
    'Content-Type': 'text/csv',
    'Content-Encoding': 'gzip',
    'Transfer-Encoding': 'chunked',
  });

  try {
    await pipeline(
      fs.createReadStream(filePath),
      zlib.createGzip(),
      res,
    );
  } catch (err) {
    // Headers are already on the wire, so a 500 cannot be sent any more.
    // pipeline has destroyed the response, which aborts the connection.
    console.error('Download failed:', err.message);
  }
}

const server = http.createServer(handleRequest);

日志文件实时分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
const fs = require('fs');
const readline = require('readline');
const { Transform } = require('stream');

/**
 * Object-mode Transform that counts log lines.
 *
 * Input: one log line (string) per chunk.
 * Output: an `{ type: 'error', line, stats }` object for every line that
 * contains "ERROR", and a final `{ type: 'summary', stats }` object on flush.
 * Lines containing "WARN" (and not "ERROR") are only counted.
 */
class LogAnalyzer extends Transform {
  constructor() {
    super({ objectMode: true });
    this.stats = { total: 0, errors: 0, warnings: 0 };
  }

  _transform(line, encoding, callback) {
    const { stats } = this;
    stats.total += 1;

    const isError = line.includes('ERROR');
    if (isError) {
      stats.errors += 1;
      // Emit a snapshot copy so later updates do not change this result
      this.push({ type: 'error', line, stats: { ...stats } });
    } else if (line.includes('WARN')) {
      stats.warnings += 1;
    }
    callback();
  }

  _flush(callback) {
    callback(null, { type: 'summary', stats: { ...this.stats } });
  }
}

/**
 * Streams a log file through LogAnalyzer and prints every error line
 * followed by a final summary.
 *
 * Results are consumed while lines are still being fed in, and the feeding
 * loop waits for 'drain' whenever `write()` returns false. Writing every
 * line first and reading the results afterwards would ignore backpressure
 * and queue the whole file inside the analyzer.
 *
 * @param {string} filePath - path of the log file to analyze
 * @returns {Promise<void>} rejects if reading the file fails
 */
async function analyzeLog(filePath) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    crlfDelay: Infinity,
  });

  const analyzer = new LogAnalyzer();

  // Consumer: started before the producer loop so that it runs concurrently
  const report = (async () => {
    for await (const result of analyzer) {
      if (result.type === 'error') {
        console.log(`[ERROR] ${result.line}`);
      } else if (result.type === 'summary') {
        console.log(`\nSummary: ${JSON.stringify(result.stats)}`);
      }
    }
  })();

  // Producer: feed lines, pausing while the analyzer's write buffer is full
  try {
    for await (const line of rl) {
      if (!analyzer.write(line)) {
        await new Promise((resolve) => {
          analyzer.once('drain', resolve);
        });
      }
    }
    analyzer.end();
  } catch (err) {
    // Tear the analyzer down so the consumer loop stops and reports the error
    analyzer.destroy(err);
  }

  await report;
}

大文件分块上传

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const { Readable } = require('stream');
const fs = require('fs');

/**
 * Reads a file in chunks and uploads the chunks one after another,
 * numbering the parts from 1.
 *
 * @param {string} filePath - path of the file to upload
 * @param {number} [chunkSize=5 * 1024 * 1024] - used as the read stream's
 *   highWaterMark
 * @returns {Promise<void>}
 */
async function uploadInChunks(filePath, chunkSize = 5 * 1024 * 1024) {
  const parts = fs.createReadStream(filePath, { highWaterMark: chunkSize });

  let partNumber = 0;
  for await (const part of parts) {
    partNumber += 1;
    console.log(`Uploading part ${partNumber}: ${part.length} bytes`);

    // The next chunk is only requested once this upload has completed
    await uploadPart(partNumber, part);
  }

  console.log(`Upload complete: ${partNumber} parts`);
}

/**
 * Stand-in for a real upload: ignores its arguments and resolves after
 * 100 ms.
 *
 * @param {number} partNumber - sequence number of the part (unused)
 * @param {Buffer} data - content of the part (unused)
 * @returns {Promise<void>}
 */
async function uploadPart(partNumber, data) {
  const SIMULATED_LATENCY_MS = 100;
  await new Promise((done) => {
    setTimeout(done, SIMULATED_LATENCY_MS);
  });
}

总结

Node.js Stream编程的核心要点:

  1. 四种流类型:Readable、Writable、Transform、Duplex,各有使用场景
  2. 背压机制:自动协调生产者和消费者速度,防止内存溢出
  3. pipeline优于pipe:统一错误处理,自动清理资源
  4. 异步迭代器for await...of是消费Readable流的最优雅方式
  5. stream.compose:将多个Transform组合成可复用的单元
  6. 实际应用:文件处理、HTTP流式响应、日志分析、数据转换

流是Node.js处理大数据量的核心工具,掌握流编程能让你的应用在处理大文件和高并发场景下保持优秀的内存效率。

作者 · author: zt
发布 · date: 2025-06-11
篇幅 · length: 2.8k 字 · 7 min
许可 · license: CC BY-SA 4.0
$ echo "comments" · 评论