mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-05-08 08:13:25 +02:00
fix: serialize action-cache reads to prevent worktree race (#938)
`NewGitCloneExecutor` holds a per-directory mutex while it `git checkout --force`s a remote action into the shared `<ActionCacheDir>/<UsesHash>`, but four read sites ran unlocked: - `maybeCopyToActionDir`'s tar walk via `JobContainer.CopyDir` - `prepareActionExecutor`'s `readAction` parse of `action.yml` - `newReusableWorkflowExecutor`'s `model.NewWorkflowPlanner` after `cloneRemoteReusableWorkflow` released its lock - `execAsDocker` when `ActionCache == nil`: `docker build` walks `contextDir` for the daemon-side build context When two matrix jobs share a `uses:`, a read interleaved with a peer's checkout produces partial state — observed as `Cannot find module .../dist/index.js` and `setup-uv` failing on a half-written `action.yml`. Exports `acquireCloneLock` as `AcquireCloneLock` and takes it at all four sites. `container.ImageExistsLocally` / `NewDockerBuildExecutor` and `model.NewWorkflowPlanner` are indirected through package-level vars so the docker-action build path and the reusable-workflow read site are testable without a real daemon, mirroring `ContainerNewContainer`. Three regression tests cover the higher-risk sites (`maybeCopyToActionDir`, `execAsDocker`, `newReusableWorkflowExecutor`); each fails if its `AcquireCloneLock` is removed. Subsumed by https://gitea.com/gitea/runner/pulls/814 once that lands. Related: https://gitea.com/gitea/runner/pulls/930 --- This PR was written with the help of Claude Opus 4.7 --------- Co-authored-by: Nicolas <bircni@icloud.com> Reviewed-on: https://gitea.com/gitea/runner/pulls/938 Reviewed-by: Nicolas <bircni@icloud.com> Reviewed-by: Zettat123 <39446+zettat123@noreply.gitea.com> Co-authored-by: silverwind <me@silverwind.io> Co-committed-by: silverwind <me@silverwind.io>
This commit is contained in:
@@ -38,9 +38,11 @@ var (
|
|||||||
ErrNoRepo = errors.New("unable to find git repo")
|
ErrNoRepo = errors.New("unable to find git repo")
|
||||||
)
|
)
|
||||||
|
|
||||||
// acquireCloneLock returns an unlock function after locking the per-directory mutex for dir.
|
// AcquireCloneLock returns an unlock function after locking the per-directory mutex for dir.
|
||||||
// Only concurrent operations targeting the same directory are erialized; clones into different directories run in parallel.
|
// Only concurrent operations targeting the same directory are serialized; clones into different directories run in parallel.
|
||||||
func acquireCloneLock(dir string) func() {
|
// Callers reading files inside dir (e.g. tarring a checked-out action into a job container) must hold this lock too,
|
||||||
|
// otherwise a concurrent NewGitCloneExecutor on the same dir can mutate the worktree mid-read.
|
||||||
|
func AcquireCloneLock(dir string) func() {
|
||||||
v, _ := cloneLocks.LoadOrStore(dir, &sync.Mutex{})
|
v, _ := cloneLocks.LoadOrStore(dir, &sync.Mutex{})
|
||||||
mu := v.(*sync.Mutex)
|
mu := v.(*sync.Mutex)
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
@@ -308,7 +310,7 @@ func NewGitCloneExecutor(input NewGitCloneExecutorInput) common.Executor {
|
|||||||
logger.Infof("git clone '%s' # ref=%s", input.URL, input.Ref)
|
logger.Infof("git clone '%s' # ref=%s", input.URL, input.Ref)
|
||||||
logger.Debugf(" cloning %s to %s", input.URL, input.Dir)
|
logger.Debugf(" cloning %s to %s", input.URL, input.Dir)
|
||||||
|
|
||||||
defer acquireCloneLock(input.Dir)()
|
defer AcquireCloneLock(input.Dir)()
|
||||||
|
|
||||||
refName := plumbing.ReferenceName("refs/heads/" + input.Ref)
|
refName := plumbing.ReferenceName("refs/heads/" + input.Ref)
|
||||||
r, err := CloneIfRequired(ctx, refName, input, logger)
|
r, err := CloneIfRequired(ctx, refName, input, logger)
|
||||||
|
|||||||
@@ -310,11 +310,11 @@ func TestAcquireCloneLock(t *testing.T) {
|
|||||||
t.Run("same directory serializes", func(t *testing.T) {
|
t.Run("same directory serializes", func(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
|
|
||||||
unlock1 := acquireCloneLock(dir)
|
unlock1 := AcquireCloneLock(dir)
|
||||||
|
|
||||||
secondAcquired := make(chan struct{})
|
secondAcquired := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
unlock := acquireCloneLock(dir)
|
unlock := AcquireCloneLock(dir)
|
||||||
close(secondAcquired)
|
close(secondAcquired)
|
||||||
unlock()
|
unlock()
|
||||||
}()
|
}()
|
||||||
@@ -338,12 +338,12 @@ func TestAcquireCloneLock(t *testing.T) {
|
|||||||
dirA := t.TempDir()
|
dirA := t.TempDir()
|
||||||
dirB := t.TempDir()
|
dirB := t.TempDir()
|
||||||
|
|
||||||
unlockA := acquireCloneLock(dirA)
|
unlockA := AcquireCloneLock(dirA)
|
||||||
defer unlockA()
|
defer unlockA()
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
unlock := acquireCloneLock(dirB)
|
unlock := AcquireCloneLock(dirB)
|
||||||
unlock()
|
unlock()
|
||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"gitea.com/gitea/runner/act/common"
|
"gitea.com/gitea/runner/act/common"
|
||||||
|
"gitea.com/gitea/runner/act/common/git"
|
||||||
"gitea.com/gitea/runner/act/container"
|
"gitea.com/gitea/runner/act/container"
|
||||||
"gitea.com/gitea/runner/act/model"
|
"gitea.com/gitea/runner/act/model"
|
||||||
|
|
||||||
@@ -44,6 +45,11 @@ type runAction func(step actionStep, actionDir string, remoteAction *remoteActio
|
|||||||
//go:embed res/trampoline.js
|
//go:embed res/trampoline.js
|
||||||
var trampoline embed.FS
|
var trampoline embed.FS
|
||||||
|
|
||||||
|
var (
|
||||||
|
ContainerImageExistsLocally = container.ImageExistsLocally
|
||||||
|
ContainerNewDockerBuildExecutor = container.NewDockerBuildExecutor
|
||||||
|
)
|
||||||
|
|
||||||
func readActionImpl(ctx context.Context, step *model.Step, actionDir, actionPath string, readFile actionYamlReader, writeFile fileWriter) (*model.Action, error) {
|
func readActionImpl(ctx context.Context, step *model.Step, actionDir, actionPath string, readFile actionYamlReader, writeFile fileWriter) (*model.Action, error) {
|
||||||
logger := common.Logger(ctx)
|
logger := common.Logger(ctx)
|
||||||
allErrors := []error{}
|
allErrors := []error{}
|
||||||
@@ -148,6 +154,8 @@ func maybeCopyToActionDir(ctx context.Context, step actionStep, actionDir, actio
|
|||||||
return rc.JobContainer.CopyTarStream(ctx, containerActionDirCopy, ta)
|
return rc.JobContainer.CopyTarStream(ctx, containerActionDirCopy, ta)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer git.AcquireCloneLock(actionDir)()
|
||||||
|
|
||||||
if err := removeGitIgnore(ctx, actionDir); err != nil {
|
if err := removeGitIgnore(ctx, actionDir); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -197,7 +205,7 @@ func runActionImpl(step actionStep, actionDir string, remoteAction *remoteAction
|
|||||||
if remoteAction == nil {
|
if remoteAction == nil {
|
||||||
location = containerActionDir
|
location = containerActionDir
|
||||||
}
|
}
|
||||||
return execAsDocker(ctx, step, actionName, location, remoteAction == nil)
|
return execAsDocker(ctx, step, actionName, actionDir, location, remoteAction == nil)
|
||||||
case x.IsComposite():
|
case x.IsComposite():
|
||||||
if err := maybeCopyToActionDir(ctx, step, actionDir, actionPath, containerActionDir); err != nil {
|
if err := maybeCopyToActionDir(ctx, step, actionDir, actionPath, containerActionDir); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -265,7 +273,7 @@ func removeGitIgnore(ctx context.Context, directory string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: break out parts of function to reduce complexicity
|
// TODO: break out parts of function to reduce complexicity
|
||||||
func execAsDocker(ctx context.Context, step actionStep, actionName, basedir string, localAction bool) error {
|
func execAsDocker(ctx context.Context, step actionStep, actionName, actionDir, basedir string, localAction bool) error {
|
||||||
logger := common.Logger(ctx)
|
logger := common.Logger(ctx)
|
||||||
rc := step.getRunContext()
|
rc := step.getRunContext()
|
||||||
action := step.getActionModel()
|
action := step.getActionModel()
|
||||||
@@ -284,12 +292,12 @@ func execAsDocker(ctx context.Context, step actionStep, actionName, basedir stri
|
|||||||
image = strings.ToLower(image)
|
image = strings.ToLower(image)
|
||||||
contextDir, fileName := filepath.Split(filepath.Join(basedir, action.Runs.Image))
|
contextDir, fileName := filepath.Split(filepath.Join(basedir, action.Runs.Image))
|
||||||
|
|
||||||
anyArchExists, err := container.ImageExistsLocally(ctx, image, "any")
|
anyArchExists, err := ContainerImageExistsLocally(ctx, image, "any")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
correctArchExists, err := container.ImageExistsLocally(ctx, image, rc.Config.ContainerArchitecture)
|
correctArchExists, err := ContainerImageExistsLocally(ctx, image, rc.Config.ContainerArchitecture)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -321,13 +329,21 @@ func execAsDocker(ctx context.Context, step actionStep, actionName, basedir stri
|
|||||||
}
|
}
|
||||||
defer buildContext.Close()
|
defer buildContext.Close()
|
||||||
}
|
}
|
||||||
prepImage = container.NewDockerBuildExecutor(container.NewDockerBuildExecutorInput{
|
prepImage = ContainerNewDockerBuildExecutor(container.NewDockerBuildExecutorInput{
|
||||||
ContextDir: contextDir,
|
ContextDir: contextDir,
|
||||||
Dockerfile: fileName,
|
Dockerfile: fileName,
|
||||||
ImageTag: image,
|
ImageTag: image,
|
||||||
BuildContext: buildContext,
|
BuildContext: buildContext,
|
||||||
Platform: rc.Config.ContainerArchitecture,
|
Platform: rc.Config.ContainerArchitecture,
|
||||||
})
|
})
|
||||||
|
if buildContext == nil {
|
||||||
|
// Held across the whole build: the daemon drains contextDir lazily.
|
||||||
|
inner := prepImage
|
||||||
|
prepImage = func(ctx context.Context) error {
|
||||||
|
defer git.AcquireCloneLock(actionDir)()
|
||||||
|
return inner(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.Debugf("image '%s' for architecture '%s' already exists", image, rc.Config.ContainerArchitecture)
|
logger.Debugf("image '%s' for architecture '%s' already exists", image, rc.Config.ContainerArchitecture)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,8 +9,13 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.com/gitea/runner/act/common"
|
||||||
|
"gitea.com/gitea/runner/act/common/git"
|
||||||
|
"gitea.com/gitea/runner/act/container"
|
||||||
"gitea.com/gitea/runner/act/model"
|
"gitea.com/gitea/runner/act/model"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@@ -252,3 +257,153 @@ func TestActionRunner(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMaybeCopyToActionDirHoldsCloneLock(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
actionDir := t.TempDir()
|
||||||
|
|
||||||
|
releaseCopy := make(chan struct{})
|
||||||
|
release := sync.OnceFunc(func() { close(releaseCopy) })
|
||||||
|
defer release()
|
||||||
|
|
||||||
|
copyEntered := make(chan struct{})
|
||||||
|
|
||||||
|
cm := &containerMock{}
|
||||||
|
cm.On("CopyDir", "/var/run/act/actions/", actionDir+"/", false).Return(func(ctx context.Context) error {
|
||||||
|
close(copyEntered)
|
||||||
|
<-releaseCopy
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
step := &stepActionRemote{
|
||||||
|
Step: &model.Step{Uses: "remote/action@v1"},
|
||||||
|
RunContext: &RunContext{
|
||||||
|
Config: &Config{},
|
||||||
|
JobContainer: cm,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
copyDone := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
copyDone <- maybeCopyToActionDir(ctx, step, actionDir, "", "/var/run/act/actions/")
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-copyEntered:
|
||||||
|
case err := <-copyDone:
|
||||||
|
t.Fatalf("maybeCopyToActionDir returned before CopyDir was entered: %v", err)
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("CopyDir was not entered within 1 second")
|
||||||
|
}
|
||||||
|
|
||||||
|
peerAcquired := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
unlock := git.AcquireCloneLock(actionDir)
|
||||||
|
close(peerAcquired)
|
||||||
|
unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-peerAcquired:
|
||||||
|
t.Fatal("peer AcquireCloneLock returned while CopyDir was running")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
release()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-copyDone:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("maybeCopyToActionDir returned error: %v", err)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("maybeCopyToActionDir did not return after CopyDir was unblocked")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-peerAcquired:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("peer AcquireCloneLock did not proceed after lock released")
|
||||||
|
}
|
||||||
|
|
||||||
|
cm.AssertExpectations(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExecAsDockerHoldsCloneLockForRemoteUncached(t *testing.T) {
|
||||||
|
actionDir := t.TempDir()
|
||||||
|
|
||||||
|
unlockOnce := sync.OnceFunc(git.AcquireCloneLock(actionDir))
|
||||||
|
defer unlockOnce()
|
||||||
|
|
||||||
|
innerEntered := make(chan struct{})
|
||||||
|
releaseInner := make(chan struct{})
|
||||||
|
releaseOnce := sync.OnceFunc(func() { close(releaseInner) })
|
||||||
|
defer releaseOnce()
|
||||||
|
|
||||||
|
origImageExists := ContainerImageExistsLocally
|
||||||
|
ContainerImageExistsLocally = func(_ context.Context, _, _ string) (bool, error) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
defer func() { ContainerImageExistsLocally = origImageExists }()
|
||||||
|
|
||||||
|
origBuildExec := ContainerNewDockerBuildExecutor
|
||||||
|
ContainerNewDockerBuildExecutor = func(_ container.NewDockerBuildExecutorInput) common.Executor {
|
||||||
|
return func(_ context.Context) error {
|
||||||
|
close(innerEntered)
|
||||||
|
<-releaseInner
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer func() { ContainerNewDockerBuildExecutor = origBuildExec }()
|
||||||
|
|
||||||
|
step := &stepActionRemote{
|
||||||
|
Step: &model.Step{ID: "1", Uses: "remote/action@v1", With: map[string]string{}},
|
||||||
|
RunContext: &RunContext{
|
||||||
|
Config: &Config{},
|
||||||
|
Run: &model.Run{
|
||||||
|
JobID: "1",
|
||||||
|
Workflow: &model.Workflow{
|
||||||
|
Name: "wf",
|
||||||
|
Jobs: map[string]*model.Job{"1": {}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
JobContainer: &containerMock{},
|
||||||
|
},
|
||||||
|
action: &model.Action{Runs: model.ActionRuns{Using: "docker", Image: "Dockerfile"}},
|
||||||
|
env: map[string]string{},
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() { done <- execAsDocker(ctx, step, "test-action", actionDir, actionDir, false) }()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-innerEntered:
|
||||||
|
t.Fatal("inner build executor ran before clone lock was released")
|
||||||
|
case err := <-done:
|
||||||
|
t.Fatalf("execAsDocker returned before inner was entered: %v", err)
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
unlockOnce()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-innerEntered:
|
||||||
|
case err := <-done:
|
||||||
|
t.Fatalf("execAsDocker returned without entering inner: %v", err)
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("inner build executor not entered after lock released")
|
||||||
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
releaseOnce()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("execAsDocker did not return after inner was released and ctx was canceled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -142,9 +142,16 @@ func cloneRemoteReusableWorkflow(rc *RunContext, cloneURL, ref, targetDirectory,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var modelNewWorkflowPlanner = model.NewWorkflowPlanner
|
||||||
|
|
||||||
func newReusableWorkflowExecutor(rc *RunContext, directory, workflow string) common.Executor {
|
func newReusableWorkflowExecutor(rc *RunContext, directory, workflow string) common.Executor {
|
||||||
return func(ctx context.Context) error {
|
return func(ctx context.Context) error {
|
||||||
planner, err := model.NewWorkflowPlanner(path.Join(directory, workflow), true)
|
// Scoped to the yaml read so concurrent invocations don't serialize
|
||||||
|
// on the whole job run.
|
||||||
|
planner, err := func() (model.WorkflowPlanner, error) {
|
||||||
|
defer git.AcquireCloneLock(directory)()
|
||||||
|
return modelNewWorkflowPlanner(path.Join(directory, workflow), true)
|
||||||
|
}()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,11 +5,15 @@ package runner
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.com/gitea/runner/act/common/git"
|
||||||
"gitea.com/gitea/runner/act/model"
|
"gitea.com/gitea/runner/act/model"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@@ -71,6 +75,54 @@ func TestReusableWorkflowCachedBranchRefRefreshes(t *testing.T) {
|
|||||||
require.Equal(t, tmpl("v2"), string(got), "cached workflow file must reflect the updated branch tip")
|
require.Equal(t, tmpl("v2"), string(got), "cached workflow file must reflect the updated branch tip")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNewReusableWorkflowExecutorHoldsCloneLock(t *testing.T) {
|
||||||
|
workflowDir := t.TempDir()
|
||||||
|
|
||||||
|
unlockOnce := sync.OnceFunc(git.AcquireCloneLock(workflowDir))
|
||||||
|
defer unlockOnce()
|
||||||
|
|
||||||
|
plannerCalled := make(chan struct{})
|
||||||
|
|
||||||
|
origPlanner := modelNewWorkflowPlanner
|
||||||
|
modelNewWorkflowPlanner = func(string, bool) (model.WorkflowPlanner, error) {
|
||||||
|
close(plannerCalled)
|
||||||
|
return nil, errors.New("stop")
|
||||||
|
}
|
||||||
|
defer func() { modelNewWorkflowPlanner = origPlanner }()
|
||||||
|
|
||||||
|
rc := &RunContext{
|
||||||
|
Config: &Config{},
|
||||||
|
Run: &model.Run{Workflow: &model.Workflow{Jobs: map[string]*model.Job{}}},
|
||||||
|
}
|
||||||
|
exec := newReusableWorkflowExecutor(rc, workflowDir, "reusable.yml")
|
||||||
|
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() { done <- exec(context.Background()) }()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-plannerCalled:
|
||||||
|
t.Fatal("planner ran while clone lock was held")
|
||||||
|
case err := <-done:
|
||||||
|
t.Fatalf("executor returned before planner was reached: %v", err)
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
unlockOnce()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-plannerCalled:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("planner not called after lock was released")
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-done:
|
||||||
|
require.Error(t, err)
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("executor did not return after planner ran")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func gitMust(t *testing.T, dir string, args ...string) {
|
func gitMust(t *testing.T, dir string, args ...string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
cmd := exec.Command("git", args...)
|
cmd := exec.Command("git", args...)
|
||||||
|
|||||||
@@ -145,6 +145,7 @@ func (sar *stepActionRemote) prepareActionExecutor() common.Executor {
|
|||||||
return common.NewPipelineExecutor(
|
return common.NewPipelineExecutor(
|
||||||
ntErr,
|
ntErr,
|
||||||
func(ctx context.Context) error {
|
func(ctx context.Context) error {
|
||||||
|
defer git.AcquireCloneLock(actionDir)()
|
||||||
actionModel, err := sar.readAction(ctx, sar.Step, actionDir, sar.remoteAction.Path, remoteReader(ctx), os.WriteFile)
|
actionModel, err := sar.readAction(ctx, sar.Step, actionDir, sar.remoteAction.Path, remoteReader(ctx), os.WriteFile)
|
||||||
sar.action = actionModel
|
sar.action = actionModel
|
||||||
return err
|
return err
|
||||||
|
|||||||
Reference in New Issue
Block a user