Node.js Stream编程实战
// 目录 · contents
前言
Stream(流)是Node.js中处理大量数据的核心抽象。与一次性把全部数据加载到内存不同,流以小块(chunk)的方式逐步处理数据,内存占用基本恒定,不随数据总量增长。无论是文件处理、网络通信还是数据转换,流都是Node.js的核心基础设施。
为什么需要Stream?
1 2 3 4 5 6 7 8 9 10 11 12
| const fs = require('fs');
// Without streams: the entire file is read into a single Buffer first,
// so memory use grows with the file size.
const data = fs.readFileSync('huge-file.csv');
processData(data);

// With streams: the file is delivered as a sequence of small chunks,
// each handed to the 'data' listener as soon as it has been read.
const stream = fs.createReadStream('huge-file.csv');
stream.on('data', (chunk) => {
  processChunk(chunk);
});
|
graph LR
subgraph "不使用Stream"
FILE1[2GB文件] -->|全部读入| MEM1[2GB内存]
MEM1 --> PROC1[处理]
end
subgraph "使用Stream"
FILE2[2GB文件] -->|64KB块| MEM2[64KB内存]
MEM2 --> PROC2[处理]
PROC2 --> |下一块| MEM2
end
四种流类型
graph TB
subgraph "Readable"
R[可读流] --> |产生数据| OUT1[data events]
R_EX["fs.createReadStream<br>http.IncomingMessage<br>process.stdin"]
end
subgraph "Writable"
IN1[write calls] --> W[可写流]
W_EX["fs.createWriteStream<br>http.ServerResponse<br>process.stdout"]
end
subgraph "Transform"
IN2[input] --> T[转换流] --> OUT2[output]
T_EX["zlib.createGzip<br>crypto.createCipheriv<br>自定义转换"]
end
subgraph "Duplex"
IN3[input] --> D[双工流] --> OUT3[output]
D_EX["net.Socket<br>tls.TLSSocket"]
end
Readable Stream
两种模式
stateDiagram-v2
[*] --> Paused: 默认状态
Paused --> Flowing: stream.resume()<br>stream.on('data')<br>stream.pipe()
Flowing --> Paused: stream.pause()<br>stream.unpipe()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| const { Readable } = require('stream');
/**
 * Readable stream that produces the lines "Count: 1" … "Count: <max>" and
 * then ends. Each _read() call pushes at most one line, so values are only
 * generated when the consumer asks for more data.
 *
 * @param {number} max - number of lines to produce.
 * @param {object} [options] - passed through to the Readable constructor.
 */
class Counter extends Readable {
  constructor(max, options) {
    super(options);
    this.max = max;
    this.current = 0;
  }

  _read(size) {
    const exhausted = this.current >= this.max;
    if (exhausted) {
      // Pushing null signals end-of-stream.
      this.push(null);
    } else {
      this.current += 1;
      this.push(`Count: ${this.current}\n`);
    }
  }
}

const counter = new Counter(5);
counter.pipe(process.stdout);
// Uses the `Readable` binding required at the top of this snippet; requiring
// it a second time with `const` in the same scope would be a SyntaxError
// ("Identifier 'Readable' has already been declared").

/**
 * Async generator yielding 1000 newline-terminated JSON records of the form
 * `{ id, value }`, where `id` counts from 0 and `value` is Math.random().
 */
async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    yield `${JSON.stringify({ id: i, value: Math.random() })}\n`;
  }
}

// Readable.from() wraps any (async) iterable in a Readable stream.
const dataStream = Readable.from(generateData());
dataStream.pipe(process.stdout);
|
异步迭代器(推荐方式)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| const fs = require('fs'); const readline = require('readline');
/**
 * Scans a text file line by line with readline, printing every line that
 * contains "ERROR" (prefixed with its 1-based line number) and, at the end,
 * the total number of lines read.
 *
 * @param {string} filePath - file to scan.
 */
