From 205af7cd0118c60587848d89b2569498093da94a Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 14 Jun 2026 20:43:19 +0000 Subject: [PATCH] fix: prevent loss of step log output at end of step (#1028) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem Several runner code paths could drop the **tail** of a step's log output, so a failing (or cancelled) step would show output that is missing its last line(s). This was observed in practice and traced to four independent issues. ## Root causes & fixes ### 1. Trailing line without a newline was never flushed `common.lineWriter` buffers output until it sees a `\n`. A final line **without** a trailing newline (e.g. an error message printed right before a process exits, a panic, `printf` without `\n`) stayed in the internal buffer and was never emitted — the writer exposed no flush at all. - Added `lineWriter.Flush()` (idempotent), a `Flusher` interface, and a `FlushWriter(io.Writer)` helper. - Flush at every stream EOF: the exec copy goroutine, the container `attach()` streaming goroutine, and at step end (`useStepLogger`). ### 2. Cancellation/timeout truncated output `waitForCommand` returned immediately on `ctx.Done()` and abandoned the output-copy goroutine, losing output the command had already produced. It now drains with a bounded grace period before returning. The response channel is buffered so the goroutine can't leak if the drain times out. ### 3. `attach()` raced the final bytes Container output was streamed in a fire-and-forget goroutine that `wait()` did not synchronize with, so the step could proceed before the last bytes were written. `wait()` now blocks on the streaming goroutine (bounded) so output is fully drained and flushed first. ### 4. `::stop-commands::` silently dropped lines from the step log Lines between `::stop-commands::` and its end token were echoed without the `raw_output` field **and** short-circuited the handler chain (`return false`), so they never reached the step log (non-raw entries aren't appended while a step is running). Now returns `true` so they are still captured. Reviewed-on: https://gitea.com/gitea/runner/pulls/1028 Reviewed-by: Zettat123 <39446+zettat123@noreply.gitea.com> --- act/common/line_writer.go | 26 ++++++++++++ act/common/line_writer_test.go | 31 ++++++++++++++ act/container/docker_run.go | 60 +++++++++++++++++++++++--- act/container/docker_run_test.go | 72 ++++++++++++++++++++++++++++++++ act/runner/command.go | 5 ++- act/runner/command_test.go | 23 ++++++++++ act/runner/job_executor.go | 5 +++ 7 files changed, 216 insertions(+), 6 deletions(-) diff --git a/act/common/line_writer.go b/act/common/line_writer.go index c0ff4733..7bdd3f1f 100644 --- a/act/common/line_writer.go +++ b/act/common/line_writer.go @@ -12,6 +12,13 @@ import ( // LineHandler is a callback function for handling a line type LineHandler func(line string) bool +// Flusher is implemented by writers that buffer a trailing, not-yet-terminated +// line. Callers should flush once the underlying stream has reached EOF so the +// final line (when it is not newline-terminated) is not lost. +type Flusher interface { + Flush() +} + type lineWriter struct { buffer bytes.Buffer handlers []LineHandler @@ -24,6 +31,14 @@ func NewLineWriter(handlers ...LineHandler) io.Writer { return w } +// FlushWriter flushes w if it implements Flusher. It is a no-op otherwise, so +// callers can flush an io.Writer without knowing its concrete type. +func FlushWriter(w io.Writer) { + if f, ok := w.(Flusher); ok { + f.Flush() + } +} + func (lw *lineWriter) Write(p []byte) (n int, err error) { pBuf := bytes.NewBuffer(p) written := 0 @@ -44,6 +59,17 @@ func (lw *lineWriter) Write(p []byte) (n int, err error) { return written, nil } +// Flush emits any buffered, not-yet-newline-terminated content as a final line. +// It is safe to call multiple times; subsequent calls with an empty buffer are +// no-ops. +func (lw *lineWriter) Flush() { + if lw.buffer.Len() == 0 { + return + } + lw.handleLine(lw.buffer.String()) + lw.buffer.Reset() +} + func (lw *lineWriter) handleLine(line string) { for _, h := range lw.handlers { ok := h(line) diff --git a/act/common/line_writer_test.go b/act/common/line_writer_test.go index 546beb9e..f58d319d 100644 --- a/act/common/line_writer_test.go +++ b/act/common/line_writer_test.go @@ -5,6 +5,7 @@ package common import ( + "io" "testing" "github.com/stretchr/testify/assert" @@ -39,3 +40,33 @@ func TestLineWriter(t *testing.T) { assert.Equal(" and another\n", lines[2]) assert.Equal("last line\n", lines[3]) } + +func TestLineWriterFlush(t *testing.T) { + lines := make([]string, 0) + lineHandler := func(s string) bool { + lines = append(lines, s) + return true + } + + lineWriter := NewLineWriter(lineHandler) + + assert := assert.New(t) + _, err := lineWriter.Write([]byte("complete line\npartial line without newline")) + assert.NoError(err) //nolint:testifylint // pre-existing pattern from nektos/act + + // Only the newline-terminated line is emitted before flushing. + assert.Equal([]string{"complete line\n"}, lines) + + // Flushing emits the buffered, not-yet-terminated trailing line. + FlushWriter(lineWriter) + assert.Equal([]string{"complete line\n", "partial line without newline"}, lines) + + // Flushing again is a no-op: nothing is buffered. + FlushWriter(lineWriter) + assert.Len(lines, 2) +} + +func TestFlushWriterIgnoresNonFlusher(t *testing.T) { + // FlushWriter must be a safe no-op for writers that do not buffer lines. + assert.NotPanics(t, func() { FlushWriter(io.Discard) }) +} diff --git a/act/container/docker_run.go b/act/container/docker_run.go index 15a62479..87323cfd 100644 --- a/act/container/docker_run.go +++ b/act/container/docker_run.go @@ -20,6 +20,7 @@ import ( "slices" "strconv" "strings" + "time" "gitea.com/gitea/runner/act/common" "gitea.com/gitea/runner/act/filecollector" @@ -45,6 +46,13 @@ import ( "github.com/spf13/pflag" ) +// drainGracePeriod bounds how long we wait for an output-copy goroutine to +// finish draining a container's output before returning, so that neither a +// cancellation (waitForCommand) nor a normal container exit (wait) truncates +// the tail of the log. It is a safety bound: in the common case the stream +// reaches EOF and the goroutine returns well before this elapses. +const drainGracePeriod = 2 * time.Second + // NewContainer creates a reference to a container func NewContainer(input *NewContainerInput) ExecutionsEnvironment { cr := new(containerReference) @@ -229,6 +237,10 @@ type containerReference struct { input *NewContainerInput UID int GID int + // attachDone is closed by the attach() streaming goroutine once it has + // drained and flushed the container's output. wait() blocks on it so the + // tail of the log lands before the step proceeds. + attachDone chan struct{} LinuxContainerEnvironmentExtensions } @@ -730,7 +742,9 @@ func (cr *containerReference) tryReadGID() common.Executor { func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal bool, resp client.HijackedResponse, _ client.ExecCreateResult, _, _ string) error { logger := common.Logger(ctx) - cmdResponse := make(chan error) + // Buffered so the copy goroutine never blocks on send if the grace-period + // drain below times out and no one is left to receive. + cmdResponse := make(chan error, 1) go func() { var outWriter io.Writer @@ -749,6 +763,11 @@ func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal boo } else { _, err = io.Copy(outWriter, resp.Reader) } + // Flush any buffered, not-yet-newline-terminated trailing line so the + // final line of a command's output is not lost (e.g. an error message + // printed without a trailing newline before the process exits). + common.FlushWriter(outWriter) + common.FlushWriter(errWriter) cmdResponse <- err }() @@ -760,6 +779,16 @@ func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal boo logger.Warnf("Failed to send CTRL+C: %+s", err) } + // Give the copy goroutine a brief grace period to drain output already + // produced by the command before we return, so cancellation does not + // truncate the tail of the log. The goroutine exits once the hijacked + // stream is closed by resp.Close() in the caller's defer. + select { + case <-cmdResponse: + case <-time.After(drainGracePeriod): + logger.Warn("Timed out draining command output after cancellation") + } + // we return the context canceled error to prevent other steps // from executing return ctx.Err() @@ -945,14 +974,23 @@ func (cr *containerReference) attach() common.Executor { if errWriter == nil { errWriter = os.Stderr } + done := make(chan struct{}) + cr.attachDone = done go func() { + defer close(done) + var copyErr error if !isTerminal || os.Getenv("NORAW") != "" { - _, err = stdcopy.StdCopy(outWriter, errWriter, out.Reader) + _, copyErr = stdcopy.StdCopy(outWriter, errWriter, out.Reader) } else { - _, err = io.Copy(outWriter, out.Reader) + _, copyErr = io.Copy(outWriter, out.Reader) } - if err != nil { - common.Logger(ctx).Error(err) + // Flush any buffered, not-yet-newline-terminated trailing line once + // the stream reaches EOF, so the final line of the container's + // output is not lost when it is not newline-terminated. + common.FlushWriter(outWriter) + common.FlushWriter(errWriter) + if copyErr != nil { + common.Logger(ctx).Error(copyErr) } }() return nil @@ -991,6 +1029,18 @@ func (cr *containerReference) wait() common.Executor { logger.Debugf("Return status: %v", statusCode) + // The container has exited; wait for the attach() streaming goroutine to + // finish draining and flushing its output before returning, so the tail + // of the log is not lost. Bounded so a stuck stream cannot hang the step. + if cr.attachDone != nil { + select { + case <-cr.attachDone: + case <-time.After(drainGracePeriod): + logger.Warn("Timed out draining container output") + } + cr.attachDone = nil + } + if statusCode == 0 { return nil } diff --git a/act/container/docker_run_test.go b/act/container/docker_run_test.go index 7f2c5226..dc51278e 100644 --- a/act/container/docker_run_test.go +++ b/act/container/docker_run_test.go @@ -8,6 +8,7 @@ import ( "bufio" "bytes" "context" + "encoding/binary" "errors" "io" "net" @@ -20,6 +21,7 @@ import ( "gitea.com/gitea/runner/act/common" cerrdefs "github.com/containerd/errdefs" + "github.com/moby/moby/api/pkg/stdcopy" "github.com/moby/moby/api/types/container" mobyclient "github.com/moby/moby/client" "github.com/sirupsen/logrus/hooks/test" @@ -89,6 +91,11 @@ func (m *mockDockerClient) ExecInspect(ctx context.Context, execID string, opts return args.Get(0).(mobyclient.ExecInspectResult), args.Error(1) } +func (m *mockDockerClient) ContainerAttach(ctx context.Context, containerID string, opts mobyclient.ContainerAttachOptions) (mobyclient.ContainerAttachResult, error) { + args := m.Called(ctx, containerID, opts) + return args.Get(0).(mobyclient.ContainerAttachResult), args.Error(1) +} + func (m *mockDockerClient) ContainerWait(ctx context.Context, containerID string, opts mobyclient.ContainerWaitOptions) mobyclient.ContainerWaitResult { args := m.Called(ctx, containerID, opts) return args.Get(0).(mobyclient.ContainerWaitResult) @@ -206,6 +213,71 @@ func TestDockerExecFailure(t *testing.T) { client.AssertExpectations(t) } +// stdcopyFrame wraps payload in a single Docker multiplexed-stream frame, the +// format StdCopy expects: an 8-byte header (stream type + 4-byte big-endian +// length) followed by the payload. +func stdcopyFrame(stream stdcopy.StdType, payload string) []byte { + b := make([]byte, 8+len(payload)) + b[0] = byte(stream) + binary.BigEndian.PutUint32(b[4:8], uint32(len(payload))) + copy(b[8:], payload) + return b +} + +// TestDockerAttachFlushesTrailingLine verifies that wait() blocks until the +// attach() streaming goroutine has drained and flushed the container's output, +// so a final line without a trailing newline is not lost. +func TestDockerAttachFlushesTrailingLine(t *testing.T) { + ctx := context.Background() + + framed := bytes.NewBuffer(stdcopyFrame(stdcopy.Stdout, "line one\nlast line without newline")) + + var lines []string + logWriter := common.NewLineWriter(func(s string) bool { + lines = append(lines, s) + return true + }) + + client := &mockDockerClient{} + client.On("ContainerAttach", ctx, "123", mock.AnythingOfType("client.ContainerAttachOptions")). + Return(mobyclient.ContainerAttachResult{ + HijackedResponse: mobyclient.HijackedResponse{ + Conn: &mockConn{}, + Reader: bufio.NewReader(framed), + }, + }, nil) + + statusCh := make(chan container.WaitResponse, 1) + statusCh <- container.WaitResponse{StatusCode: 0} + errCh := make(chan error, 1) + client.On("ContainerWait", ctx, "123", mobyclient.ContainerWaitOptions{Condition: container.WaitConditionNotRunning}). + Return(mobyclient.ContainerWaitResult{ + Result: (<-chan container.WaitResponse)(statusCh), + Error: (<-chan error)(errCh), + }) + + cr := &containerReference{ + id: "123", + cli: client, + input: &NewContainerInput{ + Image: "image", + Stdout: logWriter, + Stderr: logWriter, + }, + } + + require.NoError(t, cr.attach()(ctx)) + require.NoError(t, cr.wait()(ctx)) + + // wait() must have blocked until the goroutine drained AND flushed; the + // trailing, non-newline-terminated line must therefore be present. Reading + // lines here is race-free because wait() synchronizes on attachDone, which + // the goroutine closes after the final append. + assert.Equal(t, []string{"line one\n", "last line without newline"}, lines) + + client.AssertExpectations(t) +} + func TestDockerWaitFailure(t *testing.T) { ctx := context.Background() diff --git a/act/runner/command.go b/act/runner/command.go index 5ac20e95..3f84480e 100644 --- a/act/runner/command.go +++ b/act/runner/command.go @@ -48,8 +48,11 @@ func (rc *RunContext) commandHandler(ctx context.Context) common.LineHandler { if resumeCommand != "" && command != resumeCommand { // There should not be any emojis in the log output for Gitea. // The code in the switch statement is the same. + // Return true (not false) so the line still reaches the raw_output + // log handler; otherwise everything between ::stop-commands:: and + // its end token is silently dropped from the step log. logger.Infof("%s", line) - return false + return true } arg = UnescapeCommandData(arg) kvPairs = unescapeKvPairs(kvPairs) diff --git a/act/runner/command_test.go b/act/runner/command_test.go index 89105efb..c10b6f15 100644 --- a/act/runner/command_test.go +++ b/act/runner/command_test.go @@ -28,6 +28,29 @@ func TestSetEnv(t *testing.T) { a.Equal("valz", rc.Env["x"]) } +func TestStopCommandsKeepsSuppressedLinesInLog(t *testing.T) { + a := assert.New(t) + ctx := context.Background() + rc := new(RunContext) + handler := rc.commandHandler(ctx) + + // Stop command processing until the matching end token is seen. + a.True(handler("::stop-commands::my-end-token\n")) + + // A command-shaped line while stopped must not be executed (env unchanged), + // but must still return true so it reaches the raw_output log handler and is + // not dropped from the step log. + a.True(handler("::set-env name=x::valz\n")) + a.NotContains(rc.Env, "x") + + // The matching end token resumes command processing. + a.True(handler("::my-end-token::\n")) + + // Commands are processed again after resuming. + a.True(handler("::set-env name=y::valy\n")) + a.Equal("valy", rc.Env["y"]) +} + func TestSetOutput(t *testing.T) { a := assert.New(t) ctx := context.Background() diff --git a/act/runner/job_executor.go b/act/runner/job_executor.go index 7cc5fa9d..b8f7ccfd 100644 --- a/act/runner/job_executor.go +++ b/act/runner/job_executor.go @@ -462,6 +462,11 @@ func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, execu oldout, olderr := rc.JobContainer.ReplaceLogWriter(logWriter, logWriter) defer rc.JobContainer.ReplaceLogWriter(oldout, olderr) + // Flush any buffered, not-yet-newline-terminated trailing line once the + // step has finished, so the final line of the step's output is not lost + // when it is not newline-terminated. + defer common.FlushWriter(logWriter) + return executor(ctx) } }