Stage interface redesign: pipe type negotiation, pooled buffer copies#51
znull wants to merge 24 commits into
This PR is still in draft mode. Do you want review/feedback already?
@mhagger I ran out of time to review the LLM output before vacation. I wanted to read through the changes more fully myself before inflicting them on anyone else so I left it in draft mode.
Ported from version-2 branch commits:
- 95dc2e8 pipeline_test.go: get rid of a bunch of unnecessary tmpdirs
- 5fdc22a TestPipelineStdinThatIsNeverClosed(): create stdin more simply
- c2c9802 pipeline_test.go: use WithStdoutCloser() to close stdout pipes

Tests that don't run external commands (or whose commands don't need a specific working directory) don't need t.TempDir().
Add some benchmarks that move MB-scale data through pipelines consisting of alternating commands and functions, one in small writes, and one buffered into larger writes, then processing it one line at a time. This is not so efficient, because every transition from `Function` → `Command` requires an extra (hidden) goroutine that copies the data from an `io.Reader` to a `*os.File`. We can make this faster!
* Rename
  * `newNopCloser()` → `newReaderNopCloser()`
  * `nopCloser` → `readerNopCloser`
  * `nopCloserWriterTo` → `readerWriterToNopCloser`
  * `nopWriteCloser` → `writerNopCloser`

  to help keep readers and writers straight and because only the `Close()` part is a NOP.
* Move `writerNopCloser` to `nop_closer.go` to be with its siblings.
Pull request overview
This PR modernizes the pipeline stage contract for v2 by letting stages declare I/O preferences and receive both stdin and stdout from the pipeline, enabling better pipe selection and removing the synthetic ioCopier stage.
Changes:
- Redesigns `Stage` with `Preferences()` and `Start(..., stdin, stdout)` so `Pipeline.Start()` can negotiate `os.Pipe` vs `io.Pipe`.
- Reworks command/function/memory-limit stages for the new interface, including pooled stdout copies for non-file command destinations.
- Updates module path to `/v2` and adds regression/benchmark coverage for pipe matching, empty pipelines, fast-path stdout, and start-failure cleanup.
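The negotiation described in the first bullet can be sketched roughly as follows. This is a minimal illustration, not go-pipe's code: the names `ioPreference`, `prefFile`, `prefUndefined`, `needsOSPipe`, `newPipe`, and `kindFor` are hypothetical stand-ins, and the real preference type may have a different shape.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// ioPreference is a hypothetical stand-in for a stage's declared I/O
// preference; it is not go-pipe's actual type.
type ioPreference int

const (
	prefUndefined ioPreference = iota // any io.Reader/io.Writer will do (e.g. a Go function)
	prefFile                          // wants a real *os.File (e.g. a subprocess)
)

// needsOSPipe reports whether the connection between two adjacent stages
// has to be backed by a file descriptor.
func needsOSPipe(upstreamStdout, downstreamStdin ioPreference) bool {
	return upstreamStdout == prefFile || downstreamStdin == prefFile
}

// newPipe returns the read and write ends of a pipe of the negotiated kind.
func newPipe(upstreamStdout, downstreamStdin ioPreference) (io.ReadCloser, io.WriteCloser, error) {
	if needsOSPipe(upstreamStdout, downstreamStdin) {
		r, w, err := os.Pipe()
		if err != nil {
			return nil, nil, err
		}
		return r, w, nil
	}
	// Both sides live in Go: an in-memory pipe avoids file descriptors.
	r, w := io.Pipe()
	return r, w, nil
}

// kindFor names the pipe type chosen for a pair of preferences.
func kindFor(upstreamStdout, downstreamStdin ioPreference) string {
	r, w, err := newPipe(upstreamStdout, downstreamStdin)
	if err != nil {
		return "error: " + err.Error()
	}
	defer r.Close()
	defer w.Close()
	if _, ok := r.(*os.File); ok {
		return "os.Pipe"
	}
	return "io.Pipe"
}

func main() {
	fmt.Println(kindFor(prefFile, prefUndefined))      // os.Pipe
	fmt.Println(kindFor(prefUndefined, prefUndefined)) // io.Pipe
}
```

Keeping the decision in one place is what lets the pipeline, rather than each stage, own the connection between neighbors.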
| File | Description |
|---|---|
| README.md | Updates documentation links for the v2 module path. |
| go.mod | Changes the module path to github.com/github/go-pipe/v2. |
| internal/ptree/ptree_test.go | Updates internal import path for v2. |
| pipe/stage.go | Redefines the public Stage interface and adds I/O preference types. |
| pipe/pipeline.go | Reworks pipeline startup to negotiate pipe types and pass stdout directly. |
| pipe/command.go | Adapts command stages to the new interface and adds pooled stdout copy handling. |
| pipe/function.go | Adapts function stages to receive caller-provided stdout and panic handling. |
| pipe/filter-error.go | Forwards panic handlers through error-filtering wrappers. |
| pipe/memorylimit.go | Ports memory-watching wrappers to the new stage interface. |
| pipe/nop_closer.go | Splits reader/writer nop closers and adds test unwrapping support. |
| pipe/copy_pool.go | Adds pooled-buffer copy helper with ReaderFrom fast-path support. |
| pipe/iocopier.go | Removes the old synthetic copier stage. |
| pipe/scanner.go | Simplifies scanner error return. |
| pipe/command_linux.go | Updates internal import path for v2. |
| pipe/command_test.go | Applies formatting cleanup. |
| pipe/command_nil_panic_test.go | Updates direct Start call for the new signature. |
| pipe/pipeline_test.go | Updates tests/benchmarks for v2 behavior, empty pipelines, and panic forwarding. |
| pipe/memorylimit_test.go | Reworks memory-limit tests for the new pipeline flow. |
| pipe/pipe_matching_test.go | Adds coverage for negotiated stdin/stdout pipe types. |
| pipe/export_test.go | Exposes nop-closer unwrapping for external package tests. |
| pipe/command_stdout_fastpath_test.go | Adds tests pinning direct *os.File stdout handoff. |
| pipe/command_starterror_test.go | Adds regression coverage for start-failure copy-goroutine cleanup. |
Copilot's findings
- Files reviewed: 22/22 changed files
- Comments generated: 2
I think this is actually worth taking a look at now.
When a command stage's stdout is not an `*os.File` (e.g. a
`bytes.Buffer` via `WithStdout()` or a custom writer via
`WithStdoutCloser()`), the fd-pass fast path doesn't apply and the data
has to flow through Go. Left to its own devices, `exec.Cmd` would set up
an internal `os.Pipe()` and run `io.Copy` with a fresh 32KB buffer
allocated per invocation.
To reduce GC pressure, let's do that copy directly: create the
`os.Pipe()` ourselves, set the write end as `cmd.Stdout` (so exec.Cmd
still does an fd dup into the child), and run the copy from the read end
to the user's writer in our own goroutine, drawing the 32KB buffer from
a `sync.Pool` (`copy_pool.go`).
Destinations that implement `io.ReaderFrom` (`*net.TCPConn`, `*os.File`)
are still routed through `ReadFrom` so platform fast paths like splice
continue to apply. The pure `*os.File` and `writerNopCloser{*os.File}`
paths are unchanged: the fd is still passed directly to the child
process.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
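A rough sketch of the copy helper this commit message describes, assuming a shape like the one below. The names `bufPool`, `pooledCopy`, `writerOnly`, and `copyToPlain` are illustrative and are not taken from `copy_pool.go`; only the 32KB size, the `sync.Pool`, and the `io.ReaderFrom` routing come from the text above.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// bufPool hands out reusable 32 KiB copy buffers. Storing *[]byte rather
// than []byte avoids an allocation each time a buffer is put back.
var bufPool = sync.Pool{
	New: func() any {
		b := make([]byte, 32*1024)
		return &b
	},
}

// pooledCopy copies src to dst. Destinations that implement io.ReaderFrom
// keep their own fast path; everything else goes through a pooled buffer.
func pooledCopy(dst io.Writer, src io.Reader) (int64, error) {
	if rf, ok := dst.(io.ReaderFrom); ok {
		return rf.ReadFrom(src)
	}
	bp := bufPool.Get().(*[]byte)
	defer bufPool.Put(bp)
	// Wrapping src hides any WriterTo method, which io.CopyBuffer would
	// otherwise prefer over the supplied buffer.
	return io.CopyBuffer(dst, struct{ io.Reader }{src}, *bp)
}

// writerOnly exposes only Write, hiding bytes.Buffer's ReadFrom so the demo
// exercises the pooled path.
type writerOnly struct{ io.Writer }

// copyToPlain runs pooledCopy against a destination without ReaderFrom.
func copyToPlain(s string) (string, int64, error) {
	var buf bytes.Buffer
	n, err := pooledCopy(writerOnly{&buf}, strings.NewReader(s))
	return buf.String(), n, err
}

func main() {
	out, n, err := copyToPlain("hello, pipeline")
	fmt.Println(out, n, err) // hello, pipeline 15 <nil>

	// A *bytes.Buffer implements io.ReaderFrom, so this takes the fast path.
	var fast bytes.Buffer
	n, err = pooledCopy(&fast, strings.NewReader("fast"))
	fmt.Println(fast.String(), n, err) // fast 4 <nil>
}
```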
`efStage` (the wrapper returned by `FilterError` and `IgnoreError`) embeds the `Stage` interface, which only exposes the four Stage methods. So when `Pipeline.Start()` checks `if phs, ok := s.(StagePanicHandlerAware); ok`, the assertion silently fails for any wrapped stage — even if the underlying stage is a `goStage` that implements `SetPanicHandler`. This means a configured `WithStagePanicHandler` is bypassed when the panicking Function is wrapped in `IgnoreError`. The goroutine inside `goStage.Start` sees `panicHandler == nil` and returns without calling `recover()`, letting the panic propagate up the runtime and crash the host process. memoryWatchStage had a similar issue. This was a pre-existing bug in main (not introduced by the version-2 Stage interface redesign), but we're already touching the panic-handler, so let's fix it here. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
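The failure mode is a general Go property: a struct that embeds an interface only promotes that interface's methods, so optional methods on the wrapped value disappear from type assertions. A minimal reproduction, using simplified hypothetical types (`stage`, `panicAware`, `fnStage`, `hidingWrapper`, `forwardingWrapper`) rather than go-pipe's real ones:

```go
package main

import "fmt"

// stage is a cut-down stand-in for the Stage interface.
type stage interface {
	Name() string
}

// panicAware is a cut-down stand-in for an optional opt-in interface.
type panicAware interface {
	SetPanicHandler(h func(any) error)
}

// fnStage implements both interfaces.
type fnStage struct{ handler func(any) error }

func (s *fnStage) Name() string                      { return "fn" }
func (s *fnStage) SetPanicHandler(h func(any) error) { s.handler = h }

// hidingWrapper embeds the interface, so only Name() is promoted.
type hidingWrapper struct{ stage }

// forwardingWrapper embeds the interface too, but forwards the optional
// method explicitly.
type forwardingWrapper struct{ stage }

func (w forwardingWrapper) SetPanicHandler(h func(any) error) {
	if pa, ok := w.stage.(panicAware); ok {
		pa.SetPanicHandler(h)
	}
}

// isPanicAware performs the same kind of assertion the pipeline would.
func isPanicAware(s stage) bool {
	_, ok := s.(panicAware)
	return ok
}

func main() {
	inner := &fnStage{}
	fmt.Println(isPanicAware(inner))                    // true
	fmt.Println(isPanicAware(hidingWrapper{inner}))     // false
	fmt.Println(isPanicAware(forwardingWrapper{inner})) // true
}
```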
Ensure that in case of cmd.Start() failure for pipelines that use pooled buffers, we don't leak a pooled-buffer-copy goroutine, and all closers are closed. Could be triggered/detected by using a command stage that fails on command-not-found under the race detector. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The Stage interface in this series is not backwards compatible:
- Start()'s signature changed from:
Start(ctx, env, stdin io.ReadCloser) (io.ReadCloser, error)
to:
Start(ctx, env, stdin io.ReadCloser, stdout io.WriteCloser) error
- Stage gained a new required method Preferences().
Callers that only construct stages via the package's exported
constructors (Command(), CommandStage(), Function(), ...) are
unaffected, but anyone implementing Stage themselves has to update their
implementation.
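For anyone porting a custom stage, the practical difference is that `Start` no longer creates and returns its output reader; it writes into the `stdout` it is handed and closes it when done. The sketch below illustrates only that shape. It deliberately omits `env`, `Preferences()`, and any later additions to the signature, and `upperStage`, `bufCloser`, and `runUpper` are hypothetical names.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
)

// upperStage is a hypothetical stage that upper-cases its input.
type upperStage struct{ done chan error }

// Start follows the new style: the caller supplies stdout, and the stage
// closes both ends when it finishes.
func (s *upperStage) Start(ctx context.Context, stdin io.ReadCloser, stdout io.WriteCloser) error {
	s.done = make(chan error, 1)
	go func() {
		defer stdin.Close()
		data, err := io.ReadAll(stdin)
		if err == nil {
			_, err = stdout.Write(bytes.ToUpper(data))
		}
		if cerr := stdout.Close(); err == nil {
			err = cerr
		}
		s.done <- err
	}()
	return nil
}

// Wait blocks until the stage's goroutine has finished.
func (s *upperStage) Wait() error { return <-s.done }

// bufCloser is an in-memory io.WriteCloser for the demo.
type bufCloser struct{ bytes.Buffer }

func (b *bufCloser) Close() error { return nil }

// runUpper plays the role of the pipeline: it owns both ends.
func runUpper(in string) (string, error) {
	var out bufCloser
	s := &upperStage{}
	stdin := io.NopCloser(strings.NewReader(in))
	if err := s.Start(context.Background(), stdin, &out); err != nil {
		return "", err
	}
	if err := s.Wait(); err != nil {
		return "", err
	}
	return out.String(), nil
}

func main() {
	out, err := runUpper("hello")
	fmt.Println(out, err) // HELLO <nil>
}
```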
Specifying IOPreferenceNil doesn't really mean anything. Nothing uses it, which isn't surprising, because it's not clear what it would even mean anywhere other than at the beginning or end of a pipeline.
PR feedback: nil out lateClosers after use, so that closers can potentially be garbage collected in cases where the stage hangs around for a while.
The internal noop-closer wrappers that go-pipe puts around the caller-owned stdin/stdout hid the underlying object's concrete type from stages. These were supposed to be unwrapped, but this wasn't done consistently (readerWriterToNopCloser), and external Stage implementations had no blessed way to unwrap at all. So, introduce exported UnwrapReader/UnwrapWriter as the sole way for recovering the underlying reader/writer, and delete readerWriterToNopCloser in favor of explicit unwrap. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
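To make the problem concrete, here is a small sketch of how a no-op closer hides the concrete type and how an unwrap helper gets it back. It assumes a wrapper that simply embeds `io.Writer`; the lowercase `unwrapWriter` is a hypothetical stand-in for the exported `UnwrapWriter`, whose real implementation may differ.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// writerNopCloser wraps a caller-owned writer so that Close does nothing.
// Because it embeds io.Writer, only Write is promoted and the concrete
// type of the wrapped value is hidden.
type writerNopCloser struct{ io.Writer }

func (writerNopCloser) Close() error { return nil }

// unwrapWriter returns the wrapped writer, or w itself if it is not a
// wrapper. A nil w falls through and is returned unchanged.
func unwrapWriter(w io.Writer) io.Writer {
	if nc, ok := w.(writerNopCloser); ok {
		return nc.Writer
	}
	return w
}

// isFile reports whether w is directly an *os.File.
func isFile(w io.Writer) bool {
	_, ok := w.(*os.File)
	return ok
}

// demo reports file identity before and after unwrapping.
func demo() (before, after bool) {
	wrapped := writerNopCloser{os.Stdout}
	return isFile(wrapped), isFile(unwrapWriter(wrapped))
}

func main() {
	before, after := demo()
	fmt.Println(before, after) // false true
}
```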
Based on PR feedback, rewrite the use of recover() for function stages so that the code flows in forward order. Inline recoverPanic() to keep the recover in the first stack frame.
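The "first stack frame" constraint comes from the language itself: `recover` only stops a panic when the deferred function calls it directly. The sketch below contrasts the two cases; all names (`asError`, `nestedRecover`, `runNested`, `runDirect`, `escapes`) are hypothetical and not taken from go-pipe.

```go
package main

import "fmt"

// asError is a sample panic handler that converts a panic value to an error.
func asError(p any) error { return fmt.Errorf("stage panicked: %v", p) }

// nestedRecover calls recover one frame too deep: it is called by the
// deferred function, not deferred itself, so recover returns nil.
func nestedRecover() any { return recover() }

// runNested tries to recover through the helper, which does not work.
func runNested(f func()) (err error) {
	defer func() {
		if p := nestedRecover(); p != nil {
			err = asError(p)
		}
	}()
	f()
	return nil
}

// runDirect calls recover directly in the deferred function.
func runDirect(f func(), handler func(any) error) (err error) {
	defer func() {
		if p := recover(); p != nil {
			err = handler(p)
		}
	}()
	f()
	return nil
}

// escapes reports whether a panic raised inside run gets past it.
func escapes(run func(f func()) error) (escaped bool) {
	defer func() {
		if recover() != nil {
			escaped = true
		}
	}()
	_ = run(func() { panic("boom") })
	return false
}

func main() {
	fmt.Println(escapes(runNested)) // true

	direct := func(f func()) error { return runDirect(f, asError) }
	fmt.Println(escapes(direct)) // false

	fmt.Println(runDirect(func() { panic("boom") }, asError)) // stage panicked: boom
}
```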
Of all the different types of pipe.Stage, most don't need a panic handler, because most don't run user functions. Yet we were paying the price of having panic forwarding as part of the interface, which was awkward and error-prone for the rest of the stages to implement cleanly. Instead, we can just pass the panic handler through Start, using a trailing StartOptions struct that carries PanicHandler. The StagePanicHandlerAware interface and its panic.go file are removed (StagePanicHandler moves next to StartOptions in stage.go). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Despite allowing a panic handler to be set, any panic in the user-supplied event handler run by memoryWatchStage in a library-spawned goroutine was not recovered. To cover that gap, pass opts.PanicHandler into monitor() and recover around the watch() call. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Thanks for your patience with my comments ✨ I think that I like the new style of dealing with the panic handler. I have just one question: why do we need a new |
Not at all, thanks for making them!
I considered that briefly, but @mhagger EDIT: I noticed two things reviewing the code of #53:
mhagger left a comment
Some thoughts about the tests related to handling panics.
Per mhagger's review suggestion on #51: rather than injecting a synthetic `watch` closure that panics, make `fakeLimitableStage.GetRSSAnon()` itself panic, and drive the test through the real `MemoryWatch` constructor and monitor goroutine. This exercises the genuine panic-propagation path. Co-authored-by: Michael Haggerty <mhagger@github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Per mhagger's review suggestion on #51: replace the re-exec'd subprocess test (and its child-process scaffolding) with a synchronous assert.PanicsWithValue. A panic in the monitor goroutine can't be observed from the test goroutine, so we drive the watch loop directly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Rebases and modernizes @mhagger's Stage interface redesign (#21) onto current main, bumps the module version to v2, and takes advantage of the bump to make a couple other API changes.
What this is
go-pipe is the library used to wire up process pipelines (`cat foo | sed | filter-fn | writer`): it spawns the subprocesses, connects stdin/stdout between stages, and propagates errors. This PR reworks the core `Stage` interface so that the pipeline, not the stages, owns connecting adjacent stages. That allows for optimizations (like the copy-elimination below) and lets us delete the old synthetic `ioCopier` entirely.
The interface now hands each stage both its stdin and stdout, plus a new `StartOptions`, and asks each stage to declare its I/O preferences so the pipeline can pick the cheapest pipe type between neighbors.
Because this breaks the interface, the module path moves to `/v2`.
Perf optimizations
We want to minimize data copies, whether by letting the kernel move bytes instead of Go, or if we do copy in Go, doing it more efficiently. With the pipeline owning the connections it can:
- pass an `*os.File` destination's fd directly into the child, letting the kernel (`sendfile(2)`/`splice(2)` and friends) do the copy (so-called "zero-copy" I/O);
- draw the copy buffer from a `sync.Pool` rather than letting `exec.Cmd` allocate a fresh one per pipeline.

#49 and #50 were aiming for similar optimizations, but now they fall out of the structure instead of being bolted onto `ioCopier`.
Panic Handling
go-pipe runs user code (function stages, memory-limit event handlers) in goroutines it spawns itself, so a panic there used to be able to take down the whole process. There was already panic handling, but it was part of the Stage interface, which caused a lot of problems with error-prone boilerplate in stages that don't do any panic handling (which is most of them). Panic handling is now a callback passed into Start that the pipeline threads to every stage, so a single handler covers all stages.
Supersedes
- … `Stage` interface to make stdin/stdout handling more flexible #21 (stale, merge conflicts)
- … `Stage2` interface that allows such stages to be started more flexibly #20 (the opt-in `Stage2` variant)
- … `sync.Pool` for `ioCopier` copy buffers): pool preserved in `copy_pool.go`
- … `ioCopier`): sendfile preserved structurally; the `WriterTo` pool-bypass workaround is gone, since we control the copy site now

The `git-systems/pooled-copies` branch (which carried #49 + #50) can be deleted after this merges.
/cc @mhagger @migue @carlosmn
Copilot Summary
Interface change
`Stage.Start` gains `stdout` and a `StartOptions` struct (a struct so future run-scoped options don't break the interface again).
`Pipeline.Start()` uses each stage's `Preferences()` to negotiate the pipe type between adjacent stages:
- `os.Pipe()` when either neighbor is a command (needs a real `*os.File` fd)
- `io.Pipe()` when both neighbors are Go functions (all userspace, cheaper)
- `stdout` passed directly to the last stage, with no synthetic `ioCopier`

Module path bumped to `github.com/github/go-pipe/v2` for the breaking change.
Fast paths preserved from #49/#50
`ioCopier` is deleted; the optimizations it carried now live in the stage structure:
- `commandStage` writing to an `*os.File` destination dups the fd into the child; for a non-`*os.File` writer that implements `io.ReaderFrom`, the copy goes through `ReadFrom` (sendfile where the kernel supports it). Pinned in `pipe/command_stdout_fastpath_test.go`.
- For a non-`*os.File`, non-`ReaderFrom` stdout, `setupPooledStdout` builds an `os.Pipe()` and copies through a `sync.Pool` buffer rather than letting `exec.Cmd` allocate per-pipeline. See `pipe/copy_pool.go`.

Type unwrapping (replaces transparent NopCloser forwarding)
The old `NopCloser` re-exposed `io.WriterTo` by forwarding, so a naive `io.Copy` kept the fast path transparently through the wrapper. That's replaced by explicit, exported helpers:
- `UnwrapReader(io.Reader) io.Reader` / `UnwrapWriter(io.Writer) io.Writer` recover the concrete type go-pipe wrapped around stdin/stdout (nil-safe; non-wrappers pass through unchanged).
- `goStage.Start` and `commandStage.Start` call them internally, so every `pipe.Function` consumer transparently regains `WriterTo`/`ReaderFrom`/`*os.File` identity: a superset of the single method the old forwarding preserved, and it picked up a case `goStage` was previously missing.

Panic handling
`StartOptions.PanicHandler` (`StagePanicHandler = func(p any) error`) is set on the pipeline via `WithStagePanicHandler` and threaded to every stage's `Start`. The previous `StagePanicHandlerAware` opt-in interface is removed; wrapper stages just forward `opts`.
- … `Function` stage `StageFunc`s and memory-limit event handlers (both run in library-spawned goroutines). If `PanicHandler` is nil the panic propagates and crashes; this is intentional, since gitrpcd treats an unhandled pipeline panic as catastrophic.
- … `PanicHandler` rather than crashing. The over-limit kill is guaranteed even if the event handler panics; a panic in a purely-informational handler (RSS-read error / peak usage) is recovered and the stage keeps running unmonitored ("fail open"), since losing monitoring is not, on its own, a reason to stop a healthy stage. This is documented on `MemoryLimit` / `StartOptions.PanicHandler`.

Commit structure
- … `version-2` (authorship preserved): linter shush, `pipeline_test.go` cleanup, pipeline benchmarks, NopCloser simplification, the Stage interface change, pipe-matching tests.
- … `MemoryLimitWithObserver`, restore the Function-stage panic handler, fix `memoryWatchStage.Wait()` to always `stopWatching()`, lint.
- … `*os.File` stdout, avoid leaking the pooled-stdout goroutine when `cmd.Start()` fails.
- … `/v2`, remove `IOPreferenceNil`, handle nil stdin/stdout cleanly, export `Unwrap{Reader,Writer}` and fully unwrap `goStage` stdin, thread the panic handler through `Start` (dropping `StagePanicHandlerAware`), and recover panics escaping the memory-watch goroutine.