How to properly clear the internal buffer of a readable stream in node?

I’m working on my custom transform stream. Since I handle buffering and caching manually, I need to ensure that no chunks will be automatically sent when re-piping it. To achieve that I should clear/reset the internal buffer (the write queue) before attempting to call .pipe().

Since my stream logic is quite complicated, I made this simplified example to help you better understand what I’m trying to solve:

const { Readable, Writable } = require('stream')
const wait = ms => new Promise(res => setTimeout(res, ms))

;(async () => {
    const readable = new Readable()
    readable._read = () => {}

    const writable = new Writable()
    writable._write = (chunk, _encoding, callback) => {
        console.log(chunk.toString())
        callback()
    }

    readable.pipe(writable)
    await wait(100) // This small delay replaces listening to the pipe/unpipe events 

    readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')

    console.log('unpipe')
    readable.unpipe(writable)
    await wait(100)

    readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')

    // readable.clearInternalBufferOrWhatever() // ???

    console.log('pipe')
    readable.pipe(writable)
    await wait(100)

    readable.push('ccccccccccccccccccccccccccccccc')
})()

// => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa (Expected ✔)
//    unpipe
//    pipe
//    bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb (Unexpected ❌)
//    ccccccccccccccccccccccccccccccc (Expected ✔)

As you can see, the writable stream has successfully received the second chunk ('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb') and that’s what I’m trying to avoid. Any ideas?

Answer

I figured it out! Streams have an internal property called _readableState which contains the internal buffer. It’s possible to clear it with just 2 lines of code:

readable._readableState.buffer.clear()
readable._readableState.length = 0

Note that _readableState isn’t safe since it is internal and undocumented and could get breaking changes at anytime. However, this solution is confirmed to work on node v8.10.0 and v14.17.2.

Here is a full working example:

const { Readable, Writable } = require('stream')
const wait = ms => new Promise(res => setTimeout(res, ms))

;(async () => {
    const readable = new Readable()
    readable._read = () => {}

    const writable = new Writable()
    writable._write = (chunk, _encoding, callback) => {
        console.log(chunk.toString())
        callback()
    }

    readable.pipe(writable)
    await wait(100)
    readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')

    console.log('unpipe')
    readable.unpipe(writable)
    await wait(100)

    readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')

    readable._readableState.buffer.clear()
    readable._readableState.length = 0

    console.log('pipe')
    readable.pipe(writable)
    await wait(100)

    readable.push('ccccccccccccccccccccccccccccccc')
})()

// => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
//    unpipe
//    pipe
//    ccccccccccccccccccccccccccccccc