async function processFile(filePath) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    // Treat "\r\n" as one line break no matter how far apart the bytes arrive.
    crlfDelay: Infinity,
  });

  let lineNo = 0;
  for await (const text of rl) {
    lineNo += 1;
    if (!text.includes('ERROR')) continue;
    console.log(`Line ${lineNo}: ${text}`);
  }
  console.log(`Total lines: ${lineNo}`);
}
/**
 * Consumes a file read stream with for await...of and logs the byte length
 * of every chunk it yields.
 *
 * @param {string} filePath - file to read.
 */
async function readChunks(filePath) {
  const source = fs.createReadStream(filePath);
  for await (const piece of source) {
    const { length } = piece;
    console.log(`Received ${length} bytes`);
  }
}
|
Writable Stream
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| const { Writable } = require('stream');
/**
 * Writable stream that records every written chunk as
 * `{ timestamp, message }` (message = chunk text, trimmed) in `this.logs`.
 *
 * - _write handles one chunk: it stores the entry, then prints it as JSON on
 *   a later turn of the event loop (setImmediate) before signalling completion.
 * - _writev handles chunks that queued up while an earlier write was still
 *   pending: it stores them all and prints a one-line batch summary.
 */
class Logger extends Writable {
  constructor(options) {
    super(options);
    this.logs = [];
  }

  _write(chunk, encoding, callback) {
    const entry = {
      timestamp: new Date().toISOString(),
      message: chunk.toString().trim(),
    };
    this.logs.push(entry);
    setImmediate(() => {
      console.log(JSON.stringify(entry));
      callback();
    });
  }

  _writev(chunks, callback) {
    const batch = [];
    for (const { chunk } of chunks) {
      batch.push({
        timestamp: new Date().toISOString(),
        message: chunk.toString().trim(),
      });
    }
    this.logs.push(...batch);
    console.log(`Batch write: ${batch.length} entries`);
    callback();
  }
}
// Demo: register the 'finish' listener first, then write two chunks and
// end the stream with a final one.
const logger = new Logger();
logger.on('finish', () => console.log('All writes completed'));
logger.write('Hello\n');
logger.write('World\n');
logger.end('Done\n');
|
Transform流既是Readable也是Writable,接收输入并产生输出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| const { Transform } = require('stream');
/**
 * Transform that converts CSV text into newline-delimited JSON.
 *
 * The first non-blank line supplies the header names. Every later non-blank
 * line is emitted as `JSON.stringify({ [header]: value }) + '\n'`, with each
 * field trimmed. Fields are split on every comma, so quoted fields that
 * contain commas are not supported.
 *
 * @param {object} [options] - passed to Transform; readableObjectMode is forced on.
 */
class CSVToJSON extends Transform {
  constructor(options) {
    super({ ...options, readableObjectMode: true });
    this.headers = null;
    // Text after the last '\n' seen so far (an incomplete line).
    this.buffer = '';
    // Streaming decoder: a multi-byte UTF-8 character split across two input
    // chunks is held back until its remaining bytes arrive, instead of being
    // turned into U+FFFD halves by a per-chunk chunk.toString().
    this.decoder = new TextDecoder();
  }

  _transform(chunk, encoding, callback) {
    this.buffer += typeof chunk === 'string'
      ? chunk
      : this.decoder.decode(chunk, { stream: true });
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop();
    for (const line of lines) {
      this.#processLine(line);
    }
    callback();
  }

  _flush(callback) {
    // Release any bytes the decoder still holds, then handle the final line,
    // which may have no trailing newline. A lone line with no header row
    // before it produces no output.
    this.buffer += this.decoder.decode();
    if (this.headers) {
      this.#processLine(this.buffer);
    }
    this.buffer = '';
    callback();
  }

  /** Stores the header row, or pushes one data row as a JSON line; blank lines are ignored. */
  #processLine(line) {
    if (!line.trim()) return;
    const values = line.split(',').map((v) => v.trim());
    if (!this.headers) {
      this.headers = values;
      return;
    }
    const record = {};
    this.headers.forEach((header, i) => {
      record[header] = values[i];
    });
    this.push(`${JSON.stringify(record)}\n`);
  }
}
const fs = require('fs');
const { pipeline } = require('stream');

