3 Commits

Author SHA1 Message Date
Nicolas
c5d0457615 Merge branch 'main' into lunny/remove_network 2026-05-24 09:58:45 +00:00
Nicolas
273f6b4247 fix(reporter): respect configured log level for job log forwarding (#989)
## Summary

- Non-raw_output log entries above the globally configured `log.level` are no longer forwarded to the Gitea job log output
- Step output (`raw_output=true`) is always forwarded regardless of level — it is actual job stdout/stderr, not runner internals
- State-machine fields (`stepResult`, `jobResult`) are always processed regardless of level, preserving correct tracking for skipped steps (whose `stepResult` is emitted at `DebugLevel` in `step.go`)
- Extracts a `shouldAppendLogRow` helper to avoid repeating the combined `!duringSteps() && entry.Level <= log.GetLevel()` guard in three places

## Why not the approach in #677

PR #677 adds `if entry.Level != log.GetLevel() { return nil }` at the top of `Fire()`. That has two bugs:
1. Uses `!=` instead of `>`, so `Error`/`Fatal` entries are dropped when the configured level is `Warn`
2. Returns early before processing `stepResult`/`jobResult` state fields — skipped steps (whose `stepResult` is logged at `DebugLevel`) would never be marked complete

This fix instead applies the level guard only at the `r.logRows` append sites, leaving state tracking unconditional.

Relates to #409.

Reviewed-on: https://gitea.com/gitea/runner/pulls/989
Reviewed-by: Lunny Xiao <xiaolunwen@gmail.com>
2026-05-23 17:28:44 +00:00
Lunny Xiao
3815aad750 fix cleanup network 2026-05-19 16:39:51 -07:00
6 changed files with 330 additions and 27 deletions

View File

@@ -8,12 +8,24 @@ package container
import (
"context"
"time"
"gitea.com/gitea/runner/act/common"
"github.com/moby/moby/client"
)
var (
dockerNetworkRemoveRetryInterval = 200 * time.Millisecond
dockerNetworkRemoveTimeout = 10 * time.Second
)
type dockerNetworkClient interface {
NetworkList(ctx context.Context, options client.NetworkListOptions) (client.NetworkListResult, error)
NetworkInspect(ctx context.Context, networkID string, options client.NetworkInspectOptions) (client.NetworkInspectResult, error)
NetworkRemove(ctx context.Context, networkID string, options client.NetworkRemoveOptions) (client.NetworkRemoveResult, error)
}
func NewDockerNetworkCreateExecutor(name string) common.Executor {
return func(ctx context.Context) error {
cli, err := GetDockerClient(ctx)
@@ -56,31 +68,64 @@ func NewDockerNetworkRemoveExecutor(name string) common.Executor {
}
defer cli.Close()
// Make sure that all network of the specified name are removed
// cli.NetworkRemove refuses to remove a network if there are duplicates
networks, err := cli.NetworkList(ctx, client.NetworkListOptions{})
return removeDockerNetworks(ctx, cli, name)
}
}
func removeDockerNetworks(ctx context.Context, cli dockerNetworkClient, name string) error {
cleanupCtx, cancel := context.WithTimeout(ctx, dockerNetworkRemoveTimeout)
defer cancel()
for {
pendingRemoval, err := removeDockerNetworksOnce(cleanupCtx, cli, name)
if err != nil {
return err
}
// For Gitea, reduce log noise
// common.Logger(ctx).Debugf("%v", networks)
for _, n := range networks.Items {
if n.Name == name {
result, err := cli.NetworkInspect(ctx, n.ID, client.NetworkInspectOptions{})
if err != nil {
return err
}
if len(result.Network.Containers) == 0 {
if _, err = cli.NetworkRemove(ctx, n.ID, client.NetworkRemoveOptions{}); err != nil {
common.Logger(ctx).Debugf("%v", err)
}
} else {
common.Logger(ctx).Debugf("Refusing to remove network %v because it still has active endpoints", name)
}
}
if !pendingRemoval {
return nil
}
return err
select {
case <-cleanupCtx.Done():
common.Logger(ctx).Warnf("Timed out waiting for Docker network %v endpoints to detach; leaving network behind", name)
return nil
case <-time.After(dockerNetworkRemoveRetryInterval):
}
}
}
func removeDockerNetworksOnce(ctx context.Context, cli dockerNetworkClient, name string) (bool, error) {
// Make sure that all network of the specified name are removed.
// cli.NetworkRemove refuses to remove a network if there are duplicates.
networks, err := cli.NetworkList(ctx, client.NetworkListOptions{})
if err != nil {
return false, err
}
// For Gitea, reduce log noise
// common.Logger(ctx).Debugf("%v", networks)
pendingRemoval := false
for _, n := range networks.Items {
if n.Name != name {
continue
}
result, err := cli.NetworkInspect(ctx, n.ID, client.NetworkInspectOptions{})
if err != nil {
return false, err
}
if len(result.Network.Containers) != 0 {
pendingRemoval = true
common.Logger(ctx).Debugf("Waiting to remove network %v because it still has active endpoints", name)
continue
}
if _, err = cli.NetworkRemove(ctx, n.ID, client.NetworkRemoveOptions{}); err != nil {
pendingRemoval = true
common.Logger(ctx).Debugf("Retrying Docker network removal for %v: %v", name, err)
}
}
return pendingRemoval, nil
}

View File

@@ -0,0 +1,115 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// Copyright 2026 The nektos/act Authors. All rights reserved.
// SPDX-License-Identifier: MIT
//go:build !(WITHOUT_DOCKER || !(linux || darwin || windows || netbsd))
package container
import (
"context"
"testing"
"time"
containernetwork "github.com/moby/moby/api/types/network"
"github.com/moby/moby/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type fakeDockerNetworkClient struct {
listResult client.NetworkListResult
inspectByID map[string][]client.NetworkInspectResult
inspectCalls map[string]int
removeCalls []string
removeErrs map[string][]error
removeIdx map[string]int
}
func (f *fakeDockerNetworkClient) NetworkList(context.Context, client.NetworkListOptions) (client.NetworkListResult, error) {
return f.listResult, nil
}
func (f *fakeDockerNetworkClient) NetworkInspect(_ context.Context, networkID string, _ client.NetworkInspectOptions) (client.NetworkInspectResult, error) {
idx := f.inspectCalls[networkID]
f.inspectCalls[networkID] = idx + 1
results := f.inspectByID[networkID]
if len(results) == 0 {
return client.NetworkInspectResult{}, nil
}
if idx >= len(results) {
return results[len(results)-1], nil
}
return results[idx], nil
}
func (f *fakeDockerNetworkClient) NetworkRemove(_ context.Context, networkID string, _ client.NetworkRemoveOptions) (client.NetworkRemoveResult, error) {
f.removeCalls = append(f.removeCalls, networkID)
idx := f.removeIdx[networkID]
f.removeIdx[networkID] = idx + 1
if errs := f.removeErrs[networkID]; idx < len(errs) {
return client.NetworkRemoveResult{}, errs[idx]
}
return client.NetworkRemoveResult{}, nil
}
func TestRemoveDockerNetworksRetriesUntilEndpointsDetach(t *testing.T) {
originalInterval := dockerNetworkRemoveRetryInterval
originalTimeout := dockerNetworkRemoveTimeout
dockerNetworkRemoveRetryInterval = time.Millisecond
dockerNetworkRemoveTimeout = 50 * time.Millisecond
t.Cleanup(func() {
dockerNetworkRemoveRetryInterval = originalInterval
dockerNetworkRemoveTimeout = originalTimeout
})
cli := &fakeDockerNetworkClient{
listResult: client.NetworkListResult{
Items: []containernetwork.Summary{{Network: containernetwork.Network{ID: "n1", Name: "test"}}},
},
inspectByID: map[string][]client.NetworkInspectResult{
"n1": {
{Network: containernetwork.Inspect{Containers: map[string]containernetwork.EndpointResource{"c1": {}}}},
{Network: containernetwork.Inspect{Containers: map[string]containernetwork.EndpointResource{}}},
},
},
inspectCalls: map[string]int{},
removeErrs: map[string][]error{},
removeIdx: map[string]int{},
}
err := removeDockerNetworks(context.Background(), cli, "test")
require.NoError(t, err)
assert.Equal(t, []string{"n1"}, cli.removeCalls)
assert.GreaterOrEqual(t, cli.inspectCalls["n1"], 2)
}
func TestRemoveDockerNetworksStopsRetryingAfterTimeout(t *testing.T) {
originalInterval := dockerNetworkRemoveRetryInterval
originalTimeout := dockerNetworkRemoveTimeout
dockerNetworkRemoveRetryInterval = time.Millisecond
dockerNetworkRemoveTimeout = 5 * time.Millisecond
t.Cleanup(func() {
dockerNetworkRemoveRetryInterval = originalInterval
dockerNetworkRemoveTimeout = originalTimeout
})
cli := &fakeDockerNetworkClient{
listResult: client.NetworkListResult{
Items: []containernetwork.Summary{{Network: containernetwork.Network{ID: "n1", Name: "test"}}},
},
inspectByID: map[string][]client.NetworkInspectResult{
"n1": {
{Network: containernetwork.Inspect{Containers: map[string]containernetwork.EndpointResource{"c1": {}}}},
},
},
inspectCalls: map[string]int{},
removeErrs: map[string][]error{},
removeIdx: map[string]int{},
}
err := removeDockerNetworks(context.Background(), cli, "test")
require.NoError(t, err)
assert.Empty(t, cli.removeCalls)
assert.Positive(t, cli.inspectCalls["n1"])
}

View File

@@ -35,6 +35,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
steps := make([]common.Executor, 0)
preSteps := make([]common.Executor, 0)
var postExecutor common.Executor
var startErr error
steps = append(steps, func(ctx context.Context) error {
logger := common.Logger(ctx)
@@ -165,7 +166,12 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
pipeline = append(pipeline, preSteps...)
pipeline = append(pipeline, steps...)
return common.NewPipelineExecutor(info.startContainer(), common.NewPipelineExecutor(pipeline...).
startContainer := func(ctx context.Context) error {
startErr = info.startContainer()(ctx)
return startErr
}
return common.NewPipelineExecutor(startContainer, common.NewPipelineExecutor(pipeline...).
Finally(func(ctx context.Context) error {
var cancel context.CancelFunc
if ctx.Err() == context.Canceled {
@@ -176,8 +182,23 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
}
return postExecutor(ctx)
}).
Finally(info.interpolateOutputs()).
Finally(info.closeContainer()))
Finally(info.interpolateOutputs())).
Finally(func(ctx context.Context) error {
if startErr == nil {
return nil
}
cleanupCtx, cancel := context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), time.Minute)
defer cancel()
logger := common.Logger(cleanupCtx)
logger.Infof("Cleaning up container for failed startup of job %s", rc.JobName)
if err := info.stopContainer()(cleanupCtx); err != nil {
logger.Errorf("Error while cleaning up failed job startup: %v", err)
}
return nil
}).
Finally(info.closeContainer())
}
func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success bool) {

View File

@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestJobExecutor(t *testing.T) {
@@ -341,3 +342,64 @@ func TestNewJobExecutor(t *testing.T) {
})
}
}
func TestNewJobExecutorCleansUpAfterStartContainerFailure(t *testing.T) {
ctx := common.WithJobErrorContainer(context.Background())
jim := &jobInfoMock{}
sfm := &stepFactoryMock{}
rc := &RunContext{
JobName: "test",
JobContainer: &jobContainerMock{},
Run: &model.Run{
JobID: "test",
Workflow: &model.Workflow{
Jobs: map[string]*model.Job{
"test": {},
},
},
},
Config: &Config{},
}
rc.ExprEval = rc.NewExpressionEvaluator(ctx)
executorOrder := make([]string, 0)
startErr := errors.New("failed to start container")
stepModel := &model.Step{ID: "1"}
sm := &stepMock{}
jim.On("steps").Return([]*model.Step{stepModel})
jim.On("startContainer").Return(func(ctx context.Context) error {
executorOrder = append(executorOrder, "startContainer")
return startErr
})
jim.On("stopContainer").Return(func(ctx context.Context) error {
executorOrder = append(executorOrder, "stopContainer")
return nil
})
jim.On("closeContainer").Return(func(ctx context.Context) error {
executorOrder = append(executorOrder, "closeContainer")
return nil
})
jim.On("interpolateOutputs").Return(func(ctx context.Context) error {
return nil
})
sfm.On("newStep", stepModel, rc).Return(sm, nil)
sm.On("pre").Return(func(ctx context.Context) error {
return nil
})
sm.On("main").Return(func(ctx context.Context) error {
return nil
})
sm.On("post").Return(func(ctx context.Context) error {
return nil
})
executor := newJobExecutor(jim, sfm, rc)
err := executor(ctx)
require.ErrorIs(t, err, startErr)
assert.Equal(t, []string{"startContainer", "stopContainer", "closeContainer"}, executorOrder)
jim.AssertExpectations(t)
sfm.AssertExpectations(t)
sm.AssertExpectations(t)
}

