Fix network cleanup when job container startup fails

This commit is contained in:
Lunny Xiao
2026-05-19 16:39:51 -07:00
parent 5464d33eef
commit 3815aad750
4 changed files with 267 additions and 24 deletions

View File

@@ -35,6 +35,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
steps := make([]common.Executor, 0)
preSteps := make([]common.Executor, 0)
var postExecutor common.Executor
var startErr error
steps = append(steps, func(ctx context.Context) error {
logger := common.Logger(ctx)
@@ -165,7 +166,12 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
pipeline = append(pipeline, preSteps...)
pipeline = append(pipeline, steps...)
return common.NewPipelineExecutor(info.startContainer(), common.NewPipelineExecutor(pipeline...).
startContainer := func(ctx context.Context) error {
startErr = info.startContainer()(ctx)
return startErr
}
return common.NewPipelineExecutor(startContainer, common.NewPipelineExecutor(pipeline...).
Finally(func(ctx context.Context) error {
var cancel context.CancelFunc
if ctx.Err() == context.Canceled {
@@ -176,8 +182,23 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
}
return postExecutor(ctx)
}).
Finally(info.interpolateOutputs()).
Finally(info.closeContainer()))
Finally(info.interpolateOutputs())).
Finally(func(ctx context.Context) error {
if startErr == nil {
return nil
}
cleanupCtx, cancel := context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), time.Minute)
defer cancel()
logger := common.Logger(cleanupCtx)
logger.Infof("Cleaning up container for failed startup of job %s", rc.JobName)
if err := info.stopContainer()(cleanupCtx); err != nil {
logger.Errorf("Error while cleaning up failed job startup: %v", err)
}
return nil
}).
Finally(info.closeContainer())
}
func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success bool) {

View File

@@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestJobExecutor(t *testing.T) {
@@ -341,3 +342,64 @@ func TestNewJobExecutor(t *testing.T) {
})
}
}
// TestNewJobExecutorCleansUpAfterStartContainerFailure checks that when the
// startContainer executor returns an error, the job executor reports that
// error and the recorded lifecycle calls are exactly startContainer,
// stopContainer, closeContainer, in that order.
func TestNewJobExecutorCleansUpAfterStartContainerFailure(t *testing.T) {
	ctx := common.WithJobErrorContainer(context.Background())

	// Sentinel error handed back by the mocked startContainer executor.
	startErr := errors.New("failed to start container")

	// calls collects the names of the lifecycle executors as they run.
	calls := []string{}
	// recorded returns an executor that notes its name in calls and then
	// yields the supplied result.
	recorded := func(name string, result error) func(context.Context) error {
		return func(_ context.Context) error {
			calls = append(calls, name)
			return result
		}
	}
	// succeed is an executor that does nothing and reports success.
	succeed := func(_ context.Context) error {
		return nil
	}

	workflow := &model.Workflow{
		Jobs: map[string]*model.Job{
			"test": {},
		},
	}
	rc := &RunContext{
		JobName:      "test",
		JobContainer: &jobContainerMock{},
		Run: &model.Run{
			JobID:    "test",
			Workflow: workflow,
		},
		Config: &Config{},
	}
	rc.ExprEval = rc.NewExpressionEvaluator(ctx)

	// Job-level mock: starting fails, stopping and closing succeed.
	jim := &jobInfoMock{}
	stepModel := &model.Step{ID: "1"}
	jim.On("steps").Return([]*model.Step{stepModel})
	jim.On("startContainer").Return(recorded("startContainer", startErr))
	jim.On("stopContainer").Return(recorded("stopContainer", nil))
	jim.On("closeContainer").Return(recorded("closeContainer", nil))
	jim.On("interpolateOutputs").Return(succeed)

	// Step-level mocks: every stage is a successful no-op.
	sm := &stepMock{}
	sm.On("pre").Return(succeed)
	sm.On("main").Return(succeed)
	sm.On("post").Return(succeed)

	sfm := &stepFactoryMock{}
	sfm.On("newStep", stepModel, rc).Return(sm, nil)

	runErr := newJobExecutor(jim, sfm, rc)(ctx)

	require.ErrorIs(t, runErr, startErr)
	assert.Equal(t, []string{"startContainer", "stopContainer", "closeContainer"}, calls)

	jim.AssertExpectations(t)
	sfm.AssertExpectations(t)
	sm.AssertExpectations(t)
}