fix: prevent loss of step log output at end of step (#1028)

## Problem

Several runner code paths could drop the **tail** of a step's log output, so a
failing (or cancelled) step would show output that is missing its last line(s).
This was observed in practice and traced to four independent issues.

## Root causes & fixes

### 1. Trailing line without a newline was never flushed
`common.lineWriter` buffers output until it sees a `\n`. A final line **without**
a trailing newline (e.g. an error message printed right before a process exits,
a panic, `printf` without `\n`) stayed in the internal buffer and was never
emitted — the writer exposed no flush at all.

- Added `lineWriter.Flush()` (idempotent), a `Flusher` interface, and a
  `FlushWriter(io.Writer)` helper.
- Flush at every stream EOF: the exec copy goroutine, the container `attach()`
  streaming goroutine, and at step end (`useStepLogger`).

### 2. Cancellation/timeout truncated output
`waitForCommand` returned immediately on `ctx.Done()` and abandoned the
output-copy goroutine, losing output the command had already produced. It now
drains with a bounded grace period before returning. The response channel is
buffered so the goroutine can't leak if the drain times out.

### 3. `attach()` raced the final bytes
Container output was streamed in a fire-and-forget goroutine that `wait()` did
not synchronize with, so the step could proceed before the last bytes were
written. `wait()` now blocks on the streaming goroutine (bounded) so output is
fully drained and flushed first.

### 4. `::stop-commands::` silently dropped lines from the step log
Lines between `::stop-commands::<token>` and its end token were echoed without
the `raw_output` field **and** short-circuited the handler chain (`return false`),
so they never reached the step log (non-raw entries aren't appended while a step
is running). Now returns `true` so they are still captured.

Reviewed-on: https://gitea.com/gitea/runner/pulls/1028
Reviewed-by: Zettat123 <39446+zettat123@noreply.gitea.com>
This commit is contained in:
Nicolas
2026-06-14 20:43:19 +00:00
parent 33e6d1d8ff
commit 205af7cd01
7 changed files with 216 additions and 6 deletions

View File

@@ -12,6 +12,13 @@ import (
// LineHandler is a callback function for handling a line
type LineHandler func(line string) bool
// Flusher is implemented by writers that buffer a trailing, not-yet-terminated
// line. Callers should flush once the underlying stream has reached EOF so the
// final line (when it is not newline-terminated) is not lost.
type Flusher interface {
Flush()
}
type lineWriter struct {
buffer bytes.Buffer
handlers []LineHandler
@@ -24,6 +31,14 @@ func NewLineWriter(handlers ...LineHandler) io.Writer {
return w
}
// FlushWriter flushes w if it implements Flusher. It is a no-op otherwise, so
// callers can flush an io.Writer without knowing its concrete type.
func FlushWriter(w io.Writer) {
if f, ok := w.(Flusher); ok {
f.Flush()
}
}
func (lw *lineWriter) Write(p []byte) (n int, err error) {
pBuf := bytes.NewBuffer(p)
written := 0
@@ -44,6 +59,17 @@ func (lw *lineWriter) Write(p []byte) (n int, err error) {
return written, nil
}
// Flush emits any buffered, not-yet-newline-terminated content as a final line.
// It is safe to call multiple times; subsequent calls with an empty buffer are
// no-ops.
func (lw *lineWriter) Flush() {
if lw.buffer.Len() == 0 {
return
}
lw.handleLine(lw.buffer.String())
lw.buffer.Reset()
}
func (lw *lineWriter) handleLine(line string) {
for _, h := range lw.handlers {
ok := h(line)

View File

@@ -5,6 +5,7 @@
package common
import (
"io"
"testing"
"github.com/stretchr/testify/assert"
@@ -39,3 +40,33 @@ func TestLineWriter(t *testing.T) {
assert.Equal(" and another\n", lines[2])
assert.Equal("last line\n", lines[3])
}
func TestLineWriterFlush(t *testing.T) {
lines := make([]string, 0)
lineHandler := func(s string) bool {
lines = append(lines, s)
return true
}
lineWriter := NewLineWriter(lineHandler)
assert := assert.New(t)
_, err := lineWriter.Write([]byte("complete line\npartial line without newline"))
assert.NoError(err) //nolint:testifylint // pre-existing pattern from nektos/act
// Only the newline-terminated line is emitted before flushing.
assert.Equal([]string{"complete line\n"}, lines)
// Flushing emits the buffered, not-yet-terminated trailing line.
FlushWriter(lineWriter)
assert.Equal([]string{"complete line\n", "partial line without newline"}, lines)
// Flushing again is a no-op: nothing is buffered.
FlushWriter(lineWriter)
assert.Len(lines, 2)
}
func TestFlushWriterIgnoresNonFlusher(t *testing.T) {
// FlushWriter must be a safe no-op for writers that do not buffer lines.
assert.NotPanics(t, func() { FlushWriter(io.Discard) })
}

View File

@@ -20,6 +20,7 @@ import (
"slices"
"strconv"
"strings"
"time"
"gitea.com/gitea/runner/act/common"
"gitea.com/gitea/runner/act/filecollector"
@@ -45,6 +46,13 @@ import (
"github.com/spf13/pflag"
)
// drainGracePeriod bounds how long we wait for an output-copy goroutine to
// finish draining a container's output before returning, so that neither a
// cancellation (waitForCommand) nor a normal container exit (wait) truncates
// the tail of the log. It is a safety bound: in the common case the stream
// reaches EOF and the goroutine returns well before this elapses.
const drainGracePeriod = 2 * time.Second
// NewContainer creates a reference to a container
func NewContainer(input *NewContainerInput) ExecutionsEnvironment {
cr := new(containerReference)
@@ -229,6 +237,10 @@ type containerReference struct {
input *NewContainerInput
UID int
GID int
// attachDone is closed by the attach() streaming goroutine once it has
// drained and flushed the container's output. wait() blocks on it so the
// tail of the log lands before the step proceeds.
attachDone chan struct{}
LinuxContainerEnvironmentExtensions
}
@@ -730,7 +742,9 @@ func (cr *containerReference) tryReadGID() common.Executor {
func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal bool, resp client.HijackedResponse, _ client.ExecCreateResult, _, _ string) error {
logger := common.Logger(ctx)
cmdResponse := make(chan error)
// Buffered so the copy goroutine never blocks on send if the grace-period
// drain below times out and no one is left to receive.
cmdResponse := make(chan error, 1)
go func() {
var outWriter io.Writer
@@ -749,6 +763,11 @@ func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal boo
} else {
_, err = io.Copy(outWriter, resp.Reader)
}
// Flush any buffered, not-yet-newline-terminated trailing line so the
// final line of a command's output is not lost (e.g. an error message
// printed without a trailing newline before the process exits).
common.FlushWriter(outWriter)
common.FlushWriter(errWriter)
cmdResponse <- err
}()
@@ -760,6 +779,16 @@ func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal boo
logger.Warnf("Failed to send CTRL+C: %+s", err)
}
// Give the copy goroutine a brief grace period to drain output already
// produced by the command before we return, so cancellation does not
// truncate the tail of the log. The goroutine exits once the hijacked
// stream is closed by resp.Close() in the caller's defer.
select {
case <-cmdResponse:
case <-time.After(drainGracePeriod):
logger.Warn("Timed out draining command output after cancellation")
}
// we return the context canceled error to prevent other steps
// from executing
return ctx.Err()
@@ -945,14 +974,23 @@ func (cr *containerReference) attach() common.Executor {
if errWriter == nil {
errWriter = os.Stderr
}
done := make(chan struct{})
cr.attachDone = done
go func() {
defer close(done)
var copyErr error
if !isTerminal || os.Getenv("NORAW") != "" {
_, err = stdcopy.StdCopy(outWriter, errWriter, out.Reader)
_, copyErr = stdcopy.StdCopy(outWriter, errWriter, out.Reader)
} else {
_, err = io.Copy(outWriter, out.Reader)
_, copyErr = io.Copy(outWriter, out.Reader)
}
if err != nil {
common.Logger(ctx).Error(err)
// Flush any buffered, not-yet-newline-terminated trailing line once
// the stream reaches EOF, so the final line of the container's
// output is not lost when it is not newline-terminated.
common.FlushWriter(outWriter)
common.FlushWriter(errWriter)
if copyErr != nil {
common.Logger(ctx).Error(copyErr)
}
}()
return nil
@@ -991,6 +1029,18 @@ func (cr *containerReference) wait() common.Executor {
logger.Debugf("Return status: %v", statusCode)
// The container has exited; wait for the attach() streaming goroutine to
// finish draining and flushing its output before returning, so the tail
// of the log is not lost. Bounded so a stuck stream cannot hang the step.
if cr.attachDone != nil {
select {
case <-cr.attachDone:
case <-time.After(drainGracePeriod):
logger.Warn("Timed out draining container output")
}
cr.attachDone = nil
}
if statusCode == 0 {
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"bufio"
"bytes"
"context"
"encoding/binary"
"errors"
"io"
"net"
@@ -20,6 +21,7 @@ import (
"gitea.com/gitea/runner/act/common"
cerrdefs "github.com/containerd/errdefs"
"github.com/moby/moby/api/pkg/stdcopy"
"github.com/moby/moby/api/types/container"
mobyclient "github.com/moby/moby/client"
"github.com/sirupsen/logrus/hooks/test"
@@ -89,6 +91,11 @@ func (m *mockDockerClient) ExecInspect(ctx context.Context, execID string, opts
return args.Get(0).(mobyclient.ExecInspectResult), args.Error(1)
}
func (m *mockDockerClient) ContainerAttach(ctx context.Context, containerID string, opts mobyclient.ContainerAttachOptions) (mobyclient.ContainerAttachResult, error) {
args := m.Called(ctx, containerID, opts)
return args.Get(0).(mobyclient.ContainerAttachResult), args.Error(1)
}
func (m *mockDockerClient) ContainerWait(ctx context.Context, containerID string, opts mobyclient.ContainerWaitOptions) mobyclient.ContainerWaitResult {
args := m.Called(ctx, containerID, opts)
return args.Get(0).(mobyclient.ContainerWaitResult)
@@ -206,6 +213,71 @@ func TestDockerExecFailure(t *testing.T) {
client.AssertExpectations(t)
}
// stdcopyFrame wraps payload in a single Docker multiplexed-stream frame, the
// format StdCopy expects: an 8-byte header (stream type + 4-byte big-endian
// length) followed by the payload.
func stdcopyFrame(stream stdcopy.StdType, payload string) []byte {
b := make([]byte, 8+len(payload))
b[0] = byte(stream)
binary.BigEndian.PutUint32(b[4:8], uint32(len(payload)))
copy(b[8:], payload)
return b
}
// TestDockerAttachFlushesTrailingLine verifies that wait() blocks until the
// attach() streaming goroutine has drained and flushed the container's output,
// so a final line without a trailing newline is not lost.
func TestDockerAttachFlushesTrailingLine(t *testing.T) {
ctx := context.Background()
framed := bytes.NewBuffer(stdcopyFrame(stdcopy.Stdout, "line one\nlast line without newline"))
var lines []string
logWriter := common.NewLineWriter(func(s string) bool {
lines = append(lines, s)
return true
})
client := &mockDockerClient{}
client.On("ContainerAttach", ctx, "123", mock.AnythingOfType("client.ContainerAttachOptions")).
Return(mobyclient.ContainerAttachResult{
HijackedResponse: mobyclient.HijackedResponse{
Conn: &mockConn{},
Reader: bufio.NewReader(framed),
},
}, nil)
statusCh := make(chan container.WaitResponse, 1)
statusCh <- container.WaitResponse{StatusCode: 0}
errCh := make(chan error, 1)
client.On("ContainerWait", ctx, "123", mobyclient.ContainerWaitOptions{Condition: container.WaitConditionNotRunning}).
Return(mobyclient.ContainerWaitResult{
Result: (<-chan container.WaitResponse)(statusCh),
Error: (<-chan error)(errCh),
})
cr := &containerReference{
id: "123",
cli: client,
input: &NewContainerInput{
Image: "image",
Stdout: logWriter,
Stderr: logWriter,
},
}
require.NoError(t, cr.attach()(ctx))
require.NoError(t, cr.wait()(ctx))
// wait() must have blocked until the goroutine drained AND flushed; the
// trailing, non-newline-terminated line must therefore be present. Reading
// lines here is race-free because wait() synchronizes on attachDone, which
// the goroutine closes after the final append.
assert.Equal(t, []string{"line one\n", "last line without newline"}, lines)
client.AssertExpectations(t)
}
func TestDockerWaitFailure(t *testing.T) {
ctx := context.Background()

View File

@@ -48,8 +48,11 @@ func (rc *RunContext) commandHandler(ctx context.Context) common.LineHandler {
if resumeCommand != "" && command != resumeCommand {
// There should not be any emojis in the log output for Gitea.
// The code in the switch statement is the same.
// Return true (not false) so the line still reaches the raw_output
// log handler; otherwise everything between ::stop-commands:: and
// its end token is silently dropped from the step log.
logger.Infof("%s", line)
return false
return true
}
arg = UnescapeCommandData(arg)
kvPairs = unescapeKvPairs(kvPairs)

View File

@@ -28,6 +28,29 @@ func TestSetEnv(t *testing.T) {
a.Equal("valz", rc.Env["x"])
}
func TestStopCommandsKeepsSuppressedLinesInLog(t *testing.T) {
a := assert.New(t)
ctx := context.Background()
rc := new(RunContext)
handler := rc.commandHandler(ctx)
// Stop command processing until the matching end token is seen.
a.True(handler("::stop-commands::my-end-token\n"))
// A command-shaped line while stopped must not be executed (env unchanged),
// but must still return true so it reaches the raw_output log handler and is
// not dropped from the step log.
a.True(handler("::set-env name=x::valz\n"))
a.NotContains(rc.Env, "x")
// The matching end token resumes command processing.
a.True(handler("::my-end-token::\n"))
// Commands are processed again after resuming.
a.True(handler("::set-env name=y::valy\n"))
a.Equal("valy", rc.Env["y"])
}
func TestSetOutput(t *testing.T) {
a := assert.New(t)
ctx := context.Background()

View File

@@ -462,6 +462,11 @@ func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, execu
oldout, olderr := rc.JobContainer.ReplaceLogWriter(logWriter, logWriter)
defer rc.JobContainer.ReplaceLogWriter(oldout, olderr)
// Flush any buffered, not-yet-newline-terminated trailing line once the
// step has finished, so the final line of the step's output is not lost
// when it is not newline-terminated.
defer common.FlushWriter(logWriter)
return executor(ctx)
}
}