// pipeline() instead of chained .pipe(): an error in any stage (e.g. a
// missing data.csv) reaches the callback and every stream is destroyed, so
// the process neither crashes on an unhandled 'error' event nor leaks the
// output file handle.
pipeline(
  fs.createReadStream('data.csv'),
  new CSVToJSON(),
  fs.createWriteStream('data.jsonl'),
  (err) => {
    if (err) {
      console.error('CSV conversion failed:', err);
    }
  },
);
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| const { Transform } = require('stream');
/**
 * Pass-through Transform that reports transfer progress on stderr, rewriting
 * a single "\rProgress: …" line for every chunk.
 *
 * @param {number} totalSize - expected total number of bytes. If it is not a
 *   positive finite number, a running byte count is shown instead of a
 *   percentage (dividing by it would print "Infinity%" or "NaN%").
 * @param {object} [options] - passed through to the Transform constructor.
 */
class ProgressReporter extends Transform {
  constructor(totalSize, options) {
    super(options);
    this.totalSize = totalSize;
    this.transferred = 0;
  }

  _transform(chunk, encoding, callback) {
    this.transferred += chunk.length;
    process.stderr.write(`\rProgress: ${this.#describe()}`);
    // Forward the chunk unchanged.
    callback(null, chunk);
  }

  _flush(callback) {
    // Terminate the "\r"-overwritten progress line.
    process.stderr.write('\n');
    callback();
  }

  /** Progress as a percentage capped at 100, or as a byte count when no usable total is known. */
  #describe() {
    if (!(Number.isFinite(this.totalSize) && this.totalSize > 0)) {
      return `${this.transferred} bytes`;
    }
    const percent = Math.min(100, (this.transferred / this.totalSize) * 100);
    return `${percent.toFixed(1)}%`;
  }
}
/**
 * Object-mode Transform that forwards only the objects for which
 * `predicate(obj)` is truthy; every other object is dropped.
 *
 * @param {(obj: any) => any} predicate - keep-test applied to each object.
 * @param {object} [options] - passed to Transform; objectMode is forced on.
 */
class JSONFilter extends Transform {
  constructor(predicate, options) {
    super({ ...options, objectMode: true });
    this.predicate = predicate;
  }

  _transform(obj, encoding, callback) {
    if (!this.predicate(obj)) {
      callback();
      return;
    }
    this.push(obj);
    callback();
  }
}
|
背压(Backpressure)
当数据的产生速度(可读流读取)快于消费速度(可写流写入)时,来不及处理的数据会在内存中积压。背压机制用于协调生产者和消费者的速度:write()返回false时暂停生产,等到drain事件后再继续。
sequenceDiagram
participant R as Readable (快速)
participant W as Writable (慢速)
R->>W: write(chunk1) → true
R->>W: write(chunk2) → true
R->>W: write(chunk3) → false (缓冲区满)
Note over R: 收到false,暂停读取
Note over W: 处理积压的数据...
W->>R: drain事件
Note over R: 收到drain,恢复读取
R->>W: write(chunk4) → true
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| const fs = require('fs');
/**
 * Copies `src` to `dest` with manual backpressure handling: when write()
 * returns false the source is paused until the destination emits 'drain'.
 *
 * @param {string} src - file to read.
 * @param {string} dest - file to (over)write.
 * @returns {Promise<void>} resolves once all data has been flushed to `dest`
 *   ('finish'). It rejects with the first error from either stream, after
 *   destroying both so that neither file descriptor is left open.
 */
