mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-06-10 11:54:27 +02:00
Compare commits
2 Commits
renovate/d
...
lunny/remo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c5d0457615 | ||
|
|
3815aad750 |
@@ -8,12 +8,24 @@ package container
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"gitea.com/gitea/runner/act/common"
|
"gitea.com/gitea/runner/act/common"
|
||||||
|
|
||||||
"github.com/moby/moby/client"
|
"github.com/moby/moby/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
dockerNetworkRemoveRetryInterval = 200 * time.Millisecond
|
||||||
|
dockerNetworkRemoveTimeout = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
type dockerNetworkClient interface {
|
||||||
|
NetworkList(ctx context.Context, options client.NetworkListOptions) (client.NetworkListResult, error)
|
||||||
|
NetworkInspect(ctx context.Context, networkID string, options client.NetworkInspectOptions) (client.NetworkInspectResult, error)
|
||||||
|
NetworkRemove(ctx context.Context, networkID string, options client.NetworkRemoveOptions) (client.NetworkRemoveResult, error)
|
||||||
|
}
|
||||||
|
|
||||||
func NewDockerNetworkCreateExecutor(name string) common.Executor {
|
func NewDockerNetworkCreateExecutor(name string) common.Executor {
|
||||||
return func(ctx context.Context) error {
|
return func(ctx context.Context) error {
|
||||||
cli, err := GetDockerClient(ctx)
|
cli, err := GetDockerClient(ctx)
|
||||||
@@ -56,31 +68,64 @@ func NewDockerNetworkRemoveExecutor(name string) common.Executor {
|
|||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
// Make sure that all network of the specified name are removed
|
return removeDockerNetworks(ctx, cli, name)
|
||||||
// cli.NetworkRemove refuses to remove a network if there are duplicates
|
}
|
||||||
networks, err := cli.NetworkList(ctx, client.NetworkListOptions{})
|
}
|
||||||
|
|
||||||
|
func removeDockerNetworks(ctx context.Context, cli dockerNetworkClient, name string) error {
|
||||||
|
cleanupCtx, cancel := context.WithTimeout(ctx, dockerNetworkRemoveTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
for {
|
||||||
|
pendingRemoval, err := removeDockerNetworksOnce(cleanupCtx, cli, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if !pendingRemoval {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-cleanupCtx.Done():
|
||||||
|
common.Logger(ctx).Warnf("Timed out waiting for Docker network %v endpoints to detach; leaving network behind", name)
|
||||||
|
return nil
|
||||||
|
case <-time.After(dockerNetworkRemoveRetryInterval):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func removeDockerNetworksOnce(ctx context.Context, cli dockerNetworkClient, name string) (bool, error) {
|
||||||
|
// Make sure that all network of the specified name are removed.
|
||||||
|
// cli.NetworkRemove refuses to remove a network if there are duplicates.
|
||||||
|
networks, err := cli.NetworkList(ctx, client.NetworkListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
// For Gitea, reduce log noise
|
// For Gitea, reduce log noise
|
||||||
// common.Logger(ctx).Debugf("%v", networks)
|
// common.Logger(ctx).Debugf("%v", networks)
|
||||||
|
|
||||||
|
pendingRemoval := false
|
||||||
for _, n := range networks.Items {
|
for _, n := range networks.Items {
|
||||||
if n.Name == name {
|
if n.Name != name {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
result, err := cli.NetworkInspect(ctx, n.ID, client.NetworkInspectOptions{})
|
result, err := cli.NetworkInspect(ctx, n.ID, client.NetworkInspectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(result.Network.Containers) != 0 {
|
||||||
|
pendingRemoval = true
|
||||||
|
common.Logger(ctx).Debugf("Waiting to remove network %v because it still has active endpoints", name)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(result.Network.Containers) == 0 {
|
|
||||||
if _, err = cli.NetworkRemove(ctx, n.ID, client.NetworkRemoveOptions{}); err != nil {
|
if _, err = cli.NetworkRemove(ctx, n.ID, client.NetworkRemoveOptions{}); err != nil {
|
||||||
common.Logger(ctx).Debugf("%v", err)
|
pendingRemoval = true
|
||||||
}
|
common.Logger(ctx).Debugf("Retrying Docker network removal for %v: %v", name, err)
|
||||||
} else {
|
|
||||||
common.Logger(ctx).Debugf("Refusing to remove network %v because it still has active endpoints", name)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return pendingRemoval, nil
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
115
act/container/docker_network_test.go
Normal file
115
act/container/docker_network_test.go
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
// Copyright 2026 The Gitea Authors. All rights reserved.
|
||||||
|
// Copyright 2026 The nektos/act Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
//go:build !(WITHOUT_DOCKER || !(linux || darwin || windows || netbsd))
|
||||||
|
|
||||||
|
package container
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
containernetwork "github.com/moby/moby/api/types/network"
|
||||||
|
"github.com/moby/moby/client"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fakeDockerNetworkClient struct {
|
||||||
|
listResult client.NetworkListResult
|
||||||
|
inspectByID map[string][]client.NetworkInspectResult
|
||||||
|
inspectCalls map[string]int
|
||||||
|
removeCalls []string
|
||||||
|
removeErrs map[string][]error
|
||||||
|
removeIdx map[string]int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeDockerNetworkClient) NetworkList(context.Context, client.NetworkListOptions) (client.NetworkListResult, error) {
|
||||||
|
return f.listResult, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeDockerNetworkClient) NetworkInspect(_ context.Context, networkID string, _ client.NetworkInspectOptions) (client.NetworkInspectResult, error) {
|
||||||
|
idx := f.inspectCalls[networkID]
|
||||||
|
f.inspectCalls[networkID] = idx + 1
|
||||||
|
results := f.inspectByID[networkID]
|
||||||
|
if len(results) == 0 {
|
||||||
|
return client.NetworkInspectResult{}, nil
|
||||||
|
}
|
||||||
|
if idx >= len(results) {
|
||||||
|
return results[len(results)-1], nil
|
||||||
|
}
|
||||||
|
return results[idx], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeDockerNetworkClient) NetworkRemove(_ context.Context, networkID string, _ client.NetworkRemoveOptions) (client.NetworkRemoveResult, error) {
|
||||||
|
f.removeCalls = append(f.removeCalls, networkID)
|
||||||
|
idx := f.removeIdx[networkID]
|
||||||
|
f.removeIdx[networkID] = idx + 1
|
||||||
|
if errs := f.removeErrs[networkID]; idx < len(errs) {
|
||||||
|
return client.NetworkRemoveResult{}, errs[idx]
|
||||||
|
}
|
||||||
|
return client.NetworkRemoveResult{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRemoveDockerNetworksRetriesUntilEndpointsDetach(t *testing.T) {
|
||||||
|
originalInterval := dockerNetworkRemoveRetryInterval
|
||||||
|
originalTimeout := dockerNetworkRemoveTimeout
|
||||||
|
dockerNetworkRemoveRetryInterval = time.Millisecond
|
||||||
|
dockerNetworkRemoveTimeout = 50 * time.Millisecond
|
||||||
|
t.Cleanup(func() {
|
||||||
|
dockerNetworkRemoveRetryInterval = originalInterval
|
||||||
|
dockerNetworkRemoveTimeout = originalTimeout
|
||||||
|
})
|
||||||
|
|
||||||
|
cli := &fakeDockerNetworkClient{
|
||||||
|
listResult: client.NetworkListResult{
|
||||||
|
Items: []containernetwork.Summary{{Network: containernetwork.Network{ID: "n1", Name: "test"}}},
|
||||||
|
},
|
||||||
|
inspectByID: map[string][]client.NetworkInspectResult{
|
||||||
|
"n1": {
|
||||||
|
{Network: containernetwork.Inspect{Containers: map[string]containernetwork.EndpointResource{"c1": {}}}},
|
||||||
|
{Network: containernetwork.Inspect{Containers: map[string]containernetwork.EndpointResource{}}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
inspectCalls: map[string]int{},
|
||||||
|
removeErrs: map[string][]error{},
|
||||||
|
removeIdx: map[string]int{},
|
||||||
|
}
|
||||||
|
|
||||||
|
err := removeDockerNetworks(context.Background(), cli, "test")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, []string{"n1"}, cli.removeCalls)
|
||||||
|
assert.GreaterOrEqual(t, cli.inspectCalls["n1"], 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRemoveDockerNetworksStopsRetryingAfterTimeout(t *testing.T) {
|
||||||
|
originalInterval := dockerNetworkRemoveRetryInterval
|
||||||
|
originalTimeout := dockerNetworkRemoveTimeout
|
||||||
|
dockerNetworkRemoveRetryInterval = time.Millisecond
|
||||||
|
dockerNetworkRemoveTimeout = 5 * time.Millisecond
|
||||||
|
t.Cleanup(func() {
|
||||||
|
dockerNetworkRemoveRetryInterval = originalInterval
|
||||||
|
dockerNetworkRemoveTimeout = originalTimeout
|
||||||
|
})
|
||||||
|
|
||||||
|
cli := &fakeDockerNetworkClient{
|
||||||
|
listResult: client.NetworkListResult{
|
||||||
|
Items: []containernetwork.Summary{{Network: containernetwork.Network{ID: "n1", Name: "test"}}},
|
||||||
|
},
|
||||||
|
inspectByID: map[string][]client.NetworkInspectResult{
|
||||||
|
"n1": {
|
||||||
|
{Network: containernetwork.Inspect{Containers: map[string]containernetwork.EndpointResource{"c1": {}}}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
inspectCalls: map[string]int{},
|
||||||
|
removeErrs: map[string][]error{},
|
||||||
|
removeIdx: map[string]int{},
|
||||||
|
}
|
||||||
|
|
||||||
|
err := removeDockerNetworks(context.Background(), cli, "test")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Empty(t, cli.removeCalls)
|
||||||
|
assert.Positive(t, cli.inspectCalls["n1"])
|
||||||
|
}
|
||||||
@@ -35,6 +35,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
|
|||||||
steps := make([]common.Executor, 0)
|
steps := make([]common.Executor, 0)
|
||||||
preSteps := make([]common.Executor, 0)
|
preSteps := make([]common.Executor, 0)
|
||||||
var postExecutor common.Executor
|
var postExecutor common.Executor
|
||||||
|
var startErr error
|
||||||
|
|
||||||
steps = append(steps, func(ctx context.Context) error {
|
steps = append(steps, func(ctx context.Context) error {
|
||||||
logger := common.Logger(ctx)
|
logger := common.Logger(ctx)
|
||||||
@@ -165,7 +166,12 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
|
|||||||
pipeline = append(pipeline, preSteps...)
|
pipeline = append(pipeline, preSteps...)
|
||||||
pipeline = append(pipeline, steps...)
|
pipeline = append(pipeline, steps...)
|
||||||
|
|
||||||
return common.NewPipelineExecutor(info.startContainer(), common.NewPipelineExecutor(pipeline...).
|
startContainer := func(ctx context.Context) error {
|
||||||
|
startErr = info.startContainer()(ctx)
|
||||||
|
return startErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return common.NewPipelineExecutor(startContainer, common.NewPipelineExecutor(pipeline...).
|
||||||
Finally(func(ctx context.Context) error {
|
Finally(func(ctx context.Context) error {
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
if ctx.Err() == context.Canceled {
|
if ctx.Err() == context.Canceled {
|
||||||
@@ -176,8 +182,23 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
|
|||||||
}
|
}
|
||||||
return postExecutor(ctx)
|
return postExecutor(ctx)
|
||||||
}).
|
}).
|
||||||
Finally(info.interpolateOutputs()).
|
Finally(info.interpolateOutputs())).
|
||||||
Finally(info.closeContainer()))
|
Finally(func(ctx context.Context) error {
|
||||||
|
if startErr == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanupCtx, cancel := context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
logger := common.Logger(cleanupCtx)
|
||||||
|
logger.Infof("Cleaning up container for failed startup of job %s", rc.JobName)
|
||||||
|
if err := info.stopContainer()(cleanupCtx); err != nil {
|
||||||
|
logger.Errorf("Error while cleaning up failed job startup: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}).
|
||||||
|
Finally(info.closeContainer())
|
||||||
}
|
}
|
||||||
|
|
||||||
func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success bool) {
|
func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success bool) {
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import (
|
|||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestJobExecutor(t *testing.T) {
|
func TestJobExecutor(t *testing.T) {
|
||||||
@@ -341,3 +342,64 @@ func TestNewJobExecutor(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewJobExecutorCleansUpAfterStartContainerFailure(t *testing.T) {
|
||||||
|
ctx := common.WithJobErrorContainer(context.Background())
|
||||||
|
jim := &jobInfoMock{}
|
||||||
|
sfm := &stepFactoryMock{}
|
||||||
|
rc := &RunContext{
|
||||||
|
JobName: "test",
|
||||||
|
JobContainer: &jobContainerMock{},
|
||||||
|
Run: &model.Run{
|
||||||
|
JobID: "test",
|
||||||
|
Workflow: &model.Workflow{
|
||||||
|
Jobs: map[string]*model.Job{
|
||||||
|
"test": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Config: &Config{},
|
||||||
|
}
|
||||||
|
rc.ExprEval = rc.NewExpressionEvaluator(ctx)
|
||||||
|
|
||||||
|
executorOrder := make([]string, 0)
|
||||||
|
startErr := errors.New("failed to start container")
|
||||||
|
stepModel := &model.Step{ID: "1"}
|
||||||
|
sm := &stepMock{}
|
||||||
|
|
||||||
|
jim.On("steps").Return([]*model.Step{stepModel})
|
||||||
|
jim.On("startContainer").Return(func(ctx context.Context) error {
|
||||||
|
executorOrder = append(executorOrder, "startContainer")
|
||||||
|
return startErr
|
||||||
|
})
|
||||||
|
jim.On("stopContainer").Return(func(ctx context.Context) error {
|
||||||
|
executorOrder = append(executorOrder, "stopContainer")
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
jim.On("closeContainer").Return(func(ctx context.Context) error {
|
||||||
|
executorOrder = append(executorOrder, "closeContainer")
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
jim.On("interpolateOutputs").Return(func(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
sfm.On("newStep", stepModel, rc).Return(sm, nil)
|
||||||
|
sm.On("pre").Return(func(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
sm.On("main").Return(func(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
sm.On("post").Return(func(ctx context.Context) error {
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
executor := newJobExecutor(jim, sfm, rc)
|
||||||
|
err := executor(ctx)
|
||||||
|
require.ErrorIs(t, err, startErr)
|
||||||
|
assert.Equal(t, []string{"startContainer", "stopContainer", "closeContainer"}, executorOrder)
|
||||||
|
|
||||||
|
jim.AssertExpectations(t)
|
||||||
|
sfm.AssertExpectations(t)
|
||||||
|
sm.AssertExpectations(t)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user