feat: add startup janitor for stale bind-workdir task workspaces (#870)

- Add idle-time cleanup for stale bind-workdir task directories instead of cleaning them on the task execution path.
- Make cleanup behavior configurable: `runner.startup_cleanup_age` sets the stale-age threshold (default: `24h`) and `runner.idle_cleanup_interval` sets the idle cleanup cadence (default: `10m`); see the config sketch after this list.
- Restrict cleanup scope to numeric task directory names only, to avoid touching operator-managed folders.
- Document the cleanup settings in `config.example.yaml` and `README.md`.
- Add tests for stale-directory cleanup, idle cleanup throttling, and config default/override parsing.
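
For reference, the two settings could look like this in `config.example.yaml` (a sketch based on the key names and defaults above; the comments and surrounding layout are assumed):

```yaml
runner:
  # Task directories older than this are considered stale and removed
  # during idle cleanup. Parsed as a Go duration string (e.g. 30m, 24h).
  startup_cleanup_age: 24h
  # Minimum time between idle cleanup passes, so an idle runner does not
  # rescan the workdir root on every empty poll.
  idle_cleanup_interval: 10m
```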

## Why

When a runner or host crashes, normal per-task cleanup may not run, leaving stale task directories under the bind-workdir root. Running this cleanup only while the runner is idle recovers that disk space without adding overhead to active job execution.
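
For illustration, here is a minimal sketch of the directory scan this describes. The helper name `cleanStaleTaskDirs`, its parameters, and the ModTime-based staleness check are assumptions, not this PR's actual implementation:

```go
package cleanup

import (
	"os"
	"path/filepath"
	"strconv"
	"time"
)

// cleanStaleTaskDirs removes direct children of root whose names are
// purely numeric (runner-created task IDs) and whose modification time
// is older than maxAge. Non-numeric names, such as operator-managed
// folders, are never touched.
func cleanStaleTaskDirs(root string, maxAge time.Duration) error {
	entries, err := os.ReadDir(root)
	if err != nil {
		return err
	}
	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		if _, err := strconv.ParseInt(e.Name(), 10, 64); err != nil {
			continue // not a numeric task directory; leave it alone
		}
		info, err := e.Info()
		if err != nil {
			continue
		}
		if time.Since(info.ModTime()) > maxAge {
			if err := os.RemoveAll(filepath.Join(root, e.Name())); err != nil {
				return err
			}
		}
	}
	return nil
}
```

The throttling covered by the tests below would then amount to recording the time of the last pass in `OnIdle` and returning early while less than `idle_cleanup_interval` has elapsed.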

---------

Co-authored-by: silverwind <me@silverwind.io>
Reviewed-on: https://gitea.com/gitea/runner/pulls/870
Reviewed-by: silverwind <2021+silverwind@noreply.gitea.com>
Nicolas committed 2026-05-05 20:11:44 +00:00
parent a22119cf88
commit 2a4d56c650
8 changed files with 556 additions and 3 deletions


@@ -27,6 +27,11 @@ type TaskRunner interface {
	Run(ctx context.Context, task *runnerv1.Task) error
}

// IdleRunner can run maintenance while the poller is idle.
type IdleRunner interface {
	OnIdle(ctx context.Context)
}

type Poller struct {
	client client.Client
	runner TaskRunner
@@ -95,6 +100,7 @@ func (p *Poller) Poll() {
	task, ok := p.fetchTask(p.pollingCtx, s)
	if !ok {
		p.runIdleMaintenance()
		<-sem
		if !p.waitBackoff(s) {
			return
@@ -119,6 +125,7 @@ func (p *Poller) PollOnce() {
	for {
		task, ok := p.fetchTask(p.pollingCtx, s)
		if !ok {
			p.runIdleMaintenance()
			if !p.waitBackoff(s) {
				return
			}
@@ -130,6 +137,12 @@ func (p *Poller) PollOnce() {
	}
}

// runIdleMaintenance gives the runner a chance to do maintenance work
// (such as stale-workspace cleanup) when a poll returned no task.
func (p *Poller) runIdleMaintenance() {
	if idleRunner, ok := p.runner.(IdleRunner); ok {
		idleRunner.OnIdle(p.jobsCtx)
	}
}

func (p *Poller) Shutdown(ctx context.Context) error {
	p.shutdownPolling()


@@ -125,6 +125,11 @@ type mockRunner struct {
	totalCompleted atomic.Int64
}

// idleAwareRunner embeds mockRunner and counts OnIdle invocations.
type idleAwareRunner struct {
	mockRunner
	idleCalls atomic.Int64
}

func (m *mockRunner) Run(ctx context.Context, _ *runnerv1.Task) error {
	atomicMax(&m.maxConcurrent, m.running.Add(1))
	select {
@@ -136,6 +141,78 @@ func (m *mockRunner) Run(ctx context.Context, _ *runnerv1.Task) error {
	return nil
}

func TestPollerRunIdleMaintenance(t *testing.T) {
	runner := &idleAwareRunner{}
	p := &Poller{runner: runner, jobsCtx: context.Background()}
	p.runIdleMaintenance()
	assert.Equal(t, int64(1), runner.idleCalls.Load())
}

func (m *idleAwareRunner) OnIdle(_ context.Context) {
	m.idleCalls.Add(1)
}

func TestPollerPollCallsOnIdle(t *testing.T) {
	cli := mocks.NewClient(t)
	cli.On("FetchTask", mock.Anything, mock.Anything).Return(
		func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
			return connect_go.NewResponse(&runnerv1.FetchTaskResponse{}), nil
		},
	)
	cfg, err := config.LoadDefault("")
	require.NoError(t, err)
	cfg.Runner.Capacity = 1
	cfg.Runner.FetchInterval = 10 * time.Millisecond
	cfg.Runner.FetchIntervalMax = 10 * time.Millisecond
	runner := &idleAwareRunner{}
	poller := New(cfg, cli, runner)
	var wg sync.WaitGroup
	wg.Go(poller.Poll)
	require.Eventually(t, func() bool {
		return runner.idleCalls.Load() > 0
	}, time.Second, 10*time.Millisecond)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	require.NoError(t, poller.Shutdown(ctx))
	wg.Wait()
}

func TestPollerPollOnceCallsOnIdle(t *testing.T) {
	cli := mocks.NewClient(t)
	cli.On("FetchTask", mock.Anything, mock.Anything).Return(
		func(_ context.Context, _ *connect_go.Request[runnerv1.FetchTaskRequest]) (*connect_go.Response[runnerv1.FetchTaskResponse], error) {
			return connect_go.NewResponse(&runnerv1.FetchTaskResponse{}), nil
		},
	)
	cfg, err := config.LoadDefault("")
	require.NoError(t, err)
	cfg.Runner.FetchInterval = 10 * time.Millisecond
	cfg.Runner.FetchIntervalMax = 10 * time.Millisecond
	runner := &idleAwareRunner{}
	poller := New(cfg, cli, runner)
	var wg sync.WaitGroup
	wg.Go(poller.PollOnce)
	require.Eventually(t, func() bool {
		return runner.idleCalls.Load() > 0
	}, time.Second, 10*time.Millisecond)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	require.NoError(t, poller.Shutdown(ctx))
	wg.Wait()
}
// TestPoller_ConcurrencyLimitedByCapacity verifies that with capacity=3 and
// 6 available tasks, at most 3 tasks run concurrently, and FetchTask is
// never called concurrently (single poller).