function copyFile(src, dest) {
  return new Promise((resolve, reject) => {
    const readable = fs.createReadStream(src);
    const writable = fs.createWriteStream(dest);

    const fail = (err) => {
      readable.destroy();
      writable.destroy();
      reject(err);
    };
    readable.on('error', fail);
    writable.on('error', fail);

    readable.on('data', (chunk) => {
      const canContinue = writable.write(chunk);
      if (!canContinue) {
        // Destination buffer is full: stop reading until it drains.
        readable.pause();
        writable.once('drain', () => readable.resume());
      }
    });
    readable.on('end', () => writable.end());
    writable.on('finish', resolve);
  });
}
/**
 * Copies `src` to `dest` with pipe(), which pauses and resumes the source
 * automatically (backpressure). pipe() does not forward errors between the
 * two streams or clean them up on failure; stream.pipeline() does both.
 *
 * @param {string} src - file to read.
 * @param {string} dest - file to (over)write.
 */
function copyFilePipe(src, dest) {
  const source = fs.createReadStream(src);
  const target = fs.createWriteStream(dest);
  source.pipe(target);
}
|
pipeline(推荐方式)
stream.pipeline()是pipe()的改进版:任一环节出错时,错误会传给回调(或使返回的Promise reject),并且所有流都会被销毁,避免资源泄漏。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| const { pipeline } = require('stream/promises'); const fs = require('fs'); const zlib = require('zlib'); const { Transform } = require('stream');
/**
 * Reads a UTF-8 text file, upper-cases it, gzips the result and writes it to
 * `outputPath`.
 *
 * @param {string} inputPath - UTF-8 text file to read.
 * @param {string} outputPath - destination for the gzipped output.
 * @returns {Promise<void>} resolves when the output is fully written; rejects
 *   (with every stream destroyed by pipeline) if any stage fails.
 */
async function processAndCompress(inputPath, outputPath) {
  const upperCase = new Transform({
    transform(chunk, encoding, callback) {
      callback(null, chunk.toString().toUpperCase());
    },
  });

  await pipeline(
    // Decode inside the read stream: with an encoding set, every chunk is a
    // string of whole characters, so a multi-byte UTF-8 character straddling
    // a read boundary is not split into two garbled halves before
    // toString()/toUpperCase() run.
    fs.createReadStream(inputPath, { encoding: 'utf8' }),
    upperCase,
    zlib.createGzip(),
    fs.createWriteStream(outputPath),
  );
  console.log('Pipeline complete');
}
/**
 * Gzips input.txt into output.gz. A failure in any stage is caught and its
 * message is logged with console.error; nothing is rethrown.
 */
async function safePipeline() {
  try {
    const source = fs.createReadStream('input.txt');
    const gzip = zlib.createGzip();
    const sink = fs.createWriteStream('output.gz');
    await pipeline(source, gzip, sink);
  } catch (err) {
    console.error('Pipeline failed:', err.message);
  }
}
|
pipeline vs pipe
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
// Two alternative ways to wire the same streams -- use one or the other.

// Option 1: pipe(). Backpressure is handled, but an error in one stream is
// not forwarded to the others, and nothing is cleaned up on failure.
readable
  .pipe(transform1)
  .pipe(transform2)
  .pipe(writable);

// Option 2: pipeline(). Same chain, plus one completion callback that
// receives the first error from any stage.
const { pipeline } = require('stream');

pipeline(readable, transform1, transform2, writable, (err) => {
  if (!err) {
    console.log('Pipeline succeeded');
    return;
  }
  console.error('Pipeline failed:', err);
});
|
stream.compose(Node.js 16.9+,实验性API)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| const { compose } = require('stream'); const { Transform } = require('stream');
/**
 * Builds a reusable Duplex (via stream.compose) that turns JSON Lines input
 * into "id: name" text lines for the records whose `status` is 'active'.
 *
 * Input arrives as arbitrary text/Buffer chunks (for example, fixed-size file
 * reads), so the parse stage re-splits it on '\n' before JSON.parse. Parsing
 * each raw chunk as a single JSON document fails as soon as a chunk holds
 * more than one line or a line spans two chunks.
 *
 * @returns {import('stream').Duplex} write JSONL in, read formatted text out;
 *   it errors on a line that is not valid JSON.
 */
