Skip to content

Node.js Stream 实战指南

Stream 是 Node.js 最核心的模块之一,但很多开发者日常只用 fs.readFile 处理文件。当你需要处理大文件、构建管道式数据处理、或者实现高效 I/O 时,Stream 是不可或缺的工具。

为什么需要 Stream

先看一个常见问题:读取一个 2GB 的日志文件。

javascript
// 方案一:fs.readFile — 一次性读入内存
const fs = require('fs')

fs.readFile('./huge-log.txt', (err, data) => {
  if (err) throw err
  console.log(data.length)
})

// 问题:2GB 文件需要 2GB 内存,很可能 OOM
// 即使 Node.js 的 Buffer 有 2GB 限制(v12+ 默认 4GB),效率也很低
javascript
// 方案二:Stream — 分块读取,内存占用恒定
const fs = require('fs')

const stream = fs.createReadStream('./huge-log.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 每次读取 64KB
})

let totalSize = 0
stream.on('data', (chunk) => {
  totalSize += chunk.length
  // 每次只处理 64KB,内存占用极低
})

stream.on('end', () => {
  console.log('总大小:', totalSize)
})

stream.on('error', (err) => {
  console.error('读取出错:', err)
})

Stream 把数据分成小块(chunk)处理,内存占用从 O(n) 变成 O(1)。

Stream 的四种类型

javascript
const { Readable, Writable, Duplex, Transform } = require('stream')

Readable(可读流)

javascript
const { Readable } = require('stream')

// 方式一:实现 _read 方法
class CounterStream extends Readable {
  constructor(max) {
    super({ encoding: 'utf8' })
    this.max = max
    this.current = 1
  }

  _read() {
    if (this.current > this.max) {
      this.push(null) // null 表示流结束
      return
    }
    // push 的数据会进入内部缓冲区
    this.push(`第 ${this.current} 行数据\n`)
    this.current++
  }
}

const counter = new CounterStream(5)
counter.on('data', (chunk) => {
  process.stdout.write(chunk)
})
// 输出:
// 第 1 行数据
// 第 2 行数据
// 第 3 行数据
// 第 4 行数据
// 第 5 行数据
javascript
// 方式二:使用 Readable.from() 从迭代器创建
const { Readable } = require('stream')

async function* generateUsers() {
  const users = [
    { id: 1, name: '张三' },
    { id: 2, name: '李四' },
    { id: 3, name: '王五' }
  ]
  for (const user of users) {
    yield JSON.stringify(user)
  }
}

const userStream = Readable.from(generateUsers())
userStream.on('data', (chunk) => {
  console.log('收到:', chunk.toString())
})

Writable(可写流)

javascript
const { Writable } = require('stream')

// 实现 _write 方法
class LogWriter extends Writable {
  constructor(options) {
    super(options)
    this.logs = []
  }

  _write(chunk, encoding, callback) {
    const message = chunk.toString()
    this.logs.push({
      time: new Date().toISOString(),
      message: message.trim()
    })
    // callback 必须调用,表示当前 chunk 处理完成
    callback()
  }

  // 可选:处理批量写入
  _writev(chunks, callback) {
    for (const { chunk } of chunks) {
      this.logs.push({
        time: new Date().toISOString(),
        message: chunk.toString().trim()
      })
    }
    callback()
  }
}

const logger = new LogWriter()
logger.write('服务器启动\n')
logger.write('监听端口 3000\n')
logger.end(() => {
  console.log('日志:', logger.logs)
})

Duplex(双工流)

Duplex 同时是 Readable 和 Writable,读写独立。

javascript
const { Duplex } = require('stream')

class TCPStream extends Duplex {
  constructor(socket) {
    super()
    this.socket = socket
    this.buffer = []

    // socket 的数据推入可读端
    socket.on('data', (data) => {
      this.push(data)
    })

    socket.on('end', () => {
      this.push(null)
    })
  }

  // _write 处理写入端
  _write(chunk, encoding, callback) {
    this.socket.write(chunk, encoding, callback)
  }

  // _read 处理读取端(这里由 socket 事件驱动)
  _read() {
    // socket 的 data 事件已经通过 push() 提供数据
  }
}

Transform(转换流)

