mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-06-21 17:24:23 +02:00
Compare commits
2 Commits
33e6d1d8ff
...
3996d6d032
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3996d6d032 | ||
|
|
205af7cd01 |
@@ -12,6 +12,13 @@ import (
|
|||||||
// LineHandler is a callback function for handling a line
|
// LineHandler is a callback function for handling a line
|
||||||
type LineHandler func(line string) bool
|
type LineHandler func(line string) bool
|
||||||
|
|
||||||
|
// Flusher is implemented by writers that buffer a trailing, not-yet-terminated
|
||||||
|
// line. Callers should flush once the underlying stream has reached EOF so the
|
||||||
|
// final line (when it is not newline-terminated) is not lost.
|
||||||
|
type Flusher interface {
|
||||||
|
Flush()
|
||||||
|
}
|
||||||
|
|
||||||
type lineWriter struct {
|
type lineWriter struct {
|
||||||
buffer bytes.Buffer
|
buffer bytes.Buffer
|
||||||
handlers []LineHandler
|
handlers []LineHandler
|
||||||
@@ -24,6 +31,14 @@ func NewLineWriter(handlers ...LineHandler) io.Writer {
|
|||||||
return w
|
return w
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FlushWriter flushes w if it implements Flusher. It is a no-op otherwise, so
|
||||||
|
// callers can flush an io.Writer without knowing its concrete type.
|
||||||
|
func FlushWriter(w io.Writer) {
|
||||||
|
if f, ok := w.(Flusher); ok {
|
||||||
|
f.Flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (lw *lineWriter) Write(p []byte) (n int, err error) {
|
func (lw *lineWriter) Write(p []byte) (n int, err error) {
|
||||||
pBuf := bytes.NewBuffer(p)
|
pBuf := bytes.NewBuffer(p)
|
||||||
written := 0
|
written := 0
|
||||||
@@ -44,6 +59,17 @@ func (lw *lineWriter) Write(p []byte) (n int, err error) {
|
|||||||
return written, nil
|
return written, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Flush emits any buffered, not-yet-newline-terminated content as a final line.
|
||||||
|
// It is safe to call multiple times; subsequent calls with an empty buffer are
|
||||||
|
// no-ops.
|
||||||
|
func (lw *lineWriter) Flush() {
|
||||||
|
if lw.buffer.Len() == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
lw.handleLine(lw.buffer.String())
|
||||||
|
lw.buffer.Reset()
|
||||||
|
}
|
||||||
|
|
||||||
func (lw *lineWriter) handleLine(line string) {
|
func (lw *lineWriter) handleLine(line string) {
|
||||||
for _, h := range lw.handlers {
|
for _, h := range lw.handlers {
|
||||||
ok := h(line)
|
ok := h(line)
|
||||||
|
|||||||
@@ -5,6 +5,7 @@
|
|||||||
package common
|
package common
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@@ -39,3 +40,33 @@ func TestLineWriter(t *testing.T) {
|
|||||||
assert.Equal(" and another\n", lines[2])
|
assert.Equal(" and another\n", lines[2])
|
||||||
assert.Equal("last line\n", lines[3])
|
assert.Equal("last line\n", lines[3])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLineWriterFlush(t *testing.T) {
|
||||||
|
lines := make([]string, 0)
|
||||||
|
lineHandler := func(s string) bool {
|
||||||
|
lines = append(lines, s)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
lineWriter := NewLineWriter(lineHandler)
|
||||||
|
|
||||||
|
assert := assert.New(t)
|
||||||
|
_, err := lineWriter.Write([]byte("complete line\npartial line without newline"))
|
||||||
|
assert.NoError(err) //nolint:testifylint // pre-existing pattern from nektos/act
|
||||||
|
|
||||||
|
// Only the newline-terminated line is emitted before flushing.
|
||||||
|
assert.Equal([]string{"complete line\n"}, lines)
|
||||||
|
|
||||||
|
// Flushing emits the buffered, not-yet-terminated trailing line.
|
||||||
|
FlushWriter(lineWriter)
|
||||||
|
assert.Equal([]string{"complete line\n", "partial line without newline"}, lines)
|
||||||
|
|
||||||
|
// Flushing again is a no-op: nothing is buffered.
|
||||||
|
FlushWriter(lineWriter)
|
||||||
|
assert.Len(lines, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFlushWriterIgnoresNonFlusher(t *testing.T) {
|
||||||
|
// FlushWriter must be a safe no-op for writers that do not buffer lines.
|
||||||
|
assert.NotPanics(t, func() { FlushWriter(io.Discard) })
|
||||||
|
}
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ import (
|
|||||||
"slices"
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"gitea.com/gitea/runner/act/common"
|
"gitea.com/gitea/runner/act/common"
|
||||||
"gitea.com/gitea/runner/act/filecollector"
|
"gitea.com/gitea/runner/act/filecollector"
|
||||||
@@ -45,6 +46,13 @@ import (
|
|||||||
"github.com/spf13/pflag"
|
"github.com/spf13/pflag"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// drainGracePeriod bounds how long we wait for an output-copy goroutine to
|
||||||
|
// finish draining a container's output before returning, so that neither a
|
||||||
|
// cancellation (waitForCommand) nor a normal container exit (wait) truncates
|
||||||
|
// the tail of the log. It is a safety bound: in the common case the stream
|
||||||
|
// reaches EOF and the goroutine returns well before this elapses.
|
||||||
|
const drainGracePeriod = 2 * time.Second
|
||||||
|
|
||||||
// NewContainer creates a reference to a container
|
// NewContainer creates a reference to a container
|
||||||
func NewContainer(input *NewContainerInput) ExecutionsEnvironment {
|
func NewContainer(input *NewContainerInput) ExecutionsEnvironment {
|
||||||
cr := new(containerReference)
|
cr := new(containerReference)
|
||||||
@@ -229,6 +237,10 @@ type containerReference struct {
|
|||||||
input *NewContainerInput
|
input *NewContainerInput
|
||||||
UID int
|
UID int
|
||||||
GID int
|
GID int
|
||||||
|
// attachDone is closed by the attach() streaming goroutine once it has
|
||||||
|
// drained and flushed the container's output. wait() blocks on it so the
|
||||||
|
// tail of the log lands before the step proceeds.
|
||||||
|
attachDone chan struct{}
|
||||||
LinuxContainerEnvironmentExtensions
|
LinuxContainerEnvironmentExtensions
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -730,7 +742,9 @@ func (cr *containerReference) tryReadGID() common.Executor {
|
|||||||
func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal bool, resp client.HijackedResponse, _ client.ExecCreateResult, _, _ string) error {
|
func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal bool, resp client.HijackedResponse, _ client.ExecCreateResult, _, _ string) error {
|
||||||
logger := common.Logger(ctx)
|
logger := common.Logger(ctx)
|
||||||
|
|
||||||
cmdResponse := make(chan error)
|
// Buffered so the copy goroutine never blocks on send if the grace-period
|
||||||
|
// drain below times out and no one is left to receive.
|
||||||
|
cmdResponse := make(chan error, 1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
var outWriter io.Writer
|
var outWriter io.Writer
|
||||||
@@ -749,6 +763,11 @@ func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal boo
|
|||||||
} else {
|
} else {
|
||||||
_, err = io.Copy(outWriter, resp.Reader)
|
_, err = io.Copy(outWriter, resp.Reader)
|
||||||
}
|
}
|
||||||
|
// Flush any buffered, not-yet-newline-terminated trailing line so the
|
||||||
|
// final line of a command's output is not lost (e.g. an error message
|
||||||
|
// printed without a trailing newline before the process exits).
|
||||||
|
common.FlushWriter(outWriter)
|
||||||
|
common.FlushWriter(errWriter)
|
||||||
cmdResponse <- err
|
cmdResponse <- err
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@@ -760,6 +779,16 @@ func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal boo
|
|||||||
logger.Warnf("Failed to send CTRL+C: %+s", err)
|
logger.Warnf("Failed to send CTRL+C: %+s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Give the copy goroutine a brief grace period to drain output already
|
||||||
|
// produced by the command before we return, so cancellation does not
|
||||||
|
// truncate the tail of the log. The goroutine exits once the hijacked
|
||||||
|
// stream is closed by resp.Close() in the caller's defer.
|
||||||
|
select {
|
||||||
|
case <-cmdResponse:
|
||||||
|
case <-time.After(drainGracePeriod):
|
||||||
|
logger.Warn("Timed out draining command output after cancellation")
|
||||||
|
}
|
||||||
|
|
||||||
// we return the context canceled error to prevent other steps
|
// we return the context canceled error to prevent other steps
|
||||||
// from executing
|
// from executing
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
@@ -945,14 +974,23 @@ func (cr *containerReference) attach() common.Executor {
|
|||||||
if errWriter == nil {
|
if errWriter == nil {
|
||||||
errWriter = os.Stderr
|
errWriter = os.Stderr
|
||||||
}
|
}
|
||||||
|
done := make(chan struct{})
|
||||||
|
cr.attachDone = done
|
||||||
go func() {
|
go func() {
|
||||||
|
defer close(done)
|
||||||
|
var copyErr error
|
||||||
if !isTerminal || os.Getenv("NORAW") != "" {
|
if !isTerminal || os.Getenv("NORAW") != "" {
|
||||||
_, err = stdcopy.StdCopy(outWriter, errWriter, out.Reader)
|
_, copyErr = stdcopy.StdCopy(outWriter, errWriter, out.Reader)
|
||||||
} else {
|
} else {
|
||||||
_, err = io.Copy(outWriter, out.Reader)
|
_, copyErr = io.Copy(outWriter, out.Reader)
|
||||||
}
|
}
|
||||||
if err != nil {
|
// Flush any buffered, not-yet-newline-terminated trailing line once
|
||||||
common.Logger(ctx).Error(err)
|
// the stream reaches EOF, so the final line of the container's
|
||||||
|
// output is not lost when it is not newline-terminated.
|
||||||
|
common.FlushWriter(outWriter)
|
||||||
|
common.FlushWriter(errWriter)
|
||||||
|
if copyErr != nil {
|
||||||
|
common.Logger(ctx).Error(copyErr)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return nil
|
return nil
|
||||||
@@ -991,6 +1029,18 @@ func (cr *containerReference) wait() common.Executor {
|
|||||||
|
|
||||||
logger.Debugf("Return status: %v", statusCode)
|
logger.Debugf("Return status: %v", statusCode)
|
||||||
|
|
||||||
|
// The container has exited; wait for the attach() streaming goroutine to
|
||||||
|
// finish draining and flushing its output before returning, so the tail
|
||||||
|
// of the log is not lost. Bounded so a stuck stream cannot hang the step.
|
||||||
|
if cr.attachDone != nil {
|
||||||
|
select {
|
||||||
|
case <-cr.attachDone:
|
||||||
|
case <-time.After(drainGracePeriod):
|
||||||
|
logger.Warn("Timed out draining container output")
|
||||||
|
}
|
||||||
|
cr.attachDone = nil
|
||||||
|
}
|
||||||
|
|
||||||
if statusCode == 0 {
|
if statusCode == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
@@ -20,6 +21,7 @@ import (
|
|||||||
"gitea.com/gitea/runner/act/common"
|
"gitea.com/gitea/runner/act/common"
|
||||||
|
|
||||||
cerrdefs "github.com/containerd/errdefs"
|
cerrdefs "github.com/containerd/errdefs"
|
||||||
|
"github.com/moby/moby/api/pkg/stdcopy"
|
||||||
"github.com/moby/moby/api/types/container"
|
"github.com/moby/moby/api/types/container"
|
||||||
mobyclient "github.com/moby/moby/client"
|
mobyclient "github.com/moby/moby/client"
|
||||||
"github.com/sirupsen/logrus/hooks/test"
|
"github.com/sirupsen/logrus/hooks/test"
|
||||||
@@ -89,6 +91,11 @@ func (m *mockDockerClient) ExecInspect(ctx context.Context, execID string, opts
|
|||||||
return args.Get(0).(mobyclient.ExecInspectResult), args.Error(1)
|
return args.Get(0).(mobyclient.ExecInspectResult), args.Error(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockDockerClient) ContainerAttach(ctx context.Context, containerID string, opts mobyclient.ContainerAttachOptions) (mobyclient.ContainerAttachResult, error) {
|
||||||
|
args := m.Called(ctx, containerID, opts)
|
||||||
|
return args.Get(0).(mobyclient.ContainerAttachResult), args.Error(1)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockDockerClient) ContainerWait(ctx context.Context, containerID string, opts mobyclient.ContainerWaitOptions) mobyclient.ContainerWaitResult {
|
func (m *mockDockerClient) ContainerWait(ctx context.Context, containerID string, opts mobyclient.ContainerWaitOptions) mobyclient.ContainerWaitResult {
|
||||||
args := m.Called(ctx, containerID, opts)
|
args := m.Called(ctx, containerID, opts)
|
||||||
return args.Get(0).(mobyclient.ContainerWaitResult)
|
return args.Get(0).(mobyclient.ContainerWaitResult)
|
||||||
@@ -206,6 +213,71 @@ func TestDockerExecFailure(t *testing.T) {
|
|||||||
client.AssertExpectations(t)
|
client.AssertExpectations(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stdcopyFrame wraps payload in a single Docker multiplexed-stream frame, the
|
||||||
|
// format StdCopy expects: an 8-byte header (stream type + 4-byte big-endian
|
||||||
|
// length) followed by the payload.
|
||||||
|
func stdcopyFrame(stream stdcopy.StdType, payload string) []byte {
|
||||||
|
b := make([]byte, 8+len(payload))
|
||||||
|
b[0] = byte(stream)
|
||||||
|
binary.BigEndian.PutUint32(b[4:8], uint32(len(payload)))
|
||||||
|
copy(b[8:], payload)
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDockerAttachFlushesTrailingLine verifies that wait() blocks until the
|
||||||
|
// attach() streaming goroutine has drained and flushed the container's output,
|
||||||
|
// so a final line without a trailing newline is not lost.
|
||||||
|
func TestDockerAttachFlushesTrailingLine(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
framed := bytes.NewBuffer(stdcopyFrame(stdcopy.Stdout, "line one\nlast line without newline"))
|
||||||
|
|
||||||
|
var lines []string
|
||||||
|
logWriter := common.NewLineWriter(func(s string) bool {
|
||||||
|
lines = append(lines, s)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
client := &mockDockerClient{}
|
||||||
|
client.On("ContainerAttach", ctx, "123", mock.AnythingOfType("client.ContainerAttachOptions")).
|
||||||
|
Return(mobyclient.ContainerAttachResult{
|
||||||
|
HijackedResponse: mobyclient.HijackedResponse{
|
||||||
|
Conn: &mockConn{},
|
||||||
|
Reader: bufio.NewReader(framed),
|
||||||
|
},
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
statusCh := make(chan container.WaitResponse, 1)
|
||||||
|
statusCh <- container.WaitResponse{StatusCode: 0}
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
client.On("ContainerWait", ctx, "123", mobyclient.ContainerWaitOptions{Condition: container.WaitConditionNotRunning}).
|
||||||
|
Return(mobyclient.ContainerWaitResult{
|
||||||
|
Result: (<-chan container.WaitResponse)(statusCh),
|
||||||
|
Error: (<-chan error)(errCh),
|
||||||
|
})
|
||||||
|
|
||||||
|
cr := &containerReference{
|
||||||
|
id: "123",
|
||||||
|
cli: client,
|
||||||
|
input: &NewContainerInput{
|
||||||
|
Image: "image",
|
||||||
|
Stdout: logWriter,
|
||||||
|
Stderr: logWriter,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, cr.attach()(ctx))
|
||||||
|
require.NoError(t, cr.wait()(ctx))
|
||||||
|
|
||||||
|
// wait() must have blocked until the goroutine drained AND flushed; the
|
||||||
|
// trailing, non-newline-terminated line must therefore be present. Reading
|
||||||
|
// lines here is race-free because wait() synchronizes on attachDone, which
|
||||||
|
// the goroutine closes after the final append.
|
||||||
|
assert.Equal(t, []string{"line one\n", "last line without newline"}, lines)
|
||||||
|
|
||||||
|
client.AssertExpectations(t)
|
||||||
|
}
|
||||||
|
|
||||||
func TestDockerWaitFailure(t *testing.T) {
|
func TestDockerWaitFailure(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
|||||||
@@ -323,28 +323,28 @@ func (e *HostEnvironment) exec(ctx context.Context, command []string, cmdline st
|
|||||||
cmd.Dir = wd
|
cmd.Dir = wd
|
||||||
cmd.SysProcAttr = getSysProcAttr(cmdline, false)
|
cmd.SysProcAttr = getSysProcAttr(cmdline, false)
|
||||||
|
|
||||||
// On Windows a step often launches a process tree (a shell that starts a
|
// A step often launches a process tree (a shell that starts a child which
|
||||||
// child which spawns further GUI or background processes). The default
|
// spawns further background or GUI processes). The default context
|
||||||
// context cancellation only kills the direct child, leaving the rest of the
|
// cancellation only kills the direct child, leaving the rest of the tree
|
||||||
// tree running; and because the orphans inherit cmd's stdout/stderr pipe,
|
// running; and because the orphans inherit cmd's stdout/stderr pipe,
|
||||||
// cmd.Wait() would block forever, hanging the runner. Kill the whole tree
|
// cmd.Wait() would block forever, hanging the runner. Kill the whole tree on
|
||||||
// via a Job Object on cancellation, and bound the wait so a leftover pipe
|
// cancellation — via a Job Object on Windows and the process group on Unix
|
||||||
// writer can never hang Wait indefinitely.
|
// (see processKiller) — and bound the wait so a leftover pipe writer can
|
||||||
|
// never hang Wait indefinitely.
|
||||||
var killer atomic.Pointer[processKiller]
|
var killer atomic.Pointer[processKiller]
|
||||||
if runtime.GOOS == "windows" {
|
cmd.Cancel = func() error {
|
||||||
cmd.Cancel = func() error {
|
if k := killer.Load(); k != nil {
|
||||||
if k := killer.Load(); k != nil {
|
return k.Kill()
|
||||||
return k.Kill()
|
|
||||||
}
|
|
||||||
if cmd.Process != nil {
|
|
||||||
return cmd.Process.Kill()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
// Once the step process has exited, give its I/O pipes at most this long
|
if cmd.Process != nil {
|
||||||
// to drain before Wait force-closes them and returns (Go's WaitDelay).
|
return cmd.Process.Kill()
|
||||||
cmd.WaitDelay = 10 * time.Second
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
// Once the step process has exited, give its I/O pipes at most this long to
|
||||||
|
// drain before Wait force-closes them and returns (Go's WaitDelay). This
|
||||||
|
// also covers a step that backgrounds a process holding the pipe open.
|
||||||
|
cmd.WaitDelay = 10 * time.Second
|
||||||
|
|
||||||
var ppty *os.File
|
var ppty *os.File
|
||||||
var tty *os.File
|
var tty *os.File
|
||||||
@@ -375,17 +375,16 @@ func (e *HostEnvironment) exec(ctx context.Context, command []string, cmdline st
|
|||||||
if err := cmd.Start(); err != nil {
|
if err := cmd.Start(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if runtime.GOOS == "windows" {
|
// Capture the started process for tree-kill on cancellation: a Job Object on
|
||||||
// Assign the started process to a Job Object so cmd.Cancel can kill the
|
// Windows (children spawned afterwards are auto-included) and the process
|
||||||
// whole descendant tree. Children spawned afterwards are auto-included.
|
// group on Unix. On failure (e.g. Windows nested-job restrictions) we fall
|
||||||
// On failure (e.g. nested-job restrictions) we fall back to the default
|
// back to the default single-process kill; WaitDelay + end-of-job cleanup
|
||||||
// single-process kill; WaitDelay + end-of-job cleanup still apply.
|
// still apply.
|
||||||
if k, kerr := newProcessKiller(cmd.Process); kerr != nil {
|
if k, kerr := newProcessKiller(cmd.Process); kerr != nil {
|
||||||
common.Logger(ctx).Warnf("process tree kill setup failed, falling back to single-process kill: %v", kerr)
|
common.Logger(ctx).Warnf("process tree kill setup failed, falling back to single-process kill: %v", kerr)
|
||||||
} else {
|
} else {
|
||||||
killer.Store(k)
|
killer.Store(k)
|
||||||
defer k.Close()
|
defer k.Close()
|
||||||
}
|
|
||||||
}
|
}
|
||||||
err = cmd.Wait()
|
err = cmd.Wait()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -1,19 +1,29 @@
|
|||||||
// Copyright 2026 The Gitea Authors. All rights reserved.
|
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||||
// SPDX-License-Identifier: MIT
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
//go:build !windows
|
//go:build plan9
|
||||||
|
|
||||||
package container
|
package container
|
||||||
|
|
||||||
import "os"
|
import "os"
|
||||||
|
|
||||||
// processKiller is a no-op on non-Windows platforms. The Job Object based
|
// processKiller falls back to single-process termination on platforms without
|
||||||
// tree-kill is only wired in on Windows (see exec()); elsewhere the default
|
// a process-group / Job Object tree-kill. The Job Object (Windows) and process
|
||||||
// exec.CommandContext cancellation and Setpgid handling apply.
|
// group (Unix) based tree-kills live in process_windows.go / process_unix.go;
|
||||||
type processKiller struct{}
|
// here we just kill the direct child, matching the previous default behaviour.
|
||||||
|
type processKiller struct {
|
||||||
|
p *os.Process
|
||||||
|
}
|
||||||
|
|
||||||
func newProcessKiller(_ *os.Process) (*processKiller, error) { return &processKiller{}, nil }
|
func newProcessKiller(p *os.Process) (*processKiller, error) {
|
||||||
|
return &processKiller{p: p}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (k *processKiller) Kill() error { return nil }
|
func (k *processKiller) Kill() error {
|
||||||
|
if k == nil || k.p == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return k.p.Kill()
|
||||||
|
}
|
||||||
|
|
||||||
func (k *processKiller) Close() error { return nil }
|
func (k *processKiller) Close() error { return nil }
|
||||||
|
|||||||
56
act/container/process_unix.go
Normal file
56
act/container/process_unix.go
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
//go:build !windows && !plan9
|
||||||
|
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
// processKiller terminates a step process together with its whole process
|
||||||
|
// group, which is the Unix counterpart of the Windows Job Object tree-kill.
|
||||||
|
//
|
||||||
|
// Background: a step often launches a process tree (a shell that starts a child
|
||||||
|
// which in turn spawns further background processes). The default
|
||||||
|
// exec.CommandContext cancellation only kills the direct child, so cancelling a
|
||||||
|
// job left the rest of the tree running. Because those orphans inherited the
|
||||||
|
// step's stdout/stderr pipe, cmd.Wait() also blocked forever and the runner
|
||||||
|
// hung.
|
||||||
|
//
|
||||||
|
// Steps are started with Setpgid (or Setsid for the PTY path, see
|
||||||
|
// getSysProcAttr), which makes the step process the leader of a new process
|
||||||
|
// group whose ID equals its PID. Signalling the negative PID delivers to every
|
||||||
|
// process still in that group, so we can tear down the whole tree atomically on
|
||||||
|
// cancellation, which also closes the inherited pipe handles so cmd.Wait() can
|
||||||
|
// return.
|
||||||
|
type processKiller struct {
|
||||||
|
pgid int
|
||||||
|
}
|
||||||
|
|
||||||
|
// newProcessKiller captures the process group of p (an already-started
|
||||||
|
// process). Because the step is launched with Setpgid/Setsid, p is a group
|
||||||
|
// leader and its PGID equals its PID; children spawned afterwards stay in the
|
||||||
|
// same group unless they explicitly create their own.
|
||||||
|
func newProcessKiller(p *os.Process) (*processKiller, error) {
|
||||||
|
return &processKiller{pgid: p.Pid}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Kill sends SIGKILL to the entire process group (the step process and every
|
||||||
|
// descendant that stayed in the group). A missing group (ESRCH) means the
|
||||||
|
// processes already exited and is not treated as an error.
|
||||||
|
func (k *processKiller) Kill() error {
|
||||||
|
if k == nil || k.pgid <= 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := syscall.Kill(-k.pgid, syscall.SIGKILL); err != nil && !errors.Is(err, syscall.ESRCH) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close is a no-op on Unix; there is no job handle to release.
|
||||||
|
func (k *processKiller) Close() error { return nil }
|
||||||
100
act/container/process_unix_test.go
Normal file
100
act/container/process_unix_test.go
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
//go:build !windows && !plan9
|
||||||
|
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// processAlive reports whether pid refers to a still-running process. Signal 0
|
||||||
|
// performs error checking without delivering a signal: a nil error (or EPERM)
|
||||||
|
// means the process exists, ESRCH means it is gone.
|
||||||
|
//
|
||||||
|
// On Linux, zombie processes (state Z in /proc/<pid>/stat) appear alive to
|
||||||
|
// kill(0) but have already terminated — their corpse lingers until the parent
|
||||||
|
// calls wait(). In a Docker container the child may be reparented to a PID 1
|
||||||
|
// that does not reap promptly, so we treat zombies as not alive.
|
||||||
|
func processAlive(pid int) bool {
|
||||||
|
err := syscall.Kill(pid, 0)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// On Linux /proc is available; check whether the process is a zombie.
|
||||||
|
if b, readErr := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid)); readErr == nil {
|
||||||
|
// Format: "pid (comm) state ..." — state follows the closing ')' of the
|
||||||
|
// command name (which may itself contain spaces and parens).
|
||||||
|
rest := string(b)
|
||||||
|
if idx := strings.LastIndex(rest, ") "); idx >= 0 {
|
||||||
|
fields := strings.Fields(rest[idx+2:])
|
||||||
|
if len(fields) > 0 && fields[0] == "Z" {
|
||||||
|
return false // zombie: terminated but not yet reaped
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProcessKillerKillsTree verifies that a process group captured by the
|
||||||
|
// killer is terminated together with a child the step spawns afterwards. This
|
||||||
|
// mirrors a step that launches a child which spawns further processes, where
|
||||||
|
// cancelling the job must take down the whole tree, not just the direct child.
|
||||||
|
func TestProcessKillerKillsTree(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
pidFile := filepath.Join(dir, "child.pid")
|
||||||
|
|
||||||
|
// Parent shell backgrounds a long-lived child (writing its PID to a file)
|
||||||
|
// and then sleeps. With job control off (non-interactive sh) the backgrounded
|
||||||
|
// child stays in the parent's process group, so the group kill must reach it.
|
||||||
|
script := fmt.Sprintf(`sleep 600 & echo $! > %q; sleep 600`, pidFile)
|
||||||
|
cmd := exec.Command("/bin/sh", "-c", script)
|
||||||
|
// Launch as its own process-group leader, exactly like a real step does (see
|
||||||
|
// getSysProcAttr), so the killer's PGID == the process PID.
|
||||||
|
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||||
|
require.NoError(t, cmd.Start())
|
||||||
|
t.Cleanup(func() {
|
||||||
|
_ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
|
||||||
|
_ = cmd.Wait()
|
||||||
|
})
|
||||||
|
|
||||||
|
killer, err := newProcessKiller(cmd.Process)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer killer.Close()
|
||||||
|
|
||||||
|
// Wait for the backgrounded child PID to be reported.
|
||||||
|
var childPID int
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
b, e := os.ReadFile(pidFile)
|
||||||
|
if e != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
s := strings.TrimSpace(string(b))
|
||||||
|
if s == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
childPID, _ = strconv.Atoi(s)
|
||||||
|
return childPID > 0 && processAlive(childPID)
|
||||||
|
}, 20*time.Second, 100*time.Millisecond, "child process should start")
|
||||||
|
|
||||||
|
// Killing the group must terminate both the parent and the backgrounded child.
|
||||||
|
require.NoError(t, killer.Kill())
|
||||||
|
// Reap the parent so it does not linger as a zombie (which would still report
|
||||||
|
// as alive); SIGKILL makes Wait return promptly.
|
||||||
|
_ = cmd.Wait()
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
return !processAlive(childPID)
|
||||||
|
}, 20*time.Second, 100*time.Millisecond, "backgrounded child should be terminated")
|
||||||
|
}
|
||||||
@@ -48,8 +48,11 @@ func (rc *RunContext) commandHandler(ctx context.Context) common.LineHandler {
|
|||||||
if resumeCommand != "" && command != resumeCommand {
|
if resumeCommand != "" && command != resumeCommand {
|
||||||
// There should not be any emojis in the log output for Gitea.
|
// There should not be any emojis in the log output for Gitea.
|
||||||
// The code in the switch statement is the same.
|
// The code in the switch statement is the same.
|
||||||
|
// Return true (not false) so the line still reaches the raw_output
|
||||||
|
// log handler; otherwise everything between ::stop-commands:: and
|
||||||
|
// its end token is silently dropped from the step log.
|
||||||
logger.Infof("%s", line)
|
logger.Infof("%s", line)
|
||||||
return false
|
return true
|
||||||
}
|
}
|
||||||
arg = UnescapeCommandData(arg)
|
arg = UnescapeCommandData(arg)
|
||||||
kvPairs = unescapeKvPairs(kvPairs)
|
kvPairs = unescapeKvPairs(kvPairs)
|
||||||
|
|||||||
@@ -28,6 +28,29 @@ func TestSetEnv(t *testing.T) {
|
|||||||
a.Equal("valz", rc.Env["x"])
|
a.Equal("valz", rc.Env["x"])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStopCommandsKeepsSuppressedLinesInLog(t *testing.T) {
|
||||||
|
a := assert.New(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
rc := new(RunContext)
|
||||||
|
handler := rc.commandHandler(ctx)
|
||||||
|
|
||||||
|
// Stop command processing until the matching end token is seen.
|
||||||
|
a.True(handler("::stop-commands::my-end-token\n"))
|
||||||
|
|
||||||
|
// A command-shaped line while stopped must not be executed (env unchanged),
|
||||||
|
// but must still return true so it reaches the raw_output log handler and is
|
||||||
|
// not dropped from the step log.
|
||||||
|
a.True(handler("::set-env name=x::valz\n"))
|
||||||
|
a.NotContains(rc.Env, "x")
|
||||||
|
|
||||||
|
// The matching end token resumes command processing.
|
||||||
|
a.True(handler("::my-end-token::\n"))
|
||||||
|
|
||||||
|
// Commands are processed again after resuming.
|
||||||
|
a.True(handler("::set-env name=y::valy\n"))
|
||||||
|
a.Equal("valy", rc.Env["y"])
|
||||||
|
}
|
||||||
|
|
||||||
func TestSetOutput(t *testing.T) {
|
func TestSetOutput(t *testing.T) {
|
||||||
a := assert.New(t)
|
a := assert.New(t)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|||||||
@@ -462,6 +462,11 @@ func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, execu
|
|||||||
oldout, olderr := rc.JobContainer.ReplaceLogWriter(logWriter, logWriter)
|
oldout, olderr := rc.JobContainer.ReplaceLogWriter(logWriter, logWriter)
|
||||||
defer rc.JobContainer.ReplaceLogWriter(oldout, olderr)
|
defer rc.JobContainer.ReplaceLogWriter(oldout, olderr)
|
||||||
|
|
||||||
|
// Flush any buffered, not-yet-newline-terminated trailing line once the
|
||||||
|
// step has finished, so the final line of the step's output is not lost
|
||||||
|
// when it is not newline-terminated.
|
||||||
|
defer common.FlushWriter(logWriter)
|
||||||
|
|
||||||
return executor(ctx)
|
return executor(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user