From a546fbabdc730e641fff7d9801a052eb48609af4 Mon Sep 17 00:00:00 2001 From: Qingyu Wang Date: Wed, 3 Jun 2026 19:15:01 +0800 Subject: [PATCH 1/2] stream: fix pipeTo assert when writer is released during microtask The deferred write introduced by commit 65aa8f68d0 ("stream: fix pipeTo to defer writes per WHATWG spec") wraps the write operation in queueMicrotask(). A race condition exists where the pipe can shutdown and release the writer between when [kChunk] schedules the microtask and when it executes, causing an ERR_INTERNAL_ASSERTION because writer[[stream]] is undefined. Fix by moving the shuttingDown flag into the shared pipe state object so that PipeToReadableStreamReadRequest can check it. The microtask now skips the write if the pipe has already begun shutting down. This aligns with the WHATWG Streams spec step 15 which states: "if shuttingDown becomes true, the user agent must not initiate further reads from reader, and must only perform writes of already-read chunks". Fixes: https://github.com/nodejs/node/issues/63732 Signed-off-by: Qingyu Wang --- lib/internal/webstreams/readablestream.js | 21 ++++++----- ...-webstreams-pipeto-writer-released-race.js | 35 +++++++++++++++++++ 2 files changed, 47 insertions(+), 9 deletions(-) create mode 100644 test/parallel/test-webstreams-pipeto-writer-released-race.js diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 8ae3fff11abbf1..c6f4634dcbe764 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1424,8 +1424,6 @@ function readableStreamPipeTo( source[kState].disturbed = true; - let shuttingDown = false; - if (signal !== undefined) { try { validateAbortSignal(signal, 'options.signal'); @@ -1438,6 +1436,7 @@ function readableStreamPipeTo( const state = { currentWrite: PromiseResolve(), + shuttingDown: false, }; // The error here can be undefined. The rejected arg @@ -1462,8 +1461,8 @@ function readableStreamPipeTo( } function shutdownWithAnAction(action, rejected, originalError) { - if (shuttingDown) return; - shuttingDown = true; + if (state.shuttingDown) return; + state.shuttingDown = true; if (dest[kState].state === 'writable' && !writableStreamCloseQueuedOrInFlight(dest)) { PromisePrototypeThen( @@ -1483,8 +1482,8 @@ function readableStreamPipeTo( } function shutdown(rejected, error) { - if (shuttingDown) return; - shuttingDown = true; + if (state.shuttingDown) return; + state.shuttingDown = true; if (dest[kState].state === 'writable' && !writableStreamCloseQueuedOrInFlight(dest)) { PromisePrototypeThen( @@ -1546,11 +1545,11 @@ function readableStreamPipeTo( } async function step() { - if (shuttingDown) return true; + if (state.shuttingDown) return true; if (dest[kState].backpressure) { await writer[kState].ready.promise; - if (shuttingDown) return true; + if (state.shuttingDown) return true; } const controller = source[kState].controller; @@ -1563,7 +1562,7 @@ function readableStreamPipeTo( controller[kState].queue.length > 0) { while (controller[kState].queue.length > 0) { - if (shuttingDown) return true; + if (state.shuttingDown) return true; const chunk = dequeueValue(controller); @@ -1678,6 +1677,10 @@ class PipeToReadableStreamReadRequest { // synchronous write during enqueue(). See WHATWG Streams spec // "ReadableStreamPipeTo" step 15's "chunk steps". queueMicrotask(() => { + if (this.state.shuttingDown) { + this.promise.resolve(false); + return; + } this.state.currentWrite = writableStreamDefaultWriterWrite(this.writer, chunk); markPromiseAsHandled(this.state.currentWrite); this.promise.resolve(false); diff --git a/test/parallel/test-webstreams-pipeto-writer-released-race.js b/test/parallel/test-webstreams-pipeto-writer-released-race.js new file mode 100644 index 00000000000000..41fe191f57e670 --- /dev/null +++ b/test/parallel/test-webstreams-pipeto-writer-released-race.js @@ -0,0 +1,35 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { ReadableStream, WritableStream } = require('stream/web'); + +{ + let sourceController; + let destController; + + const source = new ReadableStream({ + start(controller) { + sourceController = controller; + }, + }); + + const dest = new WritableStream({ + start(controller) { + destController = controller; + }, + write() {}, + }); + + source.pipeTo(dest, { preventCancel: true }).then( + common.mustNotCall('pipeTo should not resolve'), + common.mustCall((err) => { + assert.strictEqual(err.message, 'destination errored'); + }) + ); + + setImmediate(common.mustCall(() => { + destController.error(new Error('destination errored')); + sourceController.enqueue('chunk'); + })); +} From eedd317fc877c6fcd6019471599ff37b9ef200d8 Mon Sep 17 00:00:00 2001 From: Qingyu Wang Date: Wed, 3 Jun 2026 20:51:46 +0800 Subject: [PATCH 2/2] stream: narrow pipeTo guard to writer release check The state.shuttingDown guard introduced in the previous commit is too broad: it skips writes even when the destination is still writable. The WHATWG Streams spec requires that already-read chunks are written to a still-writable destination during shutdown (step 15, shutdown substeps 3.1-3.2). Replace the guard with a check for writer[kState].stream === undefined which is only true after finalize() has released the writer. This precisely targets the crash condition without violating the spec requirement. Restore shuttingDown as a closure-local variable since it no longer needs to be shared with PipeToReadableStreamReadRequest. Add a test case verifying that an already-read chunk is written when an AbortSignal fires after enqueue while the destination is still writable. Signed-off-by: Qingyu Wang --- lib/internal/webstreams/readablestream.js | 19 ++++++------ ...-webstreams-pipeto-writer-released-race.js | 31 +++++++++++++++++++ 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index c6f4634dcbe764..61d85169607e19 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -1424,6 +1424,8 @@ function readableStreamPipeTo( source[kState].disturbed = true; + let shuttingDown = false; + if (signal !== undefined) { try { validateAbortSignal(signal, 'options.signal'); @@ -1436,7 +1438,6 @@ function readableStreamPipeTo( const state = { currentWrite: PromiseResolve(), - shuttingDown: false, }; // The error here can be undefined. The rejected arg @@ -1461,8 +1462,8 @@ function readableStreamPipeTo( } function shutdownWithAnAction(action, rejected, originalError) { - if (state.shuttingDown) return; - state.shuttingDown = true; + if (shuttingDown) return; + shuttingDown = true; if (dest[kState].state === 'writable' && !writableStreamCloseQueuedOrInFlight(dest)) { PromisePrototypeThen( @@ -1482,8 +1483,8 @@ function readableStreamPipeTo( } function shutdown(rejected, error) { - if (state.shuttingDown) return; - state.shuttingDown = true; + if (shuttingDown) return; + shuttingDown = true; if (dest[kState].state === 'writable' && !writableStreamCloseQueuedOrInFlight(dest)) { PromisePrototypeThen( @@ -1545,11 +1546,11 @@ function readableStreamPipeTo( } async function step() { - if (state.shuttingDown) return true; + if (shuttingDown) return true; if (dest[kState].backpressure) { await writer[kState].ready.promise; - if (state.shuttingDown) return true; + if (shuttingDown) return true; } const controller = source[kState].controller; @@ -1562,7 +1563,7 @@ function readableStreamPipeTo( controller[kState].queue.length > 0) { while (controller[kState].queue.length > 0) { - if (state.shuttingDown) return true; + if (shuttingDown) return true; const chunk = dequeueValue(controller); @@ -1677,7 +1678,7 @@ class PipeToReadableStreamReadRequest { // synchronous write during enqueue(). See WHATWG Streams spec // "ReadableStreamPipeTo" step 15's "chunk steps". queueMicrotask(() => { - if (this.state.shuttingDown) { + if (this.writer[kState].stream === undefined) { this.promise.resolve(false); return; } diff --git a/test/parallel/test-webstreams-pipeto-writer-released-race.js b/test/parallel/test-webstreams-pipeto-writer-released-race.js index 41fe191f57e670..64f84855a58867 100644 --- a/test/parallel/test-webstreams-pipeto-writer-released-race.js +++ b/test/parallel/test-webstreams-pipeto-writer-released-race.js @@ -33,3 +33,34 @@ const { ReadableStream, WritableStream } = require('stream/web'); sourceController.enqueue('chunk'); })); } + +{ + const ac = new AbortController(); + let sourceController; + const chunks = []; + + const source = new ReadableStream({ + start(controller) { + sourceController = controller; + }, + }, { highWaterMark: 0 }); + + const dest = new WritableStream({ + write: common.mustCall((chunk) => { + chunks.push(chunk); + }), + }, { highWaterMark: 1 }); + + source.pipeTo(dest, { signal: ac.signal }).then( + common.mustNotCall('pipeTo should not resolve'), + common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + assert.deepStrictEqual(chunks, ['chunk']); + }) + ); + + setImmediate(common.mustCall(() => { + sourceController.enqueue('chunk'); + ac.abort(); + })); +}