Transform 是特殊的 Duplex,输入经过处理后输出。

javascript
const { Transform } = require('stream')

// 实例一:大写转换
class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // 将输入数据转为大写后推入输出
    this.push(chunk.toString().toUpperCase())
    callback()
  }
}

const upper = new UpperCaseTransform()
upper.on('data', (chunk) => {
  console.log(chunk.toString()) // 'HELLO WORLD'
})
upper.write('hello world')
upper.end()

// 实例二:JSON 行解析器
class JSONLineParser extends Transform {
  constructor() {
    super({ objectMode: true }) // 输出对象而非 Buffer
    this.buffer = ''
  }

  _transform(chunk, encoding, callback) {
    // 将新数据追加到缓冲区
    this.buffer += chunk.toString()

    // 按行分割
    const lines = this.buffer.split('\n')
    // 最后一行可能不完整,保留到下次
    this.buffer = lines.pop()

    for (const line of lines) {
      if (line.trim()) {
        try {
          this.push(JSON.parse(line))
        } catch (err) {
          this.emit('error', new Error(`JSON 解析失败: ${line}`))
        }
      }
    }

    callback()
  }

  _flush(callback) {
    // 处理最后一行
    if (this.buffer.trim()) {
      try {
        this.push(JSON.parse(this.buffer))
      } catch (err) {
        this.emit('error', new Error(`JSON 解析失败: ${this.buffer}`))
      }
    }
    callback()
  }
}

// 使用
const parser = new JSONLineParser()
parser.on('data', (obj) => {
  console.log('解析到对象:', obj)
})
parser.write('{"id":1,"name":"张三"}\n{"id":2,"name":"李四"}\n')
parser.end()

pipe 和 pipeline

pipe() 是 Stream 的核心机制,将可读流连接到可写流。

javascript
const fs = require('fs')

// 基本用法:复制文件
const readable = fs.createReadStream('./source.txt')
const writable = fs.createWriteStream('./dest.txt')

// pipe 自动处理:数据流、背压、结束传递
readable.pipe(writable)

writable.on('finish', () => {
  console.log('文件复制完成')
})
javascript
// 链式管道:多个转换流串联
const fs = require('fs')
const zlib = require('zlib')
const crypto = require('crypto')

// 压缩 → 加密 → 写入文件
fs.createReadStream('./data.txt')
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipheriv('aes-256-cbc', key, iv))
  .pipe(fs.createWriteStream('./data.txt.enc'))
  .on('finish', () => console.log('加密压缩完成'))

// 解压解密:反过来
fs.createReadStream('./data.txt.enc')
  .pipe(crypto.createDecipheriv('aes-256-cbc', key, iv))
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('./data-restored.txt'))
  .on('finish', () => console.log('解密解压完成'))

pipe() 有一个问题:如果管道中间出错,上游不会自动关闭。Node.js 10+ 引入了 pipeline 解决这个问题。

javascript
const { pipeline } = require('stream')
const fs = require('fs')
const zlib = require('zlib')