function createProcessingPipeline() {
  // Streaming decoder: keeps a multi-byte UTF-8 character that is split
  // across two Buffer chunks intact.
  const decoder = new TextDecoder();
  let partial = ''; // text after the last '\n' seen so far

  // Pushes one parsed object per non-blank line; throws on invalid JSON.
  const pushLines = (stream, lines) => {
    for (const line of lines) {
      if (line.trim()) stream.push(JSON.parse(line));
    }
  };

  const parse = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      partial += typeof chunk === 'string'
        ? chunk
        : decoder.decode(chunk, { stream: true });
      const lines = partial.split('\n');
      partial = lines.pop();
      try {
        pushLines(this, lines);
      } catch (e) {
        callback(e);
        return;
      }
      callback();
    },
    flush(callback) {
      // The last line may have no trailing newline.
      const rest = partial + decoder.decode();
      partial = '';
      try {
        pushLines(this, [rest]);
      } catch (e) {
        callback(e);
        return;
      }
      callback();
    },
  });

  // Keeps only records whose status is 'active'.
  const filter = new Transform({
    objectMode: true,
    transform(obj, encoding, callback) {
      if (obj.status === 'active') {
        callback(null, obj);
      } else {
        callback();
      }
    },
  });

  // Renders each record as an "id: name" line.
  const format = new Transform({
    objectMode: true,
    transform(obj, encoding, callback) {
      callback(null, `${obj.id}: ${obj.name}\n`);
    },
  });

  return compose(parse, filter, format);
}
// This snippet's requires only bring in compose and Transform, so `fs` must
// be required here as well.
const fs = require('fs');

const source = fs.createReadStream('data.jsonl');
const processor = createProcessingPipeline();

// pipe() does not forward errors, so report a missing file or a malformed
// JSON line instead of crashing on an unhandled 'error' event.
const reportError = (err) => {
  console.error('Processing failed:', err.message);
};
source.on('error', reportError);
processor.on('error', reportError);

source.pipe(processor).pipe(process.stdout);
|
实战案例
HTTP流式响应
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| const http = require('http'); const fs = require('fs'); const { pipeline } = require('stream/promises'); const zlib = require('zlib');
/**
 * HTTP server that streams /data/large-file.csv, gzip-compressed, at
 * /download. Every other URL is answered with 404. The file is checked
 * before the 200 status is sent; a failure after the headers are out is
 * logged, and the response is aborted.
 */
const server = http.createServer(async (req, res) => {
  if (req.url !== '/download') {
    // Always answer: an unanswered request leaves the client waiting forever.
    res.writeHead(404, { 'Content-Type': 'text/plain' });
    res.end('Not Found');
    return;
  }

  const filePath = '/data/large-file.csv';

  // Check the file asynchronously (no blocking statSync, and no exception
  // escaping this async handler) *before* committing to a 200. Once
  // writeHead() has run, res.headersSent is true and the status is fixed.
  try {
    await fs.promises.access(filePath, fs.constants.R_OK);
  } catch (err) {
    const status = err.code === 'ENOENT' ? 404 : 500;
    res.writeHead(status, { 'Content-Type': 'text/plain' });
    res.end(status === 404 ? 'Not Found' : 'Internal Server Error');
    return;
  }

  // No Content-Length is set, so Node.js applies chunked transfer encoding
  // itself.
  res.writeHead(200, {
    'Content-Type': 'text/csv',
    'Content-Encoding': 'gzip',
  });

  try {
    await pipeline(fs.createReadStream(filePath), zlib.createGzip(), res);
  } catch (err) {
    // The headers are already sent, so no error status can be sent.
    // pipeline() has destroyed `res`, which aborts the response; log the
    // cause instead of swallowing it.
    console.error(`Download of ${filePath} failed:`, err);
  }
});
|
日志文件实时分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| const fs = require('fs'); const readline = require('readline'); const { Transform } = require('stream');
/**
 * Object-mode Transform that is fed one log line per write.
 *
 * It counts all lines, the lines containing "ERROR", and (only when there is
 * no "ERROR") the lines containing "WARN". Each ERROR line is pushed as
 * `{ type: 'error', line, stats }`. When the input ends, one
 * `{ type: 'summary', stats }` record is pushed. `stats` is always a copy,
 * taken at the moment of the push.
 */