View File

@@ -205,7 +205,7 @@ func (r *Reporter) Fire(entry *log.Entry) error {
urgentState = true
}
}
if !r.duringSteps() {
if r.shouldAppendLogRow(entry) {
r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
}
r.unlockAndNotify(urgentState)
@@ -219,7 +219,7 @@ func (r *Reporter) Fire(entry *log.Entry) error {
}
}
if step == nil {
if !r.duringSteps() {
if r.shouldAppendLogRow(entry) {
r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
}
r.unlockAndNotify(false)
@@ -246,7 +246,7 @@ func (r *Reporter) Fire(entry *log.Entry) error {
r.logRows = append(r.logRows, row)
}
}
} else if !r.duringSteps() {
} else if r.shouldAppendLogRow(entry) {
r.logRows = appendIfNotNil(r.logRows, r.parseLogRow(entry))
}
if v, ok := entry.Data["stepResult"]; ok && isJobStepEntry(entry) {
@@ -576,6 +576,13 @@ func (r *Reporter) duringSteps() bool {
return true
}
// shouldAppendLogRow reports whether a non-raw_output entry should be written
// to the job log: only when we are between steps and the entry's level is
// within the globally configured log level.
func (r *Reporter) shouldAppendLogRow(entry *log.Entry) bool {
return !r.duringSteps() && entry.Level <= log.GetLevel()
}
var stringToResult = map[string]runnerv1.Result{
"success": runnerv1.Result_RESULT_SUCCESS,
"failure": runnerv1.Result_RESULT_FAILURE,

View File

@@ -219,6 +219,59 @@ func TestReporter_Fire(t *testing.T) {
})
}
func TestReporter_LogLevelFiltering(t *testing.T) {
// Set global level to Info so Debug entries should be filtered.
origLevel := log.GetLevel()
log.SetLevel(log.InfoLevel)
defer log.SetLevel(origLevel)
client := mocks.NewClient(t)
client.On("UpdateLog", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
}), nil
})
client.On("UpdateTask", mock.Anything, mock.Anything).Return(func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
})
ctx, cancel := context.WithCancel(context.Background())
taskCtx, err := structpb.NewStruct(map[string]any{})
require.NoError(t, err)
cfg, _ := config.LoadDefault("")
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
reporter.RunDaemon()
defer func() {
require.NoError(t, reporter.Close(""))
}()
reporter.ResetSteps(2)
dataStep0 := log.Fields{"stage": "Main", "stepNumber": 0, "raw_output": true}
dataStep0Internal := log.Fields{"stage": "Main", "stepNumber": 0}
// raw_output entries always appear in job log regardless of level.
require.NoError(t, reporter.Fire(&log.Entry{Message: "step output", Data: dataStep0, Level: log.InfoLevel}))
require.NoError(t, reporter.Fire(&log.Entry{Message: "step debug output", Data: dataStep0, Level: log.DebugLevel}))
assert.Equal(t, int64(2), reporter.state.Steps[0].LogLength, "raw_output entries must always be forwarded")
// Non-raw_output entries during steps are not added to logRows regardless of level.
require.NoError(t, reporter.Fire(&log.Entry{Message: "internal info", Data: dataStep0Internal, Level: log.InfoLevel}))
require.NoError(t, reporter.Fire(&log.Entry{Message: "internal debug", Data: dataStep0Internal, Level: log.DebugLevel}))
// stepResult at DebugLevel (skipped step) must still update state even when filtered from log.
require.NoError(t, reporter.Fire(&log.Entry{
Message: "Skipping step",
Data: log.Fields{
"stage": "Main",
"stepNumber": 1,
"stepResult": "skipped",
},
Level: log.DebugLevel,
}))
assert.Equal(t, runnerv1.Result_RESULT_SKIPPED, reporter.state.Steps[1].Result,
"stepResult at DebugLevel must update step state even when log entry is filtered from job log output")
}
// TestReporter_EphemeralRunnerDeletion reproduces the exact scenario from
// https://gitea.com/gitea/runner/issues/793:
//