// pipeline 自动处理错误和资源清理
pipeline(
  fs.createReadStream('./input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('./output.txt.gz'),
  (err) => {
    if (err) {
      console.error('管道出错:', err)
    } else {
      console.log('压缩完成')
    }
    // 无论成功还是失败,所有流都会被正确关闭
  }
)

背压(Backpressure)

背压是 Stream 处理速度不匹配时的自动调节机制。

javascript
const fs = require('fs')

const readable = fs.createReadStream('./huge-file.txt', {
  highWaterMark: 64 * 1024  // 读取缓冲区 64KB
})

const writable = fs.createWriteStream('./output.txt', {
  highWaterMark: 16 * 1024  // 写入缓冲区 16KB
})

readable.on('data', (chunk) => {
  // writable.write() 返回 false 表示内部缓冲区已满
  const canContinue = writable.write(chunk)
  if (!canContinue) {
    // 暂停读取,等写入端清空缓冲区
    console.log('背压触发,暂停读取')
    readable.pause()

    writable.once('drain', () => {
      console.log('缓冲区清空,恢复读取')
      readable.resume()
    })
  }
})

好消息是 pipe() 自动处理了背压,这就是为什么推荐用 pipe 而不是手动处理 data 事件。

javascript
// pipe 内部自动实现了上面的背压逻辑
readable.pipe(writable)

实战:大文件逐行处理

处理大型 CSV 或日志文件是最常见的 Stream 场景。

javascript
const fs = require('fs')
const { Transform } = require('stream')
const { pipeline } = require('stream')

// 逐行读取流
class LineReader extends Transform {
  constructor() {
    super({ readableObjectMode: true })
    this.buffer = ''
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString()
    const lines = this.buffer.split('\n')
    this.buffer = lines.pop() // 保留最后一个不完整的行
    for (const line of lines) {
      this.push(line)
    }
    callback()
  }

  _flush(callback) {
    if (this.buffer) {
      this.push(this.buffer)
    }
    callback()
  }
}

// CSV 解析器
class CSVParser extends Transform {
  constructor(headers) {
    super({ writableObjectMode: true, readableObjectMode: true })
    this.headers = headers
    this.isFirstLine = !headers
  }

  _transform(line, encoding, callback) {
    if (this.isFirstLine) {
      this.headers = line.split(',').map(h => h.trim())
      this.isFirstLine = false
      callback()
      return
    }

    const values = line.split(',').map(v => v.trim())
    const obj = {}
    this.headers.forEach((header, i) => {
      obj[header] = values[i] || ''
    })
    this.push(obj)
    callback()
  }
}

// 数据过滤
class AgeFilter extends Transform {
  constructor(minAge) {
    super({ writableObjectMode: true, readableObjectMode: true })
    this.minAge = minAge
  }

  _transform(obj, encoding, callback) {
    const age = parseInt(obj.age, 10)
    if (!isNaN(age) && age >= this.minAge) {
      this.push(obj)
    }
    callback()
  }
}

// 数据转换为 JSON 行格式
class ToJSONLine extends Transform {
  constructor() {
    super({ writableObjectMode: true })
  }

  _transform(obj, encoding, callback) {
    this.push(JSON.stringify(obj) + '\n')
    callback()
  }
}

// 使用
pipeline(
  fs.createReadStream('./users.csv'),
  new LineReader(),
  new CSVParser(),     // 自动从第一行读取表头
  new AgeFilter(18),   // 过滤年龄 >= 18
  new ToJSONLine(),
  fs.createWriteStream('./adults.jsonl'),
  (err) => {
    if (err) console.error('处理出错:', err)
    else console.log('处理完成')
  }
)

实战:HTTP 请求流式下载

javascript
const http = require('http')
const fs = require('fs')
const { Transform } = require('stream')
const { pipeline } = require('stream')

// 进度跟踪流
class ProgressTracker extends Transform {
  constructor(total) {
    super()
    this.transferred = 0
    this.total = total
    this.lastPercent = 0
  }

  _transform(chunk, encoding, callback) {
    this.transferred += chunk.length
    const percent = Math.floor((this.transferred / this.total) * 100)

    // 每 10% 输出一次进度
    if (percent >= this.lastPercent + 10) {
      this.lastPercent = percent
      console.log(`下载进度: ${percent}%`)
    }

    this.push(chunk)
    callback()
  }
}

function downloadFile(url, dest) {
  return new Promise((resolve, reject) => {
    http.get(url, (response) => {
      // 处理重定向
      if (response.statusCode === 301 || response.statusCode === 302) {
        downloadFile(response.headers.location, dest)
          .then(resolve)
          .catch(reject)
        return
      }

      if (response.statusCode !== 200) {
        reject(new Error(`下载失败: ${response.statusCode}`))
        return
      }

      const totalSize = parseInt(response.headers['content-length'], 10)

      pipeline(
        response,
        new ProgressTracker(totalSize),
        fs.createWriteStream(dest),
        (err) => {
          if (err) reject(err)
          else resolve()
        }
      )
    }).on('error', reject)
  })
}

// 使用
downloadFile('http://example.com/large-file.zip', './download.zip')
  .then(() => console.log('下载完成'))
  .catch(console.error)

实战:Express 中的流式响应

javascript
const express = require('express')
const fs = require('fs')
const { Transform } = require('stream')
const { pipeline } = require('stream')
const path = require('path')

const app = express()

// 大文件下载:支持断点续传
app.get('/download/:filename', (req, res) => {
  const filePath = path.join(__dirname, 'files', req.params.filename)

  if (!fs.existsSync(filePath)) {
    return res.status(404).json({ error: '文件不存在' })
  }

  const stat = fs.statSync(filePath)
  const fileSize = stat.size
  const range = req.headers.range

  if (range) {
    // 断点续传
    const parts = range.replace(/bytes=/, '').split('-')
    const start = parseInt(parts[0], 10)
    const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1
    const chunkSize = end - start + 1

    res.writeHead(206, {
      'Content-Range': `bytes ${start}-${end}/${fileSize}`,
      'Accept-Ranges': 'bytes',
      'Content-Length': chunkSize,
      'Content-Type': 'application/octet-stream'
    })

    const stream = fs.createReadStream(filePath, { start, end })
    stream.pipe(res)
  } else {
    res.writeHead(200, {
      'Content-Length': fileSize,
      'Content-Type': 'application/octet-stream'
    })
    fs.createReadStream(filePath).pipe(res)
  }
})

// 日志流式处理 API
app.get('/logs/search', (req, res) => {
  const keyword = req.query.keyword || ''
  const logFile = path.join(__dirname, 'logs', 'app.log')

  res.setHeader('Content-Type', 'text/plain; charset=utf-8')

  // 边读取边搜索边返回,不需要把整个日志文件加载到内存
  class KeywordFilter extends Transform {
    _transform(chunk, encoding, callback) {
      const lines = chunk.toString().split('\n')
      for (const line of lines) {
        if (line.includes(keyword)) {
          this.push(line + '\n')
        }
      }
      callback()
    }
  }

  pipeline(
    fs.createReadStream(logFile, { encoding: 'utf8' }),
    new KeywordFilter(),
    res,
    (err) => {
      if (err && !res.headersSent) {
        res.status(500).json({ error: '搜索出错' })
      }
    }
  )
})

Stream vs Buffer 对比

javascript
// 方案一:Buffer — 读取整个文件
const fs = require('fs')

function processWithBuffer(filePath) {
  const start = Date.now()
  const data = fs.readFileSync(filePath)
  const lines = data.toString().split('\n')
  const result = lines.filter(line => line.includes('ERROR'))
  console.log(`Buffer 方式: 找到 ${result.length} 条错误,耗时 ${Date.now() - start}ms`)
  console.log(`内存占用: ${(process.memoryUsage().heapUsed / 1024 / 1024).toFixed(2)} MB`)
}

// 方案二:Stream — 流式处理
function processWithStream(filePath) {
  const start = Date.now()
  const { Transform, pipeline } = require('stream')

  let count = 0

  class ErrorCounter extends Transform {
    _transform(chunk, encoding, callback) {
      const lines = chunk.toString().split('\n')
      count += lines.filter(line => line.includes('ERROR')).length
      callback()
    }
  }

  return new Promise((resolve) => {
    pipeline(
      fs.createReadStream(filePath),
      new ErrorCounter(),
      fs.createWriteStream('/dev/null'), // 丢弃输出
      () => {
        console.log(`Stream 方式: 找到 ${count} 条错误,耗时 ${Date.now() - start}ms`)
        console.log(`内存占用: ${(process.memoryUsage().heapUsed / 1024 / 1024).toFixed(2)} MB`)
        resolve()
      }
    )
  })
}

// 对比结果(处理 500MB 日志文件):
// Buffer 方式: 找到 12345 条错误,耗时 3200ms,内存占用 512 MB
// Stream 方式: 找到 12345 条错误,耗时 2800ms,内存占用 35 MB

小结

  • Stream 把数据分成小块处理,内存占用从 O(n) 降到 O(1),是处理大文件的首选方案
  • 四种类型:Readable(可读)、Writable(可写)、Duplex(双工)、Transform(转换)
  • pipe() 自动处理背压,推荐始终用 pipe 而非手动监听 data 事件
  • Node.js 10+ 的 pipeline() 更安全,自动处理错误和资源清理
  • 实际项目中 Stream 最适合大文件处理、日志分析、HTTP 流式响应等 I/O 密集场景

MIT Licensed