class LogAnalyzer extends Transform {
  constructor() {
    super({ objectMode: true });
    this.stats = { total: 0, errors: 0, warnings: 0 };
  }

  _transform(line, encoding, callback) {
    const { stats } = this;
    stats.total += 1;
    const isError = line.includes('ERROR');
    if (isError) {
      stats.errors += 1;
      this.push({ type: 'error', line, stats: { ...stats } });
    } else if (line.includes('WARN')) {
      stats.warnings += 1;
    }
    callback();
  }

  _flush(callback) {
    const snapshot = { ...this.stats };
    this.push({ type: 'summary', stats: snapshot });
    callback();
  }
}
/**
 * Streams a log file through LogAnalyzer. Each ERROR line is printed as it is
 * found, and a JSON summary of the counts is printed at the end.
 *
 * Lines are written into the analyzer while its output is being read, and
 * writing pauses whenever write() returns false. Memory use therefore stays
 * bounded; writing every line before reading any output would ignore
 * backpressure and buffer the whole file inside the analyzer.
 *
 * @param {string} filePath - log file to analyze.
 */
async function analyzeLog(filePath) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    crlfDelay: Infinity,
  });
  const analyzer = new LogAnalyzer();

  // Producer: runs concurrently with the consumer loop below.
  const feeding = (async () => {
    for await (const line of rl) {
      if (!analyzer.write(line)) {
        // The analyzer's input buffer is full; wait until it has drained.
        await new Promise((resolve) => analyzer.once('drain', resolve));
      }
    }
    analyzer.end();
  })().catch((err) => {
    // Surface a read error through the consumer loop instead of leaving it
    // waiting for an end that never comes.
    analyzer.destroy(err);
  });

  for await (const result of analyzer) {
    if (result.type === 'error') {
      console.log(`[ERROR] ${result.line}`);
    } else if (result.type === 'summary') {
      console.log(`\nSummary: ${JSON.stringify(result.stats)}`);
    }
  }
  await feeding;
}
|
大文件分块上传
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| const { Readable } = require('stream'); const fs = require('fs');
/**
 * Reads a file in parts and uploads them one after another. Each upload is
 * awaited before the next part is taken from the stream. Part size is
 * controlled by passing `chunkSize` (default 5 MiB) as the read stream's
 * highWaterMark.
 *
 * @param {string} filePath - file to upload.
 * @param {number} [chunkSize=5 * 1024 * 1024] - highWaterMark for the read stream.
 */
async function uploadInChunks(filePath, chunkSize = 5 * 1024 * 1024) {
  const source = fs.createReadStream(filePath, { highWaterMark: chunkSize });
  let part = 0;
  for await (const piece of source) {
    part += 1;
    console.log(`Uploading part ${part}: ${piece.length} bytes`);
    await uploadPart(part, piece);
  }
  console.log(`Upload complete: ${part} parts`);
}
/**
 * Placeholder for a real part upload; here it only resolves (with undefined)
 * after a 100 ms timer. Both arguments are currently unused.
 *
 * @param {number} partNumber - 1-based index of the part.
 * @param {Buffer} data - bytes of the part.
 */
async function uploadPart(partNumber, data) {
  await new Promise((resolve) => {
    setTimeout(resolve, 100);
  });
}
|
总结
Node.js Stream编程的核心要点:
- 四种流类型:Readable、Writable、Transform、Duplex,各有使用场景
- 背压机制:协调生产者和消费者速度,防止内存溢出;pipe/pipeline会自动处理,手动调用write()时需检查返回值并等待drain事件
- pipeline优于pipe:统一错误处理,自动清理资源
- 异步迭代器:for await...of是消费Readable流的最优雅方式
- stream.compose:将多个Transform组合成可复用的单元
- 实际应用:文件处理、HTTP流式响应、日志分析、数据转换
流是Node.js处理大数据量的核心工具,掌握流编程能让你的应用在处理大文件和高并发场景下保持优秀的内存效率。